# Apache Spark SQL Using Apache Hadoop HDFS
Click [here (Medium)](https://towardsdatascience.com/six-spark-exercises-to-rule-them-all-242445b24565) for source.

## Imports

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import Row, Window
from pyspark.sql.types import IntegerType
import hashlib
# import pyspark-stubs

## Datasets

In [2]:
# Start a local Spark session (getOrCreate() returns an already-running session
# instead of building a second one).
# Automatic broadcast joins are disabled (threshold -1): a table is broadcast only
# where the code requests it explicitly with broadcast().
spark = (
    SparkSession.builder
    .master("local")
    .appName("Exercise1")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config("spark.executor.memory", "500mb")
    .getOrCreate()
)

In [3]:
# Load the three source tables, all stored as Parquet directories under ./data
products_table, sales_table, sellers_table = [
    spark.read.parquet(path)
    for path in (
        "./data/products_parquet",
        "./data/sales_parquet",
        "./data/sellers_parquet",
    )
]

In [4]:
products_table.take(5)  # first five product rows

[Row(product_id='0', product_name='product_0', price='22'),
 Row(product_id='1', product_name='product_1', price='30'),
 Row(product_id='2', product_name='product_2', price='91'),
 Row(product_id='3', product_name='product_3', price='37'),
 Row(product_id='4', product_name='product_4', price='145')]

In [5]:
products_table.schema.names  # column names of the products table

['product_id', 'product_name', 'price']

In [6]:
sales_table.first()  # a single sample sales row

Row(order_id='1', product_id='0', seller_id='0', date='2020-07-10', num_pieces_sold='26', bill_raw_text='kyeibuumwlyhuwksxodcawelubbyznxvpfxyxzhftudwtiemdhqaqoDeodltimrusmwtfdikxoqpuFliyvacNgrpigspylwaxvbdwiqurqkqidkujwziucglsvzxjbswtkeogxyncoweuvczualykpszcbrmwwyjkgbvpfyijshnkbjdrxppwmclymvofjslldhwnwajnnyvzktbjixlsodivgmfsvwjfqjuwwrqelkkzupmlggzeqmcskkvjggnajgimzkdtzgvhuzlgubkigdctumjhdwnzzlnrlyuhlhmlgfxuhyxlzraqhqgfsgvtigkfuuuawoxherqhovyyyglsxwoqkwntuaesstxvtwhzvzxincwpkhigcwslpokwplgrotrnttohkzzumqcejkvgwkvexpnyjrwzbfauervihrvezyfndmekllabllewmqrgbfwimygcwlbeblcfqitaxnoaudfgaodqbmsgavetmrtlnyy')

In [7]:
sales_table.schema.names  # column names of the sales table

['order_id',
 'product_id',
 'seller_id',
 'date',
 'num_pieces_sold',
 'bill_raw_text']

In [8]:
sellers_table.take(5)  # first five seller rows

[Row(seller_id='0', seller_name='seller_0', daily_target='2500000'),
 Row(seller_id='1', seller_name='seller_1', daily_target='257237'),
 Row(seller_id='2', seller_name='seller_2', daily_target='754188'),
 Row(seller_id='3', seller_name='seller_3', daily_target='310462'),
 Row(seller_id='4', seller_name='seller_4', daily_target='1532808')]

In [9]:
sellers_table.schema.names  # column names of the sellers table

['seller_id', 'seller_name', 'daily_target']

## Warm-Up #1
Find out how many orders, how many products and how many sellers are in the data.
How many products have been sold at least once? Which product is contained in the most orders?

In [10]:
# Orders = rows of the sales table
print(f"Number of Orders: {sales_table.count()}")

Number of Orders: 20000040


In [11]:
# Sellers = rows of the sellers table
print(f"Number of sellers: {sellers_table.count()}")

Number of sellers: 10


In [12]:
# Products = rows of the products table
print(f"Number of products: {products_table.count()}")

Number of products: 75000000


In [13]:
# A product counts as "sold" when its id shows up in at least one sales row
print("Number of products sold at least once")
sales_table.select(countDistinct("product_id")).show()

Number of products sold at least once
+--------------------------+
|count(DISTINCT product_id)|
+--------------------------+
|                    993429|
+--------------------------+



In [14]:
# Number of sales rows per product; keep only the most frequent one
print("Product present in more orders")
orders_per_product = sales_table.groupBy("product_id").agg(count("*").alias("cnt"))
orders_per_product.orderBy(col("cnt").desc()).limit(1).show()

Product present in more orders
+----------+--------+
|product_id|     cnt|
+----------+--------+
|         0|19000000|
+----------+--------+



## Warm-Up #2
How many distinct products have been sold in each day?

In [15]:
# Distinct products sold on each date, days with the most distinct products first
(
    sales_table
    .groupBy("date")
    .agg(countDistinct("product_id").alias("distinct_products_sold"))
    .orderBy(col("distinct_products_sold").desc())
    .show()
)

+----------+----------------------+
|      date|distinct_products_sold|
+----------+----------------------+
|2020-07-06|                100765|
|2020-07-09|                100501|
|2020-07-01|                100337|
|2020-07-03|                100017|
|2020-07-02|                 99807|
|2020-07-05|                 99796|
|2020-07-04|                 99791|
|2020-07-07|                 99756|
|2020-07-08|                 99662|
|2020-07-10|                 98973|
+----------+----------------------+



## Exercise #1
What is the average revenue of the orders?

In [16]:
# Create (or, more precisely, reuse) the Spark session: getOrCreate() returns the
# session started earlier in this notebook if it is still running.
# NOTE(review): context-level settings such as spark.executor.memory are most likely
# not re-applied to an already-running SparkContext; restart the kernel to change them.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Exercise1")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config("spark.executor.memory", "500mb")
    .getOrCreate()
)

