# Hybrid template-based schema mapping pipeline

This notebook standardizes tenancy schedule data using a hybrid schema- and instance-based scoring model. It matches input CSV columns to a target schema (in YAML format) using both:

- Schema-level metrics (metadata: header names, synonyms, token overlap)
- Instance-level metrics (field values: data types, numeric distributions, dates)

Steps:
- Configuration and utility functions
- Load the target schema
- Define the scoring metrics (separated into schema-based and instance-based)
- Map raw columns to the schema
- Evaluate the mapping against a ground truth file
- Run all pipeline steps at once
- Optimize alpha, threshold and the per-attribute metric weights

The pipeline is designed to process multiple tenancy schedule CSV files with different formats and match their columns to the target attributes.

## Config

General configuration (paths and parameters) and all required imports.

The metrics for matching a column to an attribute are separated into schema-based and instance-based metrics. The `alpha` parameter blends the two groups:

    score = alpha * s_schema + (1 - alpha) * s_inst

`threshold` is the minimum score required to map a column to an attribute. `schema_w` and `instance_w` weight the individual metrics within each group, and `grid_size` sets the number of grid points per weight in the weight optimization at the end of the notebook.

In [139]:
import itertools
import json
import logging
import multiprocessing as mp
import re
import unicodedata
from collections import defaultdict
from copy import deepcopy
from functools import lru_cache
from pathlib import Path
from typing import Callable, Dict, List, Tuple

import numpy as np
import pandas as pd
import yaml
from joblib import Parallel, delayed
from pandas import DataFrame, Series
from rapidfuzz import fuzz
from rapidfuzz.distance import Levenshtein as Lev
from scipy.optimize import linear_sum_assignment
from scipy.stats import ks_2samp, wilcoxon

CFG = {
    "schema_path":  Path(r"C:\Users\tim\Documents\THESIS\Data\target_schema_complete_repaired - full.yaml"),
    "src_dir":      Path(r"C:/Users/tim/Documents/THESIS/Data/SCHEDULES"),
    "out_dir":      Path(r"C:/Users/tim/Documents/THESIS/Data/processed"),
    "ground_truth": Path(r"C:/Users/tim/Documents/THESIS/Data/GROUND_TRUTH - mapping.csv"),

    "grid_size": 4,
    "alpha":     0.5,
    "threshold": 0.5,
    "schema_w": {
        "schema_synonym":     1.0,
        "schema_jaccard":     1.0,
        "schema_levenshtein": 1.0,
    },

    "instance_w": {
        "instance_ks":   1.0,
        "instance_type": 1.0,
        "instance_dist": 1.0,
        "instance_date": 1.0,
    }
}

CFG["out_dir"].mkdir(parents=True, exist_ok=True)

## Utility functions

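Basic helpers for the input data.
`clean` lower-cases text, applies Unicode NFKC normalization and replaces punctuation with spaces.
`extract_numeric` parses numeric values from messy strings such as `'€ 249.492'`. It assumes European number formatting (`.` as thousands separator, `,` as decimal separator); values that still cannot be parsed, such as `'3rd, 4th'`, are dropped.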

In [54]:
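def clean(t) -> str:
    # lower-case, NFKC-normalise, replace punctuation with spaces, trim
    return re.sub(r"[^\w\s]", " ", unicodedata.normalize("NFKC", str(t).lower())).strip()

# day-month-year style dates (e.g. 31-12-2023, 1/2/23) or year-first (2023/12/31)
DATE_RX = re.compile(r"\b(?:\d{1,2}[-/]\d{1,2}[-/]\d{2,4}|\d{4}[-/]\d{1,2}[-/]\d{1,2})\b")
NUM_RX  = re.compile(r"[-+]?\d*\.?\d+")

def extract_numeric(values: pd.Series) -> pd.Series:
    # European formatting assumed: drop "€" and "." (thousands), turn "," into "."
    s = (values.dropna()
               .astype(str)
               .str.replace("€", "", regex=False)
               .str.replace(".", "", regex=False)
               .str.replace(",", ".", regex=False))
    # anything still non-numeric (e.g. "3rd, 4th") becomes NaN and is dropped
    return pd.to_numeric(s, errors="coerce").dropna()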
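A stdlib-only sketch of the same cleaning and number-parsing rules, applied to single values instead of a pandas Series. `parse_number` is a hypothetical helper introduced here for illustration; like `extract_numeric`, it assumes European formatting (`.` as thousands separator, `,` as decimal separator). Python's `float()` is used in place of `pd.to_numeric`, so a few exotic inputs may be treated slightly differently.

```python
import re
import unicodedata

def clean(text) -> str:
    # Same rule as the notebook's `clean`: lower-case, NFKC-normalise,
    # replace every non-word, non-space character with a space, then strip.
    return re.sub(r"[^\w\s]", " ", unicodedata.normalize("NFKC", str(text).lower())).strip()

def parse_number(value: str):
    """Hypothetical per-value version of extract_numeric; None if unparseable."""
    s = value.replace("€", "").replace(".", "").replace(",", ".").strip()
    try:
        return float(s)
    except ValueError:
        return None

print(clean("Total Rent (€)"))    # total rent
print(parse_number("€ 249.492"))  # 249492.0
print(parse_number("1.250,50"))   # 1250.5
print(parse_number("3rd, 4th"))   # None
```

Note that an English-formatted value such as `"12.5"` would become `125.0` under these rules, which is why the European-format assumption matters.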

The YAML file is based on CBRE's Data Requirements Document and represents our target schema.
It lists every target attribute with its data type and synonyms and, where available, its mean and IQR (interquartile range) or a reference sample (`ref_sample`).

In [55]:
TARGET_SCHEMA: List[dict] = yaml.safe_load(CFG["schema_path"].read_text())["attributes"]
for a in TARGET_SCHEMA:
    a["name_clean"] = clean(a["name"])
    a["syn_clean"]  = [clean(s) for s in a.get("synonyms", [])]

TARGET_IDS = [a["id"] for a in TARGET_SCHEMA]
ATTR = {a["id"]: a for a in TARGET_SCHEMA}
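The loader relies on only a handful of keys per attribute. As a sketch, here is the shape the code expects after `yaml.safe_load`, written as a Python dict: `id`, `name`, optional `synonyms`, `data_type`, and for decimal attributes `mean` plus `iqr` with `q1`/`q3` (or an optional `ref_sample`). The attributes and all statistics below are hypothetical, not values from the CBRE document; the loop repeats the preprocessing from the cell above.

```python
import re
import unicodedata

def clean(text) -> str:
    return re.sub(r"[^\w\s]", " ", unicodedata.normalize("NFKC", str(text).lower())).strip()

# Hypothetical result of yaml.safe_load(...) on the schema file
parsed = {
    "attributes": [
        {
            "id": "total_rent",
            "name": "Total Rent",
            "synonyms": ["Annual Rent", "Rent p.a."],
            "data_type": "decimal",
            "mean": 120000.0,                         # made-up statistic
            "iqr": {"q1": 60000.0, "q3": 180000.0},   # made-up statistic
        },
        # an attribute without a "synonyms" key is allowed
        {"id": "lease_start", "name": "Lease Start", "data_type": "date"},
    ]
}

target_schema = parsed["attributes"]
for a in target_schema:
    a["name_clean"] = clean(a["name"])
    a["syn_clean"] = [clean(s) for s in a.get("synonyms", [])]

attr = {a["id"]: a for a in target_schema}
print(attr["total_rent"]["syn_clean"])   # ['annual rent', 'rent p a']
print(attr["lease_start"]["syn_clean"])  # []
```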


## Schema based metrics

These metrics measure the similarity between a column and a target attribute from metadata, i.e. the column header versus the attribute name and its synonyms. Each metric compares the cleaned header with the attribute name and every synonym and keeps the best score:

- `schema_synonym`: fuzzy token-set similarity (`fuzz.token_set_ratio`, rescaled to 0 to 1)
- `schema_jaccard`: Jaccard overlap of the word tokens
- `schema_levenshtein`: normalized Levenshtein similarity

In [56]:
def schema_synonym(h: str, aid: str):
    a = ATTR[aid]
    txt = clean(h)
    choices = [a["name_clean"]] + a["syn_clean"]
    return max(fuzz.token_set_ratio(txt, c) for c in choices) / 100


def schema_jaccard(h: str, aid: str):
    a = ATTR[aid]
    s1 = set(clean(h).split())
    best = 0.0
    for s in [a["name_clean"]] + a["syn_clean"]:
        s2 = set(s.split())
        if s1 or s2:
            best = max(best, len(s1 & s2) / len(s1 | s2))
    return best


def schema_levenshtein(h: str, aid: str):
    a = ATTR[aid]
    txt = clean(h)
    return max(Lev.normalized_similarity(txt, n) for n in [a["name_clean"]] + a["syn_clean"])


SCHEMA_METRICS = {
    "schema_synonym":     schema_synonym,
    "schema_jaccard":     schema_jaccard,
    "schema_levenshtein": schema_levenshtein,
}
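To show what the last two metrics compute, here is a stdlib-only sketch on already-cleaned strings. The notebook itself uses `rapidfuzz`; the DP edit distance below and the normalisation `1 - distance / max(len)` are assumed to match `Lev.normalized_similarity` with its default unit costs. All helper names are hypothetical.

```python
def jaccard(a: str, b: str) -> float:
    # Token-set overlap |A & B| / |A | B| on whitespace tokens
    s1, s2 = set(a.split()), set(b.split())
    if not (s1 or s2):
        return 0.0
    return len(s1 & s2) / len(s1 | s2)

def levenshtein(a: str, b: str) -> int:
    # Classic dynamic-programming edit distance (unit insert/delete/substitute)
    prev = list(range(len(b) + 1))
    for i, ca in enumerate(a, start=1):
        cur = [i]
        for j, cb in enumerate(b, start=1):
            cur.append(min(prev[j] + 1,                 # deletion
                           cur[j - 1] + 1,              # insertion
                           prev[j - 1] + (ca != cb)))   # substitution
        prev = cur
    return prev[-1]

def normalized_similarity(a: str, b: str) -> float:
    # 1 - distance / length of the longer string (1.0 for two empty strings)
    longest = max(len(a), len(b))
    return 1.0 if longest == 0 else 1 - levenshtein(a, b) / longest

def best_over_names(header_clean: str, names, metric) -> float:
    # As in the notebook: best score over the attribute name and all synonyms
    return max(metric(header_clean, n) for n in names)

print(jaccard("total rent", "total annual rent"))            # 0.6666666666666666
print(levenshtein("kitten", "sitting"))                      # 3
print(normalized_similarity("total rents", "total rent"))    # ≈ 0.909
```

The two metrics complement each other: Jaccard ignores word order but treats `rent` and `rents` as different tokens, while the Levenshtein similarity tolerates small spelling variations.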

## Instance based metrics

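These metrics compare the actual cell values of a column with what the target attribute expects.

`instance_type` checks whether a column looks numeric: it combines the share of non-empty cells containing a digit (weight 0.7) with the share of cells starting with a digit (weight 0.3). For decimal attributes this value is the score; for all other attributes the score is 1 minus this value. A `total_rent` column will mostly contain numbers, possibly with a currency character, while a floor column may contain values such as '4th, 5th' and is therefore only partially numeric.

`instance_date` returns the share of non-empty cells containing a date that matches `DATE_RX` (e.g. `31-12-2023` or `2023/12/31`); it is 0 for non-date attributes.

`instance_ks` compares the numeric distribution of the column with a reference sample using the two-sample Kolmogorov-Smirnov (KS) test and returns 1 minus the KS statistic. The reference is the attribute's `ref_sample` if present, otherwise a normal sample with the target mean and sigma = IQR / 1.349. It is skipped (returns `None`) for non-decimal attributes or when fewer than 15 values can be parsed.

`instance_dist` compares the column against the target mean and IQR: half of the score rewards a column mean close to the target mean (distance measured in IQRs), the other half is the share of values inside [q1, q3]. It is skipped for non-decimal attributes and attributes without mean/IQR.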
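The KS-based score can be sketched without SciPy. The notebook calls `scipy.stats.ks_2samp`, whose statistic is the largest vertical gap D between the two empirical CDFs; the hypothetical helpers below compute that D directly (the gap can only change at observed values, so checking those is enough) and convert an IQR into the sigma of a normal reference, using IQR ≈ 1.349 sigma for a normal distribution. No p-value is computed.

```python
from bisect import bisect_right

def ks_statistic(x, y) -> float:
    """Two-sample Kolmogorov-Smirnov statistic D = max |F_x(t) - F_y(t)|."""
    xs, ys = sorted(x), sorted(y)
    d = 0.0
    for t in xs + ys:
        # empirical CDFs evaluated at every observed value
        fx = bisect_right(xs, t) / len(xs)
        fy = bisect_right(ys, t) / len(ys)
        d = max(d, abs(fx - fy))
    return d

def sigma_from_iqr(q1: float, q3: float) -> float:
    # For a normal distribution IQR = 1.349 * sigma (approximately)
    return (q3 - q1) / 1.349

def ks_score(x, ref) -> float:
    # Similarity as used in instance_ks: 1 = identical ECDFs, 0 = disjoint samples
    return 1 - ks_statistic(x, ref)

print(ks_statistic([1, 2, 3, 4], [3, 4, 5, 6]))  # 0.5
print(ks_score([1, 2, 3], [4, 5, 6]))            # 0.0
```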


In [57]:
def instance_type(_, col: Series, aid: str):
    a = ATTR[aid]
    if col.dropna().empty:
        return 0.0
    has_digit    = col.dropna().astype(str).str.contains(r"\d").mean()
    starts_digit = col.dropna().astype(str).str.match(r"^\s*\d").mean()
    numeric_like = 0.7 * has_digit + 0.3 * starts_digit
    return numeric_like if a["data_type"] == "decimal" else 1 - numeric_like


def instance_date(_, col: Series, aid: str):
    a = ATTR[aid]
    if a["data_type"] != "date" or col.dropna().empty:
        return 0.0
    parsed = col.dropna().astype(str).apply(lambda v: bool(DATE_RX.search(v))).mean()
    return round(parsed, 4)


def instance_ks(_, col: Series, aid: str):
    a = ATTR[aid]
    if a["data_type"] != "decimal":
        return None
    x = extract_numeric(col)
    if len(x) < 15:
        return None
    if "ref_sample" in a:
        ref = pd.Series(a["ref_sample"], dtype=float)
    elif {"mean", "iqr"} <= a.keys():
        mu = a["mean"]
        sigma = (a["iqr"]["q3"] - a["iqr"]["q1"]) / 1.349
        rng = np.random.default_rng(0)
        ref = pd.Series(rng.normal(mu, sigma, len(x)))
    else:
        return None
    stat, _ = ks_2samp(x, ref)
    return round(1 - stat, 4)


def instance_dist(_, col: Series, aid: str):
    a = ATTR[aid]
    if a["data_type"] != "decimal" or {"mean", "iqr"} - a.keys():
        return None
    x = extract_numeric(col)
    if x.empty:
        return None
    mu_t, q1, q3 = a["mean"], a["iqr"]["q1"], a["iqr"]["q3"]
    iqr = q3 - q1 if q3 != q1 else 1e-6
    dist = abs(x.mean() - mu_t) / iqr
    in_iqr = x.between(q1, q3).mean()
    return round(0.5 * max(0, 1 - dist) + 0.5 * in_iqr, 4)


INSTANCE_METRICS = {
    "instance_type": instance_type,
    "instance_date": instance_date,
    "instance_ks":   instance_ks,
    "instance_dist": instance_dist
}
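To make `instance_type` and `instance_dist` concrete, here is a stdlib-only sketch on plain Python lists (`None` marks an empty cell). `instance_type_score` and `instance_dist_score` are hypothetical names; the arithmetic follows the two functions above, and the example values are made up.

```python
import re
from typing import List, Optional

def instance_type_score(cells: List[Optional[str]], is_decimal: bool) -> float:
    # 0.7 * share of cells containing a digit + 0.3 * share starting with a digit
    cells = [str(c) for c in cells if c is not None]
    if not cells:
        return 0.0
    has_digit = sum(bool(re.search(r"\d", c)) for c in cells) / len(cells)
    starts_digit = sum(bool(re.match(r"^\s*\d", c)) for c in cells) / len(cells)
    numeric_like = 0.7 * has_digit + 0.3 * starts_digit
    return numeric_like if is_decimal else 1 - numeric_like

def instance_dist_score(values: List[float], mean_t: float, q1: float, q3: float) -> Optional[float]:
    # Half: closeness of the column mean to the target mean, in IQR units.
    # Half: share of values inside [q1, q3] (inclusive, like Series.between).
    if not values:
        return None
    iqr = q3 - q1 if q3 != q1 else 1e-6
    dist = abs(sum(values) / len(values) - mean_t) / iqr
    in_iqr = sum(q1 <= v <= q3 for v in values) / len(values)
    return round(0.5 * max(0.0, 1 - dist) + 0.5 * in_iqr, 4)

floors = ["3rd", "4th", "ground", "5th"]
print(instance_type_score(floors, is_decimal=True))    # ≈ 0.75
print(instance_dist_score([10, 12, 14, 30], 12, 10, 14))  # 0.375
```

In the second call the outlier 30 pulls the mean to 16.5, more than one IQR away from the target mean of 12, so the mean half contributes 0 and only the in-IQR share (3 of 4 values) counts.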

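## Hybrid scoring function

This function combines the schema-based and instance-based scores. Within each group, `weighted` computes a weighted average over the metrics that have a non-zero weight and return a value; metrics that return `None` (not applicable, e.g. `instance_ks` for a string attribute) are skipped and the remaining weights are renormalized. The two group scores are then blended with `alpha`, using the same formula as in the thesis. Without `weight_tbl` the metric weights come from `CFG`; with it, each attribute uses its own (optimized) weights.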

In [110]:
def weighted(metrics, weights, args):
    score = 0.0
    total_weight = 0.0

    for name, func in metrics.items():
        weight = weights.get(name, 0.0)
        if weight == 0:
            continue
        value = func(*args)
        if value is None:
            continue
        score += weight * value
        total_weight += weight

    return score / total_weight if total_weight else 0.0

def hybrid_score(header, col, aid, weight_tbl=None):
    if weight_tbl is None:
        schema_w = CFG["schema_w"]
        instance_w = CFG["instance_w"]
    else:
        schema_w = weight_tbl[aid]
        instance_w = weight_tbl[aid]

    schema_score = weighted(SCHEMA_METRICS, schema_w, (header, aid))
    instance_score = weighted(INSTANCE_METRICS, instance_w, (None, col, aid))

    alpha = CFG["alpha"]
    return alpha * schema_score + (1 - alpha) * instance_score
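A minimal sketch of the same combination on precomputed metric values, so the renormalisation is easy to follow. The metric values are hypothetical; `weighted` here takes values directly instead of calling the metric functions.

```python
from typing import Dict, Optional

def weighted(values: Dict[str, Optional[float]], weights: Dict[str, float]) -> float:
    # Weighted mean over metrics with a non-zero weight that returned a value.
    # None means "not applicable": the metric is skipped and the weights renormalise.
    score = total = 0.0
    for name, value in values.items():
        w = weights.get(name, 0.0)
        if w == 0 or value is None:
            continue
        score += w * value
        total += w
    return score / total if total else 0.0

def hybrid(schema_vals, instance_vals, schema_w, instance_w, alpha=0.5) -> float:
    # alpha * s_schema + (1 - alpha) * s_inst
    return alpha * weighted(schema_vals, schema_w) + (1 - alpha) * weighted(instance_vals, instance_w)

# Hypothetical metric values for one (column, attribute) pair
SCHEMA_VALS = {"schema_synonym": 1.0, "schema_jaccard": 0.5, "schema_levenshtein": 0.75}
INSTANCE_VALS = {"instance_ks": None, "instance_type": 1.0, "instance_dist": 0.5, "instance_date": 0.0}
UNIFORM_W = {name: 1.0 for name in [*SCHEMA_VALS, *INSTANCE_VALS]}

print(weighted(SCHEMA_VALS, UNIFORM_W))                          # 0.75
print(weighted(INSTANCE_VALS, UNIFORM_W))                        # 0.5
print(hybrid(SCHEMA_VALS, INSTANCE_VALS, UNIFORM_W, UNIFORM_W))  # 0.625
```

Had `instance_ks` been counted as 0 instead of skipped, the instance score would drop to 1.5 / 4 = 0.375, so skipping avoids penalising columns for metrics that do not apply.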


## Column to target attribute mapping


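For every column in a raw tenancy schedule we (potentially) map it to a target attribute as follows:
- Compute the hybrid score of every raw column against every target attribute, giving a score matrix.
- Solve the resulting assignment problem with the Hungarian algorithm (`linear_sum_assignment` on the negated scores). This enforces a one-to-one mapping with the highest total score: a "Total Rent" column may score highly for both `total_rent` and `total_area`, but it is only assigned to the attribute that gives the best overall mapping.
- Discard assigned pairs whose score is below `threshold`.
- Rename the mapped columns to their attribute ids and reorder them according to the target schema. Unmapped raw columns are dropped, unmapped attributes become empty columns, and `__src__` records the source file.

The function returns the standardized tenancy schedule as a DataFrame together with a debug DataFrame listing the assigned columns and their scores; unless `opt_mode` is set, the debug table is also written to `<file>_debug.xlsx` in the output folder.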

In [111]:
def map_dataframe(df_raw, src_name, weight_tbl=None):
    raw_cols = list(df_raw.columns)
    score_mat = np.empty((len(raw_cols), len(TARGET_IDS)), dtype=float)

    for i, raw in enumerate(raw_cols):
        for j, aid in enumerate(TARGET_IDS):
            score_mat[i, j] = hybrid_score(raw, df_raw[raw], aid, weight_tbl)

    row_idx, col_idx = linear_sum_assignment(-score_mat)

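    # keep every assignment in the debug table so "accepted" is informative;
    # only assignments at or above the threshold enter the mapping
    assigned = [
        (raw_cols[i], TARGET_IDS[j], score_mat[i, j])
        for i, j in zip(row_idx, col_idx)
    ]
    mapping = {
        raw: (aid, score)
        for raw, aid, score in assigned
        if score >= CFG["threshold"]
    }

    dbg_df = pd.DataFrame([
        {
            "raw_column": raw,
            "target_attr": aid,
            "score": score,
            "accepted": score >= CFG["threshold"]
        }
        for raw, aid, score in assigned
    ])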

    std_df = df_raw.rename(columns={r: aid for r, (aid, _) in mapping.items()})
    std_df = std_df.reindex(columns=TARGET_IDS)
    std_df["__src__"] = src_name

    if not CFG.get("opt_mode", False):
        out_path = CFG["out_dir"] / f"{Path(src_name).stem}_debug.xlsx"
        dbg_df.to_excel(out_path, index=False)

    return std_df, dbg_df
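To illustrate why the assignment step matters, here is a small brute-force stand-in for `linear_sum_assignment` (fine for a 2x2 or 3x2 matrix, far too slow for real schedules). `best_assignment` and `map_columns` are hypothetical helpers and the scores are made up. With a row-wise argmax both columns would pick `total_rent`; the one-to-one assignment keeps the pairing with the highest total instead (up to ties, SciPy should return the same pairs).

```python
from itertools import permutations

def best_assignment(scores):
    """Brute-force maximum-total one-to-one assignment for tiny score matrices.

    Returns (row, col) pairs sorted by row.
    """
    n_rows = len(scores)
    n_cols = len(scores[0]) if scores else 0
    if n_rows <= n_cols:
        # every raw column (row) gets a distinct target attribute (col)
        candidates = (list(enumerate(p)) for p in permutations(range(n_cols), n_rows))
    else:
        # more raw columns than attributes: every attribute gets a distinct row
        candidates = (sorted((r, c) for c, r in enumerate(p))
                      for p in permutations(range(n_rows), n_cols))
    return max(candidates, key=lambda pairs: sum(scores[r][c] for r, c in pairs))

def map_columns(raw_cols, target_ids, scores, threshold):
    # keep only assigned pairs whose score reaches the threshold
    return {raw_cols[r]: target_ids[c]
            for r, c in best_assignment(scores)
            if scores[r][c] >= threshold}

scores = [[0.9, 0.6],   # "Total Rent" vs total_rent, total_area
          [0.8, 0.7]]   # "Area m2"    vs total_rent, total_area
print(best_assignment(scores))  # [(0, 0), (1, 1)]
print(map_columns(["Total Rent", "Area m2"], ["total_rent", "total_area"], scores, 0.75))
```

With a threshold of 0.75 the second pair (score 0.7) is assigned but rejected, so only `Total Rent` is mapped; at 0.65 both columns would be mapped.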


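## Evaluation

The predicted mappings (read back from the `*_debug.xlsx` files) are compared with a manually created ground truth. Rows are matched on the schedule format (the file-name prefix) and the cleaned raw header; ground-truth rows marked `omit` are excluded. Each row is then labelled:

- true positive (TP): the predicted attribute equals the ground-truth attribute
- false positive (FP): the predicted attribute differs from the ground truth, or a column was mapped that has no ground-truth entry
- false negative (FN): a ground-truth column received no accepted prediction

From these counts we compute precision, recall and F1.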

In [112]:
def collect_predictions(out_dir):
    rows = []

    for file in out_dir.glob("*_debug.xlsx"):
        try:
            dbg = pd.read_excel(file)
        except Exception:  # skip files that cannot be read
            continue

        if not {"raw_column", "target_attr"}.issubset(dbg.columns):
            continue

        accepted = dbg if "accepted" not in dbg.columns else dbg[dbg["accepted"]]
        fmt = re.split(r"[_\-]", file.stem.replace("_debug", ""))[0].upper()

        for _, row in accepted.iterrows():
            rows.append({
                "FORMAT": fmt,
                "RAW_HEADER": row["raw_column"],
                "PREDICTED_ATTR": row["target_attr"],
                "FILE_NAME": file.name.replace("_debug.xlsx", ".csv")
            })

    if rows:
        pred = pd.DataFrame(rows)
        pred["RAW_HEADER_CLEAN"] = pred["RAW_HEADER"].apply(clean)
    else:
        pred = pd.DataFrame(columns=[
            "FORMAT", "RAW_HEADER", "PREDICTED_ATTR", "FILE_NAME", "RAW_HEADER_CLEAN"
        ])

    return pred


In [141]:
def evaluate(verbose=True):
    gt = pd.read_csv(CFG["ground_truth"])
    gt = gt[gt["TARGET_ATTRIBUTE"] != "omit"].copy()
    gt["RAW_HEADER_CLEAN"] = gt["RAW_HEADER"].apply(clean)

    pred = collect_predictions(CFG["out_dir"])

    merged = pred.merge(
        gt[["FORMAT", "RAW_HEADER_CLEAN", "TARGET_ATTRIBUTE"]],
        on=["FORMAT", "RAW_HEADER_CLEAN"],
        how="outer",
        indicator=True,
    )

    def label(row):
        if row["_merge"] == "both":
            return "TP" if row["PREDICTED_ATTR"] == row["TARGET_ATTRIBUTE"] else "FP"
        return "FP" if row["_merge"] == "left_only" else "FN"

    merged["STATUS"] = merged.apply(label, axis=1)

    tp = (merged["STATUS"] == "TP").sum()
    fp = (merged["STATUS"] == "FP").sum()
    fn = (merged["STATUS"] == "FN").sum()

    precision = tp / (tp + fp) if tp + fp else 0.0
    recall    = tp / (tp + fn) if tp + fn else 0.0
    f1        = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

    if verbose:
        print(f"True positives : {tp}")
        print(f"False positives: {fp}")
        print(f"False negatives: {fn}")
        print(f"Precision      : {precision}")
        print(f"Recall         : {recall}")
        print(f"F1 score       : {f1}")

        merged.to_csv(CFG["out_dir"] / "prediction_vs_truth.csv", index=False)

    return f1
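A stdlib-only sketch of the same labelling and scoring, assuming each (FORMAT, cleaned header) key occurs only once so plain dicts can replace the outer merge. `label_counts`, `prf1` and `format_from_filename` are hypothetical helpers and the example mapping is made up.

```python
import re

def format_from_filename(stem: str) -> str:
    # Same rule as collect_predictions: prefix before the first "_" or "-", upper-cased
    return re.split(r"[_\-]", stem)[0].upper()

def label_counts(pred: dict, truth: dict):
    """pred / truth map (FORMAT, RAW_HEADER_CLEAN) -> attribute id."""
    tp = fp = 0
    for key, attr in pred.items():
        if key in truth and attr == truth[key]:
            tp += 1
        else:
            # wrong attribute, or a mapping for a column without ground truth
            fp += 1
    fn = sum(1 for key in truth if key not in pred)
    return tp, fp, fn

def prf1(tp: int, fp: int, fn: int):
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1

pred = {("ABC", "total rent"): "total_rent", ("ABC", "area"): "total_rent", ("ABC", "notes"): "remarks"}
truth = {("ABC", "total rent"): "total_rent", ("ABC", "area"): "total_area", ("ABC", "start"): "lease_start"}
print(format_from_filename("abc-2023_q1"))  # ABC
print(label_counts(pred, truth))            # (1, 2, 1)
```

Plugging in the counts printed by `run()` below (56 TP, 15 FP, 17 FN) gives precision ≈ 0.789, recall ≈ 0.767 and F1 = 112 / 144 ≈ 0.778, matching the notebook output.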


## Run pipeline

This block processes all CSV files in the input folder, applies the column mapping logic, writes the combined standardised schedules to `standardised_schedules.xlsx` and reports the share of empty cells. Finally we evaluate the results using the evaluation block.

In [119]:
def run():
    std_tables: List[DataFrame] = []
    for csv in sorted(CFG["src_dir"].glob("*.csv")):
        raw_df = pd.read_csv(csv, dtype=str)
        std_df, _ = map_dataframe(raw_df, csv.name)
        std_tables.append(std_df)

    master = pd.concat(std_tables, ignore_index=True)
    master.to_excel(CFG["out_dir"] / "standardised_schedules.xlsx", index=False)
    
    total_cells = master.shape[0] * master.shape[1]
    num_nulls = master.isna().sum().sum()
    null_pct = 100 * num_nulls / total_cells if total_cells else 0
    print(f"Null percentage : {null_pct:}%")

    evaluate()

run()


Null percentage : 47.84313725490196%
True positives : 56
False positives: 15
False negatives: 17
Precision      : 0.7887323943661971
Recall         : 0.7671232876712328
F1 score       : 0.7777777777777778



In [63]:
def run_once(alpha, threshold, schema_w, instance_w):
    CFG["alpha"]      = alpha
    CFG["threshold"]  = threshold
    CFG["schema_w"]   = schema_w
    CFG["instance_w"] = instance_w

    for f in CFG["out_dir"].glob("*_debug.xlsx"):
        f.unlink(missing_ok=True)

    # map_dataframe is called for its side effect: it writes the *_debug.xlsx
    # files that evaluate() reads back (opt_mode must be False for this)
    for csv in CFG["src_dir"].glob("*.csv"):
        df_raw = pd.read_csv(csv, dtype=str)
        map_dataframe(df_raw, csv.name)

    f1 = evaluate(verbose=False)

    print(f"α={alpha} θ={threshold} F1={f1}")
    return f1


## Optimize alpha and threshold

We try combinations of `alpha` and `threshold` to find the pair with the best F1 score; this combination is used in the rest of the notebook. Both parameters are swept from 0.35 to 0.75 in steps of 0.02, `itertools.product` iterates over all pairs, and the best configuration is printed.

In [13]:
SCHEMA_W = CFG["schema_w"]
INSTANCE_W = CFG["instance_w"]

In [None]:
#alphas = [0.35, 0.40, 0.45, 0.50, 0.55, 0.60, 0.65, 0.70]
#thresholds = [0.35, 0.40, 0.45, 0.50, 0.55, 0.60, 0.65, 0.70, 0.75]

alphas = np.linspace(0.35, 0.75, num=int((0.75 - 0.35) / 0.02) + 1).round(2).tolist()
thresholds = np.linspace(0.35, 0.75, num=int((0.75 - 0.35) / 0.02) + 1).round(2).tolist()

TGT_IDS = list(ATTR.keys()) 
best = {"f1": -1}

for α, θ in itertools.product(alphas, thresholds):
    f1 = run_once(α, θ, deepcopy(SCHEMA_W), deepcopy(INSTANCE_W))
    if f1 > best["f1"]:
        best = dict(alpha=α, threshold=θ, schema_w=SCHEMA_W, instance_w=INSTANCE_W, f1=f1)

print("Best config:", best)


## Results

Alpha = 0.5 chosen and threshold = 0.65

In [128]:
CFG["alpha"] = 0.5
CFG["threshold"] = 0.5

In [129]:
gt = pd.read_csv(CFG["ground_truth"])
gt = gt[gt["TARGET_ATTRIBUTE"] != "omit"].copy()
gt["RAW_HEADER_CLEAN"] = gt["RAW_HEADER"].apply(clean)
FAST_GT = gt[["FORMAT", "RAW_HEADER_CLEAN", "TARGET_ATTRIBUTE"]]


In [130]:
def fast_f1(weight_tbl):
    rows = []

    for csv in sorted(CFG["src_dir"].glob("*.csv")):
        df_raw = pd.read_csv(csv, dtype=str)
        fmt = re.split(r"[_\-]", csv.stem)[0].upper()

        mapping = resolve_mapping_tensor(
            df_raw,
            csv,  # using csv as the file_key
            weights=weight_tbl,
            alpha=CFG["alpha"],
            thresh=CFG["threshold"],
        )

        for raw, aid in mapping.items():
            rows.append({
                "FORMAT": fmt,
                "RAW_HEADER_CLEAN": raw,
                "PREDICTED_ATTR": aid,
                "FILE_NAME": csv.name,
            })

    if not rows:
        return 0.0, 0.0, 0.0

    pred = pd.DataFrame(rows)
    gt = FAST_GT

    merged = pred.merge(gt, on=["FORMAT", "RAW_HEADER_CLEAN"], how="outer", indicator=True)

    tp = ((merged["_merge"] == "both") & (merged["PREDICTED_ATTR"] == merged["TARGET_ATTRIBUTE"])).sum()
    fp = ((merged["_merge"] == "both") & (merged["PREDICTED_ATTR"] != merged["TARGET_ATTRIBUTE"])).sum()
    fp += (merged["_merge"] == "left_only").sum()
    fn = (merged["_merge"] == "right_only").sum()

    p = tp / (tp + fp) if (tp + fp) else 0.0
    r = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * p * r / (p + r) if (p + r) else 0.0

    return p, r, f1


In [131]:
GRID = np.linspace(0, 1, CFG["grid_size"])
METRICS = list(SCHEMA_METRICS) + list(INSTANCE_METRICS)

WEIGHTS = {aid: {m: 0.5 for m in METRICS} for aid in TGT_IDS}

# run F1 evaluation for one specific weight combination
def f1_for_combo(aid, trial_vec):
    trial_weights = dict(zip(METRICS, trial_vec))
    temp_weights = deepcopy(WEIGHTS)
    temp_weights[aid].update(trial_weights)
    return fast_f1(temp_weights), trial_weights

# try combinations of weights for a single attribute
def search_attribute(aid):
    all_combos = list(itertools.product(GRID, repeat=len(METRICS)))
    print(f"Trying {len(all_combos)} combinations for {aid}...")
    results = Parallel(n_jobs=mp.cpu_count() - 1, backend="loky", verbose=5)(
        delayed(f1_for_combo)(aid, vec) for vec in all_combos
    )
    
    # Finding best set
    best_f1 = -1.0
    best_weights = {}

    # fast_f1 returns (precision, recall, f1); compare on F1 only
    for (_, _, f1), trial_weights in results:
        if f1 > best_f1:
            best_f1 = f1
            best_weights = trial_weights

    WEIGHTS[aid].update(best_weights)
    print(f"Best F1 for {aid}: {best_f1}")


In [132]:
def optimise_all_attributes(num_passes=1):
    grid = np.linspace(0.1, 1.0, CFG["grid_size"])
    metrics = list(SCHEMA_METRICS) + list(INSTANCE_METRICS)
    all_combos = list(itertools.product(grid, repeat=len(metrics)))

    for epoch in range(num_passes):
        print(f"Epoch {epoch + 1}")
        for aid in TGT_IDS:
            print(f"Searching weights for: {aid}")
            prev_weights = deepcopy(WEIGHTS[aid])
            results = Parallel(n_jobs=mp.cpu_count() - 1, backend="loky", verbose=10)(delayed(f1_for_combo)(aid, vec) for vec in all_combos)
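            best_f1, best_p, best_r = -1.0, 0.0, 0.0
            best_weights = None
            for (p, r, f1), trial_weights in results:
                if f1 > best_f1:
                    # keep precision/recall of the best trial, not of the last one
                    best_f1, best_p, best_r = f1, p, r
                    best_weights = trial_weights

            WEIGHTS[aid].update(best_weights)
            print(f"Best F1: {best_f1} (Precision: {best_p}, Recall: {best_r})")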

            if best_weights != prev_weights:
                print(f" Weights updated for {aid}")

    out = Path("attr_weights.json")
    out.write_text(json.dumps(WEIGHTS, indent=2))
    print(f"Saved attribute weights to {out}")
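The weight optimization above is a coordinate-wise exhaustive search: one attribute at a time, every combination of grid values for its 7 metric weights is tried while all other attributes keep their current weights. With `grid_size = 4` (roughly 0.1, 0.4, 0.7, 1.0) that is 4**7 = 16384 trials per attribute per pass. The sketch below reproduces that loop structure with a hypothetical toy objective standing in for `fast_f1`. Because attributes interact through the one-to-one assignment, such a search is not guaranteed to reach the global optimum; the toy objective is separable, so it does.

```python
import itertools

def coordinate_grid_search(attr_ids, metrics, grid, objective, passes=1, init=0.5):
    """Exhaustive grid search over one attribute's metric weights at a time.

    `objective(weights) -> float` is a hypothetical stand-in for the F1 from
    fast_f1; all other attributes keep their current weights meanwhile.
    """
    weights = {a: {m: init for m in metrics} for a in attr_ids}
    for _ in range(passes):
        for a in attr_ids:
            best_score, best_vec = float("-inf"), None
            for vec in itertools.product(grid, repeat=len(metrics)):
                trial = dict(weights)               # shallow copy of the outer dict
                trial[a] = dict(zip(metrics, vec))  # replace only attribute `a`
                score = objective(trial)
                if score > best_score:
                    best_score, best_vec = score, vec
            weights[a] = dict(zip(metrics, best_vec))
    return weights

# Toy, separable objective with a known optimum (hypothetical targets)
TARGET = {"a": {"m1": 0.4, "m2": 1.0}, "b": {"m1": 0.7, "m2": 0.1}}

def toy_objective(w):
    return -sum((w[a][m] - TARGET[a][m]) ** 2 for a in TARGET for m in TARGET[a])

found = coordinate_grid_search(["a", "b"], ["m1", "m2"], [0.1, 0.4, 0.7, 1.0], toy_objective)
print(found == TARGET)  # True
```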


In [133]:
def run_mapping_for_all_sources(weights):
    results = []

    for csv_file in sorted(CFG["src_dir"].glob("*.csv")):
        df = pd.read_csv(csv_file, dtype=str)
        std_df, _ = map_dataframe(df, csv_file.name, weights)
        results.append(std_df)

    return results


In [134]:
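def resolve_mapping_tensor(df_raw, file_key, weights, alpha, thresh):
    # Fast counterpart of map_dataframe for the weight search: it reuses the
    # precomputed metric tensors (headers x attributes x metrics), so a weight
    # trial only costs weighted sums plus one assignment. df_raw is not used.
    schema = SCHEMA_T[file_key]
    instance = INSTANCE_T[file_key]
    headers = HEADER_CLEAN[file_key]

    num_raw, num_attr = schema.shape[:2]
    score_matrix = np.full((num_raw, num_attr), -1e9)

    for j, attr_id in enumerate(TGT_IDS):
        # per-attribute weights, in the same order as the tensors' metric axis
        schema_w = np.array([weights[attr_id][name] for name in SCHEMA_METRICS])
        instance_w = np.array([weights[attr_id][name] for name in INSTANCE_METRICS])

        # weighted mean over the metric axis, for every raw header at once
        schema_score = (schema[:, j, :] * schema_w).sum(axis=1) / schema_w.sum()
        instance_score = (instance[:, j, :] * instance_w).sum(axis=1) / instance_w.sum()

        score_matrix[:, j] = alpha * schema_score + (1 - alpha) * instance_score

    # one-to-one assignment maximizing the total score, then apply the threshold
    row_idx, col_idx = linear_sum_assignment(-score_matrix)

    return {
        headers[i]: TGT_IDS[j]
        for i, j in zip(row_idx, col_idx)
        if score_matrix[i, j] >= thresh
    }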

In [135]:
# Cached similarity helpers
@lru_cache(maxsize=None)
def token_ratio(a, b):
    return fuzz.token_set_ratio(a, b)

@lru_cache(maxsize=None)
def lev_similarity(a, b):
    return Lev.normalized_similarity(a, b)

# Schema-based similarity metrics
def schema_synonym(header, attr_id):
    attr = ATTR[attr_id]
    h_clean = clean(header)
    options = [attr["name_clean"]] + attr["syn_clean"]
    return max(token_ratio(h_clean, s) for s in options) / 100

def schema_levenshtein(header, attr_id):
    attr = ATTR[attr_id]
    h_clean = clean(header)
    options = [attr["name_clean"]] + attr["syn_clean"]
    return max(lev_similarity(h_clean, s) for s in options)

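# Jaccard stays as-is but memoized
schema_jaccard = lru_cache(maxsize=None)(schema_jaccard)

# SCHEMA_METRICS still points to the original (uncached) functions, so rebind it
# to the cached versions; key order (and thus the tensor's metric axis) is unchanged
SCHEMA_METRICS.update({
    "schema_synonym":     schema_synonym,
    "schema_jaccard":     schema_jaccard,
    "schema_levenshtein": schema_levenshtein,
})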

# Load CSVs
RAW_DFS = {
    path: pd.read_csv(path, dtype=str)
    for path in CFG["src_dir"].glob("*.csv")
}

# Storage for tensors and headers
SCHEMA_T = {}
INSTANCE_T = {}
HEADER_CLEAN = {}

# Get list of metric functions
schema_metrics = list(SCHEMA_METRICS.values())
instance_metrics = list(INSTANCE_METRICS.values())

# Compute schema + instance metric tensors for each file
for path, df in RAW_DFS.items():
    cleaned_headers = [clean(h) for h in df.columns]
    HEADER_CLEAN[path] = cleaned_headers

    num_headers = len(cleaned_headers)
    num_attrs = len(TARGET_IDS)

    schema_tensor = np.zeros((num_headers, num_attrs, len(schema_metrics)), dtype=np.float32)
    instance_tensor = np.zeros((num_headers, num_attrs, len(instance_metrics)), dtype=np.float32)

    for j, attr_id in enumerate(TARGET_IDS):
        for i, h in enumerate(cleaned_headers):
            for k, func in enumerate(schema_metrics):
                schema_tensor[i, j, k] = func(h, attr_id)

        for i, raw_col in enumerate(df.columns):
            col_data = df[raw_col]
            for k, func in enumerate(instance_metrics):
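                val = func(None, col_data, attr_id)
                # None means "not applicable"; it is stored as 0.0 here, whereas
                # weighted() skips such metrics and renormalises the weights, so
                # scores from this fast path can differ from hybrid_score()
                instance_tensor[i, j, k] = 0.0 if val is None else val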

    SCHEMA_T[path] = schema_tensor
    INSTANCE_T[path] = instance_tensor


In [136]:
FAST_GT = (
    pd.read_csv(CFG["ground_truth"])
      .query("TARGET_ATTRIBUTE != 'omit'")
      .assign(RAW_HEADER_CLEAN=lambda df: df["RAW_HEADER"].map(clean))
      [["FORMAT", "RAW_HEADER_CLEAN", "TARGET_ATTRIBUTE"]]
)

# opt_mode=True stops map_dataframe from writing *_debug.xlsx files while we optimize the weights
CFG["opt_mode"] = True
optimise_all_attributes()
CFG["opt_mode"] = False

# Run mapping using learned weights
standardized_tables = run_mapping_for_all_sources(WEIGHTS)
final_df = pd.concat(standardized_tables, ignore_index=True)

# Calculate and print null percentage
total = final_df.shape[0] * len(TGT_IDS)
nulls = final_df[TGT_IDS].isna().sum().sum()
print("Final null %:", 100 * nulls / total)


Epoch 1
Searching weights for: property_id


[Parallel(n_jobs=31)]: Using backend LokyBackend with 31 concurrent workers.
[Parallel(n_jobs=31)]: Done  10 tasks      | elapsed:    7.3s
[Parallel(n_jobs=31)]: Done  23 tasks      | elapsed:    7.3s
[Parallel(n_jobs=31)]: Done  36 tasks      | elapsed:    7.5s
[Parallel(n_jobs=31)]: Done  51 tasks      | elapsed:    7.6s
[Parallel(n_jobs=31)]: Done  66 tasks      | elapsed:    7.7s
[Parallel(n_jobs=31)]: Done  83 tasks      | elapsed:    7.7s
[Parallel(n_jobs=31)]: Done 100 tasks      | elapsed:    7.8s
[Parallel(n_jobs=31)]: Done 119 tasks      | elapsed:    7.9s
[Parallel(n_jobs=31)]: Done 138 tasks      | elapsed:    7.9s
[Parallel(n_jobs=31)]: Done 159 tasks      | elapsed:    8.0s
[Parallel(n_jobs=31)]: Done 180 tasks      | elapsed:    8.1s
[Parallel(n_jobs=31)]: Done 203 tasks      | elapsed:    8.2s
[Parallel(n_jobs=31)]: Done 226 tasks      | elapsed:    8.3s
[Parallel(n_jobs=31)]: Done 251 tasks      | elapsed:    8.4s
[Parallel(n_jobs=31)]: Done 276 tasks      | elapsed:  

KeyboardInterrupt: 

In [75]:
#Calc avg weights after optimization per datatype

In [140]:
with open("attr_weights grid_4.json") as f:
    weights = json.load(f)

with open(CFG["schema_path"]) as f:
    schema = yaml.safe_load(f)

schema_metrics = {"schema_synonym", "schema_jaccard", "schema_levenshtein"}
instance_metrics = {"instance_type", "instance_date", "instance_ks", "instance_dist"}

# Group weights by data type
by_type = defaultdict(list)
for attr in schema["attributes"]:
    aid = attr["id"]
    dtype = attr["data_type"]
    if aid in weights:
        by_type[dtype].append(weights[aid])

# Print average weights per type
for dtype in ["string", "decimal", "date"]:
    s_sum = i_sum = s_count = i_count = 0

    for w in by_type[dtype]:
        for metric, value in w.items():
            if metric in schema_metrics:
                s_sum += value
                s_count += 1
            elif metric in instance_metrics:
                i_sum += value
                i_count += 1

    s_avg = s_sum / s_count if s_count else 0
    i_avg = i_sum / i_count if i_count else 0
    gap = i_avg - s_avg

    print(f"{dtype}")
    print(f"  Avg schema weight  : {s_avg}")
    print(f"  Avg instance weight: {i_avg}")
    print(f"  Gap                : {gap}")

schema_avgs = []
instance_avgs = []

for aid, w in weights.items():
    s_avg = sum(w.get(m, 0) for m in schema_metrics) / len(schema_metrics)
    i_avg = sum(w.get(m, 0) for m in instance_metrics) / len(instance_metrics)
    schema_avgs.append(s_avg)
    instance_avgs.append(i_avg)

stat, p = wilcoxon(instance_avgs, schema_avgs)
print("Wilcoxon test result:")
print(f"  Statistic: {stat}")
print(f"  P-value  : {p}")

string
  Avg schema weight  : 0.1333333333333334
  Avg instance weight: 0.2625
  Gap                : 0.12916666666666662
decimal
  Avg schema weight  : 0.11428571428571435
  Avg instance weight: 0.3892857142857142
  Gap                : 0.2749999999999998
date
  Avg schema weight  : 0.09999999999999999
  Avg instance weight: 0.47500000000000003
  Gap                : 0.37500000000000006

Wilcoxon test result:
  Statistic: 3.0
  P-value  : 0.0004908453484823393
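The Wilcoxon signed-rank test checks whether the paired per-attribute differences between average instance weight and average schema weight are systematically non-zero. As a sketch of what `scipy.stats.wilcoxon` reports as the statistic in the default two-sided case, the hypothetical helper below ranks the absolute differences (dropping zeros, averaging tied ranks) and returns the rank sums W+ and W- and their minimum; it does not compute a p-value. A statistic as small as the 3.0 above means nearly all rank mass lies on one side, consistent with the positive instance-minus-schema gaps per data type.

```python
def signed_rank_statistic(x, y):
    """Wilcoxon signed-rank sums for paired samples: (W+, W-, min(W+, W-))."""
    diffs = [a - b for a, b in zip(x, y) if a != b]   # zero differences dropped
    order = sorted(range(len(diffs)), key=lambda i: abs(diffs[i]))
    ranks = [0.0] * len(diffs)
    i = 0
    while i < len(order):
        j = i
        # extend j over the run of tied absolute differences
        while j + 1 < len(order) and abs(diffs[order[j + 1]]) == abs(diffs[order[i]]):
            j += 1
        avg_rank = (i + j) / 2 + 1   # ranks are 1-based
        for k in range(i, j + 1):
            ranks[order[k]] = avg_rank
        i = j + 1
    w_plus = sum(r for r, d in zip(ranks, diffs) if d > 0)
    w_minus = sum(r for r, d in zip(ranks, diffs) if d < 0)
    return w_plus, w_minus, min(w_plus, w_minus)

# Hypothetical per-attribute averages (instance vs schema)
print(signed_rank_statistic([0.5, 0.75, 0.25, 1.0], [0.25, 0.25, 0.5, 0.5]))  # (8.5, 1.5, 1.5)
```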
