Faça o upload de um arquivo .zip com os seus dados e, após a finalização do upload, use o comando abaixo para extrair o conteúdo do arquivo no Colab.

Implemente as funções a seguir.

In [25]:
from torch.utils.data import Dataset, DataLoader
import torch
from torch import nn
import os
import pandas as pd
from torchvision.io import read_image
from torchvision import transforms
import numpy as np
import random

In [26]:

def add_random_shadow(image, shadow_intensity=0.5):
    """Darken a random wedge on the left-hand side of an image.

    Two distinct columns ``x1`` and ``x2`` are drawn with
    ``np.random.choice``. They define a straight boundary that starts at
    column ``x1`` on the first row and moves towards ``x2`` on the last row.
    On every row, the pixels to the left of the boundary are multiplied by
    ``shadow_intensity``; rows whose boundary falls on column 0 are left
    untouched.

    Args:
        image: Tensor indexed as ``[channel, row, column]`` with at least two
            columns (expected to be floating point so it can be scaled).
        shadow_intensity: Multiplier applied to the shadowed pixels.

    Returns:
        A new tensor with the same shape as ``image``. The input tensor is
        not modified.

    Raises:
        ValueError: Propagated from ``np.random.choice`` when the image has
            fewer than two columns.
    """
    h, w = image.shape[1], image.shape[2]
    x1, x2 = np.random.choice(w, 2, replace=False)
    # Boundary line in "row = k * column + b" form; x1 != x2, so k is finite.
    k = h / (x2 - x1)
    b = -k * x1

    # Boundary column for every row at once (truncated towards zero).
    edge = ((np.arange(h) - b) / k).astype(np.int64)
    # Only boundaries strictly inside the image produce a shadow.
    edge[(edge <= 0) | (edge >= w)] = 0

    edge = torch.from_numpy(edge).to(image.device)
    columns = torch.arange(w, device=image.device)
    # in_shadow[row, col] is True for pixels left of that row's boundary;
    # it broadcasts over the channel axis below.
    in_shadow = columns.unsqueeze(0) < edge.unsqueeze(1)
    return torch.where(in_shadow, image * shadow_intensity, image)

class CarSimDataset(Dataset):
    """Driving-simulator frames paired with steering angles.

    Every row of the annotations CSV must provide the columns ``left``,
    ``center`` and ``right`` (image paths joined onto ``img_dir``) and
    ``steer`` (the recorded steering value).

    With ``augment=True`` each sample picks one of the three cameras at
    random (side cameras get a steering correction), receives a random
    shadow, and has its crop window shifted vertically by a random amount.
    With ``augment=False`` the centre camera is used with the unshifted crop
    window, so training and evaluation frames share the same framing.

    Args:
        annotations_file: Path of the CSV file read with ``pandas.read_csv``.
        img_dir: Directory the image paths in the CSV are joined onto.
        transform: Optional callable applied to the cropped image tensor.
        augment: Whether to apply the random augmentations described above.
    """

    def __init__(self, annotations_file, img_dir, transform=None, augment=True):
        self.img_labels = pd.read_csv(annotations_file)
        self.img_dir = img_dir
        self.transform = transform
        self.augment = augment
        self.cameras = ['left', 'center', 'right']
        # Steering offset added to the label for each camera, same order as
        # ``self.cameras``.
        self.cameras_steering_correction = [.25, 0., -.25]

    @staticmethod
    def _crop_rows(image, v_delta=0.0):
        """Drop the top 37.5% and bottom 12.5% of rows, shifted by ``v_delta``."""
        height = image.shape[1]
        top = int((0.375 + v_delta) * height)
        bottom = int((0.125 + v_delta) * height)
        # An explicit end index keeps the slice valid when ``bottom`` is 0
        # (``top:-0`` would be an empty slice).
        return image[:, top:height - bottom, :]

    def __getitem__(self, idx):
        """Return ``(image, angle)`` for row ``idx``.

        ``angle`` is a 0-d float32 tensor so that collated batches match the
        dtype of the model output.
        """
        row = self.img_labels.iloc[idx]
        # Index 1 is the centre camera.
        camera_idx = np.random.randint(len(self.cameras)) if self.augment else 1
        camera = self.cameras[camera_idx]
        img_path = os.path.join(self.img_dir, row[camera].strip())
        image = read_image(img_path).float() / 255.0  # read and scale pixels to [0, 1]
        angle = row['steer'] + self.cameras_steering_correction[camera_idx]

        v_delta = 0.0
        if self.augment:
            # Random shadow
            image = add_random_shadow(image)
            # Random vertical shift of the crop window
            v_delta = random.uniform(-0.05, 0.05)
        image = self._crop_rows(image, v_delta)

        if self.transform is not None:
            image = self.transform(image)

        return image, torch.tensor(angle, dtype=torch.float32)

    def __len__(self):
        """Return the number of rows in the annotations file."""
        return len(self.img_labels)


