In [None]:
# This notebook contains the complete, working code for the screenshot classifier. Cell outputs are not included because they would make the file too large to upload to GitHub.
# William Lu; Zexun Yao
# CKIDS Misinformation Diffusion
# Nov 25th 2022

In [None]:
import numpy as np
import pandas as pd
import os
import random
import time

import torch
import torchvision
import torch.nn as nn
import torchvision.datasets as datasets
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F

from sklearn.model_selection import train_test_split

from PIL import Image
import matplotlib.pyplot as plt

In [None]:
# Collect the full path of every training image.
# os.listdir returns entries in arbitrary order, so the listing is sorted to
# give the later shuffle/split a stable starting point from run to run.
img_files = sorted(os.listdir('/project/ll_774_951/uk_ru/twitter/training_set/'))  # edit this path when the data lives elsewhere
# Drop any entry that is named exactly like the directory itself.
img_files = list(filter(lambda x: x != 'training_set', img_files))
def train_path(p):
    """Return the absolute path of the training-set entry named `p`."""
    return f"/project/ll_774_951/uk_ru/twitter/training_set/{p}"
img_files = list(map(train_path, img_files))

print("total training images", len(img_files))
print("First item", img_files[0])

In [None]:
# Shuffle with a dedicated, seeded generator so the train/test split is the
# same on every run and the global `random` state is left untouched.
random.Random(42).shuffle(img_files)

train = img_files[:4493]  # the first 4493 shuffled paths are used for training
test = img_files[4493:]   # the remainder is held out (intended to be about 20% of all images)

print("train size", len(train))
print("test size", len(test))

In [None]:
# Preprocessing applied to every image: resize to 224x224, convert to a tensor,
# then normalise with mean 0.5 and std 0.5 (one value, broadcast over channels).
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5,), std=(0.5,)),
])

In [None]:
class tweetdataset(Dataset):
    """Labelled image dataset built from a list of file paths.

    The label is derived from the path itself: 0 when the path contains the
    substring 'anyuser', 1 otherwise. Elsewhere in this notebook class 1 is
    reported as "non-screenshot", so class 0 corresponds to "screenshot".
    """

    def __init__(self, image_paths, transform):
        """Store the image paths and the transform applied to each loaded image."""
        super().__init__()
        self.paths = image_paths
        self.len = len(self.paths)
        self.transform = transform

    def __len__(self): return self.len

    def __getitem__(self, index):
        """Return `(transformed_image, label)` for the image at `index`."""
        path = self.paths[index]
        # Image.open keeps the file open until the image is closed; convert()
        # returns an independent copy, so the handle can be released right away
        # instead of leaking one descriptor per sample.
        with Image.open(path) as raw:
            image = raw.convert('RGB')
        image = self.transform(image)
        # NOTE(review): the substring test runs on the whole path, so a parent
        # directory containing 'anyuser' would also yield label 0 — confirm.
        label = 0 if 'anyuser' in path else 1
        return (image, label)

In [None]:
# Training split: the dataset plus a loader that serves it in list order,
# 100 images per batch (no per-epoch reshuffling).
train_ds = tweetdataset(image_paths=train, transform=transform)
train_dl = DataLoader(dataset=train_ds, batch_size=100)
print(len(train_ds), len(train_dl))

In [None]:
# Held-out split: same dataset class and batch size as the training loader.
test_ds = tweetdataset(image_paths=test, transform=transform)
test_dl = DataLoader(dataset=test_ds, batch_size=100)
print(len(test_ds), len(test_dl))

