<a href="https://colab.research.google.com/github/BonggeunJeon/Follow-up/blob/master/Grasp_Prediction_Bonggeun.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Tutorial D2: Grasp Prediction

## Tasks

But first: copy this notebook to your Drive (the "Copy to Drive" button is in the toolbar, next to "+ Code" and "+ Text").
Goal: Train a custom-made neural network to predict whether a grasp will be successful based on two tactile images. (binary classification)

### Setup

1. Create conda venv and install Tacto on your machine
2. Clone
```
https://github.com/facebookresearch/tacto/
```
3. Call
```
pip install -r tacto/requirements/examples.txt
```
4. Go to **experiments/grasp_stability**
5. Call
```
pip install scipyplot deepdish torch torchvision
```

### Collect data
1. The default script collects N x 100 samples. How fast is your machine?
  
  a. It's fast and I have over 100GB free:
  ```
  python grasp_data_collection.py
  ```
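  b. It is not that fast, and I have less free space: apply the following changes to `grasp_data_collection.py` before running it (stop collecting early and store only the tactile color images and the label):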
  ```
  295    print("\rsample {}".format(log.id * log.batch_size + len(log.dataList)),end="")
  296
  297     # print("\rsample {}".format(log.id), end="")
  298
  299    if log.id > 2000: # N is 2000, for N = 100 dataset size is ~5.7GB
            break
  ```

  ```
  112    data = {
  113       "tactileColorL": tactileColorL,
  114       "tactileColorR": tactileColorR,
  115       # "tactileDepthL": tactileDepthL,
  116       # "tactileDepthR": tactileDepthR,
  117       # "visionColor": visionColor,
  118      # "visionDepth": visionDepth,
  119       # "gripForce": gripForce,
  120       # "normalForce": normalForce,
  121       "label": label,
  122   }
  ```

2. Zip the **data** folder and upload it to **Google Drive**, or upload it directly to the Colab runtime.

## Implement
1. MyModel class
2. Training loop
3. Save model

- Use materials from the previous lectures and the PyTorch documentation. Links:
- https://colab.research.google.com/drive/1KyrXe6kErnAYTuFKsRnvDhRLS4pBAiXa?usp=sharing
- https://pytorch.org/docs/stable/index.html

# Install dependencies

In [None]:
!pip install deepdish

In [None]:
from google.colab import files
import random
import os

import tqdm

import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import math

import re
import random
import numpy as np
%matplotlib inline

import cv2

import torch
from torchvision.datasets import ImageFolder
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, models, transforms
import torch.nn as nn
import torch.optim as optim

import copy
import h5py

import glob
import os
import time

import deepdish as dd
import numpy as np
import torch
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as T

from torch import nn
from torch.utils.data import Dataset
from torchvision import transforms

# Dataset Class and Loader

In [None]:
class GraspingDataset(Dataset):
    """Dataset of grasp samples stored as one HDF5 file per field and directory.

    Every entry of ``fileNames`` is a directory; field ``k`` of that directory
    is read from ``<dir>/<basename(dir)>_<k>.h5``. Each file is treated as
    holding ``numGroup`` consecutive samples, so sample ``idx`` lives in
    directory ``idx // numGroup`` at position ``idx % numGroup``. Only the most
    recently accessed directory is kept in memory, which makes sequential
    access cheap and random access expensive.

    Args:
        fileNames: Sequence of data directories.
        fields: Names of the fields to load; ``"label"`` is always appended.
            ``None`` (the default) loads only the label.
        transform: Optional callable applied to the color fields and to
            ``"visionDepth"``.
        transformDepth: Optional callable applied to the tactile depth fields.
        numGroup: Number of samples stored per file.
    """

    _COLOR_FIELDS = ("tactileColorL", "tactileColorR", "visionColor")
    _TACTILE_DEPTH_FIELDS = ("tactileDepthL", "tactileDepthR")

    def __init__(
        self, fileNames, fields=None, transform=None, transformDepth=None, numGroup=100
    ):
        self.transform = transform
        self.transformDepth = transformDepth
        self.fileNames = fileNames
        # Copy into a fresh list so neither the caller's list nor a shared
        # default object is ever aliased.
        self.fields = list(fields or []) + ["label"]
        self.numGroup = numGroup  # data points per file

        # Single-entry cache: contents and index of the last loaded directory.
        self.dataList = None
        self.dataFileID = -1

    def __len__(self):
        return len(self.fileNames) * self.numGroup

    def load_data(self, idx):
        """Load every requested field of directory ``fileNames[idx]``.

        Returns:
            dict mapping field name to whatever ``dd.io.load`` returns for the
            corresponding ``.h5`` file.
        """
        dirName = self.fileNames[idx]
        # The file stem repeats the directory name; normpath tolerates a
        # trailing separator and basename handles the platform separator.
        fn = os.path.basename(os.path.normpath(dirName))

        return {
            k: dd.io.load(os.path.join(dirName, "{}_{}.h5".format(fn, k)))
            for k in self.fields
        }

    def __getitem__(self, idx):
        """Return sample ``idx`` as a dict keyed by field name."""
        if torch.is_tensor(idx):
            idx = idx.tolist()

        fileID, offset = divmod(idx, self.numGroup)
        if fileID != self.dataFileID:
            self.dataList = self.load_data(fileID)
            self.dataFileID = fileID

        sample = {}
        for k in self.fields:
            d = self.dataList[k][offset]

            if k in self._COLOR_FIELDS:
                # Keep the first three channels only (drops e.g. an alpha channel).
                d = d[:, :, :3]
                if self.transform:
                    d = self.transform(d)
            elif k in self._TACTILE_DEPTH_FIELDS:
                # Replicate the single depth channel into three channels, then
                # map the range [0, 0.002] onto [0, 255] with saturation.
                d = np.dstack([d, d, d])
                d = d / 0.002 * 255
                d = np.clip(d, 0, 255).astype(np.uint8)
                if self.transformDepth:
                    d = self.transformDepth(d)
            elif k == "visionDepth":
                d = np.dstack([d, d, d])
                d = (d * 255).astype(np.uint8)
                if self.transform:
                    d = self.transform(d)

            # Any other field (e.g. "label") is passed through untouched.
            sample[k] = d

        return sample

In [None]:
def _build_transform(crop, mean, std):
    """Compose the resize -> crop -> tensor -> normalize pipeline shared by all loaders."""
    return transforms.Compose(
        [
            transforms.ToPILImage(),
            transforms.Resize(256),
            crop,
            transforms.ToTensor(),
            transforms.Normalize(mean=mean, std=std),
        ]
    )


def load_data(rootDir, K, i):
    """Build train/test loaders for the i-th split of a K-fold partition.

    All entries directly under ``rootDir`` are sorted and cut into folds of
    ``len(entries) // K`` entries each. Fold ``i`` becomes the test set and
    every other entry (including any remainder past the last fold) is used for
    training. If there are fewer than ``K`` entries the test set is empty.

    Args:
        rootDir: Directory whose children are the per-file data directories.
        K: Number of folds; must be at least 1.
        i: Zero-based index of the fold held out for testing.

    Returns:
        Tuple ``(trainLoader, testLoader)`` of ``DataLoader`` objects yielding
        batches of 32 samples with the ``tactileColorL``, ``tactileColorR``
        and ``label`` fields.

    Raises:
        ValueError: If ``K < 1`` or ``i`` is not in ``range(K)``.
    """
    if K < 1:
        raise ValueError("K must be at least 1, got {}".format(K))
    if not 0 <= i < K:
        raise ValueError("fold index i must be in [0, {}), got {}".format(K, i))

    fileNames = sorted(glob.glob(os.path.join(rootDir, "*")))
    print(fileNames)

    # Split K fold; slicing keeps the sorted order, so the training order is
    # deterministic from run to run.
    foldSize = len(fileNames) // K
    start, stop = foldSize * i, foldSize * (i + 1)
    testFileNames = fileNames[start:stop]
    trainFileNames = fileNames[:start] + fileNames[stop:]

    fields = ["tactileColorL", "tactileColorR"]

    # Training uses a random crop as augmentation.
    trainDataset = GraspingDataset(
        trainFileNames,
        fields=fields,
        transform=_build_transform(transforms.RandomCrop(224), (0.5,), (0.5,)),
        transformDepth=_build_transform(transforms.RandomCrop(224), (0.1,), (0.2,)),
    )
    # shuffle stays off: the dataset caches one file at a time, so sequential
    # access avoids reloading a file for every sample.
    trainLoader = torch.utils.data.DataLoader(
        trainDataset, batch_size=32, shuffle=False, num_workers=2, pin_memory=True
    )

    # Evaluation uses a centre crop so that test results are reproducible.
    testDataset = GraspingDataset(
        testFileNames,
        fields=fields,
        transform=_build_transform(transforms.CenterCrop(224), (0.5,), (0.5,)),
        transformDepth=_build_transform(transforms.CenterCrop(224), (0.1,), (0.2,)),
    )
    testLoader = torch.utils.data.DataLoader(
        testDataset, batch_size=32, shuffle=False, num_workers=2, pin_memory=True
    )

    return trainLoader, testLoader

# Task I: Get and upload data

```
from google.colab import drive
drive.mount('/content/drive')
```

In [None]:
# I picked 3rd option
# Mount Google Drive at /content/drive so files stored there can be read from this runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# todo: fix path
!unzip /content/drive/MyDrive/TutorialD/tutorial-d/data.zip


# Task II: Load Dataset
- result: you should see two tactile readings

In [None]:
# todo: check if your data is in the right dir
rootDir = '/content/grasp'

# create dataset loaders (do you know why are we using loaders?)
# 10 folds, fold 0 held out for testing
trainLoader, testLoader = load_data(rootDir, 10, 0)

# show single test example
single_test_batch = next(iter(testLoader))
tactileColorL, tactileColorR, label = single_test_batch['tactileColorL'], single_test_batch['tactileColorR'], single_test_batch['label']

display_reading_index = 0
for reading in (tactileColorL, tactileColorR):
    # Images come out of the loader as (C, H, W) normalized with mean 0.5 / std 0.5.
    # imshow wants (H, W, C) with floats in [0, 1], so reorder the axes
    # (permute keeps the image upright, unlike .T which also swaps H and W)
    # and undo the normalization.
    image = reading[display_reading_index].permute(1, 2, 0) * 0.5 + 0.5
    plt.title('Label:' + str(label[display_reading_index].item()))
    plt.imshow(image.clamp(0, 1).numpy())
    plt.show()

In [None]:
# Inspect the shape of the left tactile image batch taken from the test loader.
print(single_test_batch['tactileColorL'].shape)

## Task III: Implement custom model and train it
- tip 1: all manipulations are performed on tensors, check tensor shape frequently (i.e. print(inputs.shape))
- you can use Tutorial A for inspiration https://colab.research.google.com/drive/1KyrXe6kErnAYTuFKsRnvDhRLS4pBAiXa?usp=sharing
- one possible approach is shown on the slides
- for tensor manipulation https://pytorch.org/docs/stable/index.html
- take single batch and play with it

In [None]:
# Number of batches the test loader yields per pass.
print(len(testLoader))

In [None]:
from transformers import ViTForImageClassification



In [None]:
# implement network
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.vit = ViTForImageClassification.from_pretrained("google/vit-base-patch16-224", attn_implementation="sdpa", torch_dtype=torch.float16)
        self.vit.classifier = nn.Identity()
        self.classifier = nn.Sequential(
            nn.Linear(self.vit.config.hidden_size * 2, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, 2)
        )

    def forward(self, tactileColorL, tactileColorR):
      featuresL = self.vit(pixel_values=tactileColorL).logits
      featuresR = self.vit(pixel_values=tactileColorR).logits
      combined_features = torch.cat((featuresL, featuresR), dim=1)
      output = self.classifier(combined_features)

      return output

In [None]:
# Instantiate the network once to verify that the pretrained weights load.
model = MyModel()

# Sanity check: dtype of the first registered parameter.
print(next(model.parameters()).dtype)

In [None]:
from transformers import AutoImageProcessor

# Image processor used by preprocess_images below.
# NOTE(review): MyModel loads "google/vit-base-patch16-224" while this uses the
# "-in21k" checkpoint name — confirm both ship the same preprocessing config.
processor = AutoImageProcessor.from_pretrained("google/vit-base-patch16-224-in21k")

def preprocess_images(image_batch):
    """Run a batch of image tensors through the module-level image processor.

    Every element of ``image_batch`` is converted to a PIL image, and the
    resulting list is handed to ``processor`` in one call.

    NOTE(review): ToPILImage presumably expects float tensors scaled to
    [0, 1] — confirm the value range callers pass in.

    Returns:
        The ``'pixel_values'`` entry of the processor output, requested as
        PyTorch tensors.
    """
    to_pil = T.ToPILImage()
    pil_images = list(map(to_pil, image_batch))
    encoded = processor(images=pil_images, return_tensors="pt")
    return encoded['pixel_values']

In [None]:
# Sanity check: push one test batch through the image processor and look at
# the dtype it returns. A single batch is enough, so the loader is not
# iterated to the end, and the result is not stored under the builtin name
# `input`.
batch = next(iter(testLoader))
tactileColorL, tactileColorR, labels = (
    batch['tactileColorL'],
    batch['tactileColorR'],
    batch['label']
)
pixel_values = preprocess_images(tactileColorL)

print(pixel_values.dtype)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Train in float32 on the selected device.
net = MyModel()
net.to(torch.float32).to(device)

loss_function = nn.CrossEntropyLoss()
# The optimizer must own the parameters of `net`, the network trained below;
# built over any other instance, optimizer.step() would never change `net`.
optimizer = torch.optim.Adam(net.parameters(), lr=1e-4)

# training loop
epochs = 100

for epoch in range(epochs):
    net.train()
    running_loss = 0.0
    for batch in trainLoader:
        # The loader already delivers 224x224 crops normalized with
        # mean 0.5 / std 0.5, so the tensors are fed to the network directly.
        # Converting these [-1, 1] tensors back to PIL images and processing
        # them a second time would corrupt the pixel values.
        tactileColorL = batch['tactileColorL'].to(device)
        tactileColorR = batch['tactileColorR'].to(device)
        # CrossEntropyLoss expects integer class indices.
        labels = batch['label'].to(device).long()

        optimizer.zero_grad()

        outputs = net(tactileColorL, tactileColorR)
        loss = loss_function(outputs, labels)

        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # max(..., 1) avoids a ZeroDivisionError when the training loader is empty.
    print(f"Epoch {epoch+1}, Loss: {running_loss/max(len(trainLoader), 1)}")
