# **탈모 진행 단계별 모니터링 시스템**
### **두피와 머리카락 분석을 통한 추적 관리 및 예측 모델링**

##### **Ⅰ. 모델 개요**
##### **Ⅱ. 모델 구축**
##### **Ⅲ. (DL) 모델 경량화**
##### **Ⅳ. 모델 배포**&ensp;(실시간 처리 방식 / 이미지 업로드 방식)

### **Ⅰ. 모델 개요**

##### **`FUNCTION 1` : 탈모 상태 분류 (4개)**
&ensp;&ensp;✓&ensp;DL&ensp;&rarr;&ensp;<u>CNN</u>  
&ensp;&ensp;✓&ensp;MODEL&ensp;&rarr;&ensp;<u>EFFICIENTNET_V2_S</u>  
&ensp;&ensp;✓&ensp;경량화

##### **`FUNCTION 2` : 두피 면적과 머리카락 면적의 비율**
&ensp;&ensp;✓&ensp;ML&ensp;&rarr;&ensp;<u>Color Segmentation</u>  
&ensp;&ensp;✓&ensp;MODEL&ensp;&rarr;&ensp;<u>K-MEANS CLUSTERING</u>

##### **`FUNCTION 3` : 모공 하나 당 머리카락 개수**
&ensp;&ensp;✓&ensp;DL&ensp;&rarr;&ensp;<u>Object Detection</u>  
&ensp;&ensp;✓&ensp;MODEL&ensp;&rarr;&ensp;<u>YOLO11N</u>  
&ensp;&ensp;✓&ensp;경량화

### **Ⅱ. 모델 구축**

#### **(FUNCTION 1) EFFICIENTNET_V2_S**

##### **환경 _ 설정**

In [None]:
from google.colab import drive
from google.colab import files
import zipfile
import os
from PIL import Image
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from pathlib import Path
import matplotlib.pyplot as plt
import torch
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random

In [None]:
def set_seed(seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and all CUDA
    devices) and configures cuDNN for repeatable results.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # cuDNN autotuning may select different kernels from run to run, which
    # defeats the seeding above, so it is switched off as well.
    torch.backends.cudnn.benchmark = False

set_seed(15)

# Dedicated generator passed to the DataLoaders so that shuffling is repeatable.
g = torch.Generator().manual_seed(15)

##### **데이터 _ 준비**

In [None]:
# Mount Google Drive at /content/drive; the dataset archive is read from there.
drive.mount('/content/drive')

In [None]:
# Extract the classification dataset once; an existing 'condition' directory
# is treated as "already extracted".
if not os.path.exists('condition'):
    path = '/content/drive/MyDrive/dataset/hair_condition.zip'
    # The context manager closes the archive even when extraction fails.
    with zipfile.ZipFile(path) as f_zip:
        f_zip.extractall('condition')

##### **데이터 _ 전처리**

In [None]:
# Loader batch size and the spatial size every image is brought to.
batch_size = 16

img_height, img_width = 224, 224


def _tensor_and_normalize():
    """Return the shared tail of both pipelines: tensor conversion, then per-channel normalization."""
    return [
        transforms.ToTensor(),
        transforms.Normalize(mean = [0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225]),
    ]


# Training pipeline: random crop / flip / brightness / rotation augmentation
# followed by tensor conversion and normalization.
train_transforms = transforms.Compose(
    [
        transforms.RandomResizedCrop((img_height, img_width)),
        transforms.RandomHorizontalFlip(),
        transforms.ColorJitter(brightness = 0.2),
        transforms.RandomRotation(degrees = (0, 180)),
    ]
    + _tensor_and_normalize()
)

# Validation pipeline: deterministic resize and center crop, same normalization.
val_transforms = transforms.Compose(
    [
        transforms.Resize((img_height, img_width)),
        transforms.CenterCrop((img_height, img_width)),
    ]
    + _tensor_and_normalize()
)

