In this homework we are going to use Jupyter/Databricks Notebooks. Please complete the tasks within one notebook. For each query (SELECT) please prepare 2 different solutions: using dataframe manipulation and Spark SQL.

**TASK 1**

In [84]:
from pyspark.sql import SparkSession
import datetime
import os
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType
from pyspark.sql.functions import to_date
from pyspark.sql.functions import countDistinct, count,  desc
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

In [85]:
spark = SparkSession.builder.appName("Spark: Dataframes_HW").config("spark.app.name", "Spark: Dataframes_HW").getOrCreate()
sc = spark.sparkContext

In [86]:
current_directory = os.getcwd()
relative_path = os.path.join(current_directory, 'RDD_HW')
file_path = relative_path +'/sellers.csv'
sellers_rdd = sc.textFile(file_path).map(lambda x: x.split(","))
sellers_rdd_header = sellers_rdd.first()
sellers_rdd = sellers_rdd.filter(lambda x: x != sellers_rdd_header)
sellers_rdd = sellers_rdd.map(lambda x: (int(x[0]), x[1], int(x[2])))
schema = StructType([
    StructField("seller_id", IntegerType(), True),
    StructField("seller_name", StringType(), True),
    StructField("daily_target", IntegerType(), True)
])
sellers_df = spark.createDataFrame(sellers_rdd, schema=schema)
display(sellers_df)
sellers_df.show()

DataFrame[seller_id: int, seller_name: string, daily_target: int]

+---------+-----------+------------+
|seller_id|seller_name|daily_target|
+---------+-----------+------------+
|        0|   seller_0|      100000|
|        1|   seller_1|       83478|
|        2|   seller_2|       94114|
|        3|   seller_3|       50299|
|        4|   seller_4|       72654|
|        5|   seller_5|       28862|
|        6|   seller_6|       61878|
|        7|   seller_7|       72047|
|        8|   seller_8|       54715|
|        9|   seller_9|       82824|
+---------+-----------+------------+



In [87]:
file_path = relative_path +'/products.csv'
products_rdd = sc.textFile(file_path).map(lambda x: x.split(","))
products_rdd_header = products_rdd.first()
products_rdd = products_rdd.filter(lambda x: x != products_rdd_header)
products_rdd = products_rdd.map(lambda x: (int(x[0]), x[1], int(x[2])))
schema = StructType([
    StructField("product_id", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("price", IntegerType(), True)
])
products_df = spark.createDataFrame(products_rdd, schema=schema)
display(products_df)
products_df.show()

DataFrame[product_id: int, product_name: string, price: int]

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0|   22|
|         1|   product_1|   35|
|         2|   product_2|  146|
|         3|   product_3|   17|
|         4|   product_4|   66|
|         5|   product_5|   31|
|         6|   product_6|  127|
|         7|   product_7|  116|
|         8|   product_8|  121|
|         9|   product_9|   98|
|        10|  product_10|   54|
|        11|  product_11|   25|
|        12|  product_12|  125|
|        13|  product_13|    8|
|        14|  product_14|  100|
|        15|  product_15|  111|
|        16|  product_16|    1|
|        17|  product_17|  115|
|        18|  product_18|   69|
|        19|  product_19|   59|
+----------+------------+-----+
only showing top 20 rows



In [88]:

file_path = relative_path +'/sales.csv'
sales_rdd = sc.textFile(file_path).map(lambda x: x.split(","))
sales_rdd_header = sales_rdd.first()
sales_rdd = sales_rdd.filter(lambda x: x != sales_rdd_header)
sales_rdd = sales_rdd.map(lambda x: (
    int(x[0]),  
    int(x[1]), 
    int(x[2]),  
    x[3],   
    int(x[4]),  
    x[5]  
))
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("seller_id", IntegerType(), True),
    StructField("date", StringType(), True),
    StructField("num_pieces_sold", IntegerType(), True),
    StructField("bill_raw_text", StringType(), True)
])
sales_df = spark.createDataFrame(sales_rdd, schema=schema)
sales_df = sales_df.withColumn("date", to_date(sales_df["date"], 'yyyy-mm-dd'))
display(sales_df)
sales_df.show()


DataFrame[order_id: int, product_id: int, seller_id: int, date: date, num_pieces_sold: int, bill_raw_text: string]

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-01-07|              5|wmravyotewquljbnw...|
|       2|         0|        0|2020-01-06|             14|dctggbxvuwschfvte...|
|       3|         0|        0|2020-01-05|             10|zrgfjuaqwsyliyrfm...|
|       4|         0|        0|2020-01-08|             71|isnczxkbfrodexqnT...|
|       5|         0|        0|2020-01-08|             14|wehgahgpljcinzjhg...|
|       6|         0|        0|2020-01-09|             80|yfezqwnlsmprxkeuu...|
|       7|         0|        0|2020-01-05|             61|gglhmrggmstqbcxmg...|
|       8|         0|        0|2020-01-02|             78|yntlmywelrlhanqmk...|
|       9|         0|        0|2020-01-05|             99|hhedtiuedwjosrigz...|
|      10|         0|        0|2020-01-1

1.	Please create TempViews upon dataframes from previous lesson: products_df, sales_df, sellers_df

In [89]:
products_df.createOrReplaceTempView("products")
sales_df.createOrReplaceTempView("sales")
sellers_df.createOrReplaceTempView("sellers")

2.	Select number of products sold at least once. Use the TempViews created above

