# Import

In [1]:
import os
from torch.utils.data import Dataset
from PIL import Image
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt
from PIL import Image
from torch.utils.data import Dataset, DataLoader, random_split
import numpy as np
import random
import cv2
from time import time
import copy
import pandas as pd
from tqdm import tqdm
from gtts import gTTS
import playsound
import time
from torchvision import transforms
# import pyttsx3
import torch_pruning as tp
from torchinfo import summary
import time
import thop  



In [None]:
# pip install torch_pruning
# pip install playsound
# pip install opencv-python
# pip install torchsummary
# pip install thop
# pip install torchinfo

In [2]:
# Dataset_Class
class PyTorch_Custom_Dataset_Class(Dataset):
    """Skeleton listing the hooks a custom ``Dataset`` has to provide.

    Placeholder only: both accessors return ``None``.
    """

    def __init__(self):
        super().__init__()

    def __getitem__(self, idx):
        """Placeholder sample accessor; always returns ``None``."""
        return None

    def __len__(self):
        """Placeholder length; always returns ``None``."""
        return None
        
        
class PyTorch_Classification_Dataset_Class(Dataset):
    """Image-folder classification dataset.

    Every sub-directory of ``dataset_dir`` is one class (the directory name
    is the label) and every regular file directly inside it is one sample.
    Class indices follow the alphabetical order of the directory names.

    Attributes:
        image_abs_path: absolute path of the dataset root.
        label_list: sorted class names; position == class index.
        x_list: sample file paths.
        y_list: class index of each entry of ``x_list``.
        transform: preprocessing applied to every image in ``__getitem__``.
    """

    def __init__(self, dataset_dir="/home/jetson/Downloads/project/Recycle_Classification_Dataset"):
        super().__init__()
        self.image_abs_path = os.path.abspath(dataset_dir)
        self._scan_samples()

        # Resize to the 128x128 network input, convert to a CHW float tensor
        # and normalize per channel (the commonly used ImageNet statistics).
        self.transform = transforms.Compose([
            transforms.Resize((128, 128)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def _scan_samples(self):
        """Populate ``label_list``, ``x_list`` and ``y_list`` from disk.

        Directory listings are sorted because ``os.listdir`` returns entries
        in arbitrary order; sorting makes the sample order (and therefore any
        seeded split of this dataset) reproducible across runs and machines.
        """
        root = self.image_abs_path
        self.label_list = sorted(
            entry for entry in os.listdir(root)
            if os.path.isdir(os.path.join(root, entry))
        )
        self.x_list = []
        self.y_list = []
        for label_index, label_str in enumerate(self.label_list):
            class_dir = os.path.join(root, label_str)
            for file_name in sorted(os.listdir(class_dir)):
                file_path = os.path.join(class_dir, file_name)
                # Nested directories are skipped; only regular files are samples.
                if os.path.isfile(file_path):
                    self.x_list.append(file_path)
                    self.y_list.append(label_index)

    def __len__(self):
        """Return the number of samples."""
        return len(self.x_list)

    def __getitem__(self, idx):
        """Return ``(image_tensor, label_index)`` for sample ``idx``."""
        img_path = self.x_list[idx]
        label = self.y_list[idx]

        # Force three channels so grayscale / RGBA files match the transform.
        image = Image.open(img_path).convert("RGB")
        image_tensor = self.transform(image)

        return image_tensor, label

    def __save_label_map__(self, dst_text_path="label_map.txt"):
        """Write one class name per line; the line number is the class index."""
        with open(dst_text_path, 'w') as f:
            for label in self.label_list:
                f.write(f"{label}\n")

    def __num_classes__(self):
        """Return the number of classes found under the dataset root."""
        return len(self.label_list)

In [None]:
# Model_Class_From_the_Scratch
import torch
import torch.nn as nn
import torch.nn.functional as F

class PyTorch_Custom_Model_Class(nn.Module):
    """Skeleton ``nn.Module`` without layers; ``forward`` is the identity."""

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return the input unchanged."""
        output = x
        return output

class SEBlock(nn.Module):
    """Squeeze-and-Excitation channel gate.

    Each channel is averaged over its spatial extent, passed through a
    two-layer bottleneck MLP ending in a sigmoid, and the resulting
    per-channel weight rescales the input feature map.

    Args:
        channels: number of channels of the input feature map.
        reduction: bottleneck divisor; the hidden width is
            ``channels // reduction``, but never less than 1.
    """

    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        # With channels < reduction the integer division is 0, which would
        # create a zero-width bottleneck whose gate ignores the input.
        hidden = max(1, channels // reduction)
        self.global_avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channels, hidden),
            nn.ReLU(),
            nn.Linear(hidden, channels),
            nn.Sigmoid()
        )

    def forward(self, x):
        """Return ``x`` scaled per channel; expects a 4-D ``(N, C, H, W)`` tensor."""
        batch, channels, _, _ = x.size()
        # Squeeze: (N, C, H, W) -> (N, C)
        y = self.global_avg_pool(x).view(batch, channels)
        # Excite: (N, C) -> (N, C, 1, 1) so it broadcasts over H and W.
        y = self.fc(y).view(batch, channels, 1, 1)
        return x * y


class BottleneckResidualBlock(nn.Module):
    """Residual block: 1x1 conv -> 3x3 depthwise conv -> 1x1 conv -> SE gate.

    The first convolution maps ``in_channels`` to ``out_channels``, the
    depthwise convolution works at that width, and the last convolution maps
    back to ``in_channels`` so the input can be added to the result.
    """

    def __init__(self, in_channels, out_channels, reduction=16):
        super().__init__()
        inner = out_channels
        # Pointwise convolution into the inner width.
        self.conv1 = nn.Conv2d(in_channels, inner, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(inner)
        # Depthwise 3x3: one filter per channel (groups == channels).
        self.conv2 = nn.Conv2d(inner, inner, kernel_size=3, stride=1, padding=1, groups=inner)
        self.bn2 = nn.BatchNorm2d(inner)
        # Pointwise convolution back to the input width (no activation after it).
        self.conv3 = nn.Conv2d(inner, in_channels, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(in_channels)
        self.se = SEBlock(in_channels, reduction)

    def forward(self, x):
        """Return the gated branch output plus the unchanged input."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.se(self.bn3(self.conv3(out)))
        return out + x


class MODEL_From_Scratch(nn.Module):
    """Small CNN classifier built from strided convs, SE gating and residual blocks.

    Pipeline: two stride-2 conv stems (3 -> 32 -> 64 channels, the second one
    SE-gated), three bottleneck residual blocks at 64 channels, a stride-2
    conv to 128 channels, global average pooling and a 3-layer MLP head that
    outputs ``num_classes`` logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        stem = [
            nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
        ]
        self.conv1 = nn.Sequential(*stem)

        gated_stem = [
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(inplace=True),
            SEBlock(64),
        ]
        self.conv2 = nn.Sequential(*gated_stem)

        # Three identical blocks: 64 channels in/out, 128 inside.
        self.residual_blocks = nn.Sequential(
            *[BottleneckResidualBlock(64, 128, reduction=16) for _ in range(3)]
        )

        self.conv3 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
        )
        self.global_pool = nn.AdaptiveAvgPool2d(1)

        head = [
            nn.Flatten(),
            nn.Linear(128, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.3),
            nn.Linear(256, 128),
            nn.ReLU(inplace=True),
            nn.Dropout(p=0.3),
            nn.Linear(128, num_classes),
        ]
        self.fc = nn.Sequential(*head)

    def forward(self, x):
        """Return class logits for an image batch."""
        stages = (
            self.conv1,
            self.conv2,
            self.residual_blocks,
            self.conv3,
            self.global_pool,
            self.fc,
        )
        for stage in stages:
            x = stage(x)
        return x


In [None]:
# Training_Class
class PyTorch_Classification_Training_Class():
    """Build the data loaders, create ``MODEL_From_Scratch`` and train it.

    Typical use: construct, call ``prepare_network()``, then
    ``training_network()``.

    Attributes:
        DEVICE: ``cuda`` when available, otherwise ``cpu``.
        num_classes: number of class directories found in the dataset.
        train_loader / test_loader: loaders over a random split of the dataset.
        model: the network (``None`` until ``prepare_network`` runs).
        model_str: checkpoint file name (``None`` until ``prepare_network`` runs).
    """

    def __init__(self
                , dataset_dir = "/home/jetson/Downloads/project/Recycle_Classification_Dataset"
                , batch_size = 16
                , train_ratio = 0.75
                ):
        """Scan ``dataset_dir``, write ``label_map.txt`` and create the loaders.

        Args:
            dataset_dir: root folder containing one sub-directory per class.
            batch_size: batch size of both loaders.
            train_ratio: fraction of samples assigned to the training split.

        Raises:
            FileNotFoundError: ``dataset_dir`` is not an existing directory.
            ValueError: ``dataset_dir`` exists but contains nothing.
        """
        self._check_dataset_dir(dataset_dir)

        self.DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.DEVICE}")

        dataset = PyTorch_Classification_Dataset_Class(dataset_dir = dataset_dir)
        dataset.__save_label_map__()
        self.num_classes = dataset.__num_classes__()
        train_size = int(train_ratio * len(dataset))
        test_size = len(dataset) - train_size
        train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
        self.train_loader = torch.utils.data.DataLoader(
            train_dataset
            , batch_size=batch_size
            , shuffle=True
        )
        self.test_loader = torch.utils.data.DataLoader(
            test_dataset
            , batch_size=batch_size
            , shuffle=False
        )
        self.model = None
        self.model_str = None

    @staticmethod
    def _check_dataset_dir(dataset_dir):
        """Fail early, with a clear message, when the dataset cannot be used.

        The previous check announced "Cloning dataset..." without cloning
        anything and said nothing at all when the directory was missing.
        """
        if not os.path.isdir(dataset_dir):
            raise FileNotFoundError(f"Dataset directory {dataset_dir} does not exist.")
        if not os.listdir(dataset_dir):
            raise ValueError(f"Directory {dataset_dir} exists but is empty.")
        print(f"Directory {dataset_dir} already exists and is not empty. Using existing dataset.")

    def prepare_network(self):
        """Instantiate the model on ``DEVICE`` and set the checkpoint file name."""
        self.model = MODEL_From_Scratch(self.num_classes)
        self.model.to(self.DEVICE)
        self.model_str = "PyTorch_Training_From_Scratch.pth"

    def _evaluate(self):
        """Return ``(mean cross-entropy, accuracy in percent)`` on the test split."""
        self.model.eval()
        loss_sum = 0.0
        correct = 0
        with torch.no_grad():
            for data, target in tqdm(self.test_loader):
                data, target = data.to(self.DEVICE), target.to(self.DEVICE)
                output = self.model(data)
                # Sum (not mean) per batch so the final average is per sample.
                loss_sum += F.cross_entropy(output, target, reduction='sum').item()
                pred = output.max(1, keepdim=True)[1]
                correct += pred.eq(target.view_as(pred)).sum().item()

        sample_count = len(self.test_loader.dataset)
        if sample_count == 0:
            # An empty test split (e.g. train_ratio == 1.0) has no metrics.
            return 0.0, 0.0
        return loss_sum / sample_count, 100.0 * correct / sample_count

    def training_network(self
            , learning_rate = 0.0001
            , epochs = 10
            , step_size = 3
            , gamma = 0.3):
        """Train with Adam + StepLR, evaluating after every epoch.

        The checkpoint is written after the first epoch and afterwards only
        when test accuracy improves, so the file always holds the best model
        seen so far (the last epoch no longer overwrites a better one).

        Args:
            learning_rate: initial Adam learning rate.
            epochs: number of passes over the training split.
            step_size: epochs between learning-rate decays.
            gamma: multiplicative decay factor of the learning rate.
        """
        if self.model is None:
            # Allow calling training_network() directly.
            self.prepare_network()

        optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)
        best_accuracy = None
        for epoch in range(1, epochs + 1):
            self.model.train()
            for data, target in tqdm(self.train_loader):
                data, target = data.to(self.DEVICE), target.to(self.DEVICE)
                optimizer.zero_grad()
                output = self.model(data)
                loss = F.cross_entropy(output, target)
                loss.backward()
                optimizer.step()
            scheduler.step()

            test_loss, test_accuracy = self._evaluate()
            print('[{}] Test Loss: {:.4f}, Accuracy: {:.2f}%'.format(epoch, test_loss, test_accuracy))

            if best_accuracy is None or test_accuracy > best_accuracy:
                best_accuracy = test_accuracy
                torch.save(self.model.state_dict(), self.model_str)
                print("model saved!")

# Build the trainer (scans the dataset, writes label_map.txt, creates the
# loaders) and instantiate the network on the selected device.
if __name__ == "__main__":
    training_class = PyTorch_Classification_Training_Class()
    training_class.prepare_network()

In [None]:
# Train with the default hyper-parameters (Adam, lr=1e-4, 10 epochs,
# StepLR with step_size=3 and gamma=0.3).
training_class.training_network()

In [None]:
# Rebuild the trained network for later cells: copy the in-memory
# architecture, then overwrite its weights with the checkpoint on disk.
device ="cuda" if torch.cuda.is_available() else "cpu"
# NOTE(review): training saves "PyTorch_Training_From_Scratch.pth" relative to
# the working directory; this absolute path presumably points at a copy of
# that file — confirm.
model_path = "/home/jetson/Downloads/PyTorch_Training_From_Scratch.pth"

model = copy.deepcopy(training_class.model)
model.load_state_dict(torch.load(model_path, map_location=device))

In [None]:
def preprocess_image(frame):
    """Turn a BGR camera frame into a normalized ``(1, 3, 128, 128)`` tensor.

    Mirrors the training-time transform of the dataset class: RGB image,
    128x128 bilinear resize, scaling to [0, 1] and per-channel normalization.

    Args:
        frame: image array in OpenCV BGR channel order.

    Returns:
        A float32 tensor with a leading batch dimension of 1.
    """
    rgb = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    # Request bilinear explicitly: the training pipeline resizes with
    # torchvision's Resize (bilinear by default), while PIL's own default
    # filter is different / version dependent.
    rgb = rgb.resize((128, 128), Image.BILINEAR)
    pixels = np.asarray(rgb, dtype=np.float32) / 255.0
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    pixels = (pixels - mean) / std
    # HWC -> CHW, then add the batch dimension.
    return torch.from_numpy(pixels).permute(2, 0, 1).unsqueeze(0)

def gstreamer_pipeline(
    capture_width=1280,
    capture_height=720,
    display_width=1280,
    display_height=720,
    framerate=30,
    flip_method=0
):
    """Return a GStreamer launch string reading BGR frames from /dev/video0.

    Only the capture size and frame rate are used; ``display_width``,
    ``display_height`` and ``flip_method`` are accepted but have no effect.
    """
    stages = [
        "v4l2src device=/dev/video0",
        f"video/x-raw, width={capture_width}, height={capture_height}, framerate={framerate}/1",
        "videoconvert",
        "video/x-raw, format=(string)BGR",
        "appsink",
    ]
    # GStreamer chains elements with " ! ".
    return " ! ".join(stages)

def predict_class(model, image_tensor, device):
    """Classify one batch and time the forward pass.

    Args:
        model: network to run; it is switched to eval mode.
        image_tensor: input batch; moved to ``device`` before inference.
        device: device string or ``torch.device`` the model lives on.

    Returns:
        ``(class_index, seconds)``: the arg-max class of the (single) sample
        and the duration of the forward pass.
    """
    model.eval()
    image_tensor = image_tensor.to(device)
    on_cuda = torch.device(device).type == "cuda"
    with torch.no_grad():
        # CUDA kernels run asynchronously: without synchronizing, the timer
        # would only measure the kernel launches, not the computation.
        if on_cuda:
            torch.cuda.synchronize()
        start_time = time.perf_counter()
        outputs = model(image_tensor)
        if on_cuda:
            torch.cuda.synchronize()
        inference_time = time.perf_counter() - start_time
        _, predicted = torch.max(outputs, 1)
    return predicted.item(), inference_time

def load_labels(label_map_path="label_map.txt"):
    """Read the label map: one class name per line, line index == class index."""
    with open(label_map_path, "r") as label_file:
        content = label_file.read()
    return content.splitlines()

def speak(text, language="ko", filename="output.mp3"):
    """Synthesize ``text`` with gTTS, play it, and delete the audio file.

    Args:
        text: sentence to speak.
        language: gTTS language code.
        filename: temporary mp3 written before playback.
    """
    try:
        tts = gTTS(text=text, lang=language)
        tts.save(filename)
        playsound.playsound(filename)
    finally:
        # Clean up even when synthesis or playback fails, so a stale file is
        # never left behind; the file may not exist if saving failed.
        if os.path.exists(filename):
            os.remove(filename)


def load_model(model_path, num_classes, device):
    """Create a ``MODEL_From_Scratch`` and fill it with the saved weights.

    The checkpoint at ``model_path`` is read as a state dict mapped onto
    ``device``; the returned model has been moved to ``device``.
    """
    weights = torch.load(model_path, map_location=device)
    net = MODEL_From_Scratch(num_classes=num_classes)
    net.load_state_dict(weights)
    net.to(device)
    return net

def run_inference(model_path, label_map_path="label_map.txt"):
    """Interactive camera demo: 's' classifies the current frame, 'q' quits.

    The model is loaded from ``model_path`` through ``load_model`` (the
    parameter used to be ignored and an unrelated global ``model`` was run).
    Each classification is timed, and the label is spoken through ``speak``.

    Args:
        model_path: checkpoint to load.
        label_map_path: text file with one class name per line; its line
            count is used as the number of classes.
    """
    labels = load_labels(label_map_path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = load_model(model_path, len(labels), device)

    pipeline = (
    "nvarguscamerasrc ! video/x-raw(memory:NVMM),format=NV12,width=640,height=480,framerate=30/1 ! "
    "nvvidconv ! video/x-raw,format=BGRx ! videoconvert ! video/x-raw,format=BGR ! appsink drop=1")

    cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
    if not cap.isOpened():
        print("카메라를 열 수 없습니다.")
        return

    print("카메라가 실행 중입니다. 's' 키를 눌러 분류를 실행하세요. 'q' 키를 눌러 종료하세요.")
    inference_times = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("카메라 프레임을 읽을 수 없습니다.")
                break

            cv2.imshow("Camera", frame)
            # Keep only the low byte so the comparison with ord() is reliable.
            key = cv2.waitKey(1) & 0xFF

            if key == ord("s"):
                image_tensor = preprocess_image(frame)

                predicted_class, inference_time = predict_class(model, image_tensor, device)
                inference_times.append(inference_time)
                print(f"Inference Time: {inference_time:.4f}s")

                if predicted_class >= len(labels):
                    # Label file and model head disagree; report instead of crashing.
                    print(f"Predicted class index {predicted_class} has no entry in {label_map_path}")
                    continue
                predicted_label = labels[predicted_class]

                speak(f"으으으으 으으으으{predicted_label}으로 분류하세요")

            elif key == ord("q"):
                if inference_times:
                    avg_inference_time = sum(inference_times) / len(inference_times)
                    print(f"Average Inference Time: {avg_inference_time:.4f}s")
                print("프로그램을 종료합니다.")
                break
    finally:
        # Release the camera and close the window even if a step above raised.
        cap.release()
        cv2.destroyAllWindows()

# Run the live camera demo with the trained baseline checkpoint.
run_inference("/home/jetson/Downloads/PyTorch_Training_From_Scratch.pth", "label_map.txt")


In [3]:
def test_accuracy(model, test_loader, device):
    """Evaluate top-1 accuracy and the mean forward-pass time per batch.

    Args:
        model: network to evaluate; moved to ``device`` and set to eval mode.
        test_loader: iterable of ``(inputs, labels)`` batches.
        device: device string or ``torch.device``.

    Returns:
        ``(accuracy_percent, mean_seconds_per_batch)``; both are 0 when the
        loader yields nothing.
    """
    model = model.to(device)
    model.eval()
    correct = 0
    total = 0
    on_cuda = torch.device(device).type == "cuda"

    with torch.no_grad():
        inference_time = []
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            # CUDA executes asynchronously; synchronize around the forward
            # pass so the timer covers the computation, not just the launch.
            if on_cuda:
                torch.cuda.synchronize()
            start_time = time.perf_counter()
            outputs = model(inputs)
            if on_cuda:
                torch.cuda.synchronize()
            inference_time.append(time.perf_counter() - start_time)

            _, predicted = torch.max(outputs, 1)

            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    accuracy = (correct / total) * 100 if total > 0 else 0
    avg_inference_time = sum(inference_time) / len(inference_time) if inference_time else 0
    return accuracy, avg_inference_time

In [None]:
# Evaluate a copy of the trained model on the held-out split and report the
# accuracy together with the mean forward-pass time per batch.
accuracy, t = test_accuracy(copy.deepcopy(training_class.model), training_class.test_loader, training_class.DEVICE)
print(f"Test Accuracy: {accuracy:.5f}%")
print(f"Inference Time: {t:.5f}")

In [None]:
# summary(training_class.model, (3, 128, 128), device="cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Show the module tree of the global `model` (presumably the one rebuilt from
# the checkpoint in the loading cell above — depends on cell execution order).
print(model)

In [None]:
# Count parameters (and MACs) of the trained baseline. The dummy input uses
# the 128x128 resolution the network is trained on and lives on the same
# device as the model instead of a hard-coded 'cuda'.
profile_input = torch.zeros(1, 3, 128, 128, device=training_class.DEVICE)
macs, params = tp.utils.count_ops_and_params(copy.deepcopy(training_class.model), profile_input)
print(f'Basic model Params: {params/1e6} M')

In [4]:
def measure_execution_time(model, example_inputs, device, iterations=30):
    """Return the mean forward-pass time in seconds over ``iterations`` runs.

    The whole loop is timed and divided by the iteration count (previously
    only the last iteration was timed and then divided by ``iterations``,
    under-reporting the latency).

    Args:
        model: network to time; moved to ``device`` and set to eval mode.
        example_inputs: input passed to the model on every iteration.
        device: device string or ``torch.device``.
        iterations: number of timed forward passes; non-positive values
            yield ``0.0``.
    """
    model = model.to(device)
    model.eval()
    if iterations <= 0:
        return 0.0

    on_cuda = torch.device(device).type == "cuda"
    with torch.no_grad():
        # Synchronize so pending / asynchronous CUDA work is not mis-attributed.
        if on_cuda:
            torch.cuda.synchronize()
        start_time = time.perf_counter()
        for _ in range(iterations):
            _ = model(example_inputs)
        if on_cuda:
            torch.cuda.synchronize()
        elapsed = time.perf_counter() - start_time

    return elapsed / iterations

def compare_models(base_model, light_model, example_inputs, test_loader, device):
    """Print a side-by-side table of size, cost, latency and accuracy.

    For both models this reports the parameter count, FLOPs (taken as twice
    the MAC count), mean forward-pass time on ``example_inputs`` and top-1
    accuracy on ``test_loader``.
    """
    # Static cost of each network.
    macs_base, params_base = tp.utils.count_ops_and_params(base_model, example_inputs)
    macs_light, params_light = tp.utils.count_ops_and_params(light_model, example_inputs)
    flops_base, flops_light = 2 * macs_base, 2 * macs_light

    # Latency of a forward pass.
    secs_base = measure_execution_time(base_model, example_inputs, device)
    secs_light = measure_execution_time(light_model, example_inputs, device)

    # Accuracy only; the timing returned by test_accuracy is not used here.
    acc_base = test_accuracy(base_model, test_loader, device)[0]
    acc_light = test_accuracy(light_model, test_loader, device)[0]

    rule = "-------------------------------------------------"
    print("\nComparison of Models:")
    print(rule)
    print("Metric                  | Base Model  | light Model")
    print(rule)
    print(f"Parameters (M)          | {params_base / 1e6:.3f}       | {params_light / 1e6:.3f}")
    print(f"FLOPs (M)               | {flops_base / 1e6:.2f}       | {flops_light / 1e6:.2f}")
    print(f"Execution Time (ms)     | {secs_base * 1e3:.3f}        | {secs_light * 1e3:.3f}")
    print(f"Accuracy (%)            | {acc_base:.2f}       | {acc_light:.2f}")
    print(rule)

# Pruning(1)

In [None]:
# Prune Conv2d output channels, using torch_pruning's DependencyGraph so that
# every operation depending on a pruned layer is pruned consistently.
def prune_with_dependency_conv(model, example_inputs, ratio, device="cuda"):
    """Prune the first ``ratio`` fraction of output channels of each conv layer.

    Works on a deep copy, so ``model`` itself is left untouched. The pruned
    copy is evaluated on ``training_class.test_loader``.

    Args:
        model: network to prune.
        example_inputs: sample input used to trace dependencies and to count
            operations; it must live on ``device``.
        ratio: fraction of output channels removed from each conv layer.
        device: device used for pruning, evaluation and op counting.

    Returns:
        ``(pruned_model, accuracy_percent, param_count, flops, mean_batch_time)``
        where ``flops`` is twice the MAC count.
    """
    model = copy.deepcopy(model).to(device)
    DG = tp.DependencyGraph().build_dependency(model, example_inputs=example_inputs)
    conv_layers = [mod for mod in DG.module2node.keys() if isinstance(mod, nn.Conv2d)]

    # NOTE(review): layers coupled through a shared dependency group are
    # presumably shrunk more than once by this loop — confirm the effective
    # sparsity if it matters.
    for layer in conv_layers:
        num_pruned = int(layer.out_channels * ratio)
        if num_pruned <= 0:
            continue
        pruning_idxs = list(range(num_pruned))

        pruning_group = DG.get_pruning_group(layer, tp.prune_conv_out_channels, idxs=pruning_idxs)

        # Only apply groups the dependency graph accepts as valid.
        if DG.check_pruning_group(pruning_group):
            pruning_group.prune()

    # Evaluate on the requested device: test_accuracy moves the model in
    # place, so using another device here would leave the model and
    # example_inputs on different devices for the op count below.
    p_acc, p_time = test_accuracy(model, training_class.test_loader, device=device)
    prune_macs, prune_params = tp.utils.count_ops_and_params(model, example_inputs)

    return model, p_acc, prune_params, 2*prune_macs, p_time

device = "cuda" if torch.cuda.is_available() else "cpu"

# Sweep conv sparsity levels: prune, evaluate and checkpoint each variant.
results1 = []
sparsities = [0.0, 0.25, 0.50, 0.75]
example_inputs = torch.randn(1, 3, 128, 128).to(device)

for s in tqdm(sparsities):
    save_path = f"pruned_model_conv{int(s * 100)}.pt"
    pruned_model, p_acc, p_params, p_flops, p_time = prune_with_dependency_conv(training_class.model, example_inputs, s, device=device)
    # NOTE(review): only the state dict is stored; its tensor shapes match the
    # pruned network, so it cannot be loaded into the unpruned architecture.
    torch.save(pruned_model.state_dict(), save_path)

    results1.append({
        "sparsity": s,
        "accuracy": p_acc,
        # The columns are labelled in millions, so scale the raw counts.
        "params (M)": p_params / 1e6,
        "FLOPs (M)": p_flops / 1e6,
        "time (s)": p_time
    })

df = pd.DataFrame(results1)

print("\nPruning Results:")
print(df.to_string(index=False))

In [None]:
# Prune Linear output neurons, using torch_pruning's DependencyGraph so that
# every operation depending on a pruned layer is pruned consistently.
def prune_with_dependency_linear(model, example_inputs, ratio, device="cuda"):
    """Prune the first ``ratio`` fraction of output neurons of each Linear layer.

    Works on a deep copy, so ``model`` itself is left untouched. The final
    Linear layer (the classification head) is never pruned: removing its
    outputs would delete classes and shift the remaining class indices.

    Args:
        model: network to prune.
        example_inputs: sample input used to trace dependencies and to count
            operations; it must live on ``device``.
        ratio: fraction of output neurons removed from each Linear layer.
        device: device used for pruning, evaluation and op counting.

    Returns:
        ``(pruned_model, accuracy_percent, param_count, flops, mean_batch_time)``
        where ``flops`` is twice the MAC count.
    """
    model = copy.deepcopy(model).to(device)
    DG = tp.DependencyGraph().build_dependency(model, example_inputs=example_inputs)
    linear_layers = [mod for mod in DG.module2node.keys() if isinstance(mod, nn.Linear)]

    # Assumes the last nn.Linear in registration order is the classifier
    # output (true for MODEL_From_Scratch, whose `fc` head is registered last).
    registered_linears = [mod for mod in model.modules() if isinstance(mod, nn.Linear)]
    classifier = registered_linears[-1] if registered_linears else None

    for layer in linear_layers:
        if layer is classifier:
            continue
        num_pruned = int(layer.out_features * ratio)
        if num_pruned <= 0:
            continue
        pruning_idxs = list(range(num_pruned))

        pruning_group = DG.get_pruning_group(layer, tp.prune_linear_out_channels, idxs=pruning_idxs)
        # Only apply groups the dependency graph accepts as valid.
        if DG.check_pruning_group(pruning_group):
            pruning_group.prune()

    # Evaluate on the requested device: test_accuracy moves the model in
    # place, so using another device here would leave the model and
    # example_inputs on different devices for the op count below.
    p_acc, p_time = test_accuracy(model, training_class.test_loader, device=device)

    prune_macs, prune_params = tp.utils.count_ops_and_params(model, example_inputs)

    return model, p_acc, prune_params, 2*prune_macs, p_time

device = "cuda" if torch.cuda.is_available() else "cpu"
example_inputs = torch.randn(1, 3, 128, 128).to(device)

# Sweep Linear sparsity levels: prune, evaluate and checkpoint each variant.
results2 = []
sparsities = [0.0, 0.25, 0.50, 0.75]
for s in tqdm(sparsities):
    save_path = f"pruned_model_linear{int(s * 100)}.pt"
    pruned_model2, p_acc2, p_params2, p_flops2, p_time2 = prune_with_dependency_linear(training_class.model, example_inputs, s, device=device)
    # NOTE(review): only the state dict is stored; its tensor shapes match the
    # pruned network, so it cannot be loaded into the unpruned architecture.
    torch.save(pruned_model2.state_dict(), save_path)

    results2.append({
        "sparsity": s,
        "accuracy": p_acc2,
        # The columns are labelled in millions, so scale the raw counts.
        "params (M)": p_params2 / 1e6,
        "FLOPs (M)": p_flops2 / 1e6,
        "time (s)": p_time2
    })

df = pd.DataFrame(results2)

print("\nPruning Results:")
print(df.to_string(index=False))

In [None]:
def prune_with_dependency_all(model, example_inputs, ratio_conv, ratio_linear, device="cuda" if torch.cuda.is_available() else "cpu"):
    """Prune conv channels and Linear neurons of a copy of ``model``.

    Conv layers lose the first ``ratio_conv`` fraction of their output
    channels, Linear layers the first ``ratio_linear`` fraction of their
    output neurons. The final Linear layer (the classification head) is never
    pruned: removing its outputs would delete classes and shift the remaining
    class indices. The pruned copy is evaluated on
    ``training_class.test_loader``.

    Args:
        model: network to prune (left untouched; a deep copy is modified).
        example_inputs: sample input used to trace dependencies and to count
            operations; it must live on ``device``.
        ratio_conv: fraction of output channels removed per conv layer.
        ratio_linear: fraction of output neurons removed per Linear layer.
        device: device used for pruning, evaluation and op counting.

    Returns:
        ``(pruned_model, accuracy_percent, param_count, flops, mean_batch_time)``
        where ``flops`` is twice the MAC count.
    """
    model = copy.deepcopy(model).to(device)
    DG = tp.DependencyGraph().build_dependency(model, example_inputs=example_inputs)

    def prune_first(layer, width, ratio, prune_fn):
        """Remove the first ``int(width * ratio)`` outputs of ``layer`` if valid."""
        num_pruned = int(width * ratio)
        if num_pruned <= 0:
            return
        pruning_group = DG.get_pruning_group(layer, prune_fn, idxs=list(range(num_pruned)))
        # Only apply groups the dependency graph accepts as valid.
        if DG.check_pruning_group(pruning_group):
            pruning_group.prune()

    conv_layers = [mod for mod in DG.module2node.keys() if isinstance(mod, nn.Conv2d)]
    for layer in conv_layers:
        prune_first(layer, layer.out_channels, ratio_conv, tp.prune_conv_out_channels)

    # Assumes the last nn.Linear in registration order is the classifier
    # output (true for MODEL_From_Scratch, whose `fc` head is registered last).
    registered_linears = [mod for mod in model.modules() if isinstance(mod, nn.Linear)]
    classifier = registered_linears[-1] if registered_linears else None

    linear_layers = [mod for mod in DG.module2node.keys() if isinstance(mod, nn.Linear)]
    for layer in linear_layers:
        if layer is classifier:
            continue
        prune_first(layer, layer.out_features, ratio_linear, tp.prune_linear_out_channels)

    p_acc, p_time = test_accuracy(model, training_class.test_loader, device=device)
    prune_macs, prune_params = tp.utils.count_ops_and_params(model, example_inputs)

    return model, p_acc, prune_params, 2*prune_macs, p_time

device = "cuda" if torch.cuda.is_available() else "cpu"
example_inputs = torch.randn(1, 3, 128, 128).to(device)

# Sweep a shared sparsity level for conv and Linear layers: prune, evaluate
# and checkpoint each variant.
results_all = []
sparsities = [0.0, 0.25, 0.50, 0.75]

for s in tqdm(sparsities):
    save_path = f"pruned_model_all_{int(s * 100)}.pt"
    pruned_model, p_acc, p_params, p_flops, p_time = prune_with_dependency_all(
        model=training_class.model,
        example_inputs=example_inputs,
        ratio_conv=s,
        ratio_linear=s,
        device=device
    )
    # NOTE(review): only the state dict is stored; its tensor shapes match the
    # pruned network, so it cannot be loaded into the unpruned architecture.
    torch.save(pruned_model.state_dict(), save_path)

    results_all.append({
        "sparsity": s,
        "accuracy": p_acc,
        # The columns are labelled in millions, so scale the raw counts.
        "params (M)": p_params / 1e6,
        "FLOPs (M)": p_flops / 1e6,
        "time (s)": p_time
    })

df = pd.DataFrame(results_all)

print("\nCombined Pruning Results:")
print(df.to_string(index=False))

In [None]:
# def load_pruned_model(model_path, device):
#     p_model = copy.deepcopy(training_class.model)
#     p_model.load_state_dict(torch.load(model_path, map_location=device))
#     p_model.to(device)
#     p_model.eval()  
#     return p_model

# def calculate_flops_and_params(model, example_inputs):
#     # flops, params = thop.profile(model, inputs=(example_inputs,), verbose=False)
#     prune_macs, prune_params = tp.utils.count_ops_and_params(model, example_inputs)
#     return 2*prune_macs, prune_params

In [None]:
def preprocess_image(frame):
    """Turn a BGR camera frame into a normalized ``(1, 3, 128, 128)`` tensor.

    Mirrors the training-time transform of the dataset class: RGB image,
    128x128 bilinear resize, scaling to [0, 1] and per-channel normalization.

    Args:
        frame: image array in OpenCV BGR channel order.

    Returns:
        A float32 tensor with a leading batch dimension of 1.
    """
    rgb = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    # Request bilinear explicitly: the training pipeline resizes with
    # torchvision's Resize (bilinear by default), while PIL's own default
    # filter is different / version dependent.
    rgb = rgb.resize((128, 128), Image.BILINEAR)
    pixels = np.asarray(rgb, dtype=np.float32) / 255.0
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    pixels = (pixels - mean) / std
    # HWC -> CHW, then add the batch dimension.
    return torch.from_numpy(pixels).permute(2, 0, 1).unsqueeze(0)

def gstreamer_pipeline(
    capture_width=1280,
    capture_height=720,
    display_width=1280,
    display_height=720,
    framerate=30,
    flip_method=0
):
    """Return a GStreamer launch string reading BGR frames from /dev/video0.

    Only the capture size and frame rate are used; ``display_width``,
    ``display_height`` and ``flip_method`` are accepted but have no effect.
    """
    stages = [
        "v4l2src device=/dev/video0",
        f"video/x-raw, width={capture_width}, height={capture_height}, framerate={framerate}/1",
        "videoconvert",
        "video/x-raw, format=(string)BGR",
        "appsink",
    ]
    # GStreamer chains elements with " ! ".
    return " ! ".join(stages)

def predict_class(model, image_tensor, device):
    """Classify one batch and time the forward pass.

    Args:
        model: network to run; it is switched to eval mode.
        image_tensor: input batch; moved to ``device`` before inference.
        device: device string or ``torch.device`` the model lives on.

    Returns:
        ``(class_index, seconds)``: the arg-max class of the (single) sample
        and the duration of the forward pass.
    """
    model.eval()
    image_tensor = image_tensor.to(device)
    on_cuda = torch.device(device).type == "cuda"
    with torch.no_grad():
        # CUDA kernels run asynchronously: without synchronizing, the timer
        # would only measure the kernel launches, not the computation.
        if on_cuda:
            torch.cuda.synchronize()
        start_time = time.perf_counter()
        outputs = model(image_tensor)
        if on_cuda:
            torch.cuda.synchronize()
        inference_time = time.perf_counter() - start_time
        _, predicted = torch.max(outputs, 1)
    return predicted.item(), inference_time

def load_labels(label_map_path="label_map.txt"):
    """Read the label map: one class name per line, line index == class index."""
    with open(label_map_path, "r") as label_file:
        content = label_file.read()
    return content.splitlines()

def speak(text, language="ko", filename="output.mp3"):
    """Synthesize ``text`` with gTTS, play it, and delete the audio file.

    Args:
        text: sentence to speak.
        language: gTTS language code.
        filename: temporary mp3 written before playback.
    """
    try:
        tts = gTTS(text=text, lang=language)
        tts.save(filename)
        playsound.playsound(filename)
    finally:
        # Clean up even when synthesis or playback fails, so a stale file is
        # never left behind; the file may not exist if saving failed.
        if os.path.exists(filename):
            os.remove(filename)


def load_model(model_path, num_classes, device):
    """Load a checkpoint into a copy of ``training_class.model``.

    Args:
        model_path: file holding a state dict.
        num_classes: accepted for signature compatibility; the architecture
            is taken from ``training_class.model``, so this value is unused.
        device: device the weights are mapped to and the model is moved to.

    Raises:
        RuntimeError: the checkpoint does not fit the architecture, e.g. a
            state dict saved from a structurally pruned model whose tensor
            shapes differ from the unpruned network.
    """
    model = copy.deepcopy(training_class.model)
    state_dict = torch.load(model_path, map_location=device)
    try:
        model.load_state_dict(state_dict)
    except RuntimeError as err:
        # Re-raise with context: the raw message only lists mismatching keys.
        raise RuntimeError(
            f"Checkpoint {model_path} does not match the architecture of "
            "training_class.model; a structurally pruned state dict can only "
            "be loaded into a model pruned the same way."
        ) from err
    model.to(device)
    return model

def run_inference(model_path, label_map_path="label_map.txt"):
    """Interactive camera demo: 's' classifies the current frame, 'q' quits.

    The model is loaded from ``model_path`` through ``load_model`` (the
    parameter used to be ignored and an unrelated global ``model`` was run,
    so the demo never exercised the requested checkpoint).
    Each classification is timed, and the label is spoken through ``speak``.

    NOTE(review): ``load_model`` must be able to rebuild the architecture the
    checkpoint was saved from; for structurally pruned checkpoints confirm
    that the loader produces a network with matching shapes.

    Args:
        model_path: checkpoint to load.
        label_map_path: text file with one class name per line; its line
            count is used as the number of classes.
    """
    labels = load_labels(label_map_path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = load_model(model_path, len(labels), device)

    pipeline = (
    "nvarguscamerasrc ! video/x-raw(memory:NVMM),format=NV12,width=640,height=480,framerate=30/1 ! "
    "nvvidconv ! video/x-raw,format=BGRx ! videoconvert ! video/x-raw,format=BGR ! appsink drop=1")

    cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
    if not cap.isOpened():
        print("카메라를 열 수 없습니다.")
        return

    print("카메라가 실행 중입니다. 's' 키를 눌러 분류를 실행하세요. 'q' 키를 눌러 종료하세요.")
    inference_times = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("카메라 프레임을 읽을 수 없습니다.")
                break

            cv2.imshow("Camera", frame)
            # Keep only the low byte so the comparison with ord() is reliable.
            key = cv2.waitKey(1) & 0xFF

            if key == ord("s"):
                image_tensor = preprocess_image(frame)

                predicted_class, inference_time = predict_class(model, image_tensor, device)
                inference_times.append(inference_time)
                print(f"Inference Time: {inference_time:.4f}s")

                if predicted_class >= len(labels):
                    # Label file and model head disagree; report instead of crashing.
                    print(f"Predicted class index {predicted_class} has no entry in {label_map_path}")
                    continue
                predicted_label = labels[predicted_class]

                speak(f"으으으으 으으으으{predicted_label}으로 분류하세요")

            elif key == ord("q"):
                if inference_times:
                    avg_inference_time = sum(inference_times) / len(inference_times)
                    print(f"Average Inference Time: {avg_inference_time:.4f}s")
                print("프로그램을 종료합니다.")
                break
    finally:
        # Release the camera and close the window even if a step above raised.
        cap.release()
        cv2.destroyAllWindows()

In [None]:
# Run the live camera demo with the checkpoint named after 75% conv + linear
# sparsity (presumably a copy of the file written by the combined sweep).
run_inference("/home/jetson/Downloads/pruned_model_all_75.pt", "label_map.txt")

# Knowledge Distillation

In [5]:
class StudentModel(nn.Module):
    """Compact student network made of depthwise-separable convolutions.

    A stride-2 stem (3 -> 32 channels) is followed by four separable blocks
    (32 -> 64 -> 128 -> 256 -> 512 channels, the last three with stride 2),
    global average pooling and one linear layer producing ``num_classes``
    logits.
    """

    def __init__(self, num_classes):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
        )
        self.block1 = self._separable_block(32, 64, stride=1)
        self.block2 = self._separable_block(64, 128, stride=2)
        self.block3 = self._separable_block(128, 256, stride=2)
        self.block4 = self._separable_block(256, 512, stride=2)
        self.global_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Linear(512, num_classes)

    @staticmethod
    def _separable_block(in_channels, out_channels, stride=1):
        """Build a 3x3 depthwise conv followed by a 1x1 pointwise conv.

        Both convolutions are bias-free and each is followed by batch
        normalization and ReLU; ``stride`` applies to the depthwise step.
        """
        layers = [
            # Depthwise: one 3x3 filter per input channel.
            nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=stride,
                      padding=1, groups=in_channels, bias=False),
            nn.BatchNorm2d(in_channels),
            nn.ReLU(inplace=True),
            # Pointwise: 1x1 conv that mixes channels.
            nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(inplace=True),
        ]
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits for an image batch."""
        for stage in (self.conv1, self.block1, self.block2, self.block3, self.block4):
            x = stage(x)
        pooled = self.global_pool(x)
        return self.fc(pooled.flatten(1))

def kd_loss(student_outputs, teacher_outputs, temperature):
    """Distillation loss: KL(teacher || student) on temperature-softened logits.

    Both logit tensors are divided by ``temperature`` before the softmax; the
    batch-mean KL divergence is multiplied by ``temperature ** 2``.
    """
    soft_targets = F.softmax(teacher_outputs / temperature, dim=1)
    # kl_div expects the student distribution as log-probabilities.
    log_student = F.log_softmax(student_outputs / temperature, dim=1)
    divergence = F.kl_div(log_student, soft_targets, reduction='batchmean')
    return divergence * (temperature ** 2)

def combined_loss(student_outputs, targets, teacher_outputs, criterion, temperature, alpha):
    """Blend the supervised loss with the distillation loss.

    ``alpha`` weights ``criterion(student_outputs, targets)`` and
    ``1 - alpha`` weights ``kd_loss`` against the teacher outputs.
    """
    supervised = criterion(student_outputs, targets)
    distilled = kd_loss(student_outputs, teacher_outputs, temperature)
    soft_weight = 1 - alpha
    return alpha * supervised + soft_weight * distilled

def train_student_with_kd(
    teacher_model,
    student_model,
    train_loader,
    val_loader,
    device,
    epochs=10,
    learning_rate=0.001,
    temperature=3.0,
    alpha=0.5
):
    """Train ``student_model`` with knowledge distillation from ``teacher_model``.

    Every step minimizes ``combined_loss``: cross-entropy on the labels
    blended with the distillation loss against the frozen teacher. After each
    epoch the student is evaluated on ``val_loader`` and the epoch metrics are
    printed (they used to be computed and then discarded).

    Args:
        teacher_model: trained network providing soft targets (eval mode, no
            gradients).
        student_model: network being trained.
        train_loader / val_loader: iterables of ``(inputs, targets)`` batches.
        device: device both models and all batches are moved to.
        epochs: number of passes over ``train_loader``.
        learning_rate: Adam learning rate.
        temperature: softmax temperature of the distillation loss.
        alpha: weight of the hard-label loss; ``1 - alpha`` weights the
            distillation loss.

    Returns:
        The trained ``student_model``.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(student_model.parameters(), lr=learning_rate)

    teacher_model = teacher_model.to(device)
    student_model = student_model.to(device)
    teacher_model.eval()

    for epoch in range(epochs):
        student_model.train()
        train_loss = 0.0

        for inputs, targets in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{epochs}"):
            inputs, targets = inputs.to(device), targets.to(device)

            # The teacher only provides targets; it is never updated.
            with torch.no_grad():
                teacher_outputs = teacher_model(inputs)
            student_outputs = student_model(inputs)

            loss = combined_loss(student_outputs, targets, teacher_outputs, criterion, temperature, alpha)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        student_model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = inputs.to(device), targets.to(device)
                teacher_outputs = teacher_model(inputs)
                student_outputs = student_model(inputs)

                loss = combined_loss(student_outputs, targets, teacher_outputs, criterion, temperature, alpha)
                val_loss += loss.item()

                _, predicted = torch.max(student_outputs, 1)
                correct += (predicted == targets).sum().item()
                total += targets.size(0)

        # Losses are averaged per batch; guard against empty loaders.
        train_loss /= max(1, len(train_loader))
        val_loss /= max(1, len(val_loader))
        val_accuracy = 100 * correct / total if total > 0 else 0.0

        print(
            f"Epoch {epoch + 1}/{epochs} - "
            f"train loss: {train_loss:.4f}, val loss: {val_loss:.4f}, "
            f"val accuracy: {val_accuracy:.2f}%"
        )

    return student_model

In [None]:
# Data and models for knowledge distillation: an 80/20 random split of the
# dataset, the trained network as teacher and a fresh StudentModel.
dataset_dir = "/home/jetson/Downloads/project/Recycle_Classification_Dataset"
dataset = PyTorch_Classification_Dataset_Class(dataset_dir=dataset_dir)
train_ratio = 0.8
train_size = int(train_ratio * len(dataset))
val_size = len(dataset) - train_size
# NOTE(review): this split is drawn independently of the split the teacher was
# trained on, so val_dataset presumably contains images the teacher has
# already seen during training — confirm before comparing accuracies.
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

batch_size = 16
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=0)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=0)

# The teacher is a copy so that distillation cannot modify the trained model.
teacher_model = copy.deepcopy(training_class.model)
student_model = StudentModel(num_classes=dataset.__num_classes__())

In [None]:
# trained_student_model = train_student_with_kd(
#     teacher_model=teacher_model,
#     student_model=student_model,
#     train_loader=train_loader,
#     val_loader=val_loader,
#     device="cuda" if torch.cuda.is_available() else "cpu",
#     epochs=10,
#     learning_rate=0.001,
#     temperature=3.0,
#     alpha=0.5
# )

# torch.save(trained_student_model.state_dict(), "lightweight_student_model.pth")
# print("Student model saved as 'lightweight_student_model.pth'")

In [None]:
# Rebuild the student with the dataset's class count (the same value used
# when the student was created for training) instead of a hard-coded 2, then
# load its trained weights.
trained_student_model = StudentModel(num_classes=dataset.__num_classes__())
trained_student_model.load_state_dict(torch.load("/home/jetson/Downloads/lightweight_student_model.pth", map_location=device))

# Put both networks on the same device and dtype before comparing them.
trained_student_model = trained_student_model.to(device).to(dtype=torch.float32)
teacher_model = copy.deepcopy(training_class.model).to(device).to(dtype=torch.float32)

# Compare parameters, FLOPs, latency and accuracy of teacher and student.
example_inputs = torch.randn(1, 3, 128, 128, dtype=torch.float32).to(device)
compare_models(teacher_model, trained_student_model, example_inputs, val_loader, device=device)

In [None]:
def preprocess_image(frame):
    """Turn a BGR camera frame into a normalized ``(1, 3, 128, 128)`` tensor.

    Mirrors the training-time transform of the dataset class: RGB image,
    128x128 bilinear resize, scaling to [0, 1] and per-channel normalization.

    Args:
        frame: image array in OpenCV BGR channel order.

    Returns:
        A float32 tensor with a leading batch dimension of 1.
    """
    rgb = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    # Request bilinear explicitly: the training pipeline resizes with
    # torchvision's Resize (bilinear by default), while PIL's own default
    # filter is different / version dependent.
    rgb = rgb.resize((128, 128), Image.BILINEAR)
    pixels = np.asarray(rgb, dtype=np.float32) / 255.0
    mean = np.array([0.485, 0.456, 0.406], dtype=np.float32)
    std = np.array([0.229, 0.224, 0.225], dtype=np.float32)
    pixels = (pixels - mean) / std
    # HWC -> CHW, then add the batch dimension.
    return torch.from_numpy(pixels).permute(2, 0, 1).unsqueeze(0)

def gstreamer_pipeline(
    capture_width=1280,
    capture_height=720,
    display_width=1280,
    display_height=720,
    framerate=30,
    flip_method=0
):
    """Return a GStreamer launch string reading BGR frames from /dev/video0.

    Only the capture size and frame rate are used; ``display_width``,
    ``display_height`` and ``flip_method`` are accepted but have no effect.
    """
    stages = [
        "v4l2src device=/dev/video0",
        f"video/x-raw, width={capture_width}, height={capture_height}, framerate={framerate}/1",
        "videoconvert",
        "video/x-raw, format=(string)BGR",
        "appsink",
    ]
    # GStreamer chains elements with " ! ".
    return " ! ".join(stages)

def predict_class(model, image_tensor, device):
    """Classify one batch and time the forward pass.

    Args:
        model: network to run; it is switched to eval mode.
        image_tensor: input batch; moved to ``device`` before inference.
        device: device string or ``torch.device`` the model lives on.

    Returns:
        ``(class_index, seconds)``: the arg-max class of the (single) sample
        and the duration of the forward pass.
    """
    model.eval()
    image_tensor = image_tensor.to(device)
    on_cuda = torch.device(device).type == "cuda"
    with torch.no_grad():
        # CUDA kernels run asynchronously: without synchronizing, the timer
        # would only measure the kernel launches, not the computation.
        if on_cuda:
            torch.cuda.synchronize()
        start_time = time.perf_counter()
        outputs = model(image_tensor)
        if on_cuda:
            torch.cuda.synchronize()
        inference_time = time.perf_counter() - start_time
        _, predicted = torch.max(outputs, 1)
    return predicted.item(), inference_time

def load_labels(label_map_path="label_map.txt"):
    """Read the label map: one class name per line, line index == class index."""
    with open(label_map_path, "r") as label_file:
        content = label_file.read()
    return content.splitlines()

def speak(text, language="ko", filename="output.mp3"):
    """Synthesize ``text`` with gTTS, play it, and delete the audio file.

    Args:
        text: sentence to speak.
        language: gTTS language code.
        filename: temporary mp3 written before playback.
    """
    try:
        tts = gTTS(text=text, lang=language)
        tts.save(filename)
        playsound.playsound(filename)
    finally:
        # Clean up even when synthesis or playback fails, so a stale file is
        # never left behind; the file may not exist if saving failed.
        if os.path.exists(filename):
            os.remove(filename)


def load_model(model_path, num_classes, device):
    """Create a ``StudentModel`` and fill it with the saved weights.

    The checkpoint at ``model_path`` is read as a state dict mapped onto
    ``device``; the returned model has been moved to ``device``.
    """
    weights = torch.load(model_path, map_location=device)
    net = StudentModel(num_classes=num_classes)
    net.load_state_dict(weights)
    net.to(device)
    return net

def run_inference(model_path, label_map_path="label_map.txt"):
    """Interactive camera demo: 's' classifies the current frame, 'q' quits.

    The model is loaded from ``model_path`` through ``load_model`` (the
    parameter used to be ignored and an unrelated global ``model`` was run,
    so the demo never exercised the requested checkpoint).
    Each classification is timed, and the label is spoken through ``speak``.

    Args:
        model_path: checkpoint to load.
        label_map_path: text file with one class name per line; its line
            count is used as the number of classes.
    """
    labels = load_labels(label_map_path)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = load_model(model_path, len(labels), device)

    pipeline = (
    "nvarguscamerasrc ! video/x-raw(memory:NVMM),format=NV12,width=640,height=480,framerate=30/1 ! "
    "nvvidconv ! video/x-raw,format=BGRx ! videoconvert ! video/x-raw,format=BGR ! appsink drop=1")

    cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER)
    if not cap.isOpened():
        print("카메라를 열 수 없습니다.")
        return

    print("카메라가 실행 중입니다. 's' 키를 눌러 분류를 실행하세요. 'q' 키를 눌러 종료하세요.")
    inference_times = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("카메라 프레임을 읽을 수 없습니다.")
                break

            cv2.imshow("Camera", frame)
            # Keep only the low byte so the comparison with ord() is reliable.
            key = cv2.waitKey(1) & 0xFF

            if key == ord("s"):
                image_tensor = preprocess_image(frame)

                predicted_class, inference_time = predict_class(model, image_tensor, device)
                inference_times.append(inference_time)
                print(f"Inference Time: {inference_time:.4f}s")

                if predicted_class >= len(labels):
                    # Label file and model head disagree; report instead of crashing.
                    print(f"Predicted class index {predicted_class} has no entry in {label_map_path}")
                    continue
                predicted_label = labels[predicted_class]

                speak(f"으으으으 으으으으{predicted_label}으로 분류하세요")

            elif key == ord("q"):
                if inference_times:
                    avg_inference_time = sum(inference_times) / len(inference_times)
                    print(f"Average Inference Time: {avg_inference_time:.4f}s")
                print("프로그램을 종료합니다.")
                break
    finally:
        # Release the camera and close the window even if a step above raised.
        cap.release()
        cv2.destroyAllWindows()

In [None]:
# Run the live camera demo with the distilled student checkpoint.
run_inference("/home/jetson/Downloads/lightweight_student_model.pth", "label_map.txt")