<a href="https://colab.research.google.com/github/hmelberg/health-analytics-using-python/blob/master/importing_from_multiple_files.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Importing from multiple files

Sometimes health data arrives in multiple files, for instance one file for each year. Our task is to extract information across the different files - for instance, to extract all events and information about persons who at some point have received a diabetes diagnosis in any of the files.

To make things even more difficult, these files may sometimes have different headers (column names) or different systems for coding variables. For instance, the categories for ethnicity or gender may have been expanded in recent years, which means that we must recategorize data from previous years for the data to be comparable and aggregated in one file. And to top it off, the data types of similar variables may - for reasons only known to my enemies - change randomly from file to file. Integers may be stored as floats and so on. 

These problems often cause a lot of time-consuming munging, and it would be useful to have some functions that make it easier. This notebook creates tools to help with this.

In [None]:
# imports

import pandas as pd
import numpy as np
import os
import re
import pickle


In [None]:
#%% helper functions
def _tolist(string_or_list):
    """
    Return the argument as a list.

    A list is returned unchanged (the same object, not a copy).
    A tuple is converted to a list with the same elements.
    Anything else (for instance a single string) is wrapped in a
    one-element list.

    >>> _tolist('npr20')
    ['npr20']
    >>> _tolist(('a', 'b'))
    ['a', 'b']
    """
    if isinstance(string_or_list, list):
        return string_or_list
    if isinstance(string_or_list, tuple):
        return list(string_or_list)
    return [string_or_list]
    
def _invert(schema):
    """
    Invert a schema so that every old column name points to its new name.

    Parameters
    ----------
    schema : dict
        Keys are the desired column names, values are lists of the
        column names that should be mapped to that key.

    Returns
    -------
    dict
        One entry per name found in the value lists, mapping it to the
        key it was listed under. If a name is listed under several keys,
        the key that comes last in the schema wins.

    >>> _invert({'pid': ['id', 'person']})
    {'id': 'pid', 'person': 'pid'}
    """
    return {old: new for new, olds in schema.items() for old in olds}


In [None]:

def _totuple(str_or_list):
    """
    Convert a string or a collection of strings into a tuple.

        _totuple('npr20')                    -> ('npr20',)
        _totuple(['results', 'resultater'])  -> ('results', 'resultater')

    A single string becomes a one-element tuple; any other iterable
    (list, tuple, set, ...) is converted element by element.

    note: may seem unnecessary, but it is surprisingly helpful:
        it allows an argument to be given as a single str or as a list,
        and tuples are what str.startswith / str.endswith accept when
        several alternatives are allowed.
    """
    if isinstance(str_or_list, str):
        return (str_or_list,)
    return tuple(str_or_list)

def _check_if_path(path):
    """
    Normalise the path argument to a list of folders ending with '/'.

    Parameters
    ----------
    path : str, list of str or None
        A single folder, a list of folders, or nothing.

    Returns
    -------
    list of str
        - the current working directory if no path is specified
        - a one-element list if a single path (str) is specified
        - otherwise one entry per given folder
        Every entry that does not already end with '/' gets one appended.
    """
    if not path:
        path = [os.getcwd()]
    elif isinstance(path, str):
        path = [path]

    # normalise every folder, not only the first one
    return [folder if folder.endswith('/') else folder + '/'
            for folder in path]
#%%
def extract_filename(file, ignore_path=True, ignore_format=True):
    """ Extracts filename from a string

    Parameters
    ----------
    file : str
        A file name, optionally with folders in front, separated by '/'.
    ignore_path : bool, default True
        Drop everything up to and including the last '/'.
    ignore_format : bool, default True
        Drop everything from the first '.' in the file name itself.
        Dots in the folder part are left alone.

    Returns
    -------
    str

    >>> extract_filename('C:/dat/meps/annual/meps2014.csv')
    'meps2014'
    >>> extract_filename('C:/dat.v2/meps2014.csv', ignore_path=False)
    'C:/dat.v2/meps2014'
    """
    # split once into folder part and file name, so that stripping the
    # format can never cut into a folder name that contains a dot
    folder, separator, name = file.rpartition('/')
    if ignore_format:
        name = name.split('.')[0]
    if ignore_path:
        return name
    return folder + separator + name

