In [1]:
import os
import tarfile
import urllib.request
import librosa
import numpy as np
import pickle
import HDcompute as hd
import torchaudio
import torch

In [None]:
import os
import tarfile
import urllib.request
import librosa
import numpy as np
import pickle

# URL of the dataset
url = "http://download.tensorflow.org/data/speech_commands_v0.02.tar.gz"
output_path = "speech_commands_v0.02.tar.gz"
extract_dir = "speech_commands"


# Define the 10 core words to include
CORE_WORDS = {'yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go'}

# Download the dataset
if not os.path.exists(output_path):
    print("Downloading dataset...")
    urllib.request.urlretrieve(url, output_path)
else:
    print("Dataset already downloaded.")

# Extract the dataset
if not os.path.exists(extract_dir):
    print("Extracting dataset...")
    with tarfile.open(output_path, "r:gz") as tar:
        tar.extractall(path=extract_dir)
else:
    print("Dataset already extracted.")

# Parameters
DATA_DIR = extract_dir  # Path to the dataset
DATA_DIR = os.path.abspath(extract_dir)  # Convert to absolute path

TARGET_SR = 16000  # Sampling rate
FIXED_LENGTH = TARGET_SR  # 1 second of audio at 16kHz

# Function to process a single audio file
def process_audio(file_path, target_sr=TARGET_SR, fixed_length=FIXED_LENGTH):
    # Load audio file
    audio, sr = librosa.load(file_path, sr=target_sr)

        # Normalize amplitude
    if np.max(np.abs(audio)) > 0:  # Avoid division by zero
        audio = audio / np.max(np.abs(audio))

    # Adjust length (truncate or pad with zeros)
    if len(audio) > fixed_length:
        audio = audio[:fixed_length]
    elif len(audio) < fixed_length:
        audio = np.pad(audio, (0, fixed_length - len(audio)), mode='constant')
    return audio

# Load all signals and labels for the 10 core words
def load_signals(data_dir=DATA_DIR, core_words=CORE_WORDS):
    signals = []
    labels = []
    label_map = {label: idx for idx, label in enumerate(sorted(core_words))}
    for label in core_words:
        label_dir = os.path.join(data_dir, label)
        if os.path.isdir(label_dir):
            for file in os.listdir(label_dir):
                if file.endswith(".wav"):
                    file_path = os.path.join(label_dir, file)
                    signal = process_audio(file_path)
                    signals.append(signal)
                    labels.append(label_map[label])
    return np.array(signals), np.array(labels), label_map


# Load the dataset
signals, labels, label_map = load_signals()

# Print shape of data
print(f"Signals shape: {signals.shape}")  # (num_samples, fixed_length)
print(f"Labels shape: {labels.shape}")    # (num_samples,)
print(f"Label map: {label_map}")

# Save signals, labels, and label_map
with open("speech_commands_signals_core_words.pkl", "wb") as f:
    pickle.dump((signals, labels, label_map), f)

In [2]:
import pickle
# Load it back later
with open("speech_commands_signals_core_words.pkl", "rb") as f:
    signals, labels, label_map = pickle.load(f)


In [None]:
def bandpass_filter(signal, low=300, high=8000, sr=16000):
    """
    Apply a band-pass filter to focus on speech frequencies.
    Args:
        signal (torch.Tensor): Audio signal of shape (num_samples, signal_length).
        low (int): Lower cutoff frequency in Hz.
        high (int): Upper cutoff frequency in Hz.
        sr (int): Sampling rate in Hz.
    Returns:
        torch.Tensor: Filtered audio signal.
    """
    from torchaudio.functional import lowpass_biquad, highpass_biquad
    signal = highpass_biquad(signal, sr, low)
    signal = lowpass_biquad(signal, sr, high)
    return signal
signals = torch.tensor(signals, dtype=torch.float32)  # Shape: (num_samples, signal_length)
signals = bandpass_filter(signals)  

In [None]:
import numpy as np
# Print shape of data
print(f"Signals shape: {signals.shape}")  # (num_samples, fixed_length)
print(f"Labels shape: {labels.shape}")    # (num_samples,)
print(f"Label map: {label_map}")
print(np.unique(labels))


In [None]:
import torch
labels = torch.tensor(labels, dtype= torch.int32)
print(torch.unique(labels))
print(signals.shape)
print(labels)