In [17]:
# Re-load fresh copies of the three Parquet source tables
products_table, sales_table, sellers_table = [
    spark.read.parquet(path)
    for path in (
        "./data/products_parquet",
        "./data/sales_parquet",
        "./data/sellers_parquet",
    )
]

#### Wrong way to implement - do not run this join (the join key `product_id` is heavily skewed)
# Naive version, kept for reference only: product_id "0" accounts for almost all
# sales rows, so a plain shuffle join on product_id sends most of the data to one task.
# DataFrame.show() prints the table itself and returns None, so it is not wrapped in
# print() (that only added a stray "None" line). price and num_pieces_sold are
# string columns, so they are cast explicitly instead of relying on implicit coercion.
(
    sales_table.join(products_table, sales_table["product_id"] == products_table["product_id"], "inner")
    .agg(
        avg(
            products_table["price"].cast("double") * sales_table["num_pieces_sold"].cast("double")
        ).alias("avg_order_revenue")
    )
    .show()
)

In [18]:
# Step 1 - Collect the candidate skewed keys: the product ids with the most sales rows
TOP_SKEWED_KEYS = 100
results = (
    sales_table
    .groupBy("product_id")
    .count()
    .orderBy(col("count").desc())
    .take(TOP_SKEWED_KEYS)
)

In [19]:
# Step 2 - Salting plan:
#  a. In the products (dimension) table, duplicate every skewed product once per
#     salt value, e.g. product id "0" becomes "0-0", "0-1", ..., "0-100".
#  b. In the sales table, give each sale of a skewed product a random salt, so its
#     rows are spread over those duplicated keys.
# Joining on the salted key then splits the hot key across many tasks.

# Number of salt values (0 .. REPLICATION_FACTOR - 1) per skewed product
REPLICATION_FACTOR = 101
replicated_products = [row["product_id"] for row in results]
salt_pairs = [
    (product_id, salt)
    for product_id in replicated_products
    for salt in range(REPLICATION_FACTOR)
]
replicated_df = spark.createDataFrame(
    spark.sparkContext.parallelize(salt_pairs).map(
        lambda pair: Row(product_id=pair[0], replication=int(pair[1]))
    )
)

In [20]:
# Show the schemas of the two frames joined in Step 3
for heading, frame in (
    ("Products Table Schema:", products_table),
    ("Replicated DataFrame Schema:", replicated_df),
):
    print(heading)
    frame.printSchema()


Products Table Schema:
root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- price: string (nullable = true)

Replicated DataFrame Schema:
root
 |-- product_id: string (nullable = true)
 |-- replication: long (nullable = true)



