# Benchmarking Image Quality

## Imports

In [21]:
import os 
import sys
from glob import glob

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn import metrics

## Helper functions

In [22]:
def load_experiment(experiment_name):
    """Load and combine every ``*_metrics.csv`` file of one experiment.

    Parameters
    ----------
    experiment_name : str
        Directory that holds the experiment's result files. The same string
        is stored verbatim in the ``experiment_name`` column of the result.

    Returns
    -------
    pandas.DataFrame
        Rows of all matching CSV files, stacked in sorted file-name order
        (each file keeps its own row index, so index labels may repeat),
        plus an ``experiment_name`` column.

    Raises
    ------
    FileNotFoundError
        If no ``*_metrics.csv`` file exists under ``experiment_name``.
    """
    experiment_files = sorted(glob(f"{experiment_name}/*_metrics.csv"))

    # Fail loudly with the real cause. Without this guard the function went
    # on to call ``.assign`` on ``None`` and died with an AttributeError that
    # says nothing about the missing files.
    if not experiment_files:
        raise FileNotFoundError(
            f"No *_metrics.csv found for experiment_name={experiment_name}"
        )

    # Read everything first and concatenate once; concatenating inside the
    # loop re-copies all previously loaded rows on every iteration.
    df = pd.concat([pd.read_csv(path) for path in experiment_files], axis=0)

    return df.assign(experiment_name=experiment_name)


def load_experiments(experiment_names):
    """Load several experiments and stack them into one data frame.

    Parameters
    ----------
    experiment_names : iterable of str
        Experiment directories; each one is passed to ``load_experiment``.

    Returns
    -------
    pandas.DataFrame or None
        All experiments stacked in the given order with a fresh
        ``0..n-1`` row index, or ``None`` when ``experiment_names`` is empty.

    Notes
    -----
    Any exception raised by ``load_experiment`` propagates unchanged.
    """
    frames = [load_experiment(experiment_name) for experiment_name in experiment_names]

    # Keep the historical contract for "nothing to load".
    if not frames:
        return None

    # A single concat avoids re-copying earlier frames on every iteration and
    # renumbers the rows even when only one experiment is requested.
    return pd.concat(frames, axis=0, ignore_index=True)
    

## Load result files

In [24]:
# Experiments to load: every watermark type crossed with every attack, grouped
# by watermark type. Each name is "<watermark>_<attack>".
experiment_names = [
    f"{watermark}_{attack}"
    for watermark in ("TreeRing", "StegaStamp", "StableSig", "Invisible")
    for attack in (
        "ImageDistortion_gaussian_blur_r_8",
        "DiffWMAttacker",
        "VAEWMAttacker_bmshj2018-factorized",
        "Rinse4x",
        "InPaint_ReplaceBG",
    )
]

In [25]:
# Read the metrics of every configured experiment into one frame.
df_ = load_experiments(experiment_names)

# Per experiment, count the rows that have a prompt_index
# (presumably one row per successfully processed prompt).
df_.groupby("experiment_name")[["prompt_index"]].count()

Unnamed: 0_level_0,prompt_index
experiment_name,Unnamed: 1_level_1
Invisible_DiffWMAttacker,1000
Invisible_ImageDistortion_gaussian_blur_r_8,1000
Invisible_InPaint_ReplaceBG,1000
Invisible_Rinse4x,1000
Invisible_VAEWMAttacker_bmshj2018-factorized,1000
StableSig_DiffWMAttacker,1000
StableSig_ImageDistortion_gaussian_blur_r_8,1000
StableSig_InPaint_ReplaceBG,1000
StableSig_Rinse4x,1000
StableSig_VAEWMAttacker_bmshj2018-factorized,1000


## Compute additional metrics

In [26]:
# Tag every row with its watermark type: the part of the experiment name
# before the first underscore (e.g. "TreeRing_Rinse4x" -> "TreeRing").
df_ = df_.assign(
    wm_type=[name.partition("_")[0] for name in df_.experiment_name]
)

## Compute Average Watermark Removal Metrics (Table 3)

