In [0]:
# List the raw Kaggle store-sales files available on the mount.
display(dbutils.fs.ls("/mnt/data/2023-kaggle-final/store-sales/"))

In [0]:
# Create a Database

In [0]:
import re

# Derive per-user names from the signed-in account: a DBFS working directory and a
# project database named after the (sanitised) local part of the user name.
userName = spark.sql("SELECT CURRENT_USER").collect()[0]['current_user()']
# partition() instead of split("@")[1]: if CURRENT_USER contains no "@", the domain is
# simply empty instead of raising IndexError.
userName0, _, userName1 = userName.partition("@")
# Keep only characters valid in an unquoted Spark/Hive database name ([A-Za-z0-9_]);
# each run of any other characters becomes a single "_". Raw string: no escape warnings.
userName0 = re.sub(r"[^0-9A-Za-z_]+", "_", userName0)
userName = f'{userName0}@{userName1}' if userName1 else userName0
userDir = f"/Users/{userName}/data"
dbutils.fs.mkdirs(userDir)
databaseName = f"{userName0}_FinalProject_01"

print('databaseName ' + databaseName)
print('UserDir ' + userDir)

# databaseName is built only from [A-Za-z0-9_] plus a fixed suffix, so it is safe to interpolate.
spark.sql(f"CREATE DATABASE IF NOT EXISTS {databaseName}")
spark.sql(f"USE {databaseName}")

In [0]:
# Create an Auto Loader stream to load the files from the raw data path

In [0]:
# Echo the per-user working directory before configuring the Auto Loader paths.
print(f"UserDir {userDir}")

In [0]:
# Raw CSV source directory, plus the per-user locations used by the bronze Auto Loader
# stream: its checkpoint, its output table path, and its inferred-schema store.
rawDataSource = "/mnt/data/2023-kaggle-final/store-sales"
bronzeCheckpoint, bronzeTable, bronzeSchema = (
    f"{userDir}/{leaf}" for leaf in ("bronze_check_point", "bronze", "bronze_schema")
)

In [0]:
# List the Auto Loader schema location for whoever runs the notebook (derived from
# userDir rather than a hard-coded personal e-mail path).
try:
    display(dbutils.fs.ls(bronzeSchema))
except Exception as err:
    # Presumably created by Auto Loader only once the stream below has started, so it is
    # absent on a fresh run; report that instead of aborting Run All. Re-raise anything else.
    if "FileNotFoundException" not in str(err):
        raise
    print(f"{bronzeSchema} does not exist yet")

In [0]:
# List the bronze stream's checkpoint directory for whoever runs the notebook (derived
# from userDir rather than a hard-coded personal e-mail path).
try:
    display(dbutils.fs.ls(bronzeCheckpoint))
except Exception as err:
    # The checkpoint is written only after the writeStream below has started, so it is
    # absent on a fresh run; report that instead of aborting Run All. Re-raise anything else.
    if "FileNotFoundException" not in str(err):
        raise
    print(f"{bronzeCheckpoint} does not exist yet")

In [0]:
import os
from pyspark.sql.functions import input_file_name,expr
from pyspark.sql.types import ArrayType,IntegerType,StringType

In [0]:
# Define Auto Loader options for incrementally ingesting the raw CSV files.
options = {
    "cloudFiles.useNotifications": "true",
    "cloudFiles.format": "csv",
    "cloudFiles.includeExistingFiles": "true",
    "cloudFiles.schemaLocation": bronzeSchema,
    # The CSVs start with a header row (the batch reads in this notebook use header=True).
    # The CSV reader defaults to header=false, which would name columns _c0, _c1, ...
    # and ingest every header line as a data row.
    "header": "true",
    "sep": ",",
}

# Read every CSV under rawDataSource as a stream and tag each row with its source file.
# NOTE(review): the directory holds files with different layouts (train, oil, stores, ...),
# which presumably all land in this one stream under a merged schema — confirm intended.
streaming_df = (
    spark.readStream.format("cloudFiles")
    .options(**options)
    .load(rawDataSource)
    .withColumn("filename", input_file_name())
)


In [0]:
# Start the bronze write: output files under bronzeTable, progress tracked in
# bronzeCheckpoint, new columns merged into the target, registered as table `bronzetable`.
# No trigger is set, so the query keeps running after this cell returns;
# `df` presumably holds the StreamingQuery handle, not a DataFrame.
df = (
    streaming_df.writeStream
    .option("checkpointLocation", bronzeCheckpoint)
    .option("path", bronzeTable)
    .option("mergeSchema", "true")
    .table("bronzetable")
)

In [0]:
import os
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import ArrayType,IntegerType,StringType

In [0]:
# Load the tables with a batch spark.read for null checks, EDA, transformation and deduplication

In [0]:
# Register every raw CSV as a temp view (batch read, header row, inferred types) for the
# null checks, EDA and transformations below. Paths are built from rawDataSource so the
# source directory is defined in exactly one place.
_csvStemToView = [
    ("holidays_events", "holidays_events"),
    ("oil", "oil"),
    ("sample_submission", "sample_submission"),
    ("stores", "stores"),
    ("test", "test_set"),
    ("train", "train_set"),
    ("transactions", "transactions"),
]
StorefilePath = [(f"dbfs:{rawDataSource}/{stem}.csv", view) for stem, view in _csvStemToView]