class CustomImageDataset(Dataset):
    """Image dataset laid out as ``<img_dir>/<class name>/<image file>``.

    Only files whose parent directory is one of the known class names are
    indexed; hidden files (names starting with ``.``) and sub-directories are
    skipped. Paths are sorted so the sample order does not depend on the
    filesystem's listing order.

    Args:
        img_dir: Root directory that contains one sub-directory per class.
        transform: Optional callable applied to the RGB PIL image.
    """

    def __init__(self, img_dir, transform = None):

        self.transform = transform

        self.class_name = ['normal', 'mild', 'moderate', 'severe']
        # Derive the label index from the class list so the two cannot drift apart.
        self.cindex = {name : index for index, name in enumerate(self.class_name)}

        root = Path(img_dir)
        self.paths = sorted(
            p for p in root.glob('*/*')
            if p.is_file()
            and p.parent.name in self.cindex
            and not p.name.startswith('.')
        )

    def __len__(self):
        """Return the number of indexed image files."""
        return len(self.paths)

    def __getitem__(self, idx):
        """Return ``(image, class_index)`` for the sample at position ``idx``."""

        path = self.paths[idx]
        label = path.parent.name

        # convert() returns a fully loaded copy, so the file can be closed right away.
        with Image.open(path) as img:
            image = img.convert("RGB")

        if self.transform is not None:
            image = self.transform(image)

        return image, self.cindex[label]

train_set = CustomImageDataset('./condition/training', transform = train_transforms)
val_set = CustomImageDataset('./condition/testing', transform = val_transforms)

class_names = train_set.class_name

# NOTE(review): the classifier head uses BatchNorm1d, which raises in training
# mode on a batch holding a single sample; if len(train_set) % batch_size == 1,
# consider drop_last = True for the training loader.
train_loader = DataLoader(train_set, batch_size = batch_size, shuffle = True, generator = g)
val_loader = DataLoader(val_set, batch_size = batch_size, shuffle = False, generator = g)

# Preview up to four images from the first training batch.
feature, label = next(iter(train_loader))

print(feature.shape)

# The tensors were standardized by Normalize; undo that so imshow receives
# values in [0, 1] instead of clipping them.
_preview_mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
_preview_std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

for i in range(min(4, feature.shape[0])):

    ax = plt.subplot(2, 2, i + 1)

    preview = (feature[i] * _preview_std + _preview_mean).clamp(0, 1)

    plt.imshow(preview.permute((1, 2, 0)))
    plt.title(class_names[label[i]])
    plt.axis("off")

##### **모델 _ 정의**

In [None]:
# Load a reference EfficientNetV2-S with the IMAGENET1K_V1 weights.
model = models.efficientnet_v2_s(weights = 'IMAGENET1K_V1')

# Show the stock classifier head (EFCNN reads classifier[1].in_features before replacing it).
print(model.classifier)

In [None]:
class EFCNN(nn.Module):
    """EfficientNetV2-S backbone followed by a small fully connected head.

    ``forward`` returns raw class logits. ``nn.CrossEntropyLoss`` applies
    log-softmax itself, so feeding it already-softmaxed outputs (as the
    previous version did) flattens the gradients. ``argmax`` over logits and
    over probabilities selects the same class, and Softmax has no parameters,
    so existing ``state_dict`` checkpoints still load and predict identically.
    Use :meth:`predict_proba` when probabilities are required.

    Args:
        num_classes: Number of output classes.
        dropout: Dropout probability applied after the first hidden layer.
    """

    def __init__(self, num_classes = 4, dropout = 0.3):

        super().__init__()

        self.model = models.efficientnet_v2_s(weights = 'IMAGENET1K_V1')
        nn_feature = self.model.classifier[1].in_features
        # Drop the stock classifier so the backbone emits pooled features.
        self.model.classifier = nn.Identity()

        self.fc1 = nn.Linear(nn_feature, 128)
        self.bn1 = nn.BatchNorm1d(128)
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(128, num_classes)
        self.bn2 = nn.BatchNorm1d(num_classes)
        self.softmax = nn.Softmax(dim = 1)

    def forward(self, x):
        """Return unnormalized class scores (logits), one row per input image."""

        x = self.model(x)

        x = self.fc1(x)
        x = self.bn1(x)
        x = torch.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        x = self.bn2(x)

        return x

    def predict_proba(self, x):
        """Return class probabilities (softmax over the logits from ``forward``)."""
        return self.softmax(self.forward(x))

