In [1]:
import pandas as pd
import numpy as np
import warnings
import time
from pandas._libs.tslibs.parsing import DateParseError
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

In [2]:
# Opioid NDC lookup list. Reading `ndc` as str keeps the codes as text,
# so leading zeros are not lost to integer parsing.
ndc_file_path = r"\\chcdfiles.uthouston.edu\extract\vivas_postpartum\chelsea\datalake\final_codes\ndc_opioids_codes.csv"
opioid_codes_df = pd.read_csv(
    ndc_file_path,
    dtype={"ndc": str},
)
ndc_codes = list(opioid_codes_df["ndc"])

In [None]:
# Cohort extract to annotate (input), where the annotated rows go (output),
# and the opioid NDC lookup table.
input_file_path = r"\\chcdfiles.uthouston.edu\extract\vivas_postpartum\chelsea\datalake\New_cohort\New_filters\Cohort\42day_post_delivery_filter.csv"
output_file_path = r"\\chcdfiles.uthouston.edu\extract\vivas_postpartum\chelsea\datalake\New_cohort\New_filters\Cohort\42_opioid_info.csv"
ndc_codes_path = r"\\chcdfiles.uthouston.edu\extract\vivas_postpartum\chelsea\datalake\final_codes\ndc_opioids_codes.csv"

# NOTE(review): unlike the previous cell, this re-read passes no dtype={"ndc": str}.
# pandas therefore infers the dtype (numeric when every code is digits, which drops
# leading zeros), and this replaces the string-typed `ndc_codes` built there.
# Downstream `isin` matching needs these codes and the cohort's `ndc` column to have
# comparable types — confirm.
ndc_codes_df = pd.read_csv(ndc_codes_path)
ndc_codes = list(ndc_codes_df["ndc"])

# Function to identify opioid dates and relevant information
def _normalize_ndc(values) -> pd.Series:
    """Return NDC codes in a canonical text form so codes of any dtype compare equal.

    Each value is converted to ``str`` and trimmed of whitespace. Then:

    * a trailing ``.0`` is removed, because ``read_csv`` yields floats such as
      ``2123401.0`` when an integer column also has blanks;
    * leading zeros are stripped, because integer parsing already drops them, so
      ``"00002123401"`` and ``2123401`` both become ``"2123401"``.

    Missing values stay missing (NaN), so they never match anything.
    """
    if not isinstance(values, pd.Series):
        values = list(values)  # accept lists, sets, tuples, arrays
    raw = pd.Series(values, dtype=object)
    text = (
        raw.astype(str)
        .str.strip()
        .str.replace(r"\.0$", "", regex=True)
        .str.lstrip("0")
    )
    return text.where(raw.notna())


def identify_opioids_dates(dataframe: pd.DataFrame, ndc_codes: list,
                           pat_id_p: str = 'pat_id_p',
                           ndc: str = "ndc",
                           from_dt: str = "from_dt",
                           quan: str = "quan",
                           dayssup: str = "dayssup",
                           opioid_dts: str = "opioid_dates",
                           presc_opioid: str = "presc_opioid",
                           op_dayssup: str = "op_dayssup",
                           op_quan: str = "op_quan") -> pd.DataFrame:
    """Attach each patient's opioid prescription details to every one of their rows.

    A row is an opioid prescription when its ``ndc`` value matches one of ``ndc_codes``.
    Codes are compared after ``_normalize_ndc``, so string codes with leading zeros
    match the same codes parsed as int or float by ``read_csv``.

    For every patient (``pat_id_p``) four parallel lists are built from their matching
    rows, in row order: fill dates (``from_dt``), NDC codes (``ndc``), days supply
    (``dayssup``) and quantity (``quan``). Patients with no matching rows get empty lists.

    Args:
        dataframe: Claim rows. Must contain the ``pat_id_p``, ``ndc``, ``from_dt``,
            ``quan`` and ``dayssup`` columns.
        ndc_codes: Opioid NDC codes, as str, int or float.
        pat_id_p, ndc, from_dt, quan, dayssup: Input column names.
        opioid_dts, presc_opioid, op_dayssup, op_quan: Names of the list columns to add.

    Returns:
        A new DataFrame: ``dataframe`` left-merged with the four list columns.
        Rows whose patient id is NaN receive NaN, because they form no group.
        An empty ``dataframe`` yields an empty frame with the extra columns.
    """
    code_keys = set(_normalize_ndc(ndc_codes).dropna())

    # One vectorized membership test for the whole frame. `.to_numpy()` makes the row
    # selection positional, so duplicate index labels cannot misalign it.
    is_opioid = _normalize_ndc(dataframe[ndc]).isin(code_keys).to_numpy()
    opioid_rows = dataframe[is_opioid]

    opioid_by_patient = {key: rows for key, rows in opioid_rows.groupby(by=pat_id_p)}
    no_rows = opioid_rows.iloc[0:0]

    # Build plain records and create the frame once. Concatenating one-row frames in a
    # loop is quadratic in the number of patients.
    records = []
    for pat_id in pd.unique(dataframe[pat_id_p].dropna()):
        hits = opioid_by_patient.get(pat_id, no_rows)
        records.append({
            pat_id_p: pat_id,
            opioid_dts: hits[from_dt].tolist(),
            presc_opioid: hits[ndc].tolist(),
            op_dayssup: hits[dayssup].tolist(),
            op_quan: hits[quan].tolist(),
        })

    per_patient = pd.DataFrame(
        records, columns=[pat_id_p, opioid_dts, presc_opioid, op_dayssup, op_quan]
    )
    if per_patient.empty:
        # Nothing to attach. Give the key column the input's dtype so merge does not
        # reject an object-vs-numeric key pair.
        per_patient[pat_id_p] = per_patient[pat_id_p].astype(dataframe[pat_id_p].dtype)

    # Merge back with the original dataframe
    return pd.merge(dataframe, per_patient, on=pat_id_p, how='left')

