## radar self-learning angle estimation model

### package 불러오기 (환경 설정)

In [1]:
import torch 
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

from tqdm import tqdm
import numpy as np

from include.model import ConvNet
from include.dataset import RadarDataset 
from include.loss import radarLoss
from include.utils import preprocessing_rx_sig, postprocessing

  from .autonotebook import tqdm as notebook_tqdm


### 변수 설정

In [3]:
# 모델 파라미터
BATCH_SIZE = 50
BATCH_MOMENTM = 0.9
EPOCH = 1
lr = 5e-6   # 5e-6 
device = torch.device('cuda' if torch.cuda.is_available() else "cpu")

# 레이더 각도 변수
MIN_ANGLE, MAX_ANGLE, RESOLUTION = -60, 60, 10
NUM_CLASS = (MAX_ANGLE-MIN_ANGLE) // RESOLUTION + 1

print(NUM_CLASS)

13


### 데이터 불러오기 및 학습 형태로 전환

#### 데이터 불러오기

In [8]:
FIRST_IDX = 1
PATH_LIST = ['./data/23_12_22_data/data', './data/23_12_16_data/data']
SIMULATION_INCLUDE = False

# experiment data
for idx, path_val in enumerate(PATH_LIST, 1):
    if idx == FIRST_IDX:
        Rx_sig, angle = preprocessing_rx_sig(path=path_val, min_angle=MIN_ANGLE, 
                                             max_angle=MAX_ANGLE, resolution=RESOLUTION)
    else:
        Rx_sig_tmp, angle_tmp = preprocessing_rx_sig(path=path_val, min_angle=MIN_ANGLE, 
                                                     max_angle=MAX_ANGLE, resolution=RESOLUTION)
        Rx_sig = np.concatenate((Rx_sig, Rx_sig_tmp))
        angle = np.concatenate((angle, angle_tmp))
        
x_train, x_test, y_train, y_test = train_test_split(Rx_sig, angle, test_size=0.2, shuffle=True, random_state=2)
x_valid, x_test, y_valid, y_test = train_test_split(x_test, y_test, test_size=0.5, shuffle=True, random_state=2)

# Simulation data
if SIMULATION_INCLUDE:
    Rx_sig = np.load(f'./data/simul/total_output_COV_{RESOLUTION}_{MAX_ANGLE}.npy')
    angle  = np.load(f'./data/simul/total_output_angle_{RESOLUTION}_{MAX_ANGLE}.npy')
    angle = np.argmax(angle, axis=1)

    Rx_sig /= np.max(Rx_sig)    # normalization

    # concatenate experiment with simulation
    x_train = np.concatenate((x_train, Rx_sig))
    y_train = np.concatenate((y_train, angle))

# convert input shape - [real, imag, phase]
x_train = np.array([[(np.real(Rxx)), (np.imag(Rxx)), (np.angle(Rxx))] for Rxx in x_train])[:9700]
x_valid = np.array([[(np.real(Rxx)), (np.imag(Rxx)), (np.angle(Rxx))] for Rxx in x_valid])[:1200]
x_test  = np.array([[(np.real(Rxx)), (np.imag(Rxx)), (np.angle(Rxx))] for Rxx in x_test ])[:1200]

# print size
print('x_train :', len(x_train), '/ x_valid :', len(x_valid), '/ x_test :', len(x_test))

x_train : 9700 / x_valid : 1200 / x_test : 1200


#### 학습한 적합한 형태로 변환

In [9]:
train_set = RadarDataset(transform=torch.from_numpy, data=x_train, label=y_train)
valid_set = RadarDataset(transform=torch.from_numpy, data=x_valid, label=y_valid)
test_set = RadarDataset(transform=torch.from_numpy, data=x_test, label=y_test)

train_loader = DataLoader(dataset=train_set, batch_size=BATCH_SIZE, shuffle = True)
valid_loader = DataLoader(dataset=valid_set, batch_size=BATCH_SIZE, shuffle = False)
test_loader = DataLoader(dataset=test_set, batch_size=BATCH_SIZE, shuffle = False)

### 학습 준비 및 학습 

#### 학습 준비 - 모델 및 loss 함수와 opimizer 설정 

In [10]:
model = ConvNet(NUM_CLASS=NUM_CLASS, BATCH_MOMENTM=BATCH_MOMENTM).to(device)

