"""youtube_translation_pipeline.py

A self‑contained OOP pipeline that
1. Grabs basic YouTube metadata **with pytube** (no Data API needed)
2. Pulls the English transcript via *youtube‑transcript‑api*
3. Translates the full transcript → Armenian with four back‑ends:
      • Google Translate (googletrans)
      • DeepL
      • OpenAI GPT
      • Anthropic Claude
4. Stores all results + source metadata in `<video_id>_translations.json`.

Install requirements:
    pip install pytube youtube-transcript-api googletrans==4.0.0-rc1 \
                deepl openai anthropic

Set env vars for the keys you actually use:
    export DEEPL_API_KEY=...
    export OPENAI_API_KEY=...
    export ANTHROPIC_API_KEY=...
"""

# Google AI Studio

In [5]:
!pip install -q -U google-genai


In [None]:
from google import genai

key = ""
client = genai.Client(api_key=key)

response = client.models.generate_content(
    model="gemini-2.5-flash", contents="Explain how AI works in a few words"
)
print(response.text)

AI learns patterns from data to make decisions or perform tasks, mimicking human intelligence.


In [None]:
# # To run this code you need to install the following dependencies:
# # pip install google-genai

# import base64
# import mimetypes
# import os
# import re
# import struct
# from google import genai
# from google.genai import types


# def save_binary_file(file_name, data):
#     f = open(file_name, "wb")
#     f.write(data)
#     f.close()
#     print(f"File saved to to: {file_name}")

# key = "AIzaSyBL4SGRSnP_uSUCfyL7bOgZFlZnCUa875c"

# def generate():
#     client = genai.Client(
#         api_key="AIzaSyBL4SGRSnP_uSUCfyL7bOgZFlZnCUa875c",
#     )

#     model = "gemini-2.5-pro-preview-tts"
#     contents = [
#         types.Content(
#             role="user",
#             parts=[
#                 types.Part.from_text(text="""Կուզես պայթիր, կուզես ճչա,
# Քեզ մարդու տեղ դնող չկա,
# Զգույշ, գլխիդ փորձանք չգա,
# Սիրտ, անցել է սրտի դարը:
# Էլ չեն երդվում քո արևով,
# Չեն տաքանում քո բարևով,
# Էլ չես վառում դու վառվելով
# Սիրտ, անցել է քո հազարը:
# Ինչքան տխրես, ինչքան ժպտաս
# Ինչքան խփես ու թպրտաս,
# Միևնույն է, տանուլ կտաս,
# Էլ չի բերում, սիրտ քո զարը:
# Միտքն է հիմա սերն աշխարհի,
# Աշխարհակալ տերն աշխարհի,
# Բեռնակիրն ու բեռն աշարհի,
# Եվ աշխարհի ճանապարհը:"""),
#             ],
#         ),
#     ]
#     generate_content_config = types.GenerateContentConfig(
#         temperature=1,
#         response_modalities=[
#             "audio",
#         ],
#         speech_config=types.SpeechConfig(
#             voice_config=types.VoiceConfig(
#                 prebuilt_voice_config=types.PrebuiltVoiceConfig(
#                     voice_name="Zephyr"
#                 )
#             )
#         ),
#     )

#     file_index = 0
#     for chunk in client.models.generate_content_stream(
#         model=model,
#         contents=contents,
#         config=generate_content_config,
#     ):
#         if (
#             chunk.candidates is None
#             or chunk.candidates[0].content is None
#             or chunk.candidates[0].content.parts is None
#         ):
#             continue
#         if chunk.candidates[0].content.parts[0].inline_data and chunk.candidates[0].content.parts[0].inline_data.data:
#             file_name = f"ENTER_FILE_NAME_{file_index}"
#             file_index += 1
#             inline_data = chunk.candidates[0].content.parts[0].inline_data
#             data_buffer = inline_data.data
#             file_extension = mimetypes.guess_extension(inline_data.mime_type)
#             if file_extension is None:
#                 file_extension = ".wav"
#                 data_buffer = convert_to_wav(inline_data.data, inline_data.mime_type)
#             save_binary_file(f"{file_name}{file_extension}", data_buffer)
#         else:
#             print(chunk.text)

# def convert_to_wav(audio_data: bytes, mime_type: str) -> bytes:
#     """Generates a WAV file header for the given audio data and parameters.

