In [17]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [18]:
# import uuid
import time
from typing import List

import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.utils.validation import check_is_fitted
from statsmodels.tsa.stattools import adfuller
from IPython.display import Code, display
import inspect

import avh.utility_functions as utils
from avh.data_issues import (
    IssueTransfomer,
    NumericIssueTransformer,
    CategoricalIssueTransformer,
    IncreasedNulls,
    SchemaChange,
    DistributionChange,
    UnitChange,
    CasingChange,
    DQIssueDatasetGenerator,
    VolumeChangeUpsample,
    VolumeChangeDownsample,
    NumericPerturbation,
)

from avh.data_generation import (
    DataColumn,
    NumericColumn,
    CategoricalColumn,
    NormalNumericColumn,
    UniformNumericColumn,
    BetaNumericColumn,
    StaticCategoricalColumn,
    RandomCategoricalColumn,
    DataGenerationPipeline,
)

from avh.metrics import (
    Metric,
    SingleDistributionMetric,
    TwoDistributionMetric,
    RowCount,
    DistinctCount,
    DistinctRatio,
    CompleteRatio,
    Mean,
    Median,
    Range,
    Min,
    Max,
    Sum,
    MeanDigitLength,
    MeanPunctuationLength,
    MeanStringLength,
    EMD,
    KsDist,
    CohenD,
    KlDivergence,
    JsDivergence,
)

from avh.constraints import (
    Constraint,
    ConstantConstraint,
    ChebyshevConstraint,
    CantelliConstraint,
    CLTConstraint,
    ConjuctivDQProgram,
)

from avh.auto_validate_by_history import AVH

import plotly.express as px

In [19]:
import jupyter_black

jupyter_black.load()

In [20]:
rng = np.random.default_rng(42)

In [21]:
from avh.data_generation import DataGenerationPipeline, NormalNumericColumn, BetaNumericColumn

from avh.data_issues import IncreasedNulls
from avh.auto_validate_by_history import AVH

In [22]:
pipeline = DataGenerationPipeline(
    columns=[
        NormalNumericColumn("money", mean=300, std=10),
        BetaNumericColumn("height", alfa=18, beta=2),
    ],
    issues=[
        ("money", [IncreasedNulls(p=0.05)]),
    ],
    random_state=42,
)

In [23]:
H = [pipeline.generate_normal(20000, 30) for i in range(30)]

In [24]:
PS["money"]

KeyError: 'money'

### Synthetic benchmark data generation

**Benchmark setup**  
columns:
- Numeric:
    - count: 1000
    - dtypes:
        - int: 30%
        - float: 70%
    - distributions:
        - uniform: 10%
        - normal: 20%
        - beta: 70%
- Categorical:
    - count: 1000

### First attempts

In [None]:
import pickle

In [None]:
with open("../benchmark/benchmark_data.pickle", "rb") as f:
    data = pickle.load(f)
    column_history = data["column_history"]
    column_perturbations = data["column_perturbations"]

In [None]:
total_history_size = 60
train_history_window_size = 30

total_windows = total_history_size - train_history_window_size
columns = list([column_set.columns[0] for column_set in column_history[0]])

DC_GENERATOR = AVH()._get_default_issue_dataset_generator()

fpr_budgets = [
    0.0005,
    # 0.001,
    0.005,
    # 0.01,
    0.05,
    0.1,
    # 0.15,
    # 0.2,
    0.25,
    # 0.3,
    # 0.35,
    # 0.4,
    # 0.45,
    0.5,
]

# avh = AVH(verbose=0, random_state=42, optimise_search_space=False, n_jobs=-1)

In [None]:
column = "numeric_8"
history = [run[8] for run in column_history]
perturbations = column_perturbations[8]

In [36]:
avh = AVH(columns=[column], verbose=0, random_state=42, optimise_search_space=False)
dc_generator = avh._get_default_issue_dataset_generator()

for i in tqdm(range(total_windows)):
    train_h = history[i : i + train_history_window_size]
    test_h = history[i + train_history_window_size]

    Q = avh._generate_constraint_space(
        [run[column] for run in train_h], optimise_search_space=False
    )
    DC = dc_generator.generate(train_h[-1])[column]

    constraint_recalls = avh._precalculate_constraint_recalls_fast(Q, DC)

    for j, fpr_budget in enumerate(fpr_budgets):
        PS_singleton = avh._find_optimal_singleton_conjuctive_dq_program(
            Q, constraint_recalls, fpr_budget
        )

        PS_conjunctive = avh._find_optimal_conjunctive_dq_program(
            Q, constraint_recalls, fpr_budget
        )
        PS = (
            PS_conjunctive
            if len(PS_conjunctive.recall) >= len(PS_singleton.recall)
            else PS_singleton
        )

