Reference: https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-242445b24565

In [13]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Row
from numpy.random import rand
from pyspark.sql.types import IntegerType, StringType

My machine has following configuration...
- 6 cores with 12vCores
- 32GB RAM

Spark Standalone server:
```
cd /opt/softwares/spark-3.0.1-bin-hadoop3.2/

export PYSPARK_PYTHON=/opt/envs/ai4e/bin/python
export PYSPARK_DRIVER_PYTHON=/opt/envs/ai4e/bin/python

sbin/start-all.sh
sbin/stop-all.sh
```
Spark UI: [http://localhost:8080](http://localhost:8080)   
Spark Master URL : spark://IMCHLT276:7077

In [2]:
spark = SparkSession.builder \
    .master("spark://IMCHLT276:7077") \
    .config("spark.sql.autoBroadcastJoinThreshold", -1) \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.cores", "2") \
    .config("spark.cores.max", "6") \
    .config("spark.local.dir", "/opt/tmp/spark-temp/") \
    .appName("DataSkewness") \
    .getOrCreate()

# https://stackoverflow.com/questions/27774443/is-it-safe-to-temporarily-rename-tmp-and-then-create-a-tmp-symlink-to-a-differe
# sudo mount --bind /path/to/dir/with/plenty/of/space /tmp
# sudo lsof /tmp # check for apps
# sudo umount /tmp

In [3]:
spark

## Dataset

Products  
--------
: product_id - The product ID  
: product_name  - The product name   
: price - The product price   

Sellers  
-------
: seller_id  - The seller ID   
: seller_name  - The seller name    
: daily_target - The number of items (regardless of the product type) that the seller needs to hit his/her quota. For example, if the daily target is 100,000, the employee needs to sell 100,000 products he can hit the quota by selling 100,000 units of product_0, but also selling 30,000 units of product_1 and 70,000 units of product_2   

Sales  
-----
: order_id  - The order ID   
: product_id  - The single product sold in the order. All orders have exactly one product)   
: seller_id  - The selling employee ID that sold the product   
: date - The date of the order.    
: num_pieces_sold - The number of units sold for the specific product in the order    
: bill_raw_text  -  A string that represents the raw text of the bill associated with the order   


In [4]:
def read_df(path, is_print=False):
    if is_print: print(f"\nReading {path}")
    df = spark.read.parquet(path)
    if is_print: df.show()
    if is_print: print(f"Number of records in {path} is {df.count()}" )
    if is_print: print("_"*80 + "\n")
    return df


In [5]:
! ls

DataGenerator.ipynb	 products.csv	   sales.csv	  sellers_parquet
DataSkewExercises.ipynb  products_parquet  sales_parquet
data			 requirements.txt  sellers.csv


In [6]:
products_df, sales_df, sellers_df = None, None, None
def reload(is_print=False):
    global products_df, sales_df, sellers_df
    products_df = read_df("products_parquet", is_print=is_print)
    sellers_df = read_df("sellers_parquet", is_print=is_print)
    sales_df = read_df("sales_parquet/", is_print=is_print)
#     return products_df, sales_df, sellers_df

In [7]:
reload(True)


Reading products_parquet
+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0|   22|
|         1|   product_1|   30|
|         2|   product_2|   91|
|         3|   product_3|   37|
|         4|   product_4|  145|
|         5|   product_5|  128|
|         6|   product_6|   66|
|         7|   product_7|  145|
|         8|   product_8|   51|
|         9|   product_9|   44|
|        10|  product_10|   53|
|        11|  product_11|   13|
|        12|  product_12|  104|
|        13|  product_13|  102|
|        14|  product_14|   24|
|        15|  product_15|   14|
|        16|  product_16|   38|
|        17|  product_17|   72|
|        18|  product_18|   16|
|        19|  product_19|   46|
+----------+------------+-----+
only showing top 20 rows

Number of records in products_parquet is 75000000
________________________________________________________________________________


Reading sellers_parquet
+---------+-----------+

In [None]:
sales_df.filter(F.col('order_id') == 1).show()

**1. Find out how many orders, how many products and how many sellers are in the data.**

In [None]:
%%time
print(f"Sellers no: {sellers_df.count()}")

In [None]:
%%time
print(f"Products no: {products_df.count()}")

In [None]:
%%time
print(f"Sales no: {sales_df.count()}")

**2. How many products have been sold at least once?**

