# How to build a dataset for a Deep Learning model from scratch

## Business Problem 

### You need to predict the monthly sales for each client, and those clients shop on random dates during each month. 
### We're going to solve this using a Deep Learning framework. The natural question is: what does a dataset that feeds a DL network look like?

### When working with Convolutional Neural Nets, each image is translated into a matrix of dimension $n \times m$ 

![cat_matrix.png](attachment:cat_matrix.png)

### For the particular problem we're trying to solve, we have a random number of events (purchases made by a client), each described by the same set of features. The goal is to translate this variable number of events into a matrix (of dimension $l \times m$, where $l$ is arbitrary), so that we can feed it to a Recurrent Neural Network (RNN).

### IMPORT LIBRARIES

In [1]:
import sys
#aws glue
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import Join
#PySpark
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorSizeHint, VectorAssembler, MinMaxScaler
#dense vectors
from pyspark.ml.linalg import SparseVector, DenseVector, VectorUDT,Vectors

#standard libraries
import boto3
import pandas as pd
import numpy as np
from io import StringIO
from datetime import date,datetime, timedelta
import calendar
import random

## flatten
from operator import add
import functools

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200309132055-0000
KERNEL_ID = 53b361d9-5f45-4f8b-b2d1-b9befa29f15c


## udf - user defined functions

In [4]:
def my_add_months(sourcedate, months):
    """Shift ``sourcedate`` by ``months`` calendar months (negative goes back).

    The day of month is kept when it exists in the target month; otherwise it
    is clamped to the last day of that month (e.g. Jan 31 + 1 month gives the
    last day of February). The result is always a ``datetime.date``.
    """
    # Zero-based month index, so divmod yields (years to carry, month index).
    year_carry, month_index = divmod(sourcedate.month - 1 + months, 12)
    year = sourcedate.year + year_carry
    month = month_index + 1
    last_day = calendar.monthrange(year, month)[1]
    # Explicit comparison instead of min(): the builtin is presumably shadowed
    # by the pyspark.sql.functions star import in this notebook.
    day = sourcedate.day if sourcedate.day < last_day else last_day
    return date(year, month, day)

def my_diff_month(d1, d2):
    """Return how many calendar months ``d1`` is ahead of ``d2``.

    Only the year and month fields are compared, the day of month is ignored.
    The result is negative when ``d1`` falls in an earlier month than ``d2``.
    """
    years_apart = d1.year - d2.year
    months_apart = d1.month - d2.month
    return years_apart * 12 + months_apart
  

### UDF that rebuilds each vector value as a DenseVector (e.g. from a sparse vector)
def _as_dense_vector(vec):
    """Return a DenseVector holding the array form of ``vec``."""
    return DenseVector(vec.toArray())


to_dense_vector = udf(_as_dense_vector, VectorUDT())

## dense vector to array float 
def dense_to_array(v):
    """Return the elements of ``v`` as a plain list of Python floats."""
    return [float(component) for component in v]

# Spark UDF wrapper around dense_to_array; the declared result type is an array of floats.
dense_to_array_udf = udf(dense_to_array, returnType=ArrayType(FloatType()))

def client_list(toy_list):
    """Turn a Python list of client keys into a one-column Spark DataFrame.

    The single column is named ``client_key`` and is cast to ``integer``.
    Uses the notebook-level ``spark`` session.
    """
    # NOTE(review): the schemas elsewhere in this notebook declare client_key
    # as LongType; confirm the keys fit in a 32-bit integer.
    keys_pdf = pd.DataFrame({'client_key': toy_list})
    keys_sdf = spark.createDataFrame(keys_pdf)
    return keys_sdf.withColumn("client_key", keys_sdf["client_key"].cast("integer"))

