Skip to content

Commit

Permalink
Merge 2d9f8ed into b198745
Browse files Browse the repository at this point in the history
  • Loading branch information
cosmo0920 committed Jan 12, 2022
2 parents b198745 + 2d9f8ed commit e0e47ec
Show file tree
Hide file tree
Showing 3 changed files with 278 additions and 2 deletions.
179 changes: 179 additions & 0 deletions README.md
Expand Up @@ -101,6 +101,7 @@ Send your logs to OpenSearch (and search them with OpenSearch Dashboard maybe?)
* [Configuration - OpenSearch Input](#configuration---opensearch-input)
* [Configuration - OpenSearch Filter GenID](#configuration---opensearch-filter-genid)
* [Configuration - OpenSearch Output Data Stream](#configuration---opensearch-output-data-stream)
* [Configuration - AWS OpenSearch Service](#configuration---aws-opensearch-service)
* [Troubleshooting](#troubleshooting)
* [Contact](#contact)
* [Contributing](#contributing)
Expand Down Expand Up @@ -1347,6 +1348,184 @@ You can specify an existing matching index template for the data stream. If not

Default value is `data_stream_name`.

## Configuration - AWS OpenSearch Service

### \<endpoint\> section

AWS OpenSearch Service related settings are placed in `<endpoint>` directive.

Configuration example is below:

```aconf
<match es.**>
type opensearch
logstash_format true
include_tag_key true
flush_interval 1s
<endpoint>
url https://CLUSTER_ENDPOINT_URL
region YOUR_AWS_REGION
# access_key_id "secret"
# secret_access_key "foo_secret"
</endpoint>
</match>
```

#### region

Specify AWS region.

```aconf
<endpoint>
region us-east-2 # e.g.) AWS Ohio region
# other stuffs.
</endpoint>
```

#### url

Specify AWS OpenSearch Service endpoint.

```aconf
<endpoint>
url https://CLUSTER_ENDPOINT_URL
# other stuffs.
</endpoint>
```

**NOTE:** This plugin will remove trailing slashes automatically. You don't need to pay attension to the trailing slash characters.

#### access_key_id

Specify AWS access key.

```aconf
<endpoint>
access_key_id YOUR_AWS_ACCESS_KEY
# other stuffs.
</endpoint>
```

#### secret_access_key

Specify AWS secret access key.

```aconf
<endpoint>
secret_access_key YOUR_AWS_SECRET_ACCESS_KEY
# other stuffs.
</endpoint>
```

### IAM

If you don't want to use `access_key_id` and `secret_access_key` on your endpoint configuration, you should use IAM policies instead.

#### Assign an IAM instance role

The first step needs to assign an IAM instance role (ROLE) to your EC2 instances. You should change its name appropriately.
The attaching role should not contain no policy: We're using the role as the authenticating factor and placing the policy into the OpenSearch cluster.

Then, you should configure a policy for the OpenSearch cluster policy with substitution s for the capitalized terms:

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::ACCOUNT:role/ROLE"
},
"Action": "es:*",
"Resource": "arn:aws:es:AWS_REGION:ACCOUNT:domain/OPENSEARCH_DOMAIN/*"
},
{
"Effect": "Allow",
"Principal": {
"AWS": "*"
},
"Action": "es:*",
"Resource": "arn:aws:es:AWS_REGION:ACCOUNT:domain/OPENSEARCH_DOMAIN/*",
"Condition": {
"IpAddress": {
"aws:SourceIp": [
"1.2.3.4/32",
"5.6.7.8/32"
]
}
}
}
]
}
```

The above policy will allow your Fluentd hosts and any traffic coming from the specified IP addresses (you querying OpenSearch Dashboard) to access the various endpoints. While not ideally secure it should allow you to get up and ingesting logs without anything

#### Use STS assumed role as the authenticating factor

Additionally, you can use a STS assumed role as the authenticating factor and instruct the plugin to assume this role.
This is useful for cross-account access and when assigning a standard role is not possible. In this case, the endpoint configuration looks like:

```aconf
<endpoint>
url https://CLUSTER_ENDPOINT_URL
region YOUR_AWS_REGION
assume_role_arn arn:aws:sts::ACCOUNT:role/ROLE
assume_role_session_name SESSION_ID # Defaults to fluentd if omitted
sts_credentials_region YOUR_AWS_STS_REGION # Defaults to region if omitted
</endpoint>
```

The policy attached into your OpenSearch cluster becomes something like:

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:sts::ACCOUNT:assumed-role/ROLE/SESSION_ID"
},
"Action": "es:*",
"Resource": "arn:aws:es:AWS_REGION:ACCOUNT:domain/ES_DOMAIN/*"
}
]
}
```

#### Ensure the environment to assume roles

You'll need to ensure that the environment where the Fluentd plugin runs to have the capability to assume this role, by attaching a policy something like this to the instance profile:

```json
{
"Version": "2012-10-17",
"Statement": {
"Effect": "Allow",
"Action": "sts:AssumeRole",
"Resource": "arn:aws:iam::ACCOUNT:role/ROLE"
}
}
```

### EKS

If you want to use IAM roles for service accounts on your Amazon EKS clusters, please refer to the official documentation and specify a Service Account for your fluentd Pod.

In this case, the endpoint configuration looks like:

```aconf
<endpoint>
url https://CLUSTER_ENDPOINT_URL
region eu-west-1
assume_role_arn "#{ENV['AWS_ROLE_ARN']}"
assume_role_web_identity_token_file "#{ENV['AWS_WEB_IDENTITY_TOKEN_FILE']}"
</endpoint>
```

## Troubleshooting

See [Troubleshooting document](README.Troubleshooting.md)
Expand Down
2 changes: 2 additions & 0 deletions fluent-plugin-opensearch.gemspec
Expand Up @@ -25,6 +25,8 @@ Gem::Specification.new do |s|
s.add_runtime_dependency 'fluentd', '>= 0.14.22'
s.add_runtime_dependency 'excon', '>= 0'
s.add_runtime_dependency 'opensearch-ruby'
s.add_runtime_dependency "aws-sdk-core", "~> 3"
s.add_runtime_dependency "faraday_middleware-aws-sigv4"


s.add_development_dependency 'rake', '>= 0'
Expand Down
99 changes: 97 additions & 2 deletions lib/fluent/plugin/out_opensearch.rb
Expand Up @@ -54,6 +54,8 @@
require_relative 'oj_serializer'
rescue LoadError
end
require 'aws-sdk-core'
require 'faraday_middleware/aws_sigv4'

module Fluent::Plugin
class OpenSearchOutput < Output
Expand Down Expand Up @@ -177,6 +179,20 @@ def initialize(retry_stream)
config_param :chunk_id_key, :string, :default => "chunk_id".freeze
end

config_section :endpoint, multi: false do
config_param :region, :string
config_param :url do |c|
c.chomp("/")
end
config_param :access_key_id, :string, :default => ""
config_param :secret_access_key, :string, :default => "", secret: true
config_param :assume_role_arn, :string, :default => nil
config_param :ecs_container_credentials_relative_uri, :string, :default => nil #Set with AWS_CONTAINER_CREDENTIALS_RELATIVE_URI environment variable value
config_param :assume_role_session_name, :string, :default => "fluentd"
config_param :assume_role_web_identity_token_file, :string, :default => nil
config_param :sts_credentials_region, :string, :default => nil
end

config_section :buffer do
config_set_default :@type, DEFAULT_BUFFER_TYPE
config_set_default :chunk_keys, ['tag']
Expand All @@ -191,10 +207,68 @@ def initialize
super
end

######################################################################################################
# This creating AWS credentials code part is heavily based on fluent-plugin-aws-elasticsearch-service:
# https://github.com/atomita/fluent-plugin-aws-elasticsearch-service/blob/master/lib/fluent/plugin/out_aws-elasticsearch-service.rb#L73-L134
######################################################################################################
def aws_credentials(conf)
credentials = nil
unless conf[:access_key_id].empty? || conf[:secret_access_key].empty?
credentials = Aws::Credentials.new(conf[:access_key_id], conf[:secret_access_key])
else
if conf[:assume_role_arn].nil?
aws_container_credentials_relative_uri = conf[:ecs_container_credentials_relative_uri] || ENV["AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"]
if aws_container_credentials_relative_uri.nil?
credentials = Aws::SharedCredentials.new({retries: 2}).credentials
credentials ||= Aws::InstanceProfileCredentials.new.credentials
credentials ||= Aws::ECSCredentials.new.credentials
else
credentials = Aws::ECSCredentials.new({
credential_path: aws_container_credentials_relative_uri
}).credentials
end
else
if conf[:assume_role_web_identity_token_file].nil?
credentials = Aws::AssumeRoleCredentials.new({
role_arn: conf[:assume_role_arn],
role_session_name: conf[:assume_role_session_name],
region: sts_creds_region(conf)
}).credentials
else
credentials = Aws::AssumeRoleWebIdentityCredentials.new({
role_arn: conf[:assume_role_arn],
web_identity_token_file: conf[:assume_role_web_identity_token_file],
region: sts_creds_region(conf)
}).credentials
end
end
end
raise "No valid AWS credentials found." unless credentials.set?

credentials
end

def sts_creds_region(conf)
conf[:sts_credentials_region] || conf[:region]
end
###############################
# AWS credential part is ended.
###############################

def configure(conf)
compat_parameters_convert(conf, :buffer)

super

if @endpoint
# here overrides default value of reload_connections to false because
# AWS Elasticsearch Service doesn't return addresses of nodes and Elasticsearch client
# fails to reload connections properly. This ends up "temporarily failed to flush the buffer"
# error repeating forever. See this discussion for details:
# https://discuss.elastic.co/t/elasitcsearch-ruby-raises-cannot-get-new-connection-from-pool-error/36252
@reload_connections = false
end

if placeholder_substitution_needed_for_template?
# nop.
elsif not @buffer_config.chunk_keys.include? "tag" and
Expand Down Expand Up @@ -518,7 +592,21 @@ def client(host = nil, compress_connection = false)
@_os ||= begin
@compressable_connection = compress_connection
@current_config = connection_options[:hosts].clone
adapter_conf = lambda {|f| f.adapter @http_backend, @backend_options }
adapter_conf = if @endpoint
lambda do |f|
f.request(
:aws_sigv4,
service: 'es',
region: @endpoint.region,
credentials: aws_credentials(@endpoint),
)

f.adapter @http_backend, @backend_options
end
else
lambda {|f| f.adapter @http_backend, @backend_options }
end

local_reload_connections = @reload_connections
if local_reload_connections && @reload_after > DEFAULT_RELOAD_AFTER
local_reload_connections = @reload_after
Expand Down Expand Up @@ -572,7 +660,14 @@ def get_escaped_userinfo(host_str)

def get_connection_options(con_host=nil)

hosts = if con_host || @hosts
hosts = if @endpoint # For AWS OpenSearch Service
uri = URI(@endpoint.url)
host = %w(user password path).inject(host: uri.host, port: uri.port, scheme: uri.scheme) do |hash, key|
hash[key.to_sym] = uri.public_send(key) unless uri.public_send(key).nil? || uri.public_send(key) == ''
hash
end
[host]
elsif con_host || @hosts
(con_host || @hosts).split(',').map do |host_str|
# Support legacy hosts format host:port,host:port,host:port...
if host_str.match(%r{^[^:]+(\:\d+)?$})
Expand Down

0 comments on commit e0e47ec

Please sign in to comment.