In [None]:
# Working image size: 512 when CUDA is available, 128 otherwise (use small size if no gpu).
if torch.cuda.is_available():
    imsize = 512
else:
    imsize = 128

# Preprocessing pipeline: resize the image, then convert it to a tensor.
loader = transforms.Compose(
    [
        transforms.Resize(imsize),
        transforms.ToTensor(),
    ]
)

def image_loader(image_name):
    """Load an image file as a batched float tensor on ``device``.

    Args:
        image_name: Path (or file object) passed to ``Image.open``.
            NOTE(review): assumes ``Image`` is PIL's ``Image`` module; its
            import is not visible in this part of the file.

    Returns:
        The image after the module-level ``loader`` pipeline, with a leading
        dimension of size 1 added, moved to ``device`` as ``torch.float``.
    """
    # Image.open is lazy and keeps the underlying file open; the context
    # manager guarantees the handle is released once the pixels are decoded.
    with Image.open(image_name) as img:
        # Force three channels so grayscale, palette, or RGBA files still
        # match the 3-channel mean/std applied later in the pipeline.
        image = loader(img.convert("RGB"))
    # unsqueeze(0) inserts a new dimension at index 0.
    image = image.unsqueeze(0)
    return image.to(device, torch.float)

# Load the style image and the content image through the same preprocessing path.
style_img = image_loader("./data/images/neural-style/picasso.jpg")
content_img = image_loader("./data/images/neural-style/dancing.jpg")

# `assert` evaluates an expression and raises an exception when it is false;
# here it guards against style/content tensors of different sizes.
assert style_img.shape == content_img.shape, (
    "we need to import style and content images of the same size"
)



class ContentLoss(nn.Module):
    """Pass-through module that records the MSE between its input and a fixed target."""

    def __init__(self, target):
        """Keep a detached copy of ``target`` as the reference feature tensor."""
        super().__init__()
        # Detach the target from the autograd graph: it is a fixed value,
        # not something gradients should flow into.
        self.target = target.detach()

    def forward(self, input):
        """Store the MSE against the target in ``self.loss``; return ``input`` as-is."""
        mse = F.mse_loss(input, self.target)
        self.loss = mse
        return input

    
    
def gram_matrix(input):
    """Return the normalized Gram matrix of a 4-D feature tensor.

    The tensor is unpacked as (batch, maps, height, width), flattened to
    ``(batch * maps, height * width)``, multiplied by its own transpose, and
    divided by the total number of elements.
    """
    batch, maps, height, width = input.shape

    # One row per feature map, one column per spatial position.
    flat = input.view(batch * maps, height * width)

    gram = flat.mm(flat.t())

    # Normalize by the element count of the original tensor.
    return gram / (batch * maps * height * width)



class StyleLoss(nn.Module):
    """Pass-through module that records the MSE between Gram matrices."""

    def __init__(self, target_feature):
        """Precompute and detach the Gram matrix of ``target_feature``."""
        super().__init__()
        target_gram = gram_matrix(target_feature)
        self.target = target_gram.detach()

    def forward(self, input):
        """Store the Gram-matrix MSE in ``self.loss``; return ``input`` as-is."""
        input_gram = gram_matrix(input)
        self.loss = F.mse_loss(input_gram, self.target)
        return input



# Per-channel mean and standard deviation (three values each), placed on `device`.
# NOTE(review): these look like the usual ImageNet statistics — confirm they
# match the pretrained network this notebook feeds.
cnn_normalization_mean = torch.tensor([0.485, 0.456, 0.406]).to(device)
cnn_normalization_std = torch.tensor([0.229, 0.224, 0.225]).to(device)

