In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
#!wget -O /content/drive/MyDrive/Vision.zip "https://www.dropbox.com/s/mamwc19rgy8wiv5/Vision.zip?dl=1"

In [3]:
#!unzip Vision.zip -d "/content/drive/MyDrive/Vision"

In [10]:
%cd /content/drive/MyDrive

/content/drive/MyDrive


In [None]:
import os
import numpy as np
import pandas as pd
import torch
import matplotlib.pyplot as plt
from tqdm import tqdm

# define root directory
ROOT = os.getcwd()
print(ROOT)

scenario = 'Scenario5'

dataset_dir = os.path.join(ROOT,"Vision")
scenario_dir = os.path.join(dataset_dir,scenario)
development_dir = os.path.join(scenario_dir, "development_dataset")
challenge_dir = os.path.join(scenario_dir, "challenge_dataset")

In [12]:
import random

seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available() :
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [13]:
# load data

import torchvision
import torchvision.transforms as transforms
from PIL import Image
from torch.utils.data import DataLoader, Dataset
import torchvision.transforms.functional as FT

def transform(image, flag) :
    mean = [0.485, 0.456, 0.406]
    std =  [0.229, 0.224, 0.225]

    if flag == 2 :      #test
        new_image = FT.resize(image,224)
        new_image = FT.to_tensor(new_image)
        new_image = FT.normalize(new_image, mean=mean, std=std)
    else :              #train, val
        new_image = FT.resize(image,224)
        new_image = FT.to_tensor(new_image)
        new_image = FT.normalize(new_image, mean=mean, std=std)

    return new_image

class DeepSense6G(Dataset):
    def __init__(self, data_folder, split, flag):
        self.split = split
        assert self.split in {'Scenario5','Scenario6','Scenario7','Scenario8','Scenario9'}

        self.data_folder = data_folder
        self.scenario_path = os.path.join(self.data_folder, split)
        self.development_path = os.path.join(self.scenario_path, 'development_dataset')
        self.challenge_path = os.path.join(self.scenario_path, 'challenge_dataset')

        self.flag = flag

        self.split = self.split.lower()
        if flag == 0 :      #train
            self.dataframe_path = os.path.join(self.development_path, self.split + '_dev_train.csv')
        elif flag == 1 :    #validation
            self.dataframe_path = os.path.join(self.development_path, self.split + '_dev_val.csv')
        elif flag == 2 :              #test
            self.dataframe_path = os.path.join(self.development_path, self.split + '_dev_test.csv')

        self.dataframe = pd.read_csv(self.dataframe_path)

        self.Images_path = self.dataframe['unit1_rgb_1'].values
        self.Beam_index = self.dataframe['beam_index_1'].values
        self.image_index = self.dataframe['index'].values
        self.Beam_pwr_path = self.dataframe['unit1_pwr_1'].values

    def __getitem__(self, i):

        self.img_rel_path = self.Images_path[i]

        if self.flag == 2 :     #test
            self.img_abs_path = os.path.join(self.development_path, self.img_rel_path)
        else :                  #train, validation
            self.img_abs_path = os.path.join(self.development_path, self.img_rel_path)
        
        image = Image.open(self.img_abs_path, mode='r')
        image = image.convert('RGB')

        beam_index = self.Beam_index[i]

        image = transform(image, self.flag)

        return image, beam_index

    def __len__(self):
        return len(self.Images_path)

In [None]:
# device initialize

DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(DEVICE)

## EfficientNet_v2

In [15]:
#!git clone https://github.com/hankyul2/EfficientNetV2-pytorch.git

In [None]:
%cd /content/drive/MyDrive/EfficientNetV2-pytorch

In [None]:
import torch

model = torch.hub.load('hankyul2/EfficientNetV2-pytorch', 'efficientnet_v2_m_in21k', pretrained=True, nclass=64)
model = model.to(DEVICE)
print(model)

In [None]:
#training
import time

batch_size = 32
scenario = 'Scenario5'

