## Load Data

In [None]:
import os
import pickle
import numpy as np
from itertools import permutations, combinations
import pandas as pd
import torch
import matplotlib.pyplot as plt
from src.mr_utils import mrs

# Number of metamorphic relations provided by src.mr_utils.
mr_num = len(mrs)

# Largest composition strength k iterated over in the analyses below.
strength_max = 5

# Datasets, in the order used by every loop, table and figure.
datasets = ['MNIST', 'Caltech256', 'VOC', 'COCO', 'UTKFace']

# Per-dataset count used as the denominator of the failure ratios.
test_cases_num = {
	'MNIST': 10000,
	'Caltech256': 3061,
	'VOC': 4952,
	'COCO': 40775,
	'UTKFace': 3287,
}

# Models evaluated for each dataset.
models = {
	'MNIST': ['AlexNet'],
	'Caltech256': ['DenseNet121'],
	'VOC': ['MSRN'],
	'COCO': ['MLD'],
	'UTKFace': ['Faceptor'],
}

# Identifiers embedded in the result file names of the original models ...
model_names = [
	'MNIST_AlexNet_9938',
	'Caltech256_DenseNet121_6838',
	'VOC_MSRN',
	'COCO_MLD',
	'UTKFace_Faceptor',
]
# ... and of their augmented counterparts.
augmented_model_names = [
	'MNIST_AlexNet_Aug_online_9938',
	'Caltech256_DenseNet121_Aug_online_7187',
	'VOC_MSRN_Aug_online',
	'COCO_MLD_Aug_online',
	'UTKFace_Faceptor_Aug_online',
]

In [None]:
def get_model_name(dataset, model, augment=None):
	"""Look up the registered result-file identifier of a model.

	Args:
		dataset: Dataset name, e.g. ``'MNIST'``.
		model: Model name, e.g. ``'AlexNet'``.
		augment: Augmentation tag (e.g. ``'online'``); any falsy value selects
			the original, non-augmented model.

	Returns:
		The first entry of ``model_names`` (or of ``augmented_model_names`` when
		``augment`` is truthy) that contains ``"<dataset>_<model>"``
		(respectively ``"<dataset>_<model>_Aug_<augment>"``) as a substring, or
		``None`` when no entry matches.
	"""
	if augment:
		needle, candidates = f"{dataset}_{model}_Aug_{augment}", augmented_model_names
	else:
		needle, candidates = f"{dataset}_{model}", model_names
	return next((name for name in candidates if needle in name), None)

def get_validity(dataset):
	"""Load the validity scores of ``dataset`` and binarise them in place.

	Reads ``results/validity/<dataset>_validity.npy`` (a pickled mapping whose
	values are indexable score sequences) and the threshold found on the second
	line of ``results/validity/<dataset>_threshold.txt``, taken from the text
	after the first ``:``.

	Returns:
		The loaded mapping, where every score has been overwritten with ``True``
		when it is less than or equal to the threshold and ``False`` otherwise.
	"""
	validity_dir = 'results/validity/'
	validity = np.load(validity_dir + dataset + '_validity.npy', allow_pickle=True).item()
	with open(validity_dir + dataset + '_threshold.txt') as f:
		threshold_line = f.readlines()[1]
	threshold = float(threshold_line.split(':')[1].strip())
	for scores in validity.values():
		# Overwrite element by element so the container type stored in the file is kept.
		for position, score in enumerate(scores):
			scores[position] = bool(score <= threshold)
	return validity

In [None]:
# Per-model results, keyed by the entries of model_names:
#   failures_all[model][cmr]     -> failing source indices of that MR/CMR
#   faults_all[model][cmr]       -> dict {source index: fault type}
#   total_fault_num_all[model]   -> number of distinct fault types over all MRs/CMRs
# validity_all is keyed by dataset name (the text before the first '_').
failures_all, faults_all, total_fault_num_all = {}, {}, {}
validity_all = {}
for model_name in model_names:
	# Best effort: a model whose result files are missing is reported and skipped.
	try:
		print(model_name, end=' ')
		# NOTE: pickle.load can execute arbitrary code; only load files produced by this project.
		with open(f'results/errors/failure_{model_name}.pkl', 'rb') as f:
			failure = pickle.load(f)
		failures_all[model_name] = failure
		with open(f'results/errors/fault_{model_name}.pkl', 'rb') as f:
			faults = pickle.load(f)
		faults_all[model_name] = faults
		# Count distinct fault TYPES, i.e. the dict values. Iterating the dicts
		# themselves (set(f)) yields their keys, the failing source indices, which
		# is not the unit of the fault-type counts this total normalises.
		# set().union(...) also copes with an empty result file.
		total_fault_num_all[model_name] = len(set().union(*(f.values() for f in faults.values())))
		dataset_name = model_name.split('_')[0]
		if dataset_name not in validity_all:
			validity = get_validity(dataset_name)
			validity_all[dataset_name] = validity
		print(f"done, {len(failure)=}, {len(faults)=}, {total_fault_num_all[model_name]=}.")
	except Exception as e:
		print(f"failed: {e}")

