In [0]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already-active session if there is one, otherwise it
# builds a new session with this application name.
spark = (
    SparkSession.builder
    .appName("Olist ecommerce data")
    .getOrCreate()
)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Explicit schema for the orders CSV: three string identifier/status columns
# followed by five timestamp columns. Every field is nullable.
orders_schema = StructType(
    [
        StructField(name, StringType(), True)
        for name in ("order_id", "customer_id", "order_status")
    ]
    + [
        StructField(name, TimestampType(), True)
        for name in (
            "order_purchase_timestamp",
            "order_approved_at",
            "order_delivered_carrier_date",
            "order_delivered_customer_date",
            "order_estimated_delivery_date",
        )
    ]
)


Create the `orders_df` DataFrame using the explicit `orders_schema` defined above.


In [0]:
# Load orders with the explicit schema so the timestamp columns are typed at read time.
orders_df = spark.read.csv(
    "/Volumes/workspace/olist-data-schema/olist-db_volume/olist_orders_dataset.csv",
    schema=orders_schema,
    header=True,
)

In [0]:
# Every remaining raw Olist CSV is read from the same volume directory.
OLIST_DATA_DIR = "/Volumes/workspace/olist-data-schema/olist-db_volume"

# NOTE(review): the public Olist reviews file has free-text review columns that
# contain quoted line breaks and doubled quotes (""). Spark's default CSV reader
# (multiLine=False, escape="\") splits such records into broken rows.
# multiLine=True with escape='"' parses them as standard quoted CSV records.
# Confirm against the copy stored in this volume.
order_reviews_df = spark.read.csv(
    f"{OLIST_DATA_DIR}/olist_order_reviews_dataset.csv",
    header=True,
    inferSchema=True,
    multiLine=True,
    escape='"',
)
products_df = spark.read.csv(f"{OLIST_DATA_DIR}/olist_products_dataset.csv", header=True, inferSchema=True)
sellers_df = spark.read.csv(f"{OLIST_DATA_DIR}/olist_sellers_dataset.csv", header=True, inferSchema=True)
category_name_df = spark.read.csv(f"{OLIST_DATA_DIR}/product_category_name_translation.csv", header=True, inferSchema=True)
order_items_df = spark.read.csv(f"{OLIST_DATA_DIR}/olist_order_items_dataset.csv", header=True, inferSchema=True)
order_payments_df = spark.read.csv(f"{OLIST_DATA_DIR}/olist_order_payments_dataset.csv", header=True, inferSchema=True)
Customer_df = spark.read.csv(f"{OLIST_DATA_DIR}/olist_customers_dataset.csv", header=True, inferSchema=True)
geolocation_df = spark.read.csv(f"{OLIST_DATA_DIR}/olist_geolocation_dataset.csv", header=True, inferSchema=True)

In [0]:
# Quick look at the first 10 parsed orders.
display(orders_df.limit(10))

In [0]:
from pyspark.sql.functions import col, count as spark_count, sum as spark_sum, when

def get_null_counts(df):
    """
    Count the nulls in every column of a DataFrame.

    Parameters:
        df (DataFrame): The input DataFrame.

    Returns:
        DataFrame: A single row with one column per input column (same names),
        each holding that column's number of null values. An empty input
        yields 0 for every column.
    """
    # when() without otherwise() is null for non-matching rows, and count()
    # skips nulls, so each expression counts exactly the null cells. Unlike
    # sum(when(...).otherwise(0)), count() returns 0 rather than null when
    # the DataFrame has no rows.
    null_exprs = [spark_count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]
    return df.select(null_exprs)



In [0]:
# Null count per column of the freshly loaded orders.
display(get_null_counts(orders_df))

In [0]:
from pyspark.sql.functions import lit

# NOTE(review): this deliberately nulls out the order_id of one specific, real
# order. It is presumably a synthetic missing key so that the null-handling
# step below has something to drop. Remove it for real analysis.
# when() with no otherwise() yields null for every row that does not match,
# including the targeted order_id and rows whose order_id is already null.
orders_df = orders_df.withColumn(
    "order_id",
    when(col("order_id") != "e481f51cbdc54678b7cc49136f2d6af7", col("order_id")),
)

