## ETL layer

### **Description:**

- Create an ETL layer based on DQC

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import sys
sys.path.append('../')
from scripts.etl import ETLTransform, transform_df_types, create_full_train_dataset # etl.py module
from scripts.dqc import DataQualityCheck # for "check_negative_values" function

## 1. Change dtypes for **df_train** columns

### Load all necessary data into dataframes at first

In [3]:
train_df = pd.read_csv('../data/sales_train.csv')
test_df = pd.read_csv('../data/test.csv')
items_df = pd.read_csv('../data/items.csv')
categories_df = pd.read_csv('../data/item_categories.csv')
shops_df = pd.read_csv('../data/shops.csv')

In [4]:
train_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2935849 entries, 0 to 2935848
Data columns (total 6 columns):
 #   Column          Dtype  
---  ------          -----  
 0   date            object 
 1   date_block_num  int64  
 2   shop_id         int64  
 3   item_id         int64  
 4   item_price      float64
 5   item_cnt_day    float64
dtypes: float64(2), int64(3), object(1)
memory usage: 134.4+ MB


As it was mentioned earlier, all numerical data can be safely put into int/float-32 dtypes. Moreover, all **item_cnt_day** values are actually integers as well as **date** feature should be of 'datetime' type

In [5]:
train_df = transform_df_types(train_df, transform_date=True)
train_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2935849 entries, 0 to 2935848
Data columns (total 6 columns):
 #   Column          Dtype         
---  ------          -----         
 0   date            datetime64[ns]
 1   date_block_num  float32       
 2   shop_id         float32       
 3   item_id         float32       
 4   item_price      float32       
 5   item_cnt_day    float32       
dtypes: datetime64[ns](1), float32(5)
memory usage: 78.4 MB


### Do the same for other dataframes' integer columns

In [5]:
test_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 214200 entries, 0 to 214199
Data columns (total 3 columns):
 #   Column   Non-Null Count   Dtype
---  ------   --------------   -----
 0   ID       214200 non-null  int64
 1   shop_id  214200 non-null  int64
 2   item_id  214200 non-null  int64
dtypes: int64(3)
memory usage: 4.9 MB


In [6]:
test_df = transform_df_types(test_df)

In [7]:
import re

def clean_item_name(name):
    name = re.sub(r'[^\w\s]', '', name)
    name = re.sub(r' D$', '', name)
    return name.lower()

# add cleaned item_name column to analyze item_names after merging
items_df['clean_item_name'] = items_df['item_name'].apply(clean_item_name)