## RQ1 Validity

How valid are the follow-up test inputs generated by CMRs?

In [None]:
# One box plot per dataset: percentage of follow-up inputs flagged valid,
# grouped by composition strength k = 1..strength_max.
for dataset in datasets:
	result_selfOracle = validity_all[dataset]

	data = [[] for _ in range(strength_max)]
	for cmr, flags in result_selfOracle.items():
		strength = len(cmr)
		if strength > strength_max:
			continue
		valid_mask = np.array(flags) == True
		# Percentage relative to the number of entries recorded for the single MR (0,).
		valid_percent = len(np.where(valid_mask)[0]) / len(result_selfOracle[(0,)]) * 100
		data[strength - 1].append(valid_percent)

	fig, ax = plt.subplots(figsize=(3, 3))
	ax.boxplot(data, patch_artist=False)

	ax.set_xlabel('Composition strength ($k$)')
	ax.set_ylabel('Proportion of valid images')

	# Every dataset except MNIST is drawn on a fixed 96-100 axis.
	if dataset != 'MNIST':
		ax.set_ylim(96, 100.1)
		ax.set_yticks([96, 97, 98, 99, 100])

	fig.tight_layout()
	fig.savefig(f'figures/RQ1/{dataset}_validity.pdf', dpi=600)
	plt.close(fig)

In [None]:
import pandas as pd
import os

human_validation_df = pd.read_csv("results/validity/human_validation.csv")

# Both tables are keyed first by the self-oracle verdict, then by dataset.
#   selected_check_num:     number of entries in data/human_validation/<dataset>/<k>/<verdict>,
#                           summed over k = 1..strength_max
#   human_selected_invalid: number of CSV rows of that dataset whose self_oracle_label
#                           is 1 (verdict 'valid') or 0 (verdict 'invalid')
selected_check_num = {verdict: {} for verdict in ("valid", "invalid")}
human_selected_invalid = {verdict: {} for verdict in ("valid", "invalid")}

for dataset in datasets:
    in_dataset = human_validation_df['dataset'] == dataset
    for val in ['valid', 'invalid']:
        sample_dirs = (
            os.path.join("data", "human_validation", dataset, str(strength), val)
            for strength in range(1, strength_max + 1)
        )
        selected_check_num[val][dataset] = sum(len(os.listdir(dir_path)) for dir_path in sample_dirs)

        expected_label = 1 if val == 'valid' else 0
        has_label = human_validation_df['self_oracle_label'] == expected_label
        human_selected_invalid[val][dataset] = len(human_validation_df[in_dataset & has_label])

print(selected_check_num)
print(human_selected_invalid)

In [None]:

def get_tf_pn(dataset):
    """Derive the confusion counts of the self-oracle for ``dataset``.

    Uses the module-level ``selected_check_num`` (checked samples per verdict)
    and ``human_selected_invalid`` (CSV rows per verdict).

    Returns:
        ``(true_positive, true_negative, false_positive, false_negative)``.
    """
    checked_valid = selected_check_num["valid"][dataset]
    checked_invalid = selected_check_num["invalid"][dataset]
    rows_valid = human_selected_invalid["valid"][dataset]
    rows_invalid = human_selected_invalid["invalid"][dataset]
    return (
        checked_valid - rows_valid,      # true positive
        rows_invalid,                    # true negative
        rows_valid,                      # false positive
        checked_invalid - rows_invalid,  # false negative
    )
# Emit the human-validation summary as LaTeX table rows.
print("Dataset & # Valid & # Invalid & # False Positive & # False Negative \\\\")
for dataset in datasets:
    tp, tn, fp, fn = get_tf_pn(dataset)
    row_cells = [
        dataset,
        selected_check_num['valid'][dataset],
        selected_check_num['invalid'][dataset],
        fp,
        fn,
    ]
    print(" & ".join(str(cell) for cell in row_cells) + " \\\\")

## RQ2 Overall Test Effectiveness

How effective are CMRs in DNN testing compared to their respective component MRs?

In [None]:
# Compare the total failure ratio and the total number of faults
#
# For every CMR (ordered composition of 2..strength_max distinct MRs) the cell
# accumulates the difference between the CMR and its component MRs, in failure
# ratio (dfr_*) and in fault-type ratio (dft_*), and prints the averages over
# all CMRs as LaTeX table cells.

