In [8]:
import os
from abc import ABC, abstractmethod
from collections import defaultdict
from pprint import pprint, pformat

from graph import Graph
import pandas
from pandas_profiling import ProfileReport

pandas.options.display.max_colwidth = 0

from kf_lib_data_ingest.common.io import (
    read_df
)
from kf_lib_data_ingest.common.concept_schema import (
    CONCEPT,
    DELIMITER,
    concept_from,
    concept_attr_from
)

SOURCE_DATA_DIR = os.path.abspath('data')
TEST_DATA_DIR = os.path.join(SOURCE_DATA_DIR, 'test')
EXTRACT_DATA_DIR = os.path.join('./output/ExtractStage/')

In [9]:
def display_dfs(df_dict):
    for key, df in df_dict.items():
        print(f'Dataframe {key}')
        display(df)
        
def load_data(input_dir, enable_display=False):
    out_dfs = {}
    for fn in os.listdir(input_dir):
        fp = os.path.join(input_dir, fn)
        ext = os.path.splitext(fn)[-1]
        if (os.path.isfile(fp) and (not fn.startswith('.')) and (ext not in {'.zip', '.json'})):
            out_dfs[fn] = read_df(os.path.join(input_dir, fn))
    if enable_display:
        print(f'Loading {fn} into df')
        display_dfs(out_dfs)

    return out_dfs

# Read in source data files into dict
# print('Source data files ....')
# source_data_dfs = load_data(SOURCE_DATA_DIR)

print('\nExtracted data files ....')
# Read in extracted data files into dict
extract_data_dfs = load_data(EXTRACT_DATA_DIR)

# Test dataframes
print('\nTest data files ....')
test_data_dfs = load_data(TEST_DATA_DIR, enable_display=False)


Extracted data files ....

Test data files ....


In [13]:
# TODO
# - Add enable/disable param for test params
# - Add one custom test class and instance
# - Add type safety to all classes
# - Convert print statements to logging statements

def result_to_emoji(result):
    result = int(result)
    emap = {
        0: '❌',
        1: '✅',
        2: '☑️'
    }
    return emap[result]

def validate_count(count, min_count, max_count):
    if count < min_count:
        return False
    if (max_count is not None) and (count > max_count):
        return False
    return True

def stack_dfs(df_dict, left, right=None):
    cumulative_df = None
    for fn, df in df_dict.items():
        # Strip whitespace from column names
        df.rename(columns=lambda x: x.strip(), inplace=True)

        # Select subset
        if left not in df.columns:
            continue
        extract_cols = set([left, right]) & set(df.columns)
        df = df[extract_cols]

        # Re-order columns for debugging
        if len(df.columns) > 1:
            df = df[[left, right]]
        
        # Drop rows where value of left col is null    
        df = df[(df[left] != '') & (df[left].notnull())]
        
        # Stack dfs
        if cumulative_df is None:
            cumulative_df = df

        cumulative_df = pandas.concat([cumulative_df, df])
    
    # Drop duplicates
    try:
        cumulative_df.drop_duplicates(inplace=True)
    except AttributeError:
        pass
    
    return cumulative_df

def direct_connections(df_dict, left_col, right_col):
    """
    Given an input pandas.DataFrame consisting of two columns, 
    left_col and right_col, produce a DataFrame that represents groups of 
    right_col values associated with each unique left_col value. 
    Then add another column that captures the number of unique items in a group.

    This DataFrame represents a graph showing direct connections between
    values of left_col and values of right_col.

    Example output:
        | PARTICIPANT.ID |    FAMILY.ID    | count |
        |----------------|-----------------|-------|
        |     P1         |    {F1, F2}     |   2   |
        |     P2         |  {F1, F1, F2 }  |   2   |
    """
    
    # Stack dfs     
    df = stack_dfs(df_dict, left_col, right_col)
    
    # Create groups     
    diff = set(df.columns) ^ set([left_col, right_col])
    if (df is not None) and (not diff):
        gdf = (
            df.groupby([left_col])[right_col]
            .apply(set)
            .reset_index(name=right_col)
        )
        
        def remove_nulls(linked_nodes):
            return {
                n for n in linked_nodes
                if pandas.notnull(n) and n
            }
        gdf[right_col] = gdf[right_col].apply(lambda linked_nodes: remove_nulls(linked_nodes))
        
        gdf['linked_node_count'] = gdf[right_col].apply(
            lambda linked_nodes: len(linked_nodes)
        )
    else:
        gdf = None

    return gdf

