In [None]:
import json
from io import StringIO
from pathlib import Path

import pandas as pd

# Helper Functions
These functions generate the paths to the result directories and preprocess the data:

In [None]:
# Hyperparameters per dataset. The values are interpolated into the result
# directory names, so they have to match the runs that produced the results.
PARAMS_CELEBA = dict(
    lr=0.0005,
    rec_alpha=0.0005,
    rec_alpha_pert=0.0005,
    rec_lr=0.025,
    rec_max_iter=10000,
    n_epochs=dict(dcgan=50, wgangp=200, lsgan=100, ebgan=100),
)
PARAMS_LSUN = dict(
    lr=0.0005,
    rec_alpha=0.0005,
    rec_alpha_pert=0.0005,
    rec_lr=0.00075,
    rec_max_iter=10000,
    n_epochs=dict(dcgan=10, wgangp=10, lsgan=10, ebgan=5),
)
PARAMS_COCO = dict(
    lr=0.0005,
    rec_alpha=1e-05,
    rec_alpha_pert=1e-05,
    rec_lr=0.01,
    rec_max_iter=1000,
    n_epochs=dict(stablediffusion=100),
)
PARAMS_FFHQ = dict(
    lr=0.0005,
    rec_alpha=5e-04,
    rec_alpha_pert=1e-05,
    rec_lr=0.025,
    rec_max_iter=10000,
    n_epochs=dict(stylegan2=100),
)
PARAMS_BCDR = dict(
    lr=0.0005,
    rec_alpha=0.001,
    rec_alpha_pert=0.001,
    rec_lr=0.05,
    rec_max_iter=10000,
    n_epochs=dict(medigan_dcgan=50),
)
PARAMS_WHITEWINE = dict(
    lr=0.001,
    rec_alpha=0.0001,
    rec_alpha_pert=0.0001,
    rec_lr=0.001,
    rec_max_iter=10000,
    n_epochs=dict(klwgan=100),
)
PARAMS_REDWINE = dict(
    lr=0.001,
    rec_alpha=0.0005,
    rec_alpha_pert=0.0001,
    rec_lr=0.001,
    rec_max_iter=10000,
    n_epochs=dict(klwgan=100),
)

# Dataset name -> hyperparameters.
PARAMS = dict(
    celeba=PARAMS_CELEBA,
    lsun=PARAMS_LSUN,
    coco=PARAMS_COCO,
    ffhq=PARAMS_FFHQ,
    bcdr=PARAMS_BCDR,
    whitewine=PARAMS_WHITEWINE,
    redwine=PARAMS_REDWINE,
)

# Dataset name -> attribution methods that have results for it.
# Every dataset gets its own list object.
ATTRS = {
    **{
        name: ["fingerprint", "l2_inversion", "inception_inversion", "raw", "dct", "act"]
        for name in ("celeba", "lsun", "coco")
    },
    "ffhq": ["fingerprint", "raw", "dct"],
    "bcdr": ["fingerprint", "raw", "dct", "act"],
    **{name: ["l2_inversion", "raw", "act"] for name in ("whitewine", "redwine")},
}

# Dataset name -> generative models; these are exactly the keys of the
# corresponding "n_epochs" table, in the same order.
MODELS = {name: list(config["n_epochs"]) for name, config in PARAMS.items()}

# Dataset name -> root directory of its results (the same for all datasets).
OUTDIR = {name: Path("output") for name in PARAMS}
OUTDIR_BASELINES = Path("output") / "baselines"

