# Описание задания

1. Разработать python-скрипт для загрузки, маппинга и фильтрации данных из источника в dds, согласно постановке от аналитика:\
    a. в процессе загрузки исправить все ошибки в данных, обнаруженные аналитиком;\
    б. загрузку выполнять полной перезаписью данных в слое dds - удалить всё что есть и загрузить новое;\
    в. обработку данных можно выполнять при помощи специализированной библиотеки (например, pandas), SQL-скриптов, либо обрабатывать чистым питоном;\
    г. желательно минимизировать время недоступности слоя dds (время с момента удаления старых данных и до окончания записи новых).

2. Разработать DAG для запуска скрипта загрузки:\
    а. это должен быть отдельный скрипт, который считывает конфигурации и параметры подключения к БД и описывает задачу (оператор) для запуска скрипта загрузки с нужными параметрами.

3. Организовать git-репозиторий:\
    а. создать, предоставить необходимые права аналитикам и менторам, загрузить весь код по заданиям 2 и 3\
    б. структура репозитория должна отражать архитектуру системы.

# 1. Импорт библиотек

In [1]:
import numpy as np
import pandas as pd
import scipy as sp
from sqlalchemy import create_engine, text
import yaml

import warnings
warnings.filterwarnings(action='ignore')

In [2]:
def get_yaml(path: str) -> dict:
    with open(path) as f:
        params = yaml.load(f, Loader=yaml.FullLoader)
    
    return params

In [3]:
params = get_yaml('./src/params.yaml')['connection']['internship_sources']
engine = create_engine(f"postgresql://{params['user']}:{params['password']}@{params['host']}:{5432}/{params['database']}")
conn = engine.connect()

# 2. Парсинг таблиц

## 2.1. category

In [22]:
df_cat = pd.read_sql('SELECT * FROM sources.category', conn, index_col='category_id')
df_brand = pd.read_sql('SELECT * FROM sources.brand', conn, index_col='brand_id')

init_shape = df_cat.shape[0]

# Дубликаты
df_cat.drop_duplicates(ignore_index=False, keep='first', inplace=True)

# ?!
df_cat.drop(axis=1, index=df_cat[df_cat['category_name'].str.contains(r'([?!<>])') == True].index, inplace=True)

# brand (Eng)
# df_cat = df_cat.drop(axis=1, index = df_cat[df_cat['category_name'].str.match(r'.*[а-яА-я].') == False].index)

# brand (All)
df_cat['cat_lower'] = df_cat['category_name'].str.lower()
df_brand['name_lower'] = df_brand['brand'].str.lower()
duplicates_ids = pd.Index(df_cat.reset_index().set_index('cat_lower').join(df_brand.set_index('name_lower'), how="inner")['category_id'].values)
df_cat = df_cat.drop('cat_lower', axis=1)
df_brand = df_brand.drop('name_lower', axis=1)
df_cat.drop(axis=1, index=duplicates_ids, inplace=True)

# НЕ ИСПОЛЬЗУЕМ
df_cat.drop(axis=1, index=df_cat[df_cat['category_name'] == 'НЕ ИСПОЛЬЗУЕМ'].index, inplace=True)

# Дубликат + цифра (работа вручную)
# display(df_cat[df_cat['category_name'].str.contains(r'([0-9])') == True].sort_values('category_name'))

print(f'Из таблицы category удалено {init_shape - df_cat.shape[0]} строк, {df_cat.shape[0] / init_shape:.2f}%')

Из таблицы category удалено 97 строк, 0.87%


## 2.2. brand 💻

In [202]:
df_brand = pd.read_sql('SELECT * FROM sources.brand', conn, index_col='brand_id')

init_shape = df_brand.shape[0]
# Дубликаты
df_brand.drop_duplicates(ignore_index=False, keep='first', inplace=True)

# TEVA - 1323 (не менять)
# 1323 - FARBITEX(?)

# 2989 - Нет (удалить)
df_brand.drop(axis=0, index='2989', inplace=True)

print(f'Из таблицы brand удалено {init_shape - df_brand.shape[0]} строк, {df_brand.shape[0] / init_shape:.2f}%')

Из таблицы brand удалено 20 строк, 0.99%


## 2.3. product ✅

In [155]:
df_product = pd.read_sql('SELECT * FROM sources.product', conn, index_col='product_id')

init_shape = df_product.shape[0]

# Дубликаты
df_product.drop_duplicates(ignore_index=False, keep='first', inplace=True)

# Empty pricing_line(empty char)
df_product['pricing_line_id'] = df_product.index.to_series()

# Incorrect name_short
display(df_product[df_product['name_short'].str.match(r'([a-zа-я0-9])')].sort_values(by='name_short').tail())

# NO BRAND
#

# Вопрос про diff product_id, но одинаковые name_short
# df_product[df_product['category_id'] == 'O10']

print(f'Из таблицы product удалено {init_shape - df_product.shape[0]} строк, {df_product.shape[0] / init_shape:.2f}%')

Unnamed: 0_level_0,name_short,category_id,pricing_line_id,brand_id
product_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
24597874,987983745,O02,24597874,1000
24597895,лвоымьыфжабыв,O00,24597895,1000
24597873,не завели,О00,24597873,1001
24597875,некорректное название,O02,24597875,1000
26159416,панель стеновая Лесная Сказка,K14,26159416,1000


Из таблицы product удалено 263 строк, 0.99%


## 2.4. transaction

In [204]:
df_trans = pd.read_sql('SELECT * FROM sources.transaction', conn, index_col='transaction_id')

df_trans

Unnamed: 0_level_0,product_id,recorded_on,quantity,price,price_full,order_type_id
transaction_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
X302021062367268,26191485,23.06.2021 16:41,1,256,270,BUY
X2720210612469927,26191478,12.06.2021 11:01,1,256,270,BUY
X2820210610618712,26191461,10.06.2021 13:15,1,256,270,BUY
X2820210610618712,26191515,10.06.2021 13:15,1,256,270,BUY
X2820210618616123,26191515,18.06.2021 9:57,1,256,270,BUY
...,...,...,...,...,...,...
X2820210625618406,23541788,25.06.2021 20:28,1,498,498,BUY
X2820210605611441,24848077,05.06.2021 9:24,1,256,261,BUY
X2820210627623823,26071220,27.06.2021 15:18,1,232,237,BUY
X2820210619616560,26114378,19.06.2021 12:57,1,29,30,BUY


In [212]:
df_trans[df_trans.duplicated() == True]

Unnamed: 0_level_0,product_id,recorded_on,quantity,price,price_full,order_type_id
transaction_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
X3820210615166745,23361591,15.06.2021 17:33,1,8,8,BUY
X2720210612470152,24202978,12.06.2021 15:59,2,39,43,BUY
X2720210605468666,23361591,05.06.2021 13:23,1,8,8,BUY
X3820210609141018,23361591,09.06.2021 13:25,1,8,8,BUY
X3820210605140387,23361591,05.06.2021 10:33,1,8,8,BUY
X2820210606617910,26121024,06.06.2021 15:25,1,15,16,BUY
X3020210619195443,23957879,19.06.2021 20:46,1,139,139,BUY
X2820210609618553,23361591,09.06.2021 16:53,1,8,8,BUY
X2820210619616650,23361591,19.06.2021 15:44,1,8,8,BUY
X2720210614470866,23361591,14.06.2021 11:57,1,8,8,BUY


## 2.5. stock