In [1]:
filePath = "/FileStore/tables/GroupData/" #put your own file path if necessary

#Importing the files one by one
def _read_group_csv(file_name):
    """Load one comma-separated data file from ``filePath`` into a Spark DataFrame.

    The first row is treated as the header and column types are inferred.
    No null-value option is set: "NA" markers stay in the data as literal
    strings, which is what the NA-replacement cell further down matches on.

    file_name -- name of the CSV file, appended to ``filePath`` as-is.
    Returns the DataFrame produced by the Spark CSV reader.
    """
    return (
        spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("delimiter", ",")
        .load(filePath + file_name)
    )


# The original Complaints reader also passed .option("0","NA"); "0" is not a
# CSV reader option, so it had no effect and is dropped here. The reader chain
# no longer ends with a dangling line-continuation backslash either.
Complaints = _read_group_csv("BDT2_1920_Complaints.csv")

Delivery = _read_group_csv("BDT2_1920_Delivery.csv")

Subscriptions = _read_group_csv("BDT2_1920_Subscriptions.csv")

Customers = _read_group_csv("BDT2_1920_Customers.csv")

Formula = _read_group_csv("BDT2_1920_Formula.csv")

In [2]:
from pyspark.sql.functions import *

#Replacing NA in Complaints
#NA values are replaced with a meaningful value whenever possible:
#  - an unknown ID takes the value 0
#  - unknown numeric values such as quantities take the value 999
#  - NA values in a string-type column take "NA", "no response" or "no solution", depending on the context

#Replacing NA in Complaints
# Each pair is (column name, value written wherever that column holds the text "NA").
# Rows whose value is anything other than "NA" keep their current value.
complaints_na_fill = [
    ("ProductID", 0),
    ("ProductName", "NA"),
    ("FeedbackTypeID", 0),
    ("FeedbackTypeDesc", "no response"),
    ("SolutionTypeID", 0),
    ("SolutionTypeDesc", "no solution"),
]
for na_column, na_value in complaints_na_fill:
    Complaints = Complaints.withColumn(
        na_column,
        when(Complaints[na_column] == "NA", na_value).otherwise(Complaints[na_column]),
    )

#Replacing NA in Delivery
# Null entries of DeliveryClass are filled with the string "NA"; no other column is touched.
Delivery = Delivery.fillna("NA", subset=["DeliveryClass"])

#Replacing NA in Subscriptions
# "NA" values of NbrMeals_EXCEP are replaced, per NbrMeals_REG category, by the
# (hard-coded) mean NbrMeals_EXCEP of subscriptions in that same category.
# Each pair is (NbrMeals_REG category, replacement NbrMeals_EXCEP).
# Rows in any other NbrMeals_REG category keep their "NA".
excep_fill_by_reg = [(76, 12), (304, 13), (329, 17), (152, 13)]
for reg_meals, excep_fill in excep_fill_by_reg:
    excep_is_na = Subscriptions["NbrMeals_EXCEP"] == "NA"
    reg_matches = Subscriptions["NbrMeals_REG"] == reg_meals
    Subscriptions = Subscriptions.withColumn(
        "NbrMeals_EXCEP",
        when(excep_is_na & reg_matches, excep_fill).otherwise(Subscriptions["NbrMeals_EXCEP"]),
    )

#RenewalDate is turned into a flag: 0 where the raw value is "NA", 1 otherwise
#(so if a client renewed his subscription 6 times, the flags sum to 6)
renewal_is_na = Subscriptions["RenewalDate"] == "NA"
Subscriptions = Subscriptions.withColumn("RenewalDate", when(renewal_is_na, 0).otherwise(1))

#PaymentDate is redundant with PaymentStatus
  #GrossFormulaPrice
  #NetFormulaPrice
  #NbrMealsPrice
  #ProductDiscount
  #FormulaDiscount
  #TotalDiscount
  #TotalPrice
  #TotalCredit
  #All of those are codependent. Maybe after grouping the NbrMeals_REG/EXCEP features, we can replace the NAs by the mean of
  # the category they belong to


