# Create RWD$^e$ dataset

In this notebook, we aim to pollute perfect, non-trivial functional dependencies to create the RWD$^e$ dataset.

## Setup
First, load all the files from the RWD dataset. Furthermore, set some configuration parameters if running on an HPC cluster.

In [13]:
import os
import sys

import pandas as pd

# for Jupyter notebooks: add the path of 'code' to allow importing module
sys.path.append(os.path.join(os.getcwd(), ".."))
from afd_measures import utils as afd_utils

# Locations of the input tables, the ground truth and the measured results
# (relative to the notebook's working directory).
data_path = "../../data"
gt_path = "../../data/ground_truth.csv"
results_path = "../../results"

# Load every RWD table, keyed by its file name, with cleaned column names.
# sorted(): `os.listdir` returns entries in arbitrary order; sorting keeps the
# load order identical between runs and machines.
rwd_dir = os.path.join(data_path, "rwd")
rwd_data = {}
for file in sorted(f for f in os.listdir(rwd_dir) if f.endswith(".csv")):
    table_df = pd.read_csv(os.path.join(rwd_dir, file))
    table_df.columns = [afd_utils.clean_colname(c) for c in table_df.columns]
    rwd_data[file] = table_df

# The measured results are split over several `rwd_results_<n>.csv` files; the
# presence of the first one is used as the indicator that they were produced.
if not os.path.exists(os.path.join(results_path, "rwd_results_0.csv")):
    raise ValueError(
        "Results of RWD are missing. Execute `measure_rwd_afds.ipynb` first."
    )

result_files = sorted(
    f
    for f in os.listdir(results_path)
    if f.startswith("rwd_results_") and f.endswith(".csv")
)
# Read all parts and concatenate once. `ignore_index=True` gives the combined
# frame unique row labels; without it every part restarts at 0 and label-based
# lookups (`.loc[label]`) on the combined frame would match several rows.
rwd_results = pd.concat(
    [pd.read_csv(os.path.join(results_path, f)) for f in result_files],
    ignore_index=True,
)

  rwd_data[file] = pd.read_csv(os.path.join(data_path, "rwd", file))
  rwd_data[file] = pd.read_csv(os.path.join(data_path, "rwd", file))
  rwd_data[file] = pd.read_csv(os.path.join(data_path, "rwd", file))


## Identify the columns to pollute

We are looking for the columns that have to be polluted. That is, all columns that are the RHS of a *perfect* FD but are not the LHS or RHS of *any* AFD.

In [6]:
# Ground-truth FDs: rows of the results flagged `afd` and not `trivial_fd`.
fds_gt = rwd_results.query("(afd == True) & (trivial_fd == False)").copy()

# Per table: how many ground-truth FDs each column takes part in as LHS / RHS.
lhs_counts = (
    fds_gt.groupby(["table", "lhs"])["rhs"].count().rename("lhs_count").reset_index()
)
rhs_counts = (
    fds_gt.groupby(["table", "rhs"])["lhs"].count().rename("rhs_count").reset_index()
)

# If a RHS appears as a LHS in the non-perfect ground-truth FDs of its table, it
# cannot be used to introduce noise. Columns are identified by (table, column):
# equal column names in different tables are unrelated and must not block each
# other.
_approximate_fds = fds_gt.loc[fds_gt["exact_fd"] == False]
blocked_rhs_columns = set(zip(_approximate_fds["table"], _approximate_fds["lhs"]))

# Candidate FDs that could be polluted: perfect FDs whose RHS is not blocked.
_exact_fds = fds_gt.loc[fds_gt["exact_fd"] == True, ["table", "lhs", "rhs"]]
_rhs_is_free = [
    key not in blocked_rhs_columns
    for key in zip(_exact_fds["table"], _exact_fds["rhs"])
]
# Sort by the number of appearances, which indicates which RHS / LHS columns to
# prioritize in order to maximize additional true AFDs after introducing noise.
fds_to_pollute = (
    _exact_fds.loc[_rhs_is_free]
    .merge(lhs_counts, on=["table", "lhs"])
    .merge(rhs_counts, on=["table", "rhs"])
    .sort_values(["rhs_count", "lhs_count"], ascending=[False, True])
    .reset_index(drop=True)
)

columns_to_pollute = set()
lhs_to_block = set()
# Consume the whole list of candidate FDs. In every iteration the first FD is
# taken and all related FDs of the same table are removed (i.e. those where the
# RHS of the consumed FD appears as either LHS or RHS).
while not fds_to_pollute.empty:
    head_table, head_rhs = fds_to_pollute.iloc[0].loc[["table", "rhs"]]
    _same_table = fds_to_pollute["table"] == head_table
    _same_rhs = _same_table & (fds_to_pollute["rhs"] == head_rhs)
    # lhs_to_block holds columns already chosen as the LHS of another FD that
    # will be polluted. Such a column is not used as RHS (due to the ordering,
    # at least as many new AFDs have already been introduced).
    if (head_table, head_rhs) not in lhs_to_block:
        columns_to_pollute.add((head_table, head_rhs))
        lhs_to_block |= {
            (head_table, lhs) for lhs in fds_to_pollute.loc[_same_rhs, "lhs"].unique()
        }
    _related = _same_rhs | (_same_table & (fds_to_pollute["lhs"] == head_rhs))
    # The head row itself is always part of `_related`, so the frame shrinks in
    # every iteration and the loop terminates.
    fds_to_pollute = fds_to_pollute.loc[~_related]

In [8]:
from synthetic_data import generator as sd
import pandas as pd

