# Introduction
A very important aspect of supervised and semi-supervised machine learning is the quality of the labels produced by human labelers. Unfortunately, humans are not perfect and in some cases may even maliciously label things incorrectly. In this assignment, you will evaluate the impact of incorrect labels on a number of different classifiers.

We have provided a number of code snippets you can use during this assignment. Feel free to modify them or replace them.


## Dataset
The dataset you will be using is the [Adult Income dataset](https://archive.ics.uci.edu/ml/datasets/Adult). This dataset was created by Ronny Kohavi and Barry Becker and was used to predict whether a person's income is more/less than 50k USD based on census data.

### Data preprocessing
Start by loading and preprocessing the data. Remove NaN values, convert strings to categorical variables and encode the target variable (the string <=50K, >50K in column index 14).

In [None]:
import pandas as pd
import numpy as np

In [None]:
# Load the Adult dataset; '?' marks a missing value.
# skipinitialspace=True drops a blank after each delimiter, so a field written
# as ", ?" is still recognised as missing (and categorical values carry no
# leading space). It is a no-op for files without such blanks.
data = pd.read_csv("adult.csv", na_values='?', skipinitialspace=True)
data.head()


##### Check the percentage of missing values in the columns. Rule of thumb: If the percentage of missing values is above 60%, remove the feature.

In [None]:
# Share of missing values per column, in percent (NaN count / rows * 100).
nan_percentages = data.isna().sum() / len(data) * 100
for column, nan_count in nan_percentages.items():
    print(f"Percentage of NaN in column {column} is {nan_count!s}\n")

Remove all rows that contain NaN values. The columns with missing values are categorical (non-numerical), so they cannot be imputed with a numerical statistic such as the mean.

In [None]:
# Drop every row that has at least one missing value and report how many went.
data_before = len(data)
data = data.dropna()
data_after = len(data)
removed = data_before - data_after
print(f"Removed {removed} rows from the {data_before} rows")

# Drop 'education' (presumably redundant with the ordinal 'education-num') and 'fnlwgt'.
data = data.drop(columns=["education", "fnlwgt"])
print(data)


Turn string columns into categorical data

In [None]:
string_columns = [
    'workclass', 'marital-status', 'occupation', 'relationship',
    'race', 'sex', 'native-country',
]
# Store each free-text column with pandas' categorical dtype.
for col in string_columns:
    data[col] = data[col].astype("category")

In [None]:
print(data['salary'].unique())
# Normalise the target strings: trim surrounding whitespace and remove a
# trailing '.', so e.g. '>50K.' becomes '>50K'.
cleaned = (
    data['salary']
    .str.strip()
    .str.replace(r"\.$", "", regex=True)
)
data['salary'] = pd.Categorical(cleaned, categories=["<=50K", ">50K"], ordered=False)

### Data classification
Choose at least 4 different classifiers and evaluate their performance in predicting the target variable.

#### Preprocessing
Think about how you are going to encode the categorical variables, normalization, whether you want to use all of the features, feature dimensionality reduction, etc. Justify your choices

A good method to apply preprocessing steps is using a Pipeline. Read more about this [here](https://machinelearningmastery.com/columntransformer-for-numerical-and-categorical-data/) and [here](https://medium.com/vickdata/a-simple-guide-to-scikit-learn-pipelines-4ac0d974bdcf).

<!-- #### Data visualization
Calculate the correlation between different features, including the target variable. Visualize the correlations in a heatmap. A good example of how to do this can be found [here](https://towardsdatascience.com/better-heatmaps-and-correlation-matrix-plots-in-python-41445d0f2bec).

Select a features you think will be an important predictor of the target variable and one which is not important. Explain your answers. -->

#### Evaluation
Use a validation technique from the previous lecture to evaluate the performance of the model. Explain and justify which metrics you used to compare the different models.

In [None]:
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler
from sklearn import tree
from sklearn.svm import LinearSVC
from sklearn.linear_model import SGDClassifier, LogisticRegression
from sklearn.model_selection import cross_val_score, StratifiedKFold, cross_val_predict
from sklearn.metrics import classification_report, accuracy_score
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import seaborn as sns


# Feature groups (column names) handled by the ColumnTransformer.
numerical_ix = [
    'age', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week',
]
categorical_ix = [
    'workclass', 'marital-status', 'occupation', 'relationship',
    'race', 'sex', 'native-country',
]

# One-hot encode the categoricals (categories unseen during fit are ignored)
# and min-max scale the numerical features.
steps = [
    ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_ix),
    ('num', MinMaxScaler(), numerical_ix),
]


# Apply your model to feature array X and labels y
def apply_model(model, ct, X, y, feature_reduction=False, n_components=60):
    """Wrap *model* in a PCA and a no-PCA pipeline and cross-validate both.

    Parameters
    ----------
    model : estimator
        Unfitted classifier placed as the last step of both pipelines.
    ct : ColumnTransformer
        Preprocessing step shared by both pipelines.
    X, y :
        Features and labels, passed on to ``evaluate_model``.
    feature_reduction : bool
        Forwarded unchanged to ``evaluate_model``.
    n_components : int, default 60
        Number of principal components kept in the PCA pipeline.

    Returns
    -------
    tuple
        Whatever ``evaluate_model`` returns: the mean accuracy without PCA
        and the mean accuracy with PCA.
    """
    pipeline_pca = Pipeline(steps=[
        ('t', ct),
        ('pca', PCA(n_components=n_components)),
        ('m', model),
    ])
    pipeline_nopca = Pipeline(steps=[('t', ct), ('m', model)])

    return evaluate_model(X, y, pipeline_nopca, pipeline_pca, feature_reduction)

# Apply your validation techniques and calculate metrics
def evaluate_model(X, y, pipeline_nopca, pipeline_pca, feature_reduction=False,
                   random_state=None, report=True):
    """Estimate the accuracy of two pipelines with stratified 5-fold cross-validation.

    Both pipelines are scored on the *same* shuffled folds, so their mean
    accuracies are directly comparable. When *report* is true, out-of-fold
    predictions of the PCA pipeline (on those same folds) are used to print
    a per-class classification report.

    Parameters
    ----------
    X : array-like or DataFrame
        Features.
    y : array-like
        Target labels.
    pipeline_nopca, pipeline_pca : Pipeline
        The pipelines without and with a PCA step.
    feature_reduction : bool
        Unused; kept so that existing calls keep working.
    random_state : int or None, default None
        Seed for shuffling the folds. With ``None`` a fresh seed is drawn on
        every call, so repeated calls still see different splits. Within a
        call, all cross-validation runs share that seed.
    report : bool, default True
        Print the classification report. This costs one extra round of fits.

    Returns
    -------
    (float, float)
        Mean CV accuracy without PCA and with PCA.
    """
    # A shuffled StratifiedKFold with random_state=None reshuffles on every
    # split() call. Fixing one integer seed per call keeps the folds identical
    # across the runs below, which makes the comparison fair.
    if random_state is None:
        random_state = int(np.random.randint(0, 2**31 - 1))
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=random_state)

    scores_nopca = cross_val_score(pipeline_nopca, X, y, cv=cv, scoring="accuracy")
    scores_pca = cross_val_score(pipeline_pca, X, y, cv=cv, scoring="accuracy")

    if report:
        y_pred = cross_val_predict(pipeline_pca, X, y, cv=cv)
        print("\nClassification Report:")
        print(classification_report(y, y_pred))

    return scores_nopca.mean(), scores_pca.mean()

### DEPRECATED METHOD AS PCA IS USED INSTEAD OF FEATURE IMPORTANCE

# def show_feature_importance(pipeline, X, y, top_n = 12):
#     model = pipeline.named_steps["m"]
#     feature_names = pipeline.named_steps["t"].get_feature_names_out()

#     importance = None

#     if hasattr(model, "feature_importances_"):
#         importance = model.feature_importances_ * 100
#     elif hasattr(model, "coef_"):
#         importance = abs(model.coef_[0])
#     else:
#         print("Using permutation importance (slower)...")
#         r = permutation_importance(pipeline, X, y, n_repeats=10, random_state=42)
#         importance = r.importances_mean

#     df = pd.DataFrame({"feature": feature_names, "importance": importance})

#     df["base_feature"] = (
#         df["feature"]
#         .str.replace(r"^cat__|^num__", "", regex=True)   # remove prefixes
#         .str.split("_").str[0]                          # keep original feature name
#     )

#     agg_df = df.groupby("base_feature")["importance"].sum().sort_values(ascending=False)

#     print("\nTop Features (aggregated):")
#     print(agg_df.head(top_n))

#     red = agg_df.head(top_n).index.to_list()

#     # --- Plot aggregated importance ---
#     plt.figure(figsize=(10, 6))
#     sns.barplot(x=agg_df.head(top_n), y=agg_df.head(top_n).index, palette="viridis")
#     plt.title(f"Aggregated Feature Importance ({type(model).__name__})")
#     plt.xlabel("Importance")
#     plt.ylabel("Feature")
#     plt.show()

#     return red

# DEPRECATED METHOD AS PCA IS USED INSTEAD OF FEATURE SELECTION

# def compare_and_plot(models, ct, X, y):
#     cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
#     results = {}

#     for name, model in models.items():
#         # Full features
#         pipeline_full = Pipeline(steps=[('t', ct), ('m', model)])
#         scores_full = cross_val_score(pipeline_full, X, y, cv=cv, scoring="accuracy")

#         # Reduced features
#         reduced_features = apply_model(model, ct, X, y, feature_reduction=True)
#         X_reduced = X[reduced_features]

#         cat_selected = [c for c in reduced_features if c in categorical_ix]
#         num_selected = [c for c in reduced_features if c in numerical_ix]

#         ct_reduced = ColumnTransformer([
#             ('cat', OneHotEncoder(handle_unknown='ignore'), cat_selected),
#             ('num', MinMaxScaler(), num_selected)
#         ])
#         pipeline_reduced = Pipeline(steps=[('t', ct_reduced), ('m', model)])
#         scores_reduced = cross_val_score(pipeline_reduced, X_reduced, y, cv=cv, scoring="accuracy")

#         # Store mean difference
#         results[name] = scores_reduced.mean() - scores_full.mean()

#     # Plot differences
#     plt.figure(figsize=(8, 5))
#     plt.barh(list(results.keys()), list(results.values()), color="skyblue")
#     plt.axvline(0, color="red", linestyle="--")
#     plt.xlabel("Accuracy Difference (Reduced - Full)")
#     plt.title("Effect of Feature Reduction on Model Accuracy")
#     plt.show()

#     return results

ct = ColumnTransformer(steps)

models = {
    "LogReg": LogisticRegression(max_iter=10000),
    "SGD": SGDClassifier(loss="hinge", penalty="l2", max_iter=10000),
    "DecisionTree": tree.DecisionTreeClassifier(),
    "LinearSVC": LinearSVC(),
}

# Features and labels are the same for every model, so split them off once.
y = data['salary']
X = data.drop('salary', axis=1)

results_nopca = []
results_pca = []
for name, model in models.items():
    # Label the classification report that apply_model prints.
    print(f"\n=== {name} ===")
    acc_no_pca, acc_pca = apply_model(model, ct, X, y)
    results_nopca.append(acc_no_pca)
    results_pca.append(acc_pca)

# Grouped bar chart: one pair of bars (without / with PCA) per model.
x = np.arange(len(models))
width = 0.35

plt.figure(figsize=(8, 5))
plt.bar(x - width / 2, results_nopca, width, label="No PCA", color="steelblue")
plt.bar(x + width / 2, results_pca, width, label="With PCA", color="orange")
plt.xticks(x, list(models.keys()))
plt.ylabel("Mean CV Accuracy")
plt.title("Model Performance: With vs Without PCA")
plt.legend()
plt.show()




### Label perturbation
To evaluate the impact of faulty labels in a dataset, we will introduce some errors in the labels of our data.


#### Preparation
Start by creating a method which alters a dataset by randomly selecting a percentage of rows and swapping their labels (0->1 and 1->0).


In [None]:
def pertubate(y, fraction: float, rng=None):
    """Return a copy of the 0/1 label vector *y* with a random fraction of labels flipped.

    Exactly ``int(fraction * len(y))`` distinct positions, drawn uniformly
    without replacement, are flipped (0 -> 1, 1 -> 0). *y* itself is not
    modified.

    Parameters
    ----------
    y : pandas.Series or numpy.ndarray
        Labels encoded as the integers 0 and 1.
    fraction : float
        Fraction of labels to flip, in [0, 1].
    rng : None, int or numpy.random.Generator, optional
        Passed to ``numpy.random.default_rng``. ``None`` (default) uses fresh
        OS entropy. An int gives reproducible flips.

    Returns
    -------
    Same type as *y*, with the selected labels flipped.

    Raises
    ------
    ValueError
        If *fraction* is outside [0, 1].
    """
    if not 0.0 <= fraction <= 1.0:
        raise ValueError(f"fraction must be in [0, 1], got {fraction}")

    flipped = y.copy()
    n = len(flipped)

    generator = np.random.default_rng(rng)
    flip_idx = generator.choice(n, size=int(fraction * n), replace=False)

    # Flip by position: pandas objects need .iloc, NumPy arrays index directly.
    if hasattr(flipped, "iloc"):
        flipped.iloc[flip_idx] = 1 - flipped.iloc[flip_idx]
    else:
        flipped[flip_idx] = 1 - flipped[flip_idx]

    return flipped

#### Analysis
Create a number of new datasets with perturbed labels, for fractions ranging from `0` to `0.5` in increments of `0.1`.

Perform the same experiment you did before, which compared the performances of different models except with the new datasets. Repeat your experiment at least 5x for each model and perturbation level and calculate the mean and variance of the scores. Visualize the change in score for different perturbation levels for all of the models in a single plot.

State your observations. Is there a change in the performance of the models? Are there some classifiers which are impacted more/less than other classifiers and why is this the case?

In [None]:
# Reload and preprocess the raw data with the same steps as in the first
# experiment, so this experiment does not depend on state left by earlier
# cells. skipinitialspace=True lets '?' match even when written as ", ?".
og_data = pd.read_csv("adult.csv", na_values='?', skipinitialspace=True)

data = og_data.dropna()
data = data.drop(columns=['education', 'fnlwgt'])

string_columns = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']
for col in string_columns:
    data[col] = pd.Categorical(data[col])

# Encode the target as integers 0/1, because label flipping computes 1 - y.
# `map` always yields a numeric column, whereas `replace` from strings to ints
# relies on downcasting that recent pandas deprecates. An unexpected label
# maps to NaN and makes astype(int) fail loudly instead of leaving a string.
data['salary'] = data['salary'].str.strip().str.replace(r"\.$", "", regex=True)
data['salary'] = data['salary'].map({"<=50K": 0, ">50K": 1}).astype(int)

og_data = data

In [None]:
salary = og_data['salary']
X = og_data.drop('salary', axis=1)

# Perturbation levels and their result keys: 0.0 -> "data_00", ..., 0.5 -> "data_05".
fractions = [0.0, 0.1, 0.2, 0.3, 0.4, 0.5]
dataset_names = [f"data_{round(f * 10):02d}" for f in fractions]
n_repeats = 5

# The same four classifiers as in the first experiment. LogisticRegression
# uses the same max_iter there and here, so both experiments train the same model.
lr = LogisticRegression(max_iter=10000)
sgd = SGDClassifier(loss="hinge", penalty="l2", max_iter=10000)
dt = tree.DecisionTreeClassifier()
svc = LinearSVC()
models = [lr, sgd, dt, svc]

# results[dataset_name][model] -> {"mean": ..., "variance": ...}
results = {
    name: {model: {"mean": None, "variance": None} for model in models}
    for name in dataset_names
}

numerical_ix = ['age', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']
categorical_ix = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'native-country']

steps = [('cat', OneHotEncoder(handle_unknown='ignore'), categorical_ix), ('num', MinMaxScaler(), numerical_ix)]

ct = ColumnTransformer(steps)

for m in models:
    for name, fraction in zip(dataset_names, fractions):
        scores = []
        for _ in range(n_repeats):
            # Draw a fresh set of flipped labels on every repetition. The
            # variance then reflects the randomness of the label noise as
            # well as that of the CV splits.
            y = pertubate(salary, fraction)
            _, score = apply_model(m, ct, X, y)
            scores.append(score)

        results[name][m]["mean"] = np.mean(scores)
        results[name][m]["variance"] = np.var(scores)



In [None]:
data_names = ["data_00", "data_01", "data_02", "data_03", "data_04", "data_05"]
# "data_03" holds the results for 30 % flipped labels, and so on.
flip_fractions = [int(name.split("_")[1]) / 10 for name in data_names]
x = np.arange(len(data_names))

plt.figure(figsize=(8, 5))

for model in models:
    means = np.array([results[dname][model]["mean"] for dname in data_names])
    std_devs = np.sqrt([results[dname][model]["variance"] for dname in data_names])

    # Label each curve with the classifier's class name, not its full repr.
    plt.plot(x, means, marker='o', label=type(model).__name__)
    # Shaded band: mean +/- one standard deviation over the repetitions.
    plt.fill_between(x, means - std_devs, means + std_devs, alpha=0.2)

plt.xticks(x, [f"{f:.1f}" for f in flip_fractions])
plt.xlabel("Fraction of flipped labels")
plt.ylabel("Mean CV accuracy (with PCA)")
plt.title("Model Performance Across Label-Perturbation Levels")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
def _model_label(model):
    """Readable name for a results key: strings as-is, estimators by class name."""
    return model if isinstance(model, str) else type(model).__name__


# One row per (dataset, model) pair.
records = [
    {
        "Dataset": dataset,
        "Model": _model_label(model),
        "Mean": stats["mean"],
        "Variance": stats["variance"],
    }
    for dataset, models_dict in results.items()
    for model, stats in models_dict.items()
]

df = pd.DataFrame(records)

print(df)

Observations + explanations: max. 400 words

#### Discussion

1)  Discuss how you could reduce the impact of wrongly labeled data or correct wrong labels. <br />
    max. 400 words



    Authors: Youri Arkesteijn, Tim van der Horst and Kevin Chong.


