# Model to classify audio data into notes

## 1) Data acquisition

#### Check sound devices availible

In [None]:
import sounddevice as sd
import matplotlib.pyplot as plt

# sd.default.device = "pipewire"
# print(sd.default.device)
print(sd.query_devices(),"\n")
# print(sd.query_hostapis())

device = sd.query_devices(device="pipewire", kind=None)["index"]
print("Chosen device:", device)

In [None]:
samplerates = 16000, 32000, 44100, 48000, 96000, 128000

supported_samplerates = []
for fs in samplerates:
    try:
        sd.check_output_settings(device=device, samplerate=fs)
    except Exception as e:
        print(fs, e)
    else:
        supported_samplerates.append(fs)
print(supported_samplerates)

#### Custom oscilloscope

In [2]:
import scipy as sp
import numpy as np
from matplotlib import pyplot as plt
from scipy.fft import fft
from scipy.signal import stft
from scipy.signal import welch
from scipy.signal import resample
from scipy.signal import square
from scipy.signal import butter, freqs, sosfilt

class oscope:
    def __init__(self, seg, nol, nff):
        self.seg = seg
        self.nol = nol
        self.nff = nff
        
    def setp(self, seg, nol, nff):
        self.seg = seg
        self.nol = nol
        self.nff = nff
        
    def tplot(self, t, sig, N):
        plt.figure(figsize = (6,3))
        plt.plot(t[:N], sig[:N])
        plt.title("Oscillogram")
        plt.xlabel("Time, s")
        plt.ylabel("Value")
        plt.grid()
        plt.show()
        
    def splot(self, f, fts, N):
        plt.figure(figsize = (6,3))
        plt.plot(f[:N], fts[:N], color = 'purple')
        plt.title("Spectrum")
        plt.xlabel("Frequency, Hz")
        plt.ylabel("Amplitude")
        plt.grid()
        plt.show()
    
    def tsplot(self, sig, Fs):
        f,fts = welch(sig, window = 'boxcar', fs = Fs, nperseg = self.seg, 
                      noverlap = self.nol, nfft = self.nff, scaling = "density")
        fts = np.sqrt(fts*(Fs*2/seg))
        self.splot(f, fts, -1)
        
    def getmax(self, sig, Fs):
        f,fts = welch(sig, window = 'boxcar', fs = Fs, nperseg = self.seg, 
                      noverlap = self.nol, nfft = self.nff, scaling = "density")
        return f[np.argmax(fts)]

    def getfft(self, sig, Fs):
        f,fts = welch(sig, window = 'boxcar', fs = Fs, nperseg = self.seg, 
                      noverlap = self.nol, nfft = self.nff, scaling = "density")
        return f, fts

l = 10000

seg = int(l/2)-1 # 4999
nol = 0
nff = None
osc = oscope(seg, nol, nff)

### Noise recorder
Records all the noise files in one go

In [4]:
import pyaudio
import sounddevice as sd
import wave
from scipy.io.wavfile import read
from scipy.signal import spectrogram

import time
import os
from os import listdir
from os.path import isfile, join
from IPython.display import clear_output
import traceback

import numpy as np
import matplotlib.pyplot as plt


device = sd.query_devices(device="pipewire", kind=None)["index"]


FORMAT = pyaudio.paInt16  # 16 bit mono
CHANNELS = 1
RATE = 48000  # discretization frequency
CHUNK_IN = 1024
CHUNK_OUT = 1024
DEVICE_IDS = (device, device)

e = 10**(-10)

def to_wav(path, frames, p, form=FORMAT, ch=CHANNELS, Fs=RATE):
    wf = wave.open(path, "wb")
    wf.setsampwidth(p.get_sample_size(form))
    wf.setnchannels(ch)
    wf.setframerate(Fs)
    wf.writeframes(b''.join(frames))
    wf.close()
    print(f"Saved to {path}")

def record(stream, duration, chunk_in = CHUNK_IN, Fs=RATE):
    frames = []
    stream.start_stream()
    for i in range(int(Fs / chunk_in * duration)):
        data = stream.read(chunk_in)
        frames.append(data)
    stream.stop_stream()
    return frames
    
def play(stream, file_path, chunk_out=CHUNK_OUT):
    wf = wave.open(file_path, 'rb')
    # read data (based on the chunk size)
    data = wf.readframes(chunk_out)
    # play stream (looping from beginning of file to the end)
    stream.start_stream()
    while data:
        # writing to the stream is what *actually* plays the sound.
        stream.write(data)
        data = wf.readframes(chunk_out)

    stream.stop_stream()
    # stream.stop_stream()
    wf.close()
    
def setup_streams(form=FORMAT, chan=CHANNELS, 
                  chunk_in = CHUNK_IN, Fs=RATE,
                  chunk_out=CHUNK_OUT, dev_id = DEVICE_IDS):
    
    p = pyaudio.PyAudio()

    i_stream = p.open(format=form,
                channels=chan,
                rate=Fs,
                input=True,
                frames_per_buffer=chunk_in,
                input_device_index=dev_id[0])
    # open stream based on the wave object which has been input.
    o_stream = p.open(format = form,
                    channels = chan,
                    rate = Fs,
                    output = True,
                    frames_per_buffer=chunk_out,
                    output_device_index=dev_id[1])
    i_stream.stop_stream()
    o_stream.stop_stream()
    return p, i_stream, o_stream

def close_streams(p, i_stream, o_stream):
    i_stream.close()
    o_stream.close()
    p.terminate()
    print("All streams closed")
    
    
def record_data(folder_path, duration=5, n_recordings=100):
    pa, i_str, o_str = setup_streams()
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    try:
        proceed_choice = bool(int(input("Start a recording?")))
        for i in range(n_recordings): 
            time.sleep(1)
            clear_output()
            
            frames = record(i_str, duration)
            label = "0" 
            cur_file_id = max([int(f.split(".")[0].split("_")[1]) 
                               for f in listdir(folder_path) 
                               if f.split("_")[0]==label 
                               and (isfile(join(folder_path, f)))]+[0])+1
            print(cur_file_id)
            file_name = f"{label}_{cur_file_id}.wav"
                
            path = join(folder_path, file_name)
            to_wav(path, frames, pa)
            # print("Saved")
        close_streams(pa, i_str, o_str)
        
    except Exception as err:
        print(err)
        close_streams(pa, i_str, o_str)
        
mypath = "Data/Notes/New/Noise/"
n_recs = 348
duration = 1.5 
record_data(mypath, duration, n_recs)


