Skip to content

Commit

Permalink
Add reconnects and max_reconnect_interval config options for ers
Browse files Browse the repository at this point in the history
They are separate for each configured reader.

Additional changes:
 - rearrange config_defaults fields for ers/ees;
 - add comment for RunDelay config option inside struct definition;
 - improve comments for amqp opts in config_defaults.
  • Loading branch information
ionutboangiu authored and danbogos committed Nov 8, 2023
1 parent a30e261 commit f696164
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 93 deletions.
86 changes: 43 additions & 43 deletions config/config_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,12 @@ const CGRATES_CFG_JSON = `
"concurrent_requests": 1024, // maximum simultaneous requests/files to process, 0 for unlimited
"source_path": "/var/spool/cgrates/ers/in", // read data from this path
"processed_path": "/var/spool/cgrates/ers/out", // move processed data here
"tenant": "", // tenant used by import
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
"reconnects": -1, // number of retries in case of connection lost
"max_reconnect_interval": "5m", // time to wait in between reconnect attempts
"opts": {
// Partial
// "partialPath": "/", // the path were the partial events will be sent
Expand All @@ -385,27 +391,26 @@ const CGRATES_CFG_JSON = `
// FileXML
// "xmlRootPath": "", // path towards one event in case of XML CDRs
// AMQP and AMQPv1
// "amqpQueueID": "cgrates_cdrs", // the queue id for AMQP and AMQPv1 readers from were the events are read
// "amqpQueueIDProcessed": "", // the queue id for AMQP and AMQPv1 readers were the events are sent after they are processed
// "amqpUsername": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, usually represents the policy name
// "amqpPassword": "", // amqp 1.0 exclusive, used for SASL PLAIN auth, populated with one of its policy's keys
// "amqpUsernameProcessed": "",
// "amqpPasswordProcessed": "",
// "amqpConsumerTag": "cgrates", // the ID of the consumer, amqp 0.9.1 exclusive
// "amqpExchange": "", // amqp 0.9.1 exclusive
// "amqpExchangeType": "", // amqp 0.9.1 exclusive
// "amqpRoutingKey": "", // amqp 0.9.1 exclusive
// "amqpQueueID": "cgrates_cdrs", // identifier for the primary queue where messages are consumed (0.9.1/1.0)
// "amqpQueueIDProcessed": "", // identifier for the queue where processed events are sent (0.9.1/1.0)
// "amqpUsername": "", // username for SASL PLAIN auth, exclusive to AMQP 1.0, often representing the policy name
// "amqpPassword": "", // password for authentication, exclusive to AMQP 1.0
// "amqpExchangeProcessed": "",
// "amqpExchangeTypeProcessed": "",
// "amqpRoutingKeyProcessed": "",
// "amqpUsernameProcessed": "", // username for authentication related to processed messages queue
// "amqpPasswordProcessed": "", // password for authentication related to processed messages queue
// "amqpConsumerTag": "cgrates", // unique tag for the consumer, useful for message tracking and consumer management (0.9.1)
// "amqpExchange": "", // name of the primary exchange where messages will be published (0.9.1)
// "amqpExchangeType": "", // type of the primary exchange (direct, topic, fanout, headers) (0.9.1)
// "amqpRoutingKey": "", // key used for routing messages to the primary queue (0.9.1)
// "amqpExchangeProcessed": "", // name of the exchange where processed messages will be published
// "amqpExchangeTypeProcessed": "", // type of the exchange for processed messages
// "amqpRoutingKeyProcessed": "", // key used for routing processed messages
// Kafka
// "kafkaTopic": "cgrates", // the topic from were the events are read
// "kafkaGroupID": "cgrates", // the group that reads the events
Expand All @@ -421,23 +426,23 @@ const CGRATES_CFG_JSON = `
// "sqlDBNameProcessed": "", // the name of the database were the events are sent after they are processed
// "sqlTableNameProcessed": "", // the name of the table were the events are sent after they are processed
// "pgSSLModeProcessed": "", // the ssl mode for postgres db
// SQS and S3
// "awsRegion": "",
// "awsKey": "",
// "awsSecret": "",
// "awsToken": "",
// "awsRegionProcessed": "",
// "awsKeyProcessed": "",
// "awsSecretProcessed": "",
// "awsTokenProcessed": "",
// SQS
// "sqsQueueID": "cgrates_cdrs", // the queue id for SQS readers from were the events are read
// "sqsQueueIDProcessed": "", // the queue id for SQS readers were the events are sent after they are processed
// S3
// "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from were the events are read
// "s3FolderPathProcessed": "", // only for S3 event posting
Expand Down Expand Up @@ -466,10 +471,6 @@ const CGRATES_CFG_JSON = `
// "natsClientKeyProcessed": "", // the path to a client key( used by tls)
// "natsJetStreamMaxWaitProcessed": "5s", // the maximum amount of time to wait for a response
},
"tenant": "", // tenant used by import
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true},
{"tag": "OriginID", "path": "*cgreq.OriginID", "type": "*variable", "value": "~*req.3", "mandatory": true},
Expand All @@ -488,8 +489,7 @@ const CGRATES_CFG_JSON = `
},
],
},
"ees": { // EventExporterService
"enabled": false, // starts the EventReader service: <true|false>
"attributes_conns":[], // RPC Connections IDs
Expand All @@ -498,18 +498,26 @@ const CGRATES_CFG_JSON = `
},
"exporters": [
{
"id": "*default", // identifier of the EventReader profile
"type": "*none", // exporter type
"export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed
"concurrent_requests": 0, // maximum simultaneous requests to process, 0 for unlimited
"id": "*default", // identifier of the EventReader profile
"type": "*none", // exporter type
"export_path": "/var/spool/cgrates/ees", // path where the exported events will be placed
"failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests
"concurrent_requests": 0, // maximum simultaneous requests to process, 0 for unlimited
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
"attribute_ids": [], // select Attribute profiles instead of discovering them
"attribute_context": "", // context used to discover matching Attribute profiles
"synchronous": false, // block processing until export has a result
"attempts": 1, // export attempts
"opts": {
// CSV
// "csvFieldSeparator": ",", // separator used when reading the fields
// Elasticsearch options
// "elsCloud":true, //ExportPath will be an CLoud ID deployment
// "elsCloud":true, // ExportPath will be an CLoud ID deployment
// "elsApiKey": "", // Base64-encoded token for authorization; if set, overrides username/password and service token.
// "elsUsername":"", // Username for HTTP Basic Authentication.
// "elsPassword":"", // Password for HTTP Basic Authentication.
Expand Down Expand Up @@ -598,17 +606,9 @@ const CGRATES_CFG_JSON = `
// "rpcReplyTimeout":"2s", // connection down at replies if taking longer that this value
// "rpcAPIOpts": {},
}, // extra options for exporter
"timezone": "", // timezone for timestamps where not specified <""|UTC|Local|$IANA_TZ_DB>
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing <*attributes|*log>
"attribute_ids": [], // select Attribute profiles instead of discovering them
"attribute_context": "", // context used to discover matching Attribute profiles
"synchronous": false, // block processing until export has a result
"attempts": 1, // export attempts
"fields":[], // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
"failed_posts_dir": "/var/spool/cgrates/failed_posts", // directory path where we store failed requests
},
],
"fields":[] // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
}
]
},
Expand Down
94 changes: 58 additions & 36 deletions config/erscfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,20 +445,28 @@ type EventReaderOpts struct {

// EventReaderCfg the event for the Event Reader
type EventReaderCfg struct {
ID string
Type string
RunDelay time.Duration
ConcurrentReqs int
SourcePath string
ProcessedPath string
Opts *EventReaderOpts
Tenant RSRParsers
Timezone string
Filters []string
Flags utils.FlagsWithParams
Fields []*FCTemplate
PartialCommitFields []*FCTemplate
CacheDumpFields []*FCTemplate
ID string
Type string

// RunDelay determines how the Serve method initiates the reading process.
// - A value of 0 disables automatic reading, allowing manual control, possibly through an API.
// - A value of -1 enables watching directory changes indefinitely, applicable for file-based readers.
// - Any positive duration sets a fixed time interval for automatic reading cycles.
RunDelay time.Duration

ConcurrentReqs int
SourcePath string
ProcessedPath string
Tenant RSRParsers
Timezone string
Filters []string
Flags utils.FlagsWithParams
Reconnects int
MaxReconnectInterval time.Duration
Opts *EventReaderOpts
Fields []*FCTemplate
PartialCommitFields []*FCTemplate
CacheDumpFields []*FCTemplate
}

func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err error) {
Expand Down Expand Up @@ -492,7 +500,6 @@ func (erOpts *EventReaderOpts) loadFromJSONCfg(jsnCfg *EventReaderOptsJson) (err
if jsnCfg.PartialOrderField != nil {
erOpts.PartialOrderField = jsnCfg.PartialOrderField
}

if jsnCfg.XMLRootPath != nil {
erOpts.XMLRootPath = jsnCfg.XMLRootPath
}
Expand Down Expand Up @@ -539,6 +546,14 @@ func (er *EventReaderCfg) loadFromJSONCfg(jsnCfg *EventReaderJsonCfg, msgTemplat
if jsnCfg.Flags != nil {
er.Flags = utils.FlagsWithParamsFromSlice(*jsnCfg.Flags)
}
if jsnCfg.Reconnects != nil {
er.Reconnects = *jsnCfg.Reconnects
}
if jsnCfg.Max_reconnect_interval != nil {
if er.MaxReconnectInterval, err = utils.ParseDurationWithNanosecs(*jsnCfg.Max_reconnect_interval); err != nil {
return err
}
}
if jsnCfg.Fields != nil {
if er.Fields, err = FCTemplatesFromFCTemplatesJSONCfg(*jsnCfg.Fields, sep); err != nil {
return err
Expand Down Expand Up @@ -885,16 +900,18 @@ func (erOpts *EventReaderOpts) Clone() *EventReaderOpts {
// Clone returns a deep copy of EventReaderCfg
func (er EventReaderCfg) Clone() (cln *EventReaderCfg) {
cln = &EventReaderCfg{
ID: er.ID,
Type: er.Type,
RunDelay: er.RunDelay,
ConcurrentReqs: er.ConcurrentReqs,
SourcePath: er.SourcePath,
ProcessedPath: er.ProcessedPath,
Tenant: er.Tenant.Clone(),
Timezone: er.Timezone,
Flags: er.Flags.Clone(),
Opts: er.Opts.Clone(),
ID: er.ID,
Type: er.Type,
RunDelay: er.RunDelay,
ConcurrentReqs: er.ConcurrentReqs,
SourcePath: er.SourcePath,
ProcessedPath: er.ProcessedPath,
Tenant: er.Tenant.Clone(),
Timezone: er.Timezone,
Flags: er.Flags.Clone(),
Reconnects: er.Reconnects,
MaxReconnectInterval: er.MaxReconnectInterval,
Opts: er.Opts.Clone(),
}
if er.Filters != nil {
cln.Filters = make([]string, len(er.Filters))
Expand Down Expand Up @@ -1135,19 +1152,24 @@ func (er *EventReaderCfg) AsMapInterface(separator string) (initialMP map[string
}
}
initialMP = map[string]any{
utils.IDCfg: er.ID,
utils.TypeCfg: er.Type,
utils.ConcurrentRequestsCfg: er.ConcurrentReqs,
utils.SourcePathCfg: er.SourcePath,
utils.ProcessedPathCfg: er.ProcessedPath,
utils.TenantCfg: er.Tenant.GetRule(separator),
utils.TimezoneCfg: er.Timezone,
utils.FiltersCfg: er.Filters,
utils.FlagsCfg: []string{},
utils.RunDelayCfg: "0",
utils.OptsCfg: opts,
utils.IDCfg: er.ID,
utils.TypeCfg: er.Type,
utils.ConcurrentRequestsCfg: er.ConcurrentReqs,
utils.SourcePathCfg: er.SourcePath,
utils.ProcessedPathCfg: er.ProcessedPath,
utils.TenantCfg: er.Tenant.GetRule(separator),
utils.TimezoneCfg: er.Timezone,
utils.FiltersCfg: er.Filters,
utils.FlagsCfg: []string{},
utils.RunDelayCfg: "0",
utils.ReconnectsCfg: er.Reconnects,
utils.MaxReconnectIntervalCfg: "0",
utils.OptsCfg: opts,
}

if er.MaxReconnectInterval != 0 {
initialMP[utils.MaxReconnectIntervalCfg] = er.MaxReconnectInterval.String()
}
initialMP[utils.OptsCfg] = opts

if flags := er.Flags.SliceFlags(); flags != nil {
Expand Down
30 changes: 16 additions & 14 deletions config/libconfig_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,20 +274,22 @@ type EventReaderOptsJson struct {

// EventReaderSJsonCfg is the configuration of a single EventReader
type EventReaderJsonCfg struct {
Id *string
Type *string
Run_delay *string
Concurrent_requests *int
Source_path *string
Processed_path *string
Opts *EventReaderOptsJson
Tenant *string
Timezone *string
Filters *[]string
Flags *[]string
Fields *[]*FcTemplateJsonCfg
Partial_commit_fields *[]*FcTemplateJsonCfg
Cache_dump_fields *[]*FcTemplateJsonCfg
Id *string
Type *string
Run_delay *string
Concurrent_requests *int
Source_path *string
Processed_path *string
Tenant *string
Timezone *string
Filters *[]string
Flags *[]string
Reconnects *int
Max_reconnect_interval *string
Opts *EventReaderOptsJson
Fields *[]*FcTemplateJsonCfg
Partial_commit_fields *[]*FcTemplateJsonCfg
Cache_dump_fields *[]*FcTemplateJsonCfg
}

// EEsJsonCfg contains the configuration of EventExporterService
Expand Down

0 comments on commit f696164

Please sign in to comment.