In [1]:
#!pip install os re json time wave yt-dlp sqlite3 pinecone-client requests tiktoken subprocess tqdm datasets vosk langchain langchain-pinecone langchain-openai tabulate ipython

In [2]:
import os
import re
import json
import time
import wave
import yt_dlp
import sqlite3
import pinecone
import requests
import tiktoken
import subprocess

from uuid import uuid4
from typing import List
from tqdm.auto import tqdm
from datasets import load_dataset

from vosk import Model, KaldiRecognizer

from pinecone import Pinecone, ServerlessSpec

from langchain.chains import RetrievalQA
from langchain_pinecone import PineconeVectorStore
from langchain.agents import Tool, create_react_agent
from langchain_openai import OpenAIEmbeddings, ChatOpenAI, OpenAI
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.conversation.memory import ConversationBufferWindowMemory


from langchain.output_parsers import ListOutputParser
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

from vosk import Model, KaldiRecognizer
from tabulate import tabulate
from IPython.display import Image, display

from dotenv import load_dotenv

In [3]:
# Load environment variables
load_dotenv()

# Set API Keys (ensure these are set in your .env file)
pinecone_api_key = os.getenv('PINECONE_API_KEY')
openai_api_key = os.getenv('OPENAI_API_KEY')
langchain_api_key = os.getenv('LANGCHAIN_API_KEY')

# Ensure ffmpeg is in PATH
os.environ['PATH'] += os.pathsep + '/usr/local/bin'

In [4]:
def is_valid_youtube_url(url):
    """Validate if the given URL is a YouTube URL."""
    pattern = r'^(https?://)?(www\.)?(youtube\.com|youtu\.?be)/.+$'
    return re.match(pattern, url) is not None

def download_video(url):
    """Download the audio of the YouTube video as a .wav file."""
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'wav',
            'preferredquality': '192',
        }],
        'outtmpl': 'audio.%(ext)s',
        'ffmpeg_location': '/usr/local/bin'
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        return 'audio.wav'
    except Exception as e:
        print(f"An error occurred during download: {e}")
        return None

def convert_audio(input_file, output_file):
    """Convert audio to the required format for transcription."""
    command = [
        '/usr/local/bin/ffmpeg',
        '-i', input_file,
        '-acodec', 'pcm_s16le',
        '-ac', '1',
        '-ar', '16000',
        '-y', output_file
    ]
    try:
        subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(f"Audio converted successfully: {output_file}")
        return True
    except subprocess.CalledProcessError as e:
        print(f"Error converting audio: {e}")
        return False

def transcribe_audio(audio_file):
    """Transcribe audio using the Vosk model."""
    model_path = "model"
    if not os.path.exists(model_path):
        print("Speech recognition model not found. Please make sure you've downloaded it.")
        return None

    model = Model(model_path)
    recognizer = KaldiRecognizer(model, 16000)

    def is_valid_wave_file(wave_file):
        """Check if the wave file has the correct properties."""
        return (wave_file.getnchannels() == 1 and
                wave_file.getsampwidth() == 2 and
                wave_file.getcomptype() == "NONE")
    
    try:
        # Open the original file to check format
        with wave.open(audio_file, "rb") as wf:
            if not is_valid_wave_file(wf):
                print("Converting audio to the correct format...")
                converted_file = "converted_audio.wav"
                if not convert_audio(audio_file, converted_file):
                    return None
                audio_file = converted_file

        # Reopen the audio file (either original or converted)
        with wave.open(audio_file, "rb") as wf:
            results = []
            total_frames = wf.getnframes()
            with tqdm(total=total_frames, desc="Transcribing") as pbar:
                while True:
                    data = wf.readframes(4000)
                    if not data:
                        break
                    if recognizer.AcceptWaveform(data):
                        part_result = json.loads(recognizer.Result())
                        results.append(part_result.get('text', ''))
                    pbar.update(4000)

                # Final result
                part_result = json.loads(recognizer.FinalResult())
                results.append(part_result.get('text', ''))

            transcription = " ".join(results)
            if not transcription.strip():
                print("No transcription result obtained. Please check the audio file and model.")
            return transcription
    except Exception as e:
        print(f"An error occurred during transcription: {e}")
        return None

def get_youtube_video_info(url):
    """Extract YouTube video info and M3U8 URLs without downloading the video."""
    ydl_opts = {
        'quiet': True,
        'skip_download': True,
        'extract_flat': False,
        'force_generic_extractor': True,
        'no_warnings': True
    }
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=False)
            info.pop('description', None)

            # Extract M3U8 URLs from formats
            formats = info.get('formats', [])
            m3u8_urls = [fmt['url'] for fmt in formats if 'm3u8' in fmt.get('url', '')]
            info['m3u8_urls'] = m3u8_urls

        return info
    except Exception as e:
        print(f"An error occurred while extracting YouTube video info: {e}")
        return None

