<a href="https://colab.research.google.com/github/358Xin/DL/blob/main/CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!gdown --id '1awF7pZ9Dz7X1jn1_QAiKN-_v56veCEKy' --output food-11.zip
!unzip -q food-11.zip

Downloading...
From: https://drive.google.com/uc?id=1awF7pZ9Dz7X1jn1_QAiKN-_v56veCEKy
To: /content/food-11.zip
100% 963M/963M [00:03<00:00, 251MB/s]


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import ConcatDataset, Dataset, Subset
from torchvision.datasets import DatasetFolder
from tqdm.auto import tqdm

In [None]:
# Preprocessing applied to images used for training: resize to 128x128, then
# convert the PIL image to a tensor.
train_transform = transforms.Compose(
    [
        transforms.Resize(size=(128, 128)),
        transforms.ToTensor(),
    ]
)

# Preprocessing applied to validation/test images: the same resize + tensor
# conversion as above.
test_transform = transforms.Compose(
    [
        transforms.Resize(size=(128, 128)),
        transforms.ToTensor(),
    ]
)

In [None]:
from torch.functional import broadcast_shapes
from torch.utils.data.dataloader import DataLoader
# Number of samples per mini-batch for every loader built in this notebook.
batch_size = 128


def _load_image(path):
  """Open the image file at `path` with PIL and return the image object."""
  return Image.open(path)


# One DatasetFolder per split. Each one reads ".jpg" files from its directory,
# opens them with `_load_image`, and applies the matching transform.
# (The original test-set loader called `Image.opne`, which raised
# AttributeError as soon as a test sample was read.)
train_set = DatasetFolder("food-11/training/labeled", loader=_load_image, extensions="jpg", transform=train_transform)
valid_set = DatasetFolder("food-11/validation", loader=_load_image, extensions="jpg", transform=test_transform)
unlabeled_set = DatasetFolder("food-11/training/unlabeled", loader=_load_image, extensions="jpg", transform=train_transform)
test_set = DatasetFolder("food-11/testing", loader=_load_image, extensions="jpg", transform=test_transform)

# Training and validation batches are shuffled and loaded by 2 worker processes
# into pinned memory; the test loader keeps the dataset order so predictions
# can be written out by index.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
valid_loader = DataLoader(valid_set, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

In [None]:
from torch.nn.modules.pooling import MaxPool2d
from torch.nn.modules.conv import Conv2d
from torch.nn.modules.batchnorm import BatchNorm2d
class Classifier(nn.Module):
  """Convolutional image classifier producing 11 logits per input image.

  The network is three Conv-BatchNorm-ReLU-MaxPool stages followed by a
  three-layer fully connected head. The head's first layer expects
  256*8*8 flattened features, which is what the convolutional stack yields
  for 3x128x128 inputs (spatial size 128 -> 64 -> 32 -> 8).
  """

  def __init__(self):
    super().__init__()

    def conv_stage(in_channels, out_channels, pool):
      # One stage: 3x3 conv (stride 1, padding 1), batch norm, ReLU and a
      # max-pool whose kernel and stride are both `pool`.
      return [
          nn.Conv2d(in_channels, out_channels, 3, 1, 1),
          nn.BatchNorm2d(out_channels),
          nn.ReLU(),
          nn.MaxPool2d(pool, pool, 0),
      ]

    # Channel widths 3 -> 64 -> 128 -> 256; pooling factors 2, 2 and 4.
    self.cnn_layers = nn.Sequential(
        *conv_stage(3, 64, 2),
        *conv_stage(64, 128, 2),
        *conv_stage(128, 256, 4),
    )

    # Fully connected head mapping flattened features to the 11 class logits.
    self.fc_layers = nn.Sequential(
        nn.Linear(256*8*8, 256),
        nn.ReLU(),
        nn.Linear(256, 256),
        nn.ReLU(),
        nn.Linear(256, 11),
    )

  def forward(self, x):
    """Return the logits for a batch of images `x`."""
    features = self.cnn_layers(x)
    # Collapse every dimension after the batch dimension before the head.
    flat = torch.flatten(features, start_dim=1)
    return self.fc_layers(flat)

In [None]:
class _PseudoLabeledDataset(Dataset):
  """View of `dataset` limited to `indices`, yielding `labels` as the targets."""

  def __init__(self, dataset, indices, labels):
    self.dataset = dataset
    self.indices = indices
    self.labels = labels

  def __len__(self):
    return len(self.indices)

  def __getitem__(self, idx):
    # Discard the label stored in the wrapped dataset and substitute ours.
    img, _ = self.dataset[self.indices[idx]]
    return img, self.labels[idx]


def get_pseudo_labels(dataset, model, threshold=0.65):
  """Label `dataset` with the model's own confident predictions.

  Every sample is passed through `model`; a sample is kept only when the
  largest softmax probability is at least `threshold`, and its label becomes
  the predicted class index.

  Args:
    dataset: dataset yielding (image, label) pairs; the labels are ignored.
    model: classifier returning one row of logits per image.
    threshold: minimum softmax probability required to keep a sample.

  Returns:
    A dataset of (image, predicted_label) pairs for the kept samples. It is
    empty when no prediction reaches `threshold`.

  The model is switched to eval mode for inference and left in train mode on
  return.
  """
  device = 'cuda' if torch.cuda.is_available() else 'cpu'

  # shuffle=False keeps batch order equal to dataset order, so positions in
  # the stream of batches map directly back to dataset indices.
  data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=False)

  model.eval()
  softmax = nn.Softmax(dim=-1)
  kept_indices = []
  kept_labels = []
  offset = 0  # dataset index of the first sample in the current batch
  for batch in tqdm(data_loader):
    img, _ = batch
    with torch.no_grad():
      logits = model(img.to(device))
    probs = softmax(logits)
    confidence, predicted = probs.max(dim=-1)
    confident = (confidence >= threshold).cpu().tolist()
    predicted = predicted.cpu().tolist()
    for pos, (keep, label) in enumerate(zip(confident, predicted)):
      if keep:
        kept_indices.append(offset + pos)
        kept_labels.append(label)
    offset += len(predicted)
  model.train()
  return _PseudoLabeledDataset(dataset, kept_indices, kept_labels)

