# Minimal RAG / Vector Search and Security Data Integration

This notebook demonstrates:

1) A minimal Retrieval-Augmented Generation (RAG) pipeline using a from-scratch TF-IDF index and cosine similarity over a small payments corpus.

2) A security data integration workflow:
   - Load and analyze synthetic authentication logs.
   - Detect suspicious behavior.
   - Apply the same TF-IDF retrieval method to a separate runbook index to retrieve relevant steps.
   - Compose a response using retrieved context (RAG-style).


In [1]:
# Minimal, dependency-light setup
import re
import math
import json
from typing import List, Dict, Tuple
from collections import Counter, defaultdict
from datetime import datetime, timedelta

import numpy as np
import pandas as pd

np.set_printoptions(suppress=True, precision=4)

# Compile the token pattern once at module level rather than inside the function
TOKEN_RE = re.compile(r"[A-Za-z0-9_]+")

def tokenize(text: str) -> List[str]:
    return [t.lower() for t in TOKEN_RE.findall(text or "")]

def sentence_split(text: str) -> List[str]:
    parts = re.split(r'(?<=[.!?])\s+', (text or "").strip())
    return [p.strip() for p in parts if p.strip()]

print("Environment ready.")


Environment ready.


## Part A: Payments Knowledge Base

We synthesize a small corpus about payments platforms, chunk it by sentence, and build a TF-IDF vector index from scratch for retrieval.
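
As a quick illustration of sentence-level chunking, the sketch below repeats the `sentence_split` rule from the setup cell on a made-up string. The sample text is hypothetical. It shows that the splitter breaks after any `.`, `!`, or `?` followed by whitespace, so an abbreviation such as "e.g." produces an extra chunk.

```python
import re

def sentence_split(text):
    # Same rule as the setup cell: split on whitespace that follows ., !, or ?
    parts = re.split(r'(?<=[.!?])\s+', (text or "").strip())
    return [p.strip() for p in parts if p.strip()]

sample = "Fees vary by region, e.g. FX spreads. Refunds are free!"  # hypothetical text
pieces = sentence_split(sample)
for i, piece in enumerate(pieces, start=1):
    print(f"c{i}: {piece}")
```

The corpus in this notebook avoids such abbreviations, so each chunk there corresponds to one full sentence.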


In [2]:
docs = [
    {
        "id": "doc1",
        "title": "Payments Gateway Basics",
        "text": (
            "A payments gateway authorizes, captures, and settles transactions. "
            "It communicates with acquiring banks, card networks, and issuing banks. "
            "Key flows include authorization, capture, refund, and void. "
            "PCI compliance and tokenization ensure that sensitive card data is never directly stored."
        ),
    },
    {
        "id": "doc2",
        "title": "Fraud and Risk",
        "text": (
            "Fraud detection in payments involves device fingerprinting, velocity checks, and risk scoring. "
            "Rules can block transactions based on geolocation, BIN ranges, or mismatched AVS and CVV. "
            "Machine learning models can be layered on top of rules to reduce false positives."
        ),
    },
    {
        "id": "doc3",
        "title": "Ledger and Reconciliation",
        "text": (
            "A double-entry ledger records debits and credits for all money movements. "
            "Reconciliation aligns processor reports with internal records to detect discrepancies. "
            "Payouts aggregate settled funds and transfer balances to merchant bank accounts on a schedule."
        ),
    },
    {
        "id": "doc4",
        "title": "Checkout UX and Tokenization",
        "text": (
            "Hosted fields and iframes isolate card inputs to reduce PCI scope. "
            "Tokenization replaces primary account numbers with opaque tokens. "
            "A smooth checkout can improve conversion, support digital wallets, and provide client-side validation."
        ),
    },
    {
        "id": "doc5",
        "title": "Fees and FX",
        "text": (
            "Payment processing revenue commonly uses a percent plus fixed fee per transaction. "
            "Cross-border payments may involve foreign exchange spreads and scheme fees. "
            "Providers can also monetize value-added services like chargeback protection and analytics."
        ),
    },
]

chunks = []
for d in docs:
    for i, sent in enumerate(sentence_split(d["text"])):
        chunks.append({
            "doc_id": d["id"],
            "title": d["title"],
            "chunk_id": f"{d['id']}_c{i+1}",
            "text": sent
        })

len(chunks), chunks[:3]


(16,
 [{'doc_id': 'doc1',
   'title': 'Payments Gateway Basics',
   'chunk_id': 'doc1_c1',
   'text': 'A payments gateway authorizes, captures, and settles transactions.'},
  {'doc_id': 'doc1',
   'title': 'Payments Gateway Basics',
   'chunk_id': 'doc1_c2',
   'text': 'It communicates with acquiring banks, card networks, and issuing banks.'},
  {'doc_id': 'doc1',
   'title': 'Payments Gateway Basics',
   'chunk_id': 'doc1_c3',
   'text': 'Key flows include authorization, capture, refund, and void.'}])

### Build a from-scratch TF-IDF index

We compute:
- Vocabulary over all chunk texts
- Term frequencies per chunk
- Document frequencies and IDF
- TF-IDF matrix with L2 normalization
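
Written as formulas, the quantities above take the following form, matching the smoothed IDF used in the code. The symbols are notation introduced here for illustration: $D$ is the number of chunks, $\mathrm{tf}_{i,t}$ is the raw count of term $t$ in chunk $i$, and $\mathrm{df}(t)$ is the number of chunks containing $t$.

$$
\mathrm{idf}(t) = \ln\!\left(\frac{1 + D}{1 + \mathrm{df}(t)}\right) + 1,
\qquad
w_{i,t} = \mathrm{tf}_{i,t}\,\mathrm{idf}(t),
\qquad
\hat{w}_{i} = \frac{w_{i}}{\lVert w_{i} \rVert_2}
$$

With $D = 16$ chunks, a term that appears in a single chunk gets $\mathrm{idf} = \ln(17/2) + 1 \approx 3.14$, while a term present in every chunk gets $\ln(17/17) + 1 = 1$. Because every row $\hat{w}_i$ has unit length, a dot product between two rows is their cosine similarity.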


In [3]:
# Build vocabulary
vocab: Dict[str, int] = {}
doc_tokens: List[List[str]] = []
for ch in chunks:
    toks = tokenize(ch["text"])
    doc_tokens.append(toks)
    for t in toks:
        if t not in vocab:
            vocab[t] = len(vocab)

V = len(vocab)
D = len(chunks)

# Term frequency matrix
tf = np.zeros((D, V), dtype=float)
for i, toks in enumerate(doc_tokens):
    for t in toks:
        tf[i, vocab[t]] += 1.0

# Document frequency and IDF (smoothed)
df = np.count_nonzero(tf > 0, axis=0)
idf = np.log((1 + D) / (1 + df)) + 1.0

# TF-IDF with L2 normalization
tfidf = tf * idf
norms = np.linalg.norm(tfidf, axis=1, keepdims=True)
norms[norms == 0] = 1.0
tfidf = tfidf / norms

print(f"Vocabulary size: {V}")
print(f"Chunks indexed:  {D}")


Vocabulary size: 147
Chunks indexed:  16


### Query, retrieve top-k chunks, and compose a simple RAG-style answer

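We compute a query TF-IDF vector in the same space, score every chunk by cosine similarity, and return the top-k chunks.  
We then assemble a response that lists the retrieved chunk texts as context, followed by a naive answer. The naive answer is a fixed template string standing in for an LLM call; only the context section changes with the query.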
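
One way to make the composed answer depend on what was actually retrieved is to build it directly from the hits. The following is a minimal sketch, not this notebook's method: `compose_extractive_answer` and `min_score` are hypothetical names, the example scores are made up, and the hits follow the `(score, chunk)` shape returned by `retrieve`.

```python
from typing import Dict, List, Tuple

Hit = Tuple[float, Dict]

def compose_extractive_answer(query: str, hits: List[Hit], min_score: float = 0.05) -> str:
    """Build an answer only from retrieved chunks scoring at least min_score."""
    kept = [(s, c) for s, c in hits if s >= min_score]
    if not kept:
        return f"Query: {query}\n\nNo sufficiently relevant context was retrieved."
    lines = [f"- [{c['chunk_id']}] {c['text']} (score={s:.2f})" for s, c in kept]
    return f"Query: {query}\n\nAnswer drawn from retrieved context:\n" + "\n".join(lines)

# Hypothetical hits for illustration; real ones would come from retrieve(query).
example_hits = [
    (0.41, {"chunk_id": "doc4_c1",
            "text": "Hosted fields and iframes isolate card inputs to reduce PCI scope."}),
    (0.00, {"chunk_id": "doc5_c1",
            "text": "Payment processing revenue commonly uses a percent plus fixed fee per transaction."}),
]
extractive = compose_extractive_answer("How is PCI scope reduced?", example_hits)
print(extractive)
```

Dropping zero-score chunks matters here because top-k selection always returns k rows, even when some share no terms with the query.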


In [4]:
def vectorize_query(query: str) -> np.ndarray:
    q_tf = np.zeros((V,), dtype=float)
    for tok in tokenize(query):
        if tok in vocab:
            q_tf[vocab[tok]] += 1.0
    q_vec = q_tf * idf
    n = np.linalg.norm(q_vec)
    return q_vec / (n if n else 1.0)

def retrieve(query: str, k: int = 4) -> List[Tuple[float, Dict]]:
    q = vectorize_query(query)
    sims = tfidf @ q
    idx = np.argsort(-sims)[:k]
    return [(float(sims[i]), chunks[i]) for i in idx]

def rag_answer(query: str, k: int = 4) -> Dict:
    hits = retrieve(query, k=k)
    context = "\n".join([f"- {c['text']}" for _, c in hits])
    answer = (
        f"Query: {query}\n\n"
        f"Relevant context:\n{context}\n\n"
        f"Naive answer:\n"
        f"Based on the retrieved context, the system supports key payment flows such as authorization, capture, refund, and void, "
        f"with PCI scope reduction via hosted fields and tokenization. Fraud controls involve rules and optional machine learning. "
        f"A double-entry ledger and reconciliation align money movements with processor reports, while fees typically combine a "
        f"percent plus fixed component; cross-border FX introduces additional spreads."
    )
    return {"hits": hits, "answer": answer}

# Demo
demo = rag_answer("How does tokenization help PCI scope and what are common gateway flows?")
print(demo["answer"])


Query: How does tokenization help PCI scope and what are common gateway flows?

Relevant context:
- Hosted fields and iframes isolate card inputs to reduce PCI scope.
- PCI compliance and tokenization ensure that sensitive card data is never directly stored.
- A payments gateway authorizes, captures, and settles transactions.
- Key flows include authorization, capture, refund, and void.

Naive answer:
Based on the retrieved context, the system supports key payment flows such as authorization, capture, refund, and void, with PCI scope reduction via hosted fields and tokenization. Fraud controls involve rules and optional machine learning. A double-entry ledger and reconciliation align money movements with processor reports, while fees typically combine a percent plus fixed component; cross-border FX introduces additional spreads.


## Part B: Security Data Integration

We create a synthetic authentication log dataset, detect suspicious activity, and then use the vector index over a small set of security runbooks to assemble response guidance with a RAG-style approach.
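
The detection step later in this part flags IPs whose failure count has a z-score of at least 2. Below is a standalone sketch of that rule on made-up counts, using only the standard library. The counts are hypothetical and `flag_outliers` is an illustrative name, not a function defined in this notebook.

```python
from statistics import mean, stdev

def flag_outliers(counts, threshold=2.0):
    """Return {key: zscore} for keys whose sample z-score is >= threshold."""
    values = list(counts.values())
    if len(values) < 2:
        return {}
    mu, sigma = mean(values), stdev(values)  # stdev is the sample std (ddof=1)
    if sigma == 0:
        return {}
    z = {k: (v - mu) / sigma for k, v in counts.items()}
    return {k: s for k, s in z.items() if s >= threshold}

# Hypothetical failure counts per IP: nine quiet sources and one noisy one.
failure_counts = {f"203.0.113.{i}": 1 for i in range(1, 10)}
failure_counts["198.51.100.77"] = 30
flagged = flag_outliers(failure_counts)
print(flagged)
```

With a sample standard deviation, the largest z-score any single value can reach among n values is (n - 1) / sqrt(n), so a threshold of 2 can only fire when at least six IPs have recorded failures. For smaller populations a fixed count threshold may be a better fit.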


In [5]:
rng = np.random.default_rng(42)

users = ["alice", "bob", "carol", "dave"]
base_time = datetime(2025, 10, 21, 9, 0, 0)

def random_ip():
    return f"203.0.113.{rng.integers(1, 255)}"

rows = []
# Normal activity
for hr in range(0, 4):  # 4 hours
    t = base_time + timedelta(hours=hr)
    for _ in range(50):
        rows.append({
            "timestamp": t + timedelta(minutes=int(rng.integers(0, 60))),
            "user": rng.choice(users),
            "ip": random_ip(),
            "event": "auth_attempt",
            "status": "success" if rng.random() > 0.15 else "failure"
        })

# Suspicious burst from a single IP
bad_ip = "198.51.100.77"
for m in range(30):
    rows.append({
        "timestamp": base_time + timedelta(hours=2, minutes=m),
        "user": rng.choice(users),
        "ip": bad_ip,
        "event": "auth_attempt",
        "status": "failure"
    })

logs = pd.DataFrame(rows).sort_values("timestamp").reset_index(drop=True)
logs.head(10)


,timestamp,user,ip,event,status
0,2025-10-21 09:00:00,dave,203.0.113.169,auth_attempt,success
1,2025-10-21 09:05:00,dave,203.0.113.167,auth_attempt,success
2,2025-10-21 09:06:00,alice,203.0.113.196,auth_attempt,success
3,2025-10-21 09:07:00,bob,203.0.113.179,auth_attempt,success
4,2025-10-21 09:07:00,bob,203.0.113.58,auth_attempt,success
5,2025-10-21 09:08:00,carol,203.0.113.25,auth_attempt,failure
6,2025-10-21 09:09:00,dave,203.0.113.178,auth_attempt,success
7,2025-10-21 09:12:00,dave,203.0.113.148,auth_attempt,success
8,2025-10-21 09:15:00,carol,203.0.113.201,auth_attempt,success
9,2025-10-21 09:16:00,dave,203.0.113.142,auth_attempt,success


In [6]:
# Aggregate failures by IP and user
failures = logs[logs["status"] == "failure"]
by_ip = failures.groupby("ip").size().sort_values(ascending=False).rename("failures")
by_user = failures.groupby("user").size().sort_values(ascending=False).rename("failures")

# Simple anomaly via z-score on failures per IP
ip_counts = by_ip.to_frame()
mu = ip_counts["failures"].mean()
sigma = ip_counts["failures"].std(ddof=1) if len(ip_counts) > 1 else 0.0
ip_counts["zscore"] = 0.0 if sigma == 0 else (ip_counts["failures"] - mu) / sigma

suspect_ips = ip_counts[ip_counts["zscore"] >= 2.0].sort_values("zscore", ascending=False)

print("Top failure IPs:")
display(ip_counts.sort_values("failures", ascending=False).head(10))

print("\nSuspect IPs (zscore >= 2):")
display(suspect_ips)

print("\nFailures by user:")
display(by_user)


Top failure IPs:


ip,failures,zscore
198.51.100.77,30,6.002193
203.0.113.105,1,-0.162221
203.0.113.106,1,-0.162221
203.0.113.112,1,-0.162221
203.0.113.134,1,-0.162221
203.0.113.141,1,-0.162221
203.0.113.150,1,-0.162221
203.0.113.155,1,-0.162221
203.0.113.162,1,-0.162221
203.0.113.174,1,-0.162221



Suspect IPs (zscore >= 2):


ip,failures,zscore
198.51.100.77,30,6.002193



Failures by user:


user
alice    18
carol    18
dave     17
bob      14
Name: failures, dtype: int64

### Security runbooks corpus

We define a small corpus of security runbooks and guidance, then reuse the same TF-IDF machinery to retrieve relevant steps for the detected incident.
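
Because both corpora go through identical steps, the index construction can be wrapped in a small reusable helper. The sketch below is one possible shape, written in pure Python with sparse dictionaries rather than the NumPy arrays used in this notebook. `TfidfIndex` and its methods are hypothetical names.

```python
import math
import re
from collections import Counter
from typing import Dict, List, Tuple

TOKEN_RE = re.compile(r"[A-Za-z0-9_]+")

def tokenize(text: str) -> List[str]:
    return [t.lower() for t in TOKEN_RE.findall(text or "")]

class TfidfIndex:
    """Sparse TF-IDF index with smoothed IDF and L2-normalized vectors."""

    def __init__(self, texts: List[str]):
        self.texts = texts
        counts = [Counter(tokenize(t)) for t in texts]
        df = Counter(tok for c in counts for tok in c)  # chunks containing each token
        n = len(texts)
        self.idf = {tok: math.log((1 + n) / (1 + d)) + 1.0 for tok, d in df.items()}
        self.vectors = [self._weigh(c) for c in counts]

    def _weigh(self, counts: Counter) -> Dict[str, float]:
        # Out-of-vocabulary tokens are dropped, as in the query vectorizers here.
        vec = {t: tf * self.idf[t] for t, tf in counts.items() if t in self.idf}
        norm = math.sqrt(sum(w * w for w in vec.values())) or 1.0
        return {t: w / norm for t, w in vec.items()}

    def search(self, query: str, k: int = 4) -> List[Tuple[float, str]]:
        q = self._weigh(Counter(tokenize(query)))
        scored = [
            (sum(w * vec.get(t, 0.0) for t, w in q.items()), text)
            for vec, text in zip(self.vectors, self.texts)
        ]
        return sorted(scored, key=lambda pair: -pair[0])[:k]

runbook_index = TfidfIndex([
    "Detect bursts of authentication failures from a single IP, especially across multiple accounts.",
    "Apply progressive account lockouts, IP reputation checks, and throttle authentication endpoints.",
    "Validate user identity, examine device fingerprints, and check for password reuse.",
])
top = runbook_index.search("burst of failures from one IP", k=2)
print(top[0][1])
```

One instance per corpus keeps each vocabulary and IDF table separate, which is the same design the cell below follows with its `sec_`-prefixed variables.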


In [7]:
security_docs = [
    {
        "id": "sec1",
        "title": "Credential Stuffing Runbook",
        "text": (
            "Detect bursts of authentication failures from a single IP, especially across multiple accounts. "
            "Mitigations include enabling rate limiting, enforcing CAPTCHA, blocking abusive IPs, and requiring MFA enrollment. "
            "Review access logs and notify impacted users to reset passwords if necessary."
        ),
    },
    {
        "id": "sec2",
        "title": "Brute Force Login Runbook",
        "text": (
            "Repeated login failures for one or more accounts could indicate brute force attempts. "
            "Apply progressive account lockouts, IP reputation checks, and throttle authentication endpoints. "
            "Correlate unusual login times and geolocations."
        ),
    },
    {
        "id": "sec3",
        "title": "Account Takeover Triage",
        "text": (
            "If an account shows anomalous access patterns, reset session tokens and require MFA. "
            "Validate user identity, examine device fingerprints, and check for password reuse. "
            "Search for indicators of compromise related to the source IP or ASN."
        ),
    },
]

security_chunks = []
for d in security_docs:
    for i, sent in enumerate(sentence_split(d["text"])):
        security_chunks.append({
            "doc_id": d["id"],
            "title": d["title"],
            "chunk_id": f"{d['id']}_c{i+1}",
            "text": sent
        })

# Build a separate TF-IDF index for security runbooks (kept independent for clarity)
sec_vocab: Dict[str, int] = {}
sec_tokens: List[List[str]] = []
for ch in security_chunks:
    toks = tokenize(ch["text"])
    sec_tokens.append(toks)
    for t in toks:
        if t not in sec_vocab:
            sec_vocab[t] = len(sec_vocab)

SV = len(sec_vocab)
SD = len(security_chunks)

sec_tf = np.zeros((SD, SV), dtype=float)
for i, toks in enumerate(sec_tokens):
    for t in toks:
        sec_tf[i, sec_vocab[t]] += 1.0

sec_df = np.count_nonzero(sec_tf > 0, axis=0)
sec_idf = np.log((1 + SD) / (1 + sec_df)) + 1.0

sec_tfidf = sec_tf * sec_idf
sec_norms = np.linalg.norm(sec_tfidf, axis=1, keepdims=True)
sec_norms[sec_norms == 0] = 1.0
sec_tfidf = sec_tfidf / sec_norms

print(f"Security vocab size: {SV}")
print(f"Security chunks:     {SD}")


Security vocab size: 84
Security chunks:     9


In [8]:
def sec_vectorize_query(query: str) -> np.ndarray:
    q_tf = np.zeros((SV,), dtype=float)
    for tok in tokenize(query):
        if tok in sec_vocab:
            q_tf[sec_vocab[tok]] += 1.0
    q_vec = q_tf * sec_idf
    n = np.linalg.norm(q_vec)
    return q_vec / (n if n else 1.0)

def sec_retrieve(query: str, k: int = 4) -> List[Tuple[float, Dict]]:
    q = sec_vectorize_query(query)
    sims = sec_tfidf @ q
    idx = np.argsort(-sims)[:k]
    return [(float(sims[i]), security_chunks[i]) for i in idx]

def sec_rag_guidance(incident_summary: str, k: int = 4) -> Dict:
    hits = sec_retrieve(incident_summary, k=k)
    steps = "\n".join([f"- {c['text']}" for _, c in hits])
    guidance = (
        f"Incident summary: {incident_summary}\n\n"
        f"Relevant runbook guidance:\n{steps}\n\n"
        f"Actionable plan:\n"
        f"1) Enforce rate limiting and throttle abusive sources.\n"
        f"2) Consider temporary IP block or CAPTCHA for the offending IPs.\n"
        f"3) Require MFA enrollment for affected users and reset passwords as needed.\n"
        f"4) Review access patterns, correlate geolocation and device fingerprints.\n"
        f"5) Notify stakeholders and document containment and recovery steps."
    )
    return {"hits": hits, "guidance": guidance}


In [9]:
# Build an incident summary automatically from analytics results
if not suspect_ips.empty:
    top_ip = suspect_ips.index[0]
    top_count = int(ip_counts.loc[top_ip, "failures"])
    affected_users = (
        failures[failures["ip"] == top_ip]["user"]
        .value_counts()
        .index.tolist()
    )
    incident = (
        f"Detected a burst of failed authentications from IP {top_ip} "
        f"with {top_count} failures across users {affected_users}. "
        f"Possible credential stuffing or brute force."
    )
else:
    incident = "No suspicious IPs detected; failure rates within normal bands."

print("Incident summary:")
print(incident)

# Retrieve runbook guidance with the security runbooks index
sec_demo = sec_rag_guidance(incident_summary=incident, k=4)
print("\nRAG-style guidance:")
print(sec_demo["guidance"])


Incident summary:
Detected a burst of failed authentications from IP 198.51.100.77 with 30 failures across users [np.str_('dave'), np.str_('alice'), np.str_('carol'), np.str_('bob')]. Possible credential stuffing or brute force.

RAG-style guidance:
Incident summary: Detected a burst of failed authentications from IP 198.51.100.77 with 30 failures across users [np.str_('dave'), np.str_('alice'), np.str_('carol'), np.str_('bob')]. Possible credential stuffing or brute force.

Relevant runbook guidance:
- Detect bursts of authentication failures from a single IP, especially across multiple accounts.
- Repeated login failures for one or more accounts could indicate brute force attempts.
- Search for indicators of compromise related to the source IP or ASN.
- Review access logs and notify impacted users to reset passwords if necessary.

Actionable plan:
1) Enforce rate limiting and throttle abusive sources.
2) Consider temporary IP block or CAPTCHA for the offending IPs.
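3) Require MFA enrollment for affected users and reset passwords as needed.
4) Review access patterns, correlate geolocation and device fingerprints.
5) Notify stakeholders and document containment and recovery steps.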

### Compose a unified report

We can synthesize both the payments RAG and the security guidance into a single structured output (for example, a JSON report your system could send to downstream services).
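
Reports built from pandas and NumPy results can contain values that `json.dumps` rejects, such as `datetime` objects or NumPy integer scalars. A hedged sketch of a `default=` hook follows. `to_jsonable` is a hypothetical helper, the `.item()` check assumes the object behaves like a NumPy scalar, and `FakeScalar` is a stand-in used only to keep the example free of third-party imports.

```python
import json
from datetime import datetime, timezone

def to_jsonable(obj):
    """Fallback for json.dumps: convert common non-JSON types to plain values."""
    if isinstance(obj, datetime):       # also covers pandas.Timestamp, a datetime subclass
        return obj.isoformat()
    if hasattr(obj, "item"):            # NumPy scalars expose .item() -> Python scalar
        return obj.item()
    if isinstance(obj, (set, frozenset)):
        return sorted(obj)
    raise TypeError(f"Not JSON serializable: {type(obj).__name__}")

class FakeScalar:
    """Stand-in for a NumPy scalar (assumption for this self-contained example)."""
    def __init__(self, value):
        self._value = value
    def item(self):
        return self._value

sample_report = {
    "generated_at": datetime(2025, 10, 21, 20, 38, tzinfo=timezone.utc),
    "failures": FakeScalar(30),
    "users": {"bob", "alice"},
}
encoded = json.dumps(sample_report, default=to_jsonable, sort_keys=True)
print(encoded)
```

`json.dumps` calls the hook only for objects it cannot encode itself, so plain strings, floats, lists, and dicts pass through unchanged.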


In [10]:
def payments_rag_snippet(query: str) -> Dict:
    res = rag_answer(query, k=3)
    return {
        "query": query,
        "top_chunks": [{"chunk_id": c["chunk_id"], "text": c["text"]} for _, c in res["hits"]],
        "answer": res["answer"],
    }

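from datetime import timezone

report = {
    # datetime.utcnow() is deprecated since Python 3.12; build an aware UTC timestamp instead
    "generated_at": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),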
    "security": {
        "incident_summary": incident,
        "suspect_ips": suspect_ips.reset_index().to_dict(orient="records"),
        "guidance": sec_demo["guidance"],
    },
    "payments": payments_rag_snippet(
        "What are common gateway flows and how does tokenization reduce PCI scope?"
    ),
}

print(json.dumps(report, indent=2))


{
  "generated_at": "2025-10-21T20:38:00.417163Z",
  "security": {
    "incident_summary": "Detected a burst of failed authentications from IP 198.51.100.77 with 30 failures across users [np.str_('dave'), np.str_('alice'), np.str_('carol'), np.str_('bob')]. Possible credential stuffing or brute force.",
    "suspect_ips": [
      {
        "ip": "198.51.100.77",
        "failures": 30,
        "zscore": 6.002192581838214
      }
    ],
    "guidance": "Incident summary: Detected a burst of failed authentications from IP 198.51.100.77 with 30 failures across users [np.str_('dave'), np.str_('alice'), np.str_('carol'), np.str_('bob')]. Possible credential stuffing or brute force.\n\nRelevant runbook guidance:\n- Detect bursts of authentication failures from a single IP, especially across multiple accounts.\n- Repeated login failures for one or more accounts could indicate brute force attempts.\n- Search for indicators of compromise related to the source IP or ASN.\n- Review access logs an

## Notes and Next Steps

- Replace the synthetic corpora with your real documents or runbooks.
- Swap the from-scratch TF-IDF index for learned embeddings and a vector database.
- Plug in your preferred LLM to synthesize final answers using the retrieved context.
- Expand the security pipeline with:
  - IP reputation lookups
  - Geo and ASN enrichment
  - Correlation across systems (IdP, WAF, SIEM)
  - Alert routing and case management
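
For the LLM step, retrieved chunks are typically formatted into a prompt that asks the model to answer only from that context. Below is a minimal sketch of the prompt assembly, with no model call. `build_prompt`, the instruction wording, the `max_chars` budget, and the example scores are illustrative assumptions, not part of this notebook.

```python
from typing import Dict, List, Tuple

def build_prompt(query: str, hits: List[Tuple[float, Dict]], max_chars: int = 1200) -> str:
    """Format retrieved chunks into a grounded prompt, capped at max_chars of context."""
    context_lines, used = [], 0
    for _score, chunk in hits:
        line = f"[{chunk['chunk_id']}] {chunk['text']}"
        if used + len(line) > max_chars:
            break  # keep only the highest-ranked chunks that fit the budget
        context_lines.append(line)
        used += len(line)
    context = "\n".join(context_lines)
    return (
        "Answer the question using only the context below. "
        "Cite chunk ids in brackets. If the context is insufficient, say so.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {query}\nAnswer:"
    )

# Hypothetical hits in the (score, chunk) shape returned by sec_retrieve.
hits = [
    (0.52, {"chunk_id": "sec1_c1",
            "text": "Detect bursts of authentication failures from a single IP, especially across multiple accounts."}),
    (0.31, {"chunk_id": "sec2_c1",
            "text": "Repeated login failures for one or more accounts could indicate brute force attempts."}),
]
prompt = build_prompt("What should we do about repeated failures from one IP?", hits)
print(prompt)
```

The returned string can be passed to whichever model client you use; keeping chunk ids in the context makes it possible to trace each claim in the answer back to a runbook sentence.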
