### **Python Data Engineering Project**
##### *-- Anh Vi Pham --*

### **Setup Database**

#### Connect Server

In [1]:
import pyodbc
import json
import pandas as pd 
import warnings
warnings.filterwarnings('ignore')

# Connection settings for the SQL Server instance
server = 'localhost,1433'
database = 'master'
driver = '{ODBC Driver 18 for SQL Server}'

# Read the login (username / password) from database_keys.json
with open("database_keys.json", "r") as file:
    config = json.load(file)
username = config["username"]
password = config["password"]

# Establish the connection: assemble the ODBC attributes, then open a cursor
conn = pyodbc.connect(';'.join([
    f'DRIVER={driver}',
    f'SERVER={server}',
    f'DATABASE={database}',
    f'UID={username}',
    f'PWD={password}',
    'TrustServerCertificate=yes',
]))
cursor = conn.cursor()

#### Create new database

In [2]:
def get_current_database(cursor=None):
    """Print the name of the database a cursor is currently connected to.

    Parameters
    ----------
    cursor : DB-API cursor, optional
        Cursor on which ``SELECT DB_NAME()`` is executed. When omitted, the
        notebook-level ``cursor`` is looked up at call time. (A default written
        as ``cursor=cursor`` is frozen when the function is defined and keeps
        pointing at that cursor even after the notebook reconnects.)

    Returns
    -------
    None
        The database name is printed, not returned.
    """
    if cursor is None:
        # The parameter shadows the global of the same name, so fetch it explicitly.
        cursor = globals()['cursor']
    cursor.execute("SELECT DB_NAME() AS CurrentDatabase")
    current_db = cursor.fetchone()[0]
    print(f"Currently connected to database: {current_db}")
# Print which database the cursor opened above is attached to
get_current_database(cursor)

Currently connected to database: master


In [3]:
# Run the DROP in autocommit mode
conn.autocommit = True

# Remove a previous copy of the database, if there is one, so the notebook starts clean
drop_database_sql = "IF EXISTS (SELECT * FROM sys.databases WHERE name = 'HospitalOperation') DROP DATABASE HospitalOperation"
cursor.execute(drop_database_sql)
print("Database 'HospitalOperation' dropped successfully if it existed.")

Database 'HospitalOperation' dropped successfully if it existed.


In [4]:
# CREATE DATABASE is issued in autocommit mode; transactional mode is restored
# afterwards whether or not the statement succeeds.
conn.autocommit = True
try:
    cursor.execute("CREATE DATABASE HospitalOperation")
    print("Database 'HospitalOperation' created successfully.")
except pyodbc.Error as e:
    # Report the driver error, then stop: every later cell needs this database,
    # so carrying on after a failed CREATE only produces confusing follow-up errors.
    print(f"Error: {e}")
    raise
finally:
    conn.autocommit = False

Database 'HospitalOperation' created successfully.


In [5]:
# Close the connection to the "master" database; the next cell opens a new
# connection that targets the freshly created database.
conn.close()

#### Connect to new database

In [6]:
# Reconnect with the same driver, server and login, now targeting the new database
new_database_name = 'HospitalOperation'
conn = pyodbc.connect(';'.join([
    f'DRIVER={driver}',
    f'SERVER={server}',
    f'DATABASE={new_database_name}',
    f'UID={username}',
    f'PWD={password}',
    'TrustServerCertificate=yes',
]))
cursor = conn.cursor()

# Confirm which database the new cursor is attached to
get_current_database(cursor)

Currently connected to database: HospitalOperation


#### Create the tables from the SQL script in table_creation_code.txt

In [7]:
def _split_sql_statements(script):
    """Split a SQL script on the semicolons that sit outside literals and comments."""
    statements, current = [], []
    # None = plain SQL, "'" = string literal, '--' = line comment, '/*' = block comment
    state = None
    i, size = 0, len(script)
    while i < size:
        char, pair = script[i], script[i:i + 2]
        if state is None:
            if char == ';':
                statements.append(''.join(current))
                current = []
                i += 1
                continue
            if char == "'":
                state = "'"
            elif pair == '--':
                state = '--'
            elif pair == '/*':
                state = '/*'
                current.append(pair)
                i += 2
                continue
        elif state == "'":
            # A doubled quote ('') closes and immediately reopens the literal,
            # which leaves the state correct without special handling.
            if char == "'":
                state = None
        elif state == '--':
            if char == '\n':
                state = None
        elif pair == '*/':  # only reachable while inside a block comment
            state = None
            current.append(pair)
            i += 2
            continue
        current.append(char)
        i += 1
    statements.append(''.join(current))
    return statements