## Machine Learning Workflow

In Part 1, you went through the entire machine learning workflow, which consists of the following steps:

1) Data Loading
2) Data Pre-processing
3) Machine Learning Model Training
4) Machine Learning Model Testing

These tasks are strictly sequential and must be performed in order.

A small perturbation in any one of these steps may therefore have a detrimental knock-on effect on the steps that come after it.

In the final part of Part 1, you experienced this first-hand: you perturbed the labels used for model training and observed how this affected the results of model testing.

# Part 2: Data Discovery - RADU


This part implements four methods to find similar columns across different tables in a dataset, two for each relation: join and union. The methods are:
1. Set Containment method (JOIN): Similarity between column values of different tables. \
2. Column name method (2xUNION): Similarity between column names of different tables. Measured in two different ways:
    - Levenshtein distance (Shows how many single-character edits are needed to change one string into another)
    - Jaccard similarity on shingles.
3. JOSIE method (JOIN): Similarity between column values of different tables, using JOSIE. It builds posting lists and produces a top-k list of the most similar columns (k=3).

Thresholds have been set to 0.8, to avoid too many false positives.
First, we read the datasets, and store them in a list of dataframes.

In [None]:
import tqdm
# Load the dataset

def read_csv(file_path):
    """Read one data-lake table, retrying with '_' as the delimiter on failure.

    The file is first parsed as regular comma-separated values. If that raises
    any exception, it is parsed again with ``delimiter='_'``. Both reads use
    ``low_memory=False``.
    """
    try:
        table = pd.read_csv(file_path, low_memory=False)
    except Exception:
        # Comma parsing failed: fall back to underscore-delimited input.
        table = pd.read_csv(file_path, delimiter='_', low_memory=False)
    return table

# Load the data-lake tables. Each DataFrame gets a `name` attribute, which the
# discovery methods below use to identify it in their results.
NUM_TABLES = 19

tables = []
for i in range(NUM_TABLES):
    # Named `table` (not `ct`) so the Part 1 ColumnTransformer `ct` is not overwritten.
    table = read_csv(f'lake33/table_{i}.csv')
    table.name = f'table_{i}.csv'
    tables.append(table)

We first run the methods on uncleaned data, to identify first results.

In [None]:
def set_containment(set_a, set_b):
    """Return the containment of *set_a* in *set_b*: |A & B| / |A|.

    An empty *set_a* yields 0 instead of dividing by zero.
    """
    size_a = len(set_a)
    return len(set_a.intersection(set_b)) / size_a if size_a else 0

