In [None]:
temp_max_x, temp_max_y 사용안함

In [None]:
multimodal
- AGV
    - train(01~14)
    - val(17~18)
    - test(15~16)
- OHT
    - train(01~14)
    - val(17~18)
    - test(15~16)

In [None]:
AGV 멀티모달 93%

In [16]:
import os
import torch
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import time

# 1. 멀티모달 데이터셋 정의
class MultimodalDataset(Dataset):
    def __init__(self, csv_path, bin_root_folder, split_folder, img_dim_h, img_dim_w):
        self.data = []
        self.img_dim_h = img_dim_h
        self.img_dim_w = img_dim_w

        # 모든 BIN 파일의 경로 수집
        bin_files = {}
        split_path = os.path.join(bin_root_folder, split_folder)
        for root, _, files in os.walk(split_path):
            for file in files:
                if file.endswith(".bin"):
                    bin_files[file] = os.path.join(root, file)

        # CSV 파일 읽기
        df = pd.read_csv(csv_path)
        #features = ["NTC", "PM10", "PM2.5", "PM1.0", "CT1", "CT2", "CT3", "CT4", "temp_max_value"]
        features = ["NTC", "PM10", "PM2.5", "PM1.0", "CT1", "CT2", "CT3", "CT4", "temp_max_value", "ex_temperature", "ex_humidity", "ex_illuminance"]

        for _, row in df.iterrows():
            bin_filename = row['bin_filename']
            if bin_filename in bin_files:
                bin_path = bin_files[bin_filename]
                try:
                    img_data = np.load(bin_path).reshape((img_dim_h, img_dim_w))
                except Exception as e:
                    print(f"[Error] BIN 파일 로드 실패: {bin_path}, {e}")
                    continue

                aux_data = row[features].values.astype(np.float32)
                label = int(row['state'])
                self.data.append((img_data, aux_data, label))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_data, aux_data, label = self.data[idx]
        img_data = torch.tensor(img_data, dtype=torch.float32).unsqueeze(0)
        aux_data = torch.tensor(aux_data, dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.long)
        return img_data, aux_data, label


# 2. 데이터 로더 함수
def load_data(csv_path, bin_root_folder, img_dim_h, img_dim_w, batch_size=32):
    train_dataset = MultimodalDataset(csv_path, bin_root_folder, 'train', img_dim_h, img_dim_w)
    val_dataset = MultimodalDataset(csv_path, bin_root_folder, 'val', img_dim_h, img_dim_w)
    test_dataset = MultimodalDataset(csv_path, bin_root_folder, 'test', img_dim_h, img_dim_w)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader


# 3. 모델 정의
class ViTFeatureExtractor(nn.Module):
    def __init__(self, img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth, dropout_rate=0.5):
        super().__init__()
        self.vit = nn.Transformer(
            d_model=embed_dim, nhead=num_heads, num_encoder_layers=depth,
            batch_first=True, dropout=dropout_rate
        )
        self.patch_embed = nn.Conv2d(1, embed_dim, kernel_size=patch_size, stride=patch_size)
        num_patches = (img_dim_h // patch_size) * (img_dim_w // patch_size)
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches, embed_dim))
        self.dropout = nn.Dropout(p=dropout_rate)

    def forward(self, x):
        patches = self.patch_embed(x).flatten(2).transpose(1, 2)
        x = patches + self.pos_embedding
        x = self.vit(x, x)
        x = self.dropout(x.mean(dim=1))
        return x


class SoftLabelEncoder(nn.Module):
    def __init__(self, aux_input_dim, embed_dim, dropout_rate=0.5):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(aux_input_dim, embed_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(embed_dim, embed_dim)
        )

    def forward(self, aux_data):
        return self.fc(aux_data)


class CrossAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)

    def forward(self, query, key):
        attn_output, _ = self.attention(query, key, key)
        return attn_output