In [21]:
#   Step 3: Generate the salted join key on both sides of the join.
# Seed for the random salt, so re-runs assign the same salts.
SALT_SEED = 42

# Guard: re-running this cell must not join the already-replicated products table
# with replicated_df a second time, which would multiply its rows again.
if "salted_join_key" not in products_table.columns:
    # Products: a skewed product gets one row per salt value, keyed "<id>-<salt>";
    # every other product (no match in replicated_df) keeps its plain id as the key.
    products_table = products_table.join(
        broadcast(replicated_df),
        products_table["product_id"] == replicated_df["product_id"],
        "left",
    ).withColumn(
        "salted_join_key",
        when(replicated_df["replication"].isNull(), products_table["product_id"]).otherwise(
            concat(replicated_df["product_id"], lit("-"), replicated_df["replication"])
        ),
    )

# Sales: each sale of a skewed product gets a random salt in [0, REPLICATION_FACTOR - 1].
# floor(rand() * F) is uniform over all F salts; the previous round(rand() * (F - 1))
# gave the two end values only half the share of every other salt.
sales_table = sales_table.withColumn(
    "salted_join_key",
    when(
        sales_table["product_id"].isin(replicated_products),
        concat(
            sales_table["product_id"],
            lit("-"),
            floor(rand(SALT_SEED) * REPLICATION_FACTOR).cast(IntegerType()),
        ),
    ).otherwise(sales_table["product_id"]),
)

In [22]:
#   Step 4: Join on the salted key and average the order revenue (price * num_pieces_sold).
# price and num_pieces_sold are string columns, so they are cast to double explicitly.
# DataFrame.show() prints the table and returns None, so it is not wrapped in print()
# (that only produced a stray "None" line).
(
    sales_table.join(
        products_table,
        sales_table["salted_join_key"] == products_table["salted_join_key"],
        "inner",
    )
    .agg(
        avg(
            products_table["price"].cast("double") * sales_table["num_pieces_sold"].cast("double")
        ).alias("avg_order_revenue")
    )
    .show()
)

print("Ok")

+------------------------------+
|avg((price * num_pieces_sold))|
+------------------------------+
|            1246.1338560822878|
+------------------------------+

None
Ok


## Exercise #2
For each seller, what is the average % contribution of an order to the seller's daily quota?

In [23]:
# Create (or, more precisely, reuse) the Spark session: getOrCreate() returns the
# session started earlier in this notebook if it is still running.
# NOTE(review): the "3g" executor memory is most likely not applied to an
# already-running SparkContext; restart the kernel if it must take effect.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Exercise1")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config("spark.executor.memory", "3g")
    .getOrCreate()
)

In [24]:
# Re-load fresh copies of the three Parquet source tables
products_table, sales_table, sellers_table = [
    spark.read.parquet(path)
    for path in (
        "./data/products_parquet",
        "./data/sales_parquet",
        "./data/sellers_parquet",
    )
]

In [25]:
# Per seller: average share of the seller's daily_target covered by one order.
# The ratio is a fraction (multiply by 100 for a percentage).
# - The sellers table is tiny (10 rows), so it is broadcast explicitly; automatic
#   broadcasting is disabled in this session.
# - num_pieces_sold and daily_target are string columns, so they are cast to double
#   explicitly instead of relying on implicit coercion.
# - DataFrame.show() prints the table and returns None, so it is not wrapped in print().
(
    sales_table.join(
        broadcast(sellers_table), sales_table["seller_id"] == sellers_table["seller_id"], "inner"
    )
    .withColumn(
        "ratio",
        sales_table["num_pieces_sold"].cast("double") / sellers_table["daily_target"].cast("double"),
    )
    .groupBy(sales_table["seller_id"])
    .agg(avg("ratio"))
    .show()
)

+---------+--------------------+
|seller_id|          avg(ratio)|
+---------+--------------------+
|        0|2.019885898946922...|
|        7|2.595228787788170...|
|        3| 1.62888537056594E-4|
|        8|9.213030375408861E-5|
|        5|4.211073965904022E-5|
|        6|4.782147194369122E-5|
|        9|3.837913136180238E-5|
|        1|1.964233366461014...|
|        4|3.296428039825817E-5|
|        2|6.690408001060484E-5|
+---------+--------------------+

