In [4]:
import time
import shutil
import mimetypes
import tempfile
import atexit
from pathlib import Path
from dataclasses import dataclass

from slugify import slugify
from yt_dlp import YoutubeDL


@dataclass
class MediaData:
    """One downloaded file together with its bytes, as assembled by ``YTDLManager._as_data``."""
    filename: str  # file name including the extension (``Path.name``)
    file_dir: str  # full file path with forward slashes (``Path.as_posix()``) -- a file path, not a directory
    file_suffix: str  # extension including the leading dot (``Path.suffix``)
    path: str  # resolved path as a string (``str(Path.resolve())``)
    content_type: str  # MIME type guessed from the name, "application/octet-stream" when unknown
    data: bytes  # raw file contents


class YTDLManager:
    def __init__(self):
        self._tmp = tempfile.TemporaryDirectory(prefix="ytdl_")
        self.output_dir = Path(self._tmp.name)
        atexit.register(self._cleanup)

        self._node = shutil.which("node")
        if not self._node:
            raise RuntimeError("Node.js not found in PATH")

    def _cleanup(self):
        try:
            self._tmp.cleanup()
        except Exception:
            pass

    def download(self, url: str, *, audio=True, srt=True, audio_format="mp3"):
        try:
            info = self._extract_info(url)

            if srt:
                file = self._download_subtitle(url, info)
                if file:
                    return self._as_data(file)

            time.sleep(1)

            if audio:
                file = self._download_audio(url, info, audio_format)
                if file:
                    return self._as_data(file)

            return None

        except Exception as e:
            atexit.unregister(self._cleanup)
            return {
                "error": "ytdlp_unavailable",
                "retryable": True,
                "details": str(e)
            }

    # -------- yt-dlp core --------

    def _base_ydl_opts(self):
        return {
            "quiet": True,
            "ignoreerrors": True,
            "sleep_interval": 2,
            "max_sleep_interval": 5,
            "js_runtimes": {
                "node": {
                    "path": self._node
                }
            },
            "extractor_args": {
                "youtube": {
                    "player_client": ["web"],
                }
            },
        }

    def _extract_info(self, url: str):
        opts = {
            "quiet": True,
            "js_runtimes": {
                "node": {
                    "path": self._node
                }
            }
        }
        with YoutubeDL(opts) as ydl:
            return ydl.extract_info(url, download=False)

    def _slug_rename(self, info: dict):
        title = slugify(info.get("title", "output"))
        for f in self.output_dir.iterdir():
            new_path = self.output_dir / f"{title}{f.suffix}"
            f.rename(new_path)
            return new_path
        return None

    # -------- downloads --------

    def _download_subtitle(self, url: str, info: dict):
        has_manual = bool(info.get("subtitles"))
        has_auto = bool(info.get("automatic_captions"))

        if not (has_manual or has_auto):
            return None

        ydl_opts = {
            **self._base_ydl_opts(),
            "skip_download": True,
            "writesubtitles": has_manual,
            "writeautomaticsub": not has_manual and has_auto,
            "subtitlesformat": "srt",
            "outtmpl": str(self.output_dir / "%(id)s.%(ext)s"),
        }

        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        return self._slug_rename(info)

    def _download_audio(self, url: str, info: dict, audio_format: str):
        ydl_opts = {
            **self._base_ydl_opts(),
            "format": "bestaudio/best",
            "outtmpl": str(self.output_dir / "%(id)s.%(ext)s"),
            "postprocessors": [
                {
                    "key": "FFmpegExtractAudio",
                    "preferredcodec": audio_format,
                    "preferredquality": "192",
                }
            ],
        }

        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        return self._slug_rename(info)

    # -------- output --------

    def _as_data(self, file: Path) -> MediaData:
        return MediaData(
            filename=file.name,
            file_dir=file.as_posix(),
            file_suffix=file.suffix,
            path=str(file.resolve()),
            content_type=mimetypes.guess_type(file.name)[0]
            or "application/octet-stream",
            data=file.read_bytes(),
        )

In [5]:
# Create the downloader; the constructor raises RuntimeError when Node.js is not on PATH.
r = YTDLManager()

In [6]:
# With the defaults (audio=True, srt=True) subtitles are tried first and audio is the fallback.
# NOTE(review): download() can also return None or an error dict -- check before using `result`.
result = r.download('https://www.youtube.com/watch?v=4TEdR9x-oT4')



                                                         

In [7]:
# Despite its name, `file_dir` holds the full forward-slash path of the downloaded file.
result.file_dir

'C:/Users/seker/AppData/Local/Temp/ytdl_fonwuayj/step-by-step-how-to-get-freelance-clients-from-linkedin.mp3'

In [9]:
import base64
import wave
import time
from pathlib import Path
from langchain_openai import ChatOpenAI
from langchain.messages import HumanMessage
from openai import APIError


