In [2]:
import pandas as pd
import os
import logging
import numpy as np

In [3]:
# Logging events 
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)

In [4]:
# Set directory to files to get list of file_paths excluding subdirectories
directory = '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms'
file_paths = [os.path.join(directory, file) for file in os.listdir(directory) if os.path.isfile(os.path.join(directory, file))]

In [7]:
# Make PtReg file the first item in the list since we shall need it to be set as the base with all the participant registrations
PtReg = '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/PtReg.csv'
if PtReg in file_paths: 
    file_paths.insert(0, file_paths.pop(file_paths.index(PtReg)))
else:
    file_paths.insert(0, PtReg)
len(file_paths)

340

In [26]:
# Define the aggregation functions
def custom_agg(series):
    # Filter out NaN values before joining
    return ', '.join(series.dropna().astype(str))

def list_agg(series):
    return list(series.dropna())

# Specify the columns you want to group by
group_columns = ['xCohortID', 'xParticipantID']
drop_columns = ['ProtocolID', 'xSiteID']

def aggregate_data(grouping_columns, grouping_df): 
    # Create an empty aggregation dictionary
    agg_dict = {}
    
    # Dynamically add columns to the dictionary with the appropriate function
    for col in grouping_df.columns:
        if col in grouping_columns:
            continue  # Skip group by columns
        #if df[col].dtype == 'float64' or df[col].dtype == 'int64':
        #agg_dict[col] = ['mean', 'max']  # Apply both mean and max
        elif grouping_df[col].dtype in [np.float64, np.int64]:
            agg_dict[col] = list_agg  # Use list aggregation for numeric columns
    
        else:
            agg_dict[col] = custom_agg  # Apply custom concatenation
    
    # Apply the aggregation to the DataFrame
    grouped_df = grouping_df.groupby(group_columns).agg(agg_dict)

    # Flatten the multi-level columns if necessary
    grouped_df.columns = ['_'.join(col) if isinstance(col, tuple) else col for col in grouped_df.columns.values]
    grouped_df.reset_index(inplace=True)

    # Initialize a list to collect row data dictionaries
    rows_list = []

    # Preprocessing grouped_df 
    # add life stages/visit_name on columns to capture responses where multiple visits are created e.g. life_stage_id.variable_name
    for index, row in grouped_df.iterrows():
        participant = row['xParticipantID']
        cohort = row['xCohortID']
        visits = row['VisitName'].split(",")
        dates = row['xFormDT'].split(",")

        row_data = {'xParticipantID': participant, 'xCohortID':cohort, 'VisitName':visits, 'xFormDT': dates}

        for col in grouped_df.columns[4:]: # Skip the four three columns which is 'xParticipantID', 'xCohortID', 'VisitName' 'xFormDT'
            values = row[col]
            for visit, value in zip(visits, values):
                col_name = f"{visit}.{col}"
                row_data[col_name] = value
        
        rows_list.append(row_data)

    # Convert the list of dictionaries to a DataFrame
    result_df = pd.DataFrame(rows_list)
    
    return result_df
    
def combine_echo_files(files):
    # Initialize an empty DataFrame and trackers
    base_df = pd.DataFrame()
    file_count = 0
    merged_files = []
    # Process each file
    for index, file in enumerate(files):
        if file.endswith('.csv'):
            # Read the current file into a DataFrame
            temp_df = pd.read_csv(file, low_memory=False)

            # Dynamic suffixes based on the file names
            base_file = files[index-1].replace(directory, '')
            base_form_name = os.path.splitext(base_file)[0]
            temp_file = file.replace(directory, '')
            temp_form_name = os.path.splitext(temp_file)[0]
            suffixes = (f'_{base_form_name }', f'_{temp_form_name}')
            
            logging.info('Reading %s - %s', file, suffixes)
        
            # Add the first file(PtReg)
            if base_df.empty:
                base_df = temp_df
            else:
               # Remove columns that are repeated and/or not relevant 
                temp_df = temp_df.drop(columns=[col for col in drop_columns if col in temp_df.columns])
               
                # Aggregate the data in the form to combine with the PtReg 
                temp_df = aggregate_data(group_columns, temp_df)
            
                # Merge the new DataFrame with the base DataFrame on the join key
                base_df = pd.merge(base_df, temp_df, on=['xCohortID', 'xParticipantID'], how='left', suffixes=suffixes)
        file_count += 1
        merged_files.append(file)
        logging.info('Files processed: %s', file_count)
    # Remove PtReg file from merged files so that it can be used as base in next batch
    merged_files.remove(PtReg)
    return base_df, merged_files, file_count

In [27]:
combined_df, merged_files, files_count= combine_echo_files(file_paths[0:3])

2024-06-03 13:24:28,824 - Reading /Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/PtReg.csv - ('_/Ess_HSE_HOME_EC', '_/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/PtReg')
2024-06-03 13:24:28,825 - Files processed: 1
2024-06-03 13:24:28,872 - Reading /Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_CNH_SDQ_4.csv - ('_/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/PtReg', '_/Ess_CNH_SDQ_4')
2024-06-03 13:25:21,474 - Files processed: 2
2024-06-03 13:25:21,509 - Reading /Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_HSE_HOME_EC.csv - ('_/Ess_CNH_SDQ_4', '_/Ess_HSE_HOME_EC')
2024-06-03 13:25:29,484 - Files processed: 3


In [31]:
combined_df.to_csv('combined_echo_data_3.csv', index=False)

In [28]:
combined_df.shape

(63215, 2187)

In [30]:
combined_df.head()

Unnamed: 0,xParticipantID,xOthPtID1,ProtocolID,xCohortID,xSiteID,xCohortID_UH3,ExplicitRegistration,ParticipantType,ParticipationLevel,PregnancyNumber,...,X1_MC02.ftr_setting,X1_EC05.ftr_setting,X1_MC02.ftr_mode,X1_EC05.ftr_mode,X1_MC02.HOME_EC_EmanifestSource,X1_EC05.HOME_EC_EmanifestSource,X1_MC02.HOME_EC_AgeInDays,X1_EC05.HOME_EC_AgeInDays,X1_MC02.HOME_EC_AgeInYears,X1_EC05.HOME_EC_AgeInYears
0,A10002-01-0,,EWCP_New,AAX06,,AAX06,0,P,2,1,...,,,,,,,,,,
1,A10670-01-0,,EWCP_New,AAX06,,AAX06,0,P,2,1,...,,,,,,,,,,
2,A20735-02-A,,EWCP_New,AAL01,,AAL02,1,C,2,2,...,,,,,,,,,,
3,A30349-01-A,,EWCP_New,AEA01,,AEA01,1,C,2,1,...,3.0,3.0,2.0,2.0,C,o,2645.0,1793.0,7.0,4.0
4,A37520-01-A,,EWCP_New,ACA01,,ACA01,1,C,2,1,...,,,,,,,,,,


In [192]:
merged_files_30 = merged_files

In [194]:
merged_files_30

