# 1 - Importing libraries

In [1]:
from pyspark.sql import DataFrame
from pyspark.sql.types import *
from pyspark.sql.types import NumericType, StructType, Row
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import numpy as np
import basedosdados as bd
import pandas as pd
import os

## 1.1 - Setup ETL logging

In [2]:
from loggings.logging_setup import LoggerSetup

In [3]:
etl_logger = LoggerSetup.get_etl_logger()

# 2 - Initializing Spark Session

In [4]:
from utils.init_spark_session import SparkSessionManager

SPARK_CONFIG: dict = {
    "spark.executor.memory": "4g",
    "spark.executor.cores": "2",
    "spark.driver.extraClassPath": r"C:\mysql-connector-j-8.4.0.jar",
    "spark.executor.extraClassPath": r"C:\mysql-connector-j-8.4.0.jar",
    "spark.driver.memory": "4g",
    "spark.sql.shuffle.partitions": "200",
    "spark.default.parallelism": "100"
}

spark = SparkSessionManager.get_spark_session(
    app_name="ETL Process", config=SPARK_CONFIG
    )

# 3 - Extracting

## 3.1 - Scraping file

In [5]:
from scraping_file.arrecadacao_estado_scrap_file import FileHandlerArrecadacaoEstado

# Define the path to the download directory
download_dir = r"C:\RevenueByState\data"

# Create an instance of the FileHandlerArrecadacaoEstado class
scrap = FileHandlerArrecadacaoEstado(download_dir)

# Check if the download directory exists; create it if not
if not os.path.exists(download_dir):
    os.makedirs(download_dir)

# List files and directories in the base directory
paths = os.listdir(r"C:\RevenueByState")
# Extract the name of the download directory
name_path = download_dir[download_dir.rfind('\\') + 1:]

# Check if the file already exists in the directory
if name_path not in paths or "arrecadacao-estado.csv" not in os.listdir(download_dir):
    # Download the file if it does not exist
    scrap.download_file()
    etl_logger.info("File downloaded with success")
else:
    etl_logger.warning("File already exists")



## 3.2 - Getting another resources (population by states) from https://basedosdados.org/

In [6]:
def get_dataframe_from_basedosdados(dataset_id: str, table_id: str, billing_project_id: str) -> pd.DataFrame:
    """
    Read dataframe from basedosdados

    Parameters
    ----------
    dataset_id : str
        The dataset id where the table is in
    table_id : str
        To identify our table
    billing_project_id: str
        Our billing project 
        
    Returns
    -------
    pd.DataFrame
        A pandas DataFrame with our resource from BigQuery
    """

    return bd.read_table(
        dataset_id=dataset_id,
        table_id=table_id,
        billing_project_id=billing_project_id
    )

In [7]:
population_states_df: pd.DataFrame = get_dataframe_from_basedosdados(
    dataset_id="br_ibge_populacao", table_id="uf", billing_project_id="revenue-etl"
    )

Downloading: 100%|██████████| 837/837 [00:00<00:00, 2314.88rows/s]


In [8]:
inflation_df: pd.DataFrame = get_dataframe_from_basedosdados(
    dataset_id="br_ibge_inpc", table_id="mes_brasil", billing_project_id="revenue-etl"
) 

Downloading: 100%|██████████| 540/540 [00:00<00:00, 1489.94rows/s]


In [9]:
# changing np.nan to None (for Spark) 
population_states_df: pd.DataFrame  = population_states_df.replace([np.nan], [None])
inflation_df: pd.DataFrame  = inflation_df.replace([np.nan], [None])

In [10]:
population_states_df

Unnamed: 0,sigla_uf,ano,populacao,populacao_economicamente_ativa
0,AC,1991,417102,
1,AL,1991,2512658,
2,AM,1991,2102766,
3,AP,1991,289035,
4,BA,1991,11867336,
...,...,...,...,...
832,RS,2021,11466630,9533
833,SC,2021,7338473,5947
834,SE,2021,2338474,1873
835,SP,2021,46649132,38573