for dataset in datasets:
	for model in models[dataset]:
		print(dataset, model)
		model_name = get_model_name(dataset, model)
		model_failures = failures_all[model_name]
		model_faults = faults_all[model_name]
		model_total_fault_num = total_fault_num_all[model_name]
		selected_source_num = test_cases_num[dataset]
		selected_cmr_num = 0
		failure_CMR_MAX, failure_CMR_MEAN, failure_CMR_ALL = 0, 0, 0
		fault_CMR_MAX, fault_CMR_MEAN, fault_CMR_ALL = 0, 0, 0

		fault_cmr, fault_max, fault_union = 0, 0, 0
		fault1, fault2, fault3, fault4, fault5 = 0, 0, 0, 0, 0

		# Single-MR statistics do not depend on the CMR: compute them once per
		# model instead of once per occurrence in a permutation.
		mr_failure_num = {mr: len(model_failures[(mr,)]) for mr in range(mr_num)}
		mr_fault_types = {mr: set(model_faults[(mr,)].values()) for mr in range(mr_num)}

		for i in range(2, strength_max+1):
			for cmr in permutations(range(mr_num), i):
				selected_cmr_num += 1
				failure_com, fault_com = [], []
				fault_com_union = set()
				# Fault types of the component with the most fault types
				# (the earliest component wins a tie).
				fault_com_max = set()
				for com in cmr:
					com_fault_types = mr_fault_types[com]
					failure_com.append(mr_failure_num[com])
					fault_com.append(len(com_fault_types))
					fault_com_union |= com_fault_types
					if len(com_fault_types) > len(fault_com_max):
						fault_com_max = com_fault_types
				failure_com_count = sum(failure_com)

				cmr_failure_num = len(model_failures[cmr])
				cmr_fault_types = set(model_faults[cmr].values())
				cmr_fault_num = len(cmr_fault_types)

				failure_CMR_MAX += (cmr_failure_num - max(failure_com)) / selected_source_num
				failure_CMR_MEAN += (cmr_failure_num - failure_com_count / i) / selected_source_num
				# NOTE(review): this is the same expression as failure_CMR_MEAN, so
				# dfr_all always equals dfr_mean. The fault counterpart below uses the
				# size of the component union instead; confirm which one is intended.
				failure_CMR_ALL += (cmr_failure_num - failure_com_count / i) / selected_source_num

				fault_CMR_MAX += (cmr_fault_num - max(fault_com)) / model_total_fault_num
				fault_CMR_MEAN += (cmr_fault_num - sum(fault_com) / i) / model_total_fault_num
				fault_CMR_ALL += (cmr_fault_num - len(fault_com_union) / i) / model_total_fault_num

				fault_cmr += cmr_fault_num
				fault_max += max(fault_com)
				fault_union += len(fault_com_union)

				# Partition of the fault types seen by the CMR and/or its components:
				fault1 += len(cmr_fault_types - fault_com_union)                       # CMR only
				fault2 += len((cmr_fault_types & fault_com_union) - fault_com_max)     # CMR and a non-best component
				fault3 += len(cmr_fault_types & fault_com_max)                         # CMR and the best component
				fault4 += len(fault_com_max - cmr_fault_types)                         # best component, missed by the CMR
				fault5 += len(fault_com_union - (cmr_fault_types | fault_com_max))     # other components only

		dfr_max = failure_CMR_MAX / selected_cmr_num
		dfr_mean = failure_CMR_MEAN / selected_cmr_num
		dfr_all = failure_CMR_ALL / selected_cmr_num
		dft_max = fault_CMR_MAX / selected_cmr_num
		dft_mean = fault_CMR_MEAN / selected_cmr_num
		dft_all = fault_CMR_ALL / selected_cmr_num
		fault_cmr, fault_max, fault_union = fault_cmr/selected_cmr_num, fault_max/selected_cmr_num, fault_union/selected_cmr_num
		fault1, fault2, fault3, fault4, fault5 = fault1/selected_cmr_num, fault2/selected_cmr_num, fault3/selected_cmr_num, fault4/selected_cmr_num, fault5/selected_cmr_num
		# '\\%' prints the same LaTeX '\%' as before without relying on an invalid escape sequence.
		print(f" & {dfr_max*100:.2f}\\% & {dfr_mean*100:.2f}\\% & {dfr_all*100:.2f}\\% & {dft_max*100:.2f}\\% & {dft_mean*100:.2f}\\% & {dft_all*100:.2f}\\% \\\\")
		print(f"{total_fault_num_all[model_name]} {fault_cmr:.1f} {fault_max:.1f} {fault_union:.1f}")
		print(f"{fault1:.0f} {fault2:.0f} {fault3:.0f} {fault4:.0f} {fault5:.0f}")

	print('-------------------------------------------------------------------------------')


## RQ3 Influence of CMR Creation Methods

How do the different ways of creating CMRs influence their test effectiveness in DNN testing?

### RQ3.1

How does the number of component MRs influence the test effectiveness of CMRs?

In [None]:
# draw box plot of Failure
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

