# Block2SimMC Demonstration

In [1]:
from PSSimPy import Account, Transaction
from PSSimPy.queues import PriorityQueue # key simulator class that we are checking in this demo
from typing import Iterator, Tuple
import pandas as pd
from IPython.display import display

from modules import ScopeDefinition, PropertyExtraction, AdapterBounding, ModelBuilder, ModelChecker

# Scope Definition

In [2]:
# Path of the smart-contract source file that the rest of the notebook checks
# the Python implementation against.
contract_path = 'smart_contract/priority_queue.sol'
# Build the scope object from that contract; `scope` is what
# PropertyExtraction receives in the next step.
scope_definition = ScopeDefinition(contract_path)
scope = scope_definition.extract_scope()

# Property Extraction

In [3]:
# Extract the properties to verify from the scope. "config.properties" is the
# second constructor argument (presumably a settings file -- confirm against
# modules.PropertyExtraction).
prop_extractor = PropertyExtraction(scope, "config.properties")
# `properties` is later passed to ModelChecker.check().
properties = prop_extractor.extract_properties()

In [4]:
# Inspect the extracted properties: a dict keyed by category ('safety',
# 'liveness') whose values are lists of (property_name, formula) pairs, the
# formulas being written with AG / AX / EX operators.
print(properties)

{'safety': [('NoDoubleDequeue', AG (not dequeue or AX not dequeue))], 'liveness': [('EnqueueLeadsToDequeue', AG (not enqueue or EX dequeue))]}


# Adapter & Bounding

In [5]:
# (a) accounts setup
# Two fixed accounts; the snapshot/load helpers below read and write their balances.
accounts = [
    Account(id='acc1', owner=None, balance=5),
    Account(id='acc2', owner=None, balance=10),
]
# Lookup table from account id to the Account object itself.
account_by_id = {account.id: account for account in accounts}

# (b) snapshot/load
def pq_snapshot(pq: PriorityQueue):
    """
    Capture the queue contents and the account balances as nested tuples.

    Returns a pair ``(queue_rows, balance_row)``:
      - ``queue_rows``: one ``(sender_id, amount, priority, day, time, period)``
        tuple per ``(transaction, period)`` item found in ``pq.queue``
      - ``balance_row``: the balance of every account in the module-level
        ``accounts`` list, in list order

    NOTE(review): rows appear in the iteration order of ``pq.queue``. If that
    container is unordered, two equal queues holding more than one item could
    produce differently ordered snapshots -- confirm against PriorityQueue.
    """
    queue_rows = tuple(
        (
            queued_txn.sender_account.id,
            queued_txn.amount,
            queued_txn.priority,
            queued_txn.day,
            queued_txn.time,
            queued_period,
        )
        for queued_txn, queued_period in pq.queue
    )
    balance_row = tuple(account.balance for account in accounts)
    return (queue_rows, balance_row)


def pq_load(pq: PriorityQueue, state):
    """
    Rebuild the queue and account balances from a snapshot.

    Parameters
    ----------
    pq : PriorityQueue
        Queue to overwrite; ``pq.queue`` is cleared and refilled.
    state : tuple
        ``(entries, balances)`` as laid out by ``pq_snapshot``: ``entries`` is
        an iterable of ``(sender_id, amount, priority, day, time, period)``
        tuples and ``balances`` holds one balance per element of the
        module-level ``accounts`` list, in the same order.

    Raises
    ------
    ValueError
        If ``balances`` does not contain exactly one value per account.
    KeyError
        If an entry names a sender id that is missing from ``account_by_id``.
    """
    entries, balances = state

    # Check before mutating anything: zip() below stops at the shorter
    # sequence, so a mismatched snapshot would otherwise leave some accounts
    # with stale balances without any error.
    if len(balances) != len(accounts):
        raise ValueError(
            f"snapshot holds {len(balances)} balances "
            f"but there are {len(accounts)} accounts"
        )

    pq.queue.clear()

    for sid, amt, prio, day, txn_time, period in entries:
        # The recipient is not part of the snapshot, so it is restored as None.
        txn = Transaction(
            sender_account    = account_by_id[sid],
            recipient_account = None,
            amount            = amt,
            priority          = prio,
            day               = day,
            time              = txn_time
        )
        pq.queue.add((txn, period))

    # restore balances
    for acc, bal in zip(accounts, balances):
        acc.balance = bal

# (c) define a bounds generator for enqueue
def enqueue_args() -> Iterator[Tuple]:
    """
    Enumerate the bounded argument tuples tried for ``enqueue``.

    Yields a 1-tuple ``(txn,)`` for every combination of sender account,
    amount in (1, 2), priority in (0, 1), day in (1, 2) and time in
    ("08:00", "09:00"). The sender varies slowest and the time fastest.
    """
    combinations = (
        (sender, amount, priority, day, clock)
        for sender in account_by_id.values()
        for amount in (1, 2)
        for priority in (0, 1)
        for day in (1, 2)
        for clock in ("08:00", "09:00")
    )
    for sender, amount, priority, day, clock in combinations:
        # sender, recipient (None), amount and priority are passed positionally;
        # day and time by keyword.
        txn = Transaction(sender, None, amount, priority, day=day, time=clock)
        yield (txn,)