In [11]:
inflation_df

Unnamed: 0,ano,mes,indice,variacao_mensal,variacao_trimestral,variacao_semestral,variacao_anual,variacao_doze_meses
0,1979,3,0.0,,,,,
1,1979,4,0.0,3.45,,,,
2,1979,5,0.0,1.76,,,,
3,1979,6,0.0,3.0,8.43,,,
4,1979,7,0.0,5.36,10.43,,,
...,...,...,...,...,...,...,...,...
535,2023,10,6909.79,0.12,0.43,0.6,3.04,4.14
536,2023,11,6916.7,0.1,0.33,0.34,3.14,3.85
537,2023,11,6916.7,0.1,0.33,0.34,3.14,3.85
538,2023,12,6954.74,0.55,0.77,0.99,3.71,3.71


## 3.3 - Reading resources

### 3.3.1 - Listing file paths

In [12]:
# Path to files
data = {
    "arrecadacao-estado": r"C:\RevenueByState\data\\arrecadacao-estado.csv"
}

### 3.3.2 - Defining Schemas

In [13]:
# Defining the schema (translating columns from Portuguese to English)
collection_state_schema = StructType([
    StructField("year", IntegerType(), nullable=True),
    StructField("month", StringType(), nullable=True),
    StructField("state", StringType(), nullable=True),
    StructField("import_tax", DoubleType(), nullable=True),
    StructField("export_tax", DoubleType(), nullable=True),
    StructField("ipi_tobacco", DoubleType(), nullable=True),
    StructField("ipi_beverages", DoubleType(), nullable=True),
    StructField("ipi_auto", DoubleType(), nullable=True),
    StructField("ipi_linked_imports", DoubleType(), nullable=True),
    StructField("ipi_others", DoubleType(), nullable=True),
    StructField("income_tax_individual", DoubleType(), nullable=True),
    StructField("income_tax_financial", DoubleType(), nullable=True),
    StructField("income_tax_other", DoubleType(), nullable=True),
    StructField("withholding_tax_employment", DoubleType(), nullable=True),
    StructField("withholding_tax_capital", DoubleType(), nullable=True),
    StructField("withholding_tax_remittances", DoubleType(), nullable=True),
    StructField("withholding_tax_other", DoubleType(), nullable=True),
    StructField("tax_financial_operations", DoubleType(), nullable=True),
    StructField("rural_land_tax", DoubleType(), nullable=True),
    StructField("provisional_tax_transactions", DoubleType(), nullable=True),
    StructField("provisional_contribution_transactions", DoubleType(), nullable=True),
    StructField("cofins", DoubleType(), nullable=True),
    StructField("cofins_financial", DoubleType(), nullable=True),
    StructField("cofins_other", DoubleType(), nullable=True),
    StructField("contribution_social_integration", DoubleType(), nullable=True),
    StructField("contribution_social_integration_financial", DoubleType(), nullable=True),
    StructField("contribution_social_integration_other", DoubleType(), nullable=True),
    StructField("social_contribution_net_profit", DoubleType(), nullable=True),
    StructField("social_contribution_net_profit_financial", DoubleType(), nullable=True),
    StructField("social_contribution_net_profit_other", DoubleType(), nullable=True),
    StructField("intervention_economic_domain_non_deductible_fuels", DoubleType(), nullable=True),
    StructField("intervention_economic_domain_fuels", DoubleType(), nullable=True),
    StructField("contribution_security_plan_public_servants", DoubleType(), nullable=True),
    StructField("contributions_security_plan_public_servants", DoubleType(), nullable=True),
    StructField("contributions_special_fund_inspection_activities", DoubleType(), nullable=True),
    StructField("fiscal_recovery_program", DoubleType(), nullable=True),
    StructField("installment_payment_program", DoubleType(), nullable=True),
    StructField("withholding_tax_law_article_30", DoubleType(), nullable=True),
    StructField("unified_payment", DoubleType(), nullable=True),
    StructField("other_administered_revenues", DoubleType(), nullable=True),
    StructField("other_revenues", DoubleType(), nullable=True),
    StructField("social_security_revenue", DoubleType(), nullable=True),
    StructField("social_security_revenue_individual_contributors", DoubleType(), nullable=True),
    StructField("social_security_revenue_other_sources", DoubleType(), nullable=True),
    StructField("administered_by_other_agencies", DoubleType(), nullable=True)
])

