In [None]:
# !pip install moviepy
# !pip install pytube
# !pip install yt-dlp
# !pip install nemo_toolkit['asr']
# !pip install -U nemo_toolkit["asr"]

In [None]:
import subprocess
import os

# YouTube watch URLs to fetch. The driver loop at the bottom of this cell
# passes each entry to download_youtube_video().
VIDEO_URLS = [
    "https://www.youtube.com/watch?v=Dh4XO8mL8Rw",
    "https://www.youtube.com/watch?v=5CdNZ2Agn8I",
    "https://www.youtube.com/watch?v=OwkiSeoaaF8"
]

def download_youtube_video(url, output_path="downloads"):
    """Download one video by invoking the ``yt-dlp`` command-line tool.

    The output directory is created if needed and the file is saved as
    ``<output_path>/<title>.<ext>``, where title and extension are filled in
    by yt-dlp's output template.

    Args:
        url: Video URL (or ID) handed to yt-dlp.
        output_path: Directory that receives the downloaded file.

    Returns:
        bool: True when yt-dlp exited successfully, False when the download
        failed for any reason. Errors are reported on stdout and never
        raised, so a batch loop keeps going after a bad URL.
    """
    try:
        os.makedirs(output_path, exist_ok=True)
        print(f"\n Downloading video from: {url}")

        cmd = [
            "yt-dlp",
            "-f", "bestvideo+bestaudio/best",
            "-o", os.path.join(output_path, "%(title)s.%(ext)s"),
            # "--" ends option parsing, so a URL/ID that starts with "-" is
            # treated as a positional argument instead of a command-line flag.
            "--",
            url,
        ]

        # An argument list (no shell) keeps the URL from being shell-interpreted.
        subprocess.run(cmd, check=True)

    except subprocess.CalledProcessError as e:
        # yt-dlp ran but exited with a non-zero status.
        print(f"Download failed for {url}: {e}")
        return False
    except Exception as e:
        # E.g. yt-dlp is not installed or the output directory is not writable.
        print(f"Unexpected error for {url}: {e}")
        return False

    print("Download complete.")
    return True

# Driver: download every URL listed in VIDEO_URLS into the default
# "downloads" directory, one after another.
if __name__ == "__main__":
    for video_url in VIDEO_URLS:
        download_youtube_video(video_url)


In [None]:
from moviepy.editor import VideoFileClip
import os

def extract_audio(video_path, output_audio_path):
    """Write the audio track of a video file to ``output_audio_path``.

    Args:
        video_path: Path of the video file to read.
        output_audio_path: Destination audio file; passed unchanged to the
            clip's ``write_audiofile``.

    Returns:
        bool: True when the audio file was written; False when the video is
        missing, has no audio track, or extraction raised. Problems are
        printed rather than raised so a batch run can continue.
    """
    if not os.path.exists(video_path):
        print(f"File not found: {video_path}")
        return False

    video_clip = None
    try:
        video_clip = VideoFileClip(video_path)

        if video_clip.audio is None:
            print(f" No audio track found in {video_path}")
            return False

        video_clip.audio.write_audiofile(output_audio_path)
        print(f"Audio extracted: {output_audio_path}")
        return True

    except Exception as e:
        print(f"Error extracting audio from {video_path}: {e}")
        return False

    finally:
        # Always release the clip; without this every processed video leaves
        # its reader open for the lifetime of the kernel.
        if video_clip is not None:
            video_clip.close()