def compare_with_com_failure_4boxplot(dataset, model):
	"""Collect, per composition strength, how each CMR's failure ratio compares to its components.

	Failure ratios are failure counts divided by ``test_cases_num[dataset]``.

	Returns:
		``(result_delta_max, result_delta_mean)``: two lists with one entry per
		strength k = 2..strength_max. Each entry lists, for every ordered
		composition of k distinct MRs, the CMR failure ratio minus the largest
		(respectively the average) failure ratio of its component MRs.
	"""
	model_failures = failures_all[get_model_name(dataset, model)]
	source_num = test_cases_num[dataset]
	result_delta_max, result_delta_mean = [], []
	for strength in range(2, strength_max + 1):
		delta_max, delta_mean = [], []
		for cmr in permutations(range(mr_num), strength):
			cmr_failure_ratio = len(model_failures[cmr]) / source_num
			com_failure_ratio = [len(model_failures[(mr,)]) / source_num for mr in cmr]
			delta_max.append(cmr_failure_ratio - max(com_failure_ratio))
			delta_mean.append(cmr_failure_ratio - sum(com_failure_ratio) / len(com_failure_ratio))
		result_delta_max.append(delta_max)
		result_delta_mean.append(delta_mean)
	return result_delta_max, result_delta_mean

def draw_boxplot_failure(dataset, model):
	"""Save the grouped box plot of failure-ratio differences for one model.

	One pair of boxes (best / mean) is drawn per composition strength
	k = 2..strength_max; the figure is written to
	``figures/RQ3.1/failure_<dataset>_<model>_box_plot.pdf``.
	"""
	delta_max, delta_mean = compare_with_com_failure_4boxplot(dataset, model)
	labelled_series = (
		(r"$\mathit{DFR}_{\mathit{best}}$", delta_max),
		(r"$\mathit{DFR}_{\mathit{mean}}$", delta_mean),
	)

	# Long-format table: one row per (strength, group, value).
	records = [
		{'X': strength, 'Group': label, 'Value': value}
		for position, strength in enumerate(range(2, strength_max + 1))
		for label, series in labelled_series
		for value in series[position]
	]
	df = pd.DataFrame(records)

	fig, ax = plt.subplots(figsize=(4, 4))
	sns.boxplot(
		x='X', y='Value', hue='Group', data=df, palette='Set2', width=0.6,
		flierprops=dict(marker='o', markersize=3, linestyle='none'), ax=ax,
	)
	ax.set_xlabel('$k$')
	ax.set_ylabel('Difference in failure ratio')
	ax.legend()
	fig.tight_layout()
	fig.savefig(f"figures/RQ3.1/failure_{dataset}_{model}_box_plot.pdf", dpi=300, bbox_inches='tight')
	plt.close(fig)

# Render and save the failure box plot of every (dataset, model) pair.
for dataset in datasets:
	for model in models[dataset]:
		draw_boxplot_failure(dataset, model)

In [None]:
# draw box plot of Fault
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

def compare_with_com_fault_4boxplot(dataset, model):
	"""Collect, per composition strength, how each CMR's fault-type ratio compares to its components.

	A fault-type ratio is the number of distinct fault types (values of the
	per-index fault dict) divided by ``total_fault_num_all`` of the model.

	Returns:
		``(result_delta_max, result_delta_mean)``: two lists with one entry per
		strength k = 2..strength_max. Each entry lists, for every ordered
		composition of k distinct MRs, the CMR ratio minus the largest
		(respectively the average) ratio of its component MRs.
	"""
	model_name = get_model_name(dataset, model)
	model_faults = faults_all[model_name]
	total_fault_num = total_fault_num_all[model_name]

	result_delta_max, result_delta_mean = [], []
	for strength in range(2, strength_max + 1):
		delta_max, delta_mean = [], []
		for cmr in permutations(range(mr_num), strength):
			cmr_fault_ratio = len(set(model_faults[cmr].values())) / total_fault_num
			# Number of distinct fault types of each component MR.
			com_fault = [len(set(model_faults[(mr,)].values())) for mr in cmr]
			delta_max.append(cmr_fault_ratio - max(com_fault) / total_fault_num)
			delta_mean.append(cmr_fault_ratio - sum(com_fault) / len(com_fault) / total_fault_num)
		result_delta_max.append(delta_max)
		result_delta_mean.append(delta_mean)
	return result_delta_max, result_delta_mean

def draw_boxplot_fault(dataset, model):
	"""Save the grouped box plot of fault-type differences for one model.

	One pair of boxes (best / mean) is drawn per composition strength
	k = 2..strength_max; the figure is written to
	``figures/RQ3.1/fault_<dataset>_<model>_box_plot.pdf``.
	"""
	delta_max, delta_mean = compare_with_com_fault_4boxplot(dataset, model)
	labelled_series = (
		(r"$\mathit{DFT}_{\mathit{best}}$", delta_max),
		(r"$\mathit{DFT}_{\mathit{mean}}$", delta_mean),
	)

	# Long-format table: one row per (strength, group, value).
	records = [
		{'X': strength, 'Group': label, 'Value': value}
		for position, strength in enumerate(range(2, strength_max + 1))
		for label, series in labelled_series
		for value in series[position]
	]
	df = pd.DataFrame(records)

	fig, ax = plt.subplots(figsize=(4, 4))
	sns.boxplot(
		x='X', y='Value', hue='Group', data=df, palette='Set2', width=0.6,
		flierprops=dict(marker='o', markersize=3, linestyle='none'), ax=ax,
	)
	ax.set_xlabel('$k$')
	ax.set_ylabel('Difference in size of fault types')
	ax.legend()
	fig.tight_layout()
	fig.savefig(f"figures/RQ3.1/fault_{dataset}_{model}_box_plot.pdf", dpi=300, bbox_inches='tight')
	plt.close(fig)

