In [16]:
import zipfile
import pandas as pd
# Columns selected from each raw CSV, in the order the rest of the notebook
# indexes them. Kept as a list because it is used for DataFrame column selection.
usable_columns = [
    'co_municipio_paciente',
    'co_vacina',
    'sg_vacina',
    'dt_vacina',
    'no_uf_estabelecimento',
    'nu_idade_paciente',
    'tp_sexo_paciente',
    'co_paciente',
]

def extract_zip(path_to_zip_file, directory_to_extract_to):
    """Unpack every member of a zip archive into a directory.

    Args:
        path_to_zip_file: Path of the ``.zip`` archive to read.
        directory_to_extract_to: Directory that receives the extracted
            members.

    Returns:
        None.
    """
    # The context manager closes the archive even if extraction fails.
    with zipfile.ZipFile(path_to_zip_file, mode='r') as archive:
        archive.extractall(path=directory_to_extract_to)

def create_clean_pandas_dataframe_from_csv(csv_file, nrowns=None):
    """Load a ``;``-delimited CSV as text and keep only ``usable_columns``.

    Args:
        csv_file: Path (or file-like object) handed to ``pandas.read_csv``.
        nrowns: Passed to ``read_csv`` as ``nrows``; ``None`` reads the
            whole file.

    Returns:
        A DataFrame whose columns are exactly ``usable_columns``, in that
        order, with every value read as ``str``.

    Raises:
        KeyError: If the CSV lacks one of the ``usable_columns``.
    """
    read_options = {
        "nrows": nrowns,
        # Undecodable bytes are replaced instead of aborting the read.
        "encoding_errors": 'replace',
        "delimiter": ';',
        "dtype": str,
    }
    raw_frame = pd.read_csv(csv_file, **read_options)
    return raw_frame[usable_columns]


In [17]:
import psycopg2
import numpy as np
import math

from psycopg2.extensions import register_adapter, AsIs
import os

import psycopg2.extras


# psycopg2 adapter: lets numpy.int64 values be used as query parameters
def adapt_numpy_int64(numpy_int64):
    """Wrap a ``numpy.int64`` in ``AsIs`` for use as a psycopg2 query parameter.

    Args:
        numpy_int64: The value to adapt.

    Returns:
        The ``AsIs`` wrapper around ``numpy_int64``.
    """
    sql_literal = AsIs(numpy_int64)
    return sql_literal


# Register the adapter
register_adapter(np.int64, adapt_numpy_int64)


# Connection settings are read from the standard PostgreSQL environment
# variables (PGDATABASE, PGHOST, PGUSER, PGPASSWORD, PGPORT) when they are set,
# so the password never has to be written into the notebook. Each fallback is
# the literal value this cell used before, so an unconfigured environment
# behaves exactly as it did.
conn = psycopg2.connect(
    database=os.environ.get("PGDATABASE", "defaultdb"),
    host=os.environ.get("PGHOST", "ibd-tp1-jssantosqueiroz-badd.f.aivencloud.com"),
    user=os.environ.get("PGUSER", "avnadmin"),
    password=os.environ.get("PGPASSWORD", ""),
    port=os.environ.get("PGPORT", "27074"),
)

## END database prep


def delete_and_create_vacination_table():
    """Drop the ``vacinacao`` table if it exists and create it again, empty.

    Runs on the module-level ``conn`` and commits when both statements
    succeed. This is destructive: every row already stored in ``vacinacao``
    is lost.

    Raises:
        Exception: Whatever the database driver raises. The pending
            transaction is rolled back first so that the shared connection
            is not left in an aborted state for later calls.
    """
    create_table_sql = """

        CREATE TABLE vacinacao (

            co_municipio_paciente VARCHAR(255) DEFAULT '000000',
            co_uf_municipio_paciente int DEFAULT 00,

            co_vacina VARCHAR(255),

            sg_vacina VARCHAR(255),

            dt_vacina VARCHAR(255),

            nu_idade_paciente INT,

            tp_sexo_paciente CHAR(1),

            co_paciente VARCHAR(255)
            -- CONSTRAINT co_uf_municipio_paciente FOREIGN KEY(co_uf_municipio_paciente) REFERENCES estados(cod_uf)

        );
    """

    cursor = conn.cursor()
    try:
        cursor.execute("DROP TABLE IF EXISTS vacinacao")
        cursor.execute(create_table_sql)
        conn.commit()
    except Exception:
        # Undo the partial work before propagating; otherwise every later
        # statement on this connection fails until a rollback happens.
        conn.rollback()
        raise
    finally:
        # Close the cursor on both the success and the failure path.
        cursor.close()



