In [1]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.feature import VectorAssembler
import matplotlib.pyplot as plt
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from pyspark.sql.functions import first

In [2]:
from pyspark.sql import SparkSession

# Attach to the notebook's Spark session, creating one if none is running yet.
spark = (
    SparkSession.builder
    .getOrCreate()
)

# 5. Linear Regression Analysis

## 5.1 Regression with RF (recency and frequency) metrics

In [5]:
# Base regression table; the cells below use its user, month, recency, frequency and Y columns.
venmo_reg = spark.read.format('parquet').load('/FileStore/tables/venmo_reg.parquet')

In [6]:
# Spark ML estimators read their predictors from a single vector column, so pack
# recency and frequency into 'features'.
vectorAssembler = VectorAssembler(
    inputCols=['recency', 'frequency'],
    outputCol='features',
)
venmo_reg_agg = vectorAssembler.transform(venmo_reg)

In [7]:
# Train and test split: for every lifetime month 0-12, randomly hold out 30% of the
# rows as a test set and write both parts out so the modelling cell can reload them.
# randomSplit gets a fixed seed so that re-running the notebook reproduces the same
# split (and therefore the same MSE figures) instead of drawing a new one each time.
SPLIT_SEED = 42
for i in range(0, 13):
    month_df = venmo_reg_agg[venmo_reg_agg['month'] == i]
    train, test = month_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
    # LinearRegression below is configured with labelCol='label'.
    train_df = train.select(['features', 'Y']).withColumnRenamed('Y', 'label')
    test_df = test.select(['features', 'Y']).withColumnRenamed('Y', 'label')
    train_df.write.mode('overwrite').format("parquet").save('dbfs:/FileStore/tables/train_rf_month'+str(i)+'.parquet')
    test_df.write.mode('overwrite').format("parquet").save('dbfs:/FileStore/tables/test_rf_month'+str(i)+'.parquet')

In [8]:
# For each user's lifetime point (month 0-12), regress recency and frequency on Y:
# tune the regularisation on a train/validation split of that month's training data,
# then record the out-of-sample MSE on the held-out test split.
# Everything that does not depend on the month is built once, outside the loop.
TVS_SEED = 42

lr = LinearRegression(featuresCol='features', labelCol='label', maxIter=10)

# Grid of regularisation settings; TrainValidationSplit fits every combination and
# keeps the one with the best validation score.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# RegressionEvaluator() scores with its default metric (RMSE in Spark ML).
# 80% of the training data is used for fitting and 20% for validation; the seed fixes
# that inner split so the selected hyper-parameters are reproducible across re-runs.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           trainRatio=0.8,
                           seed=TVS_SEED)

evaluator = RegressionEvaluator(metricName="mse")
r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="r2")

MSE_list = []
for i in range(0, 13):
    train_df = spark.read.parquet('/FileStore/tables/train_rf_month'+str(i)+'.parquet')
    test_df = spark.read.parquet('/FileStore/tables/test_rf_month'+str(i)+'.parquet')
    model = tvs.fit(train_df)

    # Out-of-sample mean squared error on the held-out test split.
    lr_predictions = model.transform(test_df)
    MSE = evaluator.evaluate(lr_predictions)
    MSE_list.append(MSE)

    # Report the selected hyper-parameters and the test-set fit for this month.
    # NOTE(review): the tuned values are read through the private _java_obj handle;
    # newer PySpark versions may expose e.g. bestModel.getRegParam() directly - confirm.
    print('month'+str(i))
    print(model.bestModel._java_obj.getRegParam())
    print(model.bestModel._java_obj.getElasticNetParam())
    print("Coefficients: " + str(model.bestModel.coefficients))
    print("R Squared (R2) on test data = %g" % r2_evaluator.evaluate(lr_predictions))

In [9]:
# Out-of-sample MSE per lifetime month for the recency/frequency-only model.
month_list = list(range(13))
MSE_list_RF = MSE_list

fig, ax = plt.subplots()
ax.plot(month_list, MSE_list_RF)
ax.set_xlabel('month of lifetime')
ax.set_ylabel('MSE')
ax.set_title(' MSE for each lifetime point - RF')
ax.grid()
plt.show()

## 5.2 Regression with RF metrics and spending behavior profile metrics

