Skip to content

Commit

Permalink
Add MetricDefs, static definitions of metric metadata generated from …
Browse files Browse the repository at this point in the history
…json

Adds a static definition of the metric metadata used by Impala. The
metric names, descriptions, and other properties are defined in
common/thrift/metrics.json file, and the generate_metrics.py script
creates a thrift representation. The metric definitions are then
available in a constant map which is used at runtime to instantiate
metrics, looking them up in the map by the metric key.

New metrics should be defined by adding an entry to the list of metrics
in metrics.json with the following properties:

key:         The unique string identifying the metric. If the metric can
             be templated, e.g. rpc call duration, it may be a format
             string (in the format used by strings::Substitute()).
description: A text description of the metric. May also be a format
             string.
label:       A brief title for the metric, not currently used by
             Impala but provided for external tools.
units:       The unit of the metric. Must be a valid value of TUnit.
kind:        The kind of metric, e.g. GAUGE or COUNTER. Must be a valid
             value of TMetricKind.
contexts:    The context in which this metric may be instantiated.
             Usually "IMPALAD", "STATESTORED", "CATALOGD", but may be
             a different kind of 'entity'. Not currently used by
             Impala but provided for modeling purposes for external
             tools.

For example, adding the counter for the total number of queries run over
the lifetime of the impalad process might look like:

  {
    "key": "impala-server.num-queries",
    "description": "The total number of queries processed.",
    "label": "Queries",
    "units": "UNIT",
    "kind": "COUNTER",
    "contexts": [
      "IMPALAD"
    ]
  }

TODO: Incorporate 'label' into the metrics debug page.
TODO: Verify the context at runtime, e.g. verify 'contexts' contains,
      e.g. a DCHECK.

After the metric definition is added, the generate_metrics.py script
will generate the TMetricDefs.thrift that contains a TMetricDef for
the metric definition. At runtime, the metric can be instantiated
using the key defined in metrics.json. Gauges, Counters, and
Properties are instantiated using static methods on MetricGroup. Other
metric types are instantiated using static CreateAndRegister methods
on their associated classes.

TODO: Generate a thrift enum used to lookup metric defs.
TODO: Consolidate the instantiation of metrics that are created
      outside of metrics.h (i.e. collection metrics, memory metrics).
TODO: Need a better way to verify if metric definitions are missing.

Change-Id: Iba7f94144d0c34f273c502ce6b9a2130ea8fedaa
Reviewed-on: http://gerrit.cloudera.org:8080/330
Reviewed-by: Matthew Jacobs <mj@cloudera.com>
Tested-by: Internal Jenkins
  • Loading branch information
Matthew Jacobs authored and Internal Jenkins committed May 14, 2015
1 parent 46c6e8d commit 8ac755b
Show file tree
Hide file tree
Showing 28 changed files with 1,774 additions and 267 deletions.
3 changes: 0 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,3 @@ tests/test-hive-udfs/target/
cdh-*-hdfs-data/
avro_schemas/
cluster_logs/

# This file is auto-generated in the build process
common/thrift/ErrorCodes.thrift
4 changes: 4 additions & 0 deletions be/generated-sources/gen-cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ set(SRC_FILES
JniCatalog_constants.cpp
JniCatalog_types.cpp
Logging_types.cpp
Metrics_constants.cpp
Metrics_types.cpp
MetricDefs_constants.cpp
MetricDefs_types.cpp
NetworkTest_constants.cpp
NetworkTest_types.cpp
NetworkTestService.cpp
Expand Down
6 changes: 2 additions & 4 deletions be/src/catalog/catalog-server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,12 +150,10 @@ CatalogServer::CatalogServer(MetricGroup* metrics)
thrift_serializer_(FLAGS_compact_catalog_topic), metrics_(metrics),
topic_updates_ready_(false), last_sent_catalog_version_(0L),
catalog_objects_min_version_(0L), catalog_objects_max_version_(0L) {
topic_processing_time_metric_ = metrics_->RegisterMetric(
new StatsMetric<double>(CATALOG_SERVER_TOPIC_PROCESSING_TIMES,
TUnit::TIME_S));
topic_processing_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
CATALOG_SERVER_TOPIC_PROCESSING_TIMES);
}


