In [1]:
# Import necessary libraries
import os
import re
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from langchain_core.documents import Document  # Fixing Chroma issue
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from langchain_community.document_loaders import UnstructuredMarkdownLoader, UnstructuredHTMLLoader

# ✅ Define folders where documents are stored
FOLDER_PATHS = ["kafka", "react", "spark"]

# ✅ Set API Key
openai_api_key = "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"
embedding_model = OpenAIEmbeddings(model="text-embedding-ada-002", openai_api_key=openai_api_key)

# ✅ Define ChromaDB storage path
CHROMA_DB_PATH = "./chroma_db"

# ✅ Reset ChromaDB (Deletes Existing Data)
if os.path.exists(CHROMA_DB_PATH):
    print("🧹 Clearing existing ChromaDB...")
    for file in Path(CHROMA_DB_PATH).glob("*"):
        try:
            file.unlink()  # Delete files inside chroma_db
        except Exception as e:
            print(f"⚠️ Error deleting {file}: {e}")
    print("✅ ChromaDB reset complete!")

# ✅ Reinitialize ChromaDB (Fresh Start)
vector_db = Chroma(persist_directory=CHROMA_DB_PATH, embedding_function=embedding_model)


# ✅ Function to load documents in parallel
def load_documents(folder):
    all_docs = []
    for file_path in Path(folder).rglob("*"):
        if file_path.suffix == ".md":
            loader = UnstructuredMarkdownLoader(str(file_path))
        elif file_path.suffix == ".html":
            loader = UnstructuredHTMLLoader(str(file_path))
        else:
            continue  # Skip unsupported files

        try:
            docs = loader.load()
            for doc in docs:
                doc.metadata["source"] = str(file_path)  # Store file source separately
            all_docs.extend(docs)
            print(f"✅ Loaded: {file_path}")
        except Exception as e:
            print(f"❌ Error loading {file_path}: {e}")

    return all_docs


# ✅ Load all documents from folders using parallel processing
all_docs = []
with ThreadPoolExecutor() as executor:
    results = list(executor.map(load_documents, FOLDER_PATHS))
    for res in results:
        all_docs.extend(res)

print(f"\n📄 Successfully loaded {len(all_docs)} documents (Markdown + HTML)!")


# ✅ Improved Semantic Chunking
class SemanticChunker:
    """
    Custom chunker that:
    - Preserves Markdown headers (`#` tags).
    - Keeps code blocks intact.
    - Uses sentence-aware chunking for coherence.
    """

    def __init__(self, chunk_size=5000, min_chunk_size=2500, overlap_ratio=0.2):
        self.chunk_size = chunk_size
        self.min_chunk_size = min_chunk_size
        self.chunk_overlap = int(chunk_size * overlap_ratio)

    def split_text(self, text):
        """
        Splits text using semantic awareness:
        - **Markdown Headers:** Sections are split at `#`, `##`, `###`.
        - **Code Blocks:** Keeps fenced code blocks (` ``` `).
        - **Sentence-aware splitting:** Ensures logical sentence breaks.
        """

        # ✅ Step 1: **First split by Markdown headers (`# Section Title`)**
        sections = re.split(r'(#+ .+)', text)  # Keep headers with their sections
        final_chunks = []
        current_chunk = []

        for section in sections:
            # ✅ Step 2: **Detect Code Blocks and Keep Them Together**
            if "```" in section:
                if current_chunk:
                    final_chunks.append(" ".join(current_chunk))
                    current_chunk = []
                final_chunks.append(section.strip())  # Store entire code block
                continue

            # ✅ Step 3: **Apply Sentence Splitting Inside Each Section**
            sentences = re.split(r'(?<=[.!?])\s+', section)  # Sentence-aware split
            for sentence in sentences:
                if sum(len(s) for s in current_chunk) + len(sentence) < self.chunk_size:
                    current_chunk.append(sentence)
                else:
                    if sum(len(s) for s in current_chunk) >= self.min_chunk_size:
                        final_chunks.append(" ".join(current_chunk))
                    current_chunk = [sentence]

        if current_chunk:
            final_chunks.append(" ".join(current_chunk))  # Add remaining chunk

        return final_chunks


