## Importing required libs: 

In [3]:
import time
import pandas as pd
import os
from pandas_ods_reader import read_ods
import warnings
import re
import xlrd
from sqlalchemy import create_engine, Table, Column, Integer, String, Float, MetaData
from sqlalchemy.exc import IntegrityError

### Ignoring errors: 

In [4]:
warnings.filterwarnings("ignore")

# Pre-processing

# Part I: Intro to files

### 1.1 - We currently have five distinct data groups: 

### 1.1.1 - A. (Sales notifications of all industrialized drugs that have restricted sales in Brazil, like antidepressants, antibiotics, etc.)

Example: "project_data\raw_anvisa_files\industrialized_meds\EDA_Industrializados_201401.csv"   
(93 files in this dir ranging from 2014JAN ~ 2021NOV)

### 1.1.2 - B. (Sales notifications of all compounded drugs that have restricted sales in Brazil, like antidepressants, antibiotics, etc.) 

Example: "project_data\raw_anvisa_files\manipulated_meds\EDA_Manipulados_201401.csv"   
(93 files in this dir ranging from 2014JAN ~ 2021NOV)  

### 1.1.3 - C. (Notifications of all obituaries in Brazil)

Example: "project_data\raw_sim_files\Mortalidade_Geral_2014.csv"  
(8 files ranging yearly from 2014 to 2021)

### 1.1.4 - D. (Brazilian cities' total area)
Example:"\project_data\ibge_demographic\br_city_area.ods"  
(A single file from 2023) 



### 1.1.5 - E. (Brazilian cities' population)
Example: "\project_data\ibge_demographic\br_city_population.ods"   
(A single file from 2021) 


# Part II: Checking column names in all paths:

### 2.1 Defining all "folder" paths:

In [3]:
folder_paths = [
    r"(...)\Drug_Resistant_Bacteria\project_data\raw_anvisa_files\industrialized_meds",
    r"(...)\Drug_Resistant_Bacteria\project_data\raw_anvisa_files\manipulated_meds",
    r"(...)\Drug_Resistant_Bacteria\project_data\raw_sim_files",
    r"(...)\Drug_Resistant_Bacteria\project_data\ibge_demographic\area",
    r"(...)\Drug_Resistant_Bacteria\project_data\ibge_demographic\population"   

]

### 2.2 Defining a function that iterates through files inside "folder_paths"

In [4]:
def get_file_paths(folder_path, extension=None):
    if extension:
        return [os.path.join(folder_path, filename) for filename in os.listdir(folder_path) if filename.endswith(extension)]
    else:
        return [os.path.join(folder_path, filename) for filename in os.listdir(folder_path)]

### 2.3 Defining a function to verify the integrity of column names and return the file indices of columns that have different names.

#### I have also included another feature, namely chunk_size, with the goal of optimizing resource usage by not opening entire files at once. (Using 1k chunks is more than enough to retrieve column names).

In [5]:
def check_columns_similarity(folder_paths, chunk_size=1000):
    for folder_path in folder_paths:
        file_paths = get_file_paths(folder_path)

        if not file_paths:
            raise ValueError(f"Folder {folder_path} is empty. No files to process.")

        # Extract only the folder name
        folder_name = os.path.basename(folder_path)

        # Dictionary to store column names for each file within the folder
        folder_columns = {}

 # Process each file within the folder
    for file_path in file_paths:
        if file_path.endswith(".csv"):
            reader = pd.read_csv(file_path, delimiter=';', encoding='latin-1', quotechar='"', chunksize=chunk_size)
            df_chunk = next(reader)
            folder_columns[file_path] = df_chunk.columns.tolist()
        elif file_path.endswith(".xls"):
            for df_chunk in read_excel_in_chunks(file_path, chunk_size=chunk_size):
                folder_columns[file_path] = df_chunk.columns.tolist()
                break  # Only process the first chunk for column names
        elif file_path.endswith(".xlsx"):
            # For .xlsx files, continue using openpyxl
            reader = pd.read_excel(file_path, engine='openpyxl', chunksize=chunk_size)
            df_chunk = next(reader)
            folder_columns[file_path] = df_chunk.columns.tolist()
        else:
            print(f"Unsupported file format for {file_path}. Skipping.")

    # Check if all files within the folder have the same columns
    if len(set(map(tuple, folder_columns.values()))) == 1:
        print(f"\U0001F7E2 All files in folder \"{folder_name}\" have the same columns.\n")  # Green circle emoji
        return {'files_with_same_columns': list(range(len(file_paths))), 'files_with_different_columns': []}
    else:
        print(f"\U0001F534 Files in folder \"{folder_name}\" have different columns.\n")  # Red circle emoji

        # Group files based on columns
        grouped_files = {}
        for index, (path, columns) in enumerate(folder_columns.items()):
            key = tuple(columns)
            if key not in grouped_files:
                grouped_files[key] = [index]
            else:
                grouped_files[key].append(index)

        return {'files_with_same_columns': grouped_files.get(next(iter(grouped_files)), []),
                'files_with_different_columns': [index for index in range(len(file_paths)) if index not in grouped_files.get(next(iter(grouped_files)), [])]}


