Skip to content

Commit

Permalink
Merge pull request #57 from SumoLogic/ssong-service-metadata
Browse files Browse the repository at this point in the history
Add service metadata monitoring
  • Loading branch information
samjsong committed Jul 8, 2019
2 parents 7241daa + 35002ed commit 1ea2661
Show file tree
Hide file tree
Showing 11 changed files with 1,369 additions and 6 deletions.
1 change: 1 addition & 0 deletions deploy/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ RUN apk add --no-cache libexecinfo libexecinfo-dev
RUN apk add --no-cache snappy g++ snappy-dev

RUN apk add --no-cache --update --virtual .build-deps sudo build-base ruby-dev \
&& gem install concurrent-ruby \
&& gem install google-protobuf \
&& gem install kubeclient \
&& gem install lru_redux \
Expand Down
3 changes: 0 additions & 3 deletions deploy/docker/fluent.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
<filter FOR_TEST_ONLY>
@type prometheus_format
</filter>
<filter FOR_TEST_ONLY>
@type enhance_k8s_metadata
</filter>
<filter FOR_TEST_ONLY>
@type kubernetes_metadata
</filter>
Expand Down
1 change: 1 addition & 0 deletions deploy/kubernetes/fluentd-sumologic.yaml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ rules:
- configmaps
- daemonsets
- deployments
- endpoints
- events
- namespaces
- nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Gem::Specification.new do |spec|
spec.test_files = test_files
spec.require_paths = ['lib']

spec.add_runtime_dependency 'concurrent-ruby', '~> 1.1'
spec.add_runtime_dependency 'fluentd', ['>= 0.14.10', '< 2']
spec.add_runtime_dependency 'kubeclient', '~> 4.4.0'
spec.add_runtime_dependency 'lru_redux', '~> 1.1.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ class EnhanceK8sMetadataFilter < Fluent::Plugin::Filter
Fluent::Plugin.register_filter('enhance_k8s_metadata', self)

require_relative '../../sumologic/kubernetes/cache_strategy.rb'
require_relative '../../sumologic/kubernetes/service_monitor.rb'

helpers :record_accessor
helpers :thread
include SumoLogic::Kubernetes::Connector
include SumoLogic::Kubernetes::Reader
include SumoLogic::Kubernetes::CacheStrategy
include SumoLogic::Kubernetes::ServiceMonitor

# parameters for read/write record
config_param :in_namespace_path, :array, default: ['$.namespace']
Expand Down Expand Up @@ -44,6 +47,11 @@ def configure(conf)
@in_pod_ac = @in_pod_path.map { |path| record_accessor_create(path) }
end

def start
super
start_service_monitor
end

def filter(tag, time, record)
decorate_record(record)
record
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ def group_clients
end

