Multi-user RAG pipeline with archive support using FAISS + OpenAI embeddings

In [None]:
!pip install openai faiss-cpu numpy json5

In [1]:
import faiss
import numpy as np

EMBED_MODEL = "text-embedding-ada-002"
EMBED_DIM = 1536

# Global dictionary to store user-specific FAISS indices
user_indexes = {}


FAISS

In [2]:
def get_user_index(user_id: str):
    """
    Returns (or creates) a FAISS index & stored_facts list for the given user_id.
    """
    if user_id not in user_indexes:
        user_indexes[user_id] = {
            "index": faiss.IndexFlatL2(EMBED_DIM),
            "stored_facts": []  # List of tuples: (fact_text, fact_id)
        }
    return user_indexes[user_id]

def embed_text(text: str) -> np.ndarray:
    """
    Calls OpenAI to embed text and returns a numpy array.
    """
    response = client.embeddings.create(
        model=EMBED_MODEL,
        input=[text]  
    )
    emb = response.data[0].embedding
    return np.array(emb, dtype=np.float32)


Process Memory Updates with time

In [3]:
import time 

def process_memory_updates(user_id: str, updated_mem: dict):
    """
    Processes memory updates and assigns timestamps to new facts.
    """
    if "memories" not in updated_mem:
        return

    current_time = time.time()  # Timestamp for new session

    for mem_item in updated_mem["memories"]:
        evt = mem_item.get("event", "")
        cat = mem_item.get("category", "")
        content = mem_item.get("content", "")
        fact_id = mem_item.get("id", "")
        fact_text = f"{cat}: {content}"

        if evt in ("ADD", "UPDATE"):
            add_fact(user_id, fact_text, fact_id, current_time) 

def add_fact(user_id: str, fact_text: str, fact_id: str, timestamp: float):
    """
    Adds a timestamp when storing facts in FAISS for tracking recent sessions.
    """
    user_data = get_user_index(user_id)
    vec = embed_text(fact_text).reshape(1, -1)
    user_data["index"].add(vec)
    user_data["stored_facts"].append((fact_text, fact_id, timestamp))  


Retrieve Long-Term Facts

In [4]:
def retrieve_long_term_facts(user_id: str):
    """
    Retrieves ALL stored facts, ensures no duplicates, and prioritizes the most recent session.
    """
    user_data = get_user_index(user_id)
    if user_data["index"].ntotal == 0:
        return {}

    # Retrieve ALL stored facts sorted by newest first
    indexed_facts = sorted(user_data["stored_facts"], key=lambda x: x[2], reverse=True)  

    categorized_facts = {"Recent Session": [], "Long-Term": []}
    seen_facts = set()  # unique

    # Assign most recent session to 'local_evidence'
    if indexed_facts:
        recent_fact = indexed_facts[0][0]
        if recent_fact not in seen_facts:
            categorized_facts["Recent Session"].append(recent_fact)
            seen_facts.add(recent_fact)

    # Assign all other to 'long_term_evidence' 
    for fact_text, fact_id, timestamp in indexed_facts[1:]:  
        if fact_text not in seen_facts:
            categorized_facts["Long-Term"].append(fact_text)
            seen_facts.add(fact_text)

    return categorized_facts


Generate GPT Response Based on Multi-Session Memory

In [5]:
from openai import OpenAI
import json

client = OpenAI(api_key="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")  # Replace with your actual key