#     Args:
#         audio_data: The raw audio data as a bytes object.
#         mime_type: Mime type of the audio data.

#     Returns:
#         A bytes object representing the WAV file header.
#     """
#     parameters = parse_audio_mime_type(mime_type)
#     bits_per_sample = parameters["bits_per_sample"]
#     sample_rate = parameters["rate"]
#     num_channels = 1
#     data_size = len(audio_data)
#     bytes_per_sample = bits_per_sample // 8
#     block_align = num_channels * bytes_per_sample
#     byte_rate = sample_rate * block_align
#     chunk_size = 36 + data_size  # 36 bytes for header fields before data chunk size

#     # http://soundfile.sapp.org/doc/WaveFormat/

#     header = struct.pack(
#         "<4sI4s4sIHHIIHH4sI",
#         b"RIFF",          # ChunkID
#         chunk_size,       # ChunkSize (total file size - 8 bytes)
#         b"WAVE",          # Format
#         b"fmt ",          # Subchunk1ID
#         16,               # Subchunk1Size (16 for PCM)
#         1,                # AudioFormat (1 for PCM)
#         num_channels,     # NumChannels
#         sample_rate,      # SampleRate
#         byte_rate,        # ByteRate
#         block_align,      # BlockAlign
#         bits_per_sample,  # BitsPerSample
#         b"data",          # Subchunk2ID
#         data_size         # Subchunk2Size (size of audio data)
#     )
#     return header + audio_data

# def parse_audio_mime_type(mime_type: str) -> dict[str, int | None]:
#     """Parses bits per sample and rate from an audio MIME type string.

#     Assumes bits per sample is encoded like "L16" and rate as "rate=xxxxx".

#     Args:
#         mime_type: The audio MIME type string (e.g., "audio/L16;rate=24000").

#     Returns:
#         A dictionary with "bits_per_sample" and "rate" keys. Values will be
#         integers if found, otherwise None.
#     """
#     bits_per_sample = 16
#     rate = 24000

#     # Extract rate from parameters
#     parts = mime_type.split(";")
#     for param in parts: # Skip the main type part
#         param = param.strip()
#         if param.lower().startswith("rate="):
#             try:
#                 rate_str = param.split("=", 1)[1]
#                 rate = int(rate_str)
#             except (ValueError, IndexError):
#                 # Handle cases like "rate=" with no value or non-integer value
#                 pass # Keep rate as default
#         elif param.startswith("audio/L"):
#             try:
#                 bits_per_sample = int(param.split("L", 1)[1])
#             except (ValueError, IndexError):
#                 pass # Keep bits_per_sample as default if conversion fails

#     return {"bits_per_sample": bits_per_sample, "rate": rate}


# if __name__ == "__main__":
#     generate()


ClientError: 429 RESOURCE_EXHAUSTED. {'error': {'code': 429, 'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits.', 'status': 'RESOURCE_EXHAUSTED', 'details': [{'@type': 'type.googleapis.com/google.rpc.QuotaFailure', 'violations': [{'quotaMetric': 'generativelanguage.googleapis.com/generate_content_free_tier_input_token_count', 'quotaId': 'GenerateContentInputTokensPerModelPerMinute-FreeTier', 'quotaDimensions': {'model': 'gemini-2.5-pro-tts', 'location': 'global'}}, {'quotaMetric': 'generativelanguage.googleapis.com/generate_content_free_tier_requests', 'quotaId': 'GenerateRequestsPerMinutePerProjectPerModel-FreeTier', 'quotaDimensions': {'location': 'global', 'model': 'gemini-2.5-pro-tts'}}, {'quotaMetric': 'generativelanguage.googleapis.com/generate_content_free_tier_requests', 'quotaId': 'GenerateRequestsPerDayPerProjectPerModel-FreeTier', 'quotaDimensions': {'location': 'global', 'model': 'gemini-2.5-pro-tts'}}]}, {'@type': 'type.googleapis.com/google.rpc.Help', 'links': [{'description': 'Learn more about Gemini API quotas', 'url': 'https://ai.google.dev/gemini-api/docs/rate-limits'}]}, {'@type': 'type.googleapis.com/google.rpc.RetryInfo', 'retryDelay': '14s'}]}}

In [None]:
# from google import genai
# from google.genai import types
# import wave

