diff --git a/internal/evmreader/input.go b/internal/evmreader/input.go index 574ad2df5..bf3606c21 100644 --- a/internal/evmreader/input.go +++ b/internal/evmreader/input.go @@ -258,7 +258,12 @@ func (r *Service) readAndStoreInputs( epochLength := app.application.EpochLength if epochLength == 0 { - _ = r.setApplicationInoperable(ctx, app.application, "Application has epoch length of zero") + // setApplicationInoperable always returns non-nil (the reason text itself). + // The DB error case is already logged inside setApplicationState. + // On DB success the app is marked inoperable and won't reappear next tick. + // On DB failure the app reappears as Enabled next tick, retrying this path. + _ = r.setApplicationInoperable(ctx, app.application, + "Application has epoch length of zero") continue } diff --git a/internal/evmreader/output.go b/internal/evmreader/output.go index 86cb00269..951f32f3c 100644 --- a/internal/evmreader/output.go +++ b/internal/evmreader/output.go @@ -184,6 +184,10 @@ func (r *Service) readAndUpdateOutputs( } if !bytes.Equal(output.RawData, event.Output) { + // setApplicationInoperable always returns non-nil (the reason text itself). + // The DB error case is already logged inside setApplicationState. + // On DB success the app is marked inoperable and won't reappear next tick. + // On DB failure the app reappears as Enabled next tick, retrying this path. _ = r.setApplicationInoperable(ctx, app.application, "Output mismatch. Application is in an invalid state. Output Index %d, raw data %s != event data %s", output.Index, diff --git a/internal/repository/postgres/bulk.go b/internal/repository/postgres/bulk.go index 6bd47e2f0..cc232cf8b 100644 --- a/internal/repository/postgres/bulk.go +++ b/internal/repository/postgres/bulk.go @@ -416,7 +416,10 @@ func updateEpochClaim( ). WHERE( table.Epoch.ApplicationID.EQ(postgres.Int64(e.ApplicationID)). - AND(table.Epoch.Index.EQ(uint64Expr(e.Index))), + AND(table.Epoch.Index.EQ(uint64Expr(e.Index))). + AND(table.Epoch.Status.EQ( + postgres.NewEnumValue(model.EpochStatus_InputsProcessed.String()), + )), ) sqlStr, args := updStmt.Sql() diff --git a/internal/repository/postgres/epoch.go b/internal/repository/postgres/epoch.go index bd1222bc5..8098c3455 100644 --- a/internal/repository/postgres/epoch.go +++ b/internal/repository/postgres/epoch.go @@ -126,14 +126,38 @@ func (r *PostgresRepository) CreateEpochsAndInputs( whereClause, ) + // Guard: only update epoch fields when the existing row is still OPEN. + // Once an epoch is sealed (CLOSED) or beyond, its status, block range, + // input bounds, and tournament address are finalized and must not be + // overwritten by crash-recovery re-processing. + isOpen := table.Epoch.Status.EQ( + postgres.NewEnumValue(model.EpochStatus_Open.String()), + ) + sqlStr, args := epochInsertStmt.QUERY(epochSelectQuery). ON_CONFLICT(table.Epoch.ApplicationID, table.Epoch.Index). DO_UPDATE(postgres.SET( - table.Epoch.Status.SET(postgres.NewEnumValue(epoch.Status.String())), - table.Epoch.LastBlock.SET(uint64Expr(epoch.LastBlock)), - table.Epoch.InputIndexUpperBound.SET(uint64Expr(epoch.InputIndexUpperBound)), - table.Epoch.TournamentAddress.SET(tournamentAddress), - )).Sql() // FIXME on conflict + table.Epoch.Status.SET(postgres.StringExp( + postgres.CASE(). + WHEN(isOpen).THEN(table.Epoch.EXCLUDED.Status). + ELSE(table.Epoch.Status), + )), + table.Epoch.LastBlock.SET(postgres.FloatExp( + postgres.CASE(). + WHEN(isOpen).THEN(table.Epoch.EXCLUDED.LastBlock). + ELSE(table.Epoch.LastBlock), + )), + table.Epoch.InputIndexUpperBound.SET(postgres.FloatExp( + postgres.CASE(). + WHEN(isOpen).THEN(table.Epoch.EXCLUDED.InputIndexUpperBound). + ELSE(table.Epoch.InputIndexUpperBound), + )), + table.Epoch.TournamentAddress.SET(postgres.ByteaExp( + postgres.CASE(). + WHEN(isOpen).THEN(table.Epoch.EXCLUDED.TournamentAddress). + ELSE(table.Epoch.TournamentAddress), + )), + )).Sql() _, err = tx.Exec(ctx, sqlStr, args...) if err != nil { diff --git a/internal/repository/postgres/schema/migrations/000001_create_initial_schema.down.sql b/internal/repository/postgres/schema/migrations/000001_create_initial_schema.down.sql index c13aaafcd..d3573651b 100644 --- a/internal/repository/postgres/schema/migrations/000001_create_initial_schema.down.sql +++ b/internal/repository/postgres/schema/migrations/000001_create_initial_schema.down.sql @@ -14,18 +14,15 @@ ALTER TABLE "tournaments" DROP CONSTRAINT "tournaments_parent_match_fkey"; DROP TRIGGER IF EXISTS "matches_set_updated_at" ON "matches"; DROP INDEX IF EXISTS "matches_unique_pair_idx"; -DROP INDEX IF EXISTS "matches_app_epoch_tournament_idx"; DROP TABLE IF EXISTS "matches"; DROP TRIGGER IF EXISTS "commitments_set_updated_at" ON "commitments"; DROP INDEX IF EXISTS "commitments_final_state_idx"; -DROP INDEX IF EXISTS "commitments_app_epoch_tournament_idx"; DROP TABLE IF EXISTS "commitments"; DROP TRIGGER IF EXISTS "tournaments_set_updated_at" ON "tournaments"; DROP INDEX IF EXISTS "tournaments_parent_match_nonroot_idx"; DROP INDEX IF EXISTS "unique_root_per_epoch_idx"; -DROP INDEX IF EXISTS "tournaments_epoch_idx"; DROP TABLE IF EXISTS "tournaments"; DROP TRIGGER IF EXISTS "node_config_set_updated_at" ON "node_config"; @@ -45,11 +42,14 @@ DROP INDEX IF EXISTS "input_status_idx"; DROP INDEX IF EXISTS "input_block_number_idx"; DROP TABLE IF EXISTS "input"; +DROP TRIGGER IF EXISTS "epoch_status_transition_check" ON "epoch"; DROP TRIGGER IF EXISTS "epoch_set_updated_at" ON "epoch"; DROP INDEX IF EXISTS "epoch_status_idx"; DROP INDEX IF EXISTS "epoch_last_block_idx"; DROP TABLE IF EXISTS "epoch"; +DROP FUNCTION IF EXISTS "enforce_epoch_status_transition"; + DROP TRIGGER IF EXISTS "execution_parameters_set_updated_at" ON "execution_parameters"; DROP TABLE IF EXISTS "execution_parameters"; diff --git a/internal/repository/postgres/schema/migrations/000001_create_initial_schema.up.sql b/internal/repository/postgres/schema/migrations/000001_create_initial_schema.up.sql index b9c3d5cc7..6956e6354 100644 --- a/internal/repository/postgres/schema/migrations/000001_create_initial_schema.up.sql +++ b/internal/repository/postgres/schema/migrations/000001_create_initial_schema.up.sql @@ -182,6 +182,88 @@ CREATE INDEX "epoch_status_idx" ON "epoch"("application_id", "status"); CREATE TRIGGER "epoch_set_updated_at" BEFORE UPDATE ON "epoch" FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); +-- Enforce valid epoch status transitions. +-- The state machine is: +-- OPEN → CLOSED → INPUTS_PROCESSED → CLAIM_COMPUTED +-- CLAIM_COMPUTED → CLAIM_SUBMITTED → CLAIM_ACCEPTED +-- CLAIM_COMPUTED → CLAIM_ACCEPTED (PRT skips SUBMITTED; also valid when +-- syncing from scratch and the claim was +-- already accepted, or in reader-only mode +-- with tx submission disabled) +-- CLAIM_COMPUTED → CLAIM_REJECTED (claim rejected on-chain before the node +-- submits, e.g. a conflicting claim was +-- already accepted) +-- CLAIM_SUBMITTED → CLAIM_REJECTED +-- Any other transition (including backwards) is rejected. +-- Same-status updates are allowed (idempotent no-ops). +-- +-- When transitioning to CLAIM_COMPUTED, the trigger also verifies that +-- required proof fields are populated: +-- All apps: machine_hash, outputs_merkle_root, outputs_merkle_proof +-- PRT (DaveConsensus): additionally commitment, commitment_proof +CREATE FUNCTION enforce_epoch_status_transition() RETURNS trigger AS $$ +DECLARE + valid_transitions text[][] := ARRAY[ + ARRAY['OPEN', 'CLOSED'], + ARRAY['CLOSED', 'INPUTS_PROCESSED'], + ARRAY['INPUTS_PROCESSED', 'CLAIM_COMPUTED'], + ARRAY['CLAIM_COMPUTED', 'CLAIM_SUBMITTED'], + ARRAY['CLAIM_COMPUTED', 'CLAIM_ACCEPTED'], + ARRAY['CLAIM_COMPUTED', 'CLAIM_REJECTED'], + ARRAY['CLAIM_SUBMITTED', 'CLAIM_ACCEPTED'], + ARRAY['CLAIM_SUBMITTED', 'CLAIM_REJECTED'] + ]; + is_valid boolean := false; + app_consensus text; +BEGIN + IF OLD.status = NEW.status THEN + RETURN NEW; + END IF; + FOR i IN 1..array_length(valid_transitions, 1) LOOP + IF OLD.status::text = valid_transitions[i][1] + AND NEW.status::text = valid_transitions[i][2] THEN + is_valid := true; + EXIT; + END IF; + END LOOP; + IF NOT is_valid THEN + RAISE EXCEPTION 'invalid epoch status transition: % -> %', + OLD.status, NEW.status; + END IF; + + -- Enforce required fields when entering CLAIM_COMPUTED. + IF NEW.status::text = 'CLAIM_COMPUTED' THEN + IF NEW.machine_hash IS NULL + OR NEW.outputs_merkle_root IS NULL + OR NEW.outputs_merkle_proof IS NULL THEN + RAISE EXCEPTION + 'CLAIM_COMPUTED requires machine_hash, outputs_merkle_root, ' + 'and outputs_merkle_proof to be non-null'; + END IF; + + SELECT a.consensus_type::text INTO app_consensus + FROM application a + WHERE a.id = NEW.application_id; + + IF app_consensus = 'PRT' THEN + IF NEW.commitment IS NULL + OR NEW.commitment_proof IS NULL THEN + RAISE EXCEPTION + 'CLAIM_COMPUTED for PRT apps requires commitment ' + 'and commitment_proof to be non-null'; + END IF; + END IF; + END IF; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER "epoch_status_transition_check" + BEFORE UPDATE OF "status" ON "epoch" + FOR EACH ROW + EXECUTE FUNCTION enforce_epoch_status_transition(); + CREATE TABLE "input" ( "epoch_application_id" int4 NOT NULL, @@ -292,9 +374,6 @@ CREATE TABLE "tournaments" CONSTRAINT "tournaments_max_level_gte_level_check" CHECK ("max_level" >= "level") ); -CREATE INDEX "tournaments_epoch_idx" - ON "tournaments"("application_id","epoch_index"); - CREATE UNIQUE INDEX "unique_root_per_epoch_idx" ON "tournaments"("application_id","epoch_index") WHERE "level" = 0; @@ -327,9 +406,6 @@ CREATE TABLE "commitments" ON DELETE CASCADE ); -CREATE INDEX "commitments_app_epoch_tournament_idx" - ON "commitments"("application_id","epoch_index","tournament_address"); - CREATE INDEX "commitments_final_state_idx" ON "commitments"("final_state_hash"); @@ -365,17 +441,14 @@ CREATE TABLE "matches" CONSTRAINT "matches_one_commitment_fkey" FOREIGN KEY ("application_id","epoch_index","tournament_address","commitment_one") REFERENCES "commitments"("application_id","epoch_index","tournament_address","commitment") - ON DELETE RESTRICT, + ON DELETE CASCADE, CONSTRAINT "matches_two_commitment_fkey" FOREIGN KEY ("application_id","epoch_index","tournament_address","commitment_two") REFERENCES "commitments"("application_id","epoch_index","tournament_address","commitment") - ON DELETE RESTRICT + ON DELETE CASCADE ); -CREATE INDEX "matches_app_epoch_tournament_idx" - ON "matches"("application_id","epoch_index","tournament_address"); - CREATE UNIQUE INDEX "matches_unique_pair_idx" ON "matches"("application_id","epoch_index","tournament_address","commitment_one","commitment_two"); diff --git a/internal/repository/repotest/builders.go b/internal/repository/repotest/builders.go index e4fac2969..6779b10eb 100644 --- a/internal/repository/repotest/builders.go +++ b/internal/repository/repotest/builders.go @@ -461,6 +461,120 @@ type SeedResult struct { Input *Input } +// AdvanceEpochStatus transitions an epoch through the valid state machine to +// the target status, finding the shortest path through the transition graph. +// The graph mirrors the SQL trigger enforce_epoch_status_transition: +// +// OPEN → CLOSED → INPUTS_PROCESSED → CLAIM_COMPUTED +// CLAIM_COMPUTED → CLAIM_SUBMITTED → CLAIM_ACCEPTED +// CLAIM_COMPUTED → CLAIM_ACCEPTED (PRT, sync catch-up, or reader-only mode) +// CLAIM_COMPUTED → CLAIM_REJECTED (rejected on-chain before node submits) +// CLAIM_SUBMITTED → CLAIM_REJECTED +func AdvanceEpochStatus( + ctx context.Context, t *testing.T, + repo repository.Repository, + nameOrAddress string, + epoch *Epoch, + target EpochStatus, +) { + t.Helper() + + // Adjacency list mirrors the SQL trigger's valid transitions. + next := map[EpochStatus][]EpochStatus{ + EpochStatus_Open: {EpochStatus_Closed}, + EpochStatus_Closed: {EpochStatus_InputsProcessed}, + EpochStatus_InputsProcessed: {EpochStatus_ClaimComputed}, + EpochStatus_ClaimComputed: {EpochStatus_ClaimSubmitted, EpochStatus_ClaimAccepted, EpochStatus_ClaimRejected}, + EpochStatus_ClaimSubmitted: {EpochStatus_ClaimAccepted, EpochStatus_ClaimRejected}, + } + + // BFS to find shortest valid path. + type step struct { + status EpochStatus + path []EpochStatus + } + queue := []step{{status: epoch.Status, path: nil}} + visited := map[EpochStatus]bool{epoch.Status: true} + + var path []EpochStatus + for len(queue) > 0 { + cur := queue[0] + queue = queue[1:] + if cur.status == target { + path = cur.path + break + } + for _, n := range next[cur.status] { + if !visited[n] { + visited[n] = true + p := make([]EpochStatus, len(cur.path)+1) + copy(p, cur.path) + p[len(cur.path)] = n + queue = append(queue, step{n, p}) + } + } + } + if path == nil { + t.Fatalf("AdvanceEpochStatus: no valid path from %s to %s", + epoch.Status, target) + } + + for _, s := range path { + // The DB trigger requires proof fields to be non-null when + // entering CLAIM_COMPUTED. Populate them with dummy values + // so tests that only care about status transitions don't need + // to set up proofs manually. + if s == EpochStatus_ClaimComputed { + setDummyProofFields(ctx, t, repo, nameOrAddress, epoch) + // For PRT apps StoreClaimAndProofs already set the status + // to CLAIM_COMPUTED, so skip the redundant UpdateEpochStatus. + app, err := repo.GetApplication(ctx, nameOrAddress) + require.NoError(t, err) + if app.IsDaveConsensus() { + epoch.Status = s + continue + } + } + epoch.Status = s + err := repo.UpdateEpochStatus(ctx, nameOrAddress, epoch) + require.NoError(t, err) + } +} + +// setDummyProofFields populates the proof fields required by the DB trigger +// for the INPUTS_PROCESSED → CLAIM_COMPUTED transition. +// For all apps: machine_hash, outputs_merkle_root, outputs_merkle_proof. +// For PRT apps: additionally commitment and commitment_proof (set via +// StoreClaimAndProofs which also transitions the status atomically). +func setDummyProofFields( + ctx context.Context, t *testing.T, + repo repository.Repository, + nameOrAddress string, + epoch *Epoch, +) { + t.Helper() + + proof := &OutputsProof{ + OutputsHash: UniqueHash(), + OutputsHashProof: [][32]byte{[32]byte(UniqueHash())}, + MachineHash: UniqueHash(), + } + err := repo.UpdateEpochOutputsProof( + ctx, epoch.ApplicationID, epoch.Index, proof) + require.NoError(t, err) + + app, err := repo.GetApplication(ctx, nameOrAddress) + require.NoError(t, err) + if app.IsDaveConsensus() { + commitHash := UniqueHash() + epoch.Commitment = &commitHash + epoch.CommitmentProof = []common.Hash{UniqueHash()} + epoch.Status = EpochStatus_ClaimComputed + err = repo.StoreClaimAndProofs(ctx, epoch, nil) + require.NoError(t, err) + } +} + // Seed creates and persists a minimal Application with one Epoch and one Input. func Seed(ctx context.Context, t *testing.T, repo repository.Repository) *SeedResult { t.Helper() diff --git a/internal/repository/repotest/bulk_test_cases.go b/internal/repository/repotest/bulk_test_cases.go index 779fb28a6..05e57afdf 100644 --- a/internal/repository/repotest/bulk_test_cases.go +++ b/internal/repository/repotest/bulk_test_cases.go @@ -354,6 +354,10 @@ func (s *BulkOperationsSuite) TestStoreClaimAndProofs() { s.Run("StoresClaimAndOutputProofs", func() { seed := Seed(s.Ctx, s.T(), s.Repo) + // Advance epoch to INPUTS_PROCESSED so StoreClaimAndProofs can set CLAIM_COMPUTED + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + seed.App.IApplicationAddress.String(), seed.Epoch, EpochStatus_InputsProcessed) + // First store an advance result to create outputs machineHash := crypto.Keccak256Hash([]byte("machine")) outputData := []byte("output-for-claim") @@ -700,6 +704,12 @@ func (s *BulkOperationsSuite) TestStoreClaimAndProofsRollback() { s.Run("RollbackOnOutputProofUpdateFailure", func() { seed := Seed(s.Ctx, s.T(), s.Repo) + // Advance epoch to INPUTS_PROCESSED so updateEpochClaim can + // set CLAIM_COMPUTED (the trigger rejects other transitions). + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + seed.App.IApplicationAddress.String(), seed.Epoch, + EpochStatus_InputsProcessed) + // Store advance result to create one output (index 0) result := &AdvanceResult{ EpochIndex: 0, @@ -733,12 +743,12 @@ func (s *BulkOperationsSuite) TestStoreClaimAndProofsRollback() { s.Ctx, seed.Epoch, []*Output{nonExistentOutput}) s.Require().Error(err) - // Verify the epoch status was rolled back — still Closed + // Verify the epoch status was rolled back — still InputsProcessed gotEpoch, err := s.Repo.GetEpoch( s.Ctx, seed.App.IApplicationAddress.String(), 0) s.Require().NoError(err) - s.Equal(EpochStatus_Closed, gotEpoch.Status, - "epoch status should be rolled back to Closed") + s.Equal(EpochStatus_InputsProcessed, gotEpoch.Status, + "epoch status should be rolled back to InputsProcessed") s.Nil(gotEpoch.Commitment, "commitment should not have been persisted") diff --git a/internal/repository/repotest/claimer_test_cases.go b/internal/repository/repotest/claimer_test_cases.go index 6f616425e..cb108a639 100644 --- a/internal/repository/repotest/claimer_test_cases.go +++ b/internal/repository/repotest/claimer_test_cases.go @@ -33,9 +33,8 @@ func (s *ClaimerSuite) createAppWithClaimComputedEpoch() *Application { map[*Epoch][]*Input{epoch: {input}}, 10) s.Require().NoError(err) - epoch.Status = EpochStatus_ClaimComputed - err = s.Repo.UpdateEpochStatus(s.Ctx, app.IApplicationAddress.String(), epoch) - s.Require().NoError(err) + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + app.IApplicationAddress.String(), epoch, EpochStatus_ClaimComputed) return app } @@ -140,9 +139,8 @@ func (s *ClaimerSuite) TestSelectSubmittedClaimPairsPerApp() { map[*Epoch][]*Input{epoch: {input}}, 10) s.Require().NoError(err) - epoch.Status = EpochStatus_ClaimComputed - err = s.Repo.UpdateEpochStatus(s.Ctx, app.IApplicationAddress.String(), epoch) - s.Require().NoError(err) + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + app.IApplicationAddress.String(), epoch, EpochStatus_ClaimComputed) _, computed, apps, err := s.Repo.SelectSubmittedClaimPairsPerApp(s.Ctx) s.Require().NoError(err) @@ -163,9 +161,8 @@ func (s *ClaimerSuite) TestSelectSubmittedClaimPairsPerApp() { map[*Epoch][]*Input{epoch: {input}}, 10) s.Require().NoError(err) - epoch.Status = EpochStatus_ClaimComputed - err = s.Repo.UpdateEpochStatus(s.Ctx, app.IApplicationAddress.String(), epoch) - s.Require().NoError(err) + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + app.IApplicationAddress.String(), epoch, EpochStatus_ClaimComputed) reason := "test disabled" err = s.Repo.UpdateApplicationState( @@ -288,9 +285,8 @@ func (s *ClaimerSuite) TestSelectAcceptedClaimPairsPerApp() { map[*Epoch][]*Input{epoch: {input}}, 10) s.Require().NoError(err) - epoch.Status = EpochStatus_ClaimComputed - err = s.Repo.UpdateEpochStatus(s.Ctx, app.IApplicationAddress.String(), epoch) - s.Require().NoError(err) + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + app.IApplicationAddress.String(), epoch, EpochStatus_ClaimComputed) txHash := UniqueHash() err = s.Repo.UpdateEpochWithSubmittedClaim(s.Ctx, app.ID, 0, txHash) @@ -319,9 +315,8 @@ func (s *ClaimerSuite) TestSelectAcceptedClaimPairsPerApp() { map[*Epoch][]*Input{epoch: {input}}, 10) s.Require().NoError(err) - epoch.Status = EpochStatus_ClaimComputed - err = s.Repo.UpdateEpochStatus(s.Ctx, app.IApplicationAddress.String(), epoch) - s.Require().NoError(err) + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + app.IApplicationAddress.String(), epoch, EpochStatus_ClaimComputed) txHash := UniqueHash() err = s.Repo.UpdateEpochWithSubmittedClaim(s.Ctx, app.ID, 0, txHash) @@ -362,10 +357,8 @@ func (s *ClaimerSuite) TestSelectAcceptedClaimPairsPerApp() { s.Require().NoError(err) // Move epoch 0 to ClaimAccepted - epoch0.Status = EpochStatus_ClaimComputed - err = s.Repo.UpdateEpochStatus( - s.Ctx, app.IApplicationAddress.String(), epoch0) - s.Require().NoError(err) + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + app.IApplicationAddress.String(), epoch0, EpochStatus_ClaimComputed) txHash0 := UniqueHash() err = s.Repo.UpdateEpochWithSubmittedClaim(s.Ctx, app.ID, 0, txHash0) @@ -375,10 +368,8 @@ func (s *ClaimerSuite) TestSelectAcceptedClaimPairsPerApp() { s.Require().NoError(err) // Move epoch 1 to ClaimSubmitted - epoch1.Status = EpochStatus_ClaimComputed - err = s.Repo.UpdateEpochStatus( - s.Ctx, app.IApplicationAddress.String(), epoch1) - s.Require().NoError(err) + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + app.IApplicationAddress.String(), epoch1, EpochStatus_ClaimComputed) txHash1 := UniqueHash() err = s.Repo.UpdateEpochWithSubmittedClaim(s.Ctx, app.ID, 1, txHash1) @@ -463,9 +454,8 @@ func (s *ClaimerSuite) TestUpdateEpochWithAcceptedClaim() { map[*Epoch][]*Input{epoch: {input}}, 10) s.Require().NoError(err) - epoch.Status = EpochStatus_ClaimSubmitted - err = s.Repo.UpdateEpochStatus(s.Ctx, app.IApplicationAddress.String(), epoch) - s.Require().NoError(err) + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + app.IApplicationAddress.String(), epoch, EpochStatus_ClaimSubmitted) err = s.Repo.UpdateEpochWithAcceptedClaim(s.Ctx, app.ID, 0) s.Require().NoError(err) diff --git a/internal/repository/repotest/epoch_test_cases.go b/internal/repository/repotest/epoch_test_cases.go index e3068512e..a54fabec8 100644 --- a/internal/repository/repotest/epoch_test_cases.go +++ b/internal/repository/repotest/epoch_test_cases.go @@ -723,3 +723,325 @@ func (s *EpochSuite) TestRepeatPreviousEpochOutputsProof() { s.Require().Error(err) }) } + +// TestUpsertPreservesNonOpenEpoch verifies the CASE/WHEN crash-recovery guard +// in CreateEpochsAndInputs: re-upserting an epoch that has advanced past OPEN +// must preserve the existing row's fields (status, block range, input bounds). +func (s *EpochSuite) TestUpsertPreservesNonOpenEpoch() { + s.Run("PreservesClosedEpochFields", func() { + app := NewApplicationBuilder().Create(s.Ctx, s.T(), s.Repo) + + // Create and persist an epoch as CLOSED with known field values. + epoch := NewEpochBuilder(app.ID). + WithIndex(0).WithStatus(EpochStatus_Closed). + WithBlocks(0, 99).WithInputBounds(0, 5).Build() + input := NewInputBuilder().WithIndex(0).WithBlockNumber(5).Build() + + err := s.Repo.CreateEpochsAndInputs( + s.Ctx, app.IApplicationAddress.String(), + map[*Epoch][]*Input{epoch: {input}}, 100) + s.Require().NoError(err) + + // Re-upsert the same epoch index with DIFFERENT field values. + // The guard should silently preserve the existing CLOSED row. + conflicting := NewEpochBuilder(app.ID). + WithIndex(0).WithStatus(EpochStatus_Closed). + WithBlocks(0, 200).WithInputBounds(0, 10).Build() + + err = s.Repo.CreateEpochsAndInputs( + s.Ctx, app.IApplicationAddress.String(), + map[*Epoch][]*Input{conflicting: {}}, 200) + s.Require().NoError(err) + + got, err := s.Repo.GetEpoch( + s.Ctx, app.IApplicationAddress.String(), 0) + s.Require().NoError(err) + s.Equal(EpochStatus_Closed, got.Status) + s.Equal(uint64(99), got.LastBlock, + "LastBlock should be preserved from original epoch") + s.Equal(uint64(5), got.InputIndexUpperBound, + "InputIndexUpperBound should be preserved from original epoch") + }) + + s.Run("PreservesInputsProcessedEpochFields", func() { + app := NewApplicationBuilder().Create(s.Ctx, s.T(), s.Repo) + + epoch := NewEpochBuilder(app.ID). + WithIndex(0).WithStatus(EpochStatus_Closed). + WithBlocks(0, 50).WithInputBounds(0, 3).Build() + input := NewInputBuilder().WithIndex(0).WithBlockNumber(5).Build() + + err := s.Repo.CreateEpochsAndInputs( + s.Ctx, app.IApplicationAddress.String(), + map[*Epoch][]*Input{epoch: {input}}, 51) + s.Require().NoError(err) + + // Advance past CLOSED so it is no longer OPEN. + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + app.IApplicationAddress.String(), epoch, + EpochStatus_InputsProcessed) + + // Re-upsert with different values. + conflicting := NewEpochBuilder(app.ID). + WithIndex(0).WithStatus(EpochStatus_Closed). + WithBlocks(0, 999).WithInputBounds(0, 99).Build() + + err = s.Repo.CreateEpochsAndInputs( + s.Ctx, app.IApplicationAddress.String(), + map[*Epoch][]*Input{conflicting: {}}, 999) + s.Require().NoError(err) + + got, err := s.Repo.GetEpoch( + s.Ctx, app.IApplicationAddress.String(), 0) + s.Require().NoError(err) + s.Equal(EpochStatus_InputsProcessed, got.Status, + "status should be preserved, not overwritten") + s.Equal(uint64(50), got.LastBlock, + "LastBlock should be preserved from original epoch") + s.Equal(uint64(3), got.InputIndexUpperBound, + "InputIndexUpperBound should be preserved from original epoch") + }) + + s.Run("AllowsUpdateOfOpenEpoch", func() { + app := NewApplicationBuilder().Create(s.Ctx, s.T(), s.Repo) + + // Create an OPEN epoch. + epoch := NewEpochBuilder(app.ID). + WithIndex(0).WithStatus(EpochStatus_Open). + WithBlocks(0, 9).WithInputBounds(0, 0).Build() + input := NewInputBuilder().WithIndex(0).WithBlockNumber(5).Build() + + err := s.Repo.CreateEpochsAndInputs( + s.Ctx, app.IApplicationAddress.String(), + map[*Epoch][]*Input{epoch: {input}}, 10) + s.Require().NoError(err) + + // Upsert with CLOSED status and new LastBlock. + updated := NewEpochBuilder(app.ID). + WithIndex(0).WithStatus(EpochStatus_Closed). + WithBlocks(0, 50).WithInputBounds(0, 3).Build() + + err = s.Repo.CreateEpochsAndInputs( + s.Ctx, app.IApplicationAddress.String(), + map[*Epoch][]*Input{updated: {}}, 50) + s.Require().NoError(err) + + got, err := s.Repo.GetEpoch( + s.Ctx, app.IApplicationAddress.String(), 0) + s.Require().NoError(err) + s.Equal(EpochStatus_Closed, got.Status, + "OPEN epoch should be updated to CLOSED") + s.Equal(uint64(50), got.LastBlock, + "LastBlock should be updated for OPEN epoch") + }) +} + +// TestEpochStatusTransitionTrigger verifies the database trigger that enforces +// valid epoch status transitions. +func (s *EpochSuite) TestEpochStatusTransitionTrigger() { + s.Run("RejectsSkippedTransition", func() { + seed := Seed(s.Ctx, s.T(), s.Repo) + + // CLOSED -> CLAIM_COMPUTED skips INPUTS_PROCESSED — must fail. + seed.Epoch.Status = EpochStatus_ClaimComputed + err := s.Repo.UpdateEpochStatus( + s.Ctx, seed.App.IApplicationAddress.String(), seed.Epoch) + s.Require().Error(err) + s.Contains(err.Error(), "invalid epoch status transition") + }) + + s.Run("RejectsBackwardsTransition", func() { + seed := Seed(s.Ctx, s.T(), s.Repo) + + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + seed.App.IApplicationAddress.String(), seed.Epoch, + EpochStatus_InputsProcessed) + + // INPUTS_PROCESSED -> CLOSED is backwards — must fail. + seed.Epoch.Status = EpochStatus_Closed + err := s.Repo.UpdateEpochStatus( + s.Ctx, seed.App.IApplicationAddress.String(), seed.Epoch) + s.Require().Error(err) + s.Contains(err.Error(), "invalid epoch status transition") + }) + + s.Run("RejectsOpenToInputsProcessed", func() { + app := NewApplicationBuilder().Create(s.Ctx, s.T(), s.Repo) + + epoch := NewEpochBuilder(app.ID). + WithIndex(0).WithStatus(EpochStatus_Open). + WithBlocks(0, 9).WithInputBounds(0, 0).Build() + input := NewInputBuilder().WithIndex(0).WithBlockNumber(5).Build() + + err := s.Repo.CreateEpochsAndInputs( + s.Ctx, app.IApplicationAddress.String(), + map[*Epoch][]*Input{epoch: {input}}, 10) + s.Require().NoError(err) + + // OPEN -> INPUTS_PROCESSED skips CLOSED — must fail. + epoch.Status = EpochStatus_InputsProcessed + err = s.Repo.UpdateEpochStatus( + s.Ctx, app.IApplicationAddress.String(), epoch) + s.Require().Error(err) + s.Contains(err.Error(), "invalid epoch status transition") + }) + + // CLAIM_COMPUTED -> CLAIM_ACCEPTED (skipping SUBMITTED) is valid for: + // PRT apps (no claim submission step), node syncing from scratch when + // the claim was already accepted, or reader-only mode with tx submission + // disabled. + s.Run("AllowsDirectAcceptance", func() { + app := NewApplicationBuilder(). + WithConsensus(Consensus_PRT). + Create(s.Ctx, s.T(), s.Repo) + + epoch := NewEpochBuilder(app.ID). + WithIndex(0). + WithStatus(EpochStatus_Closed). + WithBlocks(0, 9). + WithInputBounds(0, 0). + Build() + + input := NewInputBuilder(). + WithIndex(0). + WithBlockNumber(5). + Build() + + err := s.Repo.CreateEpochsAndInputs( + s.Ctx, app.IApplicationAddress.String(), + map[*Epoch][]*Input{epoch: {input}}, 10) + s.Require().NoError(err) + + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + app.IApplicationAddress.String(), epoch, + EpochStatus_ClaimComputed) + + // CLAIM_COMPUTED -> CLAIM_ACCEPTED skips SUBMITTED. + epoch.Status = EpochStatus_ClaimAccepted + err = s.Repo.UpdateEpochStatus( + s.Ctx, app.IApplicationAddress.String(), epoch) + s.Require().NoError(err) + + got, err := s.Repo.GetEpoch( + s.Ctx, app.IApplicationAddress.String(), 0) + s.Require().NoError(err) + s.Equal(EpochStatus_ClaimAccepted, got.Status) + }) + + // CLAIM_COMPUTED -> CLAIM_REJECTED is valid when a claim is rejected + // on-chain before the node submits it (e.g. a conflicting claim was + // already accepted). + s.Run("AllowsDirectRejection", func() { + seed := Seed(s.Ctx, s.T(), s.Repo) + + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + seed.App.IApplicationAddress.String(), seed.Epoch, + EpochStatus_ClaimComputed) + + // CLAIM_COMPUTED -> CLAIM_REJECTED skips SUBMITTED. + seed.Epoch.Status = EpochStatus_ClaimRejected + err := s.Repo.UpdateEpochStatus( + s.Ctx, seed.App.IApplicationAddress.String(), seed.Epoch) + s.Require().NoError(err) + + got, err := s.Repo.GetEpoch( + s.Ctx, seed.App.IApplicationAddress.String(), 0) + s.Require().NoError(err) + s.Equal(EpochStatus_ClaimRejected, got.Status) + }) + + s.Run("AllowsSameStatusUpdate", func() { + seed := Seed(s.Ctx, s.T(), s.Repo) + + // CLOSED -> CLOSED should be allowed (idempotent). + err := s.Repo.UpdateEpochStatus( + s.Ctx, seed.App.IApplicationAddress.String(), seed.Epoch) + s.Require().NoError(err) + }) + + // Verify the trigger rejects CLAIM_COMPUTED when proof fields are missing. + s.Run("RejectsClaimComputedWithoutProofFields", func() { + seed := Seed(s.Ctx, s.T(), s.Repo) + + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + seed.App.IApplicationAddress.String(), seed.Epoch, + EpochStatus_InputsProcessed) + + // Try INPUTS_PROCESSED -> CLAIM_COMPUTED without setting + // machine_hash, outputs_merkle_root, outputs_merkle_proof. + seed.Epoch.Status = EpochStatus_ClaimComputed + err := s.Repo.UpdateEpochStatus( + s.Ctx, seed.App.IApplicationAddress.String(), seed.Epoch) + s.Require().Error(err) + s.Contains(err.Error(), "CLAIM_COMPUTED requires") + }) + + // Verify CLAIM_COMPUTED succeeds when all required fields are present. + s.Run("AllowsClaimComputedWithProofFields", func() { + seed := Seed(s.Ctx, s.T(), s.Repo) + + AdvanceEpochStatus(s.Ctx, s.T(), s.Repo, + seed.App.IApplicationAddress.String(), seed.Epoch, + EpochStatus_InputsProcessed) + + // Set the required proof fields. + proof := &OutputsProof{ + OutputsHash: UniqueHash(), + OutputsHashProof: [][32]byte{[32]byte(UniqueHash())}, + MachineHash: UniqueHash(), + } + err := s.Repo.UpdateEpochOutputsProof( + s.Ctx, seed.App.ID, seed.Epoch.Index, proof) + s.Require().NoError(err) + + seed.Epoch.Status = EpochStatus_ClaimComputed + err = s.Repo.UpdateEpochStatus( + s.Ctx, seed.App.IApplicationAddress.String(), seed.Epoch) + s.Require().NoError(err) + + got, err := s.Repo.GetEpoch( + s.Ctx, seed.App.IApplicationAddress.String(), 0) + s.Require().NoError(err) + s.Equal(EpochStatus_ClaimComputed, got.Status) + }) + + // Verify PRT apps require commitment fields for CLAIM_COMPUTED. + s.Run("RejectsPRTClaimComputedWithoutCommitment", func() { + app := NewApplicationBuilder(). + WithConsensus(Consensus_PRT). + Create(s.Ctx, s.T(), s.Repo) + + epoch := NewEpochBuilder(app.ID). + WithIndex(0).WithStatus(EpochStatus_Closed). + WithBlocks(0, 9).WithInputBounds(0, 0).Build() + input := NewInputBuilder().WithIndex(0).WithBlockNumber(5).Build() + + err := s.Repo.CreateEpochsAndInputs( + s.Ctx, app.IApplicationAddress.String(), + map[*Epoch][]*Input{epoch: {input}}, 10) + s.Require().NoError(err) + + // Advance to INPUTS_PROCESSED. + epoch.Status = EpochStatus_InputsProcessed + err = s.Repo.UpdateEpochStatus( + s.Ctx, app.IApplicationAddress.String(), epoch) + s.Require().NoError(err) + + // Set base proof fields but NOT commitment. + proof := &OutputsProof{ + OutputsHash: UniqueHash(), + OutputsHashProof: [][32]byte{[32]byte(UniqueHash())}, + MachineHash: UniqueHash(), + } + err = s.Repo.UpdateEpochOutputsProof( + s.Ctx, app.ID, epoch.Index, proof) + s.Require().NoError(err) + + // INPUTS_PROCESSED -> CLAIM_COMPUTED without commitment — must fail. + epoch.Status = EpochStatus_ClaimComputed + err = s.Repo.UpdateEpochStatus( + s.Ctx, app.IApplicationAddress.String(), epoch) + s.Require().Error(err) + s.Contains(err.Error(), "PRT") + }) +} diff --git a/test/validator/validator_test.go b/test/validator/validator_test.go index 052c1a5ed..c20951174 100644 --- a/test/validator/validator_test.go +++ b/test/validator/validator_test.go @@ -165,16 +165,16 @@ func (s *ValidatorRepositoryIntegrationSuite) TestItReturnsPreviousClaim() { _, err := s.repository.CreateApplication(s.ctx, app, false) s.Require().Nil(err) - // insert the first epoch with a claim + // insert the first epoch — will be advanced to CLAIM_COMPUTED + // via StoreClaimAndProofs after storing the advance result firstEpochClaim := pristineRootHash firstEpoch := model.Epoch{ - ApplicationID: 1, - Index: 0, - VirtualIndex: 0, - Status: model.EpochStatus_ClaimComputed, - OutputsMerkleRoot: &firstEpochClaim, - FirstBlock: 0, - LastBlock: 9, + ApplicationID: 1, + Index: 0, + VirtualIndex: 0, + Status: model.EpochStatus_Closed, + FirstBlock: 0, + LastBlock: 9, } // we add an input to the epoch because they must have at least one and @@ -213,6 +213,12 @@ func (s *ValidatorRepositoryIntegrationSuite) TestItReturnsPreviousClaim() { err = s.repository.CreateEpochsAndInputs(s.ctx, app.IApplicationAddress.String(), epochInputMap, 20) s.Require().Nil(err) + // Advance first epoch to INPUTS_PROCESSED so StoreClaimAndProofs can + // transition it to CLAIM_COMPUTED. + firstEpoch.Status = model.EpochStatus_InputsProcessed + err = s.repository.UpdateEpochStatus(s.ctx, app.IApplicationAddress.String(), &firstEpoch) + s.Require().Nil(err) + // Store the input advance result machinehash1 := crypto.Keccak256Hash([]byte("machine-hash1")) advanceResult := model.AdvanceResult{ @@ -227,6 +233,7 @@ func (s *ValidatorRepositoryIntegrationSuite) TestItReturnsPreviousClaim() { err = s.repository.StoreAdvanceResult(s.ctx, 1, &advanceResult) s.Require().Nil(err) + firstEpoch.OutputsMerkleRoot = &firstEpochClaim err = s.repository.StoreClaimAndProofs(s.ctx, &firstEpoch, []*model.Output{}) s.Require().Nil(err) @@ -371,7 +378,7 @@ func (s *ValidatorRepositoryIntegrationSuite) TestItReturnsANewClaimAndProofs() ApplicationID: 1, Index: 0, VirtualIndex: 0, - Status: model.EpochStatus_ClaimComputed, + Status: model.EpochStatus_Closed, FirstBlock: 0, LastBlock: 9, } @@ -389,6 +396,12 @@ func (s *ValidatorRepositoryIntegrationSuite) TestItReturnsANewClaimAndProofs() err = s.repository.CreateEpochsAndInputs(s.ctx, app.IApplicationAddress.String(), epochInputMap, 10) s.Require().Nil(err) + // Advance first epoch to INPUTS_PROCESSED so StoreClaimAndProofs can + // transition it to CLAIM_COMPUTED. + firstEpoch.Status = model.EpochStatus_InputsProcessed + err = s.repository.UpdateEpochStatus(s.ctx, app.IApplicationAddress.String(), &firstEpoch) + s.Require().Nil(err) + firstOutputData := []byte("output1") firstOutputHash := crypto.Keccak256Hash(firstOutputData) firstOutput := model.Output{