In [1]:
 import re
from pyspark.sql import SparkSession
from functools import reduce
from pyspark.sql.functions import col, regexp_replace, trim,when ,monotonically_increasing_id,lit,year, month, dayofmonth, weekofyear, dayofweek, date_format,floor,dense_rank,\
substring,concat,split, row_number,lpad,lit, current_date
from pyspark.sql.window import Window
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from datetime import date, datetime, timedelta
import subprocess
from py4j.java_gateway import java_import
import os
import sys
from pyspark.sql.types import DateType
from pyspark.sql import functions as F

In [2]:
# Create (or reuse) the Hive-enabled Spark session for the gold-layer job,
# running locally on 4 cores.
spark = (
    SparkSession.builder
    .master("local[4]")
    .appName("Gold_layer_transformation")
    .config("spark.eventLog.logBlockUpdates.enabled", True)
    .enableHiveSupport()
    .getOrCreate()
)

# SparkContext handle of the session above.
sc = spark.sparkContext

In [3]:
 # Session-level Hive settings. The later INSERT ... PARTITION (col) statements
 # name partition columns without values, i.e. they rely on dynamic partitioning.
 spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
 # NOTE(review): presumably needed because some target tables are bucketed;
 # the table DDL is not visible in this notebook — confirm.
 spark.sql("set hive.enforce.bucketing=true")

key,value
hive.enforce.buck...,True


In [4]:
# Timestamp of this run; the silver-layer input is addressed by date and hour.
now = datetime.now()
date_str, hour_str = now.strftime("%Y%m%d %H").split(" ")

# Cleaned sales-transactions batch for the current date/hour.
path = (
    f"hdfs:///data/retail_silver/{date_str}/{hour_str}/"
    f"sales_transactions_SS_cleaned_{date_str}_{hour_str}.parquet"
)

In [5]:
# Load this hour's silver-layer batch into input_df; abort the run when the
# batch is missing or cannot be read.
try:
    # Probe HDFS with an argument list instead of os.system so the path is
    # never interpreted by a shell.
    path_probe = subprocess.run(["hdfs", "dfs", "-test", "-e", path])
    if path_probe.returncode != 0:
        # SystemExit is not an Exception subclass, so it is not caught by the
        # handler below and terminates the run with this message.
        raise SystemExit(f"Path does not exist: {path}")
    input_df = spark.read.parquet(path)
except Exception as e:
    print(f"An error occurred: {e}")
    raise SystemExit(1)  # Exit with code 1 for other errors

In [6]:
# #function_to_rename_in_hdfs
# def rename_in_hdfs(golden_layer_path,file_extension,name):
#     # Run the Hadoop fs -ls command to list files
#     list_files_process = subprocess.run(["hadoop", "fs", "-ls", golden_layer_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

#     # Check for errors
#     if list_files_process.returncode != 0:
#         print(f"Error listing files in {golden_layer_path}: {list_files_process.stderr.decode()}")
#         exit(1)

#     # Decode stdout to string format and split lines
#     stdout_str = list_files_process.stdout.decode()
#     file_list = stdout_str.splitlines()

#     # Find the file to rename based on criteria
#     file_to_rename = None
#     for line in file_list:
#         if line.endswith(file_extension):
#             file_to_rename = line.split()[-1].strip()
#             break

#     # Check if a file matching the criteria was found
#     if file_to_rename:
#         new_filename = f"{golden_layer_path}/{name}{file_extension}"

#         # Move (rename) the file
#         subprocess.run(["hadoop", "fs", "-mv", file_to_rename, new_filename])

#         print(f"File moved and renamed to: {new_filename}")
#     else:
#         print("File matching the criteria not found.")

Function to check if a file exists in HDFS

In [7]:
def check_if_exists(path):
    """Return True when `path` exists on the Hadoop file system it belongs to.

    Args:
        path: Path or fully qualified URI (e.g. ``hdfs://host:port/dir``).

    Returns:
        bool: result of ``FileSystem.exists`` for that path.
    """
    hadoop_conf = spark._jsc.hadoopConfiguration()
    hadoop_path = spark._jvm.org.apache.hadoop.fs.Path(path)
    # Resolve the file system from the path itself rather than always using
    # the default one, so a URI with an explicit scheme/authority is checked
    # against the file system it actually names.
    return hadoop_path.getFileSystem(hadoop_conf).exists(hadoop_path)

A function that writes a DataFrame to a Hive external table

