# Take a sample of ARMD Dataset.

In [1]:
linked_features = ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc']

files_info = [
    {
        "name": "cultures_cohort",
        "path": "microbiology_cultures_cohort.csv",
        "merge_on": linked_features, 
        "dtype": {
            "ordering_mode": "object",
            "culture_description": "object",
            "was_positive": "Int32",
            "organism": "object",
            "antibiotic": "object",
            "susceptibility": "object"
        }
    },
    {
        "name": "ward_info",
        "path": "microbiology_cultures_ward_info.csv",
        "merge_on": linked_features, 
        "dtype": {
            "hosp_ward_IP": "Int32",
            "hosp_ward_OP": "Int32",
            "hosp_ward_ER": "Int32",
            "hosp_ward_ICU": "Int32"
        }
    },
    {
        "name": "prior_med",
        "path": "microbiology_cultures_prior_med.csv",
        "merge_on": linked_features,
        "dtype": {
            "medication_name": "object",
            "medication_time_to_culturetime": "Int32",
            "medication_category": "object"
        }
    },
    {
        "name": "microbial_resistance",
        "path": "microbiology_cultures_microbial_resistance.csv",
        "merge_on": linked_features, 
        "dtype": {
            "organism": "object",
            "antibiotic": "object",
            "resistant_time_to_culturetime": "Int32"
        }
    },
    {
        "name": "cultures_demographics",
        "path": "microbiology_cultures_demographics.csv",
        "merge_on": linked_features[:3], 
        "dtype": {
            "age": "object",
            "gender": "object"
        }
    },
     {
        "name": "cultures_labs",
        "path": "microbiology_cultures_labs.csv",
        "merge_on": linked_features[:3], 
        "dtype": {
            "Period_Day": "Int32",
            "Q75_wbc": "object",
            "Q25_wbc": "object",
            "median_wbc": "object",
            "Q25_neutrophils": "float32",
            "Q75_neutrophils": "float32",
            "median_neutrophils": "float32",
            "Q25_lymphocytes": "float32",
            "Q75_lymphocytes": "float32",
            "median_lymphocytes": "float32",
            "Q25_hgb": "object",
            "Q75_hgb": "object",
            "median_hgb": "object",
            "Q25_plt": "object",
            "Q75_plt": "object",
            "median_plt": "object",
            "Q75_na": "object",
            "Q25_na": "object",
            "median_na": "object",
            "Q75_hco3": "object",
            "Q25_hco3": "object",
            "median_hco3": "object",
            "Q75_bun": "object",
            "Q25_bun": "object",
            "median_bun": "object",
            "Q75_cr": "object",
            "Q25_cr": "object",
            "median_cr": "object",
            "Q75_lactate": "object",
            "Q25_lactate": "object",
            "median_lactate": "object",
            "Q75_procalcitonin": "object",
            "Q25_procalcitonin": "object",
            "median_procalcitonin": "object",
            "first_procalcitonin": "object",
            "last_procalcitonin": "object",
            "first_lactate":"object",
            "last_cr":"object",
            "first_cr":"object",
            "last_bun":"object",
            "first_bun":"object",
            "last_hco3":"object",
            "first_hco3":"object",
            "last_na":"object",
            "first_na":"object",
            "last_plt":"object",
            "first_plt":"object",
            "last_hgb":"object",
            "first_hgb":"object",
            "last_lymphocytes":"object",
            "first_lymphocytes":"object",
            "last_neutrophils":"object",
            "first_neutrophils":"object",
            "last_wbc":"object",
            "first_wbc":"object" 
        }
    },
    {
        "name": "cultures_vitals",
        "path": "microbiology_cultures_vitals.csv",
        "merge_on": linked_features[:3],
        "dtype": {
            "Q25_heartrate": "object",
            "Q75_heartrate": "object",
            "median_heartrate": "object",
            "Q25_resprate": "object",
            "Q75_resprate": "object",
            "median_resprate": "object",
            "Q25_temp": "object",
            "Q75_temp": "object",
            "median_temp": "object",
            "Q25_sysbp": "float32",
            "Q75_sysbp": "float32",
            "median_sysbp": "float32",
            "Q25_diasbp": "float32",
            "Q75_diasbp": "float32",
            "median_diasbp": "float32",
            "first_diasbp": "object",
            "last_diasbp": "object",
            "last_sysbp": "object",
            "first_sysbp": "object",
            "last_temp": "object",
            "first_temp": "object",
            "last_resprate": "object",
            "first_resprate": "object",
            "last_heartrate": "object",
            "first_heartrate": "object"
        }
    },
     {
        "name": "antibiotic_class_exposure",
        "path": "microbiology_cultures_antibiotic_class_exposure.csv",
        "merge_on": linked_features, 
        "dtype": {
            "medication_object": "object",
            "medication_name": "object",
            "antibiotic_class": "object",
            "time_to_culturetime": "Int32"
        }
    },
    {
        "name": "antibiotic_subtype_exposure",
        "path": "microbiology_cultures_antibiotic_subtype_exposure.csv",
        "merge_on": linked_features, 
        "dtype": {
            "medication_object": "object",
            "medication_name": "object",
            "antibiotic_subtype": "object",
            "antibiotic_subtype_object": "object",
            "medication_time_to_cultureTime": "Int32"
        }
    },
    {
        "name": "prior_infecting_organism",
        "path": "microbiology_culture_prior_infecting_organism.csv",
        "merge_on": linked_features, 
        "dtype": {
            "prior_organism": "object",
            "prior_infecting_organism_days_to_culutre": "Int32"
        }
    },
    {
        "name": "cultures_comorbidity",
        "path": "microbiology_cultures_comorbidity.csv",
        "merge_on": linked_features, 
        "dtype": {
            "comorbidity_component": "object",
            "comorbidity_component_start_days_culture": "Int32",
            "comorbidity_component_end_days_culture": "float32"
        }
    },
    {
        "name": "cultures_priorprocedures.csv",
        "path": "microbiology_cultures_priorprocedures.csv",
        "merge_on": linked_features, 
        "dtype": {
            "procedure_description": "object",
            "procedure_time_to_culturetime": "Int32"
        }
    },
    {
        "name": "adi_scores",
        "path": "microbiology_cultures_adi_scores.csv",
        "merge_on": linked_features, 
        "dtype": {
            "adi_score": "object",
            "adi_state_rank": "object"
        }
    },
    {
        "name": "nursing_home_visits",
        "path": "microbiology_cultures_adi_scores.csv",
        "merge_on": linked_features, 
        "dtype": {
            "nursing_home_visit_culture": "Int32"
        }
    },
    {
        "name": "implied_susceptibility",
        "path": "microbiology_cultures_implied_susceptibility.csv",
        "merge_on": linked_features[:3], 
        "dtype": {
            "organism": "object",
            "antibiotic": "object",
            "susceptibility": "object",
            "implied_susceptibility": "object"
        }
    }
    #, {
    #     "name": "implied_susceptibility_rules",
    #     "path": "microbiology_cultures_implied_susceptibility.csv",
    #     "merge_on": None , 
    #     "dtype": {
    #         "Organism": "object",
    #         "Antibiotic": "object",
    #         "Susceptibility": "object",
    #     }
    # }
]


