In [8]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text, Table, Column, Integer, String, Date, DateTime, Float, MetaData, Text, Boolean, Numeric
from datetime import datetime, timedelta
import warnings
import logging

warnings.filterwarnings('ignore')

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Database connection
engine = create_engine("postgresql://mahithareddy:Mahi1299@localhost:5433/omop")

# Load source data
base_path = '/Users/mahithareddy/Desktop/synthea/output/csv/'
try:
    # Explicitly specify dtype for 'CODE' columns to prevent type inference issues
    patients = pd.read_csv(base_path + 'patients.csv')
    conditions = pd.read_csv(base_path + 'conditions.csv', dtype={'CODE': str}) 
    medications = pd.read_csv(base_path + 'medications.csv', dtype={'CODE': str}) 
    encounters = pd.read_csv(base_path + 'encounters.csv')
    logging.info("Source data loaded successfully.")
except FileNotFoundError as e:
    logging.error(f"Error loading source data: {e}. Please ensure base_path is correct and files exist.")
    exit()

# Load OMOP vocabulary tables
try:
    concept_df = pd.read_csv('CONCEPT.csv', sep='\t', dtype=str, low_memory=False)
    concept_relationship_df = pd.read_csv('CONCEPT_RELATIONSHIP.csv', sep='\t', dtype=str, low_memory=False)
    logging.info("OMOP vocabulary tables loaded successfully.")
except FileNotFoundError as e:
    logging.error(f"Error loading OMOP vocabulary files: {e}. Please ensure CONCEPT.csv and CONCEPT_RELATIONSHIP.csv are in the same directory as the script.")
    exit()

concept_df.columns = concept_df.columns.str.strip().str.lower()
concept_relationship_df.columns = concept_relationship_df.columns.str.strip().str.lower()

patients['person_id'] = range(1, len(patients) + 1)

logging.info("Initial data preparation complete.")

def get_concept_mapping(source_vocab, target_vocab='SNOMED'):
    """
    Use CONCEPT_RELATIONSHIP for proper OMOP mapping.
    Handles missing mappings by returning them with target_concept_id as 0.
    """
    try:
        # Filter and clean source concepts
        source_concepts = concept_df[
            (concept_df['vocabulary_id'] == source_vocab) &
            (concept_df['invalid_reason'].isna())
        ][['concept_id', 'concept_code', 'concept_name']].copy()

        # Filter and clean maps_to relationships
        maps_to = concept_relationship_df[
            (concept_relationship_df['relationship_id'] == 'Maps to') &
            (concept_relationship_df['invalid_reason'].isna())
        ][['concept_id_1', 'concept_id_2']].copy()

        # Filter and clean standard target concepts
        target_concepts = concept_df[
            (concept_df['vocabulary_id'] == target_vocab) &
            (concept_df['standard_concept'] == 'S') &
            (concept_df['invalid_reason'].isna())
        ][['concept_id', 'concept_name']].copy()

        # Convert concept_id columns to numeric
        source_concepts['concept_id'] = pd.to_numeric(source_concepts['concept_id'], errors='coerce')
        maps_to['concept_id_1'] = pd.to_numeric(maps_to['concept_id_1'], errors='coerce')
        maps_to['concept_id_2'] = pd.to_numeric(maps_to['concept_id_2'], errors='coerce')
        target_concepts['concept_id'] = pd.to_numeric(target_concepts['concept_id'], errors='coerce')

        # Drop any rows with missing concept IDs that failed conversion
        source_concepts.dropna(subset=['concept_id'], inplace=True)
        maps_to.dropna(subset=['concept_id_1', 'concept_id_2'], inplace=True)
        target_concepts.dropna(subset=['concept_id'], inplace=True)

        # Ensure concept_code is string type for merging
        source_concepts['concept_code'] = source_concepts['concept_code'].astype(str) 

        # Merge source concepts with 'Maps to' relationships (left merge to keep all source codes)
        # Then merge with target concepts (left merge to keep all mapped source concepts)
        mapping = (
            source_concepts
            .merge(maps_to, left_on='concept_id', right_on='concept_id_1', how='left')
            .merge(target_concepts, left_on='concept_id_2', right_on='concept_id', how='left', suffixes=('_src', '_tgt'))
        )

        result = mapping[['concept_code', 'concept_id_2', 'concept_name_tgt']].copy()
        result.columns = ['source_code', 'target_concept_id', 'target_name']

        # Fill NaN target_concept_id with 0 for unmapped concepts.
        # This keeps the original source codes even if no standard mapping exists.
        result['target_concept_id'] = result['target_concept_id'].fillna(0).astype(int)
        
        logging.info(f"Mapped {len(result)} codes from {source_vocab} to {target_vocab}. (Includes unmapped with target_concept_id=0)")
        return result
    except Exception as e:
        logging.error(f"Concept mapping failed for {source_vocab} to {target_vocab}: {e}")
        return pd.DataFrame(columns=['source_code', 'target_concept_id', 'target_name'])


