In [106]:
import librosa
import os
import yt_dlp
import soundfile as sf

def trim_audio(input_file, output_dir, segment_duration=10):
    """Cut an audio file into consecutive, equally long WAV segments.

    The file is loaded at its native sampling rate and every complete
    window of ``segment_duration`` seconds is written to
    ``<output_dir>/segment_<index>.wav``. Samples left over after the last
    complete window are not written.

    Args:
        input_file: Path of the audio file to split.
        output_dir: Directory that receives the segment files.
        segment_duration: Length of each segment in seconds.
    """
    samples, rate = librosa.load(input_file, sr=None)

    # Number of samples in one segment, and how many whole segments fit.
    window = rate * segment_duration
    full_windows = len(samples) // window

    for index in range(full_windows):
        begin = index * window
        chunk = samples[begin:begin + window]
        sf.write(os.path.join(output_dir, f"segment_{index}.wav"), chunk, rate)


def extract_segment(input_file, output_file, start_time, end_time):
    """Write the part of an audio file between two timestamps to a new file.

    The whole input is loaded at its native sampling rate, the samples
    between ``start_time`` and ``end_time`` are sliced out, and the slice is
    written to ``output_file`` at the same sampling rate.

    Args:
        input_file: Path of the audio file to read.
        output_file: Path of the file to write.
        start_time: Segment start in seconds.
        end_time: Segment end in seconds (exclusive).
    """
    samples, rate = librosa.load(input_file, sr=None)

    # Convert both timestamps from seconds to (truncated) sample indices.
    first, last = (int(moment * rate) for moment in (start_time, end_time))

    sf.write(output_file, samples[first:last], rate)

def download_audio(url, output_template):
    """Download the audio track of ``url`` with yt-dlp and convert it to MP3.

    Args:
        url: Address of the video to download.
        output_template: Value passed to yt-dlp as its ``outtmpl`` option.
    """
    # Post-processing step that extracts the audio with FFmpeg.
    extract_audio = {
        'key': 'FFmpegExtractAudio',
        # You can change this to other formats like 'wav', 'm4a', etc.
        'preferredcodec': 'mp3',
        'preferredquality': '192',
    }
    options = {
        'format': 'bestaudio/best',
        'postprocessors': [extract_audio],
        'outtmpl': output_template,
    }

    with yt_dlp.YoutubeDL(options) as downloader:
        downloader.download([url])


def resample_audio(input_file, output_file, sr=16000):
    """Resample an audio file to ``sr`` Hz and write it to ``output_file``.

    Args:
        input_file: Path of the audio file to read.
        output_file: Path of the resampled file to write.
        sr: Target sampling rate in Hz.
    """
    # Keep the file's own rate in a separate name: reusing `sr` here would
    # discard the requested target rate.
    audio, orig_sr = sf.read(input_file)

    # soundfile returns (frames,) for mono and (frames, channels) otherwise,
    # while librosa resamples along the last axis. Transposing puts time last
    # (a no-op for mono); transposing back restores soundfile's layout.
    audio_resampled = librosa.resample(audio.T, orig_sr=orig_sr, target_sr=sr).T

    # Write with the *target* rate so the file header matches the data.
    sf.write(output_file, audio_resampled, sr)

In [140]:
from urllib.parse import parse_qs, urlparse


def video_id_from_url(url):
    """Return the YouTube video id contained in ``url``.

    Supports ``/shorts/<id>`` links, ``watch`` links whose ``v`` parameter
    may appear anywhere in the query string, and ``youtu.be/<id>`` links.

    Raises:
        ValueError: If no video id can be found in ``url``.
    """
    parsed = urlparse(url)
    path_parts = [part for part in parsed.path.split("/") if part]

    # Shorts links carry the id as the path component after "shorts".
    if "shorts" in path_parts[:-1]:
        return path_parts[path_parts.index("shorts") + 1]

    # Regular watch links: read `v` from the parsed query instead of relying
    # on it being the first parameter.
    query_ids = parse_qs(parsed.query).get("v")
    if query_ids:
        return query_ids[0]

    if parsed.netloc.endswith("youtu.be") and path_parts:
        return path_parts[0]

    raise ValueError(f"Could not find a video id in URL: {url!r}")


