In [None]:
# default_exp utils

In [None]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


# Notations

# general utility functions


In [None]:
#export

import re
import numpy as np
import pandas as pd
from functools import singledispatch

## make codes

In [None]:
#export
def make_codes(n=100, letters=26, numbers=100, seed=False):
  """
  Generate a dataframe with a column of random codes

  Each code is an upper-case letter followed by an integer (for example 'A12').
  Every row holds one or more codes, joined by commas (for example 'A12,C3').

  Args:
    n (int): The number of rows (events) to generate
    letters (int): The number of different letters to use (the first
      ``letters`` letters of the alphabet, at most 26)
    numbers (int): The number of different numbers to use (1, ..., numbers)
    seed (bool): If truthy, the global numpy random generator is seeded with 0
      so that repeated calls give the same result

  Returns
    A dataframe with one column, 'code', with one or more codes in the rows

  Examples
    >>>df = make_codes(n=10, letters=5, numbers=5, seed=True)
  """
  # each code is assumed to consist of a letter and a number
  alphabet = list('abcdefghijklmnopqrstuvwxyz')
  # use exactly the requested number of different letters
  letters = alphabet[:letters]

  # make random numbers same if seed is specified
  if seed:
    np.random.seed(0)

  # determine the number of codes to be drawn for each event
  # (add one to avoid zero: all events have to have at least one code)
  n_codes = np.random.negative_binomial(1, p=0.3, size=n) + 1

  # for each event, randomly generate the number of codes specified by n_codes
  codes = []
  for count in n_codes:
    # randint excludes the upper bound, so numbers + 1 makes `numbers` reachable
    diag = [np.random.choice(letters).upper() + str(np.random.randint(1, numbers + 1))
            for _ in range(count)]
    codes.append(','.join(diag))

  # create a dataframe based on the list (also works when n=0)
  df = pd.DataFrame({'code': codes})

  return df

## make data

In [None]:
#export
def make_data(n=100, letters=26, numbers=100, seed=False, expand=False, 
              columns=['pid', 'gender', 'birth_date', 'date', 'region', 'codes']):
  """
  Generate a dataframe with random event data for a number of persons

  Every person gets a gender, a region and a birth date. Every person also gets
  one or more events, each with a date and a cell with one or more codes
  (generated by make_codes).

  Args:
    n (int): The number of persons
    letters (int): The number of different letters to use in the codes
    numbers (int): The number of different numbers to use in the codes
    seed (bool or int): If truthy, used to seed the global numpy random generator
    expand (bool): If True, the 'codes' column is split into one column per
      code (code_0, code_1, ...) and the 'codes' column is dropped
    columns (list): The columns to keep, in the order they should appear

  Returns
    A dataframe with one row per event, sorted by pid and date

  Examples
    >>>df = make_data(n=100, letters=5, numbers=5, seed=True)
  """

  if seed:
    np.random.seed(seed=seed)

  # person identifiers start at 1; the same identifiers index both the person
  # table and the event table so that the merge below joins the right rows
  pid = range(1, n + 1)
  df_person = pd.DataFrame(index=pid)

  gender = np.random.choice(['male', 'female'], size=n)
  region = np.random.choice(['north', 'south', 'east', 'west'], size=n)
  birth_year = np.random.randint(1920, 2019, size=n)
  # randint excludes the upper bound: 13 gives months 1-12, 29 gives days 1-28
  birth_month = np.random.randint(1, 13, size=n)
  birth_day = np.random.randint(1, 29, size=n)  # day 29-31 is skipped to avoid invalid dates

  # number of events per person; every person has at least one event
  events_per_year = np.random.poisson(1, size=n)
  years = 2020 - birth_year
  events = years * events_per_year
  events = np.where(events == 0, 1, events)
  events = events.astype(int)

  # the generator is already seeded above, so make_codes must not reseed it:
  # reseeding for each person would restart the random stream every time
  all_codes = []
  for n_events in events:
    person_codes = make_codes(n=n_events, letters=letters, numbers=numbers, seed=False)
    all_codes.extend(person_codes['code'].tolist())

  # each event happens a random number of days after birth
  days_alive = (2020 - birth_year) * 365
  all_days = []
  for max_day, n_events in zip(days_alive.tolist(), events.tolist()):
    all_days.extend(np.random.randint(0, max_day, size=n_events))

  # repeat each person identifier once for every event the person has
  all_pids = [person for person, n_events in zip(pid, events.tolist())
              for _ in range(n_events)]

  df_events = pd.DataFrame(index=all_pids)
  df_events['codes'] = all_codes
  df_events['days_after'] = all_days

  df_person['gender'] = gender
  df_person['region'] = region
  df_person['year'] = birth_year
  df_person['month'] = birth_month
  df_person['day'] = birth_day

  df = df_events.merge(df_person, left_index=True, right_index=True)
  df['birth_date'] = pd.to_datetime(df[['year', 'month', 'day']])
  df['date'] = df['birth_date'] + pd.to_timedelta(df.days_after, unit='d')
  del df['month']
  del df['day']
  del df['days_after']
  df['pid'] = df.index
  df.index.name = 'pid_index'
  df = df.sort_values(['pid', 'date'])
  df = df[list(columns)]

  if expand:
    splitted = df.codes.str.split(',', expand=True).add_prefix('code_').fillna(np.nan)
    df = pd.concat([df, splitted], axis=1)
    del df['codes']
  # include deaths too?

  return df

## get rows