KeyboardInterrupt: Interrupted by user

Start a recording? 0


### Notes Recorder
Iterative recorder with human interaction needed

In [1]:
import pyaudio
import sounddevice as sd
import wave
from scipy.io.wavfile import read
from scipy.signal import spectrogram

import time
from os import listdir
from os.path import isfile, join
from IPython.display import clear_output
import traceback

import numpy as np
import matplotlib.pyplot as plt


device = sd.query_devices(device="pipewire", kind=None)["index"]

FORMAT = pyaudio.paInt16  # 16 bit mono
CHANNELS = 1
RATE = 48000  # discretization frequency
CHUNK_IN = 1024
CHUNK_OUT = 1024
DEVICE_IDS = (device, device)

e = 10**(-10)

def to_wav(path, frames, p, form=FORMAT, ch=CHANNELS, Fs=RATE):
    wf = wave.open(path, "wb")
    wf.setsampwidth(p.get_sample_size(form))
    wf.setnchannels(ch)
    wf.setframerate(Fs)
    wf.writeframes(b''.join(frames))
    wf.close()
    print(f"Saved to {path}")

def record(stream, duration, chunk_in = CHUNK_IN, Fs=RATE):
    frames = []
    stream.start_stream()
    for i in range(int(Fs / chunk_in * duration)):
        data = stream.read(chunk_in)
        frames.append(data)
    stream.stop_stream()
    return frames

def visualize(file_path, Fs=RATE, os=osc):
    file = read(file_path)
    sequence = file[1]
    os.tsplot(sequence, Fs)
    print(os.getmax(sequence, Fs))
    
def play(stream, file_path, chunk_out=CHUNK_OUT):
    wf = wave.open(file_path, 'rb')
    # read data (based on the chunk size)
    data = wf.readframes(chunk_out)
    # play stream (looping from beginning of file to the end)
    stream.start_stream()
    while data:
        # writing to the stream is what *actually* plays the sound.
        stream.write(data)
        data = wf.readframes(chunk_out)

    stream.stop_stream()
    # stream.stop_stream()
    wf.close()
    
def setup_streams(form=FORMAT, chan=CHANNELS, 
                  chunk_in = CHUNK_IN, Fs=RATE,
                  chunk_out=CHUNK_OUT, dev_id = DEVICE_IDS):
    
    p = pyaudio.PyAudio()

    i_stream = p.open(format=form,
                channels=chan,
                rate=Fs,
                input=True,
                frames_per_buffer=chunk_in,
                input_device_index=dev_id[0])
    
    o_stream = p.open(format = form,
                    channels = chan,
                    rate = Fs,
                    output = True,
                    frames_per_buffer=chunk_out,
                    output_device_index=dev_id[1])
    i_stream.stop_stream()
    o_stream.stop_stream()
    return p, i_stream, o_stream

def close_streams(p, i_stream, o_stream):
    i_stream.close()
    o_stream.close()
    p.terminate()
    print("All streams closed")
    
    
def record_data(folder_path, duration=5):
    pa, i_str, o_str = setup_streams()
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    try:
        while True: 
            proceed_choice = bool(int(input("Start a recording?")))
            time.sleep(1)
            if not proceed_choice:
                break
            
            frames = record(i_str, duration)
            tpath = join(folder_path, "temp.wav")
            to_wav(tpath, frames, pa)
            visualize(tpath)
            
            play(o_str, tpath)
    
            label = input("Enter class label (0-88)")
            
            if (not label.isdigit()) or int(label) < 0:
                clear_output()
                print("Not saved")
                continue
                
            cur_file_id = max([int(f.split(".")[0].split("_")[1]) 
                               for f in listdir(folder_path) 
                               if f.split("_")[0]==label 
                               and (isfile(join(folder_path, f)))]+[0])+1
    
            file_name = f"{label}_{cur_file_id}.wav"
                
            path = join(folder_path, file_name)
            to_wav(path, frames, pa)
            
            clear_output()
            print("Saved")
        close_streams(pa, i_str, o_str)
        
    except Exception as err:
        print(err)
        close_streams(pa, i_str, o_str)
        
mypath = "Data/Test/Temp/"
duration = 1.5 
record_data(mypath, duration)


NameError: name 'osc' is not defined

### Dataset transformation
Split into chunks (50 ms or 2400 values each), apply FFT

In [9]:
from torch.utils.data import Dataset
from sklearn.preprocessing import RobustScaler
import os
from os.path import isfile, join
import shutil
import wave
from scipy.io.wavfile import read
from scipy.signal import welch
import pandas as pd
import numpy as np
import math

class WavRW():
    def __init__(self, path):
        wf = wave.open(path, "rb")
        self.form = wf.getsampwidth()
        self.ch = wf.getnchannels()
        self.Fs = wf.getframerate()
        wf.close()
        
    def save_wav(self, path, frames):
        wf = wave.open(path, "wb")
        wf.setsampwidth(self.form)
        wf.setnchannels(self.ch)
        wf.setframerate(self.Fs)
        wf.writeframes(b''.join(frames))
        wf.close()
        # print(f"Saved to {path}")