##### **모델 _ 학습**

In [None]:
model_name = 'hlc_model'

epochs = 100

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EFCNN(dropout = 0.5).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = 0.00001, weight_decay = 1e-4)

# NOTE(review): CrossEntropyLoss applies log-softmax internally, so the model
# is expected to output raw logits rather than probabilities.
loss_fn = nn.CrossEntropyLoss()

# Halve the learning rate when the validation loss plateaus. The `verbose`
# argument is deprecated since PyTorch 2.2, so LR changes are reported manually.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode = 'min', factor = 0.5, patience = 5)

os.makedirs('/checkpoint', exist_ok = True)

history = {'train_loss' : [], 'val_loss' : [], 'train_acc' : [], 'val_acc' : []}

for epoch in range(epochs):

    # ---------------- training pass ----------------
    # Loss and accuracy are accumulated per sample so that a smaller final
    # batch does not get the same weight as a full one.
    t_loss = 0.0
    t_correct = 0
    t_seen = 0

    model.train()

    for i, (images, labels) in enumerate(train_loader):

        images = images.to(device)
        labels = labels.to(device)

        outputs = model(images)
        loss = loss_fn(outputs, labels)

        optimizer.zero_grad()

        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm = 1.0)

        optimizer.step()

        n_batch = labels.size(0)

        t_loss += loss.item() * n_batch
        t_correct += (outputs.argmax(dim = 1) == labels).sum().item()
        t_seen += n_batch

        # Progress marker every 10 batches.
        if i % 10 == 0:
          print('.', end = '')

    train_loss = t_loss / t_seen
    train_acc = t_correct / t_seen

    print(f"Epoch : {epoch}, Loss : {train_loss}, Accuracy : {train_acc}")

    history['train_loss'].append(train_loss)
    history['train_acc'].append(train_acc)

    # One checkpoint (weights only) per epoch, so any epoch can be restored later.
    checkpoint_path = f"/checkpoint/{model_name}_cp{epoch}.pt"

    torch.save(model.state_dict(), checkpoint_path)

    # ---------------- validation pass ----------------
    v_loss = 0.0
    v_correct = 0
    v_seen = 0

    model.eval()

    with torch.no_grad():

        for i, (images, labels) in enumerate(val_loader):

            images = images.to(device)
            labels = labels.to(device)

            outputs = model(images)
            loss = loss_fn(outputs, labels)

            n_batch = labels.size(0)

            v_loss += loss.item() * n_batch
            v_correct += (outputs.argmax(dim = 1) == labels).sum().item()
            v_seen += n_batch

    val_loss = v_loss / v_seen
    val_acc = v_correct / v_seen

    print(f"Epoch : {epoch}, Val_Loss : {val_loss}, Val_Accuracy : {val_acc}")

    history['val_loss'].append(val_loss)
    history['val_acc'].append(val_acc)

    lr_before = optimizer.param_groups[0]['lr']
    scheduler.step(val_loss)
    lr_after = optimizer.param_groups[0]['lr']

    if lr_after != lr_before:
        print(f"Epoch : {epoch}, Learning Rate : {lr_before} -> {lr_after}")

# Final export pickles the whole module (not just the weights).
os.makedirs('ptmodel', exist_ok = True)
torch.save(model, f"ptmodel/{model_name}.pt")

##### **모델 _ 검증**

In [None]:
# Per-epoch curves recorded during training.
loss, acc = history['train_loss'], history['train_acc']
val_loss, val_acc = history['val_loss'], history['val_acc']

eps = range(len(val_loss))

# Two panels side by side: losses on the left, accuracies on the right.
fig = plt.figure(figsize = (10, 5))

ax1 = fig.add_subplot(1, 2, 1)
ax2 = fig.add_subplot(1, 2, 2)

