# Setup

Initial module setup.

In [93]:
import numpy as np
import pandas as pd
import dataclasses
import enum
import typing
import random
import auth_biohash.bio_hash
import auth_biohash.random_token
import feature_encoding.base
import feature_encoding.threshold

from eeg_auth_models_framework import data, pre_process, features, processor
from eeg_auth_models_framework.utils import conversion

# Constants

In [94]:
# Match thresholds to evaluate. HashTest treats a comparison result that is
# less than or equal to the threshold as a match.
AUTHENTICATION_THRESHOLDS = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
# Sampling rate handed to the MNE data frame converter.
DATASET_SAMPLE_FREQ_HZ = 200
# Channel names handed to the MNE data frame converter.
DATA_CHANNEL_NAMES = ['T7','F8','Cz','P4']
# Frequency bands handed to the band-pass filter step of both pipelines.
# NOTE(review): a bound of None presumably means "unbounded on that side",
# and the 'Raw' entry presumably means "unfiltered" - confirm against
# pre_process.FrequencyBand.
FREQUENCIES = [
    pre_process.FrequencyBand(lower=8.0, upper=12.0, label='Alpha'),
    pre_process.FrequencyBand(lower=12.0, upper=35.0, label='Beta'),
    pre_process.FrequencyBand(lower=4.0, upper=8.0, label='Theta'),
    pre_process.FrequencyBand(lower=35.0, upper=None, label='Gamma'),
    pre_process.FrequencyBand(lower=None, upper=None, label='Raw'),
]
# Arguments for the DataWindowStep of the sample pipeline.
# NOTE(review): presumably expressed in samples (1200 samples would be 6 s at
# 200 Hz) - confirm against pre_process.DataWindowStep.
WINDOW_SIZE = 1200
WINDOW_OVERLAP = 0
# Argument for the ThresholdBinaryEncoder used when hashing.
BINARY_THRESHOLD = 50
# Fixed seed for the generator that make_hash_tests uses to sample test cases.
RANDOM_SEED = 100000000000
RANDOM_GENERATOR = random.Random(RANDOM_SEED)

# Utilities

In [95]:
@enum.unique
class TestResultType(enum.Enum):
    """Classification of a single hash comparison test outcome.

    Members are numbered consecutively from 1 in declaration order.
    """

    TRUE_POSITIVE = 1
    FALSE_POSITIVE = 2
    FALSE_NEGATIVE = 3
    TRUE_NEGATIVE = 4


@dataclasses.dataclass
class TestResultsSummary:
    """Running confusion-matrix counts for a batch of hash comparison tests.

    The derived rates (accuracy, false accept rate, false reject rate) are
    computed on demand from the four counters. A rate whose denominator is
    zero is undefined and is reported as NaN rather than raising
    ZeroDivisionError, so an empty summary can still be tabulated.
    """
    true_positives: int = 0
    false_positives: int = 0
    false_negatives: int = 0
    true_negatives: int = 0

    @classmethod
    def merge_summaries(cls,
                        summary_a: 'TestResultsSummary',
                        summary_b: 'TestResultsSummary') -> 'TestResultsSummary':
        """Return a new summary whose counters are the element-wise sums.

        Neither input is modified. The result is an instance of the class the
        method was called on.
        """
        return cls(
            true_positives=summary_a.true_positives + summary_b.true_positives,
            false_positives=summary_a.false_positives + summary_b.false_positives,
            false_negatives=summary_a.false_negatives + summary_b.false_negatives,
            true_negatives=summary_a.true_negatives + summary_b.true_negatives
        )

    def increment_count(self, result_type: 'TestResultType') -> None:
        """Add one to the counter matching ``result_type``.

        Raises:
            ValueError: if ``result_type`` is not one of the four
                TestResultType members (previously such values were silently
                counted as true negatives).
        """
        if result_type == TestResultType.TRUE_POSITIVE:
            self.true_positives += 1
        elif result_type == TestResultType.FALSE_POSITIVE:
            self.false_positives += 1
        elif result_type == TestResultType.FALSE_NEGATIVE:
            self.false_negatives += 1
        elif result_type == TestResultType.TRUE_NEGATIVE:
            self.true_negatives += 1
        else:
            raise ValueError(f'Unsupported test result type: {result_type!r}')

    @staticmethod
    def _ratio(numerator: int, denominator: int) -> float:
        """Return numerator / denominator, or NaN when the denominator is zero."""
        if not denominator:
            return float('nan')
        return numerator / denominator

    @property
    def accuracy(self) -> float:
        """Fraction of all recorded tests that were true positives or true negatives."""
        hits = self.true_positives + self.true_negatives
        total = hits + self.false_positives + self.false_negatives
        return self._ratio(hits, total)

    @property
    def false_accept_rate(self) -> float:
        """False positives divided by (false positives + true negatives)."""
        return self._ratio(
            self.false_positives,
            self.false_positives + self.true_negatives
        )

    @property
    def false_reject_rate(self) -> float:
        """False negatives divided by (false negatives + true positives)."""
        return self._ratio(
            self.false_negatives,
            self.false_negatives + self.true_positives
        )


