In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# Create spark session
spark = SparkSession.builder \
 .master("local") \
 .config("spark.sql.autoBroadcastJoinThreshold", -1) \
 .config("spark.executor.memory", "500mb") \
 .appName("Exercise1") \
 .getOrCreate()

22/04/24 13:20:36 WARN Utils: Your hostname, Zipcoders-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.77 instead (on interface en0)
22/04/24 13:20:36 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/04/24 13:20:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Warm-up #1

Find out how many orders, how many products and how many
sellers are in the data.

In [5]:
# Read parquet files into dataframes
sales_df = spark.read.parquet("/Users/allenc/PyCharmProjects/JupyterProjects/DatasetToCompleteTheSixSparkExercises/sales_parquet/*/")
products_df = spark.read.parquet("/Users/allenc/PyCharmProjects/JupyterProjects/DatasetToCompleteTheSixSparkExercises/products_parquet/*/")
sellers_df = spark.read.parquet("/Users/allenc/PyCharmProjects/JupyterProjects/DatasetToCompleteTheSixSparkExercises/sellers_parquet/*/")

                                                                                

In [4]:
sales_df.show()

[Stage 4:>                                                          (0 + 1) / 1]

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|       1|         0|        0|2020-07-10|             26|kyeibuumwlyhuwksx...|
|       2|         0|        0|2020-07-08|             13|jfyuoyfkeyqkckwbu...|
|       3|         0|        0|2020-07-05|             38|uyjihlzhzcswxcccx...|
|       4|         0|        0|2020-07-05|             56|umnxvoqbdzpbwjqmz...|
|       5|         0|        0|2020-07-05|             11|zmqexmaawmvdpqhih...|
|       6|         0|        0|2020-07-01|             82|lmuhhkpyuoyslwmvX...|
|       7|         0|        0|2020-07-04|             15|zoqweontumefxbgvu...|
|       8|         0|        0|2020-07-08|             79|sgldfgtcxufasnvsc...|
|       9|         0|        0|2020-07-10|             25|jnykelwjjebgkwgmu...|
|      10|         0|        0|2020-07-0

                                                                                

In [5]:
products_df.show()

+----------+------------+-----+
|product_id|product_name|price|
+----------+------------+-----+
|         0|   product_0|   22|
|         1|   product_1|   30|
|         2|   product_2|   91|
|         3|   product_3|   37|
|         4|   product_4|  145|
|         5|   product_5|  128|
|         6|   product_6|   66|
|         7|   product_7|  145|
|         8|   product_8|   51|
|         9|   product_9|   44|
|        10|  product_10|   53|
|        11|  product_11|   13|
|        12|  product_12|  104|
|        13|  product_13|  102|
|        14|  product_14|   24|
|        15|  product_15|   14|
|        16|  product_16|   38|
|        17|  product_17|   72|
|        18|  product_18|   16|
|        19|  product_19|   46|
+----------+------------+-----+
only showing top 20 rows



In [6]:
sellers_df.show()

+---------+-----------+------------+
|seller_id|seller_name|daily_target|
+---------+-----------+------------+
|        0|   seller_0|     2500000|
|        1|   seller_1|      257237|
|        2|   seller_2|      754188|
|        3|   seller_3|      310462|
|        4|   seller_4|     1532808|
|        5|   seller_5|     1199693|
|        6|   seller_6|     1055915|
|        7|   seller_7|     1946998|
|        8|   seller_8|      547320|
|        9|   seller_9|     1318051|
+---------+-----------+------------+



In [6]:
# Number of total sales (rows)
sales_df.count()

                                                                                

994971

In [8]:
# Number of total products (rows)
products_df.count()

75000000

In [9]:
# Number of total sellers (rows)
sellers_df.count()

10

How many products have been sold at least once? Which is the product contained in more orders?

In [10]:
# Use spark sql functions to count the distinct products in sales table
sales_df.agg(countDistinct(col("product_id"))).show()



+-----------------+
|count(product_id)|
+-----------------+
|           993429|
+-----------------+





In [11]:
# Group all product ids, count them, and order from largest count to smallest
sales_df.groupBy(col("product_id")).agg(
    count("*").alias("count")).orderBy(col("count").desc()).show(1)



+----------+--------+
|product_id|   count|
+----------+--------+
|         0|19000000|
+----------+--------+
only showing top 1 row



                                                                                

# Warm-up #2

How many distinct products have been sold in each day?

In [12]:
# Group by date, count all distinct products and order from earliest date to most recent date
sales_df.groupby(col("date")).agg(countDistinct(col("product_id"))).orderBy(
    col("date").asc()).show()



+----------+-----------------+
|      date|count(product_id)|
+----------+-----------------+
|2020-07-01|           100337|
|2020-07-02|            99807|
|2020-07-03|           100017|
|2020-07-04|            99791|
|2020-07-05|            99796|
|2020-07-06|           100765|
|2020-07-07|            99756|
|2020-07-08|            99662|
|2020-07-09|           100501|
|2020-07-10|            98973|
+----------+-----------------+





# Exercise #1

What is the average revenue of the orders?

In [7]:
# Join records from sales df and products df.
sales_products_merged = sales_df.join(products_df, 
                                      sales_df.product_id == products_df.product_id,
                                      "inner")

In [11]:
# Add a 'Total' column that calculates total revenue for each product sold (num_of_pieces sold * the price) 
Total=sales_products_merged.withColumn("Total",(col("num_pieces_sold")*col("price")))
Total.show()



+--------+----------+---------+----------+---------------+--------------------+----------+----------------+-----+-------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|product_id|    product_name|price|  Total|
+--------+----------+---------+----------+---------------+--------------------+----------+----------------+-----+-------+
|12478308|  10005243|        6|2020-07-04|             98|qfvpgiscflyjxphcq...|  10005243|product_10005243|   44| 4312.0|
| 8996776|  10023464|        9|2020-07-03|             59|jjbyqkzcimBfoehbv...|  10023464|product_10023464|   19| 1121.0|
|10476976|  10050363|        6|2020-07-03|             18|xqhlvkpxtzrfdadry...|  10050363|product_10050363|   98| 1764.0|
| 5977582|  10089524|        2|2020-07-01|             53|jchvhzbeaicqitpvx...|  10089524|product_10089524|  100| 5300.0|
| 1482892|  10122266|        2|2020-07-07|             25|kezhpglnqigaqwrss...|  10122266|product_10122266|   89| 2225.0|
| 2987749|  10134574|   

                                                                                

In [9]:
# Create Dataframe that calculatees 'Average' revenue of 'Total' column
Avg = Total.select(mean('Total'))
Avg.show()



+-----------------+
|       avg(Total)|
+-----------------+
|3814.589524719816|
+-----------------+



                                                                                