Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add service metadata monitoring #57

Merged
merged 16 commits into from
Jul 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deploy/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ RUN apk add --no-cache libexecinfo libexecinfo-dev
RUN apk add --no-cache snappy g++ snappy-dev

RUN apk add --no-cache --update --virtual .build-deps sudo build-base ruby-dev \
&& gem install concurrent-ruby \
&& gem install google-protobuf \
&& gem install kubeclient \
&& gem install lru_redux \
Expand Down
3 changes: 0 additions & 3 deletions deploy/docker/fluent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
<filter FOR_TEST_ONLY>
@type prometheus_format
</filter>
<filter FOR_TEST_ONLY>
@type enhance_k8s_metadata
</filter>
<filter FOR_TEST_ONLY>
@type kubernetes_metadata
</filter>
Expand Down
1 change: 1 addition & 0 deletions deploy/kubernetes/fluentd-sumologic.yaml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ rules:
- configmaps
- daemonsets
- deployments
- endpoints
- events
- namespaces
- nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |spec|
spec.test_files = test_files
spec.require_paths = ['lib']

spec.add_runtime_dependency 'concurrent-ruby', '~> 1.1'
spec.add_runtime_dependency 'fluentd', ['>= 0.14.10', '< 2']
spec.add_runtime_dependency 'kubeclient', '~> 4.4.0'
spec.add_runtime_dependency 'lru_redux', '~> 1.1.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ class EnhanceK8sMetadataFilter < Fluent::Plugin::Filter
Fluent::Plugin.register_filter('enhance_k8s_metadata', self)

require_relative '../../sumologic/kubernetes/cache_strategy.rb'
require_relative '../../sumologic/kubernetes/service_monitor.rb'

helpers :record_accessor
helpers :thread
include SumoLogic::Kubernetes::Connector
include SumoLogic::Kubernetes::Reader
include SumoLogic::Kubernetes::CacheStrategy
include SumoLogic::Kubernetes::ServiceMonitor

# parameters for read/write record
config_param :in_namespace_path, :array, default: ['$.namespace']
Expand Down Expand Up @@ -44,6 +47,11 @@ def configure(conf)
@in_pod_ac = @in_pod_path.map { |path| record_accessor_create(path) }
end

def start
super
start_service_monitor
end

def filter(tag, time, record)
decorate_record(record)
record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ def group_clients
end

def create_client(base, ver)
url = "#{@kubernetes_url}/#{base}/#{ver}"
log.info "create client with URL: #{url}"
url = "#{@kubernetes_url}/#{base}"
log.info "create client with URL: #{url} and apiVersion: #{ver}"
client = Kubeclient::Client.new(
url,
'',
ver,
ssl_options: ssl_options,
auth_options: auth_options,
as: :parsed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
require 'concurrent'

module SumoLogic
module Kubernetes
# module for watching changes to services
module ServiceMonitor
require_relative 'connector.rb'

def start_service_monitor
log.info "Starting watching for service changes"

@watch_service_interval_seconds = 300

last_recreated = Time.now.to_i
log.debug "last_recreated initialized to #{last_recreated}"

while true do
# Periodically restart watcher connection by checking if enough time has passed since
# last time watcher thread was recreated or if the watcher thread has been stopped.
now = Time.now.to_i
watcher_exists = Thread.list.select {|thread| thread.object_id == @watcher_id && thread.alive?}.count > 0
if now - last_recreated >= @watch_service_interval_seconds || !watcher_exists

log.debug "Recreating service watcher thread"
@watch_stream.finish if @watch_stream

start_service_watcher_thread
last_recreated = now
log.debug "last_recreated updated to #{last_recreated}"
end
sleep(10)
end
end

def start_service_watcher_thread
log.debug "Starting service endpoints watcher thread"
params = Hash.new
params[:as] = :raw
params[:resource_version] = get_current_service_snapshot_resource_version
params[:timeout_seconds] = @watch_service_interval_seconds + 60

@watcher = @clients['v1'].public_send("watch_endpoints", params).tap do |watcher|
thread_create(:"watch_endpoints") do
@watch_stream = watcher
@watcher_id = Thread.current.object_id
log.debug "New thread to watch service endpoints #{@watcher_id} from resource version #{params[:resource_version]}"

watcher.each do |event|
begin
event = JSON.parse(event)
handle_service_event(event)
rescue => e
log.error "Got exception #{e} parsing entity #{entity}. Skipping."
end
end
log.info "Closing watch stream"
end
end
end

def get_current_service_snapshot_resource_version
log.debug "Getting current service snapshot"
begin
params = Hash.new
params[:as] = :raw
response = @clients['v1'].public_send "get_endpoints", params
result = JSON.parse(response)
new_snapshot_pods_to_services = Concurrent::Map.new {|h, k| h[k] = []}

result['items'].each do |endpoint|
service = endpoint['metadata']['name']
get_pods_for_service(endpoint).each {|pod| new_snapshot_pods_to_services[pod] << service}
end

@pods_to_services = new_snapshot_pods_to_services
result['metadata']['resourceVersion']
rescue => e
log.error "Got exception #{e} getting current service snapshot and corresponding resource version."
0
end
end

def get_pods_for_service(endpoint)
pods = []
if endpoint.key? 'subsets'
endpoint['subsets'].each do |subset|
['addresses', 'notReadyAddresses'].each do |key|
if subset.key? key
subset[key].each do |object|
if object.key? 'targetRef'
if object['targetRef']['kind'] == 'Pod'
pod = object['targetRef']['name']
log.debug "Found Pod #{pod} for Service #{endpoint['metadata']['name']}"
pods << pod
end
end
end
end
end
end
end
pods
end

def handle_service_event(event)
type = event['type']
endpoint = event['object']
service = endpoint['metadata']['name']
case type
when 'ADDED'
get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod] << service unless @pods_to_services[pod].include? service}
when 'MODIFIED'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if I'm following the MODIFIED case, can you give some explanation?

