<a href="https://colab.research.google.com/github/erdincozsertel/MultiGroupGenAI_ProjectDemo_Colab/blob/main/MultiGroupGenAI_ProjectDemo3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
"""
Movie/TV Recommender RAG Demo using Haystack & Gemini with Langfuse

Adapted for Movie/TV Recommendations and Langfuse logging
"""

# --- 1. Install Dependencies ---
!pip install -U -q haystack-ai google-ai-haystack sentence-transformers trafilatura langfuse-haystack hf_xet

# --- 2. Set Up API Keys & Langfuse ---
import os
import logging
from google.colab import userdata # Use Colab secrets for API keys
from getpass import getpass

# Fetch keys from Colab secrets
def _get_secret(name, default=None):
    """Return the Colab secret *name*, or *default* when it is missing or empty.

    ``userdata.get`` raises ``SecretNotFoundError`` for a secret that does not
    exist instead of returning ``None``, so optional secrets must be looked up
    through this helper for their defaults to ever apply. Any other error
    raised by ``userdata.get`` propagates to the caller unchanged.
    """
    try:
        value = userdata.get(name)
    except userdata.SecretNotFoundError:
        return default
    return value or default


# Secrets the notebook cannot run without.
_REQUIRED_SECRETS = (
    "GOOGLE_API_KEY",
    "SERPERDEV_API_KEY",
    "LANGFUSE_PUBLIC_KEY",
    "LANGFUSE_SECRET_KEY",
)

_secrets = {name: _get_secret(name) for name in _REQUIRED_SECRETS}
_missing_secrets = [name for name, value in _secrets.items() if not value]
if _missing_secrets:
    # Report every missing secret at once rather than only the first one hit.
    print(f"API key or Langfuse credential not found in Colab Secrets: {', '.join(_missing_secrets)}. Please add them.")
    print("Secrets needed: GOOGLE_API_KEY, SERPERDEV_API_KEY, LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY (and optionally LANGFUSE_HOST)")
    raise ValueError("Required secrets not configured in Colab Secrets.")

GOOGLE_API_KEY = _secrets["GOOGLE_API_KEY"]
SERPERDEV_API_KEY = _secrets["SERPERDEV_API_KEY"]
# Langfuse Credentials
LANGFUSE_PUBLIC_KEY = _secrets["LANGFUSE_PUBLIC_KEY"]
LANGFUSE_SECRET_KEY = _secrets["LANGFUSE_SECRET_KEY"]
# Optional secrets: the host falls back to Langfuse Cloud, the Hugging Face
# token is simply left unset (HF_TOKEN is then None).
LANGFUSE_HOST = _get_secret("LANGFUSE_HOST", default="https://cloud.langfuse.com")
HF_TOKEN = _get_secret("HF_TOKEN")


# Set environment variables for Haystack components and Langfuse
os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY
os.environ["SERPERDEV_API_KEY"] = SERPERDEV_API_KEY
os.environ["LANGFUSE_PUBLIC_KEY"] = LANGFUSE_PUBLIC_KEY
os.environ["LANGFUSE_SECRET_KEY"] = LANGFUSE_SECRET_KEY
os.environ["LANGFUSE_HOST"] = LANGFUSE_HOST
if HF_TOKEN:
    # os.environ only accepts strings; assigning None would raise TypeError.
    os.environ["HF_TOKEN"] = HF_TOKEN
# Optional: Enable Haystack content tracing for debugging if needed (Langfuse also captures IO)
# Set here, before the Haystack imports in the following cell.
os.environ["HAYSTACK_CONTENT_TRACING_ENABLED"] = "true"

# --- Logging Configuration ---
# Root logging is configured at WARNING; the "haystack" logger alone is
# lowered to INFO.
_log_format = "%(levelname)s - %(name)s -  %(message)s"
logging.basicConfig(level=logging.WARNING, format=_log_format)
_haystack_logger = logging.getLogger("haystack")
_haystack_logger.setLevel(logging.INFO)
print("Inital setup is completed")

Inital setup is completed


In [None]:
# --- 3. Import Haystack Components ---
from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.converters import HTMLToDocument
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.embedders import (
    SentenceTransformersDocumentEmbedder,
    SentenceTransformersTextEmbedder
)
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.websearch import SerperDevWebSearch
from haystack.components.builders import ChatPromptBuilder
from haystack.components.routers import ConditionalRouter
from haystack.dataclasses import ChatMessage
from haystack_integrations.components.generators.google_ai.chat.gemini import GoogleAIGeminiChatGenerator
# Import Langfuse Connector
from haystack_integrations.components.connectors.langfuse import LangfuseConnector


# --- 4. Define Document Store ---
# Module-level in-memory store shared by the notebook: the indexing pipeline's
# DocumentWriter writes into it and the query pipeline's retriever reads from it.
document_store = InMemoryDocumentStore()

In [None]:
# --- 3. Import Haystack Components ---