In [None]:
%%time
print("Number of products sold at least once")
sales_df.agg(F.countDistinct(F.col("product_id"))).show()
# sales_df.agg({"product_id" : "countDistinct"}).show()

In [None]:
%%time
sales_df.select(F.countDistinct(F.col("product_id"))).show()

**3. Which is the product contained in more orders?**

In [None]:
%%time
sales_df.groupBy("product_id")\
    .agg(F.count("*").alias("cnt"))\
    .orderBy(F.col("cnt").desc())\
    .limit(1).show()

**4. How many distinct products have been sold in each day?**

In [14]:
%%time
sales_df.groupBy("date") \
        .agg(F.countDistinct(F.col("product_id")).alias("cnt"))\
        .orderBy(F.col("cnt").desc()) \
        .show()

+----------+------+
|      date|   cnt|
+----------+------+
|2020-07-06|100765|
|2020-07-09|100501|
|2020-07-01|100337|
|2020-07-03|100017|
|2020-07-02| 99807|
|2020-07-05| 99796|
|2020-07-04| 99791|
|2020-07-07| 99756|
|2020-07-08| 99662|
|2020-07-10| 98973|
+----------+------+

CPU times: user 6.78 ms, sys: 0 ns, total: 6.78 ms
Wall time: 8.41 s


**5.What is the average revenue of the orders?**

In [8]:
%%time
reload()

CPU times: user 6 µs, sys: 3 µs, total: 9 µs
Wall time: 17.6 µs


Check the DAG visualization of following join...

In [24]:
%%time
res = products_df.join(sales_df, on="product_id", how="inner")
res.explain()

