From df928fb591343a293ced7dc2cce4f30ae1dbae36 Mon Sep 17 00:00:00 2001 From: Bartosz Lenart Date: Mon, 8 May 2023 16:30:51 +0200 Subject: [PATCH] Validator topology check. --- README.md | 36 +++- cmd/central/main.go | 3 +- cmd/validator/main.go | 8 +- docker-compose-postgresql.yaml | 2 +- docker_postgres_init.sql | 2 +- docs/docs.md | 36 +++- queries_postgresql/insert_token.sql | 9 +- queries_postgresql/set_tokens _true.sql | 3 - server/ws.go | 2 - stdoutwriter/stdoutwriter.go | 10 + validator/{check.go => block.go} | 8 +- validator/socket.go | 12 ++ validator/validator.go | 244 +++++++++++++----------- 13 files changed, 231 insertions(+), 144 deletions(-) delete mode 100644 queries_postgresql/set_tokens _true.sql create mode 100644 stdoutwriter/stdoutwriter.go rename validator/{check.go => block.go} (53%) create mode 100644 validator/socket.go diff --git a/README.md b/README.md index 48285fa..1f9bf3b 100644 --- a/README.md +++ b/README.md @@ -2377,6 +2377,30 @@ type Verifier interface { } ``` +# stdoutwriter + +```go +import "github.com/bartossh/Computantis/stdoutwriter" +``` + +## Index + +- [type Logger](<#type-logger>) + - [func (l Logger) Write(p []byte) (n int, err error)](<#func-logger-write>) + + +## type [Logger]() + +```go +type Logger struct{} +``` + +### func \(Logger\) [Write]() + +```go +func (l Logger) Write(p []byte) (n int, err error) +``` + # stress ```go @@ -2568,7 +2592,7 @@ var ( ) ``` -## func [Run]() +## func [Run]() ```go func Run(ctx context.Context, cfg Config, srw StatusReadWriter, log logger.Logger, ver Verifier, wh WebhookCreateRemovePoster, wallet *wallet.Wallet) error @@ -2576,7 +2600,7 @@ func Run(ctx context.Context, cfg Config, srw StatusReadWriter, log logger.Logge Run initializes routing and runs the validator. To stop the validator cancel the context. Validator connects to the central server via websocket and listens for new blocks. It will block until the context is canceled. -## type [Config]() +## type [Config]() Config contains configuration of the validator. @@ -2604,7 +2628,7 @@ type CreateRemoveUpdateHookRequest struct { } ``` -## type [Status]() +## type [Status]() Status is a status of each received block by the validator. It keeps track of invalid blocks in case of blockchain corruption. @@ -2618,7 +2642,7 @@ type Status struct { } ``` -## type [StatusReadWriter]() +## type [StatusReadWriter]() StatusReadWriter provides methods to bulk read and single write validator status. @@ -2629,7 +2653,7 @@ type StatusReadWriter interface { } ``` -## type [Verifier]() +## type [Verifier]() Verifier provides methods to verify the signature of the message. @@ -2651,7 +2675,7 @@ type WebHookNewBlockMessage struct { } ``` -## type [WebhookCreateRemovePoster]() +## type [WebhookCreateRemovePoster]() WebhookCreateRemovePoster provides methods to create, remove webhooks and post messages to webhooks. diff --git a/cmd/central/main.go b/cmd/central/main.go index 8c5cdcd..6130957 100644 --- a/cmd/central/main.go +++ b/cmd/central/main.go @@ -15,6 +15,7 @@ import ( "github.com/bartossh/Computantis/logging" "github.com/bartossh/Computantis/reactive" "github.com/bartossh/Computantis/server" + "github.com/bartossh/Computantis/stdoutwriter" "github.com/bartossh/Computantis/wallet" ) @@ -57,7 +58,7 @@ func main() { panic(fmt.Sprintf("error with logger: %s", err)) } - log := logging.New(callbackOnErr, callbackOnFatal, db) + log := logging.New(callbackOnErr, callbackOnFatal, db, stdoutwriter.Logger{}) if err := blockchain.GenesisBlock(ctx, db); err != nil { fmt.Println(err) diff --git a/cmd/validator/main.go b/cmd/validator/main.go index 08cd86d..8a7b91b 100644 --- a/cmd/validator/main.go +++ b/cmd/validator/main.go @@ -12,6 +12,7 @@ import ( "github.com/bartossh/Computantis/configuration" "github.com/bartossh/Computantis/fileoperations" "github.com/bartossh/Computantis/logging" + "github.com/bartossh/Computantis/stdoutwriter" "github.com/bartossh/Computantis/validator" "github.com/bartossh/Computantis/wallet" "github.com/bartossh/Computantis/webhooks" @@ -40,14 +41,14 @@ func main() { defer db.Disconnect(ctxx) callbackOnErr := func(err error) { - fmt.Println("error with logger: ", err) + fmt.Println("logger error: ", err) } callbackOnFatal := func(err error) { - panic(fmt.Sprintf("error with logger: %s", err)) + panic(fmt.Sprintf("fatal error: %s", err)) } - log := logging.New(callbackOnErr, callbackOnFatal, db) + log := logging.New(callbackOnErr, callbackOnFatal, db, stdoutwriter.Logger{}) go func() { <-c @@ -70,6 +71,5 @@ func main() { if err := validator.Run(ctx, cfg.Validator, db, log, verify, wh, &wl); err != nil { log.Error(err.Error()) - fmt.Println(err.Error()) } } diff --git a/docker-compose-postgresql.yaml b/docker-compose-postgresql.yaml index 1d179b0..e7626fd 100644 --- a/docker-compose-postgresql.yaml +++ b/docker-compose-postgresql.yaml @@ -5,7 +5,7 @@ services: container_name: computantis_postgresql image: postgres:latest environment: - POSTGRES_USER: ${POSTGRES_DB_USER} + POSTGRES_USER: "postgres" POSTGRES_PASSWORD: ${POSTGRES_DB_PASSWORD} volumes: - ./docker_postgres_init.sql:/docker-entrypoint-initdb.d/docker_postgres_init.sql diff --git a/docker_postgres_init.sql b/docker_postgres_init.sql index 5b22c4b..c7b7761 100644 --- a/docker_postgres_init.sql +++ b/docker_postgres_init.sql @@ -121,7 +121,7 @@ CREATE INDEX validator_created_at ON validatorStatus USING BTREE (created_at); CREATE TABLE IF NOT EXISTS nodes ( id serial PRIMARY KEY, node VARCHAR ( 64 ) UNIQUE NOT NULL, - websocket VARCHAR ( 256 ) UNIQUE NOT NULL, + websocket VARCHAR ( 256 ) UNIQUE NOT NULL ); CREATE INDEX nodes_index ON nodes USING HASH (node); diff --git a/docs/docs.md b/docs/docs.md index f6d4935..3d83c6e 100644 --- a/docs/docs.md +++ b/docs/docs.md @@ -2296,6 +2296,30 @@ type Verifier interface { } ``` +# stdoutwriter + +```go +import "github.com/bartossh/Computantis/stdoutwriter" +``` + +## Index + +- [type Logger](<#type-logger>) + - [func (l Logger) Write(p []byte) (n int, err error)](<#func-logger-write>) + + +## type [Logger]() + +```go +type Logger struct{} +``` + +### func \(Logger\) [Write]() + +```go +func (l Logger) Write(p []byte) (n int, err error) +``` + # stress ```go @@ -2487,7 +2511,7 @@ var ( ) ``` -## func [Run]() +## func [Run]() ```go func Run(ctx context.Context, cfg Config, srw StatusReadWriter, log logger.Logger, ver Verifier, wh WebhookCreateRemovePoster, wallet *wallet.Wallet) error @@ -2495,7 +2519,7 @@ func Run(ctx context.Context, cfg Config, srw StatusReadWriter, log logger.Logge Run initializes routing and runs the validator. To stop the validator cancel the context. Validator connects to the central server via websocket and listens for new blocks. It will block until the context is canceled. -## type [Config]() +## type [Config]() Config contains configuration of the validator. @@ -2523,7 +2547,7 @@ type CreateRemoveUpdateHookRequest struct { } ``` -## type [Status]() +## type [Status]() Status is a status of each received block by the validator. It keeps track of invalid blocks in case of blockchain corruption. @@ -2537,7 +2561,7 @@ type Status struct { } ``` -## type [StatusReadWriter]() +## type [StatusReadWriter]() StatusReadWriter provides methods to bulk read and single write validator status. @@ -2548,7 +2572,7 @@ type StatusReadWriter interface { } ``` -## type [Verifier]() +## type [Verifier]() Verifier provides methods to verify the signature of the message. @@ -2570,7 +2594,7 @@ type WebHookNewBlockMessage struct { } ``` -## type [WebhookCreateRemovePoster]() +## type [WebhookCreateRemovePoster]() WebhookCreateRemovePoster provides methods to create, remove webhooks and post messages to webhooks. diff --git a/queries_postgresql/insert_token.sql b/queries_postgresql/insert_token.sql index 1cda359..1e93f5d 100644 --- a/queries_postgresql/insert_token.sql +++ b/queries_postgresql/insert_token.sql @@ -1,3 +1,10 @@ -INSERT INTO tokens (token, valid, expiration_date) VALUES ('80fda91a43989fa81347aa011e0f1e0fdde4eaabb408bf426166a62c80456c30', true, 9223372036854775807); +INSERT INTO tokens (token, valid, expiration_date) VALUES ('ykkeD6Tr6xikkYwC805kVoFThm8VGEHStTFk1lIU6RgEf7p3vjFpPQFI3VP9SYeARjYh2jecMSYsmgddjZZcy32iySHijJQ', true, 9223372036854775807); +INSERT INTO tokens (token, valid, expiration_date) VALUES ('8CdWLXrx5GGSSu3je0m6SbCqIuEj7emrsrt7lvm6AeaIQl8d6MCNZKMS00ODA6TrjVYKg4NB9Js4xlSetRdZ4edYupHgBKwX', true, 9223372036854775807); INSERT INTO tokens (token, valid, expiration_date) VALUES ('G8OH7lHu5qfWVumWom0ySN29lakog8nhzSPEwROMjvhdI6VgZ6GoPcdJmoIo7sF3lxQNJMOTKxpYBr6zF992WN86uB7xTEJZ', true, 9223372036854775807); INSERT INTO tokens (token, valid, expiration_date) VALUES ('jykkeD6Tr6xikkYwC805kVoFThm8VGEHStTFk1lIU6RgEf7p3vjFpPQFI3VP9SYeARjYh2jecMSYsmgddjZZcy32iySHijJQ', true, 9223372036854775807); +INSERT INTO tokens (token, valid, expiration_date) VALUES ('bIJZyIQLw9hTP0rnbOwmK1G4xlcAXT46IPEkqFdF03gpb2YDuASjWyYVtJIDFdbJm5cRueIbEozhxN8DeevIuapj4BPwfK3d', true, 9223372036854775807); +INSERT INTO tokens (token, valid, expiration_date) VALUES ('wGrKWMTNzVT5kqtBWPAlRz58L2AOY3BSZ9PN7WGm1EonyGStnOFNX9y3Tr0p635vbe5dD1TiONgCGiP7yIVc2tVEzfCnYL15', true, 9223372036854775807); +INSERT INTO tokens (token, valid, expiration_date) VALUES ('ZepH88DsFcoPoZUzIE0AI3gRcCrQ8KhDpzESbxoQiyrB77CtKn7MZnjcj9cRla4aucjrgpnTMtM1AtkegwhXnE6iAKRv6hON', true, 9223372036854775807); +INSERT INTO tokens (token, valid, expiration_date) VALUES ('w4NXZ8H5vebzhfgvfanFXzEIaoPwyWeZpZjRheo4LnG8vjWlMQeNVBz9lCMhTiBbj1PjVFWXHiUyZW21P7o6DkTlrx5x3tJ1', true, 9223372036854775807); +INSERT INTO tokens (token, valid, expiration_date) VALUES ('a6858eLd1GHvoGXrq6nNhEiHrEmkRN3tXu5dYqCjiMUL9sRfUz1iBns0kEnPizzrLfj2TZGU2Wel52fJ6YDNiVrdtvf2kZm4', true, 9223372036854775807); +INSERT INTO tokens (token, valid, expiration_date) VALUES ('80fda91a43989fa81347aa011e0f1e0fdde4eaabb408bf426166a62c80456c30', true, 9223372036854775807); diff --git a/queries_postgresql/set_tokens _true.sql b/queries_postgresql/set_tokens _true.sql deleted file mode 100644 index 411b922..0000000 --- a/queries_postgresql/set_tokens _true.sql +++ /dev/null @@ -1,3 +0,0 @@ -UPDATE tokens SET valid = ? WHERE token = ?, true, "80fda91a43989fa81347aa011e0f1e0fdde4eaabb408bf426166a62c80456c30"; -UPDATE tokens SET valid = ? WHERE token = ?, true, "G8OH7lHu5qfWVumWom0ySN29lakog8nhzSPEwROMjvhdI6VgZ6GoPcdJmoIo7sF3lxQNJMOTKxpYBr6zF992WN86uB7xTEJZ"; -UPDATE tokens SET valid = ? WHERE token = ?, true, "jykkeD6Tr6xikkYwC805kVoFThm8VGEHStTFk1lIU6RgEf7p3vjFpPQFI3VP9SYeARjYh2jecMSYsmgddjZZcy32iySHijJQ"; \ No newline at end of file diff --git a/server/ws.go b/server/ws.go index 2dfd6a2..62751d0 100644 --- a/server/ws.go +++ b/server/ws.go @@ -142,8 +142,6 @@ func (s *server) wsWrapper(c *fiber.Ctx) error { client.readPump() } - fmt.Println("serving ws", c.IP()) - return websocket.New(serveWs)(c) } diff --git a/stdoutwriter/stdoutwriter.go b/stdoutwriter/stdoutwriter.go new file mode 100644 index 0000000..5d62749 --- /dev/null +++ b/stdoutwriter/stdoutwriter.go @@ -0,0 +1,10 @@ +package stdoutwriter + +import "fmt" + +type Logger struct{} + +func (l Logger) Write(p []byte) (n int, err error) { + fmt.Println(string(p)) + return len(p), nil +} diff --git a/validator/check.go b/validator/block.go similarity index 53% rename from validator/check.go rename to validator/block.go index f3f1609..3a463b6 100644 --- a/validator/check.go +++ b/validator/block.go @@ -1,8 +1,10 @@ package validator -import "github.com/bartossh/Computantis/block" +import ( + "github.com/bartossh/Computantis/block" +) -func (a app) validateBlock(block *block.Block) error { +func (a *app) validateBlock(block *block.Block) error { if a.lastBlock.Index != 0 { if block.Index != a.lastBlock.Index+1 { return ErrBlockIndexIsInvalid @@ -11,7 +13,7 @@ func (a app) validateBlock(block *block.Block) error { return ErrBlockPrevHashIsInvalid } } - if !block.Validate(block.TrxHashes) { // TODO: use hashes of received transactions after sending transactions is implemented + if !block.Validate(block.TrxHashes) { return ErrProofBlockIsInvalid } return nil diff --git a/validator/socket.go b/validator/socket.go new file mode 100644 index 0000000..dbc6e6d --- /dev/null +++ b/validator/socket.go @@ -0,0 +1,12 @@ +package validator + +import ( + "context" + + "github.com/fasthttp/websocket" +) + +type socket struct { + conn *websocket.Conn + cancel context.CancelFunc +} diff --git a/validator/validator.go b/validator/validator.go index fa5db0e..f502d4b 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -4,6 +4,7 @@ import ( "context" "encoding/hex" "encoding/json" + "errors" "fmt" "net/http" "sync" @@ -71,10 +72,11 @@ type app struct { cfg Config srw StatusReadWriter log logger.Logger - conns map[string]*websocket.Conn + conns map[string]socket ver Verifier wh WebhookCreateRemovePoster wallet *wallet.Wallet + cancel context.CancelFunc } func (a *app) blocks(c *fiber.Ctx) error { @@ -85,142 +87,147 @@ func (a *app) blocks(c *fiber.Ctx) error { // Validator connects to the central server via websocket and listens for new blocks. // It will block until the context is canceled. func Run(ctx context.Context, cfg Config, srw StatusReadWriter, log logger.Logger, ver Verifier, wh WebhookCreateRemovePoster, wallet *wallet.Wallet) error { - hash, signature := wallet.Sign([]byte(cfg.Token)) - - header := make(http.Header) - header.Add("Token", cfg.Token) - header.Add("Address", wallet.Address()) - header.Add("Signature", hex.EncodeToString(signature[:])) - header.Add("Hash", hex.EncodeToString(hash[:])) - - ctxTimeout, cancel := context.WithTimeout(ctx, wsConnectionTimeout) - c, _, err := websocket.DefaultDialer.DialContext(ctxTimeout, cfg.Websocket, header) - cancel() - if err != nil { - return err - } - defer c.Close() - ctxx, cancel := context.WithCancel(ctx) - a := &app{ mux: sync.RWMutex{}, cfg: cfg, srw: srw, log: log, - conns: map[string]*websocket.Conn{cfg.Websocket: c}, + conns: make(map[string]socket), ver: ver, wh: wh, wallet: wallet, + cancel: cancel, + } + if err := a.connectToSocket(ctxx, cfg.Websocket); err != nil { + return err } + return a.runServer(ctxx, cancel) +} - go a.pullPump(ctxx, cancel) - go a.pushPump(ctxx, cancel) +func (a *app) connectToSocket(ctx context.Context, address string) error { + a.mux.RLock() + if _, ok := a.conns[address]; ok { + a.mux.RUnlock() + return nil + } + a.mux.RUnlock() - return a.runServer(ctxx, cancel) + hash, signature := a.wallet.Sign([]byte(a.cfg.Token)) + header := make(http.Header) + header.Add("Token", a.cfg.Token) + header.Add("Address", a.wallet.Address()) + header.Add("Signature", hex.EncodeToString(signature[:])) + header.Add("Hash", hex.EncodeToString(hash[:])) + + ctxTimeout, cancelTimeout := context.WithTimeout(ctx, wsConnectionTimeout) + defer cancelTimeout() + c, _, err := websocket.DefaultDialer.DialContext(ctxTimeout, a.cfg.Websocket, header) + if err != nil { + return err + } + + ctxx, cancelx := context.WithCancel(ctx) + a.mux.Lock() + defer a.mux.Unlock() + a.conns[address] = socket{ + conn: c, + cancel: cancelx, + } + + go a.pullPump(ctxx, c, address) + go a.pushPump(ctxx, c, address) + a.log.Info(fmt.Sprintf("validator connected to central node on address: %s", address)) + + return nil +} + +func (a *app) disconnectFromSocket(address string) error { + a.mux.Lock() + defer a.mux.Unlock() + conn, ok := a.conns[address] + if !ok { + if len(a.conns) == 0 { + a.cancel() + return errors.New("no connections left") + } + return nil + } + conn.cancel() + delete(a.conns, address) + if len(a.conns) == 0 { + a.cancel() + return errors.New("no connections left") + } + a.log.Info(fmt.Sprintf("disconnected from %s", address)) + + return nil } -func (a *app) pullPump(ctx context.Context, cancel context.CancelFunc) { +func (a *app) pullPump(ctx context.Context, conn *websocket.Conn, address string) { ticker := time.NewTicker(time.Millisecond * 100) defer ticker.Stop() - defer cancel() -listener: for { select { case <-ctx.Done(): - break listener - case <-ticker.C: - var connect, remove []string - for _, conn := range a.conns { - msgType, raw, err := conn.ReadMessage() - if err != nil { - a.log.Error(fmt.Sprintf("validator read msg error, %s", err.Error())) - continue - } - switch msgType { - case websocket.PingMessage, websocket.PongMessage: - continue - case websocket.CloseMessage: - remove = append(remove, conn.RemoteAddr().String()) - default: - var msg server.Message - if err := json.Unmarshal(raw, &msg); err != nil { - a.log.Error(fmt.Sprintf("validator unmarshal msg error, %s", err.Error())) - continue - } - if msg.Error != "" { - a.log.Error(fmt.Sprintf("validator msg error, %s", msg.Error)) - continue - } - c, r := a.processMessage(&msg) - connect = append(connect, c...) - remove = append(remove, r...) - } + err := a.disconnectFromSocket(address) + if err != nil { + a.log.Error(err.Error()) } - a.mux.Lock() - for _, del := range remove { - if _, ok := a.conns[del]; ok { - a.conns[del].Close() - delete(a.conns, del) - } - } - if len(connect) == 0 { - a.mux.Unlock() + return + case <-ticker.C: + msgType, raw, err := conn.ReadMessage() + if err != nil { + a.log.Error(fmt.Sprintf("validator read msg error, %s", err.Error())) continue } + switch msgType { + case websocket.PingMessage, websocket.PongMessage: + continue + case websocket.CloseMessage: + if err := a.disconnectFromSocket(address); err != nil { + a.log.Error(fmt.Sprintf("disconnect on close, %s", err.Error())) + } - hash, signature := a.wallet.Sign([]byte(a.cfg.Token)) - header := make(http.Header) - header.Add("Token", a.cfg.Token) - header.Add("Address", a.wallet.Address()) - header.Add("Signature", hex.EncodeToString(signature[:])) - header.Add("Hash", hex.EncodeToString(hash[:])) - for _, add := range connect { - ctxTimeout, cancel := context.WithTimeout(ctx, wsConnectionTimeout) - c, _, err := websocket.DefaultDialer.DialContext(ctxTimeout, add, header) - cancel() - if err != nil { - return + default: + var msg server.Message + if err := json.Unmarshal(raw, &msg); err != nil { + a.log.Error(fmt.Sprintf("validator unmarshal msg error, %s", err.Error())) + continue + } + if msg.Error != "" { + a.log.Error(fmt.Sprintf("validator msg error, %s", msg.Error)) + continue } - a.conns[add] = c + a.processMessage(ctx, &msg, conn.RemoteAddr().String()) } - a.mux.Unlock() - } } } -func (a *app) pushPump(ctx context.Context, cancel context.CancelFunc) { +func (a *app) pushPump(ctx context.Context, conn *websocket.Conn, address string) { ticker := time.NewTicker(time.Second * 10) defer ticker.Stop() - defer cancel() -connectionPingCloser: for { select { case <-ticker.C: - a.mux.RLock() - for _, conn := range a.conns { - err := conn.WriteMessage(websocket.PingMessage, nil) - if err != nil { - a.log.Error(fmt.Sprintf("validator write msg error, %s", err.Error())) - break connectionPingCloser + err := conn.WriteMessage(websocket.PingMessage, nil) + if err != nil { + a.log.Error(fmt.Sprintf("validator write msg error, %s", err.Error())) + if err := a.disconnectFromSocket(address); err != nil { + a.log.Error(err.Error()) } + return } - a.mux.RUnlock() - case <-ctx.Done(): - a.mux.Lock() - a.log.Info("validator closing connection") - for k, conn := range a.conns { - err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) - if err != nil { - a.log.Error(fmt.Sprintf("validator write closing msg error, %s", err.Error())) - } - conn.Close() - delete(a.conns, k) + err := conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + if err != nil { + a.log.Error(fmt.Sprintf("validator write closing msg error, %s", err.Error())) } - a.mux.Unlock() - break connectionPingCloser + if err := a.disconnectFromSocket(address); err != nil { + a.log.Error(err.Error()) + } + return } } } @@ -255,37 +262,35 @@ func (a *app) runServer(ctx context.Context, cancel context.CancelFunc) error { return nil } -func (a *app) processMessage(m *server.Message) (connect, remove []string) { +func (a *app) processMessage(ctx context.Context, m *server.Message, remoteAddress string) { switch m.Command { case server.CommandNewBlock: - return a.processBlock(&m.Block) + a.processBlock(ctx, &m.Block, remoteAddress) case server.CommandNewTransaction: - return + a.log.Warn("not implemented") case server.CommandSocketList: - return a.processSocketList(m.Sockets) + a.processSocketList(ctx, m.Sockets) default: a.log.Error(fmt.Sprintf("validator received unknown command, %s", m.Command)) - return } } -func (a *app) processBlock(b *block.Block) (_, _ []string) { +func (a *app) processBlock(_ context.Context, b *block.Block, remoteAddress string) { + a.mux.Lock() + defer a.mux.Unlock() err := a.validateBlock(b) - - switch err { - case nil: - a.lastBlock = *b - default: - // TODO: trigger webhook alert about invalid block - // TODO: implement strategy to handle invalid blocks - a.log.Error(fmt.Sprintf("validator received invalid block, %s", err.Error())) + if err != nil { + a.log.Error(fmt.Sprintf("remote address: %s => validator received invalid: %s ", remoteAddress, err.Error())) } + fmt.Printf("remote address: %s => last block idx: %v | new block idx %v \n", remoteAddress, a.lastBlock.Index, b.Index) + a.lastBlock = *b go a.wh.PostWebhookBlock(b) // post concurrently - return } -func (a *app) processSocketList(sockets []string) (connect, remove []string) { +func (a *app) processSocketList(ctx context.Context, sockets []string) { + var connect, remove []string + a.mux.RLock() uniqueSockets := make(map[string]struct{}) for _, socket := range sockets { if _, ok := a.conns[socket]; !ok { @@ -298,5 +303,12 @@ func (a *app) processSocketList(sockets []string) (connect, remove []string) { remove = append(remove, socket) } } - return + a.mux.RUnlock() + + for _, socket := range connect { + a.connectToSocket(ctx, socket) + } + for _, socket := range remove { + a.disconnectFromSocket(socket) + } }