
# Transformer Web Agent (Agent + MiniDAST — Refreshed)
Bu not defteri, mini bir **Transformer planlayıcı**yı (encoder+decoder) **MiniDAST yürütücüleri** ile uçtan uca birleştirir.


In [1]:

import math, random, json, os, time, re
from typing import List, Dict, Tuple, Optional
import torch, torch.nn as nn, torch.nn.functional as F

# All tensors and the model are placed on this device.
device = "cpu"
# Seed PyTorch's global RNG so repeated runs start from the same state.
# NOTE(review): Python's `random` module, used later for batch sampling, is not seeded here.
torch.manual_seed(11)
print("Torch", torch.__version__, "| device:", device)


Torch 2.6.0+cu124 | device: cpu


## 1) Sözcükler ve DSL izinleri

In [2]:

# Closed vocabulary shared by the planner's encoder and decoder.
# A token's position in this list is its integer id, so the order must stay fixed.
TOKENS = [
    # context
    "CTX::DOMAIN_ECOM",
    # HTTP verbs
    "HTTP::GET", "HTTP::POST", "HTTP::PUT", "HTTP::DELETE",
    # endpoints
    "PATH::/api/cart/add",
    "PATH::/api/cart/apply-coupon",
    "PATH::/api/cart",
    "PATH::/api/cart/checkout",
    # JSON body keys
    "JSON::sku", "JSON::code", "JSON::cart_id",
    # values
    "TOK::SKU-1", "TOK::SKU-9",
    "TOK::WELCOME10", "TOK::FIRSTBUY",
    "TOK::SELF", "TOK::OTHER",
    # policies
    "POL::no_negative_total", "POL::coupon_once_per_user", "POL::cart_is_private",
    # plan stage markers
    "PLAN::Q", "PLAN::W", "PLAN::I", "PLAN::T", "PLAN::S",
    # plan questions
    "TXT::neg_total?", "TXT::reuse?", "TXT::cross_cart?",
    # plan expressions
    "EXPR::delta_total<0 AND coupon_twice", "EXPR::coupon_count>1", "EXPR::owner_mismatch",
    # DSL request marker and method tokens
    "DSL::HTTP", "TOK::POST", "TOK::PUT", "TOK::GET",
    # findings and end-of-sequence
    "FIND::NEGATIVE_TOTAL", "FIND::COUPON_REUSE", "FIND::CROSS_CART_ACCESS", "END",
]
itos = TOKENS  # id -> token (same list object as TOKENS)
stoi = {tok: idx for idx, tok in enumerate(itos)}  # token -> id

# Tokens the decoder may emit while its action mask is active; also used by the plan allow-list check.
DSL_ALLOWED = {
    "DSL::HTTP", "TOK::POST", "TOK::PUT", "TOK::GET",
    "PATH::/api/cart", "PATH::/api/cart/apply-coupon", "PATH::/api/cart/checkout",
    "JSON::code", "JSON::cart_id",
    "TOK::WELCOME10", "TOK::FIRSTBUY", "TOK::SELF", "TOK::OTHER",
}

# Role ids fed to the decoder's role embedding.
ROLE2ID = {"Q": 0, "W": 1, "I": 2, "T": 3, "S": 4}
ROLES = list(ROLE2ID)
POLICY_TOKENS = [
    "POL::no_negative_total",
    "POL::coupon_once_per_user",
    "POL::cart_is_private",
]

def dsl_filter(seq: List[str]) -> List[str]:
    """Return only the executable tokens of a plan, in their original order.

    A token is kept when it starts with ``DSL::`` or ``PATH::/api/``, or when it
    is one of the listed method / JSON-key / value tokens. Everything else
    (plan markers, questions, expressions, findings, ``END``) is dropped.
    """
    kept_prefixes = ("DSL::", "PATH::/api/")
    kept_tokens = {
        "TOK::POST", "TOK::PUT", "TOK::GET",
        "JSON::code", "JSON::cart_id",
        "TOK::WELCOME10", "TOK::FIRSTBUY", "TOK::SELF", "TOK::OTHER",
    }
    filtered = []
    for tok in seq:
        if tok.startswith(kept_prefixes) or tok in kept_tokens:
            filtered.append(tok)
    return filtered


## 2) Küçük eğitim verisi (plan → DSL)

In [3]:

# Supervised examples for the planner. "in" is the context fed to the encoder
# (domain + policy tokens); "out" is the full plan the decoder must reproduce:
# question, expression, one or more DSL HTTP requests, the expected finding, END.
samples = [
    # Apply the same coupon (WELCOME10) twice; expected finding: NEGATIVE_TOTAL.
    {
        "in": ["CTX::DOMAIN_ECOM","POL::no_negative_total","POL::coupon_once_per_user"],
        "out":[
            "PLAN::Q","TXT::neg_total?",
            "PLAN::W","EXPR::delta_total<0 AND coupon_twice",
            "PLAN::I","DSL::HTTP","TOK::POST","PATH::/api/cart/apply-coupon","JSON::code","TOK::WELCOME10",
                       "DSL::HTTP","TOK::POST","PATH::/api/cart/apply-coupon","JSON::code","TOK::WELCOME10",
            "PLAN::S","FIND::NEGATIVE_TOTAL","END"
        ]
    },
    # Apply the same coupon (FIRSTBUY) twice; expected finding: COUPON_REUSE.
    {
        "in": ["CTX::DOMAIN_ECOM","POL::coupon_once_per_user"],
        "out":[
            "PLAN::Q","TXT::reuse?",
            "PLAN::W","EXPR::coupon_count>1",
            "PLAN::I","DSL::HTTP","TOK::POST","PATH::/api/cart/apply-coupon","JSON::code","TOK::FIRSTBUY",
                       "DSL::HTTP","TOK::POST","PATH::/api/cart/apply-coupon","JSON::code","TOK::FIRSTBUY",
            "PLAN::S","FIND::COUPON_REUSE","END"
        ]
    },
    # PUT to the cart with cart_id OTHER; expected finding: CROSS_CART_ACCESS.
    {
        "in": ["CTX::DOMAIN_ECOM","POL::cart_is_private"],
        "out":[
            "PLAN::Q","TXT::cross_cart?",
            "PLAN::W","EXPR::owner_mismatch",
            "PLAN::I","DSL::HTTP","TOK::PUT","PATH::/api/cart","JSON::cart_id","TOK::OTHER",
            "PLAN::S","FIND::CROSS_CART_ACCESS","END"
        ]
    }
]

def to_ids(toks):
    """Map tokens to vocabulary ids, silently skipping tokens that are not in ``stoi``."""
    return [stoi[tok] for tok in toks if tok in stoi]


# One (source ids, target ids) tensor pair per training sample.
data = [
    (torch.tensor(to_ids(sample["in"])), torch.tensor(to_ids(sample["out"])))
    for sample in samples
]
# Notebook cell output: sample count, source lengths, target lengths.
len(data), [len(pair[0]) for pair in data], [len(pair[1]) for pair in data]


(3, [3, 2, 2], [18, 18, 13])

## 3) Mini Transformer Planlayıcı (encoder+decoder)

In [4]:

class Encoder(nn.Module):
    """Token and learned-position embeddings followed by a stack of Transformer encoder layers."""

    def __init__(self, V, d=128, nhead=4, L=2):
        super().__init__()
        # Sub-modules are created in this fixed order: each constructor draws from the
        # global torch RNG, so reordering would change the initial weights for a given seed.
        self.emb = nn.Embedding(V, d)
        self.pos = nn.Embedding(512, d)
        layer = nn.TransformerEncoderLayer(d, nhead, 256, batch_first=True)
        self.enc = nn.TransformerEncoder(layer, L)

    def forward(self, x):
        """Encode a 2-D tensor of token ids (batch, time) and return the encoder output."""
        _, seq_len = x.size()
        positions = torch.arange(seq_len, device=x.device)
        # The (time, d) position embeddings broadcast over the batch dimension.
        hidden = self.emb(x) + self.pos(positions)
        return self.enc(hidden)

class PolicyEnc(nn.Module):
    """Turn a list of policy-token ids into a single bias vector using a shared embedding table."""

    def __init__(self, tok_emb):
        super().__init__()
        self.tok = tok_emb
        dim = self.tok.embedding_dim
        self.proj = nn.Linear(dim, dim, bias=False)

    def forward(self, pol_idx, device="cpu"):
        """Return a (1, embedding_dim) tensor.

        An empty ``pol_idx`` yields zeros; otherwise the result is the projected
        mean of the embeddings of the given ids.
        """
        if not pol_idx:
            return torch.zeros((1, self.tok.embedding_dim), device=device)
        ids = torch.tensor(pol_idx, device=device)
        # The lookup and mean run under no_grad, so this path sends no gradient
        # into the embedding table; only ``proj`` receives one.
        with torch.no_grad():
            pooled = self.tok(ids).mean(0, keepdim=True)
        return self.proj(pooled)