url = "https://www.youtube.com/watch?v=cflUoc82iUA&t=620s"
vid = video_id_from_url(url)

#1 Download audio
output_template = f"{vid}"
# NOTE(review): the download is commented out, so the later steps presumably
# rely on the audio file already being on disk - confirm before Run All.
# download_audio(url, output_template)

In [149]:
#2 Download transcript
from collections import defaultdict
from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound
allow_auto_transcript = False

# One record per usable transcript: {"text": fetched entries, "is_generated", "language"}.
transcripts = list()
try:
    for t in YouTubeTranscriptApi.list_transcripts(vid):
        # Filter while collecting: deleting from the list while enumerating it
        # skips the element that follows each deletion, and this also avoids
        # fetching transcripts that would be thrown away anyway.
        if t.is_generated and not allow_auto_transcript:
            continue
        transcripts.append({
            "text": t.fetch(),
            "is_generated": t.is_generated,
            "language": t.language_code,
        })
except NoTranscriptFound:
    print("No transcript found.")
    # Re-raise the original exception: NoTranscriptFound cannot be constructed
    # from a single message string.
    raise

for t in transcripts:
    print(f"Language: {t['language']}")
    print(f"Generated: {t['is_generated']}")
    print("\n")
if len(transcripts) == 0:
    print("No transcript available.")

Language: de
Generated: False




In [102]:
def binary_search(arr:list, x:float):
    """Find the element of an ascending list that is closest to ``x``.

    Args:
        arr: Values sorted in ascending order.
        x: Value to look for.

    Returns:
        A tuple ``(index, value)``. On an exact match this is the matching
        position; otherwise it is the neighbour with the smaller distance to
        ``x``, and a tie goes to the lower neighbour. Values outside the
        range of ``arr`` map to the first or last element.
    """
    lo, hi = 0, len(arr) - 1

    while lo <= hi:
        mid = (lo + hi) // 2
        probe = arr[mid]
        if probe == x:
            return mid, probe
        if probe < x:
            lo = mid + 1
        else:
            hi = mid - 1

    # No exact match: `hi` and `lo` are now the positions just below and just
    # above `x` (hi == lo - 1); either of them may lie outside the list.
    if hi < 0:
        nearest = lo
    elif lo >= len(arr):
        nearest = hi
    else:
        nearest = hi if x - arr[hi] <= arr[lo] - x else lo

    return nearest, arr[nearest]



In [103]:
#3 Create segments with corresponding transcript

from collections import defaultdict


output_dir = f"segments/{vid}"
os.makedirs(output_dir, exist_ok=True)

segment_length = 10  # seconds

# Take the entries of the first transcript kept by the download step, so this
# cell does not depend on a `transcript` variable left over in the kernel.
# NOTE(review): assumes the first remaining transcript is the wanted one - confirm.
transcript = transcripts[0]["text"] if transcripts else None

# Defined up front so the trimming step also works when there is no transcript.
segments = []

# Assuming `transcript` contains a list of dictionaries with 'text', 'start', and 'duration' keys
if transcript:
    texts, starts, durations = zip(*[(t['text'], t['start'], t['duration']) for t in transcript])
    # NOTE(review): binary_search needs `ends` in ascending order; this presumes
    # the transcript entries are ordered by time - confirm.
    ends = [start + duration for start, duration in zip(starts, durations)]

    start_idx = 0
    target_end = segment_length  # boundary the current segment should end near
    while start_idx < len(ends):
        # Entry whose end time is closest to the current boundary.
        last_idx, end_time = binary_search(ends, target_end)
        end_idx = last_idx + 1
        target_end += segment_length

        if end_idx <= start_idx:
            # No further entry ends near this boundary (pause or one long
            # entry); move on instead of emitting an empty segment with a
            # negative duration.
            continue

        new_start = starts[start_idx]
        segments.append({
            'text': texts[start_idx:end_idx],
            'start': new_start,
            'duration': end_time - new_start,
        })

        start_idx = end_idx
        

#4 Trim audio according to segments
input_file = f"{vid}.mp3"
output_dir = f"segments/{vid}"
os.makedirs(output_dir, exist_ok=True)

