In [1]:
from metrics import *
from utils import utility
from tslearn.metrics import SoftDTWLossPyTorch
from collections import defaultdict

import pandas as pd
import numpy as np
import torch
import scipy.signal as signal
import matplotlib.pyplot as plt

from dtw import dtw

Install h5py to use hdf5 features: http://docs.h5py.org/
  warn(h5py_msg)


Importing the dtw module. When using in academic works please cite:
  T. Giorgino. Computing and Visualizing Dynamic Time Warping Alignments in R: The dtw Package.
  J. Stat. Soft., doi:10.18637/jss.v031.i07.



In [2]:
# Beat-class key for this run. It selects the row range inside return_samples,
# the "../data/<LABEL>.pkl" input read by loader, and (through LABEL[0]) the
# "<letter>_1d.pt" files of generated signals loaded in the later cells.
LABEL = "S_norm"

In [3]:
def return_samples(LABEL, data):
    """Select the fixed block of rows that belongs to ``LABEL``.

    Args:
        LABEL: one of "F_norm", "N_norm", "Q_norm", "S_norm", "V_norm".
        data: an indexable container that accepts a list of integer row
            positions (loader passes a 2-D numpy array).

    Returns:
        ``data`` indexed with the consecutive row positions assigned to
        ``LABEL`` (end bound exclusive).

    Raises:
        KeyError: if ``LABEL`` is not one of the five known keys.
    """
    # Half-open [first, stop) row intervals, one per beat class.
    row_bounds = {
        "F_norm": (417, 712),
        "N_norm": (2, 1002),
        "Q_norm": (46, 1133),
        "S_norm": (1953, 2721),
        "V_norm": (854, 1295),
    }
    first, stop = row_bounds[LABEL]

    # Index with an explicit list of positions rather than a slice, exactly
    # as before, so the kind of indexing applied to `data` is unchanged.
    return data[list(range(first, stop))]
def batching(a, n):
    """Split ``a`` into at most ``n`` contiguous, near-equal chunks.

    The chunk count is capped at ``len(a)``. With ``k, m = divmod(len(a), n)``
    the first ``m`` chunks hold ``k + 1`` items and the remaining ones ``k``.

    Args:
        a: a sized, sliceable sequence (list, range, array, ...).
        n: requested number of chunks.

    Returns:
        A generator yielding the slices of ``a`` in order. It yields nothing
        when ``a`` is empty or ``n`` is not positive.
    """
    n = min(n, len(a))
    if n <= 0:
        # divmod(len(a), 0) would raise ZeroDivisionError for an empty `a`
        # or n == 0; negative n already produced no chunks, so do the same.
        return (chunk for chunk in ())
    k, m = divmod(len(a), n)

    return (a[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n))


def loader(label, batch_size, resample):
    """Load the beats stored for ``label`` and prepare batch index groups.

    Reads ``../data/<label>.pkl``, stacks its "beat" column into an array,
    resamples every row to ``resample`` points (scipy.signal.resample along
    axis 1), keeps the rows chosen by ``return_samples`` and converts the
    result with ``torch.Tensor``.

    Args:
        label: beat-class key; used for the file name and the row selection.
        batch_size: number of row indices per batch.
        resample: number of points per beat after resampling.

    Returns:
        (X, batches): ``X`` is the tensor of selected beats; ``batches`` is a
        list of ``range`` objects that together cover the first
        ``(len(X) // batch_size) * batch_size`` row indices. Rows past that
        point are not covered, and ``batches`` is empty when fewer than
        ``batch_size`` rows are available.
    """
    # NOTE(review): read_pickle unpickles the file, which can run arbitrary
    # code - only point this at trusted data.
    data = pd.read_pickle("../data/"+label+".pkl")

    X = np.array(data["beat"].to_list())
    X = signal.resample(X, resample, axis=1)
    X = return_samples(label, X)

    n = len(X) // batch_size
    # With fewer rows than one batch n is 0, and splitting an empty index
    # range into 0 chunks used to fail with ZeroDivisionError.
    batches = list(batching(range(n*batch_size), n)) if n > 0 else []
    X = torch.Tensor(X)

    return X, batches

In [4]:
# Load the beats for LABEL: batch_size=128, each beat resampled to 256 points.
data, batches = loader(LABEL, 128, 256)
# Add a singleton axis at dim 1; pipeline.process squeezes that axis again
# before converting signals to images.
data= data.unsqueeze(1)
print(data.shape)

torch.Size([768, 1, 256])


