Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace direct access of hidden indices with system indices api #12279

Merged
merged 9 commits into from
Oct 6, 2020
31 changes: 9 additions & 22 deletions x-pack/lib/config_management/elasticsearch_source.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class ElasticsearchSource < LogStash::Config::Source::Base

class RemoteConfigError < LogStash::Error; end

PIPELINE_INDEX = ".logstash"
SYSTEM_INDICES_API_PATH = "_logstash/pipeline"
# exclude basic
VALID_LICENSES = %w(trial standard gold platinum enterprise)
FEATURE_INTERNAL = 'management'
Expand Down Expand Up @@ -69,27 +69,18 @@ def pipeline_configs
raise RemoteConfigError, "Cannot find find configuration for pipeline_id: #{pipeline_ids}, server returned status: `#{response["status"]}`, message: `#{response["error"]}`"
end

if response["docs"].nil?
logger.debug("Server returned an unknown or malformed document structure", :response => response)
raise RemoteConfigError, "Elasticsearch returned an unknown or malformed document structure"
end

# Cache pipelines to handle the case where a remote configuration error can render a pipeline unusable
# it is not reloadable
@cached_pipelines = response["docs"].collect do |document|
get_pipeline(document)
@cached_pipelines = pipeline_ids.collect do |pid|
get_pipeline(pid, response)
end.compact
end

def get_pipeline(response)
pipeline_id = response["_id"]

if response["found"] == false
def get_pipeline(pipeline_id, response)
if response.has_key?(pipeline_id) == false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Consider using unless instead of if X == false, eg unless response.has_key?(pipeline_id)

logger.debug("Could not find a remote configuration for a specific `pipeline_id`", :pipeline_id => pipeline_id)
return nil
end

config_string = response.fetch("_source", {})["pipeline"]
config_string = response.fetch(pipeline_id, {})["pipeline"]

raise RemoteConfigError, "Empty configuration for pipeline_id: #{pipeline_id}" if config_string.nil? || config_string.empty?

Expand All @@ -100,7 +91,7 @@ def get_pipeline(response)
settings.set("pipeline.id", pipeline_id)

# override global settings with pipeline settings from ES, if any
pipeline_settings = response["_source"]["pipeline_settings"]
pipeline_settings = response[pipeline_id]["pipeline_settings"]
unless pipeline_settings.nil?
pipeline_settings.each do |setting, value|
if SUPPORTED_PIPELINE_SETTINGS.include? setting
Expand Down Expand Up @@ -128,12 +119,8 @@ def build_client
end

def fetch_config(pipeline_ids)
request_body_string = LogStash::Json.dump({ "docs" => pipeline_ids.collect { |pipeline_id| { "_id" => pipeline_id } } })
client.post(config_path, {}, request_body_string)
end

def config_path
"#{PIPELINE_INDEX}/_mget"
path_ids = pipeline_ids.join(",")
client.get("#{SYSTEM_INDICES_API_PATH}/#{path_ids}")
end

def populate_license_state(xpack_info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"hello" => nil
}

cleanup_elasticsearch(".logstash*")
cleanup_system_indices(@pipelines.keys)
cleanup_elasticsearch(".monitoring-logstash*")

@pipelines.each do |pipeline_id, config|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def logstash_options(pipeline_id, wait_condition)
def start_services(elasticsearch_options, logstash_options)
@elasticsearch_service = elasticsearch(elasticsearch_options)

cleanup_elasticsearch(".logstash*")
cleanup_system_indices([PIPELINE_ID])

config = "input { generator { count => 100 } tcp { port => 6000 } } output { null {} }"
push_elasticsearch_config(PIPELINE_ID, config)
Expand Down
16 changes: 15 additions & 1 deletion x-pack/qa/integration/support/helpers.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
require "fileutils"
require "stud/try"
require "open3"
require "time"