#%%
def get_vars(files=None):
    """
    Look up the column names of one or more csv files.

    Only the first five data rows of each file are read; the first line
    of the file is taken as the header.

    Parameters
    ----------

    files    : str or list
        filename or list of filenames

    Returns
    -------
    dict

    Filenames are the keys, the list of column names in that file the values
    """
    return {
        filename: pd.read_csv(filename, nrows=5, header=0).columns.tolist()
        for filename in _tolist(files)
    }

def get_dtypes(files=None):
    """
    Look up the column dtypes pandas infers for one or more csv files.

    The dtypes are inferred from the first five data rows only; the
    first line of each file is taken as the header.

    Parameters
    ----------
    files    : str or list
        filename or list of filenames

    Returns
    -------
    dict

    Filenames are the keys, the list of dtypes (one per column, in
    column order) the values
    """
    filenames = _tolist(files)
    # lazily read a five-row preview of each file, in the given order
    previews = (pd.read_csv(name, nrows=5, header=0) for name in filenames)
    return {name: preview.dtypes.tolist()
            for name, preview in zip(filenames, previews)}

def get_nfirst(files=None, n=5):
    """
    Read the first n data rows of one or more csv files.

    Parameters
    ----------
    files    : str or list
        filename or list of filenames
    n        : int
        number of rows to read from the top of each file (default 5)

    Returns
    -------
    dict

    Filenames are the keys, a dataframe with the first rows the values
    """
    return {filename: pd.read_csv(filename, nrows=n, header=0)
            for filename in _tolist(files)}

# TODO: inefficient - every file is read completely just to keep its last rows
def get_nlast(files=None, n=5):
    """
    Read the last n data rows of one or more csv files.

    Parameters
    ----------
    files    : str or list
        filename or list of filenames
    n        : int
        number of rows to keep from the bottom of each file (default 5).
        If a file has fewer than n rows, all its rows are returned.

    Returns
    -------
    dict

    Filenames are the keys, a dataframe with the last rows the values
    """
    nlast = {}
    for file in _tolist(files):
        tmp = pd.read_csv(file, header=0)  # what if no header
        # honour n (it used to be hard-coded to 5)
        nlast[file] = tmp.tail(n)
    return nlast

#%%
def explore(files=None, rows=5):
    """
    Preview one or more csv files: first rows, column names and dtypes.

    Each file is read once (only the first `rows` data rows, with the
    first line taken as the header) and the three results are derived
    from that preview.

    Parameters
    ----------
    files    : str or list
        filename or list of filenames
    rows     : int
        number of rows to read from the top of each file (default 5)

    Returns
    -------
    tuple of three dicts, all with the filenames as keys:
        - dataframe with the first rows
        - list of column names
        - list of dtypes (inferred from the preview)
    """
    previews, colnames, coltypes = {}, {}, {}
    for filename in _tolist(files):
        head = pd.read_csv(filename, nrows=rows, header=0)
        previews[filename] = head
        colnames[filename] = head.columns.tolist()
        coltypes[filename] = head.dtypes.tolist()
    return previews, colnames, coltypes



