From acae44cf14e6bcbd01e48e5ce9c658eb111980eb Mon Sep 17 00:00:00 2001 From: Bartossh Date: Tue, 13 Jun 2023 16:36:51 +0200 Subject: [PATCH] Websocket validator connection do not close on corrupted message. --- cmd/central/main.go | 7 ++++--- makefile | 18 ++++++++++++++---- server/ws.go | 16 ++++++++++------ setup_example.yaml | 2 +- validator/block.go | 3 +++ validator/validator.go | 20 +++++--------------- 6 files changed, 37 insertions(+), 29 deletions(-) diff --git a/cmd/central/main.go b/cmd/central/main.go index 88b3acc..d7ffd3d 100644 --- a/cmd/central/main.go +++ b/cmd/central/main.go @@ -4,13 +4,14 @@ import ( "context" "errors" "fmt" - "github.com/bartossh/Computantis/logo" - "github.com/pterm/pterm" - "github.com/urfave/cli/v2" "os" "os/signal" "time" + "github.com/bartossh/Computantis/logo" + "github.com/pterm/pterm" + "github.com/urfave/cli/v2" + "github.com/bartossh/Computantis/block" "github.com/bartossh/Computantis/blockchain" "github.com/bartossh/Computantis/bookkeeping" diff --git a/makefile b/makefile index 73b499a..339602a 100644 --- a/makefile +++ b/makefile @@ -1,8 +1,18 @@ build: - go build -o bin/central -ldflags="-s -w" cmd/central/main.go - go build -o bin/validator -ldflags="-s -w" cmd/validator/main.go - go build -o bin/wallet -ldflags="-s -w" cmd/client/main.go - go build -o bin/emulator -ldflags="-s -w" cmd/emulator/main.go + GOOS=linux GOARCH=amd64 go build -o bin/linux_x86/central -ldflags="-s -w" cmd/central/main.go + GOOS=linux GOARCH=amd64 go build -o bin/linux_x86/validator -ldflags="-s -w" cmd/validator/main.go + GOOS=linux GOARCH=amd64 go build -o bin/linux_x86/client -ldflags="-s -w" cmd/client/main.go + GOOS=linux GOARCH=amd64 go build -o bin/linux_x86/emulator -ldflags="-s -w" cmd/emulator/main.go + + GOOS=linux GOARCH=arm64 go build -o bin/linux_arm/central -ldflags="-s -w" cmd/central/main.go + GOOS=linux GOARCH=arm64 go build -o bin/linux_arm/validator -ldflags="-s -w" cmd/validator/main.go + GOOS=linux GOARCH=arm64 go build -o bin/linux_arm/wallet -ldflags="-s -w" cmd/client/main.go + GOOS=linux GOARCH=arm64 go build -o bin/linux_arm/emulator -ldflags="-s -w" cmd/emulator/main.go + + GOOS=darwin GOARCH=arm64 go build -o bin/darwin_arm/central -ldflags="-s -w" cmd/central/main.go + GOOS=darwin GOARCH=arm64 go build -o bin/darwin_arm/validator -ldflags="-s -w" cmd/validator/main.go + GOOS=darwin GOARCH=arm64 go build -o bin/darwin_arm/wallet -ldflags="-s -w" cmd/client/main.go + GOOS=darwin GOARCH=arm64 go build -o bin/darwin_arm/emulator -ldflags="-s -w" cmd/emulator/main.go documentation: ./gendocs.sh diff --git a/server/ws.go b/server/ws.go index 50b28a8..d4fa1fa 100644 --- a/server/ws.go +++ b/server/ws.go @@ -162,13 +162,15 @@ func (c *socket) readPump(ctx context.Context, cancel context.CancelFunc) { err := c.conn.ReadJSON(&msg) if err != nil { switch { - case websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure): - c.log.Info(fmt.Sprintf("socket closing connection to the client %s due to unexpected error %s\n", c.address, err)) + case websocket.IsUnexpectedCloseError(err, + websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseMessage, websocket.CloseMessageTooBig): + c.log.Info(fmt.Sprintf("closing socket connection to the client %s due to %s\n", c.address, err)) + cancel() + return default: - c.log.Info(fmt.Sprintf("socket closing connection to the client %s due to error %s\n", c.address, err)) + c.log.Info(fmt.Sprintf("reading socket message failed %s due to %s\n", c.address, err)) } - cancel() - return + continue } c.process(ctx, &msg) } @@ -180,7 +182,9 @@ func (c *socket) writePump(ctx context.Context, cancel context.CancelFunc) { defer func() { ticker.Stop() c.hub.unregister <- c - err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "central node stopped")) + err := c.conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage( + websocket.CloseNormalClosure, fmt.Sprintf("central node: %s stopped", c.address)), + ) if err != nil { c.log.Error(fmt.Sprintf("central node write closing msg error, %s", err.Error())) } diff --git a/setup_example.yaml b/setup_example.yaml index dc4d13d..b5cb75b 100644 --- a/setup_example.yaml +++ b/setup_example.yaml @@ -24,7 +24,7 @@ client: central_node_url: "http://localhost:8080" validator_node_url: "http://localhost:9090" emulator: - timeout_seconds: 5 + timeout_seconds: 20 tick_seconds: 1 random: false client_url: "http://localhost:8095" diff --git a/validator/block.go b/validator/block.go index 3a463b6..ba1f169 100644 --- a/validator/block.go +++ b/validator/block.go @@ -5,6 +5,8 @@ import ( ) func (a *app) validateBlock(block *block.Block) error { + a.mux.Lock() + defer a.mux.Unlock() if a.lastBlock.Index != 0 { if block.Index != a.lastBlock.Index+1 { return ErrBlockIndexIsInvalid @@ -16,5 +18,6 @@ func (a *app) validateBlock(block *block.Block) error { if !block.Validate(block.TrxHashes) { return ErrProofBlockIsInvalid } + a.lastBlock = *block return nil } diff --git a/validator/validator.go b/validator/validator.go index 9eac96f..9d7c9ac 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -162,7 +162,7 @@ func (a *app) connectToSocket(ctx context.Context, address string) error { } go a.pullPump(ctxx, c, address) - go a.pushPump(ctxx, c, address) + go a.pushPump(ctxx, cancelx, c, address) a.log.Info(fmt.Sprintf("validator connected to central node on address: %s", address)) return nil @@ -210,19 +210,18 @@ func (a *app) pullPump(ctx context.Context, conn *websocket.Conn, address string msgType, raw, err := conn.ReadMessage() if err != nil { a.log.Error(fmt.Sprintf("validator read msg error, %s", err.Error())) - return + continue } switch msgType { case websocket.PingMessage, websocket.PongMessage: continue case websocket.CloseMessage: return - default: var msg server.Message if err := json.Unmarshal(raw, &msg); err != nil { a.log.Error(fmt.Sprintf("validator unmarshal msg error, %s", err.Error())) - return + continue } if msg.Error != "" { a.log.Info(fmt.Sprintf("validator msg error, %s", msg.Error)) @@ -234,13 +233,8 @@ func (a *app) pullPump(ctx context.Context, conn *websocket.Conn, address string } } -func (a *app) pushPump(ctx context.Context, conn *websocket.Conn, address string) { +func (a *app) pushPump(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn, address string) { ticker := time.NewTicker(time.Second * 10) - defer func() { - if err := a.disconnectFromSocket(address); err != nil { - a.log.Error(fmt.Sprintf("validator disconnect on close, %s", err.Error())) - } - }() defer ticker.Stop() for { select { @@ -248,7 +242,7 @@ func (a *app) pushPump(ctx context.Context, conn *websocket.Conn, address string err := conn.WriteMessage(websocket.PingMessage, nil) if err != nil { a.log.Error(fmt.Sprintf("validator write msg error, %s", err.Error())) - return + cancel() } case <-ctx.Done(): return @@ -305,15 +299,11 @@ func (a *app) processMessage(ctx context.Context, m *server.Message, remoteAddre } func (a *app) processBlock(_ context.Context, b *block.Block, remoteAddress string) { - a.mux.Lock() - defer a.mux.Unlock() err := a.validateBlock(b) if err != nil { a.log.Error(fmt.Sprintf("remote node address [ %s ], %s ", remoteAddress, err.Error())) } a.log.Info(fmt.Sprintf("block from [ %s ] :: last idx [ %v ] :: new idx [ %v ] \n", remoteAddress, a.lastBlock.Index, b.Index)) - a.lastBlock = *b - go a.wh.PostWebhookBlock(b) // post concurrently }