def display_youtube_info(info):
    """Display YouTube video metadata in a pretty table format using tabulate."""
    if not info:
        print("No YouTube video info to display.")
        return

    youtube_info = [
        ["Title", info.get("title", "N/A")],
        ["Uploader", info.get("uploader", "N/A")],
        ["Uploader ID", info.get("uploader_id", "N/A")],
        ["Upload Date", info.get("upload_date", "N/A")],
        ["Duration", info.get("duration", "N/A")],
        ["View Count", info.get("view_count", "N/A")],
        ["Like Count", info.get("like_count", "N/A")]
    ]

    thumbnail_url = info.get("thumbnail")
    if thumbnail_url:
        display(Image(url=thumbnail_url, width=580))
    
    print("\nYouTube Video Info:")
    print(tabulate(youtube_info, headers=["Field", "Value"], tablefmt="grid", stralign="left"))

def main():
    """Main function to handle the workflow."""
    while True:
        video_url = input("Enter the YouTube video URL: ")
        if is_valid_youtube_url(video_url):
            break
        else:
            print("Invalid YouTube URL. Please enter a valid URL.")

    print("Extracting YouTube video info...")
    youtube_info = get_youtube_video_info(video_url)
    if youtube_info:
        display_youtube_info(youtube_info)

    print("Downloading video...")
    audio_file = download_video(video_url)

    if audio_file and os.path.exists(audio_file):
        print("Transcribing audio...")
        transcription = transcribe_audio(audio_file)

        if transcription:
            print("\nTranscription:")
            #print(transcription)

            # Save transcription to a file
            with open('transcription.txt', 'w') as f:
                f.write(transcription)

            print("Transcription saved to 'transcription.txt'.")
        else:
            print("Transcription failed. Please check the error messages above.")
    else:
        print("Failed to download the video. Please check the URL and try again.")

if __name__ == "__main__":
    main()

Enter the YouTube video URL:  What is a qubit?


Invalid YouTube URL. Please enter a valid URL.


Enter the YouTube video URL:  https://www.youtube.com/watch?v=qQviI1d_hFA


Extracting YouTube video info...



YouTube Video Info:
+-------------+-------------------------------------------------------+
| Field       | Value                                                 |
| Title       | Michio Kaku: Quantum computing is the next revolution |
+-------------+-------------------------------------------------------+
| Uploader    | Big Think                                             |
+-------------+-------------------------------------------------------+
| Uploader ID | @bigthink                                             |
+-------------+-------------------------------------------------------+
| Upload Date | 20230818                                              |
+-------------+-------------------------------------------------------+
| Duration    | 677                                                   |
+-------------+-------------------------------------------------------+
| View Count  | 2140713                                               |
+-------------+----------------------------



[youtube] qQviI1d_hFA: Downloading ios player API JSON
[youtube] qQviI1d_hFA: Downloading player 5604538d
[youtube] qQviI1d_hFA: Downloading web player API JSON
[youtube] qQviI1d_hFA: Downloading m3u8 information
[info] qQviI1d_hFA: Downloading 1 format(s): 251
[download] Destination: audio.webm
[download] 100% of   10.70MiB in 00:00:00 at 20.26MiB/s    
[ExtractAudio] Destination: audio.wav
Deleting original file audio.webm (pass -k to keep)
Transcribing audio...


LOG (VoskAPI:ReadDataFiles():model.cc:213) Decoding params beam=10 max-active=3000 lattice-beam=2
LOG (VoskAPI:ReadDataFiles():model.cc:216) Silence phones 1:2:3:4:5:6:7:8:9:10
LOG (VoskAPI:RemoveOrphanNodes():nnet-nnet.cc:948) Removed 0 orphan nodes.
LOG (VoskAPI:RemoveOrphanComponents():nnet-nnet.cc:847) Removing 0 orphan components.
LOG (VoskAPI:ReadDataFiles():model.cc:248) Loading i-vector extractor from model/ivector/final.ie
LOG (VoskAPI:ComputeDerivedVars():ivector-extractor.cc:183) Computing derived variables for iVector extractor
LOG (VoskAPI:ComputeDerivedVars():ivector-extractor.cc:204) Done.
LOG (VoskAPI:ReadDataFiles():model.cc:282) Loading HCL and G from model/graph/HCLr.fst model/graph/Gr.fst
LOG (VoskAPI:ReadDataFiles():model.cc:303) Loading winfo model/graph/phones/word_boundary.int


Converting audio to the correct format...
Audio converted successfully: converted_audio.wav


Transcribing:   0%|          | 0/10836846 [00:00<?, ?it/s]


Transcription:
Transcription saved to 'transcription.txt'.


In [5]:
# Define the path to the transcription file
transcription_file = 'transcription.txt'

# Read the transcription text
with open(transcription_file, 'r') as file:
    transcription_text = file.read()

# Connect to SQLite database (or create it if it doesn't exist)
conn = sqlite3.connect('transcriptions.db')
cursor = conn.cursor()