In [None]:
def build_path_results(data, model, attr, perturb=False, sd_checkpoint="stabilityai_stable_diffusion_2_1_base", modelnr_wine=0):
    """Return the directory that holds ``results.json`` for one experiment.

    The directory name encodes the model, its checkpoint, the attribution
    method and (for the feature-based methods) the hyperparameters taken
    from ``PARAMS``.

    Args:
        data: Dataset name; a key of ``ATTRS``, ``PARAMS`` and ``OUTDIR``.
        model: Model name; a key of ``PARAMS[data]["n_epochs"]``.
        attr: Attribution method; must be listed in ``ATTRS[data]``.
        perturb: If true, the feature-based methods ("act", "raw", "dct")
            use ``rec_alpha_pert`` instead of ``rec_alpha``.
        sd_checkpoint: Checkpoint name used for the "coco" dataset.
        modelnr_wine: Index of the wine model; only used for "whitewine"
            and "redwine".

    Returns:
        ``pathlib.Path`` of the ``results`` directory.

    Raises:
        AssertionError: If ``attr`` is not listed in ``ATTRS[data]``.
        KeyError: If ``data`` or ``model`` is unknown.
        NotImplementedError: If ``attr`` belongs to no known method family.
    """
    attrs = ATTRS[data]
    assert attr in attrs, f"attr needs to be in {attrs}."

    params = PARAMS[data]
    n_epochs = params["n_epochs"][model]
    lr = params["lr"]
    rec_lr = params["rec_lr"]
    rec_alpha = params["rec_alpha"]
    rec_max_iter = params["rec_max_iter"]

    # Checkpoint names shared by several branches below.
    gan_checkpoint = (
        f"trained_models_{model}_{data}_nz_100_niter_{n_epochs}"
        f"_model_1_checkpoints_netG_epoch_{n_epochs - 1}_pth"
    )
    bcdr_checkpoint = "trained_models_dcgan_bcdr_model_state_dict_pt"
    wine_checkpoint = f"trained_models_klwgan_hinge_{data}_{modelnr_wine}_pkl"
    is_wine = data in ["whitewine", "redwine"]
    inversion_suffix = "-lr_inv=0.1-num_steps_inv=1000-num_inits_inv=10"

    if attr in ["act", "raw", "dct"]:
        if perturb:
            rec_alpha = params["rec_alpha_pert"]
        name_n_epochs = n_epochs
        if data == "coco":
            checkpoint = sd_checkpoint
        elif data == "ffhq":
            # NOTE(review): the fingerprint baseline below uses a
            # "pretrained_models_..." prefix for ffhq -- confirm both names.
            checkpoint = "trained_models_stylegan2_stylegan2_ffhq_256x256_pkl"
        elif data == "bcdr":
            checkpoint = bcdr_checkpoint
        elif is_wine:
            checkpoint = wine_checkpoint
        else:
            # The remaining datasets always carry "n_epochs=50" in the
            # directory name; "raw" and "dct" additionally use fixed
            # reconstruction settings instead of the PARAMS values.
            checkpoint = gan_checkpoint
            name_n_epochs = 50
            if attr in ["raw", "dct"]:
                rec_lr = 0.025
                rec_max_iter = 10000
        folder = (
            f"model={model}-checkpoint={checkpoint}-feat={attr}"
            f"-n_epochs={name_n_epochs}-lr={lr}-lr_milestones=[25, 50, 100]"
            f"-rec_alpha={rec_alpha}-rec_lr={rec_lr}-rec_momentum=0.0"
            f"-rec_max_iter={rec_max_iter}"
        )
        return OUTDIR[data] / folder / "results"

    if attr == "fingerprint":
        if data == "coco":
            base, checkpoint = OUTDIR[data] / "baselines", sd_checkpoint
        elif data == "bcdr":
            base, checkpoint = OUTDIR[data] / "baselines", bcdr_checkpoint
        elif data == "ffhq":
            base = OUTDIR[data] / "baselines"
            checkpoint = "pretrained_models_stylegan2_stylegan2_ffhq_256x256_pkl"
        else:
            base, checkpoint = OUTDIR_BASELINES, gan_checkpoint
        return base / f"model={model}-checkpoint={checkpoint}-attr=fingerprint" / "results"

    if attr in ["l2_inversion", "inception_inversion"]:
        suffix = ""
        if data == "coco":
            base, checkpoint = OUTDIR[data] / "baselines", sd_checkpoint
        elif data == "bcdr":
            base, checkpoint = OUTDIR[data] / "baselines", bcdr_checkpoint
        elif is_wine:
            base, checkpoint = OUTDIR[data] / "baselines", wine_checkpoint
            suffix = inversion_suffix
        else:
            base, checkpoint = OUTDIR_BASELINES, gan_checkpoint
            suffix = inversion_suffix
        return base / f"model={model}-checkpoint={checkpoint}-attr={attr}{suffix}" / "results"

    raise NotImplementedError