== Physical Plan ==
*(5) Project [product_id#397, product_name#398, price#399, order_id#409, seller_id#411, date#412, num_pieces_sold#413, bill_raw_text#414]
+- *(5) SortMergeJoin [product_id#397], [product_id#410], Inner
   :- *(2) Sort [product_id#397 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(product_id#397, 200), true, [id=#1035]
   :     +- *(1) Project [product_id#397, product_name#398, price#399]
   :        +- *(1) Filter isnotnull(product_id#397)
   :           +- *(1) ColumnarToRow
   :              +- FileScan parquet [product_id#397,product_name#398,price#399] Batched: true, DataFilters: [isnotnull(product_id#397)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/vlab/gyan42/pyspark-learning-ground/dataskew-experiments/products_par..., PartitionFilters: [], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<product_id:string,product_name:string,price:string>
   +- *(4) Sort [product_id#410 ASC NULLS FIRST], false, 0
      +- Exchange hashpa

In [None]:
res.show()

In [18]:
%%time
products_df = products_df.repartition(128)
sales_df = sales_df.repartition(256)
products_df.join(sales_df, on="product_id").count() ### Will end up as timeout error

20000040

In [20]:
%%time
products_df = products_df.withColumnRenamed("product_id", "repartition_id").repartition(512, F.col("repartition_id"))
sales_df = sales_df.withColumnRenamed("product_id", "repartition_id").repartition(512, F.col("repartition_id"))     
products_df.join(sales_df, on="repartition_id").count() ### Will end up as timeout error

20000040

Get the average revenue without salting....

In [8]:
%time
res = products_df.join(sales_df, on="product_id", how="inner").agg(F.avg(products_df["price"] * sales_df["num_pieces_sold"]))
res.explain()

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.53 µs
== Physical Plan ==
*(6) HashAggregate(keys=[], functions=[avg((cast(price#2 as double) * cast(num_pieces_sold#58 as double)))])
+- Exchange SinglePartition, true, [id=#188]
   +- *(5) HashAggregate(keys=[], functions=[partial_avg((cast(price#2 as double) * cast(num_pieces_sold#58 as double)))])
      +- *(5) Project [price#2, num_pieces_sold#58]
         +- *(5) SortMergeJoin [product_id#0], [product_id#55], Inner
            :- *(2) Sort [product_id#0 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(product_id#0, 200), true, [id=#169]
            :     +- *(1) Project [product_id#0, price#2]
            :        +- *(1) Filter isnotnull(product_id#0)
            :           +- *(1) ColumnarToRow
            :              +- FileScan parquet [product_id#0,price#2] Batched: true, DataFilters: [isnotnull(product_id#0)], Format: Parquet, Location: InMemoryFileIndex[file:/opt/vlab/gyan42/pyspark-learnin

In [10]:
# res.show() # Error : An error occurred while calling o76.showString.
# : org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 14 (showString at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times

**Saltify method 1**

In [11]:
def saltify(df, col_name, number_of_partition):
    """
    Adds a new column names `col_name`_salted, which has concatenated values of `col_name` and number in the range of 0 to number_of_partition
    new value = old_value-partition_id
    
    i.e in our use case, the product 0 is now split into 1024 partitions, the down size the product ids with less data will also try to get paritioned into 1024 partitions
    Note: import pyspark.sql.functions as F
    """
    salted_col = col_name + "_salted"  
    return df.withColumn("dummy", F.monotonically_increasing_id() % number_of_partition)\
            .withColumn(salted_col, F.concat(F.col(col_name), F.lit("-"),F.col("dummy")))\
            .drop(F.col("dummy")).repartition(number_of_partition, F.col(salted_col)) 

In [12]:
products_df = saltify(df=products_df, col_name="product_id", number_of_partition=1024)

In [13]:
products_df.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)
 |-- product_id_salted: string (nullable = true)



In [14]:
sales_df = saltify(df=sales_df, col_name="product_id", number_of_partition=1024)
sales_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- num_pieces_sold: string (nullable = true)
 |-- bill_raw_text: string (nullable = true)
 |-- product_id_salted: string (nullable = true)



In [15]:
# sales_df.groupBy("product_id_salted")\
#     .agg(F.count("*").alias("cnt"))\
#     .orderBy(F.col("cnt").desc())\
#     .limit(1).show()

In [16]:
%time
products_df.join(sales_df, on="product_id_salted", how="inner").agg(F.avg(products_df["price"] * sales_df["num_pieces_sold"])).show()

CPU times: user 3 µs, sys: 0 ns, total: 3 µs
Wall time: 5.48 µs
+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
|             1242.751587464154|
+------------------------------+



**Saltify Method 2: Salt only the skewed product id**

In [18]:
# Step 1 - Check and select the skewed keys 
# In this case we are retrieving the top 100 keys: these will be the only salted keys.
results = sales_df.groupby(sales_df["product_id"]).count().sort(F.col("count").desc()).limit(100).collect()

In [19]:
# Step 2 - What we want to do is:
#  a. Duplicate the entries that we have in the dimension table for the most common products, e.g.
#       product_0 will become: product_0-1, product_0-2, product_0-3 and so on
#  b. On the sales table, we are going to replace "product_0" with a random duplicate (e.g. some of them 
#     will be replaced with product_0-1, others with product_0-2, etc.)
# Using the new "salted" key will unskew the join

# Let's create a dataset to do the trick
REPLICATION_FACTOR = 101
l = []
replicated_products = []
for _r in results:
    replicated_products.append(_r["product_id"])
    for _rep in range(0, REPLICATION_FACTOR):
        l.append((_r["product_id"], _rep))
        
rdd = spark.sparkContext.parallelize(l)
replicated_df = rdd.map(lambda x: Row(product_id=x[0], replication=int(x[1])))
replicated_df = spark.createDataFrame(replicated_df)
replicated_df.show()

+----------+-----------+
|product_id|replication|
+----------+-----------+
|         0|          0|
|         0|          1|
|         0|          2|
|         0|          3|
|         0|          4|
|         0|          5|
|         0|          6|
|         0|          7|
|         0|          8|
|         0|          9|
|         0|         10|
|         0|         11|
|         0|         12|
|         0|         13|
|         0|         14|
|         0|         15|
|         0|         16|
|         0|         17|
|         0|         18|
|         0|         19|
+----------+-----------+
only showing top 20 rows



In [15]:
#   Step 3: Generate the salted key
products_df = products_df.join(F.broadcast(replicated_df), products_df["product_id"] == replicated_df["product_id"], "left"). \
    withColumn("salted_join_key", F.when(replicated_df["replication"].isNull(), products_df["product_id"]).otherwise(
    F.concat(replicated_df["product_id"], F.lit("-"), replicated_df["replication"])))

products_df.show()

In [17]:
sales_df = sales_df.withColumn("salted_join_key", F.when(sales_df["product_id"].isin(replicated_products),
                                                             F.concat(sales_df["product_id"], F.lit("-"),
                                                                    F.lit(round(rand() * (REPLICATION_FACTOR - 1), 0)).cast(
                                                                        IntegerType()))).otherwise(sales_df["product_id"]))
sales_df.show()


In [18]:
#   Step 4: Finally let's do the join
print(sales_df.join(products_df, sales_df["salted_join_key"] == products_df["salted_join_key"],
                       "inner").
      agg(F.avg(products_df["price"] * sales_df["num_pieces_sold"])).show())

print("Ok")

+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
|            1246.1338560822878|
+------------------------------+

None
Ok


**6. For each seller, what is the average % contribution of an order to the seller's daily quota?**

Example   
If Seller_0 with `quota=250` has 3 orders:Order 1: 10 products sold    
Order 2: 8 products sold    
Order 3: 7 products soldThe average % contribution of orders to the seller's quota would be:    
Order 1: 10/105 = 0.04   
Order 2: 8/105 = 0.032     
Order 3: 7/105 = 0.028Average % Contribution = (0.04+0.032+0.028)/3 = 0.03333    

In [121]:
reload()

In [122]:
sellers_df.show()

+---------+-----------+------------+
|seller_id|seller_name|daily_target|
+---------+-----------+------------+
|        0|   seller_0|     2500000|
|        1|   seller_1|      257237|
|        2|   seller_2|      754188|
|        3|   seller_3|      310462|
|        4|   seller_4|     1532808|
|        5|   seller_5|     1199693|
|        6|   seller_6|     1055915|
|        7|   seller_7|     1946998|
|        8|   seller_8|      547320|
|        9|   seller_9|     1318051|
+---------+-----------+------------+



In [123]:
sales_df.show()

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|
|       6|         0|        0|2020-07-01|             82|lmuhhkpyuoyslwmvX...|
|       7|         0|        0|2020-07-04|             15|zoqweontumefxbgvu...|
|       8|         0|        0|2020-07-08|             79|sgldfgtcxufasnvsc...|
|       9|         0|        0|2020-07-10|             25|jnykelwjjebgkwgmu...|
|      10|         0|        0|2020-07-0

In [134]:
sales_df.join(F.broadcast(sellers_df), on="seller_id", how="inner")\
.withColumn("ratio", sales_df["num_pieces_sold"]/sellers_df["daily_target"])\
.groupBy("seller_id").agg(F.avg("ratio")).alias("percent").show()

+---------+--------------------+
|seller_id|          avg(ratio)|
+---------+--------------------+
|        7|2.595228787788171E-5|
|        3|1.628885370565939...|
|        8| 9.21303037540886E-5|
|        0|2.019885898946922...|
|        5|4.211073965904021E-5|
|        6|4.782147194369122E-5|
|        9|3.837913136180238E-5|
|        1|1.964233366461015E-4|
|        4|3.296428039825816E-5|
|        2|6.690408001060484E-5|
+---------+--------------------+



**7.Who are the second most selling and the least selling persons (sellers) for each product?**

**Who are those for product with `product_id = 0`**

- **If a product has been sold by only one seller**, we’ll put it into a special category (category: Only seller or multiple sellers with the same quantity).
- **If a product has been sold by more than one seller**, but all of them sold the same quantity, we are going to put them in the same category as if they were only a single seller for that product (category: Only seller or multiple sellers with the same quantity).
- **If the “least selling” is also the “second selling”**, we will count it only as “second seller”

In [22]:
from pyspark.sql.window import Window

In [23]:
reload()

In [41]:
sales_df.show()

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|
|       6|         0|        0|2020-07-01|             82|lmuhhkpyuoyslwmvX...|
|       7|         0|        0|2020-07-04|             15|zoqweontumefxbgvu...|
|       8|         0|        0|2020-07-08|             79|sgldfgtcxufasnvsc...|
|       9|         0|        0|2020-07-10|             25|jnykelwjjebgkwgmu...|
|      10|         0|        0|2020-07-0

We get the sum of sales for each product and seller pairs.

In [24]:
# Calcuate the number of pieces sold by each seller for each product
sales_df_grpd = sales_df.groupby(F.col("product_id"), F.col("seller_id")). \
    agg(F.sum("num_pieces_sold").alias("num_pieces_sold"))
sales_df_grpd.explain()
sales_df_grpd.show()

== Physical Plan ==
*(2) HashAggregate(keys=[product_id#282, seller_id#283], functions=[sum(cast(num_pieces_sold#285 as double))])
+- Exchange hashpartitioning(product_id#282, seller_id#283, 200), true, [id=#215]
   +- *(1) HashAggregate(keys=[product_id#282, seller_id#283], functions=[partial_sum(cast(num_pieces_sold#285 as double))])
      +- *(1) ColumnarToRow
         +- FileScan parquet [product_id#282,seller_id#283,num_pieces_sold#285] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/opt/vlab/gyan42/pyspark-learning-ground/dataskew-experiments/sales_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<product_id:string,seller_id:string,num_pieces_sold:string>


+----------+---------+---------------+
|product_id|seller_id|num_pieces_sold|
+----------+---------+---------------+
|  46681458|        9|           33.0|
|  64958710|        5|           93.0|
|  20420388|        2|           69.0|
|   4584308|        4|           37.0|


We add two new ranking columns: one that ranks the products’ sales in descending order and another one that ranks in ascending order.

In [25]:
# Create the window functions, one will sort ascending the other one descending. Partition by the product_id
# and sort by the pieces sold
window_desc = Window.partitionBy(F.col("product_id")).orderBy(F.col("num_pieces_sold").desc())
window_asc = Window.partitionBy(F.col("product_id")).orderBy(F.col("num_pieces_sold").asc())

In [26]:
# Create a Dense Rank (to avoid holes)
sales_df_grpd = sales_df_grpd.withColumn("rank_asc", F.dense_rank().over(window_asc)). \
    withColumn("rank_desc", F.dense_rank().over(window_desc))
sales_df_grpd.explain()
sales_df_grpd.show()

== Physical Plan ==
Window [dense_rank(num_pieces_sold#300) windowspecdefinition(product_id#282, num_pieces_sold#300 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank_desc#334], [product_id#282], [num_pieces_sold#300 DESC NULLS LAST]
+- *(4) Sort [product_id#282 ASC NULLS FIRST, num_pieces_sold#300 DESC NULLS LAST], false, 0
   +- Window [dense_rank(num_pieces_sold#300) windowspecdefinition(product_id#282, num_pieces_sold#300 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank_asc#321], [product_id#282], [num_pieces_sold#300 ASC NULLS FIRST]
      +- *(3) Sort [product_id#282 ASC NULLS FIRST, num_pieces_sold#300 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(product_id#282, 200), true, [id=#293]
            +- *(2) HashAggregate(keys=[product_id#282, seller_id#283], functions=[sum(cast(num_pieces_sold#285 as double))])
               +- Exchange hashpartitioning(product_id#282, sel

We split the dataset obtained in three pieces: one for each case that we want to handle (second top selling, least selling, single selling).

In [28]:
# Get products that only have one row OR the products in which multiple sellers sold the same amount
# (i.e. all the employees that ever sold the product, sold the same exact amount)
single_seller = sales_df_grpd.where(F.col("rank_asc") == F.col("rank_desc")).select(
    F.col("product_id").alias("single_seller_product_id"), F.col("seller_id").alias("single_seller_seller_id"),
    F.lit("Only seller or multiple sellers with the same results").alias("type"))
single_seller.explain()
single_seller.show()

== Physical Plan ==
*(5) Project [product_id#282 AS single_seller_product_id#367, seller_id#283 AS single_seller_seller_id#368, Only seller or multiple sellers with the same results AS type#369]
+- *(5) Filter ((isnotnull(rank_asc#321) AND isnotnull(rank_desc#334)) AND (rank_asc#321 = rank_desc#334))
   +- Window [dense_rank(num_pieces_sold#300) windowspecdefinition(product_id#282, num_pieces_sold#300 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank_desc#334], [product_id#282], [num_pieces_sold#300 DESC NULLS LAST]
      +- *(4) Sort [product_id#282 ASC NULLS FIRST, num_pieces_sold#300 DESC NULLS LAST], false, 0
         +- Window [dense_rank(num_pieces_sold#300) windowspecdefinition(product_id#282, num_pieces_sold#300 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank_asc#321], [product_id#282], [num_pieces_sold#300 ASC NULLS FIRST]
            +- *(3) Sort [product_id#282 ASC NULLS FIRST, num_pi

In [31]:
# Get the second top sellers
second_seller = sales_df_grpd.where(F.col("rank_desc") == 2).select(
    F.col("product_id").alias("second_seller_product_id"), F.col("seller_id").alias("second_seller_seller_id"),
    F.lit("Second top seller").alias("type")
)
second_seller.explain()
second_seller.show()

== Physical Plan ==
*(4) Project [product_id#282 AS second_seller_product_id#386, seller_id#283 AS second_seller_seller_id#387, Second top seller AS type#388]
+- *(4) Filter (isnotnull(rank_desc#334) AND (rank_desc#334 = 2))
   +- Window [dense_rank(num_pieces_sold#300) windowspecdefinition(product_id#282, num_pieces_sold#300 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank_desc#334], [product_id#282], [num_pieces_sold#300 DESC NULLS LAST]
      +- *(3) Sort [product_id#282 ASC NULLS FIRST, num_pieces_sold#300 DESC NULLS LAST], false, 0
         +- Exchange hashpartitioning(product_id#282, 200), true, [id=#561]
            +- *(2) HashAggregate(keys=[product_id#282, seller_id#283], functions=[sum(cast(num_pieces_sold#285 as double))])
               +- Exchange hashpartitioning(product_id#282, seller_id#283, 200), true, [id=#557]
                  +- *(1) HashAggregate(keys=[product_id#282, seller_id#283], functions=[partial_sum(cast(num_pi

When calculating the “least selling”, we exclude those products that have a single seller and those where the least selling employee is also the second most selling

In [36]:
# Get the least sellers and exclude those rows that are already included in the first piece
# We also exclude the "second top sellers" that are also "least sellers"
least_seller = sales_df_grpd.where(F.col("rank_asc") == 1).select(
    F.col("product_id"), F.col("seller_id"),
    F.lit("Least Seller").alias("type")
).join(single_seller, (sales_df_grpd["seller_id"] == single_seller["single_seller_seller_id"]) & (
        sales_df_grpd["product_id"] == single_seller["single_seller_product_id"]), "left_anti"). \
    join(second_seller, (sales_df_grpd["seller_id"] == second_seller["second_seller_seller_id"]) & (
        sales_df_grpd["product_id"] == second_seller["second_seller_product_id"]), "left_anti")

least_seller.explain()
least_seller.show()

== Physical Plan ==
*(17) Project [product_id#282, seller_id#283, Least Seller AS type#457]
+- SortMergeJoin [seller_id#283, product_id#282], [second_seller_seller_id#387, second_seller_product_id#386], LeftAnti
   :- SortMergeJoin [seller_id#283, product_id#282], [single_seller_seller_id#368, single_seller_product_id#367], LeftAnti
   :  :- *(5) Sort [seller_id#283 ASC NULLS FIRST, product_id#282 ASC NULLS FIRST], false, 0
   :  :  +- Exchange hashpartitioning(seller_id#283, product_id#282, 200), true, [id=#823]
   :  :     +- *(4) Project [product_id#282, seller_id#283]
   :  :        +- *(4) Filter (isnotnull(rank_asc#321) AND (rank_asc#321 = 1))
   :  :           +- Window [dense_rank(num_pieces_sold#300) windowspecdefinition(product_id#282, num_pieces_sold#300 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank_asc#321], [product_id#282], [num_pieces_sold#300 ASC NULLS FIRST]
   :  :              +- *(3) Sort [product_id#282 ASC NULLS FIR

We merge the pieces back together.

In [39]:

# Union all the pieces
union_table = least_seller.select(
    F.col("product_id"),
    F.col("seller_id"),
    F.col("type")
).union(second_seller.select(
    F.col("second_seller_product_id").alias("product_id"),
    F.col("second_seller_seller_id").alias("seller_id"),
    F.col("type")
)).union(single_seller.select(
    F.col("single_seller_product_id").alias("product_id"),
    F.col("single_seller_seller_id").alias("seller_id"),
    F.col("type")
))
union_table.explain()
union_table.show()

== Physical Plan ==
Union
:- *(17) Project [product_id#282, seller_id#283, Least Seller AS type#457]
:  +- SortMergeJoin [seller_id#283, product_id#282], [second_seller_seller_id#387, second_seller_product_id#386], LeftAnti
:     :- SortMergeJoin [seller_id#283, product_id#282], [single_seller_seller_id#368, single_seller_product_id#367], LeftAnti
:     :  :- *(5) Sort [seller_id#283 ASC NULLS FIRST, product_id#282 ASC NULLS FIRST], false, 0
:     :  :  +- Exchange hashpartitioning(seller_id#283, product_id#282, 200), true, [id=#1569]
:     :  :     +- *(4) Project [product_id#282, seller_id#283]
:     :  :        +- *(4) Filter (isnotnull(rank_asc#321) AND (rank_asc#321 = 1))
:     :  :           +- Window [dense_rank(num_pieces_sold#300) windowspecdefinition(product_id#282, num_pieces_sold#300 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank_asc#321], [product_id#282], [num_pieces_sold#300 ASC NULLS FIRST]
:     :  :              +- *(3) 

In [40]:
# Which are the second top seller and least seller of product 0?
union_table.where(F.col("product_id") == 0).show()

NameError: name 'col' is not defined

**8.**
Create a new column called "hashed_bill" defined as follows:

- if the order_id is even: apply MD5 hashing iteratively to the bill_raw_text field, once for each 'A' (capital 'A') present in the text. E.g. if the bill text is 'nbAAnllA', you would apply hashing three times iteratively (only if the order number is even)

- if the order_id is odd: apply SHA256 hashing to the bill textFinally, check if there are any duplicate on the new column

In [8]:
reload()
sales_df.show()
sales_df.printSchema()

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|
|       6|         0|        0|2020-07-01|             82|lmuhhkpyuoyslwmvX...|
|       7|         0|        0|2020-07-04|             15|zoqweontumefxbgvu...|
|       8|         0|        0|2020-07-08|             79|sgldfgtcxufasnvsc...|
|       9|         0|        0|2020-07-10|             25|jnykelwjjebgkwgmu...|
|      10|         0|        0|2020-07-0

In [9]:
import hashlib 
def get_hash(order_id, text):
    res = text
    if int(order_id) % 2 == 0:
        res = hashlib.md5(bytes(res, 'utf-8')).hexdigest()
        for c in text:
            if c == 'A':
                res = hashlib.md5(bytes(res, 'utf-8')).hexdigest()
    else:
        res = hashlib.sha256(bytes(res, 'utf-8')).hexdigest()
    return res
        
get_hash_udf = F.udf(get_hash, StringType())   

In [185]:
get_hash("24", "jfyuoyfkAAeyqkckwbu")

'67d25a783609ce62d5456dc297c05dfd'

In [10]:
# sales_df.withColumn("hashed_bill", get_hash_udf("order_id", "bill_raw_text")).show()
hashed_df = sales_df.withColumn("hashed_bill", get_hash_udf(F.col("order_id"), F.col("bill_raw_text")))
hashed_df.explain()

== Physical Plan ==
*(2) Project [order_id#114, product_id#115, seller_id#116, date#117, num_pieces_sold#118, bill_raw_text#119, pythonUDF0#160 AS hashed_bill#152]
+- BatchEvalPython [get_hash(order_id#114, bill_raw_text#119)], [pythonUDF0#160]
   +- *(1) ColumnarToRow
      +- FileScan parquet [order_id#114,product_id#115,seller_id#116,date#117,num_pieces_sold#118,bill_raw_text#119] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/opt/vlab/gyan42/pyspark-learning-ground/dataskew-experiments/sales_parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<order_id:string,product_id:string,seller_id:string,date:string,num_pieces_sold:string,bill...




In [179]:
hashed_df.show()

+--------+----------+---------+----------+---------------+--------------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|         hashed_bill|
+--------+----------+---------+----------+---------------+--------------------+--------------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|f6fa2a8be04a4ead6...|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|jfyuoyfkeyqkckwbu...|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|416376a64cd652e7b...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|umnxvoqbdzpbwjqmz...|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|787d361b162a6aa1a...|
|       6|         0|        0|2020-07-01|             82|lmuhhkpyuoyslwmvX...|lmuhhkpyuoyslwmvX...|
|       7|         0|        0|2020-07-04|             15|zoqweontumefxbgvu...|4540f452a7c4

In [180]:
hashed_df.groupby(F.col("hashed_bill")).agg(F.count("*").alias("cnt")).where(F.col("cnt") > 1).show()

+-----------+---+
|hashed_bill|cnt|
+-----------+---+
+-----------+---+



In [181]:
hashed_df.select("hashed_bill").count()

20000040

In [183]:
hashed_df.agg(F.countDistinct(F.col("hashed_bill"))).show()

+---------------------------+
|count(DISTINCT hashed_bill)|
+---------------------------+
|                   20000040|
+---------------------------+

