# Load Data

In [1]:
import pickle
import os
from tqdm import tqdm
import numpy as np

directory = 'data/train'

length_list = []
valence_values=[]

recordings = []

for filename in tqdm(os.listdir(directory)):
    if filename.endswith('.pkl'):
        file_path = os.path.join(directory, filename)
        with open(file_path, 'rb') as file:
            data = pickle.load(file)
            if data['valence'] != 2.333 and len(data['audio_data']) < 91000:
                length_list.append(len(data['audio_data']))
                valence_values.append(data['valence'])
                recordings.append(data['audio_data'])

valence_values = np.array(valence_values)


100%|██████████| 10557/10557 [01:03<00:00, 165.65it/s]


In [2]:
valennce_dist ={}
for i in valence_values:
    if i in valennce_dist:
        valennce_dist[i] += 1
    else:
        valennce_dist[i] = 1


valennce_dist


{1.25: 140,
 3.5: 1173,
 3.25: 1273,
 3.75: 972,
 2.0: 523,
 2.75: 1014,
 2.25: 679,
 3.0: 1045,
 4.25: 606,
 1.0: 114,
 1.5: 227,
 1.75: 407,
 4.75: 151,
 4.0: 784,
 2.5: 890,
 5.0: 28,
 4.5: 365}

In [3]:
# Calculate the maximum length
max_length = max(length_list)

# Adjust the recordings to have the same length
adjusted_recordings = []
for recording in recordings:
    # Repeat the array until it reaches or exceeds the maximum length
    repeated_recording = np.tile(recording, (max_length // len(recording) + 1))[:max_length]
    adjusted_recordings.append(repeated_recording)

valence_values = np.array(valence_values)
len(adjusted_recordings), adjusted_recordings[0].shape



(10391, (90948,))

In [4]:
import torch
from torch.utils.data import TensorDataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split


############################################################################################################


############################################################################################################
# Pad the recordings to have the same length
max_length = max(len(array) for array in recordings)  # Find the maximum length

# # Pad each array to have the maximum length
# padded_arrays = np.array([np.pad(array, (0, max_length - len(array)), mode='constant') for array in recordings])


############################################################################################################
# Split the data and labels into training and testing sets
X_train, X_test_help, y_train, y_test_help = train_test_split(adjusted_recordings, valence_values, test_size=0.4, random_state=42)

X_val, X_test, y_val, y_test = train_test_split(X_test_help, y_test_help, test_size=0.5, random_state=42)



############################################################################################################
# Create a DataLoader for the training set, test set, and validation set

batch_size = 256  # You can adjust the batch size depending on your system's capability

# Convert input data and labels to tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)  # Use float32 for input features
y_train_tensor = torch.tensor(y_train, dtype=torch.float32)  # Use float32 for labels

# Create a dataset from tensors
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)

# Create a DataLoader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

############################################################################################################
# Repeat the same process for the test set

X_test_tensor = torch.tensor(X_test, dtype=torch.float32).unsqueeze(1)
y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)



############################################################################################################
# Repeat the same process for the validation set
X_validation_tensor = torch.tensor(X_val, dtype=torch.float32).unsqueeze(1)
y_validation_tensor = torch.tensor(y_val, dtype=torch.float32)

validation_dataset = TensorDataset(X_validation_tensor, y_validation_tensor)
validation_loader = DataLoader(validation_dataset, batch_size=batch_size, shuffle=False)




  X_train_tensor = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)  # Use float32 for input features


In [5]:
import matplotlib.pyplot as plt

def calculate_label_distribution(labels_tensor):
    label_dist = {}
    for label in labels_tensor:
        label = label.item()  # Convert tensor to int
        if label in label_dist:
            label_dist[label] += 1
        else:
            label_dist[label] = 1
    return label_dist

train_labels = train_loader.dataset.tensors[1]
train_valence_dist = calculate_label_distribution(train_labels)




# Augment Data

In [6]:
from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift, Shift, RoomSimulator, Aliasing, HighPassFilter, LowPassFilter,ApplyImpulseResponse
import numpy as np
import torch
import sounddevice as sd
import random