class ConditionClassifier(nn.Module):
    def __init__(self, img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth, aux_input_dim, num_classes, dropout_rate=0.5):
        super().__init__()
        self.vit = ViTFeatureExtractor(img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth, dropout_rate)
        self.soft_label_encoder = SoftLabelEncoder(aux_input_dim, embed_dim, dropout_rate)
        self.cross_attention = CrossAttention(embed_dim, num_heads)
        self.classifier = nn.Sequential(
            nn.Linear(embed_dim, embed_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(embed_dim, num_classes)
        )

    def forward(self, images, aux_data):
        visual_features = self.vit(images)
        aux_features = self.soft_label_encoder(aux_data)
        visual_features = visual_features.unsqueeze(1)
        aux_features = aux_features.unsqueeze(1)
        integrated_features = self.cross_attention(visual_features, aux_features).squeeze(1)
        return self.classifier(integrated_features)


# 4. 학습 함수
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler=None, num_epochs=30, patience=5):
    best_val_loss = float('inf')
    patience_counter = 0

    for epoch in range(num_epochs):
        model.train()
        start_time = time.time()

        # Training
        train_loss = 0.0
        correct_train = 0
        total_train = 0
        for images, aux_data, labels in train_loader:
            images, aux_data, labels = images.to(device), aux_data.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images, aux_data)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()
        train_accuracy = 100 * correct_train / total_train

        # Validation
        model.eval()
        val_loss = 0.0
        correct_val = 0
        total_val = 0
        with torch.no_grad():
            for images, aux_data, labels in val_loader:
                images, aux_data, labels = images.to(device), aux_data.to(device), labels.to(device)
                outputs = model(images, aux_data)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).sum().item()
        val_accuracy = 100 * correct_val / total_val

        end_time = time.time()

        # Early Stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), "AGV/best_model.pth")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch + 1}")
                break

        # Scheduler step
        if scheduler:
            scheduler.step()

        # Print epoch results
        print(f"Epoch {epoch + 1}: Train Loss = {train_loss / len(train_loader):.4f}, "
              f"Train Accuracy = {train_accuracy:.2f}%, "
              f"Val Loss = {val_loss / len(val_loader):.4f}, "
              f"Val Accuracy = {val_accuracy:.2f}%, "
              f"Time = {end_time - start_time:.2f}s")


# 5. 실행
if __name__ == "__main__":
    csv_path = "C:/Users/82103/Desktop/multimodal/AGV/agv_merged_output.csv"
    bin_root_folder = "C:/Users/82103/Desktop/multimodal/AGV"
    img_dim_h, img_dim_w = 120, 160
    #aux_input_dim = 9
    aux_input_dim = 12
    num_classes = 4
    batch_size = 32

    # 데이터 로더 생성
    train_loader, val_loader, test_loader = load_data(csv_path, bin_root_folder, img_dim_h, img_dim_w, batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # 모델 정의
    model = ConditionClassifier(
        img_dim_h, img_dim_w, patch_size=16, embed_dim=128, num_heads=4,
        depth=6, aux_input_dim=aux_input_dim, num_classes=num_classes, dropout_rate=0.2
    ).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=5e-4)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

    # 학습
    train_model(
        model, train_loader, val_loader, criterion, optimizer,
        scheduler, num_epochs=100, patience=8
    )

    # 테스트
    model.load_state_dict(torch.load("AGV/best_model.pth"))
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, aux_data, labels in test_loader:
            images, aux_data, labels = images.to(device), aux_data.to(device), labels.to(device)
            outputs = model(images, aux_data)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f"Test Accuracy: {100 * correct / total:.2f}%")


Using device: cuda
Epoch 1: Train Loss = 0.3027, Train Accuracy = 87.59%, Val Loss = 0.6203, Val Accuracy = 87.61%, Time = 65.75s
Epoch 2: Train Loss = 0.2130, Train Accuracy = 91.16%, Val Loss = 0.7626, Val Accuracy = 86.72%, Time = 65.88s
Epoch 3: Train Loss = 0.1948, Train Accuracy = 91.79%, Val Loss = 0.7671, Val Accuracy = 86.30%, Time = 67.09s
Epoch 4: Train Loss = 0.1912, Train Accuracy = 91.92%, Val Loss = 0.8716, Val Accuracy = 86.88%, Time = 78.75s
Epoch 5: Train Loss = 0.1878, Train Accuracy = 91.99%, Val Loss = 0.8917, Val Accuracy = 88.70%, Time = 68.23s
Epoch 6: Train Loss = 0.1708, Train Accuracy = 92.57%, Val Loss = 0.9025, Val Accuracy = 88.12%, Time = 66.25s
Epoch 7: Train Loss = 0.1680, Train Accuracy = 92.52%, Val Loss = 0.9756, Val Accuracy = 88.44%, Time = 66.16s
Epoch 8: Train Loss = 0.1664, Train Accuracy = 92.65%, Val Loss = 0.9653, Val Accuracy = 87.65%, Time = 65.90s
Early stopping at epoch 9
Test Accuracy: 93.03%