def create_complete_omop_tables():
    """Drop and recreate OMOP tables with ALL required and optional columns, adjusted for Synthea data."""

    # Drop existing tables first to ensure a clean slate for each run
    with engine.connect() as conn:
        logging.info("Dropping existing OMOP tables...")
        conn.execute(text("DROP TABLE IF EXISTS drug_exposure CASCADE"))
        conn.execute(text("DROP TABLE IF EXISTS condition_occurrence CASCADE"))
        conn.execute(text("DROP TABLE IF EXISTS visit_occurrence CASCADE"))
        conn.execute(text("DROP TABLE IF EXISTS person CASCADE"))
        conn.commit()
        logging.info("Existing OMOP tables dropped.")

    metadata = MetaData()

    # Person table with ALL columns
    person_table = Table("person", metadata,
        Column("person_id", Integer, primary_key=True),
        Column("gender_concept_id", Integer, nullable=False),
        Column("year_of_birth", Integer, nullable=False),
        Column("race_concept_id", Integer, nullable=False),
        Column("ethnicity_concept_id", Integer, nullable=False),
        Column("gender_source_concept_id", Integer),
        Column("race_source_concept_id", Integer),
        Column("ethnicity_source_concept_id", Integer),
        # Optional columns
        Column("month_of_birth", Integer),
        Column("day_of_birth", Integer),
        Column("birth_datetime", DateTime),
        Column("death_datetime", DateTime),
        Column("location_id", Integer),
        Column("provider_id", Integer),
        Column("care_site_id", Integer),
        Column("person_source_value", String(50)),
        Column("gender_source_value", String(50)),
        Column("race_source_value", String(50)),
        Column("ethnicity_source_value", String(50))
    )

    # Visit occurrence with ALL columns
    visit_occurrence_table = Table("visit_occurrence", metadata,
        Column("visit_occurrence_id", Integer, primary_key=True),
        Column("person_id", Integer, nullable=False),
        Column("visit_concept_id", Integer, nullable=False),
        Column("visit_start_date", Date, nullable=False),
        Column("visit_end_date", Date, nullable=False),
        Column("visit_type_concept_id", Integer, nullable=False),
        Column("visit_source_concept_id", Integer),
        # Optional columns
        Column("visit_start_datetime", DateTime),
        Column("visit_end_datetime", DateTime),
        Column("provider_id", Integer),
        Column("care_site_id", Integer),
        Column("visit_source_value", String(50)),
        Column("admitted_from_concept_id", Integer),
        Column("admitted_from_source_value", String(50)),
        Column("discharged_to_concept_id", Integer),
        Column("discharged_to_source_value", String(50)),
        Column("preceding_visit_occurrence_id", Integer)
    )

    # Condition occurrence with ALL columns
    condition_occurrence_table = Table("condition_occurrence", metadata,
        Column("condition_occurrence_id", Integer, primary_key=True),
        Column("person_id", Integer, nullable=False),
        Column("condition_concept_id", Integer, nullable=False),
        Column("condition_start_date", Date, nullable=False),
        Column("condition_type_concept_id", Integer, nullable=False),
        Column("condition_source_concept_id", Integer),
        # Optional columns
        Column("condition_start_datetime", DateTime),
        Column("condition_end_date", Date),
        Column("condition_end_datetime", DateTime),
        Column("condition_status_concept_id", Integer),
        Column("stop_reason", String(255)),
        Column("provider_id", Integer),
        Column("visit_occurrence_id", Integer),
        Column("visit_detail_id", Integer),
        Column("condition_source_value", String(50)),
        Column("condition_status_source_value", String(50))
    )

    # Drug exposure with ALL columns
    drug_exposure_table = Table("drug_exposure", metadata,
        Column("drug_exposure_id", Integer, primary_key=True),
        Column("person_id", Integer, nullable=False),
        Column("drug_concept_id", Integer, nullable=False),
        Column("drug_exposure_start_date", Date, nullable=False),
        Column("drug_type_concept_id", Integer, nullable=False),
        Column("drug_source_concept_id", Integer),
        # Optional columns
        Column("drug_exposure_start_datetime", DateTime),
        Column("drug_exposure_end_date", Date),
        Column("drug_exposure_end_datetime", DateTime),
        Column("verbatim_end_date", Date),
        Column("stop_reason", String(255)),
        Column("refills", Integer),
        Column("quantity", Numeric),
        Column("days_supply", Integer),
        Column("sig", Text),
        Column("route_concept_id", Integer),
        Column("lot_number", String(50)),
        Column("provider_id", Integer),
        Column("visit_occurrence_id", Integer),
        Column("visit_detail_id", Integer),
        Column("drug_source_value", String(50)),
        Column("route_source_value", String(50)),
        Column("dose_unit_source_value", String(50))
    )

    metadata.create_all(engine)
    logging.info("Complete OMOP tables created successfully.")

def deduplicate_medications(meds_df):
    """Advanced deduplication with treatment windows for drug exposures."""
    
    # Add person_id
    meds_with_person = meds_df.merge(patients[['Id', 'person_id']],
                                     left_on='PATIENT', right_on='Id', how='left')
    
    # Drop records where person_id is missing (shouldn't happen if patients loaded correctly)
    meds_with_person.dropna(subset=['person_id'], inplace=True)
    meds_with_person['person_id'] = meds_with_person['person_id'].astype(int)

    # Convert dates and make them timezone-naive if they have timezone info
    meds_with_person['start_date'] = pd.to_datetime(meds_with_person['START'], errors='coerce')
    meds_with_person['stop_date'] = pd.to_datetime(meds_with_person['STOP'], errors='coerce')
    
    # Ensure datetimes are timezone-naive
    if meds_with_person['start_date'].dt.tz is not None:
        meds_with_person['start_date'] = meds_with_person['start_date'].dt.tz_localize(None)
    if meds_with_person['stop_date'].dt.tz is not None:
        meds_with_person['stop_date'] = meds_with_person['stop_date'].dt.tz_localize(None)

    # Drop rows where start_date is invalid
    meds_with_person.dropna(subset=['start_date'], inplace=True)

    # Fill missing stop dates (30-day default if not provided, for calculation of days_supply)
    meds_with_person['stop_date'] = meds_with_person['stop_date'].fillna(
        meds_with_person['start_date'] + pd.Timedelta(days=30)
    )
    
    deduped = []
    
    # Group by person and medication code
    for (person_id, drug_code), group in meds_with_person.groupby(['person_id', 'CODE']):
        group = group.sort_values('start_date')
        
        episodes = []
        
        for _, med in group.iterrows():
            merged = False
            
            # Check if this medication can be merged with existing episodes
            for i, episode in enumerate(episodes):
                # Allow 7-day gap for prescription refills (or within 7 days of start)
                # This logic checks for overlap or close proximity
                if (med['start_date'] <= episode['end_date'] + pd.Timedelta(days=7) and
                    med['stop_date'] >= episode['start_date'] - pd.Timedelta(days=7)):
                        
                    # Merge into existing episode
                    episodes[i] = {
                        'start_date': min(episode['start_date'], med['start_date']),
                        'end_date': max(episode['end_date'], med['stop_date']),
                        'original_records': episode['original_records'] + [med] # Keep all original records
                    }
                    merged = True
                    break
            
            if not merged:
                # Create new episode
                episodes.append({
                    'start_date': med['start_date'],
                    'end_date': med['stop_date'],
                    'original_records': [med] # Start a new list of original records
                })
        
        # For each episode, create a single deduped record.
        # We take the first original record's non-date fields and update dates.
        for episode in episodes:
            if episode['original_records']:
                first_record = episode['original_records'][0].copy() # Take the first record's info
                first_record['START'] = episode['start_date']
                first_record['STOP'] = episode['end_date']
                deduped.append(first_record)
    
    result = pd.DataFrame(deduped)
    logging.info(f"Deduplication for medications: {len(meds_with_person)} raw records -> {len(result)} deduped records.")
    return result

