In [None]:
#| default_exp meta_loader

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import os
import re
from typing import List, Any, Dict, Union
import warnings

import numpy as np
import pandas as pd
import dask.dataframe as dd

In [None]:
#| export
from pheno_utils.config import (
    DATASETS_PATH, 
    COHORT, 
    ERROR_ACTION
    )
from pheno_utils.pheno_loader import PhenoLoader

In [None]:
#| export

class MetaLoader:
    """
    Load the data dictionaries of all datasets under a base path and query fields across them.

    Args:

        base_path (str, optional): The base path where the data is stored. Defaults to DATASETS_PATH.
        cohort (str, optional): The name of the cohort within the dataset. Defaults to COHORT.
        flexible_field_search (bool, optional): Whether to allow regex field search. Defaults to False.
        errors (str, optional): Whether to raise an error or issue a warning if missing data is encountered.
            Possible values are 'raise', 'warn' and 'ignore'. Defaults to ERROR_ACTION.
            The value is stored on the instance; this class does not act on it itself.
        **kwargs: Additional keyword arguments to pass to PhenoLoader when loading data.

    Attributes:

        dicts (dict): Maps each dataset name to its data dictionary (a pd.DataFrame whose columns are
            the dataset's tabular field names and whose rows are the field properties).
        fields (array): The unique tabular field names found across all data dictionaries.
        cohort (str): The name of the cohort being used.
        base_path (str): The base path where the data is stored.
        dataset_path (str): Glob pattern matching the directories that hold the data dictionaries.
        flexible_field_search (bool): Whether to allow regex field search.
        errors (str): Whether to raise an error or issue a warning if missing data is encountered.
        kwargs (dict): Additional keyword arguments to pass to PhenoLoader.
    """

    def __init__(
        self,
        base_path: str = DATASETS_PATH,
        cohort: str = COHORT,
        flexible_field_search: bool = False,
        errors: str = ERROR_ACTION,
        **kwargs,
    ) -> None:
        self.cohort = cohort
        self.base_path = base_path
        # Must be computed after cohort/base_path are set: the glob pattern depends on both.
        self.dataset_path = self.__get_dataset_path__()

        self.flexible_field_search = flexible_field_search
        self.errors = errors
        self.kwargs = kwargs

        self.__load_dictionaries__()

    def load(self, fields: Union[str, List[str]], flexible: Union[bool, None] = None,
             prop: str = 'tabular_field_name') -> pd.DataFrame:
        """
        Return a dataframe containing the fields from the respective datasets.

        The matching fields are located with `get()`, then each dataset is loaded through
        PhenoLoader and the per-dataset frames are outer-joined on their indexes.
        Field names that occur in more than one of the matched datasets are renamed to
        '<dataset>__<field>' so that they do not collide.

        Args:
            fields (Union[str,List[str]]): Fields to return
            flexible (bool, optional): Whether to use regex matching to find fields. Defaults to None, which uses the MetaLoader's flexible_field_search attribute.
            prop (str, optional): The property to use for searching. Defaults to 'tabular_field_name'.

        Returns:
            pd.DataFrame: Dataframe containing the fields from the respective datasets.
                An empty dataframe is returned when no field matches or no dataset yields data.
        """
        found_fields = self.get(fields, flexible, prop)
        if found_fields.empty:
            return pd.DataFrame()

        # Columns arrive as '<dataset>/<field>'. Split only once so that a field name
        # that itself contains '/' is kept intact.
        found_fields.columns = found_fields.columns.str.split('/', n=1).str[1]
        name_counts = found_fields.columns.value_counts()
        dup_fields = name_counts[name_counts > 1].index
        n_datasets = found_fields.loc['dataset'].nunique()

        # None means "no dataset has contributed data yet".
        loaded_fields = None
        for ds, f in found_fields.T.groupby('dataset'):
            df = PhenoLoader(ds, base_path=self.base_path, cohort=self.cohort,
                             age_sex_dataset=None, **self.kwargs)\
                [f.index.tolist()]
            if df.empty:
                continue

            # 'array_index' is dataset specific, so it cannot serve as a join key
            # when several datasets are combined.
            if 'array_index' in df.index.names and n_datasets > 1:
                if df.index.get_level_values('array_index').nunique() > 1:
                    # Informative: keep it as a regular, dataset-prefixed column.
                    df = df.reset_index('array_index', drop=False)\
                        .rename(columns={'array_index': f'{ds}__array_index'})
                else:
                    # Single value: carries no information, drop it.
                    df = df.reset_index('array_index', drop=True)

            # rename duplicate fields
            df = df.rename(columns={name: f'{ds}__{name}' for name in dup_fields})

            if loaded_fields is None:
                loaded_fields = df
                continue

            loaded_fields = loaded_fields.join(df, how='outer')

        if loaded_fields is None:
            # Every matched dataset returned an empty frame.
            return pd.DataFrame()
        return loaded_fields

    def get(self, fields: Union[str, List[str]], flexible: Union[bool, None] = None,
            prop='tabular_field_name') -> pd.DataFrame:
        """
        Return metadata for the specified fields from all tables.

        When searching by 'tabular_field_name', a field may be given as '<dataset>/<field>'
        to restrict the search to that dataset. Matching is case-insensitive because both
        the query and the searched values are lower-cased.

        Args:
            fields (Union[str,List[str]]): Fields to return
            flexible (bool, optional): Whether to use regex matching to find fields. Defaults to None, which uses the MetaLoader's flexible_field_search attribute.
            prop (str, optional): The property to use for searching. Defaults to 'tabular_field_name'.

        Returns:
            pd.DataFrame: Data dictionary entries of the matching fields, with columns
                named '<dataset>/<field>'. Empty if nothing matches.
        """
        if flexible is None:
            flexible = self.flexible_field_search
        if isinstance(fields, str):
            fields = [fields]
        fields = pd.DataFrame({'field': [f.lower() for f in fields]}).assign(dataset=None)

        if prop == 'tabular_field_name':
            # Separate an optional '<dataset>/' prefix from the field name. Split only once
            # so that the remainder of the query is kept whole.
            ind = fields['field'].str.contains('/')
            parts = fields.loc[ind, 'field'].str.split('/', n=1)
            fields.loc[ind, 'dataset'] = parts.str[0]
            fields.loc[ind, 'field'] = parts.str[1]

        data = pd.DataFrame()
        for dataset, df in self.dicts.items():
            # Queries without a prefix apply to every dataset. The query was lower-cased
            # above, so compare against the lower-cased dataset name.
            keep = (fields['dataset'] == dataset.lower()) | fields['dataset'].isnull()
            fields_in_dataset = fields.loc[keep, 'field']

            if prop == 'tabular_field_name':
                search_in = pd.Series(df.columns, index=df.columns).str.lower()
            else:
                search_in = df.loc[prop].dropna().str.lower()
            if flexible:
                # use regex matching to find fields
                fields_in_col = np.unique([col for f in fields_in_dataset for col, text in search_in.items()
                                           if isinstance(text, str) and re.search(f, text)])
            else:
                fields_in_col = search_in[search_in.isin(fields_in_dataset)].index

            if len(fields_in_col):
                this_data = df[fields_in_col]
                this_data.columns = dataset + '/' + this_data.columns
                data = self.__concat__(data, this_data)

        return data

    def __repr__(self):
        """
        Return string representation of object

        Returns:
            str: String representation of object
        """
        return self.__str__()

    def __str__(self):
        """
        Return string representation of object

        Returns:
            str: String representation of object
        """
        # One dataset per line: break the printed list at every comma.
        ds_list = str(list(self.dicts.keys())).replace(',', '\n')
        return f'MetaLoader for: {self.dataset_path}\n' + \
            f'with {len(self.fields)} fields\n{len(self.dicts)} datasets:\n{ds_list}'

    def __getitem__(self, fields: Union[str, List[str]]):
        """
        Return metadata for the specified fields from all tables (shorthand for `get(fields)`).

        Args:
            fields (Union[str, List[str]]): Fields to return

        Returns:
            pd.DataFrame: Data dictionary entries of the specified fields from all tables
        """
        return self.get(fields)

    def __concat__(self, df1, df2):
        """
        Outer-join two dataframes on their indexes, returning the other one if either is empty.

        Args:
            df1 (pd.DataFrame): Left dataframe
            df2 (pd.DataFrame): Right dataframe

        Returns:
            pd.DataFrame: The joined dataframe
        """
        if df1.empty:
            return df2
        if df2.empty:
            return df1
        return df1.join(df2, how='outer')

    def __load_dictionaries__(self) -> None:
        """
        Load all dictionaries in the base_path.

        Reads every '*_dict*.csv' file matched by `dataset_path`, then sets `self.fields`
        to the unique tabular field names and `self.dicts` to one transposed data
        dictionary per dataset.
        """
        dicts = dd.read_csv(os.path.join(self.dataset_path, '*_dict*.csv'),
                            include_path_column=True, dtype={'parent_dataframe': 'object', 'sampling_rate': 'object'}).compute()
        # The dataset name is the directory directly under base_path: the file's parent
        # when there is no cohort level, its grandparent otherwise.
        if self.cohort is None:
            dataset_ind = -2
        else:
            dataset_ind = -3
        dicts['dataset'] = dicts['path'].str.split('/').str[dataset_ind]
        dicts = dicts.drop(columns=['path'])
        self.fields = dicts['tabular_field_name'].unique()

        self.dicts = {}
        # Put 'dataset' first so that it becomes the first row after transposing.
        col_order = ['dataset'] + dicts.columns.drop('dataset').tolist()
        for dataset in dicts['dataset'].unique():
            self.dicts[dataset] = dicts.loc[dicts['dataset'] == dataset, col_order].set_index('tabular_field_name').T

    def __get_dataset_path__(self):
        """
        Get the glob pattern that matches all dataset directories.

        Returns:
            str: '<base_path>/*/<cohort>' when a cohort is set, otherwise '<base_path>/*'
        """
        if self.cohort is not None:
            return os.path.join(self.base_path, '*', self.cohort)
        return os.path.join(self.base_path, '*')


