### Window Functions

In [0]:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

Compute the rank of each product within its category, based on the product's revenue. With `rank()`, tied products share the same rank and the ranks that follow are skipped (e.g. 1, 1, 3).

In [0]:
# Load the product revenue CSV, using its first line as the column names.
# NOTE(review): no schema is supplied and schema inference is not enabled, so
# `revenue` is presumably read as a string column; ordering on it would then
# be lexicographic. Confirm, and cast to a numeric type if values can differ
# in digit count.
df = spark.read.option('header', True).csv('/FileStore/tables/productRevenue.csv')

# Register the DataFrame as a temp view *before* querying it by name, so this
# cell also works on a fresh session (Restart & Run All) instead of relying on
# a view created by a later cell.
df.createOrReplaceTempView('productRevenue')
display(spark.sql('select * from productRevenue').collect())

product,category,revenue
thin,cell phone,6000
normal,tablet,1500
mini,tablet,5500
ultra thin,cell phone,5000
very thin,cell phone,6000
big,tablet,2500
bendable,cell phone,3000
foldable,cell phone,3000
pro,tablet,4500
pro2,tablet,6500


In [0]:
# One window per category, rows ordered by ascending revenue.
# `windowSpec` is reused by the dense-rank cell that follows.
windowSpec = Window.partitionBy('category').orderBy('revenue')

# rank(): tied rows share a rank and the following ranks are skipped.
rank_in_category = func.rank().over(windowSpec).alias('rank_in_category')
result = df.select('*', rank_in_category)
display(result.collect())

product,category,revenue,rank_in_category
normal,tablet,1500,1
big,tablet,2500,2
pro,tablet,4500,3
mini,tablet,5500,4
pro2,tablet,6500,5
bendable,cell phone,3000,1
foldable,cell phone,3000,1
ultra thin,cell phone,5000,3
thin,cell phone,6000,4
very thin,cell phone,6000,4


##### Using Dense Rank (consecutive numbers)

In [0]:
# dense_rank(): tied rows share a rank and the next rank is NOT skipped.
# Relies on `windowSpec` (partition by category, order by revenue) still being
# the one defined in the rank cell above.
dense_rank_in_category = func.dense_rank().over(windowSpec)
result = df.withColumn('dense_rank_in_category', dense_rank_in_category)
display(result.collect())

product,category,revenue,dense_rank_in_category
normal,tablet,1500,1
big,tablet,2500,2
pro,tablet,4500,3
mini,tablet,5500,4
pro2,tablet,6500,5
bendable,cell phone,3000,1
foldable,cell phone,3000,1
ultra thin,cell phone,5000,2
thin,cell phone,6000,3
very thin,cell phone,6000,3


##### In SQL

In [0]:
# Expose the DataFrame to Spark SQL under the name 'productRevenue'
# (replaces any temp view already registered under that name).
df.createOrReplaceTempView('productRevenue')
# SQL form of the dense-rank computation: rank rows within each category by
# ascending revenue, without gaps after ties.
query = """SELECT *, 
  dense_rank() over (PARTITION BY category ORDER BY revenue) as dense_rank_in_category
  FROM productRevenue"""
# Run the query, pull every row to the driver and render the result.
display(spark.sql(query).collect())

product,category,revenue,dense_rank_in_category
normal,tablet,1500,1
big,tablet,2500,2
pro,tablet,4500,3
mini,tablet,5500,4
pro2,tablet,6500,5
bendable,cell phone,3000,1
foldable,cell phone,3000,1
ultra thin,cell phone,5000,2
thin,cell phone,6000,3
very thin,cell phone,6000,3


What are the best-selling and the second best-selling products in every category?

In [0]:
# Top-2 products per category by revenue.
# The inner query assigns a dense rank within each category, ordered by
# revenue descending (rank 1 = highest revenue). The outer query keeps ranks
# 1 and 2; because dense_rank is used, tied products are all returned, so a
# category can yield more than two rows.
# The filter has to live in an outer query because the window-function alias
# cannot be referenced in the WHERE clause of the same SELECT.
query = """SELECT
  rank,
  product,
  category,
  revenue
FROM (
  SELECT
    product,
    category,
    revenue,
    dense_rank() OVER (PARTITION BY category ORDER BY revenue DESC) as rank
  FROM productRevenue
  ) tmp
WHERE
  rank <= 2"""
# Run the query, pull every row to the driver and render the result.
display(spark.sql(query).collect())

rank,product,category,revenue
1,pro2,tablet,6500
2,mini,tablet,5500
1,thin,cell phone,6000
1,very thin,cell phone,6000
2,ultra thin,cell phone,5000


For each product, what is the difference between its revenue and the revenue of the best-selling product in the same category?

In [0]:
# Window per category, highest revenue first. With an ORDER BY and no explicit
# frame, the frame runs from the start of the partition to the current row;
# because rows are sorted descending, max() over that frame is the category's
# top revenue for every row.
windowSpec = Window.partitionBy('category').orderBy(func.desc('revenue'))

# Build the windowed maximum once and reuse it for both derived columns.
max_revenue = func.max('revenue').over(windowSpec)
result = df.select(
    '*',
    max_revenue.alias('max_revenue'),
    (max_revenue - func.col('revenue')).alias('revenue_diff'),
)
display(result.collect())

product,category,revenue,max_revenue,revenue_diff
pro2,tablet,6500,6500,0.0
mini,tablet,5500,6500,1000.0
pro,tablet,4500,6500,2000.0
big,tablet,2500,6500,4000.0
normal,tablet,1500,6500,5000.0
thin,cell phone,6000,6000,0.0
very thin,cell phone,6000,6000,0.0
ultra thin,cell phone,5000,6000,1000.0
bendable,cell phone,3000,6000,3000.0
foldable,cell phone,3000,6000,3000.0


### Defining Row and Range Windows

In [0]:
# Re-bind `df` to the original product data by reading the 'productRevenue'
# view back through the SparkSession. `spark.table` is used instead of the
# legacy `sqlContext.table` so the notebook relies on a single entry point
# (`spark`) throughout.
df = spark.table('productRevenue')
display(df)

product,category,revenue
thin,cell phone,6000
normal,tablet,1500
mini,tablet,5500
ultra thin,cell phone,5000
very thin,cell phone,6000
big,tablet,2500
bendable,cell phone,3000
foldable,cell phone,3000
pro,tablet,4500
pro2,tablet,6500


Row-Based Windows

In [0]:
# Row-based frame: for each row, the frame is the previous row, the row itself
# and the next row (offsets -1..1) within its category, ordered by revenue
# descending. Rows at the edge of a partition simply have a smaller frame.
windowSpec = (
    Window.partitionBy('category')
    .orderBy(func.desc('revenue'))
    .rowsBetween(-1, 1)
)

# Average revenue over that three-row neighbourhood.
running_avg = func.avg('revenue').over(windowSpec).alias('running_avg')
result = df.select('category', 'product', 'revenue', running_avg)
display(result.collect())

category,product,revenue,running_avg
tablet,pro2,6500,6000.0
tablet,mini,5500,5500.0
tablet,pro,4500,4166.666666666667
tablet,big,2500,2833.333333333333
tablet,normal,1500,2000.0
cell phone,thin,6000,6000.0
cell phone,very thin,6000,5666.666666666667
cell phone,ultra thin,5000,4666.666666666667
cell phone,bendable,3000,3666.6666666666665
cell phone,foldable,3000,3000.0


Range-Based Windows

In [0]:
# Display the sentinel value that Window exposes for an unbounded lower frame
# boundary (i.e. "start of the partition").
Window.unboundedPreceding

In [0]:
from pyspark.sql.types import IntegerType

# Range-based frame spanning the whole partition: from the unbounded start to
# the unbounded end. The named sentinel replaces the previous hand-written
# huge negative literal, which PySpark treated as "unbounded preceding" anyway;
# the resulting frame is the same but the intent is now explicit.
windowSpec = (
    Window.partitionBy(df.category)
    .orderBy(df.revenue.desc())
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# Because the frame covers every row of the category, each row gets the
# category-wide average revenue.
newCol = func.avg(df['revenue']).over(windowSpec)
result = df.select(df.category, df.product, df.revenue, newCol.alias('avg_revenue'))
display(result.collect())

category,product,revenue,avg_revenue
tablet,pro2,6500,4100.0
tablet,mini,5500,4100.0
tablet,pro,4500,4100.0
tablet,big,2500,4100.0
tablet,normal,1500,4100.0
cell phone,thin,6000,4600.0
cell phone,very thin,6000,4600.0
cell phone,ultra thin,5000,4600.0
cell phone,bendable,3000,4600.0
cell phone,foldable,3000,4600.0


### Broadcast Variables and Accumulators

Accumulators

In [0]:
# Accumulator created on the driver, starting at 0; tasks may only add to it.
total_age = sc.accumulator(0)

def add_age(row):
  """Add the ``age`` field of ``row`` to the ``total_age`` accumulator."""
  total_age.add(row.age)
  
# NOTE: this rebinds `df`, replacing the product revenue DataFrame used above.
df = spark.createDataFrame([(10,), (22,), (28,)], ['age'])
df.foreach(add_age) # action: calls add_age once per row; presumably runs on the executors, with the accumulator updates merged back for the driver - confirm
# Read the accumulated total on the driver.
print('Total age = ',total_age.value)

Broadcast Variables

In [0]:
# Wrap a small read-only list in a broadcast variable via the SparkContext.
broadcastVar = sc.broadcast([1, 2, 3])

In [0]:
# Read the broadcast payload back through `.value`.
broadcastVar.value

### RDD Transformations

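Aggregate - an action (not a transformation): it folds the elements of each partition into an accumulator and then combines the per-partition results into a single value returned to the driver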

In [0]:
# aggregate(zero_value, seq_op, comb_op):
#   zero_value - starting accumulator, here a (sum, count) pair
#   seq_op     - folds one element into an accumulator
#   comb_op    - merges two accumulators
def _fold_element(acc, elem):
    """Fold one element into a (sum, count) accumulator."""
    return (acc[0] + elem, acc[1] + 1)


def _merge_partials(left, right):
    """Merge two (sum, count) accumulators component-wise."""
    return (left[0] + right[0], left[1] + right[1])


l = [4,51,2,77,25,9]
inputRDD = sc.parallelize(l)
sumAndCount = inputRDD.aggregate((0,0), _fold_element, _merge_partials)

# Average = total sum / number of elements.
total, count = sumAndCount
print('Average = ', total/count)

In [0]:
# Sanity check: compute the same average in plain Python from the source list
# `l`; it should match the value printed by the aggregate cell.
sum(l) / len(l)

### Caching Data / Persistence Levels

In [0]:
from pyspark.sql.types import IntegerType
# Build a DataFrame from `inputRDD`, passing IntegerType() as the schema.
# NOTE: this rebinds `df` again, replacing the DataFrame from earlier cells.
df = spark.createDataFrame(inputRDD, IntegerType())

In [0]:
# Inspect the storage level currently reported for `df` by printing the help
# text of the object returned by `df.storageLevel`.
# Constructor signature, for reference:
# StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication=1)
help(df.storageLevel)

In [0]:
# StorageLevel is not imported anywhere else in this notebook, so bring it
# into scope here rather than relying on the environment to predefine it.
from pyspark import StorageLevel

# Persist in memory only, replicated on two nodes. PySpark always stores data
# serialized, and current releases do not define the *_SER levels, so
# MEMORY_ONLY_2 is used instead of MEMORY_ONLY_SER_2.
df2 = df.persist(StorageLevel.MEMORY_ONLY_2)

# Show the level both as printed text and as the cell's result value.
print(df2.storageLevel)
df2.storageLevel

In [0]:
# Call cache() on `df` and display the storage level it reports afterwards.
df.cache().storageLevel

In [0]:
# Call unpersist() on `df` and display the storage level it reports afterwards.
df.unpersist().storageLevel