In [15]:
from utils.utils import train_model, base_transform, evaluate_model_with_cm, TiffDataset
import torch
from torch.utils.data import DataLoader
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import torchvision.models as models
import numpy as np
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, classification_report
import copy
import torch.nn.functional as F

In [18]:
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [3]:
import pandas as pd

# Read the label tables for the two splits (sampled training labels, test labels).
df_train, df_test = (
    pd.read_csv(csv_path)
    for csv_path in (
        "../../data/label_data/species/label_mapping_sampled.csv",
        "../../data/label_data/species/label_mapping_test.csv",
    )
)

In [6]:
# Tag each frame (in place) with the split it belongs to, then stack both into
# one table. The original row indices are kept, so index labels may repeat.
df_train['type'], df_test['type'] = 'train', 'test'

df = pd.concat(objs=[df_train, df_test])

In [None]:
class ResNetClassifier(nn.Module):
    """ResNet-18 based classifier adapted to small multi-band image patches.

    The torchvision ResNet-18 is created without pretrained weights and then
    modified: the stem convolution accepts ``in_channels`` bands with stride 1,
    the stem max-pool is replaced by an identity, and the final fully connected
    layer emits ``num_classes`` logits.

    Args:
        in_channels: Number of input bands/channels.
        num_classes: Number of output classes.
        kernel_size: Kernel size of the replaced stem convolution. Padding stays
            fixed at 1, so only ``kernel_size=3`` preserves the spatial size.
        transform: Optional object stored as ``self.transform``. Nothing in this
            class uses it. It used to be read from a notebook-level ``transform``
            global, which raised NameError whenever that global did not exist
            yet (e.g. after Restart & Run All).
    """

    def __init__(self, in_channels, num_classes=6, kernel_size=3, transform=None):
        super().__init__()
        self.transform = transform
        # ResNet-18 backbone, randomly initialised (no pretrained weights).
        self.resnet = models.resnet18(weights=None)
        # Replace the stem so it takes `in_channels` bands and does not stride.
        self.resnet.conv1 = nn.Conv2d(in_channels, 64, kernel_size=kernel_size, stride=1, padding=1, bias=False)
        # Max-pooling is removed because the input patches are tiny
        # (the original note mentions 5x5 inputs).
        self.resnet.maxpool = nn.Identity()
        # Resize the classification head to the requested number of classes.
        self.resnet.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return the class logits produced by the modified ResNet for ``x``."""
        return self.resnet(x)

In [None]:
class FeedForwardBlock(nn.Module):
    """Two-layer MLP: ``dim -> dim * expand_ratio -> dim`` with ReLU and dropout in between."""

    def __init__(self, dim, expand_ratio=4, dropout=0.1):
        super().__init__()
        widened = dim * expand_ratio
        self.fc1 = nn.Linear(in_features=dim, out_features=widened)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=dropout)
        self.fc2 = nn.Linear(in_features=widened, out_features=dim)

    def forward(self, x):
        """Expand, activate, apply dropout, then project back to ``dim`` features."""
        return self.fc2(self.dropout(self.relu(self.fc1(x))))

class ResidualBlock(nn.Module):
    """Pre-norm residual block: returns ``x + FeedForwardBlock(LayerNorm(x))``."""

    def __init__(self, dim, expand_ratio=4, dropout=0.1):
        super().__init__()
        self.norm = nn.LayerNorm(dim)
        self.ffn = FeedForwardBlock(dim, expand_ratio, dropout)

    def forward(self, x):
        """Normalise, run the feed-forward branch, and add the skip connection."""
        branch = self.ffn(self.norm(x))
        return x + branch

class DNNClassifier(nn.Module):
    """Residual MLP classifier.

    A linear projection to ``hidden_dim`` followed by ReLU, then ``num_layers``
    ``ResidualBlock`` modules, then a linear head producing ``output_dim`` logits.
    """

    def __init__(self, input_dim=10, hidden_dim=128, output_dim=6, num_layers=10, expand_ratio=4, dropout=0.1):
        super().__init__()
        self.input_layer = nn.Linear(input_dim, hidden_dim)
        self.relu = nn.ReLU()

        blocks = []
        for _ in range(num_layers):
            blocks.append(ResidualBlock(hidden_dim, expand_ratio, dropout))
        self.layers = nn.Sequential(*blocks)

        self.output_layer = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Map input features to class logits."""
        hidden = self.relu(self.input_layer(x))
        return self.output_layer(self.layers(hidden))