In [11]:
# Spending-profile proportions; joined to venmo_reg on (user, month) in the next cell.
accum_profile_prop = spark.read.format('parquet').load('/FileStore/tables/accum_profile_prop.parquet')
accum_profile_prop.show()

In [12]:
# Keep only user-months that have both RF metrics and a spending profile.
venmo_reg_sb = (
    venmo_reg
    .join(accum_profile_prop, on=['user', 'month'], how='inner')
    .orderBy('user', 'month')
    .cache()
)

In [13]:
# Pack the spending-profile proportions and the RF metrics (in this order) into the
# single 'features' vector column that LinearRegression reads.
vectorAssembler = VectorAssembler(
    inputCols=[
        # spending-behavior profile proportions
        'activity_perc', 'food_perc', 'illegal_perc', 'others_perc', 'people_perc',
        'utility_perc', 'cash_perc', 'event_perc', 'transp_perc', 'travel_perc',
        # RF metrics
        'recency', 'frequency',
    ],
    outputCol='features',
)
venmo_reg_sb_agg = vectorAssembler.transform(venmo_reg_sb)

In [14]:
# Train and test split: for every lifetime month 0-12, randomly hold out 30% of the
# rows as a test set and write both parts out so the modelling cell can reload them.
# randomSplit gets a fixed seed so that re-running the notebook reproduces the same
# split (and therefore the same MSE figures) instead of drawing a new one each time.
SPLIT_SEED = 42
for i in range(0, 13):
    month_df = venmo_reg_sb_agg[venmo_reg_sb_agg['month'] == i]
    train, test = month_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
    # LinearRegression below is configured with labelCol='label'.
    train_df = train.select(['features', 'Y']).withColumnRenamed('Y', 'label')
    test_df = test.select(['features', 'Y']).withColumnRenamed('Y', 'label')
    train_df.write.mode('overwrite').format("parquet").save('dbfs:/FileStore/tables/train_rf_sb_month'+str(i)+'.parquet')
    test_df.write.mode('overwrite').format("parquet").save('dbfs:/FileStore/tables/test_rf_sb_month'+str(i)+'.parquet')

In [15]:
# For each user's lifetime point (month 0-12), regress recency, frequency AND her
# spending behavior profile on Y. The regularisation (and whether to fit an intercept)
# is tuned on a train/validation split of that month's training data. The
# out-of-sample MSE is then recorded on the held-out test split.
# Everything that does not depend on the month is built once, outside the loop.
TVS_SEED = 42

lr = LinearRegression(featuresCol='features', labelCol='label', maxIter=10)

# Grid of settings; TrainValidationSplit fits every combination and keeps the one
# with the best validation score.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# RegressionEvaluator() scores with its default metric (RMSE in Spark ML).
# 80% of the training data is used for fitting and 20% for validation; the seed fixes
# that inner split so the selected hyper-parameters are reproducible across re-runs.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           trainRatio=0.8,
                           seed=TVS_SEED)

evaluator = RegressionEvaluator(metricName="mse")
r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="r2")

MSE_list = []
for i in range(0, 13):
    train_df = spark.read.parquet('/FileStore/tables/train_rf_sb_month'+str(i)+'.parquet')
    test_df = spark.read.parquet('/FileStore/tables/test_rf_sb_month'+str(i)+'.parquet')
    model = tvs.fit(train_df)

    # Out-of-sample mean squared error on the held-out test split.
    lr_predictions = model.transform(test_df)
    MSE = evaluator.evaluate(lr_predictions)
    MSE_list.append(MSE)

    # Report the selected hyper-parameters (including fitIntercept, which this grid
    # also tunes) and the test-set fit for this month.
    # NOTE(review): the tuned values are read through the private _java_obj handle;
    # newer PySpark versions may expose e.g. bestModel.getRegParam() directly - confirm.
    print('month'+str(i))
    print(model.bestModel._java_obj.getRegParam())
    print(model.bestModel._java_obj.getElasticNetParam())
    print(model.bestModel._java_obj.getFitIntercept())
    print("Coefficients: " + str(model.bestModel.coefficients))
    print("R Squared (R2) on test data = %g" % r2_evaluator.evaluate(lr_predictions))

In [16]:
# Out-of-sample MSE per lifetime month for the RF + spending-profile model.
month_list = list(range(13))
MSE_list_RF_SB = MSE_list

