Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes fluentd new push path API. #1363

Merged
merged 2 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ $LOAD_PATH.push File.expand_path('lib', __dir__)

Gem::Specification.new do |spec|
spec.name = 'fluent-plugin-grafana-loki'
spec.version = '1.2.3'
spec.authors = %w[woodsaj briangann]
spec.email = ['awoods@grafana.com', 'brian@grafana.com']
spec.version = '1.2.4'
spec.authors = %w[woodsaj briangann cyriltovena]
spec.email = ['awoods@grafana.com', 'brian@grafana.com' , 'cyril.tovena@grafana.com']

spec.summary = 'Output plugin to ship logs to a Grafana Loki server'
spec.description = 'Output plugin to ship logs to a Grafana Loki server'
Expand Down
86 changes: 51 additions & 35 deletions fluentd/fluent-plugin-grafana-loki/lib/fluent/plugin/out_loki.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,26 @@ def configure(conf)
@remove_keys_accessors.push(record_accessor_create(key))
end

if !@key.nil? && !@cert.nil?
@cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert
@key = OpenSSL::PKey.read(File.read(@key)) if @key

if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA)
raise "Unsupported private key type #{key.class}"
end
if ssl_cert?
load_ssl
validate_ssl_key
end

if !@ca_cert.nil? && !File.exist?(@ca_cert)
raise "CA certificate file #{@ca_cert} not found"
raise "CA certificate file #{@ca_cert} not found" if !@ca_cert.nil? && !File.exist?(@ca_cert)
end

def ssl_cert?
!@key.nil? && !@cert.nil?
end

def load_ssl
@cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert
@key = OpenSSL::PKey.read(File.read(@key)) if @key
end

def validate_ssl_key
if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA)
raise "Unsupported private key type #{key.class}"
end
end

Expand Down Expand Up @@ -127,6 +136,24 @@ def write(chunk)
req.add_field('X-Scope-OrgID', @tenant) if @tenant
req.body = Yajl.dump(body)
req.basic_auth(@username, @password) if @username

opts = ssl_opts(uri)

log.debug "sending #{req.body.length} bytes to loki"
res = Net::HTTP.start(uri.hostname, uri.port, **opts) { |http| http.request(req) }
unless res&.is_a?(Net::HTTPSuccess)
res_summary = if res
"#{res.code} #{res.message} #{res.body}"
else
'res=nil'
end
log.warn "failed to #{req.method} #{uri} (#{res_summary})"
log.warn Yajl.dump(body)

end
end

def ssl_opts(uri)
opts = {
use_ssl: uri.scheme == 'https'
}
Expand All @@ -139,24 +166,12 @@ def write(chunk)
)
end

if !@ca_cert.nil?
unless @ca_cert.nil?
opts = opts.merge(
ca_file: @ca_cert
)
end

log.debug "sending #{req.body.length} bytes to loki"
res = Net::HTTP.start(uri.hostname, uri.port, **opts) { |http| http.request(req) }
unless res&.is_a?(Net::HTTPSuccess)
res_summary = if res
"#{res.code} #{res.message} #{res.body}"
else
'res=nil'
end
log.warn "failed to #{req.method} #{uri} (#{res_summary})"
log.warn Yajl.dump(body)

end
opts
end

def generic_to_loki(chunk)
Expand All @@ -174,17 +189,14 @@ def numeric?(val)
false
end

def labels_to_protocol(data_labels)
formatted_labels = []

def format_labels(data_labels)
formatted_labels = {}
# merge extra_labels with data_labels. If there are collisions extra_labels win.
data_labels = {} if data_labels.nil?
data_labels = data_labels.merge(@extra_labels)