In [None]:
class tweetscreenshot(nn.Module):
    """Small CNN with three conv layers and three fully connected layers, producing 2 logits.

    `fc1` expects 64 * 6 * 6 features, which is what the conv/pool stack
    produces for a 3 x 224 x 224 input (224 -> 111 -> 55 -> 27 -> 13 -> 13 -> 6).
    """

    def __init__(self):
        super().__init__()

        # Convolutional layers (channels 3 -> 16 -> 32 -> 64).
        self.conv1 = nn.Conv2d(3, 16, kernel_size=(5, 5), stride=2, padding=1)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=(5, 5), stride=2, padding=1)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=(3, 3), padding=1)

        # Fully connected layers (2304 -> 500 -> 50 -> 2).
        self.fc1 = nn.Linear(64 * 6 * 6, 500)
        self.fc2 = nn.Linear(500, 50)
        self.fc3 = nn.Linear(50, 2)

    def forward(self, X):
        """Return raw class logits of shape (batch, 2); no softmax is applied."""
        # Each conv stage: convolution, ReLU, then 2x2 max pooling.
        for conv in (self.conv1, self.conv2, self.conv3):
            X = F.max_pool2d(F.relu(conv(X)), 2)

        # Flatten everything except the batch dimension.
        X = X.view(X.shape[0], -1)

        # Two hidden fully connected layers with ReLU, then the output layer.
        for hidden in (self.fc1, self.fc2):
            X = F.relu(hidden(X))
        return self.fc3(X)

In [None]:
# Step-1 training state: a fresh network, its loss and optimiser, and the
# per-epoch history lists filled in by the training loop.
model = tweetscreenshot()
losses, accuracies = [], []
epoches = 3
start = time.time()  # reference point for the elapsed times printed during training
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=0.001)

