In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, when
from pyspark.sql import DataFrame
import pandas as pd
import matplotlib.pyplot as plt
import geopandas as gpd
from matplotlib.colors import Normalize
from matplotlib.cm import ScalarMappable

In [2]:
spark = SparkSession.builder \
    .appName("Brazil States CSVs to DataFrame") \
    .getOrCreate()

In [3]:
file_paths = {
    'Alagoas': ["csv_a23/PAAL2305.csv"], 
    'Amazonas': ["csv_a23/PAAM2303.csv", "csv_a23/PAAM2309.csv", "csv_a23/PAAM2311.csv"],
    'Bahia': ["csv_a23/PABA2308.csv", "csv_a23/PABA2309.csv", "csv_a23/PABA2311.csv", "csv_a23/PABA2312.csv"],
    'Distrito Federal': ["csv_a23/PADF2304.csv"],
    'Pará': ["csv_a23/PAPA2311.csv"], 
    'Roraima': ["csv_a23/PARR2310.csv"], 
    "São Paulo": ["csv_a23/PASP2301a.csv", "csv_a23/PASP2301b.csv", "csv_a23/PASP2301c.csv",
                  "csv_a23/PASP2302a.csv", "csv_a23/PASP2302b.csv", "csv_a23/PASP2302c.csv",
                  "csv_a23/PASP2304a.csv", "csv_a23/PASP2304b.csv", "csv_a23/PASP2304c.csv",
                  "csv_a23/PASP2306a.csv", "csv_a23/PASP2306b.csv", "csv_a23/PASP2306c.csv"
                 ]
}

In [4]:
columns_to_keep = ['PA_CODUNI', 
                   'PA_GESTAO',
                   'PA_UFMUN',
                   'PA_TPUPS',
                   'PA_TIPPRE',
                   'PA_MVM',
                   'PA_CMP',
                   'PA_PROC_ID', 
                   'PA_CNSMED',
                   'PA_CBOCOD',
                   'PA_CIDPRI',
                   'PA_IDADE', 
                   'PA_SEXO',
                   'PA_RACACOR',
                   'PA_MUNPCN',
                   'PA_QTDPRO',
                   'PA_QTDAPR',
                   'PA_VALPRO',
                   'PA_UFDIF',
                   'PA_MNDIF',
                   'PA_VALAPR'
                   ]

In [5]:
all_states_df = None

for state, paths in file_paths.items():
    state_df = None
    for path in paths:
        # Read each CSV file
        df = spark.read.csv(path, header=True, inferSchema=True)
        
        # Ensure all columns_to_keep are present, add missing ones with null values
        existing_columns = df.columns
        missing_columns = [c for c in columns_to_keep if c not in existing_columns]
        for column in missing_columns:
            df = df.withColumn(column, lit(None))
        
        # Select the necessary columns
        df = df.select([col(column) for column in columns_to_keep])
        
        # Filter the rows where PA_CIDPRI equals 'A23' or fill with NA if no such rows exist
        if df.filter(col("PA_CIDPRI") == 'A23').count() > 0:
            df = df.filter(col("PA_CIDPRI") == 'A23')
        else:
            # Create a DataFrame with a single row of 'NA' for columns to keep if PA_CIDPRI = 'A23' does not exist
            na_values = ['NA'] * len(columns_to_keep)
            na_row = spark.createDataFrame([na_values], columns_to_keep)
            df = na_row
        
        # Add a new column for the state
        df = df.withColumn("State", lit(state))
        
        # Union the dataframes for the same state
        if state_df is None:
            state_df = df
        else:
            state_df = state_df.unionByName(df, allowMissingColumns=True)
    
    # Print the progress indicator
    print(f"State '{state}' processed.")
    
    # Union the dataframes for all states
    if all_states_df is None:
        all_states_df = state_df
    else:
        all_states_df = all_states_df.unionByName(state_df, allowMissingColumns=True)

State 'Alagoas' processed.
State 'Amazonas' processed.
State 'Bahia' processed.
State 'Distrito Federal' processed.
State 'Pará' processed.
State 'Roraima' processed.
State 'São Paulo' processed.


In [6]:
# Show the first 5 rows of the results
all_states_df.show(5)

+---------+---------+--------+--------+---------+------+------+----------+---------------+---------+---------+--------+-------+----------+---------+---------+---------+---------+--------+--------+---------+-------+
|PA_CODUNI|PA_GESTAO|PA_UFMUN|PA_TPUPS|PA_TIPPRE|PA_MVM|PA_CMP|PA_PROC_ID|      PA_CNSMED|PA_CBOCOD|PA_CIDPRI|PA_IDADE|PA_SEXO|PA_RACACOR|PA_MUNPCN|PA_QTDPRO|PA_QTDAPR|PA_VALPRO|PA_UFDIF|PA_MNDIF|PA_VALAPR|  State|
+---------+---------+--------+--------+---------+------+------+----------+---------------+---------+---------+--------+-------+----------+---------+---------+---------+---------+--------+--------+---------+-------+
|  3065383|   270240|  270240|      36|        0|  null|  null| 301040044|706705528339119|   223905|      A23|       6|      M|         3|   270240|        1|        1|     2.81|       0|       0|     2.81|Alagoas|
|  3065383|   270240|  270240|      36|        0|  null|  null| 301040044|706705528339119|   223905|      A23|       7|      M|         3|  

In [7]:
# Convert the Spark DataFrame to a Pandas DataFrame
all_states_pandas_df = all_states_df.toPandas()

In [9]:
all_states_pandas_df.to_csv('csv/a23.csv', index=False)

In [10]:
spark.stop()