**TRANSFORMATION & DATA CLEANING** 

In [6]:
from pyspark.sql import functions as F
import pandas as pd
from pyspark.sql.types import DateType
from functools import reduce
from pyspark.sql import DataFrame

StatementMeta(, 09f8b1fd-40c7-447a-80e7-a586d44f5eff, 8, Finished, Available, Finished)

In [7]:
# Source tables to load from the attached catalog.
table_names = [
    "brands", "budget", "budget_rate", "customers", "employees",
    "exchange_rate", "forecast", "invoice_doc_type", "invoices", "order_doc_type",
    "order_status", "orders", "products", "regions",
]

# Map each table name to the DataFrame returned by spark.table(name), so later
# cells can look tables up as dfs["orders"], dfs["budget"], ...
dfs = {name: spark.table(name) for name in table_names}

StatementMeta(, 09f8b1fd-40c7-447a-80e7-a586d44f5eff, 9, Finished, Available, Finished)

In [10]:
# Creation of Date Dimension
#
# Builds a contiguous daily calendar spanning the earliest to the latest date found
# in the fact tables' date columns, enriches it with calendar attributes, and saves
# it as the Delta table SpaceParts_Test.Dim_Date (overwriting any previous version).

# Fact tables and the column in each that holds the date to cover.
fact_tables_and_date_columns = {
    "orders": "order_date",
    "invoices": "billing_date",
    "forecast": "forecast_month",
    "budget": "month",
}

# One single-column ("date") DataFrame per fact table that is loaded and has the
# expected column. Each column is normalized with to_date so that the union below
# has a single consistent type and min/max compare calendar dates, whatever the
# source column type (date, timestamp or string) is.
date_dfs = []
for table_name, date_col_name in fact_tables_and_date_columns.items():
    if table_name not in dfs or date_col_name not in dfs[table_name].columns:
        # Report instead of silently ignoring, so a renamed column is noticed.
        print(f"Skipping {table_name}.{date_col_name}: table or column not found.")
        continue
    date_dfs.append(
        dfs[table_name].select(F.to_date(F.col(date_col_name)).alias("date"))
    )

if not date_dfs:
    raise ValueError("There are no valid dates.")

# Stack every fact-table date column into one "date" column.
all_dates_df = reduce(DataFrame.unionByName, date_dfs)

# Overall date range (min/max ignore nulls).
min_max_dates = all_dates_df.agg(
    F.min("date").alias("min_date"),
    F.max("date").alias("max_date"),
).first()

start_date = min_max_dates["min_date"]
end_date = min_max_dates["max_date"]

# Both come back as None when every collected date value is null.
if start_date is None or end_date is None:
    raise ValueError("It is not possible to determine the date range.")

# One row per calendar day, both endpoints included. Built with the DataFrame API
# from ISO date strings rather than by interpolating values into a SQL statement.
dim_date = spark.range(1).select(
    F.explode(
        F.sequence(
            F.to_date(F.lit(start_date.isoformat())),
            F.to_date(F.lit(end_date.isoformat())),
            F.expr("interval 1 day"),
        )
    ).alias("Date")
)

# Calendar attributes derived from Date.
dim_date = (
    dim_date
    .withColumn("DateKey", F.date_format(F.col("Date"), "yyyyMMdd").cast("int"))  # e.g. 20240131
    .withColumn("Year", F.year(F.col("Date")))
    .withColumn("Quarter", F.concat(F.lit("Q"), F.quarter(F.col("Date"))))  # "Q1".."Q4"
    .withColumn("Month", F.month(F.col("Date")))
    .withColumn("MonthName", F.date_format(F.col("Date"), "MMMM"))
    .withColumn("Day", F.dayofmonth(F.col("Date")))
    .withColumn("DayOfWeek", F.dayofweek(F.col("Date")))  # Spark: 1 = Sunday ... 7 = Saturday
    .withColumn("DayName", F.date_format(F.col("Date"), "EEEE"))
    .withColumn("DayOfYear", F.dayofyear(F.col("Date")))
    .withColumn("WeekOfYear", F.weekofyear(F.col("Date")))
    .withColumn("YearMonth", F.date_format(F.col("Date"), "yyyy-MM"))
)

# Save Dim_Date
table_name_to_save = "Dim_Date"
dim_date.write.mode("overwrite").format("delta").saveAsTable(f"SpaceParts_Test.{table_name_to_save}")

StatementMeta(, 09f8b1fd-40c7-447a-80e7-a586d44f5eff, 12, Finished, Available, Finished)

In [11]:
# Dimension & Schemas of tables
# For every loaded table, print its row x column counts followed by its schema.
# Note: frame.count() runs a Spark job per table.
for name, frame in dfs.items():
    n_rows, n_cols = frame.count(), len(frame.columns)
    print(f"\n Table: {name} ")
    print(f"Dimensions: {n_rows} rows x {n_cols} columns")
    print("Schema (Data Types):")
    frame.printSchema()

StatementMeta(, 09f8b1fd-40c7-447a-80e7-a586d44f5eff, 13, Finished, Available, Finished)


 Table: brands 
Dimensions: 20 rows x 6 columns
Schema (Data Types):
root
 |-- flagship: string (nullable = true)
 |-- class: string (nullable = true)
 |-- type: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- sub_brand: string (nullable = true)
 |-- product_brand_vp: string (nullable = true)


 Table: budget 
