In [3]:
ecom_products_df = spark.read.format("delta").load("Tables/ecom_products")
ecom_products_df.printSchema()

StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 5, Finished, Available)

root
 |-- date_accessed: timestamp (nullable = true)
 |-- img_url: string (nullable = true)
 |-- num_reviews: integer (nullable = true)
 |-- price: float (nullable = true)
 |-- product_description: string (nullable = true)
 |-- product_features: string (nullable = true)
 |-- product_url: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- title: string (nullable = true)
 |-- rating_1: integer (nullable = true)
 |-- rating_2: integer (nullable = true)
 |-- rating_3: integer (nullable = true)
 |-- rating_4: integer (nullable = true)
 |-- rating_5: integer (nullable = true)
 |-- SellerID: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- source: string (nullable = true)



In [4]:
from pyspark.sql.functions import monotonically_increasing_id, col

ecom_reviews_df = spark.read.format("delta").load("Tables/ecom_reviews")
ecom_reviews_df = ecom_reviews_df.withColumn("id", monotonically_increasing_id())
ecom_reviews_df = ecom_reviews_df.withColumn("id", col("id").cast("int"))
ecom_reviews_df = ecom_reviews_df.dropna(subset=['ProductID'])

ecom_reviews_df.printSchema()

StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 6, Finished, Available)

root
 |-- review_id: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- body: string (nullable = true)
 |-- num_helpful: integer (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- date: date (nullable = true)
 |-- sentiment: boolean (nullable = true)
 |-- id: integer (nullable = false)



In [5]:
dim_product = ecom_products_df.select(
    ecom_products_df.ProductID,
    ecom_products_df.title,
    ecom_products_df.product_description,
    ecom_products_df.product_features,
    ecom_products_df.img_url,
    ecom_products_df.product_url,
    ecom_products_df.SellerID
)

StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 7, Finished, Available)

In [6]:
from pyspark.sql.functions import to_date

fact_products = ecom_products_df.select(
    ecom_products_df.ProductID,
    ecom_products_df.date_accessed,
    ecom_products_df.num_reviews,
    ecom_products_df.rating,
    ecom_products_df.rating_1,
    ecom_products_df.rating_2,
    ecom_products_df.rating_3,
    ecom_products_df.rating_4,
    ecom_products_df.rating_5,
    ecom_products_df.price,
)

display(fact_products)

StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 8, Finished, Available)

SynapseWidget(Synapse.DataFrame, fae18be4-8b90-4ae4-91b4-ca04912e0a4b)

In [7]:
from pyspark.sql.functions import to_date, col, substring, regexp_replace, when, to_timestamp

fact_products = fact_products.withColumn(
    "date_accessed",
    when(col("date_accessed").cast("string").rlike(r'^\d{4}-\d{2}-\d{2}$'), to_date(col("date_accessed"), "yyyy-MM-dd"))
    .when(col("date_accessed").cast("string").rlike(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$'), to_timestamp(col("date_accessed"), "yyyy-MM-dd HH:mm:ss"))
    .otherwise(None)
)

StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 9, Finished, Available)

In [8]:
display(fact_products)

StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 10, Finished, Available)

SynapseWidget(Synapse.DataFrame, a093897b-b7c0-41f8-8d3c-96ea5dc3804c)

In [9]:
from pyspark.sql.types import StringType, IntegerType, DateType

fact_reviews = ecom_reviews_df.select(
    ecom_reviews_df.id,
    ecom_reviews_df.review_id,
    ecom_reviews_df.ProductID.cast(StringType()),
    ecom_reviews_df.date,
    ecom_reviews_df.rating,
    ecom_reviews_df.num_helpful,
    ecom_reviews_df.sentiment
)

fact_reviews.show()


StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 11, Finished, Available)

+---+--------------+----------+----------+------+-----------+---------+
| id|     review_id| ProductID|      date|rating|num_helpful|sentiment|
+---+--------------+----------+----------+------+-----------+---------+
|  0|R1IZR0DIF1ER8F|B08HMWZBXC|2021-09-16|     5|        681|     true|
|  1|R3DLSNO6JEA4SK|B08HMWZBXC|2023-05-16|     5|        383|     true|
|  2| RMXDTZBS91XP2|B08HMWZBXC|2023-07-22|     5|         51|     true|
|  3| RV52ZY9LSM6SU|B08HMWZBXC|2020-11-21|     5|        311|     true|
|  4|R2TS1OENT9Q75B|B08HMWZBXC|2022-12-14|     5|         19|     true|
|  5|R35VXNJT6LTT98|B08HMWZBXC|2022-12-13|     5|         19|     true|
|  6|R2AVS5ETLMACXJ|B08HMWZBXC|2024-03-14|     5|         22|     true|
|  7| RT7Z991S4A6SU|B08HMWZBXC|2022-11-25|     5|         15|     true|
|  8|R10E7XOJW2UD95|B07PYMK77Y|2019-06-14|     5|        120|     true|
|  9|R12FOT2CSOBU57|B07HGTXF95|2021-04-04|     5|         18|     true|
| 10|R16R3YKDM3ZOJK|B0BM4HGSFJ|2023-03-28|     5|         32|   