panels = (
    (ax1, (('train_loss', loss), ('val_loss', val_loss))),
    (ax2, (('train_acc', acc), ('val_acc', val_acc))),
)

for axis, curves in panels:
    for curve_name, values in curves:
        axis.plot(eps, values, label = curve_name)
    axis.legend()

plt.show()

In [None]:
# Epoch with the highest validation accuracy, plus the loss recorded at that epoch.
best_val_acc_epoch = np.argmax(history['val_acc'])

best_val_acc, best_val_loss = (
    history['val_acc'][best_val_acc_epoch],
    history['val_loss'][best_val_acc_epoch],
)

print(
    f"Best Epoch For Validation Accuracy : {best_val_acc_epoch}",
    f"Best Validation Accuracy : {best_val_acc}",
    f"Best Validation Loss : {best_val_loss}",
    sep = "\n",
)

In [None]:
# Epoch with the lowest validation loss, plus the accuracy recorded at that epoch.
best_val_loss_epoch = np.argmin(history['val_loss'])

best_val_loss, best_val_acc = (
    history['val_loss'][best_val_loss_epoch],
    history['val_acc'][best_val_loss_epoch],
)

print(
    f"Best Epoch For Validation Loss : {best_val_loss_epoch}",
    f"Best Validation Loss : {best_val_loss}",
    f"Best Validation Accuracy : {best_val_acc}",
    sep = "\n",
)

In [None]:
def _min_max_normalize(values):
    """Scale ``values`` to [0, 1]; a constant series maps to zeros instead of NaN."""
    values = np.asarray(values, dtype = float)
    low = values.min()
    span = values.max() - low
    if span == 0:
        # Every epoch is equally good on this metric; avoid dividing by zero.
        return np.zeros_like(values)
    return (values - low) / span


normalized_acc = _min_max_normalize(history['val_acc'])
normalized_loss = _min_max_normalize(history['val_loss'])

# Higher accuracy and lower loss both raise the score (range 0..2).
score = normalized_acc + (1 - normalized_loss)

best_epoch = np.argmax(score)

best_val_acc = history['val_acc'][best_epoch]
best_val_loss = history['val_loss'][best_epoch]

print(f"Best Epoch (Considering Both) : {best_epoch}")
print(f"Best Validation Accuracy : {best_val_acc}")
print(f"Best Validation Loss : {best_val_loss}")

##### **모델 _ 저장**

In [None]:
# Download the model file written to ptmodel/ at the end of training.
files.download('/content/ptmodel/hlc_model.pt')

In [None]:
# Download the state_dict checkpoint of epoch 92.
# NOTE(review): the epoch number is hard-coded — presumably picked from the
# best-epoch analysis above; update it after retraining.
files.download('/checkpoint/hlc_model_cp92.pt')

#### **(FUNCTION 2) K-MEANS CLUSTERING**

##### **환경 _ 설정**

##### **데이터 _ 준비**

##### **데이터 _ 전처리**

##### **모델 _ 정의**

##### **모델 _ 학습**

##### **모델 _ 평가**

#### **(FUNCTION 3) YOLO11N**

##### **환경 _ 설정**

In [None]:
# Install the Ultralytics package, which provides the YOLO class imported below.
!pip install ultralytics

In [None]:
from google.colab import drive
from google.colab import files
import zipfile
import os
from ultralytics import YOLO
from pathlib import Path
import matplotlib.pyplot as plt
from PIL import Image

##### **데이터 _ 준비**

In [None]:
# Mount Google Drive at /content/drive; the detection dataset archive is read from there.
drive.mount('/content/drive')

In [None]:
# Extract the detection dataset once; an existing 'hairloss_data' directory
# is treated as "already extracted".
if not os.path.exists('hairloss_data'):
  path = '/content/drive/MyDrive/dataset/hairloss.zip'
  # The context manager closes the archive even when extraction fails.
  with zipfile.ZipFile(path) as f_zip:
    f_zip.extractall('hairloss_data')