In [5]:
class pipeline:
    """Pairwise comparison of two batches of signals with nine metrics.

    Every signal in ``input`` is scored against every signal in ``samples``:
    eight metrics on the image form of the signals (UQI, VIFP, SCC, SAM,
    ERGAS, RASE, SIFT, SSIM, in that order) followed by the DTW distance on
    the signals themselves.
    """

    def __init__(self, metric_size):
        """Store the metric count and build the helper objects.

        Args:
            metric_size: size of the metric axis of the tensor returned by
                ``process``. ``process`` stacks nine values per pair, so any
                other size will not fit the pre-allocated tensor.
        """
        self.metric_size = metric_size
        self.utils = utility()
        # Kept as an attribute; process() below uses dtw() from the dtw
        # package and does not call this soft-DTW loss.
        self.dtw = SoftDTWLossPyTorch(gamma=0.01)

    def process(self, input, samples):
        """Score every ``input`` signal against every ``samples`` signal.

        Args:
            input: tensor of query signals; axis 1 is squeezed before the
                image conversion (the notebook passes (N, 1, L) tensors).
            samples: tensor of reference signals, same layout as ``input``.

        Returns:
            Tensor of shape (len(input), metric_size, len(samples)) on
            ``input.device``; entry [b, k, s] is metric ``k`` for the pair
            (input[b], samples[s]).
        """
        # Image representation of each signal, converted once per call.
        # (utility.convert comes from utils and is not visible here.)
        query_imgs = self.utils.convert(input.squeeze(1))
        ref_imgs = self.utils.convert(samples.squeeze(1))

        n_query = query_imgs.shape[0]
        n_ref = ref_imgs.shape[0]

        metrics = torch.zeros((n_query, self.metric_size, n_ref), device=input.device)

        # Image metrics in the order they appear along the metric axis.
        image_metrics = (UQI, VIFP, SCC, SAM, ERGAS, RASE, SIFT, SSIM)

        for s in range(n_ref):
            ref_img = ref_imgs[s]
            ref_sig = samples[s]

            # DTW distance of each query signal to this reference signal.
            dtw_values = torch.stack([
                torch.tensor(dtw(input[b], ref_sig, dist_method='euclidean').distance)
                for b in range(n_query)
            ])

            # One row of nine values per query signal: image metrics, then DTW.
            rows = []
            for b in range(n_query):
                scores = [metric.process(query_imgs[b], ref_img) for metric in image_metrics]
                scores.append(dtw_values[b])
                rows.append(torch.tensor(scores))

            metrics[:, :, s] = torch.stack(rows, dim=0)

        # No reduction over the sample axis: callers get every pair.
        return metrics
    
# 9 matches the number of values pipeline.process stacks per signal pair
# (eight image metrics plus the DTW distance).
pip = pipeline(9)

In [6]:
# REAL DATA TO BE COMPARED WITH
# Four reference beats picked from `data` with np.random.randint, i.e. drawn
# with replacement. No seed is set in the visible cells, so the selection
# (and every metric computed against it) changes from run to run.
REAL = data[np.random.randint(low = 0,high=data.shape[0], size=4)]

In [7]:
# Baseline: real beats scored against the REAL reference beats.
# Sixteen beats are drawn at random (with replacement) from `data` and
# compared with REAL in four groups of four.
# NOTE(review): the draw can contain beats that are also in REAL; the zero
# minima / unit maxima in the recorded output are consistent with such
# self-comparisons - confirm whether that is intended for a baseline.
gen = data[np.random.randint(low=0, high=data.shape[0], size=16)]

# Order of the metric axis produced by pip.process.
metric_names = ("UQI", "VIFP", "SCC", "SAM", "ERGAS", "RASE", "SIFT", "SSIM", "DTW")

summ = defaultdict(list)
for start in range(0, 16, 4):
    b = gen[start:start + 4]

    metrics = pip.process(REAL, b)
    # metrics[:, col] holds one value per (REAL beat, compared beat) pair;
    # flatten it and append to that metric's running list.
    for col, name in enumerate(metric_names):
        summ[name].extend(metrics[:, col].reshape(-1).tolist())

summ = pd.DataFrame(summ)
print("Metrics of Real signals")
for stat in (summ.mean(), summ.min(), summ.max()):
    print(stat)

Metrics of Real signals
UQI        0.992474
VIFP       0.041272
SCC        0.001082
SAM        0.093675
ERGAS      2.347319
RASE     187.248143
SIFT      32.000000
SSIM       0.933158
DTW       11.506093
dtype: float64
UQI       0.990990
VIFP      0.004690
SCC      -0.007167
SAM       0.000000
ERGAS     0.000000
RASE      0.000000
SIFT     16.000000
SSIM      0.918791
DTW       0.000000
dtype: float64
UQI        1.000000
VIFP       1.000000
SCC        0.057839
SAM        0.102736
ERGAS      2.574510
RASE     218.560822
SIFT     197.000000
SSIM       1.000000
DTW       21.518492
dtype: float64


In [8]:
# Transformer 1d: generated beats for the current LABEL scored against REAL.
# The file is selected by the first letter of LABEL; at most its first 16
# entries are used, in four groups of four.
# NOTE(review): torch.load unpickles the file - only load trusted files.
gen = torch.load("../Transformer/"+LABEL[0]+"_1d.pt")

# Order of the metric axis produced by pip.process.
metric_names = ("UQI", "VIFP", "SCC", "SAM", "ERGAS", "RASE", "SIFT", "SSIM", "DTW")

