## Lib for Map-Reduce

In [5]:
from typing import Iterable, Callable, Any
from itertools import groupby


def flatten(inp: Iterable[Iterable[Any]]) -> Iterable[Any]:
    """Lazily concatenate an iterable of iterables into a single stream.

    Nothing is consumed until the returned generator is iterated; inner
    iterables are drained one after another in the order ``inp`` yields them.
    """
    for chunk in inp:
        yield from chunk


def run_map(mapper: Callable[[Any], Iterable[Any]], input_stream: Iterable[Any]) -> Iterable[Any]:
    """Apply ``mapper`` to every record and flatten the produced iterables.

    ``mapper`` may yield zero, one or many output records per input record;
    evaluation is lazy, so records are mapped only as the result is iterated.
    """
    per_record_outputs = map(mapper, input_stream)
    return flatten(per_record_outputs)


def run_reduce(reducer: Callable[[Iterable[Any]], Iterable[Any]],
               input_stream: Iterable[Any],
               key: Iterable[str]) -> Iterable[Any]:
    """Group records by attribute values and apply ``reducer`` to each group.

    The whole ``input_stream`` is sorted eagerly (in memory) when this function
    is called. Grouping and reducing then happen lazily as the result is iterated.

    Args:
        reducer: called once per group with an iterator over that group's
            records. It must return an iterable; its items are emitted in order.
            The group iterator is only valid until the next group is requested
            (``itertools.groupby`` semantics).
        input_stream: records to reduce. Every record must have each attribute
            named in ``key``, and the resulting key tuples must be comparable
            (they are passed to ``sorted``).
        key: attribute names forming the grouping key, e.g.
            ``['user_id', 'date']``. A single string is treated as one
            attribute name.

    Returns:
        A lazy iterable over the concatenated outputs of ``reducer``.
    """
    # Freeze the names once. ``key_func`` runs for every record, so a one-shot
    # iterator passed as ``key`` would be exhausted after the first call and
    # every later record would get the empty key ``()``. A bare string would
    # otherwise be iterated character by character.
    key_names = (key,) if isinstance(key, str) else tuple(key)

    def key_func(item):
        return tuple(getattr(item, name) for name in key_names)

    # groupby only merges *adjacent* equal keys, so sort by the same key first.
    sorted_stream = sorted(input_stream, key=key_func)
    grouped_stream = groupby(sorted_stream, key=key_func)
    return flatten(reducer(group) for _, group in grouped_stream)


class SimpleMapReduce:
    """Minimal MapReduce pipeline with a fluent (chainable) interface.

    Each ``map``/``reduce`` call wraps the current stream in a new stage,
    stores it, and returns ``self`` so calls can be chained. ``output``
    returns the stream produced by the last stage.
    """

    def __init__(self, stream):
        # Current stream of records; replaced by every stage added below.
        self._stream = stream

    def map(self, mapper):
        """Add a map stage: ``mapper`` yields zero or more records per input."""
        next_stage = run_map(mapper, self._stream)
        self._stream = next_stage
        return self

    def reduce(self, reducer, key):
        """Add a reduce stage that groups records by the attributes in ``key``."""
        next_stage = run_reduce(reducer, self._stream, key)
        self._stream = next_stage
        return self

    def output(self):
        """Return the stream of the last stage (the input if no stage was added)."""
        return self._stream

## DAU поиска с MapReduce на коленке

In [26]:
from typing import Iterable, Callable, Any
from dataclasses import dataclass
import datetime

from simplemr import SimpleMapReduce


@dataclass
class UserEvent:
    """One parsed row of the event log."""

    user_id: str                  # first log column
    moment: datetime.datetime     # parsed with datetime.fromisoformat
    action: str                   # e.g. 'search'
    value: float                  # fourth log column, converted with float()

@dataclass
class UserDate:
    """A (user, calendar day) pair; used as the dedup key before counting."""

    user_id: str
    date: datetime.date           # calendar day of the originating event

@dataclass
class DateDAU:
    """Final pipeline output: number of active users on one date."""

    date: datetime.date
    dau: int                      # count of (deduplicated) users on ``date``