In [None]:
# Set Containment method:
def set_containment_method(dataset):
    """Find JOIN candidates: columns whose distinct values are largely contained in another column.

    For every pair of tables (i, j) with i < j and every column c1 of table i
    and c2 of table j, the containment of c1's distinct non-null values in
    c2's is computed with ``set_containment``. Pairs scoring at least 0.8 are
    reported. Only the direction "table i in table j" is evaluated.

    Parameters
    ----------
    dataset : list of DataFrame
        Tables carrying a ``name`` attribute, which is used in the output.

    Returns
    -------
    DataFrame
        Columns 'Dataset 1', 'Dataset 2', 'Relation', 'Column 1', 'Column 2',
        'Set Containment'.
    """
    threshold = 0.8

    # Distinct non-null values of every column, built once per column rather
    # than once for every column pair it takes part in.
    column_sets = [
        [(col, set(df[col].dropna())) for col in df.columns]
        for df in dataset
    ]

    rows_list = []
    for i, df1 in enumerate(tqdm.tqdm(dataset)):
        for j in range(i + 1, len(dataset)):
            df2 = dataset[j]
            for col1, vals1 in column_sets[i]:
                for col2, vals2 in column_sets[j]:
                    sc1 = set_containment(vals1, vals2)
                    if sc1 >= threshold:
                        rows_list.append({
                            'Dataset 1': df1.name,
                            'Column 1': str(col1),
                            'Dataset 2': df2.name,
                            'Column 2': str(col2),
                            'Set Containment': sc1,
                            'Relation': 'JOIN',
                        })

    return pd.DataFrame(rows_list, columns=['Dataset 1', 'Dataset 2', 'Relation', 'Column 1', 'Column 2', 'Set Containment'])

result_sc_unclean = set_containment_method(tables)
result_sc_unclean.to_csv('outputs/set_containment_results_unclean.csv', encoding='utf-8', index=False, header=True)

Results method 1: (Runtime ~ 10 minutes)

We notice the set containment method manages to identify many similarities between tables. For instance, it correctly captures similarities between tables 1 and 2, which appear to have similar data, even when the column titles are mismatched. This is the case for many tables, where the column names are different but the values are similar.

However, this method yields a large number of false positives as well. For example, some tables appear to contain some sort of sensor data, which may have a small range of values that can potentially overlap with some other columns that have no connection to sensors, but a wide range of values. This yields a set containment value that exceeds the threshold, but the columns are not actually similar in any meaningful way.

One way to mitigate this issue would be to consider the set containment relation in both directions; however, that would significantly increase the computation time.

In [None]:
# Column name method
# Two ideas: Levenshtein distance, or Jaccard similarity on shingles.
def levenshtein(s1, s2):
    """Return the Levenshtein (edit) distance between two sequences.

    This is the minimum number of single-element insertions, deletions and
    substitutions that turn *s1* into *s2*. It is computed with the classic
    dynamic program, keeping only two rows: O(len(s1) * len(s2)) time and
    O(len(s2)) memory.
    """
    m = len(s1)
    n = len(s2)

    # prev_row[j]: distance between the first i-1 items of s1 and the first j of s2.
    prev_row = list(range(n + 1))
    curr_row = [0] * (n + 1)

    for i in range(1, m + 1):
        curr_row[0] = i
        for j in range(1, n + 1):
            if s1[i - 1] == s2[j - 1]:
                curr_row[j] = prev_row[j - 1]
            else:
                curr_row[j] = min(prev_row[j - 1], prev_row[j], curr_row[j - 1]) + 1
        # Every entry of curr_row is rewritten next iteration, so swapping is enough.
        prev_row, curr_row = curr_row, prev_row

    # prev_row is the last completed row. That also holds when s1 is empty:
    # then it is the initial row and prev_row[n] == n.
    return prev_row[n]

def jaccard_similarity(s1, s2, k=2):
    """Jaccard similarity of the sets of length-*k* substrings (shingles) of two strings.

    Strings shorter than *k* have no shingles. If neither string has any,
    0.0 is returned.
    """
    first = {s1[pos:pos + k] for pos in range(len(s1) - k + 1)}
    second = {s2[pos:pos + k] for pos in range(len(s2) - k + 1)}

    combined = first | second
    if not combined:
        return 0.0
    return len(first & second) / len(combined)

def column_names(dataset):
    """Find UNION candidates: column pairs across tables whose names are similar.

    For each pair of tables (i < j) and each pair of their columns, the
    lower-cased names are compared using
      * the normalised Levenshtein similarity ``1 - distance / max(len)``, and
      * the Jaccard similarity of character k-shingles for k = 2..5.
    A pair is reported when at least one score reaches 0.8. Only the scores
    that pass are filled in; the others stay empty (NaN) in the result.

    Parameters
    ----------
    dataset : list of DataFrame
        Tables carrying a ``name`` attribute.

    Returns
    -------
    DataFrame with the table/column identifiers and the passing scores.
    """
    threshold = 0.8
    jaccard_ks = range(2, 6)

    rows_list = []
    for i, df1 in enumerate(tqdm.tqdm(dataset)):
        for df2 in dataset[i + 1:]:
            for col1 in df1.columns:
                # str() also copes with non-string labels such as integers.
                c1 = str(col1).lower()
                for col2 in df2.columns:
                    c2 = str(col2).lower()
                    row = {'Dataset 1': df1.name, 'Column 1': str(col1), 'Dataset 2': df2.name, 'Column 2': str(col2), 'Relation': 'UNION'}
                    has_result = False

                    longest = max(len(c1), len(c2))
                    # Two empty names are identical (distance 0); avoid 0 / 0.
                    lev_ratio = levenshtein(c1, c2) / longest if longest else 0.0
                    if lev_ratio <= (1 - threshold):
                        has_result = True
                        row['Levenshtein Ratio'] = 1 - lev_ratio

                    for k in jaccard_ks:
                        sim = jaccard_similarity(c1, c2, k)
                        if sim >= threshold:
                            has_result = True
                            row[f'Jaccard (k={k})'] = sim

                    if has_result:
                        rows_list.append(row)

    return pd.DataFrame(rows_list, columns=['Dataset 1', 'Dataset 2', 'Relation', 'Column 1', 'Column 2', 'Levenshtein Ratio', 'Jaccard (k=2)', 'Jaccard (k=3)', 'Jaccard (k=4)', 'Jaccard (k=5)'])

result_cn_unclean = column_names(tables)
result_cn_unclean.to_csv('outputs/column_name_results_unclean.csv', encoding='utf-8', index=False, header=True)

Results method 2 (methods 2.1 and 2.2): (Runtime ~ 75 seconds)

The column name method is faster, as it only analyses the column names instead of scanning all column values. The Levenshtein distance method captures similarities when the column names are very similar. The threshold allows a 20% difference: the edit distance between two (lower-cased) names may be at most 20% of the length of the longer name. This may exclude short names that are similar, but it works well for longer names that contain spelling mistakes or abbreviations.

The Jaccard similarity method on shingles captures similarities when column names share substrings. This works well for column names that differ in many characters but share common words or abbreviations.

On the other hand, if the names are completely different, neither method can capture any similarity between the columns, which leads to a large number of false negatives.

In [None]:
# JOSIE method

def josie(dataset):
    """List, for every column, the k=3 columns of other tables with the largest value overlap.

    Values are compared as strings. A posting list maps every distinct
    stringified value to the set of (table name, column index) pairs that
    contain it. A column's overlap with each other column is then counted by
    walking the posting lists of its own values. Matches inside the same
    table are ignored, and columns without any overlap are left out.

    Parameters
    ----------
    dataset : list of DataFrame
        Tables carrying a ``name`` attribute.

    Returns
    -------
    DataFrame
        Columns 'Dataset 1', 'Column 1' and 'Similar Column 1'..'Similar Column k',
        each similar column formatted as
        'Table <name>, Column <index>, Overlap: <count>'.
    """
    from collections import Counter, defaultdict

    k = 3

    # Build each column's set of distinct stringified non-null values once,
    # and reuse it both to build and to probe the posting lists. Because the
    # counting works on the *string* sets, raw values with the same text
    # (e.g. 1 and '1') count once, so the count is the true overlap.
    column_values = []
    postings = defaultdict(set)
    for df in tqdm.tqdm(dataset):
        columns = []
        for j, col in enumerate(df.columns):
            vals = {str(v) for v in df[col].dropna()}
            columns.append((col, vals))
            for entry in vals:
                postings[entry].add((df.name, j))
        column_values.append(columns)

    rows_list = []
    for df1, columns in zip(tqdm.tqdm(dataset), column_values):
        for col1, vals1 in columns:
            counter = Counter()
            for val in vals1:
                for table_id, col_id in postings[val]:
                    if table_id != df1.name:  # do not match within the same table
                        counter[(table_id, col_id)] += 1

            top = counter.most_common(k)
            if not top:
                continue
            row = {'Dataset 1': df1.name, 'Column 1': str(col1)}
            for rank, ((table_id, col_id), count) in enumerate(top, start=1):
                row[f'Similar Column {rank}'] = f'Table {table_id}, Column {col_id}, Overlap: {count}'
            rows_list.append(row)

    col_names = ['Dataset 1', 'Column 1'] + [f'Similar Column {i + 1}' for i in range(k)]
    return pd.DataFrame(rows_list, columns=col_names)

result_josie_unclean = josie(tables)
result_josie_unclean.to_csv('outputs/josie_results_unclean.csv', encoding='utf-8', index=False, header=True)

Results method 3: (Runtime ~ 8 seconds)

JOSIE builds posting lists quickly and uses them to find similar columns efficiently. It is very fast and also produces a top-k list of similar columns, which is useful for considering more potential matches.

However, this method only uses the raw overlap as a metric, which is not very informative on its own. It works best when combined with the previous approaches, which give a better indication of whether two columns are actually similar.

-- Rest of assignment follows --

# Part 2: Data Discovery MinHash - Berken Tekin

Discovery algorithm:
1. Read each table (CSV file) with read_csv.
2. Flatten each table (and, separately, each of its columns) and convert it to a single string.
3. Shingle with k=6
4. The MinHash and CMinHash values are calculated for each set of shingles
5. LSH returns pairs of columns with similarities above a threshold.


