In [40]:
import os
from urllib.parse import quote

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text


### **Set up database**

#### Connect to database: sys

In [41]:
def connect_database(database, username='root', password='rainscales2024', hostname='localhost', port='3306'):
    """
    Create a SQLAlchemy engine for a MySQL database and check that it can connect.

    The username and password are percent-encoded before being placed in the
    connection URL, so credentials containing characters such as '@', ':' or
    '/' do not produce a malformed URL. Values made only of letters and digits
    (including the defaults) are passed through unchanged.

    Args:
        database: Name of the database (schema) to connect to.
        username: MySQL user name.
        password: MySQL password.
        hostname: Host of the MySQL server.
        port: Port of the MySQL server (string or integer).

    Returns:
        The engine produced by `create_engine`.

    Raises:
        Any exception raised by `create_engine` or `engine.connect()` is
        propagated, e.g. when the server is unreachable or the credentials
        are rejected.
    """
    # NOTE(review): a real-looking password is hard-coded as the default value.
    # It is kept only so existing calls keep working; prefer passing the
    # password explicitly (e.g. from an environment variable or getpass).
    encoded_username = quote(str(username), safe='')
    encoded_password = quote(str(password), safe='')
    connection_string = (
        f"mysql+mysqlconnector://{encoded_username}:{encoded_password}"
        f"@{hostname}:{port}/{database}"
    )
    engine = create_engine(connection_string)
    # Open (and immediately release) one connection so that connection
    # problems surface here rather than at the first query.
    with engine.connect():
        print("Connection successful")
    return engine

# Connect to the `sys` database first; this engine is then used to drop and
# re-create the target database below.
engine = connect_database(database='sys')

Connection successful


#### Create new database: HospitalOperation

In [42]:
# Name of the database that this notebook drops, re-creates and then loads.
database_hospital_operation = 'HospitalOperation'

In [43]:
def drop_database_if_exists(engine, database_name):
    """
    Drop a MySQL database if it exists.

    The database name is an identifier and cannot be sent as a bound
    parameter, so it is wrapped in backticks with any embedded backtick
    doubled. This keeps names containing special characters or reserved words
    valid and prevents the name from injecting additional SQL.

    Args:
        engine: SQLAlchemy engine connected to the MySQL server.
        database_name: Plain (unquoted) name of the database to drop.

    Returns:
        None. A confirmation line is printed after the statement has run.
    """
    quoted_name = "`" + str(database_name).replace("`", "``") + "`"
    with engine.connect() as connection:
        connection.execute(text(f"DROP DATABASE IF EXISTS {quoted_name}"))
        print(f"Database '{database_name}' dropped successfully")

# Remove any existing copy of the database so the notebook starts from scratch.
drop_database_if_exists(engine=engine, database_name=database_hospital_operation)

Database 'HospitalOperation' dropped successfully


In [44]:
def create_database(engine, database_name):
    """
    Create a MySQL database unless one with that name already exists.

    The database name is an identifier and cannot be sent as a bound
    parameter, so it is wrapped in backticks with any embedded backtick
    doubled. This keeps names containing special characters or reserved words
    valid and prevents the name from injecting additional SQL.

    Args:
        engine: SQLAlchemy engine connected to the MySQL server.
        database_name: Plain (unquoted) name of the database to create.

    Returns:
        None. A confirmation line is printed after the statement has run.
    """
    quoted_name = "`" + str(database_name).replace("`", "``") + "`"
    with engine.connect() as connection:
        connection.execute(text(f"CREATE DATABASE IF NOT EXISTS {quoted_name}"))
        print(f"Database '{database_name}' created successfully")
    
# Re-create the (now absent) database so the tables can be loaded into it.
create_database(engine=engine, database_name=database_hospital_operation)

Database 'HospitalOperation' created successfully


#### Connect to database: HospitalOperation

In [45]:
# Rebind `engine` to the newly created database; from here on all statements
# run against it rather than against `sys`.
engine = connect_database(database=database_hospital_operation)

Connection successful


#### Load flat files into database tables

