In [None]:
#*************************************************************************************************#
# Elisalvo Ribeiro, e-mail:elisalvo.ribeiro@gmail.com, cel: +55 11 95937-6412 | +55 79 98861-8857 #
#*************************************************************************************************#
from IPython import get_ipython
# Clear every user-defined name from the kernel before the run (comparable to
# R's rm(list=ls())): -s is a soft reset, -f skips the confirmation prompt.
# run_line_magic() is the supported replacement for the deprecated .magic().
get_ipython().run_line_magic('reset', '-sf')
import gc
import time
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
from dask.multiprocessing import get
#from dask.diagnostic import ProgressBar
from multiprocessing import cpu_count

In [None]:
########################################################################
### =====  Import large CSV files (30 million rows) with Dask  ===== ###
########################################################################
# **   These files contain cost and revenue data from five shops    ** #
pathCost = 'C:/GitHub/sho/costShop.csv'
pathCostCod = 'C:/GitHub/sho/costCodShop.csv'
pathRevenue = 'C:/GitHub/sho/revenueShop.csv'
pathRevenueCod = 'C:/GitHub/sho/revenueCodShop.csv'
pathShop = 'C:/GitHub/sho/shop.csv'

start = time.time()

# Cost records and the lookup table that describes each 'Eaccount' code.
cost = dd.read_csv(pathCost, decimal = ',', sep = ';',
                   dtype = {'Eaccount':str, 'Jc':str, 'Value':float, 'date':str, 'Manager':str, 'shop':str})
costCod = dd.read_csv(pathCostCod, decimal = ',', sep = ';',
                      dtype = {'Eaccount':str, 'FirstO':str, 'SecondO':str, 'descr':str})

# Revenue records; 'Id' is renamed so it shares the 'IdService' key with the lookup table below.
revenue = dd.read_csv(pathRevenue, decimal = ',', sep = ';',
                      dtype = {'YearMonth':str, 'CodGroup':object, 'CodManager': object, 'CodShop':str,
                               'CodType':object, 'TpPeople':object, 'Id':float, 'Balance':float, 'Revenue':float})
revenue = revenue.rename(columns = {'Id':'IdService'})

# The revenue lookup file has no header row, so the column names are supplied here.
revenueCod = dd.read_csv(pathRevenueCod, decimal = ',', sep = ';',
                         encoding = 'Latin1', header = None, names = ['IdType', 'Type', 'IdService', 'Service'],
                         dtype = {'IdType':object, 'Type':str, 'IdService':float, 'Service':str})

# NOTE(review): pathShop ends in '.csv' but is read with read_excel -- confirm the real file format.
shop = pd.read_excel(pathShop, sheet_name = 'Sheet1')

finish = time.time()
# A bare `finish - start` in the middle of a cell is silently discarded, so print the timing explicitly.
print('Load step: {:.2f} s'.format(finish - start))
gc.collect()
# revenue.info(memory_usage='deep'), revenue.head(2), revenue.dtypes, type(revenue), revenue.npartitions

In [None]:
# ** Join the files ** #
# Left-join each lookup table onto its fact table so every cost/revenue row keeps its descriptive columns.
start = time.time()
shop_revenue = revenue.merge(revenueCod.reset_index(), on = 'IdService', how = 'left')
shop_cost = cost.merge(costCod.reset_index(), on = 'Eaccount', how = 'left')
# Take a fresh timestamp: reusing the `finish` left over from the load cell gives a negative, meaningless duration.
finish = time.time()
print('Join step: {:.2f} s'.format(finish - start))
# The inputs are no longer needed once the joined frames exist.
del revenue, revenueCod, cost, costCod
gc.collect()

In [None]:
# ** First cleaning data ** #
# First, exclude all data before 2018 ('date' is compared as a string against '201712').
# The 'FirstO' column holds the cost category needed for the analysis, so it is pivoted into
# one column per category; categories with many missing values are then dropped.
dtCost = shop_cost[shop_cost['date']>'201712'].groupby(['shop', 'date', 'FirstO'])['Value'].sum().reset_index()
dtCost = dtCost.categorize(columns = ['FirstO']) # dask pivot_table needs a categorical column with known categories
dtCost['shopDate'] = dtCost['shop'] +'_'+ dtCost['date'] #Create new 'id'
dtCost = dtCost.pivot_table(index = 'shopDate', columns = 'FirstO', values = 'Value', aggfunc = 'sum')
dtCost = dtCost.rename(columns = {c: c.replace(" ", '') for c in dtCost.columns}) #remove blanks
dtCost.columns = dtCost.columns.str.replace("/",'')
dtCost = dtCost.drop(['ShopOther', 'FreeShop', 'RevenueOther'], axis = 1, errors = 'ignore') #remove variable that contains many missing
# Move 'shopDate' from the index to column 0 (as the revenue cell does) so that the positional
# slice below starts at the first cost category instead of skipping it.
dtCost = dtCost.reset_index()
# NOTE(review): 1:18 hard-codes the number of cost categories -- confirm against the actual columns.
dtCost['TotalCost'] = dtCost.iloc[:, 1:18].sum(axis = 1)
del shop_cost
gc.collect()

