Skip to content

Commit

Permalink
Garbage-collect batch DB (#502)
Browse files Browse the repository at this point in the history
* Remove deduplication from fake batch db
* Simplify fake batch db events
* Re-generate protobufs
* Remove forgotten ts store from fake batch db
* Add garbage-collection to fake batch db
* Re-generate protobufs
* Clean up respondIfReady signature in msc
* Make ISS aware of batch db module ID
* Garbage-collect batch db state from ISS
* Explain behavior of BatchStored in a comment

Signed-off-by: Matej Pavlovic <matopavlovic@gmail.com>
  • Loading branch information
matejpavlovic committed Jul 26, 2023
1 parent f40ba06 commit 201930e
Show file tree
Hide file tree
Showing 16 changed files with 397 additions and 192 deletions.
1 change: 1 addition & 0 deletions cmd/mircat/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func debuggerNode(id t.NodeID, membership *trantorpbtypes.Membership) (*mir.Node
Self: "iss",
App: "batchfetcher",
Availability: "availability",
BatchDB: "batchdb",
Checkpoint: "checkpointing",
Net: "net",
Ordering: "ordering",
Expand Down
56 changes: 36 additions & 20 deletions pkg/availability/batchdb/fakebatchdb/fakebatchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ type ModuleConfig struct {
}

type moduleState struct {
BatchStore map[msctypes.BatchID]batchInfo
TransactionStore map[tt.TxID]*trantorpbtypes.Transaction
batchStore map[msctypes.BatchID]*batch
batchesByRetIdx map[tt.RetentionIndex][]msctypes.BatchID
retIdx tt.RetentionIndex
}

type batchInfo struct {
txIDs []tt.TxID
metadata []byte
type batch struct {
txs []*trantorpbtypes.Transaction
}

// NewModule returns a new module for a fake batch database.
Expand All @@ -32,40 +32,56 @@ func NewModule(mc ModuleConfig) modules.Module {
m := dsl.NewModule(mc.Self)

state := moduleState{
BatchStore: make(map[msctypes.BatchID]batchInfo),
TransactionStore: make(map[tt.TxID]*trantorpbtypes.Transaction),
batchStore: make(map[msctypes.BatchID]*batch),
batchesByRetIdx: make(map[tt.RetentionIndex][]msctypes.BatchID),
retIdx: 0,
}

// On StoreBatch request, just store the data in the local memory.
batchdbpbdsl.UponStoreBatch(m, func(batchID msctypes.BatchID, txIDs []tt.TxID, txs []*trantorpbtypes.Transaction, metadata []byte, origin *batchdbpbtypes.StoreBatchOrigin) error {
state.BatchStore[batchID] = batchInfo{
txIDs: txIDs,
metadata: metadata,
}
batchdbpbdsl.UponStoreBatch(m, func(
batchID msctypes.BatchID,
txs []*trantorpbtypes.Transaction,
retIdx tt.RetentionIndex,
origin *batchdbpbtypes.StoreBatchOrigin,
) error {

for i, txID := range txIDs {
state.TransactionStore[txID] = txs[i]
// Only save the batch if its retention index has not yet been garbage-collected.
if retIdx >= state.retIdx {
b := batch{txs}
state.batchStore[batchID] = &b
state.batchesByRetIdx[retIdx] = append(state.batchesByRetIdx[retIdx], batchID)
}

// Note that we emit a BatchStored event even if the batch's retention index was too low
// (and thus the batch was not actually stored).
// However, since this situation is indistinguishable from
// storing the batch and immediately garbage-collecting it,
// it is simpler to report success to the module that produced the StoreBatch event
// (rather than creating a whole different code branch with no real utility).
batchdbpbdsl.BatchStored(m, origin.Module, origin)
return nil
})

// On LookupBatch request, just check the local map.
batchdbpbdsl.UponLookupBatch(m, func(batchID msctypes.BatchID, origin *batchdbpbtypes.LookupBatchOrigin) error {

info, found := state.BatchStore[batchID]
storedBatch, found := state.batchStore[batchID]
if !found {
batchdbpbdsl.LookupBatchResponse(m, origin.Module, false, nil, origin)
return nil
}

txs := make([]*trantorpbtypes.Transaction, len(info.txIDs))
for i, txID := range info.txIDs {
txs[i] = state.TransactionStore[txID]
}
batchdbpbdsl.LookupBatchResponse(m, origin.Module, true, storedBatch.txs, origin)
return nil
})

batchdbpbdsl.LookupBatchResponse(m, origin.Module, true, txs, origin)
batchdbpbdsl.UponGarbageCollect(m, func(retentionIndex tt.RetentionIndex) error {
for ; state.retIdx < retentionIndex; state.retIdx++ {
for _, batchID := range state.batchesByRetIdx[state.retIdx] {
delete(state.batchStore, batchID)
}
delete(state.batchesByRetIdx, state.retIdx)
}
return nil
})

Expand Down
24 changes: 19 additions & 5 deletions pkg/availability/multisigcollector/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,25 @@ type ModuleConfig struct {
// ModuleParams sets the values for the parameters of an instance of the protocol.
// All replicas are expected to use identical module parameters.
type ModuleParams struct {
InstanceUID []byte // unique identifier for this instance used to prevent replay attacks
EpochNr tt.EpochNr // The epoch in which this instance of MultisigCollector operates
Membership *trantorpbtypes.Membership // the list of participating nodes
Limit int // the maximum number of certificates to generate before a request is completed
MaxRequests int // the maximum number of requests to be provided by this module

// InstanceUID is a unique identifier for this instance used to prevent replay attacks.
InstanceUID []byte

// Membership defines the set of nodes participating in a particular instance of the multisig collector.
Membership *trantorpbtypes.Membership

// Limit is the maximum number of certificates to generate before a request is completed.
Limit int

// MaxRequests is the maximum number of certificate requests to be handled by this module.
// It prevents the multisig collector from continuing operation (transaction dissemination)
// after no more certificate requests are going to arrive.
MaxRequests int

// EpochNr is the epoch in which the instance of MultisigCollector operates.
// It is used as the RetentionIndex to associate with all newly stored batches.
// and batches requested from the mempool.
EpochNr tt.EpochNr
}

// SigData is the binary data that should be signed for forming a certificate.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,13 @@ func IncludeBatchReconstruction(
return nil
}

batchdbpbdsl.StoreBatch(m, mc.BatchDB, batchID, context.txIDs, context.txs, []byte{} /*metadata*/, &storeBatchContext{})
batchdbpbdsl.StoreBatch(m,
mc.BatchDB,
batchID,
context.txs,
tt.RetentionIndex(params.EpochNr),
&storeBatchContext{},
)
saveAndFinish(m, context.reqID, context.txs, context.batchID, requestState.ReqOrigin, &state)

return nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func IncludeCreatingCertificates(
reqOrigin: origin,
})

respondIfReady(m, &state, params)
respondIfReady(m, &state, params.Membership)
if len(state.certificates) == 0 {
apbdsl.ComputeCert(m, mc.Self)
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func IncludeCreatingCertificates(
newDue := membutil.HaveWeakQuorum(params.Membership, maputil.GetKeys(cert.sigs)) // keep this here...

if len(state.requestStates) > 0 {
respondIfReady(m, &state, params) // ... because this call changes the state
respondIfReady(m, &state, params.Membership) // ... because this call changes the state
}

if newDue && len(state.certificates) < params.Limit {
Expand Down Expand Up @@ -204,7 +204,7 @@ func IncludeCreatingCertificates(

// When the id of the batch is computed, store the batch persistently.
mempooldsl.UponBatchIDResponse(m, func(batchID msctypes.BatchID, context *computeIDOfReceivedBatchContext) error {
batchdbpbdsl.StoreBatch(m, mc.BatchDB, batchID, context.txIDs, context.txs, nil, /*metadata*/
batchdbpbdsl.StoreBatch(m, mc.BatchDB, batchID, context.txs, tt.RetentionIndex(params.EpochNr),
&storeBatchContext{context.sourceID, context.reqID, batchID})
// TODO minor optimization: start computing cert without waiting for reply (maybe do not even get a reply from batchdb)
return nil
Expand All @@ -224,11 +224,11 @@ func IncludeCreatingCertificates(
})
}

func respondIfReady(m dsl.Module, state *certCreationState, params *common.ModuleParams) {
func respondIfReady(m dsl.Module, state *certCreationState, membership *trantorpbtypes.Membership) {

// Select certificates with enough signatures.
finishedCerts := maputil.RemoveAll(state.certificates, func(_ requestID, cert *certificate) bool {
return membutil.HaveWeakQuorum(params.Membership, maputil.GetKeys(cert.sigs))
return membutil.HaveWeakQuorum(membership, maputil.GetKeys(cert.sigs))
})

// Return immediately if there are no finished certificates.
Expand Down
10 changes: 6 additions & 4 deletions pkg/iss/iss.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"encoding/binary"
"fmt"

ppv "github.com/filecoin-project/mir/pkg/orderers/common/pprepvalidator"

es "github.com/go-errors/errors"
"google.golang.org/protobuf/proto"

Expand All @@ -30,8 +28,10 @@ import (
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/orderers"
"github.com/filecoin-project/mir/pkg/orderers/common"
ppv "github.com/filecoin-project/mir/pkg/orderers/common/pprepvalidator"
apppbdsl "github.com/filecoin-project/mir/pkg/pb/apppb/dsl"
"github.com/filecoin-project/mir/pkg/pb/availabilitypb"
batchdbpbdsl "github.com/filecoin-project/mir/pkg/pb/availabilitypb/batchdbpb/dsl"
apbdsl "github.com/filecoin-project/mir/pkg/pb/availabilitypb/dsl"
mscpbtypes "github.com/filecoin-project/mir/pkg/pb/availabilitypb/mscpb/types"
apbtypes "github.com/filecoin-project/mir/pkg/pb/availabilitypb/types"
Expand Down Expand Up @@ -629,7 +629,8 @@ func (iss *ISS) initAvailability() {
MultisigCollector: &mscpbtypes.InstanceParams{
Epoch: iss.epoch.Nr(),
Membership: iss.memberships[0],
MaxRequests: uint64(iss.Params.SegmentLength)},
MaxRequests: uint64(iss.Params.SegmentLength),
},
},
},
)
Expand Down Expand Up @@ -914,12 +915,13 @@ func (iss *ISS) deliverCommonCheckpoint(chkpData []byte) error {
pruneIndex := int(chkp.Epoch()) - iss.Params.RetainedEpochs
if pruneIndex > 0 { // "> 0" and not ">= 0", since only entries strictly smaller than the index are pruned.

// Prune timer, checkpointing, availability, and orderers.
// Prune timer, checkpointing, availability, orderers, and other modules.
eventpbdsl.TimerGarbageCollect(iss.m, iss.moduleConfig.Timer, tt.RetentionIndex(pruneIndex))
factorypbdsl.GarbageCollect(iss.m, iss.moduleConfig.Checkpoint, tt.RetentionIndex(pruneIndex))
factorypbdsl.GarbageCollect(iss.m, iss.moduleConfig.Availability, tt.RetentionIndex(pruneIndex))
factorypbdsl.GarbageCollect(iss.m, iss.moduleConfig.Ordering, tt.RetentionIndex(pruneIndex))
factorypbdsl.GarbageCollect(iss.m, iss.moduleConfig.PPrepValidatorChkp, tt.RetentionIndex(pruneIndex))
batchdbpbdsl.GarbageCollect(iss.m, iss.moduleConfig.BatchDB, tt.RetentionIndex(pruneIndex))

// Prune epoch state.
for epoch := range iss.epochs {
Expand Down
1 change: 1 addition & 0 deletions pkg/iss/moduleconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type ModuleConfig struct {
Self t.ModuleID
App t.ModuleID
Availability t.ModuleID
BatchDB t.ModuleID
Checkpoint t.ModuleID
ChkpValidator t.ModuleID
Net t.ModuleID
Expand Down

0 comments on commit 201930e

Please sign in to comment.