# A module that normalizes the input image,
# so it can be placed directly inside an nn.Sequential.
class Normalization(nn.Module):
    """Normalize an image tensor channel-wise: ``(img - mean) / std``.

    Args:
        mean: Per-channel means, as a sequence of numbers or a tensor.
        std: Per-channel standard deviations, same form as ``mean``.

    Both statistics are reshaped to ``(C, 1, 1)`` so they broadcast against
    ``(C, H, W)`` or ``(B, C, H, W)`` inputs. They are registered as buffers,
    which means ``.to(...)``, ``.cuda()`` and dtype casts on the module move
    them together with the rest of the model (and they appear in
    ``state_dict()`` under ``mean`` and ``std``).
    """

    def __init__(self, mean, std):
        super(Normalization, self).__init__()
        self.register_buffer("mean", self._as_channel_stat(mean))
        self.register_buffer("std", self._as_channel_stat(std))

    @staticmethod
    def _as_channel_stat(values):
        """Return an independent ``(C, 1, 1)`` tensor copy of ``values``."""
        # as_tensor + clone().detach() accepts lists and tensors alike and
        # avoids the copy-construct warning torch.tensor(tensor) emits.
        return torch.as_tensor(values).clone().detach().view(-1, 1, 1)

    def forward(self, img):
        """Return ``img`` normalized with the stored mean and std."""
        return (img - self.mean) / self.std





In [None]:
# Default layer names (conv_<n>, counted in network order) after which the
# content / style loss modules are inserted.
content_layers_default = ['conv_4']
style_layers_default = ['conv_1', 'conv_2', 'conv_3', 'conv_4', 'conv_5']

def get_style_model_and_losses(cnn, normalization_mean, normalization_std,
                               style_img, content_img,
                               content_layers = content_layers_default,
                               style_layers = style_layers_default):
    """Build a sequential model with content/style loss modules inserted.

    The children of ``cnn`` are copied one by one into a new ``nn.Sequential``
    that starts with a ``Normalization`` module. Each child is named
    ``conv_<n>``, ``relu_<n>``, ``pool_<n>`` or ``bn_<n>``, where ``n`` is the
    number of convolutions seen so far. After a layer whose name appears in
    ``content_layers`` / ``style_layers``, a ``ContentLoss`` / ``StyleLoss``
    module is appended, with its target computed by running ``content_img`` /
    ``style_img`` through the model built so far. Everything after the last
    loss module is dropped.

    Args:
        cnn: Module whose direct children are Conv2d / ReLU / MaxPool2d /
            BatchNorm2d layers. It is deep-copied, so the caller's network
            is not modified.
        normalization_mean: Mean handed to ``Normalization``.
        normalization_std: Standard deviation handed to ``Normalization``.
        style_img: Image tensor used to compute the style targets.
        content_img: Image tensor used to compute the content targets.
        content_layers: Layer names that get a ``ContentLoss`` after them.
        style_layers: Layer names that get a ``StyleLoss`` after them.

    Returns:
        Tuple ``(model, style_losses, content_losses)``.

    Raises:
        RuntimeError: If ``cnn`` has a child of an unrecognized layer type.
    """
    cnn = copy.deepcopy(cnn)

    normalization = Normalization(normalization_mean, normalization_std).to(device)

    content_losses = []
    style_losses = []

    # New nn.Sequential that holds the modules in the order they must run.
    model = nn.Sequential(normalization)

    i = 0  # number of conv layers seen so far
    for layer in cnn.children():
        if isinstance(layer, nn.Conv2d):
            i += 1
            name = 'conv_{}'.format(i)
        elif isinstance(layer, nn.ReLU):
            name = 'relu_{}'.format(i)
            # Swap in an out-of-place ReLU. Presumably an in-place ReLU would
            # overwrite the conv output that a loss module inserted just
            # before it has already consumed.
            layer = nn.ReLU(inplace = False)
        elif isinstance(layer, nn.MaxPool2d):
            name = 'pool_{}'.format(i)
        elif isinstance(layer, nn.BatchNorm2d):
            name = 'bn_{}'.format(i)
        else:
            raise RuntimeError('Unrecognized layer:{}'.format(layer.__class__.__name__))
        model.add_module(name, layer)

        if name in content_layers:
            target = model(content_img).detach()
            content_loss = ContentLoss(target)
            model.add_module("content_loss_{}".format(i), content_loss)
            content_losses.append(content_loss)

        if name in style_layers:
            target_feature = model(style_img).detach()
            style_loss = StyleLoss(target_feature)
            model.add_module("style_loss_{}".format(i), style_loss)
            style_losses.append(style_loss)

    # Only after ALL layers have been added: find the last loss module and
    # drop everything behind it. A separate index is used so the conv counter
    # above is never clobbered.
    for last in range(len(model) - 1, -1, -1):
        if isinstance(model[last], (ContentLoss, StyleLoss)):
            break

    model = model[:(last + 1)]

    return model, style_losses, content_losses