In [None]:
import torch
print(signals.shape)

signals = torch.tensor(signals, dtype=torch.float32, device="cpu")

In [None]:
import torchaudio
import torch
import torch.nn.functional as F

# Parameters for MFCC
sr = 16000  # Sampling rate
n_mfcc = 13  # Number of MFCC coefficients
n_fft = 1300  # FFT window size
hop_length = 1100  # Hop size 


# Torchaudio MFCC transform
mfcc_transform = torchaudio.transforms.MFCC(
    sample_rate=sr,
    n_mfcc=n_mfcc,
    melkwargs={"n_fft": n_fft, "hop_length": hop_length, "n_mels": 40}
)




def compute_mfccs_torchaudio(signals, mfcc_transform):
    """
    Compute MFCCs, delta, and delta-delta features for a batch of signals using torchaudio and PyTorch.

    Args:
        signals (list or ndarray): List or array of signals, each with shape (signal_length,).
        mfcc_transform (torchaudio.transforms.MFCC): MFCC transformation object.
    
    Returns:
        torch.Tensor: Tensor of shape (num_samples, time_frames, n_mfcc * 3),
                      containing MFCCs, deltas, and delta-deltas.
    """
    # Convert the list of numpy arrays to a single PyTorch tensor
    signal_tensor = torch.tensor(signals, dtype=torch.float32)  # Shape: (num_samples, signal_length)


    # Apply the MFCC transform to the batch
    mfcc = mfcc_transform(signal_tensor)  # Shape: (num_samples, n_mfcc, time_frames)
    
    # Swap axes to make the shape (num_samples, time_frames, n_mfcc)
    mfcc = mfcc.permute(0, 2, 1)  # Shape: (num_samples, time_frames, n_mfcc)

    # Compute delta features using PyTorch convolution for each MFCC channel independently
    delta_filter = torch.tensor([-1, 0, 1], dtype=torch.float32).view(1, 1, -1).to(mfcc.device)  # Filter for first-order derivative

    # Reshape for convolution: combine batch and MFCC channels
    mfcc_reshaped = mfcc.permute(0, 2, 1).reshape(-1, 1, mfcc.shape[1])  # Shape: (num_samples * n_mfcc, 1, time_frames)

    # Apply convolution for delta
    delta = F.conv1d(mfcc_reshaped, delta_filter, padding=1).reshape(mfcc.shape[0], mfcc.shape[2], -1).permute(0, 2, 1)

    # Apply convolution again for delta-delta
    delta_delta = F.conv1d(delta.reshape(-1, 1, mfcc.shape[1]), delta_filter, padding=1).reshape(mfcc.shape[0], mfcc.shape[2], -1).permute(0, 2, 1)

    # Concatenate MFCC, delta, and delta-delta features along the last dimension
    features = torch.cat([mfcc, delta, delta_delta], dim=2)  # Shape: (num_samples, time_frames, n_mfcc * 3)

    return features

# Compute MFCC, Delta, and Delta-Delta features for the dataset
print("Computing MFCCs, Delta, and Delta-Delta features for the dataset using torchaudio...")
mfcc_delta_features = compute_mfccs_torchaudio(signals, mfcc_transform)

print("Feature shape:", mfcc_delta_features.shape)  # Should be (num_samples, time_frames, n_mfcc * 3)

# Save the computed features along with labels
with open("speech_commands_mfcc_delta_features_torchaudio.pkl", "wb") as f:
    pickle.dump((mfcc_delta_features, labels, label_map), f)



In [1]:
#with delta features
import pickle
# Load it back
with open("speech_commands_mfcc_delta_features_torchaudio.pkl", "rb") as f:
    mfcc_features, labels, label_map = pickle.load(f)

In [2]:
print(f"MFCC features computed and saved.")
print(f"MFCC Features shape: {mfcc_features.shape}")  # Shape: (num_samples, n_mfcc)
print(f"Labels shape: {labels.shape}")
print(label_map)


MFCC features computed and saved.
MFCC Features shape: torch.Size([38546, 15, 39])
Labels shape: (38546,)
{'down': 0, 'go': 1, 'left': 2, 'no': 3, 'off': 4, 'on': 5, 'right': 6, 'stop': 7, 'up': 8, 'yes': 9}


In [2]:
print(mfcc_features[11235][9][8])
print(labels[0])



tensor(6.2697)
7


