# Clustering with Spark - Customer Segmentation
In the previous exercise we familiarized ourselves with Spark SQL and the bills dataset. Now we want to cluster the customers according to their shopping behaviour using the KMeans algorithm of the Spark ML library.

As before we need to import the needed modules and **create a SparkSession**.

In [None]:
# Import SparkSession
from pyspark.sql import <FILL-IN>

In [None]:
# Create spark object
spark = SparkSession.builder.getOrCreate()

Read the dataframe **bills_exploded_parquet**, which we have created in the previous exercise. Call the dataframe **bills_exploded**. Afterwards, **cache** the dataframe by using the cache method and trigger the caching by applying the count method on the dataframe.

In [None]:
# Read DataFrame bills_exploded and cache it
<FILL-IN> = spark.read.parquet(<FILL-IN>)
<FILL-IN>.cache()

print(<FILL-IN>.count())
<FILL-IN>

## Feature Engineering

Now, we want to **count all products per customerId and category** and **sum all prices per customerId and category**. To do so, you can either use the **groupBy('column1', 'column2')** method followed by the method **agg({'columnA': 'functionA', 'columnB': 'functionB'})**, or you can **register** the dataframe bills_exploded as a **temporary table** by issuing the command **DataFrame.registerTempTable("tableName")** (deprecated since Spark 2.0 in favour of **DataFrame.createOrReplaceTempView("tableName")**) and solve the task with **spark.sql("SQL-Code")**. Finally, **rename the columns** sum(price) and count(category) to pricePerCategory and itemsPerCategory, respectively, using the method **DataFrame.withColumnRenamed("oldName", "newName")**. Please name your final dataframe **bills_aggregated**.
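To see what this aggregation produces, here is a plain-Python sketch of the same computation on a handful of made-up rows. The customer ids and prices are hypothetical and not taken from the bills dataset; the function `aggregate` is likewise only an illustration of what `groupBy("customerId", "category").agg({"category": "count", "price": "sum"})` computes after the columns have been renamed.

```python
from collections import defaultdict

# hypothetical exploded bill rows: (customerId, category, price)
rows = [
    (1, "BACKWAREN", 1.50),
    (1, "BACKWAREN", 2.00),
    (1, "MOLKE_EI", 0.99),
    (2, "MOLKE_EI", 1.29),
]

def aggregate(rows):
    """Count items and sum prices per (customerId, category) pair,
    mirroring count(category) and sum(price) of the Spark aggregation."""
    result = defaultdict(lambda: {"itemsPerCategory": 0, "pricePerCategory": 0.0})
    for customer_id, category, price in rows:
        entry = result[(customer_id, category)]
        entry["itemsPerCategory"] += 1       # count(category)
        entry["pricePerCategory"] += price   # sum(price)
    return dict(result)

for key, values in sorted(aggregate(rows).items()):
    print(key, values)
```

Each (customer, category) pair becomes one row, which is exactly the shape of bills_aggregated.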

In [None]:
### Solution 1 ###

bills_aggregated = bills_exploded.<FILL-IN>("customerId", <FILL-IN>)\
    .agg({<FILL-IN>: "count", "price": <FILL-IN>})\
    .withColumnRenamed(<FILL-IN>, "pricePerCategory")\
    .withColumnRenamed("count(category)", <FILL-IN>)


### Solution 2 ###

## Register Temp Table
#bills_exploded.registerTempTable(<FILL-IN>)
#bills_aggregated = spark.sql(<FILL-IN>)

# Cache DataFrame
bills_aggregated.cache()

# Print first twenty rows
bills_aggregated.show()

Next, we want to extract a list of all categories in the dataframe. Please **select** the **category** column and extract its unique elements. Afterwards, access the underlying rdd attribute of the dataframe, use flatMap to flatten the Row elements and apply the collect method to get a list.

**Hint**: You can use the method *distinct()* (in Pandas it was unique()).

In [None]:
# get list of categories to speed up pivot (see below)
categoryList = bills_aggregated.<FILL-IN>('category').<FILL-IN>.rdd.flatMap(lambda x: x).collect()

categoryList

The next step is a bit more complicated. We want to turn each category into a column, using the number of items a customer purchased in that category as the column values. For this we can use the **pivot** method. A pivot is an aggregation in which the distinct values of a grouping column are transposed into separate columns. This method is very useful for data analysis. If you want to know more about pivoting, you can find further details here: [Pivot-DataBricks](https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html).

**Remark**: *pivot* can only be applied after a *groupBy*, and it must be followed by an aggregation, e.g. with *agg* or *sum*. In our case each sum contains only one element, since there is only one entry per customerId and category in the dataframe, so we could also have combined the pivot with the previous aggregation. Since some customers do not buy products from every category, we get null values, which we fill with 0. Finally, we cache the dataframe.
Passing the previously created categoryList to pivot speeds up the command, because Spark then does not need to compute the distinct categories first.
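The following plain-Python sketch illustrates what the pivot followed by `na.fill(0)` does: one row per customer, one column per category, and 0 for combinations that never occur. The rows and the helper `pivot` are hypothetical illustrations, not part of the exercise data or of Spark's API.

```python
# hypothetical rows of bills_aggregated: (customerId, category, itemsPerCategory)
aggregated = [
    (1, "BACKWAREN", 2),
    (1, "MOLKE_EI", 1),
    (2, "MOLKE_EI", 3),
]
category_list = ["BACKWAREN", "MOLKE_EI"]  # plays the role of categoryList

def pivot(rows, categories):
    """One dict per customer with one key per category; missing
    combinations stay 0 (the equivalent of .na.fill(0))."""
    table = {}
    for customer_id, category, items in rows:
        row = table.setdefault(customer_id, {c: 0 for c in categories})
        if category in row:          # categories not in the list get no column
            row[category] += items   # corresponds to .sum("itemsPerCategory")
    return table

for customer_id, row in sorted(pivot(aggregated, category_list).items()):
    print(customer_id, row)
```

Customer 2 never bought BACKWAREN, so that entry is 0 instead of null.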

In [None]:
# pivot; passing categoryList avoids an extra pass to compute the distinct categories
bills_agg_trans = bills_aggregated.groupBy("customerId")\
    .pivot("category", categoryList).sum("itemsPerCategory")\
    .na.fill(0).cache()

# print customerId, BACKWAREN and MOLKE_EI
bills_agg_trans.select("customerId", "BACKWAREN", "MOLKE_EI").show(10)

In [None]:
# Combination of the previous two steps (again passing categoryList to pivot)
bills_agg_trans_alt = bills_exploded.groupBy("customerId")\
    .pivot("category", categoryList).agg({"category": "count"})\
    .na.fill(0).cache()
    
bills_agg_trans_alt.select("customerId", "BACKWAREN", "MOLKE_EI").show(10)

Please **print the Schema** of the bills_agg_trans dataframe.

In [None]:
# print schema
<FILL-IN>

Compute the total price and the total number of shoppings per customer. To do so, use the **bills_exploded** dataframe and the methods **groupBy** and **agg**. Please name the resulting columns **total_shoppings** and **total_price**. Afterwards, cache the dataframe and trigger the caching with the count method. The final dataframe should be called **shoppings_df**.

**Hint**: For the aggregation you can either use agg({...}) followed by withColumnRenamed, or agg(F.sum(column).alias('newName'), F.count(...)...) to rename the columns directly inside the aggregate method. If you prefer, you can also use the SQL solution; don't forget to register the table first.

In [None]:
from pyspark.sql import functions as F

# compute total price and total number of shoppings per customer
<FILL-IN> = bills_exploded.groupBy(<FILL-IN>).agg(F.count("customerId").alias(<FILL-IN>),
                                             F.sum(<FILL-IN>).alias('total_price')).cache()
shoppings_df.count()
shoppings_df.show()

Join the **shoppings_df** dataframe with the **bills_agg_trans** dataframe by using the join method like **df1.join(df2, on='column')**. Please join the two dataframes over the column **customerId**.

In [None]:
# join
df = shoppings_df.<FILL-IN>.cache()
df.count()
df.show(5)

In [None]:
# print schema
df.<FILL-IN>

### Feature Extraction and Preparation

Now that we have prepared the dataset, we can start with feature extraction and preparation. For the clustering algorithm we want to use all product categories and a price feature derived from the total price.

Before we proceed, we import the following modules.

In [None]:
# Spark ML
from pyspark.ml.feature import VectorAssembler, Normalizer, MinMaxScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.feature import SQLTransformer

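# Spark ML linear algebra (the DataFrame-based API uses pyspark.ml.linalg, not pyspark.mllib.linalg)
from pyspark.ml.linalg import Vectors, VectorUDT

# Matplotlib, Pandas and Numpy
import matplotlib.pyplot as plt
plt.style.use('ggplot')
try:
    plt.style.use('seaborn-v0_8-pastel')  # style name in matplotlib >= 3.6
except OSError:
    plt.style.use('seaborn-pastel')       # style name in older matplotlib versions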
import numpy as np
import pandas as pd

### Vector Assembler

To combine the product category columns into a single feature vector, we can use the **VectorAssembler** class. Please create an object of that class called **categoryAssembler**. Set the constructor argument **inputCols** to the list of all categories, i.e. **categoryList**, and **outputCol** to **"productCategoryFeatures"**.

In [None]:
# combine product category columns
categoryAssembler = <FILL-IN>

If you want, you can try out the transform method of the categoryAssembler object, using the dataframe df as input. Later on, we will combine all feature extraction and preparation steps into a so-called machine learning pipeline.

In [None]:
<FILL-IN>.transform(df).select('productCategoryFeatures').show()

### Normalizing the Feature Vector
The generated feature vector contains the total number of products per category that the customer bought across all shoppings. Since KMeans is distance-based, customers who shop more often than others would end up in the same cluster simply because of their large counts, regardless of what they buy. This is not what we want. Hence, we normalize the feature vector so that it contains the proportion of items per category that a customer bought across all shoppings. This can be achieved with a normalizer that divides the vector by its L1 norm.

Example:

Imagine a customer has bought 10 dairy products, 5 meat products and 15 fruit products. The normalizer would give us the following vector:

$\vec{v} = [10, 5, 15] \rightarrow \|\vec{v}\|_1 = 10 + 5 + 15 = 30$  

$\vec{v}_{norm} = \vec{v}/30 \approx [0.333, 0.167, 0.5]$
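The plain-Python sketch below reproduces this example and shows why the normalization matters for a distance-based method. The second customer is hypothetical: they buy exactly twice as much of everything, so their Euclidean distance to the first customer is large before normalization and zero after it. Spark's Normalizer with p=1.0 divides each vector by its L1 norm in the same way; the helper functions here are only illustrations.

```python
import math

def l1_normalize(v):
    """Divide a vector by its L1 norm (sum of absolute values)."""
    norm = sum(abs(x) for x in v)
    return [x / norm for x in v] if norm > 0 else list(v)

def euclidean(a, b):
    """Euclidean distance between two equally long vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

customer_a = [10, 5, 15]  # dairy, meat, fruit (the example above)
customer_b = [20, 10, 30]  # hypothetical customer: same mix, twice the amount

print(l1_normalize(customer_a))
print(euclidean(customer_a, customer_b))                              # roughly 18.7
print(euclidean(l1_normalize(customer_a), l1_normalize(customer_b)))  # 0.0
```

After normalization both customers have the same shopping profile, which is what the clustering should pick up.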

To use the normalizer please create an object of the class Normalizer and set the arguments inputCol to productCategoryFeatures, outputCol to normCategoryFeatures and p to 1.0. Call the object normalizer.

In [None]:
# normalizer using L_1 Norm (normalize by sum)
<FILL-IN> = Normalizer(inputCol=<FILL-IN>, outputCol="normCategoryFeatures",
                        <FILL-IN>)

### Custom SQL Transformer and MinMax Scaler

Next, we want to construct another feature: the average price per shopping of a customer. For this we use the SQLTransformer class, whose statement refers to the input dataframe via the placeholder `__THIS__`. Please just execute the statement in the following cell.

In [None]:
# custom sql transformer
sqlTrans = SQLTransformer(
    statement="SELECT *, total_price / total_shoppings as norm_price FROM __THIS__")

To turn this column into a feature vector, we again construct a VectorAssembler object, since MinMaxScaler operates on vector columns.

In [None]:
# transform price feature to vector; read and execute
priceAssembler = VectorAssembler(inputCols=['norm_price'],
                                  outputCol="normPriceFeature")

Since this feature is on a different scale than the previously created features, we have to rescale it. Please **create an object called scaler of the MinMaxScaler** class. As the parameters use **inputCol="normPriceFeature" and outputCol="scaledPrice"**.

This scaler does the following:

Assume the column contains the values [5,10,3,4]. The max is 10 and the min is 3. All elements will be transformed by using the formula:

$$ z_i = \frac{x_i - \min(x)}{\max(x) - \min(x)} $$

and hence the range of the feature is between 0 and 1.
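As a worked example, here is a plain-Python sketch of the formula applied to the column [5, 10, 3, 4]. The function `min_max_scale` is a hypothetical helper, not Spark's implementation; the constant-column branch follows the behaviour documented for Spark's MinMaxScaler with the default range [0, 1], where such a column is mapped to 0.5.

```python
def min_max_scale(values):
    """Rescale values to [0, 1] via z_i = (x_i - min) / (max - min)."""
    lo, hi = min(values), max(values)
    if hi == lo:
        # constant column: Spark's MinMaxScaler maps it to the midpoint of [0, 1]
        return [0.5 for _ in values]
    return [(x - lo) / (hi - lo) for x in values]

print(min_max_scale([5, 10, 3, 4]))  # values 2/7, 1.0, 0.0, 1/7
```

The minimum 3 is mapped to 0, the maximum 10 to 1, and everything else lies in between.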

**Remark**: Scaling along a column is called standardizing, and scaling along a row (i.e. a feature vector) is called normalizing. Often people do not use this distinction strictly.

In [None]:
# minMax scaler to scale column to get similar distances (range[0,1])
scaler = MinMaxScaler(<FILL-IN>)

Finally, we combine all features with another vector assembler.

In [None]:
# combine all feature columns; read and execute
assembler = VectorAssembler(inputCols=['normCategoryFeatures', 'scaledPrice'],
                                  outputCol="features")

### KMeans Model
As the next step we need to instantiate the KMeans model. Hence, **create an object of the KMeans class called kmeans**. For **k** (the number of clusters) use **4**, as the featuresCol use "features" and as the predictionCol use "cluster".
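For intuition: the fitted model writes into the predictionCol the index of the cluster center closest to each customer's feature vector (Spark's KMeans uses Euclidean distance by default). Below is a minimal plain-Python sketch of one iteration of the classic Lloyd algorithm, i.e. this assignment step plus the center update, using hypothetical two-dimensional points and starting centers. Spark's actual implementation (k-means|| initialization, repeated iterations until convergence) is more involved.

```python
def squared_distance(a, b):
    """Squared Euclidean distance between two vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))

def assign(points, centers):
    """Index of the nearest center for each point (what ends up in 'cluster')."""
    return [min(range(len(centers)), key=lambda j: squared_distance(p, centers[j]))
            for p in points]

def update(points, labels, centers):
    """Move each center to the mean of its assigned points."""
    new_centers = []
    for j, old in enumerate(centers):
        members = [p for p, label in zip(points, labels) if label == j]
        if not members:  # keep the center of an empty cluster where it was
            new_centers.append(list(old))
            continue
        new_centers.append([sum(coords) / len(members) for coords in zip(*members)])
    return new_centers

points = [[0.0, 0.1], [0.1, 0.0], [0.9, 1.0], [1.0, 0.9]]  # hypothetical features
centers = [[0.0, 0.0], [1.0, 1.0]]                           # hypothetical start centers
labels = assign(points, centers)
print(labels)  # [0, 0, 1, 1]
print(update(points, labels, centers))
```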

In [None]:
# clustering model
kmeans = KMeans(<FILL-IN>)

### Pipeline

Next, we create a machine learning pipeline. All previously defined transformations will be automatically applied if we pass the dataframe to the fit method of the pipeline.

As the stages argument, please pass a list of all required stages in the correct order. The elements (listed here in arbitrary order) are:
**kmeans, scaler, priceAssembler, sqlTrans, categoryAssembler, assembler, normalizer**.

In [None]:
# build pipeline
pipeline = Pipeline(stages=[<FILL-IN>])

Use the fit method of the pipeline on the dataframe df.

In [None]:
# fit model
model = pipeline.<FILL-IN>

Finally, we can make predictions using the transform method of the model, which yields a new dataframe. Please call the result predictions, cache the dataframe and trigger the caching with the count method.

In [None]:
# make predictions
predictions = model.<FILL-IN>(df)
predictions.<FILL-IN>.count()
predictions.select('customerId', 'features', 'cluster').show()

### Explore the Clusters

We can extract the cluster centroids from the KMeans model. To do so, we first take the fitted KMeans model out of the pipeline model; it is the last stage, provided kmeans is the last element of the stages list. Please just read and execute the cells below.

In [None]:
# get cluster centers

# extract kmeans model
kmeansModel = model.stages[-1]

# extract centers
centers = kmeansModel.clusterCenters()

In [None]:
# barplot with the category shares of each cluster center
%matplotlib inline
plt.rcParams["figure.figsize"] = (25, 8)

# each center holds the normalized category shares (in categoryList order), followed by the scaled price
n_categories = len(categoryList)
ind = np.arange(n_categories)  # the x locations for the groups
width = 0.2                    # width of a single bar
colors = ['r', 'y', 'b', 'g']

fig, ax = plt.subplots()
for i, center in enumerate(centers):
    ax.bar(ind + i * width, center[:n_categories], width,
           color=colors[i % len(colors)], label='Cluster {}'.format(i))

ax.set_ylabel('Share')
ax.set_title('Product share per cluster')
# place each tick in the middle of its group of bars
ax.set_xticks(ind + (len(centers) - 1) * width / 2)
ax.set_xticklabels(categoryList, rotation=45)
ax.legend()

plt.show()

Describe the plot above.  

We can also compute the total revenue of the clusters:

In [None]:
# total revenue
total_revenue = predictions.agg(F.sum('total_price')).rdd.flatMap(lambda x: x).collect()[0]

# revenue per cluster
predictions.groupBy('cluster').agg(F.count('*').alias('number_of_customers'),
                                   (F.sum('total_price')/total_revenue).alias('revenue_per_cluster')).show()

### Adding User Information

Finally, we want to add user information. Please read the JSON file **user.json** and name the dataframe **user_df**.

In [None]:
<FILL-IN> = spark.read.json(<FILL-IN>)

Join the predictions and user_df dataframes over the customer id. Since the id column in user_df is called Id, rename it to customerId first.

In [None]:
df = predictions.join(user_df.withColumnRenamed('Id', 'customerId'), on='customerId')

This dataframe is now small enough to be collected on the driver. Hence, we can convert it to a pandas dataframe with the method **toPandas()**.

In [None]:
df_pandas = df.select('customerId', 'age','cluster').<FILL-IN>

Check the memory usage of the pandas dataframe.

In [None]:
# check memory


Plot the age distribution of the different clusters and describe the plot.

Hint: You can use the boxplot function of the seaborn module.

In [None]:
import seaborn as sns

In [None]:
# boxplot


**This is the end of the exercise!**