In [None]:
OHT 멀티모달 95%

In [12]:
import os
import torch
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
import time

# 1. 멀티모달 데이터셋 정의
class MultimodalDataset(Dataset):
    def __init__(self, csv_path, bin_root_folder, split_folder, img_dim_h, img_dim_w):
        self.data = []
        self.img_dim_h = img_dim_h
        self.img_dim_w = img_dim_w

        # 모든 BIN 파일의 경로 수집
        bin_files = {}
        split_path = os.path.join(bin_root_folder, split_folder)
        for root, _, files in os.walk(split_path):
            for file in files:
                if file.endswith(".bin"):
                    bin_files[file] = os.path.join(root, file)

        # CSV 파일 읽기
        df = pd.read_csv(csv_path)
        #features = ["NTC", "PM10", "PM2.5", "PM1.0", "CT1", "CT2", "CT3", "CT4", "temp_max_value"]
        features = ["NTC", "PM10", "PM2.5", "PM1.0", "CT1", "CT2", "CT3", "CT4", "temp_max_value", "ex_temperature", "ex_humidity", "ex_illuminance"]

        for _, row in df.iterrows():
            bin_filename = row['bin_filename']
            if bin_filename in bin_files:
                bin_path = bin_files[bin_filename]
                try:
                    img_data = np.load(bin_path).reshape((img_dim_h, img_dim_w))
                except Exception as e:
                    print(f"[Error] BIN 파일 로드 실패: {bin_path}, {e}")
                    continue

                aux_data = row[features].values.astype(np.float32)
                label = int(row['state'])
                self.data.append((img_data, aux_data, label))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_data, aux_data, label = self.data[idx]
        img_data = torch.tensor(img_data, dtype=torch.float32).unsqueeze(0)
        aux_data = torch.tensor(aux_data, dtype=torch.float32)
        label = torch.tensor(label, dtype=torch.long)
        return img_data, aux_data, label


# 2. 데이터 로더 함수
def load_data(csv_path, bin_root_folder, img_dim_h, img_dim_w, batch_size=32):
    train_dataset = MultimodalDataset(csv_path, bin_root_folder, 'train', img_dim_h, img_dim_w)
    val_dataset = MultimodalDataset(csv_path, bin_root_folder, 'val', img_dim_h, img_dim_w)
    test_dataset = MultimodalDataset(csv_path, bin_root_folder, 'test', img_dim_h, img_dim_w)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader


# 3. 모델 정의
class ViTFeatureExtractor(nn.Module):
    def __init__(self, img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth, dropout_rate=0.5):
        super().__init__()
        self.vit = nn.Transformer(
            d_model=embed_dim, nhead=num_heads, num_encoder_layers=depth,
            batch_first=True, dropout=dropout_rate
        )
        self.patch_embed = nn.Conv2d(1, embed_dim, kernel_size=patch_size, stride=patch_size)
        num_patches = (img_dim_h // patch_size) * (img_dim_w // patch_size)
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches, embed_dim))
        self.dropout = nn.Dropout(p=dropout_rate)

    def forward(self, x):
        patches = self.patch_embed(x).flatten(2).transpose(1, 2)
        x = patches + self.pos_embedding
        x = self.vit(x, x)
        x = self.dropout(x.mean(dim=1))
        return x


class SoftLabelEncoder(nn.Module):
    def __init__(self, aux_input_dim, embed_dim, dropout_rate=0.5):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(aux_input_dim, embed_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(embed_dim, embed_dim)
        )

    def forward(self, aux_data):
        return self.fc(aux_data)


class CrossAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)

    def forward(self, query, key):
        attn_output, _ = self.attention(query, key, key)
        return attn_output