# Render and save the fault box plot of every (dataset, model) pair.
for dataset in datasets:
	for model in models[dataset]:
		draw_boxplot_fault(dataset, model)

### RQ3.2 Component

How do the different choices of component MRs influence the test effectiveness of CMRs?

In [None]:
# check delta Effectiveness w.r.t. Failure
from sklearn.decomposition import PCA
from scipy.stats import spearmanr
import re

pca_n_components = 8

# For every dataset: project source and follow-up features with one PCA, then, per
# composition strength, correlate (Spearman) the change in feature-space distance
# with the change in failure ratio of a CMR relative to its strongest component.
for dataset in datasets:
	validity_followup = validity_all[dataset]
	if dataset == 'MNIST':
		extractor = 'lenet5'
	elif dataset == 'UTKFace':
		extractor = 'insightface'
	else:
		extractor = 'vgg16'
	# map_location='cpu': the tensors are converted to NumPy right below, and this
	# lets feature files saved from a GPU be read on a CPU-only machine.
	save_path = os.path.join('results', 'features', extractor, f'{dataset}.pt')
	source__features = torch.load(save_path, map_location='cpu')
	followup__features_followup = []
	for i in range(mr_num):
		save_path = os.path.join('results', 'features', extractor, f'{dataset}_{i}.pt')
		followup__features_followup.append(torch.load(save_path, map_location='cpu'))
	all_features = [source__features] + followup__features_followup
	all_features = [f.cpu().numpy() for f in all_features]
	all_features = np.concatenate(all_features, axis=0)
	# Fixed seed so that a randomized SVD solver, if selected, gives a repeatable projection.
	pca = PCA(n_components=pca_n_components, random_state=0)
	reduced_features = pca.fit_transform(all_features)
	source_length = len(source__features)
	source_features = reduced_features[:source_length]
	followup_features = reduced_features[source_length:]
	# One block of source_length rows per MR (was hard-coded to 7 blocks).
	followup_separated = [followup_features[i * source_length:(i + 1) * source_length] for i in range(mr_num)]

	for model in models[dataset]:
		print(dataset, model)
		model_name = get_model_name(dataset, model)
		model_failures = failures_all[model_name]
		# Failure count of each single MR divided by its number of valid follow-ups.
		mr_failure_rate = {
			mr: len(model_failures[(mr,)]) / sum(validity_followup[(mr,)])
			for mr in range(mr_num)
		}
		for i in range(2, strength_max+1):
			deltadis, deltaeff = [], []
			for com in combinations(range(mr_num), i):
				# Strongest component of the combination; max() returns the earliest
				# component among ties.
				select_com = max(com, key=mr_failure_rate.__getitem__)
				select_validity = validity_followup[(select_com,)]
				select_features = followup_separated[select_com]
				# Set copy: O(1) membership tests inside the per-index loop.
				select_failures = set(model_failures[(select_com,)])
				delta_d = []
				delta_e = []
				for cmr in permutations(com):
					cmr_validity = validity_followup[cmr]
					cmr_failures = set(model_failures[cmr])
					d, f, f_c, v_p_num = 0, 0, 0, 0
					for index in range(source_length):
						# Only sources whose follow-ups are valid under both the CMR
						# and the selected component are compared.
						if cmr_validity[index] and select_validity[index]:
							v_p_num += 1
							delta_com = []
							for c in cmr:
								delta_com.append(followup_separated[c][index] - source_features[index])
							combined = sum(delta_com)
							delta = select_features[index] - source_features[index]
							d += np.linalg.norm(combined) - np.linalg.norm(delta)

							if index in select_failures:
								f_c += 1
							if index in cmr_failures:
								f += 1
					delta_d.append(d / v_p_num)
					delta_e.append((f - f_c) / v_p_num)
				# Average over all orderings of the same combination.
				deltadis.append(sum(delta_d) / len(delta_d))
				deltaeff.append(sum(delta_e) / len(delta_e))
			correlation, p_value = spearmanr(deltadis, deltaeff)
			# Strip leading zeros from the exponent, e.g. 1.2e-03 -> 1.2e-3.
			p_value_str = re.sub(r'e([+-])0+(\d+)', r'e\1\2', f'{p_value:.1e}')
			print(f"& {i} & {correlation:.2f} & {p_value_str}")
			plt.figure(figsize=(4, 3.5))
			plt.scatter(deltadis, deltaeff,  color="#002EA6", s=15)
			plt.xlabel(r"$\Delta_M$")
			plt.ylabel(r"$\mathit{DFR}_{\mathit{best}}$")

			plt.tight_layout()
			plt.savefig(f"figures/RQ3.2/failure_{dataset}_{model}_{i}_scatter_plot.pdf", dpi=300, bbox_inches='tight')
			# plt.show()
			plt.close()