In [None]:
# Use the GPU when one is available, otherwise the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = Classifier().to(device)
model.device = device

# Cross-entropy loss on raw logits, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0003, weight_decay=1e-5)
n_epochs = 80
# When True, the training set is extended with pseudo-labelled samples at the
# start of every epoch.
do_semi = False

for epoch in range(n_epochs):
  if do_semi:
    # Rebuild the training loader from the labelled data plus whatever
    # get_pseudo_labels returns for the unlabelled data.
    pseudo_set = get_pseudo_labels(unlabeled_set, model)
    concat_dataset = ConcatDataset([train_set, pseudo_set])
    train_loader = DataLoader(concat_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)

  # ---------- Training ----------
  model.train()
  train_loss = []
  train_acc = []

  for batch in tqdm(train_loader):
    img, labels = batch
    labels = labels.to(device)              # move the targets to the device once
    logits = model(img.to(device))          # forward pass
    loss = criterion(logits, labels)        # cross-entropy loss
    optimizer.zero_grad()                   # clear gradients from the previous step
    loss.backward()                         # compute gradients of the parameters
    # Clip the global gradient norm at 10 before the optimizer step.
    nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)
    optimizer.step()                        # update the parameters
    acc = (logits.argmax(dim=-1) == labels).float().mean()
    # Store plain Python floats so the history does not hold device tensors.
    train_loss.append(loss.item())
    train_acc.append(acc.item())
  mean_train_loss = sum(train_loss) / len(train_loss)   # average of the per-batch losses
  mean_train_acc = sum(train_acc) / len(train_acc)      # average of the per-batch accuracies
  print(f"[Train | {epoch+1:d} / {n_epochs:d}] loss = {mean_train_loss:.5f}, acc = {mean_train_acc:.5f}")

  # ---------- Validation ----------
  model.eval()
  valid_loss = []
  valid_acc = []

  for batch in tqdm(valid_loader):
    img, labels = batch
    labels = labels.to(device)
    # Neither the forward pass nor the loss needs gradients here.
    with torch.no_grad():
      logits = model(img.to(device))
      loss = criterion(logits, labels)
    acc = (logits.argmax(dim=-1) == labels).float().mean()
    valid_loss.append(loss.item())
    valid_acc.append(acc.item())
  mean_valid_loss = sum(valid_loss) / len(valid_loss)
  mean_valid_acc = sum(valid_acc) / len(valid_acc)
  print(f"[Valid | {epoch+1:d} / {n_epochs:d}] loss = {mean_valid_loss:.5f}, acc = {mean_valid_acc:.5f}")

In [None]:
# Run the trained model over the test loader and collect one predicted class
# index per image, in loader order.
model.eval()
prediction = []

with torch.no_grad():
  for img, _ in tqdm(test_loader):
    batch_pred = model(img.to(device)).argmax(dim=-1)
    prediction += batch_pred.cpu().numpy().tolist()

# Write the predictions as "index, class" rows under a header line.
with open("prediction.csv", "w") as f:
  f.write("Id, Category\n")
  f.writelines(f"{i}, {pred}\n" for i, pred in enumerate(prediction))