# Data Reading Process

Importing libraries

In [1]:
import dask
import dask.dataframe as dd
import os
from dask.distributed import Client, LocalCluster
from dask_ml.impute import SimpleImputer
from dask_ml.compose import make_column_transformer

Declaring a local cluster instance to process the data in parallel

In [2]:
# Local Dask cluster: 8 workers with 2 threads each; memory_limit is the
# per-worker memory cap
cluster = LocalCluster(
    n_workers=8,
    threads_per_worker=2,
    memory_limit='8GB',
)
# Client connected to that cluster; used below to persist the tables
client = Client(cluster)

Show Dashboard link

In [3]:
# URL of the Dask diagnostics dashboard exposed by this client
client.dashboard_link

'http://127.0.0.1:8787/status'

creating a function to restart client

Importing the databases used to build the machine learning model

In [3]:
# Collect the CSV files found in the `databases` folder.
# os.listdir() returns entries in arbitrary order, so skipping "the first one"
# positionally (the former `[1:]` slice) could silently drop a real table.
# Filtering on the extension skips non-table entries regardless of position,
# and sorting makes the load order deterministic.
# NOTE(review): if the entry that used to be skipped is itself a CSV that must
# be excluded, filter it out by name here.
table_names = sorted(
    entry for entry in os.listdir('databases')
    if entry.lower().endswith('.csv')
)

# Read every table in ~100MB blocks and persist it on the cluster, keyed by the
# file name up to its first dot (e.g. 'balances.csv' -> 'balances')
tables = {
    tbl_name.split('.')[0]: client.persist(
        dd.read_csv(f"databases/{tbl_name}", blocksize='100MB')
    )
    for tbl_name in table_names
}

# Data Wrangling

Joining tables to create a single one to use as the main dataframe

In [4]:
# Left-join every table onto `balances`. All tables except `customers` are
# joined on (ID, period, month); `customers` is joined on ID only.
join_keys = ['ID', 'period', 'month']
data = (
    tables['balances']
    .merge(tables['liabilities'], on=join_keys, how='left')
    .merge(tables['movements'], on=join_keys, how='left')
    .merge(tables['customers'], on=['ID'], how='left')
    .merge(tables['digital'], on=join_keys, how='left')
    # repartition()'s first positional parameter is `divisions`, not the
    # partition count, so the target of 25 partitions is passed by keyword
    .repartition(npartitions=25)
)

# Drop the notebook's reference to the individual tables.
# NOTE(review): the persisted partitions presumably stay in cluster memory for
# as long as `data`'s task graph still references them - confirm if memory is tight.
del tables

Showing five records

In [None]:
# Preview the first rows of the merged frame (head() defaults to five rows)
data.head()

2023-10-08 02:23:54,801 - tornado.application - ERROR - Exception in callback <bound method BokehTornado._keep_alive of <bokeh.server.tornado.BokehTornado object at 0x00000143577C2B90>>
Traceback (most recent call last):
  File "C:\Users\Acer\anaconda3\envs\machine_learning\Lib\site-packages\tornado\ioloop.py", line 919, in _run
    val = self.callback()
          ^^^^^^^^^^^^^^^
  File "C:\Users\Acer\anaconda3\envs\machine_learning\Lib\site-packages\bokeh\server\tornado.py", line 779, in _keep_alive
    c.send_ping()
  File "C:\Users\Acer\anaconda3\envs\machine_learning\Lib\site-packages\bokeh\server\connection.py", line 91, in send_ping
    self._socket.ping(str(self._ping_count).encode("utf-8"))
  File "C:\Users\Acer\anaconda3\envs\machine_learning\Lib\site-packages\tornado\websocket.py", line 439, in ping
    raise WebSocketClosedError()
tornado.websocket.WebSocketClosedError
2023-10-08 02:23:55,833 - tornado.application - ERROR - Exception in callback <bound method BokehTornado._k

## Null Values Imputation

Checking data types and null values

