In [1]:
# default_exp operators

from nbdev.showdoc import show_doc


In [2]:
#hide
#export
import pandas as pd
import numpy as np
import numexpr as ne

# Operators

In order to be able to capture the transformations required to normalize neuro data with different styles
we'll need a collection of operations to manage them.
In the abstract, these operations should manage grabbing the relevant data and then serving the result back in a standardized form.

In [3]:
#hide
#export
class AbstractOperation(object):
    """Common interface for operations applied to a single row of data.

    Subclasses list the keys they read in ``fields`` and the keys they
    produce in ``result_fields``, and implement ``process_single`` and
    ``explain``.
    """

    # Keys pulled out of a row by ``to_series``.
    fields = []
    # Output keys; ``__call__`` labels its result with the first entry.
    result_fields = []

    @staticmethod
    def from_config(config):
        """Build the concrete operation described by ``config``.

        Each known operation class is offered the config in turn and the
        first one that returns something other than ``None`` is used.

        Parameters
        ----------
        config : dict
            Must contain a ``type`` key.

        Returns
        -------
        AbstractOperation

        Raises
        ------
        NotImplementedError
            If no operation class accepts the config.
        """
        known_ops = (EquationOp, AggregationOp, ClipOp, NormativeLookupOp,
                     CategoricalOp, BinnedScalingOp, EquationFilterOp)
        # Lazy generator: classes after the first match are never consulted.
        built_ops = (known.from_config(config) for known in known_ops)
        match = next((built for built in built_ops if built is not None), None)
        if match is None:
            raise NotImplementedError(f'Did not understand type: {config["type"]}')
        return match

    def process_single(self, row):
        """Compute the operation's value for one row (subclass hook)."""
        raise NotImplementedError

    def explain(self, row):
        """Describe how the value for one row was obtained (subclass hook)."""
        raise NotImplementedError

    def to_series(self, row):
        """Collect the values of ``self.fields`` from ``row``.

        Parameters
        ----------
        row : mapping
            Anything with a ``get`` method (dict, pd.Series); keys that are
            absent come back as whatever ``row.get`` returns for them.

        Returns
        -------
        pd.Series
            Indexed by field name, in ``self.fields`` order.
        """
        return pd.Series({name: row.get(name) for name in self.fields})

    def __call__(self, row):
        """Yield a single ``(result_name, value)`` pair for ``row``."""
        value = self.process_single(row)
        yield self.result_fields[0], value

In [4]:
#hide

# The base class can be instantiated directly.
abs_op = AbstractOperation()
assert type(abs_op) is AbstractOperation

# `to_series` keeps only the declared fields and drops everything else.
test_data = dict(t1=1, t2=2, other=3)
abs_op.fields = ['t1', 't2']
ser = abs_op.to_series(test_data)
assert ser['t1'] == 1
assert ser['t2'] == 2
assert 'other' not in ser


Again, we'll use the BVMT test as the example.
But we're going to back up a step. Since there are a bunch of different intermediate values, I want to calculate those using operations.

Measured Values:
 - `trial1` - Trial 1 successes
 - `trial2` - Trial 2 successes
 - `trial3` - Trial 3 successes
 - `delay` - Delayed Successes
 - `hits` - Successful recognitions with distractors
 - `false_pos` - False-positive recognitions

Our goal is to define all of the operations required to calculate intermediate values (e.g. `immediate`)
as well as scaled values.

There are three derived values to calculate:
  - `immediate`: the sum of the three trials
  - `recognition`: the number of hits minus the number of false-positive recognitions
  - `retention`: ratio of delayed successes and largest of the trial 2 & trial 3 successes

The first two can be solved with basic equations.
The third needs an additional step, because it depends on the larger of two values.

## Basic Equations

In [5]:
#export
class EquationOp(AbstractOperation):
    "Manipulate values with `pd.eval` equations."

    def __init__(self, out_field, equation, fields):
        """Store the expression and the names it reads and writes.

        Parameters
        ----------
        out_field : str
            Name under which the result is reported.
        equation : str
            Expression handed to ``pd.eval``.
        fields : list[str]
            Row keys made available to the expression.
        """

        self.equation = equation
        self.fields = fields
        self.result_fields = [out_field]

    @staticmethod
    def from_config(config):
        """Create an EquationOp from a config dict, if it describes one.

        Expecting yaml of the format:
          type: equation
          equation: "hits-false_pos"
          fields: ['hits', 'false_pos']
          out_field: 'recognition'

        Parameters
        ----------
        config : dict

        Returns
        -------
        EquationOp or None
            ``None`` when ``config['type']`` is not ``'equation'``.
        """
        if config['type'] != 'equation':
            return None
        return EquationOp(out_field=config['out_field'],
                          equation=config['equation'],
                          fields=config['fields'])

    def explain(self, row):
        """Render the equation together with its value for ``row``.

        Parameters
        ----------
        row : dict,pd.Series

        Returns
        -------
        str
        """

        value = self.process_single(row)
        return f'Used Equation: {self.equation} = {value} = {self.result_fields[0]}'

    def process_single(self, row):
        """Evaluate the equation against the row.

        Parameters
        ----------
        row : mapping

        Returns
        -------
        The value produced by ``pd.eval``, or ``np.nan`` when any of the
        required fields is null.
        """

        values = self.to_series(row)
        # Refuse to evaluate with missing inputs; report NaN instead.
        if not values.notnull().all():
            return np.nan
        return pd.eval(self.equation, local_dict=values.to_dict())

Let's imagine an individual to test.

Measured Values:
 - `trial1` - 5
 - `trial2` - 6
 - `trial3` - 7
 - `delay` - 8
 - `hits` - 6
 - `false_pos` - 2
 - `copy` - 12

Using the `EquationOp` let's calculate `immediate` and `recognition`.

In [6]:
# Measured values for the example individual.
DATA = dict(trial1=5, trial2=6, trial3=7,
            delay=8, hits=6, false_pos=2,
            copy=12)

# `immediate` is the sum of the three trials.
total_op = EquationOp(out_field='immediate',
                      equation='trial1+trial2+trial3',
                      fields=['trial1', 'trial2', 'trial3'])
immed = total_op.process_single(DATA)
assert immed == 18
DATA.update(immediate=immed)

We can also `explain` the result using the method.

In [7]:
# Show the human-readable account of how `immediate` was derived.
print(total_op.explain(row=DATA))

Used Equation: trial1+trial2+trial3 = 18 = immediate


While one might construct these operations in Python code, I actually expect most things to be saved as yaml.
So, we need a way to represent this info in that format.
This is also useful when constructing larger sets.

Here's the yaml example for the recognition calculation.
```
type: equation
equation: "hits-false_pos"
fields: ['hits', 'false_pos']
out_field: 'recognition'
```

In [8]:
import yaml

# YAML description of the recognition equation.
st = """
type: equation
equation: "hits-false_pos"
fields: ['hits', 'false_pos']
out_field: 'recognition'
"""

# Build the operation from the parsed config rather than from Python arguments.
ret_op = EquationOp.from_config(config=yaml.full_load(st))
recog = ret_op.process_single(row=DATA)

assert recog == 4
print(ret_op.explain(row=DATA))
DATA.update(recognition=recog)


Used Equation: hits-false_pos = 4 = recognition


## Aggregation Operations

Due to its limitations, numexpr cannot choose the largest of two numbers, as needed for retention.
So, we use an `AggregationOp` instead.


In [9]:
#export

class AggregationOp(AbstractOperation):
    """Reduce several fields of a row to one value with ``pd.Series.agg``."""

    def __init__(self, out_field, aggregation, fields):
        """Store the aggregation and the names it reads and writes.

        Parameters
        ----------
        out_field : str
            Name under which the result is reported.
        aggregation : str
            Passed straight to ``pd.Series.agg`` (e.g. ``'max'``).
        fields : list[str]
            Row keys that are aggregated.
        """

        self.aggregation = aggregation
        self.fields = fields
        self.result_fields = [out_field]

    @staticmethod
    def from_config(config):
        """
        Load from config. Expects:
            type: agg
            method: 'max'
            fields: ['trial2', 'trial3']
            out_field: retention_denom

        Parameters
        ----------
        config : dict

        Returns
        -------
        AggregationOp or None
            ``None`` when ``config['type']`` is not ``'agg'``.
        """
        if config['type'] != 'agg':
            return None
        return AggregationOp(out_field=config['out_field'],
                             aggregation=config['method'],
                             fields=config['fields'])

    def explain(self, row):
        """Return a one-line description of the aggregation and its result."""
        value = self.process_single(row)
        joined_fields = ", ".join(self.fields)
        return f'Aggregation: {self.aggregation} [{joined_fields}]  = {value}'

    def process_single(self, row):
        """Aggregate the configured fields of ``row`` into a single value."""
        return self.to_series(row).agg(self.aggregation)

In [10]:
# The retention denominator is the larger of the trial 2 and trial 3 scores.
ret_denom_op = AggregationOp(out_field='retention_denom',
                             aggregation='max',
                             fields=['trial2', 'trial3'])
re_denom = ret_denom_op.process_single(row=DATA)

assert re_denom == 7
print(ret_denom_op.explain(row=DATA))
DATA.update(retention_denom=re_denom)

Aggregation: max [trial2, trial3]  = 7


Now that we have the denominator we can use another equation to calculate retention.

In [11]:
# Retention is the delayed score divided by the denominator computed above.
retent_op = EquationOp(out_field='retention',
                       equation='delay/retention_denom',
                       fields=['delay', 'retention_denom'])
retent = retent_op.process_single(row=DATA)

assert retent == 8/7
print(retent_op.explain(row=DATA))
DATA.update(retention=retent)

Used Equation: delay/retention_denom = 1.1428571428571428 = retention


Dang, the value was above 100%. And sometimes it may be negative.
By convention we clip these values to a [0,1] scale, so we'll need an operation for that.


In [12]:
#export
class ClipOp(AbstractOperation):
    """Clamp a single field to ``[lower, upper]``.

    The result is reported under the same name as the input field.
    """

    def __init__(self, field, lower = 0, upper=1):
        """Store the field name and the clipping bounds.

        Parameters
        ----------
        field : str
            Row key that is read and whose name is reused for the result.
        lower : float
            Lower bound (default 0).
        upper : float
            Upper bound (default 1).
        """

        self.fields = [field]
        self.lower = lower
        self.upper = upper
        self.result_fields = [field]

    @staticmethod
    def from_config(config):
        """
        Load from config. Expects:
            type: clip
            field: retention
            lower: 0
            upper: 1

        ``lower`` and ``upper`` may be omitted, in which case the
        constructor defaults (0 and 1) are used.

        Parameters
        ----------
        config : dict

        Returns
        -------
        ClipOp or None
            ``None`` when ``config['type']`` is not ``'clip'``.

        """
        if config['type'] == 'clip':
            # .get mirrors the constructor defaults so a config that only
            # names the field does not fail with KeyError.
            return ClipOp(config['field'],
                          lower = config.get('lower', 0),
                          upper = config.get('upper', 1))
        return None

    def explain(self, row):
        """Describe the configured clipping; ``row`` is not inspected."""
        return f'Clipped {self.fields[0]} to [{self.lower}, {self.upper}]'

    def process_single(self, row):
        """Return the field's value from ``row`` clipped to the bounds."""

        data = self.to_series(row)
        clipped = data.clip(lower=self.lower, upper=self.upper)
        return clipped[self.result_fields[0]]

In [13]:
# Clamp the retention ratio into the conventional [0, 1] range.
ret_clip_op = ClipOp(field='retention', lower=0, upper=1)
ret_clip = ret_clip_op.process_single(row=DATA)

assert ret_clip == 1
print(ret_clip_op.explain(row=DATA))
DATA.update(retention=ret_clip)

Clipped retention to [0, 1]


Now we've calculated all of the intermediate values.
We'll need to use these values, along with demographic data, to lookup a "healthy" normal distribution.

## Normative Lookups

Normative lookup tables are lists of entries, each of which gives a mean and std for the rows matched by a demographic filter.
For example:

A 32 year old should have an `immediate` memory of 26.9 with a std of 4.6.
Our example has an `immediate` of 18, a deficit of 9, roughly 2 stds.

In [14]:
#export

class NormativeLookupOp(AbstractOperation):
    """Lookup table with normalized scores.

    Each table entry is a mapping with a ``filter`` expression (evaluated
    with ``numexpr``), a ``mean`` and a ``std``.  The first entry whose
    filter is true for the row supplies the mean/std used to z-score the
    measured value.
    """

    def __init__(self, lookup_table, filter_cols, measure_col, out_name):
        """Store the table and the names it reads and writes.

        Parameters
        ----------
        lookup_table : list[dict]
            Entries with ``filter``, ``mean`` and ``std`` keys.
        filter_cols : list[str]
            Row keys the filter expressions may reference.
        measure_col : str
            Row key holding the observed value.
        out_name : str
            Name under which the z-score is reported.
        """

        self.lookup_table = lookup_table
        # Copy into a list so tuples (or other sequences) are accepted too.
        self.filter_cols = list(filter_cols)
        self.fields = self.filter_cols + [measure_col]
        self.result_fields = [out_name]
        self.measure_col = measure_col

    @staticmethod
    def from_config(config):
        """Create a NormativeLookupOp from a config dict, if it describes one.

        Reads the ``table``, ``filter_cols``, ``measure_col`` and
        ``out_name`` keys.

        Returns
        -------
        NormativeLookupOp or None
            ``None`` when ``config['type']`` is not ``'normative_lookup'``.
        """

        if config['type'] == 'normative_lookup':

            return NormativeLookupOp(config['table'],
                                     config['filter_cols'],
                                     config['measure_col'],
                                     config['out_name'])
        return None

    @staticmethod
    def _z_score(observed, mean, std):
        """Return ``(observed - mean)/std``, or NaN when ``observed`` is null."""
        if pd.isnull(observed):
            return np.nan
        return (observed - mean)/std

    def lookup_norm(self, row):
        """Find the first table entry whose filter matches ``row``.

        Returns
        -------
        tuple
            ``(filter, mean, std)`` of the first match, or
            ``(None, None, None)`` when nothing matches or a filter column
            is missing.
        """

        data = self.to_series(row)
        # A missing demographic value cannot satisfy any filter; bail out
        # before numexpr is handed a None it cannot evaluate.
        if data[self.filter_cols].isnull().any():
            return None, None, None

        # Plain dict for numexpr, consistent with the other operations.
        local_values = data.to_dict()
        for filt in self.lookup_table:
            if ne.evaluate(filt['filter'], local_dict=local_values):
                return filt['filter'], filt['mean'], filt['std']

        return None, None, None

    def explain(self, row):
        """Describe which filter matched and the resulting z-score."""

        flt, mean, std = self.lookup_norm(row)
        data = self.to_series(row)

        if flt is None:
            return f'{self.result_fields[0]}: Could not find matching filter for {data[self.filter_cols]}'

        observed = data[self.measure_col]
        z = self._z_score(observed, mean, std)
        return f'{self.result_fields[0]}: Matched {flt}, Expecting {mean} +- {std}, Observed: {observed}, Z: {z}'

    def process_single(self, row):
        """Return the z-score of the measured value, or NaN without a match."""

        data = self.to_series(row)
        _, mean, std = self.lookup_norm(row)
        if mean is None:
            return np.nan
        return self._z_score(data[self.measure_col], mean, std)

While it is possible to create these in Python, it's much easier to build using yaml definitions.

```
type: normative_lookup
measure_col: immediate
filter_cols: ['age']
out_name: 'heaton_immediate'
table:
  - filter: (18 <= age) & (age <= 21)
    mean: 28.74
    std: 4.32
  - filter: (20 <= age) & (age <= 23)
    mean: 28.44
    std: 4.38
  ...

```

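The filters are any boolean expression accepted by `numexpr.evaluate`, which is what `NormativeLookupOp.lookup_norm` uses; when several filters match, the first one in the table wins.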
The Heaton norms for the BVMT are currently in `data/norms/from_kate/heaton_bvmt.yaml`

In [15]:
DATA['age'] = 32
# Use a context manager so the file handle is closed once the YAML is parsed.
with open('data/norms/from_kate/heaton_bvmt.yaml') as norms_file:
    bvmt_config = yaml.full_load(norms_file)

lookup_op = NormativeLookupOp.from_config(bvmt_config['operations'][0])
lookup_score = lookup_op.process_single(DATA)

# Compare with a tolerance: the z-score is the result of float arithmetic.
assert np.isclose(lookup_score, -1.9224137931034488)
print(lookup_op.explain(DATA))


Matched (30 <= age) & (age <= 33), Expecting 26.92 +- 4.64


The collection of these operators can be combined into a `TestCalculator` which manages applying these operations sequentially.
That is discussed elsewhere.

These features are sufficient for any analysis that requires looking up normalizations based on demographic information.
However, for regression based norms like the `Norman` set we need a further collection of operators.

## Regression Based Norms

When doing regression based normalization the first step is to `scale` the raw values based on a set of bins.
This is done to help _normalize_ the raw values before entering the regression equation.

For example, when scaling the `delay` column, the `norman` scheme uses:

| Raw | Scaled |
|-----|--------|
| 12  | 14
| 11  | 11
| 10  | 9
| 9   | 8
| 8   | 7
| 7   | 6
| 5   | 5
| 4   | 4
| 3   | 3
| 0   | 2

The `BinnedScalingOp` can be used to deal with these conditions.

In [16]:
# export

class BinnedScalingOp(AbstractOperation):
    """Map a raw value onto a scaled value using lower-bounded bins.

    Bins are checked from the largest ``min`` downwards; the first bin whose
    ``min`` is less than or equal to the raw value supplies the scaled value.
    """

    def __init__(self, bins, measure_col, out_field = None):
        """Store the bins and the names that are read and written.

        Parameters
        ----------
        bins : list[dict]
            Entries with ``min`` and ``scaled`` keys.
        measure_col : str
            Row key holding the raw value.
        out_field : str, optional
            Name of the result; defaults to ``measure_col + '_scaled'``.
        """

        self.fields = [measure_col]
        if out_field is None:
            self.result_fields = [measure_col+'_scaled']
        else:
            self.result_fields = [out_field]
        # Descending order so the first hit is the highest bin reached.
        self.bins = sorted(bins, key = lambda x: x['min'],
                           reverse=True)

    @staticmethod
    def from_config(config):
        """
        Build from config, Expecting yaml like:
          type: binned_scaling
          measure_col: delay
          bins:
            - scaled: 14
              min: 12
            - scaled: 11
              min: 11
            - scaled: 9
              min: 10

        An optional ``out_field`` key overrides the default result name.

        Parameters
        ----------
        config : dict

        Returns
        -------
        BinnedScalingOp or None
            ``None`` when ``config['type']`` is not ``'binned_scaling'``.

        """

        if config['type'] == 'binned_scaling':
            # Forward out_field when present; None keeps the default name.
            return BinnedScalingOp(config['bins'],
                                   config['measure_col'],
                                   out_field=config.get('out_field'))
        return None

    def lookup_bin(self, row):
        """Return ``(bin_min, scaled)`` for the row, or ``(nan, nan)``.

        NaN is returned when the raw value is missing (None or NaN) or lies
        below every bin's ``min``.
        """

        data = self.to_series(row)
        val = data[self.fields[0]]
        # pd.isnull also catches None, which is what an absent key yields
        # and which cannot be compared against the bin edges.
        if pd.isnull(val):
            return np.nan, np.nan

        for candidate in self.bins:
            if val >= candidate['min']:
                return candidate['min'], candidate['scaled']
        return np.nan, np.nan

    def explain(self, row):
        """Describe which bin edge matched, or that none did."""

        edge, scaled = self.lookup_bin(row)

        if pd.isnull(edge):
            data = self.to_series(row)
            return f'Could not find matching bin for {data[self.fields[0]]}'
        else:
            return f'{self.fields[0]} matched {edge}, scaled to {scaled}'

    def process_single(self, row):
        """Return the scaled value for the row (NaN when no bin matches)."""
        _, res = self.lookup_bin(row)
        return res

In [17]:
# (raw lower edge, scaled value) pairs for the `delay` column.
bins = [{'min': lower_edge, 'scaled': scaled_value}
        for lower_edge, scaled_value in [(12, 14), (11, 11), (10, 9), (9, 8),
                                         (8, 7), (7, 6), (5, 5), (4, 4),
                                         (3, 3), (0, 2)]]

scale_op = BinnedScalingOp(bins=bins, measure_col='delay')
delay_scaled = scale_op.process_single(row=DATA)

assert delay_scaled == 7
print(scale_op.explain(row=DATA))

DATA.update(delay_scaled=delay_scaled)

delay matched 8, scaled to 7


Now that we have the scaled value we need to handle the demographic variables.
For the `norman` set Male gender is set to 1 with females as 0.
The race also needs to be converted with white = 0 and AA = 1.


In [18]:
#export

class CategoricalOp(AbstractOperation):
    """Translate a categorical value into a coded value via a mapping."""

    def __init__(self, measure_col, mapping, out_col):
        """Store the mapping and the names that are read and written.

        Parameters
        ----------
        measure_col : str
            Row key holding the categorical value.
        mapping : dict
            Category -> coded value.
        out_col : str
            Name under which the coded value is reported.
        """

        self.result_fields = [out_col]
        self.mapping = mapping
        self.fields = [measure_col]

    @staticmethod
    def from_config(config):
        """
        Build from config. Expects yaml like:
          type: categorical
          in_field: gender
          out_field: norman_gender
          mapping:
            male: 0
            female: 1

        Parameters
        ----------
        config : dict

        Returns
        -------
        CategoricalOp or None
            ``None`` when ``config['type']`` is not ``'categorical'``.

        """

        if config['type'] == 'categorical':
            return CategoricalOp(config['in_field'],
                                 config['mapping'],
                                 config['out_field'])
        return None

    def lookup(self, row):
        """Return the coded value for the row, or ``None`` when unmapped."""

        data = self.to_series(row)
        return self.mapping.get(data[self.fields[0]])

    def process_single(self, row):
        """Return the coded value for the row (``None`` when unmapped)."""

        return self.lookup(row)

    def explain(self, row):
        """Describe the translation, or report that no category matched."""

        res = self.lookup(row)
        # row.get (as in to_series) rather than row[...]: a row that lacks
        # the field must produce the "could not match" message, not KeyError.
        observed = row.get(self.fields[0])
        if res is not None:
            return f'{self.fields[0]}:{observed} -> {self.result_fields[0]}:{res}'
        else:
            return f'Could not match {self.fields[0]}:{observed}'

In [19]:
# Code gender numerically for the `norman` regression.
cat_op = CategoricalOp(measure_col='gender',
                       mapping=dict(male=0, female=1),
                       out_col='norman_gender')
DATA.update(gender='male')

norman_gender = cat_op.process_single(row=DATA)

assert norman_gender == 0
print(cat_op.explain(row=DATA))
DATA.update(norman_gender=norman_gender)

gender:male -> norman_gender:0


Now for the big finale, regression based norms.
After scaling the relevant data and handling categorical variables we need to apply an equation.
However, the equation changes depending on the individual's demographic variables.
There is one equation for African Americans, one for Caucasians, and a different one for Spanish speakers.
The `EquationFilterOp` takes care of these intricacies.

In [20]:
#export
class EquationFilterOp(AbstractOperation):
    """Apply the regression equation whose filter matches the row.

    Each regression is a mapping with a ``filter`` expression and a ``norm``
    expression, both evaluated with ``pd.eval`` against the row's fields.
    Only the first matching regression is used.
    """

    def __init__(self, fields, regressions, out_field, result_type = 'zscale'):
        """Store the regressions and the names that are read and written.

        Parameters
        ----------
        fields : list[str]
            Row keys made available to the filter and norm expressions.
        regressions : list[dict]
            Entries with ``filter`` and ``norm`` keys.
        out_field : str
            Name under which the result is reported.
        result_type : str
            One of ``'standard_score'``, ``'tscore'``, ``'zscore'``,
            ``'zscale'`` or ``'other'``.
        """

        self.regressions = regressions
        self.fields = fields
        self.result_fields = [out_field]
        self.result_type = result_type

    @staticmethod
    def from_config(config):
        """Create an EquationFilterOp from a config dict, if it describes one.

        Reads the ``fields``, ``equations`` and ``out_field`` keys; the
        ``result_type`` key is optional and defaults to ``'zscale'``.

        Returns
        -------
        EquationFilterOp or None
            ``None`` when ``config['type']`` is not ``'equation_filter'``.
        """
        if config['type'] == 'equation_filter':
            # .get mirrors the constructor default so result_type may be omitted.
            return EquationFilterOp(config['fields'],
                                    config['equations'],
                                    config['out_field'],
                                    result_type = config.get('result_type', 'zscale'))

        return None

    def search_filters(self, row):
        """Return every regression whose filter evaluates truthy for ``row``."""

        # Convert once; the same values are reused for every filter.
        local_values = self.to_series(row).to_dict()
        return [reg for reg in self.regressions
                if pd.eval(reg['filter'], local_dict=local_values)]

    def scale_data(self, row):
        """Evaluate the first matching regression for ``row``.

        Returns
        -------
        tuple
            ``(regression, value)``, or ``(None, None)`` when no filter
            matches.

        Raises
        ------
        ValueError
            If a regression matched but ``result_type`` is not recognised.
        """

        data = self.to_series(row)
        hits = self.search_filters(row)
        if not hits:
            return None, None

        reg = hits[0]  # Currently only implementing "first"
        val = pd.eval(reg['norm'], local_dict=data.to_dict())

        if self.result_type in ('standard_score', 'tscore'):
            # NOTE(review): 'standard_score' shares the tscore rescaling
            # (centre 50, width 10) - confirm this is intended.
            val = (val - 50)/10
        elif self.result_type in ('zscore', 'zscale', 'other'):
            pass  # value is used as produced by the equation
        else:
            raise ValueError(f'Did not understand result_type: {self.result_type}')

        return reg, val


    def explain(self, row):
        """Describe which regression matched and the value it produced."""

        reg, val = self.scale_data(row)

        if reg is None:
            return 'Could not find a match for regression normalization.'
        else:
            return f'Matched {reg["filter"]}, applied {reg["norm"]} = {float(val)}'


    def process_single(self, row):
        """Return the normalized value for the row, or ``None`` without a match."""

        _, val = self.scale_data(row)
        return val

These are best explained through their yaml imports.
Examine the `data/norms/norman/norman_bvmt_regnorm.yaml` for a complete example.