# # Set up the wave file to save the output:
# def wave_file(filename, pcm, channels=1, rate=24000, sample_width=2):
#    with wave.open(filename, "wb") as wf:
#       wf.setnchannels(channels)
#       wf.setsampwidth(sample_width)
#       wf.setframerate(rate)
#       wf.writeframes(pcm)

# client = genai.Client(api_key=key)

# response = client.models.generate_content(
#    model="gemini-2.5-flash-preview-tts",
#    contents="Say cheerfully: Պանիր",
#    config=types.GenerateContentConfig(
#       response_modalities=["AUDIO"],
#       speech_config=types.SpeechConfig(
#          voice_config=types.VoiceConfig(
#             prebuilt_voice_config=types.PrebuiltVoiceConfig(
#                voice_name='Kore',
#             )
#          )
#       ),
#    )
# )

# data = response.candidates[0].content.parts[0].inline_data.data

# file_name='out.wav'
# wave_file(file_name, data) # Saves the file to current directory

AttributeError: 'NoneType' object has no attribute 'parts'

# Manim 

In [None]:
# conda create --name youtube 
# pip install pytube youtube_transcript_api deepl openai

In [19]:
from __future__ import annotations

import os
import json
from abc import ABC, abstractmethod

from dataclasses import dataclass, field, asdict

from datetime import datetime
from typing import Dict, List, Optional

# --- third‑party deps -------------------------------------------------------
from pytubefix import YouTube
from youtube_transcript_api import YouTubeTranscriptApi  # for captions
from googletrans import Translator as _GoogleTranslator # https://github.com/microsoft/TaskWeaver/issues/172
# import deepl  
# import openai 
# import anthropic  

In [None]:
# https://github.com/microsoft/TaskWeaver/issues/172

# Data models

In [20]:
@dataclass()
class VideoInfo:
    video_id: str
    title: str
    channel: str
    publish_date: str  # ISO‑8601 date string
    description: str
    keywords: List[str]
    length_seconds: int
    url: str


@dataclass()
class TranscriptSegment:
    start: float
    duration: float
    text: str


@dataclass()
class TranslationResult:
    engine: str  # e.g. "google", "deepl", "openai", "claude"
    translated_text: str


# Translation

In [21]:

class BaseTranslator(ABC):
    """Abstract translator: subclasses implement *translate()* returning Armenian text."""

    def __init__(self, target_lang: str = "hy") -> None:
        self.target_lang = target_lang

    @abstractmethod
    def translate(self, text: str) -> str:
        """Translate *text* to *self.target_lang* and return the result."""


In [22]:


class GoogleTranslator(BaseTranslator):
    def __init__(self, target_lang: str = "hy") -> None:
        super().__init__(target_lang)
        self._client = _GoogleTranslator()

    def translate(self, text: str) -> str:
        return self._client.translate(text, dest=self.target_lang).text  # type: ignore[attr-defined]


# class DeepLTranslator(BaseTranslator):
#     def __init__(self, api_key: Optional[str] = None, target_lang: str = "hy") -> None:
#         super().__init__(target_lang)
#         api_key = api_key or os.getenv("DEEPL_API_KEY")
#         if not api_key:
#             raise RuntimeError("DEEPL_API_KEY env var required for DeepL translator")
#         self._client = deepl.Translator(api_key)

#     def translate(self, text: str) -> str:
#         result = self._client.translate_text(text, target_lang=self.target_lang.upper())
#         return result.text  # type: ignore[attr-defined]


# class OpenAITranslator(BaseTranslator):
#     def __init__(self, model: str = "gpt-4o-mini", temperature: float = 0.0, target_lang: str = "hy") -> None:
#         super().__init__(target_lang)
#         self.model = model
#         self.temperature = temperature

#     def translate(self, text: str) -> str:
#         openai.api_key = os.getenv("OPENAI_API_KEY")
#         if not openai.api_key:
#             raise RuntimeError("OPENAI_API_KEY env var not set")
#         response = .create(  # type: ignore[attr-defined]
#             model=self.model,
#             messages=[
#                 {
#                     "role": "system",
#                     "content": "Translate the following text to Armenian (hy) keeping formatting and line breaks where reasonable."
#                 },
#                 {"role": "user", "content": text},
#             ],
#             temperature=self.temperature,
#         )
#         return response.choices[0].message.content.strip()  # type: ignore[attr-defined]