In [7]:
# Share of missing values per column. isnull().mean() yields the same ratio as
# isnull().sum() / len(data), but needs a single computation of the merged
# frame, whereas len(data) triggers a separate full computation of its own.
null_ratio = data.isnull().mean().compute()

# `data.dtypes` is already an in-memory pandas Series, so the summary table is
# assembled locally; assign() aligns both pieces on the column name, and the
# rows are sorted by column name for display.
data.dtypes.to_frame('data_type').assign(null_ratio=null_ratio).sort_index()

Unnamed: 0,data_type,null_ratio
ID,string,0.0
age,float64,0.0
balance_amount,float64,0.0
bureau_risk,string,0.0
days_default,float64,0.0
dig_1,float64,0.247692
dig_10,float64,0.247692
dig_11,float64,0.247692
dig_2,float64,0.247692
dig_3,float64,0.247692


Before deciding what kind of imputation is going to be performed, we look at the quartiles of the affected columns to decide whether to impute with the mean or the median

In [12]:
# Column groups to inspect, picked by name
dig_columns = [name for name in data.columns if 'dig' in name]
prod_columns = [
    'product_1_x',
    'product_2_x',
]
type_columns = [
    'type_1',
    'type_2',
    'type_3',
    'type_4',
]

# Summary statistics (count, mean, std, min, quartiles, max) of those columns
selected_columns = dig_columns + prod_columns + type_columns
data[selected_columns].describe().compute()

Unnamed: 0,dig_1,dig_2,dig_3,dig_4,dig_5,dig_6,dig_7,dig_8,dig_9,dig_10,dig_11,product_1_x,product_2_x,type_1,type_2,type_3,type_4
count,8725112.0,8725112.0,8725112.0,8725112.0,8725112.0,8725112.0,8725112.0,8725112.0,8725112.0,8725112.0,8725112.0,10411700.0,10411700.0,5199137.0,5199137.0,5199137.0,5199137.0
mean,1.729927,3.250468,3.033453,2.620164,1.598258,3.604106,2.253315,2.012096,3.075644,1.5354,2.711378,3.038695,0.5321534,2.075979,2.599862,1.599407,1.741834
std,1.181385,0.8854857,0.8896423,1.07341,1.10379,1.116196,1.49034,1.398692,1.057191,1.587534,1.256354,1.209978,1.31691,1.884979,1.751632,1.857044,1.93127
min,0.0,0.0,-0.0001,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0001,0.0,0.0,0.0,0.0,0.0,0.0
25%,0.905,2.8135,2.5268,1.9496,0.8656,3.2269,1.0292,1.0637,2.4311,0.0,1.9419,2.4634,0.0,0.0,0.0,0.0,0.0
50%,1.5559,3.3969,3.1072,2.5198,1.3914,3.8699,2.3898,2.0585,3.1871,1.3132,2.8337,3.21755,0.0,2.5762,3.1921,0.0,0.0
75%,2.3858,3.8939,3.6664,3.2382,2.155725,4.33525,3.429,3.0615,3.8788,2.7894,3.6603,3.90475,0.0,3.80645,3.9968,3.4514,3.7117
max,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0,5.0


As can be seen, the data apparently has no skew, because of the proximity between the max value and the third quartile, and the mean and median are quite similar, so the median is chosen as the imputer

In [None]:
# All numeric columns whose missing values are going to be imputed
missing_numeric_columns = dig_columns + prod_columns + type_columns

# Impute with the median, which is the strategy the notebook's narrative
# settles on after inspecting describe(); the previous strategy='mean'
# contradicted that decision. Every other column is passed through unchanged.
# NOTE(review): dask-ml presumably computes the median of a Dask DataFrame
# approximately - confirm this is acceptable for these columns.
column_transformer = make_column_transformer(
    (SimpleImputer(strategy='median'), missing_numeric_columns),
    remainder='passthrough',
).set_output(transform='pandas')

# Fit the imputer and replace the missing values
data = column_transformer.fit_transform(data)