# ✅ Function to Chunk Documents Using Semantic Chunking
def chunk_documents(docs):
    all_chunks = []
    chunk_id = 1  # Global counter for chunk IDs

    for doc in docs:
        file_name = doc.metadata.get("source", "Unknown Source")

        # ✅ Use **Semantic Chunking**
        text_splitter = SemanticChunker(
            chunk_size=5000,  # Ensure chunks stay large
            min_chunk_size=2500,  # Avoid tiny fragments
            overlap_ratio=0.2  # Preserve 20% overlap
        )

        chunks = text_splitter.split_text(doc.page_content)

        for chunk in chunks:
            doc_chunk = Document(
                page_content=chunk,
                metadata={"file_name": file_name, "source": file_name}
            )
            all_chunks.append(doc_chunk)
            chunk_id += 1  # Increment chunk ID

    return all_chunks


# ✅ Apply Semantic Chunking to Documents
split_docs = chunk_documents(all_docs)

# ✅ Store in ChromaDB
if split_docs:
    vector_db.add_documents(split_docs)
    vector_db.persist()
    print("✅ Documents indexed with **Semantic Chunking**!")
    print(f"📌 Total documents indexed: {len(vector_db.get()['documents'])}")
else:
    print("⚠️ No documents to index in ChromaDB!")


🧹 Clearing existing ChromaDB...
⚠️ Error deleting chroma_db/66fd1c1c-4caf-4331-84c3-7755196ca10d: [Errno 21] Is a directory: 'chroma_db/66fd1c1c-4caf-4331-84c3-7755196ca10d'
⚠️ Error deleting chroma_db/32bcada1-cc68-4853-9ac2-cf33ceeb7971: [Errno 21] Is a directory: 'chroma_db/32bcada1-cc68-4853-9ac2-cf33ceeb7971'
⚠️ Error deleting chroma_db/41df36fb-c945-4753-a223-1572a7a1be35: [Errno 21] Is a directory: 'chroma_db/41df36fb-c945-4753-a223-1572a7a1be35'
⚠️ Error deleting chroma_db/727ecf7b-3b98-42a6-bd53-1cd2d26e4a02: [Errno 21] Is a directory: 'chroma_db/727ecf7b-3b98-42a6-bd53-1cd2d26e4a02'
⚠️ Error deleting chroma_db/33147542-47f4-4066-81b4-44ce82adc7a4: [Errno 21] Is a directory: 'chroma_db/33147542-47f4-4066-81b4-44ce82adc7a4'
⚠️ Error deleting chroma_db/a643151c-80f0-43ba-a883-f03db4ed2c4d: [Errno 21] Is a directory: 'chroma_db/a643151c-80f0-43ba-a883-f03db4ed2c4d'
⚠️ Error deleting chroma_db/077764bd-52ce-46aa-b148-70bad5edd9ef: [Errno 21] Is a directory: 'chroma_db/077764bd-52c

  vector_db = Chroma(persist_directory=CHROMA_DB_PATH, embedding_function=embedding_model)


✅ Loaded: kafka/implementation.html
✅ Loaded: kafka/security.html
✅ Loaded: kafka/toc.html
✅ Loaded: kafka/upgrade.html
✅ Loaded: kafka/zk2kraft.html
✅ Loaded: kafka/protocol.html
✅ Loaded: react/learn/tutorial-tic-tac-toe.md
✅ Loaded: react/learn/responding-to-events.md
✅ Loaded: kafka/configuration.html
✅ Loaded: spark/mllib-isotonic-regression.md
✅ Loaded: kafka/documentation.html
✅ Loaded: kafka/ecosystem.html
✅ Loaded: react/learn/typescript.md
✅ Loaded: spark/tuning.md
✅ Loaded: spark/sql-ref-syntax-qry-select-limit.md
✅ Loaded: react/learn/passing-props-to-a-component.md
✅ Loaded: kafka/introduction.html
✅ Loaded: spark/sql-ref-syntax-dml-insert-table.md
✅ Loaded: kafka/api.html
✅ Loaded: kafka/uses.html
✅ Loaded: react/learn/updating-objects-in-state.md
✅ Loaded: spark/streaming-kinesis-integration.md
✅ Loaded: spark/sql-error-conditions.md
✅ Loaded: kafka/quickstart.html
✅ Loaded: react/learn/describing-the-ui.md
✅ Loaded: spark/sql-ref-syntax-ddl-drop-function.md
✅ Loaded: sp

  vector_db.persist()


In [54]:
from rank_bm25 import BM25Okapi
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from sklearn.preprocessing import MinMaxScaler
import numpy as np