In [23]:
from openai import OpenAI

client = OpenAI()

response = client.responses.create(
  model="gpt-4.1-nano",
  input="Tell me a three sentence bedtime story about a unicorn."
)

print(response)


OpenAIError: The api_key client option must be set either by passing api_key to the client or by setting the OPENAI_API_KEY environment variable

# YouTube metadata

In [27]:

def fetch_video_info(url: str) -> VideoInfo:
    yt = YouTube(url)
    print(f"Fetching video info for {url}...")
    print(f"Title: {yt.title}")
    return VideoInfo(
        video_id=yt.video_id,
        title=yt.title or "",
        channel=yt.author or "",
        publish_date=yt.publish_date.isoformat() if yt.publish_date else "",
        description=yt.description or "",
        keywords=yt.keywords or [],
        length_seconds=yt.length,
        url=url,
        
    ), yt
    # return yt 


def fetch_transcript(video_id: str, lang: str = "en") -> List[TranscriptSegment]:
    raw = YouTubeTranscriptApi.get_transcript(video_id, languages=[lang])
    return [TranscriptSegment(**seg) for seg in raw]



In [38]:
yt = fetch_video_info("https://www.youtube.com/watch?v=3MqYE2UuN24")

Fetching video info for https://www.youtube.com/watch?v=3MqYE2UuN24...
Title: Is it Better to Walk or Run in the Rain?


In [43]:
yt[1].streams.filter(progressive=True, file_extension="mp4").order_by("resolution")\
.desc().first().download(filename="video.mp4")

'c:\\Users\\hayk_\\OneDrive\\Desktop\\python_math_ml_course\\python\\video.mp4'

In [47]:
yt[1].streams.get_audio_only().download(filename="audio.mp3")

'c:\\Users\\hayk_\\OneDrive\\Desktop\\python_math_ml_course\\python\\audio.mp3'

In [54]:
c = yt[1].captions["en"]#.download("a")

In [56]:
c.generate_txt_captions()

"On those cold, rainy days when you forget your rain jacket or umbrella and you want to stay as dry as possible… should you walk and spend more time in the rain? Or should you run, which means you'll be smashing into more raindrops from the side? Assuming you haven't been fully soaked yet and you aren't jumping into puddles, the answer is simple. As you move out of the way of one falling raindrop, you move into the way of another. So the amount of rain hitting the top of you is constant, regardless of how fast you're going. Alternatively, you can picture that the raindrops themselves are stationary and you (and the earth beneath you) are moving upwards through the rain! And since the volume of a parallelepiped (that's a 3D parallelogram) doesn't depend at all on its slant, then no matter how fast you're moving horizontally the same amount of rain will land on top of you each second. Now, if you're not moving, the rain from the top is all you'll get. But if you ARE moving, you'll also r

In [58]:
from googletrans import Translator

In [61]:
tr = Translator()

In [63]:
a = tr.translate(text="Hello, world!", dest="hy")

In [68]:
import asyncio
from googletrans import Translator

async def translate_text():
    async with Translator() as translator:
        result = await translator.translate('안녕하세요.')
        print(result)  # <Translated src=ko dest=en text=Good evening. pronunciation=Good evening.>

  
asyncio.run(translate_text())

RuntimeError: asyncio.run() cannot be called from a running event loop

In [2]:
async def main():
    print(1)
    
await main()

1


In [4]:
import asyncio
from googletrans import Translator

# https://stackoverflow.com/questions/55409641/asyncio-run-cannot-be-called-from-a-running-event-loop-when-using-jupyter-no

async def translate_text():
    async with Translator() as translator:
        result = await translator.translate('cheese', dest='hy')
        print(result)  # <Translated src=ko dest=en text=Good evening. pronunciation=Good evening.>

await translate_text()

Translated(src=en, dest=hy, text=պանիր, pronunciation=panir, extra_data="{'translat...")


In [None]:
import deepl

auth_key = ""  # Replace with your key
deepl_client = deepl.DeepLClient(auth_key)

result = deepl_client.translate_text("Hello, world!", target_lang="FR")
print(result.text)  # "Bonjour, le monde !"

Bonjour à tous !


In [57]:
GoogleTranslator().translate(c.generate_txt_captions())

AttributeError: 'coroutine' object has no attribute 'text'

