In [27]:
#https://www.geeksforgeeks.org/python/multiprocessing-python-set-1/
#https://docs.python.org/3/library/multiprocessing.html
#https://docs.python.org/3/library/os.html
#https://stackoverflow.com/questions/18300785/python-top-n-word-count-why-multiprocess-slower-then-single-process
#https://docs.python.org/3/library/concurrent.futures.html

In [28]:
import time  #Used to measure execution time (latency) for performance comparison.
import multiprocessing  #Required to detect the number of available CPU cores.
import os  #Used for file system operations, such as checking if the text file exists.
from collections import Counter  #specialized dictionary for efficient counting and merging of word frequencies.
from concurrent.futures import ProcessPoolExecutor  #Manages a pool of processes for parallel execution.
# Importing relevant libraries for distributed processing and file handling.

try:
    # Importing the worker function from an external file to avoid 'BrokenProcessPool' errors in Windows/Jupyter.
    from my_worker import clean_and_count
    print("Success: Imported 'clean_and_count' from external file.")
except ImportError:
    print("Error: Could not find 'my_worker.py'. Make sure the file is in the same directory.")

# Define the filename 
FILENAME = 'pg2701.txt'


Success: Imported 'clean_and_count' from external file.


In [29]:
full_text = ""
# Check if the file exists at the specified path
if os.path.exists(FILENAME):
    try:
        # Open the file using 'utf-8-sig' to handle Byte Order Mark (BOM) correctly
        with open(FILENAME, 'r', encoding='utf-8-sig') as f:
            original_text = f.read()
            
            # Multiply the text by 100 to simulate a larger dataset.
            # This increases the CPU load to better demonstrate the performance
            # benefits of multiprocessing vs single-threading.
            full_text = original_text * 100
            
        print(f"File loaded successfully. Total characters: {len(full_text)}")
        
    except Exception as e:
        print(f"error: Could not read the file: {e}")
else:
    print(f"Error: File '{FILENAME}' not found. Please check the filename and directory.")

File loaded successfully. Total characters: 123822500


In [30]:
if full_text:
    # Determine the number of available CPU cores (logical processors)
    # This defines how many parallel processes we can efficiently run.
    num_workers = multiprocessing.cpu_count()
    
    # Calculate the size of each chunk to ensure an even workload distribution.
    # We use max() to ensure chunk_size is at least 1 to avoid division errors.
    chunk_size = max(len(full_text) // num_workers, 1)
    
    # Create the list of chunks (Data Partitioning)
    # The large text string is sliced into smaller segments based on the calculated size.
    chunks = [full_text[i:i + chunk_size] for i in range(0, len(full_text), chunk_size)]
    
    print(f"Number of CPU cores available: {num_workers}")
    print(f"Data partitioning complete: Text split into {len(chunks)} chunks ready for processing.")

else:
    print("No text loaded.")

Number of CPU cores available: 12
Data partitioning complete: Text split into 13 chunks ready for processing.


In [31]:
start_seq = time.time()
# Initialize an empty Counter to aggregate word frequencies
seq_counts = Counter()

# Loop through each data chunk sequentially
# In a single-threaded approach, the CPU processes one chunk at a time.
# The code blocks (waits) until the current chunk is finished before moving to the next.
for chunk in chunks:
    # Clean the data and count words for the current chunk
    seq_counts.update(clean_and_count(chunk))

end_seq = time.time()
time_seq = end_seq - start_seq

print(f"Single-thread duration: {time_seq:.4f} seconds")

Single-thread duration: 19.5984 seconds


In [32]:
start_multi = time.time()
# Initialize a counter to aggregate results from all processes
multi_counts = Counter()
# The 'if __name__' block is mandatory on Windows to prevent recursive process spawning.
if __name__ == '__main__':
    # ProcessPoolExecutor manages a pool of independent worker processes.
    # Unlike threading, this bypasses Python's Global Interpreter Lock (GIL),
    # allowing true parallel execution on multiple CPU cores.
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # The Map Step:
        # Distribute the 'clean_and_count' function across the list of data chunks.
        # The executor automatically assigns chunks to available idle workers.
        results = executor.map(clean_and_count, chunks)
        # The Reduce/Aggregation Step:
        # Collect results from workers as they finish and merge them into the main counter.
        for res in results:
            multi_counts.update(res)

end_multi = time.time()
time_multi = end_multi - start_multi

print(f"Multiprocessing duration: {time_multi:.4f} seconds")

Multiprocessing duration: 3.2966 seconds