# Driver: extract a .wav file into "audio" for every downloaded video.
if __name__ == "__main__":
    input_dir = "downloads"
    output_dir = "audio"
    os.makedirs(output_dir, exist_ok=True)

    # The download step asks yt-dlp for "bestvideo+bestaudio/best", which is
    # not guaranteed to yield an .mkv file; depending on the available
    # streams the result can also be .mp4 or .webm. Accept all of them
    # (case-insensitively) so no downloaded video is silently skipped.
    video_extensions = (".mkv", ".mp4", ".webm")

    if not os.path.isdir(input_dir):
        print(f"Input directory not found: {input_dir}")
    else:
        # sorted() gives a reproducible processing order across runs.
        for filename in sorted(os.listdir(input_dir)):
            if not filename.lower().endswith(video_extensions):
                continue

            video_path = os.path.join(input_dir, filename)
            base_name = os.path.splitext(filename)[0]
            audio_path = os.path.join(output_dir, f"{base_name}.wav")

            extract_audio(video_path, audio_path)


In [None]:
import os
import torchaudio
import soundfile as sf

def convert_to_mono_16k(input_path, output_path, target_sr=16000):
    """Convert an audio file to single-channel audio at ``target_sr`` Hz.

    Args:
        input_path: Audio file to read with ``torchaudio.load``.
        output_path: Destination file written with ``soundfile.write``.
        target_sr: Output sample rate in Hz. Defaults to 16000, which keeps
            the behaviour the function name promises.
    """
    waveform, sample_rate = torchaudio.load(input_path)

    # Down-mix multi-channel audio by averaging the channels.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    # Resample only when the source rate differs from the target.
    if sample_rate != target_sr:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_sr)
        waveform = resampler(waveform)
        sample_rate = target_sr

    # squeeze(0) drops only the channel axis. A bare squeeze() would also
    # collapse a one-sample signal to a 0-d array, which is not a valid
    # 1-D sample buffer for the writer.
    sf.write(output_path, waveform.squeeze(0).numpy(), sample_rate)
    print(f"✓ Converted: {output_path}")