def file_locations(error_value, df_dict):
    locs = set()
    for fn, df in df_dict.items():
        if any(df.isin([str(error_value)]).any(1).to_list()): 
            locs.add(fn)
    return (error_value, locs)

class ValidationTest(ABC):
    def __init__(self, name, description, test_type, test_params, id=None):
        self.id = id
        self.name = name
        self.description = description
        self.test_type = test_type
        self.test_params = test_params
    
    def run(self, df_dict, *args, **kwargs):
        print(f'Running {self.id} test on {len(df_dict.values())} files ...')
        return self._run(df_dict, *args, **kwargs)
        
    def build_report(self, *args, **kwargs):
        required_columns = set(['name', 'description', 'result', 'details'])

        df = self._build_report(*args, **kwargs)
        if not isinstance(df, pandas.DataFrame):
            raise Exception(
                f'{type(self).__name__}_build_report must return '
                f'a pandas.DataFrame object!'
            )

        if not (required_columns.issubset(set(df.columns))):
            raise Exception(
                f'{type(self).__name__}_build_report DataFrame must have all required '
                f'columns: {required_columns}'
            )

        return df
    
    @abstractmethod
    def _run(self, df_dict):
        raise NotImplementedError()

    
    @abstractmethod
    def _build_report(self, *args, **kwargs):
        raise NotImplementedError()

    def to_dict(self):
        return {
            'test_id': self.id,
            'type': self.test_type,
            'name': self.name,
            'description': self.description,
            'input_params': self.test_params
        }

    def to_dataframe(self):
        return pandas.DataFrame([self.to_dict()])

    def __repr__(self):
        return pformat(self.to_dict())

class ValidationTestResult(ABC):
    def __init__(self, validation_test, result, data, message=None, result_details=None, error_locations=None):
        self.id = None
        self.validation_test = validation_test
        self.result = result
        self.data = data
        self.message = message or (
            f"{result_to_emoji(int(result))} "
            f"{self.validation_test.test_type.title()} "
            f"for '{self.validation_test.name}' "
            f"{'Succeeded!' if result else 'Failed!'}"
        )
        self.result_details = result_details
        self.error_locations = error_locations
    
    def to_dict(self):
        d = self.validation_test.to_dict()
        d.update({
            'result': self.result,
            'data': self.data,
            'message': self.message,
            'details': self.result_details,
            'error_locations': self.error_locations
        })
        return d
    
    def to_dataframe(self):
        d = self.to_dict()
        df = pandas.DataFrame([d])
        return df.drop('data', axis=1)
    
    
    def __repr__(self):
        return pformat(self.to_dict())
        