### Handling nulls in critical columns

In [0]:
def drop_nulls(df, subset_cols):
    """
    Remove rows that have a null in any of the given columns and report how many were removed.

    Parameters:
        df (DataFrame): The DataFrame to clean.
        subset_cols (list): Column names that must all be non-null for a row to be kept.

    Returns:
        DataFrame: ``df`` without the rows that had a null in ``subset_cols``.
    """
    # Each count() is an action, so the input is evaluated once before and
    # once after the drop to compute the difference.
    rows_before = df.count()
    kept = df.dropna(subset=subset_cols)
    removed = rows_before - kept.count()
    print(f"Dropped {removed} rows based on nulls in {subset_cols}")
    return kept


In [0]:
# Rows without an order id, customer id or purchase time cannot be analysed.
orders_df_cleaned = drop_nulls(
    orders_df,
    ["order_id", "customer_id", "order_purchase_timestamp"],
)
display(orders_df_cleaned)

In [0]:
# Number of cleaned orders per status.
display(orders_df_cleaned.groupBy("order_status").count())

In [0]:
# Which statuses have orders with no approval timestamp?
display(
    orders_df_cleaned
    .where(col("order_approved_at").isNull())
    .groupBy("order_status")
    .count()
)

In [0]:
# Delivered orders with no approval time fall back to their purchase time;
# every other row keeps its existing order_approved_at value.
orders_df_cleaned = orders_df_cleaned.withColumn(
    "order_approved_at",
    when(
        (col("order_status") == "delivered") & col("order_approved_at").isNull(),
        col("order_purchase_timestamp"),
    ).otherwise(col("order_approved_at")),
)

In [0]:
# Re-check missing approval timestamps per status after the backfill.
(
    orders_df_cleaned
    .filter(col("order_approved_at").isNull())
    .groupBy("order_status")
    .count()
    .display()
)

In [0]:
# For delivered/approved/invoiced orders with no carrier hand-off date, use the
# approval timestamp instead; all other rows are left unchanged.
# NOTE(review): order_approved_at was only backfilled for delivered orders, so
# approved/invoiced rows whose approval time is null stay null here.
orders_df_cleaned = orders_df_cleaned.withColumn(
    "order_delivered_carrier_date",
    when(
        col("order_status").isin("delivered", "approved", "invoiced")
        & col("order_delivered_carrier_date").isNull(),
        col("order_approved_at"),
    ).otherwise(col("order_delivered_carrier_date")),
)
(
    orders_df_cleaned
    .where(col("order_delivered_carrier_date").isNull())
    .groupBy("order_status")
    .count()
    .show()
)


In [0]:
# Missing customer-delivery dates per status, before imputing them.
(
    orders_df_cleaned
    .where(col("order_delivered_customer_date").isNull())
    .groupBy("order_status")
    .count()
    .show()
)


### Impute missing customer delivery dates (delivered orders: carrier date + 10 days; other statuses: a placeholder timestamp)

In [0]:
from pyspark.sql.functions import when, col, date_add, lit, expr
from pyspark.sql.types import TimestampType

# Placeholder for orders that never reached the customer. Spark documents the
# TimestampType range as 0001-01-01 .. 9999-12-31, so a year-0000 value is out
# of range. Pre-1582 values can also trip Parquet/Delta datetime rebasing on write.
UNDELIVERED_PLACEHOLDER_TS = "1900-01-01 00:00:00"

orders_df_cleaned = orders_df_cleaned.withColumn(
    "order_delivered_customer_date",
    when(
        (col("order_delivered_customer_date").isNull()) & (col("order_status") == "delivered"),
        # Interval arithmetic keeps the carrier timestamp's time of day;
        # date_add() returns a DATE, which would truncate it to midnight.
        col("order_delivered_carrier_date") + expr("INTERVAL 10 DAYS"),
    ).when(
        (col("order_delivered_customer_date").isNull()) & (col("order_status") != "delivered"),
        lit(UNDELIVERED_PLACEHOLDER_TS).cast(TimestampType()),
    ).otherwise(col("order_delivered_customer_date")),
)


