In [1]:
from threading import Thread
from multiprocessing import Pool

# Parallel Map
### Restriction: don't take the shortcut of using a ready-made `Pool` (e.g. `multiprocessing.Pool`)

In [95]:
class ThreadWithReturnValue(Thread):
    """A ``Thread`` whose ``join()`` returns the target's return value.

    This lets a caller collect a result from a thread without having to hand
    the target a mutable object to write into.

    If ``args`` is a ``str`` it is passed to ``target`` as ONE argument instead
    of being unpacked character by character, so ``args=(some_string)`` --
    which is just ``some_string`` -- still works.

    If the target raises, ``join()`` returns None and the exception is kept in
    ``self._exception``. It is also re-raised inside the thread so
    ``threading.excepthook`` still reports it.
    """
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, Verbose=None):
        # ``Verbose`` is accepted only for backward compatibility; it is unused.
        # ``kwargs=None`` (not ``{}``) avoids one default dict being shared by
        # every instance; Thread.__init__ replaces None with a fresh {}.
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
        self._exception = None

    def run(self):
        """Call the target and store its return value (or the exception it raised)."""
        if self._target is None:
            return
        try:
            if isinstance(self._args, str):
                self._return = self._target(self._args, **self._kwargs)
            else:
                self._return = self._target(*self._args, **self._kwargs)
        except Exception as exc:
            self._exception = exc
            raise  # keep the default excepthook reporting

    def join(self, timeout=None):
        """Wait like ``Thread.join(timeout)`` and return the target's result.

        Returns None if the thread is still alive after ``timeout`` or if the
        target raised.
        """
        Thread.join(self, timeout)
        return self._return
    
class MAP_REDUCE:
    """Simulate a MAP-REDUCE job with threads.

    This is good for I/O-bound jobs and makes no sense for computation-bound
    ones. Use this class only to simulate the MAP-REDUCE process; otherwise
    simply use the built-in ``map``, or ``multiprocessing.Pool``.

    Map step -- ``f`` is called once per item of ``lst``:
      * ``len(lst) <= max_threads``: one thread per item, and ``self.out[i]``
        is ``f(lst[i])``.
      * ``len(lst) > max_threads``: ``lst`` is cut into ``thread`` contiguous
        chunks, one thread per chunk, and ``self.out[i]`` is the list of
        ``f`` results for chunk ``i``.

    Reduce step -- ``REDUCE`` receives ``list(self.out.values())`` in thread
    order. The default flattens it into ``[f(x) for x in lst]``, i.e. the same
    result as ``list(map(f, lst))`` in both modes.

    __Restrictions__
    f is assumed to return one single value.
    lst is a sequence of arguments, each of which is passed to f; it must
    support ``len``, indexing and slicing (e.g. list, range).
    """
    def __init__(self, f, lst, thread=10, max_threads=99, REDUCE=None):
        assert thread < 100, 'thread can not excede 100'
        self.f = f
        self.lst = lst
        self.nthread = thread
        self.max_threads = max_threads
        # Threads can only be started once, so OUT() runs them at most once;
        # a worker failure is remembered and re-raised on later OUT() calls.
        self._ran = False
        self._error = None
        self.GEN()
        self.Generate_Threads()
        # Reduce
        if REDUCE is None:
            REDUCE = self._default_reduce
        self.REDUCE = REDUCE

    def _is_chunked(self):
        """True when items are split into ``self.nthread`` chunks instead of one thread each."""
        return len(self.lst) > self.max_threads

    def _default_reduce(self, results):
        """Flatten per-thread results into one list, in the order of ``self.lst``."""
        if self._is_chunked():
            # each entry is the list of results for one chunk
            return [value for chunk in results for value in chunk]
        # each entry is already a single f(item)
        return list(results)

    def GEN(self):
        """Create one empty (None) slot per thread in ``self.Threads`` and ``self.out``."""
        _n = self.nthread if self._is_chunked() else len(self.lst)
        self.Threads = {i: None for i in range(_n)}
        self.out = {i: None for i in range(_n)}

    def FAIL_SAFE(self, RETURN=False):
        """Check whether f(arg) is going to throw, by calling it on the first item.

        ``f`` runs in the calling thread, so an exception reaches the caller
        directly. This is an extra call of ``f(lst[0])`` on top of the one made
        by the worker threads.

        Returns ``f(lst[0])`` when ``RETURN`` is true, otherwise None. An empty
        ``lst`` is a no-op (returns None).
        """
        if len(self.lst) == 0:
            return None
        out = self.f(self.lst[0])
        if RETURN:
            return out

    @staticmethod
    def _capture(func, *args):
        """Worker entry point: return ``(True, func(*args))`` or ``(False, exception)``.

        Catching here keeps a failing item from silently turning into None in
        ``self.out``; ``Run_jobs`` re-raises it in the calling thread.
        """
        try:
            return True, func(*args)
        except Exception as exc:
            return False, exc

    def Generate_Threads(self):
        """Build (but do not start) the worker threads in ``self.Threads``."""
        from math import ceil
        if self._is_chunked():
            # 1. Divide the job into self.nthread contiguous portions of size k
            #    (the last portions may be shorter, or empty).
            k = ceil(len(self.lst) / self.nthread)
            self.workload = {i: self.lst[i * k:(i + 1) * k]
                             for i in range(self.nthread)}

            # 2. Assign one portion to each thread.
            def f_iter(f, work_load):
                return [f(arg) for arg in work_load]

            for i, chunk in self.workload.items():
                self.Threads[i] = ThreadWithReturnValue(
                    target=self._capture, args=(f_iter, self.f, chunk))
        else:
            # One thread per item. ``args`` must be a real tuple: ``(arg)`` is
            # just ``arg``, which Thread would unpack (or fail on, e.g. an int).
            for idx, arg in enumerate(self.lst):
                self.Threads[idx] = ThreadWithReturnValue(
                    target=self._capture, args=(self.f, arg))

    def Run_jobs(self):
        """Start every thread, then join them in key order, filling ``self.out``.

        Raises
        ------
        RuntimeError
            If called again after the threads were already started.
        Exception
            The first exception raised by ``f`` in any worker. It is raised
            after all workers have been joined; successful results are still
            stored in ``self.out``.
        """
        for trd in self.Threads.values():
            trd.start()
        first_error = None
        for t_key, trd in self.Threads.items():
            ok, value = trd.join()
            if ok:
                self.out[t_key] = value
            elif first_error is None:
                first_error = value
        self._ran = True
        self._error = first_error
        if first_error is not None:
            raise first_error

    def OUT(self):
        """Run the map step (first call only) and return ``REDUCE(list(self.out.values()))``.

        A worker's exception is re-raised on this and any later call.
        """
        if not self._ran:
            self.FAIL_SAFE()
            self.Run_jobs()
        elif self._error is not None:
            raise self._error
        return self.REDUCE(list(self.out.values()))

    def __iter__(self):
        """Iterate over the reduced result (runs the job on first use)."""
        yield from self.OUT()

