# ConvNext


In [None]:
# Install the torchinfo package for showing the network architecture information
!pip install torchinfo -qqq
!pip install torch torchvision

In [None]:
# Import the libraries needed to load the image dataset and to train and evaluate the model with PyTorch.
import torch
import torchvision
from torchvision import datasets,transforms, models
import torch.nn as nn
import torch.nn.functional as F
from torch import nn,optim,no_grad
from torch.utils.data import DataLoader
import torch.optim.lr_scheduler as lr_scheduler
from torchinfo import summary
from torch.cuda.amp import GradScaler, autocast

import matplotlib.pyplot as plt
from sklearn.metrics import classification_report
import pandas as pd
import numpy as np
import pickle
import sys
import time

In [None]:
# Select the compute device: a CUDA GPU if present, otherwise Apple MPS, otherwise the CPU.
# MPS is checked with `is_available()` rather than `is_built()`: a PyTorch build can include
# MPS support on a machine that cannot actually run it, and picking "mps" there would fail
# later when tensors are moved to the device.
_mps_backend = getattr(torch.backends, "mps", None)  # absent on old PyTorch releases
has_mps = bool(_mps_backend is not None and _mps_backend.is_available())
device = "cuda" if torch.cuda.is_available() else "mps" if has_mps else "cpu"
print(f"Python version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print(f"PyTorch version: {torch.__version__}")
print(f"Device: {device}")

In [None]:
# Per-sample drop-path helper
def drop_path(x, drop_prob: float = 0., training: bool = False):
    """Randomly zero whole samples of a batch and rescale the surviving ones.

    One random keep/drop decision is drawn per element of the first dimension
    of ``x`` and broadcast over all remaining dimensions. Kept samples are
    divided by ``keep_prob = 1 - drop_prob``.

    Args:
        x: Input tensor; the first dimension is treated as the batch.
        drop_prob: Probability of dropping a sample.
        training: The drop is only applied when this is True.

    Returns:
        ``x`` itself when ``drop_prob == 0`` or ``training`` is False,
        otherwise a new tensor of the same shape as ``x``.
    """
    if drop_prob == 0. or not training:
        return x
    keep_prob = 1 - drop_prob
    if keep_prob <= 0.:
        # Every sample is dropped. The general formula below would compute
        # x / 0 * 0, which is NaN, so return zeros explicitly.
        return torch.zeros_like(x)
    # Shape (batch, 1, 1, ...): one random draw per sample.
    shape = (x.shape[0],) + (1,) * (x.ndim - 1)
    random_tensor = keep_prob + torch.rand(shape, dtype=x.dtype, device=x.device)
    # floor() turns values in [keep_prob, 1 + keep_prob) into a 0/1 mask.
    random_tensor.floor_()
    output = x / keep_prob * random_tensor
    return output

# Convolution + batch normalisation + activation, bundled as one module
class ConvBNAct(nn.Module):
    """Apply a bias-free ``Conv2d``, then ``BatchNorm2d``, then an activation.

    Args:
        in_chs: Number of input channels.
        out_chs: Number of output channels.
        kernel_size: Convolution kernel size; the padding is ``kernel_size // 2``.
        stride: Convolution stride.
        act_layer: Activation class, instantiated without arguments.
    """

    def __init__(self, in_chs, out_chs, kernel_size, stride, act_layer=nn.SiLU):
        super().__init__()
        half_kernel = kernel_size // 2
        self.conv = nn.Conv2d(
            in_channels=in_chs,
            out_channels=out_chs,
            kernel_size=kernel_size,
            stride=stride,
            padding=half_kernel,
            bias=False,
        )
        self.bn = nn.BatchNorm2d(num_features=out_chs)
        self.act = act_layer()

    def forward(self, x):
        features = self.conv(x)
        normalized = self.bn(features)
        return self.act(normalized)

# Squeeze-and-excite channel gating module
class SqueezeExcite(nn.Module):
    """Rescale each channel of the input by a gate computed from its global average.

    Args:
        in_chs: Number of channels of the input (and output).
        se_ratio: Fraction of ``in_chs`` used for the hidden 1x1 convolution;
            the hidden width is never smaller than 1.
    """

    def __init__(self, in_chs, se_ratio=0.25):
        super().__init__()
        hidden_chs = max(1, int(in_chs * se_ratio))
        self.fc1 = nn.Conv2d(in_chs, hidden_chs, kernel_size=1)
        self.fc2 = nn.Conv2d(hidden_chs, in_chs, kernel_size=1)

    def forward(self, x):
        # Global average per channel, keeping 1x1 spatial dimensions.
        pooled = F.adaptive_avg_pool2d(x, 1)
        hidden = F.silu(self.fc1(pooled))
        # Sigmoid keeps each gate in (0, 1); it is broadcast over the spatial dims.
        gate = torch.sigmoid(self.fc2(hidden))
        return x * gate

In [None]:
# Mount Google Drive in the Colab runtime at /content/drive; the dataset paths
# used below live under that mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:

# Root folders of the training and test images on the mounted Google Drive.
# Both are later passed to datasets.ImageFolder as `root`.
train_path = '/content/drive/MyDrive/Colab Notebooks/trainset'
test_path = '/content/drive/MyDrive/Colab Notebooks/testset'


In [None]:

from torchvision import transforms

# Per-channel mean / std handed to Normalize by both pipelines.
# NOTE(review): these look like the usual ImageNet statistics that torchvision's
# pretrained weights expect; confirm if the backbone is changed.
norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]