Copy link
Contributor Author

@samjsong samjsong Jul 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the MODIFIED case was the hardest part. There's two cases to cover:

  1. The service was modified such that there are more pods than there once were (increasing the number of replicas). This is the easier case. We simply add the service to @pods_to_services[pod] unless it already exists.
  2. The service was modified such that there are now fewer pods than there once were (downscaling the number of replicas). This is the harder case, since you don't know which pods were removed you have to actually search the map for mentions of the service, and delete it if it's not one of the pods we were told about in the MODIFIED event. Then if there are no services for that pod in the map, we can remove that key from the map.

I offhandedly asked Chris about it at some point, since the "looking through the entire map for mentions of the service" part really irked me... but he seemed to think it wasn't that big of an issue to be worth "over-optimizing prematurely", since this case likely will not happen often. I'm open to any suggestions 😅

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the detailed explanation! I'm fine with current approach

desired_pods = get_pods_for_service(endpoint)
desired_pods.each {|pod| @pods_to_services[pod] |= [service]}
@pods_to_services.each do |pod, services|
if services.include? service
services.delete service unless desired_pods.include? pod
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does deleting service from services equivalent to deleting it from @pods_to_services[pod]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I believe services is a reference to the array in the value

end
@pods_to_services.delete pod if services.length == 0
end
when 'DELETED'
get_pods_for_service(endpoint).each do |pod|
@pods_to_services[pod].delete service
@pods_to_services.delete pod if @pods_to_services[pod].length == 0
end
else
log.error "Unknown type for watch endpoint event #{type}"
end
end
end
end
end
14 changes: 14 additions & 0 deletions fluent-plugin-enhance-k8s-metadata/test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,26 @@ def test_resource(name)

def stub_apis
init_globals
stub_request(:any, %r{/api$})
.to_return(
'body' => {
'versions' => ['v1']
}.to_json
)
stub_request(:any, %r{/apis$})
.to_return(
'body' => {
'versions' => ['apps/v1', 'extensions/v1beta1']
}.to_json
)
stub_request(:get, %r{/api/v1$})
.to_return(body: test_resource('api_list_core_v1.json'), status: 200)
stub_request(:get, %r{/apis/apps/v1$})
.to_return(body: test_resource('api_list_apps_v1.json'), status: 200)
stub_request(:get, %r{/apis/extensions/v1beta1$})
.to_return(body: test_resource('api_list_extensions_v1beta1.json'), status: 200)
stub_request(:get, %r{/api/v1/endpoints$})
.to_return(body: test_resource('endpoints_list.json'), status: 200)
stub_request(:get, %r{/api/v1/namespaces/sumologic/pods})
.to_return(body: test_resource('pod_sumologic.json'), status: 200)
stub_request(:get, %r{/apis/extensions/v1beta1/namespaces/sumologic/replicasets})
Expand Down
Loading