## PYSPARK

### Imports

In [37]:
from pyspark.sql.functions import window, column, desc, col,date_format
from pyspark.ml.feature import StringIndexer, OneHotEncoder,VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.sql import Row
import pandas as pd

### SparkContext

In [38]:
spark

### Configure Spark to reduce the output sufflle partitions from 200 to 5

In [39]:
spark.conf.set('spark.sql.shuffle.partitions', '5') 

### Reading Data

In [40]:
data=spark\
    .read\
    .option('inferSchema', 'true')\
    .option('header', 'true')\
    .csv('2015-summary.csv')

In [41]:
data.take(3)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]

### Sort Data

In [42]:
data.sort('count').take(10)

[Row(DEST_COUNTRY_NAME='Suriname', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Estonia', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Cyprus', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
 Row(DEST_COUNTRY_NAME='Zambia', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='Burkina Faso', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Gibraltar', count=1),
 Row(DEST_COUNTRY_NAME='Djibouti', ORIGIN_COUNTRY_NAME='United States', count=1),
 Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1)]

### Create a view

In [43]:
data.createOrReplaceTempView('data')

### Create a SQL query

In [44]:
sql = spark.sql('''SELECT DEST_COUNTRY_NAME, count(1) FROM data GROUP BY DEST_COUNTRY_NAME''')

In [45]:
spark.sql('SELECT max(count) from data').take(1)

[Row(max(count)=370002)]

In [46]:
maxSql = spark.sql('''SELECT DEST_COUNTRY_NAME, sum(count) as destination_total FROM data GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 5''')
maxSql.collect()

[Row(DEST_COUNTRY_NAME='United States', destination_total=411352),
 Row(DEST_COUNTRY_NAME='Canada', destination_total=8399),
 Row(DEST_COUNTRY_NAME='Mexico', destination_total=7140),
 Row(DEST_COUNTRY_NAME='United Kingdom', destination_total=2025),
 Row(DEST_COUNTRY_NAME='Japan', destination_total=1548)]

### Working with structured streaming

In [47]:
staticDataFrame = spark.read.format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('by-day/*.csv')
staticDataFrame.createOrReplaceTempView('retail_data')
staticSchema = staticDataFrame.schema

In [48]:
streamingData = spark.readStream\
    .schema(staticSchema)\
    .option('maxFilesPerTrigger', 1)\
    .format('csv')\
    .option('header', 'true')\
    .load('by-day/*.csv')

In [49]:
sql = spark.sql('''SELECT * FROM retail_data''').show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

In [50]:
sql2 = spark.sql('''SELECT CustomerId, InvoiceDate, sum(UnitPrice * Quantity) as total_cost FROM retail_data GROUP BY CustomerId, InvoiceDate  ORDER BY total_cost DESC''')
sql2.show(5)

+----------+-------------------+------------------+
|CustomerId|        InvoiceDate|        total_cost|
+----------+-------------------+------------------+
|   16446.0|2011-12-09 09:15:00|          168469.6|
|   12346.0|2011-01-18 10:01:00|           77183.6|
|      null|2011-11-07 17:42:00| 52940.93999999999|
|      null|2011-11-14 17:55:00|50653.909999999996|
|   15098.0|2011-06-10 15:28:00|           38970.0|
+----------+-------------------+------------------+
only showing top 5 rows



In [51]:
purchaseByCustomerPerHour = streamingData\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")

In [52]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()

IllegalArgumentException: Cannot start query with name customer_purchases as a query with that name is already active in this SparkSession

### Machine Learning

In [53]:
data = staticDataFrame\
  .na.fill(0)\
  .withColumn('day_of_week', date_format(col('InvoiceDate'), 'EEEE'))\
  .coalesce(5)
data.take(5)

[Row(InvoiceNo='580538', StockCode='23084', Description='RABBIT NIGHT LIGHT', Quantity=48, InvoiceDate='2011-12-05 08:38:00', UnitPrice=1.79, CustomerID=14075.0, Country='United Kingdom', day_of_week='Monday'),
 Row(InvoiceNo='580538', StockCode='23077', Description='DOUGHNUT LIP GLOSS ', Quantity=20, InvoiceDate='2011-12-05 08:38:00', UnitPrice=1.25, CustomerID=14075.0, Country='United Kingdom', day_of_week='Monday'),
 Row(InvoiceNo='580538', StockCode='22906', Description='12 MESSAGE CARDS WITH ENVELOPES', Quantity=24, InvoiceDate='2011-12-05 08:38:00', UnitPrice=1.65, CustomerID=14075.0, Country='United Kingdom', day_of_week='Monday'),
 Row(InvoiceNo='580538', StockCode='21914', Description='BLUE HARMONICA IN BOX ', Quantity=24, InvoiceDate='2011-12-05 08:38:00', UnitPrice=1.25, CustomerID=14075.0, Country='United Kingdom', day_of_week='Monday'),
 Row(InvoiceNo='580538', StockCode='22467', Description='GUMBALL COAT RACK', Quantity=6, InvoiceDate='2011-12-05 08:38:00', UnitPrice=2.55

### Split the data in train and test

In [54]:
train = data.where("InvoiceDate < '2011-07-01'")
test = data.where("InvoiceDate >= '2011-07-01'")

In [55]:
train.count()

245903

In [56]:
test.count()

296006

### Transform the days of weeks in numerical values

In [57]:
indexer = StringIndexer()\
  .setInputCol("day_of_week")\
  .setOutputCol("day_of_week_index")

In [58]:
encoder = OneHotEncoder()\
  .setInputCol("day_of_week_index")\
  .setOutputCol("day_of_week_encoded")

In [59]:
vectorAssembler = VectorAssembler()\
  .setInputCols(['UnitPrice', 'Quantity', 'day_of_week_encoded'])\
  .setOutputCol('features')


In [60]:
transformationPipeline = Pipeline()\
  .setStages([indexer, encoder, vectorAssembler])

In [61]:
fittedPipeline = transformationPipeline.fit(train)


In [62]:
transformedTraining = fittedPipeline.transform(train)
transformedTraining.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [63]:
transformedTesting = fittedPipeline.transform(test)

### Run Kmeans Algorithm

In [64]:
kmeans = KMeans()\
  .setK(20)\
  .setSeed(1)

In [65]:
kmModel = kmeans.fit(transformedTraining)


In [66]:
predictions = kmModel.transform(transformedTesting)

In [67]:
transformedTest = fittedPipeline.transform(test)

In [68]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator()

In [69]:
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))



Silhouette with squared euclidean distance = 0.5427938390491535


In [70]:
print("Cluster Centers: ")
centroid=[]
centers = kmModel.clusterCenters()
for center in centers:
    centroid.append(center)
    print(center)

Cluster Centers: 
[4.09293606 2.73959977 0.18896861 0.19629835 0.18589279 0.16698473
 0.14731972]
[1.0400e+00 7.4215e+04 0.0000e+00 1.0000e+00 0.0000e+00 0.0000e+00
 0.0000e+00]
[ 1.0400e+00 -7.4215e+04  0.0000e+00  1.0000e+00  0.0000e+00  0.0000e+00
  0.0000e+00]
[ 3.897e+04 -1.000e+00  0.000e+00  0.000e+00  0.000e+00  0.000e+00
  1.000e+00]
[ 1.6670865e+04 -1.0000000e+00  0.0000000e+00  0.0000000e+00
  0.0000000e+00  1.0000000e+00  0.0000000e+00]
[ 7.5000e-03 -9.4045e+03  2.5000e-01  7.5000e-01  0.0000e+00  0.0000e+00
  0.0000e+00]
[ 7.385808e+03 -6.000000e-01  0.000000e+00  8.000000e-01  2.000000e-01
  0.000000e+00  0.000000e+00]
[ 1.94092118e+03 -1.76470588e-01  5.88235294e-02  1.76470588e-01
  4.11764706e-01  0.00000000e+00  3.52941176e-01]
[8.407500e-01 1.213475e+03 2.125000e-01 2.500000e-01 1.125000e-01
 2.000000e-01 1.625000e-01]
[ 2.91481481e-01 -1.34285185e+03  1.85185185e-01  2.96296296e-01
  1.85185185e-01  2.59259259e-01  7.40740741e-02]
[  1.41599732 114.6946616    0.2081

In [73]:
pandasDF=predictions.toPandas()
centers = pd.DataFrame(centroid)
centers

Unnamed: 0,0,1,2,3,4,5,6
0,4.092936,2.7396,0.188969,0.196298,0.185893,0.166985,0.14732
1,1.04,74215.0,0.0,1.0,0.0,0.0,0.0
2,1.04,-74215.0,0.0,1.0,0.0,0.0,0.0
3,38970.0,-1.0,0.0,0.0,0.0,0.0,1.0
4,16670.865,-1.0,0.0,0.0,0.0,1.0,0.0
5,0.0075,-9404.5,0.25,0.75,0.0,0.0,0.0
6,7385.808,-0.6,0.0,0.8,0.2,0.0,0.0
7,1940.921176,-0.176471,0.058824,0.176471,0.411765,0.0,0.352941
8,0.84075,1213.475,0.2125,0.25,0.1125,0.2,0.1625
9,0.291481,-1342.851852,0.185185,0.296296,0.185185,0.259259,0.074074
