Skip to content

Commit

Permalink
Merge pull request #2378 from cramertj/grpc-meta
Browse files Browse the repository at this point in the history
Add metadata support to GRPC plugin
  • Loading branch information
octo committed Jul 27, 2017
2 parents 7d462fc + fdee620 commit 6d79874
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 4 deletions.
13 changes: 12 additions & 1 deletion proto/types.proto
Expand Up @@ -38,6 +38,16 @@ message Identifier {
string type_instance = 5;
}

message MetadataValue {
oneof value {
string string_value = 1;
int64 int64_value = 2;
uint64 uint64_value = 3;
double double_value = 4;
bool bool_value = 5;
};
}

message Value {
oneof value {
uint64 counter = 1;
Expand All @@ -56,4 +66,5 @@ message ValueList {
Identifier identifier = 4;

repeated string ds_names = 5;
}
map<string, MetadataValue> meta_data = 6;
}
9 changes: 9 additions & 0 deletions src/daemon/utils_cache.c
Expand Up @@ -913,6 +913,15 @@ int uc_iterator_get_interval(uc_iter_t *iter, cdtime_t *ret_interval) {
return 0;
} /* int uc_iterator_get_name */

int uc_iterator_get_meta(uc_iter_t *iter, meta_data_t **ret_meta) {
if ((iter == NULL) || (iter->entry == NULL) || (ret_meta == NULL))
return -1;

*ret_meta = meta_data_clone(iter->entry->meta);

return 0;
} /* int uc_iterator_get_meta */

/*
* Meta data interface
*/
Expand Down
2 changes: 2 additions & 0 deletions src/daemon/utils_cache.h
Expand Up @@ -106,6 +106,8 @@ int uc_iterator_get_values(uc_iter_t *iter, value_t **ret_values,
size_t *ret_num);
/* Return the interval of the value at the current position. */
int uc_iterator_get_interval(uc_iter_t *iter, cdtime_t *ret_interval);
/* Return the metadata for the value at the current position. */
int uc_iterator_get_meta(uc_iter_t *iter, meta_data_t **ret_meta);

/*
* Meta data interface
Expand Down
145 changes: 142 additions & 3 deletions src/grpc.cc
Expand Up @@ -56,6 +56,8 @@ using collectd::QueryValuesResponse;

using google::protobuf::util::TimeUtil;

typedef google::protobuf::Map<grpc::string, collectd::types::MetadataValue> grpcMetadata;

/*
* private types
*/
Expand Down Expand Up @@ -154,6 +156,124 @@ static grpc::Status unmarshal_ident(const collectd::types::Identifier &msg,
return grpc::Status::OK;
} /* unmarshal_ident() */

static grpc::Status marshal_meta_data(meta_data_t *meta,
grpcMetadata *mutable_meta_data) {
char **meta_data_keys = nullptr;
int meta_data_keys_len = meta_data_toc(meta, &meta_data_keys);
if (meta_data_keys_len < 0) {
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("error getting metadata keys"));
}

for (int i = 0; i < meta_data_keys_len; i++) {
char *key = meta_data_keys[i];
int md_type = meta_data_type(meta, key);

collectd::types::MetadataValue md_value;
md_value.Clear();

switch (md_type) {
case MD_TYPE_STRING:
char *md_string;
if (meta_data_get_string(meta, key, &md_string) != 0 || md_string == nullptr) {
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("missing metadata"));
}
md_value.set_string_value(md_string);
free(md_string);
break;
case MD_TYPE_SIGNED_INT:
int64_t int64_value;
if (meta_data_get_signed_int(meta, key, &int64_value) != 0) {
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("missing metadata"));
}
md_value.set_int64_value(int64_value);
break;
case MD_TYPE_UNSIGNED_INT:
uint64_t uint64_value;
if (meta_data_get_unsigned_int(meta, key, &uint64_value) != 0) {
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("missing metadata"));
}
md_value.set_uint64_value(uint64_value);
break;
case MD_TYPE_DOUBLE:
double double_value;
if (meta_data_get_double(meta, key, &double_value) != 0) {
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("missing metadata"));
}
md_value.set_double_value(double_value);
break;
case MD_TYPE_BOOLEAN:
bool bool_value;
if (meta_data_get_boolean(meta, key, &bool_value) != 0) {
strarray_free(meta_data_keys, meta_data_keys_len);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("missing metadata"));
}
md_value.set_bool_value(bool_value);
break;
default:
strarray_free(meta_data_keys, meta_data_keys_len);
ERROR("grpc: invalid metadata type (%d)", md_type);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("unknown metadata type"));
}

