
## # **_- Walmart sales forecast data transformation in Azure Databricks using pyspark:_**

In this part of the project the data transformation process in performed using pyspark. Handling of NaN / null values along with rows and column repetitions is performed. In the end one comprehensive dataframe named as "dataset" is stored in the container of Azure data lake storage.

In [0]:
# Configure the storage account
configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": "baa10826-c406-453a-9ce5-7d4cf3340de8",
  "fs.azure.account.oauth2.client.secret": "dPA8Q~q0pttTvUuBUJwfYNo96x3zQafdmM7PBcfR",
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/25a72ede-1750-4016-b257-1edaee97756d/oauth2/token"
}

# # Provide a mount for the storage container
# dbutils.fs.mount(
#   source="abfss://walmart-sales-data-container@walmartsalesdata.dfs.core.windows.net",
#   mount_point="/mnt/walamrt-data-transform-mount",
#   extra_configs=configs
# )



In [0]:
%fs
ls "/mnt/walamrt-data-transform-mount"

path,name,size,modificationTime
dbfs:/mnt/walamrt-data-transform-mount/Cleaned Data/,Cleaned Data/,0,1720701292000
dbfs:/mnt/walamrt-data-transform-mount/Raw data/,Raw data/,0,1720527896000


In [0]:
#read the features and stores data as a Spark DataFrame.

features = spark.read.format("csv").option("header", "true").load("/mnt/walamrt-data-transform-mount/Raw data/features.csv", header=True)

stores = spark.read.format("csv").option("header", "true").load("/mnt/walamrt-data-transform-mount/Raw data/stores.csv", header=True)

# read train and test data set.

train = spark.read.format("csv").option("header", "true").load("/mnt/walamrt-data-transform-mount/Raw data/train.csv", header=True)

test = spark.read.format("csv").option("header", "true").load("/mnt/walamrt-data-transform-mount/Raw data/test.csv", header=True)


In [0]:
#Analyse the data 
#Display the schema and first few rows of the data.

features.printSchema()
features.show(5)

stores.printSchema()
stores.show(5) 

train.printSchema() 
train.show(5)

test.printSchema()
test.show(5)



root
 |-- Store: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Temperature: string (nullable = true)
 |-- Fuel_Price: string (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: string (nullable = true)
 |-- Unemployment: string (nullable = true)
 |-- IsHoliday: string (nullable = true)

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|Store|      Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|IsHoliday|
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|    1|2010-02-05|      42.31|     2.572|       NA|       NA|       NA|       NA|       NA|211.0963582|       8.106|    FALSE|
|  

In [0]:
#Transform the datatypes of the dataframes to appropriate datatypes:
from pyspark.sql.functions import col

#Transform the data types for features dataframe: (changing the IsHoliday name to IsHoliday_a for joining with features dataframe)
features = features.withColumn("Store", col("Store").cast("integer"))\
                    .withColumn("Date", col("Date").cast("date"))\
                    .withColumn("Temperature", col("Temperature").cast("float"))\
                    .withColumn("Fuel_Price", col("Fuel_Price").cast("float"))\
                    .withColumn("MarkDown1", col("MarkDown1").cast("float"))\
                    .withColumn("MarkDown2", col("MarkDown2").cast("float"))\
                    .withColumn("MarkDown3", col("MarkDown3").cast("float"))\
                    .withColumn("MarkDown4", col("MarkDown4").cast("float"))\
                    .withColumn("MarkDown5", col("MarkDown5").cast("float"))\
                    .withColumn("CPI", col("CPI").cast("float"))\
                    .withColumn("Unemployment", col("Unemployment").cast("float"))\
                    .withColumn("IsHoliday_a", col("IsHoliday").cast("boolean"))\
                    .drop("IsHoliday")        

#Transform the data types for stores dataframe:
stores = stores.withColumn("Store", col("Store").cast("integer"))\
                .withColumn("Size", col("Size").cast("integer"))

#Transform the data types for train dataframe: (changing the IsHoliday name to IsHoliday_b for joining with features dataframe)
train = train.withColumn("Store", col("Store").cast("integer"))\
             .withColumn("Dept", col("Dept").cast("integer"))\
             .withColumn("Date", col("Date").cast("date"))\
             .withColumn("IsHoliday_b", col("IsHoliday").cast("boolean"))\
             .withColumn("Weekly_Sales", col("Weekly_Sales").cast("float"))\
             .drop("IsHoliday") 

#Transform the data types for test dataframe:
test = test.withColumn("Store", col("Store").cast("integer"))\
             .withColumn("Dept", col("Dept").cast("integer"))\
             .withColumn("Date", col("Date").cast("date"))\
             .withColumn("IsHoliday", col("IsHoliday").cast("boolean"))


#Analyze the schema by printing the schema of the dataframes.
features.printSchema()
stores.printSchema()
train.printSchema()
test.printSchema()



root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: float (nullable = true)
 |-- Fuel_Price: float (nullable = true)
 |-- MarkDown1: float (nullable = true)
 |-- MarkDown2: float (nullable = true)
 |-- MarkDown3: float (nullable = true)
 |-- MarkDown4: float (nullable = true)
 |-- MarkDown5: float (nullable = true)
 |-- CPI: float (nullable = true)
 |-- Unemployment: float (nullable = true)
 |-- IsHoliday_a: boolean (nullable = true)

root
 |-- Store: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)