# Training pipeline: resize, random crop to 224, random flips and colour jitter,
# then tensor conversion and normalisation.
# Random rotation and random affine augmentation were tried but are currently disabled.
train_steps = [
    transforms.Resize(256),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3, hue=0.1),
    transforms.ToTensor(),
    transforms.Normalize(mean=norm_mean, std=norm_std),
]
train_transform = transforms.Compose(train_steps)

# Test pipeline: deterministic resize + centre crop, no augmentation.
test_steps = [
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=norm_mean, std=norm_std),
]
test_transform = transforms.Compose(test_steps)

# Load the datasets: each root is read with ImageFolder and its matching transform.
train_set = datasets.ImageFolder(train_path, transform=train_transform)
test_set = datasets.ImageFolder(test_path, transform=test_transform)

# Batch both splits with the same batch size; only the training loader shuffles.
loader_kwargs = {"batch_size": 32}
train_loader = DataLoader(train_set, shuffle=True, **loader_kwargs)
test_loader = DataLoader(test_set, shuffle=False, **loader_kwargs)


# Hand-written list of the class names that are expected to be present in the dataset.
# It is NOT in label-index order, so it must not be indexed with a predicted label;
# it is only used below as a sanity check against the folders actually found.
expected_classes = [
    '云芝', '冬菇', '冬虫夏草', '变绿红菇', '大青褶伞', '大鹿花菌', '宽鳞多孔菌',
    '尖顶地星', '干巴菌', '杏鮑菇', '毒丝盖伞', '胶质刺银耳', '毒蝇伞', '毛头鬼伞', '灵芝',
    '牛舌菌', '狭头小菇', '猴头菇', '硫黄菌', '竹荪', '粉红枝瑚菌', '粪生黑蛋巢菌',
    '紫蜡蘑', '红紫柄小菇', '红菇', '蓝绿乳菇', '羊肚菌', '美味牛肝菌', '裂褶菌',
     '赭红拟口蘑',  '金黃鵝膏菌', '欧洲黑木耳',
    '鹿蕊', '鳞柄白鹅膏', '黄裙竹荪', '黑松露'
]

# ImageFolder assigns label indices from the sorted sub-folder names, so the only list
# for which classes[label] is guaranteed to be the right name is the dataset's own.
classes = train_set.classes

# Report any disagreement between the expected names and the folders on disk.
missing_classes = sorted(set(expected_classes) - set(classes))
unexpected_classes = sorted(set(classes) - set(expected_classes))
if missing_classes or unexpected_classes:
    print("Warning: class folders differ from the expected class list.")
    print("  Expected but not found:", missing_classes)
    print("  Found but not expected:", unexpected_classes)
if test_set.classes != train_set.classes:
    print("Warning: train and test sets do not contain the same class folders.")

# Check dataset loading
print("Number of training samples:", len(train_set))
print("Number of test samples:", len(test_set))
print("Classes:", classes)
print("Class to index mapping:", train_set.class_to_idx)
print(len(classes))



In [None]:
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from torchvision import datasets, transforms

# Download and install a CJK font so matplotlib can render the Chinese class names.
!apt-get -qq install -y fonts-noto-cjk

import matplotlib.font_manager as fm

# List the font files matplotlib finds on the system (useful to verify the install above).
font_dirs = fm.findSystemFonts(fontpaths=None)
print("Available fonts:", font_dirs)