class Decoder(nn.Module):
    """Causal Transformer decoder over plan tokens with role embeddings and an optional DSL-only logit mask."""

    def __init__(self, V, d=128, nhead=4, L=2):
        super().__init__()
        # Creation order is kept fixed: each constructor consumes the global torch RNG.
        self.emb = nn.Embedding(V, d)
        self.pos = nn.Embedding(1024, d)
        self.role = nn.Embedding(5, d)
        blocks = [nn.TransformerDecoderLayer(d, nhead, 256, batch_first=True) for _ in range(L)]
        self.layers = nn.ModuleList(blocks)
        self.out = nn.Linear(d, V)

    def forward(self, y, roles, mem, pol_bias=None, action_mask=None):
        """Return next-token logits for every position of ``y``.

        ``y`` and ``roles`` are 2-D id tensors (batch, time); ``mem`` is the encoder
        output attended to by each layer. ``pol_bias``, when given, is added
        (scaled by 0.5) to every position's input embedding. ``action_mask`` is a
        boolean (batch, time) tensor: where it is True, logits of tokens outside
        ``DSL_ALLOWED`` are pushed down by 1e9.
        """
        _, steps = y.size()
        positions = torch.arange(steps, device=y.device)
        hidden = self.emb(y) + self.pos(positions) + self.role(roles)
        if pol_bias is not None:
            hidden = hidden + 0.5 * pol_bias
        # True above the diagonal: a position may not attend to later positions.
        causal = torch.ones(steps, steps, device=y.device).triu(1).bool()
        for block in self.layers:
            hidden = block(hidden, mem, tgt_mask=causal)
        logits = self.out(hidden)
        if action_mask is None:
            return logits
        allowed = torch.tensor([stoi[tok] for tok in itos if tok in DSL_ALLOWED], device=logits.device)
        penalty = torch.full_like(logits, -1e9)
        penalty[:, :, allowed] = 0
        return torch.where(action_mask.unsqueeze(-1), logits + penalty, logits)

class Planner(nn.Module):
    """Encoder-decoder planner; the policy encoder reuses the encoder's token embedding table."""

    def __init__(self, V, d=128):
        super().__init__()
        encoder = Encoder(V, d)
        self.enc = encoder
        self.pol = PolicyEnc(encoder.emb)
        self.dec = Decoder(V, d)

    def forward(self, src, tgt_inp, roles, action_mask=None, pol_idx=None):
        """Encode ``src`` and return decoder logits for ``tgt_inp``.

        ``pol_idx`` is a list of policy-token ids (``None`` is treated as empty)
        that is turned into a bias vector for the decoder input.
        """
        memory = self.enc(src)
        policy_bias = self.pol(pol_idx or [], device=src.device)
        return self.dec(tgt_inp, roles, memory, pol_bias=policy_bias, action_mask=action_mask)


## 4) Eğitim (kısa SFT)

In [5]:

def pad_batch(batch, ignore_index=-100):
    """Collate ``(src, tgt)`` id-tensor pairs into padded teacher-forcing tensors.

    Args:
        batch: non-empty sequence of ``(src, tgt)`` 1-D long tensors.
        ignore_index: value written to the padded positions of ``tgt_out``.
            The default, -100, is the default ``ignore_index`` of
            ``nn.CrossEntropyLoss``, so padding does not contribute to the loss.
            Padding used to be 0, which is a real token id and was trained on.

    Returns:
        ``(src, tgt_inp, tgt_out, roles)``, each of shape (batch, max length):
        ``src`` and ``tgt_inp`` are padded with 0; ``tgt_inp`` is ``tgt`` shifted
        right by one behind a ``PLAN::Q`` start token; ``tgt_out`` is ``tgt``
        padded with ``ignore_index``; ``roles`` cycles through the role ids by
        position and is 0 on padding.
    """
    src_l = max(len(s) for s, _ in batch)
    tgt_l = max(len(t) for _, t in batch)
    B = len(batch)
    src = torch.zeros((B, src_l), dtype=torch.long)
    tgt_inp = torch.zeros((B, tgt_l), dtype=torch.long)
    tgt_out = torch.full((B, tgt_l), ignore_index, dtype=torch.long)
    roles = torch.zeros((B, tgt_l), dtype=torch.long)

    # Loop invariants: the start token and the role cycle are the same for every row.
    bos = stoi["PLAN::Q"]
    cyc = [ROLE2ID[r] for r in ROLES]

    for i, (s, t) in enumerate(batch):
        n = len(t)
        src[i, :len(s)] = s
        tgt_inp[i, 0] = bos
        tgt_inp[i, 1:n] = t[:-1]
        tgt_out[i, :n] = t
        for k in range(n):
            roles[i, k] = cyc[k % len(cyc)]
    return src, tgt_inp, tgt_out, roles