# For every selected (table, RHS column) choose one LHS, yielding a list of
# (table, lhs, rhs) triples.
fds_to_pollute = []
# sorted(): iterating the set directly yields a run-dependent order; sorting
# keeps the resulting list (and everything derived from it) reproducible.
for table, column in sorted(columns_to_pollute):
    _df: pd.DataFrame = rwd_data[table].copy()
    # All ground-truth FDs of this table that determine `column`. The index is
    # reset so that the label picked below refers to exactly one row of this
    # frame, even if `fds_gt` carries repeated row labels.
    _table_fds = (
        fds_gt.query("(table == @table) & (rhs == @column)")
        .loc[:, ["table", "lhs", "rhs"]]
        .reset_index(drop=True)
    )
    _settings = {"tuples": _df.shape[0]}
    # Noise potential of every candidate LHS -> column, as reported by the generator.
    _potentials = _table_fds.apply(
        lambda row: sd.get_noise_potential(
            _settings, {0: _df.loc[:, row["lhs"]], 1: _df.loc[:, row["rhs"]]}
        ),
        axis="columns",
    )
    # NOTE(review): the original comment says "the column with the most noise
    # potential", yet the ascending sort selects the candidate with the SMALLEST
    # value returned by `get_noise_potential`. The existing behaviour is kept
    # here - confirm against the generator which one is intended.
    _selected = _potentials.sort_values(ascending=True).index[0]
    fds_to_pollute.append((table, _table_fds.loc[_selected, "lhs"], column))

## Write the fds to pollute

Write the FDs we want to pollute to disk for later use.

In [9]:
import os
import pickle

# Persist the chosen FDs once: an already existing pickle is left untouched and
# is NOT refreshed with the list currently held in memory.
fds_pickle_path = os.path.join(results_path, "fds_to_pollute.pkl")
if not os.path.exists(fds_pickle_path):
    with open(fds_pickle_path, "wb") as pickle_file:
        pickle.dump(fds_to_pollute, pickle_file)

## Pollute RWD

Introduce errors into the RWD tables.

In [10]:
import copy
import random

from synthetic_data import generator as sd

# Fractions of noise that are introduced into every selected FD.
NOISE_LEVELS = (0.01, 0.02, 0.05, 0.1)

# Noise type name -> generator function that pollutes a clean {0: lhs, 1: rhs} pair.
noise_funcs = {
    "copy": sd.introduce_noise_copy,
    "bogus": sd.introduce_noise_bogus,
    "typo": sd.introduce_noise_typo,
    "copy-lhs": sd.introduce_lhs_noise_copy,
}

# noisy_dfs[noise type][noise level][table name] -> polluted copy of the table
noisy_dfs = {
    noise_type: {level: {} for level in NOISE_LEVELS} for noise_type in noise_funcs
}

# NOTE(review): no random seed is set in this cell although `random` is
# imported; if the generator draws random numbers, the polluted tables differ
# between runs - confirm which RNG the generator uses and seed it.
for table_name, lhs_name, rhs_name in fds_to_pollute:
    original_df = rwd_data[table_name].copy()
    potential = sd.get_noise_potential_df(original_df, lhs_name, rhs_name)
    # Only rows where both sides of the FD are present take part in the pollution.
    clean_df = original_df.loc[:, [lhs_name, rhs_name]].dropna()
    clean = {0: clean_df.iloc[:, 0].to_list(), 1: clean_df.iloc[:, 1].to_list()}
    # Derived once from the CLEAN data. Recomputing these inside the loop from
    # a frame that already holds noisy values would feed a polluted
    # `rhs_cardinality` into the later noise levels.
    base_settings = {
        "tuples": clean_df.shape[0],
        "rhs_cardinality": clean_df.iloc[:, 1].nunique(),
    }
    for noise_level in NOISE_LEVELS:
        if potential <= noise_level:
            print(
                f"Cannot introduce noise of {noise_level} to {table_name}, {lhs_name} -> {rhs_name}. Potential of {potential} is too low."
            )
            continue
        settings = {"noise": noise_level, **base_settings}
        for noise_type, noise_func in noise_funcs.items():
            noisy = noise_func(settings, copy.deepcopy(clean))
            # Work on a fresh copy so neither values nor dtypes of one noise
            # type / level leak into the next iteration.
            polluted_pair = clean_df.copy()
            # NOTE(review): only the RHS (`noisy[1]`) is written back. If
            # `introduce_lhs_noise_copy` alters the LHS values, that change is
            # not persisted - confirm against the generator.
            polluted_pair[rhs_name] = noisy[1]
            # Several FDs may target the same table: keep polluting the copy
            # that already exists for this noise type and level.
            noisy_df = noisy_dfs[noise_type][noise_level].get(table_name)
            if noisy_df is None:
                noisy_df = original_df.copy()
            noisy_df.update(polluted_pair, overwrite=True)
            noisy_dfs[noise_type][noise_level][table_name] = noisy_df

In [14]:
import itertools
import os

# Write every polluted table as `polluted_<noise type>_<noise level>_<table name>`
# into the `rwd_e` folder below the data directory.
output_dir = os.path.join(data_path, "rwd_e")
# The folder does not exist on a fresh checkout; `to_csv` does not create
# missing parent directories and would fail.
os.makedirs(output_dir, exist_ok=True)

for noise_type, noise_level in itertools.product(
    noise_funcs.keys(), (0.01, 0.02, 0.05, 0.1)
):
    for table_name, df in noisy_dfs[noise_type][noise_level].items():
        df.to_csv(
            os.path.join(
                output_dir, f"polluted_{noise_type}_{noise_level}_{table_name}"
            ),
            index=False,
        )