# ✅ Load ChromaDB & Initialize BM25
CHROMA_DB_PATH = "./chroma_db"
from dotenv import load_dotenv
import os

load_dotenv()  # load variables from .env
openai_api_key = os.getenv("OPENAI_API_KEY")
embedding_model = OpenAIEmbeddings(model="text-embedding-ada-002", openai_api_key=openai_api_key)
# ✅ Load vector database
vector_db = Chroma(persist_directory=CHROMA_DB_PATH, embedding_function=embedding_model)

# ✅ Retrieve stored documents for BM25
stored_docs = vector_db.get(include=["documents", "metadatas"])
bm25_corpus = [doc.lower().split() for doc in stored_docs["documents"]] if stored_docs["documents"] else []
bm25_mapping = stored_docs["metadatas"] if stored_docs["metadatas"] else []

# ✅ Initialize BM25
bm25_model = BM25Okapi(bm25_corpus) if bm25_corpus else None


# 🔹 **Hybrid Search Function (With Chunk Text)**
def hybrid_search(query, top_k=5, alpha=0.5):
    """Perform Hybrid Retrieval using BM25 + Vector Search with Normalized Scores, including Chunk Text."""
    
    bm25_scores, bm25_results = [], []
    
    # ✅ **Vector Search Using ChromaDB**
    vector_results = vector_db.similarity_search_with_relevance_scores(query, k=top_k)

    # ✅ **BM25 Retrieval**
    if bm25_model:
        bm25_tokens = query.lower().split()
        bm25_raw_scores = bm25_model.get_scores(bm25_tokens)
        
        # Get top K BM25 results
        bm25_top_indices = np.argsort(bm25_raw_scores)[::-1][:top_k]
        
        for idx in bm25_top_indices:
            bm25_results.append((bm25_mapping[idx]["source"], bm25_raw_scores[idx], idx))  # Store index for lookup

    # ✅ **Normalize Scores for Fair Weighting**
    if bm25_results:
        bm25_scores = [score for _, score, _ in bm25_results]
        if max(bm25_scores) != min(bm25_scores):  # Prevent division by zero
            scaler = MinMaxScaler()
            bm25_scores = scaler.fit_transform(np.array(bm25_scores).reshape(-1, 1)).flatten()
        else:
            bm25_scores = [1.0] * len(bm25_scores)  # If all scores are the same, assign max

    vector_scores = [score for _, score in vector_results]
    if max(vector_scores) != min(vector_scores):  # Prevent division by zero
        scaler = MinMaxScaler()
        vector_scores = scaler.fit_transform(np.array(vector_scores).reshape(-1, 1)).flatten()
    else:
        vector_scores = [1.0] * len(vector_scores)

    # ✅ **Combine BM25 + Vector Scores using α Weighting**
    final_results = []
    for i, (bm25_doc, bm25_score, bm25_idx) in enumerate(zip(bm25_results, bm25_scores, bm25_top_indices)):
        # 🔹 **Retrieve the correct chunk text**
        bm25_chunk_text = stored_docs["documents"][bm25_idx] if bm25_idx < len(stored_docs["documents"]) else "⚠️ Chunk text not found"
        weighted_score = (alpha * vector_scores[i]) + ((1 - alpha) * bm25_score)
        final_results.append((bm25_doc[0], bm25_chunk_text, weighted_score))

    for i, (vec_doc, vec_score) in enumerate(vector_results):
        weighted_score = (alpha * vec_score) + ((1 - alpha) * bm25_scores[i])  # Reverse mix
        final_results.append((vec_doc.metadata["source"], vec_doc.page_content, weighted_score))

    # ✅ **Sort Final Results by Weighted Score**
    final_results = sorted(final_results, key=lambda x: x[2], reverse=True)[:top_k]

    return final_results


# ✅ **Test Hybrid Search**
query = "What is the compiler?"
results = hybrid_search(query, top_k=6, alpha=0.6)  # Favor Embeddings slightly

# 🔹 **Display Results (Including Chunk Text)**
for idx, (doc, chunk_text, score) in enumerate(results, 1):
    print(f"🔹 Result {idx} (Score: {score:.4f}):")
    print(f"📌 **Source:** {doc}")
    print(f"📝 **Chunk Content:**\n{chunk_text}...")  # Show first 500 characters
    print("-" * 100)



