<a href="https://colab.research.google.com/github/fboldt/aulasann/blob/main/aula07d_mnist_conv_torch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from tensorflow.keras.datasets import mnist
(train_images, train_labels), (test_images, test_labels) = mnist.load_data()
print(train_images.shape)
print(train_labels.shape)
print(test_images.shape)
print(test_labels.shape)

(60000, 28, 28)
(60000,)
(10000, 28, 28)
(10000,)


In [2]:
import torch
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [3]:
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.metrics import accuracy_score

class TorchCNN2D(nn.Module):
  def __init__(self, num_classes):
    super(TorchCNN2D, self).__init__()
    self.conv1 = nn.Conv2d(in_channels=1, out_channels=4, kernel_size=4)
    self.flatten = nn.Flatten()
    self.fc1 = nn.Linear(in_features=4*25*25, out_features=512)
    self.fc2 = nn.Linear(in_features=512, out_features=num_classes)
  def forward(self, x):
    x = F.relu(self.conv1(x))
    x = self.flatten(x)
    x = F.relu(self.fc1(x))
    x = self.fc2(x)
    x = F.softmax(x, dim=1)
    return x

class TorchWrappedNN(BaseEstimator, ClassifierMixin):
  def __init__(self, epochs=5, batch_size=128, model_fabric=TorchCNN2D):
    self.epochs = epochs
    self.batch_size = batch_size
    self.model_fabric = model_fabric

  def fit(self, X, y):
    self.labels, ids = torch.unique(torch.tensor(y), return_inverse=True)
    self.model = self.model_fabric(len(self.labels)).to(device)
    self.criterion = nn.CrossEntropyLoss()
    self.optimizer = optim.RMSprop(self.model.parameters(), lr=0.001)

    train_dataset = TensorDataset(
        torch.tensor(X, dtype=torch.float32),
        torch.tensor(ids, dtype=torch.long))
    train_loader = DataLoader(train_dataset, batch_size=self.batch_size)
    for epoch in range(self.epochs):
      for data in train_loader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        self.optimizer.zero_grad()
        outputs = self.model(inputs)
        loss = self.criterion(outputs, labels)
        loss.backward()
        self.optimizer.step()
    return self

  def predict(self, X):
    with torch.no_grad():
      inputs = torch.tensor(X, dtype=torch.float32).to(device)
      outputs = self.model(inputs)
      return self.labels[torch.argmax(outputs, dim=1).cpu().numpy()]


In [4]:
from sklearn.base import TransformerMixin
from sklearn.pipeline import Pipeline

class Divide255(BaseEstimator, TransformerMixin):
  def fit(self, X, y=None):
    return self
  def transform(self, X):
    return X / 255.0

class Shape2Torch(BaseEstimator, TransformerMixin):
  def fit(self, X, y=None):
    return self
  def transform(self, X):
    return X.reshape((-1, 1, 28, 28))

pipeline = Pipeline([
    ("scaler", Divide255()),
    ("shape2Torch", Shape2Torch()),
    ("model", TorchWrappedNN())
])

pipeline.fit(train_images, train_labels)
y_pred = pipeline.predict(test_images)
accuracy_score(test_labels, y_pred)

  torch.tensor(ids, dtype=torch.long))


0.9823

In [11]:
class TorchCNN2D(nn.Module):
  def __init__(self, input_shape, output_shape):
    super(TorchCNN2D, self).__init__()
    self.conv1 = nn.Conv2d(in_channels=input_shape[0],
                           out_channels=32, kernel_size=3)
    self.pool1 = nn.MaxPool2d(kernel_size=2)
    self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3)
    self.pool2 = nn.MaxPool2d(kernel_size=2)
    self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3)
    self.flatten = nn.Flatten()
    self.fc = nn.Linear(in_features=1152, out_features=output_shape)

  def forward(self, x):
    x = F.relu(self.conv1(x))
    x = self.pool1(x)
    x = F.relu(self.conv2(x))
    x = self.pool2(x)
    x = F.relu(self.conv3(x))
    x = self.flatten(x)
    x = self.fc(x)
    x = F.softmax(x, dim=1)
    return x

input_shape = (1, 28, 28)
output_shape = 10
model = TorchCNN2D(input_shape, output_shape)

print(model)

TorchCNN2D(
  (conv1): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1))
  (pool1): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1))
  (pool2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (fc): Linear(in_features=1152, out_features=10, bias=True)
)


In [21]:
class TorchWrappedNN(BaseEstimator, ClassifierMixin):
  def __init__(self, epochs=5, batch_size=128, model_fabric=TorchCNN2D):
    self.epochs = epochs
    self.batch_size = batch_size
    self.model_fabric = model_fabric

  def fit(self, X, y):
    self.labels, ids = torch.unique(torch.tensor(y), return_inverse=True)
    self.model = self.model_fabric(X.shape[1:], len(self.labels)).to(device)
    self.criterion = nn.CrossEntropyLoss()
    self.optimizer = optim.RMSprop(self.model.parameters(), lr=0.001)

    train_dataset = TensorDataset(
        torch.tensor(X, dtype=torch.float32),
        torch.tensor(ids, dtype=torch.long))
    train_loader = DataLoader(train_dataset, batch_size=self.batch_size)
    for epoch in range(self.epochs):
      for data in train_loader:
        inputs, labels = data
        inputs, labels = inputs.to(device), labels.to(device)
        self.optimizer.zero_grad()
        outputs = self.model(inputs)
        loss = self.criterion(outputs, labels)
        loss.backward()
        self.optimizer.step()
    return self

  def predict(self, X):
    with torch.no_grad():
      inputs = torch.tensor(X, dtype=torch.float32).to(device)
      outputs = self.model(inputs)
      return self.labels[torch.argmax(outputs, dim=1).cpu().numpy()]

pipeline = Pipeline([
    ("scaler", Divide255()),
    ("shape2Torch", Shape2Torch()),
    ("model", TorchWrappedNN())
])

pipeline.fit(train_images, train_labels)
y_pred = pipeline.predict(test_images)
accuracy_score(test_labels, y_pred)

  torch.tensor(ids, dtype=torch.long))


0.9887