diff --git a/Gemfile b/Gemfile
new file mode 100644
index 0000000..fa75df1
--- /dev/null
+++ b/Gemfile
@@ -0,0 +1,3 @@
+source 'https://rubygems.org'
+
+gemspec
diff --git a/NOTICE.TXT b/NOTICE.TXT
new file mode 100644
index 0000000..00bdbc7
--- /dev/null
+++ b/NOTICE.TXT
@@ -0,0 +1,14 @@
+Logstash-output-amazon_es Plugin
+Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
\ No newline at end of file
diff --git a/README.md b/README.md
index 2251668..224acb6 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,158 @@
-# logstash-output-amazones
-Logstash output plugin to sign and export logstash events to Amazon Elasticsearch Service
+# Logstash Plugin
+
+This is a plugin for [Logstash](https://github.com/elastic/logstash).
+
+It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way.
+
+# Setting Up
+
+## Installation
+
+One-command installation:
+`bin/plugin install logstash-output-amazon_es`
+
+While we are in the process of getting this plugin fully integrated within Logstash to make installation simpler,
+if the above does not work, or you would like to patch the code, here is a workaround to install this plugin within your Logstash:
+
+1. Check out/clone this code from GitHub
+2. Build the plugin using `gem build logstash-output-amazon_es.gemspec` (this works with JRuby and RubyGems versions > 1.9)
+3. Install the plugin using `bin/plugin install logstash-output-amazon_es-0.1.0-java.gem` (or the non-java variant)
+
+## Configuration for Amazon Elasticsearch Output plugin
+
+To run the Amazon Elasticsearch output plugin, simply add a configuration following the documentation below.
+
+An example configuration:
+
+    output {
+        amazon_es {
+            hosts => ["foo.us-east-1.es.amazonaws.com"]
+            region => "us-east-1"
+            aws_access_key_id => 'ACCESS_KEY' (optional - will follow the AWS credential provider chain)
+            aws_secret_access_key => 'SECRET_KEY' (optional)
+            index => "production-logs-%{+YYYY.MM.dd}"
+        }
+    }
+
+* Required Parameters
+  * hosts (array of string) - Amazon Elasticsearch domain endpoint,
+    e.g. ["foo.us-east-1.es.amazonaws.com"]
+  * region (string, :default => "us-east-1") - region where the domain is located
+
+* Optional Parameters
+  * Credential parameters
+    * aws_access_key_id, :validate => :string - Optional AWS Access Key
+    * aws_secret_access_key, :validate => :string - Optional AWS Secret Key
+    The credential resolution logic can be described as follows:
+    - User-passed aws_access_key_id and aws_secret_access_key in the amazon_es configuration
+    - Environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
+      (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET),
+      or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by the Java SDK)
+    - Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
+    - Instance profile credentials delivered through the Amazon EC2 metadata service
+  * Retry Parameters
+    * max_retries (number, default => 3) - Set the maximum number of retries for each event
+    * retry_max_items (number, default => 5000) - Set the retry queue size for events that failed to send
+    * retry_max_interval (number, default => 5) - Set the maximum interval between bulk retries
+  * index (string, default => "logstash-%{+YYYY.MM.dd}") - Elasticsearch index to write events into
+  * flush_size (number, default => 500) - This setting controls how many events will be buffered before sending a batch of events via the bulk API
+  * idle_flush_time (number, default => 1) - The amount of time in seconds since the last flush before a flush is forced.
+    This setting helps ensure slow event rates don't get stuck in Logstash.
+    For example, if your `flush_size` is 100, and you have received 10 events,
+    and it has been more than `idle_flush_time` seconds since the last flush,
+    Logstash will flush those 10 events automatically.
+    This helps keep both fast and slow log streams moving along in near-real-time.
+  * template (path) - You can set the path to your own template here, if you so desire. If not set, the included template will be used.
+  * template_name (string, default => "logstash") - defines how the template is named inside Elasticsearch
+  * port (string, default => 80) - Amazon Elasticsearch Service listens on port 80. If you have a custom proxy fronting Elasticsearch, this parameter may need tweaking.
+
+## Documentation
+
+Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation, so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation is placed under one [central location](http://www.elastic.co/guide/en/logstash/current/).
+
+- For formatting code or config examples, you can use the asciidoc `[source,ruby]` directive
+- For more asciidoc formatting tips, see the excellent reference here https://github.com/elastic/docs#asciidoc-guide
+
+## Need Help?
+
+Need help? Try #logstash on freenode IRC or the https://discuss.elastic.co/c/logstash discussion forum.
+
+## Developing
+
+### 1. Plugin Development and Testing
+
+#### Code
+- To get started, you'll need JRuby with the Bundler gem installed.
+
+- Create a new plugin or clone an existing one from the GitHub [logstash-plugins](https://github.com/logstash-plugins) organization. We also provide [example plugins](https://github.com/logstash-plugins?query=example).
+
+- Install dependencies
+```sh
+bundle install
+```
+
+#### Test
+
+- Update your dependencies
+
+```sh
+bundle install
+```
+
+- Run unit tests
+
+```sh
+bundle exec rspec
+```
+
+- Run integration tests
+
+Dependencies: [Docker](http://docker.com)
+
+Before the test suite is run, we will load and run an
+Elasticsearch instance within a Docker container. This container
+will be cleaned up when the suite has finished.
+
+```sh
+bundle exec rspec --tag integration
+```
+
+### 2. Running your unpublished Plugin in Logstash
+
+#### 2.1 Run in a local Logstash clone
+
+- Edit the Logstash `Gemfile` and add the local plugin path, for example:
+```ruby
+gem "logstash-filter-awesome", :path => "/your/local/logstash-filter-awesome"
+```
+- Install the plugin
+```sh
+bin/plugin install --no-verify
+```
+- Run Logstash with your plugin
+```sh
+bin/logstash -e 'filter {awesome {}}'
+```
+At this point any modifications to the plugin code will be applied to this local Logstash setup. After modifying the plugin, simply rerun Logstash.
+
+#### 2.2 Run in an installed Logstash
+
+You can use the same **2.1** method to run your plugin in an installed Logstash by editing its `Gemfile` and pointing the `:path` to your local plugin development directory, or you can build the gem and install it using:
+
+- Build your plugin gem
+```sh
+gem build logstash-filter-awesome.gemspec
+```
+- Install the plugin from the Logstash home
+```sh
+bin/plugin install /your/local/plugin/logstash-filter-awesome.gem
+```
+- Start Logstash and proceed to test the plugin
+
+## Contributing
+
+All contributions are welcome: ideas, patches, documentation, bug reports, complaints, and even something you drew up on a napkin.
+
+Programming is not a required skill. Whatever you've seen about open source and maintainers or community members saying "send patches or die" - you will not see that here.
+
+It is more important to the community that you are able to contribute.
+
+For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file.
diff --git a/Rakefile b/Rakefile
new file mode 100644
index 0000000..d50e796
--- /dev/null
+++ b/Rakefile
@@ -0,0 +1 @@
+require "logstash/devutils/rake"
diff --git a/lib/logstash/outputs/amazon_es.rb b/lib/logstash/outputs/amazon_es.rb
new file mode 100644
index 0000000..b66a8ee
--- /dev/null
+++ b/lib/logstash/outputs/amazon_es.rb
@@ -0,0 +1,471 @@
+# encoding: utf-8
+require "logstash/namespace"
+require "logstash/environment"
+require "logstash/outputs/base"
+require "logstash/outputs/amazon_es/http_client"
+require "logstash/json"
+require "concurrent"
+require "stud/buffer"
+require "socket"
+require "thread"
+require "uri"
+
+# This output plugin emits data to Amazon Elasticsearch with support for signing requests using AWS V4 Signatures
+#
+# The configuration and experience are similar to the logstash-output-elasticsearch plugin, with Signature V4 support added.
+# Some of the default configurations, like connection timeouts, have been tuned for optimal performance with Amazon Elasticsearch.
+#
+# ==== Retry Policy
+#
+# This plugin uses the same retry policy as logstash-output-elasticsearch. It uses the bulk API to optimize its
+# imports into Elasticsearch. These requests may experience either partial or total failures.
+# Events are retried if they fail due to either a network error or the status codes
+# 429 (the server is busy), 409 (Version Conflict), or 503 (temporary overloading/maintenance).
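+#
+# As an illustration (the values shown are this plugin's defaults, and the endpoint is a
+# placeholder), the retry behavior described below is tuned through these output settings:
+#
+# [source,ruby]
+#     output {
+#       amazon_es {
+#         hosts => ["foo.us-east-1.es.amazonaws.com"]
+#         max_retries => 3
+#         retry_max_items => 5000
+#         retry_max_interval => 5
+#       }
+#     }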
+#
+# The retry policy's logic can be described as follows:
+#
+# - Block and retry all events in the bulk response that experience transient network exceptions until
+#   a successful submission is received by Elasticsearch.
+# - Retry the subset of sent events which resulted in ES errors of a retryable nature.
+# - Events which returned retryable error codes will be pushed onto a separate queue for
+#   retrying events. Events in this queue will be retried a maximum of 3 times by default (configurable through :max_retries).
+#   The size of this queue is capped by the value set in :retry_max_items.
+# - Events from the retry queue are submitted again when the queue reaches its max size or when
+#   the max interval time is reached. The max interval time is configurable via :retry_max_interval.
+# - Events which are not retryable or have reached their max retry count are logged to stderr.
+
+class LogStash::Outputs::AmazonES < LogStash::Outputs::Base
+  attr_reader :client
+
+  include Stud::Buffer
+  RETRYABLE_CODES = [409, 429, 503]
+  SUCCESS_CODES = [200, 201]
+
+  config_name "amazon_es"
+
+  # The index to write events to. This can be dynamic using the `%{foo}` syntax.
+  # The default value will partition your indices by day so you can more easily
+  # delete old data or only search specific date ranges.
+  # Indexes may not contain uppercase characters.
+  # For weekly indexes ISO 8601 format is recommended, e.g. logstash-%{+xxxx.ww}
+  config :index, :validate => :string, :default => "logstash-%{+YYYY.MM.dd}"
+
+  # The index type to write events to. Generally you should try to write only
+  # similar events to the same 'type'. String expansion `%{foo}` works here.
+  #
+  # Deprecated in favor of the `document_type` field.
+  config :index_type, :validate => :string, :deprecated => "Please use the 'document_type' setting instead. It has the same effect, but is more appropriately named."
+
+  # The document type to write events to. Generally you should try to write only
+  # similar events to the same 'type'. String expansion `%{foo}` works here.
+  # Unless you set 'document_type', the event 'type' will be used if it exists;
+  # otherwise the document type will be assigned the value of 'logs'.
+  config :document_type, :validate => :string
+
+  # Starting in Logstash 1.3 (unless you set option `manage_template` to false)
+  # a default mapping template for Elasticsearch will be applied if you do not
+  # already have one set to match the index pattern defined (default of
+  # `logstash-%{+YYYY.MM.dd}`), minus any variables. For example, in this case
+  # the template will be applied to all indices starting with `logstash-*`.
+  #
+  # If you have dynamic templating (e.g. creating indices based on field names)
+  # then you should set `manage_template` to false and use the REST API to upload
+  # your templates manually.
+  config :manage_template, :validate => :boolean, :default => true
+
+  # This configuration option defines how the template is named inside Elasticsearch.
+  # Note that if you have used the template management features and subsequently
+  # change this, you will need to prune the old template manually, e.g.
+  #
+  # `curl -XDELETE <http://localhost:9200/_template/OldTemplateName?pretty>`
+  #
+  # where `OldTemplateName` is whatever the former setting was.
+  config :template_name, :validate => :string, :default => "logstash"
+
+  # You can set the path to your own template here, if you so desire.
+  # If not set, the included template will be used.
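+  # For example (the path shown is illustrative): template => "/path/to/your/template.json"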
+  config :template, :validate => :path
+
+  # Overwrite the current template with whatever is configured
+  # in the `template` and `template_name` directives.
+  config :template_overwrite, :validate => :boolean, :default => false
+
+  # The document ID for the index. Useful for overwriting existing entries in
+  # Elasticsearch with the same ID.
+  config :document_id, :validate => :string
+
+  # A routing override to be applied to all processed events.
+  # This can be dynamic using the `%{foo}` syntax.
+  config :routing, :validate => :string
+
+  # Set the endpoint of your Amazon Elasticsearch domain. This will always be an array of size 1, e.g.
+  # ["foo.us-east-1.es.amazonaws.com"]
+  config :hosts, :validate => :array
+
+  # You can set the remote port as part of the host, or explicitly here as well
+  config :port, :validate => :string, :default => 80
+
+  # Signing-specific details
+  config :region, :validate => :string, :default => "us-east-1"
+
+  # The credential resolution logic can be described as follows:
+  #
+  # - User-passed aws_access_key_id and aws_secret_access_key in the amazon_es configuration
+  # - Environment variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
+  #   (RECOMMENDED since they are recognized by all the AWS SDKs and CLI except for .NET),
+  #   or AWS_ACCESS_KEY and AWS_SECRET_KEY (only recognized by the Java SDK)
+  # - Credential profiles file at the default location (~/.aws/credentials) shared by all AWS SDKs and the AWS CLI
+  # - Instance profile credentials delivered through the Amazon EC2 metadata service
+  config :aws_access_key_id, :validate => :string
+  config :aws_secret_access_key, :validate => :string
+
+  # This plugin uses the bulk index api for improved indexing performance.
+  # To make efficient bulk api calls, we will buffer a certain number of
+  # events before flushing that out to Elasticsearch. This setting
+  # controls how many events will be buffered before sending a batch
+  # of events.
+  config :flush_size, :validate => :number, :default => 500
+
+  # The amount of time since the last flush before a flush is forced.
+  #
+  # This setting helps ensure slow event rates don't get stuck in Logstash.
+  # For example, if your `flush_size` is 100, and you have received 10 events,
+  # and it has been more than `idle_flush_time` seconds since the last flush,
+  # Logstash will flush those 10 events automatically.
+  #
+  # This helps keep both fast and slow log streams moving along in
+  # near-real-time.
+  config :idle_flush_time, :validate => :number, :default => 1
+
+  # The Elasticsearch action to perform. Valid actions are: `index`, `delete`.
+  #
+  # Use of this setting *REQUIRES* you also configure the `document_id` setting
+  # because `delete` actions all require a document id.
+  #
+  # What does each action do?
+  #
+  # - index: indexes a document (an event from Logstash).
+  # - delete: deletes a document by id.
+  # - create: indexes a document, fails if a document by that id already exists in the index.
+  # - update: updates a document by id.
+  #
+  # For more details on actions, check out the http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation]
+  config :action, :validate => %w(index delete create update), :default => "index"
+
+  # Username and password (this setting works with HTTP or HTTPS auth)
+  config :user, :validate => :string
+  config :password, :validate => :password
+
+  # HTTP Path at which the Elasticsearch server lives. Use this if you must run ES behind a proxy that remaps
+  # the root path of the Elasticsearch HTTP API.
+  config :path, :validate => :string, :default => "/"
+
+  # Set the maximum number of retries for each event
+  config :max_retries, :validate => :number, :default => 3
+
+  # Set the retry queue size for events that failed to send
+  config :retry_max_items, :validate => :number, :default => 5000
+
+  # Set the maximum interval between bulk retries
+  config :retry_max_interval, :validate => :number, :default => 5
+
+  # Set the address of a forward HTTP proxy. Must be used with the 'http' protocol.
+  # Can be either a string, such as 'http://localhost:123', or a hash in the form
+  # {host: 'proxy.org', port: 80, scheme: 'http'}.
+  # Note, this is NOT a SOCKS proxy, but a plain HTTP proxy.
+  config :proxy
+
+  # Enable doc_as_upsert for update mode.
+  # Creates a new document with source if document_id doesn't exist.
+  config :doc_as_upsert, :validate => :boolean, :default => false
+
+  # Set upsert content for update mode.
+  # Creates a new document with this parameter as its json string if document_id doesn't exist.
+  config :upsert, :validate => :string, :default => ""
+
+  public
+  def register
+    @hosts = Array(@hosts)
+    # retry-specific variables
+    @retry_flush_mutex = Mutex.new
+    @retry_teardown_requested = Concurrent::AtomicBoolean.new(false)
+    # needs flushing when interval
+    @retry_queue_needs_flushing = ConditionVariable.new
+    @retry_queue_not_full = ConditionVariable.new
+    @retry_queue = Queue.new
+    @submit_mutex = Mutex.new
+
+    client_settings = {}
+    common_options = {
+      :client_settings => client_settings
+    }
+
+    client_settings[:path] = "/#{@path}/".gsub(/\/+/, "/") # Normalize slashes
+    @logger.debug? && @logger.debug("Normalizing http path", :path => @path, :normalized => client_settings[:path])
+
+    if @hosts.nil? || @hosts.empty?
+      @logger.info("No 'hosts' set in amazon_es output. Defaulting to localhost")
+      @hosts = ["localhost"]
+    end
+
+    client_settings.merge! setup_proxy()
+    common_options.merge! setup_basic_auth()
+
+    # Update API setup
+    update_options = {
+      :upsert => @upsert,
+      :doc_as_upsert => @doc_as_upsert
+    }
+    common_options.merge! update_options if @action == 'update'
+
+    @client = LogStash::Outputs::AES::HttpClient.new(
+      common_options.merge(:hosts => @hosts, :port => @port, :region => @region, :aws_access_key_id => @aws_access_key_id, :aws_secret_access_key => @aws_secret_access_key)
+    )
+
+    if @manage_template
+      begin
+        @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s)
+        @client.template_install(@template_name, get_template, @template_overwrite)
+      rescue => e
+        @logger.error("Failed to install template: #{e.message}")
+      end
+    end
+
+    @logger.info("New Elasticsearch output", :hosts => @hosts, :port => @port)
+
+    @client_idx = 0
+
+    buffer_initialize(
+      :max_items => @flush_size,
+      :max_interval => @idle_flush_time,
+      :logger => @logger
+    )
+
+    @retry_timer_thread = Thread.new do
+      loop do
+        sleep(@retry_max_interval)
+        @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
+      end
+    end
+
+    @retry_thread = Thread.new do
+      while @retry_teardown_requested.false?
+        @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.wait(@retry_flush_mutex) }
+        retry_flush
+      end
+    end
+  end # def register
+
+  public
+  def get_template
+    if @template.nil?
+      @template = ::File.expand_path('amazon_es/elasticsearch-template.json', ::File.dirname(__FILE__))
+      if !File.exists?(@template)
+        raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')"
+      end
+    end
+    template_json = IO.read(@template).gsub(/\n/,'')
+    template = LogStash::Json.load(template_json)
+    @logger.info("Using mapping template", :template => template)
+    return template
+  end # def get_template
+
+  public
+  def receive(event)
+    return unless output?(event)
+
+    # Block until our retry queue is no longer maxed out. This applies
+    # back-pressure to slow down the receive rate.
+    @retry_flush_mutex.synchronize {
+      @retry_queue_not_full.wait(@retry_flush_mutex) while @retry_queue.size > @retry_max_items
+    }
+
+    event['@metadata']['retry_count'] = 0
+
+    # Set the 'type' value for the index.
+    type = if @document_type
+             event.sprintf(@document_type)
+           elsif @index_type # deprecated
+             event.sprintf(@index_type)
+           else
+             event["type"] || "logs"
+           end
+
+    params = {
+      :_id => @document_id ? event.sprintf(@document_id) : nil,
+      :_index => event.sprintf(@index),
+      :_type => type,
+      :_routing => @routing ? event.sprintf(@routing) : nil
+    }
+
+    params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @action == 'update' && @upsert != ""
+
+    buffer_receive([event.sprintf(@action), params, event])
+  end # def receive
+
+  public
+  # The submit method can be called from both the
+  # Stud::Buffer flush thread and from our own retry thread.
+  def submit(actions)
+    @submit_mutex.synchronize do
+      es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] }
+
+      bulk_response = @client.bulk(es_actions)
+
+      if bulk_response["errors"]
+        actions_to_retry = []
+
+        bulk_response['items'].each_with_index do |item, idx|
+          action = es_actions[idx]
+          action_type, props = item.first # These are all hashes with one value, so we destructure them here
+
+          status = props['status']
+          error = props['error']
+
+          if RETRYABLE_CODES.include?(status)
+            @logger.warn "retrying failed action with response code: #{status}"
+            actions_to_retry << action
+          elsif not SUCCESS_CODES.include?(status)
+            @logger.warn "failed action", status: status, error: error, action: action
+          end
+        end
+
+        retry_push(actions_to_retry) unless actions_to_retry.empty?
+      end
+    end
+  end
+
+  # When there are exceptions raised upon submission, we raise an exception so that
+  # Stud::Buffer will retry the flush.
+  public
+  def flush(actions, teardown = false)
+    begin
+      submit(actions)
+    rescue Manticore::SocketException => e
+      # If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
+      # and let the user sort it out from there
+      @logger.error(
+        "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}'," +
+        " but Elasticsearch appears to be unreachable or down!",
+        :client_config => @client.client_options,
+        :error_message => e.message
+      )
+      @logger.debug("Failed actions for last bad bulk request!", :actions => actions)
+    rescue => e
+      # For all other errors print out full connection issues
+      @logger.error(
+        "Attempted to send a bulk request to Elasticsearch configured at '#{@client.client_options[:hosts]}'," +
+        " but an error occurred and it failed! Are you sure you can reach elasticsearch from this machine using " +
+        "the configuration provided?",
+        :client_config => @client.client_options,
+        :error_message => e.message,
+        :error_class => e.class.name,
+        :backtrace => e.backtrace
+      )
+
+      @logger.debug("Failed actions for last bad bulk request!", :actions => actions)
+
+      raise e
+    end
+  end # def flush
+
+  public
+  def teardown
+    @retry_teardown_requested.make_true
+    # First, make sure retry_timer_thread is stopped
+    # to ensure we do not signal a retry based on
+    # the retry interval.
+    Thread.kill(@retry_timer_thread)
+    @retry_timer_thread.join
+    # Signal flushing in the case that #retry_flush is in
+    # the process of waiting for a signal.
+    @retry_flush_mutex.synchronize { @retry_queue_needs_flushing.signal }
+    # Now, #retry_flush is ensured to not be in a state of
+    # waiting and can be safely joined into the main thread
+    # for further final execution of an in-process remaining call.
+    @retry_thread.join
+
+    # Execute any final actions, followed by a retry of any
+    # final actions that did not succeed.
+    buffer_flush(:final => true)
+    retry_flush
+  end
+
+  private
+  def setup_proxy
+    return {} unless @proxy
+
+    # Symbolize keys
+    proxy = if @proxy.is_a?(Hash)
+              Hash[@proxy.map {|k,v| [k.to_sym, v]}]
+            elsif @proxy.is_a?(String)
+              @proxy
+            else
+              raise LogStash::ConfigurationError, "Expected 'proxy' to be a string or hash, not '#{@proxy}'!"
+            end
+
+    return {:proxy => proxy}
+  end
+
+  private
+  def setup_basic_auth
+    return {} unless @user && @password
+
+    {
+      :user => ::URI.escape(@user, "@:"),
+      :password => ::URI.escape(@password.value, "@:")
+    }
+  end
+
+  private
+  # In charge of submitting any actions in @retry_queue that need to be
+  # retried.
+  #
+  # This method is not called concurrently. It is only called by @retry_thread
+  # and, once that thread is ended during the teardown process, a final call
+  # to this method is done upon teardown in the main thread.
+  def retry_flush()
+    unless @retry_queue.empty?
+      buffer = @retry_queue.size.times.map do
+        next_action, next_doc, next_event = @retry_queue.pop
+        next_event['@metadata']['retry_count'] += 1
+
+        if next_event['@metadata']['retry_count'] > @max_retries
+          @logger.error "too many attempts at sending event. dropping: #{next_event}"
+          nil
+        else
+          [next_action, next_doc, next_event]
+        end
+      end.compact
+
+      submit(buffer) unless buffer.empty?
+    end
+
+    @retry_flush_mutex.synchronize {
+      @retry_queue_not_full.signal if @retry_queue.size < @retry_max_items
+    }
+  end
+
+  private
+  def retry_push(actions)
+    Array(actions).each{|action| @retry_queue << action}
+    @retry_flush_mutex.synchronize {
+      @retry_queue_needs_flushing.signal if @retry_queue.size >= @retry_max_items
+    }
+  end
+
+  @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-amazon_es-/ }
+
+  @@plugins.each do |plugin|
+    name = plugin.name.split('-')[-1]
+    require "logstash/outputs/amazon_es/#{name}"
+  end
+
+end
diff --git a/lib/logstash/outputs/amazon_es/aws_transport.rb b/lib/logstash/outputs/amazon_es/aws_transport.rb
new file mode 100644
index 0000000..07e6245
--- /dev/null
+++ b/lib/logstash/outputs/amazon_es/aws_transport.rb
@@ -0,0 +1,103 @@
+require 'faraday'
+require 'faraday_middleware'
+require 'aws-sdk-core'
+require 'elasticsearch'
+require 'elasticsearch-transport'
+require 'manticore'
+require 'faraday/adapter/manticore'
+
+require 'uri'
+
+require_relative "aws_v4_signer"
+
+module Elasticsearch
+  module Transport
+    module Transport
+      module HTTP
+
+        # Transport implementation which V4-signs requests, using the [_Faraday_](https://rubygems.org/gems/faraday)
+        # library for abstracting the HTTP client.
+        #
+        # @see Transport::Base
+        #
+        class AWS
+          include Elasticsearch::Transport::Transport::Base
+
+          DEFAULT_PORT = 80
+          DEFAULT_PROTOCOL = "http"
+
+          CredentialConfig = Struct.new(
+            :access_key_id,
+            :secret_access_key,
+            :session_token,
+            :profile
+          )
+
+          # Performs the request by invoking {Transport::Base#perform_request} with a block.
+          #
+          # @return [Response]
+          # @see Transport::Base#perform_request
+          #
+          def perform_request(method, path, params={}, body=nil)
+            super do |connection, url|
+              response = connection.connection.run_request \
+                method.downcase.to_sym,
+                url,
+                ( body ? __convert_to_json(body) : nil ),
+                {}
+
+              Response.new response.status, response.body, response.headers
+            end
+          end
+
+          # Builds and returns a collection of connections.
+          #
+          # @return [Connections::Collection]
+          #
+          def __build_connections
+            region = options[:region]
+            access_key_id = options[:aws_access_key_id] || nil
+            secret_access_key = options[:aws_secret_access_key] || nil
+            session_token = options[:session_token] || nil
+            profile = options[:profile] || 'default'
+
+            credential_config = CredentialConfig.new(access_key_id, secret_access_key, session_token, profile)
+            credentials = Aws::CredentialProviderChain.new(credential_config).resolve
+
+            Connections::Collection.new \
+              :connections => hosts.map { |host|
+                host[:protocol] = host[:scheme] || DEFAULT_PROTOCOL
+                host[:port] ||= DEFAULT_PORT
+                url = __full_url(host)
+
+                aes_connection = ::Faraday::Connection.new(url, (options[:transport_options] || {})) do |faraday|
+                  faraday.request :aws_v4_signer,
+                                  credentials: credentials,
+                                  service_name: 'es',
+                                  region: region
+                  faraday.adapter :manticore
+                end
+
+                Connections::Connection.new \
+                  :host => host,
+                  :connection => aes_connection
+              },
+              :selector_class => options[:selector_class],
+              :selector => options[:selector]
+          end
+
+          # Returns an array of implementation-specific connection errors.
+          #
+          # @return [Array]
+          #
+          def host_unreachable_exceptions
+            [::Faraday::Error::ConnectionFailed, ::Faraday::Error::TimeoutError]
+          end
+        end
+      end
+    end
+  end
+end
\ No newline at end of file
diff --git a/lib/logstash/outputs/amazon_es/aws_v4_signer.rb b/lib/logstash/outputs/amazon_es/aws_v4_signer.rb
new file mode 100644
index 0000000..4213add
--- /dev/null
+++ b/lib/logstash/outputs/amazon_es/aws_v4_signer.rb
@@ -0,0 +1,7 @@
+require 'faraday'
+require_relative 'aws_v4_signer_impl'
+
+module FaradayMiddleware
+  Faraday::Request.register_middleware :aws_v4_signer => lambda { AwsV4Signer }
+end
+
diff --git a/lib/logstash/outputs/amazon_es/aws_v4_signer_impl.rb b/lib/logstash/outputs/amazon_es/aws_v4_signer_impl.rb
new file mode 100644
index 0000000..fbe6845
--- /dev/null
+++ b/lib/logstash/outputs/amazon_es/aws_v4_signer_impl.rb
@@ -0,0 +1,58 @@
+require 'aws-sdk-core'
+require 'faraday'
+
+class AwsV4Signer < Faraday::Middleware
+  class Request
+    def initialize(env)
+      @env = env
+    end
+
+    def headers
+      @env.request_headers
+    end
+
+    def set_header(header)
+      @env.request_headers = header
+    end
+
+    def body
+      @env.body || ''
+    end
+
+    def endpoint
+      @env.url
+    end
+
+    def http_method
+      @env.method.to_s.upcase
+    end
+  end
+
+  def initialize(app, options = nil)
+    super(app)
+    credentials = options.fetch(:credentials)
+    service_name = options.fetch(:service_name)
+    region = options.fetch(:region)
+    @signer = Aws::Signers::V4.new(credentials, service_name, region)
+    @net_http = app.is_a?(Faraday::Adapter::NetHttp)
+  end
+
+  def call(env)
+    normalize_for_net_http!(env)
+    req = Request.new(env)
+    @signer.sign(req)
+    @app.call(env)
+  end
+
+  private
+  def normalize_for_net_http!(env)
+    return unless @net_http
+
+    if Net::HTTP::HAVE_ZLIB
+      env.request_headers['Accept-Encoding'] ||= 'gzip;q=1.0,deflate;q=0.6,identity;q=0.3'
+    end
+
+    env.request_headers['Accept'] ||= '*/*'
+  end
+end
\ No newline at end of file
diff --git a/lib/logstash/outputs/amazon_es/elasticsearch-template.json b/lib/logstash/outputs/amazon_es/elasticsearch-template.json
new file mode 100644
index 0000000..fb74d72
--- /dev/null
+++ b/lib/logstash/outputs/amazon_es/elasticsearch-template.json
@@ -0,0 +1,41 @@
+{
+  "template" : "logstash-*",
+  "settings" : {
+    "index.refresh_interval" : "5s"
+  },
+  "mappings" : {
+    "_default_" : {
+      "_all" : {"enabled" : true, "omit_norms" : true},
+      "dynamic_templates" : [ {
+        "message_field" : {
+          "match" : "message",
+          "match_mapping_type" : "string",
+          "mapping" : {
+            "type" : "string", "index" : "analyzed", "omit_norms" : true
+          }
+        }
+      }, {
+        "string_fields" : {
+          "match" : "*",
+          "match_mapping_type" : "string",
+          "mapping" : {
+            "type" : "string", "index" : "analyzed", "omit_norms" : true,
+            "fields" : {
+              "raw" : {"type": "string", "index" : "not_analyzed", "ignore_above" : 256}
+            }
+          }
+        }
+      } ],
+      "properties" : {
+        "@version": { "type": "string", "index": "not_analyzed" },
+        "geoip" : {
+          "type" : "object",
+          "dynamic": true,
+          "properties" : {
+            "location" : { "type" : "geo_point" }
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/lib/logstash/outputs/amazon_es/http_client.rb b/lib/logstash/outputs/amazon_es/http_client.rb
new file mode 100644
index 0000000..9efe52e
--- /dev/null
+++ b/lib/logstash/outputs/amazon_es/http_client.rb
@@ -0,0 +1,117 @@
+require "logstash/outputs/amazon_es"
+require "cabin"
+require "base64"
+require "elasticsearch"
+
+require_relative "aws_transport"
+
+module LogStash::Outputs::AES
+  class HttpClient
+    attr_reader :client, :options, :client_options
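+
+    # Default connection options; port 80 matches the Amazon Elasticsearch
+    # Service HTTP listener noted in the README.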
+    DEFAULT_OPTIONS = {
+      :port => 80
+    }
+
+    def initialize(options={})
+      @logger = Cabin::Channel.get
+      @options = DEFAULT_OPTIONS.merge(options)
+      @client = build_client(@options)
+    end
+
+    def template_install(name, template, force=false)
+      if template_exists?(name) && !force
+        @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name)
+        return
+      end
+      template_put(name, template)
+    end
+
+    def bulk(actions)
+      bulk_body = actions.collect do |action, args, source|
+        if action == 'update'
+          if args[:_id]
+            source = { 'doc' => source }
+            if @options[:doc_as_upsert]
+              source['doc_as_upsert'] = true
+            else
+              source['upsert'] = args[:_upsert] if args[:_upsert]
+            end
+          else
+            raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.")
+          end
+        end
+
+        args.delete(:_upsert)
+
+        if source
+          next [ { action => args }, source ]
+        else
+          next { action => args }
+        end
+      end.flatten
+
+      bulk_response = @client.bulk(:body => bulk_body)
+
+      self.class.normalize_bulk_response(bulk_response)
+    end
+
+    private
+    def build_client(options)
+      hosts = options[:hosts]
+      port = options[:port]
+      client_settings = options[:client_settings] || {}
+
+      uris = hosts.map do |host|
+        "http://#{host}:#{port}#{client_settings[:path]}".gsub(/[\/]+$/,'')
+      end
+
+      @client_options = {
+        :hosts => uris,
+        :region => options[:region],
+        :aws_access_key_id => options[:aws_access_key_id],
+        :aws_secret_access_key => options[:aws_secret_access_key],
+        :transport_options => {
+          :request => {:open_timeout => 0, :timeout => 60}, # ELB timeouts are set at 60
+          :proxy => client_settings[:proxy],
+        },
+        :transport_class => Elasticsearch::Transport::Transport::HTTP::AWS
+      }
+
+      if options[:user] && options[:password] then
+        token = Base64.strict_encode64(options[:user] + ":" + options[:password])
+        @client_options[:headers] = { "Authorization" => "Basic #{token}" }
+      end
+
+      Elasticsearch::Client.new(client_options)
+    end
+
+    def self.normalize_bulk_response(bulk_response)
+      if bulk_response["errors"]
+        # The structure of the response from the REST Bulk API is as follows:
+        #   {"took"=>74, "errors"=>true, "items"=>[{"create"=>{"_index"=>"logstash-2014.11.17",
+        #                                                      "_type"=>"logs",
+        #                                                      "_id"=>"AUxTS2C55Jrgi-hC6rQF",
+        #                                                      "_version"=>1,
+        #                                                      "status"=>400,
+        #                                                      "error"=>"MapperParsingException[failed to parse]..."}}]}
+        # where each `item` is a one-entry hash of {OPTYPE => data}. Calling `first` on it
+        # yields a two-element [op_type, data] array, so the status of each item is read
+        # from the second element (i.first[1]).
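+        # For the example response above, this method would return:
+        #   {"errors" => true, "statuses" => [400]}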
+        {"errors" => true, "statuses" => bulk_response["items"].map { |i| i.first[1]['status'] }}
+      else
+        {"errors" => false}
+      end
+    end
+
+    def template_exists?(name)
+      @client.indices.get_template(:name => name)
+      return true
+    rescue Elasticsearch::Transport::Transport::Errors::NotFound
+      return false
+    end
+
+    def template_put(name, template)
+      @client.indices.put_template(:name => name, :body => template)
+    end
+  end
+end
diff --git a/logstash-output-amazon_es-0.1.0.gem b/logstash-output-amazon_es-0.1.0.gem
new file mode 100644
index 0000000..3b9a22f
Binary files /dev/null and b/logstash-output-amazon_es-0.1.0.gem differ
diff --git a/logstash-output-amazon_es.gemspec b/logstash-output-amazon_es.gemspec
new file mode 100644
index 0000000..a0f9473
--- /dev/null
+++ b/logstash-output-amazon_es.gemspec
@@ -0,0 +1,42 @@
+Gem::Specification.new do |s|
+
+  s.name = 'logstash-output-amazon_es'
+  s.version = '0.1.0'
+  s.licenses = ['Apache-2.0']
+  s.summary = "Logstash Output to Amazon Elasticsearch Service"
+  s.description = "Output events to Amazon Elasticsearch Service with V4 signing"
+  s.authors = ["Amazon"]
+  s.email = 'cloud-search-feedback@amazon.com'
+  s.homepage = "http://logstash.net/"
+  s.require_paths = ["lib"]
+
+  # Files
+  s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT']
+
+  # Tests
+  s.test_files = s.files.grep(%r{^(test|spec|features)/})
+
+  # Special flag to let us know this is actually a logstash plugin
+  s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
+
+  # Gem dependencies
+  s.add_runtime_dependency 'concurrent-ruby'
+  s.add_runtime_dependency 'elasticsearch', ['>= 1.0.10', '~> 1.0']
+  s.add_runtime_dependency 'stud', ['>= 0.0.17', '~> 0.0']
+  s.add_runtime_dependency 'cabin', ['~> 0.6']
+  s.add_runtime_dependency "logstash-core", '>= 1.4.0', '< 2.0.0'
+  s.add_runtime_dependency "aws-sdk", ['>= 2.1.14', '~> 2.1']
+  s.add_runtime_dependency "faraday", '~> 0.9.1'
+  s.add_runtime_dependency "faraday_middleware", '~> 0.10.0'
+
+  s.add_development_dependency 'ftw', '~> 0.0.42'
+  s.add_development_dependency 'logstash-input-generator'
+
+  if RUBY_PLATFORM == 'java'
+    s.platform = RUBY_PLATFORM
+    s.add_runtime_dependency "manticore", '~> 0.4.2'
+  end
+
+  s.add_development_dependency 'logstash-devutils'
+  s.add_development_dependency 'longshoreman'
+end
diff --git a/spec/amazon_es_spec_helper.rb b/spec/amazon_es_spec_helper.rb
new file mode 100644
index 0000000..d13ed86
--- /dev/null
+++ b/spec/amazon_es_spec_helper.rb
@@ -0,0 +1,69 @@
+require "logstash/devutils/rspec/spec_helper"
+require "ftw"
+require "logstash/plugin"
+require "logstash/json"
+require "stud/try"
+require "longshoreman"
+
+CONTAINER_NAME = "logstash-output-amazon-es-#{rand(999).to_s}"
+CONTAINER_IMAGE = "elasticsearch"
+CONTAINER_TAG = "1.6"
+
+DOCKER_INTEGRATION = ENV["DOCKER_INTEGRATION"]
+
+module ESHelper
+  def get_host
+    DOCKER_INTEGRATION ? Longshoreman.new.get_host_ip : "127.0.0.1"
+  end
+
+  def get_port
+    return 9200 unless DOCKER_INTEGRATION
+
+    container = Longshoreman::Container.new
+    container.get(CONTAINER_NAME)
+    container.rport(9200)
+  end
+
+  def get_client
+    Elasticsearch::Client.new(:host => "#{get_host}:#{get_port}")
+  end
+end
+
+RSpec.configure do |config|
+  config.include ESHelper
+
+  if DOCKER_INTEGRATION
+    # This :all hook gets run before every describe block that is tagged with :integration => true.
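+    # (These hooks only take effect when the DOCKER_INTEGRATION environment variable is set;
+    # the tagged examples are run with `bundle exec rspec --tag integration`, as noted in the README.)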
+    config.before(:all, :integration => true) do
+      # Check if the container exists already before creating a new one.
+      begin
+        ls = Longshoreman::new
+        ls.container.get(CONTAINER_NAME)
+      rescue Docker::Error::NotFoundError
+        Longshoreman.new("#{CONTAINER_IMAGE}:#{CONTAINER_TAG}", CONTAINER_NAME)
+        # TODO(talevy): verify ES is running instead of static timeout
+        sleep 10
+      end
+    end
+
+    # We want to do a final cleanup after all :integration runs,
+    # but we don't want to clean up before the last block.
+    # This is a final blind check to see if the ES docker container is running and
+    # needs to be cleaned up. If no container can be found and/or docker is not
+    # running on the system, we do nothing.
+    config.after(:suite) do
+      # Only clean up the docker container if the system has docker and the container is running.
+      begin
+        ls = Longshoreman::new
+        ls.container.get(CONTAINER_NAME)
+        ls.cleanup
+      rescue Docker::Error::NotFoundError, Excon::Errors::SocketError
+        # do nothing
+      end
+    end
+  end
+end
diff --git a/spec/unit/outputs/amazon_es_spec.rb b/spec/unit/outputs/amazon_es_spec.rb
new file mode 100644
index 0000000..ffe6fec
--- /dev/null
+++ b/spec/unit/outputs/amazon_es_spec.rb
@@ -0,0 +1,50 @@
+require_relative "../../../spec/amazon_es_spec_helper"
+
+describe "outputs/amazon_es" do
+  describe "http client create" do
+    require "logstash/outputs/amazon_es"
+    require "elasticsearch"
+
+    let(:options) {
+      {
+        "index" => "my-index",
+        "hosts" => "localhost",
+        "path" => "some-path"
+      }
+    }
+
+    let(:eso) { LogStash::Outputs::AmazonES.new(options) }
+
+    let(:manticore_host) {
+      eso.client.send(:client).transport.options[:hosts].first
+    }
+
+    around(:each) do |block|
+      thread = eso.register
+      block.call()
+      thread.kill()
+    end
+
+    describe "with path" do
+      it "should properly create a URI with the path" do
+        expect(eso.path).to eql(options["path"])
+      end
+
+      it "should properly set the path on the HTTP client" do
+        expect(manticore_host).to include("/" + options["path"])
+      end
+
+      context "with extra slashes" do
+        let(:path) { "/slashed-path/" }
+        let(:eso) {
+          LogStash::Outputs::AmazonES.new(options.merge("path" => "/some-path/"))
+        }
+
+        it "should properly set the path on the HTTP client without adding slashes" do
+          expect(manticore_host).to include(options["path"])
+        end
+      end
+    end
+  end
+end
diff --git a/spec/unit/outputs/elasticsearch/protocol_spec.rb b/spec/unit/outputs/elasticsearch/protocol_spec.rb
new file mode 100644
index 0000000..77bf995
--- /dev/null
+++ b/spec/unit/outputs/elasticsearch/protocol_spec.rb
@@ -0,0 +1,36 @@
+require "logstash/devutils/rspec/spec_helper"
+require "logstash/outputs/amazon_es/http_client"
+require "java"
+
+describe LogStash::Outputs::AES::HttpClient do
+  context "successful" do
+    it "should map correctly" do
+      bulk_response = {"took"=>74, "errors"=>false, "items"=>[{"create"=>{"_index"=>"logstash-2014.11.17",
+                        "_type"=>"logs", "_id"=>"AUxTS2C55Jrgi-hC6rQF",
+                        "_version"=>1, "status"=>201}}]}
+      actual = LogStash::Outputs::AES::HttpClient.normalize_bulk_response(bulk_response)
+      insist { actual } == {"errors"=> false}
+    end
+  end
+
+  context "contains failures" do
+    it "should map correctly" do
+      bulk_response = {"took"=>71, "errors"=>true,
+                       "items"=>[{"create"=>{"_index"=>"logstash-2014.11.17",
+                         "_type"=>"logs", "_id"=>"AUxTQ_OI5Jrgi-hC6rQB", "status"=>400,
+                         "error"=>"MapperParsingException[failed to parse]..."}}]}
+      actual = LogStash::Outputs::AES::HttpClient.normalize_bulk_response(bulk_response)
+      insist { actual } == {"errors"=> true, "statuses"=> [400]}
+    end
+  end
+
+  describe "sniffing" do
+    let(:base_options) { {:hosts => ["127.0.0.1"] } }
+    let(:client) { LogStash::Outputs::AES::HttpClient.new(base_options.merge(client_opts)) }
+    let(:transport) { client.client.transport }
+
+    before do
+      allow(transport).to receive(:reload_connections!)
+    end
+  end
+end
diff --git a/spec/unit/outputs/elasticsearch_proxy_spec.rb b/spec/unit/outputs/elasticsearch_proxy_spec.rb
new file mode 100644
index 0000000..ab9c87b
--- /dev/null
+++ b/spec/unit/outputs/elasticsearch_proxy_spec.rb
@@ -0,0 +1,58 @@
+require_relative "../../../spec/amazon_es_spec_helper"
+require 'stud/temporary'
+require 'elasticsearch'
+require "logstash/outputs/amazon_es"
+
+describe "Proxy option" do
+  let(:settings) {
+    {
+      "hosts" => "node01",
+      "proxy" => proxy
+    }
+  }
+  subject {
+    LogStash::Outputs::AmazonES.new(settings)
+  }
+
+  before do
+    allow(::Elasticsearch::Client).to receive(:new).with(any_args)
+  end
+
+  describe "valid configs" do
+    before do
+      subject.register
+    end
+
+    context "when specified as a string" do
+      let(:proxy) { "http://127.0.0.1:1234" }
+
+      it "should set the proxy to the exact value" do
+        expect(::Elasticsearch::Client).to have_received(:new) do |options|
+          expect(options[:transport_options][:proxy]).to eql(proxy)
+        end
+      end
+    end
+
+    context "when specified as a hash" do
+      let(:proxy) { {"hosts" => "127.0.0.1", "protocol" => "http"} }
+
+      it "should pass through the proxy values as symbols" do
+        expected = {:hosts => proxy["hosts"], :protocol => proxy["protocol"]}
+        expect(::Elasticsearch::Client).to have_received(:new) do |options|
+          expect(options[:transport_options][:proxy]).to eql(expected)
+        end
+      end
+    end
+  end
+
+  describe "invalid configs" do
+    let(:proxy) { ["bad", "stuff"] }
+
+    it "should have raised an exception" do
+      expect {
+        subject.register
+      }.to raise_error(LogStash::ConfigurationError)
+    end
+  end
+end