<a href="https://colab.research.google.com/github/Ak-Gautam/AudioDataPrerocess/blob/main/audio_diary_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install aiohttp aiofiles huggingface_hub -q

In [None]:
import os
import asyncio
import aiohttp
import aiofiles
from huggingface_hub import hf_hub_url, HfApi
from tqdm.asyncio import tqdm_asyncio

In [None]:
async def download_file(session, file, repo_id, repo_type, destination_dir, semaphore,
                        chunk_size=1024 * 1024):
    """Download one repository file into ``destination_dir``.

    The file keeps its repository-relative path below ``destination_dir``.
    The response body is streamed to disk in ``chunk_size``-byte pieces so
    that large files are never held in memory in full.

    Args:
        session: Open ``aiohttp.ClientSession`` used for the request.
        file: Repository-relative path of the file to fetch.
        repo_id: Hub repository identifier.
        repo_type: Hub repository type, passed through to ``hf_hub_url``.
        destination_dir: Local directory the file is written under.
        semaphore: ``asyncio.Semaphore`` limiting concurrent downloads.
        chunk_size: Number of bytes requested per read from the response.

    Returns:
        None. A non-200 response is reported with ``print`` and skipped.
    """
    async with semaphore:
        file_url = hf_hub_url(repo_id, file, repo_type=repo_type)
        dest_path = os.path.join(destination_dir, file)
        parent_dir = os.path.dirname(dest_path)
        # os.makedirs('') raises, so only create a parent when there is one.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        async with session.get(file_url) as response:
            if response.status != 200:
                print(f"Failed to download {file}: HTTP {response.status}")
                return
            async with aiofiles.open(dest_path, 'wb') as f:
                # Stream instead of response.read() to bound memory use.
                async for chunk in response.content.iter_chunked(chunk_size):
                    await f.write(chunk)

async def download_dataset(repo_id, repo_type, folder_path, destination_dir, max_concurrent=10):
    """Download every file under ``folder_path`` of a Hub repository.

    Args:
        repo_id: Hub repository identifier.
        repo_type: Hub repository type, passed to ``list_repo_files``.
        folder_path: Folder inside the repository to download. Leading and
            trailing slashes are ignored; an empty value selects all files.
        destination_dir: Local directory the files are written under.
        max_concurrent: Maximum number of downloads running at once.
    """
    api = HfApi()
    all_files = api.list_repo_files(repo_id, repo_type=repo_type)

    # Match on a directory boundary: a plain startswith("0") would also
    # select unrelated entries such as "01/x.wav" or "0.txt".
    folder = folder_path.strip("/")
    if folder:
        folder_prefix = folder + "/"
        folder_files = [
            f for f in all_files
            if f == folder or f.startswith(folder_prefix)
        ]
    else:
        folder_files = list(all_files)

    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [
            download_file(session, file, repo_id, repo_type, destination_dir, semaphore)
            for file in folder_files
        ]
        await tqdm_asyncio.gather(*tasks, desc="Downloading files")

# Download settings: which repository folder to fetch and where to store it.
repo_id = "Alignment-Lab-AI/podcast-1-test-preprocessed"
repo_type = "dataset"
folder_path = "0"
destination_dir = "content/ddata"


# Entry point for the download step.
async def main():
    """Download the configured folder, then print where it was saved."""
    await download_dataset(repo_id, repo_type, folder_path, destination_dir)
    summary = (
        f"Folder '{folder_path}' from repository '{repo_id}' "
        f"has been saved to '{destination_dir}'"
    )
    print(summary)


# Jupyter/Colab specific: nest_asyncio is applied before driving the
# coroutine to completion on the event loop with run_until_complete.
import nest_asyncio

nest_asyncio.apply()

loop = asyncio.get_event_loop()
download_job = main()
loop.run_until_complete(download_job)

In [None]:
!pip install pydub -q