In [None]:
#export
# mark rows that contain certain codes in one or more columns
def get_rows(df, codes, cols=None, sep=None, pid='pid', all_codes=None, fix=True, info=None):
  """
  Make a boolean series that is true for all rows that contain the codes

  Args
    df (dataframe or series): The dataframe with codes
    codes (str, list, set, dict): codes to look for
    cols (str or list): list of columns to search in (default: all columns)
    sep (str): The symbol that separates the codes if there are multiple codes in a cell
    pid (str): The name of the column with the personal identifier (not used here)
    all_codes (list): Not used when fix=True (it is recomputed from the data)
    fix (bool): If True, notation in codes and cols is expanded first
    info (Info): Optional object passed to memory() to look up earlier results

  Returns
    A boolean series with the same index as df

  >>>get_rows(df=df, codes='F3', cols='codes', sep=',')

  """

  # check if evaluated previously
  # (a series cannot be used in a plain "if", so compare against None)
  info, rows = memory(info=info, func='get_rows', expr=codes)
  if rows is not None:
    return rows

  # search all columns if none are specified
  if cols is None:
    cols = list(df.columns)

  # check if codes and columns need to be expanded (needed if they use notation)
  if fix:
    cols = expand_columns(cols, all_columns=list(df.columns), info=info)
    all_codes = sorted(unique(df=df, cols=cols, sep=sep))
    codes = expand_code(codes, all_codes=all_codes)

  # codes and cols should be lists
  codes = listify(codes)
  cols = listify(cols)

  # approach depends on whether we have multi-value cells or not
  # if sep exist, then have multi-value cells
  if sep:
    # starting point: no codes have been found
    # needed since otherwise the function might return None if no codes exist
    rows = pd.Series(False, index=df.index)

    # an empty regex would match every row, so stop here if there are no codes
    if not codes:
      return rows

    # note: this assumes the sep is a regex word delimiter
    # the codes are escaped so that a dot in a code only matches a dot
    codes_regex = '|'.join(rf'\b{re.escape(code)}\b' for code in codes)

    # loop over all columns and mark when a code exist
    for col in cols:
      rows = rows | df[col].str.contains(codes_regex, na=False)

  # if not multi valued cells
  else:
    mask = df[cols].isin(codes)
    rows = mask.any(axis=1)
  return rows

## extract codes

In [None]:
#export
def extract_codes(df, codes, cols=None, sep=None, new_sep=',', na_rep='',
                  prefix=None, merge=False, out='bool', fix=True, 
                  series=True, group=False, all_codes=None, info=None):
    """
    Produce one or more columns with only selected codes

    Args:
        df (dataframe or series): Dataframe with events

        codes (string, list or dict): The codes for the disease
            (must be a dict if fix=False)

        cols (string, list): Name of columns where codes are located
            (default: all columns)

        sep (string, default: None): Separator between codes in same cell (if exist)

        new_sep (string): Separator used when the columns are merged

        na_rep (string): Text used for rows without the code when out='text'

        prefix: Not used in this function

        merge (bool): Content of all columns is merged to one series

        out (string, ['text', 'category', 'bool' or 'int']): Datatype of output column(s)

        fix (bool): If True, notation in codes and cols is expanded first

        series (bool): Return a series if there is only one output column

        group: Not used in this function

        all_codes (list): Recomputed from the data when fix=True

        info (Info): Passed on to expand_code

    Notes:
        Can produce a set of dummy columns for codes and code groups.
        Can also produce a merged column with only extracted codes.
        Accept star notation.
        Also accepts both single value columns and columns with compound codes and separators
        Repeat events in same rows are only extracted once


    Example:
    to create three dummy columns, based on codes in icdmain column:

    >>> extract_codes(df=df,
    >>>          codes={'fracture' : 'S72*', 'cd': 'K50*', 'uc': 'K51*'},
    >>>          cols=['icdmain', 'icdbi'],
    >>>          merge=False,
    >>>          out='text')

    extract_codes(df=df, codes={'b':['A1','F3'], 'c':'c*'}, cols='codes', sep=',', merge = False)
    extract_codes(df=df, codes={'b':['A1','F3'], 'c':'C*'}, cols='codes', sep=',', merge = False)
    extract_codes(df=df, codes=['A1','F3', 'C*'], cols='codes', sep=',', merge = False)
    extract_codes(df=df, codes='C*', cols='codes', sep=',', merge = False)
    """

    if isinstance(df, pd.Series):
        df = df.to_frame()
        cols = list(df.columns)

    # use all columns if none are specified (a flat list of names is needed)
    if cols is None or len(cols) == 0:
        cols = list(df.columns)

    if fix:
        cols = expand_columns(cols, all_columns=list(df.columns))
        # sorted since colon notation slices the list by position
        all_codes = sorted(unique(df=df, cols=cols, sep=sep))

        if isinstance(codes, str):
            codes = listify(codes)
        if isinstance(codes, dict):
            # expand notation in the values; the keys become column names
            codes = expand_code(codes, all_codes=all_codes, info=info)
        elif isinstance(codes, list) and (not merge):
            # one output column for each expanded code
            codes = expand_code(codes, all_codes=all_codes, info=info)
            codes = {code: code for code in codes}
        elif isinstance(codes, list) and merge:
            # one output column for all the codes together
            codes = {str(tuple(codes)): codes}
            codes = expand_code(codes, all_codes=all_codes, info=info)

    subset = pd.DataFrame(index=df.index)

    for k, v in codes.items():
        if v:
            rows = get_rows(df=df, codes=v, cols=cols, sep=sep, all_codes=all_codes, fix=False)
        else:
            # no codes: no rows are marked (a series, so astype and loc work below)
            rows = pd.Series(False, index=df.index)

        if out == 'bool':
            subset[k] = rows
        elif out == 'int':
            subset[k] = rows.astype(int)
        elif out == 'category':
            # create the column first so it also exists when no rows match
            subset[k] = None
            subset.loc[rows, k] = k
            subset[k] = subset[k].astype('category')
        else:
            subset[k] = na_rep
            subset.loc[rows, k] = k

    if (merge) and (out == 'bool'):
        subset = subset.astype(int).astype(str)

    new_codes = list(subset.columns)

    if (merge) and (len(codes) > 1):
        headline = ', '.join(new_codes)
        merged = subset.iloc[:, 0].str.cat(subset.iloc[:, 1:].values, sep=new_sep,
                                           na_rep=na_rep)
        # remove separators left at the ends by empty cells
        merged = merged.str.strip(new_sep)
        subset = merged
        subset.name = headline
        if out == 'category':
            subset = subset.astype('category')

    # return a series if only one code is asked for
    if series and (len(codes) == 1):
        subset = subset.squeeze()

    return subset


# General helper functions

## Info

In [None]:
#export
class Info():
  """
  Container for information about the data and for results from analysis
  """
  def __init__(self):
    # evaluated: dict that memory() indexes first by function name and
    # then by the expression that was evaluated
    self.evaluated = dict()

## memory

In [None]:
#export
def memory(info, func, expr):
  """
  checks if the function has been called with the same argument previously and
  if so, returns the stored result instead of running the function again

  args:
    info (Info): object with an ``evaluated`` dict (a new Info is made if None)
    func (str): name of the function that is calling
    expr: the argument (expression) the function was called with

  returns:
    a tuple (info, rows), where rows is the stored result or None if there
    is no stored result for this function and expression

  note:
    this function only reads from info.evaluated; it makes an empty dict for
    the function if needed, but does not store any results itself
  """
  rows = None
  if info:
    # make an empty dict for the function the first time it is seen
    stored = info.evaluated.setdefault(func, {})
    try:
      rows = stored.get(expr)
    except TypeError:
      # expr cannot be a key in a dict (for example a list of codes),
      # so nothing can have been stored for it
      rows = None
  else:
    info = Info()
    info.evaluated[func] = {}
  return info, rows