In [None]:
def run_style_transfer(cnn, normalization_mean, normalizartion_std,
                       content_img, style_img, input_img, num_steps = 100,
                       style_weight = 1000000, content_weight = 1):
    """Optimize ``input_img`` in place to minimize weighted style + content loss.

    Args:
        cnn: Network handed to ``get_style_model_and_losses``.
        normalization_mean: Mean handed to ``get_style_model_and_losses``.
        normalizartion_std: Standard deviation handed to
            ``get_style_model_and_losses`` (the parameter name is kept as-is
            for keyword-argument compatibility).
        content_img: Content image tensor.
        style_img: Style image tensor.
        input_img: Tensor being optimized; it is modified in place.
        num_steps: Optimization continues while the number of closure
            evaluations is ``<= num_steps``.
        style_weight: Multiplier applied to the summed style losses.
        content_weight: Multiplier applied to the summed content losses.

    Returns:
        ``input_img``, clamped to ``[0, 1]``.
    """
    print("Building the style tranfer model ...")
    model, style_losses, content_losses = get_style_model_and_losses(cnn,
                                                                     normalization_mean, normalizartion_std,
                                                                     style_img, content_img)
    optimizer = get_input_optimizer(input_img)

    print("Optimizing ...")
    # One-element list so the closure can increment the counter in place.
    run = [0]
    while run[0] <= num_steps:
        def closure():
            # Keep the updated image inside the valid [0, 1] range; no_grad
            # keeps the in-place clamp out of the autograd graph.
            with torch.no_grad():
                input_img.clamp_(0, 1)
            optimizer.zero_grad()
            # The forward pass is run for its side effect: every loss module
            # stores its current value in `.loss`.
            model(input_img)
            style_score = 0
            content_score = 0

            for sl in style_losses:
                style_score += sl.loss
            for cl in content_losses:
                content_score += cl.loss

            style_score *= style_weight
            content_score *= content_weight
            loss = style_score + content_score
            loss.backward()

            run[0] += 1
            if run[0] % 50 == 0:
                # Report the step number itself, not the list wrapping it.
                print("run {}:".format(run[0]))
                print('Style Loss : {:4f} Content Loss: {:4f}'.format(
                    style_score.item(), content_score.item()))
                print()

            return loss

        optimizer.step(closure)

    # Final correction so the returned image is inside [0, 1].
    with torch.no_grad():
        input_img.clamp_(0, 1)
    return input_img

In [1]:
import torch
import torchvision
import numpy as np
import torch.nn as nn
from torchvision import datasets, transforms
import matplotlib .pyplot as plt
import torch.nn.functional as F
import torch.optim as optim

# Switch matplotlib to interactive mode.
plt.ion()

In [2]:
# Dataset root directory, and the device tensors and models are moved to.
Root = "./data"
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Training split of MNIST (downloaded if missing), in shuffled batches of 64.
train_loader = torch.utils.data.DataLoader(
    dataset=datasets.MNIST(
        root=Root,
        train=True,
        download=True,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
        ]),
    ),
    batch_size=64,
    shuffle=True,
    num_workers=0,
)

