In [1]:
#
# 객체를 범용으로 생성하려면 @classmethod 다향성을 이용할 것
#

# 파이썬에서는 객체가 다형성을 지원한다.
# 클래스도 다형성을 지원한다.
# 다형성은 계층 구조에 속한 여러 클래스가 자체의 메서드를 독립적인 버전으로 구현하는 방식이다.
  # => 특정 클래스를 상속받아서 그 클래스에 구현되어있는 함수를 오버라이드해서 이용하는 것이다.
# 다형성을 이용하면 여러 클래스가 같은 인터페이스나 추상 클래스(ABCclass)를 사용하면서도 다른 기능을 제공할 수 있다.


In [2]:
# 맵리듀스 구현을 작성할 때 입력 데이터를 표현할 공통 클래스가 필요하다고 하자.
# 다음은 서브클래스에서 정의해야 하는 read메서드가 있는 입력 데이터 클래스다.
class InputData:
    def read(self):
        raise NotImplementedError

In [3]:
# 다음은 디스크에 있는 파일에서 데이터를 읽어오도록 구현한 InputData의 서브클래스이다.
class PathInputData(InputData):
    def __init__(self, path):
        super().__init__()
        self.path = path
        
    def read(self):
        return open(self.path).read()

In [4]:
# PathInputData 같은 InputData의 서브클래스는 몇 개든 있을 수 있다.
# 각 서브 클래스에서는 처리할 바이트 데이터를 반환하는 표준 인터페이스인 read를 구현할 것이다.
# 다른 InputData 서브클래스는 네트워크에서 데이터를 읽어오거나 데이터의 압축을 해제하는 기능등을 할 수 있다.
class Worker:
    def __init__(self, input_data):
        self.input_data = input_data
        self.result = None
        
    def map(self):
        raise NotImplementedError
        
    def reduce(self, other):
        raise NotImplementedError

In [5]:
# 다음은 적용하려는 특정 맵리듀스 함수를 구현한 Worker의 구체 서브클래스다
class LineCountWorker(Worker):
    def map(self):
        data = self.input_data.read()
        self.result = data.count('\n')
        
    def reduce(self, other):
        self.result += other.result

In [6]:
# 잘 작동할 것처럼 보이지만 엄청난 문제에 직면한다.
# 이 모든 코드 조각을 무엇으로 연결해야 한단 말인가?
# 적절히 추상화한 클래스들이지만 일단 객체를 생성한 후에나 유용한 것들이다.
# 무엇으로 객체를 만들고 맵리듀스를 조율할까?

# 헬퍼 함수로 직접 객체를 만들고 연결하는 방법이 있다.
def generate_inputs(data_dir):
    for name in os.listdir(data_dir):
        yield PathInputData(os.path.join(data_dir, name))

In [7]:
# 그 후, generate_inputs에서 리턴 된 InputData 객체를 사용하는
# LineCountWorker 객체를 생성한다.
def create_workers(input_list):
    workers = []
    for input_data in input_list:
        workers.append(LineCountWorker(input_data))
    return workers

In [8]:
# map 단계를 여러 스레드로 나눠서 이 Worker 인스턴스들을 실행한다.
# 그런 다음 reduce를 반복적으로 호출해서 결과를 최종값 하나로 합친다.
def execute(workers):
    threads = [Thread(target=w.map) for w in workers]
    for thread in threads: thread.start()
    for thread in threads: thread.join()
        
    first, rest = workers[0], workers[1:]
    for worker in rest:
        first.reduce(worker)
    return first.result

In [9]:
# 마지막으로 단계별로 실행하려고 mapreduce 함수에서 모든 조각을 연결한다.
def mapreduce(data_dir):
    inputs = generate_inputs(data_dir)
    workers = create_workers(inputs)
    return execute(workers)