# Export COVID-19 cohort from OMOP
This notebook exports COVID-19 patient data from an OMOP database as per-patient medical code sequences (one line per patient, with dated lists of concept IDs).

### Notes
1. Update `sql_config` with the connection settings and the desired database
2. Update `dir_data` to the desired output data directory
3. Create the COVID cohorts with SQL queries
   1. COVID_inpatient_cohort.sql
   2. Update the COVID cohort table name in queries, e.g., `covid19_table_name`  
4. Run

In [None]:
import pyodbc
import getpass
import pandas as pd
from os import path
from matplotlib import pyplot as plt
import time
import concurrent.futures

In [None]:
# Output directory; every exported file is written underneath it.
dir_data = '/path/to/data/dir'
file_concepts, file_persons, file_events = (
    path.join(dir_data, basename)
    for basename in ('concepts.csv', 'persons_covid19.csv', 'events_covid19.csv')
)

# SQL Server connection settings (the password is collected separately and
# passed alongside these when connecting).
sql_config = dict(
    driver='ODBC Driver 17 for SQL Server',
    server='sql.server.host',
    database='database_name',
    uid='user_name',
)

In [None]:
# Prompt for the database password interactively so it is never stored in the
# notebook; it is passed as the `pwd` keyword whenever a connection is opened.
pwd=getpass.getpass()

In [None]:
# Shared connection for the cohort-level queries issued via pd.read_sql.
# process_patient() does not reuse it; it opens its own connection per call.
conn = pyodbc.connect(
    **sql_config,
    pwd=pwd,
)
# NOTE(review): this cursor is not referenced by the code visible here;
# kept in case other cells rely on it.
cursor = conn.cursor()

# Get patient data

### Patient data extraction without data cleaning (default)

In [None]:
# Demographics for every distinct person in the COVID-19 cohort table.
sql = """SELECT p.person_id, gender_concept_id, CONVERT(DATE, birth_datetime) AS birth_date, race_concept_id, ethnicity_concept_id 
    FROM 
        (SELECT DISTINCT person_id
        FROM covid19_table_name) r
    JOIN person p ON r.person_id = p.person_id"""
df_persons = pd.read_sql(sql, conn)

# Tab-separated export (despite the .csv name); missing values become 'NULL'.
df_persons.to_csv(
    file_persons,
    sep='\t',
    na_rep='NULL',
    header=True,
    index=False,
)

# Row count, used later as the denominator for progress reporting.
n_persons = df_persons.shape[0]

## Get sequence for each patient

In [None]:
def process_patient(row):
    """Build the dated medical-code sequence line for one patient.

    Runs a single query that unions the patient's conditions, drug exposures,
    procedures and measurements (each restricted to non-zero concept IDs in
    the matching OMOP domain, dated on or after the birth date, and joined to
    a visit), plus a synthetic "Dead" condition (434489) taken from the death
    table when no such condition is already recorded. Concept IDs are then
    aggregated per date and ordered by date.

    Date windows relative to the linked visit:
      * conditions / measurements: from 7 days before the visit start to
        7 days after the visit end (open-ended when the visit has no end date)
      * drugs: no earlier than 7 days before the visit start, no upper bound
      * procedures: within the visit itself

    A dedicated connection is opened for every call, so concurrent calls do
    not share a connection or cursor.

    Args:
        row: Mapping-like object (e.g. a pandas Series) with 'person_id' and
            'birth_date' entries.

    Returns:
        A string of the form
        '<person_id>\\t<YYYY-MM-DD>:<id,id,...>\\t...\\n', or None when the
        query returns no rows for the patient.

    Raises:
        Whatever pyodbc raises on connection or query failure; the cursor and
        connection are closed before the exception propagates.
    """
    sql = """SELECT x.start_date, STRING_AGG(x.concept_id, ',') AS concept_ids
    FROM 
        (
        (SELECT co.condition_concept_id AS concept_id, co.condition_start_date AS start_date
        FROM dbo.condition_occurrence co
        JOIN concept c ON co.condition_concept_id = c.concept_id
        JOIN visit_occurrence v ON co.visit_occurrence_id = v.visit_occurrence_id
        WHERE co.person_id = ? AND co.condition_concept_id != 0 
            AND co.condition_start_date >= ? AND c.domain_id = 'Condition' 
            AND (DATEDIFF(DAY, co.condition_start_date, v.visit_start_date) <= 7
                AND (v.visit_end_date IS NULL OR 
                    DATEDIFF(DAY, co.condition_start_date, v.visit_end_date) >= -7))
        GROUP BY co.condition_concept_id, co.condition_start_date)
        UNION ALL
        (SELECT do.drug_concept_id AS concept_id, do.drug_exposure_start_date AS start_date
        FROM dbo.drug_exposure do
        JOIN concept c ON do.drug_concept_id = c.concept_id
        JOIN visit_occurrence v ON do.visit_occurrence_id = v.visit_occurrence_id
        WHERE do.person_id = ? AND do.drug_concept_id != 0
            AND do.drug_exposure_start_date >= ? AND c.domain_id = 'Drug' 
            AND (DATEDIFF(DAY, do.drug_exposure_start_date, v.visit_start_date) <= 7
                -- no upper bound on drug exposure start date
                )
        GROUP BY do.drug_concept_id, do.drug_exposure_start_date)
        UNION ALL
        (SELECT po.procedure_concept_id AS concept_id, po.procedure_date AS start_date
        FROM dbo.procedure_occurrence po
        JOIN concept c ON po.procedure_concept_id = c.concept_id
        JOIN visit_occurrence v ON po.visit_occurrence_id = v.visit_occurrence_id
        WHERE po.person_id = ? AND po.procedure_concept_id != 0
            AND po.procedure_date >= ? AND c.domain_id = 'Procedure' 
            AND (DATEDIFF(DAY, po.procedure_date, v.visit_start_date) <= 0
                AND (v.visit_end_date IS NULL OR 
                    DATEDIFF(DAY, po.procedure_date, v.visit_end_date) >= 0))
        GROUP BY po.procedure_concept_id, po.procedure_date)
        UNION ALL
        (SELECT m.measurement_concept_id AS concept_id, m.measurement_date AS start_date
        FROM dbo.measurement m
        JOIN concept c ON m.measurement_concept_id = c.concept_id
        JOIN visit_occurrence v ON m.visit_occurrence_id = v.visit_occurrence_id
        WHERE m.person_id = ? AND m.measurement_concept_id != 0
            AND m.measurement_date >= ? AND c.domain_id = 'Measurement' 
            AND (DATEDIFF(DAY, m.measurement_date, v.visit_start_date) <= 7
                AND (v.visit_end_date IS NULL OR 
                    DATEDIFF(DAY, m.measurement_date, v.visit_end_date) >= -7))
        GROUP BY m.measurement_concept_id, m.measurement_date)
        UNION ALL
        -- Get death and encode it a "Dead" condition (434489)
        (SELECT 434489 AS concept_id, MAX(death_date) AS start_date
        FROM dbo.death d   
        -- Don't add a "Dead" concept if it's already in the condition_occurrence table
        LEFT JOIN condition_occurrence co ON d.person_id = co.person_id AND co.condition_concept_id = 434489 
        WHERE d.person_id = ? AND death_date >= ? AND co.person_id IS NULL
        GROUP BY d.person_id) -- some people have multiple death records
        ) x
    GROUP BY x.start_date
    ORDER BY x.start_date;
    """

    pid = row['person_id']
    # NOTE(review): if birth_date is NULL/None, every `>= ?` comparison in the
    # query presumably evaluates to UNKNOWN and the patient yields no rows --
    # confirm that silently dropping such patients is intended.
    dob = row['birth_date']

    # One connection per call. Always release the cursor and the connection,
    # even when the query fails; otherwise every failing patient would leak a
    # server connection.
    conn = pyodbc.connect(**sql_config, pwd=pwd)
    try:
        cursor = conn.cursor()
        try:
            # The (pid, dob) pair is bound once per UNION branch: 5 branches.
            cursor.execute(sql, pid, dob, pid, dob, pid, dob, pid, dob, pid, dob)
            res = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()

    # No events at all: signal the caller to skip this patient.
    if not res:
        return None

    # One "<date>:<comma-separated concept IDs>" cell per distinct date.
    sequence_str = '\t'.join([x[0].strftime('%Y-%m-%d') + ':' + str(x[1]) for x in res])
    return str(pid) + '\t' + sequence_str + '\n'

