In [1]:
import torch
import torchaudio
from torch_vggish_yamnet import yamnet
from torch_vggish_yamnet.input_proc import WaveformToInput
import librosa
import numpy as np
import pandas as pd
from pydub import AudioSegment
import subprocess
import os


In [4]:
# Initialize the waveform converter
converter = WaveformToInput()

# Initialize the YAMNet model
model = yamnet.yamnet(pretrained=True)

#device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('cpu')
model.to(device)

model.eval()  # Set the model to evaluation mode

YAMNet(
  (layer1): Conv(
    (fused): CONV_BN_RELU(
      (conv): Conv2d_tf(1, 32, kernel_size=(3, 3), stride=(2, 2), padding=SAME, bias=False)
      (bn): BatchNorm2d(32, eps=0.0001, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
  )
  (layer2): SeparableConv(
    (depthwise_conv): CONV_BN_RELU(
      (conv): Conv2d_tf(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=SAME, groups=32, bias=False)
      (bn): BatchNorm2d(32, eps=0.0001, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
    (pointwise_conv): CONV_BN_RELU(
      (conv): Conv2d_tf(32, 64, kernel_size=(1, 1), stride=(1, 1), padding=SAME, bias=False)
      (bn): BatchNorm2d(64, eps=0.0001, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU()
    )
  )
  (layer3): SeparableConv(
    (depthwise_conv): CONV_BN_RELU(
      (conv): Conv2d_tf(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=SAME, groups=64, bias=False)
      (bn): BatchNorm2d(

In [5]:
# def load_audio(file_path, target_sr=16000):
#     # Load audio file
#     audio = AudioSegment.from_file(file_path, format="webm")
#     print(1 )
#     # Resample to target sample rate
#     audio = audio.set_frame_rate(target_sr)
#     print(2)
#     # If stereo, average the channels to create mono
#     if audio.channels > 1:
#         samples = np.array(audio.split_to_mono())
#         averaged_samples = (samples[0].get_array_of_samples() + samples[1].get_array_of_samples()) / 2
#     else:
#         averaged_samples = np.array(audio.get_array_of_samples())
#     print(3)
#     # Convert to float32 and normalize
#     y = averaged_samples.astype(np.float32)
#     y /= np.iinfo(audio.array_type).max  # Normalize to [-1, 1]
#     print(4)
#     return y, target_sr

def load_audio(file_path, target_sr=16000):
    import torchaudio

    # Load the audio file
    y, sr = torchaudio.load(file_path)  # y is a tensor of shape [channels, samples]

    print(f"Loaded audio file {file_path}, sample rate {sr}, waveform shape {y.shape}")

    # Convert to mono by averaging channels if necessary
    if y.shape[0] > 1:
        y = y.mean(dim=0)
    else:
        y = y.squeeze(0)

    # Resample if needed
    if sr != target_sr:
        resampler = torchaudio.transforms.Resample(sr, target_sr)
        y = resampler(y)
        sr = target_sr

    y = y.numpy()  # Convert tensor to numpy array if needed
    return y, sr

def convert_webm_to_wav(file_path):
    # Define the output file path with a .wav extension
    output_path = file_path.replace(".webm", ".wav")
    # Run the ffmpeg command to convert
    subprocess.run(['ffmpeg', '-i', file_path, output_path, '-y'], check=True)
    return output_path

def detect_laughter(y_chunk, sr, threshold=0.5):
    y = torch.tensor(y_chunk).float().to(device)  # Move to the correct device

    # Ensure y has the correct shape [batch_size, samples]
    if y.ndim == 1:
        y = y.unsqueeze(0)  # Add batch dimension

    in_tensor = converter(y, sr).to(device)  # Move the input tensor to the same device as the model

    with torch.no_grad():
        embeddings, logits = model(in_tensor)

    probabilities = torch.softmax(logits, dim=1).cpu()  # Move to CPU for processing

    class_map_path = yamnet._DEFAULT_YAMNET_CLASSES_PATH
    with open(class_map_path, 'r') as f:
        class_names = [line.strip() for line in f.readlines()]

    laughter_indices = [i for i, name in enumerate(class_names) if 'laughter' in name.lower()]
    if not laughter_indices:
        raise ValueError("Laughter class not found in YAMNet class names.")
    laughter_index = laughter_indices[0]

    frame_duration = 0.48
    timestamps = np.arange(len(probabilities)) * frame_duration

    laughter_probs = probabilities[:, laughter_index]
    laughter_frames = laughter_probs > threshold

    laughter_events = []
    start_time = None
    for i, is_laughter in enumerate(laughter_frames):
        time = timestamps[i]
        if is_laughter and start_time is None:
            start_time = time
        elif not is_laughter and start_time is not None:
            end_time = time
            laughter_events.append((start_time, end_time))
            start_time = None
    if start_time is not None:
        laughter_events.append((start_time, timestamps[-1]))

    return laughter_events

def save_laughter_events(laughter_events, output_file):
    df = pd.DataFrame(laughter_events, columns=['Start Time', 'End Time'])
    df.to_csv(output_file, index=False)

def process_audio_file(file_path, output_file, threshold=0.5):
    y, sr = load_audio(file_path)
    laughter_events = detect_laughter(y, sr, threshold=threshold)
    save_laughter_events(laughter_events, output_file)
    print(f"Laughter events saved to {output_file}")
    
def process_audio_in_chunks(file_path, chunk_duration=10, threshold=0.5):
    print("Loading audio file...")
    y, sr = load_audio(file_path)
    total_duration = len(y) / sr
    laughter_events = []
    total_chunks = int(np.ceil(total_duration / chunk_duration))
    print(f"Processing audio in {chunk_duration}-second chunks...")

    for i, start in enumerate(np.arange(0, total_duration, chunk_duration)):
        print(f"Processing chunk {i+1} of {total_chunks}...")
        end = min(start + chunk_duration, total_duration)
        start_sample = int(start * sr)
        end_sample = int(end * sr)
        y_chunk = y[start_sample:end_sample]

        # Process the chunk and collect laughter events
        events = detect_laughter(y_chunk, sr, threshold)

        # Adjust event times for the full audio timeline
        adjusted_events = [(s + start, e + start) for s, e in events]
        laughter_events.extend(adjusted_events)

    # Save the aggregated laughter events
    output_file = f"{file_path}_laughter_timestamps.csv"
    save_laughter_events(laughter_events, output_file)
    print(f"Laughter events saved to {output_file}")


In [7]:
audio_files = ['@StronnyCuttles/processed/【ASMR Stream】Intense Ear Cleaning and Slime Poking!🦑🛐【VAllure】 [3rIax65ailI].webm']  # Replace with your list of audio files
# for file_path in audio_files:
#     output_file = f"{file_path}_laughter_timestamps.csv"
#     #process_audio_file(file_path, output_file, threshold=0.5)
#     process_audio_in_chunks(file_path, chunk_duration=10, threshold=0.5)


for file_path in audio_files:
    delete_file = False
    if file_path.endswith('.webm'):
        file_path = convert_webm_to_wav(file_path)  # Convert if webm format
        delete_file = True
        
    output_file = f"{file_path}_laughter_timestamps.csv"
    #process_audio_file(file_path, output_file, threshold=0.5)
    process_audio_in_chunks(file_path, chunk_duration=10, threshold=0.5)
    # delete the converted file if it was created
    if delete_file:
        print(f"Deleting converted file {file_path}")
        #os.remove(file_path)

Loading audio file...
Loaded audio file @StronnyCuttles/processed/【ASMR Stream】Intense Ear Cleaning and Slime Poking!🦑🛐【VAllure】 [3rIax65ailI].wav, sample rate 48000, waveform shape torch.Size([2, 357600305])
Processing audio in 10-second chunks...
Processing chunk 1 of 746...


AttributeError: module 'torch_vggish_yamnet.yamnet' has no attribute '_DEFAULT_YAMNET_CLASSES_PATH'

In [None]:
def filter_excessive_laughter(laughter_events, min_duration=2.0):
    return [event for event in laughter_events if (event[1] - event[0]) >= min_duration]

In [13]:
audio_files

['@StronnyCuttles/processed/【ASMR Stream】Intense Ear Cleaning and Slime Poking!🦑🛐【VAllure】 [3rIax65ailI].webm']

In [11]:
laughter_events = detect_laughter(y, sr, threshold=threshold)
excessive_laughter_events = filter_excessive_laughter(laughter_events, min_duration=2.0)
save_laughter_events(excessive_laughter_events, output_file)


NameError: name 'y' is not defined