## Data ingestion and preparation pipeline
###  Data Engineer: Miguel Avila
###  Reviewers: Luis Hermenegildo, Edgar Correa, Diego JE
###  Proyecto Final del BootCamp Wizeline MLOPS2
###  Equipo: mlops-equipo5, Segunda Edición, 2024


#### Install the Kaggle library (used to download the latest version of the dataset)

In [None]:
#from https://www.kaggle.com/discussions/general/74235
!pip install kaggle


#### Manual upload of the Kaggle API key file (kaggle.json)

In [None]:
# Upload the Kaggle API key file (kaggle.json) from the local machine through
# the Colab file picker; the extract cell expects it at /content/kaggle.json.
import google.colab.files as files
files.upload()

#### Variable definitions and formats

In [None]:
# Pipeline configuration shared by the extract, staging and production cells.

# Folder and file names for each pipeline stage: raw source, staging, production.
src_folder, src_name = 'src_online_retail', 'online_retail_raw.csv'
stg_folder, stg_name = 'stg_online_retail', 'online_retail_stg.csv'
stg_rejected_name = 'online_retail_stg_rejected.csv'
prod_folder = 'prod_online_retail'

# Model data-set file names, each paired with its intended share of the rows.
model_train_data, model_train_pct = 'online_retail_train.csv', 0.6
model_validation_data, model_validation_pct = 'online_retail_validation.csv', 0.2
model_test_data, model_test_pct = 'online_retail_test.csv', 0.2

# strptime-style format used to parse the raw InvoiceDate column.
date_format = '%m/%d/%Y %H:%M'

#### Connect to Kaggle using the uploaded key file and download the data file

In [None]:
#EXTRACT DATA PHASE
#connect to KAGGLE api and get raw data

! mkdir ~/.kaggle
! cp /content/kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json
#! kaggle datasets list
! kaggle datasets download -d vijayuv/onlineretail
! mkdir {src_folder}
! mkdir {stg_folder}
! mkdir {prod_folder}
! unzip onlineretail.zip -d {src_folder}/
! mv {src_folder}/OnlineRetail.csv {src_folder}/{src_name}

#### Import needed libraries for data transformation

In [None]:
import pandas as pd
import numpy as np
import datetime as dt



### exploratory functions
# onlineRetailDF.shape
# onlineRetailDF.head()
# onlineRetailDF.info()
# onlineRetailDF.describe()
# onlineRetailDF.isnull().sum()
# onlineRetailDF.duplicated().sum()
# onlineRetailDF[onlineRetailDF.duplicated()]
# onlineRetailDF.Country.value_counts()

#### Data cleansing step: remove duplicates, nulls, negative quantities and other invalid values

In [None]:
%%time
from ast import Delete
#RAW TO STG

raw_online_retail_df = pd.read_csv(f'{src_folder}/{src_name}', encoding='ISO-8859-1')

l_null_count = raw_online_retail_df.isnull().sum().sum()
df_rejected_rows = pd.DataFrame()

print('Starting Raw Data Processing ....')
print('Number of nulls values in the data set :' + str(l_null_count))


#remove nulls
if l_null_count > 0:
  print('Starting null removal process ...')
  online_detail_null_df = pd.DataFrame()
  for col in raw_online_retail_df.columns:
      null_col_df = raw_online_retail_df[raw_online_retail_df[col].isnull()]
      if null_col_df.shape[0] > 0:
        online_detail_null_df = pd.concat([online_detail_null_df, null_col_df.assign(Rejection_reason = ' NULL Column value : ' + col )])
  df_rejected_rows = online_detail_null_df
  raw_online_retail_df.dropna(inplace=True)
  print('Null removal process completed.')


#remove negative quantities
print('Starting negative quantities clean up ...')
l_negative_count =  (raw_online_retail_df['Quantity'] < 0).sum()

if l_negative_count > 0:
  df_negative_rows = raw_online_retail_df[raw_online_retail_df['Quantity'] < 0]
  df_rejected_rows = pd.concat([df_rejected_rows, df_negative_rows.assign(Rejection_reason = 'Negative Quantity')])
  raw_online_retail_df.drop(raw_online_retail_df[raw_online_retail_df['Quantity'] < 0].index, inplace=True)

print('Negative quantities clean up completed.')

#remove invalid prices
print('Starting invalid prices clean up ...')