def create_person_table():
    """Create person table with all optional columns populated according to OMOP guidance."""
    person_data = []
    
    # Map Synthea gender, race, ethnicity to OMOP standard concepts.
    gender_map = {'M': 8507, 'F': 8532, 'U': 0} 
    race_map = {
        'white': 8527, 'black': 8516, 'asian': 8515,
        'native': 8657, 'other': 8522, 'hawaiian': 8522 
    }
    ethnicity_map = {'hispanic': 38003563, 'nonhispanic': 38003564}

    for _, patient in patients.iterrows():
        try:
            birth_date = pd.to_datetime(patient['BIRTHDATE'], errors='coerce')
            # Ensure birth_date is timezone-naive
            if pd.isna(birth_date):
                logging.warning(f"Skipping person_id {patient['person_id']} due to invalid BIRTHDATE: {patient['BIRTHDATE']}")
                continue
            
            # Ensure timezone is removed before processing
            if birth_date.tz is not None:
                birth_date = birth_date.tz_localize(None)

            death_date = pd.to_datetime(patient['DEATHDATE'], errors='coerce') if pd.notna(patient.get('DEATHDATE')) else None
            if death_date is not None and death_date.tz is not None:
                death_date = death_date.tz_localize(None)
            
            # Get gender concept ID
            gender_raw = patient.get('GENDER', 'U').upper()
            gender_id = gender_map.get(gender_raw, 0) 
            
            # Get race concept ID (default to 'Other' if race is missing or unmapped)
            race_raw = patient.get('RACE', 'other').lower()
            race_id = race_map.get(race_raw, 8522) 
            
            # Get ethnicity concept ID (default to 'Nonhispanic' if ethnicity is missing or unmapped)
            ethnicity_raw = patient.get('ETHNICITY', 'nonhispanic').lower()
            ethnicity_id = ethnicity_map.get(ethnicity_raw, 38003564) 

            person_data.append({
                'person_id': patient['person_id'],
                'gender_concept_id': gender_id,
                'year_of_birth': birth_date.year,
                'race_concept_id': race_id,
                'ethnicity_concept_id': ethnicity_id,
                'gender_source_concept_id': 0, 
                'race_source_concept_id': 0,
                'ethnicity_source_concept_id': 0,
                # Optional columns
                'month_of_birth': birth_date.month,
                'day_of_birth': birth_date.day,
                'birth_datetime': birth_date,
                'death_datetime': death_date,
                'location_id': None,
                'provider_id': None,
                'care_site_id': None,
                'person_source_value': patient['Id'],
                'gender_source_value': gender_raw,
                'race_source_value': race_raw,
                'ethnicity_source_value': ethnicity_raw
            })
        except Exception as e:
            logging.error(f"Error processing patient {patient.get('Id', 'Unknown')}: {e}")
            continue
            
    person_df = pd.DataFrame(person_data)
    try:
        person_df.to_sql('person', engine, if_exists='append', index=False, method='multi')
        logging.info(f"Person table: {len(person_df)} records inserted.")
    except Exception as e:
        logging.error(f"Error inserting into person table: {e}")
    return person_df