for file_name, tab_name in StorefilePath:
    StoresDF = (spark.read
        .option("sep", ",")
        .option("header", True)
        .option("inferSchema", True)
        .csv(file_name))
    # Replace (not create) so re-running this cell does not fail on existing views.
    StoresDF.createOrReplaceTempView(tab_name)

# Null Checks: Handling Nulls by Removing Rows That Contain Them

In [0]:
# Load the holidays_events temp view and show it for a visual null check.
holiday_events_df = spark.table("holidays_events")
display(holiday_events_df)

In [0]:
# Drop nulls: an entire row is removed if any one of its column values is null

In [0]:
# Remove every holiday row with a null in any column, then show what remains.
holiday_dropna_events_df = holiday_events_df.na.drop(how="any")
display(holiday_dropna_events_df)

In [0]:
# Load the oil temp view and show it for a visual null check.
oil_df = spark.table("oil")
display(oil_df)

In [0]:
# Keep only oil rows with no null columns, then show them.
oil_dropna_df = oil_df.na.drop(how="any")
display(oil_dropna_df)

In [0]:
# Load the sample_submission temp view and show it for a visual null check.
sample_submission_df = spark.table("sample_submission")
display(sample_submission_df)

In [0]:
# Discard sample_submission rows containing any null, then show the result.
sample_submission_dropna_df = sample_submission_df.na.drop(how="any")
display(sample_submission_dropna_df)

In [0]:
# Load the stores temp view and show it for a visual null check.
stores_df = spark.table("stores")
display(stores_df)

In [0]:
# Drop store rows that have a null in any column, then show the rest.
stores_dropna_df = stores_df.na.drop(how="any")
display(stores_dropna_df)


In [0]:
# Load the test_set temp view (intentionally not displayed here).
test_df = spark.table("test_set")

In [0]:
# Drop test rows with a null in any column (result not displayed).
test_dropna_df = test_df.na.drop(how="any")

In [0]:
# Load the train_set temp view (intentionally not displayed here).
train_df = spark.table("train_set")

In [0]:
# Drop training rows with a null in any column (result not displayed).
train_dropna_df = train_df.na.drop(how="any")

In [0]:
# Load the transactions temp view and show it for a visual null check.
transactions_df = spark.table("transactions")
display(transactions_df)

In [0]:
# Remove transaction rows with any null column, then show the remainder.
transactions_dropna_df = transactions_df.na.drop(how="any")
display(transactions_dropna_df)

# EDA

In [0]:
import seaborn as sns
import matplotlib.pyplot as plt
# Materialise the null-free training set on the driver as a pandas DataFrame for seaborn/matplotlib.
# NOTE(review): toPandas() collects every row to the driver — confirm train_set fits in driver memory.
pandas_df = train_dropna_df.toPandas()

**1. Check whether any correlation exists in train_set**

In [0]:
# Correlation Scatter Plot between the features.

In [0]:
# Grid of pairwise scatter plots (with per-column distributions on the diagonal)
# over the numeric columns of the cleaned training set.
sns.pairplot(data=pandas_df)
plt.show()

We observe a relationship between the `onpromotion` predictor and `sales` (the target variable), along with one or two outliers.

**2. Check whether the oil price has any impact on the target variable (sales)**

In [0]:
# Show the oil prices with null rows removed, as context for the oil-price-vs-sales question.
display(oil_dropna_df)

In [0]:
# Merge the oil price DataFrame with the training dataset

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.functions import when

In [0]:
# Cast `date` to DateType in both frames (inferSchema may have read it as a string or
# timestamp) so the equi-join below matches on calendar day.
# NOTE(review): this joins the raw oil_df/train_df, not the *_dropna_df frames built above,
# so rows with a null dcoilwtico survive into joined_df — confirm that is intended.
df1 = oil_df.withColumn("date", col("date").cast("date"))
df2 = train_df.withColumn("date", col("date").cast("date"))

# Inner join: only training rows whose date also appears in oil.csv are kept.
joined_df = df1.join(df2, on="date", how="inner")

# Rich, sortable table instead of the plain-text 20-row .show() dump.
display(joined_df)

In [0]:
# Scatter Plot between Sales and Oil Price 

In [0]:
# Pull only the two plotted columns to the driver with toPandas(). This replaces collect(),
# which builds one Python Row object per joined row, and the per-row list comprehensions.
data_to_plot = joined_df.select("sales", "dcoilwtico").toPandas()

# Null oil prices presumably arrive as NaN and are simply not drawn (as None was before).
x = data_to_plot["dcoilwtico"]
y = data_to_plot["sales"]

# Explicit figure/axes API rather than the pyplot state machine.
fig, ax = plt.subplots()
ax.scatter(x, y)
ax.set_xlabel("Oil Price")
ax.set_ylabel("Sales")
ax.set_title("Scatter Plot between Oil Price and Sales")
plt.show()

Analysis: as the oil price increases, sales have not increased.<br>
We observed one outlier that might skew the data and affect modeling.

In [0]:
# Data Transformation

In [0]:
# Register the oil-price-enriched training data as a SQL temp view for the transformation steps.
# createOrReplaceTempView (as used when the CSV views were registered) keeps this cell
# re-runnable; createTempView raises if "train_with_oilprice" already exists in the session.
joined_df.createOrReplaceTempView("train_with_oilprice")