Status CatalogServer::Start() {
TNetworkAddress subscriber_address =
MakeNetworkAddress(FLAGS_hostname, FLAGS_state_store_subscriber_port);
Expand Down
3 changes: 1 addition & 2 deletions be/src/catalog/catalogd-main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,7 @@ int main(int argc, char** argv) {
StartThreadInstrumentation(metrics.get(), webserver.get());

InitRpcEventTracing(webserver.get());
metrics->AddProperty<string>("catalog.version", GetVersionString(true),
"catalogd build version");
metrics->AddProperty<string>("catalog.version", GetVersionString(true));

CatalogServer catalog_server(metrics.get());
EXIT_IF_ERROR(catalog_server.Start());
Expand Down
79 changes: 22 additions & 57 deletions be/src/resourcebroker/resource-broker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,77 +106,42 @@ ResourceBroker::ResourceBroker(const vector<TNetworkAddress>& llama_addresses,
active_llama_handle_metric_ = metrics->AddProperty<string>(
"resource-broker.active-llama-handle", "none");

reservation_rpc_time_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("resource-broker.reservation-request-rpc-time",
TUnit::TIME_S, "The time, in seconds, that a Reserve() RPC takes to "
"Llama"));
reservation_response_time_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("resource-broker.reservation-request-response-time",
TUnit::TIME_S, "The time, in seconds, that a reservation request takes "
"to be fulfilled by Llama"));
reservation_rpc_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"resource-broker.reservation-request-rpc-time");
reservation_response_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"resource-broker.reservation-request-response-time");
reservation_requests_total_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.reservation-requests-total", 0, TUnit::UNIT,
"The total number of reservation requests made by this Impala daemon to Llama");
"resource-broker.reservation-requests-total", 0);
reservation_requests_fulfilled_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.reservation-requests-fulfilled", 0, TUnit::UNIT,
"The number of reservation requests made by this Impala daemon to Llama "
"which succeeded");
"resource-broker.reservation-requests-fulfilled", 0);
reservation_requests_failed_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.reservation-requests-failed", 0, TUnit::UNIT,
"The number of reservation requests made by this Impala daemon to Llama which "
"failed");
"resource-broker.reservation-requests-failed", 0);
reservation_requests_rejected_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.reservation-requests-rejected", 0, TUnit::UNIT,
"The number of reservation requests made by this Impala daemon to Llama "
"which were rejected");
"resource-broker.reservation-requests-rejected", 0);
reservation_requests_timedout_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.reservation-requests-timedout", 0, TUnit::UNIT,
"The number of reservation requests made by this Impala daemon to Llama "
"which timed out");

expansion_rpc_time_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("resource-broker.expansion-request-rpc-time",
TUnit::TIME_S,
"The time, in seconds, that a Reserve() RPC takes to Llama"));
expansion_response_time_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("resource-broker.expansion-request-response-time",
TUnit::TIME_S, "The time, in seconds, that a expansion request takes "
"to be fulfilled by Llama"));
"resource-broker.reservation-requests-timedout", 0);

expansion_rpc_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"resource-broker.expansion-request-rpc-time");
expansion_response_time_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"resource-broker.expansion-request-response-time");
expansion_requests_total_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.expansion-requests-total", 0, TUnit::UNIT,
"The total number of expansion requests made by this Impala daemon to Llama");
"resource-broker.expansion-requests-total", 0);
expansion_requests_fulfilled_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.expansion-requests-fulfilled", 0, TUnit::UNIT,
"The number of expansion requests made by this Impala daemon to Llama "
"which succeeded");
"resource-broker.expansion-requests-fulfilled", 0);
expansion_requests_failed_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.expansion-requests-failed", 0, TUnit::UNIT,
"The number of expansion requests made by this Impala daemon to Llama which "
"failed");
"resource-broker.expansion-requests-failed", 0);
expansion_requests_rejected_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.expansion-requests-rejected", 0, TUnit::UNIT,
"The number of expansion requests made by this Impala daemon to Llama "
"which were rejected");
"resource-broker.expansion-requests-rejected", 0);
expansion_requests_timedout_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.expansion-requests-timedout", 0, TUnit::UNIT,
"The number of expansion requests made by this Impala daemon to Llama "
"which timed out");
"resource-broker.expansion-requests-timedout", 0);

requests_released_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.requests-released", 0, TUnit::UNIT,
"The number of resource-release requests received from Llama");

"resource-broker.requests-released", 0);
allocated_memory_metric_ = metrics->AddGauge<uint64_t>(
"resource-broker.memory-resources-in-use", 0L, TUnit::BYTES, "The total"
" number of bytes currently allocated to this Impala daemon by Llama");

"resource-broker.memory-resources-in-use", 0L);
allocated_vcpus_metric_ = metrics->AddGauge<uint64_t>(
"resource-broker.vcpu-resources-in-use", 0, TUnit::UNIT, "The total number "
"of vcpus currently allocated to this Impala daemon by Llama");

