In [None]:
from pyspark.sql import SparkSession

# Start (or reuse, via getOrCreate) the Spark session that every later cell uses as `spark`.
spark = (
    SparkSession.builder
    .appName("OlistECommerceAnalysis")
    .getOrCreate()
)

print("PySpark installed and SparkSession created successfully!")
print(f"Spark version: {spark.version}")


In [None]:
input_path = "/kaggle/input/brazilian-ecommerce/"


def _read_olist_csv(file_name):
    """Read one CSV from `input_path`, using its first row as the header and inferring column types."""
    return spark.read.csv(input_path + file_name, header=True, inferSchema=True)


# One DataFrame per source table, loaded in the same order as before.
orders_df = _read_olist_csv("olist_orders_dataset.csv")
order_items_df = _read_olist_csv("olist_order_items_dataset.csv")
customers_df = _read_olist_csv("olist_customers_dataset.csv")
products_df = _read_olist_csv("olist_products_dataset.csv")
order_payments_df = _read_olist_csv("olist_order_payments_dataset.csv")
sellers_df = _read_olist_csv("olist_sellers_dataset.csv")

print("Successfully loaded 6 data tables!")


In [None]:
# Inspect the inferred column types and a handful of sample rows of the orders table.
preview_label = "Orders"
print(f"--- {preview_label} Table Schema ---")
orders_df.printSchema()

print(f"\n--- First 5 rows of {preview_label} Data ---")
orders_df.show(5)

In [None]:
# Inspect the inferred column types and a handful of sample rows of the order-items table.
preview_label = "Order Items"
print(f"--- {preview_label} Table Schema ---")
order_items_df.printSchema()

print(f"\n--- First 5 rows of {preview_label} Data ---")
order_items_df.show(5)


In [None]:
# Inner join on order_id: orders without item rows are dropped, and an order with several
# item rows appears once per item row.
merged_df = orders_df.join(order_items_df, on="order_id", how="inner")
merged_row_count = merged_df.count()
print(f"Number of rows in the merged table: {merged_row_count}")

print("\n--- Merged Table Schema ---")
merged_df.printSchema()

print("\n--- First 5 rows of the Merged Table ---")
merged_df.show(5)


In [None]:
from pyspark.sql.functions import col

# Keep only rows whose order reached the "delivered" status.
delivered_df = merged_df.filter(col("order_status") == "delivered")

# merged_df comes from joining orders with their item rows, so an order with several items
# occupies several rows: count distinct order_id for orders, and report item rows separately.
n_delivered_orders = delivered_df.select("order_id").distinct().count()
n_delivered_items = delivered_df.count()
print(f"Number of delivered orders: {n_delivered_orders}")
print(f"Number of delivered order items: {n_delivered_items}")

print("\n--- Checking for Null Values ---")
# Cast each is-null flag to 0/1 and sum over all rows -> number of nulls per column.
null_check_columns = ["order_purchase_timestamp", "price", "freight_value"]
delivered_df.select(
    [col(c).isNull().cast("integer").alias(c) for c in null_check_columns]
).groupBy().sum().show()


In [None]:
from pyspark.sql.functions import col

# The ten highest-priced delivered item rows.
top_10_expensive_items = (
    delivered_df
    .orderBy(col("price").desc())
    .limit(10)
)
print("--- Top 10 Most Expensive Products Sold ---")
top_10_expensive_items.select("order_id", "product_id", "price").show()


In [None]:
from pyspark.sql.functions import sum, col, desc

# Attach customer location to each delivered item row, then total item price per (city, state).
full_df = delivered_df.join(customers_df, on="customer_id", how="inner")
revenue_by_city = (
    full_df
    .groupBy("customer_city", "customer_state")
    .agg(sum("price").alias("total_revenue"))
)
top_10_cities_by_revenue = revenue_by_city.orderBy(desc("total_revenue")).limit(10)

print("--- Top 10 Cities by Total Revenue ---")
top_10_cities_by_revenue.show()


In [None]:
from pyspark.sql.functions import year, month, sum, col, desc

# Split the purchase timestamp into calendar year and month columns.
purchase_ts = col("order_purchase_timestamp")
sales_by_month_df = (
    full_df
    .withColumn("year", year(purchase_ts))
    .withColumn("month", month(purchase_ts))
)

# Total item price per (year, month), ranked from highest to lowest.
revenue_by_month = sales_by_month_df.groupBy("year", "month").agg(sum("price").alias("total_revenue"))
top_months_by_revenue = revenue_by_month.orderBy(desc("total_revenue"))

print("--- Top Months by Total Revenue (Across All Years) ---")
top_months_by_revenue.show(15)


In [None]:
from pyspark.sql.functions import col, when

# Item price left over after subtracting its freight charge.
df_with_revenue = full_df.withColumn("revenue_after_freight", col("price") - col("freight_value"))

