In [1]:
# Copyright 2023 a1147
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import os
import cv2
import numpy as np
from torch import optim
import torch.nn as nn
from torch.autograd import Variable
from torchvision import transforms
from torchvision.models import resnet18

In [2]:
class MyDataset(Dataset):
    """In-memory two-class image dataset with Gaussian-noise augmentation.

    Images are read eagerly from two sub-directories of ``./<folder>``:
    ``non`` (label 0) and ``white`` (label 1). Every readable image is scaled
    to ``float32`` in ``[0, 1]``, converted from HWC to CHW layout, and stored
    twice: once clean and once with additive Gaussian noise (the noisy copy is
    not clipped, so it may leave the ``[0, 1]`` range).

    Args:
        folder: Dataset root, resolved relative to the current directory
            (an absolute path is used as-is).
        noise_mean: Mean of the additive Gaussian noise.
        noise_sigma: Standard deviation of the additive Gaussian noise.
    """

    # (sub-directory name, integer class label), loaded in this order.
    CLASS_DIRS = (('non', 0), ('white', 1))

    def __init__(self, folder, noise_mean=0.0, noise_sigma=0.1) -> None:
        super().__init__()
        # List of [CHW float32 image, int label] pairs.
        self.data = []

        for sub_dir, label in self.CLASS_DIRS:
            self._load_class_dir(
                os.path.join('.', folder, sub_dir), label, noise_mean, noise_sigma
            )

    def _load_class_dir(self, dir_path, label, noise_mean, noise_sigma) -> None:
        """Append each readable image in ``dir_path``, plus a noisy copy, under ``label``."""
        # Sorted so the sample order does not depend on the file system.
        for file_name in sorted(os.listdir(dir_path)):
            img = cv2.imread(os.path.join(dir_path, file_name))
            if img is None:
                # cv2.imread returns None (no exception) for files it cannot
                # decode, e.g. stray non-image files; skip them.
                continue
            img = img.astype(np.float32) / 255

            # Noise data
            gauss = np.random.normal(noise_mean, noise_sigma, img.shape)
            noisy_img = (img + gauss).astype(np.float32)

            # HWC -> CHW, the layout torch convolution layers expect.
            self.data.append([np.moveaxis(img, -1, 0), label])
            self.data.append([np.moveaxis(noisy_img, -1, 0), label])

    def __len__(self) -> int:
        """Return the number of stored samples (clean and noisy copies both count)."""
        return len(self.data)

    def __getitem__(self, index) -> 'tuple[np.ndarray, int]':
        """Return the ``(image, label)`` pair stored at ``index``."""
        img, label = self.data[index]
        return img, label

# Build the training and validation sets from ./train_datasets and ./val_datasets.
# MyDataset reads every image in its constructor, so these two lines do all the
# disk I/O up front.
train_dataset = MyDataset('train_datasets')
val_dataset = MyDataset('val_datasets')

KeyboardInterrupt: 

In [28]:
class cnn(nn.Module):
    """Small convolutional classifier that outputs two raw logits.

    Four identical stages (3x3 stride-2 convolution, batch norm, ReLU, 2x2 max
    pool) take the channels through 3 -> 16 -> 32 -> 64 -> 16. The flattened
    result feeds a 64 -> 32 -> 10 -> 2 fully connected head. ``fc1`` takes
    exactly 64 features, so the last stage must emit 16 x 2 x 2 values per
    sample; a 640 x 640 input does.
    """

    def __init__(self):
        super(cnn, self).__init__()
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
        self.conv1 = self._stage(3, 16)
        self.conv2 = self._stage(16, 32)
        self.conv3 = self._stage(32, 64)
        self.conv4 = self._stage(64, 16)
        self.fc1 = nn.Linear(64, 32)
        self.fc2 = nn.Linear(32, 10)
        self.out = nn.Linear(10, 2)

    @staticmethod
    def _stage(channels_in, channels_out):
        """Build one conv -> batch-norm -> ReLU -> max-pool stage."""
        return nn.Sequential(
            nn.Conv2d(
                in_channels=channels_in,
                out_channels=channels_out,
                kernel_size=3,
                stride=2,
            ),
            nn.BatchNorm2d(channels_out),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )

    def forward(self, x):
        """Run the four stages and the head; return unnormalised class logits."""
        features = x
        for stage in (self.conv1, self.conv2, self.conv3, self.conv4):
            features = stage(features)
        # Collapse everything except the batch dimension.
        flat = features.contiguous().view(features.shape[0], -1)
        hidden = self.relu(self.fc1(flat))
        hidden = self.relu(self.fc2(hidden))
        return self.out(hidden)