None


## Exercise #3
Who are the second top-selling and the least-selling sellers for each product? Who are they for the product with `product_id = 0`?

In [26]:
# Reuse (or create) the Spark session; getOrCreate() returns the running one if any.
# NOTE(review): executor memory changes most likely need a kernel restart to take effect.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Exercise1")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config("spark.executor.memory", "3g")
    .getOrCreate()
)


In [27]:
# Re-load fresh copies of the three Parquet source tables
products_table, sales_table, sellers_table = [
    spark.read.parquet(path)
    for path in (
        "./data/products_parquet",
        "./data/sales_parquet",
        "./data/sellers_parquet",
    )
]

In [28]:
# Total pieces sold by each seller for each product (one row per product/seller pair)
sales_table = (
    sales_table
    .groupBy("product_id", "seller_id")
    .agg(sum("num_pieces_sold").alias("num_pieces_sold"))
)

In [29]:
# Two windows over the same per-product partition: one orders sellers from the
# highest total to the lowest, the other from the lowest to the highest.
by_product = Window.partitionBy("product_id")
window_desc = by_product.orderBy(col("num_pieces_sold").desc())
window_asc = by_product.orderBy(col("num_pieces_sold").asc())

In [30]:
# dense_rank leaves no gaps after ties, so rank 2 always means "second distinct total"
sales_table = (
    sales_table
    .withColumn("rank_asc", dense_rank().over(window_asc))
    .withColumn("rank_desc", dense_rank().over(window_desc))
)

In [31]:
# Products whose sellers all sold the same total: either a single seller, or several
# sellers tied on the same amount. Then the lowest total is also the highest, so every
# row is first in both orderings (rank_asc == 1 and rank_desc == 1).
# NOTE: checking rank_asc == rank_desc alone is wrong. With an odd number of distinct
# totals the middle seller has equal ranks too (totals 10/20/30 -> 20 is rank 2 both
# ways) and would be mislabelled here, and also duplicated as a "Second top seller".
single_seller = sales_table.where((col("rank_asc") == 1) & (col("rank_desc") == 1)).select(
    col("product_id").alias("single_seller_product_id"),
    col("seller_id").alias("single_seller_seller_id"),
    lit("Only seller or multiple sellers with the same results").alias("type"),
)

In [32]:
# Sellers holding the second-highest total for their product (ties share the rank)
second_seller = (
    sales_table
    .filter(col("rank_desc") == 2)
    .select(
        col("product_id").alias("second_seller_product_id"),
        col("seller_id").alias("second_seller_seller_id"),
        lit("Second top seller").alias("type"),
    )
)

In [33]:
# Least sellers (lowest total for the product), minus two groups reported elsewhere:
#  - rows already present in single_seller, and
#  - rows that are also the second top seller (products with exactly two distinct totals).
least_candidates = sales_table.where(col("rank_asc") == 1).select(
    col("product_id"), col("seller_id"), lit("Least Seller").alias("type")
)
least_seller = (
    least_candidates
    .join(
        single_seller,
        (least_candidates["seller_id"] == single_seller["single_seller_seller_id"])
        & (least_candidates["product_id"] == single_seller["single_seller_product_id"]),
        "left_anti",
    )
    .join(
        second_seller,
        (least_candidates["seller_id"] == second_seller["second_seller_seller_id"])
        & (least_candidates["product_id"] == second_seller["second_seller_product_id"]),
        "left_anti",
    )
)


In [34]:
# Stack the three result sets into one (product_id, seller_id, type) table.
# union() matches columns by position, so every part is selected in the same order.
union_table = (
    least_seller.select("product_id", "seller_id", "type")
    .union(
        second_seller.select(
            col("second_seller_product_id").alias("product_id"),
            col("second_seller_seller_id").alias("seller_id"),
            "type",
        )
    )
    .union(
        single_seller.select(
            col("single_seller_product_id").alias("product_id"),
            col("single_seller_seller_id").alias("seller_id"),
            "type",
        )
    )
)
union_table.show()

