In [1]:
import json
from typing import List, Any, Tuple, Dict
from sqlalchemy import create_engine, Engine
import pandas as pd
from pandas import DataFrame
import numpy as np
from typing import Union
from sqlalchemy.exc import PendingRollbackError

## Функции

### Обработчики

In [45]:
def read_json(path: str):
    """Load and return the parsed contents of the JSON file at ``path``.

    The file is read in binary mode and the raw bytes are handed to
    ``json.loads``.
    """
    with open(path, 'rb') as source:
        raw_bytes = source.read()
    return json.loads(raw_bytes)


def get_tuple_list(obj):
    """Flatten a parsed JSON object into a generator of path tuples.

    Each yielded tuple holds the keys leading to a value, followed by the
    value itself:

    * a non-dict ``obj`` is yielded as the one-element tuple ``(obj,)``;
    * for a dict, every key is prepended to the tuples produced by its value;
    * dicts inside a list get the key suffixed with their position
      (``key + str(index)``);
    * any other list element is yielded as ``(key, element)`` with no index,
      so scalar list items under one key share the same path.

    Empty dicts and empty lists produce no tuples.
    """
    if not isinstance(obj, dict):
        yield (obj,)
        return

    for key, value in obj.items():
        if not isinstance(value, list):
            # Plain value or nested dict: recurse and prefix with the key.
            for tail in get_tuple_list(value):
                yield (key, *tail)
            continue

        for position, element in enumerate(value):
            if isinstance(element, dict):
                for tail in get_tuple_list(element):
                    yield (key + str(position), *tail)
            else:
                yield (key, element)
        

def filter_tuples(
    tuple_lst: List[Tuple[Any, ...]],
    check_list: List[str],
    check_substring: str,
    tpl_idx: int,
) -> List[Tuple[Any, ...]]:
    """Keep the tuples that match both filter conditions.

    A tuple is kept when:

    1. at least one of its elements is contained in ``check_list``, and
    2. ``check_substring`` is contained in the element at position
       ``tpl_idx``.

    Tuples that are too short to have an element at ``tpl_idx``, or whose
    element at that position does not support the ``in`` test (for example
    a number), are treated as non-matching instead of raising.

    Args:
        tuple_lst: Tuples to filter.
        check_list: Values of which at least one must appear in a tuple.
        check_substring: Text that must be contained in ``obj[tpl_idx]``.
        tpl_idx: Position of the element checked against ``check_substring``.

    Returns:
        A new list with the matching tuples, in their original order.
    """

    def _has_substring(obj) -> bool:
        # A missing position or a non-container element cannot contain the
        # substring, so the row simply does not match.
        try:
            return check_substring in obj[tpl_idx]
        except (IndexError, TypeError):
            return False

    return [
        obj
        for obj in tuple_lst
        if any(element in check_list for element in obj)
        and _has_substring(obj)
    ]

def clean_id_column(df: DataFrame, columns_strings: Dict[str, Any]) -> None:
    """Turn prefixed identifier columns into integer ids, in place.

    For every ``column -> substring`` pair, each non-null value of the column
    is converted to ``str``, all occurrences of ``substring`` are removed and
    the remainder is parsed with ``int``. Null values become ``np.nan``.

    Args:
        df: Frame whose columns are overwritten.
        columns_strings: Mapping of column name to the substring to strip.
    """

    def _to_id(raw, marker):
        if pd.notnull(raw):
            return int(str(raw).replace(marker, ''))
        return np.nan

    for column_name in columns_strings:
        marker = columns_strings[column_name]
        df[column_name] = df[column_name].apply(_to_id, args=(marker,))
    
def to_datetime(df, columns: List[str]) -> None:
    """Convert the listed columns to datetime, in place.

    Each column is replaced by ``pd.to_datetime(column, unit='s')``.

    Args:
        df: Frame whose columns are overwritten.
        columns: Names of the columns to convert; an empty list is a no-op.
    """
    for name in columns:
        raw_values = df[name]
        df[name] = pd.to_datetime(raw_values, unit='s')
        
def to_list(x):
    """Return ``x`` itself if it has exactly one element, else ``list(x)``.

    Note that a single-element input is returned unchanged (the same object),
    not unwrapped to its element.
    """
    if len(x) != 1:
        return list(x)
    return x


### Функции для формирования датафреймов

