In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import sys
sys.path.append('../src')
from torch_utils import *
from pathlib import Path
from multilabel.evaluate import *
from harvest_data import EuropeanaAPI

from train_multilabel import *

# Single label

## Crossvalidation

In [None]:

# Aggregate the single-label cross-validation test metrics over all splits
# and report mean +- std per metric.
results_path = Path('/home/jcejudo/projects/image_classification/results/single_label/crossvalidation')
metrics_list = ['accuracy','precision','recall','f1',]

# Only directories are splits. Sorting makes the state left behind in
# `info_dict` / `class_index_dict` (read by the confusion-matrix cell below)
# independent of the filesystem listing order.
split_dirs = sorted(p for p in results_path.iterdir() if p.is_dir())

metrics_dict = {k:[] for k in metrics_list}
for split in split_dirs:
    # map_location lets results saved on a GPU machine load on a CPU-only one
    info_dict = torch.load(split.joinpath('training_info.pth'), map_location='cpu')

    with open(split.joinpath('class_index.json'),'r') as f:
        # JSON keys are strings; the rest of the notebook indexes by int
        class_index_dict = {int(i):v for i,v in json.load(f).items()}

    for metric in metrics_list:
        metrics_dict[metric].append(info_dict[metric+'_test'])

for k,v in metrics_dict.items():
    if not v:
        # no split found: report it instead of dividing by zero
        print(f'{k}: no results found in {results_path}')
        continue
    mean = sum(v)/len(v)
    std = np.std(np.array(v))
    print(f'{k}: {mean:.3f}+-{std:.3f}')

In [None]:
# Confusion matrix of the last split loaded by the cross-validation cell above
# (`info_dict` and `class_index_dict` come from that cell).
confusion_matrix = info_dict['confusion_matrix_test']
n_classes = confusion_matrix.shape[0]
labels = [class_index_dict[idx] for idx in range(n_classes)]
plot_conf_matrix(
    confusion_matrix,
    labels,
    font_scale=2.0,
    figsize=(15,15),
)

## Evaluation

In [None]:

# Report the metrics stored for the held-out single-label evaluation set and
# plot its confusion matrix.
eval_results = '/home/jcejudo/projects/image_classification/results/single_label/evaluation'

results_path = Path(eval_results).joinpath('evaluation_results.pth')
# map_location lets results saved on a GPU machine load on a CPU-only one
metrics_dict = torch.load(results_path, map_location='cpu')
eval_confusion_matrix = metrics_dict['confusion_matrix']

for k,v in metrics_dict.items():
    if k != 'confusion_matrix':
        print(f'{k}: {v:.3f}')

# Size the label list from the matrix that is actually plotted, not from the
# `confusion_matrix` variable left over from the cross-validation cell.
# `class_index_dict` is still the mapping loaded by an earlier cell.
labels = [class_index_dict[i] for i in range(eval_confusion_matrix.shape[0])]
plot_conf_matrix(eval_confusion_matrix,labels,font_scale=2.0,figsize=(15,15))

In [None]:
# Grad-CAM on the single-label evaluation images, using the model of split_0.
# to do: gradcam correctly classified, misclassified

results_path = '/home/jcejudo/projects/image_classification/results/single_label/crossvalidation'
data_dir = '/home/jcejudo/projects/image_classification/data/single_label/images_evaluation'

# 'correct'   -> show images whose top prediction equals the ground truth
# 'incorrect' -> show images whose top prediction differs from it
mode = 'incorrect'

# when True, show only the first matching image per ground-truth category
get_first = True

split_path = Path(results_path).joinpath('split_0')
with open(split_path.joinpath('conf.json'),'r') as f:
    conf = json.load(f)

input_size = conf['input_size']
resnet_size = conf['resnet_size']

df = path2DataFrame(data_dir)
X = df['file_path'].values
y = df['category'].values

#load class_index dict
with open(split_path.joinpath('class_index.json'),'r') as f:
    # JSON keys are strings; convert them back to int indices
    class_index_dict = {int(i):v for i,v in json.load(f).items()}

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = ResNet(resnet_size,len(class_index_dict)).to(device)