@dataclasses.dataclass
class HashTest:
    """A single comparison between two hashes with a known expected outcome.

    ``if_expected`` is reported when the observed match decision equals
    ``expected_result``; ``if_unexpected`` is reported otherwise.
    """
    expected_result: bool
    threshold: float
    if_expected: TestResultType
    if_unexpected: TestResultType
    hashes: typing.Tuple[auth_biohash.bio_hash.BioHash, auth_biohash.bio_hash.BioHash]

    def run_test(self) -> TestResultType:
        """Compare the two hashes and classify the outcome.

        The pair counts as a match when the value returned by
        ``BioHash.compare`` is less than or equal to ``threshold``.
        """
        first_hash = self.hashes[0]
        second_hash = self.hashes[1]
        comparison = auth_biohash.bio_hash.BioHash.compare(first_hash, second_hash)
        matched = comparison <= self.threshold
        if matched == self.expected_result:
            return self.if_expected
        return self.if_unexpected


@dataclasses.dataclass
class ThresholdTestSet:
    """Hashes needed to test one subject's template at one threshold."""
    # Threshold under test. iter_subject_test_sets stores the numeric value
    # taken from AUTHENTICATION_THRESHOLDS here; make_hash_tests converts it
    # with float() before use.
    threshold: float
    # Hash of the subject's template data.
    template_hash: auth_biohash.bio_hash.BioHash
    # Sample hashes that belong to the same subject as the template.
    positive_cases: typing.List[auth_biohash.bio_hash.BioHash]
    # Sample hashes that belong to any other subject.
    negative_cases: typing.List[auth_biohash.bio_hash.BioHash]


@dataclasses.dataclass
class SubjectTestSet:
    """All per-threshold test sets built for a single subject's template."""
    # Identifier of the subject whose template is under test.
    subject_id: str
    # One entry per threshold for which a test set was assembled.
    threshold_tests: typing.List[ThresholdTestSet]

# Configuration

In [96]:
# Shared helpers: dataset download, dataset reading, and the converter that
# the band-pass filter steps of both pipelines receive.
downloader = data.AuditoryDataDownloader()
reader = data.AuditoryDataReader()
converter = conversion.MNEDataFrameConverter(
    channels=DATA_CHANNEL_NAMES, 
    sample_frequency=DATASET_SAMPLE_FREQ_HZ
)

# Data Processing Setup

## Template Hash Processor

### Pre-Processing Steps

In [97]:
# Template pipeline: band-pass filtering only. Unlike the sample pipeline, no
# windowing step is applied here.
template_pre_process_steps = pre_process.PreProcessingPipeline([
    pre_process.EEGBandpassFilterStep(
        FREQUENCIES,
        converter
    )
])

### Feature Extraction Steps

In [98]:
# Statistical features extracted from the template data; the same four
# features are configured for the sample pipeline.
template_feature_extraction_steps = features.FeatureExtractPipeline([
    features.StatisticalFeatureExtractor([
        features.StatisticalFeature.MIN,
        features.StatisticalFeature.MAX,
        features.StatisticalFeature.MEAN,
        features.StatisticalFeature.ZERO_CROSSING_RATE
    ])
])

### Processor

In [99]:
# Processor that combines the template pre-processing and feature extraction
# pipelines.
template_data_processor = processor.DataProcessor(
    pre_process=template_pre_process_steps,
    feature_extraction=template_feature_extraction_steps
)

## Sample Hash Processor

### Pre-Processing Steps

