Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion internal/evmreader/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,12 @@ func (r *Service) readAndStoreInputs(

epochLength := app.application.EpochLength
if epochLength == 0 {
_ = r.setApplicationInoperable(ctx, app.application, "Application has epoch length of zero")
// setApplicationInoperable always returns non-nil (the reason text itself).
// The DB error case is already logged inside setApplicationState.
// On DB success the app is marked inoperable and won't reappear next tick.
// On DB failure the app reappears as Enabled next tick, retrying this path.
_ = r.setApplicationInoperable(ctx, app.application,
"Application has epoch length of zero")
continue
}

Expand Down
4 changes: 4 additions & 0 deletions internal/evmreader/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ func (r *Service) readAndUpdateOutputs(
}

if !bytes.Equal(output.RawData, event.Output) {
// setApplicationInoperable always returns non-nil (the reason text itself).
// The DB error case is already logged inside setApplicationState.
// On DB success the app is marked inoperable and won't reappear next tick.
// On DB failure the app reappears as Enabled next tick, retrying this path.
_ = r.setApplicationInoperable(ctx, app.application,
"Output mismatch. Application is in an invalid state. Output Index %d, raw data %s != event data %s",
output.Index,
Expand Down
5 changes: 4 additions & 1 deletion internal/repository/postgres/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,10 @@ func updateEpochClaim(
).
WHERE(
table.Epoch.ApplicationID.EQ(postgres.Int64(e.ApplicationID)).
AND(table.Epoch.Index.EQ(uint64Expr(e.Index))),
AND(table.Epoch.Index.EQ(uint64Expr(e.Index))).
AND(table.Epoch.Status.EQ(
postgres.NewEnumValue(model.EpochStatus_InputsProcessed.String()),
)),
)

sqlStr, args := updStmt.Sql()
Expand Down
34 changes: 29 additions & 5 deletions internal/repository/postgres/epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,38 @@ func (r *PostgresRepository) CreateEpochsAndInputs(
whereClause,
)

// Guard: only update epoch fields when the existing row is still OPEN.
// Once an epoch is sealed (CLOSED) or beyond, its status, block range,
// input bounds, and tournament address are finalized and must not be
// overwritten by crash-recovery re-processing.
isOpen := table.Epoch.Status.EQ(
postgres.NewEnumValue(model.EpochStatus_Open.String()),
)

sqlStr, args := epochInsertStmt.QUERY(epochSelectQuery).
ON_CONFLICT(table.Epoch.ApplicationID, table.Epoch.Index).
DO_UPDATE(postgres.SET(
table.Epoch.Status.SET(postgres.NewEnumValue(epoch.Status.String())),
table.Epoch.LastBlock.SET(uint64Expr(epoch.LastBlock)),
table.Epoch.InputIndexUpperBound.SET(uint64Expr(epoch.InputIndexUpperBound)),
table.Epoch.TournamentAddress.SET(tournamentAddress),
)).Sql() // FIXME on conflict
table.Epoch.Status.SET(postgres.StringExp(
postgres.CASE().
WHEN(isOpen).THEN(table.Epoch.EXCLUDED.Status).
ELSE(table.Epoch.Status),
)),
table.Epoch.LastBlock.SET(postgres.FloatExp(
postgres.CASE().
WHEN(isOpen).THEN(table.Epoch.EXCLUDED.LastBlock).
ELSE(table.Epoch.LastBlock),
)),
table.Epoch.InputIndexUpperBound.SET(postgres.FloatExp(
postgres.CASE().
WHEN(isOpen).THEN(table.Epoch.EXCLUDED.InputIndexUpperBound).
ELSE(table.Epoch.InputIndexUpperBound),
)),
table.Epoch.TournamentAddress.SET(postgres.ByteaExp(
postgres.CASE().
WHEN(isOpen).THEN(table.Epoch.EXCLUDED.TournamentAddress).
ELSE(table.Epoch.TournamentAddress),
)),
)).Sql()
_, err = tx.Exec(ctx, sqlStr, args...)

if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,15 @@ ALTER TABLE "tournaments" DROP CONSTRAINT "tournaments_parent_match_fkey";

DROP TRIGGER IF EXISTS "matches_set_updated_at" ON "matches";
DROP INDEX IF EXISTS "matches_unique_pair_idx";
DROP INDEX IF EXISTS "matches_app_epoch_tournament_idx";
DROP TABLE IF EXISTS "matches";

DROP TRIGGER IF EXISTS "commitments_set_updated_at" ON "commitments";
DROP INDEX IF EXISTS "commitments_final_state_idx";
DROP INDEX IF EXISTS "commitments_app_epoch_tournament_idx";
DROP TABLE IF EXISTS "commitments";

DROP TRIGGER IF EXISTS "tournaments_set_updated_at" ON "tournaments";
DROP INDEX IF EXISTS "tournaments_parent_match_nonroot_idx";
DROP INDEX IF EXISTS "unique_root_per_epoch_idx";
DROP INDEX IF EXISTS "tournaments_epoch_idx";
DROP TABLE IF EXISTS "tournaments";

DROP TRIGGER IF EXISTS "node_config_set_updated_at" ON "node_config";
Expand All @@ -45,11 +42,14 @@ DROP INDEX IF EXISTS "input_status_idx";
DROP INDEX IF EXISTS "input_block_number_idx";
DROP TABLE IF EXISTS "input";

DROP TRIGGER IF EXISTS "epoch_status_transition_check" ON "epoch";
DROP TRIGGER IF EXISTS "epoch_set_updated_at" ON "epoch";
DROP INDEX IF EXISTS "epoch_status_idx";
DROP INDEX IF EXISTS "epoch_last_block_idx";
DROP TABLE IF EXISTS "epoch";

DROP FUNCTION IF EXISTS "enforce_epoch_status_transition";

DROP TRIGGER IF EXISTS "execution_parameters_set_updated_at" ON "execution_parameters";
DROP TABLE IF EXISTS "execution_parameters";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,88 @@ CREATE INDEX "epoch_status_idx" ON "epoch"("application_id", "status");
CREATE TRIGGER "epoch_set_updated_at" BEFORE UPDATE ON "epoch"
FOR EACH ROW EXECUTE FUNCTION update_updated_at_column();

-- Enforce valid epoch status transitions.
-- The state machine is:
-- OPEN → CLOSED → INPUTS_PROCESSED → CLAIM_COMPUTED
-- CLAIM_COMPUTED → CLAIM_SUBMITTED → CLAIM_ACCEPTED
-- CLAIM_COMPUTED → CLAIM_ACCEPTED (PRT skips SUBMITTED; also valid when
-- syncing from scratch and the claim was
-- already accepted, or in reader-only mode
-- with tx submission disabled)
-- CLAIM_COMPUTED → CLAIM_REJECTED (claim rejected on-chain before the node
-- submits, e.g. a conflicting claim was
-- already accepted)
-- CLAIM_SUBMITTED → CLAIM_REJECTED
-- Any other transition (including backwards) is rejected.
-- Same-status updates are allowed (idempotent no-ops).
--
-- When transitioning to CLAIM_COMPUTED, the trigger also verifies that
-- required proof fields are populated:
-- All apps: machine_hash, outputs_merkle_root, outputs_merkle_proof
-- PRT (DaveConsensus): additionally commitment, commitment_proof
CREATE FUNCTION enforce_epoch_status_transition() RETURNS trigger AS $$
DECLARE
    -- Whitelist of allowed transitions; each element is a [from, to] pair.
    valid_transitions text[][] := ARRAY[
        ARRAY['OPEN', 'CLOSED'],
        ARRAY['CLOSED', 'INPUTS_PROCESSED'],
        ARRAY['INPUTS_PROCESSED', 'CLAIM_COMPUTED'],
        ARRAY['CLAIM_COMPUTED', 'CLAIM_SUBMITTED'],
        ARRAY['CLAIM_COMPUTED', 'CLAIM_ACCEPTED'],
        ARRAY['CLAIM_COMPUTED', 'CLAIM_REJECTED'],
        ARRAY['CLAIM_SUBMITTED', 'CLAIM_ACCEPTED'],
        ARRAY['CLAIM_SUBMITTED', 'CLAIM_REJECTED']
    ];
    is_valid boolean := false;
    app_consensus text; -- consensus_type of the application owning this epoch
BEGIN
    -- Same-status updates are idempotent no-ops and always allowed.
    IF OLD.status = NEW.status THEN
        RETURN NEW;
    END IF;
    -- Scan the whitelist for the (OLD.status, NEW.status) pair.
    -- array_length(..., 1) is the pair count (the first dimension).
    FOR i IN 1..array_length(valid_transitions, 1) LOOP
        IF OLD.status::text = valid_transitions[i][1]
            AND NEW.status::text = valid_transitions[i][2] THEN
            is_valid := true;
            EXIT;
        END IF;
    END LOOP;
    IF NOT is_valid THEN
        RAISE EXCEPTION 'invalid epoch status transition: % -> %',
            OLD.status, NEW.status;
    END IF;

    -- Enforce required fields when entering CLAIM_COMPUTED.
    -- NOTE: the adjacent string literals below are concatenated by the SQL
    -- parser (constants separated by whitespace containing a newline).
    IF NEW.status::text = 'CLAIM_COMPUTED' THEN
        IF NEW.machine_hash IS NULL
            OR NEW.outputs_merkle_root IS NULL
            OR NEW.outputs_merkle_proof IS NULL THEN
            RAISE EXCEPTION
                'CLAIM_COMPUTED requires machine_hash, outputs_merkle_root, '
                'and outputs_merkle_proof to be non-null';
        END IF;

        -- Look up the owning application's consensus type to decide
        -- whether PRT-specific proof fields are also required.
        SELECT a.consensus_type::text INTO app_consensus
        FROM application a
        WHERE a.id = NEW.application_id;

        -- PRT (Dave) applications additionally carry a commitment and proof.
        IF app_consensus = 'PRT' THEN
            IF NEW.commitment IS NULL
                OR NEW.commitment_proof IS NULL THEN
                RAISE EXCEPTION
                    'CLAIM_COMPUTED for PRT apps requires commitment '
                    'and commitment_proof to be non-null';
            END IF;
        END IF;
    END IF;

    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Per PostgreSQL "UPDATE OF" semantics, this trigger fires only for UPDATE
-- statements that list "status" as an assignment target (even if the value
-- is unchanged — the function returns early for same-status updates).
CREATE TRIGGER "epoch_status_transition_check"
BEFORE UPDATE OF "status" ON "epoch"
FOR EACH ROW
EXECUTE FUNCTION enforce_epoch_status_transition();

CREATE TABLE "input"
(
"epoch_application_id" int4 NOT NULL,
Expand Down Expand Up @@ -292,9 +374,6 @@ CREATE TABLE "tournaments"
CONSTRAINT "tournaments_max_level_gte_level_check" CHECK ("max_level" >= "level")
);

CREATE INDEX "tournaments_epoch_idx"
ON "tournaments"("application_id","epoch_index");

CREATE UNIQUE INDEX "unique_root_per_epoch_idx"
ON "tournaments"("application_id","epoch_index")
WHERE "level" = 0;
Expand Down Expand Up @@ -327,9 +406,6 @@ CREATE TABLE "commitments"
ON DELETE CASCADE
);

