Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implements AWS SigV4 for the HTTP output plugin. #4459

Merged
merged 4 commits into from
Apr 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,7 @@ Gem::Specification.new do |gem|
gem.add_development_dependency("oj", [">= 2.14", "< 4"])
gem.add_development_dependency("async", "~> 1.23")
gem.add_development_dependency("async-http", ">= 0.50.0")
gem.add_development_dependency("aws-sigv4", ["~> 1.8"])
gem.add_development_dependency("aws-sdk-core", ["~> 3.191"])
gem.add_development_dependency("rexml", ["~> 3.2"])
end
71 changes: 65 additions & 6 deletions lib/fluent/plugin/out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,17 @@ class RetryableResponse < StandardError; end

config_section :auth, required: false, multi: false do
desc 'The method for HTTP authentication'
config_param :method, :enum, list: [:basic], default: :basic
config_param :method, :enum, list: [:basic, :aws_sigv4], default: :basic
desc 'The username for basic authentication'
config_param :username, :string, default: nil
desc 'The password for basic authentication'
config_param :password, :string, default: nil, secret: true
desc 'The AWS service to authenticate against'
config_param :aws_service, :string, default: nil
desc 'The AWS region to use when authenticating'
config_param :aws_region, :string, default: nil
desc 'The AWS role ARN to assume when authenticating'
config_param :aws_role_arn, :string, default: nil
end

def initialize
Expand Down Expand Up @@ -121,6 +127,36 @@ def configure(conf)
end
define_singleton_method(:format, method(:format_json_array))
end

if @auth and @auth.method == :aws_sigv4
begin
require 'aws-sigv4'
require 'aws-sdk-core'
rescue LoadError
raise Fluent::ConfigError, "The aws-sdk-core and aws-sigv4 gems are required for aws_sigv4 auth. Run: gem install aws-sdk-core -v '~> 3.191'"
end

raise Fluent::ConfigError, "aws_service is required for aws_sigv4 auth" unless @auth.aws_service != nil
raise Fluent::ConfigError, "aws_region is required for aws_sigv4 auth" unless @auth.aws_region != nil

if @auth.aws_role_arn == nil
aws_credentials = Aws::CredentialProviderChain.new.resolve
else
aws_credentials = Aws::AssumeRoleCredentials.new(
client: Aws::STS::Client.new(
region: @auth.aws_region
),
role_arn: @auth.aws_role_arn,
role_session_name: "fluentd"
)
end

@aws_signer = Aws::Sigv4::Signer.new(
service: @auth.aws_service,
region: @auth.aws_region,
credentials_provider: aws_credentials
)
end
end

def multi_workers_ready?
Expand Down Expand Up @@ -215,7 +251,7 @@ def parse_endpoint(chunk)
URI.parse(endpoint)
end

def set_headers(req, chunk)
def set_headers(req, uri, chunk)
if @headers
@headers.each do |k, v|
req[k] = v
Expand All @@ -229,21 +265,44 @@ def set_headers(req, chunk)
req['Content-Type'] = @content_type
end

def set_auth(req, uri)
return unless @auth

if @auth.method == :basic
req.basic_auth(@auth.username, @auth.password)
elsif @auth.method == :aws_sigv4
signature = @aws_signer.sign_request(
http_method: req.method,
url: uri.request_uri,
headers: {
'Content-Type' => @content_type,
'Host' => uri.host
},
body: req.body
)
req.add_field('x-amz-date', signature.headers['x-amz-date'])
req.add_field('x-amz-security-token', signature.headers['x-amz-security-token'])
req.add_field('x-amz-content-sha256', signature.headers['x-amz-content-sha256'])
req.add_field('authorization', signature.headers['authorization'])
end
end

def create_request(chunk, uri)
req = case @http_method
when :post
Net::HTTP::Post.new(uri.request_uri)
when :put
Net::HTTP::Put.new(uri.request_uri)
end
if @auth
req.basic_auth(@auth.username, @auth.password)
end
set_headers(req, chunk)
set_headers(req, uri, chunk)
req.body = @json_array ? "[#{chunk.read.chop}]" : chunk.read