In [None]:
# Train `model` for `epoches` epochs, evaluating on the held-out split after each one.
# Loss and accuracy are accumulated as plain Python numbers (via .item()) so the
# autograd graph of every batch is not kept alive, and they are weighted by batch
# size so a smaller final batch does not skew the epoch average.
for epoch in range(epoches):

    # ---- training pass ----
    model.train()
    running_loss = 0.0
    running_correct = 0
    n_seen = 0

    for X, y in train_dl:
        preds = model(X)
        loss = loss_fn(preds, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        n_batch = y.size(0)
        running_loss += loss.item() * n_batch  # loss is the batch mean
        running_correct += (preds.argmax(dim=1) == y).sum().item()
        n_seen += n_batch
        print('.', end='', flush=True)

    epoch_accuracy = running_correct / n_seen
    accuracies.append(epoch_accuracy)
    epoch_loss = running_loss / n_seen
    losses.append(epoch_loss)

    print("\n --- Epoch: {}, train loss: {:.4f}, train acc: {:.4f}, time: {}".format(epoch, epoch_loss, epoch_accuracy, time.time() - start))

    # ---- evaluation pass (no gradients needed) ----
    model.eval()
    with torch.no_grad():

        test_running_loss = 0.0
        test_running_correct = 0
        test_seen = 0

        for test_X, test_y in test_dl:

            test_preds = model(test_X)
            test_batch = test_y.size(0)

            test_running_loss += loss_fn(test_preds, test_y).item() * test_batch
            test_running_correct += (test_preds.argmax(dim=1) == test_y).sum().item()
            test_seen += test_batch

        test_epoch_accuracy = test_running_correct / test_seen
        test_epoch_loss = test_running_loss / test_seen

        print("Epoch: {}, test loss: {:.4f}, test acc: {:.4f}, time: {}\n".format(epoch, test_epoch_loss, test_epoch_accuracy, time.time() - start))

In [None]:
# training the first model complete

In [None]:
# Step-1 test images. The listing is sorted because os.listdir order is
# arbitrary, which would otherwise make the 20000-file cut-off pick a
# different subset from run to run.
test_files_step1 = sorted(os.listdir('/project/ll_774_951/uk_ru/twitter/test_set/'))
# Drop any entry named exactly like the directory itself.
test_files_step1 = list(filter(lambda x: x != 'test_set', test_files_step1))
def test_path_step1(p):
    """Return the absolute path of the test-set entry named `p`."""
    return f"/project/ll_774_951/uk_ru/twitter/test_set/{p}"
test_files_step1 = list(map(test_path_step1, test_files_step1))[:20000]

class Test_step1(Dataset):
    """Unlabelled image dataset: each item is `(transformed_image, file_id)`."""

    def __init__(self, image_paths, transform):
        """Store the image paths and the transform applied to each loaded image."""
        super().__init__()
        self.paths = image_paths
        self.len = len(self.paths)
        self.transform = transform

    def __len__(self): return self.len

    def __getitem__(self, index):
        """Load the image at `index`; the id is the file name up to its first dot."""
        path = self.paths[index]
        # Close the file as soon as the RGB copy exists, so handles do not leak.
        with Image.open(path) as raw:
            image = raw.convert('RGB')
        image = self.transform(image)
        fileid = path.split('/')[-1].split('.')[0]
        return (image, fileid)

test_ds_step1 = Test_step1(test_files_step1, transform)
test_dl_step1 = DataLoader(test_ds_step1, batch_size=100)
len(test_ds_step1), len(test_dl_step1)

In [None]:
# Score every step-1 test image with `model`.
# Each entry is (file_id, softmax probability of class 1).
screenshot_probs_step1 = []

with torch.no_grad():
    for X, fileid in test_dl_step1:
        preds = model(X)
        preds_list = F.softmax(preds, dim=1)[:, 1].tolist()
        screenshot_probs_step1.extend(zip(fileid, preds_list))

In [None]:
# Display up to 100 step-1 test images that were classified as screenshots.
# Only the images actually shown are opened (and closed again afterwards), and
# the loop stops as soon as the cap is reached.
counter_step1 = 0
for img, probs in zip(test_files_step1, screenshot_probs_step1):
    if counter_step1 >= 100:
        break
    # probs is (file_id, probability of class 1, i.e. non-screenshot).
    label = "non-screenshot" if probs[1] > 0.5 else "screenshot"
    if label != "screenshot":
        continue
    counter_step1 += 1
    title = "prob of non-screenshot: " + str(probs[1]) + " Classified as: " + label
    with Image.open(img, 'r') as pil_im:
        plt.figure()
        plt.imshow(pil_im)
        plt.suptitle(title)
        plt.show()
print('success')

In [None]:
# Display up to 100 step-1 test images that were classified as non-screenshots.
# Only the images actually shown are opened (and closed again afterwards), and
# the loop stops as soon as the cap is reached.
counter_step11 = 0
for img, probs in zip(test_files_step1, screenshot_probs_step1):
    if counter_step11 >= 100:
        break
    # probs is (file_id, probability of class 1, i.e. non-screenshot).
    label = "non-screenshot" if probs[1] > 0.5 else "screenshot"
    if label != "non-screenshot":
        continue
    counter_step11 += 1
    title = "prob of non-screenshot: " + str(probs[1]) + " Classified as: " + label
    with Image.open(img, 'r') as pil_im:
        plt.figure()
        plt.imshow(pil_im)
        plt.suptitle(title)
        plt.show()
print('success')

In [None]:
@torch.no_grad()
def get_all_prediction(model, loader):
    """Run `model` over every batch of `loader` and stack the outputs.

    Args:
        model: callable applied to the first element of each batch.
        loader: iterable of batches whose first element is the model input
            (any further elements, such as labels, are ignored).

    Returns:
        The per-batch outputs concatenated along dimension 0, in loader order.
        An empty tensor is returned when `loader` yields nothing. The result
        keeps the dtype of the model outputs.
    """
    # Collect first and concatenate once: calling torch.cat inside the loop
    # re-copies everything gathered so far on every batch.
    batch_outputs = [model(batch[0]) for batch in loader]
    if not batch_outputs:
        return torch.tensor([])
    return torch.cat(batch_outputs, dim=0)

In [None]:
from sklearn.metrics import confusion_matrix

def plot_confusion_matrix(y_true, y_pred, classes,
                          normalize=False,
                          title=None,
                          cmap=plt.cm.Blues):
    """
    Print and plot the confusion matrix of `y_pred` against `y_true`.

    Args:
        y_true: ground-truth labels.
        y_pred: predicted labels.
        classes: tick labels, one per class, in class-index order. Pass an
            ordered sequence; an unordered container gives arbitrary labelling.
        normalize: when True, each row is divided by its sum, so cells are
            fractions of the true class. Rows with no samples stay all-zero.
        title: figure title; a default is chosen from `normalize` when omitted.
        cmap: matplotlib colormap used for the cells.

    Returns:
        The matplotlib Axes holding the plot.
    """
    if not title:
        if normalize:
            title = 'Normalized confusion matrix'
        else:
            title = 'Confusion matrix, without normalization'

    # Compute confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    if normalize:
        # Guard against classes that never occur in y_true: a plain division
        # would produce NaN for those rows.
        row_totals = cm.sum(axis=1, keepdims=True)
        cm = np.divide(cm.astype('float'), row_totals,
                       out=np.zeros(cm.shape, dtype=float),
                       where=row_totals != 0)
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    print(cm)

    fig, ax = plt.subplots()
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)
    ax.figure.colorbar(im, ax=ax)
    # We want to show all ticks...
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           # ... and label them with the respective list entries
           xticklabels=classes, yticklabels=classes,
           title=title,
           ylabel='True label',
           xlabel='Predicted label')

    # Rotate the tick labels and set their alignment.
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right",
             rotation_mode="anchor")

    # Annotate every cell; text is white on dark cells and black on light ones.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt),
                    ha="center", va="center",
                    color="white" if cm[i, j] > thresh else "black")
    fig.tight_layout()
    return ax