### 2.4 Usage example:

### - A)

In [26]:
# Choose the index of the folder to be processed (e.g., folder_paths[0] for the first folder)
selected_folder_path_a = folder_paths[0]

# Get all files in the selected folder
all_files_a = get_file_paths(selected_folder_path_a)


In [34]:
# Print the columns of the files in the selected folder
result_columns = check_columns_similarity([selected_folder_path_a])
print("Columns of one file:", result_columns)

🟢 All files in folder "industrialized_meds" have the same columns.

Columns of one file: {'files_with_same_columns': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93], 'files_with_different_columns': []}


### - B)  

In [27]:
selected_folder_path_b = folder_paths[1]
all_files_b = get_file_paths(selected_folder_path_b)

In [59]:
result_columns = check_columns_similarity([selected_folder_path_b])
print("Columns of one file:", result_columns)

🟢 All files in folder "manipulated_meds" have the same columns.

Columns of one file: {'files_with_same_columns': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93], 'files_with_different_columns': []}


### - C)  

In [28]:
selected_folder_path_c = folder_paths[2]
all_files_c = get_file_paths(selected_folder_path_c)

In [36]:
result_columns = check_columns_similarity([selected_folder_path_c])
print("Columns of one file:", result_columns)

🔴 Files in folder "raw_sim_files" have different columns.

Columns of one file: {'files_with_same_columns': [0, 1, 2, 3], 'files_with_different_columns': [4, 5, 6, 7]}


## The files in the 'C' folder don't have standardized columns.
#### We can keep this in mind when looking for errors during the subsequent data insertion into MSSQL.

### - D)  

In [29]:
selected_folder_path_d = folder_paths[3]
all_files_d = get_file_paths(selected_folder_path_d)

In [61]:
result_columns = check_columns_similarity([selected_folder_path_d])
print("Columns of one file:", result_columns)

🟢 All files in folder "area" have the same columns.

Columns of one file: {'files_with_same_columns': [0], 'files_with_different_columns': []}


### - E)  

In [None]:
selected_folder_path_e = folder_paths[4]
all_files_e = get_file_paths(selected_folder_path_e)

In [62]:
result_columns = check_columns_similarity([selected_folder_path_e])
print("Columns of one file:", result_columns)

🟢 All files in folder "population" have the same columns.

Columns of one file: {'files_with_same_columns': [0], 'files_with_different_columns': []}


# Part III: Joining D and E files:

### 3.1 - Doing a inner join with D and E

D represents the local directory containing data for the total area of cities, while E represents the file containing the total population of cities. Since both files are relatively small, it makes sense to perform this operation with Pandas, conducting an inner join based on the city code, and subsequently modifying it for compatibility with an SQL-supported extension.

In [55]:
# 3.1.1 Reading the data
df_d = read_ods(folder_path_d, 1)
df_e = read_ods(folder_path_e, 1)