In [3]:
#Complaints
# The ID columns that went through the NA replacement are cast to integer.
for id_column in ("ProductID", "SolutionTypeID", "FeedbackTypeID"):
    Complaints = Complaints.withColumn(id_column, Complaints[id_column].cast("integer"))

#Subscriptions
# Same integer cast for the meal count and the renewal flag.
for int_column in ("NbrMeals_EXCEP", "RenewalDate"):
    Subscriptions = Subscriptions.withColumn(int_column, Subscriptions[int_column].cast("integer"))
  #converting timestamps to number of days
#Subscriptions = Subscriptions.withColumn("EndDate",Subscriptions["EndDate"].cast("long")/86400)
#Subscriptions = Subscriptions.withColumn("StartDate",Subscriptions["StartDate"].cast("long")/86400)

In [4]:
#Subscriptions
# SubscriptionDuration: EndDate minus StartDate, each cast to long and divided by
# 86400 (seconds per day) before subtracting, so the result is expressed in days.
SECONDS_PER_DAY = 86400
start_in_days = Subscriptions["StartDate"].cast("long") / SECONDS_PER_DAY
end_in_days = Subscriptions["EndDate"].cast("long") / SECONDS_PER_DAY
Subscriptions = Subscriptions.withColumn("SubscriptionDuration", end_in_days - start_in_days)

# NbrMealsPerDay: regular meals divided by the duration; when the duration is 0
# the meal count itself is used, which avoids dividing by zero.
meals_per_day = when(
    Subscriptions["SubscriptionDuration"] == 0, Subscriptions["NbrMeals_REG"]
).otherwise(Subscriptions["NbrMeals_REG"] / Subscriptions["SubscriptionDuration"])
Subscriptions = Subscriptions.withColumn("NbrMealsPerDay", meals_per_day)

In [5]:
# Preview Subscriptions, now including the derived SubscriptionDuration and NbrMealsPerDay columns.
display(Subscriptions)

SubscriptionID,CustomerID,StartDate,EndDate,NbrMeals_REG,NbrMeals_EXCEP,RenewalDate,PaymentType,PaymentStatus,PaymentDate,FormulaID,GrossFormulaPrice,NetFormulaPrice,NbrMealsPrice,ProductDiscount,FormulaDiscount,TotalDiscount,TotalPrice,TotalCredit,ProductName,SubscriptionDuration,NbrMealsPerDay
627529,775138,2016-11-30T00:00:00.000+0000,2017-02-28T00:00:00.000+0000,76,10,1,BT,Paid,2016-12-01,919,1480.0,1480.0,19.47368,0.0,0.0,0.0,1480.0,0.0,Custom Events,90.0,0.8444444444444444
637001,194809,2016-08-26T00:00:00.000+0000,2017-03-02T00:00:00.000+0000,152,25,1,BT,Paid,2016-08-22,4192,2760.0,1760.0,11.57894,0.0,1000.0,1000.0,1760.0,0.0,Custom Events,188.0,0.8085106382978723
1238870,654824,2018-11-25T00:00:00.000+0000,2019-02-23T00:00:00.000+0000,76,10,1,BT,Paid,2018-11-09,10961,1580.0,1580.0,20.78948,0.0,0.0,0.0,1580.0,0.0,Custom Events,90.0,0.8444444444444444
315743,626815,2016-01-01T00:00:00.000+0000,2016-12-30T00:00:00.000+0000,304,10,1,BT,Paid,2015-12-18,896,4980.0,4980.0,16.38158,0.0,0.0,0.0,4980.0,0.0,Custom Events,364.0,0.8351648351648352
1176762,1016426,2018-08-17T00:00:00.000+0000,2018-09-14T00:00:00.000+0000,25,25,0,BT,Paid,2018-08-13,12867,540.0,300.0,12.0,0.0,240.0,240.0,300.0,0.0,Custom Events,28.0,0.8928571428571429
916472,871676,2017-11-21T00:00:00.000+0000,2018-02-19T00:00:00.000+0000,76,10,1,BT,Paid,2017-11-20,5100,1540.0,1540.0,20.26316,0.0,0.0,0.0,1540.0,0.0,Custom Events,90.0,0.8444444444444444
646275,655981,2016-10-25T00:00:00.000+0000,2016-11-25T00:00:00.000+0000,25,10,1,DD,Paid,2016-10-15,924,520.0,520.0,20.8,0.0,0.0,0.0,520.0,0.0,Custom Events,31.0,0.8064516129032258
752611,704300,2017-03-05T00:00:00.000+0000,2017-04-02T00:00:00.000+0000,25,25,1,BT,Paid,2017-03-24,5389,458.0,458.0,18.32,0.0,0.0,0.0,458.0,0.0,Custom Events,28.0,0.8928571428571429
1079202,684448,2018-05-01T00:00:00.000+0000,2018-06-02T00:00:00.000+0000,25,25,1,DD,Paid,2018-04-21,9466,472.0,472.0,18.88,0.0,0.0,0.0,472.0,0.0,Custom Events,32.0,0.78125
669473,276941,2016-11-17T00:00:00.000+0000,2017-11-17T00:00:00.000+0000,304,25,1,BT,Paid,2016-11-24,891,4980.0,4980.0,16.38158,0.0,0.0,0.0,4980.0,0.0,Custom Events,365.0,0.8328767123287671