In [0]:
# First 20 rows after all order-date imputations.
display(orders_df_cleaned.limit(20))

In [0]:
# Re-check null customer-delivery dates per status after imputation, then
# show the null count of every column.
(
    orders_df_cleaned
    .where(col("order_delivered_customer_date").isNull())
    .groupBy("order_status")
    .count()
    .show()
)
get_null_counts(orders_df_cleaned).show(n=20, truncate=True)

In [0]:
order_items_df.printSchema()
# Any (order_id, order_item_id) pair that appears more than once is a duplicate line item.
(
    order_items_df
    .groupBy("order_id", "order_item_id")
    .count()
    .filter(col("count") > 1)
    .show()
)


In [0]:
# NOTE(review): this star import shadows Python builtins such as sum, max, min,
# round and abs with their Spark column functions for the rest of the notebook;
# later cells rely on that (e.g. sum/avg/max/min inside .agg()).
from pyspark.sql.functions import *

# Line-item total = price + freight_value, rounded to 2 decimal places.
# Spark's round() defaults to scale 0, which would drop the fractional part.
order_items_df = order_items_df.withColumn(
    "total_value", round(col("price") + col("freight_value"), 2)
)


In [0]:
# Price statistics per seller (sum/avg/max/min are Spark functions from the star import).
order_items_df.groupBy("seller_id").agg(
    sum("price").alias("total_sell_price"),
    avg("price").alias("avg_price"),
    max("price").alias("max_price"),
    min("price").alias("min_price"),
).show()


### Products (`products_df`)

In [0]:
# Quick look at the first 10 raw product rows.
display(products_df.limit(10))

In [0]:
# Null count per column of the raw products table.
get_null_counts(products_df).show(n=20, truncate=True)

In [0]:
# Products whose category, name length, description length and photo count are
# ALL missing, i.e. none of the four columns is populated.
products_df.filter(
    ~(
        col("product_category_name").isNotNull()
        | col("product_name_lenght").isNotNull()
        | col("product_description_lenght").isNotNull()
        | col("product_photos_qty").isNotNull()
    )
).count()


In [0]:
from functools import reduce

# Preview the products that become pure placeholder rows after the fillna
# cleaning step. For those rows, every listed column is either null (and will
# be filled) or already equals its fill value. Matching the fill values
# literally against raw products_df would miss the null rows, because
# "Unknown category"/0 are only written by that later step.
_product_fill_values = {
    "product_category_name": "Unknown category",
    "product_name_lenght": 0,
    "product_description_lenght": 0,
    "product_photos_qty": 0,
    "product_weight_g": 0,
    "product_length_cm": 0,
    "product_height_cm": 0,
    "product_width_cm": 0,
}
products_df.filter(
    reduce(
        lambda acc, cond: acc & cond,
        [col(c).isNull() | (col(c) == v) for c, v in _product_fill_values.items()],
    )
).show()


In [0]:
# Replace missing product attributes with placeholders: a label for the
# category and 0 for the name/description length, photo count, weight and
# dimension columns.
products_df_cleaned = products_df.fillna(
    {
        "product_category_name": "Unknown category",
        **dict.fromkeys(
            [
                "product_name_lenght",
                "product_description_lenght",
                "product_photos_qty",
                "product_weight_g",
                "product_length_cm",
                "product_height_cm",
                "product_width_cm",
            ],
            0,
        ),
    }
)
display(products_df_cleaned)

In [0]:
# Null count per column after filling product placeholders.
get_null_counts(products_df_cleaned).show(n=20, truncate=True)

### Order payments

In [0]:
# First 20 raw payment rows.
order_payments_df.show(n=20, truncate=True)

### Relabel payment types (`boleto` → UPI payment, `not_defined` → cash)

