Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multiple S3 buckets support for input, dynamic bucket names from SQS, and queue url override #364

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 37 additions & 11 deletions lib/fluent/plugin/in_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ def initialize
desc "Profile name. Default to 'default' or ENV['AWS_PROFILE']"
config_param :profile_name, :string, default: nil
end
desc "S3 bucket name"
config_param :s3_bucket, :string
desc "S3 bucket name(s) separated by commas in case of multiple bucket names"
config_param :s3_buckets, :string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing parameter name breaks existing configurations.
We couldn't accept this change.
Instead, using deprecated attr in config_param and adding parameter conversion code are needed to accept this PR.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review. I'll do the requested changes to make it backwards compatible and get back to you 👍

desc "S3 region name"
config_param :s3_region, :string, default: ENV["AWS_REGION"] || "us-east-1"
desc "Use 's3_region' instead"
Expand All @@ -96,6 +96,8 @@ def initialize
config_param :queue_name, :string, default: nil
desc "SQS Owner Account ID"
config_param :queue_owner_aws_account_id, :string, default: nil
desc "SQS queue url, when passed it'll not get the queue URL by name & account ID"
config_param :queue_url, :string, default: nil
desc "Use 's3_region' instead"
config_param :endpoint, :string, default: nil
desc "Skip message deletion"
Expand All @@ -106,6 +108,7 @@ def initialize
config_param :retry_error_interval, :integer, default: 300
end

# Default tag will include input.s3.bucket_name ###Check the function process(body)
desc "Tag string"
config_param :tag, :string, default: "input.s3"

Expand Down Expand Up @@ -145,16 +148,29 @@ def start
s3_client = create_s3_client
log.debug("Succeeded to create S3 client")
@s3 = Aws::S3::Resource.new(client: s3_client)
@bucket = @s3.bucket(@s3_bucket)

raise "#{@bucket.name} is not found." unless @bucket.exists?
@buckets = {}
if (@s3_buckets.include?(","))
splitted_buckets = @s3_buckets.split(',')
splitted_buckets.each do | bucket |
@buckets[bucket] = @s3.bucket(bucket)
raise "#{bucket} is not found." unless @buckets[bucket].exists?
end
else
@buckets[@s3_buckets] = @s3.bucket(@s3_buckets)
raise "#{@s3_buckets} is not found." unless @buckets[@s3_buckets].exists?
end

check_apikeys if @check_apikey_on_start

sqs_client = create_sqs_client
log.debug("Succeeded to create SQS client")
response = sqs_client.get_queue_url(queue_name: @sqs.queue_name, queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id)
sqs_queue_url = response.queue_url
sqs_queue_url = nil
if (@sqs.queue_url.nil?)
response = sqs_client.get_queue_url(queue_name: @sqs.queue_name, queue_owner_aws_account_id: @sqs.queue_owner_aws_account_id)
sqs_queue_url = response.queue_url
else
sqs_queue_url = @sqs.queue_url
end
log.debug("Succeeded to get SQS queue URL")

@poller = Aws::SQS::QueuePoller.new(sqs_queue_url, client: sqs_client)
Expand Down Expand Up @@ -279,29 +295,39 @@ def create_sqs_client
end

def check_apikeys
@bucket.objects.first
log.debug("Succeeded to verify API keys")
@buckets.each do | bucket_name, bucket_object |
bucket_object.objects.first
log.debug("Succeeded to verify API keys for bucket #{bucket_name}")
end
rescue => e
raise "can't call S3 API. Please check your credentials or s3_region configuration. error = #{e.inspect}"
end

def process(body)
s3 = body["Records"].first["s3"]
raw_key = s3["object"]["key"]
raw_bucket_name = s3["bucket"]["name"]
key = CGI.unescape(raw_key)

io = @bucket.object(key).get.body
if (!@buckets.key?(raw_bucket_name))
raise "S3 bucket name: #{raw_bucket_name} returned from SQS was not provided in the input configuration as one of the s3 fluentd sources."
end

io = @buckets[raw_bucket_name].object(key).get.body
content = @extractor.extract(io)
es = Fluent::MultiEventStream.new
content.each_line do |line|
@parser.parse(line) do |time, record|
if @add_object_metadata
record['s3_bucket'] = @s3_bucket
record['s3_bucket'] = raw_bucket_name
record['s3_key'] = raw_key
end
es.add(time, record)
end
end
if (@tag == "input.s3")
@tag = "input.s3.#{raw_bucket_name}"
end
router.emit_stream(@tag, es)
end