from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.fetchers import LinkContentFetcher
from haystack.components.converters import HTMLToDocument
from haystack.components.writers import DocumentWriter
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.embedders import (
    SentenceTransformersDocumentEmbedder,
    SentenceTransformersTextEmbedder
)
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.websearch import SerperDevWebSearch
from haystack.components.builders import ChatPromptBuilder
from haystack.components.routers import ConditionalRouter
from haystack.dataclasses import ChatMessage # Ensure ChatMessage is imported if not already
from haystack_integrations.components.generators.google_ai.chat.gemini import GoogleAIGeminiChatGenerator
from haystack_integrations.components.connectors.langfuse import LangfuseConnector

# --- 4. Define Document Store ---
# NOTE(review): this cell repeats the imports and the store creation of the
# previous cell. Running it rebinds `document_store` to a newly constructed
# InMemoryDocumentStore, while any pipeline built earlier keeps referencing the
# previous instance. Presumably a leftover duplicate — confirm whether this
# cell can be removed.
document_store = InMemoryDocumentStore()

In [None]:
# --- 5. Build Indexing Pipeline ---
# (Indexing pipeline remains the same as before - Langfuse primarily traces query pipelines)
print("Building Indexing Pipeline...")
indexing_pipeline = Pipeline()

# Components listed in processing order: fetch -> convert -> clean -> split -> embed -> write.
_indexing_components = [
    ("fetcher", LinkContentFetcher()),
    ("converter", HTMLToDocument()),
    ("cleaner", DocumentCleaner()),
    ("splitter", DocumentSplitter(split_by="sentence", split_length=10, split_overlap=2)),
    ("embedder", SentenceTransformersDocumentEmbedder(model="sentence-transformers/all-mpnet-base-v2")),
    ("writer", DocumentWriter(document_store=document_store)),
]
for _component_name, _component in _indexing_components:
    indexing_pipeline.add_component(instance=_component, name=_component_name)

# (sender, receiver) pairs wiring each stage to the next one.
_indexing_connections = [
    ("fetcher.streams", "converter.sources"),
    ("converter.documents", "cleaner"),
    ("cleaner", "splitter"),
    ("splitter", "embedder"),
    ("embedder", "writer"),
]
for _sender, _receiver in _indexing_connections:
    indexing_pipeline.connect(_sender, _receiver)
print("Indexing Pipeline Built.")

# --- 6. Index Your Data Sources ---
# Pages fetched, converted and embedded into `document_store`.
urls_to_index = [
    "https://www.imdb.com/chart/top",
    "https://www.imdb.com/chart/toptv",
    "https://myanimelist.net/topanime.php",
    "https://myanimelist.net/topanime.php?type=airing",
    "https://myanimelist.net/topanime.php?type=upcoming",
    "https://myanimelist.net/topanime.php?type=bypopularity",
    "https://www.themoviedb.org/movie",
    "https://www.themoviedb.org/tv",
    "https://www.themoviedb.org/movie/now-playing",
    "https://www.themoviedb.org/tv/on-the-air",
]

# Index only when the store is still empty, so re-running the cell does not
# index the same pages again.
if document_store.count_documents():
    print(f"Document store already contains {document_store.count_documents()} documents. Skipping indexing.")
else:
    print(f"Indexing {len(urls_to_index)} URLs...")
    _indexing_input = {"fetcher": {"urls": urls_to_index}}
    try:
        indexing_pipeline.run(_indexing_input)
        print(f"Successfully indexed {document_store.count_documents()} documents.")
    except Exception as e:
        # Failures are reported, not re-raised; the notebook keeps running.
        print(f"Error during indexing: {e}")

In [None]:
# --- 7. Build Advanced RAG Query Pipeline with Langfuse ---
print("Building Query Pipeline with Langfuse...")

# Jinja template with two modes:
#   * `web_documents` set   -> answer from the web search context and cite links.
#   * `web_documents` unset -> answer ONLY from the retrieved `documents`; the
#     model is told to reply with the sentinel 'N0_ANSWER' when they do not
#     contain enough information.
# NOTE(review): the sentinel is spelled with the digit 0 ("N0_ANSWER"), not the
# letter O. The router conditions below match the same spelling, so the three
# occurrences must be changed together if it is ever renamed.
prompt_template = """
{% if web_documents %}
    You were asked to answer the following query about movies/TV shows/anime based on the indexed documents, but the context was not enough.
    Answer the question based on the given web search context.
    Provide the sources/links you used from the web search context if possible.

    User Question: {{ query }}

    Web Search Context:
    {% for document in web_documents %}
    URL: {{document.meta.link}}
    Content: {{document.content}}
    ---
    {% endfor %}
{% else %}
    You are a helpful assistant for recommending movies, TV series, and anime.
    Answer the following query based ONLY on the documents provided below (e.g., top lists, descriptions).

    Documents:
    {% for document in documents %}
    {{document.content}}
    {% endfor %}

    Query: {{query}}

    Provide recommendations or answer the question based on the documents.
    If the documents do NOT provide enough information to answer the query or give recommendations, ONLY output the text 'N0_ANSWER'. Do not add any other explanation.
{% endif %}
"""
# The whole template is sent as a single user message.
prompt = [ChatMessage.from_user(prompt_template)]

