In [None]:
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from joblib import Parallel, delayed
import matplotlib
from utils.testing import Seq_C_2ST as Seq_C_2ST1
from utils.testing_ftrl import Seq_C_2ST as Seq_C_2ST2
from utils.testing_op_ftrl import Seq_C_2ST as Seq_C_2ST3
from random import shuffle
import json
import os

In [None]:
# Notebook-wide plot styling: whitegrid theme, enlarged fonts, thicker lines,
# 8x6 figures and the 'Set2' colour palette.
sns.set(
    style="whitegrid",
    font_scale=1.4,
    rc={"lines.linewidth": 2, 'figure.figsize': (8, 6)},
)
sns.set_palette('Set2')
# Turn on matplotlib's usetex option for all text in figures.
matplotlib.rcParams.update({'text.usetex': True})

In [None]:
# Enable IPython's autoreload extension (mode 2) so edits to imported project
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

First simulations: false-positive rate of the sequential tests under H0 (random labels) on KDEF images

In [None]:
# Load one sample image from the 'AN' folder of the dataset; it is scaled and
# displayed in the next two cells.
im = plt.imread('KDEF_and_AKDEF/KDEF_cropped_gray_group/AN/AF01ANS.JPG')

In [None]:
# Convert the sample image to float32 and divide by 255 so that 8-bit pixel
# values land in the [0, 1] range.
x_train = np.divide(im.astype("float32"), 255)

In [None]:
# Display the sample image held in `im` using a grayscale colour map.
plt.imshow(im, cmap='gray')

In [None]:
# Create a labelled dataset: every '.JPG' file in each sub-folder listed in
# `cand_labels` is loaded, and its label is the index of that sub-folder in
# `cand_labels`.
cand_labels = ['AF', 'AN', 'DI', 'HA', 'NE', 'SA', 'SU']
targ_dir = 'KDEF_and_AKDEF/KDEF_cropped_gray_group'
list_images = list()
list_labels = list()
for cur_ind, cur_label in enumerate(cand_labels):
    cur_dir = os.path.join(targ_dir, cur_label)
    # os.listdir() returns entries in arbitrary order; sorting keeps the image
    # order (and therefore any positional indexing) identical on every run.
    for fname in sorted(os.listdir(cur_dir)):
        if fname.endswith('.JPG'):
            img_src = os.path.join(cur_dir, fname)
            # Stored as float32 with pixel values divided by 255. The division
            # already yields a fresh array, so no extra copy is needed.
            list_images.append(plt.imread(img_src).astype("float32") / 255)
            list_labels.append(cur_ind)


In [None]:
# Create the two image samples that the tests compare: `p_images` from the
# `pos_em` folders and `q_images` from the `neg_em` folders.
# Note: the 'SA' folder is not part of either group.
pos_em = ['HA', 'NE', 'SU']
neg_em = ['AF', 'AN', 'DI']
targ_dir = 'KDEF_and_AKDEF/KDEF_cropped_gray_group'


def _load_group_images(labels, root_dir=targ_dir):
    """Load every ``.JPG`` image found in the given label sub-folders.

    Parameters
    ----------
    labels : iterable of str
        Sub-folder names under ``root_dir``; they are visited in this order.
    root_dir : str
        Directory containing one sub-folder per label.

    Returns
    -------
    list of numpy.ndarray
        One float32 array per image, with pixel values divided by 255, ordered
        by label and then by sorted file name.

    Raises
    ------
    OSError
        If a label sub-folder cannot be listed or an image cannot be read.
    """
    images = []
    for label in labels:
        label_dir = os.path.join(root_dir, label)
        # os.listdir() order is arbitrary; sorting makes positional access
        # (e.g. p_images[:250], q_images[100]) select the same files each run.
        for fname in sorted(os.listdir(label_dir)):
            if fname.endswith('.JPG'):
                img_src = os.path.join(label_dir, fname)
                images.append(plt.imread(img_src).astype("float32") / 255)
    return images


p_images = _load_group_images(pos_em)
q_images = _load_group_images(neg_em)


In [None]:
# Preview the second-to-last image in `p_images` in grayscale, with the axes hidden.
plt.imshow(p_images[-2], cmap='gray')
plt.axis('off')

In [None]:
# Preview the image at index 100 of `q_images` in grayscale, with the axes hidden.
plt.imshow(q_images[100], cmap='gray')
plt.axis('off')

