In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, dayofmonth, month, year, quarter, substring_index, split, when, concat_ws, lit,round
from pyspark.sql.types import DateType
from datetime import datetime, timedelta
import os

In [2]:
# Initialize Spark session
spark = SparkSession\
    .builder\
    .appName("test")\
    .master("yarn")\
    .config("spark.submit.deployMode", "client")\
    .config("spark.executor.instances", "2")\
    .config("spark.executor.memory", "2g")\
    .config("spark.executor.cores", "2")\
    .config("spark.driver.memory", "2g")\
    .config("spark.eventLog.logBlockUpdates.enabled", True)\
    .enableHiveSupport()\
    .getOrCreate()

In [3]:
# Get the current time and subtract one hour
current_time = datetime.now()
previous_hour_time = current_time - timedelta(hours=1)

# Format the previous hour's directory path
base_path = "/retail_data/"
year = previous_hour_time.strftime("%Y")
month = previous_hour_time.strftime("%m")
day = previous_hour_time.strftime("%d")
hour = previous_hour_time.strftime("%H")

previous_hour_path = f"{base_path}year={year}/month={month}/day={day}/hour={hour}"

# Define the filename patterns with wildcard for batch number
branches_pattern = os.path.join(previous_hour_path, "branches*.csv")
sales_agents_pattern = os.path.join(previous_hour_path, "sales_agents*.csv")
sales_transactions_pattern = os.path.join(previous_hour_path, "sales_transactions*.csv")



In [4]:
# Read branches CSV files into DataFrame

branches_Dim = spark.read.csv(branches_pattern, header=True, inferSchema=True)

# Change the data type of the establish_date column to DateType
branches_Dim = branches_Dim.withColumn("establish_date", col("establish_date").cast(DateType()))
branches_Dim.show(10)


+---------+-----------+--------------+-----+
|branch_id|   location|establish_date|class|
+---------+-----------+--------------+-----+
|        1|   New York|    2017-01-15|    A|
|        2|Los Angeles|    2016-07-28|    B|
|        3|    Chicago|    2015-03-10|    A|
|        4|    Houston|    2016-11-05|    D|
|        5|    Phoenix|    2017-09-20|    C|
+---------+-----------+--------------+-----+



In [5]:
# Read sales agents CSV files into DataFrame

sales_agents_Dim = spark.read.csv(sales_agents_pattern, header=True, inferSchema=True)

# Change the data type of the value column to DateType
sales_agents_Dim = sales_agents_Dim.withColumn("hire_date", col("hire_date").cast(DateType()))
sales_agents_Dim.show(20)




+---------------+------------------+----------+
|sales_person_id|              name| hire_date|
+---------------+------------------+----------+
|              1|          John Doe|2020-06-03|
|              2|        Jane Smith|2018-05-13|
|              3|   Michael Johnson|2021-10-03|
|              4|       Emily Brown|2020-10-25|
|              5|      David Wilson|2021-04-08|
|              6|       Emma Taylor|2019-03-28|
|              7|Christopher Miller|2020-01-11|
|              8|      Olivia Davis|2021-10-24|
|              9|   Daniel Martinez|2018-10-08|
|             10|      Sophia Moore|2019-05-25|
+---------------+------------------+----------+



In [6]:
# Read sales transactions CSV files into DataFrame
sales_transactions = spark.read.csv(sales_transactions_pattern, header=True, inferSchema=True)

sales_transactions.show(5,truncate=False)

In [10]:
#Rename some columns
sales_transactions = sales_transactions.withColumnRenamed("cusomter_lname", "customer_lname") \
                                       .withColumnRenamed("cusomter_email", "customer_email")


In [11]:
#Cleaning Email column
sales_transactions = sales_transactions.withColumn("customer_email", 
                                                   substring_index("customer_email", ".com", 1))


In [14]:
# Show the DataFrame to verify changes
sales_transactions.select("customer_email").show(5,truncate=False)

+----------------------+
|customer_email        |
+----------------------+
|alexander.brown@gmail |
|william.brown@gmail   |
|john.williams@gmail   |
|alexander.miller@yahoo|
|john.brown@hotmail    |
+----------------------+
only showing top 5 rows



