- https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-242445b24565
- https://intellipaat.com/community/6443/how-to-optimize-shuffle-spill-in-apache-spark-application#:~:text=Spark%201.4%20has%20some%20better,available%20for%20the%20shuffle%20buffer.&text=Increase%20the%20memory%20in%20your,executor.

In [1]:
import pyspark

In [2]:
pyspark.__file__

'/opt/spark/python/pyspark/__init__.py'

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = (
    SparkSession
    .builder
    .master('local[*]')
    .config(
        'spark.sql.shuffle.partitions',
        '400'
    )
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config('spark.driver.memory', '5g')
    .appName('hello-spark')
    .getOrCreate()
)

In [5]:
spark

## Warmup 1

In [6]:
sdf_products = spark.read.parquet('./products_parquet')
sdf_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)



In [7]:
sdf_seller = spark.read.parquet('./sellers_parquet//')
sdf_seller.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)



In [8]:
sdf_order = spark.read.parquet("./sales_parquet/")
sdf_order.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)



In [9]:
sdf_products.select('product_id').distinct().count()

75000000

In [10]:
sdf_seller.select('seller_id').distinct().count()

10

In [11]:
sdf_order.select('order_id').distinct().count()

20000040

## Warm Up 2

In [9]:
from pyspark.sql import functions as F

c = F.col

In [13]:
sdf_order.groupBy('date').agg(
    F.countDistinct('product_id').alias('uniq_product_cnt')
).show(30)

+----------+----------------+
|      date|uniq_product_cnt|
+----------+----------------+
|2020-07-10|          100218|
|2020-07-09|           99801|
|2020-07-06|           99869|
|2020-07-02|           99768|
|2020-07-03|          100224|
|2020-07-07|           99453|
|2020-07-01|           99755|
|2020-07-08|          100048|
|2020-07-04|          100294|
|2020-07-05|           99991|
+----------+----------------+



## Excercise 1

In [14]:
sdf_order.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)



In [15]:
sdf_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)



In [16]:
sdf_order.select(
    'product_id', 'num_pieces_sold'
).join(
    sdf_products.select('product_id', 'price'),
    on='product_id',
    how='left'
).withColumn(
    'revenue',
    c('num_pieces_sold').cast('float')*c('price').cast('float')
).groupBy().agg(
    F.avg('revenue').alias('avg_revenue')
).show()

+------------------+
|       avg_revenue|
+------------------+
|1245.9236386027228|
+------------------+



### salting

In [17]:
sdf_order_salted = (
    sdf_order.select(
        'product_id',
        'num_pieces_sold'
    ).withColumn(
        'salt-key',
        F.concat_ws('-', 'product_id', F.floor(F.rand() * 10).cast('string'))
    )
)

In [18]:
sdf_products_salted = (
    sdf_products
    .select('product_id', 'price')
    .withColumn(
        'salts',
        F.array([F.lit(f'{i}') for i in range(10)])
    ).withColumn(
        'salt',
        F.explode('salts')
    )
    .withColumn(
        'salt-key',
        F.concat_ws('-', 'product_id', 'salt')
    )
    .drop('salt', 'salts')
)

In [21]:
sdf_order_salted.join(
    sdf_products_salted,
    on='salt-key',
    how='left'
).withColumn(
    'revenue',
    c('price').cast('float') * c('num_pieces_sold').cast('float')
).coalesce(
    400
).groupBy().agg(
    F.avg('revenue').alias('avg_revenue')
).show()

+------------------+
|       avg_revenue|
+------------------+
|1245.9236386027228|
+------------------+



## Excercise 2

In [10]:
sdf_order.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)



In [11]:
sdf_seller.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)



In [12]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", '-1')

In [14]:
sdf_order.select('seller_id', 'num_pieces_sold').join(
    sdf_seller,
    on='seller_id'
).withColumn(
    'ratio',
    c('num_pieces_sold') / c('daily_target')
).groupBy(
    'seller_id'
).agg(
    F.avg('ratio')
).show()