In [32]:
yt[1].__dir__()

['_js',
 '_js_url',
 '_vid_info',
 '_vid_details',
 '_watch_html',
 '_embed_html',
 '_player_config_args',
 '_age_restricted',
 '_fmt_streams',
 '_initial_data',
 '_metadata',
 'video_id',
 'watch_url',
 'embed_url',
 'client',
 'fallback_clients',
 '_signature_timestamp',
 '_visitor_data',
 'stream_monostate',
 '_author',
 '_title',
 '_publish_date',
 'use_oauth',
 'allow_oauth_cache',
 'token_file',
 'oauth_verifier',
 'use_po_token',
 'po_token_verifier',
 'po_token',
 '_pot',
 '__module__',
 '__doc__',
 '__init__',
 '__repr__',
 '__eq__',
 'watch_html',
 'embed_html',
 'age_restricted',
 'js_url',
 'js',
 'visitor_data',
 'pot',
 'initial_data',
 'streaming_data',
 'fmt_streams',
 'check_availability',
 'signature_timestamp',
 'video_playback_ustreamer_config',
 'server_abr_streaming_url',
 'vid_info',
 'vid_details',
 'age_check',
 'caption_tracks',
 'captions',
 'chapters',
 'key_moments',
 'replayed_heatmap',
 'streams',
 'thumbnail_url',
 'publish_date',
 'title',
 'description

In [None]:
# fetch_transcript("3MqYE2UuN24")

In [38]:


# ---------------------------------------------------------------------------
# YouTube utilities (pytube + youtube_transcript_api)
# ---------------------------------------------------------------------------

# ---------------------------------------------------------------------------
# Orchestration pipeline
# ---------------------------------------------------------------------------

@dataclass
class TranslationPipeline:
    url: str
    translators: List[BaseTranslator] = field(default_factory=list)
    out_dir: str = "out"

    def run(self) -> None:
        # 1. fetch metadata + transcript
        info = fetch_video_info(self.url)
        transcript_segments = fetch_transcript(info.video_id)
        full_text = "\n".join(seg.text for seg in transcript_segments)

        # 2. run each translator
        results: List[TranslationResult] = []
        for t in self.translators:
            print(f"Translating with {t.__class__.__name__}…")
            armenian_text = t.translate(full_text)
            results.append(TranslationResult(engine=t.__class__.__name__, translated_text=armenian_text))

        # 3. persist
        self.out_dir.mkdir(exist_ok=True)
        out_path = self.out_dir / f"{info.video_id}_translations.json"
        with out_path.open("w", encoding="utf-8") as fp:
            json.dump(
                {
                    "video": asdict(info),
                    "translations": [asdict(r) for r in results],
                    "generated_at": datetime.utcnow().isoformat() + "Z",
                },
                fp,
                ensure_ascii=False,
                indent=2,
            )
        print(f"Saved ⇒ {out_path}")


# ---------------------------------------------------------------------------
# Simple entry‑point (no argparse)
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    # example_url = input("YouTube URL: ").strip()
    example_url = "https://www.youtube.com/watch?v=3MqYE2UuN24"
    pipeline = TranslationPipeline(
        url=example_url,
        translators=[
            GoogleTranslator(),
            # DeepLTranslator(),
            # OpenAITranslator(),
            # ClaudeTranslator(),
        ],
    )
    pipeline.run()


Fetching video info for https://www.youtube.com/watch?v=3MqYE2UuN24...
Title: Is it Better to Walk or Run in the Rain?
Title: Is it Better to Walk or Run in the Rain?
Translating with GoogleTranslator…
Translating with GoogleTranslator…


AttributeError: 'coroutine' object has no attribute 'text'

In [None]:
import os
import openai 


openai.api_key = os.getenv("OPENAI_API_KEY")

print(os.environ)

if not openai.api_key:
    raise RuntimeError("OPENAI_API_KEY env var not set")
response = openai.ChatCompletion.create(  # type: ignore[attr-defined]
    model=self.model,
    messages=[
        {
            "role": "system",
            "content": "Translate the following text to Armenian (hy) keeping formatting and line breaks where reasonable."
        },
        {"role": "user", "content": "Cheese"},
    ],
    temperature=self.temperature,
)
return response.choices[0].message.content.strip()

In [None]:
import icecream