If you took `CHE1147`, this looks familiar; it is one of the assignments on feature engineering.
Regardless, the situation is that our company is growing from a startup to a bigger organization and we now want to 
**migrate the Python code** we developed and ran on our laptops with small datasets to a big data/cloud architecture in Databricks.
You’ve been hired to do this **migration**. This is actually a very realistic scenario and possibly your work for a whole year: migrate features and models. The documentation of the old Python code below is all you have. You have to find out how to perform the transformations in PySpark. Some steps will be identical to plain Python, some will be different, some will not be needed at all.

In [0]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib
import matplotlib.pyplot as plt
import itertools

from pyspark.sql import SparkSession
from pyspark.sql.types import *
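from pyspark.sql.functions import *   # NOTE: shadows Python built-ins such as sum, min and max; the cells below rely on the Spark versions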
from pyspark.sql.window import *

In [0]:
spark = SparkSession.builder.appName('Assignment 2 Feature Engineering').getOrCreate()

#### Q1 Feature engineering

Here, you are going to create features from a very simple dataset: retail transaction data from Kaggle. The dataset provides the customer ID, date of the transaction and transaction amount as shown in the table below. Although this may look like a very simple dataset, you will build a wide range of features. The features will then be used as inputs in several models in upcoming assignments, in which you will try to predict the client’s response to a promotion campaign.

##### 1.1 Import the data and create the anchor date columns

In order to create features, you need to create some anchor dates. The most typical anchors for transaction data are the end of the month and the year.

###### 1. Import the dataset as txn and identify the number of rows.

In [0]:
txnSchema = StructType([
                        StructField("customer_id", StringType(), True),
                        StructField("trans_date", StringType(), True),
                        StructField("tran_amount", IntegerType(), True)
                      ])  

txn = spark.read.csv('/FileStore/tables/Retail_Data_Transactions.csv', schema = txnSchema, header = True)
txn.show(5)
txn.printSchema()
print('There are',txn.count(), 'rows in the file')

+-----------+----------+-----------+
|customer_id|trans_date|tran_amount|
+-----------+----------+-----------+
|     CS5295| 11-Feb-13|         35|
|     CS4768| 15-Mar-15|         39|
|     CS2122| 26-Feb-13|         52|
|     CS1217| 16-Nov-11|         99|
|     CS1850| 20-Nov-13|         78|
+-----------+----------+-----------+
only showing top 5 rows

root
 |-- customer_id: string (nullable = true)
 |-- trans_date: string (nullable = true)
 |-- tran_amount: integer (nullable = true)

There are 125000 rows in the file


###### Identify the number of rows

The number of rows: 125,000

###### 2. The date format in column 'trans_date' is not standard. Create a new column 'txn_date' from 'trans_date' with pd.to_datetime and drop the column 'trans_date'.

In [0]:
# Adding the new column with the dates in proper dtype and dropping the unnecessary column.

txn = txn.withColumn('txn_date', to_date(txn.trans_date, 'dd-MMM-yy')).drop('trans_date')
txn.show(5)
txn.printSchema()

+-----------+-----------+----------+
|customer_id|tran_amount|  txn_date|
+-----------+-----------+----------+
|     CS5295|         35|2013-02-11|
|     CS4768|         39|2015-03-15|
|     CS2122|         52|2013-02-26|
|     CS1217|         99|2011-11-16|
|     CS1850|         78|2013-11-20|
+-----------+-----------+----------+
only showing top 5 rows

root
 |-- customer_id: string (nullable = true)
 |-- tran_amount: integer (nullable = true)
 |-- txn_date: date (nullable = true)
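During a migration it can help to reproduce a Spark transformation in plain Python on a few sample values and compare. A minimal sketch, assuming Python's `strptime` directives as the reference: Spark's pattern `'dd-MMM-yy'` corresponds to `'%d-%b-%y'`. The two differ for two-digit years outside this dataset (Spark 3 maps `yy` to 2000 to 2099, while Python's `%y` maps 69 to 99 to the 1900s), but for 2011 to 2015 they agree. The helper name `parse_trans_date` is hypothetical.

```python
from datetime import date, datetime


def parse_trans_date(raw: str) -> date:
    """Parse a raw 'trans_date' string such as '11-Feb-13' (hypothetical helper).

    '%d-%b-%y' is the strptime counterpart of Spark's 'dd-MMM-yy' pattern;
    %b expects English month abbreviations under the default C locale.
    """
    return datetime.strptime(raw, "%d-%b-%y").date()


# Raw values taken from the txn.show(5) output above
for raw in ["11-Feb-13", "15-Mar-15", "16-Nov-11"]:
    print(raw, "->", parse_trans_date(raw))
```

The parsed dates match the `txn_date` column produced by `to_date` in the cell above.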



###### 3. Identify the min() and max() of column 'txn_date'.

In [0]:
# The Minimum and Maximum dates

# txn.select(min('txn_date'), max('txn_date')).show()
txn.agg(min('txn_date'), max('txn_date')).show()

+-------------+-------------+
|min(txn_date)|max(txn_date)|
+-------------+-------------+
|   2011-05-16|   2015-03-16|
+-------------+-------------+



###### 4. Create the column 'ME_DT': the last day of the month in the 'trans_date' column. DateOffset objects are a simple way to do this in pandas.

ASSUMPTION: 'trans_date' is taken to mean 'txn_date', since the latter is derived from the former; only the format differs and no values have changed.

In [0]:
txn = txn.withColumn('ME_DT', last_day(txn.txn_date))
txn.show(5)

+-----------+-----------+----------+----------+
|customer_id|tran_amount|  txn_date|     ME_DT|
+-----------+-----------+----------+----------+
|     CS5295|         35|2013-02-11|2013-02-28|
|     CS4768|         39|2015-03-15|2015-03-31|
|     CS2122|         52|2013-02-26|2013-02-28|
|     CS1217|         99|2011-11-16|2011-11-30|
|     CS1850|         78|2013-11-20|2013-11-30|
+-----------+-----------+----------+----------+
only showing top 5 rows



###### 5. Create the column 'YEAR': the year in the 'trans_date' column. DatetimeIndex with attribute .year will help you do so.

In [0]:
txn = txn.withColumn('YEAR', year(txn.txn_date))
txn.show(5)
txn.printSchema()

+-----------+-----------+----------+----------+----+
|customer_id|tran_amount|  txn_date|     ME_DT|YEAR|
+-----------+-----------+----------+----------+----+
|     CS5295|         35|2013-02-11|2013-02-28|2013|
|     CS4768|         39|2015-03-15|2015-03-31|2015|
|     CS2122|         52|2013-02-26|2013-02-28|2013|
|     CS1217|         99|2011-11-16|2011-11-30|2011|
|     CS1850|         78|2013-11-20|2013-11-30|2013|
+-----------+-----------+----------+----------+----+
only showing top 5 rows

root
 |-- customer_id: string (nullable = true)
 |-- tran_amount: integer (nullable = true)
 |-- txn_date: date (nullable = true)
 |-- ME_DT: date (nullable = true)
 |-- YEAR: integer (nullable = true)



In [0]:
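# Confirmation against the snapshot provided in the question

# NOTE: temporary pandas view, only used to compare with the snapshot in the question;
# limit(60) avoids collecting the whole dataset to the driver just to inspect rows 55-59
txn.limit(60).toPandas().iloc[55:60]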

Unnamed: 0,customer_id,tran_amount,txn_date,ME_DT,YEAR
55,CS2662,88,2014-08-31,2014-08-31,2014
56,CS2209,35,2012-03-12,2012-03-31,2012
57,CS4530,40,2011-06-05,2011-06-30,2011
58,CS2848,53,2013-02-04,2013-02-28,2013
59,CS2596,55,2011-09-19,2011-09-30,2011


The table output should match the snapshot in the question. Make sure that the column 'ME_DT' works as expected: e.g. when 'trans_date' is 2018-08-31, 'ME_DT' must also be 2018-08-31. A common mistake in implementing the DateOffset transformation is to convert 2018-08-31 to 2018-09-30, i.e. a date that already falls on the last day of a month is moved to the last day of the next month. Spark's last_day() does not have this problem: in the output above, row 55 (txn_date 2014-08-31) correctly gets ME_DT 2014-08-31.
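The difference between a correct month-end anchor and the common mistake can be checked in plain Python. In this minimal sketch, `month_end` gives the same result as Spark's `last_day()`. The hypothetical `month_end_rolled` mimics adding a one-step month-end offset (as `pd.offsets.MonthEnd(1)` does in pandas), which always moves to the next month end strictly after the date. In pandas, `pd.offsets.MonthEnd(0)` or `MonthEnd().rollforward()` avoids this because they leave dates that are already on a month end unchanged.

```python
import calendar
from datetime import date, timedelta


def month_end(d: date) -> date:
    """Last day of d's month (same result as Spark's last_day())."""
    days_in_month = calendar.monthrange(d.year, d.month)[1]
    return d.replace(day=days_in_month)


def month_end_rolled(d: date) -> date:
    """Hypothetical buggy variant: the next month end strictly after d.

    For a date that is already a month end this jumps to the following month.
    """
    return month_end(d + timedelta(days=1))


for d in [date(2018, 8, 15), date(2018, 8, 31), date(2012, 2, 10)]:
    print(d, "correct:", month_end(d), "buggy:", month_end_rolled(d))
```

Both functions agree for mid-month dates; they only disagree when the input is already the last day of its month.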

##### 1.2 Create features that capture annual spending

Here the approach is to capture the client's annual spending. The rationale is that clients do not transact frequently enough for a monthly aggregation to capture their spending well.

###### 1. Using groupby and NamedAgg create clnt_annual_aggregations, the annual aggregations dataframe: with sum, mean, std, var, sem, max, min, count as the aggregation functions. A snapshot of the output table is shown below. Notice that the output is a typical MultiIndex pandas dataframe.

In [0]:
txn.show(5)

+-----------+-----------+----------+----------+----+
|customer_id|tran_amount|  txn_date|     ME_DT|YEAR|
+-----------+-----------+----------+----------+----+
|     CS5295|         35|2013-02-11|2013-02-28|2013|
|     CS4768|         39|2015-03-15|2015-03-31|2015|
|     CS2122|         52|2013-02-26|2013-02-28|2013|
|     CS1217|         99|2011-11-16|2011-11-30|2011|
|     CS1850|         78|2013-11-20|2013-11-30|2013|
+-----------+-----------+----------+----------+----+
only showing top 5 rows



In [0]:
# Compute all eight annual aggregations of tran_amount per (customer_id, YEAR) in a single groupby().agg() pass.
# stddev() and variance() are the sample versions (stddev_samp, var_samp), matching pandas' std()/var() defaults (ddof=1).

clnt_annual_aggregations = txn.groupby(['customer_id', 'YEAR']).agg(sum(txn.tran_amount).alias('ann_txn_amt_sum'),
                                                                    avg(txn.tran_amount).alias('ann_txn_amt_mean'),
                                                                    stddev(txn.tran_amount).alias('ann_txn_amt_std'),
                                                                    variance(txn.tran_amount).alias('ann_txn_amt_var'),
                                                                    # Standard error of the mean (SEM) = stddev / sqrt(sample size)
                                                                    (stddev(txn.tran_amount)/sqrt(count(txn.tran_amount))).alias('ann_txn_amt_sem'),
                                                                    max(txn.tran_amount).alias('ann_txn_amt_max'),
                                                                    min(txn.tran_amount).alias('ann_txn_amt_min'),
                                                                    count(txn.tran_amount).alias('ann_txn_amt_count'))
# Sorting the new dataframe based on customer id and YEAR
clnt_annual_aggregations = clnt_annual_aggregations.sort(clnt_annual_aggregations.customer_id.asc(), clnt_annual_aggregations.YEAR.asc())
display(clnt_annual_aggregations)
# clnt_annual_aggregations.show(5)

customer_id,YEAR,ann_txn_amt_sum,ann_txn_amt_mean,ann_txn_amt_std,ann_txn_amt_var,ann_txn_amt_sem,ann_txn_amt_max,ann_txn_amt_min,ann_txn_amt_count
CS1112,2011,212,70.66666666666667,22.03028218914441,485.33333333333337,12.719189352225944,96,56,3
CS1112,2012,337,67.4,12.720062892926276,161.8,5.688585061331157,81,52,5
CS1112,2013,212,70.66666666666667,34.50120770833006,1190.3333333333333,19.91928155777155,105,36,3
CS1112,2014,212,70.66666666666667,16.862186493255653,284.33333333333337,9.735387911006825,90,59,3
CS1112,2015,39,39.0,,,,39,39,1
CS1113,2011,244,81.33333333333333,21.07921567168317,444.33333333333337,12.170090842352456,94,57,3
CS1113,2012,374,74.8,17.035257556021865,290.20000000000005,7.618398781896364,95,51,5
CS1113,2013,426,85.2,13.0843417870369,171.2,5.851495535331117,97,65,5
CS1113,2014,226,56.5,27.958302285129307,781.6666666666666,13.979151142564652,97,36,4
CS1113,2015,220,73.33333333333333,27.30079363925769,745.3333333333334,15.762120556715852,98,44,3
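A plain-Python reference for one (customer_id, YEAR) group can be used to sanity-check the Spark aggregations. This minimal sketch assumes sample statistics (n - 1 in the denominator), which is what Spark's `stddev()`/`variance()` and pandas' `std()`/`var()`/`sem()` use by default. For CS1112 in 2011 the output shows count 3, sum 212, min 56 and max 96, so the three amounts must be 56, 60 and 96; the sketch reproduces the variance and SEM shown above. Because the notebook's `from pyspark.sql.functions import *` shadows `sum`, `min` and `max`, the sketch calls them through `builtins`. The helper name `annual_stats` is hypothetical.

```python
import builtins
import math
import statistics
from typing import Dict, List, Optional


def annual_stats(amounts: List[int]) -> Dict[str, Optional[float]]:
    """Eight annual aggregations for one customer-year (hypothetical helper).

    std/var/sem are sample statistics and are None for a single transaction,
    mirroring the null values in the Spark output for that case.
    """
    n = len(amounts)
    sd = statistics.stdev(amounts) if n >= 2 else None
    return {
        "sum": builtins.sum(amounts),
        "mean": statistics.mean(amounts),
        "std": sd,
        "var": statistics.variance(amounts) if n >= 2 else None,
        "sem": sd / math.sqrt(n) if sd is not None else None,  # SEM = std / sqrt(n)
        "max": builtins.max(amounts),
        "min": builtins.min(amounts),
        "count": n,
    }


print(annual_stats([56, 60, 96]))  # CS1112, 2011
print(annual_stats([39]))          # CS1112, 2015: a single transaction
```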


###### 2. Plot the histogram of the sum and count.

In [0]:
# Histogram of the annual transaction amount SUM (plot type selected in the Databricks display() output)

display(clnt_annual_aggregations)

Output can only be rendered in Databricks

In [0]:
# Histogram of the annual transaction amount COUNT (plot type selected in the Databricks display() output)

display(clnt_annual_aggregations)

Output can only be rendered in Databricks

In [0]:
print('The number of rows is', clnt_annual_aggregations.count())
print('The number of columns is', len(clnt_annual_aggregations.columns))

The number of rows is 31140
The number of columns is 10


###### 3. Reset the index and reshape the table with the pivot table function to create the clnt_annual_aggregations pivot table shown below with 40 columns (why 40?). You should expect columns with NaN values. Impute the NaN entries when you perform the pivot table function and explain your choice of values.

In [0]:
clnt_annual_aggregations.show(5)

+-----------+----+---------------+-----------------+------------------+------------------+------------------+---------------+---------------+-----------------+
|customer_id|YEAR|ann_txn_amt_sum| ann_txn_amt_mean|   ann_txn_amt_std|   ann_txn_amt_var|   ann_txn_amt_sem|ann_txn_amt_max|ann_txn_amt_min|ann_txn_amt_count|
+-----------+----+---------------+-----------------+------------------+------------------+------------------+---------------+---------------+-----------------+
|     CS1112|2011|            212|70.66666666666667| 22.03028218914441|485.33333333333337|12.719189352225943|             96|             56|                3|
|     CS1112|2012|            337|             67.4|12.720062892926277|             161.8| 5.688585061331157|             81|             52|                5|
|     CS1112|2013|            212|70.66666666666667| 34.50120770833006|1190.3333333333333| 19.91928155777155|            105|             36|                3|
|     CS1112|2014|            212|70.666

In [0]:
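# Each (customer_id, YEAR) pair has exactly one row, so first() just moves each value into its year's column.
# With several aggregations, Spark names the pivoted columns '<YEAR>_<alias>', e.g. '2011_ann_txn_amt_sum'.
clnt_annual_aggregations_pt = clnt_annual_aggregations.groupby('customer_id').pivot('YEAR').agg(first('ann_txn_amt_sum').alias('ann_txn_amt_sum'),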
                                                                                                first('ann_txn_amt_mean').alias('ann_txn_amt_mean'),
                                                                                                first('ann_txn_amt_std').alias('ann_txn_amt_std'),
                                                                                                first('ann_txn_amt_var').alias('ann_txn_amt_var'),
                                                                                                first('ann_txn_amt_sem').alias('ann_txn_amt_sem'),
                                                                                                first('ann_txn_amt_max').alias('ann_txn_amt_max'),
                                                                                                first('ann_txn_amt_min').alias('ann_txn_amt_min'),
                                                                                                first('ann_txn_amt_count').alias('ann_txn_amt_count'))

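# Reverse the '_'-separated tokens of every column name to move the year to the end:
# '2011_ann_txn_amt_sum' -> 'sum_amt_txn_ann_2011'. Side effect: 'customer_id' becomes 'id_customer' (renamed back later).
clnt_annual_aggregations_pt = clnt_annual_aggregations_pt.select([col(c).alias('_'.join(c.split('_')[::-1])) for c in clnt_annual_aggregations_pt.columns])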

display(clnt_annual_aggregations_pt)

id_customer,sum_amt_txn_ann_2011,mean_amt_txn_ann_2011,std_amt_txn_ann_2011,var_amt_txn_ann_2011,sem_amt_txn_ann_2011,max_amt_txn_ann_2011,min_amt_txn_ann_2011,count_amt_txn_ann_2011,sum_amt_txn_ann_2012,mean_amt_txn_ann_2012,std_amt_txn_ann_2012,var_amt_txn_ann_2012,sem_amt_txn_ann_2012,max_amt_txn_ann_2012,min_amt_txn_ann_2012,count_amt_txn_ann_2012,sum_amt_txn_ann_2013,mean_amt_txn_ann_2013,std_amt_txn_ann_2013,var_amt_txn_ann_2013,sem_amt_txn_ann_2013,max_amt_txn_ann_2013,min_amt_txn_ann_2013,count_amt_txn_ann_2013,sum_amt_txn_ann_2014,mean_amt_txn_ann_2014,std_amt_txn_ann_2014,var_amt_txn_ann_2014,sem_amt_txn_ann_2014,max_amt_txn_ann_2014,min_amt_txn_ann_2014,count_amt_txn_ann_2014,sum_amt_txn_ann_2015,mean_amt_txn_ann_2015,std_amt_txn_ann_2015,var_amt_txn_ann_2015,sem_amt_txn_ann_2015,max_amt_txn_ann_2015,min_amt_txn_ann_2015,count_amt_txn_ann_2015
CS1664,244.0,81.33333333333333,14.433756729740644,208.3333333333333,8.333333333333334,98.0,73.0,3.0,846.0,76.9090909090909,18.50110561806805,342.2909090909092,5.578293231078914,105.0,49.0,11.0,339.0,84.75,16.235249715767644,263.5833333333333,8.117624857883822,100.0,64.0,4.0,374.0,74.8,21.323695739716413,454.7,9.536246641105713,97.0,40.0,5.0,151.0,75.5,16.263455967290593,264.5,11.5,87.0,64.0,2.0
CS1802,143.0,71.5,31.81980515339464,1012.5,22.5,94.0,49.0,2.0,314.0,78.5,6.454972243679028,41.66666666666666,3.227486121839514,86.0,71.0,4.0,807.0,80.7,22.10103064464541,488.4555555555556,6.988959547425893,104.0,35.0,10.0,223.0,55.75,16.194134740701646,262.25,8.097067370350823,72.0,39.0,4.0,426.0,60.85714285714285,19.709799351494947,388.4761904761906,7.449603925005396,96.0,35.0,7.0
CS2282,185.0,61.66666666666666,30.74627348693388,945.3333333333331,17.75136927425913,97.0,41.0,3.0,248.0,62.0,25.46893532652409,648.6666666666666,12.734467663262045,92.0,35.0,4.0,516.0,73.71428571428571,13.524228699070491,182.90476190476195,5.111677973100445,97.0,59.0,7.0,424.0,70.66666666666667,18.790068298616337,353.06666666666666,7.671013260609347,91.0,43.0,6.0,79.0,79.0,,,,79.0,79.0,1.0
CS2412,68.0,68.0,,,,68.0,68.0,1.0,392.0,78.4,23.15815191244759,536.3,10.356640381899911,102.0,50.0,5.0,89.0,44.5,3.5355339059327378,12.5,2.5,47.0,42.0,2.0,480.0,68.57142857142857,14.081396034687552,198.2857142857143,5.322267431484902,86.0,42.0,7.0,55.0,55.0,,,,55.0,55.0,1.0
CS2754,185.0,61.66666666666666,26.025628394590846,677.3333333333334,15.025903559446194,87.0,35.0,3.0,275.0,68.75,23.12826553519884,534.9166666666666,11.56413276759942,101.0,46.0,4.0,544.0,68.0,20.63284482843521,425.7142857142857,7.2948122466781635,102.0,39.0,8.0,545.0,90.83333333333331,10.361788777362076,107.36666666666665,4.230182554505709,104.0,74.0,6.0,89.0,89.0,,,,89.0,89.0,1.0
CS2757,350.0,70.0,26.201145013147805,686.5,11.717508267545622,100.0,42.0,5.0,331.0,66.2,24.345430782797827,592.7,10.88760763437037,101.0,35.0,5.0,451.0,64.42857142857143,19.10372688645807,364.95238095238096,7.220530065152329,93.0,37.0,7.0,311.0,77.75,27.536339626028727,758.25,13.768169813014364,102.0,45.0,4.0,83.0,83.0,,,,83.0,83.0,1.0
CS2811,236.0,59.0,26.166135875720485,684.6666666666666,13.083067937860244,96.0,40.0,4.0,330.0,82.5,11.387127235025815,129.66666666666666,5.693563617512908,96.0,69.0,4.0,270.0,67.5,27.380041392712805,749.6666666666666,13.690020696356402,95.0,42.0,4.0,274.0,54.8,10.329569206893384,106.7,4.619523784980439,68.0,41.0,5.0,,,,,,,,
CS3083,233.0,58.25,22.91105992019284,524.9166666666666,11.45552996009642,85.0,35.0,4.0,272.0,68.0,24.61706725018234,606.0,12.30853362509117,91.0,38.0,4.0,239.0,59.75,24.67623688220444,608.9166666666666,12.33811844110222,93.0,41.0,4.0,283.0,70.75,18.83923211457056,354.91666666666674,9.41961605728528,94.0,49.0,4.0,42.0,42.0,,,,42.0,42.0,1.0
CS3128,170.0,85.0,8.48528137423857,72.0,5.999999999999999,91.0,79.0,2.0,271.0,67.75,27.83732506306356,774.9166666666666,13.91866253153178,100.0,37.0,4.0,553.0,61.44444444444444,17.321309932501578,300.02777777777777,5.773769977500526,88.0,38.0,9.0,478.0,59.75,16.429503079173497,269.92857142857144,5.808706519404421,91.0,35.0,8.0,,,,,,,,
CS3155,391.0,78.2,26.752569970004757,715.7,11.964113005150027,105.0,43.0,5.0,503.0,71.85714285714286,13.107613200332231,171.80952380952377,4.9542121156723615,90.0,49.0,7.0,720.0,80.0,21.1482859825566,447.25,7.0494286608522,101.0,38.0,9.0,174.0,58.0,21.16601048851673,448.0,12.220201853215574,82.0,42.0,3.0,70.0,70.0,,,,70.0,70.0,1.0


In [0]:
# Impute the NaN entries when you perform the pivot table function and explain your choice of values.

# Pivot Table Function with Zero-Value Imputation

clnt_annual_aggregations_pt = clnt_annual_aggregations_pt.fillna(value = 0)           # 0-Value Imputation
display(clnt_annual_aggregations_pt)

id_customer,sum_amt_txn_ann_2011,mean_amt_txn_ann_2011,std_amt_txn_ann_2011,var_amt_txn_ann_2011,sem_amt_txn_ann_2011,max_amt_txn_ann_2011,min_amt_txn_ann_2011,count_amt_txn_ann_2011,sum_amt_txn_ann_2012,mean_amt_txn_ann_2012,std_amt_txn_ann_2012,var_amt_txn_ann_2012,sem_amt_txn_ann_2012,max_amt_txn_ann_2012,min_amt_txn_ann_2012,count_amt_txn_ann_2012,sum_amt_txn_ann_2013,mean_amt_txn_ann_2013,std_amt_txn_ann_2013,var_amt_txn_ann_2013,sem_amt_txn_ann_2013,max_amt_txn_ann_2013,min_amt_txn_ann_2013,count_amt_txn_ann_2013,sum_amt_txn_ann_2014,mean_amt_txn_ann_2014,std_amt_txn_ann_2014,var_amt_txn_ann_2014,sem_amt_txn_ann_2014,max_amt_txn_ann_2014,min_amt_txn_ann_2014,count_amt_txn_ann_2014,sum_amt_txn_ann_2015,mean_amt_txn_ann_2015,std_amt_txn_ann_2015,var_amt_txn_ann_2015,sem_amt_txn_ann_2015,max_amt_txn_ann_2015,min_amt_txn_ann_2015,count_amt_txn_ann_2015
CS1664,244,81.33333333333333,14.433756729740644,208.3333333333333,8.333333333333334,98,73,3,846,76.9090909090909,18.50110561806805,342.2909090909092,5.578293231078914,105,49,11,339,84.75,16.235249715767644,263.5833333333333,8.117624857883822,100,64,4,374,74.8,21.323695739716413,454.7,9.536246641105713,97,40,5,151,75.5,16.263455967290593,264.5,11.5,87,64,2
CS1802,143,71.5,31.81980515339464,1012.5,22.5,94,49,2,314,78.5,6.454972243679028,41.66666666666666,3.227486121839514,86,71,4,807,80.7,22.10103064464541,488.4555555555556,6.988959547425893,104,35,10,223,55.75,16.194134740701646,262.25,8.097067370350823,72,39,4,426,60.85714285714285,19.709799351494947,388.4761904761906,7.449603925005396,96,35,7
CS2282,185,61.66666666666666,30.74627348693388,945.3333333333331,17.75136927425913,97,41,3,248,62.0,25.46893532652409,648.6666666666666,12.734467663262045,92,35,4,516,73.71428571428571,13.524228699070491,182.90476190476195,5.111677973100445,97,59,7,424,70.66666666666667,18.790068298616337,353.06666666666666,7.671013260609347,91,43,6,79,79.0,0.0,0.0,0.0,79,79,1
CS2412,68,68.0,0.0,0.0,0.0,68,68,1,392,78.4,23.15815191244759,536.3,10.356640381899911,102,50,5,89,44.5,3.5355339059327378,12.5,2.5,47,42,2,480,68.57142857142857,14.081396034687552,198.2857142857143,5.322267431484902,86,42,7,55,55.0,0.0,0.0,0.0,55,55,1
CS2754,185,61.66666666666666,26.025628394590846,677.3333333333334,15.025903559446194,87,35,3,275,68.75,23.12826553519884,534.9166666666666,11.56413276759942,101,46,4,544,68.0,20.63284482843521,425.7142857142857,7.2948122466781635,102,39,8,545,90.83333333333331,10.361788777362076,107.36666666666665,4.230182554505709,104,74,6,89,89.0,0.0,0.0,0.0,89,89,1
CS2757,350,70.0,26.201145013147805,686.5,11.717508267545622,100,42,5,331,66.2,24.345430782797827,592.7,10.88760763437037,101,35,5,451,64.42857142857143,19.10372688645807,364.95238095238096,7.220530065152329,93,37,7,311,77.75,27.536339626028727,758.25,13.768169813014364,102,45,4,83,83.0,0.0,0.0,0.0,83,83,1
CS2811,236,59.0,26.166135875720485,684.6666666666666,13.083067937860244,96,40,4,330,82.5,11.387127235025815,129.66666666666666,5.693563617512908,96,69,4,270,67.5,27.380041392712805,749.6666666666666,13.690020696356402,95,42,4,274,54.8,10.329569206893384,106.7,4.619523784980439,68,41,5,0,0.0,0.0,0.0,0.0,0,0,0
CS3083,233,58.25,22.91105992019284,524.9166666666666,11.45552996009642,85,35,4,272,68.0,24.61706725018234,606.0,12.30853362509117,91,38,4,239,59.75,24.67623688220444,608.9166666666666,12.33811844110222,93,41,4,283,70.75,18.83923211457056,354.91666666666674,9.41961605728528,94,49,4,42,42.0,0.0,0.0,0.0,42,42,1
CS3128,170,85.0,8.48528137423857,72.0,5.999999999999999,91,79,2,271,67.75,27.83732506306356,774.9166666666666,13.91866253153178,100,37,4,553,61.44444444444444,17.321309932501578,300.02777777777777,5.773769977500526,88,38,9,478,59.75,16.429503079173497,269.92857142857144,5.808706519404421,91,35,8,0,0.0,0.0,0.0,0.0,0,0,0
CS3155,391,78.2,26.752569970004757,715.7,11.964113005150027,105,43,5,503,71.85714285714286,13.107613200332231,171.80952380952377,4.9542121156723615,90,49,7,720,80.0,21.1482859825566,447.25,7.0494286608522,101,38,9,174,58.0,21.16601048851673,448.0,12.220201853215574,82,42,3,70,70.0,0.0,0.0,0.0,70,70,1


*Why 40?*

`There are 5 years (2011 to 2015) and 8 aggregations per year, so 5 × 8 = 40 feature columns (plus the customer ID column).`


*Explain your choice of Imputation*

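`I chose zero-value imputation because these features are transaction sums, counts, maxima and similar aggregations: if no transaction was made, there is nothing to aggregate, so 0 is the natural value.`

`Missing values arise in two cases:`

        no transactions in a year  -> all 8 aggregations are null; a sum and count of 0 are exact, and the other statistics are set to 0 as well
        exactly one transaction    -> std, var and sem are null (sample statistics need at least 2 values); a single amount has no spread, so 0 fits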
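The reshaping and the zero imputation can be sketched in plain Python for a single customer. This minimal sketch assumes the target naming 'ann_txn_amt_<stat>_<YEAR>' requested in step 5 and the five years present in the data; the helper and constant names are hypothetical. It shows where the 40 columns come from (5 years × 8 aggregations) and how both kinds of missing values end up as 0.

```python
import itertools
from typing import Dict, List, Optional

PIVOT_YEARS = [2011, 2012, 2013, 2014, 2015]
AGG_NAMES = ["sum", "mean", "std", "var", "sem", "max", "min", "count"]

# One feature column per (year, aggregation) pair: 5 x 8 = 40
FEATURE_COLS = [f"ann_txn_amt_{a}_{y}" for y, a in itertools.product(PIVOT_YEARS, AGG_NAMES)]


def pivot_customer(long_rows: List[Dict[str, Optional[float]]], fill_value: float = 0.0) -> Dict[str, float]:
    """Turn one customer's long rows ({'YEAR': ..., 'sum': ..., ...}) into one wide row.

    Years without transactions and undefined statistics (None) both get fill_value.
    """
    wide = dict.fromkeys(FEATURE_COLS, fill_value)
    for r in long_rows:
        for a in AGG_NAMES:
            if r.get(a) is not None:
                wide[f"ann_txn_amt_{a}_{r['YEAR']}"] = r[a]
    return wide


# Hypothetical customer whose only transaction is a single 39 in 2015
# (values as in the CS1112 2015 row of the aggregation output)
wide_row = pivot_customer([
    {"YEAR": 2015, "sum": 39, "mean": 39.0, "std": None, "var": None,
     "sem": None, "max": 39, "min": 39, "count": 1},
])
print(len(FEATURE_COLS))
print(wide_row["ann_txn_amt_sum_2015"], wide_row["ann_txn_amt_std_2015"], wide_row["ann_txn_amt_count_2011"])
```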

###### 4. The pivoted object you created is a MultiIndex object with hierarchical indexes. You can see the first level (i.e. 0) in the snapshot above with names 'ann_txn_amt_ave', 'ann_txn_amt_max' (and more as indicated by the ...) and the second level (i.e. 1) with names '2011', '2012', etc. You can confirm the multiple levels of the columns with the following two expressions:

        clnt_annual_aggregations_pivot.columns.nlevels
        clnt_annual_aggregations_pivot.columns

In [0]:
# List of Columns names and Number of Columns

print('The column names are listed below: \n', clnt_annual_aggregations_pt.columns)
print('\nThere are', len(clnt_annual_aggregations_pt.columns), 'columns')
print('\nThere are', clnt_annual_aggregations_pt.count(), 'rows')

The column names are listed below: 
 ['id_customer', 'sum_amt_txn_ann_2011', 'mean_amt_txn_ann_2011', 'std_amt_txn_ann_2011', 'var_amt_txn_ann_2011', 'sem_amt_txn_ann_2011', 'max_amt_txn_ann_2011', 'min_amt_txn_ann_2011', 'count_amt_txn_ann_2011', 'sum_amt_txn_ann_2012', 'mean_amt_txn_ann_2012', 'std_amt_txn_ann_2012', 'var_amt_txn_ann_2012', 'sem_amt_txn_ann_2012', 'max_amt_txn_ann_2012', 'min_amt_txn_ann_2012', 'count_amt_txn_ann_2012', 'sum_amt_txn_ann_2013', 'mean_amt_txn_ann_2013', 'std_amt_txn_ann_2013', 'var_amt_txn_ann_2013', 'sem_amt_txn_ann_2013', 'max_amt_txn_ann_2013', 'min_amt_txn_ann_2013', 'count_amt_txn_ann_2013', 'sum_amt_txn_ann_2014', 'mean_amt_txn_ann_2014', 'std_amt_txn_ann_2014', 'var_amt_txn_ann_2014', 'sem_amt_txn_ann_2014', 'max_amt_txn_ann_2014', 'min_amt_txn_ann_2014', 'count_amt_txn_ann_2014', 'sum_amt_txn_ann_2015', 'mean_amt_txn_ann_2015', 'std_amt_txn_ann_2015', 'var_amt_txn_ann_2015', 'sem_amt_txn_ann_2015', 'max_amt_txn_ann_2015', 'min_amt_txn_ann_2015'

*What are your observations regarding the number of levels and the column names?*

`A PySpark DataFrame has no MultiIndex, so after pivoting there is only a single level of column names. There are 41 columns in total (40 features plus 'id_customer') and 6889 rows, one per customer.`

###### 5. Finally, you want to save the dataframe clnt_annual_aggregations_pivot as an .xlsx file for future use in the machine learning assignment. To do so, you want to remove the two levels in columns and create a single level with column names: 'ann_txn_amt_ave_2011', 'ann_txn_amt_ave_2012', etc. Use the code snippet below prior to saving the dataframe as an Excel file.

      level_0 = clnt_annual_aggregations_pivot.columns.get_level_values(0).astype(str)

      level_1 = clnt_annual_aggregations_pivot.columns.get_level_values(1).astype(str)

      clnt_annual_aggregations_pivot.columns = level_0 + '_' + level_1

*Describe what each line of code in the box does and save the output dataframe as an Excel file annual_features.xlsx. A snapshot of the desired final output is shown below.*

    level_0 = clnt_annual_aggregations_pivot.columns.get_level_values(0).astype(str)
    This extracts the first level of the column labels (the 'ann_txn_amt_...' names), with one entry per column. Since every
    aggregation appears once per year, each name is repeated 5 times. The values are converted to strings and assigned to 'level_0'.

    level_1 = clnt_annual_aggregations_pivot.columns.get_level_values(1).astype(str)
    This does the same for the second level: it extracts the year of each column, converts it to a string so that it can be
    concatenated with level_0, and assigns the result to 'level_1'.

    clnt_annual_aggregations_pivot.columns = level_0 + '_' + level_1
    This concatenates the two string Indexes element-wise with an underscore in between (e.g. 'ann_txn_amt_ave' + '_' + '2011'
    gives 'ann_txn_amt_ave_2011') and assigns the result as the new column labels, so all columns end up in a single level.
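In PySpark the flattening is just a column rename. Below is a minimal sketch of an alternative to the token-reversal rename used above. The hypothetical helper `flatten_pivot_name` moves the year prefix that Spark's pivot produces ('2011_ann_txn_amt_sum') to the end, giving the 'ann_txn_amt_sum_2011' naming requested here, and leaves 'customer_id' unchanged.

```python
def flatten_pivot_name(name: str) -> str:
    """Map a Spark pivot column '<YEAR>_<alias>' to '<alias>_<YEAR>' (hypothetical helper).

    Columns without a numeric prefix, such as 'customer_id', are returned unchanged.
    """
    prefix, sep, rest = name.partition("_")
    if sep and prefix.isdigit():
        return f"{rest}_{prefix}"
    return name


for name in ["customer_id", "2011_ann_txn_amt_sum", "2015_ann_txn_amt_count"]:
    print(name, "->", flatten_pivot_name(name))
```

Applied to the pivot result before any other renaming, e.g. `clnt_annual_aggregations_pt.toDF(*[flatten_pivot_name(c) for c in clnt_annual_aggregations_pt.columns])`, this would also make the later `withColumnRenamed('id_customer', 'customer_id')` unnecessary.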

In [0]:
# Since the purpose of Step 5 (single-level column names) was already achieved in Step 4, the dataframe is only renamed and sorted before saving.
# This step would also render Step 6 meaningless since we are using PySpark instead of pandas.

annual_features = clnt_annual_aggregations_pt # Renamed
annual_features = annual_features.withColumnRenamed('id_customer', 'customer_id')
annual_features = annual_features.sort(annual_features.customer_id.asc())
display(annual_features)

customer_id,sum_amt_txn_ann_2011,mean_amt_txn_ann_2011,std_amt_txn_ann_2011,var_amt_txn_ann_2011,sem_amt_txn_ann_2011,max_amt_txn_ann_2011,min_amt_txn_ann_2011,count_amt_txn_ann_2011,sum_amt_txn_ann_2012,mean_amt_txn_ann_2012,std_amt_txn_ann_2012,var_amt_txn_ann_2012,sem_amt_txn_ann_2012,max_amt_txn_ann_2012,min_amt_txn_ann_2012,count_amt_txn_ann_2012,sum_amt_txn_ann_2013,mean_amt_txn_ann_2013,std_amt_txn_ann_2013,var_amt_txn_ann_2013,sem_amt_txn_ann_2013,max_amt_txn_ann_2013,min_amt_txn_ann_2013,count_amt_txn_ann_2013,sum_amt_txn_ann_2014,mean_amt_txn_ann_2014,std_amt_txn_ann_2014,var_amt_txn_ann_2014,sem_amt_txn_ann_2014,max_amt_txn_ann_2014,min_amt_txn_ann_2014,count_amt_txn_ann_2014,sum_amt_txn_ann_2015,mean_amt_txn_ann_2015,std_amt_txn_ann_2015,var_amt_txn_ann_2015,sem_amt_txn_ann_2015,max_amt_txn_ann_2015,min_amt_txn_ann_2015,count_amt_txn_ann_2015
CS1112,212,70.66666666666667,22.03028218914441,485.33333333333337,12.719189352225944,96,56,3,337,67.4,12.720062892926276,161.8,5.688585061331157,81,52,5,212,70.66666666666667,34.50120770833006,1190.3333333333333,19.91928155777155,105,36,3,212,70.66666666666667,16.862186493255653,284.33333333333337,9.735387911006825,90,59,3,39,39.0,0.0,0.0,0.0,39,39,1
CS1113,244,81.33333333333333,21.07921567168317,444.33333333333337,12.170090842352456,94,57,3,374,74.8,17.035257556021865,290.20000000000005,7.618398781896364,95,51,5,426,85.2,13.0843417870369,171.2,5.851495535331117,97,65,5,226,56.5,27.958302285129307,781.6666666666666,13.979151142564652,97,36,4,220,73.33333333333333,27.30079363925769,745.3333333333334,15.762120556715852,98,44,3
CS1114,426,85.2,9.731392500562292,94.70000000000005,4.352011029397789,97,76,5,150,75.0,31.11269837220809,968.0,22.0,97,53,2,352,70.4,24.88573888796553,619.3,11.129240764760189,105,43,5,425,70.83333333333333,26.947479783212877,726.1666666666666,11.001262553806166,95,37,6,79,79.0,0.0,0.0,0.0,79,79,1
CS1115,261,87.0,15.0,225.0,8.660254037844387,102,72,3,473,67.57142857142857,21.84686966788868,477.2857142857143,8.257340580924813,104,41,7,557,79.57142857142857,10.533393610436333,110.95238095238098,3.9812485649673297,94,66,7,313,78.25,22.632940595512547,512.2499999999999,11.316470297756274,98,50,4,55,55.0,0.0,0.0,0.0,55,55,1
CS1116,235,58.75,21.203380233664003,449.5833333333333,10.601690116832,87,41,4,230,76.66666666666667,30.138568866708543,908.3333333333334,17.400510848184254,105,45,3,59,59.0,0.0,0.0,0.0,59,59,1,333,66.6,25.234896472940004,636.8,11.285388783732706,96,40,5,0,0.0,0.0,0.0,0.0,0,0,0
CS1117,213,71.0,26.057628441590765,679.0,15.044378795195678,98,46,3,196,65.33333333333333,32.715949219506584,1070.3333333333337,18.88856208867625,100,35,3,347,69.4,17.38677658451963,302.30000000000007,7.775602870517502,94,49,5,429,71.5,15.871357849913158,251.9,6.47945470956726,94,56,6,0,0.0,0.0,0.0,0.0,0,0,0
CS1118,124,62.0,26.87005768508881,722.0,19.0,81,43,2,63,63.0,0.0,0.0,0.0,63,63,1,144,72.0,7.071067811865476,50.0,5.0,77,67,2,620,68.88888888888889,23.5661008890124,555.3611111111111,7.855366963004133,101,41,9,60,60.0,0.0,0.0,0.0,60,60,1
CS1119,0,0.0,0.0,0.0,0.0,0,0,0,411,82.2,27.30750812505601,745.7,12.212288892750612,102,35,5,239,79.66666666666667,26.85764943797825,721.3333333333334,15.506271132817345,99,49,3,449,74.83333333333333,21.5445275340785,464.1666666666666,8.795516534639175,93,36,6,59,59.0,0.0,0.0,0.0,59,59,1
CS1120,410,82.0,22.56102834535696,509.0,10.08959860450355,105,53,5,352,70.4,19.37266114915553,375.3,8.663717446916191,96,50,5,508,72.57142857142857,23.387013083170345,546.9523809523808,8.839460075240382,100,45,7,75,75.0,0.0,0.0,0.0,75,75,1,332,55.333333333333336,15.819818793736758,250.2666666666667,6.458413977991124,84,41,6
CS1121,288,72.0,32.03123475609393,1026.0,16.015617378046965,102,40,4,613,61.3,22.652201266592662,513.1222222222223,7.163255001898384,104,35,10,283,56.6,20.971409108593537,439.8,9.378699270154684,88,38,5,303,50.5,12.973048986263793,168.30000000000004,5.296225070746145,73,38,6,37,37.0,0.0,0.0,0.0,37,37,1


In [0]:
# Save as CSV
# annual_features.toPandas().to_csv('annual_features.csv', index = False)

###### 6. What are the possible disadvantages in capturing client transaction behavior with the annual features described in this section (if any)?

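Disadvantages of capturing client transaction behavior with annual features:
• They hide timing within the year. A yearly total cannot show how a client behaved month by month in, say, 2015, or whether the client's activity is recent.
• The first and last years are incomplete. The transactions run from 2011-05-16 to 2015-03-16, so the 2011 and 2015 features cover only part of a year and are not directly comparable with 2012-2014. For example, a 2015 count of 0 (as for CS1116 and CS1117 above) may only reflect the short window.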
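One way to see the partial-year problem is to count how many months each calendar year actually contributes. The plain-Python sketch below assumes the first and last transaction dates reported in section 1.4 (2011-05-16 and 2015-03-16). The helper `months_observed` is hypothetical.

```python
from datetime import date

# Assumed data range: first and last txn_date as reported in section 1.4.
FIRST_TXN = date(2011, 5, 16)
LAST_TXN = date(2015, 3, 16)


def months_observed(year: int, start: date, end: date) -> int:
    """Hypothetical helper: calendar months of `year` that fall inside [start, end]."""
    if year < start.year or year > end.year:
        return 0
    first_month = start.month if year == start.year else 1
    last_month = end.month if year == end.year else 12
    return last_month - first_month + 1


coverage = {year: months_observed(year, FIRST_TXN, LAST_TXN)
            for year in range(FIRST_TXN.year, LAST_TXN.year + 1)}
print(coverage)                # {2011: 8, 2012: 12, 2013: 12, 2014: 12, 2015: 3}
print(sum(coverage.values()))  # 47
```

The five yearly counts add up to 47, the number of month-end dates found in section 1.4. One hedged way to make 2011 and 2015 comparable with the full years is to divide the annual sums and counts by the number of months observed.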

##### 1.3 Create monthly aggregations
Here, you want to explore the monthly sum of amounts and count of client transactions.

###### 1. Create the dataframe that captures the monthly sum and count of transactions per client (name it clnt_monthly_aggregations). Use the `groupby` function with the `Named Aggregation` feature which was introduced in pandas v0.25.0. Make sure that you name the columns as shown in the figure sample on the right.

In [0]:
txn.show(5)

+-----------+-----------+----------+----------+----+
|customer_id|tran_amount|  txn_date|     ME_DT|YEAR|
+-----------+-----------+----------+----------+----+
|     CS5295|         35|2013-02-11|2013-02-28|2013|
|     CS4768|         39|2015-03-15|2015-03-31|2015|
|     CS2122|         52|2013-02-26|2013-02-28|2013|
|     CS1217|         99|2011-11-16|2011-11-30|2011|
|     CS1850|         78|2013-11-20|2013-11-30|2013|
+-----------+-----------+----------+----------+----+
only showing top 5 rows



In [0]:
clnt_monthly_aggregations = txn.groupby(['customer_id', 'ME_DT']).agg(sum(txn.tran_amount).alias('mth_txn_amt_sum'),
#                                                                       avg(txn.tran_amount).alias('mth_txn_amt_mean'),
#                                                                       stddev(txn.tran_amount).alias('mth_txn_amt_std'),
#                                                                       variance(txn.tran_amount).alias('mth_txn_amt_var'),
# #                                                                     Standard error of mean (SEM) = stddev/sqrt(sample size)
#                                                                       (stddev(txn.tran_amount)/sqrt(count(txn.tran_amount))).alias('mth_txn_amt_sem'),
#                                                                       max(txn.tran_amount).alias('mth_txn_amt_max'),
#                                                                       min(txn.tran_amount).alias('mth_txn_amt_min'),
                                                                      count(txn.tran_amount).alias('mth_txn_amt_count'))
# Sorting the new dataframe based on customer id and ME_DT
clnt_monthly_aggregations = clnt_monthly_aggregations.sort(clnt_monthly_aggregations.customer_id.asc(), clnt_monthly_aggregations.ME_DT.asc())
display(clnt_monthly_aggregations)
# clnt_monthly_aggregations.show(3)

customer_id,ME_DT,mth_txn_amt_sum,mth_txn_amt_count
CS1112,2011-06-30,56,1
CS1112,2011-08-31,96,1
CS1112,2011-10-31,60,1
CS1112,2012-04-30,56,1
CS1112,2012-06-30,52,1
CS1112,2012-07-31,81,1
CS1112,2012-09-30,72,1
CS1112,2012-12-31,76,1
CS1112,2013-03-31,105,1
CS1112,2013-07-31,36,1
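The commented-out SEM line uses stddev/sqrt(n). Spark's `stddev` and `variance` are the sample versions (aliases of `stddev_samp` and `var_samp`), the same as pandas' `std` and `var` with the default ddof=1.

As a sanity check, the standard-library sketch below recomputes CS1112's 2011 statistics from the three 2011 months shown above. Each of those months has one transaction, so the monthly sums are also the transaction amounts. The results should agree with the `*_amt_txn_ann_2011` values for CS1112 in the annual table earlier.

```python
import math
import statistics

# CS1112's 2011 transaction amounts: one transaction in each of
# 2011-06, 2011-08 and 2011-10, per the monthly rows above.
amounts_2011 = [56, 96, 60]

n = len(amounts_2011)
mean = statistics.mean(amounts_2011)
std = statistics.stdev(amounts_2011)     # sample std (ddof=1), like Spark's stddev
var = statistics.variance(amounts_2011)  # sample variance, like Spark's variance
sem = std / math.sqrt(n)                 # standard error of the mean = std / sqrt(n)

print(sum(amounts_2011), n)  # 212 3
print(mean, std, var, sem)
```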


In [0]:
print('The number of rows is', clnt_monthly_aggregations.count())
print('The number of columns is', len(clnt_monthly_aggregations.columns))

The number of rows is 103234
The number of columns is 4


###### 2. Create a histogram of both columns you created. What are your observations? What are the most common and maximum values for each column? How do they compare with the ones in section 1.2?

The output dataframe should look like the snapshot shown on the right for client with ID CS1112 (confirm this by slicing your output dataframe).

Most clients in this dataset shop a few times a year. For example, the client with 'customer_id' CS1112 shown here made purchases in 15 of the 47 months of data in the txn table. The information in this dataset is "irregular": some clients have an entry for a given month, while others do not (e.g. when they don't shop that month).

In [0]:
# Histogram for the Monthly Sum

display(clnt_monthly_aggregations)

(Histogram of mth_txn_amt_sum; the chart renders only in Databricks.)

In [0]:
# Histogram for the Monthly Count

display(clnt_monthly_aggregations)

(Histogram of mth_txn_amt_count; the chart renders only in Databricks.)

*What are your observations?*

`Both histograms are right-skewed (positively skewed): most of the mass sits at low values, with a long tail to the right. Far more client-months have small amounts and few transactions than large amounts and many transactions.`

*What are the most common and maximum values for each column?*

Most common values:
• SUM: 0-15, 15-90 and 105-200
• COUNT: 1 and 3

Maximum values:
• SUM: 70
• COUNT: 1
            
*How do they compare with the ones in section 1.2?*

`Like the distributions in section 1.2, both are right-skewed: low values are frequent and high values are rare. A client who shops in a given month usually makes a single transaction, typically between $0 and $100. The monthly values are smaller than the annual ones because each annual figure aggregates several months.`
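Reading modes off a histogram only gives a bin range, so it helps to compute the exact most-common and maximum values directly. On the full table this could be done in PySpark with calls such as:
- `clnt_monthly_aggregations.groupBy('mth_txn_amt_sum').count().orderBy(desc('count')).first()` for the mode.
- `clnt_monthly_aggregations.agg(max('mth_txn_amt_sum'))` for the maximum.

The plain-Python sketch below applies the same logic to the ten CS1112 rows displayed earlier. Its numbers describe that sample only, not the whole table.

```python
from collections import Counter

# The ten CS1112 rows displayed above: (mth_txn_amt_sum, mth_txn_amt_count).
rows = [(56, 1), (96, 1), (60, 1), (56, 1), (52, 1),
        (81, 1), (72, 1), (76, 1), (105, 1), (36, 1)]

sums = [s for s, _ in rows]
counts = [c for _, c in rows]

# most_common(1) returns [(value, frequency)] for the most frequent value.
mode_sum, mode_sum_freq = Counter(sums).most_common(1)[0]
mode_cnt, _ = Counter(counts).most_common(1)[0]

print(mode_sum, mode_sum_freq, max(sums))  # 56 2 105
print(mode_cnt, max(counts))               # 1 1
```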

##### 1.4 Create the base_table for the rolling window features

In order to create the rolling window features (more on this in the next section), you need to create a base table with all possible combinations of 'customer_id' and 'ME_DT'. 
For example, customer 'CS1112' should have 47 entries, one for each month: 15 will have a transaction amount and the remaining 32 will have a transaction amount of 0. 
This essentially converts the "irregular" clnt_monthly_aggregations table into a regular table with one row per client per month.

###### 1. Create the numpy array of the unique elements in columns 'customer_id' and 'ME_DT' of the txn table you created in section 1.1. Confirm that you have 6,889 unique clients and 47 unique month-end dates.

In [0]:
txn.show(4)

+-----------+-----------+----------+----------+----+
|customer_id|tran_amount|  txn_date|     ME_DT|YEAR|
+-----------+-----------+----------+----------+----+
|     CS5295|         35|2013-02-11|2013-02-28|2013|
|     CS4768|         39|2015-03-15|2015-03-31|2015|
|     CS2122|         52|2013-02-26|2013-02-28|2013|
|     CS1217|         99|2011-11-16|2011-11-30|2011|
+-----------+-----------+----------+----------+----+
only showing top 4 rows



In [0]:
# Distinct values of the 2 columns, collected to the driver as lists of Row objects

customer_id = txn.select('customer_id').distinct().collect()
me_dt = txn.select('ME_DT').distinct().collect()

# Create numpy arrays from the distinct values.
# Each Row is a one-field tuple, so these arrays have shape (n, 1), not (n,)
customer_id = np.array(customer_id)
me_dt = np.array(me_dt)

print('There are', len(customer_id), 'clients')
print('There are', len(me_dt), 'MEDs')

There are 6889 clients
There are 47 MEDs
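`collect()` returns a list of `Row` objects, and `Row` is a tuple subclass. As a result, `np.array(...)` produces a 2-D array of shape (n, 1) rather than a flat array. This is why `CLNT_NO` and `MONTH` later appear as single-element arrays such as `[CS6001]` and have to be flattened with `concat_ws` before the join.

The sketch below uses a `namedtuple` as a stand-in for `Row` (an assumption made so it runs without Spark) and shows the flat alternative.

```python
from collections import namedtuple

import numpy as np

# Stand-in for pyspark.sql.Row (assumption): a one-field tuple, like the
# rows returned by txn.select('customer_id').distinct().collect().
Row = namedtuple("Row", ["customer_id"])
collected = [Row("CS1112"), Row("CS1113"), Row("CS1114")]

nested = np.array(collected)                         # tuples become rows: 2-D
flat = np.array([r.customer_id for r in collected])  # extract the field: 1-D

print(nested.shape, flat.shape)  # (3, 1) (3,)
```

Extracting the field, as in the `flat` line, avoids the nested arrays and the later flattening step.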


###### 2. Use itertools.product to generate all the possible combinations of 'customer_id' and 'ME_DT'. Itertools is a Python module that iterates over data in a computationally efficient way. You can perform the same task with a for-loop, but the execution may be inefficient. For a brief overview of the Itertools module see here. If you named the numpy arrays with the unique elements `clnt_no` and `me_dt`, then the code below will create an itertools.product object (you can confirm this by running `type(base_table)`).

In [0]:
# Itertools generates all possible combinations of these 2 columns and stores as an itertool product.

base_table = itertools.product(customer_id, me_dt)
type(base_table)

Out[102]: itertools.product
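`itertools.product` yields one tuple per (client, month) pair, so the grid size is simply the product of the two counts. The toy inputs below are hypothetical; the last lines apply the same arithmetic to the 6,889 clients and 47 month-ends.

In PySpark the same grid could be built without leaving Spark, e.g. `txn.select('customer_id').distinct().crossJoin(txn.select('ME_DT').distinct())`. `DataFrame.crossJoin` is available since Spark 2.1. This avoids collecting the IDs to the driver and the round trip through pandas.

```python
import itertools

# Hypothetical toy inputs standing in for the unique client IDs and month-ends.
clients = ["CS1112", "CS1113"]
month_ends = ["2011-05-31", "2011-06-30", "2011-07-31"]

# product() pairs every client with every month-end: 2 x 3 = 6 rows.
base_table = list(itertools.product(clients, month_ends))
print(len(base_table))  # 6
print(base_table[0])    # ('CS1112', '2011-05-31')

# Same arithmetic at full scale: 6,889 clients x 47 month-ends.
expected_rows = 6889 * 47
print(expected_rows)    # 323783
```

An `itertools.product` object can be consumed only once. If the pandas conversion cell is re-run on its own, it receives an exhausted iterator, so re-run this cell first.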

###### 3. Next, you want to convert the itertools.product object `base_table` into a pandas object called `base_table_pd`. To do so, use `pd.DataFrame.from_records` and name the columns 'CLNT_NO' and 'ME_DT'.

In [0]:
# Convert the itertools.product object into a pandas DataFrame.
# The month column is named MONTH instead of ME_DT so it does not clash with ME_DT in clnt_monthly_aggregations during the join in section 1.5.

base_table_pd = pd.DataFrame(base_table,
                             columns = ['CLNT_NO', 'MONTH'])
base_table_pd

Unnamed: 0,CLNT_NO,MONTH
0,[CS6001],[2012-05-31]
1,[CS6001],[2013-03-31]
2,[CS6001],[2012-08-31]
3,[CS6001],[2012-01-31]
4,[CS6001],[2014-02-28]
...,...,...
323778,[CS8076],[2014-08-31]
323779,[CS8076],[2012-10-31]
323780,[CS8076],[2012-12-31]
323781,[CS8076],[2012-03-31]


###### 4. Finally, you want to validate that you created the table you originally wanted. There are two checks you want to perform:

• Filter client CS1112 and confirm that the dates fall between the min and max month-dates you identified in section 1.1. Also, confirm that the snapshot of client CS1112 has 47 rows, one for each month in the dataset.

In [0]:
# Temporary data for client CS1112, who should have 47 entries

CS1112 = base_table_pd[base_table_pd['CLNT_NO'] == 'CS1112']
CS1112 = spark.createDataFrame(CS1112)
print('There are', CS1112.count(), 'entries for CS1112')
CS1112.show(5)
CS1112.printSchema()

There are 47 entries for CS1112
+--------+------------+
| CLNT_NO|       MONTH|
+--------+------------+
|[CS1112]|[2012-05-31]|
|[CS1112]|[2013-03-31]|
|[CS1112]|[2012-08-31]|
|[CS1112]|[2012-01-31]|
|[CS1112]|[2014-02-28]|
+--------+------------+
only showing top 5 rows

root
 |-- CLNT_NO: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- MONTH: array (nullable = true)
 |    |-- element: date (containsNull = true)



In [0]:
# Section 1.1 Min and Max Dates

txn.agg(min('txn_date'), max('txn_date')).show()

# CS1112 Min and Max Dates

CS1112.agg(min('MONTH'), max('MONTH')).show()

+-------------+-------------+
|min(txn_date)|max(txn_date)|
+-------------+-------------+
|   2011-05-16|   2015-03-16|
+-------------+-------------+

+------------+------------+
|  min(MONTH)|  max(MONTH)|
+------------+------------+
|[2011-05-31]|[2015-03-31]|
+------------+------------+
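The MONTH bounds are exactly the month-ends of the first and last transaction dates. This is consistent with ME_DT being the last day of the month of txn_date, as the txn preview suggests (e.g. 2013-02-11 → 2013-02-28). Spark's built-in `last_day` function returns this value.

The standard-library sketch below mirrors it with a hypothetical `month_end` helper.

```python
import calendar
from datetime import date


def month_end(d: date) -> date:
    """Hypothetical helper: last day of d's month (what Spark's last_day returns)."""
    days_in_month = calendar.monthrange(d.year, d.month)[1]
    return date(d.year, d.month, days_in_month)


print(month_end(date(2011, 5, 16)))  # 2011-05-31, the min(MONTH) above
print(month_end(date(2015, 3, 16)))  # 2015-03-31, the max(MONTH) above
print(month_end(date(2012, 2, 10)))  # 2012-02-29 (leap year)
```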



• Confirm that base_table_pd has 323,783 rows, which is the expected number of combinations for 6,889 unique clients and 47 unique month-end dates.

In [0]:
base_table_pd = spark.createDataFrame(base_table_pd)
base_table_pd.show(4)
base_table_pd.printSchema()

print('There are', base_table_pd.count(), 'rows.')

+--------+------------+
| CLNT_NO|       MONTH|
+--------+------------+
|[CS6001]|[2012-05-31]|
|[CS6001]|[2013-03-31]|
|[CS6001]|[2012-08-31]|
|[CS6001]|[2012-01-31]|
+--------+------------+
only showing top 4 rows

root
 |-- CLNT_NO: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- MONTH: array (nullable = true)
 |    |-- element: date (containsNull = true)

There are 323783 rows.


##### 1.5 Create the monthly rolling window features
With base_table_pd as a starting point you can convert the irregular transaction data into typical time series data: data captured at equal intervals. Feature engineering of time series data gives you the potential to build very powerful predictive models.

###### 1. Left-join base_table_pd with the clnt_monthly_aggregations table from section 1.3 on [CLNT_NO, ME_DT] to create the table base_clnt_mth. Comment on the following questions in Markdown:

In [0]:
# Flatten the single-element array<string> and array<date> columns into plain strings so they can be matched against the join keys.
base_table_pd = base_table_pd.withColumn('CLNT_NO', concat_ws(',', 'CLNT_NO')) \
                             .withColumn('MONTH', concat_ws(',', 'MONTH'))
base_table_pd

Out[107]: DataFrame[CLNT_NO: string, MONTH: string]

In [0]:
# Left-join the monthly aggregations onto the base table so that every (client, month) pair is kept.
# pandas would use merge here; Spark DataFrames have no index, so the join keys are given explicitly.

base_clnt_mth = base_table_pd.join(clnt_monthly_aggregations,
                                   on = [base_table_pd['CLNT_NO'] == clnt_monthly_aggregations['customer_id'], base_table_pd['MONTH'] == clnt_monthly_aggregations['ME_DT']],
                                   how = 'left')
base_clnt_mth = base_clnt_mth.drop('customer_id', 'ME_DT')
base_clnt_mth = base_clnt_mth.withColumnRenamed(existing = 'MONTH', new = 'ME_DT')
display(base_clnt_mth)

CLNT_NO,ME_DT,mth_txn_amt_sum,mth_txn_amt_count
CS6001,2012-05-31,,
CS6001,2013-03-31,,
CS6001,2012-08-31,65.0,1.0
CS6001,2012-01-31,,
CS6001,2014-02-28,63.0,1.0
CS6001,2013-05-31,,
CS6001,2011-07-31,,
CS6001,2011-12-31,,
CS6001,2013-02-28,,
CS6001,2011-09-30,,


In [0]:
# 0 Value Imputation of null values

imputed_base_clnt_mth = base_clnt_mth.fillna(value = 0)
display(imputed_base_clnt_mth)

CLNT_NO,ME_DT,mth_txn_amt_sum,mth_txn_amt_count
CS6001,2012-05-31,0,0
CS6001,2013-03-31,0,0
CS6001,2012-08-31,65,1
CS6001,2012-01-31,0,0
CS6001,2014-02-28,63,1
CS6001,2013-05-31,0,0
CS6001,2011-07-31,0,0
CS6001,2011-12-31,0,0
CS6001,2013-02-28,0,0
CS6001,2011-09-30,0,0


*Why do some rows have NaN values?*

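Because the base table holds every (client, month) combination, while clnt_monthly_aggregations only has rows for the months in which a client actually made a transaction. For combinations with no matching row, the left join has nothing to bring in, so the sum and count columns are null (Spark's counterpart of NaN).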

*What values will you choose to impute NaN values in the sum and count columns? Perform the imputation you suggest.*

I chose 0-value imputation. A missing row means the client made no transactions that month, so both the monthly sum and the monthly count are genuinely 0.

In [0]:
print('There are', base_table_pd.count(), 'rows in "base_table_pd"')
print('There are', clnt_monthly_aggregations.count(), 'rows in "clnt_monthly_aggregations"')
print('There are', imputed_base_clnt_mth.count(), 'rows in "base_clnt_mth"')

There are 323783 rows in "base_table_pd"
There are 103234 rows in "clnt_monthly_aggregations"
There are 323783 rows in "base_clnt_mth"


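• Confirm that the number of rows is what you expect. What is the value?

The table has 323,783 rows, the same as base_table_pd. A left join keeps every row of the left table, and because each (customer_id, ME_DT) pair appears at most once in clnt_monthly_aggregations, no row is duplicated.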
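The row-count argument can be illustrated with a toy left join in plain Python. The dictionary plays the role of clnt_monthly_aggregations. Its keys are unique (client, month) pairs, so each base row finds at most one match, and the output has exactly as many rows as the base grid.

The 65 for CS6001 in 2012-08 comes from the rows shown above; the other two months are hypothetical.

```python
# Hypothetical toy base grid for one client (every month is present).
base = [("CS6001", m) for m in ("2012-07-31", "2012-08-31", "2012-09-30")]

# Toy stand-in for clnt_monthly_aggregations: only months with transactions,
# keyed by the unique (client, month) pair -> (amount sum, transaction count).
aggs = {("CS6001", "2012-08-31"): (65, 1)}

# Left join: keep every base row; a missing match becomes None (Spark's null).
joined = [(c, m, *aggs.get((c, m), (None, None))) for c, m in base]

# Zero imputation, like fillna(0): no row means no spend and no transactions.
imputed = [(c, m, 0 if s is None else s, 0 if n is None else n)
           for c, m, s, n in joined]

print(len(base), len(joined))  # 3 3
for row in imputed:
    print(row)
```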

• How are tables base_clnt_mth and clnt_monthly_aggregations different? Comment on the number of rows and the content of each table.

base_clnt_mth has 323,783 rows, one for every client in each of the 47 months. It has nulls (imputed to 0 above) in months without transactions, and its rows are not sorted. clnt_monthly_aggregations has only 103,234 rows, one per client-month in which the client actually transacted. It therefore has no nulls, but it has gaps in time.

This regular, gap-free grid is what makes the rolling window features in the next steps possible.

###### 2. For the next step, the calculation of the rolling window features, you need to sort the data first by 'CLNT_NO' and then by 'ME_DT' in ascending order. This is necessary to create the order for rolling windows, e.g. 2011-05-31, 2011-06-30, etc.

In [0]:
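# Sort the values based on the given column names.
# Spark window functions define their own order through Window.orderBy, so this sort mainly makes the displayed output readable.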

sorted_imputed_base_clnt_mth = imputed_base_clnt_mth.sort(imputed_base_clnt_mth.CLNT_NO.asc(), imputed_base_clnt_mth.ME_DT.asc())
display(sorted_imputed_base_clnt_mth)

CLNT_NO,ME_DT,mth_txn_amt_sum,mth_txn_amt_count
CS1112,2011-05-31,0,0
CS1112,2011-06-30,56,1
CS1112,2011-07-31,0,0
CS1112,2011-08-31,96,1
CS1112,2011-09-30,0,0
CS1112,2011-10-31,60,1
CS1112,2011-11-30,0,0
CS1112,2011-12-31,0,0
CS1112,2012-01-31,0,0
CS1112,2012-02-29,0,0


###### 3. The idea behind rolling window features is captured in the image below. You calculate some statistical properties (e.g. average) based on a window that is sliding. In the image below, the window is 7 which means that the last 7 points are used at every row to calculate the statistical property.

Here, you have to calculate separately the 3, 6 and 12-month rolling window features (tables: rolling_features_3M, rolling_features_6M, rolling_features_12M) for every client, computing the aggregations 'sum', 'mean' and 'max' for both columns 'mth_txn_amt_sum' and 'mth_txn_cnt'. The steps to achieve this with base_clnt_mth as the starting dataframe are:

• groupby the client number
• select the two columns you want to aggregate
• use the rolling function with the appropriate windows
• aggregate with 'sum', 'mean' and 'max'
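The rolling logic can be checked in plain Python on CS1112's first ten months of `mth_txn_amt_sum`, taken from the sorted table above. The hypothetical helper `rolling` computes a statistic over the current row and the `window - 1` rows before it. It returns `None` until the window is full, mirroring pandas' default `min_periods`.

In Spark the same trailing frame can be written as `Window.partitionBy('CLNT_NO').orderBy('ME_DT').rowsBetween(-(n - 1), Window.currentRow)`. This is equivalent to the descending `rowsBetween(0, n - 1)` used in the code below.

```python
from statistics import mean
from typing import Callable, List, Optional, Sequence


def rolling(values: Sequence[float], window: int,
            agg: Callable[[Sequence[float]], float]) -> List[Optional[float]]:
    """Hypothetical helper: trailing-window statistic over the current row and
    the `window - 1` rows before it; None until the window is full
    (like pandas' rolling() with its default min_periods)."""
    out: List[Optional[float]] = []
    for i in range(len(values)):
        if i + 1 < window:
            out.append(None)
        else:
            out.append(agg(values[i - window + 1:i + 1]))
    return out


# CS1112's first ten months of mth_txn_amt_sum (2011-05-31 to 2012-02-29).
amt = [0, 56, 0, 96, 0, 60, 0, 0, 0, 0]

print(rolling(amt, 3, sum))   # [None, None, 56, 152, 96, 156, 60, 60, 0, 0]
print(rolling(amt, 3, max))   # [None, None, 56, 96, 96, 96, 60, 60, 0, 0]
print(rolling(amt, 3, mean))
print(rolling(amt, 6, sum))   # [None, None, None, None, None, 212, 212, 156, 156, 60]
```

The 3-month and 6-month sums match the amt_sum_3M and amt_sum_6M values shown for CS1112 in all_rolling_features further down.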

In [0]:
sorted_imputed_base_clnt_mth.show(5)

+-------+----------+---------------+-----------------+
|CLNT_NO|     ME_DT|mth_txn_amt_sum|mth_txn_amt_count|
+-------+----------+---------------+-----------------+
| CS1112|2011-05-31|              0|                0|
| CS1112|2011-06-30|             56|                1|
| CS1112|2011-07-31|              0|                0|
| CS1112|2011-08-31|             96|                1|
| CS1112|2011-09-30|              0|                0|
+-------+----------+---------------+-----------------+
only showing top 5 rows



In [0]:
# Trailing 3-month window: with ME_DT in descending order, rowsBetween(0, 2) covers the current month and the 2 months before it
window = Window.partitionBy('CLNT_NO').orderBy(desc('ME_DT')).rowsBetween(0, 2)

# Defining the aggregations
# For 'sum' column
sum_mth_txn_amt_sum = sum(sorted_imputed_base_clnt_mth.mth_txn_amt_sum).over(window)
mean_mth_txn_amt_sum = avg(sorted_imputed_base_clnt_mth.mth_txn_amt_sum).over(window)
max_mth_txn_amt_sum = max(sorted_imputed_base_clnt_mth.mth_txn_amt_sum).over(window)
# For 'count' column
sum_mth_txn_amt_cnt = sum(sorted_imputed_base_clnt_mth.mth_txn_amt_count).over(window)
mean_mth_txn_amt_cnt = avg(sorted_imputed_base_clnt_mth.mth_txn_amt_count).over(window)
max_mth_txn_amt_cnt = max(sorted_imputed_base_clnt_mth.mth_txn_amt_count).over(window)

# Adding the columns to a dataframe
rolling_features_3M = sorted_imputed_base_clnt_mth.orderBy('CLNT_NO').select(sorted_imputed_base_clnt_mth.CLNT_NO, sorted_imputed_base_clnt_mth.ME_DT, 
                                                                             sum_mth_txn_amt_sum.alias('amt_sum_3M'),
                                                                             mean_mth_txn_amt_sum.alias('amt_mean_3M'),
                                                                             max_mth_txn_amt_sum.alias('amt_max_3M'),
                                                                             sum_mth_txn_amt_cnt.alias('txn_cnt_sum_3M'),
                                                                             mean_mth_txn_amt_cnt.alias('txn_cnt_mean_3M'),
                                                                             max_mth_txn_amt_cnt.alias('txn_cnt_max_3M'))

# Null out each client's first 2 months, where the 3-month window is not yet full (pandas' rolling(3) returns NaN there)
rolling_features_3M = rolling_features_3M.withColumn('row_no', row_number().over(Window.partitionBy('CLNT_NO').orderBy('ME_DT'))) \
                                         .withColumn('amt_sum_3M', when(col('row_no') <= 2, lit(None)).otherwise(rolling_features_3M.amt_sum_3M)) \
                                         .withColumn('amt_mean_3M', when(col('row_no') <= 2, lit(None)).otherwise(rolling_features_3M.amt_mean_3M)) \
                                         .withColumn('amt_max_3M', when(col('row_no') <= 2, lit(None)).otherwise(rolling_features_3M.amt_max_3M)) \
                                         .withColumn('txn_cnt_sum_3M', when(col('row_no') <= 2, lit(None)).otherwise(rolling_features_3M.txn_cnt_sum_3M)) \
                                         .withColumn('txn_cnt_mean_3M', when(col('row_no') <= 2, lit(None)).otherwise(rolling_features_3M.txn_cnt_mean_3M)) \
                                         .withColumn('txn_cnt_max_3M', when(col('row_no') <= 2, lit(None)).otherwise(rolling_features_3M.txn_cnt_max_3M))

rolling_features_3M = rolling_features_3M.drop('row_no').sort(rolling_features_3M.CLNT_NO.asc(), rolling_features_3M.ME_DT.asc())

rolling_features_3M.show(10)

# Trailing 6-month window: with ME_DT in descending order, rowsBetween(0, 5) covers the current month and the 5 months before it
window = Window.partitionBy('CLNT_NO').orderBy(desc('ME_DT')).rowsBetween(0, 5)

# Defining the aggregations
# For 'sum' column
sum_mth_txn_amt_sum = sum(sorted_imputed_base_clnt_mth.mth_txn_amt_sum).over(window)
mean_mth_txn_amt_sum = avg(sorted_imputed_base_clnt_mth.mth_txn_amt_sum).over(window)
max_mth_txn_amt_sum = max(sorted_imputed_base_clnt_mth.mth_txn_amt_sum).over(window)
# For 'count' column
sum_mth_txn_amt_cnt = sum(sorted_imputed_base_clnt_mth.mth_txn_amt_count).over(window)
mean_mth_txn_amt_cnt = avg(sorted_imputed_base_clnt_mth.mth_txn_amt_count).over(window)
max_mth_txn_amt_cnt = max(sorted_imputed_base_clnt_mth.mth_txn_amt_count).over(window)

# Adding the columns to a dataframe
rolling_features_6M = sorted_imputed_base_clnt_mth.orderBy('CLNT_NO').select(sorted_imputed_base_clnt_mth.CLNT_NO, sorted_imputed_base_clnt_mth.ME_DT, 
                                                                             sum_mth_txn_amt_sum.alias('amt_sum_6M'),
                                                                             mean_mth_txn_amt_sum.alias('amt_mean_6M'),
                                                                             max_mth_txn_amt_sum.alias('amt_max_6M'),
                                                                             sum_mth_txn_amt_cnt.alias('txn_cnt_sum_6M'),
                                                                             mean_mth_txn_amt_cnt.alias('txn_cnt_mean_6M'),
                                                                             max_mth_txn_amt_cnt.alias('txn_cnt_max_6M'))

# Null out each client's first 5 months, where the 6-month window is not yet full (pandas' rolling(6) returns NaN there)
rolling_features_6M = rolling_features_6M.withColumn('row_no', row_number().over(Window.partitionBy('CLNT_NO').orderBy('ME_DT'))) \
                                         .withColumn('amt_sum_6M', when(col('row_no') <= 5, lit(None)).otherwise(rolling_features_6M.amt_sum_6M)) \
                                         .withColumn('amt_mean_6M', when(col('row_no') <= 5, lit(None)).otherwise(rolling_features_6M.amt_mean_6M)) \
                                         .withColumn('amt_max_6M', when(col('row_no') <= 5, lit(None)).otherwise(rolling_features_6M.amt_max_6M)) \
                                         .withColumn('txn_cnt_sum_6M', when(col('row_no') <= 5, lit(None)).otherwise(rolling_features_6M.txn_cnt_sum_6M)) \
                                         .withColumn('txn_cnt_mean_6M', when(col('row_no') <= 5, lit(None)).otherwise(rolling_features_6M.txn_cnt_mean_6M)) \
                                         .withColumn('txn_cnt_max_6M', when(col('row_no') <= 5, lit(None)).otherwise(rolling_features_6M.txn_cnt_max_6M))

rolling_features_6M = rolling_features_6M.drop('row_no').sort(rolling_features_6M.CLNT_NO.asc(), rolling_features_6M.ME_DT.asc())

rolling_features_6M.show(10)

# Trailing 12-month window: with ME_DT in descending order, rowsBetween(0, 11) covers the current month and the 11 months before it
window = Window.partitionBy('CLNT_NO').orderBy(desc('ME_DT')).rowsBetween(0, 11)

# Defining the aggregations
# For 'sum' column
sum_mth_txn_amt_sum = sum(sorted_imputed_base_clnt_mth.mth_txn_amt_sum).over(window)
mean_mth_txn_amt_sum = avg(sorted_imputed_base_clnt_mth.mth_txn_amt_sum).over(window)
max_mth_txn_amt_sum = max(sorted_imputed_base_clnt_mth.mth_txn_amt_sum).over(window)
# For 'count' column
sum_mth_txn_amt_cnt = sum(sorted_imputed_base_clnt_mth.mth_txn_amt_count).over(window)
mean_mth_txn_amt_cnt = avg(sorted_imputed_base_clnt_mth.mth_txn_amt_count).over(window)
max_mth_txn_amt_cnt = max(sorted_imputed_base_clnt_mth.mth_txn_amt_count).over(window)

# Adding the columns to a dataframe
rolling_features_12M = sorted_imputed_base_clnt_mth.orderBy('CLNT_NO').select(sorted_imputed_base_clnt_mth.CLNT_NO, sorted_imputed_base_clnt_mth.ME_DT, 
                                                                             sum_mth_txn_amt_sum.alias('amt_sum_12M'),
                                                                             mean_mth_txn_amt_sum.alias('amt_mean_12M'),
                                                                             max_mth_txn_amt_sum.alias('amt_max_12M'),
                                                                             sum_mth_txn_amt_cnt.alias('txn_cnt_sum_12M'),
                                                                             mean_mth_txn_amt_cnt.alias('txn_cnt_mean_12M'),
                                                                             max_mth_txn_amt_cnt.alias('txn_cnt_max_12M'))

# Null out each client's first 11 months, where the 12-month window is not yet full (pandas' rolling(12) returns NaN there)
rolling_features_12M = rolling_features_12M.withColumn('row_no', row_number().over(Window.partitionBy('CLNT_NO').orderBy('ME_DT'))) \
                                         .withColumn('amt_sum_12M', when(col('row_no') <= 11, lit(None)).otherwise(rolling_features_12M.amt_sum_12M)) \
                                         .withColumn('amt_mean_12M', when(col('row_no') <= 11, lit(None)).otherwise(rolling_features_12M.amt_mean_12M)) \
                                         .withColumn('amt_max_12M', when(col('row_no') <= 11, lit(None)).otherwise(rolling_features_12M.amt_max_12M)) \
                                         .withColumn('txn_cnt_sum_12M', when(col('row_no') <= 11, lit(None)).otherwise(rolling_features_12M.txn_cnt_sum_12M)) \
                                         .withColumn('txn_cnt_mean_12M', when(col('row_no') <= 11, lit(None)).otherwise(rolling_features_12M.txn_cnt_mean_12M)) \
                                         .withColumn('txn_cnt_max_12M', when(col('row_no') <= 11, lit(None)).otherwise(rolling_features_12M.txn_cnt_max_12M))

rolling_features_12M = rolling_features_12M.drop('row_no').sort(rolling_features_12M.CLNT_NO.asc(), rolling_features_12M.ME_DT.asc())

rolling_features_12M.show(15)

+-------+----------+----------+------------------+----------+--------------+------------------+--------------+
|CLNT_NO|     ME_DT|amt_sum_3M|       amt_mean_3M|amt_max_3M|txn_cnt_sum_3M|   txn_cnt_mean_3M|txn_cnt_max_3M|
+-------+----------+----------+------------------+----------+--------------+------------------+--------------+
| CS1112|2011-05-31|      null|              null|      null|          null|              null|          null|
| CS1112|2011-06-30|      null|              null|      null|          null|              null|          null|
| CS1112|2011-07-31|        56|18.666666666666668|        56|             1|0.3333333333333333|             1|
| CS1112|2011-08-31|       152|50.666666666666664|        96|             2|0.6666666666666666|             1|
| CS1112|2011-09-30|        96|              32.0|        96|             1|0.3333333333333333|             1|
| CS1112|2011-10-31|       156|              52.0|        96|             2|0.6666666666666666|             1|

The output of the 3-month rolling window dataframe is shown above. Also, answer the following questions in the .ipynb notebook as Markdown comments.

• How many rows appear with NaN values at the beginning of each client for 3, 6 and 12-month windows, respectively? Why do they appear?

2, 5 and 11 rows, respectively. A rolling window of size n computes each statistic over the current month and the n-1 months before it. For a client's first n-1 months the window is not yet full, so pandas' rolling (whose default min_periods equals the window size) returns NaN there. The PySpark code reproduces this by setting those rows to null (row_no <= 2, 5 and 11).

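• How many levels do the index and columns have? Are these MultiIndex dataframes?

No, the PySpark tables are not MultiIndex. In the pandas version, groupby('CLNT_NO').rolling(...).agg(['sum', 'mean', 'max']) returns a two-level row index (CLNT_NO plus the original row index) and two-level columns (source column, statistic). Spark DataFrames have no index and only a flat list of column names. In the PySpark tables, CLNT_NO and ME_DT are ordinary columns, and each statistic has its own flat column name.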
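For comparison, this pandas sketch reproduces the shape the original pandas code would have produced, on a hypothetical four-month table for CS1112. The values come from the rows above, and the `mth_txn_cnt` name follows the assignment text. The sketch shows the two-level index and columns, and one way to flatten them into the requested names.

```python
import pandas as pd

# Hypothetical four-month slice of base_clnt_mth for one client.
base_clnt_mth = pd.DataFrame({
    "CLNT_NO": ["CS1112"] * 4,
    "ME_DT": pd.to_datetime(["2011-05-31", "2011-06-30", "2011-07-31", "2011-08-31"]),
    "mth_txn_amt_sum": [0, 56, 0, 96],
    "mth_txn_cnt": [0, 1, 0, 1],
})

# Rolling 3-month sum/mean/max per client, as in the original pandas workflow.
rolling_3M = (base_clnt_mth
              .groupby("CLNT_NO")[["mth_txn_amt_sum", "mth_txn_cnt"]]
              .rolling(3)
              .agg(["sum", "mean", "max"]))
print(rolling_3M.index.nlevels, rolling_3M.columns.nlevels)

# Drop the CLNT_NO index level (level 0) and flatten the column MultiIndex.
prefix = {"mth_txn_amt_sum": "amt", "mth_txn_cnt": "txn_cnt"}
flat_3M = rolling_3M.droplevel(0)
flat_3M.columns = [f"{prefix[col]}_{stat}_3M" for col, stat in flat_3M.columns]
print(list(flat_3M.columns))
```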

• Rename the columns as follows: 'amt_sum_3M', 'amt_mean_3M', 'amt_max_3M', 'txn_cnt_sum_3M', 'txn_cnt_mean_3M', 'txn_cnt_max_3M', and follow the same naming convention for 6M and 12M.

No separate renaming step is needed: each aggregation receives its final name through .alias(...) when the column is created.

###### 4. Merge the 4 tables base_clnt_mth, rolling_features_3M, rolling_features_6M and rolling_features_12M into the output all_rolling_features. It is recommended to drop level 0 of the rolling features MultiIndex table and join with base_clnt_mth on the indexes.

Make sure you understand why joining on the indexes preserves the CLNT_NO and ME_DT for each index.

In [0]:
# Join the rolling-feature columns of the other 3 tables onto the base table on the key columns (CLNT_NO, ME_DT); Spark DataFrames have no index to join on.

all_rolling_features = sorted_imputed_base_clnt_mth.join(rolling_features_3M, on = ['CLNT_NO', 'ME_DT'], how = 'left') \
                                                   .join(rolling_features_6M, on = ['CLNT_NO', 'ME_DT'], how = 'left') \
                                                   .join(rolling_features_12M, on = ['CLNT_NO', 'ME_DT'], how = 'left')

all_rolling_features = all_rolling_features.sort(all_rolling_features.CLNT_NO.asc(), all_rolling_features.ME_DT.asc())

display(all_rolling_features)

CLNT_NO,ME_DT,mth_txn_amt_sum,mth_txn_amt_count,amt_sum_3M,amt_mean_3M,amt_max_3M,txn_cnt_sum_3M,txn_cnt_mean_3M,txn_cnt_max_3M,amt_sum_6M,amt_mean_6M,amt_max_6M,txn_cnt_sum_6M,txn_cnt_mean_6M,txn_cnt_max_6M,amt_sum_12M,amt_mean_12M,amt_max_12M,txn_cnt_sum_12M,txn_cnt_mean_12M,txn_cnt_max_12M
CS1112,2011-05-31,0,0,,,,,,,,,,,,,,,,,,
CS1112,2011-06-30,56,1,,,,,,,,,,,,,,,,,,
CS1112,2011-07-31,0,0,56.0,18.666666666666668,56.0,1.0,0.3333333333333333,1.0,,,,,,,,,,,,
CS1112,2011-08-31,96,1,152.0,50.66666666666666,96.0,2.0,0.6666666666666666,1.0,,,,,,,,,,,,
CS1112,2011-09-30,0,0,96.0,32.0,96.0,1.0,0.3333333333333333,1.0,,,,,,,,,,,,
CS1112,2011-10-31,60,1,156.0,52.0,96.0,2.0,0.6666666666666666,1.0,212.0,35.333333333333336,96.0,3.0,0.5,1.0,,,,,,
CS1112,2011-11-30,0,0,60.0,20.0,60.0,1.0,0.3333333333333333,1.0,212.0,35.333333333333336,96.0,3.0,0.5,1.0,,,,,,
CS1112,2011-12-31,0,0,60.0,20.0,60.0,1.0,0.3333333333333333,1.0,156.0,26.0,96.0,2.0,0.3333333333333333,1.0,,,,,,
CS1112,2012-01-31,0,0,0.0,0.0,0.0,0.0,0.0,0.0,156.0,26.0,96.0,2.0,0.3333333333333333,1.0,,,,,,
CS1112,2012-02-29,0,0,0.0,0.0,0.0,0.0,0.0,0.0,60.0,10.0,60.0,1.0,0.1666666666666666,1.0,,,,,,


###### 5. Confirm that your final output all_rolling_features has 323,783 rows and 22 columns and save it as mth_rolling_features.xlsx.

In [0]:
print('There are', all_rolling_features.count(), 'rows in "all_rolling_features".')
print('There are', len(all_rolling_features.columns), 'columns in "all_rolling_features".')

There are 323783 rows in "all_rolling_features".
There are 22 columns in "all_rolling_features".


In [0]:
# Save as csv file
# all_rolling_features.toPandas().to_csv('mth_rolling_features.csv', index=False)

##### 1.6 Date-related features: date of the week

In this section, you will create the date-related features that capture information about the day of the week the transactions were performed.

###### 1. The DatetimeIndex object you used earlier allows you to extract many components of a DateTime object. Here, you want to use the attributes dt.dayofweek and/or dt.day_name() to extract the day of the week from column 'txn_date' of the txn table (with Monday=0, Sunday=6). The expected output below shows both columns.

In [0]:
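# Spark's dayofweek() returns 1 = Sunday ... 7 = Saturday, so subtracting 2 gives Monday = 0 ... Saturday = 5 and Sunday = -1
txn = txn.withColumn('day_of_the_week', dayofweek(txn.txn_date) - 2) \
         .withColumn('day_name', date_format(txn.txn_date, 'EEEE'))

# Move Sunday from -1 to 6 so the scale matches pandas (Monday = 0, Sunday = 6)
txn = txn.withColumn("day_of_the_week", when(txn.day_of_the_week == -1, 6).otherwise(txn.day_of_the_week))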

txn.show(5)

+-----------+-----------+----------+----------+----+---------------+---------+
|customer_id|tran_amount|  txn_date|     ME_DT|YEAR|day_of_the_week| day_name|
+-----------+-----------+----------+----------+----+---------------+---------+
|     CS5295|         35|2013-02-11|2013-02-28|2013|              0|   Monday|
|     CS4768|         39|2015-03-15|2015-03-31|2015|              6|   Sunday|
|     CS2122|         52|2013-02-26|2013-02-28|2013|              1|  Tuesday|
|     CS1217|         99|2011-11-16|2011-11-30|2011|              2|Wednesday|
|     CS1850|         78|2013-11-20|2013-11-30|2013|              2|Wednesday|
+-----------+-----------+----------+----------+----+---------------+---------+
only showing top 5 rows
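Spark's `dayofweek()` counts from 1 = Sunday to 7 = Saturday. pandas' `dt.dayofweek` counts from 0 = Monday to 6 = Sunday. The shift-and-patch in the cell above produces the same mapping as the single expression `(dayofweek + 5) % 7`, which in PySpark would read `(dayofweek(txn.txn_date) + 5) % 7`. The pure-Python check below compares both mappings against `date.weekday()`, which already uses Monday = 0. It emulates Spark's convention with a hypothetical helper, `spark_dayofweek`:

```python
from datetime import date, timedelta


def spark_dayofweek(d: date) -> int:
    """Hypothetical helper emulating Spark's dayofweek(): 1 = Sunday ... 7 = Saturday."""
    return d.isoweekday() % 7 + 1


def shift_and_patch(spark_dow: int) -> int:
    """The notebook's approach: subtract 2, then turn Sunday's -1 into 6."""
    shifted = spark_dow - 2
    return 6 if shifted == -1 else shifted


def monday_zero(spark_dow: int) -> int:
    """Single-expression alternative mapping 1..7 (Sunday first) to 0..6 (Monday first)."""
    return (spark_dow + 5) % 7


# Check two full weeks: both mappings must agree with Python's Monday = 0 weekday()
start = date(2011, 5, 16)
for offset in range(14):
    d = start + timedelta(days=offset)
    s = spark_dayofweek(d)
    assert shift_and_patch(s) == monday_zero(s) == d.weekday()
```

Both forms are equivalent. The modulo version avoids the extra `when` pass, and the notebook's version makes the Sunday special case explicit.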



###### 2. Create the bar plot that shows the count of transactions per day of the week.

In [0]:
Count_of_Transactions_per_day = txn.groupby('day_name').agg(count('tran_amount').alias('Count of Transactions per WeekDay'))
display(Count_of_Transactions_per_day)

day_name,Count of Transactions per WeekDay
Wednesday,18028
Tuesday,18031
Friday,17590
Thursday,17796
Saturday,17929
Monday,17885
Sunday,17741


Output can only be rendered in Databricks
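Spark's `groupby` does not guarantee any row order, so the counts above (and the bars built from them) come out in arbitrary weekday order. One option in PySpark is to group by `['day_of_the_week', 'day_name']` and call `.orderBy('day_of_the_week')` before plotting. Here is a pure-Python sketch of the count-then-order step, using the day names of the five rows shown by `txn.show(5)` as illustrative input:

```python
from collections import Counter

# Day names of the five rows displayed by txn.show(5) above (illustrative only)
day_names = ["Monday", "Sunday", "Tuesday", "Wednesday", "Wednesday"]

# Monday = 0 ... Sunday = 6, matching the day_of_the_week column
WEEK = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]

counts = Counter(day_names)
# Counter returns 0 for missing keys, so every weekday gets a bar, in calendar order
ordered = [(day, counts[day]) for day in WEEK]
print(ordered)
```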

###### 3. Following the same logic as in section 1.2, generate the features that capture the count of transactions per client, year and day of the week. The intermediate MultiIndex dataframe (with nlevels=3) and the final pivoted output with a single index are shown in the snapshots below.
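The groupby-then-pivot pattern reshapes long (client, year, day) counts into one wide row per client. Each wide row has a column per day and year, with zeros for combinations that never occur. The minimal pure-Python sketch below applies the same reshaping to hypothetical clients C1 and C2, using column names that follow the `{day}_trans_count_{year}` pattern of the output below:

```python
from collections import defaultdict

# Hypothetical (customer_id, YEAR, day_name) rows, one per transaction
rows = [
    ("C1", 2011, "Wednesday"),
    ("C1", 2011, "Friday"),
    ("C1", 2012, "Sunday"),
    ("C2", 2012, "Monday"),
    ("C2", 2012, "Monday"),
]
WEEK = ["Sunday", "Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday"]

# Step 1 (groupby + count): long format, one count per (client, year, day)
long_counts = defaultdict(int)
for customer, year, day in rows:
    long_counts[(customer, year, day)] += 1

# Step 2 (pivot): one column per (day, year) combination, ordered year by year
years = sorted({year for _, year, _ in rows})
keys = [(day, year) for year in years for day in WEEK]
columns = [f"{day}_trans_count_{year}" for day, year in keys]

# Step 3 (fillna(0)): combinations without transactions become 0 instead of null
wide = {
    customer: [long_counts.get((customer, year, day), 0) for day, year in keys]
    for customer in sorted({c for c, _, _ in rows})
}
print(columns)
print(wide)
```

With the five years 2011 to 2015 this pattern yields 7 × 5 = 35 count columns, plus the client key.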

In [0]:
# Groupby the 3 columns
annual_day_of_week_counts = txn.groupby(['customer_id', 'YEAR', 'day_name']).agg(count(txn.tran_amount).alias('ann_day_of_week_count'))

# Sorting the new dataframe based on customer id and YEAR
annual_day_of_week_counts = annual_day_of_week_counts.sort(annual_day_of_week_counts.customer_id.asc(), annual_day_of_week_counts.YEAR.asc())

# Pivot day_name into columns (one row per client and year), then pivot YEAR into columns
annual_day_of_week_counts_pt = annual_day_of_week_counts.groupby(['customer_id', 'YEAR']).pivot('day_name').agg(first('ann_day_of_week_count').alias('ann_day_of_week_count'))
annual_day_of_week_counts_pt = annual_day_of_week_counts_pt.groupby('customer_id').pivot('YEAR').agg(first('Sunday').alias('count_trans_Sunday'),
                                                                                                 first('Monday').alias('count_trans_Monday'),
                                                                                                 first('Tuesday').alias('count_trans_Tuesday'),
                                                                                                 first('Wednesday').alias('count_trans_Wednesday'),
                                                                                                 first('Thursday').alias('count_trans_Thursday'),
                                                                                                 first('Friday').alias('count_trans_Friday'),
                                                                                                 first('Saturday').alias('count_trans_Saturday'))

# Reverse the name parts, e.g. '2011_count_trans_Sunday' -> 'Sunday_trans_count_2011' (this also turns 'customer_id' into 'id_customer')
annual_day_of_week_counts_pt = annual_day_of_week_counts_pt.select([col(c).alias('_'.join(c.split('_')[::-1])) for c in annual_day_of_week_counts_pt.columns])
annual_day_of_week_counts_pt = annual_day_of_week_counts_pt.sort(annual_day_of_week_counts_pt.id_customer.asc())

# 0 - Value Imputation
annual_day_of_week_counts_pt = annual_day_of_week_counts_pt.fillna(value = 0)

display(annual_day_of_week_counts_pt)

id_customer,Sunday_trans_count_2011,Monday_trans_count_2011,Tuesday_trans_count_2011,Wednesday_trans_count_2011,Thursday_trans_count_2011,Friday_trans_count_2011,Saturday_trans_count_2011,Sunday_trans_count_2012,Monday_trans_count_2012,Tuesday_trans_count_2012,Wednesday_trans_count_2012,Thursday_trans_count_2012,Friday_trans_count_2012,Saturday_trans_count_2012,Sunday_trans_count_2013,Monday_trans_count_2013,Tuesday_trans_count_2013,Wednesday_trans_count_2013,Thursday_trans_count_2013,Friday_trans_count_2013,Saturday_trans_count_2013,Sunday_trans_count_2014,Monday_trans_count_2014,Tuesday_trans_count_2014,Wednesday_trans_count_2014,Thursday_trans_count_2014,Friday_trans_count_2014,Saturday_trans_count_2014,Sunday_trans_count_2015,Monday_trans_count_2015,Tuesday_trans_count_2015,Wednesday_trans_count_2015,Thursday_trans_count_2015,Friday_trans_count_2015,Saturday_trans_count_2015
CS1112,1,0,0,1,0,1,0,3,0,1,0,0,0,1,0,1,0,1,0,1,0,0,0,1,1,1,0,0,0,0,0,1,0,0,0
CS1113,1,1,0,0,0,1,0,0,1,1,1,0,2,0,0,1,2,1,0,0,1,0,0,3,0,0,0,1,0,2,0,1,0,0,0
CS1114,1,1,0,2,1,0,0,1,0,0,0,0,0,1,1,1,0,1,2,0,0,1,1,1,1,0,1,1,0,0,0,0,1,0,0
CS1115,1,0,0,1,0,0,1,2,1,1,1,1,1,0,0,1,0,2,2,0,2,0,0,0,2,2,0,0,0,0,0,0,1,0,0
CS1116,0,1,1,0,1,1,0,0,0,1,0,1,1,0,0,0,0,0,0,1,0,1,1,0,0,0,3,0,0,0,0,0,0,0,0
CS1117,0,0,0,1,1,1,0,0,1,0,1,0,1,0,3,2,0,0,0,0,0,1,0,2,1,1,0,1,0,0,0,0,0,0,0
CS1118,0,0,0,2,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,1,0,3,2,2,1,0,1,0,0,0,0,0,0,0,1
CS1119,0,0,0,0,0,0,0,1,0,1,2,1,0,0,0,0,1,1,1,0,0,0,1,0,2,0,2,1,0,0,0,0,1,0,0
CS1120,0,1,0,0,2,1,1,1,0,1,1,1,1,0,0,1,2,1,1,1,1,0,1,0,0,0,0,0,0,1,0,0,0,3,2
CS1121,0,1,1,0,2,0,0,4,1,1,1,1,1,1,0,0,1,0,0,4,0,2,1,1,0,2,0,0,0,0,1,0,0,0,0


###### 4. Confirm that your output has the same number of rows as the final output in section 1.2 and save it as `annual_day_of_week_counts_pivot.xlsx`. How many features/columns did you create in this section?

In [0]:
# List of Columns names and Number of Columns

print('The column names are listed below: \n', annual_day_of_week_counts_pt.columns)
print('\nThere are', len(annual_day_of_week_counts_pt.columns), 'columns')
print('\nThere are', annual_day_of_week_counts_pt.count(), 'rows in "annual_day_of_week_counts_pt"')
print('\nThere are', clnt_annual_aggregations_pt.count(), 'rows in "clnt_annual_aggregations_pt"')

The column names are listed below: 
 ['id_customer', 'Sunday_trans_count_2011', 'Monday_trans_count_2011', 'Tuesday_trans_count_2011', 'Wednesday_trans_count_2011', 'Thursday_trans_count_2011', 'Friday_trans_count_2011', 'Saturday_trans_count_2011', 'Sunday_trans_count_2012', 'Monday_trans_count_2012', 'Tuesday_trans_count_2012', 'Wednesday_trans_count_2012', 'Thursday_trans_count_2012', 'Friday_trans_count_2012', 'Saturday_trans_count_2012', 'Sunday_trans_count_2013', 'Monday_trans_count_2013', 'Tuesday_trans_count_2013', 'Wednesday_trans_count_2013', 'Thursday_trans_count_2013', 'Friday_trans_count_2013', 'Saturday_trans_count_2013', 'Sunday_trans_count_2014', 'Monday_trans_count_2014', 'Tuesday_trans_count_2014', 'Wednesday_trans_count_2014', 'Thursday_trans_count_2014', 'Friday_trans_count_2014', 'Saturday_trans_count_2014', 'Sunday_trans_count_2015', 'Monday_trans_count_2015', 'Tuesday_trans_count_2015', 'Wednesday_trans_count_2015', 'Thursday_trans_count_2015', 'Friday_trans_coun

In [0]:
# Save as CSV
# annual_day_of_week_counts_pt.toPandas().to_csv('annual_day_of_week_counts_pivot.csv', index=False)

###### 5. Similarly, generate the features that capture the count of transactions per client, month-end date and day of the week. In contrast with the annual pivot table in the previous step, here you want to create the pivot with ['customer_id', 'ME_DT'] as index to obtain the following output dataframe.

In [0]:
# Groupby the 3 columns
annual_ME_DT_counts = txn.groupby(['customer_id', 'ME_DT', 'day_name']).agg(count(txn.tran_amount).alias('ann_day_of_week_count'))

# Sorting the new dataframe based on customer id and ME_DT
annual_ME_DT_counts = annual_ME_DT_counts.sort(annual_ME_DT_counts.customer_id.asc(), annual_ME_DT_counts.ME_DT.asc())

# Pivot day_name into columns: one row per client and month-end date
annual_ME_DT_counts_pt = annual_ME_DT_counts.groupby(['customer_id', 'ME_DT']).pivot('day_name').agg(first('ann_day_of_week_count').alias('ann_day_of_week_count'))
annual_ME_DT_counts_pt = annual_ME_DT_counts_pt.withColumnRenamed('Sunday', 'trans_count_Sunday') \
                                               .withColumnRenamed('Monday', 'trans_count_Monday') \
                                               .withColumnRenamed('Tuesday', 'trans_count_Tuesday') \
                                               .withColumnRenamed('Wednesday', 'trans_count_Wednesday') \
                                               .withColumnRenamed('Thursday', 'trans_count_Thursday') \
                                               .withColumnRenamed('Friday', 'trans_count_Friday') \
                                               .withColumnRenamed('Saturday', 'trans_count_Saturday')                                           

annual_ME_DT_counts_pt = annual_ME_DT_counts_pt.sort(annual_ME_DT_counts_pt.customer_id.asc())

# 0 - Value Imputation
annual_ME_DT_counts_pt = annual_ME_DT_counts_pt.fillna(value = 0)

display(annual_ME_DT_counts_pt)

customer_id,ME_DT,trans_count_Friday,trans_count_Monday,trans_count_Saturday,trans_count_Sunday,trans_count_Thursday,trans_count_Tuesday,trans_count_Wednesday
CS1112,2011-06-30,0,0,0,0,0,0,1
CS1112,2011-08-31,1,0,0,0,0,0,0
CS1112,2011-10-31,0,0,0,1,0,0,0
CS1112,2012-04-30,0,0,0,1,0,0,0
CS1112,2012-06-30,0,0,0,1,0,0,0
CS1112,2012-07-31,0,0,0,0,0,1,0
CS1112,2012-09-30,0,0,0,1,0,0,0
CS1112,2012-12-31,0,0,1,0,0,0,0
CS1112,2013-03-31,1,0,0,0,0,0,0
CS1112,2013-07-31,0,1,0,0,0,0,0


###### 6. Join with `base_table_pd` as you did in section 1.5 and impute the NaN values with a value of your choice. Save the final output as `mth_day_counts.xlsx`.
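A left join keeps every (client, month-end) row of `base_table_pd`. Months without transactions therefore come back with null counts, and 0 is the natural imputation for a count. The following pure-Python sketch uses hypothetical rows for one client, C1, and takes the day columns from the monthly pivot output above:

```python
# Hypothetical base table: every (client, month-end) pair
base = [("C1", "2011-05-31"), ("C1", "2011-06-30"), ("C1", "2011-07-31")]

# Hypothetical monthly pivot: only months with at least one transaction appear
monthly = {("C1", "2011-06-30"): {"trans_count_Wednesday": 1}}

DAY_COLS = [f"trans_count_{d}" for d in
            ("Friday", "Monday", "Saturday", "Sunday", "Thursday", "Tuesday", "Wednesday")]

joined = []
for client, me_dt in base:
    # Left join: base rows without a match are kept, with no counts attached
    match = monthly.get((client, me_dt), {})
    row = {"CLNT_NO": client, "ME_DT": me_dt}
    for c in DAY_COLS:
        # Impute missing counts with 0, like fillna(value = 0)
        row[c] = match.get(c, 0)
    joined.append(row)
print(joined)
```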

In [0]:
# Merging Tables
mth_day_counts = base_table_pd.join(annual_ME_DT_counts_pt,
                                   on = [base_table_pd['CLNT_NO'] == annual_ME_DT_counts_pt['customer_id'], base_table_pd['MONTH'] == annual_ME_DT_counts_pt['ME_DT']],
                                   how = 'left')

# Dropping unnecessary columns
mth_day_counts = mth_day_counts.drop('customer_id', 'ME_DT')

mth_day_counts = mth_day_counts.withColumnRenamed(existing = 'MONTH', new = 'ME_DT')
mth_day_counts = mth_day_counts.sort(mth_day_counts.CLNT_NO.asc())

# 0 value Imputation
mth_day_counts = mth_day_counts.fillna(value = 0)

display(mth_day_counts)

CLNT_NO,ME_DT,trans_count_Friday,trans_count_Monday,trans_count_Saturday,trans_count_Sunday,trans_count_Thursday,trans_count_Tuesday,trans_count_Wednesday
CS1112,2012-05-31,0,0,0,0,0,0,0
CS1112,2013-03-31,1,0,0,0,0,0,0
CS1112,2012-08-31,0,0,0,0,0,0,0
CS1112,2012-01-31,0,0,0,0,0,0,0
CS1112,2014-02-28,0,0,0,0,0,0,0
CS1112,2013-05-31,0,0,0,0,0,0,0
CS1112,2011-07-31,0,0,0,0,0,0,0
CS1112,2011-12-31,0,0,0,0,0,0,0
CS1112,2013-02-28,0,0,0,0,0,0,0
CS1112,2011-09-30,0,0,0,0,0,0,0


In [0]:
# Save as CSV
# mth_day_counts.toPandas().to_csv('mth_day_counts.csv', index=False)

##### 1.7 Date-related features: days since last transaction

In this date-related features set, you want to capture the frequency of the transactions in terms of the days since the last transaction. This set of features applies only to the monthly features.

###### 1. The starting point is again the txn table. Recall that most clients have a single purchase per month, but some clients have multiple purchases in a month. Since you want to calculate the "days since last transaction", you want to capture the last transaction in a month for every client. Use the appropriate groupby to create the table `last_monthly_purchase` that captures the last 'txn_date' (aggfunc=max) for every client and month.
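Taking the maximum `txn_date` per (client, month-end) keeps only the last purchase of each month, which is what the "days since last transaction" feature needs. The pure-Python sketch below uses hypothetical transactions and a hypothetical helper, `month_end`, that maps a date to its ME_DT:

```python
import calendar
from datetime import date


def month_end(d: date) -> date:
    """Hypothetical helper: last calendar day of d's month (the ME_DT anchor)."""
    return date(d.year, d.month, calendar.monthrange(d.year, d.month)[1])


# Hypothetical transactions: two purchases by C1 in June 2011, one in August
txns = [("C1", date(2011, 6, 3)), ("C1", date(2011, 6, 15)), ("C1", date(2011, 8, 19))]

last_monthly_purchase = {}
for client, txn_date in txns:
    key = (client, month_end(txn_date))
    # Keep the latest txn_date per (client, ME_DT), i.e. aggfunc=max
    if key not in last_monthly_purchase or txn_date > last_monthly_purchase[key]:
        last_monthly_purchase[key] = txn_date
print(last_monthly_purchase)
```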

In [0]:
txn.show(5)

+-----------+-----------+----------+----------+----+---------------+---------+
|customer_id|tran_amount|  txn_date|     ME_DT|YEAR|day_of_the_week| day_name|
+-----------+-----------+----------+----------+----+---------------+---------+
|     CS5295|         35|2013-02-11|2013-02-28|2013|              0|   Monday|
|     CS4768|         39|2015-03-15|2015-03-31|2015|              6|   Sunday|
|     CS2122|         52|2013-02-26|2013-02-28|2013|              1|  Tuesday|
|     CS1217|         99|2011-11-16|2011-11-30|2011|              2|Wednesday|
|     CS1850|         78|2013-11-20|2013-11-30|2013|              2|Wednesday|
+-----------+-----------+----------+----------+----+---------------+---------+
only showing top 5 rows



In [0]:
last_monthly_purchase = txn.groupby(['customer_id', 'ME_DT']).agg(max('txn_date').alias('last_monthly_purchase'))
last_monthly_purchase = last_monthly_purchase.sort(last_monthly_purchase.customer_id.asc(), last_monthly_purchase.ME_DT.asc())

display(last_monthly_purchase)

customer_id,ME_DT,last_monthly_purchase
CS1112,2011-06-30,2011-06-15
CS1112,2011-08-31,2011-08-19
CS1112,2011-10-31,2011-10-02
CS1112,2012-04-30,2012-04-08
CS1112,2012-06-30,2012-06-24
CS1112,2012-07-31,2012-07-03
CS1112,2012-09-30,2012-09-16
CS1112,2012-12-31,2012-12-15
CS1112,2013-03-31,2013-03-01
CS1112,2013-07-31,2013-07-01


###### 2. Join `base_table_pd` with `last_monthly_purchase` as you did in section 1.5. The snapshot below shows the output of the created object `last_monthly_purchase_base` for client CS1112, who made her/his first purchase in June 2011, made no purchase in July, and purchased again in August 2011. What values will you use to impute the NaT values here? NaT stands for "Not a Timestamp".

In [0]:
# Joining and sorting
last_monthly_purchase_base = base_table_pd.join(last_monthly_purchase,
                                                on = [base_table_pd['CLNT_NO'] == last_monthly_purchase['customer_id'], base_table_pd['MONTH'] == last_monthly_purchase['ME_DT']],
                                                how = 'left')
last_monthly_purchase_base = last_monthly_purchase_base.sort(last_monthly_purchase_base.CLNT_NO.asc(), last_monthly_purchase_base.MONTH.asc())

# Dropping unnecessary columns
last_monthly_purchase_base = last_monthly_purchase_base.drop('customer_id', 'ME_DT')
last_monthly_purchase_base = last_monthly_purchase_base.withColumnRenamed('MONTH', 'ME_DT')

display(last_monthly_purchase_base)

CLNT_NO,ME_DT,last_monthly_purchase
CS1112,2011-05-31,
CS1112,2011-06-30,2011-06-15
CS1112,2011-07-31,
CS1112,2011-08-31,2011-08-19
CS1112,2011-09-30,
CS1112,2011-10-31,2011-10-02
CS1112,2011-11-30,
CS1112,2011-12-31,
CS1112,2012-01-31,
CS1112,2012-02-29,


###### 3. To answer the imputation problem, we have to think about what value to use for 'last_monthly_purchase' in, say, July 2011. The answer is that in July the last monthly purchase is the value in the previous row: 2011-06-15. In other words, for every client we want to forward-fill the NaT values.

While the pandas fillna() method has an option to forward-fill, here we want to use apply with a lambda function wrapping the forward-fill function ffill(), i.e. the expression .apply(lambda x: x.ffill()), applied to the object `last_monthly_purchase_base` grouped by `CLNT_NO`. Below, I am showing a snapshot for lines [92:98] that confirms the transition between clients CS1113 and CS1114.

You can also recreate the forward-fill with the fillna() method; however, it has a disadvantage, which is why the .apply() method is preferred here.
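One pitfall of an ungrouped forward-fill is that it runs straight across client boundaries. The pure-Python sketch below contrasts a global forward-fill with one that restarts for every client. It uses hypothetical clients C1 and C2, with None standing in for NaT/null. The `partitionBy('CLNT_NO')` window in the next cell plays the role of the per-client version.

```python
from itertools import groupby

# Hypothetical rows sorted by (CLNT_NO, ME_DT); None plays the role of NaT / null
rows = [
    ("C1", "2011-05-31", None),
    ("C1", "2011-06-30", "2011-06-15"),
    ("C1", "2011-07-31", None),
    ("C2", "2011-05-31", None),
    ("C2", "2011-06-30", "2011-06-20"),
]


def ffill(values):
    """Replace each None with the most recent non-None value seen so far."""
    out, last = [], None
    for v in values:
        last = v if v is not None else last
        out.append(last)
    return out


# Global forward-fill over the whole table: C1's date leaks into C2's first month
global_fill = ffill([r[2] for r in rows])

# Per-client forward-fill (groupby CLNT_NO, then ffill within each group)
per_client = []
for _, group in groupby(rows, key=lambda r: r[0]):
    per_client.extend(ffill([r[2] for r in group]))

print(global_fill)
print(per_client)
```

Only the per-client version leaves C2's first month empty, which matches the NaT values expected before a client's first purchase.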

In [0]:
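# Forward-fill within each client: take the last non-null purchase date from the client's first month up to the current month
# (partitionBy keeps the fill from crossing into the next client's rows)
w_forward = Window.partitionBy('CLNT_NO').orderBy('ME_DT').rowsBetween(Window.unboundedPreceding, Window.currentRow)
last_monthly_purchase_base = last_monthly_purchase_base.withColumn('fill_forward', last('last_monthly_purchase', ignorenulls=True).over(w_forward))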

# Dropping and Renaming
last_monthly_purchase_base = last_monthly_purchase_base.drop('last_monthly_purchase') \
                                                       .withColumnRenamed('fill_forward', 'last_monthly_purchase')

display(last_monthly_purchase_base)

CLNT_NO,ME_DT,last_monthly_purchase
CS1112,2011-05-31,
CS1112,2011-06-30,2011-06-15
CS1112,2011-07-31,2011-06-15
CS1112,2011-08-31,2011-08-19
CS1112,2011-09-30,2011-08-19
CS1112,2011-10-31,2011-10-02
CS1112,2011-11-30,2011-10-02
CS1112,2011-12-31,2011-10-02
CS1112,2012-01-31,2011-10-02
CS1112,2012-02-29,2011-10-02


In [0]:
display(last_monthly_purchase_base.filter(last_monthly_purchase_base.CLNT_NO.between('CS1113', 'CS1114')))

CLNT_NO,ME_DT,last_monthly_purchase
CS1113,2011-05-31,2011-05-27
CS1113,2011-06-30,2011-05-27
CS1113,2011-07-31,2011-07-25
CS1113,2011-08-31,2011-07-25
CS1113,2011-09-30,2011-07-25
CS1113,2011-10-31,2011-10-23
CS1113,2011-11-30,2011-10-23
CS1113,2011-12-31,2011-10-23
CS1113,2012-01-31,2011-10-23
CS1113,2012-02-29,2011-10-23


The forward-fill on the object grouped by `CLNT_NO` is expected to leave NaT values in the first months of every client until they purchase something. The above snapshot confirms that for client CS1114.

###### 4. Subtract the two date columns and convert the output with .dt.days to calculate the column 'days_since_last_txn' as shown in the following snapshot.
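In pandas, subtracting two datetime columns gives a timedelta, and `.dt.days` extracts whole days. Spark's `datediff(end, start)` returns the same day count directly. Here is a minimal pure-Python equivalent for a single row. The helper `days_since` is hypothetical, and the dates are taken from the CS1112 snapshot below:

```python
from datetime import date


def days_since(me_dt: date, last_purchase):
    """Hypothetical helper: days from the last purchase to the month-end; None if no purchase yet."""
    if last_purchase is None:
        return None  # stays null, like datediff on a null input
    return (me_dt - last_purchase).days


print(days_since(date(2011, 7, 31), date(2011, 6, 15)))
print(days_since(date(2012, 2, 29), date(2011, 10, 2)))
```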

In [0]:
# Creates a column with the date difference with the function 'datediff'
last_monthly_purchase_base = last_monthly_purchase_base.withColumn('days_since_last_txn', 
                                                                   datediff(last_monthly_purchase_base.ME_DT, last_monthly_purchase_base.last_monthly_purchase))
display(last_monthly_purchase_base)

# Original pandas equivalent:
# last_monthly_purchase_base['days_since_last_txn'] = last_monthly_purchase_base['ME_DT'] - last_monthly_purchase_base['last_monthly_purchase']
# last_monthly_purchase_base['days_since_last_txn'] = last_monthly_purchase_base['days_since_last_txn'].dt.days

CLNT_NO,ME_DT,last_monthly_purchase,days_since_last_txn
CS1112,2011-05-31,,
CS1112,2011-06-30,2011-06-15,15.0
CS1112,2011-07-31,2011-06-15,46.0
CS1112,2011-08-31,2011-08-19,12.0
CS1112,2011-09-30,2011-08-19,42.0
CS1112,2011-10-31,2011-10-02,29.0
CS1112,2011-11-30,2011-10-02,59.0
CS1112,2011-12-31,2011-10-02,90.0
CS1112,2012-01-31,2011-10-02,121.0
CS1112,2012-02-29,2011-10-02,150.0


###### 5. Plot a histogram of 'days_since_last_txn'. Based on the values you observe in the histogram, impute the remaining NaN values (i.e. for the initial months before a client makes a purchase). Save the columns ['CLNT_NO', 'ME_DT', 'days_since_last_txn'] as `days_since_last_txn.xlsx`.
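A sentinel is only useful if it cannot be confused with a real value. The last purchase always falls on or before the month-end, so 'days_since_last_txn' is never negative, and any negative sentinel (such as the -10 used below) stays distinguishable from a genuine gap. A value above the largest observed gap would be another possible choice. The hypothetical helper `impute_sentinel` sketches this guard in plain Python:

```python
def impute_sentinel(values, sentinel=-10):
    """Hypothetical helper: replace None with a sentinel that lies below every observed value."""
    observed = [v for v in values if v is not None]
    # Refuse a sentinel that could be mistaken for a real "days since last transaction"
    if observed and sentinel >= min(observed):
        raise ValueError("sentinel overlaps the observed range")
    return [sentinel if v is None else v for v in values]


print(impute_sentinel([None, 15, 46, 12]))
```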

In [0]:
display(last_monthly_purchase_base)

CLNT_NO,ME_DT,last_monthly_purchase,days_since_last_txn
CS1112,2011-05-31,,
CS1112,2011-06-30,2011-06-15,15.0
CS1112,2011-07-31,2011-06-15,46.0
CS1112,2011-08-31,2011-08-19,12.0
CS1112,2011-09-30,2011-08-19,42.0
CS1112,2011-10-31,2011-10-02,29.0
CS1112,2011-11-30,2011-10-02,59.0
CS1112,2011-12-31,2011-10-02,90.0
CS1112,2012-01-31,2011-10-02,121.0
CS1112,2012-02-29,2011-10-02,150.0


Output can only be rendered in Databricks

In [0]:
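# Fill the months before a client's first purchase with -10 (days since last transaction is never negative, so -10 stays distinguishable)
last_monthly_purchase_base = last_monthly_purchase_base.fillna(value = -10, subset = ['days_since_last_txn'])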

# Dropped column and created a new dataFrame
days_since_last_txn = last_monthly_purchase_base.drop('last_monthly_purchase')
display(days_since_last_txn)

CLNT_NO,ME_DT,days_since_last_txn
CS1112,2011-05-31,-10
CS1112,2011-06-30,15
CS1112,2011-07-31,46
CS1112,2011-08-31,12
CS1112,2011-09-30,42
CS1112,2011-10-31,29
CS1112,2011-11-30,59
CS1112,2011-12-31,90
CS1112,2012-01-31,121
CS1112,2012-02-29,150


In [0]:
# Save as CSV
# days_since_last_txn.toPandas().to_csv('days_since_last_txn.csv', index=False)