#%%
def get_filelist(path=None, 
                 starts_with=None, 
                 ends_with=None, 
                 contains=None, 
                 subpaths=False, 
                 strip_folders=False, 
                 strip_file_formats=False,
                 regexp=None, 
                 only_filename=False):
    
    """ 
    Returns a list of files (with the folder in front of each file name)
    
    
    parameters
    ----------
        path (str or list) : the directory to search (str) 
                             or a list of directories to search.
                             Default: the current working directory.
        starts_with (str or list): only includes files that starts with a
            given string (or list of strings). Default: None.
            
        ends_with (str or list): only includes files that ends with a
            given string (or list of strings). Default: None.
            
        contains (str or list): only includes files that contains a
            given string (or list of strings). Default: None.
        
        subpaths (bool) : Includes all subdirectories if True. Default: False
        
        regexp (str): only includes files matching this regular 
            expression. Default: None.
            
        only_filename (bool): passed on unchanged to select_from_filelist.
        
        strip_folders, strip_file_formats (bool): accepted, but not used 
            by this function.
    
    returns
    -------
        list of str: the selected entries from all the given directories.
            Without subpaths the entries come from os.listdir, so names 
            of subdirectories that satisfy the criteria are included too.
    
        example
        -------
        files = get_filelist(path = 'C:/dat/meps/annual/', starts_with='meps', ends_with='csv')
    """
    
    all_files = []

    for folder in _check_if_path(path):
        if subpaths:
            # os.walk yields (dirpath, dirnames, filenames) for the folder
            # itself and for every directory below it
            listings = [(dirpath, filenames) 
                        for dirpath, _, filenames in os.walk(folder)]
        else:
            listings = [(folder, os.listdir(folder))]
        
        for directory, names in listings:
            # include only files that satisfy given criteria
            names = select_from_filelist(files=names, 
                                         starts_with=starts_with, 
                                         ends_with=ends_with, 
                                         contains=contains, 
                                         regexp=regexp, 
                                         only_filename=only_filename)
            
            # collect the files from every directory in one flat list
            all_files.extend(os.path.join(directory, name) for name in names)
        
    return all_files
    
#%%
def select_from_list(lst, 
                 starts_with=None, 
                 ends_with=None, 
                 contains=None, 
                 regexp=None):
    """
    Selects some elements from a list of strings
    
    When several conditions are specified, an element must satisfy all 
    of them (logical and). Within one condition a list of alternatives 
    may be given, and it is enough to satisfy one of the alternatives.
    When no condition is specified, all elements are returned.
    
    Parameters
    ----------
        lst: iterable of strings to select from
        starts_with: (str or list) keep elements starting with this
        ends_with: (str or list) keep elements ending with this
        contains: (str or list) keep elements containing this
        regexp: (str) keep elements where this regular expression 
            matches somewhere in the element (re.search)
    
    Returns
    -------
        A new list with the selected elements, in their original order
    
    Example
    In a list of many many meps files for several years, get only files
    from year 2000 and later:
        
    >>>files = select_from_list(files, starts_with="meps20")
    >>>k73x = select_from_list(icdtopid.keys(), starts_with='K73')
    
    """
    def _alternatives(value):
        # str.startswith / str.endswith accept a tuple of alternatives
        return (value,) if isinstance(value, str) else tuple(value)
   
    selected = list(lst)
    
    # every filter narrows down the result of the previous one
    if starts_with:
        prefixes = _alternatives(starts_with)
        selected = [element for element in selected 
                    if element.startswith(prefixes)]
    
    if ends_with:
        suffixes = _alternatives(ends_with)
        selected = [element for element in selected 
                    if element.endswith(suffixes)]
        
    if contains:
        fragments = _alternatives(contains)
        selected = [element for element in selected 
                    if any(fragment in element for fragment in fragments)]
        
    if regexp:
        pattern = re.compile(regexp)
        selected = [element for element in selected 
                    if pattern.search(element) is not None]  
           
    return selected