# At least one authentication method requires the body and other headers, so the order of this call matters
set_auth(req, uri)
req
end


def send_request(uri, req)
res = if @proxy_uri
Net::HTTP.start(uri.host, uri.port, @proxy_uri.host, @proxy_uri.port, @proxy_uri.user, @proxy_uri.password, @http_opt) { |http|
Expand Down
92 changes: 92 additions & 0 deletions test/plugin/test_out_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'net/http'
require 'uri'
require 'json'
require 'aws-sdk-core'

# WEBrick's ProcHandler doesn't handle PUT by default
module WEBrick::HTTPServlet
Expand Down Expand Up @@ -390,6 +391,97 @@ def test_basic_auth_with_invalid_auth
end
end


sub_test_case 'aws sigv4 auth' do
setup do
@@fake_aws_credentials = Aws::Credentials.new(
'fakeaccess',
'fakesecret',
'fake session token'
)
end

def server_port
19883
end

def test_aws_sigv4_sts_role_arn
stub(Aws::AssumeRoleCredentials).new do |credentials_provider|
stub(credentials_provider).credentials {
@@fake_aws_credentials
}
credentials_provider
end

d = create_driver(config + %[
<auth>
method aws_sigv4
aws_service someservice
aws_region my-region-1
aws_role_arn arn:aws:iam::123456789012:role/MyRole
</auth>
])
d.run(default_tag: 'test.http') do
test_events.each { |event|
d.feed(event)
}
end

result = @@result
assert_equal 'POST', result.method
assert_equal 'application/x-ndjson', result.content_type
assert_equal test_events, result.data
assert_not_empty result.headers
assert_not_nil result.headers['authorization']
assert_match /AWS4-HMAC-SHA256 Credential=[a-zA-Z0-9]*\/\d+\/my-region-1\/someservice\/aws4_request/, result.headers['authorization']
assert_match /SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-security-token/, result.headers['authorization']
assert_equal @@fake_aws_credentials.session_token, result.headers['x-amz-security-token']
assert_not_nil result.headers['x-amz-content-sha256']
assert_not_empty result.headers['x-amz-content-sha256']
assert_not_nil result.headers['x-amz-security-token']
assert_not_empty result.headers['x-amz-security-token']
assert_not_nil result.headers['x-amz-date']
assert_not_empty result.headers['x-amz-date']
end

def test_aws_sigv4_no_role
stub(Aws::CredentialProviderChain).new do |provider_chain|
stub(provider_chain).resolve {
@@fake_aws_credentials
}
provider_chain
end
d = create_driver(config + %[
<auth>
method aws_sigv4
aws_service someservice
aws_region my-region-1
</auth>
])
d.run(default_tag: 'test.http') do
test_events.each { |event|
d.feed(event)
}
end

result = @@result
assert_equal 'POST', result.method
assert_equal 'application/x-ndjson', result.content_type
assert_equal test_events, result.data
assert_not_empty result.headers
assert_not_nil result.headers['authorization']
assert_match /AWS4-HMAC-SHA256 Credential=[a-zA-Z0-9]*\/\d+\/my-region-1\/someservice\/aws4_request/, result.headers['authorization']
assert_match /SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date;x-amz-security-token/, result.headers['authorization']
assert_equal @@fake_aws_credentials.session_token, result.headers['x-amz-security-token']
assert_not_nil result.headers['x-amz-content-sha256']
assert_not_empty result.headers['x-amz-content-sha256']
assert_not_nil result.headers['x-amz-security-token']
assert_not_empty result.headers['x-amz-security-token']
assert_not_nil result.headers['x-amz-date']
assert_not_empty result.headers['x-amz-date']
end
end

sub_test_case 'HTTPS' do
def server_port
19882
Expand Down
Loading