In [None]:
class TransformerClassifier(nn.Module):
    """Sequence classifier built on a Transformer encoder.

    Each time step is linearly embedded from ``input_dim`` to ``embed_dim``, the
    sequence goes through ``num_layers`` encoder layers (``batch_first=True``),
    and the encoder output at the last sequence position is fed to a linear
    head producing ``num_classes`` logits.

    Notes:
        * ``seq_len`` is accepted but not used anywhere in this class.
        * No positional encoding is added to the embedded sequence.
        * ``self.encoder_layer`` stays registered as a sub-module in addition to
          the layers held by ``self.transformer_encoder``, so both appear in
          ``state_dict()``.
    """

    def __init__(self, input_dim=10, embed_dim=16, num_heads=2, num_layers=2, num_classes=6, seq_len=12, dropout=0.1):
        super().__init__()

        # Per-time-step embedding: input_dim -> embed_dim.
        self.input_fc = nn.Linear(input_dim, embed_dim)

        # Template encoder layer; batch_first=True means inputs are (batch, seq, feature).
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dropout=dropout,
            batch_first=True,
        )

        # Stack of `num_layers` encoder layers.
        self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer, num_layers=num_layers)

        # Classification head applied to the last position's representation.
        self.fc = nn.Linear(embed_dim, num_classes)

        # Weight initialisation.
        self._init_weights()

    def forward(self, x):
        """Return class logits for ``x``, indexed as ``(batch, seq_len, input_dim)``."""
        encoded = self.transformer_encoder(self.input_fc(x))
        # (batch, seq_len, embed_dim) -> (batch, embed_dim): keep the last position only.
        return self.fc(encoded[:, -1, :])

    def _init_weights(self):
        """Xavier-uniform every linear weight and zero every linear bias.

        Encoder layers are visited before their children, so their
        ``linear1``/``linear2`` are initialised here and then again when
        reached as plain ``nn.Linear`` modules; the traversal order is kept
        so seeded runs draw the same random numbers as before.
        """
        for module in self.modules():
            if isinstance(module, nn.Linear):
                linears = (module,)
            elif isinstance(module, nn.TransformerEncoderLayer):
                linears = (module.linear1, module.linear2)
            else:
                linears = ()

            for linear in linears:
                nn.init.xavier_uniform_(linear.weight)
                if linear.bias is not None:
                    nn.init.zeros_(linear.bias)


In [None]:
import torch
import torch.nn.functional as F
import math

def get_gaussian_kernel(kernel_size, sigma):
    """Build a 2D Gaussian kernel of shape ``(kernel_size, kernel_size)``.

    The kernel is centred on the grid midpoint, stored as float32, and
    normalised so that its entries sum to 1.
    """
    offsets = torch.arange(kernel_size, dtype=torch.float32) - (kernel_size - 1) / 2
    squared = offsets ** 2
    # Squared distance from the centre for every (row, col) pair via broadcasting.
    squared_distance = squared.unsqueeze(1) + squared.unsqueeze(0)
    weights = torch.exp(-squared_distance / (2 * sigma**2))
    return weights / weights.sum()

