# Gemini Becomes an Expert

The idea builds on Google's Gemini LLM to offer detailed responses on specialized topics. It addresses the limitations of language models, such as their tendency to miss nuanced details or very specific formulas where accuracy is critical, and their potential for generating incorrect information or "hallucinations." By converting user queries into targeted searches across multiple platforms—Wikipedia for foundational concepts, the web for in-depth technical details, YouTube for explanatory and comparative content, and Arxiv for scholarly articles—the algorithm ensures a thorough collection of information. A web scraping component then aggregates this data into a vast text file, often exceeding a million characters. This method provides a robust foundation of context, functioning as a solution to enhance the accuracy and depth of the responses generated by the LLM, mimicking the comprehensive research typically conducted by human experts.

In [4]:
# Install all the necessary libraries
!pip install google-generativeai
!pip install arxiv2text
!pip install arxiv
!pip install googlesearch-python
!pip install beautifulsoup4
!pip install youtube-transcript-api

Collecting arxiv2text
  Downloading arxiv2text-0.1.14-py3-none-any.whl.metadata (4.4 kB)
Collecting pdfminer-six (from arxiv2text)
  Downloading pdfminer.six-20240706-py3-none-any.whl.metadata (4.1 kB)
Collecting PyPDF2 (from arxiv2text)
  Downloading pypdf2-3.0.1-py3-none-any.whl.metadata (6.8 kB)