fig, ax = plt.subplots()
ax.plot(month_list, MSE_list_RF_SB, color='tab:orange', label='RF and Spending Profile')
ax.set_xlabel('month of lifetime')
ax.set_ylabel('MSE')
ax.set_title(' MSE for each lifetime point - RF and Spending profile')
ax.grid()
ax.legend()
plt.show()

## 5.3 Regression with social network metrics

In [18]:
# Load the precomputed social-network metrics: friend / friend-of-friend degree,
# clustering coefficient and PageRank (each joined onto the regression table below).
friend_df = spark.read.format('parquet').load('/FileStore/tables/dynamic_fof_degree.parquet')
cluster_coef_df = spark.read.format('parquet').load('/FileStore/tables/cluster_coef_final.parquet')
page_rank_df = spark.read.format('parquet').load('dbfs:/FileStore/tables/pagerank_df.parquet')

In [19]:
# Left-join each user-month with its friend and friend-of-friend degree; user-months
# with no match in friend_df get 0 for both. The result is persisted as one file.
friend_df.createOrReplaceTempView("friend_df")
venmo_reg.createOrReplaceTempView("venmo_reg")

venmo_reg_sn = spark.sql("""
                SELECT v.user, v.month, v.Y, v.recency, v.frequency, 
                ifnull(f.degree,0) as friend_degree, 
                ifnull(f.fof_degree,0) as fof_degree
                FROM venmo_reg v LEFT JOIN friend_df f
                ON v.user = f.user AND v.month = f.month_passed
                """)

(venmo_reg_sn
    .coalesce(1)
    .write
    .format("parquet")
    .mode('overwrite')
    .save('dbfs:/FileStore/tables/venmo_reg_sn.parquet'))

In [20]:
# Reload the persisted degree table, then left-join clustering coefficient and
# PageRank per user-month (missing values become 0). The result is sorted by
# user and month and persisted as one file.
venmo_reg_sn = spark.read.format('parquet').load('/FileStore/tables/venmo_reg_sn.parquet')
venmo_reg_sn.createOrReplaceTempView("venmo_reg_sn")
cluster_coef_df.createOrReplaceTempView("cluster_coef_df")
page_rank_df.createOrReplaceTempView("page_rank_df")

venmo_reg_sn_1 = spark.sql("""
                SELECT v.*,
                ifnull(c.cc,0) as clustering_coefficient,
                ifnull(p.pagerank,0) as pagerank
                FROM venmo_reg_sn v LEFT JOIN cluster_coef_df c
                ON v.user = c.user AND v.month = c.month_passed
                LEFT JOIN page_rank_df p
                ON v.user = p.user AND v.month = p.month_passed
                """)

(venmo_reg_sn_1
    .sort('user', 'month')
    .coalesce(1)
    .write
    .format("parquet")
    .mode('overwrite')
    .save('dbfs:/FileStore/tables/venmo_reg_sn_1.parquet'))

In [21]:
# Reload the persisted social-network table and pack its four network metrics into
# the 'features' vector column that LinearRegression reads.
venmo_reg_sn_1 = spark.read.format('parquet').load('/FileStore/tables/venmo_reg_sn_1.parquet')
vectorAssembler = VectorAssembler(
    inputCols=['friend_degree', 'fof_degree', 'clustering_coefficient', 'pagerank'],
    outputCol='features',
)
venmo_reg_sn_agg = vectorAssembler.transform(venmo_reg_sn_1)

In [22]:
# Train and test split: for every lifetime month 0-12, randomly hold out 30% of the
# rows as a test set and write both parts out so the modelling cell can reload them.
# randomSplit gets a fixed seed so that re-running the notebook reproduces the same
# split (and therefore the same MSE figures) instead of drawing a new one each time.
SPLIT_SEED = 42
for i in range(0, 13):
    month_df = venmo_reg_sn_agg[venmo_reg_sn_agg['month'] == i]
    train, test = month_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
    # LinearRegression below is configured with labelCol='label'.
    train_df = train.select(['features', 'Y']).withColumnRenamed('Y', 'label')
    test_df = test.select(['features', 'Y']).withColumnRenamed('Y', 'label')
    train_df.write.mode('overwrite').format("parquet").save('dbfs:/FileStore/tables/train_sn_month'+str(i)+'.parquet')
    test_df.write.mode('overwrite').format("parquet").save('dbfs:/FileStore/tables/test_sn_month'+str(i)+'.parquet')