def process_audio_folder(input_dir="./audio", output_dir="./processed_audio"):
    """Convert every .wav/.mp3 file in ``input_dir`` via ``convert_to_mono_16k``.

    Each output keeps the input's base name and always gets a ``.wav``
    extension. Files with other extensions are ignored.

    Args:
        input_dir: Directory containing the source audio files.
        output_dir: Directory for converted files; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    # sorted() gives a reproducible processing order across runs.
    for file in sorted(os.listdir(input_dir)):
        stem, ext = os.path.splitext(file)
        # Compare the real extension, case-insensitively ("CLIP.MP3" counts).
        if ext.lower() not in (".wav", ".mp3"):
            continue

        input_path = os.path.join(input_dir, file)
        # Build the name from the stem: str.replace(".mp3", ".wav") would
        # also rewrite ".mp3" occurring in the middle of a file name.
        output_path = os.path.join(output_dir, stem + ".wav")
        convert_to_mono_16k(input_path, output_path)

# Driver: convert everything in ./audio into ./processed_audio using the
# function's default directories.
if __name__ == "__main__":
    process_audio_folder()


In [None]:
import os
import torchaudio
import nemo.collections.asr as nemo_asr

CHUNK_DURATION = 30  # length of one chunk, in seconds
CHUNK_OVERLAP = 3    # audio shared by two consecutive chunks, in seconds

def load_audio_chunks(file_path, chunk_duration=CHUNK_DURATION, overlap=CHUNK_OVERLAP, target_sr=16000):
    """Split an audio file into overlapping mono chunks saved as .wav files.

    The audio is resampled to ``target_sr`` if necessary, averaged down to a
    single channel, and cut into windows of ``chunk_duration`` seconds whose
    start points are ``chunk_duration - overlap`` seconds apart. Each window
    is written to ``tmp_chunk_<start>_<end>.wav`` in the current working
    directory, where start/end are sample offsets.

    Args:
        file_path: Audio file to split.
        chunk_duration: Window length in seconds (may be fractional).
        overlap: Seconds shared by consecutive windows; must be
            non-negative and smaller than ``chunk_duration``.
        target_sr: Sample rate of the written chunks, in Hz.

    Returns:
        list[str]: Paths of the chunk files in playback order. The caller is
        responsible for deleting them.

    Raises:
        ValueError: If the durations do not yield a positive step between
            windows. Without this check an overlap larger than the chunk
            length would silently produce no chunks at all.
    """
    # int() allows fractional second values; range() needs integer sizes.
    chunk_size = int(chunk_duration * target_sr)
    overlap_size = int(overlap * target_sr)
    step_size = chunk_size - overlap_size
    if chunk_size <= 0 or overlap_size < 0 or step_size <= 0:
        raise ValueError(
            f"Invalid chunking: chunk_duration={chunk_duration}, overlap={overlap} "
            "(need chunk_duration > overlap >= 0)"
        )

    waveform, sample_rate = torchaudio.load(file_path)

    if sample_rate != target_sr:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_sr)
        waveform = resampler(waveform)

    # Average the channels into one 1-D signal.
    waveform = waveform.mean(dim=0)
    total_samples = waveform.shape[0]

    chunks = []
    for start in range(0, total_samples, step_size):
        end = min(start + chunk_size, total_samples)
        chunk_path = f"tmp_chunk_{start}_{end}.wav"
        # The writer gets a (channels, samples) tensor, so restore the channel axis.
        torchaudio.save(chunk_path, waveform[start:end].unsqueeze(0), target_sr)
        chunks.append(chunk_path)

        # Once a window reaches the end of the audio, any further window
        # would only repeat the overlapping tail.
        if end == total_samples:
            break
    return chunks

def transcribe_chunks_nemo(audio_path, model):
    """Transcribe one audio file chunk by chunk and join the chunk texts.

    The file is split with ``load_audio_chunks``; each chunk file is passed
    to ``model.transcribe`` and the ``.text`` of the first result is kept.

    Note: chunk texts are concatenated as-is. With the default chunk
    overlap, words spoken inside an overlap region may therefore appear
    twice in the result.

    Args:
        audio_path: Audio file to transcribe.
        model: Object exposing ``transcribe(list_of_paths)`` whose first
            result has a ``text`` attribute.

    Returns:
        str: Chunk transcriptions joined by single spaces.
    """
    chunk_paths = load_audio_chunks(audio_path)
    pieces = []

    try:
        for chunk_path in chunk_paths:
            output = model.transcribe([chunk_path])
            text = output[0].text.strip()
            # Skip empty results (e.g. silent chunks) so the joined text
            # does not contain doubled spaces.
            if text:
                pieces.append(text)
    finally:
        # Remove every temporary chunk, including those never reached
        # because transcription raised part-way through.
        for chunk_path in chunk_paths:
            try:
                os.remove(chunk_path)
            except FileNotFoundError:
                pass

    return " ".join(pieces)

def batch_transcribe(audio_dir="./processed_audio", output_dir="./transcripts"):
    """Transcribe every .wav file in ``audio_dir`` with the NeMo Parakeet model.

    One ``transcript_<name>.txt`` file is written to ``output_dir`` per
    input file.

    NOTE: a later cell in this notebook defines another function that is
    also called ``batch_transcribe`` (the Whisper variant). After that cell
    has run, this name refers to the Whisper version.

    Args:
        audio_dir: Directory containing the .wav files to transcribe.
        output_dir: Directory for the transcript files; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Collect the inputs first, in a reproducible order, so the (large)
    # model is only loaded when there is actually something to transcribe.
    wav_files = []
    if os.path.isdir(audio_dir):
        wav_files = sorted(f for f in os.listdir(audio_dir) if f.lower().endswith(".wav"))

    if not wav_files:
        print(f"No .wav files found in {audio_dir}; nothing to transcribe.")
        return

    asr_model = nemo_asr.models.ASRModel.from_pretrained(model_name="nvidia/parakeet-tdt-0.6b-v2")

    for file in wav_files:
        audio_path = os.path.join(audio_dir, file)
        print(f"\nTranscribing: {file}")
        transcription = transcribe_chunks_nemo(audio_path, asr_model)

        output_file = os.path.join(output_dir, f"transcript_{os.path.splitext(file)[0]}.txt")
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(transcription + "\n")
        print(f"Saved transcription to {output_file}")

