diff --git a/src/kubernetes.cc b/src/kubernetes.cc index ad1767f2..056d75e0 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -950,6 +950,28 @@ KubernetesUpdater::KubernetesUpdater(const Configuration& config, config.KubernetesUpdaterIntervalSeconds(), [=]() { return reader_.MetadataQuery(); }) { } +const std::vector& +KubernetesUpdater::ClusterLevelObjectTypes() { + static const std::vector* cluster_level_object_types = + new std::vector{ + {"cronjobs", "batch/v1beta1"}, + {"daemonsets", "apps/v1"}, + {"daemonsets", "extensions/v1beta1"}, + {"deployments", "apps/v1"}, + {"deployments", "extensions/v1beta1"}, + {"endpoints", "v1"}, + {"ingresses", "extensions/v1beta1"}, + {"jobs", "batch/v1"}, + {"namespaces", "v1"}, + {"replicasets", "apps/v1"}, + {"replicasets", "extensions/v1beta1"}, + {"replicationcontrollers", "v1"}, + {"services", "v1"}, + {"statefulsets", "apps/v1"}, + }; + return *cluster_level_object_types; +} + void KubernetesUpdater::ValidateDynamicConfiguration() const throw(ConfigurationValidationError) { PollingMetadataUpdater::ValidateDynamicConfiguration(); @@ -982,20 +1004,20 @@ void KubernetesUpdater::StartUpdater() { auto cb = [=](std::vector&& results) { MetadataCallback(std::move(results)); }; - node_watch_thread_ = std::thread([=]() { + object_watch_threads_.emplace(WatchId("nodes", "v1"), std::thread([=]() { reader_.WatchNodes(watched_node, cb); - }); - pod_watch_thread_ = std::thread([=]() { + })); + object_watch_threads_.emplace(WatchId("pods", "v1"), std::thread([=]() { reader_.WatchPods(watched_node, cb); - }); - if (config().KubernetesClusterLevelMetadata() && - config().KubernetesServiceMetadata()) { - service_watch_thread_ = std::thread([=]() { - reader_.WatchObjects("services", "v1", cb); - }); - endpoints_watch_thread_ = std::thread([=]() { - reader_.WatchObjects("endpoints", "v1", cb); - }); + })); + if (config().KubernetesClusterLevelMetadata()) { + for (const auto& watch_id: ClusterLevelObjectTypes()) { + const std::string& plural_kind = watch_id.first; + const std::string& api_version = watch_id.second; + object_watch_threads_.emplace(watch_id, std::thread([=]() { + reader_.WatchObjects(plural_kind, api_version, cb); + })); + } } } else { // Only try to poll if watch is disabled. diff --git a/src/kubernetes.h b/src/kubernetes.h index 88ce9c62..4b0ca19e 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -174,9 +174,6 @@ class KubernetesReader { mutable std::string current_node_; mutable std::string kubernetes_api_token_; mutable std::string kubernetes_namespace_; - // A memoized map from version to a map from kind to name. - mutable std::map> - version_to_kind_to_name_; const Configuration& config_; HealthChecker* health_checker_; @@ -189,17 +186,11 @@ class KubernetesUpdater : public PollingMetadataUpdater { KubernetesUpdater(const Configuration& config, HealthChecker* health_checker, MetadataStore* store); ~KubernetesUpdater() { - if (node_watch_thread_.joinable()) { - node_watch_thread_.join(); - } - if (pod_watch_thread_.joinable()) { - pod_watch_thread_.join(); - } - if (service_watch_thread_.joinable()) { - service_watch_thread_.join(); - } - if (endpoints_watch_thread_.joinable()) { - endpoints_watch_thread_.join(); + for (auto& thread_it: object_watch_threads_) { + std::thread& watch_thread = thread_it.second; + if (watch_thread.joinable()) { + watch_thread.join(); + } } } @@ -211,15 +202,18 @@ class KubernetesUpdater : public PollingMetadataUpdater { void NotifyStopUpdater(); private: + // WatchId combines the plural Kubernetes kind and API version. + using WatchId = std::pair; + // List of cluster level objects to watch. + static const std::vector& ClusterLevelObjectTypes(); + // Metadata watcher callback. void MetadataCallback(std::vector&& result_vector); KubernetesReader reader_; HealthChecker* health_checker_; - std::thread node_watch_thread_; - std::thread pod_watch_thread_; - std::thread service_watch_thread_; - std::thread endpoints_watch_thread_; + // Map from the watch IDs to the respective threads. + std::map object_watch_threads_; }; }