In [None]:
# Keep the first 250 images of each group for the experiment.
# NOTE(review): p_images / q_images are filled folder by folder, so these slices
# may contain images from only the first folder(s) of each group — confirm this
# is the intended sample.
p_images_test = p_images[:250]
q_images_test = q_images[:250]

# One array along the first axis: all P images first, then all Q images.
x_train = np.vstack([np.stack(p_images_test), np.stack(q_images_test)])

# To simulate the random guessing for models under **H0**: labels are drawn
# from {-1, 1} independently of the images. A dedicated seeded generator makes
# the label sequence reproducible across kernel restarts without touching
# NumPy's global random state.
H0_LABEL_SEED = 0
y_train_h0 = np.random.default_rng(H0_LABEL_SEED).choice([-1, 1], size=len(x_train))

In [None]:
# Number of observations in the stacked array; used as the per-trial loop bound.
t_size = x_train.shape[0]
print(t_size)

In [None]:
# When using ONS as OAlg method
#
# For each significance level, run repeated trials of the sequential test on
# the randomly labelled stream (x_train, y_train_h0) and record how often the
# null is rejected and after how many observations.
# NOTE(review): every trial replays the same (x_train, y_train_h0) sequence, so
# trial-to-trial variation can only come from randomness inside the tester —
# confirm this is the intended way to estimate the false positive rate.

significance_levels = np.linspace(0.005, 0.1, 20)
rejection_counts = np.zeros(len(significance_levels))
# One list per level with the 1-based stopping time of each rejecting trial.
rejection_times_sums = [[] for _ in range(len(significance_levels))]
total_tests_per_level = np.zeros(len(significance_levels))

required_successes = 10
# Hard cap on passes over the levels so that a level whose trials keep raising
# cannot keep this cell running forever.
max_rounds = 10 * required_successes

for _ in range(max_rounds):
    # Only levels still short of `required_successes` completed trials are run,
    # so every level ends up with the same number of trials.
    pending_levels = [
        index for index, n_done in enumerate(total_tests_per_level)
        if n_done < required_successes
    ]
    if not pending_levels:
        break

    for index in pending_levels:
        level = significance_levels[index]
        try:
            tester = Seq_C_2ST1(significance_level=level)
            tester.payoff_style = 'classification'
            tester.pred_model = 'CNN'
            tester.bet_scheme = 'ONS'

            for cur_ind in range(t_size):
                tester.process_pair(x_train[cur_ind], y_train_h0[cur_ind])
                if tester.null_rejected:
                    rejection_counts[index] += 1
                    rejection_times_sums[index].append(cur_ind + 1)
                    break

            # Reached only when the whole trial ran without raising.
            total_tests_per_level[index] += 1

        except Exception as e:
            # Best-effort: a failed trial is reported and not counted.
            print(f"Attempt failed at level {level} due to {e}, skipping this trial...")

incomplete_levels = total_tests_per_level < required_successes
if incomplete_levels.any():
    print(f"Warning: levels {significance_levels[incomplete_levels]} completed fewer than "
          f"{required_successes} trials after {max_rounds} rounds.")

# Levels without any completed trial get NaN instead of a division by zero.
average_rejection_rates = np.divide(
    rejection_counts,
    total_tests_per_level,
    out=np.full(len(significance_levels), np.nan),
    where=total_tests_per_level > 0,
)
# A level that never rejected reports 0 here (not NaN).
average_rejection_times = np.array(
    [np.mean(times) if times else 0 for times in rejection_times_sums]
)

print("Average False Positive Rates per Level:", average_rejection_rates)
print("Average Rejection Times per Level:", average_rejection_times)


In [None]:
# When using FTRL+Barrier as OAlg method
#
# For each significance level, run repeated trials of the sequential test on
# the randomly labelled stream (x_train, y_train_h0) and record how often the
# null is rejected and after how many observations.
# NOTE(review): every trial replays the same (x_train, y_train_h0) sequence, so
# trial-to-trial variation can only come from randomness inside the tester —
# confirm this is the intended way to estimate the false positive rate.

significance_levels = np.linspace(0.005, 0.1, 20)
rejection_counts = np.zeros(len(significance_levels))
# One list per level with the 1-based stopping time of each rejecting trial.
rejection_times_sums = [[] for _ in range(len(significance_levels))]
total_tests_per_level = np.zeros(len(significance_levels))

required_successes = 10
# Hard cap on passes over the levels so that a level whose trials keep raising
# cannot keep this cell running forever.
max_rounds = 10 * required_successes

