# 连接云盘

In [0]:
# Colab-only helper module; this import fails outside a Google Colab runtime.
from google.colab import drive
# Mount Google Drive at /content/drive/. The call is interactive: it prints an
# authorization URL and waits for the code to be pasted back (see the cell output).
drive.mount('/content/drive/')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive/


# import

In [0]:
import torch
from torch.autograd import Variable
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms, models
from torchvision.datasets import ImageFolder
import torchvision.models as models
import numpy as np
import cv2
from PIL import Image
import os
from matplotlib import pyplot as plt


# 全局变量

In [0]:

# Batch size of the training DataLoader built in main().
batch_size = 100
# Batch size of the evaluation DataLoader built in main().
test_batch_size = 10
# NOTE(review): not referenced by any cell visible in this notebook.
mini_batch_size = 10
# Number of train/test passes run by main().
epochs = 5
# Initial learning rate handed to optim.SGD in main().
lr = 0.01
# Multiplicative decay applied by StepLR after every epoch (step_size=1 in main()).
gamma = 0.9
# When True, main() stays on the CPU even if CUDA is available.
no_cuda = True
# Seed passed to torch.manual_seed() in main().
seed = 1
# NOTE(review): not referenced by any cell visible in this notebook; train() logs every batch.
log_interval = 10
# When True, main() writes the trained state_dict to "res01.pt".
save_model = True

# One sub-directory per class; the position in this list is used as the integer label.
base_dirs = ['pinghe/', 'qixu/', 'qiyu/', 'shire/', 'tanshi/', 'xueyu/', 'yangxu/', 'yinxu/']
# Dataset root, relative to the current working directory; each entry of
# base_dirs is appended to it by MyDataset.read_file.
root = "./drive/My Drive/data/dataset6/"

loss_fun = nn.CrossEntropyLoss()  # cross-entropy loss shared by train() and test()

# MyDataset类

In [0]:

def default_loader(path):
    """Load one image file and return it as a normalized float tensor.

    The image is decoded with PIL, forced to three RGB channels, converted
    with ``transforms.ToTensor`` and then normalized with the per-channel
    mean/std constants below.

    Args:
        path: Path of the image file to read.

    Returns:
        A tensor produced by ``ToTensor`` followed by ``Normalize``.

    NOTE(review): no resize/crop is applied, so images of different sizes
    cannot be stacked into one batch by the default DataLoader collation --
    confirm that every file in the dataset has the same dimensions.
    """
    normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    preprocess = transforms.Compose([transforms.ToTensor(), normalize])

    # The context manager closes the underlying file as soon as the pixels are
    # copied. convert('RGB') guarantees exactly three channels, which the
    # three-element mean/std above require (grayscale, palette and RGBA files
    # would otherwise fail or be mis-normalized).
    with Image.open(path) as img_pil:
        img_tensor = preprocess(img_pil.convert('RGB'))
    return img_tensor


class MyDataset(Dataset):
    """Image-folder dataset: one sub-directory per class.

    Every regular file found directly inside ``root/base_dirs[i]`` becomes one
    sample whose label is the integer ``i``.

    Args:
        root: Directory that contains the class sub-directories.
        base_dirs: Sequence of class sub-directory names; the position in the
            sequence is the label.
        loader: Callable mapping a file path to the sample returned by
            ``__getitem__`` (defaults to ``default_loader``).
    """

    def __init__(self, root, base_dirs, loader=default_loader):
        self.root = root
        self.base_dirs = base_dirs
        self.loader = loader
        self.image_label_list, self.image_path_list = self.read_file(root, base_dirs)
        # Kept for backward compatibility with code that reads ``dataset.len``.
        self.len = len(self.image_label_list)

    def __getitem__(self, i):
        """Return ``(image, label)`` for sample ``i``; the image is loaded lazily."""
        label = self.image_label_list[i]
        img = self.loader(self.image_path_list[i])
        return img, label

    def __len__(self):
        """Number of samples discovered by ``read_file``."""
        return len(self.image_label_list)

    def read_file(self, root, base_dirs):
        """Scan the class directories.

        Returns:
            ``(image_label_list, image_path_list)``: two parallel lists holding
            the integer label and the file path of every sample.
        """
        image_label_list = []
        image_path_list = []

        for label, base_dir in enumerate(base_dirs):
            # os.path.join works whether or not root/base_dir end with a slash.
            class_dir = os.path.join(root, base_dir)
            # os.listdir returns entries in arbitrary order; sorting makes the
            # sample order (and therefore any seeded shuffle/split) reproducible.
            for file_name in sorted(os.listdir(class_dir)):
                file_path = os.path.join(class_dir, file_name)
                # Skip nested directories (e.g. ".ipynb_checkpoints"), which
                # the image loader cannot open.
                if not os.path.isfile(file_path):
                    continue
                image_label_list.append(label)
                image_path_list.append(file_path)

        return image_label_list, image_path_list