## listify

In [None]:
#export
def listify(string_or_list):
    """
    Wrap a string in a list; anything else is passed through untouched

    Args:
        string_or_list (str or any): The value to wrap if it is a string

    Returns:
        A one-element list if the input is a string, otherwise the input itself

    Note:
        - lets the user write a plain string where a list is expected
        - cols='icd10' is accepted as shorthand for cols=['icd10']

    """
    return [string_or_list] if isinstance(string_or_list, str) else string_or_list

In [None]:
#export
def reverse_dict(dikt):
    """
    each value in the list of values in the dict become keys in a new dict

    Args:
        dikt (dict): maps a name to a code (str) or a list of codes

    Returns:
        A dict that maps each code to its name. If a code is listed under
        several names, the last name wins.
    """
    new_dict = {}
    for name, codelist in dikt.items():
        # a single code may be given as a string instead of a list
        codelist = listify(codelist)
        new_dict.update({code: name for code in codelist})
    return new_dict

# Notation

## del dot and zero

In [None]:
#export
def del_dot(code):
  """
  Remove all dots from a code (str) or from every code in a list of codes

  Returns a string if the input is a string, otherwise a list of strings
  """
  if not isinstance(code, str):
    return [item.replace('.','') for item in code]
  return code.replace('.','')

def del_zero(code, left=True, right=False):
  """
  Remove leading and/or trailing zeros from a code or from a list of codes

  Args:
    code (str or list of str): the code(s) to strip
    left (bool): remove zeros at the start of each code
    right (bool): remove zeros at the end of each code

  Returns:
    A string if the input is a string, otherwise a list of strings
  """
  single = isinstance(code, str)
  # work on a list in both cases, and unwrap again at the end
  codes = [code] if single else list(code)
  if left:
    codes = [c.lstrip('0') for c in codes]
  if right:
    codes = [c.rstrip('0') for c in codes]
  if single:
    codes = codes[0]
  return codes

## expand hyphen

In [None]:
#export
# function to expand a string like 'K51.2-K53.8' to a list of codes

# Need regex to extract the number component of the input string

# The singledispatch decorator enables us to have the same name, but use
# different functions depending on the datatype of the first argument.
#
# In our case we want one function to deal with a single string input, and
# another to handle a list of strings. It could all be handled in a single
# function using nested if, but singledispatch makes it less messy and more fun!


# Here is the main function, it is just the name and an error message if the
# argument does not fit any of the inputs that will be allowed

@singledispatch
def expand_hyphen(expr):
  """
  Expands codes expression(s) that have hyphens to list of all codes

  Args:
      expr (str or list of str): String or list of strings to be expanded

  Returns:
      List of strings

  Raises:
      ValueError: if the argument is not a string or a list, if a code has no
          number, if the start code has a higher number than the end code, or
          if the number of decimals in the two codes cannot be combined

  Examples:
      expand_hyphen('C00-C26')
      expand_hyphen('b01.1*-b09.9*')
      expand_hyphen('n02.2-n02.7')
      expand_hyphen('c00*-c260')
      expand_hyphen('b01-b09')
      expand_hyphen('b001.1*-b009.9*')
      expand_hyphen(['b001.1*-b009.9*', 'c11-c15'])
  Note:
      Unequal number of decimals in start and end code is problematic.
      Example: C26.0-C27.11 will not work since the meaning is not obvious:
      Is the step size 0.01? In which case C27.1 will not be included, while
      C27.10 will be (and trailing zeros can be important in codes)

      Everything except the number is taken from the start code.
  """
  raise ValueError('The argument must be a string or a list')

# register the function to be used if the input is a string
@expand_hyphen.register(str)
def _(expr):
    # return immediately if nothing to expand
    if '-' not in expr:
      return [expr]

    parts = expr.split('-')
    if len(parts) != 2:
      raise ValueError('The expression must have exactly one hyphen')
    lower, upper = (part.strip() for part in parts)

    # identify the numeric component of the code
    lower_match = re.search(r"\d*\.\d+|\d+", lower)
    upper_match = re.search(r"\d*\.\d+|\d+", upper)
    if (lower_match is None) or (upper_match is None):
      raise ValueError('Both the start code and the end code must contain a number')
    lower_str = lower_match.group()
    upper_str = upper_match.group()
    # note: what about european decimal notation?
    # also note: what if multiple groups K50.1J8.4-etc

    # remember length in case of leading zeros
    length = len(lower_str)

    wrong_order = 'The start code cannot have a higher number than the end code'
    wrong_decimals = 'The start code and the end code do not have the same number of decimals'

    lower_has_dot = '.' in lower_str
    upper_has_dot = '.' in upper_str

    # a decimal code cannot be combined with a code without decimals
    if lower_has_dot != upper_has_dot:
      raise ValueError(wrong_decimals)

    # no decimals: count from the start number to (and including) the end number
    if not lower_has_dot:
      lower_num = int(lower_str)
      upper_num = int(upper_str)
      if upper_num < lower_num:
        raise ValueError(wrong_order)
      return [lower.replace(lower_str, str(num).zfill(length))
              for num in range(lower_num, upper_num + 1)]

    lower_whole, lower_fraction = lower_str.split('.')
    upper_whole, upper_fraction = upper_str.split('.')
    lower_decimals = len(lower_fraction)
    upper_decimals = len(upper_fraction)

    # must use integers in a loop, not floats
    # so the dot is removed before counting and put back in afterwards,
    # taking care of leading and trailing zeros that may disappear
    if lower_decimals == upper_decimals:
      lower_num = int(lower_str.replace('.', ''))
      upper_num = int(upper_str.replace('.', ''))
      if upper_num < lower_num:
        raise ValueError(wrong_order)
      multiplier = 10**lower_decimals
      codes = []
      for num in range(lower_num, upper_num + 1):
        whole, fraction = divmod(num, multiplier)
        num_str = f'{whole}.{fraction:0{lower_decimals}d}'.zfill(length)
        codes.append(lower.replace(lower_str, num_str))
      return codes

    # special case: allow k1.1-k1.123, but not k.1-k2.123 the last is ambiguous: should it list k2.0 only 2.00?
    if (lower_decimals < upper_decimals) and (upper_whole == lower_whole):
      from_decimal = int(lower_fraction)
      to_decimal = int(upper_fraction)
      if to_decimal < from_decimal:
        raise ValueError(wrong_order)
      decimal_str = '.' + lower_fraction
      return [lower.replace(decimal_str, '.' + str(num))
              for num in range(from_decimal, to_decimal + 1)]

    raise ValueError(wrong_decimals)