In [14]:
population_states_schema = StructType([
    StructField(name="state_uf", dataType=StringType(), nullable=True),
    StructField(name="year", dataType=IntegerType(), nullable=True),
    StructField(name="population_state", dataType=IntegerType(), nullable=True),
    StructField(name="economically_active_population", dataType=IntegerType(), nullable=True)
])

In [15]:
inflation_schema = StructType([
    StructField("year", IntegerType(), nullable=True),
    StructField("month", IntegerType(), nullable=True),
    StructField("index", FloatType(), nullable=True),
    StructField("monthly_variation", FloatType(), nullable=True),
    StructField("quarterly_variation", FloatType(), nullable=True),
    StructField("semiannual_variation", FloatType(), nullable=True),
    StructField("annual_variation", FloatType(), nullable=True),
    StructField("twelve_months_variation", FloatType(), nullable=True)
])

### 3.3.3 - Reading all files

In [16]:
def read_csv_file(file_path: str, schema: StructType = None, delimiter: str = ",", header: bool = True, infer_schema: bool = False, options: dict = None) -> DataFrame:
    """
    Reads a CSV file into a Spark DataFrame with various configurable options.

    Parameters
    ----------
    file_path :str
        Path to the CSV file.
    schema : StructType (optional)
        Schema to apply to the DataFrame. If not provided, schema can be inferred.
    delimiter : str, (optional)
        Delimiter used in the CSV file (default is ',').
    header : bool (optional)
        Whether the first row contains headers (default is True).
    infer_schema : bool (optional)
        Whether to infer the schema from the data (default is False).
    options : dict (optional)
        Additional options for the CSV reader.

    Returns
    ----------
        DataFrame
            The loaded DataFrame.
    """
    try:
        reader = spark.read.format("csv")
        
        reader = reader.option("delimiter", delimiter).option("header", str(header).lower()).option("inferSchema", str(infer_schema).lower())
        
        if options:
            for key, value in options.items():
                reader = reader.option(key, value)
        
        if schema:
            reader = reader.schema(schema)

        df = reader.load(file_path)
        
        return df
    
    except Exception as e:
        etl_logger.error(f"Error occurred while reading CSV file '{file_path}': {e}")
        return None

In [17]:
collection_state_df = read_csv_file(
    file_path= data.get("arrecadacao-estado"),
    schema=collection_state_schema,
    delimiter=";",
    infer_schema=True
    )

In [18]:
collection_state_df.show()

+----+-------+-----+-----------+----------+-----------+-------------+-----------+------------------+-----------+---------------------+--------------------+----------------+--------------------------+-----------------------+---------------------------+---------------------+------------------------+--------------+----------------------------+-------------------------------------+------------+----------------+------------+-------------------------------+-----------------------------------------+-------------------------------------+------------------------------+----------------------------------------+------------------------------------+-------------------------------------------------+----------------------------------+------------------------------------------+-------------------------------------------+------------------------------------------------+-----------------------+---------------------------+------------------------------+---------------+---------------------------+--------

