In [1]:
import numpy as np
import pandas as pd
import os
import time
import csv
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

In [2]:
# Directory the input CSV datasets are read from (joined with the file name
# in load_csv_dataset); relative to the current working directory.
DATA_DIR = "data_csv"
# Directory that receives the result CSV files.
RESULTS_DIR = "results_csv"
# Create the results directory up front; exist_ok makes re-running this cell harmless.
os.makedirs(RESULTS_DIR, exist_ok=True)

In [3]:
def load_csv_dataset(file_name):
    """Read ``DATA_DIR/file_name`` and split it into features and labels.

    Parameters
    ----------
    file_name : str
        Name of a CSV file located in ``DATA_DIR``. The file must contain a
        column named ``label``.

    Returns
    -------
    (features, labels)
        ``features`` holds every column except ``label`` as a NumPy array;
        ``labels`` is the ``label`` column cast to ``int``.
    """
    frame = pd.read_csv(os.path.join(DATA_DIR, file_name))
    features = frame.drop(columns=["label"]).values
    labels = frame["label"].values.astype(int)
    return features, labels

In [4]:
class LinearHypothesis:
    """Linear separator ``x -> sign(w . x + b)`` with labels in {-1, +1}."""

    def __init__(self, w, b):
        """Store the weight vector as a float array and the bias as a float."""
        self.w = np.asarray(w, dtype=float)
        self.b = float(b)

    def predict(self, X):
        """Return +1 where the score ``w . x + b`` is >= 0 and -1 elsewhere.

        A single 1-D sample is promoted to a one-row matrix, so the result
        always has one entry per row.
        """
        samples = np.atleast_2d(X)
        margins = np.dot(samples, self.w) + self.b
        return np.where(margins >= 0, 1, -1)

In [5]:
def empirical_error(hyp, X, y):
    """Fraction of samples in ``(X, y)`` that ``hyp`` misclassifies.

    Returns 0.5 when ``y`` is empty (no evidence either way); otherwise the
    mean of ``hyp.predict(X) != y`` as a Python float.
    """
    if len(y) > 0:
        mistakes = hyp.predict(X) != y
        return float(np.mean(mistakes))
    return 0.5

In [6]:
def vc_radius(n, dvc, delta):
    """Half-width of the VC confidence interval used by ``LB`` / ``UB``.

    Computes ``sqrt((dvc * ln(2 e n / max(dvc, 1)) + ln(4 / delta)) / n)``,
    with ``delta`` floored at 1e-12, the quantity under the root floored at 0,
    and the final result capped at 1.0. For ``n <= 0`` the radius is 1.0
    (nothing is known without samples).
    """
    if n <= 0:
        return 1.0
    complexity_term = dvc * np.log((2.0 * np.e * n) / max(dvc, 1.0))
    confidence_term = np.log(4.0 / max(delta, 1e-12))
    squared_radius = max((complexity_term + confidence_term) / max(n, 1.0), 0.0)
    return min(1.0, np.sqrt(squared_radius))

In [7]:
def LB(hyp, X, y, dvc, delta_k):
    """Lower confidence bound on the error of ``hyp``.

    Empirical error on ``(X, y)`` minus the VC radius for ``len(y)`` samples
    at confidence ``delta_k``, clipped below at 0.0.
    """
    observed = empirical_error(hyp, X, y)
    radius = vc_radius(len(y), dvc, delta_k)
    return max(0.0, observed - radius)

In [8]:
def UB(hyp, X, y, dvc, delta_k):
    """Upper confidence bound on the error of ``hyp``.

    Empirical error on ``(X, y)`` plus the VC radius for ``len(y)`` samples
    at confidence ``delta_k``, clipped above at 1.0.
    """
    observed = empirical_error(hyp, X, y)
    radius = vc_radius(len(y), dvc, delta_k)
    return min(1.0, observed + radius)