class ConditionClassifier(nn.Module):
    def __init__(self, img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth, aux_input_dim, num_classes, dropout_rate=0.5):
        super().__init__()
        self.vit = ViTFeatureExtractor(img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth, dropout_rate)
        self.soft_label_encoder = SoftLabelEncoder(aux_input_dim, embed_dim, dropout_rate)
        self.cross_attention = CrossAttention(embed_dim, num_heads)
        self.classifier = nn.Sequential(
            nn.Linear(embed_dim, embed_dim),
            nn.ReLU(),
            nn.Dropout(p=dropout_rate),
            nn.Linear(embed_dim, num_classes)
        )

    def forward(self, images, aux_data):
        visual_features = self.vit(images)
        aux_features = self.soft_label_encoder(aux_data)
        visual_features = visual_features.unsqueeze(1)
        aux_features = aux_features.unsqueeze(1)
        integrated_features = self.cross_attention(visual_features, aux_features).squeeze(1)
        return self.classifier(integrated_features)


# 4. 학습 함수
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler=None, num_epochs=30, patience=5):
    best_val_loss = float('inf')
    patience_counter = 0

    for epoch in range(num_epochs):
        model.train()
        start_time = time.time()

        # Training
        train_loss = 0.0
        correct_train = 0
        total_train = 0
        for images, aux_data, labels in train_loader:
            images, aux_data, labels = images.to(device), aux_data.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images, aux_data)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total_train += labels.size(0)
            correct_train += (predicted == labels).sum().item()
        train_accuracy = 100 * correct_train / total_train

        # Validation
        model.eval()
        val_loss = 0.0
        correct_val = 0
        total_val = 0
        with torch.no_grad():
            for images, aux_data, labels in val_loader:
                images, aux_data, labels = images.to(device), aux_data.to(device), labels.to(device)
                outputs = model(images, aux_data)
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).sum().item()
        val_accuracy = 100 * correct_val / total_val

        end_time = time.time()

        # Early Stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(model.state_dict(), "OHT/best_model.pth")
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {epoch + 1}")
                break

        # Scheduler step
        if scheduler:
            scheduler.step()

        # Print epoch results
        print(f"Epoch {epoch + 1}: Train Loss = {train_loss / len(train_loader):.4f}, "
              f"Train Accuracy = {train_accuracy:.2f}%, "
              f"Val Loss = {val_loss / len(val_loader):.4f}, "
              f"Val Accuracy = {val_accuracy:.2f}%, "
              f"Time = {end_time - start_time:.2f}s")


# 5. 실행
if __name__ == "__main__":
    csv_path = "C:/Users/82103/Desktop/multimodal/OHT/oht_merged_output.csv"
    bin_root_folder = "C:/Users/82103/Desktop/multimodal/OHT"
    img_dim_h, img_dim_w = 120, 160
    #aux_input_dim = 9
    aux_input_dim = 12
    num_classes = 4
    batch_size = 32

    # 데이터 로더 생성
    train_loader, val_loader, test_loader = load_data(csv_path, bin_root_folder, img_dim_h, img_dim_w, batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # 모델 정의
    model = ConditionClassifier(
        img_dim_h, img_dim_w, patch_size=16, embed_dim=128, num_heads=4,
        depth=8, aux_input_dim=aux_input_dim, num_classes=num_classes, dropout_rate=0.2
    ).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=5e-4)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

    # 학습
    train_model(
        model, train_loader, val_loader, criterion, optimizer,
        scheduler, num_epochs=100, patience=8
    )

    # 테스트
    model.load_state_dict(torch.load("OHT/best_model.pth"))
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, aux_data, labels in test_loader:
            images, aux_data, labels = images.to(device), aux_data.to(device), labels.to(device)
            outputs = model(images, aux_data)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f"Test Accuracy: {100 * correct / total:.2f}%")