In [None]:
def collect_results(data, perturb=False, n_wine_models=5):
    """Load every available ``results.json`` of a dataset into one DataFrame.

    One result directory is visited per (model, attribution method) pair;
    for the wine datasets one per pair and wine model index. Directories
    whose file is missing or unreadable are reported and skipped.

    Args:
        data: Dataset name; a key of ``MODELS`` and ``ATTRS``.
        perturb: Forwarded to ``build_path_results``.
        n_wine_models: Number of wine models to collect (indices
            ``0 .. n_wine_models - 1``); only used for the wine datasets.

    Returns:
        DataFrame with the rows of all files that could be read. The
        attribution method is appended to the "model" column
        (``"<model>_<attr>"``) and the "checkpoint" column is dropped.

    Raises:
        FileNotFoundError: If not a single result file could be loaded.
    """

    def _load(path_results, attr):
        # Each line holds one JSON object followed by a trailing ",\n".
        path = path_results / "results.json"
        with open(path, encoding="utf-8-sig", mode="r") as f:
            stripped = (line.rstrip(", \n") for line in f)
            records = [json.loads(line) for line in stripped if line]
        # Wrap in StringIO: passing a literal JSON string to read_json is deprecated.
        result = pd.read_json(StringIO(json.dumps(records)))
        result["model"] = result["model"] + "_" + attr
        return result

    is_wine = data in ["whitewine", "redwine"]
    frames = []
    for model in MODELS[data]:
        for attr in ATTRS[data]:
            if is_wine:
                paths = [
                    build_path_results(data, model, attr, perturb, modelnr_wine=model_nr)
                    for model_nr in range(n_wine_models)
                ]
            else:
                paths = [build_path_results(data, model, attr, perturb)]

            for path_results in paths:
                try:
                    frames.append(_load(path_results, attr))
                except FileNotFoundError:
                    print(f"{path_results} does not contain results.json.")
                except (OSError, ValueError, KeyError) as err:
                    # Still best effort, but do not disguise a malformed
                    # file as a missing one.
                    print(f"Could not load results from {path_results}: {err!r}")

    if not frames:
        raise FileNotFoundError(f"No results.json could be loaded for data={data!r}.")

    results = pd.concat(frames, ignore_index=True)
    return results.drop(columns=["checkpoint"])

In [None]:
def prepare_results(data, different_arch=True, perturb_type=None, perturb_params=None):
    """Average the collected results per model (and perturbation strength).

    Args:
        data: Dataset name, forwarded to ``collect_results``.
        different_arch: Without perturbation: if true, keep the rows where
            "other" and "my" end with the same character or "other" ends
            with "real"; if false, keep the rows where the last characters
            differ and "other" does not end with "real". With perturbation
            only ``True`` is implemented.
        perturb_type: Value of the "perturbation" column to select; ``None``
            selects the unperturbed rows.
        perturb_params: Values of "perturbation-param" to select; required
            when ``perturb_type`` is given.

    Returns:
        Transposed DataFrame of group means over the numeric columns. The
        columns are the "model" values or, with perturbation,
        (method, perturbation-param) pairs.

    Raises:
        AssertionError: If ``perturb_type`` is set but ``perturb_params`` is not.
        NotImplementedError: For perturbed results with ``different_arch=False``.
    """
    perturb = perturb_type is not None
    df = collect_results(data, perturb)

    if perturb:
        assert perturb_params is not None, "perturb_params were not set."
        selected = (df["perturbation"] == perturb_type) & df["perturbation-param"].isin(perturb_params)
        # select only immunized results
        immunized = df["model"].str.contains("_immun_")
        if not different_arch:
            raise NotImplementedError

        # Copy so the column assignments below do not write to a slice of df.
        df = df[selected & immunized].copy()

        # "model" ends with "_<attr>". Match against the known method names
        # (longest first) so that methods containing "_" such as
        # "l2_inversion" are not cut down to "inversion".
        known_attrs = sorted(ATTRS[data], key=len, reverse=True)

        def _method_of(name):
            for attr in known_attrs:
                if name.endswith("_" + attr):
                    return attr
            return name.split("_")[-1]

        df["descr"] = df["model"]
        df["model"] = df["descr"].map(_method_of)

        # numeric_only: the remaining string columns cannot be averaged.
        df = df.groupby(by=["model", "perturbation-param"]).mean(numeric_only=True)
        return df.transpose()

    # remove all perturbed results
    df = df[df["perturbation"].isnull()]
    df = df.drop(columns=["perturbation", "perturbation-param"])

    other_is_real = df["other"].str.endswith("real")
    if different_arch:
        # keep rows where "my" and "other" end with the same index j in {1,..,5},
        # or where "other" is "real"
        df = df[(df["other"].str[-1] == df["my"].str[-1]) | other_is_real]
    else:
        # keep rows where "other" ends with a different index than "my"
        # and "other" is not "real"
        df = df[(df["other"].str[-1] != df["my"].str[-1]) & ~other_is_real]
    df = df.drop(columns=["my"])
    df = df.groupby(by="model").mean(numeric_only=True)
    return df.transpose()