In [None]:
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score

np.set_printoptions(precision=2)

# Logits for the whole training split, in dataset order (train_dl does not shuffle).
train_preds = get_all_prediction(model, train_dl)
# Tick labels in class-index order: 0 = screenshot, 1 = non-screenshot.
# This must be an ordered sequence; a set would label the axes arbitrarily.
label_dict = ['Screenshot', 'Non-Screenshot']

# Ground-truth labels, derived from the paths with the same rule that
# tweetdataset.__getitem__ uses, so no image has to be decoded a second time.
train_label = torch.FloatTensor([0 if 'anyuser' in p else 1 for p in train_ds.paths])

# Plot the non-normalized confusion matrix (the helper creates its own figure).
a = plot_confusion_matrix(train_label, train_preds.argmax(dim=1), classes=label_dict,
                          title='Confusion matrix')
print(a)
plt.show()

print('Accuracy:', accuracy_score(train_label, train_preds.argmax(dim=1)))
print('F1:', f1_score(train_label, train_preds.argmax(dim=1), average='weighted'))

In [None]:
from sklearn.metrics import recall_score
from sklearn.metrics import roc_auc_score
# Recall is computed on hard predictions (positive class = label 1, sklearn's default).
print('Recall:', recall_score(train_label, train_preds.argmax(dim=1)))
# ROC-AUC ranks samples by score, so it is given the class-1 probability
# rather than the thresholded argmax labels.
print('Roc-Auc:', roc_auc_score(train_label, F.softmax(train_preds, dim=1)[:, 1].numpy()))

In [None]:
# Labelled view of the step-1 test files (labels come from the file paths),
# used for the confusion matrix and metrics.
test_ds_step1_cm = tweetdataset(image_paths=test_files_step1, transform=transform)
test_dl_step1_cm = DataLoader(dataset=test_ds_step1_cm, batch_size=100)
print(len(test_ds_step1_cm), len(test_dl_step1_cm))

In [None]:
np.set_printoptions(precision=2)

# Logits for the labelled step-1 test set, in dataset order.
test_preds_step1 = get_all_prediction(model, test_dl_step1_cm)
# Tick labels in class-index order: 0 = screenshot, 1 = non-screenshot.
# This must be an ordered sequence; a set would label the axes arbitrarily.
label_dict = ['Screenshot', 'Non-Screenshot']

# Ground-truth labels, derived from the paths with the same rule that
# tweetdataset.__getitem__ uses, so no image has to be decoded a second time.
test_label_step1 = torch.FloatTensor(
    [0 if 'anyuser' in p else 1 for p in test_ds_step1_cm.paths]
)

# Plot the non-normalized confusion matrix (the helper creates its own figure).
b = plot_confusion_matrix(test_label_step1, test_preds_step1.argmax(dim=1), classes=label_dict,
                          title='Confusion matrix for the step1 test data')
print(b)
plt.show()

print('Accuracy:', accuracy_score(test_label_step1, test_preds_step1.argmax(dim=1)))
print('F1:', f1_score(test_label_step1, test_preds_step1.argmax(dim=1), average='weighted'))
print('Recall:', recall_score(test_label_step1, test_preds_step1.argmax(dim=1)))
# ROC-AUC is given the class-1 probability rather than thresholded labels.
print('Roc-Auc:', roc_auc_score(test_label_step1, F.softmax(test_preds_step1, dim=1)[:, 1].numpy()))