In [None]:
# Download a static FFmpeg build and add it to PATH.
# `exist` captures the output of `which ffmpeg`; it is empty when no ffmpeg
# binary is found on PATH, and only then is the download performed.
exist = !which ffmpeg
if not exist:
  # Fetch the amd64 static release archive, unpack it in the current
  # directory, and delete the archive afterwards.
  !curl https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz -o ffmpeg.tar.xz \
     && tar -xf ffmpeg.tar.xz && rm ffmpeg.tar.xz
  # Locate the unpacked ffmpeg-*-static directory below the current directory.
  ffmdir = !find . -iname ffmpeg-*-static
  # Append the first match to PATH.
  # NOTE(review): ffmdir[0] raises IndexError if find returns nothing.
  path = %env PATH
  path = path + ':' + ffmdir[0]
  %env PATH $path
print('')
# Print the ffmpeg location that now resolves, as a visual check.
!which ffmpeg
print('Done!')

In [None]:
from pydub import AudioSegment

In [None]:
!pip install pyannote.audio==3.1.1 -q

In [None]:
from pyannote.audio import Pipeline

In [None]:
# Constants
# Hugging Face access token handed to Pipeline.from_pretrained; fill in before running.
HF_TOKEN = ""
# Folder holding the downloaded audio files to process.
INPUT_FOLDER = "content/ddata/0"
# Root folder for per-file diarization output (fixed typo: was "contet/...").
OUTPUT_FOLDER = "content/autodiarization"
# Length of the silence prepended to each audio file before diarization.
SPACER_DURATION = 2000  # milliseconds

In [None]:
# Load the pretrained 'pyannote/speaker-diarization-3.1' pipeline, passing
# HF_TOKEN as the auth token.
# NOTE(review): HF_TOKEN is an empty string in the constants cell; presumably
# a real token must be filled in for this load to succeed — confirm.
pipeline = Pipeline.from_pretrained('pyannote/speaker-diarization-3.1', use_auth_token=HF_TOKEN)

In [None]:
import torch

In [None]:
!pip install git+https://github.com/openai/whisper.git -q

In [None]:
import csv
from typing import List, Tuple
import shutil
import whisper

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Move the diarization pipeline to the selected device.
pipeline.to(device)

In [None]:
# Initialize Whisper model
# Loads the 'large' checkpoint on the same device as the diarization pipeline.
whisper_model = whisper.load_model('large', device=device)

In [None]:
def create_directory(path: str):
    """Create ``path`` together with any missing parent directories.

    An already existing directory is left untouched and is not an error.
    """
    os.makedirs(name=path, exist_ok=True)

In [None]:
def transcribe_audio(audio_path: str) -> str:
    """Transcribe the audio file at ``audio_path`` with the global Whisper model.

    The model is asked for English (``language='en'``); the ``'text'`` entry
    of its result is returned.
    """
    whisper_output = whisper_model.transcribe(audio=audio_path, language='en')
    text = whisper_output['text']
    return text

