Skip to content

Commit

Permalink
use more efficient hashes when talking to kubernetes (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
grosser committed Oct 5, 2020
1 parent 8f95b0e commit 672a6bc
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 226 deletions.
10 changes: 7 additions & 3 deletions lib/fluent/plugin/filter_kubernetes_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,13 @@ def log.trace?
end

log.debug "Creating K8S client"
@client = Kubeclient::Client.new @kubernetes_url, @apiVersion,
ssl_options: ssl_options,
auth_options: auth_options
@client = Kubeclient::Client.new(
@kubernetes_url,
@apiVersion,
ssl_options: ssl_options,
auth_options: auth_options,
as: :parsed_symbolized
)

begin
@client.api_valid?
Expand Down
40 changes: 20 additions & 20 deletions lib/fluent/plugin/kubernetes_metadata_common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,65 +39,65 @@ def match_annotations(annotations)

def parse_namespace_metadata(namespace_object)
labels = String.new
labels = syms_to_strs(namespace_object['metadata']['labels'].to_h) unless @skip_labels
labels = syms_to_strs(namespace_object[:metadata][:labels].to_h) unless @skip_labels

annotations = match_annotations(syms_to_strs(namespace_object['metadata']['annotations'].to_h))
annotations = match_annotations(syms_to_strs(namespace_object[:metadata][:annotations].to_h))
if @de_dot
self.de_dot!(labels) unless @skip_labels
self.de_dot!(annotations)
end
kubernetes_metadata = {
'namespace_id' => namespace_object['metadata']['uid'],
'creation_timestamp' => namespace_object['metadata']['creationTimestamp']
'namespace_id' => namespace_object[:metadata][:uid],
'creation_timestamp' => namespace_object[:metadata][:creationTimestamp]
}
kubernetes_metadata['namespace_labels'] = labels unless labels.empty?
kubernetes_metadata['namespace_annotations'] = annotations unless annotations.empty?
return kubernetes_metadata
kubernetes_metadata
end

def parse_pod_metadata(pod_object)
labels = String.new
labels = syms_to_strs(pod_object['metadata']['labels'].to_h) unless @skip_labels
labels = syms_to_strs(pod_object[:metadata][:labels].to_h) unless @skip_labels

annotations = match_annotations(syms_to_strs(pod_object['metadata']['annotations'].to_h))
annotations = match_annotations(syms_to_strs(pod_object[:metadata][:annotations].to_h))
if @de_dot
self.de_dot!(labels) unless @skip_labels
self.de_dot!(annotations)
end

# collect container informations
# collect container information
container_meta = {}
begin
pod_object['status']['containerStatuses'].each do|container_status|
pod_object[:status][:containerStatuses].each do|container_status|
# get plain container id (eg. docker://hash -> hash)
container_id = container_status['containerID'].sub /^[-_a-zA-Z0-9]+:\/\//, ''
container_id = container_status[:containerID].sub /^[-_a-zA-Z0-9]+:\/\//, ''
unless @skip_container_metadata
container_meta[container_id] = {
'name' => container_status['name'],
'image' => container_status['image'],
'image_id' => container_status['imageID']
'name' => container_status[:name],
'image' => container_status[:image],
'image_id' => container_status[:imageID]
}
else
container_meta[container_id] = {
'name' => container_status['name']
'name' => container_status[:name]
}
end
end
rescue
log.debug("parsing container meta information failed for: #{pod_object['metadata']['namespace']}/#{pod_object['metadata']['name']} ")
log.debug("parsing container meta information failed for: #{pod_object[:metadata][:namespace]}/#{pod_object[:metadata][:name]} ")
end

kubernetes_metadata = {
'namespace_name' => pod_object['metadata']['namespace'],
'pod_id' => pod_object['metadata']['uid'],
'pod_name' => pod_object['metadata']['name'],
'namespace_name' => pod_object[:metadata][:namespace],
'pod_id' => pod_object[:metadata][:uid],
'pod_name' => pod_object[:metadata][:name],
'containers' => syms_to_strs(container_meta),
'host' => pod_object['spec']['nodeName']
'host' => pod_object[:spec][:nodeName]
}
kubernetes_metadata['annotations'] = annotations unless annotations.empty?
kubernetes_metadata['labels'] = labels unless labels.empty?
kubernetes_metadata['master_url'] = @kubernetes_url unless @skip_master_url
return kubernetes_metadata
kubernetes_metadata
end

def syms_to_strs(hsh)
Expand Down
16 changes: 8 additions & 8 deletions lib/fluent/plugin/kubernetes_metadata_watch_namespaces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ def get_namespaces_and_start_watcher
resource_version: '0' # Fetch from API server.
}
namespaces = @client.get_namespaces(options)
namespaces.each do |namespace|
cache_key = namespace.metadata['uid']
namespaces[:items].each do |namespace|
cache_key = namespace[:metadata][:uid]
@namespace_cache[cache_key] = parse_namespace_metadata(namespace)
@stats.bump(:namespace_cache_host_updates)
end
options[:resource_version] = namespaces.resourceVersion
options[:resource_version] = namespaces[:metadata][:resourceVersion]
@client.watch_namespaces(options)
end

