In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, dayofmonth, dayofweek, month, quarter, year

# Initialize a Spark session
spark = SparkSession.builder.appName("TransactionProcessing").getOrCreate()

In [2]:
spark

In [5]:
# Load the transaction data into a Spark DataFrame
transaction_df =spark.read.csv('data/raw/historical_transactions.csv', inferSchema=True, header=True)
transaction_df.show(5)

+-------+-----------+----------+--------+-----------+--------------+-------------------+
|sale_id|customer_id|product_id|quantity|amount_paid|payment_method|          timestamp|
+-------+-----------+----------+--------+-----------+--------------+-------------------+
|   1392|       1451|         5|       1|      43.99|          Card|2023-01-01 17:12:48|
|   1373|       1632|        14|       2|     199.98|          Card|2023-01-01 18:40:55|
|   1145|       1848|        19|       1|      39.99|          Cash|2023-01-01 20:50:48|
|   1225|       1906|        18|       3|     119.97|          Cash|2023-01-02 01:10:09|
|   1430|        393|         8|       3|      44.97|          Card|2023-01-04 12:36:00|
+-------+-----------+----------+--------+-----------+--------------+-------------------+
only showing top 5 rows



In [6]:
transaction_df.printSchema()

root
 |-- sale_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- amount_paid: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [7]:
# Convert the 'timestamp' column to datetime and extract the date component
transaction_df = transaction_df.withColumn('sales_date', col('timestamp').cast('date'))

from pyspark.sql.functions import lit

# Add a date_id column to the transaction DataFrame with constant values (None)
transaction_df = transaction_df.withColumn('date_id', lit(None).cast('integer'))

transaction_df.show(5)



+-------+-----------+----------+--------+-----------+--------------+-------------------+----------+-------+
|sale_id|customer_id|product_id|quantity|amount_paid|payment_method|          timestamp|sales_date|date_id|
+-------+-----------+----------+--------+-----------+--------------+-------------------+----------+-------+
|   1392|       1451|         5|       1|      43.99|          Card|2023-01-01 17:12:48|2023-01-01|   null|
|   1373|       1632|        14|       2|     199.98|          Card|2023-01-01 18:40:55|2023-01-01|   null|
|   1145|       1848|        19|       1|      39.99|          Cash|2023-01-01 20:50:48|2023-01-01|   null|
|   1225|       1906|        18|       3|     119.97|          Cash|2023-01-02 01:10:09|2023-01-02|   null|
|   1430|        393|         8|       3|      44.97|          Card|2023-01-04 12:36:00|2023-01-04|   null|
+-------+-----------+----------+--------+-----------+--------------+-------------------+----------+-------+
only showing top 5 rows



In [8]:
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Generate unique date IDs for each row in the DataFrame
# We create a window specification to order rows by the 'sales_date' column, ensuring chronological order.
window_spec = Window.orderBy(F.col("sales_date"))

# Using the window specification, we apply the dense_rank() function to assign a unique 'date_id' to each row.
# The 'date_id' represents the sequential order of sales dates, allowing for time-based analysis.
transaction_df = transaction_df.withColumn("date_id", F.dense_rank().over(window_spec))


# Show the DataFrame
transaction_df.show(5)



+-------+-----------+----------+--------+-----------+--------------+-------------------+----------+-------+
|sale_id|customer_id|product_id|quantity|amount_paid|payment_method|          timestamp|sales_date|date_id|
+-------+-----------+----------+--------+-----------+--------------+-------------------+----------+-------+
|   1392|       1451|         5|       1|      43.99|          Card|2023-01-01 17:12:48|2023-01-01|      1|
|   1373|       1632|        14|       2|     199.98|          Card|2023-01-01 18:40:55|2023-01-01|      1|
|   1145|       1848|        19|       1|      39.99|          Cash|2023-01-01 20:50:48|2023-01-01|      1|
|   1225|       1906|        18|       3|     119.97|          Cash|2023-01-02 01:10:09|2023-01-02|      2|
|   1430|        393|         8|       3|      44.97|          Card|2023-01-04 12:36:00|2023-01-04|      3|
+-------+-----------+----------+--------+-----------+--------------+-------------------+----------+-------+
only showing top 5 rows



In [9]:
transaction_df.printSchema()

root
 |-- sale_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- amount_paid: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- sales_date: date (nullable = true)
 |-- date_id: integer (nullable = false)



In [10]:

# Extract additional date components
transaction_df = transaction_df.withColumn('weekday', dayofweek(col('timestamp')))
transaction_df = transaction_df.withColumn('month_name', date_format(col('timestamp'), 'MMMM'))
transaction_df = transaction_df.withColumn('day', dayofmonth(col('timestamp')))
transaction_df = transaction_df.withColumn('month', month(col('timestamp')))
transaction_df = transaction_df.withColumn('quarter', quarter(col('timestamp')))
transaction_df = transaction_df.withColumn('year', year(col('timestamp')))
transaction_df = transaction_df.withColumn('weekday_name', date_format(col('timestamp'), 'EEEE'))


In [11]:
from pyspark.sql.functions import when, col

# Use the 'when' function to create the 'quarter_name' column
transaction_df = transaction_df.withColumn('quarter_name',
    when(col('quarter') == 1, 'Q1')
    .when(col('quarter') == 2, 'Q2')
    .when(col('quarter') == 3, 'Q3')
    .when(col('quarter') == 4, 'Q4')
    .otherwise(None)
)

# Show the DataFrame
transaction_df.show(5)



+-------+-----------+----------+--------+-----------+--------------+-------------------+----------+-------+-------+----------+---+-----+-------+----+------------+------------+
|sale_id|customer_id|product_id|quantity|amount_paid|payment_method|          timestamp|sales_date|date_id|weekday|month_name|day|month|quarter|year|weekday_name|quarter_name|
+-------+-----------+----------+--------+-----------+--------------+-------------------+----------+-------+-------+----------+---+-----+-------+----+------------+------------+
|   1392|       1451|         5|       1|      43.99|          Card|2023-01-01 17:12:48|2023-01-01|      1|      1|   January|  1|    1|      1|2023|      Sunday|          Q1|
|   1373|       1632|        14|       2|     199.98|          Card|2023-01-01 18:40:55|2023-01-01|      1|      1|   January|  1|    1|      1|2023|      Sunday|          Q1|
|   1145|       1848|        19|       1|      39.99|          Cash|2023-01-01 20:50:48|2023-01-01|      1|      1|   Ja

In [12]:
# Select columns for the dimension table
date_columns = ['date_id', 'sales_date', 'year', 'quarter', 'quarter_name', 'month', 'month_name', 'day', 'weekday', 'weekday_name']
raw_dim_date = transaction_df.select(date_columns)

In [13]:
# Drop duplicate rows based on the 'date_id' column
dim_date = raw_dim_date.dropDuplicates(['date_id'])



In [14]:
dim_date.show(5)

+-------+----------+----+-------+------------+-----+----------+---+-------+------------+
|date_id|sales_date|year|quarter|quarter_name|month|month_name|day|weekday|weekday_name|
+-------+----------+----+-------+------------+-----+----------+---+-------+------------+
|      1|2023-01-01|2023|      1|          Q1|    1|   January|  1|      1|      Sunday|
|      2|2023-01-02|2023|      1|          Q1|    1|   January|  2|      2|      Monday|
|      3|2023-01-04|2023|      1|          Q1|    1|   January|  4|      4|   Wednesday|
|      4|2023-01-05|2023|      1|          Q1|    1|   January|  5|      5|    Thursday|
|      5|2023-01-07|2023|      1|          Q1|    1|   January|  7|      7|    Saturday|
+-------+----------+----+-------+------------+-----+----------+---+-------+------------+
only showing top 5 rows



In [15]:
# Select columns for the FactSales table
fact_sales = transaction_df.select(['sale_id', 'customer_id', 'product_id', 'quantity', 'amount_paid', 'payment_method', 'date_id'])
fact_sales.show()

+-------+-----------+----------+--------+-----------+--------------+-------+
|sale_id|customer_id|product_id|quantity|amount_paid|payment_method|date_id|
+-------+-----------+----------+--------+-----------+--------------+-------+
|   1392|       1451|         5|       1|      43.99|          Card|      1|
|   1373|       1632|        14|       2|     199.98|          Card|      1|
|   1145|       1848|        19|       1|      39.99|          Cash|      1|
|   1225|       1906|        18|       3|     119.97|          Cash|      2|
|   1430|        393|         8|       3|      44.97|          Card|      3|
|   1421|       1154|         4|       1|     599.99|          Cash|      3|
|   1339|         72|         4|       1|     599.99|          Cash|      4|
|   1401|        310|        16|       4|     279.96|          Cash|      4|
|   1108|       1697|         2|       1|     599.99|          Card|      4|
|   1311|       1063|        24|       1|     129.99|          Card|      4|

In [None]:
# Stop the Spark session
spark.stop()