In [8]:
def write_df_to_table(database_name, table_name, df, partition_columns=None):
    """Append `df` to the Hive table `database_name.table_name` via a staging table.

    The DataFrame is first saved (overwrite mode) as the table ``temp_table``
    and then copied into the target with ``INSERT INTO``, so rows already in
    the target are kept. The staging table is dropped afterwards whether or
    not the insert succeeded.

    Args:
        database_name: Hive database of the target table.
        table_name: Name of the target table.
        df: DataFrame to write.
        partition_columns: Optional partition column name, or list/tuple of
            names. When given, the staging table is partitioned by them and
            the insert uses a dynamic ``PARTITION (...)`` clause.

    Returns:
        None. A failing insert is reported with ``print`` and not re-raised.
    """
    temp_table = "temp_table"

    # Normalise the partition spec to a list once, so the writer and the SQL
    # clause are always built from the same value.
    if not partition_columns:
        partition_cols = []
    elif isinstance(partition_columns, (list, tuple)):
        partition_cols = list(partition_columns)
    else:
        partition_cols = [partition_columns]

    # Stage the DataFrame, replacing any previous staging data.
    writer = df.write.mode("overwrite")
    if partition_cols:
        writer = writer.partitionBy(*partition_cols)
    writer.saveAsTable(temp_table)

    partition_clause = f"PARTITION ({', '.join(partition_cols)})" if partition_cols else ""
    insert_query = f"""
         INSERT INTO TABLE {database_name}.{table_name}
         {partition_clause}
         SELECT * FROM {temp_table}
    """

    try:
        spark.sql(insert_query)
        print(f"Data written successfully to {database_name}.{table_name}")
    except Exception as e:
        print(f"Error in writing to {table_name}: {e}")
    finally:
        # Drop the staging table through the catalog so both its data and its
        # metastore entry are removed, instead of deleting a hard-coded
        # warehouse directory and leaving a stale table definition behind.
        try:
            spark.sql(f"DROP TABLE IF EXISTS {temp_table}")
        except Exception as cleanup_error:
            print(f"Could not drop staging table {temp_table}: {cleanup_error}")


In [9]:
# Customer dimension: find customer records not yet in the gold layer, assign
# surrogate keys, and publish them to HDFS and to retail_DWH.Customer_Dim.
# Leaves `updated_customers_dim` (surrogate key + attributes) for the fact cells.
#input_df = spark.read.parquet(f"hdfs:///data/retail_silver/20240712/19/sales_transactions_SS_cleaned_20240712_19.parquet")

cust_data = input_df.select('customer_id', 'customer_fname', 'customer_lname', 'customer_email')
golden_layer_path="hdfs://localhost:9000/data/retail_gold/customer_dim"

if check_if_exists(golden_layer_path):
    cust_dim = spark.read.parquet(golden_layer_path)
    existing_cust_dim_without_sk = cust_dim.select('customer_id', 'customer_fname', 'customer_lname', 'customer_email','hashkey')
    # Hash every customer attribute; a record is "new" when its hash is not
    # already present in the stored dimension.
    all_cols = F.concat_ws("", *cust_data.columns)
    new_customers_data = cust_data.withColumn("hashkey", F.md5(all_cols))
    new_customers_data = new_customers_data.dropDuplicates(['hashkey'])
    new_customers_data = new_customers_data.join(existing_cust_dim_without_sk, "hashkey", "left_anti")
    # Get the maximum surrogate key from existing data
    max_sur_key = cust_dim.agg({"customer_sur_key": "max"}).collect()[0][0]

    if not new_customers_data.rdd.isEmpty():
        # show() prints the rows itself and returns None, so it is not wrapped in print().
        new_customers_data.show(100)
        window_spec = Window.orderBy("customer_id")
        # Add surrogate keys to new data starting from max_sur_key + 1
        new_customers_data = new_customers_data.withColumn('customer_sur_key', (row_number().over(window_spec) + max_sur_key).cast("int"))
        customers_dim_sk = new_customers_data.select('customer_sur_key','customer_id', 'customer_fname', 'customer_lname', 'customer_email')
        # Append the new records (with their hash) to the gold layer.
        new_customers_data = new_customers_data.select('customer_sur_key','customer_id', 'customer_fname', 'customer_lname', 'customer_email','hashkey')
        customers_dim = new_customers_data.repartition(1)
        customers_dim.write.mode('append') \
                            .format('parquet') \
                            .save(golden_layer_path)
        # Re-read the dimension so the lookup below also sees the new keys.
        cust_dim = spark.read.parquet(golden_layer_path)
        # De-duplicate the batch first: cust_data has one row per transaction,
        # and a lookup with repeated customers would multiply fact rows when
        # the facts are joined on customer_id.
        batch_customers = cust_data.dropDuplicates()
        updated_customers_dim = batch_customers.join(
            cust_dim,
            on=[batch_customers.customer_id == cust_dim.customer_id,
                batch_customers.customer_email == cust_dim.customer_email],
            how='left')
        updated_customers_dim = updated_customers_dim.select('customer_sur_key', batch_customers.customer_id.alias('customer_id'), batch_customers.customer_fname.alias('customer_fname'), batch_customers.customer_lname.alias('customer_lname'), batch_customers.customer_email.alias('customer_email'))
        # Write only the new records into the customer dimension Hive table.
        write_df_to_table('retail_DWH', 'Customer_Dim', customers_dim_sk)
        print("done")
    else:
        updated_customers_dim = cust_dim.drop("hashkey")
        print("No new Data")