def make_action_mask(tgt_inp):
    """Return a (B, T) boolean tensor that is True where the input token starts with ``DSL::``."""
    B, T = tgt_inp.size()
    flags = [
        [itos[tok_id].startswith("DSL::") for tok_id in row]
        for row in tgt_inp.tolist()
    ]
    # reshape keeps the (B, T) shape even when one of the dimensions is zero.
    return torch.tensor(flags, dtype=torch.bool).reshape(B, T)

# Build the planner, its optimizer and the token-level loss.
model = Planner(len(itos), d=128).to(device)
opt = torch.optim.AdamW(model.parameters(), lr=2e-3)
loss_fn = nn.CrossEntropyLoss()
# The same policy ids (all of POLICY_TOKENS) are passed on every step, independent of the sample.
pol_idx = [stoi[p] for p in POLICY_TOKENS if p in stoi]

# Short supervised fine-tuning run: 160 steps.
for step in range(160):
    # With three samples this draws every sample each step, in a random order.
    batch = random.sample(data, k=min(3,len(data)))
    src, tgt_inp, tgt_out, roles = pad_batch(batch)
    src,tgt_inp,tgt_out,roles = src.to(device),tgt_inp.to(device),tgt_out.to(device),roles.to(device)
    # Positions whose input token is a DSL:: marker get their logits restricted to DSL_ALLOWED.
    am = make_action_mask(tgt_inp).to(device)
    logits = model(src, tgt_inp, roles, am, pol_idx)
    B,T,V = logits.shape
    # Flatten (B, T, V) logits and (B, T) targets so the loss is computed per position.
    loss = loss_fn(logits.reshape(B*T,V), tgt_out.reshape(B*T))
    # Clip the gradient norm to 1.0 before the optimizer step.
    opt.zero_grad(); loss.backward(); nn.utils.clip_grad_norm_(model.parameters(), 1.0); opt.step()
    if (step+1)%40==0: print(f"step {step+1:03d} | loss {loss.item():.3f}")


step 040 | loss 0.006
step 080 | loss 0.002
step 120 | loss 0.002
step 160 | loss 0.001


## 5) DSL üretimi (greedy)

In [6]:

def generate(plan_ctx: List[str], max_len=32):
    """Greedily decode a plan for the given context tokens.

    Decoding runs in eval mode under ``torch.no_grad()``: dropout is disabled,
    so the same context always yields the same plan, and no autograd graph is
    built. The model's previous train/eval mode is restored afterwards.

    Args:
        plan_ctx: context tokens for the encoder; tokens missing from ``stoi``
            are skipped.
        max_len: maximum number of tokens to emit.

    Returns:
        The generated tokens in order, ending with ``"END"`` if it was produced
        within ``max_len`` steps.

    Raises:
        ValueError: if ``plan_ctx`` contains no known token, since there would
            be nothing for the encoder to encode.
    """
    src_ids = [stoi[t] for t in plan_ctx if t in stoi]
    if not src_ids:
        raise ValueError("plan_ctx contains no token from the vocabulary")
    pol_ids = [stoi[p] for p in POLICY_TOKENS if p in stoi]
    n_roles = len(ROLE2ID)

    out = []
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            src = torch.tensor(src_ids, dtype=torch.long, device=device).unsqueeze(0)
            mem = model.enc(src)
            pb = model.pol(pol_ids, device)
            # Decoding starts from the same PLAN::Q start token / Q role used in training.
            y = torch.tensor([[stoi["PLAN::Q"]]], dtype=torch.long, device=device)
            r = torch.tensor([[ROLE2ID["Q"]]], dtype=torch.long, device=device)
            for _ in range(max_len):
                prev = itos[y[0, -1].item()]
                # Right after a DSL:: marker only DSL_ALLOWED tokens may follow. The mask
                # covers all positions, but only the last position's logits are read.
                am = torch.full((1, y.size(1)), prev.startswith("DSL::"),
                                dtype=torch.bool, device=device)
                logits = model.dec(y, r, mem, pb, am)
                nxt = int(torch.argmax(logits[0, -1]))
                out.append(itos[nxt])
                y = torch.cat([y, torch.tensor([[nxt]], dtype=torch.long, device=device)], dim=1)
                # Roles advance cyclically by position, matching how training batches are built.
                nxt_role = (int(r[0, -1]) + 1) % n_roles
                r = torch.cat([r, torch.tensor([[nxt_role]], dtype=torch.long, device=device)], dim=1)
                if itos[nxt] == "END":
                    break
    finally:
        model.train(was_training)
    return out

