In [0]:
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session for this notebook in one chained call,
# so `spark` is only ever bound to the session itself.
spark = (
    SparkSession.builder
    .appName("Spark DataFrames")
    .getOrCreate()
)


In [0]:
# Bronze load of the 2009 CSV: first row is the header, schema inference is off
# (so every column is read as a string).
df_bronze_2009 = (
    spark.read
    .option("header", True)
    .option("inferSchema", False)
    .csv("/Volumes/workspace/default/online_retail/online_retail_II_2009.csv")
)


In [0]:
# Bronze load of the 2011 CSV: first row is the header, schema inference is off
# (so every column is read as a string).
df_bronze_2011 = (
    spark.read
    .option("header", True)
    .option("inferSchema", False)
    .csv("/Volumes/workspace/default/online_retail/online_retail_II_2011.csv")
)

In [0]:
# Print both schemas, 2009 first, so the two extracts can be compared by eye.
for bronze_frame in (df_bronze_2009, df_bronze_2011):
    bronze_frame.printSchema()

In [0]:
# True when both extracts expose the same column names in the same order.
df_bronze_2009.columns == df_bronze_2011.columns

In [0]:
# Only the last expression of a cell is rendered, so return both row counts
# together; a bare first `count()` would run a Spark job and drop its result.
{
    "df_bronze_2009": df_bronze_2009.count(),
    "df_bronze_2011": df_bronze_2011.count(),
}

In [0]:
from pyspark.sql.functions import lit

# Tag each extract with a constant `source_period` label so rows remain
# attributable to their input file after the two frames are combined.
df_bronze_2009 = df_bronze_2009.withColumn("source_period", lit("2009-2010"))
df_bronze_2011 = df_bronze_2011.withColumn("source_period", lit("2010-2011"))


In [0]:
# Stack the two extracts, matching columns by name rather than by position.
df_bronze_combined = df_bronze_2009.unionByName(df_bronze_2011)


In [0]:
# Row-count invariant: the union must hold exactly the rows of both inputs.
combined_rows = df_bronze_combined.count()
expected_rows = df_bronze_2009.count() + df_bronze_2011.count()
assert combined_rows == expected_rows


In [0]:
# Row count per source period in the combined frame.
(
    df_bronze_combined
    .groupBy("source_period")
    .count()
    .show()
)


In [0]:
# Rename three source columns to snake_case. `withColumnRenamed` is a no-op
# when the old name is absent, so re-running this cell is harmless.
bronze_renames = {
    "Customer ID": "customer_id",
    "InvoiceDate": "invoice_date",
    "StockCode": "stock_code",
}
for old_name, new_name in bronze_renames.items():
    df_bronze_combined = df_bronze_combined.withColumnRenamed(old_name, new_name)




In [0]:
# Persist the combined frame as the bronze table, replacing any previous contents.
bronze_writer = df_bronze_combined.write.mode("overwrite")
bronze_writer.saveAsTable("bronze_online_retail")



In [0]:
# Read the bronze table back from the catalog and preview its first rows.
df = spark.table("bronze_online_retail")
df.show()


In [0]:
# Total number of rows in the bronze table.
df.count()


In [0]:
# Number of columns in the bronze table.
len(df.columns)

In [0]:
# Column names and types of the bronze table as stored.
df.printSchema()


In [0]:
from pyspark.sql.functions import col, sum

# One output column per input column, each holding that column's null count.
# `sum` here is pyspark.sql.functions.sum (imported in this cell), not the builtin.
null_counts = [
    sum(col(name).isNull().cast("int")).alias(name)
    for name in df.columns
]
null_profile = df.select(null_counts)

null_profile.show(truncate=False)


In [0]:
# Ten invoices with the most rows.
(
    df
    .groupBy("invoice")
    .count()
    .orderBy(col("count").desc())
    .show(10)
)


In [0]:
# Number of rows whose invoice value starts with "C".
invoice_starts_with_c = col("invoice").startswith("C")
df.where(invoice_starts_with_c).count()


In [0]:
# Bronze columns are loaded as strings (inferSchema is off), so cast explicitly
# before the numeric comparison instead of relying on implicit coercion —
# the same approach the price check in this notebook uses.
df.filter(col("quantity").cast("int") < 0).count()


In [0]:
from pyspark.sql.functions import col

# Number of rows whose price, read as a double, is zero or negative.
price_as_double = col("price").cast("double")
df.where(price_as_double <= 0).count()



In [0]:
# Ten countries with the most rows.
(
    df
    .groupBy("country")
    .count()
    .orderBy(col("count").desc())
    .show(10)
)


In [0]:
# Show five raw invoice_date values untruncated, to inspect their text format.
df.select("invoice_date").show(5, truncate=False)


In [0]:
from pyspark.sql.functions import expr


In [0]:
# Starting point for the silver transformations: the persisted bronze table.
df_bronze = spark.table("bronze_online_retail")


In [0]:
# Convert the numeric columns with try_cast, applied column by column in the
# order quantity, price, customer_id.
silver_casts = {
    "quantity": "int",
    "price": "double",
    "customer_id": "long",
}
df_silver = df_bronze
for column_name, target_type in silver_casts.items():
    df_silver = df_silver.withColumn(
        column_name, expr(f"try_cast({column_name} as {target_type})")
    )