class CountValidationTest(ValidationTest):
    def __init__(self, name, description, concept_name, min_count, max_count):
        super().__init__(
            name,
            description,
            id=f'{concept_name}-UNIQUE-COUNT', 
            test_type='count-test',
            test_params=(concept_name, min_count, max_count)
        )
        self.concept_name = concept_name
        self.min_count = min_count
        self.max_count = max_count
    
    def to_dict(self):
        d = super().to_dict()
        d.update(
            {
                'concept': self.concept_name,
                'min_count': self.min_count,
                'max_count': self.max_count
            }
        )
        return d
    
    def _run(self, df_dict):
        # Stack dfs vertically into 1 df         
        cumulative_df = stack_dfs(
            df_dict,
            self.concept_name
        )
        if (cumulative_df is None) or (cumulative_df.empty):
            return ValidationTestResult(
                self, 2, None, 
                result_details=(
                    'Test did not run due to missing required columns: '
                    f'{self.concept_name}'
                )
            )
        else:
            # Unique count test
            error = None
            uc = cumulative_df.nunique().values[0]
            result = validate_count(uc, self.min_count, self.max_count)
            # Count check failed
            if not result:
                error = (self.concept_name, uc) 
            return ValidationTestResult(self, result, None, result_details=error)

    def _build_report(self, test_df):
        report_df = test_df[['result', 'name', 'description', 'details', 'error_locations']]
        report_df['result'] = test_df['result'].apply(
            lambda x: result_to_emoji(x)
        )
        report_df['details'] = test_df['details'].apply(
            lambda x: f'Found {x[1]} {x[0]} in data' if x else None
        )
        return report_df

class EntityTypeGraph(object):
    def __init__(self, relations):
        self.adj_list = relations
        self.graph = self._build()
    
    def _build(self):
        graph = Graph()
        for node, neighbors in self.adj_list.items():
            if node not in graph:
                graph.add_node(node)
            # Add directed edge from node to neighbor
            for n in neighbors:
                # Add neighbor node if not in graph
                if n not in graph:
                    graph.add_node(n)
                graph.add_edge(node, n)
        return graph
    
    def ancestors(self, node_id):
        ancestors = []
        while True:
            a = self.graph.nodes(to_node=node_id)
            if a:
                ancestors.extend(a)
                node_id = a[0]
            else:
                break
        return ancestors

    def descendants(self, node_id):
        descendants = []
        while True:
            d = self.graph.nodes(from_node=node_id)
            if d:
                descendants.extend(d)
                node_id = d[0]
            else:
                break
        return descendants
    
    def __repr__(self):
        return pformat(self._adj_list)
      
class RelationshipValidationTest(ValidationTest):
    """
    Validate relationships between enitity instances

    For example given:
    
        self.node_left=SPECIMEN.ID
        self.node_right=PARTICIPANT.ID
        self.min_relations=1
        self.max_relations =1

    A SPECIMEN must be linked to at least 1 PARTICIPANT and no more than 1 PARTICIPANT
    Find the SPECIMENS that violate this rule
    """  
    def __init__(
        self, name, description, node_left, node_right, min_relations, max_relations, type_relations=None
    ):
        super().__init__(
            name,
            description,
            id=f'{node_left}-->{node_right}', 
            test_type='relationship-test',
            test_params=(node_left, node_right, min_relations, max_relations)
        )
        self.node_left = node_left
        self.node_right = node_right
        self.min_relations = min_relations
        self.max_relations = max_relations
        self.relations = type_relations
    
    def to_dict(self):
        d = super().to_dict()
        d.update(
            {'relation': (self.node_left, self.node_right)}
        )
        return d
    
    def _run(self, df_dict):
        # For every instance of node_left, get group of connected node_right
        df = direct_connections(df_dict, self.node_left, self.node_right)
            
        # Test did not run if neither node exists
        if (df is None) or (df.empty):
            return ValidationTestResult(
                self, 2, None, 
                result_details=(
                    f'Test did not run, required columns not found {self.node_left, self.node_right}'
                )
            )
        
        # Test failed if 0 links were found         
        if len(df.columns) == 1:
            return ValidationTestResult(self, False, None)
        
        # Validate relationships in cumulative data          
        else:
            invalid_rows = df[~df['linked_node_count'].apply(
                lambda c: validate_count(c, self.min_relations, self.max_relations)
            )]
            errors = [
                (row[self.node_left], row[self.node_right])
                for i, row in invalid_rows.iterrows()
            ]
            
            #  Collect file locations of error values
            file_locations_per_error = [
                file_locations(node, df_dict)
                for node, _ in errors
            ]
            
            return ValidationTestResult(
                self, 
                invalid_rows.empty, 
                invalid_rows, 
                result_details=errors, 
                error_locations=file_locations_per_error
            )
    
    def _build_report(self, test_df):
    
        def _relation_error_msg(row):
            msgs = []
            left, right = row['relation']
            node_left_type = concept_from(left).title()
            node_right_type = concept_from(right).title()
            
            if int(row['result']) == 2:
                return row['details']

            if row['details'] is None:
                missing = (
                    node_right_type 
                    if concept_attr_from(right) == 'ID' 
                    else concept_attr_from(right)
                )
                return (
                    f'All {node_left_type} entities are missing {missing}'
                )

            for node, linked_nodes in row['details']:
                msg = f'{node_left_type} {node} is linked to'
                if not linked_nodes:
                    msg = f'{msg} 0 {node_right_type} entities'
                else:
                    msg = f'{msg} {len(linked_nodes)} {node_right_type} entities: {linked_nodes}'

                msgs.append(msg)

            return '\n'.join(msgs)

        def _file_location_msg(row):
            if row['error_locations']:
                return '\n'.join(
                    f'Found {error_value} in files: {locs}'
                    for error_value, locs in row['error_locations']
                )
            else:
                return ''
            
        report_df = test_df[['result', 'name', 'description', 'details']]
        report_df['result'] = test_df['result'].apply(
            lambda x: result_to_emoji(x)
        )
        report_df['details'] = test_df.apply(
            lambda row: _relation_error_msg(row),
            axis=1
        )
        report_df['error_locations'] = test_df.apply(
            lambda row: _file_location_msg(row),
            axis=1
        )
        return report_df