In [46]:
def load_flat_files_to_database(engine, folder_path, file_type):
    """
    Load every `*.<file_type>` file in a folder into its own database table.

    Each matching file is read with `pandas.read_csv` (regardless of
    `file_type`, so the files must be CSV-formatted) and written with
    `DataFrame.to_sql`. The table name is the file name without its extension,
    and an existing table of that name is replaced.

    Files are processed in sorted name order so that repeated runs load the
    tables in the same sequence; `os.listdir` alone returns entries in
    arbitrary order. Directory entries whose name happens to end with the
    extension are skipped instead of being passed to `read_csv`.

    Args:
        engine: Connectable accepted by `DataFrame.to_sql`.
        folder_path: Folder that contains the flat files.
        file_type: File extension to match, without the leading dot (e.g. 'csv').

    Returns:
        None. One confirmation line is printed per loaded file.
    """
    extension = f'.{file_type}'
    for file_name in sorted(os.listdir(folder_path)):
        file_path = os.path.join(folder_path, file_name)
        if not file_name.endswith(extension) or not os.path.isfile(file_path):
            continue
        table_name = os.path.splitext(file_name)[0]
        df = pd.read_csv(file_path)
        df.to_sql(table_name, con=engine, if_exists='replace', index=False)
        print(f"Data from '{file_name}' loaded into table '{table_name}'")

# Build the path to the data_source folder under the current working directory
def get_path_relative_to_current_file(folder_name):
    """
    Return `folder_name` joined onto the current working directory.

    Despite the function's name, the base directory is whatever
    `os.getcwd()` reports when the function is called, not the location of a
    particular file.
    """
    return os.path.join(os.getcwd(), folder_name)

# Folder holding the flat files; resolved against the current working
# directory, so the notebook must be run from the directory containing it.
path_data_source = get_path_relative_to_current_file(folder_name = 'data_source')

# Load every CSV file in the data_source folder into a table named after the
# file (existing tables with the same name are replaced).
load_flat_files_to_database(engine=engine, folder_path=path_data_source, file_type='csv')

Data from 'providers.csv' loaded into table 'providers'
Data from 'departments.csv' loaded into table 'departments'
Data from 'insurance.csv' loaded into table 'insurance'
Data from 'procedures.csv' loaded into table 'procedures'
Data from 'cities.csv' loaded into table 'cities'
Data from 'patients.csv' loaded into table 'patients'
Data from 'visits.csv' loaded into table 'visits'
Data from 'diagnoses.csv' loaded into table 'diagnoses'


#### Load SQL scripts and execute

In [47]:
def load_sql_commands_from_file(file_path: str, mode='r'):
    """
    Read a SQL script and return its statements as a list of one-line strings.

    Lines that consist only of a `--` comment are removed before the script is
    split. Without that step, flattening a statement onto one line would put
    the comment in front of the SQL that followed it (commenting the statement
    out), and a ';' inside a comment would split the script mid-comment.
    Scripts without such comment lines produce exactly the same list as before.

    Limitations: the script is split on every ';', so a semicolon inside a
    string literal still splits a statement; trailing `-- ...` comments on a
    line of SQL and `/* ... */` block comments are not removed.

    Args:
        file_path: Path of the SQL script.
        mode: Mode passed to `open`; expected to be a text mode.

    Returns:
        List of non-empty statements, each with newlines replaced by spaces
        and surrounding whitespace stripped (no trailing ';').
    """
    with open(file_path, mode) as file:
        script = file.read()

    # Drop whole-line comments first, while line boundaries still exist.
    sql_lines = [
        line for line in script.split('\n')
        if not line.lstrip().startswith('--')
    ]

    commands = []
    for statement in '\n'.join(sql_lines).split(';'):
        if statement.strip():  # ignore empty fragments, e.g. after the final ';'
            commands.append(statement.replace('\n', ' ').strip())

    print('SQL commands loaded to list')
    return commands


# Path of the SQL script, resolved against the current working directory.
# NOTE(review): the file name is spelled 'contraints' here; presumably this
# matches the file on disk - verify before renaming anything.
path_sql_script = get_path_relative_to_current_file('sql_script/create_contraints.sql')

# Statements of the script as a list of strings, ready to be executed one by one.
list_command_contrains = load_sql_commands_from_file(path_sql_script)

SQL commands loaded to list


In [48]:
def execute_sql_commands(engine, command_list):
    """
    Execute a list of SQL statements in order inside a single transaction.

    The statements run within `engine.begin()`, which commits when the block
    finishes and rolls back if a statement raises. With a plain
    `engine.connect()` nothing is committed explicitly, so under SQLAlchemy
    2.x any statement that is not implicitly committed by the database would
    be rolled back when the connection closes.

    Args:
        engine: SQLAlchemy engine to run the statements on.
        command_list: Iterable of SQL strings; empty or whitespace-only
            entries are skipped.

    Returns:
        None. A confirmation line is printed once all statements have run.

    Raises:
        Any exception raised while executing a statement is propagated after
        the transaction has been rolled back.
    """
    with engine.begin() as connection:
        for command in command_list:
            if command and command.strip():  # Skip empty commands
                connection.execute(text(command))
        print("Commands executed successfully")

# Run the statements loaded from the SQL script against the current database.
execute_sql_commands(engine=engine, command_list = list_command_contrains)

Commands executed successfully