In [19]:
def create_spark_dataframe_from_pandas(df: pd.DataFrame, schema: StructType = None) -> DataFrame:
    """
    Create a Spark DataFrame from a Pandas DataFrame.

    Parameters
    ----------
    df: pd.DataFrame
        The Pandas DataFrame that will be converted to a Spark DataFrame.
    schema: StructType, optional
        The schema to apply to the Spark DataFrame. If not provided, the schema will be inferred.
        
    Returns
    -------
    spark.DataFrame
        A Spark DataFrame created from the Pandas DataFrame.
    """
    try:
        return spark.createDataFrame(data=df, schema=schema) if schema else spark.createDataFrame(data=df) 
    except Exception as e:
        etl_logger.error(f"Error occurred while loading DataFrame: {e}")
        return None

In [20]:
population_states_df = create_spark_dataframe_from_pandas(df=population_states_df, schema=population_states_schema)
inflation_df = create_spark_dataframe_from_pandas(df=inflation_df, schema=inflation_schema)

In [21]:
collection_state_df.printSchema()
population_states_df.printSchema()
inflation_df.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- state: string (nullable = true)
 |-- import_tax: double (nullable = true)
 |-- export_tax: double (nullable = true)
 |-- ipi_tobacco: double (nullable = true)
 |-- ipi_beverages: double (nullable = true)
 |-- ipi_auto: double (nullable = true)
 |-- ipi_linked_imports: double (nullable = true)
 |-- ipi_others: double (nullable = true)
 |-- income_tax_individual: double (nullable = true)
 |-- income_tax_financial: double (nullable = true)
 |-- income_tax_other: double (nullable = true)
 |-- withholding_tax_employment: double (nullable = true)
 |-- withholding_tax_capital: double (nullable = true)
 |-- withholding_tax_remittances: double (nullable = true)
 |-- withholding_tax_other: double (nullable = true)
 |-- tax_financial_operations: double (nullable = true)
 |-- rural_land_tax: double (nullable = true)
 |-- provisional_tax_transactions: double (nullable = true)
 |-- provisional_contribution_transactio

In [22]:
collection_state_df.show()

+----+-------+-----+-----------+----------+-----------+-------------+-----------+------------------+-----------+---------------------+--------------------+----------------+--------------------------+-----------------------+---------------------------+---------------------+------------------------+--------------+----------------------------+-------------------------------------+------------+----------------+------------+-------------------------------+-----------------------------------------+-------------------------------------+------------------------------+----------------------------------------+------------------------------------+-------------------------------------------------+----------------------------------+------------------------------------------+-------------------------------------------+------------------------------------------------+-----------------------+---------------------------+------------------------------+---------------+---------------------------+--------

In [23]:
# Caching all dataframes
collection_state_df = collection_state_df.cache()
population_states_df = population_states_df.cache()
inflation_df = inflation_df.cache()

# 4 - Transforming

## 4.1 - Validate and Cleanse Data

### 4.1.1 - Joining tables (population_states and collection_state)

In [24]:
# Alias for the tables
collection_state_df = collection_state_df.alias("cs")
population_states_df = population_states_df.alias("ps")

In [25]:
# Joining collection with population
collection_population_condition = (
    (F.col("ps.year") == F.col("cs.year")) &
    (F.col("ps.state_uf") == F.col("cs.state"))
)
collection_population_state_df: DataFrame = collection_state_df.join(
    other=population_states_df,
    on=collection_population_condition,
    how="left"
).select(
    "cs.*",
    "ps.population_state",
    "ps.economically_active_population"
)

In [26]:
collection_population_state_df.show()

+----+-------+-----+-----------+----------+-----------+-------------+-----------+------------------+-----------+---------------------+--------------------+----------------+--------------------------+-----------------------+---------------------------+---------------------+------------------------+--------------+----------------------------+-------------------------------------+------------+----------------+------------+-------------------------------+-----------------------------------------+-------------------------------------+------------------------------+----------------------------------------+------------------------------------+-------------------------------------------------+----------------------------------+------------------------------------------+-------------------------------------------+------------------------------------------------+-----------------------+---------------------------+------------------------------+---------------+---------------------------+--------

### 4.1.2 - Solving ASCII problem