else:
    # First load: one row per customer_id, keyed sequentially from 1.
    cust_data = cust_data.dropDuplicates(['customer_id'])
    all_cols = F.concat_ws("", *cust_data.columns)
    hashed_customer_dim = cust_data.withColumn("hashkey", F.md5(all_cols))
    window_spec = Window.orderBy("customer_id")
    hashed_customer_dim = hashed_customer_dim.withColumn('customer_sur_key', row_number().over(window_spec))
    hashed_customer_dim = hashed_customer_dim.select('customer_sur_key','customer_id', 'customer_fname', 'customer_lname', 'customer_email','hashkey')
    # Dimension without the technical hash column, used for Hive and the fact joins.
    updated_customers_dim = hashed_customer_dim.select('customer_sur_key','customer_id', 'customer_fname', 'customer_lname', 'customer_email')

    # repartition() returns a new DataFrame, so it must be part of the write
    # chain to actually produce a single output file.
    hashed_customer_dim.repartition(1) \
            .write.mode('overwrite') \
            .format('parquet') \
            .save(golden_layer_path)
    updated_customers_dim.show(5)
    #write the new data into the customer dimension hive table
    write_df_to_table('retail_DWH', 'Customer_Dim', updated_customers_dim)

+--------------------+-----------+--------------+--------------+--------------------+
|             hashkey|customer_id|customer_fname|customer_lname|      customer_email|
+--------------------+-----------+--------------+--------------+--------------------+
|8136b9acc2f707b95...|      85558|          Emma|         Moore|emma.moore@hotmai...|
|334e5bbbced897a06...|      85553|          John|         Moore|john.moore@outloo...|
|4a4e7d1d2aa4f663a...|      85488|           Mia|      Williams|mia.williams@hotm...|
|f9db3b38b29049616...|      85538|          Emma|         Davis|emma.davis@gmail.com|
|c39860bcae99ec816...|      85492|       Michael|        Miller|michael.miller@gm...|
|bc53cd9808b3d23cb...|      85470|        Sophia|         Smith|sophia.smith@yaho...|
|5165d5220df6f6360...|      85505|        Sophia|        Taylor|sophia.taylor@yah...|
|c504cf9fb8faf3c35...|      85551|       William|         Davis|william.davis@hot...|
|91bfeb7f7044348bb...|      85487|          Emma|     

In [10]:
# Product dimension: find product records not yet in the gold layer, assign
# sequential surrogate keys, and publish them to HDFS and retail_DWH.Product_Dim.
# Leaves `updated_product_dim` (surrogate key + attributes) for the fact and report cells.
# Select relevant columns for product_dim
product_dim = input_df.select('product_id', 'product_name', 'product_category')

# Drop rows that are exact duplicates on (product_id, product_name, product_category)
product_dim = product_dim.dropDuplicates(['product_id','product_name','product_category'])

# Gold-layer location of the product dimension
golden_layer_path = "hdfs://localhost:9000/data/retail_gold/product_dim/"