In [6]:
# Register Subscriptions as the temp view "subscriptions" so that spark.sql queries can read it.
Subscriptions.createOrReplaceTempView("subscriptions")

In [7]:
# Per-customer subscription summary (one row per CustomerID):
#   - total and mean regular / exceptional meals over the customer's subscriptions
#   - start of the first subscription and end of the last one
#   - number of subscriptions, how many are 'Paid' / 'Not Paid', and the paid share
#   - total and mean subscription duration (SubscriptionDuration is computed in days above)
subscription_summary_sql = """
    select CustomerID,
           sum(NbrMeals_REG) as TotalMeal_REG,
           avg(NbrMeals_REG) as MeanMeal_REGPerSub,
           sum(NbrMeals_EXCEP) as TotalMeal_EXCEP,
           avg(NbrMeals_EXCEP) as MeanMeal_EXCEPPerSub,
           min(StartDate) as FirstSubDate,
           max(EndDate) as EndOfLastSub,
           count(SubscriptionID) as NbrSub,
           SUM(CASE WHEN PaymentStatus='Paid' THEN 1 ELSE 0 END) as SubPaid,
           SUM(CASE WHEN PaymentStatus='Not Paid' THEN 1 ELSE 0 END) as SubNotPaid,
           SUM(CASE WHEN PaymentStatus='Paid' THEN 1 ELSE 0 END)/count(SubscriptionID) as ProportionPaidSub,
           sum(SubscriptionDuration) as NbrDaysSub,
           avg(SubscriptionDuration) as AvgDurationPerSub
    from subscriptions
    group by CustomerID
"""
SubInter = spark.sql(subscription_summary_sql)

# StartDate / EndDate are still timestamps at this point (the conversion to a
# number of days is commented out earlier in the notebook), so min(StartDate)
# and max(EndDate) are already timestamps. The former "* 86400" followed by a
# cast back to timestamp only made sense for day counts and is removed.
display(SubInter)