# 3.1.2 Merge the DataFrames based on the common column 'NM_MUN' in df_d and 'NOME DO MUNICÍPIO' in df_e
merged_df = df_d.merge(df_e[['NOME DO MUNICÍPIO', ' POPULAÇÃO ESTIMADA ']], left_on='NM_MUN', right_on='NOME DO MUNICÍPIO', how='inner')

# 3.1.3 Drop the redundant 'NOME DO MUNICÍPIO' column
merged_df.drop(columns=['NOME DO MUNICÍPIO'], inplace=True)

# 3.1.4 Rename the merged DataFrame to 'city_area_population'
city_area_population = merged_df

# 3.1.5: Save the DataFrame as a CSV file

# 3.1.6 Save the 'city_area_population' DataFrame as a CSV file in the same directory

output_file_path= r"(...)\My_2ndEDA_Drug_Resistent_Bacteria\project_data\ibge_demographic\city_area_population.csv"

city_area_population.to_csv(output_file_path, index=False)

# 3.1.7 Display the first few rows of the DataFrame

# 3.1.8 Print the first few rows of the 'city_area_population' DataFrame
print(city_area_population.head())

  ID CD_UF     NM_UF NM_UF_SIGLA   CD_MUN                 NM_MUN  AR_MUN_2022  \
0  1    11  Rondônia          RO  1100015  Alta Floresta D'Oeste     7067.127   
1  2    11  Rondônia          RO  1100023              Ariquemes     4426.571   
2  3    11  Rondônia          RO  1100031                 Cabixi     1314.352   
3  4    11  Rondônia          RO  1100049                 Cacoal     3793.000   
4  5    11  Rondônia          RO  1100056             Cerejeiras     2783.300   

   POPULAÇÃO ESTIMADA   
0              22516.0  
1             111148.0  
2               5067.0  
3              86416.0  
4              16088.0  


# Part IV: Getting column dtypes:

### 4.1.1 - For "a" files:

In [44]:
if all_files_a:
    # Read the first CSV file in the list into a DataFrame
    df = pd.read_csv(all_files_a[0], delimiter=';', encoding='latin-1', quotechar='"')

    # Get the data types of the columns
    column_types = df.dtypes

    # Print the data types
    print(column_types)
else:
    print("No CSV files found in the directory.")

ANO_VENDA                   int64
MES_VENDA                   int64
UF_VENDA                   object
MUNICIPIO_VENDA            object
PRINCIPIO_ATIVO            object
DESCRICAO_APRESENTACAO     object
QTD_VENDIDA                 int64
UNIDADE_MEDIDA             object
CONSELHO_PRESCRITOR        object
UF_CONSELHO_PRESCRITOR     object
TIPO_RECEITUARIO          float64
CID10                      object
SEXO                      float64
IDADE                     float64
UNIDADE_IDADE             float64
dtype: object


### 4.1.2 - For "b" files:

In [47]:
if all_files_b:
    # Read the second CSV file in the list into a DataFrame
    df = pd.read_csv(all_files_b[1], delimiter=';', encoding='latin-1', quotechar='"')

    # Get the data types of the columns
    column_types = df.dtypes

    # Print the data types
    print(column_types)
else:
    print("No CSV files found in the directory.")

ANO_VENDA                           int64
MES_VENDA                           int64
UF_VENDA                           object
MUNICIPIO_VENDA                    object
PRINCIPIO_ATIVO                    object
QTD_ATIVO_POR_UNID_FARMACOTEC      object
UNIDADE_MEDIDA_PRINCIPIO_ATIVO     object
QTD_UNIDADE_FARMACOTECNICA         object
TIPO_UNIDADE_FARMACOTECNICA        object
CID10                              object
SEXO                              float64
IDADE                             float64
UNIDADE_IDADE                     float64
dtype: object


### 4.1.3 - For "c" files:

In [65]:
# Setting pandas option that will let us see all columns
pd.set_option('display.max_rows', None)

if all_files_c:
    # Read the third CSV file in the list into a DataFrame
    df = pd.read_csv(all_files_c[2], delimiter=';', encoding='latin-1', quotechar='"')

    # Get the data types of the columns
    column_types = df.dtypes

    # Print the data types
    print(column_types)