In [None]:
# ** Second cleaning data ** #
# Aggregate revenue per shop, month and service type, then pivot the types into columns.
dtRev = shop_revenue
dtRev.columns = dtRev.columns.str.replace(" ", "_")
dtRev = dtRev.groupby(['CodShop', 'YearMonth', 'Type'])['Revenue'].sum().reset_index()
dtRev = dtRev.categorize(columns = ['Type']) # dask pivot_table needs a categorical column with known categories
# After the groupby only CodShop, YearMonth, Type and Revenue exist, so the id is built from
# 'YearMonth' (there is no 'date' column here) and the pivot uses 'Revenue' (not 'Balance').
dtRev['shopDate'] = dtRev['CodShop'] +'_'+ dtRev['YearMonth'] #Create new 'id'
dtRev = dtRev.pivot_table(index = 'shopDate', columns = 'Type', values = 'Revenue')
dtRev = dtRev.rename(columns = {c: c.replace(" ", "") for c in dtRev.columns}) #remove blanks
dtRev = dtRev.reset_index() # 'shopDate' becomes column 0
# NOTE(review): 1:7 hard-codes six revenue-type columns -- confirm against the actual columns.
dtRev['TotalRev'] = dtRev.iloc[:, 1:7].sum(axis = 1)
del shop_revenue
gc.collect()

In [None]:
# ** Join dtCost and dtRev dataset ** #
# Left join: every cost row is kept and revenue columns are attached where 'shopDate' matches.
rev_with_index = dtRev.reset_index()
dt = dtCost.merge(rev_with_index, how = 'left', on = 'shopDate')
# Row counts of both inputs, for comparison.
# Note: 'dtRev' should have the same number of rows as 'dtCost', but it does not.
print(len(dtCost.index), len(dtRev.index))
del rev_with_index, dtCost, dtRev
gc.collect()

In [None]:
# ** Calculate a productivity index ** #
# PI is the ratio of total cost to total revenue for each row.
# According to the author, the cleaning steps reduce the data to about 200.000 rows,
# so the result is materialised as a pandas dataframe at this point.
dt2 = dt.assign(PI = dt['TotalCost'] / dt['TotalRev']).compute()
del dt
gc.collect()

In [None]:
# ** Third cleaning data ** #
# Keep only rows whose PI is defined and lies in the interval [0, 1.5).
pi_in_range = dt2['PI'].notnull() & (dt2['PI'] >= 0) & (dt2['PI'] < 1.5)
dt2 = dt2[pi_in_range]
# Label each row 'Adequado' when PI < 0.56 and 'Inadequado' otherwise (equal to mutate in dplyr).
# The lambda argument was previously misspelled inside the body, which raised a NameError.
dt2 = dt2.assign(PI_cat = lambda dataframe: dataframe['PI'].map(lambda PI: 'Adequado' if PI < 0.56 else 'Inadequado'))
# Filter just cost and total revenue variable #
# NOTE(review): the selection is positional (columns 0-18 and 28) and depends on the column
# order produced by the earlier merge -- confirm it still picks the intended variables.
cols = list(dt2.columns[list(range(19)) + [28]])
# .copy() makes db an independent frame, so adding columns to it later cannot warn about writing to a slice.
db = dt2.loc[:, cols].copy()
del dt2
gc.collect()

In [None]:
# ** Fourth cleaning data ** #
# Prepare the shop lookup table: strip blanks from the headers and keep three columns by position.
shop.columns = shop.columns.str.replace(" ","")
shop = shop.iloc[:, [0, 6, 12]]
# NOTE(review): the key 'CodSho' may be a truncated 'CodShop'; rename() ignores unknown keys,
# in which case the astype below fails with a KeyError -- confirm the real header name.
shop = shop.rename(columns = {'CodSho':'shop'})
shop = shop.astype({'shop':int})

# Generate the variable 'shop' from 'shopDate' (the text before the first '_') as an integer key.
# `n` must be passed by keyword: pandas 2.x no longer accepts it positionally.
db = db.assign(shop = db["shopDate"].str.split("_", n = 1).str.get(0).astype(int))
db = pd.merge(db, shop, on = 'shop', how = 'left')

def regionBR(x):
    """Return the region label for a two-letter state code.

    Parameters
    ----------
    x : object
        State code such as 'SP'. Any value that is not one of the listed
        codes (including None or NaN) falls through to the default.

    Returns
    -------
    str
        One of 'Norte', 'Nordeste', 'Centro-Oeste', 'Sudeste', 'Sul',
        or 'Other' when the code is not recognised.
    """
    if x in ('AC', 'AP', 'AM', 'PA', 'RO', 'RR', 'TO'):
        return 'Norte'
    if x in ('CE', 'AL', 'BA', 'MA', 'PB', 'PE', 'PI', 'RN', 'SE'):
        return 'Nordeste'
    if x in ('GO', 'MT', 'MS', 'DF'):
        return 'Centro-Oeste'
    if x in ('ES', 'MG', 'RJ', 'SP'):
        return 'Sudeste'
    if x in ('PR', 'SC', 'RS'):
        return 'Sul'
    # The original fallback line lacked the '=' and was a syntax error.
    return 'Other'

# Add a 'region' column by passing each 'UF' value through regionBR (equal to mutate in dplyr).
db = db.assign(region = lambda frame: frame['UF'].apply(regionBR))
gc.collect()

# The dataframe is now clean, so I can normalize the data, create new variables, etc., before applying machine learning algorithms. #