def create_visit_occurrence_table():
    """Create visit occurrence table from Synthea encounters."""
    # Add person_id to encounters
    encounters_with_person = encounters.merge(patients[['Id', 'person_id']],
                                             left_on='PATIENT', right_on='Id', how='left')
    
    # Drop encounters with no linked person_id (should not happen with proper patient loading)
    encounters_with_person.dropna(subset=['person_id'], inplace=True)
    encounters_with_person['person_id'] = encounters_with_person['person_id'].astype(int)

    visit_data = []
    
    # Visit type mapping from Synthea ENCOUNTERCLASS to OMOP visit concepts
    visit_type_map = {
        'ambulatory': 9202,  # Outpatient Visit
        'emergency': 9203,   # Emergency Room Visit
        'inpatient': 9201,   # Inpatient Visit
        'outpatient': 9202,  # Outpatient Visit
        'wellness': 9202,    # Wellness Visit (mapped to Outpatient)
        'home': 262,         # Home Visit (if applicable)
        'urgentcare': 9202   # Urgent Care (mapped to Outpatient)
    }
    # Concept ID for "EHR" (Type Concept ID) - usually 32827
    ehr_type_concept_id = 32827

    for idx, encounter in encounters_with_person.iterrows():
        try:
            start_datetime = pd.to_datetime(encounter['START'], errors='coerce')
            if pd.isna(start_datetime):
                logging.warning(f"Skipping encounter_id {encounter.get('Id')} due to invalid START date.")
                continue

            # --- CRITICAL FIX: Make datetime timezone-naive immediately ---
            if start_datetime.tz is not None:
                start_datetime = start_datetime.tz_localize(None)

            end_datetime = pd.to_datetime(encounter['STOP'], errors='coerce')
            if pd.isna(end_datetime):
                end_datetime = start_datetime # If stop is missing, assume it's same as start
            elif end_datetime.tz is not None:
                end_datetime = end_datetime.tz_localize(None)

            # Get visit_concept_id based on ENCOUNTERCLASS, default to 0 if unknown
            visit_concept_id = visit_type_map.get(str(encounter.get('ENCOUNTERCLASS', 'unknown')).lower(), 0)
            
            # Use the original Synthea encounter ID as visit_source_value
            encounter_id_val = str(encounter.get('Id', 'NO_ID_' + str(idx+1)))
            
            visit_data.append({
                'visit_occurrence_id': idx + 1, # Unique ID for each visit occurrence
                'person_id': encounter['person_id'],
                'visit_concept_id': visit_concept_id,
                'visit_start_date': start_datetime.date(),
                'visit_end_date': end_datetime.date(),
                'visit_type_concept_id': ehr_type_concept_id,  # Type Concept ID for "EHR record"
                'visit_source_concept_id': 0, 
                # Optional columns
                'visit_start_datetime': start_datetime,
                'visit_end_datetime': end_datetime,
                'provider_id': None,
                'care_site_id': None,
                'visit_source_value': encounter_id_val, # Original Synthea ID
                'admitted_from_concept_id': None,
                'admitted_from_source_value': None,
                'discharged_to_concept_id': None,
                'discharged_to_source_value': None,
                'preceding_visit_occurrence_id': None
            })
        except Exception as e:
            logging.error(f"Error processing encounter {encounter.get('Id', 'Unknown')}: {e}")
            continue
            
    visit_df = pd.DataFrame(visit_data)
    try:
        # Sort by person_id and start_datetime to help with future linking/ordering
        visit_df.sort_values(by=['person_id', 'visit_start_datetime'], inplace=True)
        visit_df.to_sql('visit_occurrence', engine, if_exists='append', index=False, method='multi')
        logging.info(f"Visit occurrence table: {len(visit_df)} records inserted.")
    except Exception as e:
        logging.error(f"Error inserting into visit_occurrence table: {e}")
    return visit_df

def create_condition_occurrence_table(visit_df):
    """Create condition occurrence table with proper mapping."""
    # Get ICD10CM to SNOMED mapping
    icd_mapping = get_concept_mapping('ICD10CM', 'SNOMED')
    
    # Add person_id to conditions
    conditions_with_person = conditions.merge(patients[['Id', 'person_id']],
                                             left_on='PATIENT', right_on='Id', how='left')
    
    # Drop conditions with no linked person_id
    conditions_with_person.dropna(subset=['person_id'], inplace=True)
    conditions_with_person['person_id'] = conditions_with_person['person_id'].astype(int)

    # Ensure 'CODE' column is string type before merging
    conditions_with_person['CODE'] = conditions_with_person['CODE'].astype(str)

    # Apply mapping
    conditions_mapped = conditions_with_person.merge(icd_mapping,
                                                     left_on='CODE', right_on='source_code', how='left')
    
    condition_data = []
    
    for idx, condition in conditions_mapped.iterrows():
        try:
            start_datetime = pd.to_datetime(condition['START'], errors='coerce')
            if pd.isna(start_datetime):
                logging.warning(f"Skipping condition_id {condition.get('Id')} due to invalid START date.")
                continue

            # --- CRITICAL FIX: Make condition start_datetime timezone-naive for comparison ---
            if start_datetime.tz is not None:
                start_datetime = start_datetime.tz_localize(None)

            end_datetime = pd.to_datetime(condition['STOP'], errors='coerce') if pd.notna(condition['STOP']) else None
            if end_datetime is not None and end_datetime.tz is not None:
                end_datetime = end_datetime.tz_localize(None)
            
            # Ensure target_concept_id is an integer (0 if no mapping found)
            condition_concept_id = int(condition['target_concept_id']) if pd.notna(condition['target_concept_id']) else 0
            
            # Link to visit: find a visit that contains the condition's start_datetime
            # NOTE: We can now safely compare `visit_start_datetime` (from visit_df) and `start_datetime` (from condition)
            # because both have been made timezone-naive.
            matching_visits = visit_df[
                (visit_df['person_id'] == condition['person_id']) &
                (visit_df['visit_start_datetime'] <= start_datetime) &
                (visit_df['visit_end_datetime'] >= start_datetime)
            ].sort_values(by='visit_start_datetime')
            
            visit_id = None
            if not matching_visits.empty:
                visit_id = matching_visits['visit_occurrence_id'].iloc[0]
            
            # OMOP Type Concept ID for "EHR" - often used for automatically extracted data
            condition_type_concept_id = 32827 
            
            condition_data.append({
                'condition_occurrence_id': idx + 1,
                'person_id': condition['person_id'],
                'condition_concept_id': condition_concept_id,
                'condition_start_date': start_datetime.date(),
                'condition_type_concept_id': condition_type_concept_id,
                'condition_source_concept_id': 0, 
                # Optional columns
                'condition_start_datetime': start_datetime,
                'condition_end_date': end_datetime.date() if end_datetime else None,
                'condition_end_datetime': end_datetime,
                'condition_status_concept_id': None, 
                'stop_reason': str(condition.get('REASONDESCRIPTION', None) or condition.get('REASONCODE', None))[:255] if condition.get('REASONDESCRIPTION', None) or condition.get('REASONCODE', None) else None, 
                'provider_id': None,
                'visit_occurrence_id': visit_id,
                'visit_detail_id': None,
                'condition_source_value': str(condition['CODE']),
                'condition_status_source_value': None
            })
        except Exception as e:
            # Added more context to the error message for better debugging
            logging.error(f"Error processing condition {condition.get('CODE', 'Unknown')} for patient {condition.get('person_id', 'Unknown')}: {e}")
            continue
            
    condition_df = pd.DataFrame(condition_data)
    try:
        condition_df.to_sql('condition_occurrence', engine, if_exists='append', index=False, method='multi')
        logging.info(f"Condition occurrence table: {len(condition_df)} records inserted.")
    except Exception as e:
        logging.error(f"Error inserting into condition_occurrence table: {e}")
    return condition_df