# Percentage of the price remaining after freight. A zero price would divide by zero (null in
# non-ANSI Spark, an error under ANSI mode), so such rows get an explicit null margin instead.
financial_df = df_with_revenue.withColumn(
    "margin_percentage",
    when(col("price") != 0, (col("revenue_after_freight") / col("price")) * 100),
)

print("--- Top 10 Orders by Profit Margin (Best) ---")
top_margins = financial_df.orderBy(col("margin_percentage").desc_nulls_last())
top_margins.select("order_id", "price", "freight_value", "margin_percentage").show(10)

print("\n--- Bottom 10 Orders by Profit Margin (Worst) ---")
# Plain asc() sorts nulls first, which would fill the "worst" list with rows that have no margin.
bottom_margins = financial_df.orderBy(col("margin_percentage").asc_nulls_last())
bottom_margins.select("order_id", "price", "freight_value", "margin_percentage").show(10)



In [None]:
from pyspark.sql.functions import col

target_category = "cama_mesa_banho"

# Attach product attributes (including product_category_name) to every item row.
final_df = financial_df.join(products_df, on="product_id", how="inner")
category_df = final_df.filter(col("product_category_name") == target_category)

print(f"--- Top 3 Highest Value Orders in Category '{target_category}' ---")
top_3_in_category = (
    category_df
    .orderBy(col("price").desc())
    .limit(3)
)
top_3_in_category.select("order_id", "product_id", "product_category_name", "price").show()


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# --- Bar chart: top 10 (city, state) pairs by revenue ---
top_cities_pandas = top_10_cities_by_revenue.toPandas()
# Revenue was grouped by (city, state). Label each bar with both, so two same-named cities in
# different states stay separate bars instead of being averaged into one by seaborn.
top_cities_pandas["city_label"] = (
    top_cities_pandas["customer_city"].astype(str)
    + " ("
    + top_cities_pandas["customer_state"].astype(str)
    + ")"
)
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(x="total_revenue", y="city_label", data=top_cities_pandas, orient="h", ax=ax)
ax.set_title("Top 10 Cities by Total Revenue", fontsize=16)
ax.set_xlabel("Total Revenue", fontsize=12)
ax.set_ylabel("City", fontsize=12)
plt.show()

# --- Line chart: total revenue per calendar month ---
# Rows with a null year/month (null purchase timestamp) cannot be placed on a time axis.
revenue_by_month_pandas = (
    revenue_by_month
    .dropna(subset=["year", "month"])
    .orderBy("year", "month")
    .toPandas()
)
# Assemble first-of-month dates from the integer parts instead of parsing "YYYY-M" strings.
revenue_by_month_pandas["date"] = pd.to_datetime(
    revenue_by_month_pandas[["year", "month"]].assign(day=1)
)
fig, ax = plt.subplots(figsize=(15, 7))
sns.lineplot(x="date", y="total_revenue", data=revenue_by_month_pandas, marker="o", ax=ax)
ax.set_title("Total Monthly Revenue (2016-2018)", fontsize=16)
ax.set_xlabel("Date", fontsize=12)
ax.set_ylabel("Total Revenue", fontsize=12)
ax.tick_params(axis="x", labelrotation=45)
ax.grid(True)
plt.show()


In [None]:
from pyspark.sql.functions import collect_list, size, col

print("Starting to group products by order...")
# One row per order holding the list of product_ids on it (repeated products are kept).
basket_data = (
    final_df
    .groupBy("order_id")
    .agg(collect_list("product_id").alias("items"))
)

print("Grouping complete!")
print("\n--- Sample of Basket Data (each row is one order) ---")
basket_data.select("items").show(10, truncate=False)

has_several_items = size(col("items")) > 1
orders_with_multiple_items = basket_data.filter(has_several_items)
multi_item_order_count = orders_with_multiple_items.count()
print(f"\nAdditional Info: Number of orders with more than one item: {multi_item_order_count}")

In [None]:
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import array_distinct

# Remove repeated product_ids inside each basket; presumably required because FP-Growth
# expects each transaction's items to be unique.
cleaned_basket_data = basket_data.withColumn("items", array_distinct(col("items")))

print("Training the FP-Growth model with relaxed settings...")
# Deliberately low support/confidence thresholds so that sparse co-purchases still surface.
fpGrowth_relaxed = FPGrowth(
    itemsCol="items",
    minSupport=0.0005,
    minConfidence=0.01,
)
model_relaxed = fpGrowth_relaxed.fit(cleaned_basket_data)
print("Model training complete!")

print("\n--- Association Rules found by the relaxed model ---")
rules_df = model_relaxed.associationRules
rules_df.show(20, truncate=False)

print("\n--- Frequently purchased itemsets (relaxed model) ---")
itemsets_df = model_relaxed.freqItemsets
itemsets_df.show(20, truncate=False)


In [None]:
from pyspark.sql.functions import col, max, countDistinct, sum, datediff, lit
from pyspark.sql import Window
from datetime import timedelta

