In [0]:
from pyspark.sql import SparkSession
import pandas as pd
from datetime import datetime
import re
import os

In [0]:
# Reuse the active Spark session if one exists (as in a Databricks notebook);
# getOrCreate only builds a new session when none is running.
spark = (
    SparkSession.builder
    .getOrCreate()
)

# Get JSON File Paths in Unity Catalog

In [0]:
volume_path = "/Volumes/md_project/bronze/json_files/"

# os.listdir returns entries in arbitrary, filesystem-dependent order, and the
# next cell picks files by position, so sort to make that mapping deterministic.
# Only .json entries are kept so stray files in the volume cannot shift indices.
files = sorted(
    name for name in os.listdir(volume_path) if name.lower().endswith(".json")
)
files
 

# Read the JSON files into Spark DataFrames

In [0]:
# Pair DataFrames with files by sorted name rather than raw listing order
# (os.listdir order is arbitrary). The variable order below is alphabetical,
# which assumes each file name starts with its table name, e.g. countries.json.
# TODO confirm against the files in the volume.
json_files = sorted(name for name in files if name.lower().endswith(".json"))
assert len(json_files) == 5, f"expected 5 JSON files in {volume_path}, found {json_files}"

countries_df, customers_df, orders_df, products_df, sales_df = [
    spark.read.json(os.path.join(volume_path, name)) for name in json_files
]

# Convert Spark DataFrames to pandas DataFrames for easier data manipulation
- `toPandas()` collects every row onto the driver, so it is only practical for small datasets; if the datasets were very large, I would do the clean-up with Spark functions instead

In [0]:
# Collect each Spark DataFrame to the driver as a pandas DataFrame. The same
# variable names are kept, so later cells operate on the pandas objects.
countries_df, customers_df, orders_df, products_df, sales_df = [
    spark_frame.toPandas()
    for spark_frame in (countries_df, customers_df, orders_df, products_df, sales_df)
]

# Perform Data Clean Up

### Check for Duplicates

In [0]:
def update_dict_values():
    """Return a name -> DataFrame mapping of the current module-level tables.

    The globals are looked up at call time. Calling this again after a cell
    rebinds one of the ``*_df`` variables therefore picks up the new object.

    Returns:
        dict: the keys ``"countries_df"``, ``"customers_df"``, ``"orders_df"``,
        ``"products_df"`` and ``"sales_df"``, in that order. Each maps to the
        global variable of the same name.
    """
    return dict(
        countries_df=countries_df,
        customers_df=customers_df,
        orders_df=orders_df,
        products_df=products_df,
        sales_df=sales_df,
    )

The output below shows that `sales_df` contains 3 duplicate rows.

In [0]:
dataframes = update_dict_values()

# duplicated() with its defaults flags each row that repeats an earlier row
# across all columns, so the sum counts the extra copies per table.
duplicate_counts = {name: frame.duplicated().sum() for name, frame in dataframes.items()}
for name, count in duplicate_counts.items():
    print(f"Duplicates in {name}: {count}")

Inspect `sales_df` to confirm that the duplicate rows are errors rather than legitimate repeated records.

In [0]:
# head() shows only the first five rows, which need not include any duplicate.
# keep=False flags every copy of a duplicated row (not just the repeats), so
# all copies are displayed and can be compared to judge whether they are errors.
sales_df[sales_df.duplicated(keep=False)]

Drop duplicate rows from every DataFrame, assuming that a duplicated record (such as those in `sales_df`) is an error.

The output below confirms that the duplicate records have been dropped.

In [0]:
# Remove exact duplicate rows from every table in place. The dict values are the
# module-level *_df objects themselves, so those variables see the change too.
for frame in dataframes.values():
    frame.drop_duplicates(inplace=True)

dataframes = update_dict_values()

# Re-count to confirm no duplicates remain.
for name, frame in dataframes.items():
    print(f"Duplicates in {name}: {frame.duplicated().sum()}")

### Check for Null Values
- The output below shows that each file contains 1 row in which every value is null