In [3]:
import torch
from collections import OrderedDict
from torch.utils.data import random_split

# Create a subset of 5000 signals
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(device)

# subset_size = 5000
# indices = torch.randperm(len(labels))[:subset_size]

# # # Sort the indices
# sorted_indices = torch.sort(indices).values

# labels = torch.tensor(labels, dtype=torch.int32).to(device)
# mfcc_features = torch.tensor(mfcc_features, dtype=torch.float32).to(device)

# subset_mfcc_features = mfcc_features[sorted_indices]
# subset_labels = labels[sorted_indices]

# print(subset_mfcc_features.shape)


# # Define the train-test split ratio for subset
# train_ratio = 0.80
# train_size = int(train_ratio * subset_size)
# test_size = subset_size - train_size

# train_dataset, test_dataset = random_split(list(zip(subset_mfcc_features, subset_labels)), [train_size, test_size])


size = labels.shape[0]
# Define the train-test split ratio for full length
train_ratio = 0.95
train_size = int(train_ratio * size)
test_size = size - train_size


# Create the train-test split
train_dataset, test_dataset = random_split(list(zip(mfcc_features, labels)), [train_size, test_size])

# Separate the features and labels for train and test sets
train_mfcc_features, train_labels = zip(*train_dataset)
test_mfcc_features, test_labels = zip(*test_dataset)

# Convert to tensors
train_mfcc_features = torch.stack(train_mfcc_features).to(device)
train_labels = torch.tensor(train_labels, dtype=torch.int32).to(device)
test_mfcc_features = torch.stack(test_mfcc_features).to(device)
test_labels = torch.tensor(test_labels, dtype=torch.int32).to(device)

print(train_mfcc_features.shape, train_labels.shape)
print(test_mfcc_features.shape, test_labels.shape)


mps
torch.Size([36618, 15, 39]) torch.Size([36618])
torch.Size([1928, 15, 39]) torch.Size([1928])


In [4]:
import os
import time
import torchaudio
import torch
import pickle
import HDcompute as hd

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(device)

#mfcc_features = mfcc_features.to(device)

#normalisation takes a bunch of time

vector1 = torch.tensor(hd.make_rand_vector(seed=2, size = 10000), dtype = torch.int8).to(device)
# vector2 = torch.tensor(hd.make_rand_vector(seed=8, size = 40000), dtype = torch.int8).to(device)
vector2 = vector1 * -1
#vector1 = vector1 * 0
print(vector1)
print(vector2)
labels = torch.tensor(labels, dtype=torch.int32, device=device)
mfcc_features = torch.tensor(mfcc_features, dtype=torch.float32, device=device)


start = time.time()
# model = hd.make_model_from_mfccs(mfcc_features, labels, vector1=vector1, vector2=vector2,
#                                     batch_size=10, device =device,
#                                     seed = 42, n_gram = 2, majority_vote=True, separate_signals= True,
#                                     alpha=2)
model = hd.make_model_from_mfccs(train_mfcc_features, train_labels, vector1=vector1, vector2=vector2, 
                                  batch_size=4, device = device,
                                  seed = 42, n_gram = 2, majority_vote=True, separate_signals= True, 
                                  alpha=1, single_window=False, weighing = True)
end = time.time() #111.9
print(end-start)