def train_dev_test_data(data_spark_df, clientekey_spark_df):
    """Restrict ``data_spark_df`` to the clients listed in ``clientekey_spark_df``.

    The two frames are inner-joined on ``client_key`` and only the columns of
    ``data_spark_df`` are kept in the result.
    """
    data_side = data_spark_df.alias('dspark')
    keys_side = clientekey_spark_df.alias('cl_spark')
    same_client = col('dspark.client_key') == col('cl_spark.client_key')
    data_columns = [col('dspark.' + name) for name in data_spark_df.columns]
    return data_side.join(keys_side, same_client, 'inner').select(data_columns)

### AWS credentials

In [None]:
# Placeholder credentials for the tutorial.
# NOTE(review): never commit real keys to a notebook; prefer a role or
# environment-based configuration when running this for real.
aws_id = 'my_aws_id'
aws_secret = 'my_secret_access_key'
s3_client = boto3.client(
    's3',
    aws_access_key_id=aws_id,
    aws_secret_access_key=aws_secret,
)

In [None]:
# Glue context built on the Spark context `sc`.
# NOTE(review): `sc` is not defined in this notebook; presumably the kernel provides it - confirm.
glueContext = GlueContext(sc)

#### databases

In [None]:
# Catalog database used for the event, client-dimension, location and discount tables.
database_sandbox = 'my_sandbox'
# Catalog database used for the client segment table.
clie_sandbox = 'client_sandbox'

#### tables

In [None]:
# Catalog table names (placeholders); each one is read below with from_catalog.
table_data   = 'your_data_table'
table_dim_client = 'your_client_dimmension_table'
table_segment = 'your_client_segment_table'
table_location = 'your_client_location_table'
discount_data = 'your_discount_table'

#### predicates

In [None]:
# Push-down predicate passed when reading the main data table.
# NOTE(review): the year is compared as a string; presumably `year` is a string partition column - confirm.
sb_pred = "(year >='2014')"

#### reading tables from AWS catalog

In [None]:
# Load each catalog table as a Glue DynamicFrame.
_from_catalog = glueContext.create_dynamic_frame.from_catalog

# Only the main data table is read with the push-down predicate.
data_sb = _from_catalog(
    database=database_sandbox,
    table_name=table_data,
    push_down_predicate=sb_pred,
)

data_sb_dim_client = _from_catalog(
    database=database_sandbox,
    table_name=table_dim_client,
)

# The segment table is the only one read from the client sandbox database.
data_segment = _from_catalog(
    database=clie_sandbox,
    table_name=table_segment,
)

data_szrt = _from_catalog(
    database=database_sandbox,
    table_name=table_location,
)

data_discount = _from_catalog(
    database=database_sandbox,
    table_name=discount_data,
)

### convert to pyspark dataframe

In [None]:
# Convert every Glue DynamicFrame into a Spark DataFrame.
sdf_sb = data_sb.toDF()
sdf_dim_client = data_sb_dim_client.toDF()
# The segment frame is loaded under the name data_segment; the previous
# reference to data_segmentos pointed at an undefined name.
sdf_segment = data_segment.toDF()
sdf_szrt = data_szrt.toDF()
sdf_discount = data_discount.toDF()

#### filtering 

In [None]:
# Keep only the events whose trimmed feature_a equals the wanted value.
my_custom_filter = trim(col('feature_a')) == 'something'
sdf_sb = sdf_sb.filter(my_custom_filter)

# Location rows to drop: any of the listed feature_b codes, or location_a in feature_c.
excluded_codes = trim(col('feature_b')).isin(['code_a', 'code_b', 'code_z'])
excluded_location = trim(col('feature_c')) == 'location_a'
szrt_filter = excluded_codes | excluded_location
sdf_szrt = sdf_szrt.filter(~szrt_filter)

# Inner join keeps only events whose client_key survives in the location
# table; only the event columns are selected.
event_columns = [col('dd.' + name) for name in sdf_sb.columns]
sdf_sb = (
    sdf_sb.alias('dd')
    .join(sdf_szrt.alias('dszrt'), col('dd.client_key') == col('dszrt.client_key'), 'inner')
    .select(event_columns)
)

