
# Retail Analytics (pandas vs polars vs PySpark)

Este notebook usa o dataset **RetailStoreProductSalesDataset.csv** (15.000 linhas) para mostrar **equivalências práticas** entre:

- **pandas** (DataFrame em memória)
- **polars** (DataFrame/expressões colunar, bem rápido)
- **PySpark** (DataFrame distribuído)

Em cada seção você verá **a mesma análise feita 3 vezes** (uma por biblioteca), usando operações comuns do dia a dia.



## 0) Setup e leitura do dataset

> Ajuste o caminho se necessário. Aqui estou usando o arquivo montado no ambiente.


In [None]:

# Imports
import pandas as pd

# Polars
import polars as pl

# PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Caminho do dataset
CSV_PATH = r"/mnt/data/RetailStoreProductSalesDataset.csv"

# Spark session
spark = (SparkSession.builder
         .appName("retail-analytics-pandas-polars-spark")
         .getOrCreate())

# Leitura: pandas
df_pd = pd.read_csv(CSV_PATH)

# Leitura: polars
df_pl = pl.read_csv(CSV_PATH)

# Leitura: spark
df_sp = (spark.read
         .option("header", True)
         .option("inferSchema", True)
         .csv(CSV_PATH))

print("pandas shape:", df_pd.shape)
print("polars shape:", df_pl.shape)
print("spark count:", df_sp.count(), " | columns:", len(df_sp.columns))



## 1) Visão rápida e checagem de tipos

Só pra garantir que estamos olhando a mesma coisa nas 3 libs.


In [None]:

# pandas
display(df_pd.head(3))
print(df_pd.dtypes)

# polars
print(df_pl.head(3))
print(df_pl.schema)

# spark
df_sp.show(3, truncate=False)
df_sp.printSchema()



## 2) Somar uma coluna (ex.: total de `ad_spend`)

Pergunta: quanto foi investido em anúncios no período?


In [None]:

# pandas
total_ad_pd = df_pd["ad_spend"].sum()

# polars
total_ad_pl = df_pl["ad_spend"].sum()

# spark
total_ad_sp = df_sp.agg(F.sum("ad_spend").alias("total_ad_spend")).collect()[0]["total_ad_spend"]

print("Total ad_spend | pandas:", total_ad_pd)
print("Total ad_spend | polars:", total_ad_pl)
print("Total ad_spend | spark :", total_ad_sp)



## 3) Filtrar linhas (ex.: dias com desconto alto e estoque baixo)

Filtro: `discount > 0.30` **e** `stock_level < 20`  
Isso pode indicar risco de ruptura + promoção agressiva.


In [None]:

# pandas
high_disc_low_stock_pd = df_pd[(df_pd["discount"] > 0.30) & (df_pd["stock_level"] < 20)]

# polars
high_disc_low_stock_pl = df_pl.filter((pl.col("discount") > 0.30) & (pl.col("stock_level") < 20))

# spark
high_disc_low_stock_sp = df_sp.filter((F.col("discount") > 0.30) & (F.col("stock_level") < 20))

print("Linhas filtradas | pandas:", len(high_disc_low_stock_pd))
print("Linhas filtradas | polars:", high_disc_low_stock_pl.height)
print("Linhas filtradas | spark :", high_disc_low_stock_sp.count())

display(high_disc_low_stock_pd.head(5))



## 4) Contar uma coluna (não nulos)

Vamos contar registros válidos em `customer_sentiment` (deve ser quase tudo, mas é um check rápido).


In [None]:

# pandas
count_sent_pd = df_pd["customer_sentiment"].count()

# polars
count_sent_pl = df_pl["customer_sentiment"].count()

# spark
count_sent_sp = df_sp.select(F.count("customer_sentiment").alias("cnt")).collect()[0]["cnt"]

print("Count customer_sentiment | pandas:", count_sent_pd)
print("Count customer_sentiment | polars:", count_sent_pl)
print("Count customer_sentiment | spark :", count_sent_sp)



## 5) Contar valores distintos (ex.: `weather_index` arredondado)

Como é numérico contínuo, contar distintos “cru” costuma ser gigante.  
Então vamos arredondar (`weather_index` com 1 casa) e contar distintos.


In [None]:

# pandas
weather_rounded_pd = df_pd["weather_index"].round(1)
nunique_weather_pd = weather_rounded_pd.nunique()

# polars
nunique_weather_pl = (df_pl
    .select(pl.col("weather_index").round(1).n_unique().alias("n_unique_weather_round1"))
    .item())

# spark
nunique_weather_sp = (df_sp
    .select(F.round("weather_index", 1).alias("weather_round1"))
    .agg(F.countDistinct("weather_round1").alias("n_unique_weather_round1"))
    .collect()[0]["n_unique_weather_round1"])

print("Distinct weather_index (round 1) | pandas:", nunique_weather_pd)
print("Distinct weather_index (round 1) | polars:", nunique_weather_pl)
print("Distinct weather_index (round 1) | spark :", nunique_weather_sp)



## 6) Agrupar e somar (group by)

Vamos criar um *bucket* simples de sentimento e somar `footfall` por faixa.  
Isso dá uma ideia de “tráfego total” associado a diferentes níveis de sentimento.


In [None]:

# Função de bucketing (para manter a lógica simples e idêntica)
def sentiment_bucket(x):
    if x < -0.3:
        return "negativo"
    elif x > 0.3:
        return "positivo"
    return "neutro"