# Create the transcriptions table if it does not exist
cursor.execute('''
    CREATE TABLE IF NOT EXISTS transcriptions (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        speaker TEXT,
        text TEXT,
        timestamp TEXT
    )
''')

# Insert the transcription text into the table
cursor.execute('''
    INSERT INTO transcriptions (speaker, text, timestamp)
    VALUES (?, ?, ?)
''', ('Transcript', transcription_text, '2024-07-15 10:00:00'))

# Commit the transaction and close the connection
conn.commit()
conn.close()

# Data has been successfully loaded into the database
print("Data has been successfully loaded into the database.")


Data has been successfully loaded into the database.


In [6]:
tiktoken.encoding_for_model('gpt-4o-mini')

<Encoding 'o200k_base'>

In [7]:
# Function to calculate the length of text in terms of tokens
def tiktoken_len(text):
    return len(text.split())

# Connect to the SQLite database
conn = sqlite3.connect('transcriptions.db')
cursor = conn.cursor()

# Query the data
cursor.execute('SELECT text FROM transcriptions WHERE id = 1')
transcription_text = cursor.fetchone()[0]

# Close the connection
conn.close()

# Initialize the text splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=20,
    length_function=tiktoken_len
)

# Split the transcription text into chunks
chunks = text_splitter.split_text(transcription_text)[:3]

# Print the first 3 chunks
for i, chunk in enumerate(chunks, 1):
    print(f"Chunk {i}:")
    print(chunk)
    print()

Chunk 1:
we all know the digital computers changed virtually every aspect of our life well the arrival of quantum computers could be even more historic than that we're now and the initial stages of the next revolution we're talking about a new generation of computers the ultimate computer a computer that computer on adam the ultimate constituents of batter itself the question is who's involved in this race to perfect quantum computers and the answer is everyone all the big players are part of this race because if they're not silicon valley could become the next rust belt also anyone who's interested in security is interested in the quantum computers they can crack almost any code that is based on digital technology that's why the f b i the cia and all national governments are following this is very closely what their computers will change everything the economy how he saw problems the way we interact with the universe you name it won't to computers will be there i'm doctor or michio ka

In [8]:
tiktoken_len(chunks[0]), tiktoken_len(chunks[1]), tiktoken_len(chunks[2])

(500, 500, 430)

# Text embedding

In [9]:
from langchain_openai import OpenAIEmbeddings

In [10]:
model_name = 'text-embedding-3-small'

embed = OpenAIEmbeddings(
    model=model_name,
    openai_api_key=openai_api_key
)

# Indexing

In [11]:
from pinecone import Pinecone, ServerlessSpec

In [12]:
# configure client
pc = Pinecone(api_key=pinecone_api_key)
spec = ServerlessSpec(
    cloud="aws", region="us-east-1"
)
index_name = 'langchain-retrieval-augmentation'
existing_indexes = [
    index_info["name"] for index_info in pc.list_indexes()
]

# check if index already exists (it shouldn't if this is first time)
if index_name not in existing_indexes:
    # if does not exist, create index
    pc.create_index(
        index_name,
        dimension=1536,  # dimensionality of ada 002
        metric='dotproduct',
        spec=spec
    )
    # wait for index to be initialized
    while not pc.describe_index(index_name).status['ready']:
        time.sleep(1)

# connect to index
index = pc.Index(index_name)
time.sleep(1)
# view index stats
index.describe_index_stats()

{'dimension': 1536,
 'index_fullness': 0.0,
 'namespaces': {'': {'vector_count': 17838}},
 'total_vector_count': 17838}

# Data processing

In [13]:
batch_limit=50

# Initialize text splitter
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=500,
    chunk_overlap=20,
    length_function=len  # You can replace with your custom token length function if needed
)

# Connect to SQLite database
conn = sqlite3.connect('transcriptions.db')
cursor = conn.cursor()

# Fetch data from SQLite database
cursor.execute('SELECT id, speaker, text FROM transcriptions')
data = cursor.fetchall()

# Close the database connection
conn.close()

# Initialize lists for texts and metadatas
texts = []
metadatas = []

# Process each record fetched from the database
for i, record in enumerate(tqdm(data)):
    # Metadata fields for this record
    metadata = {
        #'index': str(record[0]),  # Assuming id is the first column
        'speaker': record[1],  # Replace with actual source if available
        'text': record[2],  # Example title based on id
    }

    # Split text into chunks
    record_texts = text_splitter.split_text(record[2])  # Assuming text is the third column

    # Create individual metadata dicts for each chunk
    record_metadatas = [{
        "chunk": j,
        "text": text,
        **metadata
    } for j, text in enumerate(record_texts)]

    # Append texts and metadatas to current batches
    texts.extend(record_texts)
    metadatas.extend(record_metadatas)

    # Check if batch limit is reached, then embed and upsert
    if len(texts) >= batch_limit:
        ids = [str(uuid4()) for _ in range(len(texts))]
        embeds = embed.embed_documents(texts)

        # Assuming `index` is where you want to upsert (not defined in the snippet)
        index.upsert(vectors=zip(ids, embeds, metadatas))

        # Clear lists after upserting
        texts = []
        metadatas = []

