In [9]:
import os
import json
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import random_split, DataLoader
from ast import literal_eval

import warnings
warnings.filterwarnings('ignore')

In [None]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(device)

# The batch size
batch_size = 512 

transform_train = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

train_set = torchvision.datasets.CIFAR10(
    root="./data", train=True, download=True,
    transform=transform_train,
)
test_set = torchvision.datasets.CIFAR10(
    root="./data", train=False, download=True,
    transform=transform_test
)

trains_set, validation_set = random_split(train_set, [0.75, 0.25])

train_loader = DataLoader(
    train_set, batch_size=batch_size, shuffle=True,
    num_workers=2
)
test_loader = DataLoader(
    test_set, batch_size=batch_size, shuffle=True,
    num_workers=2
)
validation_loader = DataLoader(
    validation_set, batch_size=batch_size, shuffle=True,
    num_workers=2
)

In [None]:
class CNN(nn.Module):
    def __init__(self, conv1_hidden=6, conv2_hidden=16, conv_kernel_size=5, pool_kernel_size=2, pool_stride=2, linear1_hidden=120, linear2_hidden=84):
        super().__init__()
        self.conv1 = nn.Conv2d(3, conv1_hidden, conv_kernel_size)
        self.pool = nn.MaxPool2d(pool_kernel_size, pool_stride)
        self.conv2 = nn.Conv2d(conv1_hidden, conv2_hidden, conv_kernel_size)
        self.fc1 = nn.Linear(conv2_hidden * 5 * 5, linear1_hidden)
        self.fc2 = nn.Linear(linear1_hidden, linear2_hidden)
        self.fc3 = nn.Linear(linear2_hidden, 10)

    def forward(self, x):
        intermediate_outputs = []
        x = self.conv1(x)
        intermediate_outputs.append(x)
        x = self.pool(F.relu(x))
        intermediate_outputs.append(x)
        x = self.conv2(x)
        intermediate_outputs.append(x)
        x = self.pool(F.relu(x))
        intermediate_outputs.append(x)
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        intermediate_outputs.append(x)
        x = F.relu(self.fc2(x))
        intermediate_outputs.append(x)
        x = F.softmax(self.fc3(x))
        intermediate_outputs.append(x)

        return x, intermediate_outputs
    