Expand All @@ -111,13 +111,13 @@ def reset_namespace_watch_retry_stats
# Process a watcher notice and potentially raise an exception.
def process_namespace_watcher_notices(watcher)
watcher.each do |notice|
case notice.type
case notice[:type]
when 'MODIFIED'
reset_namespace_watch_retry_stats
cache_key = notice.object['metadata']['uid']
cache_key = notice[:object][:metadata][:uid]
cached = @namespace_cache[cache_key]
if cached
@namespace_cache[cache_key] = parse_namespace_metadata(notice.object)
@namespace_cache[cache_key] = parse_namespace_metadata(notice[:object])
@stats.bump(:namespace_cache_watch_updates)
else
@stats.bump(:namespace_cache_watch_misses)
Expand All @@ -128,12 +128,12 @@ def process_namespace_watcher_notices(watcher)
# deleted but still processing logs
@stats.bump(:namespace_cache_watch_deletes_ignored)
when 'ERROR'
if notice.object && notice.object['code'] == 410
if notice[:object] && notice[:object][:code] == 410
@stats.bump(:namespace_watch_gone_notices)
raise GoneError
else
@stats.bump(:namespace_watch_error_type_notices)
message = notice['object']['message'] if notice['object'] && notice['object']['message']
message = notice[:object][:message] if notice[:object] && notice[:object][:message]
raise "Error while watching namespaces: #{message}"
end
else
Expand Down
22 changes: 11 additions & 11 deletions lib/fluent/plugin/kubernetes_metadata_watch_pods.rb
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ def get_pods_and_start_watcher
options[:resource_version] = @last_seen_resource_version
else
pods = @client.get_pods(options)
pods.each do |pod|
cache_key = pod.metadata['uid']
pods[:items].each do |pod|
cache_key = pod[:metadata][:uid]
@cache[cache_key] = parse_pod_metadata(pod)
@stats.bump(:pod_cache_host_updates)
end
options[:resource_version] = pods.resourceVersion
options[:resource_version] = pods[:metadata][:resourceVersion]
end
@client.watch_pods(options)
end
Expand All @@ -122,20 +122,20 @@ def process_pod_watcher_notices(watcher)
watcher.each do |notice|
# store version we processed to not reprocess it ... do not unset when there is no version in response
version = ( # TODO: replace with &.dig once we are on ruby 2.5+
notice.object && notice.object['metadata'] && notice.object['metadata']['resourceVersion']
notice[:object] && notice[:object][:metadata] && notice[:object][:metadata][:resourceVersion]
)
@last_seen_resource_version = version if version

case notice.type
case notice[:type]
when 'MODIFIED'
reset_pod_watch_retry_stats
cache_key = notice.object['metadata']['uid']
cache_key = notice.dig(:object, :metadata, :uid)
cached = @cache[cache_key]
if cached
@cache[cache_key] = parse_pod_metadata(notice.object)
@cache[cache_key] = parse_pod_metadata(notice[:object])
@stats.bump(:pod_cache_watch_updates)
elsif ENV['K8S_NODE_NAME'] == notice.object['spec']['nodeName'] then
@cache[cache_key] = parse_pod_metadata(notice.object)
elsif ENV['K8S_NODE_NAME'] == notice[:object][:spec][:nodeName] then
@cache[cache_key] = parse_pod_metadata(notice[:object])
@stats.bump(:pod_cache_host_updates)
else
@stats.bump(:pod_cache_watch_misses)
Expand All @@ -146,13 +146,13 @@ def process_pod_watcher_notices(watcher)
# deleted but still processing logs
@stats.bump(:pod_cache_watch_delete_ignored)
when 'ERROR'
if notice.object && notice.object['code'] == 410
if notice[:object] && notice[:object][:code] == 410
@last_seen_resource_version = nil # requested resourceVersion was too old, need to reset
@stats.bump(:pod_watch_gone_notices)
raise GoneError
else
@stats.bump(:pod_watch_error_type_notices)
message = notice['object']['message'] if notice['object'] && notice['object']['message']
message = notice[:object][:message] if notice[:object] && notice[:object][:message]
raise "Error while watching pods: #{message}"
end
else
Expand Down
2 changes: 2 additions & 0 deletions test/plugin/test_cache_strategy.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ def initialize
attr_accessor :stats, :cache, :id_cache, :namespace_cache, :allow_orphans