In [39]:
def get_df(
        tuple_lst: List[Tuple[Any]],
        initial_columns: List[str],
        index_for_pivot: List[str],
        series_to_columns: str,
        values: str,
        new_columns: List[str],
        columns_and_replace_substring_for_clean_id: Dict[str, Any],
        columns_for_datetime_change:  List[str],
        agg_func: Union[str, callable],
    ) -> DataFrame:
    """Build a pivoted DataFrame from a list of tuples.

    Steps:

    1. load ``tuple_lst`` into a frame with ``initial_columns``;
    2. pivot it (``index_for_pivot`` as index, ``series_to_columns`` spread
       into columns, ``values`` aggregated with ``agg_func``) and reset the
       index;
    3. if ``new_columns`` is non-empty, assign it as the column labels
       (positional, so its length and order must match the pivoted frame);
    4. strip id prefixes via ``clean_id_column`` and convert timestamp
       columns via ``to_datetime``.

    Returns:
        The pivoted and cleaned frame.
    """
    long_frame = pd.DataFrame(data=tuple_lst, columns=initial_columns)

    wide_frame = long_frame.pivot_table(
        index=index_for_pivot,
        columns=series_to_columns,
        values=values,
        aggfunc=agg_func,
    )
    result = wide_frame.reset_index()

    if new_columns:
        result.columns = new_columns

    clean_id_column(result, columns_and_replace_substring_for_clean_id)
    to_datetime(result, columns_for_datetime_change)
    return result

### Подключение к Базе данных

In [4]:
def engine() -> Engine:
    """Create a SQLAlchemy engine for the project database.

    The connection URL is read from the ``FLOCKTORY_DB_URL`` environment
    variable so that credentials are not stored in the notebook. Expected
    format: ``postgresql+psycopg2://<user>:<password>@<host>/<database>``.

    Returns:
        A new ``Engine`` built with ``create_engine``.

    Raises:
        RuntimeError: If ``FLOCKTORY_DB_URL`` is unset or empty.
    """
    import os

    # NOTE(review): the URL that used to be hard-coded here contained a real
    # user name and password; that password should be rotated.
    conn_string = os.environ.get('FLOCKTORY_DB_URL')
    if not conn_string:
        raise RuntimeError(
            'Set the FLOCKTORY_DB_URL environment variable to the database URL, '
            'e.g. postgresql+psycopg2://<user>:<password>@<host>/<database>'
        )
    return create_engine(conn_string)

## Запуск операций

In [41]:
# Parse the train and test JSON files into Python objects.
train_json = read_json(path='train.json')
test_json = read_json(path='test.json')


In [42]:
# Materialise the flattened path tuples so they can be filtered repeatedly.
train_tuple_lst = list(get_tuple_list(train_json))
test_tuple_lst = list(get_tuple_list(test_json))

### Формирование датафреймов и загрузка в БД

#### Визиты

In [None]:
# Select the visit-level tuples and pair each list with its target table.
visits_tuples_train = (filter_tuples(train_tuple_lst, ['site-id', 'first-seen', 'last-seen'], 'visits', 2), 'visits_train')
visits_tuples_test = (filter_tuples(test_tuple_lst, ['site-id', 'first-seen', 'last-seen'], 'visits', 2), 'visits_test')

# One engine for the whole cell instead of a new one per table.
db_engine = engine()

for visits_tuple, table_name in [visits_tuples_train, visits_tuples_test]:
    # Build the frame before touching the database so no transaction is held
    # open during the pivot.
    visits_df = get_df(
        tuple_lst=visits_tuple,
        initial_columns=['user_id', 'category', 'visit_id', 'attribute', 'value'],
        index_for_pivot=['user_id', 'visit_id'],
        series_to_columns='attribute',
        values='value',
        new_columns=['user_id', 'visit_id', 'first_seen', 'last_seen', 'site_id'],
        columns_and_replace_substring_for_clean_id={
            'user_id': 'user_',
            'visit_id': 'visits',
        },
        columns_for_datetime_change=['last_seen', 'first_seen'],
        agg_func='first',
    )
    try:
        # Engine.begin() commits when the block succeeds and rolls back when
        # it raises, so no manual rollback is needed.
        with db_engine.begin() as connection:
            visits_df.to_sql(con=connection, name=table_name, schema='public', index=False, if_exists='replace')
    except PendingRollbackError as error:
        # Report the actual error and continue with the next table.
        print(error)


### Посещаемые объекты