#load model
model_path = split_path.joinpath('checkpoint.pth')
# map_location is required for the CPU fallback above to actually work when
# the checkpoint was saved from a GPU
model.load_state_dict(torch.load(model_path, map_location=device))
model.eval()

transform = transforms.Compose([
    transforms.Resize((input_size,input_size)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# ground-truth categories already shown (only used when get_first is True)
cat_list = []

for img_path, label in zip(X,y):

    # the context manager closes the file handle; convert() returns a new,
    # fully loaded image that stays valid afterwards
    with Image.open(img_path) as img_file:
        image = img_file.convert('RGB')

    # file names encode the record id with '[ph]' in place of '/'
    ID = Path(img_path).with_suffix('').name.replace('[ph]','/')
    URI = 'http://data.europeana.eu/item'+ID

    category_list, confidence_list, XAI_list = predict_grad_cam(
        model = model,
        class_index_dict = class_index_dict,
        image = image,
        heatmap_layer = model.net.layer4[1].conv2,
        transform = transform,
        device = device,
        thres = 0.1,
        max_pred = 3)

    # Top prediction. The matching confidence is no longer stored in `conf`:
    # that overwrote the config dict loaded above and was never read.
    pred = category_list[0]

    is_correct = pred == label
    wanted = (is_correct and mode == 'correct') or (not is_correct and mode == 'incorrect')
    if not wanted:
        continue

    if get_first:
        if label in cat_list:
            continue
        cat_list.append(label)

    print(URI)
    print('ground truth: ',label)
    plot_grad_cam(
        image = image,
        category_list = category_list,
        confidence_list = confidence_list,
        XAI_list = XAI_list,
        fontsize = 20,
        figsize = (20,20)
    )

    plt.show()

    print(50*'--')
        
    


# Multilabel

## Crossvalidation

In [None]:


# Aggregate the multilabel cross-validation test metrics over all splits and
# report mean +- std per metric.
results_path = Path('/home/jcejudo/projects/image_classification/results/multilabel/crossvalidation')

metrics_list = ['coverage','lrap','label_ranking_loss','ndcg_score','dcg_score']
metrics_list += ['acc','precision','recall','f1',]

# Only directories are splits. Sorting makes the `info_dict` /
# `class_index_dict` left behind for the following cells independent of the
# filesystem listing order.
split_dirs = sorted(p for p in results_path.iterdir() if p.is_dir())

metrics_dict = {k:[] for k in metrics_list}
for split in split_dirs:
    # map_location lets results saved on a GPU machine load on a CPU-only one
    info_dict = torch.load(split.joinpath('training_info.pth'), map_location='cpu')

    with open(split.joinpath('class_index.json'),'r') as f:
        # JSON keys are strings; the rest of the notebook indexes by int
        class_index_dict = {int(i):v for i,v in json.load(f).items()}

    for metric in metrics_list:
        metrics_dict[metric].append(info_dict[metric+'_test'])

for k,v in metrics_dict.items():
    if not v:
        # no split found: report it instead of dividing by zero
        print(f'{k}: no results found in {results_path}')
        continue
    mean = sum(v)/len(v)
    std = np.std(np.array(v))
    print(f'{k}: {mean:.3f}+-{std:.3f}')

In [None]:

# Per-category precision / recall / F1 over the cross-validation splits,
# printed as a "mean+-std" table. Uses `results_path` and `class_index_dict`
# from the multilabel cross-validation cell above.
from tabulate import tabulate


def _safe_ratio(numerator, denominator):
    """Return numerator/denominator, or 0 when the denominator is zero."""
    return numerator/denominator if denominator else 0


metric_names = ['precision','recall','f1-score']
cat_metrics = {k:{name:[] for name in metric_names} for k in class_index_dict.values()}

for split in sorted(p for p in results_path.iterdir() if p.is_dir()):
    # map_location lets results saved on a GPU machine load on a CPU-only one
    info_dict = torch.load(split.joinpath('training_info.pth'), map_location='cpu')

    # one 2x2 matrix per category, paired with the category names in dict order
    for cm,v in zip(info_dict['confusion_matrix_test'],class_index_dict.values()):

        # read as [[TN, FP], [FN, TP]]; TN is not needed for these scores
        TP = float(cm[1,1])
        FP = float(cm[0,1])
        FN = float(cm[1,0])

        # explicit zero-denominator guard replaces the former bare `except:`
        cat_metrics[v]['precision'].append(_safe_ratio(TP, TP+FP))
        cat_metrics[v]['recall'].append(_safe_ratio(TP, TP+FN))
        cat_metrics[v]['f1-score'].append(_safe_ratio(2*TP, 2*TP+FP+FN))

# collapse each per-split score list into its mean and std
for k in cat_metrics.keys():
    for name in metric_names:
        scores = cat_metrics[k][name]
        cat_metrics[k][name] = {'mean':np.mean(scores),'std':np.std(scores)}

table_list = [['category','precision','recall','f1-score']]

for k,stats in cat_metrics.items():
    cells = [f"{stats[name]['mean']:.3f}+-{stats[name]['std']:.3f}" for name in metric_names]
    table_list.append([k] + cells)

print(tabulate(table_list))



In [None]:
# One 2x2 confusion matrix per category, from the last `info_dict` loaded above.
labels = ['False','True']
per_category = zip(info_dict['confusion_matrix_test'], class_index_dict.values())
for cm, category_name in per_category:
    plot_conf_matrix(
        cm,
        labels,
        font_scale=2.0,
        figsize=(5,5),
        title=category_name,
    )

## Evaluation

In [None]:
# Report the metrics stored for the held-out multilabel evaluation set and
# plot one confusion matrix per category.
eval_results = '/home/jcejudo/projects/image_classification/results/multilabel/evaluation'

results_path = Path(eval_results).joinpath('evaluation_results.pth')
# map_location lets results saved on a GPU machine load on a CPU-only one
metrics_dict = torch.load(results_path, map_location='cpu')
for k,v in metrics_dict.items():
    if k == 'confusion_matrix':
        continue
    try:
        print(f'{str(k)}: {v:.3f}')
    except (TypeError, ValueError):
        # entry cannot be formatted with ':.3f' (not a scalar number);
        # say so instead of hiding it behind a bare `except: pass`
        print(f'{str(k)}: skipped ({type(v).__name__} is not a scalar)')

# `class_index_dict` is the mapping loaded by an earlier cell
for cm,v in zip(metrics_dict['confusion_matrix'],class_index_dict.values()):
    labels = ['False','True']
    plot_conf_matrix(cm,labels,font_scale=2.0,figsize=(5,5),title=v)

In [None]:
# gradcam
# Grad-CAM on the multilabel evaluation images, using the model of split_0.

results_path = '/home/jcejudo/projects/image_classification/results/multilabel/crossvalidation'
data_dir = '/home/jcejudo/projects/image_classification/data/multilabel/images_evaluation'
eval_annotations = '/home/jcejudo/projects/image_classification/data/multilabel/eval_multilabel_with_URL.csv'

data_dir = Path(data_dir)

split_path = Path(results_path).joinpath('split_0')
with open(split_path.joinpath('conf.json'),'r') as f:
    conf = json.load(f)

input_size = conf['input_size']
resnet_size = conf['resnet_size']

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# load_multilabel_model returns the class-index mapping along with the model,
# so the separate read of class_index.json (immediately overwritten) is gone
model,class_index_dict = load_multilabel_model(split_path,device,resnet_size = resnet_size)

df = pd.read_csv(eval_annotations)
#df = df.dropna()
#filter images in df contained in data_path
# a set gives O(1) membership tests; the former list made this filter O(n^2)
available_imgs = set(data_dir.iterdir())
df['filepath'] = df['ID'].apply(lambda x:data_dir.joinpath(id_to_filename(x)+'.jpg'))
df = df.loc[df['filepath'].apply(lambda x: Path(x) in available_imgs)]

X = df['filepath'].values
y = df['category'].values

model.eval()

transform = transforms.Compose([
    transforms.Resize((input_size,input_size)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

for img_path, label in zip(X,y):

    # the context manager closes the file handle; convert() returns a new,
    # fully loaded image that stays valid afterwards
    with Image.open(img_path) as img_file:
        image = img_file.convert('RGB')

    # file names encode the record id with '[ph]' in place of '/'
    ID = Path(img_path).with_suffix('').name.replace('[ph]','/')
    URI = 'http://data.europeana.eu/item'+ID

    category_list, confidence_list, XAI_list = predict_grad_cam(
        model = model,
        class_index_dict = class_index_dict,
        image = image,
        heatmap_layer = model.net.layer4[1].conv2,
        transform = transform,
        device = device,
        thres = 0.5,
        max_pred = 3)

    print(URI)
    print('ground truth: ',label)
    plot_grad_cam(
        image = image,
        category_list = category_list,
        confidence_list = confidence_list,
        XAI_list = XAI_list,
        fontsize = 20,
        figsize = (20,20)
    )

    plt.show()

    print(50*'--')

In [None]:
#read crossvalidation results
# Aggregates the metrics stored in each split's test_metrics.json and prints
# mean +- std per metric.

results_path = Path('/home/jcejudo/projects/image_classification/results/multilabel/crossvalidation')

metrics_list = ['loss','coverage','lrap','label_ranking_loss','ndcg_score','dcg_score']

metrics_dict = {k:[] for k in metrics_list}
# only directories are splits; stray files in the results folder are ignored
for split in sorted(p for p in results_path.iterdir() if p.is_dir()):
    with open(split.joinpath('test_metrics.json'),'r') as f:
        test_metrics = json.load(f)
    for metric in metrics_list:
        if metric in test_metrics:
            metrics_dict[metric].append(test_metrics[metric])

for k,v in metrics_dict.items():
    if not v:
        # metrics missing from the JSON are skipped above, so a metric that no
        # split reports has an empty list here; avoid dividing by zero
        print(f'{k}: not reported by any split')
        continue
    mean = sum(v)/len(v)
    std = np.std(np.array(v))
    print(f'{k}: {mean:.3f}+-{std:.3f}')
        

In [None]:
#clean up evaluation set

def validate_categories(x, vocab=None):
    """Keep only the whitespace-separated categories of `x` found in the vocabulary.

    Args:
        x: string of categories separated by whitespace.
        vocab: container of valid categories (membership is tested with `in`).
            Defaults to the notebook-level `vocab_dict`, which keeps the
            original one-argument call working.

    Returns:
        The surviving categories joined by single spaces, in their original
        order ('' when none survive).
    """
    if vocab is None:
        vocab = vocab_dict
    return ' '.join(cat for cat in x.split() if cat in vocab)

def map_vocab(x,vocab_dict):
    """Translate each whitespace-separated category of `x` through `vocab_dict`.

    Returns the looked-up values joined by single spaces; a category missing
    from `vocab_dict` raises KeyError.
    """
    mapped = []
    for category in x.split():
        mapped.append(vocab_dict[category])
    return ' '.join(mapped)

def get_url(ID):
    """Look up `ID` through the notebook-level Europeana client `eu`.

    Returns whatever `eu.record(ID)` returns.
    NOTE(review): presumably a URL, since the caller stores it in a 'URL'
    column; confirm against EuropeanaAPI.record.
    """
    record = eu.record(ID)
    return record
    
# Client used by get_url(); 'api2demo' is the key passed to EuropeanaAPI.
eu = EuropeanaAPI('api2demo')

# Vocabulary used to validate categories and map them to 'skos_concept' values.
with open('../vocabularies/vocabulary.json','r') as f:
    vocab_dict = json.load(f)

df = pd.read_csv('../data/multilabel/eval_multilabel.csv')
print(df.shape)

# drop unknown categories, then add the mapped concepts and the record lookup
df['category'] = df['category'].apply(validate_categories)
df['skos_concept'] = df['category'].apply(map_vocab, args=(vocab_dict,))
df['URL'] = df['ID'].apply(get_url)

print(df.shape)
df.to_csv('../data/multilabel/eval_multilabel_with_URL.csv',index=False)

In [None]:
# Load the multilabel training annotations and show their columns and shape.
df = pd.read_csv('../data/multilabel/training_data.csv')
# a bare `df.columns` in the middle of a cell is evaluated and discarded;
# print it so the columns are actually shown
print(df.columns)
print(df.shape)

In [None]:
# Distinct values of the 'category' column of the training annotations.
df['category'].unique()

In [None]:
# Rows whose 'category' is exactly a single space.
df[df['category'] == ' ']

In [None]:

def evaluate(**kwargs):
    """Run `model` over `dataloader` and compute multilabel ranking metrics.

    Keyword Args:
        model: called as `model(inputs)`; switched to eval mode here.
        dataloader: iterable yielding `(inputs, labels, _)` batches; must also
            expose `.dataset`, whose length normalises the accumulated loss.
        loss_function: called as `loss_function(output, labels)`; must return
            a tensor supporting `.item()`.
        device: device the inputs and labels are moved to.

    Returns:
        A tuple `(metrics, ground_truth, predictions)`:
        `metrics` is a dict with the keys 'loss', 'coverage', 'lrap',
        'label_ranking_loss', 'ndcg_score' and 'dcg_score';
        `ground_truth` and `predictions` are numpy arrays stacking the labels
        and the raw model outputs of every batch.
    """
    # https://scikit-learn.org/stable/modules/generated/sklearn.metrics.multilabel_confusion_matrix.html

    model = kwargs.get('model')
    dataloader = kwargs.get('dataloader')
    loss_function = kwargs.get('loss_function')
    device = kwargs.get('device')

    ground_truth = []
    predictions = []

    model.eval()
    val_loss = 0.0
    # Inference only: without no_grad() every forward pass builds an autograd
    # graph that is never used, which wastes memory and time.
    with torch.no_grad():
        for inputs,labels,_ in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)
            output = model(inputs)
            # the loss needs labels in the same dtype as the outputs
            labels = labels.type_as(output)
            loss = loss_function(output, labels)
            val_loss += loss.item()

            # extend() appends one row per sample, as `+= list(array)` did
            ground_truth.extend(labels.cpu().detach().numpy())
            predictions.extend(output.cpu().detach().numpy())

    ground_truth = np.array(ground_truth)
    predictions = np.array(predictions)

    print(ground_truth.shape)

    # NOTE(review): this divides a sum of per-batch loss values by the number
    # of samples. If `loss_function` averages over the batch, the result is
    # not a per-sample mean; confirm the intended normalisation.
    val_loss /= len(dataloader.dataset)
    coverage = sklearn.metrics.coverage_error(ground_truth, predictions)
    lrap = sklearn.metrics.label_ranking_average_precision_score(ground_truth, predictions)
    label_ranking_loss = sklearn.metrics.label_ranking_loss(ground_truth, predictions)
    ndcg_score = sklearn.metrics.ndcg_score(ground_truth, predictions)
    dcg_score = sklearn.metrics.dcg_score(ground_truth, predictions)

    #sklearn.metrics.multilabel_confusion_matrix(ground_truth, predictions,labels=np.arange(output.shape[1]))

    return {
        'loss':val_loss,
        'coverage':coverage,
        'lrap':lrap,
        'label_ranking_loss':label_ranking_loss,
        'ndcg_score':ndcg_score,
        'dcg_score':dcg_score,
        },ground_truth,predictions


#analysis evaluation set
# Evaluates the multilabel model on the annotated evaluation images.
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

input_size = 128
batch_size = 64
num_workers = 4
model_dir = '/home/jcejudo/results_multilabel_no_test'
data_dir = '/home/jcejudo/eval_multilabel'
annotations = '../data/multilabel/eval_multilabel.csv'


data_dir = Path(data_dir)
df_path = Path(annotations)
model_dir = Path(model_dir)


test_transform = transforms.Compose([
  transforms.Resize((input_size, input_size)),
  transforms.ToTensor(),
  # this normalization is required https://pytorch.org/tutorials/beginner/finetuning_torchvision_models_tutorial.html
  transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model,class_index_dict = load_multilabel_model(model_dir,device)

df = pd.read_csv(df_path)
df = df.dropna()
print(df.shape)
# show a preview instead of dumping the whole frame
print(df.head())
#filter images in df contained in data_path
# a set gives O(1) membership tests; the former list made this filter O(n^2)
available_imgs = set(data_dir.iterdir())
df['filepath'] = df['ID'].apply(lambda x:data_dir.joinpath(id_to_filename(x)+'.jpg'))
df = df.loc[df['filepath'].apply(lambda x: Path(x) in available_imgs)]


# Fix the binarizer's column order to the index order of class_index_dict.
# Fitting without `classes` sorts the labels alphabetically, which matches
# `class_index_dict[i]` (used to decode columns later) only by coincidence.
ordered_classes = [class_index_dict[i] for i in sorted(class_index_dict)]
mlb = sklearn.preprocessing.MultiLabelBinarizer(classes=ordered_classes)
mlb.fit([ordered_classes])

imgs = np.array([str(path) for path in df['filepath'].values])
labels = [item.split() for item in df['category'].values]
labels = mlb.transform(labels)

testset = MultilabelDataset(imgs,labels,transform = test_transform)
# Evaluation must see every sample exactly once: no shuffling, and no
# drop_last (which silently discarded up to batch_size-1 images).
testloader = DataLoader(testset, batch_size=batch_size,shuffle=False, num_workers=num_workers,drop_last=False)

print('test:',imgs.shape[0])

loss_function = nn.BCEWithLogitsLoss()
test_metrics,ground_truth,predictions = evaluate(
  model = model,
  dataloader = testloader,
  loss_function = loss_function,
  device = device
)


In [None]:
# Display the metrics dict returned by evaluate() in the analysis cell above.
test_metrics

In [None]:
# Count, per category, the true positives, false negatives and false positives
# obtained by thresholding the outputs collected by evaluate().
# NOTE(review): `predictions` holds the raw model outputs; if those are logits
# rather than probabilities, 0.9 is a logit cut-off. Confirm.
threshold = 0.9

true_positive_dict = {}
false_negative_dict = {}
false_positive_dict = {}

for gt_row, pred_row in zip(ground_truth, predictions):
    # decode the active columns of each row into category names
    gt_labels = [class_index_dict[i] for i in np.where(gt_row == 1.0)[0]]
    pred_labels = [class_index_dict[i] for i in np.where(pred_row > threshold)[0]]

    # every ground-truth label is either recovered (TP) or missed (FN)
    for label in gt_labels:
        bucket = true_positive_dict if label in pred_labels else false_negative_dict
        bucket[label] = bucket.get(label, 0) + 1

    # predicted labels absent from the ground truth are false positives
    for label in pred_labels:
        if label in gt_labels:
            continue
        false_positive_dict[label] = false_positive_dict.get(label, 0) + 1

print('correctly classified:',true_positive_dict)
print('false negatives:',false_negative_dict)
print('false positives:',false_positive_dict)

In [None]:


def plot_count_dict(count_dict,title=''):
    """Draw a horizontal bar chart of the counts in `count_dict`.

    Each key becomes a bar labelled with the key; the count is also written
    as text next to the end of its bar. `title` is set as the axes title.
    """
    cats = list(count_dict.keys())
    values = list(count_dict.values())

    fig,ax = plt.subplots(1,1,figsize=(10,10))
    bar_positions = np.arange(len(cats))
    ax.barh(
        bar_positions,
        values,
        align='center',
        height=0.5,
        tick_label=cats,
    )

    # write each count just past the end of its bar
    for row, count in enumerate(values):
        ax.text(count + 0.5, row, str(count))

    ax.set_title(title)
        
# One bar chart per outcome type, in the same order as before.
for counts, chart_title in (
    (true_positive_dict, 'true positives'),
    (false_negative_dict, 'false negatives'),
    (false_positive_dict, 'false positives'),
):
    plot_count_dict(counts, chart_title)