def create_drug_exposure_table(visit_df):
    """Create drug exposure table with deduplication and proper mapping."""
    # Get RxNorm to RxNorm mapping (standard concepts within RxNorm)
    rxnorm_mapping = get_concept_mapping('RxNorm', 'RxNorm')
    
    # Deduplicate medications
    deduped_meds = deduplicate_medications(medications)
    
    # Ensure 'CODE' column is string type before merging
    deduped_meds['CODE'] = deduped_meds['CODE'].astype(str) 

    # Apply mapping
    meds_mapped = deduped_meds.merge(rxnorm_mapping,
                                     left_on='CODE', right_on='source_code', how='left')
    
    drug_data = []
    
    for idx, med in meds_mapped.iterrows():
        try:
            start_datetime = pd.to_datetime(med['START'], errors='coerce')
            if pd.isna(start_datetime):
                logging.warning(f"Skipping medication {med.get('CODE')} for patient {med.get('person_id')} due to invalid START date.")
                continue

            # --- CRITICAL FIX: Make drug start_datetime timezone-naive for comparison ---
            if start_datetime.tz is not None:
                start_datetime = start_datetime.tz_localize(None)

            end_datetime = pd.to_datetime(med['STOP'], errors='coerce')
            # If STOP date is missing, use the deduped stop_date, or a default 30 days
            if pd.isna(end_datetime):
                 end_datetime = start_datetime + pd.Timedelta(days=30)
            elif end_datetime.tz is not None:
                end_datetime = end_datetime.tz_localize(None)
            
            # Ensure target_concept_id is an integer (0 if no mapping found)
            drug_concept_id = int(med['target_concept_id']) if pd.notna(med['target_concept_id']) else 0
            
            # Link to visit: find a visit that contains the drug exposure's start_datetime
            matching_visits = visit_df[
                (visit_df['person_id'] == med['person_id']) &
                (visit_df['visit_start_datetime'] <= start_datetime) &
                (visit_df['visit_end_datetime'] >= start_datetime)
            ].sort_values(by='visit_start_datetime') 
            
            visit_id = None
            if not matching_visits.empty:
                visit_id = matching_visits['visit_occurrence_id'].iloc[0]
            else:
                pass 
            
            # Calculate days supply
            days_supply = None
            if end_datetime and start_datetime:
                days_supply = max(1, (end_datetime - start_datetime).days)
            
            # OMOP Type Concept ID for "EHR" - usually 32827
            drug_type_concept_id = 32827 

            drug_data.append({
                'drug_exposure_id': idx + 1,
                'person_id': med['person_id'],
                'drug_concept_id': drug_concept_id,
                'drug_exposure_start_date': start_datetime.date(),
                'drug_type_concept_id': drug_type_concept_id,
                'drug_source_concept_id': 0, 
                # Optional columns
                'drug_exposure_start_datetime': start_datetime,
                'drug_exposure_end_date': end_datetime.date() if end_datetime else None,
                'drug_exposure_end_datetime': end_datetime,
                'verbatim_end_date': None,
                'stop_reason': str(med.get('REASONDESCRIPTION', None) or med.get('REASONCODE', None))[:255] if med.get('REASONDESCRIPTION', None) or med.get('REASONCODE', None) else None, 
                'refills': None, 
                'quantity': None, 
                'days_supply': days_supply,
                'sig': str(med.get('REASONDESCRIPTION', None))[:255] if pd.notna(med.get('REASONDESCRIPTION')) else None, 
                'route_concept_id': None, 
                'lot_number': None, 
                'provider_id': None,
                'visit_occurrence_id': visit_id,
                'visit_detail_id': None,
                'drug_source_value': str(med['CODE']),
                'route_source_value': None,
                'dose_unit_source_value': None
            })
        except Exception as e:
            logging.error(f"Error processing medication {med.get('CODE', 'Unknown')} for patient {med.get('person_id', 'Unknown')}: {e}")
            continue
            
    drug_df = pd.DataFrame(drug_data)
    try:
        drug_df.to_sql('drug_exposure', engine, if_exists='append', index=False, method='multi')
        logging.info(f"Drug exposure table: {len(drug_df)} records inserted.")
    except Exception as e:
        logging.error(f"Error inserting into drug_exposure table: {e}")
    return drug_df

# MAIN EXECUTION
def main():
    logging.info("Starting OMOP CDM ETL process...")
    
    # Create tables (this will drop and recreate them)
    create_complete_omop_tables()
    
    # Load data
    person_df = create_person_table()
    # Visit DF must be created before Condition and Drug DFs to allow linking
    visit_df = create_visit_occurrence_table()
    
    # Ensure visit_df is not empty and has required columns before passing
    if visit_df.empty:
        logging.warning("No visit_occurrence records found. Condition and Drug tables might be empty.")
        condition_df = pd.DataFrame()
        drug_df = pd.DataFrame()
    else:
        condition_df = create_condition_occurrence_table(visit_df)
        drug_df = create_drug_exposure_table(visit_df)
        
    logging.info("OMOP CDM ETL completed successfully!")
    logging.info(f"Summary:")
    logging.info(f"  Persons: {len(person_df)}")
    logging.info(f"  Visits: {len(visit_df)}")
    logging.info(f"  Conditions: {len(condition_df)}")
    logging.info(f"  Medications: {len(drug_df)}")