Downloading arxiv2text-0.1.14-py3-none-any.whl (15 kB)
Downloading pdfminer.six-20240706-py3-none-any.whl (5.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m53.6 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hDownloading pypdf2-3.0.1-py3-none-any.whl (232 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m11.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: PyPDF2, pdfminer-six, arxiv2text
Successfully installed PyPDF2-3.0.1 arxiv2text-0.1.14 pdfminer-six-20240706
Collecting arxiv
  Downloading arxiv-2.1.3-py3-none-any.whl.metadata (6.1 kB)
Collecting feedparser~=6.0.10 (from arxiv)
  Dow

In [23]:
# Import all the libraries we will need
# Standard library imports
import datetime
import json
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

# Related third-party imports
import arxiv
import requests
from arxiv2text import arxiv_to_text
from bs4 import BeautifulSoup
from googlesearch import search
from tqdm import tqdm
from youtube_transcript_api import YouTubeTranscriptApi

# Imports from the google.generativeai library
import google.generativeai as genai
from google.generativeai import caching
from google.generativeai.types import GenerationConfig

# Initialize the arxiv client
client = arxiv.Client()

In [9]:
# Use Kaggle secrets to store and retrieve your Gemini API Key
from kaggle_secrets import UserSecretsClient
user_secrets = UserSecretsClient()
api_key = user_secrets.get_secret("API_KEY")
# Set it!
genai.configure(api_key=api_key)

In [6]:
user_query = "I want to understand the diffusion implicit model (DDIM). More exactly, how is it deterministic if the counterpart works in stochasticly."

## Extract Online Knowledge

### Use an Agent to Create Queries

In [7]:
google_searcher_agent_instructions = """You are a google searcher agent. From a given user query, your job is to create a series of google search queries (from general to specific) so you can become an expert in the matter before answering.
Do not literally copy the user's query, instead, be intelligent about it and use your own knowledge in the matter to create a list of queries that the user might find best helpful.
You will respond only in JSON format. Answer only with queries (from general to specific) split by the sites: web, wikipedia, arxiv and youtube, WITHOUT OEVERLAPPING!.  Fill them however you want, but do not write too many queries (at most 4).
DO NOT ANSWER THE USER QUERY! THAT IS NOT YOUR TASK!
Examples:

# EXAMPLE 1
## USER INPUT
User query: "I want to learn more about generative AI models, specifically diffusion models, and how they compare to GANs."
## YOUR OUTPUT
{
    "wikipedia": ["Generative AI"],
    "web": ["How autorregresive denoising diffusion models work"],
    "youtube": ["Diffusion models explained", "GANs vs Diffusion models comparison"],
    "arxiv": ["Normalizing Flows", "Variational Autoencoders", "Generative Adversarial Network", "Autorregresive Denoising Probabilistic Models"]
}

# EXAMPLE 2
## USER INPUT
User query: "Puedes escribir un articulo sobre manolo valdes (el escultor)? Biografia, sus obras, etc"
## YOUR OUTPUT
{
    "wikipedia": ["Manolo Valdés", "Spanish sculptors", "Pop Art in Spain"],
    "web": ["Manolo Valdés biography", "Manolo Valdés famous sculptures", "Analysis of Manolo Valdés artworks"],
    "youtube": [],
    "arxiv": []
}
"""

In [10]:
# We call out first agent
google_searcher_agent = genai.GenerativeModel(
    "gemini-1.5-flash-002",
    system_instruction=google_searcher_agent_instructions,
    generation_config={"response_mime_type": "application/json"}
)
response = google_searcher_agent.generate_content("User query: " + user_query)
google_queries = json.loads(response.text)
google_queries

{'wikipedia': ['Diffusion models', 'Markov chains'],
 'web': ['DDIM algorithm explained',
  'Deterministic diffusion models',
  'Comparison of DDIM and DDPM'],
 'youtube': ['DDIM tutorial', 'Understanding diffusion models'],
 'arxiv': ['Denoising Diffusion Probabilistic Models',
  'Improved Denoising Diffusion Probabilistic Models']}

### Extract information from resources

In [15]:
# We create all necessary functions to get the context from the internet

def google_search(query, num_results):
    # Use google-search-api to get the top-n best results
    query = f"{query}"
    result = search(query, num_results=num_results)
    return result


def extract_and_format_text_from_url(url):
    # Extract all readable text from a website
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        return f"Failed to retrieve content: {e}"
    soup = BeautifulSoup(response.text, 'html.parser')
    tags_of_interest = {f"h{i}": "#" * i for i in range(1, 7)}
    tags_of_interest["p"] = ''
    formatted_text_segments = []
    for element in soup.descendants:
        if element.name in tags_of_interest:
            prefix = tags_of_interest[element.name]
            text = element.get_text(strip=True)
            if text:
                if prefix:
                    formatted_text_segments.append(f"{prefix} {text}\n")
                else:
                    formatted_text_segments.append(f"{text}\n")
    formatted_text = '\n'.join(formatted_text_segments)
    return formatted_text


def process_link(site_query, link, result_type, i, num_links):
    # Processes general website
    if result_type == 'web' and any(sub in link for sub in ["insta", "youtube", "wiki"]):
        return None
    try:
        web_text = extract_and_format_text_from_url(link)
    except Exception as e:
        print(f"Error processing link {link}: {e}")
        return None
    header = f"""
    -------------------------
    QUERY: {site_query}
    {result_type.upper()} RESULTS {i + 1}/{num_links}
    URL: {link}
    -------------------------

    """
    return header + web_text


def process_web_query(site_query, num_results=10):
    # Processes web query in parallel
    links = list(google_search(site_query, num_results))
    num_links = len(links)
    results = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(process_link, site_query, link, 'web', i, num_links)
            for i, link in enumerate(links)
        ]
        for future in as_completed(futures):
            result = future.result()
            if result:
                results.append(result)
    return "\n\n".join(results)


def process_wikipedia_query(site_query, num_results=2):
    # Processes wikipedia query in parallel
    links = list(google_search(f"{site_query} site:wikipedia.org", num_results))
    num_links = len(links)
    results = []
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(process_link, site_query, link, 'wikipedia', i, num_links)
            for i, link in enumerate(links)
        ]
        for future in as_completed(futures):
            result = future.result()
            if result:
                results.append(result)
    return "\n\n".join(results)


