In [None]:
import pandas as pd
import dask.dataframe as dd

# Rows read per pandas chunk when streaming each CSV (passed to read_csv as chunksize)
chunk_size = 1000  # Adjust based on memory

# Function to clean columns and rename
def clean_columns(df, prefix, verbose=True):
    """Strip whitespace from column names and namespace them with ``prefix``.

    Every column except the join key ``PATIENT`` is renamed to
    ``f"{prefix}_{column}"`` so that same-named columns from different
    datasets do not collide after merging. The input frame is not modified;
    a renamed frame is returned.

    Args:
        df: DataFrame whose column labels are strings.
        prefix: Dataset name prepended to every non-key column; also used in
            the log line and in the error message.
        verbose: When true (the default), print the stripped column names.
            Pass ``False`` to avoid one log line per chunk.

    Returns:
        A DataFrame with stripped, prefixed column names.

    Raises:
        ValueError: If no column is named ``PATIENT`` after stripping.
    """
    # Compute the stripped labels without assigning them back onto ``df``,
    # so the caller's frame keeps its original column names.
    stripped = df.columns.str.strip()
    if verbose:
        print(f"Columns in {prefix}: {stripped.tolist()}")  # Print column names for inspection
    if 'PATIENT' not in stripped:
        raise ValueError(f"PATIENT column missing in {prefix} dataframe")
    # Map each original label straight to its final name (strip + prefix in one step).
    final_names = {
        original: (clean if clean == "PATIENT" else f"{prefix}_{clean}")
        for original, clean in zip(df.columns, stripped)
    }
    return df.rename(columns=final_names)

# Load allergies, the left-most table of the merge chain.
# The CSV is streamed in pandas chunks of `chunk_size` rows; each chunk is cleaned,
# prefixed, and wrapped as a small Dask DataFrame. The reader is used as a context
# manager so the file handle is closed even if a chunk fails validation.
allergy_parts = []
with pd.read_csv('10k_synthea_covid19_csv/allergies.csv', chunksize=chunk_size) as reader:
    for chunk in reader:
        cleaned = clean_columns(chunk, 'allergies')
        allergy_parts.append(dd.from_pandas(cleaned, npartitions=4))  # Set appropriate partitions

# Concatenate all chunks of allergies into a single Dask DataFrame
allergies_df = dd.concat(allergy_parts, axis=0, interleave_partitions=True)

# Helper that repeats the chunked load shown above for any dataset.
# It is used below for careplans, conditions, immunizations, medications, and observations.

def merge_csv(file_path, prefix, chunksize=None, npartitions=4):
    """Load one CSV in chunks and return it as a single Dask DataFrame.

    Despite the name, no join happens here: the file is read with pandas in
    chunks, each chunk's columns are cleaned and prefixed by
    ``clean_columns``, each chunk is wrapped with ``dd.from_pandas``, and the
    pieces are concatenated row-wise.

    Args:
        file_path: Path of the CSV file to read.
        prefix: Dataset name passed to ``clean_columns``.
        chunksize: Rows per pandas chunk. ``None`` (the default) uses the
            module-level ``chunk_size``.
        npartitions: Dask partitions created per chunk (default 4, as before).

    Returns:
        A Dask DataFrame holding every row of the file.

    Raises:
        ValueError: Propagated from ``clean_columns`` when the file has no
            ``PATIENT`` column.
    """
    rows_per_chunk = chunk_size if chunksize is None else chunksize
    # Use the reader as a context manager so the file is closed even when
    # clean_columns raises part-way through the file.
    with pd.read_csv(file_path, chunksize=rows_per_chunk) as reader:
        parts = [
            dd.from_pandas(clean_columns(chunk, prefix), npartitions=npartitions)
            for chunk in reader
        ]
    return dd.concat(parts, axis=0, interleave_partitions=True)

# Load the five remaining datasets; each one is cleaned and prefixed by merge_csv.
careplans_df = merge_csv('10k_synthea_covid19_csv/careplans.csv', 'careplans')
conditions_df = merge_csv('10k_synthea_covid19_csv/conditions.csv', 'conditions')
immunizations_df = merge_csv('10k_synthea_covid19_csv/immunizations.csv', 'immunizations')
medications_df = merge_csv('10k_synthea_covid19_csv/medications.csv', 'medications')
observations_df = merge_csv('10k_synthea_covid19_csv/observations.csv', 'observations')

# Outer-join every dataset onto allergies, one at a time, keyed on PATIENT.
# NOTE(review): PATIENT repeats within a dataset (the allergies sample shows the same
# patient on several rows), so each outer merge multiplies rows per patient — confirm
# that this row blow-up is intended.
merged_df = allergies_df
for right_df in (careplans_df, conditions_df, immunizations_df, medications_df, observations_df):
    merged_df = dd.merge(merged_df, right_df, on="PATIENT", how="outer")

# Write the result straight to disk rather than collecting it in memory.
# The '*' in the name is filled in per output file.
output_path = 'merged_patient_data-*.csv'
merged_df.to_csv(output_path, index=False, single_file=False)