In [None]:
import numba
@numba.jit(nopython=True)
def kshingle_manual(s, k=6):
   """Return the distinct length-k substrings (shingles) of s as a list.

   Duplicates are removed through a set, so the order of the returned list is
   unspecified. A string shorter than k yields an empty list. The function is
   compiled by numba in nopython mode.
   """
   sh = set()
   for i in range(len(s) - k + 1):
       sh.add(s[i:i + k])
   return list(sh)

We run the experiment twice: once on the dirty datasets and once on the cleaned datasets.

In [None]:
# Root folder of the data-lake CSV files; the loading cell below walks it recursively with os.walk.
csv_path = "./lake33"

In [None]:
from collections import namedtuple
from collections import defaultdict
import os
import re
import pandas as pd

ColStruct = namedtuple("ColStruct", ["index", "col_name", "value"])

_WHITESPACE_RUN = re.compile(r"\s+")


def _serialize(values):
    """Join *values* with spaces, collapsing every whitespace run (newlines included) to one space."""
    text = " ".join(str(x) for x in values)
    return _WHITESPACE_RUN.sub(" ", text).strip()


df_dict = {}                     # file name -> whole table serialized as one string
df_col_dict = defaultdict(list)  # file name -> [ColStruct(index, col_name, serialized column), ...]
skipped_files = []               # files that could not be read

for root, _dirs, files in os.walk(csv_path):
    for csv_file in files:
        if os.path.splitext(csv_file)[1] != ".csv":
            continue
        file_path = os.path.join(root, csv_file)
        try:
            # Naive read_csv() as mentioned in the report: let pandas detect the
            # separator (sep=None, python engine), skip malformed lines, then
            # drop every row that contains a missing value.
            df = pd.read_csv(file_path, sep=None, engine='python', on_bad_lines='skip', header=0)
            df = df.dropna().astype(object)
        except Exception as e:
            # Best effort: an unreadable file is skipped, but not silently.
            print(f"Skipping {file_path}: {e}")
            skipped_files.append(file_path)
            continue

        # Serialize the whole file, flattened row by row ...
        df_dict[csv_file] = _serialize(df.to_numpy().flatten())

        # ... and each of its columns separately.
        for col_idx, col in enumerate(df.columns):
            col_str = _serialize(df[col].to_numpy().flatten())
            df_col_dict[csv_file].append(ColStruct(index=col_idx, col_name=str(col), value=col_str))


In [None]:
from datasketch import MinHash, MinHashLSH
from typing import Callable, Generator, Iterable, List, Optional, Tuple
from datasketch.hashfunc import sha1_hash32
from tqdm import tqdm
import numpy as np


The datasketch.MinHash class is a good starting point for building the CMinHash algorithm.

For MinHash, at the beginning I did not really understand what the algorithm does.
Working on extending a module instead of starting from scratch helped me understand the algorithm better. I also learned about batch processing from datasketch.MinHash.

In [None]:
# Signature stores filled by the hashing loop further below, keyed by file name.
minhash_files = {}
cminhash_files = {}
cminhash_files_pi = {}
minhash_cols = defaultdict(list)
cminhash_cols = defaultdict(list)
cminhash_cols_pi = defaultdict(list)

lookup = {}
mh_file_ind = 0
# The size of a hash value in number of bytes
hashvalue_byte_size = 4


class CMinHashTest(MinHash):
    """Experimental C-MinHash built on top of ``datasketch.MinHash``.

    Instead of ``num_perm`` independent permutations, every input hash goes
    through one random affine map: sigma (``sf``) or pi (``pf``), depending on
    the update method. The result is then re-mapped once per rotation offset
    1..``c_minhash_rotations`` by ``pf3``. Each rotation fills one slot of
    ``hashvalues``, which keeps the running minimum per slot.

    All maps work modulo the prime ``2**31 - 1``. ``c_minhash_rotations``
    must equal the number of hash values (``num_perm``) before any
    ``update_batch_cminhash_*`` call.
    """

    _large_prime = np.uint64((1 << 31) - 1)  # 2**31 - 1 (a prime): modulus of all maps below
    _hash_range = 1 << 31

    def __init__(self, num_perm: int = 128, seed: int = 1234, hashfunc: Callable = sha1_hash32,
                 hashobj: Optional[object] = None, hashvalues: Optional[Iterable] = None,
                 permutations: Optional[Tuple[Iterable, Iterable]] = None, c_minhash_rotations: Optional[int] = None) -> None:
        """Create the base MinHash and draw the pi / sigma map parameters.

        c_minhash_rotations : int or None
            Number of rotations; ``None`` is allowed at construction, but the
            ``update_batch_cminhash_*`` methods then raise ``ValueError``.
        """
        super().__init__(num_perm, seed, hashfunc, hashobj, hashvalues, permutations)
        # Always define the attribute (possibly None). The update methods test
        # it, and the default c_minhash_rotations=None must not crash here.
        self.c_minhash_rotations = c_minhash_rotations
        self.pi_permutations = self._init_pi_permutations()
        self.sigma_permutations = self._init_sigma_permutations()
        # Rotation offsets 1..c_minhash_rotations, one per hash slot.
        self.shift_right_precomputed = (
            None if c_minhash_rotations is None
            else np.arange(1, c_minhash_rotations + 1, dtype=np.uint64)
        )

    def _init_sigma_permutations(self):
        """Draw (a, b) for the sigma map; seeded with ``seed + 1`` so it differs from pi."""
        gen = np.random.RandomState(self.seed + 1)
        return np.array([
            gen.randint(1, self._large_prime, dtype=np.uint64),
            gen.randint(0, self._large_prime, dtype=np.uint64),
        ], dtype=np.uint64).T

    def _init_pi_permutations(self):
        """Draw (a, b) for the pi map, seeded with ``seed``; returned with shape (2, 1)."""
        # Parameters of a random affine map that sends a 32-bit hash value to
        # another value modulo the prime.
        # http://en.wikipedia.org/wiki/Universal_hashing
        gen = np.random.RandomState(self.seed)
        return np.array(
            [
                (
                    gen.randint(1, self._large_prime, dtype=np.uint64),
                    gen.randint(0, self._large_prime, dtype=np.uint64),
                )
            ],
            dtype=np.uint64,
        ).T

    def sf(self, hv):
        """Sigma map: ``(hv * a + b) mod p``."""
        a, b = self.sigma_permutations
        return (hv * a + b) % self._large_prime

    def pf(self, hv) -> np.ndarray:
        """Pi map: ``(hv * a + b) mod p``."""
        a, b = self.pi_permutations
        return (hv * a + b) % self._large_prime

    # Another way of shifting the permutations.
    # The k offset is introduced at the end of the operation, instead of the beginning.
    # Here, "pia" holds (a * k + b % p) for all k.
    # https://github.com/beowolx/rensa/blob/95d80780f52f3105df4433132b279acd8c2762a0/src/cminhash.rs
    def pf2(self, hv, pia) -> np.ndarray:
        """Pi map with a caller-supplied intercept: ``(hv * a + pia) mod p``."""
        a, b = self.pi_permutations
        return (hv * a + pia) % self._large_prime

    def pf3(self, hv, off) -> np.ndarray:
        """Pi map with its slope shifted by each offset: ``(hv * (a - off) + b) mod p``.

        ``a - off`` is taken modulo p. In unsigned 64-bit arithmetic, an
        offset larger than ``a`` would otherwise wrap around. For ``off <= a``
        the result is unchanged.
        """
        a, b = self.pi_permutations
        p = self._large_prime
        slope = (a + p - off) % p
        return (hv * slope + b) % p

    def _update_batch_rotated(self, b, first_map) -> None:
        """Hash *b*, apply *first_map* and then all rotations, and merge by element-wise minimum."""
        if self.c_minhash_rotations is None:
            raise ValueError("You need to specify c_minhash_rotations")
        if self.c_minhash_rotations != len(self.hashvalues):
            raise ValueError(
                f"c_minhash_rotations ({self.c_minhash_rotations}) must equal "
                f"the number of hash values ({len(self.hashvalues)})"
            )
        # Column vector of raw hashes, one row per input value.
        hv = np.array([self.hashfunc(_b) for _b in b], dtype=np.uint64, ndmin=2).T
        # (n_values, 1) -> (n_values, c_minhash_rotations): one hash per slot.
        phv = self.pf3(first_map(hv), self.shift_right_precomputed)
        self.hashvalues = np.vstack([phv, self.hashvalues]).min(axis=0)

    def update_batch_cminhash_pi_pi(self, b: Iterable) -> None:
        """Add a batch of values using the pi map followed by the rotated pi maps.

        Each element of *b* is hashed with ``hashfunc``.

        Raises
        ------
        ValueError
            If ``c_minhash_rotations`` is None or differs from the signature length.
        """
        self._update_batch_rotated(b, self.pf)

    def update_batch_cminhash_sigma_pi(self, b: Iterable) -> None:
        """Add a batch of values using the sigma map followed by the rotated pi maps.

        Each element of *b* is hashed with ``hashfunc``.

        Raises
        ------
        ValueError
            If ``c_minhash_rotations`` is None or differs from the signature length.
        """
        self._update_batch_rotated(b, self.sf)


In [None]:

text_shinglesets = {}
col_shinglesets = defaultdict(set)

NUM_PERM = 256  # signature length of every (C)MinHash below
max_buf = 128   # shingles hashed per batch; bounds the (batch x NUM_PERM) temporary arrays


def _update_in_batches(update, shingles, batch_size=max_buf):
    """Feed *shingles* to *update*, UTF-8 encoded, in chunks of at most *batch_size*."""
    for start in range(0, len(shingles), batch_size):
        update([s.encode("utf8") for s in shingles[start:start + batch_size]])