#%%
def select_from_filelist(files, 
                 starts_with=None, 
                 ends_with=None, 
                 contains=None, 
                 regexp=None, 
                 ignore_path=True, 
                 ignore_format=True,
                 only_filename=True):
    """
    Selects some files from a list of file names
    
    When several conditions are specified, a file must satisfy all of 
    them. Within one condition a list of alternatives may be given.
    
    Parameters
    ----------
        files: list of file names (with or without folders in front)
        starts_with: (str or list) keep files whose name, as returned by
            extract_filename, starts with this
        ends_with: (str or list) keep files where the whole string ends 
            with this
        contains: (str or list) keep files where the whole string 
            contains this
        regexp: (str) keep files where this regular expression matches 
            somewhere in the whole string (re.search)
        ignore_path, ignore_format: (bool) passed to extract_filename 
            when checking starts_with. Default True: folders and file 
            format are stripped before the comparison.
        only_filename: (bool) accepted, but not used
    
    Returns
    -------
        A new list with the selected files, in their original order
    
    Example
    In a list of many many meps files for several years, get only files
    from year 2000 and later:
        
    >>>files = select_from_filelist(files, starts_with="meps20")
    
    """
    selected = list(files)
    
    # each filter is applied once, on the result of the previous one
    if starts_with:
        beginnings = _totuple(starts_with)
        selected = [file for file in selected 
                    if extract_filename(file=file, 
                                        ignore_path=ignore_path, 
                                        ignore_format=ignore_format
                                        ).startswith(beginnings)]
    if ends_with:
        endings = _totuple(ends_with)
        selected = [file for file in selected if file.endswith(endings)]
    if contains:
        fragments = _totuple(contains)
        selected = [file for file in selected 
                    if any(fragment in file for fragment in fragments)]
    if regexp:
        regxpr = re.compile(regexp)
        selected = [file for file in selected 
                    if regxpr.search(file) is not None]             
    return selected
      

#%%
def ids_from_csv(files,
                 find,
                 col_info = {'single': ['icdmain'], 'multi': ['icdbi']},
                 id_col='pid',
                 schema={'pid': ['pid']},
                 query=None, 
                 dtype=None,
                 union=False,
                 **kwargs):
    """
    Get set of ids from rows that satisfy some conditions in a list of csv files
            
    Parameters
    ----------
        files: (str or list) File or list of files to include
        
        find: (dictionary) 
            Key is column name to search, 
            Value is list of strings to search for
            
            Example: find = {'icd_main': ['K50', 'K51']}
        
        col_info: (dictionary) accepted, but not used by this function
        
        id_col: (string) Column name containing the ids
        
        schema: (dictionary) 
            A mapping of desired column names (keys) to the equivalent and 
            possible diverse column names in the various files. 
            Example: The columns with id and year information may have 
            different names in different files and we want to label 
            all the id columns "pid" and similarly for 'year':
                
                schema={'pid'  : ['id', 'ID', 'person'], 
                        'year' : ['jahre', 'year', 'yyyy']}
            
            Only columns mentioned in the schema are read from the files,
            plus id_col and the columns in find when these are not keys
            in the schema. Columns used in query must therefore be 
            mentioned in the schema.
        
        query: (string) 
            Text specifying a query. 
            Example: query = "age > 18"
        
        dtype: passed on to pandas.read_csv
            
        union: (bool, default: False)
            If True, returns a single set of the union of all ids
        
        **kwargs: passed on to ids_from_df
        
                                      
    Example
    -------
    Get ids for individuals who have K50 or K51 in the column icd_main:
    
    >>>ibd_codes = {'icd_main': ['K50', 'K51']}       
    >>>ids_from_csv(files, id_col='pid', find=ibd_codes)
    
    Returns
    -------
        dictionary with files as keys and a set of ids as values
        (or, if union is True, one set with the ids from all files)
    """
    
    old_to_new = _invert(schema)
    
    # columns to read: every name mentioned in the schema, and the id and
    # search columns under their own name when the schema does not map them
    wanted = set(old_to_new)
    wanted.update({id_col}.union(find or ()).difference(schema))
    
    ids = {}
    
    for file in _tolist(files):
        print(file)
        header = set(pd.read_csv(file, nrows=0).columns)
        usecols = list(header.intersection(wanted))
        df = pd.read_csv(file, usecols=usecols, dtype=dtype)
        df = df.rename(columns=old_to_new)                     
        ids[file] = ids_from_df(df=df, id_col=id_col, find=find, 
                                query=query, **kwargs)
    
    if union:
        # set().union(...) also works when there are no files at all
        return set().union(*ids.values())
        
    return ids
#%%