In [None]:
######
# Everything up to this point is step 1, i.e. the results on the test data.
# Below is step 2: train a new model, called model2, on both the training and test data
# from above, then manually check 200 predictions of each category to estimate the accuracy.
######

In [None]:
# Collect the full path of every step-2 training image.
# os.listdir returns entries in arbitrary order, so the listing is sorted to
# give the later shuffle/split a stable starting point from run to run.
img_files_step2 = sorted(os.listdir('/project/ll_774_951/uk_ru/twitter/detection3/detectionss/'))  # edit this path when the data lives elsewhere
# Drop any entry that is named exactly like the directory itself.
img_files_step2 = list(filter(lambda x: x != 'detectionss', img_files_step2))
def train_path_step2(p):
    """Return the absolute path of the step-2 training entry named `p`."""
    return f"/project/ll_774_951/uk_ru/twitter/detection3/detectionss/{p}"
img_files_step2 = list(map(train_path_step2, img_files_step2))

print("total training images", len(img_files_step2))
print("First item", img_files_step2[0])

In [None]:
# Shuffle with a dedicated, seeded generator so the train/test split is the
# same on every run and the global `random` state is left untouched.
random.Random(42).shuffle(img_files_step2)

train_step2 = img_files_step2[:5615]  # the first 5615 shuffled paths are used for training
test_step2 = img_files_step2[5615:]   # the remainder is held out (intended to be about 20% of all images)

print("train size", len(train_step2))
print("test size", len(test_step2))

In [None]:
# Step-2 training split: dataset plus an in-order loader, 100 images per batch.
train_ds_step2 = tweetdataset(image_paths=train_step2, transform=transform)
train_dl_step2 = DataLoader(dataset=train_ds_step2, batch_size=100)
print(len(train_ds_step2), len(train_dl_step2))

In [None]:
# Step-2 held-out split: same dataset class and batch size as the training loader.
test_ds_step2 = tweetdataset(image_paths=test_step2, transform=transform)
test_dl_step2 = DataLoader(dataset=test_ds_step2, batch_size=100)
print(len(test_ds_step2), len(test_dl_step2))

In [None]:
# Step-2 training state: a second, freshly initialised network with its own
# loss, optimiser, history lists and timer.
model2 = tweetscreenshot()

losses2, accuracies2 = [], []
epoches2 = 3
start2 = time.time()  # reference point for step-2 elapsed times
loss_fn2 = nn.CrossEntropyLoss()
optimizer2 = torch.optim.Adam(params=model2.parameters(), lr=0.001)

In [None]:
# Train `model2` for `epoches2` epochs, evaluating on the step-2 held-out split
# after each one. Metrics go into the step-2 history lists and elapsed time is
# measured from `start2`. Loss and accuracy are accumulated as plain Python
# numbers (via .item()) so no autograd graph is kept alive, and they are
# weighted by batch size so a smaller final batch does not skew the average.
for epoch in range(epoches2):

    # ---- training pass ----
    model2.train()
    running_loss = 0.0
    running_correct = 0
    n_seen = 0

    for X, y in train_dl_step2:
        preds = model2(X)
        loss = loss_fn2(preds, y)

        optimizer2.zero_grad()
        loss.backward()
        optimizer2.step()

        n_batch = y.size(0)
        running_loss += loss.item() * n_batch  # loss is the batch mean
        running_correct += (preds.argmax(dim=1) == y).sum().item()
        n_seen += n_batch
        print('.', end='', flush=True)

    epoch_accuracy = running_correct / n_seen
    accuracies2.append(epoch_accuracy)
    epoch_loss = running_loss / n_seen
    losses2.append(epoch_loss)

    print("\n --- Epoch: {}, train loss: {:.4f}, train acc: {:.4f}, time: {}".format(epoch, epoch_loss, epoch_accuracy, time.time() - start2))

    # ---- evaluation pass (no gradients needed) ----
    model2.eval()
    with torch.no_grad():

        test_running_loss = 0.0
        test_running_correct = 0
        test_seen = 0

        for test_X, test_y in test_dl_step2:

            test_preds = model2(test_X)
            test_batch = test_y.size(0)

            test_running_loss += loss_fn2(test_preds, test_y).item() * test_batch
            test_running_correct += (test_preds.argmax(dim=1) == test_y).sum().item()
            test_seen += test_batch

        test_epoch_accuracy = test_running_correct / test_seen
        test_epoch_loss = test_running_loss / test_seen

        print("Epoch: {}, test loss: {:.4f}, test acc: {:.4f}, time: {}\n".format(epoch, test_epoch_loss, test_epoch_accuracy, time.time() - start2))

