This collection of scenarios demonstrates how to solve various data quality problems by exploiting patterns found (or validated) by Desbordante.

In this scenario, we showcase a simple application that performs anomaly detection in a table.

The idea of this scenario is described in the paper "Solving Data Quality Problems with Desbordante: a Demo" by G. Chernishev et al., available at https://arxiv.org/abs/2307.14935. There is also an interactive demo at https://desbordante.streamlit.app/.

# Anomaly detection example using Desbordante algorithms.

In [None]:
!pip install desbordante==2.3.2
!wget https://raw.githubusercontent.com/Desbordante/desbordante-core/refs/heads/main/examples/datasets/cargo_data_1.csv
!wget https://raw.githubusercontent.com/Desbordante/desbordante-core/refs/heads/main/examples/datasets/cargo_data_2.csv
!wget https://raw.githubusercontent.com/Desbordante/desbordante-core/refs/heads/main/examples/datasets/cargo_data_3.csv


import desbordante
import pandas


def setup_pandas_print():
    """Lift pandas' display limits so printed tables are shown in full.

    Sets the row, column, total-width and column-width limits to ``None`` and
    turns off ``display.expand_frame_repr``.
    """
    display_options = {
        'display.max_columns': None,
        'display.max_rows': None,
        'display.width': None,
        'display.max_colwidth': None,
        'display.expand_frame_repr': False,
    }
    for option_name, option_value in display_options.items():
        pandas.set_option(option_name, option_value)


setup_pandas_print()

Collecting desbordante==2.3.2
  Downloading desbordante-2.3.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (19 kB)
