## `Defining Connections - OLTP to OLAP`

In [1]:
import os
from getpass import getpass

import psycopg2
from psycopg2.extras import execute_values

In [2]:
# Credentials are taken from the standard libpq environment variables so that
# no secret is stored in the notebook. If PGPASSWORD is not set, the password
# is requested interactively instead.
user = os.environ.get("PGUSER", "postgres")
password = os.environ.get("PGPASSWORD") or getpass(f"PostgreSQL password for {user}: ")

# Connection to the OLTP database the data is read from.
source_conn = psycopg2.connect(
    host="localhost",
    database="hospital_oltp_db",
    user=user,
    password=password)

# Connection to the OLAP database the data is written to. If it cannot be
# opened, the already-open source connection is closed before re-raising so
# that it is not leaked.
try:
    dest_conn = psycopg2.connect(
        host="localhost",
        database="hospital_olap_db",
        user=user,
        password=password)
except Exception:
    source_conn.close()
    raise

In [3]:
# Cursor on the OLAP connection; the later cells use it for the CREATE TABLE
# statements and for the upserts.
dest_cur = dest_conn.cursor()

## `Creating OLAP Tables`

In [4]:
# Table Creation Queries
#
# One CREATE TABLE IF NOT EXISTS statement per OLAP table. Each statement is
# kept as a plain string and is only executed in a later cell.

# Dimension: procedures, keyed by pr_id.
sql_table_PatientProcedureDim = """
CREATE TABLE IF NOT EXISTS PatientProcedureDim (
    pr_id varchar(20) PRIMARY KEY,
    name varchar(150) NOT NULL,
    type varchar(50),
    price numeric
);
"""

# Dimension: employees, keyed by emp_id.
sql_table_EmployeeDim = """
CREATE TABLE IF NOT EXISTS EmployeeDim (
    emp_id varchar(30) PRIMARY KEY,
    emp_name varchar(150) NOT NULL,
    emp_dob DATE,
    emp_date_entry DATE NOT NULL,
    building char(1) NOT NULL,
    emp_salary Numeric,
    section varchar(100) NOT NULL,
    specialty varchar(300) NOT NULL
);
"""

# Dimension: patients, keyed by p_id.
sql_table_PatientDim = """
CREATE TABLE IF NOT EXISTS PatientDim (
    p_id varchar(30) PRIMARY KEY,
    name varchar(150) NOT NULL,
    sex varchar(50) NOT NULL,
    race_group varchar(50),
    age_group varchar(50) NOT NULL,
    language varchar(50) NOT NULL,
    payer_source varchar(50) NOT NULL,
    disposition varchar(150) NOT NULL,
    social_security_num char(11),
    date_of_birth date NOT NULL,
    credit_card_number varchar(30),
    address varchar(500),
    state varchar(100),
    city varchar(150)
);
"""

# Dimension: health facilities, with a composite key (location_id, oshpd_id2).
sql_table_HealthFacilityDim = """
CREATE TABLE IF NOT EXISTS HealthFacilityDim (
    location_id int NOT NULL,
    oshpd_id2 int NOT NULL,
    name varchar(250),
    city varchar(150) NOT NULL,
    county varchar(250),
    address varchar(500) NOT NULL,
    zip_code VARCHAR(15),
    license_type varchar(50) NOT NULL,
    PRIMARY KEY (location_id, oshpd_id2)
);
"""

# Extra facility attributes; its composite primary key is also a foreign key
# to HealthFacilityDim.
sql_table_HealthFacilityAdditionalDim = """
CREATE TABLE IF NOT EXISTS HealthFacilityAdditionalDim (
    h_additional_id_loc int,
    h_additional_id_fac int,
    oshpd_id int,
    congressional_district_num int,
    senate_district_num int,
    assembly_district_num int,
    control_type_desc varchar(250) NOT NULL,
    control_type_category_desc varchar(100) NOT NULL,
    mssa_name VARCHAR(200),
    mssa_designation varchar(50),
    PRIMARY KEY(h_additional_id_loc, h_additional_id_fac),
    
    FOREIGN KEY (h_additional_id_loc, h_additional_id_fac)
      REFERENCES HealthFacilityDim (location_id, oshpd_id2)
);
"""

# Fact table: one row per visit_id, with foreign keys to the patient,
# procedure, health facility and employee dimensions.
sql_table_VisitFact = """
CREATE TABLE IF NOT EXISTS VisitFact (
    visit_id varchar(35) PRIMARY KEY,
    patient_dim_id varchar(30) NOT NULL,
    procedure_dim_id varchar(20) NOT NULL,
    health_facility_dim_id_fac int NOT NULL,
    health_facility_dim_id_loc int NOT NULL,
    visit_date date NOT NULL,
    employee_dim_id varchar(30) NOT NULL,
    payment Numeric,
    rating int NOT NULL,
    
    FOREIGN KEY (patient_dim_id)
      REFERENCES PatientDim (p_id),
    FOREIGN KEY (procedure_dim_id)
      REFERENCES PatientProcedureDim (pr_id),
    FOREIGN KEY (health_facility_dim_id_loc, health_facility_dim_id_fac)
      REFERENCES HealthFacilityDim (location_id, oshpd_id2),
    FOREIGN KEY (employee_dim_id)
      REFERENCES EmployeeDim (emp_id)
    
);
"""

