In [None]:
"""
** test only **
parameters:
    csv_data: all data
    signal_len = 1*22051
    record_len = 26
"""

In [1]:
import json, tempfile
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm import tqdm
from pathlib import Path

import torch
from torch.utils.data import Dataset, DataLoader

In [2]:
# Test-set locations. The paths are assembled from components instead of
# backslash-separated literals so they resolve on POSIX as well as on Windows
# (on POSIX a literal such as r'..\dataset' is one single file name).
test_root = Path('..', 'dataset', 'test')
data_dir1 = test_root / 'npz_fft_public'
data_dir2 = test_root / 'npz_fft_private'
# The JSON index paths stay plain strings, as before.
data_json1 = str(test_root / 'data_list_public_20230511.json')
data_json2 = str(test_root / 'data_list_private_20230517.json')

In [3]:
# Read both JSON index files and keep the entries stored under their 'test' key.
data_files1 = json.loads(Path(data_json1).read_text())['test']
data_files2 = json.loads(Path(data_json2).read_text())['test']
print(len(data_files1), len(data_files2))

500 500


In [4]:
# All test entries in one list: first the entries of data_files1, then data_files2.
test_files = [*data_files1, *data_files2]

In [5]:
signal_len = 22051  # length of signal (default target length of cut_pad_signal)
record_len = 26     # length of record (passed to the model constructor)

def z_score(x):
    """Standardize ``x`` to zero mean and unit standard deviation.

    parameters:
        x: array-like object providing ``.mean()`` and ``.std()``

    returns:
        ``(x - mean) / std``; when every element of ``x`` is identical
        (std == 0) the result is all zeros instead of NaN/inf.
    """
    std = x.std()
    if std == 0:
        # Constant input: dividing by zero would poison the result with NaN,
        # so divide by 1 and return the (all-zero) centered values.
        std = 1.0
    return (x - x.mean()) / std

def read_csv_data(csv_data):
    """Turn one record mapping into a z-scored float32 feature vector.

    parameters:
        csv_data: mapping of field name -> value; every value except the one
            stored under 'ID' must be convertible with ``float()``

    returns:
        1-D float32 array holding the fields in mapping order ('ID' skipped),
        rescaled per field where a rule exists and then z-scored as a whole.
    """
    # Field-specific rescaling; fields without an entry are used as plain floats.
    rescale = {
        'Sex': lambda value: value - 1,  # 1~2 to 0~1
        'Age': lambda value: value / 50,
        'Voice handicap index - 10': lambda value: value / 40,
    }
    features = [
        rescale[name](float(raw)) if name in rescale else float(raw)
        for name, raw in csv_data.items()
        if name != 'ID'
    ]

    return z_score(np.array(features)).astype(np.float32)

def read_npz_file(file):
    """Load one ``.npz`` sample and return it as ``(signal, record)``.

    parameters:
        file: path of an npz archive containing the entries 'signal' and
            'csv_data' (the latter a 0-d object array wrapping a mapping)

    returns:
        tuple ``(signal, record)``: the z-scored float32 signal and the
        feature vector produced by ``read_csv_data``.
    """
    # NOTE(security): allow_pickle=True unpickles the 'csv_data' object, which
    # can execute arbitrary code -- only load archives from a trusted source.
    # The context manager closes the archive's file handle; without it every
    # loaded file leaves a descriptor open until garbage collection.
    with np.load(file, allow_pickle=True) as npz_data:
        signal = z_score(npz_data['signal']).astype(np.float32)
        record = read_csv_data(npz_data['csv_data'].item())

    return (signal, record)

def cut_pad_signal(signal, length=signal_len, mode='middle'):
    """Return ``signal`` cropped or zero-padded to exactly ``length`` samples.

    parameters:
        signal: 1-D array
        length: target length (defaults to ``signal_len``)
        mode: how to crop a signal longer than ``length``:
            'middle' keeps the centered window, 'random' keeps a window at a
            random offset drawn with ``np.random``

    returns:
        array of ``length`` samples; shorter signals are padded with zeros at
        the end, ``mode`` is not consulted in that case.

    raises:
        ValueError: the signal has to be cropped and ``mode`` is unknown
            (previously this case silently returned ``None``).
    """
    excess = len(signal) - length
    if excess <= 0:
        return np.pad(signal, (0, -excess))

    if mode == 'middle':  # cut from middle
        start = excess // 2
    elif mode == 'random':  # random cut
        # randint's upper bound is exclusive; +1 makes the last valid window
        # (start == excess) reachable as well.
        start = np.random.randint(excess + 1)
    else:
        raise ValueError(f"unknown mode {mode!r}; expected 'middle' or 'random'")
    return signal[start:start + length]