In [100]:
# Sample pipeline: the same band-pass filtering as the template pipeline,
# followed by a windowing step configured with WINDOW_SIZE and WINDOW_OVERLAP.
sample_pre_process_steps = pre_process.PreProcessingPipeline([
    pre_process.EEGBandpassFilterStep(
        FREQUENCIES,
        converter
    ),
    pre_process.DataWindowStep(WINDOW_SIZE, WINDOW_OVERLAP)
])

### Feature Extraction Steps

In [101]:
# Statistical features extracted from the sample data; the same four features
# are configured for the template pipeline.
sample_feature_extraction_steps = features.FeatureExtractPipeline([
    features.StatisticalFeatureExtractor([
        features.StatisticalFeature.MIN,
        features.StatisticalFeature.MAX,
        features.StatisticalFeature.MEAN,
        features.StatisticalFeature.ZERO_CROSSING_RATE
    ])
])

### Processor

In [102]:
# Processor that combines the sample pre-processing and feature extraction
# pipelines.
sample_data_processor = processor.DataProcessor(
    pre_process=sample_pre_process_steps,
    feature_extraction=sample_feature_extraction_steps
)

# Subject Data

In [103]:
# Retrieve the dataset and read it into a per-subject collection; the cells
# below iterate it for subject keys and index it by subject.
data_path = downloader.retrieve()
subject_data_map = reader.format_data(data_path)

## Token Setup

In [104]:
# One generated token per subject; the same token is used for that subject's
# template hash and for all of that subject's sample hashes.
subject_tokens_map = {subject: auth_biohash.random_token.generate_token() for subject in subject_data_map}

## Template Hash Setup

### Processing

In [105]:
# Run every subject's data through the template processor (no windowing).
processed_template_data_map = {subject: template_data_processor.process(subject_data_map[subject]) for subject in subject_data_map}

Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=40114
    Range : 0 ... 40113 =      0.000 ...   200.565 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ..

### Hashing

In [106]:
# subject -> threshold key -> list of hashes
SubjectHashesMap = typing.Dict[str, typing.Dict[str, typing.List[auth_biohash.bio_hash.BioHash]]]
# subject -> threshold key -> single template hash
SubjectTemplateHashesMap = typing.Dict[str, typing.Dict[str, auth_biohash.bio_hash.BioHash]]
# threshold key -> list of hashes (keys are str(threshold))
ThresholdHashesMap = typing.Dict[str, typing.List[auth_biohash.bio_hash.BioHash]]
# threshold key -> single template hash (keys are str(threshold))
TemplateHashesMap = typing.Dict[str, auth_biohash.bio_hash.BioHash]

In [107]:
def make_map_of_threshold_hashes(vectors_to_hash: typing.List[np.ndarray], 
                                 token: str, 
                                 encoder: feature_encoding.base.BinaryEncoder) -> ThresholdHashesMap:
    """Hash every vector once per authentication threshold.

    Returns a dict keyed by ``str(threshold)`` for each entry of
    AUTHENTICATION_THRESHOLDS. Each value is a freshly generated list holding
    one hash per input vector, in input order. The threshold itself is not
    passed to the hash generation call.
    """
    def hash_all_vectors() -> typing.List[auth_biohash.bio_hash.BioHash]:
        return [
            auth_biohash.bio_hash.BioHash.generate_hash(feature_vector, token, encoder)
            for feature_vector in vectors_to_hash
        ]

    return {
        str(threshold): hash_all_vectors()
        for threshold in AUTHENTICATION_THRESHOLDS
    }


def iter_template_hashes(template_data_map: typing.Dict[str, typing.List[np.ndarray]],
                         tokens_map: typing.Dict[str, str],
                         encoder: feature_encoding.base.BinaryEncoder) -> typing.Iterator[typing.Tuple[str, TemplateHashesMap]]:
    """Yield ``(subject, {threshold key: template hash})`` for every subject.

    Each subject's template data is expected to produce exactly one hash per
    threshold. A threshold with any other number of hashes (including zero)
    is reported with a warning and left out of that subject's map, so callers
    must not assume every threshold key is present.

    Raises:
        KeyError: if a subject has no entry in ``tokens_map``.
    """
    for subject, template_vectors in template_data_map.items():
        token = tokens_map[subject]
        template_hashes = make_map_of_threshold_hashes(template_vectors, token, encoder)
        normalized_hashes_map: TemplateHashesMap = {}
        for threshold, hashes_list in template_hashes.items():
            if len(hashes_list) != 1:
                # The count may be zero as well as greater than one, so the
                # message reports the actual number found.
                print(
                    f'[warning] Expected exactly 1 template hash for subject {subject} '
                    f'at threshold {threshold}, found {len(hashes_list)}; skipping.'
                )
                continue
            normalized_hashes_map[threshold] = hashes_list[0]
        yield subject, normalized_hashes_map

