In [3]:
import torch as th
import torch.nn as nn
import numpy as np

device = 'cuda' if th.cuda.is_available() else 'cpu'
if device == 'cuda':
    print(f'device name: {th.cuda.get_device_name()}')


device name: NVIDIA GeForce RTX 3070 Laptop GPU


In [9]:
# torch tensor, python list and numpy array
a = [[1,2,3,4,5],[6,7,8,9,10]]
b = np.array(a)
c  = th.randn(1,3)

print(f'list test, size: ({len(a)}, {len(a[0])})')
print(f'np array test, size: {b.shape}')
print(f'tensor test, tensor: {c} size: {c.shape}, {c.size()}')

# o dimensional tensor (scalar)
a = th.tensor(1, dtype=th.float32)
print (f'o dimension shape: {a.shape}, {len(a.shape)}')

import os
folders = os.listdir('../grasp_wrs/src/Vacuum-Grasp/vision_processing/scripts/figures')
print(folders)
folders.sort()
print(folders)

data_count = 42
digit = 5
data_name = str(data_count).zfill(digit)
print(f'data name: {data_name}')

list test, size: (2, 5)
np array test, size: (2, 5)
tensor test, tensor: tensor([[ 0.6479, -0.9443, -1.3357]]) size: torch.Size([1, 3]), torch.Size([1, 3])
o dimension shape: torch.Size([]), 0
['10.png', '8.png', '6.png', '7.png', '5.png', 'side_2.png', '3.png', '4.png', '2.png', 'side_1.png', '1.png', '9.png']
['1.png', '10.png', '2.png', '3.png', '4.png', '5.png', '6.png', '7.png', '8.png', '9.png', 'side_1.png', 'side_2.png']
data name: 00042


In [24]:
# torch indexing
a = th.randn(3,2,2,4,3,4)
print(a.shape)
print(a[:2, ..., :, :2].shape)

torch.Size([3, 2, 2, 4, 3, 4])
torch.Size([2, 2, 2, 4, 3, 2])


In [25]:
# torch creation
np_arr = np.array([1,2,3])
a = th.from_numpy(np_arr) # share data with original numpy array, th.as_tensor(np_array) also shallow copy
print(f'a: {a}, {a.shape}')
a = th.tensor(np_arr) # or a = th.tensor([1,2,3]), do not share, do deep copy
print(f'a: {a}, {a.shape}')

# create by shape
b = th.FloatTensor(3,4)
print(f'b: {b}, {b.shape}')
b = th.empty(3,4)
print(f'b: {b}, {b.shape}')
b = th.eye(3,3)
print(f'b: {b}, {b.shape}')
b = th.ones(4,5)
print(f'b: {b}, {b.shape}')

# random number tensor
c = th.randn(3, 3)
print(f'c: {c}, {c.shape}')
d = th.rand_like(c)
print(f'd: {d}, {d.shape}')


# some pattern generation
print(th.arange(0,10)) # [start, end). end is excluded
print(th.arange(0,10,2))
print(th.linspace(0,10,steps=3)) # linspace 等差数列
print(th.linspace(0,10,steps=11))
print(th.logspace(0,-1,steps=10)) # 在10的指数上做等差数列，前两个参数是[start, end]
print(th.logspace(0,1,steps=10))


a: tensor([1, 2, 3]), torch.Size([3])
a: tensor([1, 2, 3]), torch.Size([3])
b: tensor([[ 6.2766e+05,  4.5685e-41,  6.2766e+05,  4.5685e-41],
        [ 6.7847e-01,  7.6067e-01,  1.8575e+00, -7.5308e-02],
        [ 1.3892e+00, -5.1013e-01,  2.6076e-01, -2.9725e-01]]), torch.Size([3, 4])
b: tensor([[6.2766e+05, 4.5685e-41, 3.9551e-35, 0.0000e+00],
        [0.0000e+00, 0.0000e+00, 0.0000e+00, 0.0000e+00],
        [0.0000e+00, 0.0000e+00, 8.4078e-45, 0.0000e+00]]), torch.Size([3, 4])
b: tensor([[1., 0., 0.],
        [0., 1., 0.],
        [0., 0., 1.]]), torch.Size([3, 3])
b: tensor([[1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.],
        [1., 1., 1., 1., 1.]]), torch.Size([4, 5])
