<p align="center">
    <img src="https://upload.wikimedia.org/wikipedia/commons/thumb/f/f3/Apache_Spark_logo.svg/2560px-Apache_Spark_logo.svg.png" width="20%"/>
</p>

# PySpark
---
```Bash
Author: Witchakorn Wanasanwongkot
```

## Spark
Spark is a distributed cluster-computing framework for big data processing. It provides parallel abstractions (such as DataFrames) for manipulating and processing data.

In [9]:
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import Window

import pandas as pd
import numpy as np

# Render every column when showing wide pandas frames (e.g. the toPandas() previews below)
pd.options.display.max_columns = None

In [4]:
# Start (or reuse) a local Spark session; "local[*]" uses one worker thread per available core
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
spark

In [32]:
# Overview: load the raw CSV, letting Spark infer column types, and report (rows, columns)
df = spark.read.csv(
    'data/superstore_dataset2011-2015.csv',
    header=True,
    inferSchema=True,
)
n_rows, n_cols = df.count(), len(df.columns)
print((n_rows, n_cols))
df.limit(3).toPandas()

(51290, 22)


Unnamed: 0,Row ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,City,State,Country,Postal Code,Market,Region,Product ID,Category,Sub-Category,Product Name,Sales,Quantity,Discount,Profit
0,42433,AG-2011-2040,01-01-11,06-01-11,Standard Class,TB-11280,Toby Braunhardt,Consumer,Constantine,Constantine,Algeria,,Africa,Africa,OFF-TEN-10000025,Office Supplies,Storage,"Tenex Lockers, Blue",408.3,2,0.0,106.14
1,22253,IN-2011-47883,01-01-11,08-01-11,Standard Class,JH-15985,Joseph Holt,Consumer,Wagga Wagga,New South Wales,Australia,,APAC,Oceania,OFF-SU-10000618,Office Supplies,Supplies,"Acme Trimmer, High Speed",120.366,3,0.1,36.036
2,48883,HU-2011-1220,01-01-11,05-01-11,Second Class,AT-735,Annie Thurman,Consumer,Budapest,Budapest,Hungary,,EMEA,EMEA,OFF-TEN-10001585,Office Supplies,Storage,"Tenex Box, Single Width",66.12,4,0.0,29.64


In [10]:
# Inspect the column types Spark inferred from the raw CSV.
# NOTE(review): the saved output below came from an earlier execution (In [10]) and lists
# 24 columns (including Shipping Cost / Order Priority, with Sales/Quantity/Discount as
# strings), while the overview cell reports 22 columns - re-run to refresh it.
df.printSchema()

root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Postal Code: integer (nullable = true)
 |-- Market: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: double (nullable = true)
 |-- Shipping Cost: double (nullable = true)
 |-- Order Priority: string (nullable = true)



### Transform schema

In [34]:
# Explicit schema for the raw CSV, paired positionally with `df.columns` (file order).
# The two date columns become DateType so date functions (date_add, datediff, year, ...)
# work later, and the measure columns become numeric instead of whatever inferSchema guessed.
cols = df.columns

types = [
    IntegerType(),  # Row ID
    StringType(),   # Order ID
    DateType(),     # Order Date
    DateType(),     # Ship Date
    StringType(),   # Ship Mode
    StringType(),   # Customer ID
    StringType(),   # Customer Name
    StringType(),   # Segment
    StringType(),   # City
    StringType(),   # State
    StringType(),   # Country
    StringType(),   # Postal Code
    StringType(),   # Market
    StringType(),   # Region
    StringType(),   # Product ID
    StringType(),   # Category
    StringType(),   # Sub-Category
    StringType(),   # Product Name
    DoubleType(),   # Sales
    IntegerType(),  # Quantity
    DoubleType(),   # Discount
    DoubleType(),   # Profit
]

# The pairing is positional, so it is only correct while `df` is the freshly loaded raw frame.
# Fail loudly instead of silently shifting types onto the wrong columns (e.g. if this cell
# is re-run after `Row ID` has been dropped further down the notebook).
assert len(cols) == len(types), f'schema has {len(types)} types but df has {len(cols)} columns: {cols}'

schema_list = [StructField(name, dtype) for name, dtype in zip(cols, types)]
schema = StructType(schema_list)
print(schema_list[0], schema, sep='\n\n')

StructField('Row ID', IntegerType(), True)

