# 0_num_of_patients.ipynb

In [None]:
import os
import sys
import json
import pathlib
sys.path.append("..")

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from datetime import timedelta
import traceback

# Load config.json from the parent of the current working directory.
current_dir = pathlib.Path.cwd()
parent_dir = current_dir.parent
config_path = parent_dir / "config.json"
with config_path.open() as file:
    cfg = json.load(file)

In [None]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# Connection settings live under the key named by cfg["dbms"].
driver = cfg["dbms"]
db_cfg = cfg[driver]
username = db_cfg["@user"]
password = db_cfg["@password"]
host = db_cfg["@server"]
port = db_cfg["@port"]
database = db_cfg["@database"]

# Supported DBMS names mapped to their SQLAlchemy "dialect+driver" prefix.
sqldrivers = {
    "mssql": "mssql+pymssql",
    "postgresql": "postgresql+psycopg2",
}
if driver not in sqldrivers:
    # Fail with a clear message instead of a NameError on `sqldriver` below.
    raise ValueError(
        f"Unsupported dbms {driver!r} in config.json; "
        f"expected one of {sorted(sqldrivers)}"
    )
sqldriver = sqldrivers[driver]

url = f"{sqldriver}://{username}:{password}@{host}:{port}/{database}"
engine = create_engine(url, echo=False)
sessionlocal = sessionmaker(autocommit=False, autoflush=True, bind=engine)

In [None]:
# Drug names whose per-drug person tables are combined further down.
drug_order = [
    "Acetaminophen",
    "Vancomycin",
    "Naproxen",
    "Celecoxib",
    "Acyclovir",
]

In [None]:
def executeQuerynCommit(engine, sql_query):
    """
        Execute a SQL statement inside a transaction and commit it.

        Args:
            engine: SQLAlchemy engine used to open the transaction.
            sql_query: SQL to run. A plain string is wrapped in
                sqlalchemy.text(); any other object is passed to
                execute() unchanged.

        Returns:
            The object returned by execute(), or None when an error
            occurred. The connection is already closed when this function
            returns, so the result is not meant to be fetched from.

        Errors are not raised: the traceback is printed and None is returned.
    """
    result = None
    try:
        if isinstance(sql_query, str):
            # Raw strings are not executable on SQLAlchemy 2.x connections.
            from sqlalchemy import text

            sql_query = text(sql_query)
        # engine.begin() commits when the block exits normally and rolls
        # back when it raises, so the statement is actually persisted.
        with engine.begin() as conn:
            result = conn.execute(sql_query)
    except Exception:
        # Best-effort by design: report the failure and keep going.
        traceback.print_exc()
    return result

In [None]:
def createTableAllDrugsPerson(engine, drug_order, psm=False):
    """
        Build the combined person_all_drugs table (person_all_drugs_psm when
        psm is true) from the per-drug person tables.

        The per-drug tables <schema>.person_<drug>[_psm] are merged with
        UNION and, for each person_id, only the row with the earliest
        cohort_start_date is kept. The target table is dropped and recreated.
        <schema> is db_cfg["@person_database_schema"].

        Args:
            engine: SQLAlchemy engine handed to executeQuerynCommit.
            drug_order: Drug names; each one is lower-cased to form its table
                name. Any number of drugs is accepted. An "all_drugs" entry
                is skipped because it names the target table itself.
            psm: When true, read from and write to the "_psm" tables.

        Raises:
            ValueError: If drug_order yields no source table.
    """
    Query = """
    DROP TABLE IF EXISTS {@all_drugs_table};
    SELECT person_id, cohort_start_date, first_abnormal_date , gender_source_value, age, n_diff
    INTO {@all_drugs_table}
    FROM (
    SELECT *,
    CASE WHEN n_diff IS NULL THEN 'False' ELSE 'True' END AS is_abnormal,
    ROW_NUMBER() OVER (PARTITION BY p.person_id ORDER BY p.cohort_start_date) AS rn
    FROM ({@drug_union}) p
    ) A
    WHERE A.rn = 1
    """
    schema = db_cfg["@person_database_schema"]
    _psm = "_psm" if psm else ""
    all_drugs_table = schema + ".person_" + "all_drugs" + _psm

    # One source table per drug. The target table is excluded because it is
    # dropped by the first statement and cannot be part of its own UNION.
    drug_tables = []
    for drug_name in drug_order:
        drug_table = schema + ".person_" + drug_name.lower() + _psm
        if drug_table != all_drugs_table:
            drug_tables.append(drug_table)
    if not drug_tables:
        raise ValueError("drug_order must contain at least one drug name")

    sql_param_dict = {
        "@all_drugs_table": all_drugs_table,
        "@drug_union": "\n        UNION ".join(
            "SELECT * FROM " + drug_table for drug_table in drug_tables
        ),
    }
    Query = Query.format(**sql_param_dict)
    executeQuerynCommit(engine, Query)

