<a href="https://colab.research.google.com/github/annguyenhuynh/PySpark/blob/main/Spark_Chapter3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install spark
!pip install pyspark

Collecting spark
  Downloading spark-0.2.1.tar.gz (41 kB)
[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/41.0 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m41.0/41.0 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: spark
  Building wheel for spark (setup.py) ... [?25l[?25hdone
  Created wheel for spark: filename=spark-0.2.1-py3-none-any.whl size=58748 sha256=9b7738efd37190cfeff6642d0158af1c668609092fe56cdf01b4ae9c66af7cdd
  Stored in directory: /root/.cache/pip/wheels/63/88/77/b4131110ea4094540f7b47c6d62a649807d7e94800da5eab0b
Successfully built spark
Installing collected packages: spark
Successfully installed spark-0.2.1


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
df = spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("/content/drive/MyDrive/databricks Spark-The-Definitive-Guide master data-retail-data_by-day/*.csv")

In [5]:
df.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

In [6]:
df.createOrReplaceTempView("retail_data")
schema = df.schema

In [7]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [8]:
from pyspark.sql.functions import *

In [9]:
df\
.selectExpr(
    "CustomerID",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
.groupBy(
    col("CustomerID"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.show(5)

+----------+--------------------+-----------------+
|CustomerID|              window|  sum(total_cost)|
+----------+--------------------+-----------------+
|   16057.0|{2011-12-05 00:00...|            -37.6|
|   14126.0|{2011-11-29 00:00...|643.6300000000001|
|   13500.0|{2011-11-16 00:00...|497.9700000000001|
|   17160.0|{2011-11-08 00:00...|516.8499999999999|
|   15608.0|{2011-11-11 00:00...|            122.4|
+----------+--------------------+-----------------+
only showing top 5 rows



### Code explain
* **selectExpr** allows SQL expressions
* **window(col("InvoiceDate"), "1 day")** creates a daily time window for each InvoiceDate, enabling aggregations on a per-day basis.

In [10]:
spark.conf.set("spark.sql.shuffle.partitions","5")

In [11]:
streamDF = spark.readStream\
  .schema(schema)\
  .option("maxFilesPerTrigger",1)\
  .format("csv")\
  .option("header","true")\
  .load("/content/drive/MyDrive/2010-12-01.csv")


In [12]:
streamDF.isStreaming

True

In [13]:
purchaseByCustomerPerHour = streamDF\
.selectExpr(
    "CustomerID",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
.groupBy(col("CustomerID"),window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")

In [14]:
purchaseByCustomerPerHour.writeStream\
  .format("memory")\
  .queryName("customer_purchases")\
  .outputMode("complete")\
  .start()

<pyspark.sql.streaming.query.StreamingQuery at 0x7a5ebfdc23e0>

In [15]:
spark.sql("""
    select *
    from customer_purchases
    order by `sum(total_cost)` desc
    """)\
    .show(5)

+----------+------+---------------+
|CustomerID|window|sum(total_cost)|
+----------+------+---------------+
+----------+------+---------------+



In [16]:
preppedData = df\
.na.fill(0)\
.withColumn("dayofweek", date_format(col("InvoiceDate"), "EEEE"))\
.coalesce(5)

In [17]:
preppedData.show()

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|dayofweek|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+---------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|   Monday|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|   Monday|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|   Monday|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|   Monday|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|   Monday|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-

In [18]:
trainDF = preppedData\
  .where("InvoiceDate < '2011-07-01' ")
testDF = preppedData\
  .where("InvoiceDate >= '2011-07-01' ")

In [19]:
trainDF.count()

245903

In [20]:
testDF.count()

296006

In [21]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

indexer = StringIndexer()\
  .setInputCol("dayofweek")\
  .setOutputCol("day_of_week_index")

In [22]:
encoder = OneHotEncoder()\
  .setInputCol("day_of_week_index")\
  .setOutputCol("day_of_week_encoded")

In [23]:
vectorAssembler = VectorAssembler()\
  .setInputCols(["UnitPrice","Quantity","day_of_week_encoded"])\
  .setOutputCol("features")

In [24]:
from pyspark.ml import Pipeline

transformPipeline = Pipeline()\
  .setStages([indexer,encoder,vectorAssembler])

In [25]:
fittedPipeline = transformPipeline.fit(trainDF)

In [26]:
transformedData = fittedPipeline.transform(trainDF)

In [27]:
transformedData.cache() # We cache data b/c we will use this multiple times

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: timestamp, UnitPrice: double, CustomerID: double, Country: string, dayofweek: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]

In [28]:
from pyspark.ml.clustering import KMeans

In [29]:
kmeans = KMeans()\
  .setK(20)\
  .setSeed(42)

In [30]:
kmModel = kmeans.fit(transformedData)

In [31]:
transformedTest = fittedPipeline.transform(testDF)

In [32]:
# Parallelize data RDDs
from pyspark.sql import Row


In [33]:
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()

DataFrame[_1: bigint]