# Function to process chunks and create opioid dates
def process_chunks_and_create_opioid_dates(input_file, output_file, ndc_codes, chunk_size=5000):
    """Stream ``input_file`` in chunks, annotate each with opioid details, and write ``output_file``.

    Each chunk of ``chunk_size`` rows goes through ``identify_opioids_dates(chunk, ndc_codes)``
    and is appended to ``output_file``.
    - ``output_file`` is emptied once the input has been opened, so a re-run replaces earlier
      results instead of appending to them.
    - The CSV header is written with the first chunk that is written successfully, even if an
      earlier chunk failed.

    A chunk that raises is reported and skipped (best effort). Skipped chunk numbers are listed
    at the end.

    NOTE(review): patients are grouped per chunk. If one patient's rows straddle a chunk
    boundary, each part only sees the opioid rows from its own chunk — confirm the input's
    row layout makes this acceptable.
    """
    processed_rows = 0
    processed_chunks = 0
    failed_chunks = []
    header_written = False
    start_time = time.time()

    with pd.read_csv(input_file, chunksize=chunk_size) as reader:
        # The input opened successfully; start from an empty output file.
        with open(output_file, "w"):
            pass

        for chunk_idx, chunk_df in enumerate(reader):
            print(f"\nProcessing chunk {chunk_idx + 1}")
            try:
                # Identify opioid dates, then append to the output file
                chunk_df_with_opioid_dates = identify_opioids_dates(chunk_df, ndc_codes)
                chunk_df_with_opioid_dates.to_csv(
                    output_file, mode='a', index=False, header=not header_written
                )
            except Exception as e:
                print(f"Error processing chunk {chunk_idx + 1}: {e}")
                failed_chunks.append(chunk_idx + 1)
                continue

            header_written = True
            # This counts rows, not distinct patients.
            processed_rows += len(chunk_df_with_opioid_dates)
            processed_chunks += 1
            print(f"Processed rows: {processed_rows}")

    elapsed_time = time.time() - start_time
    print(f"Chunks written: {processed_chunks}; chunks failed: {len(failed_chunks)} {failed_chunks}")
    print(f"Total processing time: {elapsed_time:.2f} seconds")

# Run the opioid annotation over the cohort extract; results land in output_file_path.
process_chunks_and_create_opioid_dates(
    input_file=input_file_path,
    output_file=output_file_path,
    ndc_codes=ndc_codes,
)



In [None]:
# Define file paths: the opioid-annotated cohort goes in, flagged postpartum rows come out.
input_file_path = r"\\chcdfiles.uthouston.edu\extract\vivas_postpartum\chelsea\datalake\New_cohort\New_filters\Cohort\42_opioid_info.csv"
output_file_path = r"\\chcdfiles.uthouston.edu\extract\vivas_postpartum\chelsea\datalake\New_cohort\New_filters\Cohort\42_opioid_in_pregnancy.csv"

# Rows per pd.read_csv chunk.
chunk_size = 100_000