# Decode one plan for each of the three training contexts and print it as a token string.
print("ex NEGATIVE_TOTAL:", " ".join(generate(["CTX::DOMAIN_ECOM","POL::no_negative_total","POL::coupon_once_per_user"])))
print("ex COUPON_REUSE  :", " ".join(generate(["CTX::DOMAIN_ECOM","POL::coupon_once_per_user"])))
print("ex CROSS_CART    :", " ".join(generate(["CTX::DOMAIN_ECOM","POL::cart_is_private"])))


ex NEGATIVE_TOTAL: PLAN::Q TXT::neg_total? PLAN::W EXPR::delta_total<0 AND coupon_twice PLAN::I DSL::HTTP TOK::POST PATH::/api/cart/apply-coupon JSON::code TOK::WELCOME10 DSL::HTTP TOK::POST PATH::/api/cart/apply-coupon JSON::code TOK::WELCOME10 PLAN::S FIND::NEGATIVE_TOTAL END
ex COUPON_REUSE  : PLAN::Q TXT::reuse? PLAN::W EXPR::coupon_count>1 PLAN::I DSL::HTTP TOK::POST PATH::/api/cart/apply-coupon JSON::code TOK::FIRSTBUY DSL::HTTP TOK::POST PATH::/api/cart/apply-coupon JSON::code TOK::FIRSTBUY PLAN::S FIND::COUPON_REUSE END
ex CROSS_CART    : PLAN::Q TXT::cross_cart? PLAN::W EXPR::owner_mismatch PLAN::I DSL::HTTP TOK::PUT PATH::/api/cart JSON::cart_id TOK::OTHER PLAN::S FIND::CROSS_CART_ACCESS END


## 6) MiniDAST yürütücüler (simulate + http iskelet)

In [7]:

def now_iso():
    """Return the current UTC time as an ISO-8601 string with a trailing ``Z``."""
    import datetime as dt
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC timestamp
    # and drop tzinfo so the text stays "<naive isoformat>Z", exactly as before.
    return dt.datetime.now(dt.timezone.utc).replace(tzinfo=None).isoformat() + "Z"

class ToyEnv:
    """In-memory stand-in for a shop backend: one cart, one user and an operation log."""

    def __init__(self, user_id="SELF", cart_owner="SELF"):
        self.cart = {
            "total": 0.0,
            "coupon_applied_count": 0,
            "owner_id": cart_owner,
        }
        self.user = {"id": user_id}
        self.hist = []

    def add_item(self, price):
        """Increase the cart total by ``price``; this is not recorded in ``hist``."""
        self.cart["total"] += price

    def apply_coupon(self, code):
        """Take 10.0 off the total for a known code and log it; unknown codes are ignored."""
        if code not in ("WELCOME10", "FIRSTBUY"):
            return
        cart = self.cart
        cart["total"] -= 10.0
        cart["coupon_applied_count"] += 1
        self.hist.append({"op": "apply_coupon", "code": code, "total": cart["total"]})

    def set_cart_owner(self, owner):
        """Reassign the cart's owner id and log the change."""
        self.cart["owner_id"] = owner
        self.hist.append({"op": "set_cart_owner", "owner": owner})

    def checkout(self):
        """Log a checkout with the current total and return that total."""
        total = self.cart["total"]
        self.hist.append({"op": "checkout", "total": total})
        return total