In [0]:
# A row is counted only when every one of its values is missing.
for name, frame in dataframes.items():
    null_row_count = frame.isna().all(axis=1).sum()
    print(f"Null rows in {name}: {null_row_count}")

Drop the fully-null rows; the output below confirms that none remain.

In [0]:
# Drop only rows in which every value is missing, matching the check above.
# A bare dropna() defaults to how="any" and would also discard every row with a
# single missing field. Those real records would vanish without the check
# below (which only looks at fully-null rows) ever reporting it.
for df_name, df in dataframes.items():
    df.dropna(how="all", inplace=True)

dataframes = update_dict_values()

for df_name, df in dataframes.items():
    print(f"Null rows in {df_name}: {df.isnull().all(axis=1).sum()}")

# Add Date and Source Columns
- Recording the source file name and upload timestamp on every row makes debugging and lineage tracking easier

In [0]:
def add_metadata_columns(df, file_name, upload_date=None):
    """Add ``file_name`` and ``upload_date`` lineage columns to ``df`` in place.

    Args:
        df: pandas DataFrame to annotate; it is modified in place.
        file_name: value written to every row of the ``file_name`` column.
        upload_date: value written to every row of the ``upload_date`` column.
            Defaults to ``datetime.now()`` (naive local time), evaluated per
            call. Pass one shared timestamp to stamp several tables from the
            same load identically.

    Returns:
        The same ``df`` object, to allow chaining.
    """
    if upload_date is None:
        upload_date = datetime.now()
    df['file_name'] = file_name
    df['upload_date'] = upload_date
    return df

In [0]:
# Stamp every table with its source file name and load time.
# NOTE(review): the file name is derived from the variable name
# ("countries_df" -> "countries.json"), not read from the directory listing.
# Confirm it matches the actual files in the volume.
for name, frame in dataframes.items():
    source_file = name.replace("_df", ".json")
    add_metadata_columns(frame, source_file)

Clean the column names: lowercase them, replace spaces with underscores, and remove other special characters

In [0]:
def clean_column_names(df):
    """Normalise ``df``'s column names in place and return ``df``.

    Each name has spaces turned into underscores, surrounding whitespace
    stripped and is lowercased. Every run of non-word characters is then
    deleted, repeated underscores are collapsed, and leading/trailing
    underscores are trimmed. For example, ``" Unit Price ($) "`` becomes
    ``"unit_price"``.
    """
    def _normalise(name):
        name = name.replace(' ', '_').strip().lower()
        name = re.sub(r'_+', '_', re.sub(r'[^\w]+', '', name))
        return name.strip('_')

    df.columns = [_normalise(name) for name in df.columns]
    return df


In [0]:
# Normalise column names on every table. clean_column_names reassigns
# df.columns on the object itself, so the module-level *_df variables change too.
for frame in dataframes.values():
    clean_column_names(frame)

# Write Tables to the Silver Layer in Unity Catalog

In [0]:
def write_to_unity_catalog(df, table_name, catalog="md_project", schema="silver", mode="overwrite"):
    """Save a Spark DataFrame as a Delta table in Unity Catalog.

    Args:
        df: Spark DataFrame to write.
        table_name: unqualified table name. It is combined into
            ``<catalog>.<schema>.<table_name>``.
        catalog: target catalog (default ``"md_project"``).
        schema: target schema (default ``"silver"``, the silver layer).
        mode: Spark save mode passed to ``DataFrameWriter.mode``
            (default ``"overwrite"``).
    """
    full_name = f"{catalog}.{schema}.{table_name}"
    (
        df.write
        .format("delta")
        .mode(mode)
        .saveAsTable(full_name)
    )

In [0]:
# Convert each cleaned pandas table back to Spark and write it to
# md_project.silver under its base name ("countries_df" -> "countries").
for name, pandas_frame in dataframes.items():
    spark_frame = spark.createDataFrame(pandas_frame)
    table = name.replace("_df", "")
    write_to_unity_catalog(spark_frame, table)