# Customer Analysis - Model Customer Behavior


## Import

In [1]:
import os
import pyspark
import pandas as pd
import pyspark.sql.functions as f
import pyspark.sql.types as T
import plotly.express as px
import plotly.graph_objects as go

## Read

In [2]:
spark = pyspark.sql.SparkSession \
    .builder \
    .master("local") \
    .appName("app_great") \
    .config("spark.executor.memory", f"16g") \
    .config("spark.driver.memory", f"16g") \
    .config("spark.memory.offHeap.enabled", True) \
    .config("spark.memory.offHeap.size", f"16g") \
    .config("spark.sql.debug.maxToStringFields", f"16") \
    .getOrCreate()
sc = spark.sparkContext

In [3]:
# Testing / Debug

# sdf = spark.read.csv("data/test_data.csv", header=True, inferSchema=True)


In [4]:

sdf_201911 = spark.read.csv("data/2019-Nov.csv", header=True, inferSchema=True)
sdf_201910 = spark.read.csv("data/2019-Oct.csv", header=True, inferSchema=True)

sdf = sdf_201910.union(sdf_201911)
# sdf = sdf_201911


## Preparation

In [5]:
# Feature Splitting
# sdf = sdf.withColumn("category_class", f.substring_index(sdf.category_code, '.', 1))

sdf = sdf.withColumn("category_class", f.split(sdf["category_code"], "\.").getItem(0))
sdf = sdf.withColumn("category_sub_class", f.split(sdf["category_code"], "\.").getItem(1))
sdf = sdf.withColumn("category_sub_sub_class", f.split(sdf["category_code"], "\.").getItem(2))

sdf = sdf.withColumn("year", f.year("event_time"))
sdf = sdf.withColumn("month", f.month("event_time"))
sdf = sdf.withColumn("weekofyear", f.weekofyear("event_time"))
sdf = sdf.withColumn("dayofyear", f.dayofyear("event_time"))
sdf = sdf.withColumn("dayofweek", f.dayofweek("event_time"))
sdf = sdf.withColumn("dayofmonth", f.dayofmonth("event_time"))
sdf = sdf.withColumn("hour", f.hour("event_time"))

sdf = sdf.withColumn('turnover', f.when(f.col('event_type') == 'purchase', f.col('price')).otherwise(0))

sdf = sdf.withColumn('purchases', f.when(f.col('event_type') == 'purchase', f.lit(1)).otherwise(0))
sdf = sdf.withColumn('views', f.when(f.col('event_type') == 'view', f.lit(1)).otherwise(0))
sdf = sdf.withColumn('carts', f.when(f.col('event_type') == 'cart', f.lit(1)).otherwise(0))


# None Handling
# sdf = sdf.fillna(value="not defined")

sdf.printSchema()

root
 |-- event_time: string (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- category_class: string (nullable = true)
 |-- category_sub_class: string (nullable = true)
 |-- category_sub_sub_class: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- weekofyear: integer (nullable = true)
 |-- dayofyear: integer (nullable = true)
 |-- dayofweek: integer (nullable = true)
 |-- dayofmonth: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- turnover: double (nullable = true)
 |-- purchases: integer (nullable = false)
 |-- views: integer (nullable = false)
 |-- carts: integer (nullable = false)



In [6]:
# sdf.show(1, vertical=True, truncate=False)

## Data Creation

In [7]:
sdf_session = sdf.select("user_id", "user_session", "event_type", "product_id", "price", "event_time",'purchases',
'views', 'carts') 

sdf_session = sdf_session.withColumn("bought_product", f.when(sdf_session.event_type == "purchase", sdf_session["product_id"]).otherwise(None))

sdf_session = sdf_session.withColumn("first_event", sdf_session.event_time)
sdf_session = sdf_session.withColumn("last_event", sdf_session.event_time)

In [8]:
# sdf_session.show()

In [9]:
sdf_session_agg = sdf_session.groupBy("user_id", "user_session").agg(f.avg("price"), f.sum("views"), f.sum("purchases"), f.sum("carts"), f.min("event_time"), f.max("event_time"), f.collect_list("bought_product"))
sdf_session_agg = sdf_session_agg.withColumn("duration", (sdf_session_agg["max(event_time)"] - sdf_session_agg["min(event_time)"]))
sdf_session_agg = sdf_session_agg.withColumn("sum(events)", (sdf_session_agg["sum(views)"] + sdf_session_agg["sum(purchases)"] + sdf_session_agg["sum(carts)"]))
sdf_session_agg = sdf_session_agg.withColumn("turnover", f.when(sdf_session_agg["sum(purchases)"] > 0, (sdf_session_agg["sum(purchases)"] *  sdf_session_agg["avg(price)"])).otherwise(0))
sdf_session_agg = sdf_session_agg.withColumn("avg(price)", f.round(sdf_session_agg["avg(price)"],2) )

sdf_session_agg = sdf_session_agg.withColumn("successfull", f.when(sdf_session_agg["sum(purchases)"] > 0, 1).otherwise(0))
sdf_session_agg.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)
 |-- avg(price): double (nullable = true)
 |-- sum(views): long (nullable = true)
 |-- sum(purchases): long (nullable = true)
 |-- sum(carts): long (nullable = true)
 |-- min(event_time): string (nullable = true)
 |-- max(event_time): string (nullable = true)
 |-- collect_list(bought_product): array (nullable = false)
 |    |-- element: integer (containsNull = false)
 |-- duration: double (nullable = true)
 |-- sum(events): long (nullable = true)
 |-- turnover: double (nullable = true)
 |-- successfull: integer (nullable = false)