# register the function to be used if the input is a list of strings
@expand_hyphen.register(list)
def _(expr):
  extended = []
  for word in expr:
    extended.extend(expand_hyphen(word))
  return extended

## expand star

In [None]:
#export
# A function to expand a string with star notation (K50*)
# to list of all codes starting with K50

@singledispatch
def expand_star(code, all_codes=None):
  """
  Expand expressions with star notation to a list of all values with the specified pattern

  A star matches any number of characters (also none). All other characters
  must match exactly.

  Args:
    code (str or list): Expression (or list of expressions) to be expanded
    all_codes (list) : A list of all codes (strings)

  Returns:
    A sorted list of the codes in all_codes that fit the pattern(s). An
    expression without a star is returned as it is.

  Examples:
    expand_star('K50*', all_codes=icd9)
    expand_star('K*5', all_codes=icd9)
    expand_star('*5', all_codes=icd9)

  """
  raise ValueError('The argument must be a string or a list')

@expand_star.register(str)
def _(code, all_codes=None):
  # return immediately if there is nothing to expand
  if '*' not in code:
    return [code]

  if all_codes is None:
    raise ValueError('all_codes must be specified to expand star notation')

  # build a regex where each star is "anything" and the rest is literal text
  # this handles a star at the start, at the end, in the middle, or several
  pattern = '.*'.join(re.escape(part) for part in code.split('*'))
  star_regex = re.compile(pattern, flags=re.DOTALL)

  codes = {candidate for candidate in all_codes if star_regex.fullmatch(candidate)}

  return sorted(codes)

@expand_star.register(list)
def _(code, all_codes=None):

  expanded=[]
  for star_code in code:
    new_codes = expand_star(star_code, all_codes=all_codes)
    expanded.extend(new_codes)

  # uniqify in case some overlap
  expanded = list(set(expanded))

  return sorted(expanded)

## expand colon

In [None]:
#export
# function to get all codes in a list between the specified start and end code
# Example: Get all codes between K40:L52

@singledispatch
def expand_colon(code, all_codes=None):
  """
  Expand expressions with colon notation to a list of complete code names

  Args:
    code (str or list): Expression (or list of expressions) to be expanded
    all_codes (list or array) : The list to slice from

  Returns:
    A list with all codes in all_codes from the start code to the end code
    (both included), in the order they have in all_codes. An expression
    without a colon is returned as it is.

  Raises:
    ValueError: if the argument is not a string or a list, or if the start
      code or the end code is not in all_codes

  Examples
    K50:K52
    K50.5:K52.19
    A3.0:A9.3

  Note: This is different from hyphen and star notation because it can handle
  different code lengths and different number of decimals
  """
  raise ValueError('The argument must be a string or a list')

@expand_colon.register(str)
def _(code, all_codes=None):
  if ':' not in code:
    return [code]

  if all_codes is None:
    raise ValueError('all_codes must be specified to expand colon notation')

  startstr, endstr = code.split(':')

  # remove spaces
  startstr = startstr.strip()
  endstr = endstr.strip()

  # make sure we have a list (an array has no index method)
  all_codes = list(all_codes)

  # find start and end position
  startpos = all_codes.index(startstr)
  endpos = all_codes.index(endstr)

  # slice list (add one since the end of a slice is not included)
  expanded = all_codes[startpos:endpos + 1]

  return expanded


@expand_colon.register(list)
def _(code, all_codes=None, regex=False):
  expanded=[]

  for cod in code:
    new_codes = expand_colon(cod, all_codes=all_codes)
    expanded.extend(new_codes)

  return expanded

## expand regex

In [None]:
#export
# Return all elements in a list that fits a regex pattern

@singledispatch
def expand_regex(code, all_codes):
  """
  Return all elements in a list that fit a regex pattern

  Args:
    code (str or list of str): regex pattern (or list of patterns)
    all_codes (list): the codes (strings) to select from

  Returns:
    A sorted list, without duplicates, of the codes where the pattern
    matches from the start of the code (re.match)
  """
  raise ValueError('The argument must be a string or a list of strings')

@expand_regex.register(str)
def _(code, all_codes=None):
  code_regex = re.compile(code)
  # a set removes duplicates; sorted gives the same order every time
  # (and the same order as when a list of patterns is used)
  expanded = {candidate for candidate in all_codes if code_regex.match(candidate)}
  return sorted(expanded)

@expand_regex.register(list)
def _(code, all_codes):
  expanded=[]

  for cod in code:
    new_codes = expand_regex(cod, all_codes=all_codes)
    expanded.extend(new_codes)

  # uniqify in case some overlap
  expanded = sorted(set(expanded))

  return expanded

## expand code

In [None]:
#export
@singledispatch
def expand_code(code, all_codes=None,
                hyphen=True, star=True, colon=True, regex=False,
                drop_dot=False, drop_leading_zero=False,
                sort_unique=True, info=None):
  """
  Expand code expressions with special notation to lists of codes

  Args:
    code (str, list or dict): expression(s) to expand. For a dict, the values
      (a string or a list of strings) are expanded and the keys are kept
    all_codes (list): all codes; passed on to the star, colon and regex expansion
    hyphen (bool): expand hyphen notation (expand_hyphen)
    star (bool): expand star notation (expand_star)
    colon (bool): expand colon notation (expand_colon)
    regex (bool): treat the expression as a regex (expand_regex); no other
      expansion is done in that case
    drop_dot (bool): remove dots from the expression before expanding
    drop_leading_zero (bool): accepted, but not used by the code here
    sort_unique (bool): sort and remove duplicates from the result
    info: accepted and passed on, but not used by the code here

  Returns:
    A list of codes (str or list input) or a dict of lists of codes (dict input)
  """
  raise ValueError('The argument must be a string or a list of strings')

@expand_code.register(str)
def _(code, all_codes=None,
      hyphen=True, star=True, colon=True, regex=False,
      drop_dot=False, drop_leading_zero=False,
      sort_unique=True, info=None):
  #validating input
  if (not regex) and (':' in code) and (('-' in code) or ('*' in code)):
    raise ValueError('Notation using colon must start from and end in specific codes, not codes using star or hyphen')

  if regex:
    codes = expand_regex(code, all_codes=all_codes)
    return codes

  if drop_dot:
    code = del_dot(code)

  codes=[code]

  # each step works on the result from the previous step
  if hyphen:
    codes=expand_hyphen(code)
  if star:
    codes=expand_star(codes, all_codes=all_codes)
  if colon:
    codes=expand_colon(codes, all_codes=all_codes)

  if sort_unique:
    codes = sorted(set(codes))

  return codes