def load_sql_from_txt(file_name: str, mode='r') -> list:
    """Read a SQL script and return its statements as a list of strings.

    The script is split on ``;`` terminators. Semicolons inside single-quoted
    string literals, ``--`` line comments and ``/* */`` block comments are not
    treated as terminators. Line breaks are preserved, so a ``--`` comment ends
    at its own line instead of commenting out the statement that follows it.

    Parameters
    ----------
    file_name : str
        Path of the script to read.
    mode : str, default 'r'
        Mode passed to ``open``; the splitter works on text, so use a text mode.

    Returns
    -------
    list of str
        The statements, stripped of surrounding whitespace, with empty entries
        removed.

    Raises
    ------
    OSError
        If the file cannot be opened or read. The error is propagated instead of
        being printed, and no database connection is touched: this function only
        deals with the file.
    """
    with open(file_name, mode) as file:
        script = file.read()
    stripped = (part.strip() for part in _split_sql_statements(script))
    return [statement for statement in stripped if statement]

def execute_list_of_sql_commands(command_list, cursor, conn):
    """Execute a batch of SQL statements and commit them together.

    Empty entries in ``command_list`` are skipped. After every statement has
    run, the transaction is committed and a success message is printed. If any
    step raises, the connection is closed and the error is printed instead of
    being propagated.

    Parameters
    ----------
    command_list : iterable of str
        Statements to execute, in order.
    cursor : cursor used to execute each statement.
    conn : connection that is committed on success and closed on failure.

    Returns
    -------
    None
    """
    try:
        # filter(None, ...) lazily drops the empty strings
        for statement in filter(None, command_list):
            cursor.execute(statement)
        conn.commit()
        print('Commands excuted successfully')
    except Exception as error:
        conn.close()
        print(f'Failed to execute commands - \n Error: {error} \n Connection closed')
    return None


In [8]:
# Read the statements stored in table_creation_code.txt ...
db_file = 'table_creation_code.txt'
table_creation_codes = load_sql_from_txt(db_file)

# ... and execute them on the open connection (committed together on success)
execute_list_of_sql_commands(command_list=table_creation_codes, cursor=cursor, conn=conn)

Commands excuted successfully


#### Load data from sample_dataset.txt

In [9]:
# Read the statements stored in sample_dataset.txt ...
data_file = 'sample_dataset.txt'
data_codes = load_sql_from_txt(data_file)

# ... and execute them on the open connection (committed together on success)
execute_list_of_sql_commands(command_list=data_codes, cursor=cursor, conn=conn)

Commands excuted successfully


### **Data Cleaning**

In [10]:
# Print the database the cursor is attached to before running the cleaning steps
get_current_database(cursor)

Currently connected to database: HospitalOperation


In [11]:
def query(sql_code, engine=None, params=None):
    """Run a SQL query and return the result as a pandas DataFrame.

    Parameters
    ----------
    sql_code : str
        SQL text to execute.
    engine : connection, optional
        Connection handed to ``pandas.read_sql``. When omitted, the
        notebook-level ``conn`` is looked up at call time, so the function
        follows the connection that is current now instead of the one that
        existed when this cell was run.
    params : sequence or mapping, optional
        Values bound to placeholders in ``sql_code`` (forwarded to
        ``pandas.read_sql``). Prefer this over formatting values into the SQL
        text.

    Returns
    -------
    pandas.DataFrame
    """
    if engine is None:
        engine = conn
    return pd.read_sql(sql_code, engine, params=params)

#### Change data type: text to varchar(150)

In [12]:
def find_columns_by_data_type(data_type: str, conn):
    """List every column whose declared data type equals ``data_type``.

    Parameters
    ----------
    data_type : str
        Type name compared against ``INFORMATION_SCHEMA.COLUMNS.DATA_TYPE``.
    conn : connection
        Open connection handed to ``pandas.read_sql``.

    Returns
    -------
    pandas.DataFrame
        One row per matching column, with columns ``TABLE_NAME`` and
        ``COLUMN_NAME``.
    """
    # The value travels as a bound parameter (qmark style) rather than being
    # formatted into the SQL text, so quotes in it cannot break or alter the query.
    sql_code = """
    SELECT TABLE_NAME, COLUMN_NAME
    FROM INFORMATION_SCHEMA.COLUMNS
    WHERE DATA_TYPE = ?
    """
    return pd.read_sql(sql_code, conn, params=(data_type,))