if check_if_exists(golden_layer_path):
    # Hash each incoming record and keep only those whose hash is absent from the stored dimension
    existing_product_dim = spark.read.parquet(golden_layer_path)
    existing_product_dim = existing_product_dim.withColumn("product_sur_key", col("product_sur_key").cast("int"))
    existing_product_dim_without_sk = existing_product_dim.select('product_id', 'product_name', 'product_category', 'hashkey')
    all_cols = F.concat_ws("", *product_dim.columns)
    new_products_data = product_dim.withColumn("hashkey", F.md5(all_cols))
    new_products_data = new_products_data.join(existing_product_dim_without_sk, "hashkey", "left_anti")
    
    # Only write when the batch actually contains unseen records
    if new_products_data.rdd.isEmpty() == False:
        # Get the maximum surrogate key from existing data
        max_sur_key = existing_product_dim.agg({"product_sur_key": "max"}).collect()[0][0]
        window_spec = Window.orderBy("product_id")
        # New keys continue from max_sur_key + 1 in product_id order
        product_dim = new_products_data.withColumn('product_sur_key', (row_number().over(window_spec) + max_sur_key).cast("int"))
        product_dim_sk = product_dim.select('product_sur_key','product_id', 'product_name', 'product_category')
        # Existing + new records, without the hash column, for the downstream joins
        existing_product_dim_without_hash = existing_product_dim.drop("hashkey")
        updated_product_dim = existing_product_dim_without_hash.union(product_dim_sk)
        # Append only the new records (with hash) to the gold layer as a single file
        product_dim = product_dim.select('product_sur_key','product_id', 'product_name', 'product_category','hashkey')
        product_dim = product_dim.repartition(1)
        product_dim.write.mode('append') \
                            .format('parquet') \
                            .save(golden_layer_path)
        
        # Insert only the new records into the Hive table
        write_df_to_table('retail_DWH', 'Product_Dim', product_dim_sk)
    else:
        updated_product_dim = existing_product_dim.drop("hashkey")
        print("No new Data")
else:
    # First load: add hash and a sequential surrogate key starting at 1
    window_spec = Window.orderBy("product_id")
    all_cols = F.concat_ws("", *product_dim.columns)
    hashed_product_dim = product_dim.withColumn("hashkey", F.md5(all_cols))
    hashed_product_dim = hashed_product_dim.withColumn('product_sur_key', row_number().over(window_spec))
    hashed_product_dim = hashed_product_dim.select('product_sur_key','product_id', 'product_name', 'product_category','hashkey')
    updated_product_dim = hashed_product_dim.select('product_sur_key','product_id', 'product_name', 'product_category')

    updated_product_dim.show()
    
    # Repartition to one partition so a single file is written
    hashed_product_dim = hashed_product_dim.repartition(1)

    # Write the hashed dimension to HDFS
    hashed_product_dim.write.mode('overwrite') \
        .format('parquet') \
        .save(golden_layer_path)
    
    # Load the dimension (without hash) into the Hive table
    write_df_to_table('retail_DWH', 'Product_Dim', updated_product_dim)


No new Data


In [11]:
# Branches dimension: load the bronze-layer branches parquet, detect records not
# yet in the gold layer, assign surrogate keys, and publish to HDFS and
# retail_DWH.Branches_Dim. Leaves `updated_branches_dim` for the fact cells.
file_path = f"hdfs://localhost:9000/data/retail_bronze/{date_str}/08/branches_SS_raw_{date_str}_08.parquet"
golden_layer_path = "hdfs://localhost:9000/data/retail_gold/branches_dim"

# Load the parquet data into a PySpark DataFrame
branches_dim = spark.read.parquet(file_path)

# Convert establish_date to date type
branches_dim = branches_dim.withColumn("establish_date", col("establish_date").cast("date"))

# Keep one row per branch_id
branches_dim = branches_dim.dropDuplicates(['branch_id'])

if check_if_exists(golden_layer_path):
    # Hash each incoming record and keep only those whose hash is not stored yet
    existing_branch_dim = spark.read.parquet(golden_layer_path)
    existing_branch_dim_without_sk = existing_branch_dim.select('branch_id', 'location', 'establish_date', 'class', 'hashkey')
    all_cols = F.concat_ws("", *branches_dim.columns)
    new_branches_data = branches_dim.withColumn("hashkey", F.md5(all_cols))
    new_branches_data = new_branches_data.join(existing_branch_dim_without_sk, "hashkey", "left_anti")

    if not new_branches_data.rdd.isEmpty():
        # Get the maximum surrogate key from existing data
        max_sur_key = existing_branch_dim.agg({"branch_sur_key": "max"}).collect()[0][0]
        window_spec = Window.orderBy("branch_id")
        # Add surrogate keys to new data starting from max_sur_key + 1
        branches_dim = new_branches_data.withColumn('branch_sur_key', (row_number().over(window_spec) + max_sur_key).cast("int"))
        branches_dim_sk = branches_dim.select('branch_sur_key','branch_id', 'location', 'establish_date', 'class')
        # Combine existing data with new data
        existing_branch_dim_without_hash = existing_branch_dim.drop("hashkey")
        updated_branches_dim = existing_branch_dim_without_hash.union(branches_dim_sk)
        branches_dim = branches_dim.select('branch_sur_key','branch_id', 'location', 'establish_date', 'class','hashkey')
        branches_dim = branches_dim.repartition(1)
        branches_dim.write.mode('append') \
                    .format('parquet') \
                    .save(golden_layer_path)
        #write the new data on the branches dimension hive table
        write_df_to_table('retail_DWH', 'Branches_Dim', branches_dim_sk)
    else:
        updated_branches_dim = existing_branch_dim.drop("hashkey")
        print("No new Data")
