In [1]:
import os
import re

import pandas as pd
import snowflake.connector
from dotenv import load_dotenv

In [16]:
# Load the cleaned dataset into a DataFrame. In this notebook the frame is only
# used to derive the table DDL (column names and dtypes); the rows themselves
# are loaded into Snowflake from the CSV file by the PUT / COPY INTO cells.
cleaned = pd.read_csv('./Data/cleaned_data.csv')

In [24]:
load_dotenv()

# Connection credentials and the names of the Snowflake objects this notebook
# creates, all read from environment variables after load_dotenv() has run.
user = os.getenv("SNOWFLAKE_USER")
password = os.getenv("SNOWFLAKE_PASSWORD")
account = os.getenv("SNOWFLAKE_ACCOUNT")
warehouse = os.getenv("SNOWFLAKE_WAREHOUSE")
role = os.getenv("SNOWFLAKE_ROLE")
database = os.getenv("SNOWFLAKE_DATABASE")
schema = os.getenv("SNOWFLAKE_SCHEMA")
table = os.getenv("SNOWFLAKE_TABLE")

# Fail fast on missing settings. os.getenv returns None for an unset variable,
# and the cells below interpolate these names straight into DDL strings, where
# None would be rendered as the literal text "None".
# NOTE(review): role is deliberately not required here - presumably the
# connector falls back to the user's default role when it is None; confirm.
_required_settings = {
    "SNOWFLAKE_USER": user,
    "SNOWFLAKE_PASSWORD": password,
    "SNOWFLAKE_ACCOUNT": account,
    "SNOWFLAKE_WAREHOUSE": warehouse,
    "SNOWFLAKE_DATABASE": database,
    "SNOWFLAKE_SCHEMA": schema,
    "SNOWFLAKE_TABLE": table,
}
_missing_settings = [name for name, value in _required_settings.items() if not value]
if _missing_settings:
    raise EnvironmentError(
        "Missing required environment variables: " + ", ".join(_missing_settings)
    )

In [8]:
# Open the Snowflake session. Only identity settings are passed here; no
# warehouse, database or schema is given because those objects are created by
# the cells that follow.
connection_params = {
    "user": user,
    "password": password,
    "account": account,
    "role": role,
}
conn = snowflake.connector.connect(**connection_params)

In [9]:
# Single cursor reused by every SQL statement in the cells below; it is closed
# in the final cell.
cursor = conn.cursor()

In [13]:
# Create Warehouse
# OR REPLACE recreates the warehouse if one with this name already exists.
# AUTO_SUSPEND is expressed in seconds; AUTO_RESUME restarts the warehouse
# when a statement needs it.
create_warehouse_sql = f"""
    CREATE OR REPLACE WAREHOUSE {warehouse}
    WITH WAREHOUSE_SIZE = 'XSMALL'
    AUTO_SUSPEND = 300
    AUTO_RESUME = TRUE
"""
cursor.execute(create_warehouse_sql)
print(f"Warehouse '{warehouse}' created.")

Warehouse 'LLM_WH' created.


In [14]:
# Create Database
# WARNING: OR REPLACE drops an existing database of the same name, together
# with everything stored in it, before creating the new empty one.
create_database_sql = f"CREATE OR REPLACE DATABASE {database}"
cursor.execute(create_database_sql)
print(f"Database '{database}' created.")

Database 'HOSPITAL_ANALYTICS_DB' created.


In [15]:
# Create Schema
# The schema name is qualified with the database so the statement does not
# depend on the session's current database. OR REPLACE discards any existing
# schema of the same name.
create_schema_sql = f"CREATE OR REPLACE SCHEMA {database}.{schema}"
cursor.execute(create_schema_sql)
print(f"Schema '{schema}' created in database '{database}'.")

Schema 'CLEANED_DATA' created in database 'HOSPITAL_ANALYTICS_DB'.


In [27]:
# Map pandas dtypes to SQL types
def map_dtype_to_sql(dtype):
    """Return the SQL column type used for a pandas/NumPy dtype.

    The dtype is tested against integer, float, boolean and datetime checks,
    in that order; the first match decides the result. Anything else (for
    example object/string columns) falls back to ``VARCHAR(255)``.

    Args:
        dtype: A dtype as found on a pandas Series (``series.dtype``).

    Returns:
        One of ``'INT'``, ``'FLOAT'``, ``'BOOLEAN'``, ``'DATETIME'`` or
        ``'VARCHAR(255)'``.
    """
    type_checks = (
        (pd.api.types.is_integer_dtype, 'INT'),
        (pd.api.types.is_float_dtype, 'FLOAT'),
        (pd.api.types.is_bool_dtype, 'BOOLEAN'),
        (pd.api.types.is_datetime64_any_dtype, 'DATETIME'),
    )
    for matches, sql_type in type_checks:
        if matches(dtype):
            return sql_type
    return 'VARCHAR(255)'

