In [None]:
import os
import random
import timm
import torch
import albumentations as A
import pandas as pd
import numpy as np
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset

import cv2
import easyocr
from albumentations.pytorch import ToTensorV2
from torch.optim import Adam,NAdam
from PIL import Image
from tqdm import tqdm
from sklearn.metrics import accuracy_score, f1_score
from torch.amp import GradScaler,autocast
from torch.optim.lr_scheduler import CosineAnnealingLR
from sklearn.model_selection import StratifiedKFold
import pickle
import augraphy
from augraphy import AugraphyPipeline, NoiseTexturize,DirtyDrum,InkBleed,LightingGradient,SubtleNoise,BleedThrough, BadPhotoCopy


import torch.nn.functional as F
from torchvision import datasets, transforms
from timm import create_model

In [None]:
# 시드를 고정합니다.
SEED = 2024
os.environ['PYTHONHASHSEED'] = str(SEED)
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.benchmark = True

In [None]:
# device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# data config
data_path = '../data/'
# model config
model_name = 'efficientnet_b3' # 'resnet50' 'efficientnet-b0', ...
# training config
img_size = 300
LR = 1e-4
EPOCHS = 20
BATCH_SIZE = 64
num_workers = 64

In [None]:
class ImageDataset(Dataset):
    def __init__(self, csv, path, pipeline = None, transform=None, is_train = True):
       # CSV 파일에서 데이터 로드
        self.df = pd.read_csv(csv)
        self.df = self._fix_train_dataframe(self.df).values
        self.path = path
        self.pipeline = pipeline
        self.transform = transform
        self.is_train = is_train
        self.targets = self._fix_train_dataframe(pd.read_csv(csv))['target'].values
        
    
    def _fix_train_dataframe(self, df):
        train_df = df
        train_df.loc[train_df['ID'] == '45f0d2dfc7e47c03.jpg', 'target'] = 7  #from 3
        train_df.loc[train_df['ID'] == 'aec62dced7af97cd.jpg', 'target'] = 14 #from 3
        train_df.loc[train_df['ID'] == '8646f2c3280a4f49.jpg', 'target'] = 3  #from 7
        train_df.loc[train_df['ID'] == '1ec14a14bbe633db.jpg', 'target'] = 7  #from 14
        return train_df

    def __len__(self):
            return len(self.df)
            
    def __getitem__(self, idx):
        # 이미지 파일 경로 구성
 
        img_name, label = self.df[idx]
        img_path = os.path.join(self.path, img_name)
        image = cv2.imread(img_path, cv2.COLOR_BGR2RGB)
        #augraphy
        if self.pipeline:
            image = self.pipeline(image)
        # 변환 적용
        if self.transform:
            image = self.transform(image=np.array(image))['image']  # 'image=image'로 albumentations 호출
 
        return image, label

In [None]:
# test image 변환을 위한 transform 코드
tst_transform = A.Compose([
    A.Resize(height=img_size, width=img_size),
    A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    ToTensorV2(),
])

In [None]:
tst_dataset = ImageDataset(
    "../data/sample_submission.csv",
    "../data/test/",
    transform=tst_transform,
    is_train = False
)

In [None]:
tst_loader = DataLoader(
    tst_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=64,
    pin_memory=True
)

In [None]:
def load_model(load_path, filename, device='cuda'):
    """
    저장된 pkl 파일에서 모델을 불러옵니다.
    
    Args:
        load_path: 불러올 파일이 있는 디렉토리 경로
        filename: 불러올 파일 이름
        device: 모델을 로드할 디바이스
    
    Returns:
        loaded_model: 불러온 모델
    """
    # 전체 경로
    full_path = os.path.join(load_path, filename)
    
    # pkl 파일 로드
    with open(full_path, 'rb') as f:
        save_dict = pickle.load(f)
    
    # 동일한 구조의 모델 생성
    model = timm.create_model(
        save_dict['model_name'],
        pretrained=False,
        num_classes=save_dict['num_classes']
    )
    
    # 저장된 가중치 로드
    model.load_state_dict(save_dict['state_dict'])
    
    # 지정된 디바이스로 모델 이동
    model = model.to(device)
    
    print(f"Model loaded successfully from {full_path}")
    return model

In [None]:
def calculate_accuracy(df_true, df_pred):
    # 'target' 컬럼이 일치하는 행의 개수를 계산
    matching_count = (df_true['target'] == df_pred['target']).sum()
    # 전체 행 수로 나누어 정확도를 계산
    accuracy = matching_count / len(df_true)
    return accuracy
