In [1]:
!pip install torch transformers langchain langchain_community faiss-cpu pandas networkx sentencepiece

Collecting langchain_community
  Downloading langchain_community-0.4-py3-none-any.whl.metadata (3.0 kB)
Collecting faiss-cpu
  Downloading faiss_cpu-1.12.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.1 kB)
INFO: pip is looking at multiple versions of langchain-community to determine which version is compatible with other requirements. This could take a while.
Collecting langchain_community
  Downloading langchain_community-0.3.31-py3-none-any.whl.metadata (3.0 kB)
Collecting requests (from transformers)
  Downloading requests-2.32.5-py3-none-any.whl.metadata (4.9 kB)
Collecting dataclasses-json<0.7.0,>=0.6.7 (from langchain_community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7.0,>=0.6.7->langchain_community)
  Downloading marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Collecting typing-inspect<1,>=0.4.0 (from dataclasses-json<0.7.0,>=0.6.7->langchain_community

In [19]:
"""
Complete fraud stream processor with:
 - async streaming ingestion (simulated)
 - enrichment (profiles + optional geo)
 - sliding-window features, per-event incremental updates
 - NetworkX graph for link analysis
 - anomaly scoring + rule checks
 - async alert delivery (HTTP webhook)
 - dynamic blacklist refresh (optional)
 - RAG (FAISS + HuggingFaceEmbeddings)
 - LLM-assisted SAR drafting (LangChain + HuggingFacePipeline)
"""

import os
import asyncio
import random
import uuid
import json
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, Optional, List

import aiohttp
import pandas as pd
import networkx as nx

# Transformers & LangChain
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
from langchain.llms import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.docstore.document import Document
from langchain.chains import RetrievalQA

# ---------------- Configuration ----------------
STREAM_RATE_PER_SEC = 5           # events/sec (simulated)
WINDOW_SECONDS = 60               # sliding window length
BLOCK_CONFIDENCE_THRESHOLD = 0.8
ALERT_WEBHOOK = os.environ.get("ALERT_WEBHOOK", None)
BLACKLIST_REFRESH_URL = os.environ.get("BLACKLIST_URL", None)
BLACKLIST_REFRESH_INTERVAL = 60  # seconds

# ----- Basic profiles and starting blacklist (in-memory) -----
profiles = {
    "user_1": {"country":"IN","risk_score":0.2,"devices":["dev_1","dev_10"]},
    "user_2": {"country":"US","risk_score":0.7,"devices":["dev_2","dev_3"]},
    "user_3": {"country":"FR","risk_score":0.1,"devices":["dev_4"]},
}

blacklist = pd.DataFrame([
    {"entity_type":"ip","entity_id":"203.0.113.5","reason":"chargeback_ring"},
    {"entity_type":"card","entity_id":"4111111111111111","reason":"fraud_history"},
    {"entity_type":"device","entity_id":"dev_9","reason":"stolen"}
])

# ---------------- Fraud Processor ----------------
class FraudProcessor:
    def __init__(self, window_seconds: int = WINDOW_SECONDS):
        self.window_seconds = window_seconds
        self.df = pd.DataFrame(columns=["user_id", "amount", "timestamp", "txn_id"])
        self.graph = nx.Graph()
        self.rag_vectorstore: Optional[FAISS] = None
        self.rag_embeddings = None
        self._doc_buffer: List[Document] = []

    # ---------- enrichment ----------
    def enrich(self, txn: Dict[str, Any]) -> Dict[str, Any]:
        prof = profiles.get(txn["user_id"], {})
        txn.setdefault("timestamp", datetime.now(timezone.utc))
        txn["country"] = prof.get("country", "UNK")
        txn["risk_score"] = prof.get("risk_score", 0.5)
        txn["blacklisted"] = (
            (blacklist["entity_id"] == txn.get("ip")).any()
            or (blacklist["entity_id"] == txn.get("device")).any()
            or (blacklist["entity_id"] == txn.get("card")).any()
        )
        return txn

    # ---------- state update ----------
    def update_state(self, txn: Dict[str, Any]) -> None:
        ts = txn["timestamp"]
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
            txn["timestamp"] = ts
        self.df.loc[len(self.df)] = [txn["user_id"], txn["amount"], ts, txn["txn_id"]]

        if txn.get("ip"):
            self.graph.add_edge(txn["user_id"], txn["ip"], edge_type="user-ip")
        if txn.get("device"):
            self.graph.add_edge(txn["user_id"], txn["device"], edge_type="user-device")
        if txn.get("device") and txn.get("ip"):
            self.graph.add_edge(txn["device"], txn["ip"], edge_type="device-ip")

        window_start = datetime.now(timezone.utc) - timedelta(seconds=self.window_seconds)
        self.df = self.df[self.df["timestamp"] > window_start].reset_index(drop=True)

    # ---------- feature computation ----------
    def compute_features(self, txn: Dict[str, Any]) -> Dict[str, Any]:
        df_user = self.df[self.df["user_id"] == txn["user_id"]]
        txn["avg_amount_1m"] = float(df_user["amount"].mean()) if len(df_user) > 0 else 0.0
        txn["velocity_1m"] = int(len(df_user))
        ips = set()
        if txn.get("ip"):
            ips.add(txn["ip"])
        txn["distinct_ips_1m"] = len(ips)
        return txn

    # ---------- anomaly scoring ----------
    def anomaly_score(self, txn: Dict[str, Any]) -> float:
        amounts = self.df["amount"]
        if len(amounts) < 6:
            return 0.0
        mu = amounts.mean()
        sigma = amounts.std(ddof=0)
        if sigma < 1e-6:
            return 0.0
        z = abs((txn["amount"] - mu) / sigma)
        return float(min(z / 10.0, 1.0))

    # ---------- small ring detection helper ----------
    def ring_degree(self, node: str) -> int:
        try:
            return len(list(self.graph.neighbors(node)))
        except Exception:
            return 0

    # ---------- RAG helpers ----------
    def init_rag(self, embedding_model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        self.rag_embeddings = HuggingFaceEmbeddings(model_name=embedding_model_name)
        # Add a dummy doc to avoid FAISS empty index error
        dummy_doc = Document(page_content="Initial placeholder", metadata={"source":"init"})
        self.rag_vectorstore = FAISS.from_documents([dummy_doc], self.rag_embeddings)

    def add_doc_to_rag(self, txt: str, meta: Optional[Dict[str, Any]] = None):
        if self.rag_vectorstore is None:
            raise RuntimeError("RAG store not initialized.")
        doc = Document(page_content=txt, metadata=meta or {})
        self.rag_vectorstore.add_documents([doc])
        self._doc_buffer.append(doc)

# ----------------- Utilities: webhook alert sender -----------------
async def send_alert_via_http(txn: Dict[str, Any], reason: str):
    if not ALERT_WEBHOOK:
        print(f"🚨 ALERT [{txn['txn_id'][:8]}] user={txn['user_id']} reason={reason} amt={txn['amount']}")
        return
    payload = {
        "tx": txn,
        "reason": reason,
        "timestamp": datetime.now(timezone.utc).isoformat()
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(ALERT_WEBHOOK, json=payload, timeout=10) as resp:
                if 200 <= resp.status < 300:
                    print(f"Alert delivered, webhook responded {resp.status}")
                else:
                    print(f"Alert delivery failed: {resp.status} - {await resp.text()}")
    except Exception as e:
        print("Exception while sending alert:", e)

# ----------------- LLM / RAG SAR generation -----------------
class SARGenerator:
    def __init__(self, llm_pipeline: HuggingFacePipeline, rag_store: FAISS, rag_embeddings: HuggingFaceEmbeddings):
        self.llm = llm_pipeline
        self.rag_store = rag_store
        self.rag_embeddings = rag_embeddings
        self.retriever = self.rag_store.as_retriever(search_kwargs={"k": 5})
        self.prompt_template = PromptTemplate(
            input_variables=["facts", "txn", "analysis"],
            template=(
                "You are a compliance analyst assistant. Using the retrieved facts below and the transaction record, "
                "draft a clear Suspicious Activity Report (SAR). Be concise and cite the retrieved facts when relevant.\n\n"
                "Retrieved facts:\n{facts}\n\n"
                "Transaction:\n{txn}\n\n"
                "Analysis and flags:\n{analysis}\n\n"
                "SAR:"
            ),
        )

    async def generate_sar(self, txn: Dict[str, Any], analysis: str) -> str:
        txn_text = json.dumps({
            "txn_id": txn.get("txn_id"),
            "user_id": txn.get("user_id"),
            "amount": txn.get("amount"),
            "ip": txn.get("ip"),
            "device": txn.get("device"),
            "country": txn.get("country"),
            "timestamp": txn.get("timestamp").isoformat() if isinstance(txn.get("timestamp"), datetime) else txn.get("timestamp")
        }, default=str, indent=2)

        def sync_retrieve(q):
            return self.retriever.get_relevant_documents(q)

        loop = asyncio.get_event_loop()
        retrieved_docs = await loop.run_in_executor(None, sync_retrieve, txn_text)
        facts = "\n---\n".join([f"{d.page_content}\nMETA: {d.metadata}" for d in retrieved_docs]) or "No prior facts found."

        prompt = self.prompt_template.format(facts=facts, txn=txn_text, analysis=analysis)

        def sync_llm_call(p):
            return self.llm(p)

        llm_result = await loop.run_in_executor(None, sync_llm_call, prompt)

        if isinstance(llm_result, str):
            sar_text = llm_result
        elif isinstance(llm_result, list):
            try:
                sar_text = llm_result[0].get("generated_text", str(llm_result[0]))
            except Exception:
                sar_text = str(llm_result)
        else:
            sar_text = str(llm_result)

        return sar_text

# ----------------- Model & LangChain initialization -----------------
def init_llm_pipeline(model_name: str = "TheBloke/vicuna-7B-1.1-HF"):
    print(f"Initializing LLM model {model_name} ... (this may take time & memory)")
    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)
    model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto", torch_dtype="auto")
    gen_pipe = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        max_new_tokens=256,
        temperature=0.2,
        repetition_penalty=1.05,
        do_sample=True
    )
    llm = HuggingFacePipeline(pipeline=gen_pipe)
    return llm

# ----------------- Blacklist local update helper -----------------
def update_blacklist_local(txn: Dict[str, Any], reason: str):
    global blacklist
    rows = []
    for ent_field, ent_type in [("ip", "ip"), ("device", "device"), ("card", "card")]:
        ent = txn.get(ent_field)
        if ent and ent not in blacklist["entity_id"].values:
            rows.append({"entity_type": ent_type, "entity_id": ent, "reason": reason})
    if rows:
        blacklist = pd.concat([blacklist, pd.DataFrame(rows)], ignore_index=True)
        print("Updated local blacklist with:", rows)

# ----------------- Simulated stream -----------------
async def transaction_stream(queue: asyncio.Queue):
    users = list(profiles.keys())
    possible_ips = ["198.51.100.1", "203.0.113.5", "192.0.2.25", "198.51.100.77"]
    while True:
        uid = random.choice(users)
        txn = {
            "txn_id": str(uuid.uuid4()),
            "user_id": uid,
            "device": random.choice(profiles[uid]["devices"]),
            "amount": round(random.uniform(10, 5000), 2),
            "ip": random.choice(possible_ips),
            "card": random.choice([None, "4111111111111111", "5555444433332222"]),
            "timestamp": datetime.now(timezone.utc),
            "type": random.choice(["card_payment", "transfer", "crypto_withdrawal"])
        }
        await queue.put(txn)
        await asyncio.sleep(1.0 / STREAM_RATE_PER_SEC)

# ----------------- Event processing loop -----------------
async def process_loop(queue: asyncio.Queue, proc: FraudProcessor, sar_gen: SARGenerator):
    while True:
        txn = await queue.get()
        try:
            txn = proc.enrich(txn)
            proc.update_state(txn)
            txn = proc.compute_features(txn)
            score = proc.anomaly_score(txn)
            txn["anomaly_score"] = score

            flagged_reasons = []
            if txn.get("blacklisted"):
                flagged_reasons.append("blacklist_hit")
            if txn["anomaly_score"] > 0.7:
                flagged_reasons.append("high_anomaly")
            if txn["velocity_1m"] >= 5:
                flagged_reasons.append("high_velocity")
            if txn["risk_score"] >= 0.8:
                flagged_reasons.append("high_profile_risk")
            if proc.ring_degree(txn["user_id"]) >= 3:
                flagged_reasons.append("high_graph_degree")

            analysis_lines = [
                f"Anomaly score: {txn['anomaly_score']:.3f}",
                f"Velocity(1m): {txn.get('velocity_1m')}",
                f"Avg amount(1m): {txn.get('avg_amount_1m')}",
                f"Graph degree: {proc.ring_degree(txn['user_id'])}",
                f"Blacklisted: {txn.get('blacklisted')}",
                f"Profile risk: {txn.get('risk_score')}"
            ]
            analysis = "\n".join(analysis_lines)

            if flagged_reasons:
                reason = ",".join(flagged_reasons)
                asyncio.create_task(send_alert_via_http(txn, reason))

                fact_text = f"Txn {txn['txn_id']}: user={txn['user_id']} amount={txn['amount']} ip={txn.get('ip')} device={txn.get('device')} type={txn.get('type')}"
                proc.add_doc_to_rag(fact_text, meta={"txn_id": txn["txn_id"], "user_id": txn["user_id"]})

                sar_text = await sar_gen.generate_sar(txn, analysis)
                print("📝 RAG SAR Analysis:\n", sar_text)

                if txn["anomaly_score"] > BLOCK_CONFIDENCE_THRESHOLD or txn.get("blacklisted"):
                    update_blacklist_local(txn, "auto_block_from_pipeline")
                    print(f"🔒 Action: blocked user/device/ip for txn {txn['txn_id']}")
            else:
                if random.random() < 0.05:
                    fact_text = f"Txn {txn['txn_id']}: user={txn['user_id']} amount={txn['amount']} ip={txn.get('ip')}"
                    proc.add_doc_to_rag(fact_text, meta={"txn_id": txn["txn_id"]})

        except Exception as e:
            print("Processing error:", e)
        finally:
            queue.task_done()

# ----------------- Entrypoint -----------------
async def main():
    q: asyncio.Queue = asyncio.Queue(maxsize=10000)
    proc = FraudProcessor(window_seconds=WINDOW_SECONDS)
    proc.init_rag()
    model_name = os.environ.get("LLM_MODEL", "TheBloke/vicuna-7B-1.1-HF")
    llm_pipeline = init_llm_pipeline(model_name=model_name)
    sar_gen = SARGenerator(llm_pipeline=llm_pipeline, rag_store=proc.rag_vectorstore, rag_embeddings=proc.rag_embeddings)

    producers = [asyncio.create_task(transaction_stream(q))]
    consumers = [asyncio.create_task(process_loop(q, proc, sar_gen))]

    await asyncio.gather(*producers, *consumers)

# ----------------- Jupyter-safe entry -----------------
if __name__ == "__main__":
    try:
        # Jupyter / Colab compatibility
        import nest_asyncio
        nest_asyncio.apply()
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Interrupted by user")

Initializing LLM model TheBloke/vicuna-7B-1.1-HF ... (this may take time & memory)


tokenizer_config.json:   0%|          | 0.00/727 [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/500k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/411 [00:00<?, ?B/s]

You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama.LlamaTokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565 - if you loaded a llama tokenizer from a GGUF file you can ignore this message
You are using the default legacy behaviour of the <class 'transformers.models.llama.tokenization_llama_fast.LlamaTokenizerFast'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggin

config.json:   0%|          | 0.00/582 [00:00<?, ?B/s]

`torch_dtype` is deprecated! Use `dtype` instead!


pytorch_model.bin.index.json: 0.00B [00:00, ?B/s]

Fetching 2 files:   0%|          | 0/2 [00:00<?, ?it/s]

pytorch_model-00002-of-00002.bin:   0%|          | 0.00/3.50G [00:00<?, ?B/s]

pytorch_model-00001-of-00002.bin:   0%|          | 0.00/9.98G [00:00<?, ?B/s]

The following generation flags are not valid and may be ignored: ['pad_token_id']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/137 [00:00<?, ?B/s]

Device set to use cuda:0
  llm = HuggingFacePipeline(pipeline=gen_pipe)
  return self.retriever.get_relevant_documents(q)
  return self.llm(p)


🚨 ALERT [19877b44] user=user_3 reason=blacklist_hit amt=1644.76
📝 RAG SAR Analysis:
 You are a compliance analyst assistant. Using the retrieved facts below and the transaction record, draft a clear Suspicious Activity Report (SAR). Be concise and cite the retrieved facts when relevant.

Retrieved facts:
Txn 19877b44-08cf-4135-af47-1608087f5552: user=user_3 amount=1644.76 ip=198.51.100.1 device=dev_4 type=card_payment
META: {'txn_id': '19877b44-08cf-4135-af47-1608087f5552', 'user_id': 'user_3'}
---
Initial placeholder
META: {'source': 'init'}

Transaction:
{
  "txn_id": "19877b44-08cf-4135-af47-1608087f5552",
  "user_id": "user_3",
  "amount": 1644.76,
  "ip": "198.51.100.1",
  "device": "dev_4",
  "country": "FR",
  "timestamp": "2025-10-23T09:04:31.705203+00:00"
}

Analysis and flags:
Anomaly score: 0.000
Velocity(1m): 1
Avg amount(1m): 1644.76
Graph degree: 2
Blacklisted: True
Profile risk: 0.1

SAR:
Title: Suspicious Transaction - User_3
Description:
The transaction appears to be l

In [18]:
from langchain.docstore.document import Document