In [0]:
from pyspark.sql.functions import to_timestamp, col
# Parse invoice_date on top of the already-typed silver frame. Building from
# df_bronze here would discard the try_cast conversions of quantity, price and
# customer_id made in the preceding cell and leave those columns as strings.
# NOTE(review): confirm "dd-MM-yyyy HH:mm" matches the date format in the CSVs.
df_silver = df_silver.withColumn(
    "invoice_date",
    to_timestamp(col("invoice_date"), "dd-MM-yyyy HH:mm"),
)



In [0]:
# Only the last expression of a cell is rendered, so return both null counts
# together; a bare first `count()` would be computed and then dropped.
{
    "price_nulls": df_silver.filter(col("price").isNull()).count(),
    "quantity_nulls": df_silver.filter(col("quantity").isNull()).count(),
}


In [0]:
from pyspark.sql.functions import when

# Flag rows whose invoice value starts with "C". When the test evaluates to
# null, the `otherwise` branch applies and the flag is False.
invoice_starts_with_c = col("invoice").startswith("C")
df_silver = df_silver.withColumn(
    "is_cancellation",
    when(invoice_starts_with_c, True).otherwise(False),
)


In [0]:
# Flag rows with a negative quantity. When the comparison evaluates to null,
# the `otherwise` branch applies and the flag is False.
quantity_is_negative = col("quantity") < 0
df_silver = df_silver.withColumn(
    "is_return",
    when(quantity_is_negative, True).otherwise(False),
)


In [0]:
# Keep only regular sales lines: positive price, positive quantity, and NOT a
# cancellation. Filtering on `is_cancellation` being true would keep only the
# cancelled invoices, the opposite of a "clean" frame.
# NOTE(review): the silver table in this notebook is written from df_silver,
# not from this frame — confirm whether df_silver_clean should feed it.
df_silver_clean = df_silver.filter(
    (col("price") > 0)
    & (col("quantity") > 0)
    & (~col("is_cancellation"))
)


In [0]:
# Inspect the silver frame's column types after the transformations above.
df_silver.printSchema()

In [0]:
# Expose df_silver to Spark SQL under the temporary view name `retail`.
df_silver.createOrReplaceTempView("retail")


In [0]:
# Add revenue = price * quantity to every row of the `retail` view, then
# persist the result as the silver table, replacing any previous contents.
silver_with_revenue = spark.sql("""
SELECT *,
       CAST(price AS DOUBLE) * CAST(quantity AS INT) AS revenue
FROM retail
""")
silver_with_revenue.write.mode("overwrite").saveAsTable("silver_online_retail")


In [0]:
# All gold aggregations below read from the persisted silver table.
df_gold_base = spark.table("silver_online_retail")


In [0]:
from pyspark.sql.functions import sum

# Grand total of `revenue` over the whole silver table, as a one-row frame.
# `sum` is pyspark.sql.functions.sum (imported in this cell), not the builtin.
total_revenue = df_gold_base.agg(sum("revenue").alias("total_revenue"))

total_revenue.show()


In [0]:
from pyspark.sql.functions import date_format


In [0]:
# Derive a "yyyy-MM" bucket from invoice_date for month-level grouping.
year_month_col = date_format("invoice_date", "yyyy-MM")
df_monthly = df_gold_base.withColumn("year_month", year_month_col)
df_monthly.show()


In [0]:
# Summed revenue per year_month bucket, sorted ascending by year_month.
revenue_by_month = df_monthly.groupBy("year_month").agg(
    sum("revenue").alias("monthly_revenue")
)
monthly_revenue = revenue_by_month.orderBy("year_month")
display(monthly_revenue)


In [0]:
from pyspark.sql.functions import countDistinct


In [0]:
# Per-customer totals: summed revenue and number of distinct invoices,
# sorted with the highest lifetime value first.
clv_metrics = [
    sum("revenue").alias("lifetime_value"),
    countDistinct("invoice").alias("total_orders"),
]
customer_clv = (
    df_gold_base
    .groupBy("customer_id")
    .agg(*clv_metrics)
    .orderBy("lifetime_value", ascending=False)
)
display(customer_clv)



In [0]:
# Summed revenue per (stock_code, description) pair, highest revenue first.
revenue_by_product = df_gold_base.groupBy("stock_code", "description").agg(
    sum("revenue").alias("product_revenue")
)
product_revenue = revenue_by_product.orderBy("product_revenue", ascending=False)
display(product_revenue)


In [0]:
# Summed revenue per country, highest revenue first.
revenue_by_country = df_gold_base.groupBy("country").agg(
    sum("revenue").alias("country_revenue")
)
country_revenue = revenue_by_country.orderBy("country_revenue", ascending=False)
display(country_revenue)

In [0]:
from pyspark.sql.functions import count_distinct, col

# Per-customer summed revenue and distinct-invoice count, highest value first.
# NOTE(review): this rebuilds the same `customer_clv` frame as the earlier cell
# (count_distinct and countDistinct are aliases) and never displays it —
# consider keeping only one of the two cells.
per_customer = df_gold_base.groupBy("customer_id")
customer_clv = per_customer.agg(
    sum("revenue").alias("lifetime_value"),
    count_distinct("invoice").alias("total_orders"),
).orderBy("lifetime_value", ascending=False)