l_invalid_price_count =  (raw_online_retail_df['UnitPrice'] < 0.01).sum()

if l_invalid_price_count > 0:
  df_invalid_price_rows = raw_online_retail_df[raw_online_retail_df['UnitPrice'] < 0.01]
  df_rejected_rows = pd.concat([df_rejected_rows, df_invalid_price_rows.assign(Rejection_reason = 'Invalid Price')])
  raw_online_retail_df.drop(raw_online_retail_df[raw_online_retail_df['UnitPrice'] < 0.01].index, inplace=True)

print('Invalid Prices clean up completed.')

#remove invalid country values
print('Starting Unespecified Country clean up ...')

l_unspecified_country_count =  (raw_online_retail_df['Country'] == 'Unspecified').sum()

if l_unspecified_country_count > 0:
  df_unspecified_country_rows = raw_online_retail_df[raw_online_retail_df['Country'] == 'Unspecified']
  df_rejected_rows = pd.concat([df_rejected_rows, df_unspecified_country_rows.assign(Rejection_reason = 'Unspecified')])
  raw_online_retail_df.drop(raw_online_retail_df[raw_online_retail_df['Country'] == 'Unspecified'].index, inplace=True)

print('Invalid Countries clean up completed.')
print('End Raw Data Processing ....')

pd.DataFrame.to_csv(df_rejected_rows, f'{stg_folder}/{stg_rejected_name}',index = False)
pd.DataFrame.to_csv(raw_online_retail_df, f'{stg_folder}/{stg_name}',index = False)

del df_rejected_rows
del raw_online_retail_df

#### Standardize and normalize values, format dates, and create new columns for easier calculations

In [None]:
%%time

#STG TO PROD

df_online_retail_stg = pd.read_csv(f'{stg_folder}/{stg_name}', encoding='ISO-8859-1')

#Standarize Country Values, EIRE = Ireland, RSA = South Africa
df_online_retail_stg.replace(to_replace='EIRE', value='Ireland', inplace=True)
df_online_retail_stg.replace(to_replace='RSA', value='South Africa', inplace=True)

#Create new columns for date calculations
df_date_set = pd.DataFrame(pd.to_datetime(df_online_retail_stg['InvoiceDate'],format = date_format))
df_date_set.rename(columns={'InvoiceDate':'Invoice_Date'},inplace= True)
df_date_set['Invoice_Year'] = list(x.year for x in df_date_set['Invoice_Date'])
df_date_set['Invoice_Month'] = list(x.month for x in df_date_set['Invoice_Date'])
df_date_set['Invoice_Day'] = list(x.day for x in df_date_set['Invoice_Date'])
df_date_set['Invoice_Hour'] = list(x.hour for x in df_date_set['Invoice_Date'])
df_date_set['Invoice_Minute'] = list(x.minute for x in df_date_set['Invoice_Date'])

#Standarize Descriptions
df_online_retail_stg['Description'] = df_online_retail_stg['Description'].str.strip()
df_online_retail_stg['Description'] = df_online_retail_stg['Description'].str.lower()

#Remove duplicated column
df_online_retail_stg = pd.concat([df_online_retail_stg, df_date_set], axis=1)
df_online_retail_stg.drop(['InvoiceDate'], axis= 1, inplace=True)


####################
# Code to split processed file into training, validation and tests data sets

####################
#stg_df_len = len(df_online_retail_stg)
#model_train_data_df, model_validation_data_df, model_test_data_df = np.split(df_online_retail_stg, [int(model_train_pct * stg_df_len), int((model_validation_pct + model_train_pct) * stg_df_len)])

#pd.DataFrame.to_csv(model_train_data_df, f'{prod_folder}/{model_train_data}',index = False)
#pd.DataFrame.to_csv(model_validation_data_df, f'{prod_folder}/{model_validation_data}',index = False)
#pd.DataFrame.to_csv(model_test_data_df, f'{prod_folder}/{model_test_data}',index = False)

#del df_online_retail_stg, model_train_data_df, model_validation_data_df, model_test_data_df

#Generate single processed file
pd.DataFrame.to_csv(df_online_retail_stg, f'{prod_folder}/online_retail_prod.csv',index = False)




CPU times: user 9.36 s, sys: 208 ms, total: 9.57 s
Wall time: 9.77 s
