# 1. Miscellaneous techniques

## 1.1. Mapping
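Mapping a custom Python function over a column in PySpark requires wrapping it with the `udf()` function, which turns the Python function into a user-defined function that can be applied to the columns of a PySpark DataFrame.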

In [1]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [8]:
# Load the cars workbook with pandas, then hand it over to Spark.
# The path uses a forward slash so the cell runs on Windows, Linux and macOS alike
# (a backslash is only a path separator on Windows).
cars = pd.read_excel('data/cars.xlsx')
# Every column is converted to str first, so missing values arrive as the text 'nan';
# replace that text with a real null.
cars = spark.createDataFrame(cars.astype(str)).replace('nan', None)
# price is cast back to a numeric (Spark float) column for the examples that follow.
cars = cars.withColumn('price', F.col('price').cast('float'))
cars.limit(5)

manufacturer,model,type,min_price,price,max_price,mpg_city,mpg_highway,airbags,drive_train,cylinders,engine_size,horsepower,rpm,rev_per_mile,man_trans_avail,fuel_tank_capacity,passengers,length,wheelbase,width,turn_circle,rear_seat_room,luggage_room,weight,origin,make
Chevrolet,Cavalier,Compact,8.5,13.4,18.3,25,36,,Front,4.0,2.2,110,5200,2380,Yes,15.2,5,182,101,66,38,25.0,13.0,2490,USA,Chevrolet Cavalier
Chevrolet,Corsica,Compact,11.4,11.4,11.4,25,34,Driver only,Front,4.0,2.2,110,5200,2665,Yes,15.6,5,184,103,68,39,26.0,14.0,2785,USA,Chevrolet Corsica
Chevrolet,Camaro,Sporty,13.4,15.1,16.8,19,28,Driver & Passenger,Rear,6.0,3.4,160,4600,1805,Yes,15.5,4,193,101,74,43,25.0,13.0,3240,USA,Chevrolet Camaro
Chevrolet,Lumina,Midsize,13.4,15.9,18.4,21,29,,Front,4.0,2.2,110,5200,2595,No,16.5,6,198,108,71,40,28.5,16.0,3195,USA,Chevrolet Lumina
Chevrolet,Lumina_APV,Van,14.7,16.3,18.0,18,23,,Front,6.0,3.8,170,4800,1690,No,20.0,7,178,110,74,44,30.5,,3715,USA,Chevrolet Lumina_APV


In [20]:
# Quartiles (25th, 50th and 75th percentiles) of the price column.
cars.approxQuantile(col='price', probabilities=[0.25, 0.5, 0.75], relativeError=0)

[11.600000381469727, 15.899999618530273, 18.799999237060547]

In [33]:
def tmp_getPriceLevel(price):
    """Bucket a car price into one of four labels using fixed cut-offs.

    Args:
        price: Numeric price, or None when the price is missing.

    Returns:
        'very low' for prices below 11.6, 'low' for prices below 15.9,
        'high' for prices below 18.8 and 'very high' otherwise.
        None is returned when price is None.
    """
    # Comparing None with a number raises TypeError, so a missing price is
    # passed through as a missing label instead of crashing the caller.
    if price is None:
        return None
    if price < 11.6:
        return 'very low'
    if price < 15.9:
        return 'low'
    if price < 18.8:
        return 'high'
    return 'very high'

# Register the bucketing function as a Spark UDF. The function is passed directly
# (a wrapping lambda adds nothing) and the string return type is stated explicitly.
getPriceLevel = F.udf(tmp_getPriceLevel, T.StringType())

In [40]:
# Show each price next to the label produced by the UDF (first five rows only).
(
    cars
    .select('price', getPriceLevel('price').alias('price_level'))
    .limit(5)
)

price,price_level
13.4,low
11.4,very low
15.1,low
15.9,low
16.3,high


## 1.2. Window functions

In [1]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [15]:
# Sample employee records as (employee, department, salary) tuples.
data = (
    ('James', 'Sales', 3000),
    ('Harry', 'Sales', 3500),
    ('Ash', 'Sales', 3000),
    ('Michael', 'Sales', 4600),
    ('Robert', 'Sales', 4100),
    ('Maria', 'Finance', 3000),
    ('Wayne', 'Sales', 3000),
    ('Scott', 'Finance', 3300),
    ('Jen', 'Finance', 3900),
    ('Jeff', 'Marketing', 3000),
    ('Kumar', 'Marketing', 2000),
    ('Saif', 'Sales', 4100))
 
# Column names, in the same order as the tuple fields above.
columns= ['employee', 'department', 'salary']

# Build the Spark DataFrame; the bare `df` on the last line displays it.
df = spark.createDataFrame(data=data, schema=columns)
df

employee,department,salary
James,Sales,3000
Harry,Sales,3500
Ash,Sales,3000
Michael,Sales,4600
Robert,Sales,4100
Maria,Finance,3000
Wayne,Sales,3000
Scott,Finance,3300
Jen,Finance,3900
Jeff,Marketing,3000


To use window functions, we first need to define a window specification, which describes how rows are partitioned and ordered.

In [43]:
from pyspark.sql.window import Window

# One window per department, with the rows inside each department ordered by salary.
window = (
    Window
    .partitionBy('department')
    .orderBy('salary')
)

#### Ranking

In [30]:
# Ranking functions evaluated over the per-department, salary-ordered window.
# All original columns are kept and the five ranking columns are appended.
df.select(
    '*',
    # Sequential position inside the window; ties still get distinct numbers.
    F.row_number().over(window).alias('row_number'),
    # Tied rows share a rank and the following rank is skipped.
    F.rank().over(window).alias('rank'),
    # Tied rows share a rank and no rank is skipped.
    F.dense_rank().over(window).alias('dense_rank'),
    # Relative measures, rounded to two decimals for display.
    F.round(F.percent_rank().over(window), 2).alias('percent_rank'),
    F.round(F.cume_dist().over(window), 2).alias('cume_dist'),
)

employee,department,salary,row_number,rank,dense_rank,percent_rank,cume_dist
James,Sales,3000,1,1,1,0.0,0.43
Ash,Sales,3000,2,1,1,0.0,0.43
Wayne,Sales,3000,3,1,1,0.0,0.43
Harry,Sales,3500,4,4,2,0.5,0.57
Robert,Sales,4100,5,5,3,0.67,0.86
Saif,Sales,4100,6,5,3,0.67,0.86
Michael,Sales,4600,7,7,4,1.0,1.0
Maria,Finance,3000,1,1,1,0.0,0.33
Scott,Finance,3300,2,2,2,0.5,0.67
Jen,Finance,3900,3,3,3,1.0,1.0


#### Shifting

In [38]:
# Shift salaries inside each window; positions with no matching row come back as null.
df.select(
    '*',
    # Salary of the row one position earlier in the window.
    F.lag('salary', 1).over(window).alias('lag_1'),
    # Salary of the row two positions later in the window.
    F.lead('salary', 2).over(window).alias('lead_2'),
)

employee,department,salary,lag_1,lead_2
James,Sales,3000,,3000.0
Ash,Sales,3000,3000.0,3500.0
Wayne,Sales,3000,3000.0,4100.0
Harry,Sales,3500,3000.0,4100.0
Robert,Sales,4100,3500.0,4600.0
Saif,Sales,4100,4100.0,
Michael,Sales,4600,4100.0,
Maria,Finance,3000,,3900.0
Scott,Finance,3300,3000.0,
Jen,Finance,3900,3300.0,


#### Aggregating

In [47]:
# Sum of salary evaluated over `window` (per department, ordered by salary).
# The displayed result shows that rows with the same salary receive the same total
# (the three 3000 rows in Sales all show 9000), so this is not a strict row-by-row
# running sum.
# NOTE(review): presumably an ordered window defaults to a range-based frame, which
# would explain the shared totals - confirm. If a strict cumulative sum is intended,
# use window.rowsBetween(Window.unboundedPreceding, Window.currentRow) instead.
df\
    .withColumn('cumsum', F.sum('salary').over(window))

employee,department,salary,cumsum
James,Sales,3000,9000
Ash,Sales,3000,9000
Wayne,Sales,3000,9000
Harry,Sales,3500,12500
Robert,Sales,4100,20700
Saif,Sales,4100,20700
Michael,Sales,4600,25300
Maria,Finance,3000,3000
Scott,Finance,3300,6300
Jen,Finance,3900,10200


# 2. Pivot table

## 2.1. Unpivot

In [1]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [99]:
# Wide-format sample: one row per color, one price column per size.
data = (
    ('red', 1000, 1200, 1500),
    ('green', 1500, 1500, 1575),
    ('blue', 2000, 2200, 2000)
)

# Column names, in the same order as the tuple fields above.
columns = ['color', 'small', 'medium', 'large']

# Build the Spark DataFrame; the bare `df` on the last line displays it.
df = spark.createDataFrame(data, schema=columns)
df

color,small,medium,large
red,1000,1200,1500
green,1500,1500,1575
blue,2000,2200,2000


In [100]:
# Unpivot with the SQL stack() generator: the three size columns are turned into
# three (size, price) rows per color.
df.select(
    'color',
    F.expr('stack(3, "small", small, "medium", medium, "large", large) as (size, price)'),
)

color,size,price
red,small,1000
red,medium,1200
red,large,1500
green,small,1500
green,medium,1500
green,large,1575
blue,small,2000
blue,medium,2200
blue,large,2000


## 2.2. Pivot table

In [1]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [62]:
# Read the supermarket sales CSV, taking column names from the first line.
# The path uses a forward slash so the cell runs on Windows, Linux and macOS alike
# (a backslash is only a path separator on Windows).
supermarket = spark.read.csv('data/supermarket_sales.csv', header=True)
supermarket.limit(5)

invoice_id,brand,city,customer_type,gender,product_line,unit_price,quantity,tax,date,time,payment,cost,gross_margin_percentage,profit,rating
750-67-8428,A,Yangon,Member,Female,Health and beauty,74.69,7,26.1415,01/05/2019,13:08,Ewallet,522.83,4.761904762,26.1415,9.1
226-31-3081,C,Naypyitaw,Normal,Female,Electronic access...,15.28,5,3.82,03/08/2019,10:29,Cash,76.4,4.761904762,3.82,9.6
631-41-3108,A,Yangon,Normal,Male,Home and lifestyle,46.33,7,16.2155,03/03/2019,13:23,Credit card,324.31,4.761904762,16.2155,7.4
123-19-1176,A,Yangon,Member,Male,Health and beauty,58.22,8,23.288,1/27/2019,20:33,Ewallet,465.76,4.761904762,23.288,8.4
373-73-7910,A,Yangon,Normal,Male,Sports and travel,86.31,7,30.2085,02/08/2019,10:37,Ewallet,604.17,4.761904762,30.2085,5.3


In [64]:
# Mean profit (rounded to two decimals) per product line, with one column per city.
(
    supermarket
    .groupBy('product_line')
    .pivot('city')
    .agg(F.round(F.mean('profit'), 2))
)

product_line,Mandalay,Naypyitaw,Yangon
Home and lifestyle,16.71,14.7,16.42
Fashion accessories,12.61,15.79,15.25
Health and beauty,17.95,15.22,12.76
Electronic access...,14.76,16.42,14.54
Food and beverages,14.49,17.15,14.09
Sports and travel,15.35,16.68,15.64


#### Multivariate pivoting

In [67]:
# Pivot on two variables at once by first combining them into a single
# "gender, customer_type" key, then pivoting on that key.
(
    supermarket
    .withColumn('info', F.concat('gender', F.lit(', '), 'customer_type'))
    .groupBy('product_line')
    .pivot('info')
    .agg(F.round(F.mean('profit'), 2))
)

product_line,"Female, Member","Female, Normal","Male, Member","Male, Normal"
Home and lifestyle,17.46,19.05,14.21,13.84
Fashion accessories,15.32,14.88,13.68,14.03
Health and beauty,13.3,14.26,19.33,13.95
Electronic access...,15.17,15.5,14.78,15.38
Food and beverages,18.3,16.57,13.02,13.03
Sports and travel,15.55,15.34,15.31,16.98


## 2.3. Crosstab
Crosstab is a special case of pivot table, where `count` is selected as the aggregate function.

In [1]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [62]:
# Read the supermarket sales CSV, taking column names from the first line.
# The path uses a forward slash so the cell runs on Windows, Linux and macOS alike
# (a backslash is only a path separator on Windows).
supermarket = spark.read.csv('data/supermarket_sales.csv', header=True)
supermarket.limit(5)

invoice_id,brand,city,customer_type,gender,product_line,unit_price,quantity,tax,date,time,payment,cost,gross_margin_percentage,profit,rating
750-67-8428,A,Yangon,Member,Female,Health and beauty,74.69,7,26.1415,01/05/2019,13:08,Ewallet,522.83,4.761904762,26.1415,9.1
226-31-3081,C,Naypyitaw,Normal,Female,Electronic access...,15.28,5,3.82,03/08/2019,10:29,Cash,76.4,4.761904762,3.82,9.6
631-41-3108,A,Yangon,Normal,Male,Home and lifestyle,46.33,7,16.2155,03/03/2019,13:23,Credit card,324.31,4.761904762,16.2155,7.4
123-19-1176,A,Yangon,Member,Male,Health and beauty,58.22,8,23.288,1/27/2019,20:33,Ewallet,465.76,4.761904762,23.288,8.4
373-73-7910,A,Yangon,Normal,Male,Sports and travel,86.31,7,30.2085,02/08/2019,10:37,Ewallet,604.17,4.761904762,30.2085,5.3


In [68]:
# Contingency table: number of invoices for every (city, payment) combination.
supermarket.crosstab(col1='city', col2='payment')

city_payment,Cash,Credit card,Ewallet
Naypyitaw,124,98,106
Mandalay,110,109,113
Yangon,110,104,126


# 3. Combining datasets

## 3.1. Union
PySpark supports two union methods:
- `union`: union using the current order of columns
- `unionByName`: union using column names

In [1]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [102]:
# Quarterly profit for 2019 as (year, quarter, profit) tuples.
data = (
    (2019, 1, 2500),
    (2019, 2, 3500),
    (2019, 3, 4000),
    (2019, 4, 5000)
)

# Column names, in the same order as the tuple fields above.
columns = ['year', 'quarter', 'profit']

# Build the 2019 DataFrame; the bare `df_19` on the last line displays it.
df_19 = spark.createDataFrame(data, schema=columns)
df_19

year,quarter,profit
2019,1,2500
2019,2,3500
2019,3,4000
2019,4,5000


In [103]:
# Quarterly profit for 2020 as (year, quarter, profit) tuples.
data = (
    (2020, 1, 2700),
    (2020, 2, 3900),
    (2020, 3, 5000),
    (2020, 4, 8000)
)

# Column names, in the same order as the tuple fields above.
columns = ['year', 'quarter', 'profit']

# Build the 2020 DataFrame; the bare `df_20` on the last line displays it.
df_20 = spark.createDataFrame(data, schema=columns)
df_20

year,quarter,profit
2020,1,2700
2020,2,3900
2020,3,5000
2020,4,8000


In [104]:
# Append the 2020 rows to the 2019 rows, matching columns by their position.
df_19.union(df_20)

year,quarter,profit
2019,1,2500
2019,2,3500
2019,3,4000
2019,4,5000
2020,1,2700
2020,2,3900
2020,3,5000
2020,4,8000


In [105]:
# Append the 2020 rows to the 2019 rows, matching columns by their name.
df_19.unionByName(df_20)

year,quarter,profit
2019,1,2500
2019,2,3500
2019,3,4000
2019,4,5000
2020,1,2700
2020,2,3900
2020,3,5000
2020,4,8000


## 3.2. Join

In [1]:
import findspark; findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

import pyspark.sql.functions as F
import pyspark.sql.types as T

import numpy as np
import pandas as pd

In [125]:
# Sample income records as (name, income_before_tax, tax_band) tuples.
data = (
    ('Hannah', 1200, 'Allowance'),
    ('James', 3000, 'Basic'),
    ('Gabriel', 700, 'Allowance'),
    ('Smith', 2000, 'Basic'),
    ('Alex', 10000, 'Higher'),
)

# Column names, in the same order as the tuple fields above.
columns = ['name', 'income_before_tax', 'tax_band']

# Build the income DataFrame; the bare `income` on the last line displays it.
income = spark.createDataFrame(data, schema=columns)
income

name,income_before_tax,tax_band
Hannah,1200,Allowance
James,3000,Basic
Gabriel,700,Allowance
Smith,2000,Basic
Alex,10000,Higher


In [118]:
# Column names of the tax-band lookup table.
columns = ['band', 'income_range', 'tax_rate']

# One (band, income_range, tax_rate) tuple per tax band.
data = (
    ('Allowance', 'Up to 12,500', 0.0),
    ('Basic', '12,501 to 50,000', 0.2),
    ('Higher', '50,001 to 150,000', 0.4),
    ('Additional', 'Over 150,000', 0.45),
)

# Build the lookup DataFrame and display it.
tax = spark.createDataFrame(data=data, schema=columns)
tax

band,income_range,tax_rate
Allowance,"Up to 12,500",0.0
Basic,"12,501 to 50,000",0.2
Higher,"50,001 to 150,000",0.4
Additional,"Over 150,000",0.45


In [122]:
# Left join on an explicit condition because the key columns have different names;
# both key columns (tax_band and band) are kept in the result.
income.join(
    tax,
    on=income['tax_band'] == tax['band'],
    how='left',
)

name,income_before_tax,tax_band,band,income_range,tax_rate
Hannah,1200,Allowance,Allowance,"Up to 12,500",0.0
Gabriel,700,Allowance,Allowance,"Up to 12,500",0.0
Alex,10000,Higher,Higher,"50,001 to 150,000",0.4
James,3000,Basic,Basic,"12,501 to 50,000",0.2
Smith,2000,Basic,Basic,"12,501 to 50,000",0.2


In [128]:
# Rename the key so both sides share the column name 'band', then left join on that
# name; the key column then appears only once in the result.
(
    income
    .withColumnRenamed('tax_band', 'band')
    .join(tax, on='band', how='left')
)

band,name,income_before_tax,income_range,tax_rate
Allowance,Hannah,1200,"Up to 12,500",0.0
Allowance,Gabriel,700,"Up to 12,500",0.0
Higher,Alex,10000,"50,001 to 150,000",0.4
Basic,James,3000,"12,501 to 50,000",0.2
Basic,Smith,2000,"12,501 to 50,000",0.2