In [23]:
# For each user's lifetime point (month 0-12), regress her social network metrics
# on Y. The regularisation (and whether to fit an intercept) is tuned on a
# train/validation split of that month's training data. The out-of-sample MSE is
# then recorded on the held-out test split.
# Everything that does not depend on the month is built once, outside the loop.
TVS_SEED = 42

lr = LinearRegression(featuresCol='features', labelCol='label', maxIter=10)

# Grid of settings; TrainValidationSplit fits every combination and keeps the one
# with the best validation score.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# RegressionEvaluator() scores with its default metric (RMSE in Spark ML).
# 80% of the training data is used for fitting and 20% for validation; the seed fixes
# that inner split so the selected hyper-parameters are reproducible across re-runs.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           trainRatio=0.8,
                           seed=TVS_SEED)

evaluator = RegressionEvaluator(metricName="mse")
r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="r2")

MSE_list = []
for i in range(0, 13):
    train_df = spark.read.parquet('/FileStore/tables/train_sn_month'+str(i)+'.parquet')
    test_df = spark.read.parquet('/FileStore/tables/test_sn_month'+str(i)+'.parquet')
    model = tvs.fit(train_df)

    # Out-of-sample mean squared error on the held-out test split.
    lr_predictions = model.transform(test_df)
    MSE = evaluator.evaluate(lr_predictions)
    MSE_list.append(MSE)

    # Report the selected hyper-parameters (including fitIntercept, which this grid
    # also tunes) and the test-set fit for this month.
    # NOTE(review): the tuned values are read through the private _java_obj handle;
    # newer PySpark versions may expose e.g. bestModel.getRegParam() directly - confirm.
    print('month'+str(i))
    print(model.bestModel._java_obj.getRegParam())
    print(model.bestModel._java_obj.getElasticNetParam())
    print(model.bestModel._java_obj.getFitIntercept())
    print("Coefficients: " + str(model.bestModel.coefficients))
    print("R Squared (R2) on test data = %g" % r2_evaluator.evaluate(lr_predictions))

In [24]:
# Out-of-sample MSE per lifetime month for the social-network-metrics model.
# (plt is already imported in the first cell.)
month_list = list(range(13))
MSE_list_SN = MSE_list

fig, ax = plt.subplots()
# Label the series by the feature set it was actually trained on, and call legend()
# so the label is displayed.
ax.plot(month_list, MSE_list_SN, color='tab:blue', label='Social Network Metrics')
ax.set(xlabel='month of lifetime', ylabel='MSE',
       title=' MSE for each lifetime point - Social Network Metrics')
ax.grid()
ax.legend()
plt.show()

## 5.4 Regression with social network metrics and the spending behavior profile metrics of the user's social network

In [26]:
# Reload the social-network table and the spending-profile table, and keep only
# user-months present in both.
venmo_reg_sn_1 = spark.read.format('parquet').load('/FileStore/tables/venmo_reg_sn_1.parquet')
accum_profile_prop = spark.read.format('parquet').load('/FileStore/tables/accum_profile_prop.parquet')
venmo_reg_sn_sb = (
    venmo_reg_sn_1
    .join(accum_profile_prop, on=['user', 'month'], how='inner')
    .orderBy('user', 'month')
    .cache()
)

In [27]:
# Build a symmetric user-friend interaction table from the raw Venmo sample
# (presumably one row per transaction between user1 and user2 - confirm).
# Each (user1, user2) row is emitted in both directions, so every user sees all of
# their counterparties as friends. Rows are distinct per (user, friend, datetime),
# i.e. there is one row per interaction timestamp, not one per friend.
venmo_data = spark.read.parquet('/FileStore/tables/VenmoSample_snappy-e020d.parquet')
venmo_data.createOrReplaceTempView('venmo_data')

# UNION already removes duplicate rows, so the outer DISTINCT is redundant but harmless.
dynamic_friending = spark.sql("SELECT distinct user, friend, datetime\
                               FROM  \
                               (SELECT user1 as user, user2 as friend, datetime \
                               FROM venmo_data \
                               UNION \
                               SELECT user2 as user, user1 as friend, datetime \
                               FROM venmo_data) ")
