In [1]:
from pyspark.sql import SparkSession


In [2]:
spark = SparkSession.builder.appName('Datacamp Pyspark Tutorial').config('spark.memory.offHeap.enabled', 'true').config('spark.memory.offHeap.size', '10g').getOrCreate()


In [3]:
df = spark.read.csv('datacamp_ecommerce.csv', header=True, escape="\"", sep=';')


In [4]:
df.show(5, 0)


+---------+---------+-----------------------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country       |
+---------+---------+-----------------------------------+--------+----------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |01.12.2010 08:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN                |6       |01.12.2010 08:26|3.39     |17850     |United Kingdom|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |01.12.2010 08:26|2.75     |17850     |United Kingdom|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |01.12.2010 08:26|3.39     |17850     |United Kingdom|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |01.12.2010 08:26|3.39     |17850     |United Kingdom|
+---------+---------+-------------------

In [5]:
df.count()


541909

In [6]:
df.select('CustomerID').distinct().count()


4373

In [7]:
from pyspark.sql.functions import *
from pyspark.sql.types import *


In [8]:
df.groupBy('Country').agg(countDistinct('CustomerID').alias('country_count')).show()


+------------------+-------------+
|           Country|country_count|
+------------------+-------------+
|            Sweden|            8|
|         Singapore|            1|
|           Germany|           95|
|               RSA|            1|
|            France|           87|
|            Greece|            4|
|European Community|            1|
|           Belgium|           25|
|           Finland|           12|
|             Malta|            2|
|       Unspecified|            4|
|             Italy|           15|
|              EIRE|            3|
|         Lithuania|            1|
|            Norway|           10|
|             Spain|           31|
|           Denmark|            9|
|         Hong Kong|            0|
|            Israel|            4|
|           Iceland|            1|
+------------------+-------------+
only showing top 20 rows



In [14]:
df.groupBy('Country').agg(countDistinct('CustomerID').alias('country_count')).orderBy(desc('country_count')).show()


+---------------+-------------+
|        Country|country_count|
+---------------+-------------+
| United Kingdom|         3950|
|        Germany|           95|
|         France|           87|
|          Spain|           31|
|        Belgium|           25|
|    Switzerland|           21|
|       Portugal|           19|
|          Italy|           15|
|        Finland|           12|
|        Austria|           11|
|         Norway|           10|
|        Denmark|            9|
|Channel Islands|            9|
|      Australia|            9|
|    Netherlands|            9|
|         Sweden|            8|
|         Cyprus|            8|
|          Japan|            8|
|         Poland|            6|
|         Greece|            4|
+---------------+-------------+
only showing top 20 rows



In [17]:
spark.sql('set spark.sql.legacy.timeParserPolicy=LEGACY')

df = df.withColumn('date', to_timestamp('InvoiceDate', 'yy/MM/dd HH:mm'))
df.select(max('date')).show()


+---------+
|max(date)|
+---------+
|     NULL|
+---------+



In [18]:
df.select(min('date')).show()


+---------+
|min(date)|
+---------+
|     NULL|
+---------+



In [19]:
df.show(5, 0)


+---------+---------+-----------------------------------+--------+----------------+---------+----------+--------------+----+
|InvoiceNo|StockCode|Description                        |Quantity|InvoiceDate     |UnitPrice|CustomerID|Country       |date|
+---------+---------+-----------------------------------+--------+----------------+---------+----------+--------------+----+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER |6       |01.12.2010 08:26|2.55     |17850     |United Kingdom|NULL|
|536365   |71053    |WHITE METAL LANTERN                |6       |01.12.2010 08:26|3.39     |17850     |United Kingdom|NULL|
|536365   |84406B   |CREAM CUPID HEARTS COAT HANGER     |8       |01.12.2010 08:26|2.75     |17850     |United Kingdom|NULL|
|536365   |84029G   |KNITTED UNION FLAG HOT WATER BOTTLE|6       |01.12.2010 08:26|3.39     |17850     |United Kingdom|NULL|
|536365   |84029E   |RED WOOLLY HOTTIE WHITE HEART.     |6       |01.12.2010 08:26|3.39     |17850     |United Kingdom|NULL|