# (d) method_bounds
MAX_LEN = 1  # enqueue is only offered while the queue holds fewer items than this


def _queue_has_room(inst, args):
    """Guard for enqueue: true while the queue is shorter than MAX_LEN."""
    return len(inst.queue) < MAX_LEN


def _queue_is_nonempty(inst, _):
    """Guard for begin_dequeueing: true when at least one item is queued."""
    return len(inst.queue) > 0


def _single_empty_call():
    """Argument source for begin_dequeueing: one empty argument tuple."""
    return [()]


# Maps each method name to its argument source, its transition label and
# the guard deciding whether it applies to a given instance.
method_bounds = {
    "enqueue": {
        "args_generator": enqueue_args,
        "label":          "enqueue",
        "guard":          _queue_has_room,
    },
    "begin_dequeueing": {
        "args_generator": _single_empty_call,
        "label":          "dequeue",
        "guard":          _queue_is_nonempty,
    },
}


# (e) build and test
# Wire the queue class, the snapshot/load helpers and the bounds into one adapter.
adapter = AdapterBounding(
    impl_cls=PriorityQueue, snapshot_fn=pq_snapshot,
    load_fn=pq_load, method_bounds=method_bounds,
)

# Materialise the initial states so they can be printed and indexed.
init = [*adapter.initial_states()]
print("INIT:", init)

# Successors of the first initial state; only the first five are shown.
succ = [*adapter.successors(init[0])]
print("SUCCESSORS:", succ[:5])


INIT: [State(data=((), (5, 10)))]
SUCCESSORS: [(State(data=((('acc1', 1, 0, 1, '08:00', 0),), (5, 10))), {'enqueue'}), (State(data=((('acc1', 1, 0, 1, '09:00', 0),), (5, 10))), {'enqueue'}), (State(data=((('acc1', 1, 0, 2, '08:00', 0),), (5, 10))), {'enqueue'}), (State(data=((('acc1', 1, 0, 2, '09:00', 0),), (5, 10))), {'enqueue'}), (State(data=((('acc1', 1, 1, 1, '08:00', 0),), (5, 10))), {'enqueue'})]


In [6]:
# Sanity check to make sure there is no state explosion.
# iter() keeps this working whether initial_states() returns a generator or a list.
init_state = next(iter(adapter.initial_states()))
succ0 = list(adapter.successors(init_state))
print("level-1 transitions:", len(succ0))
succ1 = sum(len(list(adapter.successors(s))) for s, _ in succ0)
print("level-2 transitions:", succ1)

# The expectations are enforced instead of living only in comments. They hold
# for the MAX_LEN == 1 bound: every enqueue argument tuple is one level-1
# transition (2 accounts x 2 amounts x 2 priorities x 2 days x 2 times = 32),
# and a full queue then admits exactly one dequeue and no further enqueue.
if MAX_LEN == 1:
    n_enqueue_args = sum(1 for _ in enqueue_args())
    assert len(succ0) == n_enqueue_args, (
        f"expected {n_enqueue_args} level-1 transitions, got {len(succ0)}"
    )
    assert succ1 == len(succ0), (
        f"expected {len(succ0)} level-2 transitions, got {succ1}"
    )

level-1 transitions: 32
level-2 transitions: 32


# Model Builder

In [7]:
# Build the model from the single adapter defined earlier.
builder = ModelBuilder([adapter])
model = builder.build()

# Report the size of the resulting model.
state_count = len(model.states)
print(f"States: {state_count}")
transition_count = sum(map(len, model.transitions.values()))
print(f"Transitions: {transition_count}")

States: 33
Transitions: 64


# Model Checker

In [8]:
# Check the extracted properties against the built model.
# `results` is read below through results.items() as (property name, pass flag) pairs.
checker = ModelChecker(model)
results = checker.check(properties)

# Feedback Loop

Here, we simply report back which of the properties extracted from the smart contract are satisfied by the corresponding Python class.

More sophisticated mechanisms could be built into this module, such as identifying the specific counterexamples that cause a property to fail, in order to give more precise feedback.

In [9]:
# One row per checked property; the boolean verdict is rendered as an icon.
verdict_icons = {True: '✅', False: '❌'}
df = pd.DataFrame(list(results.items()), columns=['Property', 'Pass?'])
df = df.assign(**{'Pass?': df['Pass?'].map(verdict_icons)})
display(df)

Unnamed: 0,Property,Pass?
0,NoDoubleDequeue,✅
1,EnqueueLeadsToDequeue,✅