In [29]:
# Training batches of 50 are reshuffled each epoch; validation batches of 10
# keep a fixed order. num_workers=0 loads the data in the main process.
Dtr = DataLoader(dataset=train_dataset, batch_size=50, shuffle=True, num_workers=0)
Dva = DataLoader(dataset=val_dataset, batch_size=10, shuffle=False, num_workers=0)

In [41]:
from tqdm import tqdm
import copy

def get_val_loss(model, Val):
    """Return the mean per-batch cross-entropy loss of ``model`` over ``Val``.

    The model is switched to evaluation mode and is left in that mode, so a
    caller that continues training must call ``model.train()`` afterwards.

    Args:
        model: Module mapping a batch of inputs to class logits.
        Val: Iterable of ``(data, target)`` batches; ``target`` holds integer
            class indices.

    Returns:
        The mean of the per-batch losses, or NaN when ``Val`` yields no batches.
    """
    model.eval()
    criterion = nn.CrossEntropyLoss()
    val_loss = []
    # Validation needs no gradients; skipping graph construction saves memory
    # and time.
    with torch.no_grad():
        for data, target in Val:
            output = model(data)
            loss = criterion(output, target.long())
            val_loss.append(loss.item())

    if not val_loss:
        # np.mean([]) would also give NaN, but with a RuntimeWarning.
        return float('nan')
    return np.mean(val_loss)

def train(epoch_num=50, min_epochs=20, lr=0.0008, save_path="model/cnn.pkl"):
    """Train a two-class ResNet-18 and save the best weights.

    Batches come from the module-level loaders ``Dtr`` (training) and ``Dva``
    (validation). After each epoch the validation loss is measured; once more
    than ``min_epochs`` epochs have run, the model with the lowest validation
    loss so far is kept as the best model.

    Args:
        epoch_num: Number of training epochs.
        min_epochs: Epochs to complete before a model may be picked as best.
        lr: Adam learning rate.
        save_path: File that receives the best model's ``state_dict``; its
            parent directory is created if missing.

    Returns:
        The best model. If no epoch qualified (for example
        ``epoch_num <= min_epochs``), the model as it stands after the final
        epoch.
    """
    print('train...')
    best_model = None
    # Start at infinity so the first eligible epoch always qualifies; a fixed
    # threshold could leave best_model unset and crash at save time.
    min_val_loss = float('inf')
    model = resnet18(num_classes=2)
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    for epoch in tqdm(range(epoch_num), ascii=True):
        train_loss = []
        for data, target in Dtr:
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target.long())
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())
        # validation
        val_loss = get_val_loss(model, Dva)
        # get_val_loss leaves the model in eval mode; switch back for training.
        model.train()
        if epoch + 1 > min_epochs and val_loss < min_val_loss:
            min_val_loss = val_loss
            best_model = copy.deepcopy(model)

        tqdm.write('Epoch {:03d} train_loss {:.5f} val_loss {:.5f}'.format(epoch, np.mean(train_loss), val_loss))

    if best_model is None:
        # No epoch was eligible; fall back to the final weights rather than
        # failing on None.state_dict().
        best_model = model

    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)
    torch.save(best_model.state_dict(), save_path)
    return best_model


In [5]:
def softmax(x, axis=None):
    """Compute a numerically stable softmax of ``x``.

    Args:
        x: Array-like of scores.
        axis: Axis along which to normalise. The default ``None`` normalises
            over the whole array, so all entries together sum to 1; this is
            what a single 1-D score vector needs. Pass e.g. ``axis=-1`` to
            normalise each row of a batch independently.

    Returns:
        Array of the same shape as ``x`` with values summing to 1 along ``axis``.
    """
    x = np.asarray(x)
    # Subtracting the maximum leaves the result unchanged but keeps exp()
    # from overflowing on large scores.
    e_x = np.exp(x - np.max(x, axis=axis, keepdims=True))
    return e_x / e_x.sum(axis=axis, keepdims=True)

In [42]:
# Start training only when this code runs as the main module (true for a
# notebook kernel), not when it is imported.
if __name__ == '__main__':
    train()

train...


  0%|          | 0/50 [03:07<?, ?it/s]


KeyboardInterrupt: 

