In [284]:
from pprint import pprint as pp
from tqdm import tqdm as tq
import psycopg2 as pg
import psycopg2.extras
import pandas as pd
import numpy as np
import json

# Directory that holds the required CSV files.
data_path = 'H:/linewalks/'

# Read every CSV export (UTF-8) into its own DataFrame; the file names below
# are listed in the same order as the target variables.
(
    df_concept,
    df_condition_occurrence,
    df_death,
    df_drug_exposure,
    df_person,
    df_visit_occurrence,
) = (
    pd.read_csv(data_path + csv_name, encoding='utf-8')
    for csv_name in (
        'concept.csv',
        'condition_occurrence.csv',
        'death.csv',
        'drug_exposure.csv',
        'person.csv',
        'visit_occurrence.csv',
    )
)



  interactivity=interactivity, compiler=compiler, result=result)


In [222]:
# Connect to the local PostgreSQL server and open a cursor on that connection.
conn = pg.connect(
    host='localhost',
    port='5432',
    dbname='(DBName)',
    user='(Username)',
    password='(password)',
)
cur = conn.cursor()

In [275]:
# Create one table per data set, with column constraints matching the data
# that will be inserted. Every statement first drops any existing table of
# the same name, so this cell can be re-run from scratch.

# person: referenced by the visit, death, condition and drug tables below.
createPersonQ = """
    Drop Table If Exists person cascade;
    Create Table person (
        person_id integer Primary Key,
        gender_concept_id integer Not Null,
        year_of_birth integer Not Null,
        month_of_birth integer Not Null,
        day_of_birth integer Not Null,
        birth_datetime text,
        race_concept_id integer Not Null,
        ethnicity_concept_id integer Not Null,
        person_source_value text Not Null,
        gender_source_value text Not Null,
        gender_source_concept_id integer Not Null,
        race_source_value text Not Null,
        race_source_concept_id integer Not Null,
        ethnicity_source_value text Not Null,
        ethnicity_source_concept_id integer Not Null
    );
"""

# visit_occurrence: references person; referenced by the condition and drug tables.
createVisitOccurrenceQ = """
    Drop Table If Exists visit_occurrence cascade;
    Create Table visit_occurrence (
        visit_occurrence_id integer Primary Key,
        person_id integer References person,
        visit_concept_id integer Not Null,
        visit_start_date text Not Null,
        visit_start_datetime text Not Null,
        visit_end_date text Not Null,
        visit_end_datetime text Not Null,
        visit_type_concept_id integer Not Null,
        visit_source_value text Not Null,
        visit_source_concept_id integer Not Null,
        admitting_source_concept_id integer Not Null,
        discharge_to_concept_id integer Not Null,
        preceding_visit_occurrence_id integer
    );
"""

# death: keyed by person_id, which also references person.
createDeathQ = """
    Drop Table If Exists death ;
    Create Table death (
        person_id integer References person,
        death_date text Not Null,
        death_type_concept_id integer Not Null,
        cause_concept_id integer Not Null,
        cause_source_value integer Not Null,
        cause_source_concept_id integer Not Null,
        Primary Key(person_id)
    );
"""

# concept: standalone table, no foreign keys.
createConceptQ = """
    Drop Table If Exists concept;
    Create Table concept (
        concept_id integer Not Null,
        concept_name text,
        domain_id text Not Null,
        vocabulary_id text Not Null,
        concept_class_id text Not Null,
        standard_concept text,
        concept_code text,
        valid_start_date text Not Null,
        valid_end_date text Not Null,
        invalid_reason text,
        Constraint star_pk Primary Key(concept_id)
    );
"""

# condition_occurrence: references person and visit_occurrence.
createConditionOccurrenceQ = """
    Drop Table If Exists condition_occurrence;
    Create Table condition_occurrence (
        condition_occurrence_id integer Primary Key,
        person_id integer References person,
        condition_concept_id integer Not Null,
        condition_start_date text Not Null,
        condition_start_datetime text Not Null,
        condition_end_date text,
        condition_end_datetime text,
        condition_type_concept_id integer Not Null,
        visit_occurrence_id integer References visit_occurrence,
        visit_detail_id integer Not Null,
        condition_source_value bigint Not Null,
        condition_source_concept_id integer Not Null,
        condition_status_concept_id integer Not Null
        );
"""

# drug_exposure: references person and visit_occurrence.
createDrugExposureQ = """
    Drop Table If Exists drug_exposure;
    Create Table drug_exposure (
        drug_exposure_id integer Primary Key,
        person_id integer References person,
        drug_concept_id integer Not Null,
        drug_exposure_start_date text Not Null,
        drug_exposure_start_datetime text Not Null,
        drug_exposure_end_date text Not Null,
        drug_exposure_end_datetime text Not Null,
        verbatim_end_date text,
        drug_type_concept_id integer Not Null,
        refills integer Not Null,
        quantity integer Not Null,
        days_supply integer Not Null,
        route_concept_id integer Not Null,
        lot_number integer Not Null,
        provider_id integer Not Null,
        visit_occurrence_id integer References visit_occurrence,
        visit_detail_id integer Not Null,
        drug_source_value integer Not Null,
        drug_source_concept_id integer Not Null
        );
"""