In [10]:
delta_table_name = "fact_reviews"
fact_reviews.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .saveAsTable(delta_table_name)

print(f"DataFrame has been written to Delta table: {delta_table_name}")

StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 12, Finished, Available)

DataFrame has been written to Delta table: fact_reviews


In [11]:
delta_table_name = "fact_products"
fact_products.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable(delta_table_name)

print(f"DataFrame has been written to Delta table: {delta_table_name}")

StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 13, Finished, Available)

DataFrame has been written to Delta table: fact_products


In [12]:
delta_table_name = "dim_products"
dim_product.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .saveAsTable(delta_table_name)

print(f"DataFrame has been written to Delta table: {delta_table_name}")

StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 14, Finished, Available)

DataFrame has been written to Delta table: dim_products


In [13]:
from pyspark.sql.functions import col, dayofmonth, month, year, quarter, sequence, explode, lit
from pyspark.sql.types import DateType

# Calculate the minimum and maximum dates from the fact tables
min_date = spark.sql("""
    SELECT MIN(date) AS min_date
    FROM (
        SELECT MIN(date) AS date FROM ecom_reviews
        UNION ALL
        SELECT MIN(date_accessed) AS date FROM ecom_products
    )
""").collect()[0]['min_date']

max_date = spark.sql("""
    SELECT MAX(date) AS max_date
    FROM (
        SELECT MAX(date) AS date FROM ecom_reviews
        UNION ALL
        SELECT MAX(date_accessed) AS date FROM ecom_products
    )
""").collect()[0]['max_date']

# Generate a range of dates based on the calculated min and max dates
date_range_df = spark.sql(f"""
    SELECT sequence(
        to_date('{min_date}'),
        to_date('{max_date}'),
        interval 1 day
    ) AS date_range
""")

# Explode the sequence of dates into individual rows
date_df = date_range_df.selectExpr("explode(date_range) as date")

# Create the date dimension with day, month, year, and quarter
dim_time_df = date_df.select(
    col("date").alias("date"),
    dayofmonth(col("date")).alias("day"),
    month(col("date")).alias("month"),
    year(col("date")).alias("year"),
    quarter(col("date")).alias("quarter")
)

# Show the resulting DataFrame
dim_time_df.show()


StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 15, Finished, Available)

+----------+---+-----+----+-------+
|      date|day|month|year|quarter|
+----------+---+-----+----+-------+
|2012-07-08|  8|    7|2012|      3|
|2012-07-09|  9|    7|2012|      3|
|2012-07-10| 10|    7|2012|      3|
|2012-07-11| 11|    7|2012|      3|
|2012-07-12| 12|    7|2012|      3|
|2012-07-13| 13|    7|2012|      3|
|2012-07-14| 14|    7|2012|      3|
|2012-07-15| 15|    7|2012|      3|
|2012-07-16| 16|    7|2012|      3|
|2012-07-17| 17|    7|2012|      3|
|2012-07-18| 18|    7|2012|      3|
|2012-07-19| 19|    7|2012|      3|
|2012-07-20| 20|    7|2012|      3|
|2012-07-21| 21|    7|2012|      3|
|2012-07-22| 22|    7|2012|      3|
|2012-07-23| 23|    7|2012|      3|
|2012-07-24| 24|    7|2012|      3|
|2012-07-25| 25|    7|2012|      3|
|2012-07-26| 26|    7|2012|      3|
|2012-07-27| 27|    7|2012|      3|
+----------+---+-----+----+-------+
only showing top 20 rows



In [14]:
delta_table_name = "dim_time"
dim_time_df.write.format("delta") \
    .option("mergeSchema", "true") \
    .mode("overwrite") \
    .saveAsTable(delta_table_name)

print(f"DataFrame has been written to Delta table: {delta_table_name}")

StatementMeta(, 4421fae3-0ef6-4142-9ce3-fc95807fe86a, 16, Finished, Available)

DataFrame has been written to Delta table: dim_time
