In [None]:
from langchain_community.document_loaders import YoutubeLoader

# Video whose transcript is loaded in this cell.
# Alternative sample: "https://www.youtube.com/watch?v=1xdV2j02uW4"
YT_URL = "https://www.youtube.com/watch?v=zCoAufNclKI"

# Request the English (generic or British) transcript together with the video info.
loader = YoutubeLoader.from_youtube_url(
    YT_URL,
    add_video_info=True,
    language=["en", "en-GB"],
)
transcript = loader.load()

In [None]:
# Number of items returned by loader.load() in the previous cell.
len(transcript)

In [None]:
import os

# Postgres settings for this notebook: written into os.environ and mirrored in
# module-level names. The values (including the password) are hardcoded and
# unconditionally replace whatever the environment already holds.
# NOTE(review): presumably the backend reads these POSTGRES_* variables — confirm.
os.environ.update(
    POSTGRES_DRIVER="psycopg",
    POSTGRES_HOST="localhost",
    POSTGRES_PORT="5432",
    POSTGRES_DATABASE="quizzes",
    POSTGRES_USER="postgres",
    POSTGRES_PASSWORD="pwd",
)

DRIVER = os.environ["POSTGRES_DRIVER"]
HOST = os.environ["POSTGRES_HOST"]
PORT = os.environ["POSTGRES_PORT"]
DB = os.environ["POSTGRES_DATABASE"]
USER = os.environ["POSTGRES_USER"]
PWD = os.environ["POSTGRES_PASSWORD"]

In [None]:
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
from langchain_openai import ChatOpenAI
from langchain.chains.qa_generation.base import QAGenerationChain

from json import JSONDecodeError

import dotenv
dotenv.load_dotenv(dotenv.find_dotenv(), override=True)

import itertools
import uuid
import sys
from pathlib import Path
    
# Make modules in the parent, grandparent and current working directory
# importable from this notebook.
# sys.path entries must be strings (the import system ignores other types), so
# the working directory is converted from Path to str. Entries already present
# are skipped so re-running the cell does not keep growing sys.path.
sys.path.extend(
    entry
    for entry in ("../", "../../", str(Path.cwd()))
    if entry not in sys.path
)

def chunk_transcript(
    doc: Document,
    n_chunks: int = 7,
    chunk_overlap: int = 50,
) -> list[Document]:
    """Split a transcript document into overlapping chunks.

    The chunk size is derived from the document length
    (``len(page_content) // n_chunks``) rather than being a fixed number of
    characters. Every returned chunk gets ``start_index`` (added by the
    splitter) and ``end_index`` (``start_index`` plus the chunk length) in its
    metadata.

    Args:
        doc: Document whose ``page_content`` holds the text to split.
        n_chunks: Divisor used to derive the chunk size; must be at least 1.
        chunk_overlap: Requested character overlap between neighbouring chunks.

    Returns:
        The chunks produced by ``RecursiveCharacterTextSplitter``.

    Raises:
        ValueError: If ``n_chunks`` is smaller than 1.
    """
    if n_chunks < 1:
        raise ValueError("n_chunks must be at least 1")

    # For short texts the derived chunk size drops below the overlap (or to 0),
    # which the splitter rejects with a ValueError. Clamp both so short
    # transcripts are still chunked; long transcripts are unaffected.
    chunk_size = max(len(doc.page_content) // n_chunks, 1)
    if chunk_overlap > chunk_size:
        chunk_overlap = chunk_size // 2

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        add_start_index=True,
    )

    chunks = text_splitter.split_documents([doc])

    # Record where each chunk ends, next to the start_index set by the splitter.
    for chunk in chunks:
        chunk.metadata["end_index"] = chunk.metadata["start_index"] + len(
            chunk.page_content
        )

    return chunks

