diff --git a/clients/cmd/logstash/lib/logstash/outputs/loki.rb b/clients/cmd/logstash/lib/logstash/outputs/loki.rb index 01f9a0aa2e79..1be696dfcedb 100644 --- a/clients/cmd/logstash/lib/logstash/outputs/loki.rb +++ b/clients/cmd/logstash/lib/logstash/outputs/loki.rb @@ -50,6 +50,9 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base ## 'An array of fields to map to labels, if defined only fields in this list will be mapped.' config :include_fields, :validate => :array, :default => [], :required => false + ## 'An array of fields to map to structured metadata, if defined only fields in this list will be mapped.' + config :metadata_fields, :validate => :array, :default => [], :required => false + ## 'Backoff configuration. Maximum backoff time between retries. Default 300s' config :max_delay, :validate => :number, :default => 300, :required => false @@ -71,7 +74,7 @@ def register @logger.info("Loki output plugin", :class => self.class.name) # initialize Queue and Mutex - @entries = Queue.new + @entries = Queue.new @mutex = Mutex.new @stop = false @@ -94,7 +97,7 @@ def max_batch_size @mutex.synchronize do return if @stop end - + e = @entries.deq return if e.nil? 
@@ -201,13 +204,13 @@ def is_batch_expired ## Receives logstash events public def receive(event) - @entries << Entry.new(event, @message_field, @include_fields) + @entries << Entry.new(event, @message_field, @include_fields, @metadata_fields) end def close @entries.close - @mutex.synchronize do - @stop = true + @mutex.synchronize do + @stop = true end @batch_wait_thread.join @batch_size_thread.join diff --git a/clients/cmd/logstash/lib/logstash/outputs/loki/batch.rb b/clients/cmd/logstash/lib/logstash/outputs/loki/batch.rb index da9c64e9b07f..23ec13aa804c 100644 --- a/clients/cmd/logstash/lib/logstash/outputs/loki/batch.rb +++ b/clients/cmd/logstash/lib/logstash/outputs/loki/batch.rb @@ -52,7 +52,19 @@ def to_json def build_stream(stream) values = [] stream['entries'].each { |entry| - values.append([entry['ts'].to_s, entry['line']]) + if entry.key?('metadata') + sorted_metadata = entry['metadata'].sort.to_h + values.append([ + entry['ts'].to_s, + entry['line'], + sorted_metadata + ]) + else + values.append([ + entry['ts'].to_s, + entry['line'] + ]) + end } return { 'stream'=>stream['labels'], diff --git a/clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb b/clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb index 26f04fa36ba5..be4c2c0c3d4f 100644 --- a/clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb +++ b/clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb @@ -5,7 +5,7 @@ def to_ns(s) class Entry include Loki attr_reader :labels, :entry - def initialize(event,message_field,include_fields) + def initialize(event,message_field,include_fields,metadata_fields) @entry = { "ts" => to_ns(event.get("@timestamp")), "line" => event.get(message_field).to_s @@ -21,6 +21,22 @@ def initialize(event,message_field,include_fields) next if include_fields.length() > 0 and not include_fields.include?(key) @labels[key] = value.to_s } + + # Unlike include_fields we should skip if no metadata_fields provided + if metadata_fields.length() > 0 + @metadata = {} + 
event.to_hash.each { |key,value| + next if key.start_with?('@') + next if value.is_a?(Hash) + next if metadata_fields.length() > 0 and not metadata_fields.include?(key) + @metadata[key] = value.to_s + } + + # Add @metadata to @entry if there was a match + if @metadata.size > 0 + @entry.merge!('metadata' => @metadata) + end + end end end end diff --git a/clients/cmd/logstash/logstash-output-loki.gemspec b/clients/cmd/logstash/logstash-output-loki.gemspec index cbd66eb06212..0a12d218d20c 100644 --- a/clients/cmd/logstash/logstash-output-loki.gemspec +++ b/clients/cmd/logstash/logstash-output-loki.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-loki' - s.version = '1.1.0' + s.version = '1.2.0' s.authors = ['Aditya C S','Cyril Tovena'] s.email = ['aditya.gnu@gmail.com','cyril.tovena@grafana.com'] diff --git a/clients/cmd/logstash/loki-metadata.conf b/clients/cmd/logstash/loki-metadata.conf new file mode 100644 index 000000000000..891f17d83dbf --- /dev/null +++ b/clients/cmd/logstash/loki-metadata.conf @@ -0,0 +1,15 @@ +input { + generator { + message => "Hello world!" + count => 10 + add_field => {cluster=> "foo" namespace=>"bar" trace_id=> "trace_001"} + } +} + +output { + loki { + url => "http://localhost:3100" + include_fields => ["cluster"] + metadata_fields => ["trace_id"] + } +} diff --git a/clients/cmd/logstash/loki.conf b/clients/cmd/logstash/loki.conf index a0ab6e062a0c..a505981ee53a 100644 --- a/clients/cmd/logstash/loki.conf +++ b/clients/cmd/logstash/loki.conf @@ -15,6 +15,9 @@ output { # If include_fields is set, only fields in this list will be sent to Loki as labels. #include_fields => ["service","host","app","env"] #default empty array, all labels included. + # If metadata_fields is set, fields in this list will be sent to Loki as structured metadata for the associated log. 
+ #metadata_fields => ["trace_id"] #default empty array, no structured metadata will be included + #batch_wait => 1 ## in seconds #default 1 second #batch_size => 102400 #bytes #default 102400 bytes diff --git a/clients/cmd/logstash/spec/outputs/loki/entry_spec.rb b/clients/cmd/logstash/spec/outputs/loki/entry_spec.rb index 615d873bccff..a0ccae53b5b0 100644 --- a/clients/cmd/logstash/spec/outputs/loki/entry_spec.rb +++ b/clients/cmd/logstash/spec/outputs/loki/entry_spec.rb @@ -21,31 +21,40 @@ {'@path' => '/path/to/file.log'}, }, 'host' => '172.0.0.1', + 'trace_id' => 'trace_001', '@timestamp' => Time.now } ) } it 'labels extracted should not contains object and metadata or timestamp' do - entry = Entry.new(event,"message", []) - expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5'}) + entry = Entry.new(event,"message", [], []) + expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5', 'trace_id'=>'trace_001'}) expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp")) expect(entry.entry['line']).to eql 'hello' end it 'labels extracted should only contain allowlisted labels' do - entry = Entry.new(event, "message", %w[agent foo]) + entry = Entry.new(event, "message", %w[agent foo], []) expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'}) expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp")) expect(entry.entry['line']).to eql 'hello' end + + it 'labels and structured metadata extracted should only contain allow listed labels and metadata' do + entry = Entry.new(event, "message", %w[agent foo], %w[trace_id]) + expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'}) + expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp")) + expect(entry.entry['line']).to eql 'hello' + expect(entry.entry['metadata']).to eql({'trace_id' => 'trace_001'}) + end end context 'test batch generation with label order' do let (:entries) {[
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", []), - Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", []), - Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", []), + Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], []), + Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", [], []), + Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", [], []), ]} let (:expected) { diff --git a/clients/cmd/logstash/spec/outputs/loki_spec.rb b/clients/cmd/logstash/spec/outputs/loki_spec.rb index 8183798f23c1..76972fd52342 100644 --- a/clients/cmd/logstash/spec/outputs/loki_spec.rb +++ b/clients/cmd/logstash/spec/outputs/loki_spec.rb @@ -28,15 +28,15 @@ context 'when adding en entry to the batch' do let (:simple_loki_config) {{'url' => 'http://localhost:3100'}} - let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])} + let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])} let (:lbs) {{"buzz"=>"bar","cluster"=>"us-central1"}.sort.to_h} let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }} - let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"])} + let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"], [])} let (:include_lbs) 
{{"cluster"=>"us-central1"}.sort.to_h} it 'should not add empty line' do plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config) - emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", []) + emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", [], []) expect(plugin.add_entry_to_batch(emptyEntry)).to eql true expect(plugin.batch).to eql nil end @@ -83,8 +83,51 @@ end end + context 'when building json from batch to send' do + let (:basic_loki_config) {{'url' => 'http://localhost:3100'}} + let (:basic_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", [], [])} + let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }} + let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], [])} + let (:metadata_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"], 'metadata_fields' => ["trace_id"] }} + let (:metadata_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], ["trace_id"])} + let (:metadata_multi_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"], 'metadata_fields' => ["trace_id", "user_id"] }} + let (:metadata_multi_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","user_id"=>"user_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], ["trace_id", "user_id"])} + + it 'should not include labels or metadata' do + plugin = LogStash::Plugin.lookup("output", 
"loki").new(basic_loki_config) + expect(plugin.batch).to eql nil + expect(plugin.add_entry_to_batch(basic_entry)).to eql true + expect(plugin.batch).not_to be_nil + expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"buzz":"bar","cluster":"us-central1","trace_id":"trace_001"},"values":[["1000000000","foobuzz"]]}]}' + end + + it 'should include metadata with no labels' do + plugin = LogStash::Plugin.lookup("output", "loki").new(metadata_loki_config) + expect(plugin.batch).to eql nil + expect(plugin.add_entry_to_batch(metadata_entry)).to eql true + expect(plugin.batch).not_to be_nil + expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz",{"trace_id":"trace_001"}]]}]}' + end + + it 'should include labels with no metadata' do + plugin = LogStash::Plugin.lookup("output", "loki").new(include_loki_config) + expect(plugin.batch).to eql nil + expect(plugin.add_entry_to_batch(include_entry)).to eql true + expect(plugin.batch).not_to be_nil + expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz"]]}]}' + end + + it 'should include labels with multiple metadata' do + plugin = LogStash::Plugin.lookup("output", "loki").new(metadata_multi_loki_config) + expect(plugin.batch).to eql nil + expect(plugin.add_entry_to_batch(metadata_multi_entry)).to eql true + expect(plugin.batch).not_to be_nil + expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz",{"trace_id":"trace_001","user_id":"user_001"}]]}]}' + end + end + context 'batch expiration' do - let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])} + let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])} it 'should not expire if empty' do loki = 
LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5})) @@ -147,13 +190,13 @@ loki.receive(event) sent.deq sleep(0.01) # Adding a minimal sleep. In few cases @batch=nil might happen after evaluating for nil - expect(loki.batch).to be_nil + expect(loki.batch).to be_nil loki.close end end context 'http requests' do - let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])} + let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])} it 'should send credentials' do conf = { diff --git a/docs/sources/get-started/labels/structured-metadata.md b/docs/sources/get-started/labels/structured-metadata.md index 50a5acd320e2..ee6c58c1cecd 100644 --- a/docs/sources/get-started/labels/structured-metadata.md +++ b/docs/sources/get-started/labels/structured-metadata.md @@ -32,6 +32,8 @@ For more information on how to push logs to Loki via the HTTP endpoint, refer to Alternatively, you can use the Grafana Agent or Promtail to extract and attach structured metadata to your log lines. See the [Promtail: Structured metadata stage]({{< relref "../../send-data/promtail/stages/structured_metadata" >}}) for more information. +With version 1.2.0 and greater of the Loki Logstash output plugin, you can also attach structured metadata to your log lines. For more information, see [logstash]({{< relref "../../send-data/logstash/_index.md" >}}). + ## Querying structured metadata Structured metadata is extracted automatically for each returned log line and added to the labels returned for the query. 
@@ -49,7 +51,7 @@ Of course, you can filter by multiple labels of structured metadata at the same {job="example"} | trace_id="0242ac120002" | user_id="superUser123" ``` -Note that since structured metadata is extracted automatically to the results labels, some metric queries might return +Note that since structured metadata is extracted automatically to the results labels, some metric queries might return an error like `maximum of series (50000) reached for a single query`. You can use the [Keep]({{< relref "../../query/log_queries#keep-labels-expression" >}}) and [Drop]({{< relref "../../query/log_queries#drop-labels-expression" >}}) stages to filter out labels that you don't need. For example: diff --git a/docs/sources/send-data/logstash/_index.md b/docs/sources/send-data/logstash/_index.md index d95cada15f44..7e3bf7dcf377 100644 --- a/docs/sources/send-data/logstash/_index.md +++ b/docs/sources/send-data/logstash/_index.md @@ -1,8 +1,8 @@ --- title: Logstash plugin -menuTitle: +menuTitle: description: Instructions to install, configure, and use the Logstash plugin to send logs to Loki. -aliases: +aliases: - ../send-data/a/logstash/ weight: 800 --- @@ -61,9 +61,11 @@ output { [tenant_id => string | default = nil | required=false] [message_field => string | default = "message" | required=false] - + [include_fields => array | default = [] | required=false] + [metadata_fields => array | default = [] | required=false] + [batch_wait => number | default = 1(s) | required=false] [batch_size => number | default = 102400(bytes) | required=false] @@ -112,8 +114,6 @@ Contains a `message` and `@timestamp` fields, which are respectively used to for All other fields (except nested fields) will form the label set (key value pairs) attached to the log line. [This means you're responsible for mutating and dropping high cardinality labels](/blog/2020/04/21/how-labels-in-loki-can-make-log-queries-faster-and-easier/) such as client IPs. 
You can usually do so by using a [`mutate`](https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html) filter. -**Note:** In version 1.1.0 and greater of this plugin you can also specify a list of labels to allowlist via the `include_fields` configuration. - For example the configuration below : ```conf @@ -194,6 +194,13 @@ filter { } ``` +### Version Notes + +Important notes regarding versions: + +- In version 1.1.0 and greater of this plugin, you can also specify a list of labels to allowlist via the `include_fields` configuration. +- In version 1.2.0 and greater of this plugin, you can also specify structured metadata via the `metadata_fields` configuration. + ### Configuration Properties #### url @@ -216,6 +223,10 @@ Message field to use for log lines. You can use logstash key accessor language t An array of fields which will be mapped to labels and sent to Loki, when this list is configured **only** these fields will be sent, all other fields will be ignored. +#### metadata_fields + +An array of fields which will be mapped to [structured metadata]({{< relref "../../get-started/labels/structured-metadata.md" >}}) and sent to Loki for each log line. + #### batch_wait Interval in seconds to wait before pushing a batch of records to Loki. This means even if the [batch size](#batch_size) is not reached after `batch_wait` a partial batch will be sent, this is to ensure freshness of the data. @@ -246,7 +257,7 @@ Loki is a multi-tenant log storage platform and all requests sent must include a Specify a pair of client certificate and private key with `cert` and `key` if a reverse proxy with client certificate verification is configured in front of Loki. `ca_cert` can also be specified if the server uses custom certificate authority. -### insecure_skip_verify +#### insecure_skip_verify A flag to disable server certificate verification. By default it is set to `false`. 
@@ -286,6 +297,7 @@ output { max_delay => 500 message_field => "message" include_fields => ["container_name","namespace","pod","host"] + metadata_fields => ["pod"] } # stdout { codec => rubydebug } }