# Wall-clock reference captured when this cell runs.
start_time = time.time()


# Function to check opioid use in the 90 days prior to delivery
def check_opioid_in_pregnancy(
    chunk_df,
    pat_id_p='pat_id_p',
    delivery_dt='delivery_dt',
    opioid_dates_col='opioid_dates'
):
    """Mark each patient's rows with a 0/1 flag for opioid fills in the 90 days before delivery.

    Adds ``opioid_exposed_90d_pre_delivery`` to ``chunk_df`` in place and returns the same frame.
    The flag is 1 when at least one parsed opioid date ``d`` lies in
    ``[delivery - 90 days, delivery)``; the delivery day itself is excluded. Otherwise it is 0,
    including when the delivery date cannot be parsed or the opioid-date cell is missing.

    Only the first row's ``delivery_dt`` and ``opioid_dates_col`` are read for each
    ``pat_id_p`` group, and the flag is written to every row of that group.
    NOTE(review): if one patient id can cover several deliveries, only the first row's
    delivery is used — confirm.

    ``opioid_dates_col`` is expected to hold the text of a list as written by ``to_csv``,
    e.g. "['2020-01-01', '2020-02-03']". It is split on ", " and each piece is parsed with
    ``pd.to_datetime(errors='coerce')``.
    """
    flag_col = 'opioid_exposed_90d_pre_delivery'
    lookback = pd.Timedelta(days=90)

    # 1 = exposed in 90 days pre-delivery, 0 = not exposed (the default for everyone).
    chunk_df[flag_col] = 0

    for _, rows in chunk_df.groupby(pat_id_p):
        delivered_on = pd.to_datetime(rows[delivery_dt].iloc[0], errors='coerce')
        if pd.isna(delivered_on):
            continue

        raw_dates = rows[opioid_dates_col].iloc[0]
        if pd.isna(raw_dates):
            continue

        window_start = delivered_on - lookback
        # str.strip() takes a *character set*: it peels brackets, quotes, parentheses and
        # the letters of "Timestamp" off both ends. The misspelt "Timestramp" still
        # contains every letter needed.
        parsed_dates = [
            pd.to_datetime(piece.strip("[Timestramp('')]"), errors='coerce')
            for piece in raw_dates.split(", ")
        ]

        hit = any(
            pd.notna(when) and window_start <= when < delivered_on
            for when in parsed_dates
        )
        if hit:
            chunk_df.loc[rows.index, flag_col] = 1

    return chunk_df

# Function to check opioid use in the 7 days after delivery
def check_opioid_after_delivery(chunk_df, pat_id_p='pat_id_p', delivery_dt='delivery_dt',
                                opioid_dates_col='opioid_dates', days_after=7):
    """Flag patients with an opioid fill between delivery and ``days_after`` days later.

    Adds ``opioid_after_delivery`` to ``chunk_df`` in place and returns the same frame.
    The value is 'Yes' when any parsed opioid date ``d`` satisfies
    ``delivery <= d <= delivery + days_after days`` (both ends inclusive). Otherwise it is
    'No', including when the delivery date cannot be parsed or the opioid-date cell is missing.

    Only the first row's ``delivery_dt`` and ``opioid_dates_col`` are read for each
    ``pat_id_p`` group, and the result is written to every row of that group.

    Args:
        chunk_df: Rows to flag (mutated).
        pat_id_p, delivery_dt, opioid_dates_col: Column names.
        days_after: Length of the postpartum window in days (default 7).

    Returns:
        ``chunk_df`` with the ``opioid_after_delivery`` column added.
    """

    def _parse_opioid_dates(raw):
        """Turn one opioid-date cell into a list of parsed dates (NaT where unparseable)."""
        if isinstance(raw, (list, tuple, np.ndarray)):
            # Already a sequence, e.g. the list column produced in memory before any CSV round-trip.
            pieces = list(raw)
        elif pd.isna(raw):
            return []
        else:
            # Text of a list after to_csv, e.g. "['2020-01-01', '2020-01-09']" or
            # "[Timestamp('2020-01-01 00:00:00')]". str.strip() takes a character set: it
            # peels brackets, quotes, parentheses and the letters of "Timestamp" off both ends.
            pieces = [piece.strip("[Timestramp('')]") for piece in str(raw).split(", ")]
        return [pd.to_datetime(piece, errors='coerce') for piece in pieces]

    chunk_df['opioid_after_delivery'] = 'No'  # Default; switched to 'Yes' per patient below
    window = pd.Timedelta(days=days_after)

    for _, group_df in chunk_df.groupby(pat_id_p):
        delivery_date = pd.to_datetime(group_df[delivery_dt].iloc[0], errors='coerce')
        if pd.isna(delivery_date):
            continue

        window_end = delivery_date + window
        opioid_dates = _parse_opioid_dates(group_df[opioid_dates_col].iloc[0])

        # Fills later than the window need no branch: those rows keep the default 'No'.
        if any(pd.notna(date) and delivery_date <= date <= window_end for date in opioid_dates):
            chunk_df.loc[group_df.index, 'opioid_after_delivery'] = 'Yes'

    return chunk_df

