Skip to content

Commit

Permalink
Merge branch 'visualization' into ws_readme
Browse files Browse the repository at this point in the history
  • Loading branch information
andreaj00 committed Feb 14, 2024
2 parents 55c95ef + 1d36644 commit 41e5d8d
Show file tree
Hide file tree
Showing 23 changed files with 77 additions and 47 deletions.
2 changes: 1 addition & 1 deletion cmd/bench/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var (
clientCmd = &cobra.Command{
Use: "client",
Short: "Generate and submit transactions to a Mir cluster",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
return runClient()
},
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/bench/cmd/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ var (
nodeCmd = &cobra.Command{
Use: "node",
Short: "Start a Mir node",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
if err := runNode(); !es.Is(err, mir.ErrStopped) {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/bench/cmd/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var (
paramsCmd = &cobra.Command{
Use: "params",
Short: "generate parameters",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, args []string) error {
return generateParams(args)
},
}
Expand Down
2 changes: 1 addition & 1 deletion codegen/generators/dsl-gen/generator/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func generateDslFunctionForHandlingSimpleEvent(eventNode *events.EventNode, upon

func GetDslOriginOption(origin *events.Origin) (*types.OneofOption, bool) {
return sliceutil.Any(
sliceutil.Filter(origin.TypeOneof.Options, func(i int, opt *types.OneofOption) bool {
sliceutil.Filter(origin.TypeOneof.Options, func(_ int, opt *types.OneofOption) bool {
return opt.Name() == "Dsl"
}),
)
Expand Down
6 changes: 3 additions & 3 deletions codegen/model/types/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ type Fields []*Field

// FuncParamsPbTypes returns a list of field lowercase names followed by their pb types.
func (fs Fields) FuncParamsPbTypes() []jen.Code {
return sliceutil.Transform(fs, func(i int, f *Field) jen.Code { return f.FuncParamPbType() })
return sliceutil.Transform(fs, func(_ int, f *Field) jen.Code { return f.FuncParamPbType() })
}

// FuncParamsMirTypes returns a list of field lowercase names followed by their mir types.
func (fs Fields) FuncParamsMirTypes() []jen.Code {
return sliceutil.Transform(fs, func(i int, f *Field) jen.Code { return f.FuncParamMirType() })
return sliceutil.Transform(fs, func(_ int, f *Field) jen.Code { return f.FuncParamMirType() })
}

// FuncParamsIDs returns a list of fields lowercase names as identifiers, without the types.
func (fs Fields) FuncParamsIDs() []jen.Code {
return sliceutil.Transform(fs, func(i int, f *Field) jen.Code { return jen.Id(f.LowercaseName()) })
return sliceutil.Transform(fs, func(_ int, f *Field) jen.Code { return jen.Id(f.LowercaseName()) })
}

// ByName returns the field with the given name (or nil if there is no such field).
Expand Down
13 changes: 11 additions & 2 deletions frontend/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
}

function handleMessageWebSocket(event){
const message = decomposeJSON(JSON.parse(event.data));
const message = decomposeJSON(event.data);
console.log("currently in sync and waiting", port);
setIncomingMessage(message);
}
Expand All @@ -79,7 +79,16 @@
}

const decomposeJSON = (message) => {
return JSON.stringify(message, null, 2);
// Note: This decompose function is closely tied to the composition in websocketwriter.go.
// We expect the following structure:
// {
// event: string(message),
// timestamp: "sending_timestamp"
// }
// Here, we need to perform double parsing because we need to re-deparse the 'message' as it was originally in another JSON format.
let parsedMessage = JSON.parse(message);
parsedMessage.event = JSON.parse(parsedMessage.event);
return JSON.stringify(parsedMessage, null, 2);
}

const acceptIncomingLog = () => {
Expand Down
2 changes: 1 addition & 1 deletion node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func (c *consumer) ImplementsModule() {}
// ApplyEvents increments a counter and sleeps for a given duration (set at module instantiation)
// for each event in the given list.
func (c *consumer) ApplyEvents(evts *events.EventList) (*events.EventList, error) {
evtsOut, err := modules.ApplyEventsSequentially(evts, func(event *eventpb.Event) (*events.EventList, error) {
evtsOut, err := modules.ApplyEventsSequentially(evts, func(_ *eventpb.Event) (*events.EventList, error) {
atomic.AddUint64(&c.numProcessed, 1)
time.Sleep(c.delay)
return events.EmptyList(), nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func IncludeVerificationOfCertificates(
})

// When the signatures in a certificate are verified, output the result of certificate verification.
cryptopbdsl.UponSigsVerified(m, func(nodeIDs []t.NodeID, errs []error, allOK bool, context *verifySigsInCertContext) error {
cryptopbdsl.UponSigsVerified(m, func(_ []t.NodeID, _ []error, allOK bool, context *verifySigsInCertContext) error {
reqID := context.reqID

if _, ok := state.RequestState[reqID]; !ok {
Expand Down
2 changes: 1 addition & 1 deletion pkg/batchfetcher/batchfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func NewModule(mc ModuleConfig, epochNr tt.EpochNr, clientProgress *clientprogre

// The DeliverCert handler requests the transactions referenced by the received availability certificate
// from the availability layer.
isspbdsl.UponDeliverCert(m, func(sn tt.SeqNr, cert *apbtypes.Cert, empty bool) error {
isspbdsl.UponDeliverCert(m, func(_ tt.SeqNr, cert *apbtypes.Cert, empty bool) error {
// Create an empty output item and enqueue it immediately.
// Actual output will be delayed until the transactions have been received.
// This is necessary to preserve the order of incoming and outgoing events.
Expand Down
4 changes: 2 additions & 2 deletions pkg/bcb/bcbmodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func NewModule(mc ModuleConfig, params *ModuleParams, nodeID t.NodeID) modules.P
return nil
})

cryptopbdsl.UponSignResult(m, func(signature []byte, context *signStartMessageContext) error {
cryptopbdsl.UponSignResult(m, func(signature []byte, _ *signStartMessageContext) error {
if !state.sentEcho {
state.sentEcho = true
transportpbdsl.SendMessage(m, mc.Net, bcbpbmsgs.EchoMessage(mc.Self, signature), []t.NodeID{params.Leader})
Expand Down Expand Up @@ -132,7 +132,7 @@ func NewModule(mc ModuleConfig, params *ModuleParams, nodeID t.NodeID) modules.P
})

// upon event <al, Deliver | p, [FINAL, m, Σ]> do
bcbpbdsl.UponFinalMessageReceived(m, func(from t.NodeID, data []byte, signers []t.NodeID, signatures [][]byte) error {
bcbpbdsl.UponFinalMessageReceived(m, func(_ t.NodeID, data []byte, signers []t.NodeID, signatures [][]byte) error {
// if #({p ∈ Π | Σ[p] != ⊥ ∧ verifysig(p, bcb||p||ECHO||m, Σ[p])}) > (N+f)/2 and delivered = FALSE do
if len(signers) == len(signatures) && len(signers) > (params.GetN()+params.GetF())/2 && !state.delivered {
signedMessage := [][]byte{params.InstanceUID, []byte("ECHO"), data}
Expand Down
4 changes: 2 additions & 2 deletions pkg/checkpoint/serializing_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ import (
func FuzzCheckpointForSig(f *testing.F) {
f.Add(uint64(0), uint64(0), []byte("13242342342342"))

f.Fuzz(func(t *testing.T, s, n uint64, data []byte) {
f.Fuzz(func(_ *testing.T, s, n uint64, data []byte) {
serializeCheckpointForSig(tt.EpochNr(s), tt.SeqNr(n), data)
})
}

func FuzzSnapshotForHash(f *testing.F) {
f.Add(100, uint64(0), "/ip4/7.7.7.7/tcp/1234/p2p/QmYyQSo1c1Ym7orWxLYvCrM2EmxFTANf8wXmmE7DWjhx5N", "127.0.0.1:3333", []byte("13242342342342"))

f.Fuzz(func(t *testing.T, n int, e uint64, k, v string, data []byte) {
f.Fuzz(func(_ *testing.T, n int, e uint64, k, v string, data []byte) {
n = n % 5000
membership := trantorpbtypes.Membership{make(map[types.NodeID]*trantorpbtypes.NodeIdentity)} // nolint:govet

Expand Down
2 changes: 1 addition & 1 deletion pkg/debugger/debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func NewWebSocketDebugger(
logger logging.Logger,
) (*eventlog.Recorder, error) {
// writerFactory creates and returns a WebSocket-based event writer
writerFactory := func(_ string, ownID t.NodeID, l logging.Logger) (eventlog.EventWriter, error) {
writerFactory := func(_ string, _ t.NodeID, l logging.Logger) (eventlog.EventWriter, error) {
return newWSWriter(fmt.Sprintf(":%s", port), l), nil
}

Expand Down
47 changes: 34 additions & 13 deletions pkg/debugger/websocketwriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net/http"
"time"

"google.golang.org/protobuf/encoding/protojson"

"github.com/gorilla/websocket"

"github.com/filecoin-project/mir/pkg/events"
Expand Down Expand Up @@ -45,7 +47,7 @@ func newWSWriter(port string, logger logging.Logger) *WSWriter {
}

http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
wsWriter.upgrader.CheckOrigin = func(r *http.Request) bool { return true } // Allow opening the connection by HTML file
wsWriter.upgrader.CheckOrigin = func(_ *http.Request) bool { return true } // Allow opening the connection by HTML file
conn, err := wsWriter.upgrader.Upgrade(w, r, nil)
if err != nil {
panic(err)
Expand Down Expand Up @@ -113,7 +115,7 @@ func (wsw *WSWriter) Close() error {

// Write sends every event to the frontend and then waits for a message detailing how to proceed with that event
// The returned EventList contains the accepted events
func (wsw *WSWriter) Write(list *events.EventList, _ int64) (*events.EventList, error) {
func (wsw *WSWriter) Write(list *events.EventList, timestamp int64) (*events.EventList, error) {
for wsw.conn == nil {
wsw.logger.Log(logging.LevelInfo, "No connection")
time.Sleep(time.Millisecond * 100) // TODO: Why do we sleep here? Do we need it?
Expand All @@ -126,17 +128,17 @@ func (wsw *WSWriter) Write(list *events.EventList, _ int64) (*events.EventList,
iter := list.Iterator()

for event := iter.Next(); event != nil; event = iter.Next() {
// Create a new JSON object with a timestamp field
timestamp := time.Now()
logData := map[string]interface{}{
"event": event,
"timestamp": timestamp,
// Assuming 'event' is a Protobuf message
eventJSON, err := protojson.Marshal(event)
if err != nil {
return list, fmt.Errorf("error marshaling event to JSON: %w", err)
}

// Marshal the JSON data
message, err := json.Marshal(logData)
message, err := json.Marshal(map[string]interface{}{
"event": string(eventJSON),
"timestamp": timestamp,
})
if err != nil {
panic(err)
return list, fmt.Errorf("error marshaling eventJSON and timestamp to JSON: %w", err)
}

// Send the JSON message over WebSocket
Expand All @@ -145,7 +147,10 @@ func (wsw *WSWriter) Write(list *events.EventList, _ int64) (*events.EventList,
}

action := <-wsw.eventSignal
acceptedEvents, _ = eventAction(action["Type"], action["Value"], acceptedEvents, event)
acceptedEvents, err = eventAction(action["Type"], action["Value"], acceptedEvents, event)
if err != nil {
return list, err
}
}
return acceptedEvents, nil
}
Expand All @@ -157,12 +162,28 @@ func (wsw *WSWriter) HandleClientSignal(signal map[string]string) {
// EventAction decides, based on the input what exactly is done next with the current event
func eventAction(
actionType string,
_ string,
value string,
acceptedEvents *events.EventList,
currentEvent *eventpb.Event,
) (*events.EventList, error) {
if actionType == "accept" {
acceptedEvents.PushBack(currentEvent)
} else if actionType == "replace" {
type ValueFormat struct {
EventJSON string `json:"event"`
Timestamp int64 `json:"timestamp"`
}
var input ValueFormat
err := json.Unmarshal([]byte(value), &input)
if err != nil {
return acceptedEvents, fmt.Errorf("error unmarshalling value to ValueFormat: %w", err)
}
var modifiedEvent eventpb.Event
err = protojson.Unmarshal([]byte(input.EventJSON), &modifiedEvent)
if err != nil {
return acceptedEvents, fmt.Errorf("error unmarshalling modified event using protojson: %w", err)
}
acceptedEvents.PushBack(&modifiedEvent)
}
return acceptedEvents, nil
}
2 changes: 1 addition & 1 deletion pkg/deploytest/localtransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type LocalTransportLayer interface {
func NewLocalTransportLayer(sim *Simulation, transportType string, nodeIDsWeight map[t.NodeID]types.VoteWeight, logger logging.Logger) (LocalTransportLayer, error) {
switch transportType {
case "sim":
messageDelayFn := func(from, to t.NodeID) time.Duration {
messageDelayFn := func(_, _ t.NodeID) time.Duration {
// TODO: Make min and max message delay configurable
return testsim.RandDuration(sim.Rand, 0, 10*time.Millisecond)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dsl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func MirOrigin(contextID ContextID) *dslpbtypes.Origin {

// UponInit invokes handler when the module is initialized.
func UponInit(m Module, handler func() error) {
UponEvent[*eventpb.Event_Init](m, func(ev *eventpb.Init) error {
UponEvent[*eventpb.Event_Init](m, func(_ *eventpb.Init) error {
return handler()
})
}
10 changes: 5 additions & 5 deletions pkg/dsl/test/dslmodule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func newContextTestingModule(mc *contextTestingModuleModuleConfig) dsl.Module {
return nil
})

cryptopbdsl.UponSigsVerified(m, func(nodeIDs []types.NodeID, errs []error, allOK bool, context *uint64) error {
cryptopbdsl.UponSigsVerified(m, func(nodeIDs []types.NodeID, _ []error, allOK bool, context *uint64) error {
if allOK {
for _, nodeID := range nodeIDs {
EmitTestingString(m, mc.Verified, fmt.Sprintf("%v: %v verified", *context, nodeID))
Expand All @@ -298,7 +298,7 @@ func newContextTestingModule(mc *contextTestingModuleModuleConfig) dsl.Module {
return nil
})

cryptopbdsl.UponSigsVerified(m, func(nodeIDs []types.NodeID, errs []error, allOK bool, context *uint64) error {
cryptopbdsl.UponSigsVerified(m, func(_ []types.NodeID, _ []error, allOK bool, context *uint64) error {
if allOK {
EmitTestingUint(m, mc.Verified, *context)
}
Expand All @@ -309,8 +309,8 @@ func newContextTestingModule(mc *contextTestingModuleModuleConfig) dsl.Module {
}

func TestDslModule_ContextRecoveryAndCleanup(t *testing.T) {
testCases := map[string]func(mc *contextTestingModuleModuleConfig, m dsl.Module){
"empty": func(mc *contextTestingModuleModuleConfig, m dsl.Module) {},
testCases := map[string]func(_ *contextTestingModuleModuleConfig, m dsl.Module){
"empty": func(_ *contextTestingModuleModuleConfig, _ dsl.Module) {},

"request response": func(mc *contextTestingModuleModuleConfig, m dsl.Module) {
eventsOut, err := m.ApplyEvents(events.ListOf(events.TestingString(mc.Self, "hello")))
Expand Down Expand Up @@ -410,7 +410,7 @@ func TestDslModule_ContextRecoveryAndCleanup(t *testing.T) {

for testName, tc := range testCases {
tc := tc
t.Run(testName, func(t *testing.T) {
t.Run(testName, func(_ *testing.T) {
mc := defaultContextTestingModuleConfig()
m := newContextTestingModule(mc)
tc(mc, m)
Expand Down
2 changes: 1 addition & 1 deletion pkg/eventlog/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewRecorder(
fileCount: 1,
newDests: OneFileLogger(),
path: path,
filter: func(event *eventpb.Event) bool {
filter: func(_ *eventpb.Event) bool {
// Record all events by default.
return true
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/iss/iss.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func New(
})

chkppbdsl.UponStableCheckpointReceived(iss.m,
func(sender t.NodeID, sn tt.SeqNr, snapshot *trantorpbtypes.StateSnapshot, cert map[t.NodeID][]byte) error {
func(_ t.NodeID, sn tt.SeqNr, snapshot *trantorpbtypes.StateSnapshot, cert map[t.NodeID][]byte) error {
chkp := &checkpointpbtypes.StableCheckpoint{
Sn: sn,
Snapshot: snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func IncludeBatchCreation( // nolint:gocognit
batchSize := 0
txCount := 0

txIDs, txs, _ := state.Iterator.NextWhile(func(txID tt.TxID, tx *trantorpbtypes.Transaction) bool {
txIDs, txs, _ := state.Iterator.NextWhile(func(_ tt.TxID, tx *trantorpbtypes.Transaction) bool {
if txCount < params.MaxTransactionsInBatch && batchSize+len(tx.Data) <= params.MaxPayloadInBatch {
txCount++
state.NumUnproposed--
Expand Down Expand Up @@ -182,7 +182,7 @@ func IncludeBatchCreation( // nolint:gocognit
// ClientProgress - for each client, list of pending transactions sorted by TxNo - that
// would make pruning significantly more efficient.
state.ClientProgress.LoadPb(clientProgress.Pb())
_, removedTXs := state.Transactions.RemoveSelected(func(txID tt.TxID, tx *trantorpbtypes.Transaction) bool {
_, removedTXs := state.Transactions.RemoveSelected(func(_ tt.TxID, tx *trantorpbtypes.Transaction) bool {
return state.ClientProgress.Contains(tx.ClientId, tx.TxNo)
})
for _, tx := range removedTXs {
Expand Down
2 changes: 1 addition & 1 deletion pkg/net/libp2p/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ func TestMessagingWithNewNodes(t *testing.T) {
}
}

receiver := func(nodeID types.NodeID, events chan *events.EventList, stop chan struct{}) {
receiver := func(_ types.NodeID, events chan *events.EventList, stop chan struct{}) {
defer receivers.Done()

for {
Expand Down
2 changes: 1 addition & 1 deletion pkg/orderers/internal/parts/catchup/catchup.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func SendDoneMessages(

// Collect the preprepare digests of all committed certificates.
digests := make([][]byte, 0, state.Segment.Len())
maputil.IterateSorted(state.Slots[state.View], func(sn tt.SeqNr, slot *common.PbftSlot) bool {
maputil.IterateSorted(state.Slots[state.View], func(_ tt.SeqNr, slot *common.PbftSlot) bool {
digests = append(digests, slot.Digest)
return true
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/trantor/testing/smr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func newDeployment(conf *TestConfig) (*deploytest.Deployment, error) {
var simulation *deploytest.Simulation
if conf.Transport == "sim" {
r := rand.New(rand.NewSource(conf.RandomSeed)) // nolint: gosec
eventDelayFn := func(e *eventpb.Event) time.Duration {
eventDelayFn := func(_ *eventpb.Event) time.Duration {
// TODO: Make min and max event processing delay configurable
return testsim.RandDuration(r, 0, time.Microsecond)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/indexedlist/indexedlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func TestIndexedList_RemoveSelected(t *testing.T) {
il.Append([]string{"a", "b", "c"}, []int{0, 1, 2})
assert.Equal(t, 3, il.Len())

keys, vals := il.RemoveSelected(func(key string, val int) bool {
keys, vals := il.RemoveSelected(func(key string, _ int) bool {
return key == "b" || key == "c"
})
assert.Equal(t, []string{"b", "c"}, keys)
Expand Down Expand Up @@ -149,7 +149,7 @@ func TestIterator_NextWhile(t *testing.T) {
// Condition allowing all elements to be traversed.
iter := il.Iterator(0)
sum := 0
keys, vals, ok := iter.NextWhile(func(key string, val int) bool {
keys, vals, ok := iter.NextWhile(func(_ string, val int) bool {
if sum+val <= 10 {
sum += val
return true
Expand All @@ -171,7 +171,7 @@ func TestIterator_NextWhile(t *testing.T) {
// Condition allowing only part of the elements to be traversed.
iter = il.Iterator(0)
sum = 0
keys, vals, ok = iter.NextWhile(func(key string, val int) bool {
keys, vals, ok = iter.NextWhile(func(_ string, val int) bool {
if sum+val <= 1 {
sum += val
return true
Expand Down

0 comments on commit 41e5d8d

Please sign in to comment.