Dimensions: 2947811 rows x 5 columns
Schema (Data Types):
root
 |-- month: date (nullable = true)
 |-- total_budget: double (nullable = true)
 |-- customer_key: string (nullable = true)
 |-- product_key: integer (nullable = true)
 |-- dwcreateddate: timestamp (nullable = true)


 Table: budget_rate 
Dimensions: 15 rows x 4 columns
Schema (Data Types):
root
 |-- rate: decimal(10,5) (nullable = true)
 |-- from_currency: string (nullable = true)
 |-- to_currency: string (nullable = true)
 |-- currency_system: string (nullable = true)


 Table: customers 
Dimensions: 3911 rows x 9 columns
Schema (Data Types):
root
 |-- customer_key: string (nullable = true)
 |-- c

In [12]:
# Preview every table: show its name, then the notebook's DataFrame widget for it.
for name, frame in dfs.items():
    display(name)
    display(frame)

StatementMeta(, 09f8b1fd-40c7-447a-80e7-a586d44f5eff, 14, Finished, Available, Finished)

'brands'

SynapseWidget(Synapse.DataFrame, be0f3408-beb7-43e5-94e2-6654c0848d32)

'budget'

SynapseWidget(Synapse.DataFrame, 5cbbc498-abce-48bc-8814-bbdb7f69ac4c)

'budget_rate'

SynapseWidget(Synapse.DataFrame, d63c37ad-a2f3-4bfe-b0b5-be32512c22c2)

'customers'

SynapseWidget(Synapse.DataFrame, a57d8d33-7e9f-451d-913b-a2a5ac73fcad)

'employees'

SynapseWidget(Synapse.DataFrame, aeee7e3e-69b4-440d-aaca-84485737d2a3)

'exchange_rate'

SynapseWidget(Synapse.DataFrame, ca929bae-8507-422e-b766-32d5760261e4)

'forecast'

SynapseWidget(Synapse.DataFrame, 9db76db9-63b7-42ef-9b25-d628ef0fbea7)

'invoice_doc_type'

SynapseWidget(Synapse.DataFrame, 679afa5e-e515-4124-ae5d-ea582288d9b6)

'invoices'

SynapseWidget(Synapse.DataFrame, 3dc3038b-79d0-4772-85ff-f71589d2d63a)

'order_doc_type'

SynapseWidget(Synapse.DataFrame, 367a6dcc-fb7b-420d-ae75-960c784443b4)

'order_status'

SynapseWidget(Synapse.DataFrame, 0f5a54b7-7643-41c3-b8d3-f5766a20d1d9)

'orders'

SynapseWidget(Synapse.DataFrame, 3e1d827c-72c0-4fa0-b0cb-5093d2d7953a)

'products'

SynapseWidget(Synapse.DataFrame, d551182e-ee98-4c82-b9ce-b0ad2c74ec66)

'regions'

SynapseWidget(Synapse.DataFrame, a6426ffa-fd05-4a3b-a6c9-4062b013ab3a)

In [None]:
# Summary statistics per table via DataFrame.describe() (count, mean, stddev,
# min, max). This computes over each full table, so it may be slow on large ones.
for name, frame in dfs.items():
    print(name)
    stats = frame.describe()
    display(stats)

StatementMeta(, 09f8b1fd-40c7-447a-80e7-a586d44f5eff, -1, Cancelled, , Cancelled)

In [None]:
# Look for null values
# For each table, count nulls per column in a single aggregation, then display only
# the columns that contain nulls, highest count first.
for name, frame in dfs.items():
    print(name)
    null_exprs = [
        F.sum(F.col(col).isNull().cast("int")).alias(col)
        for col in frame.columns
    ]
    # One row, one column per source column, holding that column's null count.
    null_counts_pd = frame.agg(*null_exprs).toPandas()

    # Row total of the single row = total nulls in the table.
    if null_counts_pd.sum(axis=1).iloc[0] == 0:
        print("No nulls")
        continue

    # Wide -> long: one (column_name, null_count) row per source column.
    long_report = null_counts_pd.melt(var_name="column_name", value_name="null_count")

    display(
        long_report[long_report["null_count"] > 0]
        .sort_values(by="null_count", ascending=False)
    )

StatementMeta(, 09f8b1fd-40c7-447a-80e7-a586d44f5eff, -1, Cancelled, , Cancelled)

In [None]:
# Look for duplicates
# For each table, compare the total row count with the count of distinct rows
# (all columns compared) and report how many fully duplicated rows exist.
# Note: this runs two counting jobs per table (count and distinct().count()).
for table_name, df in dfs.items():
    total_rows = df.count()
    distinct_rows = df.distinct().count()
    duplicate_rows = total_rows - distinct_rows

    if duplicate_rows > 0:
        print(
            f"{table_name}: {duplicate_rows} duplicate rows "
            f"({total_rows} total, {distinct_rows} distinct)."
        )
    else:
        print(f"{table_name}: no duplicate rows ({total_rows} total).")

StatementMeta(, 09f8b1fd-40c7-447a-80e7-a586d44f5eff, -1, Cancelled, , Cancelled)

In [None]:
# Eliminate duplicates of DataFrame "invoices"
# dropDuplicates() with no subset keeps one copy of each fully identical row.
# The dict entry is replaced, so later cells reading dfs["invoices"] see the
# de-duplicated DataFrame.
invoices_deduplicated = dfs["invoices"].dropDuplicates()
dfs["invoices"] = invoices_deduplicated

StatementMeta(, 09f8b1fd-40c7-447a-80e7-a586d44f5eff, -1, Cancelled, , Cancelled)