## Import

In [41]:
import random
import pandas as pd
import numpy as np
import os
import re
import glob
import cv2

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import torchvision.models as models

from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.metrics import f1_score
from sklearn.metrics import classification_report
from tqdm.auto import tqdm

import warnings
warnings.filterwarnings(action='ignore') 
import cv2
import cvlib as cv
from cvlib.object_detection import draw_bbox
from PIL import ImageFont, ImageDraw, Image


# 경로 지정

In [None]:
# Paths and runtime configuration
DATA_PATH = '../DATA'

# Run on the MPS device when PyTorch reports it as available, otherwise on the CPU.
_mps_available = torch.backends.mps.is_available()
device = torch.device('mps:0' if _mps_available else 'cpu')

# Settings read by the cells below through CFG[...]
CFG = dict(
    IMG_SIZE=224,
    EPOCHS=5,
    LEARNING_RATE=3e-4,
    BATCH_SIZE=32,
    SEED=41,
)

def seed_everything(seed):
    """Seed the random number generators used in this file.

    Seeds Python's ``random``, NumPy and PyTorch (CPU and CUDA), exports
    ``PYTHONHASHSEED`` and switches cuDNN to deterministic mode.

    Args:
        seed: Integer seed applied to every generator.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark mode lets cuDNN auto-tune and pick algorithms per run, which
    # works against the deterministic flag above, so it has to be off.
    torch.backends.cudnn.benchmark = False

# Seed every random number generator with the configured seed
seed_everything(CFG['SEED']) # Fix the seed

# Make DATA_PATH the working directory; the relative paths used below are resolved against it.
# NOTE: DATA_PATH is itself relative, so running this line a second time moves one more level.
os.chdir(DATA_PATH)

데이터 셋 만들기 
- opencv를 이용해서 webcam을 통해 이미지를 생성
- 저장 형태
-       \____DATA
-           \____f_test
-               \____{class명(숫자형)}
-                   \____image_files

In [2]:
## Face detection
# Capture face crops from the webcam and store them as samples of class 0.
data_path = 'f_test/0'
# Create the output directory if it is missing (no error when it already exists)
os.makedirs(data_path, exist_ok=True)

max_captures = 200  # stop once this many crops have been written
sample_every = 8    # only every 8th frame is used for saving

# open webcam
webcam = cv2.VideoCapture(0)
if not webcam.isOpened():
    webcam.release()
    # Raise instead of exit(): exit() would terminate the whole interpreter session.
    raise RuntimeError("Could not open webcam")
sample_num = 0
captured_num = 0

try:
    # loop through frames
    while webcam.isOpened():

        # read frame from webcam
        status, frame = webcam.read()
        sample_num = sample_num + 1

        if not status:
            break
        face, confidence = cv.detect_face(frame)
        print(face)
        print(confidence)

        if sample_num % sample_every == 0:
            frame_h, frame_w = frame.shape[:2]

            # loop through detected faces
            for f in face:
                # Several faces in one frame must not push the count past the limit.
                if captured_num >= max_captures:
                    break

                # Clamp the box to the frame. A negative or oversized coordinate
                # would otherwise slice an empty (or wrong) region, and writing
                # an empty image fails.
                startX, startY = max(0, int(f[0])), max(0, int(f[1]))
                endX, endY = min(frame_w, int(f[2])), min(frame_h, int(f[3]))
                if endX <= startX or endY <= startY:
                    continue

                captured_num = captured_num + 1
                face_in_img = frame[startY:endY, startX:endX, :]
                cv2.imwrite(os.path.join(data_path, "face" + str(captured_num) + ".jpg"), face_in_img)

        # display output
        cv2.imshow("captured frames", frame)

        # press "Q" to stop
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        # ">=" rather than "==": the counter is never allowed to skip the stop value.
        if captured_num >= max_captures:
            break
finally:
    # release resources even when the loop is interrupted by an error
    webcam.release()
    cv2.destroyAllWindows()

[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]


In [None]:
## Face detection
# Capture face crops from the webcam and store them as samples of class 1.
data_path = 'f_test/1'
# Create the output directory if it is missing (no error when it already exists)
os.makedirs(data_path, exist_ok=True)

max_captures = 200  # stop once this many crops have been written
sample_every = 8    # only every 8th frame is used for saving

# open webcam
webcam = cv2.VideoCapture(0)
if not webcam.isOpened():
    webcam.release()
    # Raise instead of exit(): exit() would terminate the whole interpreter session.
    raise RuntimeError("Could not open webcam")
sample_num = 0
captured_num = 0

try:
    # loop through frames
    while webcam.isOpened():

        # read frame from webcam
        status, frame = webcam.read()
        sample_num = sample_num + 1

        if not status:
            break
        face, confidence = cv.detect_face(frame)
        print(face)
        print(confidence)

        if sample_num % sample_every == 0:
            frame_h, frame_w = frame.shape[:2]

            # loop through detected faces
            for f in face:
                # Several faces in one frame must not push the count past the limit.
                if captured_num >= max_captures:
                    break

                # Clamp the box to the frame. A negative or oversized coordinate
                # would otherwise slice an empty (or wrong) region, and writing
                # an empty image fails.
                startX, startY = max(0, int(f[0])), max(0, int(f[1]))
                endX, endY = min(frame_w, int(f[2])), min(frame_h, int(f[3]))
                if endX <= startX or endY <= startY:
                    continue

                captured_num = captured_num + 1
                face_in_img = frame[startY:endY, startX:endX, :]
                cv2.imwrite(os.path.join(data_path, "face" + str(captured_num) + ".jpg"), face_in_img)

        # display output
        cv2.imshow("captured frames", frame)

        # press "Q" to stop
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
        # ">=" rather than "==": the counter is never allowed to skip the stop value.
        if captured_num >= max_captures:
            break
finally:
    # release resources even when the loop is interrupted by an error
    webcam.release()
    cv2.destroyAllWindows()

## Fixed RandomSeed

In [24]:
# Build the data frame of (image path, label) pairs.

# Dataset root; each sub-directory is named after its integer class label
dataset_dir = "f_test"

# Every image path, plus the name of the class folder it was found in
all_img_list = []
folder_list = []
# sorted(): os.listdir returns entries in arbitrary order, and the seeded split
# below is only reproducible when the rows always arrive in the same order.
train_file_list = sorted(os.listdir(dataset_dir))
for item in train_file_list:
    item_path = os.path.join(dataset_dir, item)
    # Skip hidden entries and anything that is not a class directory
    if item.startswith('.') or not os.path.isdir(item_path):
        continue
    for file in sorted(os.listdir(item_path)):
        if not file.startswith('.'):
            all_img_list.append(os.path.join(item_path, file))
            folder_list.append(item)

# Labels come from the folder name recorded during the walk rather than from
# splitting the path on '/', which depends on the platform's path separator.
df = pd.DataFrame({
    'img_path': all_img_list,
    'label': [int(folder_name) for folder_name in folder_list],
})

# Stratified 80/20 split.
# NOTE: a function called `train` is defined further down in this file and rebinds
# this name; the datasets must be built from this frame before that definition runs.
train, val, _, _ = train_test_split(df, df['label'], test_size=0.2, stratify=df['label'], random_state=CFG['SEED'])

## CustomDataset

In [30]:
class CustomDataset(Dataset):
    """Dataset that loads images from disk with OpenCV.

    Args:
        img_path_list: Indexable collection of image file paths.
        label_list: Indexable collection of labels aligned with
            ``img_path_list``, or ``None`` for unlabeled data.
        transforms: Optional callable invoked as ``transforms(image=img)``
            that returns a mapping with an ``'image'`` entry.
    """

    def __init__(self, img_path_list, label_list, transforms=None):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.transforms = transforms

    def __getitem__(self, index):
        """Return ``(image, label)``, or just ``image`` when there are no labels.

        Raises:
            FileNotFoundError: If the file at the stored path cannot be read.
        """
        img_path = self.img_path_list[index]

        # NOTE: cv2.imread yields channels in BGR order and no conversion is
        # applied here, so any other code feeding the model must also use BGR.
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside the transform.
            raise FileNotFoundError(f"Could not read image: {img_path}")

        if self.transforms is not None:
            image = self.transforms(image=image)['image']

        if self.label_list is None:
            return image
        return image, self.label_list[index]

    def __len__(self):
        """Return the number of image paths."""
        return len(self.img_path_list)

In [31]:
def _build_transform():
    """Return a fresh resize -> normalize -> tensor pipeline."""
    side = CFG['IMG_SIZE']
    steps = [
        # Square resize to the configured side length
        A.Resize(side, side),
        # Per-channel mean/std (presumably the usual ImageNet statistics - confirm)
        A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225), max_pixel_value=255.0, always_apply=False, p=1.0),
        ToTensorV2(),
    ]
    return A.Compose(steps)


# Training and evaluation use the same steps; each gets its own pipeline object.
train_transform = _build_transform()

test_transform = _build_transform()

In [32]:
# Training split: shuffled by the loader
train_dataset = CustomDataset(
    img_path_list=train['img_path'].values,
    label_list=train['label'].values,
    transforms=train_transform,
)
train_loader = DataLoader(
    train_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=True,
    num_workers=0,
)

# Validation split: fixed order
val_dataset = CustomDataset(
    img_path_list=val['img_path'].values,
    label_list=val['label'].values,
    transforms=test_transform,
)
val_loader = DataLoader(
    val_dataset,
    batch_size=CFG['BATCH_SIZE'],
    shuffle=False,
    num_workers=0,
)

## Model Define

In [36]:
class BaseModel(nn.Module):
    """EfficientNet-B0 backbone followed by a linear classification head.

    Args:
        num_classes: Number of output classes. When omitted, the number of
            distinct values in the module-level ``df['label']`` is used.
    """

    def __init__(self, num_classes=None):
        super().__init__()
        if num_classes is None:
            # Resolved at call time, not in the signature: a signature default
            # is evaluated once when the class is defined, which requires `df`
            # to exist already and goes stale if `df` is rebuilt afterwards.
            num_classes = len(df['label'].unique())
        self.backbone = models.efficientnet_b0(pretrained=True)
        # The head takes the backbone's 1000 outputs as its input features.
        self.classifier = nn.Linear(1000, num_classes)

    def forward(self, x):
        """Run the backbone and then the classification head on ``x``."""
        x = self.backbone(x)
        x = self.classifier(x)
        return x

## Train

## Run!!

In [38]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    """Train ``model`` and return it carrying its best-scoring weights.

    Runs ``CFG['EPOCHS']`` epochs of cross-entropy training. After each epoch
    the model is scored with ``validation``; whenever the weighted F1 score
    beats the best so far, the weights are snapshotted and written to
    ``best_model.pth``.

    Args:
        model: Network to train; moved to ``device`` in place.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable of ``(images, labels)`` training batches.
        val_loader: Iterable of ``(images, labels)`` validation batches.
        scheduler: Optional scheduler; stepped once per epoch with the
            validation score, so it must accept a metric argument.
        device: Device the model and the batches are moved to.

    Returns:
        ``model`` with the best-scoring weights loaded, or ``None`` if the
        validation score never rose above 0.
    """
    import copy

    model.to(device)
    criterion = nn.CrossEntropyLoss().to(device)

    best_score = 0
    best_state = None

    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for imgs, labels in tqdm(train_loader):
            imgs = imgs.float().to(device)
            labels = labels.to(device)
            optimizer.zero_grad()

            output = model(imgs)
            loss = criterion(output, labels)

            loss.backward()
            optimizer.step()

            train_loss.append(loss.item())

        _val_loss, _val_score = validation(model, criterion, val_loader, device)
        _train_loss = np.mean(train_loss)
        print(f'Epoch [{epoch}], Train Loss: [{_train_loss:.5f}], Val Loss: [{_val_loss:.5f}], Val Weighted F1 Score: [{_val_score:.5f}]')

        if scheduler is not None:
            scheduler.step(_val_score)

        if best_score < _val_score:
            best_score = _val_score
            # Deep-copy the weights: keeping a reference to `model` (or to its
            # live state_dict) would silently follow later epochs, so the
            # "best" model would really be the last one.
            best_state = copy.deepcopy(model.state_dict())
            torch.save(best_state, 'best_model.pth')

    if best_state is None:
        return None

    # Roll the model back to the best epoch before handing it to the caller.
    model.load_state_dict(best_state)
    return model

def validation(model, criterion, val_loader, device):
    """Evaluate ``model`` on ``val_loader`` without tracking gradients.

    Args:
        model: Network to evaluate; switched to eval mode.
        criterion: Loss callable applied to ``(model output, labels)``.
        val_loader: Iterable of ``(images, labels)`` batches.
        device: Device the batches are moved to.

    Returns:
        Tuple ``(mean batch loss, weighted F1 score)``.
    """
    model.eval()
    batch_losses = []
    predicted, expected = [], []

    with torch.no_grad():
        for imgs, labels in tqdm(val_loader):
            imgs = imgs.float().to(device)
            labels = labels.to(device)

            logits = model(imgs)
            batch_losses.append(criterion(logits, labels).item())

            # Predicted class = index of the largest output per sample
            predicted.extend(logits.argmax(1).detach().cpu().numpy().tolist())
            expected.extend(labels.detach().cpu().numpy().tolist())

    mean_loss = np.mean(batch_losses)
    weighted_f1 = f1_score(expected, predicted, average='weighted')
    return mean_loss, weighted_f1

# One output per distinct label in the data frame; the model is placed on the device
model = BaseModel(num_classes=len(set(df['label']))).to(device)

optimizer = optim.Adam(model.parameters(), lr=CFG["LEARNING_RATE"])

# Halve the learning rate when the monitored value (mode='max') stops improving
scheduler = optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='max',
    factor=0.5,
    patience=2,
    threshold_mode='abs',
    min_lr=1e-8,
    verbose=True,
)

# Run training and keep the returned model for inference
infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)


  0%|          | 0/6 [00:00<?, ?it/s]

  0%|          | 0/2 [00:00<?, ?it/s]

Epoch [1], Train Loss: [0.14914], Val Loss: [0.01267], Val Weighted F1 Score: [1.00000]


  0%|          | 0/6 [00:00<?, ?it/s]

  0%|          | 0/2 [00:00<?, ?it/s]

Epoch [2], Train Loss: [0.00249], Val Loss: [0.00014], Val Weighted F1 Score: [1.00000]


  0%|          | 0/6 [00:00<?, ?it/s]

  0%|          | 0/2 [00:00<?, ?it/s]

Epoch [3], Train Loss: [0.00023], Val Loss: [0.00003], Val Weighted F1 Score: [1.00000]


  0%|          | 0/6 [00:00<?, ?it/s]

  0%|          | 0/2 [00:00<?, ?it/s]

Epoch [4], Train Loss: [0.00010], Val Loss: [0.00002], Val Weighted F1 Score: [1.00000]
Epoch 00004: reducing learning rate of group 0 to 1.5000e-04.


  0%|          | 0/6 [00:00<?, ?it/s]

  0%|          | 0/2 [00:00<?, ?it/s]

Epoch [5], Train Loss: [0.00003], Val Loss: [0.00001], Val Weighted F1 Score: [1.00000]


## Inference run!!

In [40]:
# Load the best model
best_model = BaseModel(num_classes=len(set(df['label'])))
# map_location: the checkpoint may have been written from another device; remap
# its tensors onto the one in use here.
best_model.load_state_dict(torch.load('best_model.pth', map_location=device))
best_model = best_model.to(device)
best_model.eval()

# Open the webcam
webcam = cv2.VideoCapture(0)

if not webcam.isOpened():
    webcam.release()
    # Raise instead of exit(): exit() would terminate the whole interpreter session.
    raise RuntimeError("Could not open webcam")

try:
    # Loop through frames
    while webcam.isOpened():

        # Read frame from webcam
        status, frame = webcam.read()
        if not status:
            print('Could not read frame')
            # Leave the loop so the finally block can release the camera
            break

        # Apply face detection
        face, confidence = cv.detect_face(frame)
        frame_h, frame_w = frame.shape[:2]

        # Loop through detected faces
        for f in face:
            # Clamp the box to the frame and drop boxes with no area: an empty
            # crop cannot be resized.
            startX, startY = max(0, int(f[0])), max(0, int(f[1]))
            endX, endY = min(frame_w, int(f[2])), min(frame_h, int(f[3]))
            if endX <= startX or endY <= startY:
                continue

            face_region = frame[startY:endY, startX:endX]

            # Use the evaluation transform (resize + normalize + to-tensor) so the
            # model sees the same preprocessing as the images it was trained on,
            # instead of raw 0-255 pixels.
            x = test_transform(image=face_region)['image']
            x = x.unsqueeze(0).float().to(device)

            # Inference using the best model; no gradients are needed here
            with torch.no_grad():
                pred = best_model(x)
            predicted_class = pred.argmax(1).item()
            print(predicted_class)

            # Class 1 is drawn in green as 'me', anything else in red as 'other'
            if predicted_class == 1:
                caption, color = 'me', (0, 255, 0)
            else:
                caption, color = 'other', (0, 0, 255)

            cv2.rectangle(frame, (startX, startY), (endX, endY), color, 2)
            # Put the caption above the box unless that would fall too close to the top edge
            Y = startY - 10 if startY - 10 > 10 else startY + 10
            cv2.putText(frame, caption, (startX, Y), cv2.FONT_HERSHEY_SIMPLEX, 0.9, color, 2)

        # Display output
        cv2.imshow("Real-time face detection", frame)

        # Press "Q" to stop
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    # Release resources even when the loop is interrupted by an error
    webcam.release()
    cv2.destroyAllWindows()


1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
0
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
1