In [6]:
# Load every listed sample as a (signal, record) pair: first all files listed
# in data_files1 (read from data_dir1), then those in data_files2 (data_dir2).
test_info = {'input_data': []}

for npz_dir, file_names in ((data_dir1, data_files1), (data_dir2, data_files2)):
    for file in tqdm(file_names):
        test_info['input_data'].append(read_npz_file(npz_dir/file))

100%|███████████████████████████████████████████████████████████████████████████████| 500/500 [00:03<00:00, 131.44it/s]
100%|███████████████████████████████████████████████████████████████████████████████| 500/500 [00:03<00:00, 135.53it/s]


In [7]:
# reload model
# Name of the run to evaluate: it selects the sub-directory of weights/ the
# checkpoint is loaded from and the sub-directory of test_results/ the
# predictions are written to.
model_name = 'm202_MultiOutput_20230518_114418'

In [8]:
from models import m202_MultiOutput

# Instantiate the network and move it to the GPU; the trained weights are
# loaded from weights_dir in a later cell.
model = m202_MultiOutput(record_len)
model = model.cuda()
print(model_name)

# Per-run folders: checkpoints live in weights/<model_name>, predictions are
# written to test_results/<model_name> (created here if missing).
weights_dir = Path('weights') / model_name
results_dir = Path('test_results') / model_name
results_dir.mkdir(parents=True, exist_ok=True)

m202_MultiOutput_20230518_114418


In [9]:
def start_evaluate(chosen):
    """Run the global ``model`` over the global ``test_data`` loader.

    parameters:
        chosen: index selecting which element of the model's outputs is used
            for the prediction

    returns:
        list with one entry per sample, in loader order: the index of the
        largest value along dim 1 of the chosen output, plus 1.
    """
    model.eval()
    predictions = []

    for batch in tqdm(test_data, unit='batch'):
        with torch.no_grad():
            outputs = model(batch[0], batch[1])

        # torch.max along dim 1 yields (values, indices); only the indices matter
        _, class_idx = torch.max(outputs[chosen], 1)
        predictions.extend(idx + 1 for idx in class_idx.tolist())

    return predictions

In [10]:
class EvaluationDataset(Dataset):
    """Dataset serving ``(signal, record)`` pairs as CUDA tensors.

    parameters:
        data_type: 'fixed' -> every signal is cut/padded by ``cut_pad_signal``
            to its default length; 'original' -> signals keep their own length
        input_data: indexable collection of ``(signal, record)`` pairs

    raises:
        ValueError: ``data_type`` is neither 'fixed' nor 'original'
            (previously such a value made ``__getitem__`` silently return the
            raw, un-tensorised signal).
    """

    DATA_TYPES = ('fixed', 'original')

    def __init__(self, data_type, input_data):
        if data_type not in self.DATA_TYPES:
            raise ValueError(
                f'unknown data_type {data_type!r}; expected one of {self.DATA_TYPES}')
        self.data_type = data_type
        self.input_data = input_data

    def __getitem__(self, index):
        """Return sample ``index`` as ``(signal, record)`` float32 CUDA tensors.

        The signal gets a leading dimension of size 1 via ``unsqueeze(0)``.
        """
        sample = self.input_data[index]
        signal = sample[0]
        record = sample[1]
        if self.data_type == 'fixed':  # fixed length
            signal = cut_pad_signal(signal)
        # 'original' keeps the signal at its original length
        signal = torch.tensor(signal, dtype=torch.float32).cuda().unsqueeze(0)

        record = torch.tensor(record, dtype=torch.float32).cuda()
        return (signal, record)

    def __len__(self):
        """Number of samples."""
        return len(self.input_data)

In [11]:
# fixed length results
# Wrap the loaded samples so each signal is cut/padded to the fixed length,
# then batch them for evaluation.
batch_size = 64
fixed_length_dataset = EvaluationDataset('fixed', test_info['input_data'])
test_data = DataLoader(fixed_length_dataset, batch_size=batch_size)

In [12]:
# Load the chosen checkpoint into the model and predict with output 0.
weights_file = 'best_uar-0'
print(f'{weights_file}:')
state_dict = torch.load(weights_dir/f'{weights_file}.pth')
model.load_state_dict(state_dict)
results = start_evaluate(0)

# One CSV row per test file, labelled with the file name up to its first '.';
# written without a header row.
sample_ids = [name.split('.')[0] for name in test_files]
df = pd.DataFrame(results, index=sample_ids)
df.to_csv(results_dir/'fixed_length-chosen_outputs0.csv', header=False)

best_uar-0:


100%|███████████████████████████████████████████████████████████████████████████████| 16/16 [00:00<00:00, 16.37batch/s]
