# Data Transformation and Aggregation by Identifier "Ano e Estado"

## Environment Configuration

In [0]:
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructField, StructType
from src.utils.udfs import functions_for_df_structure_management as ffdsm

## Data Ingestion from Bronze Layer

In [0]:
# Load the bronze-layer table for the declarant's state of residence.
df_state_declarants_resindece = spark.table(
    "brazilian_tax_big_numbers.bronze_layer.delta_estado_de_residencia_do_declarante"
)

In [0]:
# Load the bronze-layer table for the capital of the declarant's state of residence.
df_state_capital_declarants_resindece = spark.table(
    "brazilian_tax_big_numbers.bronze_layer.delta_capital_de_estado_de_residencia_do_declarante"
)

In [0]:
# Preview the capital-level bronze data as loaded, before any cleaning is applied.
display(df_state_capital_declarants_resindece)

## Data Transformation

### "Estado de Residência do Declarante"

In [0]:
# Discard rows in which every column is null.
df_state_declarants_resindece = df_state_declarants_resindece.na.drop(how="all")

In [0]:
# "AnoCalendario" and "Estado" are the keys used for the join later in this notebook,
# so rows missing either of them are dropped.
df_state_declarants_resindece = df_state_declarants_resindece.na.drop(
    subset=["AnoCalendario", "Estado"]
)

In [0]:
# NOTE(review): project helper; the list presumably names the columns to leave
# un-cast while the remaining ones become floats — confirm in src/utils/udfs.
df_casted_state_declarants_resindece = ffdsm.cast_columns_to_float(
    df_state_declarants_resindece,
    ["AnoCalendario", "Estado"],
)

In [0]:
# The join later in this notebook still refers to "AnoCalendario" and "Estado" by
# these names, so the helper must leave the listed columns unrenamed.
# NOTE(review): presumably every other column is tagged with the given DataFrame
# name — confirm in src/utils/udfs.
df_casted_state_declarants_resindece = ffdsm.rename_columns_with_df_name(
    df_casted_state_declarants_resindece,
    "EstadoDeResidenciaDoDeclarante",
    ["AnoCalendario", "Estado"],
)

In [0]:
# Profile the casted state-level DataFrame with the Databricks data summary utility.
dbutils.data.summarize(df_casted_state_declarants_resindece)

In [0]:
# NOTE(review): project helper; how the listed key columns are treated (filled or
# skipped) and which fill values are used is not visible here — confirm in src/utils/udfs.
df_filled_state_declarants_resindece = ffdsm.fill_nulls(
    df_casted_state_declarants_resindece,
    ["AnoCalendario", "Estado"],
)

### "Capital de Estado de Residência do Declarante"

In [0]:
# Discard rows in which every column is null.
df_state_capital_declarants_resindece = df_state_capital_declarants_resindece.na.drop(how="all")

In [0]:
# "AnoCalendario" is a join key and "CapitalEstado" is the text the state name is
# later derived from, so rows missing either of them are dropped.
df_state_capital_declarants_resindece = df_state_capital_declarants_resindece.na.drop(
    subset=["AnoCalendario", "CapitalEstado"]
)

In [0]:
# NOTE(review): project helper; the list presumably names the columns to leave
# un-cast while the remaining ones become floats — confirm in src/utils/udfs.
df_casted_state_capital_declarants_resindece = ffdsm.cast_columns_to_float(
    df_state_capital_declarants_resindece,
    ["AnoCalendario", "CapitalEstado"],
)

In [0]:
# Later cells still refer to "AnoCalendario" and "CapitalEstado" by these names,
# so the helper must leave the listed columns unrenamed.
# NOTE(review): presumably every other column is tagged with the given DataFrame
# name — confirm in src/utils/udfs.
df_casted_state_capital_declarants_resindece = ffdsm.rename_columns_with_df_name(
    df_casted_state_capital_declarants_resindece,
    "CapitalDeEstadoDeResidenciaDoDeclarante",
    ["AnoCalendario", "CapitalEstado"],
)

In [0]:
# Profile the casted capital-level DataFrame with the Databricks data summary utility.
dbutils.data.summarize(df_casted_state_capital_declarants_resindece)

In [0]:
# NOTE(review): project helper; how the listed key columns are treated (filled or
# skipped) and which fill values are used is not visible here — confirm in src/utils/udfs.
df_filled_state_capital_declarants_resindece = ffdsm.fill_nulls(
    df_casted_state_capital_declarants_resindece,
    ["AnoCalendario", "CapitalEstado"],
)

