# Задание

Задача про джоин

Входные данные:
    1 - кликстрим (= поток фактов), нарезанный файлами, например csv.gz, первая колонка - ID пользователя (тип UUID), country_iso3_code - код страны по iso 3-х буквенный, остальные - не принципиально, но например event_name - строка, event_tms - unixtime 
    2 - дименшен с аттрибутами пользователя в какой-нибудть СУБД или опять же в csv; колонки ID, install_tms  (10+M записей)

На выходе хотим видеть (например в csv) данные в виде:
    ID,country_iso2_code (ISO 3166-1 alpha-2),install_tms,event_name,event_tms
    
Пожелания к решению:
    1 - Хотелось бы чтобы код умел работать эффективно как в случае когда у нас много (= неограничено) памяти, так и в случае если у нас есть верхний лимит N_bytes
    2 - Хотелось бы чтобы можно было распараллелить процесс ↑ на несколько ядер одного процессора (или в пределе - на другие машины)
    3 - базовый стек shell, python, psql, clickhouse; но не возбраняется использовать доп. средства (либы, пакеты, платформы, СУБД, ...) если есть необходимость

In [None]:
import sys
sys.path.append('/code')

import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
from dask.delayed import delayed
from dask.distributed import Client
from os import environ
from task_generate_data import generate_data, DB_NAME, TABLE_NAME, CLCKSTREAM_NAME
from pathlib import Path

In [None]:
DB_HOST = environ["DB_HOST" ]
DASK_SCHEDULER_ADDRESS = environ["DASK_SCHEDULER_ADDRESS"]
DATA_DIR = environ["TASK_DATA_DIR"]
RESULT_DIR = environ["RESULT_DATA_DIR"]

In [None]:
pg_uri = f"postgresql://postgres:postgres@{DB_HOST}:5432/{DB_NAME}"

In [None]:
client = Client("tcp://scheduler:8786")

### Dask дашборд: (http://127.0.0.1:8787)

Выбор dask продиктован необходимостью уметь масштабироваться.

## Подготовка данных

In [None]:
# Эта команда:
#   Создаст базу данных вместе с таблицами
#   Сгенерирует фейковые данные в CSV и загрузит их в базу

generate_data(total_users=1000000,
              clickstream_file_max_lines=1000000,
              events_per_user=10)

## Задача 1

### Запрос для чтения данных из БД

Подразумевается, что данные в таблице не изменяются за период чтения. Если это не так, то необходимо предусмотреть дополнительную логику.

In [None]:
QUERY = f"""
select t.*
from {TABLE_NAME} as t
         left join (
    select user_id, ntile({{pages_total}}) over (order by user_id) as page
    from {TABLE_NAME}
) as p on p.user_id = t.user_id
where p.page = {{page}};
"""

### Источник 1 - БД

Генерируем lazy-load computations для dask. Используем .persist() для избежания повторных чтений.

In [None]:
total_pages = 20

In [None]:
df_dimensions = dd.from_delayed([
    delayed(pd.read_sql_query)(QUERY.format(pages_total=total_pages, page=page+1), pg_uri)
    for page in range(total_pages)
]).persist()

### Источник 2 - CSV

In [None]:
df_clickstream = dd.read_csv(f'{DATA_DIR}/{CLCKSTREAM_NAME}*.csv.gz', 
                             compression='gzip',
                             blocksize=None,
                             header=None,
                             names=['user_id', 'country_iso3_code', 'event_type', 'event_tms']
                            )

### Источник 3 - Коды стран

In [None]:
df_countries = pd.read_csv('https://raw.githubusercontent.com/datasets/country-codes/master/data/country-codes.csv', keep_default_na=False)
df_countries = df_countries[['ISO3166-1-Alpha-3', 'ISO3166-1-Alpha-2']]

### Джойним источники

In [None]:
df = (    
    df_dimensions
    .merge(df_clickstream, how='left', on='user_id')
    .merge(df_countries, how='left', left_on='country_iso3_code', right_on='ISO3166-1-Alpha-3')
    .rename(columns={"ISO3166-1-Alpha-2": "country_iso2_code"})
    
)
df=df[["user_id", "country_iso2_code", "install_tms", "event_type", "event_tms"]]

### Результат

In [None]:
df.to_csv(RESULT_DIR / Path('export-*.csv'))

In [None]:
%%sh
head /result_data/export-00.csv