data_labels.each do |k, v|
formatted_labels.push(%(#{k}="#{v.gsub('"', '\\"')}")) if v
end
'{' + formatted_labels.join(',') + '}'
# sanitize label values
data_labels.each { |k, v| formatted_labels[k] = v.gsub('"', '\\"') }
formatted_labels
end

def payload_builder(streams)
Expand All @@ -193,15 +205,19 @@ def payload_builder(streams)
# create a stream for each label set.
# Additionally sort the entries by timestamp just in case we
# got them out of order.
# 'labels' => '{worker="0"}',
entries = v.sort_by.with_index { |hsh, i| [hsh['ts'], i] }
payload.push(
'labels' => labels_to_protocol(k),
'entries' => v.sort_by.with_index { |hsh, i| [Time.parse(hsh['ts']), i] }
'stream' => format_labels(k),
'values' => entries.map { |e| [e['ts'].to_s, e['line']] }
)
end
payload
end

def to_nano(time)
time.to_i * (10**9) + time.nsec
end

def record_to_line(record)
line = ''
if @drop_single_key && record.keys.length == 1
Expand Down Expand Up @@ -272,7 +288,7 @@ def chunk_to_loki(chunk)
# NOTE: timestamp must include nanoseconds
# append to matching chunk_labels key
streams[chunk_labels].push(
'ts' => Time.at(time.to_f).gmtime.iso8601(6),
'ts' => to_nano(time),
'line' => result[:line]
)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
single_chunk = [1_546_270_458, content]
single_chunk = [Time.at(1_546_270_458), content]
payload = driver.instance.generic_to_loki([single_chunk])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
end

it 'converts syslog output with extra labels to loki output' do
Expand All @@ -64,12 +64,12 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
single_chunk = [1_546_270_458, content]
single_chunk = [Time.at(1_546_270_458), content]
payload = driver.instance.generic_to_loki([single_chunk])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{env="test"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['stream']).to eq('env' => 'test')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
end

it 'converts multiple syslog output lines to loki output' do
Expand All @@ -79,14 +79,14 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
line1 = [1_546_270_458, content[0]]
line2 = [1_546_270_460, content[1]]
line1 = [Time.at(1_546_270_458), content[0]]
line2 = [Time.at(1_546_270_460), content[1]]
payload = driver.instance.generic_to_loki([line1, line2])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{}'
expect(body[:streams][0]['entries'].count).to eq 2
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][1]['ts']).to eq '2018-12-31T15:34:20.000000Z'
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 2
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][1][0]).to eq '1546270460000000000'
end

it 'converts multiple syslog output lines with extra labels to loki output' do
Expand All @@ -97,14 +97,14 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog2')
line1 = [1_546_270_458, content[0]]
line2 = [1_546_270_460, content[1]]
line1 = [Time.at(1_546_270_458), content[0]]
line2 = [Time.at(1_546_270_460), content[1]]
payload = driver.instance.generic_to_loki([line1, line2])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{env="test"}'
expect(body[:streams][0]['entries'].count).to eq 2
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][1]['ts']).to eq '2018-12-31T15:34:20.000000Z'
expect(body[:streams][0]['stream']).to eq('env' => 'test')
expect(body[:streams][0]['values'].count).to eq 2
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][1][0]).to eq '1546270460000000000'
end

it 'formats record hash as key_value' do
Expand All @@ -114,13 +114,13 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'stream' => 'stdout' }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq 'message="' + content[0] + '" stream="stdout"'
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq 'message="' + content[0] + '" stream="stdout"'
end

it 'formats record hash as json' do
Expand All @@ -131,13 +131,13 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'stream' => 'stdout' }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump(line1[1])
expect(body[:streams][0]['stream'].empty?).to eq true
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq Yajl.dump(line1[1])
end

it 'extracts record key as label' do
Expand All @@ -151,13 +151,13 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'stream' => 'stdout' }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{stream="stdout"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0])
expect(body[:streams][0]['stream']).to eq('stream' => 'stdout')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq Yajl.dump('message' => content[0])
end

it 'extracts nested record key as label' do
Expand All @@ -171,13 +171,13 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{pod="podname"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0], 'kubernetes' => {})
expect(body[:streams][0]['stream']).to eq('pod' => 'podname')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq Yajl.dump('message' => content[0], 'kubernetes' => {})
end

it 'extracts nested record key as label and drop key after' do
Expand All @@ -192,13 +192,13 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{pod="podname"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0])
expect(body[:streams][0]['stream']).to eq('pod' => 'podname')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq Yajl.dump('message' => content[0])
end

it 'formats as simple string when only 1 record key' do
Expand All @@ -213,12 +213,35 @@
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'stream' => 'stdout' }]
line1 = [Time.at(1_546_270_458), { 'message' => content[0], 'stream' => 'stdout' }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{stream="stdout"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq content[0]
expect(body[:streams][0]['stream']).to eq('stream' => 'stdout')
expect(body[:streams][0]['values'].count).to eq 1
expect(body[:streams][0]['values'][0][0]).to eq '1546270458000000000'
expect(body[:streams][0]['values'][0][1]).to eq content[0]
end

it 'order by timestamp then index when received unordered' do
config = <<-CONF
url https://logs-us-west1.grafana.net
drop_single_key true
<label>
stream
</label>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
lines = [
[Time.at(1_546_270_460), { 'message' => '4', 'stream' => 'stdout' }],
[Time.at(1_546_270_459), { 'message' => '2', 'stream' => 'stdout' }],
[Time.at(1_546_270_458), { 'message' => '1', 'stream' => 'stdout' }],
[Time.at(1_546_270_459), { 'message' => '3', 'stream' => 'stdout' }],
[Time.at(1_546_270_450), { 'message' => '0', 'stream' => 'stdout' }],
[Time.at(1_546_270_460), { 'message' => '5', 'stream' => 'stdout' }]
]
res = driver.instance.generic_to_loki(lines)
expect(res[0]['stream']).to eq('stream' => 'stdout')
6.times { |i| expect(res[0]['values'][i][1]).to eq i.to_s }
end
end