In [1]:
!pip install "vyper==0.3.10" web3 eth-account hexbytes



In [2]:
!pip install "fsspec>=2023.1.0,<=2025.3.0"



In [21]:
import os, sys, json, textwrap, subprocess, shutil, time
from pathlib import Path

In [29]:
def _in_notebook():
    try:
        from IPython import get_ipython  
        return True
    except Exception:
        return False
def _pip_install(pkgs):
    """Install packages within Jupyter (uses %pip) or subprocess fallback."""
    if not pkgs:
        return
    if _in_notebook():
        try:
            from IPython import get_ipython
            get_ipython().run_line_magic("pip", "install -q --disable-pip-version-check " + " ".join(pkgs))
            return
        except Exception:
            pass
    subprocess.check_call([sys.executable, "-m", "pip", "install", "--upgrade", *pkgs])

In [30]:
# Preferred project folder; when it is absent, fall back to a folder under
# the current working directory.
_preferred_dir = Path(r"C:\Users\aniru\OneDrive\Desktop\ML tutorial\DEFI Depeg sentinel")
BASE_DIR = _preferred_dir if _preferred_dir.exists() else Path.cwd() / "DEFI_Depeg_sentinel"

# Build outputs (contract source, ABI, bytecode) are written below ART.
ART = BASE_DIR / "artifacts"
ART.mkdir(parents=True, exist_ok=True)

print(f"[paths] BASE_DIR={BASE_DIR}")
print(f"[paths] ART={ART}")

[paths] BASE_DIR=C:\Users\aniru\OneDrive\Desktop\ML tutorial\DEFI Depeg sentinel
[paths] ART=C:\Users\aniru\OneDrive\Desktop\ML tutorial\DEFI Depeg sentinel\artifacts