class AudioDataset(Dataset):
    def __init__(self, audio_dir, binary=False, recreate = False, rec_len=5000, step=5000, transform=None, target_transform=None):
        self.path = audio_dir
        self.rec_len = rec_len
        self.step = step
        self.audio_dir = audio_dir
        self.d1 = "DataSet/"
        self.d2 = "FFT/"
        self.d3 = "Sequences/"
        self.work_dir = self.audio_dir + self.d1
        self.di_name = "data_info.csv"
        self.binary = binary

        self.make_dirs()
        
        if recreate:
            self.form_dataset()
        else:
            self.load_ds_info()
        # self.scale_data()
        
        self.transform = transform
        self.target_transform = target_transform

    def make_dirs(self):
        new_directories = [self.work_dir, self.work_dir+self.d2, self.work_dir+self.d3]
        for dir_path in new_directories:
            if os.path.exists(dir_path):
                shutil.rmtree(dir_path)
            os.makedirs(dir_path)
    
    # not used  
    def scale_data(self):
        self.scaler = RobustScaler(copy=False)
        self.data_x = self.scaler.fit_transform(data_x)

    def get_fft(self, sig, Fs):
        nps = len(sig)-1 # int(len(sig)/2-1)
        f,fts = welch(sig, window = "boxcar", fs = Fs, nperseg = nps, 
                      noverlap = None, nfft = nps, scaling = "density") # here is the scaling
        return f, fts

    def load_ds_info():
        self.data_info = pd.readcsv(self.work_dir+self.di_name)
        self.n_classes = len(self.data_info["class"].uniques())
        
    def form_dataset(self, tr_r=0.8):
        file_names = os.listdir(self.path)
        file_names = [file_name for file_name in file_names if isfile(self.audio_dir+file_name)]
        self.w_rw = WavRW(self.audio_dir+file_names[0])
        
        if self.binary:
            classes = (0, 1)
        else:
            classes = set([int(file_name.split("_")[0]) for file_name in file_names])
        self.n_classes = len(classes)

        self.data_info = pd.DataFrame(columns=["file_path", "fft_path", "seq_path", "class", "cw_id"])
        
        iterator = 0
        for file_name in file_names:
            print("Processing file", file_name)
            file_path = self.audio_dir+file_name
            wfile = read(file_path)
            sequence = wfile[1]
            l = len(sequence)
            st = 0
            # print(math.floor((l-sample_size)/step))
            y = int(file_name.split("_")[0])
            cw_id = int(file_name.split(".")[0].split("_")[1])
           
            for i in range(math.floor((l-self.rec_len)/self.step)): # step
                subseq = sequence[st:st+self.rec_len]
                _, x = self.get_fft(subseq, self.w_rw.Fs)
                fft_fpath = f"{self.work_dir}{self.d2}{iterator}.txt" 
                np.savetxt(fft_fpath, x) # np.array(subs).astype(np.double)
                
                seq_fpath = f"{self.work_dir}{self.d3}{iterator}.wav" 
                self.w_rw.save_wav(seq_fpath, subseq)
                
                self.data_info.loc[iterator] = (file_path, fft_fpath, seq_fpath, y, cw_id)
                st += self.step
                iterator += 1
                
            di_fpath = self.work_dir+self.di_name
            self.data_info.to_csv(di_fpath, index=True)  

        self.fft_len = len(x) #int(self.rec_len/4)
        return self.data_info
    
    def __len__(self):
        return len(self.data_info)

    def __getitem__(self, idx):
        row = self.data_info.loc[idx]
        x = np.loadtxt(row["fft_path"])
        
        if self.binary:
            y = row["class"] > 1
            return x, y
        else:
            y_ohe = np.zeros(self.n_classes)
            y_ohe[row["class"]] = 1
        
        return x, y_ohe

In [None]:
data_path="./Data/Notes/Noise/"
r_len = 2400
step =  r_len # int(r_len/2)
# sample_rate = 48000
dsb = AudioDataset(data_path, rec_len=r_len, step=r_len, binary=True, recreate=True)

In [None]:
data_path="./Data/Notes/FullX4/"
r_len = 2400
step =  r_len # int(r_len/2)
# sample_rate = 48000
ds = AudioDataset(data_path, rec_len=r_len, step=r_len, binary=False, recreate=True)

## Classifier and training procedure

Learning is presented through cross-validation

As for classifiers architecture - it's just a simple MLP (with some tunings)

In [11]:
import json
import os

class ExperimentsLogger:
    def __init__(self, models_path):
        self.models_path = models_path
        self.cur_id = 0
        self.read_experiments()
        
    def read_experiments(self):
        if not os.path.exists(self.models_path):
            os.makedirs(self.models_path)
        
        try:
            with open(self.models_path+"model_experiments.json", "r") as file:
                self.experiments = json.load(file)
                self.cur_id = int(list(self.experiments.keys())[-1]) + 1
                print("Current experiment:", self.cur_id)
                
        except:
            self.experiments = {}
            with open(self.models_path+"model_experiments.json", "w") as file:
                json.dump(self.experiments, file)
                print("No experiments found, creating the new log file")

    def write_experiment(self, exp_info, save=True):
        score = exp_info["score"]["metric"]["values"]
        model_name = exp_info["model_info"]["name"]
        model_path = exp_info["model_info"]["path"]
        description = exp_info["description"]

        final_score = np.mean(score)
        self.experiments[self.cur_id] = {"M":model_name, "S": str(final_score),  "D":description, "P":model_path, "E":exp_info}
        
        print(f"\nAvg cross-val metric: {np.mean(score)}, STD: {np.std(score)}")
        if save:
         with open(self.models_path+"model_experiments.json", "w") as file:
             json.dump(self.experiments, file)
             print("Experiment was written successfully")
        self.cur_id+=1
        
    def describe_experiments(self):
        to_display = ["P", "S", "D"]
        print([exp[to_display] for exp in self.experiments.values()], sep="\n")

In [12]:
import torch
print("Availible GPUs:")
for i in range(torch.cuda.device_count()):
    print(i, torch.cuda.get_device_properties(i).name)
print()

device = ""
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
    
print(f"Using {torch.cuda.get_device_name(device)}")

Availible GPUs:
0 NVIDIA GeForce GTX 1050 Ti

Using NVIDIA GeForce GTX 1050 Ti


### 1. Noise classifier
To determine if a note is really playing atm

In [13]:
models_path = "./Models/Noise_BClassifiers/"
elb = ExperimentsLogger(models_path)

Current experiment: 2


In [14]:
from sklearn.model_selection import KFold
import torch
from torcheval.metrics import BinaryAccuracy
from torch import nn

rs = 42
torch.manual_seed(rs)
np.random.seed(rs)

class Classifier(nn.Module):
    def __init__(self, n_input):
        self.n_neurons = 256
        super().__init__()
        self.model = nn.Sequential(
            nn.Flatten(),
            nn.Linear(n_input, self.n_neurons),# new 128 32
            nn.Tanh(),
            nn.BatchNorm1d(self.n_neurons),
            nn.Linear(self.n_neurons, 1),
            nn.Dropout(p=0.2),
            nn.Sigmoid())
        # self.model.apply(self.init_weights)
        
    def forward(self, x):
        output = self.model(x)
        return output

    def init_weights(self, m):
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_uniform_(m.weight)
            m.bias.data.fill_(0.01)

    # def validate(self, validation_set, batch_size, loss_function):
    #     #  creating a list to hold loss per batch
    #     loss_per_batch = []
        
    #     #  defining model state
    #     network.eval()
        
    #     #  defining dataloader
    #     val_loader = DataLoader(validation_set, batch_size)

k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True)

batch_size = 128
batch_n = int(dsb.__len__()/batch_size)

