In [1]:
!pip install delta-spark==3.1.0



### Etapa 1: Inicialização da sessão Spark (camada Silver)

Nesta etapa inicializo a sessão Spark configurada com suporte ao Delta Lake, específica para o processamento da camada Silver (`mvp_turnover_silver`). Essa sessão será utilizada para ler os dados da camada Bronze, aplicar as transformações necessárias e gerar a tabela Silver.

**Resultado esperado:**  
A sessão Spark é criada com sucesso e exibida na saída da célula, indicando que o ambiente está pronto para os próximos passos de tratamento dos dados.


In [2]:
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = (
    SparkSession.builder
        .appName("mvp_turnover_silver")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

spark


### Etapa 2: Carregamento da camada Bronze

Nesta etapa leio a tabela da camada Bronze (`delta_ibm_hr`) em formato Delta e carrego os dados para o DataFrame `df_bronze`. Em seguida, exibo as 5 primeiras linhas e o esquema completo da tabela para conferir se os campos e tipos estão de acordo com o esperado antes de iniciar os tratamentos da camada Silver.

**Resultado esperado:**  
Visualizar uma amostra dos dados brutos tratados na Bronze e o `printSchema()` com todas as colunas que servirão de base para as transformações da camada Silver.


In [3]:
path_bronze = "/home/jovyan/data/bronze/delta_ibm_hr"

df_bronze = spark.read.format("delta").load(path_bronze)

df_bronze.show(5)
df_bronze.printSchema()


+---+---------+-----------------+---------+--------------------+----------------+---------+--------------+-------------+--------------+-----------------------+------+----------+--------------+--------+--------------------+---------------+-------------+-------------+-----------+------------------+------+--------+-----------------+-----------------+------------------------+-------------+----------------+-----------------+---------------------+---------------+--------------+------------------+-----------------------+--------------------+
|Age|Attrition|   BusinessTravel|DailyRate|          Department|DistanceFromHome|Education|EducationField|EmployeeCount|EmployeeNumber|EnvironmentSatisfaction|Gender|HourlyRate|JobInvolvement|JobLevel|             JobRole|JobSatisfaction|MaritalStatus|MonthlyIncome|MonthlyRate|NumCompaniesWorked|Over18|OverTime|PercentSalaryHike|PerformanceRating|RelationshipSatisfaction|StandardHours|StockOptionLevel|TotalWorkingYears|TrainingTimesLastYear|WorkLifeBalanc

### Etapa 3: Preparação do DataFrame Silver

Nesta etapa crio o DataFrame `df_silver` a partir de `df_bronze` e importo o módulo
`pyspark.sql.functions` com o alias `F`, que será utilizado nas transformações da camada
Silver (por exemplo: `F.col`, `F.trim`, `F.lower`, `F.when`, `F.count`, entre outras).  
A partir deste ponto, todas as alterações passam a ser feitas em `df_silver`, preservando
os dados originais em `df_bronze`.

**Resultado esperado:**  
Ter o DataFrame `df_silver` inicializado com o mesmo conteúdo da Bronze e pronto para
receber os tratamentos e análises da camada Silver.
a Silver.


In [4]:
import pyspark.sql.functions as F

df_silver = df_bronze


### Etapa 4: Contagem de valores nulos por coluna

Nesta etapa avalio a qualidade básica dos dados verificando a quantidade de valores
nulos (ou vazios) em cada coluna do DataFrame `df_silver`.  
Isso ajuda a identificar atributos que podem precisar de tratamento específico
(imputação, exclusão ou análise mais profunda).

**Resultado esperado:**  
Uma tabela com o número de valores nulos por coluna, além do total de linhas da base.


In [6]:
total_linhas = df_silver.count()
print(f"Total de linhas na base: {total_linhas}")

null_counts = df_silver.select([
    F.sum(
        F.when(F.col(c).isNull() | (F.col(c) == ""), 1).otherwise(0)
    ).alias(c)
    for c in df_silver.columns
])

null_counts.show(vertical=True, truncate=False)


Total de linhas na base: 1470
-RECORD 0-----------------------
 Age                      | 0   
 Attrition                | 0   
 BusinessTravel           | 0   
 DailyRate                | 0   
 Department               | 0   
 DistanceFromHome         | 0   
 Education                | 0   
 EducationField           | 0   
 EmployeeCount            | 0   
 EmployeeNumber           | 0   
 EnvironmentSatisfaction  | 0   
 Gender                   | 0   
 HourlyRate               | 0   
 JobInvolvement           | 0   
 JobLevel                 | 0   
 JobRole                  | 0   
 JobSatisfaction          | 0   
 MaritalStatus            | 0   
 MonthlyIncome            | 0   
 MonthlyRate              | 0   
 NumCompaniesWorked       | 0   
 Over18                   | 0   
 OverTime                 | 0   
 PercentSalaryHike        | 0   
 PerformanceRating        | 0   
 RelationshipSatisfaction | 0   
 StandardHours            | 0   
 StockOptionLevel         | 0   
 TotalWorking

### Etapa 5: Verificação de registros duplicados

Nesta etapa verifico se existem registros duplicados na tabela, considerando todas
as colunas.  
Essa informação é importante para avaliar se há necessidade de deduplicação antes
das análises.

**Resultado esperado:**  
Quantidade de registros duplicados (caso existam) na base Silver.


In [7]:
total_completo = df_silver.count()
total_distinto = df_silver.dropDuplicates().count()
qtd_duplicados = total_completo - total_distinto

print(f"Total de linhas (com possíveis duplicados): {total_completo}")
print(f"Total de linhas distintas: {total_distinto}")
print(f"Registros duplicados (todas as colunas): {qtd_duplicados}")


Total de linhas (com possíveis duplicados): 1470
Total de linhas distintas: 1470
Registros duplicados (todas as colunas): 0


### Etapa 6: Distribuição da variável target (`Attrition`)

Nesta etapa analiso a distribuição da variável alvo `Attrition`, que indica se o
funcionário saiu ou não da empresa.  
Calculo a quantidade e o percentual de cada categoria (`Yes` / `No`), pois isso
ajuda a entender o balanceamento da base para as análises de turnover.

**Resultado esperado:**  
Uma tabela com a contagem e o percentual de funcionários que saíram e que permaneceram.


In [9]:
from pyspark.sql.functions import round as ps_round

total = df_silver.count()

dist_attrition = (
    df_silver
      .groupBy("attrition") 
      .agg(F.count("*").alias("qtd"))
      .withColumn("perc", ps_round(F.col("qtd") / total * 100, 2))
)

dist_attrition.show(truncate=False)


+---------+----+-----+
|attrition|qtd |perc |
+---------+----+-----+
|No       |1233|83.88|
|Yes      |237 |16.12|
+---------+----+-----+



### Etapa 7: Estatísticas descritivas das variáveis numéricas

Nesta etapa calculo estatísticas descritivas (contagem, média, desvio padrão, mínimos
e máximos) para as variáveis numéricas da base.  
O objetivo é entender a escala dos atributos e identificar possíveis valores
inconsistentes.

**Resultado esperado:**  
Uma tabela com estatísticas descritivas para as colunas numéricas do DataFrame `df_silver`.


In [10]:
numeric_cols = [c for c, t in df_silver.dtypes if t in ("int", "double", "bigint")]
numeric_cols

['Age',
 'DailyRate',
 'DistanceFromHome',
 'Education',
 'EmployeeCount',
 'EmployeeNumber',
 'EnvironmentSatisfaction',
 'HourlyRate',
 'JobInvolvement',
 'JobLevel',
 'JobSatisfaction',
 'MonthlyIncome',
 'MonthlyRate',
 'NumCompaniesWorked',
 'PercentSalaryHike',
 'PerformanceRating',
 'RelationshipSatisfaction',
 'StandardHours',
 'StockOptionLevel',
 'TotalWorkingYears',
 'TrainingTimesLastYear',
 'WorkLifeBalance',
 'YearsAtCompany',
 'YearsInCurrentRole',
 'YearsSinceLastPromotion',
 'YearsWithCurrManager']

In [11]:
df_silver.select(numeric_cols).describe().show(truncate=False)


+-------+------------------+------------------+----------------+------------------+-------------+-----------------+-----------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+-------------------+------------------------+-------------+------------------+------------------+---------------------+------------------+------------------+------------------+-----------------------+--------------------+
|summary|Age               |DailyRate         |DistanceFromHome|Education         |EmployeeCount|EmployeeNumber   |EnvironmentSatisfaction|HourlyRate        |JobInvolvement    |JobLevel          |JobSatisfaction   |MonthlyIncome    |MonthlyRate       |NumCompaniesWorked|PercentSalaryHike |PerformanceRating  |RelationshipSatisfaction|StandardHours|StockOptionLevel  |TotalWorkingYears |TrainingTimesLastYear|WorkLifeBalance   |YearsAtCompany    |YearsInCurrentRole|YearsSinceLastPromotio

### Etapa 8: Detecção simples de outliers (Regra do IQR)

Nesta etapa faço uma detecção simples de outliers numéricos, utilizando a regra do
IQR (Intervalo Interquartil).  
Para cada variável numérica calculo Q1, Q3, IQR, limites inferior/superior e a
quantidade de registros fora desses limites.

**Resultado esperado:**  
Uma tabela com o resumo de outliers por coluna numérica, permitindo identificar
atributos com maior concentração de valores extremos.


In [13]:
from pyspark.sql import Row

outliers_info = []

for c in numeric_cols:
    q1, q3 = df_silver.approxQuantile(c, [0.25, 0.75], 0.01)
    iqr = q3 - q1
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr

    qtd_outliers = df_silver.filter(
        (F.col(c) < lower) | (F.col(c) > upper)
    ).count()

    outliers_info.append(
        Row(
            coluna=c,
            q1=float(q1),
            q3=float(q3),
            iqr=float(iqr),
            limite_inf=float(lower),
            limite_sup=float(upper),
            qtd_outliers=int(qtd_outliers)
        )
    )

df_outliers = spark.createDataFrame(outliers_info)
df_outliers.orderBy(F.col("qtd_outliers").desc()).show(truncate=False)


+------------------------+------+-------+-------+----------+----------+------------+
|coluna                  |q1    |q3     |iqr    |limite_inf|limite_sup|qtd_outliers|
+------------------------+------+-------+-------+----------+----------+------------+
|TrainingTimesLastYear   |2.0   |3.0    |1.0    |0.5       |4.5       |238         |
|PerformanceRating       |3.0   |3.0    |0.0    |3.0       |3.0       |226         |
|YearsSinceLastPromotion |0.0   |2.0    |2.0    |-3.0      |5.0       |215         |
|MonthlyIncome           |2867.0|8020.0 |5153.0 |-4862.5   |15749.5   |129         |
|YearsAtCompany          |3.0   |9.0    |6.0    |-6.0      |18.0      |104         |
|StockOptionLevel        |0.0   |1.0    |1.0    |-1.5      |2.5       |85          |
|TotalWorkingYears       |6.0   |15.0   |9.0    |-7.5      |28.5      |63          |
|NumCompaniesWorked      |1.0   |4.0    |3.0    |-3.5      |8.5       |52          |
|YearsInCurrentRole      |2.0   |7.0    |5.0    |-5.5      |14.5 

### Etapa 9: Cardinalidade das colunas categóricas

Nesta etapa analiso a cardinalidade das variáveis categóricas, ou seja, a quantidade
de valores distintos em cada coluna do tipo `string`.  
Essa análise ajuda a identificar atributos com poucas categorias (bons candidatos
para agrupamentos) ou com cardinalidade muito alta (que podem ser mais complexos
para modelagem).

**Resultado esperado:**  
Uma tabela com a quantidade de categorias distintas (cardinalidade) por coluna
categórica.


In [14]:
string_cols = [c for c, t in df_silver.dtypes if t == "string"]
string_cols


['Attrition',
 'BusinessTravel',
 'Department',
 'EducationField',
 'Gender',
 'JobRole',
 'MaritalStatus',
 'Over18',
 'OverTime']

In [16]:
from pyspark.sql import Row

card_info = []
for c in string_cols:
    qtd_distintos = df_silver.select(c).distinct().count()
    card_info.append(Row(coluna=c, cardinalidade=int(qtd_distintos)))

df_card = spark.createDataFrame(card_info)
df_card.orderBy(F.col("cardinalidade").desc()).show(truncate=False)


+--------------+-------------+
|coluna        |cardinalidade|
+--------------+-------------+
|JobRole       |9            |
|EducationField|6            |
|BusinessTravel|3            |
|Department    |3            |
|MaritalStatus |3            |
|Gender        |2            |
|Attrition     |2            |
|OverTime      |2            |
|Over18        |1            |
+--------------+-------------+



### Etapa 10: Identificação das colunas do tipo string

Nesta etapa identifico todas as colunas do DataFrame Silver que possuem o tipo
`string`.  
Essa seleção é necessária para aplicar, posteriormente, as transformações de limpeza
e padronização exclusivamente nessas colunas.

**Resultado esperado:**  
Exibir a lista de colunas categóricas (strings) presentes no DataFrame `df_silver`,
que serão tratadas nas próximas etapas.


In [17]:
string_cols = [c for c, t in df_silver.dtypes if t == "string"]
string_cols


['Attrition',
 'BusinessTravel',
 'Department',
 'EducationField',
 'Gender',
 'JobRole',
 'MaritalStatus',
 'Over18',
 'OverTime']

### Etapa 11: Padronização das colunas do tipo string

Nesta etapa aplico duas transformações de limpeza em todas as colunas do tipo
`string`:

- remoção de espaços em branco nas extremidades (`trim`)
- conversão do texto para minúsculas (`lower`)

Isso garante maior consistência nos dados categóricos e evita problemas em
agrupamentos e filtros causados por variações de formatação.

**Resultado esperado:**  
As colunas de texto do DataFrame `df_silver` passam a estar padronizadas em letras
minúsculas e sem espaços sobrando no início ou no fim.


In [18]:
for c_name in string_cols:
    df_silver = df_silver.withColumn(
        c_name,
        F.trim(F.lower(F.col(c_name)))
    )

### Etapa 12: Criação de colunas derivadas (faixa salarial e faixa etária)

Nesta etapa crio duas colunas derivadas a partir de variáveis numéricas já existentes:

- `faixa_salarial`: classifica o funcionário em baixa, média ou alta faixa salarial
  com base no valor de `monthlyincome`;
- `faixa_etaria`: agrupa os funcionários em jovem, adulto ou sênior com base na
  idade (`age`).

Ao final, apresento as colunas `age`, `faixa_etaria`, `monthlyincome` e
`faixa_salarial` para conferir se as classificações foram aplicadas corretamente.

**Resultado esperado:**  
Visualizar as novas colunas `faixa_salarial` e `faixa_etaria` associadas às
informações de idade e renda, confirmando que as regras de categorização foram
aplicadas conforme o esperado.


In [20]:
# Faixa salarial com base em monthlyincome
df_silver = df_silver.withColumn(
    "faixa_salarial",
    F.when(F.col("monthlyincome") < 3000, "baixa")
     .when((F.col("monthlyincome") >= 3000) & (F.col("monthlyincome") < 7000), "media")
     .otherwise("alta")
)

# Faixa etária com base em age
df_silver = df_silver.withColumn(
    "faixa_etaria",
    F.when(F.col("age") < 30, "jovem")
     .when((F.col("age") >= 30) & (F.col("age") < 50), "adulto")
     .otherwise("senior")
)

df_silver.select("age", "faixa_etaria", "monthlyincome", "faixa_salarial").show(10)


+---+------------+-------------+--------------+
|age|faixa_etaria|monthlyincome|faixa_salarial|
+---+------------+-------------+--------------+
| 41|      adulto|         5993|         media|
| 49|      adulto|         5130|         media|
| 37|      adulto|         2090|         baixa|
| 33|      adulto|         2909|         baixa|
| 27|       jovem|         3468|         media|
| 32|      adulto|         3068|         media|
| 59|      senior|         2670|         baixa|
| 30|      adulto|         2693|         baixa|
| 38|      adulto|         9526|          alta|
| 36|      adulto|         5237|         media|
+---+------------+-------------+--------------+
only showing top 10 rows



### Etapa 13: Salvando a tabela Silver em formato Delta

Nesta etapa salvo o DataFrame `df_silver`, já transformado, limpo e enriquecido,
na camada Silver utilizando o formato Delta Lake.  
Essa operação cria uma versão persistida dos dados intermediários, que será utilizada
posteriormente nas análises da camada Gold.

**Resultado esperado:**  
A tabela Silver é gravada com sucesso no diretório  
`/home/jovyan/data/silver/delta_ibm_hr_silver`, ficando disponível para leitura
nas próximas etapas do projeto.


In [21]:
path_silver = "/home/jovyan/data/silver/delta_ibm_hr_silver"

df_silver.write.format("delta").mode("overwrite").save(path_silver)


### Etapa 14: Validação da tabela Silver gravada em Delta

Nesta etapa realizo a leitura da tabela Silver recém-salva em formato Delta para
validar se o processo de escrita ocorreu corretamente.  
Em seguida, mostro as 5 primeiras linhas e o esquema completo da tabela.

**Resultado esperado:**  
Confirmar que a tabela Silver foi salva e pode ser lida sem erros, mantendo as
colunas e tipos de dados esperados para uso na camada Gold.


In [22]:
df_silver_check = spark.read.format("delta").load(path_silver)

df_silver_check.show(5)
df_silver_check.printSchema()


+---+---------+-----------------+---------+--------------------+----------------+---------+--------------+-------------+--------------+-----------------------+------+----------+--------------+--------+--------------------+---------------+-------------+-------------+-----------+------------------+------+--------+-----------------+-----------------+------------------------+-------------+----------------+-----------------+---------------------+---------------+--------------+------------------+-----------------------+--------------------+--------------+------------+
|age|Attrition|   BusinessTravel|dailyrate|          Department|distancefromhome|Education|EducationField|employeecount|employeenumber|EnvironmentSatisfaction|Gender|hourlyrate|JobInvolvement|JobLevel|             JobRole|JobSatisfaction|MaritalStatus|monthlyincome|MonthlyRate|numcompaniesworked|Over18|OverTime|percentsalaryhike|performancerating|RelationshipSatisfaction|standardhours|stockoptionlevel|totalworkingyears|training