# Peek at the first rows. NOTE(review): this runs a further computation on top of the
# export above — confirm the extra cost is acceptable.
merged_df_computed = merged_df.head()
print(merged_df_computed)


Columns in allergies: ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']
Columns in allergies: ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']
Columns in allergies: ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']
Columns in allergies: ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']
Columns in allergies: ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']
Columns in allergies: ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']
Columns in careplans: ['Id', 'START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', 'REASONCODE', 'REASONDESCRIPTION']
Columns in careplans: ['Id', 'START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', 'REASONCODE', 'REASONDESCRIPTION']
Columns in careplans: ['Id', 'START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', 'REASONCODE', 'REASONDESCRIPTION']
Columns in careplans: ['Id', 'START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', '

In [9]:
# Disabled cell: everything below sits inside a string literal, so none of it executes.
# If re-enabled, it reads each CSV fully into pandas (no chunking, no Dask) and
# builds a dict with head() of every frame for display.
""" import pandas as pd
import dask.dataframe as dd


conditions_df = pd.read_csv('10k_synthea_covid19_csv/conditions.csv')
immunizations_df = pd.read_csv('10k_synthea_covid19_csv/immunizations.csv')
medications_df = pd.read_csv('10k_synthea_covid19_csv/medications.csv')
observations_df = pd.read_csv('10k_synthea_covid19_csv/observations.csv')
patients_df = pd.read_csv('10k_synthea_covid19_csv/patients.csv')
allergies_df = pd.read_csv(r"10k_synthea_covid19_csv/allergies.csv")
careplans_df = pd.read_csv('10k_synthea_covid19_csv/careplans.csv')

# Display the first few rows of each dataframe to understand their structure
{
    "allergies": allergies_df.head(),
    "careplans": careplans_df.head(),
    "conditions": conditions_df.head(),
    "immunizations": immunizations_df.head(),
    "medications": medications_df.head(),
    "observations": observations_df.head(),
    "patients": patients_df.head()
}
 """

{'allergies':         START STOP                               PATIENT  \
 0  1983-01-22  NaN  df6b563d-1ff4-4833-9af8-84431e641e9c   
 1  1983-01-22  NaN  df6b563d-1ff4-4833-9af8-84431e641e9c   
 2  1983-01-22  NaN  df6b563d-1ff4-4833-9af8-84431e641e9c   
 3  1983-01-22  NaN  df6b563d-1ff4-4833-9af8-84431e641e9c   
 4  2002-02-24  NaN  ff7b040b-aa96-4003-8926-3dac8ca8eb05   
 
                               ENCOUNTER       CODE              DESCRIPTION  
 0  d68c19fe-e9ad-4fad-a047-33b55d5e9ff3  424213003     Allergy to bee venom  
 1  d68c19fe-e9ad-4fad-a047-33b55d5e9ff3  418689008  Allergy to grass pollen  
 2  d68c19fe-e9ad-4fad-a047-33b55d5e9ff3  419263009   Allergy to tree pollen  
 3  d68c19fe-e9ad-4fad-a047-33b55d5e9ff3  417532002          Allergy to fish  
 4  71400bea-4ebf-43d0-923e-fd425584da87   91934008           Allergy to nut  ,
 'careplans':                                      Id       START        STOP  \
 0  fea43343-7312-423f-bb82-b2f5ae71a260  2020-03-01  2020-03-0

In [None]:
# Disabled cell: the code below is wrapped in a string literal and does not execute.
# NOTE(review): the AttributeError recorded under this cell ('DataFrame' object has no
# attribute 'compute') suggests the inputs were plain pandas frames, so the merge result
# was presumably already a pandas DataFrame with no .compute() — confirm before re-enabling.
""" # Rename columns with Dask for proper annotation
allergies_df = allergies_df.rename(columns=lambda x: f"allergies_{x}" if x != "PATIENT" else x)
careplans_df = careplans_df.rename(columns=lambda x: f"careplans_{x}" if x != "PATIENT" else x)
conditions_df = conditions_df.rename(columns=lambda x: f"conditions_{x}" if x != "PATIENT" else x)
immunizations_df = immunizations_df.rename(columns=lambda x: f"immunizations_{x}" if x != "PATIENT" else x)
medications_df = medications_df.rename(columns=lambda x: f"medications_{x}" if x != "PATIENT" else x)
observations_df = observations_df.rename(columns=lambda x: f"observations_{x}" if x != "PATIENT" else x)

# Merge datasets using Dask
merged_df = dd.merge(allergies_df, careplans_df, on="PATIENT", how="outer")
merged_df = dd.merge(merged_df, conditions_df, on="PATIENT", how="outer")
merged_df = dd.merge(merged_df, immunizations_df, on="PATIENT", how="outer")
merged_df = dd.merge(merged_df, medications_df, on="PATIENT", how="outer")
merged_df = dd.merge(merged_df, observations_df, on="PATIENT", how="outer")

# Compute the result to load into memory (you can persist or save it if needed)
merged_df_computed = merged_df.compute()

# Display the first few rows of the computed dataframe to the user
merged_df_computed.head()
 """

AttributeError: 'DataFrame' object has no attribute 'compute'