def get_qa_from_chunk(
    chunk: Document,
    qa_generator_chain: QAGenerationChain,
) -> list[dict]:
    """Generate QA pairs for a single chunk and tag them with chunk metadata.

    Args:
        chunk: Chunk whose ``page_content`` is handed to the chain.
        qa_generator_chain: Chain whose ``run`` method returns the QA pairs.

    Returns:
        The QA pairs returned by the chain. Each pair receives a ``metadata``
        dict holding a copy of the chunk metadata plus a fresh ``id`` (uuid4
        string) and the chunk text under ``context``. An empty list is
        returned when the chain raises ``JSONDecodeError``, so callers never
        see non-dict entries.
    """
    try:
        qa_pairs = qa_generator_chain.run(chunk.page_content)
    except JSONDecodeError as err:
        # Best effort: a chunk whose output cannot be decoded is skipped, but
        # the failure is reported instead of being swallowed silently.
        import logging

        logging.getLogger(__name__).warning(
            "Skipping chunk: QA generation output was not valid JSON (%s)", err
        )
        return []

    for qa_pair in qa_pairs:
        # A dict display (unlike dict(**...)) also accepts non-string keys.
        qa_pair["metadata"] = {
            **chunk.metadata,
            "id": str(uuid.uuid4()),
            "context": chunk.page_content,
        }

    return qa_pairs

def generate_qa_from_transcript(transcript: Document) -> list[dict[str, str]]:
    """Generate QA pairs for a whole transcript.

    The transcript is split with ``chunk_transcript``; every chunk is passed to
    ``get_qa_from_chunk`` together with a ``QAGenerationChain`` built on
    ``gpt-4o-mini`` at temperature 0, and the per-chunk results are flattened
    into one list.

    NOTE(review): ``QA_GENERATION_PROMPT`` is neither defined nor imported in
    the visible part of this notebook; it has to exist in the kernel namespace
    before this function is called — confirm where it comes from.

    Args:
        transcript: Document holding the full transcript text.

    Returns:
        All QA pair dicts from all chunks, in chunk order.
    """
    chunks = chunk_transcript(transcript)

    llm = ChatOpenAI(temperature=0, model="gpt-4o-mini")
    qa_chain = QAGenerationChain.from_llm(llm, prompt=QA_GENERATION_PROMPT)

    per_chunk = (get_qa_from_chunk(chunk, qa_chain) for chunk in chunks)

    # Keep only dict entries so the result matches the declared return type
    # even when a chunk produced a non-dict failure marker.
    return [
        qa_pair
        for qa_pair in itertools.chain.from_iterable(per_chunk)
        if isinstance(qa_pair, dict)
    ]

#data = generate_qa_from_transcript(transcript[0])

In [None]:
from backend.quiz_generation.generator import agenerate_quiz

import os
import dotenv
dotenv.load_dotenv(dotenv.find_dotenv(), override=True)

col_metadata, qa_ids = await agenerate_quiz("my_xyz_quiz",
                                YT_URL, 
                                "userid_0",
                                {"OPENAI_API_KEY": os.environ.get("OPENAI_API_KEY")})

In [None]:
# Show the second value returned by agenerate_quiz in the previous cell.
qa_ids

In [None]:
# NOTE(review): no visible cell assigns a top-level `qa_pair` (the name only
# appears as a loop variable inside get_qa_from_chunk), so this presumably
# raises NameError on a fresh kernel — confirm what was meant to be inspected.
qa_pair

In [None]:
from langchain_postgres import PGVector
from langchain_postgres.vectorstores import PGVector

# See docker command above to launch a postgres instance with pgvector enabled.
# The URL is assembled from the settings defined in the POSTGRES_* cell near the
# top of the notebook so the two cannot drift apart; with those values it is
# "postgresql+psycopg://postgres:pwd@localhost:5432/quizzes".
connection_string = f"postgresql+{DRIVER}://{USER}:{PWD}@{HOST}:{PORT}/{DB}"  # Uses psycopg3!
collection_name = "my_docs"

import datetime as dt

from langchain_core.embeddings import FakeEmbeddings

# Fake embeddings of size 1: enough to exercise the store without a real model.
embeddings = FakeEmbeddings(size=1)

# Store bound to `collection_name`, with sample collection-level metadata.
vector_store = PGVector(
    embeddings=embeddings,
    collection_name=collection_name,
    connection=connection_string,
    use_jsonb=True,
    collection_metadata={
        "date_created": dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "num_tries": 1,
        "acc": 0.2,
    },
)

In [None]:
# Second store with the same settings, using the collection name with a "+"
# appended. This rebinds `vector_store`.
vector_store = PGVector(
    connection=connection_string,
    embeddings=embeddings,
    collection_name=f"{collection_name}+",
    collection_metadata={
        "date_created": dt.datetime.now(dt.UTC).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "num_tries": 1,
        "acc": 0.2,
    },
    use_jsonb=True,
)

In [None]:
# NOTE(review): `uui` looks like a truncated attribute name (possibly a leftover
# from tab completion); nothing in this notebook defines it — confirm the
# intended attribute or remove the cell.
vector_store.uui