In [3]:
import pandas as pd
import os

output_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
file = 'sample_microbiology_cultures_cohort.csv'

for filename in os.listdir(output_folder):
    file_path = os.path.join(output_folder, filename)
    if os.path.isfile(file_path) and filename == file:
        os.remove(file_path)

folder_path = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/'
# Load the main culture file
cohort = pd.read_csv(folder_path + "microbiology_cultures_cohort.csv")

# Keep only cultures with a known susceptibility result
cohort = cohort[cohort["susceptibility"].isin(["Resistant", "Susceptible", "Intermediate"])]

# Select the first culture per patient (can be changed to latest or random)
cohort_sorted = cohort.sort_values("order_time_jittered_utc")
one_culture_per_patient = cohort_sorted.groupby("anon_id").first().reset_index()

# Take a sample of 10,000 patients
patient_sample = one_culture_per_patient.sample(n=1000, random_state=42)
patient_sample.to_csv(os.path.join(folder_path+'Processed_ARMD_Dataset/sample_microbiology_cultures_cohort.csv'), index=False)

# Extract linking keys
sample_order_ids = patient_sample["order_proc_id_coded"].unique()
sample_anon_ids = patient_sample["anon_id"].unique()


In [4]:
import pandas as pd
import os
import shutil


output_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
keep_file = 'sample_microbiology_cultures_cohort.csv'