# Driver: transcribe ./processed_audio into ./transcripts with the function's
# default directories.
if __name__ == "__main__":
    batch_transcribe()

In [None]:
import os
import torch
import torchaudio
import math
from transformers import WhisperProcessor, WhisperForConditionalGeneration

# Largest number of audio samples handed to the model in one call;
# transcribe_chunks() slices the waveform into windows of this size.
MAX_INPUT_LENGTH = 30 * 16000  # 30 seconds at 16kHz

def load_model(model_name="openai/whisper-large-v3"):
    """Load a Whisper processor/model pair and place the model on a device.

    Args:
        model_name: Identifier passed to both ``from_pretrained`` calls.

    Returns:
        tuple: ``(processor, model, device)`` where ``device`` is ``"cuda"``
        when CUDA is available and ``"cpu"`` otherwise.
    """
    # Use the GPU when one is available, fall back to the CPU otherwise.
    if torch.cuda.is_available():
        device = "cuda"
    else:
        device = "cpu"

    processor = WhisperProcessor.from_pretrained(model_name)
    whisper = WhisperForConditionalGeneration.from_pretrained(model_name)
    whisper = whisper.to(device)

    return processor, whisper, device

def load_audio(file_path, target_sr=16000):
    """Load an audio file as a 1-D mono waveform at ``target_sr`` Hz.

    Args:
        file_path: Audio file to read with ``torchaudio.load``.
        target_sr: Desired sample rate in Hz; the audio is resampled when the
            file's rate differs. Defaults to 16000.

    Returns:
        A 1-D tensor holding the average of all channels.
    """
    waveform, sample_rate = torchaudio.load(file_path)
    if sample_rate != target_sr:
        resampler = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=target_sr)
        waveform = resampler(waveform)

    # Average the channels rather than keeping only channel 0, so content
    # present on just one channel is not dropped. For single-channel files
    # this yields the same samples as taking channel 0.
    return waveform.mean(dim=0)

def transcribe_chunks(audio_path, processor, model, device):
    """Transcribe an audio file in consecutive windows and join the results.

    The waveform returned by ``load_audio`` is walked in steps of
    ``MAX_INPUT_LENGTH`` samples. Each window is converted to input features
    by ``processor``, moved to ``device``, decoded by ``model.generate`` and
    turned back into text with ``processor.batch_decode``.

    Args:
        audio_path: Audio file to transcribe.
        processor: Callable producing ``input_features``; also provides
            ``batch_decode``.
        model: Object whose ``generate`` maps input features to token ids.
        device: Device the input features are moved to.

    Returns:
        str: Stripped window transcriptions joined by single spaces.
    """
    samples = load_audio(audio_path)
    n_samples = samples.shape[0]

    pieces = []
    # Window starts are 0, MAX_INPUT_LENGTH, 2 * MAX_INPUT_LENGTH, ...; the
    # slice is clipped automatically for the final, shorter window.
    for offset in range(0, n_samples, MAX_INPUT_LENGTH):
        window = samples[offset:offset + MAX_INPUT_LENGTH]

        encoded = processor(window, sampling_rate=16000, return_tensors="pt")
        features = encoded.input_features.to(device)

        token_ids = model.generate(features)
        text = processor.batch_decode(token_ids, skip_special_tokens=True)[0]
        pieces.append(text.strip())

    return " ".join(pieces)