if segments:
    # Decode the source once and slice it per segment; calling
    # extract_segment() in the loop would reload the whole file every time.
    full_audio, full_sr = librosa.load(input_file, sr=None)

for segment in segments:
    start_time = segment['start']
    duration = segment['duration']
    text = segment['text']
    output_file = f"{output_dir}/segment_{start_time}.wav"

    # Same seconds-to-samples conversion as extract_segment().
    start_sample = int(start_time * full_sr)
    end_sample = int((start_time + duration) * full_sr)
    sf.write(output_file, full_audio[start_sample:end_sample], full_sr)

    # The transcript lines of the segment go next to the audio file.
    with open(f"{output_dir}/segment_{start_time}.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(text))


In [105]:
output_dir = "segments"
os.makedirs(output_dir, exist_ok=True)
# download_audio() converts to mp3, and step #4 reads f"{vid}.mp3", so the file
# on disk is presumably "<output_template>.mp3" rather than the bare template.
trim_audio(f"{output_template}.mp3", output_dir)

output_resampled_dir = "segments_resampled"
os.makedirs(output_resampled_dir, exist_ok=True)

for segment_file in sorted(os.listdir(output_dir)):
    input_file = os.path.join(output_dir, segment_file)
    # "segments" also holds the per-video subdirectory created in step #3;
    # only the .wav files written by trim_audio() can be resampled.
    if not (os.path.isfile(input_file) and segment_file.lower().endswith(".wav")):
        continue
    output_file = os.path.join(output_resampled_dir, segment_file)
    resample_audio(input_file, output_file)

KeyboardInterrupt: 

In [None]:
from IPython.display import Audio, display
from matplotlib import pyplot as plt

# sample -> {"file", "audio{"file", "array", "sampling_rate"}, "text", "speaker_id","chapter_id", "id"}

segments_dir = "segments_resampled/"

# Pick the first file found below `segments_dir`. The original inner `break`
# only left the file loop, so the directory walk kept going.
first_segment = next(
    (
        os.path.join(root, name)
        for root, _dirs, names in os.walk(segments_dir)
        for name in sorted(names)
    ),
    None,
)
if first_segment is None:
    raise FileNotFoundError(f"No audio segments found in {segments_dir!r}")
print(os.path.basename(first_segment))

# Decode once and reuse both return values instead of loading the file twice.
waveform, sampling_rate = librosa.load(first_segment, sr=16000)
sample = {"audio": {
            "file": first_segment,
            "array": waveform,
            "sampling_rate": sampling_rate
            }
        }
print(waveform.shape, sampling_rate)

# NOTE(review): the original created a 2x1 grid but only drew on the first
# axes; the layout is kept as-is - confirm what the second panel is for.
fig, axs = plt.subplots(2, 1, figsize=(10, 10))
axs[0].plot(waveform)
axs[0].set(title=os.path.basename(first_segment), xlabel="Sample index", ylabel="Amplitude")

# A bare Audio(...) only renders as the last expression of a cell, so show it
# explicitly.
display(Audio(waveform, rate=sampling_rate))


In [None]:
from transformers import pipeline

# Speech-recognition pipeline backed by the wav2vec2 "base-100h" checkpoint.
# NOTE(review): the transcript listed by the download step is German ("de") -
# confirm this checkpoint is meant for that language.
pipe = pipeline(
    task="automatic-speech-recognition",
    model="facebook/wav2vec2-base-100h",
)

In [None]:
# Hand the pipeline its own shallow copy of the audio dict, presumably so that
# `sample` stays intact for later cells.
pipe(dict(sample["audio"]))

In [None]:
import torch
from transformers import pipeline

# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda:0"
else:
    device = "cpu"

# Speech-recognition pipeline backed by the Whisper "base" checkpoint.
pipe = pipeline(
    task="automatic-speech-recognition",
    model="openai/whisper-base",
    device=device,
)

In [None]:
# Pass a copy, as the wav2vec2 cell does, so the pipeline cannot alter the
# shared `sample["audio"]` dict and this cell stays re-runnable.
pipe(sample["audio"].copy(), max_new_tokens=256)

In [None]:
# NOTE(review): `model` is not assigned in any cell visible here, so this
# presumably raises NameError on a fresh kernel - confirm where it should come
# from, or remove the cell.
model