class SimulateExec:
    """Replay plan tokens against an in-memory ``ToyEnv`` and evaluate policy oracles."""

    def __init__(self):
        self.env = ToyEnv("SELF", "SELF")

    def seed(self, total=25.0):
        """Put an item worth ``total`` into the cart."""
        self.env.add_item(total)

    @staticmethod
    def _params_after(toks, i, width=4):
        """Return up to ``width`` tokens following index ``i`` that belong to the same request.

        The window stops at the next ``DSL::`` or ``PATH::`` token, so parameters
        of a following request are never attributed to the current one.
        """
        window = []
        for tok in toks[i + 1:i + 1 + width]:
            if tok.startswith(("DSL::", "PATH::")):
                break
            window.append(tok)
        return window

    def run(self, toks: List[str]):
        """Apply the effect of each token to the environment and return a snapshot.

        Coupon value tokens apply the coupon, the checkout path checks out, and
        ``PATH::/api/cart`` followed by ``JSON::cart_id`` and ``TOK::OTHER`` in the
        same request reassigns the cart to ``OTHER``. Other tokens have no effect.

        Returns:
            ``{"cart", "user", "history"}`` holding shallow copies of the
            environment state after the run.
        """
        for i, t in enumerate(toks):
            if t == "TOK::WELCOME10":
                self.env.apply_coupon("WELCOME10")
            elif t == "TOK::FIRSTBUY":
                self.env.apply_coupon("FIRSTBUY")
            elif t == "PATH::/api/cart/checkout":
                self.env.checkout()
            elif t == "PATH::/api/cart":
                look = self._params_after(toks, i)
                if "JSON::cart_id" in look and "TOK::OTHER" in look:
                    self.env.set_cart_owner("OTHER")
        return {
            "cart": dict(self.env.cart),
            "user": dict(self.env.user),
            "history": list(self.env.hist),
        }

    def policies(self):
        """Return the names of the policies violated by the current environment state."""
        f = []
        if self.env.cart["total"] < 0:
            f.append("NEGATIVE_TOTAL")
        if self.env.cart["coupon_applied_count"] > 1:
            f.append("COUPON_REUSE")
        if self.env.cart["owner_id"] != self.env.user["id"]:
            f.append("CROSS_CART_ACCESS")
        return f

class HTTPExec:
    """Send plan tokens as real HTTP requests, rate-limited and capped by a request budget.

    Requires the third-party ``requests`` package at construction time.
    """

    def __init__(self, base, rps=2.0, budget=50):
        """Store the target and limits and open a ``requests`` session.

        Args:
            base: base URL; a trailing slash is stripped.
            rps: maximum requests per second (values below 0.1 are treated as 0.1).
            budget: maximum number of requests this executor may send.

        Raises:
            RuntimeError: if ``requests`` cannot be imported.
        """
        self.base = base.rstrip("/")
        self.rps = rps
        self.rem = budget
        self.last = 0.0
        try:
            import requests
        except ImportError as err:
            # Keep the original error type for callers, but preserve the cause.
            raise RuntimeError("HTTP mode requires 'requests'.") from err
        self.s = requests.Session()
        self.hist = []

    def _throttle(self):
        """Sleep as needed so consecutive requests are at least ``1/rps`` seconds apart."""
        import time as _t
        dt = _t.time() - self.last
        need = 1.0 / max(self.rps, 0.1)
        if dt < need:
            _t.sleep(need - dt)
        self.last = _t.time()

    def _req(self, m, path, body=None):
        """Send one request, record it in ``hist`` and return the recorded item.

        Raises:
            RuntimeError: if the request budget is exhausted.
        """
        if self.rem <= 0:
            raise RuntimeError("Budget exhausted")
        self._throttle()
        url = self.base + path
        r = self.s.request(method=m, url=url, json=body, timeout=10)
        self.rem -= 1
        item = {"ts": now_iso(), "req": {"m": m, "url": url, "json": body},
                "status": r.status_code, "body": None}
        try:
            item["body"] = r.json()
        except ValueError:
            # Not JSON: keep a short text excerpt instead.
            item["body"] = r.text[:400]
        self.hist.append(item)
        return item

    @staticmethod
    def _params_after(toks, i, width=4):
        """Return up to ``width`` tokens following index ``i`` that belong to the same request.

        The window stops at the next ``DSL::`` or ``PATH::`` token, so a request
        without parameters does not pick up the body of the request after it.
        """
        window = []
        for tok in toks[i + 1:i + 1 + width]:
            if tok.startswith(("DSL::", "PATH::")):
                break
            window.append(tok)
        return window

    def run(self, toks: List[str]):
        """Translate plan tokens into HTTP requests and send them in order.

        Method tokens set the method for the following paths (initially GET and
        carried over until changed). Each ``PATH::`` token triggers one request
        whose JSON body is built from the parameter tokens right after it.

        Returns:
            ``{"history": [...]}`` with a copy of every request recorded by this
            executor so far, not only those of this call.
        """
        m = "GET"
        for i, t in enumerate(toks):
            if t == "TOK::POST":
                m = "POST"
            elif t == "TOK::PUT":
                m = "PUT"
            elif t == "TOK::GET":
                m = "GET"
            elif t.startswith("PATH::"):
                path = t[len("PATH::"):]
                look = self._params_after(toks, i)
                body = None
                if "JSON::code" in look:
                    if "TOK::WELCOME10" in look:
                        body = {"code": "WELCOME10"}
                    elif "TOK::FIRSTBUY" in look:
                        body = {"code": "FIRSTBUY"}
                if "JSON::cart_id" in look:
                    # A cart_id parameter replaces any coupon body built above.
                    owner = "OTHER" if "TOK::OTHER" in look else "SELF"
                    body = {"cart_id": owner}
                self._req(m, path, body)
        return {"history": list(self.hist)}


