#### Exercise
Write a Python program that takes as input the name of a .txt file and creates another file
containing the number of occurrences of each word in the original file, in descending order.  
E.g.:  
the 563  
of 431  
to 320  
it 210  
that 109  
...  
Your program should distribute the computation by having 10 worker threads simultaneously
building the resulting list.

### Approach
In this exercise, I compare multiprocessing, multithreaded, and serial approaches by measuring their text processing time.  
I avoid libraries such as Pandas and keep the implementations as similar as possible across the three methodologies, so that the comparison is fair.  

#### Imports

In [3]:
import multiprocessing as mp
import re
from collections import Counter
import os
from itertools import groupby

In [4]:
nr_threads = 10  # number of worker processes / threads
# text_filepath = "input/sample100.txt"
text_filepath = "input/sample20.txt"

#### Functions for Multiprocess

In [5]:
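def cleanse_text(text):
    """
    Clean text from punctuation and normalize words to lowercase.

    Params:
        text (str): a string of text

    Returns:
        clean_text (List[str]): list of all sanitized, non-empty words in the text
    """
    # split() splits on any whitespace (spaces, tabs, newlines), unlike split(' '),
    # so words on adjacent lines are not glued together when the whole file is passed
    lower_text = text.lower().split()
    # strip non-word characters and underscores (raw string avoids an invalid-escape warning)
    clean_text = [re.sub(r'[\W_]+', '', word) for word in lower_text]
    # drop tokens that consisted only of punctuation
    return [word for word in clean_text if word]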
    
    
def chunk_it(filepath, nr_chunks):
    """
    Divide text file into chunks by line to distribute to processing nodes.

    Params:
        filepath (str): path of the .txt file
        nr_chunks (int): number of partitions to divide the text into

    Returns:
        line_chunks (List[List[str]]): a nested list of strings containing the lines for each node to process.
    """
    # read all lines; the with block closes the file afterwards
    with open(filepath, "r") as file:
        sample_text = file.readlines()

    # lines per chunk, rounded up (ceiling division) so no line is left out
    line_length = (len(sample_text) + nr_chunks - 1) // nr_chunks

    line_chunks = []
    for i in range(nr_chunks):
        line_chunks.append(sample_text[i * line_length:(i + 1) * line_length])

    print("We have {} chunks to process".format(len(line_chunks)))

    return line_chunks


def count_occurrences(lines):
    """
    Clean text and count each word in the text, iterating per line provided.

    Params:
        lines (List[str]): list containing the lines within the text

    Returns:
        count (List[Tuple[str, int]]): (word, occurrences) pairs for this chunk, most frequent first
    """
    print("Process running: " + str(os.getpid()) + "\n")  # print process id

    cleansed = []
    for line in lines:
        cleansed.extend(cleanse_text(line))

    count = Counter(cleansed).most_common()
    return count
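The chunking rule in `chunk_it` is a ceiling division: each chunk gets `ceil(lines / nr_chunks)` lines, so every chunk except possibly the last has the same size, and with few lines the trailing chunks may be empty. A minimal sketch on an in-memory list, using a hypothetical `split_into_chunks` helper that mirrors `chunk_it` without the file I/O:

```python
from typing import List


def split_into_chunks(lines: List[str], nr_chunks: int) -> List[List[str]]:
    """Hypothetical in-memory version of chunk_it: split lines into nr_chunks slices."""
    # lines per chunk, rounded up: ceil(len(lines) / nr_chunks) in integer arithmetic
    chunk_size = (len(lines) + nr_chunks - 1) // nr_chunks
    return [lines[i * chunk_size:(i + 1) * chunk_size] for i in range(nr_chunks)]


lines = [f"line {i}\n" for i in range(10)]
chunks = split_into_chunks(lines, 4)
print([len(chunk) for chunk in chunks])  # [3, 3, 3, 1]

# with more chunks than lines, the trailing chunks are simply empty
print([len(chunk) for chunk in split_into_chunks(lines[:2], 4)])  # [1, 1, 0, 0]

# concatenating the chunks gives back every line, in order
print(sum(chunks, []) == lines)  # True
```

Empty chunks are harmless: `count_occurrences` returns an empty list for them, which contributes nothing when the per-chunk results are aggregated.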

#### Functions for regular count

In [30]:
def count_occurrences_single(text):
    """
    Cleanse and count words within provided text.

    Params:
        text (str): the text to evaluate

    Returns:
        count (List[Tuple[str, int]]): (word, occurrences) pairs, most frequent first
    """
    print("Process running: " + str(os.getpid()) + "\n")  # print process id

    cleansed = cleanse_text(text)
    count = Counter(cleansed).most_common()
    return count
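Because the serial version passes the whole file as one string, how the text is split into words matters. Splitting only on `' '` keeps the newline inside a token, so once the `[\W_]+` regex strips it, the last word of one line and the first word of the next are glued together; double spaces also leave empty strings that get counted. A minimal sketch comparing the two strategies on a small made-up text, assuming the same cleansing regex (both helper names are hypothetical):

```python
import re

text = "The cat sat.\nThe  dog ran!\n"


def words_split_on_space(text):
    # split only on ' ': newlines stay inside tokens and double spaces give ''
    return [re.sub(r'[\W_]+', '', w) for w in text.lower().split(' ')]


def words_split_on_whitespace(text):
    # split on any run of whitespace, then drop tokens that were only punctuation
    cleaned = (re.sub(r'[\W_]+', '', w) for w in text.lower().split())
    return [w for w in cleaned if w]


print(words_split_on_space(text))       # ['the', 'cat', 'satthe', '', 'dog', 'ran']
print(words_split_on_whitespace(text))  # ['the', 'cat', 'sat', 'the', 'dog', 'ran']
```

With whitespace splitting, the serial count and the per-line (chunked) count see the same words, which keeps the timing comparison between the approaches apples to apples.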

#### Execute and time single thread (serial)

In [31]:
%%time
"""
In this section we execute and time the regular counting process
to establish a baseline.
"""
with open(text_filepath, "r") as file:
    sample_text = file.read()
word_count = count_occurrences_single(sample_text)

Process running: 36

CPU times: user 5.92 s, sys: 104 ms, total: 6.03 s
Wall time: 6.02 s


#### Execute and time multi process

I followed an approach similar to MapReduce to parallelize the work across 10 processes:
1. Divide the data into chunks to distribute to each node;
2. Count the words of each chunk in a separate process (map);
3. Aggregate the results in the main process by summing the counts per word (reduce);
4. Sort the counts and save them to the destination file.
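The four steps can be sketched compactly with `collections.Counter`, whose instances can be added together. Note that this swaps in Counter addition for the sort-and-`groupby` reduce that `main()` uses; it is an alternative, not the notebook's method, and the names `count_chunk` and `map_reduce_count` are hypothetical. The mapper is a parameter: the built-in `map` runs everything serially, while `executor.map` from a `ThreadPoolExecutor` or `pool.map` from a `multiprocessing.Pool` (called under the `if __name__ == '__main__':` guard) would hand the chunks to workers, since all of them take a function and an iterable.

```python
import re
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Tuple


def count_chunk(lines: List[str]) -> Counter:
    """Map step (hypothetical helper): count the cleansed words in one chunk of lines."""
    counts = Counter()
    for line in lines:
        for token in line.lower().split():
            word = re.sub(r'[\W_]+', '', token)
            if word:  # skip tokens that were only punctuation
                counts[word] += 1
    return counts


def map_reduce_count(lines: List[str], nr_chunks: int,
                     mapper: Callable = map) -> List[Tuple[str, int]]:
    """Hypothetical end-to-end sketch of the divide / map / reduce / sort steps."""
    # 1. divide: ceiling-division chunk size (at least 1 so range() never gets step 0)
    size = max(1, (len(lines) + nr_chunks - 1) // nr_chunks)
    chunks = [lines[i:i + size] for i in range(0, len(lines), size)]
    # 2. map: count each chunk; built-in map runs serially, executor.map or
    #    pool.map would distribute the chunks to worker threads or processes
    partial_counts = mapper(count_chunk, chunks)
    # 3. reduce: adding Counters sums the counts of identical words
    total = sum(partial_counts, Counter())
    # 4. sort: most_common() orders by count, highest first
    return total.most_common()


lines = ["the cat and the hat\n", "the end\n", "and so on\n"]
serial = map_reduce_count(lines, nr_chunks=2)
print(serial[:2])  # [('the', 3), ('and', 2)]

with ThreadPoolExecutor(max_workers=2) as executor:
    threaded = map_reduce_count(lines, nr_chunks=2, mapper=executor.map)
print(threaded == serial)  # True
```

Counter addition avoids the global sort that `groupby` needs before grouping; the final `most_common()` still sorts once by count.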

In [19]:
%%time
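def main():
    """
    Main multiprocessing function: divide the text into partitions,
    map each partition to a counting process, and
    reduce by summing the occurrences per word.
    """
    lines_array = chunk_it(text_filepath, nr_threads)

    # map: each worker process counts the words in its own chunk;
    # the with block shuts the pool down once the results are back
    with mp.Pool(processes=nr_threads) as pool:
        results = pool.map(count_occurrences, lines_array)

    # flatten the per-chunk (word, count) lists into one list
    words = [entry for chunk in results for entry in chunk]

    # reduce: groupby needs sorted input, then sum the counts of each word
    sorted_words = groupby(sorted(words), key=lambda x: x[0])
    word_aggregate = [[key, sum(value for _, value in group)] for key, group in sorted_words]

    # order by count, most frequent first
    sorted_counts = sorted(word_aggregate, key=lambda x: x[1], reverse=True)
    return sorted_counts

if __name__ == '__main__':
    sorted_counts = main()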

Process running: 634

Process running: 635

Process running: 636

Process running: 637

Process running: 638
Process running: 639
Process running: 640


Process running: 642
Process running: 641



Process running: 643

We have 10 chunks to process
CPU times: user 45.4 ms, sys: 112 ms, total: 157 ms
Wall time: 1.61 s


#### Multithreaded approach
For comparison with the previous approach: threads are subject to the GIL, so this is not truly parallel processing, only multithreading (all threads run in the same process).  
https://medium.com/contentsquare-engineering-blog/multithreading-vs-multiprocessing-in-python-ece023ad55a
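One way to observe the GIL effect independently of the word count is to time a pure-Python CPU-bound function serially and on a thread pool. This is a minimal sketch: `busy_sum` and the workload sizes are arbitrary choices for illustration, and the measured times depend on the machine and Python build, so no particular numbers are claimed. On a standard (GIL-enabled) CPython build, the threaded run is typically no faster than the serial one.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def busy_sum(n: int) -> int:
    """Pure-Python CPU-bound loop; under the GIL only one thread executes bytecode at a time."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def run_serial(workloads):
    return [busy_sum(n) for n in workloads]


def run_threaded(workloads):
    with ThreadPoolExecutor(max_workers=len(workloads)) as executor:
        return list(executor.map(busy_sum, workloads))


def timed(fn, *args):
    """Return (result, elapsed wall-clock seconds) for fn(*args)."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


workloads = [200_000] * 4
serial_result, serial_s = timed(run_serial, workloads)
threaded_result, threaded_s = timed(run_threaded, workloads)

print(f"serial:   {serial_s:.3f} s")
print(f"threaded: {threaded_s:.3f} s")
```

Both runs return identical results; only the wall-clock time differs, which is what the notebook's serial and threaded timings compare.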

In [37]:
%%time
import concurrent.futures

def main():
    """
    Multithreading approach function to compare with the prior multiprocessing one.
    """
    lines_array = chunk_it(text_filepath, nr_threads)

    # map: each worker thread counts the words in its own chunk (all in one process)
    with concurrent.futures.ThreadPoolExecutor(max_workers=nr_threads) as executor:
        results = list(executor.map(count_occurrences, lines_array))

    # flatten, then reduce by grouping identical words and summing their counts
    words = [entry for chunk in results for entry in chunk]
    sorted_words = groupby(sorted(words), key=lambda x: x[0])
    word_aggregate = [[key, sum(value for _, value in group)] for key, group in sorted_words]

    # order by count, most frequent first
    sorted_counts = sorted(word_aggregate, key=lambda x: x[1], reverse=True)
    return sorted_counts

if __name__ == '__main__':
    sorted_counts = main()

We have 10 chunks to process
Process running: 36
Process running: 36


Process running: 36

Process running: 36
Process running: 36


Process running: 36

Process running: 36

Process running: 36

Process running: 36

Process running: 36

CPU times: user 9.26 s, sys: 203 ms, total: 9.47 s
Wall time: 9.22 s


#### Write wordcount file

In [None]:
def write_file(destination_filepath, wordcount_list):
    """Write one 'word count' line per (word, count) pair, in the order given."""
    with open(destination_filepath, 'w') as f:
        for word, value in wordcount_list:
            f.write("{} {}\n".format(word, value))
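A quick way to check the output step is to write a small count list to a temporary file and read it back. This sketch declares a hypothetical stand-in, `write_counts`, so it runs on its own; it assumes one `word count` pair per line as in the exercise example, and the temporary path comes from Python's `tempfile` module rather than any path used in the notebook.

```python
import os
import tempfile
from typing import Iterable, Tuple


def write_counts(destination_filepath: str, wordcount_list: Iterable[Tuple[str, int]]) -> None:
    """Hypothetical stand-in for write_file: one 'word count' pair per line."""
    with open(destination_filepath, "w", encoding="utf-8") as f:
        for word, value in wordcount_list:
            f.write(f"{word} {value}\n")


# counts already sorted in descending order, e.g. the output of main()
counts = [("the", 563), ("of", 431), ("to", 320)]

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "wordcount.txt")
    write_counts(path, counts)
    with open(path, encoding="utf-8") as f:
        written = f.read().splitlines()

print(written)  # ['the 563', 'of 431', 'to 320']
```

Since the writer preserves the order it is given, the descending order has to come from the sorted list returned by the counting step.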

### Conclusion

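The serial (regular) approach served as the baseline. For this CPU-bound text processing, the multiprocess approach was considerably faster (1.61 s versus 6.02 s wall time), thanks to parallelism and load distribution across processes. Note that the CPU times reported by `%%time` cover only the main process, so the 157 ms figure for the multiprocess run excludes the work done inside the worker processes; wall time is the fair comparison.  
On the other hand, the multithreaded approach is detrimental in this case (9.22 s wall time): threads are better suited to concurrency in I/O-bound problems than to parallelism, since they all run in a single process under the GIL. It even adds overhead compared to the baseline, due to thread creation, switching, and communication.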