# Table 2: SMA with different architectures

In [None]:
def avg_results(data, fnr=0.005, different_arch=True):
    """Build the methods x models table of mean results, in percent.

    Args:
        data: Dataset name; a key of ``MODELS`` and ``ATTRS``.
        fnr: Selects the result row named ``f"fnr={fnr}"``.
        different_arch: Forwarded to ``prepare_results``.

    Returns:
        DataFrame indexed by attribution method with one column per model,
        multiplied by 100 and rounded to two decimals. Missing combinations
        are reported and filled with 0.
    """
    df = prepare_results(data, different_arch)
    models = MODELS[data]
    attrs = ATTRS[data]
    results_df = pd.DataFrame(columns=models, index=attrs)
    for attr in attrs:
        for model in models:
            try:
                value = df[f"{model}_{attr}"][f"fnr={fnr}"]
            except KeyError:
                print(f"No result found for model={model} and attr method {attr}.")
                value = 0
            # Single .loc write: chained assignment (frame[col][row] = v)
            # is not guaranteed to modify the frame.
            results_df.loc[attr, model] = value

    results_df = 100 * results_df
    results_df = results_df.astype(float).round(2)
    return results_df

In [None]:
# Mean results for CelebA and LSUN, side by side.
df_celeba = avg_results("celeba")
df_lsun = avg_results("lsun")
pd.concat([df_celeba, df_lsun], axis=1)

# Table 2: SMA with same architectures:

In [None]:
# Same table as above, computed with different_arch=False.
df_celeba_same, df_lsun_same = (
    avg_results(dataset, different_arch=False) for dataset in ("celeba", "lsun")
)
pd.concat((df_celeba_same, df_lsun_same), axis="columns")

# Table 3: SMA with Perturbations CelebA/LSUN 

In [None]:
def avg_results_perturb(data, perturb_type, perturb_params, fnr=0.005):
    """Build the methods x perturbation-strengths table of mean results, in percent.

    Args:
        data: Dataset name; a key of ``ATTRS``.
        perturb_type: Perturbation to select, forwarded to ``prepare_results``.
        perturb_params: Perturbation strengths; they become the columns.
        fnr: Selects the result column named ``f"fnr={fnr}"``.

    Returns:
        DataFrame indexed by attribution method with one column per entry
        of ``perturb_params``, multiplied by 100 and rounded to two
        decimals. Missing combinations are reported and filled with 0.
    """
    df = prepare_results(data, True, perturb_type, perturb_params)
    # Transpose back: rows are (method, perturbation-param), columns the metrics.
    df = df.transpose()

    attrs = ATTRS[data]
    results_df = pd.DataFrame(columns=perturb_params, index=attrs)

    for perturb_param in perturb_params:
        for attr in attrs:
            try:
                value = df[f"fnr={fnr}"][attr, perturb_param]
            except (KeyError, IndexError):
                # Report the single strength that is missing, not the whole list.
                print(f"No result found for method={attr} and perturbation {perturb_type}: {perturb_param}.")
                value = 0
            # Single .loc write: chained assignment (frame[col][row] = v)
            # is not guaranteed to modify the frame.
            results_df.loc[attr, perturb_param] = value

    results_df = 100 * results_df
    results_df = results_df.astype(float).round(2)
    return results_df