In [None]:
# Select the tuples listing what was viewed in each session and pair each
# list with its target table.
features = ['visited-items', 'visited-general-categories', 'visited-universal-brands']
visited_items_train = (filter_tuples(train_tuple_lst, features, 'visits', 3), 'visited_items_train')
visited_items_test = (filter_tuples(test_tuple_lst, features, 'visits', 3), 'visited_items_test')

# One engine for the whole cell instead of a new one per table.
db_engine = engine()

for items_tuple, table_name in [visited_items_train, visited_items_test]:
    # Several values can share one (session, attribute) pair, so they are
    # collected into lists (agg_func=list).
    items_df = get_df(
        tuple_lst=items_tuple,
        initial_columns=['user_id', 'category', 'visit_id', 'session_id', 'attribute', 'value'],
        index_for_pivot=['user_id', 'visit_id', 'session_id'],
        series_to_columns='attribute',
        values='value',
        new_columns=[
            'user_id',
            'visit_id',
            'session_id',
            'visited_category_id',
            'visited_item_id',
            'visited_brand_id',
        ],
        columns_and_replace_substring_for_clean_id={
            'user_id': 'user_',
            'visit_id': 'visits',
            'session_id': 'visits',
        },
        columns_for_datetime_change=[],
        agg_func=list,
    )
    try:
        # Engine.begin() commits when the block succeeds and rolls back when
        # it raises; the write uses this connection rather than a second engine.
        with db_engine.begin() as connection:
            items_df.to_sql(con=connection, name=table_name, schema='public', index=False, if_exists='replace')
    except PendingRollbackError as error:
        # Report the actual error and continue with the next table.
        print(error)

### Заказы

#### Сайты заказов

In [None]:
# Select the site id of every order and pair each list with its target table.
features = ['site-id']
orders_sites_train = (filter_tuples(train_tuple_lst, features, 'orders', 2), 'orders_sites_train')
orders_sites_test = (filter_tuples(test_tuple_lst, features, 'orders', 2), 'orders_sites_test')

# One engine for the whole cell instead of a new one per table.
db_engine = engine()

for orders_tuple, table_name in [orders_sites_train, orders_sites_test]:
    orders_df = get_df(
        tuple_lst=orders_tuple,
        initial_columns=['user_id', 'category', 'order_id', 'attribute', 'value'],
        index_for_pivot=['user_id', 'order_id'],
        series_to_columns='attribute',
        values='value',
        new_columns=[
            'user_id',
            'order_id',
            # NOTE(review): the visits tables name this column 'site_id';
            # the hyphenated name is kept here because existing queries may
            # rely on it. Confirm before renaming.
            'site-id',
        ],
        columns_and_replace_substring_for_clean_id={
            'user_id': 'user_',
            'order_id': 'orders',
        },
        columns_for_datetime_change=[],
        agg_func='first',
    )
    try:
        # Engine.begin() commits when the block succeeds and rolls back when
        # it raises; the write uses this connection rather than a second engine.
        with db_engine.begin() as connection:
            orders_df.to_sql(con=connection, name=table_name, schema='public', index=False, if_exists='replace')
    except PendingRollbackError as error:
        # Report the actual error and continue with the next table.
        print(error)

#### Даты заказов

In [None]:
# Select the creation timestamp of every sub-order and pair each list with
# its target table.
features = ['created-at']
orders_created_train = (filter_tuples(train_tuple_lst, features, 'orders', 3), 'orders_created_train')
orders_created_test = (filter_tuples(test_tuple_lst, features, 'orders', 3), 'orders_created_test')

# One engine for the whole cell instead of a new one per table.
db_engine = engine()

for items_tuple, table_name in [orders_created_train, orders_created_test]:
    orders_created_df = get_df(
        tuple_lst=items_tuple,
        initial_columns=['user_id', 'category', 'order_id', 'sub_order_id', 'attribute', 'value'],
        index_for_pivot=['user_id', 'order_id', 'sub_order_id'],
        series_to_columns='attribute',
        values='value',
        new_columns=[
            'user_id',
            'order_id',
            'sub_order_id',
            'created_at',
        ],
        columns_and_replace_substring_for_clean_id={
            'user_id': 'user_',
            'order_id': 'orders',
            'sub_order_id': 'orders',
        },
        columns_for_datetime_change=['created_at'],
        agg_func='first',
    )
    try:
        # Engine.begin() commits when the block succeeds and rolls back when
        # it raises; the write uses this connection rather than a second engine.
        with db_engine.begin() as connection:
            orders_created_df.to_sql(con=connection, name=table_name, schema='public', index=False, if_exists='replace')
    except PendingRollbackError as error:
        # Report the actual error and continue with the next table.
        print(error)

