In [None]:
import os
import pandas as pd
import requests
from tqdm import tqdm
from datetime import datetime
import logging
import altair as alt
import IPython.display as display


In [None]:
# Set up logging to show INFO-level messages
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

In [None]:
# --- Configuration ---
BASE_URL = "https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SIM/"
RAW_DIR = "./sim_data/"
PROCESSED_DIR = "./processed_data/"
PARQUET_DIR = "./raw_parquet/"
YEARS = range(2015, 2024)  # Adjust year as needed

In [None]:
# Create directories if they don't exist
os.makedirs(RAW_DIR, exist_ok=True)
os.makedirs(PROCESSED_DIR, exist_ok=True)
os.makedirs(PARQUET_DIR, exist_ok=True)

In [None]:
# --- Step 1: Download Files with Dual URL Format Support ---
def download_files():
    """
    Download mortality CSV files for each year, handling dual URL formats.
    Saves each file in RAW_DIR with the filename '<year>.csv'.
    """
    for year in tqdm(YEARS, desc="Downloading files"):
        # Choose URL format based on the year
        if year <= 2021:
            url = f"https://diaad.s3.sa-east-1.amazonaws.com/sim/Mortalidade_Geral_{year}.csv"
        else:
            url = f"https://s3.sa-east-1.amazonaws.com/ckan.saude.gov.br/SIM/DO{str(year)[-2:]}OPEN.csv"
        
        try:
            response = requests.get(url, stream=True)
            response.raise_for_status()  # Raise exception for HTTP errors
            output_file = os.path.join(RAW_DIR, f"{year}.csv")
            with open(output_file, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024):
                    f.write(chunk)
            logging.info(f"Downloaded file for year {year} successfully.")
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to download file for {year} from {url}: {e}")

# Uncomment the following line to run the download step:
# download_files()

In [None]:
# --- Step 2: Convert CSV to Parquet ---

def convert_all_csv_to_parquet():
    """
    Convert all CSV files in RAW_DIR to Parquet format.
    The output files are saved in PARQUET_DIR with the same base filename.
    """
    csv_files = [f for f in os.listdir(RAW_DIR) if f.endswith(".csv")]
    
    for filename in tqdm(csv_files, desc="Converting CSVs to Parquet"):
        year = filename.split(".")[0]  # Extract year from filename
        input_path = os.path.join(RAW_DIR, filename)
        output_path = os.path.join(PARQUET_DIR, f"{year}.parquet")
        
        try:
            # Read CSV with proper encoding and data type preservation
            df = pd.read_csv(
                input_path,
                sep=";",
                encoding="latin-1",
                low_memory=False,
                dtype={
                    "DTOBITO": "string",
                    "CAUSABAS": "string",
                    "CODMUNRES": "string"
                }
            )
            # Save as Parquet with snappy compression
            df.to_parquet(
                output_path,
                engine="pyarrow",
                compression="snappy"
            )
            logging.info(f"Converted {filename} to Parquet successfully.")
        except Exception as e:
            logging.error(f"Error converting {filename}: {e}")

# Uncomment the following line to convert all CSV files:
# convert_all_csv_to_parquet()

In [None]:
def convert_csv_to_parquet(file_name):
    """
    Convert a specific CSV file to Parquet format.
    
    Parameters:
        file_name (str): Name of the CSV file in RAW_DIR to convert.
    """
    input_path = os.path.join(RAW_DIR, file_name)
    output_path = os.path.join(PARQUET_DIR, f"{file_name.split('.')[0]}.parquet")
    
    try:
        df = pd.read_csv(
            input_path,
            sep=";",
            encoding="latin-1",
            low_memory=False,
            dtype={
                "DTOBITO": "string",
                "CAUSABAS": "string",
                "CODMUNRES": "string"
            }
        )
        df.to_parquet(
            output_path,
            engine="pyarrow",
            compression="snappy"
        )
        logging.info(f"Successfully converted {file_name} to Parquet.")
    except Exception as e:
        logging.error(f"Error converting {file_name}: {e}")

# Example usage for a specific file:
# convert_csv_to_parquet("2021.csv")

In [None]:
# --- Step 3: Aggregate Mortality Data ---
def aggregate_obitos():
    """
    Aggregate mortality data from Parquet files.
    
    Reads each Parquet file, processes the 'DTOBITO' date field,
    groups data by date, and adds columns for year and day of year.
    
    Returns:
        pd.DataFrame: A DataFrame containing the aggregated daily counts.
    """
    summary_list = []
    
    for file in os.listdir(PARQUET_DIR):
        if file.endswith(".parquet"):
            year_str = file.split('.')[0]
            file_path = os.path.join(PARQUET_DIR, file)
            
            try:
                df = pd.read_parquet(file_path, engine="pyarrow")
                
                # Process the 'DTOBITO' column: pad with zeros and convert to datetime
                df["DTOBITO"] = df["DTOBITO"].astype(str).str.zfill(8)
                df["DTOBITO"] = pd.to_datetime(df["DTOBITO"], format="%d%m%Y", errors="coerce")
                
                # Normalize date (remove time component)
                df["date"] = df["DTOBITO"].dt.normalize()
                
                # Group by date and count the number of records (obitos)
                daily_counts = df.groupby("date").size().reset_index(name="number_of_obitos")
                
                # Add the year column as an integer and day of year column
                daily_counts["year"] = int(year_str)
                daily_counts["day_number"] = daily_counts["date"].dt.dayofyear
                
                summary_list.append(daily_counts)
            except Exception as e:
                logging.error(f"Error processing {file}: {e}")
    
    if summary_list:
        final_summary = pd.concat(summary_list, ignore_index=True)
        final_summary.sort_values(by=["year", "date"], inplace=True)
        return final_summary
    else:
        return pd.DataFrame()

In [None]:
# Aggregate the data and save it to CSV
result_df = aggregate_obitos()
if not result_df.empty:
    output_csv = os.path.join(PROCESSED_DIR, "combined_data.csv")
    result_df.to_csv(output_csv, index=False)
    logging.info(f"Aggregated data saved to {output_csv}.")
else:
    logging.warning("No data was aggregated; the resulting DataFrame is empty.")


In [None]:
# --- Step 4: Visualization with Altair ---
alt.renderers.enable('default')
alt.data_transformers.disable_max_rows()

chart = alt.Chart(result_df).mark_rect().encode(
    x=alt.X('day_number:Q', title='Day of Year'),
    y=alt.Y('year:N', title='Year'),
    color=alt.Color('number_of_obitos:Q', title='Number of Obitos', scale=alt.Scale(scheme='reds')),
    tooltip=[
        alt.Tooltip('year:N', title='Year'),
        alt.Tooltip('day_number:Q', title='Day of Year'),
        alt.Tooltip('number_of_obitos:Q', title='Number of Obitos')
    ]
).properties(
    width=600,
    height=300,
    title="Daily Obitos Heatmap"
)

display.display(chart)