##### **모델 _ 정의 & 학습**

In [None]:
# Start from the "yolo11n.pt" weights.
model = YOLO("yolo11n.pt")

# Train for 300 epochs on the dataset described by hairloss.yaml. project/name
# match the hairloss_od/hairloss_model/weights path read by the later cells.
model.train(data = 'hairloss_data/hairloss.yaml', epochs = 300, project = 'hairloss_od', name = 'hairloss_model')

##### **모델 _ 저장**

In [None]:
# Download the best.pt weights file from the training run directory.
files.download('hairloss_od/hairloss_model/weights/best.pt')

##### **모델 _ 추론**

In [None]:
# Reload the trained detector and draw its predictions on up to nine
# validation images arranged in a 3 x 3 grid.
model = YOLO("hairloss_od/hairloss_model/weights/best.pt")

root = Path("hairloss_data/Validation/images")
paths = list(root.glob('*'))

plt.figure(figsize = (20, 20))

for i, path in enumerate(paths[:9], start = 1):

  results = model(path)

  # plot() returns the annotated image as an array; the channel axis is
  # reversed (presumably BGR -> RGB) before handing it to PIL.
  im_array = results[0].plot()
  im = Image.fromarray(im_array[..., ::-1])

  ax = plt.subplot(3, 3, i)
  ax.imshow(im)

plt.show()

### **Ⅲ. (DL) 모델 경량화**

#### **(FUNCTION 1) EFFICIENTNET_V2_S**

In [None]:
# Install the ONNX tooling and onnx2tf used below to convert the exported model.
!pip install sng4onnx onnx onnxruntime onnx_graphsurgeon onnxslim onnx2tf

In [None]:
from google.colab import drive
from google.colab import files
import torch
import tensorflow as tf

In [None]:
# Mount Google Drive at /content/drive (the TFLite conversion below reads a SavedModel from it).
drive.mount('/content/drive')

In [None]:
# Rebuild the classifier before loading weights: `model` was rebound to a YOLO
# object by the detection cells, and a state_dict must be loaded into an EFCNN.
# (The EFCNN class cell must have been run in this session.)
model = EFCNN()

# Checkpoints are written as /checkpoint/{model_name}_cp{epoch}.pt by the
# training loop, so epoch 92 of 'hlc_model' is hlc_model_cp92.pt.
model.load_state_dict(torch.load('/checkpoint/hlc_model_cp92.pt', map_location = 'cpu'))
model = model.to('cpu')

# Example input used to trace the graph: one 3-channel 224 x 224 image.
dummy_input = torch.randn(1, 3, 224, 224)

print(dummy_input)

# Export in eval mode so Dropout is disabled and BatchNorm uses running statistics.
model.eval()

# Axis 0 is declared dynamic so that the exported graph accepts any batch size.
torch.onnx.export(model, dummy_input, "/hlc_model.onnx", export_params = True, opset_version = 18, input_names = ['input'], output_names = ['output'], dynamic_axes = {'input' : {0 : 'batch_size'}, 'output' : {0 : 'batch_size'}})

In [None]:
# Convert the exported ONNX file with onnx2tf.
!onnx2tf -i /hlc_model.onnx

In [None]:
# Download the float16 and float32 TFLite files from saved_model/
# (presumably written by the onnx2tf cell above — confirm).
files.download("saved_model/hlc_model_float16.tflite")
files.download("saved_model/hlc_model_float32.tflite")

In [None]:
# Convert the SavedModel to a TFLite flatbuffer with float16 as supported type.
# NOTE(review): the SavedModel is read from Google Drive — presumably the
# onnx2tf output copied there; confirm the path.
converter = tf.lite.TFLiteConverter.from_saved_model("/content/drive/MyDrive/saved_model")

converter.experimental_new_converter = True
converter.optimizations = [tf.lite.Optimize.DEFAULT]
converter.target_spec.supported_types = [tf.float16]
# Allow built-in TFLite ops plus selected TensorFlow ops.
converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]

tflite_model = converter.convert()