In [31]:
# Vyper source of the token contract, kept as a raw string and dedented so it
# can be written to disk and compiled later in the notebook.
# What the contract text below defines:
#   - owner-only setters (minter, treasury, treasury_bps) and pause/unpause;
#   - transfer / approve / transferFrom / balanceOf over internal balance maps;
#   - submit_data_block: minter-only, rejects a block_hash that was already
#     used, computes a reward from (anomaly_bps, novelty_bps, severity) and
#     mints it split between the treasury (treasury_bps share) and recipient.
# NOTE(review): the supply getter is `total_supply` and no `allowance` getter
# is exposed; confirm whether strict ERC-20 naming is expected by consumers.
vy_source = textwrap.dedent(r"""
# @version ^0.3.9

MAX_BPS: constant(uint256) = 10_000

name: public(String[32])
symbol: public(String[16])
decimals: public(uint8)

owner: public(address)
minter: public(address)
treasury: public(address)
treasury_bps: public(uint256)

paused: public(bool)
total_supply: public(uint256)

balances: HashMap[address, uint256]
allowances: HashMap[address, HashMap[address, uint256]]

used_block: HashMap[bytes32, bool]

event Transfer:
    sender: address
    receiver: address
    value: uint256

event Approval:
    owner: address
    spender: address
    value: uint256

event DataBlockSubmitted:
    block_hash: bytes32
    anomaly_bps: uint256
    novelty_bps: uint256
    severity: uint256
    reward: uint256
    to: address

@external
def __init__(name_: String[32], symbol_: String[16], treasury_: address, minter_: address, treasury_bps_: uint256):
    self.name = name_
    self.symbol = symbol_
    self.decimals = 18
    self.owner = msg.sender
    self.minter = minter_
    self.treasury = treasury_
    assert treasury_bps_ <= MAX_BPS
    self.treasury_bps = treasury_bps_
    self.paused = False
    self.total_supply = 0

@internal
def _only_owner():
    assert msg.sender == self.owner, "only owner"

@internal
def _only_minter():
    assert msg.sender == self.minter, "only minter"

@external
def set_minter(a: address):
    self._only_owner()
    self.minter = a

@external
def set_treasury(a: address):
    self._only_owner()
    self.treasury = a

@external
def set_treasury_bps(bps: uint256):
    self._only_owner()
    assert bps <= MAX_BPS
    self.treasury_bps = bps

@external
def pause():
    self._only_owner()
    self.paused = True

@external
def unpause():
    self._only_owner()
    self.paused = False

@view
@external
def used(blk: bytes32) -> bool:
    return self.used_block[blk]

@internal
def _mint(to: address, amount: uint256):
    assert to != empty(address)
    self.total_supply += amount
    self.balances[to] += amount
    log Transfer(empty(address), to, amount)

@internal
def _transfer(frm: address, to: address, amount: uint256):
    assert to != empty(address)
    assert self.balances[frm] >= amount
    self.balances[frm] -= amount
    self.balances[to] += amount
    log Transfer(frm, to, amount)

@external
def transfer(to: address, amount: uint256) -> bool:
    assert not self.paused
    self._transfer(msg.sender, to, amount)
    return True

@external
def approve(spender: address, amount: uint256) -> bool:
    self.allowances[msg.sender][spender] = amount
    log Approval(msg.sender, spender, amount)
    return True

@external
def transferFrom(frm: address, to: address, amount: uint256) -> bool:
    assert not self.paused
    allowed: uint256 = self.allowances[frm][msg.sender]
    assert allowed >= amount
    self.allowances[frm][msg.sender] = allowed - amount
    self._transfer(frm, to, amount)
    return True

@view
@external
def balanceOf(a: address) -> uint256:
    return self.balances[a]

@internal
def _compute_reward(anomaly_bps: uint256, novelty_bps: uint256, severity: uint256) -> uint256:
    assert anomaly_bps <= MAX_BPS
    assert novelty_bps <= MAX_BPS
    assert 1 <= severity and severity <= 5
    base: uint256 = 10 ** 18  # 1 token unit
    return base * (anomaly_bps + novelty_bps) * severity / (MAX_BPS * 5)

@external
def submit_data_block(block_hash: bytes32, anomaly_bps: uint256, novelty_bps: uint256, severity: uint256, recipient: address) -> uint256:
    self._only_minter()
    assert not self.paused
    assert recipient != empty(address)
    assert not self.used_block[block_hash], "dup"
    self.used_block[block_hash] = True

    reward: uint256 = self._compute_reward(anomaly_bps, novelty_bps, severity)
    t_fee: uint256 = reward * self.treasury_bps / MAX_BPS
    net: uint256 = reward - t_fee

    if t_fee > 0:
        self._mint(self.treasury, t_fee)
    self._mint(recipient, net)

    log DataBlockSubmitted(block_hash, anomaly_bps, novelty_bps, severity, reward, recipient)
    return reward
""")

