From c282d40201119bc6d1ac35f7a3435b9e70b25ec1 Mon Sep 17 00:00:00 2001 From: ionutboangiu Date: Mon, 6 Nov 2023 09:23:03 -0500 Subject: [PATCH] Implement reconnect logic for AMQP EventReader Implemented functionality that handles reconnecting to the amqp server and reinitializing the amqp channel in case of errors and timeouts. This is handled by a goroutine created in the client constructor (it also handles the initial connect/init). Reconnects and reinits will use a fibonacci backoff strategy, and the attempt amount and max waiting interval can be adjusted by the 'reconnects' and 'max_reconnect_interval' config options. Messages that fail processing are now dropped instead of being requeued, preventing infinite processing loops. However, this means that the messages are lost. Handling failed messages will need to be addressed separately. 'concurrent_requests' will now set the prefetch count. Setting the prefetch count using the Qos function was able to replace our old approach that was using channels. Default value is 1024 which, according to the rabbitmq docs, 'runs into the law of diminishing returns'. The recommended value is between 100-300. Source: https://www.rabbitmq.com/confirms.html#channel-qos-prefetch-throughput Fix test compilation errors and failing tests caused by these changes. References #4160 --- config/config_it_test.go | 100 ++-- config/config_json_test.go | 28 +- config/config_test.go | 104 ++-- config/erscfg_test.go | 444 +++++++++------- ers/amqp.go | 611 +++++++++++++++------- ers/amqp_it_test.go | 14 - ers/amqp_test.go | 66 --- ers/readers_test.go | 24 +- general_tests/all_cfg_sect_rld_it_test.go | 6 +- 9 files changed, 803 insertions(+), 594 deletions(-) delete mode 100644 ers/amqp_test.go diff --git a/config/config_it_test.go b/config/config_it_test.go index 2274d1e66e..e1a0f4176c 100644 --- a/config/config_it_test.go +++ b/config/config_it_test.go @@ -596,17 +596,19 @@ func testCGRConfigReloadERs(t *testing.T) { SessionSConns: []string{utils.MetaLocalHost}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.MetaNone, - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/ers/in", - ProcessedPath: "/var/spool/cgrates/ers/out", - Filters: []string{}, - Flags: flagsDefault, - Fields: content, - CacheDumpFields: []*FCTemplate{}, - PartialCommitFields: []*FCTemplate{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/ers/out", + Filters: []string{}, + Flags: flagsDefault, + Fields: content, + CacheDumpFields: []*FCTemplate{}, + PartialCommitFields: []*FCTemplate{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Opts: &EventReaderOpts{ CSV: &CSVROpts{ FieldSeparator: utils.StringPointer(utils.FieldsSep), @@ -625,17 +627,19 @@ func testCGRConfigReloadERs(t *testing.T) { }, }, { - ID: "file_reader1", - Type: utils.MetaFileCSV, - RunDelay: -1, - ConcurrentReqs: 1024, - SourcePath: "/tmp/ers/in", - ProcessedPath: "/tmp/ers/out", - Filters: []string{}, - Flags: flags, - Fields: content, - CacheDumpFields: []*FCTemplate{}, - PartialCommitFields: []*FCTemplate{}, + ID: "file_reader1", + Type: utils.MetaFileCSV, + RunDelay: -1, + ConcurrentReqs: 1024, + SourcePath: "/tmp/ers/in", + ProcessedPath: "/tmp/ers/out", + Filters: []string{}, + Flags: flags, + Fields: content, + CacheDumpFields: []*FCTemplate{}, + PartialCommitFields: []*FCTemplate{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Opts: &EventReaderOpts{ CSV: &CSVROpts{ FieldSeparator: utils.StringPointer(utils.FieldsSep), @@ -827,18 +831,20 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) { "partial_cache_ttl": "1s", "readers": []any{ map[string]any{ - "id": utils.MetaDefault, - "cache_dump_fields": []any{}, - "concurrent_requests": 1024, - "fields": content, - "filters": []string{}, - "flags": []string{}, - "run_delay": "0", - "source_path": "/var/spool/cgrates/ers/in", - "processed_path": "/var/spool/cgrates/ers/out", - "tenant": "", - "timezone": "", - "type": utils.MetaNone, + "id": utils.MetaDefault, + "cache_dump_fields": []any{}, + "concurrent_requests": 1024, + "fields": content, + "filters": []string{}, + "flags": []string{}, + "run_delay": "0", + "source_path": "/var/spool/cgrates/ers/in", + "processed_path": "/var/spool/cgrates/ers/out", + "tenant": "", + "timezone": "", + "type": utils.MetaNone, + utils.ReconnectsCfg: -1, + utils.MaxReconnectIntervalCfg: "5m0s", "opts": map[string]any{ "csvFieldSeparator": ",", "csvHeaderDefineChar": ":", @@ -850,18 +856,20 @@ func testCgrCfgV1ReloadConfigSection(t *testing.T) { "partial_commit_fields": []any{}, }, map[string]any{ - "cache_dump_fields": []any{}, - "concurrent_requests": 1024, - "filters": []string{}, - "flags": []string{"*dryrun"}, - "id": "file_reader1", - "processed_path": "/tmp/ers/out", - "run_delay": "-1", - "source_path": "/tmp/ers/in", - "tenant": "", - "timezone": "", - "type": "*file_csv", - "fields": content, + "cache_dump_fields": []any{}, + "concurrent_requests": 1024, + "filters": []string{}, + "flags": []string{"*dryrun"}, + "id": "file_reader1", + "processed_path": "/tmp/ers/out", + "run_delay": "-1", + "source_path": "/tmp/ers/in", + "tenant": "", + "timezone": "", + "type": "*file_csv", + "fields": content, + utils.ReconnectsCfg: -1, + utils.MaxReconnectIntervalCfg: "5m0s", "opts": map[string]any{ "csvFieldSeparator": ",", "csvHeaderDefineChar": ":", diff --git a/config/config_json_test.go b/config/config_json_test.go index 5a6116db5b..74b32f2a32 100644 --- a/config/config_json_test.go +++ b/config/config_json_test.go @@ -2003,19 +2003,21 @@ func TestDfEventReaderCfg(t *testing.T) { Sessions_conns: &[]string{utils.MetaInternal}, Readers: &[]*EventReaderJsonCfg{ { - Id: utils.StringPointer(utils.MetaDefault), - Type: utils.StringPointer(utils.MetaNone), - Run_delay: utils.StringPointer("0"), - Concurrent_requests: utils.IntPointer(1024), - Source_path: utils.StringPointer("/var/spool/cgrates/ers/in"), - Processed_path: utils.StringPointer("/var/spool/cgrates/ers/out"), - Tenant: utils.StringPointer(utils.EmptyString), - Timezone: utils.StringPointer(utils.EmptyString), - Filters: &[]string{}, - Flags: &[]string{}, - Fields: &cdrFields, - Cache_dump_fields: &[]*FcTemplateJsonCfg{}, - Partial_commit_fields: &[]*FcTemplateJsonCfg{}, + Id: utils.StringPointer(utils.MetaDefault), + Type: utils.StringPointer(utils.MetaNone), + Run_delay: utils.StringPointer("0"), + Concurrent_requests: utils.IntPointer(1024), + Source_path: utils.StringPointer("/var/spool/cgrates/ers/in"), + Processed_path: utils.StringPointer("/var/spool/cgrates/ers/out"), + Tenant: utils.StringPointer(utils.EmptyString), + Timezone: utils.StringPointer(utils.EmptyString), + Filters: &[]string{}, + Flags: &[]string{}, + Fields: &cdrFields, + Cache_dump_fields: &[]*FcTemplateJsonCfg{}, + Partial_commit_fields: &[]*FcTemplateJsonCfg{}, + Reconnects: utils.IntPointer(-1), + Max_reconnect_interval: utils.StringPointer("5m"), Opts: &EventReaderOptsJson{ CSVFieldSeparator: utils.StringPointer(utils.FieldsSep), CSVHeaderDefineChar: utils.StringPointer(utils.InInFieldSep), diff --git a/config/config_test.go b/config/config_test.go index fd62662fab..b656a7bbd8 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -2341,19 +2341,21 @@ func TestERSConfig(t *testing.T) { SessionSConns: []string{"*internal:*sessions"}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.MetaNone, - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/ers/in", - ProcessedPath: "/var/spool/cgrates/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, - Fields: nil, - CacheDumpFields: make([]*FCTemplate, 0), - PartialCommitFields: make([]*FCTemplate, 0), + ID: utils.MetaDefault, + Type: utils.MetaNone, + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Fields: nil, + CacheDumpFields: make([]*FCTemplate, 0), + PartialCommitFields: make([]*FCTemplate, 0), + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Opts: &EventReaderOpts{ CSV: &CSVROpts{ FieldSeparator: utils.StringPointer(utils.FieldsSep), @@ -4453,19 +4455,21 @@ func TestV1GetConfigSectionERS(t *testing.T) { utils.SessionSConnsCfg: []string{utils.MetaInternal}, utils.ReadersCfg: []map[string]any{ { - utils.FiltersCfg: []string{}, - utils.FlagsCfg: []string{}, - utils.IDCfg: "*default", - utils.ProcessedPathCfg: "/var/spool/cgrates/ers/out", - utils.RunDelayCfg: "0", - utils.SourcePathCfg: "/var/spool/cgrates/ers/in", - utils.TenantCfg: utils.EmptyString, - utils.TimezoneCfg: utils.EmptyString, - utils.CacheDumpFieldsCfg: []map[string]any{}, - utils.PartialCommitFieldsCfg: []map[string]any{}, - utils.ConcurrentRequestsCfg: 1024, - utils.TypeCfg: utils.MetaNone, - utils.FieldsCfg: []string{}, + utils.FiltersCfg: []string{}, + utils.FlagsCfg: []string{}, + utils.IDCfg: "*default", + utils.ProcessedPathCfg: "/var/spool/cgrates/ers/out", + utils.RunDelayCfg: "0", + utils.SourcePathCfg: "/var/spool/cgrates/ers/in", + utils.TenantCfg: utils.EmptyString, + utils.TimezoneCfg: utils.EmptyString, + utils.CacheDumpFieldsCfg: []map[string]any{}, + utils.PartialCommitFieldsCfg: []map[string]any{}, + utils.ConcurrentRequestsCfg: 1024, + utils.TypeCfg: utils.MetaNone, + utils.FieldsCfg: []string{}, + utils.ReconnectsCfg: -1, + utils.MaxReconnectIntervalCfg: "5m0s", utils.OptsCfg: map[string]any{ "csvFieldSeparator": ",", "csvHeaderDefineChar": ":", @@ -5134,7 +5138,7 @@ func TestV1GetConfigAsJSONCfgEES(t *testing.T) { func TestV1GetConfigAsJSONCfgERS(t *testing.T) { var reply string - expected := `{"ers":{"enabled":false,"partial_cache_ttl":"1s","readers":[{"cache_dump_fields":[],"concurrent_requests":1024,"fields":[{"mandatory":true,"path":"*cgreq.ToR","tag":"ToR","type":"*variable","value":"~*req.2"},{"mandatory":true,"path":"*cgreq.OriginID","tag":"OriginID","type":"*variable","value":"~*req.3"},{"mandatory":true,"path":"*cgreq.RequestType","tag":"RequestType","type":"*variable","value":"~*req.4"},{"mandatory":true,"path":"*cgreq.Tenant","tag":"Tenant","type":"*variable","value":"~*req.6"},{"mandatory":true,"path":"*cgreq.Category","tag":"Category","type":"*variable","value":"~*req.7"},{"mandatory":true,"path":"*cgreq.Account","tag":"Account","type":"*variable","value":"~*req.8"},{"mandatory":true,"path":"*cgreq.Subject","tag":"Subject","type":"*variable","value":"~*req.9"},{"mandatory":true,"path":"*cgreq.Destination","tag":"Destination","type":"*variable","value":"~*req.10"},{"mandatory":true,"path":"*cgreq.SetupTime","tag":"SetupTime","type":"*variable","value":"~*req.11"},{"mandatory":true,"path":"*cgreq.AnswerTime","tag":"AnswerTime","type":"*variable","value":"~*req.12"},{"mandatory":true,"path":"*cgreq.Usage","tag":"Usage","type":"*variable","value":"~*req.13"}],"filters":[],"flags":[],"id":"*default","opts":{"csvFieldSeparator":",","csvHeaderDefineChar":":","csvRowLength":0,"natsSubject":"cgrates_cdrs","partialCacheAction":"*none","partialOrderField":"~*req.AnswerTime"},"partial_commit_fields":[],"processed_path":"/var/spool/cgrates/ers/out","run_delay":"0","source_path":"/var/spool/cgrates/ers/in","tenant":"","timezone":"","type":"*none"}],"sessions_conns":["*internal"]}}` + expected := `{"ers":{"enabled":false,"partial_cache_ttl":"1s","readers":[{"cache_dump_fields":[],"concurrent_requests":1024,"fields":[{"mandatory":true,"path":"*cgreq.ToR","tag":"ToR","type":"*variable","value":"~*req.2"},{"mandatory":true,"path":"*cgreq.OriginID","tag":"OriginID","type":"*variable","value":"~*req.3"},{"mandatory":true,"path":"*cgreq.RequestType","tag":"RequestType","type":"*variable","value":"~*req.4"},{"mandatory":true,"path":"*cgreq.Tenant","tag":"Tenant","type":"*variable","value":"~*req.6"},{"mandatory":true,"path":"*cgreq.Category","tag":"Category","type":"*variable","value":"~*req.7"},{"mandatory":true,"path":"*cgreq.Account","tag":"Account","type":"*variable","value":"~*req.8"},{"mandatory":true,"path":"*cgreq.Subject","tag":"Subject","type":"*variable","value":"~*req.9"},{"mandatory":true,"path":"*cgreq.Destination","tag":"Destination","type":"*variable","value":"~*req.10"},{"mandatory":true,"path":"*cgreq.SetupTime","tag":"SetupTime","type":"*variable","value":"~*req.11"},{"mandatory":true,"path":"*cgreq.AnswerTime","tag":"AnswerTime","type":"*variable","value":"~*req.12"},{"mandatory":true,"path":"*cgreq.Usage","tag":"Usage","type":"*variable","value":"~*req.13"}],"filters":[],"flags":[],"id":"*default","max_reconnect_interval":"5m0s","opts":{"csvFieldSeparator":",","csvHeaderDefineChar":":","csvRowLength":0,"natsSubject":"cgrates_cdrs","partialCacheAction":"*none","partialOrderField":"~*req.AnswerTime"},"partial_commit_fields":[],"processed_path":"/var/spool/cgrates/ers/out","reconnects":-1,"run_delay":"0","source_path":"/var/spool/cgrates/ers/in","tenant":"","timezone":"","type":"*none"}],"sessions_conns":["*internal"]}}` cgrCfg := NewDefaultCGRConfig() if err := cgrCfg.V1GetConfigAsJSON(context.Background(), &SectionWithAPIOpts{Section: ERsJson}, &reply); err != nil { t.Error(err) @@ -5314,7 +5318,7 @@ func TestV1GetConfigAsJSONAllConfig(t *testing.T) { }` var reply string cgrCfg, err := NewCGRConfigFromJSONStringWithDefaults(cfgJSON) - expected := `{"analyzers":{"cleanup_interval":"1h0m0s","db_path":"/var/spool/cgrates/analyzers","enabled":false,"index_type":"*scorch","ttl":"24h0m0s"},"apiban":{"keys":[]},"apiers":{"attributes_conns":[],"caches_conns":["*internal"],"ees_conns":[],"enabled":false,"scheduler_conns":[]},"asterisk_agent":{"asterisk_conns":[{"address":"127.0.0.1:8088","alias":"","connect_attempts":3,"max_reconnect_interval":"0s","password":"CGRateS.org","reconnects":5,"user":"cgrates"}],"create_cdr":false,"enabled":false,"sessions_conns":["*birpc_internal"]},"attributes":{"any_context":true,"apiers_conns":[],"enabled":false,"indexed_selects":true,"nested_fields":false,"opts":{"*processRuns":1,"*profileIDs":[],"*profileIgnoreFilters":false,"*profileRuns":0},"prefix_indexed_fields":[],"resources_conns":[],"stats_conns":[],"suffix_indexed_fields":[]},"caches":{"partitions":{"*account_action_plans":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*action_plans":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*action_triggers":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*actions":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*apiban":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"2m0s"},"*attribute_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*attribute_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*caps_events":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*cdr_ids":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"10m0s"},"*charger_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*charger_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*closed_sessions":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"10s"},"*destinations":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*diameter_messages":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"3h0m0s"},"*dispatcher_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_hosts":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_loads":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_routes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*dispatchers":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*event_charges":{"limit":0,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"10s"},"*event_resources":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*filters":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*load_ids":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*rating_plans":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*rating_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*replication_hosts":{"limit":0,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*resource_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*resource_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*resources":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*reverse_destinations":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*reverse_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*route_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*route_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*rpc_connections":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*rpc_responses":{"limit":0,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"2s"},"*sentrypeer":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":true,"ttl":"24h0m0s"},"*shared_groups":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*stat_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*statqueue_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*statqueues":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*stir":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"3h0m0s"},"*threshold_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*threshold_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*thresholds":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*timings":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*uch":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"3h0m0s"}},"remote_conns":[],"replication_conns":[]},"cdrs":{"attributes_conns":[],"chargers_conns":[],"ees_conns":[],"enabled":false,"extra_fields":[],"online_cdr_exports":[],"rals_conns":[],"scheduler_conns":[],"session_cost_retries":5,"stats_conns":[],"store_cdrs":true,"thresholds_conns":[]},"chargers":{"attributes_conns":[],"enabled":false,"indexed_selects":true,"nested_fields":false,"prefix_indexed_fields":[],"suffix_indexed_fields":[]},"configs":{"enabled":false,"root_dir":"/var/spool/cgrates/configs","url":"/configs/"},"cores":{"caps":0,"caps_stats_interval":"0","caps_strategy":"*busy","shutdown_timeout":"1s"},"data_db":{"db_host":"127.0.0.1","db_name":"10","db_password":"","db_port":6379,"db_type":"*redis","db_user":"cgrates","items":{"*account_action_plans":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*accounts":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*action_plans":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*action_triggers":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*actions":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*attribute_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*attribute_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*charger_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*charger_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*destinations":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_hosts":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*filters":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*load_ids":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*rating_plans":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*rating_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*resource_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*resource_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*resources":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*reverse_destinations":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*reverse_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*route_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*route_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*shared_groups":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*stat_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*statqueue_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*statqueues":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*threshold_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*threshold_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*thresholds":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*timings":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*versions":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false}},"opts":{"mongoQueryTimeout":"10s","redisCACertificate":"","redisClientCertificate":"","redisClientKey":"","redisCluster":false,"redisClusterOndownDelay":"0s","redisClusterSync":"5s","redisConnectAttempts":20,"redisConnectTimeout":"0s","redisMaxConns":10,"redisReadTimeout":"0s","redisSentinel":"","redisTLS":false,"redisWriteTimeout":"0s"},"remote_conn_id":"","remote_conns":[],"replication_cache":"","replication_conns":[],"replication_filtered":false},"diameter_agent":{"asr_template":"","concurrent_requests":-1,"dictionaries_path":"/usr/share/cgrates/diameter/dict/","enabled":false,"forced_disconnect":"*none","listen":"127.0.0.1:3868","listen_net":"tcp","origin_host":"CGR-DA","origin_realm":"cgrates.org","product_name":"CGRateS","rar_template":"","request_processors":[],"sessions_conns":["*birpc_internal"],"synced_conn_requests":false,"vendor_id":0},"dispatchers":{"any_subsystem":true,"attributes_conns":[],"enabled":false,"indexed_selects":true,"nested_fields":false,"prefix_indexed_fields":[],"prevent_loop":false,"suffix_indexed_fields":[]},"dns_agent":{"enabled":false,"listeners":[{"address":"127.0.0.1:53","network":"udp"}],"request_processors":[],"sessions_conns":["*internal"],"timezone":""},"ees":{"attributes_conns":[],"cache":{"*file_csv":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"5s"}},"enabled":false,"exporters":[{"attempts":1,"attribute_context":"","attribute_ids":[],"concurrent_requests":0,"export_path":"/var/spool/cgrates/ees","failed_posts_dir":"/var/spool/cgrates/failed_posts","fields":[],"filters":[],"flags":[],"id":"*default","opts":{},"synchronous":false,"timezone":"","type":"*none"}]},"ers":{"enabled":false,"partial_cache_ttl":"1s","readers":[{"cache_dump_fields":[],"concurrent_requests":1024,"fields":[{"mandatory":true,"path":"*cgreq.ToR","tag":"ToR","type":"*variable","value":"~*req.2"},{"mandatory":true,"path":"*cgreq.OriginID","tag":"OriginID","type":"*variable","value":"~*req.3"},{"mandatory":true,"path":"*cgreq.RequestType","tag":"RequestType","type":"*variable","value":"~*req.4"},{"mandatory":true,"path":"*cgreq.Tenant","tag":"Tenant","type":"*variable","value":"~*req.6"},{"mandatory":true,"path":"*cgreq.Category","tag":"Category","type":"*variable","value":"~*req.7"},{"mandatory":true,"path":"*cgreq.Account","tag":"Account","type":"*variable","value":"~*req.8"},{"mandatory":true,"path":"*cgreq.Subject","tag":"Subject","type":"*variable","value":"~*req.9"},{"mandatory":true,"path":"*cgreq.Destination","tag":"Destination","type":"*variable","value":"~*req.10"},{"mandatory":true,"path":"*cgreq.SetupTime","tag":"SetupTime","type":"*variable","value":"~*req.11"},{"mandatory":true,"path":"*cgreq.AnswerTime","tag":"AnswerTime","type":"*variable","value":"~*req.12"},{"mandatory":true,"path":"*cgreq.Usage","tag":"Usage","type":"*variable","value":"~*req.13"}],"filters":[],"flags":[],"id":"*default","opts":{"csvFieldSeparator":",","csvHeaderDefineChar":":","csvRowLength":0,"natsSubject":"cgrates_cdrs","partialCacheAction":"*none","partialOrderField":"~*req.AnswerTime"},"partial_commit_fields":[],"processed_path":"/var/spool/cgrates/ers/out","run_delay":"0","source_path":"/var/spool/cgrates/ers/in","tenant":"","timezone":"","type":"*none"}],"sessions_conns":["*internal"]},"filters":{"apiers_conns":[],"resources_conns":[],"stats_conns":[]},"freeswitch_agent":{"create_cdr":false,"empty_balance_ann_file":"","empty_balance_context":"","enabled":false,"event_socket_conns":[{"address":"127.0.0.1:8021","alias":"127.0.0.1:8021","max_reconnect_interval":"0s","password":"ClueCon","reconnects":5}],"extra_fields":"","low_balance_ann_file":"","max_wait_connection":"2s","sessions_conns":["*birpc_internal"],"subscribe_park":true},"general":{"connect_attempts":5,"connect_timeout":"1s","dbdata_encoding":"*msgpack","default_caching":"*reload","default_category":"call","default_request_type":"*rated","default_tenant":"cgrates.org","default_timezone":"Local","digest_equal":":","digest_separator":",","failed_posts_dir":"/var/spool/cgrates/failed_posts","failed_posts_ttl":"5s","locking_timeout":"0","log_level":6,"logger":"*syslog","max_parallel_conns":100,"max_reconnect_interval":"0","node_id":"ENGINE1","poster_attempts":3,"reconnects":-1,"reply_timeout":"2s","rounding_decimals":5,"rsr_separator":";","tpexport_dir":"/var/spool/cgrates/tpe"},"http":{"auth_users":{},"client_opts":{"dialFallbackDelay":"300ms","dialKeepAlive":"30s","dialTimeout":"30s","disableCompression":false,"disableKeepAlives":false,"expectContinueTimeout":"0s","forceAttemptHttp2":true,"idleConnTimeout":"1m30s","maxConnsPerHost":0,"maxIdleConns":100,"maxIdleConnsPerHost":2,"responseHeaderTimeout":"0s","skipTlsVerify":false,"tlsHandshakeTimeout":"10s"},"freeswitch_cdrs_url":"/freeswitch_json","http_cdrs":"/cdr_http","json_rpc_url":"/jsonrpc","registrars_url":"/registrar","use_basic_auth":false,"ws_url":"/ws"},"http_agent":[],"kamailio_agent":{"create_cdr":false,"enabled":false,"evapi_conns":[{"address":"127.0.0.1:8448","alias":"","max_reconnect_interval":"0s","reconnects":5}],"sessions_conns":["*birpc_internal"],"timezone":""},"listen":{"http":"127.0.0.1:2080","http_tls":"127.0.0.1:2280","rpc_gob":"127.0.0.1:2013","rpc_gob_tls":"127.0.0.1:2023","rpc_json":"127.0.0.1:2012","rpc_json_tls":"127.0.0.1:2022"},"loader":{"caches_conns":["*localhost"],"data_path":"./","disable_reverse":false,"field_separator":",","gapi_credentials":".gapi/credentials.json","gapi_token":".gapi/token.json","scheduler_conns":["*localhost"],"tpid":""},"loaders":[{"caches_conns":["*internal"],"data":[{"fields":[{"mandatory":true,"path":"Tenant","tag":"TenantID","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ProfileID","type":"*variable","value":"~*req.1"},{"path":"Contexts","tag":"Contexts","type":"*variable","value":"~*req.2"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.3"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.4"},{"path":"AttributeFilterIDs","tag":"AttributeFilterIDs","type":"*variable","value":"~*req.5"},{"path":"Path","tag":"Path","type":"*variable","value":"~*req.6"},{"path":"Type","tag":"Type","type":"*variable","value":"~*req.7"},{"path":"Value","tag":"Value","type":"*variable","value":"~*req.8"},{"path":"Blocker","tag":"Blocker","type":"*variable","value":"~*req.9"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.10"}],"file_name":"Attributes.csv","flags":null,"type":"*attributes"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"Type","tag":"Type","type":"*variable","value":"~*req.2"},{"path":"Element","tag":"Element","type":"*variable","value":"~*req.3"},{"path":"Values","tag":"Values","type":"*variable","value":"~*req.4"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.5"}],"file_name":"Filters.csv","flags":null,"type":"*filters"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.2"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.3"},{"path":"UsageTTL","tag":"TTL","type":"*variable","value":"~*req.4"},{"path":"Limit","tag":"Limit","type":"*variable","value":"~*req.5"},{"path":"AllocationMessage","tag":"AllocationMessage","type":"*variable","value":"~*req.6"},{"path":"Blocker","tag":"Blocker","type":"*variable","value":"~*req.7"},{"path":"Stored","tag":"Stored","type":"*variable","value":"~*req.8"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.9"},{"path":"ThresholdIDs","tag":"ThresholdIDs","type":"*variable","value":"~*req.10"}],"file_name":"Resources.csv","flags":null,"type":"*resources"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.2"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.3"},{"path":"QueueLength","tag":"QueueLength","type":"*variable","value":"~*req.4"},{"path":"TTL","tag":"TTL","type":"*variable","value":"~*req.5"},{"path":"MinItems","tag":"MinItems","type":"*variable","value":"~*req.6"},{"path":"MetricIDs","tag":"MetricIDs","type":"*variable","value":"~*req.7"},{"path":"MetricFilterIDs","tag":"MetricFilterIDs","type":"*variable","value":"~*req.8"},{"path":"Blocker","tag":"Blocker","type":"*variable","value":"~*req.9"},{"path":"Stored","tag":"Stored","type":"*variable","value":"~*req.10"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.11"},{"path":"ThresholdIDs","tag":"ThresholdIDs","type":"*variable","value":"~*req.12"}],"file_name":"Stats.csv","flags":null,"type":"*stats"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.2"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.3"},{"path":"MaxHits","tag":"MaxHits","type":"*variable","value":"~*req.4"},{"path":"MinHits","tag":"MinHits","type":"*variable","value":"~*req.5"},{"path":"MinSleep","tag":"MinSleep","type":"*variable","value":"~*req.6"},{"path":"Blocker","tag":"Blocker","type":"*variable","value":"~*req.7"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.8"},{"path":"ActionIDs","tag":"ActionIDs","type":"*variable","value":"~*req.9"},{"path":"Async","tag":"Async","type":"*variable","value":"~*req.10"}],"file_name":"Thresholds.csv","flags":null,"type":"*thresholds"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.2"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.3"},{"path":"Sorting","tag":"Sorting","type":"*variable","value":"~*req.4"},{"path":"SortingParameters","tag":"SortingParameters","type":"*variable","value":"~*req.5"},{"path":"RouteID","tag":"RouteID","type":"*variable","value":"~*req.6"},{"path":"RouteFilterIDs","tag":"RouteFilterIDs","type":"*variable","value":"~*req.7"},{"path":"RouteAccountIDs","tag":"RouteAccountIDs","type":"*variable","value":"~*req.8"},{"path":"RouteRatingPlanIDs","tag":"RouteRatingPlanIDs","type":"*variable","value":"~*req.9"},{"path":"RouteResourceIDs","tag":"RouteResourceIDs","type":"*variable","value":"~*req.10"},{"path":"RouteStatIDs","tag":"RouteStatIDs","type":"*variable","value":"~*req.11"},{"path":"RouteWeight","tag":"RouteWeight","type":"*variable","value":"~*req.12"},{"path":"RouteBlocker","tag":"RouteBlocker","type":"*variable","value":"~*req.13"},{"path":"RouteParameters","tag":"RouteParameters","type":"*variable","value":"~*req.14"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.15"}],"file_name":"Routes.csv","flags":null,"type":"*routes"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.2"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.3"},{"path":"RunID","tag":"RunID","type":"*variable","value":"~*req.4"},{"path":"AttributeIDs","tag":"AttributeIDs","type":"*variable","value":"~*req.5"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.6"}],"file_name":"Chargers.csv","flags":null,"type":"*chargers"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"Contexts","tag":"Contexts","type":"*variable","value":"~*req.2"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.3"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.4"},{"path":"Strategy","tag":"Strategy","type":"*variable","value":"~*req.5"},{"path":"StrategyParameters","tag":"StrategyParameters","type":"*variable","value":"~*req.6"},{"path":"ConnID","tag":"ConnID","type":"*variable","value":"~*req.7"},{"path":"ConnFilterIDs","tag":"ConnFilterIDs","type":"*variable","value":"~*req.8"},{"path":"ConnWeight","tag":"ConnWeight","type":"*variable","value":"~*req.9"},{"path":"ConnBlocker","tag":"ConnBlocker","type":"*variable","value":"~*req.10"},{"path":"ConnParameters","tag":"ConnParameters","type":"*variable","value":"~*req.11"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.12"}],"file_name":"DispatcherProfiles.csv","flags":null,"type":"*dispatchers"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"Address","tag":"Address","type":"*variable","value":"~*req.2"},{"path":"Transport","tag":"Transport","type":"*variable","value":"~*req.3"},{"path":"ConnectAttempts","tag":"ConnectAttempts","type":"*variable","value":"~*req.4"},{"path":"Reconnects","tag":"Reconnects","type":"*variable","value":"~*req.5"},{"path":"MaxReconnectInterval","tag":"MaxReconnectInterval","type":"*variable","value":"~*req.6"},{"path":"ConnectTimeout","tag":"ConnectTimeout","type":"*variable","value":"~*req.7"},{"path":"ReplyTimeout","tag":"ReplyTimeout","type":"*variable","value":"~*req.8"},{"path":"TLS","tag":"TLS","type":"*variable","value":"~*req.9"},{"path":"ClientKey","tag":"ClientKey","type":"*variable","value":"~*req.10"},{"path":"ClientCertificate","tag":"ClientCertificate","type":"*variable","value":"~*req.11"},{"path":"CaCertificate","tag":"CaCertificate","type":"*variable","value":"~*req.12"}],"file_name":"DispatcherHosts.csv","flags":null,"type":"*dispatcher_hosts"}],"dry_run":false,"enabled":false,"field_separator":",","id":"*default","lockfile_path":".cgr.lck","run_delay":"0","tenant":"","tp_in_dir":"/var/spool/cgrates/loader/in","tp_out_dir":"/var/spool/cgrates/loader/out"}],"mailer":{"auth_password":"CGRateS.org","auth_user":"cgrates","from_address":"cgr-mailer@localhost.localdomain","server":"localhost"},"migrator":{"out_datadb_encoding":"msgpack","out_datadb_host":"127.0.0.1","out_datadb_name":"10","out_datadb_opts":{"mongoQueryTimeout":"0s","redisCACertificate":"","redisClientCertificate":"","redisClientKey":"","redisCluster":false,"redisClusterOndownDelay":"0s","redisClusterSync":"5s","redisConnectAttempts":20,"redisConnectTimeout":"0s","redisMaxConns":10,"redisReadTimeout":"0s","redisSentinel":"","redisTLS":false,"redisWriteTimeout":"0s"},"out_datadb_password":"","out_datadb_port":"6379","out_datadb_type":"*redis","out_datadb_user":"cgrates","out_stordb_host":"127.0.0.1","out_stordb_name":"cgrates","out_stordb_opts":{"mongoQueryTimeout":"0s","mysqlDSNParams":null,"mysqlLocation":"","pgSSLMode":"","sqlConnMaxLifetime":"0s","sqlMaxIdleConns":0,"sqlMaxOpenConns":0},"out_stordb_password":"","out_stordb_port":"3306","out_stordb_type":"*mysql","out_stordb_user":"cgrates","users_filters":null},"radius_agent":{"client_dictionaries":{"*default":["/usr/share/cgrates/radius/dict/"]},"client_secrets":{"*default":"CGRateS.org"},"enabled":false,"listen_acct":"127.0.0.1:1813","listen_auth":"127.0.0.1:1812","listen_net":"udp","request_processors":[],"sessions_conns":["*internal"]},"rals":{"balance_rating_subject":{"*any":"*zero1ns","*voice":"*zero1s"},"enabled":false,"max_computed_usage":{"*any":"189h0m0s","*data":"107374182400","*mms":"10000","*sms":"10000","*voice":"72h0m0s"},"max_increments":1000000,"remove_expired":true,"rp_subject_prefix_matching":false,"stats_conns":[],"thresholds_conns":[]},"registrarc":{"dispatchers":{"hosts":[],"refresh_interval":"5m0s","registrars_conns":[]},"rpc":{"hosts":[],"refresh_interval":"5m0s","registrars_conns":[]}},"resources":{"enabled":false,"indexed_selects":true,"nested_fields":false,"opts":{"*units":1,"*usageID":""},"prefix_indexed_fields":[],"store_interval":"","suffix_indexed_fields":[],"thresholds_conns":[]},"routes":{"attributes_conns":[],"default_ratio":1,"enabled":false,"indexed_selects":true,"nested_fields":false,"opts":{"*context":"*routes","*ignoreErrors":false,"*maxCost":""},"prefix_indexed_fields":[],"rals_conns":[],"resources_conns":[],"stats_conns":[],"suffix_indexed_fields":[]},"rpc_conns":{"*bijson_localhost":{"conns":[{"address":"127.0.0.1:2014","transport":"*birpc_json"}],"poolSize":0,"strategy":"*first"},"*birpc_internal":{"conns":[{"address":"*birpc_internal","transport":""}],"poolSize":0,"strategy":"*first"},"*internal":{"conns":[{"address":"*internal","transport":""}],"poolSize":0,"strategy":"*first"},"*localhost":{"conns":[{"address":"127.0.0.1:2012","transport":"*json"}],"poolSize":0,"strategy":"*first"}},"schedulers":{"cdrs_conns":[],"dynaprepaid_actionplans":[],"enabled":false,"filters":[],"stats_conns":[],"thresholds_conns":[]},"sentrypeer":{"Audience":"https://sentrypeer.com/api","ClientID":"","ClientSecret":"","GrantType":"client_credentials","IpUrl":"https://sentrypeer.com/api/ip-addresses","NumberUrl":"https://sentrypeer.com/api/phone-numbers","TokenURL":"https://authz.sentrypeer.com/oauth/token"},"sessions":{"alterable_fields":[],"attributes_conns":[],"cdrs_conns":[],"channel_sync_interval":"0","chargers_conns":[],"client_protocol":1,"debit_interval":"0","default_usage":{"*any":"3h0m0s","*data":"1048576","*sms":"1","*voice":"3h0m0s"},"enabled":false,"listen_bigob":"","listen_bijson":"127.0.0.1:2014","min_dur_low_balance":"0","rals_conns":[],"replication_conns":[],"resources_conns":[],"routes_conns":[],"scheduler_conns":[],"session_indexes":[],"session_ttl":"0","stale_chan_max_extra_usage":"0","stats_conns":[],"stir":{"allowed_attest":["*any"],"default_attest":"A","payload_maxduration":"-1","privatekey_path":"","publickey_path":""},"store_session_costs":false,"terminate_attempts":5,"thresholds_conns":[]},"sip_agent":{"enabled":false,"listen":"127.0.0.1:5060","listen_net":"udp","request_processors":[],"retransmission_timer":1000000000,"sessions_conns":["*internal"],"timezone":""},"stats":{"enabled":false,"indexed_selects":true,"nested_fields":false,"opts":{"*profileIDs":[],"*profileIgnoreFilters":false},"prefix_indexed_fields":[],"store_interval":"","store_uncompressed_limit":0,"suffix_indexed_fields":[],"thresholds_conns":[]},"stor_db":{"db_host":"127.0.0.1","db_name":"cgrates","db_password":"CGRateS.org","db_port":3306,"db_type":"*mysql","db_user":"cgrates","items":{"*cdrs":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*session_costs":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_account_actions":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_action_plans":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_action_triggers":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_actions":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_attributes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_chargers":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_destination_rates":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_destinations":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_dispatcher_hosts":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_dispatcher_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_filters":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_rates":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_rating_plans":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_rating_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_resources":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_routes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_shared_groups":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_stats":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_thresholds":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_timings":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*versions":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false}},"opts":{"mongoQueryTimeout":"10s","mysqlDSNParams":{},"mysqlLocation":"Local","pgSSLMode":"disable","sqlConnMaxLifetime":"0s","sqlMaxIdleConns":10,"sqlMaxOpenConns":100},"prefix_indexed_fields":[],"remote_conns":null,"replication_conns":null,"string_indexed_fields":[]},"suretax":{"bill_to_number":"","business_unit":"","client_number":"","client_tracking":"~*req.CGRID","customer_number":"~*req.Subject","include_local_cost":false,"orig_number":"~*req.Subject","p2pplus4":"","p2pzipcode":"","plus4":"","regulatory_code":"03","response_group":"03","response_type":"D4","return_file_code":"0","sales_type_code":"R","tax_exemption_code_list":"","tax_included":"0","tax_situs_rule":"04","term_number":"~*req.Destination","timezone":"UTC","trans_type_code":"010101","unit_type":"00","units":"1","url":"","validation_key":"","zipcode":""},"templates":{"*asr":[{"mandatory":true,"path":"*diamreq.Session-Id","tag":"SessionId","type":"*variable","value":"~*req.Session-Id"},{"mandatory":true,"path":"*diamreq.Origin-Host","tag":"OriginHost","type":"*variable","value":"~*req.Destination-Host"},{"mandatory":true,"path":"*diamreq.Origin-Realm","tag":"OriginRealm","type":"*variable","value":"~*req.Destination-Realm"},{"mandatory":true,"path":"*diamreq.Destination-Realm","tag":"DestinationRealm","type":"*variable","value":"~*req.Origin-Realm"},{"mandatory":true,"path":"*diamreq.Destination-Host","tag":"DestinationHost","type":"*variable","value":"~*req.Origin-Host"},{"mandatory":true,"path":"*diamreq.Auth-Application-Id","tag":"AuthApplicationId","type":"*variable","value":"~*vars.*appid"}],"*cca":[{"mandatory":true,"path":"*rep.Session-Id","tag":"SessionId","type":"*variable","value":"~*req.Session-Id"},{"path":"*rep.Result-Code","tag":"ResultCode","type":"*constant","value":"2001"},{"mandatory":true,"path":"*rep.Origin-Host","tag":"OriginHost","type":"*variable","value":"~*vars.OriginHost"},{"mandatory":true,"path":"*rep.Origin-Realm","tag":"OriginRealm","type":"*variable","value":"~*vars.OriginRealm"},{"mandatory":true,"path":"*rep.Auth-Application-Id","tag":"AuthApplicationId","type":"*variable","value":"~*vars.*appid"},{"mandatory":true,"path":"*rep.CC-Request-Type","tag":"CCRequestType","type":"*variable","value":"~*req.CC-Request-Type"},{"mandatory":true,"path":"*rep.CC-Request-Number","tag":"CCRequestNumber","type":"*variable","value":"~*req.CC-Request-Number"}],"*cdrLog":[{"mandatory":true,"path":"*cdr.ToR","tag":"ToR","type":"*variable","value":"~*req.BalanceType"},{"mandatory":true,"path":"*cdr.OriginHost","tag":"OriginHost","type":"*constant","value":"127.0.0.1"},{"mandatory":true,"path":"*cdr.RequestType","tag":"RequestType","type":"*constant","value":"*none"},{"mandatory":true,"path":"*cdr.Tenant","tag":"Tenant","type":"*variable","value":"~*req.Tenant"},{"mandatory":true,"path":"*cdr.Account","tag":"Account","type":"*variable","value":"~*req.Account"},{"mandatory":true,"path":"*cdr.Subject","tag":"Subject","type":"*variable","value":"~*req.Account"},{"mandatory":true,"path":"*cdr.Cost","tag":"Cost","type":"*variable","value":"~*req.Cost"},{"mandatory":true,"path":"*cdr.Source","tag":"Source","type":"*constant","value":"*cdrLog"},{"mandatory":true,"path":"*cdr.Usage","tag":"Usage","type":"*constant","value":"1"},{"mandatory":true,"path":"*cdr.RunID","tag":"RunID","type":"*variable","value":"~*req.ActionType"},{"mandatory":true,"path":"*cdr.SetupTime","tag":"SetupTime","type":"*constant","value":"*now"},{"mandatory":true,"path":"*cdr.AnswerTime","tag":"AnswerTime","type":"*constant","value":"*now"},{"mandatory":true,"path":"*cdr.PreRated","tag":"PreRated","type":"*constant","value":"true"}],"*err":[{"mandatory":true,"path":"*rep.Session-Id","tag":"SessionId","type":"*variable","value":"~*req.Session-Id"},{"mandatory":true,"path":"*rep.Origin-Host","tag":"OriginHost","type":"*variable","value":"~*vars.OriginHost"},{"mandatory":true,"path":"*rep.Origin-Realm","tag":"OriginRealm","type":"*variable","value":"~*vars.OriginRealm"}],"*errSip":[{"mandatory":true,"path":"*rep.Request","tag":"Request","type":"*constant","value":"SIP/2.0 500 Internal Server Error"}],"*rar":[{"mandatory":true,"path":"*diamreq.Session-Id","tag":"SessionId","type":"*variable","value":"~*req.Session-Id"},{"mandatory":true,"path":"*diamreq.Origin-Host","tag":"OriginHost","type":"*variable","value":"~*req.Destination-Host"},{"mandatory":true,"path":"*diamreq.Origin-Realm","tag":"OriginRealm","type":"*variable","value":"~*req.Destination-Realm"},{"mandatory":true,"path":"*diamreq.Destination-Realm","tag":"DestinationRealm","type":"*variable","value":"~*req.Origin-Realm"},{"mandatory":true,"path":"*diamreq.Destination-Host","tag":"DestinationHost","type":"*variable","value":"~*req.Origin-Host"},{"mandatory":true,"path":"*diamreq.Auth-Application-Id","tag":"AuthApplicationId","type":"*variable","value":"~*vars.*appid"},{"path":"*diamreq.Re-Auth-Request-Type","tag":"ReAuthRequestType","type":"*constant","value":"0"}]},"thresholds":{"enabled":false,"indexed_selects":true,"nested_fields":false,"opts":{"*profileIDs":[],"*profileIgnoreFilters":false},"prefix_indexed_fields":[],"store_interval":"","suffix_indexed_fields":[]},"tls":{"ca_certificate":"","client_certificate":"","client_key":"","server_certificate":"","server_key":"","server_name":"","server_policy":4}}` + expected := `{"analyzers":{"cleanup_interval":"1h0m0s","db_path":"/var/spool/cgrates/analyzers","enabled":false,"index_type":"*scorch","ttl":"24h0m0s"},"apiban":{"keys":[]},"apiers":{"attributes_conns":[],"caches_conns":["*internal"],"ees_conns":[],"enabled":false,"scheduler_conns":[]},"asterisk_agent":{"asterisk_conns":[{"address":"127.0.0.1:8088","alias":"","connect_attempts":3,"max_reconnect_interval":"0s","password":"CGRateS.org","reconnects":5,"user":"cgrates"}],"create_cdr":false,"enabled":false,"sessions_conns":["*birpc_internal"]},"attributes":{"any_context":true,"apiers_conns":[],"enabled":false,"indexed_selects":true,"nested_fields":false,"opts":{"*processRuns":1,"*profileIDs":[],"*profileIgnoreFilters":false,"*profileRuns":0},"prefix_indexed_fields":[],"resources_conns":[],"stats_conns":[],"suffix_indexed_fields":[]},"caches":{"partitions":{"*account_action_plans":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*action_plans":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*action_triggers":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*actions":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*apiban":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"2m0s"},"*attribute_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*attribute_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*caps_events":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*cdr_ids":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"10m0s"},"*charger_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*charger_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*closed_sessions":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"10s"},"*destinations":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*diameter_messages":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"3h0m0s"},"*dispatcher_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_hosts":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_loads":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_routes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*dispatchers":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*event_charges":{"limit":0,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"10s"},"*event_resources":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*filters":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*load_ids":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*rating_plans":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*rating_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*replication_hosts":{"limit":0,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*resource_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*resource_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*resources":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*reverse_destinations":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*reverse_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*route_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*route_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*rpc_connections":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*rpc_responses":{"limit":0,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"2s"},"*sentrypeer":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":true,"ttl":"24h0m0s"},"*shared_groups":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*stat_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*statqueue_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*statqueues":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*stir":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"3h0m0s"},"*threshold_filter_indexes":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*threshold_profiles":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*thresholds":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*timings":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false},"*uch":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"3h0m0s"}},"remote_conns":[],"replication_conns":[]},"cdrs":{"attributes_conns":[],"chargers_conns":[],"ees_conns":[],"enabled":false,"extra_fields":[],"online_cdr_exports":[],"rals_conns":[],"scheduler_conns":[],"session_cost_retries":5,"stats_conns":[],"store_cdrs":true,"thresholds_conns":[]},"chargers":{"attributes_conns":[],"enabled":false,"indexed_selects":true,"nested_fields":false,"prefix_indexed_fields":[],"suffix_indexed_fields":[]},"configs":{"enabled":false,"root_dir":"/var/spool/cgrates/configs","url":"/configs/"},"cores":{"caps":0,"caps_stats_interval":"0","caps_strategy":"*busy","shutdown_timeout":"1s"},"data_db":{"db_host":"127.0.0.1","db_name":"10","db_password":"","db_port":6379,"db_type":"*redis","db_user":"cgrates","items":{"*account_action_plans":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*accounts":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*action_plans":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*action_triggers":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*actions":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*attribute_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*attribute_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*charger_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*charger_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*destinations":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_hosts":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*dispatcher_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*filters":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*load_ids":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*rating_plans":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*rating_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*resource_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*resource_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*resources":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*reverse_destinations":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*reverse_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*route_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*route_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*shared_groups":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*stat_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*statqueue_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*statqueues":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*threshold_filter_indexes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*threshold_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*thresholds":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*timings":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*versions":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false}},"opts":{"mongoQueryTimeout":"10s","redisCACertificate":"","redisClientCertificate":"","redisClientKey":"","redisCluster":false,"redisClusterOndownDelay":"0s","redisClusterSync":"5s","redisConnectAttempts":20,"redisConnectTimeout":"0s","redisMaxConns":10,"redisReadTimeout":"0s","redisSentinel":"","redisTLS":false,"redisWriteTimeout":"0s"},"remote_conn_id":"","remote_conns":[],"replication_cache":"","replication_conns":[],"replication_filtered":false},"diameter_agent":{"asr_template":"","concurrent_requests":-1,"dictionaries_path":"/usr/share/cgrates/diameter/dict/","enabled":false,"forced_disconnect":"*none","listen":"127.0.0.1:3868","listen_net":"tcp","origin_host":"CGR-DA","origin_realm":"cgrates.org","product_name":"CGRateS","rar_template":"","request_processors":[],"sessions_conns":["*birpc_internal"],"synced_conn_requests":false,"vendor_id":0},"dispatchers":{"any_subsystem":true,"attributes_conns":[],"enabled":false,"indexed_selects":true,"nested_fields":false,"prefix_indexed_fields":[],"prevent_loop":false,"suffix_indexed_fields":[]},"dns_agent":{"enabled":false,"listeners":[{"address":"127.0.0.1:53","network":"udp"}],"request_processors":[],"sessions_conns":["*internal"],"timezone":""},"ees":{"attributes_conns":[],"cache":{"*file_csv":{"limit":-1,"precache":false,"remote":false,"replicate":false,"static_ttl":false,"ttl":"5s"}},"enabled":false,"exporters":[{"attempts":1,"attribute_context":"","attribute_ids":[],"concurrent_requests":0,"export_path":"/var/spool/cgrates/ees","failed_posts_dir":"/var/spool/cgrates/failed_posts","fields":[],"filters":[],"flags":[],"id":"*default","opts":{},"synchronous":false,"timezone":"","type":"*none"}]},"ers":{"enabled":false,"partial_cache_ttl":"1s","readers":[{"cache_dump_fields":[],"concurrent_requests":1024,"fields":[{"mandatory":true,"path":"*cgreq.ToR","tag":"ToR","type":"*variable","value":"~*req.2"},{"mandatory":true,"path":"*cgreq.OriginID","tag":"OriginID","type":"*variable","value":"~*req.3"},{"mandatory":true,"path":"*cgreq.RequestType","tag":"RequestType","type":"*variable","value":"~*req.4"},{"mandatory":true,"path":"*cgreq.Tenant","tag":"Tenant","type":"*variable","value":"~*req.6"},{"mandatory":true,"path":"*cgreq.Category","tag":"Category","type":"*variable","value":"~*req.7"},{"mandatory":true,"path":"*cgreq.Account","tag":"Account","type":"*variable","value":"~*req.8"},{"mandatory":true,"path":"*cgreq.Subject","tag":"Subject","type":"*variable","value":"~*req.9"},{"mandatory":true,"path":"*cgreq.Destination","tag":"Destination","type":"*variable","value":"~*req.10"},{"mandatory":true,"path":"*cgreq.SetupTime","tag":"SetupTime","type":"*variable","value":"~*req.11"},{"mandatory":true,"path":"*cgreq.AnswerTime","tag":"AnswerTime","type":"*variable","value":"~*req.12"},{"mandatory":true,"path":"*cgreq.Usage","tag":"Usage","type":"*variable","value":"~*req.13"}],"filters":[],"flags":[],"id":"*default","max_reconnect_interval":"5m0s","opts":{"csvFieldSeparator":",","csvHeaderDefineChar":":","csvRowLength":0,"natsSubject":"cgrates_cdrs","partialCacheAction":"*none","partialOrderField":"~*req.AnswerTime"},"partial_commit_fields":[],"processed_path":"/var/spool/cgrates/ers/out","reconnects":-1,"run_delay":"0","source_path":"/var/spool/cgrates/ers/in","tenant":"","timezone":"","type":"*none"}],"sessions_conns":["*internal"]},"filters":{"apiers_conns":[],"resources_conns":[],"stats_conns":[]},"freeswitch_agent":{"create_cdr":false,"empty_balance_ann_file":"","empty_balance_context":"","enabled":false,"event_socket_conns":[{"address":"127.0.0.1:8021","alias":"127.0.0.1:8021","max_reconnect_interval":"0s","password":"ClueCon","reconnects":5}],"extra_fields":"","low_balance_ann_file":"","max_wait_connection":"2s","sessions_conns":["*birpc_internal"],"subscribe_park":true},"general":{"connect_attempts":5,"connect_timeout":"1s","dbdata_encoding":"*msgpack","default_caching":"*reload","default_category":"call","default_request_type":"*rated","default_tenant":"cgrates.org","default_timezone":"Local","digest_equal":":","digest_separator":",","failed_posts_dir":"/var/spool/cgrates/failed_posts","failed_posts_ttl":"5s","locking_timeout":"0","log_level":6,"logger":"*syslog","max_parallel_conns":100,"max_reconnect_interval":"0","node_id":"ENGINE1","poster_attempts":3,"reconnects":-1,"reply_timeout":"2s","rounding_decimals":5,"rsr_separator":";","tpexport_dir":"/var/spool/cgrates/tpe"},"http":{"auth_users":{},"client_opts":{"dialFallbackDelay":"300ms","dialKeepAlive":"30s","dialTimeout":"30s","disableCompression":false,"disableKeepAlives":false,"expectContinueTimeout":"0s","forceAttemptHttp2":true,"idleConnTimeout":"1m30s","maxConnsPerHost":0,"maxIdleConns":100,"maxIdleConnsPerHost":2,"responseHeaderTimeout":"0s","skipTlsVerify":false,"tlsHandshakeTimeout":"10s"},"freeswitch_cdrs_url":"/freeswitch_json","http_cdrs":"/cdr_http","json_rpc_url":"/jsonrpc","registrars_url":"/registrar","use_basic_auth":false,"ws_url":"/ws"},"http_agent":[],"kamailio_agent":{"create_cdr":false,"enabled":false,"evapi_conns":[{"address":"127.0.0.1:8448","alias":"","max_reconnect_interval":"0s","reconnects":5}],"sessions_conns":["*birpc_internal"],"timezone":""},"listen":{"http":"127.0.0.1:2080","http_tls":"127.0.0.1:2280","rpc_gob":"127.0.0.1:2013","rpc_gob_tls":"127.0.0.1:2023","rpc_json":"127.0.0.1:2012","rpc_json_tls":"127.0.0.1:2022"},"loader":{"caches_conns":["*localhost"],"data_path":"./","disable_reverse":false,"field_separator":",","gapi_credentials":".gapi/credentials.json","gapi_token":".gapi/token.json","scheduler_conns":["*localhost"],"tpid":""},"loaders":[{"caches_conns":["*internal"],"data":[{"fields":[{"mandatory":true,"path":"Tenant","tag":"TenantID","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ProfileID","type":"*variable","value":"~*req.1"},{"path":"Contexts","tag":"Contexts","type":"*variable","value":"~*req.2"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.3"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.4"},{"path":"AttributeFilterIDs","tag":"AttributeFilterIDs","type":"*variable","value":"~*req.5"},{"path":"Path","tag":"Path","type":"*variable","value":"~*req.6"},{"path":"Type","tag":"Type","type":"*variable","value":"~*req.7"},{"path":"Value","tag":"Value","type":"*variable","value":"~*req.8"},{"path":"Blocker","tag":"Blocker","type":"*variable","value":"~*req.9"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.10"}],"file_name":"Attributes.csv","flags":null,"type":"*attributes"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"Type","tag":"Type","type":"*variable","value":"~*req.2"},{"path":"Element","tag":"Element","type":"*variable","value":"~*req.3"},{"path":"Values","tag":"Values","type":"*variable","value":"~*req.4"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.5"}],"file_name":"Filters.csv","flags":null,"type":"*filters"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.2"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.3"},{"path":"UsageTTL","tag":"TTL","type":"*variable","value":"~*req.4"},{"path":"Limit","tag":"Limit","type":"*variable","value":"~*req.5"},{"path":"AllocationMessage","tag":"AllocationMessage","type":"*variable","value":"~*req.6"},{"path":"Blocker","tag":"Blocker","type":"*variable","value":"~*req.7"},{"path":"Stored","tag":"Stored","type":"*variable","value":"~*req.8"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.9"},{"path":"ThresholdIDs","tag":"ThresholdIDs","type":"*variable","value":"~*req.10"}],"file_name":"Resources.csv","flags":null,"type":"*resources"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.2"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.3"},{"path":"QueueLength","tag":"QueueLength","type":"*variable","value":"~*req.4"},{"path":"TTL","tag":"TTL","type":"*variable","value":"~*req.5"},{"path":"MinItems","tag":"MinItems","type":"*variable","value":"~*req.6"},{"path":"MetricIDs","tag":"MetricIDs","type":"*variable","value":"~*req.7"},{"path":"MetricFilterIDs","tag":"MetricFilterIDs","type":"*variable","value":"~*req.8"},{"path":"Blocker","tag":"Blocker","type":"*variable","value":"~*req.9"},{"path":"Stored","tag":"Stored","type":"*variable","value":"~*req.10"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.11"},{"path":"ThresholdIDs","tag":"ThresholdIDs","type":"*variable","value":"~*req.12"}],"file_name":"Stats.csv","flags":null,"type":"*stats"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.2"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.3"},{"path":"MaxHits","tag":"MaxHits","type":"*variable","value":"~*req.4"},{"path":"MinHits","tag":"MinHits","type":"*variable","value":"~*req.5"},{"path":"MinSleep","tag":"MinSleep","type":"*variable","value":"~*req.6"},{"path":"Blocker","tag":"Blocker","type":"*variable","value":"~*req.7"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.8"},{"path":"ActionIDs","tag":"ActionIDs","type":"*variable","value":"~*req.9"},{"path":"Async","tag":"Async","type":"*variable","value":"~*req.10"}],"file_name":"Thresholds.csv","flags":null,"type":"*thresholds"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.2"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.3"},{"path":"Sorting","tag":"Sorting","type":"*variable","value":"~*req.4"},{"path":"SortingParameters","tag":"SortingParameters","type":"*variable","value":"~*req.5"},{"path":"RouteID","tag":"RouteID","type":"*variable","value":"~*req.6"},{"path":"RouteFilterIDs","tag":"RouteFilterIDs","type":"*variable","value":"~*req.7"},{"path":"RouteAccountIDs","tag":"RouteAccountIDs","type":"*variable","value":"~*req.8"},{"path":"RouteRatingPlanIDs","tag":"RouteRatingPlanIDs","type":"*variable","value":"~*req.9"},{"path":"RouteResourceIDs","tag":"RouteResourceIDs","type":"*variable","value":"~*req.10"},{"path":"RouteStatIDs","tag":"RouteStatIDs","type":"*variable","value":"~*req.11"},{"path":"RouteWeight","tag":"RouteWeight","type":"*variable","value":"~*req.12"},{"path":"RouteBlocker","tag":"RouteBlocker","type":"*variable","value":"~*req.13"},{"path":"RouteParameters","tag":"RouteParameters","type":"*variable","value":"~*req.14"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.15"}],"file_name":"Routes.csv","flags":null,"type":"*routes"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.2"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.3"},{"path":"RunID","tag":"RunID","type":"*variable","value":"~*req.4"},{"path":"AttributeIDs","tag":"AttributeIDs","type":"*variable","value":"~*req.5"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.6"}],"file_name":"Chargers.csv","flags":null,"type":"*chargers"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"Contexts","tag":"Contexts","type":"*variable","value":"~*req.2"},{"path":"FilterIDs","tag":"FilterIDs","type":"*variable","value":"~*req.3"},{"path":"ActivationInterval","tag":"ActivationInterval","type":"*variable","value":"~*req.4"},{"path":"Strategy","tag":"Strategy","type":"*variable","value":"~*req.5"},{"path":"StrategyParameters","tag":"StrategyParameters","type":"*variable","value":"~*req.6"},{"path":"ConnID","tag":"ConnID","type":"*variable","value":"~*req.7"},{"path":"ConnFilterIDs","tag":"ConnFilterIDs","type":"*variable","value":"~*req.8"},{"path":"ConnWeight","tag":"ConnWeight","type":"*variable","value":"~*req.9"},{"path":"ConnBlocker","tag":"ConnBlocker","type":"*variable","value":"~*req.10"},{"path":"ConnParameters","tag":"ConnParameters","type":"*variable","value":"~*req.11"},{"path":"Weight","tag":"Weight","type":"*variable","value":"~*req.12"}],"file_name":"DispatcherProfiles.csv","flags":null,"type":"*dispatchers"},{"fields":[{"mandatory":true,"path":"Tenant","tag":"Tenant","type":"*variable","value":"~*req.0"},{"mandatory":true,"path":"ID","tag":"ID","type":"*variable","value":"~*req.1"},{"path":"Address","tag":"Address","type":"*variable","value":"~*req.2"},{"path":"Transport","tag":"Transport","type":"*variable","value":"~*req.3"},{"path":"ConnectAttempts","tag":"ConnectAttempts","type":"*variable","value":"~*req.4"},{"path":"Reconnects","tag":"Reconnects","type":"*variable","value":"~*req.5"},{"path":"MaxReconnectInterval","tag":"MaxReconnectInterval","type":"*variable","value":"~*req.6"},{"path":"ConnectTimeout","tag":"ConnectTimeout","type":"*variable","value":"~*req.7"},{"path":"ReplyTimeout","tag":"ReplyTimeout","type":"*variable","value":"~*req.8"},{"path":"TLS","tag":"TLS","type":"*variable","value":"~*req.9"},{"path":"ClientKey","tag":"ClientKey","type":"*variable","value":"~*req.10"},{"path":"ClientCertificate","tag":"ClientCertificate","type":"*variable","value":"~*req.11"},{"path":"CaCertificate","tag":"CaCertificate","type":"*variable","value":"~*req.12"}],"file_name":"DispatcherHosts.csv","flags":null,"type":"*dispatcher_hosts"}],"dry_run":false,"enabled":false,"field_separator":",","id":"*default","lockfile_path":".cgr.lck","run_delay":"0","tenant":"","tp_in_dir":"/var/spool/cgrates/loader/in","tp_out_dir":"/var/spool/cgrates/loader/out"}],"mailer":{"auth_password":"CGRateS.org","auth_user":"cgrates","from_address":"cgr-mailer@localhost.localdomain","server":"localhost"},"migrator":{"out_datadb_encoding":"msgpack","out_datadb_host":"127.0.0.1","out_datadb_name":"10","out_datadb_opts":{"mongoQueryTimeout":"0s","redisCACertificate":"","redisClientCertificate":"","redisClientKey":"","redisCluster":false,"redisClusterOndownDelay":"0s","redisClusterSync":"5s","redisConnectAttempts":20,"redisConnectTimeout":"0s","redisMaxConns":10,"redisReadTimeout":"0s","redisSentinel":"","redisTLS":false,"redisWriteTimeout":"0s"},"out_datadb_password":"","out_datadb_port":"6379","out_datadb_type":"*redis","out_datadb_user":"cgrates","out_stordb_host":"127.0.0.1","out_stordb_name":"cgrates","out_stordb_opts":{"mongoQueryTimeout":"0s","mysqlDSNParams":null,"mysqlLocation":"","pgSSLMode":"","sqlConnMaxLifetime":"0s","sqlMaxIdleConns":0,"sqlMaxOpenConns":0},"out_stordb_password":"","out_stordb_port":"3306","out_stordb_type":"*mysql","out_stordb_user":"cgrates","users_filters":null},"radius_agent":{"client_dictionaries":{"*default":["/usr/share/cgrates/radius/dict/"]},"client_secrets":{"*default":"CGRateS.org"},"enabled":false,"listen_acct":"127.0.0.1:1813","listen_auth":"127.0.0.1:1812","listen_net":"udp","request_processors":[],"sessions_conns":["*internal"]},"rals":{"balance_rating_subject":{"*any":"*zero1ns","*voice":"*zero1s"},"enabled":false,"max_computed_usage":{"*any":"189h0m0s","*data":"107374182400","*mms":"10000","*sms":"10000","*voice":"72h0m0s"},"max_increments":1000000,"remove_expired":true,"rp_subject_prefix_matching":false,"stats_conns":[],"thresholds_conns":[]},"registrarc":{"dispatchers":{"hosts":[],"refresh_interval":"5m0s","registrars_conns":[]},"rpc":{"hosts":[],"refresh_interval":"5m0s","registrars_conns":[]}},"resources":{"enabled":false,"indexed_selects":true,"nested_fields":false,"opts":{"*units":1,"*usageID":""},"prefix_indexed_fields":[],"store_interval":"","suffix_indexed_fields":[],"thresholds_conns":[]},"routes":{"attributes_conns":[],"default_ratio":1,"enabled":false,"indexed_selects":true,"nested_fields":false,"opts":{"*context":"*routes","*ignoreErrors":false,"*maxCost":""},"prefix_indexed_fields":[],"rals_conns":[],"resources_conns":[],"stats_conns":[],"suffix_indexed_fields":[]},"rpc_conns":{"*bijson_localhost":{"conns":[{"address":"127.0.0.1:2014","transport":"*birpc_json"}],"poolSize":0,"strategy":"*first"},"*birpc_internal":{"conns":[{"address":"*birpc_internal","transport":""}],"poolSize":0,"strategy":"*first"},"*internal":{"conns":[{"address":"*internal","transport":""}],"poolSize":0,"strategy":"*first"},"*localhost":{"conns":[{"address":"127.0.0.1:2012","transport":"*json"}],"poolSize":0,"strategy":"*first"}},"schedulers":{"cdrs_conns":[],"dynaprepaid_actionplans":[],"enabled":false,"filters":[],"stats_conns":[],"thresholds_conns":[]},"sentrypeer":{"Audience":"https://sentrypeer.com/api","ClientID":"","ClientSecret":"","GrantType":"client_credentials","IpUrl":"https://sentrypeer.com/api/ip-addresses","NumberUrl":"https://sentrypeer.com/api/phone-numbers","TokenURL":"https://authz.sentrypeer.com/oauth/token"},"sessions":{"alterable_fields":[],"attributes_conns":[],"cdrs_conns":[],"channel_sync_interval":"0","chargers_conns":[],"client_protocol":1,"debit_interval":"0","default_usage":{"*any":"3h0m0s","*data":"1048576","*sms":"1","*voice":"3h0m0s"},"enabled":false,"listen_bigob":"","listen_bijson":"127.0.0.1:2014","min_dur_low_balance":"0","rals_conns":[],"replication_conns":[],"resources_conns":[],"routes_conns":[],"scheduler_conns":[],"session_indexes":[],"session_ttl":"0","stale_chan_max_extra_usage":"0","stats_conns":[],"stir":{"allowed_attest":["*any"],"default_attest":"A","payload_maxduration":"-1","privatekey_path":"","publickey_path":""},"store_session_costs":false,"terminate_attempts":5,"thresholds_conns":[]},"sip_agent":{"enabled":false,"listen":"127.0.0.1:5060","listen_net":"udp","request_processors":[],"retransmission_timer":1000000000,"sessions_conns":["*internal"],"timezone":""},"stats":{"enabled":false,"indexed_selects":true,"nested_fields":false,"opts":{"*profileIDs":[],"*profileIgnoreFilters":false},"prefix_indexed_fields":[],"store_interval":"","store_uncompressed_limit":0,"suffix_indexed_fields":[],"thresholds_conns":[]},"stor_db":{"db_host":"127.0.0.1","db_name":"cgrates","db_password":"CGRateS.org","db_port":3306,"db_type":"*mysql","db_user":"cgrates","items":{"*cdrs":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*session_costs":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_account_actions":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_action_plans":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_action_triggers":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_actions":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_attributes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_chargers":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_destination_rates":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_destinations":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_dispatcher_hosts":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_dispatcher_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_filters":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_rates":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_rating_plans":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_rating_profiles":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_resources":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_routes":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_shared_groups":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_stats":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_thresholds":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*tp_timings":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false},"*versions":{"limit":-1,"remote":false,"replicate":false,"static_ttl":false}},"opts":{"mongoQueryTimeout":"10s","mysqlDSNParams":{},"mysqlLocation":"Local","pgSSLMode":"disable","sqlConnMaxLifetime":"0s","sqlMaxIdleConns":10,"sqlMaxOpenConns":100},"prefix_indexed_fields":[],"remote_conns":null,"replication_conns":null,"string_indexed_fields":[]},"suretax":{"bill_to_number":"","business_unit":"","client_number":"","client_tracking":"~*req.CGRID","customer_number":"~*req.Subject","include_local_cost":false,"orig_number":"~*req.Subject","p2pplus4":"","p2pzipcode":"","plus4":"","regulatory_code":"03","response_group":"03","response_type":"D4","return_file_code":"0","sales_type_code":"R","tax_exemption_code_list":"","tax_included":"0","tax_situs_rule":"04","term_number":"~*req.Destination","timezone":"UTC","trans_type_code":"010101","unit_type":"00","units":"1","url":"","validation_key":"","zipcode":""},"templates":{"*asr":[{"mandatory":true,"path":"*diamreq.Session-Id","tag":"SessionId","type":"*variable","value":"~*req.Session-Id"},{"mandatory":true,"path":"*diamreq.Origin-Host","tag":"OriginHost","type":"*variable","value":"~*req.Destination-Host"},{"mandatory":true,"path":"*diamreq.Origin-Realm","tag":"OriginRealm","type":"*variable","value":"~*req.Destination-Realm"},{"mandatory":true,"path":"*diamreq.Destination-Realm","tag":"DestinationRealm","type":"*variable","value":"~*req.Origin-Realm"},{"mandatory":true,"path":"*diamreq.Destination-Host","tag":"DestinationHost","type":"*variable","value":"~*req.Origin-Host"},{"mandatory":true,"path":"*diamreq.Auth-Application-Id","tag":"AuthApplicationId","type":"*variable","value":"~*vars.*appid"}],"*cca":[{"mandatory":true,"path":"*rep.Session-Id","tag":"SessionId","type":"*variable","value":"~*req.Session-Id"},{"path":"*rep.Result-Code","tag":"ResultCode","type":"*constant","value":"2001"},{"mandatory":true,"path":"*rep.Origin-Host","tag":"OriginHost","type":"*variable","value":"~*vars.OriginHost"},{"mandatory":true,"path":"*rep.Origin-Realm","tag":"OriginRealm","type":"*variable","value":"~*vars.OriginRealm"},{"mandatory":true,"path":"*rep.Auth-Application-Id","tag":"AuthApplicationId","type":"*variable","value":"~*vars.*appid"},{"mandatory":true,"path":"*rep.CC-Request-Type","tag":"CCRequestType","type":"*variable","value":"~*req.CC-Request-Type"},{"mandatory":true,"path":"*rep.CC-Request-Number","tag":"CCRequestNumber","type":"*variable","value":"~*req.CC-Request-Number"}],"*cdrLog":[{"mandatory":true,"path":"*cdr.ToR","tag":"ToR","type":"*variable","value":"~*req.BalanceType"},{"mandatory":true,"path":"*cdr.OriginHost","tag":"OriginHost","type":"*constant","value":"127.0.0.1"},{"mandatory":true,"path":"*cdr.RequestType","tag":"RequestType","type":"*constant","value":"*none"},{"mandatory":true,"path":"*cdr.Tenant","tag":"Tenant","type":"*variable","value":"~*req.Tenant"},{"mandatory":true,"path":"*cdr.Account","tag":"Account","type":"*variable","value":"~*req.Account"},{"mandatory":true,"path":"*cdr.Subject","tag":"Subject","type":"*variable","value":"~*req.Account"},{"mandatory":true,"path":"*cdr.Cost","tag":"Cost","type":"*variable","value":"~*req.Cost"},{"mandatory":true,"path":"*cdr.Source","tag":"Source","type":"*constant","value":"*cdrLog"},{"mandatory":true,"path":"*cdr.Usage","tag":"Usage","type":"*constant","value":"1"},{"mandatory":true,"path":"*cdr.RunID","tag":"RunID","type":"*variable","value":"~*req.ActionType"},{"mandatory":true,"path":"*cdr.SetupTime","tag":"SetupTime","type":"*constant","value":"*now"},{"mandatory":true,"path":"*cdr.AnswerTime","tag":"AnswerTime","type":"*constant","value":"*now"},{"mandatory":true,"path":"*cdr.PreRated","tag":"PreRated","type":"*constant","value":"true"}],"*err":[{"mandatory":true,"path":"*rep.Session-Id","tag":"SessionId","type":"*variable","value":"~*req.Session-Id"},{"mandatory":true,"path":"*rep.Origin-Host","tag":"OriginHost","type":"*variable","value":"~*vars.OriginHost"},{"mandatory":true,"path":"*rep.Origin-Realm","tag":"OriginRealm","type":"*variable","value":"~*vars.OriginRealm"}],"*errSip":[{"mandatory":true,"path":"*rep.Request","tag":"Request","type":"*constant","value":"SIP/2.0 500 Internal Server Error"}],"*rar":[{"mandatory":true,"path":"*diamreq.Session-Id","tag":"SessionId","type":"*variable","value":"~*req.Session-Id"},{"mandatory":true,"path":"*diamreq.Origin-Host","tag":"OriginHost","type":"*variable","value":"~*req.Destination-Host"},{"mandatory":true,"path":"*diamreq.Origin-Realm","tag":"OriginRealm","type":"*variable","value":"~*req.Destination-Realm"},{"mandatory":true,"path":"*diamreq.Destination-Realm","tag":"DestinationRealm","type":"*variable","value":"~*req.Origin-Realm"},{"mandatory":true,"path":"*diamreq.Destination-Host","tag":"DestinationHost","type":"*variable","value":"~*req.Origin-Host"},{"mandatory":true,"path":"*diamreq.Auth-Application-Id","tag":"AuthApplicationId","type":"*variable","value":"~*vars.*appid"},{"path":"*diamreq.Re-Auth-Request-Type","tag":"ReAuthRequestType","type":"*constant","value":"0"}]},"thresholds":{"enabled":false,"indexed_selects":true,"nested_fields":false,"opts":{"*profileIDs":[],"*profileIgnoreFilters":false},"prefix_indexed_fields":[],"store_interval":"","suffix_indexed_fields":[]},"tls":{"ca_certificate":"","client_certificate":"","client_key":"","server_certificate":"","server_key":"","server_name":"","server_policy":4}}` if err != nil { t.Fatal(err) } @@ -5362,16 +5366,18 @@ func TestCgrCdfEventReader(t *testing.T) { SessionSConns: []string{utils.ConcatenatedKey(utils.MetaInternal, utils.MetaSessionS)}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.MetaNone, - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/ers/in", - ProcessedPath: "/var/spool/cgrates/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, @@ -5473,16 +5479,18 @@ func TestCgrCdfEventExporter(t *testing.T) { func TestCgrCfgEventReaderDefault(t *testing.T) { eCfg := &EventReaderCfg{ - ID: utils.MetaDefault, - Type: utils.MetaNone, - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/ers/in", - ProcessedPath: "/var/spool/cgrates/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, diff --git a/config/erscfg_test.go b/config/erscfg_test.go index ccbbb25bde..63fda131f4 100644 --- a/config/erscfg_test.go +++ b/config/erscfg_test.go @@ -52,26 +52,30 @@ func TestERSClone(t *testing.T) { ], "failed_calls_prefix": "randomPrefix", "partial_record_cache": "1s", - "partial_cache_expiry_action": "randomAction" - }, - ], -}, + "partial_cache_expiry_action": "randomAction", + "reconnects": 5, + "max_reconnect_interval": "3m" + } + ] +} }` expectedERsCfg := &ERsCfg{ Enabled: true, SessionSConns: []string{"*internal:*sessions"}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.MetaNone, - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/ers/in", - ProcessedPath: "/var/spool/cgrates/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, @@ -116,16 +120,18 @@ func TestERSClone(t *testing.T) { }, }, { - ID: "file_reader1", - Type: "*file_csv", - RunDelay: -1, - ConcurrentReqs: 1024, - SourcePath: "/tmp/ers/in", - ProcessedPath: "/tmp/ers/out", - Tenant: NewRSRParsersMustCompile("~*req.Destination1", utils.InfieldSep), - Timezone: utils.EmptyString, - Filters: []string{"randomFiletrs"}, - Flags: utils.FlagsWithParams{}, + ID: "file_reader1", + Type: "*file_csv", + RunDelay: -1, + ConcurrentReqs: 1024, + SourcePath: "/tmp/ers/in", + ProcessedPath: "/tmp/ers/out", + Tenant: NewRSRParsersMustCompile("~*req.Destination1", utils.InfieldSep), + Timezone: utils.EmptyString, + Filters: []string{"randomFiletrs"}, + Flags: utils.FlagsWithParams{}, + Reconnects: 5, + MaxReconnectInterval: 3 * time.Minute, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, @@ -228,16 +234,18 @@ func TestERSLoadFromjsonCfg(t *testing.T) { SessionSConns: []string{"conn1", "conn3"}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.MetaNone, - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/ers/in", - ProcessedPath: "/var/spool/cgrates/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, @@ -282,16 +290,18 @@ func TestERSLoadFromjsonCfg(t *testing.T) { }, }, { - ID: "file_reader1", - Type: utils.MetaFileCSV, - RunDelay: -1, - ConcurrentReqs: 1024, - SourcePath: "/tmp/ers/in", - ProcessedPath: "/tmp/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: "file_reader1", + Type: utils.MetaFileCSV, + RunDelay: -1, + ConcurrentReqs: 1024, + SourcePath: "/tmp/ers/in", + ProcessedPath: "/tmp/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Reconnects: 5, + MaxReconnectInterval: 3 * time.Minute, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, @@ -355,8 +365,10 @@ func TestERSLoadFromjsonCfg(t *testing.T) { "source_path": "/tmp/ers/in", "processed_path": "/tmp/ers/out", "cache_dump_fields": [], - }, - ], + "reconnects": 5, + "max_reconnect_interval": "3m" + } + ] } }` @@ -442,16 +454,18 @@ func TestERSloadFromJsonCase3(t *testing.T) { Sessions_conns: &[]string{"*conn1"}, Readers: &[]*EventReaderJsonCfg{ { - Id: utils.StringPointer("file_reader1"), - Type: utils.StringPointer(utils.MetaFileCSV), - Run_delay: utils.StringPointer("-1"), - Concurrent_requests: utils.IntPointer(1024), - Source_path: utils.StringPointer("/tmp/ers/in"), - Processed_path: utils.StringPointer("/tmp/ers/out"), - Tenant: nil, - Timezone: utils.StringPointer(""), - Filters: nil, - Flags: &[]string{}, + Id: utils.StringPointer("file_reader1"), + Type: utils.StringPointer(utils.MetaFileCSV), + Run_delay: utils.StringPointer("-1"), + Concurrent_requests: utils.IntPointer(1024), + Source_path: utils.StringPointer("/tmp/ers/in"), + Processed_path: utils.StringPointer("/tmp/ers/out"), + Tenant: nil, + Timezone: utils.StringPointer(""), + Filters: nil, + Flags: &[]string{}, + Reconnects: utils.IntPointer(5), + Max_reconnect_interval: utils.StringPointer("3m"), Fields: &[]*FcTemplateJsonCfg{ { Tag: utils.StringPointer(utils.AnswerTime), @@ -469,16 +483,18 @@ func TestERSloadFromJsonCase3(t *testing.T) { SessionSConns: []string{"*conn1"}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.MetaNone, - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/ers/in", - ProcessedPath: "/var/spool/cgrates/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, @@ -541,8 +557,10 @@ func TestERSloadFromJsonCase3(t *testing.T) { Layout: time.RFC3339, }, }, - CacheDumpFields: make([]*FCTemplate, 0), - PartialCommitFields: make([]*FCTemplate, 0), + CacheDumpFields: make([]*FCTemplate, 0), + PartialCommitFields: make([]*FCTemplate, 0), + Reconnects: 5, + MaxReconnectInterval: 3 * time.Minute, Opts: &EventReaderOpts{ CSV: &CSVROpts{ FieldSeparator: utils.StringPointer(utils.FieldsSep), @@ -591,17 +609,19 @@ func TestERSloadFromJsonCase4(t *testing.T) { Sessions_conns: &[]string{"*conn1"}, Readers: &[]*EventReaderJsonCfg{ { - Id: utils.StringPointer("file_reader1"), - Type: utils.StringPointer(utils.MetaFileCSV), - Run_delay: utils.StringPointer("-1"), - Concurrent_requests: utils.IntPointer(1024), - Source_path: utils.StringPointer("/tmp/ers/in"), - Processed_path: utils.StringPointer("/tmp/ers/out"), - Tenant: nil, - Timezone: utils.StringPointer(""), - Filters: nil, - Flags: &[]string{}, - Fields: &[]*FcTemplateJsonCfg{}, + Id: utils.StringPointer("file_reader1"), + Type: utils.StringPointer(utils.MetaFileCSV), + Run_delay: utils.StringPointer("-1"), + Concurrent_requests: utils.IntPointer(1024), + Source_path: utils.StringPointer("/tmp/ers/in"), + Processed_path: utils.StringPointer("/tmp/ers/out"), + Tenant: nil, + Timezone: utils.StringPointer(""), + Filters: nil, + Flags: &[]string{}, + Fields: &[]*FcTemplateJsonCfg{}, + Reconnects: utils.IntPointer(5), + Max_reconnect_interval: utils.StringPointer("3m"), Cache_dump_fields: &[]*FcTemplateJsonCfg{ { Tag: utils.StringPointer("OriginID"), @@ -618,16 +638,18 @@ func TestERSloadFromJsonCase4(t *testing.T) { SessionSConns: []string{"*conn1"}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.MetaNone, - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/ers/in", - ProcessedPath: "/var/spool/cgrates/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, @@ -691,7 +713,9 @@ func TestERSloadFromJsonCase4(t *testing.T) { Value: NewRSRParsersMustCompile("~*req.OrderID", utils.InfieldSep), }, }, - PartialCommitFields: make([]*FCTemplate, 0), + PartialCommitFields: make([]*FCTemplate, 0), + Reconnects: 5, + MaxReconnectInterval: 3 * time.Minute, Opts: &EventReaderOpts{ CSV: &CSVROpts{ FieldSeparator: utils.StringPointer(utils.FieldsSep), @@ -762,16 +786,18 @@ func TestEventReaderSameID(t *testing.T) { SessionSConns: []string{"conn1"}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.MetaNone, - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/ers/in", - ProcessedPath: "/var/spool/cgrates/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, @@ -830,8 +856,10 @@ func TestEventReaderSameID(t *testing.T) { {Tag: "CustomTag2", Path: "CustomPath2", Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("CustomValue2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, }, - CacheDumpFields: make([]*FCTemplate, 0), - PartialCommitFields: make([]*FCTemplate, 0), + CacheDumpFields: make([]*FCTemplate, 0), + PartialCommitFields: make([]*FCTemplate, 0), + Reconnects: 5, + MaxReconnectInterval: 3 * time.Minute, Opts: &EventReaderOpts{ CSV: &CSVROpts{ FieldSeparator: utils.StringPointer(utils.FieldsSep), @@ -885,6 +913,8 @@ func TestEventReaderSameID(t *testing.T) { {"tag": "CustomTag2", "path": "CustomPath2", "type": "*variable", "value": "CustomValue2", "mandatory": true}, ], "cache_dump_fields": [], + "reconnects": 5, + "max_reconnect_interval": "3m0s" }, ], } @@ -900,39 +930,43 @@ func TestEventReaderSameID(t *testing.T) { func TestERsCfgAsMapInterfaceCase1(t *testing.T) { cfgJSONStr := `{ - "ers": { - "enabled": true, - "sessions_conns":["conn1","conn3"], - "readers": [ - { - "id": "file_reader1", - "run_delay": "-1", - "tenant": "~*req.Destination1", - "type": "*file_csv", - "source_path": "/tmp/ers/in", - "processed_path": "/tmp/ers/out", - "cache_dump_fields": [], - }, - ], - } +"ers": { + "enabled": true, + "sessions_conns":["conn1","conn3"], + "readers": [ + { + "id": "file_reader1", + "run_delay": "-1", + "tenant": "~*req.Destination1", + "type": "*file_csv", + "source_path": "/tmp/ers/in", + "processed_path": "/tmp/ers/out", + "cache_dump_fields": [], + "reconnects": 5, + "max_reconnect_interval": "3m" + } + ] +} }` eMap := map[string]any{ utils.EnabledCfg: true, utils.SessionSConnsCfg: []string{"conn1", "conn3"}, utils.ReadersCfg: []map[string]any{ { - utils.FiltersCfg: []string{}, - utils.FlagsCfg: []string{}, - utils.IDCfg: "*default", - utils.ProcessedPathCfg: "/var/spool/cgrates/ers/out", - utils.RunDelayCfg: "0", - utils.SourcePathCfg: "/var/spool/cgrates/ers/in", - utils.TenantCfg: "", - utils.TimezoneCfg: "", - utils.CacheDumpFieldsCfg: []map[string]any{}, - utils.PartialCommitFieldsCfg: []map[string]any{}, - utils.ConcurrentRequestsCfg: 1024, - utils.TypeCfg: "*none", + utils.FiltersCfg: []string{}, + utils.FlagsCfg: []string{}, + utils.IDCfg: "*default", + utils.ProcessedPathCfg: "/var/spool/cgrates/ers/out", + utils.RunDelayCfg: "0", + utils.SourcePathCfg: "/var/spool/cgrates/ers/in", + utils.TenantCfg: "", + utils.TimezoneCfg: "", + utils.CacheDumpFieldsCfg: []map[string]any{}, + utils.PartialCommitFieldsCfg: []map[string]any{}, + utils.ConcurrentRequestsCfg: 1024, + utils.TypeCfg: "*none", + utils.ReconnectsCfg: -1, + utils.MaxReconnectIntervalCfg: "5m0s", utils.FieldsCfg: []map[string]any{ {utils.MandatoryCfg: true, utils.PathCfg: "*cgreq.ToR", utils.TagCfg: "ToR", utils.TypeCfg: "*variable", utils.ValueCfg: "~*req.2"}, {utils.MandatoryCfg: true, utils.PathCfg: "*cgreq.OriginID", utils.TagCfg: "OriginID", utils.TypeCfg: "*variable", utils.ValueCfg: "~*req.3"}, @@ -973,14 +1007,16 @@ func TestERsCfgAsMapInterfaceCase1(t *testing.T) { {utils.MandatoryCfg: true, utils.PathCfg: "*cgreq.AnswerTime", utils.TagCfg: "AnswerTime", utils.TypeCfg: "*variable", utils.ValueCfg: "~*req.12"}, {utils.MandatoryCfg: true, utils.PathCfg: "*cgreq.Usage", utils.TagCfg: "Usage", utils.TypeCfg: "*variable", utils.ValueCfg: "~*req.13"}, }, - utils.FiltersCfg: []string{}, - utils.FlagsCfg: []string{}, - utils.IDCfg: "file_reader1", - utils.ProcessedPathCfg: "/tmp/ers/out", - utils.RunDelayCfg: "-1", - utils.SourcePathCfg: "/tmp/ers/in", - utils.TenantCfg: "~*req.Destination1", - utils.TimezoneCfg: "", + utils.FiltersCfg: []string{}, + utils.FlagsCfg: []string{}, + utils.IDCfg: "file_reader1", + utils.ProcessedPathCfg: "/tmp/ers/out", + utils.RunDelayCfg: "-1", + utils.SourcePathCfg: "/tmp/ers/in", + utils.TenantCfg: "~*req.Destination1", + utils.TimezoneCfg: "", + utils.ReconnectsCfg: 5, + utils.MaxReconnectIntervalCfg: "3m0s", utils.OptsCfg: map[string]any{ "csvFieldSeparator": ",", "csvHeaderDefineChar": ":", @@ -1085,18 +1121,20 @@ func TestERSCfgAsMapInterfaceCase2(t *testing.T) { utils.SessionSConnsCfg: []string{"conn1", "conn3"}, utils.ReadersCfg: []map[string]any{ { - utils.FiltersCfg: []string{}, - utils.FlagsCfg: []string{}, - utils.IDCfg: "*default", - utils.ProcessedPathCfg: "/var/spool/cgrates/ers/out", - utils.RunDelayCfg: "0", - utils.SourcePathCfg: "/var/spool/cgrates/ers/in", - utils.TenantCfg: "", - utils.TimezoneCfg: "", - utils.CacheDumpFieldsCfg: []map[string]any{}, - utils.PartialCommitFieldsCfg: []map[string]any{}, - utils.ConcurrentRequestsCfg: 1024, - utils.TypeCfg: "*none", + utils.FiltersCfg: []string{}, + utils.FlagsCfg: []string{}, + utils.IDCfg: "*default", + utils.ProcessedPathCfg: "/var/spool/cgrates/ers/out", + utils.RunDelayCfg: "0", + utils.SourcePathCfg: "/var/spool/cgrates/ers/in", + utils.TenantCfg: "", + utils.TimezoneCfg: "", + utils.CacheDumpFieldsCfg: []map[string]any{}, + utils.PartialCommitFieldsCfg: []map[string]any{}, + utils.ConcurrentRequestsCfg: 1024, + utils.TypeCfg: "*none", + utils.ReconnectsCfg: -1, + utils.MaxReconnectIntervalCfg: "5m0s", utils.FieldsCfg: []map[string]any{ {utils.MandatoryCfg: true, utils.PathCfg: "*cgreq.ToR", utils.TagCfg: "ToR", utils.TypeCfg: "*variable", utils.ValueCfg: "~*req.2"}, {utils.MandatoryCfg: true, utils.PathCfg: "*cgreq.OriginID", utils.TagCfg: "OriginID", utils.TypeCfg: "*variable", utils.ValueCfg: "~*req.3"}, @@ -1139,14 +1177,16 @@ func TestERSCfgAsMapInterfaceCase2(t *testing.T) { {utils.MandatoryCfg: true, utils.PathCfg: "*cgreq.AnswerTime", utils.TagCfg: "AnswerTime", utils.TypeCfg: "*variable", utils.ValueCfg: "~*req.12"}, {utils.MandatoryCfg: true, utils.PathCfg: "*cgreq.Usage", utils.TagCfg: "Usage", utils.TypeCfg: "*variable", utils.ValueCfg: "~*req.13"}, }, - utils.FiltersCfg: []string{"randomFilter"}, - utils.FlagsCfg: []string{"randomFlag"}, - utils.IDCfg: "file_reader1", - utils.ProcessedPathCfg: "/tmp/ers/out", - utils.RunDelayCfg: "10s", - utils.SourcePathCfg: "/tmp/ers/in", - utils.TenantCfg: "~*req.Destination1", - utils.TimezoneCfg: "", + utils.FiltersCfg: []string{"randomFilter"}, + utils.FlagsCfg: []string{"randomFlag"}, + utils.IDCfg: "file_reader1", + utils.ProcessedPathCfg: "/tmp/ers/out", + utils.RunDelayCfg: "10s", + utils.SourcePathCfg: "/tmp/ers/in", + utils.TenantCfg: "~*req.Destination1", + utils.TimezoneCfg: "", + utils.ReconnectsCfg: -1, + utils.MaxReconnectIntervalCfg: "5m0s", utils.OptsCfg: map[string]any{ utils.CSVLazyQuotes: false, utils.KafkaGroupID: "test", @@ -1226,16 +1266,18 @@ func TestERsloadFromJsonCfg(t *testing.T) { Sessions_conns: &[]string{"*conn1"}, Readers: &[]*EventReaderJsonCfg{ { - Id: utils.StringPointer("file_reader1"), - Type: utils.StringPointer(utils.MetaFileCSV), - Run_delay: utils.StringPointer("-1"), - Concurrent_requests: utils.IntPointer(1024), - Source_path: utils.StringPointer("/tmp/ers/in"), - Processed_path: utils.StringPointer("/tmp/ers/out"), - Tenant: nil, - Timezone: utils.StringPointer(""), - Filters: nil, - Flags: &[]string{}, + Id: utils.StringPointer("file_reader1"), + Type: utils.StringPointer(utils.MetaFileCSV), + Run_delay: utils.StringPointer("-1"), + Concurrent_requests: utils.IntPointer(1024), + Source_path: utils.StringPointer("/tmp/ers/in"), + Processed_path: utils.StringPointer("/tmp/ers/out"), + Tenant: nil, + Timezone: utils.StringPointer(""), + Filters: nil, + Flags: &[]string{}, + Reconnects: utils.IntPointer(-1), + Max_reconnect_interval: utils.StringPointer("5m"), Fields: &[]*FcTemplateJsonCfg{ { Tag: utils.StringPointer(utils.CGRID), @@ -1254,16 +1296,18 @@ func TestERsloadFromJsonCfg(t *testing.T) { SessionSConns: []string{"*conn1"}, Readers: []*EventReaderCfg{ { - ID: utils.MetaDefault, - Type: utils.MetaNone, - RunDelay: 0, - ConcurrentReqs: 1024, - SourcePath: "/var/spool/cgrates/ers/in", - ProcessedPath: "/var/spool/cgrates/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: utils.MetaDefault, + Type: utils.MetaNone, + RunDelay: 0, + ConcurrentReqs: 1024, + SourcePath: "/var/spool/cgrates/ers/in", + ProcessedPath: "/var/spool/cgrates/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Fields: []*FCTemplate{ {Tag: utils.ToR, Path: utils.MetaCgreq + utils.NestingSep + utils.ToR, Type: utils.MetaVariable, Value: NewRSRParsersMustCompile("~*req.2", utils.InfieldSep), Mandatory: true, Layout: time.RFC3339}, @@ -1308,16 +1352,18 @@ func TestERsloadFromJsonCfg(t *testing.T) { }, }, { - ID: "file_reader1", - Type: utils.MetaFileCSV, - RunDelay: -1, - ConcurrentReqs: 1024, - SourcePath: "/tmp/ers/in", - ProcessedPath: "/tmp/ers/out", - Tenant: nil, - Timezone: utils.EmptyString, - Filters: []string{}, - Flags: utils.FlagsWithParams{}, + ID: "file_reader1", + Type: utils.MetaFileCSV, + RunDelay: -1, + ConcurrentReqs: 1024, + SourcePath: "/tmp/ers/in", + ProcessedPath: "/tmp/ers/out", + Tenant: nil, + Timezone: utils.EmptyString, + Filters: []string{}, + Flags: utils.FlagsWithParams{}, + Reconnects: -1, + MaxReconnectInterval: 5 * time.Minute, Fields: []*FCTemplate{ { Tag: utils.CGRID, @@ -1619,17 +1665,19 @@ func TestEventReaderCfgAsMapInterface(t *testing.T) { utils.AMQPPasswordProcessedCfg: str, } exp := map[string]any{ - utils.IDCfg: er.ID, - utils.TypeCfg: er.Type, - utils.ConcurrentRequestsCfg: er.ConcurrentReqs, - utils.SourcePathCfg: er.SourcePath, - utils.ProcessedPathCfg: er.ProcessedPath, - utils.TenantCfg: er.Tenant.GetRule(""), - utils.TimezoneCfg: er.Timezone, - utils.FiltersCfg: er.Filters, - utils.FlagsCfg: []string{}, - utils.RunDelayCfg: "0", - utils.OptsCfg: opts, + utils.IDCfg: er.ID, + utils.TypeCfg: er.Type, + utils.ConcurrentRequestsCfg: er.ConcurrentReqs, + utils.SourcePathCfg: er.SourcePath, + utils.ProcessedPathCfg: er.ProcessedPath, + utils.TenantCfg: er.Tenant.GetRule(""), + utils.TimezoneCfg: er.Timezone, + utils.FiltersCfg: er.Filters, + utils.FlagsCfg: []string{}, + utils.RunDelayCfg: "0", + utils.ReconnectsCfg: 0, + utils.MaxReconnectIntervalCfg: "0", + utils.OptsCfg: opts, } rcv := er.AsMapInterface("") diff --git a/ers/amqp.go b/ers/amqp.go index 610009c60b..195d340a75 100644 --- a/ers/amqp.go +++ b/ers/amqp.go @@ -20,9 +20,12 @@ package ers import ( "encoding/json" + "errors" "fmt" + "sync" "time" + "github.com/cgrates/birpc/context" "github.com/cgrates/cgrates/agents" "github.com/cgrates/cgrates/config" "github.com/cgrates/cgrates/ees" @@ -31,55 +34,56 @@ import ( amqp "github.com/rabbitmq/amqp091-go" ) -// NewAMQPER return a new amqp event reader -func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, - rdrEvents, partialEvents chan *erEvent, rdrErr chan error, - fltrS *engine.FilterS, rdrExit chan struct{}) (er EventReader, err error) { - rdr := &AMQPER{ - cgrCfg: cfg, - cfgIdx: cfgIdx, - fltrS: fltrS, - rdrEvents: rdrEvents, - partialEvents: partialEvents, - rdrExit: rdrExit, - rdrErr: rdrErr, - } - if concReq := rdr.Config().ConcurrentReqs; concReq != -1 { - rdr.cap = make(chan struct{}, concReq) - for i := 0; i < concReq; i++ { - rdr.cap <- struct{}{} - } - } - rdr.dialURL = rdr.Config().SourcePath - rdr.setOpts(rdr.Config().Opts) - rdr.createPoster() - return rdr, nil -} - -// AMQPER implements EventReader interface for amqp message +// AMQPER implements EventReader interface for AMQP messaging. type AMQPER struct { - // sync.RWMutex cgrCfg *config.CGRConfig - cfgIdx int // index of config instance within ERsCfg.Readers + cfgIdx int // index of current config instance within ERsCfg.Readers fltrS *engine.FilterS - dialURL string - queueID string - tag string + eventChan chan *erEvent // channel to dispatch the events created + partialEvChan chan *erEvent // channel to dispatch the partial events created + errChan chan error + + poster *ees.AMQPee + client *amqpClient // AMQP client for managing connections and subscriptions. +} + +type amqpClient struct { exchange string exchangeType string routingKey string + queueID string + consumer string - rdrEvents chan *erEvent // channel to dispatch the events created to - partialEvents chan *erEvent // channel to dispatch the partial events created to - rdrExit chan struct{} - rdrErr chan error - cap chan struct{} + connection *amqp.Connection + channel *amqp.Channel + available bool // indicates if the AMQP channel has been established and is ready for use. + done chan struct{} // done signals the shutdown of the AMQP connection. + mu sync.RWMutex + notifyConnClose chan *amqp.Error + notifyChanClose chan *amqp.Error - conn *amqp.Connection - channel *amqp.Channel + // prefetchCount defines the maximum number of messages that the server will + // deliver to the consumer without waiting for acknowledgements. It's used to + // control the QoS settings for the AMQP channel. + prefetchCount int +} - poster *ees.AMQPee +// NewAMQPER returns a new AMQP EventReader with the provided configurations. +func NewAMQPER(cfg *config.CGRConfig, cfgIdx int, + eventChan, partialEvChan chan *erEvent, errChan chan error, + fltrS *engine.FilterS, exitChan chan struct{}) (EventReader, error) { + rdr := &AMQPER{ + cgrCfg: cfg, + cfgIdx: cfgIdx, + fltrS: fltrS, + eventChan: eventChan, + partialEvChan: partialEvChan, + errChan: errChan, + } + rdr.createClient(rdr.Config().Opts.AMQP, exitChan) + rdr.createPoster() + return rdr, nil } // Config returns the curent configuration @@ -87,202 +91,433 @@ func (rdr *AMQPER) Config() *config.EventReaderCfg { return rdr.cgrCfg.ERsCfg().Readers[rdr.cfgIdx] } -// Serve will start the gorutines needed to watch the amqp topic -func (rdr *AMQPER) Serve() (err error) { - if rdr.conn, err = amqp.Dial(rdr.dialURL); err != nil { - return +// createClient initializes the AMQP client with the necessary configurations. +func (rdr *AMQPER) createClient(opts *config.AMQPROpts, exitChan chan struct{}) { + rdrCfg := rdr.Config() + rdr.client = &amqpClient{ + prefetchCount: rdrCfg.ConcurrentReqs, + done: exitChan, } - if rdr.channel, err = rdr.conn.Channel(); err != nil { - rdr.close() - return + if opts != nil { + if opts.QueueID != nil { + rdr.client.queueID = *opts.QueueID + } + if opts.ConsumerTag != nil { + rdr.client.consumer = *opts.ConsumerTag + } + if opts.RoutingKey != nil { + rdr.client.routingKey = *opts.RoutingKey + } + if opts.Exchange != nil { + rdr.client.exchange = *opts.Exchange + } + if opts.ExchangeType != nil { + rdr.client.exchangeType = *opts.ExchangeType + } } - if rdr.Config().RunDelay == time.Duration(0) { // 0 disables the automatic read, maybe done per API + go rdr.client.handleReconnect(rdrCfg.SourcePath, rdrCfg.ID, + rdrCfg.Reconnects, rdrCfg.MaxReconnectInterval) +} + +func (rdr *AMQPER) createPoster() { + processedOpt := getProcessedOptions(rdr.Config().Opts) + if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { return } + eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), + rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) + rdr.poster = ees.NewAMQPee(eeCfg, nil) +} + +func (rdr *AMQPER) processMessage(msg []byte) error { + var decodedMessage map[string]any + err := json.Unmarshal(msg, &decodedMessage) + if err != nil { + return err + } - if rdr.exchange != "" { - if err = rdr.channel.ExchangeDeclare( - rdr.exchange, // name - rdr.exchangeType, // type - true, // durable - false, // audo-delete - false, // internal - false, // no-wait - nil, // args - ); err != nil { - return - } + agReq := agents.NewAgentRequest( + utils.MapStorage(decodedMessage), nil, + nil, nil, nil, rdr.Config().Tenant, + rdr.cgrCfg.GeneralCfg().DefaultTenant, + utils.FirstNonEmpty(rdr.Config().Timezone, + rdr.cgrCfg.GeneralCfg().DefaultTimezone), + rdr.fltrS, nil) // create an AgentRequest + pass, err := rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, + agReq) + if err != nil || !pass { + return err + } + if err = agReq.SetFields(rdr.Config().Fields); err != nil { + return err + } + cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) + rdrEv := rdr.eventChan + if _, isPartial := cgrEv.APIOpts[utils.PartialOpt]; isPartial { + rdrEv = rdr.partialEvChan + } + rdrEv <- &erEvent{ + cgrEvent: cgrEv, + rdrCfg: rdr.Config(), } + return nil +} - if _, err = rdr.channel.QueueDeclare( - rdr.queueID, // name - true, // durable - false, // auto-delete - false, // exclusive - false, // no-wait - nil, // args - ); err != nil { - return +// Serve starts the goroutine needed to monitor and process delieveries coming from the AMQP queue. +func (rdr *AMQPER) Serve() error { + rdrCfg := rdr.Config() + if rdrCfg.RunDelay == time.Duration(0) { + return nil } - if rdr.exchange != "" { - if err = rdr.channel.QueueBind( - rdr.queueID, // queue - rdr.routingKey, // key - rdr.exchange, // exchange - false, // no-wait - nil, // args - ); err != nil { - return + // Wait for the client to be ready + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + + fib := utils.FibDuration(time.Millisecond, 0) + for !rdr.client.isAvailable() { + select { + case <-ctx.Done(): + return fmt.Errorf("client not ready to start consuming, error: %w", ctx.Err()) + default: + time.Sleep(fib()) } } - var msgChan <-chan amqp.Delivery - if msgChan, err = rdr.channel.Consume(rdr.queueID, rdr.tag, - false, false, false, true, nil); err != nil { - return + // Start consuming messages from the queue. + deliveries, err := rdr.client.Consume() + if err != nil { + return fmt.Errorf("could not start consuming: %w", err) } - go rdr.readLoop(msgChan) // read until the connection is closed - return + + // Log the setup complete message here + utils.Logger.Info(fmt.Sprintf( + "<%s> Reader '%s' - AMQP channel setup complete", + utils.ERs, rdr.Config().ID)) + + // This channel will receive a notification when a channel closed event + // happens. This must be different from Client.notifyChanClose because the + // library sends only one notification and Client.notifyChanClose already has + // a receiver in handleReconnect(). + chClosedCh := make(chan *amqp.Error, 1) // Buffered to avoid deadlocks. + rdr.client.channel.NotifyClose(chClosedCh) + + go rdr.monitorAndProcess(deliveries, chClosedCh) + + return nil } -func (rdr *AMQPER) readLoop(msgChan <-chan amqp.Delivery) { +// monitorAndProcess manages the message processing loop for AMQP events. +// It handles reconnection logic in case the AMQP channel closes unexpectedly. +func (rdr *AMQPER) monitorAndProcess(deliveries <-chan amqp.Delivery, chClosedCh chan *amqp.Error) { + + // Initialize a Fibonacci backoff strategy to progressively wait longer + // between reconnection attempts, avoiding unnecessary load. + fib := utils.FibDuration(time.Second, rdr.Config().MaxReconnectInterval) + for { - if rdr.Config().ConcurrentReqs != -1 { - <-rdr.cap // do not try to read if the limit is reached - } select { - case <-rdr.rdrExit: - utils.Logger.Info( - fmt.Sprintf("<%s> stop monitoring amqp path <%s>", - utils.ERs, rdr.dialURL)) + case <-rdr.client.done: rdr.close() return - case msg, ok := <-msgChan: - if !ok { - utils.Logger.Warning( - fmt.Sprintf("<%s> lost connection to AMQP server at %s, closing reader...", - utils.ERs, rdr.dialURL)) - rdr.close() - return + case amqErr := <-chClosedCh: + + // This case handles the event of closed channel (e.g. abnormal shutdown). The if + // condition is there to make sure it is logged only the first time. + if amqErr != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader '%s', AMQP Channel closed due to error: %v", + utils.ERs, rdr.Config().ID, amqErr)) } - go func(msg amqp.Delivery) { - err := rdr.processMessage(msg.Body) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> processing message %s error: %s", - utils.ERs, msg.MessageId, err.Error())) - - err = msg.Reject(true) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error negatively acknowledging message %s: %s", - utils.ERs, msg.MessageId, err.Error())) - } + + // Attempt to re-establish the delivery channel to continue receiving messages. + var err error + deliveries, err = rdr.client.Consume() + if err != nil { + + // If the AMQP channel is not ready, it will continue the loop. Next + // iteration will enter this case because chClosedCh is closed by the + // library. + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader %s, failed to init deliveries channel, will retry. Error: %v", + utils.ERs, rdr.Config().ID, err)) + + // Wait for either the backoff duration or a done signal, making sure + // to gracefully shutdown when the client is done. + select { + case <-rdr.client.done: + rdr.close() return + case <-time.After(fib()): } + continue + } - // Post the message if poster is available. - if rdr.poster != nil { - err = ees.ExportWithAttempts(rdr.poster, msg.Body, utils.EmptyString) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> writing message %s error: %s", - utils.ERs, msg.MessageId, err.Error())) + // Successfully reconnected; reset the Fibonacci backoff counter. + fib = utils.FibDuration(time.Second, rdr.Config().MaxReconnectInterval) - } - } + // Re-set channel to receive notifications. + // The library closes this channel after abnormal shutdown. + chClosedCh = make(chan *amqp.Error, 1) + rdr.client.channel.NotifyClose(chClosedCh) - err = msg.Ack(false) - if err != nil { - utils.Logger.Warning( - fmt.Sprintf("<%s> error acknowledging message %s: %s", - utils.ERs, msg.MessageId, err.Error())) - } - if rdr.Config().ConcurrentReqs != -1 { - rdr.cap <- struct{}{} - } - }(msg) + case delivery, ok := <-deliveries: + if !ok { + + // If the deliveries channel is closed, reset it to nil to stop processing + // until it is reinitialized. + deliveries = nil + continue + } + go rdr.handleDelivery(delivery) } } } -func (rdr *AMQPER) processMessage(msg []byte) (err error) { - var decodedMessage map[string]any - if err = json.Unmarshal(msg, &decodedMessage); err != nil { - return - } - agReq := agents.NewAgentRequest( - utils.MapStorage(decodedMessage), nil, - nil, nil, nil, rdr.Config().Tenant, - rdr.cgrCfg.GeneralCfg().DefaultTenant, - utils.FirstNonEmpty(rdr.Config().Timezone, - rdr.cgrCfg.GeneralCfg().DefaultTimezone), - rdr.fltrS, nil) // create an AgentRequest - var pass bool - if pass, err = rdr.fltrS.Pass(agReq.Tenant, rdr.Config().Filters, - agReq); err != nil || !pass { +// handleDelivery processes a single message delivery. +func (rdr *AMQPER) handleDelivery(dlv amqp.Delivery) { + err := rdr.processMessage(dlv.Body) + if err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader %s, processing message %s error: %v", + utils.ERs, rdr.Config().ID, dlv.MessageId, err)) + + err = dlv.Reject(false) + if err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader %s, error negatively acknowledging message %s: %v", + utils.ERs, rdr.Config().ID, dlv.MessageId, err)) + } return } - if err = agReq.SetFields(rdr.Config().Fields); err != nil { - return + + if rdr.poster != nil { + err = ees.ExportWithAttempts(rdr.poster, dlv.Body, utils.EmptyString) + if err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader %s, writing message %s error: %v", + utils.ERs, rdr.Config().ID, dlv.MessageId, err)) + + } } - cgrEv := utils.NMAsCGREvent(agReq.CGRRequest, agReq.Tenant, utils.NestingSep, agReq.Opts) - rdrEv := rdr.rdrEvents - if _, isPartial := cgrEv.APIOpts[utils.PartialOpt]; isPartial { - rdrEv = rdr.partialEvents + err = dlv.Ack(false) + if err != nil { + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader %s, error acknowledging message %s: %v", + utils.ERs, rdr.Config().ID, dlv.MessageId, err)) } - rdrEv <- &erEvent{ - cgrEvent: cgrEv, - rdrCfg: rdr.Config(), +} + +func (rdr *AMQPER) close() error { + utils.Logger.Info(fmt.Sprintf( + "<%s> Reader %s, stop monitoring amqp queue <%s>", utils.ERs, rdr.Config().ID, rdr.Config().SourcePath)) + if rdr.poster != nil { + rdr.poster.Close() } - return + return rdr.client.Close() } -func (rdr *AMQPER) setOpts(opts *config.EventReaderOpts) { - rdr.queueID = utils.DefaultQueueID - if amqpOpts := opts.AMQP; amqpOpts != nil { - if opts.AMQP.QueueID != nil { - rdr.queueID = *opts.AMQP.QueueID - } - rdr.tag = utils.AMQPDefaultConsumerTag - if opts.AMQP.ConsumerTag != nil { - rdr.tag = *opts.AMQP.ConsumerTag +// Close will cleanly shut down the channel and connection. +func (client *amqpClient) Close() error { + if !client.isAvailable() { + return errors.New("already closed: not connected to the server") + } + + var err error + if client.channel != nil { + if err = client.channel.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) { + return fmt.Errorf("failed to close AMQP client channel: %w", err) } - if opts.AMQP.RoutingKey != nil { - rdr.routingKey = *opts.AMQP.RoutingKey + } + if client.connection != nil { + if err = client.connection.Close(); err != nil && !errors.Is(err, amqp.ErrClosed) { + return fmt.Errorf("failed to close AMQP client connection: %w", err) } - if opts.AMQP.Exchange != nil { - rdr.exchange = *opts.AMQP.Exchange - rdr.exchangeType = utils.DefaultExchangeType + } + client.mu.Lock() + client.available = false + client.mu.Unlock() + return nil +} + +// handleReconnect will wait for a connection error on +// notifyConnClose, and then continuously attempt to reconnect. +func (client *amqpClient) handleReconnect(addr, readerID string, maxRetries int, + maxReconnectInterval time.Duration) { + + fib := utils.FibDuration(time.Second, maxReconnectInterval) + retryCount := 0 + + for retryCount < maxRetries || maxRetries == -1 { // if maxRetries is -1, retry indefinitely + client.mu.Lock() + client.available = false + client.mu.Unlock() + + // Establish an AMQP connection. + conn, err := amqp.Dial(addr) + if err != nil { + reconnectDelay := fib() + retryCount++ + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader %s, failed to connect to AMQP server '%s', will retry. Error: %v", + utils.ERs, readerID, addr, err)) + + select { + case <-client.done: + return + case <-time.After(reconnectDelay): + } + continue } - if opts.AMQP.ExchangeType != nil { - rdr.exchangeType = *opts.AMQP.ExchangeType + + // Take a new connection to the queue, and update + // the close listener to reflect this. + client.connection = conn + client.notifyConnClose = make(chan *amqp.Error, 1) + client.connection.NotifyClose(client.notifyConnClose) + + // Reset the fibonacci sequence and retry count after a successful connection. + fib = utils.FibDuration(time.Second, maxReconnectInterval) + retryCount = 0 + + if done := client.handleReInit(conn, readerID, maxRetries, maxReconnectInterval); done { + break } } } -func (rdr *AMQPER) close() (err error) { - if rdr.poster != nil { - rdr.poster.Close() - } - if rdr.channel != nil { - if err = rdr.channel.Cancel(rdr.tag, true); err != nil { - return +// handleReconnect will wait for a channel error +// and then continuously attempt to re-initialize both channels +func (client *amqpClient) handleReInit(conn *amqp.Connection, readerID string, maxRetries int, maxReInitInterval time.Duration) bool { + fib := utils.FibDuration(time.Second, maxReInitInterval) + retryCount := 0 + for retryCount < maxRetries || maxRetries == -1 { // if maxRetries is -1, retry indefinitely + client.mu.Lock() + client.available = false + client.mu.Unlock() + err := client.init(conn) + if err != nil { + reInitDelay := fib() + retryCount++ + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader %s, channel init failed, will retry. Error: %v", + utils.ERs, readerID, err)) + + select { + case <-client.done: + return true + case <-client.notifyConnClose: + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader %s, connection closed, will attempt reconnect. Error: %v", + utils.ERs, readerID, err)) + return false + case <-time.After(reInitDelay): + } + continue } - if err = rdr.channel.Close(); err != nil { - return + + // Reset the fibonacci sequence and retry count after a successful init. + fib = utils.FibDuration(time.Second, maxReInitInterval) + retryCount = 0 + + select { + case <-client.done: + return true + case err := <-client.notifyConnClose: + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader %s, connection closed, will atempt to reconnect. Error: %v", + utils.ERs, readerID, err)) + return false + case err := <-client.notifyChanClose: + utils.Logger.Warning(fmt.Sprintf( + "<%s> Reader %s, channel closed, will attempt re-init. Error: %v", + utils.ERs, readerID, err)) } } - return rdr.conn.Close() + return true } -func (rdr *AMQPER) createPoster() { - processedOpt := getProcessedOptions(rdr.Config().Opts) - if processedOpt == nil && len(rdr.Config().ProcessedPath) == 0 { - return +// init will initialize channel & declare queue +func (client *amqpClient) init(conn *amqp.Connection) error { + ch, err := conn.Channel() + if err != nil { + return fmt.Errorf("failed to open a server channel: %w", err) } - eeCfg := config.NewEventExporterCfg(rdr.Config().ID, "", utils.FirstNonEmpty(rdr.Config().ProcessedPath, rdr.Config().SourcePath), - rdr.cgrCfg.GeneralCfg().FailedPostsDir, rdr.cgrCfg.GeneralCfg().PosterAttempts, processedOpt) - rdr.poster = ees.NewAMQPee(eeCfg, nil) + + err = ch.ExchangeDeclare( + client.exchange, // name + client.exchangeType, // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + return fmt.Errorf("failed to declare exchange: %w", err) + } + + _, err = ch.QueueDeclare( + client.queueID, + true, + false, + false, + false, + nil, + ) + if err != nil { + return fmt.Errorf("failed to declare queue: %w", err) + } + + err = ch.QueueBind( + client.queueID, // queue name + client.routingKey, // routing key + client.exchange, // exchange + false, + nil, + ) + if err != nil { + return fmt.Errorf("failed to bind the queue to the exchange: %w", err) + } + + // Take a new channel to the queue, and update + // the channel listener to reflect this. + client.channel = ch + client.notifyChanClose = make(chan *amqp.Error, 1) + client.channel.NotifyClose(client.notifyChanClose) + + client.mu.Lock() + client.available = true + client.mu.Unlock() + return nil +} + +// Consume will continuously put queue items on the channel. +func (client *amqpClient) Consume() (<-chan amqp.Delivery, error) { + if !client.isAvailable() { + return nil, utils.ErrDisconnected + } + if err := client.channel.Qos( + client.prefetchCount, + 0, + false, + ); err != nil { + return nil, err + } + + return client.channel.Consume( + client.queueID, + client.consumer, + false, + false, + false, + false, + nil, + ) +} + +func (client *amqpClient) isAvailable() bool { + client.mu.RLock() + defer client.mu.RUnlock() + return client.available } diff --git a/ers/amqp_it_test.go b/ers/amqp_it_test.go index 2cb21ce311..441b8bd296 100644 --- a/ers/amqp_it_test.go +++ b/ers/amqp_it_test.go @@ -136,17 +136,3 @@ func TestAMQPER(t *testing.T) { } close(rdrExit) } - -func TestAMQPERServeError(t *testing.T) { - cfg := config.NewDefaultCGRConfig() - cfgIdx := 0 - expected := "AMQP scheme must be either 'amqp://' or 'amqps://'" - rdr, err := NewAMQPER(cfg, cfgIdx, nil, nil, nil, nil, nil) - if err != nil { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", nil, err) - } - err2 := rdr.Serve() - if err2 == nil || err2.Error() != expected { - t.Errorf("\nExpected <%+v>, \nReceived <%+v>", expected, err2) - } -} diff --git a/ers/amqp_test.go b/ers/amqp_test.go deleted file mode 100644 index f6b6690571..0000000000 --- a/ers/amqp_test.go +++ /dev/null @@ -1,66 +0,0 @@ -/* -Real-time Online/Offline Charging System (OCS) for Telecom & ISP environments -Copyright (C) ITsysCOM GmbH - -This program is free software: you can redistribute it and/or modify -it under the terms of the GNU General Public License as published by -the Free Software Foundation, either version 3 of the License, or -(at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. - -You should have received a copy of the GNU General Public License -along with this program. If not, see -*/ - -package ers - -import ( - "testing" - - "github.com/cgrates/cgrates/config" - "github.com/cgrates/cgrates/utils" -) - -func TestAMQPSetOpts(t *testing.T) { - k := new(AMQPER) - k.dialURL = "amqp://localhost:2013" - expKafka := &AMQPER{ - dialURL: "amqp://localhost:2013", - queueID: "cdrs", - tag: "new", - } - if k.setOpts(&config.EventReaderOpts{ - AMQP: &config.AMQPROpts{ - QueueID: utils.StringPointer("cdrs"), - ConsumerTag: utils.StringPointer("new")}, - }); expKafka.dialURL != k.dialURL { - t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) - } else if expKafka.queueID != k.queueID { - t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID) - } else if expKafka.tag != k.tag { - t.Errorf("Expected: %s ,received: %s", expKafka.tag, k.tag) - } - k = new(AMQPER) - k.dialURL = "amqp://localhost:2013" - expKafka = &AMQPER{ - dialURL: "amqp://localhost:2013", - queueID: "cgrates_cdrs", - tag: "cgrates", - } - if k.setOpts(&config.EventReaderOpts{ - AMQP: &config.AMQPROpts{ - QueueID: utils.StringPointer("cgrates_cdrs"), - ConsumerTag: utils.StringPointer("cgrates"), - }, - }); expKafka.dialURL != k.dialURL { - t.Errorf("Expected: %s ,received: %s", expKafka.dialURL, k.dialURL) - } else if expKafka.queueID != k.queueID { - t.Errorf("Expected: %s ,received: %s", expKafka.queueID, k.queueID) - } else if expKafka.tag != k.tag { - t.Errorf("Expected: %s ,received: %s", expKafka.tag, k.tag) - } -} diff --git a/ers/readers_test.go b/ers/readers_test.go index dfb274beb7..7ff8c17d2a 100644 --- a/ers/readers_test.go +++ b/ers/readers_test.go @@ -209,30 +209,18 @@ func TestNewAMQPReader(t *testing.T) { cfg.ERsCfg().Readers[0].Type = utils.MetaAMQPjsonMap cfg.ERsCfg().Readers[0].ConcurrentReqs = -1 exp := &AMQPER{ - cgrCfg: cfg, - cfgIdx: 0, - fltrS: fltr, - rdrEvents: nil, - rdrExit: nil, - rdrErr: nil, + cgrCfg: cfg, + fltrS: fltr, } - exp.dialURL = exp.Config().SourcePath exp.Config().ProcessedPath = "" - exp.setOpts(&config.EventReaderOpts{ - CSV: &config.CSVROpts{}, - AMQP: &config.AMQPROpts{}, - SQL: &config.SQLROpts{}, - AWS: &config.AWSROpts{}, - NATS: &config.NATSROpts{}, - Kafka: &config.KafkaROpts{}, - }) + exp.createClient(&config.AMQPROpts{}, nil) exp.createPoster() - var expected EventReader = exp + // var expected EventReader = exp rcv, err := NewEventReader(cfg, 0, nil, nil, nil, fltr, nil) if err != nil { t.Error(err) - } else if !reflect.DeepEqual(expected, rcv) { - t.Errorf("Expected %v but received %v", utils.ToJSON(expected), utils.ToJSON(rcv)) + } else if !reflect.DeepEqual(exp, rcv) { + t.Errorf("Expected %v but received %v", utils.ToJSON(exp), utils.ToJSON(rcv)) } } diff --git a/general_tests/all_cfg_sect_rld_it_test.go b/general_tests/all_cfg_sect_rld_it_test.go index 1ba61ed194..1a4b1ce2f0 100644 --- a/general_tests/all_cfg_sect_rld_it_test.go +++ b/general_tests/all_cfg_sect_rld_it_test.go @@ -499,13 +499,13 @@ func testSectConfigSReloadERS(t *testing.T) { var reply string if err := testSectRPC.Call(context.Background(), utils.ConfigSv1SetConfigFromJSON, &config.SetConfigFromJSONArgs{ Tenant: "cgrates.org", - Config: "{\"ers\":{\"enabled\":true,\"partial_cache_ttl\":\"1s\",\"readers\":[{\"cache_dump_fields\":[],\"concurrent_requests\":1024,\"fields\":[{\"mandatory\":true,\"path\":\"*cgreq.ToR\",\"tag\":\"ToR\",\"type\":\"*variable\",\"value\":\"~*req.2\"},{\"mandatory\":true,\"path\":\"*cgreq.OriginID\",\"tag\":\"OriginID\",\"type\":\"*variable\",\"value\":\"~*req.3\"},{\"mandatory\":true,\"path\":\"*cgreq.RequestType\",\"tag\":\"RequestType\",\"type\":\"*variable\",\"value\":\"~*req.4\"},{\"mandatory\":true,\"path\":\"*cgreq.Tenant\",\"tag\":\"Tenant\",\"type\":\"*variable\",\"value\":\"~*req.6\"},{\"mandatory\":true,\"path\":\"*cgreq.Category\",\"tag\":\"Category\",\"type\":\"*variable\",\"value\":\"~*req.7\"},{\"mandatory\":true,\"path\":\"*cgreq.Account\",\"tag\":\"Account\",\"type\":\"*variable\",\"value\":\"~*req.8\"},{\"mandatory\":true,\"path\":\"*cgreq.Subject\",\"tag\":\"Subject\",\"type\":\"*variable\",\"value\":\"~*req.9\"},{\"mandatory\":true,\"path\":\"*cgreq.Destination\",\"tag\":\"Destination\",\"type\":\"*variable\",\"value\":\"~*req.10\"},{\"mandatory\":true,\"path\":\"*cgreq.SetupTime\",\"tag\":\"SetupTime\",\"type\":\"*variable\",\"value\":\"~*req.11\"},{\"mandatory\":true,\"path\":\"*cgreq.AnswerTime\",\"tag\":\"AnswerTime\",\"type\":\"*variable\",\"value\":\"~*req.12\"},{\"mandatory\":true,\"path\":\"*cgreq.Usage\",\"tag\":\"Usage\",\"type\":\"*variable\",\"value\":\"~*req.13\"}],\"filters\":[],\"flags\":[],\"id\":\"*default\",\"opts\":{\"csvFieldSeparator\":\",\",\"csvHeaderDefineChar\":\":\",\"csvRowLength\":0,\"natsSubject\":\"cgrates_cdrs\",\"partialCacheAction\":\"*none\",\"partialOrderField\":\"~*req.AnswerTime\",\"xmlRootPath\":\"\"},\"partial_commit_fields\":[],\"processed_path\":\"/var/spool/cgrates/ers/out\",\"run_delay\":\"0\",\"source_path\":\"/var/spool/cgrates/ers/in\",\"tenant\":\"\",\"timezone\":\"\",\"type\":\"*none\"}],\"sessions_conns\":[\"*internal\"]}}", + Config: `{"ers":{"enabled":true,"partial_cache_ttl":"1s","readers":[{"cache_dump_fields":[],"concurrent_requests":1024,"fields":[{"mandatory":true,"path":"*cgreq.ToR","tag":"ToR","type":"*variable","value":"~*req.2"},{"mandatory":true,"path":"*cgreq.OriginID","tag":"OriginID","type":"*variable","value":"~*req.3"},{"mandatory":true,"path":"*cgreq.RequestType","tag":"RequestType","type":"*variable","value":"~*req.4"},{"mandatory":true,"path":"*cgreq.Tenant","tag":"Tenant","type":"*variable","value":"~*req.6"},{"mandatory":true,"path":"*cgreq.Category","tag":"Category","type":"*variable","value":"~*req.7"},{"mandatory":true,"path":"*cgreq.Account","tag":"Account","type":"*variable","value":"~*req.8"},{"mandatory":true,"path":"*cgreq.Subject","tag":"Subject","type":"*variable","value":"~*req.9"},{"mandatory":true,"path":"*cgreq.Destination","tag":"Destination","type":"*variable","value":"~*req.10"},{"mandatory":true,"path":"*cgreq.SetupTime","tag":"SetupTime","type":"*variable","value":"~*req.11"},{"mandatory":true,"path":"*cgreq.AnswerTime","tag":"AnswerTime","type":"*variable","value":"~*req.12"},{"mandatory":true,"path":"*cgreq.Usage","tag":"Usage","type":"*variable","value":"~*req.13"}],"filters":[],"flags":[],"id":"*default","opts":{"csvFieldSeparator":",","csvHeaderDefineChar":":","csvRowLength":0,"natsSubject":"cgrates_cdrs","partialCacheAction":"*none","partialOrderField":"~*req.AnswerTime","xmlRootPath":""},"partial_commit_fields":[],"processed_path":"/var/spool/cgrates/ers/out","run_delay":"0","source_path":"/var/spool/cgrates/ers/in","tenant":"","timezone":"","type":"*none"}],"sessions_conns":["*internal"]}}`, }, &reply); err != nil { t.Error(err) } else if reply != utils.OK { t.Errorf("Expected OK received: %+v", reply) } - cfgStr := "{\"ers\":{\"enabled\":true,\"partial_cache_ttl\":\"1s\",\"readers\":[{\"cache_dump_fields\":[],\"concurrent_requests\":1024,\"fields\":[{\"mandatory\":true,\"path\":\"*cgreq.ToR\",\"tag\":\"ToR\",\"type\":\"*variable\",\"value\":\"~*req.2\"},{\"mandatory\":true,\"path\":\"*cgreq.OriginID\",\"tag\":\"OriginID\",\"type\":\"*variable\",\"value\":\"~*req.3\"},{\"mandatory\":true,\"path\":\"*cgreq.RequestType\",\"tag\":\"RequestType\",\"type\":\"*variable\",\"value\":\"~*req.4\"},{\"mandatory\":true,\"path\":\"*cgreq.Tenant\",\"tag\":\"Tenant\",\"type\":\"*variable\",\"value\":\"~*req.6\"},{\"mandatory\":true,\"path\":\"*cgreq.Category\",\"tag\":\"Category\",\"type\":\"*variable\",\"value\":\"~*req.7\"},{\"mandatory\":true,\"path\":\"*cgreq.Account\",\"tag\":\"Account\",\"type\":\"*variable\",\"value\":\"~*req.8\"},{\"mandatory\":true,\"path\":\"*cgreq.Subject\",\"tag\":\"Subject\",\"type\":\"*variable\",\"value\":\"~*req.9\"},{\"mandatory\":true,\"path\":\"*cgreq.Destination\",\"tag\":\"Destination\",\"type\":\"*variable\",\"value\":\"~*req.10\"},{\"mandatory\":true,\"path\":\"*cgreq.SetupTime\",\"tag\":\"SetupTime\",\"type\":\"*variable\",\"value\":\"~*req.11\"},{\"mandatory\":true,\"path\":\"*cgreq.AnswerTime\",\"tag\":\"AnswerTime\",\"type\":\"*variable\",\"value\":\"~*req.12\"},{\"mandatory\":true,\"path\":\"*cgreq.Usage\",\"tag\":\"Usage\",\"type\":\"*variable\",\"value\":\"~*req.13\"}],\"filters\":[],\"flags\":[],\"id\":\"*default\",\"opts\":{\"csvFieldSeparator\":\",\",\"csvHeaderDefineChar\":\":\",\"csvRowLength\":0,\"natsSubject\":\"cgrates_cdrs\",\"partialCacheAction\":\"*none\",\"partialOrderField\":\"~*req.AnswerTime\",\"xmlRootPath\":\"\"},\"partial_commit_fields\":[],\"processed_path\":\"/var/spool/cgrates/ers/out\",\"run_delay\":\"0\",\"source_path\":\"/var/spool/cgrates/ers/in\",\"tenant\":\"\",\"timezone\":\"\",\"type\":\"*none\"}],\"sessions_conns\":[\"*internal\"]}}" + cfgStr := `{"ers":{"enabled":true,"partial_cache_ttl":"1s","readers":[{"cache_dump_fields":[],"concurrent_requests":1024,"fields":[{"mandatory":true,"path":"*cgreq.ToR","tag":"ToR","type":"*variable","value":"~*req.2"},{"mandatory":true,"path":"*cgreq.OriginID","tag":"OriginID","type":"*variable","value":"~*req.3"},{"mandatory":true,"path":"*cgreq.RequestType","tag":"RequestType","type":"*variable","value":"~*req.4"},{"mandatory":true,"path":"*cgreq.Tenant","tag":"Tenant","type":"*variable","value":"~*req.6"},{"mandatory":true,"path":"*cgreq.Category","tag":"Category","type":"*variable","value":"~*req.7"},{"mandatory":true,"path":"*cgreq.Account","tag":"Account","type":"*variable","value":"~*req.8"},{"mandatory":true,"path":"*cgreq.Subject","tag":"Subject","type":"*variable","value":"~*req.9"},{"mandatory":true,"path":"*cgreq.Destination","tag":"Destination","type":"*variable","value":"~*req.10"},{"mandatory":true,"path":"*cgreq.SetupTime","tag":"SetupTime","type":"*variable","value":"~*req.11"},{"mandatory":true,"path":"*cgreq.AnswerTime","tag":"AnswerTime","type":"*variable","value":"~*req.12"},{"mandatory":true,"path":"*cgreq.Usage","tag":"Usage","type":"*variable","value":"~*req.13"}],"filters":[],"flags":[],"id":"*default","max_reconnect_interval":"5m0s","opts":{"csvFieldSeparator":",","csvHeaderDefineChar":":","csvRowLength":0,"natsSubject":"cgrates_cdrs","partialCacheAction":"*none","partialOrderField":"~*req.AnswerTime","xmlRootPath":""},"partial_commit_fields":[],"processed_path":"/var/spool/cgrates/ers/out","reconnects":-1,"run_delay":"0","source_path":"/var/spool/cgrates/ers/in","tenant":"","timezone":"","type":"*none"}],"sessions_conns":["*internal"]}}` var rpl string if err := testSectRPC.Call(context.Background(), utils.ConfigSv1GetConfigAsJSON, &config.SectionWithAPIOpts{ Tenant: "cgrates.org", @@ -513,7 +513,7 @@ func testSectConfigSReloadERS(t *testing.T) { }, &rpl); err != nil { t.Error(err) } else if cfgStr != rpl { - t.Errorf("\nExpected %+v ,\n received: %+v", utils.ToIJSON(cfgStr), utils.ToIJSON(rpl)) + t.Errorf("\nExpected %+v ,\n received: %+v", cfgStr, rpl) } }