def process_arxiv_query(arxiv_query, num_results=5):
    # Processes Arxiv query in parallel
    arxiv_search = arxiv.Search(
        query=arxiv_query,
        max_results=num_results,
        sort_by=arxiv.SortCriterion.Relevance
    )
    result_urls = list(client.results(arxiv_search))
    num_results = len(result_urls)
    results = []

    def process_arxiv_result(i, result_url):
        try:
            result_pdf_url = result_url.pdf_url.replace("/abs/", "/pdf/")
            extracted_text = arxiv_to_text(result_pdf_url)
            header = f"""
            -------------------------
            QUERY: {arxiv_query}
            ARXIV RESULTS {i + 1}/{num_results}
            URL: {result_url}
            -------------------------

            """
            return header + extracted_text
        except Exception as e:
            print("Error with arXiv...", e)
            return None

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(process_arxiv_result, i, result_url)
            for i, result_url in enumerate(result_urls)
        ]
        for future in as_completed(futures):
            result = future.result()
            if result:
                results.append(result)
    return "\n\n".join(results)


def process_youtube_query(site_query, num_results=5):
    # Processes YouTube query in parallel
    links = list(google_search(f"{site_query} site:youtube.com", num_results))
    num_links = len(links)
    results = []

    def process_youtube_link(i, link):
        try:
            if "https://www.youtube.com/watch?v=" not in link:
                return None
            youtube_id = link.split("v=")[-1].split("&")[0]
            transcripts = YouTubeTranscriptApi.list_transcripts(youtube_id)
            transcript = transcripts.find_transcript(['en', 'en-US', 'en-GB'])
            video_transcription = transcript.fetch()
            video_transcription = " ".join([r["text"] for r in video_transcription])
            header = f"""
            -------------------------
            QUERY: {site_query}
            YOUTUBE RESULTS {i + 1}/{num_links}
            URL: {link}
            (this is a transcription, text might have errors)
            -------------------------

            """
            return header + video_transcription
        except Exception as e:
            print("Error with YouTube...", e)
            return None

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(process_youtube_link, i, link)
            for i, link in enumerate(links)
        ]
        for future in as_completed(futures):
            result = future.result()
            if result:
                results.append(result)
    return "\n\n".join(results)

In [18]:
# Execute search! Typically takes like ~45sec (thanks to parallelization!)

expert_context_parts = []
tasks = []

# Prepare tasks for all query types
for site_query in google_queries.get("web", []):
    tasks.append(('Web', process_web_query, site_query))

for site_query in google_queries.get("web", []):
    tasks.append(('Wikipedia', process_wikipedia_query, site_query))

for arxiv_query in google_queries.get("arxiv", []):
    tasks.append(('ArXiv', process_arxiv_query, arxiv_query))

for site_query in google_queries.get("youtube", []):
    tasks.append(('YouTube', process_youtube_query, site_query))

# Run all tasks in parallel
print("Starting all searches in parallel...")
with ThreadPoolExecutor(max_workers=20) as executor:
    futures = {
        executor.submit(func, query): (source, query)
        for source, func, query in tasks
    }
    for future in tqdm.tqdm(as_completed(futures), total=len(futures)):
        source, query = futures[future]
        try:
            result = future.result()
            if result:
                expert_context_parts.append(result)
        except Exception as e:
            print(f"Error processing {source} query '{query}': {e}")

expert_context = "\n\n".join(expert_context_parts)

# Save to file
print("Saving context file...")
path_to_context_file = 'expert_context.txt'
with open(path_to_context_file, "w", encoding="utf-8") as file:
    file.write(expert_context)
print("Done!")

Starting all searches in parallel...


100%|██████████| 10/10 [00:48<00:00,  4.81s/it]

Saving context file...
Done!