def draw_augmentations(text,amount,fs=12000):
    
    augmentGaussian = Compose([
        AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=1),
    ])

    augmentPitchShift = Compose([
        PitchShift(min_semitones=-4, max_semitones=4, p=1),
    ])

    augmentRoomSimulator = Compose([
        RoomSimulator(p=1)
    ])

    augmentAliasing = Compose([
        Aliasing(min_sample_rate=8000, max_sample_rate=30000,p=1)
    ])

    augmentHPF = Compose([
        HighPassFilter(min_cutoff_freq=15, max_cutoff_freq=2400,p=1)
        ])

    augmentedLPF = Compose([
        LowPassFilter(p=1)
    ])


    list_of_augmentations = [ augmentGaussian, augmentPitchShift, augmentRoomSimulator, augmentAliasing, augmentHPF, augmentedLPF]

    functions = random.sample(list_of_augmentations, amount,)
    for i in functions:
        # print(i.__name__)
        text = i(text, sample_rate=fs)
    # sd.play(text, fs)
    return text


In [7]:

waveforms = train_loader.dataset.tensors[0].numpy()  # Convert tensor to NumPy array
labels = train_loader.dataset.tensors[1].numpy()  # Convert tensor to NumPy array

# Create a dictionary to hold the data categorized by labels
label_dict = {}

# Iterate through each label and corresponding waveform
for waveform, label in zip(waveforms, labels):
    if label not in label_dict:
        label_dict[label] = []
    label_dict[label].append(waveform)


for label in label_dict:
    for k in label_dict[label]:
        print(k.shape)
        





(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)

In [8]:
from tqdm import tqdm
def train_balancer(train_valence_dist):
    train_balanced = {}
    for label in train_valence_dist.keys():
        train_balanced[label] = []
    for label in tqdm(train_valence_dist.keys()):
        if train_valence_dist[label] < 400:
            for recording in range(len(label_dict[label])):
                train_balanced[label].append(label_dict[label][recording])
            for _ in range(400 - len(label_dict[label])):
                choose_func_amount = random.randint(1, 3)
                choose_index = random.randint(0, len(label_dict[label])-1)
                train_balanced[label].append(draw_augmentations(label_dict[label][choose_index],choose_func_amount))
        else:
            indices = random.sample(range(len(label_dict[label])), 400)
            for index in indices:
                train_balanced[label].append(label_dict[label][index])
    return train_balanced


train_balanced = train_balancer(train_valence_dist= train_valence_dist)
print(train_balanced.keys())
        

100%|██████████| 17/17 [02:43<00:00,  9.63s/it]

dict_keys([3.5, 2.5, 3.25, 3.75, 1.75, 1.25, 3.0, 4.5, 4.25, 2.0, 4.0, 1.5, 2.25, 1.0, 2.75, 4.75, 5.0])





In [9]:
def array_to_hashable(arr):
    return hash(arr.tobytes())



for arrays in train_balanced.keys():
    print("Label: ", arrays)
    # Convert each array to a hashable type
    unique_hashes = set(array_to_hashable(arr) for arr in train_balanced[arrays])

    # The number of unique arrays
    num_unique_arrays = len(unique_hashes)
    print("Number of unique arrays for : ", num_unique_arrays)

    

Label:  3.5
Number of unique arrays for :  400
Label:  2.5
Number of unique arrays for :  400
Label:  3.25
Number of unique arrays for :  400
Label:  3.75
Number of unique arrays for :  400
Label:  1.75
Number of unique arrays for :  400
Label:  1.25
Number of unique arrays for :  400
Label:  3.0
Number of unique arrays for :  400
Label:  4.5
Number of unique arrays for :  400
Label:  4.25
Number of unique arrays for :  400
Label:  2.0
Number of unique arrays for :  400
Label:  4.0
Number of unique arrays for :  400
Label:  1.5
Number of unique arrays for :  400
Label:  2.25
Number of unique arrays for :  400
Label:  1.0
Number of unique arrays for :  400
Label:  2.75
Number of unique arrays for :  400
Label:  4.75
Number of unique arrays for :  400
Label:  5.0
Number of unique arrays for :  400


In [11]:
import torch
from torch.utils.data import DataLoader, TensorDataset
import numpy as np



test_dict = train_balanced

# Flatten the dictionary into lists of samples and labels
X_train = []
y_train = []
for label, arrays in test_dict.items():
    for array in arrays:
        X_train.append(array)
        y_train.append(label)  # Adjust how labels are encoded as needed

for i in X_train:
    print(i.shape)


# # Convert lists to tensors
# X_train_tensor = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)  # Use float32 for input features
# y_train_tensor = torch.tensor(y_train, dtype=torch.float32)  # Adjust the type depending on how you encode labels

# # Create a dataset from tensors
# train_dataset = TensorDataset(X_train_tensor, y_train_tensor)

# # Define the batch size
# batch_size = 256  # Adjust batch size based on your system capability

# # Create a DataLoader
# train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# # Now 'train_loader' is ready to be used in a training loop


# print(train_loader.dataset.tensors[0].shape)

(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)
(1, 90948)