for filename in os.listdir(output_folder):
    file_path = os.path.join(output_folder, filename)
    if os.path.isfile(file_path) and filename != keep_file:
        os.remove(file_path)

# List of files related to order_proc_id_coded
order_level_files = [
    "microbiology_cultures_labs.csv",
    "microbiology_cultures_vitals.csv",
    "microbiology_cultures_prior_med.csv",
    "microbiology_cultures_microbial_resistance.csv",
    "microbiology_cultures_priorprocedures.csv",
    "microbiology_cultures_implied_susceptibility.csv"
]

# List of files related to anon_id
patient_level_files = [
    "microbiology_cultures_demographics.csv",
    "microbiology_cultures_comorbidity.csv",
    "microbiology_cultures_adi_scores.csv",
    "microbiology_cultures_nursing_home_visits.csv"
]

# def getDatatype(file):
#     for f in files_info:
#         if f['path']== file:
#             return f['dtype']
#     return None 


# Filter order-level files
chunk_size = 10000

for file in order_level_files:
    first_chunk = True 
    for chunk in pd.read_csv(folder_path + file, chunksize=chunk_size, low_memory=False):
        filtered_df = chunk[chunk["order_proc_id_coded"].isin(sample_order_ids)]
        filtered_df.to_csv(output_folder+'sample_'+file, mode='a', index=False, header=first_chunk)
        first_chunk = False


for file in patient_level_files:
    first_chunk = True 
    for chunk in pd.read_csv(folder_path + file, chunksize=chunk_size, low_memory=False):
        filtered_df = chunk[chunk["anon_id"].isin(sample_anon_ids)]
        filtered_df.to_csv(output_folder+'sample_'+file, mode='a', index=False, header=first_chunk)
        first_chunk = False


**sample files**

    "sample_microbiology_cultures_cohort.csv"
    "sample_microbiology_cultures_labs.csv"
    "sample_microbiology_cultures_vitals.csv"
    "sample_microbiology_cultures_prior_med.csv"
    "sample_microbiology_cultures_microbial_resistance.csv"
    "sample_microbiology_cultures_priorprocedures.csv"
    "sample_microbiology_cultures_implied_susceptibility.csv"
    "sample_microbiology_cultures_demographics.csv"
    "sample_microbiology_cultures_comorbidity.csv"
    "sample_microbiology_cultures_adi_scores.csv"
    "sample_microbiology_cultures_nursing_home_visits.csv"

## Using Dask 

In [35]:
import dask.dataframe as dd
import glob
import os 

input_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
output_file = 'output/'
linked_features = ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc']

csv_files = glob.glob(input_folder + "*.csv")

print(f'left join, left: {os.path.basename(csv_files[1])} ,right: {os.path.basename(csv_files[0])} ')

df1 = dd.read_csv(csv_files[1])# cohort file
df2 = dd.read_csv(csv_files[0])

result = df1.merge(df2, how='left', on=linked_features)
result.to_parquet(output_file, engine='pyarrow', write_index=False)



left join, left: sample_microbiology_cultures_cohort.csv ,right: sample_microbiology_cultures_adi_scores.csv 


In [37]:
import dask.dataframe as dd
import glob
import os 

input_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
output_file = 'output/'
linked_features = ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc']


csv_files = glob.glob(input_folder + "*.csv")
parquet_files = glob.glob(output_file + "*.parquet")

print(f'left join, left: {os.path.basename(parquet_files[0])} ,right: {os.path.basename(csv_files[2])} ')

df1 = dd.read_parquet(output_file)
df2 = dd.read_csv(csv_files[2])

result = df1.merge(df2, how='left', on=linked_features)
result.to_parquet(output_file, engine='pyarrow', write_index=False)



left join, left: part.0.parquet ,right: sample_microbiology_cultures_comorbidity.csv 


In [39]:
import dask.dataframe as dd
import glob
import os

input_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
output_file = 'output/'
linked_features = ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc']

csv_files = glob.glob(os.path.join(input_folder, "*.csv"))
parquet_files = glob.glob(output_file + "*.parquet")

print(f'left join, left: {os.path.basename(parquet_files[0])} ,right: {os.path.basename(csv_files[3])} ')


dtype={'gender': 'object'}