def _prepare_vacination_rows(df):
    """Build the frame whose rows are sent to the ``vacinacao`` INSERT.

    Works on a new frame, so the caller's ``df`` is never modified. Missing
    values become 0, the code/age columns are cast to ``int``, and
    ``co_uf_municipio_paciente`` is derived from the first two characters of
    ``co_municipio_paciente``. Columns come back in the INSERT column order.
    """
    prepared = df.fillna(0)
    municipio_codes = prepared["co_municipio_paciente"]
    # str() lets the two-character prefix be taken from codes that are already
    # integers, so preparing an already-prepared frame does not fail.
    prepared["co_uf_municipio_paciente"] = municipio_codes.apply(
        lambda x: int(str(x)[:2]) if x != 0 else 0
    )
    for column in ("co_municipio_paciente", "co_vacina", "nu_idade_paciente"):
        prepared[column] = prepared[column].apply(lambda x: int(x) if x != 0 else 0)
    return prepared[
        [
            "co_municipio_paciente",
            "co_vacina",
            "sg_vacina",
            "dt_vacina",
            "nu_idade_paciente",
            "tp_sexo_paciente",
            "co_paciente",
            "co_uf_municipio_paciente",
        ]
    ]


def insert_vacination_data(df, retry=False):
    """Insert one chunk of vaccination records into the ``vacinacao`` table.

    The chunk is prepared once (see ``_prepare_vacination_rows``) and written
    with a single ``execute_values`` call on the module-level ``conn``. If the
    insert fails it is attempted one more time after ``conn.reset()``. When
    the last attempt fails too, the transaction is rolled back so the shared
    connection stays usable, and the prepared rows are appended to
    ``error.csv`` in the working directory (the header is written only when
    the file is created).

    Args:
        df: Chunk containing at least ``co_municipio_paciente``,
            ``co_vacina``, ``sg_vacina``, ``dt_vacina``,
            ``nu_idade_paciente``, ``tp_sexo_paciente`` and ``co_paciente``.
            It is not modified.
        retry: When True, the connection is reset before inserting and only
            a single attempt is made.

    Returns:
        None. Insert failures are printed and recorded, not raised.

    Raises:
        ValueError: If a code or age value cannot be converted to ``int``
            while preparing the rows.
    """
    rows = _prepare_vacination_rows(df)
    insert_sql = """

            INSERT INTO vacinacao 
                (
                co_municipio_paciente, 
                co_vacina, 
                sg_vacina, 
                dt_vacina, 
                nu_idade_paciente, 
                tp_sexo_paciente, 
                co_paciente,
                co_uf_municipio_paciente
            )

            VALUES %s;
        """

    attempts_left = 1 if retry else 2
    reset_first = retry
    while attempts_left > 0:
        attempts_left -= 1
        if reset_first:
            conn.reset()

        cursor = conn.cursor()
        try:
            psycopg2.extras.execute_values(
                cursor,
                insert_sql,
                # A fresh iterator per attempt: a generator cannot be replayed.
                rows.itertuples(index=False, name=None),
            )
            conn.commit()
            return
        except Exception as e:
            print(e)
            print("Error inserting data")
            if str(e).find("transaction is blocked") != -1:
                print("Transaction blocked, trying again")
            if attempts_left > 0:
                # conn.reset() at the top of the next pass discards the
                # failed transaction before the second attempt.
                reset_first = True
                continue
            # Giving up: clear the aborted transaction so the next chunk can
            # still be inserted on this connection.
            try:
                conn.rollback()
            except psycopg2.Error as rollback_error:
                print(rollback_error)
            rows.to_csv(
                "error.csv", mode="a", header=not os.path.exists("error.csv")
            )
        finally:
            cursor.close()



# nrowns = 10
# df = create_clean_pandas_dataframe_from_csv('./raw_data/datasus/vacinacao_set_2024/vacinacao_set_2024.csv', nrowns)
# delete_and_create_vacination_table()

# insert_vacination_data(df)