In [None]:
# check delta Effectiveness w.r.t. Fault
from sklearn.decomposition import PCA
from scipy.stats import spearmanr
import re

# For every dataset: project source and follow-up features with one PCA, then, per
# composition strength, correlate (Spearman) the change in feature-space distance
# with the change in the number of fault types of a CMR relative to its component
# with the most fault types. Relies on pca_n_components from the failure cell.
for dataset in datasets:
	validity_followup = validity_all[dataset]
	if dataset == 'MNIST':
		extractor = 'lenet5'
	elif dataset == 'UTKFace':
		extractor = 'insightface'
	else:
		extractor = 'vgg16'
	# map_location='cpu': the tensors are converted to NumPy right below, and this
	# lets feature files saved from a GPU be read on a CPU-only machine.
	save_path = os.path.join('results', 'features', extractor, f'{dataset}.pt')
	source__features = torch.load(save_path, map_location='cpu')
	followup__features_followup = []
	for i in range(mr_num):
		save_path = os.path.join('results', 'features', extractor, f'{dataset}_{i}.pt')
		followup__features_followup.append(torch.load(save_path, map_location='cpu'))
	all_features = [source__features] + followup__features_followup
	all_features = [f.cpu().numpy() for f in all_features]
	all_features = np.concatenate(all_features, axis=0)
	# Fixed seed so that a randomized SVD solver, if selected, gives a repeatable projection.
	pca = PCA(n_components=pca_n_components, random_state=0)
	reduced_features = pca.fit_transform(all_features)
	source_length = len(source__features)
	source_features = reduced_features[:source_length]
	followup_features = reduced_features[source_length:]
	# One block of source_length rows per MR (was hard-coded to 7 blocks).
	followup_separated = [followup_features[i * source_length:(i + 1) * source_length] for i in range(mr_num)]

	for model in models[dataset]:
		print(dataset, model)
		model_name = get_model_name(dataset, model)
		model_failures = failures_all[model_name]
		model_faults = faults_all[model_name]
		# Number of distinct fault types of each single MR.
		mr_fault_num = {mr: len(set(model_faults[(mr,)].values())) for mr in range(mr_num)}
		for i in range(2, strength_max+1):
			deltadis, deltaeff = [], []
			for com in combinations(range(mr_num), i):
				# Component with the most fault types; max() returns the earliest
				# component among ties.
				select_com = max(com, key=mr_fault_num.__getitem__)
				select_validity = validity_followup[(select_com,)]
				select_features = followup_separated[select_com]
				select_faults = model_faults[(select_com,)]
				# Set copy: O(1) membership tests inside the per-index loop.
				select_failures = set(model_failures[(select_com,)])

				delta_d = []
				delta_e = []
				for cmr in permutations(com):
					cmr_validity = validity_followup[cmr]
					cmr_faults = model_faults[cmr]
					cmr_failures = set(model_failures[cmr])
					d, f, f_c, v_p_num = 0, set(), set(), 0
					for index in range(source_length):
						# Only sources whose follow-ups are valid under both the CMR
						# and the selected component are compared.
						if cmr_validity[index] and select_validity[index]:
							v_p_num += 1

							delta_com = []
							for c in cmr:
								delta_com.append(followup_separated[c][index] - source_features[index])
							combined = sum(delta_com)
							delta = select_features[index] - source_features[index]
							d += np.linalg.norm(combined) - np.linalg.norm(delta)

							if index in select_failures:
								f_c.add(select_faults[index])
							if index in cmr_failures:
								f.add(cmr_faults[index])
					delta_d.append(d / v_p_num)
					delta_e.append((len(f) - len(f_c)))
				# Average over all orderings of the same combination.
				deltadis.append(sum(delta_d) / len(delta_d))
				deltaeff.append(sum(delta_e) / len(delta_e) / total_fault_num_all[model_name])
			correlation, p_value = spearmanr(deltadis, deltaeff)
			# Strip leading zeros from the exponent, e.g. 1.2e-03 -> 1.2e-3, and print
			# that compact form (the raw p_value was printed before, leaving it unused).
			p_value_str = re.sub(r'e([+-])0+(\d+)', r'e\1\2', f'{p_value:.1e}')
			print(f"& {i} & {correlation:.2f} & {p_value_str}")
			plt.figure(figsize=(4, 3.5))
			plt.scatter(deltadis, deltaeff,  color="#002EA6", s=15)
			plt.xlabel(r"$\Delta_M$")
			plt.ylabel(r"$\mathit{DFT}_{\mathit{best}}$")

			plt.tight_layout()
			plt.savefig(f"figures/RQ3.2/fault_{dataset}_{model}_{i}_scatter_plot.pdf", dpi=300, bbox_inches='tight')
			# plt.show()
			plt.close()