In [96]:
# Test unit: MAP_REDUCE must agree with the built-in map
def f(x):
    """Square x."""
    return x * x

MR = MAP_REDUCE(f, range(1000))
assert MR.OUT() == [f(x) for x in range(1000)], 'MR should generate same results as map'

# Classic example: word count

In [97]:
# Word count example: load the input text, one stripped line per list entry
with open('word.txt', 'r') as INfile:
    word_str = [line.strip() for line in INfile]

In [98]:
def line_count(STR):
    """Count how often each word occurs in one line of text.

    Words are split on runs of whitespace (``str.split()`` with no argument),
    so repeated spaces, tabs or an empty line do not produce an empty-string
    "word" the way ``split(' ')`` does. No case folding or punctuation
    stripping is done.

    Returns a dict mapping word -> count, in first-seen order.
    """
    out_dict = {}
    for word in STR.split():
        out_dict[word] = out_dict.get(word, 0) + 1
    return out_dict

def word_count_merger(c1,c2):
    """Return a NEW dict holding the summed word counts of ``c1`` and ``c2``.

    Neither input is modified. Writing into ``c1`` in place would corrupt the
    first dict handed to ``reduce`` (e.g. the first thread's entry in
    ``MAP_REDUCE.out``), so reducing the same results a second time would
    double-count.
    """
    out = dict(c1)  # copy: keep the caller's dict untouched
    for word, count in c2.items():
        out[word] = out.get(word, 0) + count
    return out

def MERGER(list_wc):
    """Reduce step for the word count: merge many word-count dicts into one.

    ``list_wc`` holds one entry per MAP_REDUCE thread. An entry is either a
    word-count dict (one thread per line) or a list of such dicts (a thread
    that processed a chunk of lines). An empty input gives ``{}``, and the
    input dicts are never modified.
    """
    from functools import reduce
    dicts = []
    for entry in list_wc:
        if isinstance(entry, list):
            dicts.extend(entry)
        else:
            dicts.append(entry)
    # Start from a fresh {} so reduce never writes into the caller's first dict.
    return reduce(word_count_merger, dicts, {})

In [99]:
# Map line_count over every line, then reduce the per-line dicts with MERGER
MR_word = MAP_REDUCE(f=line_count, lst=word_str, REDUCE=MERGER)

In [None]:
# Run the worker threads, then combine their word counts with MERGER (the REDUCE step)
MR_word.OUT()

In [87]:
# Sanity check: call line_count directly (no threads) on the first line and show its counts
MR_word.FAIL_SAFE(True)

{'word': 1,
 'count': 1,
 'from': 1,
 'Wikipedia': 1,
 'the': 1,
 'free': 1,
 'encyclopedia': 1}

In [74]:
# Per-thread results keyed by thread index; a slot stays None until the threads have run.
# NOTE(review): the saved output below (all None) looks like it was captured before OUT() ran.
MR_word.out

{0: None,
 1: None,
 2: None,
 3: None,
 4: None,
 5: None,
 6: None,
 7: None,
 8: None,
 9: None,
 10: None,
 11: None,
 12: None,
 13: None,
 14: None,
 15: None,
 16: None,
 17: None,
 18: None,
 19: None,
 20: None,
 21: None,
 22: None,
 23: None,
 24: None,
 25: None,
 26: None,
 27: None,
 28: None,
 29: None,
 30: None,
 31: None,
 32: None,
 33: None,
 34: None,
 35: None,
 36: None,
 37: None,
 38: None,
 39: None,
 40: None,
 41: None,
 42: None,
 43: None}