## IMPROVING INSERTION TIME
# import io

# def clean_csv_value(value: Optional[Any]) -> str:
#     if value is None:
#         return r'\N'
#     return str(value).replace('\n', '\\n')

# def copy_stringio(connection, data: Iterator[Dict[str, Any]]) -> None:
#     with connection.cursor() as cursor:
#         csv_file_like_object = io.StringIO()
#         for row in data:
#             csv_file_like_object.write(';'.join(map(clean_csv_value, (
#                 row["co_municipio_paciente"],
#                     row["co_municipio_paciente"][:2],
#                     row["co_vacina"],
#                     row["sg_vacina"],
#                     row["dt_vacina"],
#                     row["nu_idade_paciente"],
#                     row["tp_sexo_paciente"],
#                     row["co_paciente"],
#             ))) + '\n')
#         csv_file_like_object.seek(0)
#         cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')

In [19]:
# os.listdir returns names in arbitrary order. Sorting keeps the file order,
# and with it the running chunk number compared against `start_chunk`, the
# same on every run.
zip_files = sorted(f for f in os.listdir("./raw_data/datasus") if f.endswith(".zip"))
print(zip_files)
# The table is dropped and recreated here, so each run starts from an empty table.
delete_and_create_vacination_table()
count_max = 2  # NOTE(review): only referenced by the commented-out early exits below
count = 0  # chunks read so far; keeps counting across files (never reset)
count_files = 0
start_chunk = 0  # chunks numbered below this are read but not inserted
start_file = 0  # NOTE(review): not read by the active code below
nrowns = 15000  # rows per chunk

for zip_file in zip_files:
    # Each archive is expected to unpack to a CSV with the same base name.
    # splitext strips only the final ".zip", so dotted base names survive.
    csv_file = os.path.splitext(zip_file)[0] + ".csv"
    csv_file = f"./raw_data/datasus/{csv_file}"
    if os.path.isfile(csv_file):
        print(f"{csv_file} already extracted")
    else:
        print(f"Extracting {zip_file}")
        extract_zip(f"./raw_data/datasus/{zip_file}", "./raw_data/datasus")

    ## reads the csv in chunks
    for df in pd.read_csv(
        csv_file,
        chunksize=nrowns,
        # Parse only the columns that are kept; the rest were discarded anyway.
        usecols=usable_columns,
        encoding_errors="replace",
        delimiter=";",
        dtype=str,
    ):
        count += 1
        if count < start_chunk:
            continue
        # usecols ignores list order, so restore the expected column order.
        df = df[usable_columns]
        print(f"Inserting chunk {count_files}/{count}")
        # if count >= count_max:
        #     break
        insert_vacination_data(df)
    count_files += 1
    # if count >= count_max:
    #     break;

['vacinacao_abr_2022.zip', 'vacinacao_abr_2023.zip', 'vacinacao_abr_2024.zip', 'vacinacao_ago_2022.zip', 'vacinacao_ago_2023.zip', 'vacinacao_ago_2024.zip', 'vacinacao_dez_2022.zip', 'vacinacao_dez_2023.zip', 'vacinacao_fev_2022.zip', 'vacinacao_fev_2023.zip', 'vacinacao_fev_2024.zip', 'vacinacao_jan_2022.zip', 'vacinacao_jan_2023.zip', 'vacinacao_jan_2024.zip', 'vacinacao_jul_2022.zip', 'vacinacao_jul_2023.zip', 'vacinacao_jul_2024.zip', 'vacinacao_jun_2022.zip', 'vacinacao_jun_2023.zip', 'vacinacao_jun_2024.zip', 'vacinacao_mai_2022.zip', 'vacinacao_mai_2023.zip', 'vacinacao_mai_2024.zip', 'vacinacao_mar_2022.zip', 'vacinacao_mar_2023.zip', 'vacinacao_mar_2024.zip', 'vacinacao_nov_2022.zip', 'vacinacao_nov_2023.zip', 'vacinacao_out_2022.zip', 'vacinacao_out_2023.zip', 'vacinacao_set_2022.zip', 'vacinacao_set_2023.zip', 'vacinacao_set_2024.zip']
./raw_data/datasus/vacinacao_abr_2022.csv already extracted
Inserting chunk 0/162
Inserting chunk 0/163


KeyboardInterrupt: 

: 