In [2]:
import asyncio
import websockets
import redis
import os


# LangChain, FAISS, and text splitter imports
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings  # Using this as a placeholder for Gemini embedding.
from langchain.vectorstores import FAISS

# Google Generative AI (Gemini) import
import google.generativeai as genai


  from .autonotebook import tqdm as notebook_tqdm


In [3]:
import nest_asyncio

nest_asyncio.apply()

In [4]:
# Configure your API key for Gemini
os.environ['GOOGLE_API_KEY'] = "AIzaSyCPjR5u_-p8mqcjyujpLazP7TTwkr521eY"  # Replace with your actual API key.
genai.configure(api_key=os.environ['GOOGLE_API_KEY'])

In [20]:
# Connect to Redis (adjust host/port if needed)
redis_client = redis.Redis(host='localhost', port=6379, db=0)

def fetch_f1_data():
    # Try Redis first
    data = redis_client.get("f1:current")

    if data:
        print("Fetched data from Redis")
        return data

    # If Redis has no data, fallback to external API
    try:
        response = requests.get("https://f1connectapi.vercel.app/api/current/fp1?limit=5")
        if response.status_code == 200:
            data = response.text
            redis_client.set("f1:current", data, ex=300)  # cache for 5 min
            print("Fetched data from API and cached in Redis")
            return data
        else:
            print("API error:", response.status_code)
            return "Error fetching F1 data."
    except Exception as e:
        print("Exception while calling API:", e)
        return "API call failed."


In [6]:
def process_data_with_langchain(data):
    """
    Use LangChain’s RecursiveCharacterTextSplitter to divide the raw F1 data
    into manageable chunks for later embedding.
    """
    splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=50)
    chunks = splitter.split_text(data)
    return chunks


In [7]:
def store_embeddings_in_faiss(chunks):
    """
    Create a FAISS index from text chunks. We use HuggingFaceEmbeddings here as a placeholder.
    In your actual implementation, replace this with a Gemini 2.0 Flash embedding call if available.
    """
    # Instantiate an embeddings model (placeholder—replace if you have a Gemini embedding API)
    hf_embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    
    # Build the FAISS vector store from the chunks
    vector_store = FAISS.from_texts(chunks, hf_embeddings)
    
    # Optionally, save the index locally for reuse
    vector_store.save_local("faiss_index")
    return vector_store


In [8]:
def query_rag(vector_store, question):
    """
    Given a user query, perform a similarity search on the FAISS index and use the Gemini model to generate an answer.
    """
    # Retrieve top 3 most relevant document chunks
    retrieved_docs = vector_store.similarity_search(question, k=3)
    
    # Combine the retrieved documents’ content into a single context string
    context = "\n".join([doc.page_content for doc in retrieved_docs])
    
    # Build a structured prompt
    prompt = (
        "Answer this F1-related question in one or two sentences. Use only the context provided.\n\n"
        f"Context: {context}\n\n"
        f"Question: {question}\n\n"
        "Answer:"
    )
    
    # Use Gemini 2.0 Flash generative model to generate the response
    model = genai.GenerativeModel("gemini-2.0-flash")
    response = model.generate_content(prompt)
    
    if response.text:
        return response.text.strip()
    else:
        return "I'm sorry, I could not generate an answer at this time."


In [None]:
async def handler(websocket, path):
    """
    WebSocket handler that:
      1. Receives a query from the client.
      2. Fetches the latest F1 data from Redis.
      3. Processes the data into chunks.
      4. Builds/updates the FAISS index.
      5. Retrieves context and generates an answer.
      6. Sends the answer back to the client.
    """
    while True:
        try:
            # Step 6.1: Receive a user query via WebSocket
            user_query = await websocket.recv()
            print(f"Received query: {user_query}")

            # Step 6.2: Fetch current F1 data from Redis
            f1_data = fetch_f1_data()
            
            # Step 6.3: Process data into text chunks
            chunks = process_data_with_langchain(f1_data)
            
            # Step 6.4: Build a FAISS vector store from these chunks
            vector_store = store_embeddings_in_faiss(chunks)
            
            # Step 6.5: Query the RAG retriever and generate an answer
            answer = query_rag(vector_store, user_query)
            print(f"Generated answer: {answer}")
            
            # Step 6.6: Send the answer back via WebSocket
            await websocket.send(answer)
        except websockets.ConnectionClosed:
            print("Connection closed by client")
            break

def run_websocket_server():
    """
    Start the WebSocket server on localhost, port 5050.
    """
    server = websockets.serve(handler, "localhost",5050)
    print("WebSocket server started on ws://localhost:5050")
    asyncio.get_event_loop().run_until_complete(server)
    asyncio.get_event_loop().run_forever()

if __name__ == "__main__":
    run_websocket_server()


WebSocket server started on ws://localhost:5000


OSError: [Errno 10048] error while attempting to bind on address ('127.0.0.1', 5000): [winerror 10048] only one usage of each socket address (protocol/network address/port) is normally permitted

In [22]:
data = fetch_f1_data()
print(data)


Exception while calling API: name 'requests' is not defined
API call failed.
