In [61]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("Read JSON from MinIO") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "admin") \
    .config("spark.hadoop.fs.s3a.secret.key", "password") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.391.jar,/opt/spark/jars/mysql-connector-j-9.1.0.jar") \
    .getOrCreate()

# Database connection properties
db_properties = {
    "url": "jdbc:mysql://mysql:3306/fakeshop",  
    "user": "etluser",  
    "password": "I957DO9cYXp6JDEv",  
    "driver": "com.mysql.cj.jdbc.Driver"
}

items_table_name = "items"
purchases_table_name = "purchases"

# Load the fakeshop.items table as a DataFrame 
items_df = spark.read \
    .format("jdbc") \
    .option("url", db_properties["url"]) \
    .option("dbtable", items_table_name) \
    .option("user", db_properties["user"]) \
    .option("password", db_properties["password"]) \
    .option("driver", db_properties["driver"]) \
    .load()

# Load the fakeshop.purchases table as a DataFrame
purchases_df = spark.read \
    .format("jdbc") \
    .option("url", db_properties["url"]) \
    .option("dbtable", purchases_table_name) \
    .option("user", db_properties["user"]) \
    .option("password", db_properties["password"]) \
    .option("driver", db_properties["driver"]) \
    .load()


In [57]:
# Show DataFrame contents
items_df.count()
purchases_df.count()

100

Now that we have `purchases_df`, let's calculate the `purchase_summary_df` data frame from it.

In [63]:
# Calculate the purchase summary
purchase_summary_df = purchases_df.groupBy("item_id") \
    .agg(
        F.sum("purchase_price").alias("revenue"),
        F.count("id").alias("orders"),
        F.sum("quantity").alias("items_sold")
    )

Let's also read the content inside the MinIO bucket `pageviews` into another data frame.

In [64]:
# Define the bucket and file path
bucket_name = "pageviews"
file_path = f"s3a://{bucket_name}/*.json"

# Read JSON files from the MinIO bucket into a DataFrame
pageviews_df = spark.read.json(file_path)

root
 |-- channel: string (nullable = true)
 |-- received_at: long (nullable = true)
 |-- url: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- item_id: string (nullable = true)
 |-- pageview_type: string (nullable = true)



In [65]:
# Enriching the data frame with additionally derived columns
enriched_pageviews_df = pageviews_df \
.withColumn("item_id", F.split(F.col("url"), "/").getItem(2)) \
.withColumn("pageview_type", F.split(F.col("url"), "/").getItem(1))

item_pageviews_df=enriched_pageviews_df \
.groupBy("item_id", "channel", "pageview_type") \
.agg(
    F.count("*").alias("pageviews")
)\
.filter('pageview_type == "products"') \
.sort(F.col("pageviews").desc()) \

item_pageviews_df.show()

+-------+--------------+-------------+---------+
|item_id|       channel|pageview_type|pageviews|
+-------+--------------+-------------+---------+
|    881|        social|     products|        5|
|    156|   paid search|     products|        4|
|    604|   paid search|     products|        4|
|    482|      referral|     products|        4|
|    658|      referral|     products|        4|
|    754|      referral|     products|        4|
|    377|      referral|     products|        4|
|    297|      referral|     products|        4|
|    804|      referral|     products|        4|
|    166|       display|     products|        4|
|    964|        social|     products|        4|
|    354|        social|     products|        4|
|     19|       display|     products|        4|
|    330|        social|     products|        4|
|    388|organic search|     products|        3|
|     77|organic search|     products|        3|
|    185|organic search|     products|        3|
|    640|organic sea

Now that we have two data frames ready, `purchase_summary_df` and `item_pageviews_df`, let's go ahead and create our final data frame, `item_summary_df`.

In [73]:
item_summary_df = items_df.alias("items") \
    .join(purchase_summary_df.alias("purchase_summary"), F.col("items.id") == F.col("purchase_summary.item_id")) \
    .join(item_pageviews_df.alias("item_pageviews"), F.col("items.id") == F.col("item_pageviews.item_id")) \
    .groupBy(F.col("items.name").alias("item_name"), F.col("items.category").alias("item_category")) \
    .agg(
        F.sum("purchase_summary.items_sold").alias("items_sold"),
        F.sum("purchase_summary.orders").alias("orders"),
        F.sum("purchase_summary.revenue").alias("revenue"),
        F.sum("item_pageviews.pageviews").alias("pageviews"),
        (F.sum("purchase_summary.orders") / F.sum("item_pageviews.pageviews")).alias("conversion_rate")
    ) \

item_summary_df.show()

+--------------------+-------------+----------+------+-------+---------+-------------------+
|           item_name|item_category|items_sold|orders|revenue|pageviews|    conversion_rate|
+--------------------+-------------+----------+------+-------+---------+-------------------+
|ophthalmologist a...|      gadgets|         2|     2|  55.18|        5|                0.4|
|    wrinkles bracket|      doodads|         2|     1| 724.12|        1|                1.0|
|     religion France|    clearance|         5|     1|1475.70|        1|                1.0|
|       bongos cactus|      widgets|        16|     4|1842.40|        4|                1.0|
|        geology oval|      gadgets|         9|     3|1670.04|        7|0.42857142857142855|
|     equipment jeans|      gadgets|        10|     2|2552.00|        2|                1.0|
|      rainstorm beef|      gadgets|        12|     3|4578.84|        4|               0.75|
|         snail spear|      widgets|         4|     2|  66.40|        