From https://datascience.stackexchange.com/questions/55566/tool-for-labeling-audio

In [None]:
import pandas
from pathlib import Path
import numpy as np
from pydub import AudioSegment

import simple_model_feature
# --- Dataset selection -------------------------------------------------
dataset_path = Path('datasets')
dataset_id = 1

# --- Chunking parameters (both expressed in seconds) -------------------
chunk_length = 10
chunk_move_step = 10

# Every artefact of a dataset is named after its zero-padded two-digit id.
path_labels = dataset_path.joinpath(f'{dataset_id:02}.txt')
path_mp3 = dataset_path.joinpath(f'{dataset_id:02}.mp3')
chunk_path = dataset_path.joinpath(f'{dataset_id:02}_split')

# The label file is read as a headerless, tab-separated table with one
# row per annotated interval: start, end and the annotation text.
labels = pandas.read_csv(
    path_labels,
    sep='\t',
    header=None,
    names=['start', 'end', 'annotation'],
    dtype={'start': float, 'end': float, 'annotation': str},
)
labels.head()

In [None]:
# Restore the trained network from its saved checkpoint and print its
# layer overview.
from simple_model import load_trained_model

weights_file = 'model_saves/weights.best.basic_cnn.hdf5'
model = load_trained_model(weights_file)
model.summary()

In [None]:
# Evaluate the loaded model on a held-out 20 % split of the pickled
# feature table.
import pandas as pd
from keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

featuresdf = pd.read_pickle('data_chunks_df.pickle')

# Feature vectors and their class labels as numpy arrays.
X = np.array(featuresdf['feature'].tolist())
y = np.array(featuresdf['class_label'].tolist())

# Map the labels to integers, then expand them to one-hot rows.
le = LabelEncoder()
yy = to_categorical(le.fit_transform(y))

# Fixed seed makes the split reproducible.
# NOTE(review): presumably this mirrors the split used during training,
# so x_test was never seen by the model -- confirm against the training code.
x_train, x_test, y_train, y_test = train_test_split(
    X,
    yy,
    test_size=0.2,
    random_state=42,
)

# NOTE(review): score[1] is treated as the accuracy metric; this depends
# on how the model was compiled -- confirm.
score = model.evaluate(x_test, y_test, verbose=1)
accuracy = score[1] * 100

print(f"Model accuracy: {accuracy:.4f}%")

In [None]:
# sound = AudioSegment.from_mp3(path_mp3)
# print(f'The file {path_mp3} is {len(sound)/1000/60:.1f}min long')

In [None]:
# # intervals
# intervals = []
# rows = list(labels.iterrows())
# for r, next_r in zip(rows[:-1], rows[1:]):
#     intervals.append([r[1]['start'], next_r[1]['start'], r[1]['annotation']])
# intervals.append([rows[-1][1]['start'], len(sound)/1000, rows[-1][1]['annotation']])
# print('\n'.join([str(x) for x in intervals]))

In [None]:
# from numpy import linspace
# import shutil
# import os
# from tqdm import tqdm
# import io
# import importlib
# importlib.reload(feature)

# classifications = []
# for start, end, label in tqdm(intervals):
#     chunk_start = start
#     while chunk_start+chunk_length < end:
#         sound_chunk = sound[int(chunk_start*1000):int((chunk_start+chunk_length)*1000)]
#         sound_chunk = sound_chunk.set_frame_rate(44100)

#         channels = 2
#         samples = [float(x) for x in sound_chunk.get_array_of_samples()]
#         stacked = np.vstack((samples[0::channels], samples[1::channels]))
#         data = feature.extract_features(stacked, sound_chunk.frame_rate)

#         prediction = model.predict_classes(data.reshape(1,80))[0]
#         if prediction == 0:
#             prediction_label = 'm'
#         else:
#             prediction_label = 'p'

#         classifications.append(prediction_label == label)
#         chunk_start += chunk_move_step
#     print(f'\n{sum(classifications)/len(classifications)*100:.1f} %')