def parse_user_event(line: str) -> Iterable[UserEvent]:
    """Parse one tab-separated log line into a ``UserEvent``.

    Columns, in order: user id, ISO-8601 timestamp, action, numeric value.
    Extra columns are ignored. The function is a generator so it can be used
    directly as a mapper. It yields nothing for the header row (first column
    ``userid``) and for blank or whitespace-only lines.

    Raises:
        IndexError: if a data line has fewer than four columns.
        ValueError: if the timestamp or the value column cannot be parsed.
    """
    stripped = line.strip()
    if not stripped:
        # E.g. a trailing empty line at end of file. Previously this produced
        # [''] and crashed with IndexError on row[1].
        return
    row = stripped.split('\t')
    if row[0] == 'userid':
        # Header row.
        return
    yield UserEvent(
        user_id=row[0],
        moment=datetime.datetime.fromisoformat(row[1]),
        action=row[2],
        value=float(row[3]),
    )
        
def filter_user_event(event: UserEvent) -> Iterable[Any]:
    """Pass ``event`` through only if its action is ``'search'``.

    Records lacking an ``action`` attribute are dropped rather than raising.
    """
    action = getattr(event, 'action', None)
    if action != 'search':
        return
    yield event
        
def user_event_to_user_date(event: UserEvent) -> Iterable[UserDate]:
    """Project an event onto its (user id, calendar day) pair."""
    day = event.moment.date()
    yield UserDate(user_id=event.user_id, date=day)
    
def passive_sort_by_key(inp: Iterable[UserDate]) -> Iterable[UserDate]:
    """Emit only the first record of ``inp`` (nothing if ``inp`` is empty).

    Despite the name, no sorting happens here. When used as the reducer for
    the ``['user_id', 'date']`` reduce in ``process``, ``run_reduce`` has
    already sorted and grouped the records. Keeping one record per group
    therefore deduplicates (user, date) pairs.
    """
    missing = object()
    first = next(iter(inp), missing)
    if first is not missing:
        yield first
        
def count_users_by_date(inp: Iterable[UserDate]) -> Iterable[DateDAU]:
    """Count the records in one per-date group and emit a single ``DateDAU``.

    This counts records, not distinct users. It relies on the group holding one
    record per user, i.e. (user_id, date) pairs deduplicated by an earlier
    reduce step, as ``process`` does before calling this reducer.

    Raises:
        ValueError: if the group is empty. ``run_reduce`` never produces empty
            groups, so this signals misuse. It is an explicit exception rather
            than ``assert`` so the check survives ``python -O``.
    """
    count = 0
    date = None
    for ud in inp:
        date = ud.date
        count += 1
    if count == 0:
        raise ValueError("count_users_by_date: expected a non-empty group")

    yield DateDAU(date=date, dau=count)


def process(mrjob: SimpleMapReduce) -> SimpleMapReduce:
    """Attach the search-DAU stages to ``mrjob`` and return it.

    Stages: parse log lines -> keep 'search' events -> project to
    (user_id, date) -> keep one record per (user_id, date) -> count per date.
    """
    return (
        mrjob
        .map(parse_user_event)
        .map(filter_user_event)
        .map(user_event_to_user_date)
        .reduce(passive_sort_by_key, ['user_id', 'date'])
        .reduce(count_users_by_date, ['date'])
    )

## Testing

In [27]:
# Run the search-DAU pipeline over the sample log and print one DateDAU per day.
# process() pulls records from the file, so it runs while the file is open.
# Explicit encoding so decoding does not depend on the platform locale
# (assumes log.tsv is UTF-8 — TODO confirm).
with open("log.tsv", "r", encoding="utf-8") as input_stream:
    mrjob = process(SimpleMapReduce(input_stream))
    for item in mrjob.output():
        print(item)

DateDAU(date=datetime.date(2022, 8, 22), dau=2)
DateDAU(date=datetime.date(2022, 8, 25), dau=1)
DateDAU(date=datetime.date(2022, 9, 5), dau=1)