for fname, text in tqdm(df_dict.items()):
    # Whole-file signatures.
    file_shingles = kshingle_manual(text)
    text_shinglesets[fname] = tuple(file_shingles)

    cmh = CMinHashTest(num_perm=NUM_PERM, c_minhash_rotations=NUM_PERM)
    cmhp = CMinHashTest(num_perm=NUM_PERM, c_minhash_rotations=NUM_PERM)
    mh = MinHash(num_perm=NUM_PERM)
    # All three file signatures are built from the file's own shingles. The
    # plain MinHash must not see a column's shingle list.
    _update_in_batches(cmh.update_batch_cminhash_sigma_pi, file_shingles)
    _update_in_batches(cmhp.update_batch_cminhash_pi_pi, file_shingles)
    _update_in_batches(mh.update_batch, file_shingles)
    cminhash_files[fname] = cmh
    cminhash_files_pi[fname] = cmhp
    minhash_files[fname] = mh

    # Per-column signatures. Each column is shingled once and reused by all signatures.
    for col_idx, col_name, col_value in df_col_dict[fname]:
        col_shingles = kshingle_manual(col_value)
        col_key = (fname, col_idx)

        cmh_col = CMinHashTest(num_perm=NUM_PERM, c_minhash_rotations=NUM_PERM)
        cmhp_col = CMinHashTest(num_perm=NUM_PERM, c_minhash_rotations=NUM_PERM)
        mh_col = MinHash(num_perm=NUM_PERM)
        _update_in_batches(cmh_col.update_batch_cminhash_sigma_pi, col_shingles)
        _update_in_batches(cmhp_col.update_batch_cminhash_pi_pi, col_shingles)
        _update_in_batches(mh_col.update_batch, col_shingles)

        cminhash_cols[fname].append(ColStruct(col_key, col_name, cmh_col))
        cminhash_cols_pi[fname].append(ColStruct(col_key, col_name, cmhp_col))
        minhash_cols[fname].append(ColStruct(col_key, col_name, mh_col))
        col_shinglesets[fname].add(ColStruct(col_key, col_name, tuple(col_shingles)))



In [None]:
import matplotlib.pyplot as plt

def jaccard_manual(set1, set2):
    """Exact Jaccard similarity |A ∩ B| / |A ∪ B| of two iterables.

    Both arguments are converted to sets first, so duplicates are ignored.
    Returns 0.0 when the sets share no element (including when both are empty).
    """
    left, right = set(set1), set(set2)
    shared = len(left & right)
    if not shared:
        return 0.0
    return shared / len(left | right)


def _pairwise_scores(sketches, score):
    """Score every ordered pair of entries in the mapping *sketches*.

    Returns ``{anchor: {other: score(sketches[other], sketches[anchor])}}``,
    following the mapping's iteration order on both levels.
    """
    return {
        anchor: {other: score(sketches[other], anchor_value) for other in sketches}
        for anchor, anchor_value in sketches.items()
    }


def _sketch_jaccard(a, b):
    """Delegate to the sketch object's own ``jaccard`` method."""
    return a.jaccard(b)


hash_scores = _pairwise_scores(minhash_files, _sketch_jaccard)
chash_scores = _pairwise_scores(cminhash_files, _sketch_jaccard)
chash_scores_pi = _pairwise_scores(cminhash_files_pi, _sketch_jaccard)
plain_scores = _pairwise_scores(text_shinglesets, jaccard_manual)

df = pd.DataFrame(hash_scores)
cdf = pd.DataFrame(chash_scores)
pdf = pd.DataFrame(plain_scores)
cpdf = pd.DataFrame(chash_scores_pi)

for frame, out_path in (
    (df, "outputs/hash_scores_dirty.csv"),
    (cdf, "outputs/chash_scores_dirty.csv"),
    (pdf, "outputs/plain_scores_dirty.csv"),
    (cpdf, "outputs/chash_scores_pi_dirty.csv"),
):
    frame.to_csv(out_path, index=False)



In [None]:
import numpy as np
import matplotlib.pyplot as plt

files = sorted(pdf.index)
n = len(files)
cols = 3
# At least one row so plt.subplots also works when there are no files.
rows = max(1, (n + cols - 1) // cols)

# squeeze=False always returns a 2-D array of Axes, so ravel() yields a flat
# sequence for any grid (a 1x3 grid is an array even when n == 1).
fig, axes = plt.subplots(rows, cols, figsize=(15, max(6, rows * 3)), sharey=True, squeeze=False)
axes = axes.ravel()

# (legend label, {anchor: {other: score}}) for each similarity method.
series = (
    ('Plain', plain_scores),
    ('MinHash', hash_scores),
    ('CMinHash', chash_scores),
    ('CMinHash-pi-pi', chash_scores_pi),
)
bar_width = 0.2
# Centre the bar group on each tick: (-1.5, -0.5, 0.5, 1.5) * bar_width for 4 series.
offsets = (np.arange(len(series)) - (len(series) - 1) / 2) * bar_width

for idx, anchor in enumerate(files):
    ax = axes[idx]
    others = [f for f in files if f != anchor]
    x = np.arange(len(others))

    for offset, (label, scores) in zip(offsets, series):
        ax.bar(x + offset, [scores[anchor][o] for o in others], width=bar_width, label=label)

    ax.set_title(anchor)
    ax.set_xticks(x)
    ax.set_xticklabels(others, rotation=90)
    ax.set_ylim(0, 1)
    if idx % cols == 0:
        ax.set_ylabel('Similarity')

# Hide the unused cells of the last row.
for ax in axes[n:]:
    ax.set_visible(False)

# One legend for all subplots
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc='upper center', ncol=len(series))

fig.suptitle('Per-File Pairwise Similarities (Plain vs MinHash vs CMinHash vs CMinHash-pi-pi)', y=0.98)
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.show()


Task: For each table, find the top-5 most similar columns across all tables using Jaccard similarity of 8-shingles computed from each column’s flattened string. Return a tidy DataFrame with: source_file, source_col, match_file, match_col, jaccard, ranked by jaccard descending per source column.

Use already computed minhash_files and cminhash_files to build tidy DataFrames of pairwise Jaccard similarities (files × files) and a long-form table. Do not recompute signatures.

Code for LSH

In [None]:
from dataclasses import dataclass, field
from datasketch.hashfunc import sha1_hash32

@dataclass
class LSH:
    """Banded LSH index over fixed-length signatures.

    Each signature of ``b * r`` values is split into ``b`` consecutive bands of
    ``r`` values. A stored signature becomes a candidate for a query when all
    values of at least one band are equal.
    """

    threshold: float = None  # set by add() to the approximate threshold (1/b)**(1/r)
    b: int = 4   # number of bands
    r: int = 64  # values per band
    num_perm: int = 256  # NOTE(review): not read by this class; b * r fixes the signature length
    seed: int = 1  # NOTE(review): not read by this class
    hashfunc = sha1_hash32  # plain class attribute (no annotation), not a dataclass field
    # (index, col_name) for each stored row of ``hashvalues``, in insertion order.
    # default_factory gives each instance its own list; a bare ``[]`` class
    # attribute would be shared by every LSH instance.
    metadata: list = field(default_factory=list, init=False, repr=False)

    def __post_init__(self):
        self.init_hashvalues()

    def init_hashvalues(self):
        """Reset the index to an empty (0, b * r) uint64 matrix."""
        self.hashvalues = np.empty(shape=(0, self.b * self.r), dtype=np.uint64)

    def add(self, minhash_list: List[ColStruct]):
        """Append the signatures ``entry.value.digest()`` of *minhash_list* to the index.

        Raises ValueError (an ``Exception``) when a signature length differs
        from ``b * r``; in that case nothing is stored.
        """
        self.threshold = (1 / self.b) ** (1 / self.r)
        print(f"approximate threshhold: {self.threshold}")
        if not minhash_list:
            return

        minhash_digests = np.array([entry.value.digest() for entry in minhash_list], dtype=np.uint64)
        # Validate before touching state so metadata and hashvalues stay aligned.
        if minhash_digests.ndim != 2 or minhash_digests.shape[1] != self.b * self.r:
            raise ValueError("Invalid shape")

        self.metadata.extend((entry.index, entry.col_name) for entry in minhash_list)
        self.hashvalues = np.append(self.hashvalues, minhash_digests, axis=0)

    def query(self, col: ColStruct) -> set:
        """Return the candidates for ``col`` as a set of tuples.

        Each tuple holds the ``(index, col_name)`` metadata of every stored
        signature that matches ``col`` exactly on one band. Bands with no
        match contribute nothing. If the query column is itself stored, it
        appears in every tuple.
        """
        query_digest = np.array([col.value.digest()], dtype=np.uint64)
        if query_digest.shape[1] != self.b * self.r:
            raise ValueError("Invalid shape")

        # Splitting the b*r columns into self.b equal chunks gives b bands of r values.
        index_bands = np.split(self.hashvalues, self.b, axis=1)
        query_bands = np.split(query_digest, self.b, axis=1)

        candidates = set()
        for index_band, query_band in zip(index_bands, query_bands):
            (rows,) = np.nonzero(np.all(index_band == query_band, axis=1))
            if rows.size == 0:
                continue
            candidates.add(tuple(self.metadata[j] for j in rows))
        return candidates


In [None]:
import json
from itertools import chain

lsh = LSH()

# Flatten the per-file lists of column signatures into one list
# (file by file, column order preserved).
cmh_ci_exp = [c for c in chain.from_iterable(cminhash_cols.values()) if c is not None]

lsh.add(cmh_ci_exp)

outs = {}

# For each column, report every *other* column sharing at least one LSH band
# (approximate Jaccard threshold (1/b)**(1/r) ~ 0.98 for b=4, r=64).
for col in cmh_ci_exp:
    qr = lsh.query(col)
    self_key = (col.index, col.col_name)
    # qr holds one tuple of matches per matching band. Flatten it and drop the
    # query column itself. This also catches columns that match the same partner
    # in every band (all bands then yield the same tuple).
    matches = sorted({m for band in qr for m in band} - {self_key})
    if matches:
        print(self_key)
        outs[str(self_key)] = str(matches)
        print(matches)
        print()

with open("./outputs/CMinHash_LSH_similar_rows_copy.json", "w") as fh:
    json.dump(outs, fh)


# Part 3: Cleaning Data - Ocean

By looking at the CSV files manually, we observe the following:

### Table 0
**Separator:** Underscore  
**Issues Found:** Multilingual column headers with translations separated by newlines
**Cleaning Applied:**
- `has_multilingual`: Extracts only English portion of column names by splitting on newline characters

---

### Table 1
**Separator:** Underscore  
**Issues Found:** Similar multilingual headers as Table 0, emoji presence in data  
**Cleaning Applied:**
- `has_multilingual`: Standardizes to English headers
- `has_emojis`: Strips emojis from both headers and data values

---

### Table 2
**Separator:** Comma
**Issues Found:** Multilingual and emojis
**Cleaning Applied:**
- `has_multilingual`: Ensures English-only headers
- `has_emojis`: Removes emojis from text fields

---

### Table 3
**Separator:** Underscore
**Issues Found:** BOM character corruption, excessive quotation marks  
**Cleaning Applied:**
- `has_bom`: Removes UTF-8 BOM characters that corrupt first column name
- `has_quotes`: Strips excessive quotes from values and headers

