In [None]:
import datetime
import os
import re
import subprocess
import time

import scipy.io
import torch
import torch.nn as nn
import torch.optim as optim
import torchaudio
from sklearn.model_selection import KFold
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

from utils import *
from models import *
from my_loss import *
from data_process import *


In [None]:
# Instantiate the WordConfidence_CMP model and move it to `device`.
# NOTE(review): `device` is not defined in any visible cell — presumably it is
# provided by one of the star imports; confirm.
model = WordConfidence_CMP().to(device)

In [None]:
# List every trainable parameter of `model` together with its shape;
# frozen parameters (requires_grad == False) are skipped.
for name, param in model.named_parameters():
    if not param.requires_grad:
        continue
    print(name, param.shape)

In [None]:
# Load the pretrained "nvidia/stt_en_conformer_transducer_xlarge" checkpoint
# through EncDecRNNTBPEModel.from_pretrained.
# NOTE(review): `nemo_asr` is not imported in any visible cell — presumably it
# arrives through one of the star imports; confirm.
asr_model = nemo_asr.models.EncDecRNNTBPEModel.from_pretrained(
            "nvidia/stt_en_conformer_transducer_xlarge"
        )

In [None]:
# Dump every top-level (key, value) pair of the ASR model configuration.
for i, j  in asr_model.cfg.items():
    print(i, j)

In [None]:
# Build the training configuration object (verbose mode on) and create the
# binaural dataset from the metadata it exposes.
CONSTANTS = InitializationTrain(
    verbose=True
)
dataset = CPCdataBinaural(metadata=CONSTANTS.metadata)

In [None]:
# Rebind `model` to an EncoderPredictor on CONSTANTS.device and keep a handle
# on its `logmel` attribute, which later cells call as `mel(...)`.
model = EncoderPredictor().to(CONSTANTS.device)
mel = model.logmel

In [None]:
# Batch the binaural dataset three items at a time (no shuffling requested).
train_loader = DataLoader(dataset=dataset, batch_size=3)

## Listener Info (Audiogram)

In [None]:
# Look up two listeners and collect, per listener, the left-ear audiogram,
# the right-ear audiogram and the audiogram centre frequencies.
listener_info = ListenerInfo(['L0231', 'L0201'])
n_listeners = len(listener_info.info)
audiogram_l, audiogram_r, audiogram_cfs = (
    [listener_info.info[i][field] for i in range(n_listeners)]
    for field in ('audiogram_l', 'audiogram_r', 'audiogram_cfs')
)

In [None]:
# Display the collected left-ear audiograms.
audiogram_l

In [None]:
# Re-create the listener lookup and print the whole `info` container followed
# by the three audiogram fields of its first entry.
listener_info = ListenerInfo(['L0231', 'L0201'])
print(listener_info.info)
print(listener_info.info[0]['audiogram_l'])
print(listener_info.info[0]['audiogram_r'])
print(listener_info.info[0]['audiogram_cfs'])

In [None]:
# Smoke-test a single batch: compute log-mel features for the left channel and
# gather the left-ear audiograms of the listeners in that batch.
for speech_input_l, speech_input_r, info_dict in tqdm(train_loader, desc="Training:"):
    batch_size, num_samples = speech_input_l.shape[0], speech_input_l.shape[1]
    mel_feature_l, mel_feature_length = mel(
        input_signal=speech_input_l.to(device),
        # Every item is reported at full length; allocate the length vector
        # directly on the target device instead of building it on CPU first.
        length=torch.full((batch_size,), num_samples, device=device),
    )
    listener_info = ListenerInfo(info_dict['listener'])
    # `info` is indexed by position and then by field everywhere else in this
    # notebook; the previous `listener_info.info['audiogram_l']` skipped the
    # positional index and discarded its result.
    # NOTE(review): assumes `info` is an int-indexable sequence — confirm.
    batch_audiogram_l = [
        listener_info.info[i]['audiogram_l']
        for i in range(len(listener_info.info))
    ]
    break  # one batch is enough for inspection

In [None]:
# Print the full configuration of the ASR model embedded in `model`.
print(model.asr_model.cfg)

In [None]:
# Same configuration, one top-level (key, value) pair per line.
for item, value in model.asr_model.cfg.items():
    print(item, value)

In [None]:
# Drill into the 'preprocessor' section of the ASR configuration and print
# each of its settings.
for item, value in model.asr_model.cfg['preprocessor'].items():
    print(item, value)

In [None]:
import numpy as np
from scipy.interpolate import interp1d