In [12]:
# Selecting columns for customer_Dim DataFrame
customer_Dim = sales_transactions.select(
    col('customer_id'),
    col('customer_fname'),
    col('customer_lname'),
    col('customer_email')
).dropDuplicates()


In [16]:
# Show the customer_Dim DataFrame
customer_Dim.show(truncate=False)

+-----------+--------------+--------------+--------------------------+
|customer_id|customer_fname|customer_lname|customer_email            |
+-----------+--------------+--------------+--------------------------+
|85503      |Michael       |Jones         |michael.jones@yahoo       |
|85497      |William       |Wilson        |william.wilson@yahoo      |
|85506      |Ava           |Miller        |ava.miller@yahoo          |
|85546      |Alexander     |Johnson       |alexander.johnson@gmail   |
|85486      |James         |Johnson       |james.johnson@yahoo       |
|85498      |Olivia        |Davis         |olivia.davis@hotmail      |
|85487      |Mia           |Davis         |mia.davis@outlook         |
|85561      |Alexander     |Moore         |alexander.moore@yahoo     |
|85522      |John          |Brown         |john.brown@hotmail        |
|85496      |James         |Davis         |james.davis@yahoo         |
|85511      |Ava           |Moore         |ava.moore@gmail           |
|85532

In [7]:
#Handle Location_Dim

# Split the 'shipping_address' column
split_col = split(sales_transactions['shipping_address'], '/')

# Create 'city' and 'state' columns
df = sales_transactions.withColumn('city', split_col.getItem(1)) \
                       .withColumn('state', split_col.getItem(2))

# Select columns for location_Dim DataFrame
df = df.select('city', 'state')

# Drop rows with null values
df = df.na.drop()

# Drop duplicate rows
df = df.dropDuplicates()

# Create 'location_id' in the format 'city-state'
df = df.withColumn("location_id", concat_ws("-", col("city"), col("state")))

# Selecting columns for location_Dim DataFrame
location_Dim = df.select(
    col('location_id'),
    col('city'),
    col('state')
)

# Show the DataFrame
location_Dim.show(10, truncate=False)



+---------------+------------+-----+
|location_id    |city        |state|
+---------------+------------+-----+
|Mesa-AZ        |Mesa        |AZ   |
|Kenai-AK       |Kenai       |AK   |
|Arvada-CO      |Arvada      |CO   |
|Franklin-VT    |Franklin    |VT   |
|Burlington-VT  |Burlington  |VT   |
|Fayetteville-AR|Fayetteville|AR   |
|Washington-DC  |Washington  |DC   |
|Waddell-AZ     |Waddell     |AZ   |
|Hartford-VT    |Hartford    |VT   |
|Underhill-VT   |Underhill   |VT   |
+---------------+------------+-----+
only showing top 10 rows



In [8]:
#Handle Payment_Dim

# Select columns for Payment_Dim DataFrame
df = sales_transactions.select('is_online', 'payment_method')

# Drop rows with null values
df = df.na.drop()

# Drop duplicate rows
df = df.dropDuplicates()

# Create 'payment_id' in the format 'isonline-payment'
df = df.withColumn("payment_id", concat_ws("-", col("is_online"), col("payment_method")))

# Selecting columns for location_Dim DataFrame
payment_Dim = df.select(
    col('payment_id'),
    col('is_online'),
    col('payment_method')
)

# Show the DataFrame
payment_Dim.show(10, truncate=False)


+---------------+---------+--------------+
|payment_id     |is_online|payment_method|
+---------------+---------+--------------+
|yes-Credit Card|yes      |Credit Card   |
|no-Cash        |no       |Cash          |
|no-Credit Card |no       |Credit Card   |
|yes-Stripe     |yes      |Stripe        |
|yes-PayPal     |yes      |PayPal        |
+---------------+---------+--------------+



In [None]:
# Selecting columns for product_Dim DataFrame
product_Dim = sales_transactions.select("product_id", "product_name", "product_category").dropDuplicates()

In [None]:
product_Dim.show(5)