In [None]:
# Perturbation table at fnr=0.005: CelebA first, then LSUN; per dataset the
# order is blur, crop, noise, jpeg.
fnr = 0.005
(
    blur_celeba, crop_celeba, noise_celeba, jpeg_celeba,
    blur_lsun, crop_lsun, noise_lsun, jpeg_lsun,
) = [
    avg_results_perturb(dataset, kind, strengths, fnr=fnr)
    for dataset in ("celeba", "lsun")
    for kind, strengths in (
        ("blur", [1, 3]),
        ("crop", [60, 55]),
        ("noise", [0.05, 0.1]),
        ("jpeg", [90, 80]),
    )
]

pd.concat(
    (blur_celeba, crop_celeba, noise_celeba, jpeg_celeba,
     blur_lsun, crop_lsun, noise_lsun, jpeg_lsun),
    axis="columns",
)

In [None]:
# Perturbation table at fnr=0.05: CelebA first, then LSUN; per dataset the
# order is blur, crop, noise, jpeg.
fnr = 0.05
(
    blur_celeba, crop_celeba, noise_celeba, jpeg_celeba,
    blur_lsun, crop_lsun, noise_lsun, jpeg_lsun,
) = [
    avg_results_perturb(dataset, kind, strengths, fnr=fnr)
    for dataset in ("celeba", "lsun")
    for kind, strengths in (
        ("blur", [1, 3]),
        ("crop", [60, 55]),
        ("noise", [0.05, 0.1]),
        ("jpeg", [90, 80]),
    )
]

pd.concat(
    (blur_celeba, crop_celeba, noise_celeba, jpeg_celeba,
     blur_lsun, crop_lsun, noise_lsun, jpeg_lsun),
    axis="columns",
)

# Table 4: Stable Diffusion / StyleGAN / MediGAN:

In [None]:
def results_detailed(data, metric="mean", different_arch=True, perturb_type=None, perturb_params=None):
    """Aggregate the unperturbed results per (model, other) pair.

    Args:
        data: Dataset name, forwarded to ``collect_results``. For "coco" the
            "channels" column is an additional grouping key.
        metric: ``"mean"`` or ``"std"``; the aggregation applied per group.
        different_arch: Only used for "lsun" and "celeba". If true, keep the
            rows where "other" and "my" end with the same character or
            "other" ends with "real"; if false, keep the rows where the
            last characters differ and "other" does not end with "real".
        perturb_type: Must be ``None``; perturbed results are not implemented.
        perturb_params: Unused.

    Returns:
        DataFrame indexed by the grouping keys, holding the aggregated
        numeric columns.

    Raises:
        ValueError: If ``metric`` is neither "mean" nor "std".
        NotImplementedError: If ``perturb_type`` is given.
    """
    # Validate before the (slow) collection; an unknown metric used to fall
    # through and return the ungrouped frame.
    if metric not in ("mean", "std"):
        raise ValueError(f"metric needs to be 'mean' or 'std', got {metric!r}.")
    if perturb_type is not None:
        raise NotImplementedError

    df = collect_results(data, False)

    # remove all perturbed results
    df = df[df["perturbation"].isnull()]
    df = df.drop(columns=["perturbation", "perturbation-param"])

    if data in ["lsun", "celeba"]:
        other_is_real = df["other"].str.endswith("real")
        if different_arch:
            # keep rows where "my" and "other" end with the same index j in {1,..,5},
            # or where "other" is "real"
            df = df[(df["other"].str[-1] == df["my"].str[-1]) | other_is_real]
        else:
            # keep rows where "other" ends with a different index than "my"
            # and "other" is not "real"
            df = df[(df["other"].str[-1] != df["my"].str[-1]) & ~other_is_real]
    df = df.drop(columns=["my"])

    columns_to_group = ["model", "other"]
    if data == "coco":
        columns_to_group.append("channels")

    grouped = df.groupby(by=columns_to_group)
    # numeric_only: aggregate only columns that hold numbers.
    if metric == "mean":
        return grouped.mean(numeric_only=True)
    return grouped.std(numeric_only=True)

In [None]:
# Stable Diffusion (dataset "coco"): per-group means, grouped by model, other and channels.
results_detailed("coco")