## 7) `scan()`

In [8]:

from dataclasses import dataclass, asdict

@dataclass
class Finding:
    """One policy violation reported by ``scan``; serialized with ``asdict`` for the report."""
    # Identifier string; scan() builds it as "FND::<kind>::<unix seconds>".
    id: str
    # Violation name, e.g. "NEGATIVE_TOTAL", "COUPON_REUSE" or "CROSS_CART_ACCESS".
    kind: str
    # Severity label; scan() uses "high" or "medium".
    severity: str
    # Supporting data; scan() stores the plan tokens and the executor result here.
    evidence: dict
    # NOTE(review): the meaning and scale of this score are not visible here - confirm.
    repeatability: float = 0.6

_HIGH_SEVERITY_KINDS = ("NEGATIVE_TOTAL", "CROSS_CART_ACCESS")


def _check_plan_tokens(plan):
    """Raise RuntimeError for the first DSL/TOK/JSON/PATH token the allow-list rejects."""
    for tok in plan:
        if not tok.startswith(("DSL::", "TOK::", "JSON::", "PATH::")):
            continue
        # NOTE(review): every PATH::/api/* and JSON::* token passes even when it is not in
        # DSL_ALLOWED, so only DSL::/TOK:: tokens are strictly allow-listed - confirm intent.
        if tok in DSL_ALLOWED or tok.startswith(("PATH::/api/", "JSON::")):
            continue
        raise RuntimeError(f"Token not allowed: {tok}")


def _http_oracle_kinds(hist):
    """Return the finding kinds suggested by string heuristics over an HTTP history."""
    text = json.dumps(hist, ensure_ascii=False)
    text_min = re.sub(r"\s+", "", text)
    kinds = []
    # Heuristic oracles (replace with JSON field checks for prod)
    if "\"total\":-" in text_min:
        kinds.append("NEGATIVE_TOTAL")
    if text.count("apply-coupon") >= 2:
        kinds.append("COUPON_REUSE")
    if "\"cart_id\":\"OTHER\"" in text_min:
        kinds.append("CROSS_CART_ACCESS")
    return kinds