+----------+------------+----------------+
|product_id|product_name|product_category|
+----------+------------+----------------+
|        23|     Toaster|      Appliances|
|         9|       Boots|        Footwear|
|        14|      Camera|     Electronics|
|         6|       Jeans|        Clothing|
|         2|  Smartphone|     Electronics|
+----------+------------+----------------+
only showing top 5 rows



In [9]:
#Handle offer_Dim
df = sales_transactions.select('offer_1', 'offer_2','offer_3','offer_4','offer_5')

# Unpivot (melt) the DataFrame
unpivot_expr = "stack(5, 'offer_1', offer_1, 'offer_2', offer_2, 'offer_3', offer_3, 'offer_4', offer_4, 'offer_5', offer_5) as (offer_name, offer_value)"
unpivoted_df = df.selectExpr(unpivot_expr)

# Filter out rows where offer_value is null
unpivoted_df = unpivoted_df.filter(col("offer_value").isNotNull())

# Assign percentage based on the offer name
unpivoted_df = unpivoted_df.withColumn(
    "percentage",
    when(col("offer_name") == "offer_1", 5)
    .when(col("offer_name") == "offer_2", 10)
    .when(col("offer_name") == "offer_3", 15)
    .when(col("offer_name") == "offer_4", 20)
    .when(col("offer_name") == "offer_5", 25)
)

# Create offer_id by concatenating offer_name and percentage
unpivoted_df = unpivoted_df.withColumn(
    "offer_id",
    concat_ws("-",  col("percentage"),col("offer_name"))
)

# Select the required columns for offer_Dim DataFrame
offer_Dim = unpivoted_df.select(
    col('offer_id'),
    col('offer_name').alias('offer_name'),
    col('percentage')
).distinct()

# Show the offer_Dim DataFrame
offer_Dim.show(10,truncate=False)



+----------+----------+----------+
|offer_id  |offer_name|percentage|
+----------+----------+----------+
|25-offer_5|offer_5   |25        |
|15-offer_3|offer_3   |15        |
|5-offer_1 |offer_1   |5         |
|20-offer_4|offer_4   |20        |
|10-offer_2|offer_2   |10        |
+----------+----------+----------+



In [10]:
#Handle Date Dimension

# Check if the retail_dwh path exists in HDFS to determine initial or incremental loading
hdfs_path = "/retail_dwh/date_Dim"

if not os.system(f"hdfs dfs -test -d {hdfs_path}") == 0:
    
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, date_format, dayofmonth, month, year, quarter, substring_index, split, when, concat_ws, lit,round
    from pyspark.sql.types import DateType
    from datetime import datetime, timedelta
    import os

    # Define the start and end dates
    start_date = datetime(2020, 1, 1)
    end_date = datetime(2030, 12, 31)

    # Create a list of dates between start_date and end_date
    date_list = [(start_date + timedelta(days=x)) for x in range((end_date - start_date).days + 1)]

    # Convert the list of dates to a DataFrame
    date_df = spark.createDataFrame([(date,) for date in date_list], ["value"])

    # Add a date_id column formatted as 'yyyyMMdd'
    date_Dim = date_df.withColumn(
        "date_id",
        date_format(col("value"), "yyyyMMdd")
    )

    # Add day, month, year, quarter, and day_name columns
    date_Dim = date_Dim.withColumn("day", dayofmonth(col("value"))) \
                       .withColumn("month", month(col("value"))) \
                       .withColumn("year", year(col("value"))) \
                       .withColumn("quarter", quarter(col("value"))) \
                       .withColumn("day_name", date_format(col("value"), "EEEE"))

    # Change the data type of the value column to DateType
    date_Dim = date_Dim.withColumn("value", col("value").cast(DateType()))
    
    date_Dim.show()
    
else:
    print("Skipping the creation of the date dimension DataFrame.")



