Objetivo do notebook:
1. Analisar os dados do arquivo _customer_dataset.csv_.
2. Verificar a existência de outliers.
3. Verificar padrões temporais.
4. Relacionar recência com valor (Cross-tabs)

# Imports & Definitions

In [1]:
import pyspark
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from itertools import chain
from functools import reduce

## Session

In [2]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

# Local Spark session with Delta Lake support.
# spark.jars.packages is deliberately NOT set here: configure_spark_with_delta_pip()
# sets it to the delta-spark artifact matching the pip-installed delta-spark
# version, overwriting any value given on the builder. (The old hand-pinned
# "io.delta:delta-core_2.12:3.3.2" was both overwritten and a non-existent
# coordinate — delta-core was renamed delta-spark in Delta 3.0.)
builder = (
    SparkSession.builder
    .master("local[6]")  # run locally with 6 worker threads
    .appName("delta")
    # NOTE: driver heap size only takes effect when this call launches the
    # driver JVM, i.e. on a fresh kernel with no running Spark session.
    .config("spark.driver.memory", "13g")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

## Dataframe

In [3]:
# Raw customer table. inferSchema types the numeric columns; the dd/MM/yyyy
# date columns are still read as plain strings.
data_path = "../data/customer_dataset.csv"
df = spark.read.options(header=True, inferSchema=True).csv(data_path)

In [4]:
# Check the types chosen by inferSchema: counts/amounts come back as integers,
# while first_purchase_date / last_purchase_date stay as strings.
df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- favorite_category: string (nullable = true)
 |-- acquisition_channel: string (nullable = true)
 |-- first_purchase_date: string (nullable = true)
 |-- total_orders: integer (nullable = true)
 |-- total_spent: integer (nullable = true)
 |-- last_purchase_date: string (nullable = true)
 |-- returns_count: integer (nullable = true)
 |-- avg_order_value: integer (nullable = true)
 |-- days_since_last_purchase: integer (nullable = true)
 |-- customer_lifetime_months: integer (nullable = true)



In [5]:
# The dataset has 35 rows, so this displays it in full, without cell truncation.
df.show(n=35, truncate=False)

+-----------+-----------------+-------------------+-------------------+------------+-----------+------------------+-------------+---------------+------------------------+------------------------+
|customer_id|favorite_category|acquisition_channel|first_purchase_date|total_orders|total_spent|last_purchase_date|returns_count|avg_order_value|days_since_last_purchase|customer_lifetime_months|
+-----------+-----------------+-------------------+-------------------+------------+-----------+------------------+-------------+---------------+------------------------+------------------------+
|CUST0001   |Tablets          |Google             |16/11/2022         |10          |17302      |25/03/2024        |0            |1730           |124                     |20                      |
|CUST0002   |Games            |Google             |18/04/2023         |21          |18160      |21/07/2024        |0            |865            |6                       |15                      |
|CUST0003   |Tablets

## Functions

In [6]:
def summarize_numeric(df, cols, approx_accuracy=1000):
    """Build a one-row-per-column descriptive summary of numeric columns.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Source data.
    cols : iterable of str
        Names of the columns to summarise.
    approx_accuracy : int, default 1000
        ``accuracy`` argument passed to ``percentile_approx`` for the median,
        p25 and p75; larger values give more precise quantiles.

    Returns
    -------
    pyspark.sql.DataFrame
        Columns ``column, non_null, n_missing, mean, median, stddev, min, p25,
        p75, max, n_distinct`` with one row per entry of ``cols`` (in order).

    Raises
    ------
    ValueError
        If ``cols`` is empty.
    """
    cols = list(cols)
    if not cols:
        raise ValueError("summarize_numeric needs at least one column")

    summaries = []
    for name in cols:
        # Backtick-quote the name so dots/spaces are not parsed as struct paths
        # or SQL syntax (the old f.expr string interpolation broke on those).
        c = f.col("`" + name.replace("`", "``") + "`")
        non_null = f.count(c)
        summaries.append(df.select(
            f.lit(name).alias("column"),
            non_null.alias("non_null"),
            # count(1) counts every row in this same pass, so no separate
            # df.count() job is needed.
            (f.count(f.lit(1)) - non_null).alias("n_missing"),
            f.mean(c).alias("mean"),
            f.percentile_approx(c, 0.5, approx_accuracy).alias("median"),
            f.stddev_samp(c).alias("stddev"),
            f.min(c).alias("min"),
            f.percentile_approx(c, 0.25, approx_accuracy).alias("p25"),
            f.percentile_approx(c, 0.75, approx_accuracy).alias("p75"),
            f.max(c).alias("max"),
            f.countDistinct(c).alias("n_distinct"),
        ))

    # Stack the per-column rows; Spark widens differing column types on union.
    return reduce(lambda a, b: a.unionByName(b), summaries)


In [25]:
def hist_equal_width(df, col, bins=5):
    """Equal-width histogram of one numeric column.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Source data.
    col : str
        Name of the numeric column to bin.
    bins : int, default 5
        Number of equal-width buckets spanning the column's min..max.

    Returns
    -------
    pyspark.sql.DataFrame or None
        Columns ``bin_start, bin_end, count, column``, one row per bucket.
        A constant column yields a single ``[value, value]`` bucket (same
        schema). ``None`` when the column has no non-null values.

    Raises
    ------
    ValueError
        Propagated from ``RDD.histogram`` when ``bins`` < 1.
    """
    values = df.select(col).na.drop()
    if values.first() is None:  # empty or all-null column: nothing to bin
        return None

    # RDD.histogram finds min/max itself and returns a single [min, max]
    # bucket when they are equal, so no degenerate-case branch is needed and
    # the output schema is identical in every non-empty case.
    edges, counts = values.rdd.map(lambda row: float(row[0])).histogram(bins)
    rows = [
        (float(start), float(end), int(n))
        for start, end, n in zip(edges, edges[1:], counts)
    ]
    # Use the DataFrame's own session rather than a notebook-global `spark`.
    return (
        df.sparkSession
        .createDataFrame(rows, ["bin_start", "bin_end", "count"])
        .withColumn("column", f.lit(col))
    )


# EDA

## Mean, Median, Distribution

In [8]:
# Every numeric column in the schema. df.dtypes reports integral types as
# tinyint/smallint/int/bigint and decimals as e.g. "decimal(10,2)", so all of
# those are included (the old int/bigint/double/float list skipped some).
numeric_cols = [
    name for name, dtype in df.dtypes
    if dtype in ("tinyint", "smallint", "int", "bigint", "float", "double")
    or dtype.startswith("decimal")
]

numeric_summary = summarize_numeric(df, numeric_cols)
numeric_summary.show(truncate=False)

+------------------------+--------+---------+------------------+------+------------------+---+----+-----+-----+----------+
|column                  |non_null|n_missing|mean              |median|stddev            |min|p25 |p75  |max  |n_distinct|
+------------------------+--------+---------+------------------+------+------------------+---+----+-----+-----+----------+
|total_orders            |35      |0        |10.82857142857143 |8     |8.26575399317148  |1  |5   |16   |32   |22        |
|total_spent             |35      |0        |7965.5142857142855|6432  |6066.909976990558 |567|2945|13987|19876|35        |
|returns_count           |35      |0        |0.8857142857142857|1     |0.9000466841300206|0  |0   |1    |3    |4         |
|avg_order_value         |35      |0        |766.8571428571429 |759   |352.3320745494263 |215|551 |880  |2100 |34        |
|days_since_last_purchase|35      |0        |92.2              |53    |98.9883534444687  |0  |10  |164  |343  |34        |
|customer_lifeti

### Outliers

# RFV by Rank

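Como o dataset estudado tem apenas 35 linhas (clientes), utilizou-se uma estratégia de pontuação RFV por rank: os clientes são ordenados por recência, frequência e valor, e a posição de cada um em cada ranking é convertida em uma nota de 1 a 5 (posições 1–6 → 5, 7–13 → 4, 14–20 → 3, 21–28 → 2, 29–35 → 1).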

In [10]:
def atribbute_score(col_name, thresholds=(6, 13, 20, 28)):
    """Turn a rank column into a score where better (lower) ranks score higher.

    Parameters
    ----------
    col_name : str
        Name of a rank column in which 1 is the best position.
    thresholds : sequence of int, default (6, 13, 20, 28)
        Ascending, inclusive upper bounds of each score band. With the
        defaults (tuned for 35 ranked customers): ranks 1-6 -> 5, 7-13 -> 4,
        14-20 -> 3, 21-28 -> 2, anything higher -> 1. With ``k`` thresholds the
        scores run from ``k + 1`` down to 1. Note the RFV segment table only
        knows single-digit scores 1-5, i.e. exactly four thresholds.

    Returns
    -------
    pyspark.sql.Column
        Integer score expression. A null rank falls through to score 1.

    Raises
    ------
    ValueError
        If ``thresholds`` is empty or not in ascending order.
    """
    cutoffs = list(thresholds)
    if not cutoffs:
        raise ValueError("thresholds must contain at least one cut-off")
    if cutoffs != sorted(cutoffs):
        raise ValueError("thresholds must be in ascending order")

    rank_col = f.col(col_name)
    best_score = len(cutoffs) + 1
    score = f.when(rank_col <= cutoffs[0], best_score)
    for step, cutoff in enumerate(cutoffs[1:], start=1):
        score = score.when(rank_col <= cutoff, best_score - step)
    return score.otherwise(1)

In [30]:
# Segment name -> list of RFV codes. A code is the string "<R><F><V>" with
# each score in 1-5, matching the RFV_code column built with f.concat, which
# is why the codes are strings. Every one of the 5 x 5 x 5 = 125 combinations
# appears under exactly one segment.
customer_dict = {
    # R, F and V all >= 4.
    "Champions": [
        "444", "445", "454", "455", "544", "545", "554", "555"
    ],
    "Loyal Customers": [
        "343", "344", "345", "353", "354", "355", "443", "453", "543", "553"
    ],
    "Potential Loyalists": [
        "422", "423", "432", "433", "522", "523", "532", "533"
    ],
    "Recent Customers": [
        "511", "512", "521"
    ],
    "Promising": [
        "411", "412", "421"
    ],
    "Need Attention": [
        "311", "312", "313", "321", "322", "323", "331", "332", "333",
        "413", "414", "415", "424", "425", "431", "434", "435", "441", "442",
        "451", "452", "513", "514", "515", "524", "525", "531", "534", "535",
        "541", "542", "551", "552"
    ],
    "At Risk": [
        "214", "215", "224", "225", "234", "235", "241", "242", "243", "244", "245",
        "251", "252", "253", "254", "255", "314", "315", "324", "325", "334", "335",
        "341", "342", "351", "352"
    ],
    # R = 1 with F >= 4 or V >= 4.
    "Cannot Lose Them": [
        "114", "115", "124", "125", "134", "135", "141", "142", "143", "144", "145",
        "151", "152", "153", "154", "155"
    ],
    # R = 2 with F <= 3 and V <= 3.
    "Hibernating": [
        "211", "212", "213", "221", "222", "223", "231", "232", "233"
    ],
    # R = 1 with F <= 3 and V <= 3.
    "Lost": [
        "111", "112", "113", "121", "122", "123", "131", "132", "133"
    ]
}


# Invert segment -> codes into code -> segment. A dict comprehension would
# silently keep only the last segment for a code listed twice, so duplicate
# codes are rejected explicitly.
rfv_lookup = {}
for segment, codes in customer_dict.items():
    for code in codes:
        if code in rfv_lookup:
            raise ValueError(
                f"RFV code {code!r} is assigned to both "
                f"{rfv_lookup[code]!r} and {segment!r}"
            )
        rfv_lookup[code] = segment

# Spark map literal {code: segment}, built from alternating key/value literals.
rfv_map_expr = f.create_map([f.lit(x) for x in chain.from_iterable(rfv_lookup.items())])

In [31]:
# Rank windows for the three RFV dimensions. They are unpartitioned, so Spark
# processes all rows in a single partition, which is acceptable for a 35-row table.
recency_window = Window.orderBy(f.asc("days_since_last_purchase"))
frequency_window = Window.orderBy(f.desc("average_order_count"))
value_window = Window.orderBy(f.desc("aproximated_revenue"))

# Frequency score, demoted by one level (never below 1) when more than 30% of
# a customer's orders were returned.
frequency_score = atribbute_score("frequency")
heavy_returner = f.col("return_rate") > 0.3
adjusted_frequency_score = (
    f.when(heavy_returner & (frequency_score > 1), frequency_score - 1)
    .otherwise(frequency_score)
)

metrics_df = (
    df
    # Revenue net of returns, counting each return as one average-sized order.
    .withColumn("aproximated_revenue", f.col("total_spent") - f.col("avg_order_value") * f.col("returns_count"))
    .withColumn("average_order_count", f.round(f.col("total_orders") / f.col("customer_lifetime_months"), 2))
    .withColumn("return_rate", f.round(f.col("returns_count") / f.col("total_orders"), 2))
    .withColumn("recency", f.rank().over(recency_window))
    .withColumn("frequency", f.rank().over(frequency_window))
    .withColumn("value", f.rank().over(value_window))
    .withColumn("R", atribbute_score("recency"))
    .withColumn("F", adjusted_frequency_score)
    .withColumn("V", atribbute_score("value"))
    .withColumn("RFV_code", f.concat(f.col("R"), f.col("F"), f.col("V")))
    # Codes missing from the lookup map fall back to "Unclassified".
    .withColumn("Customer_Segment", f.coalesce(rfv_map_expr.getItem(f.col("RFV_code")), f.lit("Unclassified")))
)

In [32]:
# Show all 35 customers with their ranks, R/F/V scores and segment, untruncated.
metrics_df.show(n=35, truncate=False)

+-----------+-----------------+-------------------+-------------------+------------+-----------+------------------+-------------+---------------+------------------------+------------------------+-------------------+-------------------+-----------+-------+---------+-----+---+---+---+--------+----------------+
|customer_id|favorite_category|acquisition_channel|first_purchase_date|total_orders|total_spent|last_purchase_date|returns_count|avg_order_value|days_since_last_purchase|customer_lifetime_months|aproximated_revenue|average_order_count|return_rate|recency|frequency|value|R  |F  |V  |RFV_code|Customer_Segment|
+-----------+-----------------+-------------------+-------------------+------------+-----------+------------------+-------------+---------------+------------------------+------------------------+-------------------+-------------------+-----------+-------+---------+-----+---+---+---+--------+----------------+
|CUST0032   |Games            |Google             |13/02/2023         