human_answer_df = pd.read_csv(os.path.join(data_path, 'human_answer.csv'))

In [None]:
class TestTimeAugmentation:
    def __init__(self, model, device='cuda'):
        self.model = model
        self.device = device
        self.model.eval()
        
        self.base_transform = A.Compose([
            A.Resize(height=img_size, width=img_size),
            A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ToTensorV2(),
        ])
        self.kernel = np.array([
                [0, -1, 0],
                [-1, 5, -1],
                [0, -1, 0]
            ])
        # 기존 코드의 transform들을 활용
        
    def denoise_and_sharpen(self, img):
        """노이즈 제거와 샤프닝을 적용하는 메소드"""
        try:
            # BGR to RGB (cv2.imread가 BGR로 읽어오므로)
            if len(img.shape) == 3:
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            
            # 노이즈 제거            
            # 샤프닝
            
            denoised = cv2.fastNlMeansDenoisingColored(img, None, h=5, templateWindowSize=7, searchWindowSize=13)
            sharpened = cv2.filter2D(denoised, -1, self.kernel)
            denoised2 = cv2.fastNlMeansDenoisingColored(sharpened, None, h=5, templateWindowSize=7, searchWindowSize=1)
            return denoised2
            
        except Exception as e:
            print(f"Error in denoise_and_sharpen: {str(e)}")
            print(f"Image shape: {img.shape}, dtype: {img.dtype}")
            return img

    def predict_with_tta(self, dataset, loader):
        predictions = []
        image_ids = []
        
        with torch.no_grad():
            for batch_idx, (images, _) in enumerate(tqdm(loader, desc="TTA Inference")):
                batch_predictions = []
                
                # 현재 배치의 이미지 ID 저장
                start_idx = batch_idx * loader.batch_size
                end_idx = min((batch_idx + 1) * loader.batch_size, len(dataset))
                batch_image_ids = [dataset.df[i][0] for i in range(start_idx, end_idx)]
                image_ids.extend(batch_image_ids)
                
                # 1. 원본 이미지 예측
                imgs_gpu = images.to(self.device)
                outputs = self.model(imgs_gpu)
                batch_predictions.append(outputs.softmax(dim=1))
                
                # 2. 노이즈 제거 + 샤프닝 버전
                denoised_sharpened_batch = []
                for idx in range(start_idx, end_idx):
                    # 원본 이미지 다시 로드
                    img_name = dataset.df[idx][0]
                    img_path = os.path.join(dataset.path, img_name)
                    img = cv2.imread(img_path)
                    
                    # 노이즈 제거 및 샤프닝 적용
                    processed = self.denoise_and_sharpen(img)
                    # transform 적용
                    tensor_img = self.base_transform(image=processed)['image']
                    denoised_sharpened_batch.append(tensor_img)
                
                if denoised_sharpened_batch:  # 배치가 비어있지 않은 경우만
                    denoised_sharpened_batch = torch.stack(denoised_sharpened_batch).to(self.device)
                    outputs = self.model(denoised_sharpened_batch)
                    batch_predictions.append(outputs.softmax(dim=1))
                
                # 모든 예측값의 평균 계산
                batch_predictions = torch.stack(batch_predictions)
                avg_predictions = batch_predictions.mean(dim=0)
                predictions.extend(avg_predictions.argmax(dim=1).cpu().numpy())
        
        return predictions, image_ids

In [None]:
def run_inference_with_tta(model, dataset, loader, device):
    tta = TestTimeAugmentation(model, device)
    predictions, image_ids = tta.predict_with_tta(dataset, loader)
    
    # 결과를 DataFrame으로 변환
    pred_df = pd.DataFrame({
        'ID': image_ids,
        'target': predictions
    })
    
    return pred_df

In [None]:
model_path = '../output/models'
#output/night_sample/current_epoch_model_14.pkl
model = load_model(model_path, 'model_name','cuda')

model.eval()
# TTA 추론 실행
#pred_df_from_tta = run_inference_with_tta(model, tst_dataset, tst_loader, device)
preds_list = []
for image, _ in tqdm(tst_loader):
    image = image.to(device)
    with torch.no_grad():
        preds = model(image)
    preds_list.extend(preds.argmax(dim=1).detach().cpu().numpy())

pred_df = pd.DataFrame(tst_dataset.df, columns=['ID', 'target'])
pred_df['target'] = preds_list

In [None]:
pred_df.to_csv('../output/output/pred_output.csv', index=False)