# Execute the statements with referenced tables first (person, then
# visit_occurrence) so the foreign keys of the later tables can resolve.
for ddl in (
    createPersonQ,
    createVisitOccurrenceQ,
    createDeathQ,
    createConceptQ,
    createConditionOccurrenceQ,
    createDrugExposureQ,
):
    cur.execute(ddl)

# Persist all six tables in a single commit.
conn.commit()

In [100]:
# Check that the tables were created: list every table owned by 'dongkey'.
pd.read_sql(
    "select * from pg_tables where tableowner = 'dongkey';",
    con=conn,
)

Unnamed: 0,schemaname,tablename,tableowner,tablespace,hasindexes,hasrules,hastriggers,rowsecurity
0,public,death,dongkey,,True,False,True,False
1,public,concept,dongkey,,True,False,False,False
2,public,person,dongkey,,True,False,True,False
3,public,visit_occurrence,dongkey,,True,False,True,False
4,public,condition_occurrence,dongkey,,True,False,True,False
5,public,drug_exposure,dongkey,,True,False,True,False


In [260]:
# Names of the frequently used DataFrames and of their target tables, kept in
# two parallel lists: entry i of df_list is loaded into entry i of table_list.
df_list = [
    'df_person',
    'df_visit_occurrence',
    'df_concept',
    'df_condition_occurrence',
    'df_death',
    'df_drug_exposure',
]
table_list = [
    'person',
    'visit_occurrence',
    'concept',
    'condition_occurrence',
    'death',
    'drug_exposure',
]

In [212]:
# Clean every DataFrame named in df_list, in place at module level:
#   1. drop the columns that are NaN in every row (unused columns);
#   2. replace the remaining missing values with None.
for df_name in df_list:
    # Look the DataFrame up once by its variable name.
    frame = globals()[df_name]

    # Remove columns that contain no data at all.
    frame = frame.dropna(axis=1, how='all')

    # Cast to object before masking: on a float column pandas (1.3 and later)
    # coerces the None replacement back to NaN, so the missing values would
    # otherwise survive this step unchanged.
    globals()[df_name] = frame.astype(object).where(frame.notna(), None)

In [276]:
# Insert the rows of every module-level DataFrame (loaded from CSV) into its
# table. df_list and table_list are parallel lists, so they are walked as pairs.
# All tables are committed together; if any insert fails the transaction is
# rolled back so the connection is not left in an aborted state, and the
# original error is re-raised.
try:
    for df_name, table in zip(df_list, table_list):
        frame = globals()[df_name]
        print('Preparing data : ', df_name)
        print(frame.shape)

        # One Python list per row, in DataFrame column order; the INSERT has no
        # column list, so values are matched to table columns by position.
        values = frame.values.tolist()

        print('total data : ', len(values))
        print('executing inserting values\n')
        # The table name comes from the fixed table_list above, not from user input.
        psycopg2.extras.execute_values(cur, f"""
        Insert Into {table} Values %s;
    """, values)
except Exception:
    conn.rollback()
    raise
else:
    conn.commit()

Inserting data :  df_person
(1000, 15)
total data :  1000
executing inserting values
Inserting data :  df_visit_occurrence
(41810, 13)
total data :  41810
executing inserting values
Inserting data :  df_concept
(7403692, 10)
total data :  7403692
executing inserting values
Inserting data :  df_condition_occurrence
(12167, 13)
total data :  12167
executing inserting values
Inserting data :  df_death
(152, 6)
total data :  152
executing inserting values
Inserting data :  df_drug_exposure
(46579, 19)
total data :  46579
executing inserting values


In [277]:
# Check that no rows went missing: list the live-row estimate of every user
# table, largest first.
pd.read_sql(
    """
    SELECT schemaname,relname,n_live_tup 
        FROM pg_stat_user_tables 
        ORDER BY n_live_tup DESC;
""",
    con=conn,
)

Unnamed: 0,schemaname,relname,n_live_tup
0,public,concept,7403692
1,public,drug_exposure,46579
2,public,visit_occurrence,41810
3,public,condition_occurrence,12167
4,public,person,1000
5,public,death,152


In [278]:
# (rows, columns) of the concept DataFrame; the row count is compared with the rows inserted into the table.
df_concept.shape

(7403692, 10)

In [279]:
# (rows, columns) of the drug_exposure DataFrame; the row count is compared with the rows inserted into the table.
df_drug_exposure.shape

(46579, 19)

In [280]:
# (rows, columns) of the visit_occurrence DataFrame; the row count is compared with the rows inserted into the table.
df_visit_occurrence.shape

(41810, 13)

In [281]:
# (rows, columns) of the condition_occurrence DataFrame; the row count is compared with the rows inserted into the table.
df_condition_occurrence.shape

(12167, 13)

In [282]:
# (rows, columns) of the person DataFrame; the row count is compared with the rows inserted into the table.
df_person.shape

(1000, 15)

In [283]:
# (rows, columns) of the death DataFrame; the row count is compared with the rows inserted into the table.
df_death.shape

(152, 6)

삽입한 데이터 개수와 각 csv파일의 데이터 개수 (row) 비교