+---------+--------------------+
|seller_id|          avg(ratio)|
+---------+--------------------+
|        3|7.060842894390445E-4|
|        0| 2.01973622529017E-5|
|        4|3.845384604576898...|
|        7|8.510553537464244E-5|
|        8|0.002071646546208...|
|        5|8.038980497173663E-5|
|        6|2.534518215186249...|
|        9|1.449276275189615E-4|
|        1|3.670188787905763...|
|        2|2.456721945951509E-4|
+---------+--------------------+



In [16]:
sdf_seller.count()

10

In [17]:
sdf_order.select('seller_id', 'num_pieces_sold').join(
    F.broadcast(sdf_seller.select('seller_id', 'daily_target')),
    on='seller_id'
).withColumn(
    'ratio',
    c('num_pieces_sold') / c('daily_target')
).groupBy(
    'seller_id'
).agg(
    F.avg('ratio')
).show()

+---------+--------------------+
|seller_id|          avg(ratio)|
+---------+--------------------+
|        3|7.060842894390356E-4|
|        0|2.019736225262708E-5|
|        4|3.845384604576818E-5|
|        7|8.510553537463733E-5|
|        8|0.002071646546208...|
|        5|8.038980497175858E-5|
|        6|2.534518215186312...|
|        9|1.449276275189590...|
|        1|3.670188787904605E-5|
|        2|2.456721945951487E-4|
+---------+--------------------+



## Exercise 3

In [13]:
sdf_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)



In [14]:
sdf_seller.printSchema()

root
 |-- seller_id: string (nullable = true)
 |-- seller_name: string (nullable = true)
 |-- daily_target: string (nullable = true)



In [16]:
sdf_order.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)



In [20]:
from pyspark.sql.window import Window

In [23]:
spec = Window.orderBy('total_pieces').partitionBy('seller_id', 'product_id').orderBy

Object `orderBy` not found.


In [24]:
spec = Window.orderBy('total_pieces').partitionBy('seller_id', 'product_id')

In [31]:
sdf_ex3 = (
    sdf_order
    .groupBy('seller_id', 'product_id')
    .agg(
        F.sum('num_pieces_sold').alias('total_pieces')
    )
    .withColumn(
        'rank_desc',
        F.dense_rank().over(spec.orderBy(c('total_pieces').desc()))
    )
    .withColumn(
        'rank_asc',
        F.dense_rank().over(spec.orderBy(c('total_pieces').asc()))
    )
    .filter(
        (c('rank_desc') == 2) | (c('rank_asc') == 1)
    )
)

In [32]:
sdf_ex3.show()

+---------+----------+------------+---------+--------+
|seller_id|product_id|total_pieces|rank_desc|rank_asc|
+---------+----------+------------+---------+--------+
|        1|  10236486|        13.0|        1|       1|
|        1|  10412465|        26.0|        1|       1|
|        1|  11343368|         2.0|        1|       1|
|        1|  11512358|        34.0|        1|       1|
|        1|  11624101|        87.0|        1|       1|
|        1|  11946844|        68.0|        1|       1|
|        1|  12318852|        94.0|        1|       1|
|        1|  12741803|        28.0|        1|       1|
|        1|  12810525|        13.0|        1|       1|
|        1|  13129262|        34.0|        1|       1|
|        1|  13334833|        73.0|        1|       1|
|        1|  13426415|        45.0|        1|       1|
|        1|  13434379|        17.0|        1|       1|
|        1|   1347749|        74.0|        1|       1|
|        1|  13629474|        65.0|        1|       1|
|        1

In [33]:
sdf_ex3.filter(
    c('product_id') == 0
).show()

+---------+----------+------------+---------+--------+
|seller_id|product_id|total_pieces|rank_desc|rank_asc|
+---------+----------+------------+---------+--------+
|        0|         0|9.59374707E8|        1|       1|
+---------+----------+------------+---------+--------+

