replace direct access of hidden indices with system indices api #12279

Merged
merged 9 commits on Oct 6, 2020
120 changes: 90 additions & 30 deletions x-pack/lib/config_management/elasticsearch_source.rb
@@ -18,7 +18,6 @@ class ElasticsearchSource < LogStash::Config::Source::Base

class RemoteConfigError < LogStash::Error; end

PIPELINE_INDEX = ".logstash"
# exclude basic
VALID_LICENSES = %w(trial standard gold platinum enterprise)
FEATURE_INTERNAL = 'management'
@@ -50,6 +49,21 @@ def config_conflict?
false
end

# decide whether to use the system indices api (7.10+) or the legacy api (< 7.10) based on the elasticsearch server version
def get_pipeline_fetcher
response = client.get("/")

if response["error"]
raise RemoteConfigError, "Cannot find elasticsearch version, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
end

logger.debug("Reading configuration from Elasticsearch version {}", response["version"]["number"])
version_number = response["version"]["number"].split(".")
first = version_number[0].to_i
second = version_number[1].to_i
(first >= 8 || (first == 7 && second >= 10)) ? SystemIndicesFetcher.new : LegacyHiddenIndicesFetcher.new
end
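
As an illustration of the gate above, here is a minimal standalone sketch that mirrors the split-and-compare logic in get_pipeline_fetcher; the response hashes are assumed shapes shown only for the example and are not part of the PR.

# Illustration only: assumed root ("/") response shapes and the fetcher each
# one would select, mirroring the comparison in get_pipeline_fetcher.
[
  { "version" => { "number" => "7.9.3" } },   # -> LegacyHiddenIndicesFetcher
  { "version" => { "number" => "7.10.0" } },  # -> SystemIndicesFetcher
  { "version" => { "number" => "8.0.0" } }    # -> SystemIndicesFetcher
].each do |response|
  first, second = response["version"]["number"].split(".").map(&:to_i)
  fetcher = (first >= 8 || (first == 7 && second >= 10)) ? "SystemIndicesFetcher" : "LegacyHiddenIndicesFetcher"
  puts "#{response["version"]["number"]} => #{fetcher}"
end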

def pipeline_configs
logger.trace("Fetch remote config pipeline", :pipeline_ids => pipeline_ids)

@@ -63,33 +77,21 @@ def pipeline_configs
end
end

response = fetch_config(pipeline_ids)

if response["error"]
raise RemoteConfigError, "Cannot find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
end

if response["docs"].nil?
logger.debug("Server returned an unknown or malformed document structure", :response => response)
raise RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure"
end
fetcher = get_pipeline_fetcher
fetcher.fetch_config(pipeline_ids, client)

# Cache pipelines to handle the case where a remote configuration error can render a pipeline unusable;
# such a pipeline is not reloadable
@cached_pipelines = response["docs"].collect do |document|
get_pipeline(document)
@cached_pipelines = pipeline_ids.collect do |pid|
get_pipeline(pid, fetcher)
end.compact
end

def get_pipeline(response)
pipeline_id = response["_id"]

if response["found"] == false
def get_pipeline(pipeline_id, fetcher)
unless fetcher.config_exist?(pipeline_id)
logger.debug("Could not find a remote configuration for a specific `pipeline_id`", :pipeline_id => pipeline_id)
return nil
end

config_string = response.fetch("_source", {})["pipeline"]
config_string = fetcher.get_single_pipeline_setting(pipeline_id)["pipeline"]

raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty?

@@ -100,7 +102,7 @@ def get_pipeline(response)
settings.set("pipeline.id", pipeline_id)

# override global settings with pipeline settings from ES, if any
pipeline_settings = response["_source"]["pipeline_settings"]
pipeline_settings = fetcher.get_single_pipeline_setting(pipeline_id)["pipeline_settings"]
unless pipeline_settings.nil?
pipeline_settings.each do |setting, value|
if SUPPORTED_PIPELINE_SETTINGS.include? setting
@@ -127,15 +129,6 @@ def build_client
es.build_client
end

def fetch_config(pipeline_ids)
request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } })
client.post(config_path, {}, request_body_string)
end

def config_path
"#{PIPELINE_INDEX}/_mget"
end

def populate_license_state(xpack_info)
if xpack_info.failed?
{
@@ -193,5 +186,72 @@ def client
@client ||= build_client
end
end

module Fetcher
def config_exist?(pipeline_id)
@response.has_key?(pipeline_id)
end

def fetch_config(pipeline_ids, client) end
def get_single_pipeline_setting(pipeline_id) end
end

class SystemIndicesFetcher

Member:

I wonder if this might be simpler if we kept the response object in this class and had methods like config_exists?(pipeline_id), get_pipeline_config(pipeline_id), and get_pipeline_settings(pipeline_id). This would avoid having to pass around the response and fetcher objects.

Contributor Author:

The question to me is whether we want to further refactor the existing code, or whether the goal is to apply the new API in a manageable way. This involved about thirty lines of code, mainly moving get_pipeline and the fetcher into an OO shape, which is not a big change. At the same time, the readability of the current version is quite similar to the existing one. I am open to the suggestion; do you think we should refactor the code?

Member:

I'm not sure there is a huge amount of refactoring to the existing code either way, beyond what is already present; you already have the method fetcher.get_single_pipeline_setting(response, pipeline_id)["pipeline"], which could change to something like fetched_config.get_pipeline_settings(pipeline_id), although I do realize that there is more work to be done in the extra classes that you have added.

I'm comfortable either way; what you have appears to be functionally correct after running this code locally against Elasticsearch 7.9 and 8.0.
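
To make the suggestion in this thread concrete, here is a rough sketch of the interface the reviewer describes; the class name and method names are hypothetical illustrations, not code from this PR.

# Illustration of the suggested alternative: keep the fetched response inside
# one object and expose per-pipeline accessors instead of passing the response
# and fetcher around. Names (FetchedConfig, get_pipeline_config,
# get_pipeline_settings) are hypothetical.
class FetchedConfig
  def initialize(response)
    @response = response   # hash keyed by pipeline id
  end

  def config_exists?(pipeline_id)
    @response.key?(pipeline_id)
  end

  def get_pipeline_config(pipeline_id)
    @response.fetch(pipeline_id, {})["pipeline"]
  end

  def get_pipeline_settings(pipeline_id)
    @response.fetch(pipeline_id, {})["pipeline_settings"]
  end
end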

include LogStash::Util::Loggable, Fetcher

SYSTEM_INDICES_API_PATH = "_logstash/pipeline"

def fetch_config(pipeline_ids, client)
path_ids = pipeline_ids.join(",")
response = client.get("#{SYSTEM_INDICES_API_PATH}/#{path_ids}")

if response["error"]
raise ElasticsearchSource::RemoteConfigError, "Cannot find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
end

@response = response
end

def get_single_pipeline_setting(pipeline_id)
@response.fetch(pipeline_id, {})
end
end
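
For context, a sketch of the response shape SystemIndicesFetcher works with: GET _logstash/pipeline/<ids> returns a hash keyed by pipeline id, so get_single_pipeline_setting reduces to a plain lookup. The field values below are illustrative assumptions, not data from this PR.

# Hypothetical response from GET _logstash/pipeline/apache,nginx (values are
# made up); the fetcher stores this hash and looks pipelines up by id.
response = {
  "apache" => {
    "pipeline"          => "input { stdin {} } output { stdout {} }",
    "pipeline_settings" => { "pipeline.workers" => 2 },
    "username"          => "elastic",
    "last_modified"     => "2020-10-06T00:00:00Z"
  },
  "nginx" => {
    "pipeline" => "input { tcp { port => 6000 } } output { null {} }"
  }
}

response.fetch("apache", {})["pipeline"]           # => the config string
response.fetch("apache", {})["pipeline_settings"]  # => { "pipeline.workers" => 2 }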

# TODO: clean up LegacyHiddenIndicesFetcher, see https://github.com/elastic/logstash/issues/12291
class LegacyHiddenIndicesFetcher
include LogStash::Util::Loggable, Fetcher

PIPELINE_INDEX = ".logstash"

def fetch_config(pipeline_ids, client)
request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } })
response = client.post("#{PIPELINE_INDEX}/_mget", {}, request_body_string)

if response["error"]
raise ElasticsearchSource::RemoteConfigError, "Cannot find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
end

if response["docs"].nil?
logger.debug("Server returned an unknown or malformed document structure", :response => response)
raise ElasticsearchSource::RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure"
end

@response = format_response(response)
end

def get_single_pipeline_setting(pipeline_id)
@response.fetch(pipeline_id, {}).fetch("_source", {})
end

private
# transform legacy response to be similar to system indices response
def format_response(response)
response["docs"].map { |pipeline|
{pipeline["_id"] => pipeline} if pipeline.fetch("found", false)
}.compact
.reduce({}, :merge)
end
end
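
To illustrate format_response, here is a small standalone sketch with a made-up _mget response: the docs array is reshaped into the same id-keyed layout the system indices fetcher produces, and documents that were not found are dropped.

# Illustrative _mget response (values made up) and its reshaped form.
legacy = {
  "docs" => [
    { "_id" => "apache", "found" => true,
      "_source" => { "pipeline" => "input { stdin {} } output { stdout {} }" } },
    { "_id" => "missing", "found" => false }
  ]
}

formatted = legacy["docs"].map { |pipeline|
  { pipeline["_id"] => pipeline } if pipeline.fetch("found", false)
}.compact.reduce({}, :merge)
# => { "apache" => { "_id" => "apache", "found" => true, "_source" => { ... } } }

formatted.fetch("apache", {}).fetch("_source", {})["pipeline"]
# => "input { stdin {} } output { stdout {} }"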

end
end
@@ -17,7 +17,7 @@
"hello" => nil
}

cleanup_elasticsearch(".logstash*")
cleanup_system_indices(@pipelines.keys)
cleanup_elasticsearch(".monitoring-logstash*")

@pipelines.each do |pipeline_id, config|
@@ -31,7 +31,7 @@ def logstash_options(pipeline_id, wait_condition)
def start_services(elasticsearch_options, logstash_options)
@elasticsearch_service = elasticsearch(elasticsearch_options)

cleanup_elasticsearch(".logstash*")
cleanup_system_indices([PIPELINE_ID])

config = "input { generator { count => 100 } tcp { port => 6000 } } output { null {} }"
push_elasticsearch_config(PIPELINE_ID, config)
27 changes: 25 additions & 2 deletions x-pack/qa/integration/support/helpers.rb
@@ -8,6 +8,7 @@
require "fileutils"
require "stud/try"
require "open3"
require "time"

VERSIONS_YML_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "..", "versions.yml")
VERSION_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "VERSION")
@@ -94,15 +95,37 @@ def elasticsearch_client(options = { :url => "http://elastic:#{elastic_password}
Elasticsearch::Client.new(options)
end

def es_version
response = elasticsearch_client.perform_request(:get, "")
response.body["version"]["number"].gsub(/(\d+\.\d+)\..+/, '\1').to_f
end

def push_elasticsearch_config(pipeline_id, config)
elasticsearch_client.index :index => '.logstash', :type => "_doc", id: pipeline_id, :body => { :pipeline => config }
if es_version >= 7.10
elasticsearch_client.perform_request(:put, "_logstash/pipeline/#{pipeline_id}", {},
{ :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => "1" },
:pipeline_settings => {"pipeline.batch.delay": "50"}, :last_modified => Time.now.utc.iso8601})
else
elasticsearch_client.index :index => '.logstash', :type => "_doc", id: pipeline_id, :body => { :pipeline => config }
end
end
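
One caveat worth flagging about the version check above: in Ruby the numeric literal 7.10 is the same float as 7.1, so es_version >= 7.10 effectively compares against 7.1 and would also hold for a 7.9.x cluster. A string-based version comparison avoids that ambiguity; the helper below is a hypothetical alternative sketch, not part of this change.

require "rubygems"  # provides Gem::Version; usually loaded by default

# Hypothetical alternative to the float comparison: compare full version
# strings as Gem::Version objects so that "7.9.3" < "7.10.0" holds as expected.
def uses_system_indices_api?(version_number)
  Gem::Version.new(version_number) >= Gem::Version.new("7.10")
end

uses_system_indices_api?("7.9.3")   # => false
uses_system_indices_api?("7.10.0")  # => true
uses_system_indices_api?("8.0.0")   # => true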

def cleanup_elasticsearch(index = MONITORING_INDEXES)
elasticsearch_client.indices.delete :index => index
elasticsearch_client.indices.refresh
end

def cleanup_system_indices(pipeline_ids)
pipeline_ids.each do |id|
begin
elasticsearch_client.perform_request(:delete, "_logstash/pipeline/#{id}")
rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
puts ".logstash can be empty #{e.message}"
end
end
elasticsearch_client.indices.refresh
end

def logstash_command_append(cmd, argument, value)
if cmd !~ /#{Regexp.escape(argument)}/
cmd << " #{argument} #{value}"
@@ -136,4 +159,4 @@ def verify_response!(cmd, response)
unless response.successful?
raise "Something went wrong when installing xpack,\ncmd: #{cmd}\nresponse: #{response}"
end
end
end