Skip to content

Commit

Permalink
Use bucket based on placeholder
Browse files Browse the repository at this point in the history
  • Loading branch information
rmontenegroo committed Dec 19, 2020
1 parent 26ce74e commit 0fd013d
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ vendor
.ruby-version

test/tmp/

docker/
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.5.0
1.5.0
75 changes: 55 additions & 20 deletions lib/fluent/plugin/out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ def initialize
config_param :aws_iam_retries, :integer, default: nil, deprecated: "Use 'instance_profile_credentials' instead"
desc "S3 bucket name"
config_param :s3_bucket, :string
desc "Set bucket name fallback if fails fetching from placeholders"
config_param :s3_bucket_fallback, :string, :default => nil
desc "S3 region name"
config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1"
desc "Use 's3_region' instead"
Expand Down Expand Up @@ -249,11 +251,13 @@ def start

s3_client = Aws::S3::Client.new(options)
@s3 = Aws::S3::Resource.new(client: s3_client)
@bucket = @s3.bucket(@s3_bucket)

check_apikeys if @check_apikey_on_start
ensure_bucket if @check_bucket
ensure_bucket_lifecycle
if not @s3_bucket =~ /\$\{.*\}/
@bucket = @s3.bucket(@s3_bucket)
check_apikeys(@bucket) if @check_apikey_on_start
ensure_bucket(@bucket) if @check_bucket
ensure_bucket_lifecycle(@bucket)
end

super
end
Expand All @@ -264,6 +268,7 @@ def format(tag, time, record)
end

def write(chunk)

i = 0
metadata = chunk.metadata
previous_path = nil
Expand All @@ -272,6 +277,35 @@ def write(chunk)
else
@time_slice_with_tz.call(metadata.timekey)
end

bucket_name = nil

if @s3_bucket =~ /\$\{.*\}/
@s3_bucket.scan(/\$\{([^\$\{\}]+)\}/) do |placeholder|
placeholder = placeholder.join
if (not chunk.metadata.variables) or (not chunk.metadata.variables.keys.include?(placeholder.to_sym))
log.warn "There is no placeholder '#{placeholder}'"
if @s3_bucket_fallback
bucket_name = @s3_bucket_fallback
log.warn "Using @s3_bucket_fallback ('#{@s3_bucket_fallback}') as a fallback bucket name."
break
else
raise "It was not possible to extract placeholder '#{placeholder}' from chunk and @s3_bucket_fallback is not set."
end
end
end

if not bucket_name
bucket_name = extract_placeholders(@s3_bucket, chunk)
end

bucket = @s3.bucket(bucket_name)
check_apikeys(bucket) if @check_apikey_on_start
ensure_bucket(bucket) if @check_bucket
ensure_bucket_lifecycle(bucket)
else
bucket = @bucket
end

if @check_object
begin
Expand Down Expand Up @@ -304,7 +338,7 @@ def write(chunk)

i += 1
previous_path = s3path
end while @bucket.object(s3path).exists?
end while bucket.object(s3path).exists?
else
if @localtime
hms_slicer = Time.now.strftime("%H%M%S")
Expand Down Expand Up @@ -362,18 +396,19 @@ def write(chunk)
put_options[:metadata][k] = extract_placeholders(v, chunk).gsub(%r(%{[^}]+}), {"%{index}" => sprintf(@index_format, i - 1)})
end
end
@bucket.object(s3path).put(put_options)
bucket.object(s3path).put(put_options)

@values_for_s3_object_chunk.delete(chunk.unique_id)

if @warn_for_delay
if Time.at(chunk.metadata.timekey) < Time.now - @warn_for_delay
log.warn "out_s3: delayed events were put to s3://#{@s3_bucket}/#{s3path}"
log.warn "out_s3: delayed events were put to s3://#{bucket.name}/#{s3path}"
end
end
ensure
tmp.close(true) rescue nil
end

end

private
Expand All @@ -399,34 +434,34 @@ def timekey_to_timeformat(timekey)
end
end

def ensure_bucket
if !@bucket.exists?
def ensure_bucket(bucket)
if !bucket.exists?
if @auto_create_bucket
log.info "Creating bucket #{@s3_bucket} on #{@s3_endpoint}"
@s3.create_bucket(bucket: @s3_bucket)
log.info "Creating bucket #{bucket.name} on #{@s3_endpoint}"
@s3.create_bucket(bucket: bucket.name)
else
raise "The specified bucket does not exist: bucket = #{@s3_bucket}"
raise "The specified bucket does not exist: bucket = #{bucket.name}"
end
end
end

def ensure_bucket_lifecycle
def ensure_bucket_lifecycle(bucket)
unless @bucket_lifecycle_rules.empty?
old_rules = get_bucket_lifecycle_rules
old_rules = get_bucket_lifecycle_rules(bucket)
new_rules = @bucket_lifecycle_rules.sort_by { |rule| rule.id }.map do |rule|
{ id: rule.id, expiration: { days: rule.expiration_days }, prefix: rule.prefix, status: "Enabled" }
end

unless old_rules == new_rules
log.info "Configuring bucket lifecycle rules for #{@s3_bucket} on #{@s3_endpoint}"
@bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
log.info "Configuring bucket lifecycle rules for #{bucket.name} on #{@s3_endpoint}"
bucket.lifecycle_configuration.put({ lifecycle_configuration: { rules: new_rules } })
end
end
end

def get_bucket_lifecycle_rules
def get_bucket_lifecycle_rules(bucket)
begin
@bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
bucket.lifecycle_configuration.rules.sort_by { |rule| rule[:id] }.map do |rule|
{ id: rule[:id], expiration: { days: rule[:expiration][:days] }, prefix: rule[:prefix], status: rule[:status] }
end
rescue Aws::S3::Errors::NoSuchLifecycleConfiguration
Expand Down Expand Up @@ -461,8 +496,8 @@ def check_s3_path_safety(conf)
end
end

def check_apikeys
@bucket.objects(prefix: @path, :max_keys => 1).first
def check_apikeys(bucket)
bucket.objects(prefix: @path, :max_keys => 1).first
rescue Aws::S3::Errors::NoSuchBucket
# ignore NoSuchBucket Error because ensure_bucket checks it.
rescue => e
Expand Down
8 changes: 4 additions & 4 deletions test/test_out_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ def write(chunk)

private

def ensure_bucket
def ensure_bucket(bucket)
end

def check_apikeys
def check_apikeys(bucket)
end
end.configure(conf)
end
Expand Down Expand Up @@ -287,7 +287,7 @@ def write(chunk)

private

def check_apikeys
def check_apikeys(bucket)
end
end.configure(conf)
end
Expand Down Expand Up @@ -427,7 +427,7 @@ def setup_mocks(exists_return = false)
mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
@s3_resource = mock(Aws::S3::Resource.new(client: @s3_client))
mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource }
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test",
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test_bucket",
client: @s3_client))
@s3_bucket.exists? { exists_return }
@s3_object = mock(Aws::S3::Object.new(bucket_name: "test_bucket",
Expand Down

0 comments on commit 0fd013d

Please sign in to comment.