In [None]:
# StyleGAN (dataset "ffhq"): per-group means, grouped by model and other.
results_detailed("ffhq")

In [None]:
# MediGAN (dataset "bcdr"): per-group means, grouped by model and other.
results_detailed("bcdr")

# Table 5: Tabular Experiments:

In [None]:
# Red wine: per-group means, grouped by model and other.
results_detailed("redwine")

# Table 7: Confusion Matrix SMA

In [None]:
def confusion_matrix(data, attr, different_arch=True, fnr=0.005, perturb_type=None, perturb_param=None):
    """Build the model-vs-model matrix of mean results for one method, in percent.

    Only rows whose "my" is ``"<model>_1"`` are used; "other" is either
    ``"real"`` or ``"<other model>_1"``.

    Args:
        data: Dataset name; a key of ``MODELS``.
        attr: Attribution method; rows whose "model" ends with it are used.
        different_arch: Must be true; the other case is not implemented.
        fnr: Selects the result column named ``f"fnr={fnr}"``.
        perturb_type: Must be ``None``; perturbed results are not implemented.
        perturb_param: Unused.

    Returns:
        DataFrame with one row per model and the columns "real" plus one per
        model, multiplied by 100. The diagonal is left empty (NaN).

    Raises:
        NotImplementedError: If ``perturb_type`` is given or
            ``different_arch`` is false.
    """
    # Reject unsupported modes before the (slow) collection of results.
    if perturb_type is not None or not different_arch:
        raise NotImplementedError()

    models = MODELS[data]
    metric = f"fnr={fnr}"

    df = collect_results(data)
    df = df[df["perturbation"].isnull()]
    df = df.drop(columns=["perturbation", "perturbation-param"])
    df = df[df["model"].str.endswith(attr)]

    df_confusion = pd.DataFrame(columns=["real"] + models, index=models)
    for model in models:
        is_mine = df["my"] == model + "_1"
        # .loc writes in one step; chained assignment (frame[col][row] = v)
        # is not guaranteed to modify the frame.
        df_confusion.loc[model, "real"] = df[is_mine & (df["other"] == "real")][metric].mean()
        for model_other in models:
            if model != model_other:
                df_confusion.loc[model, model_other] = df[
                    is_mine & (df["other"] == model_other + "_1")
                ][metric].mean()
    return df_confusion * 100


In [None]:
# Print one confusion matrix per dataset and attribution method.
for data, attr in ((name, method) for name in ["celeba", "lsun"] for method in ATTRS[name]):
    print(f"Data={data} - Attribution = {attr}")
    print(confusion_matrix(data, attr))

# Table 9: Standard deviations for small generative models

In [None]:
def prepare_results_std(data, different_arch=True, perturb_type=None, perturb_params=None):
    """Compute the standard deviation of the results per model (and perturbation strength).

    Args:
        data: Dataset name, forwarded to ``collect_results``.
        different_arch: Without perturbation: if true, keep the rows where
            "other" and "my" end with the same character or "other" ends
            with "real"; if false, keep the rows where the last characters
            differ and "other" does not end with "real". With perturbation
            only ``True`` is implemented.
        perturb_type: Value of the "perturbation" column to select; ``None``
            selects the unperturbed rows.
        perturb_params: Values of "perturbation-param" to select; required
            when ``perturb_type`` is given.

    Returns:
        Transposed DataFrame of group standard deviations over the numeric
        columns. The columns are the "model" values or, with perturbation,
        (method, perturbation-param) pairs.

    Raises:
        AssertionError: If ``perturb_type`` is set but ``perturb_params`` is not.
        NotImplementedError: For perturbed results with ``different_arch=False``.
    """
    perturb = perturb_type is not None
    df = collect_results(data, perturb)

    if perturb:
        assert perturb_params is not None, "perturb_params were not set."
        selected = (df["perturbation"] == perturb_type) & df["perturbation-param"].isin(perturb_params)
        # select only immunized results
        immunized = df["model"].str.contains("_immun_")
        if not different_arch:
            raise NotImplementedError

        # Copy so the column assignments below do not write to a slice of df.
        df = df[selected & immunized].copy()

        # "model" ends with "_<attr>". Match against the known method names
        # (longest first) so that methods containing "_" such as
        # "l2_inversion" are not cut down to "inversion".
        known_attrs = sorted(ATTRS[data], key=len, reverse=True)

        def _method_of(name):
            for attr in known_attrs:
                if name.endswith("_" + attr):
                    return attr
            return name.split("_")[-1]

        df["descr"] = df["model"]
        df["model"] = df["descr"].map(_method_of)

        # numeric_only: the remaining string columns have no standard deviation.
        df = df.groupby(by=["model", "perturbation-param"]).std(numeric_only=True)
        return df.transpose()

    # remove all perturbed results
    df = df[df["perturbation"].isnull()]
    df = df.drop(columns=["perturbation", "perturbation-param"])

    other_is_real = df["other"].str.endswith("real")
    if different_arch:
        # keep rows where "my" and "other" end with the same index j in {1,..,5},
        # or where "other" is "real"
        df = df[(df["other"].str[-1] == df["my"].str[-1]) | other_is_real]
    else:
        # keep rows where "other" ends with a different index than "my"
        # and "other" is not "real"
        df = df[(df["other"].str[-1] != df["my"].str[-1]) & ~other_is_real]

    df = df.drop(columns=["my"])
    df = df.groupby(by="model").std(numeric_only=True)
    return df.transpose()
    