## Sample Hash Setup

### Processing

In [108]:
# Run every subject's data through the sample processor (which includes the
# windowing step).
processed_data_map = {subject: sample_data_processor.process(subject_data_map[subject]) for subject in subject_data_map}

Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=40114
    Range : 0 ... 40113 =      0.000 ...   200.565 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ...   119.995 secs
Ready.
Creating RawArray with float64 data, n_channels=4, n_times=24000
    Range : 0 ... 23999 =      0.000 ..

### Hashing

In [109]:
# threshold key -> list of hashes. Same definition as the alias of this name
# declared in the template hashing section.
ThresholdHashesMap = typing.Dict[str, typing.List[auth_biohash.bio_hash.BioHash]]

In [110]:
def iter_sample_hashes(data_map: typing.Dict[str, typing.List[np.ndarray]], 
                       tokens_map: typing.Dict[str, str], 
                       encoder: feature_encoding.base.BinaryEncoder) -> typing.Iterator[typing.Tuple[str, ThresholdHashesMap]]:
    """Yield ``(subject, {threshold key: sample hashes})`` for every subject.

    Each subject's vectors are hashed with that subject's own token.
    """
    for subject, feature_vectors in data_map.items():
        subject_token = tokens_map[subject]
        yield subject, make_map_of_threshold_hashes(feature_vectors, subject_token, encoder)

# Test Set Assembly

## Gathering Test Sets

In [111]:
def iter_subject_test_sets(template_data: typing.Dict[str, typing.List[np.ndarray]],
                           sample_data: typing.Dict[str, typing.List[np.ndarray]],
                           tokens_map: typing.Dict[str, str],
                           encoder: feature_encoding.base.BinaryEncoder) -> typing.Iterator[SubjectTestSet]:
    """Yield one SubjectTestSet per subject that has template data.

    For each template subject and each threshold, sample hashes from the same
    subject become positive cases and sample hashes from every other subject
    become negative cases. Thresholds for which the subject has no template
    hash (iter_template_hashes drops those after warning) are skipped instead
    of raising KeyError.
    """
    # The sample hashes are built from sample_data, tokens_map and encoder
    # only, none of which depend on the template subject, so they are
    # generated once and reused instead of once per template subject.
    all_sample_hashes = list(iter_sample_hashes(sample_data, tokens_map, encoder))
    for subject, template_hashes in iter_template_hashes(template_data, tokens_map, encoder):
        threshold_test_sets: typing.Dict[str, ThresholdTestSet] = {}
        for threshold in AUTHENTICATION_THRESHOLDS:
            threshold_key = str(threshold)
            if threshold_key not in template_hashes:
                continue
            threshold_test_sets[threshold_key] = ThresholdTestSet(
                threshold=threshold, template_hash=template_hashes[threshold_key],
                positive_cases=[], negative_cases=[]
            )
        for sample_subject, sample_hashes in all_sample_hashes:
            is_same_subject = subject == sample_subject
            for threshold_key, hashes in sample_hashes.items():
                test_set = threshold_test_sets.get(threshold_key)
                if test_set is None:
                    # No template hash for this threshold, so nothing to test.
                    continue
                if is_same_subject:
                    test_set.positive_cases.extend(hashes)
                else:
                    test_set.negative_cases.extend(hashes)
        yield SubjectTestSet(
            subject_id=subject,
            threshold_tests=list(threshold_test_sets.values())
        )

## Generating Hash Tests