df1 = dd.read_parquet(output_file)
df2 = dd.read_csv(csv_files[3], dtype=dtype)

result = df1.merge(df2, how='left', on=linked_features[:3])
result.to_parquet(output_file, engine='pyarrow', write_index=False)



left join, left: part.0.parquet ,right: sample_microbiology_cultures_demographics.csv 


In [41]:
import dask.dataframe as dd
import glob
import os

input_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
output_file = 'output/'
linked_features = ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc']

csv_files = glob.glob(os.path.join(input_folder, "*.csv"))
parquet_files = glob.glob(output_file + "*.parquet")

print(f'left join, left: {os.path.basename(parquet_files[0])} ,right: {os.path.basename(csv_files[4])} ')


dtype={'gender': 'object'}

df1 = dd.read_parquet(output_file, dtype=dtype)
df2 = dd.read_csv(csv_files[4], dtype=dtype)

result = df1.merge(df2, how='left', on=linked_features[:3], suffixes=('_left', '_right'))
result.to_parquet(output_file, engine='pyarrow', write_index=False)



left join, left: part.0.parquet ,right: sample_microbiology_cultures_implied_susceptibility.csv 


In [43]:
import dask.dataframe as dd
import glob
import os

input_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
output_file = 'output/'
linked_features = ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc']

csv_files = glob.glob(os.path.join(input_folder, "*.csv"))
parquet_files = glob.glob(output_file + "*.parquet")

print(f'left join, left: {os.path.basename(parquet_files[0])} ,right: {os.path.basename(csv_files[5])} ')


dtype_fallback = {
    'adi_score': 'object',
    'adi_state_rank': 'object',
    'gender': 'object'
}

df1 = dd.read_parquet(output_file, dtype=dtype_fallback)
df2 = dd.read_csv(csv_files[5], dtype=dtype_fallback)

result = df1.merge(df2, how='left', on=linked_features[:3], suffixes=('_left', '_right'))
result.to_parquet(output_file, engine='pyarrow', write_index=False)


left join, left: part.0.parquet ,right: sample_microbiology_cultures_labs.csv 


In [45]:
import dask.dataframe as dd
import glob
import os

input_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
output_file = 'output/'
linked_features = ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc']

csv_files = glob.glob(os.path.join(input_folder, "*.csv"))
parquet_files = glob.glob(output_file + "*.parquet")

print(f'left join, left: {os.path.basename(parquet_files[0])} ,right: {os.path.basename(csv_files[6])} ')


dtype={
            "Period_Day": "float64",
            "Q75_wbc": "category",
            "Q25_wbc": "category",
            "median_wbc": "category",
            "Q25_neutrophils": "float64",
            "Q75_neutrophils": "float64",
            "median_neutrophils": "float64",
            "Q25_lymphocytes": "float64",
            "Q75_lymphocytes": "float64",
            "median_lymphocytes": "float64",
            "Q25_hgb": "category",
            "Q75_hgb": "category",
            "median_hgb": "category",
            "Q25_plt": "category",
            "Q75_plt": "category",
            "median_plt": "category",
            "Q75_na": "category",
            "Q25_na": "category",
            "median_na": "category",
            "Q75_hco3": "category",
            "Q25_hco3": "category",
            "median_hco3": "category",
            "Q75_bun": "category",
            "Q25_bun": "category",
            "median_bun": "category",
            "Q75_cr": "category",
            "Q25_cr": "category",
            "median_cr": "category",
            "Q75_lactate": "category",
            "Q25_lactate": "category",
            "median_lactate": "category",
            "Q75_procalcitonin": "category",
            "Q25_procalcitonin": "category",
            "median_procalcitonin": "category",
            "first_procalcitonin": "category",
            "last_procalcitonin": "category",
            "first_lactate":"category",
            "last_cr":"category",
            "first_cr":"category",
            "last_bun":"category",
            "first_bun":"category",
            "last_hco3":"category",
            "first_hco3":"category",
            "last_na":"category",
            "first_na":"category",
            "last_plt":"category",
            "first_plt":"category",
            "last_hgb":"category",
            "first_hgb":"category",
            "last_lymphocytes":"category",
            "first_lymphocytes":"category",
            "last_neutrophils":"category",
            "first_neutrophils":"category",
            "last_wbc":"category",
            "first_wbc":"category",
            'adi_score': 'float64',
            'adi_state_rank': 'float64',
            'gender': 'float64',
            'implied_susceptibility': 'object'
        }