+----------+--------+---+-----+----+-------+---------+
|     value| date_id|day|month|year|quarter| day_name|
+----------+--------+---+-----+----+-------+---------+
|2020-01-01|20200101|  1|    1|2020|      1|Wednesday|
|2020-01-02|20200102|  2|    1|2020|      1| Thursday|
|2020-01-03|20200103|  3|    1|2020|      1|   Friday|
|2020-01-04|20200104|  4|    1|2020|      1| Saturday|
|2020-01-05|20200105|  5|    1|2020|      1|   Sunday|
|2020-01-06|20200106|  6|    1|2020|      1|   Monday|
|2020-01-07|20200107|  7|    1|2020|      1|  Tuesday|
|2020-01-08|20200108|  8|    1|2020|      1|Wednesday|
|2020-01-09|20200109|  9|    1|2020|      1| Thursday|
|2020-01-10|20200110| 10|    1|2020|      1|   Friday|
|2020-01-11|20200111| 11|    1|2020|      1| Saturday|
|2020-01-12|20200112| 12|    1|2020|      1|   Sunday|
|2020-01-13|20200113| 13|    1|2020|      1|   Monday|
|2020-01-14|20200114| 14|    1|2020|      1|  Tuesday|
|2020-01-15|20200115| 15|    1|2020|      1|Wednesday|
|2020-01-1

In [11]:
#Handle transaction Fact table

#extract the needed columns
df = sales_transactions.drop("customer_fname", "customer_lname","customer_email","product_name","product_category")

#change transaction_date data type to date
df = df.withColumn("transaction_date", col("transaction_date").cast("date"))

#add transaction_date_id
df = df.withColumn(
    "transaction_date_id",
    date_format(df["transaction_date"], "yyyyMMdd")
)

#add location_id

split_col = split(sales_transactions['shipping_address'], '/')

df = df.withColumn('city', split_col.getItem(1)) \
                       .withColumn('state', split_col.getItem(2))

df = df.withColumn("location_id", concat_ws("-", col("city"), col("state")))

#add payment_id
df = df.withColumn("payment_id", concat_ws("-", col("is_online"), col("payment_method")))

#add offer_id

offer_columns = ['offer_1', 'offer_2', 'offer_3', 'offer_4', 'offer_5']

df = df.withColumn('offer_name',
                   concat_ws(',',
                             *[when(col(col_name), lit(col_name)).otherwise(None) for col_name in offer_columns]
                             )
                  )

df = df.withColumn(
    "percentage",
    when(col("offer_name") == "offer_1", 5)
    .when(col("offer_name") == "offer_2", 10)
    .when(col("offer_name") == "offer_3", 15)
    .when(col("offer_name") == "offer_4", 20)
    .when(col("offer_name") == "offer_5", 25)
    .otherwise(0) 
)


df = df.withColumn(
    "offer_id",
    when(col("percentage") != 0, concat_ws("-", col("percentage"), col("offer_name")))
    .otherwise(None)  
)

#add total_price
df = df.withColumn('total_price', round(col('units') * col('unit_price') * (1 - col('percentage') / 100), 3))

sales_transactions_Fact = df.select(
    col('transaction_id'),
    col('transaction_date_id'),
    col('customer_id'),
    col('sales_agent_id'),
    col('branch_id'),
    col('product_id'),
    col('location_id'),
    col('payment_id'),
    col('offer_id'),
    col('units'),
    col('unit_price'),
    col('total_price'))

sales_transactions_Fact


transaction_id,transaction_date_id,customer_id,sales_agent_id,branch_id,product_id,location_id,payment_id,offer_id,units,unit_price,total_price
trx-152546429674,20230520,85469,1,2,22,,no-Cash,,10,79.99,799.9
trx-291375327542,20221025,85512,3,1,24,,no-Cash,20-offer_4,5,49.99,199.96
trx-312507679871,20220205,85484,10,3,4,,no-Credit Card,,1,99.99,99.99
trx-193384855491,20231020,85528,7,2,25,,no-Cash,,8,499.99,3999.92
trx-831626097654,20221117,85500,5,1,14,,no-Cash,15-offer_3,10,399.99,3399.915
trx-158496122054,20220927,85545,4,5,14,,no-Credit Card,25-offer_5,6,399.99,1799.955
trx-722817999024,20220421,85561,4,1,30,,no-Credit Card,20-offer_4,6,24.99,119.952
trx-813287633702,20230428,85520,1,1,26,,no-Cash,,4,199.99,799.96
trx-219568257432,20230308,85488,6,2,18,,no-Credit Card,,10,149.99,1499.9
trx-352160720823,20230617,85466,5,2,16,,no-Cash,,8,39.99,319.92


In [13]:
#Create retail database

