In [1]:
import os
import sys
# Make the parent of the current working directory importable, so the
# `src.*` import below resolves when running from this folder.
module_path = os.path.abspath(os.pardir)
if module_path not in sys.path:
    sys.path.append(module_path)

import pandas as pd
from src.log.common._types import LogFrame
# Load the test DataFrame from a pickle fixture (path is relative to the
# current working directory).
# NOTE(review): unpickling can execute arbitrary code - only load pickle
# files that come from a trusted source.
df = pd.read_pickle('../test-data/numerics_df_v2.pkl')

In [2]:
class SchemaException(Exception):
    """Raised when the provided DataFrame does not have a valid schema.

    See schema_migration and/or LogFrame.schema to rectify the issue.
    """

In [5]:
from pandas.util import hash_pandas_object
import hashlib

def _get_df_hash(df: pd.DataFrame):
    """Return a SHA-256 digest of the DataFrame as an integer.

    pandas computes one hash per row (index included); the resulting array
    is then fed to SHA-256. Values stored in ``df.attrs`` are not considered.
    """
    row_hashes = pd.util.hash_pandas_object(df, index=True)
    digest = hashlib.sha256(row_hashes.values).digest()
    # Big-endian interpretation of the digest equals int(hexdigest, 16).
    return int.from_bytes(digest, "big")

def validate_schema(df: pd.DataFrame, SCHEMA = LogFrame) -> pd.DataFrame:
    """Validate the columns of ``df`` against ``SCHEMA`` and stamp a content hash.

    Each column must either be a key of the flat schema with a compatible
    dtype, or match a wildcard entry keyed ``(top_level, '*')`` with a
    compatible dtype. After all columns are checked, every wildcard's match
    count is compared with the ``min``/``max`` declared on its wildcard type.

    Args:
        df: DataFrame to validate. Column labels are expected to be tuples
            matching the flat-schema keys ``(top_level_col, col)``.
        SCHEMA: Object whose ``get_flat_schema()`` returns ``(schema, wcs)``.
            ``schema`` maps column tuples to a type (``str``, ``int``,
            ``float``) or a union of those. ``wcs`` maps
            ``(top_level, wildcard_type)`` to an object with ``dtype`` and
            ``count`` attributes; ``wildcard_type`` exposes ``min``/``max``
            and ``str(wildcard_type())`` is compared against ``'*'``.

    Returns:
        The same ``df`` object, with ``df.attrs['vis-meta']`` set to a dict
        holding the frame's hash under ``'hash'``.

    Raises:
        SchemaException: If any column fails validation or a wildcard's match
            count is outside its bounds.
        KeyError: If the schema declares a type other than ``str``, ``int``
            or ``float`` (``None`` members of a union are skipped).
    """
    import types
    import typing
    from pandas.api.types import is_float_dtype, is_integer_dtype, is_string_dtype

    # Schema type -> pandas dtype predicate.
    comp_fn = {
        str: is_string_dtype,
        int: is_integer_dtype,
        float: is_float_dtype,
    }

    # typing.Union[...] and, where the interpreter has it, PEP 604 `X | Y`.
    union_origins = (typing.Union,)
    if hasattr(types, "UnionType"):
        union_origins += (types.UnionType,)

    schema, wcs = SCHEMA.get_flat_schema()

    # Map each wildcard's rendered key, e.g. (top_level, '*'), back to its
    # real key in ``wcs``. Built once rather than once per column.
    # Assumes at most one wildcard per top-level index - TODO confirm.
    wildcard_keys = {(key[0], str(key[1]())): key for key in wcs}

    def _check_dtype(col, key_map=None) -> bool:
        """Return True if the dtype of ``df[col]`` satisfies the expected type."""
        expected = schema[col] if key_map is None else wcs[key_map].dtype
        actual = df.dtypes[col]

        if typing.get_origin(expected) in union_origins:
            # Optional[X] expands to Union[X, NoneType]; NoneType has no
            # dtype predicate, so it must be skipped rather than looked up.
            candidates = [
                arg for arg in typing.get_args(expected)
                if arg is not None and arg is not type(None)
            ]
        else:
            candidates = [expected]

        return any(comp_fn[arg](actual) for arg in candidates)

    def _check_wildcard(col) -> bool:
        """Return True (and count the match) if ``col`` satisfies a wildcard."""
        # Wildcards are keyed on the top-level label, so only non-empty
        # tuple labels can match.
        if not isinstance(col, tuple) or not col:
            return False

        map_key = wildcard_keys.get((col[0], '*'))
        if map_key is None or not _check_dtype(col, map_key):
            return False

        # NOTE(review): this mutates the object handed out by
        # get_flat_schema(); if that object is shared between calls the
        # counts accumulate - confirm it is created fresh per call.
        wcs[map_key].count += 1
        return True

    # Collect every column that matches neither an explicit schema entry nor
    # a wildcard, so the error can name them.
    invalid_columns = []
    for col in df.columns:
        if col in schema:
            matched = _check_dtype(col)
        else:
            matched = _check_wildcard(col)
        if not matched:
            invalid_columns.append(col)

    if invalid_columns:
        raise SchemaException(
            f"Columns do not match the schema: {invalid_columns}"
        )

    # Make sure the number of wildcard matches respects each wildcard's bounds.
    # NOTE(review): the upper bound is exclusive while the lower bound is
    # inclusive, as in the original - confirm this is intended.
    for key, value in wcs.items():
        if value.count >= key[1].max or value.count < key[1].min:
            raise SchemaException(
                f"Wildcard under {key[0]!r} matched {value.count} column(s), "
                f"which is outside its allowed range"
            )

    df.attrs['vis-meta'] = {
        'hash': _get_df_hash(df)}
    return df


# Validate the loaded frame against the default schema. On success the
# returned object is `df` itself, with attrs['vis-meta'] populated.
df2 = validate_schema(df)