In [8]:
# Importing required libraries

import time
from multiprocessing import Pool, cpu_count   
from functools import partial
from collections import Counter
import requests

In [9]:

# Multiprocessing parallel version 

def load_text_from_url(url, timeout=30):
    """
    Fetches the content of a text file from a given URL.

    :param url: Address of the text file to download.
    :param timeout: Seconds to wait for the server before giving up. Passed
        unchanged to ``requests.get``; ``None`` disables the limit and waits
        indefinitely.
    :return: The response body as text (``response.text``).
    :raises requests.exceptions.HTTPError: If the server answers with an
        error status code.
    :raises requests.exceptions.Timeout: If the server does not respond
        within ``timeout`` seconds.
    """
    # requests applies no timeout by default, so an unresponsive server would
    # block the notebook forever; always pass one explicitly.
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()  # Raise an error if the request fails
    return response.text
  
# Fetch Books
book_urls = [
    "https://raw.githubusercontent.com/MonaTlili/GIK2Q3_Assignment1/main/Around%20the%20World%20in%20Eighty%20Days.txt",
    "https://raw.githubusercontent.com/MonaTlili/GIK2Q3_Assignment1/main/A%20Journey%20to%20the%20Centre%20of%20the%20Earth.txt",
    "https://raw.githubusercontent.com/MonaTlili/GIK2Q3_Assignment1/main/Twenty%20Thousand%20Leagues%20under%20the%20Sea.txt",
]
book_1, book_2, book_3 = [load_text_from_url(book_url) for book_url in book_urls]

# Put books into list
books = [book_1, book_2, book_3]

# List for holding cleaned books
cleaned_books = []

# Treat '!' and '?' as sentence terminators just like '.', then cut each book
# at every '.' so it becomes a list of sentence-sized chunks
sentence_end_map = str.maketrans('!?', '..')
for book in books:
    book = book.translate(sentence_end_map).split('.')
    cleaned_books.append(book)

# Flatten the per-book chunk lists into one list holding the chunks of all books
cleaned_chunks = []
for book_sentences in cleaned_books:
    cleaned_chunks.extend(book_sentences)

In [10]:
# Misra-Gries Algorithm for Word Count
def misra_gries_word_count(words, k):
    """
    Find frequent-word candidates with Misra-Gries, then count them exactly.

    The first pass maintains at most ``k - 1`` counters: a word that already
    has a counter increments it, a new word gets a counter if one is free,
    and otherwise every existing counter is decremented (counters that reach
    zero are dropped and the new word itself is not stored). The second pass
    re-reads the input and replaces the approximate counters with the exact
    number of occurrences of each surviving candidate.

    :param words: Iterable of hashable items (typically a list of word
        strings). One-shot iterators and generators are accepted; because
        the input is read twice they are first copied into a list.
    :param k: Parameter determining the number of counters (at most
        ``k - 1`` are kept). With ``k <= 1`` no counter can ever be created
        and the result is an empty dict.
    :return: Dictionary mapping each surviving candidate to its exact count
        in ``words``.
    """
    # Both passes iterate over the input. A generator would be exhausted by
    # the first pass, leaving every candidate with a count of 0, so anything
    # that is not already a re-iterable sequence is materialised once here.
    if not isinstance(words, (list, tuple)):
        words = list(words)

    counters = {}

    # First pass: count words with a maximum of k-1 counters
    for word in words:
        if word in counters:
            counters[word] += 1
        elif len(counters) < k - 1:
            counters[word] = 1
        else:
            # No free counter for the new word: decrement all counters and
            # drop the ones that would reach zero (stored counts are always
            # >= 1, so those are exactly the counters currently equal to 1).
            counters = {
                key: count - 1 for key, count in counters.items() if count > 1
            }

    # Second pass: replace the approximate counters with exact counts for the
    # words that survived the first pass
    refined_counts = dict.fromkeys(counters, 0)
    for word in words:
        if word in refined_counts:
            refined_counts[word] += 1

    return refined_counts

In [13]:
# Function for splitting and processing the chunks for the Misra-Gries algorithm
# Needs to be defined as a named module-level function so that each worker process can use it;
# it can't be a lambda because multiprocessing has to pickle the function to send it to the workers
def process_chunk_for_misra_gries(chunk, k):
    """
    Tokenise one text chunk on whitespace and run Misra-Gries over the tokens.

    :param chunk: Text to process (a string).
    :param k: Misra-Gries parameter, forwarded unchanged.
    :return: The result of ``misra_gries_word_count`` for the chunk's tokens.
    """
    return misra_gries_word_count(chunk.split(), k)

In [17]:
# Parallel MapReduce implementation
def parallel_wordcount(text_chunks, k=3, top_n=20):
    """
    Parallel MapReduce word count built on the Misra-Gries algorithm.

    Map step: every chunk is handed to ``process_chunk_for_misra_gries`` in a
    pool with one worker process per CPU reported by ``cpu_count()``.
    Reduce step: the per-chunk dictionaries are summed into one ``Counter``
    and the ``top_n`` most common words are selected. The elapsed wall-clock
    time (pool start-up, map and reduce) and the selected words are printed.

    :param text_chunks: Iterable of text chunks (strings) to process.
    :param k: Misra-Gries parameter passed to every chunk (default 3).
    :param top_n: Number of most common words to report (default 20).
    :return: Dictionary mapping the ``top_n`` most common words to their
        combined counts.
    """
    # Start timer
    start_time = time.perf_counter()

    # One (chunk, k) argument tuple per task, as expected by starmap
    tasks = [(chunk, k) for chunk in text_chunks]

    with Pool(cpu_count()) as pool:
        # Run Misra-Gries on each chunk in parallel
        word_counts = pool.starmap(process_chunk_for_misra_gries, tasks)

    # Combine results from all chunks into a single counter; Counter.update
    # adds counts for words that appear in more than one chunk
    combined_counts = Counter()
    for count in word_counts:
        combined_counts.update(count)

    # Sort and select the top words
    top_counts = combined_counts.most_common(top_n)

    # Stop timer
    end_time = time.perf_counter()
    duration = end_time - start_time

    # Report results and performance
    print(f"Top {top_n} Words (Parallel Word Count):")
    for word, count in top_counts:
        print(f"{word}: {count}")
    print(f"\nParallel Duration: {duration:.6f} seconds")

    return dict(top_counts)

In [None]:
# Run program and measure performance

# Guard the entry point: multiprocessing start methods that re-import the main
# module in each worker (spawn / forkserver) would otherwise execute this call
# again inside every worker. In a notebook __name__ is "__main__", so the cell
# behaves exactly as before.
if __name__ == "__main__":
    parallel_counts = parallel_wordcount(cleaned_chunks)