### RQ3.3 Sequence

How do the different composition sequences influence the test effectiveness of CMRs?

In [None]:
def sequence_effect(dataset, model):
	"""Print, per composition strength, how much the composition order changes effectiveness.

	For every unordered combination of k MRs (k = 2..strength_max) the four
	deltas of each ordering are computed: failure ratio and fault-type ratio of
	the CMR, each minus the best and minus the average single-MR value of the
	combination. Their mean and (population) standard deviation over the
	orderings are then averaged over all combinations and printed as one LaTeX
	table row per k, after a header line with the model name.
	"""
	model_name = get_model_name(dataset, model)
	print(model_name)
	source_num = test_cases_num[dataset]
	model_failures, model_faults = failures_all[model_name], faults_all[model_name]
	total_fault_num = total_fault_num_all[model_name]

	def failure_ratio(key):
		return len(model_failures[key]) / source_num

	def fault_ratio(key):
		return len(set(model_faults[key].values())) / total_fault_num

	# Column order of the printed row.
	metric_names = ('dfr_max', 'dfr_mean', 'dft_max', 'dft_mean')

	for k in range(2, strength_max + 1):
		per_com_mean = {name: [] for name in metric_names}
		per_com_std = {name: [] for name in metric_names}
		for com in combinations(range(mr_num), k):
			fr_com = [failure_ratio((mr,)) for mr in com]
			ft_com = [fault_ratio((mr,)) for mr in com]
			# Baselines of the combination: best and average single-MR value.
			fr_max, fr_mean = max(0, *fr_com), sum(fr_com) / len(com)
			ft_max, ft_mean = max(0, *ft_com), sum(ft_com) / len(com)

			deltas = {name: [] for name in metric_names}
			for per in permutations(com):
				fr, ft = failure_ratio(per), fault_ratio(per)
				deltas['dfr_max'].append(fr - fr_max)
				deltas['dfr_mean'].append(fr - fr_mean)
				deltas['dft_max'].append(ft - ft_max)
				deltas['dft_mean'].append(ft - ft_mean)

			for name in metric_names:
				per_com_mean[name].append(np.mean(deltas[name]))
				per_com_std[name].append(np.std(deltas[name]))

		cells = []
		for name in metric_names:
			cells.append(f"{np.mean(per_com_mean[name])*100:.2f}\\%")
			cells.append(f"{np.mean(per_com_std[name])*100:.2f}\\%")
		print(f"& {k} & " + " & ".join(cells) + " \\\\")

def compare_sequence():
	"""Run ``sequence_effect`` for every (dataset, model) pair, in ``datasets`` order."""
	for dataset, model in ((d, m) for d in datasets for m in models[d]):
		sequence_effect(dataset, model)

In [None]:
# Print the sequence-effect table rows for every dataset/model pair.
compare_sequence()

## RQ4 Data Augmentation

In [None]:
# Load selected CMRs and selected image indices
# selected_cmr_num / selected_source_num only pick the sample files by name;
# selected_source_num == 0 means "use every test case of the dataset".
selected_cmr_num = 50
selected_source_num = 1000
selected_cmrs_all = {}      # keyed by model name
selected_indices_all = {}   # keyed by dataset name
for model_name in model_names:
	dataset_key = model_name.split('_')[0]
	cmr_path = f'results/samples/{model_name}_cmr{selected_cmr_num}.pkl'
	indices_path = f'results/samples/{dataset_key}_{selected_source_num}.pkl'
	# Best effort: a model with missing sample files is reported and skipped.
	try:
		print(model_name, end=' ')
		with open(cmr_path, 'rb') as f:
			selected_cmrs = pickle.load(f)
		selected_cmrs_all[model_name] = selected_cmrs
		if selected_source_num != 0:
			with open(indices_path, 'rb') as f:
				selected_indices = pickle.load(f)
		else:
			selected_indices = list(range(test_cases_num[dataset_key]))
		selected_indices_all[dataset_key] = selected_indices
		print(f"done, {len(selected_indices)=}, {selected_cmrs.keys()=}.")
	except Exception as e:
		print(f"failed: {e}")

In [None]:
# Load failure and fault for selected image indices for original models
selected_failures_all, selected_faults_all = {}, {}
for model_name in model_names:
	failure_path = f'results/errors/failure_{model_name}_{selected_source_num}.pkl'
	fault_path = f'results/errors/fault_{model_name}_{selected_source_num}.pkl'
	# Best effort: a model with missing result files is reported and skipped.
	try:
		print(model_name, end=' ')
		with open(failure_path, 'rb') as f:
			selected_failures_all[model_name] = failure = pickle.load(f)
		with open(fault_path, 'rb') as f:
			selected_faults_all[model_name] = faults = pickle.load(f)
		print(f"done, {len(failure)=}, {len(faults)=}.")
	except Exception as e:
		print(f"failed: {e}")