In [10]:
# sdf_session_agg.show(truncate=False)

In [11]:
# sdf_session_agg.where(sdf_session_agg["turnover"] > 0).show()

In [12]:
sdf_customer_profile = sdf_session_agg.groupBy("user_id").agg(f.sum("sum(events)").alias("sum_events"), f.sum("sum(views)").alias("sum_views"), f.sum("sum(purchases)").alias("sum_purchases"), f.sum("sum(carts)").alias("sum_carts"), f.sum("turnover").alias("sum_turnover"), f.count("user_session").alias("count_session"), f.sum("successfull").alias("sum_successfull"), f.collect_list("collect_list(bought_product)").alias("bought_product"), f.collect_list("user_session").alias("user_sessions"), f.avg("duration"))

sdf_customer_profile = sdf_customer_profile.withColumn("avg_turnover_per_session", (sdf_customer_profile["sum_turnover"] / sdf_customer_profile["count_session"]))
sdf_customer_profile = sdf_customer_profile.withColumn("avg_events_per_session", (sdf_customer_profile["sum_events"] / sdf_customer_profile["count_session"]))


sdf_customer_profile.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- sum_events: long (nullable = true)
 |-- sum_views: long (nullable = true)
 |-- sum_purchases: long (nullable = true)
 |-- sum_carts: long (nullable = true)
 |-- sum_turnover: double (nullable = true)
 |-- count_session: long (nullable = false)
 |-- sum_successfull: long (nullable = true)
 |-- bought_product: array (nullable = false)
 |    |-- element: array (containsNull = false)
 |    |    |-- element: integer (containsNull = false)
 |-- user_sessions: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- avg(duration): double (nullable = true)
 |-- avg_turnover_per_session: double (nullable = true)
 |-- avg_events_per_session: double (nullable = true)



In [13]:
# sdf_customer_profile.where(sdf_customer_profile["avg_turnover_per_session"] > 0).show(truncate=False)

In [14]:
sdf_export = sdf_customer_profile.withColumn("bought_product", sdf_customer_profile["bought_product"].cast(pyspark.sql.types.StringType()))
sdf_export = sdf_export.withColumn("user_sessions", sdf_export["user_sessions"].cast(pyspark.sql.types.StringType()))


In [15]:
sdf_export.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- sum_events: long (nullable = true)
 |-- sum_views: long (nullable = true)
 |-- sum_purchases: long (nullable = true)
 |-- sum_carts: long (nullable = true)
 |-- sum_turnover: double (nullable = true)
 |-- count_session: long (nullable = false)
 |-- sum_successfull: long (nullable = true)
 |-- bought_product: string (nullable = false)
 |-- user_sessions: string (nullable = false)
 |-- avg(duration): double (nullable = true)
 |-- avg_turnover_per_session: double (nullable = true)
 |-- avg_events_per_session: double (nullable = true)



In [16]:
sdf_export.coalesce(1).write.format("csv").mode("overwrite").save("data/customer_profile.csv", header="true")