In [5]:
# Executing Queries
# The statements run in this order so that every table referenced by a
# foreign key already exists when the referencing table is created.
for ddl_statement in (
    sql_table_PatientProcedureDim,
    sql_table_EmployeeDim,
    sql_table_PatientDim,
    sql_table_HealthFacilityDim,
    sql_table_HealthFacilityAdditionalDim,
    sql_table_VisitFact,
):
    dest_cur.execute(ddl_statement)

## `Inserting data - Upsert`

In [6]:
# Queries for getting data sources from OLTP system
#
# Each SELECT lists its columns in the same order as the columns of the
# matching OLAP table.

# Procedures: read directly from PatientProcedures.
sql_select_source_PatientProcedureDim = """
SELECT pp.pr_id,
       pp.name,
       pp.type,
       pp.price
FROM PatientProcedures pp;
"""

# Employees: read directly from Employees.
sql_select_source_EmployeeDim = """
SELECT e.emp_id,
       e.emp_name,
       e.emp_dob,
       e.emp_date_entry,
       e.building,
       e.emp_salary,
       e.section,
       e.specialty
FROM Employees e;
"""

# Patients joined with their PatientCharacteristics row (p_chid = ch_id).
sql_select_source_PatientDim = """
SELECT p.p_id,
       p.name,
       pc.sex,
       pc.race_group,
       pc.age_group,
       pc.language,
       pc.payer_source,
       pc.disposition,
       p.social_security_num,
       p.date_of_birth,
       p.credit_card_number,
       p.address,
       p.state,
       p.city
FROM Patients p
INNER JOIN PatientCharacteristics pc ON p.p_chid=pc.ch_id;
"""

# Facilities joined to Locations through HealthFacilityLocations; yields one
# row per (location_id, h_id) link.
sql_select_source_HealthFacilityDim = """
SELECT 
       l.location_id,
       hf.h_id as oshpd_id2,
       hf.name,
       l.city,
       l.county,
       l.address,
       l.zip_code,
       hf.license_type
FROM HealthFacilities hf
INNER JOIN HealthFacilityLocations hfl ON hf.h_id=hfl.h_id
INNER JOIN Locations l ON hfl.location_id=l.location_id; 
"""


# Same facility/location join as above, selecting the additional attributes.
sql_select_source_HealthFacilityAdditionalDim = """
SELECT l.location_id as h_additional_id_loc,
       hf.h_id as h_additional_id_fac,       
       hf.oshpd_id,
       l.congressional_district_num,
       l.senate_district_num,
       l.assembly_district_num,
       hf.control_type_desc,
       hf.control_type_category_desc,
       hf.mssa_name,
       hf.mssa_designation
FROM HealthFacilities hf
INNER JOIN HealthFacilityLocations hfl ON hf.h_id=hfl.h_id
INNER JOIN Locations l ON hfl.location_id=l.location_id; 
"""

# Source rows for VisitFact: visits joined to their facility and, through
# HealthFacilityLocations, to that facility's location(s). Every column alias
# matches the name of the VisitFact column it is loaded into.
# NOTE(review): if a facility can be linked to more than one location, this
# join returns the same visit_id once per location, which would collide on the
# VisitFact primary key during the upsert - confirm the link is one-to-one.
sql_select_source_VisitFact = \
"""
SELECT pv.visit_id,
       pv.visit_pid as patient_dim_id,
       pv.procedure as procedure_dim_id,
       hf.h_id as health_facility_dim_id_fac,
       l.location_id as health_facility_dim_id_loc,
       pv.visit_date,
       pv.procedure_by as employee_dim_id,
       pv.payment,
       pv.rating
FROM HealthFacilities hf
INNER JOIN HealthFacilityLocations hfl ON hf.h_id=hfl.h_id
INNER JOIN Locations l ON hfl.location_id=l.location_id
INNER JOIN PatientVisits pv ON hf.h_id=pv.visit_hid;
"""

In [10]:
# batch loading into postgresql olap system

# NOTE(review): per the psycopg2 documentation, itersize only applies when a
# named cursor is iterated directly; with fetchmany() the batch size is the
# fetchmany() argument. The assignment is kept as in the original - confirm
# whether SERVER_PAGINATION_LIMIT is still wanted.
SERVER_PAGINATION_LIMIT = 20000
# Number of source rows fetched and upserted per batch.
CLIENT_PAGINATION_LIMIT = 10000


