In [None]:
import os
import gc
import cv2
import time
import random
import glob
from PIL import Image
import  matplotlib.pyplot as plt

# For data manipulation
import numpy as np
import pandas as pd

# Pytorch Imports
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.optim import lr_scheduler
from torch.utils.data import Dataset, DataLoader
from torch.cuda import amp
from pytorch_toolbelt import losses as L

# Utils
from tqdm.auto import tqdm

# For Image Models
import timm

# Albumentations for augmentations
import albumentations as A
from albumentations.pytorch import ToTensorV2

## Expose only GPU index 0 to this process through CUDA_VISIBLE_DEVICES.
os.environ['CUDA_VISIBLE_DEVICES'] = '0'

def seed_everything(seed=123):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU, the current CUDA
    device and all CUDA devices), stores the seed in ``PYTHONHASHSEED`` and
    switches cuDNN to deterministic mode with benchmarking turned off.

    Args:
        seed: integer seed handed to every generator (default 123).
    """
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Every seeding function takes the same single integer argument.
    for apply_seed in (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed,
        torch.cuda.manual_seed_all,
    ):
        apply_seed(seed)

    # Disable cuDNN autotuning first, then request deterministic kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True


# Seed once, with the default value, when the cell runs.
seed_everything()

In [None]:
class Customize_Model(nn.Module):
    """Thin wrapper module that forwards its input to ``self.model``.

    NOTE(review): ``__init__`` accepts ``model_name`` and ``cls`` but builds
    nothing and never assigns ``self.model``; a freshly constructed instance
    therefore cannot run ``forward``. Presumably the attribute is restored when
    whole-model checkpoints are loaded with ``torch.load`` -- confirm.
    """

    def __init__(self, model_name, cls):
        # Only the nn.Module machinery is initialised; both arguments are unused.
        super().__init__()

    def forward(self, image):
        """Return the output of the wrapped ``self.model`` for ``image``."""
        return self.model(image)

In [None]:
def get_test_transform(img_size):
    """Build the inference-time augmentation pipeline.

    The pipeline resizes the input to ``img_size`` x ``img_size`` and then
    applies ``ToTensorV2``.

    Args:
        img_size: target side length, used for both resize dimensions.

    Returns:
        An ``A.Compose`` holding the two steps in that order.
    """
    steps = [
        A.Resize(img_size, img_size),
        ToTensorV2(p=1.0),
    ]
    return A.Compose(steps)

def get_resize_transform(height, width):
    """Build a pipeline holding a single, always-applied ``A.Resize``.

    The arguments are forwarded positionally as ``A.Resize(width, height)``,
    i.e. in the opposite order from this function's own parameter list.
    ``read_video`` calls this with ``(frame_array.shape[1], CFG["depth"])``.

    NOTE(review): albumentations documents ``Resize(height, width)``, so the
    parameter names here look swapped relative to what Resize receives --
    confirm before changing either side, since the caller depends on the
    current order.

    Args:
        height: value passed as the second positional argument of ``A.Resize``.
        width: value passed as the first positional argument of ``A.Resize``.

    Returns:
        An ``A.Compose`` wrapping the resize step.
    """
    resize_step = A.Resize(width, height, p=1)
    return A.Compose([resize_step])


def read_video(path):
    """Decode a video into a grayscale frame stack resized along the frame axis.

    Frames are read with OpenCV until the stream ends, converted from BGR to
    grayscale and stacked. The stack is then passed through
    ``get_resize_transform(stack.shape[1], CFG["depth"])``.

    Args:
        path: location of the video file handed to ``cv2.VideoCapture``.

    Returns:
        numpy array, (img_len, H, W) per the original author's note.
        NOTE(review): img_len is presumably ``CFG["depth"]`` after the resize --
        confirm against the albumentations Resize semantics.

    Raises:
        ValueError: if no frame could be decoded (missing, unreadable or empty
            file). Previously this surfaced as an IndexError on ``shape[1]``.
    """
    frames = []
    cap = cv2.VideoCapture(path)
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY))
    finally:
        # Always hand the decoder / file handle back, even if decoding raised.
        cap.release()

    if not frames:
        raise ValueError(f"no frames could be read from video: {path!r}")

    ## resize z-axis
    stack = np.array(frames)
    stack = get_resize_transform(stack.shape[1], CFG["depth"])(image=stack)['image']

    return np.asarray(stack)  ## (img_len, H, W)


class Customize_Dataset(Dataset):
    """Dataset yielding one grayscale video clip per row of ``df``.

    Each item is read with ``read_video`` from the row's ``image_path`` column,
    optionally passed through ``transforms``, repeated three times along a new
    leading axis and returned as a float32 tensor scaled by 1/255.

    Args:
        df: DataFrame with an ``image_path`` column. Rows are fetched with
            ``df.loc[index]``, so the index labels must match the integer
            positions requested by the sampler (e.g. after
            ``reset_index(drop=True)``).
        transforms: optional callable invoked as ``transforms(image=array)``
            that returns a mapping with an ``"image"`` entry. When it is
            omitted, or returns a numpy array instead of a tensor, the array is
            converted to a tensor with its last axis moved to the front.
        is_train: stored on the instance; not read anywhere in this class.
    """

    def __init__(self, df, transforms=None, is_train=True):
        self.df = df
        self.transforms = transforms
        self.is_train = is_train

    def __getitem__(self, index):
        """Return ``{'image': float32 tensor}`` for the row labelled ``index``."""
        data = self.df.loc[index]
        img = read_video(data['image_path'])
        # Move the frame axis last so the clip is laid out like an H x W x C image.
        img = img.transpose(1, 2, 0)

        if self.transforms:
            img = self.transforms(image=img)["image"]

        if not torch.is_tensor(img):
            # No tensor conversion happened in the pipeline: do the same
            # last-axis-to-front move here so the steps below work.
            img = torch.from_numpy(np.ascontiguousarray(img)).permute(2, 0, 1)

        ## convert to 3 channel
        img = img.unsqueeze(dim=0)
        img = img.expand(3, img.shape[1], img.shape[2], img.shape[3])

        # The division materialises a new tensor; .to() only fixes the dtype.
        # This avoids copy-constructing via torch.tensor(<tensor>), which warns.
        return {
            'image': (img / 255).to(torch.float32),
        }

    def __len__(self):
        """Number of rows (clips) in the underlying DataFrame."""
        return len(self.df)

# CFG

In [None]:
# Inference settings shared by the cells below.
CFG = {
    # Side length handed to get_test_transform() for the spatial resize.
    'img_size': 512,
    # Target size read_video() passes to its frame-axis resize.
    'depth': 32,

    # Number of test-time-augmentation views fed to each model; 1 disables TTA.
    'TTA': 2,
    # Checkpoint files; replaced just below by the objects torch.load returns.
    'model': [
        './test_model/ball_type/csn_img512_d32/cv0_best.pth',
        './test_model/ball_type/csn_img512_d32/cv2_best.pth',
    ],
}

# NOTE(review): torch.load unpickles the files -- only point this at trusted checkpoints.
CFG.update(
    model=[torch.load(ckpt_path, map_location= 'cuda:0') for ckpt_path in CFG['model']]
)
print(f"length of model: {len(CFG['model'])}")

# Prepare Dataset

In [None]:
# Collect every clip below the evaluation folder and order the files by the
# integer that sits between the last '_' and the following '.' of each path.
paths = sorted(
    glob.glob('Data/val_test_balltype_landing/**/*mp4', recursive=True),
    key=lambda clip_path: int(clip_path.split('_')[-1].split('.')[0]),
)

# One row per clip; the dataset reads the 'image_path' column.
test_df = pd.DataFrame().assign(image_path=paths)
print(f'valid dataset: {len(test_df)}')

# Sequential, single-clip batches loaded in the main process.
valid_dataset = Customize_Dataset(
    test_df.reset_index(drop=True),
    get_test_transform(CFG['img_size']),
)
valid_loader = DataLoader(
    valid_dataset,
    batch_size=1,
    shuffle=False,
    num_workers=0,
)
test_df.head()

In [None]:
def _inference_device(models):
    """Return the device of the first model's parameters (default CUDA device if it has none)."""
    first_param = next(iter(models[0].parameters()), None)
    return first_param.device if first_param is not None else torch.device('cuda')