In [112]:
def make_hash_tests(test_set: ThresholdTestSet) -> typing.List[HashTest]:
    """Build a balanced list of HashTest objects for one threshold test set.

    The same number of positive and negative cases is drawn at random
    (without replacement) with the module-level RANDOM_GENERATOR. The tests
    that should match come first in the returned list, followed by the tests
    that should not match.
    """
    # Take the smaller population size so both populations contribute the
    # same number of tests (negative cases will usually outnumber positives).
    balanced_count = min(len(test_set.positive_cases), len(test_set.negative_cases))
    # The positive draw happens before the negative draw; the order matters
    # for reproducibility because both consume the same seeded generator.
    genuine_cases: typing.List[auth_biohash.bio_hash.BioHash] = RANDOM_GENERATOR.sample(
        test_set.positive_cases, balanced_count
    )
    impostor_cases: typing.List[auth_biohash.bio_hash.BioHash] = RANDOM_GENERATOR.sample(
        test_set.negative_cases, balanced_count
    )
    genuine_tests = [
        HashTest(
            expected_result=True,
            hashes=(test_set.template_hash, candidate),
            threshold=float(test_set.threshold),
            if_expected=TestResultType.TRUE_POSITIVE,
            if_unexpected=TestResultType.FALSE_NEGATIVE
        )
        for candidate in genuine_cases
    ]
    impostor_tests = [
        HashTest(
            expected_result=False,
            hashes=(test_set.template_hash, candidate),
            threshold=float(test_set.threshold),
            if_expected=TestResultType.TRUE_NEGATIVE,
            if_unexpected=TestResultType.FALSE_POSITIVE
        )
        for candidate in impostor_cases
    ]
    return genuine_tests + impostor_tests


def iter_hash_tests(template_data: typing.Dict[str, typing.List[np.ndarray]],
                    sample_data: typing.Dict[str, typing.List[np.ndarray]],
                    tokens_map: typing.Dict[str, str],
                    encoder: feature_encoding.base.BinaryEncoder) -> typing.Iterator[typing.Tuple[float, typing.List[HashTest]]]:
    """Yield ``(threshold, hash tests)`` for every subject/threshold pair.

    The same threshold is yielded once per subject, so callers that want
    per-threshold totals have to aggregate across the yielded items. The
    threshold is whatever is stored on the ThresholdTestSet, which is the
    numeric value taken from AUTHENTICATION_THRESHOLDS.
    """
    for subject_test_set in iter_subject_test_sets(template_data, sample_data, tokens_map, encoder):
        for threshold_test_data in subject_test_set.threshold_tests:
            yield threshold_test_data.threshold, make_hash_tests(threshold_test_data)

# Execute Tests

In [113]:
def run_hash_tests(hash_tests: typing.List[HashTest]) -> TestResultsSummary:
    """Run every test in order and tally the outcomes into a new summary."""
    tally = TestResultsSummary()
    outcomes = (hash_test.run_test() for hash_test in hash_tests)
    for outcome in outcomes:
        tally.increment_count(outcome)
    return tally

In [114]:
# Accumulate one TestResultsSummary per threshold across all subjects.
test_results_map = {}
threshold_encoder = feature_encoding.threshold.ThresholdBinaryEncoder(BINARY_THRESHOLD)
hash_test_args = (
    processed_template_data_map,
    processed_data_map,
    subject_tokens_map,
    threshold_encoder,
)
for threshold_type, hash_test_data in iter_hash_tests(*hash_test_args):
    # Start from an empty summary the first time a threshold is seen.
    summary_so_far = test_results_map.setdefault(threshold_type, TestResultsSummary())
    test_results_map[threshold_type] = TestResultsSummary.merge_summaries(
        summary_so_far,
        run_hash_tests(hash_test_data)
    )

In [115]:
# One table row per threshold: threshold, FAR, FRR and accuracy.
data_results = []
for threshold_type, result_summary in test_results_map.items():
    data_results.append(
        [
            threshold_type,
            result_summary.false_accept_rate,
            result_summary.false_reject_rate,
            result_summary.accuracy,
        ]
    )
test_results = pd.DataFrame(
    data=data_results,
    columns=['Threshold', 'FAR', 'FRR', 'Accuracy'],
)
test_results

Unnamed: 0,Threshold,FAR,FRR,Accuracy
0,0.1,0.0,0.157385,0.921308
1,0.2,0.0,0.077482,0.961259
2,0.3,0.0,0.077482,0.961259
3,0.4,0.038741,0.0,0.98063
4,0.5,0.527845,0.0,0.736077
5,0.6,0.96368,0.0,0.51816
6,0.7,0.997579,0.0,0.501211
7,0.8,1.0,0.0,0.5
8,0.9,1.0,0.0,0.5