def build_upsert_sql(table, columns, key_columns):
    """Build the INSERT ... ON CONFLICT statement used with execute_values.

    Parameters
    ----------
    table : str
        Destination table name.
    columns : sequence of str
        Destination columns, in the order the source SELECT returns them.
    key_columns : sequence of str
        Columns of the conflict target (the table's primary key).

    Returns
    -------
    str
        A statement with a single ``%s`` placeholder for the VALUES list.
        On conflict every non-key column is overwritten with the incoming
        value; if there are no non-key columns the conflict is ignored.

    Notes
    -----
    Names are interpolated unquoted on purpose: the tables were created with
    unquoted identifiers, so quoting them here would make them case-sensitive.
    Only pass trusted, hard-coded names.
    """
    column_list = ", ".join(columns)
    key_list = ", ".join(key_columns)
    assignments = [
        f"{col} = EXCLUDED.{col}" for col in columns if col not in key_columns
    ]
    if assignments:
        conflict_action = "DO UPDATE SET\n    " + ",\n    ".join(assignments)
    else:
        conflict_action = "DO NOTHING"
    return (
        f"INSERT INTO {table} ({column_list}) VALUES %s\n"
        f"ON CONFLICT ({key_list})\n"
        f"{conflict_action};"
    )


def load_table(table, select_sql, columns, key_columns):
    """Copy one table from the OLTP source into the OLAP destination.

    Runs ``select_sql`` on ``source_conn`` through a named cursor, fetches the
    result in batches of CLIENT_PAGINATION_LIMIT rows and upserts each batch
    into ``table`` through ``dest_cur``. Nothing is committed here.
    """
    upsert_sql = build_upsert_sql(table, columns, key_columns)
    with source_conn.cursor(name=f"{table}_insert_cursor") as cur:
        print(f"inserting data into table - {table}", end='\n\n')
        cur.itersize = SERVER_PAGINATION_LIMIT
        cur.execute(select_sql)
        # fetchmany() returns an empty list once the result set is exhausted,
        # which ends the loop.
        while rows := cur.fetchmany(CLIENT_PAGINATION_LIMIT):
            execute_values(dest_cur, upsert_sql, rows)


# (table, source query, destination columns, conflict-target columns).
# Dimensions are loaded before VisitFact, and HealthFacilityDim before
# HealthFacilityAdditionalDim, so that foreign-key targets exist first.
LOAD_PLAN = [
    (
        "PatientProcedureDim",
        sql_select_source_PatientProcedureDim,
        ["pr_id", "name", "type", "price"],
        ["pr_id"],
    ),
    (
        "EmployeeDim",
        sql_select_source_EmployeeDim,
        ["emp_id", "emp_name", "emp_dob", "emp_date_entry", "building",
         "emp_salary", "section", "specialty"],
        ["emp_id"],
    ),
    (
        "PatientDim",
        sql_select_source_PatientDim,
        ["p_id", "name", "sex", "race_group", "age_group", "language",
         "payer_source", "disposition", "social_security_num",
         "date_of_birth", "credit_card_number", "address", "state", "city"],
        ["p_id"],
    ),
    (
        "HealthFacilityDim",
        sql_select_source_HealthFacilityDim,
        ["location_id", "oshpd_id2", "name", "city", "county", "address",
         "zip_code", "license_type"],
        ["location_id", "oshpd_id2"],
    ),
    (
        "HealthFacilityAdditionalDim",
        sql_select_source_HealthFacilityAdditionalDim,
        ["h_additional_id_loc", "h_additional_id_fac", "oshpd_id",
         "congressional_district_num", "senate_district_num",
         "assembly_district_num", "control_type_desc",
         "control_type_category_desc", "mssa_name", "mssa_designation"],
        ["h_additional_id_loc", "h_additional_id_fac"],
    ),
    (
        "VisitFact",
        sql_select_source_VisitFact,
        ["visit_id", "patient_dim_id", "procedure_dim_id",
         "health_facility_dim_id_fac", "health_facility_dim_id_loc",
         "visit_date", "employee_dim_id", "payment", "rating"],
        ["visit_id"],
    ),
]

for table, select_sql, columns, key_columns in LOAD_PLAN:
    load_table(table, select_sql, columns, key_columns)

inserting data into table - PatientProcedureDim

inserting data into table - EmployeeDim

inserting data into table - PatientDim

inserting data into table - HealthFacilityDim

inserting data into table - HealthFacilityAdditionalDim

inserting data into table - VisitFact



In [13]:
# commit and close changes
# Commit the DDL and upserts made through dest_cur, then release the cursor
# and both connections. The finally block runs even if the commit raises, so
# neither connection is left open.
try:
    dest_conn.commit()
finally:
    dest_cur.close()
    dest_conn.close()
    source_conn.close()

<hr>

**`Links for proposed code solutions:`**
- https://rizwanbutt314.medium.com/efficient-way-to-read-large-postgresql-table-with-python-934d3edfdcc
- https://medium.com/dev-bits/understanding-postgresql-cursors-with-python-ebc3da591fe7
- https://towardsdatascience.com/postgresql-how-to-upsert-safely-easily-and-fast-246040514933