---

### Table 4
**Separator:** Comma
**Issues Found:** BOM character issues, emoji contamination  
**Cleaning Applied:**
- `has_bom`: Fixes first column corruption
- `has_emojis`: Cleans emoji characters

---

### Table 5
**Separator:** Comma
**Issues Found:** BOM character, malformed first column name  
**Cleaning Applied:**
- `has_bom`: Removes BOM character
- `fix_first_col='REF_DATE'`: Explicitly corrects the first column name

---

### Table 6
**Separator:** Comma
**Issues Found:** Missing headers entirely. Some tables seem related, but we could not find any with corresponding headers that we could reuse.  
**Cleaning Applied:**
- `no_headers`: Loads without headers (assigns numeric column names)

---

### Table 7
**Separator:** Comma   
**Issues Found:** Scrambled/corrupted headers but data intact  
**Cleaning Applied:** None; we found no way to unscramble the headers short of brute-forcing each one.

---

### Table 8
**Separator:** Underscore
**Issues Found:** Duplicate header names repeated within single cells, emojis  
**Cleaning Applied:**
- `has_duplicate_headers`: Detects repeated words in headers (e.g., "Name Name Name" → "Name")
- `has_emojis`: Removes emoji characters

---

### Table 9
**Separator:** Underscore
**Issues Found:** Excessive quotes, emoji contamination  
**Cleaning Applied:**
- `has_quotes`: Removes quote artifacts
- `has_emojis`: Strips emojis

---

### Table 10
**Separator:** Comma
**Issues Found:** No headers; same structure as Tables 11 and 12
**Cleaning Applied:**
- `use_headers_from='table_12.csv'`: Borrows headers from Table 12

---

### Table 11
**Separator:** Comma
**Issues Found:** No headers, but structure matches Table 12  
**Cleaning Applied:**
- `no_headers`: Loads without headers initially
- `has_emojis`: Removes emojis from data
- `use_headers_from='table_12.csv'`: Borrows headers from Table 12

---

### Table 12
**Separator:** Underscore
**Issues Found:** Non-standard header format requiring custom parsing, emojis  
**Cleaning Applied:**
- `special_header`: Uses custom extraction logic for malformed header line
- `has_emojis`: Cleans emoji characters


---

### Table 13
**Separator:** Comma  
**Issues Found:** None

---

### Table 14
**Separator:** Tab (\t)  
**Issues Found:** Uses tab separator instead of standard CSV  
**Cleaning Applied:** None, just separator specification

---

### Table 15
**Separator:** Underscore  
**Issues Found:** Quote artifacts, emoji contamination  
**Cleaning Applied:**
- `has_quotes`: Removes excessive quoting
- `has_emojis`: Strips emoji characters

---

### Table 16
**Separator:** Underscore  
**Issues Found:** Similar to Table 15 - quotes and emojis  
**Cleaning Applied:**
- `has_quotes`: Quote removal
- `has_emojis`: Emoji stripping

---

### Table 17
**Separator:** Underscore   
**Issues Found:** Duplicate headers, emoji presence  
**Cleaning Applied:**
- `has_duplicate_headers`: Fixes repeated header names
- `has_emojis`: Removes emojis

---

### Table 18
**Separator:** Underscore
**Issues Found:** Has data but malformed headers, structure matches Table 17  
**Cleaning Applied:**
- `skip_header`: Skips the corrupted header row
- `has_emojis`: Cleans emojis
- `use_headers_from='table_17.csv'`: Borrows headers from Table 17

---

### Processing order

The order for cleaning tables is important because some tables are almost the same, and in case there are missing headers, we can inherit them from another similar table that does have the headers.

#### 1. Independent Tables First (Tables 0-9, 12-17)
- These tables have their own headers (even if corrupted)
- Must be processed first to establish clean header sets
- Table 12 specifically must be processed before Tables 10 and 11
- Table 17 must be processed before Table 18

#### 2. Dependent Tables Second (Tables 10, 11, 18)
- **Tables 10 and 11** depend on **Table 12**: They borrow the cleaned headers after Table 12's special header extraction
- **Table 18** depends on **Table 17**: Uses Table 17's deduplicated headers after skipping its own corrupted header row

### Dependency Rationale

**Table 12 → Table 11 Dependency**
- Table 11 has no headers but identical structure to Table 12
- Likely these are split parts of the same dataset or parallel data collections
- Table 12's custom header parsing must complete first

**Table 17 → Table 18 Dependency**
- Table 18's headers are scrambled
- Column count and data types match Table 17 exactly
- Indication these are related, possibly temporal splits or data segments

---

## Common data issues in lake33

- **BOM characters** (Tables 3, 4, 5)
- **Emojis** (11 tables)
- **Multilingual content** (Tables 0, 1, 2)
- **Missing headers** (Tables 6, 10, 11)
- **Duplicate headers** (Tables 8, 17)
- **Special formats** (Table 12)
- Several use underscore while comma is the standard
- One uses tab (Table 14)