def generate_long_term_json_response(user_id: str, user_query: str):
    """
    Generates a structured long-term memory response using OpenAI API.
    """
    retrieved_facts = retrieve_long_term_facts(user_id)  # Retrieves ALL stored facts

    local_evidence = retrieved_facts.get("Recent Session", [])
    long_term_evidence = retrieved_facts.get("Long-Term", [])

    json_template = {
        "question": user_query,
        "local_evidence": local_evidence,
        "long_term_evidence": long_term_evidence,
        "final_answer": "..."
    }

    rag_prompt = f"""
    You are an AI counselor summarizing user experiences across multiple conversations.
    
    ## Local Evidence (most recent session, prioritize this)
    {json.dumps(local_evidence, indent=4)}

    ## Long-Term Evidence (all past sessions, do not ignore)
    {json.dumps(long_term_evidence, indent=4)}

    **INSTRUCTIONS:**
    1️⃣ **You MUST consider all evidence from past sessions, but prioritize recent session first.**  
    2️⃣ **Do NOT say 'evidence unavailable' if past conversations exist.**  
    3️⃣ **Answer in a structured and concise manner.**  

    Now, use the retrieved facts to answer: "{user_query}".

    **STRICT OUTPUT FORMAT:**
    ```json
    {json.dumps(json_template, indent=4)}
    ```
    **Do NOT include explanations, preambles, or extra text.**
    """

    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "system", "content": rag_prompt}],
            response_format={"type": "json_object"},  
            max_tokens=1024,
            temperature=0.7
        )

        # Extract JSON content from GPT response
        structured_output = response.choices[0].message.content  

        # Convert from string to JSON object
        return json.loads(structured_output)

    except json.JSONDecodeError:
        print("❌ ERROR: GPT failed to return valid JSON!")
        return None  


Demo Test for Long-Term Memory

In [7]:
def simulate_real_conversations():
    user_id = "user_7"

    # First session (conversation)
    session_1 = {"memories": [
        {"id": "uuid-1", "content": "Maya, 2nd-year Biochemistry student", "category": "Demographics", "event": "ADD"},
        {"id": "uuid-2", "content": "Feels anxious about grades and self-worth", "category": "Mental State", "event": "ADD"}
    ]}
    process_memory_updates(user_id, session_1)
    print("[Session 1 Processed]")

    # Second 
    session_2 = {"memories": [
        {"id": "uuid-3", "content": "Parents expect her to go to medical school, but she is unsure", "category": "Relationships", "event": "ADD"}
    ]}
    process_memory_updates(user_id, session_2)
    print("[Session 2 Processed]")

    # Third 
    session_3 = {"memories": [
        {"id": "uuid-4", "content": "Compares herself to her cousin who is excelling in medicine", "category": "Relationships", "event": "ADD"}
    ]}
    process_memory_updates(user_id, session_3)
    print("[Session 3 Processed]")

    # latest
    session_4 = {"memories": [
        {"id": "uuid-5", "content": "She started writing down her achievements to feel more grounded", "category": "Behavioral Patterns", "event": "ADD"}
    ]}
    process_memory_updates(user_id, session_4)
    print("[Session 4 Processed]")

    # Ask a long-term memory question
    long_term_question = "How did Maya's feelings of inadequacy manifest before she sought support, and what specific comparison prompted her to feel this way?"

    # Retrieve JSON output
    final_output = generate_long_term_json_response(user_id, long_term_question)


    print("[FINAL LONG-TERM MEMORY RESPONSE]:\n")
    print(json.dumps(final_output, indent=4))

simulate_real_conversations()


[Session 1 Processed]
[Session 2 Processed]
[Session 3 Processed]
[Session 4 Processed]
[FINAL LONG-TERM MEMORY RESPONSE]:

{
    "question": "How did Maya's feelings of inadequacy manifest before she sought support, and what specific comparison prompted her to feel this way?",
    "local_evidence": [
        "Behavioral Patterns: She started writing down her achievements to feel more grounded"
    ],
    "long_term_evidence": [
        "Relationships: Compares herself to her cousin who is excelling in medicine",
        "Relationships: Parents expect her to go to medical school, but she is unsure",
        "Demographics: Maya, 2nd-year Biochemistry student",
        "Mental State: Feels anxious about grades and self-worth"
    ],
    "final_answer": "Maya's feelings of inadequacy manifested in her anxiety about grades and self-worth, leading her to start writing down her achievements to feel more grounded. The specific comparison that prompted her feelings of inadequacy was her cousin

Memory Extraction (ensure json format/without updating)

In [8]:
def extract_memories_from_conversation(conversation: str, session_name: str, user_id: str):
    """
    Extracts structured memory facts from a conversation session and immediately stores them in FAISS.
    """
    extraction_prompt = f"""
    You are an AI assistant that extracts structured memory facts from conversations.
    
    ## Conversation Transcript:
    {conversation}

    ## Task:
    Extract facts about the student using these categories:
    
    1️⃣ **Demographics**: Name, age, major, university year, cultural background  
    2️⃣ **Relationships**: Family dynamics, romantic status, peer conflicts, social support  
    3️⃣ **Mental State**: Persistent emotions (anxiety, loneliness), self-perception, stress triggers  
    4️⃣ **Academic Context**: Course workload, GPA pressure, career aspirations  
    5️⃣ **Behavioral Patterns**: Sleep habits, coping mechanisms, health routines  
    6️⃣ **Life Events**: Transitions, trauma, upcoming challenges  
    7️⃣ **Resources**: Support systems, positive hobbies, personal strengths  

    ## STRICT OUTPUT FORMAT:
    ```json
    {{
        "session": "{session_name}",
        "memories": [
            {{"id": "uuid-1", "content": "...", "category": "Demographics", "event": "ADD"}},
            {{"id": "uuid-2", "content": "...", "category": "Mental State", "event": "ADD"}}
        ]
    }}
    ```

    **DO NOT INCLUDE explanations, preambles, or extra text.**
    """

    try:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "system", "content": extraction_prompt}],
            response_format={"type": "json_object"},  
            max_tokens=512,
            temperature=0.5
        )

        extracted_memory = response.choices[0].message.content
        extracted_memory_json = json.loads(extracted_memory)  # Convert JSON string to Python dict

        # Send extracted memory directly to FAISS
        process_memory_updates(user_id, extracted_memory_json)

        print(f"[Memory Stored] Extracted {len(extracted_memory_json['memories'])} facts for {user_id}")

        return extracted_memory_json  

    except json.JSONDecodeError:
        print(f"❌ ERROR: GPT failed to return valid JSON for {session_name}!")
        return None  


Test

In [9]:
test_conversation = """
Student: Hi, I'm Alex, a 3rd-year Computer Science student. I’ve been feeling really anxious about my workload.
Counselor: That sounds tough, Alex. What’s making it feel overwhelming?
Student: I have two major projects due next week, and I’m struggling to stay focused.
Counselor: That’s a lot to handle. Have you found any strategies that help?
Student: Not really. I tend to procrastinate, which makes it worse.
Counselor: That makes sense. Have external pressures contributed to this stress?
Student: Yeah, my parents expect me to maintain a high GPA, and I don’t want to let them down.
"""

# Extract & store memory for Alex
user_id = "user_alex"
extracted_memories = extract_memories_from_conversation(test_conversation, "Session_Alex_1", user_id)


[Memory Stored] Extracted 5 facts for user_alex


In [10]:
# Retrieve stored facts for Alex
retrieved_facts = retrieve_long_term_facts(user_id)

print("[Retrieved Memory for Alex]:\n")
print(json.dumps(retrieved_facts, indent=4))


[Retrieved Memory for Alex]:

{
    "Recent Session": [
        "Demographics: Name: Alex, Major: Computer Science, University Year: 3rd year"
    ],
    "Long-Term": [
        "Mental State: Feeling anxious about workload",
        "Behavioral Patterns: Struggling to stay focused and tends to procrastinate",
        "Academic Context: Two major projects due next week",
        "Relationships: Parents expect a high GPA"
    ]
}


In [11]:
long_term_question = "How has Alex’s academic stress evolved over time, and what factors contributed to it?"

final_output = generate_long_term_json_response(user_id, long_term_question)

print("[FINAL LONG-TERM MEMORY RESPONSE]:\n")
print(json.dumps(final_output, indent=4))


[FINAL LONG-TERM MEMORY RESPONSE]:

{
    "question": "How has Alex\u0019s academic stress evolved over time, and what factors contributed to it?",
    "local_evidence": [
        "Demographics: Name: Alex, Major: Computer Science, University Year: 3rd year"
    ],
    "long_term_evidence": [
        "Mental State: Feeling anxious about workload",
        "Behavioral Patterns: Struggling to stay focused and tends to procrastinate",
        "Academic Context: Two major projects due next week",
        "Relationships: Parents expect a high GPA"
    ],
    "final_answer": "Alex's academic stress has been influenced by a combination of workload anxiety, difficulty in maintaining focus, and a tendency to procrastinate. The stress is exacerbated by the immediate pressure of two major projects due next week and the high GPA expectations from parents."
}