# Process any remaining texts in the lists
if len(texts) > 0:
    ids = [str(uuid4()) for _ in range(len(texts))]
    embeds = embed.embed_documents(texts)
    index.upsert(vectors=zip(ids, embeds, metadatas))

print("Data processing completed.")

  0%|          | 0/43 [00:00<?, ?it/s]

Data processing completed.


In [14]:
index.describe_index_stats()

{'dimension': 1536,
 'index_fullness': 0.0,
 'namespaces': {'': {'vector_count': 17838}},
 'total_vector_count': 17838}

# Initialize Pinecone Vector Store

In [15]:
from langchain_pinecone import PineconeVectorStore

In [16]:
# Load environment variables
load_dotenv()

# Set API Keys
pinecone_api_key = os.getenv('PINECONE_API_KEY')
openai_api_key = os.getenv('OPENAI_API_KEY')

# Initialize Pinecone client
pc = Pinecone(api_key=pinecone_api_key)

# Define the Pinecone index and embedding model
embed_model = OpenAIEmbeddings(model='text-embedding-3-small', openai_api_key=openai_api_key)

# Connect to the Pinecone index
index = pc.Index(index_name)

# Initialize the Pinecone vector store object (assuming no text_field parameter is required)
vectorstore = PineconeVectorStore(index, embed_model)

# Get user input for the query
query = input("Please enter your query: ")

# Perform similarity search
try:
    results = vectorstore.similarity_search(query, k=3)
except Exception as e:
    print(f"An error occurred during the similarity search: {e}")
    results = []

# Connect to SQLite database (or create it if it doesn't exist)
conn = sqlite3.connect('search_results.db')
cursor = conn.cursor()

# Drop the table if it exists
cursor.execute('DROP TABLE IF EXISTS search_results')

# Create the search_results table with the correct schema
cursor.execute('''
    CREATE TABLE IF NOT EXISTS search_results (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        query TEXT,
        document_id TEXT,
        score REAL
    )
''')

# Insert the search results into the table
for result in results:
    try:
        # Adjust according to the structure of `result`
        document_id = getattr(result, 'id', 'unknown')  # Default to 'unknown' if 'id' is missing
        score = getattr(result, 'score', 0.0)          # Default to 0.0 if 'score' is missing

        cursor.execute('''
            INSERT INTO search_results (query, document_id, score)
            VALUES (?, ?, ?)
        ''', (query, document_id, score))
    except Exception as e:
        print(f"An error occurred while inserting results into the database: {e}")

# Commit the transaction and close the connection
conn.commit()
conn.close()

# Retrieve and set memory

# completion llm
llm = ChatOpenAI(
    openai_api_key=openai_api_key,
    model_name='gpt-4o-mini',
    temperature=0.0
)

# conversational memory
conversational_memory = ConversationBufferWindowMemory(
    memory_key='chat_history',
    k=5,
    return_messages=True
    )

# retrieval qa chain
qa = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=vectorstore.as_retriever()
)
qa.invoke(query)

Please enter your query:  What is a qubit?


{'query': 'What is a qubit?',
 'result': 'A qubit, or quantum bit, is the basic unit of quantum information. Unlike a classical bit, which can be either 0 or 1, a qubit can exist in a superposition of both states simultaneously. This means that a qubit can represent multiple possibilities at once, allowing quantum computers to perform complex calculations more efficiently than classical computers. The state of a qubit can be described using quantum mechanics, and it can also be entangled with other qubits, leading to even more powerful computational capabilities.'}

In [34]:
import pandas as pd
from IPython.display import display
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
from sacrebleu import corpus_bleu

# Define sample queries and reference answers
queries = [
    "What is a qubit?",
    "Explain quantum entanglement.",
    "How does a quantum computer work?"
]

reference_answers = {
    "What is a qubit?": "A qubit is the basic unit of quantum information, analogous to a bit in classical computing, but capable of being in a superposition of states.",
    "Explain quantum entanglement.": "Quantum entanglement is a physical phenomenon where particles become correlated in such a way that the state of one particle instantly influences the state of the other, no matter the distance between them.",
    "How does a quantum computer work?": "A quantum computer uses qubits and quantum gates to perform computations that can solve certain problems more efficiently than classical computers by leveraging principles of quantum mechanics."
}

model_responses = {
    "What is a qubit?": "A qubit is a unit of quantum information, similar to a bit but with more capabilities.",
    "Explain quantum entanglement.": "Quantum entanglement is a phenomenon where particles are interconnected and the state of one affects the state of another.",
    "How does a quantum computer work?": "Quantum computers use qubits to perform complex calculations that classical computers struggle with."
}

