Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ test-cascade:
# SQLite history/dedup state is not shared with Cascade fixtures or other nodes.
test-lep6: setup-lep6-supernodes
@echo "Running LEP-6 e2e tests..."
@cd tests/system && ${GO} mod tidy && ${GO} test -tags=system_test -timeout=900s -v -run '^TestLEP6' .
@cd tests/system && ${GO} mod tidy && ${GO} test -tags=system_test -timeout=1500s -v -run '^TestLEP6' .

# Validate LEP-6 local config/default/fixture coverage without starting a network.
lep6-validate-config:
Expand Down
14 changes: 14 additions & 0 deletions pkg/storage/rqstore/rq_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions pkg/storage/rqstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const createRQSymbolsDir string = `
type Store interface {
DeleteSymbolsByTxID(txid string) error
StoreSymbolDirectory(txid, dir string) error
UpsertSymbolDirectory(txid, dir string) error
GetDirectoryByTxID(txid string) (string, error)
GetToDoStoreSymbolDirs() ([]SymbolDir, error)
SetIsCompleted(txid string) error
Expand Down Expand Up @@ -121,6 +122,32 @@ func (s *SQLiteRQStore) StoreSymbolDirectory(txid, dir string) error {
return nil
}

// UpsertSymbolDirectory associates a txid with a directory path idempotently.
//
// LEP-6 heal finalization retries publish after transient P2P failures. A retry
// for the same action must not get stuck behind the row inserted by the failed
// attempt, so this path explicitly upserts instead of changing the legacy insert
// semantics of StoreSymbolDirectory.
func (s *SQLiteRQStore) UpsertSymbolDirectory(txid, dir string) error {
stmt, err := s.db.Prepare(`
INSERT INTO rq_symbols_dir (txid, dir, is_completed)
VALUES (?, ?, FALSE)
ON CONFLICT(txid) DO UPDATE SET
dir = excluded.dir,
is_completed = FALSE
`)
if err != nil {
return fmt.Errorf("failed to prepare upsert statement for directory: %w", err)
}
defer stmt.Close()

if _, err := stmt.Exec(txid, dir); err != nil {
return fmt.Errorf("failed to execute upsert statement for directory: %w", err)
}

return nil
}

// GetDirectoryByTxID retrieves the directory path associated with a given txid.
func (s *SQLiteRQStore) GetDirectoryByTxID(txid string) (string, error) {
var dir string
Expand Down
22 changes: 22 additions & 0 deletions pkg/storage/rqstore/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestGetToDoStoreSymbolDirs(t *testing.T) {
Expand Down Expand Up @@ -159,3 +160,24 @@ func TestStoreSymbolDirectory(t *testing.T) {
})
}
}

func TestUpsertSymbolDirectory(t *testing.T) {
store := SetupTestDB(t)
defer store.Close()

require.NoError(t, store.StoreSymbolDirectory("tx123", "dir123"))
require.NoError(t, store.UpdateIsFirstBatchStored("tx123"))
require.NoError(t, store.SetIsCompleted("tx123"))

require.NoError(t, store.UpsertSymbolDirectory("tx123", "dir456"))

var row struct {
Dir string `db:"dir"`
IsFirstBatchStored bool `db:"is_first_batch_stored"`
IsCompleted bool `db:"is_completed"`
}
require.NoError(t, store.db.Get(&row, "SELECT dir, is_first_batch_stored, is_completed FROM rq_symbols_dir WHERE txid = ?", "tx123"))
assert.Equal(t, "dir456", row.Dir)
assert.True(t, row.IsFirstBatchStored)
assert.False(t, row.IsCompleted)
}
28 changes: 20 additions & 8 deletions supernode/adaptors/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ func NewP2PService(client p2p.Client, store rqstore.Store) P2PService {
}

type StoreArtefactsRequest struct {
TaskID string
ActionID string
IDFiles [][]byte
SymbolsDir string
Layout codec.Layout
TaskID string
ActionID string
IDFiles [][]byte
SymbolsDir string
Layout codec.Layout
IdempotentDirectoryRecord bool
}

func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest, f logtrace.Fields) error {
Expand All @@ -67,7 +68,7 @@ func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest,
"symbols_dir": req.SymbolsDir,
})
start := time.Now()
firstPassSymbols, totalSymbols, err := p.storeCascadeSymbolsAndData(ctx, req.TaskID, req.ActionID, req.SymbolsDir, req.IDFiles, req.Layout)
firstPassSymbols, totalSymbols, err := p.storeCascadeSymbolsAndData(ctx, req.TaskID, req.ActionID, req.SymbolsDir, req.IDFiles, req.Layout, req.IdempotentDirectoryRecord)
if err != nil {
return fmt.Errorf("error storing artefacts: %w", err)
}
Expand All @@ -89,8 +90,14 @@ func (p *p2pImpl) StoreArtefacts(ctx context.Context, req StoreArtefactsRequest,
return nil
}

func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, actionID string, symbolsDir string, metadataFiles [][]byte, layout codec.Layout) (int, int, error) {
if err := p.rqStore.StoreSymbolDirectory(taskID, symbolsDir); err != nil {
func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, actionID string, symbolsDir string, metadataFiles [][]byte, layout codec.Layout, idempotentDirectoryRecord bool) (int, int, error) {
var err error
if idempotentDirectoryRecord {
err = p.rqStore.UpsertSymbolDirectory(taskID, symbolsDir)
} else {
err = p.rqStore.StoreSymbolDirectory(taskID, symbolsDir)
}
if err != nil {
return 0, 0, fmt.Errorf("store symbol dir: %w", err)
}
metadataBytes := totalBytes(metadataFiles)
Expand Down Expand Up @@ -203,6 +210,11 @@ func (p *p2pImpl) storeCascadeSymbolsAndData(ctx context.Context, taskID, action
if err := p.rqStore.UpdateIsFirstBatchStored(taskID); err != nil {
return totalSymbols, totalAvailable, fmt.Errorf("update first-batch flag: %w", err)
}
if totalSymbols >= totalAvailable {
if err := p.rqStore.SetIsCompleted(taskID); err != nil {
return totalSymbols, totalAvailable, fmt.Errorf("mark symbols completed: %w", err)
}
}
logtrace.Info(ctx, "store: first-pass bytes summary", logtrace.Fields{
"taskID": taskID,
"symbols_stored": totalSymbols,
Expand Down
2 changes: 1 addition & 1 deletion supernode/cascade/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (task *CascadeRegistrationTask) storeArtefacts(ctx context.Context, actionI
}
ctx = logtrace.CtxWithOrigin(ctx, "first_pass")
logtrace.Info(ctx, "store: first-pass begin", lf)
if err := task.P2P.StoreArtefacts(ctx, adaptors.StoreArtefactsRequest{IDFiles: idFiles, SymbolsDir: symbolsDir, Layout: layout, TaskID: task.taskID, ActionID: actionID}, f); err != nil {
if err := task.P2P.StoreArtefacts(ctx, adaptors.StoreArtefactsRequest{IDFiles: idFiles, SymbolsDir: symbolsDir, Layout: layout, TaskID: task.taskID, ActionID: actionID, IdempotentDirectoryRecord: f[logtrace.FieldMethod] == "PublishStagedArtefacts"}, f); err != nil {
return task.wrapErr(ctx, "failed to store artefacts", err, lf)
}
logtrace.Info(ctx, "store: first-pass ok", lf)
Expand Down
Loading