diff --git a/lib/fluent/plugin/out_opensearch.rb b/lib/fluent/plugin/out_opensearch.rb index 7fd543c..c98f32c 100644 --- a/lib/fluent/plugin/out_opensearch.rb +++ b/lib/fluent/plugin/out_opensearch.rb @@ -83,6 +83,7 @@ def initialize(retry_stream) attr_reader :template_names attr_reader :ssl_version_options attr_reader :compressable_connection + attr_reader :duration_seconds helpers :event_emitter, :compat_parameters, :record_accessor, :timer @@ -94,6 +95,7 @@ def initialize(retry_stream) DEFAULT_RELOAD_AFTER = -1 DEFAULT_TARGET_BULK_BYTES = -1 DEFAULT_POLICY_ID = "logstash-policy" + DEFAULT_DURATION = "5h" config_param :host, :string, :default => 'localhost' config_param :port, :integer, :default => 9200 @@ -195,7 +197,7 @@ def initialize(retry_stream) config_param :assume_role_session_name, :string, :default => "fluentd" config_param :assume_role_web_identity_token_file, :string, :default => nil config_param :sts_credentials_region, :string, :default => nil - config_param :refresh_credentials_interval, :time, :default => "5h" + config_param :refresh_credentials_interval, :time, :default => DEFAULT_DURATION config_param :aws_service_name, :enum, list: [:es, :aoss], :default => :es end @@ -211,6 +213,8 @@ def initialize(retry_stream) def initialize super + + @duration_seconds = Fluent::Config.time_value(DEFAULT_DURATION) end ###################################################################################################### @@ -238,13 +242,15 @@ def aws_credentials(conf) credentials = Aws::AssumeRoleCredentials.new({ role_arn: conf[:assume_role_arn], role_session_name: conf[:assume_role_session_name], - region: sts_creds_region(conf) + region: sts_creds_region(conf), + duration_seconds: @duration_seconds }).credentials else credentials = Aws::AssumeRoleWebIdentityCredentials.new({ role_arn: conf[:assume_role_arn], web_identity_token_file: conf[:assume_role_web_identity_token_file], - region: sts_creds_region(conf) + region: sts_creds_region(conf), + duration_seconds: @duration_seconds }).credentials end end @@ -345,7 +351,18 @@ class << self @_aws_credentials = aws_credentials(@endpoint) if @endpoint.refresh_credentials_interval - timer_execute(:out_opensearch_expire_credentials, @endpoint.refresh_credentials_interval) do + @duration_seconds = Fluent::Config.time_value(@endpoint.refresh_credentials_interval) + # 60 * 60 * 12 = 12 hours + if @duration_seconds > 43200 + raise Fluent::ConfigError, "Maximum duration is 12 hours." + end + + # 60 * 15 = 15 minutes + if @duration_seconds < 900 + raise Fluent::ConfigError, "Minimum duration is 15 minutes." + end + + timer_execute(:out_opensearch_expire_credentials, @duration_seconds) do log.debug('Recreate the AWS credentials') @credential_mutex.synchronize do diff --git a/test/plugin/test_out_opensearch.rb b/test/plugin/test_out_opensearch.rb index bb76fb6..f6a24c2 100644 --- a/test/plugin/test_out_opensearch.rb +++ b/test/plugin/test_out_opensearch.rb @@ -300,6 +300,7 @@ def test_configure 'region' => "local", 'access_key_id' => 'YOUR_AWESOME_KEY', 'secret_access_key' => 'YOUR_AWESOME_SECRET', + 'refresh_credentials_interval' => '10h' }, []), Fluent::Config::Element.new('buffer', 'tag', {}, []) @@ -316,6 +317,8 @@ def test_configure assert_nil instance.endpoint.assume_role_web_identity_token_file assert_nil instance.endpoint.sts_credentials_region assert_equal :es, instance.endpoint.aws_service_name + assert_equal 36000, instance.endpoint.refresh_credentials_interval + assert_equal 36000, instance.duration_seconds end data("OpenSearch Service" => [:es, 'es'],