#### add column with day of week event

In [None]:
# Derive two calendar features from the event date: day of week and month number.
sdf_sb = (
    sdf_sb
    .withColumn('day_of_week', dayofweek(col('event_date')))
    .withColumn('event_date_month', month(col('event_date')))
)

#### discount and transportation cost data

In [None]:
# Left join on client and event date: every event is kept, and the two
# discount features are null when no discount row matches.
discount_same_client = col('dd1.client_key') == col('dct.client_key')
discount_same_date = col('dd1.event_date') == col('dct.event_date')
event_cols = [col('dd1.' + name) for name in sdf_sb.columns]
discount_cols = [
    col('dct.feature_a').alias('dcto_feature_a'),
    col('dct.feature_b').alias('dcto_feature_b'),
]
sdf_sb = (
    sdf_sb.alias('dd1')
    .join(sdf_discount.alias('dct'), discount_same_client & discount_same_date, 'left')
    .select(event_cols + discount_cols)
)

#### sociodemographic data

In [None]:
# Left join with the client dimension table: every event is kept and gains
# the two client attributes (null when the client has no dimension row).
sb_cols = [col('sb.' + name) for name in sdf_sb.columns]
dim_cols = [
    col('dcl.feature_a').alias('dim_feature_a'),
    col('dcl.feature_b').alias('dim_feature_b'),
]
all_data = (
    sdf_sb.alias('sb')
    .join(sdf_dim_client.alias('dcl'), col('sb.client_key') == col('dcl.client_key'), 'left')
    .select(sb_cols + dim_cols)
)

#### add features

In [None]:
# feature_f is the sum of two existing features; feature_h is the base-10 log of feature_g.
all_data = (
    all_data
    .withColumn('feature_f', col('feature_d') + col('feature_e'))
    .withColumn('feature_h', log10(col('feature_g')))
)

#### read holidays file from S3

In [None]:
# List the objects under the holidays prefix and download the first one returned.
holidays_file = s3_client.list_objects_v2(
    Bucket='my_bucket',
    Prefix='my_folder/my_sub_folder',
)['Contents']

file_object = s3_client.get_object(
    Bucket='my_bucket',
    Key=holidays_file[0]['Key'],
)

# Decode the object body as UTF-8 text and parse it as CSV.
file_body = file_object['Body']
csv_string = file_body.read().decode('utf-8')
holidays_cl_df = pd.read_csv(StringIO(csv_string))

# Column 'col' holds the holidays as YYYY-MM-DD strings; keep them as date objects.
holidays_cl = [
    datetime.strptime(hcl, '%Y-%m-%d').date()
    for hcl in holidays_cl_df['col']
]

#### retrieve all dates on which clients made a purchase; with these dates we can compute how many holidays and business days fall in the next rolling month, plus how many days remain until the next holiday

In [None]:
from bisect import bisect_left

all_events = all_data.select('event_date').distinct().toPandas()

# A set gives constant-time membership tests inside the day loop; the sorted
# list is what the "next holiday" binary search needs, so the result no longer
# depends on the row order of the holidays file.
holiday_set = set(holidays_cl)
holidays_sorted = sorted(holiday_set)

nhm = []  # holidays inside the rolling month that starts at each event date
bd = []   # business days (Mon-Fri and not a holiday) inside the same window
dnh = []  # days until the next holiday (0 when the event date is a holiday)
for base in all_events['event_date']:
    # The rolling month covers [base, base + 1 month).
    numdays = (my_add_months(base, 1) - base).days
    holiday_count = 0
    bd_count = 0
    for offset in range(numdays):
        dt = base + timedelta(days=offset)
        if dt in holiday_set:
            holiday_count += 1
        elif dt.weekday() < 5:
            bd_count += 1
    nhm.append(holiday_count)
    bd.append(bd_count)

    # First holiday on or after the event date.
    next_idx = bisect_left(holidays_sorted, base)
    if next_idx == len(holidays_sorted):
        # Previously this surfaced as a bare IndexError on the list lookup.
        raise ValueError('holiday calendar has no holiday on or after ' + str(base)
                         + '; extend the holidays file')
    dnh.append((holidays_sorted[next_idx] - base).days)