# The lifetime-month column (month_passed) is added in the next cell.
dynamic_friending.createOrReplaceTempView('dynamic_friending')

In [28]:
# Tag every interaction with the user's lifetime month in which it happened:
# month_passed = ceil(days since the user's earliest datetime / 30), so the user's
# first interaction falls in month 0.
window = Window.partitionBy('user').orderBy('datetime')
first_datetime = first(dynamic_friending.datetime).over(window)
days_elapsed = F.datediff(dynamic_friending.datetime, first_datetime)
dynamic_friending = dynamic_friending.withColumn("month_passed", F.ceil(days_elapsed / 30))

In [29]:
# Spending behaviour of each user's social network at each lifetime point: for every
# row of venmo_reg_sn_sb, average the spending-profile proportions of the user's
# friends and append them to the user's own network metrics.
#
# Friend selection:
#   * dynamic_friending has one row per (user, friend, datetime), i.e. per interaction.
#     The `friends` CTE collapses it to one row per (user, friend), keyed by the first
#     month they interacted, so frequent counterparties are not over-weighted.
#   * Only friends already made by the row's lifetime month
#     (f.month_passed <= v1.month) are used. Without this filter, friendships formed
#     in later months would leak into earlier lifetime points.
#     NOTE(review): assumes dynamic_friending.month_passed and v1.month count months
#     on the same scale (since the user's first transaction) - confirm.
#   * If no friend has a profile row, AVG is NULL. VectorAssembler rejects NULLs by
#     default, so those averages fall back to 0, mirroring the ifnull(..., 0)
#     convention used for the other network metrics.
# NOTE(review): v2 is joined on the friend only, so each friend's profile is averaged
# over all of that friend's months, including months after v1.month. Restricting to
# v2.month = v1.month (or <=) may be intended - confirm.
dynamic_friending.createOrReplaceTempView("dynamic_friending")
venmo_reg_sn_sb.createOrReplaceTempView("venmo_reg_sn_sb")
venmo_reg_sn_sb = spark.sql("""
                WITH friends AS (
                    SELECT user, friend, MIN(month_passed) AS month_passed
                    FROM dynamic_friending
                    GROUP BY user, friend
                )
                SELECT v1.user, v1.month, v1.Y, v1.recency, v1.frequency,
                v1.friend_degree, v1.fof_degree, v1.clustering_coefficient,
                v1.pagerank,
                ifnull(AVG(v2.activity_perc), 0) as activity_perc,
                ifnull(AVG(v2.food_perc), 0) as food_perc,
                ifnull(AVG(v2.illegal_perc), 0) as illegal_perc,
                ifnull(AVG(v2.others_perc), 0) as others_perc,
                ifnull(AVG(v2.people_perc), 0) as people_perc,
                ifnull(AVG(v2.utility_perc), 0) as utility_perc,
                ifnull(AVG(v2.cash_perc), 0) as cash_perc,
                ifnull(AVG(v2.event_perc), 0) as event_perc,
                ifnull(AVG(v2.transp_perc), 0) as transp_perc,
                ifnull(AVG(v2.travel_perc), 0) as travel_perc
                FROM venmo_reg_sn_sb v1
                LEFT JOIN friends f
                ON v1.user = f.user AND f.month_passed <= v1.month
                LEFT JOIN venmo_reg_sn_sb v2
                ON f.friend = v2.user
                GROUP BY v1.user, v1.month, v1.Y, v1.recency, v1.frequency,
                v1.friend_degree, v1.fof_degree, v1.clustering_coefficient,
                v1.pagerank
                """)

In [30]:
# Pack the user's network metrics and the averaged spending profile of her friends
# into the 'features' vector column. Persist the result so that the split cell reads
# it back from disk instead of recomputing the self-join.
vectorAssembler = VectorAssembler(
    inputCols=[
        # social-network metrics
        'friend_degree', 'fof_degree', 'clustering_coefficient', 'pagerank',
        # average spending-profile proportions of the user's friends
        'activity_perc', 'food_perc', 'illegal_perc', 'others_perc', 'people_perc',
        'utility_perc', 'cash_perc', 'event_perc', 'transp_perc', 'travel_perc',
    ],
    outputCol='features',
)
venmo_reg_sn_sb_agg = vectorAssembler.transform(venmo_reg_sn_sb)
venmo_reg_sn_sb_agg.write.mode('overwrite').parquet('venmo_reg_sn_sb_agg.parquet')

