From 0fd013d978c0f01b9ff20dbd5d7d0471c0f81dc3 Mon Sep 17 00:00:00 2001
From: rmontenegroo
Date: Sat, 19 Dec 2020 20:42:12 -0300
Subject: [PATCH] Use bucket based on placeholder

---
 .gitignore                  |  2 +
 VERSION                     |  2 +-
 lib/fluent/plugin/out_s3.rb | 75 +++++++++++++++++++++++++++----------
 test/test_out_s3.rb         |  8 ++--
 4 files changed, 62 insertions(+), 25 deletions(-)

diff --git a/.gitignore b/.gitignore
index 73ddea98..8f980920 100644
--- a/.gitignore
+++ b/.gitignore
@@ -11,3 +11,5 @@
 vendor
 .ruby-version
 test/tmp/
+
+docker/
diff --git a/VERSION b/VERSION
index 3e1ad720..bc80560f 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.5.0
\ No newline at end of file
+1.5.0
diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb
index 956ed968..118aeb34 100644
--- a/lib/fluent/plugin/out_s3.rb
+++ b/lib/fluent/plugin/out_s3.rb
@@ -87,6 +87,8 @@ def initialize
     config_param :aws_iam_retries, :integer, default: nil, deprecated: "Use 'instance_profile_credentials' instead"
     desc "S3 bucket name"
     config_param :s3_bucket, :string
+    desc "Fallback bucket name used when placeholders cannot be resolved from chunk metadata"
+    config_param :s3_bucket_fallback, :string, default: nil
     desc "S3 region name"
     config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1"
     desc "Use 's3_region' instead"
@@ -249,11 +251,13 @@ def start
       s3_client = Aws::S3::Client.new(options)
       @s3 = Aws::S3::Resource.new(client: s3_client)
-      @bucket = @s3.bucket(@s3_bucket)
 
-      check_apikeys if @check_apikey_on_start
-      ensure_bucket if @check_bucket
-      ensure_bucket_lifecycle
+      unless @s3_bucket =~ /\$\{.*\}/
+        @bucket = @s3.bucket(@s3_bucket)
+        check_apikeys(@bucket) if @check_apikey_on_start
+        ensure_bucket(@bucket) if @check_bucket
+        ensure_bucket_lifecycle(@bucket)
+      end
 
       super
     end
 
@@ -264,6 +268,7 @@ def format(tag, time, record)
     end
 
     def write(chunk)
+      i = 0
       metadata = chunk.metadata
       previous_path = nil
 
@@ -272,6 +277,35 @@
                    else
                      @time_slice_with_tz.call(metadata.timekey)
                    end
+
+      bucket_name = nil
+
+      if @s3_bucket =~ /\$\{.*\}/
+        @s3_bucket.scan(/\$\{([^\$\{\}]+)\}/) do |placeholder|
+          placeholder = placeholder.join
+          unless chunk.metadata.variables && chunk.metadata.variables.key?(placeholder.to_sym)
+            log.warn "No chunk metadata variable found for placeholder '#{placeholder}'"
+            if @s3_bucket_fallback
+              bucket_name = @s3_bucket_fallback
+              log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name."
+              break
+            else
+              raise "Could not extract placeholder '#{placeholder}' from chunk and s3_bucket_fallback is not set."
+            end
+          end
+        end
+
+        unless bucket_name
+          bucket_name = extract_placeholders(@s3_bucket, chunk)
+        end
+
+        bucket = @s3.bucket(bucket_name)
+        check_apikeys(bucket) if @check_apikey_on_start
+        ensure_bucket(bucket) if @check_bucket
+        ensure_bucket_lifecycle(bucket)
+      else
+        bucket = @bucket
+      end
 
       if @check_object
         begin
@@ -304,7 +338,7 @@
           end
           i += 1
           previous_path = s3path
-        end while @bucket.object(s3path).exists?
+        end while bucket.object(s3path).exists?
       else
         if @localtime
           hms_slicer = Time.now.strftime("%H%M%S")
@@ -362,18 +396,19 @@
             put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
           end
         end
-        @bucket.object(s3path).put(put_options)
+        bucket.object(s3path).put(put_options)
 
         @values_for_s3_object_chunk.delete(chunk.unique_id)
 
         if @warn_for_delay
           if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
-            log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
+            log.warn "out_s3: delayed events were put to s3://#{bucket.name}/#{s3path}"
           end
         end
       ensure
         tmp.close(true) rescue nil
       end
+      end
     end
 
     private
 
@@ -399,34 +434,34 @@ def timekey_to_timeformat(timekey)
       end
     end
 
-    def ensure_bucket
-      if !@bucket.exists?
+    def ensure_bucket(bucket)
+      if !bucket.exists?
         if @auto_create_bucket
-          log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
-          @s3.create_bucket(bucket: @s3_bucket)
+          log.info "Creating bucket #{bucket.name} on #{@s3_endpoint}"
+          @s3.create_bucket(bucket: bucket.name)
         else
-          raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
+          raise "The specified bucket does not exist: bucket = #{bucket.name}"
         end
       end
     end
 
-    def ensure_bucket_lifecycle
+    def ensure_bucket_lifecycle(bucket)
       unless @bucket_lifecycle_rules.empty?
-        old_rules = get_bucket_lifecycle_rules
+        old_rules = get_bucket_lifecycle_rules(bucket)
         new_rules = @bucket_lifecycle_rules.sort_by { |rule| rule.id }.map do |rule|
           { id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" }
         end
 
         unless old_rules == new_rules
-          log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}"
-          @bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
+          log.info "Configuring bucket lifecycle rules for #{bucket.name} on #{@s3_endpoint}"
+          bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
         end
       end
     end
 
-    def get_bucket_lifecycle_rules
+    def get_bucket_lifecycle_rules(bucket)
       begin
-        @bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
+        bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
           { id: rule[:id], expiration: { days: rule[:expiration][:days] }, prefix: rule[:prefix], status: rule[:status] }
         end
       rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
@@ -461,8 +496,8 @@ def check_s3_path_safety(conf)
       end
     end
 
-    def check_apikeys
-      @bucket.objects(prefix: @path, :max_keys => 1).first
+    def check_apikeys(bucket)
+      bucket.objects(prefix: @path, :max_keys => 1).first
     rescue Aws::S3::Errors::NoSuchBucket
       # ignore NoSuchBucket Error because ensure_bucket checks it.
     rescue => e
diff --git a/test/test_out_s3.rb b/test/test_out_s3.rb
index c1530e62..f475d9fa 100644
--- a/test/test_out_s3.rb
+++ b/test/test_out_s3.rb
@@ -44,10 +44,10 @@ def write(chunk)
 
       private
 
-      def ensure_bucket
+      def ensure_bucket(bucket)
       end
 
-      def check_apikeys
+      def check_apikeys(bucket)
       end
     end.configure(conf)
   end
@@ -287,7 +287,7 @@ def write(chunk)
 
       private
 
-      def check_apikeys
+      def check_apikeys(bucket)
       end
     end.configure(conf)
   end
@@ -427,7 +427,7 @@ def setup_mocks(exists_return = false)
     mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
     @s3_resource = mock(Aws::S3::Resource.new(client: @s3_client))
     mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource }
-    @s3_bucket = mock(Aws::S3::Bucket.new(name: "test",
+    @s3_bucket = mock(Aws::S3::Bucket.new(name: "test_bucket",
                                           client: @s3_client))
     @s3_bucket.exists? { exists_return }
     @s3_object = mock(Aws::S3::Object.new(bucket_name: "test_bucket",
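
Usage sketch (illustrative, not part of the patch): the configuration below shows how a placeholder-based s3_bucket might be combined with the new s3_bucket_fallback option. The match pattern, bucket names, and the env field are assumptions for the example; ${env} only resolves when env is listed as a buffer chunk key, so that it appears in the chunk metadata variables this patch inspects.

    <match app.**>
      @type s3
      s3_bucket logs-${env}
      s3_bucket_fallback logs-default
      s3_region us-east-1
      path logs/
      <buffer tag,time,env>
        @type file
        path /var/log/fluent/s3
        timekey 3600
      </buffer>
    </match>

With a configuration like this, a chunk whose metadata carries env=staging is written to the logs-staging bucket, while a chunk that lacks the env variable falls back to logs-default instead of raising.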