In [90]:
num_products_sold = spark.sql("SELECT COUNT(DISTINCT product_id) as `Number of products sold at least once` FROM sales")
num_products_sold.show()

+-------------------------------------+
|Number of products sold at least once|
+-------------------------------------+
|                                86485|
+-------------------------------------+



3.	Select number of distinct sellers who sold any of the products

In [91]:
num_distinct_sellers = sales_df.select(countDistinct("seller_id").alias("Number of distinct sellers who sold any of the products"))
num_distinct_sellers.show()

+-------------------------------------------------------+
|Number of distinct sellers who sold any of the products|
+-------------------------------------------------------+
|                                                     10|
+-------------------------------------------------------+



4.	Select the most popular product (by number of orders) in the sales table.

In [92]:
most_popular_product = sales_df.groupBy("product_id").agg(count("*").alias("Number of orders")) \
    .orderBy(desc("Number of orders")).limit(1)
print("Most popular product by number of orders:")
most_popular_product.show()


Most popular product by number of orders:
+----------+----------------+
|product_id|Number of orders|
+----------+----------------+
|         0|         3800000|
+----------+----------------+



5.	Select the number of distinct products has been sold in each date

In [93]:
distinct_products_sold_by_date = sales_df.groupBy("date").agg(countDistinct("product_id").alias("Number of distinct products")) \
    .orderBy("date")
print("Number of distinct products sold in each date:")
distinct_products_sold_by_date.show()

Number of distinct products sold in each date:
+----------+---------------------------+
|      date|Number of distinct products|
+----------+---------------------------+
|2020-01-01|                      18210|
|2020-01-02|                      18305|
|2020-01-03|                      17952|
|2020-01-04|                      18167|
|2020-01-05|                      18386|
|2020-01-06|                      17984|
|2020-01-07|                      18285|
|2020-01-08|                      17892|
|2020-01-09|                      18053|
|2020-01-10|                      18330|
+----------+---------------------------+



6.	Select all sales made by seller with seller_id=7

In [94]:
sales_by_seller_7 = sales_df.filter(sales_df.seller_id == 7)
print("All sales made by seller with seller_id=7:")
sales_by_seller_7.show()

All sales made by seller with seller_id=7:
+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|   95016|     10292|        7|2020-01-07|             86|ddjxhepqkpwionxlw...|
|   95023|     13642|        7|2020-01-03|              5|hqlmpvnyluuvdxNyl...|
|   95042|     54476|        7|2020-01-10|             75|frecsxijhiwsujzuu...|
|   95065|     86757|        7|2020-01-04|             95|dxrsOgtaxcxcpulcc...|
|   95067|      9749|        7|2020-01-06|             83|unhtoxghdrgvvtemo...|
|   95070|     88175|        7|2020-01-09|             32|azfydzqrzsftwhzqy...|
|   95075|     36251|        7|2020-01-09|             56|rrezcdybsktrdfuvr...|
|   95089|     73423|        7|2020-01-10|             15|noqsnpzdtiobzqqlu...|
|   95092|     57982|        7|2020-01-06|             94|tkaviwhjZlzrtviqq..

7.	Remove the bill_raw_text column from sales dataframe.

In [95]:
sales_df = sales_df.drop("bill_raw_text")
sales_df.show()

+--------+----------+---------+----------+---------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|
+--------+----------+---------+----------+---------------+
|       1|         0|        0|2020-01-07|              5|
|       2|         0|        0|2020-01-06|             14|
|       3|         0|        0|2020-01-05|             10|
|       4|         0|        0|2020-01-08|             71|
|       5|         0|        0|2020-01-08|             14|
|       6|         0|        0|2020-01-09|             80|
|       7|         0|        0|2020-01-05|             61|
|       8|         0|        0|2020-01-02|             78|
|       9|         0|        0|2020-01-05|             99|
|      10|         0|        0|2020-01-10|             14|
|      11|         0|        0|2020-01-09|             95|
|      12|         0|        0|2020-01-02|             33|
|      13|         0|        0|2020-01-10|             39|
|      14|         0|        0|2020-01-04|             8

8.	Select 10 biggest (num_pieces_sold) orders.

In [96]:
# as we do not have any additional criteria for filtering I desided to use dense rank
window_spec = Window.orderBy(sales_df.num_pieces_sold.desc())
sales_df = sales_df.withColumn("rank", dense_rank().over(window_spec))
top_10_biggest_orders = sales_df.filter(sales_df.rank <= 10)
print("Top 10 biggest (num_pieces_sold) orders:")
top_10_biggest_orders.show()

Top 10 biggest (num_pieces_sold) orders:
+--------+----------+---------+----------+---------------+----+
|order_id|product_id|seller_id|      date|num_pieces_sold|rank|
+--------+----------+---------+----------+---------------+----+
|      31|         0|        0|2020-01-07|            100|   1|
|      89|         0|        0|2020-01-08|            100|   1|
|     106|         0|        0|2020-01-04|            100|   1|
|     115|         0|        0|2020-01-02|            100|   1|
|     346|         0|        0|2020-01-08|            100|   1|
|     378|         0|        0|2020-01-06|            100|   1|
|     590|         0|        0|2020-01-07|            100|   1|
|     593|         0|        0|2020-01-06|            100|   1|
|     595|         0|        0|2020-01-07|            100|   1|
|     640|         0|        0|2020-01-03|            100|   1|
|     758|         0|        0|2020-01-05|            100|   1|
|    1119|         0|        0|2020-01-09|            100|   1|