In [None]:
# Colab-only: attach the user's Google Drive to this runtime so that files
# stored there can be read through the local mount point.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


## Data Processing

In [None]:
import cv2
import os
from tqdm import *
import numpy
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import time
import torch
from torchvision.models import resnet18, resnet34, resnet50

In [None]:
# Root folder of the dataset: each sub-folder name is used as a class label.
data_root = '/content/drive/MyDrive/Dataset/Ham10000/ham10000'

# Containers filled by the metadata-loading cell.
label_list = list()                 # label names, in the order they are registered
label_to_image_path_list = dict()   # label -> list of image file paths
label_to_int = dict()               # label -> integer class id
int_to_label = dict()               # integer class id -> label

In [None]:
# load meta data
# Every sub-directory of data_root is one class: its name is the label and the
# files inside it are that class's images.
# os.listdir() returns entries in arbitrary order, so both levels are sorted to
# make the label -> int mapping (and the sample order) reproducible between
# runs and machines.

# Reset the containers so that re-running this cell does not duplicate entries.
label_list.clear()
label_to_image_path_list.clear()
label_to_int.clear()
int_to_label.clear()

cnt = 0
for f1 in sorted(os.listdir(data_root)):
    f2 = os.path.join(data_root, f1)
    # Skip stray files sitting next to the class folders; only directories
    # can be listed for images.
    if not os.path.isdir(f2):
        continue
    label = f1
    label_to_int[label] = cnt
    int_to_label[cnt] = label
    cnt += 1
    label_list.append(label)
    label_to_image_path_list[label] = [
        os.path.join(f2, f3) for f3 in sorted(os.listdir(f2))
    ]

print( 'label_to_int={0}'.format(label_to_int) )
print( 'int_to_label={0}'.format(int_to_label) )

label_to_int={'df': 0, 'akiec': 1, 'bcc': 2, 'bkl': 3, 'nv': 4, 'vasc': 5, 'mel': 6}
int_to_label={0: 'df', 1: 'akiec', 2: 'bcc', 3: 'bkl', 4: 'nv', 5: 'vasc', 6: 'mel'}


In [None]:
# load image to color images
# Decode every listed file into a 3-channel colour array (OpenCV stores the
# channels in BGR order). The result maps each label to the list of decoded
# images, in the same order as label_to_image_path_list.
label_to_image_arr_list = {}
for label, image_path_list in label_to_image_path_list.items():
    image_arr_list = []
    for image_path in tqdm(image_path_list, desc=label):
        img = cv2.imread(image_path, cv2.IMREAD_COLOR)
        # cv2.imread does not raise on a missing/corrupt/non-image file, it
        # returns None. Fail here with the offending path instead of letting a
        # None slip into the dataset and break a later cell.
        if img is None:
            raise OSError('failed to read image: {0}'.format(image_path))
        image_arr_list.append(img)
    label_to_image_arr_list[label] = image_arr_list

100%|██████████| 115/115 [00:10<00:00, 10.72it/s]
100%|██████████| 327/327 [01:34<00:00,  3.46it/s]
100%|██████████| 514/514 [02:40<00:00,  3.21it/s]
100%|██████████| 1099/1099 [05:48<00:00,  3.15it/s]
100%|██████████| 6705/6705 [37:06<00:00,  3.01it/s]
100%|██████████| 142/142 [00:45<00:00,  3.15it/s]
100%|██████████| 1113/1113 [06:28<00:00,  2.86it/s]


In [None]:
# print load status
# For each label: how many images were loaded and the shape of the first one.
for label, image_arr_list in label_to_image_arr_list.items():
    loaded_count = len(image_arr_list)
    first_shape = image_arr_list[0].shape
    print('{0}: loaded {1} images, shape={2}'.format(label, loaded_count, first_shape))

df: loaded 115 images, shape=(450, 600, 3)
akiec: loaded 327 images, shape=(450, 600, 3)
bcc: loaded 514 images, shape=(450, 600, 3)
bkl: loaded 1099 images, shape=(450, 600, 3)
nv: loaded 6705 images, shape=(450, 600, 3)
vasc: loaded 142 images, shape=(450, 600, 3)
mel: loaded 1113 images, shape=(450, 600, 3)