In [32]:
def compile_vyper_verbose(src: str):
    """Compile Vyper source and return its ABI and hex bytecode.

    The source is first saved as ``SentinelDataToken.vy`` under ``ART``.
    Compilation is attempted through the ``vyper`` Python API (installing the
    package when it is missing); if that raises, the ``vyper`` command line is
    invoked on the saved file instead.

    Args:
        src: Vyper contract source text.

    Returns:
        dict with keys ``"abi"`` and ``"bytecode"`` (a ``0x``-prefixed hex string
        when produced by the CLI or from raw bytes).

    Raises:
        RuntimeError: If the CLI fallback exits with a non-zero status.
    """
    source_path = ART / "SentinelDataToken.vy"
    source_path.write_text(src, encoding="utf-8")

    try:
        import vyper
    except ModuleNotFoundError:
        _pip_install(["vyper"])
        import vyper

    try:
        result = vyper.compile_code(src, output_formats=["abi", "bytecode"])
        abi = result["abi"]
        bytecode = result["bytecode"]
        # Normalise raw bytes to the 0x-prefixed hex form.
        if isinstance(bytecode, (bytes, bytearray)):
            bytecode = "0x" + bytecode.hex()
        print("[vyper] compiled via Python API")
        return {"abi": abi, "bytecode": bytecode}
    except Exception as e:
        print("[vyper] Python API failed:", e)

        # Prefer the vyper executable on PATH, else run the module with this interpreter.
        vyper_exe = shutil.which("vyper")
        base_cmd = [vyper_exe] if vyper_exe else [sys.executable, "-m", "vyper"]

        def cli_output(fmt):
            """Run the CLI for one output format and return its stripped stdout."""
            proc = subprocess.run(base_cmd + ["-f", fmt, str(source_path)], capture_output=True, text=True)
            if proc.returncode != 0:
                raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "vyper CLI error")
            return proc.stdout.strip()

        abi_text = cli_output("abi")
        bytecode_hex = cli_output("bytecode")
        if not bytecode_hex.startswith("0x"):
            bytecode_hex = "0x" + bytecode_hex
        print("[vyper] compiled via CLI")
        return {"abi": json.loads(abi_text), "bytecode": bytecode_hex}

In [33]:
# Compile the contract, then persist the ABI (pretty-printed JSON) and the
# bytecode next to the saved source under ART.
compiled = compile_vyper_verbose(vy_source)

abi_path = ART / "SentinelDataToken.abi.json"
bin_path = ART / "SentinelDataToken.bin"
abi_path.write_text(json.dumps(compiled["abi"], indent=2))
bin_path.write_text(compiled["bytecode"])

print("[artifacts] wrote ABI & bytecode")

[vyper] compiled via Python API
[artifacts] wrote ABI & bytecode


In [34]:
try:
    from web3 import Web3
except ModuleNotFoundError:
    _pip_install(["web3"])
    from web3 import Web3

In [35]:
def get_web3_and_signer():
    """Pick a Web3 connection and a way to sign transactions.

    Tries the HTTP endpoint from ``WEB3_RPC_URL`` (default
    ``http://127.0.0.1:8545``). When it is reachable, ``PRIVATE_KEY`` is used
    if set, otherwise the node's first account. When it is not reachable, an
    in-process eth-tester chain is created (installing it if missing).

    Returns:
        Tuple ``(w3, sender_address, account_or_None, mode)`` where ``mode`` is
        ``"http+pk"``, ``"http+unlocked"`` or ``"tester"``. The account object
        is only returned in ``"http+pk"`` mode.

    Raises:
        SystemExit: If the HTTP node is reachable but neither a private key
            nor a node account is available.
    """
    rpc_url = os.getenv("WEB3_RPC_URL", "http://127.0.0.1:8545")
    w3 = Web3(Web3.HTTPProvider(rpc_url))

    if not w3.is_connected():
        # No HTTP node: spin up a local in-memory test chain.
        try:
            from web3.providers.eth_tester import EthereumTesterProvider
            from eth_tester import EthereumTester
        except ModuleNotFoundError:
            _pip_install(["eth-tester", "py-evm"])
            from web3.providers.eth_tester import EthereumTesterProvider
            from eth_tester import EthereumTester
        w3 = Web3(EthereumTesterProvider(EthereumTester()))
        first_account = w3.eth.accounts[0]
        return w3, first_account, None, "tester"

    try:
        from eth_account import Account
    except ModuleNotFoundError:
        _pip_install(["eth-account"])
        from eth_account import Account

    private_key = os.getenv("PRIVATE_KEY", "").strip()
    if private_key:
        local_account = Account.from_key(private_key)
        return w3, local_account.address, local_account, "http+pk"

    # No key supplied: rely on an account managed by the node itself.
    try:
        node_account = w3.eth.accounts[0]
        return w3, node_account, None, "http+unlocked"
    except Exception:
        raise SystemExit(
            "HTTP provider reachable but no signer available.\n"
            "Set PRIVATE_KEY for hosted RPC, or unlock an account on the local node."
        )