In [None]:
# Load augmented model results
aug_failures_all, aug_faults_all = {}, {}
for model_name in augmented_model_names:
	failure_path = f'results/errors/failure_{model_name}_{selected_source_num}.pkl'
	fault_path = f'results/errors/fault_{model_name}_{selected_source_num}.pkl'
	# Best effort: a model with missing result files is reported and skipped.
	try:
		print(model_name, end=' ')
		with open(failure_path, 'rb') as f:
			aug_failures_all[model_name] = failure = pickle.load(f)
		with open(fault_path, 'rb') as f:
			aug_faults_all[model_name] = faults = pickle.load(f)
		print(f"done, {len(failure)=}, {len(faults)=}.")
	except Exception as e:
		print(f"failed: {e}")

In [None]:
# Compare the total failure ratio and the total number of faults
def compare_selected_cmrs(model_name, cmrs, total_fault_num):
	dataset = model_name.split('_')[0]
	source_num = len(selected_indices_all[dataset])
	cmr_num = 0
	failure_CMR, failure_MR_MAX, failure_MR_MEAN = 0, 0, 0
	fault_CMR, fault_MR_MAX, fault_MR_MEAN = 0, 0, 0

	if 'Aug' in model_name:
		failures, faults = aug_failures_all[model_name], aug_faults_all[model_name]
	else:
		failures, faults = selected_failures_all[model_name], selected_faults_all[model_name]

	for cmr in cmrs:
		i = len(cmr)
		cmr_num += 1
		failure_com, fault_com = [], []
		fault_com_union = set()
		for com in cmr:
			failure_com.append(len(failures[(com,)]))
			fault_com.append(len(set(faults[(com,)].values())))
			fault_com_union.update(set(faults[(com,)].values()))

		failure_CMR += len(failures[cmr]) / source_num
		failure_MR_MAX += max(failure_com) / source_num
		failure_MR_MEAN += sum(failure_com) / i / source_num

		fault_CMR += len(set(faults[cmr].values())) / total_fault_num
		fault_MR_MAX += max(fault_com) / total_fault_num
		fault_MR_MEAN += len(fault_com_union) / i / total_fault_num

	failure_cmr = failure_CMR / cmr_num
	failure_mr_max, failure_mr_mean = failure_MR_MAX / cmr_num, failure_MR_MEAN / cmr_num
	fault_cmr = fault_CMR / cmr_num
	fault_mr_max, fault_mr_mean = fault_MR_MAX / cmr_num, fault_MR_MEAN / cmr_num

	return [
		failure_cmr, failure_mr_max, failure_mr_mean,
		fault_cmr, fault_mr_max, fault_mr_mean
	]

In [None]:
import pandas as pd

# Build the RQ4 table: for every dataset/model, average over the five random CMR
# samples the failure and fault ratios of the original and the augmented model.
compare_aug_method = [None, "online"]
result_columns = [
	'Dataset', 'Model', 'Augmentation',
	'failure_mr_max', 'failure_mr_mean', 'failure_cmr',
	'fault_mr_max', 'fault_mr_mean', 'fault_cmr',
]
# Rows are collected first and turned into a frame once, so the numeric columns
# get a numeric dtype instead of being enlarged row by row on an empty frame.
records = []

for dataset in datasets:
	for model in models[dataset]:
		for m in [f'random_{i}' for i in range(5)]:
			selected_cmrs = selected_cmrs_all[get_model_name(dataset, model)][m]
			# Shared denominator: distinct fault types of the selected CMRs for the
			# original model plus those for the augmented model.
			total_fault_num = 0
			for aug in compare_aug_method:
				model_name = get_model_name(dataset, model, augment=aug)
				# NOTE(review): the original model's total is taken from faults_all,
				# whereas compare_selected_cmrs reads selected_faults_all for the same
				# model; confirm this mix is intended.
				fault_source = aug_faults_all if aug else faults_all
				# Count fault TYPES (dict values). Iterating the per-CMR dicts themselves
				# yields their keys, the failing source indices. set().union(...) also
				# tolerates the case where no selected CMR is present.
				total_fault_num += len(set().union(*(
					f.values() for k, f in fault_source[model_name].items() if k in selected_cmrs
				)))
			for aug in compare_aug_method:
				model_name = get_model_name(dataset, model, augment=aug)
				res = compare_selected_cmrs(model_name, selected_cmrs, total_fault_num)
				# res order: failure (cmr, max, mean), fault (cmr, max, mean); stored as percentages.
				records.append(dict(zip(result_columns, [
					dataset, model, ('None' if not aug else aug),
					res[1] * 100, res[2] * 100, res[0] * 100,
					res[4] * 100, res[5] * 100, res[3] * 100,
				])))

df = pd.DataFrame(records, columns=result_columns)
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
# Rows are ordered by the position of their dataset in `datasets`.
print(df.groupby(['Dataset', 'Model', 'Augmentation']).mean().sort_values(by=['Dataset'], key=lambda x: [datasets.index(i) for i in x]))