Using device: cuda
Epoch 1: Train Loss = 0.5292, Train Accuracy = 78.58%, Val Loss = 0.2562, Val Accuracy = 91.65%, Time = 118.94s
Epoch 2: Train Loss = 0.3289, Train Accuracy = 87.20%, Val Loss = 0.2776, Val Accuracy = 90.77%, Time = 120.03s
Epoch 3: Train Loss = 0.2876, Train Accuracy = 88.94%, Val Loss = 0.1820, Val Accuracy = 93.31%, Time = 118.85s
Epoch 4: Train Loss = 0.2576, Train Accuracy = 89.98%, Val Loss = 0.2089, Val Accuracy = 91.38%, Time = 118.90s
Epoch 5: Train Loss = 0.2387, Train Accuracy = 90.68%, Val Loss = 0.2021, Val Accuracy = 91.39%, Time = 119.09s
Epoch 6: Train Loss = 0.1936, Train Accuracy = 92.14%, Val Loss = 0.1677, Val Accuracy = 92.82%, Time = 119.41s
Epoch 7: Train Loss = 0.1888, Train Accuracy = 92.24%, Val Loss = 0.1544, Val Accuracy = 93.88%, Time = 119.03s
Epoch 8: Train Loss = 0.1847, Train Accuracy = 92.16%, Val Loss = 0.1564, Val Accuracy = 93.58%, Time = 118.94s
Epoch 9: Train Loss = 0.1834, Train Accuracy = 92.39%, Val Loss = 0.1532, Val Accurac

In [None]:
---------------------------------------------------------------------------------------------------------------------

In [None]:
AGV 멀티모달

In [14]:
import os
import torch
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim

# 1. 멀티모달 데이터셋 정의
class MultimodalDataset(Dataset):
    def __init__(self, csv_path, bin_root_folder, split_folder, img_dim_h, img_dim_w):
        """
        멀티모달 데이터셋 클래스
        :param csv_path: CSV 파일 경로
        :param bin_root_folder: BIN 파일들이 위치한 루트 폴더
        :param split_folder: 데이터 분할 폴더 ('train', 'val', 'test')
        :param img_dim_h: 이미지 높이
        :param img_dim_w: 이미지 너비
        """
        self.data = []
        self.missing_count = 0  # BIN 파일 누락 카운트
        self.img_dim_h = img_dim_h
        self.img_dim_w = img_dim_w

        # 모든 BIN 파일의 경로 수집 (하위 폴더 포함)
        bin_files = {}
        split_path = os.path.join(bin_root_folder, split_folder)
        for root, _, files in os.walk(split_path):
            for file in files:
                if file.endswith(".bin"):
                    bin_files[file] = os.path.join(root, file)

        # CSV 파일 읽기
        df = pd.read_csv(csv_path)

        # 필요한 features만 선택
        features = ["NTC", "PM10", "PM2.5", "PM1.0", "CT1", "CT2", "CT3", "CT4", "temp_max_value"]

        # 중복된 파일 처리 방지
        missing_files = set()
        for _, row in df.iterrows():
            bin_filename = row['bin_filename']  # CSV의 BIN 파일명

            if bin_filename in bin_files:
                bin_path = bin_files[bin_filename]

                # BIN 파일 로드
                try:
                    img_data = np.load(bin_path).reshape((img_dim_h, img_dim_w))
                except Exception as e:
                    print(f"[Error] BIN 파일 로드 실패: {bin_path}, {e}")
                    continue

                # 보조 데이터와 라벨 추출
                aux_data = row[features].values.astype(np.float32)
                label = int(row['state'])

                self.data.append((img_data, aux_data, label))
            else:
                missing_files.add(bin_filename)

        self.missing_count = len(missing_files)
        print(f"[INFO] {split_folder} set missing BIN files: {self.missing_count}")

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_data, aux_data, label = self.data[idx]

        # 이미지 데이터 텐서로 변환
        img_data = torch.tensor(img_data, dtype=torch.float32).unsqueeze(0)  # [1, img_dim_h, img_dim_w]
        aux_data = torch.tensor(aux_data, dtype=torch.float32)  # [aux_input_dim]
        label = torch.tensor(label, dtype=torch.long)  # 정답 라벨
        return img_data, aux_data, label


# 2. 데이터 로더 함수
def load_data(csv_path, bin_root_folder, img_dim_h, img_dim_w, batch_size=32):
    """
    데이터 로더 생성
    :param csv_path: CSV 파일 경로
    :param bin_root_folder: BIN 파일들이 위치한 루트 폴더
    :param img_dim_h: 이미지 높이
    :param img_dim_w: 이미지 너비
    :param batch_size: 배치 크기
    """
    train_dataset = MultimodalDataset(csv_path, bin_root_folder, 'train', img_dim_h, img_dim_w)
    val_dataset = MultimodalDataset(csv_path, bin_root_folder, 'val', img_dim_h, img_dim_w)
    test_dataset = MultimodalDataset(csv_path, bin_root_folder, 'test', img_dim_h, img_dim_w)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader


# 3. 모델 정의
class ViTFeatureExtractor(nn.Module):
    def __init__(self, img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth):
        super().__init__()
        self.vit = nn.Transformer(
            d_model=embed_dim,
            nhead=num_heads,
            num_encoder_layers=depth,
            batch_first=True
        )
        self.patch_embed = nn.Conv2d(1, embed_dim, kernel_size=patch_size, stride=patch_size)
        num_patches = (img_dim_h // patch_size) * (img_dim_w // patch_size)
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches, embed_dim))

    def forward(self, x):
        patches = self.patch_embed(x).flatten(2).transpose(1, 2)
        x = patches + self.pos_embedding
        x = self.vit(x, x)
        return x.mean(dim=1)


class SoftLabelEncoder(nn.Module):
    def __init__(self, aux_input_dim, embed_dim):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(aux_input_dim, embed_dim),
            nn.ReLU(),
            nn.Linear(embed_dim, embed_dim)
        )

    def forward(self, aux_data):
        return self.fc(aux_data)


class CrossAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)

    def forward(self, query, key):
        attn_output, _ = self.attention(query, key, key)
        return attn_output


class ConditionClassifier(nn.Module):
    def __init__(self, img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth, aux_input_dim, num_classes):
        super().__init__()
        self.vit = ViTFeatureExtractor(img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth)
        self.soft_label_encoder = SoftLabelEncoder(aux_input_dim, embed_dim)
        self.cross_attention = CrossAttention(embed_dim, num_heads)
        self.classifier = nn.Sequential(
            nn.Linear(embed_dim, embed_dim),
            nn.ReLU(),
            nn.Linear(embed_dim, num_classes)
        )

    def forward(self, images, aux_data):
        visual_features = self.vit(images)
        aux_features = self.soft_label_encoder(aux_data)

        visual_features = visual_features.unsqueeze(1)
        aux_features = aux_features.unsqueeze(1)
        integrated_features = self.cross_attention(visual_features, aux_features).squeeze(1)

        return self.classifier(integrated_features)


# 4. 실행
if __name__ == "__main__":
    csv_path = "C:/Users/82103/Desktop/multimodal/AGV/agv_merged_output.csv"
    bin_root_folder = "C:/Users/82103/Desktop/multimodal/AGV"
    img_dim_h, img_dim_w = 120, 160
    aux_input_dim = 9  # 선택된 features의 개수
    num_classes = 4
    batch_size = 32

    # 데이터 로더 생성
    train_loader, val_loader, test_loader = load_data(csv_path, bin_root_folder, img_dim_h, img_dim_w, batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # 모델 정의
    model = ConditionClassifier(
        img_dim_h, img_dim_w, patch_size=16, embed_dim=128, num_heads=4,
        depth=6, aux_input_dim=aux_input_dim, num_classes=num_classes
    ).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=5e-4)

    # 학습
    for epoch in range(10):
        model.train()
        for images, aux_data, labels in train_loader:
            images, aux_data, labels = images.to(device), aux_data.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images, aux_data)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        print(f"Epoch {epoch + 1}: Loss = {loss.item():.4f}")

    # 테스트
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, aux_data, labels in test_loader:
            images, aux_data, labels = images.to(device), aux_data.to(device), labels.to(device)
            outputs = model(images, aux_data)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f"Test Accuracy: {100 * correct / total:.2f}%")


[INFO] train set missing BIN files: 10104
[INFO] val set missing BIN files: 40427
[INFO] test set missing BIN files: 40427
Epoch 1: Loss = 0.1366
Epoch 2: Loss = 0.1603
Epoch 3: Loss = 0.1558
Epoch 4: Loss = 0.1274
Epoch 5: Loss = 0.1327
Epoch 6: Loss = 0.2687
Epoch 7: Loss = 0.1072
Epoch 8: Loss = 0.1310
Epoch 9: Loss = 0.1591
Epoch 10: Loss = 0.2064
Test Accuracy: 94.64%