def inference(model, img):
    """Average the class probabilities of several models for a single clip.

    A batch of test-time-augmentation views is built once: the clip itself,
    then its flip along the last axis, its flip along the second-to-last axis
    and the flip along both, truncated to the first ``CFG['TTA']`` entries.
    Every model is switched to eval mode and run on that batch without
    gradients. Its outputs are averaged over the views, the last two output
    entries are dropped, and a softmax is applied to the rest. The per-model
    probabilities are then averaged.

    NOTE(review): the two dropped trailing outputs presumably belong to other
    prediction heads -- confirm against the training code.

    Args:
        model: non-empty sequence of modules. All are assumed to live on the
            same device; the clip is moved to the device of the first one
            (previously it was always moved with ``.cuda()``).
        img: tensor for one clip, without a batch axis.

    Returns:
        List of floats with the averaged probabilities.

    Raises:
        ValueError: if ``model`` is empty or ``CFG['TTA']`` selects no view.
    """
    if len(model) == 0:
        raise ValueError("inference() needs at least one model")

    img = torch.unsqueeze(img, 0).to(_inference_device(model))

    # Only the views that are actually used get computed, and the batch is
    # built once instead of once per model.
    flip_dims = [(), (-1,), (-2,), (-1, -2)]
    views = [
        torch.flip(img, dims) if dims else img
        for dims in flip_dims[:CFG['TTA']]
    ]
    if not views:
        raise ValueError("CFG['TTA'] must select at least one view")
    batch = torch.cat(views, dim=0)

    prob_sum = None
    for m in model:
        m.eval()
        with torch.no_grad():
            logits = m(batch).mean(dim=0)
        probs = logits[:-2].softmax(dim=-1)
        prob_sum = probs if prob_sum is None else prob_sum + probs

    return (prob_sum / len(model)).cpu().numpy().tolist()

In [None]:
# Load the submission template and fill its 'BallType' column row by row,
# in the same order in which valid_loader yields the clips.
submision = pd.read_csv('submission/submission_val_test.csv')
count = 0
for batch in tqdm(valid_loader):
    for clip in batch['image']:
        probabilities = inference(CFG['model'], clip)

        # The predicted class is the index of the largest averaged probability.
        best_class = int(np.asarray(probabilities).argmax(axis=0))
        submision.loc[count, 'BallType'] = best_class
        count += 1

submision.head()

In [None]:
# Write the filled-in table to 'submission.csv' without the index column,
# then display the frame as the cell output.
submision.to_csv('submission.csv', index=False)
submision