VERSIONS_YML_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "..", "versions.yml")
VERSION_PATH = File.join(File.dirname(__FILE__), "..", "..", "..", "VERSION")
Expand Down Expand Up @@ -95,14 +96,27 @@ def elasticsearch_client(options = { :url => "http://elastic:#{elastic_password}
end

def push_elasticsearch_config(pipeline_id, config)
elasticsearch_client.index :index => '.logstash', :type => "_doc", id: pipeline_id, :body => { :pipeline => config }
elasticsearch_client.perform_request(:put, "_logstash/pipeline/#{pipeline_id}", {},
{ :pipeline => config, :username => "log.stash", :pipeline_metadata => {:version => "1" },
:pipeline_settings => {"pipeline.batch.delay": "50"}, :last_modified => Time.now.utc.iso8601})
end

def cleanup_elasticsearch(index = MONITORING_INDEXES)
elasticsearch_client.indices.delete :index => index
elasticsearch_client.indices.refresh
end

def cleanup_system_indices(pipeline_ids)
pipeline_ids.each do |id|
begin
elasticsearch_client.perform_request(:delete, "_logstash/pipeline/#{id}")
rescue Elasticsearch::Transport::Transport::Errors::NotFound => e
puts ".logstash can be empty #{e.message}"
end
end
elasticsearch_client.indices.refresh
end

def logstash_command_append(cmd, argument, value)
if cmd !~ /#{Regexp.escape(argument)}/
cmd << " #{argument} #{value}"
Expand Down
90 changes: 39 additions & 51 deletions x-pack/spec/config_management/elasticsearch_source_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,22 +162,18 @@
end
end

describe "#config_path" do
before do
# we are testing the arguments here, not the behavior of the elasticsearch output
allow_any_instance_of(described_class).to receive(:build_client).and_return(nil)
end
describe "#fetch_config" do
let(:mock_client) { double("http_client") }

let(:pipeline_id) { "foobar" }
let(:settings) do
{
"xpack.management.pipeline.id" => pipeline_id,
"xpack.management.elasticsearch.password" => "testpassword"
}
before do
expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client)
allow(mock_license_client).to receive(:get).with('_xpack').and_return(valid_xpack_response)
allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:client).and_return(mock_license_client)
end

it "generates the path to get the configuration" do
expect(subject.config_path).to eq("#{described_class::PIPELINE_INDEX}/_mget")
expect(mock_client).to receive(:get).with("#{described_class::SYSTEM_INDICES_API_PATH}/apache,nginx").and_return(LogStash::Json.load("{}"))
subject.fetch_config(["apache", "nginx"])
end
end

Expand Down Expand Up @@ -216,11 +212,9 @@
let(:pipeline_id) { "apache" }
let(:mock_client) { double("http_client") }
let(:settings) { super.merge({ "xpack.management.pipeline.id" => pipeline_id }) }
let(:es_path) { "#{described_class::PIPELINE_INDEX}/_mget" }
let(:request_body_string) { LogStash::Json.dump({ "docs" => [{ "_id" => pipeline_id }] }) }

before do
allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_response))
allow(mock_client).to receive(:get).with(instance_of(String)).and_return(LogStash::Json.load(elasticsearch_response))
allow(mock_license_client).to receive(:get).with('_xpack').and_return(valid_xpack_response)
allow_any_instance_of(LogStash::LicenseChecker::LicenseReader).to receive(:client).and_return(mock_license_client)