# Generate CREATE TABLE SQL query without backticks
def generate_create_table_sql(df, table_name):
    """Build a CREATE TABLE statement whose columns mirror a DataFrame.

    Each column label is turned into a lowercase, underscore-separated name:
    surrounding whitespace is stripped, then every run of characters that are
    not letters, digits or ``$`` (spaces, hyphens, other punctuation, repeated
    underscores) is replaced by a single underscore. For example
    ``"Zip Code - 3 digits"`` becomes ``zip_code_3_digits``. Labels that are
    not strings are converted with ``str()`` first.

    The column type comes from ``map_dtype_to_sql`` applied to the column's
    dtype.

    Args:
        df: DataFrame supplying the column labels and dtypes.
        table_name: Name placed after ``CREATE TABLE``; inserted verbatim.

    Returns:
        The statement as a string with one column definition per line,
        terminated by ``);``.
    """
    column_defs = []
    for col in df.columns:
        # One regex pass collapses any mix of separators and underscores, so
        # inputs like "a - b" cannot leave a double underscore behind the way
        # a single str.replace("__", "_") pass does.
        col_clean = re.sub(r"(?:[^\w$]|_)+", "_", str(col).strip()).lower()
        sql_type = map_dtype_to_sql(df[col].dtype)
        column_defs.append(f"  {col_clean} {sql_type}")
    sql_body = ",\n".join(column_defs)
    return f"CREATE TABLE {table_name} (\n{sql_body}\n);"

# Build the DDL for the cleaned dataset under the configured table name and
# print it so it can be reviewed before a later cell executes it.
create_table_query = generate_create_table_sql(cleaned, table)
print(create_table_query)

CREATE TABLE hospital_data (
  hospital_service_area VARCHAR(255),
  hospital_county VARCHAR(255),
  permanent_facility_id FLOAT,
  facility_name VARCHAR(255),
  age_group VARCHAR(255),
  zip_code_3_digits VARCHAR(255),
  gender VARCHAR(255),
  race VARCHAR(255),
  ethnicity VARCHAR(255),
  length_of_stay VARCHAR(255),
  type_of_admission VARCHAR(255),
  patient_disposition VARCHAR(255),
  discharge_year INT,
  ccsr_diagnosis_description VARCHAR(255),
  apr_drg_description VARCHAR(255),
  apr_mdc_description VARCHAR(255),
  apr_severity_of_illness_description VARCHAR(255),
  apr_risk_of_mortality VARCHAR(255),
  apr_medical_surgical_description VARCHAR(255),
  payment_typology_1 VARCHAR(255),
  birth_weight INT,
  emergency_department_indicator VARCHAR(255),
  total_charges VARCHAR(255),
  total_costs VARCHAR(255),
  region VARCHAR(255)
);


In [28]:
# Run the generated DDL. The statement is a plain CREATE TABLE (no OR REPLACE),
# so it does not overwrite an existing table; expect an error from Snowflake
# if the table is already present.
cursor.execute(create_table_query)
print(f"Table {table} created.")

Table hospital_data created or replaced.


In [33]:
csv_file_path = "./Data/cleaned_data.csv"

# Upload file to the table stage
# "@%<table>" addresses the stage that belongs to the table itself, and
# AUTO_COMPRESS=TRUE asks Snowflake to compress the file during upload.
put_sql = f"PUT file://{csv_file_path} @%{table} AUTO_COMPRESS=TRUE"
cursor.execute(put_sql)
print(f"File uploaded to table stage @{table}")

File uploaded to table stage @hospital_data


In [None]:
file_name = os.path.basename(csv_file_path)
table = os.getenv("SNOWFLAKE_TABLE")

# Load the staged file into the table. The ".gz" suffix is appended because
# the upload cell requested AUTO_COMPRESS=TRUE. The header row is skipped, so
# the CSV's column order has to line up with the table's column order.
# ON_ERROR = 'ABORT_STATEMENT' stops the load at the first bad record.
copy_into_sql = f"""
COPY INTO {table}
FROM @%{table}/{file_name}.gz
FILE_FORMAT = (
  TYPE = 'CSV'
  SKIP_HEADER = 1
  FIELD_OPTIONALLY_ENCLOSED_BY = '"'
)
ON_ERROR = 'ABORT_STATEMENT';
"""
cursor.execute(copy_into_sql)
print(f"Data loaded into '{table}' table.")

Data loaded into 'hospital_data' table.


In [36]:
# Release the cursor first, then the connection it was created from.
cursor.close()
conn.close()