(*mutable_meta_data)[grpc::string(key)] = md_value;

strarray_free(meta_data_keys, meta_data_keys_len);
}

return grpc::Status::OK;
}

static grpc::Status unmarshal_meta_data(const grpcMetadata &rpc_metadata,
meta_data_t **md_out) {
*md_out = meta_data_create();
if (*md_out == nullptr) {
return grpc::Status(grpc::StatusCode::RESOURCE_EXHAUSTED,
grpc::string("failed to metadata list"));
}
for (auto kv: rpc_metadata) {
auto k = kv.first.c_str();
auto v = kv.second;

// The meta_data collection individually allocates copies of the keys and
// string values for each entry, so it's safe for us to pass a reference
// to our short-lived strings.

switch (v.value_case()) {
case collectd::types::MetadataValue::ValueCase::kStringValue:
meta_data_add_string(*md_out, k, v.string_value().c_str());
break;
case collectd::types::MetadataValue::ValueCase::kInt64Value:
meta_data_add_signed_int(*md_out, k, v.int64_value());
break;
case collectd::types::MetadataValue::ValueCase::kUint64Value:
meta_data_add_unsigned_int(*md_out, k, v.uint64_value());
break;
case collectd::types::MetadataValue::ValueCase::kDoubleValue:
meta_data_add_double(*md_out, k, v.double_value());
break;
case collectd::types::MetadataValue::ValueCase::kBoolValue:
meta_data_add_boolean(*md_out, k, v.bool_value());
break;
default:
meta_data_destroy(*md_out);
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
grpc::string("Metadata of unknown type"));
}
}
return grpc::Status::OK;
}

static grpc::Status marshal_value_list(const value_list_t *vl,
collectd::types::ValueList *msg) {
auto id = msg->mutable_identifier();
Expand All @@ -170,9 +290,18 @@ static grpc::Status marshal_value_list(const value_list_t *vl,
msg->set_allocated_time(new google::protobuf::Timestamp(t));
msg->set_allocated_interval(new google::protobuf::Duration(d));

msg->clear_meta_data();
if (vl->meta != nullptr) {
grpc::Status status = marshal_meta_data(vl->meta, msg->mutable_meta_data());
if (!status.ok()) {
return status;
}
}

for (size_t i = 0; i < vl->values_len; ++i) {
auto v = msg->add_values();
switch (ds->ds[i].type) {
int value_type = ds->ds[i].type;
switch (value_type) {
case DS_TYPE_COUNTER:
v->set_counter(vl->values[i].counter);
break;
Expand All @@ -186,6 +315,7 @@ static grpc::Status marshal_value_list(const value_list_t *vl,
v->set_absolute(vl->values[i].absolute);
break;
default:
ERROR("grpc: invalid value type (%d)", value_type);
return grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("unknown value type"));
}
Expand All @@ -207,6 +337,10 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
if (!status.ok())
return status;

status = unmarshal_meta_data(msg.meta_data(), &vl->meta);
if (!status.ok())
return status;

value_t *values = NULL;
size_t values_len = 0;

Expand Down Expand Up @@ -249,7 +383,8 @@ static grpc::Status unmarshal_value_list(const collectd::types::ValueList &msg,
if (status.ok()) {
vl->values = values;
vl->values_len = values_len;
} else if (values) {
} else {
meta_data_destroy(vl->meta);
free(values);
}

Expand Down Expand Up @@ -280,6 +415,7 @@ class CollectdImpl : public collectd::Collectd::Service {
auto vl = value_lists.front();
value_lists.pop();
sfree(vl.values);
meta_data_destroy(vl.meta);
}

return status;
Expand Down Expand Up @@ -328,7 +464,6 @@ class CollectdImpl : public collectd::Collectd::Service {

if (!ident_matches(&vl, match))
continue;

if (uc_iterator_get_time(iter, &vl.time) < 0) {
status =
grpc::Status(grpc::StatusCode::INTERNAL,
Expand All @@ -346,6 +481,10 @@ class CollectdImpl : public collectd::Collectd::Service {
grpc::string("failed to retrieve values"));
break;
}
if (uc_iterator_get_meta(iter, &vl.meta) < 0) {
status = grpc::Status(grpc::StatusCode::INTERNAL,
grpc::string("failed to retrieve value metadata"));
}

value_lists->push(vl);
} // while (uc_iterator_next(iter, &name) == 0)
Expand Down

0 comments on commit 6d79874

Please sign in to comment.