# Example data: 8 known (frequency, value) pairs and an 80-point query grid
# spanning the same frequency range.
a = np.arange(8)
b = np.arange(10, 90, 10)
c = np.linspace(0, 7, 80)

# Build a linear interpolant (interp1d's default kind) and evaluate it at
# every frequency in `c`.
linear_interpolation = interp1d(a, b, kind='linear')
result = linear_interpolation(c)

# Show the interpolated values.
print(result)


In [None]:
import numpy as np

def mel_to_hz(mel):
    """Convert mel-scale value(s) to Hz via 700 * (10 ** (mel / 2595) - 1).

    Accepts anything that supports the arithmetic used (scalar or NumPy array).
    """
    exponent = mel / 2595
    return 700 * (10**exponent - 1)

def hz_to_mel(hz):
    """Convert frequency value(s) in Hz to mel via 2595 * log10(1 + hz / 700)."""
    ratio = 1 + hz / 700
    return 2595 * np.log10(ratio)

def get_central_frequencies(nfilt, lowfreq, highfreq):
    """Return `nfilt` frequencies (Hz) equally spaced on the mel scale.

    `nfilt + 2` points are laid out uniformly in mel between `lowfreq` and
    `highfreq` (both in Hz), converted back to Hz, and the two boundary points
    are dropped so only the interior points remain.
    """
    mel_lo, mel_hi = hz_to_mel(lowfreq), hz_to_mel(highfreq)
    mel_grid = np.linspace(mel_lo, mel_hi, nfilt + 2)
    # Strip the first and last grid points (the band edges).
    return mel_to_hz(mel_grid)[1:-1]


# 80 filters between 0 Hz and 8000 Hz.
nfilt, lowfreq, highfreq = 80, 0, 8000

central_frequencies = get_central_frequencies(
    nfilt=nfilt, lowfreq=lowfreq, highfreq=highfreq
)
print(central_frequencies)


In [None]:
# Sanity check: the number of centre frequencies should equal `nfilt`.
len(central_frequencies)

In [None]:
class HurricaneData(Dataset):
    """First-draft dataset that indexes the entries found under `<root>/<label>/ssn`.

    Only the path list is built here; `__len__` / `__getitem__` are not
    implemented in this draft.

    Args:
        root_dir: Directory whose sub-folders each may contain an `ssn` folder.
        transform: Stored on the instance; not applied anywhere in this class.

    Attributes:
        samples: Sorted list of full paths `<root>/<label>/ssn/<entry>`.
    """

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.samples = []

        # sorted(): os.listdir returns entries in arbitrary order.
        for label_folder in sorted(os.listdir(root_dir)):
            ssn_path = os.path.join(root_dir, label_folder, 'ssn')
            # Skips plain files in the root and label folders without `ssn`.
            if not os.path.isdir(ssn_path):
                continue
            for audio_file in sorted(os.listdir(ssn_path)):
                # Join onto the `ssn` folder that was actually listed; the
                # earlier version joined onto the label folder, yielding
                # paths that do not exist.
                self.samples.append(os.path.join(ssn_path, audio_file))

In [None]:
import os
# Explore the dataset root: list its entries, pick the first one and show the
# `ssn` path derived from it.
# NOTE(review): os.listdir order is arbitrary, so index 0 may be a hidden file
# or a non-directory entry.
root_dir = '/home/ubuntu/elec823/hurricane'
print(os.listdir(root_dir))
label_folder = os.listdir(root_dir)[0]
label_path = os.path.join(root_dir, label_folder)
print(label_path)
print(os.path.join(label_path,'ssn'))

In [None]:
# Show the contents of every `<root>/<mod>/ssn` folder.
# The print now lives inside the loop (previously only the last folder was
# listed) and entries without an `ssn` directory are skipped instead of
# raising.
for mod_folder in sorted(os.listdir(root_dir)):
    ssn_path = os.path.join(root_dir, mod_folder, 'ssn')
    if not os.path.isdir(ssn_path):
        continue
    print(ssn_path, os.listdir(ssn_path))

In [None]:

import os
# Collect every file found at <root>/<mod>/ssn/<snr>/<file>.
root_dir = '/home/ubuntu/elec823/hurricane'
samples = []
# sorted(): os.listdir order is arbitrary; sorting keeps `samples` reproducible.
for mod_folder in sorted(os.listdir(root_dir)):
    if mod_folder.startswith("."):
        continue
    ssn_path = os.path.join(root_dir, mod_folder, 'ssn')
    # Root-level files (or folders lacking `ssn`) would otherwise make
    # os.listdir raise.
    if not os.path.isdir(ssn_path):
        continue
    for snr in sorted(os.listdir(ssn_path)):
        if snr.startswith("."):
            continue
        snr_path = os.path.join(ssn_path, snr)
        for audio_file in sorted(os.listdir(snr_path)):
            samples.append(os.path.join(snr_path, audio_file))