In [None]:
OHT 멀티모달

In [5]:
import os
import torch
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim

# 1. 멀티모달 데이터셋 정의
class MultimodalDataset(Dataset):
    def __init__(self, csv_path, bin_root_folder, split_folder, img_dim_h, img_dim_w):
        self.data = []
        self.missing_count = 0
        self.img_dim_h = img_dim_h
        self.img_dim_w = img_dim_w

        # BIN 파일 경로 수집
        bin_files = {}
        split_path = os.path.join(bin_root_folder, split_folder)
        for root, _, files in os.walk(split_path):
            for file in files:
                if file.endswith(".bin"):
                    bin_files[file] = os.path.join(root, file)

        # CSV 읽기
        df = pd.read_csv(csv_path)
        features = ["NTC", "PM10", "PM2.5", "PM1.0", "CT1", "CT2", "CT3", "CT4", "temp_max_value"]

        # 매칭되지 않는 파일 확인
        missing_files = set()
        unmatched_files = []
        for _, row in df.iterrows():
            bin_filename = row['bin_filename']
            if bin_filename in bin_files:
                try:
                    img_data = np.load(bin_files[bin_filename]).reshape((img_dim_h, img_dim_w))
                except Exception as e:
                    print(f"[Error] BIN 파일 로드 실패: {bin_files[bin_filename]}, {e}")
                    continue
                aux_data = row[features].values.astype(np.float32)
                label = int(row['state'])
                self.data.append((img_data, aux_data, label))
            else:
                missing_files.add(bin_filename)
                unmatched_files.append(bin_filename)

        # 누락된 파일 정보 출력
        self.missing_count = len(missing_files)
        print(f"[INFO] {split_folder} set missing BIN files: {self.missing_count}")
        print(f"누락된 파일 목록(예시): {list(missing_files)[:10]}")
        print(f"CSV에 있지만 BIN 파일에 없는 파일(예시): {unmatched_files[:10]}")
        print(f"매칭되지 않는 파일 수: {len(unmatched_files)}")


    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        img_data, aux_data, label = self.data[idx]

        # 이미지 데이터 텐서로 변환
        img_data = torch.tensor(img_data, dtype=torch.float32).unsqueeze(0)  # [1, img_dim_h, img_dim_w]
        aux_data = torch.tensor(aux_data, dtype=torch.float32)  # [aux_input_dim]
        label = torch.tensor(label, dtype=torch.long)  # 정답 라벨
        return img_data, aux_data, label


# 2. 데이터 로더 함수
def load_data(csv_path, bin_root_folder, img_dim_h, img_dim_w, batch_size=32):
    """
    데이터 로더 생성
    :param csv_path: CSV 파일 경로
    :param bin_root_folder: BIN 파일들이 위치한 루트 폴더
    :param img_dim_h: 이미지 높이
    :param img_dim_w: 이미지 너비
    :param batch_size: 배치 크기
    """
    train_dataset = MultimodalDataset(csv_path, bin_root_folder, 'train', img_dim_h, img_dim_w)
    val_dataset = MultimodalDataset(csv_path, bin_root_folder, 'val', img_dim_h, img_dim_w)
    test_dataset = MultimodalDataset(csv_path, bin_root_folder, 'test', img_dim_h, img_dim_w)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, val_loader, test_loader


# 3. 모델 정의
class ViTFeatureExtractor(nn.Module):
    def __init__(self, img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth):
        super().__init__()
        self.vit = nn.Transformer(
            d_model=embed_dim,
            nhead=num_heads,
            num_encoder_layers=depth,
            batch_first=True
        )
        self.patch_embed = nn.Conv2d(1, embed_dim, kernel_size=patch_size, stride=patch_size)
        num_patches = (img_dim_h // patch_size) * (img_dim_w // patch_size)
        self.pos_embedding = nn.Parameter(torch.randn(1, num_patches, embed_dim))

    def forward(self, x):
        patches = self.patch_embed(x).flatten(2).transpose(1, 2)
        x = patches + self.pos_embedding
        x = self.vit(x, x)
        return x.mean(dim=1)


class SoftLabelEncoder(nn.Module):
    def __init__(self, aux_input_dim, embed_dim):
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(aux_input_dim, embed_dim),
            nn.ReLU(),
            nn.Linear(embed_dim, embed_dim)
        )

    def forward(self, aux_data):
        return self.fc(aux_data)