# Reference "today" for Recency: one day after the latest purchase in the data.
# (datetime.replace(day=day + 1) raised ValueError whenever that purchase fell on a month's last day.)
latest_date = full_df.select(max("order_purchase_timestamp")).first()[0]
current_date = latest_date + timedelta(days=1)
print(f"The 'current date' for Recency calculation is: {current_date}")

# Per the Olist dataset description, every order is issued its own customer_id, so grouping by it
# makes frequency 1 for everyone. customer_unique_id identifies the actual person across orders.
rfm_intermediate = full_df.groupBy("customer_unique_id").agg(
    sum("price").alias("monetary"),                              # total item price spent
    countDistinct("order_id").alias("frequency"),                # number of distinct orders
    max("order_purchase_timestamp").alias("last_purchase_date"),
)

rfm_df = (
    rfm_intermediate
    # Days between the reference date and the customer's most recent purchase.
    .withColumn("recency", datediff(lit(current_date), col("last_purchase_date")))
    # Later cells select "customer_id"; expose the person-level key under that name as well.
    .withColumn("customer_id", col("customer_unique_id"))
)

print("\n--- RFM metrics for each customer ---")
rfm_df.select("customer_unique_id", "recency", "frequency", "monetary").show(10)



In [None]:
from pyspark.sql.functions import ntile
from pyspark.sql import Window

# Quartile scores 1-4 where 4 is best: smallest recency (most recent purchase), highest frequency,
# highest monetary. None of the windows has a partitionBy, so each ranks across all customers.
recency_window = Window.orderBy(col("recency").desc())
frequency_window = Window.orderBy(col("frequency").asc())
monetary_window = Window.orderBy(col("monetary").asc())

score_specs = [
    ("r_score", recency_window),
    ("f_score", frequency_window),
    ("m_score", monetary_window),
]
rfm_scores = rfm_df
for score_name, window_spec in score_specs:
    rfm_scores = rfm_scores.withColumn(score_name, ntile(4).over(window_spec))

print("--- RFM Scores for each customer ---")
rfm_scores.select("customer_id", "recency", "frequency", "monetary", "r_score", "f_score", "m_score").show(10)


In [None]:
from pyspark.sql.functions import concat, when

# Three-character code such as "434" built from the R, F and M quartile scores.
rfm_segments = rfm_scores.withColumn(
    "rfm_segment",
    concat(col("r_score"), col("f_score"), col("m_score")),
)

# Ordered segment rules: the first matching rule wins; any code not listed becomes "Others".
segment_rules = [
    ("Champions", ["444", "443", "434", "344"]),
    ("Loyal Customers", ["333", "334", "343", "433"]),
    ("Potential Loyalists", ["441", "442", "431", "432", "422", "421", "341", "342", "331", "332"]),
    ("New Customers", ["411", "412", "423"]),
    ("At Risk", ["244", "243", "234", "144", "143", "134"]),
    ("Hibernating", ["111", "112", "121", "122", "211", "212", "221", "222"]),
    ("About to Sleep", ["133", "132", "142", "241", "231", "242", "232"]),
]

segment_name_expr = None
for label, codes in segment_rules:
    matches = col("rfm_segment").isin(*codes)
    if segment_name_expr is None:
        segment_name_expr = when(matches, label)
    else:
        segment_name_expr = segment_name_expr.when(matches, label)

final_rfm = rfm_segments.withColumn("segment_name", segment_name_expr.otherwise("Others"))

print("--- Final Customer Segments ---")
final_rfm.select("customer_id", "recency", "frequency", "monetary", "rfm_segment", "segment_name").show(20)
print("\n--- Customer Count by Segment ---")
final_rfm.groupBy("segment_name").count().orderBy(col("count").desc()).show()


In [None]:
from pyspark.sql.functions import col

output_path = "/kaggle/working/"

print(f"Starting to save final DataFrames to CSV files in: {output_path}")

# Unordered table: repartition(1) simply funnels all rows into a single part-file.
final_rfm.repartition(1).write.csv(output_path + "rfm_customer_segments", header=True, mode="overwrite")

# Ordered tables: repartition(1) is a full shuffle and does not preserve a preceding sort, so the
# written file could come out unsorted. coalesce(1) merges the sorted partitions without a shuffle.
segment_counts = final_rfm.groupBy("segment_name").count().orderBy(col("count").desc())
segment_counts.coalesce(1).write.csv(output_path + "segment_counts", header=True, mode="overwrite")
top_10_cities_by_revenue.coalesce(1).write.csv(output_path + "top_cities_revenue", header=True, mode="overwrite")
# revenue_by_month carries no order of its own; export it chronologically.
revenue_by_month.orderBy("year", "month").coalesce(1) \
    .write.csv(output_path + "monthly_revenue", header=True, mode="overwrite")

# Item rows whose freight charge exceeds the item price.
negative_margin_orders = financial_df.filter(col("margin_percentage") < 0)
negative_margin_orders.repartition(1).write.csv(output_path + "negative_margin_orders", header=True, mode="overwrite")

print("\nAll files have been saved successfully!")
print("You can now find them in the 'Output' section of your Kaggle notebook.")