root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Weekly_Sales: float (nullable = true)
 |-- IsHoliday_b: boolean (nullable = true)

root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [0]:
#Create one comprehensive data set dataframe by joining the features, stores, and train dataframes. (Ignoring the test dataframe because of repetative data)

# Join train dataset with features
train_features = train.join(features, on=["Store", "Date"], how="left")

# Join the resulting DataFrame with stores
merged_df = train_features.join(stores, on=["Store"], how="left")

# Display the schema of the merged DataFrame to verify
merged_df.printSchema()

# Display the first few rows of the merged DataFrame to verify
merged_df.show(5)


root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Weekly_Sales: float (nullable = true)
 |-- IsHoliday_b: boolean (nullable = true)
 |-- Temperature: float (nullable = true)
 |-- Fuel_Price: float (nullable = true)
 |-- MarkDown1: float (nullable = true)
 |-- MarkDown2: float (nullable = true)
 |-- MarkDown3: float (nullable = true)
 |-- MarkDown4: float (nullable = true)
 |-- MarkDown5: float (nullable = true)
 |-- CPI: float (nullable = true)
 |-- Unemployment: float (nullable = true)
 |-- IsHoliday_a: boolean (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)

+-----+----------+----+------------+-----------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+-----------+----+------+
|Store|      Date|Dept|Weekly_Sales|IsHoliday_b|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|      CPI|Unemployment|IsHolida

In [0]:
#clean the merge dataset (Remove the replace the Null / NA values to 0 from columns)
dataset = merged_df.fillna(0)

#check and remove duplicate rows If any:
# Check for duplicate rows
total_count = dataset.count()
distinct_count = dataset.distinct().count()

if total_count == distinct_count:
    print("No duplicate rows found.")
else:
    print(f"Duplicate rows found: {total_count - distinct_count}")

# Optional: Show duplicate rows if any
if total_count != distinct_count:
    duplicate_rows_df = dataset.groupBy(dataset.columns).count().filter("count > 1")
    duplicate_rows_df.show()   

#In the columns of the dataset dataframe, remove the IsHoliday_a and chnage IsHoliday_b name to IsHoliday.
dataset = dataset.drop("IsHoliday_a")
dataset = dataset.withColumnRenamed("IsHoliday_b", "IsHoliday")

dataset.printSchema()
dataset.show(5)


No duplicate rows found.
root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Weekly_Sales: float (nullable = false)
 |-- IsHoliday: boolean (nullable = true)
 |-- Temperature: float (nullable = false)
 |-- Fuel_Price: float (nullable = false)
 |-- MarkDown1: float (nullable = false)
 |-- MarkDown2: float (nullable = false)
 |-- MarkDown3: float (nullable = false)
 |-- MarkDown4: float (nullable = false)
 |-- MarkDown5: float (nullable = false)
 |-- CPI: float (nullable = false)
 |-- Unemployment: float (nullable = false)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)

+-----+----------+----+------------+---------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+----+------+
|Store|      Date|Dept|Weekly_Sales|IsHoliday|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|      CPI|Unemployment|Type|  Size|
+-----+----------+----

In [0]:
# In this project the aim is to predict the sales. hence, Let's analyse the weekly sales of each store and each departments.

#count the sales by department:
distinct_departments_count = dataset.select("Dept").distinct().count()
print(f"Distinct number of departments: {distinct_departments_count}")

#count the stores:
distinct_stores_count = dataset.select("Store").distinct().count()
print(f"Distinct number of stores: {distinct_stores_count}")

#Import the required functions
from pyspark.sql.functions import mean
import pandas as pd

# Create a pivot table showing mean Weekly_Sales for stores and departments
pivot_table = dataset.groupBy("Store").pivot("Dept").agg(mean("Weekly_Sales").alias("Avg_Weekly_Sales"))

# Convert the pivot table to Pandas DataFrame for better visualization
pivot_table_pd = pivot_table.toPandas()