# Scoring functions
def calculate_accuracy(reference_answers, model_responses):
    """
    Calculate accuracy based on exact matches between model responses and reference answers.
    """
    correct = 0
    total = len(reference_answers)
    for query, ref_answer in reference_answers.items():
        model_answer = model_responses.get(query, "")
        if model_answer.strip() == ref_answer.strip():
            correct += 1
    return (correct / total) * 100

def calculate_precision_recall_accuracy(reference_answers, model_responses):
    """
    Calculate precision, recall, and accuracy based on token-level comparison.
    """
    y_true = []
    y_pred = []

    for query, ref_answer in reference_answers.items():
        model_answer = model_responses.get(query, "")
        ref_tokens = set(ref_answer.lower().split())
        model_tokens = set(model_answer.lower().split())

        y_true.extend([1] * len(ref_tokens))
        y_pred.extend([1 if token in model_tokens else 0 for token in ref_tokens])

    precision = precision_score(y_true, y_pred, average='binary', zero_division=0)
    recall = recall_score(y_true, y_pred, average='binary', zero_division=0)
    accuracy = accuracy_score(y_true, y_pred)

    return (precision * 100, recall * 100, accuracy * 100)

def compute_metrics(predictions, references):
    """
    Compute precision, recall, F1 score, and token-based accuracy based on predictions and references.
    """
    all_pred_tokens = [token for pred in predictions for token in pred.split()]
    all_ref_tokens = [token for ref in references for token in ref.split()]

    pred_tokens = set(all_pred_tokens)
    ref_tokens = set(all_ref_tokens)

    precision = len(pred_tokens.intersection(ref_tokens)) / len(pred_tokens) if pred_tokens else 0
    recall = len(pred_tokens.intersection(ref_tokens)) / len(ref_tokens) if ref_tokens else 0

    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

    token_based_accuracy = len(pred_tokens.intersection(ref_tokens)) / len(ref_tokens) if ref_tokens else 0

    return {
        'precision': precision * 100,
        'recall': recall * 100,
        'f1_score': f1 * 100,
        'token_based_accuracy': token_based_accuracy * 100
    }

def compute_context_precision_recall(reference_answers, model_responses):
    """
    Compute context precision and recall based on the presence of relevant context words.
    """
    context_precision_scores = []
    context_recall_scores = []

    for query, ref_answer in reference_answers.items():
        model_answer = model_responses.get(query, "")
        ref_tokens = set(ref_answer.lower().split())
        model_tokens = set(model_answer.lower().split())

        if model_tokens:
            context_precision = len(ref_tokens.intersection(model_tokens)) / len(model_tokens)
        else:
            context_precision = 0
        
        if ref_tokens:
            context_recall = len(ref_tokens.intersection(model_tokens)) / len(ref_tokens)
        else:
            context_recall = 0
        
        context_precision_scores.append(context_precision)
        context_recall_scores.append(context_recall)

    avg_context_precision = np.mean(context_precision_scores) * 100
    avg_context_recall = np.mean(context_recall_scores) * 100

    return avg_context_precision, avg_context_recall

def compute_hallucination(reference_answers, model_responses):
    """
    Compute hallucination score based on the presence of irrelevant information.
    """
    hallucination_scores = []

    for query, ref_answer in reference_answers.items():
        model_answer = model_responses.get(query, "")
        ref_tokens = set(ref_answer.lower().split())
        model_tokens = set(model_answer.lower().split())

        hallucinated_tokens = model_tokens - ref_tokens
        hallucination_score = len(hallucinated_tokens) / len(model_tokens) if model_tokens else 0
        hallucination_scores.append(hallucination_score)

    avg_hallucination_score = np.mean(hallucination_scores) * 100
    return avg_hallucination_score

def compute_answer_correctness(reference_answers, model_responses):
    """
    Compute answer correctness based on exact matches between model responses and reference answers.
    """
    correct_count = 0
    total_count = len(reference_answers)

    for query, ref_answer in reference_answers.items():
        model_answer = model_responses.get(query, "")
        if model_answer.strip() == ref_answer.strip():
            correct_count += 1

    answer_correctness = (correct_count / total_count) * 100
    return answer_correctness

# Sample data for compute_metrics function
predictions = [
    "The qubit is the basic unit of quantum information.",
    "Quantum entanglement is a phenomenon where particles are interconnected and the state of one affects the state of another.",
    "Quantum computers use qubits to perform complex calculations that classical computers struggle with."
]
references = [
    "A qubit is the fundamental unit of quantum computing.",
    "Quantum entanglement is a physical phenomenon where particles become correlated in such a way that the state of one particle instantly influences the state of the other, no matter the distance between them.",
    "A quantum computer uses qubits and quantum gates to perform computations that can solve certain problems more efficiently than classical computers by leveraging principles of quantum mechanics."
]