class AudioTranscriber:
    def __init__(self, auto_split: bool = True, max_chunk_size_mb: float = 9.5):
        """
        Args:
            auto_split: B√ºy√ºk dosyalarƒ± otomatik olarak b√∂l
            max_chunk_size_mb: Her par√ßanƒ±n maksimum boyutu (MB cinsinden)
        """
        self.llm = ChatOpenAI(
            model=STT_MODEL,
            api_key=OPENROUTER_API_KEY,
            base_url=OPENROUTER_API_HOST,
            timeout=120,
            max_retries=3,
        )
        self.auto_split = auto_split
        self.max_chunk_size_mb = max_chunk_size_mb
        
    @staticmethod
    def __get_file_size_mb(path: Path) -> float:
        """Dosya boyutunu MB cinsinden d√∂nd√ºr√ºr"""
        return path.stat().st_size / (1024 * 1024)

    @staticmethod
    def __encode_audio_to_base64(path: Path) -> str:
        with open(path, "rb") as f:
            return base64.b64encode(f.read()).decode("utf-8")

    @staticmethod
    def __get_wav_duration(audio_path: Path) -> float:
        """WAV dosyasƒ±nƒ±n s√ºresini saniye cinsinden d√∂nd√ºr√ºr."""
        try:
            with wave.open(str(audio_path), 'rb') as wav_file:
                frames = wav_file.getnframes()
                rate = wav_file.getframerate()
                return frames / float(rate)
        except Exception as e:
            print(f"‚ö†Ô∏è S√ºre hesaplama hatasƒ±: {e}")
            return 0

    @staticmethod
    def __calculate_chunk_duration_for_size(
        audio_path: Path,
        target_size_mb: float
    ) -> int:
        """Hedef dosya boyutuna ula≈ümak i√ßin gerekli chunk s√ºresini hesaplar."""
        try:
            with wave.open(str(audio_path), 'rb') as wav_file:
                # WAV dosya bilgileri
                n_channels = wav_file.getnchannels()
                sampwidth = wav_file.getsampwidth()
                framerate = wav_file.getframerate()
                n_frames = wav_file.getnframes()
                
                # Toplam dosya boyutu
                total_size_bytes = n_frames * n_channels * sampwidth
                total_size_mb = total_size_bytes / (1024 * 1024)
                
                # Toplam s√ºre
                total_duration = n_frames / framerate
                
                # MB ba≈üƒ±na s√ºre
                seconds_per_mb = total_duration / total_size_mb if total_size_mb > 0 else 0
                
                # Hedef boyut i√ßin gerekli s√ºre
                chunk_duration = int(seconds_per_mb * target_size_mb)
                
                # Minimum 10 saniye, maksimum dosya s√ºresinin yarƒ±sƒ±
                chunk_duration = max(10, min(chunk_duration, int(total_duration / 2)))
                
                print(f"üìê Hesaplama: {total_size_mb:.2f}MB = {total_duration:.2f}s")
                print(f"üìê Hedef: {target_size_mb}MB ‚Üí ~{chunk_duration}s par√ßalar")
                
                return chunk_duration
                
        except Exception as e:
            print(f"‚ö†Ô∏è Chunk s√ºresi hesaplanamadƒ±: {e}")
            return 60  # Varsayƒ±lan 60 saniye

    @staticmethod
    def __split_wav_audio_by_size(
            audio_path: Path,
            max_chunk_size_mb: float,
            output_dir: Path = None
    ) -> list[Path]:
        """WAV ses dosyasƒ±nƒ± belirtilen boyuta g√∂re b√∂ler."""
        if output_dir is None:
            output_dir = audio_path.parent / f"{audio_path.stem}_chunks"

        output_dir.mkdir(parents=True, exist_ok=True)

        # Hedef boyuta g√∂re chunk s√ºresini hesapla
        chunk_duration_seconds = AudioTranscriber.__calculate_chunk_duration_for_size(
            audio_path, 
            max_chunk_size_mb
        )

        with wave.open(str(audio_path), 'rb') as wav_file:
            n_channels = wav_file.getnchannels()
            sampwidth = wav_file.getsampwidth()
            framerate = wav_file.getframerate()
            n_frames = wav_file.getnframes()

            frames_per_chunk = int(framerate * chunk_duration_seconds)

            chunks = []
            chunk_number = 1
            frames_read = 0

            while frames_read < n_frames:
                frames_to_read = min(frames_per_chunk, n_frames - frames_read)
                audio_data = wav_file.readframes(frames_to_read)

                chunk_filename = output_dir / f"{audio_path.stem}_part_{chunk_number:03d}.wav"

                with wave.open(str(chunk_filename), 'wb') as chunk_file:
                    chunk_file.setnchannels(n_channels)
                    chunk_file.setsampwidth(sampwidth)
                    chunk_file.setframerate(framerate)
                    chunk_file.writeframes(audio_data)

                # Chunk bilgilerini g√∂ster
                chunk_size = AudioTranscriber.__get_file_size_mb(chunk_filename)
                duration = frames_to_read / framerate
                print(f"‚úÇÔ∏è Par√ßa {chunk_number}: {chunk_filename.name} ({chunk_size:.2f}MB, {duration:.2f}s)")

                chunks.append(chunk_filename)
                frames_read += frames_to_read
                chunk_number += 1

        return chunks

    @staticmethod
    def __convert_to_wav(input_path: Path) -> Path:
        """FFmpeg kullanarak ses dosyasƒ±nƒ± WAV formatƒ±na √ßevirir."""
        import subprocess

        output_path = input_path.with_suffix('.wav')

        try:
            print(f"üîÑ WAV'a √ßevriliyor: {input_path.name}")
            subprocess.run([
                'ffmpeg', '-i', str(input_path),
                '-acodec', 'pcm_s16le',
                '-ar', '16000',  # 16kHz
                '-ac', '1',  # Mono
                str(output_path),
                '-y'
            ],
                check=True,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                text=True
            )
            
            file_size = AudioTranscriber.__get_file_size_mb(output_path)
            print(f"‚úÖ D√∂n√º≈üt√ºrme tamamlandƒ±: {file_size:.2f}MB")
            return output_path
            
        except FileNotFoundError:
            raise Exception("‚ùå FFmpeg bulunamadƒ±. WAV olmayan dosyalar i√ßin FFmpeg gereklidir.")
        except subprocess.CalledProcessError as e:
            raise Exception(f"‚ùå D√∂n√º≈üt√ºrme hatasƒ±: {e}")

    @staticmethod
    def __cleanup_temp_files(files: list[Path]):
        """Ge√ßici dosyalarƒ± temizler."""
        for file in files:
            if file.exists():
                try:
                    file.unlink()
                except:
                    pass

        # Bo≈ü dizinleri de temizle
        for file in files:
            if file.parent.exists() and not any(file.parent.iterdir()):
                try:
                    file.parent.rmdir()
                except:
                    pass

    def __transcribe_chunk_with_retry(
        self, 
        chunk_path: Path, 
        prompt: str,
        max_retries: int = 3
    ) -> str:
        """Tek bir par√ßayƒ± retry mekanizmasƒ± ile transkrip eder"""
        
        for attempt in range(max_retries):
            try:
                # Dosya boyutunu kontrol et
                file_size = self.__get_file_size_mb(chunk_path)
                if file_size > self.max_chunk_size_mb:
                    raise Exception(
                        f"‚ö†Ô∏è Par√ßa √ßok b√ºy√ºk ({file_size:.2f}MB). "
                        f"Max {self.max_chunk_size_mb}MB olmalƒ±."
                    )
                
                base64_audio = self.__encode_audio_to_base64(chunk_path)
                
                message = HumanMessage(
                    content=[
                        {"type": "text", "text": prompt},
                        {
                            "type": "input_audio",
                            "input_audio": {
                                "data": base64_audio,
                                "format": "wav",
                            },
                        },
                    ]
                )

                response = self.llm.invoke([message])
                return response.content
                
            except APIError as e:
                if e.status_code == 500:
                    wait_time = (attempt + 1) * 10
                    print(f"üî¥ HTTP 500 hatasƒ± (Deneme {attempt + 1}/{max_retries}). {wait_time}s bekleniyor...")
                    
                    if attempt < max_retries - 1:
                        time.sleep(wait_time)
                    else:
                        raise Exception(
                            f"‚ùå {max_retries} denemeden sonra ba≈üarƒ±sƒ±z. "
                            f"max_chunk_size_mb deƒüerini azaltmayƒ± deneyin (≈üu an: {self.max_chunk_size_mb}MB)."
                        )
                else:
                    raise
                    
            except Exception as e:
                if attempt < max_retries - 1:
                    print(f"‚ö†Ô∏è Hata: {e}. Yeniden deneniyor...")
                    time.sleep(5)
                else:
                    raise

    def transcribe(
        self,
        audio_path: Path,
        prompt: str = "Please transcribe this audio file accurately. Only return the transcription, no additional comments."
    ):
        """
        Ses dosyasƒ±nƒ± transkrip eder. Gerekirse otomatik olarak boyuta g√∂re b√∂ler.
        """
        audio_path = Path(audio_path)
        temp_files = []
        wav_file = None

        try:
            # WAV formatƒ±na √ßevir (gerekirse)
            if audio_path.suffix.lower() != '.wav':
                wav_file = self.__convert_to_wav(audio_path)
                temp_files.append(wav_file)
            else:
                wav_file = audio_path

            # Dosya bilgilerini g√∂ster
            duration = self.__get_wav_duration(wav_file)
            file_size = self.__get_file_size_mb(wav_file)
            print(f"üìä Dosya: {duration:.2f}s, {file_size:.2f}MB")

            # Dosya boyutu kontrol√º
            if self.auto_split and file_size > self.max_chunk_size_mb:
                print(f"üì¶ Dosya b√ºy√ºk ({file_size:.2f}MB > {self.max_chunk_size_mb}MB)")
                print(f"üì¶ Max {self.max_chunk_size_mb}MB par√ßalara b√∂l√ºn√ºyor...")

                chunks = self.__split_wav_audio_by_size(
                    wav_file,
                    self.max_chunk_size_mb
                )
                temp_files.extend(chunks)

                # Her par√ßayƒ± transkrip et
                full_transcription = []
                for i, chunk_path in enumerate(chunks, 1):
                    chunk_size = self.__get_file_size_mb(chunk_path)
                    print(f"üéôÔ∏è Par√ßa {i}/{len(chunks)} transkrip ediliyor ({chunk_size:.2f}MB)...")
                    
                    chunk_text = self.__transcribe_chunk_with_retry(
                        chunk_path, 
                        prompt,
                        max_retries=3
                    )
                    
                    full_transcription.append(chunk_text)
                    print(f"‚úÖ Par√ßa {i} tamamlandƒ± ({len(chunk_text)} karakter)")
                    
                    # API'ye nazik ol
                    if i < len(chunks):
                        time.sleep(2)

                result = "\n\n".join(full_transcription)
                print(f"üéâ T√ºm transkripsiyon tamamlandƒ±! ({len(result)} karakter)")
                
            else:
                # Dosya k√º√ß√ºk - direkt transkrip et
                print(f"üéôÔ∏è Transkripsiyon ba≈ülƒ±yor...")
                result = self.__transcribe_chunk_with_retry(
                    wav_file,
                    prompt,
                    max_retries=3
                )
                print(f"‚úÖ Tamamlandƒ±! ({len(result)} karakter)")

            return result

        finally:
            # Ge√ßici dosyalarƒ± temizle
            if temp_files:
                print(f"üßπ Ge√ßici dosyalar temizleniyor...")
                self.__cleanup_temp_files(temp_files)