spark.sql("CREATE DATABASE IF NOT EXISTS retail")

In [14]:
# Writing the sales_transactions_Fact into Hive

from pyspark.sql import functions as F

sales_transactions_Fact = sales_transactions_Fact.withColumn("year", F.substring("transaction_date_id", 1, 4))
sales_transactions_Fact = sales_transactions_Fact.withColumn("month", F.substring("transaction_date_id", 5, 2))
sales_transactions_Fact = sales_transactions_Fact.withColumn("day", F.substring("transaction_date_id", 7, 2))

# Check if the retail_dwh path exists in HDFS to determine initial or incremental loading
hdfs_path = "/retail_dwh/sales_transactions_Fact"

if not os.system(f"hdfs dfs -test -d {hdfs_path}") == 0:

    # Writing the DataFrame with partitioning
    table_name = "retail.sales_transactions_Fact"
    sales_transactions_Fact.coalesce(1).write.mode("overwrite") \
      .format("parquet") \
      .partitionBy("year", "month", "day") \
      .option("path", "/retail_dwh/sales_transactions_Fact") \
      .saveAsTable(table_name)
    

else:

    # Writing the DataFrame with partitioning
    table_name = "retail.sales_transactions_Fact"
    sales_transactions_Fact.coalesce(1).write.mode("append") \
      .format("parquet") \
      .partitionBy("year", "month", "day") \
      .option("path", "/retail_dwh/sales_transactions_Fact") \
      .saveAsTable(table_name)
    

In [None]:
# Writing the branches_Dim into Hive

# Check if the retail_dwh path exists in HDFS to determine initial or incremental loading

hdfs_path = "/retail_dwh/branches_Dim"

if not os.system(f"hdfs dfs -test -d {hdfs_path}") == 0:

    table_name = "retail.branches_Dim"
    branches_Dim.write.mode("overwrite") \
      .format("parquet") \
      .option("path", "/retail_dwh/branches_Dim") \
      .saveAsTable(table_name)

else:
    #apply incremental Loading
    branches_Dim_i=spark.sql("select * from retail.branches_Dim")
    #Finding the new records only 
    branches_Dim_n = branches_Dim.join(branches_Dim_i, ['branch_id'], 'left_anti')
   
    table_name = "retail.branches_Dim"
    branches_Dim_n.write.mode("append") \
      .format("parquet") \
      .option("path", "/retail_dwh/branches_Dim") \
      .saveAsTable(table_name)
    
    
    
    

In [118]:
# Writing the sales_agents_Dim into Hive

hdfs_path = "/retail_dwh/sales_agents_Dim"

if not os.system(f"hdfs dfs -test -d {hdfs_path}") == 0:
    #apply incremental Loading
    table_name = "retail.sales_agents_Dim"
    sales_agents_Dim.write.mode("overwrite") \
      .format("parquet") \
      .option("path", "/retail_dwh/sales_agents_Dim") \
      .saveAsTable(table_name)
else:
    #apply incremental Loading
    sales_agents_Dim_i=spark.sql("select * from retail.sales_agents_Dim")
    #Finding the new records only 
    sales_agents_Dim_n = sales_agents_Dim.join(sales_agents_Dim_i,['sales_person_id'], 'left_anti')

    table_name = "retail.sales_agents_Dim"
    sales_agents_Dim_n.write.mode("append") \
      .format("parquet") \
      .option("path", "/retail_dwh/sales_agents_Dim") \
      .saveAsTable(table_name)
    

In [19]:
hdfs_path = "/retail_dwh/customer_Dim"

# Check if the HDFS path exists
if not os.system(f"hdfs dfs -test -d {hdfs_path}") == 0:
    table_name = "retail.customer_Dim"
    
    
    # Write the DataFrame to Hive as a single file
    customer_Dim.coalesce(1).write.mode("overwrite") \
        .format("parquet") \
        .option("path", "/retail_dwh/customer_Dim") \
        .saveAsTable(table_name)