def ids_from_df(df, id_col='pid', find=None, query=None, **kwargs):
    """
        Gets the ids that satisfy some conditions
                
        Parameters
        ----------
            df: (dataframe) Dataframe
            id_col: (string) name of column with the ids
            find: (dictionary) Key is column name to search, value is a 
                string or a list of strings to search for. The strings 
                are used as regular expressions. A row is kept if any of
                the strings is found in any of the listed columns.
            query: (string) String specifying a selection query, applied
                before the search. Example: "year > 2003"
            **kwargs: accepted, but not used
            
        Example
        -------
        Get all ids for people who have K50 or K51 in the column icd_main:
        
        >>>find = {'icd_main': ['K50', 'K51']}       
        >>>ids_from_df(df, id_col='pid', find=find)
        
        Returns
        -------
            Set
    """
    
    if query:
        df = df.query(query)
        
    if find:
        # boolean array, starting point: all false, no rows are included
        combined = np.zeros(len(df), dtype=bool)
        
        for var, searchlist in find.items():
            # a single string must not be joined character by character
            if isinstance(searchlist, str):
                searchlist = [searchlist]
            searchstr = "|".join(searchlist)
            true_if_found = df[var].str.contains(searchstr, na=False)
            combined |= true_if_found.to_numpy(dtype=bool)
        df = df[combined]
        
    return set(df[id_col])

#%%
def read_csv_using_ids(files, 
                 ids, 
                 schema=None, 
                 id_col='pid', 
                 columns=None, 
                 select=None, 
                 dtype=None, 
                 **kwargs):
    """
    Aggregate selected rows and columns from a list of csv files to a single dataframe
    
    files: str or list
        csv file or list of csv files to read (read with encoding latin-1)
        
    ids: set, list or dict
        if set or list: use same set of ids to read all files
        if dict: key is file, values is set of ids to include for the file
           
    schema : dict (with lists of columns as values)
        keys are the desired column names that correspond to the list of columns names in values
        example: 
            A schema to specify that columns named 'person', 'number' and 'id' in the different csv files contain the same information and should be labelled 'pid':
            {'pid' : ['person', 'number', 'id']}
                    
    id_col: str (default is 'pid')
        the column name that contains information about id (column name from the schema)
        
    columns: str or list
        the columns in the files to be read (column names from the schema).
        A name that is not a key in the schema is read under its own name.
        The id column is always read. Default: read all columns.
  
    select: string
        additional query to limit the result
        example: select='female==1'
         
    dtype: dictionary of dtypes (column names from the schema)
        specify the dtypes of the columns. Translated to the column names 
        of each file and passed to pandas.read_csv. A value that is not 
        a dictionary is passed to pandas.read_csv unchanged.
    
    **kwargs: accepted, but not used
    
    Returns
    -------
    dataframe with the selected rows from all files (an empty dataframe 
    if the list of files is empty)
    """
    
    schema = schema or {}
    old_to_new = {old: new for new, olds in schema.items() for old in olds}
    
    def _sources(name):
        # the names a schema column may have in the files; a column that
        # is not in the schema is assumed to have the same name everywhere
        return schema.get(name, [name])
    
    # if columns are specified, use these (and the id column, which is 
    # needed to select the rows); if not, use all columns 
    wanted = None
    if columns:
        wanted = set()
        for name in _tolist(columns) + [id_col]:
            wanted.update(_sources(name))
    
    # dtypes are specified with schema names, but must be given to 
    # read_csv with the names used in the file
    if isinstance(dtype, dict):
        dtype_by_source = {source: kind 
                           for name, kind in dtype.items() 
                           for source in _sources(name)}
    else:
        dtype_by_source = None
    
    dfs = []
        
    for file in _tolist(files):
        print(file)
        header = pd.read_csv(file, nrows=0, encoding='latin-1').columns
        use_columns = [col for col in header 
                       if wanted is None or col in wanted]
        
        if dtype_by_source is None:
            file_dtype = dtype
        else:
            file_dtype = {col: kind for col, kind in dtype_by_source.items() 
                          if col in use_columns}
        
        df = pd.read_csv(file, usecols=use_columns, dtype=file_dtype, 
                         encoding='latin-1')
        
        if old_to_new:
            df = df.rename(columns=old_to_new)
        
        # allow user to input a single id collection that applies to all files
        if isinstance(ids, (set, frozenset, list, tuple)):
            file_ids = ids
        else:
            file_ids = ids[file]
        df = df[df[id_col].isin(file_ids)]
     
        if select:
            df = df.query(select)
        dfs.append(df)
        
    if not dfs:
        # pd.concat raises on an empty list
        return pd.DataFrame()
    return pd.concat(dfs)