In [0]:
# Full state name -> two-letter abbreviation for the 26 Brazilian states plus the
# Distrito Federal. get_full_state_name iterates this mapping in insertion order
# (first matching abbreviation wins); get_state_abbreviation looks names up directly.
brazilian_states = {
    "Acre": "AC",
    "Alagoas": "AL",
    "Amapá": "AP",
    "Amazonas": "AM",
    "Bahia": "BA",
    "Ceará": "CE",
    "Distrito Federal": "DF",
    "Espírito Santo": "ES",
    "Goiás": "GO",
    "Maranhão": "MA",
    "Mato Grosso": "MT",
    "Mato Grosso do Sul": "MS",
    "Minas Gerais": "MG",
    "Pará": "PA",
    "Paraíba": "PB",
    "Paraná": "PR",
    "Pernambuco": "PE",
    "Piauí": "PI",
    "Rio de Janeiro": "RJ",
    "Rio Grande do Norte": "RN",
    "Rio Grande do Sul": "RS",
    "Rondônia": "RO",
    "Roraima": "RR",
    "Santa Catarina": "SC",
    "São Paulo": "SP",
    "Sergipe": "SE",
    "Tocantins": "TO"
}

In [0]:
def get_full_state_name(capital_state, states=None):
    """Return the full state name whose abbreviation appears in ``capital_state``.

    The abbreviation must appear as a standalone run of letters (for example
    the ``AC`` in ``"Rio Branco - AC"``). Letters embedded in a longer word are
    ignored, so ``"SALVADOR - BA"`` resolves to Bahia instead of matching the
    ``AL`` inside ``SALVADOR``. Matching is case-sensitive.

    Args:
        capital_state: Text expected to contain a state abbreviation. ``None``,
            empty and non-string values are tolerated.
        states: Optional mapping of full state name -> abbreviation. Defaults
            to the notebook-level ``brazilian_states`` dictionary.

    Returns:
        The first state (in mapping order) whose abbreviation is one of the
        letter-only tokens of ``capital_state``, or ``"Não se Aplica"`` when
        nothing matches.
    """
    not_applicable = "Não se Aplica"

    # Spark passes SQL NULL to a Python UDF as None; a membership test on None
    # would raise TypeError, so bail out early.
    if not isinstance(capital_state, str) or not capital_state:
        return not_applicable

    if states is None:
        states = brazilian_states

    # Split on anything that is not a letter so separators such as "-", "/" or
    # parentheses never glue an abbreviation to a neighbouring word.
    tokens = set("".join(ch if ch.isalpha() else " " for ch in capital_state).split())

    for state, abbreviation in states.items():
        if abbreviation in tokens:
            return state

    return not_applicable

# Spark UDF wrapper so get_full_state_name can be applied to a DataFrame column.
get_full_state_name_udf = udf(f=get_full_state_name, returnType=StringType())

In [0]:
# Add an "Estado" column holding the full state name resolved from "CapitalEstado",
# so this DataFrame can be joined with the state-level one on ("AnoCalendario", "Estado").
df_filled_state_capital_declarants_resindece = df_filled_state_capital_declarants_resindece.withColumn(
    colName="Estado",
    col=get_full_state_name_udf(df_filled_state_capital_declarants_resindece["CapitalEstado"]),
)

In [0]:
# Capital city -> full state name for the 26 Brazilian states plus the Distrito
# Federal. get_capital_state_name scans this mapping to find the capital of a state.
states_capitals = {
    "Rio Branco": "Acre",
    "Maceió": "Alagoas",
    "Macapá": "Amapá",
    "Manaus": "Amazonas",
    "Salvador": "Bahia",
    "Fortaleza": "Ceará",
    "Brasília": "Distrito Federal",
    "Vitória": "Espírito Santo",
    "Goiânia": "Goiás",
    "São Luís": "Maranhão",
    "Cuiabá": "Mato Grosso",
    "Campo Grande": "Mato Grosso do Sul",
    "Belo Horizonte": "Minas Gerais",
    "Belém": "Pará",
    "João Pessoa": "Paraíba",
    "Curitiba": "Paraná",
    "Recife": "Pernambuco",
    "Teresina": "Piauí",
    "Rio de Janeiro": "Rio de Janeiro",
    "Natal": "Rio Grande do Norte",
    "Porto Alegre": "Rio Grande do Sul",
    "Porto Velho": "Rondônia",
    "Boa Vista": "Roraima",
    "Florianópolis": "Santa Catarina",
    "São Paulo": "São Paulo",
    "Aracaju": "Sergipe",
    "Palmas": "Tocantins"
}