['/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_CNH_SDQ_4.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_HSE_HOME_EC.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Rec_RCh_TSR_6_12.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Rec_RCh_SESC_6_12.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_CWB_PGLS5a_A.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_CHB_BLOCK2.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Rec_RCh_NDSR04_C.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_CPH_PDS.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_CNP_PPVT3a.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_CNH_ASQ_18.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_CHB_CFSP.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Rec_RCh_CPrU_PR.csv',
 '/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_CNH_ASQ_24.csv',
 '/Users/faith/Desktop/ECHO/raw_d

In [246]:
file_paths.index('/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_CPH_CG_CAPE.csv')

98

In [248]:
file_paths[99]

'/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_Dem_GI.csv'

In [251]:
file_paths[149]

'/Users/faith/Desktop/ECHO/raw_downloaded/csv_files/Ess_CNH_ASQ_54.csv'

Notes:  file Rec_RCh_NDSR02_C.csv is large file number 54 and takes a while to execute ... over an hour on local computer resources 

In [39]:
batch1 = pd.read_csv('/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/raw_combined_batches/combined_echo_data_batch_1.csv', low_memory=False)

In [17]:
batch1.columns.to_list()

['xParticipantID',
 'xOthPtID1',
 'ProtocolID',
 'xCohortID',
 'xSiteID',
 'xCohortID_UH3',
 'ExplicitRegistration',
 'ParticipantType',
 'ParticipationLevel',
 'PregnancyNumber',
 'DeliveryOrder',
 'Ethnicity',
 'Race',
 'Sex',
 'xDateOfBirth',
 'YearOfBirth',
 'xExpectedDueDate',
 'ExpectedYearOfBirth',
 'xCohortEnrollmentDate',
 'YearOfCohortEnrollment',
 'xProtocolEnrollmentDate',
 'FinalParticipationLevel',
 'FinalLevelReason',
 'xOtherfinalReason',
 'Withdrawn',
 'xWithdrawnReason',
 'VisitName_Ess_CNH_SDQ_4',
 'xFormDT_Ess_CNH_SDQ_4',
 'SequenceNum_Ess_CNH_SDQ_4',
 'respondent_Ess_CNH_SDQ_4',
 'otherresp_Ess_CNH_SDQ_4',
 'sdq_4_1',
 'sdq_4_2',
 'sdq_4_3',
 'sdq_4_4',
 'sdq_4_5',
 'sdq_4_6',
 'sdq_4_7',
 'sdq_4_8',
 'sdq_4_9',
 'sdq_4_10',
 'sdq_4_11',
 'sdq_4_12',
 'sdq_4_13',
 'sdq_4_14',
 'sdq_4_15',
 'sdq_4_16',
 'sdq_4_17',
 'sdq_4_18',
 'sdq_4_19',
 'sdq_4_20',
 'sdq_4_21',
 'sdq_4_22',
 'sdq_4_23',
 'sdq_4_24',
 'sdq_4_25',
 'sdq_4_emotional',
 'sdq_4_conduct',
 'sdq_4_hyp

In [35]:
def get_form_names(columns_list):
    forms = []
    for item in columns_list: 
        if 'VisitName' in item: 
            print(item)
            form_name = item.strip('VisitName_')
            forms.append(form_name)
    return forms
    


In [40]:
batch1_forms = get_form_names(batch1.columns.to_list())
print(len(batch1_forms))
print(batch1_forms)

VisitName_Ess_CNH_SDQ_4
VisitName_Ess_HSE_HOME_EC
VisitName_Rec_RCh_TSR_6_12
VisitName_Rec_RCh_SESC_6_12
VisitName_Ess_CWB_PGLS5a_A
VisitName_Ess_CHB_BLOCK2
VisitName_Rec_RCh_NDSR04_C
VisitName_Ess_CPH_PDS
VisitName_Ess_CNP_PPVT3a
VisitName_Ess_CNH_ASQ_18
VisitName_Ess_CHB_CFSP
VisitName_Rec_RCh_CPrU_PR
VisitName_Ess_CNH_ASQ_24
VisitName_Ess_SRP_EAC_PR
VisitName_Ess_CNH_ASQ_30
VisitName_Ess_CNH_SCQ
VisitName_Ess_CWB_PLS8b_Ped
VisitName_Rec_RCg_CTS
VisitName_Ess_HSE_ACE_aPV_A
VisitName_Rec_RCh_Caff_PR
VisitName_Rec_RCg_PSOC
VisitName_Ess_CNH_SB5
VisitName_Ess_CNH_RECBQvSF
VisitName_Ess_CNH_PPSE4a_Ped
VisitName_Ess_Prg_MMRAaG
VisitName_Ess_HSE_FES_Coh
VisitName_Rec_RCh_C19SSD_cPR
VisitName_Ess_CHB_BLOCK3
VisitName_Rec_RCg_SRS2CG
VisitName_Ess_CHB_PPA8a_PP
VisitName_Rec_RCh_NTES_SR
VisitName_Ess_CNH_WISC5
VisitName_Rec_RCh_CADHD_PR
VisitName_Rec_Mat_PUQE
VisitName_Ess_SRP_HPAP_CR
VisitName_Ess_HSE_HOME_IT
VisitName_Rec_RCg_PGLS5a
VisitName_Ess_CNP_PGHM2a
VisitName_Ess_CNP_PSS14
VisitName_

In [41]:
'VisitName_Ess_Prg_MMRAaG' in batch1.columns.to_list() #PtReg

True

In [42]:
batch1_forms.append('PtReg')

In [44]:
print(batch1_forms)

['Ess_CNH_SDQ_4', 'Ess_HSE_HOME_EC', 'Rec_RCh_TSR_6_12', 'Rec_RCh_SESC_6_12', 'Ess_CWB_PGLS5a_A', 'Ess_CHB_BLOCK2', 'Rec_RCh_NDSR04_C', 'Ess_CPH_PDS', 'Ess_CNP_PPVT3', 'Ess_CNH_ASQ_18', 'Ess_CHB_CFSP', 'Rec_RCh_CPrU_PR', 'Ess_CNH_ASQ_24', 'Ess_SRP_EAC_PR', 'Ess_CNH_ASQ_30', 'Ess_CNH_SCQ', 'Ess_CWB_PLS8b_Ped', 'Rec_RCg_CTS', 'Ess_HSE_ACE_aPV_A', 'Rec_RCh_Caff_PR', 'Rec_RCg_PSOC', 'Ess_CNH_SB5', 'Ess_CNH_RECBQvSF', 'Ess_CNH_PPSE4a_Ped', 'Ess_Prg_MMRAaG', 'Ess_HSE_FES_Coh', 'Rec_RCh_C19SSD_cPR', 'Ess_CHB_BLOCK3', 'Rec_RCg_SRS2CG', 'Ess_CHB_PPA8a_PP', 'Rec_RCh_NTES_SR', 'Ess_CNH_WISC5', 'Rec_RCh_CADHD_PR', 'Rec_Mat_PUQE', 'Ess_SRP_HPAP_CR', 'Ess_HSE_HOME_IT', 'Rec_RCg_PGLS5', 'Ess_CNP_PGHM2', 'Ess_CNP_PSS14', 'Ess_CNH_WASI2', 'Ess_CNH_ASQ_33', 'Ess_CNH_ASQ_27', 'Ess_HSE_ACE_CR', 'Rec_RCh_PDS8a_Ped', 'Ess_HHx_C19Vac_A', 'Ess_HHx_C19_2_pA', 'Rec_RCg_NE_CE_C', 'Rec_RCh_PDS4a_ECPR', 'PtReg']


In [26]:
batch2 = pd.read_csv('/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/raw_combined_batches/combined_echo_data_batch_2.csv', low_memory=False)

In [27]:
batch2_forms = get_form_names(batch2.columns.to_list())
print(len(batch2_forms))
print(batch2_forms)

49
['Rec_RCh_PDS6a_PP', 'Ess_CWB_PGH8a_ECPR', 'Ess_Prg_MSupp_R', 'Ess_CNH_ASQ_2', 'Ess_CNH_WISC4', 'Ess_CNH_ASQ_6', 'Ess_CSH_PSRI4a_A', 'Ess_CNH_SDQ_2', 'Rec_RCh_NDSR02_C', 'Rec_RCh_MCHATR', 'Ess_CPH_PAI8a_PP', 'Rec_RCh_FES_Con', 'Rec_RCh_ASA_TS', 'Ess_HHx_C19_4_pP', 'Ess_CNH_ASQ_36', 'Ess_CNH_ASQ_22', 'Rec_RCg_NE_CE_P', 'Rec_RCh_HGSW_14_21', 'Rec_RCg_PAnx8', 'Ess_CPH_Air2_Adol', 'Ess_CSH_PSD4a_Ped', 'Ess_CSH_PSRI4a_PP', 'Ess_CNP_PSS10', 'Ess_CSH_SHCA2_SR', 'Ess_CNH_WISC3', 'Ess_CWB_CGB', 'Rec_RCh_SEco_PR', 'Ess_CNP_CRISYS', 'Rec_RCg_FES_Con2', 'Ess_CPH_Air2_MC', 'Ess_CNH_PPSE4a_PP', 'Ess_HSE_CHAOS', 'Ess_Prg_Life_PP', 'Ess_CWB_PLS8b_PP', 'Ess_CNP_ESSI', 'Ess_CPH_Air_MC', 'Ess_CNP_PGHP2', 'Ess_CHB_DSQ_SR2', 'Ess_HHx_MH_BF', 'Ess_CWB_PMP8a_Ped', 'Ess_CNH_ASQ_20', 'Ess_CNP_CESD', 'Rec_RCh_SPIRO', 'Ess_HSE_APQ_P9', 'Rec_RCh_Dent_SR', 'Ess_CPH_DXA', 'Ess_CNP_PInfS4', 'Ess_CPH_CG_CAPE', 'Ess_Dem_GI']


In [45]:
print(batch2_forms)

['Rec_RCh_PDS6a_PP', 'Ess_CWB_PGH8a_ECPR', 'Ess_Prg_MSupp_R', 'Ess_CNH_ASQ_2', 'Ess_CNH_WISC4', 'Ess_CNH_ASQ_6', 'Ess_CSH_PSRI4a_A', 'Ess_CNH_SDQ_2', 'Rec_RCh_NDSR02_C', 'Rec_RCh_MCHATR', 'Ess_CPH_PAI8a_PP', 'Rec_RCh_FES_Con', 'Rec_RCh_ASA_TS', 'Ess_HHx_C19_4_pP', 'Ess_CNH_ASQ_36', 'Ess_CNH_ASQ_22', 'Rec_RCg_NE_CE_P', 'Rec_RCh_HGSW_14_21', 'Rec_RCg_PAnx8', 'Ess_CPH_Air2_Adol', 'Ess_CSH_PSD4a_Ped', 'Ess_CSH_PSRI4a_PP', 'Ess_CNP_PSS10', 'Ess_CSH_SHCA2_SR', 'Ess_CNH_WISC3', 'Ess_CWB_CGB', 'Rec_RCh_SEco_PR', 'Ess_CNP_CRISYS', 'Rec_RCg_FES_Con2', 'Ess_CPH_Air2_MC', 'Ess_CNH_PPSE4a_PP', 'Ess_HSE_CHAOS', 'Ess_Prg_Life_PP', 'Ess_CWB_PLS8b_PP', 'Ess_CNP_ESSI', 'Ess_CPH_Air_MC', 'Ess_CNP_PGHP2', 'Ess_CHB_DSQ_SR2', 'Ess_HHx_MH_BF', 'Ess_CWB_PMP8a_Ped', 'Ess_CNH_ASQ_20', 'Ess_CNP_CESD', 'Rec_RCh_SPIRO', 'Ess_HSE_APQ_P9', 'Rec_RCh_Dent_SR', 'Ess_CPH_DXA', 'Ess_CNP_PInfS4', 'Ess_CPH_CG_CAPE', 'Ess_Dem_GI']


In [28]:
batch3 = pd.read_csv('/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/raw_combined_batches/combined_echo_data_batch_3.csv', low_memory=False)
batch3_forms = get_form_names(batch3.columns.to_list())
print(len(batch3_forms))
print(batch3_forms)

50
['Rec_RCg_PAng5', 'Ess_Dem_ECE', 'Ess_CNH_ASQ_4', 'Rec_Mat_VS_SI', 'Ess_CNH_BDI2', 'Ess_HHx_C19_aA', 'Ess_CNP_EPDS', 'Ess_CNH_SRS2_Sch_SF', 'Ess_CHB_PPA7a_ECPR', 'Ess_HSE_PFR4a_PP', 'Ess_CNP_WASI2A', 'Rec_RCh_PPAff8a_PP', 'Ess_CWB_PGHP2a_A', 'Ess_BPE_HCExp_PP', 'Ess_CNP_BSI', 'Ess_Prg_MSupp_PI', 'Ess_CNP_PHQ9', 'Ess_HSE_ACE_PR', 'Ess_BPE_HESHS_PI', 'Ess_SRP_HPAP_PR', 'Ess_HHx_C19_2_cP', 'Ess_Dem_Dem_B', 'Ess_CNH_BASC2', 'Ess_CSH_PSD4a_A', 'Ess_HHx_C19_2_aP', 'Ess_Prg_PSQI', 'Ess_CNP_ACE_aA', 'Ess_CPH_CAPE_C', 'Rec_RCh_PAnx8a_Ped', 'Ess_CNP_WEDS_S', 'Ess_Prg_PMCI', 'Ess_CNH_RIBQRvSF', 'Rec_RCh_ASA_TNS', 'Rec_RCh_Ho', 'Ess_Prg_PWtGpSR', 'Ess_Dem_Dem_C', 'Ess_CNH_BASC3', 'Ess_Prg_MSuppSF_R', 'Ess_HHx_C19_3_aP', 'Ess_CWB_PGHM2a_A', 'Ess_CNH_PSS4_A', 'Ess_CNH_NEPSY2', 'Ess_HHx_MH_EC', 'Ess_HSE_APQ_C9', 'Ess_BPE_OExp_PP', 'Rec_RCg_C19_aF', 'Ess_CHB_YRB_SB', 'Rec_RCh_SPIROv2', 'Ess_CHB_YRB_SU', 'Ess_CNH_ASQ_54']


In [29]:
batch4 = pd.read_csv('/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/raw_combined_batches/combined_echo_data_batch_4.csv', low_memory=False)
batch4_forms = get_form_names(batch4.columns.to_list())
print(len(batch4_forms))
print(batch4_forms)

50
['Ess_CNH_SDQ_11SR', 'Rec_RCg_C19_CEE2', 'Ess_CNP_PPVT4', 'Ess_Prg_BLOCK4', 'Rec_RCh_SEco_CR', 'Ess_CNH_PSS10_SR', 'Rec_RCg_PMP8', 'Ess_Prg_MFSP', 'Ess_CWB_PGH7_Ped', 'Ess_CNP_ACEBRFSS', 'Rec_RCh_SE_3_5', 'Rec_RCh_WJ3', 'Ess_CPH_Air_Adol', 'Ess_Dem_Dem_CG', 'Ess_Prg_Life_R', 'Ess_CNH_ASQ_42', 'Ess_CSH_SHCA_PR', 'Ess_Prg_DSQ_SR', 'Ess_CNP_PSS4', 'Ess_CNP_LSC', 'Rec_RCh_CMU_PR', 'Rec_RCg_ASR', 'Ess_CNP_WAIS4', 'Ess_BPE_OExp_R', 'Ess_HHx_C19_3_cP', 'Rec_RCh_PAnx8a_PP', 'Ess_ADM_Roster', 'Ess_Dem_CFSM', 'Rec_Mat_SRB_F', 'Ess_CNP_KRIEG', 'Ess_CNH_BRIEF', 'Ess_HHx_MH_MCA', 'Rec_RCh_CKCPT', 'Rec_RCh_SESC_3_5', 'Ess_ADM_WTHD', 'Rec_RCg_EPII_A', 'Ess_Prg_MMRA2', 'Ess_Dem_IAFS_C', 'Ess_CPH_CAPE_I', 'Ess_HSE_APQ_P', 'Ess_CSH_SHCA_SR', 'Ess_CPH_MRA_CAPE', 'Rec_RCh_CMU_SR', 'Rec_RCh_HPBS', 'Ess_CNP_BDI', 'Ess_Dem_CK12E', 'Ess_HHx_C19_2_pP', 'Rec_RCh_AAB', 'Ess_Dem_LA_P', 'Ess_BPE_HESHS_R']


In [30]:
batch5 = pd.read_csv('/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/raw_combined_batches/combined_echo_data_batch_5.csv', low_memory=False)
batch5_forms = get_form_names(batch5.columns.to_list())
print(len(batch5_forms))
print(batch5_forms)

50
['Rec_RCh_ASA_Total', 'Ess_CNH_BAY3', 'Ess_CNP_CTQ', 'Ess_HSE_PFR8a_Ped', 'Ess_BPE_HCExp_R', 'Ess_CNH_PPVT3', 'Rec_RCh_CFPQ', 'Ess_CNH_WPPSI3', 'Ess_CNH_SRS2_Pr', 'Ess_CNH_DAS2', 'Rec_RCg_NE_S_C', 'Ess_CNH_PSS10_PR', 'Rec_Mat_NDSR04', 'Ess_CNP_PES4', 'Rec_RCg_PSRI4a2', 'Ess_CNH_SRS2_Pre_SF', 'Ess_BPE_HESHS_PP', 'Ess_Prg_MWtHtM', 'Ess_Prg_DHQ2', 'Ess_CNH_ASQ_60', 'Ess_CNH_ASQ_48', 'Ess_Prg_MSupp_PP', 'Ess_BPE_HCExp_PI', 'Ess_CPH_TANST', 'Ess_CPH_CAPE', 'Ess_CNP_LES', 'Ess_Prg_FTND', 'Ess_CNP_LSC12', 'Ess_BPE_HESHS_C', 'Ess_HSE_PFR8a_PP', 'Ess_CSH_PSRI4a_Ped', 'Rec_RCh_CEBQ', 'Ess_Dem_LA_C', 'Rec_RCh_SEco_INF', 'Ess_BPE_HCExp_C', 'Ess_HHx_MH2_BF', 'Rec_RCh_PAnx4a_ECPR', 'Ess_HHx_C19Vac_PR', 'Ess_Prg_PSD4', 'Ess_CNH_WPPSI4', 'Ess_HHx_C19_3_pP', 'Ess_CPH_Air_EC', 'Ess_SRP_NTPPI_PR', 'Ess_HSE_APQ_C', 'Rec_RCg_C19_CEE', 'Ess_Dem_IAFS_P', 'Ess_CNH_PPVT4', 'Ess_CPH_PAI8a_Ped', 'Ess_CNP_PDep8', 'Rec_Mat_NDSR02']


In [31]:
batch6 = pd.read_csv('/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/raw_combined_batches/combined_echo_data_batch_6.csv', low_memory=False)
batch6_forms = get_form_names(batch6.columns.to_list())
print(len(batch6_forms))
print(batch6_forms)

50
['Ess_CPH_Air2_EC', 'Ess_CNH_SRS2_Sch', 'Ess_CNH_ASQ_9', 'Ess_Prg_MSHPrg', 'Rec_Mat_PPAQ', 'Ess_CNH_SRS2_A', 'Ess_HHx_C19_2_aA', 'Rec_RCg_CRISYS_SF', 'Ess_CHB_PPA8a_Ped', 'Ess_ADM_REVT_S', 'Ess_CNP_ACE_aP', 'Ess_CWB_PGH7_PP', 'Ess_CHB_DSQ_PR', 'Ess_CNH_ASQ_10', 'Rec_RCh_PPAff4a_ECPR', 'Ess_CNH_MSEL', 'Rec_RCh_Dent_PR', 'Ess_CNP_PInstrS4', 'Ess_CPH_Air2_Inf', 'Ess_Prg_PSRI4', 'Ess_Dem_HHC_C', 'Ess_HHx_C19_cP', 'Ess_CNH_ASQ_8', 'Ess_HHx_HIC', 'Rec_RCg_PSD4a2', 'Ess_CNH_PSS10_A', 'Ess_HHx_CBMRAaJ', 'Ess_HHx_C19_aP', 'Ess_CNH_ASQ_12', 'Ess_Prg_MSuppSF_PP', 'Ess_Prg_Life_PI', 'Ess_CSH_SHCA2_PR', 'Ess_CSH_SHAdult_A', 'Ess_CHB_IFP', 'Ess_CNH_SRS_Pr', 'Ess_HHx_C19_2_cA', 'Rec_RCh_HGSW_8_13', 'Ess_CNH_CBCL_Sch', 'Rec_RCh_CADHD_SR', 'Rec_RCh_TSR_3_5', 'Rec_RCh_PPAff8a_Ped', 'Ess_HHx_C19_4_cP', 'Ess_HHx_CBMRA', 'Ess_HHx_C19Vac', 'Ess_Dem_Occ_Adol', 'Ess_HSE_PFR4a_Ped', 'Ess_HHx_MH_I', 'Rec_RCh_C19SSD_aSR', 'Ess_CNP_PGH', 'Ess_CHB_CFH']


In [32]:
batch7 = pd.read_csv('/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/raw_combined_batches/combined_echo_data_batch_7.csv', low_memory=False)
batch7_forms = get_form_names(batch7.columns.to_list())
print(len(batch7_forms))
print(batch7_forms)

40
['Rec_RCh_SE_6_12', 'Ess_SRP_PComp6a_A', 'Rec_RCg_SHAdul', 'Ess_Dem_FRS', 'Ess_HSE_PARS', 'Ess_CNH_ASQ_16', 'Rec_RCg_NE_PA_C', 'Ess_CNH_SDQ_11', 'Ess_HHx_NNNS', 'Ess_CSH_SHInf', 'Ess_CSH_PSD4a_PP', 'Ess_SRP_PPR8a_Ped', 'Ess_CNH_BRIEF_2', 'Ess_CPH_Air_Inf', 'Ess_HHx_CBMRA2', 'Ess_CNH_RCBQvSF', 'Ess_Dem_HHC_P', 'Ess_HHx_C19_cPR', 'Ess_Prg_PMCI2', 'Ess_HHx_CBI', 'Ess_CNP_WEDS', 'Ess_CPH_BIOIMP', 'Ess_CNP_WASI', 'Ess_CNH_ASQ_14', 'Rec_RCh_CPrU_SR', 'Rec_RCh_Caff_SR', 'Ess_CHB_BLOCK', 'Ess_CWB_PMP8a_A', 'Ess_CNH_CBCL_Pr', 'Ess_CNP_SF36', 'Ess_CWB_PGH_A', 'Ess_HSE_ACE_aAV_A', 'Ess_Prg_MMRAaK', 'Ess_CPH_PPBP', 'Ess_CNH_SDQ_18SR', 'Ess_HHx_C19_4_aP', 'Ess_Dem_Occ_CG', 'Ess_CNH_BRIEF_P', 'Ess_Prg_MMRA', 'Ess_Prg_TPWtGSR']


In [47]:
batch_dict = {'Batch 1':batch1_forms, 'Batch 2':batch2_forms, 'Batch 3': batch3_forms, 'Batch 4': batch4_forms, 'Batch 5': batch5_forms, 'Batch 6': batch6_forms, 'Batch 7': batch7_forms}

In [49]:
batches_df = pd.DataFrame(dict([(k, pd.Series(v)) for k, v in batch_dict.items()]))
batches_df

Unnamed: 0,Batch 1,Batch 2,Batch 3,Batch 4,Batch 5,Batch 6,Batch 7
0,Ess_CNH_SDQ_4,Rec_RCh_PDS6a_PP,Rec_RCg_PAng5,Ess_CNH_SDQ_11SR,Rec_RCh_ASA_Total,Ess_CPH_Air2_EC,Rec_RCh_SE_6_12
1,Ess_HSE_HOME_EC,Ess_CWB_PGH8a_ECPR,Ess_Dem_ECE,Rec_RCg_C19_CEE2,Ess_CNH_BAY3,Ess_CNH_SRS2_Sch,Ess_SRP_PComp6a_A
2,Rec_RCh_TSR_6_12,Ess_Prg_MSupp_R,Ess_CNH_ASQ_4,Ess_CNP_PPVT4,Ess_CNP_CTQ,Ess_CNH_ASQ_9,Rec_RCg_SHAdul
3,Rec_RCh_SESC_6_12,Ess_CNH_ASQ_2,Rec_Mat_VS_SI,Ess_Prg_BLOCK4,Ess_HSE_PFR8a_Ped,Ess_Prg_MSHPrg,Ess_Dem_FRS
4,Ess_CWB_PGLS5a_A,Ess_CNH_WISC4,Ess_CNH_BDI2,Rec_RCh_SEco_CR,Ess_BPE_HCExp_R,Rec_Mat_PPAQ,Ess_HSE_PARS
5,Ess_CHB_BLOCK2,Ess_CNH_ASQ_6,Ess_HHx_C19_aA,Ess_CNH_PSS10_SR,Ess_CNH_PPVT3,Ess_CNH_SRS2_A,Ess_CNH_ASQ_16
6,Rec_RCh_NDSR04_C,Ess_CSH_PSRI4a_A,Ess_CNP_EPDS,Rec_RCg_PMP8,Rec_RCh_CFPQ,Ess_HHx_C19_2_aA,Rec_RCg_NE_PA_C
7,Ess_CPH_PDS,Ess_CNH_SDQ_2,Ess_CNH_SRS2_Sch_SF,Ess_Prg_MFSP,Ess_CNH_WPPSI3,Rec_RCg_CRISYS_SF,Ess_CNH_SDQ_11
8,Ess_CNP_PPVT3,Rec_RCh_NDSR02_C,Ess_CHB_PPA7a_ECPR,Ess_CWB_PGH7_Ped,Ess_CNH_SRS2_Pr,Ess_CHB_PPA8a_Ped,Ess_HHx_NNNS
9,Ess_CNH_ASQ_18,Rec_RCh_MCHATR,Ess_HSE_PFR4a_PP,Ess_CNP_ACEBRFSS,Ess_CNH_DAS2,Ess_ADM_REVT_S,Ess_CSH_SHInf


# Data from Maternal and Nutrition Paper
https://www.sciencedirect.com/science/article/pii/S2475299123266032?via%3Dihub#kwrds0015

In [22]:
forms_list = ['Rec_RCh_ASA_TNS', 'Rec_RCh_ASA_Totals', 'Rec_RCh_ASA_TS', 'Ess_Prg_DHQ3', 'Ess_CHB_BLOCK', 'Ess_CHB_BLOCK2','Ess_CHB_BLOCK3', 'Ess_Prg_DSQ_SR', 'Ess_CHB_DSQ_SR2', 'Ess_CHB_DSQ_PR', 'Ess_Prg_MFSP', 'Ess_CHB_CFSP', 'Ess_CHB_IFP', 'Ess_CHB_CFH', 'Ess_Prg_MMRA', 'Ess_Prg_MMRA2', 'Ess_Prg_MMRAaG', 'Ess_Prg_MSupp_PP', 'Ess_Prg_MSupp_PI', 'Ess_Prg_MSupp_R', 'Ess_Prg_MSuppSF_PP', 'Ess_Prg_MSuppSF_PI', 'Ess_Prg_MSuppSF_R']

In [23]:
print(len(forms_list))

23


In [24]:
# Get a list of form csv files
specific_form_files = [form + '.csv' for form in forms_list]
print(specific_form_files)

['Rec_RCh_ASA_TNS.csv', 'Rec_RCh_ASA_Totals.csv', 'Rec_RCh_ASA_TS.csv', 'Ess_Prg_DHQ3.csv', 'Ess_CHB_BLOCK.csv', 'Ess_CHB_BLOCK2.csv', 'Ess_CHB_BLOCK3.csv', 'Ess_Prg_DSQ_SR.csv', 'Ess_CHB_DSQ_SR2.csv', 'Ess_CHB_DSQ_PR.csv', 'Ess_Prg_MFSP.csv', 'Ess_CHB_CFSP.csv', 'Ess_CHB_IFP.csv', 'Ess_CHB_CFH.csv', 'Ess_Prg_MMRA.csv', 'Ess_Prg_MMRA2.csv', 'Ess_Prg_MMRAaG.csv', 'Ess_Prg_MSupp_PP.csv', 'Ess_Prg_MSupp_PI.csv', 'Ess_Prg_MSupp_R.csv', 'Ess_Prg_MSuppSF_PP.csv', 'Ess_Prg_MSuppSF_PI.csv', 'Ess_Prg_MSuppSF_R.csv']


In [39]:
# Set directories with the forms 
forms_directory = '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms'
PtReg ='/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/ptReg.csv'

In [40]:
specific_form_file_paths = [os.path.join(forms_directory, file) for file in os.listdir(forms_directory) if file.endswith('.csv') and file in specific_form_files]
len(specific_form_file_paths)

21

In [41]:
# Add pt_reg file to specific forms files as first file
specific_form_file_paths.insert(0, PtReg)
len(specific_form_file_paths)

22

In [42]:
combined_df, merged_files, files_count= combine_echo_files(specific_form_file_paths)

2024-05-19 20:10:13,253 - Reading /Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/ptReg.csv - ('_/Ess_Prg_MMRA', '_/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/ptReg')
2024-05-19 20:10:13,254 - Files processed: 1
2024-05-19 20:10:13,377 - Reading /Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_CHB_BLOCK2.csv - ('_/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/ptReg', '_/Ess_CHB_BLOCK2')
2024-05-19 20:10:47,261 - Files processed: 2
2024-05-19 20:10:47,371 - Reading /Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_CHB_CFSP.csv - ('_/Ess_CHB_BLOCK2', '_/Ess_CHB_CFSP')
2024-05-19 20:12:01,980 - Files processed: 3
2024-05-19 20:12:02,001 - Reading /Users/faith/D

In [44]:
# Export to csv
combined_df.to_csv('/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/raw_combined_batches/maternal_child_nutrition')

In [47]:
combined_df.head(2)

Unnamed: 0,xParticipantID,xOthPtID1,ProtocolID,xCohortID,xSiteID,xCohortID_UH3,ExplicitRegistration,ParticipantType,ParticipationLevel,PregnancyNumber,...,mmra_f3h6_c,mmra_f3h7_c,mmra_f3h8_c,mmra_f3h9_c,mmra_sectionf_complete___1,mmra_setting,mmra_mode,mmra_version,mmra_language,MMRA_EmanifestSource
0,A10002-01-0,,EWCP_New,AAX06,,AAX06,0,P,2,1,...,,,,,,,,,,
1,A10670-01-0,,EWCP_New,AAX06,,AAX06,0,P,2,1,...,,,,,,,,,,


In [None]:
combined_df.shape

# BCH ECHO Categories Processing

In [39]:
# Need to read CSV file with two columns
# Column 1: Form Name - get form name and append .csv at end
# Column 2: Combine into category CSVs - use to get name of the batch file
# Dictionary with batch_file_name as key and form_name csv files 
import pandas as pd 

def generate_categorized_forms_dict(category_file_path, forms_directory_path): 
    # Set forms_directory_path
    forms_directory_path=forms_directory_path
    
    # Define data types for column to speed up reading 
    dtype_spec = {'Form': 'string', 'Category': 'string'}
    df = pd.read_csv(category_file_path, dtype=dtype_spec)
    print(df.head())
    
    # Append forms_path + .csv to Form Names in a vectorized manner(more efficient)
    df['Form'] = forms_directory_path + df['Form'].astype(str) + '.csv'
    
    # Group by Categories and convert to dictionary output for downstream tasks 
    echo_batches_dict = df.groupby('Category')['Form'].apply(list).to_dict()
    
    return echo_batches_dict

In [43]:
echo_batches_dict = generate_categorized_forms_dict(category_file_path='/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/echo_first_analysis.csv', forms_directory_path='/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/')
echo_batches_dict

              Form        Category
0  Ess_Prg_Life_PP  First analysis
1  Ess_Prg_Life_PI  First analysis
2   Ess_Prg_Life_R  First analysis
3     Ess_Prg_FTND  First analysis
4     Rec_Mat_PPAQ  First analysis


{'First analysis': ['/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_Life_PP.csv',
  '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_Life_PI.csv',
  '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_Life_R.csv',
  '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_FTND.csv',
  '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Rec_Mat_PPAQ.csv',
  '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_HHx_MH_I.csv',
  '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_MWtHtM.csv',
  '/Users/faith/Desktop/Work/Boston Child

In [41]:
# Get ECHO Batches per specified categories 
def generate_echo_categorized_batches_data(pt_reg_file_path, category_file_path, forms_directory_path):
    echo_batches_dict = generate_categorized_forms_dict(category_file_path, forms_directory_path)
    # Set PtReg file path
    PtReg = pt_reg_file_path
    pt_reg_columns = len(pd.read_csv(pt_reg_file_path).columns)
    
    # Add pt_reg file to all the echo_batches as first file to be read - NB: this results into getting some participants from registration with no forms data, need to set a threshold say if the file only contains data from in the registration, eliminate them down stream.... 
    for key, values_list in echo_batches_dict.items():
        values_list.insert(0, PtReg)
        # Call combine function
        combined_df, merged_files, files_count = combine_echo_files(values_list)
        print(f'The merged files: Batch {key}: {values_list}')
        combined_df.to_csv(f'combined_echo_data_batch_{key}.csv', index=False)
    print('done')


In [44]:
generate_echo_categorized_batches_data(pt_reg_file_path='/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/PtReg.csv', forms_directory_path='/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/', category_file_path='/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/echo_first_analysis.csv') # Took 3hours and 9 minutes to complete, generation of df took about 1.2 hours, exporting it to dataframe took most of the time about 1.9

              Form        Category
0  Ess_Prg_Life_PP  First analysis
1  Ess_Prg_Life_PI  First analysis
2   Ess_Prg_Life_R  First analysis
3     Ess_Prg_FTND  First analysis
4     Rec_Mat_PPAQ  First analysis


  pt_reg_columns = len(pd.read_csv(pt_reg_file_path).columns)
2024-06-03 16:39:03,661 - Reading /Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/PtReg.csv - ('_/Ess_Dem_CFSM', '_/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/PtReg')
2024-06-03 16:39:03,664 - Files processed: 1
2024-06-03 16:39:06,456 - Reading /Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_Life_PP.csv - ('_/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/PtReg', '_/Ess_Prg_Life_PP')
2024-06-03 16:44:59,396 - Files processed: 2
2024-06-03 16:44:59,488 - Reading /Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_Life_PI.csv - ('_/Ess_Prg_Life_PP', '_/Ess_Prg_Life_PI')
2024-06-03 16:45:25,80

The merged files: Batch First analysis: ['/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/01_research/PtReg.csv', '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_Life_PP.csv', '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_Life_PI.csv', '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_Life_R.csv', '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_Prg_FTND.csv', '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Rec_Mat_PPAQ.csv', '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/Data_CSVDownloaded/02_forms/Ess_HHx_MH_I.csv', '/Users/faith/Desktop/Work/Boston Chil

In [45]:
combined_df.head()

Unnamed: 0,xParticipantID,xOthPtID1,ProtocolID,xCohortID,xSiteID,xCohortID_UH3,ExplicitRegistration,ParticipantType,ParticipationLevel,PregnancyNumber,...,X1_MC02.ftr_setting,X1_EC05.ftr_setting,X1_MC02.ftr_mode,X1_EC05.ftr_mode,X1_MC02.HOME_EC_EmanifestSource,X1_EC05.HOME_EC_EmanifestSource,X1_MC02.HOME_EC_AgeInDays,X1_EC05.HOME_EC_AgeInDays,X1_MC02.HOME_EC_AgeInYears,X1_EC05.HOME_EC_AgeInYears
0,A10002-01-0,,EWCP_New,AAX06,,AAX06,0,P,2,1,...,,,,,,,,,,
1,A10670-01-0,,EWCP_New,AAX06,,AAX06,0,P,2,1,...,,,,,,,,,,
2,A20735-02-A,,EWCP_New,AAL01,,AAL02,1,C,2,2,...,,,,,,,,,,
3,A30349-01-A,,EWCP_New,AEA01,,AEA01,1,C,2,1,...,3.0,3.0,2.0,2.0,C,o,2645.0,1793.0,7.0,4.0
4,A37520-01-A,,EWCP_New,ACA01,,ACA01,1,C,2,1,...,,,,,,,,,,


In [46]:
combined_df.columns

Index(['xParticipantID', 'xOthPtID1', 'ProtocolID', 'xCohortID', 'xSiteID',
       'xCohortID_UH3', 'ExplicitRegistration', 'ParticipantType',
       'ParticipationLevel', 'PregnancyNumber',
       ...
       'X1_MC02.ftr_setting', ' X1_EC05.ftr_setting', 'X1_MC02.ftr_mode',
       ' X1_EC05.ftr_mode', 'X1_MC02.HOME_EC_EmanifestSource',
       ' X1_EC05.HOME_EC_EmanifestSource', 'X1_MC02.HOME_EC_AgeInDays',
       ' X1_EC05.HOME_EC_AgeInDays', 'X1_MC02.HOME_EC_AgeInYears',
       ' X1_EC05.HOME_EC_AgeInYears'],
      dtype='object', length=2187)

In [47]:
combined_df.shape

(63215, 2187)

# PySpark

In [3]:
import pyspark 

In [10]:
""" Creating multiple SparkSessions and SparkContexts can cause issues, so it's best practice to use the SparkSession.builder.getOrCreate() method. This returns an existing SparkSession if there's already one in the environment, or creates a new one if necessary! """

# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Initialize Spark Session 
spark = (SparkSession.builder.appName('Read First Analysis File').config('spark.sql.files.maxPartitionBytes', '1g')
         .getOrCreate())
print(spark)

<pyspark.sql.session.SparkSession object at 0x7fdf6dfb2640>


In [12]:
## Get file path
first_analysis_file_path = '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/combined_echo_data_batch_First analysis.csv'

## Adjust the max number if columns , default is 20480 columns which is less than '48699' columns in the file
spark.conf.set('spark.sql.maxPartitionBytes', '1g')
spark.conf.set('spark.sql.files.maxColumns', 48700)
first_analysis_spark_df = spark.read.csv(first_analysis_file_path, header=True, inferSchema=True) 

## Read CSV file
df = spark.read.option('maxColumns', '48700').csv(first_analysis_file_path, header=True, inferSchema=True)

## Show the schema
df.printSchema()

# Show the data 
df.show(5)

Py4JJavaError: An error occurred while calling o77.csv.
: com.univocity.parsers.common.TextParsingException: java.lang.ArrayIndexOutOfBoundsException - null
Parser Configuration: CsvParserSettings:
	Auto configuration enabled=true
	Auto-closing enabled=true
	Autodetect column delimiter=false
	Autodetect quotes=false
	Column reordering enabled=true
	Delimiters for detection=null
	Empty value=
	Escape unquoted values=false
	Header extraction enabled=null
	Headers=null
	Ignore leading whitespaces=false
	Ignore leading whitespaces in quotes=false
	Ignore trailing whitespaces=false
	Ignore trailing whitespaces in quotes=false
	Input buffer size=1048576
	Input reading on separate thread=false
	Keep escape sequences=false
	Keep quotes=false
	Length of content displayed on error=1000
	Line separator detection enabled=false
	Maximum number of characters per column=-1
	Maximum number of columns=20480
	Normalize escaped line separators=true
	Null value=
	Number of records to read=all
	Processor=none
	Restricting data in exceptions=false
	RowProcessor error handler=null
	Selected fields=none
	Skip bits as whitespace=true
	Skip empty lines=true
	Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
	CsvFormat:
		Comment character=#
		Field delimiter=,
		Line separator (normalized)=\n
		Line separator sequence=\n
		Quote character="
		Quote escape character=\
		Quote escape escape character=null
Internal state when error was thrown: line=0, column=20481, record=0, charIndex=470504
	at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:402)
	at com.univocity.parsers.common.AbstractParser.parseLine(AbstractParser.java:707)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.$anonfun$inferFromDataset$1(CSVDataSource.scala:124)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.inferFromDataset(CSVDataSource.scala:124)
	at org.apache.spark.sql.execution.datasources.csv.TextInputCSVDataSource$.infer(CSVDataSource.scala:112)
	at org.apache.spark.sql.execution.datasources.csv.CSVDataSource.inferSchema(CSVDataSource.scala:64)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.inferSchema(CSVFileFormat.scala:62)
	at org.apache.spark.sql.execution.datasources.DataSource.$anonfun$getOrInferFileFormatSchema$11(DataSource.scala:208)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:205)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:407)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:538)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ArrayIndexOutOfBoundsException


In [16]:
# Close Spark Connection 
spark.stop()

KeyboardInterrupt: 

In [2]:
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName("Read Large CSV with Many Columns") \
    .config("spark.sql.files.maxPartitionBytes", "1g") \
    .config("spark.sql.files.maxColumns", 48700).getOrCreate()

## Get file path
first_analysis_file_path = '/Users/faith/Desktop/Work/Boston Childrens Hospital/birth-cohort-db/ECHO/data_dump/combined_echo_data_batch_First analysis.csv'

# Read the CSV file with specific options
df = spark.read.option("header", "true").option("inferSchema", "true").option("maxColumns", "48700").option("mode", "DROPMALFORMED").csv(first_analysis_file_path)

# Show the schema
df.printSchema()

# Perform operations on the dataframe -- not working, running out of memory https://stackoverflow.com/questions/40992104/getting-outofmemoryerror-gc-overhead-limit-exceed-in-pyspark
df.show(5)


24/06/05 11:53:06 WARN DAGScheduler: Broadcasting large task binary with size 1398.7 KiB
                                                                                

root
 |-- xParticipantID: string (nullable = true)
 |-- xOthPtID1: string (nullable = true)
 |-- ProtocolID: string (nullable = true)
 |-- xCohortID: string (nullable = true)
 |-- xSiteID: string (nullable = true)
 |-- xCohortID_UH3: string (nullable = true)
 |-- ExplicitRegistration: integer (nullable = true)
 |-- ParticipantType: string (nullable = true)
 |-- ParticipationLevel: integer (nullable = true)
 |-- PregnancyNumber: integer (nullable = true)
 |-- DeliveryOrder: string (nullable = true)
 |-- Ethnicity: double (nullable = true)
 |-- Race: double (nullable = true)
 |-- Sex: double (nullable = true)
 |-- xDateOfBirth: string (nullable = true)
 |-- YearOfBirth: string (nullable = true)
 |-- xExpectedDueDate: string (nullable = true)
 |-- ExpectedYearOfBirth: string (nullable = true)
 |-- xCohortEnrollmentDate: string (nullable = true)
 |-- YearOfCohortEnrollment: string (nullable = true)
 |-- xProtocolEnrollmentDate: string (nullable = true)
 |-- FinalParticipationLevel: double 

Py4JJavaError: An error occurred while calling o43.showString.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
	at java.lang.StringBuilder.append(StringBuilder.java:136)
	at org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull.<init>(objects.scala:1839)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$5.applyOrElse(ExpressionEncoder.scala:251)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$5.applyOrElse(ExpressionEncoder.scala:247)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$2(TreeNode.scala:515)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$1302/2006130245.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:515)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$1301/2042671046.apply(Unknown Source)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:699)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$1301/2042671046.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.trees.TernaryLike.mapChildren(TreeNode.scala:1272)
	at org.apache.spark.sql.catalyst.trees.TernaryLike.mapChildren$(TreeNode.scala:1271)
	at org.apache.spark.sql.catalyst.expressions.If.mapChildren(conditionalExpressions.scala:41)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUpWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$1301/2042671046.apply(Unknown Source)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.TraversableLike$$Lambda$75/343812839.apply(Unknown Source)
	at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:699)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUpWithPruning(TreeNode.scala:512)