In [17]:
sdf_customer_profile = spark.read.csv("data/customer_profiles.csv", header=True, inferSchema=True)

In [18]:
sdf_customer_profile.show()

+---------+----------+---------+-------------+---------+------------+-------------+---------------+--------------------+--------------------+-------------+------------------------+----------------------+
|  user_id|sum_events|sum_views|sum_purchases|sum_carts|sum_turnover|count_session|sum_successfull|      bought_product|       user_sessions|avg(duration)|avg_turnover_per_session|avg_events_per_session|
+---------+----------+---------+-------------+---------+------------+-------------+---------------+--------------------+--------------------+-------------+------------------------+----------------------+
|240522111|        34|       34|            0|        0|         0.0|           18|              0|[[], [], [], [], ...|[e11c1288-8be8-4d...|         null|                     0.0|    1.8888888888888888|
|269003139|        15|       14|            0|        1|         0.0|            2|              0|            [[], []]|[29f24176-f471-42...|         null|                     0.0|    

## Data Split

In [19]:
(trainingData, testData, devData) = sdf_customer_profile.where(sdf_customer_profile["avg_turnover_per_session"] > 0).randomSplit([0.6, 0.3, 0.1], seed=123)

In [20]:
devData = devData.limit(10000)
devData.show(5)

+---------+----------+---------+-------------+---------+-----------------+-------------+---------------+--------------------+--------------------+-------------+------------------------+----------------------+
|  user_id|sum_events|sum_views|sum_purchases|sum_carts|     sum_turnover|count_session|sum_successfull|      bought_product|       user_sessions|avg(duration)|avg_turnover_per_session|avg_events_per_session|
+---------+----------+---------+-------------+---------+-----------------+-------------+---------------+--------------------+--------------------+-------------+------------------------+----------------------+
|303160429|         3|        2|            1|        0|           340.59|            2|              1|     [[5100443], []]|[8957377b-66b3-46...|         null|                 170.295|                   1.5|
|426687856|        13|       10|            1|        2|55.01285714285713|            5|              1|[[3900739], [], [...|[589f9173-b999-44...|         null|    

## Modelling

### K-Means

Task:
Cluster (unsupervised) the customer in groups:

- high 
- medium
- low
- NULL with potential
- NULL without potential


Target Feature -> Turnover

In [21]:
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.feature import VectorAssembler


In [25]:
dataset = trainingData.where(sdf_customer_profile["avg_turnover_per_session"] > 0)

In [26]:
dataset.show()

+---------+----------+---------+-------------+---------+------------------+-------------+---------------+--------------------+--------------------+-------------+------------------------+----------------------+
|  user_id|sum_events|sum_views|sum_purchases|sum_carts|      sum_turnover|count_session|sum_successfull|      bought_product|       user_sessions|avg(duration)|avg_turnover_per_session|avg_events_per_session|
+---------+----------+---------+-------------+---------+------------------+-------------+---------------+--------------------+--------------------+-------------+------------------------+----------------------+
|225644257|        17|       14|            1|        2|             40.91|            9|              1|[[6800309], [], [...|[3601c672-9a75-46...|         null|       4.545555555555556|    1.8888888888888888|
|304707635|         5|        2|            1|        2|2007.5199999999998|            2|              1|     [[], [1005136]]|[9d0ec806-edbc-4e...|         null

In [27]:
features=("sum_views", "sum_turnover", "sum_purchases", "sum_carts", "count_session")

assembler = VectorAssembler(inputCols=features,outputCol="features")

dataset=assembler.transform(dataset)
dataset.select("features").show(truncate=False)

+--------------------------------------+
|features                              |
+--------------------------------------+
|[14.0,40.91,1.0,2.0,9.0]              |
|[2.0,2007.5199999999998,1.0,2.0,2.0]  |
|[7.0,74.13,1.0,3.0,5.0]               |
|[74.0,142.29,1.0,4.0,63.0]            |
|[11.0,177.81833333333336,1.0,5.0,4.0] |
|[180.0,468.0293882978723,2.0,5.0,14.0]|
|[72.0,2121.8056785714284,3.0,0.0,18.0]|
|[24.0,123.25791666666667,1.0,0.0,2.0] |
|[13.0,20.113333333333337,1.0,3.0,4.0] |
|[7.0,301.495,1.0,0.0,1.0]             |
|[32.0,95.45,1.0,4.0,6.0]              |
|[1.0,231.64,1.0,1.0,1.0]              |
|[6.0,347.4571428571429,1.0,1.0,2.0]   |
|[8.0,21.11,1.0,0.0,4.0]               |
|[26.0,66.93,1.0,0.0,4.0]              |
|[7.0,951.7442857142858,1.0,0.0,2.0]   |
|[16.0,82.99,1.0,3.0,13.0]             |
|[3.0,586.03,1.0,1.0,1.0]              |
|[15.0,157.81,2.0,3.0,12.0]            |
|[5.0,33.44,1.0,2.0,4.0]               |
+--------------------------------------+
only showing top

In [30]:
# Trains a k-means model.
kmeans = KMeans().setK(4).setSeed(123)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {str(silhouette)}\n")


# Evaluate clustering.
# cost = model.computeCost(dataset)
# print("Within Set Sum of Squared Errors = " + str(cost))

# Shows the result.
print("Cluster Centers: ")
ctr=[]
centers = model.clusterCenters()
for center in centers:
    ctr.append(center)
    print(center)

Silhouette with squared euclidean distance = 0.9375706272150515

Cluster Centers: 
[ 55.21483558 431.33503323   1.85265346   3.37314517  10.0899097 ]
[  175.22133939 18684.07166066    31.51986379    34.78036322
    43.85584563]
[  88.69686213 4412.9756773     9.32046003   12.03365043   19.29859435]
[  379.2745098  78925.39632139    94.38235294    95.32352941
    86.90196078]


In [38]:
model.write().overwrite().save("knn-model")

In [27]:
predictions.show()

+---------+----------+---------+-------------+---------+------------------+-------------+---------------+--------------------+--------------------+-------------+------------------------+----------------------+--------------------+----------+
|  user_id|sum_events|sum_views|sum_purchases|sum_carts|      sum_turnover|count_session|sum_successfull|      bought_product|       user_sessions|avg(duration)|avg_turnover_per_session|avg_events_per_session|            features|prediction|
+---------+----------+---------+-------------+---------+------------------+-------------+---------------+--------------------+--------------------+-------------+------------------------+----------------------+--------------------+----------+
|468371772|        20|       16|            1|        3|             82.99|           13|              1|[[3200543], [], [...|[6ee8d8e6-c8fd-4b...|         null|       6.383846153846154|    1.5384615384615385|[16.0,82.99,1.0,3...|         0|
|512375293|         5|        3|

In [28]:
predictions.select("prediction").distinct().show()

+----------+
|prediction|
+----------+
|         1|
|         3|
|         2|
|         0|
+----------+



In [29]:
centers = pd.DataFrame(ctr,columns=features)

In [30]:
centers.head()

Unnamed: 0,sum_views,sum_turnover,sum_purchases,sum_carts,count_session
0,55.485569,488.225229,1.957207,3.494342,10.204791
1,104.864327,6388.053047,12.88707,15.847328,23.719289
2,488.871795,115258.60899,141.794872,143.076923,113.74359
3,223.815686,28788.055955,45.777778,48.598693,54.303268


In [95]:
# Visualise result
fig = px.scatter(centers, x="sum_carts", y="sum_turnover", size="count_session", color="sum_views", hover_data=["sum_purchases"])
fig.show()

In [96]:
fig = px.scatter(predictions.toPandas(), x="sum_carts", y="sum_turnover", size="count_session", color="sum_views", hover_data=["sum_purchases"])
fig.show()

In [97]:
df = px.data.iris()
fig = px.scatter(df, x="sepal_width", y="sepal_length", color="species",
                 size='petal_length', hover_data=['petal_width'])
fig.show()

In [98]:
# Ellbow Curve


### Gaussian Mixture

In [99]:

# loads data
# dataset = spark.read.format("libsvm").load("data/mllib/sample_kmeans_data.txt")

# gmm = GaussianMixture().setK(2).setSeed(538009335)
# model = gmm.fit(dataset)

# print("Gaussians shown as a DataFrame: ")
# model.gaussiansDF.show(truncate=False)

Next Steps: Find similiarites inside these customer groups:

- Buys same products
- similar behavior -> same marketing