Takes Ecommerce data from Kaggle:
    1. calculates input metrics for linear regression: 
            Total value of transactions over a longer time - total over a shorter time
            Month (just uses 1 to 12
            Avg spend over longer time vs shorter time
            uses two sets of long time/short time values
    2. Takes the custoemrs with trading activity on >40 days over the 18 months covered
    3. Performs Linear Regression on each to predict trading balance on each day
    4. Collects regression metrics into an output dataframe

In [33]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, IntegerType, StructType, StringType, FloatType
import numpy as np
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext
from pyspark.ml.regression import LinearRegression
df = spark.read.csv('EcommerceData.csv',inferSchema=True,header=True)
#dftemplate = spark.read.csv('aargh.csv',inferSchema=True,header=True)
#dftemplate.show()

+----------+----+----+---------------+----------+---------------+---------------+
|CustomerID|  r2|RMSE|pVal_L1-S1_Tot3|pVal_Month|pVal_L1-S1_Tot5|pVal_L1-S1_Tot6|
+----------+----+----+---------------+----------+---------------+---------------+
|         1|2.01|2.01|           2.01|      2.01|           2.01|           2.01|
+----------+----+----+---------------+----------+---------------+---------------+



In [34]:
#add Year and Day of Year column in order to see transactin volume over 10days
from pyspark.sql.functions import (format_number,dayofmonth,hour,dayofyear,month,
                                   year,weekofyear,date_format,concat, lit, from_unixtime
                                   , unix_timestamp, to_date, sum as sm, format_number as fn
                                  ,bround, avg as av)
from pyspark.sql.types import IntegerType
import sys
from pyspark.sql import Window

# Import VectorAssembler and Vectors for use later on
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinMaxScaler, VectorAssembler

df = df.na.drop(subset='CustomerID')
df = df.withColumn('Date', to_date(from_unixtime(unix_timestamp(
    concat(df['InvoiceDate'],lit('')),format='MM/d/yyyy HH:mm'))))
df = df.withColumn('Unix', unix_timestamp(df['Date'],format='yyyy/MM/dd'))
df = df.withColumn('Val', df['Quantity']* df['Unitprice'])

#remove refunds
fdf = df.filter("Val is not null and Val >= 0 and StockCode <> 'M'")

fdf = fdf.groupBy(['CustomerID','Date', 'Unix']).agg({'Val':'sum'})
fdf = fdf.withColumnRenamed('sum(Val)','Val')
fdf = fdf.withColumn('Val', bround(fdf['Val'],scale=2))

fdf = fdf.withColumn('DayOfYear', dayofyear(fdf['Date']))
fdf = fdf.withColumn('Month', month(fdf['Date']))

#define offset window to capture activity in past 10 days
days = lambda x: x *86400
w = Window.partitionBy(fdf['CustomerID']).orderBy(fdf['Unix']).rangeBetween(-days(28),-days(1))
w2 = Window.partitionBy(fdf['CustomerID']).orderBy(fdf['Unix']).rangeBetween(-days(126),-days(1))

#Calc average daily spend over short 1 and long 1 periods
fdf = fdf.withColumn('S1Tot',bround(sm(fdf['Val']).over(w),scale=2))
fdf = fdf.withColumn('L1Tot',bround(sm(fdf['Val']).over(w2),scale=2))
fdf = fdf.na.fill(0,subset='S1Tot')
fdf = fdf.na.fill(0,subset='L1Tot')
fdf = fdf.withColumn('L1-S1Tot', bround(fdf['L1Tot']-fdf['S1Tot'],2))

#Calc average daily spend over short 1 and long 1 periods
fdf = fdf.withColumn('S1Avg',bround(av(fdf['Val']).over(w),scale=2))
fdf = fdf.withColumn('L1Avg',bround(av(fdf['Val']).over(w2),scale=2))
fdf = fdf.na.fill(0,subset='S1Avg')
fdf = fdf.na.fill(0,subset='L1Avg')
fdf = fdf.withColumn('L1-S1Avg', bround(fdf['L1Avg']-fdf['S1Avg'],2))

w3 = Window.partitionBy(fdf['CustomerID']).orderBy(fdf['Unix']).rangeBetween(-days(7),-days(1))
w4 = Window.partitionBy(fdf['CustomerID']).orderBy(fdf['Unix']).rangeBetween(-days(28),-days(1))

#Calc average daily spend over short 1 and long 1 periods
fdf = fdf.withColumn('S2Avg',bround(av(fdf['Val']).over(w3),scale=2))
fdf = fdf.withColumn('L2Avg',bround(av(fdf['Val']).over(w4),scale=2))
fdf = fdf.na.fill(0,subset='S2Avg')
fdf = fdf.na.fill(0,subset='L2Avg')
fdf = fdf.withColumn('L2-S2Avg', bround(fdf['L2Avg']-fdf['S2Avg'],2))

#Create df containing only customers with >40 transactions
gred = fdf.groupBy('CustomerID', 'Date').count()
gg = gred.groupBy('CustomerID').count().filter('count>39').drop('count')

assembler = VectorAssembler(
    
    inputCols=["L1-S1Tot", "Month","L1-S1Avg","L2-S2Avg"],
    outputCol="features")
#gg.count()B


In [35]:
from pyspark.sql.types import *

#create list to add columns for results to the 'gg' DF
metricCols =['r2', 'RMSE', 'pVal_L1-S1Tot','Month','pVal_L1-S1Avg', 'pVal_L2-S2Avg']

columnlist = ['CustomerID'] + metricCols
datal = [1,2.01,2.01,2.01,2.01,2.01,2.01]

schema1 = [StructField(columnlist[0],IntegerType(),True)
           , StructField(columnlist[1], FloatType(),True)
          , StructField(columnlist[2], FloatType(),True)
          , StructField(columnlist[3], FloatType(),True)
          , StructField(columnlist[4], FloatType(),True)
          , StructField(columnlist[5], FloatType(),True)
          , StructField(columnlist[6], FloatType(),True)
            ]

dataa = sc.parallelize([datal])
schema2 = StructType(fields=schema1)
resultdf = spark.createDataFrame(dataa,schema2)
resultdf.show()

+----------+----+----+-------------+-----+-------------+-------------+
|CustomerID|  r2|RMSE|pVal_L1-S1Tot|Month|pVal_L1-S1Avg|pVal_L2-S2Avg|
+----------+----+----+-------------+-----+-------------+-------------+
|         1|2.01|2.01|         2.01| 2.01|         2.01|         2.01|
+----------+----+----+-------------+-----+-------------+-------------+



In [37]:
#Create list of customers from df containing cust with >40 days of transactions
custList = [x[0] for x in gg.select('CustomerID').collect()]
#iterate through list running Linear Regression Model for each one
for c in custList:
    final = fdf.filter(fdf['CustomerID']==c).withColumnRenamed('Val','Label')
    final = final.select('L1-S1Tot','Month','L1-S1Avg','L2-S2Avg','Label')
    #Assembles vector for input to linear regression model
    output = assembler.transform(final)
    scaler = MinMaxScaler(inputCol='features',outputCol='sFeatures')
    scalerModel = scaler.fit(output)
    output = scalerModel.transform(output)
    #takes just the scaled Features column and labels to make a two column df for input to linear r
    final_data = output.select(['sFeatures', 'Label']).withColumnRenamed('sFeatures','features')
    train_data,test_data = final_data.randomSplit([0.7,0.3])
    # Create a Linear Regression Model object
    lr = LinearRegression(labelCol='Label')
    lrModel = lr.fit(train_data)
    summary = lrModel.summary 
    #creates a list of the regression metrics for the client
    r2 =summary.r2
    RMSE =lrModel.summary.rootMeanSquaredError
    datalist = [(c,r2,RMSE,lrModel.summary.pValues[0]
                 ,lrModel.summary.pValues[1]
                 ,lrModel.summary.pValues[2]
                 ,lrModel.summary.pValues[3])]
    #converts the regression output list to an RDD, then to a df and unions the df to the main results df
    pdatalist = sc.parallelize(datalist)
    raw_df = spark.createDataFrame(datalist,schema2)
    resultdf = resultdf.union(raw_df)

#filters out the literals used to create the inital df and sorts by R Squared
resultdf = resultdf.filter('CustomerID<>1').orderBy('r2',ascending=False)
#Formats to 4 decimal places
for c in resultdf.columns[1:]:
             resultdf = resultdf.withColumn(c,bround(c,4))
resultdf.show()


In [38]:
resultdf = resultdf.filter('CustomerID<>1').orderBy('r2',ascending=False)
for c in resultdf.columns[1:]:
             resultdf = resultdf.withColumn(c,bround(c,4))
resultdf.show()


+----------+------+---------+---------------+----------+---------------+---------------+
|CustomerID|    r2|     RMSE|pVal_L1-S1_Tot3|pVal_Month|pVal_L1-S1_Tot5|pVal_L1-S1_Tot6|
+----------+------+---------+---------------+----------+---------------+---------------+
|     17841|0.5075| 144.6799|         0.1024|    1.0E-4|         0.3395|         0.3949|
|     14527|0.4239|  56.4599|         0.4134|    0.0183|         0.6819|          0.442|
|     15039|0.2872| 195.5123|         0.5953|    0.0229|         0.2834|         0.7516|
|     13798|0.2496| 605.7141|         0.0229|    0.4618|         0.4169|         0.5994|
|     14646|0.1997|5504.3335|         0.8977|    0.7417|         0.0115|         0.8673|
|     14156|0.1454|3813.3564|         0.7905|    0.2849|         0.3052|           0.23|
|     16422|0.1442| 517.9576|         0.6189|    0.5622|         0.3058|         0.0637|
|     14606|0.1313|  49.8456|         0.0212|    0.0216|         0.3328|         0.7299|
|     12748|0.1232| 3

In [30]:
for c in dftemplate = dftemplate.withColumn('r2',bround('r2',3))
dftemplate.orderBy('r2',ascending=False).show()
# Print the coefficients and intercept for linear regression
#print("Coefficients: {} Intercept: {}".format(lrModel.coefficients,lrModel.intercept))

+----------+---+------------------+--------------------+--------------------+--------------------+-------------------+
|CustomerID| r2|              RMSE|     pVal_L1-S1_Tot3|          pVal_Month|     pVal_L1-S1_Tot5|    pVal_L1-S1_Tot6|
+----------+---+------------------+--------------------+--------------------+--------------------+-------------------+
|         1|2.0|              2.01|                2.01|                2.01|                2.01|               2.01|
|     14606|0.0| 51.21707534790039|0.004383583553135395| 0.24977006018161774|  0.6457734107971191| 0.6916141510009766|
|     15311|0.0|    514.8115234375|0.021972138434648514|0.008193759247660637| 0.15857551991939545|  0.935386598110199|
|     14156|0.0| 2388.231689453125|  0.7416616082191467|  0.8343241810798645|   0.752126157283783|0.09927629679441452|
|     14646|0.0|  6070.65380859375| 0.05948681756854057| 0.08887233585119247|0.004989327397197485| 0.7663782238960266|
|     15039|0.0|183.70236206054688|  0.711657762

In [9]:
summary = lrModel.summary
print("R squared:        {:.3f}".format(summary.r2))
print("RMSE:             {:,.0f}".format(lrModel.summary.rootMeanSquaredError))
print("pValue L1-S1Tot:  {:.4f}".format(lrModel.summary.pValues[0]))
print("pValue Month:     {:.4f}".format(lrModel.summary.pValues[1]))
print("pValue L1-S1Avg:  {:.4f}".format(lrModel.summary.pValues[2]))
print("pValue L2-S2Avg:  {:.4f}".format(lrModel.summary.pValues[3]))
rdf = summary.predictions
rdf = rdf.withColumn('Residuals', rdf.Label - rdf.prediction)
rdf.show(10)

R squared:        0.199
RMSE:             51
pValue L1-S1Tot:  0.0019
pValue Month:     0.0137
pValue L1-S1Avg:  0.9158
pValue L2-S2Avg:  0.3117
+--------------------+------+------------------+------------------+
|            features| Label|        prediction|         Residuals|
+--------------------+------+------------------+------------------+
|[0.0,1.0,0.450558...|316.79|174.87918741927427|141.91081258072575|
|[0.0,1.0,0.450558...|109.88|173.41625074663162|-63.53625074663162|
|[0.0,1.0,0.450558...|192.68|172.56201849372488| 20.11798150627513|
|[0.0,1.0,0.450558...|215.88|172.56201849372488| 43.31798150627512|
|[0.0,1.0,0.450558...|108.63| 158.4970247727116| -49.8670247727116|
|[0.09275777647036...|339.68| 193.7762902693422| 145.9037097306578|
|[0.26997732060503...|116.06|202.13398147581205|-86.07398147581205|
|[0.29574785067658...| 166.3| 198.1464756127939|-31.84647561279388|
|[0.29574785067658...|265.18|194.64351470472775| 70.53648529527226|
|[0.29574785067658...|164.87|189.665552

In [10]:
lrModel

LinearRegression_43fcae7f4e9e7953d8a7

In [11]:
print(summary)

<pyspark.ml.regression.LinearRegressionTrainingSummary object at 0x7fb840105eb8>