all_events['rolling_month_holidays'] = nhm
all_events['rolling_month_business_days'] = bd
all_events['days_to_next_holiday'] = dnh

#### transform to spark dataframe to be able to do left join

In [None]:
# Spark copy of the pandas frame all_events, so it can be joined to all_data.
all_events_spark = spark.createDataFrame(all_events)

In [None]:
## add info about holidays
# Left join on event_date: every event row is kept and gains the three calendar features.
data_cols = [col('ad.' + name) for name in all_data.columns]
calendar_cols = [
    col('aes.rolling_month_holidays'),
    col('aes.rolling_month_business_days'),
    col('aes.days_to_next_holiday'),
]
all_data = (
    all_data.alias('ad')
    .join(all_events_spark.alias('aes'), col('ad.event_date') == col('aes.event_date'), 'left')
    .select(data_cols + calendar_cols)
)

#### exclude big clients

#### say you have some clients that are catalogued as "big"; we're going to remove them with a left_anti join 

In [None]:
# filter out clients, based on internal size classification 
def exclude_big_clients(all_data_df):
    """Drop every row that belongs to a client classified as size_xl or size_xxl.

    The big clients are read from the notebook-level ``sdf_segment`` frame
    (trimmed ``cluster_size`` column) and removed with a left_anti join on
    ``client_key``, so only rows without a match among them are returned.

    Args:
        all_data_df: Spark DataFrame with a ``client_key`` column.

    Returns:
        Spark DataFrame with the same columns as ``all_data_df``.
    """
    # Column.isin is a method and has to be called; the original subscripted
    # it (``isin[...]``), which raises a TypeError.
    size_filter = trim(col('cluster_size')).isin(['size_xl', 'size_xxl'])

    segments_filt = sdf_segment.filter(size_filter).select('client_key').distinct()

    kept_columns = [col('tdt0.' + xx) for xx in all_data_df.columns]
    all_data_df = (
        all_data_df.alias('tdt0')
        .join(segments_filt.alias('d_seg'),
              col('tdt0.client_key') == col('d_seg.client_key'),
              'left_anti')
        .select(kept_columns)
    )
    return all_data_df

In [None]:
# Replace all_data with the result of exclude_big_clients.
all_data = exclude_big_clients(all_data)

#### Extreme values filter, non-normal distribution. Usually the distribution is non-normal (long tail), and therefore for this particular problem we used the 97.5th percentile to filter outliers.

In [None]:
#compute bounds for outlier detection under normal distribution, using inter quartile range
def calculate_bounds(df):
    """Return per-column quartiles and 1.5 * IQR outlier limits.

    For every column of ``df`` the result holds a dict with keys ``q1`` and
    ``q3`` (from ``approxQuantile`` at 0.25 / 0.75 with relative error 0),
    plus ``min`` = q1 - 1.5 * IQR and ``max`` = q3 + 1.5 * IQR.
    """
    bounds = {}
    for column in df.columns:
        quartiles = df.approxQuantile(column, [0.25, 0.75], 0)
        bounds[column] = dict(zip(["q1", "q3"], quartiles))

    for limits in bounds.values():
        spread = limits['q3'] - limits['q1']
        limits['min'] = limits['q1'] - (spread * 1.5)
        limits['max'] = limits['q3'] + (spread * 1.5)

    return bounds