def batch_transcribe(audio_dir="processed_audio", output_dir="transcripts_whisper"):
    """Transcribe every .wav file in ``audio_dir`` with Whisper.

    One ``transcript_<name>.txt`` file is written to ``output_dir`` per
    input file.

    NOTE: an earlier cell in this notebook defines a function with the same
    name (the NeMo variant). Running this cell rebinds ``batch_transcribe``
    to this Whisper version.

    Args:
        audio_dir: Directory containing the .wav files to transcribe.
        output_dir: Directory for the transcript files; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Collect the inputs first, in a reproducible order, so the (large)
    # model is only loaded when there is actually something to transcribe.
    wav_files = []
    if os.path.isdir(audio_dir):
        wav_files = sorted(f for f in os.listdir(audio_dir) if f.lower().endswith(".wav"))

    if not wav_files:
        print(f"No .wav files found in {audio_dir}; nothing to transcribe.")
        return

    processor, model, device = load_model()

    for file in wav_files:
        audio_path = os.path.join(audio_dir, file)
        print(f"\nTranscribing with Whisper: {file}")

        transcription = transcribe_chunks(audio_path, processor, model, device)

        output_file = os.path.join(output_dir, f"transcript_{os.path.splitext(file)[0]}.txt")
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(transcription + "\n")

        print(f"Saved to {output_file}")

# Driver: transcribe processed_audio into transcripts_whisper using the
# function's default directories.
if __name__ == "__main__":
    batch_transcribe()


In [None]:
import os
from transformers import AutoModelForCausalLM, AutoTokenizer

# Text-correction model and tokenizer, loaded once when this cell runs and
# read as module-level globals by correct_text() below.
# NOTE(review): judging by its name this is a grammar-correction checkpoint;
# confirm against the model card.
model_name = "qingy2024/GRMR-V3-Q1.7B"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# device_map="auto" leaves device placement to the library.
model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")

def correct_text(text):
    """Return the model's corrected version of ``text``.

    The text is sent as a single user message through the tokenizer's chat
    template, and only the newly generated tokens are decoded.

    Note: generation stops after at most 1024 new tokens, so the result for
    a long input is cut off at that point.

    Args:
        text: Text to correct.

    Returns:
        str: The decoded model reply, without the prompt.
    """
    messages = [{"role": "user", "content": text}]
    prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    outputs = model.generate(
        inputs["input_ids"],
        # Pass the mask explicitly so generation does not have to guess
        # which positions are real tokens.
        attention_mask=inputs.get("attention_mask"),
        max_new_tokens=1024,
        temperature=0.1,
        do_sample=True,
    )

    # The returned sequence starts with the prompt tokens. Decode only what
    # follows them; otherwise the chat prompt and the uncorrected input
    # would be written into the "corrected" output as well.
    prompt_length = inputs["input_ids"].shape[1]
    generated_ids = outputs[0][prompt_length:]
    return tokenizer.decode(generated_ids, skip_special_tokens=True)

def batch_correct_transcripts(input_dir, output_dir):
    """Run ``correct_text`` over every .txt file of a directory.

    For each ``<name>.txt`` in ``input_dir`` the corrected text is written
    to ``output_dir/corrected_<name>.txt``, stripped of surrounding
    whitespace and terminated by a single newline.

    Args:
        input_dir: Directory holding the transcripts to correct.
        output_dir: Directory for the corrected files; created if missing.
    """
    os.makedirs(output_dir, exist_ok=True)

    for entry in os.listdir(input_dir):
        # Only plain-text transcripts are processed.
        if not entry.endswith(".txt"):
            continue

        source_path = os.path.join(input_dir, entry)
        with open(source_path, "r", encoding="utf-8") as source:
            raw_text = source.read()

        print(f"Correcting: {entry}")
        fixed_text = correct_text(raw_text)

        target_path = os.path.join(output_dir, f"corrected_{entry}")
        with open(target_path, "w", encoding="utf-8") as target:
            target.write(fixed_text.strip() + "\n")
        print(f"Saved: {target_path}")

# Driver: correct the transcripts found in "transcripts" and write the
# results to "transcripts_corrected".
if __name__ == "__main__":
    batch_correct_transcripts(input_dir="transcripts", output_dir="transcripts_corrected")