#### Объекты заказов

In [40]:
# Select the per-item attributes of every order. Train and test go to
# separate tables: previously both were written to 'orders_items' with
# if_exists='replace', so the second write erased the first.
features = ['id', 'count', 'general-category-path', 'brand-id']
orders_items_train = (filter_tuples(train_tuple_lst, features, 'orders', 3), 'orders_items_train')
orders_items_test = (filter_tuples(test_tuple_lst, features, 'orders', 3), 'orders_items_test')

# One engine for the whole cell instead of a new one per table.
db_engine = engine()

for items_tuple, table_name in [orders_items_test, orders_items_train]:
    items_df = get_df(
        tuple_lst=items_tuple,
        initial_columns=['user_id', 'category', 'order_id', 'sub_order_id', 'items', 'attribute', 'value'],
        index_for_pivot=['user_id', 'order_id', 'sub_order_id', 'items'],
        series_to_columns='attribute',
        values='value',
        columns_and_replace_substring_for_clean_id={
            'user_id': 'user_',
            'order_id': 'orders',
            'items': 'items',
            'sub_order_id': 'orders',
        },
        # Empty list: keep the column labels produced by the pivot.
        new_columns=[],
        columns_for_datetime_change=[],
        agg_func=to_list,
    )
    # Drops the first column, which is 'user_id' after the index reset.
    # NOTE(review): the reason for dropping it is not visible here - confirm.
    items_df = items_df.iloc[:, 1:]
    try:
        # Engine.begin() commits when the block succeeds and rolls back when
        # it raises; the write uses this connection rather than a second engine.
        with db_engine.begin() as connection:
            items_df.to_sql(con=connection, name=table_name, schema='public', index=False, if_exists='replace')
    except PendingRollbackError as error:
        # Report the actual error and continue with the next table.
        print(error)

### Сессии

In [10]:
# Select the per-session attributes for train and test.
features = ['visited-at', 'session-duration', 'pages-count']
sessions_tuples_train = filter_tuples(train_tuple_lst, features, 'visits', 3)
sessions_tuples_test = filter_tuples(test_tuple_lst, features, 'visits', 3)


def _session_split_index(tuples, key_len=4):
    """Return an index near the middle that does not cut a session in two.

    The first ``key_len`` elements of a tuple (user, category, visit,
    session) identify the session. Starting from the midpoint, the index is
    moved forward while the tuples on both sides of it share that key.
    """
    middle = len(tuples) // 2
    while 0 < middle < len(tuples) and tuples[middle][:key_len] == tuples[middle - 1][:key_len]:
        middle += 1
    return middle


# One engine for the whole cell instead of a new one per write.
db_engine = engine()

# Each data set is written to its own table in two chunks: the first chunk
# replaces the table, the second is appended. Previously the train chunks
# were written to 'sessions_test', overwriting and mixing the test data.
for sessions_tuple, table_name in [
    (sessions_tuples_test, 'sessions_test'),
    (sessions_tuples_train, 'sessions_train'),
]:
    split_at = _session_split_index(sessions_tuple)
    write_mode = 'replace'
    for chunk in (sessions_tuple[:split_at], sessions_tuple[split_at:]):
        if not chunk:
            # Nothing to pivot or write for an empty chunk.
            continue
        session_df = get_df(
            tuple_lst=chunk,
            initial_columns=['user_id', 'category', 'visit_id', 'session_id', 'attribute', 'value'],
            index_for_pivot=['user_id', 'visit_id', 'session_id'],
            series_to_columns='attribute',
            values='value',
            new_columns=[
                'user_id',
                'visit_id',
                'session_id',
                'pages_count',
                'session_duration',
                'visited_at',
            ],
            columns_and_replace_substring_for_clean_id={
                'user_id': 'user_',
                'visit_id': 'visits',
                'session_id': 'visits',
            },
            columns_for_datetime_change=['visited_at'],
            agg_func='first',
        )
        try:
            # Engine.begin() commits when the block succeeds and rolls back
            # when it raises, so no manual rollback is needed.
            with db_engine.begin() as connection:
                session_df.to_sql(con=connection, name=table_name, schema='public', index=False, if_exists=write_mode)
        except PendingRollbackError as error:
            print(error)
            # Do not append the remaining chunk to a table whose first write
            # failed; move on to the next table.
            break
        write_mode = 'append'