if __name__ == "__main__":
    main()

2025-07-13 20:08:11,221 - INFO - Source data loaded successfully.
2025-07-13 20:09:24,358 - INFO - OMOP vocabulary tables loaded successfully.
2025-07-13 20:09:24,373 - INFO - Initial data preparation complete.
2025-07-13 20:09:24,387 - INFO - Starting OMOP CDM ETL process...
2025-07-13 20:09:24,522 - INFO - Dropping existing OMOP tables...
2025-07-13 20:09:24,531 - INFO - Existing OMOP tables dropped.
2025-07-13 20:09:24,548 - INFO - Complete OMOP tables created successfully.
2025-07-13 20:09:24,914 - INFO - Person table: 1128 records inserted.
2025-07-13 20:09:55,625 - INFO - Visit occurrence table: 63678 records inserted.
2025-07-13 20:10:04,739 - INFO - Mapped 127596 codes from ICD10CM to SNOMED. (Includes unmapped with target_concept_id=0)
2025-07-13 20:10:32,661 - INFO - Condition occurrence table: 39390 records inserted.
2025-07-13 20:10:41,515 - INFO - Mapped 216709 codes from RxNorm to RxNorm. (Includes unmapped with target_concept_id=0)
2025-07-13 20:10:44,984 - INFO - Dedupl

In [10]:
import pandas as pd
from sqlalchemy import create_engine, text
import logging

# Configure logging (optional, but good practice for notebooks too)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Database connection setup (assuming this part is already correct from your script) ---
# Replace with your actual database connection string
engine = create_engine("postgresql://mahithareddy:Mahi1299@localhost:5433/omop")

# --- Helper function to run SQL queries and display results ---
def run_sql_query(sql_query, title="Query Result"):
    """
    Helper function to execute a SQL query using the global 'engine'
    and display the results as a Pandas DataFrame.
    """
    logging.info(f"--- Executing: {title} ---")
    try:
        with engine.connect() as connection:
            df = pd.read_sql(text(sql_query), connection) # Use text() for SQLAlchemy 2.0 style
            logging.info(f"Query returned {len(df)} rows.")
            display(df) # Use display() for nicer output in Jupyter Notebooks
            return df
    except Exception as e:
        logging.error(f"Error executing query '{title}': {e}")
        return pd.DataFrame()

print("Notebook setup complete. Ready to run OMOP verification queries.")

# --- Start of Verification Queries ---

print("\n\n--- 1. Verifying OMOP CDM Table Completeness ---")

# Query 1.1: Check person table optional columns for non-null values
query_1_1 = """
SELECT
    COUNT(*) AS total_persons,
    COUNT(month_of_birth) AS persons_with_month_of_birth,
    COUNT(day_of_birth) AS persons_with_day_of_birth,
    COUNT(birth_datetime) AS persons_with_birth_datetime,
    COUNT(death_datetime) AS persons_with_death_datetime,
    COUNT(location_id) AS persons_with_location_id,
    COUNT(provider_id) AS persons_with_provider_id,
    COUNT(care_site_id) AS persons_with_care_site_id,
    COUNT(person_source_value) AS persons_with_person_source_value,
    COUNT(gender_source_value) AS persons_with_gender_source_value,
    COUNT(race_source_value) AS persons_with_race_source_value,
    COUNT(ethnicity_source_value) AS persons_with_ethnicity_source_value
FROM
    person;
"""
run_sql_query(query_1_1, "Person Table Completeness Check")

# Query 1.2: Check visit_occurrence table for optional columns
query_1_2 = """
SELECT
    COUNT(*) AS total_visits,
    COUNT(visit_start_datetime) AS visits_with_start_datetime,
    COUNT(visit_end_datetime) AS visits_with_end_datetime,
    COUNT(provider_id) AS visits_with_provider_id,
    COUNT(care_site_id) AS visits_with_care_site_id,
    COUNT(visit_source_value) AS visits_with_source_value,
    COUNT(admitted_from_concept_id) AS visits_with_admitted_from,
    COUNT(discharged_to_concept_id) AS visits_with_discharged_to,
    COUNT(preceding_visit_occurrence_id) AS visits_with_preceding_visit
FROM
    visit_occurrence;
"""
run_sql_query(query_1_2, "Visit Occurrence Table Completeness Check")

# Query 1.3: Check condition_occurrence for optional columns, especially visit_occurrence_id linkage.
query_1_3 = """
SELECT
    COUNT(*) AS total_conditions,
    COUNT(condition_start_datetime) AS conditions_with_start_datetime,
    COUNT(condition_end_date) AS conditions_with_end_date,
    COUNT(condition_end_datetime) AS conditions_with_end_datetime,
    COUNT(condition_status_concept_id) AS conditions_with_status_concept,
    COUNT(stop_reason) AS conditions_with_stop_reason,
    COUNT(provider_id) AS conditions_with_provider_id,
    COUNT(visit_occurrence_id) AS conditions_linked_to_visits,
    COUNT(visit_detail_id) AS conditions_with_visit_detail_id,
    COUNT(condition_source_value) AS conditions_with_source_value,
    COUNT(condition_status_source_value) AS conditions_with_status_source_value
FROM
    condition_occurrence;
"""
run_sql_query(query_1_3, "Condition Occurrence Table Completeness Check")