In [27]:
# We have a non ASCII character (ç), that is from Portuguese language so we need to translate
special_char = "�"

correct_char = "c"

collection_population_state_df = collection_population_state_df.withColumn("month", F.translate(F.col("month"), special_char, correct_char))

In [28]:
# Changing months names from Portuguese to English and normalizing
collection_population_state_df = collection_population_state_df.withColumn(
    "month",
    F.when(F.col("month") == "Janeiro", "january")
    .when(F.col("month") == "Fevereiro", "february")
    .when(F.col("month") == "Marco", "march")
    .when(F.col("month") == "Abril", "april")
    .when(F.col("month") == "Maio", "may")
    .when(F.col("month") == "Junho", "june")
    .when(F.col("month") == "Julho", "july")
    .when(F.col("month") == "Agosto", "august")
    .when(F.col("month") == "Setembro", "september")
    .when(F.col("month") == "Outubro", "october")
    .when(F.col("month") == "Novembro", "november")
    .when(F.col("month") == "Dezembro", "december")
    .otherwise(F.col("month"))
)

In [29]:
collection_population_state_df.show()

+----+-------+-----+-----------+----------+-----------+-------------+-----------+------------------+-----------+---------------------+--------------------+----------------+--------------------------+-----------------------+---------------------------+---------------------+------------------------+--------------+----------------------------+-------------------------------------+------------+----------------+------------+-------------------------------+-----------------------------------------+-------------------------------------+------------------------------+----------------------------------------+------------------------------------+-------------------------------------------------+----------------------------------+------------------------------------------+-------------------------------------------+------------------------------------------------+-----------------------+---------------------------+------------------------------+---------------+---------------------------+--------

### 4.1.3 - Checking count grouped by month

In [30]:
months_counts_names = collection_population_state_df.groupBy("month").count()
months_counts_names.show()

+---------+-----+
|    month|count|
+---------+-----+
|  october|  648|
|     NULL|    1|
|      may|  675|
|september|  648|
|   august|  648|
|    april|  675|
| november|  648|
|     july|  648|
|  january|  675|
| february|  675|
|    march|  675|
|     june|  675|
| december|  648|
+---------+-----+



In [31]:
# Let's see this NULL month line
collection_population_state_df.filter(F.col("month").isNull()).show()
collection_population_state_df.count()

+----+-----+-----+----------+----------+-----------+-------------+--------+------------------+----------+---------------------+--------------------+----------------+--------------------------+-----------------------+---------------------------+---------------------+------------------------+--------------+----------------------------+-------------------------------------+------+----------------+------------+-------------------------------+-----------------------------------------+-------------------------------------+------------------------------+----------------------------------------+------------------------------------+-------------------------------------------------+----------------------------------+------------------------------------------+-------------------------------------------+------------------------------------------------+-----------------------+---------------------------+------------------------------+---------------+---------------------------+--------------+------

7939

In [32]:
# Dropping lines with year and month with null
collection_population_state_df = collection_population_state_df.dropna(subset=["year", "month"])
collection_population_state_df.count()

7938

### 4.1.4 - Adding month in numeric representation

In [33]:
months = {
    "january": 1,
    "february": 2,
    "march": 3,
    "april": 4,
    "may": 5,
    "june": 6,
    "july": 7,
    "august": 8,
    "september": 9,
    "october": 10,
    "november": 11,
    "december": 12
}

In [34]:
collection_population_state_df = collection_population_state_df.withColumn(
    "month_numeric",
    F.when(F.col("month") == "january", F.lit(months["january"]))
    .when(F.col("month") == "february", F.lit(months["february"]))
    .when(F.col("month") == "march", F.lit(months["march"]))
    .when(F.col("month") == "april", F.lit(months["april"]))
    .when(F.col("month") == "may", F.lit(months["may"]))
    .when(F.col("month") == "june", F.lit(months["june"]))
    .when(F.col("month") == "july", F.lit(months["july"]))
    .when(F.col("month") == "august", F.lit(months["august"]))
    .when(F.col("month") == "september", F.lit(months["september"]))
    .when(F.col("month") == "october", F.lit(months["october"]))
    .when(F.col("month") == "november", F.lit(months["november"]))
    .when(F.col("month") == "december", F.lit(months["december"]))
    .otherwise(None)
)