In [0]:
# Relabel payment types: "boleto" -> "UPI_payments" (label previously misspelled
# as "UPI_payements") and "not_defined" -> "cash"; every other type is kept as-is.
order_payments_df_cleaned = order_payments_df.withColumn(
    "payment_type",
    when(col("payment_type") == "boleto", "UPI_payments")
    .when(col("payment_type") == "not_defined", "cash")
    .otherwise(col("payment_type")),
)
order_payments_df_cleaned.groupBy("payment_type").count().show()

In [0]:
# Total, largest and smallest payment value per payment type.
order_payments_df_cleaned.groupBy("payment_type").agg(
    sum("payment_value"),
    max("payment_value"),
    min("payment_value"),
).show()

In [0]:
# Inner join: keeps only cleaned orders that have at least one payment row
# (an order with several payment rows appears once per payment).
order_payment_enriched = orders_df_cleaned.join(order_payments_df_cleaned, "order_id", "inner")
print(
    f"joined rows: {order_payment_enriched.count()}, "
    f"payment rows: {order_payments_df_cleaned.count()}, "
    f"cleaned orders: {orders_df_cleaned.count()}"
)
get_null_counts(order_payment_enriched).show()
# show() prints its table and returns None. Wrapping the calls in print() only
# added a stray "None None" line, so they are called directly.
get_null_counts(order_items_df).show()
get_null_counts(order_payments_df_cleaned).show()

In [0]:
# Joined rows still lacking a customer-delivery date, per status.
(
    order_payment_enriched
    .where(col("order_delivered_customer_date").isNull())
    .groupBy("order_status")
    .count()
    .show()
)


In [0]:
# Customers per state, then the null count of every customer column.
Customer_df.groupBy("customer_state").count().display()
get_null_counts(Customer_df).show(n=20, truncate=True)

In [0]:
from pyspark.sql.functions import when

def _region_from_state(state_col):
    """Build a column expression mapping a state code to its region name ("Unknown" if unmatched or null)."""
    states_by_region = {
        "Southeast": ["SP", "RJ", "MG", "ES"],
        "South": ["RS", "SC", "PR"],
        "Northeast": ["BA", "SE", "AL", "PE", "PB", "RN", "PI", "MA", "CE"],
        "Center-West": ["DF", "GO", "MT", "MS"],
        "North": ["AM", "PA", "AC", "RO", "RR", "TO", "AP"],
    }
    region = None
    for name, states in states_by_region.items():
        in_region = state_col.isin(*states)
        # The first region starts the CASE WHEN chain; later ones extend it.
        region = when(in_region, name) if region is None else region.when(in_region, name)
    return region.otherwise("Unknown")

customers_df_cleaned = Customer_df.withColumn("region", _region_from_state(col("customer_state")))


In [0]:
# Customers per derived region.
(
    customers_df_cleaned
    .groupBy(col("region"))
    .count()
    .show()
)


In [0]:
from pyspark.sql.functions import countDistinct

# How many distinct customer_unique_id values each customer_id maps to, largest
# first. orderBy() only understands the `ascending` keyword; any other keyword
# (e.g. `descending=True`) is silently ignored, so the sort direction must be
# given as ascending=False.
Customer_df.groupBy("customer_id").agg(
    countDistinct("customer_unique_id").alias("unique_id_count")
).orderBy("unique_id_count", ascending=False).show()




In [0]:
# Per-column null counts for sellers, using the shared helper instead of an
# ad-hoc inline expression.
get_null_counts(sellers_df).show()

In [0]:
# Per-column null counts for the cleaned orders, using the shared helper.
get_null_counts(orders_df_cleaned).show()

In [0]:
# Days from purchase to customer delivery, computed for delivered orders only.
# For other statuses, order_delivered_customer_date was filled with a
# placeholder timestamp far in the past, which would produce large negative
# "durations"; those rows get null instead.
dateDifference = orders_df_cleaned.withColumn(
    "date_difference",
    when(
        col("order_status") == "delivered",
        datediff(col("order_delivered_customer_date"), col("order_purchase_timestamp")),
    ),
)
dateDifference.display()