In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import DateType, ArrayType

In [None]:
#sc._conf.getAll()

In [None]:
# Session resources for this notebook. The SparkContext is (re)created below so that
# this configuration is the one in effect.
# NOTE(review): spark.driver.memory presumably only takes effect when no driver JVM is
# already running (e.g. a kernel launched through the pyspark shell already has one) — confirm.
config = SparkConf().setAll([
    ('spark.executor.cores', '2'),
    ('spark.executor.memory', '4g'),
    ('spark.driver.memory', '4g'),
    ('spark.submit.deployMode', 'client'),
])

try:
    # `sc` is predefined only when the kernel was started via the pyspark shell; on a
    # fresh plain-Python kernel it does not exist yet, and there is nothing to stop.
    sc.stop()
except NameError:
    pass

sc = SparkContext(conf=config)
spark = SparkSession(sc)
# Legacy entry point; the data-loading cell below reads through it.
sqlContext = SQLContext(sc)

In [None]:
# Load the raw transactions and shape them for RFM analysis:
#   - drop any row containing a null,
#   - cast the id / quantity / price columns (no schema inference is requested on read),
#   - derive a calendar `Date` from `InvoiceDate`,
#   - keep only lines with a positive quantity and unit price (no returns / zero prices),
#   - add the line total `TotalPrice`.
# NOTE(review): the "MM/dd/yyyy" pattern assumes InvoiceDate begins with a month/day/year
# date. If the values also carry a time of day or single-digit month/day fields, newer
# Spark date parsers may reject them (null `Date`, or an upgrade error) — confirm against
# the data and the Spark version in use.
df = (
    sqlContext.read.csv('ecommerce-data.csv', header=True)
    .na.drop()
    .withColumn("CustomerID", col("CustomerID").cast('int'))
    .withColumn("Quantity", col("Quantity").cast('int'))
    .withColumn("UnitPrice", col("UnitPrice").cast('float'))
    .withColumn("Date", to_date(col("InvoiceDate"), "MM/dd/yyyy"))
    .filter((col('Quantity') > 0) & (col('UnitPrice') > 0))
    .withColumn('TotalPrice', col('Quantity') * col('UnitPrice'))
)

# Row count of the cleaned data (displayed as the cell output).
df.count()

#df.show()
#df.show(df.count())

In [None]:
import datetime as dt
# Fixed reference date for Recency, so results do not drift with the wall clock.
# NOTE(review): presumably chosen as the last invoice date in the data set — confirm.
now = to_date(lit(dt.date(2011, 12, 9)))

# One aggregation over each customer's transactions yields all three RFM metrics:
#   Recency   - days from the customer's latest purchase `Date` to `now`
#   Frequency - number of transaction lines with a non-null InvoiceNo
#               (NOTE(review): lines, not distinct invoices; use countDistinct('InvoiceNo')
#               if Frequency should mean number of orders)
#   Monetary  - total spend, the sum of TotalPrice
# A single groupBy needs one shuffle. Splitting it into three aggregates joined back
# together would cost more, and the result would be the same, because every customer
# appears in every aggregate.
# Rows with a null CustomerID (e.g. a failed int cast) identify no customer and are excluded.
rfm = (
    df.filter(col('CustomerID').isNotNull())
      .groupby('CustomerID')
      .agg(
          max('Date').alias('LastPurchaseDate'),
          count('InvoiceNo').alias('Frequency'),
          sum('TotalPrice').alias('Monetary'),
      )
      # `Date` is already a date column, so no further to_date conversion is needed.
      .withColumn('Recency', datediff(now, col('LastPurchaseDate')))
      .select('CustomerID', 'Recency', 'Frequency', 'Monetary')
)
#rfm.show()

In [None]:
def _outlier_fences(rfm, column, lower_q=0.05, upper_q=0.95):
    """Compute the (lower, upper) outlier fences for ``column`` of ``rfm``.

    Takes the exact ``lower_q`` and ``upper_q`` quantiles (relative error 0.0) and widens
    them by 1.5 times their spread, following the shape of Tukey's IQR rule. With the
    default 5th/95th percentiles (rather than the quartiles) the fences are wider than
    the classic rule.

    Returns None when ``approxQuantile`` yields fewer than two values (it returns an
    empty list for a column containing only nulls).
    """
    quantiles = rfm.stat.approxQuantile(column, [lower_q, upper_q], 0.0)
    if len(quantiles) < 2:
        return None
    low, high = quantiles
    spread = high - low
    return low - 1.5 * spread, high + 1.5 * spread


def get_outliers(rfm, column, lower_q=0.05, upper_q=0.95):
    """Display the rows of ``rfm`` whose ``column`` lies outside the outlier fences.

    Output goes through ``DataFrame.show()``; returns None. Shows nothing when no fences
    can be computed.
    """
    fences = _outlier_fences(rfm, column, lower_q, upper_q)
    if fences is None:
        return
    lower, upper = fences
    rfm.filter((rfm[column] < lower) | (rfm[column] > upper)).show()


def remove_outliers(rfm, column, lower_q=0.05, upper_q=0.95):
    """Return ``rfm`` restricted to rows whose ``column`` lies within the outlier fences.

    Both bounds are inclusive. Rows with a null ``column`` do not satisfy the filter and
    are dropped. When no fences can be computed, ``rfm`` is returned unchanged.
    Spark DataFrames are immutable, so callers must use the returned frame.
    """
    fences = _outlier_fences(rfm, column, lower_q, upper_q)
    if fences is None:
        return rfm
    lower, upper = fences
    # A value is inside the fences only if BOTH bounds hold; between() is inclusive.
    return rfm.filter(rfm[column].between(lower, upper))