else:
    print("No CSV files found in the directory.")

CONTADOR        int64
ORIGEM          int64
TIPOBITO        int64
DTOBITO         int64
HORAOBITO     float64
NATURAL       float64
CODMUNNATU    float64
DTNASC        float64
IDADE           int64
SEXO            int64
RACACOR       float64
ESTCIV        float64
ESC           float64
ESC2010       float64
SERIESCFAL    float64
OCUP          float64
CODMUNRES       int64
LOCOCOR         int64
CODESTAB      float64
ESTABDESCR    float64
CODMUNOCOR      int64
IDADEMAE      float64
ESCMAE        float64
ESCMAE2010    float64
SERIESCMAE    float64
OCUPMAE       float64
QTDFILVIVO    float64
QTDFILMORT    float64
GRAVIDEZ      float64
SEMAGESTAC    float64
GESTACAO      float64
PARTO         float64
OBITOPARTO    float64
PESO          float64
TPMORTEOCO    float64
OBITOGRAV     float64
OBITOPUERP    float64
ASSISTMED     float64
EXAME         float64
CIRURGIA      float64
NECROPSIA     float64
LINHAA         object
LINHAB         object
LINHAC         object
LINHAD         object
LINHAII   

### 4.1.4 - For "F" files:
#### (We refer to the output obtained by combining "E" and "D" as "F")

In [30]:
# Defining the "file_path_f": 

file_path_f = r"(...)\Drug_Resistant_Bacteria\project_data\ibge_demographic\city_area_population.csv"

In [8]:
# Lê o arquivo CSV em um DataFrame
df = pd.read_csv(file_path_f, delimiter=';', encoding='utf-8', quotechar='"')

# Obtém os tipos de dados das colunas
column_types = df.dtypes

# Imprime os tipos de dados
print(column_types)

ID,CD_UF,NM_UF,NM_UF_SIGLA,CD_MUN,NM_MUN,AR_MUN_2022, POPULAÇÃO ESTIMADA     object
dtype: object


# Part V: "Dropping unnecessary columns"
#### (Or selecting desired columns, you choose, lol)

### 5.1 - Defining desired columns for each file types:

In [31]:
desired_columns_a = ['ANO_VENDA', 'MES_VENDA', 'UF_VENDA', 'MUNICIPIO_VENDA', 'PRINCIPIO_ATIVO', 'QTD_VENDIDA', 'UNIDADE_MEDIDA',
 'CID10', 'SEXO', 'IDADE', 'UNIDADE_IDADE']

desired_columns_b = ['ANO_VENDA', 'MES_VENDA', 'UF_VENDA', 'MUNICIPIO_VENDA', 'PRINCIPIO_ATIVO', 
                      'CID10', 'SEXO', 'IDADE', 'UNIDADE_IDADE']

desired_columns_c = ['CONTADOR', 'DTOBITO', 'NATURAL', 'CODMUNNATU', 'IDADE', 'SEXO', 'CODMUNRES', 'CODESTAB',
 'CODMUNOCOR', 'LINHAA', 'LINHAB', 'LINHAC', 'LINHAD', 'LINHAII', 'CAUSABAS','CAUSABAS_O',
 'DTCADASTRO', 'ATESTADO', 'CAUSAMAT','STDOEPIDEM']

desired_columns_d = ['ID','CD_UF','NM_UF','NM_UF_SIGLA','CD_MUN','NM_MUN','AR_MUN_2022']

# Part VI: Connecting to SQL server and creating target tables:

### 6.1 - Defining function that creates table inside mssql: 

In [None]:
def create_table(engine, table_name, column_definitions, if_exists='fail'):
    metadata = MetaData()
    table = Table(table_name, metadata, *[Column(name, col_type) for name, col_type in column_definitions.items()])

    try:
        metadata.create_all(engine)
        print(f'Table "{table_name}" created successfully.')
    except IntegrityError as e:
        if if_exists == 'replace':
            metadata.drop_all(engine)
            metadata.create_all(engine)
            print(f'Table "{table_name}" replaced successfully.')
        else:
            raise e