mps
tensor([ 1, -1, -1,  ..., -1, -1,  1], device='mps:0', dtype=torch.int8)
tensor([-1,  1,  1,  ...,  1,  1, -1], device='mps:0', dtype=torch.int8)


  mfcc_features = torch.tensor(mfcc_features, dtype=torch.float32, device=device)
  output, inverse_indices, counts = torch._unique2(
  model[idx] = torch.sum(torch_matrix[mask], dim=0)


109.48038911819458


In [None]:
print(model[0]) 
print(model[0].shape) 
#model = (hd.majority_vote_torch(model[0]),) + model[1:]# thats bad
#print(model[0])

In [5]:
#predict
# model = (model[0].to("mps"),) + model[1:]
# test_mfcc_features = test_mfcc_features.to("mps")
preds = hd.predict(model, test_mfcc_features, majority_vote=False, weighing = True)

#preds = hd.predict_vote(model, test_mfcc_features).to("mps")


In [None]:
#print(torch.count_nonzero(preds == labels))
print("accuracy: ",round(torch.count_nonzero(preds == test_labels).item()*100/test_labels.shape[0], 4) , "%")

# print(preds)
# print(test_labels)

#1836 for 4-gram, within sample
#1926 for 1 gram superposition, within sample

#60 % accuracy with superposition, and random seed +1 in the for loop, and n-gram = 2
# accuracy 68-69 % with larger windows up to 2000 and no majority vote (log transform helps a bunch)
#72% with superposition, log transformation, seperate signals, n_gram = 2.
#76 with vector length = 20.000, 
# 74-76% with 1600 window length, 1200 skip size 
# 78 % with weighing

# alpha ≈ 1-3
print(torch.count_nonzero(train_labels == 9)) 

#best model is with no majority vote and seperate signals



In [None]:
import numpy as np

# Example numpy arrays (replace with your actual data)
preds_np = np.asanyarray(preds.to("cpu"))  # Replace with your predictions array
test_labels_np = np.asanyarray(test_labels.to("cpu"))  # Replace with your test labels array

# Combine the arrays into a single 2D array (columns)
data = np.column_stack((preds_np, test_labels_np))

# Save to CSV
np.savetxt("preds_and_test_labels.csv", data, delimiter=",", header="preds,test_labels", comments="", fmt="%d")

print("CSV file saved successfully.")

In [None]:
import numpy as np

# Read the CSV file back into a NumPy array
data = np.loadtxt("preds_and_test_labels.csv", delimiter=",", skiprows=1, dtype=np.int32)

# Separate the columns
preds_np, test_labels_np = data[:, 0], data[:, 1]

print(np.count_nonzero(preds_np==test_labels_np)/preds_np.shape[0])

print("",preds_np[:10],"\n",test_labels_np[:10])  # Print first 5 rows


In [None]:
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score
import numpy as np

# Generate confusion matrix
conf_matrix = confusion_matrix(test_labels_np, preds_np)

# Calculate F1 score
f1 = f1_score(test_labels_np, preds_np, average='weighted')  # Use 'micro', 'macro', or 'weighted' depending on needs

accuracy = accuracy_score(test_labels_np, preds_np)

print(f"F1 Score (weighted): {f1:.4f}")
print("Accuracy:", round(accuracy, 3))


import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay

# Visualize confusion matrix
disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix)
disp.plot(cmap='viridis')  # Adjust the colormap if desired
plt.show()

# Uncomment to display the confusion matrix values
# print("Confusion Matrix:")
# print(conf_matrix)


In [None]:
accuracies = []
for i in range(1, 10):
    print(i)
    model = hd.make_model_from_mfccs(train_mfcc_features, train_labels, vector1=vector1, vector2=vector2, batch_size=10, device =device, seed = 7, n_gram = i, majority_vote=False, separate_signals= True)
    preds = hd.predict(model, test_mfcc_features)
    accuracies.append(torch.count_nonzero(preds == test_labels))


In [None]:
#multithread evaluation
import concurrent.futures
import torch

device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")
print(device)

def train_and_evaluate(n_gram):
    model = hd.make_model_from_mfccs(
        train_mfcc_features, train_labels,
        vector1=vector1, vector2=vector2, batch_size=10,
        device=device, seed=7, n_gram=n_gram,
        majority_vote=False, separate_signals=True
    )
    preds = hd.predict(model, test_mfcc_features)
    accuracy = torch.count_nonzero(preds == test_labels).item()
    return accuracy

# Create a pool of threads to parallelize training and evaluation
accuracies = []
with concurrent.futures.ThreadPoolExecutor() as executor:
    n_gram_range = range(1, 10)
    # Map each n_gram value to the train_and_evaluate function
    results = executor.map(train_and_evaluate, n_gram_range)

    # Collect results
    accuracies = list(results)

print("Accuracies:", accuracies)

In [None]:
print(accuracies)

In [None]:
from collections import OrderedDict
import math
print(model[0].shape)
print(labels.shape)
unique_labels = torch.tensor(list(OrderedDict.fromkeys(labels.tolist())), dtype=labels.dtype, device=labels.device)  
print(unique_labels)
print(labels[1])
print(torch.count_nonzero(model[0][0]== 307881))
#2462
n = torch.count_nonzero(labels == 2)
print(n)
print(81*3801)

print("LOOL",torch.count_nonzero(model[0][0]== 307881))


print(model[0].shape)