#compute bounds for outlier detection under non-normal distribution
def calculate_bounds_non_normal(df):
    """Return per-column percentile limits for outlier detection.

    For every column of ``df`` the result holds ``min`` (2.5th percentile)
    and ``max`` (97.5th percentile), both taken from ``approxQuantile`` with
    relative error 0.
    """
    percentiles = [0.025, 0.975]  # q0025 and q0975
    bounds = {}
    for column in df.columns:
        limits = df.approxQuantile(column, percentiles, 0)
        bounds[column] = dict(zip(["min", "max"], limits))
    return bounds

In [None]:
# Percentile limits for the two features checked for extreme values.
monitored_features = all_data.select('feature_m', 'feature_l')
outliers_bounds = calculate_bounds_non_normal(monitored_features)

# Empty single-column frame that will accumulate the keys of outlier clients.
clie_schema = StructType([
    StructField('client_key', LongType(), True),
])
outliers_clie = spark.createDataFrame(spark.sparkContext.emptyRDD(), clie_schema)

#### since we're interested in the purchase pattern of each client, if any event is classified as an outlier, then we exclude the whole history of that client.
#### please note that we only filter on the 'max' value; this was a business call. You can easily filter on both min and max by uncommenting the second condition on the filter line

In [None]:
for feature_name, limits in outliers_bounds.items():
    # Only the upper limit is applied; the commented condition adds the lower one.
    outlier_filter = (all_data[feature_name] > limits['max'])  # | (all_data[feature_name] < limits['min'])
    # One row per client that has at least one event above the limit for this feature.
    flagged_clients = all_data.filter(outlier_filter).select('client_key').distinct()
    outliers_clie = outliers_clie.unionAll(flagged_clients)

#### exclude outlier clients

In [None]:
# left_anti join: keep only the rows whose client_key has no match among the outlier clients.
is_outlier_client = col("ado.client_key") == col("out_clie.client_key")
all_data_no_outlier = all_data.alias("ado").join(
    outliers_clie.alias("out_clie"),
    is_outlier_client,
    'left_anti',
)

## one hot encoding
#### Here we have two types of encoding: one is based on integers (although stored in a string data type), while the other is based on categorical data. The process differs depending on which one you use.

#### For an "integer" variable we would simply use

In [None]:
# One-hot encode feature_i directly; dropLast=True leaves the last category out of the vector.
# NOTE(review): calling transform() without fit() presumably relies on the
# Spark 2.x OneHotEncoder transformer - confirm against the cluster version.
encoder_integer_type = OneHotEncoder(
    inputCol="feature_i",
    outputCol="feature_i_OHE",
    dropLast=True,
)
all_data_no_outlier = encoder_integer_type.transform(all_data_no_outlier)

#### Whereas if we have a "categorical" variable we would do the following: first you need to create an indexer, which is just a mapping from the categorical values to integers (ordered by descending frequency). Then, based on this indexer, you can create a one-hot encoder

In [None]:
# Step 1: fit the string-to-index mapping on feature_j and add the index column.
indexer = StringIndexer(inputCol="feature_j", outputCol="feature_j_Index")
indexer_model = indexer.fit(all_data_no_outlier)
all_data_no_outlier = indexer_model.transform(all_data_no_outlier)

# Step 2: one-hot encode the index column.
encoder_event_desc = OneHotEncoder(
    inputCol="feature_j_Index",
    outputCol="feature_j_OHE",
    dropLast=True,
)
all_data_no_outlier = encoder_event_desc.transform(all_data_no_outlier)

#### OneHotEncoder outputs sparse vectors, therefore we need to convert them to dense vectors (otherwise VectorAssembler doesn't work)

In [None]:
# Add a dense copy of each one-hot column next to the original.
all_data_no_outlier = (
    all_data_no_outlier
    .withColumn('feature_i_OHE_dense', to_dense_vector(col('feature_i_OHE')))
    .withColumn('feature_j_OHE_dense', to_dense_vector(col('feature_j_OHE')))
)

#### keep only relevant columns

