In [1]:
class InputData:
    """Base type for a source of input text handed to a worker.

    Concrete subclasses override :meth:`read`; this base class only
    defines the interface.
    """

    def read(self):
        """Return the full contents of this input (subclasses must override)."""
        raise NotImplementedError()

In [21]:
class PathInputData(InputData):
    """``InputData`` backed by a text file on disk."""

    def __init__(self, path, encoding='utf-8'):
        """Remember which file to read and how to decode it.

        Args:
            path: Filesystem path of the file to read.
            encoding: Text encoding used to decode the file. Defaults to
                UTF-8 so the result does not depend on the platform's locale
                encoding, which ``open`` falls back to when none is given.
        """
        super().__init__()
        self.path = path
        self.encoding = encoding

    def read(self):
        """Return the whole file decoded as text with ``self.encoding``."""
        with open(self.path, 'r', encoding=self.encoding) as f:
            return f.read()

In [3]:
class Worker:
    """Base class for one map/reduce worker bound to a single input.

    ``map`` is expected to compute this worker's partial answer from
    ``input_data`` and store it in ``result``; ``reduce`` folds another
    worker's ``result`` into this one. Subclasses implement both.
    """

    def __init__(self, input_data):
        # No partial result exists until map() has run.
        self.result = None
        self.input_data = input_data

    def map(self):
        """Compute this worker's partial result (subclasses must override)."""
        raise NotImplementedError

    def reduce(self, other):
        """Merge ``other``'s partial result into this one (subclasses must override)."""
        raise NotImplementedError
        

In [4]:
class LineCountWorker(Worker):
    """Worker whose partial result is the number of newline characters in its input.

    Only newline characters are counted, so a final line with no trailing
    newline does not contribute to the total.
    """

    def map(self):
        """Read the input and store its newline count in ``result``."""
        text = self.input_data.read()
        newline_total = text.count('\n')
        self.result = newline_total

    def reduce(self, other):
        """Add ``other``'s newline count to this worker's ``result``."""
        self.result = self.result + other.result

In [24]:
import os 
def generate_inputs(data_dir):
    """Yield a ``PathInputData`` for each regular file directly inside ``data_dir``.

    Entries are visited in sorted order, so the printed listing is
    deterministic; ``os.listdir`` itself returns them in arbitrary order.
    Each file's name is printed as it is yielded.

    Subdirectories (e.g. a ``.ipynb_checkpoints`` folder) and other
    non-regular entries are skipped. Opening one for reading raises an
    ``OSError`` inside the worker thread, and that failure would only surface
    later as an unrelated error during the reduce step.

    Args:
        data_dir: Directory whose files should be processed.
    """
    for name in sorted(os.listdir(data_dir)):
        path = os.path.join(data_dir, name)
        if not os.path.isfile(path):
            continue
        print(name)
        yield PathInputData(path)

In [16]:
def create_workers(input_list, worker_class=None):
    """Wrap every input in its own worker.

    Args:
        input_list: Iterable of input objects (e.g. ``PathInputData``); it is
            consumed exactly once.
        worker_class: Callable/class used to build each worker from one
            input. Defaults to ``LineCountWorker``, preserving the original
            behaviour for existing callers.

    Returns:
        A list holding ``worker_class(item)`` for each item, in input order.
    """
    if worker_class is None:
        # Resolved at call time so the default does not need to exist
        # when this function is defined.
        worker_class = LineCountWorker
    return [worker_class(each) for each in input_list]

In [18]:
from threading import Thread
def execute(workers):
    """Run each worker's ``map`` on its own thread, then fold results with ``reduce``.

    Args:
        workers: Iterable of worker objects exposing ``map()``,
            ``reduce(other)`` and a ``result`` attribute. It is materialised
            into a list once, so generators are accepted.

    Returns:
        The first worker's ``result`` after every other worker has been
        reduced into it.

    Raises:
        ValueError: If ``workers`` is empty (there is nothing to reduce).
        Exception: If any ``map`` call raises, the first such exception (in
            worker order) is re-raised here after all threads have finished.
    """
    workers = list(workers)
    if not workers:
        raise ValueError('execute() requires at least one worker')

    # One slot per worker. An exception raised in a Thread target is only
    # reported via threading.excepthook and never propagates through join(),
    # so it is recorded here and re-raised in the calling thread.
    errors = [None] * len(workers)

    def run(index, worker):
        try:
            worker.map()
        except Exception as exc:
            errors[index] = exc

    threads = [Thread(target=run, args=(i, w)) for i, w in enumerate(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    for exc in errors:
        if exc is not None:
            raise exc

    first, *rest = workers
    for worker in rest:
        first.reduce(worker)
    return first.result

In [19]:
def mapreduce(data_dir):
    """Build inputs for ``data_dir``, wrap them in workers, and return the combined result.

    This is a thin pipeline: ``generate_inputs`` -> ``create_workers`` ->
    ``execute``.
    """
    return execute(create_workers(generate_inputs(data_dir)))
    

In [28]:
# Run the map/reduce pipeline over the entries of ./data and display the combined result.
data_dir = './data'
result = mapreduce(data_dir)
print(result)

my_numbers.txt
test1.txt
test2.txt
test3.txt
23
