In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [38]:
spark = SparkSession.builder.appName('PySparkPractice').getOrCreate()

In [39]:
orders_data = [
    {"order_id": 1, "customer_id": 101, "product_id": 501, "quantity": 2, "order_date": "2023-01-15"},
    {"order_id": 2, "customer_id": 102, "product_id": 502, "quantity": 1, "order_date": "2023-01-16"},
    {"order_id": 3, "customer_id": 101, "product_id": 503, "quantity": 3, "order_date": "2023-01-17"},
    {"order_id": 4, "customer_id": 103, "product_id": 502, "quantity": 2, "order_date": "2023-01-18"},
    {"order_id": 5, "customer_id": 102, "product_id": 501, "quantity": 1, "order_date": "2023-01-19"}
]

products_data = [
    {"product_id": 501, "product_name": "Laptop", "price": 999.99, "category": "Electronics"},
    {"product_id": 502, "product_name": "Desk Chair", "price": 149.99, "category": "Furniture"},
    {"product_id": 503, "product_name": "Smartphone", "price": 699.99, "category": "Electronics"},
    {"product_id": 504, "product_name": "Monitor", "price": 249.99, "category": "Electronics"}
]

orders_df = spark.createDataFrame(orders_data)
products_df = spark.createDataFrame(products_data)

orders_df.show()
products_df.show()

+-----------+----------+--------+----------+--------+
|customer_id|order_date|order_id|product_id|quantity|
+-----------+----------+--------+----------+--------+
|        101|2023-01-15|       1|       501|       2|
|        102|2023-01-16|       2|       502|       1|
|        101|2023-01-17|       3|       503|       3|
|        103|2023-01-18|       4|       502|       2|
|        102|2023-01-19|       5|       501|       1|
+-----------+----------+--------+----------+--------+

+-----------+------+----------+------------+
|   category| price|product_id|product_name|
+-----------+------+----------+------------+
|Electronics|999.99|       501|      Laptop|
|  Furniture|149.99|       502|  Desk Chair|
|Electronics|699.99|       503|  Smartphone|
|Electronics|249.99|       504|     Monitor|
+-----------+------+----------+------------+



## Perform an inner join between orders and products

In [40]:
order_details = orders_df.join(products_df, 'product_id', 'inner')
order_details.show()

+----------+-----------+----------+--------+--------+-----------+------+------------+
|product_id|customer_id|order_date|order_id|quantity|   category| price|product_name|
+----------+-----------+----------+--------+--------+-----------+------+------------+
|       501|        101|2023-01-15|       1|       2|Electronics|999.99|      Laptop|
|       501|        102|2023-01-19|       5|       1|Electronics|999.99|      Laptop|
|       502|        102|2023-01-16|       2|       1|  Furniture|149.99|  Desk Chair|
|       502|        103|2023-01-18|       4|       2|  Furniture|149.99|  Desk Chair|
|       503|        101|2023-01-17|       3|       3|Electronics|699.99|  Smartphone|
+----------+-----------+----------+--------+--------+-----------+------+------------+



                                                                                

## Calculate total revenue per product

In [41]:
revenue_by_product = order_details.groupBy('product_id', 'product_name')\
                    .agg(sum(col('quantity') * col('price')).alias('total_revenue'))\
                    .orderBy(desc('total_revenue'))

revenue_by_product.show()

[Stage 124:>                                                        (0 + 4) / 4]

+----------+------------+------------------+
|product_id|product_name|     total_revenue|
+----------+------------+------------------+
|       501|      Laptop|2999.9700000000003|
|       503|  Smartphone|2099.9700000000003|
|       502|  Desk Chair|            449.97|
+----------+------------+------------------+



                                                                                

## Find the most popular product (by quantity sold)

In [42]:
most_popular_product = order_details.groupBy('product_id', 'product_name')\
    .agg(sum(col('quantity')).alias('total_quantity'))\
    .orderBy(desc('total_quantity'))

most_popular_product.show()

+----------+------------+--------------+
|product_id|product_name|total_quantity|
+----------+------------+--------------+
|       501|      Laptop|             3|
|       502|  Desk Chair|             3|
|       503|  Smartphone|             3|
+----------+------------+--------------+



                                                                                

## Calculate total revenue per customer

In [43]:
revenue_per_customer = order_details.groupBy('customer_id')\
    .agg(sum(col('quantity') * col('price')).alias('total_revenue'))\
    .orderBy(desc('total_revenue'))

revenue_per_customer.show()

+-----------+-----------------+
|customer_id|    total_revenue|
+-----------+-----------------+
|        101|4099.950000000001|
|        102|          1149.98|
|        103|           299.98|
+-----------+-----------------+



                                                                                

## Perform a left join to see all products, even those not ordered

In [44]:
all_products = orders_df.join(products_df, 'product_id', 'left')

all_products.show()

                                                                                

+----------+-----------+----------+--------+--------+-----------+------+------------+
|product_id|customer_id|order_date|order_id|quantity|   category| price|product_name|
+----------+-----------+----------+--------+--------+-----------+------+------------+
|       501|        101|2023-01-15|       1|       2|Electronics|999.99|      Laptop|
|       502|        102|2023-01-16|       2|       1|  Furniture|149.99|  Desk Chair|
|       503|        101|2023-01-17|       3|       3|Electronics|699.99|  Smartphone|
|       502|        103|2023-01-18|       4|       2|  Furniture|149.99|  Desk Chair|
|       501|        102|2023-01-19|       5|       1|Electronics|999.99|      Laptop|
+----------+-----------+----------+--------+--------+-----------+------+------------+