Expand All @@ -241,29 +235,22 @@
let(:invalid_pipeline_setting) {"nonsensical.invalid.setting"}
let(:elasticsearch_response) {
%{
{ "docs": [{
"_index":".logstash",
"_type":"pipelines",
"_id":"#{pipeline_id}",
"_version":8,
"found":true,
"_source": {
"id":"apache",
"description":"Process apache logs",
"modified_timestamp":"2017-02-28T23:02:17.023Z",
"pipeline_metadata":{
"version":5,
"type":"logstash_pipeline",
"username":"elastic"
},
"pipeline":"#{config}",
"pipeline_settings": {
"#{whitelisted_pipeline_setting_name}":#{whitelisted_pipeline_setting_value},
"#{non_whitelisted_pipeline_setting_name}":#{non_whitelisted_pipeline_setting_value},
"#{invalid_pipeline_setting}":-9999
}
{
"#{pipeline_id}" : {
"username": "log.stash",
"modified_timestamp":"2017-02-28T23:02:17.023Z",
"pipeline_metadata":{
"version":5,
"type":"logstash_pipeline",
"username":"elastic"
},
"pipeline":"#{config}",
"pipeline_settings": {
"#{whitelisted_pipeline_setting_name}":#{whitelisted_pipeline_setting_value},
"#{non_whitelisted_pipeline_setting_name}":#{non_whitelisted_pipeline_setting_value},
"#{invalid_pipeline_setting}":-9999
}
}]
}
}
}
}
Expand All @@ -287,7 +274,7 @@

context 'when the license has expired' do
let(:config) { "input { generator {} } filter { mutate {} } output { }" }
let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" }
let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" }
let(:license_status) { 'expired'}
let(:license_expiry_date) { Time.now - (60 * 60 * 24)}

Expand All @@ -305,7 +292,7 @@

context 'when the license server is not available' do
let(:config) { "input { generator {} } filter { mutate {} } output { }" }
let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" }
let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" }

before :each do
allow(mock_license_client).to receive(:get).with('_xpack').and_raise("An error is here")
Expand All @@ -319,7 +306,7 @@

context 'when the xpack is not installed' do
let(:config) { "input { generator {} } filter { mutate {} } output { }" }
let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" }
let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" }

before :each do
expect(mock_license_client).to receive(:get).with('_xpack').and_return(no_xpack_response)
Expand All @@ -333,7 +320,7 @@

describe 'security enabled/disabled in Elasticsearch' do
let(:config) { "input { generator {} } filter { mutate {} } output { }" }
let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" }
let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" }

let(:xpack_response) do
{
Expand Down Expand Up @@ -377,7 +364,7 @@

context "With an invalid basic license, it should raise an error" do
let(:config) { "input { generator {} } filter { mutate {} } output { }" }
let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" }
let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" }
let(:license_type) { 'basic' }

it 'should raise an error' do
Expand All @@ -394,7 +381,7 @@
end

let(:config) { "input { generator {} } filter { mutate {} } output { }" }
let(:elasticsearch_response) { "{ \"docs\":[{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}]}" }
let(:elasticsearch_response) { "{\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}}" }
let(:license_type) { license_type }

it "returns a valid pipeline config" do
Expand All @@ -410,7 +397,7 @@
context "with multiples `pipeline_id` configured" do

before do
allow(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_return(LogStash::Json.load(elasticsearch_response))
allow(mock_client).to receive(:get).with(instance_of(String)).and_return(LogStash::Json.load(elasticsearch_response))
expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client)
end

Expand All @@ -421,15 +408,16 @@
"firewall" => config_firewall
}
end
let(:pipeline_id) { pipelines.keys }

let(:config_apache) { "input { generator { id => '123'} } filter { mutate {} } output { }" }
let(:config_firewall) { "input { generator { id => '321' } } filter { mutate {} } output { }" }
let(:elasticsearch_response) do
content = "{ \"docs\":["
content = "{"
content << pipelines.collect do |pipeline_id, config|
"{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"#{pipeline_id}\",\"_version\":8,\"found\":true,\"_source\":{\"id\":\"apache\",\"description\":\"Process apache logs\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\",\"username\":\"elastic\"},\"pipeline\":\"#{config}\"}}"
"\"#{pipeline_id}\":{\"username\":\"log.stash\",\"modified_timestamp\":\"2017-02-28T23:02:17.023Z\",\"pipeline_metadata\":{\"version\":5,\"type\":\"logstash_pipeline\"},\"pipeline\":\"#{config}\",\"pipeline_settings\":{\"pipeline.batch.delay\":\"50\"}}"
end.join(",")
content << "]}"
content << "}"
content
end

Expand All @@ -443,7 +431,7 @@
end

context "when the configuration is not found" do
let(:elasticsearch_response) { "{ \"docs\": [{\"_index\":\".logstash\",\"_type\":\"pipelines\",\"_id\":\"donotexist\",\"found\":false}]}" }
let(:elasticsearch_response) { "{}" }

before do
expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client)
Expand All @@ -455,14 +443,14 @@
end

context "when any error returned from elasticsearch" do
let(:elasticsearch_response){'{ "error":{"root_cause":[{"type":"illegal_argument_exception","reason":"No endpoint or operation is available at [testing_ph]"}],"type":"illegal_argument_exception","reason":"No endpoint or operation is available at [testing_ph]"},"status":400}' }
let(:elasticsearch_response){"{\"error\" : \"no handler found for uri [/_logstash/pipelines?pretty] and method [GET]\"}" }

before do
expect_any_instance_of(described_class).to receive(:build_client).and_return(mock_client)
end

it "raises a `RemoteConfigError`" do
expect { subject.pipeline_configs }.to raise_error /illegal_argument_exception/
expect { subject.pipeline_configs }.to raise_error /no handler found/
end
end

Expand All @@ -474,7 +462,7 @@
end

it "raises the exception upstream" do
expect(mock_client).to receive(:post).with(es_path, {}, request_body_string).and_raise("Something bad")
expect(mock_client).to receive(:get).with(instance_of(String)).and_raise("Something bad")
expect { subject.pipeline_configs }.to raise_error /Something bad/
end
end
Expand Down