@expand_code.register(list)
def _(code, all_codes=None, hyphen=True, star=True, colon=True, regex=False,
      drop_dot=False, drop_leading_zero=False,
      sort_unique=True, info=None):

  expanded=[]

  for cod in code:
    new_codes = expand_code(cod, all_codes=all_codes, hyphen=hyphen, star=star,
                            colon=colon, regex=regex, drop_dot=drop_dot,
                            drop_leading_zero=drop_leading_zero,
                            sort_unique=sort_unique, info=info)
    expanded.extend(new_codes)

  # uniqify in case some overlap
  expanded = list(set(expanded))

  return sorted(expanded)

# a dict of names and codes (in a string or a list)
@expand_code.register(dict)
def _(code, all_codes=None, hyphen=True, star=True, colon=True, regex=False,
      drop_dot=False, drop_leading_zero=False,
      sort_unique=True, info=None):

  expanded={}

  for name, cod in code.items():
    if isinstance(cod,str):
        cod = [cod]
    expanded_codes=[]
    for co in cod:
        new_codes = expand_code(co, all_codes=all_codes, hyphen=hyphen, star=star,
                                colon=colon, regex=regex, drop_dot=drop_dot,
                                drop_leading_zero=drop_leading_zero,
                                sort_unique=sort_unique, info=info)
        expanded_codes.extend(new_codes)
    # sorted so the order is the same every time (a set has no fixed order)
    expanded[name] = sorted(set(expanded_codes))

  return expanded

In [None]:
# Scratch cell (not exported): quick manual checks of expand_code
codes={'F3':'F3'}
all_codes=['G3', 'F3']
# dict input: the value is expanded, the key is kept
expand_code(codes, all_codes=all_codes)
cod=[]
# note: extend adds each character of a string as a separate element ('H' and '3')
cod.extend('H3')
cod
# only the last expression in the cell is displayed as output
expand_code('F3', all_codes=all_codes)

['F3']

## expand columns

In [None]:
#export
@singledispatch
def expand_columns(expr, all_columns=None, df=None, star=True,
                   hyphen=True, colon=True, regex=None, info=None):
    """
    Expand columns with special notation to their full column names

    """
    raise ValueError('Must be str or list of str')

@expand_columns.register(str)
def _(expr, all_columns=None, df=None, star=True,
                   hyphen=True, colon=True, regex=None, info=None):
    notations = '* - :'.split()
    # return immediately if not needed
    if not any(symbol in expr for symbol in notations):
      return [expr]

    # get a list of columns of it is only implicity defined by the df
    # warning: may depreciate this, require explicit all_columns
    if df & (not all_columns):
      all_columns=list(df.columns)

    if regex:
      cols = [col for col in all_columns if re.match(regex, expr)]
    else:
      if hyphen:
        cols = expand_hyphen(expr)
      if star:
        cols = expand_star(expr, all_codes=all_columns)
      if colon:
        cols = expand_colon(expr, all_codes=all_columns)

    return cols

@expand_columns.register(list)
def _(expr, all_columns=None, df=None, star=True,
                   hyphen=True, colon=True, regex=None, info=None):
    all_columns=[]
    for col in expr:
        new_columns = expand_columns(col, all_columns=all_columns, df=df, star=star,
                       hyphen=hyphen, colon=colon, regex=regex, info=info)
        all_columns.extend(new_columns)
    return all_columns


# More helper functions


In [None]:
#export
def format_codes(codes, merge=True):
    """
    Makes sure that the codes has the desired format: a dict with strings as
    keys (name) and a list of codes as values)

    Background: For several functions the user is allowed to use strings
    when there is only one element in the list, and a list when there is
    no code replacement or aggregations, or a dict. To avoid (even more) mess
    the input is standardised as soon as possible in a function.

    Args:
        codes (str, list or dict): the codes to standardise
        merge (bool): only used for str and list input. If True, all codes
            become one group, named by the codes joined with '_'. If False,
            each code becomes its own group.

    Returns:
        A dict with names as keys and lists of codes as values

    Examples:
            codes = '4AB02'
            codes='4AB*'
            codes = ['4AB02', '4AB04', '4AC*']
            codes = ['4AB02', '4AB04']
            codes = {'tumor' : 'a4*', 'diabetes': ['d3*', 'd5-d9']}
            codes = 'S72*'
            codes = ['K50*', 'K51*']

            format_codes(codes, merge=False)

    TODO: test for correctness of input, not just reformat (is the key a str?)
    """
    codes = listify(codes)

    # treatment of pure lists depends on whether special classes should be treated as one merged group or separate codes
    # example: counting of Z51* could mean count the total number of codes with Z51 OR a shorthand for saying "count all codes starting with Z51 separately"
    # The option "merge" enables the user to switch between these two interpretations

    if isinstance(codes, list):
        if merge:
            codes = {'_'.join(codes): codes}
        else:
            codes = {code: [code] for code in codes}

    elif isinstance(codes, dict):
        # a single code may be given as a string instead of a list
        codes = {name: listify(codelist) for name, codelist in codes.items()}

    return codes

## reverse dict

In [None]:
def reverse_dict(dikt):
  """
  Turn a dict of name -> code(s) into a dict of code -> name

  A value may be a single code (str) or a list of codes. If a code is
  listed under several names, the last name is kept.
  """
  reversed_dict = {}
  for name in dikt:
      for code in listify(dikt[name]):
          reversed_dict[code] = name
  return reversed_dict

In [None]:
#export
def _expand_regex(expr, full_list):
    """
    Return the elements in full_list that contain a regex pattern

    Args:
        expr (str or list of str): regex pattern(s)
        full_list (series, list or set): the values (strings) to select from

    Returns:
        A list with the matching values for each pattern, one pattern after
        the other (a value is repeated if it fits more than one pattern)
    """
    exprs = listify(expr)

    expanded = []

    # make sure we have a series, so that the str methods can be used
    if isinstance(full_list, pd.Series):
        unique_series = full_list
    else:
        unique_series = pd.Series(list(full_list))

    for pattern in exprs:
        # missing values never match
        match = unique_series.str.contains(pattern, na=False)
        expanded.extend(unique_series[match])
    return expanded