Crie as variáveis a seguir a partir da classe implementada acima.

In [27]:
# Preprocessing applied to every frame: resize to 28x28, then normalise each
# channel with the given mean / standard deviation.
transform = transforms.Compose(
    [
        transforms.Resize(size=(28, 28)),
        transforms.Normalize(
            mean=[0.485, 0.456, 0.406],
            std=[0.229, 0.224, 0.225],
        ),
    ]
)

# Settings shared by the training and test datasets.
# NOTE(review): both datasets read the same CSV, so the "test" loss is
# measured on samples the model was trained on - consider a held-out split.
dataset_options = dict(
    annotations_file='C:\\Users\\moura\\Searches\\behavior-cloning\\todososdados.csv',
    img_dir='DataAndLoader\\IMG',
    transform=transform,
)

# Training dataset: data augmentation enabled
train_dataset = CarSimDataset(augment=True, **dataset_options)

# Test dataset: data augmentation disabled
test_dataset = CarSimDataset(augment=False, **dataset_options)

In [28]:

# Training batches are reshuffled on every pass; test batches keep CSV order.
train_dataloader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
test_dataloader = DataLoader(dataset=test_dataset, batch_size=64, shuffle=False)

# Sanity check: print the shapes of the first test batch only.
for images, angles in test_dataloader:
    print(
        f"Shape of images: {images.shape}, Shape of angles: {angles.shape}"
    )
    break

Shape of images: torch.Size([64, 3, 28, 28]), Shape of angles: torch.Size([64])


O código abaixo será utilizado para treinar o modelo com o seu dataset.

In [29]:
# Report the shape (and label dtype) of the first test batch, then stop.
for X, y in test_dataloader:
    print(
        f"Shape of X [N, C, H, W]: {X.shape}",
        f"Shape of y: {y.shape} {y.dtype}",
        sep="\n",
    )
    break

Shape of X [N, C, H, W]: torch.Size([64, 3, 28, 28])
Shape of y: torch.Size([64]) torch.float32


In [30]:
# Choose the compute backend: CUDA when available, otherwise MPS, otherwise CPU.
if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
print(f"Using {device} device")

# Define model
class NeuralNetwork(nn.Module):
    """Fully connected regressor that maps an image to one scalar per sample.

    The input is flattened from dimension 1 onwards and passed through two
    hidden ReLU layers followed by a single-output linear layer.

    Args:
        in_features: Number of values per sample after flattening. The
            default matches 3-channel 28x28 images.
        hidden_features: Width of both hidden layers.
    """

    def __init__(self, in_features=28 * 28 * 3, hidden_features=512):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(in_features, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, 1),
        )

    def forward(self, x):
        """Return predictions of shape ``[batch]`` for input ``[batch, ...]``."""
        x = self.flatten(x)
        logits = self.linear_relu_stack(x)
        # Drop only the trailing size-1 feature axis. A bare ``squeeze()``
        # would also remove the batch axis when the batch holds one sample.
        return logits.squeeze(-1)

# Build the network, move it to the selected device and show its layers.
model = NeuralNetwork()
model = model.to(device)
print(model)