### 4.1.5 - Checking count grouped by month in numeric representation

In [35]:
numeric_month_count = collection_population_state_df.groupBy("month_numeric").count()
numeric_month_count.show()

+-------------+-----+
|month_numeric|count|
+-------------+-----+
|           12|  648|
|            1|  675|
|            6|  675|
|            3|  675|
|            5|  675|
|            9|  648|
|            4|  675|
|            8|  648|
|            7|  648|
|           10|  648|
|           11|  648|
|            2|  675|
+-------------+-----+



### 4.1.6 - Joining tables (collection_state and inflation_df)

#### 4.1.6.1 - Tables alias

In [36]:
collection_population_state_df = collection_population_state_df.alias("cs")
inflation_df = inflation_df.alias("inf")

### 4.1.6.1 - Joining both tables with alias

In [37]:
collection_inflation_condition = (
    (F.col("cs.year") == F.col("inf.year")) &
    (F.col("cs.month_numeric") == F.col("inf.month"))
)
collection_population_state_df = collection_population_state_df.join(
    other=inflation_df,
    on=collection_inflation_condition,
    how="left"
).select(
    "cs.*",
    "inf.index",
    "inf.monthly_variation",
    "inf.quarterly_variation",
    "inf.semiannual_variation",
    "inf.annual_variation"
)

In [38]:
collection_population_state_df.show()

+----+-------+-----+-----------+----------+-----------+-------------+-----------+------------------+-----------+---------------------+--------------------+----------------+--------------------------+-----------------------+---------------------------+---------------------+------------------------+--------------+----------------------------+-------------------------------------+------------+----------------+------------+-------------------------------+-----------------------------------------+-------------------------------------+------------------------------+----------------------------------------+------------------------------------+-------------------------------------------------+----------------------------------+------------------------------------------+-------------------------------------------+------------------------------------------------+-----------------------+---------------------------+------------------------------+---------------+---------------------------+--------

### 4.1.7 - Removing duplicates

In [39]:
collection_population_state_df.dropDuplicates()

