# Import PySpark libraries

In [1]:
%%pyspark
import random
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp, expr, lit, trim, udf, when
from pyspark.sql.types import DateType

# Root of the ADLS Gen2 container holding both medallion layers.
_STORAGE_ROOT = 'abfss://depi3@depi3.dfs.core.windows.net'

SRC_PARQUET_DIR = f'{_STORAGE_ROOT}/Bronze Layer'   # input: raw Parquet tables
DIST_PARQUET_DIR = f'{_STORAGE_ROOT}/Silver Layer'  # output: cleaned Parquet tables

StatementMeta(Depi, 7, 2, Finished, Available, Finished)

# Read Data from Parquet files

In [2]:
def read_bronze_table(folder):
    """Load all snappy-compressed Parquet part files of one Bronze-layer table.

    Parameters:
    folder (str): Table folder name under SRC_PARQUET_DIR (e.g. 'Accounts').

    Returns:
    pyspark.sql.DataFrame: The loaded table.
    """
    # One shared call so the reader format cannot be misspelt per table:
    # DataFrameReader.load forwards unknown keyword arguments as reader
    # options, so a typo such as `foramt=` is silently ignored.
    return spark.read.load(f'{SRC_PARQUET_DIR}/{folder}/part-*.snappy.parquet', format='parquet')


accounts_df = read_bronze_table('Accounts')
major_df = read_bronze_table('Major')
manager_df = read_bronze_table('Manager')
office_df = read_bronze_table('Office')
sales_agent_df = read_bronze_table('Sales Agent')
sales_pipeline_df = read_bronze_table('Sales PipeLine')
product_df = read_bronze_table('Products')
regional_office_df = read_bronze_table('Regional Office')
sector_df = read_bronze_table('Sector')
series_df = read_bronze_table('Series')

StatementMeta(Depi, 7, 3, Finished, Available, Finished)

In [3]:
# Quick visual sanity check: first 10 rows of the raw Accounts table.
accounts_preview = accounts_df.limit(10)
display(accounts_preview)

StatementMeta(Depi, 7, 4, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 51519910-98c7-45d3-bba6-9608ad14c16e)

# Clean data (Preprocessing)

- Drop duplicate rows
- Replace NULLs in numeric columns with 0
- Trim leading/trailing whitespace from string columns

In [4]:
# Bronze-layer DataFrames that go through the generic cleaning step below.
data_frames = [
    accounts_df,
    major_df,
    manager_df,
    office_df,
    sales_agent_df,
    sales_pipeline_df,
    product_df,
    regional_office_df,
    sector_df,
    series_df,
]

def transform_and_clean_data(df):
    """Clean and transform the Spark DataFrame.

    Steps:
      1. Drop exact duplicate rows.
      2. Replace NULLs in int / bigint / double columns with 0.
      3. Trim leading and trailing whitespace in string columns.

    Parameters:
    df (pyspark.sql.DataFrame): The DataFrame to clean.

    Returns:
    pyspark.sql.DataFrame: A new, cleaned DataFrame (Spark DataFrames are
    immutable, so the caller must use the returned value).
    """
    # 1. Drop duplicate rows
    df = df.dropDuplicates()

    # Detect column types from `df.dtypes` (simple names such as 'int',
    # 'bigint', 'double', 'string'), which is stable across Spark versions.
    # Comparing `str(field.dataType)` to "IntegerType" is not: newer PySpark
    # renders it as "IntegerType()", so no column would ever match.
    numeric_type_names = {"int", "bigint", "double"}
    numerical_cols = [name for name, dtype in df.dtypes if dtype in numeric_type_names]
    str_cols = {name for name, dtype in df.dtypes if dtype == "string"}

    # 2. Fill missing numerical columns with 0
    if numerical_cols:
        df = df.fillna(0, subset=numerical_cols)

    # 3. Trim whitespace from string columns in a single projection,
    #    keeping the original column order and names.
    if str_cols:
        df = df.select([
            trim(col(name)).alias(name) if name in str_cols else col(name)
            for name in df.columns
        ])

    return df


# Rebinding the loop variable (`df = ...`) would discard every cleaned result,
# so collect the cleaned frames and bind them back to their names. The
# unpacking order must match the order of `data_frames` defined above.
cleaned_frames = [transform_and_clean_data(df) for df in data_frames]
(
    accounts_df, major_df, manager_df,
    office_df, sales_agent_df, sales_pipeline_df,
    product_df, regional_office_df,
    sector_df, series_df,
) = cleaned_frames
data_frames = cleaned_frames