# Display the pivot table using Pandas
pd.set_option('display.max_columns', None)  # Show all columns
pd.set_option('display.max_rows', None)     # Show all rows
pd.set_option('display.expand_frame_repr', False)  # Don't wrap the DataFrame in the output

print(pivot_table_pd)


Distinct number of departments: 81
Distinct number of stores: 45
    Store             1              2             3             4             5             6             7             8             9            10            11            12            13            14            16            17            18           19            20            21            22            23            24            25            26           27           28            29            30           31            32            33            34           35           36           37             38         39            40            41            42        43            44         45            46          47           48            49           50         51           52          54            55           56            58           59           60            65            67            71             72            74           77         78            79            80            81            82     

In [0]:
#From the above analysis it is observed that the average weekly sales for some department and stores are in negative and NaN values, which is illogical. Hence we need to remove these rows.

# Filter the DataFrame for Weekly_Sales <= 0
negative_or_zero_sales_df = dataset.filter(dataset.Weekly_Sales <= 0)

# Count the number of rows with Weekly_Sales <= 0
negative_or_zero_sales_count = negative_or_zero_sales_df.count()

# Display the count for total rows having weekly_sales <= 0:
print(f"Number of rows with Weekly_Sales <= 0: {negative_or_zero_sales_count}")

# Display the count for total rows of dataset
print(f"Total number of data rows: {dataset.count()}")

# Display the percentage of the data rows out of total to be discarded:
print(f"The invalid data to be discarded amounts to {negative_or_zero_sales_count / dataset.count() * 100:.2f}%")

# Optionally, show the rows with Weekly_Sales <= 0 for further analysis
negative_or_zero_sales_df.show()



Number of rows with Weekly_Sales <= 0: 1358
Total number of data rows: 421570
The invalid data to be discarded amounts to 0.32%
+-----+----------+----+------------+---------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+----+------+
|Store|      Date|Dept|Weekly_Sales|IsHoliday|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|      CPI|Unemployment|Type|  Size|
+-----+----------+----+------------+---------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+----+------+
|    1|2012-08-10|   6|     -139.65|    false|      85.05|     3.494| 11436.22|    245.0|     6.85|  6964.26|  4836.22|221.95844|       6.908|   A|151315|
|    1|2012-05-04|  18|       -1.27|    false|      75.55|     3.749| 21290.13|      0.0|    69.89|  4977.35|  3261.04| 221.6718|       7.143|   A|151315|
|    1|2010-02-19|  47|      -863.0|    false|      39.93|     2.514|      0.0|      0.0|      0.

In [0]:
#Remove the invalid data rows from the dataset:
# Filter the DataFrame to remove rows with Weekly_Sales <= 0
dataset = dataset.filter(dataset.Weekly_Sales > 0)

# Count the number of rows and columns in the filtered DataFrame
row_count = dataset.count()
column_count = len(dataset.columns)

# Display the shape of the DataFrame
print(f"Shape of the DataFrame: ({row_count}, {column_count})")

# Optionally, show the first few rows of the filtered DataFrame for verification
dataset.show()

Shape of the DataFrame: (420212, 16)
+-----+----------+----+------------+---------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+----+------+
|Store|      Date|Dept|Weekly_Sales|IsHoliday|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|      CPI|Unemployment|Type|  Size|
+-----+----------+----+------------+---------+-----------+----------+---------+---------+---------+---------+---------+---------+------------+----+------+
|    1|2010-02-05|   1|     24924.5|    false|      42.31|     2.572|      0.0|      0.0|      0.0|      0.0|      0.0|211.09636|       8.106|   A|151315|
|    1|2010-02-12|   1|    46039.49|     true|      38.51|     2.548|      0.0|      0.0|      0.0|      0.0|      0.0|211.24217|       8.106|   A|151315|
|    1|2010-02-19|   1|    41595.55|    false|      39.93|     2.514|      0.0|      0.0|      0.0|      0.0|      0.0|211.28914|       8.106|   A|151315|
|    1|2010-02-26|   1|    19403.

In [0]:
#Now that the process of cleaning and transforming the dataset is complete, we can persist the cleaned data to Azure data lake gen 2 in the form of csv:
# Specify the path to store the DataFrame in Azure Data Lake Storage Gen2
output_path = "/mnt/walamrt-data-transform-mount/Cleaned Data/"

# Write the DataFrame to the Data Lake as CSV files with headers
dataset.write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(output_path)

# Print a confirmation message
print("Data successfully written to Azure Data Lake Storage Gen2 with headers.")



Data successfully written to Azure Data Lake Storage Gen2 with headers.