In [10]:
# Build the transcriber with its defaults (auto_split=True, max_chunk_size_mb=9.5).
transcriber = AudioTranscriber()

In [11]:
# transcribe() wraps its argument in Path(), so the path string from the download step is accepted.
transcriber.transcribe(
    result.file_dir
)

🔄 WAV'a çevriliyor: step-by-step-how-to-get-freelance-clients-from-linkedin.mp3
✅ Dönüştürme tamamlandı: 16.15MB
📊 Dosya: 529.28s, 16.15MB
📦 Dosya büyük (16.15MB > 9.5MB)
📦 Max 9.5MB parçalara bölünüyor...
📐 Hesaplama: 16.15MB = 529.28s
📐 Hedef: 9.5MB → ~264s parçalar
✂️ Parça 1: step-by-step-how-to-get-freelance-clients-from-linkedin_part_001.wav (8.06MB, 264.00s)
✂️ Parça 2: step-by-step-how-to-get-freelance-clients-from-linkedin_part_002.wav (8.06MB, 264.00s)
✂️ Parça 3: step-by-step-how-to-get-freelance-clients-from-linkedin_part_003.wav (0.04MB, 1.28s)
🎙️ Parça 1/3 transkrip ediliyor (8.06MB)...
✅ Parça 1 tamamlandı (5643 karakter)
🎙️ Parça 2/3 transkrip ediliyor (8.06MB)...
✅ Parça 2 tamamlandı (5528 karakter)
🎙️ Parça 3/3 transkrip ediliyor (0.04MB)...
✅ Parça 3 tamamlandı (11 karakter)
🎉 Tüm transkripsiyon tamamlandı! (11186 karakter)
🧹 Geçici dosyalar temizleniyor...


'All right, right now it\'s becoming very clear that there is one social media channel that you really can\'t afford to not be on and put some time and effort in if you want to have a successful freelance writing business. And in today\'s video, we\'re going to talk about what that is, but then we\'re also going to share five strategies for how you can make sure you are leveraging your time properly, giving yourself the best chance of success, which means you\'re building relationships and actually getting clients on this platform. So if that sounds good, then keep watching today\'s video. It\'s for you.\n\nAll right, so what is that one platform that freelance writers can\'t afford to not be on? You might have guessed it already. It is LinkedIn. Why? Because that is the platform where people are going to do business. That is the one social media platform that is business focused, where people are looking for thought leadership. They\'re looking for ideas. They\'re looking for other pe