def fetch_pod_metadata(namespace_name, pod_name)
{}
end

def fetch_namespace_metadata(namespace_name)
{}
end

def log
Expand Down
12 changes: 9 additions & 3 deletions test/plugin/test_filter_kubernetes_metadata.rb
Original file line number Diff line number Diff line change
Expand Up @@ -764,9 +764,13 @@ def emit_with_tag(tag, msg={}, config='
'CONTAINER_ID_FULL' => '49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed459',
'randomfield' => 'randomvalue'
}
VCR.use_cassettes([{name: 'valid_kubernetes_api_server'}, {name: 'kubernetes_get_api_v1'}, {name: 'kubernetes_get_pod'},
{name: 'kubernetes_get_namespace_default'},
{name: 'metadata_from_tag_and_journald_fields'}]) do
VCR.use_cassettes([
{name: 'valid_kubernetes_api_server'},
{name: 'kubernetes_get_api_v1'},
{name: 'kubernetes_get_pod'},
{name: 'kubernetes_get_namespace_default'},
{name: 'metadata_from_tag_and_journald_fields'}
]) do
filtered = emit_with_tag(tag, msg, '
kubernetes_url https://localhost:8443
watch false
Expand Down Expand Up @@ -868,6 +872,7 @@ def emit_with_tag(tag, msg={}, config='
assert_equal(expected_kube_metadata, filtered[0])
end
end

test 'with CONTAINER_NAME that does not match' do
tag = 'var.log.containers.junk4_junk5_junk6-49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed450.log'
msg = {
Expand All @@ -892,6 +897,7 @@ def emit_with_tag(tag, msg={}, config='
assert_equal(expected_kube_metadata, filtered[0])
end
end

test 'with CONTAINER_NAME starts with k8s_ that does not match' do
tag = 'var.log.containers.junk4_junk5_junk6-49095a2894da899d3b327c5fde1e056a81376cc9a8f8b09a195f2a92bceed450.log'
msg = {
Expand Down
105 changes: 53 additions & 52 deletions test/plugin/test_watch_namespaces.rb
Original file line number Diff line number Diff line change
Expand Up @@ -24,71 +24,72 @@ class WatchNamespacesTestTest < WatchTest
include KubernetesMetadata::WatchNamespaces

setup do
@initial = Kubeclient::Common::EntityList.new(
'NamespaceList',
'123',
[
Kubeclient::Resource.new({
'metadata' => {
'name' => 'initial',
'uid' => 'initial_uid'
}
}),
Kubeclient::Resource.new({
'metadata' => {
'name' => 'modified',
'uid' => 'modified_uid'
}
})
])
@initial = {
kind: 'NamespaceList',
metadata: {resourceVersion: '123'},
items: [
{
metadata: {
name: 'initial',
uid: 'initial_uid'
}
},
{
metadata: {
name: 'modified',
uid: 'modified_uid'
}
}
]
}

@created = OpenStruct.new(
@created = {
type: 'CREATED',
object: {
'metadata' => {
'name' => 'created',
'uid' => 'created_uid'
}
metadata: {
name: 'created',
uid: 'created_uid'
}
}
)
@modified = OpenStruct.new(
}
@modified = {
type: 'MODIFIED',
object: {
'metadata' => {
'name' => 'foo',
'uid' => 'modified_uid'
}
metadata: {
name: 'foo',
uid: 'modified_uid'
}
}
)
@deleted = OpenStruct.new(
}
@deleted = {
type: 'DELETED',
object: {
'metadata' => {
'name' => 'deleteme',
'uid' => 'deleted_uid'
}
metadata: {
name: 'deleteme',
uid: 'deleted_uid'
}
}
}
@error = {
type: 'ERROR',
object: {
message: 'some error message'
}
)
@error = OpenStruct.new(
}
@gone = {
type: 'ERROR',
object: {
'message' => 'some error message'
code: 410,
kind: 'Status',
message: 'too old resource version: 123 (391079)',
metadata: {
name: 'gone',
namespace: 'gone',
uid: 'gone_uid'
},
reason: 'Gone'
}
)
@gone = OpenStruct.new(
type: 'ERROR',
object: {
'code' => 410,
'kind' => 'Status',
'message' => 'too old resource version: 123 (391079)',
'metadata' => {
'name' => 'gone',
'namespace' => 'gone',
'uid' => 'gone_uid'
},
'reason' => 'Gone'
}
)
}
end

test 'namespace list caches namespaces' do
Expand Down
Loading

0 comments on commit 672a6bc

Please sign in to comment.