In [None]:
# How many files were collected, and what one collected path looks like.
print(len(samples))
print(samples[1])

In [None]:
import torchaudio
# Load one collected file; torchaudio.load returns (waveform, sample_rate).
waveform, sample_rate = torchaudio.load(samples[1])

In [None]:
# NOTE: this bare expression is not the last statement of the cell, so its
# value is not displayed.
waveform.shape
import torch
# Average over dim 0 and show the resulting shape.
# NOTE(review): dim 0 is presumably the channel axis — confirm.
a = torch.mean(waveform, dim=0)
a.shape

In [None]:
# Display the `ssn` path under the label folder chosen earlier.
os.path.join(label_path,'ssn')

In [None]:
# Display the collected paths.
# NOTE(review): this renders the entire list; a slice such as samples[:5]
# would keep the output short.
samples

In [None]:
class HurricaneData(Dataset):
    """Audio files under `<root>/<mod>/ssn/<snr>/` paired with scores from `scores.mat`.

    Paths are gathered in sorted order, cut into consecutive blocks of
    `block_size`, and the first `valid_per_block` paths of each block form the
    validation split; everything else is the training split.
    NOTE(review): the defaults (180 / 36) presumably match the number of files
    per SNR folder — confirm against the data.

    Args:
        state: 'train' or 'valid'; selects which split `samples` holds.
        root_dir: Dataset root; must contain `scores.mat` with an 'intell' array.
        transform: Stored on the instance; currently not applied to the audio.
        block_size: Length of each consecutive block used for splitting.
        valid_per_block: Number of leading paths per block held out for validation.
        clip_seconds: Every waveform is zero-padded or trimmed to this duration.

    Raises:
        ValueError: If `state` is neither 'train' nor 'valid'.
    """

    def __init__(self, state, root_dir='/home/ubuntu/elec823/hurricane', transform=None,
                 block_size=180, valid_per_block=36, clip_seconds=3):
        # Fail fast: previously an unknown state left `self.samples` undefined
        # and only surfaced later as an AttributeError.
        if state not in ('train', 'valid'):
            raise ValueError(f"state must be 'train' or 'valid', got {state!r}")

        self.state = state
        self.root_dir = root_dir
        self.transform = transform
        self.clip_seconds = clip_seconds
        self.scores = scipy.io.loadmat(os.path.join(root_dir, 'scores.mat'))['intell']
        self.all_samples = []
        # Folder name -> index along the matching axis of `scores`.
        self.noise_types = {"cs":0, "ssn":1}
        self.snrs = {"snrHi":0, "snrMid":1, "snrLo":2}

        # sorted(): os.listdir order is arbitrary, and the block-based split
        # below depends on the order of `all_samples`.
        for mod_folder in sorted(os.listdir(root_dir)):
            if mod_folder.startswith("."):
                continue
            ssn_path = os.path.join(root_dir, mod_folder, 'ssn')
            if not os.path.isdir(ssn_path):
                continue
            for snr in sorted(os.listdir(ssn_path)):
                if snr.startswith("."):
                    continue
                snr_path = os.path.join(ssn_path, snr)
                for audio_file in sorted(os.listdir(snr_path)):
                    self.all_samples.append(os.path.join(snr_path, audio_file))

        val_list = []
        for start in range(0, len(self.all_samples), block_size):
            val_list.extend(self.all_samples[start:start + valid_per_block])

        if state == 'train':
            held_out = set(val_list)  # O(1) membership instead of list scans
            self.samples = [path for path in self.all_samples if path not in held_out]
        else:
            self.samples = val_list

    def __len__(self):
        """Number of files in the selected split."""
        return len(self.samples)

    def _parse_path(self, audio_path):
        """Decode `.../<mod>/<noise>/<snr>/<prefix>_<utt>.<ext>` into indices.

        Returns:
            (mod_number, noise_index, snr_index, utt_number): `mod_number` is
            built from all digits of the mod folder name and `utt_number` is
            the integer after the last underscore of the file name.
        """
        parts = audio_path.replace(os.sep, '/').split('/')
        mod_name, noise_name, snr_name = parts[-4], parts[-3], parts[-2]
        utt_token = parts[-1].split('_')[-1].split('.')[0]
        mod_number = int(''.join(re.findall(r'\d+', mod_name)))
        return mod_number, self.noise_types[noise_name], self.snrs[snr_name], int(utt_token)

    def __getitem__(self, idx):
        """Return `(waveform, waveform, score)` for the idx-th file of the split.

        The waveform is averaged over dim 0 and padded/trimmed to
        `clip_seconds`; the same tensor is returned twice.
        NOTE(review): returning it twice presumably mirrors a (left, right,
        target) interface — confirm.
        """
        audio_path = self.samples[idx]
        mod_number, noise_idx, snr_idx, utt_number = self._parse_path(audio_path)
        # Mod and utterance numbers are 1-based in the paths, 0-based in `scores`.
        score = self.scores[mod_number - 1][noise_idx][snr_idx][utt_number - 1]

        waveform, sample_rate = torchaudio.load(audio_path)
        waveform = torch.mean(waveform, dim=0)

        # Pad with zeros or trim so every item has the same length.
        desired_length = int(sample_rate * self.clip_seconds)
        current_length = waveform.size(-1)
        if current_length < desired_length:
            padding = desired_length - current_length
            waveform = torch.nn.functional.pad(waveform, (0, padding), "constant")
        elif current_length > desired_length:
            waveform = waveform[..., :desired_length]

        return waveform, waveform, score