In [22]:
df = df.withColumn("from_date", lit("12/1/10 08:26"))
df = df.withColumn('from_date',to_timestamp("from_date", 'yy/MM/dd HH:mm'))

df2 = df.withColumn('from_date',to_timestamp(col('from_date'))).withColumn('recency',col("date").cast("long") - col('from_date').cast("long"))


In [23]:
df2 = df2.join(df2.groupBy('CustomerID').agg(max('recency').alias('recency')),on='recency',how='leftsemi')
df2.show(5, 0)


+-------+---------+---------+-----------+--------+-----------+---------+----------+-------+----+---------+
|recency|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|date|from_date|
+-------+---------+---------+-----------+--------+-----------+---------+----------+-------+----+---------+
+-------+---------+---------+-----------+--------+-----------+---------+----------+-------+----+---------+



In [24]:
df2.printSchema()


root
 |-- recency: long (nullable = true)
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- from_date: timestamp (nullable = true)



In [25]:
df_freq = df2.groupBy('CustomerID').agg(count('InvoiceDate').alias('frequency'))
df_freq.show(5, 0)


+----------+---------+
|CustomerID|frequency|
+----------+---------+
+----------+---------+



In [26]:
df3 = df2.join(df_freq, on='CustomerID', how='inner')
df3.printSchema()


root
 |-- CustomerID: string (nullable = true)
 |-- recency: long (nullable = true)
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- from_date: timestamp (nullable = true)
 |-- frequency: long (nullable = false)



In [27]:
m_val = df3.withColumn('TotalAmount', col('Quantity') * col('UnitPrice'))
m_val = m_val.groupBy('CustomerID').agg(sum('TotalAmount').alias('monetary_value'))


In [28]:
finaldf = m_val.join(df3, on='CustomerID', how='inner')
finaldf = finaldf.select(['recency', 'frequency', 'monetary_value', 'CustomerID']).distinct()


In [29]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler


In [32]:
# ERROR
#assemble = VectorAssembler(inputCols=['recency', 'frequency', 'monetary_value'], outputCol='features')
#assembled_data = assemble.transform(finaldf)

#scale = StandardScaler(inputCol='features', outputCol='standardized')

#data_scale = scale.fit(assembled_data)
#data_scale_output = data_scale.transform(assembled_data)


In [92]:
# ERROR
#data_scale_output.select('standarized').show(2, truncate=False)


In [93]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator


In [94]:
# ERROR
#cost = np.zeros(10)

#evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized', metricName='silhouette', distanceMeasure='squaredEuclidean')

#for i in range(2, 10):
#    KMeans_algo = KMeans(featuresCol='standardized', k=i)
#    KMeans_fit = KMeans_algo.fit(data_scale_output)

#    output = KMeans_fit.transform(data_scale_utput)
#    cost[i] = KMeans_fit.summary.trainingCost


In [34]:
import pandas as pd
import pylab


In [35]:
# ERROR
#df_cost = pd.DataFrame(cost[2:])
#df_cost.columns = ['cost']
#new_col = range(2, 10)

#df_cost.insert(0, 'cluster', new_col)

#pylab.plot(df_cost.cluster, df_cost.cost)
#pylab.xlabel('Number of Clusters')
#pylab.ylabel('Score')
#pylab.title('Elbow Curve')
#pylab.show()


NameError: name 'cost' is not defined

In [36]:
# ERROR
#KMeans_algo = KMeans(featuresCol='standardized', k=4)
#KMeans_fit = KMeans_algo.fit(data_scale_output)


In [37]:
# ERROR
#preds = KMeans_fit.transform(data_scale_output)
#preds.show(5, 0)


In [38]:
import matplotlib.pyplot as plt
import seaborn as sns


In [40]:
# ERROR
#df_viz = preds.select('recency', 'frequency', 'monetary_value', 'prediction')
#df_viz = df_viz.toPandas()

#avg_df = df_viz.groupby(['prediction'], as_index=False).mean()

#list1 = ['recency', 'frequency', 'monetary_value']

#for i in list1:
#    sns.barplot(x='prediction', y=str(i), data=avg_df)
#    plt.show()