In [None]:
def persons_with(df,
                 codes,
                 cols,
                 pid='pid',
                 sep=None,
                 merge=True,
                 first_date=None,
                 last_date=None,
                 group=False,
                 _fix=True):
    """
    Determine whether people have received a code

    Args:
        df (dataframe): Dataframe with events
        codes (list or dict): codes to mark for
            codes to search for
                - if list: each code will represent a column
                - if dict: the codes in each item will be aggregated to one indicator
        cols (str or list of str): Column(s) with the codes
        pid (str): column with the person identifier
        sep (str): separator if there are multiple codes in a cell
        merge (bool): passed on to _fix_args
        first_date (str): use only codes after a given date
            the string either represents a date (same for all individuals)
            or the name of a column with dates (may be different for different individuals)
            (not used by the code here)
        last_date (str): only use codes before a given date
            the string either represents a date (same for all individuals)
            or the name of a column with dates (may be different for different individuals)
            (not used by the code here)
        group (bool): passed on to _fix_args
        _fix (bool): if True, the arguments are standardised and the rows
            with the codes are selected first. If False, codes must be a dict.

    Returns:
        Dataframe with one row for each person and one boolean column for
        each name in codes


    Examples:
        fracture = persons_with(df=df, codes='S72*', cols='icdmain')
        fracture = persons_with(df=df, codes={'frac':'S72*'}, cols='icdmain')

    Todo:
        - function may check if pid_index is unique, in which it does not have to aggregate
        - this may apply in general? functions that work on event data may then also work on person level data
        - allow user to input person level dataframe source?
    """
    sub = df

    if _fix:
        # NOTE(review): _to_df and _fix_args are not defined in the part of the
        # file seen here - confirm that they exist and what they return
        df, cols = _to_df(df=df, cols=cols)
        codes, cols, allcodes, sep = _fix_args(df=df, codes=codes, cols=cols, sep=sep, merge=merge, group=group)
        # get_rows calls this argument fix (not _fix)
        rows = get_rows(df=df, codes=allcodes, cols=cols, sep=sep, fix=False)
        sub = df[rows]

    # one string for each person, with all the values the person has in cols
    df_persons = sub.groupby(pid)[cols].apply(lambda s: pd.unique(s.values.ravel()).tolist()).astype(str)

    # alternative approach, also good, and avoids creating a personal dataframe
    # but ... regex is fast since it stops when it finds one true code!
    #    c=df.icdbi.str.split(', ', expand=True).to_sparse()
    #    c.isin(['S720', 'I10']).any(axis=1).any(level=0)

    persondf = pd.DataFrame(index=df[pid].unique().tolist())
    for name, codelist in codes.items():
        codes_regex = '|'.join(codelist)
        has_code = df_persons.str.contains(codes_regex, na=False)
        # persons without any of the selected rows are not in df_persons:
        # they do not have the code, so they are marked as False (not missing)
        persondf[name] = has_code.reindex(persondf.index, fill_value=False)

    return persondf

# formatting an expression

## insert_external

In [None]:
#export
def insert_external(expr):
  """
  Replaces variables prefixed with @ in the expression with the
  value of the variable from the global namespace

  Args:
      expr (str): expression where some words may start with @

  Returns:
      The expression, with each @name replaced by str() of the value of name

  Raises:
      KeyError: if a name is not found in the global namespace

  Note:
      The values are taken from globals() of the module where this function
      is defined.

  Example:
      x=['4AB02', '4AB04', '4AB06']
      expr = '@x before 4AB02'
      insert_external(expr)
  """
  def _value_as_text(match):
    # the name is the part after the @
    return str(globals()[match.group(1)])

  # a name is an @ followed by letters, digits or underscores
  # the @ must not follow a letter or digit (so 'a@b' is left alone)
  # the whole name is replaced, also when it is the last word of the expression
  return re.sub(r'(?<![\w@])@(\w+)', _value_as_text, expr)

# Descriptive and analysis

## unique

In [None]:
#export
# A function to identify all unique values in one or more columns
# with one or multiple codes in each cell


def unique(df, cols=None, sep=None, all_str=True, info=None):
  """
  Lists unique values from one or more columns

  Args:
    df (dataframe): The dataframe with the values
    cols (str or list of str): columns to use (default: all columns)
    sep (str): separator if cells have multiple values
    all_str (bool): converts all values to strings (without surrounding spaces)
    info: accepted, but not used by the code here

  Returns:
    A list of unique values if all_str is True. If all_str is False, the
    values are returned as they are: a set (if sep) or an array (if not sep)

  unique(df=df, cols='inpatient', sep=',')
  """
  # if no column(s) are specified, find unique values in whole dataframe
  if cols is None:
    cols = list(df.columns)
  cols = listify(cols)

  # multiple values with separator in cells
  if sep:
    all_unique = set()
    for col in cols:
      # join all cells with the separator, and then split on the same separator
      new_unique = set(df[col].str.cat(sep=sep).split(sep))
      all_unique.update(new_unique)
  # single valued cells
  else:
    all_unique = pd.unique(df[cols].values.ravel('K'))

  # if need to make sure all elements are strings without surrounding spaces
  if all_str:
    # values that become equal when spaces are removed are only listed once
    # (dict.fromkeys removes duplicates and keeps the order)
    all_unique = list(dict.fromkeys(str(value).strip() for value in all_unique))

  return all_unique

## count codes