In [None]:
# Columns kept for the rest of the pipeline. The `...` entry is a placeholder
# for the remaining feature names and must be replaced with real column names
# before select() can run.
use_cols = ['feature_a', ... , 'feature_z','client_key','event_date']

all_data_no_outlier = all_data_no_outlier.select(use_cols)

#### Split the data into training, development and testing sets. Be aware that, in our experience, the Spark built-in method randomSplit didn't work properly here, i.e. some samples belonged to both the training and testing datasets; that's why we sample the client keys at random instead.

In [None]:
# Clients with at least one event on or after 2020-09-01 are the candidates for the test set.
recent_events = all_data_no_outlier.filter(col('event_date') >= date(2020, 9, 1))
current_client = recent_events.select('client_key').distinct().toPandas()

historic_clients = all_data_no_outlier.select('client_key').distinct().toPandas()

# Test set: 10% of the current clients.
# NOTE(review): no random seed is set, so the split differs between runs.
current_keys = list(current_client['client_key'])
test_client = random.sample(current_keys, int(len(current_keys) * 0.1))

# Everything that is not in test is split again: 10% dev, the rest train.
train_dev_client = list(set(historic_clients['client_key']) - set(test_client))
dev_size = int(len(train_dev_client) * 0.1)
dev_client = random.sample(train_dev_client, dev_size)
train_client = list(set(train_dev_client) - set(dev_client))

## amount of clients to train - dev - test
print('amount of clientes to train ' + str(len(train_client)))
print('amount of clientes to dev ' + str(len(dev_client)))
print('amount of clientes to test ' + str(len(test_client)))

In [None]:
# One single-column Spark frame of client keys per split.
clientekey_train, clientekey_dev, clientekey_test = (
    client_list(keys) for keys in (train_client, dev_client, test_client)
)

## train / dev / test data sets -- built with the respective client lists
train_data, dev_data, test_data = (
    train_dev_test_data(all_data_no_outlier, split_keys)
    for split_keys in (clientekey_train, clientekey_dev, clientekey_test)
)

# Sampling your data
### the basic idea here is that at any given time step you look $N$ months into the "PAST" and, given that behaviour, you predict what's going to happen in the "FUTURE". The future here is as simple as adding up the number of units a client bought in the next month (our target), whereas the data we use to predict this target is the purchase pattern over the previous $N$ months. Be aware that for each client this pattern contains a different number of purchases, each with its corresponding features (the same features for all events).

![client_history.jpeg](attachment:client_history.jpeg)

# time steps to sample data

In [None]:
# Monthly time steps from 2015-01-01 up to and including 2020-09-01.
# my_add_months is the helper that works on datetime.date values; the
# star-imported Spark add_months builds a column expression and cannot be
# used to step plain Python dates.
t_steps = [date(2015,1,1)]
while t_steps[-1] < date(2020,9,1):
    t_steps.append(my_add_months(t_steps[-1],1))

### dates used to validate clients at each time step
### the idea is that, for any given time step, the valid samples are the clients that have data both before and after that time

In [None]:
# First and last event date per client, for each split. min/max here are the
# Spark aggregate functions, so the result columns are named
# 'min(event_date)' and 'max(event_date)'.
event_date_range = [min("event_date"), max("event_date")]
min_max_dates_train = train_data.groupBy("client_key").agg(*event_date_range)
min_max_dates_dev = dev_data.groupBy("client_key").agg(*event_date_range)
min_max_dates_test = test_data.groupBy("client_key").agg(*event_date_range)

In [None]:
## variables to be selected for output
# Input columns for the vector assembler. The `...` entry is a placeholder for
# the remaining feature names and must be replaced before the list is used.
feature_list = ['feature_a',... ,'feature_z']

### function to assemble all input columns into just one vector

In [None]:
# Packs every column of feature_list into one vector column named input_list.
assembler = VectorAssembler(inputCols=feature_list, outputCol="input_list")