# Build both splits and pull a single batch from the training split.
dataset_train = HurricaneData('train')
dataset_valid = HurricaneData('valid')
# Wrap the split that was just created; the loader previously wrapped the
# unrelated `dataset` object from an earlier cell.
train_loader = DataLoader(dataset=dataset_train, batch_size=3)
# Each item is a 3-tuple (waveform, waveform, score), so unpack three names.
for waveform_l, waveform_r, score in train_loader:
    break

In [None]:
# Report the size of the training split, then the validation split.
for split in (dataset_train, dataset_valid):
    print(len(split))


In [None]:
# Scratch check: concatenating onto an empty list yields a new list with the
# same elements.
a = list("123")
c = [4, 5, 6]
b = [] + a
print(b)

In [None]:
# NOTE(review): `dataset` is the CPCdataBinaural instance created earlier;
# presumably `dataset_train.samples` was intended here — confirm.
dataset.samples

In [None]:
# Inspect the first batch. A batch is a sequence of collated items rather than
# a single tensor, so report each element's shape (or its type when the
# element has no `shape`, e.g. a dict of metadata).
for batch in train_loader:
    print([getattr(item, 'shape', type(item)) for item in batch])
    break

In [None]:
import scipy.io

# Load the 'intell' array from a MATLAB file and print it.
# NOTE(review): this reads cache.mat, while HurricaneData reads scores.mat
# from the dataset root — confirm which file is authoritative.
data = scipy.io.loadmat('/home/ubuntu/elec823/cache.mat')['intell']

print(data)

In [None]:
# Worked example of decoding a sample path into score-table indices.
input_str = '*/123/123/hurricane/mod10/ssn/snrLo/hvd_009.wav'

# Split the string on '/'
split_str = input_str.split('/')

# Pull the needed pieces out of the split list: mod folder, noise folder,
# SNR folder, and the token between the last '_' and the first '.' of the file name
output = [split_str[-4], split_str[-3], split_str[-2], split_str[-1].split('_')[-1].split('.')[0]]

print(output)
import re
# Folder name -> integer index
noise_types = {"cs":0, "ssn":1}
snrs = {"snrHi":0, "snrMid":1, "snrLo":2}
# Concatenate all digits of the mod folder name into one integer ('mod10' -> 10)
numbers = int(''.join(re.findall(r'\d+', output[0])))
noise_type = noise_types[output[1]]
snr = snrs[output[2]]
utt = int(output[3])
print(numbers, noise_type, snr, utt)

In [None]:
import os
import torch
import datetime
import time

import torch

import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import KFold
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
import subprocess
import os
from utils import *
from models import *
from my_loss import *
from data_process import *


In [None]:
# Load the pretrained model first, then take its sub-modules. Reading
# `asr_model.preprocessor` / `.encoder` before this assignment either raises
# NameError on a fresh kernel or binds them to a stale, previously loaded
# instance.
asr_model = nemo_asr.models.EncDecRNNTBPEModel.from_pretrained(
    "nvidia/stt_en_conformer_transducer_xlarge")
logmel = asr_model.preprocessor
conformer_encoder = asr_model.encoder

In [None]:
# Rebind `model` to a freshly constructed EncoderPredictor_Fusion (not moved
# to any device here).
model = EncoderPredictor_Fusion()


In [8]:
# Peek at the first row of the first predictor layer's weight matrix.
print(model.predictor[0].weight[0])