Downloading desbordante-2.3.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (4.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.0/4.0 MB[0m [31m27.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: desbordante
Successfully installed desbordante-2.3.2
--2025-03-20 18:46:41--  https://raw.githubusercontent.com/Desbordante/desbordante-core/refs/heads/main/examples/datasets/cargo_data_1.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 4139 (4.0K) [text/plain]
Saving to: ‘cargo_data_1.csv’


2025-03-20 18:46:41 (32.1 MB/s) - ‘cargo_data_1.csv’ saved [4139/4139]

--2025-03

## Setting up various algorithm parameters.

In [None]:
# CSV parsing options, passed to pandas.read_csv(...) as `header=` and `sep=`.
HEADER = 0
SEPARATOR = ","

# Exact FD discovery: the algorithm class and the keyword options passed to it.
EXACT_ALGORITHM_TYPE = desbordante.fd.algorithms.Default
EXACT_ALGORITHM_CONFIG = dict()

# Approximate FD discovery: the algorithm class, the error thresholds to try,
# and the keyword options passed to the algorithm.
APPROXIMATE_ALGORITHM_TYPE = desbordante.afd.algorithms.Default
ERROR = [0.01, 0.03, 0.05]
APPROXIMATE_ALGORITHM_CONFIG = dict(error=ERROR)

# Metric FD verification: a display label and the verifier options.
#   available metrics: euclidean, levenshtein, cosine
#   available metric algorithms: brute, approx, calipers
#   parameter: desired distance related to metric. e.g., for euclidean metric
#   parameter=4 means that euclidean distance has to be no larger than 4
METRIC_VERIFIER = "MetricVerifier"
METRIC_VERIFIER_CONFIG = dict(
    lhs_indices=[1],
    rhs_indices=[3],
    metric="euclidean",
    metric_algorithm="brute",
    parameter=4,
)

# Human-readable algorithm names used in the configuration summary below.
EXACT_ALGORITHM = EXACT_ALGORITHM_TYPE.__name__
APPROXIMATE_ALGORITHM = APPROXIMATE_ALGORITHM_TYPE.__name__

# Multi-line summary of the settings above; each value is rendered with repr().
CONFIG_STRING = "\n".join([
    "Starting anomaly detection scenario with parameters:",
    f"ERROR={ERROR!r}",
    f"HEADER={HEADER!r}",
    f"SEPARATOR={SEPARATOR!r}",
    f"EXACT_ALGORITHM={EXACT_ALGORITHM!r}",
    f"APPROXIMATE_ALGORITHM={APPROXIMATE_ALGORITHM!r}",
    f"METRIC_VERIFIER={METRIC_VERIFIER!r}",
])

## Defining necessary functions.

In [None]:
def get_result_set_fd(df, algo_type, algo_config):
    """Run a dependency-discovery algorithm on a table and return its FDs as a set.

    Args:
        df: table handed to the algorithm's ``load_data`` as ``table``.
        algo_type: zero-argument callable that creates the algorithm object.
        algo_config: keyword options; the same mapping is forwarded to both
            ``load_data`` and ``execute``.

    Returns:
        A set built from whatever ``get_fds()`` yields.
    """
    miner = algo_type()
    miner.load_data(table=df, **algo_config)
    miner.execute(**algo_config)
    discovered = miner.get_fds()
    return {*discovered}


def get_result_mv(df, mv_config):
    """Run the default metric-FD verifier on a table and report whether the MFD holds.

    Args:
        df: table handed to the verifier's ``load_data`` as ``table``.
        mv_config: keyword options; the same mapping is forwarded to both
            ``load_data`` and ``execute``.

    Returns:
        The value of the verifier's ``mfd_holds()`` after execution.
    """
    verifier = desbordante.mfd_verification.algorithms.Default()
    verifier.load_data(table=df, **mv_config)
    verifier.execute(**mv_config)
    holds = verifier.mfd_holds()
    return holds


def print_fds(fds):
    """Print the given FDs one per line, ordered by their ``to_name_tuple()`` key."""
    ordered = sorted(fds, key=lambda fd: fd.to_name_tuple())
    print('\n'.join(str(fd) for fd in ordered))


def diff(fd_set_1, fd_set_2):
    """Report and return the FDs that are in ``fd_set_1`` but not in ``fd_set_2``.

    Prints "Missing FDs:" followed by the missing dependencies when there are
    any, and "No missing FDs." otherwise.

    Returns:
        The set difference ``fd_set_1 - fd_set_2``.
    """
    missing = fd_set_1 - fd_set_2

    if not missing:
        print("No missing FDs.")
        return missing

    print("Missing FDs:")
    print_fds(missing)
    return missing

## Starting anomaly detection scenario with parameters:

In [None]:
# Echo the effective configuration before the scenario runs.
print(CONFIG_STRING)

Starting anomaly detection scenario with parameters:
ERROR=[0.01, 0.03, 0.05]
HEADER=0
SEPARATOR=','
EXACT_ALGORITHM='HyFD'
APPROXIMATE_ALGORITHM='Pyro'
METRIC_VERIFIER='MetricVerifier'


In [None]:
# Read the three tables with the shared CSV settings.
df1, df2, df3 = (
    pandas.read_csv(csv_path, sep=SEPARATOR, header=HEADER)
    for csv_path in ("cargo_data_1.csv", "cargo_data_2.csv", "cargo_data_3.csv")
)

# The FD sets are only compared when all three tables have the same column list.
columns_1, columns_2, columns_3 = (
    frame.columns.tolist() for frame in (df1, df2, df3)
)

if columns_1 == columns_2 == columns_3:
    # Exact FDs of dataset 1.
    print("============================")
    print("FDs found for dataset 1:")
    fds1 = get_result_set_fd(df1, EXACT_ALGORITHM_TYPE, EXACT_ALGORITHM_CONFIG)
    print_fds(fds1)

    # Exact FDs of dataset 2.
    print("============================")
    print("FDs found for dataset 2:")
    fds2 = get_result_set_fd(df2, EXACT_ALGORITHM_TYPE, EXACT_ALGORITHM_CONFIG)
    print_fds(fds2)

    print()

    # FDs of dataset 1 that no longer appear in dataset 2.
    diff12 = diff(fds1, fds2)
    # With the downloaded data this difference is empty, so the next cell moves on to dataset 3.
else:
    print('Datasets must have the same schemas!')

FDs found for dataset 1:
[item_id] -> item_weight
[item_weight] -> item_id
[record_id] -> cargo_id
[record_id] -> item_id
[record_id] -> item_weight
[record_id] -> timestamp
[timestamp] -> cargo_id
[timestamp] -> item_id
[timestamp] -> item_weight
[timestamp] -> record_id
FDs found for dataset 2:
[item_id] -> item_weight
[item_weight] -> item_id
[record_id] -> cargo_id
[record_id] -> item_id
[record_id] -> item_weight
[record_id] -> timestamp
[timestamp] -> cargo_id
[timestamp] -> item_id
[timestamp] -> item_weight
[timestamp] -> record_id

No missing FDs.


In [None]:
# Exact FDs of dataset 3.
print("FDs found for dataset 3:")
fds3 = get_result_set_fd(
    df=df3,
    algo_type=EXACT_ALGORITHM_TYPE,
    algo_config=EXACT_ALGORITHM_CONFIG,
)
print_fds(fds3)
print()

# FDs of dataset 2 that are absent from dataset 3; with the downloaded data one FD is missing.
diff23 = diff(fd_set_1=fds2, fd_set_2=fds3)

FDs found for dataset 3:
[item_weight] -> item_id
[record_id] -> cargo_id
[record_id] -> item_id
[record_id] -> item_weight
[record_id] -> timestamp
[timestamp] -> cargo_id
[timestamp] -> item_id
[timestamp] -> item_weight
[timestamp] -> record_id

Missing FDs:
[item_id] -> item_weight


In [None]:
# Check whether the FD that went missing in dataset 3 still holds approximately
# (i.e. is reported as an AFD) for any of the configured error thresholds.

is_AFD = False
for error in ERROR:
    print("Checking for AFD with error =", error)
    # Build a per-threshold config rather than mutating the shared constant,
    # so that re-running this cell always starts from the same configuration.
    afd_config = {**APPROXIMATE_ALGORITHM_CONFIG, "error": error}
    afds = get_result_set_fd(df3, APPROXIMATE_ALGORITHM_TYPE, afd_config)

    # `diff23` is the set of FDs missing from dataset 3 (computed in the previous
    # cell). The original test `diff in afds` looked up the *function* `diff`
    # and therefore could never be true.
    if diff23 and diff23 <= afds:
        is_AFD = True
        print("Missing FD is an AFD.")
    else:
        print("Missing FD is not an AFD.")

    print()

print()

if not is_AFD:
    print("Missing FD is not part of AFD set. proceed to MFD validation phase\n")
    # check the stats of RHS attribute
    rhs_values = df3["item_weight"]
    print(rhs_values.describe())

    # Try MetricVerifier parameters 1, 2, ..., int(std) — the upper bound is
    # inclusive, matching the intended range [1; std] — and stop at the first
    # parameter for which the MFD holds.
    max_parameter = int(rhs_values.std())
    for pj in range(1, max_parameter + 1):
        # Per-iteration copy: the shared verifier config is left untouched.
        mv_config = {**METRIC_VERIFIER_CONFIG, "parameter": pj}
        mfd_holds = get_result_mv(df3, mv_config)

        if mfd_holds:
            print("MFD with parameter {} holds.".format(pj))
            break
        else:
            print("MFD with parameter {} not holds.".format(pj))


Checking for AFD with error = 0.01
Missing FD is not an AFD.

Checking for AFD with error = 0.03
Missing FD is not an AFD.

Checking for AFD with error = 0.05
Missing FD is not an AFD.


Missing FD is not part of AFD set. proceed to MFD validation phase

count    100.000000
mean      54.590000
std       20.758883
min       27.000000
25%       32.000000
50%       59.000000
75%       68.250000
max       89.000000
Name: item_weight, dtype: float64
MFD with parameter 1 not holds.
MFD with parameter 2 holds.
