# Package, module setting

In [None]:
import torch
from torch.utils.tensorboard import SummaryWriter
import sys
import time
import yaml
import torchvision
from torchvision import transforms, datasets

import numpy as np
import os
from sklearn import preprocessing
from torch import cuda
from torch.utils.data.dataloader import DataLoader
from tqdm import tqdm

In [None]:
sys.path.append('../')
from mymodels.resnet_base_network import ResNet18
from mydata.imageloader import MyDataset, psnrDataUnit

In [None]:
batch_size = 32
data_transforms = torchvision.transforms.Compose([transforms.ToTensor()])

config = yaml.load(open("../config/config.yaml", "r"), Loader=yaml.FullLoader)
writer = SummaryWriter()

# device = 'cpu'
device = 'cuda' if cuda.is_available() else 'cpu'
print(f"Training with: {device}")
if device=='cuda':
    torch.cuda.empty_cache()

# Train-data, Test-data
- shuffle the train data
- dataset
- dataloader

# Encoder loading
load encoder for both images

In [None]:
encoder = ResNet18(**config['network'])
output_feature_dim = encoder.projetion.net[0].in_features
print(output_feature_dim)

# Load encoded tensors

In [None]:
# load tensor array
train_load_path = "./tensors/runtotal/train.pt"
test_load_path = "./tensors/runtotal/test.pt"

loaded1 = torch.load(train_load_path)
loaded2 = torch.load(test_load_path)

x_train = loaded1['x']
y_train = loaded1['y']
x_test = loaded2['x']
y_test = loaded2['y']

print("Training data shape:", x_train.shape, y_train.shape)
print("Testing data shape:", x_test.shape, y_test.shape)

In [None]:
scaler = preprocessing.StandardScaler()
scaler.fit(x_train)
x_train = scaler.transform(x_train).astype(np.float32)
x_test = scaler.transform(x_test).astype(np.float32)

In [None]:
def create_data_loaders_from_arrays(X_train, y_train, X_test, y_test):
    
    train = torch.utils.data.TensorDataset(X_train, y_train)
    train_loader = torch.utils.data.DataLoader(train, batch_size=64, shuffle=True)

    test = torch.utils.data.TensorDataset(X_test, y_test)
    test_loader = torch.utils.data.DataLoader(test, batch_size=1, shuffle=False)
    return train_loader, test_loader

In [None]:
train_loader, test_loader = create_data_loaders_from_arrays(torch.from_numpy(x_train), y_train, torch.from_numpy(x_test), y_test)

# MLP setting
number of hidden layers: 5

In [None]:
# Neural Network Class
class MyOne(torch.nn.Module):
    def __init__(self, D_in, H1, H2, H3, H4, H5, D_out):
        super().__init__()
        self.linear1 = torch.nn.Linear(D_in, H1)
        self.linear2 = torch.nn.Linear(H1, H2)
        self.linear3 = torch.nn.Linear(H2, H3)
        self.linear4 = torch.nn.Linear(H3, H4)
        self.linear5 = torch.nn.Linear(H4, H5)
        self.linear6 = torch.nn.Linear(H5, D_out)
    
    def forward(self, x):
        x = torch.nn.functional.relu(self.linear1(x))
        x = torch.nn.functional.relu(self.linear2(x))
        x = torch.nn.functional.relu(self.linear3(x))
        x = torch.nn.functional.relu(self.linear4(x))
        x = torch.nn.functional.relu(self.linear5(x))
        x = self.linear6(x)
        return x    

In [None]:
# logreg = LogisticRegression(output_feature_dim*2, 10)
# logreg = logreg.to(device)
mymo = MyOne(output_feature_dim*2, output_feature_dim*2, output_feature_dim*2,output_feature_dim*2, output_feature_dim*2, output_feature_dim, 1)
mymo = mymo.to(device)
# 모델의 state_dict 출력
print("Model's state_dict:")
for param_tensor in mymo.state_dict():
    print(param_tensor, "\t", mymo.state_dict()[param_tensor].size())

# Model training1

In [None]:
optimizer = torch.optim.Adam(mymo.parameters(), lr=3e-4)
# criterion = torch.nn.CrossEntropyLoss()
criterion = torch.nn.L1Loss()
eval_every_n_epochs = 10
first_epoch = 400
mymo.train()
# device = 'cuda' if cuda.is_available() else 'cpu'
# print(f"Training with: {device}")
for epoch in tqdm(range(first_epoch)):
#     train_acc = []
    for x, y in train_loader:

        x = x.to(device)
        y = y.to(device)
        
        # zero the parameter gradients
        optimizer.zero_grad() 
        
        logits = mymo(x)
        # predictions = torch.argmax(logits, dim=1)
        
        loss = criterion(logits, y.unsqueeze(1))
        
        loss.backward()
        optimizer.step()
    writer.add_scalar("Loss/train", loss, epoch)
writer.close()

In [None]:
test_result_diff = []

now = time.localtime()

mymo.eval()
for x, y in tqdm(test_loader):
    x = x.to(device)
    y = y.to(device)
    logits = mymo(x)
    ty = 1/float(y[0].item())
    oy = 1/logits[0].item()
    test_result_diff.append((ty, oy, abs(ty-oy)))

with open("result_%.2f_%02d%02d_%02d%02d.txt" %(1.0, now.tm_mon, now.tm_mday, now.tm_hour, now.tm_min), 'w') as f:
    for item1, item2, item3 in test_result_diff:
        f.write("sr_sum:%s, model_out:%s, diff(abs):%s\n" % (item1, item2, item3))    