# Query 1.4: Check drug_exposure for optional columns, especially days_supply and visit_occurrence_id linkage.
query_1_4 = """
SELECT
    COUNT(*) AS total_drugs,
    COUNT(drug_exposure_start_datetime) AS drugs_with_start_datetime,
    COUNT(drug_exposure_end_date) AS drugs_with_end_date,
    COUNT(drug_exposure_end_datetime) AS drugs_with_end_datetime,
    COUNT(verbatim_end_date) AS drugs_with_verbatim_end_date,
    COUNT(stop_reason) AS drugs_with_stop_reason,
    COUNT(refills) AS drugs_with_refills,
    COUNT(quantity) AS drugs_with_quantity,
    COUNT(days_supply) AS drugs_with_days_supply,
    COUNT(sig) AS drugs_with_sig,
    COUNT(route_concept_id) AS drugs_with_route_concept_id,
    COUNT(lot_number) AS drugs_with_lot_number,
    COUNT(provider_id) AS drugs_with_provider_id,
    COUNT(visit_occurrence_id) AS drugs_linked_to_visits,
    COUNT(visit_detail_id) AS drugs_with_visit_detail_id,
    COUNT(drug_source_value) AS drugs_with_source_value,
    COUNT(route_source_value) AS drugs_with_route_source_value,
    COUNT(dose_unit_source_value) AS drugs_with_dose_unit_source_value
FROM
    drug_exposure;
"""
run_sql_query(query_1_4, "Drug Exposure Table Completeness Check")


print("\n\n--- 2. Verifying Medication Deduplication Logic ---")

# Query 2.1: Distribution of days_supply in drug_exposure
query_2_1 = """
SELECT
    days_supply,
    COUNT(*) AS count_of_exposures
FROM
    drug_exposure
WHERE
    days_supply IS NOT NULL
GROUP BY
    days_supply
ORDER BY
    days_supply DESC -- Order by descending to see longer durations first
LIMIT 20; -- Show top 20 most common days_supply values
"""
run_sql_query(query_2_1, "Days Supply Distribution in Drug Exposure")

# Query 2.2: Examine a specific patient's drug exposures for merging evidence
# First, find a person_id with multiple drug exposures
query_find_person = """
SELECT person_id, COUNT(*) AS drug_count
FROM drug_exposure
GROUP BY person_id
ORDER BY drug_count DESC
LIMIT 1;
"""
person_id_df = run_sql_query(query_find_person, "Finding Person with Most Drug Exposures")

if not person_id_df.empty:
    most_active_person_id = person_id_df['person_id'].iloc[0]
    print(f"\nAnalyzing drug exposures for person_id: {most_active_person_id}")

    query_2_2 = f"""
    SELECT
        person_id,
        drug_concept_id,
        (SELECT concept_name FROM concept WHERE concept_id = drug_concept_id) AS drug_concept_name,
        drug_exposure_start_date,
        drug_exposure_end_date,
        days_supply,
        drug_source_value
    FROM
        drug_exposure
    WHERE
        person_id = {most_active_person_id}
    ORDER BY
        drug_exposure_start_date;
    """
    run_sql_query(query_2_2, f"Drug Exposures for Person ID {most_active_person_id}")
else:
    print("Could not find a person with drug exposures to analyze for deduplication.")

print("\n\n--- 3. Verifying Standard Code Mapping ---")

# Query 3.1: Check a sample of condition_occurrence records for mapping.
query_3_1 = """
SELECT
    co.condition_occurrence_id,
    co.condition_source_value,
    co.condition_concept_id,
    (SELECT c.concept_name FROM concept c WHERE c.concept_id = co.condition_concept_id) AS standard_condition_name,
    (SELECT c_src.concept_name FROM concept c_src WHERE c_src.concept_code = co.condition_source_value AND c_src.vocabulary_id = 'ICD10CM') AS source_condition_name
FROM
    condition_occurrence co
WHERE
    co.condition_concept_id != 0 -- Exclude unmapped (concept_id = 0)
    AND co.condition_source_value IS NOT NULL
LIMIT 10;
"""
run_sql_query(query_3_1, "Sample of Mapped Condition Occurrences")

# Query 3.2: Check a sample of drug_exposure records for mapping.
query_3_2 = """
SELECT
    de.drug_exposure_id,
    de.drug_source_value,
    de.drug_concept_id,
    (SELECT c.concept_name FROM concept c WHERE c.concept_id = de.drug_concept_id) AS standard_drug_name,
    (SELECT c_src.concept_name FROM concept c_src WHERE c_src.concept_code = de.drug_source_value AND c_src.vocabulary_id = 'RxNorm') AS source_drug_name
FROM
    drug_exposure de
WHERE
    de.drug_concept_id != 0 -- Exclude unmapped (concept_id = 0)
    AND de.drug_source_value IS NOT NULL
LIMIT 10;
"""
run_sql_query(query_3_2, "Sample of Mapped Drug Exposures")

print("\n--- All verification queries executed. ---")

2025-07-13 20:14:44,595 - INFO - --- Executing: Person Table Completeness Check ---
2025-07-13 20:14:44,667 - INFO - Query returned 1 rows.


Notebook setup complete. Ready to run OMOP verification queries.


--- 1. Verifying OMOP CDM Table Completeness ---


Unnamed: 0,total_persons,persons_with_month_of_birth,persons_with_day_of_birth,persons_with_birth_datetime,persons_with_death_datetime,persons_with_location_id,persons_with_provider_id,persons_with_care_site_id,persons_with_person_source_value,persons_with_gender_source_value,persons_with_race_source_value,persons_with_ethnicity_source_value
0,1128,1128,1128,1128,128,0,0,0,1128,1128,1128,1128


2025-07-13 20:14:44,701 - INFO - --- Executing: Visit Occurrence Table Completeness Check ---
2025-07-13 20:14:44,725 - INFO - Query returned 1 rows.


Unnamed: 0,total_visits,visits_with_start_datetime,visits_with_end_datetime,visits_with_provider_id,visits_with_care_site_id,visits_with_source_value,visits_with_admitted_from,visits_with_discharged_to,visits_with_preceding_visit
0,63678,63678,63678,0,0,63678,0,0,0