🔹 Result 1 (Score: 1.0000):
📌 **Source:** react/learn/react-compiler.md
📝 **Chunk Content:**
title: React Compiler

Getting started with the compiler

Installing the compiler and ESLint plugin

Troubleshooting

The latest Beta release can be found with the @beta tag, and daily experimental releases with @experimental.

React Compiler is a new compiler that we've open sourced to get early feedback from the community. It is a build-time only tool that automatically optimizes your React app. It works with plain JavaScript, and understands the Rules of React, so you don't need to rewrite any code to use it.

The compiler also includes an ESLint plugin that surfaces the analysis from the compiler right in your editor. We strongly recommend everyone use the linter today. The linter does not require that you have the compiler installed, so you can use it even if you are not ready to try out the compiler.

The compiler is currently released as beta, and is available to try out on React 17+ app

In [52]:
import os
import json
from json.decoder import JSONDecodeError
from typing import List, Dict, Any, Tuple

import google.generativeai as genai
from langchain_google_genai import ChatGoogleGenerativeAI

###############################################################################
# 1) Gemini-based LLM Setup
###############################################################################
# Optional: If you rely on ADC or a service account, you may not need configure() or an API key explicitly.
# Otherwise:
# genai.configure(api_key="YOUR_API_KEY")

model = ChatGoogleGenerativeAI(
    model="gemini-2.0-flash",   # or "gemini-1.5-flash"
    temperature=0,
    max_tokens=8192,
    timeout=None,
    max_retries=5
)

###############################################################################
# 2) JSON-parsing utility
###############################################################################
def get_json_output(llm_client, messages: List[Dict], num_retries: int = 2) -> Any:
    """
    Invokes the LLM with a JSON-oriented request (messages) and attempts to
    parse the response. Retries if a JSONDecodeError occurs.
    Returns a Python dict/list if successful, or raises if repeated failure.
    """
    for attempt in range(num_retries):
        try:
            generation_config = {"response_mime_type": "application/json"}
            response = llm_client.invoke(messages, generation_config=generation_config)
            return json.loads(response.content)
        except JSONDecodeError as e:
            if attempt < num_retries - 1:
                print(f"JSON decoding failed (attempt {attempt+1}), retrying...\n{e}")
            else:
                raise e



###############################################################################
# STEP A: Answer + Follow-Ups + Citations (Strict JSON)
###############################################################################
def answer_question_with_followups(query: str, top_k: int = 6, alpha: float = 0.6) -> Dict[str, Any]:
    """
    1) Runs hybrid_search to get relevant chunks (full text, no truncation).
    2) Asks LLM for a structured answer, with "answer", "follow_up_questions", "citations".
    3) Must return strict JSON, disclaim if not found in docs.
    """

    # Retrieve relevant chunks
    search_results = hybrid_search(query, top_k=top_k, alpha=alpha)

    # Build context string
    chunk_blocks = []
    for (src, text, score) in search_results:
        chunk_blocks.append(f"--- [Source: {src}] ---\n{text}\n")
    context_str = "\n".join(chunk_blocks)

    # Refined prompt for clarity, disclaimers, style
    system_message = (
        "You are an advanced AI specialized in Apache Spark, Apache Kafka, and React. "
        "Use ONLY the provided internal documents (the 'Bible') for your response. "
        "If the docs do not cover the query, disclaim it.\n\n"
        "Return strict JSON with exactly these keys:\n"
        '  "answer": a structured factual answer,\n'
        '  "follow_up_questions": array of exactly 5 relevant next questions,\n'
        '  "citations": array of chunk sources used.\n'
        "Do not include any extra keys. If you cannot find info in the docs, say so in 'answer'.\n"
    )

    user_message = (
        f"User Question: {query}\n\n"
        "Relevant Context:\n"
        f"{context_str}\n\n"
        "Respond in JSON. Example:\n"
        "{\n"
        '  "answer": "...",\n'
        '  "follow_up_questions": ["Q1", "Q2", "Q3", "Q4", "Q5"],\n'
        '  "citations": ["source1", "source2"]\n'
        "}"
    )

    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_message},
    ]

    # LLM call
    try:
        parsed = get_json_output(model, messages, num_retries=2)
        if not isinstance(parsed, dict):
            # We can gracefully handle if the LLM returns something that's not a dict
            return {
                "error": "Invalid JSON or structure",
                "answer": None,
                "follow_up_questions": [],
                "citations": []
            }
        return parsed
    except Exception as e:
        return {
            "error": str(e),
            "answer": None,
            "follow_up_questions": [],
            "citations": []
        }