DataFrame[year: int, month: string, state: string, import_tax: double, export_tax: double, ipi_tobacco: double, ipi_beverages: double, ipi_auto: double, ipi_linked_imports: double, ipi_others: double, income_tax_individual: double, income_tax_financial: double, income_tax_other: double, withholding_tax_employment: double, withholding_tax_capital: double, withholding_tax_remittances: double, withholding_tax_other: double, tax_financial_operations: double, rural_land_tax: double, provisional_tax_transactions: double, provisional_contribution_transactions: double, cofins: double, cofins_financial: double, cofins_other: double, contribution_social_integration: double, contribution_social_integration_financial: double, contribution_social_integration_other: double, social_contribution_net_profit: double, social_contribution_net_profit_financial: double, social_contribution_net_profit_other: double, intervention_economic_domain_non_deductible_fuels: double, intervention_economic_domain_fuels

### 4.1.8 - Handling missing values 

In [40]:
def fill_values_na_with_zero_or_drop(df: DataFrame) -> DataFrame:
    """
    Fill NA values with zero for numeric columns and drop rows with NA values 
    in string columns.

    Parameters
    ----------
    df : DataFrame
        The Spark DataFrame to process.

    Returns
    -------
    DataFrame
        The processed DataFrame with NA values handled according to column types.
    """
    numeric_cols = [col for col, dtype in df.dtypes if isinstance(df.schema[col].dataType, NumericType)]
    string_cols = [col for col, dtype in df.dtypes if isinstance(df.schema[col].dataType, StringType)]
    
    # Fill NA values with 0 for numeric columns
    if numeric_cols:
        df = df.fillna(0, subset=numeric_cols)
    
    # Drop rows with NA values in string columns
    if string_cols:
        df = df.dropna(subset=string_cols)
    
    return df

In [41]:
collection_population_state_df = fill_values_na_with_zero_or_drop(df = collection_population_state_df)

In [42]:
collection_population_state_df.show()

+----+-------+-----+-----------+----------+-----------+-------------+-----------+------------------+-----------+---------------------+--------------------+----------------+--------------------------+-----------------------+---------------------------+---------------------+------------------------+--------------+----------------------------+-------------------------------------+------------+----------------+------------+-------------------------------+-----------------------------------------+-------------------------------------+------------------------------+----------------------------------------+------------------------------------+-------------------------------------------------+----------------------------------+------------------------------------------+-------------------------------------------+------------------------------------------------+-----------------------+---------------------------+------------------------------+---------------+---------------------------+--------

## 4.2 - Adding some columns

### 4.2.1 - Regions in Brazil

In [43]:
regions_uf = {
    "north": ("AC", "AP", "AM", "PA", "RO", "RR", "TO"),
    "north_east": ("AL", "BA", "CE", "MA", "PB", "PE", "PI", "RN", "SE"),
    "midwest": ("DF", "GO", "MT", "MS"),
    "south": ("PR", "RS", "SC"),
    "south_east": ("ES", "MG", "RJ", "SP")
}

In [44]:
from typing import Dict, List

def add_region_column(df: DataFrame, regions_uf: Dict[str, List[str]]) -> DataFrame:
    """
    Adds a 'region' column to the given DataFrame based on the 'state' column and the provided mapping of states to regions.

    Parameters
    ----------
    df : DataFrame
        The Spark DataFrame to which the 'region' column will be added.
    regions_uf : dict
        A dictionary where keys are region names (strings) and values are lists of state codes (strings) that belong to each region.

    Returns
    -------
    DataFrame
        A new DataFrame with an additional column 'region'. The 'region' column is populated based on the 'state' column and the mapping provided.
        States not found in the mapping will be assigned the value 'unknown'.
    """
    # Initialize the conditions for the 'when' function
    conditions = None

    # Iterate through the regions and their states to build conditions
    for region, states in regions_uf.items():
        if conditions is None:
            conditions = F.when(F.col("state").isin(*states), F.lit(region))
        else:
            conditions = conditions.when(F.col("state").isin(*states), F.lit(region))
    
    # Use 'otherwise' to set the default value
    df = df.withColumn(
        "region",
        conditions.otherwise(F.lit("unknown"))
    )

    return df

In [45]:
collection_population_state_df = add_region_column(df=collection_population_state_df, regions_uf=regions_uf)

In [46]:
collection_population_state_df.show()

+----+-------+-----+-----------+----------+-----------+-------------+-----------+------------------+-----------+---------------------+--------------------+----------------+--------------------------+-----------------------+---------------------------+---------------------+------------------------+--------------+----------------------------+-------------------------------------+------------+----------------+------------+-------------------------------+-----------------------------------------+-------------------------------------+------------------------------+----------------------------------------+------------------------------------+-------------------------------------------------+----------------------------------+------------------------------------------+-------------------------------------------+------------------------------------------------+-----------------------+---------------------------+------------------------------+---------------+---------------------------+--------

### 4.2.2 - Adding 'id' column, this will be our surrogate key

In [47]:
def add_id_column(df: DataFrame, order_by: str) -> DataFrame:
    """
    Adds a sequential 'id' column to the DataFrame based on the specified ordering column.

    Parameters
    ----------
    df : DataFrame
        The Spark DataFrame to which the 'id' column will be added.
    order_by : str
        The column name by which the DataFrame should be ordered to generate the sequential 'id' values.

    Returns
    -------
    DataFrame
        A new DataFrame with an additional 'id' column. The 'id' column contains sequential integers based on the specified order.
        The resulting DataFrame will have 'id' as the first column, followed by the original columns.
        """
    window_spec = Window.orderBy(order_by)

    # Add the ID column and reorder columns
    df = df.withColumn("id", F.row_number().over(window_spec))
    id_first_columns = ["id"] + [col for col in df.columns if col != "id"]
    df = df.select(*id_first_columns)

    return df

In [48]:
collection_population_state_df = add_id_column(df=collection_population_state_df, order_by="year")

### 4.2.3 - Moving region column to be near of the state column

In [49]:
columns = []
for col in collection_population_state_df.columns:
    if col != "region":
        columns.append(col)
    if col == "state":
        columns.append("region")
        
print(columns)

['id', 'year', 'month', 'state', 'region', 'import_tax', 'export_tax', 'ipi_tobacco', 'ipi_beverages', 'ipi_auto', 'ipi_linked_imports', 'ipi_others', 'income_tax_individual', 'income_tax_financial', 'income_tax_other', 'withholding_tax_employment', 'withholding_tax_capital', 'withholding_tax_remittances', 'withholding_tax_other', 'tax_financial_operations', 'rural_land_tax', 'provisional_tax_transactions', 'provisional_contribution_transactions', 'cofins', 'cofins_financial', 'cofins_other', 'contribution_social_integration', 'contribution_social_integration_financial', 'contribution_social_integration_other', 'social_contribution_net_profit', 'social_contribution_net_profit_financial', 'social_contribution_net_profit_other', 'intervention_economic_domain_non_deductible_fuels', 'intervention_economic_domain_fuels', 'contribution_security_plan_public_servants', 'contributions_security_plan_public_servants', 'contributions_special_fund_inspection_activities', 'fiscal_recovery_program', 

In [50]:
collection_population_state_final_df = collection_population_state_df.select(*columns)

In [51]:
collection_population_state_final_df.show()

+---+----+-------+-----+----------+-----------+----------+-----------+-------------+-----------+------------------+-----------+---------------------+--------------------+----------------+--------------------------+-----------------------+---------------------------+---------------------+------------------------+--------------+----------------------------+-------------------------------------+------------+----------------+------------+-------------------------------+-----------------------------------------+-------------------------------------+------------------------------+----------------------------------------+------------------------------------+-------------------------------------------------+----------------------------------+------------------------------------------+-------------------------------------------+------------------------------------------------+-----------------------+---------------------------+------------------------------+---------------+---------------------

# 5 - Loading

## 5.1 - Creating MySQL database

In [52]:
from database.create_database import DatabaseConnector

host = os.getenv('DB_HOST', 'localhost')
user = os.getenv('DB_USER', 'root')
password = os.getenv('DB_PASSWORD', '')

db_connector = DatabaseConnector(host=host, user=user, password=password)

In [53]:
try:
    db_connector.get_connection()
    db_connector.create_database("revenue_by_state")
except Exception as e:
    print(f"An error occurred: {e}")
finally:
    db_connector.close_connection()

2024-08-31 21:39:55,035 - INFO - Connected to MySQL server
2024-08-31 21:39:55,038 - INFO - Database 'revenue_by_state' created or already exists
2024-08-31 21:39:55,040 - INFO - MySQL connection is closed


## 5.2 - Defining url and properties

In [54]:
jdbc_url, jdbc_properties = db_connector.get_jdbc_url_and_properties()

## 5.3 - Writing on MySQL

In [57]:
table_name: str = "collection_state"
mode: str = "overwrite"

try:
    collection_population_state_final_df.write.jdbc(
        url=jdbc_url,
        table=table_name,
        mode=mode,
        properties=jdbc_properties
        )
    etl_logger.info("Loaded with success")
except Exception as e:
    etl_logger.error(f"Error while loading: {e}")

2024-08-31 21:41:01,905 - INFO - Loaded with success