CREATE INDEX "commitments_app_epoch_tournament_idx"
ON "commitments"("application_id","epoch_index","tournament_address");

CREATE INDEX "commitments_final_state_idx"
ON "commitments"("final_state_hash");

Expand Down Expand Up @@ -365,17 +441,14 @@ CREATE TABLE "matches"
CONSTRAINT "matches_one_commitment_fkey"
FOREIGN KEY ("application_id","epoch_index","tournament_address","commitment_one")
REFERENCES "commitments"("application_id","epoch_index","tournament_address","commitment")
ON DELETE RESTRICT,
ON DELETE CASCADE,

CONSTRAINT "matches_two_commitment_fkey"
FOREIGN KEY ("application_id","epoch_index","tournament_address","commitment_two")
REFERENCES "commitments"("application_id","epoch_index","tournament_address","commitment")
ON DELETE RESTRICT
ON DELETE CASCADE
);

CREATE INDEX "matches_app_epoch_tournament_idx"
ON "matches"("application_id","epoch_index","tournament_address");

CREATE UNIQUE INDEX "matches_unique_pair_idx"
ON "matches"("application_id","epoch_index","tournament_address","commitment_one","commitment_two");

Expand Down
114 changes: 114 additions & 0 deletions internal/repository/repotest/builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,120 @@ type SeedResult struct {
Input *Input
}

