In [2]:
import torch


# Example usage
N, T, B = 2, 5, 16
in_features, out_features = B//4, 10

# create input tensor of shape [NxTxB/4]
x = torch.randn(N, T, in_features)

x = x.view(-1, x.shape[-1])


In [2]:
import torch

outputs = [{'test_frame_recall': torch.tensor(0.3688), 'test_doa_error': torch.tensor(float('nan'))}]
num_subsets = len(outputs)

for subset_idx in range(num_subsets):
    frame_recall = torch.stack([x['test_frame_recall'] for x in outputs[subset_idx]])
    doa_error = torch.stack([x['test_doa_error'] for x in outputs[subset_idx]])

TypeError: string indices must be integers

In [6]:
torch.stack([x['test_frame_recall'] for x in outputs])

tensor([0.3688])

In [4]:

# apply linear layer to each time step
x = torch.nn.Linear(in_features, out_features)(x)


In [5]:
x.shape

torch.Size([10, 10])

In [6]:

# reshape output tensor to [N x T x S]
x = x.view(*x.shape[:-1], -1)

In [7]:
x.shape

torch.Size([10, 10])

In [8]:
import torch
import torch.nn as nn
import torch.nn.utils.rnn as rnn_utils

class TimeDistributed(nn.Module):
    def __init__(self, in_features, out_features):
        super(TimeDistributed, self).__init__()
        self.linear = nn.Linear(in_features, out_features)

    def forward(self, x):
        # reshape input tensor to [N*T x B/4]
        x = x.view(-1, x.shape[-1])

        # apply linear layer to each time step
        x = self.linear(x)

        # reshape output tensor to [N x T x S]
        x = x.view(*x.shape[:-1], -1)

        return x


# Example usage
N, T, B = 2, 5, 16
in_features, out_features = B//4, 10

# create input tensor of shape [NxTxB/4]
x = torch.randn(N, T, in_features)

# pack input tensor and apply time-distributed linear layer
packed_x = rnn_utils.pack_padded_sequence(x, lengths=[T]*N, batch_first=True, enforce_sorted=False)
linear_layer = TimeDistributed(in_features, out_features)
y, _ = rnn_utils.pad_packed_sequence(linear_layer(packed_x)[0], batch_first=True)

# output tensor of shape [N x T x S]
print(y.shape)  # should be [2, 5, 10]


AttributeError: 'PackedSequence' object has no attribute 'view'

In [2]:
import torch
import torch.nn as nn
import torch.nn.utils.rnn as rnn_utils

class TimeDistributedFC(nn.Module):
    """
    Class for applying time distributed fully connected layers with input tensors 
    of shape  [N, T, in_features] where N is the batch_size, T is the number of time step,
    and in_features is the number of input features at each time step. 
    Output tensors will have shape [N, T, out_features], where out_features is the number 
    of output features at each time step.
    """
    def __init__(self, in_features, out_features):
        super(TimeDistributedFC, self).__init__()
        self.linear = nn.Linear(in_features, out_features)

    def forward(self, x):
        
        N,T = x.shape[0],x.shape[1]
        
        # reshape input tensor to [N*T x B/4]
        x = x.view(-1, x.shape[-1])

        # # apply linear layer to each time step
        # x = self.linear(x)

        # reshape output tensor to [N x T x S]
        x = x.view(N, T,-1)

        return x

In [3]:
# Example usage
N, T, B = 2, 5, 16
in_features, out_features = B//4, 10

# create input tensor of shape [NxTxB/4]
x = torch.randn(N, T, in_features)

# # pack input tensor and apply time-distributed linear layer
# packed_x = rnn_utils.pack_padded_sequence(x, lengths=[T]*N, batch_first=True, enforce_sorted=False)
linear_layer = TimeDistributedFC(in_features, out_features)
# y, _ = rnn_utils.pad_packed_sequence(linear_layer(packed_x)[0], batch_first=True)
y = linear_layer(x)

In [9]:
x[i,j,k]

tensor([ 0.3244,  0.4101, -1.5298,  0.9352])

In [4]:
import torch

N = 2
T = 20
directions_of_arrival = []
num_hypothesis = 10
        
for i in range(num_hypothesis) :
    directions_of_arrival.append(torch.randn(size = (N,T,2))) # Size [NxTx2]
    
hyp_stacked = torch.stack(directions_of_arrival, dim=-2)

In [17]:
N = 3
T = 20
directions_of_arrival = []
num_hypothesis = 10

directions_of_arrival = torch.randn(size = (N,T,2*num_hypothesis)) # Size [NxTx(2*num_hypothesis)]
hyps_splitted = torch.split(directions_of_arrival, [2 for i in range(num_hypothesis)], 2) #num_hypothesis-uples of elements of shape [N,T,2]
hyps_stacked = torch.stack([h for h in hyps_splitted], dim=2) #Tuples of elements of shape [N,T,num_hypothesis,2]

In [18]:
len(hyps_splitted)
hyps_splitted[0].shape
hyps_stacked.shape


torch.Size([3, 20, 10, 2])

In [2]:
import torch