Using cpu device
NeuralNetwork(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (linear_relu_stack): Sequential(
    (0): Linear(in_features=2352, out_features=512, bias=True)
    (1): ReLU()
    (2): Linear(in_features=512, out_features=512, bias=True)
    (3): ReLU()
    (4): Linear(in_features=512, out_features=1, bias=True)
  )
)


In [31]:
# Mean squared error between predicted and recorded steering values.
loss_fn = nn.MSELoss(reduction="mean")
# Plain stochastic gradient descent over every model parameter.
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.001)

In [32]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one optimisation pass over ``dataloader``.

    Batches are moved to the device that holds the model's parameters. If the
    model has no parameters, the module-level ``device`` is used instead.
    Floating-point targets are cast to the prediction dtype so that, for
    example, float64 labels can be compared with float32 predictions.
    Integer targets (class indices) are left unchanged.

    Args:
        dataloader: Iterable of ``(X, y)`` tensor batches; must expose
            ``dataset`` so the total sample count can be printed.
        model: Module being trained.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: Optimizer that updates ``model``'s parameters.

    Returns:
        None. The running loss is printed every 100 batches.
    """
    size = len(dataloader.dataset)
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else device
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(run_device), y.to(run_device)

        # Compute prediction error
        pred = model(X)
        if y.is_floating_point():
            y = y.to(pred.dtype)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            current = (batch + 1) * len(X)
            print(f"loss: {loss.item():>7f}  [{current:>5d}/{size:>5d}]")

In [33]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            # correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    # correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [34]:
# Alternate one training pass and one evaluation pass for each epoch.
epochs = 10
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(
        dataloader=train_dataloader,
        model=model,
        loss_fn=loss_fn,
        optimizer=optimizer,
    )
    test(dataloader=test_dataloader, model=model, loss_fn=loss_fn)
print("Done!")

Epoch 1
-------------------------------
loss: 0.066660  [   64/22277]
loss: 0.045485  [ 6464/22277]
loss: 0.037565  [12864/22277]
loss: 0.039113  [19264/22277]
Test Error: 
 Accuracy: 0.0%, Avg loss: 0.011126 

Epoch 2
-------------------------------
loss: 0.041347  [   64/22277]
loss: 0.041063  [ 6464/22277]
loss: 0.024976  [12864/22277]
loss: 0.040716  [19264/22277]
Test Error: 
 Accuracy: 0.0%, Avg loss: 0.010695 

Epoch 3
-------------------------------
loss: 0.032235  [   64/22277]
loss: 0.027940  [ 6464/22277]
loss: 0.025921  [12864/22277]
loss: 0.021612  [19264/22277]
Test Error: 
 Accuracy: 0.0%, Avg loss: 0.010670 

Epoch 4
-------------------------------
loss: 0.037443  [   64/22277]
loss: 0.033000  [ 6464/22277]
loss: 0.025183  [12864/22277]
loss: 0.029771  [19264/22277]
Test Error: 
 Accuracy: 0.0%, Avg loss: 0.010624 

Epoch 5
-------------------------------
loss: 0.029586  [   64/22277]
loss: 0.022937  [ 6464/22277]
loss: 0.025752  [12864/22277]
loss: 0.018783  [19264/222

In [35]:
# Persist the trained weights. The path is held in one variable so the
# confirmation message always names the file that was actually written.
model_path = "model_preprocess.pth"
torch.save(model.state_dict(), model_path)
print(f"Saved PyTorch Model State to {model_path}")

Saved PyTorch Model State to model.pth


Faça o download do seu modelo após o treinamento, caso queira testá-lo no simulador.

O código a seguir demonstra como o modelo será usado para inferência no simulador. Caso seja necessário, altere a função *preprocess*.

In [28]:
def preprocess(x):
    """Return ``x`` unchanged.

    Placeholder hook called on the input before it is given to the model at
    inference time.
    """
    # TODO: change this function if the input needs preprocessing
    return x

In [29]:
# model = NeuralNetwork().to(device)
# model.load_state_dict(torch.load("model_preprocess.pth"))

In [30]:
# model.eval()
# x = test_dataset[0][0]
# with torch.no_grad():
    # x = preprocess(x)
    # x = x.to(device)
    # pred = model(x)