// AdvanceEpochStatus transitions an epoch through the valid state machine to
// the target status, finding the shortest path through the transition graph.
// If the epoch is already at the target status, the call is a no-op.
// The graph mirrors the SQL trigger enforce_epoch_status_transition:
//
//	OPEN → CLOSED → INPUTS_PROCESSED → CLAIM_COMPUTED
//	CLAIM_COMPUTED → CLAIM_SUBMITTED → CLAIM_ACCEPTED
//	CLAIM_COMPUTED → CLAIM_ACCEPTED (PRT, sync catch-up, or reader-only mode)
//	CLAIM_COMPUTED → CLAIM_REJECTED (rejected on-chain before node submits)
//	CLAIM_SUBMITTED → CLAIM_REJECTED
func AdvanceEpochStatus(
	ctx context.Context, t *testing.T,
	repo repository.Repository,
	nameOrAddress string,
	epoch *Epoch,
	target EpochStatus,
) {
	t.Helper()

	// Adjacency list mirrors the SQL trigger's valid transitions.
	next := map[EpochStatus][]EpochStatus{
		EpochStatus_Open:            {EpochStatus_Closed},
		EpochStatus_Closed:          {EpochStatus_InputsProcessed},
		EpochStatus_InputsProcessed: {EpochStatus_ClaimComputed},
		EpochStatus_ClaimComputed:   {EpochStatus_ClaimSubmitted, EpochStatus_ClaimAccepted, EpochStatus_ClaimRejected},
		EpochStatus_ClaimSubmitted:  {EpochStatus_ClaimAccepted, EpochStatus_ClaimRejected},
	}

	// BFS to find shortest valid path.
	type step struct {
		status EpochStatus
		path   []EpochStatus
	}
	queue := []step{{status: epoch.Status, path: nil}}
	visited := map[EpochStatus]bool{epoch.Status: true}

	// found distinguishes "no path exists" from "already at target": the
	// start node carries a nil path, so testing path == nil alone would
	// wrongly fail a call whose epoch.Status already equals target.
	var path []EpochStatus
	found := false
	for len(queue) > 0 {
		cur := queue[0]
		queue = queue[1:]
		if cur.status == target {
			path, found = cur.path, true
			break
		}
		for _, n := range next[cur.status] {
			if !visited[n] {
				visited[n] = true
				p := make([]EpochStatus, len(cur.path)+1)
				copy(p, cur.path)
				p[len(cur.path)] = n
				queue = append(queue, step{n, p})
			}
		}
	}
	if !found {
		t.Fatalf("AdvanceEpochStatus: no valid path from %s to %s",
			epoch.Status, target)
	}

	for _, s := range path {
		// The DB trigger requires proof fields to be non-null when
		// entering CLAIM_COMPUTED. Populate them with dummy values
		// so tests that only care about status transitions don't need
		// to set up proofs manually.
		if s == EpochStatus_ClaimComputed {
			setDummyProofFields(ctx, t, repo, nameOrAddress, epoch)
			// For PRT apps StoreClaimAndProofs already set the status
			// to CLAIM_COMPUTED, so skip the redundant UpdateEpochStatus.
			app, err := repo.GetApplication(ctx, nameOrAddress)
			require.NoError(t, err)
			if app.IsDaveConsensus() {
				epoch.Status = s
				continue
			}
		}
		epoch.Status = s
		err := repo.UpdateEpochStatus(ctx, nameOrAddress, epoch)
		require.NoError(t, err)
	}
}