In [35]:
# Columns averaged for Table 3: MSE / SSIM / PSNR of the "w_" images
# (presumably the watermarked ones -- confirm against the metrics writer),
# each as a plain and a "no_bg" variant, plus the mask percentage.
# "experiment_name" is listed because it is the grouping key.
# The original selection also named, but left disabled, the CLIP similarity
# columns (*_sim) and the "no_w_" counterparts of every metric.
use_columns = [
    "w_mse", "w_no_bg_mse",
    "w_ssim", "w_no_bg_ssim",
    "w_psnr", "w_no_bg_psnr",
    "w_pct_mask", "experiment_name",
]

# Mean of each metric per experiment, rows ordered like `experiment_names`
# (the .loc lookup raises if one of the experiments has no rows).
df = (
    df_[use_columns]
    .groupby("experiment_name")
    .mean()
    .loc[experiment_names]
)

# Persist the table next to the notebook and report the file name.
save_as = "Table_03-average_image_quality_metrics.csv"
df.to_csv(save_as)
print(f"[+] {save_as}")

# Display the table, rounded for readability only (the CSV keeps full precision).
df.round(4)

[+] Table_03-average_image_quality_metrics.csv


Unnamed: 0_level_0,w_mse,w_no_bg_mse,w_ssim,w_no_bg_ssim,w_psnr,w_no_bg_psnr,w_pct_mask
experiment_name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
TreeRing_ImageDistortion_gaussian_blur_r_8,0.0097,0.0044,0.6018,0.8419,21.2291,25.7407,0.6121
TreeRing_DiffWMAttacker,0.0031,0.0014,0.7908,0.9203,25.9545,30.7253,0.6121
TreeRing_VAEWMAttacker_bmshj2018-factorized,0.0019,0.0008,0.7972,0.9222,28.0665,32.8905,0.6121
TreeRing_Rinse4x,0.0079,0.0034,0.6314,0.8584,21.6234,26.3478,0.6121
TreeRing_InPaint_ReplaceBG,0.0643,0.0009,0.4644,0.9491,12.8322,31.71,0.6121
StegaStamp_ImageDistortion_gaussian_blur_r_8,0.0101,0.0041,0.5667,0.8483,20.9412,26.3288,0.6562
StegaStamp_DiffWMAttacker,0.0037,0.0014,0.7496,0.9149,25.0814,30.6098,0.6562
StegaStamp_VAEWMAttacker_bmshj2018-factorized,0.0022,0.0008,0.7729,0.9226,27.4918,33.0666,0.6562
StegaStamp_Rinse4x,0.0089,0.0034,0.5885,0.8605,21.042,26.6792,0.6562
StegaStamp_InPaint_ReplaceBG,0.0687,0.0012,0.4077,0.9366,12.415,30.4105,0.6562


## Compute Average Watermark Removal Metrics (Table 3a-6a)

In [36]:
# Watermark types; the position in this list sets the table number (3..6).
wm_types = ["TreeRing", "StegaStamp", "StableSig", "Invisible"]

# Columns averaged per experiment: MSE / SSIM / PSNR of the "w_" images as a
# plain and a "no_bg" variant, plus the mask percentage. "experiment_name" is
# listed because it is the grouping key. (The original selection also named,
# but left disabled, the CLIP *_sim columns and the "no_w_" counterparts.)
use_columns = [
    "w_mse", "w_no_bg_mse",
    "w_ssim", "w_no_bg_ssim",
    "w_psnr", "w_no_bg_psnr",
    "w_pct_mask", "experiment_name",
]

# Inclusive bounds on w_pct_mask; the widest setting [0.00, 1.00] is used for
# the "a" tables.
min_pct_mask = 0.00
max_pct_mask = 1.00

# One averaged table per watermark type, keyed by the type name.
dfs = {}

for i, wm_type in enumerate(wm_types):

    # Rows of this watermark type's InPaint_ReplaceBG experiment.
    df_experiment = df_[df_.experiment_name == f"{wm_type}_InPaint_ReplaceBG"]

    # Keep the prompts whose mask percentage in that experiment lies within
    # [min_pct_mask, max_pct_mask] (both ends included).
    df_experiment_filter = df_experiment[
        df_experiment.w_pct_mask.between(min_pct_mask, max_pct_mask)
    ]
    good_prompts = list(df_experiment_filter.prompt_index.unique())

    # All experiments that belong to this watermark type.
    df_wm = df_[df_.wm_type == wm_type]

    # Restrict to the selected prompts, average every metric per experiment
    # and order the rows like `experiment_names`.
    df_wm_good = (
        df_wm.loc[df_wm.prompt_index.isin(good_prompts), use_columns]
        .groupby("experiment_name")
        .mean()
        .loc[[name for name in experiment_names if name.startswith(wm_type)]]
    )

    # NOTE(review): the "le"/"ge" labels in the file name are swapped relative
    # to the filter (w_pct_mask >= min and <= max). The name is kept as-is so
    # existing artifacts and downstream readers keep working.
    save_as = f"Table_{i+3:02d}a-average_image_quality_metrics_{wm_type}_w_pct_mask_le_{min_pct_mask:0.2f}_ge_{max_pct_mask:0.2f}.csv"
    df_wm_good.to_csv(save_as)
    print(f"[+] {save_as}")

    # Keep the table available for later cells.
    dfs[wm_type] = df_wm_good

