In [44]:
from langchain_community.document_loaders import YoutubeLoader
from langchain_community.document_loaders.youtube import TranscriptFormat

In [45]:
loader = YoutubeLoader.from_youtube_url(
    "https://www.youtube.com/watch?v=QsYGlZkevEg", **{"add_video_info":True, "transcript_format":TranscriptFormat.LINES}
)

In [46]:
transcripts = loader.load()

In [47]:
transcripts

[Document(page_content='LADIES AND GENTLEMEN, PEDRO', metadata={'start': 0.467, 'duration': 21.487}),
 Document(page_content='PASCAL!', metadata={'start': 3.57, 'duration': 20.453}),
 Document(page_content='[ CHEERS AND APPLAUSE ]', metadata={'start': 16.783, 'duration': 7.807}),
 Document(page_content='>> THANK YOU, THANK YOU.', metadata={'start': 22.021, 'duration': 4.671}),
 Document(page_content='THANK YOU VERY MUCH.', metadata={'start': 24.09, 'duration': 6.473}),
 Document(page_content="I'M SO EXCITED TO BE HERE.", metadata={'start': 24.657, 'duration': 9.109}),
 Document(page_content='THANK YOU.', metadata={'start': 26.759, 'duration': 10.944}),
 Document(page_content='I SPENT THE LAST YEAR SHOOTING A', metadata={'start': 30.63, 'duration': 7.34}),
 Document(page_content='SHOW CALLED "THE LAST OF US" ON', metadata={'start': 33.833, 'duration': 6.64}),
 Document(page_content='HBO.', metadata={'start': 37.77, 'duration': 5.005}),
 Document(page_content='FOR SOME HBO SHOES, YOU GET

In [8]:
from youtube_transcript_api import YouTubeTranscriptApi

In [20]:
transcript = YouTubeTranscriptApi.get_transcript(video_id=str('QsYGlZkevEg'),languages=['en'])

In [28]:
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id=str('QsYGlZkevEg'))

In [29]:
transcript=transcript_list.find_transcript(["en"])

In [51]:
# transcript_peices
transcript_peices = transcript.fetch()
transcript_peices[0]['start']

0.467

In [1]:
from langchain_community.document_loaders.base import BaseLoader
from langchain_core.documents import Document
from typing import Any, Dict, List, Optional, Sequence, Union
from enum import Enum
from urllib.parse import parse_qs, urlparse


In [2]:

class TranscriptFormat(Enum):
    """Transcript format."""

    TEXT = "text"
    LINES = "lines"


In [3]:
ALLOWED_SCHEMAS = {"http", "https"}
ALLOWED_NETLOCK = {
    "youtu.be",
    "m.youtube.com",
    "youtube.com",
    "www.youtube.com",
    "www.youtube-nocookie.com",
    "vid.plus",
}

In [4]:


def _parse_video_id(url: str) -> Optional[str]:
    """Parse a youtube url and return the video id if valid, otherwise None."""
    parsed_url = urlparse(url)

    if parsed_url.scheme not in ALLOWED_SCHEMAS:
        return None

    if parsed_url.netloc not in ALLOWED_NETLOCK:
        return None

    path = parsed_url.path

    if path.endswith("/watch"):
        query = parsed_url.query
        parsed_query = parse_qs(query)
        if "v" in parsed_query:
            ids = parsed_query["v"]
            video_id = ids if isinstance(ids, str) else ids[0]
        else:
            return None
    else:
        path = parsed_url.path.lstrip("/")
        video_id = path.split("/")[-1]

    if len(video_id) != 11:  # Video IDs are 11 characters long
        return None

    return video_id


In [16]:
class YoutubeLoader(BaseLoader):
    """Load `YouTube` transcripts."""

    def __init__(
        self,
        video_id: str,
        add_video_info: bool = False,
        language: Union[str, Sequence[str]] = "en",
        translation: Optional[str] = None,
        transcript_format: TranscriptFormat = TranscriptFormat.TEXT,
        continue_on_failure: bool = False,
        start_time : float = None,
        end_time : float = None,
        
    ):
        """Initialize with YouTube video ID."""
        self.video_id = video_id
        self.add_video_info = add_video_info
        self.language = language
        if isinstance(language, str):
            self.language = [language]
        else:
            self.language = language
        self.translation = translation
        self.transcript_format = transcript_format
        self.continue_on_failure = continue_on_failure
        self.start_time = start_time
        self.end_time = end_time

    @staticmethod
    def extract_video_id(youtube_url: str) -> str:
        """Extract video id from common YT urls."""
        video_id = _parse_video_id(youtube_url)
        if not video_id:
            raise ValueError(
                f"Could not determine the video ID for the URL {youtube_url}"
            )
        return video_id

    # @classmethod
    # def from_youtube_url(cls, youtube_url: str, **kwargs: Any) -> YoutubeLoader:
    #     """Given youtube URL, load video."""
    #     video_id = cls.extract_video_id(youtube_url)
    #     return cls(video_id, **kwargs)


    def filter_dicts_by_time_stamp(self, list_of_dicts, start=None, end=None):
    # Define the filtering function based on the provided min and/or max values
        def is_within_range(d):
            if start is not None and d['start'] < start:
                return False
            if end is not None and d['start'] > end:
                return False
            return True

        # Apply the filtering function to the list
        return [d for d in list_of_dicts if is_within_range(d)]
    
    def load(self) -> List[Document]:
        """Load documents."""
        try:
            from youtube_transcript_api import (
                NoTranscriptFound,
                TranscriptsDisabled,
                YouTubeTranscriptApi,
            )
        except ImportError:
            raise ImportError(
                "Could not import youtube_transcript_api python package. "
                "Please install it with `pip install youtube-transcript-api`."
            )

        metadata = {"source": self.video_id}

        if self.add_video_info:
            # Get more video meta info
            # Such as title, description, thumbnail url, publish_date
            video_info = self._get_video_info()
            metadata.update(video_info)

        try:
            transcript_list = YouTubeTranscriptApi.list_transcripts(self.video_id)
        except TranscriptsDisabled:
            return []

        try:
            transcript = transcript_list.find_transcript(self.language)
        except NoTranscriptFound:
            transcript = transcript_list.find_transcript(["en"])

        if self.translation is not None:
            transcript = transcript.translate(self.translation)

        transcript_pieces = transcript.fetch()
        filetred_transcrpit_peices =[]
        
        # Time Stamp Retrieval

        filetred_transcrpit_peices = self.filter_dicts_by_time_stamp(list_of_dicts=transcript_pieces, start=self.start_time, end=self.end_time)

        if self.transcript_format == TranscriptFormat.TEXT:
            transcript = " ".join([t["text"].strip(" ") for t in filetred_transcrpit_peices])
            return [Document(page_content=transcript, metadata=metadata)]
        elif self.transcript_format == TranscriptFormat.LINES:
            return [
                Document(
                    page_content=t["text"].strip(" "),
                    metadata=dict((key, t[key]) for key in t if key != "text"),
                )
                for t in filetred_transcrpit_peices
            ]
        else:
            raise ValueError("Unknown transcript format.")

    def _get_video_info(self) -> dict:
        """Get important video information.

        Components are:
            - title
            - description
            - thumbnail url,
            - publish_date
            - channel_author
            - and more.
        """
        try:
            from pytube import YouTube

        except ImportError:
            raise ImportError(
                "Could not import pytube python package. "
                "Please install it with `pip install pytube`."
            )
        yt = YouTube(f"https://www.youtube.com/watch?v={self.video_id}")
        video_info = {
            "title": yt.title or "Unknown",
            "description": yt.description or "Unknown",
            "view_count": yt.views or 0,
            "thumbnail_url": yt.thumbnail_url or "Unknown",
            "publish_date": yt.publish_date.strftime("%Y-%m-%d %H:%M:%S")
            if yt.publish_date
            else "Unknown",
            "length": yt.length or 0,
            "author": yt.author or "Unknown",
        }
        return video_info



In [26]:
fileter_loader = YoutubeLoader(
    "QsYGlZkevEg", **{"add_video_info":True, "start_time":3, "end_time": 50,"transcript_format":TranscriptFormat.LINES}
)

In [27]:
transcripts = fileter_loader.load()

In [28]:
transcripts

[Document(page_content='PASCAL!', metadata={'start': 3.57, 'duration': 20.453}),
 Document(page_content='[ CHEERS AND APPLAUSE ]', metadata={'start': 16.783, 'duration': 7.807}),
 Document(page_content='>> THANK YOU, THANK YOU.', metadata={'start': 22.021, 'duration': 4.671}),
 Document(page_content='THANK YOU VERY MUCH.', metadata={'start': 24.09, 'duration': 6.473}),
 Document(page_content="I'M SO EXCITED TO BE HERE.", metadata={'start': 24.657, 'duration': 9.109}),
 Document(page_content='THANK YOU.', metadata={'start': 26.759, 'duration': 10.944}),
 Document(page_content='I SPENT THE LAST YEAR SHOOTING A', metadata={'start': 30.63, 'duration': 7.34}),
 Document(page_content='SHOW CALLED "THE LAST OF US" ON', metadata={'start': 33.833, 'duration': 6.64}),
 Document(page_content='HBO.', metadata={'start': 37.77, 'duration': 5.005}),
 Document(page_content='FOR SOME HBO SHOES, YOU GET TO', metadata={'start': 38.037, 'duration': 5.939}),
 Document(page_content='SHOOT IN A FIVE STAR ITA

In [60]:
def filter_dicts_by_a(list_of_dicts, min_value=None, max_value=None):
    # Define the filtering function based on the provided min and/or max values
    def is_within_range(d):
        if min_value is not None and d['a'] < min_value:
            return False
        if max_value is not None and d['a'] > max_value:
            return False
        return True

    # Apply the filtering function to the list
    return [d for d in list_of_dicts if is_within_range(d)]

# Sample list of dictionaries
list_of_dicts = [
    {'a': 1, 'b': 2},
    {'a': 5, 'b': 3},
    {'a': 8, 'b': 4},
    {'a': 10, 'b': 5}
]

# Examples of calling the function with different conditions

# Case 1: Only min_value is provided
filtered_list_min = filter_dicts_by_a(list_of_dicts, min_value=6)
print(f"Filtered list with min_value=6: {filtered_list_min}")

# Case 2: Only max_value is provided
filtered_list_max = filter_dicts_by_a(list_of_dicts, max_value=7)
print(f"Filtered list with max_value=7: {filtered_list_max}")

# Case 3: Both min_value and max_value are provided
filtered_list_range = filter_dicts_by_a(list_of_dicts, min_value=6)
print(f"Filtered list with min_value=6 and max_value=9: {filtered_list_range}")

# Case 4: Neither min_value nor max_value is provided
filtered_list_all = filter_dicts_by_a(list_of_dicts)
print(f"Filtered list with no min_value and no max_value: {filtered_list_all}")


Filtered list with min_value=6: [{'a': 8, 'b': 4}, {'a': 10, 'b': 5}]
Filtered list with max_value=7: [{'a': 1, 'b': 2}, {'a': 5, 'b': 3}]
Filtered list with min_value=6 and max_value=9: [{'a': 8, 'b': 4}, {'a': 10, 'b': 5}]
Filtered list with no min_value and no max_value: [{'a': 1, 'b': 2}, {'a': 5, 'b': 3}, {'a': 8, 'b': 4}, {'a': 10, 'b': 5}]