In [0]:
def get_capital_state_name(state):
    """Look up the capital city of ``state`` in ``states_capitals``.

    Returns the first capital whose mapped state equals ``state``, or
    ``"Não se Aplica"`` when no entry matches.
    """
    matching_capitals = (
        capital
        for capital, mapped_state in states_capitals.items()
        if mapped_state == state
    )
    return next(matching_capitals, "Não se Aplica")

# Spark UDF wrapper so the lookup can be applied to a DataFrame column.
get_capital_state_name_udf = udf(f=get_capital_state_name, returnType=StringType())

In [0]:
def get_state_abbreviation(state_name):
    """Return the abbreviation mapped to ``state_name`` in ``brazilian_states``.

    Falls back to ``"Não se Aplica"`` when the name is not a key of the mapping.
    """
    try:
        return brazilian_states[state_name]
    except KeyError:
        return "Não se Aplica"

# Spark UDF wrapper so the lookup can be applied to a DataFrame column.
get_state_abbreviation_udf = udf(f=get_state_abbreviation, returnType=StringType())

## Aggregation By "Ano e Estado"

In [0]:
# Full outer join keeps every (year, state) pair that appears in either the
# state-level or the capital-level DataFrame.
df_silver_state_and_capital_of_declarants_residence = df_filled_state_declarants_resindece.join(
    other=df_filled_state_capital_declarants_resindece,
    on=["AnoCalendario", "Estado"],
    how="full_outer",
)

In [0]:
# The full outer join leaves nulls wherever a (year, state) pair existed on one side only.
# NOTE(review): "EstadoSigla" is listed here although that column is only added by a
# later withColumn in this notebook; confirm how fill_nulls treats the listed names.
df_silver_state_and_capital_of_declarants_residence = ffdsm.fill_nulls(
    df_silver_state_and_capital_of_declarants_residence,
    ["AnoCalendario", "Estado", "CapitalEstado", "EstadoSigla"],
)

In [0]:
# Overwrite "CapitalEstado" with the capital city looked up from the "Estado" column.
df_silver_state_and_capital_of_declarants_residence = df_silver_state_and_capital_of_declarants_residence.withColumn(
    colName="CapitalEstado",
    col=get_capital_state_name_udf(df_silver_state_and_capital_of_declarants_residence["Estado"]),
)

In [0]:
# Add "EstadoSigla" with the state abbreviation looked up from the "Estado" column.
df_silver_state_and_capital_of_declarants_residence = df_silver_state_and_capital_of_declarants_residence.withColumn(
    colName="EstadoSigla",
    col=get_state_abbreviation_udf(df_silver_state_and_capital_of_declarants_residence["Estado"]),
)

In [0]:
# Inspect the joined result, including the derived "CapitalEstado" and "EstadoSigla" columns.
display(df_silver_state_and_capital_of_declarants_residence)

In [0]:
# Sanity check before persisting.
# NOTE(review): count_nulls is a project helper; presumably it reports null counts
# per column — confirm in src/utils/udfs.
df_nulls = ffdsm.count_nulls(df_silver_state_and_capital_of_declarants_residence)
display(df_nulls)

## Save as Delta in Silver Layer

In [0]:
# Make sure the target silver-layer schema exists before the table is written.
spark.sql("CREATE SCHEMA IF NOT EXISTS brazilian_tax_big_numbers.silver_layer")

In [0]:
# Persist the joined state/capital DataFrame as the silver-layer table, replacing
# any previous contents of that table.
error = None

try:
    (
        df_silver_state_and_capital_of_declarants_residence.write
        .mode("overwrite")
        .saveAsTable("brazilian_tax_big_numbers.silver_layer.delta_estados_e_capitais")
    )
except Exception as e:
    # Keep the message in `error` and in the cell output for inspection, then
    # re-raise: swallowing the exception would let the notebook (and any job
    # running it) finish as successful without the table having been written.
    error = str(e)
    print(error)
    raise