In [None]:
# images to x_data, y_data
# Flatten the per-label image lists into one sample list, with a parallel list
# holding the integer class id of every sample.
x_data = []
y_data = []
for label, image_arr_list in label_to_image_arr_list.items():
    x_data.extend(image_arr_list)
    y_data.extend(label_to_int[label] for _ in image_arr_list)

# Stack into arrays: x_data gets one leading sample axis, y_data is 1-D.
x_data = numpy.array(x_data)
y_data = numpy.array(y_data)
print( 'x_data.shape={0}, y_data.shape={1}'.format(x_data.shape, y_data.shape) )

x_data.shape=(10015, 450, 600, 3), y_data.shape=(10015,)


## ResNet

In [None]:
# Step size handed to the optimizer.
learning_rate = 1e-3
# One output class per label that had images loaded.
class_count = len(label_to_image_arr_list)

# Use the GPU when CUDA is available, otherwise stay on the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)

In [None]:
# ResNet-18 built without requesting any pretrained weights.
model = resnet18()
# Swap the classification head for one with `class_count` outputs. The input
# width is read from the existing head instead of hard-coding 512, so the line
# keeps working if the backbone is switched to another ResNet variant.
model.fc = torch.nn.Linear(model.fc.in_features, class_count)
# Move to the device only after the head has been replaced; otherwise the new
# Linear layer would be left on the CPU while the rest of the network is on
# the device.
model = model.to(device)

In [None]:
# loss and optimizer
# Cross-entropy on the model outputs; Adam updates every model parameter
# using `learning_rate`.
loss_function = torch.nn.CrossEntropyLoss()
optimizer_function = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

In [None]:
# The images come from cv2.imread, so x_data is laid out as
# (count, height, width, channel). The convolutional network takes
# (count, channel, height, width).
#
# reshape() must NOT be used for this: it keeps the bytes where they are and
# only re-labels the axes, which interleaves colour values and pixel positions
# and destroys the images. The axes have to be permuted with transpose().

# Guard against running this cell twice (the last axis would no longer be the
# colour axis) and against unexpected input layouts.
if x_data.ndim != 4 or x_data.shape[-1] != 3:
    raise ValueError(
        'expected x_data of shape (count, height, width, 3), got {0}; '
        'was this cell already executed?'.format(x_data.shape))

# Axis 1 is the row count (image height) and axis 2 the column count (width).
n_count, height, width, n_channel = x_data.shape

# (count, height, width, channel) -> (count, channel, height, width)
x_data = x_data.transpose(0, 3, 1, 2)

## The training and test sets are split in the ratio of 8:2

In [None]:
# train test split
# Hold out 20% of the samples for testing; the fixed random_state makes the
# split the same on every run.
split_parts = train_test_split(x_data, y_data, test_size=0.2, random_state=42)
x_train, x_test, y_train, y_test = split_parts

In [None]:
# output
# Show the shapes of the four arrays produced by the split.
split_shapes = (x_train.shape, x_test.shape, y_train.shape, y_test.shape)
print('x_train={0}, x_test={1}, y_train={2}, y_test={3}'.format(*split_shapes))

x_train=(8012, 3, 450, 600), x_test=(2003, 3, 450, 600), y_train=(8012,), y_test=(2003,)


In [None]:
def get_acc(y_pred, y_true):
    """Return the fraction of positions at which y_pred equals y_true.

    Both arguments are indexable sequences; only the first ``len(y_pred)``
    positions are compared.

    Raises:
        ZeroDivisionError: if ``y_pred`` is empty.
        IndexError: if ``y_true`` is shorter than ``y_pred``.
    """
    total = len(y_pred)
    hits = sum(1 for idx in range(total) if y_pred[idx] == y_true[idx])
    return hits / total

