In [16]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F


#### Continuar finalizando os clientes

In [17]:
# Criar a sessão Spark 
spark = SparkSession.builder.appName("ClientesFinalETL").getOrCreate()


In [18]:
# Ler o arquivo transformado
# Informa o caminho do arquivo
caminho_clientes = "C:\\Users\\Theuzao\\Desktop\\panvel_datalake\\data\\transformed\\clientes"
clientes = spark.read.csv(caminho_clientes, header=True, inferSchema=True)
#Imprime o Schema
clientes.printSchema()

root
 |-- v_id_cli: string (nullable = true)
 |-- d_dt_nasc: date (nullable = true)
 |-- v_sx_cli: string (nullable = true)
 |-- n_est_cvl: integer (nullable = true)
 |-- idade: integer (nullable = true)



In [19]:
# Converter todas as colunas para String após a leitura
clientes = clientes.select([F.col(col_name).cast("string").alias(col_name) for col_name in clientes.columns])


In [20]:
clientes.show()

+--------------------+----------+--------+---------+-----+
|            v_id_cli| d_dt_nasc|v_sx_cli|n_est_cvl|idade|
+--------------------+----------+--------+---------+-----+
|0000009DB36F622B7639|2003-11-14|       F|        0|   20|
|000000F51C15031D708E|2008-03-23|       F|        0|   16|
|00000F54BE2BBF0E7B13|1986-02-24|       F|        0|   38|
|000013E1FB44D9A9E50F|1984-12-13|       M|        0|   39|
|00001522AD94645C7688|1992-11-10|       F|        2|   31|
|0000161185110EE1BBE5|1975-12-27|       F|        0|   48|
|0000269BBD1888477D56|1961-10-25|       M|        0|   63|
|0000289F7EF0375B679D|1983-11-05|       F|        1|   40|
|00002A10FB7B5E5CB107|1991-08-19|       M|        0|   33|
|00002F84FF8B063A952C|1975-06-13|       F|        0|   49|
|000033354ABAAD3F2FA8|1964-01-01|       F|        1|   60|
|00003490C16D52FEB4C2|2004-03-20|       F|        0|   20|
|000040FC122DB8AEEF51|1982-04-06|       F|        0|   42|
|000042FA835BDA98ACCE|1965-05-03|       M|        2|   5

In [21]:
# Ler o arquivo transformado
# Informa o caminho do arquivo
caminho_enderecos_clientes = "C:\\Users\\Theuzao\\Desktop\\panvel_datalake\\data\\transformed\\endereco_clientes"
endereco_clientes = spark.read.csv(caminho_enderecos_clientes, header=True, inferSchema=True)
# Imprime o Schema
endereco_clientes.printSchema()

root
 |-- v_id_cli: string (nullable = true)
 |-- n_sq_end: integer (nullable = true)
 |-- d_dt_exc: timestamp (nullable = true)
 |-- v_lcl: string (nullable = true)
 |-- v_uf: string (nullable = true)



In [22]:
# Ler o arquivo transformado
# Informa o caminho do arquivo
caminho_clientes_opt = "C:\\Users\\Theuzao\\Desktop\\panvel_datalake\\data\\transformed\\clientes_opt"
clientes_opt = spark.read.csv(caminho_clientes_opt, header=True, inferSchema=True)
# Imprime o Schema
clientes_opt.printSchema()

root
 |-- b_call: string (nullable = true)
 |-- b_email: string (nullable = true)
 |-- b_push: string (nullable = true)
 |-- b_sms: string (nullable = true)
 |-- v_id_cli: string (nullable = true)



In [23]:
# Carregar os DataFrames 'clientes_com_idade', 'endereco_clientes', e 'clientes_opt'
df_clientes = clientes.alias("clientes")
df_enderecos = endereco_clientes.alias("enderecos")
df_clientes_opt = clientes_opt.alias("opt")

In [24]:
# Filtrar apenas endereços ativos (sem data de exclusão)
df_enderecos_ativos = df_enderecos.filter(F.col("d_dt_exc").isNull())


In [25]:
# Realizar o join entre clientes e endereços, usando "v_id_cli" como chave
df_clientes_com_endereco = df_clientes.join(
    df_enderecos_ativos,
    df_clientes["v_id_cli"] == df_enderecos_ativos["v_id_cli"],
    how="left"
).select(
    df_clientes["v_id_cli"].alias("codigo_cliente"),
    df_clientes["d_dt_nasc"].alias("data_nascimento"),
    df_clientes["idade"],
    df_clientes["v_sx_cli"].alias("sexo"),
    df_enderecos_ativos["v_uf"].alias("uf"),
    df_enderecos_ativos["v_lcl"].alias("cidade"),
    df_clientes["n_est_cvl"].alias("estado_civil")
)


In [26]:
# Tratamento dos valores nulos para os campos de localização (cidade e UF)
df_clientes_com_endereco = df_clientes_com_endereco.fillna({
    "uf": "Desconhecido",
    "cidade": "Desconhecido"
})


In [27]:
# Realizar o join entre `df_clientes_com_endereco` e `clientes_opt` para adicionar as informações de consentimento
# Usa a função coalesce para tratar valores nulos nas colunas de df_clientes_opt.
# Se o valor da coluna "b_call" for nulo, será substituído pela string "Desconhecido".
# A nova coluna será renomeada para "flag_lgpd_call".
df_clientes_final = df_clientes_com_endereco.join(
    df_clientes_opt,
    df_clientes_com_endereco["codigo_cliente"] == df_clientes_opt["v_id_cli"],
    how="left"
).select(
    df_clientes_com_endereco["*"],
    F.coalesce(df_clientes_opt["b_call"], F.lit("Desconhecido")).alias("flag_lgpd_call"),
    F.coalesce(df_clientes_opt["b_sms"], F.lit("Desconhecido")).alias("flag_lgpd_sms"),
    F.coalesce(df_clientes_opt["b_email"], F.lit("Desconhecido")).alias("flag_lgpd_email"),
    F.coalesce(df_clientes_opt["b_push"], F.lit("Desconhecido")).alias("flag_lgpd_push")
)




In [28]:
# Mostrar o DataFrame resultante
df_clientes_final.show(truncate=False)

+--------------------+---------------+-----+----+------------+-------------+------------+--------------+-------------+---------------+--------------+
|codigo_cliente      |data_nascimento|idade|sexo|uf          |cidade       |estado_civil|flag_lgpd_call|flag_lgpd_sms|flag_lgpd_email|flag_lgpd_push|
+--------------------+---------------+-----+----+------------+-------------+------------+--------------+-------------+---------------+--------------+
|0000009DB36F622B7639|2003-11-14     |20   |F   |RS          |PORTO ALEGRE |0           |Desconhecido  |Desconhecido |Desconhecido   |Desconhecido  |
|000000F51C15031D708E|2008-03-23     |16   |F   |SC          |LAGES        |0           |Desconhecido  |Desconhecido |Desconhecido   |Desconhecido  |
|00000F54BE2BBF0E7B13|1986-02-24     |38   |F   |Desconhecido|Desconhecido |0           |Desconhecido  |Desconhecido |Desconhecido   |Desconhecido  |
|000013E1FB44D9A9E50F|1984-12-13     |39   |M   |Desconhecido|Desconhecido |0           |Desconhecid

In [29]:
# Salvar o DataFrame em formato Parquet
df_clientes_final.write.mode("overwrite").parquet("C:/Users/Theuzao/Desktop/panvel_datalake/data/processed/clientes_parquet")


In [30]:
# Encerra a sessão Spark
spark.stop()