# Test split of MNIST with the same preprocessing and batching.
test_loader = torch.utils.data.DataLoader(
    dataset=datasets.MNIST(
        root=Root,
        train=False,
        transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,)),
        ]),
    ),
    batch_size=64,
    shuffle=True,
    num_workers=0,
)

In [3]:
class Net(nn.Module):
    """Small CNN classifier preceded by a spatial transformer network (STN).

    ``forward`` first warps the input with ``stn`` and then classifies the
    warped image, returning log-probabilities over 10 classes.
    """

    def __init__(self):
        super().__init__()

        # Classifier layers.
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

        # STN localization network: extracts the features that the
        # regressor below turns into transform parameters.
        self.localization = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=7),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(inplace=True),
            nn.Conv2d(8, 10, kernel_size=5),
            nn.MaxPool2d(2, stride=2),
            nn.ReLU(inplace=True),
        )

        # Regressor for theta: six outputs, reshaped to a 2x3 affine matrix
        # in stn().
        self.fc_loc = nn.Sequential(
            nn.Linear(10 * 3 * 3, 32),
            nn.ReLU(inplace=True),
            nn.Linear(32, 3 * 2),
        )

        # Initialization: zero weights plus an identity-transform bias, so
        # the STN starts out as the identity mapping.
        theta_layer = self.fc_loc[2]
        theta_layer.weight.data.zero_()
        identity_theta = torch.tensor([1, 0, 0, 0, 1, 0], dtype=torch.float)
        theta_layer.bias.data.copy_(identity_theta)

    def stn(self, x):
        """Warp ``x`` with the affine transform predicted from ``x`` itself."""
        features = self.localization(x)
        # Flatten to 90 features per sample (10 channels x 3 x 3).
        features = features.view(-1, 10 * 3 * 3)
        theta = self.fc_loc(features).view(-1, 2, 3)

        sampling_grid = F.affine_grid(theta, x.size())
        return F.grid_sample(x, sampling_grid)

    def forward(self, x):
        """Return log-softmax class scores for the STN-transformed input."""
        # transform the input
        hidden = self.stn(x)

        hidden = self.conv1(hidden)
        hidden = F.relu(F.max_pool2d(hidden, 2))

        hidden = self.conv2_drop(self.conv2(hidden))
        hidden = F.relu(F.max_pool2d(hidden, 2))

        hidden = hidden.view(-1, 320)
        hidden = F.relu(self.fc1(hidden))
        hidden = F.dropout(hidden, training=self.training)
        logits = self.fc2(hidden)
        return F.log_softmax(logits, dim=1)

In [4]:
# Instantiate the network and move it to `device`.
model = Net().to(device)

In [5]:
# Plain SGD over all of the model's parameters, learning rate 0.01.
optimizer = optim.SGD(model.parameters(), lr = 0.01)

In [6]:
def train(epoch):
    """Run one pass over ``train_loader``, updating the global ``model``.

    Uses the module-level ``model``, ``optimizer``, ``train_loader`` and
    ``device``. Progress is printed every 500 batches.

    Args:
        epoch: Epoch number; only used in the progress message.
    """
    model.train()
    for step, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        log_probs = model(inputs)
        loss = F.nll_loss(log_probs, labels)
        loss.backward()
        optimizer.step()

        if step % 500 != 0:
            continue

        # Periodic progress report: samples seen so far, dataset size,
        # percentage of batches done, and the current batch loss.
        print("Train epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
            epoch, step * len(inputs), len(train_loader.dataset),
            100. * step / len(train_loader), loss.item()))