In [31]:
venmo_reg_sn_sb_agg = spark.read.parquet('venmo_reg_sn_sb_agg.parquet')
# Train and test split: for every lifetime month 0-12, randomly hold out 30% of the
# rows as a test set and write both parts out so the modelling cell can reload them.
# randomSplit gets a fixed seed so that re-running the notebook reproduces the same
# split (and therefore the same MSE figures) instead of drawing a new one each time.
SPLIT_SEED = 42
for i in range(0, 13):
    month_df = venmo_reg_sn_sb_agg[venmo_reg_sn_sb_agg['month'] == i]
    train, test = month_df.randomSplit([0.7, 0.3], seed=SPLIT_SEED)
    # LinearRegression below is configured with labelCol='label'.
    train_df = train.select(['features', 'Y']).withColumnRenamed('Y', 'label')
    test_df = test.select(['features', 'Y']).withColumnRenamed('Y', 'label')
    train_df.write.mode('overwrite').format("parquet").save('dbfs:/FileStore/tables/train_sn_sb_month'+str(i)+'.parquet')
    test_df.write.mode('overwrite').format("parquet").save('dbfs:/FileStore/tables/test_sn_sb_month'+str(i)+'.parquet')

In [32]:
# For each user's lifetime point (month 0-12), regress his/her social network metrics
# and the spending behavior of her social network on Y. The regularisation (and
# whether to fit an intercept) is tuned on a train/validation split of that month's
# training data. The out-of-sample MSE is then recorded on the held-out test split.
# Everything that does not depend on the month is built once, outside the loop.
TVS_SEED = 42

lr = LinearRegression(featuresCol='features', labelCol='label', maxIter=10)

# Grid of settings; TrainValidationSplit fits every combination and keeps the one
# with the best validation score.
paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01]) \
    .addGrid(lr.fitIntercept, [False, True]) \
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]) \
    .build()

# RegressionEvaluator() scores with its default metric (RMSE in Spark ML).
# 80% of the training data is used for fitting and 20% for validation; the seed fixes
# that inner split so the selected hyper-parameters are reproducible across re-runs.
tvs = TrainValidationSplit(estimator=lr,
                           estimatorParamMaps=paramGrid,
                           evaluator=RegressionEvaluator(),
                           trainRatio=0.8,
                           seed=TVS_SEED)

evaluator = RegressionEvaluator(metricName="mse")
r2_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="label", metricName="r2")

MSE_list = []
for i in range(0, 13):
    train_df = spark.read.parquet("/FileStore/tables/train_sn_sb_month"+str(i)+".parquet")
    test_df = spark.read.parquet("/FileStore/tables/test_sn_sb_month"+str(i)+".parquet")
    model = tvs.fit(train_df)

    # Out-of-sample mean squared error on the held-out test split.
    lr_predictions = model.transform(test_df)
    MSE = evaluator.evaluate(lr_predictions)
    MSE_list.append(MSE)

    # Report the selected hyper-parameters and the test-set fit for this month, in the
    # same format as the other model sections.
    # NOTE(review): the tuned values are read through the private _java_obj handle;
    # newer PySpark versions may expose e.g. bestModel.getRegParam() directly - confirm.
    print('month'+str(i))
    print(model.bestModel._java_obj.getRegParam())
    print(model.bestModel._java_obj.getElasticNetParam())
    print(model.bestModel._java_obj.getFitIntercept())
    print("Coefficients: " + str(model.bestModel.coefficients))
    print("R Squared (R2) on test data = %g" % r2_evaluator.evaluate(lr_predictions))

In [33]:
# Out-of-sample MSE per lifetime month for the social-network metrics +
# network spending-profile model. (plt is already imported in the first cell.)
month_list = list(range(13))
MSE_list_SN_SB = MSE_list

fig, ax = plt.subplots()
# Label the series by the feature set it was actually trained on, and call legend()
# so the label is displayed.
ax.plot(month_list, MSE_list_SN_SB, color='tab:blue',
        label='Social Network Metrics and Spending Profile')
ax.set(xlabel='month of lifetime', ylabel='MSE',
       title=' MSE for each lifetime point - Social Network Metrics and Spending Profile')
ax.grid()
ax.legend()
plt.show()