Skip to content

Commit

Permalink
Merge pull request #128 from jcantrill/remove_json
Browse files Browse the repository at this point in the history
partial fix to #79. remove json parsing and preserving
  • Loading branch information
jcantrill committed May 14, 2018
2 parents 694994a + 9a41c60 commit a8d43a0
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 118 deletions.
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ This must use named capture groups for `container_name`, `pod_name` & `namespac
* `cache_size` - size of the cache of Kubernetes metadata to reduce requests to the API server (default: `1000`)
* `cache_ttl` - TTL in seconds of each cached element. Set to negative value to disable TTL eviction (default: `3600` - 1 hour)
* `watch` - set up a watch on pods on the API server for updates to metadata (default: `true`)
* `merge_json_log` - merge logs in JSON format as top level keys (default: `true`)
* `preserve_json_log` - preserve JSON logs in raw form in the `log` key, only used if the previous option is true (default: `true`)
* `de_dot` - replace dots in labels and annotations with configured `de_dot_separator`, required for ElasticSearch 2.x compatibility (default: `true`)
* `de_dot_separator` - separator to use if `de_dot` is enabled (default: `_`)
* `use_journal` - If false (default), messages are expected to be formatted and tagged as if read by the fluentd in\_tail plugin with wildcard filename. If true, messages are expected to be formatted as if read from the systemd journal. The `MESSAGE` field has the full message. The `CONTAINER_NAME` field has the encoded k8s metadata (see below). The `CONTAINER_ID_FULL` field has the full container uuid. This requires docker to use the `--log-driver=journald` log driver.
Expand All @@ -55,6 +53,12 @@ when true (default: `true`)
* `orphaned_namespace_name` - The namespace to associate with records where the namespace can not be determined (default: `.orphaned`)
* `orphaned_namespace_id` - The namespace id to associate with records where the namespace can not be determined (default: `orphaned`)

**NOTE:** As of release 1.1.x of this plugin, it no longer supports parsing the source message into JSON and attaching it to the
payload. The following configuration options have been removed:

* `merge_json_log`
* `preserve_json_log`

Reading from the JSON formatted log files with `in_tail` and wildcard filenames:
```
<source>
Expand Down
38 changes: 5 additions & 33 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ class KubernetesMetadataFilter < Fluent::Plugin::Filter
:string,
:default => 'var\.log\.containers\.(?<pod_name>[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$'
config_param :bearer_token_file, :string, default: nil
config_param :merge_json_log, :bool, default: true
config_param :preserve_json_log, :bool, default: true
config_param :secret_dir, :string, default: '/var/run/secrets/kubernetes.io/serviceaccount'
config_param :de_dot, :bool, default: true
config_param :de_dot_separator, :string, default: '_'
Expand Down Expand Up @@ -247,11 +245,9 @@ def log.trace?
end
if @use_journal
log.debug "Will stream from the journal"
@merge_json_log_key = 'MESSAGE'
self.class.class_eval { alias_method :filter_stream, :filter_stream_from_journal }
else
log.debug "Will stream from the files"
@merge_json_log_key = 'log'
self.class.class_eval { alias_method :filter_stream, :filter_stream_from_files }
end

Expand Down Expand Up @@ -309,22 +305,18 @@ def filter_stream_from_files(tag, es)
}
end

es.each { |time, record|
record = merge_json_log(record) if @merge_json_log

es.each do |time, record|
record = record.merge(Marshal.load(Marshal.dump(metadata))) if metadata

new_es.add(time, record)
}
end
dump_stats
new_es
end

def filter_stream_from_journal(tag, es)
new_es = Fluent::MultiEventStream.new
batch_miss_cache = {}
es.each { |time, record|
record = merge_json_log(record) if @merge_json_log
es.each do |time, record|
metadata = nil
if record.has_key?('CONTAINER_NAME') && record.has_key?('CONTAINER_ID_FULL')
metadata = record['CONTAINER_NAME'].match(@container_name_to_kubernetes_regexp_compiled) do |match_data|
Expand All @@ -347,35 +339,15 @@ def filter_stream_from_journal(tag, es)
@stats.bump(:container_name_id_missing)
end

if metadata
record = record.merge(metadata)
end
record = record.merge(metadata) if metadata

new_es.add(time, record)
}
end