The `MetaLoader` can be used to query all available fields throughout all datasets. In the following example, 4 datasets are available.

In [None]:
ml = MetaLoader()
ml

MetaLoader for: examples/*
with 81 fields
4 datasets:
['cgm'
 'diet_logging'
 'fundus'
 'sleep']

The object contains only the data dictionaries (metadata) of these datasets, where the columns correspond to columns in the data tables of the dataset (e.g., fundus).

In [None]:
ml.dicts['fundus']

tabular_field_name,fundus_image_left,fundus_image_right,collection_date
dataset,fundus,fundus,fundus
field_string,Fundus image (left),Fundus image (right),Collection date (YYYY-MM-DD)
description_string,Fundus image (left),Fundus image (right),Collection date (YYYY-MM-DD)
parent_dataframe,,,
relative_location,fundus/fundus.parquet,fundus/fundus.parquet,fundus/fundus.parquet
value_type,Text,Text,Date
units,,,Time
sampling_rate,,,
item_type,Bulk,Bulk,Data
array,Single,Single,Single


You can query fields from multiple datasets directly.

In [None]:
ml[['glucose', 'fundus_image_left', 'fundus/collection_date']]

tabular_field_name,cgm/glucose,fundus/fundus_image_left,fundus/collection_date
dataset,cgm,fundus,fundus
field_string,Glucose,Fundus image (left),Collection date (YYYY-MM-DD)
description_string,cgm temporal glucose values,Fundus image (left),Collection date (YYYY-MM-DD)
parent_dataframe,,,
relative_location,cgm/cgm.parquet,fundus/fundus.parquet,fundus/fundus.parquet
value_type,"Series data, continous",Text,Date
units,mg/dl,,Time
sampling_rate,15min,,
item_type,Data,Bulk,Data
array,Single,Single,Single


Note that in the example above, for `collection_date` (that is common to all datasets) the dataset is specified in the prefix `fundus/`. Therefore, the loader returns the field from the fundus imaging dataset. Omitting this prefix will return all `collection_date` fields in the Human Phenotype Project.

You can then use the `MetaLoader` to load the actual data of fields from multiple datasets. Here we load `glucose` from the CGM dataset, and `fundus_image_left` and `collection_date` from the fundus dataset.

In [None]:
ml.load(['glucose' ,'fundus_image_left', 'fundus/collection_date']).head()

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,glucose,fundus_image_left,collection_date
participant_id,collection_timestamp,connection_id,cohort,research_stage,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
0,2020-05-25 10:48:00+03:00,1000001,10k,00_00_visit,111.6,/path/to/file,2022-11-16
0,2020-05-25 11:03:00+03:00,1000001,10k,00_00_visit,79.2,/path/to/file,2022-11-16
0,2020-05-25 11:18:00+03:00,1000001,10k,00_00_visit,84.6,/path/to/file,2022-11-16
0,2020-05-25 11:33:00+03:00,1000001,10k,00_00_visit,106.2,/path/to/file,2022-11-16
0,2020-05-25 11:48:00+03:00,1000001,10k,00_00_visit,102.6,/path/to/file,2022-11-16


You may use more flexible search queries using regex and various properties of the fields. Both the `get()` method and `load()` method support the same syntax.

1. Example: get all bulk data fields.

In [None]:
ml.get('bulk', flexible=True, prop='item_type')

tabular_field_name,cgm/cgm_filename,fundus/fundus_image_left,fundus/fundus_image_right
dataset,cgm,fundus,fundus
field_string,CGM timeseries,Fundus image (left),Fundus image (right)
description_string,Name of the file containing the participants' ...,Fundus image (left),Fundus image (right)
parent_dataframe,,,
relative_location,cgm/cgm.parquet,fundus/fundus.parquet,fundus/fundus.parquet
value_type,Text,Text,Text
units,,,
sampling_rate,,,
item_type,Bulk,Bulk,Bulk
array,Single,Single,Single


2. Example: get all fields that include "mg" in their units

In [None]:
ml.get('mg', flexible=True, prop='units')

tabular_field_name,cgm/1st qu_,cgm/3rd qu_,cgm/auc,cgm/ea1c,cgm/glucose,cgm/gmi,cgm/iqr,cgm/mad,cgm/mag,cgm/mage,...,cgm/modd,cgm/range,cgm/sd,cgm/sdb,cgm/sdbdm,cgm/sddm,cgm/sdhhmm,cgm/sdw,cgm/sdwsh,diet_logging/sodium_mg
dataset,cgm,cgm,cgm,cgm,cgm,cgm,cgm,cgm,cgm,cgm,...,cgm,cgm,cgm,cgm,cgm,cgm,cgm,cgm,cgm,diet_logging
field_string,1st quantile,3rd quantile,AUC,eA1C,Glucose,GMI,IQR,MAD,MAG,MAGE,...,MODD,Range,SD,SDb,SDbdm,SDdm,SDhhmm,SDw,SDwsh,Sodium intake per food logged
description_string,First quantile of all glucose values.,Third quantile of all glucose values.,"Hourly average AUC. This measure integrates, t...",A linear transformation of the mean glucose va...,cgm temporal glucose values,A linear transformation of the mean glucose va...,"Interquartile range (IQR), calculated as the d...",Median Absolute Deviation (MAD). This is a mea...,Mean Absolute Glucose (MAG). This is a measure...,"Mean Amplitude of Glycemic Excursions (MAGE), ...",...,Mean difference between glucose values obtaine...,Difference between the maximum and minimum glu...,Standard deviation of all glucose values.,"SD between days, within time points. Mean valu...","SD between days, within time points, corrected...","Horizontal SD. SD of the mean glucose values, ...",SD between time points. Standard deviation of ...,Vertical SD within days. Average value of the ...,SD within series. Taking hour-long intervals t...,Sodium intake per food logged
parent_dataframe,,,,,,,,,,,...,,,,,,,,,,
relative_location,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,...,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,cgm/cgm.parquet,diet_logging/diet_logging.parquet
value_type,Continuous,Continuous,Continuous,Continuous,"Series data, continous",Continuous,Continuous,Continuous,Continuous,Continuous,...,Continuous,Continuous,Continuous,Continuous,Continuous,Continuous,Continuous,Continuous,Continuous,Continuous
units,mg/dl,mg/dl,mg/dl*h,mg/dl,mg/dl,mg/dl,mg/dl,mg/dl,mg/dl,mg/dl,...,mg/dl,mg/dl,mg/dl,mg/dl,mg/dl,mg/dl,mg/dl,mg/dl,mg/dl,mg
sampling_rate,,,,,15min,,,,,,...,,,,,,,,,,
item_type,Data,Data,Data,Data,Data,Data,Data,Data,Data,Data,...,Data,Data,Data,Data,Data,Data,Data,Data,Data,Data
array,Single,Single,Single,Single,Single,Single,Single,Single,Single,Single,...,Single,Single,Single,Single,Single,Single,Single,Single,Single,Single


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()