diff --git a/internal/config/config.go b/internal/config/config.go index 089c7b83c6..644e9c8959 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -273,6 +273,9 @@ type PrefixArray interface { type RootKey string func Reset() { + keysMutex.Lock() // must only call viper directly here (as we already hold the lock) + defer keysMutex.Unlock() + viper.Reset() // Set defaults @@ -351,11 +354,14 @@ func Reset() { viper.SetDefault(string(IdentityManagerCacheLimit), 100 /* items */) viper.SetDefault(string(IdentityManagerCacheTTL), "1h") - i18n.SetLang(GetString(Lang)) + i18n.SetLang(viper.GetString(string(Lang))) } // ReadConfig initializes the config func ReadConfig(cfgFile string) error { + keysMutex.Lock() // must only call viper directly here (as we already hold the lock) + defer keysMutex.Unlock() + // Set precedence order for reading config location viper.SetEnvPrefix("firefly") viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) @@ -377,6 +383,9 @@ func ReadConfig(cfgFile string) error { } func MergeConfig(configRecords []*fftypes.ConfigRecord) error { + keysMutex.Lock() + defer keysMutex.Unlock() + for _, c := range configRecords { s := viper.New() s.SetConfigType("json") @@ -422,9 +431,10 @@ func rootKey(k string) RootKey { // GetKnownKeys gets the known keys func GetKnownKeys() []string { - keys := make([]string, 0, len(knownKeys)) keysMutex.Lock() defer keysMutex.Unlock() + + keys := make([]string, 0, len(knownKeys)) for k := range knownKeys { keys = append(keys, k) } @@ -454,8 +464,7 @@ func NewPluginConfig(prefix string) Prefix { } func (c *configPrefix) prefixKey(k string) string { - keysMutex.Lock() - defer keysMutex.Unlock() + // Caller responsible for holding lock when calling key := c.prefix + k if !knownKeys[key] { panic(fmt.Sprintf("Undefined configuration key '%s'", key)) @@ -506,9 +515,10 @@ func (c *configPrefixArray) ArrayEntry(i int) Prefix { } func (c *configPrefixArray) AddKnownKey(k string, defValue ...interface{}) { - // Put a simulated key in the known keys array, to pop into the help info. keysMutex.Lock() defer keysMutex.Unlock() + + // Put a simulated key in the known keys array, to pop into the help info. knownKeys[fmt.Sprintf("%s[].%s", c.base, k)] = true c.defaults[k] = defValue } @@ -531,6 +541,9 @@ func (c *configPrefix) SetDefault(k string, defValue interface{}) { } func GetConfig() fftypes.JSONObject { + keysMutex.Lock() + defer keysMutex.Unlock() + conf := fftypes.JSONObject{} _ = viper.Unmarshal(&conf) return conf @@ -541,6 +554,9 @@ func GetString(key RootKey) string { return root.GetString(string(key)) } func (c *configPrefix) GetString(key string) string { + keysMutex.Lock() + defer keysMutex.Unlock() + return viper.GetString(c.prefixKey(key)) } @@ -549,6 +565,9 @@ func GetStringSlice(key RootKey) []string { return root.GetStringSlice(string(key)) } func (c *configPrefix) GetStringSlice(key string) []string { + keysMutex.Lock() + defer keysMutex.Unlock() + return viper.GetStringSlice(c.prefixKey(key)) } @@ -557,6 +576,9 @@ func GetBool(key RootKey) bool { return root.GetBool(string(key)) } func (c *configPrefix) GetBool(key string) bool { + keysMutex.Lock() + defer keysMutex.Unlock() + return viper.GetBool(c.prefixKey(key)) } @@ -565,6 +587,9 @@ func GetDuration(key RootKey) time.Duration { return root.GetDuration(string(key)) } func (c *configPrefix) GetDuration(key string) time.Duration { + keysMutex.Lock() + defer keysMutex.Unlock() + return fftypes.ParseToDuration(viper.GetString(c.prefixKey(key))) } @@ -573,7 +598,10 @@ func GetByteSize(key RootKey) int64 { return root.GetByteSize(string(key)) } func (c *configPrefix) GetByteSize(key string) int64 { - return fftypes.ParseToByteSize(c.GetString(key)) + keysMutex.Lock() + defer keysMutex.Unlock() + + return fftypes.ParseToByteSize(viper.GetString(c.prefixKey(key))) } // GetUint gets a configuration uint @@ -581,6 +609,9 @@ func GetUint(key RootKey) uint { return root.GetUint(string(key)) } func (c *configPrefix) GetUint(key string) uint { + keysMutex.Lock() + defer keysMutex.Unlock() + return viper.GetUint(c.prefixKey(key)) } @@ -589,6 +620,9 @@ func GetInt(key RootKey) int { return root.GetInt(string(key)) } func (c *configPrefix) GetInt(key string) int { + keysMutex.Lock() + defer keysMutex.Unlock() + return viper.GetInt(c.prefixKey(key)) } @@ -597,6 +631,9 @@ func GetInt64(key RootKey) int64 { return root.GetInt64(string(key)) } func (c *configPrefix) GetInt64(key string) int64 { + keysMutex.Lock() + defer keysMutex.Unlock() + return viper.GetInt64(c.prefixKey(key)) } @@ -605,6 +642,9 @@ func GetFloat64(key RootKey) float64 { return root.GetFloat64(string(key)) } func (c *configPrefix) GetFloat64(key string) float64 { + keysMutex.Lock() + defer keysMutex.Unlock() + return viper.GetFloat64(c.prefixKey(key)) } @@ -613,6 +653,9 @@ func GetObject(key RootKey) fftypes.JSONObject { return root.GetObject(string(key)) } func (c *configPrefix) GetObject(key string) fftypes.JSONObject { + keysMutex.Lock() + defer keysMutex.Unlock() + return fftypes.JSONObject(viper.GetStringMap(c.prefixKey(key))) } @@ -621,6 +664,9 @@ func GetObjectArray(key RootKey) fftypes.JSONObjectArray { return root.GetObjectArray(string(key)) } func (c *configPrefix) GetObjectArray(key string) fftypes.JSONObjectArray { + keysMutex.Lock() + defer keysMutex.Unlock() + v, _ := fftypes.ToJSONObjectArray(viper.Get(c.prefixKey(key))) return v } @@ -630,6 +676,9 @@ func Get(key RootKey) interface{} { return root.Get(string(key)) } func (c *configPrefix) Get(key string) interface{} { + keysMutex.Lock() + defer keysMutex.Unlock() + return viper.Get(c.prefixKey(key)) } @@ -638,11 +687,17 @@ func Set(key RootKey, value interface{}) { root.Set(string(key), value) } func (c *configPrefix) Set(key string, value interface{}) { + keysMutex.Lock() + defer keysMutex.Unlock() + viper.Set(c.prefixKey(key), value) } // Resolve gives the fully qualified path of a key func (c *configPrefix) Resolve(key string) string { + keysMutex.Lock() + defer keysMutex.Unlock() + return c.prefixKey(key) } diff --git a/internal/config/wsconfig/wsconfig.go b/internal/config/wsconfig/wsconfig.go index 0b6a0113bc..9ebf76d6e1 100644 --- a/internal/config/wsconfig/wsconfig.go +++ b/internal/config/wsconfig/wsconfig.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -25,6 +25,7 @@ import ( const ( defaultIntialConnectAttempts = 5 defaultBufferSize = "16Kb" + defaultHeartbeatInterval = "30s" // up to a minute to detect a dead connection ) const ( @@ -38,6 +39,8 @@ const ( WSConfigKeyInitialConnectAttempts = "ws.initialConnectAttempts" // WSConfigKeyPath if set will define the path to connect to - allows sharing of the same URL between HTTP and WebSocket connection info WSConfigKeyPath = "ws.path" + // WSConfigHeartbeatInterval is the frequency of ping/pong requests, and also used for the timeout to receive a response to the heartbeat + WSConfigHeartbeatInterval = "ws.heartbeatInterval" ) // InitPrefix ensures the prefix is initialized for HTTP too, as WS and HTTP @@ -48,6 +51,7 @@ func InitPrefix(prefix config.KeySet) { prefix.AddKnownKey(WSConfigKeyReadBufferSize, defaultBufferSize) prefix.AddKnownKey(WSConfigKeyInitialConnectAttempts, defaultIntialConnectAttempts) prefix.AddKnownKey(WSConfigKeyPath) + prefix.AddKnownKey(WSConfigHeartbeatInterval, defaultHeartbeatInterval) } func GenerateConfigFromPrefix(prefix config.Prefix) *wsclient.WSConfig { @@ -62,5 +66,6 @@ func GenerateConfigFromPrefix(prefix config.Prefix) *wsclient.WSConfig { HTTPHeaders: prefix.GetObject(restclient.HTTPConfigHeaders), AuthUsername: prefix.GetString(restclient.HTTPConfigAuthUsername), AuthPassword: prefix.GetString(restclient.HTTPConfigAuthPassword), + HeartbeatInterval: prefix.GetDuration(WSConfigHeartbeatInterval), } } diff --git a/internal/events/websockets/websocket_connection.go b/internal/events/websockets/websocket_connection.go index 79dc29687b..e33dc74775 100644 --- a/internal/events/websockets/websocket_connection.go +++ b/internal/events/websockets/websocket_connection.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -44,6 +44,7 @@ type websocketConnection struct { connID string sendMessages chan interface{} senderDone chan struct{} + receiverDone chan struct{} autoAck bool started []*websocketStartedSub inflight []*fftypes.EventDeliveryResponse @@ -64,6 +65,7 @@ func newConnection(pCtx context.Context, ws *WebSockets, wsConn *websocket.Conn) connID: connID, sendMessages: make(chan interface{}), senderDone: make(chan struct{}), + receiverDone: make(chan struct{}), } go wc.sendLoop() go wc.receiveLoop() @@ -104,11 +106,7 @@ func (wc *websocketConnection) sendLoop() { defer wc.close() for { select { - case msg, ok := <-wc.sendMessages: - if !ok { - l.Debugf("Sender closing") - return - } + case msg := <-wc.sendMessages: l.Tracef("Sending: %+v", msg) writer, err := wc.wsConn.NextWriter(websocket.TextMessage) if err == nil { @@ -119,6 +117,9 @@ func (wc *websocketConnection) sendLoop() { l.Errorf("Write failed on socket: %s", err) return } + case <-wc.receiverDone: + l.Debugf("Sender closing - receiver completed") + return case <-wc.ctx.Done(): l.Debugf("Sender closing - context cancelled") return @@ -128,7 +129,7 @@ func (wc *websocketConnection) sendLoop() { func (wc *websocketConnection) receiveLoop() { l := log.L(wc.ctx) - defer close(wc.sendMessages) + defer close(wc.receiverDone) for { var msgData []byte var msgHeader fftypes.WSClientActionBase @@ -361,5 +362,5 @@ func (wc *websocketConnection) close() { func (wc *websocketConnection) waitClose() { <-wc.senderDone - <-wc.sendMessages + <-wc.receiverDone } diff --git a/internal/i18n/en_translations.go b/internal/i18n/en_translations.go index f8b9c0b2d3..9d2950f532 100644 --- a/internal/i18n/en_translations.go +++ b/internal/i18n/en_translations.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -18,205 +18,234 @@ package i18n //revive:disable var ( - MsgConfigFailed = ffm("FF10101", "Failed to read config") - MsgTBD = ffm("FF10102", "TODO: Description") - MsgJSONDecodeFailed = ffm("FF10103", "Failed to decode input JSON") - MsgAPIServerStartFailed = ffm("FF10104", "Unable to start listener on %s: %s") - MsgTLSConfigFailed = ffm("FF10105", "Failed to initialize TLS configuration") - MsgInvalidCAFile = ffm("FF10106", "Invalid CA certificates file") - MsgResponseMarshalError = ffm("FF10107", "Failed to serialize response data", 400) - MsgWebsocketClientError = ffm("FF10108", "Error received from WebSocket client: %s") - Msg404NotFound = ffm("FF10109", "Not found", 404) - MsgUnknownBlockchainPlugin = ffm("FF10110", "Unknown blockchain plugin: %s") - MsgEthconnectRESTErr = ffm("FF10111", "Error from ethconnect: %s") - MsgDBInitFailed = ffm("FF10112", "Database initialization failed") - MsgDBQueryBuildFailed = ffm("FF10113", "Database query builder failed") - MsgDBBeginFailed = ffm("FF10114", "Database begin transaction failed") - MsgDBQueryFailed = ffm("FF10115", "Database query failed") - MsgDBInsertFailed = ffm("FF10116", "Database insert failed") - MsgDBUpdateFailed = ffm("FF10117", "Database update failed") - MsgDBDeleteFailed = ffm("FF10118", "Database delete failed") - MsgDBCommitFailed = ffm("FF10119", "Database commit failed") - MsgDBMissingJoin = ffm("FF10120", "Database missing expected join entry in table '%s' for id '%s'") - MsgDBReadErr = ffm("FF10121", "Database resultset read error from table '%s'") - MsgUnknownDatabasePlugin = ffm("FF10122", "Unknown database plugin '%s'") - MsgNullDataReferenceID = ffm("FF10123", "Data id is null in message data reference %d") - MsgDupDataReferenceID = ffm("FF10124", "Duplicate data ID in message '%s'") - MsgScanFailed = ffm("FF10125", "Failed to restore type '%T' into '%T'") - MsgUnregisteredBatchType = ffm("FF10126", "Unregistered batch type '%s'") - MsgBatchDispatchTimeout = ffm("FF10127", "Timed out dispatching work to batch") - MsgInitializationNilDepError = ffm("FF10128", "Initialization error due to unmet dependency") - MsgNilResponseNon204 = ffm("FF10129", "No output from API call") - MsgInvalidContentType = ffm("FF10130", "Invalid content type", 415) - MsgInvalidName = ffm("FF10131", "Field '%s' must be 1-64 characters, including alphanumerics (a-zA-Z0-9), dot (.), dash (-) and underscore (_), and must start/end in an alphanumeric", 400) - MsgUnknownFieldValue = ffm("FF10132", "Unknown %s '%v'", 400) - MsgDataNotFound = ffm("FF10133", "Data not found for message %s", 400) - MsgUnknownPublicStoragePlugin = ffm("FF10134", "Unknown Public Storage plugin '%s'") - MsgIPFSHashDecodeFailed = ffm("FF10135", "Failed to decode IPFS hash into 32byte value '%s'") - MsgIPFSRESTErr = ffm("FF10136", "Error from IPFS: %s") - MsgSerializationFailed = ffm("FF10137", "Serialization failed") - MsgMissingPluginConfig = ffm("FF10138", "Missing configuration '%s' for %s") - MsgMissingDataHashIndex = ffm("FF10139", "Missing data hash for index '%d' in message", 400) - MsgMissingRequiredField = ffm("FF10140", "Field '%s' is required", 400) - MsgInvalidEthAddress = ffm("FF10141", "Supplied ethereum address is invalid", 400) - MsgInvalidUUID = ffm("FF10142", "Invalid UUID supplied", 400) - Msg404NoResult = ffm("FF10143", "No result found", 404) - MsgNilDataReferenceSealFail = ffm("FF10144", "Invalid message: nil data reference at index %d", 400) - MsgDupDataReferenceSealFail = ffm("FF10145", "Invalid message: duplicate data reference at index %d", 400) - MsgVerifyFailedInvalidHashes = ffm("FF10146", "Invalid message: hashes do not match Hash=%s Expected=%s DataHash=%s DataHashExpected=%s", 400) - MsgVerifyFailedNilHashes = ffm("FF10147", "Invalid message: nil hashes", 400) - MsgInvalidFilterField = ffm("FF10148", "Unknown filter '%s'", 400) - MsgInvalidValueForFilterField = ffm("FF10149", "Unable to parse value for filter '%s'", 400) - MsgUnsupportedSQLOpInFilter = ffm("FF10150", "No SQL mapping implemented for filter operator '%s'", 400) - MsgJSONObjectParseFailed = ffm("FF10151", "Failed to parse '%s' as JSON") - MsgFilterParamDesc = ffm("FF10152", "Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^") - MsgSuccessResponse = ffm("FF10153", "Success") - MsgFilterSortDesc = ffm("FF10154", "Sort field. For multi-field sort use comma separated values (or multiple query values) with '-' prefix for descending") - MsgFilterDescendingDesc = ffm("FF10155", "Descending sort order (overrides all fields in a multi-field sort)") - MsgFilterSkipDesc = ffm("FF10156", "The number of records to skip (max: %d). Unsuitable for bulk operations") - MsgFilterLimitDesc = ffm("FF10157", "The maximum number of records to return (max: %d)") - MsgContextCanceled = ffm("FF10158", "Context cancelled") - MsgWSSendTimedOut = ffm("FF10159", "Websocket send timed out") - MsgWSClosing = ffm("FF10160", "Websocket closing") - MsgWSConnectFailed = ffm("FF10161", "Websocket connect failed") - MsgInvalidURL = ffm("FF10162", "Invalid URL: '%s'") - MsgDBMigrationFailed = ffm("FF10163", "Database migration failed") - MsgHashMismatch = ffm("FF10164", "Hash mismatch") - MsgTimeParseFail = ffm("FF10165", "Cannot parse time as RFC3339, Unix, or UnixNano: '%s'", 400) - MsgDefaultNamespaceNotFound = ffm("FF10166", "namespaces.default '%s' must be included in the namespaces.predefined configuration") - MsgDurationParseFail = ffm("FF10167", "Unable to parse '%s' as duration string, or millisecond number", 400) - MsgEventTypesParseFail = ffm("FF10168", "Unable to parse list of event types", 400) - MsgUnknownEventType = ffm("FF10169", "Unknown event type '%s'", 400) - MsgIDMismatch = ffm("FF10170", "ID mismatch") - MsgRegexpCompileFailed = ffm("FF10171", "Unable to compile '%s' regexp '%s'") - MsgUnknownEventTransportPlugin = ffm("FF10172", "Unknown event transport plugin: %s") - MsgWSConnectionNotActive = ffm("FF10173", "Websocket connection '%s' no longer active") - MsgWSSubAlreadyInFlight = ffm("FF10174", "Websocket subscription '%s' already has a message in flight") - MsgWSMsgSubNotMatched = ffm("FF10175", "Acknowledgment does not match an inflight event + subscription") - MsgWSClientSentInvalidData = ffm("FF10176", "Invalid data") - MsgWSClientUnknownAction = ffm("FF10177", "Unknown action '%s'") - MsgWSInvalidStartAction = ffm("FF10178", "A start action must set namespace and either a name or ephemeral=true") - MsgWSAutoAckChanged = ffm("FF10179", "The autoack option must be set consistently on all start requests") - MsgWSAutoAckEnabled = ffm("FF10180", "The autoack option is enabled on this connection") - MsgConnSubscriptionNotStarted = ffm("FF10181", "Subscription %v is not started on connection") - MsgDispatcherClosing = ffm("FF10182", "Event dispatcher closing") - MsgMaxFilterSkip = ffm("FF10183", "You have reached the maximum pagination limit for this query (%d)") - MsgMaxFilterLimit = ffm("FF10184", "Your query exceeds the maximum filter limit (%d)") - MsgAPIServerStaticFail = ffm("FF10185", "An error occurred loading static content", 500) - MsgEventListenerClosing = ffm("FF10186", "Event listener closing") - MsgNamespaceNotExist = ffm("FF10187", "Namespace does not exist") - MsgFieldTooLong = ffm("FF10188", "Field '%s' maximum length is %d", 400) - MsgInvalidSubscription = ffm("FF10189", "Invalid subscription", 400) - MsgMismatchedTransport = ffm("FF10190", "Connection ID '%s' appears not to be unique between transport '%s' and '%s'", 400) - MsgInvalidFirstEvent = ffm("FF10191", "Invalid firstEvent definition - must be 'newest','oldest' or a sequence number", 400) - MsgNumberMustBeGreaterEqual = ffm("FF10192", "Number must be greater than or equal to %d", 400) - MsgAlreadyExists = ffm("FF10193", "A %s with name '%s:%s' already exists", 409) - MsgJSONValidatorBadRef = ffm("FF10194", "Cannot use JSON validator for data with type '%s' and validator reference '%v'", 400) - MsgDatatypeNotFound = ffm("FF10195", "Datatype '%v' not found", 400) - MsgSchemaLoadFailed = ffm("FF10196", "Datatype '%s' schema invalid", 400) - MsgDataCannotBeValidated = ffm("FF10197", "Data cannot be validated", 400) - MsgJSONDataInvalidPerSchema = ffm("FF10198", "Data does not conform to the JSON schema of datatype '%s': %s", 400) - MsgDataValueIsNull = ffm("FF10199", "Data value is null", 400) - MsgUnknownValidatorType = ffm("FF10200", "Unknown validator type: '%s'", 400) - MsgDataInvalidHash = ffm("FF10201", "Invalid data: hashes do not match Hash=%s Expected=%s", 400) - MsgSystemNSDescription = ffm("FF10202", "FireFly system namespace") - MsgNilID = ffm("FF10203", "ID is nil") - MsgDataReferenceUnresolvable = ffm("FF10204", "Data reference %d cannot be resolved", 400) - MsgDataMissing = ffm("FF10205", "Data entry %d has neither 'id' to refer to existing data, or 'value' to include in-line JSON data", 400) - MsgAuthorInvalid = ffm("FF10206", "Invalid author specified", 400) - MsgNoTransaction = ffm("FF10207", "Message does not have a transaction", 404) - MsgBatchNotSet = ffm("FF10208", "Message does not have an assigned batch", 404) - MsgBatchNotFound = ffm("FF10209", "Batch '%s' not found for message", 500) - MsgBatchTXNotSet = ffm("FF10210", "Batch '%s' does not have an assigned transaction", 404) - MsgOwnerMissing = ffm("FF10211", "Owner missing", 400) - MsgUnknownIdentityPlugin = ffm("FF10212", "Unknown Identity plugin '%s'") - MsgUnknownDataExchangePlugin = ffm("FF10213", "Unknown Data Exchange plugin '%s'") - MsgParentIdentityNotFound = ffm("FF10214", "Organization with identity '%s' not found in identity chain for %s '%s'") - MsgInvalidSigningIdentity = ffm("FF10215", "Invalid signing identity") - MsgNodeAndOrgIDMustBeSet = ffm("FF10216", "node.name, org.name and org.key must be configured first", 409) - MsgBlobStreamingFailed = ffm("FF10217", "Blob streaming terminated with error", 500) - MsgMultiPartFormReadError = ffm("FF10218", "Error reading multi-part form input", 400) - MsgGroupMustHaveMembers = ffm("FF10219", "Group must have at least one member", 400) - MsgEmptyMemberIdentity = ffm("FF10220", "Identity is blank in member %d") - MsgEmptyMemberNode = ffm("FF10221", "Node is blank in member %d") - MsgDuplicateMember = ffm("FF10222", "Member %d is a duplicate org+node combination") - MsgOrgNotFound = ffm("FF10223", "Org with name or identity '%s' not found", 400) - MsgNodeNotFound = ffm("FF10224", "Node with name or identity '%s' not found", 400) - MsgLocalNodeResolveFailed = ffm("FF10225", "Unable to find local node to add to group. Check the status API to confirm the node is registered", 500) - MsgGroupNotFound = ffm("FF10226", "Group '%s' not found", 404) - MsgTooManyItems = ffm("FF10227", "Maximum number of %s items is %d (supplied=%d)", 400) - MsgDuplicateArrayEntry = ffm("FF10228", "Duplicate %s at index %d: '%s'", 400) - MsgDXRESTErr = ffm("FF10229", "Error from data exchange: %s") - MsgGroupInvalidHash = ffm("FF10230", "Invalid group: hashes do not match Hash=%s Expected=%s", 400) - MsgInvalidHex = ffm("FF10231", "Invalid hex supplied", 400) - MsgInvalidWrongLenB32 = ffm("FF10232", "Byte length must be 32 (64 hex characters)", 400) - MsgNodeNotFoundInOrg = ffm("FF10233", "Unable to find any nodes owned by org '%s', or parent orgs", 400) - MsgFilterAscendingDesc = ffm("FF10234", "Ascending sort order (overrides all fields in a multi-field sort)") - MsgPreInitCheckFailed = ffm("FF10235", "Pre-initialization has not yet been completed. Add config records with the admin API complete initialization and reset the node") - MsgFieldsAfterFile = ffm("FF10236", "Additional form field sent after file in multi-part form (ignored): '%s'", 400) - MsgDXBadResponse = ffm("FF10237", "Unexpected '%s' in data exchange response: %s") - MsgDXBadHash = ffm("FF10238", "Unexpected hash returned from data exchange upload. Hash=%s Expected=%s") - MsgBlobNotFound = ffm("FF10239", "No blob has been uploaded or confirmed received, with hash=%s", 404) - MsgDownloadBlobFailed = ffm("FF10240", "Error download blob with reference '%s' from local data exchange") - MsgDataDoesNotHaveBlob = ffm("FF10241", "Data does not have a blob attachment", 404) - MsgWebhookURLEmpty = ffm("FF10242", "Webhook subscription option 'url' cannot be empty", 400) - MsgWebhookInvalidStringMap = ffm("FF10243", "Webhook subscription option '%s' must be map of string values. %s=%T", 400) - MsgWebsocketsNoData = ffm("FF10244", "Websockets subscriptions do not support streaming the full data payload, just the references (withData must be false)", 400) - MsgWebhooksWithData = ffm("FF10245", "Webhook subscriptions require the full data payload (withData must be true)", 400) - MsgWebhooksOptURL = ffm("FF10246", "Webhook url to invoke. Can be relative if a base URL is set in the webhook plugin config") - MsgWebhooksOptMethod = ffm("FF10247", "Webhook method to invoke. Default=POST") - MsgWebhooksOptJSON = ffm("FF10248", "Whether to assume the response body is JSON, regardless of the returned Content-Type") - MsgWebhooksOptReply = ffm("FF10249", "Whether to automatically send a reply event, using the body returned by the webhook") - MsgWebhooksOptHeaders = ffm("FF10250", "Static headers to set on the webhook request") - MsgWebhooksOptQuery = ffm("FF10251", "Static query params to set on the webhook request") - MsgWebhooksOptInput = ffm("FF10252", "A set of options to extract data from the first JSON input data in the incoming message. Only applies if withData=true") - MsgWebhooksOptInputQuery = ffm("FF10253", "A top-level property of the first data input, to use for query parameters") - MsgWebhooksOptInputHeaders = ffm("FF10254", "A top-level property of the first data input, to use for headers") - MsgWebhooksOptInputBody = ffm("FF10255", "A top-level property of the first data input, to use for the request body. Default is the whole first body") - MsgWebhooksOptFastAck = ffm("FF10256", "When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations") - MsgWebhooksReplyBadJSON = ffm("FF10257", "Failed to process reply from webhook as JSON") - MsgWebhooksOptReplyTag = ffm("FF10258", "The tag to set on the reply message") - MsgWebhooksOptReplyTx = ffm("FF10259", "The transaction type to set on the reply message") - MsgRequestTimeout = ffm("FF10260", "The request with id '%s' timed out after %.2fms", 408) - MsgRequestReplyTagRequired = ffm("FF10261", "For request messages 'header.tag' must be set on the request message to route it to a suitable responder", 400) - MsgRequestCannotHaveCID = ffm("FF10262", "For request messages 'header.cid' must be unset", 400) - MsgRequestTimeoutDesc = ffm("FF10263", "Server-side request timeout (millseconds, or set a custom suffix like 10s)") - MsgWebhooksOptInputPath = ffm("FF10264", "A top-level property of the first data input, to use for a path to append with escaping to the webhook path") - MsgWebhooksOptInputReplyTx = ffm("FF10265", "A top-level property of the first data input, to use to dynamically set whether to pin the response (so the requester can choose)") - MsgSystemTransportInternal = ffm("FF10266", "You cannot create subscriptions on the system events transport") - MsgFilterCountNotSupported = ffm("FF10267", "This query does not support generating a count of all results") - MsgFilterCountDesc = ffm("FF10268", "Return a total count as well as items (adds extra database processing)") - MsgRejected = ffm("FF10269", "Message with ID '%s' was rejected. Please check the FireFly logs for more information") - MsgConfirmQueryParam = ffm("FF10270", "When true the HTTP request blocks until the message is confirmed") - MsgRequestMustBePrivate = ffm("FF10271", "For request messages you must specify a group of private recipients", 400) - MsgUnknownTokensPlugin = ffm("FF10272", "Unknown tokens plugin '%s'", 400) - MsgMissingTokensPluginConfig = ffm("FF10273", "Invalid tokens configuration - name and connector are required", 400) - MsgTokensRESTErr = ffm("FF10274", "Error from tokens service: %s") - MsgTokenPoolDuplicate = ffm("FF10275", "Duplicate token pool") - MsgTokenPoolRejected = ffm("FF10276", "Token pool with ID '%s' was rejected. Please check the FireFly logs for more information") - MsgAuthorNotFoundByDID = ffm("FF10277", "Author could not be resolved via DID '%s'") - MsgAuthorOrgNotFoundByName = ffm("FF10278", "Author organization could not be resolved via name '%s'") - MsgAuthorOrgSigningKeyMismatch = ffm("FF10279", "Author organization '%s' is not associated with signing key '%s'") - MsgCannotTransferToSelf = ffm("FF10280", "From and to addresses must be different", 400) - MsgLocalOrgLookupFailed = ffm("FF10281", "Unable resolve the local org by the configured signing key on the node. Please confirm the org is registered with key '%s'", 500) - MsgBigIntTooLarge = ffm("FF10282", "Byte length of serialized integer is too large %d (max=%d)") - MsgBigIntParseFailed = ffm("FF10283", "Failed to parse JSON value '%s' into BigInt") - MsgFabconnectRESTErr = ffm("FF10284", "Error from fabconnect: %s") - MsgInvalidIdentity = ffm("FF10285", "Supplied Fabric signer identity is invalid", 400) - MsgFailedToDecodeCertificate = ffm("FF10286", "Failed to decode certificate: %s", 500) - MsgInvalidMessageType = ffm("FF10287", "Invalid message type - allowed types are %s", 400) - MsgNoUUID = ffm("FF10288", "Field '%s' must not be a UUID", 400) - MsgFetchDataDesc = ffm("FF10289", "Fetch the data and include it in the messages returned", 400) - MsgWSClosed = ffm("FF10290", "Websocket closed") - MsgTokenTransferFailed = ffm("FF10291", "Token transfer with ID '%s' failed. Please check the FireFly logs for more information") - MsgFieldNotSpecified = ffm("FF10292", "Field '%s' must be specified", 400) - MsgTokenPoolNotConfirmed = ffm("FF10293", "Token pool is not yet confirmed") - MsgHistogramStartTimeParam = ffm("FF10294", "Start time of the data to be fetched") - MsgHistogramEndTimeParam = ffm("FF10295", "End time of the data to be fetched") - MsgHistogramBucketsParam = ffm("FF10296", "Number of buckets between start time and end time") - MsgHistogramCollectionParam = ffm("FF10297", "Collection to fetch") - MsgInvalidNumberOfIntervals = ffm("FF10298", "Number of time intervals must be between %d and %d", 400) - MsgInvalidChartNumberParam = ffm("FF10299", "Invalid %s. Must be a number.", 400) - MsgHistogramInvalidTimes = ffm("FF10300", "Start time must be before end time", 400) - MsgUnsupportedCollection = ffm("FF10301", "%s collection is not supported", 400) + MsgConfigFailed = ffm("FF10101", "Failed to read config") + MsgTBD = ffm("FF10102", "TODO: Description") + MsgJSONDecodeFailed = ffm("FF10103", "Failed to decode input JSON") + MsgAPIServerStartFailed = ffm("FF10104", "Unable to start listener on %s: %s") + MsgTLSConfigFailed = ffm("FF10105", "Failed to initialize TLS configuration") + MsgInvalidCAFile = ffm("FF10106", "Invalid CA certificates file") + MsgResponseMarshalError = ffm("FF10107", "Failed to serialize response data", 400) + MsgWebsocketClientError = ffm("FF10108", "Error received from WebSocket client: %s") + Msg404NotFound = ffm("FF10109", "Not found", 404) + MsgUnknownBlockchainPlugin = ffm("FF10110", "Unknown blockchain plugin: %s") + MsgEthconnectRESTErr = ffm("FF10111", "Error from ethconnect: %s") + MsgDBInitFailed = ffm("FF10112", "Database initialization failed") + MsgDBQueryBuildFailed = ffm("FF10113", "Database query builder failed") + MsgDBBeginFailed = ffm("FF10114", "Database begin transaction failed") + MsgDBQueryFailed = ffm("FF10115", "Database query failed") + MsgDBInsertFailed = ffm("FF10116", "Database insert failed") + MsgDBUpdateFailed = ffm("FF10117", "Database update failed") + MsgDBDeleteFailed = ffm("FF10118", "Database delete failed") + MsgDBCommitFailed = ffm("FF10119", "Database commit failed") + MsgDBMissingJoin = ffm("FF10120", "Database missing expected join entry in table '%s' for id '%s'") + MsgDBReadErr = ffm("FF10121", "Database resultset read error from table '%s'") + MsgUnknownDatabasePlugin = ffm("FF10122", "Unknown database plugin '%s'") + MsgNullDataReferenceID = ffm("FF10123", "Data id is null in message data reference %d") + MsgDupDataReferenceID = ffm("FF10124", "Duplicate data ID in message '%s'") + MsgScanFailed = ffm("FF10125", "Failed to restore type '%T' into '%T'") + MsgUnregisteredBatchType = ffm("FF10126", "Unregistered batch type '%s'") + MsgBatchDispatchTimeout = ffm("FF10127", "Timed out dispatching work to batch") + MsgInitializationNilDepError = ffm("FF10128", "Initialization error due to unmet dependency") + MsgNilResponseNon204 = ffm("FF10129", "No output from API call") + MsgInvalidContentType = ffm("FF10130", "Invalid content type", 415) + MsgInvalidName = ffm("FF10131", "Field '%s' must be 1-64 characters, including alphanumerics (a-zA-Z0-9), dot (.), dash (-) and underscore (_), and must start/end in an alphanumeric", 400) + MsgUnknownFieldValue = ffm("FF10132", "Unknown %s '%v'", 400) + MsgDataNotFound = ffm("FF10133", "Data not found for message %s", 400) + MsgUnknownPublicStoragePlugin = ffm("FF10134", "Unknown Public Storage plugin '%s'") + MsgIPFSHashDecodeFailed = ffm("FF10135", "Failed to decode IPFS hash into 32byte value '%s'") + MsgIPFSRESTErr = ffm("FF10136", "Error from IPFS: %s") + MsgSerializationFailed = ffm("FF10137", "Serialization failed") + MsgMissingPluginConfig = ffm("FF10138", "Missing configuration '%s' for %s") + MsgMissingDataHashIndex = ffm("FF10139", "Missing data hash for index '%d' in message", 400) + MsgMissingRequiredField = ffm("FF10140", "Field '%s' is required", 400) + MsgInvalidEthAddress = ffm("FF10141", "Supplied ethereum address is invalid", 400) + MsgInvalidUUID = ffm("FF10142", "Invalid UUID supplied", 400) + Msg404NoResult = ffm("FF10143", "No result found", 404) + MsgNilDataReferenceSealFail = ffm("FF10144", "Invalid message: nil data reference at index %d", 400) + MsgDupDataReferenceSealFail = ffm("FF10145", "Invalid message: duplicate data reference at index %d", 400) + MsgVerifyFailedInvalidHashes = ffm("FF10146", "Invalid message: hashes do not match Hash=%s Expected=%s DataHash=%s DataHashExpected=%s", 400) + MsgVerifyFailedNilHashes = ffm("FF10147", "Invalid message: nil hashes", 400) + MsgInvalidFilterField = ffm("FF10148", "Unknown filter '%s'", 400) + MsgInvalidValueForFilterField = ffm("FF10149", "Unable to parse value for filter '%s'", 400) + MsgUnsupportedSQLOpInFilter = ffm("FF10150", "No SQL mapping implemented for filter operator '%s'", 400) + MsgJSONObjectParseFailed = ffm("FF10151", "Failed to parse '%s' as JSON") + MsgFilterParamDesc = ffm("FF10152", "Data filter field. Prefixes supported: > >= < <= @ ^ ! !@ !^") + MsgSuccessResponse = ffm("FF10153", "Success") + MsgFilterSortDesc = ffm("FF10154", "Sort field. For multi-field sort use comma separated values (or multiple query values) with '-' prefix for descending") + MsgFilterDescendingDesc = ffm("FF10155", "Descending sort order (overrides all fields in a multi-field sort)") + MsgFilterSkipDesc = ffm("FF10156", "The number of records to skip (max: %d). Unsuitable for bulk operations") + MsgFilterLimitDesc = ffm("FF10157", "The maximum number of records to return (max: %d)") + MsgContextCanceled = ffm("FF10158", "Context cancelled") + MsgWSSendTimedOut = ffm("FF10159", "Websocket send timed out") + MsgWSClosing = ffm("FF10160", "Websocket closing") + MsgWSConnectFailed = ffm("FF10161", "Websocket connect failed") + MsgInvalidURL = ffm("FF10162", "Invalid URL: '%s'") + MsgDBMigrationFailed = ffm("FF10163", "Database migration failed") + MsgHashMismatch = ffm("FF10164", "Hash mismatch") + MsgTimeParseFail = ffm("FF10165", "Cannot parse time as RFC3339, Unix, or UnixNano: '%s'", 400) + MsgDefaultNamespaceNotFound = ffm("FF10166", "namespaces.default '%s' must be included in the namespaces.predefined configuration") + MsgDurationParseFail = ffm("FF10167", "Unable to parse '%s' as duration string, or millisecond number", 400) + MsgEventTypesParseFail = ffm("FF10168", "Unable to parse list of event types", 400) + MsgUnknownEventType = ffm("FF10169", "Unknown event type '%s'", 400) + MsgIDMismatch = ffm("FF10170", "ID mismatch") + MsgRegexpCompileFailed = ffm("FF10171", "Unable to compile '%s' regexp '%s'") + MsgUnknownEventTransportPlugin = ffm("FF10172", "Unknown event transport plugin: %s") + MsgWSConnectionNotActive = ffm("FF10173", "Websocket connection '%s' no longer active") + MsgWSSubAlreadyInFlight = ffm("FF10174", "Websocket subscription '%s' already has a message in flight") + MsgWSMsgSubNotMatched = ffm("FF10175", "Acknowledgment does not match an inflight event + subscription") + MsgWSClientSentInvalidData = ffm("FF10176", "Invalid data") + MsgWSClientUnknownAction = ffm("FF10177", "Unknown action '%s'") + MsgWSInvalidStartAction = ffm("FF10178", "A start action must set namespace and either a name or ephemeral=true") + MsgWSAutoAckChanged = ffm("FF10179", "The autoack option must be set consistently on all start requests") + MsgWSAutoAckEnabled = ffm("FF10180", "The autoack option is enabled on this connection") + MsgConnSubscriptionNotStarted = ffm("FF10181", "Subscription %v is not started on connection") + MsgDispatcherClosing = ffm("FF10182", "Event dispatcher closing") + MsgMaxFilterSkip = ffm("FF10183", "You have reached the maximum pagination limit for this query (%d)") + MsgMaxFilterLimit = ffm("FF10184", "Your query exceeds the maximum filter limit (%d)") + MsgAPIServerStaticFail = ffm("FF10185", "An error occurred loading static content", 500) + MsgEventListenerClosing = ffm("FF10186", "Event listener closing") + MsgNamespaceNotExist = ffm("FF10187", "Namespace does not exist") + MsgFieldTooLong = ffm("FF10188", "Field '%s' maximum length is %d", 400) + MsgInvalidSubscription = ffm("FF10189", "Invalid subscription", 400) + MsgMismatchedTransport = ffm("FF10190", "Connection ID '%s' appears not to be unique between transport '%s' and '%s'", 400) + MsgInvalidFirstEvent = ffm("FF10191", "Invalid firstEvent definition - must be 'newest','oldest' or a sequence number", 400) + MsgNumberMustBeGreaterEqual = ffm("FF10192", "Number must be greater than or equal to %d", 400) + MsgAlreadyExists = ffm("FF10193", "A %s with name '%s:%s' already exists", 409) + MsgJSONValidatorBadRef = ffm("FF10194", "Cannot use JSON validator for data with type '%s' and validator reference '%v'", 400) + MsgDatatypeNotFound = ffm("FF10195", "Datatype '%v' not found", 400) + MsgSchemaLoadFailed = ffm("FF10196", "Datatype '%s' schema invalid", 400) + MsgDataCannotBeValidated = ffm("FF10197", "Data cannot be validated", 400) + MsgJSONDataInvalidPerSchema = ffm("FF10198", "Data does not conform to the JSON schema of datatype '%s': %s", 400) + MsgDataValueIsNull = ffm("FF10199", "Data value is null", 400) + MsgUnknownValidatorType = ffm("FF10200", "Unknown validator type: '%s'", 400) + MsgDataInvalidHash = ffm("FF10201", "Invalid data: hashes do not match Hash=%s Expected=%s", 400) + MsgSystemNSDescription = ffm("FF10202", "FireFly system namespace") + MsgNilID = ffm("FF10203", "ID is nil") + MsgDataReferenceUnresolvable = ffm("FF10204", "Data reference %d cannot be resolved", 400) + MsgDataMissing = ffm("FF10205", "Data entry %d has neither 'id' to refer to existing data, or 'value' to include in-line JSON data", 400) + MsgAuthorInvalid = ffm("FF10206", "Invalid author specified", 400) + MsgNoTransaction = ffm("FF10207", "Message does not have a transaction", 404) + MsgBatchNotSet = ffm("FF10208", "Message does not have an assigned batch", 404) + MsgBatchNotFound = ffm("FF10209", "Batch '%s' not found for message", 500) + MsgBatchTXNotSet = ffm("FF10210", "Batch '%s' does not have an assigned transaction", 404) + MsgOwnerMissing = ffm("FF10211", "Owner missing", 400) + MsgUnknownIdentityPlugin = ffm("FF10212", "Unknown Identity plugin '%s'") + MsgUnknownDataExchangePlugin = ffm("FF10213", "Unknown Data Exchange plugin '%s'") + MsgParentIdentityNotFound = ffm("FF10214", "Organization with identity '%s' not found in identity chain for %s '%s'") + MsgInvalidSigningIdentity = ffm("FF10215", "Invalid signing identity") + MsgNodeAndOrgIDMustBeSet = ffm("FF10216", "node.name, org.name and org.identity must be configured first", 409) + MsgBlobStreamingFailed = ffm("FF10217", "Blob streaming terminated with error", 500) + MsgMultiPartFormReadError = ffm("FF10218", "Error reading multi-part form input", 400) + MsgGroupMustHaveMembers = ffm("FF10219", "Group must have at least one member", 400) + MsgEmptyMemberIdentity = ffm("FF10220", "Identity is blank in member %d") + MsgEmptyMemberNode = ffm("FF10221", "Node is blank in member %d") + MsgDuplicateMember = ffm("FF10222", "Member %d is a duplicate org+node combination") + MsgOrgNotFound = ffm("FF10223", "Org with name or identity '%s' not found", 400) + MsgNodeNotFound = ffm("FF10224", "Node with name or identity '%s' not found", 400) + MsgLocalNodeResolveFailed = ffm("FF10225", "Unable to find local node to add to group. Check the status API to confirm the node is registered", 500) + MsgGroupNotFound = ffm("FF10226", "Group '%s' not found", 404) + MsgTooManyItems = ffm("FF10227", "Maximum number of %s items is %d (supplied=%d)", 400) + MsgDuplicateArrayEntry = ffm("FF10228", "Duplicate %s at index %d: '%s'", 400) + MsgDXRESTErr = ffm("FF10229", "Error from data exchange: %s") + MsgGroupInvalidHash = ffm("FF10230", "Invalid group: hashes do not match Hash=%s Expected=%s", 400) + MsgInvalidHex = ffm("FF10231", "Invalid hex supplied", 400) + MsgInvalidWrongLenB32 = ffm("FF10232", "Byte length must be 32 (64 hex characters)", 400) + MsgNodeNotFoundInOrg = ffm("FF10233", "Unable to find any nodes owned by org '%s', or parent orgs", 400) + MsgFilterAscendingDesc = ffm("FF10234", "Ascending sort order (overrides all fields in a multi-field sort)") + MsgPreInitCheckFailed = ffm("FF10235", "Pre-initialization has not yet been completed. Add config records with the admin API complete initialization and reset the node") + MsgFieldsAfterFile = ffm("FF10236", "Additional form field sent after file in multi-part form (ignored): '%s'", 400) + MsgDXBadResponse = ffm("FF10237", "Unexpected '%s' in data exchange response: %s") + MsgDXBadHash = ffm("FF10238", "Unexpected hash returned from data exchange upload. Hash=%s Expected=%s") + MsgBlobNotFound = ffm("FF10239", "No blob has been uploaded or confirmed received, with hash=%s", 404) + MsgDownloadBlobFailed = ffm("FF10240", "Error download blob with reference '%s' from local data exchange") + MsgDataDoesNotHaveBlob = ffm("FF10241", "Data does not have a blob attachment", 404) + MsgWebhookURLEmpty = ffm("FF10242", "Webhook subscription option 'url' cannot be empty", 400) + MsgWebhookInvalidStringMap = ffm("FF10243", "Webhook subscription option '%s' must be map of string values. %s=%T", 400) + MsgWebsocketsNoData = ffm("FF10244", "Websockets subscriptions do not support streaming the full data payload, just the references (withData must be false)", 400) + MsgWebhooksWithData = ffm("FF10245", "Webhook subscriptions require the full data payload (withData must be true)", 400) + MsgWebhooksOptURL = ffm("FF10246", "Webhook url to invoke. Can be relative if a base URL is set in the webhook plugin config") + MsgWebhooksOptMethod = ffm("FF10247", "Webhook method to invoke. Default=POST") + MsgWebhooksOptJSON = ffm("FF10248", "Whether to assume the response body is JSON, regardless of the returned Content-Type") + MsgWebhooksOptReply = ffm("FF10249", "Whether to automatically send a reply event, using the body returned by the webhook") + MsgWebhooksOptHeaders = ffm("FF10250", "Static headers to set on the webhook request") + MsgWebhooksOptQuery = ffm("FF10251", "Static query params to set on the webhook request") + MsgWebhooksOptInput = ffm("FF10252", "A set of options to extract data from the first JSON input data in the incoming message. Only applies if withData=true") + MsgWebhooksOptInputQuery = ffm("FF10253", "A top-level property of the first data input, to use for query parameters") + MsgWebhooksOptInputHeaders = ffm("FF10254", "A top-level property of the first data input, to use for headers") + MsgWebhooksOptInputBody = ffm("FF10255", "A top-level property of the first data input, to use for the request body. Default is the whole first body") + MsgWebhooksOptFastAck = ffm("FF10256", "When true the event will be acknowledged before the webhook is invoked, allowing parallel invocations") + MsgWebhooksReplyBadJSON = ffm("FF10257", "Failed to process reply from webhook as JSON") + MsgWebhooksOptReplyTag = ffm("FF10258", "The tag to set on the reply message") + MsgWebhooksOptReplyTx = ffm("FF10259", "The transaction type to set on the reply message") + MsgRequestTimeout = ffm("FF10260", "The request with id '%s' timed out after %.2fms", 408) + MsgRequestReplyTagRequired = ffm("FF10261", "For request messages 'header.tag' must be set on the request message to route it to a suitable responder", 400) + MsgRequestCannotHaveCID = ffm("FF10262", "For request messages 'header.cid' must be unset", 400) + MsgRequestTimeoutDesc = ffm("FF10263", "Server-side request timeout (millseconds, or set a custom suffix like 10s)") + MsgWebhooksOptInputPath = ffm("FF10264", "A top-level property of the first data input, to use for a path to append with escaping to the webhook path") + MsgWebhooksOptInputReplyTx = ffm("FF10265", "A top-level property of the first data input, to use to dynamically set whether to pin the response (so the requester can choose)") + MsgSystemTransportInternal = ffm("FF10266", "You cannot create subscriptions on the system events transport") + MsgFilterCountNotSupported = ffm("FF10267", "This query does not support generating a count of all results") + MsgFilterCountDesc = ffm("FF10268", "Return a total count as well as items (adds extra database processing)") + MsgRejected = ffm("FF10269", "Message with ID '%s' was rejected. Please check the FireFly logs for more information") + MsgConfirmQueryParam = ffm("FF10270", "When true the HTTP request blocks until the message is confirmed") + MsgRequestMustBePrivate = ffm("FF10271", "For request messages you must specify a group of private recipients", 400) + MsgUnknownTokensPlugin = ffm("FF10272", "Unknown tokens plugin '%s'", 400) + MsgMissingTokensPluginConfig = ffm("FF10273", "Invalid tokens configuration - name and connector are required", 400) + MsgTokensRESTErr = ffm("FF10274", "Error from tokens service: %s") + MsgTokenPoolDuplicate = ffm("FF10275", "Duplicate token pool") + MsgTokenPoolRejected = ffm("FF10276", "Token pool with ID '%s' was rejected. Please check the FireFly logs for more information") + MsgAuthorNotFoundByDID = ffm("FF10277", "Author could not be resolved via DID '%s'") + MsgAuthorOrgNotFoundByName = ffm("FF10278", "Author organization could not be resolved via name '%s'") + MsgAuthorOrgSigningKeyMismatch = ffm("FF10279", "Author organization '%s' is not associated with signing key '%s'") + MsgCannotTransferToSelf = ffm("FF10280", "From and to addresses must be different", 400) + MsgLocalOrgLookupFailed = ffm("FF10281", "Unable resolve the local org by the configured signing key on the node. Please confirm the org is registered with key '%s'", 500) + MsgBigIntTooLarge = ffm("FF10282", "Byte length of serialized integer is too large %d (max=%d)") + MsgBigIntParseFailed = ffm("FF10283", "Failed to parse JSON value '%s' into BigInt") + MsgFabconnectRESTErr = ffm("FF10284", "Error from fabconnect: %s") + MsgInvalidIdentity = ffm("FF10285", "Supplied Fabric signer identity is invalid", 400) + MsgFailedToDecodeCertificate = ffm("FF10286", "Failed to decode certificate: %s", 500) + MsgInvalidMessageType = ffm("FF10287", "Invalid message type - allowed types are %s", 400) + MsgNoUUID = ffm("FF10288", "Field '%s' must not be a UUID", 400) + MsgFetchDataDesc = ffm("FF10289", "Fetch the data and include it in the messages returned", 400) + MsgWSClosed = ffm("FF10290", "Websocket closed") + MsgTokenTransferFailed = ffm("FF10291", "Token transfer with ID '%s' failed. Please check the FireFly logs for more information") + MsgFieldNotSpecified = ffm("FF10292", "Field '%s' must be specified", 400) + MsgTokenPoolNotConfirmed = ffm("FF10293", "Token pool is not yet confirmed") + MsgHistogramStartTimeParam = ffm("FF10294", "Start time of the data to be fetched") + MsgHistogramEndTimeParam = ffm("FF10295", "End time of the data to be fetched") + MsgHistogramBucketsParam = ffm("FF10296", "Number of buckets between start time and end time") + MsgHistogramCollectionParam = ffm("FF10297", "Collection to fetch") + MsgInvalidNumberOfIntervals = ffm("FF10298", "Number of time intervals must be between %d and %d", 400) + MsgInvalidChartNumberParam = ffm("FF10299", "Invalid %s. Must be a number.", 400) + MsgHistogramInvalidTimes = ffm("FF10300", "Start time must be before end time", 400) + MsgUnsupportedCollection = ffm("FF10301", "%s collection is not supported", 400) + MsgContractInterfaceExists = ffm("FF10302", "A contract interface already exists in the namespace: '%s' with name: '%s' and version: '%s'", 409) + MsgContractInterfaceNotFound = ffm("FF10303", "Contract interface %s not found", 404) + MsgContractMissingInputArgument = ffm("FF10304", "Missing required input argument '%s'", 400) + MsgContractWrongInputType = ffm("FF10305", "Input '%v' is of type '%v' not expected type of '%v'", 400) + MsgContractMissingInputField = ffm("FF10306", "Expected object of type '%v' to contain field named '%v' but it was missing", 400) + MsgContractMapInputType = ffm("FF10307", "Unable to map input type '%v' to known FireFly type - was expecting '%v'", 400) + MsgContractByteDecode = ffm("FF10308", "Unable to decode field '%v' as bytes", 400) + MsgContractInternalType = ffm("FF10309", "Input '%v' of type '%v' is not compatible blockchain internalType of '%v'", 400) + MsgContractLocationInvalid = ffm("FF10310", "Failed to validate contract location: %v", 400) + MsgContractParamInvalid = ffm("FF10311", "Failed to validate contract param: %v", 400) + MsgContractSubscriptionExists = ffm("FF10312", "A contract subscription already exists in the namespace: '%s' with name: '%s'", 409) + MsgContractMethodNotSet = ffm("FF10313", "Method not specified on invoke contract request", 400) + MsgContractNoMethodSignature = ffm("FF10314", "Method signature is required if interfaceID is absent", 400) + MsgContractMethodResolveError = ffm("FF10315", "Unable to resolve contract method: %s", 400) + MsgContractLocationExists = ffm("FF10316", "The contract location cannot be changed after it is created", 400) + MsgSubscriptionNoEvent = ffm("FF10317", "An eventId or in-line event definition must be supplied when subscribing", 400) + MsgSubscriptionEventNotFound = ffm("FF10318", "No event was found in namespace '%s' with id '%s'", 400) + MsgEventNameMustBeSet = ffm("FF10319", "Event name must be set", 400) + MsgMethodNameMustBeSet = ffm("FF10320", "Method name must be set", 400) + MsgContractEventResolveError = ffm("FF10321", "Unable to resolve contract event", 400) + MsgQueryOpUnsupportedMod = ffm("FF10322", "Operation '%s' on '%s' does not support modifiers", 400) + MsgDXBadSize = ffm("FF10323", "Unexpected size returned from data exchange upload. Size=%d Expected=%d") + MsgBlobMismatchSealingData = ffm("FF10324", "Blob mismatch when sealing data") + MsgFieldTypeNoStringMatching = ffm("FF10325", "Field '%s' of type '%s' does not support partial or case-insensitive string matching", 400) + MsgFieldMatchNoNull = ffm("FF10326", "Comparison operator for field '%s' cannot accept a null value", 400) + MsgTooLargeBroadcast = ffm("FF10327", "Message size %.2fkb is too large for the max broadcast batch size of %.2fkb", 400) + MsgTooLargePrivate = ffm("FF10328", "Message size %.2fkb is too large for the max private message size of %.2fkb", 400) + MsgManifestMismatch = ffm("FF10329", "Manifest mismatch overriding '%s' status as failure: '%s'", 400) + MsgWSHeartbeatTimeout = ffm("FF10330", "Websocket heartbeat timed out after %.2fms", 500) ) diff --git a/pkg/wsclient/wsclient.go b/pkg/wsclient/wsclient.go index 7d30c68be6..677f649997 100644 --- a/pkg/wsclient/wsclient.go +++ b/pkg/wsclient/wsclient.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -23,6 +23,7 @@ import ( "io/ioutil" "net/http" "net/url" + "sync" "time" "github.com/gorilla/websocket" @@ -43,6 +44,7 @@ type WSConfig struct { AuthUsername string `json:"authUsername,omitempty"` AuthPassword string `json:"authPassword,omitempty"` HTTPHeaders fftypes.JSONObject `json:"headers,omitempty"` + HeartbeatInterval time.Duration `json:"heartbeatInterval,omitempty"` } type WSClient interface { @@ -68,6 +70,10 @@ type wsClient struct { sendDone chan []byte closing chan struct{} afterConnect WSPostConnectHandler + heartbeatInterval time.Duration + heartbeatMux sync.Mutex + activePingSent *time.Time + lastPingCompleted time.Time } // WSPostConnectHandler will be called after every connect/reconnect. Can send data over ws, but must not block listening for data on the ws. @@ -97,6 +103,7 @@ func New(ctx context.Context, config *WSConfig, afterConnect WSPostConnectHandle send: make(chan []byte), closing: make(chan struct{}), afterConnect: afterConnect, + heartbeatInterval: config.HeartbeatInterval, } for k, v := range config.HTTPHeaders { if vs, ok := v.(string); ok { @@ -159,6 +166,21 @@ func (w *wsClient) Send(ctx context.Context, message []byte) error { } } +func (w *wsClient) heartbeatTimeout(ctx context.Context) (context.Context, context.CancelFunc) { + if w.heartbeatInterval > 0 { + w.heartbeatMux.Lock() + baseTime := w.lastPingCompleted + if w.activePingSent != nil { + // We're waiting for a pong + baseTime = *w.activePingSent + } + waitTime := w.heartbeatInterval - time.Since(baseTime) // if negative, will pop immediately + w.heartbeatMux.Unlock() + return context.WithTimeout(ctx, waitTime) + } + return context.WithCancel(ctx) +} + func buildWSUrl(ctx context.Context, config *WSConfig) (string, error) { u, err := url.Parse(config.HTTPURL) if err != nil { @@ -195,6 +217,8 @@ func (w *wsClient) connect(initial bool) error { l.Warnf("WS %s connect attempt %d failed [%d]: %s", w.url, attempt, status, string(b)) return !initial || attempt > w.initialRetryAttempts, i18n.WrapError(w.ctx, err, i18n.MsgWSConnectFailed) } + w.pongReceivedOrReset(false) + w.wsconn.SetPongHandler(w.pongHandler) l.Infof("WS %s connected", w.url) return false, nil }) @@ -226,22 +250,64 @@ func (w *wsClient) readLoop() { } } +func (w *wsClient) pongHandler(appData string) error { + w.pongReceivedOrReset(true) + return nil +} + +func (w *wsClient) pongReceivedOrReset(isPong bool) { + w.heartbeatMux.Lock() + defer w.heartbeatMux.Unlock() + + if isPong && w.activePingSent != nil { + log.L(w.ctx).Debugf("WS %s heartbeat completed (pong) after %.2fms", w.url, float64(time.Since(*w.activePingSent))/float64(time.Millisecond)) + } + w.lastPingCompleted = time.Now() // in new connection case we still want to consider now the time we completed the ping + w.activePingSent = nil +} + +func (w *wsClient) heartbeatCheck() error { + w.heartbeatMux.Lock() + defer w.heartbeatMux.Unlock() + + if w.activePingSent != nil { + return i18n.NewError(w.ctx, i18n.MsgWSHeartbeatTimeout, float64(time.Since(*w.activePingSent))/float64(time.Millisecond)) + } + log.L(w.ctx).Debugf("WS %s heartbeat timer popped (ping) after %.2fms", w.url, float64(time.Since(w.lastPingCompleted))/float64(time.Millisecond)) + now := time.Now() + w.activePingSent = &now + return nil +} + func (w *wsClient) sendLoop(receiverDone chan struct{}) { l := log.L(w.ctx) defer close(w.sendDone) - for { + disconnecting := false + for !disconnecting { + timeoutContext, timeoutCancel := w.heartbeatTimeout(w.ctx) + select { case message := <-w.send: l.Tracef("WS sending: %s", message) if err := w.wsconn.WriteMessage(websocket.TextMessage, message); err != nil { l.Errorf("WS %s send failed: %s", w.url, err) - return + disconnecting = true + } + case <-timeoutContext.Done(): + if err := w.heartbeatCheck(); err != nil { + l.Errorf("WS %s closing: %s", w.url, err) + disconnecting = true + } else if err := w.wsconn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + l.Errorf("WS %s heartbeat send failed: %s", w.url, err) + disconnecting = true } case <-receiverDone: l.Debugf("WS %s send loop exiting", w.url) - return + disconnecting = true } + + timeoutCancel() } } diff --git a/pkg/wsclient/wsclient_test.go b/pkg/wsclient/wsclient_test.go index 1fe45688a9..84eff63684 100644 --- a/pkg/wsclient/wsclient_test.go +++ b/pkg/wsclient/wsclient_test.go @@ -22,6 +22,7 @@ import ( "net/http" "net/http/httptest" "testing" + "time" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" @@ -47,13 +48,14 @@ func TestWSClientE2E(t *testing.T) { wsConfig.HTTPURL = url wsConfig.WSKeyPath = "/test" + wsConfig.HeartbeatInterval = 50 * time.Millisecond - wsClient, err := New(context.Background(), wsConfig, afterConnect) + wsc, err := New(context.Background(), wsConfig, afterConnect) assert.NoError(t, err) // Change the settings and connect - wsClient.SetURL(wsClient.URL() + "/updated") - err = wsClient.Connect() + wsc.SetURL(wsc.URL() + "/updated") + err = wsc.Connect() assert.NoError(t, err) // Receive the message automatically sent in afterConnect @@ -62,19 +64,25 @@ func TestWSClientE2E(t *testing.T) { // Tell the unit test server to send us a reply, and confirm it fromServer <- `some data from server` - reply := <-wsClient.Receive() + reply := <-wsc.Receive() assert.Equal(t, `some data from server`, string(reply)) // Send some data back - err = wsClient.Send(context.Background(), []byte(`some data to server`)) + err = wsc.Send(context.Background(), []byte(`some data to server`)) assert.NoError(t, err) // Check the sevrer got it message2 := <-toServer assert.Equal(t, `some data to server`, message2) + // Check heartbeating works + beforePing := time.Now() + for wsc.(*wsClient).lastPingCompleted.Before(beforePing) { + time.Sleep(10 * time.Millisecond) + } + // Close the client - wsClient.Close() + wsc.Close() } @@ -276,3 +284,42 @@ func TestWSSendInstructClose(t *testing.T) { w.sendLoop(receiverClosed) <-w.sendDone } + +func TestHeartbeatTimedout(t *testing.T) { + + now := time.Now() + w := &wsClient{ + ctx: context.Background(), + sendDone: make(chan []byte), + heartbeatInterval: 1 * time.Microsecond, + activePingSent: &now, + } + + w.sendLoop(make(chan struct{})) + +} + +func TestHeartbeatSendFailed(t *testing.T) { + + _, _, url, close := NewTestWSServer(func(req *http.Request) {}) + defer close() + + wsc, err := New(context.Background(), &WSConfig{HTTPURL: url}, func(ctx context.Context, w WSClient) error { return nil }) + assert.NoError(t, err) + defer wsc.Close() + + err = wsc.Connect() + assert.NoError(t, err) + + // Close and use the underlying wsconn to drive a failure to send a heartbeat + wsc.(*wsClient).wsconn.Close() + w := &wsClient{ + ctx: context.Background(), + sendDone: make(chan []byte), + heartbeatInterval: 1 * time.Microsecond, + wsconn: wsc.(*wsClient).wsconn, + } + + w.sendLoop(make(chan struct{})) + +} diff --git a/pkg/wsclient/wstestserver.go b/pkg/wsclient/wstestserver.go index dc0fef7676..a487c03d9f 100644 --- a/pkg/wsclient/wstestserver.go +++ b/pkg/wsclient/wstestserver.go @@ -1,4 +1,4 @@ -// Copyright © 2021 Kaleido, Inc. +// Copyright © 2022 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 //