### 6.2 - Defining engine parameters to connect to sql instance:

In [None]:
# Replace 'your_server', 'your_database', 'your_user', and 'your_password' with the correct information
engine = create_engine(r'mssql+pyodbc://"your_server"/"your_database"?driver=ODBC+Driver+17+for+SQL+"your_password")

### 6.3 - Defining which dtypes for creating each table

In [42]:
# List of table names
table_names = ['industrialized_meds', 'manipulated_meds', 'death_obituaries', 'demographics']  # Add desired column names here

industrialized_meds = {
    'ANO_VENDA': Integer(),
    'MES_VENDA': Integer(),
    'UF_VENDA': String(),
    'MUNICIPIO_VENDA': String(),
    'PRINCIPIO_ATIVO': String(),
    'DESCRICAO_APRESENTACAO': String(),
    'QTD_VENDIDA': Integer(),
    'UNIDADE_MEDIDA': String(),
    'CONSELHO_PRESCRITOR': String(),
    'UF_CONSELHO_PRESCRITOR': String(),
    'TIPO_RECEITUARIO': Float(),
    'CID10': String(),
    'SEXO': Float(),
    'IDADE': Float(),
    'UNIDADE_IDADE': Float()
}

manipulated_meds = {
    'ANO_VENDA': Integer(),
    'MES_VENDA': Integer(),
    'UF_VENDA': String(),
    'MUNICIPIO_VENDA': String(),
    'PRINCIPIO_ATIVO': String(),
    'QTD_ATIVO_POR_UNID_FARMACOTEC': String(),
    'UNIDADE_MEDIDA_PRINCIPIO_ATIVO': String(),
    'QTD_UNIDADE_FARMACOTECNICA': String(),
    'TIPO_UNIDADE_FARMACOTECNICA': String(),
    'CID10': String(),
    'SEXO': Float(),
    'IDADE': Float(),
    'UNIDADE_IDADE': Float()
}

death_obituaries = {
 
    'DTOBITO': Integer(),
    'NATURAL': Integer(),
    'CODMUNNATU': Integer(),
    'IDADE': Integer(),
    'SEXO': Integer(),
    'CODMUNRES':Integer(),
    'LOCOCOR': Integer(),  
    'CODMUNOCOR':Integer(),
    'LINHAA': String(),
    'LINHAB': String(),
    'LINHAC': String(),
    'LINHAD': String(),
    'LINHAII': String(),
    'CAUSABAS': String(),       
    'ATESTADO': String(),
    'STDOEPIDEM': Integer()
}

demographics = {
    'ID': Integer(),
    'CD_UF': Integer(),
    'NM_UF': String(),
    'NM_UF_SIGLA': String(),
    'CD_MUN': Integer(),
    'NM_MUN': String(),
    'AR_MUN_2022': Float(),
    'POPULAÇÃO ESTIMADA': String()
}

### 5.3 - Calling function to create tables:

In [48]:
# Iterate over table names and create tables
for table_name, column_definitions in zip(table_names, [industrialized_meds, manipulated_meds, death_obituaries, demographics]):
    create_table(engine, table_name, column_definitions)

Table "industrialized_meds" created successfully.
Table "manipulated_meds" created successfully.
Table "death_obituaries" created successfully.
Table "demographics" created successfully.


# Part VI: Counting number of rows in each file to verify data integrity after insertions

### 6.1 - Defining the function:

In [38]:
def count_rows_in_file(file_path, encoding='latin-1'):
    try:
        df = pd.read_csv(file_path, delimiter=';', encoding=encoding, quoting=csv.QUOTE_ALL)
        row_count = len(df)
        return row_count
    except UnicodeDecodeError:
        print(f"UnicodeDecodeError: Could not read the file {file_path}.")
        return 0

def create_report(folder_path, output_file_path, encoding='latin-1'):
    row_counts = {}

    # Get all CSV files in the folder
    csv_files = get_file_paths(folder_path, extension='.csv')

    for file_path in csv_files:
        row_count = count_rows_in_file(file_path, encoding)

        # Extract year and month from the filename
        match = re.search(r'_(\d{4})(\d{2})?\.csv', os.path.basename(file_path))  # Updated regex pattern
        if match:
            year = match.group(1)
            month = match.group(2) or ''
            key = f"{year};{month}"

            row_counts[key] = row_count

    # Convert row_counts to a DataFrame
    report_df = pd.DataFrame(list(row_counts.items()), columns=["year;month", "count"])

    # Split the "year;month" column into two columns: "year" and "month"
    report_df[["year", "month"]] = report_df["year;month"].str.split(";", expand=True)

    # Now you can rename the columns
    report_df = report_df[["year", "month", "count"]]

    # Save the report to the CSV file
    report_df.to_csv(output_file_path, index=False, sep=';', decimal=',', header=True)

    # Display the first few rows of the report
    print(report_df.head())

## 6.2 Calling the report function:
### We need to provide two arguments to generate the report that counts number of rows per file in each different dir: 

"folder_path" and "output_file_path": Example below:

### 'A' folder: 

In [11]:
output_report_path_a = r"(...)\Drug_Resistant_Bacteria\project_data\validation_reports\validation_report_a.csv"
create_report(selected_folder_path_a, output_report_path_a)

   year month    count
0  2014    01  4663124
1  2014    02  4461102
2  2014    03  4770799
3  2014    04  4959036
4  2014    05  5257119


### 'B' folder: 

In [39]:
output_report_path_a = r"(...)\Drug_Resistant_Bacteria\project_data\validation_reports\validation_report_b.csv"
create_report(selected_folder_path_b, output_report_path_b)

   year month   count
0  2014    01  272874
1  2014    02  277087
2  2014    03  272527
3  2014    04  283229
4  2014    05  295848


### 'C' folder: 

In [40]:
output_report_path_c = r"(...)\Drug_Resistant_Bacteria\project_data\validation_reports\validation_report_c.csv"
create_report(selected_folder_path_c, output_report_path_c)

   year month    count
0  2014        1227040
1  2015        1264176
2  2016        1309775
3  2017        1312664
4  2018        1316720


### 'F' folder: 
"The file in 'f' dir does not contain either years or months, so we only need to count the number of rows."

In [16]:
try:
    df = pd.read_csv(file_path_f)
    line_count = len(df)
    print(f"Total lines in the file: {line_count}")
except FileNotFoundError:
    print("File not found.")
except pd.errors.EmptyDataError:
    print("The file is empty or does not contain data.")
except pd.errors.ParserError:
    print("Error parsing the CSV file.")

Total lines in the file: 6225


# Part VII: Inserting files into respectives tables

### 7.1 Defining function that inserts files into mssql:

In [37]:
def insert_csv_files_into_mssql(folder_path, table_name, desired_columns, engine, chunk_size=5000):
    column_index_mapping = {col_name: i for i, col_name in enumerate(desired_columns)}

    start_time_total = time.time()

    file_paths = get_file_paths(folder_path, extension=".csv")

    # Abrir a conexão uma vez para todas as inserções
    with engine.connect() as conn:
        for file_path in file_paths:
            file_name = os.path.basename(file_path)

            start_time = time.time()

            chunks = pd.read_csv(file_path, delimiter=';', encoding='latin-1', quotechar='"', chunksize=chunk_size)

            for i, chunk in enumerate(chunks):
                df_selected_columns = chunk[[col for col in chunk.columns if col in column_index_mapping]]

                # Usar a conexão para inserção
                df_selected_columns.to_sql(table_name, con=conn, if_exists='append', index=False, method='multi')

                print(f'Chunk {i+1} inserted for {file_name}.')

            end_time = time.time()
            elapsed_time = end_time - start_time

            print(f'Time elapsed for {file_name}: {elapsed_time} seconds')

    end_time_total = time.time()
    elapsed_time_total = end_time_total - start_time_total

    print(f'Total time elapsed: {elapsed_time_total} seconds')

### 7.2 - Inserting files into 'industrialized_meds' table:

In [None]:
insert_into_a = insert_csv_files_into_mssql_(selected_folder_path_a, 'industrialized_meds', desired_columns_a.keys(), engine, chunk_size=5000)

### 7.3 - Inserting files into 'manipulated_meds' table:

In [None]:
insert_into_b = insert_csv_files_into_mssql(selected_folder_path_b, 'manipulated_meds', desired_columns_b.keys(), engine, chunk_size=5000)

### 7.4 - Inserting Files into the 'death_obituaries' table:

#### (Initially, my plan was to insert all files directly into the death_obituaries table in the SQL Server and perform data cleaning within the SQL instance. However, this approach was proving to be very time-consuming, and multiple attempts to insert into MSSQL were failing. Therefore, I decided to adopt a different approach, which included the following steps:

#### 1- Reducing Sample Size: I used Pandas to work with a reduced sample of the data, aiming to optimize processing and analysis,
#### taking only absolutely neccessary columns
#### 2 - Data preparation: Converted specific numeric columns to integers and rounded the values to ensure data consistency.
#### 3 - Standardized the dates to a uniform format, facilitating database insertion. 

In [None]:
# Setting to display more columns
pd.set_option('display.max_columns', 20)

# Necessary columns
necessary_columns = ['DTOBITO', 'NATURAL', 'CODMUNNATU', 'IDADE', 'SEXO',
                     'CODMUNRES', 'LOCOCOR', 'CODMUNOCOR', 'LINHAA', 'LINHAB',
                     'LINHAC', 'LINHAD', 'LINHAII', 'CAUSABAS', 'ATESTADO', 'STDOEPIDEM']

def convert_to_integer(df, columns):
    for column in columns:
        if column in df.columns:
            df[column] = df[column].fillna(0).round().astype(int)
    return df

def standardize_date(column):
    column = column.astype(str)
    column = column.str.pad(width=8, side='left', fillchar='0')
    column = pd.to_datetime(column, format='%d%m%Y', errors='coerce')
    return column

def process_and_insert_csv_folder_c(file_path, table_name, engine, chunk_size=5000):
    csv_reader = pd.read_csv(file_path, delimiter=';', encoding='latin-1', quotechar='"', chunksize=chunk_size)
    for chunk in csv_reader:
        # Process the chunk
        chunk_filtered = chunk[necessary_columns]
        chunk_filtered = convert_to_integer(chunk_filtered, ['NATURAL', 'CODMUNNATU', 'CODMUNRES', 'LOCOCOR', 'CODMUNOCOR', 'IDADE', 'SEXO', 'STDOEPIDEM'])
        chunk_filtered['DTOBITO'] = standardize_date(chunk_filtered['DTOBITO'])

        # Insert processed data into the table
        chunk_filtered.to_sql(table_name, con=engine, if_exists='append', index=False)

# Selected folder and file list
# selected_folder_path_c = folder_paths[2]
# all_files_c = get_file_paths(selected_folder_path_c)

# Process and insert each CSV file from the selected folder
for file_path in all_files_c:
    process_and_insert_csv_folder_c(file_path, 'death_obituaries', engine, chunk_size=5000)

### 7.5 - Inserting F file into 'demographics' table:

In [None]:
insert_into_f = insert_csv_files_into_mssql(folder_path_f, 'demographics', desired_columns_f, engine)

# Part VIII: Validating all the insertions on the sql instance:

### 8.1 - Defining functions: 

In [6]:
#8.1.1 - Function to execute SQL query and return a DataFrame

def execute_sql_query(sql_query, engine):
    return pd.read_sql(sql_query, engine)

#8.1.2 - Function to read a CSV and adjust the 'count' column

def read_and_adjust_csv(csv_file_path):
    df = pd.read_csv(csv_file_path, delimiter=';')
    df['count'] = df['count'] - 1
    df.rename(columns={'count': 'count_csv'}, inplace=True)
    return df

#8.1.3 - Function to merge and check if the counts match

def merge_and_compare(df_sql, df_csv, join_columns):
    merged_df = pd.merge(df_sql, df_csv, on=join_columns, how='outer')
    merged_df['count_match'] = merged_df['count_sql'] == merged_df['count_csv']
    return merged_df

### 8.2 - Validating 'industrialized_meds': 

In [8]:
# SQL Query

sql_query_a = """
SELECT COUNT(*) AS count_sql, ANO_VENDA AS year, MES_VENDA AS month
FROM dbo.industrialized_meds
GROUP BY ANO_VENDA, MES_VENDA
ORDER BY ANO_VENDA, MES_VENDA;
"""
validation_report_sql_a = execute_sql_query(sql_query_a, engine)

validation_report_csv_a = read_and_adjust_csv(output_report_path_a)

merged_df_a = merge_and_compare(validation_report_sql_a, validation_report_csv_a, ['year', 'month'])

print(merged_df_a)


    count_sql  year  month  count_csv  count_match
0     4663123  2014      1    4663123         True
1     4461101  2014      2    4461101         True
2     4770798  2014      3    4770798         True
3     4959035  2014      4    4959035         True
4     5257118  2014      5    5257118         True
..        ...   ...    ...        ...          ...
89    6300744  2021      7    6300744         True
90    6263825  2021      8    6263825         True
91    6131691  2021      9    6131691         True
92    5989708  2021     10    5989708         True
93    2785568  2021     11    2785568         True

[94 rows x 5 columns]


### 8.3 - Validating 'manipulated_meds': 

In [10]:
# SQL Query 

sql_query_b = """
SELECT COUNT(*) AS count_sql, ANO_VENDA AS year, MES_VENDA AS month
FROM dbo.manipulated_med
GROUP BY ANO_VENDA, MES_VENDA
ORDER BY ANO_VENDA, MES_VENDA;
"""


validation_report_sql_b = execute_sql_query(sql_query_b, engine)

validation_report_csv_b = read_and_adjust_csv(output_report_path_b)

merged_df_b = merge_and_compare(validation_report_sql_b, validation_report_csv_b, ['year', 'month'])

print(merged_df_b)

    count_sql  year  month  count_csv  count_match
0      272873  2014      1     272873         True
1      277086  2014      2     277086         True
2      272526  2014      3     272526         True
3      283228  2014      4     283228         True
4      295847  2014      5     295847         True
..        ...   ...    ...        ...          ...
89     391425  2021      7     391425         True
90     394156  2021      8     394156         True
91     379754  2021      9     379754         True
92     350748  2021     10     350748         True
93     246909  2021     11     246909         True

[94 rows x 5 columns]


### 8.4 - Validating 'death_obituaries': 

In [17]:
# SQL Query

sql_query_c = """
SELECT YEAR(DTOBITO) AS year, COUNT(*) AS count_sql
FROM dbo.death_obituaries
GROUP BY YEAR(DTOBITO)
ORDER BY year;
"""

validation_report_sql_c = execute_sql_query(sql_query_c, engine)

validation_report_csv_c = read_and_adjust_csv(output_report_path_c)

merged_df_c = merge_and_compare(validation_report_sql_c, validation_report_csv_c, ['year'])

print(merged_df_c)

   year  count_sql  month  count_csv  count_match
0  2014    1227039    NaN    1227039         True
1  2015    1264175    NaN    1264175         True
2  2016    1309774    NaN    1309774         True
3  2017    1312663    NaN    1312663         True
4  2018    1316719    NaN    1316719         True
5  2019    1349801    NaN    1349801         True
6  2020    1556824    NaN    1556824         True
7  2021    1832649    NaN    1832649         True


### 8.5 - Validating 'demographics': 

In [7]:
# SQL Query
sql_query_f = """
SELECT COUNT(*) AS total_count
FROM dbo.demographics
"""

# Execute the query and store the result in a DataFrame
validation_report_sql_f = execute_sql_query(sql_query_f, engine)

# Print the total number of rows counted
total_count = validation_report_sql_f.iloc[0]['total_count']
print(f"Total rows: {total_count}")

Total de linhas: 6225