In [2]:
import pickle
# Rebuild the architecture used in training, then restore the saved weights.
model = resnet18(num_classes = 2)
# map_location='cpu' lets a checkpoint saved on a GPU load on a CPU-only machine.
# NOTE(review): torch.load unpickles the file, which can execute arbitrary
# code; only load checkpoints from a trusted source.
model.load_state_dict(torch.load('model/cnn (4).pkl', map_location='cpu'))
# Evaluation mode: batch norm uses its running statistics during inference.
model.eval()

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    )
    (1): BasicBlock(
      (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
  

In [3]:
# Open capture device 0 (typically the default webcam).
cap = cv2.VideoCapture(0)

In [6]:
import time
cnt = 0
try:
    while True:
        # Capture frame-by-frame
        ret, frame = cap.read()
        if not ret or frame is None:
            # A failed read returns no frame; stop instead of crashing on
            # frame.shape.
            print('failed to read a frame from the camera, stopping')
            break

        # Pad the shorter side equally with black so the frame is
        # (approximately) square before resizing; this avoids distortion.
        height, width = frame.shape[:2]
        border_v = 0
        border_h = 0
        if width >= height:
            border_v = (width - height) // 2
        else:
            border_h = (height - width) // 2
        # The border colour must be passed as `value=`; the positional slot
        # after borderType is the `dst` output array.
        img = cv2.copyMakeBorder(frame, border_v, border_v, border_h, border_h, cv2.BORDER_CONSTANT, value=0)
        img = cv2.resize(img, (640, 640))
        # Keep a 640x640 copy of every processed frame on disk.
        cv2.imwrite(f'self-{cnt}.png', img)
        cnt+=1
        time.sleep(1)
        img = cv2.resize(img, (224, 224))

        # HWC uint8 -> 1xCxHxW float32 in [0, 1]
        data = np.moveaxis(img, -1, 0)
        data = np.expand_dims(data, 0)
        data = torch.from_numpy(data.astype(np.float32) / 255)
        # Inference only: no autograd graph is needed.
        with torch.no_grad():
            result = model(data)
        print(softmax(result[0].detach().numpy()))
        print('weared' if result[0][1] >= result[0][0] else 'not weared!!')

        # Display the resulting frame
        cv2.imshow('frame', img)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release the capture even when the loop is interrupted (e.g. the kernel
    # is stopped) so the camera does not stay locked.
    cap.release()
    cv2.destroyAllWindows()


searchText = ""

[0.98945785 0.01054218]
not weared!!
[0.9831024  0.01689765]
not weared!!
[0.9762704  0.02372962]
not weared!!
[0.95596844 0.04403152]
not weared!!
[9.9960548e-01 3.9456697e-04]
not weared!!
[9.997193e-01 2.806109e-04]
not weared!!
[9.9957663e-01 4.2332767e-04]
not weared!!
[0.99675    0.00325004]
not weared!!
[9.999620e-01 3.806788e-05]
not weared!!
[9.994103e-01 5.897383e-04]
not weared!!
[0.97281367 0.0271864 ]
not weared!!
[0.8624624  0.13753754]
not weared!!
[9.9963176e-01 3.6826174e-04]
not weared!!
[0.9970367  0.00296326]
not weared!!
[0.91184646 0.08815354]
not weared!!
[0.9293943  0.07060574]
not weared!!
[0.7171148  0.28288516]
not weared!!
[0.99805    0.00195008]
not weared!!
[9.9999082e-01 9.1212405e-06]
not weared!!
[0.9964218  0.00357825]
not weared!!
[0.98893297 0.01106709]
not weared!!
[0.9833604  0.01663957]
not weared!!
[0.9852561  0.01474392]
not weared!!
[0.94979763 0.05020235]
not weared!!
[0.9370366  0.06296349]
not weared!!
[0.94543326 0.0545668 ]
not weared!!
[0

KeyboardInterrupt: 

In [7]:
# Manual cleanup: release the camera and close any OpenCV windows, in case the
# capture loop was interrupted before its own cleanup ran.
cap.release()
cv2.destroyAllWindows()

In [None]:
# Dummy input used only to trace the network for export.
# NOTE(review): no dynamic_axes are given, so the exported graph has a fixed
# 1x3x640x640 input, while the webcam cell feeds the model 224x224 frames.
# Confirm which size the ONNX consumer will use.
x = torch.randn(1, 3, 640, 640, requires_grad=True)
# export_params is passed by keyword: recent PyTorch releases make every
# argument after the output file keyword-only, so a positional True fails there.
torch.onnx.export(model, x, "btest.onnx", export_params=True, opset_version=10, input_names=['input'], output_names=['output'])