quantized_name = "hlcM_torchQ16.tflite"

with open(quantized_name, "wb") as out_file:
    out_file.write(tflite_model)

files.download(quantized_name)

#### **(FUNCTION 3) YOLO11N**

In [None]:
!pip install ultralytics onnx2tf onnx onnxruntime tflite_support 'sng4onnx>=1.0.1', 'onnx_graphsurgeon>=0.3.26', 'onnx2tf>1.17.5,<=1.26.3', 'onnxslim>=0.1.31'

In [None]:
from ultralytics import YOLO
from google.colab import files

In [None]:
# Load the trained detector weights.
# NOTE(review): "best.pt" is resolved relative to the working directory —
# presumably uploaded or copied from hairloss_od/hairloss_model/weights; confirm.
model = YOLO("best.pt")

# Export to TFLite; the following cell downloads best_saved_model/best_float32.tflite.
model.export(format = 'tflite')

In [None]:
# Download the float32 TFLite detector from the export directory.
files.download('best_saved_model/best_float32.tflite')

### **Ⅳ. 모델 배포**

#### **1. 실시간 처리 방식 (BY Raspberry Pi)**

##### **1-1. FUNCTION 1 + FUNCTION 3**

In [None]:
import cv2
import tflite_runtime.interpreter as tflite
import time
import threading
from collections import deque
import numpy as np

# Class labels, indexed by the model's output position.
classification_names = ['normal', 'mild', 'moderate', 'severe']
detection_names = ['1hair', '2hair', '3hair', '4hair']

classification_model_path = 'hlcM_torchQ16.tflite'
detection_model_path = 'yolo300_float32.tflite'


def _load_interpreter(model_path):
    """Create a TFLite interpreter for ``model_path``, allocate its tensors and return it with its input/output details."""
    interpreter = tflite.Interpreter(model_path = model_path)
    interpreter.allocate_tensors()
    return interpreter, interpreter.get_input_details(), interpreter.get_output_details()


classification_interpreter, class_input_details, class_output_details = _load_interpreter(classification_model_path)
detection_interpreter, detect_input_details, detect_output_details = _load_interpreter(detection_model_path)

# Detector input size, read from dimensions 1 and 2 of the input shape
# (assumes an NHWC layout — TODO confirm).
detect_img_height, detect_img_width = detect_input_details[0]['shape'][1:3]

def iou(box1, box2):
    """Return the intersection-over-union of two center-format boxes.

    Args:
        box1: Sequence ``(x_center, y_center, width, height)``.
        box2: Sequence in the same format.

    Returns:
        Intersection area divided by union area, or 0 when the union is empty.
    """

    def corners(box):
        # Center/size -> (x_min, y_min, x_max, y_max).
        return (
            box[0] - box[2] / 2, box[1] - box[3] / 2,
            box[0] + box[2] / 2, box[1] + box[3] / 2,
        )

    a_left, a_top, a_right, a_bottom = corners(box1)
    b_left, b_top, b_right, b_bottom = corners(box2)

    # Overlap extents are clamped at zero for disjoint boxes.
    overlap_w = max(0, min(a_right, b_right) - max(a_left, b_left))
    overlap_h = max(0, min(a_bottom, b_bottom) - max(a_top, b_top))
    inter_area = overlap_w * overlap_h

    area_a = (a_right - a_left) * (a_bottom - a_top)
    area_b = (b_right - b_left) * (b_bottom - b_top)
    union_area = area_a + area_b - inter_area

    if union_area > 0:
        return inter_area / union_area
    return 0