def std_results(data, fnr=0.005, different_arch=True):
    """Build the methods x models table of result standard deviations, in percent.

    Args:
        data: Dataset name; a key of ``MODELS`` and ``ATTRS``.
        fnr: Selects the result row named ``f"fnr={fnr}"``.
        different_arch: Forwarded to ``prepare_results_std``.

    Returns:
        DataFrame indexed by attribution method with one column per model,
        multiplied by 100 and rounded to two decimals. Missing combinations
        are reported and filled with 0.
    """
    df = prepare_results_std(data, different_arch)
    models = MODELS[data]
    attrs = ATTRS[data]
    results_df = pd.DataFrame(columns=models, index=attrs)
    for attr in attrs:
        for model in models:
            try:
                value = df[f"{model}_{attr}"][f"fnr={fnr}"]
            except KeyError:
                print(f"No result found for model={model} and attr method {attr}.")
                value = 0
            # Single .loc write: chained assignment (frame[col][row] = v)
            # is not guaranteed to modify the frame.
            results_df.loc[attr, model] = value

    results_df = 100 * results_df
    results_df = results_df.astype(float).round(2)
    return results_df

In [None]:
# Standard deviations for CelebA and LSUN, side by side.
stds_celeba, stds_lsun = (std_results(dataset) for dataset in ("celeba", "lsun"))
stds = pd.concat((stds_celeba, stds_lsun), axis="columns")
stds

# Table 10: Standard deviations for small generative models (same architecture)

In [None]:
# Standard deviations for CelebA and LSUN with different_arch=False, side by side.
stds_celeba, stds_lsun = (
    std_results(dataset, different_arch=False) for dataset in ("celeba", "lsun")
)
stds = pd.concat((stds_celeba, stds_lsun), axis="columns")
stds

# Table 12: Standard Deviations Stable Diffusion / StyleGAN / MediGAN 

In [None]:
# 1. Stable Diffusion (dataset "coco"): per-group standard deviations, multiplied by 1000.
results_detailed("coco", metric="std")*1000

In [None]:
# 2. StyleGAN (dataset "ffhq"): per-group standard deviations, multiplied by 1000.
results_detailed("ffhq", metric="std")*1000

In [None]:
# 3. MediGAN (dataset "bcdr"): per-group standard deviations, multiplied by 100.
# NOTE(review): the Stable Diffusion and StyleGAN cells scale by 1000 -- confirm
# that the factor 100 is intended here.
results_detailed("bcdr", metric="std")*100

# Table 13: Whitewine results

In [None]:
# White wine: per-group means, grouped by model and other.
results_detailed("whitewine")

# Table 14: Wine Standard deviations

In [None]:
# Red wine: per-group standard deviations, multiplied by 1000.
std_red = results_detailed("redwine", metric="std") * 1000
std_red 

In [None]:
# White wine: per-group standard deviations, multiplied by 1000.
std_white = results_detailed("whitewine", metric="std") * 1000
std_white 