def markdown_report(df):
    filepath = os.path.join(os.getcwd(), 'validation_report.md')
    
    # Prepare data 
    out_cols = ['result', 'name', 'details', 'error_locations']
    counts = (
        df[df['test_type'] == 'count-test'][out_cols]
        .drop(['error_locations'],axis=1)
    )
    print(counts.columns)
    relations = df[df['test_type'] == 'relationship-test'][out_cols]
    attributes = df[df['test_type'] == 'attribute-test'][out_cols]
    definitions = df[['test_type', 'name', 'description']]
    
    # Write markdown     
    output = []
    output.append('# Validation Report')
    output.append('## Count Tests')
    output.append(counts.to_markdown(index=False))
    output.append('\n## Relation Tests')
    output.append(relations.to_markdown(index=False))
    output.append('\n## Attribute Tests')
    output.append(attributes.to_markdown(index=False))
    output.append('\n## Test Definitions')
    output.append(definitions.to_markdown(index=False))
    
    with open(filepath, 'w') as md_file:
        md_file.write('\n'.join(output))
    
    print(f'\nGenerated validation report at {filepath}')

def run_tests(test_data_dfs, test_params):
    report_dfs = []
    for test_type, tests in test_params.items():
        if test_type == 'count-test':
            cls = CountValidationTest
        else:
            cls = RelationshipValidationTest
        
        for test in tests:
            test_obj = cls(test['name'], test['desc'], *test['params'])
            test_df = test_obj.run(test_data_dfs).to_dataframe()
            report_df = test_obj.build_report(test_df)            
            report_df['test_type'] = test_type
            report_dfs.append(report_df)
        
    report_df = pandas.concat(report_dfs)
    report_df = report_df.sort_values(by=['result', 'test_type'])
    
    markdown_report(report_df)

    return report_df