In [9]:
def generate_linear_hypotheses(X, y, M=500, random_state=0):
    """Build a finite pool of linear hypotheses for ``(X, y)``.

    The first ``max(1, M // 2)`` hypotheses come from L2-regularised logistic
    regressions fitted on bootstrap resamples of the standardised data and
    mapped back to the original feature scale. The remaining ``M - n_boot``
    hypotheses have standard-normal weights and bias.

    Any exception raised while fitting or converting a bootstrap model is
    swallowed and that slot is filled with a random hypothesis instead.

    Parameters
    ----------
    X : ndarray of shape (n, d)
    y : ndarray of shape (n,)
    M : int
        Target pool size.
    random_state : int
        Seed of the ``RandomState`` driving resampling and random draws.

    Returns
    -------
    list of LinearHypothesis
    """
    rng = np.random.RandomState(random_state)
    n_samples, n_features = X.shape
    scaler = StandardScaler().fit(X)
    X_std = scaler.transform(X)

    def random_hypothesis():
        # Weights are drawn before the bias, matching the RNG consumption order.
        return LinearHypothesis(rng.randn(n_features), rng.randn())

    n_boot = max(1, int(M / 2))
    pool = []
    for _ in range(n_boot):
        resample = rng.choice(n_samples, size=n_samples, replace=True)
        model = LogisticRegression(penalty='l2', C=1.0, solver='liblinear', max_iter=200)
        try:
            model.fit(X_std[resample], y[resample])
            w_std = model.coef_.ravel()
            b_std = model.intercept_.item()
            # Undo the standardisation:
            #   w . (x - mean) / scale + b == (w / scale) . x + (b - w . mean / scale)
            w_raw = w_std / scaler.scale_
            b_raw = b_std - np.dot(w_std, scaler.mean_ / scaler.scale_)
            pool.append(LinearHypothesis(w_raw, b_raw))
        except Exception:
            pool.append(random_hypothesis())

    n_random = M - n_boot
    for _ in range(n_random):
        pool.append(random_hypothesis())
    return pool

In [10]:
def empirical_disagreement_fraction(H, X_subset):
    """Fraction of points in ``X_subset`` on which the hypotheses disagree.

    A point counts as a disagreement when at least two hypotheses in ``H``
    predict different labels for it.

    Parameters
    ----------
    H : iterable of hypotheses
        Objects exposing ``predict(X)`` returning one label per sample.
    X_subset : array-like or None
        Samples to evaluate.

    Returns
    -------
    float
        Value in [0, 1]. Returns 0.0 when ``X_subset`` is ``None`` or empty,
        and also when ``H`` is empty (no hypotheses means nothing can
        disagree) instead of failing on a zero-size reduction.
    """
    if X_subset is None or len(X_subset) == 0:
        return 0.0
    hyps = list(H)
    if len(hyps) == 0:
        return 0.0
    preds = np.array([h.predict(X_subset) for h in hyps])  # shape (|H|, m)
    disagrees = preds.max(axis=0) != preds.min(axis=0)
    # Normalise by the number of predictions rather than X_subset.shape[0], so
    # plain sequences and single 1-D samples are handled correctly as well.
    return float(disagrees.sum()) / float(disagrees.shape[0])