100%|██████████| 30/30 [00:58<00:00,  1.96s/it]


In [32]:
avh = AVH(columns=[column], verbose=0, random_state=42, optimise_search_space=False)
dc_generator = avh._get_default_issue_dataset_generator()


for i in range(total_windows):
    train_h = history[i : i + train_history_window_size]
    test_h = history[i + train_history_window_size]

    dq_problems = dc_generator.generate(train_h[-1])

    for j, fpr_budget in enumerate(fpr_budgets):
        PS = avh.generate(train_h, fpr_target=fpr_budget)
        print(PS["numeric_8"])
    break

ChebyshevConstraint(5.9634 <= Range <= 6.1305, FPR = 0.0004)
CLTConstraint(14305.2387 <= RowCount <= 25481.0946, FPR = 0.0000)
ChebyshevConstraint(1.0000 <= CompleteRatio <= 1.0000, FPR = 0.0000), FPR = 0.000423
ChebyshevConstraint(5.9634 <= Range <= 6.1305, FPR = 0.0004)
CLTConstraint(14305.2387 <= RowCount <= 25481.0946, FPR = 0.0000)
ChebyshevConstraint(1.0000 <= CompleteRatio <= 1.0000, FPR = 0.0000), FPR = 0.000423
ChebyshevConstraint(38070.5220 <= Sum <= 72059.9312, FPR = 0.0400)
ChebyshevConstraint(1.0000 <= CompleteRatio <= 1.0000, FPR = 0.0000)
ChebyshevConstraint(6.1113 <= Max <= 6.2795, FPR = 0.0004), FPR = 0.040416
ChebyshevConstraint(38070.5220 <= Sum <= 72059.9312, FPR = 0.0400)
ChebyshevConstraint(1.0000 <= CompleteRatio <= 1.0000, FPR = 0.0000)
ChebyshevConstraint(6.1113 <= Max <= 6.2795, FPR = 0.0004), FPR = 0.040416
CLTConstraint(18030.5240 <= RowCount <= 21755.8093, FPR = 0.1336), FPR = 0.133614
CLTConstraint(18651.4049 <= RowCount <= 21134.9284, FPR = 0.3173), FPR =

In [None]:
tp_per_threshold = np.zeros(shape=len(fpr_budgets))

avh = AVH(columns=[column], verbose=0, random_state=42, optimise_search_space=False)

for j, fpr_budget in enumerate(fpr_budgets):
    PS = avh.generate(train_h, fpr_target=fpr_budget)

    for recall_test in perturbations:
        column_prediction = not PS[column].predict(recall_test[1])
        tp_per_threshold[j] += column_prediction

In [None]:
tp_per_threshold

array([23., 23., 23., 23.,  4.,  4.])

In [None]:
avh = AVH(columns=[column], verbose=0, random_state=42, optimise_search_space=False)
PS = avh.generate(train_h, fpr_target=0.1)

print(PS)

for recall_test in perturbations:
    print(PS[column].predict(recall_test[1]))

