Skip to content

Commit

Permalink
Fixes fluentd new push path API. (#1363)
Browse files Browse the repository at this point in the history
* Update fluentd to support the new v1 push API.

- also add tests for ordering.
- improve nanoseconds precision.
- fixes all lint/style issue.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Bump the gem and add myself as author.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
  • Loading branch information
cyriltovena authored Dec 5, 2019
1 parent a8a143b commit 2b7f375
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ $LOAD_PATH.push File.expand_path('lib', __dir__)

Gem::Specification.new do |spec|
spec.name = 'fluent-plugin-grafana-loki'
spec.version = '1.2.3'
spec.authors = %w[woodsaj briangann]
spec.email = ['awoods@grafana.com', 'brian@grafana.com']
spec.version = '1.2.4'
spec.authors = %w[woodsaj briangann cyriltovena]
spec.email = ['awoods@grafana.com', 'brian@grafana.com' , 'cyril.tovena@grafana.com']

spec.summary = 'Output plugin to ship logs to a Grafana Loki server'
spec.description = 'Output plugin to ship logs to a Grafana Loki server'
Expand Down
86 changes: 51 additions & 35 deletions fluentd/fluent-plugin-grafana-loki/lib/fluent/plugin/out_loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,26 @@ def configure(conf)
@remove_keys_accessors.push(record_accessor_create(key))
end

if !@key.nil? && !@cert.nil?
@cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert
@key = OpenSSL::PKey.read(File.read(@key)) if @key

if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA)
raise "Unsupported private key type #{key.class}"
end
if ssl_cert?
load_ssl
validate_ssl_key
end

if !@ca_cert.nil? && !File.exist?(@ca_cert)
raise "CA certificate file #{@ca_cert} not found"
raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert)
end

def ssl_cert?
!@key.nil? && !@cert.nil?
end

def load_ssl
@cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert
@key = OpenSSL::PKey.read(File.read(@key)) if @key
end

def validate_ssl_key
if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA)
raise "Unsupported private key type #{key.class}"
end
end

Expand Down Expand Up @@ -127,6 +136,24 @@ def write(chunk)
req.add_field('X-Scope-OrgID', @tenant) if @tenant
req.body = Yajl.dump(body)
req.basic_auth(@username, @password) if @username

opts = ssl_opts(uri)

log.debug "sending #{req.body.length} bytes to loki"
res = Net::HTTP.start(uri.hostname, uri.port, **opts) { |http| http.request(req) }
unless res&.is_a?(Net::HTTPSuccess)
res_summary = if res
"#{res.code} #{res.message} #{res.body}"
else
'res=nil'
end
log.warn "failed to #{req.method} #{uri} (#{res_summary})"
log.warn Yajl.dump(body)

end
end

def ssl_opts(uri)
opts = {
use_ssl: uri.scheme == 'https'
}
Expand All @@ -139,24 +166,12 @@ def write(chunk)
)
end

if !@ca_cert.nil?
unless @ca_cert.nil?
opts = opts.merge(
ca_file: @ca_cert
)
end

log.debug "sending #{req.body.length} bytes to loki"
res = Net::HTTP.start(uri.hostname, uri.port, **opts) { |http| http.request(req) }
unless res&.is_a?(Net::HTTPSuccess)
res_summary = if res
"#{res.code} #{res.message} #{res.body}"
else
'res=nil'
end
log.warn "failed to #{req.method} #{uri} (#{res_summary})"
log.warn Yajl.dump(body)

end
opts
end

def generic_to_loki(chunk)
Expand All @@ -174,17 +189,14 @@ def numeric?(val)
false
end

def labels_to_protocol(data_labels)
formatted_labels = []

def format_labels(data_labels)
formatted_labels = {}
# merge extra_labels with data_labels. If there are collisions extra_labels win.
data_labels = {} if data_labels.nil?
data_labels = data_labels.merge(@extra_labels)