class DownsampleWithGaussian:
    """Callable transform: Gaussian-weighted downsampling by a factor of ``kernel_size``.

    Every non-overlapping ``kernel_size x kernel_size`` window of the last two
    axes is reduced to a single value using a normalised Gaussian kernel
    (a convolution whose stride equals the kernel size, with no padding).
    """

    def __init__(self, kernel_size: int, sigma: float = None):
        """
        Args:
            kernel_size: Side length of the Gaussian window, which is also the
                downsampling factor.
            sigma: Standard deviation of the Gaussian. When ``None`` it is
                derived from ``kernel_size`` with the rule
                ``0.3 * ((kernel_size - 1) * 0.5 - 1) + 0.8`` (the same form
                OpenCV uses for its automatic sigma).
        """
        self.kernel_size = kernel_size
        if sigma is None:
            sigma = 0.3 * ((kernel_size - 1) * 0.5 - 1) + 0.8
        self.sigma = sigma

        # Pre-compute the kernel once, shaped (1, 1, K, K) as conv2d expects for
        # a single-input, single-output channel filter.
        kernel = get_gaussian_kernel(kernel_size, sigma)
        self.kernel = kernel.view(1, 1, kernel_size, kernel_size)

    def __call__(self, x):
        """Downsample a 4-D tensor along its last two axes.

        Args:
            x: Tensor of shape ``(B, T, H, W)``. The two leading axes are
                flattened together, so each ``(H, W)`` plane is filtered
                independently.

        Returns:
            Tensor of shape ``(B, T, H // kernel_size, W // kernel_size)``.

        Raises:
            ValueError: If ``H`` or ``W`` is not divisible by ``kernel_size``.
        """
        B, T, H, W = x.shape

        if H % self.kernel_size != 0 or W % self.kernel_size != 0:
            raise ValueError(f"Input size ({H}, {W}) must be divisible by kernel_size {self.kernel_size}")

        kernel = self.kernel.to(device=x.device)
        # The stored kernel is float32. Follow the input's floating dtype so that
        # e.g. float64 inputs no longer fail in conv2d with a dtype mismatch.
        # Non-floating inputs are left alone (casting the kernel to an integer
        # dtype would zero it out), so they still raise as before.
        if x.is_floating_point():
            kernel = kernel.to(dtype=x.dtype)

        # Treat every (H, W) plane as a one-channel image: (B*T, 1, H, W).
        planes = x.reshape(B * T, 1, H, W)
        blurred = F.conv2d(planes, kernel, stride=self.kernel_size, padding=0)  # (B*T, 1, H//K, W//K)

        return blurred.view(B, T, H // self.kernel_size, W // self.kernel_size)



In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None, desc=""):
    """Run one pass over ``loader`` and return ``(mean_loss, accuracy, macro_f1)``.

    Trains (backward pass + optimizer step) when ``optimizer`` is given;
    otherwise the model is put in eval mode and gradients are disabled.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    sample_count = 0
    num_classes = None
    all_labels = []
    all_predictions = []

    with torch.set_grad_enabled(training):
        for images, labels in tqdm(loader, desc=desc):
            images, labels = images.to(device), labels.to(device)

            if training:
                optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            # Weight each batch loss by its size so the epoch loss is a per-sample mean.
            batch_size = labels.size(0)
            loss_sum += loss.item() * batch_size
            sample_count += batch_size
            num_classes = outputs.shape[1]

            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(outputs.argmax(dim=1).cpu().numpy())

    if sample_count == 0:
        raise ValueError("train_model received a loader that yields no samples")

    # Pass the full label set so classes missing from this epoch still count
    # towards the macro average.
    report = classification_report(
        all_labels, all_predictions, labels=np.arange(num_classes), output_dict=True
    )
    # Accuracy is computed directly rather than read from the report, so it does
    # not depend on the report exposing an 'accuracy' entry.
    accuracy = float(np.mean(np.asarray(all_labels) == np.asarray(all_predictions)))
    return loss_sum / sample_count, accuracy, report["macro avg"]["f1-score"]


def train_model(model, train_loader, criterion, optimizer, num_epochs, scheduler=None,
                val_loader=None, patience=None):
    """Train ``model`` and return ``(best_model_state, train_losses, val_losses)``.

    Args:
        model: Network to train. Batches are moved to the device of its parameters.
        train_loader: Iterable of ``(images, labels)`` training batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer updating ``model``'s parameters.
        num_epochs: Maximum number of epochs.
        scheduler: Optional LR scheduler. ``ReduceLROnPlateau`` is stepped with
            the validation loss when ``val_loader`` is given, otherwise with the
            training loss; any other scheduler is stepped without arguments.
        val_loader: Optional iterable of validation batches. When given, the
            returned state is the one with the best validation macro F1 and
            early stopping is enabled.
        patience: Number of consecutive epochs without validation macro-F1
            improvement tolerated before stopping. ``None`` means ``num_epochs``
            (never stop early). Ignored without ``val_loader``.

    Returns:
        best_model_state: Deep-copied ``state_dict``. Without ``val_loader`` this
            is the state after the last epoch.
        train_losses: Per-epoch mean training loss.
        val_losses: Per-epoch mean validation loss (empty without ``val_loader``).

    Raises:
        ValueError: If a loader yields no samples.
    """
    if patience is None:
        patience = num_epochs

    # Use the model's own device instead of relying on a notebook-level global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    train_losses = []
    val_losses = []
    best_val_f1_score = float("-inf")
    best_model_state = None
    no_improve_count = 0

    for epoch in range(num_epochs):
        train_loss, train_acc, train_f1 = _run_epoch(
            model, train_loader, criterion, device, optimizer=optimizer,
            desc=f"Epoch {epoch+1}/{num_epochs} - Training",
        )
        train_losses.append(train_loss)

        print(f"\nEpoch [{epoch+1}/{num_epochs}], "
              f"Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.2f}, Train f1-score: {train_f1:.2f} ")

        monitored_loss = train_loss
        if val_loader is not None:
            val_loss, val_acc, val_f1 = _run_epoch(
                model, val_loader, criterion, device,
                desc=f"Epoch {epoch+1}/{num_epochs} - Validation",
            )
            val_losses.append(val_loss)
            monitored_loss = val_loss

            print(f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.2f}, Val f1-score: {val_f1:.2f} ")

            if val_f1 > best_val_f1_score:
                best_val_f1_score = val_f1
                # Deep copy: state_dict() returns references to the live tensors,
                # which later optimizer steps would otherwise overwrite.
                best_model_state = copy.deepcopy(model.state_dict())
                no_improve_count = 0
            else:
                no_improve_count += 1

        if scheduler is not None:
            if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
                scheduler.step(monitored_loss)
            else:
                scheduler.step()

        if val_loader is not None and no_improve_count >= patience:
            print(f"Early stopping at epoch {epoch+1}: no validation f1-score improvement for {patience} epoch(s)")
            break

    if best_model_state is None:
        # No validation-based selection happened: return a snapshot of the final weights.
        best_model_state = copy.deepcopy(model.state_dict())

    return best_model_state, train_losses, val_losses

In [7]:
# Display the combined label table (the notebook renders a truncated preview).
df

Unnamed: 0,label,x_pos,y_pos,file,box_number,tree_species,type
0,4,1060,1063,jiri_1.tif,23,QM,train
1,4,463,997,jiri_1.tif,22,QM,train
2,4,889,1147,jiri_1.tif,33,QM,train
3,4,652,298,jiri_1.tif,2,QM,train
4,4,388,634,jiri_1.tif,12,QM,train
...,...,...,...,...,...,...,...
14394,5,1078,568,sobaek.tif,13,QV,test
14395,5,1126,505,sobaek.tif,14,QV,test
14396,5,190,430,sobaek.tif,11,QV,test
14397,5,3535,751,sobaek.tif,30,QV,test


In [9]:
# Write the combined label table to disk; the DataFrame index is not written (index=False).
df.to_csv("../../data/label_data/species/label_mapping_concated.csv", encoding='utf-8-sig', index=False)

In [11]:
# Sanity check on row counts: combined, train, test, and train + test.
# The first and last values should be equal if no rows were lost in the concat.
len(df),len(df_train),len(df_test), len(df_train) + len(df_test)

(242500, 228101, 14399, 242500)

In [12]:
# Run configuration shared by the cells below.
num_epochs = 50
patience = None  # passed to train_model by the training cells
bands = 10
patch_size = 9
time_idx = 4  # early May (per the original note)
large_tif_dir = '../../data/source_data/with_s2'  # directory holding the source imagery
# Directory for saved model weights. The training cells join checkpoint filenames
# onto this name, which was never defined; the value matches the directory the
# ResNet checkpoint is loaded from.
checkpoints_dir = './checkpoints/spectral'

In [13]:
# Kernel size handed to DownsampleWithGaussian below; it is also embedded in the
# ResNet checkpoint filename.
kernel_size = 9

In [17]:
# Build the training and validation datasets from the same label file; the split is
# decided per box number: boxes accepted by `val_filter` go to validation, all others
# to training.
# NOTE(review): `val_filter` is not defined in any visible cell (the recorded output of
# this cell is a NameError), and `transform` is only assigned in a cell further down.
# Both must exist before this cell can run on a fresh kernel.
train_dataset = TiffDataset(
            large_tif_dir = large_tif_dir,
            file_list = ["jiri_1.tif", "jiri_2.tif", "sobaek.tif"], # use every region
            label_file = "../../data/label_data/species/label_mapping_sampled.csv",
            box_filter_fn = lambda box_number: not val_filter(box_number),
            patch_size = patch_size,
            transform=transform
        )

val_dataset = TiffDataset(
            large_tif_dir = large_tif_dir,
            file_list = ["jiri_1.tif", "jiri_2.tif", "sobaek.tif"], # use every region
            label_file ="../../data/label_data/species/label_mapping_sampled.csv",
            box_filter_fn = val_filter,
            patch_size = patch_size,
            transform=transform
        )

# Only the training loader shuffles; validation keeps dataset order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

NameError: name 'val_filter' is not defined

In [16]:
# Transform for the ResNet run: apply the project's base transform, then keep only
# index `time_idx` along the second axis.
# NOTE(review): presumably the base transform yields (bands, time, H, W), making this a
# single-date image; confirm against utils.base_transform.
# The lambda reads `time_idx` when it is called, so later changes to that global take effect.
transform = transforms.Compose([
    base_transform(bands, patch_size),
    transforms.Lambda(lambda x: x[:,time_idx])
])

# Swap the transform on the already-built datasets (requires the dataset cell to have run).
train_dataset.transform = transform
val_dataset.transform = transform

NameError: name 'train_dataset' is not defined

In [None]:
model = ResNetClassifier(in_channels=10).to(device)
# Resume from the saved weights. map_location remaps the stored tensors onto the
# active device, so a checkpoint written on a GPU machine also loads on a CPU-only one.
model.load_state_dict(
    torch.load(
        f"./checkpoints/spectral/resnet_{bands}_{patch_size}_{kernel_size}_{num_epochs}.pth",
        map_location=device,
    )
)
# 🔹 Loss function (categorical classification)
criterion = nn.CrossEntropyLoss()

# 🔹 Optimizer (AdamW with weight decay)
optimizer = optim.AdamW(model.parameters(), lr=1e-3, weight_decay=1e-4)

# # 🔹 Learning rate scheduler (StepLR: multiply lr by 0.1 every 10 epochs)
# scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

In [None]:
# 전체 파라미터 수 (학습 가능한 파라미터와 학습 불가능한 파라미터 포함)
total_params = sum(p.numel() for p in model.parameters())

# 학습 가능한 파라미터 수 (requires_grad == True인 파라미터)
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print("전체 파라미터 수:", total_params)
print("학습 가능한 파라미터 수:", trainable_params)

In [None]:
# train_model as defined in this notebook takes
# (model, train_loader, criterion, optimizer, num_epochs, scheduler=None).
# The previous call passed val_loader positionally and a `patience=` keyword, which
# that signature rejects with a TypeError. Validation data is still evaluated below.
best_model_state, train_losses, val_losses = train_model(model, train_loader, criterion, optimizer, num_epochs)
torch.save(best_model_state, os.path.join(checkpoints_dir, f"resnet_{bands}_{patch_size}_{kernel_size}_{num_epochs}.pth"))
model.load_state_dict(best_model_state)

# Report confusion matrices on both splits with the returned weights loaded.
print("\ntrain data")
evaluate_model_with_cm(model, train_loader, num_classes=6)
print("\nvalidation data")
evaluate_model_with_cm(model, val_loader, num_classes=6)

In [None]:
# ✅ 메모리 정리
del model
torch.cuda.empty_cache()

In [None]:
def reshape_transform(x):
    """Drop the two trailing singleton axes and swap the remaining two.

    Example: ``(10, 12, 1, 1) -> (10, 12) -> (12, 10)``.
    """
    flattened = x.squeeze(-1).squeeze(-1)
    return flattened.permute(1, 0)

# Transform for the Transformer run: base transform, Gaussian downsampling by
# `kernel_size`, then reshape_transform to drop the trailing singleton axes and
# swap the two remaining ones.
# NOTE(review): with patch_size == kernel_size == 9 each patch should collapse to 1x1,
# assuming the base transform yields (bands, time, patch_size, patch_size); confirm.
transform = transforms.Compose([
    base_transform(bands, patch_size),
    DownsampleWithGaussian(kernel_size, sigma=3),
    transforms.Lambda(reshape_transform)
])

# Reuse the existing datasets (and the loaders wrapping them) with the new transform.
train_dataset.transform = transform
val_dataset.transform = transform

In [None]:
# Fresh Transformer classifier on the active device (seq_len is accepted by the
# constructor but not used inside the class).
model =  TransformerClassifier(input_dim=10, embed_dim=16, num_heads=2, num_layers=2, num_classes=6, seq_len=12).to(device)

# 🔹 Loss function (categorical classification)
criterion = nn.CrossEntropyLoss()

# Optimizer setup
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-4)

In [None]:
# train_model as defined in this notebook takes
# (model, train_loader, criterion, optimizer, num_epochs, scheduler=None).
# The previous call passed val_loader positionally and a `patience=` keyword, which
# that signature rejects with a TypeError. Validation data is still evaluated below.
best_model_state, train_losses, val_losses = train_model(model, train_loader, criterion, optimizer, num_epochs)
torch.save(best_model_state, os.path.join(checkpoints_dir, f"transformer_{bands}_{patch_size}_{num_epochs}.pth"))
model.load_state_dict(best_model_state)

# Report confusion matrices on both splits with the returned weights loaded.
print("\ntrain data")
evaluate_model_with_cm(model, train_loader, num_classes=6)
print("\nvalidation data")
evaluate_model_with_cm(model, val_loader, num_classes=6)