test_params = {
    'relationship-test': [
        {
            'name': 'A Participant is in at least 1 Family Group',
            'desc': 'Every uniquely identifiable Participant must be linked to at '
                    ' least 1 uniquely identifiable Family within the study',
            'params': (CONCEPT.PARTICIPANT.ID, CONCEPT.FAMILY.ID, 1, None)
        },
        {
            'name': 'A Family Group must have at least 1 Participant',
            'desc': 'Every uniquely identifiable Family Group must have '
                    'at least 1 uniquely identifiable Participant within the study',
            'params': (CONCEPT.FAMILY.ID, CONCEPT.PARTICIPANT.ID, 1, None)
        },
        {
            'name': 'A Specimen comes from 1 Participant',
            'desc': 'Every uniquely identifiable Specimen must be linked to '
                    ' exactly 1 uniquely identifiable Participant within the study',
            'params': (CONCEPT.BIOSPECIMEN.ID, CONCEPT.PARTICIPANT.ID, 1, 1)
        },
        {
            'name': 'A Participant must have at least 1 Specimen',
            'desc': 'Every uniquely identifiable Participant must have at least 1 '
                    ' uniquely identifiable Specimen within the study',
            'params': (CONCEPT.PARTICIPANT.ID, CONCEPT.BIOSPECIMEN.ID, 1, None)
        },
        {
            'name': 'A Sequence Manifest File Record represents only 1 Specimen',
            'desc': 'Every uniquely identifiable Sequence Manifest File Record must be linked to '
                    'exactly 1 uniquely identifiable Specimen within the study',
            'params': (CONCEPT.GENOMIC_FILE.URL_LIST, CONCEPT.BIOSPECIMEN.ID, 1, 1)
        },
        {
            'name': 'A Specimen must have at least 1 Sequence Manifest File Record',
            'desc': 'Every uniquely identifiable specimen must be linked to '
                    'at least 1 uniquely identifiable Sequence Manifest File Record '
                    'within the study',
            'params': (CONCEPT.BIOSPECIMEN.ID, CONCEPT.GENOMIC_FILE.URL_LIST, 1, None)
        }
    ],
    'attribute-test': [
        {
            'name': 'A Participant must have exactly 1 gender',
            'desc': 'Every uniquely identifiable Participant must have exactly 1'
                    ' gender from the acceptable list: Male, Female',
            'params': (CONCEPT.PARTICIPANT.ID, CONCEPT.PARTICIPANT.GENDER, 1, 1)
        }
    ],
    'count-test': [
        {
            'name': 'Expected Participant Unique Count = 10',
            'desc': 'The number of uniquely identifiable participants found in '
                    'study must be equal to 10',
            'params': (CONCEPT.PARTICIPANT.ID, 10, 10)
        },
        {
            'name': 'Expected Specimen Unique Count = 12',
            'desc': 'The number of uniquely identifiable specimens found in '
                    'study must be equal to 12',
            'params': (CONCEPT.BIOSPECIMEN.ID, 12, 12)
        }
    ]
}

report_df = run_tests(test_data_dfs, test_params)
display(report_df)

Running PARTICIPANT|ID-->FAMILY|ID test on 6 files ...
Running FAMILY|ID-->PARTICIPANT|ID test on 6 files ...
Running BIOSPECIMEN|ID-->PARTICIPANT|ID test on 6 files ...
Running PARTICIPANT|ID-->BIOSPECIMEN|ID test on 6 files ...
Running GENOMIC_FILE|URL_LIST-->BIOSPECIMEN|ID test on 6 files ...
Running BIOSPECIMEN|ID-->GENOMIC_FILE|URL_LIST test on 6 files ...
Running PARTICIPANT|ID-->PARTICIPANT|GENDER test on 6 files ...
Running PARTICIPANT|ID-UNIQUE-COUNT test on 6 files ...
Running BIOSPECIMEN|ID-UNIQUE-COUNT test on 6 files ...
Index(['result', 'name', 'details'], dtype='object')

Generated validation report at /Users/singhn4/Projects/kids_first/kf-ingest-packages/kf_ingest_packages/packages/SD_46SK55A3/validation_report.md