tensor([ 1.6621e-03,  9.7242e-05,  1.2429e-04,  ..., -2.3975e-03,
        -1.4652e-04, -3.4295e-04], grad_fn=<SelectBackward0>)


: 

In [None]:
# List the name of every parameter of `model`, trainable or not.
for name, param in model.named_parameters():
    print(name)

In [None]:
import torch
import torch

# Draw a random [8, 10] tensor, append a trailing singleton axis, then tile
# that axis 151 times so every [8, 10] entry is repeated along the last dim.
x = torch.randn(8, 10).unsqueeze(-1).repeat(1, 1, 151)

# Expected output: torch.Size([8, 10, 151])
print(x.size())

In [None]:
# Random [8, 512, 151] tensor to concatenate with `x` in the next cell.
y = torch.randn(8, 512, 151)

In [None]:
# Concatenate along dim 1; the other dims of `x` and `y` must match.
concat = torch.cat((x, y), dim=1)
print(concat.shape)

In [None]:
import torch
# Scratch check of `.shape` on a 1-D tensor; only the last expression
# (the size of dim 0) is displayed by the cell.
a = torch.tensor([1,2,3])
a.shape
a.shape[0]

In [None]:
import torch
from torch.nn.utils.rnn import pad_sequence

# Example: a list of 1-D tensors with different lengths.
tensor_list = [torch.tensor(values) for values in ([1, 2, 3], [4, 5], [6, 7, 8, 9])]

# pad_sequence right-pads each tensor with `padding_value` up to the longest
# length and stacks them; batch_first=True puts the batch on dim 0.
padded_tensor = pad_sequence(tensor_list, batch_first=True, padding_value=0)

print("Padded tensor:")
print(padded_tensor)


In [None]:
class Test(nn.Module):
    """Toy module that scales its input by a single learnable scalar.

    Used in this notebook to check that gradients reach `threshold`.
    """

    def __init__(self):
        super().__init__()
        # Learnable scalar, initialised to 0.5.
        self.threshold = nn.Parameter(torch.tensor(0.5), requires_grad=True)

    def forward(self, word_confidence, valid_len=None):
        """Return `word_confidence * threshold`.

        Args:
            word_confidence (torch.Tensor): Values to scale, e.g. [Batch, 10].
            valid_len (torch.Tensor, optional): Accepted for interface
                compatibility but not used by the computation, hence optional.

        Returns:
            torch.Tensor: Same shape as `word_confidence`.
        """
        # A hard count of entries above the threshold was tried here before;
        # the product keeps the output differentiable w.r.t. `threshold`.
        output = word_confidence * self.threshold
        return output

In [None]:
# Gradient check: does the MSE loss back-propagate to the stacked input and to
# the module's learnable threshold?
a = torch.tensor([0.2,0.5,0.8], requires_grad=True, dtype=torch.float32)
b = torch.tensor([0.1,0.9,0.4], requires_grad=True, dtype=torch.float32)
c = torch.tensor([7,8,9], requires_grad=True, dtype=torch.float32)
criterion = torch.nn.MSELoss()  # separate name so the module is not overwritten by its result
test = Test()

x = torch.stack([a,b], dim=0)
# `x` is produced by torch.stack and is therefore not a leaf; autograd only
# populates `.grad` on non-leaf tensors when retain_grad() is called before
# backward().
x.retain_grad()
print(x)
print(x.shape)
y = test(x, torch.tensor([3,3]))
print(y)
t = torch.tensor([[0.5, 0.5, 0.5], [0.5, 0.5, 0.5]])
loss = criterion(y, t)
loss.backward()

print(loss)
print(x.grad)
# Read the gradient from the parameter itself: a detached copy carries no
# `.grad`. The detached value is still kept for inspection.
tensor_threshold = test.threshold.detach()
print(tensor_threshold)
print(test.threshold.grad)

In [None]:
# Gradient check for torch.max(a, b): each position's gradient should flow
# only to whichever of `a` / `b` supplied the maximum.
a = torch.tensor([5,2,3], requires_grad=True, dtype=torch.float32)
b = torch.tensor([4,5,6], requires_grad=True, dtype=torch.float32)
c = torch.tensor([7,8,9], requires_grad=True, dtype=torch.float32)
criterion = torch.nn.MSELoss()  # separate name so the module is not overwritten by its result
test = Test()

# Pass `valid_len` explicitly so the call matches Test.forward's two-argument
# signature; the value is not used by the computation.
y = test(a, torch.tensor([3]))
print(y)

# Element-wise maximum of the two leaf tensors; this is what the loss uses.
y = torch.max(a,b)
print(y)
loss = criterion(y, c)
loss.backward()
print(a.grad)
print(b.grad)