else:
    # First load: add hash and a sequential surrogate key starting at 1
    all_cols = F.concat_ws("", *branches_dim.columns)
    hashed_branch_dim = branches_dim.withColumn("hashkey", F.md5(all_cols))
    window_spec = Window.orderBy("branch_id")
    hashed_branch_dim = hashed_branch_dim.withColumn('branch_sur_key', row_number().over(window_spec))
    hashed_branch_dim = hashed_branch_dim.select('branch_sur_key','branch_id', 'location', 'establish_date', 'class','hashkey')
    updated_branches_dim = hashed_branch_dim.select('branch_sur_key', 'branch_id', 'location', 'establish_date', 'class')

    updated_branches_dim.show()
    #write the new data on the branches dimension hive table
    write_df_to_table('retail_DWH', 'Branches_Dim', updated_branches_dim)
    # Persist the keyed + hashed dimension. Writing the raw `branches_dim`
    # here would store neither branch_sur_key nor hashkey, and the next run
    # reads both columns from this path.
    hashed_branch_dim.repartition(1) \
        .write.mode('overwrite') \
        .format('parquet') \
        .save(golden_layer_path)


No new Data


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 35378)
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/opt/spark2/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/opt/spark2/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/opt/spark2/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/opt/spark2/python/pyspark/serializers.py", line 724, in read_int
 

In [12]:
# Sales-agent dimension: load the bronze-layer agents parquet, detect records
# not yet in the gold layer, assign surrogate keys, and publish to HDFS and
# retail_DWH.sales_agents_Dim. Leaves `updated_agent_dim` for later cells.
file_path = f"hdfs://localhost:9000/data/retail_bronze/{date_str}/08/sales_agents_SS_raw_{date_str}_08.parquet"
golden_layer_path = "hdfs://localhost:9000/data/retail_gold/sales_agent_dim"

# Load the parquet data into a PySpark DataFrame
agent_dim = spark.read.parquet(file_path)

# Convert hire_date to date type
agent_dim = agent_dim.withColumn("hire_date", col("hire_date").cast("date"))

# Keep one row per sales_person_id
agent_dim = agent_dim.dropDuplicates(['sales_person_id'])

if check_if_exists(golden_layer_path):
    # Hash each incoming record and keep only those whose hash is not stored yet
    existing_agent_dim = spark.read.parquet(golden_layer_path)
    existing_agent_dim = existing_agent_dim.withColumn('sales_agent_sur_key', col("sales_agent_sur_key").cast("int"))
    existing_agent_dim_without_sk = existing_agent_dim.select('sales_person_id', 'name', 'hire_date', 'hashkey')
    all_cols = F.concat_ws("", *agent_dim.columns)
    new_sales_agent_data = agent_dim.withColumn("hashkey", F.md5(all_cols))
    new_sales_agent_data = new_sales_agent_data.join(existing_agent_dim_without_sk, "hashkey", "left_anti")

    if not new_sales_agent_data.rdd.isEmpty():
        # Get the maximum surrogate key from existing data
        max_sur_key = existing_agent_dim.agg({"sales_agent_sur_key": "max"}).collect()[0][0]
        print(max_sur_key)
        # Add surrogate keys to new data starting from max_sur_key + 1
        window_spec = Window.orderBy("sales_person_id")
        new_sales_agent_data = new_sales_agent_data.withColumn('sales_agent_sur_key', (row_number().over(window_spec) + max_sur_key).cast("int"))
        # Select from the keyed new records: the raw agent_dim has no
        # sales_agent_sur_key column, so selecting it there cannot resolve.
        agent_dim_with_sk = new_sales_agent_data.select('sales_agent_sur_key','sales_person_id', 'name', 'hire_date')
        # Combine existing data with new data
        existing_agent_dim_without_hash = existing_agent_dim.drop("hashkey")
        updated_agent_dim = existing_agent_dim_without_hash.union(agent_dim_with_sk)
        new_sales_agent_data = new_sales_agent_data.select('sales_agent_sur_key','sales_person_id', 'name', 'hire_date', 'hashkey')
        new_sales_agent_data = new_sales_agent_data.repartition(1)

        # Write the new data back to HDFS
        new_sales_agent_data.write.mode('append') \
            .format('parquet') \
            .save(golden_layer_path)
        #write the new data on the sales agent dimension hive table
        write_df_to_table('retail_DWH', 'sales_agents_Dim', agent_dim_with_sk)
    else:
        updated_agent_dim = existing_agent_dim.drop("hashkey")
        print("No new Data")