In [None]:
# Two tiny sample documents; each carries its intended store id in metadata["id"].
docs = [
    Document(page_content="hello", metadata={"a": 1, "id": 1}),
    Document(page_content="hello2", metadata={"a": 2, "id": 2}),
]

# The vector store API takes string ids, so the integer metadata ids are
# converted explicitly instead of relying on an implicit cast in the database.
vector_store.add_documents(
    docs,
    ids=[str(doc.metadata["id"]) for doc in docs],
)

In [None]:
from sqlalchemy import create_engine, MetaData, Table, select

# Names of the tables reflected by the helper functions in this notebook.
# NOTE(review): presumably the tables managed by langchain's PGVector — confirm.
TABLE_COLLECTION = "langchain_pg_collection"
TABLE_DOCS = "langchain_pg_embedding"

def list_collections() -> list:
    """Return the ``name`` column of every row in the collection table.

    A short-lived engine is created from the notebook-level
    ``connection_string`` and disposed afterwards, so repeated calls do not
    leave connection pools behind.

    Returns:
        The result of ``fetchall()``: one single-column row per collection,
        each holding the collection name (rows, not bare strings).
    """
    engine = create_engine(connection_string)
    try:
        # Reflect the table definition from the database.
        table = Table(TABLE_COLLECTION, MetaData(), autoload_with=engine)

        query = select(table.c["name"])
        with engine.connect() as connection:
            return connection.execute(query).fetchall()
    finally:
        # Release the pool owned by this one-off engine.
        engine.dispose()

def get_by_ids(ids: list[str]) -> list:
    """Fetch the rows of the documents table whose ``id`` is in ``ids``.

    A short-lived engine is created from the notebook-level
    ``connection_string`` and disposed afterwards, so repeated calls do not
    leave connection pools behind.

    Args:
        ids: Values to match against the table's ``id`` column.

    Returns:
        The result of ``fetchall()``: full rows (all reflected columns) of the
        matching documents; empty when nothing matches.
    """
    engine = create_engine(connection_string)
    try:
        # Reflect the table definition from the database.
        table = Table(TABLE_DOCS, MetaData(), autoload_with=engine)

        query = select(table).where(table.c.id.in_(ids))
        with engine.connect() as connection:
            return connection.execute(query).fetchall()
    finally:
        # Release the pool owned by this one-off engine.
        engine.dispose()

# The annotation is a string on purpose: `Engine` is only imported in a later
# cell, and an unquoted annotation would raise NameError when this def runs on
# a fresh kernel.
def get_all_by_collection_id(engine: "Engine", collection_id: str) -> list:
    """Fetch every row of the documents table that belongs to one collection.

    Args:
        engine: SQLAlchemy engine used to reflect the table and run the query.
        collection_id: Value matched against the ``collection_id`` column.

    Returns:
        The result of ``fetchall()``: full rows of the matching documents.
    """
    # Reflect the table definition from the database.
    table = Table(TABLE_DOCS, MetaData(), autoload_with=engine)

    query = select(table).where(table.c.collection_id == collection_id)
    with engine.connect() as connection:
        return connection.execute(query).fetchall()

# Create the engine in this cell: it is needed by the call below, and without
# this line a top-to-bottom run reaches the call before any `engine` exists.
engine = create_engine(connection_string)

# Exercise the three helpers with sample ids and display the results as a tuple.
(
    list_collections(),
    get_by_ids(["aad30221-cdc1-4add-86db-f4067ee9d8c7"]),
    get_all_by_collection_id(engine, "994272b7-354d-4c98-9c92-8d4b23b60e62"),
)

In [None]:
from backend.commons.db import SessionLocal

def get_collection_id_by_name(collection_name: str) -> str | None:
    """Fetch the collection id for the given name.

    Args:
        collection_name: Value matched against the ``name`` column of the
            collection table.

    Returns:
        The ``uuid`` column of the first matching row, or ``None`` when no
        collection has that name.
        NOTE(review): depending on the column type the value may be a
        ``uuid.UUID`` rather than ``str`` — confirm.
    """
    with SessionLocal() as session:
        # Reflect the table through the session's bind.
        table = Table(TABLE_COLLECTION, MetaData(), autoload_with=session.bind)
        query = select(table.c.uuid).where(table.c.name == collection_name)
        result = session.execute(query).fetchone()

    return result[0] if result else None