# Calculate metrics
accuracy = calculate_accuracy(reference_answers, model_responses)
precision, recall, accuracy_token_based = calculate_precision_recall_accuracy(reference_answers, model_responses)
metrics = compute_metrics(predictions, references)
bleu_score = corpus_bleu(predictions, [references]).score
context_precision, context_recall = compute_context_precision_recall(reference_answers, model_responses)
hallucination_score = compute_hallucination(reference_answers, model_responses)
answer_correctness = compute_answer_correctness(reference_answers, model_responses)

# Create a DataFrame to display results
results = {
    "Metric": [
        "Accuracy", "Precision", "Recall", 
        "Accuracy (Token-Based)", "F1 Score", 
        "Token-Based Accuracy", "BLEU Score",
        "Context Precision", "Context Recall",
        "Hallucination Score", "Answer Correctness"
    ],
    "Score (%)": [
        accuracy,
        precision,
        recall,
        accuracy_token_based,
        metrics['f1_score'],
        metrics['token_based_accuracy'],
        bleu_score,
        context_precision,
        context_recall,
        hallucination_score,
        answer_correctness
    ]
}

df_results = pd.DataFrame(results)

# Display the results table using IPython.display with hidden index
display(df_results.style.hide(axis='index')
                      .set_table_attributes('style="font-size: 14px; text-align: center;"')
                      .set_caption("Evaluation Metrics"))

# Example error analysis
def error_analysis(reference_answers, model_responses):
    """
    Analyze and list the discrepancies between reference answers and model responses.
    """
    errors = []
    for query, ref_answer in reference_answers.items():
        model_answer = model_responses.get(query, "")
        if model_answer.strip() != ref_answer.strip():
            errors.append((query, ref_answer, model_answer))
    return errors

errors = error_analysis(reference_answers, model_responses)
print("\nError Analysis:")
for query, true_answer, pred_answer in errors:
    print(f"Query: {query}")
    print(f"True Answer: {true_answer}")
    print(f"Model Answer: {pred_answer}")
    print()


Metric,Score (%)
Accuracy,0.0
Precision,100.0
Recall,38.888889
Accuracy (Token-Based),38.888889
F1 Score,48.275862
Token-Based Accuracy,38.888889
BLEU Score,12.20107
Context Precision,66.170635
Context Recall,39.580247
Hallucination Score,33.829365



Error Analysis:
Query: What is a qubit?
True Answer: A qubit is the basic unit of quantum information, analogous to a bit in classical computing, but capable of being in a superposition of states.
Model Answer: A qubit is a unit of quantum information, similar to a bit but with more capabilities.

Query: Explain quantum entanglement.
True Answer: Quantum entanglement is a physical phenomenon where particles become correlated in such a way that the state of one particle instantly influences the state of the other, no matter the distance between them.
Model Answer: Quantum entanglement is a phenomenon where particles are interconnected and the state of one affects the state of another.

Query: How does a quantum computer work?
True Answer: A quantum computer uses qubits and quantum gates to perform computations that can solve certain problems more efficiently than classical computers by leveraging principles of quantum mechanics.
Model Answer: Quantum computers use qubits to perform com

# Multi querying

In [18]:
tools = [
    Tool(
        name='Knowledge Base',
        func=qa.invoke,
        description=(
            'use this tool when answering general knowledge queries to get '
            'more information about the topic'
        )
    )
]
from langchain.agents import create_react_agent
from langchain_core.prompts import PromptTemplate

# Define the prompt template (if not already defined)
template = '''Answer the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input question you must answer
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought:{agent_scratchpad}'''

# Create the prompt template
prompt = PromptTemplate.from_template(template)

# Create the React agent
agent = create_react_agent(
    llm=llm,
    tools=tools,
    prompt=prompt
)

# Optionally, you might use `AgentExecutor` if you need to manage interactions
from langchain.agents import AgentExecutor

agent_executor = AgentExecutor(agent=agent, tools=tools)

# Asking questions concerning the video

In [19]:
response = agent_executor.invoke({"input": query})
print(response)

