In [0]:
# Load each bronze-layer Olist table into its own DataFrame through one shared reader.
bronze_reader = spark.read

customers_df = bronze_reader.table('olist_ecommerce.bronze.brz_customers')
orders_df = bronze_reader.table('olist_ecommerce.bronze.brz_orders')
order_items_df = bronze_reader.table('olist_ecommerce.bronze.brz_order_items')
products_df = bronze_reader.table('olist_ecommerce.bronze.brz_products')
payments_df = bronze_reader.table('olist_ecommerce.bronze.brz_order_payments')
reviews_df = bronze_reader.table('olist_ecommerce.bronze.brz_order_reviews')
sellers_df = bronze_reader.table('olist_ecommerce.bronze.brz_sellers')
geolocation_df = bronze_reader.table('olist_ecommerce.bronze.brz_geolocation')
product_category_name_translation_df = bronze_reader.table(
    'olist_ecommerce.bronze.brz_product_category_name_translation'
)

In [0]:
# Quick look at the raw (bronze) customers rows before any cleaning.
customers_df.show()

# Identify Missing Values

In [0]:
# NOTE(review): the wildcard import pulls every public name of pyspark.sql.functions
# into the notebook namespace; names that coincide with Python builtins
# (e.g. sum, min, max) would be shadowed — confirm this is intended.
from pyspark.sql.functions import *

# Catalog name; not referenced by any cell visible in this part of the notebook.
catalog_name = 'olist_ecommerce'

In [0]:
# Null counts per column are a basic data-quality check that keeps later
# analysis and decision-making accurate.

def missing_values(df, df_name):
    """Print how many null values each column of `df` contains.

    Args:
        df: Spark DataFrame to inspect.
        df_name: Label used in the printed header line.

    Prints a header followed by a one-row table (via ``show()``) whose columns
    mirror ``df.columns``. Nothing is returned.
    """
    print(f'Missing values in {df_name}:')
    null_counts = []
    for column_name in df.columns:
        # when() without otherwise() yields null for non-null cells, and
        # count() skips nulls, so only the null cells are tallied.
        null_marker = when(col(column_name).isNull(), 1)
        null_counts.append(count(null_marker).alias(column_name))
    df.select(null_counts).show()

In [0]:
# Count nulls per column in customers_df; the printed header uses the label 'customer'.
missing_values(customers_df, 'customer')

In [0]:
# Count nulls per column in orders_df; the printed header uses the label 'orders'.
missing_values(orders_df, 'orders')

In [0]:
# Count nulls per column in order_items_df; the printed header uses the label 'order_item'.
missing_values(order_items_df, 'order_item')

In [0]:
# Count nulls per column in payments_df; the printed header uses the label 'payments'.
missing_values(payments_df,'payments')

# Handle Missing Values

1. Drop rows with missing values (for critical columns)

2. Fill missing values (for numerical columns)

3. Impute missing values (for continuous data)

In [0]:
# Keep only orders whose critical fields are all present: a null in 'order_id',
# 'customer_id' or 'order_status' removes the row.
critical_order_columns = ['order_id', 'customer_id', 'order_status']
orders_df_silver = orders_df.dropna(subset=critical_order_columns)

In [0]:
# Preview the orders left after dropping rows with a null order_id, customer_id or order_status.
orders_df_silver.show()

In [0]:
# Orders with no 'order_delivered_customer_date' are treated as undelivered: their
# null is replaced by the placeholder timestamp '9999-12-31' so the column is never null.

from pyspark.sql.functions import when, col, lit, to_timestamp

delivered_at = col('order_delivered_customer_date')
undelivered_placeholder = to_timestamp(lit('9999-12-31'))

orders_df_silver = orders_df_silver.withColumn(
    'order_delivered_customer_date',
    # Existing dates pass through untouched; only nulls take the placeholder.
    when(delivered_at.isNotNull(), delivered_at).otherwise(undelivered_placeholder),
)

In [0]:
# Preview orders after null delivery dates were replaced with the '9999-12-31' placeholder.
orders_df_silver.show()

# Impute missing values 

In [0]:
# Mean-impute 'payment_value': the fitted model writes the result to a new column,
# 'payment_value_imputed', and leaves the original column in place.
from pyspark.ml.feature import Imputer

imputer = Imputer(
    strategy='mean',
    inputCols=['payment_value'],
    outputCols=['payment_value_imputed'],
)
payment_value_model = imputer.fit(payments_df)
payments_df_silver = payment_value_model.transform(payments_df)

In [0]:
# Preview payments with the added 'payment_value_imputed' column.
payments_df_silver.show()

# Standardizing the format


In [0]:
def print_schema(df, df_name):
    """Print a labelled header line followed by the schema of `df`.

    Args:
        df: Object exposing ``printSchema()`` (a Spark DataFrame in this notebook).
        df_name: Label inserted into the header line.

    Output goes to stdout only; nothing is returned.
    """
    header = f'schema of {df_name}:'
    print(header)
    df.printSchema()

In [0]:
# Show column names and types of the bronze orders table (orders_df, not orders_df_silver).
print_schema(orders_df,'orders')

In [0]:
# Show column names and types of the bronze customers table.
print_schema(customers_df,'customers')

In [0]:
# Show column names and types of the bronze payments table (payments_df, not payments_df_silver).
print_schema(payments_df,'payments')

In [0]:
# Preview the silver orders before 'order_purchase_timestamp' is converted to a date.
orders_df_silver.show()

In [0]:
# Standardize 'order_purchase_timestamp' as a date for further analysis. The column
# keeps its name, and any time-of-day component is dropped by the conversion.
purchase_date = to_date(col('order_purchase_timestamp'))
orders_df_silver = orders_df_silver.withColumn('order_purchase_timestamp', purchase_date)

In [0]:
# Confirm 'order_purchase_timestamp' now shows date values.
orders_df_silver.show()

In [0]:
# Preview payments before the 'payment_type' values are relabelled.
payments_df_silver.show()

In [0]:
# Standardize 'payment_type' values for consistency.
# The column is overwritten in place on payments_df_silver, so each branch also
# accepts its own output label. Without that, running this cell a second time would
# match none of the raw codes and collapse every row to 'other'.
payment_type_col = col('payment_type')
payments_df_silver = payments_df_silver.withColumn(
    'payment_type',
    when(payment_type_col.isin('boleto', 'Bank Transfer'), 'Bank Transfer')
    .when(payment_type_col.isin('credit_card', 'Credit Card'), 'Credit Card')
    .when(payment_type_col.isin('debit_card', 'Debit Card'), 'Debit Card')
    # Anything else, including a null payment_type, is labelled 'other'.
    .otherwise('other'),
)

In [0]:
# Confirm the relabelled 'payment_type' values.
payments_df_silver.show()

In [0]:
# Inspect the bronze customers column types before casting 'customer_zip_code_prefix'.
customers_df.printSchema()

In [0]:
# Preview the bronze customers rows before the zip-code prefix is cast to string.
customers_df.show()

In [0]:
# Store the zip-code prefix as text rather than as a number.
# NOTE(review): if the bronze column is numeric, leading zeros are already lost and
# this cast will not restore them — confirm against the source data.
zip_prefix_as_text = col('customer_zip_code_prefix').cast('string')
customers_df_silver = customers_df.withColumn('customer_zip_code_prefix', zip_prefix_as_text)

In [0]:
# Verify that 'customer_zip_code_prefix' is now typed as string.
customers_df_silver.printSchema()