In [None]:
# Unlabelled images for manual checking (directory step3_04). The listing is
# sorted because os.listdir order is arbitrary, which would otherwise make the
# 20000-file cut-off pick a different subset from run to run.
test_files = sorted(os.listdir('/project/ll_774_951/uk_ru/twitter/step3_04/'))
# Drop any entry named exactly like the directory itself.
test_files = list(filter(lambda x: x != 'step3_04', test_files))
def test_path(p):
    """Return the absolute path of the step3_04 entry named `p`."""
    return f"/project/ll_774_951/uk_ru/twitter/step3_04/{p}"
test_files = list(map(test_path, test_files))[:20000]

class Test(Dataset):
    """Unlabelled image dataset: each item is `(transformed_image, file_id)`."""

    def __init__(self, image_paths, transform):
        """Store the image paths and the transform applied to each loaded image."""
        super().__init__()
        self.paths = image_paths
        self.len = len(self.paths)
        self.transform = transform

    def __len__(self): return self.len

    def __getitem__(self, index):
        """Load the image at `index`; the id is the file name up to its first dot."""
        path = self.paths[index]
        # Close the file as soon as the RGB copy exists, so handles do not leak.
        with Image.open(path) as raw:
            image = raw.convert('RGB')
        image = self.transform(image)
        fileid = path.split('/')[-1].split('.')[0]
        return (image, fileid)

test_ds2 = Test(test_files, transform)
test_dl2 = DataLoader(test_ds2, batch_size=100)
len(test_ds2), len(test_dl2)


In [None]:
# Score the step3_04 images. Each entry is (file_id, softmax probability of class 1).
# NOTE(review): this step is described as evaluating the retrained network, so
# `model2` is used here; the previous code scored with the step-1 `model`.
# Confirm which network the manual check was meant to cover.
screenshot_probs = []

with torch.no_grad():
    for X, fileid in test_dl2:
        preds = model2(X)
        preds_list = F.softmax(preds, dim=1)[:, 1].tolist()
        screenshot_probs += list(zip(list(fileid), preds_list))

In [None]:
# Display up to 200 images classified as screenshots, scanning from the start of the list.
# Test data: random images from the web.
# Only the images actually shown are opened (and closed again afterwards), and
# the loop stops as soon as the cap is reached.
counter = 0
for img, probs in zip(test_files, screenshot_probs):
    if counter >= 200:
        break
    # probs is (file_id, probability of class 1, i.e. non-screenshot).
    label = "non-screenshot" if probs[1] > 0.5 else "screenshot"
    if label != "screenshot":
        continue
    counter += 1
    title = "prob of non-screenshot: " + str(probs[1]) + " Classified as: " + label
    with Image.open(img, 'r') as pil_im:
        plt.figure()
        plt.imshow(pil_im)
        plt.suptitle(title)
        plt.show()
print('success')

In [None]:
# Display up to 100 images classified as screenshots, scanning from index 1000 onwards.
# Test data: random images from the web.
# Only the images actually shown are opened (and closed again afterwards), and
# the loop stops as soon as the cap is reached.
zz = 0
for img, probs in zip(test_files[1000:], screenshot_probs[1000:]):
    if zz >= 100:
        break
    # probs is (file_id, probability of class 1, i.e. non-screenshot).
    label = "non-screenshot" if probs[1] > 0.5 else "screenshot"
    if label != "screenshot":
        continue
    zz += 1
    title = "prob of non-screenshot: " + str(probs[1]) + " Classified as: " + label
    with Image.open(img, 'r') as pil_im:
        plt.figure()
        plt.imshow(pil_im)
        plt.suptitle(title)
        plt.show()