In [22]:
# Length reference
num_words = len(expert_context.split(" "))
harry_potter_sorcerers_stone_num_words = 76_944
print(f"Your expert context is {(num_words / harry_potter_sorcerers_stone_num_words):.2f} Harry Potter's (and the sorcerer's stone) book(s)!")

Your expert context is 1.27 Harry Potter's (and the sorcerer's stone) book(s)!


## Use the TXT as Context for the LLM

In [25]:
# Cache model system instruction with an small Chain of Thought added
system_instruction = """You are an agent with the task of resolving a user's questions and tasks in the most helpful way possible. The user will be talking about some topic, asking questions and everything.
You will be given a related huge context that is the result of an intensive web scrapping about the asked topic, use this context as reference and as help to answer the user's questions.
Rules:
- As every section has an URL, YOU MUST PROVIDE SOURCES FOR EVERY CLAIM YOU WRITE IN THE OUTPUT!
- No matter the context language, answer in the user's request language.
- Use Chain of Thought reasoning before answering. Before answering the user's request, write a <thinking> token with internal thoughts of how could you use the full given context in a way that helps the user in the best possible and unique way, using the huge provided context parts that the user might find best helpful. Whenever you finish thinking, write a </thinking> token and start answering normally. The user won't see whatever you write between your <thinking> and </thinking> tokens, just the answer.
"""

In [27]:
# Use the cache functionality in Gemini to store the context for further queries
path_to_context_file = 'expert_context.txt'
context_file = genai.upload_file(path=path_to_context_file)

while context_file.state.name == 'PROCESSING':
    print('Waiting for context to be processed.')
    time.sleep(2)
    video_file = genai.get_file(context_file.name)
print(f'Context processing complete: {context_file.uri}')

cache = caching.CachedContent.create(
    model='gemini-1.5-pro-002',
    display_name='context',
    system_instruction=system_instruction,
    contents=[context_file],
    ttl=datetime.timedelta(minutes=3)
)

Context processing complete: https://generativelanguage.googleapis.com/v1beta/files/zkmf10fkt34j


In [28]:
def remove_thinking_tags_precise(text):
    # Remove thinking tokens from Chain of Thought
    return re.sub(r'<thinking>.*?</thinking>', '', text, flags=re.DOTALL)


# We can create a model from this cache and start a chat
# High tokens output and low temperature to ensure low hallucinations and complete explanations
config = GenerationConfig(max_output_tokens=int(2**13), temperature=0.3)
model = genai.GenerativeModel.from_cached_content(
    cached_content=cache,
    generation_config=config
)
chat = model.start_chat()

In [29]:
# Send the original message
response = chat.send_message(user_query)
answer = remove_thinking_tags_precise(response.text)
print(answer)


DDIM is deterministic because it removes the stochastic element present in DDPM's sampling process.  Let's break down the key difference:

In DDPM (Denoising Diffusion Probabilistic Models), the reverse diffusion (sampling) process involves iteratively removing noise from a pure noise image to generate a sample. This process is *stochastic* because at each step, it adds random Gaussian noise.  This means that even with the same starting noise and the same model, DDPM can produce different final images due to the randomness injected at each step.

DDIM (Denoising Diffusion Implicit Models), on the other hand, modifies the sampling process to be *deterministic*.  It achieves this by eliminating the addition of random noise during the reverse diffusion process.  The equation for generating a sample `x_(t-1)` from `x_t` in DDIM is ([https://john-see.github.io/blog/2020/DDIM/](https://john-see.github.io/blog/2020/DDIM/)):

```
x_(t-1) = sqrt(α_t) * x_0 + sqrt(1-α_t-σ_t^2) * ε_θ(x_t) + σ_t 

In [None]:
# Keep chatting!

# Keep the conversation alive!
cache.update(ttl=datetime.timedelta(minutes=3))

# Send the next question (related to the same context)
user_query = "Add your message here!"
response = chat.send_message(user_query)
answer = remove_thinking_tags_precise(response.text)
print(answer)