def merge_boxes(boxes, scores, classes, iou_threshold = 0.4, score_threshold = 0.2):
    """Run non-maximum suppression and return the surviving detections.

    Args:
        boxes: List of ``[x_center, y_center, width, height]`` boxes.
        scores: List of confidence scores, parallel to ``boxes``.
        classes: List of class indices, parallel to ``boxes``.
        iou_threshold: Overlap threshold passed to NMS as ``nms_threshold``.
        score_threshold: Minimum score for a box to be considered by NMS.

    Returns:
        List of ``(box, score, class)`` tuples for the kept detections, where
        ``box`` is a NumPy array in the original center format.
    """

    if not boxes:
        return []

    # cv2.dnn.NMSBoxes reads each box as (x_top_left, y_top_left, width, height).
    # The inputs are center-based, so shift them before suppression; otherwise
    # overlaps between boxes of different sizes are computed on misplaced rectangles.
    corner_boxes = [[cx - bw / 2, cy - bh / 2, bw, bh] for cx, cy, bw, bh in boxes]

    indices = cv2.dnn.NMSBoxes(corner_boxes, scores, score_threshold = score_threshold, nms_threshold = iou_threshold)

    if len(indices) == 0:
        return []

    # The result can be a flat or an N x 1 index array; flatten handles both.
    return [(np.array(boxes[i]), scores[i], classes[i]) for i in np.asarray(indices).flatten()]

def imgproc(img_q, result_q):
    """Worker loop: run the detector on queued frames and queue the filtered detections.

    Args:
        img_q: deque of BGR frames to process; a ``None`` entry stops the worker.
        result_q: deque that receives, per processed frame, the list returned
            by ``merge_boxes``.
    """

    threshold = 0.2

    while True:

        if not img_q:
            # Sleep briefly while idle instead of spinning a CPU core.
            time.sleep(0.005)
            continue

        image = img_q.popleft()

        if image is None:
            break

        image_resized = cv2.resize(image, (detect_img_width, detect_img_height))
        image_resized = cv2.cvtColor(image_resized, cv2.COLOR_BGR2RGB)

        # Scale pixel values to [0, 1] and add the batch dimension.
        image_np = np.array(image_resized, dtype = np.float32) / 255.0
        image_np = np.expand_dims(image_np, axis = 0)

        detection_interpreter.set_tensor(detect_input_details[0]['index'], image_np)
        detection_interpreter.invoke()

        # After the transpose each row is one candidate: four box values
        # followed by the per-class scores.
        output = detection_interpreter.get_tensor(detect_output_details[0]['index'])[0].T

        boxes_xywh = output[:, :4]
        scores = np.max(output[:, 4:], axis = 1)
        classes = np.argmax(output[:, 4:], axis = 1)

        # Keep confident candidates only; tolist() yields plain Python
        # floats/ints for the NMS call.
        keep = scores > threshold

        filtered_boxes = boxes_xywh[keep].tolist()
        filtered_scores = scores[keep].tolist()
        filtered_classes = classes[keep].tolist()

        merged_result = merge_boxes(filtered_boxes, filtered_scores, filtered_classes)
        result_q.append(merged_result)

        time.sleep(0.01)

cap = cv2.VideoCapture(0)

# Grab one frame up front to learn the camera resolution.
ret, frame = cap.read()

if not ret:
    cap.release()
    raise RuntimeError("Could not read a frame from the camera.")

h, w, c = frame.shape
# Horizontal bounds of a centered crop that is as wide as the frame is tall.
cen, ofs = w // 2, h // 2
left, right = cen - ofs, cen + ofs

img_q, result_q = deque(maxlen = 3), deque(maxlen = 10)

# Daemon thread: a stuck worker can never keep the process alive on exit.
detect_thread = threading.Thread(target = imgproc, args = (img_q, result_q), daemon = True)
detect_thread.start()

detection_colors = {0 : (255, 0, 0), 1 : (0, 255, 0), 2 : (0, 0, 255), 3 : (255, 255, 0)}

# Same per-channel statistics as the Normalize step used when training the classifier.
class_mean = np.array([0.485, 0.456, 0.406], dtype = np.float32)
class_std = np.array([0.229, 0.224, 0.225], dtype = np.float32)