train_dataset = DeepSense6G(dataset_dir, split=scenario, flag=0)
val_dataset = DeepSense6G(dataset_dir, split=scenario, flag=1)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=1, shuffle=False) 

epochs = 50
best_loss = 100

optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
loss = torch.nn.CrossEntropyLoss().cuda()

train_acc_list = []
val_acc_list = []

for epoch in range(epochs) :
    start = time.time()
    print(f'Epoch {epoch}/{epochs}')

    #train
    train_loss = 0
    correct = 0

    model.train()
    for img, bi in tqdm(train_loader) :

        optimizer.zero_grad()
        y_pred = model(img.cuda())
        cost = loss(y_pred, bi.cuda())

        cost.backward()
        optimizer.step()
        train_loss += cost.item()

        pred = y_pred.data.max(1, keepdim=True)[1]
        correct += pred.cpu().eq(bi.data.view_as(pred)).sum()

    train_loss /= len(train_loader)
    train_acc = correct / len(train_loader.dataset)
    train_acc_list.append(train_acc)

    #validate
    val_loss = 0
    val_correct = 0

    with torch.no_grad() :
        model.eval()
        for img, bi in val_loader :
            y_pred = model(img.cuda())
            cost = loss(y_pred, bi.cuda())
            val_loss += cost.item()

            pred = y_pred.data.max(1, keepdim=True)[1]
            val_correct += pred.cpu().eq(bi.data.view_as(pred)).cpu().sum()
        
        val_loss /= len(val_loader)
        val_acc = val_correct / len(val_loader.dataset)
        val_acc_list.append(val_acc)

        if val_loss < best_loss :
            torch.save({
                'epoch' : epoch,
                'model' : model,
                'model_state_dict' : model.state_dict(),
                'optimizer_state_dict' : optimizer.state_dict(),
                'loss' : cost.item,
            }, os.path.join(dataset_dir, scenario + '/checkpoint/effs_bestCheckpoint.pth'))

            print(f'Epoch {epoch:05d}: val_loss improved from {best_loss:.5f} to {val_loss:.5f}, saving model to bestCheckPoint.pth')
            best_loss = val_loss
        else :
            print(f'Epoch {epoch:05d}: val_loss did not improve')
    print(f'{int(time.time() - start)}s - loss: {train_loss: .5f} - acc: {train_acc:.5f} - val_loss: {val_loss:.5f} - val_acc: {val_acc:.5f}')

In [None]:
## 저장되어있는 best checkpoint load

best_model = torch.load(os.path.join(dataset_dir, scenario + '/checkpoint/effm_bestCheckpoint.pth'))['model'] 
best_model.load_state_dict(torch.load(os.path.join(dataset_dir, scenario + '/checkpoint/effm_bestCheckpoint.pth'))['model_state_dict'])  

In [None]:
#test 

batch_size = 1

test_dataset = DeepSense6G(dataset_dir, split=scenario, flag=2)

test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

correct = 0
total = 0
total_results = []
gt = []

def top_k (gts, preds, k) :
    correct = 0

    for i in range(len(preds)) :
        gt = gts[i]
        pred = preds[i]
        correct += int(gt.item() in pred[:k])
    
    print(scenario, 'top', k, 'accuracy', 100 * correct / len(preds))

def mse (gts, preds) :
    mse = 0

    for i in range(len(preds)) :
        gt = gts[i].item()
        pred = preds[i][0]
        mse += (gt-pred)**2
    
    print(scenario, 'mse', mse/len(preds))

with torch.no_grad() :
    model.eval()
    for img, bi in test_loader :
        pred = model(img.cuda())
        sorted_result_idx = np.flip(np.array(np.argsort(pred.cpu())[0]))
        top3 = sorted_result_idx[:3]
        total_results.append(top3)
        gt.append(bi)

top_k(gt, total_results, 1)
top_k(gt, total_results, 2)
top_k(gt, total_results, 3)
mse(gt, total_results)