CustomerID,TotalMeal_REG,MeanMeal_REGPerSub,TotalMeal_EXCEP,MeanMeal_EXCEPPerSub,FirstSubDate,EndOfLastSub,NbrSub,SubPaid,SubNotPaid,ProportionPaidSub,NbrDaysSub,AvgDurationPerSub
258487,785,196.25,55,13.75,2016-05-02T00:00:00.000+0000,2018-08-31T00:00:00.000+0000,4,3,1,0.75,849.0,212.25
671995,1254,25.08,677,13.54,2015-01-02T00:00:00.000+0000,2019-02-14T00:00:00.000+0000,50,50,0,1.0,1452.0,29.04
285977,1519,303.8,70,14.0,2014-01-04T00:00:00.000+0000,2019-02-07T00:00:00.000+0000,5,5,0,1.0,1856.0,371.2
682942,1519,303.8,85,17.0,2014-02-01T00:00:00.000+0000,2019-01-31T00:00:00.000+0000,5,5,0,1.0,1821.0,364.2
104880,1216,76.0,235,14.6875,2015-01-02T00:00:00.000+0000,2018-12-30T00:00:00.000+0000,16,16,0,1.0,1441.0,90.0625
965578,1216,304.0,55,13.75,2014-12-19T00:00:00.000+0000,2018-12-17T00:00:00.000+0000,4,4,0,1.0,1456.0,364.0
829912,684,114.0,65,10.833333333333334,2016-03-19T00:00:00.000+0000,2017-12-31T00:00:00.000+0000,6,5,1,0.8333333333333334,647.0,107.83333333333331
673836,1216,304.0,55,13.75,2015-01-02T00:00:00.000+0000,2019-01-03T00:00:00.000+0000,4,4,0,1.0,1459.0,364.75
659301,1218,304.5,60,15.0,2014-06-23T00:00:00.000+0000,2018-06-24T00:00:00.000+0000,4,4,0,1.0,1458.0,364.5
1012153,304,304.0,25,25.0,2017-03-02T00:00:00.000+0000,2018-03-01T00:00:00.000+0000,1,1,0,1.0,364.0,364.0


In [8]:
# NOTE(review): the DataFrame returned by withColumn is not assigned to anything,
# so SubInter itself is left unchanged by this cell; confirm whether an assignment was intended.
SubInter.withColumn("FirstSubDate", from_unixtime("FirstSubDate"))

In [9]:
#converting timestamps to number of day
#Complaints = Complaints.withColumn("ComplaintDate",Complaints["ComplaintDate"].cast("double")/86400)
# Preview Complaints after the NA replacement and the integer casts of the ID columns.
display(Complaints)

ComplaintID,CustomerID,ProductID,ProductName,ComplaintDate,ComplaintTypeID,ComplaintTypeDesc,SolutionTypeID,SolutionTypeDesc,FeedbackTypeID,FeedbackTypeDesc
38338,143719,8,Grub Maxi (incl. staff),2015-10-03T00:00:00.000+0000,9,other,1,no compensation,0,no response
64191,674586,7,Grub Maxi (incl. staff),2016-06-07T00:00:00.000+0000,1,late delivery,1,no compensation,3,satisfied
141923,644766,6,Grub Flexi (excl. staff),2017-12-22T00:00:00.000+0000,1,late delivery,1,no compensation,3,satisfied
46632,87584,8,Grub Maxi (incl. staff),2015-12-01T00:00:00.000+0000,1,late delivery,1,no compensation,0,no response
115092,655449,2,Grub Mini,2017-05-30T00:00:00.000+0000,1,late delivery,1,no compensation,0,no response
94109,204587,8,Grub Maxi (incl. staff),2017-02-16T00:00:00.000+0000,1,late delivery,1,no compensation,5,not satisfied
124396,194809,0,,2017-08-17T00:00:00.000+0000,1,late delivery,1,no compensation,3,satisfied
54521,32244,8,Grub Maxi (incl. staff),2016-03-08T00:00:00.000+0000,1,late delivery,1,no compensation,4,no response
84923,466728,8,Grub Maxi (incl. staff),2016-12-15T00:00:00.000+0000,1,late delivery,1,no compensation,5,not satisfied
96210,494173,8,Grub Maxi (incl. staff),2017-03-02T00:00:00.000+0000,1,late delivery,1,no compensation,4,no response