# Collect every column still declared with the 'text' data type;
# text_columns holds one (TABLE_NAME, COLUMN_NAME) row per match.
data_type = 'text'
text_columns = find_columns_by_data_type(data_type, conn)
print(f"Columns with {data_type} data type:")
print(text_columns)

Columns with text data type:
      TABLE_NAME      COLUMN_NAME
0      Physician             Name
1      Physician         Position
2     Department             Name
3   NewProcedure             Name
4        Patient             Name
5        Patient          Address
6        Patient            Phone
7          Nurse             Name
8          Nurse         Position
9    Appointment  ExaminationRoom
10    Medication             Name
11    Medication            Brand
12    Medication      Description
13    Prescribes             Dose
14          Room             Type


In [13]:
def alter_text_to_varchar(conn, table_name, column_name, varchar_length=150):
    """Change one column's type to ``VARCHAR(varchar_length)`` and commit.

    Parameters
    ----------
    conn : connection
        Open connection; a cursor is created for the statement and closed again.
    table_name, column_name : str
        Identifiers of the column to alter. They are bracket-quoted, and a
        literal ``]`` inside a name is escaped by doubling it.
    varchar_length : int, default 150
        Length placed inside ``VARCHAR(...)``.

    Raises
    ------
    Exception
        Whatever the driver raises when the statement fails; the pending
        transaction is rolled back first.
    """
    def _bracket(identifier):
        # Bracket-quote an identifier so spaces and reserved words are accepted.
        return '[' + str(identifier).replace(']', ']]') + ']'

    # NOTE(review): no NULL / NOT NULL clause is emitted, so the resulting
    # nullability is decided by the server - confirm against the SQL Server
    # ALTER TABLE documentation before relying on NOT NULL being preserved.
    alter_sql = (
        f"ALTER TABLE {_bracket(table_name)} "
        f"ALTER COLUMN {_bracket(column_name)} VARCHAR({varchar_length})"
    )
    cursor = conn.cursor()
    try:
        cursor.execute(alter_sql)
        conn.commit()
    except Exception:
        # Do not leave a half-applied change pending on the shared connection.
        conn.rollback()
        raise
    finally:
        cursor.close()
    print(f"Column '{column_name}' in table '{table_name}' changed to VARCHAR({varchar_length})")

# Convert each (table, column) pair found by the 'text' scan to VARCHAR
column_pairs = text_columns[['TABLE_NAME', 'COLUMN_NAME']].itertuples(index=False, name=None)
for table_name, column_name in column_pairs:
    alter_text_to_varchar(conn, table_name, column_name)


Column 'Name' in table 'Physician' changed to VARCHAR(150)
Column 'Position' in table 'Physician' changed to VARCHAR(150)
Column 'Name' in table 'Department' changed to VARCHAR(150)
Column 'Name' in table 'NewProcedure' changed to VARCHAR(150)
Column 'Name' in table 'Patient' changed to VARCHAR(150)
Column 'Address' in table 'Patient' changed to VARCHAR(150)
Column 'Phone' in table 'Patient' changed to VARCHAR(150)
Column 'Name' in table 'Nurse' changed to VARCHAR(150)
Column 'Position' in table 'Nurse' changed to VARCHAR(150)
Column 'ExaminationRoom' in table 'Appointment' changed to VARCHAR(150)
Column 'Name' in table 'Medication' changed to VARCHAR(150)
Column 'Brand' in table 'Medication' changed to VARCHAR(150)
Column 'Description' in table 'Medication' changed to VARCHAR(150)
Column 'Dose' in table 'Prescribes' changed to VARCHAR(150)
Column 'Type' in table 'Room' changed to VARCHAR(150)


#### Data report

In [None]:
# Names of all base tables in the current database
tables_sql = """ 
    SELECT table_name 
    FROM information_schema.tables
    WHERE table_type = 'BASE TABLE';"""
tables_found = query(tables_sql)
li_tables = tables_found['table_name'].to_list()

print('Number of tables:', len(li_tables))

Number of tables: 15