{'input': 'How does a quantum computer work?', 'output': 'A quantum computer works by utilizing the principles of quantum mechanics, which govern the behavior of particles at the atomic and subatomic levels. Unlike classical computers that use bits as the smallest unit of data (which can be either 0 or 1), quantum computers use quantum bits, or qubits. Qubits can exist in a state of 0, 1, or both simultaneously due to a property called superposition, allowing quantum computers to process vast amounts of information at once. Additionally, qubits can be entangled, meaning the state of one qubit can depend on the state of another, regardless of the distance between them. This entanglement enables quantum computers to perform complex calculations more efficiently than classical computers. Quantum computers manipulate the states of qubits through quantum gates, which are the quantum equivalent of classical logic gates. The combination of superposition, entanglement, and quantum gates allows

In [20]:
response = agent_executor.invoke({"input": "Who is Michio Kaku?"})
print(response)

{'input': 'Who is Michio Kaku?', 'output': 'Michio Kaku is a theoretical physicist, futurist, and popular science communicator. He is a professor of theoretical physics at the City University of New York and is known for his work in string theory and quantum physics. Kaku is also an author of several books, including "Quantum Supremacy," which discusses the rise of quantum computers. He frequently appears in media to discuss scientific topics and the future of technology.'}


In [21]:
response = agent_executor.invoke({"input": "How fast is a quantum computer?"})
print(response)

{'input': 'How fast is a quantum computer?', 'output': 'Quantum computers can theoretically be infinitely faster than classical computers due to their use of qubits, which allow for simultaneous calculations. However, practical applications are still in development, and they do not consistently outperform classical computers across all tasks yet.'}


# Ask a complete random question not related at all with the video

In [22]:
response = agent_executor.invoke({"input": "history of portugal in XV century?"})
print(response)

{'input': 'history of portugal in XV century?', 'output': "The 15th century was a pivotal period in Portuguese history, marked by significant exploration and maritime expansion. Key events include:\n\n1. **Age of Discoveries**: Portugal became a leading maritime power, initiating the Age of Discoveries. This era saw explorers like Prince Henry the Navigator promoting voyages along the African coast.\n\n2. **Exploration of Africa**: Portuguese explorers reached the Azores and Madeira Islands, and they began to explore the West African coast, establishing trade routes and colonies.\n\n3. **Vasco da Gama**: In 1498, Vasco da Gama successfully sailed around the Cape of Good Hope to reach India, opening up a sea route that would enhance trade and establish Portuguese influence in Asia.\n\n4. **Colonization**: The 15th century also saw the beginning of Portuguese colonization in the Atlantic and the establishment of trading posts in Africa and Asia.\n\n5. **Treaty of Tordesillas**: In 1494, 

In [23]:
import pinecone
import time

# Initialize Pinecone client directly
pinecone_client = pinecone.Pinecone(api_key=pinecone_api_key, environment='us-west1-gcp')

index_name = 'langchain-multi-query'

if index_name not in pinecone_client.list_indexes().names():  # Use .names() to get list of index names
    # Define index configuration
    index_spec = {
        'dimension': 1536,
        'metric': 'cosine'
    }
    pinecone_client.create_index(name=index_name, **index_spec)  # Pass index name as 'name'
    while not pinecone_client.describe_index(index_name).status['ready']:
        time.sleep(1)

index = pinecone_client.Index(index_name)

In [24]:
len(texts)

17

In [25]:
batch_size = 1

for i in tqdm(range(0, len(texts), batch_size)):
    i_end = min(i+batch_size, len(texts))
    ids = [str(uuid4()) for _ in range(i_end-i)]
    embeds = embed.embed_documents(texts[i:i_end])
    index.upsert(vectors=zip(ids, embeds, metadatas[i:i_end]))
    time.sleep(1)

  0%|          | 0/17 [00:00<?, ?it/s]

In [26]:
text_field = "text"

vectorstore = PineconeVectorStore(index, embed, text_field)
llm = ChatOpenAI(temperature=0.0, openai_api_key=openai_api_key)

from langchain.retrievers.multi_query import MultiQueryRetriever

retriever = MultiQueryRetriever.from_llm(retriever=vectorstore.as_retriever(), llm=llm)

import logging

logging.basicConfig()
logging.getLogger("langchain.retrievers.multi_query").setLevel(logging.INFO)
question = "How fast is a quantum computer?"

texts = retriever.invoke(input=question)

INFO:langchain.retrievers.multi_query:Generated queries: ['What is the speed of a quantum computer?', 'Can you tell me the velocity of a quantum computer?', 'How quickly can a quantum computer operate?']


In [27]:
len(texts)

2

In [28]:
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain

QA_PROMPT = PromptTemplate(
    input_variables=["query", "contexts"],
    template="""You are a helpful assistant who answers user queries using the contexts provided. If the question cannot be answered using the information provided say "I don't know".

    Contexts:
    {contexts}

    Question: {query}
    Answer:"""
)

qa_chain = LLMChain(llm=llm, prompt=QA_PROMPT, verbose=False)

output = qa_chain(inputs={"query": question, "contexts": "\n---\n".join([d.page_content for d in texts])})
print(output["text"])

  warn_deprecated(
  warn_deprecated(


In principle, a quantum computer is infinitely faster than a digital computer.


In [29]:
from langchain.chains import TransformChain, SequentialChain

def retrieval_transform(inputs: dict) -> dict:
    texts = retriever.get_relevant_documents(query=inputs["question"])
    texts = [d.page_content for d in texts]
    texts_dict = {
        "query": inputs["question"],
        "contexts": "\n---\n".join(texts)
    }
    return texts_dict

retrieval_chain = TransformChain(
    input_variables=["question"],
    output_variables=["query", "contexts"],
    transform=retrieval_transform
)

rag_chain = SequentialChain(
    chains=[retrieval_chain, qa_chain],
    input_variables=["question"],
    output_variables=["query", "contexts", "text"],
    verbose=True
)
output = rag_chain({"question": question})
print(output["text"])



[1m> Entering new SequentialChain chain...[0m


  warn_deprecated(
INFO:langchain.retrievers.multi_query:Generated queries: ['What is the speed of a quantum computer?', 'Can you tell me the velocity of a quantum computer?', 'What is the rate at which a quantum computer operates?']



[1m> Finished chain.[0m
In principle, a quantum computer is infinitely faster than a digital computer.


In [30]:
class SimpleListOutputParser(ListOutputParser):
    def parse(self, text: str) -> List[str]:
        # Split the text into lines and handle potential empty strings
        return [line.strip() for line in text.split("\n") if line.strip()]

output_parser = SimpleListOutputParser()

template = """
Your task is to generate 3 different queries that aim to answer the user question from multiple perspectives.
Every query MUST tackle the question from a different viewpoint, we want to get a variety of RELEVANT search results.
Provide these alternative questions separated by newlines.
Original question: {question}
"""

QUERY_PROMPT = PromptTemplate(
    input_variables=["question"],
    template=template
)

llm = OpenAI(temperature=0.3, openai_api_key=openai_api_key)

llm_chain = LLMChain(llm=llm, prompt=QUERY_PROMPT, output_parser=output_parser)

# Debug: Test the LLMChain output
test_question = "What are the effects of climate change?"
result = llm_chain.invoke(test_question)
print("LLMChain output:", result)

# Assuming vectorstore is defined elsewhere
retriever = MultiQueryRetriever(
    retriever=vectorstore.as_retriever(),
    llm_chain=llm_chain,
    parser_key="lines"  # This should match the attribute name in the output parser
)

# Debug: Print the type and content of 'question'
print("Type of question:", type(question))
print("Content of question:", question)

texts = retriever.get_relevant_documents(query=question)
print("Number of retrieved texts:", len(texts))

LLMChain output: {'question': 'What are the effects of climate change?', 'text': ['1. How does climate change impact global temperatures and weather patterns?', '2. What are the environmental consequences of climate change, such as rising sea levels and loss of biodiversity?', '3. In what ways does climate change affect human health and economies, both locally and globally?']}
Type of question: <class 'str'>
Content of question: How fast is a quantum computer?


INFO:langchain.retrievers.multi_query:Generated queries: ['1. What is the speed of a quantum computer compared to a traditional computer?', '2. Can a quantum computer solve complex problems faster than a classical computer?', '3. How does the processing speed of a quantum computer differ from a regular computer?']


Number of retrieved texts: 2


In [31]:
class SimpleListOutputParser(ListOutputParser):
    def parse(self, text: str) -> List[str]:
        # Split the text into lines and handle potential empty strings
        return [line.strip() for line in text.split("\n") if line.strip()]

output_parser = SimpleListOutputParser()

template = """
Your task is to generate 3 different search queries that aim to answer the question from multiple perspectives. The user questions are focused on Quantum Computing, AI, future technology and related subjects.
Every query MUST tackle the question from a different viewpoint, we want to get a variety of RELEVANT search results.
Provide these alternative questions separated by newlines.
Original question: {question}
"""

QUERY_PROMPT = PromptTemplate(
    input_variables=["question"],
    template=template
)

llm = OpenAI(temperature=0.3, openai_api_key=openai_api_key)

llm_chain = LLMChain(llm=llm, prompt=QUERY_PROMPT, output_parser=output_parser)

# Debug: Test the LLMChain output
test_question = "What are the effects of climate change?"
result = llm_chain.invoke(test_question)
print("LLMChain output:", result)

# Assuming vectorstore is defined elsewhere
retriever = MultiQueryRetriever(
    retriever=vectorstore.as_retriever(),
    llm_chain=llm_chain,
    parser_key="lines"  # This should match the attribute name in the output parser
)

# Debug: Print the type and content of 'question'
print("Type of question:", type(question))
print("Content of question:", question)

texts = retriever.get_relevant_documents(query=question)
print("Number of retrieved texts:", len(texts))

LLMChain output: {'question': 'What are the effects of climate change?', 'text': ['1. How is quantum computing being used to study and address the effects of climate change?', '2. What role does AI play in predicting and mitigating the effects of climate change?', '3. What advancements in future technology are being developed to combat the effects of climate change?']}
Type of question: <class 'str'>
Content of question: How fast is a quantum computer?


INFO:langchain.retrievers.multi_query:Generated queries: ['1. What are the advantages of using a quantum computer over a traditional computer?', '2. How does the speed of a quantum computer compare to the speed of a supercomputer?', '3. Can quantum computing technology be used to solve complex problems faster than classical computing methods?']


Number of retrieved texts: 3
