In [0]:
import pandas as pd
import sys
import math
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, DoubleType, FloatType
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window
from plotnine import *
import seaborn as sns
import matplotlib.pyplot as plt

In [0]:
# Source table: ds460_sp22_muskrats.nl_feature, chosen because it already carries the ticker
# (stock_symbol) column. Only the columns needed for wrangling and for engineering the
# Customer Engagement score are selected; year and quarter are derived from date_range_start.
monthly_patterns_query = """SELECT stock_symbol AS ticker, Year(date_range_start) AS year, Quarter(date_range_start) AS quarter, date_range_start, date_range_end, bucketed_dwell_times FROM ds460_sp22_muskrats.nl_feature
"""
monthly_patterns_data = spark.sql(monthly_patterns_query)
display(monthly_patterns_data)

In [0]:
def return_score(data_collected, weights=None):
    """Compute the Customer Engagement (CE) score from bucketed dwell times.

    Each dwell-time bucket value is multiplied by a weight that grows with the
    length of the stay, and the weighted values are summed, so locations whose
    visitors stay longer receive larger scores.

    Parameters
    ----------
    data_collected : object indexable by bucket label
        Anything supporting ``data_collected[label]`` -- e.g. a Spark map Column
        (where ``col[label]`` is ``getItem``) or a plain dict of counts.
    weights : mapping of bucket label -> multiplier, optional
        Buckets to include, summed in iteration order. Defaults to
        ``{'<5': 1, '5-10': 2, '11-20': 3, '21-60': 4, '61-120': 5}``.
        NOTE(review): if the source map also has longer buckets (e.g.
        '121-240', '>240'), the default ignores them -- confirm this is intended.

    Returns
    -------
    The weighted sum (a Spark Column expression when given a Column).

    Raises
    ------
    ValueError
        If ``weights`` is empty.
    """
    if weights is None:
        weights = {"<5": 1, "5-10": 2, "11-20": 3, "21-60": 4, "61-120": 5}
    if not weights:
        raise ValueError("weights must contain at least one bucket")

    # Accumulate left to right without a numeric seed, so Column inputs stay pure Column
    # arithmetic. `is None` is used because Spark Columns cannot be used as booleans.
    score_CE = None
    for label, weight in weights.items():
        term = data_collected[label] * weight
        score_CE = term if score_CE is None else score_CE + term
    return score_CE
  
# Score each row's dwell-time buckets with return_score and store it, rounded to 2 decimals, as "Points".
points_expr = F.round(return_score(F.col("bucketed_dwell_times")), 2)
monthly_patterns_data = monthly_patterns_data.withColumn("Points", points_expr)

display(monthly_patterns_data)


In [0]:
# Drop duplicate ticker/visit-window rows first, then sort chronologically (year, quarter,
# date_range_start) for display. The sort must come last: dropDuplicates shuffles the data,
# so an orderBy placed before it does not determine the order of its output.
# Later window functions do not inherit this ordering either; a window that needs
# chronological order has to declare it in its own Window spec.
# NOTE(review): when several rows share these keys, dropDuplicates keeps an arbitrary one.
monthly_patterns_data = (
    monthly_patterns_data
    .dropDuplicates(["ticker", "year", "quarter", "date_range_start", "date_range_end"])
    .orderBy(F.col("year").asc(), F.col("quarter").asc(), F.col("date_range_start").asc())
)
display(monthly_patterns_data)

In [0]:
# Store Points as an integer column (the cast truncates any fractional part).
monthly_patterns_data = monthly_patterns_data.withColumn("Points", F.col("Points").cast(IntegerType()))

# Z-score of a column relative to the rows of a window.
def get_z_score(value, window_partition):
    """Return a Column with the z-score of ``value`` over ``window_partition``.

    z = (value - mean) / population standard deviation, both computed over the window.

    Parameters
    ----------
    value : pyspark Column
        Numeric column to standardize.
    window_partition : WindowSpec
        Window defining which rows form the reference distribution for each row.

    Returns
    -------
    Column
        The z-score, or null where the window's standard deviation is 0 (e.g. a
        single-row window) or cannot be computed.
    """
    avg = F.avg(value).over(window_partition)
    # stddev_pop equals sqrt(E[x^2] - E[x]^2) but is computed in double precision without
    # squaring the raw column: `value * value` on an integer column is integer arithmetic
    # that overflows for values above 46340, and subtracting two large, nearly equal
    # squares loses precision (and can go negative, giving NaN from sqrt).
    sd = F.stddev_pop(value).over(window_partition)
    # Guard the division: a zero sd would be a division by zero (null by default, an error
    # under ANSI mode). F.when without otherwise() yields null for those rows.
    return F.when(sd != 0, (value - avg) / sd)