In [None]:
def write_out_patient_code_sequences(df_persons, n_persons, filename):
    """Write one code-sequence line per patient to a file under dir_data.

    Each row of df_persons is handed to process_patient() on a pool of 4
    worker threads. Lines are written in completion order, which is not
    necessarily the row order of df_persons. Patients for which
    process_patient() returns None are skipped. The first line of the file is
    a free-text header describing the format.

    Progress (percentage of n_persons and elapsed minutes) is printed after
    every 1000 completed patients, and the total elapsed minutes at the end.

    Args:
        df_persons: DataFrame whose rows are passed to process_patient().
        n_persons: Total number of patients; used only as the denominator of
            the progress percentage.
        filename: Name of the output file, created inside dir_data.

    Raises:
        Any exception raised by process_patient() for a patient. Patients
        still waiting in the queue are cancelled so the failure surfaces
        promptly instead of after the whole cohort has been processed.
    """
    t1 = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor, \
            open(path.join(dir_data, filename), 'w') as fh:
        print('patient sequence progress: ')

        # Write out a header line with the format
        fh.write('Tab seperated cells. First cell is person_id. All following cells are YYYY-MM-DD:<concept IDs seperated by commas>\n')

        futures = [executor.submit(process_patient, row) for _, row in df_persons.iterrows()]
        try:
            for count, future in enumerate(concurrent.futures.as_completed(futures), start=1):
                sequence_str = future.result()
                if sequence_str is not None:
                    fh.write(sequence_str)

                if count % 1000 == 0:
                    percent = count / n_persons * 100
                    elapsed_time = (time.time() - t1) / 60
                    print(f'\t{percent:.02f}% - {elapsed_time:.01f} min')
        finally:
            # On failure (or interrupt), drop the work that has not started
            # yet; without this the executor's exit would still run every
            # queued patient before the error propagates. cancel() is a no-op
            # for futures that are running or already finished, so this is
            # harmless on the normal path.
            for pending in futures:
                pending.cancel()

    elapsed_time = (time.time() - t1) / 60
    print(elapsed_time)

In [None]:
# Export the code sequences for the whole COVID-19 cohort loaded above.
write_out_patient_code_sequences(
    df_persons=df_persons,
    n_persons=n_persons,
    filename='patient_code_sequences_COVID19.txt',
)

# Get the first event information for each patient

In [None]:
# For each person, keep only the cohort row with the smallest event_id.
sql = """
WITH first_case AS 
    (SELECT person_id, MIN(event_id) AS event_id
    FROM covid19_table_name
    GROUP BY person_id)

SELECT e.*
FROM covid19_table_name e
JOIN first_case f ON e.person_id = f.person_id AND e.event_id = f.event_id
"""
df_events = pd.read_sql(sql, conn)

# Tab-separated export (despite the .csv name); missing values become 'NULL'.
df_events.to_csv(
    file_events,
    sep='\t',
    na_rep='NULL',
    header=True,
    index=False,
)