/
nested_json_split.rb
109 lines (98 loc) · 3.56 KB
/
nested_json_split.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# encoding: utf-8
require "logstash/filters/base"
require "logstash/namespace"
# This plugin ingests a (potentially nested) JSON object and splits it into
# multiple events based on one of its attributes that is an Array.
#
# For example, you could turn this aggregate event:
#
# [source,ruby]
# ----------------------------------------------------------------------------
# {
# "requestId": "123",
# "body": {
# "events": [
# { "id": "456", "master_event": "First event." },
# { "id": "789", "master_event": "Second event." }
# ]
# }
# }
# ----------------------------------------------------------------------------
#
# Into these individual events:
#
# [source,ruby]
# ----------------------------------------------------------------------------
# {
# "requestId": "123",
# "event": { "id": "456", "master_event": "First event." }
# }
#
# {
# "requestId": "123",
# "event": { "id": "789", "master_event": "Second event." }
# }
# ----------------------------------------------------------------------------
#
# By configuring the plugin like so:
#
# [source]
# ----------------------------------------------------------------------------
# filter {
# nested_json_split {
# keys => ["body", "events"]
# target => "event"
# }
# }
# ----------------------------------------------------------------------------
#
# By default, the following configuration is applied:
#
# [source]
# ----------------------------------------------------------------------------
# filter {
# nested_json_split {
# keys => ["events"]
# target => "message"
# }
# }
# ----------------------------------------------------------------------------
class LogStash::Filters::NestedJsonSplit < LogStash::Filters::Base
  config_name "nested_json_split"

  # Path to the Array to split on. The first entry names a field of the
  # incoming event; every following entry names a key inside the Hash found at
  # the previous step.
  config :keys, validate: :array, default: ["events"]

  # Field of each emitted event that receives one element of the Array.
  config :target, validate: :string, default: "message"

  # Validates the plugin configuration.
  #
  # An empty "keys" list is rejected here because #filter needs at least one
  # key to look up; without this check the failure would only surface later,
  # once per event, as an obscure error inside #filter.
  #
  # @raise [LogStash::ConfigurationError] if "keys" is not a non-empty Array
  #   of Strings.
  def register
    return if @keys.is_a?(Array) && !@keys.empty? && @keys.all? { |k| k.is_a?(String) }

    raise LogStash::ConfigurationError, "\"keys\" must be a non-empty Array of Strings but is: #{@keys.inspect}."
  end

  # Splits +master_event+ into one event per non-empty element of the Array
  # located at the configured "keys" path.
  #
  # Each emitted event is a clone of +master_event+ (taken after the first key
  # has been removed from it) with the element stored under the configured
  # "target". The master event itself is always cancelled, whether or not any
  # event was emitted.
  #
  # @param master_event the event to split (expected to respond to #remove,
  #   #clone, #[]= and #cancel).
  # @yield [event] each newly created event.
  # @raise [LogStash::ConfigurationError] if a value along the path is neither
  #   nil nor a Hash, or if the value at the end of the path is neither nil
  #   nor an Array.
  def filter(master_event)
    events = extract_events(master_event)
    context = { events: events, keys: @keys, master_event: master_event, target: @target }

    if events.nil?
      @logger.warn("Filtered events are null", context)
    elsif !events.is_a?(Array)
      raise(
        LogStash::ConfigurationError,
        "Filtered input should be an Array but is instead: #{events.class.name} (#{events.inspect})."
      )
    elsif events.empty?
      @logger.info("Filtered events are empty", context)
    else
      events.each_with_index do |event_payload, idx|
        if blank_payload?(event_payload)
          @logger.info("Event #{idx + 1} is empty", { event_payload: event_payload }.merge(context))
        else
          event = master_event.clone
          event[@target] = event_payload
          @logger.debug("Stashing event #{idx + 1}", { event: event, event_payload: event_payload }.merge(context))
          filter_matched(event)
          yield event
        end
      end
    end

    # The aggregate event is dropped in every case; only the split events
    # (if any) continue.
    master_event.cancel
    # NOTE(review): filter_matched is inherited and not visible here; calling
    # it on an already cancelled event is kept only to preserve the original
    # behavior — confirm whether it is needed.
    filter_matched(master_event)
  end

  private

  # Removes the first configured key from +master_event+ and follows the
  # remaining keys through the nested Hashes.
  #
  # A nil at any level stops the walk and is returned as-is, so that a missing
  # top-level or intermediate key is reported the same way as a missing final
  # key (a warning in #filter) instead of raising.
  #
  # @return the value found at the end of the path, or nil if any level is
  #   missing.
  # @raise [LogStash::ConfigurationError] if a non-nil value along the path is
  #   not a Hash.
  def extract_events(master_event)
    events = master_event.remove(@keys.first)

    @keys.drop(1).each do |key|
      break if events.nil?

      unless events.is_a?(Hash)
        raise LogStash::ConfigurationError,
              "Input must be a JSON object / Ruby Hash but is instead: #{events.class.name} (#{events.inspect}). " \
              "(Error occured while inspecting key #{key.inspect}.)"
      end

      events = events[key]
    end

    events
  end

  # True when +payload+ carries nothing worth emitting: an empty collection or
  # String, or nil/false for values that do not respond to #empty?.
  def blank_payload?(payload)
    payload.respond_to?(:empty?) ? payload.empty? : !payload
  end
end