Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SUMO-113170 Support different k8s api versions #74

Merged
merged 8 commits into from
Jul 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module Connector
K8_POD_CA_CERT = 'ca.crt'.freeze
K8_POD_TOKEN = 'token'.freeze

# Need different clients to access deifferent API groups/versions
# Need different clients to access different API groups/versions
# https://github.com/abonas/kubeclient/issues/208
CORE_API_VERSIONS = ['v1'].freeze
API_GROUPS = ['apps/v1', 'extensions/v1beta1'].freeze
Expand Down
1 change: 1 addition & 0 deletions fluent-plugin-events/fluent-plugin-events.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "test-unit", "~> 3.0"
spec.add_runtime_dependency "fluentd", [">= 0.14.10", "< 2"]
spec.add_runtime_dependency 'kubeclient', '~> 4.2'
spec.add_development_dependency 'webmock', '~> 3.0'
spec.add_development_dependency 'mocha'
end
12 changes: 6 additions & 6 deletions fluent-plugin-events/lib/fluent/plugin/in_events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def start_watcher_thread
params[:namespace] = @namespace
params[:timeout_seconds] = @watch_interval_seconds + 60

@watcher = @client.public_send("watch_#{resource_name}", params).tap do |watcher|
@watcher = @clients[@api_version].public_send("watch_#{resource_name}", params).tap do |watcher|
thread_create(:"watch_#{resource_name}") do
@watch_stream = watcher
@watcher_id = Thread.current.object_id
Expand Down Expand Up @@ -123,14 +123,14 @@ def start_watcher_thread

def create_config_map
@configmap.data = { "resource-version-#{resource_name}": "#{@resource_version}" }
@client.public_send("create_config_map", @configmap).tap do |map|
@clients['v1'].public_send("create_config_map", @configmap).tap do |map|
log.debug "Created config map: #{map}"
end
end

def patch_config_map
pull_resource_version
@client.public_send("patch_config_map", "fluentd-config-resource-version", {data: { "resource-version-#{resource_name}": "#{@resource_version}"}}, @deploy_namespace).tap do |map|
@clients['v1'].public_send("patch_config_map", "fluentd-config-resource-version", {data: { "resource-version-#{resource_name}": "#{@resource_version}"}}, @deploy_namespace).tap do |map|
log.debug "Patched config map for #{@resource_name}: #{map}"
end
end
Expand All @@ -145,7 +145,7 @@ def initialize_resource_version

# get or create the config map
begin
@client.public_send("get_config_map", "fluentd-config-resource-version", @deploy_namespace).tap do |resource|
@clients['v1'].public_send("get_config_map", "fluentd-config-resource-version", @deploy_namespace).tap do |resource|
log.debug "Got config map: #{resource}"
version = resource.data["resource-version-#{resource_name}"]
@resource_version = version.to_i if version
Expand All @@ -159,7 +159,7 @@ def pull_resource_version
params = Hash.new
params[:as] = :raw

response = @client.public_send("get_#{resource_name}", params)
response = @clients[@api_version].public_send("get_#{resource_name}", params)
result = JSON.parse(response)

resource_version = result.fetch('resourceVersion') do
Expand All @@ -175,7 +175,7 @@ def normalize_param
log.debug 'Kubernetes URL is not set - inspecting environment'
env_host = ENV['KUBERNETES_SERVICE_HOST']
env_port = ENV['KUBERNETES_SERVICE_PORT']
@kubernetes_url = "https://#{env_host}:#{env_port}/api" unless env_host.nil? || env_port.nil?
@kubernetes_url = "https://#{env_host}:#{env_port}" unless env_host.nil? || env_port.nil?
end
log.debug "Kubernetes URL: '#{@kubernetes_url}'"

Expand Down
31 changes: 27 additions & 4 deletions fluent-plugin-events/lib/sumologic/kubernetes/connector.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,40 @@ module Connector
K8_POD_CA_CERT = 'ca.crt'.freeze
K8_POD_TOKEN = 'token'.freeze