print('success')

In [None]:
# Display up to 200 images classified as non-screenshots, scanning from the start of the list.
# Test data: random images from the web.
# Only the images actually shown are opened (and closed again afterwards), and
# the loop stops as soon as the cap is reached.
qq = 0
for img, probs in zip(test_files, screenshot_probs):
    if qq >= 200:
        break
    # probs is (file_id, probability of class 1, i.e. non-screenshot).
    label = "non-screenshot" if probs[1] > 0.5 else "screenshot"
    if label != "non-screenshot":
        continue
    qq += 1
    title = "prob of non-screenshot: " + str(probs[1]) + " Classified as: " + label
    with Image.open(img, 'r') as pil_im:
        plt.figure()
        plt.imshow(pil_im)
        plt.suptitle(title)
        plt.show()
print('success')

In [None]:
# Display up to 100 images classified as non-screenshots, scanning from index 1000 onwards.
# Test data: random images from the web.
# Only the images actually shown are opened (and closed again afterwards), and
# the loop stops as soon as the cap is reached.
qqq = 0
for img, probs in zip(test_files[1000:], screenshot_probs[1000:]):
    if qqq >= 100:
        break
    # probs is (file_id, probability of class 1, i.e. non-screenshot).
    label = "non-screenshot" if probs[1] > 0.5 else "screenshot"
    if label != "non-screenshot":
        continue
    qqq += 1
    title = "prob of non-screenshot: " + str(probs[1]) + " Classified as: " + label
    with Image.open(img, 'r') as pil_im:
        plt.figure()
        plt.imshow(pil_im)
        plt.suptitle(title)
        plt.show()
print('success')

In [None]:
# Metrics from the manual check of predictions on random web images.
# The four counts below were tallied by hand and add up to `total2`.
total2 = 200
TP2, FP2, TN2, FN2 = 53, 47, 99, 1

Recall2 = TP2/(TP2+FN2)
Precision2 = TP2/(TP2+FP2)
Accuracy2 = (TP2+TN2)/(TP2+TN2+FP2+FN2)
F12 = 2*((Precision2*Recall2)/(Precision2+Recall2))

# Report in the order: accuracy, F1, recall, precision.
for caption, value in (('Accuracy:', Accuracy2),
                       ('F1', F12),
                       ('Recall', Recall2),
                       ('Precision', Precision2)):
    print(caption, value)

In [None]:
#####
# Step 3: the cells below test the classifier on the Kaggle Twitter screenshot data.
#####

In [None]:
# Kaggle Twitter screenshot images. The listing is sorted because os.listdir
# order is arbitrary; sorting keeps the file order identical from run to run.
test_files_k = sorted(os.listdir('/project/ll_774_951/uk_ru/twitter/Twitter_kaggle_screenshot/'))
# Drop any entry named exactly like the directory itself.
test_files_k = list(filter(lambda x: x != 'Twitter_kaggle_screenshot', test_files_k))
def test_path_k(p):
    """Return the absolute path of the Kaggle-set entry named `p`."""
    return f"/project/ll_774_951/uk_ru/twitter/Twitter_kaggle_screenshot/{p}"
test_files_k = list(map(test_path_k, test_files_k))

class Test(Dataset):
    """Unlabelled image dataset: each item is `(transformed_image, file_id)`."""

    def __init__(self, image_paths, transform):
        """Store the image paths and the transform applied to each loaded image."""
        super().__init__()
        self.paths = image_paths
        self.len = len(self.paths)
        self.transform = transform

    def __len__(self): return self.len

    def __getitem__(self, index):
        """Load the image at `index`; the id is the file name up to its first dot."""
        path = self.paths[index]
        # Close the file as soon as the RGB copy exists, so handles do not leak.
        with Image.open(path) as raw:
            image = raw.convert('RGB')
        image = self.transform(image)
        fileid = path.split('/')[-1].split('.')[0]
        return (image, fileid)

test_dsk = Test(test_files_k, transform)
test_dlk = DataLoader(test_dsk, batch_size=100)
len(test_dsk), len(test_dlk)