class CrossAttention(nn.Module):
    def __init__(self, embed_dim, num_heads):
        super().__init__()
        self.attention = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)

    def forward(self, query, key):
        attn_output, _ = self.attention(query, key, key)
        return attn_output


class ConditionClassifier(nn.Module):
    def __init__(self, img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth, aux_input_dim, num_classes):
        super().__init__()
        self.vit = ViTFeatureExtractor(img_dim_h, img_dim_w, patch_size, embed_dim, num_heads, depth)
        self.soft_label_encoder = SoftLabelEncoder(aux_input_dim, embed_dim)
        self.cross_attention = CrossAttention(embed_dim, num_heads)
        self.classifier = nn.Sequential(
            nn.Linear(embed_dim, embed_dim),
            nn.ReLU(),
            nn.Linear(embed_dim, num_classes)
        )

    def forward(self, images, aux_data):
        visual_features = self.vit(images)
        aux_features = self.soft_label_encoder(aux_data)

        visual_features = visual_features.unsqueeze(1)
        aux_features = aux_features.unsqueeze(1)
        integrated_features = self.cross_attention(visual_features, aux_features).squeeze(1)

        return self.classifier(integrated_features)


# 4. 실행
if __name__ == "__main__":
    csv_path = "C:/Users/82103/Desktop/multimodal/OHT/oht_merged_output.csv"
    bin_root_folder = "C:/Users/82103/Desktop/multimodal/OHT"
    img_dim_h, img_dim_w = 120, 160
    aux_input_dim = 9  # 선택된 features의 개수
    num_classes = 4
    batch_size = 32

    # 데이터 로더 생성
    train_loader, val_loader, test_loader = load_data(csv_path, bin_root_folder, img_dim_h, img_dim_w, batch_size)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # 모델 정의
    model = ConditionClassifier(
        img_dim_h, img_dim_w, patch_size=16, embed_dim=128, num_heads=4,
        depth=6, aux_input_dim=aux_input_dim, num_classes=num_classes
    ).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=5e-4)

    # 학습
    for epoch in range(10):
        model.train()
        for images, aux_data, labels in train_loader:
            images, aux_data, labels = images.to(device), aux_data.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(images, aux_data)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        print(f"Epoch {epoch + 1}: Loss = {loss.item():.4f}")

    # 테스트
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, aux_data, labels in test_loader:
            images, aux_data, labels = images.to(device), aux_data.to(device), labels.to(device)
            outputs = model(images, aux_data)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f"Test Accuracy: {100 * correct / total:.2f}%")


[INFO] train set missing BIN files: 14684
누락된 파일 목록(예시): ['oht18_0827_045605.bin', 'oht17_0826_184833.bin', 'oht18_0827_025950.bin', 'oht18_0827_045446.bin', 'oht15_0920_091137.bin', 'oht16_0827_021214.bin', 'oht15_0826_212953.bin', 'oht18_0902_130927.bin', 'oht17_0827_044806.bin', 'oht16_0826_183226.bin']
CSV에 있지만 BIN 파일에 없는 파일(예시): ['oht15_0826_142907.bin', 'oht15_0826_142908.bin', 'oht15_0826_142909.bin', 'oht15_0826_142910.bin', 'oht15_0826_142911.bin', 'oht15_0826_142912.bin', 'oht15_0826_142913.bin', 'oht15_0826_142914.bin', 'oht15_0826_142915.bin', 'oht15_0826_142916.bin']
매칭되지 않는 파일 수: 14684
[INFO] val set missing BIN files: 59049
누락된 파일 목록(예시): ['oht02_0920_205947.bin', 'oht02_0920_162451.bin', 'oht10_0902_140617.bin', 'oht12_0826_162714.bin', 'oht09_0902_123137.bin', 'oht04_0920_211910.bin', 'oht13_0826_140705.bin', 'oht11_0920_072143.bin', 'oht12_0826_135518.bin', 'oht06_0827_024143.bin']
CSV에 있지만 BIN 파일에 없는 파일(예시): ['oht01_0826_203735.bin', 'oht01_0826_203736.bin', 'oht01_0