dump_stats
new_es
end

def merge_json_log(record)
if record.has_key?(@merge_json_log_key)
value = record[@merge_json_log_key].strip
if value[0].eql?('{') && value[-1].eql?('}')
begin
record = JSON.parse(value).merge(record)
unless @preserve_json_log
record.delete(@merge_json_log_key)
end
rescue JSON::ParserError=>e
@stats.bump(:merge_json_parse_errors)
log.debug(e)
end
end
end
record
end

def de_dot!(h)
h.keys.each do |ref|
if h[ref] && ref =~ /\./
Expand Down
83 changes: 0 additions & 83 deletions test/plugin/test_filter_kubernetes_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -381,45 +381,6 @@ def emit_with_tag(tag, msg={}, config='
assert_false(filtered[0].has_key?(:kubernetes))
end

test 'merges json log data' do
json_log = {
'hello' => 'world'
}
msg = {
'log' => "#{json_log.to_json}"
}
filtered = emit_with_tag('non-kubernetes', msg, '')
assert_equal(msg.merge(json_log), filtered[0])
end

test 'merges json log data in MESSAGE' do
json_log = {
'hello' => 'world'
}
msg = {
'MESSAGE' => "#{json_log.to_json}"
}
filtered = emit_with_tag('non-kubernetes', msg, 'use_journal true')
assert_equal(msg.merge(json_log), filtered[0])
end

test 'merges json log data with message field' do
json_log = {
'timeMillis' => 1459853347608,
'thread' => 'main',
'level' => 'INFO',
'loggerName' => 'org.apache.camel.spring.SpringCamelContext',
'message' => 'Total 1 routes, of which 1 is started.',
'endOfBatch' => false,
'loggerFqcn' => 'org.apache.logging.slf4j.Log4jLogger'
}
msg = {
'log' => "#{json_log.to_json}"
}
filtered = emit_with_tag('non-kubernetes', msg, '')
assert_equal(msg.merge(json_log), filtered[0])
end

test 'ignores invalid json in log field' do
json_log = "{'foo':123}"
msg = {
Expand All @@ -429,50 +390,6 @@ def emit_with_tag(tag, msg={}, config='
assert_equal(msg, filtered[0])
end

test 'merges json log data with message field in MESSAGE' do
json_log = {
'timeMillis' => 1459853347608,
'thread' => 'main',
'level' => 'INFO',
'loggerName' => 'org.apache.camel.spring.SpringCamelContext',
'message' => 'Total 1 routes, of which 1 is started.',
'endOfBatch' => false,
'loggerFqcn' => 'org.apache.logging.slf4j.Log4jLogger'
}
msg = {
'MESSAGE' => "#{json_log.to_json}"
}
filtered = emit_with_tag('non-kubernetes', msg, 'use_journal true')
assert_equal(msg.merge(json_log), filtered[0])
end

test 'emit individual fields from json, throw out whole original string' do
json_log = {
'hello' => 'world',
'more' => 'data'
}
msg = {
'log' => "#{json_log.to_json}"
}
filtered = emit_with_tag('non-kubernetes', msg, 'preserve_json_log false')
assert_equal(json_log, filtered[0])
end

test 'emit individual fields from json, throw out whole original string in MESSAGE' do
json_log = {
'hello' => 'world',
'more' => 'data'
}
msg = {
'MESSAGE' => "#{json_log.to_json}"
}
filtered = emit_with_tag('non-kubernetes', msg, '
preserve_json_log false
use_journal true
')
assert_equal(json_log, filtered[0])
end

test 'with kubernetes dotted labels, de_dot enabled' do
VCR.use_cassette('kubernetes_docker_metadata_dotted_labels') do
filtered = emit({}, '
Expand Down

0 comments on commit a8d43a0

Please sign in to comment.