# Example lookup with a sample collection name; the cell displays the returned id (or None).
get_collection_id_by_name("userid_userid_0_quizname_my_quiz")

In [None]:
from sqlalchemy.engine import Engine
from sqlalchemy import create_engine, MetaData, Table, update, select

def get_collection_metadata(engine: Engine, collection_id: str):
    """Read the ``cmetadata`` column of one collection.

    Args:
        engine: SQLAlchemy engine used to reflect the table and run the query.
        collection_id: Value matched against the ``uuid`` column.

    Returns:
        The first matching result row (a single-column row holding
        ``cmetadata``), or ``None`` when no collection has that uuid.
    """
    # Reflect the table definition from the database.
    table = Table(TABLE_COLLECTION, MetaData(), autoload_with=engine)

    # Build the select statement (this function only reads).
    query = select(table.c.cmetadata).where(table.c.uuid == collection_id)

    with engine.connect() as connection:
        return connection.execute(query).fetchone()
        
def update_collection_metadata(engine: Engine, collection_id: str, new_metadata: dict) -> None:
    """Replace the ``cmetadata`` of the collection with the given uuid.

    Args:
        engine: SQLAlchemy engine used to reflect the table and run the update.
        collection_id: Value matched against the ``uuid`` column.
        new_metadata: Value written to ``cmetadata``; it replaces the stored
            value rather than being merged into it.
    """
    collection_table = Table(TABLE_COLLECTION, MetaData(), autoload_with=engine)

    matches_collection = collection_table.c.uuid == collection_id
    statement = (
        update(collection_table)
        .where(matches_collection)
        .values(cmetadata=new_metadata)
    )

    # Run the update and commit it explicitly on the same connection.
    with engine.connect() as db_connection:
        db_connection.execute(statement)
        db_connection.commit()

# Overwrite the metadata of one hardcoded collection with a throwaway test value.
# NOTE(review): requires `engine` to have been defined by a cell that already
# ran; this cell does not create it.
update_collection_metadata(engine, "994272b7-354d-4c98-9c92-8d4b23b60e62", {'afffasdf': 123})

In [None]:
# Read the metadata row of the same hardcoded collection back for inspection.
# NOTE(review): requires `engine` to have been defined by a cell that already
# ran; this cell does not create it.
get_collection_metadata(engine, "994272b7-354d-4c98-9c92-8d4b23b60e62")

In [None]:
# Engine passed to the helper functions that take an `engine` argument.
# NOTE(review): cells above this one already pass `engine` to those helpers, so
# on a top-to-bottom run they need an engine before this line executes —
# confirm the intended cell order.
engine = create_engine(connection_string)

In [None]:
from pydantic import BaseModel, Field, HttpUrl, field_validator
import re


class QuizRequest(BaseModel):
    # Fields of a quiz request: quiz name, user id, API keys and the video URL.
    # (Kept as a comment: a class docstring would show up in the model's schema.)
    quiz_name: str = Field(min_length=3, description="Name of quiz")
    user_id: str = Field(min_length=1, description="User id")
    api_keys: dict[str, str] = Field(description="Dictionary of API keys.")
    youtube_url: HttpUrl

    @field_validator("youtube_url")
    @classmethod
    def validate_youtube_url(cls, value: HttpUrl) -> HttpUrl:
        """Accept the URL only if its string form matches the YouTube pattern.

        The pattern is applied with ``re.match``, so it is anchored at the
        start of the string only.

        Raises:
            ValueError: If the URL does not match.
        """
        scheme_and_host = (
            r"(https?://)?(www\.)?"
            r"(youtube|youtu|youtube-nocookie)\.(com|be)/"
        )
        path_and_video_id = r"(watch\?v=|embed/|v/|.+\?v=)?([^&=%\?]{11})"

        if re.match(scheme_and_host + path_and_video_id, str(value)) is None:
            raise ValueError("Invalid YouTube video URL")

        return value


# Sample request used to exercise the validator and the dump below.
q = QuizRequest(
    quiz_name="abc",
    user_id="1",
    api_keys={"a": "b"},
    youtube_url="http://www.youtube.com/watch?v=ruauz315oms",
)

In [None]:
# Dump the request to a dict, then replace the dumped youtube_url value with
# its plain string form.
quiz_data_dict = q.model_dump()
quiz_data_dict.update(youtube_url=str(quiz_data_dict["youtube_url"]))

In [None]:
# Display the dumped request with youtube_url converted to a string.
quiz_data_dict