data_labels.each do |k, v|
formatted_labels.push(%(#{k}="#{v.gsub('"', '\\"')}")) if v
end
'{' + formatted_labels.join(',') + '}'
# sanitize label values
data_labels.each { |k, v| formatted_labels[k] = v.gsub('"', '\\"') }
formatted_labels
end

def payload_builder(streams)
Expand All @@ -193,15 +205,19 @@ def payload_builder(streams)
# create a stream for each label set.
# Additionally sort the entries by timestamp just in case we
# got them out of order.
# 'labels' => '{worker="0"}',
entries = v.sort_by.with_index { |hsh, i| [hsh['ts'], i] }
payload.push(
'labels' => labels_to_protocol(k),
'entries' => v.sort_by.with_index { |hsh, i| [Time.parse(hsh['ts']), i] }
'stream' => format_labels(k),
'values' => entries.map { |e| [e['ts'].to_s, e['line']] }
)
end
payload
end

def to_nano(time)
time.to_i * (10**9) + time.nsec
end

def record_to_line(record)
line = ''
if @drop_single_key && record.keys.length == 1
Expand Down Expand Up @@ -272,7 +288,7 @@ def chunk_to_loki(chunk)
# NOTE: timestamp must include nanoseconds
# append to matching chunk_labels key
streams[chunk_labels].push(
'ts' => Time.at(time.to_f).gmtime.iso8601(6),
'ts' => to_nano(time),
'line' => result[:line]
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
single_chunk = [1_546_270_458, content]
single_chunk = [Time.at(1_546_270_458), content]
payload = driver.instance.generic_to_loki([single_chunk])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
end

it 'converts syslog output with extra labels to loki output' do
Expand All @@ -64,12 +64,12 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
single_chunk = [1_546_270_458, content]
single_chunk = [Time.at(1_546_270_458), content]
payload = driver.instance.generic_to_loki([single_chunk])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{env="test"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['stream']).to eq('env' => 'test')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
end

it 'converts multiple syslog output lines to loki output' do
Expand All @@ -79,14 +79,14 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
line1 = [1_546_270_458, content[0]]
line2 = [1_546_270_460, content[1]]
line1 = [Time.at(1_546_270_458), content[0]]
line2 = [Time.at(1_546_270_460), content[1]]
payload = driver.instance.generic_to_loki([line1, line2])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{}'
expect(body[:streams][0]['entries'].count).to eq 2
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][1]['ts']).to eq '2018-12-31T15:34:20.000000Z'
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 2
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][1][0]).to eq '1546270460000000000'
end

it 'converts multiple syslog output lines with extra labels to loki output' do
Expand All @@ -97,14 +97,14 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
line1 = [1_546_270_458, content[0]]
line2 = [1_546_270_460, content[1]]
line1 = [Time.at(1_546_270_458), content[0]]
line2 = [Time.at(1_546_270_460), content[1]]
payload = driver.instance.generic_to_loki([line1, line2])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{env="test"}'
expect(body[:streams][0]['entries'].count).to eq 2
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][1]['ts']).to eq '2018-12-31T15:34:20.000000Z'
expect(body[:streams][0]['stream']).to eq('env' => 'test')
expect(body[:streams][0]['values'].count).to eq 2
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][1][0]).to eq '1546270460000000000'
end

it 'formats record hash as key_value' do
Expand All @@ -114,13 +114,13 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'stream' => 'stdout' }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq 'message="' + content[0] + '" stream="stdout"'
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq 'message="' + content[0] + '" stream="stdout"'
end

it 'formats record hash as json' do
Expand All @@ -131,13 +131,13 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'stream' => 'stdout' }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump(line1[1])
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq Yajl.dump(line1[1])
end

it 'extracts record key as label' do
Expand All @@ -151,13 +151,13 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'stream' => 'stdout' }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{stream="stdout"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0])
expect(body[:streams][0]['stream']).to eq('stream' => 'stdout')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq Yajl.dump('message' => content[0])
end

it 'extracts nested record key as label' do
Expand All @@ -171,13 +171,13 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{pod="podname"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0], 'kubernetes' => {})
expect(body[:streams][0]['stream']).to eq('pod' => 'podname')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq Yajl.dump('message' => content[0], 'kubernetes' => {})
end

it 'extracts nested record key as label and drop key after' do
Expand All @@ -192,13 +192,13 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{pod="podname"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0])
expect(body[:streams][0]['stream']).to eq('pod' => 'podname')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq Yajl.dump('message' => content[0])
end

it 'formats as simple string when only 1 record key' do
Expand All @@ -213,12 +213,35 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'stream' => 'stdout' }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{stream="stdout"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq content[0]
expect(body[:streams][0]['stream']).to eq('stream' => 'stdout')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq content[0]
end

it 'order by timestamp then index when received unordered' do
config = <<-CONF
url https://logs-us-west1.grafana.net
drop_single_key true
<label>
stream
</label>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
lines = [
[Time.at(1_546_270_460), { 'message' => '4', 'stream' => 'stdout' }],
[Time.at(1_546_270_459), { 'message' => '2', 'stream' => 'stdout' }],
[Time.at(1_546_270_458), { 'message' => '1', 'stream' => 'stdout' }],
[Time.at(1_546_270_459), { 'message' => '3', 'stream' => 'stdout' }],
[Time.at(1_546_270_450), { 'message' => '0', 'stream' => 'stdout' }],
[Time.at(1_546_270_460), { 'message' => '5', 'stream' => 'stdout' }]
]
res = driver.instance.generic_to_loki(lines)
expect(res[0]['stream']).to eq('stream' => 'stdout')
6.times { |i| expect(res[0]['values'][i][1]).to eq i.to_s }
end
end

0 comments on commit 2b7f375

Please sign in to comment.