2025-07-13 20:14:44,740 - INFO - --- Executing: Condition Occurrence Table Completeness Check ---
2025-07-13 20:14:44,753 - INFO - Query returned 1 rows.


Unnamed: 0,total_conditions,conditions_with_start_datetime,conditions_with_end_date,conditions_with_end_datetime,conditions_with_status_concept,conditions_with_stop_reason,conditions_with_provider_id,conditions_linked_to_visits,conditions_with_visit_detail_id,conditions_with_source_value,conditions_with_status_source_value
0,39390,39390,29421,29421,0,0,0,71,0,39390,0


2025-07-13 20:14:44,756 - INFO - --- Executing: Drug Exposure Table Completeness Check ---
2025-07-13 20:14:44,763 - INFO - Query returned 1 rows.


Unnamed: 0,total_drugs,drugs_with_start_datetime,drugs_with_end_date,drugs_with_end_datetime,drugs_with_verbatim_end_date,drugs_with_stop_reason,drugs_with_refills,drugs_with_quantity,drugs_with_days_supply,drugs_with_sig,drugs_with_route_concept_id,drugs_with_lot_number,drugs_with_provider_id,drugs_linked_to_visits,drugs_with_visit_detail_id,drugs_with_source_value,drugs_with_route_source_value,drugs_with_dose_unit_source_value
0,12861,12861,12861,12861,0,12861,0,0,12861,7586,0,0,0,12596,0,12861,0,0


2025-07-13 20:14:44,766 - INFO - --- Executing: Days Supply Distribution in Drug Exposure ---
2025-07-13 20:14:44,773 - INFO - Query returned 20 rows.




--- 2. Verifying Medication Deduplication Logic ---


Unnamed: 0,days_supply,count_of_exposures
0,24082,1
1,21857,1
2,19001,1
3,18937,1
4,18837,1
5,18561,1
6,18548,1
7,17554,1
8,17076,1
9,14808,1


2025-07-13 20:14:44,776 - INFO - --- Executing: Finding Person with Most Drug Exposures ---
2025-07-13 20:14:44,781 - INFO - Query returned 1 rows.


Unnamed: 0,person_id,drug_count
0,1127,360


2025-07-13 20:14:44,783 - INFO - --- Executing: Drug Exposures for Person ID 1127 ---



Analyzing drug exposures for person_id: 1127


2025-07-13 20:18:40,120 - INFO - Query returned 360 rows.


Unnamed: 0,person_id,drug_concept_id,drug_concept_name,drug_exposure_start_date,drug_exposure_end_date,days_supply,drug_source_value
0,1127,19126352,nitroglycerin 0.4 MG/ACTUAT Mucosal Spray,1977-01-24,1977-02-23,30,705129
1,1127,1539463,simvastatin 10 MG Oral Tablet,1985-03-25,2024-11-21,14486,314231
2,1127,40166830,24 HR metoprolol succinate 50 MG Extended Rele...,1991-04-30,1991-05-30,30,866436
3,1127,40163720,prasugrel 10 MG Oral Tablet,1991-04-30,1991-05-30,30,855812
4,1127,19059056,aspirin 81 MG Oral Tablet,1991-04-30,1991-05-30,30,243670
...,...,...,...,...,...,...,...
355,1127,40243589,proparacaine hydrochloride 5 MG/ML Ophthalmic ...,2025-05-19,2025-05-19,1,1191013
356,1127,906115,tropicamide 5 MG/ML Ophthalmic Solution,2025-05-19,2025-05-19,1,313521
357,1127,906115,tropicamide 5 MG/ML Ophthalmic Solution,2025-06-18,2025-06-18,1,313521
358,1127,40243589,proparacaine hydrochloride 5 MG/ML Ophthalmic ...,2025-06-18,2025-06-18,1,1191013


2025-07-13 20:18:40,149 - INFO - --- Executing: Sample of Mapped Condition Occurrences ---
2025-07-13 20:18:40,159 - INFO - Query returned 0 rows.




--- 3. Verifying Standard Code Mapping ---


Unnamed: 0,condition_occurrence_id,condition_source_value,condition_concept_id,standard_condition_name,source_condition_name


2025-07-13 20:18:40,162 - INFO - --- Executing: Sample of Mapped Drug Exposures ---
2025-07-13 20:18:55,581 - INFO - Query returned 10 rows.


Unnamed: 0,drug_exposure_id,drug_source_value,drug_concept_id,standard_drug_name,source_drug_name
0,1,204892,19028343,clonazepam 0.25 MG Oral Tablet,clonazepam 0.25 MG Oral Tablet
1,2,1043400,40229134,acetaminophen 21.7 MG/ML / dextromethorphan hy...,acetaminophen 21.7 MG/ML / dextromethorphan hy...
2,3,2001499,964261,vitamin B12 5 MG/ML Injectable Solution,vitamin B12 5 MG/ML Injectable Solution
3,4,106258,19008572,hydrocortisone 10 MG/ML Topical Cream,hydrocortisone 10 MG/ML Topical Cream
4,5,1535362,44818179,sodium fluoride 0.0272 MG/MG Oral Gel,sodium fluoride 0.0272 MG/MG Oral Gel
5,6,1870230,1594382,NDA020800 0.3 ML epinephrine 1 MG/ML Auto-Inje...,NDA020800 0.3 ML epinephrine 1 MG/ML Auto-Inje...
6,7,308192,19073188,amoxicillin 500 MG Oral Tablet,amoxicillin 500 MG Oral Tablet
7,8,312617,1551192,prednisone 5 MG Oral Tablet,prednisone 5 MG Oral Tablet
8,9,313820,19079924,acetaminophen 160 MG Chewable Tablet,acetaminophen 160 MG Chewable Tablet
9,10,665078,19125062,loratadine 5 MG Chewable Tablet,loratadine 5 MG Chewable Tablet



--- All verification queries executed. ---