In [None]:
#export
def count_codes(df, codes=None, cols=None, sep=None, normalize=False,
                ascending=False, fix=True, merge=False, group=False, dropna=True, all_codes=None, info=None):
    """
    Count frequency of values in multiple columns and columns with separators

    Args:
        df (dataframe or series): data with the codes. A series is converted
            to a one-column dataframe and that column is used as cols
        codes (str, list of str, dict): codes to be counted. If None, all codes will be counted
        cols (str or list of str): columns where codes are
        sep (str): separator if multiple codes in cells
        normalize (bool): If True, outputs shares of the total and not absolute numbers
        ascending (bool): If True, sorts from least to most frequent
        fix (bool): accepted, but not used in this function
        merge (bool): Only has an effect when codes is a str or a list.
            If False (default), each expanded code is counted separately.
            If True, the expanded codes are summed under one label
            (the str itself, or str() of the list)
        group (bool): accepted, but not used in this function
        dropna (bool): If True (default), only the rows selected by get_rows
            for the requested codes are counted. If False, all rows are
            counted and the total for codes that were not requested is
            reported under the label 'na'
        all_codes (list): the codes that codes are expanded against.
            Collected from the data if not specified
        info: passed on to unique and expand_code

    Returns:
        A series with the count (or share, if normalize is True) for each
        code or group label, sorted by value. A requested code that does
        not occur in the counted rows gets the count 0

    allows
        - star notation in codes and columns
        - values in cells with multiple values can be separated (if sep is defined)
        - replacement and aggregation to larger groups (when code is a dict)

    example
    To count the number of stereoid events (codes starting with H2) and use of
    antibiotics (codes starting with xx) in all columns where the column names
    starts with "atc":

    count_codes(df=df,
                 codes={'stereoids' : 'H2*', 'antibiotics' : ['AI3*']},
                 cols='atc*',
                 sep=',')

    more examples
    -------------

    df.count_codes(codes='K51*', cols='icd', sep=',')
    count_codes(df, codes='K51*', cols='icdm', sep=',', group=True)
    count_codes(df, codes='Z51*', cols=['icd', 'icdbi'], sep=',')
    count_codes(df, codes='Z51*', cols=['icdmain', 'icdbi'], sep=',', group=True)
    count_codes(df, codes={'radiation': 'Z51*'}, cols=['icd'], sep=',')
    count_codes(df, codes={'radiation': 'Z51*'}, cols=['icdmain', 'icdbi'], sep=',')
    count_codes(df, codes={'crohns': 'K50*', 'uc':'K51*'}, cols=['icdmain', 'icdbi'], sep=',')
    count_codes(df, codes={'crohns': 'K50*', 'uc':'K51*'}, cols=['icdmain', 'icdbi'], sep=',', dropna=True)
    count_codes(df, codes={'crohns': 'K50*', 'uc':'K51*'}, cols=['icdmain', 'icdbi'], sep=',', dropna=False)
    count_codes(df, codes={'crohns': 'K50*', 'uc':'K51*'}, cols=['icdmain', 'icdbi'], sep=',', dropna=False, group=False)
    count_codes(df, codes=['K50*', 'K51*'], cols=['icd'], sep=',', dropna=False, group=True, merge=False)
    count_codes(df, codes=['K50*', 'K51*'], cols=['icdmain', 'icdbi'], sep=',', dropna=False, group=False, merge=False)
    count_codes(df, codes=['K50*', 'K51*'], cols=['icdmain', 'icdbi'], sep=',', dropna=False, group=False, merge=True)
    count_codes(df, codes=['K50*', 'K51*'], cols=['icdmain', 'icdbi'], sep=',', dropna=True, group=True, merge=True)
    # author's note: group False, merge True, for a list = wrong ...

    count_codes(df, codes=['K50*', 'K51*'], cols=['icdmain', 'icdbi'], sep=',', dropna=True, group=False, merge=False)

    """
    # Imported locally: the exported import cell of this notebook does not
    # import Counter, so the function would otherwise depend on a scratch cell.
    from collections import Counter

    # preliminary formatting
    if isinstance(df, pd.Series):
        df = df.to_frame()
        cols = list(df.columns)
        # maybe df[pid]=df.index

    # no codes requested: count every code found in the data
    if not codes:
        codes = unique(df=df, cols=cols, sep=sep, info=info)
        all_codes = list(set(codes))

    cols = expand_columns(cols, all_columns=list(df.columns))

    if not all_codes:
        all_codes = unique(df=df, cols=cols, sep=sep)

    old_codes = codes

    codes = expand_code(codes, all_codes=all_codes, info=info)

    # turn str and list input into a {label: code(s)} dict;
    # other input (e.g. a dict) is used as returned by expand_code
    if isinstance(old_codes, str):
        codes = {old_codes: codes} if merge else {code: code for code in codes}
    elif isinstance(old_codes, list):
        codes = {str(old_codes): codes} if merge else {code: code for code in codes}

    # flat list of every individual code to count, without duplicates
    only_codes = []
    for code in codes.values():
        only_codes.extend(listify(code))
    only_codes = list(dict.fromkeys(only_codes))

    sub = df

    if dropna:
        rows = get_rows(df=sub, codes=only_codes, cols=cols, sep=sep, all_codes=all_codes)
        sub = sub[rows]

    if sep:
        count = Counter()
        for col in cols:
            codes_in_col = [code.strip() for code in sub[col].str.cat(sep=sep).split(sep)]
            count.update(codes_in_col)
        code_count = pd.Series(count)
    else:
        code_count = sub[cols].apply(pd.Series.value_counts).sum(axis=1)

    if codes:
        # The codes are the index of code_count, the counts are its values.
        # Membership must be tested on the index to total the other codes.
        requested = code_count.index.isin(only_codes)
        not_included_n = code_count[~requested].sum()
        # reindex (not label selection) so that a requested code that is
        # absent from the counted rows gets 0 instead of raising KeyError
        code_count = code_count.reindex(only_codes, fill_value=0)
        if not dropna:
            code_count['na'] = not_included_n

    if isinstance(codes, dict):
        # relabel each code with its group label and sum within each label
        # (Series.sum(level=0) is no longer available in pandas 2)
        code_count = code_count.rename(index=reverse_dict(codes)).groupby(level=0).sum()

    if normalize:
        code_count = code_count / code_count.sum()
    else:
        code_count = code_count.astype(int)

    return code_count.sort_values(ascending=bool(ascending))

In [None]:
# Generate an example dataset with make_data (defined earlier in this notebook)
# and show its first rows.
# NOTE(review): seed is left at its default (False), so the rows presumably
# differ between runs - confirm, and pass seed=True if stable output is wanted.
df=make_data()
df.head()

Unnamed: 0,pid,gender,birth_date,date,region,codes
1,1,female,1949-02-19,1959-11-04,east,"V42,O16"
2,2,male,1950-10-05,1952-07-13,south,J89
2,2,male,1950-10-05,1953-06-19,south,S87
2,2,male,1950-10-05,1956-01-30,south,J88
2,2,male,1950-10-05,1956-04-25,south,"T6,P14,B35,W83,Q5,K94,R63,Z16"


In [None]:
from collections import Counter

In [None]:
#count_codes(df=df, codes={'a':['G4*', 'C4*', 'c4'], 'b':'A4*'}, cols='codes', sep=',', merge=True, normalize=True)

In [None]:
# Count each code starting with A4 (star notation) in the comma-separated 'codes' column
count_codes(df=df, codes='A4*', cols='codes', sep=',')

   pid  gender birth_date       date region                          codes