In [36]:
# Resolve the connection once; `acct` is None unless a PRIVATE_KEY-backed
# account was selected, which later cells use to decide how to send transactions.
w3, sender, acct, mode = get_web3_and_signer()
print(f"[rpc] connected via {mode}, chainId={w3.eth.chain_id}, sender={sender}")

Note: you may need to restart the kernel to use updated packages.
[rpc] connected via tester, chainId=131277322940537, sender=0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf


In [37]:
# Contract factory built from the compiled artifacts.
abi = compiled["abi"]
bytecode = compiled["bytecode"]
Token = w3.eth.contract(abi=abi, bytecode=bytecode)

# Constructor arguments: the sender acts as both treasury and minter.
NAME = "SentinelData"
SYMB = "SDT"
TREASURY = MINTER = sender
TREASURY_BPS = 500

In [38]:
def deploy():
    """Deploy the token contract and return the mined transaction receipt.

    When no local account is available (``acct is None``) the node signs the
    transaction on behalf of ``sender``. Otherwise the transaction is built,
    signed locally with ``acct.key`` and broadcast as a raw transaction.

    Returns:
        The receipt returned by ``w3.eth.wait_for_transaction_receipt``.
    """
    ctor = Token.constructor(NAME, SYMB, TREASURY, MINTER, TREASURY_BPS)
    if acct is None:
        tx_hash = ctor.transact({"from": sender})
    else:
        tx = ctor.build_transaction({
            "from": sender,
            "nonce": w3.eth.get_transaction_count(sender),
            "gas": 2_500_000,
            "gasPrice": w3.eth.gas_price,
            "chainId": w3.eth.chain_id,
        })
        signed = w3.eth.account.sign_transaction(tx, acct.key)
        # Newer eth-account releases expose `raw_transaction`; older ones only
        # have `rawTransaction`. Support both so unpinned installs keep working.
        raw_tx = getattr(signed, "raw_transaction", None)
        if raw_tx is None:
            raw_tx = signed.rawTransaction
        tx_hash = w3.eth.send_raw_transaction(raw_tx)
    return w3.eth.wait_for_transaction_receipt(tx_hash)

In [39]:
# Deploy, read the new contract's address from the receipt, and bind a
# contract object to that address for the calls made in later cells.
rcpt = deploy()
addr = rcpt.contractAddress
print(f"[deploy] SentinelDataToken @ {addr}")
token = w3.eth.contract(address=addr, abi=abi)

[deploy] SentinelDataToken @ 0xF2E246BB76DF876Cef8b38ae84130F4F55De395b


In [40]:
# Read back the constructor-initialised state: (printed label, getter name).
for label, getter in (
    ("name =", "name"),
    ("symbol =", "symbol"),
    ("decimals =", "decimals"),
    ("treasury_bps =", "treasury_bps"),
    ("owner =", "owner"),
    ("minter =", "minter"),
    ("totalSupply =", "total_supply"),
):
    print(label, getattr(token.functions, getter)().call())

name = SentinelData
symbol = SDT
decimals = 18
treasury_bps = 500
owner = 0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf
minter = 0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf
totalSupply = 0


In [41]:
# Scores submitted on-chain; the hashed payload is derived from these same
# values so the hash can never drift from what is actually submitted.
anomaly_bps = 820
novelty_bps = 430
severity    = 3
recipient   = sender

# The timestamp makes each run's payload (and therefore its hash) distinct.
payload = {"ts": int(time.time()), "pool": "USDC/USDT", "anom": anomaly_bps, "novel": novelty_bps, "sev": severity}
blob = json.dumps(payload, sort_keys=True)
blk_hash = w3.keccak(text=blob)