try:

    while True:

        ret, frame = cap.read()

        if not ret:
            print("Could not read a frame from the camera.")
            break

        image = frame[:, left:right]
        crop_h, crop_w = image.shape[:2]

        # Queue a copy: `image` is drawn on below, and the detector must not
        # see the overlays.
        img_q.append(image.copy())

        # Classifier preprocessing mirrors the training pipeline:
        # RGB, scaled to [0, 1], then standardized per channel.
        class_image = cv2.resize(image, (224, 224))
        class_image = cv2.cvtColor(class_image, cv2.COLOR_BGR2RGB)
        class_image = class_image.astype(np.float32) / 255.0
        class_image = (class_image - class_mean) / class_std
        class_image = np.expand_dims(class_image, axis = 0)

        classification_interpreter.set_tensor(class_input_details[0]['index'], class_image)
        classification_interpreter.invoke()
        class_output = classification_interpreter.get_tensor(class_output_details[0]['index'])
        class_pred = np.argmax(class_output[0])
        class_label = classification_names[class_pred]

        if result_q:

            detected_objects = result_q.popleft()

            for box, score, cls in detected_objects:

                # Boxes are assumed to be normalized to the detector input,
                # which is the crop — so scale by the crop size, not the frame size.
                x_center, y_center, width, height = np.array(box) * np.array([crop_w, crop_h, crop_w, crop_h])

                x1 = int(x_center - width / 2)
                y1 = int(y_center - height / 2)
                x2 = int(x_center + width / 2)
                y2 = int(y_center + height / 2)

                color = detection_colors.get(cls, (255, 255, 255))

                cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)

                text = f"{detection_names[cls]} ({score : .2f})"

                cv2.putText(image, text, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

        cv2.putText(image, f"HL Condition : {class_label}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 165, 255), 2)

        # NOTE(review): the square crop is stretched to the full-frame aspect
        # ratio for display — confirm this is intended.
        image_resized = cv2.resize(image, (w * 2, h * 2))

        cv2.imshow('FUNCTION13', image_resized)

        key = cv2.waitKey(1) & 0xFF

        if key == ord('q'):
            break

finally:

    # Always stop the worker and free the camera, even after an error.
    img_q.append(None)
    detect_thread.join(timeout = 2)

    cap.release()
    cv2.destroyAllWindows()

##### **1-2. FUNCTION 2**

In [None]:
import cv2
import numpy as np
from sklearn.cluster import KMeans

cap = cv2.VideoCapture(0)

try:

    while True:

        ret, frame = cap.read()

        if not ret:
            print("카메라에서 프레임을 읽을 수 없습니다.")
            break

        # Cluster the LAB pixels of a downscaled frame into two groups.
        frame_resized = cv2.resize(frame, (320, 240))
        image_lab = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2LAB)
        pixels = image_lab.reshape((-1, 3))
        kmeans = KMeans(n_clusters = 2, random_state = 45, n_init = 10)
        labels = kmeans.fit_predict(pixels)

        # K-means label ids are arbitrary, so "0" is not reliably hair.
        # Pick the cluster with the lower lightness (L) center as hair
        # (assumes hair is darker than scalp — TODO confirm for light hair).
        hair_label = int(np.argmin(kmeans.cluster_centers_[:, 0]))
        hair_mask = (labels == hair_label).reshape(frame_resized.shape[:2])

        hair_pixels = int(hair_mask.sum())
        total_pixels = hair_mask.size
        scalp_pixels = total_pixels - hair_pixels

        hair_ratio = (hair_pixels / total_pixels) * 100
        scalp_ratio = (scalp_pixels / total_pixels) * 100

        # Binary visualization: hair stays black, scalp is painted white.
        segmented_colored = np.zeros_like(frame_resized)
        segmented_colored[~hair_mask] = [255, 255, 255]

        text = f"Hair : {hair_ratio : .2f}% | Scalp : {scalp_ratio : .2f}%"

        cv2.putText(segmented_colored, text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 165, 255), 2)

        cv2.imshow("FUNCTION2", segmented_colored)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

finally:

    # Release the camera and close the window even if a frame fails to process.
    cap.release()
    cv2.destroyAllWindows()

#### **2. 이미지 업로드 방식 (BY Flask)**

##### **2-1. FRONT-END**

##### **2-2. BACK-END**