StructType([StructField('Row ID', IntegerType(), True), StructField('Order ID', StringType(), True), StructField('Order Date', DateType(), True), StructField('Ship Date', DateType(), True), StructField('Ship Mode', StringType(), True), StructField('Customer ID', StringType(), True), StructField('Customer Name', StringType(), True), StructField('Segment', StringType(), True), StructField('City', StringType(), True), StructField('State', StringType(), True), StructField('Country', StringType(), True), StructField('Postal Code', StringType(), True), StructField('Market', StringType(), True), StructField('Region', StringType(), True), StructField('Product ID', StringType(), True), StructField('Category', StringType(), True), StructField('Sub-Category', StringType(), True), StructField('Product Name', StringType(), True), StructField('Sales', DoubleType(), True), StructField('Quantity', IntegerType(), True), StructField('Discount', DoubleType(), T

In [40]:
# Re-read the CSV with the explicit schema; dateFormat tells Spark that the date strings
# look like dd-MM-yy (e.g. 06-01-11 -> 2011-01-06) so they parse into DateType values.
# NOTE(review): later output shows names with a replacement character (e.g. a customer
# "Resi P?lking"), which suggests the file is not UTF-8 (Spark's default encoding);
# consider encoding='ISO-8859-1' - confirm against the source file first.
df = spark.read.csv(
    'data/superstore_dataset2011-2015.csv',
    header=True,
    schema=schema,
    dateFormat='dd-MM-yy',
)
print((df.count(), len(df.columns)))
df.limit(3).toPandas()

(51290, 22)


Unnamed: 0,Row ID,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,City,State,Country,Postal Code,Market,Region,Product ID,Category,Sub-Category,Product Name,Sales,Quantity,Discount,Profit
0,42433,AG-2011-2040,2011-01-01,2011-01-06,Standard Class,TB-11280,Toby Braunhardt,Consumer,Constantine,Constantine,Algeria,,Africa,Africa,OFF-TEN-10000025,Office Supplies,Storage,"Tenex Lockers, Blue",408.3,2,0.0,106.14
1,22253,IN-2011-47883,2011-01-01,2011-01-08,Standard Class,JH-15985,Joseph Holt,Consumer,Wagga Wagga,New South Wales,Australia,,APAC,Oceania,OFF-SU-10000618,Office Supplies,Supplies,"Acme Trimmer, High Speed",120.366,3,0.1,36.036
2,48883,HU-2011-1220,2011-01-01,2011-01-05,Second Class,AT-735,Annie Thurman,Consumer,Budapest,Budapest,Hungary,,EMEA,EMEA,OFF-TEN-10001585,Office Supplies,Storage,"Tenex Box, Single Width",66.12,4,0.0,29.64


In [41]:
# `Row ID` is not used anywhere in the remaining analysis, so remove it
# (dropping an absent column is a no-op, so re-running this cell is safe)
df = df.drop('Row ID')

## Selection & Filtering

In [42]:
# Project a few columns of interest and preview the first ten rows
df.select('Order ID', 'Sales', 'Profit').limit(10).toPandas()

Unnamed: 0,Order ID,Sales,Profit
0,AG-2011-2040,408.3,106.14
1,IN-2011-47883,120.366,36.036
2,HU-2011-1220,66.12,29.64
3,IT-2011-3647632,44.865,-26.055
4,IN-2011-47883,113.67,37.77
5,IN-2011-47883,55.242,15.342
6,IN-2011-30733,285.78,71.4
7,CA-2011-115161,290.666,3.4196
8,AO-2011-1390,206.4,92.88
9,ID-2011-56493,162.72,68.31


In [43]:
# Keep only line items whose profit exceeds 500 and preview five of them
df.where(col('Profit') > 500).limit(5).toPandas()

Unnamed: 0,Order ID,Order Date,Ship Date,Ship Mode,Customer ID,Customer Name,Segment,City,State,Country,Postal Code,Market,Region,Product ID,Category,Sub-Category,Product Name,Sales,Quantity,Discount,Profit
0,CA-2011-131926,2011-06-01,2011-06-06,Second Class,DW-13480,Dianna Wilson,Home Office,Lakeville,Minnesota,United States,55044.0,US,Central,FUR-CH-10004063,Furniture,Chairs,Global Deluxe High-Back Manager's Chair,2001.86,7,0.0,580.5394
1,IT-2011-4243443,2011-11-01,2011-11-06,Second Class,DF-13135,David Flashing,Consumer,Slough,England,United Kingdom,,EU,North,TEC-CO-10000013,Technology,Copiers,"Brother Fax Machine, Laser",1592.7,5,0.0,652.95
2,CA-2011-120474,2011-12-01,2011-12-03,First Class,RP-19390,Resi P�lking,Consumer,Madison,Wisconsin,United States,53711.0,US,Central,FUR-CH-10001854,Furniture,Chairs,Office Star - Professional Matrix Back Chair w...,2807.84,8,0.0,673.8816
3,ES-2011-5125239,2011-12-01,2011-12-06,Standard Class,MG-17875,Michael Grace,Home Office,Genk,Limburg,Belgium,,EU,Central,TEC-AC-10001258,Technology,Accessories,"Belkin Router, Erganomic",1795.92,7,0.0,843.99
4,ES-2011-3614277,2011-12-01,2011-12-03,Second Class,SL-20155,Sara Luxemburg,Home Office,Valencia,Valenciana,Spain,,EU,South,TEC-CO-10004042,Technology,Copiers,"Brother Wireless Fax, Laser",1894.05,5,0.0,700.65


In [44]:
# Loss-making line items (Profit < 0) in the 'West' region, using the DataFrame API
(
    df.select('Region', 'Order ID', 'Sales', 'Profit')
    .where((col('Profit') < 0) & (col('Region') == 'West'))
    .limit(10)
    .toPandas()
)

Unnamed: 0,Region,Order ID,Sales,Profit
0,West,CA-2011-148915,443.92,-94.333
1,West,CA-2011-108189,15.36,-3.264
2,West,CA-2011-108189,7.16,-0.0895
3,West,CA-2011-105648,626.352,-23.4882
4,West,CA-2011-133690,218.75,-161.875
5,West,CA-2011-125829,573.728,-64.5444
6,West,US-2011-112872,275.49,-170.8038
7,West,US-2011-163797,49.792,-11.8256
8,West,CA-2011-100090,502.488,-87.9354
9,West,CA-2011-151708,95.976,-10.7973


### SQL

In [45]:
# Register the current `df` as a session-scoped SQL view named `superstore`.
# The view captures `df` as it is now; columns added to `df` later are not visible through it.
df.createOrReplaceTempView('superstore')

In [47]:
# Same loss-making 'West' query as above, expressed in Spark SQL against the `superstore` view
# (backticks quote the column name that contains a space)
west_losses_sql = """
    SELECT 
        Region,
        `Order ID`,
        Sales,
        Profit
    FROM superstore
    WHERE Profit < 0 AND Region = 'West'
"""
spark.sql(west_losses_sql).limit(10).toPandas()

Unnamed: 0,Region,Order ID,Sales,Profit
0,West,CA-2011-148915,443.92,-94.333
1,West,CA-2011-108189,15.36,-3.264
2,West,CA-2011-108189,7.16,-0.0895
3,West,CA-2011-105648,626.352,-23.4882
4,West,CA-2011-133690,218.75,-161.875
5,West,CA-2011-125829,573.728,-64.5444
6,West,US-2011-112872,275.49,-170.8038
7,West,US-2011-163797,49.792,-11.8256
8,West,CA-2011-100090,502.488,-87.9354
9,West,CA-2011-151708,95.976,-10.7973


## Descriptive Functions
Descriptive functions provide a short statistical summary (count, mean, standard deviation, min, max) of individual columns of the dataset.

In [48]:
# Summary statistics for the two main measures.
# NOTE(review): in the saved output, Sales has count 50990 while Profit has 51290, so about
# 300 Sales values are null after the explicit-schema read - presumably values that failed
# to parse as DoubleType and were nulled; investigate before trusting Sales totals.
df.describe('Sales', 'Profit').toPandas()

Unnamed: 0,summary,Sales,Profit
0,count,50990.0,51290.0
1,mean,247.4554011743472,28.597540880873467
2,stddev,488.7169016368617,174.3747508696452
3,min,0.444,-6599.978
4,max,22638.48,8399.976


In [49]:
# Distinct product names: report how many there are, then preview ten of them
distinct_products = df.select('Product Name').distinct()
print(distinct_products.count())
distinct_products.limit(10).toPandas()

3785


Unnamed: 0,Product Name
0,"Chromcraft Computer Table, Fully Assembled"
1,"SanDisk Router, Erganomic"
2,"Avery 3-Hole Punch, Recycled"
3,Wilson Jones 14 Line Acrylic Coated Pressboard...
4,"SanDisk Keyboard, Programmable"
5,"Dania 3-Shelf Cabinet, Mobile"
6,Aastra 6757i CT Wireless VoIP phone
7,"Global Armless Task Chair, Royal Blue"
8,"Samsung Headset, VoIP"
9,"KitchenAid Refrigerator, Black"


In [51]:
# Number of line items per product name, most frequent first (top ten)
df.groupBy('Product Name').count().orderBy(col('count').desc()).limit(10).toPandas()

Unnamed: 0,Product Name,count
0,Staples,227
1,"Cardinal Index Tab, Clear",92
2,"Eldon File Cart, Single Width",90
3,"Rogers File Cart, Single Width",84
4,"Ibico Index Tab, Clear",83
5,"Sanford Pencil Sharpener, Water Color",80
6,"Smead File Cart, Single Width",77
7,"Stanley Pencil Sharpener, Water Color",75
8,"Acco Index Tab, Clear",75
9,"Avery Index Tab, Clear",74


## Enrichment
Enrichment is the process of adding detail to the data, typically by deriving new columns from existing ones, which increases the dimensionality of the dataset.

In [52]:
# Create a new column from an existing one: the target ship date is three days after the order date
df = df.withColumn('Shipping goal date', date_add('Order Date', 3))
preview_cols = ['Order ID', 'Order Date', 'Shipping goal date', 'Ship Date']
df.select(preview_cols).limit(10).toPandas()

Unnamed: 0,Order ID,Order Date,Shipping goal date,Ship Date
0,AG-2011-2040,2011-01-01,2011-01-04,2011-01-06
1,IN-2011-47883,2011-01-01,2011-01-04,2011-01-08
2,HU-2011-1220,2011-01-01,2011-01-04,2011-01-05
3,IT-2011-3647632,2011-01-01,2011-01-04,2011-01-05
4,IN-2011-47883,2011-01-01,2011-01-04,2011-01-08
5,IN-2011-47883,2011-01-01,2011-01-04,2011-01-08
6,IN-2011-30733,2011-02-01,2011-02-04,2011-02-03
7,CA-2011-115161,2011-02-01,2011-02-04,2011-02-03
8,AO-2011-1390,2011-02-01,2011-02-04,2011-02-04
9,ID-2011-56493,2011-02-01,2011-02-04,2011-02-03


In [53]:
# Determine whether each order shipped late by comparing the actual `Ship Date` with the
# target `Shipping goal date`. datediff(goal, ship) is the number of days of slack:
# >= 0 means on time ('No'), < 0 means late ('Yes').
# If either date is null (e.g. it failed to parse under dateFormat), datediff is null and
# neither condition matches, so the flag stays null rather than defaulting to 'Yes'.
slack_days = datediff(col('Shipping goal date'), col('Ship Date'))
df = df.withColumn(
    'Is Shipped Late',
    when(slack_days >= 0, 'No').when(slack_days < 0, 'Yes'),
)
df.select(['Order ID', 'Order Date', 'Shipping goal date', 'Ship Date', 'Is Shipped Late']).limit(10).toPandas()

Unnamed: 0,Order ID,Order Date,Shipping goal date,Ship Date,Is Shipped Late
0,AG-2011-2040,2011-01-01,2011-01-04,2011-01-06,Yes
1,IN-2011-47883,2011-01-01,2011-01-04,2011-01-08,Yes
2,HU-2011-1220,2011-01-01,2011-01-04,2011-01-05,Yes
3,IT-2011-3647632,2011-01-01,2011-01-04,2011-01-05,Yes
4,IN-2011-47883,2011-01-01,2011-01-04,2011-01-08,Yes
5,IN-2011-47883,2011-01-01,2011-01-04,2011-01-08,Yes
6,IN-2011-30733,2011-02-01,2011-02-04,2011-02-03,No
7,CA-2011-115161,2011-02-01,2011-02-04,2011-02-03,No
8,AO-2011-1390,2011-02-01,2011-02-04,2011-02-04,No
9,ID-2011-56493,2011-02-01,2011-02-04,2011-02-03,No


In [54]:
# Split the order date into calendar parts: year, month, and day of month
df = (
    df.withColumn('Year', year('Order Date'))
      .withColumn('Month', month('Order Date'))
      .withColumn('Day', dayofmonth('Order Date'))
)
df.select('Order ID', 'Year', 'Month', 'Day').limit(10).toPandas()

Unnamed: 0,Order ID,Year,Month,Day
0,AG-2011-2040,2011,1,1
1,IN-2011-47883,2011,1,1
2,HU-2011-1220,2011,1,1
3,IT-2011-3647632,2011,1,1
4,IN-2011-47883,2011,1,1
5,IN-2011-47883,2011,1,1
6,IN-2011-30733,2011,2,1
7,CA-2011-115161,2011,2,1
8,AO-2011-1390,2011,2,1
9,ID-2011-56493,2011,2,1


## Aggregation
Data aggregation is the process of grouping records and summarizing each group (for example with sums or counts) into a compact view.

In [56]:
# Top ten customers by total sales; `sum` here is pyspark.sql.functions.sum (brought in by the
# star import), and the result columns keep Spark's default names, e.g. `sum(Sales)`
(
    df.groupBy('Customer ID')
    .agg(first('Customer Name'), sum('Sales'))
    .sort(col('sum(Sales)').desc())
    .limit(10)
    .toPandas()
)

Unnamed: 0,Customer ID,first(Customer Name),sum(Sales)
0,TA-21385,Tom Ashbrook,35668.1208
1,GT-14710,Greg Tran,34471.89028
2,TC-20980,Tamara Chand,34183.899
3,SM-20320,Sean Miller,31125.29496
4,BW-11110,Bart Watters,30613.6165
5,HL-15040,Hunter Lopez,29664.23058
6,SE-20110,Sanjit Engle,29532.62502
7,PS-19045,Penelope Sewall,29252.3194
8,RB-19360,Raymond Buch,29197.6346
9,ZC-21910,Zuschuss Carroll,28468.24726


In [59]:
# Split each customer's spending by product category: one column per category
# (missing combinations filled with 0), plus a Total across all categories.
sales_per_customer = (
    df.groupBy('Customer ID', 'Customer Name')
    .pivot('Category')
    .agg(sum('Sales'))
    .fillna(0)
)

# Sum whatever category columns the pivot produced instead of hard-coding their names;
# backticks quote names that contain spaces (e.g. `Office Supplies`).
category_cols = [c for c in sales_per_customer.columns if c not in ('Customer ID', 'Customer Name')]
sales_per_customer = sales_per_customer.withColumn(
    'Total', expr(' + '.join(f'`{c}`' for c in category_cols))
)
sales_per_customer.sort(col('Total').desc()).limit(10).toPandas()

Unnamed: 0,Customer ID,Customer name,Furniture,Office Supplies,Technology,Total
0,TA-21385,Tom Ashbrook,3350.6235,12024.4565,20293.0408,35668.1208
1,GT-14710,Greg Tran,15690.047,7686.794,11095.04928,34471.89028
2,TC-20980,Tamara Chand,7803.339,3422.243,22958.317,34183.899
3,SM-20320,Sean Miller,1669.746,3879.1616,25576.38736,31125.29496
4,BW-11110,Bart Watters,12023.8106,7191.1989,11398.607,30613.6165
5,HL-15040,Hunter Lopez,5517.018,3191.4659,20955.74668,29664.23058
6,SE-20110,Sanjit Engle,6136.7955,4773.984,18621.84552,29532.62502
7,PS-19045,Penelope Sewall,10674.5736,8656.3581,9921.3877,29252.3194
8,RB-19360,Raymond Buch,4720.493,6018.214,18458.9276,29197.6346
9,ZC-21910,Zuschuss Carroll,6122.1111,9235.3917,13110.74446,28468.24726


In [60]:
# Total sales for each sub-category (result column is named `sum(Sales)`)
df.groupBy('Sub-Category').sum('Sales').toPandas()

Unnamed: 0,Sub-Category,sum(Sales)
0,Envelopes,169767.4
1,Art,372092.0
2,Chairs,1501682.0
3,Furnishings,376625.3
4,Supplies,242353.2
5,Fasteners,83226.69
6,Binders,458404.5
7,Bookcases,1466572.0
8,Labels,73404.03
9,Paper,241168.6


In [62]:
# Cross-tab of sales: rows are sub-categories, columns are customer segments (missing = 0)
(
    df.groupBy('Sub-Category')
    .pivot('Segment')
    .agg(sum('Sales'))
    .fillna(0)
    .toPandas()
)

Unnamed: 0,Sub-Category,Consumer,Corporate,Home Office
0,Envelopes,88059.1481,51230.0448,30478.1967
1,Art,200870.6144,103571.3772,67649.9743
2,Chairs,778362.5301,448519.5128,274799.7213
3,Furnishings,198474.547,109992.366,68158.4089
4,Supplies,122827.2331,81354.1011,38171.8184
5,Fasteners,42063.2492,26136.4317,15027.011
6,Binders,252311.1346,126582.0367,79511.3184
7,Bookcases,765111.1448,457327.1684,244133.9286
8,Labels,38698.0352,21192.7337,13513.2611
9,Paper,118109.1151,71135.0553,51924.461


### Window function

In [70]:
# Calculate the contribution of each sub-category to the total sales of each customer segment
# ("table-down"): every segment column is divided by its own column total.
# The frame-only window spans the whole (small, already aggregated) table. With no partitionBy,
# Spark warns that it moves all rows to a single partition, which is fine at this size.
window = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

sales_by_segment = (
    df.groupBy('Sub-Category')
    .pivot('Segment')
    .agg(sum('Sales'))
    .fillna(0)
)

# Derive the segment names from the pivot rather than copy-pasting the same steps per segment
segments = [c for c in sales_by_segment.columns if c != 'Sub-Category']

share_down = sales_by_segment
for segment in segments:
    segment_total = sum(col(segment)).over(window)
    share_down = share_down.withColumn(f'Percent {segment}', col(segment) * 100 / segment_total)

share_down.toPandas()

Unnamed: 0,Sub-Category,Consumer,Corporate,Home Office,Percent Consumer,Percent Corporate,Percent Home Office
0,Envelopes,88059.1481,51230.0448,30478.1967,1.355441,1.342804,1.321759
1,Art,200870.6144,103571.3772,67649.9743,3.09188,2.714735,2.933802
2,Chairs,778362.5301,448519.5128,274799.7213,11.980864,11.756257,11.917342
3,Furnishings,198474.547,109992.366,68158.4089,3.054999,2.883037,2.955851
4,Supplies,122827.2331,81354.1011,38171.8184,1.890605,2.132393,1.655411
5,Fasteners,42063.2492,26136.4317,15027.011,0.647454,0.685069,0.651682
6,Binders,252311.1346,126582.0367,79511.3184,3.883673,3.317873,3.448197
7,Bookcases,765111.1448,457327.1684,244133.9286,11.776894,11.987117,10.587447
8,Labels,38698.0352,21192.7337,13513.2611,0.595655,0.555488,0.586035
9,Paper,118109.1151,71135.0553,51924.461,1.817982,1.864539,2.251827


In [71]:
# Calculate the contribution of each customer segment to the total sales of each sub-category
# ("table-across"): every segment value is divided by its row Total, so `Percent Total`
# should be 100 for every row (a built-in sanity check).
sales_by_segment = (
    df.groupBy('Sub-Category')
    .pivot('Segment')
    .agg(sum('Sales'))
    .fillna(0)
)

# Derive the segment names from the pivot instead of hard-coding them; backticks quote
# names containing spaces (e.g. `Home Office`).
segments = [c for c in sales_by_segment.columns if c != 'Sub-Category']

share_across = sales_by_segment.withColumn('Total', expr(' + '.join(f'`{s}`' for s in segments)))
for segment in segments:
    share_across = share_across.withColumn(f'Percent {segment}', col(segment) * 100 / col('Total'))
share_across = share_across.withColumn(
    'Percent Total', expr(' + '.join(f'`Percent {s}`' for s in segments))
)

share_across.toPandas()

Unnamed: 0,Sub-Category,Consumer,Corporate,Home Office,Total,Percent Consumer,Percent Corporate,Percent Home Office,Percent Total
0,Envelopes,88059.1481,51230.0448,30478.1967,169767.4,51.870473,30.176611,17.952916,100.0
1,Art,200870.6144,103571.3772,67649.9743,372092.0,53.984131,27.834887,18.180982,100.0
2,Chairs,778362.5301,448519.5128,274799.7213,1501682.0,51.832722,29.867814,18.299464,100.0
3,Furnishings,198474.547,109992.366,68158.4089,376625.3,52.698142,29.204719,18.097139,100.0
4,Supplies,122827.2331,81354.1011,38171.8184,242353.2,50.681096,33.56841,15.750494,100.0
5,Fasteners,42063.2492,26136.4317,15027.011,83226.69,50.540576,31.403906,18.055519,100.0
6,Binders,252311.1346,126582.0367,79511.3184,458404.5,55.041157,27.613612,17.345231,100.0
7,Bookcases,765111.1448,457327.1684,244133.9286,1466572.0,52.170028,31.183405,16.646567,100.0
8,Labels,38698.0352,21192.7337,13513.2611,73404.03,52.719224,28.871349,18.409427,100.0
9,Paper,118109.1151,71135.0553,51924.461,241168.6,48.973664,29.495982,21.530354,100.0
