In [0]:
# Imports
from pyspark.sql.functions import explode, col, current_timestamp
from pyspark.sql.types import IntegerType
from utils.transform import explode_struct

In [0]:
#Variaveis do Unity Catalog
CATALOGO = "agroclima"
SCHEMA = "silver"
TABELA = "producao_milho"

# Montar caminho da tabela
caminho_tabela = f"{CATALOGO}.{SCHEMA}.{TABELA}"

In [0]:
#Carrega tabela de produção da camda bronze
df_producao_milho = spark.table("agroclima.bronze.producao_milho")

#### Transformação

In [0]:
# explode a coluna resultados
df_transformado = df_producao_milho.select(
    "*", 
    explode("resultados").alias("resultado")
)

In [0]:
# explode a coluna series
df_transformado = df_transformado.select(
    "*",
    explode("resultado.series").alias("item")
)

In [0]:
# Extrai as colunas da hierarquia de localidade
df_transformado = df_transformado.select(
    "*",
    col("item.localidade.id").alias("municipio_id"),
    col("item.localidade.nome").alias("municipio_nome")
)

In [0]:
# Pega nome das colunas para a despivotagem
colunas_df = df_transformado.columns

In [0]:
# Extrai as colunas da hierarquia de serie 
df_transformado = df_transformado.select(
    "*",
    col("item.serie.*")
)

In [0]:
#Busca Lista de Colunas 
colunas_pivotagem = df_transformado.columns

#Lista de Colunas que serão transformadas em Linhas
colunas_despivotagem  = list(set(colunas_pivotagem) - set(colunas_df))

In [0]:
#Obs: Impelemntação poderia ser feita com a função adaptada explode_struct

# Despivotagem no Dataframe Transformando as colunas com os anos em Linhas
df_producao_milho = df_transformado.unpivot(colunas_df, colunas_despivotagem, "ano_chave", "valor" )

#display(df_transformado)

In [0]:
# Explode a coluna de categoria pois e um lista
df_transformado = df_transformado = df_producao_milho.select(
    "*",
    explode(col("resultado.classificacoes.categoria")).alias("categoria")
)

In [0]:

#Chama função personalizada que explode a categoria em varias Linhas
df_transformado = explode_struct(df_transformado, "categoria", "safra_id", "safra")

#display(df_transformado)

#### Limpeza

In [0]:
#Verifica os nomes das colunas do dataframe
df_transformado.columns

In [0]:
#lista de colunas que serão removidas do dataframe
colunas_a_remover = ["resultados", "ano", "_data_ingestao", "resultado", "item", "categoria"]

#Remove colunas da Lista colunas_a_remover
df_limpeza = df_transformado.drop(*colunas_a_remover)

#display(df_limpeza)

In [0]:
#Renomeia Colunas 
df_limpeza  = df_limpeza.withColumnRenamed("id", "tipo_indicador_id") \
                            .withColumnRenamed("variavel", "tipo_indicador") \
                            .withColumnRenamed("ano_chave", "ano")

In [0]:
# Verifica  Percetual de valores Nulos
#df_limpeza.toPandas().isnull().sum() / df_limpeza.count()

In [0]:
# Transforma a coluna de valor em inteiro, valores não convertido será null
df_limpeza = df_limpeza.withColumn("valor", col("valor").try_cast("int"))

In [0]:
# Remove Valores Nulos
df_limpeza = df_limpeza.filter(
    (col("safra").isNotNull()) & col("valor").isNotNull()
)

In [0]:
# Altera  tipos dos Identificadores
df_limpeza = df_limpeza.withColumn("tipo_indicador_id", col("tipo_indicador_id").cast("int")) \
                       .withColumn("municipio_id", col("municipio_id").cast("int")) \
                       .withColumn("ano", col("ano").cast("int")) \
                       .withColumn("safra_id", col("safra_id").cast("int"))
    

#display(df_limpeza)

In [0]:
#Verifica se há linhas duplicadas
#print(df_limpeza.count())
#print(df_limpeza.distinct().count())

#### Carga

In [0]:
#Adiciona Data e horario de Ingestão dos dados
df_carga = df_limpeza.withColumn("_data_ingestao", current_timestamp())

In [0]:
#Salva Tabela Limpa e Transformada  na camada Silver
df_carga.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("uf", "tipo_indicador", "ano") \
        .option("overWriteSchema", "true") \
        .saveAsTable(caminho_tabela)