Expand Down
107 changes: 101 additions & 6 deletions test/test_in_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def setup
CONFIG = %[
aws_key_id test_key_id
aws_sec_key test_sec_key
s3_bucket test_bucket
s3_buckets test_bucket
buffer_type memory
<sqs>
queue_name test_queue
Expand All @@ -47,7 +47,7 @@ def test_default
actual = {
aws_key_id: d.instance.aws_key_id,
aws_sec_key: d.instance.aws_sec_key,
s3_bucket: d.instance.s3_bucket,
s3_buckets: d.instance.s3_buckets,
s3_region: d.instance.s3_region,
sqs_queue_name: d.instance.sqs.queue_name,
extractor_ext: extractor.ext,
Expand All @@ -56,7 +56,7 @@ def test_default
expected = {
aws_key_id: "test_key_id",
aws_sec_key: "test_sec_key",
s3_bucket: "test_bucket",
s3_buckets: "test_bucket",
s3_region: "us-east-1",
sqs_queue_name: "test_queue",
extractor_ext: "gz",
Expand All @@ -65,6 +65,77 @@ def test_default
assert_equal(expected, actual)
end

def test_with_multiple_buckets
conf = %[
aws_key_id test_key_id
aws_sec_key test_sec_key
s3_buckets test_bucket1,test_bucket2
buffer_type memory
<sqs>
queue_name test_queue
queue_owner_aws_account_id 123456789123
</sqs>
]
d = create_driver(conf)
extractor = d.instance.instance_variable_get(:@extractor)
actual = {
aws_key_id: d.instance.aws_key_id,
aws_sec_key: d.instance.aws_sec_key,
s3_buckets: d.instance.s3_buckets,
s3_region: d.instance.s3_region,
sqs_queue_name: d.instance.sqs.queue_name,
extractor_ext: extractor.ext,
extractor_content_type: extractor.content_type
}
expected = {
aws_key_id: "test_key_id",
aws_sec_key: "test_sec_key",
s3_buckets: "test_bucket1,test_bucket2",
s3_region: "us-east-1",
sqs_queue_name: "test_queue",
extractor_ext: "gz",
extractor_content_type: "application/x-gzip"
}
assert_equal(expected, actual)
end

def test_with_multiple_buckets_and_sqs_queue_url_override
conf = %[
aws_key_id test_key_id
aws_sec_key test_sec_key
s3_buckets test_bucket1,test_bucket2
buffer_type memory
<sqs>
queue_name test_queue
queue_owner_aws_account_id 123456789123
queue_url https://sqs.us-east-1.amazonaws.com/345678912345/test_override_queue
</sqs>
]
d = create_driver(conf)
extractor = d.instance.instance_variable_get(:@extractor)
actual = {
aws_key_id: d.instance.aws_key_id,
aws_sec_key: d.instance.aws_sec_key,
s3_buckets: d.instance.s3_buckets,
s3_region: d.instance.s3_region,
sqs_queue_name: d.instance.sqs.queue_name,
extractor_ext: extractor.ext,
extractor_content_type: extractor.content_type,
sqs_queue_url: d.instance.sqs.queue_url
}
expected = {
aws_key_id: "test_key_id",
aws_sec_key: "test_sec_key",
s3_buckets: "test_bucket1,test_bucket2",
s3_region: "us-east-1",
sqs_queue_name: "test_queue",
extractor_ext: "gz",
extractor_content_type: "application/x-gzip",
sqs_queue_url: "https://sqs.us-east-1.amazonaws.com/345678912345/test_override_queue"
}
assert_equal(expected, actual)
end

def test_empty
assert_raise(Fluent::ConfigError) do
create_driver("")
Expand All @@ -75,7 +146,7 @@ def test_without_sqs_section
conf = %[
aws_key_id test_key_id
aws_sec_key test_sec_key
s3_bucket test_bucket
s3_buckets test_bucket
]
assert_raise_message("'<sqs>' sections are required") do
create_driver(conf)
Expand Down Expand Up @@ -137,7 +208,7 @@ def test_sqs_endpoint_with_invalid_endpoint(endpoint)
conf = <<"EOS"
aws_key_id test_key_id
aws_sec_key test_sec_key
s3_bucket test_bucket
s3_buckets test_bucket
buffer_type memory
<sqs>
queue_name test_queue
Expand All @@ -157,7 +228,7 @@ def setup_mocks
mock(Aws::S3::Client).new(anything).at_least(0) { @s3_client }
@s3_resource = mock(Aws::S3::Resource.new(client: @s3_client))
mock(Aws::S3::Resource).new(client: @s3_client) { @s3_resource }
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test",
@s3_bucket = mock(Aws::S3::Bucket.new(name: "test_bucket",
client: @s3_client))
@s3_bucket.exists? { true }
@s3_resource.bucket(anything) { @s3_bucket }
Expand Down Expand Up @@ -207,6 +278,9 @@ def test_one_record
"s3" => {
"object" => {
"key" => "test_key"
},
"bucket" => {
"name"=> "test_bucket"
}
}
}
Expand Down Expand Up @@ -242,6 +316,9 @@ def test_one_record_with_metadata
"s3" => {
"object" => {
"key" => "test_key"
},
"bucket" => {
"name"=> "test_bucket"
}
}
}
Expand Down Expand Up @@ -277,6 +354,9 @@ def test_one_record_url_encoded
"s3" => {
"object" => {
"key" => "test+key"
},
"bucket" => {
"name"=> "test_bucket"
}
}
}
Expand Down Expand Up @@ -312,6 +392,9 @@ def test_one_record_url_encoded_with_metadata
"s3" => {
"object" => {
"key" => "test+key"
},
"bucket" => {
"name"=> "test_bucket"
}
}
}
Expand Down Expand Up @@ -347,6 +430,9 @@ def test_one_record_multi_line
"s3" => {
"object" => {
"key" => "test_key"
},
"bucket" => {
"name"=> "test_bucket"
}
}
}
Expand Down Expand Up @@ -387,6 +473,9 @@ def test_one_record_multi_line_with_metadata
"s3" => {
"object" => {
"key" => "test_key"
},
"bucket" => {
"name"=> "test_bucket"
}
}
}
Expand Down Expand Up @@ -435,6 +524,9 @@ def test_gzip_single_stream
"s3" => {
"object" => {
"key" => "test_key"
},
"bucket" => {
"name"=> "test_bucket"
}
}
}
Expand Down Expand Up @@ -486,6 +578,9 @@ def test_gzip_multiple_steams
"s3" => {
"object" => {
"key" => "test_key"
},
"bucket" => {
"name"=> "test_bucket"
}
}
}
Expand Down