# Snapshot balances before submitting so the deltas can be reported later.
bal0 = token.functions.balanceOf(recipient).call()
ts0  = token.functions.total_supply().call()

In [42]:
def submit():
    """Send the ``submit_data_block`` transaction and return its mined receipt.

    The call arguments come from the module-level ``blk_hash``,
    ``anomaly_bps``, ``novelty_bps``, ``severity`` and ``recipient``. When no
    local account is available (``acct is None``) the node signs for
    ``sender``; otherwise the transaction is signed locally with ``acct.key``.

    Returns:
        The receipt returned by ``w3.eth.wait_for_transaction_receipt``.
    """
    fn = token.functions.submit_data_block(blk_hash, anomaly_bps, novelty_bps, severity, recipient)
    if acct is None:
        tx_hash = fn.transact({"from": sender})
    else:
        tx = fn.build_transaction({
            "from": sender,
            "nonce": w3.eth.get_transaction_count(sender),
            "gas": 250000,
            "gasPrice": w3.eth.gas_price,
            "chainId": w3.eth.chain_id,
        })
        signed = w3.eth.account.sign_transaction(tx, acct.key)
        # Newer eth-account releases expose `raw_transaction`; older ones only
        # have `rawTransaction`. Support both so unpinned installs keep working.
        raw_tx = getattr(signed, "raw_transaction", None)
        if raw_tx is None:
            raw_tx = signed.rawTransaction
        tx_hash = w3.eth.send_raw_transaction(raw_tx)
    return w3.eth.wait_for_transaction_receipt(tx_hash)

In [43]:
# Submit the data block, then re-read the same figures captured before the
# submission, plus the treasury address and its balance.
rcpt_submit = submit()
bal1 = token.functions.balanceOf(recipient).call()
ts1  = token.functions.total_supply().call()
treasury = token.functions.treasury().call()
tbal  = token.functions.balanceOf(treasury).call()

In [44]:
# Changes caused by the submission, relative to the pre-submit snapshot.
recipient_delta = bal1 - bal0
supply_delta = ts1 - ts0

print(f"[submit] status={rcpt_submit.status} gasUsed={rcpt_submit.gasUsed}")
print("recipient balance delta =", recipient_delta)
print("totalSupply delta =", supply_delta)
print("treasury =", treasury, "treasury balance =", tbal)

[submit] status=1 gasUsed=104446
recipient balance delta = 75000000000000000
totalSupply delta = 75000000000000000
treasury = 0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf treasury balance = 75000000000000000


In [45]:
# Decode the DataBlockSubmitted event from the submission receipt.
# Decoding is best-effort: any failure is reported and the notebook continues.
try:
    from web3.logs import DISCARD

    # The receipt also carries Transfer logs emitted by the mints; DISCARD drops
    # logs that do not match this event instead of emitting a warning for each.
    evt = token.events.DataBlockSubmitted().process_receipt(rcpt_submit, errors=DISCARD)
    if evt:
        e0 = evt[0]["args"]
        print("[event] DataBlockSubmitted:",
              dict(block_hash=e0["block_hash"].hex(),
                   anomaly_bps=e0["anomaly_bps"],
                   novelty_bps=e0["novelty_bps"],
                   severity=e0["severity"],
                   reward=e0["reward"],
                   to=e0["to"]))
except Exception as _e:
    print("[event] decode skipped:", _e)

[event] DataBlockSubmitted: {'block_hash': 'a0ddeab799e3173ce5f63f228b0be34e188a8e90c7dc48d88debd2043d2fbdea', 'anomaly_bps': 820, 'novelty_bps': 430, 'severity': 3, 'reward': 75000000000000000, 'to': '0x7E5F4552091A69125d5DfCb7b8C2659029395Bdf'}


  return callback(fn(*args, **kwargs))
  return callback(fn(*args, **kwargs))


In [46]:
# Final marker so a full run visibly reaches the end of the notebook.
print("\nAll done")


All done