df1 = dd.read_parquet(output_file, dtype=dtype)
df2 = dd.read_csv(csv_files[6], dtype=dtype)

result = df1.merge(df2, how='left', on=linked_features, suffixes=('_left', '_right'))
result.to_parquet(output_file, engine='pyarrow', write_index=False)



left join, left: part.0.parquet ,right: sample_microbiology_cultures_microbial_resistance.csv 


In [47]:
import dask.dataframe as dd
import glob
import os

input_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
output_file = 'output/'
linked_features = ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc']

csv_files = glob.glob(os.path.join(input_folder, "*.csv"))
parquet_files = glob.glob(output_file + "*.parquet")

print(f'left join, left: {os.path.basename(parquet_files[0])} ,right: {os.path.basename(csv_files[7])} ')


dtype={
            "Period_Day": "float64",
            "Q75_wbc": "category",
            "Q25_wbc": "category",
            "median_wbc": "category",
            "Q25_neutrophils": "float64",
            "Q75_neutrophils": "float64",
            "median_neutrophils": "float64",
            "Q25_lymphocytes": "float64",
            "Q75_lymphocytes": "float64",
            "median_lymphocytes": "float64",
            "Q25_hgb": "category",
            "Q75_hgb": "category",
            "median_hgb": "category",
            "Q25_plt": "category",
            "Q75_plt": "category",
            "median_plt": "category",
            "Q75_na": "category",
            "Q25_na": "category",
            "median_na": "category",
            "Q75_hco3": "category",
            "Q25_hco3": "category",
            "median_hco3": "category",
            "Q75_bun": "category",
            "Q25_bun": "category",
            "median_bun": "category",
            "Q75_cr": "category",
            "Q25_cr": "category",
            "median_cr": "category",
            "Q75_lactate": "category",
            "Q25_lactate": "category",
            "median_lactate": "category",
            "Q75_procalcitonin": "category",
            "Q25_procalcitonin": "category",
            "median_procalcitonin": "category",
            "first_procalcitonin": "category",
            "last_procalcitonin": "category",
            "first_lactate":"category",
            "last_cr":"category",
            "first_cr":"category",
            "last_bun":"category",
            "first_bun":"category",
            "last_hco3":"category",
            "first_hco3":"category",
            "last_na":"category",
            "first_na":"category",
            "last_plt":"category",
            "first_plt":"category",
            "last_hgb":"category",
            "first_hgb":"category",
            "last_lymphocytes":"category",
            "first_lymphocytes":"category",
            "last_neutrophils":"category",
            "first_neutrophils":"category",
            "last_wbc":"category",
            "first_wbc":"category",
            'adi_score': 'float64',
            'adi_state_rank': 'float64',
            'gender': 'float64',
            'implied_susceptibility': 'object',
            'antibiotic_microbial_resistance': 'object',
           'implied_susceptibility': 'object',
           'organism_microbial_resistance': 'object'
        }

df1 = dd.read_parquet(output_file, dtype=dtype)
df2 = dd.read_csv(csv_files[7], dtype=dtype)

result = df1.merge(df2, how='left', on=linked_features, suffixes=('_left', '_right'))
result.to_parquet(output_file, engine='pyarrow', write_index=False)



left join, left: part.0.parquet ,right: sample_microbiology_cultures_nursing_home_visits.csv 


In [49]:
import dask.dataframe as dd
import glob
import os

input_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
output_file = 'output/'
linked_features = ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc']

csv_files = glob.glob(os.path.join(input_folder, "*.csv"))
parquet_files = glob.glob(output_file + "*.parquet")

print(f'left join, left: {os.path.basename(parquet_files[0])} ,right: {os.path.basename(csv_files[9])} ')


dtype_fix = {
    'antibiotic_microbial_resistance': 'object',
    'organism_microbial_resistance': 'object',
    'implied_susceptibility': 'object'
}

df1 = dd.read_parquet(output_file, dtype=dtype)
df2 = dd.read_csv(csv_files[9], dtype=dtype)

result = df1.merge(df2, how='left', on=linked_features, suffixes=('_left', '_right'))
result.to_parquet(output_file, engine='pyarrow', write_index=False)