[+] Table_03a-average_image_quality_metrics_TreeRing_w_pct_mask_le_0.00_ge_1.00.csv
[+] Table_04a-average_image_quality_metrics_StegaStamp_w_pct_mask_le_0.00_ge_1.00.csv
[+] Table_05a-average_image_quality_metrics_StableSig_w_pct_mask_le_0.00_ge_1.00.csv
[+] Table_06a-average_image_quality_metrics_Invisible_w_pct_mask_le_0.00_ge_1.00.csv


## Compute Average Watermark Removal Metrics (Table 3b-6b)

In [40]:
# Watermark types; the position in this list sets the table number (3..6).
wm_types = ["TreeRing", "StegaStamp", "StableSig", "Invisible"]

# Columns averaged per experiment: MSE / SSIM / PSNR of the "w_" images as a
# plain and a "no_bg" variant, plus the mask percentage. "experiment_name" is
# listed because it is the grouping key. (The original selection also named,
# but left disabled, the CLIP *_sim columns and the "no_w_" counterparts.)
use_columns = [
    "w_mse", "w_no_bg_mse",
    "w_ssim", "w_no_bg_ssim",
    "w_psnr", "w_no_bg_psnr",
    "w_pct_mask", "experiment_name",
]

# Inclusive bounds on w_pct_mask: the "b" tables only use prompts whose mask
# percentage lies within [0.50, 0.80].
min_pct_mask = 0.50
max_pct_mask = 0.80

# One averaged table per watermark type, keyed by the type name.
dfs = {}

for i, wm_type in enumerate(wm_types):

    # Rows of this watermark type's InPaint_ReplaceBG experiment.
    df_experiment = df_[df_.experiment_name == f"{wm_type}_InPaint_ReplaceBG"]

    # Keep the prompts whose mask percentage in that experiment lies within
    # [min_pct_mask, max_pct_mask] (both ends included).
    df_experiment_filter = df_experiment[
        df_experiment.w_pct_mask.between(min_pct_mask, max_pct_mask)
    ]
    good_prompts = list(df_experiment_filter.prompt_index.unique())

    # All experiments that belong to this watermark type.
    df_wm = df_[df_.wm_type == wm_type]

    # Restrict to the selected prompts, average every metric per experiment
    # and order the rows like `experiment_names`.
    df_wm_good = (
        df_wm.loc[df_wm.prompt_index.isin(good_prompts), use_columns]
        .groupby("experiment_name")
        .mean()
        .loc[[name for name in experiment_names if name.startswith(wm_type)]]
    )

    # NOTE(review): the "le"/"ge" labels in the file name are swapped relative
    # to the filter (w_pct_mask >= min and <= max). The name is kept as-is so
    # existing artifacts and downstream readers keep working.
    save_as = f"Table_{i+3:02d}b-average_image_quality_metrics_{wm_type}_w_pct_mask_le_{min_pct_mask:0.2f}_ge_{max_pct_mask:0.2f}.csv"
    df_wm_good.to_csv(save_as)
    print(f"[+] {save_as}")

    # Keep the table available for later cells.
    dfs[wm_type] = df_wm_good

[+] Table_03b-average_image_quality_metrics_TreeRing_w_pct_mask_le_0.50_ge_0.80.csv
[+] Table_04b-average_image_quality_metrics_StegaStamp_w_pct_mask_le_0.50_ge_0.80.csv
[+] Table_05b-average_image_quality_metrics_StableSig_w_pct_mask_le_0.50_ge_0.80.csv
[+] Table_06b-average_image_quality_metrics_Invisible_w_pct_mask_le_0.50_ge_0.80.csv