In [10]:
# Preview the Formula table as loaded from BDT2_1920_Formula.csv.
display(Formula)

FormulaID,FormulaType,Duration
10020,CAM,3
10023,CAM,3
1003,CAM,12
10036,CAM,3
1004,CAM,12
10054,CAM,3
10055,CAM,3
1009,REG,1
10103,CAM,3
10124,CAM,12


In [11]:
# Register Complaints as the temp view "complaints" so that spark.sql queries can read it.
Complaints.createOrReplaceTempView("complaints")

In [12]:
# Per-customer complaint summary: number of complaints plus the dates of the
# most recent and of the earliest complaint.
complaints_per_customer_sql = (
    "select CustomerID, count(ComplaintID) as NbrComplaints, "
    "max(ComplaintDate) as LastComplaint, min(ComplaintDate) as FirstComplaint "
    "from Complaints group by CustomerID"
)
Intermediary = spark.sql(complaints_per_customer_sql)

# Every complaint row, sorted by customer. This value of Intermediary2 is
# reassigned by the ComplaintsPerMonth query a few cells further down.
complaints_sorted_sql = "select * from Complaints order by CustomerID"
Intermediary2 = spark.sql(complaints_sorted_sql)

display(Intermediary)

CustomerID,NbrComplaints,LastComplaint,FirstComplaint
285977,7,2018-11-27T00:00:00.000+0000,2014-09-18T00:00:00.000+0000
671995,2,2014-10-14T00:00:00.000+0000,2014-03-25T00:00:00.000+0000
466728,1,2016-12-15T00:00:00.000+0000,2016-12-15T00:00:00.000+0000
1012153,3,2018-08-07T00:00:00.000+0000,2018-08-02T00:00:00.000+0000
673836,29,2018-12-24T00:00:00.000+0000,2013-11-14T00:00:00.000+0000
865501,1,2016-02-04T00:00:00.000+0000,2016-02-04T00:00:00.000+0000
802664,1,2013-06-21T00:00:00.000+0000,2013-06-21T00:00:00.000+0000
204587,6,2017-02-19T00:00:00.000+0000,2015-12-14T00:00:00.000+0000
462878,2,2017-03-02T00:00:00.000+0000,2012-04-08T00:00:00.000+0000
67278,2,2018-11-08T00:00:00.000+0000,2018-07-19T00:00:00.000+0000


In [13]:
# Re-express ComplaintDate as a number of months: the value is cast to double and
# divided by 2628000 (365 days * 86400 seconds / 12).
# Caveat: Complaints is overwritten in place, so running this cell a second time
# divides the already-converted values again.
SECONDS_PER_MONTH = 2628000
complaint_date_in_months = Complaints["ComplaintDate"].cast("double") / SECONDS_PER_MONTH
Complaints = Complaints.withColumn("ComplaintDate", complaint_date_in_months)

# Refresh the temp view so that SQL queries see the converted column.
Complaints.createOrReplaceTempView("complaints")

In [14]:
# Complaint rate per customer: number of complaints divided by the span between
# the customer's first and last ComplaintDate (converted to months in the cell above).
# When both dates coincide the span is 0 and the rate comes out empty, as the
# single-complaint customers in the output below show.
complaints_per_month_sql = (
    "select CustomerID,"
    "(count(ComplaintID)/(max(ComplaintDate)-min(ComplaintDate))) as ComplaintsPerMonth "
    "from Complaints group by CustomerID"
)
Intermediary2 = spark.sql(complaints_per_month_sql)

display(Intermediary2)

CustomerID,ComplaintsPerMonth
285977,0.1390703244067057
671995,0.2996715927750424
466728,
1012153,18.25000000000052
673836,0.4727134690961053
865501,
802664,
204587,0.4214780600461899
462878,0.0340040991242779
67278,0.5431547619047673