summ = defaultdict(list)
for start in range(0, 16, 4):
    b = gen[start:start + 4]

    metrics = pip.process(REAL, b)
    # metrics[:, col] holds one value per (REAL beat, generated beat) pair;
    # flatten it and append to that metric's running list.
    for col, name in enumerate(metric_names):
        summ[name].extend(metrics[:, col].reshape(-1).tolist())

summ = pd.DataFrame(summ)
print("Metrics of Transformer 1d generated signals")
for stat in (summ.mean(), summ.min(), summ.max()):
    print(stat)

  gen = torch.load("../Transformer/"+LABEL[0]+"_1d.pt")


Metrics of Transformer 1d generated signals
UQI        0.991466
VIFP       0.027133
SCC        0.000172
SAM        0.098887
ERGAS      2.476627
RASE     198.162326
SIFT      21.734375
SSIM       0.930084
DTW       10.942024
dtype: float64
UQI        0.988922
VIFP       0.002722
SCC       -0.004153
SAM        0.079766
ERGAS      1.998655
RASE     140.051895
SIFT       4.000000
SSIM       0.911880
DTW        7.011832
dtype: float64
UQI        0.994453
VIFP       0.156498
SCC        0.010911
SAM        0.110959
ERGAS      2.776769
RASE     240.216599
SIFT      41.000000
SSIM       0.954267
DTW       19.708237
dtype: float64


In [9]:
# Unet 1d: generated beats for the current LABEL scored against REAL.
# The file is selected by the first letter of LABEL; at most its first 16
# entries are used, in four groups of four.
# NOTE(review): torch.load unpickles the file - only load trusted files.
gen = torch.load("../Unet/"+LABEL[0]+"_1d.pt")

# Order of the metric axis produced by pip.process.
metric_names = ("UQI", "VIFP", "SCC", "SAM", "ERGAS", "RASE", "SIFT", "SSIM", "DTW")

summ = defaultdict(list)
for start in range(0, 16, 4):
    b = gen[start:start + 4]

    metrics = pip.process(REAL, b)
    # metrics[:, col] holds one value per (REAL beat, generated beat) pair;
    # flatten it and append to that metric's running list.
    for col, name in enumerate(metric_names):
        summ[name].extend(metrics[:, col].reshape(-1).tolist())

summ = pd.DataFrame(summ)
print("Metrics of Unet1d generated signals")
for stat in (summ.mean(), summ.min(), summ.max()):
    print(stat)

  gen = torch.load("../Unet/"+LABEL[0]+"_1d.pt")


Metrics of Unet1d generated signals
UQI        0.976983
VIFP       0.020747
SCC       -0.000156
SAM        0.140441
ERGAS      3.509854
RASE     310.643261
SIFT      11.468750
SSIM       0.910642
DTW       33.832653
dtype: float64
UQI        0.974240
VIFP       0.000248
SCC       -0.002709
SAM        0.125852
ERGAS      3.150434
RASE     256.767670
SIFT       1.000000
SSIM       0.899212
DTW       21.565685
dtype: float64
UQI        0.980349
VIFP       0.101061
SCC        0.005315
SAM        0.149260
ERGAS      3.728685
RASE     343.987671
SIFT      25.000000
SSIM       0.929541
DTW       46.249146
dtype: float64


In [10]:
# UnetAttn: generated beats for the current LABEL scored against REAL.
# The file is selected by the first letter of LABEL; at most its first 16
# entries are used, in four groups of four.
# NOTE(review): torch.load unpickles the file - only load trusted files.
gen = torch.load("../UnetAttn/"+LABEL[0]+"_1d.pt")

# Order of the metric axis produced by pip.process.
metric_names = ("UQI", "VIFP", "SCC", "SAM", "ERGAS", "RASE", "SIFT", "SSIM", "DTW")

summ = defaultdict(list)
for start in range(0, 16, 4):
    b = gen[start:start + 4]

    metrics = pip.process(REAL, b)
    # metrics[:, col] holds one value per (REAL beat, generated beat) pair;
    # flatten it and append to that metric's running list.
    for col, name in enumerate(metric_names):
        summ[name].extend(metrics[:, col].reshape(-1).tolist())

summ = pd.DataFrame(summ)
print("Metrics of Unet1d Attn generated signals")
for stat in (summ.mean(), summ.min(), summ.max()):
    print(stat)

  gen = torch.load("../UnetAttn/"+LABEL[0]+"_1d.pt")


Metrics of Unet1d Attn generated signals
UQI        0.983970
VIFP       0.014356
SCC       -0.000001
SAM        0.123602
ERGAS      3.090588
RASE     261.918316
SIFT      16.406250
SSIM       0.915315
DTW       32.251479
dtype: float64
UQI        0.982515
VIFP       0.002352
SCC       -0.003761
SAM        0.110693
ERGAS      2.768909
RASE     217.692535
SIFT       4.000000
SSIM       0.904588
DTW       15.138026
dtype: float64
UQI        0.986733
VIFP       0.077286
SCC        0.003091
SAM        0.129403
ERGAS      3.235091
RASE     286.168335
SIFT      29.000000
SSIM       0.934104
DTW       42.551773
dtype: float64