print(torch.count_nonzero(model[0][1] == 0))


print(torch.count_nonzero(vector1 == 1))
print(torch.count_nonzero(vector2 == 1))


tots = torch.tensor([-3000, 20, 3, -2, -5, -6, 3, 3, 3])

tots = hd.majority_vote_torch(tots)
print(tots)


Should give torch.Size([n_classes, 10000])

In [None]:
# Sanity check for encoding vectors
batch_size = 10
num_windows = 5
num_mfccs = 13
vector_length = 5

# Example tensors
mfcc_encoded_vectors = torch.rand(batch_size, num_windows, num_mfccs, vector_length)

print(mfcc_encoded_vectors.shape)

rand_mfcc_matrix = torch.randint(0, 2, (num_mfccs, vector_length), dtype=torch.float32) * 2 - 1
print(rand_mfcc_matrix.shape)

print(mfcc_encoded_vectors[1][1][1])
print(rand_mfcc_matrix[1])

sum = 0

for i in range(num_mfccs):
    sum += mfcc_encoded_vectors[1][1][i]*rand_mfcc_matrix[i]

print("sum:", sum)

# Call the function
result = hd.bind_and_add_mfcc_vectors(mfcc_encoded_vectors, rand_mfcc_matrix)

print("result:", result[1][1])


print(result.shape)  # Should print (10, 5, 128)

In [None]:
#cosine similarity sanity check
import torch
import HDcompute as hd
from collections import OrderedDict

# Example usage
M = torch.tensor([[1, 2, 3], [4, 5, 6]], dtype=torch.float32).to("mps")
X = torch.tensor([[0.4, 0.5, 0.6], [1, 2, 3], [1, 1, 1]], dtype=torch.float32).to("mps") # Signal vectors

print(M.device)
print(X.device)
#12


cos_sim_matrix = hd.cosine_similarity_matrix_torch(M, X)


cos_sim_matrix = cos_sim_matrix.to("cpu") #but whyyyy??? the gpu returns the wrong argmax for some reason.

print("cos sim matrix\n",cos_sim_matrix) #20

argmax = torch.argmax(cos_sim_matrix, dim = 0)

print(argmax)



device = M.device
labels = labels.clone().to(device=device, dtype=torch.int32)
unique_labels = torch.tensor(list(OrderedDict.fromkeys(labels.tolist())), dtype=labels.dtype, device=labels.device) #30


print(unique_labels[argmax])



In [None]:
import torch
import HDcompute as hd

device = "mps"


def encode_mfcc_batch_with_percentage_flip(
    mfccs_batch, num_cols, num_mfccs, vector_length, 
    min_vector, mean_vector, max_vector, device, seed=None
):
    if seed is not None:
        torch.manual_seed(seed)

    # Calculate mean for each MFCC across time windows for each signal
    mean_vals = torch.mean(mfccs_batch, dim=1, keepdim=True)  # Shape: (signals, 1, mfccs)

    # Calculate range (max - mean or mean - min) and avoid division by zero
    range_vals = torch.amax(mfccs_batch, dim=1, keepdim=True) - torch.amin(mfccs_batch, dim=1, keepdim=True) + 1e-6

    # Calculate percentage deviation from the mean
    percentages = (mfccs_batch - mean_vals) / range_vals  # Shape: (signals, windows, mfccs)
    percentages = percentages.clamp(min=-1, max=1)  # Clamp to range [-1, 1]

    # Expand percentage tensor to match hypervector dimensions
    percentages_expanded = (percentages * 2).clamp(min=-1, max=1).unsqueeze(-1)  # Amplify influence


    # Expand vectors to match batch dimensions
    mean_vector_expanded = mean_vector.unsqueeze(0).unsqueeze(0)  # Shape: (1, 1, mfccs, vector_length)
    min_vector_expanded = min_vector.unsqueeze(0).unsqueeze(0)
    max_vector_expanded = max_vector.unsqueeze(0).unsqueeze(0)

    # Generate random values for flipping
    random_values = torch.rand(mfccs_batch.shape[0], num_cols, num_mfccs, vector_length, device=device)

    # Calculate masks for flipping towards min and max
    flip_towards_min = percentages_expanded < 0  # True if deviation is below the mean
    flip_towards_max = percentages_expanded > 0  # True if deviation is above the mean

    # Compute absolute percentages for blending
    abs_percentages = percentages_expanded.abs()

    # Generate the final flipped vectors
    flipped_towards_min = torch.where(
        random_values < abs_percentages,
        min_vector_expanded,
        mean_vector_expanded
    )
    flipped_towards_max = torch.where(
        random_values < abs_percentages,
        max_vector_expanded,
        mean_vector_expanded
    )

    # Combine based on masks
    flipped_vectors = torch.where(
        flip_towards_min,
        flipped_towards_min,
        flipped_towards_max
    )

    return flipped_vectors