// setDummyProofFields fills in the proof fields the DB trigger demands for
// the INPUTS_PROCESSED → CLAIM_COMPUTED transition.
// For all apps: machine_hash, outputs_merkle_root, outputs_merkle_proof.
// For PRT apps: additionally commitment and commitment_proof (set via
// StoreClaimAndProofs which also transitions the status atomically).
func setDummyProofFields(
	ctx context.Context, t *testing.T,
	repo repository.Repository,
	nameOrAddress string,
	epoch *Epoch,
) {
	t.Helper()

	// Every application needs the outputs proof triple.
	outputsProof := &OutputsProof{
		OutputsHash:      UniqueHash(),
		OutputsHashProof: [][32]byte{[32]byte(UniqueHash())},
		MachineHash:      UniqueHash(),
	}
	require.NoError(t, repo.UpdateEpochOutputsProof(
		ctx, epoch.ApplicationID, epoch.Index, outputsProof))

	application, err := repo.GetApplication(ctx, nameOrAddress)
	require.NoError(t, err)
	if !application.IsDaveConsensus() {
		return
	}

	// PRT apps also carry commitment data; StoreClaimAndProofs persists it
	// and moves the status to CLAIM_COMPUTED in the same operation.
	commitment := UniqueHash()
	epoch.Commitment = &commitment
	epoch.CommitmentProof = []common.Hash{UniqueHash()}
	epoch.Status = EpochStatus_ClaimComputed
	require.NoError(t, repo.StoreClaimAndProofs(ctx, epoch, nil))
}

// Seed creates and persists a minimal Application with one Epoch and one Input.
func Seed(ctx context.Context, t *testing.T, repo repository.Repository) *SeedResult {
t.Helper()
Expand Down
Loading
Loading