else:
    # First load: add hash and a sequential surrogate key starting at 1
    all_cols = F.concat_ws("", *agent_dim.columns)
    hashed_agent_dim = agent_dim.withColumn("hashkey", F.md5(all_cols))
    window_spec = Window.orderBy("sales_person_id")
    hashed_agent_dim = hashed_agent_dim.withColumn('sales_agent_sur_key', row_number().over(window_spec))

    updated_agent_dim = hashed_agent_dim.select('sales_agent_sur_key', 'sales_person_id', 'name', 'hire_date')
    updated_agent_dim.show()
    hashed_agent_dim=hashed_agent_dim.select('sales_agent_sur_key','sales_person_id', 'name', 'hire_date', 'hashkey')
    # repartition() returns a new DataFrame, so it must be part of the write
    # chain to actually produce a single output file.
    hashed_agent_dim.repartition(1) \
        .write.mode('overwrite') \
        .format('parquet') \
        .save(golden_layer_path)
    #write the new data on the sales agent dimension hive table
    write_df_to_table('retail_DWH', 'sales_agents_Dim', updated_agent_dim)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:34417)
Traceback (most recent call last):
  File "/opt/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1159, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/opt/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark2/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway

Py4JError: An error occurred while calling o41.read

In [29]:
# Date dimension: generated once (2012-01-01 .. 2100-12-31) and reused on
# later runs. Leaves `date_dim` for the fact cells.
date_dim_path = "hdfs://localhost:9000/data/retail_gold/date_dim"

if not check_if_exists(date_dim_path):
    # Generate date range
    start_date = date(2012, 1, 1)
    end_date = date(2100, 12, 31)

    date_range = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
    date_df = spark.createDataFrame([(d,) for d in date_range], ["datee"]).withColumn("datee", col("datee").cast("date"))

    # Add date attributes (weekday follows dayofweek: 1 and 7 are treated as weekend)
    date_dim = date_df.withColumn("year", year(col("datee"))) \
        .withColumn("month", month(col("datee"))) \
        .withColumn("day", lpad(dayofmonth(col("datee")), 2, "0")) \
        .withColumn("week", weekofyear(col("datee"))) \
        .withColumn("weekday", dayofweek(col("datee"))) \
        .withColumn("quarter", floor((month(col("datee")) - 1) / 3) + 1) \
        .withColumn("day_name", date_format(col("datee"), "EEEE")) \
        .withColumn("month_name", date_format(col("datee"), "MMMM")) \
        .withColumn("is_weekend", when(col("weekday").isin([1, 7]), lit(1)).otherwise(lit(0)))

    # Add surrogate key column
    # NOTE(review): month is not zero-padded here, so keys for months 1-9 have
    # 7 digits (e.g. 2012101) while months 10-12 have 8. Left unchanged because
    # already-loaded data may carry keys in this format — confirm before changing.
    date_dim = date_dim.withColumn("date_sur_key",concat(col('year'), col('month'), col('day')).cast('long'))

    try:
        # Write to the same path that was checked above and that later runs
        # read from, so the existence check and the data cannot diverge.
        date_dim.repartition(1) \
            .write.mode('overwrite') \
            .format('parquet') \
            .save(date_dim_path)
        print(f"Date dimension table saved to {date_dim_path}")
        write_df_to_table('retail_DWH', 'date_dim', date_dim, ['year'])
    except Exception as e:
        print(f"An error occurred: {e}")

else:
    date_dim = spark.read.parquet(date_dim_path)
    print("Date dimension already exists")

Date dimension already exists


In [30]:
 #fact One (offline)
#print(input.columns)
# Offline (in-branch) sales fact: keep only the non-online transactions.
offline_fact=input_df.filter(col('is_online')=="no")
# Descriptive attributes are dropped from the fact; the dimension joins below
# supply the surrogate keys instead.
columns_to_drop=['shipping_address','customer_fname','customer_lname','offer_1','offer_2',
                'offer_3','offer_4','offer_5','product_name','product_category','customer_email']