In [None]:
# online
def classify_sound_chunk(sound_chunk):
    """Classify one audio chunk as music (``'m'``) or person/news (``'p'``).

    Args:
        sound_chunk: A pydub ``AudioSegment``. Its interleaved samples are
            de-interleaved as two channels (stereo).

    Returns:
        ``'m'`` when the model predicts class 0, otherwise ``'p'``.

    Relies on the module-level ``model`` and on the imported
    ``simple_model_feature`` module.
    """
    channels = 2
    # One vectorised conversion instead of a Python-level float() per
    # sample; the resulting float64 values are identical.
    samples = np.array(sound_chunk.get_array_of_samples(), dtype=float)
    # Samples are interleaved per frame, so striding by the channel count
    # separates them into one row per channel.
    stacked = np.vstack((samples[0::channels], samples[1::channels]))
    # The bare name ``feature`` is never bound in this file; the feature
    # module that is actually imported is ``simple_model_feature``.
    # NOTE(review): assumes it exposes ``extract_features`` -- confirm.
    data = simple_model_feature.extract_features(
        stacked, sound_chunk.frame_rate
    )

    # The model is fed a single row of 80 feature values.
    # NOTE(review): ``predict_classes`` is not available in recent
    # TensorFlow/Keras releases -- confirm the pinned version.
    prediction = model.predict_classes(data.reshape(1, 80))[0]
    return 'm' if prediction == 0 else 'p'

In [None]:
from io import BytesIO
from urllib.request import urlopen
from IPython.display import display, clear_output
from datetime import datetime

# Live classification of an MP3 radio stream: keep a rolling byte buffer,
# decode it, and classify the first `chunk_length` seconds whenever enough
# audio has accumulated.
url = "https://dradio-edge-209a-fra-lg-cdn.cast.addradio.de/dradio/nova/live/mp3/128/stream.mp3" # nova
#url = "http://st01.dlf.de/dlf/01/128/mp3/stream.mp3" # DLF
#url = "https://st02.sslstream.dlf.de/dlf/02/128/mp3/stream.mp3" # kultur

# Human-readable names for the classifier's labels (loop-invariant).
mapping = {'p':'person/news','m':'music'}

u = urlopen(url)
buffer = []          # raw MP3 byte chunks not yet discarded
mute_states = []     # recent results, True where the chunk was 'p'
mute_state_n = 8     # how many recent results the "sound state" averages
is_online = False    # True once at least one chunk has been classified
while True:
    try:
        data = u.read(1024*2)
        if not data:
            # An empty read means the server ended the stream; without
            # this the loop would spin forever on a dead connection.
            raise EOFError('stream ended')
        buffer.append(data)
        buffer_as_audiosegment = AudioSegment.from_mp3(BytesIO(b''.join(buffer)))
    except Exception:
        # `Exception` rather than a bare `except` so that KeyboardInterrupt
        # and SystemExit still stop this otherwise endless loop.
        print('error')
        # Release the old connection before opening a new one.
        u.close()
        buffer = []
        u = urlopen(url)
        is_online = False
        continue

    # len() of an AudioSegment is its duration in milliseconds.
    if len(buffer_as_audiosegment) >= chunk_length*1000:
        # Crop to exactly the configured chunk length (was a hard-coded
        # 10000 ms that silently ignored `chunk_length`).
        cropped = buffer_as_audiosegment[:chunk_length*1000]
        classification = classify_sound_chunk(cropped)

        # Keep only the last `mute_state_n` results for the moving average.
        mute_states.append(classification == 'p')
        if len(mute_states) > mute_state_n:
            mute_states = mute_states[-mute_state_n:]
        mute = sum(mute_states) / len(mute_states)
        clear_output(wait=True)
        display(f' {datetime.now()} Classification: {mapping[classification]:>15}, sound state: {mute*100: 3.0f} %')
        # Drop the three oldest reads so the analysed window slides forward.
        buffer = buffer[3:]
        is_online = True
    else:
        # Only show loading progress until the first classification.
        if not is_online:
            clear_output(wait=True)
            missing = chunk_length - len(buffer_as_audiosegment) / 1000
            display(f'{datetime.now()} Loading additional {missing: 4.1f}s')