In [None]:
# Score every Kaggle image with `model2`.
# Each entry is (file_id, softmax probability of class 1).
screenshot_probs_k = []

with torch.no_grad():
    for X, fileid in test_dlk:
        preds = model2(X)
        preds_list = F.softmax(preds, dim=1)[:, 1].tolist()
        screenshot_probs_k.extend(zip(fileid, preds_list))

In [None]:
# Show and count every Kaggle image that was classified as a non-screenshot.
# Only the images actually shown are opened, and each is closed again afterwards.
num = 0
for img, probs in zip(test_files_k, screenshot_probs_k):
    # probs is (file_id, probability of class 1, i.e. non-screenshot).
    label = "non-screenshot" if probs[1] > 0.5 else "screenshot"
    if label != "non-screenshot":
        continue
    num += 1
    title = "prob of non-screenshot: " + str(probs[1]) + " Classified as: " + label
    with Image.open(img, 'r') as pil_im:
        plt.figure()
        plt.imshow(pil_im)
        plt.suptitle(title)
        plt.show()
print('success')
print('Number of images classified as non-screenshot',num) # 162 in the authors' run, which is a bad result.

In [None]:
# Possible improvement for the classifier using cross validation

In [None]:
from torch.utils.data import Dataset, DataLoader,TensorDataset,random_split,SubsetRandomSampler, ConcatDataset
#https://medium.com/dataseries/k-fold-cross-validation-with-pytorch-and-sklearn-d094aa00105f

In [None]:
# K-fold cross-validation sketch (adapted from the tutorial linked above).
# NOTE(review): `splits`, `dataset`, `batch_size`, `num_epochs`, `criterion`,
# `foldperf`, `train_epoch` and `valid_epoch` are not defined anywhere in the
# visible notebook; they must be created before this cell can run.
# NOTE(review): this loop rebinds `model` and `optimizer`, replacing the
# step-1 objects of the same name.

# The device does not change between folds, so it is selected once.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

for fold, (train_idx,val_idx) in enumerate(splits.split(np.arange(len(dataset)))):

    print('Fold {}'.format(fold + 1))

    # Both loaders draw from the same dataset; the samplers restrict each one
    # to this fold's index subset.
    train_sampler = SubsetRandomSampler(train_idx)
    test_sampler = SubsetRandomSampler(val_idx)
    train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
    test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler)

    # Fresh network and optimiser for every fold.
    model = tweetscreenshot()
    model.to(device)
    # `optim` is never imported in this notebook; use the fully qualified
    # torch.optim.Adam, as the earlier training cells do.
    optimizer = torch.optim.Adam(model.parameters(), lr=0.002)

    history = {'train_loss': [], 'test_loss': [],'train_acc':[],'test_acc':[]}

    for epoch in range(num_epochs):
        train_loss, train_correct=train_epoch(model,device,train_loader,criterion,optimizer)
        test_loss, test_correct=valid_epoch(model,device,test_loader,criterion)

        # The helpers' results are divided by the number of samples in the fold
        # (presumably they return summed loss and a correct-prediction count — TODO confirm).
        train_loss = train_loss / len(train_loader.sampler)
        train_acc = train_correct / len(train_loader.sampler) * 100
        test_loss = test_loss / len(test_loader.sampler)
        test_acc = test_correct / len(test_loader.sampler) * 100

        print("Epoch:{}/{} AVG Training Loss:{:.3f} AVG Test Loss:{:.3f} AVG Training Acc {:.2f} % AVG Test Acc {:.2f} %".format(epoch + 1,
                                                                                                             num_epochs,
                                                                                                             train_loss,
                                                                                                             test_loss,
                                                                                                             train_acc,
                                                                                                             test_acc))
        history['train_loss'].append(train_loss)
        history['test_loss'].append(test_loss)
        history['train_acc'].append(train_acc)
        history['test_acc'].append(test_acc)

    foldperf['fold{}'.format(fold+1)] = history

# Saves the model from the LAST fold, not the best one. torch.save returns
# None, so its result is no longer bound to a misleading `best_model` name.
torch.save(model,'k_cross_CNN.pt')