In [3]:
# OpenAI API 사용
import os
from dotenv import load_dotenv
import pyperclip
import re
import pandas as pd
import sqlite3
from datetime import datetime, timedelta
from langchain_openai import ChatOpenAI
from langchain_community.document_loaders import YoutubeLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import load_summarize_chain
from langchain.schema import Document
from langchain.prompts import PromptTemplate
from googleapiclient.discovery import build

In [4]:
# Load environment variables from a .env file.
load_dotenv()

True

In [1]:
import sqlite3
import re
import requests
import time
import concurrent.futures
from bs4 import BeautifulSoup
from urllib.parse import urlparse
from youtube_transcript_api import (
    YouTubeTranscriptApi,
    TranscriptsDisabled,
    NoTranscriptFound,
    VideoUnavailable,
    YouTubeRequestFailed
)
import math

class VideoDBBatchSummarizer:
    """Summarize YouTube videos embedded in the pages listed in a SQLite table.

    Every row of the ``Damoavideo`` table is expected to carry a page URL in the
    ``링크`` column and a summary in the ``요약`` column. For each row the page is
    downloaded, embedded YouTube videos are located, their transcripts are
    fetched and summarized with ``model``, and the summary is written back to
    the row (an existing summary is never overwritten).

    Args:
        db_path: Path of the SQLite database file.
        model: Object exposing ``invoke(prompt)`` whose return value has a
            ``content`` attribute holding the generated text.
    """

    # Prefix shared by every non-URL status string returned by
    # extract_youtube_links_from_page.
    _ERROR_PREFIX = "[❌]"
    # Status returned when the page was fetched but holds no YouTube embed.
    _NO_LINK_MESSAGE = "[❌] 유튜브 영상 링크를 찾을 수 없습니다."
    # Page download retry policy: attempts and base delay (seconds, doubled per retry).
    _FETCH_RETRIES = 3
    _FETCH_BASE_DELAY = 2
    # Host fragments that mark an <iframe> as a YouTube embed.
    _EMBED_HOSTS = ("youtube.com", "youtube-nocookie.com")
    _EMBED_ID_RE = re.compile(r"(?:/embed/|/v/|\?v=)([a-zA-Z0-9_-]{11})")
    # Recognizes the 11-character video ID in the common YouTube URL shapes.
    _VIDEO_ID_RE = re.compile(
        r"(?:[?&]v=|/embed/|/v/|/shorts/|/live/|youtu\.be/)"
        r"([A-Za-z0-9_-]{11})(?![A-Za-z0-9_-])"
    )

    def __init__(self, db_path, model):
        self.db_path = db_path
        self.model = model

    def _execute(self, sql, params=(), commit=False):
        """Run one statement on a short-lived connection.

        The connection is always closed, even when the statement raises.

        Returns:
            ``(rows, column_names)``; both are empty for statements that
            produce no result set.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            cur = conn.execute(sql, params)
            rows = cur.fetchall()
            names = [desc[0] for desc in cur.description] if cur.description else []
            if commit:
                conn.commit()
            return rows, names
        finally:
            conn.close()

    def get_all_video_rows(self):
        """Return every row of ``Damoavideo`` as a ``{column: value}`` dict."""
        # List the columns explicitly here if only some of them are needed.
        rows, col_names = self._execute("SELECT * FROM Damoavideo")
        return [dict(zip(col_names, row)) for row in rows]

    def extract_youtube_links_from_page(self, url):
        """Download ``url`` and collect the YouTube videos embedded via <iframe>.

        The download is retried with exponential backoff. Any exception raised
        while fetching or parsing counts as a failed attempt.

        Returns:
            A list of ``https://www.youtube.com/watch?v=<id>`` URLs (duplicates
            removed, page order kept). When nothing usable is found the list
            holds a single status string starting with ``[❌]`` instead.
        """
        headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/114.0.0.0 Safari/537.36"
            )
        }
        last_error = None
        for attempt in range(self._FETCH_RETRIES):
            try:
                response = requests.get(url, headers=headers, timeout=10)
                response.raise_for_status()
                soup = BeautifulSoup(response.text, 'html.parser')
                youtube_links = []
                for iframe in soup.find_all("iframe"):
                    src = iframe.get("src")
                    if not src or not any(host in src for host in self._EMBED_HOSTS):
                        continue
                    match = self._EMBED_ID_RE.search(src)
                    if match:
                        youtube_links.append(
                            f"https://www.youtube.com/watch?v={match.group(1)}"
                        )
                # The same video embedded twice must not be summarized twice.
                youtube_links = list(dict.fromkeys(youtube_links))
                return youtube_links if youtube_links else [self._NO_LINK_MESSAGE]
            except Exception as e:
                last_error = e
                if attempt < self._FETCH_RETRIES - 1:
                    time.sleep(self._FETCH_BASE_DELAY * (2 ** attempt))
        return [f"[❌] 오류 발생: {last_error}"]

    @classmethod
    def _video_id_from_url(cls, video_url):
        """Extract the video ID from a YouTube URL (or return a bare ID as-is)."""
        url = video_url.strip()
        match = cls._VIDEO_ID_RE.search(url)
        if match:
            return match.group(1)
        # Fallback for unrecognized shapes: same heuristic as before, minus any
        # trailing query string or fragment.
        if "v=" in url:
            return url.split("v=")[-1].split("&")[0]
        return url.split("/")[-1].split("?")[0].split("#")[0]

    def get_youtube_transcript(self, video_url, languages=('ko', 'en'), preserve_formatting=False):
        """Fetch the transcript of a YouTube video as one space-joined string.

        Args:
            video_url: Watch/embed/short URL, or a bare video ID.
            languages: Language codes in order of preference.
            preserve_formatting: Forwarded to the transcript API when true.
                NOTE(review): assumes the installed youtube-transcript-api
                accepts this keyword — confirm for the pinned version.

        Returns:
            The transcript text, or ``None`` when it could not be obtained for
            any reason (disabled, missing, unavailable video, request failure).
        """
        try:
            video_id = self._video_id_from_url(video_url)
            # Only pass the flag when requested so the default call is unchanged.
            extra = {"preserve_formatting": True} if preserve_formatting else {}
            transcript = YouTubeTranscriptApi().get_transcript(
                video_id,
                languages=list(languages),
                **extra
            )
            return " ".join([snippet['text'] for snippet in transcript])
        except Exception:
            # Best effort: library errors (TranscriptsDisabled, NoTranscriptFound,
            # VideoUnavailable, YouTubeRequestFailed) and anything unexpected all
            # mean "no transcript" to the caller.
            return None

    def summarize_text_llm(self, text, prompt_template=None):
        """Summarize ``text`` with the chat model.

        Args:
            text: Transcript to summarize.
            prompt_template: ``str.format`` template with a ``{text}`` field;
                defaults to a Korean "summarize concisely" prompt.

        Returns:
            The model's reply text, or ``None`` if the model call raised.
        """
        if prompt_template is None:
            prompt_template = "다음 유튜브 영상 자막을 한국어로 간결하게 요약해줘:\n\n{text}"
        prompt = prompt_template.format(text=text)
        try:
            return self.model.invoke(prompt).content
        except Exception:
            return None

    def is_summary_exist(self, page_url):
        """Return True when the row for ``page_url`` already has a real summary.

        A missing row, NULL, NaN, the text ``'nan'`` (any case) and blank text
        all count as "no summary".
        """
        rows, _ = self._execute(
            "SELECT 요약 FROM Damoavideo WHERE 링크 = ?", (page_url,)
        )
        if not rows:
            return False
        summary = rows[0][0]
        if summary is None:
            return False
        # str() of a float NaN is 'nan', so one textual check covers both cases.
        text = str(summary).strip()
        return text != "" and text.lower() != 'nan'

    def save_summary_to_db(self, page_url, summary):
        """Store ``summary`` for ``page_url`` unless a summary already exists."""
        if self.is_summary_exist(page_url):
            return
        self._execute(
            "UPDATE Damoavideo SET 요약 = ? WHERE 링크 = ?",
            (summary, page_url),
            commit=True,
        )

    def remove_video_from_db(self, page_url):
        """Delete every row whose ``링크`` equals ``page_url``."""
        self._execute(
            "DELETE FROM Damoavideo WHERE 링크 = ?", (page_url,), commit=True
        )

    def process_video(self, rec):
        """Summarize the videos embedded in one table row's page.

        The row is deleted only when the page was fetched successfully and
        yields no usable transcript: either it holds no YouTube embed, or none
        of its videos has a transcript. A failed page download leaves the row
        untouched so a transient network error cannot destroy data.

        NOTE(review): get_youtube_transcript also returns None on transient
        YouTube failures, so such a failure on every link still deletes the row.

        Args:
            rec: Row dict as produced by ``get_all_video_rows``.

        Returns:
            ``rec`` extended with a ``"summaries"`` list (one entry per video
            that had a transcript), or ``None`` when nothing was summarized.
        """
        page_url = rec.get("링크")  # adjust if the link column is named differently
        if not page_url:
            return None

        links = self.extract_youtube_links_from_page(page_url) or []
        # Status strings are not video URLs and must never reach the transcript API.
        video_urls = [link for link in links if not link.startswith(self._ERROR_PREFIX)]
        if not video_urls:
            if self._NO_LINK_MESSAGE in links:
                self.remove_video_from_db(page_url)
            return None

        results = []
        for y_url in video_urls:
            transcript = self.get_youtube_transcript(y_url)
            if not transcript:
                continue
            summary = self.summarize_text_llm(transcript)
            if summary:
                self.save_summary_to_db(page_url, summary)
            results.append({
                "youtube_url": y_url,
                "transcript": transcript,
                "summary": summary
            })

        if not results:
            # No link produced a transcript: drop the row, as before.
            self.remove_video_from_db(page_url)
            return None
        return {**rec, "summaries": results}

    def batch_summarize_all_videos(self):
        """Run ``process_video`` over every table row using a thread pool.

        Returns:
            The non-``None`` results of ``process_video``, in table order.
        """
        all_rows = self.get_all_video_rows()
        with concurrent.futures.ThreadPoolExecutor() as executor:
            results = list(executor.map(self.process_video, all_rows))
        return [r for r in results if r is not None]




In [5]:
# Build the chat model and run the batch summarizer over the whole table.
from langchain.chat_models import init_chat_model
# NOTE(review): presumably the API key comes from the environment populated by
# load_dotenv() in an earlier cell — confirm.
model = init_chat_model(model='gpt-4o-mini')
summarizer = VideoDBBatchSummarizer('data/moav.db', model)
# WARNING: this call has side effects on data/moav.db — it UPDATEs the summary
# column and may DELETE rows from Damoavideo. It also makes one model call per
# transcript, so re-running the cell is neither free nor idempotent.
result = summarizer.batch_summarize_all_videos()