###############################################################################
# STEP B: Confidence Evaluation
###############################################################################
def evaluate_confidence(query: str, final_answer: str, context_str: str) -> float:
    """
    A second call: we show the question, final answer, and context,
    and ask the model to rate from 0.0 (no confidence) to 1.0 (absolute).
    Return that numeric score, or fallback to 1.0 if parsing fails.
    """

    system_message = (
        "You are a meticulous AI evaluator. You see the user's question, the final answer, "
        "and the same context (the 'Bible'). Rate how confident you are in the correctness "
        "and completeness of that final answer, from 0.0 to 1.0. If partially correct or missing data, "
        "lower the score. If the context fully supports it, raise it.\n\n"
        "Return valid JSON exactly like:\n"
        "{\n"
        '  "confidence_score": 0.82\n'
        "}"
    )

    user_message = (
        f"User Question: {query}\n\n"
        f"Final Answer: {final_answer}\n\n"
        "Relevant Context:\n"
        f"{context_str}\n\n"
        "Return your confidence_score in JSON, e.g. { \"confidence_score\": 0.85 }"
    )

    messages = [
        {"role": "system", "content": system_message},
        {"role": "user", "content": user_message}
    ]

    try:
        result = get_json_output(model, messages, num_retries=2)
        if (
            isinstance(result, dict)
            and "confidence_score" in result
            and isinstance(result["confidence_score"], (float, int))
        ):
            score = float(result["confidence_score"])
            # clamp to [0.0, 1.0] just in case
            return max(0.0, min(1.0, score))
        # fallback
        return 1.0
    except Exception:
        return 1.0

###############################################################################
# Single Orchestrator: Q&A + Confidence
###############################################################################
def answer_question_with_confidence(query: str, top_k: int = 6, alpha: float = 0.6) -> Dict[str, Any]:
    """
    1) Call `answer_question_with_followups` to get main content in strict JSON.
    2) Re-run the same search or store context internally for the confidence step.
    3) Combine them into one final dict: { answer, follow_up_questions, citations, confidence_score }.
    """

    # Step A: get the main QA result
    qa_result = answer_question_with_followups(query, top_k, alpha)

    if qa_result.get("error"):
        return qa_result  # If there's an error in step A, pass it along

    final_answer = qa_result.get("answer", "")

    # Gather context again or store from step A
    # For clarity here, we re-run hybrid_search to pass to evaluate_confidence
    sr = hybrid_search(query, top_k=top_k, alpha=alpha)
    cb = []
    for (src, text, s) in sr:
        cb.append(f"--- [Source: {src}] ---\n{text}\n")
    combined_context = "\n".join(cb)

    # Step B: confidence
    confidence_score = evaluate_confidence(query, final_answer, combined_context)

    # Merge everything
    return {
        "answer": final_answer,
        "follow_up_questions": qa_result.get("follow_up_questions", []),
        "citations": qa_result.get("citations", []),
        "confidence_score": confidence_score,
        # If there's no 'error' from step A, we can omit or set None
        "error": qa_result.get("error")
    }

###############################################################################
# Example usage
###############################################################################
if __name__ == "__main__":
    user_query = "How to secure my cluster?"
    result = answer_question_with_confidence(user_query, top_k=10)
    print("===== Final JSON Output =====")
    print(json.dumps(result, indent=2))