else:
    #apply incremental Loading
    customer_Dim_i=spark.sql("select * from retail.customer_Dim")
    #Finding the new records only 
    customer_Dim_n = customer_Dim.join(customer_Dim_i,['customer_id'], 'left_anti')
    
    table_name = "retail.customer_Dim"
    customer_Dim_n.write.mode("append") \
      .format("parquet") \
      .option("path", "/retail_dwh/customer_Dim") \
      .saveAsTable(table_name)

In [136]:
# Writing the location_Dim into Hive

hdfs_path = "/retail_dwh/location_Dim"

if not os.system(f"hdfs dfs -test -d {hdfs_path}") == 0:
    
    table_name = "retail.location_Dim"
    location_Dim.write.mode("overwrite") \
      .format("parquet") \
      .option("path", "/retail_dwh/location_Dim") \
      .saveAsTable(table_name)
else:
    #apply incremental Loading
    location_Dim_i=spark.sql("select * from retail.location_Dim")
    #Finding the new records only 
    location_Dim_n = location_Dim.join(location_Dim_i,['location_id'], 'left_anti')
    
    table_name = "retail.location_Dim"
    location_Dim_n.write.mode("append") \
      .format("parquet") \
      .option("path", "/retail_dwh/location_Dim") \
      .saveAsTable(table_name)
    

In [139]:
# Writing the payment_Dim into Hive

hdfs_path = "/retail_dwh/payment_Dim"

if not os.system(f"hdfs dfs -test -d {hdfs_path}") == 0:
    
    table_name = "retail.payment_Dim"
    payment_Dim.write.mode("overwrite") \
      .format("parquet") \
      .option("path", "/retail_dwh/payment_Dim") \
      .saveAsTable(table_name)
else:
    #apply incremental Loading
    payment_Dim_i=spark.sql("select * from retail.payment_Dim")
    #Finding the new records only 
    payment_Dim_n = payment_Dim.join(payment_Dim_i,['payment_id'], 'left_anti')
    
    table_name = "retail.payment_Dim"
    payment_Dim_n.write.mode("append") \
      .format("parquet") \
      .option("path", "/retail_dwh/payment_Dim") \
      .saveAsTable(table_name)
    

In [146]:
# Writing the product_Dim into Hive

hdfs_path = "/retail_dwh/product_Dim"

if not os.system(f"hdfs dfs -test -d {hdfs_path}") == 0:
    
    table_name = "retail.product_Dim"
    product_Dim.write.mode("overwrite") \
      .format("parquet") \
      .option("path", "/retail_dwh/product_Dim") \
      .saveAsTable(table_name)
else:
    product_Dim_i=spark.sql("select * from retail.product_Dim")
    #Finding the new records only 
    product_Dim_n = product_Dim.join(product_Dim_i,['product_id'], 'left_anti')
    
    table_name = "retail.product_Dim"
    product_Dim_n.write.mode("append") \
      .format("parquet") \
      .option("path", "/retail_dwh/product_Dim") \
      .saveAsTable(table_name)
    

In [148]:
# Writing the offer_Dim into Hive

hdfs_path = "/retail_dwh/offer_Dim"

if not os.system(f"hdfs dfs -test -d {hdfs_path}") == 0:
    
    table_name = "retail.offer_Dim"
    offer_Dim.write.mode("overwrite") \
      .format("parquet") \
      .option("path", "/retail_dwh/offer_Dim") \
      .saveAsTable(table_name)
else:
    offer_Dim_i=spark.sql("select * from retail.offer_Dim")
    #Finding the new records only 
    offer_Dim_n = offer_Dim.join(offer_Dim_i,['offer_id'], 'left_anti')
    
    table_name = "retail.offer_Dim"
    offer_Dim_n.write.mode("append") \
      .format("parquet") \
      .option("path", "/retail_dwh/offer_Dim") \
      .saveAsTable(table_name)
    
    

In [None]:
# Writing the date_Dim into Hive

hdfs_path = "/retail_dwh/date_Dim"

if not os.system(f"hdfs dfs -test -d {hdfs_path}") == 0:

    table_name = "retail.date_Dim"
    date_Dim.write.coalesce(1).mode("overwrite") \
      .format("parquet") \
      .option("path", "/retail_dwh/date_Dim") \
      .saveAsTable(table_name)
else:
    print("Skipping the writing of the date dimension.")

In [15]:
# Stop the Spark session
spark.stop()