criterion = radarLoss(MIN_ANGLE, MAX_ANGLE, RESOLUTION, device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

#### 학습 

In [11]:
# Train the model
total_train_step, total_valid_step = len(train_loader), len(valid_loader)
total_train_loss, total_valid_loss = [], []

for epoch in range(EPOCH):
    train_loss, val_loss = 0, 0
    train_tqdm = tqdm(train_loader, total=len(train_loader), leave=False)
     
    model.train()
    for i, (x_train, y_train) in enumerate(train_tqdm, 1):
        x_train = x_train.to(device)
        
        outputs = model(x_train)
        loss = criterion(outputs, x_train)
        optimizer.zero_grad()
        loss.backward()
        
        train_loss += loss.item()
        optimizer.step()
        
        train_tqdm.set_description(desc=f'Training Loss on Epoch [{epoch+1}/{EPOCH}]: {train_loss / i: .3f}')
        
    with torch.no_grad():
        for k, (x_valid, y_valid) in enumerate(valid_loader):  # data의 크기를 bath_size 만큼 나눈 것을 반복한다.
            x_valid = x_valid.to(device)
            
            val_output = model(x_valid)
            v_loss = criterion(outputs, x_valid)
            
            val_loss += v_loss.item()
    
    total_train_loss.append(train_loss/total_train_step)
    total_valid_loss.append(val_loss/total_valid_step)

    print (f'Epoch [{epoch+1}/{EPOCH}], Step [{i}/{total_train_step}], train Loss: {train_loss/total_train_step:.4f}, val Loss: {val_loss/total_valid_step:.4f}')

                                                                                       

Epoch [1/1], Step [194/194], train Loss: 0.7610, val Loss: 1.2606


### 결과 확인 

In [17]:
angle_range = torch.arange(MIN_ANGLE, MAX_ANGLE+1, RESOLUTION)
TOLERANCE_ANGLE = 10
ZERO_INDEX = NUM_CLASS // 2

#### 지표로 확인

In [18]:
model.eval()
with torch.no_grad():
    sum_correct = 0
    
    for batch_idx, (x_test, y_test) in enumerate(test_loader, 1):
        x_test = x_test.to(device)
        real = y_test
        outputs = model(x_test)
    
        # 조건 설정
        outputs[outputs < 0.1] = 0                                          # 1. 0.1보다 작으면 죽이기
        outputs[:, ZERO_INDEX][outputs[:, ZERO_INDEX] < 0.5] = 0            # 2. 0이 0.5보다 작으면 죽이기
        pred = postprocessing(outputs, NUM_CLASS, RESOLUTION, MIN_ANGLE)    # 3. top1보다 그 이외의 값의 합이 더 크면 수정
        
        # 성능 평가
        correct = np.sum(list(map(lambda x, y : np.abs(x-y) <= TOLERANCE_ANGLE, real, pred)))
        sum_correct += correct
        
    print(f'{round(sum_correct / (batch_idx * BATCH_SIZE), 2) * 100}%')

90.0%


#### 데이터로 확인

In [14]:
model.eval()
print_num = 5

with torch.no_grad():
    sum_correct = 0
    for batch_idx, (x_test, y_test) in enumerate(test_loader, 1):
        x_test = x_test.to(device)
        real = y_test
        outputs = model(x_test)
        
        values = outputs.topk(5)[0]
        idxs = outputs.topk(5)[1] * 10 - 60 
        
        # 조건 설정
        outputs[outputs < 0.1] = 0                                              # 1. 0.1보다 작으면 죽이기
        outputs[:, ZERO_INDEX][outputs[:, ZERO_INDEX] < 0.5] = 0                                  # 2. 0이 0.5보다 작으면 죽이기
        pred = postprocessing(outputs, NUM_CLASS, RESOLUTION, MIN_ANGLE)   # 3. top1보다 그 이외의 값의 합이 더 크면 수정
        
        correct = np.sum(list(map(lambda x, y : np.abs(x-y) <= TOLERANCE_ANGLE, real, pred)))
        sum_correct += correct
        
        # 출력
        print(np.round(values[:print_num].cpu().numpy().T, 3))
        print(idxs[:print_num].cpu().numpy().T); print()
        print('model:         ', idxs[:print_num, 0].cpu().numpy() * 1.)
        print('real:          ', y_test[:print_num].numpy().reshape(-1).T)
        print('postprocessing:', pred[:print_num])
        print(np.abs(np.subtract(pred, real)) <= TOLERANCE_ANGLE, end='\n\n')
        
    print(f'{round(sum_correct / (batch_idx * BATCH_SIZE), 2) * 100}%')

[[0.273 0.327 0.241 0.296 0.238]
 [0.166 0.229 0.174 0.269 0.228]
 [0.148 0.055 0.128 0.213 0.105]
 [0.132 0.052 0.038 0.038 0.08 ]
 [0.002 0.043 0.    0.017 0.049]]
[[-50 -50  10  50  30]
 [-60 -60   0  40  20]
 [-30 -10  20  60  10]
 [-40 -40  60  30   0]
 [-10 -30 -60  20 -50]]

model:          [-50. -50.  10.  50.  30.]
real:           [-47. -58.  11.  41.  21.]
postprocessing: [-47.0, -50, 15.0, 50.0, 20.0]
tensor([ True, False,  True, False,  True,  True,  True,  True,  True,  True,
         True, False,  True,  True, False,  True, False,  True, False,  True,
         True, False, False, False, False, False, False,  True,  True, False,
        False,  True,  True,  True,  True,  True,  True,  True,  True,  True,
        False, False, False,  True,  True, False,  True,  True, False,  True])

[[0.237 0.243 0.214 0.254 0.289]
 [0.17  0.173 0.137 0.225 0.172]
 [0.125 0.133 0.128 0.19  0.155]
 [0.041 0.043 0.105 0.089 0.138]
 [0.    0.    0.052 0.018 0.003]]
[[ 10  10   0 -30 -50]
 [ 

In [15]:
import time
month, days, hour, minute = time.localtime()[1:5]

SAVE_MODEL = True
SAVE_NAME = f'./model/model_{RESOLUTION}_{MAX_ANGLE}_{month}{days}{hour}{minute}.ckpt'

if SAVE_MODEL:
    torch.save(model.state_dict(), SAVE_NAME)