c: tensor([[ 0.2738, -0.0883, -2.0356],
        [ 0.1819,  0.8304, -0.5378],
        [-0.0831, -0.6756,  0.2318]]), torch.Size([3, 3])
d: tensor([[0.8520, 0.7556, 0.8177],
        [0.1698, 0.8037, 0.3213],
        [0.3510, 0.8716, 0.4748]]), torch.Size([3, 3])
tensor([0, 1, 2, 3, 4

In [24]:
# torch shape manipulation
## adding/deleting one dimension with size one
x = th.randn(1,2,1,2,2,1)
print(f'x.shape: {x.shape}')
x = x.squeeze(dim=0)
print(f'x.shape: {x.shape}')
x = x.unsqueeze(dim=0)
print(f'x.shape: {x.shape}')
x = x.squeeze() # no parameter indicates which dimension to squeeze means squeeze all dimension equals to one
print(f'x.shape: {x.shape}')

## reshape， view方法和reshape方法有同样的功能呢个但是view有局限性
## view方法必须要满足tensor的contigous性质，可以调用.contigous方法去重构数据并开辟新的空间(deep copy)
## 如不需要deep copy可以无脑使用reshape方法
y = th.randn(3,4)
print(f'y.shape: {y.shape}')
y = y.reshape(2,6) # y = th.reshape(y, (2,6))
print(f'y.shape: {y.shape}')
y = th.reshape(y, (2,6))
print(f'y.shape: {y.shape}')
y = th.randn(2,3,4)
y = y.permute(2,1,0)
print(f'y.shape: {y.shape}')

x.shape: torch.Size([1, 2, 1, 2, 2, 1])
x.shape: torch.Size([2, 1, 2, 2, 1])
x.shape: torch.Size([1, 2, 1, 2, 2, 1])
x.shape: torch.Size([2, 2, 2])
y.shape: torch.Size([3, 4])
y.shape: torch.Size([2, 6])
y.shape: torch.Size([2, 6])
y.shape: torch.Size([4, 3, 2])


In [25]:
# torch stacking and cat
x = th.randn(2,3)
print(x.shape)
concated_x = th.cat((x,x,x),dim=0)
print(f'0-dim cat: {concated_x.shape}')
concated_x = th.cat((x,x,x),dim=1)
print(f'1-dim cat: {concated_x.shape}')

torch.Size([2, 3])
0-dim cat: torch.Size([6, 3])
1-dim cat: torch.Size([2, 9])


In [40]:
# Tensor赋值，切片， 变形等等都是进行的shallow copy 如需deep copy请使用clone()的成员函数
# tensor clone and detach usage
x = th.randn(3,4,5)
clone_x = x.clone() # deep copy of x but will be treated as intermediate variable during gradient descend
detach_x = x.detach() # shallow copy, will not get involved with gradient calculation
clone_detach_x = x.clone().detach() #


# test 
y = th.randn(1,2,1,2,1, requires_grad=True)
out = y.sigmoid()
print(out,out.shape)
out_clone = out.clone().squeeze()
print(out_clone)
detach_out = out.detach()
print(detach_out, detach_out.shape)
squezee_out = detach_out.squeeze()
print(squezee_out, squezee_out.shape)

tensor([[[[[0.5375],
           [0.5653]]],


         [[[0.2496],
           [0.4502]]]]], grad_fn=<SigmoidBackward0>) torch.Size([1, 2, 1, 2, 1])
tensor([[0.5375, 0.5653],
        [0.2496, 0.4502]], grad_fn=<SqueezeBackward0>)
tensor([[[[[0.5375],
           [0.5653]]],


         [[[0.2496],
           [0.4502]]]]]) torch.Size([1, 2, 1, 2, 1])
tensor([[0.5375, 0.5653],
        [0.2496, 0.4502]]) torch.Size([2, 2])


In [5]:
# torch logistic selection by api
x = th.tensor([1, 2, 3, 4])
y = th.tensor([5, 6, 7, 8])
condition = th.tensor([True, False, True, False])

# torch.where()函数根据条件选择元素
result = th.where(condition, x, y) #  true select the first array, false select the second array
condition = x < 3
compa_result = th.where(condition, x, y) #  result = [1,2,7,8]

print(result) #  tensor([1, 6, 3, 8])

# gather api for index mapping
# 创建示例输入张量
input = th.tensor([[1, 2], [3, 4], [5, 6]])

# 创建索引张量
index = th.tensor([[0, 1], [1, 0]])

# 在维度 1 上进行索引收集
output = th.gather(input, 1, index)  # 在一维数据进行钱两行数据的mapping 

print(output)


tensor([1, 6, 3, 8])
tensor([[1, 2],
        [4, 3]])


In [13]:
# Data set
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.transforms import transforms
from torchvision.datasets import ImageFolder

class CustomDataset(Dataset):
    def __init__(self, data, labels, transform=None):
        super().__init__(data, labels)
        self.data = data
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        x = self.data[index]
        if self.transform:
            x = self.transform(x)
        y = self.labels[index]
        return x, y

# Reading from some directory to get the data and labels 
data = []
labels = []
# data augmentation for images
transform = transforms.Compose([
    transforms.RandomCrop(size=224),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=15),
    transforms.RandomErasing(),
    transforms.GaussianBlur(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
    transforms.ToTensor(),
])

myDataset = CustomDataset(data, labels, transform=transform)
batch_size = 64
loader = DataLoader(myDataset, 64, True)

# get one batch data
x, y = iter(loader)
print(f'size of x,y: {(x.shape, y.shape)}') # data shape should be torch tensor (batch_size, (data_size)), (batch_size, (label_size))
# normal training procedure
num_epoches = 1000
for i in range(num_epoches):
    # for one epoach training (Used all data inside dataset for one time inside the iteration)
    # each data and label are one batch data 
    for curr_data, true_label in loader:
        # train
        pass

# split dataset to train and test set
data_length = myDataset.__len__()
train_ratio = 0.8
train_length = int(train_ratio * data_length)
test_length = data_length - train_length

train_dataset, test_dataset = random_split(myDataset, [train_length , test_length])
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)




NameError: name 'transform' is not defined

In [14]:
# model architecture 
import torch.nn as nn
import argparse
import tqdm

def cal_cnn_size(in_size, kernel_size, stride=1, padding=1):
    # if result is not integer, use // operation to do a floor operation to get an int
    return (in_size - kernel_size + 2 * padding) // stride + 1

class CustomNet(nn.Module):
    def __init__(self, input_shape, feature_size) -> None:
        super().__init__()
        cnn_output_channel = 32
        self.cnn_module = nn.Sequential(
            nn.Conv2d(4, cnn_output_channel, kernel_size=7, stride=2),
            nn.ReLU(),
            nn.MaxPool2d(5, stride=2),
            nn.Flatten()
        )
        cnn_output_size = cnn_output_channel * cal_cnn_size(input_shape[1], 7, 2, 1) * cal_cnn_size(input_shape[2], 7, 2, 1)
        self.lstm = nn.LSTM(cnn_output_size, 1024, num_layers=3)
        self.mlp = nn.Sequential(
            nn.Linear(1024, feature_size),
            nn.Softmax()
        )

    def forward(self, x):
        # forward pass
        cnn_res = self.cnn_module(x)
        lstm_res = self.lstm(cnn_res)
        out = self.mlp(lstm_res)
        return out
    
net = CustomNet()
# To achieve we training, we still need optimizer to perform gradient descent for updating nn weights
parser = argparse.ArgumentParser(description="learning hyperparameters")
parser.add_argument('--learning_rate', '-lr', type=float, default=0.001, help="learning rate")
args = parser.parse_args()
optimizer = th.optim.Adam(net.parameters, lr=args.lr)
criterion = nn.MSELoss()

for i in range(num_epoches):
    for idx, data in tqdm(enumerate(myDataset)):
        input = data.unsqueeze(dim=2).float()
        true_output = input.clone() #  单步预测

        optimizer.zero_grad()
        model_out = net(input)
        loss = criterion(model_out, true_output)
        loss.backward()
        optimizer.step()




TypeError: CustomNet.__init__() missing 2 required positional arguments: 'input_shape' and 'feature_size'

In [18]:
# obtain the feature map from the pre-trained resenet model
import torchvision.models as models

resnet = models.resnet50(pretrained=True)
resnet_wo_fc = nn.Sequential(*list(resnet.children())[:-1]) # remove the last fc layer
# print(resnet)
print(resnet_wo_fc)

Sequential(
  (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (2): ReLU(inplace=True)
  (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (4): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
