In [65]:
"""
Python script to clean, process, and insert 
the Hospital Quality data into our SQL data tables.
"""
import numpy as np
import pandas as pd
import sys
import datetime
import psycopg
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

'\nPython script to clean, process, and insert \nthe Hospital Quality data into our SQL data tables.\n'

## 1. Read data

In [67]:
# Just use my own path?
path = "/Users/wangyicheng/Documents/GitHub/36614-Geese/Data/Quality/"

# info1 = pd.read_csv(path+sys.argv[2])
info1 = pd.read_csv(path+"Hospital_General_Information-2021-07.csv")

## 2. Preprocess data

In [68]:
# Replace 'Not Available' value to NaN
info1 = info1.replace('Not Available', np.nan)

In [69]:
## Insert date column as python date object
date = '2022-01-01'
# date = date.split('-')
# x = datetime.date(int(date[0]), int(date[1]), int(date[2]))
# date = x.strftime("%Y-%m-%d")
# type(date)

In [70]:
info1['Rating year'] = date

## 3. Load data into psql

In [71]:
# Connect to psql server
conn = psycopg.connect(
    host="sculptor.stat.cmu.edu", dbname="yicheng6",
    user="yicheng6", password="Oor4cah8p"
)
cur = conn.cursor()

#### 3.1 Hospital_Info(hospital_pk, name, address, city, state, zip_code, ownership, emergency)

CREATE TABLE Hospital_Info(\
	hospital_pk varchar(255) UNIQUE PRIMARY KEY,\
	name text, \
	address text,\
	city text,\
	state char(2),\
	zip_code varchar(5),\
      county text,\
	ownership text,\
	emergency boolean DEFAULT false);



In [72]:
# Create a seperate table containing useful columns
info_table = info1.loc[:,['Facility ID', 'Facility Name', 'Address', 'City',\
                          'State', 'ZIP Code', 'Hospital Ownership', 'Emergency Services']]

# Change the data type
info_table["Facility ID"] = info_table["Facility ID"].astype('string')
info_table["Facility Name"] = info_table["Facility Name"].astype('string')
info_table["Address"] = info_table["Address"].astype('string')
info_table["City"] = info_table["City"].astype('string')
info_table["State"] = info_table["State"].astype('string')
info_table["ZIP Code"] = info_table["ZIP Code"].astype('string')
info_table["Hospital Ownership"] = info_table["Hospital Ownership"].astype('string')
info_table["Emergency Services"] = info_table["Emergency Services"].astype('bool')
info_table.dtypes

Facility ID           string
Facility Name         string
Address               string
City                  string
State                 string
ZIP Code              string
Hospital Ownership    string
Emergency Services      bool
dtype: object

In [73]:
# Container to record insert failed row
key = ['Facility ID', 'Facility Name', 'Address', 'City',\
       'State', 'ZIP Code', 'Hospital Ownership', 'Emergency Services']
df_error = pd.DataFrame(columns=key)

In [None]:
num_rows_inserted = 0

# make a new transaction
with conn.transaction():
    
    for index, row in info_table.iterrows():
        try:
            # make a new SAVEPOINT -- like a save in a video game
            cur.execute("SAVEPOINT save1")
            with conn.transaction():  
                # now insert  (hospital_pk, rating_year, rating) into the data
                insert = ("INSERT INTO Hospital_Info "
                          "VALUES (%s, %s, %s, %s, %s, %s, %s, %s)")
                cur.execute(insert, tuple(row))
        except Exception as e:
            # if an exception/error happens in this block, Postgres goes back to
            # the last savepoint upon exiting the `with` block
            print("insert failed in row " + str(index))
            df_error = pd.concat([df_error, row])

            # add additional logging, error handling here
        else:
            # no exception happened, so we continue without reverting the savepoint
            num_rows_inserted += 1
            
    print('Inserted ' + str(num_rows_inserted) + ' rows for Hospital_Info table.')
    df_error.to_csv("Error_row_hospitalinfo.csv", index = False)

# now we commit the entire transaction
conn.commit()

#### 3.2 Rating(hospital_pk, rating_year, rating)

CREATE TABLE Rating(\
	hospital_pk varchar(255) REFERENCES Hospital_Info,\
	rating_year date CHECK (rating_year <= current_date),\
	rating int CHECK (rating >= 0));

In [75]:
# Create a seperate table containing useful columns
rate_table = info1.loc[:,["Facility ID","Hospital overall rating", "Rating year"]]
rate_table["Facility ID"] = rate_table["Facility ID"].astype('string')
rate_table["Hospital overall rating"] = rate_table["Hospital overall rating"].astype('Int64')
rate_table['Rating year'] = pd.to_datetime(rate_table['Rating year'], format="%Y-%m-%d")
rate_table.dtypes

Facility ID                        string
Hospital overall rating             Int64
Rating year                datetime64[ns]
dtype: object

In [76]:
# Container to record insert failed row
key = ["hospital_pk","Hospital overall rating", "Rating year"]
df_error = pd.DataFrame(columns=key)

In [84]:
num_rows_inserted = 0

# make a new transaction
with conn.transaction():
    
    for index, row in rate_table.iterrows():
        try:
            # make a new SAVEPOINT -- like a save in a video game
            cur.execute("SAVEPOINT save1")
            with conn.transaction():  
                # now insert  (hospital_pk, rating_year, rating) into the data
                insert = ("INSERT INTO Rating "
                          "VALUES (%(hospital_pk)s, %(rating)s, %(rating_year)s)"
                          "ON CONFLICT (hospital_pk) DO UPDATE "
                          "SET rating = %(rating)s, rating_year = %(rating_year)s")
                
                cur.execute(insert, {
                    "hospital_pk": row['Facility ID'],
                    "rating": row['Hospital overall rating'],
                    "rating_year": row['Rating year']
                })
        except Exception as e:
            # if an exception/error happens in this block, Postgres goes back to
            # the last savepoint upon exiting the `with` block
            print("insert failed in row " + str(index))
            df_error = pd.concat([df_error, row])

            # add additional logging, error handling here
        else:
            # no exception happened, so we continue without reverting the savepoint
            num_rows_inserted += 1

    print('Inserted ' + str(num_rows_inserted) + ' rows for Rating_Time table.')
    df_error.to_csv("Error_row_ratingtime.csv", index = False)

# now we commit the entire transaction
conn.commit()