items_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 22170 entries, 0 to 22169
Data columns (total 4 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   item_name         22170 non-null  object
 1   item_id           22170 non-null  int64 
 2   item_category_id  22170 non-null  int64 
 3   clean_item_name   22170 non-null  object
dtypes: int64(2), object(2)
memory usage: 692.9+ KB


In [8]:
items_df = transform_df_types(items_df)

In [9]:
categories_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 84 entries, 0 to 83
Data columns (total 2 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   item_category_name  84 non-null     object
 1   item_category_id    84 non-null     int64 
dtypes: int64(1), object(1)
memory usage: 1.4+ KB


In [10]:
categories_df = transform_df_types(categories_df)

In [11]:
shops_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 60 entries, 0 to 59
Data columns (total 2 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   shop_name  60 non-null     object
 1   shop_id    60 non-null     int64 
dtypes: int64(1), object(1)
memory usage: 1.1+ KB


In [12]:
shops_df = transform_df_types(shops_df)

## 2. Delete rows with negative values

As we already now from the DQC Layer, 'item_price' feature has one negative value that should be deleted. 

In [None]:
DataQualityCheck.check_negative_values(train_df, 'item_price')

3.406169731481422e-05 percent of values are negative


In [14]:
train_df = ETLTransform.del_negative(train_df, 'item_price')

### Check for negative values again

In [None]:
DataQualityCheck.check_negative_values(train_df, 'item_price')

No negative values found


In [16]:
train_df.reset_index(drop=True, inplace=True)
train_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2935848 entries, 0 to 2935847
Data columns (total 6 columns):
 #   Column          Dtype         
---  ------          -----         
 0   date            datetime64[ns]
 1   date_block_num  int32         
 2   shop_id         int32         
 3   item_id         int32         
 4   item_price      float32       
 5   item_cnt_day    int32         
dtypes: datetime64[ns](1), float32(1), int32(4)
memory usage: 78.4 MB


## 3. Fix **shops_df** duplicate data 

In [17]:
shops_df = ETLTransform.change_shop_ids(shops_df)
shops_df = ETLTransform.change_shop_names(shops_df)

Now we'll get 3 duplicated shop_ids

In [18]:
shops_df.duplicated().sum()

3

Let's remove them

In [19]:
shops_df.drop_duplicates(inplace=True)
shops_df.reset_index(drop=True, inplace=True)

It's also necessary to unite some **shop_id**s in **train_df** and **test_df** to avoid missing values after merging

In [20]:
train_df = ETLTransform.change_shop_ids(train_df)
test_df = ETLTransform.change_shop_ids(test_df)

## 4. Create a copy of **df_train** to aggregate monthly sales

In [21]:
train_aggregated = train_df.groupby(['date_block_num', 'shop_id', 'item_id']).agg({'item_cnt_day': 'sum', 'item_price': 'mean'}).reset_index()

train_aggregated.rename(columns={'item_cnt_day': 'item_cnt_month'}, inplace=True)
train_aggregated.head()

Unnamed: 0,date_block_num,shop_id,item_id,item_cnt_month,item_price
0,0,2,27,1,2499.0
1,0,2,33,1,499.0
2,0,2,317,1,299.0
3,0,2,438,1,299.0
4,0,2,471,2,399.0


In [22]:
train_aggregated[train_aggregated['item_id'] == 12457]

Unnamed: 0,date_block_num,shop_id,item_id,item_cnt_month,item_price
1917,0,4,12457,1,149.0
7920,0,13,12457,1,69.0
20991,0,25,12457,2,149.0
27233,0,28,12457,1,149.0
30780,0,30,12457,1,149.0
...,...,...,...,...,...
1105260,20,57,12457,1,98.0
1220681,23,28,12457,1,58.0
1223225,23,30,12457,1,58.0
1298101,24,56,12457,1,58.0


## 5. Add **year** and **month** columns to received datasets

In [23]:
train_df = ETLTransform.add_month_year_columns(train_df)
train_aggregated = ETLTransform.add_month_year_columns(train_aggregated)

train_df.head()

Unnamed: 0,date,date_block_num,shop_id,item_id,item_price,item_cnt_day,month,year
0,2013-01-02,0,59,22154,999.0,1,0,0
1,2013-01-03,0,25,2552,899.0,1,0,0
2,2013-01-05,0,25,2552,899.0,-1,0,0
3,2013-01-06,0,25,2554,1709.050049,1,0,0
4,2013-01-15,0,25,2555,1099.0,1,0,0


## 6. Merge **train_df** with **items_df, categories_df, shops_df**

In [24]:
merged_train_df = ETLTransform.merge_df(train_df, items_df, categories_df, shops_df)
merged_train_aggregated_df = ETLTransform.merge_df(train_aggregated, items_df, categories_df, shops_df)

merged_train_df.head()

Unnamed: 0,date,date_block_num,shop_id,item_id,item_price,item_cnt_day,month,year,item_name,item_category_id,clean_item_name,item_category_name,shop_name
0,2013-01-02,0,59,22154,999.0,1,0,0,ЯВЛЕНИЕ 2012 (BD),37,явление 2012 bd,Кино - Blu-Ray,"Ярославль ТЦ ""Альтаир"""
1,2013-01-03,0,25,2552,899.0,1,0,0,DEEP PURPLE The House Of Blue Light LP,58,deep purple the house of blue light lp,Музыка - Винил,"Москва ТРК ""Атриум"""
2,2013-01-05,0,25,2552,899.0,-1,0,0,DEEP PURPLE The House Of Blue Light LP,58,deep purple the house of blue light lp,Музыка - Винил,"Москва ТРК ""Атриум"""
3,2013-01-06,0,25,2554,1709.050049,1,0,0,DEEP PURPLE Who Do You Think We Are LP,58,deep purple who do you think we are lp,Музыка - Винил,"Москва ТРК ""Атриум"""
4,2013-01-15,0,25,2555,1099.0,1,0,0,DEEP PURPLE 30 Very Best Of 2CD (Фирм.),56,deep purple 30 very best of 2cd фирм,Музыка - CD фирменного производства,"Москва ТРК ""Атриум"""


## 7. Look at **merged_train_df** item_name feature

### Print duplicate item_ids

In [25]:
items_df[items_df['clean_item_name'].duplicated()]['item_id'].unique()

array([ 2331,  8248,  8290,  8299,  8423,  8617,  8619,  8623,  8628,
        8630,  8633,  8641,  8643,  8650,  8731,  8759,  8762,  8807,
        9049,  9731,  9842,  9865, 10313, 10373, 10558, 10750, 10815,
       10841, 10999, 11001, 11002, 11119, 11200, 11468, 11481, 11651,
       11787, 11894, 12033, 12099, 12379, 12458, 13012, 14066, 14340,
       14607, 14690, 14788, 14856, 15539, 15611, 16505, 16615, 17203,
       17287, 18127, 18532, 18600, 18719, 18761, 18942, 18946, 18987,
       19070, 19078, 19087, 19856, 20291, 21850, 21870, 22024],
      dtype=int32)

### Look at one of duplicate pairs in merged_train_df

In [26]:
merged_train_aggregated_df[train_aggregated['item_id'] == 12457].head()

Unnamed: 0,date_block_num,shop_id,item_id,item_cnt_month,item_price,month,year,item_name,item_category_id,clean_item_name,item_category_name,shop_name
1917,0,4,12457,1,149.0,0,0,КИН-ДЗА-ДЗА (регион),40,киндзадза регион,Кино - DVD,"Волжский ТЦ ""Волга Молл"""
7920,0,13,12457,1,69.0,0,0,КИН-ДЗА-ДЗА (регион),40,киндзадза регион,Кино - DVD,"Казань ТЦ ""Бехетле"""
20991,0,25,12457,2,149.0,0,0,КИН-ДЗА-ДЗА (регион),40,киндзадза регион,Кино - DVD,"Москва ТРК ""Атриум"""
27233,0,28,12457,1,149.0,0,0,КИН-ДЗА-ДЗА (регион),40,киндзадза регион,Кино - DVD,"Москва ТЦ ""МЕГА Теплый Стан"" II"
30780,0,30,12457,1,149.0,0,0,КИН-ДЗА-ДЗА (регион),40,киндзадза регион,Кино - DVD,"Москва ТЦ ""Перловский"""


In [27]:
merged_train_aggregated_df[train_aggregated['item_id'] == 12458].head()

Unnamed: 0,date_block_num,shop_id,item_id,item_cnt_month,item_price,month,year,item_name,item_category_id,clean_item_name,item_category_name,shop_name
335279,5,46,12458,1,28.0,5,0,КИН-ДЗА-ДЗА! (Регион),40,киндзадза регион,Кино - DVD,"Сергиев Посад ТЦ ""7Я"""
761713,13,28,12458,1,149.0,1,1,КИН-ДЗА-ДЗА! (Регион),40,киндзадза регион,Кино - DVD,"Москва ТЦ ""МЕГА Теплый Стан"" II"
785862,13,57,12458,1,149.0,1,1,КИН-ДЗА-ДЗА! (Регион),40,киндзадза регион,Кино - DVD,"Якутск Орджоникидзе, 56"
804303,14,25,12458,1,149.0,2,1,КИН-ДЗА-ДЗА! (Регион),40,киндзадза регион,Кино - DVD,"Москва ТРК ""Атриум"""
827657,14,51,12458,1,70.0,2,1,КИН-ДЗА-ДЗА! (Регион),40,киндзадза регион,Кино - DVD,"Тюмень ТЦ ""Зеленый Берег"""


In [28]:
first_shops_set = set(merged_train_aggregated_df[train_aggregated['item_id'] == 12457]['shop_id'].unique())
first_prices_set = set(merged_train_aggregated_df[train_aggregated['item_id'] == 12457]['item_price'].unique())

In [29]:
second_shops_set = set(merged_train_aggregated_df[train_aggregated['item_id'] == 12458]['shop_id'].unique())
second_prices_set = set(merged_train_aggregated_df[train_aggregated['item_id'] == 12458]['item_price'].unique())

In [30]:
first_dates_set = set(merged_train_df[merged_train_df['item_id'] == 12457]['date_block_num'].unique())
second_dates_set = set(merged_train_df[merged_train_df['item_id'] == 12458]['date_block_num'].unique())

In [31]:
print("Different shops:", second_shops_set - first_shops_set)
print("Different prices:", second_prices_set - first_prices_set)

Different shops: set()
Different prices: {148.7, 148.5, 70.0}


In [32]:
print("First set of prices:", first_prices_set)
print("Second set of prices:", second_prices_set)

First set of prices: {98.0, 99.0, 69.0, 148.95, 149.0, 58.0, 28.0}
Second set of prices: {70.0, 148.5, 149.0, 148.7, 28.0}


In [33]:
print("First set of dates:", first_dates_set)
print("Second set of dates:", second_dates_set)

First set of dates: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 23, 24, 26}
Second set of dates: {5, 13, 14, 15, 16, 17, 18, 19, 20, 22, 24}


### These two items intersect a lot by the shops, prices and months sets. Due to the same format of duplicate names, the other items are also likely to have similar corresponding feature values, so it's safe to drop duplicates and merge all datasets again

In [34]:
# Create a DataFrame to keep only the first occurrence of each 'item_name'
items_df_unique = items_df.drop_duplicates(subset='clean_item_name', keep='first')

In [35]:
# Merge the original DataFrame with the unique rows to find duplicates
merged_df = items_df.merge(items_df_unique[['clean_item_name', 'item_id']], on='clean_item_name', how='left', suffixes=('', '_to_keep'))

In [36]:
# Filter out the rows where the 'item_id' is not the same as 'item_id_to_keep'
duplicates_df = merged_df[merged_df['item_id'] != merged_df['item_id_to_keep']]
print("Number of duplicates:", len(duplicates_df))
duplicates_df.head(5)

Number of duplicates: 71


Unnamed: 0,item_name,item_id,item_category_id,clean_item_name,item_id_to_keep
2331,"Call of Duty: Ghosts [PS4, русская версия]",2331,20,call of duty ghosts ps4 русская версия,2268
8248,АЛЫЕ ПАРУСА (регион),8248,40,алые паруса регион,8247
8290,АНГЛИЙСКИЙ ВМЕСТЕ С ХРЮШЕЙ И… ч. 1 (регион),8290,40,английский вместе с хрюшей и ч 1 регион,8289
8299,АНДРЕЙ РУБЛЕВ (регион),8299,40,андрей рублев регион,8298
8423,АФОНЯ (регион),8423,40,афоня регион,8422


In [37]:
# Create the dictionary {item_id_that_im_deleting: item_id_to_replace_it_with}
item_id_mapping = dict(zip(duplicates_df['item_id'], duplicates_df['item_id_to_keep']))

### Transform dataframes using the **item_id_mapping**

In [38]:
items_df = items_df.drop_duplicates(subset=['clean_item_name'])
items_df.drop(columns=['clean_item_name'], inplace=True)

train_df['item_id'] = train_df['item_id'].replace(item_id_mapping)
test_df['item_id'] = test_df['item_id'].replace(item_id_mapping)
train_aggregated['item_id'] = train_aggregated['item_id'].replace(item_id_mapping)

items_df['item_id'].nunique()

22099

### Drop duplicates in train_aggregated to consider removed item_ids

In [39]:
train_aggregated = train_aggregated.drop_duplicates(subset=['date_block_num', 'shop_id', 'item_id'])

## 8. Merge **train_df** with **items_df, categories_df, shops_df** again

In [40]:
merged_train_df = ETLTransform.merge_df(train_df, items_df, categories_df, shops_df)
merged_train_aggregated_df = ETLTransform.merge_df(train_aggregated, items_df, categories_df, shops_df)

merged_train_df.head()

Unnamed: 0,date,date_block_num,shop_id,item_id,item_price,item_cnt_day,month,year,item_name,item_category_id,item_category_name,shop_name
0,2013-01-02,0,59,22154,999.0,1,0,0,ЯВЛЕНИЕ 2012 (BD),37,Кино - Blu-Ray,"Ярославль ТЦ ""Альтаир"""
1,2013-01-03,0,25,2552,899.0,1,0,0,DEEP PURPLE The House Of Blue Light LP,58,Музыка - Винил,"Москва ТРК ""Атриум"""
2,2013-01-05,0,25,2552,899.0,-1,0,0,DEEP PURPLE The House Of Blue Light LP,58,Музыка - Винил,"Москва ТРК ""Атриум"""
3,2013-01-06,0,25,2554,1709.050049,1,0,0,DEEP PURPLE Who Do You Think We Are LP,58,Музыка - Винил,"Москва ТРК ""Атриум"""
4,2013-01-15,0,25,2555,1099.0,1,0,0,DEEP PURPLE 30 Very Best Of 2CD (Фирм.),56,Музыка - CD фирменного производства,"Москва ТРК ""Атриум"""


In [41]:
merged_train_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2935848 entries, 0 to 2935847
Data columns (total 12 columns):
 #   Column              Dtype         
---  ------              -----         
 0   date                datetime64[ns]
 1   date_block_num      int32         
 2   shop_id             int32         
 3   item_id             int32         
 4   item_price          float32       
 5   item_cnt_day        int32         
 6   month               int32         
 7   year                int32         
 8   item_name           category      
 9   item_category_id    int32         
 10  item_category_name  category      
 11  shop_name           category      
dtypes: category(3), datetime64[ns](1), float32(1), int32(7)
memory usage: 123.9 MB


In [42]:
merged_train_aggregated_df.head()

Unnamed: 0,date_block_num,shop_id,item_id,item_cnt_month,item_price,month,year,item_name,item_category_id,item_category_name,shop_name
0,0,2,27,1,2499.0,0,0,"007 Legends [PS3, русская версия]",19,Игры - PS3,"Адыгея ТЦ ""Мега"""
1,0,2,33,1,499.0,0,0,1+1 (BD),37,Кино - Blu-Ray,"Адыгея ТЦ ""Мега"""
2,0,2,317,1,299.0,0,0,1С:Аудиокниги. Мединский В. Мифы о России. О р...,45,Книги - Аудиокниги 1С,"Адыгея ТЦ ""Мега"""
3,0,2,438,1,299.0,0,0,1С:Аудиотеатр. Лучшие произведения русских пис...,45,Книги - Аудиокниги 1С,"Адыгея ТЦ ""Мега"""
4,0,2,471,2,399.0,0,0,1С:Бухгалтерия 8 (ред.3.0) как на ладони. Изд ...,49,Книги - Методические материалы 1С,"Адыгея ТЦ ""Мега"""


In [43]:
merged_train_aggregated_df.isna().sum().sum()

0

## 9. Do the same for **test_df**

In [44]:
merged_test_df = ETLTransform.merge_df(test_df, items_df, categories_df, shops_df)
merged_test_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 214200 entries, 0 to 214199
Data columns (total 7 columns):
 #   Column              Non-Null Count   Dtype   
---  ------              --------------   -----   
 0   ID                  214200 non-null  int32   
 1   shop_id             214200 non-null  int32   
 2   item_id             214200 non-null  int32   
 3   item_name           214200 non-null  category
 4   item_category_id    214200 non-null  int32   
 5   item_category_name  214200 non-null  category
 6   shop_name           214200 non-null  category
dtypes: category(3), int32(4)
memory usage: 4.8 MB


## 10. Create a dataframe with **item_cnt_month** value for all (date_block_num, shop_id) pairs

It will be used later in the EDA Layer

In [46]:
full_train_df = create_full_train_dataset(train_df, train_aggregated, items_df, categories_df, shops_df)

In [47]:
full_train_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 13308702 entries, 0 to 13308701
Data columns (total 10 columns):
 #   Column              Dtype   
---  ------              -----   
 0   date_block_num      int16   
 1   shop_id             int16   
 2   item_id             int16   
 3   item_cnt_month      int32   
 4   month               float64 
 5   year                float64 
 6   item_name           category
 7   item_category_id    int32   
 8   item_category_name  category
 9   shop_name           category
dtypes: category(3), float64(2), int16(3), int32(2)
memory usage: 432.2 MB


In [48]:
full_train_df.isna().sum().sum()

0

## 11. Export dataframes to .csv files

In [95]:
merged_test_df.to_csv('../data/merged_test.csv', index=False)
merged_train_df.to_csv('../data/merged_train.csv', index=False)
merged_train_aggregated_df.to_csv('../data/merged_train_aggregated.csv', index=False)
full_train_df.to_csv('../data/full_train.csv', index=False)