===== Final JSON Output =====
{
  "answer": "To secure your Spark cluster, it's important to enable security features, which are not enabled by default. Evaluate your environment and the deployment type you are using, and take appropriate measures. Here are some key areas to consider:\n\n*   **Authentication:** Spark supports authentication for RPC channels using a shared secret. Enable it by setting the `spark.authenticate` configuration parameter. The mechanism to generate and distribute the secret is deployment-specific. For YARN, Spark handles this automatically, relying on YARN RPC encryption. For Kubernetes, Spark also generates a unique authentication secret for each application, propagated to executor pods using environment variables. You can also mount authentication secrets using files and Kubernetes secrets.\n*   **Long-Running Applications:** For long-running applications, use a keytab or ticket cache for automatic token renewal to avoid issues with maximum delegation token

In [53]:
print(result.get('answer'))

To secure your Spark cluster, it's important to enable security features, which are not enabled by default. Evaluate your environment and the deployment type you are using, and take appropriate measures. Here are some key areas to consider:

*   **Authentication:** Spark supports authentication for RPC channels using a shared secret. Enable it by setting the `spark.authenticate` configuration parameter. The mechanism to generate and distribute the secret is deployment-specific. For YARN, Spark handles this automatically, relying on YARN RPC encryption. For Kubernetes, Spark also generates a unique authentication secret for each application, propagated to executor pods using environment variables. You can also mount authentication secrets using files and Kubernetes secrets.
*   **Long-Running Applications:** For long-running applications, use a keytab or ticket cache for automatic token renewal to avoid issues with maximum delegation token lifetime. This is supported on YARN and Kuberne

In [57]:
# retrieval.py
import os
import numpy as np
from rank_bm25 import BM25Okapi
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
from sklearn.preprocessing import MinMaxScaler
from dotenv import load_dotenv

# Path to your ChromaDB folder, built by your indexing script
CHROMA_DB_PATH = "./chroma_db"

# Load your OpenAI API key from env (or .env), e.g. OPENAI_API_KEY
openai_key = os.getenv("OPENAI_API_KEY")
print(openai_key)
# Initialize embeddings
embedding_model = OpenAIEmbeddings(
    model="text-embedding-ada-002",
    openai_api_key=openai_key
)

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


In [55]:
from qa_service import answer_question_with_confidence

query = "What is Kafka used for?"
response = answer_question_with_confidence(query, top_k=5)

import json
print(json.dumps(response, indent=2))


{
  "answer": "Kafka is used as a replacement for traditional message brokers, for website activity tracking, metrics, log aggregation, stream processing, event sourcing, and as a commit log for distributed systems.",
  "follow_up_questions": [
    "How does Kafka compare to traditional messaging systems like ActiveMQ or RabbitMQ?",
    "What is Kafka Streams and how does it facilitate stream processing?",
    "How does Kafka's partitioning work?",
    "What are the advantages of using Kafka for log aggregation over systems like Scribe or Flume?",
    "How does Kafka ensure data durability?"
  ],
  "citations": [
    "kafka/uses.html"
  ],
  "confidence_score": 0.95
}


In [56]:
from retrieval import hybrid_search

query = "How does React handle state?"
results = hybrid_search(query, top_k=5)

for res in results:
    print(res)


('react/learn/adding-interactivity.md', 'title: Adding Interactivity\n\nSome things on the screen update in response to user input. For example, clicking an image gallery switches the active image. In React, data that changes over time is called state. You can add state to any component, and update it as needed. In this chapter, you\'ll learn how to write components that handle interactions, update their state, and display different output over time.\n\nHow to handle user-initiated events\n\nHow to make components "remember" information with state\n\nHow React updates the UI in two phases\n\nWhy state doesn\'t update right after you change it\n\nHow to queue multiple state updates\n\nHow to update an object in state\n\nHow to update an array in state\n\nResponding to events {/responding-to-events/}\n\nReact lets you add event handlers to your JSX. Event handlers are your own functions that will be triggered in response to user interactions like clicking, hovering, focusing on form inpu

In [7]:
import google.generativeai as palm

# 1. Configure your API key
palm.configure(api_key="AIzaSyC5FUT2l7ApdCB19sE2i_ZxWb11BJkwubY")

# 2. Generate text with the 'generate_text' method
#    You must specify a valid model name (e.g. "models/text-bison-001")
response = palm.generate_text(
    model="models/text-bison-001",
    prompt="What can you do?",
    temperature=0.2,
)

# 3. Print the generated text from `response.result`
print("Generated Text:", response.result)

# 4. (Optional) Inspect other response fields
#    e.g. response.filters, response.citations, response.safety_attributes, etc.
print("Full Response:", response)


AttributeError: module 'google.generativeai' has no attribute 'generate_text'

In [8]:
!pip install --upgrade langchain-google-genai


Defaulting to user installation because normal site-packages is not writeable
Collecting langchain-google-genai
  Downloading langchain_google_genai-2.1.0-py3-none-any.whl.metadata (3.6 kB)
Collecting google-ai-generativelanguage<0.7.0,>=0.6.16 (from langchain-google-genai)
  Downloading google_ai_generativelanguage-0.6.17-py3-none-any.whl.metadata (9.8 kB)
Downloading langchain_google_genai-2.1.0-py3-none-any.whl (40 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.2/40.2 kB[0m [31m2.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading google_ai_generativelanguage-0.6.17-py3-none-any.whl (1.4 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m35.3 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25hInstalling collected packages: google-ai-generativelanguage, langchain-google-genai
  Attempting uninstall: google-ai-generativelanguage
    Found existing installation: google-ai-generativelanguage 0.6.15
    Uninstalling google-ai-generativel

In [24]:
print("\n🔍 **Retrieved Chunks for LLM:**\n")
for idx, (doc, score) in enumerate(hybrid_results, 1):
    print(f"🔹 **Result {idx} (Score: {score:.4f}):**")
    print(f"📌 **Source:** {doc.metadata.get('source', 'Unknown')}")
    print(f"📝 **Chunk Content:**\n{doc.page_content[:500]}...")  # Display first 500 characters
    print("-" * 100)



🔍 **Retrieved Chunks for LLM:**



NameError: name 'hybrid_results' is not defined

In [6]:
query = "What is kafka?"
results = vector_db.similarity_search(query, k=10)

for i, doc in enumerate(results, 1):
    print(f"🔹 Result {i}:\n{doc.page_content}\n")


🔹 Result 1:
Documentation

Kafka Streams

🔹 Result 2:
Documentation

Kafka Streams

🔹 Result 3:
Documentation

Kafka Streams

🔹 Result 4:
Documentation

Kafka Streams

🔹 Result 5:
Documentation

Kafka Streams

🔹 Result 6:
Documentation

Kafka Streams

🔹 Result 7:
Documentation

Kafka Streams

🔹 Result 8:
Stream Processing

Many users of Kafka process data in processing pipelines consisting of multiple stages, where raw input data is consumed from Kafka topics and then aggregated, enriched, or otherwise transformed into new topics for further consumption or follow-up processing. For example, a processing pipeline for recommending news articles might crawl article content from RSS feeds and publish it to an "articles" topic; further processing might normalize or deduplicate this content and publish the cleansed article content to a new topic; a final processing stage might attempt to recommend this content to users. Such processing pipelines create graphs of real-time data flows based on

In [7]:
query = "What is Kafka?"
results = vector_db.similarity_search_with_score(query, k=10)

for i, (doc, score) in enumerate(results, 1):
    citation = doc.metadata.get("source", "Unknown Source")
    print(f"🔹 Result {i} (Score: {score:.4f}):\n{doc.page_content}\n📌 Source: {citation}\n")


🔹 Result 1 (Score: 0.2975):
Documentation

Kafka Streams
📌 Source: kafka/streams/core-concepts.html

🔹 Result 2 (Score: 0.2975):
Documentation

Kafka Streams
📌 Source: kafka/streams/developer-guide/index.html

🔹 Result 3 (Score: 0.2975):
Documentation

Kafka Streams
📌 Source: kafka/streams/index.html

🔹 Result 4 (Score: 0.2975):
Documentation

Kafka Streams
📌 Source: kafka/streams/architecture.html

🔹 Result 5 (Score: 0.2975):
Documentation

Kafka Streams
📌 Source: kafka/streams/tutorial.html

🔹 Result 6 (Score: 0.2975):
Documentation

Kafka Streams
📌 Source: kafka/streams/upgrade-guide.html

🔹 Result 7 (Score: 0.2975):
Documentation

Kafka Streams
📌 Source: kafka/streams/quickstart.html

🔹 Result 8 (Score: 0.3138):
Stream Processing

Many users of Kafka process data in processing pipelines consisting of multiple stages, where raw input data is consumed from Kafka topics and then aggregated, enriched, or otherwise transformed into new topics for further consumption or follow-up process

🔹 Result 1:
Kafka itself

🔹 Result 2:
Apache Kafka is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Please read the Kafka documentation thoroughly before starting an integration using Spark.

At the moment, Spark requires Kafka 0.10 and higher. See Kafka 0.10 integration documentation for details.

🔹 Result 3:
Kafka Streams has a low barrier to entry: You can quickly write and run a small-scale proof-of-concept on a single machine; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently handles the load balancing of multiple instances of the same application by leveraging Kafka's parallelism model.

To learn more about Kafka Streams, visit the Kafka Streams page.

🔹 Result 4:
Many people use Kafka as a replacement for a log aggregation solution. Log aggregation typically collects physical log files off servers and puts them in