# Function to flag a chunk and keep only rows with postpartum opioid use
def process_chunk(chunk_df):
    """Flag a chunk with both opioid indicators, then keep only postpartum-exposed rows.

    ``check_opioid_in_pregnancy`` runs first and adds ``opioid_exposed_90d_pre_delivery``.
    ``check_opioid_after_delivery`` runs next and adds ``opioid_after_delivery``.

    Only rows where ``opioid_after_delivery == 'Yes'`` are returned. The pre-delivery flag
    is kept as a column for every returned row but is not used for filtering, so pregnancies
    with and without pre-delivery exposure are both retained.
    """
    annotated = check_opioid_after_delivery(check_opioid_in_pregnancy(chunk_df))
    is_postpartum_user = annotated['opioid_after_delivery'].eq('Yes')
    return annotated.loc[is_postpartum_user]

# Function to process each chunk and track progress
def process_data_sequentially(input_file=None, output_file=None, chunksize=None):
    """Run ``process_chunk`` over a CSV chunk by chunk and write the kept rows to a CSV.

    Arguments default to the notebook-level ``input_file_path``, ``output_file_path`` and
    ``chunk_size``, so the existing zero-argument call keeps working.

    - The first chunk overwrites ``output_file`` and writes the header; later chunks are
      appended without one.
    - Progress, elapsed time and an ETA are printed after every chunk.
    - Exceptions from ``process_chunk`` propagate.

    NOTE(review): ``process_chunk`` groups patients within a chunk. A patient whose rows
    straddle a chunk boundary is evaluated separately in each part — confirm this is acceptable.
    """
    input_file = input_file_path if input_file is None else input_file
    output_file = output_file_path if output_file is None else output_file
    chunksize = chunk_size if chunksize is None else chunksize

    # Timed locally: a module-level start time would also count however long the notebook
    # sat idle between that cell running and this call.
    run_start = time.time()

    # Counting pass for progress reporting. Parsing just the first column yields the same
    # rows (hence the same number of chunks) for far less work, and `with` closes the file.
    with pd.read_csv(input_file, usecols=[0], chunksize=chunksize) as counter:
        total_chunks = sum(1 for _ in counter)

    processed_chunks = 0
    with pd.read_csv(input_file, chunksize=chunksize) as reader:
        for chunk_idx, chunk_df in enumerate(reader):
            print(f"\nProcessing chunk {chunk_idx + 1} of {total_chunks}")

            filtered_chunk = process_chunk(chunk_df)

            # First chunk truncates any previous output and writes the header.
            is_first = chunk_idx == 0
            filtered_chunk.to_csv(output_file, mode='w' if is_first else 'a',
                                  index=False, header=is_first)

            # Track progress
            processed_chunks += 1
            elapsed_time = time.time() - run_start
            remaining_chunks = max(total_chunks - processed_chunks, 0)
            estimated_time_remaining = remaining_chunks * (elapsed_time / processed_chunks)

            # Convert estimated time remaining to HH:MM:SS
            hours, rem = divmod(estimated_time_remaining, 3600)
            minutes, seconds = divmod(rem, 60)
            progress_percentage = (processed_chunks / total_chunks) * 100 if total_chunks else 100.0

            print(f"Chunk {chunk_idx + 1}/{total_chunks} processed")
            print(f"Total elapsed time: {elapsed_time:.2f} seconds")
            print(f"Estimated time remaining: {int(hours):02}:{int(minutes):02}:{int(seconds):02}")
            print(f"Progress: {progress_percentage:.2f}%")

    total_elapsed_time = time.time() - run_start
    print(f"\nTotal processing time: {total_elapsed_time:.2f} seconds")

# Run the main processing function: reads input_file_path in chunk_size-row chunks,
# flags each chunk, and writes only rows with opioid_after_delivery == 'Yes' to output_file_path.
process_data_sequentially()