offline_fact=offline_fact.drop(*columns_to_drop)

offline_fact=offline_fact.withColumn("transaction_date", col("transaction_date").cast(DateType()))
# Price after applying the percentage discount.
final_price=(col('units') * col('unit_price') * (1 - col('discount_perc') / 100))
offline_fact=offline_fact.withColumn("total_price",final_price)
# Look up the surrogate keys from every dimension.
offline_fact=offline_fact.join(updated_customers_dim, on='customer_id', how='left') \
                       .join(updated_product_dim, on='product_id', how='left') \
                       .join(date_dim, date_dim.datee == offline_fact.transaction_date, 'left') \
                       .join(updated_agent_dim, updated_agent_dim.sales_person_id == offline_fact.sales_agent_id, 'left') \
                       .join(updated_branches_dim, updated_branches_dim.branch_id == offline_fact.branch_id, 'left')

# Load date as yyyyMMdd; the daily report cell filters on this column.
offline_fact = offline_fact.withColumn("insertion_date", date_format(lit(current_date()), "yyyyMMdd"))

offline_fact = offline_fact.select(
    'transaction_id',
    'branch_sur_key',
    'product_sur_key',
    'customer_sur_key',
    'sales_agent_sur_key',
    'date_sur_key',
    'units',
    'unit_price',
    'discount_perc',
    'total_price',
    'payment_method',
    'insertion_date'
)

fact_off_dim_path="hdfs://localhost:9000/data/retail_gold/offline_fact"

if check_if_exists(fact_off_dim_path):
    # Hash each fact row and keep only rows whose hash is not stored yet
    old_df = spark.read.parquet(fact_off_dim_path)
    all_cols = F.concat_ws("", *offline_fact.columns)
    new_offline_fact = offline_fact.withColumn("hashkey", F.md5(all_cols))
    new_offline_fact = new_offline_fact.join(old_df, "hashkey", "left_anti")
    if not new_offline_fact.rdd.isEmpty():
        offline_fact=new_offline_fact.drop("hashkey")
        new_offline_fact=new_offline_fact.repartition(1)
        #write the new data on HDFS
        new_offline_fact.write.mode('append') \
                    .format('parquet') \
                    .save(fact_off_dim_path)
        #inserting new data into hive branches_TRX_fact table
        write_df_to_table('retail_DWH', 'branches_TRX_fact', offline_fact,['payment_method'])
        print("done")
    else:
        print("No new Data")

else:
    # First load: store every row together with its hash
    all_cols = F.concat_ws("", *offline_fact.columns)
    hashed_offline_fact = offline_fact.withColumn("hashkey", F.md5(all_cols))
    hashed_offline_fact=hashed_offline_fact.repartition(1)
    #write the new data on HDFS
    hashed_offline_fact.write.mode('overwrite') \
                .format('parquet') \
                .save(fact_off_dim_path)

    #rename_in_hdfs(fact_off_dim_path,extension,name)
    #inserting new data into hive branches_TRX_fact table
    write_df_to_table('retail_DWH', 'branches_TRX_fact', offline_fact,['payment_method'])

No new Data


In [31]:
# Online sales fact: keep only the online transactions.
online_fact = input_df.filter(col('is_online')=="yes")

# Descriptive attributes are dropped from the fact; shipping_address is kept
# because it is split into address parts below.
columns_to_drop = ['customer_fname','customer_lname','sales_agent_id','offer_1','offer_2',
                'offer_3','offer_4','offer_5','product_name','product_category','customer_email']

online_fact = online_fact.drop(*columns_to_drop)

# cast transaction_date to date type
online_fact=online_fact.withColumn("transaction_date", col("transaction_date").cast(DateType()))

# calculate the final price after the percentage discount
final_price=(col('units') * col('unit_price') * (1 - (col('discount_perc') / 100)))
online_fact=online_fact.withColumn("total_price",final_price)

# Split shipping_address on '/' into street / city / state / postal_code
split_address_col=split(col("shipping_address"),'/')
online_fact=online_fact.withColumn('street',split_address_col.getItem(0))\
                        .withColumn('city',split_address_col.getItem(1))\
                        .withColumn('state',split_address_col.getItem(2))\
                        .withColumn('postal_code',split_address_col.getItem(3))

# Join with dimension tables using left join
online_fact = online_fact.join(updated_customers_dim, on='customer_id', how='left') \
                         .join(updated_product_dim, on='product_id', how='left') \
                         .join(date_dim, date_dim.datee == online_fact.transaction_date, 'left')
