In [44]:
import pandas as pd
import boto3
from io import BytesIO
import gzip
import psycopg2
from datetime import datetime
from dateutil.parser import parse
import logging
import credss

In [45]:
# Read the CSV 
adm_df = pd.read_csv('Admission Data.csv')
mort_df = pd.read_csv('Mortality Data.csv')
poll_df = pd.read_csv('Pollution Data.csv')

## Print the resulting DataFrame
# print(adm_df.head(50))
# print(mort_df.head(50))
# print(poll_df.head(50))

## Check for missing values and drop them

# adm_df.isnull().sum()

# mort_df.isnull().sum()

# poll_df.isnull().sum()


adm_df.fillna(0, inplace=True)


poll_df = poll_df.dropna()



# # Identify duplicate rows
# duplicateRows = adm_df[adm_df.duplicated()]
# print(duplicateRows)
# duplicateRows = mort_df[adm_df.duplicated()]
# print(duplicateRows)
# duplicateRows = adm_df[adm_df.duplicated()]
# print(duplicateRows)



# Changing column to date format


def time_format(df: pd.DataFrame, col: str) -> pd.DataFrame:
    def parse_with_dayfirst(date_str):
        if pd.isna(date_str):
            return pd.NaT
        try:
            # Try parsing with month first (default behavior)
            return parse(date_str)
        except ValueError:
            try:
                # If month first fails, try with day first
                return parse(date_str, dayfirst=True)
            except ValueError:
                # If both fail, return NaT
                return pd.NaT

    # Apply the parsing function and ensure the result is datetime
    df[col] = pd.to_datetime(df[col].apply(parse_with_dayfirst), errors='coerce')

    # Identify rows with invalid dates
    invalid_dates = df[df[col].isna()]
    if not invalid_dates.empty:
        print(f"Warning: {len(invalid_dates)} rows have invalid dates in column {col}.")
        # print(invalid_dates)

    # Check if all values are NaT
    if df[col].isna().all():
        # print(f"Error: All values in column {col} are invalid dates.")
        
        # 1. Fill with a default date
        df[col] = pd.to_datetime('2000-01-01')
        # 2. Fill with the current date
        # df[col] = pd.to_datetime(datetime.now().date())
        # 3. Or just return the DataFrame with NaT values
        return df

    # Convert datetime values to strings in the desired format
    df[col] = df[col].dt.strftime('%Y-%m-%d')

    return df




adm_df = time_format(adm_df, 'D.O.A')
adm_df = time_format(adm_df, 'D.O.D')
adm_df = adm_df.dropna()

adm_values_to_drop = [3253, 4053, 4357, 4572, 4622, 10165]
adm_df = adm_df.drop(adm_df[adm_df['SNO'].isin(adm_values_to_drop)].index)

# # print(adm_df.head(50))

mort_df = time_format(mort_df, 'DATE OF BROUGHT DEAD')

# # print(mort_df.head(50))

poll_df = time_format(poll_df, 'DATE')
poll_values_to_drop = ['2018-11-16', '2019-01-23']
poll_df = poll_df.drop(poll_df[poll_df['DATE'].isin(poll_values_to_drop)].index)
# # print(poll_df.head(50))




In [46]:
adm_df.isnull().sum()

# mort_df.isnull().sum()

# poll_df.isnull().sum()



SNO                                0
MRD No.                            0
D.O.A                              0
D.O.D                              0
AGE                                0
GENDER                             0
RURAL                              0
TYPE OF ADMISSION-EMERGENCY/OPD    0
month year                         0
DURATION OF STAY                   0
duration of intensive unit stay    0
OUTCOME                            0
SMOKING                            0
ALCOHOL                            0
DM                                 0
HTN                                0
CAD                                0
PRIOR CMP                          0
CKD                                0
HB                                 0
TLC                                0
PLATELETS                          0
GLUCOSE                            0
UREA                               0
CREATININE                         0
BNP                                0
RAISED CARDIAC ENZYMES             0
E

In [47]:
# Preprocessed file

adm_df.to_csv('admission_data.csv', index=False, header=True, sep=',')

mort_df.to_csv('mortality_data.csv', index=False, header=True, sep=',')

poll_df.to_csv('pollution_data.csv', index=False, header=True, sep=',')

In [48]:
# Persist data in S3 bucket
s3 = boto3.client('s3')
bucket_name = credss.bucket_name
file_name = ["admission_data.csv", "mortality_data.csv", "pollution_data.csv"]

for local_file in file_name:
    s3.upload_file(local_file, bucket_name, local_file)
    print(f"Uploaded {local_file} to {bucket_name}")


Uploaded admission_data.csv to hospital-admiss
Uploaded mortality_data.csv to hospital-admiss
Uploaded pollution_data.csv to hospital-admiss


In [21]:
# # Getting column names to build table

# column_names = list(adm_df.columns)
# for column in column_names:
#     print(column)

# column_names = list(mort_df.columns)
# for column in column_names:
#     print(column)

# column_names = list(poll_df.columns)
# for column in column_names:
#     print(column)

In [49]:
# Redshift Connection
try:
    con = psycopg2.connect(dbname=credss.dbname,
                        host=credss.host,
                        port=credss.port,
                        user=credss.user,
                        password=credss.password)
    logging.info('Redshift connection succeeded')
except Exception as e:
    logging.exception(e)

In [50]:
con.autocommit = True
cur = con.cursor()

In [51]:
# Load data to redshift
def load_data_to_redshift(schema_name, table_name, s3_location):
    """
    Loads data from an S3 location into a Redshift table.

    Args:
        schema_name (str): Name of the Redshift schema.
        table_name (str): Name of the target table.
        s3_location (str): S3 location of the CSV file.

    Returns:
        None
    """
    try:
        copy_query = f"""
            COPY {schema_name}.{table_name}
            FROM '{s3_location}'
            IAM_ROLE {credss.IAM_ROLE}
            CSV
            IGNOREHEADER 1;
        """
        cur.execute(copy_query)
        print(f"Data loaded into {schema_name}.{table_name} successfully.")
    except psycopg2.Error as e:
        print(f"Error loading data into {schema_name}.{table_name}: {e}")


schema_name = 'hospital_data'
load_data_to_redshift(schema_name, 'admission_data', credss.s3_admin)
load_data_to_redshift(schema_name, 'mortality_data', credss.s3_hosp)
load_data_to_redshift(schema_name, 'pollution_data', credss.s3_poll)



Error loading data into hospital_data.admission_data: syntax error at or near "arn"
LINE 4:             IAM_ROLE arn:aws:iam::975050149002:role/service-...
                             ^

Error loading data into hospital_data.mortality_data: syntax error at or near "arn"
LINE 4:             IAM_ROLE arn:aws:iam::975050149002:role/service-...
                             ^

Error loading data into hospital_data.pollution_data: syntax error at or near "arn"
LINE 4:             IAM_ROLE arn:aws:iam::975050149002:role/service-...
                             ^