def scan(mode: str = "simulate",
         base: str = "https://juice-shop.herokuapp.com/",
         plans: Optional[List[List[str]]] = None,
         out_path: Optional[str] = None):
    """Run each plan against the simulate or HTTP executor and return a report dict.

    Every plan is evaluated in isolation: simulate mode starts each plan from a
    freshly seeded ``SimulateExec``, and HTTP mode inspects only the requests
    sent for that plan. Previously one executor was shared, so state and history
    from earlier plans produced findings attributed to later ones.

    Args:
        mode: ``"simulate"`` replays plans in memory; any other value sends real
            requests to ``base`` through ``HTTPExec``.
        base: target base URL; used by the HTTP executor and echoed in the report.
        plans: token lists to execute; ``None`` is treated as no plans.
        out_path: if given, the report is also written there as UTF-8 JSON.

    Returns:
        A dict with keys ``ts``, ``mode``, ``target`` and ``findings`` (a list of
        ``Finding`` dicts). Each finding's evidence holds ``plan_tokens``,
        ``plan_index`` (1-based) and ``result``.

    Raises:
        RuntimeError: if a plan contains a token the allow-list rejects, or from
            ``HTTPExec`` (``requests`` missing, request budget exhausted).
    """
    simulate = mode == "simulate"
    # HTTP mode keeps one executor so the rate limit and request budget span the whole scan.
    http_exe = None if simulate else HTTPExec(base, rps=2.0, budget=50)

    findings: List[Finding] = []

    for i, plan in enumerate(plans or [], start=1):
        # Bounded autonomy check
        _check_plan_tokens(plan)

        if simulate:
            exe = SimulateExec()
            exe.seed(25.0)
            res = exe.run(plan)
            kinds = exe.policies()
        else:
            already_sent = len(http_exe.hist)
            full = http_exe.run(plan)
            # Keep only the requests made for this plan.
            res = {"history": full.get("history", [])[already_sent:]}
            kinds = _http_oracle_kinds(res["history"])

        ev = {"plan_tokens": plan, "plan_index": i, "result": res}
        for kind in kinds:
            sev = "high" if kind in _HIGH_SEVERITY_KINDS else "medium"
            findings.append(Finding(id=f"FND::{kind}::{int(time.time())}",
                                    kind=kind, severity=sev, evidence=ev))

    report = {
        "ts": now_iso(),
        "mode": mode,
        "target": base,
        "findings": [asdict(f) for f in findings]
    }
    if out_path:
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
    return report


## 8) E2E demo (model → DSL → simulate)

In [11]:

# Generate DSL from the model for three contexts, filter it, and scan
ctxs = [
    ["CTX::DOMAIN_ECOM","POL::no_negative_total","POL::coupon_once_per_user"],
    ["CTX::DOMAIN_ECOM","POL::coupon_once_per_user"],
    ["CTX::DOMAIN_ECOM","POL::cart_is_private"],
]
# Keep only the executable DSL tokens of each generated plan.
plans = [dsl_filter(generate(c)) for c in ctxs]
print("Plans:", plans)
# Simulate mode sends no HTTP requests; `base` is only echoed in the report's "target" field.
report = scan(mode="simulate", base="https://juice-shop.herokuapp.com/", plans=plans, out_path="report_e2e_sim_fixed.json")
print(json.dumps(report, ensure_ascii=False, indent=2))
print("Rapor kaydedildi:", "report_e2e_sim_fixed.json")


Plans: [['DSL::HTTP', 'TOK::POST', 'PATH::/api/cart/apply-coupon', 'JSON::code', 'TOK::WELCOME10', 'DSL::HTTP', 'TOK::POST', 'PATH::/api/cart/apply-coupon', 'JSON::code', 'TOK::WELCOME10'], ['DSL::HTTP', 'TOK::POST', 'PATH::/api/cart/apply-coupon', 'JSON::code', 'TOK::FIRSTBUY', 'DSL::HTTP', 'TOK::POST', 'PATH::/api/cart/apply-coupon', 'JSON::code', 'TOK::FIRSTBUY'], ['DSL::HTTP', 'TOK::PUT', 'PATH::/api/cart', 'JSON::cart_id', 'TOK::OTHER']]
{
  "ts": "2025-08-16T13:27:36.880577Z",
  "mode": "simulate",
  "target": "https://juice-shop.herokuapp.com/",
  "findings": [
    {
      "id": "FND::COUPON_REUSE::1755350856",
      "kind": "COUPON_REUSE",
      "severity": "medium",
      "evidence": {
        "plan_tokens": [
          "DSL::HTTP",
          "TOK::POST",
          "PATH::/api/cart/apply-coupon",
          "JSON::code",
          "TOK::WELCOME10",
          "DSL::HTTP",
          "TOK::POST",
          "PATH::/api/cart/apply-coupon",
          "JSON::code",
          "TOK::WEL