### convert to interval [0,1]

In [None]:
# Min-max scaler reading the dense vector column input_list_dense and writing scaledFeatures.
scaler = MinMaxScaler(inputCol="input_list_dense", outputCol="scaledFeatures")

# function to sample data

In [None]:
# Schema of the parquet output: client key, '|'-joined feature string and numeric target.
my_schema = StructType([
    StructField('client_key', LongType(), True),
    StructField('save_features', StringType(), True),
    StructField('TARGET', DoubleType(), True),
])


def format_output(dates_seq,tdt_spark_df,dates_df,file_name, window_t):
    """Write one flattened (features, target) sample per client and time step.

    For every time step, the client's events in the ``window_t`` months before
    the step are assembled into vectors, min-max scaled (the scaler is fitted
    on that step's window), flattened into a single '|'-separated string and
    paired with the sum of ``amount_units`` over the month starting at the
    step. Only clients that have both past events and a target are written
    (inner join). Each step is appended to the same parquet location.

    Args:
        dates_seq: iterable of ``datetime.date`` time steps.
        tdt_spark_df: Spark DataFrame with the events of one split; it is read
            through ``client_key``, ``event_date``, ``amount_units``,
            ``some_feature`` and the ``feature_list`` columns.
        dates_df: per-client frame with a ``max(event_date)`` column.
        file_name: path appended verbatim to ``s3://my_bucket/`` (a leading
            '/' therefore yields a double slash in the final path).
        window_t: number of months of history used as model input.

    Returns:
        The DataFrame appended for the last time step, or the empty frame
        written at the start when ``dates_seq`` has no elements.

    Relies on notebook-level names: ``spark``, ``my_schema``, ``assembler``,
    ``scaler``, ``feature_list``, ``to_dense_vector``, ``dense_to_array_udf``,
    ``my_add_months`` and ``my_diff_month``.
    """
    output_path = "s3://my_bucket/" + file_name

    # Start from an empty file so that every time step can simply be appended.
    df_global = spark.createDataFrame(spark.sparkContext.emptyRDD(), my_schema)
    df_global.write.mode('overwrite').parquet(output_path)

    # Defined up front so the function still returns a frame when dates_seq is
    # empty (the name used to be unbound in that case).
    df_to_append = df_global

    for t_step in dates_seq:
        # Window limits are plain Python dates, so they are computed with
        # my_add_months; the Spark add_months function expects a column.
        past_start = my_add_months(t_step, -window_t)
        future_end = my_add_months(t_step, 1)

        ## keep clients whose last event is on or after the time step
        valid_client_samples = dates_df.filter(col('max(event_date)')>=t_step)
        valid_df = tdt_spark_df.alias('tdt').join(valid_client_samples.alias('vclie')
                                                  , col('tdt.client_key') == col('vclie.client_key')
                                        , 'inner').select([col('tdt.'+xx) for xx in tdt_spark_df.columns])

        ## past window [past_start, t_step) and future window [t_step, future_end)
        past_df = valid_df.filter((col('event_date')>=past_start) & (col('event_date')<t_step))
        future_df = valid_df.filter((col('event_date')>=t_step)
                                  & (col('event_date')< future_end))\
                                    .select('client_key','amount_units')

        # target: units bought per client during the future window
        target = future_df.groupBy('client_key').agg(sum("amount_units").alias("TARGET"))

        # days between the time step and each past event
        past_df = past_df.withColumn("date_diff",datediff(lit(t_step),col('event_date')))
        # nulls in some_feature become the month distance from 2007-01-01 to the time step
        past_df = past_df.fillna(my_diff_month(t_step, date(2007,1,1)), subset=['some_feature'])
        # remaining nulls in the model features become zero
        past_df = past_df.fillna(0, subset=feature_list)

        # assemble the feature columns into one vector and make it dense
        output = assembler.transform(past_df)
        output = output.withColumn('input_list_dense', to_dense_vector(col('input_list')))
        # fit the scaler on this window and rescale every feature
        scalerModel = scaler.fit(output)
        output = scalerModel.transform(output)
        # scaled vector -> array of floats
        output = output.withColumn('input_vec_array', dense_to_array_udf('scaledFeatures'))

        # one row per client holding the list of its event vectors
        # NOTE(review): collect_list is not given an explicit ordering here;
        # confirm whether the events need to be sorted by date first.
        df2_agg = output.groupBy("client_key").agg(collect_list("input_vec_array").alias('input_vec'))

        ## concatenate the per-event arrays into one flat array per client
        df2_agg = df2_agg.rdd.map(lambda row: (row['client_key'], functools.reduce(add, row['input_vec'])))\
                    .toDF(['client_key', 'input_vec'])

        # features + target; clients missing either side are dropped
        df_to_append = df2_agg.alias('ip').join(target.alias('tgt'), col('ip.client_key') == col('tgt.client_key')
                                 , 'inner').select( 'ip.client_key'
                                                    ,'ip.input_vec'
                                                   , 'tgt.TARGET')
        # store the flat array as a '|'-separated string
        df_to_append = df_to_append.select( 'client_key'
                                , concat_ws("|", col("input_vec").cast("array<string>")).alias("save_features")
                                , 'TARGET')

        ## append to the existing file
        print('appending data of time step ' + str(t_step))
        df_to_append.write.mode('append').parquet(output_path)

        # The per-step frames are never persisted inside this loop, so the
        # former unpersist() calls on them had no effect and were removed.

    return df_to_append