# Example dimensions
signals = 2  # Number of signals
windows = 81  # Number of time windows
mfccs = 39  # Number of MFCC coefficients
vector_length = 10000  # Hypervector dimensionality

# Random input and reference vectors
mfccs_batch = torch.rand(signals, windows, mfccs, device=device)
min_vector = torch.tensor(hd.make_rand_vector(seed=2, size = 10000), dtype = torch.int8).to(device)
mean_vector = torch.tensor(hd.make_rand_vector(seed=3, size = 10000), dtype = torch.int8).to(device)
max_vector = torch.tensor(hd.make_rand_vector(seed=4, size = 10000), dtype = torch.int8).to(device)

# Encode MFCCs
encoded_hypervectors = encode_mfcc_batch_with_percentage_flip(
    mfccs_batch, num_cols=windows, num_mfccs=mfccs, vector_length=vector_length,
    min_vector=min_vector, mean_vector=mean_vector, max_vector=max_vector,
    device=device
)


print(encoded_hypervectors.shape)  # Should be (signals, windows, mfccs, vector_length)

def hamming_distance(tensor1, tensor2):
    num_matches = torch.sum(tensor1 == tensor2).item()
    #total_elements = tensor1.numel()
    #fraction_matches = num_matches / total_elements
    return num_matches


print("distances:")
print("max",hamming_distance(encoded_hypervectors[0][0][0], max_vector))
print("min",hamming_distance(encoded_hypervectors[0][0][0], min_vector))
print("mean",hamming_distance(encoded_hypervectors[0][0][0], mean_vector))

print(mfccs_batch[0][0][0])
print(torch.max(mfccs_batch[0][:][0]))
print(torch.min(mfccs_batch[0][:][0]))
print(torch.mean(mfccs_batch[0][:][0]))



In [None]:
import torch
import HDcompute as hd


# Create a simple test signal tensor (5 windows, vector length 4)
test_signal = torch.tensor([[
    [1, 2, 3, 4],  # Window 1
    [5, 6, 7, 8],  # Window 2
    [9, 10, 11, 12],  # Window 3
    [13, 14, 15, 16],  # Window 4
    [17, 18, 19, 20]   # Window 5
]], dtype=torch.float)

print(hd.n_gram_encode_tensor_torch(test_signal, 3)) #4619


In [None]:
# Create a simple test signal tensor (5 windows, vector length 4)
test_signal = torch.tensor([[
    [1, 2, 3, 4],  # Window 1
    [5, 6, 7, 8],  # Window 2
    [9, 10, 11, 12],  # Window 3
    [13, 14, 15, 16],  # Window 4
    [17, 18, 19, 20]   # Window 5
]], dtype=torch.float)

#print("test signal\n",test_signal)

test_signal_copy = test_signal.clone()

test_signal_permutated = torch.roll(test_signal, shifts=1, dims=2) # permutation
test_signal_permutated_2 = torch.roll(test_signal, shifts=2, dims=2) # permutation 
test_signal_permutated_3 = torch.roll(test_signal, shifts=3, dims=2) # permutation 

# print(test_signal_permutated_3)
# print(test_signal_permutated_2)
# print(test_signal_permutated)
# print(test_signal)

print(test_signal_copy)

test_signal_permutated[:, 1:4] *= test_signal_copy[:, 2:5]

test_signal_permutated_2[:, 0:3] *= test_signal_permutated[:, 1:4] # we never change test_signal permutated_2[:, 4]


test_signal[:, 2:5] *= test_signal_permutated[:, 1:4]

test_signal[:, 1:4] *= test_signal_permutated_2[:, 0:3]



print(test_signal_permutated_2)

In [None]:

def make_model_from_torch_matrix(torch_matrix, target):
    # Determine the number of unique labels
    unique_labels = torch.tensor(list(OrderedDict.fromkeys(target.tolist())), dtype=target.dtype, device=target.device)
    n_target = len(unique_labels)
    model = torch.zeros((n_target, torch_matrix.shape[1]), dtype=torch.float32).to(torch_matrix.device)

    row = 0

    for i, Class in enumerate(unique_labels):
        print(Class)
        while row < torch_matrix.shape[0] and Class == target[row]: #until we are through the class, start adding vectors in the same class
            model[i] += torch_matrix[row]
            row += 1 
        print(row)
    return model, unique_labels



import torch
from collections import OrderedDict
from HDcompute import make_model_from_torch_matrix

# Example data
torch_matrix = torch.tensor([[1, 2], [3, 4], [5, 6], [7, 8], [9, 10], [11, 12], [13, 14], [15, 16], [17, 18], [19, 20]])
target = torch.tensor([2, 2, 2, 2, 3, 3, 3, 3, 3, 5, 5, 5, 5, 0, 0, 0, 0, 3, 3, 3])

# Create the model
model, unique_labels = make_model_from_torch_matrix(torch_matrix, target)

# Print the model and unique labels
print("Model:", model)
print("Unique Labels:", unique_labels)

# Example usage of the model
print(model[2])
# model = (hd.majority_vote_torch(model[0]),) + model[1:]
# print(model[0])

In [None]:
import torch
import numpy as np
import time

# Original function
def normalise_mfccs_original(mfccs, separate_signals = False):
    copy_mfccs = mfccs.clone()
    if separate_signals == False:
        #find the min and max for each mfcc group eg. 26 values
        for i in range(mfccs.shape[2]):     #for each mfcc coeficient
            max = torch.max(mfccs[:, :, i]) #get the max for that coefficient
            min = torch.min(mfccs[:, :, i]) #get the max for that coefficient
            diff = max-min                  #calculate the difference
            copy_mfccs[:, :, i] =  (copy_mfccs[:, :, i]-min)/diff

    elif separate_signals == True:
        for j in range (mfccs.shape[0]): #for each signal
            for i in range(mfccs.shape[2]):     #for each mfcc coeficient
                max = torch.max(mfccs[j, :, i]) #get the max for that coefficient
                min = torch.min(mfccs[j, :, i]) #get the max for that coefficient
                diff = max-min                  #calculate the difference
                copy_mfccs[j, :, i] =  (copy_mfccs[j, :, i]-min)/diff

    return copy_mfccs

# Refactored function
def normalise_mfccs_refactored(mfccs, separate_signals=False):
    copy_mfccs = mfccs.clone()
    if not separate_signals:
        # Use amin and amax for multidimensional min/max
        min_vals = torch.amin(mfccs, dim=(0, 1), keepdim=True)
        max_vals = torch.amax(mfccs, dim=(0, 1), keepdim=True)
        diff = max_vals - min_vals
        copy_mfccs = (copy_mfccs - min_vals) / diff
    else:
        # Use amin and amax for reduction along specific dimensions
        min_vals = torch.amin(mfccs, dim=1, keepdim=True)
        max_vals = torch.amax(mfccs, dim=1, keepdim=True)
        diff = max_vals - min_vals
        copy_mfccs = (copy_mfccs - min_vals) / diff

    return copy_mfccs


# Test
def test_normalise_mfccs():
    # Create random test data
    torch.manual_seed(42)
    signals = 4000  # Number of signals
    frames = 81   # Number of frames per signal
    coeffs = 39   # Number of MFCC coefficients
    mfccs = torch.rand((signals, frames, coeffs))

    # Test with separate_signals=False
    original_output = normalise_mfccs_original(mfccs, separate_signals=False)
    refactored_output = normalise_mfccs_refactored(mfccs, separate_signals=False)
    assert torch.allclose(original_output, refactored_output), \
        "Outputs do not match for separate_signals=False!"

    # Test with separate_signals=True
    start = time.time()
    original_output = normalise_mfccs_original(mfccs, separate_signals=True)
    end = time.time()
    print(end-start)

    start = time.time()
    refactored_output = normalise_mfccs_refactored(mfccs, separate_signals=True)
    end = time.time()
    print(end-start)

    assert torch.allclose(original_output, refactored_output), \
        "Outputs do not match for separate_signals=True!"

    print("All tests passed!")

# Run the test
test_normalise_mfccs()