In [None]:
class datacleaner:
    """Cleaning code for all lake33 tables.

    ``self.configs`` maps a file name to a tuple
    ``(separator, flags[, first_col_name[, header_source]])``:

    * ``separator``: ``sep`` passed to ``pd.read_csv``.
    * ``flags``: set of switches (``has_bom``, ``has_quotes``, ``has_emojis``,
      ``has_multilingual``, ``has_duplicate_headers``, ``no_headers``,
      ``skip_header``, ``special_header``).
    * ``first_col_name``: if truthy, replaces the first column name.
    * ``header_source``: if truthy, a table whose cleaned column names are
      copied when the column counts match.

    Tables missing from ``configs`` are read as plain comma-separated files.
    """

    # Unicode general categories dropped (for non-ASCII characters only) by
    # _strip_emojis: So = other symbols (emoji pictographs), Sk = modifier
    # symbols (skin tones), Cf = format characters (ZWJ, zero-width space, BOM,
    # tag characters), Cs/Co = surrogates / private use, Me = enclosing marks
    # (keycap). Letters, digits and punctuation, accented or not, are kept.
    _EMOJI_CATEGORIES = frozenset({'So', 'Sk', 'Cf', 'Cs', 'Co', 'Me'})

    def __init__(self, base_dir: str = './lake33', output_dir: str = './cleaned'):
        self.base_dir = base_dir
        self.output_dir = output_dir
        # file name -> cleaned DataFrame; also the source for borrowed headers
        self.cleaned_datasets = {}

        # Compact table configurations using sets for boolean flags
        self.configs = {
            'table_0.csv': ('_', {'has_multilingual', 'has_emojis'}),
            'table_1.csv': ('_', {'has_multilingual', 'has_emojis'}),
            'table_2.csv': (',', {'has_multilingual', 'has_emojis'}),
            'table_3.csv': ('_', {'has_bom', 'has_quotes'}),
            'table_4.csv': (',', {'has_bom', 'has_emojis'}),
            'table_5.csv': (',', {'has_bom'}, 'REF_DATE'),
            'table_6.csv': (',', {'no_headers'}),
            'table_7.csv': (',', set()),
            'table_8.csv': ('_', {'has_duplicate_headers', 'has_emojis'}),
            'table_9.csv': ('_', {'has_quotes', 'has_emojis'}),
            'table_11.csv': (',', {'no_headers', 'has_emojis'}, None, 'table_12.csv'),
            'table_12.csv': ('_', {'special_header', 'has_emojis'}),
            'table_13.csv': (',', set()),
            'table_14.csv': ('\t', set()),
            'table_15.csv': ('_', {'has_quotes', 'has_emojis'}),
            'table_16.csv': ('_', {'has_quotes', 'has_emojis'}),
            'table_17.csv': ('_', {'has_duplicate_headers', 'has_emojis'}),
            'table_18.csv': ('_', {'skip_header', 'has_emojis'}, None, 'table_17.csv'),
            # Like table_11, table_10 has no header row; without 'no_headers'
            # its first data row would be consumed as the header.
            'table_10.csv': (',', {'no_headers'}, None, 'table_12.csv'),}

    @staticmethod
    def _strip_emojis(text: str) -> str:
        """Remove emoji and invisible formatting characters from ``text``.

        ASCII is always kept. Non-ASCII characters are dropped when they are
        variation selectors (U+FE00-U+FE0F) or fall in ``_EMOJI_CATEGORIES``.
        """
        import unicodedata

        def keep(ch: str) -> bool:
            code = ord(ch)
            if code < 128:
                return True
            if 0xFE00 <= code <= 0xFE0F:  # variation selectors (emoji presentation)
                return False
            return unicodedata.category(ch) not in datacleaner._EMOJI_CATEGORIES

        return ''.join(ch for ch in text if keep(ch))

    def clean_text(self, text, rm_emoji: bool = False, rm_quotes: bool = False) -> str:
        """Clean a single cell value.

        NaN/None, ints and floats are returned unchanged. Otherwise the value is
        converted to str and then:

        * surrounding square brackets are stripped;
        * emojis are removed (``rm_emoji``);
        * bracket and quote characters are removed (``rm_emoji`` or ``rm_quotes``);
        * whitespace is collapsed;
        * a single word repeated several times ("x x x") collapses to that word.
        """
        if pd.isna(text) or isinstance(text, (int, float)):
            return text

        text = str(text).strip('[]')

        if rm_emoji:
            text = self._strip_emojis(text)
        if rm_emoji or rm_quotes:
            text = re.sub(r'[\[\]\'"]', '', text)

        text = re.sub(r'\s+', ' ', text).strip()
        parts = text.split()

        if len(parts) > 1 and len(set(parts)) == 1:
            return parts[0]
        return text

    def clean_col(self, col: str, flags: set) -> str:
        """Clean a column name according to ``flags``.

        Returns 'unnamed_column' if nothing is left.
        """
        col = str(col)

        if 'has_bom' in flags:
            # Remove the BOM (also its mojibake form 'ï»¿'), control characters
            # (code points below 32) and zero-width spaces (U+200B).
            col = col.replace('\ufeff', '').replace('ï»¿', '')
            col = ''.join(c for c in col if ord(c) >= 32 and ord(c) != 8203)

        if 'has_quotes' in flags or 'has_emojis' in flags:
            col = re.sub(r'[\[\]"\']', '', col)

        if 'has_multilingual' in flags:
            # Keep only the part before the first newline separator (the English
            # header). Escaped forms are checked before a real newline.
            for d in ['\\\\n', '\\n', '\n']:
                if d in col:
                    col = col.split(d)[0]
                    break

        if 'has_emojis' in flags:
            col = self._strip_emojis(col)

        col = re.sub(r'\s+', ' ', col).strip()

        if 'has_duplicate_headers' in flags:
            # "Name Name Name" -> "Name"
            parts = col.split()
            if len(parts) >= 2 and all(parts[0] == p for p in parts[1:]):
                col = parts[0]
        return col or 'unnamed_column'

    def get_t12_headers(self, path: str) -> List[str]:
        """Special header extraction for table 12.

        Splits the first line on '_"' and keeps the non-empty pieces. Each
        piece has its quotes removed and its underscores replaced by spaces.
        """
        with open(path, 'r', encoding='utf-8', errors='ignore') as f:
            line = f.readline().strip()
        return [p.replace('"', '').replace('_', ' ').strip()
                for p in line.split('_"') if p.replace('"', '').replace('_', ' ').strip()]

    def _header_source(self, name: str) -> Optional[str]:
        """Return the table ``name`` borrows its headers from, or None."""
        cfg = self.configs.get(name, ())
        return cfg[3] if len(cfg) > 3 and cfg[3] else None

    def load_table(self, name: str) -> Optional[pd.DataFrame]:
        """Read ``base_dir/name`` using its configured separator and header mode.

        If the configured read raises, the file is retried with pandas
        defaults. Returns None when the file is missing or both reads fail.
        """
        path = os.path.join(self.base_dir, name)
        if not os.path.exists(path):
            return None

        cfg = self.configs.get(name, (',', set()))
        sep = cfg[0]
        flags = cfg[1] if len(cfg) > 1 else set()
        kw = {'low_memory': False, 'on_bad_lines': 'skip'}
        try:
            if 'no_headers' in flags:
                return pd.read_csv(path, sep=sep, header=None, **kw)
            elif 'special_header' in flags and name == 'table_12.csv':
                headers = self.get_t12_headers(path)
                df = pd.read_csv(path, sep=sep, skiprows=1, header=None, **kw)
                if len(df.columns) == len(headers):
                    df.columns = headers
                return df
            elif 'skip_header' in flags:
                return pd.read_csv(path, sep=sep, skiprows=1, header=None, **kw)
            else:
                return pd.read_csv(path, sep=sep, **kw)
        except Exception as exc:
            # Best-effort fallback, but make it visible instead of silent.
            print(f"(configured read failed: {exc}; retrying with defaults)", end=' ')
        try:
            return pd.read_csv(path, **kw)
        except Exception:
            return None

    def process(self, name: str) -> Optional[pd.DataFrame]:
        """Process a single table with all necessary cleaning.

        The steps are:

        * load the table;
        * set column names (borrowed from the header source table when
          possible, otherwise cleaned with ``clean_col``);
        * apply the first-column override;
        * clean object-dtype cells;
        * drop duplicate rows.
        """
        print(f"Processing {name}...", end=' ')
        df = self.load_table(name)
        if df is None:
            print("Failed")
            return None
        cfg = self.configs.get(name, (',', set()))
        flags = cfg[1] if len(cfg) > 1 else set()

        # Handle headers from other tables; fall back to cleaning our own
        # column names if the source is unavailable or its width differs.
        inherited = False
        src = self._header_source(name)
        if src:
            src_df = self.cleaned_datasets.get(src)
            if src_df is not None and len(df.columns) == len(src_df.columns):
                df.columns = src_df.columns
                inherited = True
        if not inherited:
            df.columns = [self.clean_col(c, flags) for c in df.columns]

        # Fix first column if specified
        if len(cfg) > 2 and cfg[2]:
            cols = df.columns.tolist()
            cols[0] = cfg[2]
            df.columns = cols

        if 'has_emojis' in flags or 'has_quotes' in flags:
            rm_emoji = 'has_emojis' in flags
            rm_quotes = 'has_quotes' in flags
            # Positional access (iloc) so duplicate column names are handled.
            for i in range(df.shape[1]):
                if df.iloc[:, i].dtype == 'object':
                    df.iloc[:, i] = df.iloc[:, i].apply(
                        lambda x: self.clean_text(x, rm_emoji, rm_quotes))

        df = df.drop_duplicates()
        print(f"{df.shape[0]} rows, {df.shape[1]} cols")
        return df

    def clean_all(self) -> Dict[str, pd.DataFrame]:
        """Clean every .csv in ``base_dir`` and return the cleaned tables.

        Each cleaned table is written to ``output_dir/cleaned_<name>``.
        """
        os.makedirs(self.output_dir, exist_ok=True)
        # sorted() makes the processing order (and the log) deterministic
        tables = sorted(f for f in os.listdir(self.base_dir) if f.endswith('.csv'))

        # Tables that borrow headers must run after the table they borrow
        # from, so process independent tables first, then dependent ones.
        indep = [t for t in tables if not self._header_source(t)]
        dep = [t for t in tables if self._header_source(t)]

        print(f"Cleaning {len(tables)} tables")
        print("-" * 40)

        for table in indep + dep:
            df = self.process(table)
            if df is not None:
                self.cleaned_datasets[table] = df
                df.to_csv(os.path.join(self.output_dir, f"cleaned_{table}"), index=False)

        print("-" * 40)
        print(f"Completed: {len(self.cleaned_datasets)} tables cleaned\n")
        return self.cleaned_datasets

    def report(self) -> dict:
        """Summarise the cleaned tables.

        The summary (shape, separator, applied cleaning, first 10 columns) is
        written to ``output_dir/cleaning_report.json`` and also returned.
        """
        report = {
            'summary': {
                'total_tables_processed': len(self.cleaned_datasets),
                'output_directory': self.output_dir
            },
            'table_details': {}}

        for name, df in self.cleaned_datasets.items():
            cfg = self.configs.get(name, (',', set()))
            flags = list(cfg[1]) if len(cfg) > 1 else []

            details = {
                'shape': df.shape,
                'separator': cfg[0],
                'cleaning_applied': flags.copy(),
                'columns': df.columns.tolist()[:10]}

            if len(cfg) > 2 and cfg[2]:
                details['cleaning_applied'].append(f'fix_first_col={cfg[2]}')
            if len(cfg) > 3 and cfg[3]:
                details['cleaning_applied'].append(f'use_headers_from={cfg[3]}')

            report['table_details'][name] = details

        with open(os.path.join(self.output_dir, 'cleaning_report.json'), 'w') as f:
            json.dump(report, f, indent=2, default=str)

        return report

def clean_lake33_tables(base_dir='./lake33', output_dir='./cleaned'):
    """Run the lake33 cleaning pipeline.

    The pipeline cleans every table in ``base_dir`` into ``output_dir`` and
    writes the JSON report there.

    Returns a ``(cleaned tables by file name, report dict)`` tuple.
    """
    pipeline = datacleaner(base_dir, output_dir)
    # Tuple items are evaluated left to right: clean first, then report.
    return pipeline.clean_all(), pipeline.report()

if __name__ == "__main__":
    # Executed when run as a script (and in a notebook cell, where __name__ is "__main__").
    cleaned_datasets, cleaning_report = clean_lake33_tables()

# Part 4: running discovery methods on clean data


In [None]:
# Reading data from the cleaned tables. Since tables 6,7 and 10 have been removed, we no longer attempt to read them.
def _load_clean_table(table_no):
    """Read lake33c/cleaned_table_<table_no>.csv and tag the frame with its file name."""
    table_name = f'cleaned_table_{table_no}.csv'
    frame = read_csv(f'lake33c/{table_name}')
    frame.name = table_name
    return frame


tables_clean = [_load_clean_table(i) for i in range(19) if i not in {6, 7, 10}]

_csv_opts = dict(encoding='utf-8', index=False, header=True)

result_sc_clean = set_containment_method(tables_clean)
result_sc_clean.to_csv('outputs/set_containment_results_clean.csv', **_csv_opts)

result_cn_clean = column_names(tables_clean)
result_cn_clean.to_csv('outputs/column_name_results_clean.csv', **_csv_opts)

result_josie_clean = josie(tables_clean)
result_josie_clean.to_csv('outputs/josie_results_clean.csv', **_csv_opts)

## Part 4: Clean Data Discovery MinHash - Berken Tekin

In [None]:
csv_path: str = "./lake33c"  # directory holding the cleaned lake33 tables

In [None]:
df_dict = {}
df_col_dict = defaultdict(list)


def _flatten_to_text(values) -> str:
    """Join values with spaces and collapse every whitespace run to one space."""
    text = " ".join(str(x) for x in values).replace("\n", " ")
    return re.sub(r"\s+", " ", text).strip()


error_count_file = 0
for root, dirs, files in os.walk(csv_path):
    for csv_file in files:
        if os.path.splitext(csv_file)[1] != ".csv":
            continue
        file_path = os.path.join(root, csv_file)
        try:
            # Naive read_csv() as mentioned in the report: with sep=None the
            # python engine sniffs the delimiter.
            df = pd.read_csv(file_path, sep=None, engine='python', on_bad_lines='skip', header=0)
            df = df.dropna().astype(object)
        except Exception as e:
            # Count unreadable files so the summary printed below is meaningful.
            error_count_file += 1
            print(f"Skipping {file_path}: {e}")
            continue

        # Serialize whole file (flattened row by row)
        df_dict[csv_file] = _flatten_to_text(df.to_numpy().flatten())

        # Serialize each column for this file
        for col_idx, col in enumerate(df.columns):
            col_str = _flatten_to_text(df[col].to_numpy().flatten())
            df_col_dict[csv_file].append(ColStruct(index=col_idx, col_name=str(col), value=col_str))

print(error_count_file)

Recalculate MinHash for clean datasets

In [None]:

text_shinglesets = {}
col_shinglesets = defaultdict(set)

# Fresh accumulators: the containers filled by the dirty-data cell still hold
# the dirty tables' signatures. Reusing them would mix dirty tables into the
# "clean" scores and LSH results, and re-runs would append duplicates.
# NOTE(review): assumes nothing else holds a reference to the old containers.
cminhash_cols = defaultdict(list)
cminhash_cols_pi = defaultdict(list)
minhash_cols = defaultdict(list)
cminhash_files = {}
cminhash_files_pi = {}
minhash_files = {}