In [15]:
# Put the complaint summary and the complaint rate side by side, matched on
# CustomerID with a full outer join so that no customer from either side is dropped.
Intermediary3 = Intermediary.join(Intermediary2, "CustomerID", "full")
display(Intermediary3)

CustomerID,NbrComplaints,LastComplaint,FirstComplaint,ComplaintsPerMonth
285977,7,2018-11-27T00:00:00.000+0000,2014-09-18T00:00:00.000+0000,0.1390703244067057
671995,2,2014-10-14T00:00:00.000+0000,2014-03-25T00:00:00.000+0000,0.2996715927750424
466728,1,2016-12-15T00:00:00.000+0000,2016-12-15T00:00:00.000+0000,
673836,29,2018-12-24T00:00:00.000+0000,2013-11-14T00:00:00.000+0000,0.4727134690961053
802664,1,2013-06-21T00:00:00.000+0000,2013-06-21T00:00:00.000+0000,
865501,1,2016-02-04T00:00:00.000+0000,2016-02-04T00:00:00.000+0000,
1012153,3,2018-08-07T00:00:00.000+0000,2018-08-02T00:00:00.000+0000,18.25000000000052
67278,2,2018-11-08T00:00:00.000+0000,2018-07-19T00:00:00.000+0000,0.5431547619047673
204587,6,2017-02-19T00:00:00.000+0000,2015-12-14T00:00:00.000+0000,0.4214780600461899
462878,2,2017-03-02T00:00:00.000+0000,2012-04-08T00:00:00.000+0000,0.0340040991242779


In [16]:
#Base Table
# Full outer join of Customers with the per-customer complaint features on CustomerID.
# Earlier alternatives, kept for reference:
#base = Customers.join(Complaints,on=['CustomerID'],how='full')
#base = Customers.join(Subscriptions,on=['CustomerID'],how='full')
base1 = Customers.join(Intermediary3, "CustomerID", "full")


In [17]:
# Preview the base table: Customers joined with the per-customer complaint features.
display(base1)

CustomerID,Region,StreetID,NbrComplaints,LastComplaint,FirstComplaint,ComplaintsPerMonth
104880,5,45805,0,,,0.0
258487,1,14628,0,,,0.0
285977,1,18415,7,2018-11-27T00:00:00.000+0000,2014-09-18T00:00:00.000+0000,0.1390703244067057
671995,1,28929,2,2014-10-14T00:00:00.000+0000,2014-03-25T00:00:00.000+0000,0.2996715927750424
682942,1,18048,0,,,0.0
829912,5,40317,0,,,0.0
965578,5,45860,0,,,0.0
75070,5,43993,0,,,0.0
107896,5,46836,0,,,0.0
158050,5,41138,0,,,0.0


In [18]:
#Replacing nulls in the base table
# Customers without any complaint get null complaint features from the full join.
# Filling with the number 0 only applies to numeric columns, so the date columns
# (LastComplaint, FirstComplaint) stay empty.
base1 = base1.fillna(0)

#base1 = base1.withColumn("NbrComplaints", when(base1["NbrComplaints"] == "null", 0).otherwise(base1["NbrComplaints"]))\
  #.withColumn("ProductName", when(base1["ProductName"] == "NA", "NA").otherwise(base1["ProductName"]))\
  #.withColumn("FeedbackTypeID", when(base1["FeedbackTypeID"] == "NA", 0).otherwise(base1["FeedbackTypeID"]))\
  #.withColumn("FeedbackTypeDesc", when(base1["FeedbackTypeDesc"] == "NA", "no response").otherwise(base1["FeedbackTypeDesc"]))\
  

In [19]:
#base2 = base1.join(Formula,on=['FormulaID'],how='full') needs Subscriptions
#BaseFinal = base1.join(Delivery,on=['SubscriptionID'],how='full') needs Subscriptions