In [None]:
staticDataFrame = spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("by-day/*.csv")

In [None]:
staticDataFrame.createOrReplaceGlobalTempView("retail_data")
staticSchema = staticDataFrame.schema

In [None]:
from pyspark.sql.functions import window, column, desc,col

In [None]:
staticDataFrame\
  .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
  .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
  .sum("total_cost")\
  .sort(desc("sum(total_cost)"))\
  .show(5)

In [None]:
spark.conf.set("spark.sql.shuffle.partitions", "5")

In [10]:
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger",1)\
    .format("csv")\
    .option("header", "true")\
    .load("by-day/*.csv")


In [11]:
streamingDataFrame.isStreaming

True

In [17]:
#the DataFrame
purchaseByCustomerPerHour = streamingDataFrame\
    .selectExpr(
    "CustomerId",
    "(UnitPrice * Quantity) as total_cost",
    "InvoiceDate")\
    .groupBy(
    col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
    .sum("total_cost")

In [19]:
purchaseByCustomerPerHour.writeStream\
    .format("memory")\
    .queryName("customer_purchases")\
    .outputMode("complete")\
    .start()

<pyspark.sql.streaming.StreamingQuery at 0x11a84bf98>

In [25]:
spark.sql("""
    SELECT * 
    FROM customer_purchases
    ORDER BY 'sum(total_cost)' DESC
    """)\
    .show(5)

+----------+--------------------+------------------+
|CustomerId|              window|   sum(total_cost)|
+----------+--------------------+------------------+
|   15237.0|[2011-12-08 00:00...|              83.6|
|   16811.0|[2011-12-05 00:00...|             232.3|
|   12921.0|[2011-03-30 01:00...|-87.30000000000001|
|   17652.0|[2011-03-03 00:00...|             222.3|
|   14506.0|[2011-11-22 00:00...|496.91999999999996|
+----------+--------------------+------------------+
only showing top 5 rows



**Machine Learning and Advanced Analytics**

In [21]:
staticDataFrame.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [19]:
from pyspark.sql.functions import date_format, col

In [20]:
preppedDataFrame = staticDataFrame\
    .na.fill(0)\
    .withColumn("day_of_week", date_format(col("InvoiceDate"),"EEEE"))\
    .coalesce(5)

NameError: name 'staticDataFrame' is not defined

In [18]:
#Split th data into training and test sets
trainDataFrame = preppedDataFrame\
    .where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
    .where("InvoiceDate >= '2011-07-01'")

NameError: name 'preppedDataFrame' is not defined

In [29]:
trainDataFrame.count()
testDataFrame.count()

296006

In [8]:
from pyspark.ml.feature import StringIndexer

In [9]:
indexer = StringIndexer()\
    .setInputCol("day_of_week")\
    .setOutputCol("day_of_week_index")

In [10]:
# fix the days issue

In [11]:
from pyspark.ml.feature import OneHotEncoder

In [12]:
encoder = OneHotEncoder()\
    .setInputCol("day_of_week_index")\
    .setOutputCol("day_of_week_encoded")

In [13]:
from pyspark.ml.feature import VectorAssembler

In [14]:
vectorAssembler = VectorAssembler()\
    .setInputCols(["UnitPrice","Quantity", "day_of_week_encoded"])\
    .setOutputCol("features")

In [15]:
from pyspark.ml import Pipeline

The history saving thread hit an unexpected error (OperationalError('disk I/O error')).History will not be written to the database.


In [16]:
transformationPipeline = Pipeline()\
    .setStages([indexer, encoder, vectorAssembler])

In [17]:
fittedPipeline = transformationPipeline.fit(trainDataFrame)

NameError: name 'trainDataFrame' is not defined

In [None]:
transformedTraining.cache()

In [None]:
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
    .setK(20)\
    .setSeed(1)

In [None]:
kmModel = kmeans.fit(transformedTraining)

In [None]:
kmModel.computeCost(transformedTraining)

In [None]:
transforedTest = fittedPipeline.transform(testDataFrame)

kmModel.computeCost(transformedTest)

In [None]:
**Lower-Level APIs**

In [None]:
from pyspark.sql import Row

spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()