# In [None]:
import os
import subprocess
from typing import Optional

import pandas as pd
import yt_dlp
from google.cloud import speech
from google.cloud import storage
from pydub import AudioSegment

def authenticate_gcp(json_key_path: str) -> None:
    """Register a service account key file for the Google Cloud clients.

    Stores the key path in the ``GOOGLE_APPLICATION_CREDENTIALS`` environment
    variable. No request is sent to Google here, so the only thing that can
    be verified at this point is that the key file actually exists; doing so
    surfaces a mistyped path immediately instead of at the first API call.

    Args:
        json_key_path: Path to the service account JSON key file. Surrounding
            whitespace is stripped and a leading ``~`` is expanded.

    Raises:
        FileNotFoundError: If the path does not refer to an existing file.
    """
    key_path = os.path.expanduser(json_key_path.strip())
    if not os.path.isfile(key_path):
        raise FileNotFoundError(
            f"Service account key file not found: {key_path!r}"
        )
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = key_path
    print("GCP authentication successful.")

def download_audio(url: str, download_folder: str) -> Optional[str]:
    """Download the audio track of a YouTube URL and convert it to WAV.

    Args:
        url: Video URL handed to yt-dlp.
        download_folder: Directory the audio is written to; the file is named
            after the video title.

    Returns:
        Path of the converted ``.wav`` file, or ``None`` when the download or
        conversion failed or the expected file is missing afterwards.
    """
    ydl_opts = {
        'format': 'bestaudio/best',  # Best available audio-only stream
        'outtmpl': os.path.join(download_folder, '%(title)s.%(ext)s'),
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',  # Convert with FFmpeg after download
            'preferredcodec': 'wav',
            # Bitrate hint kept from the original options; presumably it has
            # no audible effect on uncompressed WAV output.
            'preferredquality': '192',
        }],
    }

    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            print(f"Downloading audio from: {url}")
            info = ydl.extract_info(url, download=True)
            # prepare_filename() gives the name built from 'outtmpl' with the
            # source extension; the WAV is expected under the same stem.
            stem = os.path.splitext(ydl.prepare_filename(info))[0]
            wav_filename = stem + '.wav'
    except Exception as e:
        # Deliberately broad: one bad URL must not abort a batch run, so the
        # failure is reported and signalled to the caller with None.
        print(f"An error occurred while downloading audio: {e}")
        return None

    # Only hand back a path that really exists, so the caller's
    # `if not wav_file_path` check is enough to skip unusable results.
    if not os.path.isfile(wav_filename):
        print(
            "An error occurred while downloading audio: "
            f"expected file not found: {wav_filename}"
        )
        return None

    print(f"Audio successfully downloaded to: {download_folder}")
    return wav_filename

def split_audio(audio_file: str, chunk_length_ms: int = 30000):
    """Split an audio file into fixed-length mono 48 kHz 16-bit WAV chunks.

    Each chunk is written next to the source file as
    ``<source stem>_chunk<index>.wav`` with a zero-based index. The last
    chunk may be shorter than ``chunk_length_ms``.

    Args:
        audio_file: Path of the audio file to split.
        chunk_length_ms: Chunk duration in milliseconds; must be positive.

    Returns:
        List of the chunk file paths, in playback order.

    Raises:
        ValueError: If ``chunk_length_ms`` is not positive.
    """
    # Validate before decoding the audio: a zero step would otherwise fail
    # inside range() and a negative one would silently produce no chunks.
    if chunk_length_ms <= 0:
        raise ValueError(
            f"chunk_length_ms must be a positive integer, got {chunk_length_ms!r}"
        )

    audio = AudioSegment.from_file(audio_file)
    stem = os.path.splitext(audio_file)[0]  # Same for every chunk
    chunks = []
    for index, start in enumerate(range(0, len(audio), chunk_length_ms)):
        # Mono, 48000 Hz, 2 bytes per sample: the format transcribe_gcs
        # declares to the Speech API (LINEAR16 at 48000 Hz).
        chunk = (
            audio[start:start + chunk_length_ms]
            .set_channels(1)
            .set_frame_rate(48000)
            .set_sample_width(2)
        )
        chunk_filename = f"{stem}_chunk{index}.wav"
        chunk.export(chunk_filename, format="wav")
        chunks.append(chunk_filename)
    print(f"Audio file {audio_file} split into {len(chunks)} chunks.")
    return chunks

