Permalink
Browse files

Add two new modules to handle load balancing across multiple apache s…

…ervers

within the same datacenter.

mod_heartbeat generates multicast status messages with the current number of 
clients connected, but the format can easily be extended to include other 
things.

mod_heartmonitor collects these messages into a static file, which then can be 
used for other modules to make load balancing decisions on.

This module was originally written at Joost by Sander Striker, Justin 
Erenkrantz, and myself.  We have been given permission by our employer to 
contribute this module.

git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@721952 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent 4cab746 commit 86f171ac6a32bee05f61eea7b02fa754f468058a @pquerna pquerna committed Dec 1, 2008
View
6 CHANGES
@@ -2,6 +2,12 @@
Changes with Apache 2.3.0
[ When backported to 2.2.x, remove entry from this file ]
+ *) mod_heartmonitor: New module to collect heartbeats, and write out a file
+ so that other modules can load balance traffic as needed. [Paul Querna]
+
+ *) mod_heartbeat: New module to generate multicast heartbeats to know if a
+ server is online. [Paul Querna]
+
*) core: Error responses set by filters were being coerced into 500 errors,
sometimes appended to the original error response. Log entry of:
'Handler for (null) returned invalid result code -3'
View
3 modules/README
@@ -13,6 +13,9 @@ cache/
database/
The apache DBD framework manages connections to SQL backends efficiently.
+cluster/
+ Modules for working with multiple servers.
+
dav/
This directory houses modules that implement WebDAV functionality.
View
3 modules/cluster/Makefile.in
@@ -0,0 +1,3 @@
+# A modules Makefile has no explicit targets -- they are defined by
+# whichever modules are enabled. Just include special.mk to deal with this.
+include $(top_srcdir)/build/special.mk
View
33 modules/cluster/README.heartbeat
@@ -0,0 +1,33 @@
+mod_heartbeat
+
+Broadcasts the current Apache Connection status over multicast.
+
+Example Configuration:
+ HeartbeatAddress 239.0.0.1:27999
+
+Dependencies:
+ mod_status must be either a static module, or if a dynamic module, it must be
+ loaded before mod_heartbeat.
+
+
+Consuming:
+ Every 1 second, this module generates a single multicast UDP packet,
+ containing the number of busy and idle workers.
+
+ The packet is a simple ASCII format, similar to GET query parameters in UDP.
+
+ An Example packet:
+ v=1&ready=75&busy=0
+
+ Consumers should handle new variables besides busy and ready, separated by '&'
+ being added in the future.
+
+Misc:
+ The interval of 1 second is controlled by the HEARTBEAT_INTERVAL
+ compile-time define. This is not currently tunable at run time. To change
+ how often this module sends the status packet, add the define to the CFLAGS
+ used to compile the module. For example:
+ -DHEARTBEAT_INTERVAL=3
+ would cause the broadcasts to be sent every 3 seconds.
+
+
View
30 modules/cluster/README.heartmonitor
@@ -0,0 +1,30 @@
+mod_heartmonitor
+
+Collects the Apache Connection status data over multicast.
+
+Example Configuration:
+ # First parameter is the interface to listen on
+ HeartbeatListen 239.0.0.1:27999
+ # Absolute path, or relative path to ServerRoot
+ HeartbeatStorage logs/hb.dat
+
+Dependencies:
+ Due to a bug in APR's apr_socket_recvfrom, version 1.2.12 or newer must be
+ used:
+ <http://svn.apache.org/viewvc?view=rev&revision=467600>
+
+Consuming:
+ This module atomically writes to the configured path, a list of servers,
+ along with metadata about them.
+
+ Included data about each server:
+ - IP Address
+ - Busy Slots
+ - Open Slots
+ - Last Seen
+
+ Every 5 seconds, this file will be updated with the current status of the
+ cluster.
+
+
+
View
7 modules/cluster/config.m4
@@ -0,0 +1,7 @@
+
+APACHE_MODPATH_INIT(cluster) dnl opens the module list for modules/cluster
+
+APACHE_MODULE(heartbeat, Generates Heartbeats, , , most) dnl mod_heartbeat -- presumably enabled by the most module set, TODO confirm
+APACHE_MODULE(heartmonitor, Collects Heartbeats, , , most) dnl mod_heartmonitor -- same settings as heartbeat
+
+APACHE_MODPATH_FINISH dnl closes the module list opened above
View
354 modules/cluster/mod_heartbeat.c
@@ -0,0 +1,354 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "httpd.h"
+#include "http_config.h"
+#include "http_log.h"
+#include "apr_strings.h"
+
+#include "ap_mpm.h"
+#include "scoreboard.h"
+
+#ifndef HEARTBEAT_INTERVAL
+#define HEARTBEAT_INTERVAL (1) /* seconds the worker sleeps between packets; override with -DHEARTBEAT_INTERVAL=n */
+#endif
+
+module AP_MODULE_DECLARE_DATA heartbeat_module; /* forward declaration; defined at the end of this file */
+
+typedef struct hb_ctx_t
+{ /* per-server state of mod_heartbeat */
+ int active; /* set to 1 by the HeartbeatAddress directive handler */
+ apr_sockaddr_t *mcast_addr; /* destination address of the heartbeat packets */
+ int server_limit; /* AP_MPMQ_HARD_LIMIT_DAEMONS, queried in hb_init */
+ int thread_limit; /* AP_MPMQ_HARD_LIMIT_THREADS, queried in hb_init */
+ int status; /* -1 before the worker starts, 0 once it runs, else an APR error */
+ int keep_running; /* worker loops while non-zero; cleared by the pool cleanup */
+ apr_proc_mutex_t *mutex; /* cross-process lock the worker takes before sending */
+ const char *mutex_path; /* file name backing the process mutex */
+ apr_thread_mutex_t *start_mtx; /* startup handshake between starter and worker */
+ apr_thread_t *thread; /* the worker thread, joined by the pool cleanup */
+ apr_file_t *lockf; /* temp file created only to obtain mutex_path */
+} hb_ctx_t;
+
+static const char *msg_format = "v=%u&ready=%u&busy=%u"; /* packet payload: version, ready count, busy count */
+
+#define MSG_VERSION (1) /* value sent as v= in every packet */
+
+/*
+ * Count ready and busy workers in the scoreboard and send the totals as a
+ * single UDP datagram to ctx->mcast_addr.
+ *
+ * ctx: module context; supplies the scoreboard limits and the destination.
+ * p:   pool used to create the short-lived socket.
+ *
+ * Always returns OK: socket failures are logged as warnings and the packet
+ * is simply skipped.
+ */
+static int hb_monitor(hb_ctx_t *ctx, apr_pool_t *p)
+{
+    apr_uint32_t ready = 0;
+    apr_uint32_t busy = 0;
+    apr_socket_t *sock = NULL;
+    apr_status_t rv;
+    apr_size_t len;
+    char buf[256];
+    int proc_idx;
+
+    for (proc_idx = 0; proc_idx < ctx->server_limit; proc_idx++) {
+        process_score *ps = ap_get_scoreboard_process(proc_idx);
+        int thr_idx;
+
+        for (thr_idx = 0; thr_idx < ctx->thread_limit; thr_idx++) {
+            int state =
+                ap_scoreboard_image->servers[proc_idx][thr_idx].status;
+
+            /* Only ready slots of the current generation count as ready. */
+            if (state == SERVER_READY
+                && ps->generation == ap_my_generation) {
+                ready++;
+                continue;
+            }
+
+            /* Everything else counts as busy unless the slot is unused. */
+            switch (state) {
+            case SERVER_DEAD:
+            case SERVER_STARTING:
+            case SERVER_IDLE_KILL:
+                break;
+            default:
+                busy++;
+                break;
+            }
+        }
+    }
+
+    len = apr_snprintf(buf, sizeof(buf), msg_format, MSG_VERSION, ready,
+                       busy);
+
+    rv = apr_socket_create(&sock, ctx->mcast_addr->family,
+                           SOCK_DGRAM, APR_PROTO_UDP, p);
+    if (rv != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
+                     NULL, "Heartbeat: apr_socket_create failed");
+    }
+    else if ((rv = apr_mcast_loopback(sock, 1)) != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
+                     NULL, "Heartbeat: apr_mcast_loopback failed");
+    }
+    else if ((rv = apr_socket_sendto(sock, ctx->mcast_addr, 0, buf, &len))
+             != APR_SUCCESS) {
+        ap_log_error(APLOG_MARK, APLOG_WARNING, rv,
+                     NULL, "Heartbeat: apr_socket_sendto failed");
+    }
+
+    if (sock != NULL) {
+        apr_socket_close(sock);
+    }
+
+    return OK;
+}
+
+/*
+ * Fallback used only when the included headers do not already define
+ * apr_time_from_msec. The argument is parenthesised and widened to
+ * apr_time_t so that expressions such as apr_time_from_msec(a + b) expand
+ * correctly.
+ */
+#ifndef apr_time_from_msec
+#define apr_time_from_msec(x) ((apr_time_t)(x) * 1000)
+#endif
+
+/*
+ * Thread entry point of the heartbeat sender.
+ *
+ * thd:  the thread running this function; its pool parents the per-packet
+ *       temporary pools.
+ * data: the hb_ctx_t of the server.
+ *
+ * The worker first releases ctx->start_mtx so that start_hb_worker() can
+ * return, then polls the process mutex every 200ms until it obtains it (or
+ * is told to stop). While running it sends one packet per
+ * HEARTBEAT_INTERVAL seconds and stops when keep_running is cleared, the MPM
+ * state query fails, or the MPM reports AP_MPMQ_STOPPING.
+ *
+ * Returns NULL; the thread's exit status is APR_SUCCESS.
+ */
+static void *hb_worker(apr_thread_t *thd, void *data)
+{
+    hb_ctx_t *ctx = (hb_ctx_t *) data;
+    apr_pool_t *pool = apr_thread_pool_get(thd);
+    apr_status_t rv;
+    int have_lock = 0;
+
+    apr_pool_tag(pool, "heartbeat_worker");
+    ctx->status = 0;
+    ctx->keep_running = 1;
+    apr_thread_mutex_unlock(ctx->start_mtx);
+
+    while (ctx->keep_running) {
+        rv = apr_proc_mutex_trylock(ctx->mutex);
+        if (rv == APR_SUCCESS) {
+            have_lock = 1;
+            break;
+        }
+        apr_sleep(apr_time_from_msec(200));
+    }
+
+    while (ctx->keep_running) {
+        apr_pool_t *tpool;
+        int mpm_state = 0;
+
+        rv = ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state);
+        if (rv != APR_SUCCESS) {
+            break;
+        }
+
+        if (mpm_state == AP_MPMQ_STOPPING) {
+            ctx->keep_running = 0;
+            break;
+        }
+
+        apr_pool_create(&tpool, pool);
+        apr_pool_tag(tpool, "heartbeat_worker_temp");
+        hb_monitor(ctx, tpool);
+        apr_pool_destroy(tpool);
+        apr_sleep(apr_time_from_sec(HEARTBEAT_INTERVAL));
+    }
+
+    /* Only release the process mutex if the trylock loop really got it. */
+    if (have_lock) {
+        apr_proc_mutex_unlock(ctx->mutex);
+    }
+
+    /* Use thd rather than ctx->thread: the starter stores ctx->thread only
+     * after apr_thread_create() returns, which may be later than this. */
+    apr_thread_exit(thd, APR_SUCCESS);
+
+    return NULL;
+}
+
+/*
+ * Pool cleanup registered by start_hb_worker(): asks the worker to stop and
+ * waits for it.
+ *
+ * baton: the hb_ctx_t whose worker thread is to be joined.
+ *
+ * Returns the join error if apr_thread_join() fails, otherwise the exit
+ * status reported by the worker thread.
+ */
+static apr_status_t hb_pool_cleanup(void *baton)
+{
+    hb_ctx_t *ctx = (hb_ctx_t *) baton;
+    apr_status_t thread_rv = APR_SUCCESS;
+    apr_status_t rv;
+
+    ctx->keep_running = 0;
+
+    rv = apr_thread_join(&thread_rv, ctx->thread);
+    if (rv != APR_SUCCESS) {
+        /* thread_rv was not filled in; report the join failure instead. */
+        return rv;
+    }
+
+    return thread_rv;
+}
+
+/*
+ * Create the heartbeat worker thread and wait until it has started.
+ *
+ * p:   pool that owns the thread, the startup mutex and the cleanup.
+ * ctx: module context; ctx->status receives the APR error on failure and is
+ *      set to 0 by the worker itself once it runs.
+ *
+ * The startup handshake uses ctx->start_mtx: it is locked here before the
+ * thread is created and unlocked by the worker, so the second lock below
+ * blocks until the worker is up.
+ */
+static void start_hb_worker(apr_pool_t *p, hb_ctx_t *ctx)
+{
+    apr_status_t rv;
+
+    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
+                                 p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartbeat: apr_thread_mutex_create failed");
+        ctx->status = rv;
+        return;
+    }
+
+    apr_thread_mutex_lock(ctx->start_mtx);
+
+    apr_pool_cleanup_register(p, ctx, hb_pool_cleanup, apr_pool_cleanup_null);
+
+    rv = apr_thread_create(&ctx->thread, NULL, hb_worker, ctx, p);
+    if (rv) {
+        apr_pool_cleanup_kill(p, ctx, hb_pool_cleanup);
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartbeat: apr_thread_create failed");
+        ctx->status = rv;
+
+        /* No worker exists to release start_mtx; locking it again here
+         * would block forever, so tear the handshake down and bail out. */
+        apr_thread_mutex_unlock(ctx->start_mtx);
+        apr_thread_mutex_destroy(ctx->start_mtx);
+        return;
+    }
+
+    /* Blocks until hb_worker() has unlocked start_mtx. */
+    apr_thread_mutex_lock(ctx->start_mtx);
+    apr_thread_mutex_unlock(ctx->start_mtx);
+    apr_thread_mutex_destroy(ctx->start_mtx);
+}
+
+/*
+ * child_init hook: re-initialise the process mutex in the child and, when
+ * the module is active, start the heartbeat worker thread.
+ *
+ * p: child pool; owns the worker thread.
+ * s: server whose module config holds the hb_ctx_t.
+ */
+static void hb_child_init(apr_pool_t *p, server_rec *s)
+{
+    hb_ctx_t *ctx = ap_get_module_config(s->module_config, &heartbeat_module);
+
+    apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p);
+
+    /* The worker resets this to 0 once it is running. */
+    ctx->status = -1;
+
+    if (!ctx->active) {
+        return;
+    }
+
+    start_hb_worker(p, ctx);
+    if (ctx->status != 0) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s,
+                     "Heartbeat: Failed to start worker thread.");
+    }
+}
+
+/*
+ * post_config hook: record the MPM's hard thread/daemon limits and create
+ * the cross-process mutex at ctx->mutex_path.
+ *
+ * The mutex uses fcntl locking when APR offers it, otherwise flock; the
+ * build fails on platforms offering neither.
+ *
+ * Returns OK on success, !OK when the mutex cannot be created.
+ */
+static int hb_init(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp,
+                   server_rec *s)
+{
+    hb_ctx_t *ctx = ap_get_module_config(s->module_config, &heartbeat_module);
+    apr_status_t rv;
+#if APR_HAS_FCNTL_SERIALIZE
+    const apr_lockmech_e mech = APR_LOCK_FCNTL;
+#elif APR_HAS_FLOCK_SERIALIZE
+    const apr_lockmech_e mech = APR_LOCK_FLOCK;
+#else
+#error port me to a non crap platform.
+#endif
+
+    ap_mpm_query(AP_MPMQ_HARD_LIMIT_THREADS, &ctx->thread_limit);
+    ap_mpm_query(AP_MPMQ_HARD_LIMIT_DAEMONS, &ctx->server_limit);
+
+    rv = apr_proc_mutex_create(&ctx->mutex, ctx->mutex_path, mech, p);
+    if (rv == APR_SUCCESS) {
+        return OK;
+    }
+
+    ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
+                 "Heartbeat: mutex failed creation at %s (type=%s)",
+                 ctx->mutex_path, apr_proc_mutex_defname());
+    return !OK;
+}
+
+/*
+ * Register this module's hooks: hb_init runs at post_config and
+ * hb_child_init at child_init, both at APR_HOOK_MIDDLE.
+ */
+static void hb_register_hooks(apr_pool_t *p)
+{
+    ap_hook_post_config(hb_init, NULL, NULL, APR_HOOK_MIDDLE);
+
+    ap_hook_child_init(hb_child_init, NULL, NULL, APR_HOOK_MIDDLE);
+}
+
+/*
+ * Create the per-server configuration.
+ *
+ * The structure is zero-filled so that every field (active, the limits,
+ * mutex_path, mutex, mcast_addr, ...) has a defined value even when the
+ * HeartbeatAddress directive never runs; hb_init() and hb_child_init() read
+ * several of them unconditionally.
+ *
+ * Returns the new hb_ctx_t, allocated from p.
+ */
+static void *hb_create_config(apr_pool_t *p, server_rec *s)
+{
+    hb_ctx_t *cfg = apr_pcalloc(p, sizeof(*cfg));
+
+    cfg->active = 0;
+    cfg->thread_limit = 0;
+    cfg->server_limit = 0;
+
+    return cfg;
+}
+
+/*
+ * Handler for the HeartbeatAddress directive (global context only).
+ *
+ * addr: "host:port" of the destination; both parts are required.
+ *
+ * Marks the module active, resolves the address into ctx->mcast_addr and
+ * picks a unique file name in the temp directory for ctx->mutex_path. The
+ * temp file is only created to obtain that name and is closed again here.
+ *
+ * Returns NULL on success or a static error string.
+ */
+static const char *cmd_hb_address(cmd_parms *cmd,
+                                  void *dconf, const char *addr)
+{
+    apr_pool_t *p = cmd->pool;
+    hb_ctx_t *ctx = ap_get_module_config(cmd->server->module_config,
+                                         &heartbeat_module);
+    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+    const char *tmpdir = NULL;
+    char *lock_template;
+    char *host_str;
+    char *scope_id;
+    apr_port_t port = 0;
+
+    if (err != NULL) {
+        return err;
+    }
+
+    ctx->active = 1;
+
+    if (apr_parse_addr_port(&host_str, &scope_id, &port, addr, p)
+        != APR_SUCCESS) {
+        return "HeartbeatAddress: Unable to parse address.";
+    }
+
+    if (host_str == NULL) {
+        return "HeartbeatAddress: No host provided in address";
+    }
+
+    if (port == 0) {
+        return "HeartbeatAddress: No port provided in address";
+    }
+
+    if (apr_sockaddr_info_get(&ctx->mcast_addr, host_str, APR_INET, port, 0,
+                              p) != APR_SUCCESS) {
+        return "HeartbeatAddress: apr_sockaddr_info_get failed.";
+    }
+
+    if (apr_temp_dir_get(&tmpdir, p) != APR_SUCCESS) {
+        return "HeartbeatAddress: unable to find temp directory.";
+    }
+
+    lock_template = apr_pstrcat(p, tmpdir, "/hb-tmp.XXXXXX", NULL);
+
+    if (apr_file_mktemp(&ctx->lockf, lock_template, 0, p) != APR_SUCCESS) {
+        return "HeartbeatAddress: unable to allocate temp file.";
+    }
+
+    if (apr_file_name_get(&ctx->mutex_path, ctx->lockf) != APR_SUCCESS) {
+        return "HeartbeatAddress: unable to get lockf name.";
+    }
+
+    apr_file_close(ctx->lockf);
+
+    return NULL;
+}
+
+static const command_rec hb_cmds[] = { /* configuration directives of mod_heartbeat */
+ AP_INIT_TAKE1("HeartbeatAddress", cmd_hb_address, NULL, RSRC_CONF,
+ "Address to send heartbeat requests"),
+ {NULL} /* terminating entry */
+};
+
+module AP_MODULE_DECLARE_DATA heartbeat_module = {
+ STANDARD20_MODULE_STUFF,
+ NULL, /* create per-directory config structure */
+ NULL, /* merge per-directory config structures */
+ hb_create_config, /* create per-server config structure */
+ NULL, /* merge per-server config structures */
+ hb_cmds, /* table of configuration directives */
+ hb_register_hooks /* register hooks */
+};
View
551 modules/cluster/mod_heartmonitor.c
@@ -0,0 +1,551 @@
+/* Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "httpd.h"
+#include "http_config.h"
+#include "http_log.h"
+#include "apr_strings.h"
+#include "apr_hash.h"
+#include "ap_mpm.h"
+#include "scoreboard.h"
+
+module AP_MODULE_DECLARE_DATA heartmonitor_module; /* forward declaration; defined at the end of this file */
+
+typedef struct hm_server_t
+{ /* last known state of one heartbeat sender */
+ const char *ip; /* sender address as a string; also the hash key */
+ int busy; /* busy= value of the last packet */
+ int ready; /* ready= value of the last packet */
+ apr_time_t seen; /* apr_time_now() when the last packet arrived; 0 if none yet */
+} hm_server_t;
+
+typedef struct hm_ctx_t
+{ /* per-server state of mod_heartmonitor */
+ int active; /* set to 1 by the HeartbeatListen directive handler */
+ const char *storage_path; /* file the server list is written to */
+ apr_proc_mutex_t *mutex; /* cross-process lock the worker takes before listening */
+ const char *mutex_path; /* file name backing the process mutex */
+ apr_sockaddr_t *mcast_addr; /* multicast address to bind and join */
+ int status; /* -1 before the worker starts, 0 once it runs, else an APR error */
+ int keep_running; /* worker loops while non-zero; cleared by the pool cleanup */
+ apr_thread_mutex_t *start_mtx; /* startup handshake between starter and worker */
+ apr_thread_t *thread; /* the listener thread, joined by the pool cleanup */
+ apr_socket_t *sock; /* listening UDP socket, created in hm_listen */
+ apr_pool_t *p; /* pool of the worker thread; owns sock and servers */
+ apr_hash_t *servers; /* ip string -> hm_server_t */
+} hm_ctx_t;
+
+/*
+ * Create the non-blocking UDP socket in ctx->sock, bind it to
+ * ctx->mcast_addr, join that multicast group, enable multicast loopback and
+ * create the empty ctx->servers hash.
+ *
+ * Everything is allocated from ctx->p.
+ *
+ * Returns APR_SUCCESS, or the APR error of the first failing step after
+ * logging it at CRIT level.
+ */
+static apr_status_t hm_listen(hm_ctx_t *ctx)
+{
+    apr_status_t rv;
+
+    rv = apr_socket_create(&ctx->sock, ctx->mcast_addr->family,
+                           SOCK_DGRAM, APR_PROTO_UDP, ctx->p);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to create listening socket.");
+        return rv;
+    }
+
+    rv = apr_socket_opt_set(ctx->sock, APR_SO_REUSEADDR, 1);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to set APR_SO_REUSEADDR to 1 on socket.");
+        return rv;
+    }
+
+    rv = apr_socket_opt_set(ctx->sock, APR_SO_NONBLOCK, 1);
+    if (rv) {
+        /* Name the option that actually failed. */
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to set APR_SO_NONBLOCK to 1 on socket.");
+        return rv;
+    }
+
+    rv = apr_socket_bind(ctx->sock, ctx->mcast_addr);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to bind on socket.");
+        return rv;
+    }
+
+    rv = apr_mcast_join(ctx->sock, ctx->mcast_addr, NULL, NULL);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to join multicast group");
+        return rv;
+    }
+
+    rv = apr_mcast_loopback(ctx->sock, 1);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Failed to accept localhost mulitcast on socket.");
+        return rv;
+    }
+
+    ctx->servers = apr_hash_make(ctx->p);
+
+    return APR_SUCCESS;
+}
+
+/*
+ * Split a query-string style message ("k1=v1&k2=v2...") into a table.
+ *
+ * input: the message; NULL is accepted and leaves parms untouched.
+ * parms: table receiving one entry per '&'-separated field.
+ * p:     pool for the working copy of input.
+ *
+ * Keys and values are URL-unescaped. A field without '=' is stored with the
+ * value "1".
+ */
+static void qs_to_table(const char *input, apr_table_t *parms,
+                        apr_pool_t *p)
+{
+    char *key;
+    char *query_string;
+    char *strtok_state;
+
+    if (input == NULL) {
+        return;
+    }
+
+    query_string = apr_pstrdup(p, input);
+
+    for (key = apr_strtok(query_string, "&", &strtok_state);
+         key != NULL;
+         key = apr_strtok(NULL, "&", &strtok_state)) {
+        const char *value = "1";
+        char *eq = strchr(key, '=');
+
+        if (eq != NULL) {
+            *eq = '\0';         /* Split the string in two */
+            eq++;               /* Skip past the = */
+        }
+
+        ap_unescape_url(key);
+
+        /* ap_unescape_url() rewrites its argument in place, so it must only
+         * see the writable copy, never the "1" string literal. */
+        if (eq != NULL) {
+            ap_unescape_url(eq);
+            value = eq;
+        }
+
+        apr_table_set(parms, key, value);
+    }
+}
+
+
+/* Servers not heard from for more than this many seconds are left out. */
+#define SEEN_TIMEOUT (30)
+
+/*
+ * Write the current server list to ctx->storage_path.
+ *
+ * The list is written to a temp file next to the target
+ * ("<storage_path>.tmp.XXXXXX"), made world-readable and then renamed over
+ * the target. One line is written per server seen within SEEN_TIMEOUT
+ * seconds; stale servers stay in the hash so their entry can be reused.
+ *
+ * ctx: module context (storage_path, servers).
+ * p:   scratch pool for the path, the file and the hash iterator.
+ *
+ * Returns APR_SUCCESS, or the APR error of the failing step after logging
+ * it. The temp file is removed when a step after its creation fails.
+ */
+static apr_status_t hm_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
+{
+    apr_status_t rv;
+    apr_file_t *fp;
+    apr_hash_index_t *hi;
+    apr_time_t now;
+    char *path = apr_pstrcat(p, ctx->storage_path, ".tmp.XXXXXX", NULL);
+
+    rv = apr_file_mktemp(&fp, path, APR_CREATE | APR_WRITE, p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to open tmp file: %s", path);
+        return rv;
+    }
+
+    now = apr_time_now();
+    for (hi = apr_hash_first(p, ctx->servers);
+         hi != NULL; hi = apr_hash_next(hi)) {
+        hm_server_t *s = NULL;
+        apr_uint32_t seen;
+
+        apr_hash_this(hi, NULL, NULL, (void **) &s);
+        seen = (apr_uint32_t) apr_time_sec(now - s->seen);
+        if (seen > SEEN_TIMEOUT) {
+            /*
+             * Skip this entry from the heartbeat file -- when it comes back,
+             * we will reuse the memory...
+             */
+            continue;
+        }
+
+        apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u\n",
+                        s->ip, s->ready, s->busy, seen);
+    }
+
+    rv = apr_file_close(fp);
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to close tmp file: %s", path);
+        apr_file_remove(path, p);
+        return rv;
+    }
+
+    rv = apr_file_perms_set(path,
+                            APR_FPROT_UREAD | APR_FPROT_GREAD |
+                            APR_FPROT_WREAD);
+    if (rv && rv != APR_INCOMPLETE) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to set file permssions on %s",
+                     path);
+        apr_file_remove(path, p);
+        return rv;
+    }
+
+    rv = apr_file_rename(path, ctx->storage_path, p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to move file: %s -> %s", path,
+                     ctx->storage_path);
+        apr_file_remove(path, p);
+        return rv;
+    }
+
+    return APR_SUCCESS;
+}
+
+/*
+ * Look up the record for a sender address, creating a zeroed one on first
+ * sight.
+ *
+ * ctx: module context holding the servers hash.
+ * ip:  sender address as a string; copied into ctx->p for new records.
+ *
+ * Returns the existing or newly inserted record.
+ */
+static hm_server_t *hm_get_server(hm_ctx_t *ctx, const char *ip)
+{
+    hm_server_t *entry = apr_hash_get(ctx->servers, ip, APR_HASH_KEY_STRING);
+
+    if (entry != NULL) {
+        return entry;
+    }
+
+    entry = apr_palloc(ctx->p, sizeof(hm_server_t));
+    entry->ip = apr_pstrdup(ctx->p, ip);
+    entry->ready = 0;
+    entry->busy = 0;
+    entry->seen = 0;
+    apr_hash_set(ctx->servers, entry->ip, APR_HASH_KEY_STRING, entry);
+
+    return entry;
+}
+
+/* Largest datagram payload accepted; the buffer adds one byte for the NUL. */
+#define MAX_MSG_LEN (1000)
+
+/*
+ * Receive one heartbeat datagram from ctx->sock and record it.
+ *
+ * A message must carry the v, busy and ready fields; the sender's record is
+ * then updated with the busy/ready counts and the current time. Anything
+ * else is logged as malformed.
+ *
+ * ctx: module context (socket and server table).
+ * p:   scratch pool for parsing.
+ *
+ * Returns APR_SUCCESS when a datagram was read (well-formed or not) or the
+ * read would block; otherwise the error from apr_socket_recvfrom().
+ */
+static apr_status_t hm_recv(hm_ctx_t *ctx, apr_pool_t *p)
+{
+    char buf[MAX_MSG_LEN + 1];
+    apr_size_t len = MAX_MSG_LEN;
+    apr_sockaddr_t from;
+    apr_table_t *fields;
+    const char *busy_str;
+    const char *ready_str;
+    hm_server_t *srv;
+    char *ip;
+    apr_status_t rv;
+
+    from.pool = p;
+
+    rv = apr_socket_recvfrom(&from, ctx->sock, 0, buf, &len);
+
+    if (APR_STATUS_IS_EAGAIN(rv)) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: would block");
+        return APR_SUCCESS;
+    }
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: recvfrom failed");
+        return rv;
+    }
+
+    buf[len] = '\0';
+
+    fields = apr_table_make(p, 10);
+    qs_to_table(buf, fields, p);
+
+    busy_str = apr_table_get(fields, "busy");
+    ready_str = apr_table_get(fields, "ready");
+
+    if (apr_table_get(fields, "v") == NULL
+        || busy_str == NULL || ready_str == NULL) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: malformed multicast message from %pI",
+                     &from);
+        return rv;
+    }
+
+    /* NOTE(review): the original marks this debug line for removal before
+     * production use. */
+    ap_log_error(APLOG_MARK, APLOG_DEBUG, rv, NULL,
+                 "Heartmonitor: %pI busy=%s ready=%s", &from,
+                 busy_str, ready_str);
+
+    apr_sockaddr_ip_get(&ip, &from);
+
+    srv = hm_get_server(ctx, ip);
+    srv->busy = atoi(busy_str);
+    srv->ready = atoi(ready_str);
+    srv->seen = apr_time_now();
+
+    return rv;
+}
+
+
+/*
+ * Fallback used only when the included headers do not already define
+ * apr_time_from_msec. The argument is parenthesised and widened to
+ * apr_time_t so that expressions such as apr_time_from_msec(a + b) expand
+ * correctly.
+ */
+#ifndef apr_time_from_msec
+#define apr_time_from_msec(x) ((apr_time_t)(x) * 1000)
+#endif
+
+/*
+ * Thread entry point of the heartbeat listener.
+ *
+ * thd:  the thread running this function; its pool becomes ctx->p.
+ * data: the hm_ctx_t of the server.
+ *
+ * The worker first releases ctx->start_mtx so that start_hm_worker() can
+ * return, then polls the process mutex every 200ms until it obtains it (or
+ * is told to stop). Holding the mutex, it opens the multicast socket and
+ * loops: poll the socket for up to one second, record any datagram, and
+ * rewrite the storage file once more than 5 seconds have passed since the
+ * last rewrite.
+ *
+ * Returns NULL; the thread's exit status is APR_SUCCESS or the error from
+ * hm_listen().
+ */
+static void *hm_worker(apr_thread_t *thd, void *data)
+{
+    hm_ctx_t *ctx = (hm_ctx_t *) data;
+    apr_status_t rv;
+    apr_time_t last;
+    int have_lock = 0;
+
+    ctx->p = apr_thread_pool_get(thd);
+    ctx->status = 0;
+    ctx->keep_running = 1;
+    apr_thread_mutex_unlock(ctx->start_mtx);
+
+    while (ctx->keep_running) {
+        rv = apr_proc_mutex_trylock(ctx->mutex);
+        if (rv == APR_SUCCESS) {
+            have_lock = 1;
+            break;
+        }
+        apr_sleep(apr_time_from_msec(200));
+    }
+
+    if (!have_lock) {
+        /* Told to stop before the lock was obtained: there is nothing to
+         * listen on and no mutex to release. */
+        apr_thread_exit(thd, APR_SUCCESS);
+        return NULL;
+    }
+
+    rv = hm_listen(ctx);
+
+    if (rv) {
+        ctx->status = rv;
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: Unable to listen for connections!");
+        apr_proc_mutex_unlock(ctx->mutex);
+        apr_thread_exit(thd, rv);
+        return NULL;
+    }
+
+    last = apr_time_now();
+    while (ctx->keep_running) {
+        int n;
+        apr_pool_t *p;
+        apr_pollfd_t pfd;
+        apr_interval_time_t timeout;
+        apr_time_t now;
+
+        apr_pool_create(&p, ctx->p);
+
+        now = apr_time_now();
+
+        if (apr_time_sec((now - last)) > 5) {
+            hm_update_stats(ctx, p);
+            apr_pool_clear(p);
+            last = now;
+        }
+
+        pfd.desc_type = APR_POLL_SOCKET;
+        pfd.desc.s = ctx->sock;
+        pfd.p = p;
+        pfd.reqevents = APR_POLLIN;
+        pfd.rtnevents = 0;
+
+        timeout = apr_time_from_sec(1);
+
+        rv = apr_poll(&pfd, 1, &n, timeout);
+
+        /* Skip the read on shutdown or poll failure/timeout; the loop
+         * condition ends the loop when keep_running was cleared. */
+        if (ctx->keep_running && rv == APR_SUCCESS
+            && (pfd.rtnevents & APR_POLLIN)) {
+            hm_recv(ctx, p);
+        }
+
+        /* Destroyed on every path so no per-iteration pool is left over. */
+        apr_pool_destroy(p);
+    }
+
+    apr_proc_mutex_unlock(ctx->mutex);
+
+    /* Use thd rather than ctx->thread: the starter stores ctx->thread only
+     * after apr_thread_create() returns, which may be later than this. */
+    apr_thread_exit(thd, APR_SUCCESS);
+
+    return NULL;
+}
+
+/*
+ * Pool cleanup registered by start_hm_worker(): asks the listener to stop
+ * and waits for it.
+ *
+ * baton: the hm_ctx_t whose worker thread is to be joined.
+ *
+ * Returns the join error if apr_thread_join() fails, otherwise the exit
+ * status reported by the worker thread.
+ */
+static apr_status_t hm_pool_cleanup(void *baton)
+{
+    hm_ctx_t *ctx = (hm_ctx_t *) baton;
+    apr_status_t thread_rv = APR_SUCCESS;
+    apr_status_t rv;
+
+    ctx->keep_running = 0;
+
+    rv = apr_thread_join(&thread_rv, ctx->thread);
+    if (rv != APR_SUCCESS) {
+        /* thread_rv was not filled in; report the join failure instead. */
+        return rv;
+    }
+
+    return thread_rv;
+}
+
+/*
+ * Create the listener thread and wait until it has started.
+ *
+ * p:   pool that owns the thread, the startup mutex and the cleanup.
+ * ctx: module context; ctx->status receives the APR error on failure and is
+ *      set to 0 by the worker itself once it runs.
+ *
+ * The startup handshake uses ctx->start_mtx: it is locked here before the
+ * thread is created and unlocked by the worker, so the second lock below
+ * blocks until the worker is up.
+ */
+static void start_hm_worker(apr_pool_t *p, hm_ctx_t *ctx)
+{
+    apr_status_t rv;
+
+    rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED,
+                                 p);
+
+    if (rv) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: apr_thread_mutex_create failed");
+        ctx->status = rv;
+        return;
+    }
+
+    apr_thread_mutex_lock(ctx->start_mtx);
+
+    apr_pool_cleanup_register(p, ctx, hm_pool_cleanup, apr_pool_cleanup_null);
+
+    rv = apr_thread_create(&ctx->thread, NULL, hm_worker, ctx, p);
+    if (rv) {
+        apr_pool_cleanup_kill(p, ctx, hm_pool_cleanup);
+        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL,
+                     "Heartmonitor: apr_thread_create failed");
+        ctx->status = rv;
+
+        /* No worker exists to release start_mtx; locking it again here
+         * would block forever, so tear the handshake down and bail out. */
+        apr_thread_mutex_unlock(ctx->start_mtx);
+        apr_thread_mutex_destroy(ctx->start_mtx);
+        return;
+    }
+
+    /* Blocks until hm_worker() has unlocked start_mtx. */
+    apr_thread_mutex_lock(ctx->start_mtx);
+    apr_thread_mutex_unlock(ctx->start_mtx);
+    apr_thread_mutex_destroy(ctx->start_mtx);
+}
+
+/*
+ * child_init hook: when HeartbeatListen was configured, re-initialise the
+ * process mutex in the child and start the listener thread.
+ *
+ * Without HeartbeatListen there is no multicast address to log or listen
+ * on, so nothing is started.
+ *
+ * p: child pool; owns the listener thread.
+ * s: server whose module config holds the hm_ctx_t.
+ */
+static void hm_child_init(apr_pool_t *p, server_rec *s)
+{
+    hm_ctx_t *ctx =
+        ap_get_module_config(s->module_config, &heartmonitor_module);
+
+    if (!ctx->active) {
+        return;
+    }
+
+    apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p);
+
+    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s,
+                 "Heartmonitor: Starting Listener Thread. mcast=%pI",
+                 ctx->mcast_addr);
+
+    /* The worker resets this to 0 once it is running. */
+    ctx->status = -1;
+
+    start_hm_worker(p, ctx);
+
+    if (ctx->status != 0) {
+        ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s,
+                     "Heartmonitor: Failed to start listener thread.");
+    }
+}
+
+/*
+ * post_config hook: create the cross-process mutex at ctx->mutex_path.
+ *
+ * The mutex uses fcntl locking when APR offers it, otherwise flock; the
+ * build fails on platforms offering neither.
+ *
+ * Returns OK on success, !OK when the mutex cannot be created.
+ */
+static int hm_post_config(apr_pool_t *p, apr_pool_t *plog,
+                          apr_pool_t *ptemp, server_rec *s)
+{
+    hm_ctx_t *ctx = ap_get_module_config(s->module_config,
+                                         &heartmonitor_module);
+    apr_status_t rv;
+#if APR_HAS_FCNTL_SERIALIZE
+    const apr_lockmech_e mech = APR_LOCK_FCNTL;
+#elif APR_HAS_FLOCK_SERIALIZE
+    const apr_lockmech_e mech = APR_LOCK_FLOCK;
+#else
+#error port me to a non crap platform.
+#endif
+
+    rv = apr_proc_mutex_create(&ctx->mutex, ctx->mutex_path, mech, p);
+    if (rv == APR_SUCCESS) {
+        return OK;
+    }
+
+    ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
+                 "Heartmonitor: Failed to create listener "
+                 "mutex at %s (type=%s)", ctx->mutex_path,
+                 apr_proc_mutex_defname());
+    return !OK;
+}
+
+/*
+ * Register this module's hooks: hm_post_config runs at post_config and
+ * hm_child_init at child_init, both at APR_HOOK_MIDDLE.
+ */
+static void hm_register_hooks(apr_pool_t *p)
+{
+    ap_hook_post_config(hm_post_config, NULL, NULL, APR_HOOK_MIDDLE);
+
+    ap_hook_child_init(hm_child_init, NULL, NULL, APR_HOOK_MIDDLE);
+}
+
+/*
+ * Create the per-server configuration.
+ *
+ * The structure is zero-filled so that every field has a defined value even
+ * when no directive runs. storage_path defaults to "logs/hb.dat" under the
+ * server root and mutex_path to the same name with the ".hm-lock" suffix
+ * that the HeartbeatStorage handler uses, so hm_post_config() never reads
+ * an unset path.
+ *
+ * Returns the new hm_ctx_t, allocated from p.
+ */
+static void *hm_create_config(apr_pool_t *p, server_rec *s)
+{
+    hm_ctx_t *ctx = apr_pcalloc(p, sizeof(*ctx));
+
+    ctx->active = 0;
+    ctx->storage_path = ap_server_root_relative(p, "logs/hb.dat");
+    ctx->mutex_path = ap_server_root_relative(p, "logs/hb.dat.hm-lock");
+
+    return ctx;
+}
+
+/*
+ * Handler for the HeartbeatStorage directive (global context only).
+ *
+ * path: absolute path, or path relative to the server root, of the file the
+ *       server list is written to.
+ *
+ * Sets ctx->storage_path and derives ctx->mutex_path by appending
+ * ".hm-lock". The context is only modified when both paths resolve.
+ *
+ * Returns NULL on success or an error string.
+ */
+static const char *cmd_hm_storage(cmd_parms *cmd,
+                                  void *dconf, const char *path)
+{
+    apr_pool_t *p = cmd->pool;
+    hm_ctx_t *ctx =
+        (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
+                                          &heartmonitor_module);
+    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+    const char *storage;
+    const char *lock;
+
+    if (err != NULL) {
+        return err;
+    }
+
+    storage = ap_server_root_relative(p, path);
+    lock = ap_server_root_relative(p, apr_pstrcat(p, path, ".hm-lock", NULL));
+
+    /* ap_server_root_relative() yields NULL for a path it cannot merge. */
+    if (storage == NULL || lock == NULL) {
+        return apr_pstrcat(p, "HeartbeatStorage: Invalid path ", path, NULL);
+    }
+
+    ctx->storage_path = storage;
+    ctx->mutex_path = lock;
+
+    return NULL;
+}
+
+/*
+ * Handler for the HeartbeatListen directive (global context only).
+ *
+ * mcast_addr: "host:port" of the multicast group; both parts are required.
+ *
+ * Marks the module active and resolves the address into ctx->mcast_addr.
+ *
+ * Returns NULL on success or a static error string.
+ */
+static const char *cmd_hm_listen(cmd_parms *cmd,
+                                 void *dconf, const char *mcast_addr)
+{
+    apr_pool_t *p = cmd->pool;
+    hm_ctx_t *ctx = ap_get_module_config(cmd->server->module_config,
+                                         &heartmonitor_module);
+    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
+    char *host_str;
+    char *scope_id;
+    apr_port_t port = 0;
+
+    if (err != NULL) {
+        return err;
+    }
+
+    ctx->active = 1;
+
+    if (apr_parse_addr_port(&host_str, &scope_id, &port, mcast_addr, p)
+        != APR_SUCCESS) {
+        return "HeartbeatListen: Unable to parse multicast address.";
+    }
+
+    if (host_str == NULL) {
+        return "HeartbeatListen: No host provided in multicast address";
+    }
+
+    if (port == 0) {
+        return "HeartbeatListen: No port provided in multicast address";
+    }
+
+    if (apr_sockaddr_info_get(&ctx->mcast_addr, host_str, APR_INET, port, 0,
+                              p) != APR_SUCCESS) {
+        return
+            "HeartbeatListen: apr_sockaddr_info_get failed on multicast address";
+    }
+
+    return NULL;
+}
+
+static const command_rec hm_cmds[] = { /* configuration directives of mod_heartmonitor */
+ AP_INIT_TAKE1("HeartbeatListen", cmd_hm_listen, NULL, RSRC_CONF,
+ "Address to listen for heartbeat requests"),
+ AP_INIT_TAKE1("HeartbeatStorage", cmd_hm_storage, NULL, RSRC_CONF,
+ "Path to store heartbeat data."),
+ {NULL} /* terminating entry */
+};
+
+module AP_MODULE_DECLARE_DATA heartmonitor_module = {
+ STANDARD20_MODULE_STUFF,
+ NULL, /* create per-directory config structure */
+ NULL, /* merge per-directory config structures */
+ hm_create_config, /* create per-server config structure */
+ NULL, /* merge per-server config structures */
+ hm_cmds, /* table of configuration directives */
+ hm_register_hooks /* register hooks */
+};

0 comments on commit 86f171a

Please sign in to comment.