ChebyshevConstraint(17409.6431 <= RowCount <= 22376.6902, FPR = 0.2500) 4
ChebyshevConstraint(16167.8814 <= RowCount <= 23618.4520, FPR = 0.1111) 4
ChebyshevConstraint(14926.1196 <= RowCount <= 24860.2137, FPR = 0.0625) 4
ChebyshevConstraint(13684.3579 <= RowCount <= 26101.9755, FPR = 0.0400) 4
ChebyshevConstraint(12442.5961 <= RowCount <= 27343.7372, FPR = 0.0278) 4
ChebyshevConstraint(11200.8343 <= RowCount <= 28585.4990, FPR = 0.0204) 4
ChebyshevConstraint(9959.0726 <= RowCount <= 29827.2608, FPR = 0.0156) 3
ChebyshevConstraint(8717.3108 <= RowCount <= 31069.0225, FPR = 0.0123) 3
ChebyshevConstraint(7475.5491 <= RowCount <= 32310.7843, FPR = 0.0100) 3
ChebyshevConstraint(6233.7873 <= RowCount <= 33552.5460, FPR = 0.0083) 3
ChebyshevConstraint(4992.0255 <= RowCount <= 34794.3078, FPR = 0.0069) 3
ChebyshevConstraint(3750.2638 <= RowCount <= 36036.0696, FPR = 0.0059) 3
ChebyshevConstraint(2508.5020 <= RowCount <= 37277.8313, FPR = 0.0051) 3
ChebyshevConstraint(1266.7402 <= RowCount <= 

In [None]:
c = CLTConstraint(RowCount)
c.fit(train_h, beta=1.5, strategy="std")

In [None]:
for bruh in perturbations:
    print(c.predict(bruh[1]))

True
True
True
True
True
True
False
False
False
False
True
True
True
True
True
True
True
True
True
True
True
True
True


In [None]:
history[29]

Unnamed: 0,numeric_8,numeric_8_neighbor
0,0.414507,8.550938
1,4.436840,9.319068
2,5.211691,10.864843
3,4.287375,11.542261
4,4.547087,8.661777
...,...,...
22213,5.650488,7.521687
22214,1.437375,9.912807
22215,0.266548,9.550843
22216,1.675252,8.976658


In [None]:
avh = AVH(columns=[column], verbose=0, random_state=42, optimise_search_space=False)
PS = avh.generate(train_h, fpr_target=0.25)

print(PS)

for recall_test in perturbations:
    print(PS[column].predict(recall_test[1]))

ChebyshevConstraint(17409.6431 <= RowCount <= 22376.6902, FPR = 0.2500) 4
ChebyshevConstraint(16167.8814 <= RowCount <= 23618.4520, FPR = 0.1111) 4
ChebyshevConstraint(14926.1196 <= RowCount <= 24860.2137, FPR = 0.0625) 4
ChebyshevConstraint(13684.3579 <= RowCount <= 26101.9755, FPR = 0.0400) 4
ChebyshevConstraint(12442.5961 <= RowCount <= 27343.7372, FPR = 0.0278) 4
ChebyshevConstraint(11200.8343 <= RowCount <= 28585.4990, FPR = 0.0204) 4
ChebyshevConstraint(9959.0726 <= RowCount <= 29827.2608, FPR = 0.0156) 3
ChebyshevConstraint(8717.3108 <= RowCount <= 31069.0225, FPR = 0.0123) 3
ChebyshevConstraint(7475.5491 <= RowCount <= 32310.7843, FPR = 0.0100) 3
ChebyshevConstraint(6233.7873 <= RowCount <= 33552.5460, FPR = 0.0083) 3
ChebyshevConstraint(4992.0255 <= RowCount <= 34794.3078, FPR = 0.0069) 3
ChebyshevConstraint(3750.2638 <= RowCount <= 36036.0696, FPR = 0.0059) 3
ChebyshevConstraint(2508.5020 <= RowCount <= 37277.8313, FPR = 0.0051) 3
ChebyshevConstraint(1266.7402 <= RowCount <= 

In [None]:
# general casse:
def test_precision(history: List[pd.DataFrame], column: str):
    fp_per_threshold = np.zeros(shape=len(fpr_budgets))

    avh = AVH(columns=[column], verbose=0, random_state=42, optimise_search_space=False)
    for i in range(total_windows):
        train_h = history[i : i + train_history_window_size]
        test_h = history[i + train_history_window_size]

        for j, fpr_budget in enumerate(fpr_budgets):
            PS = avh.generate(train_h, fpr_target=fpr_budget)

            column_prediction = not PS[column].predict(test_h[column])
            fp_per_threshold[j] += column_prediction

    return fp_per_threshold


def test_recall(history: List[pd.DataFrame], column: str):
    tp_per_threshold = np.zeros(shape=len(fpr_budgets))

    train_h = history[:train_history_window_size]
    test_h = history[train_history_window_size]
    recall_tests = DC_GENERATOR.generate(test_h)[column]

    avh = AVH(columns=[column], verbose=0, random_state=42, optimise_search_space=False)

    for j, fpr_budget in enumerate(fpr_budgets):
        PS = avh.generate(train_h, fpr_target=fpr_budget)

        for recall_test in recall_tests:
            column_prediction = not PS[column].predict(recall_test[1])
            tp_per_threshold[j] += column_prediction

    return tp_per_threshold


def test_algorithm(history: List[pd.DataFrame], column: str):
    fp_per_threshold = test_precision(history, column)
    tp_per_thershold = test_recall(history, column)

    return fp_per_threshold, tp_per_thershold


# column_fp_per_threshold = []
# column_tp_per_threshold = []

# for i, column in enumerate(tqdm(columns)):
#     history = [run[i] for run in H_FULL]
#     # fp_per_threshold = test_precision(history, column)
#     tp_per_thershold = test_recall(history, column)

#     # column_fp_per_threshold.append(fp_per_threshold)
#     column_tp_per_threshold.append(tp_per_thershold)

In [None]:
from joblib import Parallel, delayed

In [None]:
# parallelize per column
column_fp_per_threshold = []
column_tp_per_threshold = []

results = Parallel(n_jobs=2, timeout=9999, return_as="generator")(
    delayed(test_algorithm)([run[i] for run in H_FULL], col) for i, col in enumerate(columns)
)

for fp_array, tp_array in tqdm(results, total=len(columns)):
    column_fp_per_threshold.append(fp_array)
    column_tp_per_threshold.append(tp_array)

100%|██████████| 2/2 [06:48<00:00, 204.19s/it]


In [None]:
column_fp_per_threshold

[array([0., 0., 0., 0., 2., 4.]), array([0., 0., 0., 0., 1., 1.])]

In [None]:
column_tp_per_threshold

[array([23., 23., 22., 23., 23., 23.]), array([23., 23., 23., 23., 23., 23.])]

In [None]:
fp_predictions = np.array(column_fp_per_threshold)
tp_predictions = np.array(column_tp_per_threshold)

In [None]:
fp_predictions

array([[0., 0., 0., 0., 2., 4.],
       [0., 0., 0., 0., 1., 1.]])

In [None]:
tp_predictions

array([[23., 23., 22., 23., 23., 23.],
       [23., 23., 23., 23., 23., 23.]])

In [None]:
recalls = tp_predictions / 23
precisions = tp_predictions / (fp_predictions + tp_predictions)

In [None]:
recalls.shape

(2, 6)

In [None]:
a = recalls.mean(axis=0)
b = precisions.mean(axis=0)

In [None]:
px.line(x=a, y=b)

### LOF tests

In [39]:
from joblib import Parallel, delayed
from sklearn.neighbors import LocalOutlierFactor
from sklearn.preprocessing import StandardScaler

In [None]:
total_history_size = 60
train_history_window_size = 30

total_windows = total_history_size - train_history_window_size
columns = list([column_set.columns[0] for column_set in H_FULL[0]])

thresholds = np.arange(0, 1, 0.01)

DC_GENERATOR = AVH()._get_default_issue_dataset_generator()

In [None]:
# general casse:
def test_precision(history: List[pd.DataFrame], feature_history: np.ndarray, column: str):
    fp_per_threshold = np.zeros(shape=len(thresholds))

    scaler = StandardScaler()
    lof = LocalOutlierFactor(n_neighbors=30, contamination="auto", novelty=True)

    for j in range(total_windows):
        train_x = feature_history[j : j + train_history_window_size]
        test_x = feature_history[j + train_history_window_size].reshape(1, -1)

        train_x_scaled = scaler.fit_transform(train_x)
        test_x_scaled = scaler.transform(test_x)

        lof.fit(train_x_scaled)
        shifted_local_outlier_factor = lof.decision_function(test_x_scaled)

        fp_per_threshold += shifted_local_outlier_factor < thresholds

    return fp_per_threshold


def test_recall(
    history: List[pd.DataFrame], feature_history: np.ndarray, column: str
) -> np.ndarray:
    tp_per_threshold = np.zeros(shape=len(thresholds))

    dc_generator = AVH()._get_default_issue_dataset_generator()
    scaler = StandardScaler()
    lof = LocalOutlierFactor(n_neighbors=30, contamination="auto", novelty=True)

    train_x = feature_history[:train_history_window_size]
    original_test_sample = history[train_history_window_size]

    train_x_scaled = scaler.fit_transform(train_x)
    lof.fit(train_x_scaled)

    for recall_test in dc_generator.generate(original_test_sample)[column]:
        test_x = extract_features(recall_test[1]).reshape(1, -1)
        test_x_scaled = scaler.transform(test_x)

        shifted_local_outlier_factor = lof.decision_function(test_x_scaled)
        tp_per_threshold += shifted_local_outlier_factor < thresholds

    return tp_per_threshold


def extract_features(data: pd.Series) -> np.ndarray:
    row_count = RowCount.calculate(data)
    min_val = Min.calculate(data)
    max_val = Max.calculate(data)
    mean_val = Mean.calculate(data)
    median_val = Median.calculate(data)
    sum_val = Sum.calculate(data)
    range_val = Range.calculate(data)
    distinct_ratio = DistinctRatio.calculate(data)
    complete_ratio = CompleteRatio.calculate(data)

    features = [
        row_count,
        min_val,
        max_val,
        mean_val,
        median_val,
        sum_val,
        range_val,
        distinct_ratio,
        complete_ratio,
    ]

    return np.array(features)


def calculate_feature_history(history: List[pd.DataFrame], column: str) -> np.ndarray:
    feature_history = [extract_features(run[column]) for run in history]
    return np.array(feature_history)


def test_algorithm(history: List[pd.DataFrame], column: str):
    feature_history = calculate_feature_history(history, column)
    fp_per_threshold = test_precision(history, feature_history, column)
    tp_per_thershold = test_recall(history, feature_history, column)

    return fp_per_threshold, tp_per_thershold


# column_fp_per_threshold, column_tp_per_threshold = [], []
# for i, column in enumerate(tqdm(columns)):
#     history = [run[i] for run in H_FULL]

#     feature_history = calculate_feature_history(history, column)
#     fp_per_threshold = test_precision(history, feature_history, column)
#     tp_per_thershold = test_recall(history, feature_history, column)

#     column_fp_per_threshold.append(fp_per_threshold)
#     column_tp_per_threshold.append(tp_per_thershold)

In [None]:
# parallelize per column
column_fp_per_threshold, column_tp_per_threshold = [], []

results = Parallel(n_jobs=-1, timeout=9999, return_as="generator")(
    delayed(test_algorithm)([run[i] for run in H_FULL], col) for i, col in enumerate(columns)
)

for fp_array, tp_array in tqdm(results, total=len(columns)):
    column_fp_per_threshold.append(fp_array)
    column_tp_per_threshold.append(tp_array)

100%|██████████| 30/30 [00:08<00:00,  3.62it/s]


In [None]:
column_fp_per_threshold = np.array(column_fp_per_threshold)
column_tp_per_threshold = np.array(column_tp_per_threshold)

In [None]:
recalls = column_tp_per_threshold / 23
precisions = column_tp_per_threshold / (column_fp_per_threshold + column_tp_per_threshold)

In [None]:
a = recalls.mean(axis=0)
b = precisions.mean(axis=0)

In [40]:
import pickle
import plotly.graph_objects as go
from copy import deepcopy

In [45]:
with open("../benchmark/benchmark_avh_metrics.pickle", "rb") as f:
    metrics_avh = pickle.load(f)

with open("../benchmark/benchmark_ks_test_metrics.pickle", "rb") as f:
    metrics_ks = pickle.load(f)

with open("../benchmark/benchmark_lof_metrics.pickle", "rb") as f:
    metrics_lof = pickle.load(f)

with open("../benchmark/benchmark_tfdv_metrics.pickle", "rb") as f:
    metrics_tfdv = pickle.load(f)

In [44]:
fig = go.Figure()

fig.add_scatter(x=metrics_avh["recall"], y=metrics_avh["precision"], name="AVH")
fig.add_scatter(x=metrics_ks["recall"], y=metrics_ks["precision"], name="KS-test")
fig.add_scatter(x=metrics_lof["recall"], y=metrics_lof["precision"], name="LOF")
fig.add_scatter(x=metrics_tfdv["recall"], y=metrics_tfdv["precision"], name="TFDV")
fig.update_layout(width=700, height=700)

In [None]:
fig = go.Figure()

fig.add_scatter(x=metrics_avh["recall"], y=metrics_avh["precision"], name="AVH")
fig.add_scatter(x=metrics_ks["recall"], y=metrics_ks["precision"], name="KS-test")
fig.add_scatter(x=metrics_lof["recall"], y=metrics_lof["precision"], name="LOF")
fig.add_scatter(x=metrics_tfdv["recall"], y=metrics_tfdv["precision"], name="TFDV")
fig.update_layout(width=700, height=700)

In [None]:
fig = go.Figure()

fig.add_scatter(x=metrics_avh["recall"], y=metrics_avh["precision"], name="AVH")
fig.add_scatter(x=metrics_ks["recall"], y=metrics_ks["precision"], name="KS-test")
fig.add_scatter(x=metrics_lof["recall"], y=metrics_lof["precision"], name="LOF")
fig.add_scatter(x=metrics_hs["recall"], y=metrics_hs["precision"], name="Health-ESN")
fig.add_scatter(x=metrics_tfdv["recall"], y=metrics_tfdv["precision"], name="TFDV")
fig.update_layout(width=700, height=700)

In [None]:
fig = go.Figure()

fig.add_scatter(x=metrics_ks["recall"], y=metrics_ks["precision"])
fig.add_scatter(x=metrics_lof["recall"], y=metrics_lof["precision"])