lr = 0.001
num_epochs = 50 # 160 80 # 20
loss_function = nn.BCELoss()
tr_metric = BinaryAccuracy()
val_metric = BinaryAccuracy()
mclist = []
losslist = []


for fold, (train_ids, test_ids) in enumerate(kfold.split(dsb)):
    torch.cuda.device(1)
    # Print
    print(f'\nFOLD {fold}')
    print('--------------------------------')
    
    # Sample elements randomly from a given list of ids, no replacement.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)
    
    # Define data loaders for training and testing data in this fold
    trainloader = torch.utils.data.DataLoader(
                      dsb, 
                      batch_size=batch_size, sampler=train_subsampler, num_workers=2, pin_memory=True)
    testloader = torch.utils.data.DataLoader(
                      dsb,
                      batch_size=batch_size, sampler=test_subsampler, num_workers=2, pin_memory=True)

    clf = Classifier(dsb.fft_len).cuda()
    optimizer = torch.optim.Adam(clf.parameters(), lr=lr)
    torch.compile(clf)
    for epoch in range(num_epochs):
        clf.train()
        tr_metric.reset()
        val_metric.reset()
        for n, (x, y) in enumerate(trainloader):
            optimizer.zero_grad(set_to_none=True)
        
            x = x[:,None,:]
            y = y[:,None]
            x = x.to(torch.float32).cuda()
            y = y.to(torch.float32).cuda()
            
            prediction = clf(x) 
            loss_classifier = loss_function(prediction, y)
            loss_classifier.backward()
            optimizer.step()
            tr_metric.update(prediction.flatten(), y.flatten())
            
        clf.eval()
        for n, (x, y) in enumerate(testloader):
            with torch.no_grad():
                x = x[:,None,:]
                y = y[:,None]
                x = x.to(torch.float32).cuda()
                y = y.to(torch.float32).cuda()
                
                prediction = clf(x)
                val_metric.update(prediction.flatten(), y.flatten())
                # print(torch.sum(torch.abs((prediction - y.cuda())))/len(prediction.flatten()))
                # metric.update(prediction, y)
                # print((prediction).argmax(axis=1)-y.cuda().argmax(axis=1))
        mcv = val_metric.compute()
         
        if epoch % 5 == 0 or epoch == (num_epochs-1):
            mct = tr_metric.compute()
            print(f"Epoch: {epoch} Loss clf: {loss_classifier} Metrics: ({mct}, {mcv})")
    
    mclist.append(float(mcv))
    losslist.append(float(loss_classifier))
    


FOLD 0
--------------------------------
Epoch: 0 Loss clf: 0.4033673405647278 Metrics: (0.7546225786209106, 0.8649949431419373)


Exception in thread Thread-11 (_pin_memory_loop):
Traceback (most recent call last):
  File "/home/mephodius/.pyenv/versions/3.10.6/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/mephodius/Programming/PythonProjects/DataScience/Pytorch/pytorch/lib/python3.10/site-packages/ipykernel/ipkernel.py", line 766, in run_closure
    _threading_Thread_run(self)
  File "/home/mephodius/.pyenv/versions/3.10.6/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mephodius/Programming/PythonProjects/DataScience/Pytorch/pytorch/lib/python3.10/site-packages/torch/utils/data/_utils/pin_memory.py", line 59, in _pin_memory_loop
    do_one_step()
  File "/home/mephodius/Programming/PythonProjects/DataScience/Pytorch/pytorch/lib/python3.10/site-packages/torch/utils/data/_utils/pin_memory.py", line 35, in do_one_step
    r = in_queue.get(timeout=MP_STATUS_CHECK_INTERVAL)
  File "/home/mephodius/.pyenv/versions/3

KeyboardInterrupt: 

In [83]:
# saving the model and experiment results

metric_name = "bin-acc"
loss_name = "bin-cross-entropy"
model_name = "nn"
description = f"fft_len {dsb.fft_len}, batch {batch_size}, {clf.n_neurons} neurons, batch_norm before nonlinearity"


model_path = f"{models_path}{model_name}_{metric_name}_{mclist[-1]:.4f}.pt"
model_scripted = torch.jit.script(clf) # Export to TorchScript
model_scripted.save(model_path) # Save

print(f"Saved as {model_path}")

exp_info = {"score":{"metric":{"name":metric_name, "values":mclist}, "loss":{"name":loss_name, "values":losslist}},
            "data": {"fs": dsb.w_rw.Fs, "fft_len":dsb.fft_len, "duration":(dsb.fft_len*2)/dsb.w_rw.Fs},
            "learning":{"n_epochs":num_epochs, "n_batches":batch_size, "learning_rate":lr, "folds":k_folds},
            "model_info":{"name":model_name, "path":model_path},
            "description": description}

print(exp_info)
elb.write_experiment(exp_info)

Saved as ./Models/Noise_BClassifiers/nn_bin-acc_0.9658.pt
{'score': {'metric': {'name': 'bin-acc', 'values': [0.9734042286872864, 0.9726443886756897, 0.9721378087997437, 0.9658054709434509, 0.9658054709434509]}, 'loss': {'name': 'bin-cross-entropy', 'values': [0.2739558517932892, 0.2659973204135895, 0.17677325010299683, 0.18252500891685486, 0.11571940034627914]}}, 'data': {'fs': 48000, 'fft_len': 1200, 'duration': 0.05}, 'learning': {'n_epochs': 50, 'n_batches': 128, 'learning_rate': 0.001, 'folds': 5}, 'model_info': {'name': 'nn', 'path': './Models/Noise_BClassifiers/nn_bin-acc_0.9658.pt'}, 'description': 'fft_len 1200, batch 128, 256 neurons, batch_norm before nonlinearity'}

Avg cross-val metric: 0.9699594736099243, STD: 0.0034156032251632185
Experiment was written successfully


#### Get description of the best experiments so far

In [15]:
n_exp = 3
print(*sorted(elb.experiments.items(), key=lambda x: x[1]['S'], reverse=True)[:n_exp], sep="\n")

