In [2]:
from pyspark.sql import SparkSession

In [3]:
# Obtain the Spark session for this notebook under the application name
# "Retail_Data"; getOrCreate() hands back an already-active session if there is one.
spark = (
    SparkSession.builder
    .appName("Retail_Data")
    .getOrCreate()
)

In [7]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

In [8]:
# Explicit schema applied when reading orders.csv; every column is declared nullable.
order_schema = (
    StructType()
    .add("order_id", IntegerType(), True)
    .add("order_date", TimestampType(), True)
    .add("order_customer_id", IntegerType(), True)
    .add("order_status", StringType(), True)
)

In [9]:
# Read orders.csv with the first line treated as data (header=False); column
# names and types are taken from order_schema instead of being inferred.
orders = spark.read.csv("orders.csv", schema=order_schema, header=False)

In [10]:
# Print the column names, types and nullability of the orders DataFrame.
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [13]:
# Preview the first five orders; truncate=False prints full cell values.
orders.show(5, truncate=False)

+--------+-------------------+-----------------+---------------+
|order_id|order_date         |order_customer_id|order_status   |
+--------+-------------------+-----------------+---------------+
|1       |2013-07-25 00:00:00|11599            |CLOSED         |
|2       |2013-07-25 00:00:00|256              |PENDING_PAYMENT|
|3       |2013-07-25 00:00:00|12111            |COMPLETE       |
|4       |2013-07-25 00:00:00|8827             |CLOSED         |
|5       |2013-07-25 00:00:00|11318            |COMPLETE       |
+--------+-------------------+-----------------+---------------+
only showing top 5 rows


In [14]:
# Convert the order timestamp to a date, expose it as "order_day",
# and preview five rows.
orders.select(
    F.to_date(col("order_date")).alias("order_day")
).show(5, truncate=False)

+----------+
|order_day |
+----------+
|2013-07-25|
|2013-07-25|
|2013-07-25|
|2013-07-25|
|2013-07-25|
+----------+
only showing top 5 rows


In [17]:
# Render the order timestamp with the pattern "yyyy-MM" (year and month),
# expose it as "year-month", and preview five rows.
orders.select(
    F.date_format(col("order_date"), "yyyy-MM").alias("year-month")
).show(5, truncate=False)

+----------+
|year-month|
+----------+
|2013-07   |
|2013-07   |
|2013-07   |
|2013-07   |
|2013-07   |
+----------+
only showing top 5 rows


TASK 1 : Load order items dataset and filter items with quantity and price greater than 0

In [19]:
# Explicit schema applied when reading order_items.csv; every column is declared nullable.
order_item_schema = (
    StructType()
    .add("order_item_id", IntegerType(), True)
    .add("order_item_order_id", IntegerType(), True)
    .add("order_item_product_id", IntegerType(), True)
    .add("order_item_quantity", IntegerType(), True)
    .add("order_item_subtotal", DoubleType(), True)
    .add("order_item_product_price", DoubleType(), True)
)

In [20]:
# Read order_items.csv with the first line treated as data (header=False);
# column names and types are taken from order_item_schema.
order_items = spark.read.csv("order_items.csv", schema=order_item_schema, header=False)

In [21]:
# Print the column names, types and nullability of the order_items DataFrame.
order_items.printSchema()

root
 |-- order_item_id: integer (nullable = true)
 |-- order_item_order_id: integer (nullable = true)
 |-- order_item_product_id: integer (nullable = true)
 |-- order_item_quantity: integer (nullable = true)
 |-- order_item_subtotal: double (nullable = true)
 |-- order_item_product_price: double (nullable = true)



In [22]:
# Preview the first five order items; truncate=False prints full cell values.
order_items.show(5, truncate=False)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|1            |1                  |957                  |1                  |299.98             |299.98                  |
|2            |2                  |1073                 |1                  |199.99             |199.99                  |
|3            |2                  |502                  |5                  |250.0              |50.0                    |
|4            |2                  |403                  |1                  |129.99             |129.99                  |
|5            |4                  |897                  |2                  |49.98              |24.99                   |
+-------------+-

In [25]:
# Task 1: keep only order items whose quantity and product price are both
# greater than zero. Two chained where() calls are combined with AND.
(
    order_items
    .where(col("order_item_quantity") > 0)
    .where(col("order_item_product_price") > 0)
    .show()
)

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
|            6| 

Task 2 : Retrieve revenue for order id 2

In [37]:
# Task 2: select the order items whose order_item_order_id equals 2.
orderid2items = order_items.where(order_items["order_item_order_id"] == 2)

In [41]:
# Display the order items that were selected for order id 2.
orderid2items.show()

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+



In [38]:
# NOTE: this binds the name `sum` to Spark's aggregate function, so Python's
# built-in sum() is shadowed in every cell that runs after this one.
from pyspark.sql.functions import sum

In [39]:
# Task 2: revenue for order 2 is the total of its order_item_subtotal values.
# The aggregate is referenced as F.sum so this cell does not depend on the bare
# name `sum` having been rebound from Python's built-in to Spark's function.
orderid2items.agg(F.sum("order_item_subtotal")).show()

+------------------------+
|sum(order_item_subtotal)|
+------------------------+
|                  579.98|
+------------------------+



