

# **Assignment: Long-Term Memory in AI — From RAG to Advanced Architectures**

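**Objective:** This assignment walks through several ways an AI system can store and use memory: retrieval-augmented generation (RAG), episodic interaction memory, a differentiable external-memory read/write, a shared blackboard for multi-agent coordination, and federated memory summaries.

- Uses a tiny embedded dataset of **SQuAD-style** question-answering records, small enough to run fully offline.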

> Complete code where marked **TODO**.



## Setup & Mini SQuAD-Style Dataset

We embed a **small educational set of records** modeled on the format of **SQuAD** (the Stanford Question Answering Dataset).  
Each record includes a short *context* paragraph, a *question*, and a *reference answer*.

We'll use this as our knowledge base for RAG and downstream memory demos.


In [1]:

# No internet access is required; this cell uses only the standard library and NumPy.
from typing import List, Dict, Tuple
import numpy as np
from pprint import pprint

# Tiny, self-contained set of SQuAD-style records for class practice (educational use).
# Each item: {"id": str, "context": str, "question": str, "answer": str}
SQUAD_MINI = [
    {
        "id": "1",
        "context": "Kafka is a distributed streaming platform used for building real-time data pipelines and streaming apps. It is horizontally scalable and fault-tolerant.",
        "question": "What is Kafka used for?",
        "answer": "building real-time data pipelines and streaming apps"
    },
    {
        "id": "2",
        "context": "Prometheus is an open-source monitoring system with a dimensional data model, flexible query language, and built-in alerting.",
        "question": "What kind of system is Prometheus?",
        "answer": "an open-source monitoring system"
    },
    {
        "id": "3",
        "context": "R-trees are tree data structures used for spatial access methods, allowing for efficient querying of geographical data like rectangles or polygons.",
        "question": "What are R-trees used for?",
        "answer": "spatial access methods and efficient querying of geographical data"
    },
    {
        "id": "4",
        "context": "Elastic Weight Consolidation is a technique in continual learning that reduces catastrophic forgetting by selectively slowing down learning on weights important to previous tasks.",
        "question": "What problem does Elastic Weight Consolidation address?",
        "answer": "catastrophic forgetting"
    },
    {
        "id": "5",
        "context": "A knowledge graph represents entities and their relationships, enabling structured reasoning and linking between facts.",
        "question": "What does a knowledge graph represent?",
        "answer": "entities and their relationships"
    }
]

print("Loaded mini SQuAD-style dataset with", len(SQUAD_MINI), "examples.")
pprint(SQUAD_MINI[0])


Loaded mini SQuAD-style dataset with 5 examples.
{'answer': 'building real-time data pipelines and streaming apps',
 'context': 'Kafka is a distributed streaming platform used for building '
            'real-time data pipelines and streaming apps. It is horizontally '
            'scalable and fault-tolerant.',
 'id': '1',
 'question': 'What is Kafka used for?'}



## 1) RAG (Retrieval-Augmented Generation) with TF-IDF (Foundational)

In a typical RAG pipeline:
1. **Embed or featurize** documents (here: TF-IDF for simplicity).
2. **Retrieve** the most relevant chunks for a user query.
3. **Augment** the model prompt/logic with those top chunks to produce an answer.

We'll implement a *minimal* RAG retriever using `sklearn`'s `TfidfVectorizer` (works offline).
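Step 3 (augmentation) is not implemented in the cell below, which covers retrieval only. A minimal sketch of how retrieved chunks might be packed into a prompt for a generator model, assuming a plain-text prompt format and a hypothetical `build_augmented_prompt` helper (no model is called here):

```python
from typing import Dict, List

def build_augmented_prompt(query: str, contexts: List[Dict], max_chars: int = 1000) -> str:
    """Assemble a grounded prompt from retrieved chunks (hypothetical helper).

    Each context dict is assumed to carry "id" and "context" keys, like the
    records returned by retrieve_contexts in this section.
    """
    lines = ["Answer the question using only the sources below. Cite source ids in brackets.", ""]
    used = 0
    for c in contexts:
        chunk = f"[{c['id']}] {c['context']}"
        if used + len(chunk) > max_chars:  # crude character budget standing in for a token limit
            break
        lines.append(chunk)
        used += len(chunk)
    lines += ["", f"Question: {query}", "Answer:"]
    return "\n".join(lines)

prompt = build_augmented_prompt(
    "What is Kafka used for?",
    [{"id": "1", "context": "Kafka is a distributed streaming platform used for building "
                            "real-time data pipelines and streaming apps."}],
)
print(prompt)
```

Tagging each chunk with its `id` is what makes a later citation step possible: the generator can echo `[1]` back in its answer.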


In [2]:

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Build a small corpus from the contexts.
corpus = [x["context"] for x in SQUAD_MINI]
ids = [x["id"] for x in SQUAD_MINI]

# Vectorize with TF-IDF (no internet or large models needed).
vectorizer = TfidfVectorizer(stop_words="english")
X = vectorizer.fit_transform(corpus)  # shape: (n_docs, n_terms)

def retrieve_contexts(query: str, k: int = 2) -> List[Dict]:
    """Return top-k contexts with cosine similarity scores."""
    q_vec = vectorizer.transform([query])
    sims = cosine_similarity(q_vec, X).ravel()  # similarity per document
    topk_idx = np.argsort(sims)[::-1][:k]
    results = []
    for idx in topk_idx:
        results.append({
            "id": ids[idx],
            "context": corpus[idx],
            "score": float(sims[idx])
        })
    return results

# Demo retrieval
demo = retrieve_contexts("How do I prevent catastrophic forgetting?", k=2)
demo


[{'id': '4',
  'context': 'Elastic Weight Consolidation is a technique in continual learning that reduces catastrophic forgetting by selectively slowing down learning on weights important to previous tasks.',
  'score': 0.33333333333333337},
 {'id': '5',
  'context': 'A knowledge graph represents entities and their relationships, enabling structured reasoning and linking between facts.',
  'score': 0.0}]


### **Exercise 1 (RAG):**
- **TODO:** Implement a function `rag_answer(query, k=2)` that:
  1) calls `retrieve_contexts(query, k)`,
  2) **builds a compact answer** by extracting the *most relevant phrase* (you can do simple heuristics like grabbing the sentence with highest overlap),
  3) returns both the answer string and the top contexts used.
- **Stretch:** Add a simple *citation* mechanism: include the `id` of the top-ranked context next to your answer.
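For the extraction step in item 2, one possible heuristic is sketched below: split a context into sentences and keep the one sharing the most lowercase word tokens with the query. The helper names `tokens` and `best_sentence` and the regex tokenizer are illustrative assumptions, not part of the starter code.

```python
import re
from typing import Set, Tuple

def tokens(text: str) -> Set[str]:
    """Lowercase alphanumeric word tokens; a deliberately simple tokenizer."""
    return set(re.findall(r"[a-z0-9]+", text.lower()))

def best_sentence(query: str, context: str) -> Tuple[str, int]:
    """Return the context sentence with the highest token overlap, plus that overlap."""
    q = tokens(query)
    sentences = [s.strip() for s in context.split(".") if s.strip()]
    if not sentences:
        return "", 0
    scored = [(len(q & tokens(s)), s) for s in sentences]
    overlap, sentence = max(scored, key=lambda pair: pair[0])  # ties keep the earliest sentence
    return sentence, overlap

ctx = ("Kafka is a distributed streaming platform used for building real-time data "
       "pipelines and streaming apps. It is horizontally scalable and fault-tolerant.")
print(best_sentence("What is Kafka used for?", ctx))
```

Stop words such as "is" and "for" inflate the overlap count here; filtering them, as the TF-IDF vectorizer does with `stop_words="english"`, is a natural refinement.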


In [3]:

# ========== YOUR TURN ==========
# TODO: Write rag_answer(query, k=2) below.
# Hints:
# - You can split context into sentences using simple `.split('.')`.
# - Score sentences by token overlap with the query (very simple, but OK for now).
# - Return {"answer": str, "used_contexts": List[Dict]}
def rag_answer(query: str, k: int = 2):
    raise NotImplementedError("Implement me")  # TODO

# Test (uncomment when implemented)
# print(rag_answer("What prevents forgetting in continual learning?", k=2))



## 2) Episodic Interaction Memory (Persistent Across Sessions)

We'll implement a minimal **episodic memory store** that keeps **user-specific** interactions.
- `remember_episode(user_id, text, topic)` writes to memory.
- `recall_episode(user_id, topic, k)` retrieves recent episodes relevant to a topic.

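This lets an assistant *carry context forward* across sessions. In this notebook the store lives in memory, so "sessions" means separate calls within one running kernel.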


In [4]:

from collections import defaultdict, deque
from datetime import datetime

EPISODES = defaultdict(lambda: deque(maxlen=500))  # user_id -> deque of episodes

def remember_episode(user_id: str, text: str, topic: str):
    EPISODES[user_id].append({
        "ts": datetime.utcnow(),
        "topic": topic.lower(),
        "text": text
    })

def recall_episode(user_id: str, topic: str, k: int = 3):
    topic = topic.lower()
    matches = [e for e in reversed(EPISODES[user_id]) if e["topic"] == topic]
    if len(matches) < k:  # backfill with recent mixed topics
        backfill = [e for e in reversed(EPISODES[user_id]) if e not in matches]
        matches.extend(backfill)
    return matches[:k]

# Demo writes
remember_episode("u1", "Configured JMX for Kafka brokers", "observability")
remember_episode("u1", "Adjusted batch.size and linger.ms", "kafka-tuning")
remember_episode("u1", "Validated metrics in Prometheus", "observability")

recall_episode("u1", "observability", k=2)




[{'ts': datetime.datetime(2025, 11, 13, 16, 6, 12, 56524),
  'topic': 'observability',
  'text': 'Validated metrics in Prometheus'},
 {'ts': datetime.datetime(2025, 11, 13, 16, 6, 12, 56254),
  'topic': 'observability',
  'text': 'Configured JMX for Kafka brokers'}]
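To survive a kernel restart, the episodes have to be written somewhere durable. A minimal sketch using a JSON file, assuming the episode shape used above (`ts`, `topic`, `text`); the helper names `save_episodes`/`load_episodes` and the file path are hypothetical:

```python
import json
import os
import tempfile
from collections import defaultdict, deque
from datetime import datetime

MAXLEN = 500  # matches the deque size used for EPISODES

def save_episodes(store, path: str) -> None:
    """Write {user_id: [episode, ...]} to JSON, storing timestamps as ISO-8601 strings."""
    data = {
        user: [{**e, "ts": e["ts"].isoformat()} for e in episodes]
        for user, episodes in store.items()
    }
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f)

def load_episodes(path: str):
    """Rebuild the defaultdict-of-deques store; returns an empty store if the file is missing."""
    store = defaultdict(lambda: deque(maxlen=MAXLEN))
    if not os.path.exists(path):
        return store
    with open(path, encoding="utf-8") as f:
        data = json.load(f)
    for user, episodes in data.items():
        for e in episodes:
            store[user].append({**e, "ts": datetime.fromisoformat(e["ts"])})
    return store

# Round trip through a temporary file
demo_store = defaultdict(lambda: deque(maxlen=MAXLEN))
demo_store["u1"].append({"ts": datetime(2025, 1, 1, 12, 0), "topic": "observability",
                         "text": "Configured JMX"})
path = os.path.join(tempfile.mkdtemp(), "episodes.json")
save_episodes(demo_store, path)
restored = load_episodes(path)
print(list(restored["u1"]))
```

Calling `save_episodes(EPISODES, path)` at the end of a session and `EPISODES = load_episodes(path)` at the start of the next would give the cross-restart persistence the section title describes.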


### **Exercise 2 (Episodic):**
- **TODO:** Create a helper `episodic_rag(user_id, query)` that:
  1) retrieves contexts via **RAG**, and
  2) **prepends** the top episodic memory snippet (if topic overlaps with the query).
- **Tip:** You may define topic as the **top 1 retrieved doc's id** or based on simple keyword rules.
- **Goal:** Show how persistent episodes can *steer* retrieval or answers.
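For the keyword-rule option in the tip, a minimal sketch: map each episodic topic to a few trigger words and pick the topic with the most hits in the query. The `TOPIC_KEYWORDS` table and `infer_topic` helper are illustrative assumptions; the topics mirror the demo episodes above.

```python
import re
from typing import Dict, Optional, Set

TOPIC_KEYWORDS: Dict[str, Set[str]] = {
    "observability": {"metrics", "monitoring", "prometheus", "jmx", "alerts"},
    "kafka-tuning": {"batch", "linger", "throughput", "latency", "tuning"},
}

def infer_topic(query: str, rules: Dict[str, Set[str]] = TOPIC_KEYWORDS) -> Optional[str]:
    """Return the topic whose keywords appear most often in the query, or None if none match."""
    words = set(re.findall(r"[a-z0-9]+", query.lower()))
    hits = {topic: len(words & kws) for topic, kws in rules.items()}
    best = max(hits, key=hits.get)
    return best if hits[best] > 0 else None

print(infer_topic("How to expose Kafka metrics?"))  # observability
print(infer_topic("What is a knowledge graph?"))    # None
```

The returned topic can then be passed to `recall_episode(user_id, topic, k=1)`; a `None` result is a natural signal to skip the episodic prepend.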


In [5]:

# ========== YOUR TURN ==========
# TODO: Implement episodic_rag(user_id, query) returning a dict:
# {"answer": str, "episodic_used": str|None, "contexts": List[Dict]}
def episodic_rag(user_id: str, query: str):
    raise NotImplementedError("Implement me")  # TODO

# Test (after implementing)
# episodic_rag("u1", "How to expose Kafka metrics?" )



## 3) Memory-Augmented Reasoning (One Differentiable Read/Write)

We simulate a minimal **external memory** accessed by a neural model step.
- Content-based addressing (cosine attention) to **read**.
- Soft, differentiable **write** to attended slots.

> This is **not** full training code — just the memory operation primitive.
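Written out, the read and write performed by `memory_step` in the next cell are as follows, with memory rows $M_i$ ($i = 1,\dots,N$), key $k$, write value $v$, and write rate $\alpha$:

$$
\begin{aligned}
w_i &= \frac{\exp\big(\cos(M_i, k)\big)}{\sum_{j=1}^{N} \exp\big(\cos(M_j, k)\big)}, \\
r &= \sum_{i=1}^{N} w_i\, M_i, \\
M_i' &= (1 - \alpha w_i)\, M_i + \alpha w_i\, v \;=\; M_i + \alpha w_i\,(v - M_i).
\end{aligned}
$$

The last form shows the write is an interpolation: each row moves toward $v$ in proportion to its attention weight. Because cosine scores lie in $[-1, 1]$, the ratio between any two weights is at most $e^2 \approx 7.4$, so attention stays fairly diffuse; Neural Turing Machines sharpen it with a key-strength factor $\beta$ inside the exponent.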


In [6]:

import torch
import torch.nn.functional as F

def memory_step(M: torch.Tensor, key: torch.Tensor, write_val: torch.Tensor, alpha: float = 0.2):
    """
    M: [N, D]  memory matrix
    key: [D]   query key
    write_val: [D] value to write
    returns: read_vec [D], updated memory [N, D]
    """
    k = F.normalize(key, dim=0)
    M_norm = F.normalize(M, dim=1)
    attn = F.softmax((M_norm @ k), dim=0)  # [N]
    read_vec = attn @ M                    # [D]
    write = attn.unsqueeze(1) * write_val.unsqueeze(0)  # [N, D]
    M_new = M * (1 - alpha * attn.unsqueeze(1)) + alpha * write
    return read_vec, M_new

# Demo
torch.manual_seed(0)
M = torch.randn(8, 32)
key = torch.randn(32)
val = torch.randn(32)
r, M2 = memory_step(M, key, val)
r.shape, torch.allclose(M, M2)


(torch.Size([32]), False)


### **Exercise 3 (Memory Step):**
- **TODO:** Add **erase** capability:
  - Add an `erase_val` vector (shape `[D]`) and an `erase_rate` to a new function, `memory_step_erase`, that *reduces* memory content at attended locations before writing.
- **Question:** What happens if erase is too strong? Explain the trade-off.
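One common formulation is the erase-then-add write used by Neural Turing Machines: each attended cell is first scaled down by its attention weight times an erase vector with entries in [0, 1], then the new value is added. A minimal NumPy sketch under those assumptions; the function name `erase_then_add` is hypothetical, the attention weights `w` are taken as given rather than recomputed from a key, and the same arithmetic carries over to torch tensors:

```python
import numpy as np

def erase_then_add(M: np.ndarray, w: np.ndarray, erase_vec: np.ndarray,
                   add_vec: np.ndarray, erase_rate: float = 0.2,
                   alpha: float = 0.2) -> np.ndarray:
    """
    M: [N, D] memory; w: [N] attention weights summing to 1;
    erase_vec: [D] with entries in [0, 1]; add_vec: [D] value to write.
    """
    erase = erase_rate * np.outer(w, erase_vec)      # [N, D] fraction of each cell to erase
    M_erased = M * (1.0 - erase)                     # attended cells shrink toward 0
    return M_erased + alpha * np.outer(w, add_vec)   # then blend in the new content

M = np.ones((3, 4))
w = np.array([1.0, 0.0, 0.0])                        # attend only to slot 0
wiped = erase_then_add(M, w, np.ones(4), np.zeros(4), erase_rate=1.0)
print(wiped[0], wiped[1])                            # slot 0 fully erased, slot 1 untouched
```

The demo is the "too strong" case from the question: with `erase_rate=1.0` and an all-ones erase vector, whatever slot 0 held is gone before anything new is written.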


In [7]:

# ========== YOUR TURN ==========
# TODO: Re-implement memory_step with erase gate; name it memory_step_erase(...)
def memory_step_erase(M: torch.Tensor, key: torch.Tensor, write_val: torch.Tensor, erase_val: torch.Tensor, alpha: float = 0.2, erase_rate: float = 0.2):
    raise NotImplementedError("Implement erase-enhanced memory step")  # TODO



## 4) Coordination via Memory (Blackboard Pattern)

Agents coordinate by **reading and writing** to a shared blackboard:
- `planner` posts steps.
- `executor` completes steps and updates status.
- `validator` checks outputs.

We simulate the exchange with a few sequential calls (there is no real concurrency here).


In [8]:

BLACKBOARD = {"steps": [], "status": {}}

def post_step(step: str, by: str):
    BLACKBOARD["steps"].append({"step": step, "by": by, "done": False})

def execute_next(by: str):
    for item in BLACKBOARD["steps"]:
        if not item["done"]:
            # simulate work
            item["done"] = True
            BLACKBOARD["status"][item["step"]] = f"completed by {by}"
            return item["step"]
    return None

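def validate_all(by: str) -> bool:
    # `by` names the validator; this toy check does not use it.
    # Pass only if at least one step was posted and every posted step is done and recorded.
    steps = BLACKBOARD["steps"]
    return bool(steps) and all(
        item["done"] and item["step"] in BLACKBOARD["status"] for item in steps
    )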

# Demo
post_step("collect-metrics", "planner")
post_step("verify-alerts", "planner")
execute_next("executor")
execute_next("executor")
validate_all("validator"), BLACKBOARD


(True,
 {'steps': [{'step': 'collect-metrics', 'by': 'planner', 'done': True},
   {'step': 'verify-alerts', 'by': 'planner', 'done': True}],
  'status': {'collect-metrics': 'completed by executor',
   'verify-alerts': 'completed by executor'}})


### **Exercise 4 (Coordination):**
- **TODO:** Add a **moderation layer** that validates step names against a whitelist before posting (e.g., only `collect-metrics`, `verify-alerts` allowed).
- **Stretch:** Add a **priority** field so the executor picks highest priority first.
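A minimal sketch of both parts, written against its own dictionary rather than the global `BLACKBOARD` so it can run in isolation. The helper names `post_if_allowed` and `execute_highest`, and the convention that a larger `priority` number runs first, are assumptions:

```python
from typing import Dict, Optional, Set

def post_if_allowed(board: Dict, step: str, by: str, allowed: Set[str], priority: int = 0) -> bool:
    """Moderation layer: only whitelisted step names reach the board."""
    if step not in allowed:
        return False
    board["steps"].append({"step": step, "by": by, "done": False, "priority": priority})
    return True

def execute_highest(board: Dict, by: str) -> Optional[str]:
    """Complete the pending step with the highest priority (ties: earliest posted)."""
    pending = [item for item in board["steps"] if not item["done"]]
    if not pending:
        return None
    item = max(pending, key=lambda s: s.get("priority", 0))
    item["done"] = True
    board["status"][item["step"]] = f"completed by {by}"
    return item["step"]

board = {"steps": [], "status": {}}
allowed = {"collect-metrics", "verify-alerts"}
print(post_if_allowed(board, "collect-metrics", "planner", allowed, priority=1))  # True
print(post_if_allowed(board, "drop-database", "planner", allowed))                # False
print(post_if_allowed(board, "verify-alerts", "planner", allowed, priority=5))    # True
print(execute_highest(board, "executor"))                                          # verify-alerts
```

Rejecting at post time keeps disallowed steps off the shared board entirely, so the executor and validator never need to re-check names.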


In [9]:

# ========== YOUR TURN ==========
ALLOWED_STEPS = {"collect-metrics", "verify-alerts"}

# TODO: Wrap post_step with moderation: post_step_safe(step, by) -> bool (accepted?)
def post_step_safe(step: str, by: str) -> bool:
    raise NotImplementedError("Implement moderation and optional priority logic")  # TODO



## 5) Federated Memory (Edge Summaries → Central Aggregation)

Each edge node keeps **local memory** and periodically shares **summaries** (not raw data) with a central aggregator.
We simulate:
- Per-node **local_stats**
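- Summaries perturbed with **Laplace noise** in the spirit of differential privacy (the noise scale here is fixed, not calibrated to a privacy budget ε, so this is not a formal DP guarantee)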
- Central **aggregate** that fuses these summaries


In [10]:

rng = np.random.default_rng(42)

def laplace_noise(scale: float = 1.0):
    # Simple Laplace noise for DP-like behavior (educational only)
    u = rng.uniform(-0.5, 0.5)
    return -scale * np.sign(u) * np.log(1 - 2 * abs(u))

class EdgeNode:
    def __init__(self, name):
        self.name = name
        self.local_stats = {"queries": 0, "success": 0}
    def observe(self, ok: bool):
        self.local_stats["queries"] += 1
        self.local_stats["success"] += int(ok)
    def summarize(self, noise_scale: float = 0.5):
        # share noisy rate only
        q = max(1, self.local_stats["queries"])  # avoid div by zero
        rate = self.local_stats["success"]/q + laplace_noise(noise_scale)
        return {"node": self.name, "success_rate": float(rate)}

def central_aggregate(summaries: List[Dict]):
    # clamp and average
    rates = [min(1.0, max(0.0, s["success_rate"])) for s in summaries]
    return {"global_success_rate": float(np.mean(rates)), "n": len(rates)}

# Demo
e1, e2 = EdgeNode("edge-a"), EdgeNode("edge-b")
for _ in range(10): e1.observe(ok=True)
for _ in range(10): e2.observe(ok=(rng.random() > 0.3))
summ = [e1.summarize(), e2.summarize()]
central_aggregate(summ)


{'global_success_rate': 0.9252623517160539, 'n': 2}


### **Exercise 5 (Federated):**
- **TODO:** Extend `central_aggregate` to also compute a **weighted** global rate, weighting each node's rate by its **true query count**, which nodes share as an exact integer (no noise added to counts).
- **Discuss:** Trade-offs between noisy metrics vs. exact counts in privacy-preserving systems.
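A minimal sketch of the weighted variant, assuming each summary carries a noisy `success_rate` plus an exact integer `queries` count (the key names and the helper name `weighted_global_rate` are assumptions). Each clamped rate is weighted by its node's share of total queries:

```python
from typing import Dict, List

def weighted_global_rate(summaries: List[Dict]) -> Dict:
    """Query-count-weighted mean of clamped per-node success rates."""
    total = sum(s["queries"] for s in summaries)
    if total == 0:
        return {"global_success_rate_weighted": None, "total_queries": 0}
    weighted = sum(min(1.0, max(0.0, s["success_rate"])) * s["queries"] for s in summaries)
    return {"global_success_rate_weighted": weighted / total, "total_queries": total}

print(weighted_global_rate([
    {"node": "edge-a", "success_rate": 1.0, "queries": 10},
    {"node": "edge-b", "success_rate": 0.5, "queries": 30},
]))  # weighted rate 0.625 over 40 queries
```

An unweighted mean of the same two rates would be 0.75, so the busier node's lower rate pulls the weighted figure down. The exact counts, however, reveal each node's traffic volume, which is part of the trade-off the discussion question asks about.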


In [11]:

# ========== YOUR TURN ==========
# TODO: Modify summarize() to also return raw count, and central_aggregate() to compute weighted rate.
# You may create new functions summarize_with_count() and central_aggregate_weighted(...)
def summarize_with_count(node: EdgeNode, noise_scale: float = 0.5):
    raise NotImplementedError("Return dict with noisy rate and raw count")  # TODO

def central_aggregate_weighted(summaries: List[Dict]):
    raise NotImplementedError("Compute weighted success rate using counts")  # TODO



## 6) Putting It Together (Mini Workflow)

We combine:
- RAG retrieval
- Episodic priming
- Memory consolidation
- Telemetry (a trace record, which you add in Exercise 6)

Run this end-to-end to see interactions.



### **Exercise 6 (Integration):**
- **TODO:** Modify `mini_assistant` to:
  - Use your `rag_answer` from Exercise 1 instead of returning raw context.
  - Include a citation (top context `id`) in the final string.
  - Log a simple **trace record** that lists which components were used (episodic, rag, consolidation).
- **Reflection:** What additional safeguards or governance would you add for a production system?
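For the trace record, a minimal sketch that appends one JSON-serializable dict per call to an in-memory list. The `TRACE_LOG` name, the field names, and the component flags are assumptions; a production system would more likely send these records to a logging or tracing backend:

```python
import json
from datetime import datetime, timezone
from typing import Dict, List, Optional

TRACE_LOG: List[Dict] = []

def log_trace(user_id: str, query: str, used: Dict[str, bool],
              top_context_id: Optional[str] = None) -> Dict:
    """Record which memory components contributed to one assistant turn."""
    record = {
        "ts": datetime.now(timezone.utc).isoformat(),
        "user_id": user_id,
        "query": query,
        "components": sorted(name for name, flag in used.items() if flag),
        "citation": top_context_id,
    }
    TRACE_LOG.append(record)
    return record

rec = log_trace("u1", "How do I expose Kafka metrics to Prometheus?",
                {"episodic": True, "rag": True, "consolidation": False}, top_context_id="2")
print(json.dumps(rec, indent=2))
```

Storing timestamps as ISO-8601 strings keeps every record JSON-serializable, so the log can be dumped to a file or shipped elsewhere without conversion.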


In [12]:

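from collections import OrderedDict

# Minimal placeholder helpers (an assumption): mini_assistant calls put_item,
# get_item and consolidate_memory, which no earlier cell defines.
# "Consolidation" here just evicts least-recently-used keys beyond a cap.
MEMORY_STORE = OrderedDict()

def put_item(key: str, value: dict):
    MEMORY_STORE[key] = value
    MEMORY_STORE.move_to_end(key)  # newest / most recently used goes last

def get_item(key: str):
    if key in MEMORY_STORE:
        MEMORY_STORE.move_to_end(key)  # reading also refreshes recency
    return MEMORY_STORE.get(key)

def consolidate_memory(max_items: int = 100):
    while len(MEMORY_STORE) > max_items:
        MEMORY_STORE.popitem(last=False)  # drop the least recently used key

def mini_assistant(user_id: str, query: str):
    # 1) episodic prime
    episodes = recall_episode(user_id, topic="observability", k=1)  # as an example
    epi_text = episodes[0]["text"] if episodes else None

    # 2) RAG
    contexts = retrieve_contexts(query, k=2)

    # 3) naive answer (just return top context for demo)
    answer = contexts[0]["context"] if contexts else "No context found."

    # 4) consolidate a synthetic key for demonstration
    put_item("last_query_topic", {"query": query, "contexts": [c["id"] for c in contexts]})
    _ = get_item("last_query_topic")
    consolidate_memory()

    return {"episodic": epi_text, "answer": answer, "contexts": contexts}

# Demo
mini_assistant("u1", "How do I expose Kafka metrics to Prometheus?")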

### **Exercise 7 (Review questions):**

- How does pruning affect personalization?
- When should user consent matter most for stored facts? Why?
- What are the risks if an AI agent never forgets anything?


_Write your answers here_