# Cumulative window: for each row, every row up to and including it in chronological order,
# so each z-score is measured against all earlier quarters. The ordering must be declared
# here -- without orderBy the frame boundaries follow arbitrary row order. ticker and
# date_range_end are tiebreakers so rows sharing a start date are ordered deterministically.
# NOTE(review): there is no partitionBy, so the reference distribution spans all tickers
# (and Spark processes the window in a single partition); confirm that is intended.
window_partition = (
    Window.orderBy("year", "quarter", "date_range_start", "date_range_end", "ticker")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
z_scored_data = monthly_patterns_data.withColumn(
    "zscore", F.round(get_z_score(monthly_patterns_data.Points, window_partition), 4)
)

# Drop rows with a null (or NaN) in any column, including rows whose z-score is undefined.
z_scored_data = z_scored_data.na.drop()

display(z_scored_data)

In [0]:
# Convert each z-score into a cumulative normal-distribution percentage,
# Phi(z) = 0.5 * (1 + erf(z / sqrt(2))): the fraction of a standard normal distribution
# below z (the value a z-table gives).
#
# This is computed per row with a Spark UDF so each percentage stays attached to the row it
# came from. Collecting the z-scores to the driver and re-attaching them by row_number() over
# monotonically_increasing_id() is unsafe: z_scored_data is not cached, so the collect and
# the join recompute it separately, and Spark does not guarantee the same row order across
# recomputations -- percentages could be joined to the wrong ticker/quarter.
def z_to_percentage(z):
    """Return the standard normal CDF of ``z`` (a value in [0, 1]); None maps to None."""
    if z is None:
        return None
    return 0.5 * (math.erf(z / 2 ** .5) + 1)

z_to_percentage_udf = F.udf(z_to_percentage, FloatType())

# Keep only the columns the downstream tables need, in the same order as before.
new_dataframe = z_scored_data.select(
    "ticker",
    "year",
    "quarter",
    z_to_percentage_udf(F.col("zscore")).alias("CE_Score_Percentage"),
)

display(new_dataframe) # Customer Engagement percentages with "ticker", "year" and "quarter"; saved below and later joined with the targets table to feed the machine-learning model.

In [0]:
# Rank rows from the highest to the lowest Customer Engagement percentage.
new_dataframe = new_dataframe.sort(F.desc("CE_Score_Percentage"))
display(new_dataframe)

In [0]:
%sql
-- Remove the previous version of the table before it is recreated.
-- IF EXISTS keeps this cell from failing on a first run, when the table does not exist yet.
DROP TABLE IF EXISTS cgjde.sanahuano_Customer_Engagement_Score

In [0]:
# Save the CE percentages as a Delta table. mode("overwrite") plus Delta's overwriteSchema
# option lets this cell re-run even if the table still exists (or its schema changed).
# persist()/unpersist() is omitted: the DataFrame feeds a single write action, so caching it
# only costs memory. repartition(10) writes at most 10 partitions; it also discards the
# earlier sort order (tables have no inherent row order).
new_dataframe.repartition(10).write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("cgjde.sanahuano_Customer_Engagement_Score")

In [0]:
%sql
-- Preview the saved Customer Engagement table.
SELECT *
FROM cgjde.sanahuano_Customer_Engagement_Score

In [0]:
%sql
-- Remove the previous version of the model table before it is recreated.
-- IF EXISTS keeps this cell from failing on a first run, when the table does not exist yet.
DROP TABLE IF EXISTS model.sanahuano_Customer_Engagement_Score

In [0]:
# Join the CE percentages to the targets table (revenue, EPS surprise) on ticker/year/quarter.
# NOTE(review): the CE table can hold several rows per ticker/year/quarter (one per visit
# window before the date columns were dropped), so a target row may be repeated once per
# window. Confirm whether CE should be aggregated per quarter before this join.
my_model = spark.sql("""SELECT CE.ticker, CE.year, CE.quarter, CE.CE_Score_Percentage, T.revenue, T.eps_surprise_percentage FROM monthly_all.targets AS T INNER JOIN cgjde.sanahuano_Customer_Engagement_Score AS CE ON T.ticker = CE.ticker AND T.year = CE.year AND T.quarter_num = CE.quarter""")

# mode("overwrite") plus Delta's overwriteSchema option lets this cell re-run even if the
# table still exists; persist()/unpersist() is omitted because the DataFrame feeds a single
# write action.
my_model.repartition(10).write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable("model.sanahuano_Customer_Engagement_Score")

#Final Table

In [0]:
%sql
-- Preview the final model table (CE percentage joined with revenue and EPS surprise).
SELECT *
FROM model.sanahuano_Customer_Engagement_Score