('0', {'M': 'nn', 'S': '0.9733029484748841', 'D': 'fft_len 1200, batch 32, 256 neurons, batch_norm before nonlinearity', 'P': './Models/Noise_BClassifiers/nn_bin-acc_0.9749.pt', 'E': {'score': {'metric': {'name': 'bin-acc', 'values': [0.968845009803772, 0.9739108681678772, 0.9759371876716614, 0.9728976488113403, 0.9749240279197693]}, 'loss': {'name': 'bin-cross-entropy', 'values': [0.22502265870571136, 0.2225693166255951, 0.13281896710395813, 0.30771803855895996, 0.17659813165664673]}}, 'data': {'fs': 48000, 'fft_len': 1200, 'duration': 0.05}, 'learning': {'n_epochs': 50, 'n_batches': 32, 'learning_rate': 0.001, 'folds': 5}, 'model_info': {'name': 'nn', 'path': './Models/Noise_BClassifiers/nn_bin-acc_0.9749.pt'}, 'description': 'fft_len 1200, batch 32, 256 neurons, batch_norm before nonlinearity'}})
('1', {'M': 'nn', 'S': '0.9699594736099243', 'D': 'fft_len 1200, batch 128, 256 neurons, batch_norm before nonlinearity', 'P': './Models/Noise_BClassifiers/nn_bin-acc_0.9658.pt', 'E': {'sco

### Note classifier
Just classifies a chunk of audio as one of the notes presented (88+1)

In [17]:
models_path = "./Models/Note_Classifiers/"
el = ExperimentsLogger(models_path)

Current experiment: 8


In [18]:
from sklearn.model_selection import KFold
import torch
from torcheval.metrics import MulticlassAccuracy
from torch import nn

rs = 42
torch.manual_seed(rs)
np.random.seed(rs)

class Classifier(nn.Module):
    def __init__(self, n_input, n_output):
        self.n_neurons = 256
        super().__init__()
        self.model = nn.Sequential(
            nn.Flatten(),
            nn.Linear(n_input, self.n_neurons), # new 128 32
            nn.Tanh(),
            nn.BatchNorm1d(self.n_neurons),
            # nn.Linear(512, 256), # new 128 32
            # nn.Tanh(),
            # nn.BatchNorm1d(256),
            # nn.Linear(128, 128),
            # nn.Tanh(),
            nn.Linear(self.n_neurons, n_output),
            nn.Softmax(dim=1))
        # self.model.apply(self.init_weights)
        
    def forward(self, x):
        output = self.model(x)
        return output

    def init_weights(self, m):
        if isinstance(m, nn.Linear):
            torch.nn.init.xavier_uniform_(m.weight)
            m.bias.data.fill_(0.01)

    # def validate(self, validation_set, batch_size, loss_function):
    #     #  creating a list to hold loss per batch
    #     loss_per_batch = []
        
    #     #  defining model state
    #     network.eval()
        
    #     #  defining dataloader
    #     val_loader = DataLoader(validation_set, batch_size)

device = ""
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(device)


k_folds = 5
kfold = KFold(n_splits=k_folds, shuffle=True)

batch_size = 64
batch_n = int(ds.__len__()/batch_size)

lr = 0.001
num_epochs = 200 # 160 80 # 20
loss_function = nn.CrossEntropyLoss()
tr_metric = MulticlassAccuracy()
val_metric = MulticlassAccuracy()
mclist = []
losslist = []


for fold, (train_ids, test_ids) in enumerate(kfold.split(ds)):
    
    # Print
    print(f'\nFOLD {fold}')
    print('--------------------------------')
    
    # Sample elements randomly from a given list of ids, no replacement.
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_ids)
    test_subsampler = torch.utils.data.SubsetRandomSampler(test_ids)
    
    # Define data loaders for training and testing data in this fold
    trainloader = torch.utils.data.DataLoader(
                      ds, 
                      batch_size=batch_size, sampler=train_subsampler, pin_memory=True)
    testloader = torch.utils.data.DataLoader(
                      ds,
                      batch_size=batch_size, sampler=test_subsampler, pin_memory=True)

    clf = Classifier(ds.fft_len, ds.n_classes).to(device=device)
    optimizer = torch.optim.Adam(clf.parameters(), lr=lr)
    
    for epoch in range(num_epochs):
        clf.train()
        tr_metric.reset()
        val_metric.reset()
        for n, (x, y) in enumerate(trainloader):
            
            optimizer.zero_grad()
            x = x[:,None,:]
            prediction = clf(x.to(torch.float32).cuda()) # .to(torch.float)
            loss_classifier = loss_function(prediction, y.cuda())
            loss_classifier.backward()
            optimizer.step()

            tr_metric.update(prediction, y.argmax(axis=1))

        clf.eval()
        for n, (x, y) in enumerate(testloader):
            x = x[:,None,:]
            prediction = clf(x.to(torch.float32).cuda())
            val_metric.update(prediction, y.argmax(axis=1))
        mcv = val_metric.compute()
        
        if epoch % 5 == 0 or epoch == (num_epochs-1):
            mct = tr_metric.compute()
            print(f"Epoch: {epoch} Loss clf: {loss_classifier} Metrics: ({mct}, {mcv})")
    
    mclist.append(float(mcv))
    losslist.append(float(loss_classifier))
    

cuda

FOLD 0
--------------------------------
Epoch: 0 Loss clf: 4.06616751352946 Metrics: (0.4889945089817047, 0.628000020980835)
Epoch: 5 Loss clf: 3.9588475505510967 Metrics: (0.6700850129127502, 0.6775000095367432)
Epoch: 10 Loss clf: 3.8223320603370667 Metrics: (0.6967233419418335, 0.6940000057220459)
Epoch: 15 Loss clf: 3.8033057411511737 Metrics: (0.7134817242622375, 0.7055000066757202)
Epoch: 20 Loss clf: 3.7157182455062867 Metrics: (0.7216107845306396, 0.7099999785423279)


KeyboardInterrupt: 

In [5]:
# model_name, score, params, save=True
# mclist = [float(mc) for mc in mclist]
# losslist = [float(loss) for loss in losslist]

metric_name = "mult-acc"
loss_name = "cross-entropy"
model_name = "nn"
description = f"fft_len {ds.fft_len}, batch {batch_size}, {clf.n_neurons} neurons, batch_norm before nonlinearity"


model_path = f"{models_path}{model_name}_{metric_name}_{mclist[-1]:.4f}.pt"
model_scripted = torch.jit.script(clf) # Export to TorchScript
model_scripted.save(model_path) # Save

print(f"Saved as {model_path}")

exp_info = {"score":{"metric":{"name":metric_name, "values":mclist}, "loss":{"name":loss_name, "values":losslist}},
            "data": {"fs": ds.w_rw.Fs, "fft_len":ds.fft_len, "duration":(ds.fft_len*2)/ds.w_rw.Fs},
            "learning":{"n_epochs":num_epochs, "n_batches":batch_size, "learning_rate":lr, "folds":k_folds},
            "model_info":{"name":model_name, "path":model_path},
            "description": description}

print(exp_info)
el.write_experiment(exp_info)

Saved as ./Models/nn_mult-acc_0.7609.pt
{'score': {'metric': {'name': 'mult-acc', 'values': [0.7860000133514404, 0.7418709397315979, 0.7633817195892334, 0.7623811960220337, 0.7608804106712341]}, 'loss': {'name': 'cross-entropy', 'values': [3.663506273428599, 3.738224451659156, 3.6014773024887337, 3.7023925155889796, 3.787837989994737]}}, 'data': {'fs': 48000, 'fft_len': 1200, 'duration': 0.05}, 'learning': {'n_epochs': 200, 'n_batches': 64, 'learning_rate': 0.001, 'folds': 5}, 'model_info': {'name': 'nn', 'path': './Models/nn_mult-acc_0.7609.pt'}, 'description': 'fft_len 1200, batch 64, 256 neurons, batch_norm before nonlinearity'}

Avg cross-val metric: 0.7629028558731079, STD: 0.01400294186387866
Experiment was written successfully


#### Get description of the best experiments so far

In [25]:
n_exp = 3
print(*sorted(el.experiments.items(), key=lambda x: x[1]['S'], reverse=True)[:n_exp], sep="\n")

('5', {'M': 'nn', 'S': '0.7741090059280396', 'D': 'fft_len 1200, batch 32, 64 neurons, batch_norm', 'P': './Models/nn_mult-acc_0.7729.pt', 'E': {'score': {'metric': {'name': 'mult-acc', 'values': [0.7804999947547913, 0.756378173828125, 0.7848924398422241, 0.7758879661560059, 0.7728864550590515]}, 'loss': {'name': 'cross-entropy', 'values': [3.699983750070844, 3.8210244918691703, 3.848136104386428, 3.714581719760237, 3.680334946204876]}}, 'data': {'fs': 48000, 'fft_len': 1200, 'duration': 0.05}, 'learning': {'n_epochs': 200, 'n_batches': 32, 'learning_rate': 0.001, 'folds': 5}, 'model_info': {'name': 'nn', 'path': './Models/nn_mult-acc_0.7729.pt'}, 'description': 'fft_len 1200, batch 32, 64 neurons, batch_norm'}})
('3', {'M': 'nn', 'S': '0.7697069525718689', 'D': 'fft_len 1200, batch 32, 256 neurons, batch_norm', 'P': './Models/nn_mult-acc_0.7784.pt', 'E': {'score': {'metric': {'name': 'mult-acc', 'values': [0.7789999842643738, 0.7483741641044617, 0.780390202999115, 0.7623811960220337, 

## Test IRL

In [2]:
import torch

r_len = 2400

bmodel_path = 'Models/Noise_BClassifiers/nn_bin-acc_0.9749.pt'
bmodel = torch.jit.load(bmodel_path).cuda()
bmodel.eval()

model_path = 'Models/Note_Classifiers/nn_mult-acc_0.7784.pt'

model = torch.jit.load(model_path).cuda()
model.eval()

RecursiveScriptModule(
  original_name=Classifier
  (model): RecursiveScriptModule(
    original_name=Sequential
    (0): RecursiveScriptModule(original_name=Flatten)
    (1): RecursiveScriptModule(original_name=Linear)
    (2): RecursiveScriptModule(original_name=Tanh)
    (3): RecursiveScriptModule(original_name=BatchNorm1d)
    (4): RecursiveScriptModule(original_name=Linear)
    (5): RecursiveScriptModule(original_name=Softmax)
  )
)

### Live-time testing

In [21]:
from pynput import keyboard

import pyaudio
import sounddevice as sd
import wave
from scipy.io.wavfile import read
from scipy.signal import spectrogram
from scipy.signal import welch

import time
from os import listdir
from os.path import isfile, join
from IPython.display import clear_output
import traceback

import numpy as np
import matplotlib.pyplot as plt

float_formatter = "{:.2f}".format
np.set_printoptions(formatter={'float_kind':float_formatter})

device = sd.query_devices(device="pipewire", kind=None)["index"]

FORMAT = pyaudio.paInt16  # Формат звука (16 бит, стерео)
CHANNELS = 1
RATE = 48000  # Частота дискретизации
CHUNK_IN = r_len
DEVICE_IDS = (device, device)

e = 10**(-10)

def setup_streams(form=FORMAT, chan=CHANNELS, 
                  chunk_in=CHUNK_IN, Fs=RATE,
                dev_id = DEVICE_IDS):
    
    p = pyaudio.PyAudio()

    i_stream = p.open(format=form,
                channels=chan,
                rate=Fs,
                input=True,
                frames_per_buffer=chunk_in,
                input_device_index=dev_id[0])
    i_stream.stop_stream()
    return p, i_stream

def close_streams(p, i_stream):
    i_stream.close()
    p.terminate()
    print("All streams closed")

def getfft(sig, Fs):
    nps = len(sig)-1
    f,fts = welch(sig, window = 'boxcar', fs = Fs, nperseg = nps, 
                  noverlap = 0, nfft = nps, scaling = "density")
    return f, fts

def predict(sequence, thresh, Fs=RATE):
    sequence = np.frombuffer(sequence, dtype=np.int16)
    
    _, x = getfft(sequence, Fs)
    x = torch.from_numpy(x[None, None,:])
    x = x.to(torch.float32).cuda()
    
    note = 0
    nprob = 1
    prediction = torch.zeros([89])
    prediction[0] = 1
    
    with torch.no_grad():
        isnote = bmodel(x)
        if isnote > thresh:
            prediction = model(x)
            # print(prediction)
            note = np.array(prediction.cpu()).argmax(axis=1).astype(int)[0]
            nprob = float(prediction[0][note])
    return note, nprob, prediction

proceed = True

def on_press(key):
    global proceed
    if key == keyboard.Key.esc:
        proceed = False

def listen_to_music(thresh1=0.7, thresh2=0, chunk_in=CHUNK_IN):
    global proceed
    n_top = 3
    try:
        pa, i_str = setup_streams()
        listener = keyboard.Listener(on_press=on_press)
        listener.start()
        # listener.join()
        model.eval()
        bmodel.eval()
        note = 0
        prob = 1
        proceed_choice = input("Press Enter to start recording...")
        i_str.start_stream()
        while proceed:  
            frame = i_str.read(chunk_in)
            results = predict(frame, thresh1)
            if results[1] > thresh2:
                note, prob, prediction = results
            clear_output(wait=True)
            key = str(note)
            key = (2-len(key))*" "+key
            print(f"Key {key} plays with prob {prob:.2f}, top {n_top} probs", 
                  np.array(torch.topk(prediction.flatten(), n_top).values.cpu()))  
        close_streams(pa, i_str)
    except Exception as err:
        print(traceback.format_exc())
        print(err)
        close_streams(pa, i_str)
        
listen_to_music()

Key  0 plays with prob 1.00, top 3 probs [1.00 0.00 0.00]
All streams closed


### Iterative testing

In [None]:
# custom oscilloscope (yep, again)

import scipy as sp
import numpy as np
from matplotlib import pyplot as plt
from scipy.fft import fft
from scipy.signal import stft
from scipy.signal import welch
from scipy.signal import resample
from scipy.signal import square
from scipy.signal import butter, freqs, sosfilt

class oscope:
    def __init__(self, seg, nol, nff):
        self.seg = seg
        self.nol = nol
        self.nff = nff
        
    def setp(self, seg, nol, nff):
        self.seg = seg
        self.nol = nol
        self.nff = nff
        
    def tplot(self, t, sig, N):
        plt.figure(figsize = (6,3))
        plt.plot(t[:N], sig[:N])
        plt.title("Oscillogram")
        plt.xlabel("Time, s")
        plt.ylabel("Value")
        plt.grid()
        plt.show()
        
    def splot(self, f, fts, N):
        plt.figure(figsize = (6,3))
        plt.plot(f[:N], fts[:N], color = 'purple')
        plt.title("Spectrum")
        plt.xlabel("Frequency, Hz")
        plt.ylabel("Amplitude")
        plt.grid()
        plt.show()
    
    def tsplot(self, sig, Fs):
        f,fts = welch(sig, window = 'boxcar', fs = Fs, nperseg = self.seg, 
                      noverlap = self.nol, nfft = self.nff, scaling = "density")
        fts = np.sqrt(fts*(Fs*2/seg))
        self.splot(f, fts, -1)
        
    def getmax(self, sig, Fs):
        f,fts = welch(sig, window = 'boxcar', fs = Fs, nperseg = self.seg, 
                      noverlap = self.nol, nfft = self.nff, scaling = "density")
        return f[np.argmax(fts)]

    def getfft(self, sig, Fs):
        f,fts = welch(sig, window = 'boxcar', fs = Fs, nperseg = self.seg, 
                      noverlap = self.nol, nfft = self.nff, scaling = "density")
        return f, fts

l = 10000

seg = int(l/2)-1 # 4999
nol = 0
nff = None
osc = oscope(seg, nol, nff)
# l = 10000
# shift = 20000
# osc.tsplot(sequence[shift:shift+l], sample_rate)
# osc.getmax(sequence[shift:shift+l], sample_rate)

In [22]:
import pyaudio
import sounddevice as sd
import wave
from scipy.io.wavfile import read
from scipy.signal import spectrogram

import time
from os import listdir
from os.path import isfile, join
from IPython.display import clear_output
import traceback

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


device = sd.query_devices(device="pipewire", kind=None)["index"]

FORMAT = pyaudio.paInt16  # Формат звука (16 бит, стерео)
CHANNELS = 1
RATE = 48000  # Частота дискретизации
CHUNK_IN = 512
CHUNK_OUT = r_len
DEVICE_IDS = (device, device)

e = 10**(-10)

def to_wav(path, frames, p, form=FORMAT, ch=CHANNELS, Fs=RATE):
    wf = wave.open(path, "wb")
    wf.setsampwidth(p.get_sample_size(form))
    wf.setnchannels(ch)
    wf.setframerate(Fs)
    wf.writeframes(b''.join(frames))
    wf.close()
    print(f"Saved to {path}")

def record(stream, duration, chunk_in = CHUNK_IN, Fs=RATE):
    
    frames = []
    stream.start_stream()
    for i in range(int(Fs / chunk_in * duration)):
        data = stream.read(chunk_in)
        frames.append(data)
    stream.stop_stream()
    # stream.close()
    # print(len(frames))
    return frames

def visualize(file_path, Fs=RATE):
    file = read(file_path)
    sequence = file[1]

    f, t, Sxx = spectrogram(sequence, Fs)
    plt.pcolormesh(t, f, np.log(Sxx+e), shading='gouraud')
    plt.ylabel('Frequency [Hz]')
    plt.xlabel('Time [sec]')
    plt.show()

def visualize(file_path, Fs=RATE, os=osc):
    file = read(file_path)
    sequence = file[1]
    os.tsplot(sequence, Fs)
    print(os.getmax(sequence, Fs))
    
def play(stream, file_path, chunk_out=CHUNK_OUT):
    wf = wave.open(file_path, 'rb')
    # read data (based on the chunk size)
    data = wf.readframes(chunk_out)
    # play stream (looping from beginning of file to the end)
    stream.start_stream()
    while data:
        # writing to the stream is what *actually* plays the sound.
        stream.write(data)
        data = wf.readframes(chunk_out)

    stream.stop_stream()
    # stream.stop_stream()
    wf.close()
    # stream.close()    
    # p.terminate()

def setup_streams(form=FORMAT, chan=CHANNELS, 
                  chunk_in = CHUNK_IN, Fs=RATE,
                  chunk_out=CHUNK_OUT, dev_id = DEVICE_IDS):
    
    p = pyaudio.PyAudio()

    i_stream = p.open(format=form,
                channels=chan,
                rate=Fs,
                input=True,
                frames_per_buffer=chunk_in,
                input_device_index=dev_id[0])
    # open stream based on the wave object which has been input.
    # print(chan, dev_id[0], dev_id[1])
    o_stream = p.open(format = form,
                    channels = chan,
                    rate = Fs,
                    output = True,
                    frames_per_buffer=chunk_out,
                    output_device_index=dev_id[1])
    i_stream.stop_stream()
    o_stream.stop_stream()
    return p, i_stream, o_stream

def close_streams(p, i_stream, o_stream):
    i_stream.close()
    o_stream.close()
    p.terminate()
    print("All streams closed")

def getfft(sig, Fs):
    nps = len(sig)-1
    f,fts = welch(sig, window = 'boxcar', fs = Fs, nperseg = nps, 
                  noverlap = 0, nfft = nps, scaling = "density")
    return f, fts

def predict(file_path, chunk_out=r_len, Fs=RATE):

    wfile = read(file_path)
    sequence = wfile[1]
    result_df = pd.DataFrame(columns=["time", "note", "prob"])
    for chunk in range(int(len(sequence)/chunk_out)):
        subseq=np.array(sequence[chunk*chunk_out:(chunk+1)*chunk_out])
        # print(len(subseq),subseq)
        _, x = getfft(subseq, Fs)
        x = torch.from_numpy(x[None, None,:])
        # print(x.shape)
        with torch.no_grad():
            model.eval()
            prediction = model(x.to(torch.float32).cuda())
            note = np.array(prediction.cpu()).argmax(axis=1).astype(int)[0]
            nprob = float(prediction.cpu()[0][note])
            result_df.loc[len(result_df)] = (chunk*chunk_out/Fs, note, nprob)
    print(result_df)
  
def record_data(folder_path, duration=5):
    pa, i_str, o_str = setup_streams()
    try:
        while True: 
            proceed_choice = bool(int(input("Start a recording?")))
            clear_output()
            # time.sleep(1)
            if not proceed_choice:
                break
            
            frames = record(i_str, duration)
            # print("Wow")
            tpath = join(folder_path, "temp.wav")
            to_wav(tpath, frames, pa)
            visualize(tpath)
            
            # play_choice = bool(int(input("Play an audio?")))
            # if play_choice:
            predict(tpath)
            
        close_streams(pa, i_str, o_str)
        
    except Exception as err:
        print(traceback.format_exc())
        print(err)
        close_streams(pa, i_str, o_str)
        
mypath = "./Data/Test/New/"

record_data(mypath, 1.5)


All streams closed


### URL File Testing

In [17]:
import pyaudio
import sounddevice as sd
import wave
from scipy.io.wavfile import read
from scipy.signal import spectrogram
from scipy.signal import welch

import time
from os import listdir
from os.path import isfile, join
from IPython.display import clear_output
import traceback

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import yt_dlp

file_path = 'temp'

# full options list https://github.com/ytdl-org/youtube-dl/blob/master/youtube_dl/YoutubeDL.py#L128-L278
# shorten version https://stackoverflow.com/questions/38658046/how-can-i-find-all-ydl-opts

ydl_opts = {
    # 'verbose': False,
    'quiet': True,
    'format': 'bestaudio/best',
    'outtmpl': file_path,
    'postprocessors': [{
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'wav',
    }],
}


e = 10**(-10)
    
def getfft(sig, Fs):
    nps = len(sig)-1
    f,fts = welch(sig, window = 'boxcar', fs = Fs, nperseg = nps, 
                  noverlap = 0, nfft = nps, scaling = "density")
    return f, fts

def predict(file_path, chunk_out=r_len):

    wf = wave.open(file_path, "rb")
    Fs = wf.getframerate()
    wf.close()
        
    wfile = read(file_path)
    sequence = wfile[1]
    result_df = pd.DataFrame(columns=["time", "note", "prob"])
    for chunk in range(int(len(sequence)/chunk_out)):
        subseq=np.array(sequence[chunk*chunk_out:(chunk+1)*chunk_out])
        subseq=subseq[:,0] # first channel
        # print(len(subseq),subseq)
        _, x = getfft(subseq, Fs)
        # print(x.shape)
        x = torch.from_numpy(x[None, None,:])
        # print(x.shape)
        with torch.no_grad():
            model.eval()
            # print(x.to(torch.float32).shape) 
            prediction = model(x.to(torch.float32).cuda())
            note = np.array(prediction.cpu()).argmax(axis=1).astype(int)[0]
            nprob = float(prediction.cpu()[0][note])
            result_df.loc[len(result_df)] = (chunk*chunk_out/Fs, note, nprob)
    return result_df

def download(url):
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download(url)
    
    
def url_predict():
    try:
        while True: 
            url = input("Paste url here")
            clear_output()
            if url == "":
                break
                
            print("Download in progress...")
            download(url)
            print("Done")
            tpath = file_path+".wav"
            result = predict(tpath)
            result.to_csv('predictions.csv', index=False)
            
        # close_streams(pa, i_str, o_str)
        
    except Exception as err:
        print(traceback.format_exc())
        print(err)
        # close_streams(pa, i_str, o_str)

# url = "https://youtu.be/2HZnTNIN648?si=TVziVpuXmDIF3cfu"

url_predict()


#### The end! Hurraaaay

### Some code testing

In [18]:
%pip install pysynth

Collecting pysynth
  Downloading pysynth-0.0.4-py3-none-any.whl.metadata (3.1 kB)
Downloading pysynth-0.0.4-py3-none-any.whl (14 kB)
Installing collected packages: pysynth
Successfully installed pysynth-0.0.4

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49m/home/mephodius/Programming/PythonProjects/DataScience/Pytorch/pytorch/bin/python -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [7]:
# import youtube_dl
import yt_dlp
import ffmpeg
import sys

ydl_opts = {
    'format': 'bestaudio/best',
    'outtmpl': 'temp',
    'postprocessors': [{
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'wav',
    }],
}

# def download_from_url(url):
#     ydl.download(url)
#     stream = ffmpeg.input('output.m4a')
#     stream = ffmpeg.output(stream, 'output.wav')


# with youtube_dl.YoutubeDL(ydl_opts) as ydl:
#     ydl.download([url])

url = "https://youtu.be/2HZnTNIN648?si=TVziVpuXmDIF3cfu"
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
    ydl.download(url)
    # download_from_url(url)


[youtube] Extracting URL: https://youtu.be/2HZnTNIN648?si=TVziVpuXmDIF3cfu
[youtube] 2HZnTNIN648: Downloading webpage
[youtube] 2HZnTNIN648: Downloading tv client config
[youtube] 2HZnTNIN648: Downloading player b191cf34
[youtube] 2HZnTNIN648: Downloading tv player API JSON
[youtube] 2HZnTNIN648: Downloading ios player API JSON
[youtube] 2HZnTNIN648: Downloading m3u8 information
[info] 2HZnTNIN648: Downloading 1 format(s): 251
[download] Destination: temp
[download] 100% of    1.31MiB in 00:00:00 at 4.04MiB/s   
[ExtractAudio] Destination: temp.wav
Deleting original file temp (pass -k to keep)


In [19]:
import pysynth
test = ( ('c', 4), ('e', 4), ('g', 4), ('c5', 1) )
pysynth.make_wav(test, fn = "test.wav")

AttributeError: module 'pysynth' has no attribute 'make_wav'