# pandas
df_pd_tmp = df_pd.copy()
df_pd_tmp["sent_bucket"] = df_pd_tmp["customer_sentiment"].apply(sentiment_bucket)
gb_pd = (df_pd_tmp.groupby("sent_bucket", as_index=False)["footfall"].sum()
         .sort_values("footfall", ascending=False))

# polars
df_pl_tmp = df_pl.with_columns(
    pl.when(pl.col("customer_sentiment") < -0.3).then(pl.lit("negativo"))
     .when(pl.col("customer_sentiment") > 0.3).then(pl.lit("positivo"))
     .otherwise(pl.lit("neutro"))
     .alias("sent_bucket")
)
gb_pl = (df_pl_tmp
         .group_by("sent_bucket")
         .agg(pl.col("footfall").sum().alias("footfall_sum"))
         .sort("footfall_sum", descending=True))

# spark
df_sp_tmp = (df_sp
    .withColumn(
        "sent_bucket",
        F.when(F.col("customer_sentiment") < -0.3, F.lit("negativo"))
         .when(F.col("customer_sentiment") > 0.3, F.lit("positivo"))
         .otherwise(F.lit("neutro"))
    )
)
gb_sp = (df_sp_tmp
         .groupBy("sent_bucket")
         .agg(F.sum("footfall").alias("footfall_sum"))
         .orderBy(F.col("footfall_sum").desc()))

display(gb_pd)
print(gb_pl)
gb_sp.show()



## 7) Transformar colunas em linhas (melt / unpivot)

Exemplo: pegar um subconjunto de features e transformar para formato “long”  
para facilitar plots, comparações e checagem de distribuição.


In [None]:

FEATURES = ["price", "discount", "promotion_intensity", "footfall", "ad_spend"]

# Para não explodir tamanho, vamos usar uma amostra
sample_pd = df_pd[FEATURES].sample(1000, random_state=42)

# pandas: melt
long_pd = sample_pd.melt(var_name="feature", value_name="value")

# polars: unpivot (o mesmo efeito)
sample_pl = df_pl.select(FEATURES).sample(n=1000, seed=42)
long_pl = sample_pl.unpivot(on=FEATURES, variable_name="feature", value_name="value")

# spark: stack (unpivot)
sample_sp = df_sp.select(*FEATURES).sample(False, 1000/df_pd.shape[0], seed=42)
expr = "stack({n}, {pairs}) as (feature, value)".format(
    n=len(FEATURES),
    pairs=", ".join([f"'{c}', {c}" for c in FEATURES])
)
long_sp = sample_sp.selectExpr(expr)

print("pandas long shape:", long_pd.shape)
print("polars long shape:", long_pl.shape)
print("spark long count:", long_sp.count())

display(long_pd.head(10))
print(long_pl.head(10))
long_sp.show(10, truncate=False)



## 8) Estatísticas descritivas

Um “raio-x” rápido das colunas numéricas.


In [None]:

# pandas
desc_pd = df_pd.describe()
display(desc_pd)

# polars
desc_pl = df_pl.describe()
print(desc_pl)

# spark
df_sp.describe().show(truncate=False)



## 9) Criar nova coluna (ex.: `effective_price`)

Vamos criar `effective_price = price * (1 - discount)`  
Isso é uma proxy simples de preço efetivo após desconto.


In [None]:

# pandas
df_pd_eff = df_pd.copy()
df_pd_eff["effective_price"] = df_pd_eff["price"] * (1 - df_pd_eff["discount"])

# polars
df_pl_eff = df_pl.with_columns(
    (pl.col("price") * (1 - pl.col("discount"))).alias("effective_price")
)

# spark
df_sp_eff = df_sp.withColumn(
    "effective_price",
    F.col("price") * (F.lit(1) - F.col("discount"))
)

print("pandas effective_price head:")
display(df_pd_eff[["price","discount","effective_price"]].head(5))

print("polars effective_price head:")
print(df_pl_eff.select(["price","discount","effective_price"]).head(5))

print("spark effective_price head:")
df_sp_eff.select("price","discount","effective_price").show(5, truncate=False)



## 10) Excluir colunas (drop)

Suponha que você quer remover colunas para um modelo específico.


In [None]:

cols_to_drop = ["competitor_price", "return_rate"]

# pandas
df_pd_drop = df_pd.drop(columns=cols_to_drop)

# polars
df_pl_drop = df_pl.drop(cols_to_drop)

# spark
df_sp_drop = df_sp.drop(*cols_to_drop)

print("Cols originais:", len(df_pd.columns))
print("Depois do drop | pandas:", len(df_pd_drop.columns))
print("Depois do drop | polars:", len(df_pl_drop.columns))
print("Depois do drop | spark :", len(df_sp_drop.columns))



## 11) (Bônus) Ordenar (sort) e pegar Top N

Exemplo: Top 10 dias com maior `footfall` (tráfego).


In [None]:

# pandas
top_pd = df_pd.sort_values("footfall", ascending=False).head(10)

# polars
top_pl = df_pl.sort("footfall", descending=True).head(10)

# spark
top_sp = df_sp.orderBy(F.col("footfall").desc()).limit(10)

display(top_pd)
print(top_pl)
top_sp.show(10, truncate=False)



## Fechamento

Você acabou de ver o mesmo conjunto de operações (as mais comuns de análise/engenharia de dados) em **pandas**, **polars** e **PySpark**.

Dica de cérebro:  
- pandas = “faço agora na memória”  
- polars = “faço com expressões colunar rápidas”  
- spark = “faço em cluster, com plano de execução”

Se quiser, dá pra estender isso com:
- joins, window functions, pivot, tratamento de nulos, encoding/categorias
- baseline de modelagem (split, treino, métricas) com features derivadas