def upload_to_gcs(bucket_name: str, source_file_name: str, destination_blob_name: str):
    """Copy a local file into a Cloud Storage bucket.

    A new storage client is created on every call. The local file
    ``source_file_name`` is stored in ``bucket_name`` under the object name
    ``destination_blob_name``.

    Returns the uploaded blob's ``public_url`` attribute.
    """
    target_blob = (
        storage.Client()
        .bucket(bucket_name)
        .blob(destination_blob_name)
    )
    target_blob.upload_from_filename(source_file_name)
    print(f"File {source_file_name} uploaded to {destination_blob_name}.")
    public_link = target_blob.public_url
    return public_link

def transcribe_gcs(
    gcs_uri: str,
    language_code: str = "en-US",
    sample_rate_hertz: int = 48000,
    timeout: float = 600,
) -> str:
    """Transcribe a LINEAR16 audio file stored in Cloud Storage.

    Starts a long-running recognition operation and blocks until it finishes
    or ``timeout`` elapses.

    Args:
        gcs_uri: ``gs://bucket/object`` URI of the audio file.
        language_code: Language tag passed to the recognizer.
        sample_rate_hertz: Sample rate of the audio; the default matches the
            48000 Hz chunks written by ``split_audio``.
        timeout: Maximum number of seconds to wait for the operation.

    Returns:
        The top transcript of every result, joined with single spaces. An
        empty string when nothing was recognised.
    """
    client = speech.SpeechClient()
    audio = speech.RecognitionAudio(uri=gcs_uri)
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=sample_rate_hertz,
        language_code=language_code,
    )

    operation = client.long_running_recognize(config=config, audio=audio)
    print(f"Transcribing {gcs_uri}...")
    response = operation.result(timeout=timeout)

    # Skip results that carry no alternatives so indexing [0] cannot raise
    # IndexError.
    pieces = [
        result.alternatives[0].transcript
        for result in response.results
        if result.alternatives
    ]
    return " ".join(pieces)

def main():
    """Run the interactive YouTube-to-transcript pipeline.

    Prompts for the service account key path, the GCS bucket name and a
    comma-separated list of YouTube URLs. Each URL is downloaded as WAV,
    split into 30-second chunks, and every chunk is uploaded to the bucket
    and transcribed. One row per chunk is collected into a DataFrame, which
    is printed and written to ``data-files/transcript_data_combined.csv``.

    URLs whose download fails are skipped; errors raised while splitting,
    uploading or transcribing are not caught here.
    """
    json_key_path = input("Enter the path to your Google Cloud service account JSON key file: ")
    authenticate_gcp(json_key_path)

    # Bucket names cannot contain whitespace, so stray spaces from the prompt
    # are dropped rather than ending up in the gs:// URI.
    bucket_name = input("Enter your GCS bucket name: ").strip()

    youtube_urls = input("Enter YouTube URLs separated by commas: ").split(',')

    download_folder = '/Users/milanvaghani/Desktop/Unstructed Machine Learning/Audio Files'
    os.makedirs(download_folder, exist_ok=True)

    results = []

    for url in youtube_urls:
        url = url.strip()
        if not url:
            continue  # Empty entry, e.g. from a trailing comma

        wav_file_path = download_audio(url, download_folder)
        if not wav_file_path:
            continue  # download_audio already reported the failure

        chunk_files = split_audio(wav_file_path, chunk_length_ms=30000)

        for chunk_number, chunk_file in enumerate(chunk_files, start=1):
            # The blob is named after the chunk file; the URL returned by the
            # upload is not needed because the Speech API takes a gs:// URI.
            destination_blob_name = os.path.basename(chunk_file)
            upload_to_gcs(bucket_name, chunk_file, destination_blob_name)

            gcs_uri = f'gs://{bucket_name}/{destination_blob_name}'
            transcript = transcribe_gcs(gcs_uri)

            results.append({
                "YouTube URL": url,
                "Chunk Number": chunk_number,
                "Chunk Filename": chunk_file,
                "Transcript": transcript.strip()
            })

    df = pd.DataFrame(results)
    print(df)

    # Create the output directory first: failing here would throw away every
    # transcript produced above.
    output_path = "data-files/transcript_data_combined.csv"
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_csv(output_path, index=False)
    print(f"DataFrame saved to {output_path}")

# Script entry point: start the pipeline only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()