# Routes evaluated on the first LLM reply (newlines stripped before matching).
# The two conditions are exact complements of each other.
main_routes = [
    {
        # Sentinel present: emit the original query on `go_web`.
        "condition": "{{'N0_ANSWER' in replies[0].text.replace('\\n', '')}}",
        "output" :"{{query}}",
        "output_name": "go_web",
        "output_type": str,
    },
    {
        # Sentinel absent: emit the reply text on `answer`.
        "condition": "{{'N0_ANSWER' not in replies[0].text.replace('\\n', '')}}",
        "output": "{{replies[0].text}}",
        "output_name": "answer",
        "output_type": str,
    },
]

# Define the query pipeline
# The pipeline contains a loop (router -> web_search -> prompt_builder -> llm
# -> router); each component may run at most 5 times per call.
query_pipeline = Pipeline(max_runs_per_component=5)

# Components in registration order. The Langfuse connector comes first and is
# deliberately absent from the connection list below: it is added to the
# pipeline without being wired to any other component.
_query_components = {
    "tracer": LangfuseConnector("Movie Recommender RAG Demo 3"),
    "embedder": SentenceTransformersTextEmbedder(model="sentence-transformers/all-mpnet-base-v2"),
    "retriever": InMemoryEmbeddingRetriever(document_store=document_store, top_k=5),
    "prompt_builder": ChatPromptBuilder(template=prompt),
    "llm": GoogleAIGeminiChatGenerator(model="gemini-1.5-flash", generation_config={"temperature": 0.7}),
    "web_search": SerperDevWebSearch(),
    "router": ConditionalRouter(main_routes),
}
for _component_name, _component in _query_components.items():
    query_pipeline.add_component(_component_name, _component)

# (sender, receiver) pairs; the last two close the web-search fallback loop.
_query_connections = (
    ("embedder.embedding", "retriever.query_embedding"),
    ("retriever.documents", "prompt_builder.documents"),
    ("prompt_builder.prompt", "llm.messages"),
    ("llm.replies", "router.replies"),
    ("router.go_web", "web_search.query"),
    ("web_search.documents", "prompt_builder.web_documents"),
)
for _sender, _receiver in _query_connections:
    query_pipeline.connect(_sender, _receiver)

print("Query Pipeline Built with Langfuse.")

In [None]:
# --- 8. Run a Query (Traces will be sent to Langfuse) ---
def _run_and_report(question, *, fallback_message, show_raw_replies=False):
    """Run ``query_pipeline`` for *question* and print the outcome.

    Args:
        question: The user query; fed to the embedder, the prompt builder and
            the router.
        fallback_message: Text printed when the router produced no ``answer``.
        show_raw_replies: When true, also print the raw ``llm`` replies so
            their structure can be inspected.

    Returns:
        The dict returned by ``query_pipeline.run``, or ``None`` when the run
        raised (the error is printed instead of propagated so the remaining
        notebook cells keep working).
    """
    print(f"Query: {question}")
    pipeline_input = {
        "embedder": {"text": question},
        "prompt_builder": {"query": question},
        "router": {"query": question},
    }
    try:
        # `llm.replies` and `web_search.documents` are consumed by downstream
        # components, so they are absent from the result unless requested
        # explicitly. Without this, reading result["llm"] raises KeyError.
        outputs = query_pipeline.run(
            pipeline_input,
            include_outputs_from={"llm", "web_search"},
        )
    except Exception as e:
        print(f"\nAn error occurred while running the query: {e}")
        return None

    if show_raw_replies:
        print(outputs.get("llm", {}).get("replies"))  # Inspect the replies structure

    router_outputs = outputs.get("router", {})
    if "answer" in router_outputs:
        answer = router_outputs["answer"]
        if isinstance(answer, list):
            answer = answer[0] if answer else "No answer found."
        print("\nAnswer:")
        print(answer)
        return outputs

    print(fallback_message)
    web_documents = outputs.get("web_search", {}).get("documents")
    if web_documents is not None:
        print("\nWeb Search Results considered:")
        for doc in web_documents:
            print(f"- {doc.meta.get('title', 'No Title')}: {doc.meta.get('link', 'No Link')}")
    return outputs


print("\n--- Running Sample Query (check Langfuse for trace) ---")
query = "Recommend some highly rated sci-fi TV shows."
result = _run_and_report(
    query,
    fallback_message="\nSorry, I couldn't generate a recommendation based on the available information.",
    show_raw_replies=True,
)

# --- Example of another query ---
print("\n--- Running Another Query (check Langfuse for trace) ---")
query_2 = "What is the plot of The Shawshank Redemption?"
result_2 = _run_and_report(
    query_2,
    fallback_message="\nSorry, I couldn't generate an answer based on the available information.",
)