#get_outliers(rfm, 'Recency')
#get_outliers(rfm, 'Frequency')
#get_outliers(rfm, 'Monetary')

# Sequential passes: each one recomputes its fences on the frame left by the previous pass.
rfm = remove_outliers(rfm, 'Recency')
rfm = remove_outliers(rfm, 'Frequency')
rfm = remove_outliers(rfm, 'Monetary')

In [None]:
from pyspark.ml.feature import VectorAssembler

# Pack the three RFM metrics into one vector column named `features` (the input of the
# scaler below), keyed by the customer id under the name `id`.
assembler = VectorAssembler(
    inputCols=['Recency', 'Frequency', 'Monetary'],
    outputCol="features",
)
rfm_feat = (
    assembler
    .transform(rfm.withColumnRenamed('CustomerID', 'id'))
    .select('id', 'features')
)
#rfm_feat.show()

In [None]:
from pyspark.ml.feature import StandardScaler

# Fit the scaler on the assembled RFM vectors and apply it. The scaled vectors are
# renamed back to `features`, the column name the clustering cells read.
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
scalerModel = scaler.fit(rfm_feat)

scaled = scalerModel.transform(rfm_feat).select('id', 'scaledFeatures')
rfm_final = scaled.withColumnRenamed('scaledFeatures', 'features')
#rfm_final.show()

In [None]:
def size_of_partition(map_of_rows):
    """Count the rows of a single partition.

    Meant for ``rdd.mapPartitions``: consumes the partition's row iterator and returns a
    one-element list, so every partition contributes exactly one count.
    """
    # Plain counting loop: in this notebook the builtin `sum` is shadowed by
    # `pyspark.sql.functions.sum` (star import), so it is deliberately not used here.
    n_rows = 0
    for _row in map_of_rows:
        n_rows += 1
    return [n_rows]

#df.rdd.mapPartitions(size_of_partition).collect()
#rfm_final.rdd.getNumPartitions()
#reparted_rdd = df.rdd.repartition(20)
#reparted_rdd.mapPartitions(size_of_partition).collect()
#reparted_rdd.map(lambda x: x).toDF().show(reparted_rdd.count())

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

# Candidate cluster counts are k = 2 .. n_clusters - 1 (the range end is exclusive).
n_clusters = 10
# cost[k] holds the K-means training cost for k clusters; slots 0 and 1 stay zero/unused.
cost = np.zeros(n_clusters)
# Silhouette score for each candidate k, in the order of range(2, n_clusters).
silh_val = []

# Every fit / transform / evaluate below reads rfm_final. Cache it once so its lineage
# (CSV read, cleaning, aggregation, scaling) is not recomputed for every k.
rfm_final.cache()

# The evaluator's configuration does not depend on k, so build it once.
evaluator = ClusteringEvaluator()

for k in range(2, n_clusters):
    kmeans = KMeans().setK(k).setSeed(1).setMaxIter(50).setFeaturesCol("features")
    model = kmeans.fit(rfm_final)

    # Elbow method: record this k's training cost.
    cost[k] = model.summary.trainingCost

    # Silhouette score of this k's cluster assignments.
    predictions = model.transform(rfm_final)
    silh_val.append(evaluator.evaluate(predictions))


In [None]:
import numpy as np
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import seaborn as sbs
from matplotlib.ticker import MaxNLocator

# Elbow curve: training cost for each candidate k (slots 0 and 1 of `cost` are skipped).
fig, ax = plt.subplots(1, 1, figsize=(10, 5))
ax.plot(range(2, n_clusters), cost[2:n_clusters])
ax.set_xlabel('K - Clusters')
ax.set_ylabel('Cost')
ax.set_title('Elbow Curve')
# Cluster counts are whole numbers, so tick only at integers.
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
plt.show()

In [None]:
# Tabulate the silhouette score for each candidate k evaluated in the sweep.
silh_array = np.asanyarray(silh_val)
silhouette = pd.DataFrame(
    [(k_val, score) for k_val, score in zip(range(2, n_clusters), silh_array)],
    columns=['K - Clusters', 'silhouette'],
)
silhouette

In [None]:
from pyspark.ml.clustering import KMeans

# Final model with a fixed k.
# NOTE(review): k = 3 is presumably read off the elbow curve / silhouette table above —
# record the rationale here.
k = 3
kmeans = (
    KMeans()
    .setK(k)
    .setSeed(1)
    .setMaxIter(50)
    .setFeaturesCol("features")
)
model = kmeans.fit(rfm_final)

# Keep just the customer id and its assigned cluster, under the raw data's column name.
predictions = (
    model.transform(rfm_final)
    .withColumnRenamed('id', 'CustomerID')
    .withColumnRenamed('prediction', 'Cluster')
    .select(['CustomerID', 'Cluster'])
)

In [None]:
# Number of rows (one per customer id in rfm_final) assigned to each cluster of the final model.
model.summary.clusterSizes

In [None]:
# Cluster centroids in the scaled feature space, ordered (Recency, Frequency, Monetary)
# as assembled above — not in the original units.
model.clusterCenters()

In [None]:
# Label every transaction with its customer's cluster. The join is inner, so transactions
# whose CustomerID has no cluster assignment are dropped.
df_cluster = df.join(predictions, 'CustomerID', 'inner')
#df_cluster.show()

In [None]:
# Spark writes a *directory* named 'user_ecommerce-data.csv' containing part files.
# mode='overwrite' keeps Restart & Run All working: the default save mode raises an error
# when the output path already exists from a previous run.
df_cluster.write.csv('user_ecommerce-data.csv', header=True, mode='overwrite')