From 6cf75ce7a8aaa5168b095d1bfc16aac341fd02f7 Mon Sep 17 00:00:00 2001 From: Erjan Altena Date: Wed, 2 Nov 2016 14:26:17 +0100 Subject: [PATCH] CELIX-385: Added Etcd library which can be used by other Celix subprojects (i.e. discovery-etcd) --- CMakeLists.txt | 1 + documents/subprojects/readme.md | 1 + etcdlib/CMakeLists.txt | 45 +++ etcdlib/private/src/etcd.c | 481 ++++++++++++++++++++++++++++++++ etcdlib/public/include/etcd.h | 94 +++++++ 5 files changed, 622 insertions(+) create mode 100644 etcdlib/CMakeLists.txt create mode 100644 etcdlib/private/src/etcd.c create mode 100644 etcdlib/public/include/etcd.h diff --git a/CMakeLists.txt b/CMakeLists.txt index d8dba55dd..4dafbb3b2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -78,6 +78,7 @@ set(DEFAULT_VERSION 1.0.0) add_subdirectory(config_admin) add_subdirectory(device_access) add_subdirectory(deployment_admin) +add_subdirectory(etcdlib) add_subdirectory(remote_services) add_subdirectory(remote_shell) add_subdirectory(shell_bonjour) diff --git a/documents/subprojects/readme.md b/documents/subprojects/readme.md index 92a5bc04b..17627fa24 100644 --- a/documents/subprojects/readme.md +++ b/documents/subprojects/readme.md @@ -6,6 +6,7 @@ Apache Celix is organized into several subprojects. The following subproject are * [Dependency Manager](../../dependency_manager) - A C component/dependency model for use through an API provided as library. * [C++ Dependency Manager](../../dependency_manager_cxx) - A C++ component/dependency model for use through an API provided as library. * [Device Access](../../device_access) - An implementation of the OSGi Device Access specification adapted to C. +* [Etcd library](../../etcdlib) - A C library that interfaces with ETCD. * [Examples](../../examples) - A Selection of examples showing how the framework can be used. * [Log Service](../../log_service) - An implementation of the OSGi Log Service adapated to C. * [Log Writer](../../log_writer) - A simple log writer for use in combination with the Log Service. diff --git a/etcdlib/CMakeLists.txt b/etcdlib/CMakeLists.txt new file mode 100644 index 000000000..a9a056492 --- /dev/null +++ b/etcdlib/CMakeLists.txt @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +find_package(CURL REQUIRED) +find_package(Jansson REQUIRED) + +include_directories( + ${CURL_INCLUDE_DIR} + ${JANSSON_INCLUDE_DIRS} + private/include + public/include +) + +add_library(etcdlib SHARED + private/src/etcd.c +) + +set_target_properties(etcdlib PROPERTIES "SOVERSION" 1) +target_link_libraries(etcdlib ${CURL_LIBRARIES} ${JANSSON_LIBRARY}) + +add_library(etcdlib_static STATIC + private/src/etcd.c +) + +set_target_properties(etcdlib_static PROPERTIES "SOVERSION" 1) +target_link_libraries(etcdlib_static ${CURL_LIBRARIES} ${JANSSON_LIBRARY}) + + +install(TARGETS etcdlib etcdlib_static DESTINATION ${CMAKE_INSTALL_LIBDIR} COMPONENT framework) +FILE(GLOB files "public/include/*.h") +INSTALL(FILES ${files} DESTINATION include/celix/etcdlib COMPONENT framework) diff --git a/etcdlib/private/src/etcd.c b/etcdlib/private/src/etcd.c new file mode 100644 index 000000000..9a980102c --- /dev/null +++ b/etcdlib/private/src/etcd.c @@ -0,0 +1,481 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#include +#include +#include +#include +#include + +#include "etcd.h" + +#define ETCD_JSON_NODE "node" +#define ETCD_JSON_PREVNODE "prevNode" +#define ETCD_JSON_NODES "nodes" +#define ETCD_JSON_ACTION "action" +#define ETCD_JSON_KEY "key" +#define ETCD_JSON_VALUE "value" +#define ETCD_JSON_DIR "dir" +#define ETCD_JSON_MODIFIEDINDEX "modifiedIndex" + +#define MAX_OVERHEAD_LENGTH 64 +#define DEFAULT_CURL_TIMEOUT 10 +#define DEFAULT_CURL_CONECTTIMEOUT 10 + +typedef enum { + GET, PUT, DELETE +} request_t; + +static const char* etcd_server; +static int etcd_port = 0; + +struct MemoryStruct { + char *memory; + size_t size; +}; + + +/** + * Static function declarations + */ +static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData); +static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp); +/** + * External function definition + */ + + +/** + * etcd_init + */ +int etcd_init(const char* server, int port) { + etcd_server = server; + etcd_port = port; + + return curl_global_init(CURL_GLOBAL_ALL) != 0; +} + + +/** + * etcd_get + */ +int etcd_get(const char* key, char** value, int* modifiedIndex) { + json_t* js_root = NULL; + json_t* js_node = NULL; + json_t* js_value = NULL; + json_t* js_modifiedIndex = NULL; + json_error_t error; + int res = -1; + struct MemoryStruct reply; + + reply.memory = malloc(1); /* will be grown as needed by the realloc above */ + reply.size = 0; /* no data at this point */ + + int retVal = -1; + char *url; + asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key); + res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); + free(url); + + if (res == CURLE_OK) { + js_root = json_loads(reply.memory, 0, &error); + + if (js_root != NULL) { + js_node = json_object_get(js_root, ETCD_JSON_NODE); + } + if (js_node != NULL) { + js_value = json_object_get(js_node, ETCD_JSON_VALUE); + js_modifiedIndex = json_object_get(js_node, + ETCD_JSON_MODIFIEDINDEX); + + if (js_modifiedIndex != NULL && js_value != NULL) { + if (modifiedIndex) { + *modifiedIndex = json_integer_value(js_modifiedIndex); + } + *value = strdup(json_string_value(js_value)); + retVal = 0; + } + } + if (js_root != NULL) { + json_decref(js_root); + } + } + + if (reply.memory) { + free(reply.memory); + } + if(retVal != 0) { + value = NULL; + } + return retVal; +} + + +static int etcd_get_recursive_values(json_t* js_root, etcd_key_value_callback callback, void *arg, json_int_t *mod_index) { + json_t *js_nodes; + if ((js_nodes = json_object_get(js_root, ETCD_JSON_NODES)) != NULL) { + // subarray + if (json_is_array(js_nodes)) { + int len = json_array_size(js_nodes); + for (int i = 0; i < len; i++) { + json_t *js_object = json_array_get(js_nodes, i); + json_t *js_mod_index = json_object_get(js_object, ETCD_JSON_MODIFIEDINDEX); + + if(js_mod_index != NULL) { + json_int_t index = json_integer_value(js_mod_index); + if(*mod_index < index) { + *mod_index = index; + } + } else { + printf("[ETCDLIB] Error: No INDEX found for key!\n"); + } + + if (json_object_get(js_object, ETCD_JSON_NODES)) { + // node contains nodes + etcd_get_recursive_values(js_object, callback, arg, mod_index); + } else { + json_t* js_key = json_object_get(js_object, ETCD_JSON_KEY); + json_t* js_value = json_object_get(js_object, ETCD_JSON_VALUE); + + if (js_key && js_value) { + if (!json_object_get(js_object, ETCD_JSON_DIR)) { + callback(json_string_value(js_key), json_string_value(js_value), arg); + } + } //else empty etcd directory, not an error. + + } + } + } else { + fprintf(stderr, "[ETCDLIB] Error: misformatted JSON: nodes element is not an array !!\n"); + } + } else { + fprintf(stderr, "[ETCDLIB] Error: nodes element not found!!\n"); + } + + return (*index > 0 ? 0 : 1); +} + +/** + * etcd_get_directory + */ +int etcd_get_directory(const char* directory, etcd_key_value_callback callback, void* arg, long long* modifiedIndex) { + json_t* js_root = NULL; + json_t* js_rootnode = NULL; + + json_error_t error; + int res; + struct MemoryStruct reply; + + reply.memory = malloc(1); /* will be grown as needed by the realloc above */ + reply.size = 0; /* no data at this point */ + + int retVal = 0; + char *url; + + asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, directory); + + res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); + free(url); + + if (res == CURLE_OK) { + js_root = json_loads(reply.memory, 0, &error); + if (js_root != NULL) { + js_rootnode = json_object_get(js_root, ETCD_JSON_NODE); + } else { + retVal = -1; + fprintf(stderr, "[ETCDLIB] Error: %s in js_root not found", ETCD_JSON_NODE); + } + if (js_rootnode != NULL) { + *modifiedIndex = 0; + retVal = etcd_get_recursive_values(js_rootnode, callback, arg, (json_int_t*)modifiedIndex); + } + if (js_root != NULL) { + json_decref(js_root); + } + } + + if (reply.memory) { + free(reply.memory); + } + + return retVal; +} + +/** + * etcd_set + */ +int etcd_set(const char* key, const char* value, int ttl, bool prevExist) { + json_error_t error; + json_t* js_root = NULL; + json_t* js_node = NULL; + json_t* js_value = NULL; + int retVal = -1; + char *url; + size_t req_len = strlen(value) + MAX_OVERHEAD_LENGTH; + char request[req_len]; + char* requestPtr = request; + int res; + struct MemoryStruct reply; + + /* Skip leading '/', etcd cannot handle this. */ + while(*key == '/') { + key++; + } + + reply.memory = calloc(1, 1); /* will be grown as needed by the realloc above */ + reply.size = 0; /* no data at this point */ + + asprintf(&url, "http://%s:%d/v2/keys/%s", etcd_server, etcd_port, key); + + requestPtr += snprintf(requestPtr, req_len, "value=%s", value); + if (ttl > 0) { + requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";ttl=%d", ttl); + } + + if (prevExist) { + requestPtr += snprintf(requestPtr, req_len-(requestPtr-request), ";prevExist=true"); + } + + res = performRequest(url, PUT, WriteMemoryCallback, request, (void*) &reply); + + if(url) { + free(url); + } + + if (res == CURLE_OK) { + js_root = json_loads(reply.memory, 0, &error); + + if (js_root != NULL) { + js_node = json_object_get(js_root, ETCD_JSON_NODE); + } + if (js_node != NULL) { + js_value = json_object_get(js_node, ETCD_JSON_VALUE); + } + if (js_value != NULL && json_is_string(js_value)) { + if(strcmp(json_string_value(js_value), value) == 0) { + retVal = 0; + } + } + if (js_root != NULL) { + json_decref(js_root); + } + } + + if (reply.memory) { + free(reply.memory); + } + + return retVal; +} + + +/** + * etcd_set_with_check + */ +int etcd_set_with_check(const char* key, const char* value, int ttl, bool always_write) { + char *etcd_value; + int result = 0; + if (etcd_get(key, &etcd_value, NULL) == 0) { + if (strcmp(etcd_value, value) != 0) { + fprintf(stderr, "[ETCDLIB] WARNING: value already exists and is different\n"); + fprintf(stderr, " key = %s\n", key); + fprintf(stderr, " old value = %s\n", etcd_value); + fprintf(stderr, " new value = %s\n", value); + result = -1; + } + if (etcd_value) { + free(etcd_value); + } + } + if(always_write || !result) { + result = etcd_set(key, value, ttl, false); + } + return result; +} + + +/** + * etcd_watch + */ +int etcd_watch(const char* key, long long index, char** action, char** prevValue, char** value, char** rkey, long long* modifiedIndex) { + json_error_t error; + json_t* js_root = NULL; + json_t* js_node = NULL; + json_t* js_prevNode = NULL; + json_t* js_action = NULL; + json_t* js_value = NULL; + json_t* js_rkey = NULL; + json_t* js_prevValue = NULL; + json_t* js_modIndex = NULL; + int retVal = -1; + char *url = NULL; + int res; + struct MemoryStruct reply; + + reply.memory = malloc(1); /* will be grown as needed by the realloc above */ + reply.size = 0; /* no data at this point */ + + if (index != 0) + asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true&waitIndex=%lld", etcd_server, etcd_port, key, index); + else + asprintf(&url, "http://%s:%d/v2/keys/%s?wait=true&recursive=true", etcd_server, etcd_port, key); + res = performRequest(url, GET, WriteMemoryCallback, NULL, (void*) &reply); + if(url) + free(url); + if (res == CURLE_OK) { + js_root = json_loads(reply.memory, 0, &error); + + if (js_root != NULL) { + js_action = json_object_get(js_root, ETCD_JSON_ACTION); + js_node = json_object_get(js_root, ETCD_JSON_NODE); + js_prevNode = json_object_get(js_root, ETCD_JSON_PREVNODE); + retVal = 0; + } + if (js_prevNode != NULL) { + js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE); + } + if (js_node != NULL) { + js_rkey = json_object_get(js_node, ETCD_JSON_KEY); + js_value = json_object_get(js_node, ETCD_JSON_VALUE); + js_modIndex = json_object_get(js_node, ETCD_JSON_MODIFIEDINDEX); + } + if (js_prevNode != NULL) { + js_prevValue = json_object_get(js_prevNode, ETCD_JSON_VALUE); + } + if ((prevValue != NULL) && (js_prevValue != NULL) && (json_is_string(js_prevValue))) { + + *prevValue = strdup(json_string_value(js_prevValue)); + } + if(modifiedIndex != NULL) { + if ((js_modIndex != NULL) && (json_is_integer(js_modIndex))) { + *modifiedIndex = json_integer_value(js_modIndex); + } else { + *modifiedIndex = index; + } + } + if ((rkey != NULL) && (js_rkey != NULL) && (json_is_string(js_rkey))) { + *rkey = strdup(json_string_value(js_rkey)); + + } + if ((action != NULL) && (js_action != NULL) && (json_is_string(js_action))) { + *action = strdup(json_string_value(js_action)); + } + if ((value != NULL) && (js_value != NULL) && (json_is_string(js_value))) { + *value = strdup(json_string_value(js_value)); + } + if (js_root != NULL) { + json_decref(js_root); + } + + } + + if (reply.memory) { + free(reply.memory); + } + + return retVal; +} + +/** + * etcd_del + */ +int etcd_del(const char* key) { + json_error_t error; + json_t* js_root = NULL; + json_t* js_node = NULL; + int retVal = -1; + char *url; + int res; + struct MemoryStruct reply; + + reply.memory = malloc(1); /* will be grown as needed by the realloc above */ + reply.size = 0; /* no data at this point */ + + asprintf(&url, "http://%s:%d/v2/keys/%s?recursive=true", etcd_server, etcd_port, key); + res = performRequest(url, DELETE, WriteMemoryCallback, NULL, (void*) &reply); + free(url); + + if (res == CURLE_OK) { + js_root = json_loads(reply.memory, 0, &error); + if (js_root != NULL) { + js_node = json_object_get(js_root, ETCD_JSON_NODE); + } + + if (js_node != NULL) { + retVal = 0; + } + + if (js_root != NULL) { + json_decref(js_root); + } + } + + if (reply.memory) { + free(reply.memory); + } + + return retVal; +} + + +static size_t WriteMemoryCallback(void *contents, size_t size, size_t nmemb, void *userp) { + size_t realsize = size * nmemb; + struct MemoryStruct *mem = (struct MemoryStruct *) userp; + + mem->memory = realloc(mem->memory, mem->size + realsize + 1); + if (mem->memory == NULL) { + /* out of memory! */ + fprintf(stderr, "[ETCDLIB] Error: not enough memory (realloc returned NULL)\n"); + return 0; + } + + memcpy(&(mem->memory[mem->size]), contents, realsize); + mem->size += realsize; + mem->memory[mem->size] = 0; + + return realsize; +} + +static int performRequest(char* url, request_t request, void* callback, void* reqData, void* repData) { + CURL *curl = NULL; + CURLcode res = 0; + curl = curl_easy_init(); + curl_easy_setopt(curl, CURLOPT_NOSIGNAL, 1); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, DEFAULT_CURL_TIMEOUT); + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, DEFAULT_CURL_CONECTTIMEOUT); + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, callback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, repData); + + if (request == PUT) { + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); + curl_easy_setopt(curl, CURLOPT_POST, 1L); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, reqData); + } else if (request == DELETE) { + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "DELETE"); + } else if (request == GET) { + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "GET"); + } + + res = curl_easy_perform(curl); + curl_easy_cleanup(curl); + + return res; +} diff --git a/etcdlib/public/include/etcd.h b/etcdlib/public/include/etcd.h new file mode 100644 index 000000000..ff0a21401 --- /dev/null +++ b/etcdlib/public/include/etcd.h @@ -0,0 +1,94 @@ +/** + *Licensed to the Apache Software Foundation (ASF) under one + *or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + *regarding copyright ownership. The ASF licenses this file + *to you under the Apache License, Version 2.0 (the + *"License"); you may not use this file except in compliance + *with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + *Unless required by applicable law or agreed to in writing, + *software distributed under the License is distributed on an + *"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + *specific language governing permissions and limitations + *under the License. + */ + +#ifndef CELIX_ETCDLIB_H_ +#define CELIX_ETCDLIB_H_ + +#include + +typedef void (*etcd_key_value_callback) (const char *key, const char *value, void* arg); + +/** + * @desc Initialize the ETCD-LIB with the server/port where Etcd can be reached. + * @param const char* server. String containing the IP-number of the server. + * @param int port. Port number of the server. + * @return 0 on success, non zero otherwise. + */ +int etcd_init(const char* server, int port); + +/** + * @desc Retrieve a single value from Etcd. + * @param const char* key. The Etcd-key (Note: a leading '/' should be avoided) + * @param char** value. The allocated memory contains the Etcd-value. The caller is responsible for freeing this memory. + * @param int* modifiedIndex. If not NULL the Etcd-index of the last modified value. + * @return 0 on success, non zero otherwise + */ +int etcd_get(const char* key, char** value, int* modifiedIndex); + +/** + * @desc Retrieve the contents of a directory. For every found key/value pair the given callback function is called. + * @param const char* directory. The Etcd-directory which has to be searched for keys + * @param etcd_key_value_callback callback. Callback function which is called for every found key + * @param void *arg. Argument is passed to the callback function + * @param int* modifiedIndex. If not NULL the Etcd-index of the last modified value. + * @return 0 on success, non zero otherwise + */ +int etcd_get_directory(const char* directory, etcd_key_value_callback callback, void *arg, long long* modifiedIndex); + +/** + * @desc Setting an Etcd-key/value + * @param const char* key. The Etcd-key (Note: a leading '/' should be avoided) + * @param const char* value. The Etcd-value + * @param int ttl. If non-zero this is used as the TTL value + * @param bool prevExist. If true the value is only set when the key already exists, if false it is always set + * @return 0 on success, non zero otherwise + */ +int etcd_set(const char* key, const char* value, int ttl, bool prevExist); + +/** + * @desc Setting an Etcd-key/value and checks if there is a different previuos value + * @param const char* key. The Etcd-key (Note: a leading '/' should be avoided) + * @param const char* value. The Etcd-value + * @param int ttl. If non-zero this is used as the TTL value + * @param bool always_write. If true the value is written, if false only when the given value is equal to the value in etcd. + * @return 0 on success, non zero otherwise + */ +int etcd_set_with_check(const char* key, const char* value, int ttl, bool always_write); + +/** + * @desc Deleting an Etcd-key + * @param const char* key. The Etcd-key (Note: a leading '/' should be avoided) + * @return 0 on success, non zero otherwise + */ +int etcd_del(const char* key); + +/** + * @desc Watching an etcd directory for changes + * @param const char* key. The Etcd-key (Note: a leading '/' should be avoided) + * @param long long index. The Etcd-index which the watch has to be started on. + * @param char** action. If not NULL, memory is allocated and contains the action-string. The caller is responsible of freeing the memory. + * @param char** prevValue. If not NULL, memory is allocated and contains the previous value. The caller is responsible of freeing the memory. + * @param char** value. If not NULL, memory is allocated and contains the new value. The caller is responsible of freeing the memory. + * @param char** rkey. If not NULL, memory is allocated and contains the updated key. The caller is responsible of freeing the memory. + * @param long long* modifiedIndex. If not NULL, the index of the modification is written. + * @return 0 on success, non zero otherwise + */ +int etcd_watch(const char* key, long long index, char** action, char** prevValue, char** value, char** rkey, long long* modifiedIndex); + +#endif /*CELIX_ETCDLIB_H_ */