#%%

def read_hdf_using_ids(file_keys,
                 ids, 
                 schema=None, 
                 id_col='pid', 
                 columns=None, 
                 select=None, 
                 dtype=None, 
                 **kwargs):
    """
    Aggregate selected rows and columns from tables in a hdf datastore to a single dataframe
    
    file_keys: dict
        a dictionary of paths with a list of keys for the tables to be read
        example:  
            file_keys = {'Q:/mepsdata/annual/hdf/meps.h5' :['meps1992', 'meps1993']}
        
    ids: dict, list or set
        if dict: the ids of the rows to be included from each table;
            the key in the dictionary is a tuple with path and key to the table
            example: ids = {('Q:/mepsdata/annual/hdf/meps.h5', 'meps1992'): [28, 35]}
        if list or set: use the same ids for all tables
           
    schema : dict (with lists of columns as values)
        keys are the desired column names that correspond to the list of columns names in values
        example: 
            A schema to specify that columns named 'person', 'number' and 'id' in the different tables contain the same information and should be labelled 'pid':
            {'pid' : ['person', 'number', 'id']}
                    
    id_col: str (default is 'pid')
        the column name that contains information about id (column name from the schema).
        If a table has no column with this name, the first of the names 
        listed for it in the schema that exists in the table is used.
        
    columns: str or list
        the columns in the tables to be read (column names from the schema).
        A name that is not a key in the schema is read under its own name.
        Default: read all columns.
  
    select: string
        additional query to limit the result
        example: select='female==1'
         
    dtype: dictionary of dtypes (column names from the schema)
        specify the dtypes of the columns; applied after the columns 
        have been renamed, to the columns that exist in the table
        
    **kwargs: accepted, but not used
    
    Returns
    -------
    dataframe with the selected rows from all tables (an empty dataframe 
    if there are no tables to read)
    """
    
    schema = schema or {}
    old_to_new = {old: new for new, olds in schema.items() for old in olds}
    
    def _sources(name):
        # the names a schema column may have in the tables; a column that
        # is not in the schema is assumed to have the same name everywhere
        return schema.get(name, [name])
    
    # allow user to input one collection of ids (instead of a dict with 
    # separate ids for every (file, key) combination); 
    # the same ids are then selected from every table
    if isinstance(ids, (list, set, tuple, frozenset)):
        ids = {(file, key): ids 
               for file, keys in file_keys.items() 
               for key in keys}
        
    wanted = None
    if columns:
        names = [columns] if isinstance(columns, str) else list(columns)
        wanted = {source for name in names for source in _sources(name)}
    
    dfs = []
    
    for file, keys in file_keys.items():
        # one read-only store per file, closed again when we are done with it
        with pd.HDFStore(file, mode='r') as store:
            for key in keys:
                header = list(store.select(key, start=0, stop=1).columns)
                
                if wanted is None:
                    use_columns = None  # None means all columns
                else:
                    use_columns = [col for col in header if col in wanted]
                
                # the id column may have another name in this table
                candidates = [id_col] + list(_sources(id_col))
                local_id_col = next((col for col in candidates 
                                     if col in header), id_col)
                            
                all_ids = store.select_column(key, local_id_col)
                # boolean mask: True for the rows to be read
                idarray = all_ids.isin(ids[(file, key)])
                
                df = store.select(key, where=idarray, columns=use_columns)
                
                if old_to_new:
                    df = df.rename(columns=old_to_new)
                
                if isinstance(dtype, dict):
                    casts = {col: kind for col, kind in dtype.items() 
                             if col in df.columns}
                    if casts:
                        df = df.astype(casts)
                         
                if select:
                    df = df.query(select)
                dfs.append(df)
            
    if not dfs:
        # pd.concat raises on an empty list
        return pd.DataFrame()
    return pd.concat(dfs)