+----------+---------+------------+
|product_id|seller_id|        type|
+----------+---------+------------+
|  19986717|        1|Least Seller|
|  72017876|        1|Least Seller|
|   3534470|        3|Least Seller|
|  35669461|        4|Least Seller|
|  14542470|        5|Least Seller|
|  28592106|        5|Least Seller|
|  34681047|        5|Least Seller|
|  40496308|        5|Least Seller|
|  56011040|        5|Least Seller|
|  67723231|        5|Least Seller|
|  69790381|        5|Least Seller|
|  10978356|        7|Least Seller|
|  18182299|        7|Least Seller|
|  52606213|        7|Least Seller|
|  61475460|        7|Least Seller|
|  17944574|        8|Least Seller|
|  36269838|        8|Least Seller|
|  20774718|        9|Least Seller|
|  31136332|        9|Least Seller|
|  32602520|        9|Least Seller|
+----------+---------+------------+
only showing top 20 rows



In [35]:
# Which are the second top seller and least seller of product 0?
# product_id is a string column, so compare it with the string "0". An int literal forces
# an implicit numeric cast of every product_id and would also match ids such as "00".
union_table.where(col("product_id") == "0").show()

+----------+---------+--------------------+
|product_id|seller_id|                type|
+----------+---------+--------------------+
|         0|        0|Only seller or mu...|
+----------+---------+--------------------+



## Exercise #4
Create a new column called "hashed_bill" defined as follows:
- if the order_id is even: apply MD5 hashing iteratively to the bill_raw_text field, once for each 'A' (capital 'A') present in the text. E.g. if the bill text is 'nbAAnllA', you would apply hashing three times iteratively (only if the order number is even)
- if the order_id is odd: apply SHA256 hashing to the bill text
Finally, check whether there are any duplicates in the new column.

In [36]:
#   Reuse (or create) the Spark session; getOrCreate() returns the running one if any.
#   NOTE(review): the "1g" executor memory most likely needs a kernel restart to apply.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Exercise1")
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)

In [37]:
#   Re-load fresh copies of the three Parquet source tables
products_table, sales_table, sellers_table = [
    spark.read.parquet(path)
    for path in (
        "./data/products_parquet",
        "./data/sales_parquet",
        "./data/sellers_parquet",
    )
]

In [38]:
#   Define the UDF function
def algo(order_id, bill_text):
    """Hash a bill text according to the parity of its order id.

    - Even ``order_id``: apply MD5 iteratively, once per capital 'A' in ``bill_text``;
      each round hashes the hex digest produced by the previous round. With no 'A'
      the text is returned unchanged.
    - Odd ``order_id``: return the SHA-256 hex digest of ``bill_text``.

    Args:
        order_id: order identifier; any value ``int()`` accepts (the table stores it
            as a string).
        bill_text: raw bill text to hash.

    Returns:
        The resulting hex digest (or the unchanged text, see above) as ``str``, or
        ``None`` when either input is ``None`` (a NULL column value reaches a Python
        UDF as ``None``; previously this raised inside the Spark task).
    """
    if order_id is None or bill_text is None:
        return None
    digest = bill_text.encode("utf-8")
    if int(order_id) % 2 == 0:
        #   Even order: one MD5 round per capital 'A'
        for _ in range(bill_text.count("A")):
            digest = hashlib.md5(digest).hexdigest().encode("utf-8")
        return digest.decode("utf-8")
    #   Odd order: a single SHA-256 round
    return hashlib.sha256(digest).hexdigest()

In [39]:
#   Make `algo` available to Spark: callable from SQL as "algo", and the returned
#   wrapper can be applied to DataFrame columns (default return type: string).
algo_udf = spark.udf.register(name="algo", f=algo)

In [40]:
#   Hash every bill with `algo_udf`, then list hash values that occur more than once
#   (an empty table means there are no duplicates).
hashed_sales = sales_table.withColumn("hashed_bill", algo_udf(col("order_id"), col("bill_raw_text")))
duplicate_hashes = (
    hashed_sales
    .groupBy("hashed_bill")
    .agg(count("*").alias("cnt"))
    .filter(col("cnt") > 1)
)
duplicate_hashes.show()

+-----------+---+
|hashed_bill|cnt|
+-----------+---+
+-----------+---+