requests_released_metric_ = metrics->AddCounter<int64_t>(
"resource-broker.requests-released", 0, TUnit::UNIT, "The total number of "
"resource allocations released by this Impala daemon");
"resource-broker.vcpu-resources-in-use", 0);
}

Status ResourceBroker::Init() {
Expand Down
10 changes: 6 additions & 4 deletions be/src/rpc/rpc-trace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ using namespace impala;
using namespace rapidjson;
using namespace strings;

// Metric key format for rpc call duration metrics.
const string RPC_TIME_STATS_METRIC_KEY = "rpc-method.$0.call_duration";

// Singleton class to keep track of all RpcEventHandlers, and to render them to a
// web-based summary page.
class RpcEventHandlerManager {
Expand Down Expand Up @@ -165,10 +168,9 @@ void* RpcEventHandler::getContext(const char* fn_name, void* server_context) {
if (it == method_map_.end()) {
MethodDescriptor* descriptor = new MethodDescriptor();
descriptor->name = fn_name;
const string& time_metric_name =
Substitute("rpc-method.$0.$1.call_duration", server_name_, descriptor->name);
descriptor->time_stats = metrics_->RegisterMetric(
new StatsMetric<double>(time_metric_name, TUnit::TIME_MS));
const string& rpc_name = Substitute("$0.$1", server_name_, descriptor->name);
descriptor->time_stats = StatsMetric<double>::CreateAndRegister(metrics_,
RPC_TIME_STATS_METRIC_KEY, rpc_name);
it = method_map_.insert(make_pair(descriptor->name, descriptor)).first;
}
}
Expand Down
9 changes: 8 additions & 1 deletion be/src/runtime/coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,14 @@ Status Coordinator::Exec(QuerySchedule& schedule,

query_events_->MarkEvent("Ready to start remote fragments");
int backend_num = 0;
StatsMetric<double> latencies("fragment-latencies", TUnit::TIME_NS);

// TODO: Add a runtime-profile stats mechanism so this doesn't need to create a
// non-registered TMetricDef.
TMetricDef md;
md.__set_key("fragment-latencies");
md.__set_units(TUnit::TIME_NS);
md.__set_kind(TMetricKind::STATS);
StatsMetric<double> latencies(md);
for (int fragment_idx = (has_coordinator_fragment ? 1 : 0);
fragment_idx < request.fragments.size(); ++fragment_idx) {
const FragmentExecParams& params = (*fragment_exec_params)[fragment_idx];
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/mem-tracker-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ TEST(MemTestTest, SingleTrackerWithLimit) {
}

TEST(MemTestTest, ConsumptionMetric) {
UIntGauge metric("test", TUnit::BYTES, 0);
TMetricDef md;
md.__set_key("test");
md.__set_units(TUnit::BYTES);
md.__set_kind(TMetricKind::GAUGE);
UIntGauge metric(md, 0);
EXPECT_EQ(metric.value(), 0);

MemTracker t(&metric, 100, -1, "");
Expand Down
7 changes: 3 additions & 4 deletions be/src/runtime/mem-tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,14 @@ MemTracker::~MemTracker() {
}

void MemTracker::RegisterMetrics(MetricGroup* metrics, const string& prefix) {
num_gcs_metric_ = metrics->AddCounter(
Substitute("$0.num-gcs", prefix), 0L, TUnit::UNIT);
num_gcs_metric_ = metrics->AddCounter(Substitute("$0.num-gcs", prefix), 0L);

// TODO: Consider a total amount of bytes freed counter
bytes_freed_by_last_gc_metric_ = metrics->AddGauge<int64_t>(
Substitute("$0.bytes-freed-by-last-gc", prefix), -1, TUnit::BYTES);
Substitute("$0.bytes-freed-by-last-gc", prefix), -1);

bytes_over_limit_metric_ = metrics->AddGauge<int64_t>(
Substitute("$0.bytes-over-limit", prefix), -1, TUnit::BYTES);
Substitute("$0.bytes-over-limit", prefix), -1);
}

// Calling this on the query tracker results in output like:
Expand Down
30 changes: 15 additions & 15 deletions be/src/scheduling/admission-controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -690,40 +690,40 @@ AdmissionController::GetPoolMetrics(const string& pool_name) {

PoolMetrics* pool_metrics = &pool_metrics_map_[pool_name];
pool_metrics->local_admitted = metrics_->AddCounter(
Substitute(LOCAL_ADMITTED_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_ADMITTED_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_queued = metrics_->AddCounter(
Substitute(LOCAL_QUEUED_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_QUEUED_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_dequeued = metrics_->AddCounter(
Substitute(LOCAL_DEQUEUED_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_DEQUEUED_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_rejected = metrics_->AddCounter(
Substitute(LOCAL_REJECTED_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_REJECTED_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_timed_out = metrics_->AddCounter(
Substitute(LOCAL_TIMED_OUT_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_TIMED_OUT_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_completed = metrics_->AddCounter(
Substitute(LOCAL_COMPLETED_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_COMPLETED_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_time_in_queue_ms = metrics_->AddCounter(
Substitute(LOCAL_TIME_IN_QUEUE_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_TIME_IN_QUEUE_METRIC_KEY_FORMAT, 0L, pool_name);

pool_metrics->cluster_num_running = metrics_->AddGauge(
Substitute(CLUSTER_NUM_RUNNING_METRIC_KEY_FORMAT, pool_name), 0L);
CLUSTER_NUM_RUNNING_METRIC_KEY_FORMAT, 0L, pool_name);

pool_metrics->cluster_in_queue = metrics_->AddGauge(
Substitute(CLUSTER_IN_QUEUE_METRIC_KEY_FORMAT, pool_name), 0L);
CLUSTER_IN_QUEUE_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->cluster_mem_usage = metrics_->AddGauge(
Substitute(CLUSTER_MEM_USAGE_METRIC_KEY_FORMAT, pool_name), 0L);
CLUSTER_MEM_USAGE_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->cluster_mem_estimate = metrics_->AddGauge(
Substitute(CLUSTER_MEM_ESTIMATE_METRIC_KEY_FORMAT, pool_name), 0L);
CLUSTER_MEM_ESTIMATE_METRIC_KEY_FORMAT, 0L, pool_name);

pool_metrics->local_num_running = metrics_->AddGauge(
Substitute(LOCAL_NUM_RUNNING_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_NUM_RUNNING_METRIC_KEY_FORMAT, 0L, pool_name);

pool_metrics->local_in_queue = metrics_->AddGauge(
Substitute(LOCAL_IN_QUEUE_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_IN_QUEUE_METRIC_KEY_FORMAT, 0L, pool_name);

pool_metrics->local_mem_usage = metrics_->AddGauge(
Substitute(LOCAL_MEM_USAGE_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_MEM_USAGE_METRIC_KEY_FORMAT, 0L, pool_name);
pool_metrics->local_mem_estimate = metrics_->AddGauge(
Substitute(LOCAL_MEM_ESTIMATE_METRIC_KEY_FORMAT, pool_name), 0L);
LOCAL_MEM_ESTIMATE_METRIC_KEY_FORMAT, 0L, pool_name);
return pool_metrics;
}
}
4 changes: 2 additions & 2 deletions be/src/scheduling/request-pool-service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ const string RESOLVE_POOL_METRIC_NAME = "request-pool-service.resolve-pool-durat
RequestPoolService::RequestPoolService(MetricGroup* metrics) :
metrics_(metrics), resolve_pool_ms_metric_(NULL) {
DCHECK(metrics_ != NULL);
resolve_pool_ms_metric_ = metrics_->RegisterMetric(
new StatsMetric<double>(RESOLVE_POOL_METRIC_NAME, TUnit::TIME_MS));
resolve_pool_ms_metric_ =
StatsMetric<double>::CreateAndRegister(metrics_, RESOLVE_POOL_METRIC_NAME);

if (FLAGS_fair_scheduler_allocation_path.empty() &&
FLAGS_llama_site_path.empty()) {
Expand Down
24 changes: 9 additions & 15 deletions be/src/statestore/statestore-subscriber.cc
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,15 @@ StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
"statestore-subscriber.last-recovery-duration", 0.0);
last_recovery_time_metric_ = metrics_->AddProperty<string>(
"statestore-subscriber.last-recovery-time", "N/A");
topic_update_interval_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("statestore-subscriber.topic-update-interval-time",
TUnit::TIME_S));
topic_update_duration_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("statestore-subscriber.topic-update-duration",
TUnit::TIME_S));
heartbeat_interval_metric_ = metrics->RegisterMetric(
new StatsMetric<double>("statestore-subscriber.heartbeat-interval-time",
TUnit::TIME_S));
topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"statestore-subscriber.topic-update-interval-time");
topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"statestore-subscriber.topic-update-duration");
heartbeat_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"statestore-subscriber.heartbeat-interval-time");

registration_id_metric_ = metrics->AddProperty<string>(
"statestore-subscriber.registration-id", "N/A",
"The most recent registration ID for this subscriber with the statestore. Set to "
"'N/A' if no registration has been completed");
"statestore-subscriber.registration-id", "N/A");

client_cache_->InitMetrics(metrics, "statestore-subscriber.statestore");
}
Expand All @@ -130,9 +125,8 @@ Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
Callbacks* cb = &(update_callbacks_[topic_id]);
cb->callbacks.push_back(callback);
if (cb->processing_time_metric == NULL) {
const string& metric_name = Substitute(CALLBACK_METRIC_PATTERN, topic_id);
cb->processing_time_metric = metrics_->RegisterMetric(
new StatsMetric<double>(metric_name, TUnit::TIME_S));
cb->processing_time_metric = StatsMetric<double>::CreateAndRegister(metrics_,
CALLBACK_METRIC_PATTERN, topic_id);
}
topic_registrations_[topic_id] = is_transient;
return Status::OK;
Expand Down
13 changes: 6 additions & 7 deletions be/src/statestore/statestore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,17 +234,16 @@ Statestore::Statestore(MetricGroup* metrics)
DCHECK(metrics != NULL);
num_subscribers_metric_ =
metrics->AddGauge(STATESTORE_LIVE_SUBSCRIBERS, 0L);
subscriber_set_metric_ =
metrics->RegisterMetric(new SetMetric<string>(STATESTORE_LIVE_SUBSCRIBERS_LIST,
set<string>()));
subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics,
STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>());
key_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0L);
value_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0L);
topic_size_metric_ = metrics->AddGauge(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0L);

topic_update_duration_metric_ = metrics->RegisterMetric(
new StatsMetric<double>(STATESTORE_UPDATE_DURATION, TUnit::TIME_S));
heartbeat_duration_metric_ = metrics->RegisterMetric(
new StatsMetric<double>(STATESTORE_HEARTBEAT_DURATION, TUnit::TIME_S));
topic_update_duration_metric_ =
StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_UPDATE_DURATION);
heartbeat_duration_metric_ =
StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_HEARTBEAT_DURATION);

update_state_client_cache_->InitMetrics(metrics, "subscriber-update-state");
heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
Expand Down
3 changes: 1 addition & 2 deletions be/src/util/cgroups-mgr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ const std::string IMPALA_CGROUP_SUFFIX = "_impala";
const int32_t CPU_DEFAULT_WEIGHT = 1024;

CgroupsMgr::CgroupsMgr(MetricGroup* metrics) {
active_cgroups_metric_ =
metrics->AddCounter<int64_t>("cgroups-mgr.active-cgroups", 0);
active_cgroups_metric_ = metrics->AddGauge<int64_t>("cgroups-mgr.active-cgroups", 0);
}

Status CgroupsMgr::Init(const string& cgroups_hierarchy_path,
Expand Down
2 changes: 1 addition & 1 deletion be/src/util/cgroups-mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class CgroupsMgr {
std::string* cgroup_path, std::string* tasks_path) const;

/// Number of currently active Impala-managed cgroups.
IntCounter* active_cgroups_metric_;
IntGauge* active_cgroups_metric_;

/// Root of the CPU cgroup hierarchy. Created cgroups are placed directly under it.
std::string cgroups_hierarchy_path_;
Expand Down
21 changes: 17 additions & 4 deletions be/src/util/collection-metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,15 @@ namespace impala {
template <typename T>
class SetMetric : public Metric {
public:
SetMetric(const std::string& key, const std::set<T>& value,
const std::string& description = "") : Metric(key, description), value_(value) { }
static SetMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
const std::set<T>& value) {
return metrics->RegisterMetric(new SetMetric(MetricDefs::Get(key), value));
}

SetMetric(const TMetricDef& def, const std::set<T>& value)
: Metric(def), value_(value) {
DCHECK_EQ(def.kind, TMetricKind::SET);
}

/// Put an item in this set.
void Add(const T& item) {
Expand Down Expand Up @@ -106,8 +113,14 @@ class SetMetric : public Metric {
template <typename T>
class StatsMetric : public Metric {
public:
StatsMetric(const std::string& key, const TUnit::type unit,
const std::string& description = "") : Metric(key, description), unit_(unit) { }
static StatsMetric* CreateAndRegister(MetricGroup* metrics, const std::string& key,
const std::string& arg = "") {
return metrics->RegisterMetric(new StatsMetric(MetricDefs::Get(key, arg)));
}

StatsMetric(const TMetricDef& def) : Metric(def), unit_(def.units) {
DCHECK_EQ(def.kind, TMetricKind::STATS);
}

void Update(const T& value) {
boost::lock_guard<boost::mutex> l(lock_);
Expand Down
Loading

0 comments on commit 8ac755b

Please sign in to comment.