In [11]:
def save_results_csv(filepath, dataset_name, method_name, n_labels, accuracy, total_time, learning_curve):
    """Append one result row to a CSV file, creating file and header if needed.

    Parameters
    ----------
    filepath : str
        Destination CSV. Missing parent directories are created; a bare file
        name (no directory component) is written to the current directory.
    dataset_name, method_name : str
        Identifiers stored in the first two columns.
    n_labels : int
        Number of labels queried.
    accuracy, total_time : float
        Stored as floats.
    learning_curve : iterable of (n_labels, accuracy) pairs
        Serialised into one cell as ``"n:acc;n:acc;..."``.
    """
    # os.path.dirname returns "" for a bare file name and os.makedirs("")
    # raises FileNotFoundError, so only create the directory when there is one.
    parent_dir = os.path.dirname(filepath)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    lc_str = ";".join(f"{int(n)}:{float(a)}" for n, a in learning_curve)
    # Checked before opening: opening in append mode creates the file.
    file_exists = os.path.isfile(filepath)
    with open(filepath, "a", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        if not file_exists:
            writer.writerow(["dataset", "method", "n_labels", "accuracy", "total_time", "learning_curve"])
        writer.writerow([dataset_name, method_name, n_labels, float(accuracy), float(total_time), lc_str])

In [12]:
def _a2_disagreement_mask(hyps, X):
    """Boolean mask over the rows of X: True where at least two hypotheses predict different labels."""
    if len(hyps) == 0 or X.shape[0] == 0:
        return np.zeros(X.shape[0], dtype=bool)
    preds = np.array([h.predict(X) for h in hyps])
    return preds.max(axis=0) != preds.min(axis=0)


def _a2_bounds(hyps, Sx, Sy, dvc, delta_k):
    """Per-hypothesis (UB, LB) arrays evaluated on the labeled sample (Sx, Sy)."""
    ub_vals = np.array([UB(h, Sx, Sy, dvc, delta_k) for h in hyps], dtype=float)
    lb_vals = np.array([LB(h, Sx, Sy, dvc, delta_k) for h in hyps], dtype=float)
    return ub_vals, lb_vals


def A2_algorithm1(X_pool, Y_pool, H, dvc, epsilon=0.05, delta=0.05,
                  max_outer=100, rng_seed=0, save_path=None, dataset_name="undefined",
                  max_labels=None):
    """
    Pool-based simulation of Algorithm 1 of A^2 (agnostic active learning).

    The distribution D is simulated by the empirical pool: DISAGREE_D(.) is the
    fraction of pool points on which a hypothesis set disagrees, and "sampling
    from D restricted to the disagreement region" is a uniform draw, with
    replacement, from the pool points on which H_i disagrees. Every draw counts
    as one label query, even when the same pool point is drawn again.

    Parameters
    ----------
    X_pool, Y_pool : ndarray
        Pool simulating D; labels are fully visible to the simulation but only
        the sampled ones are used for the bounds.
    H : sequence of hypotheses
        Finite approximation of the hypothesis class (objects with
        ``predict``). Must not be empty.
    dvc : int
        VC dimension used in the bounds (e.g. d + 1 for linear separators).
    epsilon, delta : float
        Target excess-error and confidence parameters. The confidence used at
        step k is ``delta / k`` (floored at 1e-12).
    max_outer : int or None
        Maximum number of outer rounds (version-space halvings). When reached,
        the current best hypothesis is returned with ``converged=False``.
        ``None`` disables the limit.
    rng_seed : int
        Seed for the sampling RNG.
    save_path : str or None
        When given, one result row is appended via ``save_results_csv``.
    dataset_name : str
        Identifier written to the result row.
    max_labels : int or None
        Optional label budget. The run stops (``converged=False``) before a
        batch that would exceed it. With ``epsilon <= 0`` the stopping rule can
        never fire, so set a budget in that case.

    Returns
    -------
    best_h, log
        ``best_h`` minimises the upper bound over the surviving hypotheses; it
        is annotated in place with ``n_labels_used`` and ``learning_curve``.
        ``log`` has keys ``n_labels``, ``learning_curve``, ``total_time`` and
        ``converged``.

    Raises
    ------
    ValueError
        If ``H`` is empty.

    Notes
    -----
    NOTE(review): the outer stopping test uses the *new* H_i together with the
    sample collected in the previous round, as in the original notebook code;
    check against the reference pseudo-code if strict fidelity is required.
    """
    X_pool = np.asarray(X_pool)
    Y_pool = np.asarray(Y_pool)
    Hi = list(H)
    if len(Hi) == 0:
        raise ValueError("H must contain at least one hypothesis")

    rng = np.random.RandomState(rng_seed)
    n_total = X_pool.shape[0]
    pool_indices = np.arange(n_total)

    # Empty slices keep the pool's dtypes for the (initially empty) labeled sample.
    Si_x = X_pool[:0]
    Si_y = Y_pool[:0]
    k = 1

    learning_curve = []
    n_labels = 0
    outer_rounds = 0
    start_time = time.time()

    def delta_at(step):
        # Confidence schedule: delta_k = delta / k.
        return max(delta / max(1, step), 1e-12)

    def pool_fraction(mask):
        # DISAGREE_D(.) simulated on the full pool; 0.0 for an empty pool.
        return float(mask.mean()) if mask.size > 0 else 0.0

    def finish(hyps, Sx, Sy, delta_k, labels_used, converged):
        # Return h = argmin_{h in hyps} UB(S, h, delta_k), log it and optionally save it.
        ub_vals = np.array([UB(h, Sx, Sy, dvc, delta_k) for h in hyps], dtype=float)
        best_h = hyps[int(np.argmin(ub_vals))]
        total_time = time.time() - start_time
        best_h.n_labels_used = labels_used
        best_h.learning_curve = learning_curve
        if save_path is not None:
            acc = float(np.mean(best_h.predict(X_pool) == Y_pool))
            save_results_csv(save_path, dataset_name, "A2_algorithm1", labels_used, acc,
                             total_time, learning_curve)
        log = {"n_labels": labels_used, "learning_curve": learning_curve,
               "total_time": total_time, "converged": converged}
        return best_h, log

    # Outer loop:
    # while DISAGREE_D(H_i) * (min UB(S, h, delta_k) - min LB(S, h, delta_k)) > epsilon
    while True:
        # Disagreement region of H_i on the full pool: it both measures
        # DISAGREE_D(H_i) and defines where this round samples from (D_i).
        region_mask = _a2_disagreement_mask(Hi, X_pool)
        disagree_Hi = pool_fraction(region_mask)

        delta_k = delta_at(k)
        ub_Hi, lb_Hi = _a2_bounds(Hi, Si_x, Si_y, dvc, delta_k)
        outer_metric = disagree_Hi * (float(ub_Hi.min()) - float(lb_Hi.min()))
        if outer_metric <= epsilon:
            return finish(Hi, Si_x, Si_y, delta_k, n_labels, True)
        if max_outer is not None and outer_rounds >= max_outer:
            # Safety valve: give up refining and return the best hypothesis so far.
            return finish(Hi, Si_x, Si_y, delta_k, n_labels, False)
        outer_rounds += 1

        # Step 1: S_i = {}, H'_i = H_i, k <- k + 1
        Si_x = X_pool[:0]
        Si_y = Y_pool[:0]
        H0_i = list(Hi)
        k += 1

        threshold_half = 0.5 * disagree_Hi
        candidate_indices = pool_indices[region_mask]

        # Inner loop: while DISAGREE_D(H'_i) >= 1/2 DISAGREE_D(H_i).
        # Both sides are measured on the same fixed pool; measuring H'_i on a
        # region that is itself restricted to H'_i's disagreement would always
        # give 1.0 and the halving test could never succeed.
        while True:
            disagree_H0i = pool_fraction(_a2_disagreement_mask(H0_i, X_pool))
            if disagree_H0i < threshold_half:
                break

            # (a)/(b): stop as soon as the bound gap, weighted by disagreement, is small enough.
            delta_k = delta_at(k)
            ub_H0i, lb_H0i = _a2_bounds(H0_i, Si_x, Si_y, dvc, delta_k)
            inner_metric = disagree_H0i * (float(ub_H0i.min()) - float(lb_H0i.min()))
            if inner_metric <= epsilon:
                return finish(H0_i, Si_x, Si_y, delta_k, n_labels, True)

            if candidate_indices.size == 0:
                # Nothing left to sample from; leave the inner loop.
                break

            # (c) i. draw 2|S_i| + 1 points from D restricted to the disagreement
            # region of H_i. A uniform draw over the region is exactly what
            # rejection sampling from the pool produces, without wasted attempts.
            batch_size = int(2 * len(Si_y) + 1)
            if max_labels is not None and n_labels + batch_size > max_labels:
                return finish(H0_i, Si_x, Si_y, delta_k, n_labels, False)
            drawn = rng.choice(candidate_indices, size=batch_size, replace=True)

            # ii. S_i <- S_i U {(x, O(x))}; k <- k + 1.
            # One concatenation per batch keeps the total copying linear in |S_i|.
            Si_x = np.vstack([Si_x, X_pool[drawn]])
            Si_y = np.concatenate([Si_y, Y_pool[drawn]])
            n_labels += batch_size
            k += 1

            # iii. H'_i = {h in H_i : LB(S_i, h, delta_k) <= min_{h'} UB(S_i, h', delta_k)}; k <- k + 1
            delta_k = delta_at(k)
            ub_all, lb_all = _a2_bounds(Hi, Si_x, Si_y, dvc, delta_k)
            keep = lb_all <= float(ub_all.min()) + 1e-12
            survivors = [h for h, kept in zip(Hi, keep) if kept]
            if len(survivors) > 0:
                H0_i = survivors
            k += 1

            # Learning curve: pool accuracy of the current UB-minimiser.
            delta_now = delta_at(k)
            ub_now = np.array([UB(h, Si_x, Si_y, dvc, delta_now) for h in H0_i], dtype=float)
            best_tmp = H0_i[int(np.argmin(ub_now))]
            acc_tmp = float(np.mean(best_tmp.predict(X_pool) == Y_pool))
            learning_curve.append((n_labels, acc_tmp))

        # 3. H_{i+1} <- H'_i; D_{i+1} (the disagreement region) is recomputed
        # from the new H_i at the top of the outer loop.
        Hi = list(H0_i)
        # loop back to outer while condition


In [14]:
# Experiment driver: run A2_algorithm1 on every listed dataset found in
# DATA_DIR and append one result row per dataset to a shared CSV file.
datasets = ["synthetique.csv", "iris.csv"]
results_csv = os.path.join(RESULTS_DIR, "A2_algorithm1_results.csv")

for dataset_file in datasets:
    # Skip datasets that are not on disk instead of failing the whole run.
    if not os.path.exists(os.path.join(DATA_DIR, dataset_file)):
        print(f"Dataset {dataset_file} missing in {DATA_DIR} – skipping.")
        continue

    # NOTE(review): LinearHypothesis.predict emits labels in {-1, +1}; presumably
    # the "label" column uses the same encoding -- confirm for each CSV, since any
    # other encoding would count those predictions as errors.
    X, y = load_csv_dataset(dataset_file)
    d = X.shape[1]
    # VC dimension passed to the bounds: d + 1.
    dvc = d + 1

    print(f"\nRunning exact A^2 algorithm on {dataset_file} (n={len(y)}, d={d})")
    # Finite hypothesis pool standing in for the hypothesis class.
    H_pool = generate_linear_hypotheses(X, y, M=200, random_state=42)

    # Wall-clock time measured around the call (the returned log carries its own total_time).
    t0 = time.time()
    best_h, log = A2_algorithm1(X, y, H_pool, dvc=dvc,
                                epsilon=0.02, delta=0.05,
                                max_outer=50, rng_seed=42,
                                save_path=results_csv, dataset_name=dataset_file.replace(".csv",""))
    t1 = time.time()
    # Accuracy of the returned hypothesis on the full pool (the same data it was selected on).
    acc = float(np.mean(best_h.predict(X) == y))
    print(f"Done: n_labels={log['n_labels']}, accuracy={acc:.4f}, time={t1-t0:.2f}s")


Running exact A^2 algorithm on synthetique.csv (n=200, d=2)


KeyboardInterrupt: 