## Use psycopg2 to extract chart events for patient data

This script connects to the patient database 'extumate' and extracts chart events for the labelled patients, identified by the `hadm_id` field in the 'sample_vents' table.

The script uses the pandas `chunksize` argument so that the query result is read in pieces rather than all at once, which helps avoid memory issues.

Finally, the data is written with `pd.DataFrame.to_feather` so it can be reloaded for future processing.

In [1]:
import sys
sys.path.append("../extumate")

from sqlalchemy import create_engine
from sqlalchemy_utils import database_exists, create_database
import psycopg2
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import psutil
import os

from extumate.config import data_dir, extumate_engine_url

#### Set user-defined variables

In [2]:
data_dir

'../data/feathered/'

In [3]:
export_name = "sputem"
#feather_folder = "../data/feathered/"
export_path = data_dir+export_name
export_path


'../data/feathered/sodium'

#### Write SQL query

In [4]:
sql_query = """
SELECT
  chartevents.*,
  sample_vents.endtime,
  sample_vents.re_intub_class,
  sample_vents.time_on_vent
FROM
  chartevents
  INNER JOIN sample_vents ON chartevents.hadm_id = sample_vents.hadm_id
WHERE
  chartevents.itemid IN (224369, 224370, 224372, 224373);
"""

#### Print available virtual memory

In [5]:
svmem = psutil.virtual_memory()
print(svmem.available)  # in bytes

5686128640


#### Print size of the raw chartevents CSV (in bytes)

In [6]:
os.path.getsize('../data/raw/chartevents.csv') 

29184776616

#### Figure out chunk size for pandas DataFrame reading

In [7]:
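# Estimate memory use from a 10-row sample of the raw file
df_sample = pd.read_csv('../data/raw/chartevents.csv', nrows=10)
df_sample_size = df_sample.memory_usage(index=True).sum()  # bytes used by the 10 sample rows
# Heuristic: a 2 GB budget divided by the sample size, then by a further factor of 10
my_chunk = int((2000000000 / df_sample_size) / 10)
print(my_chunk)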

215517
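
The chunk-size heuristic above can also be written as a small helper. This is only a sketch under stated assumptions: the function name `rows_per_chunk`, its `safety_factor` parameter, and the toy sample frame are hypothetical and not part of the original notebook. Unlike the cell above, it divides by the per-row size and uses `deep=True` so that string columns are counted at their actual size.

```python
import pandas as pd

def rows_per_chunk(sample: pd.DataFrame, budget_bytes: int, safety_factor: int = 10) -> int:
    """Estimate how many rows fit in budget_bytes, shrunk by safety_factor.

    Hypothetical helper: the name and parameters are illustrative only.
    """
    # Average in-memory size of one row, including the contents of object columns
    bytes_per_row = sample.memory_usage(index=True, deep=True).sum() / len(sample)
    # Never return fewer than one row per chunk
    return max(1, int(budget_bytes / bytes_per_row / safety_factor))

# Toy stand-in for the 10-row sample read from the raw CSV
sample = pd.DataFrame({"hadm_id": range(10), "valuenum": [1.5] * 10})
chunk_rows = rows_per_chunk(sample, budget_bytes=2_000_000_000)
```

A larger `safety_factor` gives smaller chunks, which trades some speed for more headroom when other processes are also using memory.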


In [8]:
# Set your postgres username/password and connection specifics
username = 'postgres'
password = 'password'    # change this
host     = 'localhost'
port     = '5432'        # default port that postgres listens on
db_name  = 'extumate'

In [9]:
## 'engine' is a connection to a database
## Here, we're using postgres, but sqlalchemy can connect to other things too.
engine = create_engine( 'postgresql://{}:{}@{}:{}/{}'.format(username, password, host, port, db_name) )
print(engine.url)

postgresql://postgres:password@localhost:5432/extumate
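
Formatting the URL by hand works for simple credentials, but a password containing characters such as `@` or `/` would break the URL unless it is percent-encoded. The sketch below shows one way to do that with the standard library. The helper name `build_postgres_url` and the placeholder password are assumptions made for illustration; reading the password from the `PGPASSWORD` environment variable is likewise just one possible choice.

```python
import os
from urllib.parse import quote_plus

def build_postgres_url(username, password, host, port, db_name):
    """Return a postgresql:// URL with the credentials percent-encoded.

    Hypothetical helper, not part of the original notebook.
    """
    return "postgresql://{}:{}@{}:{}/{}".format(
        quote_plus(username), quote_plus(password), host, port, db_name
    )

# Fall back to a placeholder password when the environment variable is unset
password = os.environ.get("PGPASSWORD", "p@ss/word")
url = build_postgres_url("postgres", password, "localhost", "5432", "extumate")
```

The resulting string can be passed to `create_engine` in place of the hand-formatted one, and keeping the password out of the notebook also keeps it out of saved cell outputs.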


#### Check engine is working by checking for 'sample_vents' table

In [10]:
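from sqlalchemy import inspect

# Engine.has_table() was removed in SQLAlchemy 2.0; the inspector works on 1.4 and later
inspect(engine).has_table('sample_vents')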

True

#### Connect using psycopg2 and query the database

Joining 'chartevents' with the 'sample_vents' table on `hadm_id` restricts the pull to patients who were ventilated. Doing this before selecting the type of event with `chartevents.itemid` speeds up extraction of the data.
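
The join-then-filter pattern can be tried on a toy database before running it against the full tables. The sketch below uses an in-memory SQLite database as a stand-in for Postgres; the miniature tables and their rows are made up for illustration. It also passes the item IDs as query parameters instead of hard-coding them, which is an optional variation rather than what the notebook does.

```python
import sqlite3
import pandas as pd

# Hypothetical miniature versions of the two tables, for illustration only
con = sqlite3.connect(":memory:")
con.executescript("""
CREATE TABLE chartevents (hadm_id INTEGER, itemid INTEGER, value TEXT);
CREATE TABLE sample_vents (hadm_id INTEGER, re_intub_class INTEGER);
INSERT INTO chartevents VALUES
  (1, 224369, 'a'), (1, 220645, 'b'), (2, 224370, 'c'), (3, 224372, 'd');
INSERT INTO sample_vents VALUES (1, 0), (3, 1);
""")

item_ids = [224369, 224370, 224372, 224373]
# sqlite3 uses "?" placeholders; psycopg2 uses "%s" instead
placeholders = ", ".join("?" for _ in item_ids)
query = f"""
SELECT chartevents.*, sample_vents.re_intub_class
FROM chartevents
INNER JOIN sample_vents ON chartevents.hadm_id = sample_vents.hadm_id
WHERE chartevents.itemid IN ({placeholders})
ORDER BY chartevents.hadm_id
"""
demo_df = pd.read_sql_query(query, con, params=item_ids)
con.close()
```

Admission 2 has a matching `itemid` but no row in `sample_vents`, so the inner join drops it; admission 1 keeps only the event whose `itemid` is in the list.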

In [11]:
# Connect to make queries using psycopg2
con = psycopg2.connect(database=db_name, user=username, host=host, port=port, password=password)

# With chunksize set, read_sql_query returns an iterator of DataFrames rather than a single DataFrame
df_result = pd.read_sql_query(sql_query, con, chunksize=my_chunk)
df_result

<generator object SQLiteDatabase._query_iterator at 0x7f476a971c10>

In [12]:
concat_df = pd.concat(
    [chunk
    for chunk in df_result])
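
Concatenating every chunk still ends with the full result in memory. When only a summary is needed, each chunk can be reduced before it is kept. The sketch below illustrates the idea on an in-memory SQLite table that stands in for the Postgres database; the table contents and the per-admission count are assumptions chosen for illustration, not the notebook's actual processing.

```python
import sqlite3
import pandas as pd

# Toy stand-in for the real database
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE chartevents (hadm_id INTEGER, valuenum REAL)")
con.executemany(
    "INSERT INTO chartevents VALUES (?, ?)",
    [(i % 3, float(i)) for i in range(10)],
)

# chunksize=4 yields the 10 rows as chunks of 4, 4 and 2 rows
chunks = pd.read_sql_query("SELECT * FROM chartevents", con, chunksize=4)

partial_counts = []
for chunk in chunks:
    # Reduce each chunk right away so only small summaries accumulate
    partial_counts.append(chunk.groupby("hadm_id")["valuenum"].count())
con.close()

# Combine the per-chunk summaries into one count per admission
counts_per_admission = pd.concat(partial_counts).groupby(level=0).sum()
```

The same loop structure works for filtering or for writing each chunk to disk, as long as the per-chunk step does not need rows from other chunks.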

In [13]:
concat_df

| | subject_id | hadm_id | stay_id | charttime | storetime | itemid | value | valuenum | valueuom | warning | endtime | re_intub_class | time_on_vent |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 10004235 | 24181354 | 30276431 | 2196-02-25 04:44:00 | 2196-02-25 05:55:00 | 220645 | 133 | 133 | mEq/L | 0 | 2196-02-27 16:28:00 | 0 | 71.60000 |
| 1 | 10004235 | 24181354 | 30276431 | 2196-02-24 23:34:00 | 2196-02-25 00:59:00 | 220645 | 131 | 131 | mEq/L | 1 | 2196-02-27 16:28:00 | 0 | 71.60000 |
| 2 | 10004235 | 24181354 | 30276431 | 2196-02-25 14:54:00 | 2196-02-25 16:11:00 | 220645 | 131 | 131 | mEq/L | 1 | 2196-02-27 16:28:00 | 0 | 71.60000 |
| 3 | 10004235 | 24181354 | 30276431 | 2196-02-25 21:45:00 | 2196-02-25 22:27:00 | 220645 | 129 | 129 | mEq/L | 1 | 2196-02-27 16:28:00 | 0 | 71.60000 |
| 4 | 10004235 | 24181354 | 30276431 | 2196-02-26 00:28:00 | 2196-02-26 01:33:00 | 220645 | 130 | 130 | mEq/L | 1 | 2196-02-27 16:28:00 | 0 | 71.60000 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 205634 | 19999068 | 21606769 | 31096823 | 2161-08-28 04:00:00 | 2161-08-28 04:53:00 | 220645 | 145 | 145 | mEq/L | 0 | 2161-08-28 13:36:00 | 0 | 70.01667 |
| 205635 | 19999068 | 21606769 | 31096823 | 2161-08-26 16:20:00 | 2161-08-26 17:17:00 | 220645 | 139 | 139 | mEq/L | 0 | 2161-08-28 13:36:00 | 0 | 70.01667 |
| 205636 | 19999068 | 21606769 | 31096823 | 2161-08-27 01:57:00 | 2161-08-27 02:54:00 | 220645 | 142 | 142 | mEq/L | 0 | 2161-08-28 13:36:00 | 0 | 70.01667 |
| 205637 | 19999068 | 21606769 | 31096823 | 2161-08-29 01:05:00 | 2161-08-29 02:01:00 | 220645 | 141 | 141 | mEq/L | 0 | 2161-08-28 13:36:00 | 0 | 70.01667 |


In [14]:
# concat_df.to_sql('pulseox', engine, if_exists='replace', chunksize=my_chunk)  # very, very slow!

#### Write DataFrame to Feather for future processing

In [15]:
concat_df.reset_index(inplace=True)

In [16]:
concat_df.to_feather(export_path)
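
The index is reset before writing because at least some pandas versions refuse to write a non-default index to Feather. The sketch below shows what the reset does when chunks are stacked. It assumes each chunk arrives with its own 0-based index, and the two toy frames are made up for illustration.

```python
import pandas as pd

# Two toy chunks, each with its own 0-based index
chunk_a = pd.DataFrame({"itemid": [224369, 224370]})
chunk_b = pd.DataFrame({"itemid": [224372]})

# Plain concatenation keeps each chunk's labels, so the index repeats
stacked = pd.concat([chunk_a, chunk_b])

# reset_index() keeps the old labels as a new "index" column
kept = stacked.reset_index()

# drop=True discards the old labels instead of storing them
dropped = stacked.reset_index(drop=True)

# ignore_index=True renumbers the rows during concatenation
direct = pd.concat([chunk_a, chunk_b], ignore_index=True)
```

With `reset_index(inplace=True)` as used above, the old labels are written to the Feather file as an extra `index` column; `drop=True` or `ignore_index=True` avoids that column if it is not wanted.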