# 非k折验证

## train

In [0]:


def train(model, device, train_loader, optimizer, epoch, criterion=None):
    """Run one training epoch and return the mean per-batch loss.

    NOTE: a later cell (the k-fold section) defines another function that is
    also called ``train`` with a different signature. Re-run this cell before
    calling ``main()`` if that later cell has been executed.

    Args:
        model: Network to optimize; it is switched to training mode.
        device: Device the batches are moved to.
        train_loader: Iterable of ``(data, target)`` batches exposing
            ``.dataset`` and ``len()``.
        optimizer: Optimizer bound to ``model``'s parameters.
        epoch: Epoch number, used only in the progress message.
        criterion: Loss callable ``criterion(output, target)``. Defaults to
            the notebook-level ``loss_fun``.

    Returns:
        The average of the per-batch losses as a float (0.0 if the loader is
        empty).
    """
    if criterion is None:
        criterion = loss_fun

    model.train()
    tot_loss = 0.0
    samples_seen = 0  # samples processed before the current batch
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()

        # forward / backward
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        tot_loss += loss.item()

        # Use a running counter instead of batch_idx * len(data): the latter
        # under-reports progress when the final batch is smaller than the rest.
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, samples_seen, len(train_loader.dataset),
            100. * batch_idx / len(train_loader), loss.item()))
        samples_seen += len(data)

    num_batches = len(train_loader)
    return tot_loss / num_batches if num_batches else 0.0





## test

In [0]:

def test( model, device, test_loader):
    model.eval()
    test_loss = 0.0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss+=loss_fun(output, target)
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))
    


## main

In [0]:


def main(test_fraction=0.2):
    """Fine-tune a pretrained ResNet-18 on the image folders and evaluate it.

    The images under ``root``/``base_dirs`` are split once into a training
    part and a held-out test part. Previously both loaders wrapped the same
    images, so the reported test accuracy was measured on training data.

    NOTE: this relies on the single-split ``train(model, device, train_loader,
    optimizer, epoch)`` defined above. The k-fold section later in the
    notebook defines another ``train``; re-run the earlier cell first if that
    section has been executed.

    Args:
        test_fraction: Fraction of the samples held out for evaluation.

    Raises:
        ValueError: If the split would leave the training or test part empty.
    """
    use_cuda = not no_cuda and torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")
    # Seeding here also fixes the random split below.
    torch.manual_seed(seed)

    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}

    full_data = MyDataset(root, base_dirs)
    n_total = len(full_data)
    n_test = int(round(n_total * test_fraction))
    n_train = n_total - n_test
    if n_train <= 0 or n_test <= 0:
        raise ValueError(
            "cannot split {} samples with test_fraction={}".format(n_total, test_fraction))
    train_data, test_data = torch.utils.data.random_split(full_data, [n_train, n_test])

    train_loader = torch.utils.data.DataLoader(train_data,
                            batch_size=batch_size, shuffle=True, **kwargs)
    # Evaluation does not depend on sample order, so no shuffling is needed.
    test_loader = torch.utils.data.DataLoader(test_data,
                            batch_size=test_batch_size, shuffle=False, **kwargs)

    model = models.resnet18(pretrained=True)
    # Replace the classification head: one output per class directory.
    model.fc = nn.Linear(model.fc.in_features, len(base_dirs))
    model = model.to(device)

    # SGD with momentum
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=0.005)

    # Multiply the learning rate by ``gamma`` after every epoch.
    scheduler = StepLR(optimizer, step_size=1, gamma=gamma)
    for epoch in range(1, epochs + 1):
        train(model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

    if save_model:
        torch.save(model.state_dict(), "res01.pt")



# Run the single-split training/evaluation pipeline defined by main() above.
main()

NameError: ignored

# k折交叉验证

## get_k_fold_data

In [0]:
k = 10


def get_k_fold_data(k, i, X, y):
    """Return the training and validation data for fold ``i`` of ``k``.

    The samples are cut into ``k`` contiguous folds of ``X.shape[0] // k``
    rows each; the last fold additionally receives the remainder rows. Fold
    ``i`` is returned as the validation part and the other folds are
    concatenated, in order, into the training part.

    Args:
        k: Number of folds, must be greater than 1.
        i: Index of the validation fold, ``0 <= i < k``.
        X: Feature tensor with at least two dimensions, samples on dim 0.
        y: Label tensor whose dim 0 matches ``X``.

    Returns:
        ``(X_train, y_train, X_valid, y_valid)``.

    Raises:
        ValueError: If ``i`` is out of range or there are fewer than ``k``
            samples.
    """
    assert k > 1
    if not 0 <= i < k:
        raise ValueError("fold index i={} is outside [0, {})".format(i, k))

    n_samples = X.shape[0]
    # Previously fold_size was X.shape[0] (missing "// k"), which put every
    # sample in fold 0 and left all other folds empty.
    fold_size = n_samples // k
    if fold_size == 0:
        raise ValueError("need at least k={} samples, got {}".format(k, n_samples))

    X_train, y_train = None, None
    X_valid, y_valid = None, None
    for j in range(k):
        start = j * fold_size
        # The last fold absorbs the rows left over when n_samples % k != 0.
        stop = n_samples if j == k - 1 else start + fold_size
        idx = slice(start, stop)
        X_part, y_part = X[idx, :], y[idx]

        if j == i:
            X_valid, y_valid = X_part, y_part
        elif X_train is None:
            X_train, y_train = X_part, y_part
        else:
            X_train = torch.cat((X_train, X_part), dim=0)
            y_train = torch.cat((y_train, y_part), dim=0)
    return X_train, y_train, X_valid, y_valid

## k_fold

In [0]:
def k_fold(k, X_train, y_train, num_epochs, learning_rate, weight_decay, batch_size):
    """Train ``k`` times and return the mean final training/validation loss.

    For every fold a fresh ``Net()`` is trained with the k-fold ``train``
    function, which returns one ``(loss, accuracy)`` pair per epoch for the
    training part and for the validation part.

    NOTE(review): ``Net`` is not defined in the visible cells of this
    notebook -- confirm where the model class comes from.

    Args:
        k: Number of folds.
        X_train, y_train: Full feature / label tensors to be split.
        num_epochs, learning_rate, weight_decay, batch_size: Forwarded to
            ``train``.

    Returns:
        ``(mean_train_loss, mean_valid_loss)`` over the ``k`` folds, using the
        last epoch of each fold.
    """
    train_l_sum, valid_l_sum = 0.0, 0.0
    train_acc_sum, valid_acc_sum = 0.0, 0.0

    for i in range(k):
        data = get_k_fold_data(k, i, X_train, y_train)
        net = Net()

        train_ls, valid_ls = train(net, *data, num_epochs, learning_rate, weight_decay, batch_size)
        # Each history entry is a (loss, accuracy) tuple; unpack it instead of
        # adding the tuple to a number, which raised TypeError.
        train_loss, train_acc = train_ls[-1]
        valid_loss, valid_acc = valid_ls[-1]
        train_l_sum += train_loss
        valid_l_sum += valid_loss
        train_acc_sum += train_acc
        valid_acc_sum += valid_acc

        if i == 0:
            # Plot the loss curves of the first fold with the notebook's own
            # semilogy helper (the d2l package is not imported here).
            epoch_axis = range(1, num_epochs + 1)
            semilogy(epoch_axis, [entry[0] for entry in train_ls], 'epochs', 'loss',
                     epoch_axis, [entry[0] for entry in valid_ls], ['train', 'valid'])
        print('fold %d, train loss %f, train acc %.2f%%, valid loss %f, valid acc %.2f%%'
              % (i, train_loss, train_acc, valid_loss, valid_acc))

    print('%d-fold mean: train acc %.2f%%, valid acc %.2f%%'
          % (k, train_acc_sum / k, valid_acc_sum / k))
    return train_l_sum / k, valid_l_sum / k

In [0]:
def semilogy(x_vals, y_vals, x_label, y_label, x2_vals=None, y2_vals=None,
             legend=None, figsize=(3.5, 2.5)):
    """Plot one or two curves with a logarithmic y axis.

    Uses the ``plt`` module imported at the top of the notebook directly; the
    previous version went through ``d2l``, which this notebook never imports.

    Args:
        x_vals, y_vals: Data of the first curve.
        x_label, y_label: Axis labels.
        x2_vals, y2_vals: Optional data of a second curve, drawn dotted.
        legend: Optional legend labels; only shown when the second curve is
            drawn.
        figsize: Default figure size written to ``plt.rcParams``.
    """
    plt.rcParams['figure.figsize'] = figsize
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.semilogy(x_vals, y_vals)
    # Compare against None explicitly: the truth value of an array or tensor
    # with more than one element is ambiguous and raises.
    if x2_vals is not None and y2_vals is not None:
        plt.semilogy(x2_vals, y2_vals, linestyle=':')
        if legend is not None:
            plt.legend(legend)

## train

In [0]:
def train(net, train_features, train_labels, test_features, test_labels, num_epochs, learning_rate,weight_decay, batch_size):
    """Train ``net`` on in-memory tensors and record per-epoch metrics.

    WARNING: this definition replaces the earlier single-split
    ``train(model, device, train_loader, optimizer, epoch)`` once this cell is
    executed; re-run that earlier cell before calling ``main()`` again.

    Args:
        net: Model to optimize.
        train_features, train_labels: Training tensors, samples on dim 0.
        test_features, test_labels: Validation tensors; validation metrics
            are skipped when ``test_labels`` is None.
        num_epochs: Number of passes over the training data.
        learning_rate, weight_decay: Adam hyper-parameters.
        batch_size: Mini-batch size of the training loader.

    Returns:
        ``(train_ls, test_ls)``: two lists with one ``(loss, accuracy)`` tuple
        per epoch, as returned by ``log_rmse``.
    """
    train_ls, test_ls = [], []  # per-epoch (loss, accuracy) history
    # TensorDataset pairs features[i] with labels[i]; the previously used
    # ``TraindataSet`` is not defined anywhere in the visible notebook.
    dataset = torch.utils.data.TensorDataset(train_features, train_labels)
    train_iter = DataLoader(dataset, batch_size, shuffle=True)

    # Adam optimizer
    optimizer = torch.optim.Adam(params=net.parameters(), lr=learning_rate, weight_decay=weight_decay)

    for epoch in range(num_epochs):
        # Make sure the network is in training mode at the start of the epoch.
        net.train()
        for X, y in train_iter:  # mini-batch training
            output = net(X)
            # ``loss_fun`` is the notebook-level criterion; the old name
            # ``loss_func`` was never defined.
            loss = loss_fun(output, y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        # Record loss and accuracy after each epoch.
        train_ls.append(log_rmse(0, net, train_features, train_labels))
        if test_labels is not None:
            test_ls.append(log_rmse(1, net, test_features, test_labels))
    return train_ls, test_ls

## log_rmse

In [0]:

def log_rmse(flag, net, x, y, criterion=None):
    """Compute the loss and accuracy of ``net`` on ``(x, y)``.

    Despite its name, this returns the value of the classification criterion
    and the accuracy, not a root-mean-square error.

    Args:
        flag: ``1`` switches the network to eval mode for the forward pass
            (validation data); any other value leaves the mode untouched.
        net: Model to evaluate.
        x: Input batch passed to ``net``.
        y: Integer class labels, one per row of ``x``.
        criterion: Loss callable ``criterion(output, y)``. Defaults to the
            notebook-level ``loss_fun`` (the old name ``loss_func`` was never
            defined).

    Returns:
        ``(loss, accuracy)``: loss as a float and accuracy in percent.
    """
    if criterion is None:
        criterion = loss_fun

    was_training = net.training
    if flag == 1:  # validation data
        net.eval()

    # Metrics only: no autograd graph is needed, which saves memory when the
    # whole dataset is passed in one forward pass.
    with torch.no_grad():
        output = net(x)
        result = torch.max(output, 1)[1].view(y.size())
        corrects = (result == y).sum().item()
        accuracy = corrects * 100.0 / len(y)
        loss = criterion(output, y).item()

    # Restore whichever mode the caller had instead of forcing train mode.
    net.train(was_training)

    return (loss, accuracy)

## main

In [0]:
k = 10

# k_fold requires num_epochs, learning_rate, weight_decay and batch_size; the
# previous call passed only three arguments and raised TypeError. The values
# come from the global configuration cell; weight_decay mirrors the value
# given to the SGD optimizer in main().
# NOTE(review): `x` and `label` are not defined in any visible cell -- they
# must hold the feature and label tensors before this cell is run.
k_fold(k, x, label, num_epochs=epochs, learning_rate=lr,
       weight_decay=0.005, batch_size=batch_size)