# Load date as yyyyMMdd
online_fact = online_fact.withColumn('insertion_date', date_format(lit(current_date()), "yyyyMMdd"))


online_fact = online_fact.select(
    'transaction_id',
    'units',
    'unit_price',
    'payment_method',
    'discount_perc',
    'total_price',
    'customer_sur_key',
    'product_sur_key',
    'date_sur_key',
    'street',
    'city',
    'state',
    'postal_code',
    'insertion_date'
)

online_fact_path="hdfs://localhost:9000/data/retail_gold/online_fact"

if check_if_exists(online_fact_path):
    # Hash each fact row and keep only rows whose hash is not stored yet
    old_df = spark.read.parquet(online_fact_path)
    all_cols = F.concat_ws("", *online_fact.columns)
    new_online_fact = online_fact.withColumn("hashkey", F.md5(all_cols))
    new_online_fact = new_online_fact.join(old_df, "hashkey", "left_anti")
    if not new_online_fact.rdd.isEmpty():
        online_fact=new_online_fact.drop("hashkey")
        new_online_fact=new_online_fact.repartition(1)
        #writing the new data on HDFS
        new_online_fact.write.mode('append') \
                    .format('parquet') \
                    .save(online_fact_path)
        #inserting new data into hive online_TRX_fact table
        write_df_to_table('retail_DWH', 'online_TRX_fact', online_fact,['payment_method'])
        print("done")
    else:
        print("No new Data")
else:
    # First load: store every row together with its hash
    all_cols = F.concat_ws("", *online_fact.columns)
    hashed_online_fact = online_fact.withColumn("hashkey", F.md5(all_cols))
    hashed_online_fact=hashed_online_fact.repartition(1)
    #writing the new data on HDFS
    hashed_online_fact.write.mode('overwrite') \
             .format('parquet') \
             .save(online_fact_path)
    #inserting new data into hive online_TRX_fact table
    write_df_to_table('retail_DWH', 'online_TRX_fact', online_fact,['payment_method'])

No new Data


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 45482)
Traceback (most recent call last):
  File "/usr/lib/python3.6/socketserver.py", line 320, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 351, in process_request
    self.finish_request(request, client_address)
  File "/usr/lib/python3.6/socketserver.py", line 364, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/usr/lib/python3.6/socketserver.py", line 724, in __init__
    self.handle()
  File "/opt/spark2/python/pyspark/accumulators.py", line 269, in handle
    poll(accum_updates)
  File "/opt/spark2/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/opt/spark2/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/opt/spark2/python/pyspark/serializers.py", line 724, in read_int
 

Daily report for the B2B team

In [None]:
# Build the daily units-sold report (per sales agent and product) on the last
# run of the day. hour_str comes from strftime("%H") and is therefore a
# string, so it must be compared with "23"; comparing with the int 23 is
# always False and the report would never be produced.
if hour_str == "23":
    offline_fact = spark.read.parquet("hdfs:///data/retail_gold/offline_fact/")
    # Only the rows loaded today.
    daily_fact = offline_fact.filter(col('insertion_date') == date_str)
    daily_fact = daily_fact.join(updated_agent_dim, on ='sales_agent_sur_key' , how='left') \
                            .join(updated_product_dim, on='product_sur_key', how='left')
    daily_fact = daily_fact.select('name', 'product_name', 'units')
    daily_report = daily_fact.groupBy("name","product_name").agg({"units": "sum"})
    daily_report.show()
    # Single partition so the CSV output is one part file.
    daily_report = daily_report.coalesce(1)
    daily_report.write.option("header", "true").csv(f"file:///home/itversity/itversity-material/Retail_pipeline_project/Daily_report/report_{date_str}")

In [None]:
# Shut down the Spark session at the end of the run.
spark.stop()

In [None]:
#updated_customers_dim.printSchema()

In [None]:
#updated_product_dim.printSchema()

In [None]:
#date_dim.printSchema()

In [None]:
#online_fact.printSchema()

In [None]:
#offline_fact.printSchema()

In [None]:
#updated_agent_dim.printSchema()

In [None]:
#updated_branches_dim.printSchema()

In [None]:
#cust = spark.read.parquet("hdfs:///data/retail_gold/customer_dim/")
#cust.show()
#cust.printSchema()

In [None]:
#spark.sql("DESCRIBE FORMATTED retail_DWH.branches_TRX_fact").show(50,truncate=False)

#SET hive.exec.dynamic.partition = true;
#SET hive.exec.dynamic.partition.mode = nonstrict;
#SET hive.mapred.mode = nonstrict;