Task 3 : Revenue By Category

In [42]:
# Explicit schema applied when reading products.csv; every column is declared nullable.
product_schema = (
    StructType()
    .add("product_id", IntegerType(), True)
    .add("product_category_id", IntegerType(), True)
    .add("product_name", StringType(), True)
    .add("product_description", StringType(), True)
    .add("product_price", DoubleType(), True)
    .add("product_image", StringType(), True)
)

In [43]:
# Read products.csv with the first line treated as data (header=False);
# column names and types are taken from product_schema.
products = spark.read.csv("products.csv", schema=product_schema, header=False)

In [47]:
# Task 3: attach product columns (including product_category_id) to each order
# item by matching order_item_product_id to product_id.
# An inner join is used so this DataFrame version has the same semantics as the
# Spark SQL version of Task 3 in this notebook, which uses a plain `join`.
# A left join would additionally keep order items with no matching product and
# report their revenue under a null category.
product_order_join = order_items.join(
    products,
    order_items.order_item_product_id == products.product_id,
    "inner",
)

In [48]:
# Task 3: total order_item_subtotal per product category, ordered by category id.
# The aggregate is referenced as F.sum so this cell does not depend on the bare
# name `sum` having been rebound from Python's built-in to Spark's function.
(
    product_order_join
    .groupBy("product_category_id")
    .agg(F.sum("order_item_subtotal"))
    .orderBy("product_category_id")
    .show()
)

+-------------------+------------------------+
|product_category_id|sum(order_item_subtotal)|
+-------------------+------------------------+
|                  2|      26477.049999999974|
|                  3|       94057.15000000027|
|                  4|       27099.33000000001|
|                  5|                39464.79|
|                  6|      44585.090000000026|
|                  7|      48360.729999999996|
|                  9|       3694843.200000276|
|                 10|      54895.530000000006|
|                 11|                35601.44|
|                 12|                85205.41|
|                 13|       115355.2500000006|
|                 16|       20597.93999999999|
|                 17|       4431942.660000888|
|                 18|      2891757.5400006757|
|                 24|               3147800.0|
|                 26|       151706.2000000002|
|                 29|      1309522.0199998477|
|                 30|      47035.800000000076|
|            

**By Spark SQL**

In [49]:
# Register each DataFrame as a temporary view so it can be referenced by name
# in spark.sql() queries; an existing view with the same name is replaced.
orders.createOrReplaceTempView("orders")
order_items.createOrReplaceTempView("order_items")
products.createOrReplaceTempView("products")

TASK 1 : Load order items dataset and filter items with quantity and price greater than 0

In [50]:
# Task 1 (SQL): order items whose quantity and product price are both greater than 0.
query1= """
     select * from order_items where order_item_quantity > 0 and order_item_product_price > 0
"""

In [51]:
# Run query1 against the registered temp views and display the result.
spark.sql(query1).show()

+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|order_item_id|order_item_order_id|order_item_product_id|order_item_quantity|order_item_subtotal|order_item_product_price|
+-------------+-------------------+---------------------+-------------------+-------------------+------------------------+
|            1|                  1|                  957|                  1|             299.98|                  299.98|
|            2|                  2|                 1073|                  1|             199.99|                  199.99|
|            3|                  2|                  502|                  5|              250.0|                    50.0|
|            4|                  2|                  403|                  1|             129.99|                  129.99|
|            5|                  4|                  897|                  2|              49.98|                   24.99|
|            6| 

Task 2 : Retrieve revenue for order id 2

In [52]:
# Task 2 (SQL): sum of order_item_subtotal over the items of order id 2.
query2= """
     select sum(order_item_subtotal) from order_items where order_item_order_id = 2
"""

In [53]:
# Run query2 against the registered temp views and display the result.
spark.sql(query2).show()

+------------------------+
|sum(order_item_subtotal)|
+------------------------+
|                  579.98|
+------------------------+



Task 3 : Revenue By Category

In [54]:
# Task 3 (SQL): total order_item_subtotal per product category, ordered by
# category id. The statement is written without a trailing ';' because
# spark.sql() takes a single statement and Spark 2.x parsers reject the terminator.
query3 = """
    select product_category_id, sum(order_item_subtotal)
    from order_items
    join products
      on order_items.order_item_product_id = products.product_id
    group by product_category_id
    order by product_category_id
"""

In [55]:
# Run query3 against the registered temp views and display the result.
spark.sql(query3).show()

+-------------------+------------------------+
|product_category_id|sum(order_item_subtotal)|
+-------------------+------------------------+
|                  2|      26477.049999999974|
|                  3|       94057.15000000027|
|                  4|       27099.33000000001|
|                  5|                39464.79|
|                  6|      44585.090000000026|
|                  7|      48360.729999999996|
|                  9|       3694843.200000276|
|                 10|      54895.530000000006|
|                 11|                35601.44|
|                 12|                85205.41|
|                 13|       115355.2500000006|
|                 16|       20597.93999999999|
|                 17|       4431942.660000888|
|                 18|      2891757.5400006757|
|                 24|               3147800.0|
|                 26|       151706.2000000002|
|                 29|      1309522.0199998477|
|                 30|      47035.800000000076|
|            