# Font used for Chinese labels in the plots below.
font_path = "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"  # expected location of the installed Noto CJK font
font_prop = fm.FontProperties(fname=font_path)

# Use the dataset's own class list so that classes[label] matches the label indices,
# and only then derive the class count from it (the count used to be taken from the
# previous value of `classes`, before the reassignment).
classes = train_set.classes
num_classes = len(classes)
print(num_classes)

# Label index -> class name shown under each sample image.
class_idx_to_chinese = dict(enumerate(classes))

# Remember the first image path of every class in a single pass over the dataset.
first_path_per_class = {}
for path, target in train_set.imgs:
    first_path_per_class.setdefault(target, path)

# One example image per class, in label order.
sample_images = []
for label in range(num_classes):
    path = first_path_per_class.get(label)
    if path is None:
        continue  # class folder without images
    # The context manager closes the file; convert() returns an independent RGB copy.
    with Image.open(path) as img:
        sample_images.append((img.convert("RGB"), class_idx_to_chinese[label]))

# Grid with 10 columns and as many rows as needed, so no class is silently left out.
n_cols = 10
n_rows = max(1, -(-len(sample_images) // n_cols))  # ceiling division
fig, axes = plt.subplots(n_rows, n_cols, figsize=(20, 2 * n_rows), squeeze=False)
for i, axis in enumerate(axes.flat):
    axis.set_xticks([])
    axis.set_yticks([])
    if i < len(sample_images):
        img, label = sample_images[i]
        axis.set_xlabel(label, fontsize=12, fontproperties=font_prop)  # Chinese label
        axis.imshow(img)
    else:
        axis.axis('off')  # hide unused cells of the grid

plt.tight_layout()
plt.show()

In [None]:
import torch
import torch.nn as nn


**Define CNN Model**

In [None]:
# ConvNeXt classifier built on torchvision's ConvNeXt-Base
class ConvNeXtModel(nn.Module):
    """torchvision ``convnext_base`` with its last classifier layer replaced.

    The backbone is created with ``weights='DEFAULT'`` and the module at
    ``classifier[2]`` is swapped for a new ``nn.Linear`` with ``num_classes``
    outputs and the same number of input features.

    Args:
        num_classes: Number of output logits of the new head.
    """

    def __init__(self, num_classes=10):
        super().__init__()
        backbone = models.convnext_base(weights='DEFAULT')
        previous_head = backbone.classifier[2]
        backbone.classifier[2] = nn.Linear(previous_head.in_features, num_classes)
        self.model = backbone

    def forward(self, x):
        logits = self.model(x)
        return logits

Train the model

In [None]:
model = ConvNeXtModel(num_classes=10).to(device)

summary(model=model, input_size=(1, 3, 32, 32), col_width=15,
        col_names=['input_size', 'output_size', 'num_params', 'trainable'],
        row_settings=['var_names'], verbose=0)
# Create a Models folder to store the checkpoints
!mkdir Models

In [None]:
# Loss: cross-entropy between the model's logits and the integer class labels.
criterion = nn.CrossEntropyLoss()

# Optimizer: Adam over all model parameters.
# (Plain Adam with lr=0.001 and SGD with lr=0.004 were earlier alternatives.)
optimizer = optim.Adam(
    model.parameters(),
    lr=1e-4,
    weight_decay=0.001,
)

# Learning-rate schedule: cosine annealing from the optimizer's lr down to eta_min.
# (ReduceLROnPlateau and StepLR were earlier alternatives.)
scheduler = optim.lr_scheduler.CosineAnnealingLR(
    optimizer,
    T_max=25,
    eta_min=1e-6,
)

In [None]:

# Start training
EPOCHS = 300

# Early stopping: stop once the test accuracy has not improved for `patience` epochs in a row.
best_acc = 0.0
patience = 5
patience_counter = 0

loss_hist, acc_hist = [], []  # Per-epoch training loss and accuracy
loss_hist_test, acc_hist_test = [], []  # Per-epoch test loss and accuracy

model.to(device)  # Make sure the model lives on the selected device

print("Training was started.\n")

# The former "warm-up" loop only re-assigned the optimizer's existing learning rate (1e-4)
# five times without running any training step, so it has been removed.

for epoch in range(1, EPOCHS + 1):
    time_ckpt = time.time()
    print("EPOCH:", epoch, end=" ")

    # ---- Training pass ----
    model.train()
    running_loss = 0.0
    correct = 0
    for batch, labels in train_loader:
        batch, labels = batch.to(device), labels.to(device)

        optimizer.zero_grad()  # Clear the gradients
        outputs = model(batch)  # Forward pass
        loss = criterion(outputs, labels)  # Mean loss over the batch
        loss.backward()  # Backward pass (compute gradients)
        optimizer.step()  # Update the model's parameters

        predicted = outputs.argmax(dim=1)
        correct += (predicted == labels).sum().item()
        # `loss` is a batch mean: weight it by the batch size so that dividing by the
        # dataset size below yields the true per-sample mean (also for a smaller last batch).
        running_loss += loss.item() * labels.size(0)

    avg_loss = running_loss / len(train_set)  # Mean training loss per sample
    avg_acc = correct / len(train_set)  # Training accuracy for the epoch
    loss_hist.append(avg_loss)
    acc_hist.append(avg_acc)

    # ---- Evaluation pass on the test set ----
    model.eval()
    loss_test = 0.0
    correct_test = 0
    with torch.no_grad():
        for batch, labels in test_loader:
            batch, labels = batch.to(device), labels.to(device)
            outputs = model(batch)
            loss = criterion(outputs, labels)
            predicted = outputs.argmax(dim=1)
            correct_test += (predicted == labels).sum().item()
            loss_test += loss.item() * labels.size(0)

    avg_loss_test = loss_test / len(test_set)  # Mean test loss per sample
    avg_acc_test = correct_test / len(test_set)  # Test accuracy for the epoch
    loss_hist_test.append(avg_loss_test)
    acc_hist_test.append(avg_acc_test)

    # Advance the learning-rate schedule once per epoch. The scheduler was created but
    # never stepped before, so the learning rate stayed constant.
    # NOTE(review): with T_max=25 the cosine schedule starts rising again after 25 epochs;
    # confirm that this is intended for runs longer than that.
    scheduler.step()

    # Save the whole model (still in eval mode) at the end of each epoch.
    # NOTE(review): the "lenet5" file name is a leftover and the file is a full pickle of
    # the model; both are kept so that existing loading code keeps working.
    with open("Models/lenet5_model_{}.pth".format(epoch), "wb") as f:
        pickle.dump(model, f)

    # Early-stopping bookkeeping
    if avg_acc_test > best_acc:
        best_acc = avg_acc_test
        patience_counter = 0
        # Keep a separate copy of every new best model
        with open(f"Models/best_model_epoch_{epoch}.pth", "wb") as f:
            pickle.dump(model, f)
    else:
        patience_counter += 1

    # Losses are now true per-sample means, so they are printed unscaled.
    print("Train Loss: {:.4f}".format(avg_loss), end=" ")
    print("Test Loss: {:.4f}".format(avg_loss_test), end=" ")
    print("Train Accuracy: {:.2f}%".format(avg_acc * 100), end=" ")
    print("Test Accuracy: {:.2f}%".format(avg_acc_test * 100), end=" ")
    print("Time: {:.2f}s".format(time.time() - time_ckpt), end=" \n")

    # Checked after printing so the statistics of the final epoch are not lost.
    if patience_counter >= patience:
        print("Early stopping triggered.")
        break


In [None]:
# Training vs. test curves, one panel for the loss and one for the accuracy.
plots = [(loss_hist, loss_hist_test), (acc_hist, acc_hist_test)]
plt_labels = [("Training Loss", "Test Loss"), ("Training Accuracy", "Test Accuracy")]
plt_titles = ["Loss", "Accuracy"]

plt.figure(figsize=(20, 7))
for i, (curves, names, title) in enumerate(zip(plots, plt_labels, plt_titles)):
    ax = plt.subplot(1, 2, i + 1)
    train_curve, test_curve = curves
    train_name, test_name = names
    ax.plot(train_curve, label=train_name)
    ax.plot(test_curve, label=test_name)
    ax.set_title(title)
    ax.legend()

In [None]:

# Selecting the best model
best_acc = max(acc_hist_test)
# Epochs are numbered from 1; index() returns the first epoch that reached the maximum.
best_epoch = acc_hist_test.index(best_acc) + 1

print("Best accuracy on test set: {:.2f}%".format(best_acc*100))
print("Best epoch: {}".format(best_epoch))

# Load the best model from the per-epoch checkpoint written by the training loop
# ("Models/lenet5_model_<epoch>.pth"). The previous "shuffleNetv2_model_*" name is never
# written by this notebook, so loading it raised FileNotFoundError.
# NOTE: pickle.load can execute arbitrary code; only load checkpoints created by this notebook.
best_ckpt_path = "Models/lenet5_model_{}.pth".format(best_epoch)
with open(best_ckpt_path, "rb") as f:
    model = pickle.load(f)


In [None]:
# Evaluate the model on the whole test set and collect predictions and labels.
pred_vec = []
label_vec = []
correct = 0
test_loss = 0.0
avg_test_loss = 0.0

model.to(device)
model.eval()
with torch.no_grad():
    for batch, labels in test_loader:
        batch, labels = batch.to(device), labels.to(device)
        outputs = model(batch)
        loss = criterion(outputs, labels)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        # `loss` is the mean over the batch; weight it by the batch size so that the
        # division by the number of test samples below gives the per-sample mean.
        test_loss += loss.item() * labels.size(0)
        pred_vec.extend(predicted.cpu().numpy())  # Convert tensor to numpy array
        label_vec.extend(labels.cpu().numpy())  # Convert tensor to numpy array

avg_test_loss = test_loss / len(test_set)

pred_vec = np.array(pred_vec)
label_vec = np.array(label_vec)


In [None]:
# Report the test loss and the accuracy (in percent) over all test images.
n_test_images = len(test_set)
test_accuracy_pct = 100 * correct / n_test_images
print(f"Test Loss: {avg_test_loss}")
print(f"Test Accuracy on the {n_test_images} test images: {test_accuracy_pct}%")

In [None]:
# Create confusion matrix
from sklearn.metrics import confusion_matrix

# Pass every class index explicitly. Without `labels`, confusion_matrix only covers the
# labels that occur in label_vec / pred_vec, so a class missing from both would make
# the matrix smaller than len(classes) and the DataFrame construction below would fail.
class_indices = np.arange(len(classes))
confusion_mat = confusion_matrix(label_vec, pred_vec, labels=class_indices)

# Convert confusion matrix to pandas DataFrame (rows: true class, columns: predicted class)
confusion_df = pd.DataFrame(confusion_mat, index=classes, columns=classes)
confusion_df = confusion_df.rename_axis(index="True", columns="Predicted")
print("Confusion Matrix")
confusion_df

In [None]:
# Create a report to show the f1-score, precision, recall
from sklearn.metrics import classification_report

# classification_report expects (y_true, y_pred). The arguments used to be passed the
# other way round, which swaps precision and recall and makes `support` count
# predictions instead of true samples.
report_dict = classification_report(label_vec, pred_vec, output_dict=True)
report = pd.DataFrame.from_dict(report_dict).T
# Rows whose index is a class number get the class name; the summary rows get a blank.
report['Label']=[classes[int(x)] if x.isdigit() else " " for x in report.index]
report=report[['Label','f1-score','precision','recall','support']]
report

In [None]:
# Open the classification report DataFrame as an interactive sheet (Colab only).
from google.colab import sheets
sheet = sheets.InteractiveSheet(df=report)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from torchvision import datasets, transforms

# Download and install a CJK font so matplotlib can render the Chinese class names.
!apt-get -qq install -y fonts-noto-cjk

import matplotlib.font_manager as fm

# List the font files matplotlib finds on the system (useful to verify the install above).
font_dirs = fm.findSystemFonts(fontpaths=None)
print("Available fonts:", font_dirs)

# Font used for Chinese labels in the plots below.
font_path = "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"  # expected location of the installed Noto CJK font
font_prop = fm.FontProperties(fname=font_path)
# obtain one batch of test images
images, labels = next(iter(test_loader))
model.cpu()
model.eval()  # inference behaviour, independent of what ran before this cell

# get sample outputs; no gradients are needed for visualisation
with torch.no_grad():
    output = model(images)
# convert the output scores to predicted class indices
_, preds = torch.max(output, 1)

# Create a 4x4 grid for displaying the images
fig, axes = plt.subplots(4, 4, figsize=(8, 8))

# Iterate over the grid cells and display one image in each
for idx, ax in enumerate(axes.flat):
    ax.axis('off')  # Hide the axes
    if idx >= len(images):
        continue  # fewer images in the batch than grid cells
    # Rescale the normalised image tensor to the [0, 1] range for display
    image = images[idx].permute(1, 2, 0)
    value_range = (image.max() - image.min()).clamp_min(1e-8)  # avoid 0/0 on a constant image
    image = (image - image.min()) / value_range
    ax.imshow(image.numpy())  # Display the image
    is_correct = preds[idx].item() == labels[idx].item()
    # fontproperties is required for the Chinese class names to render
    ax.set_title("{}".format(classes[preds[idx].item()]),
                 color=("green" if is_correct else "red"),
                 fontproperties=font_prop)
plt.show()

In [None]:
# Define the loader for all test data
test_set = datasets.ImageFolder(root=test_path, transform=test_transform)
test_loader = DataLoader(test_set, batch_size=32, shuffle=False)

# The grid below shows up to 50 images, but test_loader yields batches of 32, so indexing
# images[32] .. images[49] raised IndexError. Use a dedicated loader whose batch is large
# enough; `test_loader` itself is left unchanged for the other cells.
N_PREVIEW = 50
preview_loader = DataLoader(test_set, batch_size=N_PREVIEW, shuffle=False)

# obtain one batch of test images
images, labels = next(iter(preview_loader))
model.cpu()
model.eval()

# get sample outputs; no gradients are needed for visualisation
with torch.no_grad():
    output = model(images)
# convert the output scores to predicted class indices
_, preds = torch.max(output, 1)


fig = plt.figure(figsize=(15, 7))
fig.subplots_adjust(left=0, right=1, bottom=0, top=1, hspace=0.05, wspace=0.05)

# len(images) can be smaller than N_PREVIEW when the test set is small
for idx in range(len(images)):
    # Rescale the normalised image tensor to the [0, 1] range for display
    image = images[idx].permute(1, 2, 0)
    value_range = (image.max() - image.min()).clamp_min(1e-8)  # avoid 0/0 on a constant image
    image = (image - image.min()) / value_range
    ax = fig.add_subplot(5, 10, idx + 1, xticks=[], yticks=[])
    ax.imshow(image.numpy(), interpolation='nearest')

    # Predicted class name: green when correct, red otherwise
    is_correct = preds[idx].item() == labels[idx].item()
    ax.text(0, 3, str(classes[preds[idx].item()]),
            color='green' if is_correct else 'red',
            fontproperties=font_prop)

plt.show()


In [None]:

# Visualize one wrongly classified test image for each class.
# This cell previously used `test_loader_all` / `test_set_all` (never defined in this
# notebook), the `.data` attribute (ImageFolder has none) and a hard-coded 10 classes.
pred_batches = []
correct = 0
test_loss = 0.0

model.to(device)
model.eval()
with torch.no_grad():
    for batch, labels in test_loader:
        batch, labels = batch.to(device), labels.to(device)
        outputs = model(batch)
        # criterion returns a batch mean; weight by batch size to get a per-sample mean below
        test_loss += criterion(outputs, labels).item() * labels.size(0)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        pred_batches.append(predicted.cpu())
test_loss = test_loss / len(test_set)

# test_loader does not shuffle, so predictions are in the same order as test_set.targets.
pred_vec_all = torch.cat(pred_batches).numpy()
ground_truths = np.asarray(test_set.targets)
incorrect_mask = pred_vec_all != ground_truths

# First misclassified image of every class; classes without any error are skipped
# (indexing [0] on an empty selection used to raise IndexError).
incorrect_images = []
pred_results_all = []
true_labels_all = []
for label in range(len(classes)):
    wrong_positions = np.flatnonzero((ground_truths == label) & incorrect_mask)
    if wrong_positions.size == 0:
        continue
    position = int(wrong_positions[0])
    image_path, _ = test_set.imgs[position]
    with Image.open(image_path) as img:
        incorrect_images.append(img.convert("RGB"))
    pred_results_all.append(int(pred_vec_all[position]))
    true_labels_all.append(label)

# show images
if not incorrect_images:
    print("No misclassified test images.")
else:
    n_cols = 5
    n_rows = -(-len(incorrect_images) // n_cols)  # ceiling division
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(12, 3 * n_rows), squeeze=False)
    for i, axis in enumerate(axes.flat):
        axis.set_xticks([])
        axis.set_yticks([])
        if i >= len(incorrect_images):
            axis.axis('off')  # unused grid cell
            continue
        axis.set_title("True: %s" % classes[true_labels_all[i]], fontsize=10,
                       fontproperties=font_prop)
        axis.set_xlabel("Predicted: %s" % classes[pred_results_all[i]], fontsize=10,
                        fontproperties=font_prop)
        axis.imshow(incorrect_images[i])
    plt.tight_layout()
    plt.show()