for _ in range(max_rounds):
    # Only levels still short of `required_successes` completed trials are run,
    # so every level ends up with the same number of trials.
    pending_levels = [
        index for index, n_done in enumerate(total_tests_per_level)
        if n_done < required_successes
    ]
    if not pending_levels:
        break

    for index in pending_levels:
        level = significance_levels[index]
        try:
            tester = Seq_C_2ST2(significance_level=level)
            tester.payoff_style = 'classification'
            tester.pred_model = 'CNN'
            tester.bet_scheme = 'FB'

            for cur_ind in range(t_size):
                tester.process_pair(x_train[cur_ind], y_train_h0[cur_ind])
                if tester.null_rejected:
                    rejection_counts[index] += 1
                    rejection_times_sums[index].append(cur_ind + 1)
                    break

            # Reached only when the whole trial ran without raising.
            total_tests_per_level[index] += 1

        except Exception as e:
            # Best-effort: a failed trial is reported and not counted.
            print(f"Attempt failed at level {level} due to {e}, skipping this trial...")

incomplete_levels = total_tests_per_level < required_successes
if incomplete_levels.any():
    print(f"Warning: levels {significance_levels[incomplete_levels]} completed fewer than "
          f"{required_successes} trials after {max_rounds} rounds.")

# Levels without any completed trial get NaN instead of a division by zero.
average_rejection_rates = np.divide(
    rejection_counts,
    total_tests_per_level,
    out=np.full(len(significance_levels), np.nan),
    where=total_tests_per_level > 0,
)
# A level that never rejected reports 0 here (not NaN).
average_rejection_times = np.array(
    [np.mean(times) if times else 0 for times in rejection_times_sums]
)

print("Average False Positive Rates per Level:", average_rejection_rates)
print("Average Rejection Times per Level:", average_rejection_times)

In [None]:
# When using Optimistic-FTRL+Barrier as OAlg method
#
# For each significance level, run repeated trials of the sequential test on
# the randomly labelled stream (x_train, y_train_h0) and record how often the
# null is rejected and after how many observations.
# NOTE(review): every trial replays the same (x_train, y_train_h0) sequence, so
# trial-to-trial variation can only come from randomness inside the tester —
# confirm this is the intended way to estimate the false positive rate.

significance_levels = np.linspace(0.005, 0.1, 20)
rejection_counts = np.zeros(len(significance_levels))
# One list per level with the 1-based stopping time of each rejecting trial.
rejection_times_sums = [[] for _ in range(len(significance_levels))]
total_tests_per_level = np.zeros(len(significance_levels))

required_successes = 10
# Hard cap on passes over the levels so that a level whose trials keep raising
# cannot keep this cell running forever.
max_rounds = 10 * required_successes

for _ in range(max_rounds):
    # Only levels still short of `required_successes` completed trials are run,
    # so every level ends up with the same number of trials.
    pending_levels = [
        index for index, n_done in enumerate(total_tests_per_level)
        if n_done < required_successes
    ]
    if not pending_levels:
        break

    for index in pending_levels:
        level = significance_levels[index]
        try:
            tester = Seq_C_2ST3(significance_level=level)
            tester.payoff_style = 'classification'
            tester.pred_model = 'CNN'
            tester.bet_scheme = 'OFB'

            for cur_ind in range(t_size):
                tester.process_pair(x_train[cur_ind], y_train_h0[cur_ind])
                if tester.null_rejected:
                    rejection_counts[index] += 1
                    rejection_times_sums[index].append(cur_ind + 1)
                    break

            # Reached only when the whole trial ran without raising.
            total_tests_per_level[index] += 1

        except Exception as e:
            # Best-effort: a failed trial is reported and not counted.
            print(f"Attempt failed at level {level} due to {e}, skipping this trial...")

incomplete_levels = total_tests_per_level < required_successes
if incomplete_levels.any():
    print(f"Warning: levels {significance_levels[incomplete_levels]} completed fewer than "
          f"{required_successes} trials after {max_rounds} rounds.")

# Levels without any completed trial get NaN instead of a division by zero.
average_rejection_rates = np.divide(
    rejection_counts,
    total_tests_per_level,
    out=np.full(len(significance_levels), np.nan),
    where=total_tests_per_level > 0,
)
# A level that never rejected reports 0 here (not NaN).
average_rejection_times = np.array(
    [np.mean(times) if times else 0 for times in rejection_times_sums]
)

print("Average False Positive Rates per Level:", average_rejection_rates)
print("Average Rejection Times per Level:", average_rejection_times)