In [None]:
def process_audio_file(input_file: str, output_dir: str) -> List[Tuple[str, str, str]]:
    """Diarize one audio file, export each speaker turn, and transcribe it.

    ``SPACER_DURATION`` milliseconds of silence are prepended to the audio,
    the result is written to a temporary WAV inside ``output_dir`` and passed
    to the diarization pipeline. Every turn the pipeline reports is cut from
    the padded audio, saved under ``output_dir/speaker_<label>/`` and
    transcribed.

    Args:
        input_file: Path of the audio file to process.
        output_dir: Existing directory that receives the per-speaker folders.

    Returns:
        One ``(segment_filename, "Speaker <label>", transcription)`` tuple
        per diarized turn, in the order the pipeline yields them.
    """
    audio = AudioSegment.from_file(input_file)
    spacer = AudioSegment.silent(duration=SPACER_DURATION)
    audio = spacer.append(audio, crossfade=0)

    temp_wav = os.path.join(output_dir, "temp.wav")
    try:
        audio.export(temp_wav, format='wav')
        diarization = pipeline(temp_wav)
    finally:
        # Remove the temporary file even when export or diarization fails;
        # segments below are cut from the in-memory audio, not from this file.
        if os.path.exists(temp_wav):
            os.remove(temp_wav)

    segments = []
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        # AudioSegment slicing is in milliseconds; turn.start / turn.end are
        # presumably seconds, hence the factor of 1000 — TODO confirm.
        start = int(turn.start * 1000)
        end = int(turn.end * 1000)
        segment = audio[start:end]

        speaker_dir = os.path.join(output_dir, f"speaker_{speaker.lower()}")
        create_directory(speaker_dir)

        # The running segment count keeps filenames unique across speakers.
        segment_filename = f"speaker_{speaker}_{len(segments):03d}.wav"
        segment_path = os.path.join(speaker_dir, segment_filename)
        segment.export(segment_path, format='wav')

        # Transcribe the segment
        transcription = transcribe_audio(segment_path)

        segments.append((segment_filename, f"Speaker {speaker}", transcription))

    return segments

In [None]:
def write_metadata(speaker_dir: str, segments: List[Tuple[str, str, str]]):
    """Write ``metadata.csv`` for one speaker into ``speaker_dir``.

    The file is pipe-delimited with a ``filename|speaker|text`` header
    followed by one row per segment. The filename column holds the segment
    filename without its extension.

    Args:
        speaker_dir: Directory to write into; created if missing.
        segments: ``(filename, speaker, text)`` tuples to record.
    """
    create_directory(speaker_dir)
    metadata_path = os.path.join(speaker_dir, "metadata.csv")
    # Explicit UTF-8 so non-ASCII transcriptions do not depend on the locale.
    with open(metadata_path, 'w', newline='', encoding='utf-8') as csvfile:
        writer = csv.writer(csvfile, delimiter='|')
        writer.writerow(["filename", "speaker", "text"])
        writer.writerows(
            # splitext removes only the extension, unlike split('.')[0],
            # which would cut the name at its first dot.
            [os.path.splitext(filename)[0], speaker, text]
            for filename, speaker, text in segments
        )

In [None]:
def main():
    """Diarize and transcribe every audio file found in ``INPUT_FOLDER``.

    Files ending in ``.mp3``, ``.wav`` or ``.flac`` (case-insensitive) are
    processed in sorted name order. File number ``i`` (0-based) gets its own
    folder ``OUTPUT_FOLDER/<i>``, and one ``metadata.csv`` is written for
    each speaker found in that file.
    """
    create_directory(OUTPUT_FOLDER)

    audio_extensions = ('.mp3', '.wav', '.flac')
    # Filter and sort before numbering: os.listdir order is arbitrary, and
    # non-audio entries must not consume an output folder index.
    audio_files = sorted(
        name for name in os.listdir(INPUT_FOLDER)
        if name.lower().endswith(audio_extensions)
    )

    for i, filename in enumerate(audio_files):
        input_file = os.path.join(INPUT_FOLDER, filename)
        output_dir = os.path.join(OUTPUT_FOLDER, str(i))
        create_directory(output_dir)

        print(f"Processing file {i + 1}: {filename}")
        segments = process_audio_file(input_file, output_dir)

        # Group segments by speaker label in a single pass.
        segments_by_speaker = {}
        for segment in segments:
            segments_by_speaker.setdefault(segment[1], []).append(segment)

        for speaker, speaker_segments in segments_by_speaker.items():
            # "Speaker X" -> "speaker_x", the folder process_audio_file used.
            speaker_dir = os.path.join(output_dir, speaker.lower().replace(' ', '_'))
            write_metadata(speaker_dir, speaker_segments)

        print(f"Finished processing file {i + 1}: {filename}")

In [None]:
# Run diarization and transcription over the files in INPUT_FOLDER.
main()