cnn = CNN().to(device)
optimizer_cnn = torch.optim.Adam(cnn.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

for epoch in range(20):
    for data in train_loader:
        inputs, labels = data[0].to(device), data[1].to(device)
        optimizer_cnn.zero_grad()
        outputs, intermediate_outputs = cnn(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer_cnn.step()

In [None]:
with torch.no_grad():
    for data in test_loader:
        inputs, labels = data[0].to(device), data[1].to(device)
        outputs, intermediate_outputs = cnn(inputs)
        break

In [None]:
labels = labels.detach().numpy()
outputs = outputs.detach().numpy()

In [None]:
f = open("labels.csv", "w")
f.write("label;output\n")

for i in range(len(labels)):
    labels_str = labels[i]
    outputs_str = str(outputs[i])
    # remove \n from string
    outputs_str = outputs_str.replace("\n", "")
    f.write(str(labels_str) +";"+ str(outputs_str) +"\n")

In [245]:
# make df where column "label" is column 10 of data_cifar10_probs and column "output" is an array of the first 10 columns of data_cifar10_probs as a list of floats
data_cifar10_probs = pd.read_csv('notebooks/cifar10_probs.csv', sep=',', header=None)
data_cifar10_probs = data_cifar10_probs.rename(columns={10: "label"})
data_cifar10_probs["output"] = data_cifar10_probs.iloc[:, 0:10].values.tolist()
# make 

data_cifar10_probs = data_cifar10_probs.drop(data_cifar10_probs.columns[0:10], axis=1)
data_cifar10_probs["output"] = data_cifar10_probs["output"].astype(str)

data_cifar10_probs

Unnamed: 0,label,output
0,3,"[2.3661317e-09, 1.266867e-08, 8.9752067e-10, 0..."
1,8,"[2.0486328e-11, 8.667231e-08, 4.1953889e-16, 1..."
2,8,"[1.9216972e-07, 0.002469865, 3.1885104e-12, 3...."
3,0,"[0.999424, 1.3174234e-05, 4.9130163e-06, 2.804..."
4,6,"[3.4858956e-09, 1.7338067e-06, 1.1511221e-06, ..."
...,...,...
9995,8,"[0.20628296, 1.48559475e-05, 1.4134733e-06, 2...."
9996,3,"[4.3346837e-14, 1.0565218e-13, 8.082701e-10, 1..."
9997,5,"[8.1107293e-10, 9.9917585e-09, 7.729877e-08, 1..."
9998,1,"[1.5138195e-08, 0.99875677, 1.074252e-05, 3.15..."


In [246]:
# we have to remember to turn " " into ","
data1 = pd.read_csv('labels (2).csv', sep=';')
# get types of columns
type(data1["output"][0])

str

In [247]:
# we have to remember to turn " " into ","

smx = data_cifar10_probs['output']
labels = data_cifar10_probs['label'].astype(int)

# create an empty array
lista = np.array([])
for i in range(len(smx)):
    lista = np.append(lista, literal_eval(smx[i]))
    
smx = lista.reshape(len(smx), 10)

In [248]:
smx.shape

(10000, 10)

In [249]:
# Problem setup
n=300 # number of calibration points
alpha = 0.02 # 1-alpha is the desired coverage

In [250]:
# Split the softmax scores into calibration and validation sets (save the shuffling)

# n Trues e (smx.shape[0]-n) Falses
idx = np.array([1] * n + [0] * (smx.shape[0]-n)) > 0 

# embaralha os Trues e Falses
np.random.shuffle(idx) 

# pega os valores de softmax de acordo com os Trues e Falses
cal_smx, val_smx = smx[idx,:], smx[~idx, :] 
#val_smx = val_smx[2].reshape(1,10) #use to have 1 img to predict


# pega os valores de labels de acordo com os Trues e Falses
cal_labels, val_labels = np.array(labels[idx]), np.array(labels[~idx])
#cal_labels, val_labels = np.array(labels[idx]), np.array(labels[~idx])[2] #use to have 1 img to predict

In [251]:
cal_smx.shape, val_smx.shape

((300, 10), (9700, 10))

In [252]:
cal_labels.shape, val_labels.shape

((300,), (9700,))

### Conformal prediction happens here

In [253]:
# 1: get conformal scores. n = calib_Y.shape[0]

# criamos um vetor cal_scores com (1 -(probabilidade atribuida pelo modelo de que a imagem tenha seu label verdadeiro))
# cal score é quanto menor, melhor
cal_scores = 1-cal_smx[np.arange(n),cal_labels]

# 2: get adjusted quantile
# qhat será o valor de s_i (entrada de cal_scores) que limita os 1-alpha menores scores (os melhores!)
q_level = np.ceil((n+1)*(1-alpha))/n
qhat = np.quantile(cal_scores, q_level, interpolation='higher')

prediction_sets = val_smx >= (1-qhat) # 3: form prediction sets

In [254]:
empirical_coverage = prediction_sets[np.arange(prediction_sets.shape[0]),val_labels].mean()
print(f"The empirical coverage is: {empirical_coverage}")

The empirical coverage is: 0.9661855670103093


In [255]:
prediction_sets.shape

(9700, 10)

In [256]:
val_labels.shape

(9700,)

In [244]:
# prediction_sets_one_guess = prediction_set removing all lists with only one guess
prediction_sets_many_guesses = []
for i in range(len(prediction_sets)):
    if prediction_sets[i].sum() != 1:
        prediction_sets_many_guesses.append(prediction_sets[i])
    else:
        # make val_labels[i]==-1
        val_labels[i] = -1

# remove all -1 from val_labels
val_labels = val_labels[val_labels != -1]

# get the accuracy of the model
accuracy = 0
for i in range(len(prediction_sets_many_guesses)):
    if prediction_sets_many_guesses[i][val_labels[i]] == True:
        accuracy += 1
print(accuracy)
print(len(prediction_sets_many_guesses))

808
873


In [257]:
# prediction_sets_one_guess = prediction_set removing all lists with more than one guess
prediction_sets_one_guess = []
for i in range(len(prediction_sets)):
    if prediction_sets[i].sum() == 1:
        prediction_sets_one_guess.append(prediction_sets[i])
    else:
        # make val_labels[i]==-1
        val_labels[i] = -1

# remove all -1 from val_labels
val_labels = val_labels[val_labels != -1]

# get the accuracy of the model
accuracy = 0
for i in range(len(prediction_sets_one_guess)):
    if prediction_sets_one_guess[i][val_labels[i]] == True:
        accuracy += 1
print(accuracy)
print(len(prediction_sets_one_guess))

8591
8855


In [159]:
# count how many times there is more than one label in the prediction set
count = 0
for i in range(len(prediction_sets)):
    if sum(prediction_sets[i]) > 9:
        count += 1
print(f"The number of times there is more than one label in the prediction set is: {count}")

The number of times there is more than one label in the prediction set is: 0