In [17]:
def get_primary_key_columns(table_name, conn):
    """Return the names of the columns that make up a table's primary key.

    Parameters
    ----------
    table_name : str
        Table to look up in ``INFORMATION_SCHEMA.TABLE_CONSTRAINTS``.
    conn : connection
        Open connection handed to ``pandas.read_sql``.

    Returns
    -------
    list of str
        Primary-key column names; empty when no primary key is found.
    """
    # The table name is sent as a bound parameter (qmark style) instead of being
    # formatted into the SQL text, so quotes in it cannot break or alter the query.
    sql_code = """
    SELECT COLUMN_NAME
    FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
    JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE AS CCU
    ON TC.CONSTRAINT_NAME = CCU.CONSTRAINT_NAME
    WHERE TC.TABLE_NAME = ? AND TC.CONSTRAINT_TYPE = 'PRIMARY KEY'
    """
    primary_key_columns = pd.read_sql(sql_code, conn, params=(table_name,))
    return primary_key_columns['COLUMN_NAME'].tolist()

def check_table_data(table_name, conn):
    """Print a short data-quality report for one table.

    The report shows the primary-key columns, the number of NULLs in every
    column that has at least one, and the number of rows that are exact
    duplicates of another row across all columns.

    Parameters
    ----------
    table_name : str
        Table to inspect. It is bracket-quoted wherever it is used as an
        identifier and bound as a parameter wherever it is used as a value.
    conn : connection
        Open connection used for every query.

    Returns
    -------
    None
        Everything is printed.
    """
    def _bracket(identifier):
        # Bracket-quote an identifier; a literal ']' is escaped by doubling it.
        return '[' + str(identifier).replace(']', ']]') + ']'

    quoted_table = _bracket(table_name)

    # Header: table name and primary key
    primary_key_columns = get_primary_key_columns(table_name, conn)
    pk_text = ', '.join(f'[{col}]' for col in primary_key_columns) if primary_key_columns else 'None'
    print("---------------------------------------------")
    print(f"TABLE: {table_name}")
    print(f"PRIMARY KEY: {pk_text}")

    # Column list of the table (table name bound as a parameter, qmark style)
    columns = pd.read_sql(
        "SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = ?",
        conn,
        params=(table_name,),
    )['COLUMN_NAME'].tolist()

    print("\n- Number of missing values")
    if not columns:
        # Without columns the duplicate check below would be invalid SQL
        # (empty PARTITION BY), so stop here with an explicit note.
        print("  (no columns found - does the table exist?)")
        return

    # NULL count per column; only columns that contain NULLs are listed
    for col in columns:
        null_query = f"SELECT COUNT(*) AS NullCount FROM {quoted_table} WHERE {_bracket(col)} IS NULL"
        null_count = pd.read_sql(null_query, conn).iloc[0, 0]
        if null_count > 0:
            print(f"  + [{col}]: {null_count}")

    # Rows whose values match another row in every column. Only the group size
    # is selected in the inner query, so a table column that happens to share
    # the alias name cannot clash with it.
    partition_columns = ', '.join(_bracket(col) for col in columns)
    duplicate_query = f"""
    SELECT COUNT(*) AS TotalDuplicates
    FROM (SELECT COUNT(*) OVER (PARTITION BY {partition_columns}) AS DupGroupSize FROM {quoted_table}) AS DupCheck
    WHERE DupGroupSize > 1
    """
    duplicate_count = pd.read_sql(duplicate_query, conn).iloc[0, 0]
    print(f"\n- Total number of duplicates in the entire columns: {duplicate_count}")

# Print the data-quality report for every table name collected in li_tables
for table in li_tables:
    check_table_data(table, conn)


---------------------------------------------
COLUMN: Physician
PRIMARY KEY: [EmployeeID]

- Number of missing values

- Total number of duplicates in the entire columns: 0
---------------------------------------------
COLUMN: Department
PRIMARY KEY: [DepartmentID]

- Number of missing values

- Total number of duplicates in the entire columns: 0
---------------------------------------------
COLUMN: Affiliated_With
PRIMARY KEY: [Department], [Physician]

- Number of missing values

- Total number of duplicates in the entire columns: 0
---------------------------------------------
COLUMN: NewProcedure
PRIMARY KEY: [Code]

- Number of missing values

- Total number of duplicates in the entire columns: 0
---------------------------------------------
COLUMN: Trained_In
PRIMARY KEY: [Physician], [Treatment]

- Number of missing values

- Total number of duplicates in the entire columns: 0
---------------------------------------------
COLUMN: Patient
PRIMARY KEY: [SSN]

- Number of missing v

### **Close Connection**

In [18]:
# Close the database connection used by the cells above
conn.close()