From 79413b860be09c1f0db0d121a706cab785da80ad Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Mon, 17 Sep 2018 10:56:21 -0400 Subject: [PATCH 1/5] Add support for watching and ingesting more Kubernetes resources. The threads to watch resources other than Node and Pod are started based on the Kubernetes resource kind and API version provided in a static vector. This works for all resource types that use the generic callback. --- src/kubernetes.cc | 34 ++++++++++++++++++++++++++++------ src/kubernetes.h | 18 ++++++++---------- 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index ad1767f2..7d5763d2 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -82,6 +82,25 @@ constexpr const char kK8sNamedGroupsResourceTypeFormat[] = constexpr const char kK8sWatchPathFormat[] = "/{{api_prefix}}/{{api_version}}/watch/{{plural_kind}}{{selector}}"; +const std::vector> + kPluralKindAndApiVersions = { + {"cronjobs", "batch/v1beta1"}, + {"daemonsets", "apps/v1"}, + {"daemonsets", "extensions/v1beta1"}, + {"deployments", "apps/v1"}, + {"deployments", "extensions/v1beta1"}, + {"endpoints", "v1"}, + {"ingresses", "extensions/v1beta1"}, + {"jobs", "batch/v1"}, + {"namespaces", "v1"}, + {"replicasets", "apps/v1"}, + {"replicasets", "extensions/v1beta1"}, + {"replicationcontrollers", "v1"}, + {"services", "v1"}, + {"statefulsets", "apps/v1"}, + }; + + // Returns the full path to the secret filename. std::string SecretPath(const std::string& secret) { return std::string(kServiceAccountDirectory) + "/" + secret; @@ -990,12 +1009,15 @@ void KubernetesUpdater::StartUpdater() { }); if (config().KubernetesClusterLevelMetadata() && config().KubernetesServiceMetadata()) { - service_watch_thread_ = std::thread([=]() { - reader_.WatchObjects("services", "v1", cb); - }); - endpoints_watch_thread_ = std::thread([=]() { - reader_.WatchObjects("endpoints", "v1", cb); - }); + for (const auto& plural_kind_and_api_version: kPluralKindAndApiVersions) { + const std::string& plural_kind = plural_kind_and_api_version.first; + const std::string& api_version = plural_kind_and_api_version.second; + std::thread watch_thread([=]() { + reader_.WatchObjects(plural_kind, api_version, cb); + }); + object_watch_threads_.emplace( + plural_kind_and_api_version, std::move(watch_thread)); + } } } else { // Only try to poll if watch is disabled. diff --git a/src/kubernetes.h b/src/kubernetes.h index 88ce9c62..24e13a1c 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -174,9 +174,6 @@ class KubernetesReader { mutable std::string current_node_; mutable std::string kubernetes_api_token_; mutable std::string kubernetes_namespace_; - // A memoized map from version to a map from kind to name. - mutable std::map> - version_to_kind_to_name_; const Configuration& config_; HealthChecker* health_checker_; @@ -195,11 +192,11 @@ class KubernetesUpdater : public PollingMetadataUpdater { if (pod_watch_thread_.joinable()) { pod_watch_thread_.join(); } - if (service_watch_thread_.joinable()) { - service_watch_thread_.join(); - } - if (endpoints_watch_thread_.joinable()) { - endpoints_watch_thread_.join(); + for (auto& thread_it: object_watch_threads_) { + std::thread& watch_thread = thread_it.second; + if (watch_thread.joinable()) { + watch_thread.join(); + } } } @@ -218,8 +215,9 @@ class KubernetesUpdater : public PollingMetadataUpdater { HealthChecker* health_checker_; std::thread node_watch_thread_; std::thread pod_watch_thread_; - std::thread service_watch_thread_; - std::thread endpoints_watch_thread_; + // Map from plural kind and API version to the thread. + std::map, std::thread> + object_watch_threads_; }; } From 8a2736251a053ec0edb83f8e26affc62a1e4d225 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Wed, 19 Sep 2018 17:55:26 -0400 Subject: [PATCH 2/5] Address comments --- src/kubernetes.cc | 53 ++++++++++++++++++++++++----------------------- src/kubernetes.h | 17 ++++++--------- 2 files changed, 33 insertions(+), 37 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 7d5763d2..4d1c3dc7 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -82,25 +82,6 @@ constexpr const char kK8sNamedGroupsResourceTypeFormat[] = constexpr const char kK8sWatchPathFormat[] = "/{{api_prefix}}/{{api_version}}/watch/{{plural_kind}}{{selector}}"; -const std::vector> - kPluralKindAndApiVersions = { - {"cronjobs", "batch/v1beta1"}, - {"daemonsets", "apps/v1"}, - {"daemonsets", "extensions/v1beta1"}, - {"deployments", "apps/v1"}, - {"deployments", "extensions/v1beta1"}, - {"endpoints", "v1"}, - {"ingresses", "extensions/v1beta1"}, - {"jobs", "batch/v1"}, - {"namespaces", "v1"}, - {"replicasets", "apps/v1"}, - {"replicasets", "extensions/v1beta1"}, - {"replicationcontrollers", "v1"}, - {"services", "v1"}, - {"statefulsets", "apps/v1"}, - }; - - // Returns the full path to the secret filename. std::string SecretPath(const std::string& secret) { return std::string(kServiceAccountDirectory) + "/" + secret; @@ -969,6 +950,23 @@ KubernetesUpdater::KubernetesUpdater(const Configuration& config, config.KubernetesUpdaterIntervalSeconds(), [=]() { return reader_.MetadataQuery(); }) { } +const KubernetesUpdater::WatchId KubernetesUpdater::watch_ids_[] = { + {"cronjobs", "batch/v1beta1"}, + {"daemonsets", "apps/v1"}, + {"daemonsets", "extensions/v1beta1"}, + {"deployments", "apps/v1"}, + {"deployments", "extensions/v1beta1"}, + {"endpoints", "v1"}, + {"ingresses", "extensions/v1beta1"}, + {"jobs", "batch/v1"}, + {"namespaces", "v1"}, + {"replicasets", "apps/v1"}, + {"replicasets", "extensions/v1beta1"}, + {"replicationcontrollers", "v1"}, + {"services", "v1"}, + {"statefulsets", "apps/v1"}, +}; + void KubernetesUpdater::ValidateDynamicConfiguration() const throw(ConfigurationValidationError) { PollingMetadataUpdater::ValidateDynamicConfiguration(); @@ -1001,22 +999,25 @@ void KubernetesUpdater::StartUpdater() { auto cb = [=](std::vector&& results) { MetadataCallback(std::move(results)); }; - node_watch_thread_ = std::thread([=]() { + std::thread node_watch_thread = std::thread([=]() { reader_.WatchNodes(watched_node, cb); }); - pod_watch_thread_ = std::thread([=]() { + object_watch_threads_.emplace( + WatchId("nodes", "v1"), std::move(node_watch_thread)); + std::thread pod_watch_thread = std::thread([=]() { reader_.WatchPods(watched_node, cb); }); + object_watch_threads_.emplace( + WatchId("pods", "v1"), std::move(pod_watch_thread)); if (config().KubernetesClusterLevelMetadata() && config().KubernetesServiceMetadata()) { - for (const auto& plural_kind_and_api_version: kPluralKindAndApiVersions) { - const std::string& plural_kind = plural_kind_and_api_version.first; - const std::string& api_version = plural_kind_and_api_version.second; + for (const auto& watch_id: watch_ids_) { + const std::string& plural_kind = watch_id.first; + const std::string& api_version = watch_id.second; std::thread watch_thread([=]() { reader_.WatchObjects(plural_kind, api_version, cb); }); - object_watch_threads_.emplace( - plural_kind_and_api_version, std::move(watch_thread)); + object_watch_threads_.emplace(watch_id, std::move(watch_thread)); } } } else { diff --git a/src/kubernetes.h b/src/kubernetes.h index 24e13a1c..087c875f 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -186,12 +186,6 @@ class KubernetesUpdater : public PollingMetadataUpdater { KubernetesUpdater(const Configuration& config, HealthChecker* health_checker, MetadataStore* store); ~KubernetesUpdater() { - if (node_watch_thread_.joinable()) { - node_watch_thread_.join(); - } - if (pod_watch_thread_.joinable()) { - pod_watch_thread_.join(); - } for (auto& thread_it: object_watch_threads_) { std::thread& watch_thread = thread_it.second; if (watch_thread.joinable()) { @@ -213,11 +207,12 @@ class KubernetesUpdater : public PollingMetadataUpdater { KubernetesReader reader_; HealthChecker* health_checker_; - std::thread node_watch_thread_; - std::thread pod_watch_thread_; - // Map from plural kind and API version to the thread. - std::map, std::thread> - object_watch_threads_; + // WatchId combines the plural kubernetes kind, and API version. + using WatchId = std::pair; + // Map from the watch IDs to the respective threads. + std::map object_watch_threads_; + // List of watch IDs. + static const WatchId watch_ids_[]; }; } From 948b44fe61b6cc57a7e234ef4454fdfca91d5c77 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Thu, 20 Sep 2018 12:47:41 -0400 Subject: [PATCH 3/5] Address coments 2 --- src/kubernetes.cc | 37 +++++++++++++++++++------------------ src/kubernetes.h | 9 +++++---- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 4d1c3dc7..a9d612bc 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -950,7 +950,8 @@ KubernetesUpdater::KubernetesUpdater(const Configuration& config, config.KubernetesUpdaterIntervalSeconds(), [=]() { return reader_.MetadataQuery(); }) { } -const KubernetesUpdater::WatchId KubernetesUpdater::watch_ids_[] = { +const KubernetesUpdater::WatchId KubernetesUpdater::kClusterLevelObjectTypes[] = +{ {"cronjobs", "batch/v1beta1"}, {"daemonsets", "apps/v1"}, {"daemonsets", "extensions/v1beta1"}, @@ -999,25 +1000,25 @@ void KubernetesUpdater::StartUpdater() { auto cb = [=](std::vector&& results) { MetadataCallback(std::move(results)); }; - std::thread node_watch_thread = std::thread([=]() { - reader_.WatchNodes(watched_node, cb); - }); object_watch_threads_.emplace( - WatchId("nodes", "v1"), std::move(node_watch_thread)); - std::thread pod_watch_thread = std::thread([=]() { - reader_.WatchPods(watched_node, cb); - }); + WatchId("nodes", "v1"), + std::thread([=]() { + reader_.WatchNodes(watched_node, cb); + })); object_watch_threads_.emplace( - WatchId("pods", "v1"), std::move(pod_watch_thread)); - if (config().KubernetesClusterLevelMetadata() && - config().KubernetesServiceMetadata()) { - for (const auto& watch_id: watch_ids_) { - const std::string& plural_kind = watch_id.first; - const std::string& api_version = watch_id.second; - std::thread watch_thread([=]() { - reader_.WatchObjects(plural_kind, api_version, cb); - }); - object_watch_threads_.emplace(watch_id, std::move(watch_thread)); + WatchId("pods", "v1"), + std::thread([=]() { + reader_.WatchPods(watched_node, cb); + })); + if (config().KubernetesClusterLevelMetadata()) { + for (const auto& watch_id: kClusterLevelObjectTypes) { + const std::string plural_kind = watch_id.first; + const std::string api_version = watch_id.second; + object_watch_threads_.emplace( + watch_id, + std::thread([=]() { + reader_.WatchObjects(plural_kind, api_version, cb); + })); } } } else { diff --git a/src/kubernetes.h b/src/kubernetes.h index 087c875f..2c8f7c83 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -202,17 +202,18 @@ class KubernetesUpdater : public PollingMetadataUpdater { void NotifyStopUpdater(); private: + // WatchId combines the plural Kubernetes kind and API version. + using WatchId = std::pair; + // List of cluster level objects to watch. + static const WatchId kClusterLevelObjectTypes[]; + // Metadata watcher callback. void MetadataCallback(std::vector&& result_vector); KubernetesReader reader_; HealthChecker* health_checker_; - // WatchId combines the plural kubernetes kind, and API version. - using WatchId = std::pair; // Map from the watch IDs to the respective threads. std::map object_watch_threads_; - // List of watch IDs. - static const WatchId watch_ids_[]; }; } From 1c1d9b8962a47b0d7d7f76ced71e33881a1d521d Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Wed, 26 Sep 2018 19:03:36 -0400 Subject: [PATCH 4/5] Fix the initialization of the list of cluster level object types. --- src/kubernetes.cc | 60 +++++++++++++++++++++++------------------------ src/kubernetes.h | 4 ++-- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index a9d612bc..a8f310c4 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -950,23 +950,27 @@ KubernetesUpdater::KubernetesUpdater(const Configuration& config, config.KubernetesUpdaterIntervalSeconds(), [=]() { return reader_.MetadataQuery(); }) { } -const KubernetesUpdater::WatchId KubernetesUpdater::kClusterLevelObjectTypes[] = -{ - {"cronjobs", "batch/v1beta1"}, - {"daemonsets", "apps/v1"}, - {"daemonsets", "extensions/v1beta1"}, - {"deployments", "apps/v1"}, - {"deployments", "extensions/v1beta1"}, - {"endpoints", "v1"}, - {"ingresses", "extensions/v1beta1"}, - {"jobs", "batch/v1"}, - {"namespaces", "v1"}, - {"replicasets", "apps/v1"}, - {"replicasets", "extensions/v1beta1"}, - {"replicationcontrollers", "v1"}, - {"services", "v1"}, - {"statefulsets", "apps/v1"}, -}; +const std::vector& +KubernetesUpdater::ClusterLevelObjectTypes() { + static const std::vector* cluster_level_object_types = + new std::vector{ + {"cronjobs", "batch/v1beta1"}, + {"daemonsets", "apps/v1"}, + {"daemonsets", "extensions/v1beta1"}, + {"deployments", "apps/v1"}, + {"deployments", "extensions/v1beta1"}, + {"endpoints", "v1"}, + {"ingresses", "extensions/v1beta1"}, + {"jobs", "batch/v1"}, + {"namespaces", "v1"}, + {"replicasets", "apps/v1"}, + {"replicasets", "extensions/v1beta1"}, + {"replicationcontrollers", "v1"}, + {"services", "v1"}, + {"statefulsets", "apps/v1"}, + }; + return *cluster_level_object_types; +} void KubernetesUpdater::ValidateDynamicConfiguration() const throw(ConfigurationValidationError) { @@ -1000,20 +1004,16 @@ void KubernetesUpdater::StartUpdater() { auto cb = [=](std::vector&& results) { MetadataCallback(std::move(results)); }; - object_watch_threads_.emplace( - WatchId("nodes", "v1"), - std::thread([=]() { - reader_.WatchNodes(watched_node, cb); - })); - object_watch_threads_.emplace( - WatchId("pods", "v1"), - std::thread([=]() { - reader_.WatchPods(watched_node, cb); - })); + object_watch_threads_.emplace(WatchId("nodes", "v1"), std::thread([=]() { + reader_.WatchNodes(watched_node, cb); + })); + object_watch_threads_.emplace(WatchId("pods", "v1"), std::thread([=]() { + reader_.WatchPods(watched_node, cb); + })); if (config().KubernetesClusterLevelMetadata()) { - for (const auto& watch_id: kClusterLevelObjectTypes) { - const std::string plural_kind = watch_id.first; - const std::string api_version = watch_id.second; + for (const auto& watch_id: ClusterLevelObjectTypes()) { + const std::string& plural_kind = watch_id.first; + const std::string& api_version = watch_id.second; object_watch_threads_.emplace( watch_id, std::thread([=]() { diff --git a/src/kubernetes.h b/src/kubernetes.h index 2c8f7c83..4b0ca19e 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -203,9 +203,9 @@ class KubernetesUpdater : public PollingMetadataUpdater { private: // WatchId combines the plural Kubernetes kind and API version. - using WatchId = std::pair; + using WatchId = std::pair; // List of cluster level objects to watch. - static const WatchId kClusterLevelObjectTypes[]; + static const std::vector& ClusterLevelObjectTypes(); // Metadata watcher callback. void MetadataCallback(std::vector&& result_vector); From 7cf58a788acb22e3d11a3cc140051d1cd1410cfe Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Fri, 28 Sep 2018 14:46:08 -0400 Subject: [PATCH 5/5] Address comments 3 --- src/kubernetes.cc | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index a8f310c4..056d75e0 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -1014,11 +1014,9 @@ void KubernetesUpdater::StartUpdater() { for (const auto& watch_id: ClusterLevelObjectTypes()) { const std::string& plural_kind = watch_id.first; const std::string& api_version = watch_id.second; - object_watch_threads_.emplace( - watch_id, - std::thread([=]() { - reader_.WatchObjects(plural_kind, api_version, cb); - })); + object_watch_threads_.emplace(watch_id, std::thread([=]() { + reader_.WatchObjects(plural_kind, api_version, cb); + })); } } } else {