In [1]:
# Kaggle notebook preamble. The environment comes from the kaggle/python Docker
# image (https://github.com/kaggle/docker-python), which ships with many
# analytics libraries; numpy and pandas are imported here for convenience.
import os

import numpy as np  # linear algebra
import pandas as pd  # data processing, CSV file I/O (e.g. pd.read_csv)

# Print the path of every file under the read-only input directory so the
# dataset layout is visible before any Spark code runs.
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# Kaggle storage notes:
# - Up to 20GB written to the current directory (/kaggle/working/) is preserved
#   as output when you create a version using "Save & Run All".
# - /kaggle/temp/ can hold temporary files, but they are not saved outside the
#   current session.

In [2]:
!pip install pyspark

In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Book').getOrCreate()

In [4]:
spark

In [5]:
# Creating a range of numbers. This range of numbers is just like a named column in a spreadsheet
# This range of numbers represents a distributed collection
# When run on a cluster, each part of this range of numbers exists on a different executor
#  This is a Spark DataFrame
    
myRange = spark.range(1000).toDF("number")

In [6]:
myRange

In [7]:
myRange.show()

In [8]:
type(myRange)

In [9]:
myRange.printSchema()

In [10]:
divisBy2 = myRange.where("number % 2 = 0")

In [11]:
divisBy2.show()

In [12]:
divisBy2.count()

In [13]:
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("../input/apchesparkbook/2015-summary.csv")


In [14]:
flightData2015.take(3)

In [15]:
flightData2015.head(3)

In [16]:
flightData2015.show()

In [17]:
flightData2015.sort("count").explain()

In [18]:
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)

**DataFrames and SQL**

In [19]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [20]:
# in Python
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")

dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()

sqlWay.explain()
dataFrameWay.explain()

In [21]:
# Import Spark's aggregate under an alias. A bare
# `from pyspark.sql.functions import max` would shadow Python's built-in max()
# for every later cell in this kernel.
from pyspark.sql.functions import max as spark_max

# Largest single value in the "count" column; the result column is still
# named "max(count)".
flightData2015.select(spark_max("count")).take(1)

In [22]:
# Top five destination countries by total "count", computed in SQL. The summed
# column is exposed as destination_total.
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")
maxSql.show(n=20)

In [23]:
### Book page 38
from pyspark.sql.functions import desc
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.show()

In [24]:
# in Python
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.explain()


**Structured Streaming**

In [25]:
# in Python
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("../input/apchesparkbook/by-day/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [26]:
staticDataFrame.show()

In [27]:
# Because we're working with time-series data, it's worth showing how we might
# go about grouping and aggregating it. Here we compute each line item's total
# cost (UnitPrice * Quantity) and sum it per customer (identified by CustomerId)
# per one-day window. We then sort so the days on which a customer spent the
# most appear first.

# in Python
from pyspark.sql.functions import window, column, desc, col
(
    staticDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
    .groupBy(
        col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
    # Without this sort, show(5) would print five arbitrary groups rather than
    # the largest daily spends.
    .sort(desc("sum(total_cost)"))
    .show(5)
)


In [28]:
streamingDataFrame = spark.readStream\
    .schema(staticSchema)\
    .option("maxFilesPerTrigger", 1)\
    .format("csv")\
    .option("header", "true")\
    .load("../input/apchesparkbook/by-day/by-day/*.csv")

In [29]:
streamingDataFrame.isStreaming 

In [30]:
# in Python
# Streaming version of the per-customer spend aggregation. It only defines the
# computation; nothing runs until a sink is started with writeStream...start().
# NOTE(review): the name says "PerHour" but the window is "1 day". The name is
# kept because later cells reference it.
purchaseByCustomerPerHour = (
    streamingDataFrame
    .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
    .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
    .sum("total_cost")
)


In [31]:
# in Python
# Start the streaming aggregation. In "complete" mode the whole result table is
# rewritten to an in-memory table that SQL can query as "customer_purchases".
# Keep the returned handle: the query runs in the background until it is
# stopped, e.g. with `customerPurchasesQuery.stop()`.
customerPurchasesQuery = (
    purchaseByCustomerPerHour.writeStream
    .format("memory")
    .queryName("customer_purchases")
    .outputMode("complete")
    .start()
)
# Display the running query, as before.
customerPurchasesQuery

In [32]:
# in Python
# Top five customer/day totals processed so far. The in-memory table reflects
# only the micro-batches completed when this cell runs, so re-running it can
# show different rows as the stream consumes more files.
topPurchasesSoFar = """
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
"""
spark.sql(topPurchasesSoFar).show(n=5)

**MLlib (Spark's machine learning library)**

In [33]:
staticDataFrame.printSchema()

In [34]:
# in Python
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
.na.fill(0)\
.withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
.coalesce(5)

In [35]:
# in Python
trainDataFrame = preppedDataFrame\
.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
.where("InvoiceDate >= '2011-07-01'")

In [36]:
trainDataFrame.count()

In [37]:
testDataFrame.count()

In [38]:
# in Python
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
.setInputCol("day_of_week")\
.setOutputCol("day_of_week_index")

In [39]:
testDataFrame.show()

In [40]:
indexer

In [41]:
# in Python
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
.setInputCol("day_of_week_index")\
.setOutputCol("day_of_week_encoded")

In [42]:
# in Python
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler()\
.setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
.setOutputCol("features")

In [43]:
# Next, we’ll set this up
# into a pipeline so that any future data we need to transform can go through the exact same process

# in Python
from pyspark.ml import Pipeline
transformationPipeline = Pipeline()\
.setStages([indexer, encoder, vectorAssembler])

In [44]:
# in Python
fittedPipeline = transformationPipeline.fit(trainDataFrame)

In [45]:
# in Python
transformedTraining = fittedPipeline.transform(trainDataFrame)

In [46]:
transformedTraining.cache()

In [47]:
# in Python
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
.setK(20)\
.setSeed(1)

In [48]:
# in Python
kmModel = kmeans.fit(transformedTraining)