In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Read the Bronze-layer Delta table that feeds the Silver transformation below.
df_bronze = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/default/bronze/corte_agua_delta/")
)

In [0]:
# Inspect the Bronze input before transforming it: schema first, then a
# 20-row sample rendered by the notebook's display helper.
print("Schema original Bronze:")
df_bronze.printSchema()
amostra_bronze = df_bronze.limit(20)
display(amostra_bronze)

In [0]:
# Start the Silver frame from the Bronze data; each step below rebinds
# df_silver to a new DataFrame with one more column converted.
df_silver = df_bronze

# Parse columns whose name suggests a date ("data" / "dt") from dd/MM/yyyy text.
possiveis_datas = [c for c in df_silver.columns if "data" in c or "dt" in c]

for c in possiveis_datas:
    df_silver = df_silver.withColumn(
        c,
        to_date(col(c), "dd/MM/yyyy")
    )

# Convert monetary columns ("valor" / "debito") from text to double.
possiveis_valores = [c for c in df_silver.columns if "valor" in c or "debito" in c]

for c in possiveis_valores:
    texto = col(c).cast("string")
    # When a decimal comma is present, any "." in the value is treated as a
    # thousands separator and removed first; otherwise "1.234,56" would become
    # "1.234.56" and cast to null. Values without a comma are cast unchanged.
    # Assumes pt-BR number formatting in the Bronze data - TODO confirm.
    sem_milhar = regexp_replace(texto, r"\.", "")
    normalizado = when(
        texto.contains(","),
        regexp_replace(sem_milhar, ",", ".")
    ).otherwise(texto)
    df_silver = df_silver.withColumn(c, normalizado.cast("double"))

# Cast count-like columns ("qtd" / "dias") to integer.
possiveis_numeros = [c for c in df_silver.columns if "qtd" in c or "dias" in c]

for c in possiveis_numeros:
    df_silver = df_silver.withColumn(
        c,
        col(c).cast("int")
    )

In [0]:

# Collect every string-typed column whose name does not start with "data".
colunas_texto = []
for nome_coluna, tipo_coluna in df_silver.dtypes:
    if tipo_coluna != "string" or nome_coluna.startswith("data"):
        continue
    colunas_texto.append(nome_coluna)

# Normalise those columns: strip surrounding whitespace, then upper-case.
for nome_coluna in colunas_texto:
    texto_limpo = upper(trim(col(nome_coluna)))
    df_silver = df_silver.withColumn(nome_coluna, texto_limpo)

In [0]:
# Remove rows that are exact duplicates across all columns.
df_silver = (
    df_silver
    .dropDuplicates()
)

In [0]:
# Detect the mandatory columns by name: customer identifiers
# ("matricula" / "cliente") and the cut date ("data" together with "corte").
# A single combined predicate lists each column at most once, even when its
# name satisfies both rules.
colunas_obrigatorias = [
    c for c in df_silver.columns
    if "matricula" in c
    or "cliente" in c
    or ("data" in c and "corte" in c)
]

# Drop rows with a null in any mandatory column. When nothing was detected the
# filter is skipped explicitly, because an empty subset is not a meaningful
# constraint to hand to na.drop.
if colunas_obrigatorias:
    df_silver = df_silver.na.drop(subset=colunas_obrigatorias)

print("Colunas obrigatórias detectadas:", colunas_obrigatorias)

In [0]:

# Choose the date column that drives ano_corte / mes_corte.
# Preference order:
#   1. "data_de_corte", the column the later calendar cell uses, so that all
#      derived date fields describe the same date;
#   2. the first column whose name contains both "data" and "corte";
#   3. the first column whose name contains "data" (the previous behaviour).
colunas_data = [c for c in df_silver.columns if "data" in c]

if "data_de_corte" in df_silver.columns:
    coluna_data_corte = "data_de_corte"
else:
    coluna_data_corte = next(
        (c for c in colunas_data if "corte" in c),
        colunas_data[0] if colunas_data else None,
    )

# As before, nothing is added when no date-like column exists.
if coluna_data_corte is not None:
    df_silver = (
        df_silver
        .withColumn("ano_corte", year(col(coluna_data_corte)))
        .withColumn("mes_corte", month(col(coluna_data_corte)))
    )

# Collapse the free-text status into three buckets. Values matching neither
# pattern fall through to "OUTRO".
if "status" in df_silver.columns:
    df_silver = df_silver.withColumn(
        "status_corte",
        when(col("status").like("%EFET%"), "EFETUADO")
        .when(col("status").like("%PEND%"), "PENDENTE")
        .otherwise("OUTRO")
    )

In [0]:
from pyspark.sql import functions as F

# Derive "bairro" from "localidade": split on "-", take the second piece
# (index 1) and strip the whitespace around it.
partes_localidade = F.split(F.col("localidade"), "-")
bairro_extraido = F.trim(partes_localidade.getItem(1))

df_silver = df_silver.withColumn("bairro", bairro_extraido)


In [0]:
from pyspark.sql.functions import (
    dayofmonth, dayofweek, weekofyear, date_format,
    concat_ws, quarter, lit, col
)

# Calendar attributes derived from the cut date, applied in the listed order.
data_corte = col("data_de_corte")

colunas_calendario = [
    ("dia_corte", dayofmonth(data_corte)),
    ("dia_da_semana", dayofweek(data_corte)),
    ("semana_do_ano", weekofyear(data_corte)),
    ("mes_nome", date_format(data_corte, "MMMM")),
    ("trimestre", quarter(data_corte)),
    # NOTE(review): joins ano_corte and mes_corte as-is, so the month is not
    # zero-padded (e.g. "2024-1") - confirm this is what consumers expect.
    ("ano_mes", concat_ws("-", col("ano_corte"), col("mes_corte"))),
    ("data_formatada", date_format(data_corte, "dd/MM/yyyy")),
]

for nome_derivada, expressao in colunas_calendario:
    df_silver = df_silver.withColumn(nome_derivada, expressao)


In [0]:
from pyspark.sql.functions import when, col

# Add the Portuguese weekday name to the Silver frame.
# This must operate on df_silver: the name `df` is only bound by the final
# read-back cell, so using it here fails on a fresh kernel and would keep the
# new column out of the data that gets written.
# dia_da_semana comes from dayofweek(), which numbers days 1 (Sunday) to
# 7 (Saturday). There is no otherwise() branch, so a null day yields null.
df_silver = df_silver.withColumn(
    "dia_semana_nome",
    when(col("dia_da_semana") == 1, "Domingo")
    .when(col("dia_da_semana") == 2, "Segunda-feira")
    .when(col("dia_da_semana") == 3, "Terça-feira")
    .when(col("dia_da_semana") == 4, "Quarta-feira")
    .when(col("dia_da_semana") == 5, "Quinta-feira")
    .when(col("dia_da_semana") == 6, "Sexta-feira")
    .when(col("dia_da_semana") == 7, "Sábado")
)



In [0]:
# Final preview: print the Silver schema and show a 30-row sample.
print("Schema final Silver:")
df_silver.printSchema()
amostra_silver = df_silver.limit(30)
display(amostra_silver)

In [0]:
# Persist the Silver frame as a Delta table, replacing both the existing data
# and its schema.
(
    df_silver.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("/Volumes/workspace/default/silver/corte_agua/")
)

print("SILVER criado com sucesso!")

In [0]:
# Read the Silver table back from storage and print the persisted schema.
df = (
    spark.read
    .format("delta")
    .load("/Volumes/workspace/default/silver/corte_agua/")
)
df.printSchema()