def connect_kubernetes
@client = Kubeclient::Client.new(
@kubernetes_url, @api_version,
# Need different clients to access different API groups/versions
CORE_API_VERSIONS = ['v1'].freeze

def core_clients
CORE_API_VERSIONS.map do |ver|
[ver, create_client('api', ver)]
end.to_h
end

# If @api_version is not v1, we will create client for other API groups/versions
def group_clients
[@api_version].map do |ver|
maimaisie marked this conversation as resolved.
Show resolved Hide resolved
[ver, create_client('apis', ver)]
end.to_h
end

def create_client(base, ver)
url = "#{@kubernetes_url}/#{base}"
log.info "create client with URL: #{url} and apiVersion: #{ver}"
client = Kubeclient::Client.new(
url, ver,
ssl_options: ssl_options,
auth_options: auth_options
)
@client.api_valid?
client.api_valid?
client
rescue Exception => e
log.error e
end

def connect_kubernetes
@clients = @api_version == 'v1' ? core_clients : core_clients.merge(group_clients)
end

def ssl_store
require 'openssl'
ssl_store = OpenSSL::X509::Store.new
Expand Down
21 changes: 18 additions & 3 deletions fluent-plugin-events/test/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require "fluent/test/driver/input"
require "fluent/test/helpers"
require 'mocha/test_unit'
require 'webmock/test_unit'

Test::Unit::TestCase.include(Fluent::Test::Helpers)
Test::Unit::TestCase.extend(Fluent::Test::Helpers)
Expand All @@ -12,9 +13,24 @@ def test_resource(name)
File.new("test/resources/#{name}")
end

def mock_get_events
def stub_apis
maimaisie marked this conversation as resolved.
Show resolved Hide resolved
stub_request(:any, %r{/api$})
.to_return(
'body' => {
'versions' => ['v1']
}.to_json
)
stub_request(:any, %r{/apis$})
.to_return(
'body' => {
'versions' => ['events.k8s.io/v1beta1']
}.to_json
)
end

def mock_get_events(file_name)
Kubeclient::Client.any_instance.stubs(:public_send).with("get_events", {:as=>:raw})
.returns(File.read(test_resource('api_list_events_v1.json')))
.returns(File.read(test_resource(file_name)))
end

def mock_get_config_map
Expand Down Expand Up @@ -55,7 +71,6 @@ def get_watch_resources_count_by_type_selector(type_selector, file_name)

def init_globals
@kubernetes_url = 'http://localhost:8080'
@api_version = 'v1'
@verify_ssl = false
@ca_file = nil
@client_cert = nil
Expand Down
75 changes: 66 additions & 9 deletions fluent-plugin-events/test/plugin/test_in_events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class EventsInputTest < Test::Unit::TestCase
def setup
# runs before each test
init_globals
connect_kubernetes
stub_apis
end

def teardown
Expand All @@ -24,20 +24,55 @@ def create_driver(conf)
driver = Fluent::Test::Driver::Input.new(Fluent::Plugin::EventsInput).configure(conf)
end

def connect_kubernetes_with_api_version(driver)
@api_version = driver.instance_variable_get(:@api_version)
connect_kubernetes
end

test 'pull_resource_version correctly from eventlist' do
config = %([])
driver = create_driver(config).instance
driver.instance_variable_set(:@client, @client)
connect_kubernetes_with_api_version(driver)
driver.instance_variable_set(:@clients, @clients)

mock_get_events
mock_get_events('api_list_events_v1.json')
resource_version = driver.pull_resource_version
assert_equal '2346293', resource_version
end

test 'pull_resource_version correctly from eventlist with v1beta1 api version' do
config = %([
api_version "events.k8s.io/v1beta1"
])
driver = create_driver(config).instance
connect_kubernetes_with_api_version(driver)
driver.instance_variable_set(:@clients, @clients)

mock_get_events('api_list_events_v1beta1.json')
resource_version = driver.pull_resource_version
assert_equal '2721303', resource_version
end

test 'initialize_resource_version correctly for different resources' do
config = %([])
driver = create_driver(config).instance
driver.instance_variable_set(:@client, @client)
connect_kubernetes_with_api_version(driver)
driver.instance_variable_set(:@clients, @clients)

mock_get_config_map
resource = driver.initialize_resource_version
assert_equal 'dummy-events-rv', resource.data['resource-version-events']
assert_equal 'dummy-pods-rv', resource.data['resource-version-pods']
assert_equal 'dummy-services-rv', resource.data['resource-version-services']
end

test 'initialize_resource_verion correctly for different client' do
config = %([
api_version "events.k8s.io/v1beta1"
])
driver = create_driver(config).instance
connect_kubernetes_with_api_version(driver)
driver.instance_variable_set(:@clients, @clients)

mock_get_config_map
resource = driver.initialize_resource_version
Expand All @@ -49,7 +84,8 @@ def create_driver(conf)
test 'watch_events with default type_selector' do
config = %([])
driver = create_driver(config).instance
driver.instance_variable_set(:@client, @client)
connect_kubernetes_with_api_version(driver)
driver.instance_variable_set(:@clients, @clients)
mock_watch_events('api_watch_events_v1.txt')

selected_events_count = get_watch_resources_count_by_type_selector(["ADDED", "MODIFIED"],
Expand All @@ -67,7 +103,8 @@ def create_driver(conf)
type_selector ["ADDED", "MODIFIED", "DELETED"]
])
driver = create_driver(config).instance
driver.instance_variable_set(:@client, @client)
connect_kubernetes_with_api_version(driver)
driver.instance_variable_set(:@clients, @clients)
mock_watch_events('api_watch_events_v1.txt')

selected_events_count = get_watch_resources_count_by_type_selector(["ADDED", "MODIFIED", "DELETED"],
Expand Down Expand Up @@ -104,7 +141,8 @@ def create_driver(conf)
resource_name "services"
])
driver = create_driver(config).instance
driver.instance_variable_set(:@client, @client)
connect_kubernetes_with_api_version(driver)
driver.instance_variable_set(:@clients, @clients)
mock_watch_services

selected_services_count = get_watch_resources_count_by_type_selector(["ADDED", "MODIFIED"],
Expand All @@ -116,13 +154,32 @@ def create_driver(conf)
assert_equal 4, selected_services_count
end

test 'watch events correctly with v1beta1 api version' do
config = %([
api_version "events.k8s.io/v1beta1"
])
driver = create_driver(config).instance
connect_kubernetes_with_api_version(driver)
driver.instance_variable_set(:@clients, @clients)
mock_watch_events('api_watch_events_v1beta1.txt')

selected_events_count = get_watch_resources_count_by_type_selector(["ADDED", "MODIFIED"],
'api_watch_events_v1beta1.txt')
driver.router.expects(:emit).times(selected_events_count).with(anything, anything, anything)
events = driver.start_watcher_thread
sleep 2
assert_equal 8, events.length
assert_equal 7, selected_events_count
end

test 'no events are ingested with too old resource version error' do
config = %([])
driver = create_driver(config).instance
driver.instance_variable_set(:@client, @client)
connect_kubernetes_with_api_version(driver)
driver.instance_variable_set(:@clients, @clients)
driver.instance_variable_set(:@last_recreated, 0)

mock_get_events
mock_get_events('api_list_events_v1.json')
mock_patch_config_map(2346293)
mock_watch_events('api_watch_events_error_v1.txt')
driver.router.expects(:emit).never.with(anything, anything, anything)
Expand Down
Loading