def create_client(base, ver)
url = "#{@kubernetes_url}/#{base}/#{ver}"
log.info "create client with URL: #{url}"
url = "#{@kubernetes_url}/#{base}"
log.info "create client with URL: #{url} and apiVersion: #{ver}"
client = Kubeclient::Client.new(
url,
'',
ver,
ssl_options: ssl_options,
auth_options: auth_options,
as: :parsed
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
require 'concurrent'

module SumoLogic
module Kubernetes
# module for watching changes to services
module ServiceMonitor
require_relative 'connector.rb'

def start_service_monitor
log.info "Starting watching for service changes"

@watch_service_interval_seconds = 300

last_recreated = Time.now.to_i
log.debug "last_recreated initialized to #{last_recreated}"

while true do
# Periodically restart watcher connection by checking if enough time has passed since
# last time watcher thread was recreated or if the watcher thread has been stopped.
now = Time.now.to_i
watcher_exists = Thread.list.select {|thread| thread.object_id == @watcher_id && thread.alive?}.count > 0
if now - last_recreated >= @watch_service_interval_seconds || !watcher_exists

log.debug "Recreating service watcher thread"
@watch_stream.finish if @watch_stream

start_service_watcher_thread
last_recreated = now
log.debug "last_recreated updated to #{last_recreated}"
end
sleep(10)
end
end

def start_service_watcher_thread
log.debug "Starting service endpoints watcher thread"
params = Hash.new
params[:as] = :raw
params[:resource_version] = get_current_service_snapshot_resource_version
params[:timeout_seconds] = @watch_service_interval_seconds + 60

@watcher = @clients['v1'].public_send("watch_endpoints", params).tap do |watcher|
thread_create(:"watch_endpoints") do
@watch_stream = watcher
@watcher_id = Thread.current.object_id
log.debug "New thread to watch service endpoints #{@watcher_id} from resource version #{params[:resource_version]}"

watcher.each do |event|
begin
event = JSON.parse(event)
handle_service_event(event)
rescue => e
log.error "Got exception #{e} parsing entity #{entity}. Skipping."
end
end
log.info "Closing watch stream"
end
end
end

def get_current_service_snapshot_resource_version
log.debug "Getting current service snapshot"
begin
params = Hash.new
params[:as] = :raw
response = @clients['v1'].public_send "get_endpoints", params
result = JSON.parse(response)
new_snapshot_pods_to_services = Concurrent::Map.new {|h, k| h[k] = []}

result['items'].each do |endpoint|
service = endpoint['metadata']['name']
get_pods_for_service(endpoint).each {|pod| new_snapshot_pods_to_services[pod] << service}
end

@pods_to_services = new_snapshot_pods_to_services
result['metadata']['resourceVersion']
rescue => e
log.error "Got exception #{e} getting current service snapshot and corresponding resource version."
0
end
end

def get_pods_for_service(endpoint)
pods = []
if endpoint.key? 'subsets'
endpoint['subsets'].each do |subset|
['addresses', 'notReadyAddresses'].each do |key|
if subset.key? key
subset[key].each do |object|
if object.key? 'targetRef'
if object['targetRef']['kind'] == 'Pod'
pod = object['targetRef']['name']
log.debug "Found Pod #{pod} for Service #{endpoint['metadata']['name']}"
pods << pod
end
end
end
end
end
end
end
pods
end

def handle_service_event(event)
type = event['type']
endpoint = event['object']
service = endpoint['metadata']['name']
case type
when 'ADDED'
get_pods_for_service(endpoint).each {|pod| @pods_to_services[pod] << service unless @pods_to_services[pod].include? service}
when 'MODIFIED'
desired_pods = get_pods_for_service(endpoint)
desired_pods.each {|pod| @pods_to_services[pod] |= [service]}
@pods_to_services.each do |pod, services|
if services.include? service
services.delete service unless desired_pods.include? pod
end
@pods_to_services.delete pod if services.length == 0
end
when 'DELETED'
get_pods_for_service(endpoint).each do |pod|
@pods_to_services[pod].delete service
@pods_to_services.delete pod if @pods_to_services[pod].length == 0
end
else
log.error "Unknown type for watch endpoint event #{type}"
end
end
end
end
end
14 changes: 14 additions & 0 deletions fluent-plugin-enhance-k8s-metadata/test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,26 @@ def test_resource(name)

def stub_apis
init_globals
stub_request(:any, %r{/api$})
.to_return(
'body' => {
'versions' => ['v1']
}.to_json
)
stub_request(:any, %r{/apis$})
.to_return(
'body' => {
'versions' => ['apps/v1', 'extensions/v1beta1']
}.to_json
)
stub_request(:get, %r{/api/v1$})
.to_return(body: test_resource('api_list_core_v1.json'), status: 200)
stub_request(:get, %r{/apis/apps/v1$})
.to_return(body: test_resource('api_list_apps_v1.json'), status: 200)
stub_request(:get, %r{/apis/extensions/v1beta1$})
.to_return(body: test_resource('api_list_extensions_v1beta1.json'), status: 200)
stub_request(:get, %r{/api/v1/endpoints$})
.to_return(body: test_resource('endpoints_list.json'), status: 200)
stub_request(:get, %r{/api/v1/namespaces/sumologic/pods})
.to_return(body: test_resource('pod_sumologic.json'), status: 200)
stub_request(:get, %r{/apis/extensions/v1beta1/namespaces/sumologic/replicasets})
Expand Down
Loading

0 comments on commit 1ea2661

Please sign in to comment.