# 📘 Tutorial PySpark: Funções de Agregação e Criação de Funções Personalizadas

## Objetivo Geral
- Apresentar os principais conceitos e ferramentas do PySpark relacionados a agrupamentos e agregações de dados.  
- Ensinar como criar funções personalizadas e aplicá-las com `groupBy`.  
- Preparar a base conceitual para cálculos como o PSI (Population Stability Index), mas com aplicabilidade mais ampla.  

---

## Sumário


1. [Preparação do Ambiente](#2-preparação-do-ambiente)
2. [Introdução ao PySpark](#1-introdução-ao-pyspark)  
3. [Exploração do Dataset](#3-exploração-do-dataset)  
4. [🔍 Agrupamentos com groupBy](#4-agrupamentos-com-groupby)  
5. [🧱 Janela e Agrupamentos Avançados](#5-janela-e-agrupamentos-avançados)  
6. [🛠️ Funções Personalizadas com UDF](#6-funções-personalizadas-com-udf)  
7. [🧪 Estudos de Caso](#7-estudos-de-caso)  
8. [📊 Exemplo Aplicado: Cálculo do PSI](#8-exemplo-aplicado-cálculo-do-psi)  
9. [🔚 Conclusões e Próximos Passos](#9-conclusões-e-próximos-passos)  

---


## 1. Preparação do Ambiente

### 1.1. Importações necessárias.


In [38]:
# Bibliotecas padrão
import numpy as np
import pandas as pd

# PySpark - Sessão
from pyspark.sql import SparkSession

# PySpark - Funções
from pyspark.sql.functions import (
    concat_ws,
    avg,
    col,
    count,
    lit,
    log,
    percentile_approx,
    sum as Fsum,
    udf,
    when,
    to_date,
    lpad,
)
from pyspark.sql import functions as F

# PySpark - Tipos
from pyspark.sql.types import IntegerType, StringType

# PySpark - Janela
from pyspark.sql.window import Window


### 1.2. Simulação de um dataset 

Iremos simular um dataset com colunas `score`, `ambiente`, `modelo`, `target`.

🎯 Objetivo:
	•	Simular dois modelos (modelo_a, modelo_b)
	•	Cada um com dados nos ambientes:
	•	DEV: 6 meses
	•	OOT: 3 meses
	•	PRD: 12 meses
	•	modelo_a permanece calibrado
	•	modelo_b vai descalibrando ao longo do tempo

⸻

📦 Etapas da simulação:
	1.	Gerar uma base com colunas: score, env, year, month, model
	2.	modelo_a: score estável em todos os ambientes
	3.	modelo_b: score muda ao longo do tempo (ex: média ou variância cresce em PRD)


In [5]:
import pandas as pd
import numpy as np

def simulate_model_data(model_name, start_year=2023):
    """
    Simula scores e variável resposta para um modelo nos ambientes DEV, OOT e PRD.
    
    modelo_a: score calibrado e estável
    modelo_b: score descalibrando em PRD (aumenta média ao longo do tempo)
    """
    rows = []
    rng = np.random.default_rng(seed=42 if model_name == 'modelo_a' else 99)

    def generate_block(env, year, month, loc, size=1000):
        # Gera scores com média loc e desvio padrão 0.1
        scores = rng.normal(loc=loc, scale=0.1, size=size)
        scores = np.clip(scores, 0.01, 0.99)  # garantir intervalo válido [0, 1]
        vr = rng.binomial(1, p=scores)  # variável resposta: simula desfecho com base no score
        return pd.DataFrame({
            'model': model_name,
            'env': env,
            'year': year,
            'month': month,
            'score': scores,
            'vr': vr
        })

    # Ambiente DEV (6 meses estáveis)
    for month in range(1, 7):
        rows.append(generate_block('DEV', start_year, month, loc=0.5))

    # Ambiente OOT (3 meses)
    for month in range(7, 10):
        loc = 0.5 if model_name == 'modelo_a' else 0.55  # modelo_b levemente deslocado
        rows.append(generate_block('OOT', start_year, month, loc=loc))

    # Ambiente PRD (12 meses)
    for month in range(1, 13):
        year = start_year + 1
        loc = 0.5 if model_name == 'modelo_a' else 0.5 + 0.02 * month  # modelo_b descalibra gradualmente
        rows.append(generate_block('PRD', year, month, loc=loc))

    return pd.concat(rows, ignore_index=True)

# Gerar os dados simulados
df_a = simulate_model_data('modelo_a')
df_b = simulate_model_data('modelo_b')

# Unir os dois modelos em um único DataFrame
df = pd.concat([df_a, df_b], ignore_index=True)

# Visualizar primeiros registros
print(df.head())

      model  env  year  month     score  vr
0  modelo_a  DEV  2023      1  0.530472   0
1  modelo_a  DEV  2023      1  0.396002   0
2  modelo_a  DEV  2023      1  0.575045   1
3  modelo_a  DEV  2023      1  0.594056   1
4  modelo_a  DEV  2023      1  0.304896   0


🧪 Colunas do DataFrame final:

* model: "modelo_a" ou "modelo_b"
* env: ambiente (DEV, OOT, PRD)
* year: ano
* month: mês
* score: probabilidade estimada pelo modelo (entre 0 e 1)
* vr: variável resposta (0 ou 1), simulada com base no score

---

## 2. Introdução ao PySpark

### 2.1. Por que usar PySpark para grandes volumes de dados?

Em projetos de ciência de dados ou engenharia de dados que lidam com grandes volumes de informação (de gigabytes a terabytes ou mais), bibliotecas tradicionais como `pandas` deixam de ser eficientes, pois operam em memória (RAM) e em apenas uma máquina. Isso limita o processamento a conjuntos de dados menores e torna as operações mais lentas e sujeitas a erros de memória.

O **PySpark** é a API em Python do **Apache Spark**, um motor de processamento distribuído altamente escalável. Ele permite:

- **Processamento paralelo em cluster**: divide os dados entre várias máquinas ou núcleos.
- **Escalabilidade**: funciona localmente, em clusters locais (como com `Spark Standalone`) ou em ambientes distribuídos como Hadoop/YARN, Kubernetes, Databricks, EMR (AWS), etc.
- **Tolerância a falhas**: reexecuta automaticamente tarefas que falham.
- **Alto desempenho**: otimizado para processar dados em lote e em tempo real.

> 💡 Em resumo: PySpark permite que você escale seu código em Python para trabalhar com big data de maneira eficiente e robusta.


### 2.2. O que é o `SparkSession` e por que ele é essencial?

O `SparkSession` é a **porta de entrada** para utilizar o PySpark. Ele é o ponto central por onde você acessa todas as funcionalidades do Spark, como:

- Leitura e escrita de dados (CSV, Parquet, JSON, JDBC, etc.)
- Manipulação de DataFrames e execução de SQL
- Criação de RDDs (Resilient Distributed Datasets)
- Configuração do ambiente de execução (como número de partições, uso de cache, etc.)


In [6]:
spark = SparkSession.builder \
    .appName("Tutorial PySpark") \
    .getOrCreate()

🧠 Explicando o código:

*	.builder: inicia a configuração.
*	.appName("Tutorial PySpark"): define o nome da aplicação, útil para monitoramento.
*	.getOrCreate(): cria a sessão Spark se não existir, ou reutiliza uma existente.


📝 Nota: O objeto spark será utilizado ao longo de todo o tutorial para criar e manipular DataFrames.


### 2.3. Transformando o pandas em um dataframe do pyspark

In [8]:
spkdf = spark.createDataFrame(df)

---
## 3. Exploração do Dataset

- Inspeção inicial com `.show()`, `.printSchema()`, `.select()`, `.filter()`, `.distinct()`.
- Entendimento da estrutura dos dados.


In [9]:
spkdf.show(10)

+--------+---+----+-----+-------------------+---+
|   model|env|year|month|              score| vr|
+--------+---+----+-----+-------------------+---+
|modelo_a|DEV|2023|    1| 0.5304717079754432|  0|
|modelo_a|DEV|2023|    1| 0.3960015893759504|  0|
|modelo_a|DEV|2023|    1| 0.5750451195806457|  1|
|modelo_a|DEV|2023|    1| 0.5940564716391213|  1|
|modelo_a|DEV|2023|    1|0.30489648113461637|  0|
|modelo_a|DEV|2023|    1| 0.3697820493137682|  0|
|modelo_a|DEV|2023|    1| 0.5127840403167285|  0|
|modelo_a|DEV|2023|    1|0.46837574076564176|  1|
|modelo_a|DEV|2023|    1|0.49831988424957113|  1|
|modelo_a|DEV|2023|    1|  0.414695607242642|  0|
+--------+---+----+-----+-------------------+---+
only showing top 10 rows



In [10]:
spkdf.printSchema()

root
 |-- model: string (nullable = true)
 |-- env: string (nullable = true)
 |-- year: long (nullable = true)
 |-- month: long (nullable = true)
 |-- score: double (nullable = true)
 |-- vr: long (nullable = true)



In [17]:
#  Select columns by different ways
spkdf.select("model","env").show(3)
spkdf.select(spkdf.model,spkdf.env).show(3)
spkdf.select(spkdf["model"],spkdf["env"]).show(3)

+--------+---+
|   model|env|
+--------+---+
|modelo_a|DEV|
|modelo_a|DEV|
|modelo_a|DEV|
+--------+---+
only showing top 3 rows

+--------+---+
|   model|env|
+--------+---+
|modelo_a|DEV|
|modelo_a|DEV|
|modelo_a|DEV|
+--------+---+
only showing top 3 rows

+--------+---+
|   model|env|
+--------+---+
|modelo_a|DEV|
|modelo_a|DEV|
|modelo_a|DEV|
+--------+---+
only showing top 3 rows



In [26]:
# By using col() function
spkdf.select(col("model"),col("env")).show(3)

# Select columns by regular expression
spkdf.select(spkdf.colRegex("`.*(mod).*`")).show(3)

#Selects columns 2 to 4  and top 3 rows
spkdf.select(spkdf.columns[2:4]).show(3)

+--------+---+
|   model|env|
+--------+---+
|modelo_a|DEV|
|modelo_a|DEV|
|modelo_a|DEV|
+--------+---+
only showing top 3 rows

+--------+
|   model|
+--------+
|modelo_a|
|modelo_a|
|modelo_a|
+--------+
only showing top 3 rows

+----+-----+
|year|month|
+----+-----+
|2023|    1|
|2023|    1|
|2023|    1|
+----+-----+
only showing top 3 rows



In [29]:
# filter
spkdf.filter(col("model") == "modelo_a").show(3)
spkdf.filter(spkdf.model == "modelo_a").show(3)
spkdf.filter("model = 'modelo_a'").show(3)

+--------+---+----+-----+------------------+---+
|   model|env|year|month|             score| vr|
+--------+---+----+-----+------------------+---+
|modelo_a|DEV|2023|    1|0.5304717079754432|  0|
|modelo_a|DEV|2023|    1|0.3960015893759504|  0|
|modelo_a|DEV|2023|    1|0.5750451195806457|  1|
+--------+---+----+-----+------------------+---+
only showing top 3 rows

+--------+---+----+-----+------------------+---+
|   model|env|year|month|             score| vr|
+--------+---+----+-----+------------------+---+
|modelo_a|DEV|2023|    1|0.5304717079754432|  0|
|modelo_a|DEV|2023|    1|0.3960015893759504|  0|
|modelo_a|DEV|2023|    1|0.5750451195806457|  1|
+--------+---+----+-----+------------------+---+
only showing top 3 rows

+--------+---+----+-----+------------------+---+
|   model|env|year|month|             score| vr|
+--------+---+----+-----+------------------+---+
|modelo_a|DEV|2023|    1|0.5304717079754432|  0|
|modelo_a|DEV|2023|    1|0.3960015893759504|  0|
|modelo_a|DEV|2023|

In [35]:
spkdf.filter("model = 'modelo_a' AND env = 'DEV'").show(3)

+--------+---+----+-----+------------------+---+
|   model|env|year|month|             score| vr|
+--------+---+----+-----+------------------+---+
|modelo_a|DEV|2023|    1|0.5304717079754432|  0|
|modelo_a|DEV|2023|    1|0.3960015893759504|  0|
|modelo_a|DEV|2023|    1|0.5750451195806457|  1|
+--------+---+----+-----+------------------+---+
only showing top 3 rows



---

## 4. Agrupamentos com `groupBy`

O método `.groupBy()` em PySpark é usado para agrupar linhas de um DataFrame com base nos valores de uma ou mais colunas, permitindo aplicar funções de agregação sobre esses grupos. Ele é equivalente ao `groupby()` do pandas, mas funciona de forma distribuída e escalável.


### 4.1. Sintaxe básica

```python
df.groupBy("coluna").agg(função_agregadora)
```

Você pode agrupar por uma ou mais colunas, e aplicar funções como `count()`, `sum()`, `avg()`, `min()`, `max()`, entre outras.



In [40]:
# Contagem de registros por model
spkdf.groupBy("model").count().show()

# Média de score por env
spkdf.groupBy("env").agg(F.avg("score")).show()

# Soma de eventos por model e env
spkdf.groupBy("model", "env").agg(F.sum("vr")).show()



+--------+-----+
|   model|count|
+--------+-----+
|modelo_a|21000|
|modelo_b|21000|
+--------+-----+

+---+-------------------+
|env|         avg(score)|
+---+-------------------+
|PRD| 0.5645661802102114|
|OOT| 0.5278309204486932|
|DEV|0.49970832222858647|
+---+-------------------+

+--------+---+-------+
|   model|env|sum(vr)|
+--------+---+-------+
|modelo_a|DEV|   2987|
|modelo_a|OOT|   1485|
|modelo_a|PRD|   5947|
|modelo_b|DEV|   3052|
|modelo_b|OOT|   1656|
|modelo_b|PRD|   7588|
+--------+---+-------+



### 4.2. Agregações múltiplas com .agg({})

Você pode aplicar várias agregações ao mesmo tempo, inclusive sobre diferentes colunas:

In [42]:
spkdf.groupBy("model").agg({
    "score": "avg",
    "vr": "sum"
}).show()


+--------+------------------+-------+
|   model|        avg(score)|sum(vr)|
+--------+------------------+-------+
|modelo_a|0.5004163597996216|  10419|
|modelo_b|0.5811585789851526|  12296|
+--------+------------------+-------+




⚠️ Essa forma é mais limitada: você não pode adicionar alias com nomes personalizados.



### 4.3. Agregações múltiplas com .agg(F.func()) (forma recomendada)

A forma mais flexível é usando `F.func()` com alias (`.alias("nome_coluna")`):

In [49]:
spkdf.groupBy("model", "env").agg(
    F.avg("score").alias("media_score"),
    F.sum("vr").alias("total_eventos"),
    F.count("*").alias("quantidade_registros"),
    F.round(
        (F.sum("vr") / F.count("*")) * 100,
        2
    ).alias("exposição(%)")
).show()


+--------+---+-------------------+-------------+--------------------+------------+
|   model|env|        media_score|total_eventos|quantidade_registros|exposição(%)|
+--------+---+-------------------+-------------+--------------------+------------+
|modelo_a|DEV|0.49982095366508195|         2987|                6000|       49.78|
|modelo_a|OOT| 0.5012523963043177|         1485|                3000|        49.5|
|modelo_a|PRD| 0.5005050537407192|         5947|               12000|       49.56|
|modelo_b|DEV|0.49959569079209093|         3052|                6000|       50.87|
|modelo_b|OOT| 0.5544094445930686|         1656|                3000|        55.2|
|modelo_b|PRD| 0.6286273066797037|         7588|               12000|       63.23|
+--------+---+-------------------+-------------+--------------------+------------+



### 4.4. Agrupamentos não agregativos

Você também pode usar `groupBy()` para operações que não são diretamente agregações, como `collect_list` e `collect_set`:

*	`collect_list`: junta os valores de uma coluna em listas (permite repetição).
*	`collect_set`: junta os valores sem repetir (como um set).


In [51]:
# Lista de scores por model
spkdf.groupBy("model").agg(F.collect_list("score").alias("lista_scores")).show(truncate=False)

# Conjunto único de ambientes por model
spkdf.groupBy("model").agg(F.collect_set("env").alias("ambientes_unicos")).show()

                                                                                

+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### 4.5.Uso com `.select()` ou `.withColumn()`

Em algumas situações, você pode querer transformar os resultados do groupBy() usando .select() ou .withColumn() para criar colunas derivadas ou realizar novos cálculos.

Exemplo: criar uma nova coluna com score médio classificado

In [54]:
media_df = spkdf.groupBy("model").agg(F.avg("score").alias("media_score"))

# Adiciona uma nova coluna que classifica o score como alto ou baixo
media_df = media_df.withColumn(
    "classificacao",
    F.when(F.col("media_score") >= 0.6, "Alto").otherwise("Baixo")
)

media_df.show()

+--------+------------------+-------------+
|   model|       media_score|classificacao|
+--------+------------------+-------------+
|modelo_a|0.5004163597996216|        Baixo|
|modelo_b|0.5811585789851526|        Baixo|
+--------+------------------+-------------+




Você também pode fazer .select() para reordenar ou renomear colunas após a agregação:


In [57]:
media_df.select("model", "classificacao", "media_score").show()

+--------+-------------+------------------+
|   model|classificacao|       media_score|
+--------+-------------+------------------+
|modelo_a|        Baixo|0.5004163597996216|
|modelo_b|        Baixo|0.5811585789851526|
+--------+-------------+------------------+



In [58]:
spkdf.groupBy("model", "env") \
    .agg(F.avg("score").alias("media_score")) \
    .orderBy("model", "env") \
    .show()

                                                                                

+--------+---+-------------------+
|   model|env|        media_score|
+--------+---+-------------------+
|modelo_a|DEV|0.49982095366508195|
|modelo_a|OOT| 0.5012523963043177|
|modelo_a|PRD| 0.5005050537407192|
|modelo_b|DEV|0.49959569079209093|
|modelo_b|OOT| 0.5544094445930686|
|modelo_b|PRD| 0.6286273066797037|
+--------+---+-------------------+



---

## 5. Janela e Agrupamentos Avançados

### 5.1. Diferença entre groupBy e funções de janela (window functions)



Embora `groupBy()` e `window functions` (funções de janela) pareçam semelhantes à primeira vista (pois ambos trabalham com agregações), eles têm finalidades e comportamentos bem diferentes:



| Característica        | `groupBy()`                                  | Window Functions                                           |
|-----------------------|-----------------------------------------------|-------------------------------------------------------------|
| Tipo de operação      | Reduz a cardinalidade do DataFrame           | Mantém a cardinalidade (não reduz número de linhas)         |
| Retorno por linha     | Não (gera 1 linha por grupo)                 | Sim (retorna uma nova coluna com valores agregados)         |
| Exemplo de uso        | Soma total por categoria                     | Soma acumulada dentro de cada categoria                     |
| Ideal para            | Resumos agregados                            | Cálculos linha a linha dentro de grupos                     |



### 5.2. Uso de Window.partitionBy().orderBy()

As funções de janela exigem que você defina um “escopo”, ou seja, dentro de qual grupo a função deve operar. Isso é feito com partitionBy().

Você pode também ordenar os dados dentro de cada partição com orderBy() — o que é necessário para funções como rank() ou row_number().




In [65]:
janela = Window.partitionBy("model").orderBy(F.desc("score"))

spkdf.withColumn("rank_score", F.rank().over(janela)).show(5)

+--------+---+----+-----+------------------+---+----------+
|   model|env|year|month|             score| vr|rank_score|
+--------+---+----+-----+------------------+---+----------+
|modelo_a|PRD|2024|   11|0.9326920740610131|  1|         1|
|modelo_a|PRD|2024|    9|0.9087923070821299|  1|         2|
|modelo_a|DEV|2023|    5|0.9025824042727265|  1|         3|
|modelo_a|PRD|2024|    2|0.8556362785550131|  1|         4|
|modelo_a|PRD|2024|    8|0.8476567355053107|  0|         5|
+--------+---+----+-----+------------------+---+----------+
only showing top 5 rows



**Exemplo 1**: Cálculo de proporções dentro de grupos

Queremos calcular a proporção de cada linha em relação ao total do grupo (por exemplo, proporção de eventos dentro de cada modelo):

In [72]:
janela_model = Window.partitionBy("model")

spkdf.withColumn("prop_eventos_modelo", 
                F.round(
                    F.col("vr") / F.sum("vr").over(janela_model)*100, 2
                )
                ).show(5)

+--------+---+----+-----+-------------------+---+-------------------+
|   model|env|year|month|              score| vr|prop_eventos_modelo|
+--------+---+----+-----+-------------------+---+-------------------+
|modelo_a|DEV|2023|    1| 0.5304717079754432|  0|                0.0|
|modelo_a|DEV|2023|    1| 0.3960015893759504|  0|                0.0|
|modelo_a|DEV|2023|    1| 0.5750451195806457|  1|               0.01|
|modelo_a|DEV|2023|    1| 0.5940564716391213|  1|               0.01|
|modelo_a|DEV|2023|    1|0.30489648113461637|  0|                0.0|
+--------+---+----+-----+-------------------+---+-------------------+
only showing top 5 rows



**Exemplo 2**: Ranking por score dentro de cada modelo

Aqui usamos `row_number()` e `rank()` para identificar os maiores scores por modelo:

In [None]:
janela_ordenada = Window.partitionBy("model").orderBy(F.desc("score"))

spkdf.withColumn("posicao", F.row_number().over(janela_ordenada)).show()

+--------+---+----+-----+------------------+---+-------+
|   model|env|year|month|             score| vr|posicao|
+--------+---+----+-----+------------------+---+-------+
|modelo_a|PRD|2024|   11|0.9326920740610131|  1|      1|
|modelo_a|PRD|2024|    9|0.9087923070821299|  1|      2|
|modelo_a|DEV|2023|    5|0.9025824042727265|  1|      3|
|modelo_a|PRD|2024|    2|0.8556362785550131|  1|      4|
|modelo_a|PRD|2024|    8|0.8476567355053107|  0|      5|
|modelo_a|DEV|2023|    3|0.8454046402244182|  1|      6|
|modelo_a|PRD|2024|    4|0.8327472286429946|  1|      7|
|modelo_a|PRD|2024|   10|0.8308059325188116|  1|      8|
|modelo_a|PRD|2024|   11|0.8290696813621817|  1|      9|
|modelo_a|DEV|2023|    3| 0.827102612221459|  1|     10|
|modelo_a|PRD|2024|    5|0.8225993124147924|  0|     11|
|modelo_a|DEV|2023|    4|0.8208160913916495|  1|     12|
|modelo_a|OOT|2023|    9|0.8200242980517064|  0|     13|
|modelo_a|PRD|2024|   10|0.8193132037801697|  1|     14|
|modelo_a|DEV|2023|    1|0.8178

> ⚠️ **Atenção**
> - `row_number()` dá números únicos e sequenciais (`1, 2, 3, …`)
> - `rank()` atribui a mesma posição a empates e pode pular posições (`1, 1, 3, …`)

### 5.3. Quando usar Window Functions?

Use funções de janela quando você quiser:

*	Calcular totais ou médias sem colapsar o DataFrame
*	Comparar cada linha com outras do mesmo grupo
*	Aplicar funções acumuladas, ranking, diferença entre linhas, entre outros



---

## 6. 🛠️ Funções Personalizadas com UDF

- Criação de UDFs com `@udf` ou `F.udf()`.
- Aplicação prática para transformar colunas.
- Exemplo: criar faixas de `score` com base em percentis.

---

## 7. 🧪 Estudos de Caso

- Cálculo da proporção de eventos por faixa de score.
- Comparação entre ambientes (ex: `DEV` vs `PRD`).
- Visualização de distribuições e diferenças ao longo do tempo.

---

## 8. 📊 Exemplo Aplicado: Cálculo do PSI

- O que é o PSI e sua importância para monitoramento de modelos.
- Passo a passo aplicando `groupBy`, UDFs e agregações.
- Comentários sobre performance e escalabilidade no PySpark.

---

## 9. 🔚 Conclusões e Próximos Passos

- Onde aplicar os conceitos aprendidos?
- Extensões possíveis:  
  - KS Test  
  - CSI (Characteristic Stability Index)  
  - Análises temporais com janelas de tempo  
  - Agregações em dados de streaming  

---

✅ **Pronto para começar?**  
Se desejar, posso agora gerar o conteúdo detalhado de cada seção com código PySpark e explicações passo a passo.