source_activity_target_t = torch.tensor([[1, 0, 1], [0, 1, 0]])
direction_of_arrival_target_t = torch.tensor(
    [[[1, 2], [3, 4], [5, 6]], [[7, 8], [9, 10], [11, 12]]]
)

direction_of_arrival_target_t[source_activity_target_t == 0, :] = 1000

In [3]:
direction_of_arrival_target_t

tensor([[[   1,    2],
         [1000, 1000],
         [   5,    6]],

        [[1000, 1000],
         [   9,   10],
         [1000, 1000]]])

In [7]:
outputs = [{'test_frame_recall': torch.tensor(0.9925), 'test_doa_error': torch.tensor(16.2175)}, 
 {'test_frame_recall': torch.tensor(0.9950), 'test_doa_error': torch.tensor(10.0907)}, 
 {'test_frame_recall': torch.tensor(0.9950), 'test_doa_error': torch.tensor(14.7058)}, 
 {'test_frame_recall': torch.tensor(0.9862), 'test_doa_error': torch.tensor(8.9629)}, 
 {'test_frame_recall': torch.tensor(0.9613), 'test_doa_error': torch.tensor(16.7399)}, 
 {'test_frame_recall': torch.tensor(0.9975), 'test_doa_error': torch.tensor(16.7870)}, 
 {'test_frame_recall': torch.tensor(0.9775), 'test_doa_error': torch.tensor(10.8708)}, 
 {'test_frame_recall': torch.tensor(0.9962), 'test_doa_error': torch.tensor(12.1684)}, 
 {'test_frame_recall': torch.tensor(0.9912), 'test_doa_error': torch.tensor(14.0261)}, 
 {'test_frame_recall': torch.tensor(0.9900), 'test_doa_error': torch.tensor(12.9945)}]

In [12]:
type(outputs[0])

dict

In [9]:
results = {
            'model': [], 'dataset': [], 'fold_idx': [], 'subset_idx': [], 'frame_recall': [], 'doa_error': []
        }
frame_recall = torch.stack([x['test_frame_recall'] for x in outputs]).detach().cpu().numpy()
doa_error = torch.stack([x['test_doa_error'] for x in outputs]).detach().cpu().numpy()

num_sequences = len(frame_recall)

for seq_idx in range(num_sequences):
    results['model'].append('name')
    results['dataset'].append('data')
    results['fold_idx'].append(0)
    results['subset_idx'].append(0)
    results['frame_recall'].append(frame_recall[seq_idx])
    results['doa_error'].append(doa_error[seq_idx])

In [11]:
results['frame_recall']

[0.9925, 0.995, 0.995, 0.9862, 0.9613, 0.9975, 0.9775, 0.9962, 0.9912, 0.99]

In [14]:
import time
import warnings
from importlib.util import find_spec
from pathlib import Path
from typing import Any, Callable, Dict, List
from pytorch_lightning import Callback
from pytorch_lightning.loggers import Logger
from pytorch_lightning.utilities import rank_zero_only
from itertools import permutations
import numpy as np
import torch
from torch.nn.modules.loss import _Loss
import torch.nn.functional as F
from typing import Tuple

In [18]:
def compute_spherical_distance(y_pred: torch.Tensor,
                                   y_true: torch.Tensor) -> torch.Tensor:
        if (y_pred.shape[-1] != 2) or (y_true.shape[-1] != 2):
            assert RuntimeError('Input tensors require a dimension of two.')

        sine_term = torch.sin(y_pred[:, 0]) * torch.sin(y_true[:, 0])
        cosine_term = torch.cos(y_pred[:, 0]) * torch.cos(y_true[:, 0]) * torch.cos(y_true[:, 1] - y_pred[:, 1])

        return torch.acos(F.hardtanh(sine_term + cosine_term, min_val=-1, max_val=1))
    
batch = 2
num_hyps = 10
gts = torch.randn(batch,num_hyps,2)
hyps_stacked_t=torch.randn(batch,num_hyps,2)

In [19]:
hyps_stacked_t_m = hyps_stacked_t.view(-1,2)
gts_m = gts.view(-1,2)
diff = compute_spherical_distance(hyps_stacked_t_m,gts_m)
diff = diff.view(batch,num_hyps)
spatial_epes = diff.unsqueeze(2).unsqueeze(3).unsqueeze(4)

In [20]:
# V2
# hyps_stacked_t of shape (batch,num_hyps,2), gts of shape (batch, num_hyps, 2)
# Compute the diff tensor using tensor operations
sine_term = torch.sin(hyps_stacked_t[:, :, 0]) * torch.sin(gts[:, :, 0])  # (batch, num_hyps)
cosine_term = torch.cos(hyps_stacked_t[:, :, 0]) * torch.cos(gts[:, :, 0]) * torch.cos(gts[:, :, 1] - hyps_stacked_t[:, :, 1])  # (batch, num_hyps)
diff2 = torch.acos(torch.clamp(sine_term + cosine_term, min=-1, max=1))  # (batch, num_hyps)
spatial_epes2 = diff2.unsqueeze(2).unsqueeze(3).unsqueeze(4)  # (batch, num_hyps, 1, 1, 1) 

In [21]:
spatial_epes == spatial_epes2

tensor([[[[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]]],



        [[[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]],


         [[[True]]]]])