StatementMeta(Depi, 7, 5, Finished, Available, Finished)

# Data Generation

Generate random dates (between 2020-01-01 and 2023-12-31) for the `StartDate` column of the `Product` table.

In [5]:
def generate_random_date(start_date, end_date, rng=None):
    """
    Generate a random date between two given dates (both inclusive).

    Parameters:
    start_date (str): The start date in 'YYYY-MM-DD' format.
    end_date (str): The end date in 'YYYY-MM-DD' format.
    rng (random.Random, optional): Source of randomness. Pass a seeded
        ``random.Random`` for reproducible output; defaults to the global
        ``random`` module generator.

    Returns:
    datetime: A random date (at midnight) between start_date and end_date.

    Raises:
    ValueError: If a date string is not 'YYYY-MM-DD' or end_date is before start_date.
    """
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    span_days = (end - start).days
    if span_days < 0:
        raise ValueError(f"end_date {end_date!r} is before start_date {start_date!r}")

    source = random if rng is None else rng
    return start + timedelta(days=source.randint(0, span_days))

START_DATE_MIN = "2020-01-01"
START_DATE_MAX = "2023-12-31"
START_DATE_SEED = 42

# Retained so any other cell referencing it keeps working. It is declared
# non-deterministic because Spark assumes UDFs are deterministic by default.
generate_random_date_udf = udf(
    lambda: generate_random_date(START_DATE_MIN, START_DATE_MAX), DateType()
).asNondeterministic()

# Number of candidate days, both bounds inclusive.
start_date_span = (
    datetime.strptime(START_DATE_MAX, "%Y-%m-%d")
    - datetime.strptime(START_DATE_MIN, "%Y-%m-%d")
).days + 1

# An unseeded Python UDF is re-evaluated on every action, so the rows shown by
# `display` would not match the rows written to the Silver layer, and re-runs
# would differ. Spark's seeded `rand` gives repeatable dates (assuming the
# input partitioning does not change) and avoids Python UDF overhead.
product_df = product_df.withColumn(
    'StartDate',
    expr(
        f"date_add(DATE'{START_DATE_MIN}', "
        f"CAST(FLOOR(rand({START_DATE_SEED}) * {start_date_span}) AS INT))"
    ),
)

display(product_df.limit(10))

StatementMeta(Depi, 7, 6, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, df2fdf2b-aaf5-4b80-972a-bd6b5ed2a572)

# Final Changes

- Replaced `engage_date` and `close_date` values earlier than 1900-01-01 in the `Sales_PipeLine` table with 2004-01-01
- Saved all tables as Parquet files in the Silver Layer folder

In [7]:
# Values before MIN_VALID_DATE are replaced with DATE_REPLACEMENT.
# NOTE(review): these presumably are placeholder/sentinel dates in the Bronze
# data — confirm with the data owner.
MIN_VALID_DATE = "1900-01-01"
DATE_REPLACEMENT = "2004-01-01"

for date_col in ("engage_date", "close_date"):
    sales_pipeline_df = sales_pipeline_df.withColumn(
        date_col,
        when(
            col(date_col) < MIN_VALID_DATE,
            # Cast the replacement to DATE: a bare string branch lets Spark
            # coerce the whole CASE WHEN result to STRING when the column is
            # a DATE. (The Bronze column type is not visible here.)
            lit(DATE_REPLACEMENT).cast("date"),
        ).otherwise(col(date_col)),
    )

# Silver-layer folder name -> DataFrame to persist (written in this order).
silver_tables = {
    'Accounts': accounts_df,
    'Major': major_df,
    'Manager': manager_df,
    'Office': office_df,
    'Sales Agent': sales_agent_df,
    'Sales PipeLine': sales_pipeline_df,
    'Regional Office': regional_office_df,
    'Sector': sector_df,
    'Series': series_df,
    'Products': product_df,
}

for folder_name, table_df in silver_tables.items():
    table_df.write.mode("overwrite").parquet(f'{DIST_PARQUET_DIR}/{folder_name}/')

StatementMeta(Depi, 7, 8, Finished, Available, Finished)