left join, left: part.0.parquet ,right: sample_microbiology_cultures_prior_med.csv 


In [50]:
import dask.dataframe as dd
import glob
import os

# Set up file paths and keys for merging
input_folder = 'doi_10_5061_dryad_jq2bvq8kp__v20250411/Processed_ARMD_Dataset/'
output_file = 'output/'
linked_features = ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc']

# List CSV and Parquet files
csv_files = sorted(glob.glob(os.path.join(input_folder, "*.csv")))
parquet_files = sorted(glob.glob(os.path.join(output_file, "*.parquet")))

# Verify files exist
if not parquet_files:
    raise ValueError("No Parquet files found in output directory")
if not csv_files:
    raise ValueError("No CSV files found in input directory")

print("Available CSV files:")
for i, file in enumerate(csv_files):
    print(f"{i}: {os.path.basename(file)}")

# Let's use the vitals file (sample_microbiology_cultures_vitals.csv) which is index 10
csv_to_use = csv_files[10]  # This is now safe since we know there are 11 files
print(f'\nLeft join\nLeft: {os.path.basename(parquet_files[0])}, Right: {os.path.basename(csv_to_use)}')

# Define float32 types for numeric columns to save memory
dtype = {
    col: 'float32' for col in [
        "Q25_heartrate", "Q75_heartrate", "median_heartrate", "Q25_resprate", "Q75_resprate", "median_resprate",
        "Q25_temp", "Q75_temp", "median_temp", "Q25_sysbp", "Q75_sysbp", "median_sysbp",
        "Q25_diasbp", "Q75_diasbp", "median_diasbp", "first_diasbp", "last_diasbp",
        "last_sysbp", "first_sysbp", "last_temp", "first_temp",
        "last_resprate", "first_resprate", "last_heartrate", "first_heartrate"
    ]
}

# Load files
df1 = dd.read_parquet(parquet_files[0], dtype=dtype)
df2 = dd.read_csv(csv_to_use, dtype=dtype, na_values=["Null", "NA", "N/A", ""], blocksize="16MB")

# Inspect available columns
print("\nColumns in Parquet file:", df1.columns.tolist())
print("Columns in CSV file:", df2.columns.tolist())

# Determine common columns for merge
common_features = [col for col in linked_features if col in df1.columns and col in df2.columns]
print("\nCommon features for merge:", common_features)

if not common_features:
    raise ValueError("No common columns found for merging")

# Check for duplicates
total_rows = df2[common_features].shape[0].compute()
distinct_rows = df2[common_features].drop_duplicates().shape[0].compute()
dupes = total_rows - distinct_rows
print(f"\nDuplicates in df2 keys: {dupes}")

# Drop duplicates if any
if dupes > 0:
    df2 = df2.drop_duplicates(subset=common_features)

# Set index to the first common key for better performance
df1 = df1.set_index(common_features[0])
df2 = df2.set_index(common_features[0])

# Merge on remaining common keys
merge_features = common_features[1:] if len(common_features) > 1 else []
if merge_features:
    result = df1.merge(df2, how='left', on=merge_features)
else:
    result = df1.join(df2, how='left')

# Save result
result.to_parquet('output/merged_result.parquet', engine='pyarrow', write_index=False)
print("\nMerge completed successfully. Results saved to output/merged_result.parquet")

Available CSV files:
0: sample_microbiology_cultures_adi_scores.csv
1: sample_microbiology_cultures_cohort.csv
2: sample_microbiology_cultures_comorbidity.csv
3: sample_microbiology_cultures_demographics.csv
4: sample_microbiology_cultures_implied_susceptibility.csv
5: sample_microbiology_cultures_labs.csv
6: sample_microbiology_cultures_microbial_resistance.csv
7: sample_microbiology_cultures_nursing_home_visits.csv
8: sample_microbiology_cultures_prior_med.csv
9: sample_microbiology_cultures_priorprocedures.csv
10: sample_microbiology_cultures_vitals.csv

Left join
Left: part.0.parquet, Right: sample_microbiology_cultures_vitals.csv

