diff --git a/config/config_defaults.go b/config/config_defaults.go index 2026535c74..21fa63d5a2 100644 --- a/config/config_defaults.go +++ b/config/config_defaults.go @@ -370,6 +370,12 @@ const CGRATES_CFG_JSON = ` "concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited "source_path": "/var/spool/cgrates/ers/in", // read data from this path "processed_path": "/var/spool/cgrates/ers/out", // move processed data here + "tenant": "", // tenant used by import + "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> + "filters": [], // limit parsing based on the filters + "flags": [], // flags to influence the event processing + "reconnects": -1, // number of retries in case of connection lost + "max_reconnect_interval": "5m", // time to wait in between reconnect attempts "opts": { // Partial // "partialPath": "/", // the path were the partial events will be sent @@ -385,27 +391,26 @@ const CGRATES_CFG_JSON = ` // FileXML // "xmlRootPath": "", // path towards one event in case of XML CDRs - + // AMQP and AMQPv1 - // "amqpQueueID": "cgrates_cdrs", // the queue id for AMQP and AMQPv1 readers from were the events are read - // "amqpQueueIDProcessed": "", // the queue id for AMQP and AMQPv1 readers were the events are sent after they are processed - - // "amqpUsername": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, usually represents the policy name - // "amqpPassword": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, populated with one of its policy's keys - - // "amqpUsernameProcessed": "", - // "amqpPasswordProcessed": "", - - // "amqpConsumerTag": "cgrates", // the ID of the consumer, amqp 0.9.1 exclusive - // "amqpExchange": "", // amqp 0.9.1 exclusive - // "amqpExchangeType": "", // amqp 0.9.1 exclusive - // "amqpRoutingKey": "", // amqp 0.9.1 exclusive + // "amqpQueueID": "cgrates_cdrs", // identifier for the primary queue where messages are consumed (0.9.1/1.0) + // "amqpQueueIDProcessed": "", // identifier for the queue where processed events are sent (0.9.1/1.0) + + // "amqpUsername": "", // username for SASL PLAIN auth, exclusive to AMQP 1.0, often representing the policy name + // "amqpPassword": "", // password for authentication, exclusive to AMQP 1.0 - // "amqpExchangeProcessed": "", - // "amqpExchangeTypeProcessed": "", - // "amqpRoutingKeyProcessed": "", + // "amqpUsernameProcessed": "", // username for authentication related to processed messages queue + // "amqpPasswordProcessed": "", // password for authentication related to processed messages queue + // "amqpConsumerTag": "cgrates", // unique tag for the consumer, useful for message tracking and consumer management (0.9.1) + // "amqpExchange": "", // name of the primary exchange where messages will be published (0.9.1) + // "amqpExchangeType": "", // type of the primary exchange (direct, topic, fanout, headers) (0.9.1) + // "amqpRoutingKey": "", // key used for routing messages to the primary queue (0.9.1) + // "amqpExchangeProcessed": "", // name of the exchange where processed messages will be published + // "amqpExchangeTypeProcessed": "", // type of the exchange for processed messages + // "amqpRoutingKeyProcessed": "", // key used for routing processed messages + // Kafka // "kafkaTopic": "cgrates", // the topic from were the events are read // "kafkaGroupID": "cgrates", // the group that reads the events @@ -421,13 +426,13 @@ const CGRATES_CFG_JSON = ` // "sqlDBNameProcessed": "", // the name of the database were the events are sent after they are processed // "sqlTableNameProcessed": "", // the name of the table were the events are sent after they are processed // "pgSSLModeProcessed": "", // the ssl mode for postgres db - + // SQS and S3 // "awsRegion": "", // "awsKey": "", // "awsSecret": "", // "awsToken": "", - + // "awsRegionProcessed": "", // "awsKeyProcessed": "", // "awsSecretProcessed": "", @@ -435,9 +440,9 @@ const CGRATES_CFG_JSON = ` // SQS // "sqsQueueID": "cgrates_cdrs", // the queue id for SQS readers from were the events are read - + // "sqsQueueIDProcessed": "", // the queue id for SQS readers were the events are sent after they are processed - + // S3 // "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from were the events are read // "s3FolderPathProcessed": "", // only for S3 event posting @@ -466,10 +471,6 @@ const CGRATES_CFG_JSON = ` // "natsClientKeyProcessed": "", // the path to a client key( used by tls) // "natsJetStreamMaxWaitProcessed": "5s", // the maximum amount of time to wait for a response }, - "tenant": "", // tenant used by import - "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> - "filters": [], // limit parsing based on the filters - "flags": [], // flags to influence the event processing "fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value {"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true}, {"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true}, @@ -488,8 +489,7 @@ const CGRATES_CFG_JSON = ` }, ], }, - - + "ees": { // EventExporterService "enabled": false, // starts the EventReader service: "attributes_conns":[], // RPC Connections IDs @@ -498,10 +498,18 @@ const CGRATES_CFG_JSON = ` }, "exporters": [ { - "id": "*default", // identifier of the EventReader profile - "type": "*none", // exporter type - "export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed - "concurrent_requests": 0, // maximum simultaneous requests to process, 0 for unlimited + "id": "*default", // identifier of the EventReader profile + "type": "*none", // exporter type + "export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed + "failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests + "concurrent_requests": 0, // maximum simultaneous requests to process, 0 for unlimited + "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> + "filters": [], // limit parsing based on the filters + "flags": [], // flags to influence the event processing + "attribute_ids": [], // select Attribute profiles instead of discovering them + "attribute_context": "", // context used to discover matching Attribute profiles + "synchronous": false, // block processing until export has a result + "attempts": 1, // export attempts "opts": { // CSV @@ -509,7 +517,7 @@ const CGRATES_CFG_JSON = ` // Elasticsearch options - // "elsCloud":true, //ExportPath will be an CLoud ID deployment + // "elsCloud":true, // ExportPath will be an CLoud ID deployment // "elsApiKey": "", // Base64-encoded token for authorization; if set, overrides username/password and service token. // "elsUsername":"", // Username for HTTP Basic Authentication. // "elsPassword":"", // Password for HTTP Basic Authentication. @@ -598,17 +606,9 @@ const CGRATES_CFG_JSON = ` // "rpcReplyTimeout":"2s", // connection down at replies if taking longer that this value // "rpcAPIOpts": {}, }, // extra options for exporter - "timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB> - "filters": [], // limit parsing based on the filters - "flags": [], // flags to influence the event processing <*attributes|*log> - "attribute_ids": [], // select Attribute profiles instead of discovering them - "attribute_context": "", // context used to discover matching Attribute profiles - "synchronous": false, // block processing until export has a result - "attempts": 1, // export attempts - "fields":[], // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value - "failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests - }, - ], + "fields":[] // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value + } + ] }, diff --git a/config/erscfg.go b/config/erscfg.go index aa09d3a31c..285b5fca2a 100644 --- a/config/erscfg.go +++ b/config/erscfg.go @@ -445,20 +445,28 @@ type EventReaderOpts struct { // EventReaderCfg the event for the Event Reader type EventReaderCfg struct { - ID string - Type string - RunDelay time.Duration - ConcurrentReqs int - SourcePath string - ProcessedPath string - Opts *EventReaderOpts - Tenant RSRParsers - Timezone string - Filters []string - Flags utils.FlagsWithParams - Fields []*FCTemplate - PartialCommitFields []*FCTemplate - CacheDumpFields []*FCTemplate + ID string + Type string + + // RunDelay determines how the Serve method initiates the reading process. + // - A value of 0 disables automatic reading, allowing manual control, possibly through an API. + // - A value of -1 enables watching directory changes indefinitely, applicable for file-based readers. + // - Any positive duration sets a fixed time interval for automatic reading cycles. + RunDelay time.Duration + + ConcurrentReqs int + SourcePath string + ProcessedPath string + Tenant RSRParsers + Timezone string + Filters []string + Flags utils.FlagsWithParams + Reconnects int + MaxReconnectInterval time.Duration + Opts *EventReaderOpts + Fields []*FCTemplate + PartialCommitFields []*FCTemplate + CacheDumpFields []*FCTemplate } func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) { @@ -492,7 +500,6 @@ func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err if jsnCfg.PartialOrderField != nil { erOpts.PartialOrderField = jsnCfg.PartialOrderField } - if jsnCfg.XMLRootPath != nil { erOpts.XMLRootPath = jsnCfg.XMLRootPath } @@ -539,6 +546,14 @@ func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplat if jsnCfg.Flags != nil { er.Flags = utils.FlagsWithParamsFromSlice(*jsnCfg.Flags) } + if jsnCfg.Reconnects != nil { + er.Reconnects = *jsnCfg.Reconnects + } + if jsnCfg.Max_reconnect_interval != nil { + if er.MaxReconnectInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Max_reconnect_interval); err != nil { + return err + } + } if jsnCfg.Fields != nil { if er.Fields, err = FCTemplatesFromFCTemplatesJSONCfg(*jsnCfg.Fields, sep); err != nil { return err @@ -885,16 +900,18 @@ func (erOpts *EventReaderOpts) Clone() *EventReaderOpts { // Clone returns a deep copy of EventReaderCfg func (er EventReaderCfg) Clone() (cln *EventReaderCfg) { cln = &EventReaderCfg{ - ID: er.ID, - Type: er.Type, - RunDelay: er.RunDelay, - ConcurrentReqs: er.ConcurrentReqs, - SourcePath: er.SourcePath, - ProcessedPath: er.ProcessedPath, - Tenant: er.Tenant.Clone(), - Timezone: er.Timezone, - Flags: er.Flags.Clone(), - Opts: er.Opts.Clone(), + ID: er.ID, + Type: er.Type, + RunDelay: er.RunDelay, + ConcurrentReqs: er.ConcurrentReqs, + SourcePath: er.SourcePath, + ProcessedPath: er.ProcessedPath, + Tenant: er.Tenant.Clone(), + Timezone: er.Timezone, + Flags: er.Flags.Clone(), + Reconnects: er.Reconnects, + MaxReconnectInterval: er.MaxReconnectInterval, + Opts: er.Opts.Clone(), } if er.Filters != nil { cln.Filters = make([]string, len(er.Filters)) @@ -1135,19 +1152,24 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string } } initialMP = map[string]any{ - utils.IDCfg: er.ID, - utils.TypeCfg: er.Type, - utils.ConcurrentRequestsCfg: er.ConcurrentReqs, - utils.SourcePathCfg: er.SourcePath, - utils.ProcessedPathCfg: er.ProcessedPath, - utils.TenantCfg: er.Tenant.GetRule(separator), - utils.TimezoneCfg: er.Timezone, - utils.FiltersCfg: er.Filters, - utils.FlagsCfg: []string{}, - utils.RunDelayCfg: "0", - utils.OptsCfg: opts, + utils.IDCfg: er.ID, + utils.TypeCfg: er.Type, + utils.ConcurrentRequestsCfg: er.ConcurrentReqs, + utils.SourcePathCfg: er.SourcePath, + utils.ProcessedPathCfg: er.ProcessedPath, + utils.TenantCfg: er.Tenant.GetRule(separator), + utils.TimezoneCfg: er.Timezone, + utils.FiltersCfg: er.Filters, + utils.FlagsCfg: []string{}, + utils.RunDelayCfg: "0", + utils.ReconnectsCfg: er.Reconnects, + utils.MaxReconnectIntervalCfg: "0", + utils.OptsCfg: opts, } + if er.MaxReconnectInterval != 0 { + initialMP[utils.MaxReconnectIntervalCfg] = er.MaxReconnectInterval.String() + } initialMP[utils.OptsCfg] = opts if flags := er.Flags.SliceFlags(); flags != nil { diff --git a/config/libconfig_json.go b/config/libconfig_json.go index 2d03aa3df8..14d26b3368 100644 --- a/config/libconfig_json.go +++ b/config/libconfig_json.go @@ -274,20 +274,22 @@ type EventReaderOptsJson struct { // EventReaderSJsonCfg is the configuration of a single EventReader type EventReaderJsonCfg struct { - Id *string - Type *string - Run_delay *string - Concurrent_requests *int - Source_path *string - Processed_path *string - Opts *EventReaderOptsJson - Tenant *string - Timezone *string - Filters *[]string - Flags *[]string - Fields *[]*FcTemplateJsonCfg - Partial_commit_fields *[]*FcTemplateJsonCfg - Cache_dump_fields *[]*FcTemplateJsonCfg + Id *string + Type *string + Run_delay *string + Concurrent_requests *int + Source_path *string + Processed_path *string + Tenant *string + Timezone *string + Filters *[]string + Flags *[]string + Reconnects *int + Max_reconnect_interval *string + Opts *EventReaderOptsJson + Fields *[]*FcTemplateJsonCfg + Partial_commit_fields *[]*FcTemplateJsonCfg + Cache_dump_fields *[]*FcTemplateJsonCfg } // EEsJsonCfg contains the configuration of EventExporterService