In [11]:
df.show(1) ## issues with memory https://stackoverflow.com/questions/40992104/getting-outofmemoryerror-gc-overhead-limit-exceed-in-pyspark

Py4JJavaError: An error occurred while calling o43.showString.
: java.lang.OutOfMemoryError: GC overhead limit exceeded


In [12]:
df.columns

['xParticipantID',
 'xOthPtID1',
 'ProtocolID',
 'xCohortID',
 'xSiteID',
 'xCohortID_UH3',
 'ExplicitRegistration',
 'ParticipantType',
 'ParticipationLevel',
 'PregnancyNumber',
 'DeliveryOrder',
 'Ethnicity',
 'Race',
 'Sex',
 'xDateOfBirth',
 'YearOfBirth',
 'xExpectedDueDate',
 'ExpectedYearOfBirth',
 'xCohortEnrollmentDate',
 'YearOfCohortEnrollment',
 'xProtocolEnrollmentDate',
 'FinalParticipationLevel',
 'FinalLevelReason',
 'xOtherfinalReason',
 'Withdrawn',
 'xWithdrawnReason',
 'VisitName_/Ess_Prg_Life_PP',
 'xFormDT_/Ess_Prg_Life_PP',
 'X3_Pre01.SequenceNum_/Ess_Prg_FTND',
 'X3_Pre01.respondent_/Ess_Prg_FTND',
 'X3_Pre01.life_pp_07a1',
 'X3_Pre01.life_pp_07a2',
 'X3_Pre01.life_pp_10c1',
 'X3_Pre01.life_pp_10c2',
 'X3_Pre01.life_pp_setting',
 'X3_Pre01.life_pp_mode',
 'X3_Pre01.Life_PP_EmanifestSource',
 'X3_Pre01.Life_PP_GAInDays',
 'X3_Pre01.Life_PP_GAInWks',
 'X3_Pre01.life_pp_10c2_tri___1',
 'X3_Pre01.life_pp_10c2_tri___2',
 'X3_Pre01.life_pp_10c2_tri___3',
 'X3_Pre01.l

In [14]:
row_count = df.count() ## memory issue

Py4JJavaError: An error occurred while calling o43.count.
: java.lang.OutOfMemoryError: GC overhead limit exceeded


In [5]:
spark.stop()

# Using Dask Dataframe

In [19]:
!pip install dask

python(46484) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.




In [27]:
import dask.dataframe as dd

# Set a larger sample size
sample_size = 500 * 1024 * 1024  # 500 MB sample size

# Set a block size to control memory usage and parallel processing
blocksize = 100 * 1024 * 1024  # 100 MB block size

# Read the CSV file into a Dask DataFrame with a larger sample size and block size
df = dd.read_csv(first_analysis_file_path, dtype=str, assume_missing=True, sample=sample_size, blocksize=blocksize, low_memory=False)

# Compute and display the first few rows
print(df.head())
# Compute and print the schema
print(df.dtypes)

  xParticipantID xOthPtID1 ProtocolID xCohortID xSiteID xCohortID_UH3  \
0    A10002-01-0       NaN   EWCP_New     AAX06     NaN         AAX06   
1    A10670-01-0       NaN   EWCP_New     AAX06     NaN         AAX06   
2    A20735-02-A       NaN   EWCP_New     AAL01     NaN         AAL02   
3    A30349-01-A       NaN   EWCP_New     AEA01     NaN         AEA01   
4    A37520-01-A       NaN   EWCP_New     ACA01     NaN         ACA01   

  ExplicitRegistration ParticipantType ParticipationLevel PregnancyNumber  \
0                    0               P                  2               1   
1                    0               P                  2               1   
2                    1               C                  2               2   
3                    1               C                  2               1   
4                    1               C                  2               1   

   ... X1_EC01.CFSM_AgeInYears  X1_EC05.CFSM_AgeInYears X1_EC03.SequenceNum  \
0  ...             

In [36]:
len(df.columns)

48699