1    1  female 1949-02-19 1959-11-04   east                        V42,O16
2    2    male 1950-10-05 1952-07-13  south                            J89
2    2    male 1950-10-05 1953-06-19  south                            S87
2    2    male 1950-10-05 1956-01-30  south                            J88
2    2    male 1950-10-05 1956-04-25  south  T6,P14,B35,W83,Q5,K94,R63,Z16 cols codes
{'A4': 'A4', 'A40': 'A40', 'A41': 'A41', 'A42': 'A42', 'A43': 'A43', 'A44': 'A44', 'A45': 'A45', 'A46': 'A46', 'A47': 'A47', 'A48': 'A48', 'A49': 'A49'}


A4     12
A48    10
A46     9
A47     9
A44     8
A43     8
A40     7
A42     7
A41     6
A49     5
A45     5
dtype: int32

In [None]:
#df.hrr3.count_codes(codes='A*', cols='codes', sep=',')
# Count all codes in the 'codes' column through the Series accessor.
# NOTE(review): the `hrrb` accessor is registered in a later cell of this
# notebook ("Register functions"), so on a fresh kernel that cell must be
# run before this one.
df.codes.hrrb.count_codes(sep=',')

G32    27
G14    25
G31    25
G41    23
G81    23
       ..
E65     1
H85     1
T4      1
M61     1
J67     1
Length: 2573, dtype: int32

In [None]:
# Same accessor call as in the cell above (the trailing comma changes nothing).
# NOTE(review): also needs the `hrrb` accessor, which is registered in a later cell.
df.codes.hrrb.count_codes(sep=',',)

In [None]:
# str() of a list: this is the label count_codes uses when a list of codes is merged
str(['a', 'b'])

"['a', 'b']"

# find codes

In [None]:
#export
def lookup_codes(dikt, codes):
    """
    Select the items of a dict whose key starts with one of the given codes.

    The comparison ignores case, and stars at either end of a code are
    removed before matching, so 'L04*' selects every key starting with L04.

    Args:
        dikt (dict): the items to search; keys are compared as strings
        codes (str or list of str): the key prefixes to look for

    Returns:
        dict: the matching items, in their original order

    todo: more complicated star notations: starts with, contains, endswith

    Example:
        lookup(medcodes, 'L04*')
    """
    # str.startswith accepts a tuple and matches if any prefix matches
    prefixes = tuple(pattern.upper().strip('*') for pattern in _listify(codes))

    matches = {}
    for key, value in dikt.items():
        if str(key).upper().startswith(prefixes):
            matches[key] = value
    return matches


# %%
def get_codes(dikt, text):
    """
    Select the items of a dict whose value contains one of the given texts.

    The comparison ignores case, and stars at either end of a text are
    removed before matching. Values are compared as strings.

    Args:
        dikt (dict): the items to search
        text (str or list of str): the text fragments to look for in the values

    Returns:
        dict: the matching items, in their original order

    todo: more complicated star notations: starts with, contains, endswith
    alternative name: find_codes? get_codes?

    Example:
        get all codes that have "steroid" in the explanatory text

            get_codes(medcodes, 'steroid*')
    """
    fragments = [fragment.upper().strip('*') for fragment in _listify(text)]

    found = {}
    for key, description in dikt.items():
        for fragment in fragments:
            if fragment in str(description).upper():
                found[key] = description
                # one matching fragment is enough for this item
                break

    return found

# Register functions

In [None]:
@pd.api.extensions.register_dataframe_accessor("hrr6")
class RegisterResearchAccessor:
    """
    Dataframe accessor that makes count_codes available as df.hrr6.count_codes(...)

    Note: the Series accessor class defined right after this one has the
    same class name, so the module-level name ends up bound to that class.
    The registration of this class under "hrr6" for dataframes has already
    been made by the decorator at that point.
    """

    def __init__(self, df):
        # the dataframe the accessor was reached from
        self._df = df

    def count_codes(self, codes=None, cols=None, sep=None, normalize=False,
                ascending=False, fix=True, merge=False, group=False, dropna=True, all_codes=None, info=None):
        """
        Call the module-level count_codes on the wrapped dataframe.

        All arguments are passed on unchanged, including group, which was
        previously accepted but dropped. See count_codes for what they mean.

        Returns:
            whatever count_codes returns
        """
        return count_codes(df=self._df, codes=codes, cols=cols, sep=sep, normalize=normalize,
                           ascending=ascending, fix=fix, merge=merge, group=group,
                           dropna=dropna, all_codes=all_codes, info=info)
      
    
@pd.api.extensions.register_series_accessor("hrr6")
class RegisterResearchAccessor:
    """
    Series accessor that makes count_codes available as series.hrr6.count_codes(...)

    Note: this class has the same name as the dataframe accessor class
    defined just before it, so it rebinds that module-level name. Both
    registrations under "hrr6" are made by their decorators.
    """

    def __init__(self, df):
        # the object the accessor was reached from (a Series, despite the name)
        self._df = df

    def count_codes(self, codes=None, cols=None, sep=None, normalize=False,
                ascending=False, fix=True, merge=False, group=False, dropna=True, all_codes=None, info=None):
        """
        Call the module-level count_codes on the wrapped series.

        All arguments are passed on unchanged, including group, which was
        previously accepted but dropped. See count_codes for what they mean.

        Returns:
            whatever count_codes returns
        """
        return count_codes(df=self._df, codes=codes, cols=cols, sep=sep, normalize=normalize,
                           ascending=ascending, fix=fix, merge=merge, group=group,
                           dropna=dropna, all_codes=all_codes, info=info)

In [None]:
  
@pd.api.extensions.register_series_accessor("hrrb")
class RegisterResearchAccessorSeries:
    """
    Series accessor that makes count_codes available as series.hrrb.count_codes(...)
    """

    def __init__(self, df):
        # the object the accessor was reached from (a Series, despite the name)
        self._df = df

    def count_codes(df, **kwargs):
        """
        Call the module-level count_codes on the wrapped series.

        Keyword arguments are passed on as given; the wrapped series is
        added as the df argument.
        """
        # here `df` is the accessor instance; the data is in its _df attribute
        kwargs['df'] = df._df
        return count_codes(**kwargs)

In [None]:
#import nbdev

In [None]:
#from nbdev.sync import script2notebook 

In [None]:
# Convert the project's notebooks to modules with nbdev.
# Only the one name used is imported, instead of a wildcard import
# that fills the notebook namespace with unrelated names.
from nbdev.export import notebook2script
notebook2script()

Converted 1_intro_make_data_notation.ipynb.
Converted index.ipynb.
Converted old_functions.ipynb.
Converted pattern_finder.ipynb.
Converted utilities.ipynb.