Unnamed: 0,result,name,description,details,error_locations,test_type
0,☑️,A Participant must have exactly 1 gender,"Every uniquely identifiable Participant must have exactly 1 gender from the acceptable list: Male, Female","Test did not run, required columns not found ('PARTICIPANT|ID', 'PARTICIPANT|GENDER')",,attribute-test
0,✅,Expected Participant Unique Count = 10,The number of uniquely identifiable participants found in study must be equal to 10,,,count-test
0,✅,Expected Specimen Unique Count = 12,The number of uniquely identifiable specimens found in study must be equal to 12,,,count-test
0,❌,A Participant is in at least 1 Family Group,Every uniquely identifiable Participant must be linked to at least 1 uniquely identifiable Family within the study,Participant P11 is linked to 0 Family entities,Found P11 in files: {'pf.csv'},relationship-test
0,❌,A Family Group must have at least 1 Participant,Every uniquely identifiable Family Group must have at least 1 uniquely identifiable Participant within the study,Family F12 is linked to 0 Participant entities,Found F12 in files: {'pf.csv'},relationship-test
0,❌,A Specimen comes from 1 Participant,Every uniquely identifiable Specimen must be linked to exactly 1 uniquely identifiable Participant within the study,"Biospecimen S2 is linked to 2 Participant entities: {'P1', 'P2'}\nBiospecimen S8 is linked to 0 Participant entities","Found S2 in files: {'spf.csv', 'sfp2.csv'}\nFound S8 in files: {'spf.csv'}",relationship-test
0,❌,A Participant must have at least 1 Specimen,Every uniquely identifiable Participant must have at least 1 uniquely identifiable Specimen within the study,Participant P11 is linked to 0 Biospecimen entities\nParticipant P13 is linked to 0 Biospecimen entities,Found P11 in files: {'pf.csv'}\nFound P13 in files: {'pf.csv'},relationship-test
0,❌,A Sequence Manifest File Record represents only 1 Specimen,Every uniquely identifiable Sequence Manifest File Record must be linked to exactly 1 uniquely identifiable Specimen within the study,Genomic_File ['foo/s11.txt'] is linked to 0 Biospecimen entities\nGenomic_File ['foo/s5.txt'] is linked to 0 Biospecimen entities\nGenomic_File ['foo/s9.txt'] is linked to 0 Biospecimen entities,Found ['foo/s11.txt'] in files: {'pg.csv'}\nFound ['foo/s5.txt'] in files: {'pg.csv'}\nFound ['foo/s9.txt'] in files: {'pg.csv'},relationship-test
0,❌,A Specimen must have at least 1 Sequence Manifest File Record,Every uniquely identifiable specimen must be linked to at least 1 uniquely identifiable Sequence Manifest File Record within the study,Biospecimen NA is linked to 0 Genomic_File entities\nBiospecimen S1 is linked to 0 Genomic_File entities\nBiospecimen S11 is linked to 0 Genomic_File entities\nBiospecimen S2 is linked to 0 Genomic_File entities\nBiospecimen S3 is linked to 0 Genomic_File entities\nBiospecimen S4 is linked to 0 Genomic_File entities\nBiospecimen S5 is linked to 0 Genomic_File entities\nBiospecimen S6 is linked to 0 Genomic_File entities\nBiospecimen S8 is linked to 0 Genomic_File entities\nBiospecimen S9 is linked to 0 Genomic_File entities,"Found NA in files: {'sfp2.csv'}\nFound S1 in files: {'spf.csv'}\nFound S11 in files: {'sp.csv'}\nFound S2 in files: {'spf.csv', 'sfp2.csv'}\nFound S3 in files: {'spf.csv'}\nFound S4 in files: {'spf.csv'}\nFound S5 in files: {'spf.csv'}\nFound S6 in files: {'spf.csv'}\nFound S8 in files: {'spf.csv'}\nFound S9 in files: {'spf.csv'}",relationship-test
