# Задание 2

In [None]:
import sys
sys.path.append('/code')

import pandas as pd
import dask
import dask.dataframe as dd
from dask.delayed import delayed
from dask.distributed import Client
from os import environ
from task_2_generate_data import generate_data

In [None]:
DB_HOST = environ["DB_HOST" ]
DASK_SCHEDULER_ADDRESS = environ["DASK_SCHEDULER_ADDRESS"]
DATA_DIR = environ["TASK_2_DATA_DIR"]

In [None]:
sql_uri = f"postgresql://postgres:postgres@{DB_HOST}:5432/task_2"

In [None]:
client = Client("tcp://scheduler:8786")

### Dask дашборд: (http://127.0.0.1:8787)

Выбор dask продиктован необходимостью уметь масштабироваться и работать с big-data.
Я не очень хорошо знаком с pandas и dask но кажется что эти инструменты хорошо подходят для демо чтобы не пилить эту задачу вечно.

Также я не знаком с термином "реконсиляция", пришлось гуглить. Надеюсь что основную идею задачи уловил верно.

Для строк и дат реконсиляцию делать не стал, так как работа с датами это то же самое - перевести даты в unix timestamp и работать с числами (секундами, миллисекундами), а сравнение строк это случай когда толеранс - 0. Для pandas код будет выглядеть одинаково.

## Подготовка данных

In [None]:
# Эта команда:
#   Создаст базу данных вместе с таблицами
#   Сгенерирует фейковые данные в CSV и загрузит их в базу
#   Изменит некоторые строки в CSV

generate_data(total_files=10,
              lines_per_file=100000,
              changed_lines_per_files=10)

## Задача 1

### Запрос для чтения данных из БД

Подразумевается, что данные в таблице не изменяются за период чтения. Если это не так, то необходимо предусмотреть дополнительную логику.

In [None]:
QUERY = """
select t.*
from tb_transactions as t
         left join (
    select transaction_id, ntile({pages_total}) over (order by transaction_id) as page
    from tb_transactions
) as p on p.transaction_id = t.transaction_id
where p.page = {page};
"""

### Источник 1 - БД

генерируем lazy-load computations для dask.
Команда .set_index() вызывает полное чтение данных из бд (неоптимально, но для демо сойдёт).
(можно использовать .persist() для избежания повторных чтений)

In [None]:
total_pages = 20

In [None]:
df_source_1 = dd.from_delayed([
    delayed(pd.read_sql_query)(QUERY.format(pages_total=total_pages, page=page+1), sql_uri)
    for page in range(total_pages)
]).persist().set_index('transaction_id')

### Источник 2 - CSV

In [None]:
df_source_2 = dd.read_csv(f'{DATA_DIR}/transactions_*.csv', 
                  header=None, parse_dates=['transaction_date'],
                  names=['user_uid', 'transaction_id', 'transaction_date', 'transaction_type', 'amount']
                         ).set_index('transaction_id')

## Реконсиляция с заданным толеранс

In [None]:
tolerance = 0.1 # set tolerance in %percents%

In [None]:
df = (
    df_source_1
    .merge(df_source_2, how='outer', on='transaction_id')
)

df_difference_idx = df[
    (abs((df['amount_y'] - df['amount_x'])/df['amount_x']) > (tolerance / 100)) 
    | (df['amount_x'].isnull())
    | (df['amount_y'].isnull())
].compute().index

In [None]:
print(f"Найдено {len(df_difference_idx)} отличающихся записей")

In [None]:
df_difference_idx[:3]

## Собираем реконсилированые данные по индексу

In [None]:
df_reconciliated = df_source_1.loc[~df_source_1.index.isin(df_difference_idx)]

In [None]:
print(f"Очищенные данные содержат {len(df_reconciliated)} записей")

In [None]:
df_reconciliated.head()

## Задача 2

In [None]:
df_reconciliated['month'] = df_reconciliated['transaction_date'].dt.to_period('M').dt.to_timestamp()
df_reconciliated['day'] = df_reconciliated['transaction_date'].dt.round('D')

In [None]:
total_per_user = (
    df_reconciliated
    .groupby('user_uid').sum()
)
total_per_user.head()

In [None]:
total_per_user_per_month = (
    df_reconciliated
    .groupby(['user_uid', 'month']).sum()
)
total_per_user_per_month.head()

In [None]:
total_per_user_per_day = (
    df_reconciliated
    .groupby(['user_uid', 'day']).sum()
)
total_per_user_per_day.head()