In [None]:
# train and test for each epoch
# Each epoch makes one pass over x_train in fixed-size batches (training),
# then one pass over x_test (evaluation). Per epoch, the mean training loss is
# appended to loss_train_history_list and the test accuracy to acc_test_list.
epoch = 280
batch_size = 64
loss_train_history_list = []
acc_test_list = []
model.to(device)

n_train = len(y_train)
n_test = len(y_test)

sum_time_cost_train = 0
for ep in range(epoch):
    batch_loss_list = []
    t0 = time.time()

    # STEP-01: train
    # NOTE(review): batches are taken in the same order every epoch (no
    # per-epoch shuffling) - confirm that this is intended.
    for start in range(0, n_train, batch_size):
        t1 = time.time()
        stop = start + batch_size

        # Batches are converted to tensors and moved to the device one at a
        # time, so the full dataset stays in host memory as a numpy array.
        x_train_tensor_batch = torch.tensor(x_train[start:stop], dtype=torch.float32).to(device)
        y_train_tensor_batch = torch.tensor(y_train[start:stop], dtype=torch.long).to(device)

        model.train()
        # predict
        y_train_pred = model(x_train_tensor_batch)
        # loss
        loss = loss_function(y_train_pred, y_train_tensor_batch)
        # The loss is stored as a tensor on purpose: the epoch mean below is
        # later written with str() and the loss-plot cell parses that
        # "tensor(...)" text, so the stored type must not change here.
        batch_loss_list.append(loss)
        # gradient descent
        optimizer_function.zero_grad()
        loss.backward()
        optimizer_function.step()

        t2 = time.time()
        # stop//batch_size is the 1-based index of the batch just finished.
        print('completed batch {0} of epoch {1}. loss={2}. train batch time cost={3}s'.format(stop//batch_size, ep, loss, t2-t1))
    t3 = time.time()
    sum_time_cost_train += t3-t0

    # STEP-02: validation
    # Mean of the per-batch training losses of this epoch.
    loss_ave = sum(batch_loss_list)/len(batch_loss_list)
    loss_train_history_list.append(loss_ave)

    # Test
    model.eval()
    asum = 0
    with torch.no_grad():
        for start in range(0, n_test, batch_size):
            stop = start + batch_size
            y_test_batch = y_test[start:stop]
            x_test_tensor_batch = torch.tensor(x_test[start:stop], dtype=torch.float32).to(device)

            # Predicted class = index of the largest output score per sample.
            y_test_pred_batch = model(x_test_tensor_batch)
            y_test_pred_batch = y_test_pred_batch.cpu().detach().numpy()
            y_test_pred_batch = numpy.argmax(y_test_pred_batch, axis=1)

            # Count the correct predictions of the batch in one vectorised
            # comparison; the labels stay on the host, so no device copy of
            # them is needed.
            asum += int(numpy.count_nonzero(y_test_pred_batch == y_test_batch))

        t4 = time.time()
        acc_test = asum/n_test
        # `loss` here is the loss of the last training batch of the epoch.
        print('completed test of epoch {0}. loss={1}. accuracy={2}. train one epoch time cost={3}s, test validation time cost={4}'.format(ep,loss, acc_test, t3-t0, t4-t3))
        acc_test_list.append(acc_test)
        print(acc_test_list)


# Total wall-clock seconds spent in the training passes (evaluation excluded).
print(sum_time_cost_train)

  return F.conv2d(input, weight, bias, self.stride,


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
completed batch 103 of epoch 116. loss=0.04286640137434006. train batch time cost=0.10180425643920898s
completed batch 104 of epoch 116. loss=0.015677202492952347. train batch time cost=0.10231566429138184s
completed batch 105 of epoch 116. loss=0.023114468902349472. train batch time cost=0.10125350952148438s
completed batch 106 of epoch 116. loss=0.08870118111371994. train batch time cost=0.10152649879455566s
completed batch 107 of epoch 116. loss=0.023600000888109207. train batch time cost=0.10115838050842285s
completed batch 108 of epoch 116. loss=0.0714862123131752. train batch time cost=0.10099053382873535s
completed batch 109 of epoch 116. loss=0.03860117122530937. train batch time cost=0.10059261322021484s
completed batch 110 of epoch 116. loss=0.06232021003961563. train batch time cost=0.09921669960021973s
completed batch 111 of epoch 116. loss=0.020567530766129494. train batch time cost=0.10095906257629395s
compl

In [None]:
# Define the folder path
dir_root = '/content/drive/MyDrive/Colab Notebooks/Results/Task 1/1/280'
# exist_ok=True creates the folder (and any missing parents) when needed and
# is a no-op when it already exists, without a separate check-then-create step.
os.makedirs(dir_root, exist_ok=True)

loss_train_history_list_txt = os.path.join(dir_root, 'loss_train_history_list.txt')
acc_test_list_txt = os.path.join(dir_root, 'acc_test_list.txt')

# Write both histories as text, one str(value) per line. The plotting cells
# read these files back, so the per-line format is kept exactly as is.
with open(loss_train_history_list_txt, 'w') as fw:
    fw.writelines(str(item) + '\n' for item in loss_train_history_list)

with open(acc_test_list_txt, 'w') as fw:
    fw.writelines(str(item) + '\n' for item in acc_test_list)

In [None]:
# Report the test accuracy recorded for the last completed epoch.
last_epoch_index = len(acc_test_list) - 1
final_accuracy = acc_test_list[last_epoch_index]
print(f'Final model accuracy: {final_accuracy:.5f}')

In [None]:
# Persist the trained network next to the result files.
# NOTE(review): this serialises the whole model object rather than only its
# state_dict; saving the state_dict is the generally recommended form -
# confirm which one the consumers of this file expect.
model_path = os.path.join(dir_root, 'model_resnet18_task1.pth')
torch.save(model, model_path)

## Plot the training loss curve and accuracy curve

In [None]:
def _parse_loss_line(text):
    """Return the numeric loss stored on one line of the loss history file.

    Accepts the printed form of a scalar tensor, i.e. ``tensor(<value>, ...)``
    or ``tensor(<value>)``, as well as a plain number.

    Raises:
        ValueError: if no number can be parsed from the line.
    """
    text = text.strip()
    if '(' in text:
        # Keep what sits between the first '(' and the first ',' (or the
        # closing ')' when the tensor text carries no extra fields).
        text = text.split('(', 1)[1].split(',', 1)[0].rstrip(')')
    return float(text)


loss_train_history_list_txt = os.path.join(dir_root, 'loss_train_history_list.txt')

# Read the saved history back (file opened in read mode), one epoch per line.
# Blank lines are ignored.
loss_train_history_list = []
with open(loss_train_history_list_txt, 'r') as fr:
    for line in fr:
        if not line.strip():
            continue
        loss_train_history_list.append(_parse_loss_line(line))

# Epochs are numbered from 1 on the x axis.
x_list = list(range(1, len(loss_train_history_list) + 1))

# Plotting training loss curves
plt.figure(figsize=(10, 5))
plt.plot(x_list, loss_train_history_list, label='Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Loss Over Epochs')
plt.legend()
plt.grid(True)
# Save before show() so the written image contains the drawn figure.
plt.savefig(os.path.join(dir_root,'training_loss_curve.png'))
plt.show()

In [None]:
acc_test_list_txt = os.path.join(dir_root, 'acc_test_list.txt')

# Read the saved accuracies back (file opened in read mode): one float per
# line, one line per epoch.
with open(acc_test_list_txt, 'r') as fr:
    acc_test_history_list = [float(line.strip()) for line in fr]

# Epochs are numbered from 1 on the x axis.
x_list = list(range(1, len(acc_test_history_list) + 1))


# Plotting test accuracy curves
plt.figure(figsize=(10, 5))
plt.plot(x_list, acc_test_history_list, label='Test Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.title('Test Accuracy Over Epochs')
plt.legend()
plt.grid(True)
# Save before show() so the written image contains the drawn figure.
plt.savefig(os.path.join(dir_root,'test_accuracy_curve.png'))
plt.show()