Columns in Parquet file: ['anon_id', 'pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc', 'ordering_mode_x', 'culture_description_x', 'was_positive_x', 'organism_x', 'antibiotic_x', 'susceptibility_x', 'adi_score', 'adi_state_rank', 'ordering_mode_y', 'culture_description_y', 'was_positive_y', 'organism_y', 'antibiotic_y', 'susceptib

# Check the last left join result

In [52]:
import dask.dataframe as dd

output_file = 'output/merged_result.parquet'
df = dd.read_parquet(output_file)
print("Available columns:", df.columns.tolist())

Available columns: ['pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc', 'ordering_mode_x', 'culture_description_x', 'was_positive_x', 'organism_x', 'antibiotic_x', 'susceptibility_x', 'adi_score', 'adi_state_rank', 'ordering_mode_y', 'culture_description_y', 'was_positive_y', 'organism_y', 'antibiotic_y', 'susceptibility_y', 'age', 'gender', 'organism_left', 'antibiotic_left', 'susceptibility', 'implied_susceptibility', 'Period_Day', 'Q75_wbc', 'Q25_wbc', 'median_wbc', 'Q25_neutrophils', 'Q75_neutrophils', 'median_neutrophils', 'Q25_lymphocytes', 'Q75_lymphocytes', 'median_lymphocytes', 'Q25_hgb', 'Q75_hgb', 'median_hgb', 'Q25_plt', 'Q75_plt', 'median_plt', 'Q75_na', 'Q25_na', 'median_na', 'Q75_hco3', 'Q25_hco3', 'median_hco3', 'Q75_bun', 'Q25_bun', 'median_bun', 'Q75_cr', 'Q25_cr', 'median_cr', 'Q75_lactate', 'Q25_lactate', 'median_lactate', 'Q75_procalcitonin', 'Q25_procalcitonin', 'median_procalcitonin', 'first_procalcitonin', 'last_procalcitonin', 'last_lactat

# To make sure the size of sample after each join

In [55]:
import pandas as pd
import pyarrow.parquet as pq

sample_folder='output/merged_result.parquet/'
parquet_files = glob.glob(sample_folder + "*.parquet") 
sample_file = sample_folder + os.path.basename(parquet_files[0])
pf = pq.ParquetFile(sample_file)

print("Number of rows:", pf.metadata.num_rows)
print("Available columns:", pf.schema.names)


Number of rows: 2902702
Available columns: ['pat_enc_csn_id_coded', 'order_proc_id_coded', 'order_time_jittered_utc', 'ordering_mode_x', 'culture_description_x', 'was_positive_x', 'organism_x', 'antibiotic_x', 'susceptibility_x', 'adi_score', 'adi_state_rank', 'ordering_mode_y', 'culture_description_y', 'was_positive_y', 'organism_y', 'antibiotic_y', 'susceptibility_y', 'age', 'gender', 'organism_left', 'antibiotic_left', 'susceptibility', 'implied_susceptibility', 'Period_Day', 'Q75_wbc', 'Q25_wbc', 'median_wbc', 'Q25_neutrophils', 'Q75_neutrophils', 'median_neutrophils', 'Q25_lymphocytes', 'Q75_lymphocytes', 'median_lymphocytes', 'Q25_hgb', 'Q75_hgb', 'median_hgb', 'Q25_plt', 'Q75_plt', 'median_plt', 'Q75_na', 'Q25_na', 'median_na', 'Q75_hco3', 'Q25_hco3', 'median_hco3', 'Q75_bun', 'Q25_bun', 'median_bun', 'Q75_cr', 'Q25_cr', 'median_cr', 'Q75_lactate', 'Q25_lactate', 'median_lactate', 'Q75_procalcitonin', 'Q25_procalcitonin', 'median_procalcitonin', 'first_procalcitonin', 'last_proc

In [59]:
import pyarrow.parquet as pq
import glob
import os

sample_folder='output/merged_result.parquet/'
parquet_files = glob.glob(sample_folder + "*.parquet") 
sample_file = sample_folder + os.path.basename(parquet_files[0])

sample_file = 'output/'+ os.path.basename(parquet_files[0])

print(f'Sample file: {os.path.basename(parquet_files[0])}')

table = pq.ParquetFile(sample_file)
for batch in table.iter_batches(batch_size=100000, columns=id_columns):
    df_batch = batch.to_pandas()
    unique_combinations.update(df_batch.drop_duplicates().itertuples(index=False, name=None))

print(f'Unique combinations: {len(unique_combinations)}')

Sample file: part.0.parquet
Unique combinations: 1000


The sample is 1000 as expected