# Create the person_all_drugs_psm table first (psm=True),
# then the person_all_drugs table (psm=False).
for use_psm in (True, False):
    createTableAllDrugsPerson(engine, drug_order, psm=use_psm)

In [None]:
# Same drug names as before plus "all_drugs", so the combined table is
# counted alongside the per-drug tables below.
drug_order = [
    "Acetaminophen",
    "Vancomycin",
    "Naproxen",
    "Celecoxib",
    "Acyclovir",
    "all_drugs",
]

In [None]:
def executeQuerynfetchall(engine, sql_query):
    """
        Execute a SQL query and return all of its rows.

        Args:
            engine: SQLAlchemy engine used to open the connection.
            sql_query: SQL to run. A plain string is wrapped in
                sqlalchemy.text(); any other object is passed to
                execute() unchanged.

        Returns:
            The value of fetchall() on the query result, or None when
            executing or fetching failed.

        Errors are not raised: the traceback is printed and None is returned.
    """
    result = None
    try:
        if isinstance(sql_query, str):
            # Raw strings are not executable on SQLAlchemy 2.x connections.
            from sqlalchemy import text

            sql_query = text(sql_query)
        with engine.connect() as conn:
            # Assign only after fetchall() succeeds, so a failed fetch
            # returns None rather than a closed cursor object.
            result = conn.execute(sql_query).fetchall()
    except Exception:
        # Best-effort by design: report the failure and keep going.
        traceback.print_exc()
    return result

In [None]:
def get_result_of_cdm_num_of_patients(engine):
    """
        Count the distinct person_id values in the CDM person table.

        Args:
            engine: SQLAlchemy engine handed to executeQuerynfetchall.

        Returns:
            The first column of the first returned row (the count), or an
            empty dict when the query produced no rows or failed.
    """
    count_template = """SELECT COUNT(DISTINCT p.person_id) as n_patients FROM {@cdm_database_schema}.person p;"""
    params = {"@cdm_database_schema": db_cfg["@cdm_database_schema"]}
    rows = executeQuerynfetchall(engine, count_template.format(**params))
    if rows:
        return rows[0][0]
    print("person table is not found in the database.")
    return {}

def get_result_of_num_patients(engine, drug_name, psm=False):
    """
        Count distinct patients of one drug's person table, split by whether
        n_diff is NULL.

        Args:
            engine: SQLAlchemy engine handed to executeQuerynfetchall.
            drug_name: Name used verbatim in the table name person_<drug_name>.
            psm: When true, read person_<drug_name>_psm instead.

        Returns:
            A dict mapping the is_abnormal label ('False' when n_diff is NULL,
            otherwise 'True') to the patient count, or an empty dict when the
            query produced no rows or failed.
    """
    count_template = """SELECT p.is_abnormal, COUNT(DISTINCT p.person_id) as n_patients
    FROM (SELECT *, CASE WHEN n_diff IS NULL THEN 'False' ELSE 'True' END as is_abnormal FROM {@person_database_schema}.{@target_person_table}) p
    GROUP BY p.is_abnormal;"""
    if psm:
        target_table = f"person_{drug_name}_psm"
    else:
        target_table = f"person_{drug_name}"
    params = {
        "@person_database_schema": db_cfg["@person_database_schema"],
        "@target_person_table": target_table,
    }
    rows = executeQuerynfetchall(engine, count_template.format(**params))
    if not rows:
        print(f"{drug_name} is not found in the database.")
        return {}
    return {row[0]: row[1] for row in rows}

In [None]:
# Patient counts from the tables without the "_psm" suffix go to
# result/number_of_patients.json.
result_dir = current_dir / "result"
result_dir.mkdir(parents=True, exist_ok=True)

# Total CDM population first, then one entry per lower-cased drug name.
number_of_patients_dict = {
    "n_total_patients": get_result_of_cdm_num_of_patients(engine)
}
for drug_name in tqdm(drug_order):
    drug_name = drug_name.lower()
    number_of_patients_dict[drug_name] = get_result_of_num_patients(
        engine, drug_name, psm=False
    )

output_path = result_dir / "number_of_patients.json"
with output_path.open("w") as file:
    json.dump(number_of_patients_dict, file, indent=4)

In [None]:
""" person for after propensity score matching """
# Patient counts from the "_psm" tables go to
# result/psm/number_of_patients.json.
result_dir = current_dir / "result" / "psm"
result_dir.mkdir(parents=True, exist_ok=True)

# Total CDM population first, then one entry per lower-cased drug name.
number_of_patients_dict = {
    "n_total_patients": get_result_of_cdm_num_of_patients(engine)
}
for drug_name in tqdm(drug_order):
    drug_name = drug_name.lower()
    number_of_patients_dict[drug_name] = get_result_of_num_patients(
        engine, drug_name, psm=True
    )

output_path = result_dir / "number_of_patients.json"
with output_path.open("w") as file:
    json.dump(number_of_patients_dict, file, indent=4)