max_buf = 128  # shingles fed to each update_batch call for the whole-file sketches

for fname, text in tqdm(df_dict.items()):
    cmh = CMinHashTest(num_perm=256, c_minhash_rotations=256)
    cmhp = CMinHashTest(num_perm=256, c_minhash_rotations=256)
    mh = MinHash(num_perm=256)

    # Whole-file sketches. The file's shingles live in `file_ks`, separate from
    # the per-column shingles below, so all three file-level sketches
    # (including the plain MinHash) are built from the same file shingles.
    file_ks = kshingle_manual(text)
    text_shinglesets[fname] = tuple(file_ks)
    for i in range(0, len(file_ks), max_buf):
        chunk = [e.encode("utf8") for e in file_ks[i:i + max_buf]]
        cmh.update_batch_cminhash_sigma_pi(chunk)
        cmhp.update_batch_cminhash_pi_pi(chunk)
        mh.update_batch(chunk)

    # Per-column sketches, each built from that column's flattened text.
    for (col_idx, col_name, col_value) in df_col_dict[fname]:
        col_ks = kshingle_manual(col_value)
        encoded = [e.encode("utf8") for e in col_ks]

        cmh_col = CMinHashTest(num_perm=256, c_minhash_rotations=256)
        cmhp_col = CMinHashTest(num_perm=256, c_minhash_rotations=256)
        mh_col = MinHash(num_perm=256)
        cmh_col.update_batch_cminhash_sigma_pi(encoded)
        cmhp_col.update_batch_cminhash_pi_pi(encoded)
        mh_col.update_batch(encoded)

        col_key = (fname, col_idx)
        cminhash_cols[fname].append(ColStruct(col_key, col_name, cmh_col))
        cminhash_cols_pi[fname].append(ColStruct(col_key, col_name, cmhp_col))
        minhash_cols[fname].append(ColStruct(col_key, col_name, mh_col))
        col_shinglesets[fname].add(ColStruct(col_key, col_name, tuple(col_ks)))

    cminhash_files[fname] = cmh
    cminhash_files_pi[fname] = cmhp
    minhash_files[fname] = mh



In [None]:
import matplotlib.pyplot as plt

def jaccard_manual(set1, set2):
    """Exact Jaccard similarity |A ∩ B| / |A ∪ B| of two iterables.

    Both arguments are converted to sets first, so duplicates are ignored.
    Returns 0.0 when the sets share no element (including when both are empty).
    """
    first, second = set(set1), set(set2)
    overlap = first & second
    return len(overlap) / len(first | second) if overlap else 0.0


def _pairwise_scores(sketches, score):
    """Score every ordered pair of entries in the mapping *sketches*.

    Returns ``{anchor: {other: score(sketches[other], sketches[anchor])}}``,
    following the mapping's iteration order on both levels.
    """
    return {
        anchor: {other: score(sketches[other], anchor_value) for other in sketches}
        for anchor, anchor_value in sketches.items()
    }


def _sketch_jaccard(a, b):
    """Delegate to the sketch object's own ``jaccard`` method."""
    return a.jaccard(b)


hash_scores = _pairwise_scores(minhash_files, _sketch_jaccard)
chash_scores = _pairwise_scores(cminhash_files, _sketch_jaccard)
chash_scores_pi = _pairwise_scores(cminhash_files_pi, _sketch_jaccard)
plain_scores = _pairwise_scores(text_shinglesets, jaccard_manual)

df = pd.DataFrame(hash_scores)
cdf = pd.DataFrame(chash_scores)
pdf = pd.DataFrame(plain_scores)
cpdf = pd.DataFrame(chash_scores_pi)

for frame, out_path in (
    (df, "outputs/hash_scores_clean.csv"),
    (cdf, "outputs/chash_scores_clean.csv"),
    (pdf, "outputs/plain_scores_clean.csv"),
    (cpdf, "outputs/chash_scores_pi_clean.csv"),
):
    frame.to_csv(out_path, index=False)



In [None]:
import numpy as np
import matplotlib.pyplot as plt

files = sorted(pdf.index)
n = len(files)
cols = 3
# At least one row so plt.subplots also works when there are no files.
rows = max(1, (n + cols - 1) // cols)

# squeeze=False always returns a 2-D array of Axes, so ravel() yields a flat
# sequence for any grid (a 1x3 grid is an array even when n == 1).
fig, axes = plt.subplots(rows, cols, figsize=(15, max(6, rows * 3)), sharey=True, squeeze=False)
axes = axes.ravel()

# (legend label, {anchor: {other: score}}) for each similarity method.
series = (
    ('Plain', plain_scores),
    ('MinHash', hash_scores),
    ('CMinHash', chash_scores),
    ('CMinHash-pi-pi', chash_scores_pi),
)
bar_width = 0.2
# Centre the bar group on each tick: (-1.5, -0.5, 0.5, 1.5) * bar_width for 4 series.
offsets = (np.arange(len(series)) - (len(series) - 1) / 2) * bar_width

for idx, anchor in enumerate(files):
    ax = axes[idx]
    others = [f for f in files if f != anchor]
    x = np.arange(len(others))

    for offset, (label, scores) in zip(offsets, series):
        ax.bar(x + offset, [scores[anchor][o] for o in others], width=bar_width, label=label)

    ax.set_title(anchor)
    ax.set_xticks(x)
    ax.set_xticklabels(others, rotation=90)
    ax.set_ylim(0, 1)
    if idx % cols == 0:
        ax.set_ylabel('Similarity')

# Hide the unused cells of the last row.
for ax in axes[n:]:
    ax.set_visible(False)

# One legend for all subplots
handles, labels = axes[0].get_legend_handles_labels()
fig.legend(handles, labels, loc='upper center', ncol=len(series))

fig.suptitle('Per-File Pairwise Similarities (Plain vs MinHash vs CMinHash vs CMinHash-pi-pi)', y=0.98)
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.show()


Task: For each table, find the top-5 most similar columns across all tables using Jaccard similarity of 8-shingles computed from each column’s flattened string. Return a tidy DataFrame with: source_file, source_col, match_file, match_col, jaccard, ranked by jaccard descending per source column.

Use already computed minhash_files and cminhash_files to build tidy DataFrames of pairwise Jaccard similarities (files × files) and a long-form table. Do not recompute signatures.

Code for LSH

In [None]:
from dataclasses import dataclass, field
from datasketch.hashfunc import sha1_hash32

@dataclass
class LSH:
    """Banded LSH index over fixed-length signatures.

    Each signature of ``b * r`` values is split into ``b`` consecutive bands of
    ``r`` values. A stored signature becomes a candidate for a query when all
    values of at least one band are equal.
    """

    threshold: float = None  # set by add() to the approximate threshold (1/b)**(1/r)
    b: int = 4   # number of bands
    r: int = 64  # values per band
    num_perm: int = 256  # NOTE(review): not read by this class; b * r fixes the signature length
    seed: int = 1  # NOTE(review): not read by this class
    hashfunc = sha1_hash32  # plain class attribute (no annotation), not a dataclass field
    # (index, col_name) for each stored row of ``hashvalues``, in insertion order.
    # default_factory gives each instance its own list; a bare ``[]`` class
    # attribute would be shared by every LSH instance.
    metadata: list = field(default_factory=list, init=False, repr=False)

    def __post_init__(self):
        self.init_hashvalues()

    def init_hashvalues(self):
        """Reset the index to an empty (0, b * r) uint64 matrix."""
        self.hashvalues = np.empty(shape=(0, self.b * self.r), dtype=np.uint64)

    def add(self, minhash_list: List[ColStruct]):
        """Append the signatures ``entry.value.digest()`` of *minhash_list* to the index.

        Raises ValueError (an ``Exception``) when a signature length differs
        from ``b * r``; in that case nothing is stored.
        """
        self.threshold = (1 / self.b) ** (1 / self.r)
        print(f"approximate threshhold: {self.threshold}")
        if not minhash_list:
            return

        minhash_digests = np.array([entry.value.digest() for entry in minhash_list], dtype=np.uint64)
        # Validate before touching state so metadata and hashvalues stay aligned.
        if minhash_digests.ndim != 2 or minhash_digests.shape[1] != self.b * self.r:
            raise ValueError("Invalid shape")

        self.metadata.extend((entry.index, entry.col_name) for entry in minhash_list)
        self.hashvalues = np.append(self.hashvalues, minhash_digests, axis=0)

    def query(self, col: ColStruct) -> set:
        """Return the candidates for ``col`` as a set of tuples.

        Each tuple holds the ``(index, col_name)`` metadata of every stored
        signature that matches ``col`` exactly on one band. Bands with no
        match contribute nothing. If the query column is itself stored, it
        appears in every tuple.
        """
        query_digest = np.array([col.value.digest()], dtype=np.uint64)
        if query_digest.shape[1] != self.b * self.r:
            raise ValueError("Invalid shape")

        # Splitting the b*r columns into self.b equal chunks gives b bands of r values.
        index_bands = np.split(self.hashvalues, self.b, axis=1)
        query_bands = np.split(query_digest, self.b, axis=1)

        candidates = set()
        for index_band, query_band in zip(index_bands, query_bands):
            (rows,) = np.nonzero(np.all(index_band == query_band, axis=1))
            if rows.size == 0:
                continue
            candidates.add(tuple(self.metadata[j] for j in rows))
        return candidates


In [None]:
import json
from itertools import chain

lsh = LSH()

# Flatten the per-file lists of column signatures into one list
# (file by file, column order preserved).
cmh_ci_exp = [c for c in chain.from_iterable(cminhash_cols.values()) if c is not None]

lsh.add(cmh_ci_exp)

outs = {}

# For each column, report every *other* column sharing at least one LSH band
# (approximate Jaccard threshold (1/b)**(1/r) ~ 0.98 for b=4, r=64).
for col in cmh_ci_exp:
    qr = lsh.query(col)
    self_key = (col.index, col.col_name)
    # qr holds one tuple of matches per matching band. Flatten it and drop the
    # query column itself. This also catches columns that match the same partner
    # in every band (all bands then yield the same tuple).
    matches = sorted({m for band in qr for m in band} - {self_key})
    if matches:
        print(self_key)
        outs[str(self_key)] = str(matches)
        print(matches)
        print()

with open("./outputs/CMinHash_LSH_similar_rows_copy_clean.json", "w") as fh:
    json.dump(outs, fh)