### be aware that we call data_frame.persist() only because it's faster to keep that specific data frame in memory while it is reused; for the same reason we call .unpersist() afterwards to free the memory.

#### dev data

In [None]:
# Cache both inputs: format_output filters and joins them once per time step.
dev_data = dev_data.persist()
min_max_dates_dev = min_max_dates_dev.persist()

dev_df = format_output(
    dates_seq=t_steps,
    tdt_spark_df=dev_data,
    dates_df=min_max_dates_dev,
    file_name='/model_XYZ/parquet_files/dev_data',
    window_t=12,  # it could be as many months as you want
)

# Release the cached frames once the files are written.
dev_data = dev_data.unpersist()
min_max_dates_dev = min_max_dates_dev.unpersist()

#### train data

In [None]:
# Cache both inputs: format_output filters and joins them once per time step.
train_data = train_data.persist()
min_max_dates_train = min_max_dates_train.persist()

train_df = format_output(
    dates_seq=t_steps,
    tdt_spark_df=train_data,
    dates_df=min_max_dates_train,
    file_name='/model_XYZ/parquet_files/train_data',
    window_t=12,
)

# Release the cached frames once the files are written.
train_data = train_data.unpersist()
min_max_dates_train = min_max_dates_train.unpersist()

#### test data
#### we wanted to test our model on these particular dates because the pandemic changed everything, so it was a way to check how well the model generalizes

In [None]:
# Monthly test time steps from 2019-10-01 up to and including 2020-11-01.
# my_add_months is the helper that works on datetime.date values; the
# star-imported Spark add_months builds a column expression and cannot be
# used to step plain Python dates.
t_steps_test = [date(2019,10,1)]
while t_steps_test[-1] < date(2020,11,1):
    t_steps_test.append(my_add_months(t_steps_test[-1],1))

# Cache both inputs: format_output filters and joins them once per time step.
test_data = test_data.persist()
min_max_dates_test = min_max_dates_test.persist()

test_df = format_output(
    dates_seq=t_steps_test,
    tdt_spark_df=test_data,
    dates_df=min_max_dates_test,
    file_name='/model_XYZ/parquet_files/test_data',
    window_t=12,
)

# Release the cached frames once the files are written.
test_data = test_data.unpersist()
min_max_dates_test = min_max_dates_test.unpersist()