Skip to content

Commit

Permalink
Revise ERs event exporting
Browse files Browse the repository at this point in the history
Add ees_success_ids and ees_failed_ids fields in reader config. The
former will be used to set EeIDs when the event processing returns
no error, while the latter will be used otherwise.

Add config sanity checks for the added options.

Remove Processed opts and everything related to them since they should
not be used anymore.

Fixed test compilation errors caused by the change.
  • Loading branch information
ionutboangiu authored and danbogos committed Nov 29, 2023
1 parent f4e11d8 commit 0c91933
Show file tree
Hide file tree
Showing 34 changed files with 613 additions and 1,637 deletions.
38 changes: 4 additions & 34 deletions config/config_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ const CGRATES_CFG_JSON = `
"ers": { // EventReaderService
"enabled": false, // starts the EventReader service: <true|false>
"sessions_conns":["*internal"], // RPC Connections IDs
"sessions_conns": ["*internal"], // RPC Connections IDs
"ees_conns": [], // connection for routing processed and invalid messages through EEs
"partial_cache_ttl": "1s", // the duration to cache partial records when not pairing
"readers": [
{
Expand All @@ -375,6 +376,8 @@ const CGRATES_CFG_JSON = `
"filters": [], // limit parsing based on the filters
"flags": [], // flags to influence the event processing
"reconnects": -1, // number of retries in case of connection lost
"ees_success_ids": [], // ids of exporters used for moving the successfully processed event
"ees_failed_ids": [], // ids of exporters used for moving the unprocessed event
"max_reconnect_interval": "5m", // time to wait in between reconnect attempts
"opts": {
// Partial
Expand All @@ -394,60 +397,36 @@ const CGRATES_CFG_JSON = `
// AMQP and AMQPv1
// "amqpQueueID": "cgrates_cdrs", // identifier for the primary queue where messages are consumed (0.9.1/1.0)
// "amqpQueueIDProcessed": "", // identifier for the queue where processed events are sent (0.9.1/1.0)
// "amqpUsername": "", // username for SASL PLAIN auth, exclusive to AMQP 1.0, often representing the policy name
// "amqpPassword": "", // password for authentication, exclusive to AMQP 1.0
// "amqpUsernameProcessed": "", // username for authentication related to processed messages queue
// "amqpPasswordProcessed": "", // password for authentication related to processed messages queue
// "amqpConsumerTag": "cgrates", // unique tag for the consumer, useful for message tracking and consumer management (0.9.1)
// "amqpExchange": "", // name of the primary exchange where messages will be published (0.9.1)
// "amqpExchangeType": "", // type of the primary exchange (direct, topic, fanout, headers) (0.9.1)
// "amqpRoutingKey": "", // key used for routing messages to the primary queue (0.9.1)
// "amqpExchangeProcessed": "", // name of the exchange where processed messages will be published
// "amqpExchangeTypeProcessed": "", // type of the exchange for processed messages
// "amqpRoutingKeyProcessed": "", // key used for routing processed messages
// Kafka
// "kafkaTopic": "cgrates", // the topic from were the events are read
// "kafkaGroupID": "cgrates", // the group that reads the events
// "kafkaMaxWait": "1ms", // the maximum amount of time to wait for new data to come
// "kafkaTopicProcessed": "", // the topic were the events are sent after they are processed
// SQL
// "sqlDBName": "cgrates", // the name of the database from were the events are read
// "sqlTableName": "cdrs", // the name of the table from were the events are read
// "pgSSLMode": "disable", // the ssl mode for postgres db
// "sqlDBNameProcessed": "", // the name of the database were the events are sent after they are processed
// "sqlTableNameProcessed": "", // the name of the table were the events are sent after they are processed
// "pgSSLModeProcessed": "", // the ssl mode for postgres db
// SQS and S3
// "awsRegion": "",
// "awsKey": "",
// "awsSecret": "",
// "awsToken": "",
// "awsRegionProcessed": "",
// "awsKeyProcessed": "",
// "awsSecretProcessed": "",
// "awsTokenProcessed": "",
// SQS
// "sqsQueueID": "cgrates_cdrs", // the queue id for SQS readers from were the events are read
// "sqsQueueIDProcessed": "", // the queue id for SQS readers were the events are sent after they are processed
// S3
// "s3BucketID": "cgrates_cdrs", // the bucket id for S3 readers from were the events are read
// "s3FolderPathProcessed": "", // only for S3 event posting
// "s3BucketIDProcessed": "cgrates_cdrs", // the bucket id for S3 readers were the events are sent after they are processed
// nats
// "natsJetStream": false, // controls if the nats reader uses the JetStream
Expand All @@ -461,15 +440,6 @@ const CGRATES_CFG_JSON = `
// "natsClientCertificate": "", // the path to a client certificate( used by tls)
// "natsClientKey": "", // the path to a client key( used by tls)
// "natsJetStreamMaxWait": "5s", // the maximum amount of time to wait for a response
// "natsJetStreamProcessed": false, // controls if the nats poster uses the JetStream
// "natsSubjectProcessed": "cgrates_cdrs", // the subject were the events are posted
// "natsJWTFileProcessed": "", // the path to the JWT file( can be the chained file or the user file)
// "natsSeedFileProcessed": "", // the path to the seed files( if the JWT file is mention this is used as seedFile for the JWT user mentioned above)
// "natsCertificateAuthorityProcessed": "", // the path to a custom certificate authority file( used by tls)
// "natsClientCertificateProcessed": "", // the path to a client certificate( used by tls)
// "natsClientKeyProcessed": "", // the path to a client key( used by tls)
// "natsJetStreamMaxWaitProcessed": "5s", // the maximum amount of time to wait for a response
},
"fields":[ // import fields template, tag will match internally CDR field, in case of .csv value will be represented by index of the field value
{"tag": "ToR", "path": "*cgreq.ToR", "type": "*variable", "value": "~*req.2", "mandatory": true},
Expand Down
6 changes: 6 additions & 0 deletions config/config_it_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,7 @@ func testCGRConfigReloadERs(t *testing.T) {
expAttr := &ERsCfg{
Enabled: true,
SessionSConns: []string{utils.MetaLocalHost},
EEsConns: []string{},
Readers: []*EventReaderCfg{
{
ID: utils.MetaDefault,
Expand All @@ -609,6 +610,8 @@ func testCGRConfigReloadERs(t *testing.T) {
PartialCommitFields: []*FCTemplate{},
Reconnects: -1,
MaxReconnectInterval: 5 * time.Minute,
EEsSuccessIDs: []string{},
EEsFailedIDs: []string{},
Opts: &EventReaderOpts{
CSV: &CSVROpts{
FieldSeparator: utils.StringPointer(utils.FieldsSep),
Expand Down Expand Up @@ -640,6 +643,8 @@ func testCGRConfigReloadERs(t *testing.T) {
PartialCommitFields: []*FCTemplate{},
Reconnects: -1,
MaxReconnectInterval: 5 * time.Minute,
EEsSuccessIDs: []string{},
EEsFailedIDs: []string{},
Opts: &EventReaderOpts{
CSV: &CSVROpts{
FieldSeparator: utils.StringPointer(utils.FieldsSep),
Expand Down Expand Up @@ -884,6 +889,7 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) {
"sessions_conns": []string{
utils.MetaLocalHost,
},
utils.EEsConnsCfg: []string{},
}

cfg := NewDefaultCGRConfig()
Expand Down
4 changes: 3 additions & 1 deletion config/config_json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2005,7 +2005,7 @@ func TestDfEventReaderCfg(t *testing.T) {
eCfg := &ERsJsonCfg{
Enabled: utils.BoolPointer(false),
Sessions_conns: &[]string{utils.MetaInternal},
Ees_conns: &[]string{utils.MetaInternal},
Ees_conns: &[]string{},
Readers: &[]*EventReaderJsonCfg{
{
Id: utils.StringPointer(utils.MetaDefault),
Expand All @@ -2023,6 +2023,8 @@ func TestDfEventReaderCfg(t *testing.T) {
Partial_commit_fields: &[]*FcTemplateJsonCfg{},
Reconnects: utils.IntPointer(-1),
Max_reconnect_interval: utils.StringPointer("5m"),
Ees_success_ids: &[]string{},
Ees_failed_ids: &[]string{},
Opts: &EventReaderOptsJson{
CSVFieldSeparator: utils.StringPointer(utils.FieldsSep),
CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep),
Expand Down
13 changes: 11 additions & 2 deletions config/config_test.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions config/configsanity.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,22 @@ func (cfg *CGRConfig) checkConfigSanity() error {
}
}
for _, rdr := range cfg.ersCfg.Readers {
if len(rdr.EEsSuccessIDs) != 0 || len(rdr.EEsFailedIDs) != 0 {
if len(cfg.ersCfg.EEsConns) == 0 || !cfg.eesCfg.Enabled {
return fmt.Errorf("<%s> connection to <%s> required due to exporter ID references", utils.ERs, utils.EEs)
}
}
exporterIDs := cfg.eesCfg.exporterIDs()
for _, eesID := range rdr.EEsSuccessIDs {
if !slices.Contains(exporterIDs, eesID) {
return fmt.Errorf("<%s> exporter with id %s not defined", utils.ERs, eesID)
}
}
for _, eesID := range rdr.EEsFailedIDs {
if !slices.Contains(exporterIDs, eesID) {
return fmt.Errorf("<%s> exporter with id %s not defined", utils.ERs, eesID)
}
}
if !possibleReaderTypes.Has(rdr.Type) {
return fmt.Errorf("<%s> unsupported data type: %s for reader with ID: %s", utils.ERs, rdr.Type, rdr.ID)
}
Expand Down
12 changes: 12 additions & 0 deletions config/eescfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ func (eeS *EEsCfg) AsMapInterface(separator string) (initialMP map[string]any) {
return
}

func (eeS *EEsCfg) exporterIDs() []string {
ids := make([]string, 0, len(eeS.Exporters))
for _, exporter := range eeS.Exporters {
ids = append(ids, exporter.ID)
}
return ids
}

type ElsOpts struct {
Index *string
IfPrimaryTerm *int
Expand Down Expand Up @@ -198,6 +206,7 @@ type AMQPOpts struct {
Username *string
Password *string
}

type AWSOpts struct {
Region *string
Key *string
Expand All @@ -207,6 +216,7 @@ type AWSOpts struct {
S3BucketID *string
S3FolderPath *string
}

type NATSOpts struct {
JetStream *bool
Subject *string
Expand All @@ -230,9 +240,11 @@ type RPCOpts struct {
RPCReplyTimeout *time.Duration
RPCAPIOpts map[string]any
}

type KafkaOpts struct {
KafkaTopic *string
}

type EventExporterOpts struct {
CSVFieldSeparator *string
Els *ElsOpts
Expand Down

0 comments on commit 0c91933

Please sign in to comment.