In [7]:
def test():
    """Evaluate the global ``model`` on ``test_loader`` and print a summary.

    Uses the module-level ``model``, ``test_loader`` and ``device``. Prints
    the average negative log-likelihood per sample and the number and
    percentage of correct predictions. Returns nothing.
    """
    with torch.no_grad():
        model.eval()
        test_loss = 0
        correct = 0
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)

            # Sum (rather than average) the per-sample losses of the batch so
            # the division by the dataset size below yields a true mean.
            # `reduction='sum'` replaces the deprecated `size_average=False`.
            test_loss += F.nll_loss(output, target, reduction = 'sum').item()
            # Index of the largest log-probability = predicted class.
            pred = output.max(1, keepdim = True)[1]
            # eq(): element-wise comparison of predictions with the labels.
            correct += pred.eq(target.view_as(pred)).sum().item()

        test_loss /= len(test_loader.dataset)
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
            test_loss, correct, len(test_loader.dataset),
            100. * correct / len(test_loader.dataset)))

In [8]:
# Alternate one training pass and one evaluation pass for epochs 1 through 20.
for epoch in range(1, 21):
    train(epoch)
    test()








Test set: Average loss: 0.2290, Accuracy: 9363/10000 (94%)


Test set: Average loss: 0.1330, Accuracy: 9595/10000 (96%)


Test set: Average loss: 0.1236, Accuracy: 9607/10000 (96%)


Test set: Average loss: 0.0954, Accuracy: 9704/10000 (97%)


Test set: Average loss: 0.0807, Accuracy: 9742/10000 (97%)


Test set: Average loss: 0.0783, Accuracy: 9747/10000 (97%)


Test set: Average loss: 0.0699, Accuracy: 9773/10000 (98%)


Test set: Average loss: 0.0589, Accuracy: 9814/10000 (98%)


Test set: Average loss: 0.0759, Accuracy: 9768/10000 (98%)


Test set: Average loss: 0.0950, Accuracy: 9703/10000 (97%)


Test set: Average loss: 0.0987, Accuracy: 9701/10000 (97%)


Test set: Average loss: 0.0509, Accuracy: 9849/10000 (98%)


Test set: Average loss: 0.0553, Accuracy: 9820/10000 (98%)


Test set: Average loss: 0.0815, Accuracy: 9779/10000 (98%)


Test set: Average loss: 0.0542, Accuracy: 9832/10000 (98%)


Test set: Average loss: 0.0693, Accuracy: 9774/10000 (98%)


Test set: Average loss:

In [9]:
# Helper for visualizing STN results.
def convert_image_np(inp, mean=(0.1307,), std=(0.3081,)):
    """Convert a normalized CHW image tensor to an HWC numpy array in [0, 1].

    Args:
        inp: CPU tensor laid out as (channels, height, width).
        mean: Mean(s) used when the data was normalized; either a single
            value or one per channel. Defaults to the value passed to
            ``transforms.Normalize`` by this notebook's data loaders.
        std: Standard deviation(s) used when the data was normalized, same
            form as ``mean``.

    Returns:
        A numpy array of shape (height, width, channels) with the
        normalization undone and the values clipped to ``[0, 1]``.
    """
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.asarray(mean)
    std = np.asarray(std)

    # Invert Normalize: x_normalized = (x - mean) / std.
    inp = std * inp + mean

    # Clip to the range imshow expects for float images.
    inp = np.clip(inp, 0, 1)
    return inp

# Visualize one batch of input images next to the same batch after the STN.

def visualize_stn():
    """Plot a test batch and its STN-transformed version side by side.

    Uses the module-level ``test_loader``, ``model`` and ``device``.
    """
    with torch.no_grad():
        # First batch of images from the test loader (labels are dropped).
        batch = next(iter(test_loader))[0].to(device)
        originals = batch.cpu()
        warped = model.stn(batch).cpu()

        # Tile each batch into a single image grid and convert it for imshow.
        grids = [
            convert_image_np(torchvision.utils.make_grid(images))
            for images in (originals, warped)
        ]
        titles = ("Dataset Images", "Transformed Images")

        # Plot the results side by side.
        f, axarr = plt.subplots(1, 2)
        for axis, grid, title in zip(axarr, grids, titles):
            axis.imshow(grid)
            axis.set_title(title)


In [None]:
# Draw the input / STN-output comparison for one test batch.
visualize_stn()