# TODO
- Zkusit jestli to nepozná už CLAP (motorky / auta) - _Martin_ 26.10. **DONE**
- Kategorie na "clustering" NN - _Filip_ 27.10.
- Nechat si schválit téma u Tomáše - _Filip_ 27.10.
- Add links to the respective used models and libraries for downloading and later reference and update Zotero bib - _Martin_ 26.10. **DONE**

**Nápady:**
  - titulky pro hluchoněmý
  - "clustering" - self referential NN (variational auto encoding - VAE)

In [None]:
import json
from datasets import load_dataset
import sounddevice as sd
from IPython.display import Audio, IFrame, display
import pandas as pd
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Audio File Download
- Using [yt-dlp](https://pypi.org/project/yt-dlp/#embedding-yt-dlp) to download the audio of YouTube videos.

In [None]:
import yt_dlp
from yt_dlp import YoutubeDL
from yt_dlp.utils import download_range_func

In [None]:
url_motorcycle_test = 'https://youtu.be/--EG-JqO4S0?si=Py8FBEOvyR_5V_vm'
url_embed_motorcycle_test = "https://www.youtube.com/embed/--EG-JqO4S0?si=Py8FBEOvyR_5V_vm"

In [None]:
IFrame(width=560, height=315, src=url_embed_motorcycle_test, title="YouTube video player")

In [None]:
keys_of_interest = ['id', 'title', 'uploader', 'duration', 'view_count', 'like_count',
                        'categories', 'webpage_url', 'tags', 'release_year', 'filesize', 'description',
                        'format_id', 'acodec', 'audio_ext']

In [None]:
# Options handed to yt-dlp: fetch the audio track and convert it to FLAC.
ydl_opts = dict(
    format='flac/bestaudio/best',
    postprocessors=[
        dict(key='FFmpegExtractAudio', preferredcodec='flac'),  # preferred audio format
    ],
    outtmpl='../newData/engine/%(id)s.%(ext)s',
    # suppress output
    no_warnings=True,
    quiet=True,
    ffmpeg_location='/opt/homebrew/bin/ffmpeg',
)

In [None]:
# TODO: Test the validity of the URL
# TODO: Logging with    logger:            Log messages to a logging.Logger instance.
def yt_download(url, infokeys = keys_of_interest, opts = ydl_opts, start_time=None, end_time=None):
    """Download a video's audio with yt-dlp and return selected metadata.

    The metadata is extracted first (without downloading), then the file is
    downloaded, optionally restricted to the ``start_time``..``end_time`` range.

    Args:
        url: Video URL (or any identifier accepted by ``YoutubeDL.extract_info``).
        infokeys: Metadata keys to copy into the result when present.
        opts: yt-dlp options. The dict is copied, never modified. Note that the
            default is the dict bound when this function was defined; pass
            ``opts=`` explicitly to use a different configuration.
        start_time: Start of the section to download, in seconds. Falsy means 0.
        end_time: End of the section to download, in seconds. Falsy means the
            full duration reported by the extractor.

    Returns:
        dict with the requested metadata plus ``full_duration``, ``path`` and,
        when a range was requested, ``start_time``, ``end_time`` and the
        trimmed ``duration``.

    Raises:
        ValueError: A range was requested but no end could be determined.
        RuntimeError: yt-dlp did not report exactly one downloaded file.
    """
    # Work on a copy so per-call settings (the download range) never leak into
    # the caller's dict or into the shared default options of later calls.
    call_opts = dict(opts)

    with YoutubeDL(call_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        serialized_info = ydl.sanitize_info(info)
    info_dict = {key: serialized_info[key] for key in infokeys if key in serialized_info}
    info_dict["full_duration"] = serialized_info.get("duration")

    if start_time or end_time:
        info_dict["start_time"] = start_time or 0
        info_dict["end_time"] = end_time or info_dict["full_duration"]
        if info_dict["end_time"] is None:
            raise ValueError(f"Cannot determine the end of the requested range for {url!r}")
        call_opts['download_ranges'] = download_range_func(
            None, [(info_dict["start_time"], info_dict["end_time"])])
        # A plain boolean; the original trailing comma stored the tuple (True,).
        call_opts['force_keyframes_at_cuts'] = True
        info_dict["duration"] = info_dict["end_time"] - info_dict["start_time"]

    with YoutubeDL(call_opts) as ydl:
        download_info = ydl.extract_info(url, download=True)
        serialized_info = ydl.sanitize_info(download_info)

    downloads = serialized_info.get("requested_downloads") or []
    if len(downloads) != 1:
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        raise RuntimeError(f"Expected exactly one downloaded file for {url!r}, got {len(downloads)}")
    info_dict["path"] = downloads[0]["filepath"]
    return info_dict

In [None]:
audio_test = yt_download(url_motorcycle_test)

In [None]:
test_audio_file_df = pd.DataFrame([audio_test])

In [None]:
test_audio_file_df

In [None]:
test_audio_file_df.to_json("../data/test_audio.json")

In [None]:
Audio(audio_test["path"])

# Audio classification
__Plan:__ I will be using a zero-shot classification approach with [this open model](https://huggingface.co/laion/clap-htsat-fused#usage). This means that I need to predefine the audio labels that may occur in the data. Some of them will be the classes that I want to extract from the audio. I will be splitting the audio into smaller parts of 10 seconds. Then, after the classification, I will merge the parts that include the wanted classes into an audio file that will be part of our training data.

In [None]:
from transformers import pipeline, ClapModel, ClapProcessor, ClapAudioConfig, ClapConfig, ClapTextConfig
import scipy.signal as sps

In [None]:
# NOTE(review): `fused` presumably refers to this checkpoint's feature-fusion handling of
# variable-length audio rather than to a merged text/audio branch — confirm on the model card.
clap_model = ClapModel.from_pretrained("laion/clap-htsat-fused")
clap_processor = ClapProcessor.from_pretrained("laion/clap-htsat-fused")

In [None]:
clap_model.config;

In [None]:
# Candidate text labels scored against every audio frame.
# The labels are used verbatim as text prompts, so spelling matters ("Speech", not "Speach").
classes = ["Engine revving", "Speech", "Silence", "Engine starting", "Engine running"]

## Testing Clap

In [None]:
test_dataset = load_dataset("hf-internal-testing/ashraq-esc50-1-dog-example")
test_audio_sample = test_dataset["train"]["audio"][0]

In [None]:
test_audio_sample

In [None]:
Audio(test_audio_sample["array"], rate=test_audio_sample["sampling_rate"])

In [None]:
test_audio_sample_resamped = sps.resample(test_audio_sample["array"], round(len(test_audio_sample["array"]) * float(48000) / test_audio_sample["sampling_rate"]))

In [None]:
test_classes = ["Dog barking", "Cat sounds"]

In [None]:
test_clap_inputs = clap_processor(text=test_classes, audios=test_audio_sample_resamped, return_tensors="pt", padding=True, sampling_rate=48000)

In [None]:
test_outputs = clap_model(**test_clap_inputs)

In [None]:
test_probs = test_outputs.logits_per_audio.softmax(dim=-1)  # this is the audio-text similarity score

In [None]:
{c: float(p) for c,p in zip(test_classes, test_probs[0])}

## Splitting into Frames

In [None]:
import librosa
import numpy as np

In [None]:
audio_test_array,audio_test_array_sr = librosa.load(audio_test["path"])

In [None]:
def resample(audio_input, current_rate, target_rate):
    """Resample ``audio_input`` from ``current_rate`` to ``target_rate``.

    The output length is the input length scaled by the rate ratio and rounded
    to the nearest integer; the conversion itself is done by
    ``scipy.signal.resample``.
    """
    scaled_length = len(audio_input) * float(target_rate)
    target_length = round(scaled_length / current_rate)
    return sps.resample(audio_input, target_length)

In [None]:
audio_test_array_resamped = resample(audio_test_array, audio_test_array_sr, 48000)

In [None]:
Audio(audio_test_array_resamped, rate=48000)

In [None]:
def audio_frames(audio_input, frame_duration=10000, overlap_duration=5000, rate=48000):
    """Split an audio signal into overlapping, annotated frames.

    Args:
        audio_input: Sequence of samples (sliced along its first axis).
        frame_duration: Frame length in milliseconds.
        overlap_duration: Overlap between consecutive frames in milliseconds.
        rate: Sampling rate of ``audio_input`` in Hz.

    Returns:
        A list of dicts, one per frame, with the keys ``array`` (the samples),
        ``start_s`` / ``end_s`` (sample indices) and ``start_time`` /
        ``end_time`` (milliseconds). After the full-length frames, one shorter
        trailing frame is appended that runs from the next hop position to the
        end of the signal; a signal shorter than one frame is returned as a
        single short frame, and an empty signal yields an empty list.

    Raises:
        ValueError: If the frame length is not positive or the overlap is not
            smaller than the frame length.
    """
    # Convert milliseconds to samples in one step. Rounding to whole seconds
    # first (as before) turned e.g. 500 ms into a zero-length frame.
    frame_length = round(frame_duration * rate / 1000)
    overlap_length = round(overlap_duration * rate / 1000)
    hop_length = frame_length - overlap_length
    if frame_length <= 0:
        raise ValueError("frame_duration must correspond to at least one sample")
    if hop_length <= 0:
        raise ValueError("overlap_duration must be smaller than frame_duration")

    total_samples = len(audio_input)

    def annotate(start, end):
        # Describe the slice [start, end) both in samples and in milliseconds.
        return {
            "array": audio_input[start:end],
            "start_s": start,  # starting sample
            "start_time": round(1000 * start / rate),  # starting time in ms
            "end_s": end,  # ending sample
            "end_time": round(1000 * end / rate),  # ending time in ms
        }

    # Start positions of all frames that fit completely into the signal
    # (empty when the signal is shorter than a single frame).
    full_starts = range(0, total_samples - frame_length + 1, hop_length)
    frames_annotated = [annotate(start, start + frame_length) for start in full_starts]

    # Whatever lies beyond the next hop position becomes a shorter last frame.
    tail_start = len(full_starts) * hop_length
    if tail_start != total_samples:
        frames_annotated.append(annotate(tail_start, total_samples))
    return frames_annotated

In [None]:
frames_test = audio_frames(audio_test_array_resamped, overlap_duration=5000)

In [None]:
Audio(frames_test[5]["array"], rate=48000)

In [None]:
Audio(frames_test[6]["array"], rate=48000)

## Classification of the Frames

In [None]:
# TODO: Add progress bar
def classify_frames(frames, classes, model = clap_model, processor=None, sampling_rate=48000):
    """Score every frame against the text labels in ``classes``.

    Args:
        frames: Iterable of frame dicts; each must provide the samples under
            the ``"array"`` key.
        classes: Text labels to score each frame against.
        model: Model called with the processor output; its result must expose
            ``logits_per_audio``.
        processor: Callable that builds the model inputs from text and audio.
            Defaults to the module-level ``clap_processor``.
        sampling_rate: Sampling rate of the frame arrays, passed to the processor.

    Returns:
        A list with a shallow copy of every frame, extended by a ``"probs"``
        dict that maps each label to the softmax of the audio-text logits.
    """
    # Local import: the notebook does not import torch at the top level.
    import torch

    if processor is None:
        processor = clap_processor

    frames_classified = []
    # Pure inference: disable gradient tracking so no autograd graph is built per frame.
    with torch.no_grad():
        for f in frames:
            fc = f.copy()
            frame_inputs = processor(text=classes, audios=fc["array"], return_tensors="pt",
                                     padding=True, sampling_rate=sampling_rate)
            frame_outputs = model(**frame_inputs)
            probs = frame_outputs.logits_per_audio.softmax(dim=-1)  # this is the audio-text similarity score
            fc["probs"] = {c: float(p) for c, p in zip(classes, probs[0])}
            frames_classified.append(fc)
    return frames_classified

In [None]:
frames_test_classified = classify_frames(frames_test, classes)

In [None]:
Audio(frames_test_classified[0]["array"], rate=48000)

In [None]:
frames_test_classified[0]["probs"]

In [None]:
Audio(frames_test_classified[8]["array"], rate=48000)

In [None]:
frames_test_classified[8]["probs"] 

In [None]:
Audio(frames_test_classified[12]["array"], rate=48000)

In [None]:
frames_test_classified[12]["probs"]

In [None]:
import matplotlib.pyplot as plt

In [None]:
def prob_plot(frames_classified, classes):
    """Draw one step curve per class showing its probability in every frame.

    Each frame contributes one point per class: x is the frame's
    ``start_time`` divided by 1000, y is ``frame["probs"][class]``.
    """
    # One probability series per class, in frame order.
    series_by_class = {
        label: [frame["probs"][label] for frame in frames_classified]
        for label in classes
    }
    start_seconds = [frame["start_time"] / 1000 for frame in frames_classified]

    _, axes = plt.subplots()
    for label in classes:
        axes.step(start_seconds, series_by_class[label], where="post", label=label)
    axes.legend()
    axes.set_xlabel("time (s)")
    axes.set_ylabel("p(class)")
    plt.show()

In [None]:
prob_plot(frames_test_classified, classes)

# Merging the Valid Frames

In [None]:
def merge_frames(frames, audio_array):
    """Merge several frames into one segment spanning all of them.

    The segment runs from the smallest start to the largest end found in
    ``frames``; its samples are sliced out of ``audio_array`` using the
    sample indices.
    """
    first_sample = np.min([frame["start_s"] for frame in frames])
    first_ms = np.min([frame["start_time"] for frame in frames])
    last_sample = np.max([frame["end_s"] for frame in frames])
    last_ms = np.max([frame["end_time"] for frame in frames])
    return {
        "start_s": first_sample,
        "start_time": first_ms,
        "end_s": last_sample,
        "end_time": last_ms,
        "array": audio_array[first_sample:last_sample],
    }
def get_valid_segments(frames_classified, audio_array, accepted_classes, class_thresh = 0.8):
    """Merge runs of consecutive accepted frames into segments.

    A frame is accepted when the summed probability of ``accepted_classes``
    exceeds ``class_thresh``. Every maximal run of accepted frames is merged
    with ``merge_frames`` and returned, in order of appearance.
    """
    accepted_flags = [
        np.sum([frame["probs"][c] for c in accepted_classes]) > class_thresh
        for frame in frames_classified
    ]

    valid_segments = []
    pending = []  # accepted frames of the run currently being collected
    for frame, accepted in zip(frames_classified, accepted_flags):
        if accepted:
            pending.append(frame)
        elif pending:
            # A rejected frame closes the current run.
            valid_segments.append(merge_frames(pending, audio_array))
            pending = []
    # A run that reaches the last frame is still open and must be closed here.
    if pending:
        valid_segments.append(merge_frames(pending, audio_array))
    return valid_segments

In [None]:
valid_segments_test = get_valid_segments(frames_test_classified, audio_test_array_resamped, ["Engine revving", "Engine running", "Engine starting"])

In [None]:
is_valid_segment = [np.sum([f["probs"][c] for c in ["Engine revving", "Engine running", "Engine starting"]])  > 0.8 for f in frames_test_classified]

In [None]:
valid_segments_test

In [None]:
fig,ax = plt.subplots()
ax.stairs(is_valid_segment)
ax.set_title("Valid segments")
plt.show()

In [None]:
is_valid_segment;

In [None]:
Audio(valid_segments_test[0]["array"], rate=48000)

## Testing Motorcycle / Car Zero-shot Classification

In [None]:
url_moto_test = 'https://youtu.be/--EG-JqO4S0?si=Py8FBEOvyR_5V_vm'
url_embed_moto_test = "https://www.youtube.com/embed/--EG-JqO4S0?si=Py8FBEOvyR_5V_vm"
url_car_test = 'https://youtu.be/-3d2F1GLG3w?si=9LK9bDh_SaVYUwCi'
url_embed_car_test = 'https://www.youtube.com/embed/-3d2F1GLG3w?si=9LK9bDh_SaVYUwCi'

In [None]:
IFrame(width=300, height=200, src=url_embed_moto_test, title="YouTube video player")

In [None]:
IFrame(width=300, height=200, src=url_embed_car_test, title="YouTube video player")

In [None]:
audio_test_moto = yt_download(url_moto_test)
audio_test_car = yt_download(url_car_test)

In [None]:
pd.DataFrame([audio_test_moto, audio_test_car])

In [None]:
audio_test_array_moto, audio_test_array_moto_sr = librosa.load(audio_test_moto["path"])
audio_test_array_car, audio_test_array_car_sr = librosa.load(audio_test_car["path"])

audio_test_array_moto_resamped = resample(audio_test_array_moto, audio_test_array_moto_sr, 48000)
audio_test_array_car_resamped = resample(audio_test_array_car, audio_test_array_car_sr, 48000)

frames_test_moto = audio_frames(audio_test_array_moto_resamped)
frames_test_car = audio_frames(audio_test_array_car_resamped)

frames_test_moto_classified = classify_frames(frames_test_moto, classes)
frames_test_car_classified = classify_frames(frames_test_car, classes)

valid_classes = ["Engine revving", "Engine running", "Engine starting"]
valid_segments_test_moto = get_valid_segments(frames_test_moto_classified, audio_test_array_moto_resamped, valid_classes)
valid_segments_test_car = get_valid_segments(frames_test_car_classified, audio_test_array_car_resamped, valid_classes)

In [None]:
prob_plot(frames_test_moto_classified, classes)

In [None]:
prob_plot(frames_test_car_classified, classes)

In [None]:
Audio(valid_segments_test_car[0]["array"], rate=48000)

In [None]:
Audio(valid_segments_test_moto[0]["array"], rate=48000)

In [None]:
test_car_moto_classes = ["Motorcycle engine", "Car engine"]

In [None]:
test_clap_inputs_moto = clap_processor(
    text=test_car_moto_classes,
    audios=valid_segments_test_moto[0]["array"], 
    return_tensors="pt", padding=True, sampling_rate=48000)
test_clap_inputs_car = clap_processor(
    text=test_car_moto_classes,
    audios=valid_segments_test_car[0]["array"], 
    return_tensors="pt", padding=True, sampling_rate=48000)

test_outputs_moto = clap_model(**test_clap_inputs_moto)
test_outputs_car = clap_model(**test_clap_inputs_car)

In [None]:
test_probs_moto = test_outputs_moto.logits_per_audio.softmax(dim=-1)
test_probs_car = test_outputs_car.logits_per_audio.softmax(dim=-1)

In [None]:
{c: float(p) for c,p in zip(test_car_moto_classes, test_probs_moto[0])}

In [None]:
{c: float(p) for c,p in zip(test_car_moto_classes, test_probs_car[0])}

**CLAP cannot reliably distinguish a motorcycle engine sound from a car engine sound!**

# Scraping the Candidate URLs
- We will be using [this site](https://research.google.com/audioset/unbalanced_train/engine.html) for the dataset scraping.

In [None]:
import os, time

from bs4 import BeautifulSoup
import requests
# data is populated using a script therefore we need to use a webdriver to run the javascript
# unfortunately for this it is necessary to have the chrome binary oooff...
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager

In [None]:
driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

In [None]:
testing_category_url = "https://research.google.com/audioset/eval/squeak.html"

In [None]:
def load_full_page(driver, category_url=None, scroll_pause=2, max_scrolls=None):
    """Scroll a page to the bottom until its height stops growing.

    Args:
        driver: WebDriver-like object providing ``get``, ``implicitly_wait``
            and ``execute_script``.
        category_url: Page to open first. When falsy, the page currently shown
            by ``driver`` is scrolled further instead of being reloaded.
        scroll_pause: Seconds to wait after each scroll before the page height
            is read again.
        max_scrolls: Upper bound on the number of scroll steps; ``None`` keeps
            scrolling until the height no longer changes.

    Returns:
        The same ``driver`` object.
    """
    if category_url:
        driver.get(category_url)
    driver.implicitly_wait(10)

    # scroll repeatedly in order to load all of the thumbnails
    last_height = driver.execute_script("return document.body.scrollHeight")
    scrolls_done = 0
    while max_scrolls is None or scrolls_done < max_scrolls:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        scrolls_done += 1
        time.sleep(scroll_pause)
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:  # at the end of page
            break
        last_height = new_height
    return driver

In [None]:
driver = load_full_page(driver, testing_category_url)
soup = BeautifulSoup(driver.page_source, 'html.parser')

In [None]:
def find_thumbnails(soup):
    """Extract the clip descriptions from the thumbnail elements of a parsed page.

    Looks up the ``div`` with id ``thumbnails`` and reads, from each of its
    ``div`` children of class ``u``, the attributes ``data-ytid``,
    ``data-start``, ``data-end`` and ``data-labels``.

    Args:
        soup: Parsed document offering ``find`` (BeautifulSoup-style).

    Returns:
        A tuple ``(thumbs_dicts, thumbnail_divs)``: one dict per thumbnail
        with the keys ``yt-id``, ``start-t``, ``end-t`` (integers) and
        ``labels``, plus the raw thumbnail elements. Both are empty lists
        when the page has no ``thumbnails`` container.
    """
    thumbnail_div = soup.find('div', attrs={'id':'thumbnails'})
    if thumbnail_div is None:
        # Container missing (e.g. page not fully rendered): nothing to extract.
        return [], []

    thumbnail_divs = thumbnail_div.find_all('div', {'class':'u'})
    thumbs_dicts = []
    for t in thumbnail_divs:
        # data-labels holds a JSON list of pairs; the second item of each pair
        # is kept as the label (presumably its display name — verify on the page).
        labs = json.loads(t['data-labels'])
        thumbs_dicts.append({
            'yt-id': t['data-ytid'],
            'start-t': int(t['data-start']),
            'end-t': int(t['data-end']),
            'labels': [l[1] for l in labs],
        })
    return thumbs_dicts, thumbnail_divs

In [None]:
thumbs_dicst, thumbnail_divs = find_thumbnails(soup)

In [None]:
len(thumbnail_divs)

**STATE: All the info needed to download the files for the categories is scraped. The function for downloading only parts of the audio is ready. I am waiting only for Filip's category selection.**

In [None]:
pd.DataFrame.from_dict(thumbs_dicst)

# Downloading the Audio Samples

In [None]:
 url_V8_engine_sound = 'https://www.youtube.com/watch?v=QunyWALxgps'
 url_VW_review = 'https://youtu.be/UIfRuOaB2dA?si=ikC_EbabT0e-tZxq'

In [None]:
vehicle_category_url = 'https://research.google.com/audioset/unbalanced_train/vehicle.html'

In [None]:
driver_vehicle = webdriver.Chrome(service=Service(ChromeDriverManager().install()))

In [None]:
# Repeat the scroll-to-bottom routine to give lazily loaded thumbnails more chances to appear.
for i in range(100):
    # Pass the URL only on the first round: load_full_page() navigates whenever a URL
    # is given, which would reload the page and discard everything loaded so far.
    driver_vehicle = load_full_page(driver_vehicle, vehicle_category_url if i == 0 else None)
    print(driver_vehicle)
    print(i)

In [None]:
soup_vehicles = BeautifulSoup(driver_vehicle.page_source, 'html.parser')

In [None]:
df_vehicle_audioset = pd.DataFrame.from_dict(thumbs_dicts_vehicles)
print(df_vehicle_audioset)

In [None]:
thumbs_dicts_vehicles, thumbnail_divs_vehicles = find_thumbnails(soup_vehicles)

In [None]:
df_speech_and_vehicle = df_vehicle_audioset[df_vehicle_audioset["labels"].apply(lambda lbls: "vehicle" in lbls)]
df_speech_and_vehicle

In [None]:
for _,s in tqdm(df_speech_and_vehicle.iterrows()):
    yt_info = yt_download(s["yt-id"], start_time=s["start-t"], end_time=s["end-t"])
    yt_info["labels"] = s["labels"]
    download_information.append(yt_info)

In [None]:
download_information = []

In [None]:
audio_test

In [None]:
from tqdm import tqdm

In [None]:
df_download_info = pd.DataFrame.from_dict(download_information)

In [None]:
df_download_info.to_parquet('../download_info.parquet')
# loaded_df = pd.read_parquet('../download_info.parquet')

In [None]:
df_speech_and_vehicle.to_parquet("../speech_and_vehicle_audioset.parquet")

In [None]:
df_vehicle_audioset.to_parquet("../vehicle_audioset.parquet")

# Cleaning the Sounds and Indexing the Files

In [None]:
import pandas as pd

# Load the Parquet file
parquet_file = '../vehicle_audioset_full.parquet'
df = pd.read_parquet(parquet_file)

# Save the DataFrame to JSON
json_file = 'vehicle_audioset_full.json'
df.to_json(json_file, orient='records', lines=True)

print("Conversion completed. JSON saved at:", json_file)

In [None]:
import pandas as pd
import json

# Load Parquet file
input_file = "../vehicle_audioset_full.parquet"
output_file = "../filtered_vehicle_audioset.parquet"

# Load DataFrame
df = pd.read_parquet(input_file)

# Labels to filter (you can add the desired labels here)
specific_labels = ['music']  # Example labels

# Function to check if any specific label exists in 'labels.list'
def label_filter(label_list):
    """Return True when ``label_list`` contains one of ``specific_labels``.

    ``label_list`` may be a JSON string or an already parsed sequence. Each
    entry may be a plain string label or a mapping whose ``'element'`` value
    is a sequence of character codes that is decoded into the label.
    Input that cannot be interpreted this way yields False.
    """
    try:
        # Convert the string to JSON if it's stored as a string
        parsed_list = json.loads(label_list) if isinstance(label_list, str) else label_list
        # Plain strings are used as-is; encoded entries are decoded from their character codes.
        elements = [
            elem if isinstance(elem, str)
            else ''.join(chr(c) for c in elem.get('element', []))
            for elem in parsed_list
        ]
    except (TypeError, ValueError, AttributeError, OverflowError):
        # Malformed JSON, non-iterable input or undecodable entries: treat as "no match"
        # without swallowing unrelated errors such as KeyboardInterrupt.
        return False
    return any(label in elements for label in specific_labels)

# Apply the filter
filtered_df = df[df['labels'].apply(label_filter)]

# Save filtered data to Parquet
filtered_df.to_parquet(output_file, index=False)

# Optionally save as JSON
filtered_df.to_json("filtered_vehicle_audioset.json", orient="records")

print("Filtered data saved successfully!")


In [None]:
import pandas as pd
from collections import Counter

# Load the Parquet file
parquet_file = '../vehicle_audioset_full.parquet'  # Replace with your local file path
df = pd.read_parquet(parquet_file)

# Filter rows that contain at least one of the desired labels
target_labels = {"engine", "speech", "music"}
filtered_df = df[df['labels'].apply(lambda x: any(label in target_labels for label in x))]

# Extract the filtered labels
all_labels = filtered_df['labels']

# Count single labels
single_labels = Counter(label for labels in all_labels for label in labels if label in target_labels)

# Count label combinations
combinations = Counter(tuple(sorted(label for label in labels if label in target_labels)) for labels in all_labels)

# Display results
print("Filtered Single Label Counts:")
for label, count in single_labels.items():
    print(f"{label}: {count}")

print("\nFiltered Label Combination Counts:")
for combo, count in combinations.items():
    print(f"{combo}: {count}")


In [None]:
import pandas as pd

# Load the Parquet file
parquet_file = '../vehicle_audioset_full.parquet'  # Replace with your file path
output_parquet_file = 'filtered_vehicle_music.parquet'  # Filtered output file

# Target labels to filter
target_labels = {"music"}

# Load the Parquet data
df = pd.read_parquet(parquet_file)

# Filter rows that contain at least one of the target labels
filtered_df = df[df['labels'].apply(
    lambda x: len(x) == 2 and "vehicle" in x and any(label in target_labels for label in x if label != "vehicle"))]

# Save the filtered DataFrame to a new Parquet file
filtered_df.to_parquet(output_parquet_file, index=False)

In [None]:
df_engine = pd.read_parquet('filtered_vehicle_engine.parquet')

In [None]:
# yt-dlp options for the engine samples: extract the audio track and convert it to FLAC.
ydl_opts = {
    'format': 'flac/bestaudio/best',
    'postprocessors': [{
        'key': 'FFmpegExtractAudio',
        'preferredcodec': 'flac',  # preferred audio format
    }],
    'outtmpl': '../newData/engine/%(id)s.%(ext)s',
    # suppress output
    'no_warnings':True,
    'quiet': True,
    'ffmpeg_location': '/opt/homebrew/bin/ffmpeg'
}

# Download only the first 10 clips.
for _, s in tqdm(df_engine.head(10).iterrows(), total=min(10, len(df_engine))):
    # Pass the options explicitly: rebinding the global name above does not change
    # the default that yt_download captured when it was defined.
    yt_info = yt_download(s["yt-id"], opts=ydl_opts, start_time=s["start-t"], end_time=s["end-t"])
    yt_info["labels"] = s["labels"]

In [None]:
import pandas as pd
from tqdm import tqdm
import os

# Load the Parquet file
parquet_file = '../vehicle_audioset_full.parquet'  # Replace with the correct path
df = pd.read_parquet(parquet_file)

# Define target label combinations
target_labels = ["engine", "speech", "music"]
number_to_download_per_label = 300
# Base output folder
output_base = "grouped"

# Keep only rows labelled with exactly two labels: "vehicle" plus the target label
def filter_labels(row_labels, target_label):
    """Return True when ``row_labels`` has two entries that include "vehicle" and ``target_label``."""
    if len(row_labels) != 2:
        return False
    return "vehicle" in row_labels and target_label in row_labels

# Process each label and download files
for label in target_labels:
    print(f"Processing label: {label}...")

    # Filter DataFrame for "vehicle" + target label
    filtered_df = df[df['labels'].apply(lambda x: filter_labels(x, label))]
    print(f"Found {len(filtered_df)} rows for label '{label}'")

    # Output folder for the current label
    output_folder = os.path.join(output_base, label)
    os.makedirs(output_folder, exist_ok=True)

    # YouTube download options with dynamic outtmpl for each label
    ydl_opts = {
        'format': 'flac/bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'flac',  # preferred audio format
        }],
        'outtmpl': os.path.join(output_folder, '%(id)s.%(ext)s'),
        'no_warnings': True,
        'quiet': True,
        'ffmpeg_location': '/opt/homebrew/bin/ffmpeg'
    }

    downloaded_count = 0

    # Download up to number_to_download_per_label files. The progress bar advances
    # only on successful downloads, so skipped or failed rows do not inflate it.
    with tqdm(total=min(number_to_download_per_label, len(filtered_df))) as progress:
        for _, s in filtered_df.iterrows():
            if downloaded_count >= number_to_download_per_label:
                break
            # Best effort per clip: a failing video is reported and the loop moves on.
            try:
                start_time = s.get("start-t", 0)
                end_time = s.get("end-t", 0)
                duration = end_time - start_time

                # Only clips of at most 15 seconds with a positive length are downloaded.
                if duration > 15 or duration <= 0:
                    print(f"Skipping {s['yt-id']} due to duration ({duration} seconds).")
                    continue

                # Download the file with the current ydl_opts
                yt_download(
                    s["yt-id"],
                    opts=ydl_opts,
                    start_time=start_time,
                    end_time=end_time
                )
                downloaded_count += 1
                progress.update(1)
            except Exception as e:
                print(f"Error processing {s['yt-id']}: {e}")

    print(f"Completed processing for label '{label}'. Downloaded {downloaded_count} files.")
