Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge remote branch 'upstream/master'

Conflicts:
	buildconf/default.ini
  • Loading branch information...
commit df435412a0fa0b4efb6248b60ac83d97e8a2fdba 2 parents c148b9c + eb0e9e7
@git001 authored
Showing with 4,133 additions and 2,238 deletions.
  1. +397 −0 apache2/mod_proxy_uwsgi.c
  2. +16 −1 apache2/mod_uwsgi.c
  3. +17 −10 async.c
  4. +45 −0 buildconf/base.ini
  5. +3 −0  buildconf/cgi.ini
  6. +2 −1  buildconf/core.ini
  7. +2 −49 buildconf/default.ini
  8. +2 −1  buildconf/django.ini
  9. +2 −1  buildconf/embedded.ini
  10. +2 −2 buildconf/erlang.ini
  11. +2 −2 buildconf/gevent.ini
  12. +1 −1  buildconf/lib.ini
  13. +2 −2 buildconf/lua.ini
  14. +2 −1  buildconf/luap.ini
  15. +1 −1  buildconf/modular.ini
  16. +2 −2 buildconf/package.ini
  17. +3 −0  buildconf/php.ini
  18. +2 −2 buildconf/psgi.ini
  19. +2 −2 buildconf/pyerl.ini
  20. +2 −2 buildconf/pylua.ini
  21. +2 −4 buildconf/pypy.ini
  22. +2 −2 buildconf/pyuwsgi.ini
  23. +2 −2 buildconf/rack.ini
  24. +0 −4 buildconf/upcode.ini
  25. +100 −0 contrib/uwsgisubscribers.ru
  26. +48 −37 emperor.c
  27. +51 −0 event.c
  28. +3 −2 gateway.c
  29. +19 −0 install.sh
  30. +26 −6 lock.c
  31. +21 −10 master.c
  32. +112 −77 master_utils.c
  33. +857 −0 plugins/corerouter/corerouter.c
  34. +188 −0 plugins/corerouter/cr.h
  35. +50 −38 lib/corerouter.h → plugins/corerouter/cr_common.c
  36. +198 −0 plugins/corerouter/cr_map.c
  37. 0  plugins/{fastrouter/fr_sctp.c → corerouter/cr_sctp.c}
  38. +7 −0 plugins/corerouter/uwsgiplugin.py
  39. +39 −776 plugins/fastrouter/fastrouter.c
  40. +8 −152 plugins/fastrouter/fr.h
  41. +180 −127 plugins/fastrouter/fr_events.c
  42. +0 −190 plugins/fastrouter/fr_map.c
  43. +1 −1  plugins/fastrouter/uwsgiplugin.py
  44. +0 −5 plugins/graylog2/graylog2_plugin.c
  45. +487 −594 plugins/http/http.c
  46. +1 −1  plugins/php/uwsgiplugin.py
  47. +2 −0  plugins/psgi/psgi.h
  48. +19 −2 plugins/psgi/psgi_loader.c
  49. +34 −0 plugins/psgi/psgi_plugin.c
  50. +15 −6 plugins/psgi/psgi_response.c
  51. +4 −0 plugins/psgi/uwsgiplugin.py
  52. +137 −14 plugins/python/python_plugin.c
  53. +3 −1 plugins/python/uwsgi_pymodule.c
  54. +5 −1 plugins/python/uwsgi_python.h
  55. +6 −1 plugins/python/web3_subhandler.c
  56. +9 −4 plugins/python/wsgi_handlers.c
  57. +123 −45 plugins/python/wsgi_headers.c
  58. +26 −0 plugins/python/wsgi_subhandler.c
  59. +6 −2 plugins/rack/rack_plugin.c
  60. +162 −0 plugins/redislog/redislog_plugin.c
  61. +6 −0 plugins/redislog/uwsgiplugin.py
  62. +0 −5 plugins/rsyslog/rsyslog_plugin.c
  63. +0 −5 plugins/syslog/syslog_plugin.c
  64. +2 −2 plugins/zergpool/zergpool.c
  65. +32 −15 protocol.c
  66. +34 −0 socket.c
  67. +214 −0 stats.c
  68. +106 −0 utils.c
  69. +192 −19 uwsgi.c
  70. +63 −3 uwsgi.h
  71. +22 −4 uwsgiconfig.py
  72. +2 −1  welcome.py
View
397 apache2/mod_proxy_uwsgi.c
@@ -0,0 +1,397 @@
+#define APR_WANT_MEMFUNC
+#define APR_WANT_STRFUNC
+#include "apr_strings.h"
+#include "apr_hooks.h"
+#include "apr_optional_hooks.h"
+#include "apr_buckets.h"
+
+#include "httpd.h"
+#include "http_config.h"
+#include "http_log.h"
+#include "http_protocol.h"
+#include "http_request.h"
+#include "util_script.h"
+
+#include "mod_proxy.h"
+
+
+#define UWSGI_SCHEME "uwsgi"
+#define UWSGI_DEFAULT_PORT 3031
+
+module AP_MODULE_DECLARE_DATA proxy_uwsgi_module;
+
+
+static int uwsgi_canon(request_rec *r, char *url)
+{
+ char *host, sport[sizeof(":65535")];
+ const char *err, *path;
+ apr_port_t port = UWSGI_DEFAULT_PORT;
+
+ if (strncasecmp(url, UWSGI_SCHEME "://", sizeof(UWSGI_SCHEME) + 2)) {
+ return DECLINED;
+ }
+ url += sizeof(UWSGI_SCHEME); /* Keep slashes */
+
+ err = ap_proxy_canon_netloc(r->pool, &url, NULL, NULL, &host, &port);
+ if (err) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+ "error parsing URL %s: %s", url, err);
+ return HTTP_BAD_REQUEST;
+ }
+
+ apr_snprintf(sport, sizeof(sport), ":%u", port);
+
+ if (ap_strchr(host, ':')) { /* if literal IPv6 address */
+ host = apr_pstrcat(r->pool, "[", host, "]", NULL);
+ }
+
+ path = ap_proxy_canonenc(r->pool, url, strlen(url), enc_path, 0,
+ r->proxyreq);
+ if (!path) {
+ return HTTP_BAD_REQUEST;
+ }
+
+ r->filename = apr_pstrcat(r->pool, "proxy:" UWSGI_SCHEME "://", host, sport, "/",
+ path, NULL);
+
+ return OK;
+}
+
+
+static int uwsgi_send(proxy_conn_rec *conn, const char *buf, apr_size_t length,
+ request_rec *r)
+{
+ apr_status_t rv;
+ apr_size_t written;
+
+ while (length > 0) {
+ written = length;
+ if ((rv = apr_socket_send(conn->sock, buf, &written)) != APR_SUCCESS) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r,
+ "sending data to %s:%u failed",
+ conn->hostname, conn->port);
+ return HTTP_SERVICE_UNAVAILABLE;
+ }
+
+ /* count for stats */
+ conn->worker->s->transferred += written;
+ buf += written;
+ length -= written;
+ }
+
+ return OK;
+}
+
+
+/*
+ * Send uwsgi header block
+ */
+static int uwsgi_send_headers(request_rec *r, proxy_conn_rec *conn)
+{
+ char *buf, *ptr;
+
+ const apr_array_header_t *env_table;
+ const apr_table_entry_t *env;
+
+ int j;
+
+ apr_size_t headerlen = 4;
+ uint16_t pktsize, keylen, vallen;
+
+ ap_add_common_vars(r);
+ ap_add_cgi_vars(r);
+
+ const char *script_name = apr_table_get(r->subprocess_env, "SCRIPT_NAME");
+ const char *path_info = apr_table_get(r->subprocess_env, "PATH_INFO");
+ if (script_name && path_info) {
+ apr_table_set(r->subprocess_env, "SCRIPT_NAME", apr_pstrndup(r->pool, script_name, strlen(script_name)-strlen(path_info)));
+ }
+
+ env_table = apr_table_elts(r->subprocess_env);
+ env = (apr_table_entry_t *)env_table->elts;
+
+ for (j = 0; j < env_table->nelts; ++j) {
+ headerlen += 2 + strlen(env[j].key) + 2 + strlen(env[j].val) ;
+ }
+
+ ptr = buf = apr_palloc(r->pool, headerlen);
+
+ ptr+=4;
+
+ for (j = 0; j < env_table->nelts; ++j) {
+ keylen = strlen(env[j].key);
+ *ptr++= (uint8_t) (keylen & 0xff);
+ *ptr++= (uint8_t) ((keylen >> 8) & 0xff);
+ memcpy(ptr, env[j].key, keylen) ; ptr+=keylen;
+
+ vallen = strlen(env[j].val);
+ *ptr++= (uint8_t) (vallen & 0xff);
+ *ptr++= (uint8_t) ((vallen >> 8) & 0xff);
+ memcpy(ptr, env[j].val, vallen) ; ptr+=vallen;
+ }
+
+ pktsize = headerlen-4;
+
+ buf[0] = 0;
+ buf[1] = (uint8_t) (pktsize & 0xff);
+ buf[2] = (uint8_t) ((pktsize >> 8) & 0xff);
+ buf[0] = 0;
+
+ return uwsgi_send(conn, buf, headerlen, r);
+}
+
+
+static int uwsgi_send_body(request_rec *r, proxy_conn_rec *conn)
+{
+ if (ap_should_client_block(r)) {
+ char *buf = apr_palloc(r->pool, AP_IOBUFSIZE);
+ int status;
+ apr_size_t readlen;
+
+ readlen = ap_get_client_block(r, buf, AP_IOBUFSIZE);
+ while (readlen > 0) {
+ status = uwsgi_send(conn, buf, readlen, r);
+ if (status != OK) {
+ return HTTP_SERVICE_UNAVAILABLE;
+ }
+ readlen = ap_get_client_block(r, buf, AP_IOBUFSIZE);
+ }
+ if (readlen == -1) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+ "receiving request body failed");
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+ }
+
+ return OK;
+}
+
+static
+apr_status_t ap_proxygetline(apr_bucket_brigade *bb, char *s, int n, request_rec *r,
+ int fold, int *writen)
+{
+ char *tmp_s = s;
+ apr_status_t rv;
+ apr_size_t len;
+
+ rv = ap_rgetline(&tmp_s, n, &len, r, fold, bb);
+ apr_brigade_cleanup(bb);
+
+ if (rv == APR_SUCCESS) {
+ *writen = (int) len;
+ } else if (rv == APR_ENOSPC) {
+ *writen = n;
+ } else {
+ *writen = -1;
+ }
+
+ return rv;
+}
+
+static int uwsgi_response(request_rec *r, proxy_conn_rec *backend, proxy_server_conf *conf)
+{
+
+ char buffer[HUGE_STRING_LEN];
+ const char *buf;
+ char *value, *end;
+ int len;
+ apr_status_t rc;
+ conn_rec *c = r->connection;
+ apr_off_t readbytes;
+ apr_status_t rv;
+ apr_bucket *e;
+ apr_read_type_e mode = APR_NONBLOCK_READ;
+
+ request_rec *rp = ap_proxy_make_fake_req(backend->connection, r);
+ rp->proxyreq = PROXYREQ_RESPONSE;
+
+ apr_bucket_brigade *bb = apr_brigade_create(r->pool, c->bucket_alloc);
+
+
+ rc = ap_proxygetline(bb, buffer, sizeof(buffer), rp, 0, &len);
+ if (len == 0) {
+ rc = ap_proxygetline(bb, buffer, sizeof(buffer), rp, 0, &len);
+ }
+
+ if (len <= 0) {
+ // oops
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ backend->worker->s->read += len;
+
+ if (!apr_date_checkmask(buffer, "HTTP/#.# ###*")) {
+ // oops
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ int major, minor;
+
+ major = buffer[5] - '0';
+ minor = buffer[7] - '0';
+
+ /* If not an HTTP/1 message or
+ * if the status line was > 8192 bytes
+ */
+ if ((major != 1) || (len >= sizeof(buffer)-1)) {
+ return ap_proxyerror(r, HTTP_BAD_GATEWAY,
+ apr_pstrcat(r->pool, "Corrupt status line returned by remote "
+ "server: ", buffer, NULL));
+ }
+
+ char keepchar = buffer[12];
+ buffer[12] = '\0';
+ r->status = atoi(&buffer[9]);
+
+ if (keepchar != '\0') {
+ buffer[12] = keepchar;
+ } else {
+ /* 2616 requires the space in Status-Line; the origin
+ * server may have sent one but ap_rgetline_core will
+ * have stripped it. */
+ buffer[12] = ' ';
+ buffer[13] = '\0';
+ }
+ r->status_line = apr_pstrdup(r->pool, &buffer[9]);
+
+ // start parsing headers;
+ while ((len = ap_getline(buffer, sizeof(buffer), rp, 1)) > 0) {
+ value = strchr(buffer, ':');
+ // invalid header skip
+ if (!value) continue;
+ *value = '\0';
+ ++value;
+ while (apr_isspace(*value)) ++value;
+ for (end = &value[strlen(value)-1]; end > value && apr_isspace(*end); --end) *end = '\0';
+ apr_table_add(r->headers_out, buffer, value);
+ }
+
+
+ if ((buf = apr_table_get(r->headers_out, "Content-Type"))) {
+ ap_set_content_type(r, apr_pstrdup(r->pool, buf));
+ }
+
+ for(;;) {
+ rv = ap_get_brigade(rp->input_filters, bb,
+ AP_MODE_READBYTES, mode,
+ conf->io_buffer_size);
+ if (APR_STATUS_IS_EAGAIN(rv)
+ || (rv == APR_SUCCESS && APR_BRIGADE_EMPTY(bb))) {
+ e = apr_bucket_flush_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(bb, e);
+ if (ap_pass_brigade(r->output_filters, bb) || c->aborted) {
+ break;
+ }
+ apr_brigade_cleanup(bb);
+ mode = APR_BLOCK_READ;
+ continue;
+ }
+ else if (rv == APR_EOF) {
+ break;
+ }
+ else if (rv != APR_SUCCESS) {
+ ap_proxy_backend_broke(r, bb);
+ ap_pass_brigade(r->output_filters, bb);
+ }
+
+ mode = APR_NONBLOCK_READ;
+ apr_brigade_length(bb, 0, &readbytes);
+ backend->worker->s->read += readbytes;
+ ap_pass_brigade(r->output_filters, bb);
+ apr_brigade_cleanup(bb);
+ }
+ e = apr_bucket_eos_create(c->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(bb, e);
+ ap_pass_brigade(r->output_filters, bb);
+
+ apr_brigade_cleanup(bb);
+
+ return OK;
+}
+
+static int uwsgi_handler(request_rec *r, proxy_worker *worker,
+ proxy_server_conf *conf, char *url,
+ const char *proxyname, apr_port_t proxyport)
+{
+ int status;
+ proxy_conn_rec *backend = NULL;
+ apr_pool_t *p = r->pool;
+ apr_uri_t *uri = apr_palloc(r->pool, sizeof(*uri));
+ char server_portstr[32];
+
+ if (strncasecmp(url, UWSGI_SCHEME "://", sizeof(UWSGI_SCHEME) + 2)) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ "declining URL %s", url);
+ return DECLINED;
+ }
+
+ // ADD PATH_INFO
+ apr_table_add(r->subprocess_env, "PATH_INFO", url+strlen(worker->name));
+
+
+ /* Create space for state information */
+ status = ap_proxy_acquire_connection(UWSGI_SCHEME, &backend, worker,
+ r->server);
+ if (status != OK) {
+ goto cleanup;
+ }
+ backend->is_ssl = 0;
+
+ /* Step One: Determine Who To Connect To */
+ status = ap_proxy_determine_connection(p, r, conf, worker, backend,
+ uri, &url, proxyname, proxyport,
+ server_portstr, sizeof(server_portstr));
+ if (status != OK) {
+ goto cleanup;
+ }
+
+
+ /* Step Two: Make the Connection */
+ if (ap_proxy_connect_backend(UWSGI_SCHEME, backend, worker, r->server)) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+ "failed to make connection to backend: %s:%u",
+ backend->hostname, backend->port);
+ status = HTTP_SERVICE_UNAVAILABLE;
+ goto cleanup;
+ }
+
+ /* Step Three: Create conn_rec */
+ if (!backend->connection) {
+ if ((status = ap_proxy_connection_create(UWSGI_SCHEME, backend,
+ r->connection, r->server)) != OK)
+ goto cleanup;
+ }
+
+ /* Step Four: Process the Request */
+ if ( ((status = ap_setup_client_block(r, REQUEST_CHUNKED_ERROR)) != OK)
+ || ((status = uwsgi_send_headers(r, backend)) != OK)
+ || ((status = uwsgi_send_body(r, backend)) != OK)
+ || ((status = uwsgi_response(r, backend, conf)) != OK)) {
+ goto cleanup;
+ }
+
+cleanup:
+ if (backend) {
+ backend->close = 1; /* always close the socket */
+ ap_proxy_release_connection(UWSGI_SCHEME, backend, r->server);
+ }
+ return status;
+}
+
+
+static void register_hooks(apr_pool_t *p)
+{
+ proxy_hook_scheme_handler(uwsgi_handler, NULL, NULL, APR_HOOK_FIRST);
+ proxy_hook_canon_handler(uwsgi_canon, NULL, NULL, APR_HOOK_FIRST);
+}
+
+
+module AP_MODULE_DECLARE_DATA proxy_uwsgi_module = {
+ STANDARD20_MODULE_STUFF,
+ NULL, /* create per-directory config structure */
+ NULL, /* merge per-directory config structures */
+ NULL, /* create per-server config structure */
+ NULL, /* merge per-server config structures */
+ NULL, /* command table */
+ register_hooks /* register hooks */
+};
View
17 apache2/mod_uwsgi.c
@@ -517,7 +517,22 @@ static int uwsgi_handler(request_rec *r) {
free(uwsgi_vars);
return HTTP_INTERNAL_SERVER_ERROR;
}
- apr_brigade_write(bb, NULL, NULL, buf, cnt);
+ if (!c->cgi_mode) {
+ b = apr_bucket_transient_create(buf, cnt, r->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(bb, b);
+ b = apr_bucket_flush_create(r->connection->bucket_alloc);
+ APR_BRIGADE_INSERT_TAIL(bb, b);
+ if (ap_pass_brigade(r->output_filters, bb) != APR_SUCCESS) {
+ close(uwsgi_poll.fd);
+ apr_brigade_destroy(bb);
+ free(uwsgi_vars);
+ return HTTP_INTERNAL_SERVER_ERROR;
+ }
+ apr_brigade_cleanup(bb);
+ }
+ else {
+ apr_brigade_write(bb, NULL, NULL, buf, cnt);
+ }
}
else {
// EOF
View
27 async.c
@@ -214,6 +214,7 @@ void *async_loop(void *arg1) {
if (uwsgi.signal_socket > -1) {
event_queue_add_fd_read(uwsgi.async_queue, uwsgi.signal_socket);
+ event_queue_add_fd_read(uwsgi.async_queue, uwsgi.my_signal_socket);
}
// set a default request manager
@@ -253,6 +254,12 @@ void *async_loop(void *arg1) {
// manage events
interesting_fd = event_queue_interesting_fd(events, i);
+ if (uwsgi.signal_socket > -1 && (interesting_fd == uwsgi.signal_socket || interesting_fd == uwsgi.my_signal_socket)) {
+ uwsgi_receive_signal(interesting_fd, "worker", uwsgi.mywid);
+ continue;
+ }
+
+
is_a_new_connection = 0;
// new request coming in ?
@@ -271,7 +278,7 @@ void *async_loop(void *arg1) {
uwsgi_log("async queue is full !!!\n");
last_now = now;
}
- break;;
+ break;
}
wsgi_req_setup(uwsgi.wsgi_req, uwsgi.wsgi_req->async_id, uwsgi_sock );
@@ -289,15 +296,15 @@ void *async_loop(void *arg1) {
// on linux we do not need to reset the socket to blocking state
#ifndef __linux__
- if (uwsgi.numproc > 1) {
- /* re-set blocking socket */
- if (fcntl(uwsgi.wsgi_req->poll.fd, F_SETFL, uwsgi_sock->arg) < 0) {
- uwsgi_error("fcntl()");
- uwsgi.async_queue_unused_ptr++;
- uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr] = uwsgi.wsgi_req;
- break;
- }
- }
+ /* re-set blocking socket */
+ int arg = uwsgi_sock->arg;
+ arg &= (~O_NONBLOCK);
+ if (fcntl(uwsgi.wsgi_req->poll.fd, F_SETFL, arg) < 0) {
+ uwsgi_error("fcntl()");
+ uwsgi.async_queue_unused_ptr++;
+ uwsgi.async_queue_unused[uwsgi.async_queue_unused_ptr] = uwsgi.wsgi_req;
+ break;
+ }
#endif
if (wsgi_req_async_recv(uwsgi.wsgi_req)) {
View
45 buildconf/base.ini
@@ -0,0 +1,45 @@
+[uwsgi]
+xml = auto
+ini = true
+yaml = true
+json = auto
+sqlite3 = auto
+zeromq = auto
+snmp = true
+sctp = false
+spooler = true
+embedded = true
+ssl = auto
+udp = true
+multicast = true
+threading = true
+sendfile = true
+minterpreters = true
+async = true
+ldap = auto
+pcre = auto
+routing = auto
+debug = false
+unbit = false
+xml_implementation = libxml2
+yaml_implementation = auto
+malloc_implementation = libc
+extras =
+plugins =
+bin_name = uwsgi
+append_version =
+plugin_dir = .
+embedded_plugins = %(main_plugin)s, ping, cache, nagios, rrdtool, carbon, rpc, corerouter, fastrouter, http, ugreen, signal, syslog, rsyslog, logsocket, router_uwsgi, router_redirect, router_basicauth, zergpool, redislog
+as_shared_library = false
+
+locking = auto
+event = auto
+timer = auto
+filemonitor = auto
+
+blacklist =
+whitelist =
+
+embed_files =
+
+embed_config =
View
3  buildconf/cgi.ini
@@ -0,0 +1,3 @@
+[uwsgi]
+main_plugin = cgi
+inherit = base
View
3  buildconf/core.ini
@@ -1,3 +1,4 @@
[uwsgi]
-inherit = default
+main_plugin =
+inherit = base
embedded_plugins = null
View
51 buildconf/default.ini
@@ -1,50 +1,3 @@
[uwsgi]
-xml = auto
-ini = true
-yaml = true
-json = auto
-sqlite3 = auto
-zeromq = auto
-snmp = true
-sctp = false
-spooler = true
-embedded = true
-udp = true
-multicast = true
-threading = true
-sendfile = true
-minterpreters = true
-async = true
-ldap = auto
-pcre = auto
-routing = auto
-debug = false
-unbit = false
-xml_implementation = libxml2
-yaml_implementation = auto
-malloc_implementation = libc
-extras =
-additional_include_paths = /home/reddit/uwsgi-fork/libs/zeromq/include
-additional_lib_paths = /home/reddit/uwsgi-fork/libs/zeromq/lib
-plugins =
-bin_name = uwsgi
-append_version =
-plugin_dir = .
-embedded_plugins = python, ping, cache, nagios, rrdtool, carbon, rpc, fastrouter, http, ugreen, signal, syslog, rsyslog, logsocket, router_uwsgi, router_redirect, router_basicauth, zergpool, cgi
-as_shared_library = false
-
-locking = auto
-event = auto
-timer = auto
-filemonitor = auto
-
-blacklist =
-whitelist =
-
-embed_files =
-
-embed_config =
-
-[python]
-paste = true
-web3 = true
+main_plugin = python
+inherit = base
View
3  buildconf/django.ini
@@ -1,3 +1,4 @@
[uwsgi]
-inherit = default
+main_plugin = python
+inherit = base
embed_files = bootstrap3.py,django.zip,djapp.zip
View
3  buildconf/embedded.ini
@@ -1,4 +1,5 @@
[uwsgi]
-inherit = default
+main_plugin = python
+inherit = base
embed_files = embedme.py,uwsgidecorators.py
View
4 buildconf/erlang.ini
@@ -1,3 +1,3 @@
[uwsgi]
-inherit = default
-embedded_plugins = +,erlang
+main_plugin = erlang
+inherit = base
View
4 buildconf/gevent.ini
@@ -1,3 +1,3 @@
[uwsgi]
-inherit = default
-embedded_plugins = python, ping, cache, nagios, rpc, fastrouter, http, gevent
+main_plugin = python,gevent
+inherit = base
View
2  buildconf/lib.ini
@@ -1,5 +1,5 @@
[uwsgi]
-inherit = default
+inherit = base
bin_name = libuwsgi.so
embedded_plugins = null
as_shared_library = true
View
4 buildconf/lua.ini
@@ -1,3 +1,3 @@
[uwsgi]
-inherit = default
-embedded_plugins = lua
+main_plugin = lua
+inherit = base
View
3  buildconf/luap.ini
@@ -1,3 +1,4 @@
[uwsgi]
-inherit = default
+main_plugin =
+inherit = base
plugins = lua
View
2  buildconf/modular.ini
@@ -1,5 +1,5 @@
[uwsgi]
-inherit = default
+inherit = base
plugins = python, ping, rack, psgi, cache, http, fastrouter
plugin_dir = plugins
embedded_plugins = null
View
4 buildconf/package.ini
@@ -1,4 +1,4 @@
[uwsgi]
-inherit = default
+inherit = base
plugin_dir = /usr/lib/uwsgi
-embedded_plugins = null
+embedded_plugins =
View
3  buildconf/php.ini
@@ -0,0 +1,3 @@
+[uwsgi]
+main_plugin = php
+inherit = base
View
4 buildconf/psgi.ini
@@ -1,3 +1,3 @@
[uwsgi]
-inherit = default
-embedded_plugins = psgi
+main_plugin = psgi
+inherit = base
View
4 buildconf/pyerl.ini
@@ -1,3 +1,3 @@
[uwsgi]
-inherit = default
-embedded_plugins = +,erlang,pyerl
+main_plugin = python,erlang,pyerl
+inherit = base
View
4 buildconf/pylua.ini
@@ -1,3 +1,3 @@
[uwsgi]
-inherit = default
-embedded_plugins = +,lua
+main_plugin = python,lua
+inherit = base
View
6 buildconf/pypy.ini
@@ -1,5 +1,3 @@
[uwsgi]
-inherit = default
-bin_name = libuwsgi.so
-embedded_plugins = symcall
-as_shared_library = true
+main_plugin = pypy
+inherit = base
View
4 buildconf/pyuwsgi.ini
@@ -1,5 +1,5 @@
[uwsgi]
-inherit = default
+main_plugin = pyuwsgi
+inherit = base
bin_name = pyuwsgi.so
-embedded_plugins = pyuwsgi,python,ping,cache,nagios,rpc,ugreen
as_shared_library = true
View
4 buildconf/rack.ini
@@ -1,3 +1,3 @@
[uwsgi]
-inherit = default
-embedded_plugins = rack, ping, cache, nagios, rpc, fastrouter, http, ugreen
+main_plugin = rack
+inherit = base
View
4 buildconf/upcode.ini
@@ -43,7 +43,3 @@ whitelist =
embed_files =
embed_config =
-
-[python]
-paste = true
-web3 = true
View
100 contrib/uwsgisubscribers.ru
@@ -0,0 +1,100 @@
+require 'sinatra'
+require 'socket'
+require 'json'
+
+module USubscribers
+ def self.uwsgi_get_stats(server)
+ parts = server.split(':')
+ begin
+ if parts.length > 1:
+ s = TCPSocket.open(parts[0], parts[1])
+ else
+ s = UNIXSocket.open(server)
+ end
+ return JSON.parse(s.read())
+ rescue
+ nil
+ end
+ end
+end
+
+template = <<eof
+<!DOCTYPE html>
+<html>
+ <head>
+ <meta charset="utf-8">
+ <title><%=@title%></title>
+ <meta name="viewport" content="width=device-width, initial-scale=1.0">
+ <meta name="description" content="">
+ <meta name="author" content="unbit">
+ <link href="http://unbit.github.com/bootstrap/css/bootstrap.css" rel="stylesheet">
+ <style>
+ body {
+ padding-top: 60px; /* 60px to make the container go all the way to the bottom of the topbar */
+ }
+ </style>
+ <link href="http://unbit.github.com/bootstrap/css/bootstrap-responsive.css" rel="stylesheet">
+ </head>
+ <body>
+
+ <div class="navbar navbar-fixed-top">
+ <div class="navbar-inner">
+ <div class="container">
+ <a class="brand" href="#"><%=@title%></a>
+ </div>
+ </div>
+ </div>
+
+ <% for server in @servers.keys %>
+ <div class="container">
+ <hr/>
+ <h1>SubscriptionServer: <%=server%></h1>
+ <hr/>
+
+ <div class="row">
+ <% for pool in @servers[server] %>
+ <div class="span6">
+ <h3><%=pool['key']%> (<%=pool['hits']%> hits)</h3>
+ <table class="table table-striped table-bordered">
+ <thead>
+ <tr>
+ <th>node</th><th>load</th><th>requests</th>
+ <th>last check</th>
+ <th>fail count</th>
+ </tr>
+ </thead>
+ <% for node in pool['nodes'] %>
+ <tr>
+ <td><%=node['name']%></td><td><%=node['load']%></td><td><%=node['requests']%></td>
+ <td><%=Time.at(node['last_check']).strftime("%d-%m-%Y %H:%M:%S")%></td>
+ <td><%=node['failcnt']%></td>
+ </tr>
+ <% end %>
+ </table>
+ </div>
+ <% end %>
+ </div>
+
+ </div>
+
+ <% end %>
+
+ <script src="http://unbit.github.com/jquery-1.7.2.min.js" type="text/javascript"></script>
+ <script src="http://unbit.github.com/bootstrap/js/bootstrap.min.js" type="text/javascript"></script>
+ </body>
+</html>
+eof
+
+get '/' do
+ @servers = {}
+ for server in ENV['U_SERVERS'].split(',')
+ stats = USubscribers::uwsgi_get_stats(server)
+ if stats
+ @servers[server] = stats['subscriptions']
+ end
+ end
+ @title = 'uWSGI subscriptions viewer'
+ erb template
+end
+
+run Sinatra::Application
View
85 emperor.c
@@ -871,72 +871,83 @@ void emperor_loop() {
}
-#define stats_send_llu(x, y) fprintf(output, x, (long long unsigned int) y)
-#define stats_send(x, y) fprintf(output, x, y)
-
void emperor_send_stats(int fd) {
struct sockaddr_un client_src;
socklen_t client_src_len = 0;
+
int client_fd = accept(fd, (struct sockaddr *) &client_src, &client_src_len);
if (client_fd < 0) {
uwsgi_error("accept()");
return;
}
- FILE *output = fdopen(client_fd, "w");
- if (!output) {
- uwsgi_error("fdopen()");
- close(client_fd);
- return;
- }
-
- stats_send("{ \"version\": \"%s\",\n", UWSGI_VERSION);
-
- fprintf(output,"\"pid\": %d,\n", (int)(getpid()));
- fprintf(output,"\"uid\": %d,\n", (int)(getuid()));
- fprintf(output,"\"gid\": %d,\n", (int)(getgid()));
+ struct uwsgi_stats *us = uwsgi_stats_new(8192);
- char *cwd = uwsgi_get_cwd();
- stats_send("\"cwd\": \"%s\",\n", cwd);
- free(cwd);
+ if (uwsgi_stats_keyval_comma(us, "version", UWSGI_VERSION)) goto end;
+ if (uwsgi_stats_keylong_comma(us, "pid", (unsigned long long) getpid())) goto end;
+ if (uwsgi_stats_keylong_comma(us, "uid", (unsigned long long) getuid())) goto end;
+ if (uwsgi_stats_keylong_comma(us, "gid", (unsigned long long) getgid())) goto end;
- stats_send("\"emperor\": \"%s\",\n", uwsgi.emperor_dir);
+ char *cwd = uwsgi_get_cwd();
+ if (uwsgi_stats_keyval_comma(us, "cwd", cwd)) goto end0;
- fprintf(output,"\"emperor_tyrant\": %d,\n", uwsgi.emperor_tyrant);
+ if (uwsgi_stats_keyval_comma(us, "emperor", uwsgi.emperor_dir)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "emperor_tyrant", (unsigned long long) uwsgi.emperor_tyrant)) goto end0;
- fprintf(output, "\"vassals\": [\n");
+ if (uwsgi_stats_key(us ,"vassals")) goto end0;
+ if (uwsgi_stats_list_open(us)) goto end0;
struct uwsgi_instance *c_ui = ui->ui_next;
while (c_ui) {
- fprintf(output,"\t{");
- stats_send("\"id\": \"%s\", ", c_ui->name);
- fprintf(output,"\"pid\": %d, ", (int) c_ui->pid);
+ if (uwsgi_stats_object_open(us)) goto end0;
+
+ if (uwsgi_stats_keyval_comma(us, "id", c_ui->name)) goto end0;
- stats_send_llu( "\"born\": %llu, ", c_ui->born);
- stats_send_llu( "\"last_mod\": %llu, ", c_ui->last_mod);
+ if (uwsgi_stats_keylong_comma(us, "pid", (unsigned long long) c_ui->pid)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "born", (unsigned long long) c_ui->born)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "last_mod", (unsigned long long) c_ui->last_mod)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "loyal", (unsigned long long) c_ui->loyal)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "zerg", (unsigned long long) c_ui->zerg)) goto end0;
- fprintf(output,"\"loyal\": %d, ", c_ui->loyal);
- fprintf(output,"\"zerg\": %d, ", c_ui->zerg);
+ if (uwsgi_stats_keylong_comma(us, "uid", (unsigned long long) c_ui->uid)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "gid", (unsigned long long) c_ui->gid)) goto end0;
- fprintf(output,"\"uid\": %d, ", (int)c_ui->uid);
- fprintf(output,"\"gid\": %d, ", (int)c_ui->gid);
+ if (uwsgi_stats_keylong(us, "respawns", (unsigned long long) c_ui->respawns)) goto end0;
+
+ if (uwsgi_stats_object_close(us)) goto end0;
- stats_send_llu( "\"respawns\": %llu ", c_ui->respawns);
c_ui = c_ui->ui_next;
if (c_ui) {
- fprintf(output,"},\n");
- }
- else {
- fprintf(output,"}\n");
+ if (uwsgi_stats_comma(us)) goto end0;
}
}
- fprintf(output,"]}\n");
- fclose(output);
+ if (uwsgi_stats_list_close(us)) goto end0;
+ if (uwsgi_stats_object_close(us)) goto end0;
+
+ size_t remains = us->pos;
+ off_t pos = 0;
+ while(remains > 0) {
+ ssize_t res = write(client_fd, us->base + pos, remains);
+ if (res <= 0) {
+ if (res < 0) {
+ uwsgi_error("write()");
+ }
+ goto end0;
+ }
+ pos += res;
+ remains -= res;
+ }
+end0:
+ free(cwd);
+end:
+ free(us->base);
+ free(us);
+ close(client_fd);
}
View
51 event.c
@@ -42,6 +42,18 @@ int event_queue_fd_write_to_read(int eq, int fd) {
}
+int event_queue_fd_read_to_write(int eq, int fd) {
+
+ if (port_associate(eq, PORT_SOURCE_FD, fd, POLLOUT, NULL)) {
+ uwsgi_error("port_associate");
+ return -1;
+ }
+
+ return fd;
+
+}
+
+
int event_queue_interesting_fd_has_error(void *events, int id) {
port_event_t *pe = (port_event_t *) events;
@@ -216,6 +228,23 @@ int event_queue_fd_write_to_read(int eq, int fd) {
return fd;
}
+int event_queue_fd_read_to_write(int eq, int fd) {
+
+ struct epoll_event ee;
+
+ memset(&ee, 0, sizeof(struct epoll_event));
+ ee.events = EPOLLOUT;
+ ee.data.fd = fd;
+
+ if (epoll_ctl(eq, EPOLL_CTL_MOD, fd, &ee)) {
+ uwsgi_error("epoll_ctl()");
+ return -1;
+ }
+
+ return fd;
+}
+
+
int event_queue_del_fd(int eq, int fd, int event) {
struct epoll_event ee;
@@ -345,6 +374,28 @@ int event_queue_fd_write_to_read(int eq, int fd) {
return fd;
}
+int event_queue_fd_read_to_write(int eq, int fd) {
+
+ struct kevent kev;
+
+#ifndef __FreeBSD__
+ EV_SET(&kev, fd, EVFILT_READ, EV_DISABLE, 0, 0, 0);
+ if (kevent(eq, &kev, 1, NULL, 0, NULL) < 0) {
+ uwsgi_error("kevent()");
+ return -1;
+ }
+#endif
+
+ EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
+ if (kevent(eq, &kev, 1, NULL, 0, NULL) < 0) {
+ uwsgi_error("kevent()");
+ return -1;
+ }
+
+ return fd;
+}
+
+
int event_queue_del_fd(int eq, int fd, int event) {
struct kevent kev;
View
5 gateway.c
@@ -2,7 +2,7 @@
extern struct uwsgi_server uwsgi;
-struct uwsgi_gateway *register_gateway(char *name, void (*loop)(int)) {
+struct uwsgi_gateway *register_gateway(char *name, void (*loop)(int, void *), void *data) {
struct uwsgi_gateway *ug;
int num=1,i;
@@ -26,6 +26,7 @@ struct uwsgi_gateway *register_gateway(char *name, void (*loop)(int)) {
ug->loop = loop;
ug->num = num;
ug->fullname = fullname;
+ ug->data = data;
if (socketpair(AF_UNIX, SOCK_DGRAM, 0, ug->internal_subscription_pipe)) {
uwsgi_error("socketpair()");
@@ -71,7 +72,7 @@ void gateway_respawn(int id) {
signal(SIGUSR2, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
- ug->loop(id);
+ ug->loop(id, ug->data);
// never here !!! (i hope)
exit(1);
}
View
19 install.sh
@@ -0,0 +1,19 @@
+echo "*** uWSGI installer ***"
+if [ $# -ne 2 ]
+then
+ echo "Usage: install.sh <profile> <binary_path>"
+ exit 1
+fi
+
+if [ ${2:0:1} != "/" ]
+then
+ echo "uWSGI binary path must be absolute !!!"
+ exit 1
+fi
+
+echo "downloading latest uWSGI tarball..."
+curl -o uwsgi_latest_from_installer.tar.gz http://projects.unbit.it/downloads/uwsgi-latest.tar.gz
+mkdir uwsgi_latest_from_installer
+tar zvxC uwsgi_latest_from_installer --strip-components=1 -f uwsgi_latest_from_installer.tar.gz
+cd uwsgi_latest_from_installer
+UWSGI_PROFILE="$1" UWSGI_BIN_NAME="$2" make
View
32 lock.c
@@ -47,6 +47,7 @@ static struct uwsgi_lock_item *uwsgi_register_lock(char *id, int rw) {
#ifdef EOWNERDEAD
#define UWSGI_LOCK_ENGINE_NAME "pthread robust mutexes"
+int uwsgi_pthread_robust_mutexes_enabled = 1;
#else
#define UWSGI_LOCK_ENGINE_NAME "pthread mutexes"
#endif
@@ -66,30 +67,49 @@ struct uwsgi_lock_item *uwsgi_lock_fast_init(char *id) {
struct uwsgi_lock_item *uli = uwsgi_register_lock(id, 0);
+#ifdef EOWNERDEAD
+retry:
+#endif
if (pthread_mutexattr_init(&attr)) {
uwsgi_log("unable to allocate mutexattr structure\n");
exit(1);
}
+
if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) {
uwsgi_log("unable to share mutex\n");
exit(1);
}
#ifdef EOWNERDEAD
- if (pthread_mutexattr_setrobust_np(&attr, PTHREAD_MUTEX_ROBUST_NP)) {
- uwsgi_log("unable to make the mutex 'robust'\n");
- exit(1);
- }
+ if (uwsgi_pthread_robust_mutexes_enabled) {
+ if (pthread_mutexattr_setrobust_np(&attr, PTHREAD_MUTEX_ROBUST_NP)) {
+ uwsgi_log("unable to make the mutex 'robust'\n");
+ exit(1);
+ }
+ }
#endif
if (pthread_mutex_init((pthread_mutex_t *) uli->lock_ptr, &attr)) {
+#ifdef EOWNERDEAD
+ if (uwsgi_pthread_robust_mutexes_enabled) {
+ uwsgi_log("!!! it looks like your kernel does not support pthread robust mutexes !!!\n");
+ uwsgi_log("!!! falling back to standard pthread mutexes !!!\n");
+ uwsgi_pthread_robust_mutexes_enabled = 0;
+ pthread_mutexattr_destroy(&attr);
+ goto retry;
+ }
+#endif
uwsgi_log("unable to initialize mutex\n");
exit(1);
}
pthread_mutexattr_destroy(&attr);
-#ifndef EOWNERDEAD
+#ifdef EOWNERDEAD
+ if (!uwsgi_pthread_robust_mutexes_enabled) {
+ uli->can_deadlock = 1;
+ }
+#else
uli->can_deadlock = 1;
#endif
@@ -478,7 +498,7 @@ pid_t uwsgi_rwlock_ipcsem_check(struct uwsgi_lock_item *uli) { return uwsgi_lock
void uwsgi_setup_locking() {
- // use the fastest avaikable locking
+ // use the fastest available locking
if (uwsgi.lock_engine) {
if (!strcmp(uwsgi.lock_engine, "ipcsem")) {
uwsgi_log_initial("lock engine: ipcsem\n");
View
31 master.c
@@ -28,6 +28,15 @@ void suspend_resume_them_all(int signum) {
uwsgi.workers[0].suspended = 1;
}
+ // subscribe/unsubscribe if needed
+ struct uwsgi_string_list *subscriptions = uwsgi.subscriptions;
+ while(subscriptions) {
+ uwsgi_log("%s %s\n", suspend ? "unsubscribing from" : "subscribing to", subscriptions->value);
+ uwsgi_subscribe(subscriptions->value, suspend);
+ subscriptions = subscriptions->next;
+ }
+
+
for (i = 1; i <= uwsgi.numproc; i++) {
uwsgi.workers[i].suspended = suspend;
if (uwsgi.workers[i].pid > 0) {
@@ -70,15 +79,13 @@ void expire_rb_timeouts(struct rb_root *root) {
int uwsgi_master_log(void) {
- char log_buf[4096];
-
- ssize_t rlen = read(uwsgi.shared->worker_log_pipe[0], log_buf, 4096);
+ ssize_t rlen = read(uwsgi.shared->worker_log_pipe[0], uwsgi.log_master_buf, uwsgi.log_master_bufsize);
if (rlen > 0) {
if (uwsgi.choosen_logger) {
- uwsgi.choosen_logger->func(uwsgi.choosen_logger, log_buf, rlen);
+ uwsgi.choosen_logger->func(uwsgi.choosen_logger, uwsgi.log_master_buf, rlen);
}
else {
- rlen = write(uwsgi.original_log_fd, log_buf, rlen);
+ rlen = write(uwsgi.original_log_fd, uwsgi.log_master_buf, rlen);
}
// TODO allow uwsgi.logger = func
return 0;
@@ -151,10 +158,13 @@ void uwsgi_subscribe(char *subscription, uint8_t cmd) {
int keysize = 0;
char *modifier1 = NULL;
int modifier1_len = 0;
+ char *udp_address = subscription;
- char *udp_address = strchr(subscription, ':');
- if (!udp_address)
- return;
+ if (subscription[0] != '/') {
+ udp_address = strchr(subscription, ':');
+ if (!udp_address)
+ return;
+ }
char *subscription_key = strchr(udp_address + 1, ':');
if (!subscription_key)
@@ -392,6 +402,7 @@ int master_loop(char **argv, char **environ) {
}
if (uwsgi.log_master) {
+ uwsgi.log_master_buf = uwsgi_malloc(uwsgi.log_master_bufsize);
if (!uwsgi.threaded_logger) {
#ifdef UWSGI_DEBUG
uwsgi_log("adding %d to master logging\n", uwsgi.shared->worker_log_pipe[0]);
@@ -727,7 +738,7 @@ int master_loop(char **argv, char **environ) {
}
// cheaper management
- if (uwsgi.cheaper && !uwsgi.cheap && !uwsgi.to_heaven && !uwsgi.to_hell) {
+ if (uwsgi.cheaper && !uwsgi.cheap && !uwsgi.to_heaven && !uwsgi.to_hell && !uwsgi.workers[0].suspended) {
if (!uwsgi_calc_cheaper()) return 0;
}
@@ -1377,7 +1388,7 @@ int master_loop(char **argv, char **environ) {
}
// resubscribe every 10 cycles by default
- if ((uwsgi.subscriptions && ((uwsgi.master_cycles % uwsgi.subscribe_freq) == 0 || uwsgi.master_cycles == 1)) && !uwsgi.to_heaven && !uwsgi.to_hell) {
+ if ((uwsgi.subscriptions && ((uwsgi.master_cycles % uwsgi.subscribe_freq) == 0 || uwsgi.master_cycles == 1)) && !uwsgi.to_heaven && !uwsgi.to_hell && !uwsgi.workers[0].suspended) {
struct uwsgi_string_list *subscriptions = uwsgi.subscriptions;
while (subscriptions) {
uwsgi_subscribe(subscriptions->value, 0);
View
189 master_utils.c
@@ -238,6 +238,9 @@ void uwsgi_reload(char **argv) {
waitpid(WAIT_ANY, &waitpid_status, WNOHANG);
}
+ // call atexit user exec
+ uwsgi_exec_atexit();
+
if (uwsgi.exit_on_reload) {
uwsgi_log("uWSGI: GAME OVER (insert coin)\n");
exit(0);
@@ -264,6 +267,17 @@ void uwsgi_reload(char **argv) {
uwsgi_sock = uwsgi_sock->next;
}
+ uwsgi_sock = uwsgi.shared_sockets;
+ while (uwsgi_sock) {
+ if (i == uwsgi_sock->fd) {
+ uwsgi_log("found fd %d mapped to shared socket %d (%s)\n", i, uwsgi_get_shared_socket_num(uwsgi_sock), uwsgi_sock->name);
+ found = 1;
+ break;
+ }
+ uwsgi_sock = uwsgi_sock->next;
+ }
+
+
if (!found) {
if (uwsgi.has_emperor) {
if (i == uwsgi.emperor_fd) {
@@ -692,158 +706,179 @@ void uwsgi_manage_command_cron(time_t now) {
}
-#define stats_send_llu(x, y) fprintf(output, x, (long long unsigned int) y)
-#define stats_send(x, y) fprintf(output, x, y)
-
void uwsgi_send_stats(int fd) {
int i, j;
struct sockaddr_un client_src;
struct uwsgi_app *ua;
socklen_t client_src_len = 0;
+
int client_fd = accept(fd, (struct sockaddr *) &client_src, &client_src_len);
if (client_fd < 0) {
uwsgi_error("accept()");
return;
}
- FILE *output = fdopen(client_fd, "w");
- if (!output) {
- uwsgi_error("fdopen()");
- close(client_fd);
- return;
- }
+ struct uwsgi_stats *us = uwsgi_stats_new(8192);
- stats_send("{ \"version\": \"%s\",\n", UWSGI_VERSION);
+ if (uwsgi_stats_keyval_comma(us, "version", UWSGI_VERSION)) goto end;
#ifdef __linux__
- stats_send_llu("\"listen_queue\": %llu,\n", (unsigned long long) uwsgi.shared->options[UWSGI_OPTION_BACKLOG_STATUS]);
- stats_send_llu("\"listen_queue_errors\": %llu,\n", (unsigned long long) uwsgi.shared->options[UWSGI_OPTION_BACKLOG_ERRORS]);
+ if (uwsgi_stats_keylong_comma(us, "listen_queue", (unsigned long long) uwsgi.shared->options[UWSGI_OPTION_BACKLOG_STATUS])) goto end;
+ if (uwsgi_stats_keylong_comma(us, "listen_queue_errors", (unsigned long long) uwsgi.shared->options[UWSGI_OPTION_BACKLOG_ERRORS])) goto end;
#endif
- stats_send_llu("\"load\": %llu,\n", uwsgi.shared->load);
-
- fprintf(output, "\"pid\": %d,\n", (int) (getpid()));
- fprintf(output, "\"uid\": %d,\n", (int) (getuid()));
- fprintf(output, "\"gid\": %d,\n", (int) (getgid()));
+ if (uwsgi_stats_keylong_comma(us, "load", (unsigned long long) uwsgi.shared->load)) goto end;
+ if (uwsgi_stats_keylong_comma(us, "pid", (unsigned long long) getpid())) goto end;
+ if (uwsgi_stats_keylong_comma(us, "uid", (unsigned long long) getuid())) goto end;
+ if (uwsgi_stats_keylong_comma(us, "gid", (unsigned long long) getgid())) goto end;
char *cwd = uwsgi_get_cwd();
- stats_send("\"cwd\": \"%s\",\n", cwd);
- free(cwd);
+ if (uwsgi_stats_keyval_comma(us, "cwd", cwd)) goto end0;
if (uwsgi.daemons) {
- fprintf(output, "\"daemons\": [\n");
+ if (uwsgi_stats_key(us ,"daemons")) goto end0;
+ if (uwsgi_stats_list_open(us)) goto end0;
+
struct uwsgi_daemon *ud = uwsgi.daemons;
while (ud) {
- fprintf(output, "\t{ \"cmd\": \"%s\", \"pid\": %d, \"respawns\": %llu }", ud->command, (int) ud->pid, (unsigned long long) ud->respawns - 1);
- if (ud->next)
- fprintf(output, ",\n");
- else {
- fprintf(output, "\n");
+ if (uwsgi_stats_object_open(us)) goto end0;
+ if (uwsgi_stats_keyval_comma(us, "cmd", ud->command)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "pid", (unsigned long long) ud->pid)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "respawns", (unsigned long long) (ud->respawns - 1))) goto end0;
+ if (uwsgi_stats_object_close(us)) goto end0;
+ if (ud->next) {
+ if (uwsgi_stats_comma(us)) goto end0;
}
ud = ud->next;
}
- fprintf(output, "],\n");
+ if (uwsgi_stats_list_close(us)) goto end0;
+ if (uwsgi_stats_comma(us)) goto end0;
}
- fprintf(output, "\"locks\": [\n");
+ if (uwsgi_stats_key(us ,"locks")) goto end0;
+ if (uwsgi_stats_list_open(us)) goto end0;
struct uwsgi_lock_item *uli = uwsgi.registered_locks;
while (uli) {
+ if (uwsgi_stats_object_open(us)) goto end0;
+ if (uwsgi_stats_keylong(us, uli->id, (unsigned long long) uli->pid)) goto end0;
+ if (uwsgi_stats_object_close(us)) goto end0;
if (uli->next) {
- fprintf(output, "\t{ \"%s\": %d },\n", uli->id, (int) uli->pid);
- }
- else {
- fprintf(output, "\t{ \"%s\": %d }\n", uli->id, (int) uli->pid);
+ if (uwsgi_stats_comma(us)) goto end0;
}
uli = uli->next;
}
- fprintf(output, "],\n");
+ if (uwsgi_stats_list_close(us)) goto end0;
+ if (uwsgi_stats_comma(us)) goto end0;
- fprintf(output, "\"workers\": [\n");
+ if (uwsgi_stats_key(us ,"workers")) goto end0;
+ if (uwsgi_stats_list_open(us)) goto end0;
for (i = 0; i < uwsgi.numproc; i++) {
- fprintf(output, "\t{");
+ if (uwsgi_stats_object_open(us)) goto end0;
- fprintf(output, "\"id\": %d, ", uwsgi.workers[i + 1].id);
- fprintf(output, "\"pid\": %d, ", (int) uwsgi.workers[i + 1].pid);
- stats_send_llu("\"requests\": %llu, ", uwsgi.workers[i + 1].requests);
- stats_send_llu("\"delta_requests\": %llu, ", uwsgi.workers[i + 1].delta_requests);
- stats_send_llu("\"exceptions\": %llu, ", uwsgi.workers[i + 1].exceptions);
- stats_send_llu("\"signals\": %llu, ", uwsgi.workers[i + 1].signals);
- stats_send_llu("\"static_offload_threads\": %llu, ", uwsgi.workers[i + 1].static_offload_threads);
+ if (uwsgi_stats_keylong_comma(us, "id", (unsigned long long) uwsgi.workers[i + 1].id)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "pid", (unsigned long long) uwsgi.workers[i + 1].pid)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "requests", (unsigned long long) uwsgi.workers[i + 1].requests)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "delta_requests", (unsigned long long) uwsgi.workers[i + 1].delta_requests)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "exceptions", (unsigned long long) uwsgi.workers[i + 1].exceptions)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "signals", (unsigned long long) uwsgi.workers[i + 1].signals)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "static_offload_threads", (unsigned long long) uwsgi.workers[i + 1].static_offload_threads)) goto end0;
if (uwsgi.workers[i + 1].cheaped) {
- fprintf(output, "\"status\": \"cheap\", ");
+ if (uwsgi_stats_keyval_comma(us, "status", "cheap")) goto end0;
}
else if (uwsgi.workers[i + 1].suspended && !uwsgi.workers[i + 1].busy) {
- fprintf(output, "\"status\": \"pause\", ");
+ if (uwsgi_stats_keyval_comma(us, "status", "pause")) goto end0;
}
else {
if (uwsgi.workers[i + 1].sig) {
- fprintf(output, "\"status\": \"sig%d\", ", uwsgi.workers[i + 1].signum);
+ if (uwsgi_stats_keyvalnum_comma(us, "status", "sig", (unsigned long long) uwsgi.workers[i + 1].signum)) goto end0;
}
else if (uwsgi.workers[i + 1].busy) {
- fprintf(output, "\"status\": \"busy\", ");
+ if (uwsgi_stats_keyval_comma(us, "status", "busy")) goto end0;
}
else {
- fprintf(output, "\"status\": \"idle\", ");
+ if (uwsgi_stats_keyval_comma(us, "status", "idle")) goto end0;
}
}
- stats_send_llu("\"rss\": %llu, ", uwsgi.workers[i + 1].rss_size);
- stats_send_llu("\"vsz\": %llu, ", uwsgi.workers[i + 1].vsz_size);
+ if (uwsgi_stats_keylong_comma(us, "rss", (unsigned long long) uwsgi.workers[i + 1].rss_size)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "vsz", (unsigned long long) uwsgi.workers[i + 1].vsz_size)) goto end0;
+
+ if (uwsgi_stats_keylong_comma(us, "running_time", (unsigned long long) uwsgi.workers[i + 1].running_time)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "last_spwan", (unsigned long long) uwsgi.workers[i + 1].last_spawn)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "respawn_count", (unsigned long long) uwsgi.workers[i + 1].respawn_count)) goto end0;
- stats_send_llu("\"running_time\": %llu, ", uwsgi.workers[i + 1].running_time);
- stats_send_llu("\"last_spawn\": %llu, ", uwsgi.workers[i + 1].last_spawn);
- stats_send_llu("\"respawn_count\": %llu, ", uwsgi.workers[i + 1].respawn_count);
- stats_send_llu("\"tx\": %llu, ", uwsgi.workers[i + 1].tx);
- stats_send_llu("\"avg_rt\": %llu, ", uwsgi.workers[i + 1].avg_response_time);
+ if (uwsgi_stats_keylong_comma(us, "tx", (unsigned long long) uwsgi.workers[i + 1].tx)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "avg_rt", (unsigned long long) uwsgi.workers[i + 1].avg_response_time)) goto end0;
- fprintf(output, "\"apps\": [\n");
+ if (uwsgi_stats_key(us ,"apps")) goto end0;
+ if (uwsgi_stats_list_open(us)) goto end0;
+
for (j = 0; j < uwsgi.workers[i + 1].apps_cnt; j++) {
ua = &uwsgi.workers[i + 1].apps[j];
- fprintf(output, "\t\t{ ");
- fprintf(output, "\"id\": %d, ", j);
- fprintf(output, "\"modifier1\": %d, ", ua->modifier1);
- fprintf(output, "\"mountpoint\": \"%.*s\", ", ua->mountpoint_len, ua->mountpoint);
+ if (uwsgi_stats_object_open(us)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "id", (unsigned long long) j)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "modifier1", (unsigned long long) ua->modifier1)) goto end0;
+ if (uwsgi_stats_keyvaln_comma(us, "mountpoint", ua->mountpoint, ua->mountpoint_len)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "startup_time", ua->startup_time)) goto end0;
- stats_send_llu("\"startup_time\": %llu, ", ua->startup_time);
-
- stats_send_llu("\"requests\": %llu, ", ua->requests);
- stats_send_llu("\"exceptions\": %llu, ", ua->exceptions);
+ if (uwsgi_stats_keylong_comma(us, "requests", ua->requests)) goto end0;
+ if (uwsgi_stats_keylong_comma(us, "exceptions", ua->exceptions)) goto end0;
if (ua->chdir) {
- fprintf(output, "\"chdir\": \"%s\", ", ua->chdir);
+ if (uwsgi_stats_keyval(us, "chdir", ua->chdir)) goto end0;
}
else {
- fprintf(output, "\"chdir\": \"\" ");
+ if (uwsgi_stats_keyval(us, "chdir", "")) goto end0;
}
- if (j == uwsgi.workers[i + 1].apps_cnt - 1) {
- fprintf(output, "}\n");
- }
- else {
- fprintf(output, "},\n");
+ if (uwsgi_stats_object_close(us)) goto end0;
+
+ if (j < uwsgi.workers[i + 1].apps_cnt - 1) {
+ if (uwsgi_stats_comma(us)) goto end0;
}
}
- fprintf(output, "\t\t]");
- if (i == uwsgi.numproc - 1) {
- fprintf(output, "}\n");
- }
- else {
- fprintf(output, "},\n");
+ if (uwsgi_stats_list_close(us)) goto end0;
+
+ if (uwsgi_stats_object_close(us)) goto end0;
+
+ if (i < uwsgi.numproc - 1) {
+ if (uwsgi_stats_comma(us)) goto end0;
}
}
- fprintf(output, "]}\n");
- fclose(output);
+ if (uwsgi_stats_list_close(us)) goto end0;
+ if (uwsgi_stats_object_close(us)) goto end0;
+
+ size_t remains = us->pos;
+ off_t pos = 0;
+ while(remains > 0) {
+ ssize_t res = write(client_fd, us->base + pos, remains);
+ if (res <= 0) {
+ if (res < 0) {
+ uwsgi_error("write()");
+ }
+ goto end0;
+ }
+ pos += res;
+ remains -= res;
+ }
+
+end0:
+ free(cwd);
+end:
+ free(us->base);
+ free(us);
+ close(client_fd);
}
void uwsgi_register_cheaper_algo(char *name, int(*func) (void)) {
View
857 plugins/corerouter/corerouter.c
@@ -0,0 +1,857 @@
+/*
+
+ uWSGI fastrouter
+
+ requires:
+
+ - async
+ - caching
+ - pcre (optional)
+
+*/
+
+#include "../../uwsgi.h"
+
+extern struct uwsgi_server uwsgi;
+
+#include "cr.h"
+
+void uwsgi_opt_corerouter(char *opt, char *value, void *cr) {
+ struct uwsgi_corerouter *ucr = (struct uwsgi_corerouter *) cr;
+ uwsgi_new_gateway_socket(value, ucr->name);
+ ucr->has_sockets++;
+}
+
+void uwsgi_opt_corerouter_use_socket(char *opt, char *value, void *cr) {
+ struct uwsgi_corerouter *ucr = (struct uwsgi_corerouter *) cr;
+ ucr->use_socket = 1;
+ ucr->has_backends++;
+
+ if (value) {
+ ucr->socket_num = atoi(value);
+ }
+}
+
+void uwsgi_opt_corerouter_use_base(char *opt, char *value, void *cr) {
+ struct uwsgi_corerouter *ucr = (struct uwsgi_corerouter *) cr;
+ ucr->base = value;
+ ucr->base_len = strlen(ucr->base);
+ ucr->has_backends++;
+}
+
+void uwsgi_opt_corerouter_use_pattern(char *opt, char *value, void *cr) {
+ struct uwsgi_corerouter *ucr = (struct uwsgi_corerouter *) cr;
+ ucr->pattern = value;
+ ucr->pattern_len = strlen(ucr->pattern);
+ ucr->has_backends++;
+}
+
+
+void uwsgi_opt_corerouter_zerg(char *opt, char *value, void *cr) {
+
+ int j;
+ int count = 8;
+ struct uwsgi_gateway_socket *ugs;
+ struct uwsgi_corerouter *ucr = (struct uwsgi_corerouter *) cr;
+
+ int zerg_fd = uwsgi_connect(value, 30, 0);
+ if (zerg_fd < 0) {
+ uwsgi_log("--- unable to connect to zerg server ---\n");
+ exit(1);
+ }
+
+ int last_count = count;
+ int *zerg = uwsgi_attach_fd(zerg_fd, &count, "uwsgi-zerg", 10);
+ if (zerg == NULL) {
+ if (last_count != count) {
+ close(zerg_fd);
+ zerg_fd = uwsgi_connect(value, 30, 0);
+ if (zerg_fd < 0) {
+ uwsgi_log("--- unable to connect to zerg server ---\n");
+ exit(1);
+ }
+ zerg = uwsgi_attach_fd(zerg_fd, &count, "uwsgi-zerg", 10);
+ }
+ else {
+ uwsgi_log("--- invalid data received from zerg-server ---\n");
+ exit(1);
+ }
+ }
+
+ if (zerg == NULL) {
+ uwsgi_log("--- invalid data received from zerg-server ---\n");
+ exit(1);
+ }
+
+
+ close(zerg_fd);
+
+ for(j=0;j<count;j++) {
+ if (zerg[j] == -1) break;
+ ugs = uwsgi_new_gateway_socket_from_fd(zerg[j], ucr->name);
+ ugs->zerg = optarg;
+ }
+}
+
+
+void uwsgi_opt_corerouter_cs(char *opt, char *value, void *cr) {
+
+ struct uwsgi_corerouter *ucr = (struct uwsgi_corerouter *) cr;
+
+ char *cs = uwsgi_str(value);
+ char *cs_code = strchr(cs, ':');
+ if (!cs_code) {
+ uwsgi_log("invalid code_string option\n");
+ exit(1);
+ }
+ cs_code[0] = 0;
+ char *cs_func = strchr(cs_code + 1, ':');
+ if (!cs_func) {
+ uwsgi_log("invalid code_string option\n");
+ exit(1);
+ }
+ cs_func[0] = 0;
+ ucr->code_string_modifier1 = atoi(cs);
+ ucr->code_string_code = cs_code + 1;
+ ucr->code_string_function = cs_func + 1;
+
+ ucr->has_backends++;
+
+}
+
+void uwsgi_opt_corerouter_ss(char *opt, char *value, void *cr) {
+
+ struct uwsgi_corerouter *ucr = (struct uwsgi_corerouter *) cr;
+ struct uwsgi_gateway_socket *ugs = uwsgi_new_gateway_socket(value, ucr->name);
+ ugs->subscription = 1;
+ ucr->has_subscription_sockets++;
+
+ ucr->has_backends++;
+
+}
+
+
+void corerouter_send_stats(struct uwsgi_corerouter *);
+
+void corerouter_manage_subscription(char *key, uint16_t keylen, char *val, uint16_t vallen, void *data) {
+
+ struct uwsgi_subscribe_req *usr = (struct uwsgi_subscribe_req *) data;
+
+ if (!uwsgi_strncmp("key", 3, key, keylen)) {
+ usr->key = val;
+ usr->keylen = vallen;
+ }
+ else if (!uwsgi_strncmp("address", 7, key, keylen)) {
+ usr->address = val;
+ usr->address_len = vallen;
+ }
+ else if (!uwsgi_strncmp("modifier1", 9, key, keylen)) {
+ usr->modifier1 = uwsgi_str_num(val, vallen);
+ }
+ else if (!uwsgi_strncmp("cores", 5, key, keylen)) {
+ usr->cores = uwsgi_str_num(val, vallen);
+ }
+ else if (!uwsgi_strncmp("load", 4, key, keylen)) {
+ usr->load = uwsgi_str_num(val, vallen);
+ }
+ else if (!uwsgi_strncmp("weight", 5, key, keylen)) {
+ usr->weight = uwsgi_str_num(val, vallen);
+ }
+}
+
+static struct uwsgi_rb_timer *corerouter_reset_timeout(struct uwsgi_corerouter *, struct corerouter_session *);
+
+void corerouter_close_session(struct uwsgi_corerouter *ucr, struct corerouter_session *cr_session) {
+
+
+ if (cr_session->instance_fd != -1) {
+#ifdef UWSGI_SCTP
+ if (!ucr->cr_table[cr_session->instance_fd]->persistent) {
+#endif
+ close(cr_session->instance_fd);
+ ucr->cr_table[cr_session->instance_fd] = NULL;
+#ifdef UWSGI_SCTP
+ }
+#endif
+ }
+
+ if (ucr->subscriptions && cr_session->un && cr_session->un->len > 0) {
+ // decrease reference count
+#ifdef UWSGI_DEBUG
+ uwsgi_log("[1] node %.*s refcnt: %llu\n", cr_session->un->len, cr_session->un->name, cr_session->un->reference);
+#endif
+ cr_session->un->reference--;
+#ifdef UWSGI_DEBUG
+ uwsgi_log("[2] node %.*s refcnt: %llu\n", cr_session->un->len, cr_session->un->name, cr_session->un->reference);
+#endif
+ }
+
+
+ if (cr_session->instance_failed) {
+
+ if (cr_session->soopt) {
+ if (!ucr->quiet)
+ uwsgi_log("unable to connect() to uwsgi instance \"%.*s\": %s\n", (int) cr_session->instance_address_len, cr_session->instance_address, strerror(cr_session->soopt));
+ }
+ else if (cr_session->timed_out) {
+ if (cr_session->instance_address_len > 0) {
+ if (cr_session->status == COREROUTER_STATUS_CONNECTING) {
+ if (!ucr->quiet)
+ uwsgi_log("unable to connect() to uwsgi instance \"%.*s\": timeout\n", (int) cr_session->instance_address_len, cr_session->instance_address);
+ }
+ else if (cr_session->status == COREROUTER_STATUS_RESPONSE) {
+ uwsgi_log("timeout waiting for instance \"%.*s\"\n", (int) cr_session->instance_address_len, cr_session->instance_address);
+ }
+ }
+ }
+
+ // now check for dead nodes
+ if (ucr->subscriptions && cr_session->un && cr_session->un->len > 0) {
+
+ if (cr_session->un->death_mark == 0)
+ uwsgi_log("[uwsgi-fastrouter] %.*s => marking %.*s as failed\n", (int) cr_session->hostname_len, cr_session->hostname, (int) cr_session->instance_address_len, cr_session->instance_address);
+
+ cr_session->un->failcnt++;
+ cr_session->un->death_mark = 1;
+ // check if i can remove the node
+ if (cr_session->un->reference == 0) {
+ uwsgi_remove_subscribe_node(&ucr->subscriptions, cr_session->un);
+ }
+ if (ucr->subscriptions == NULL && ucr->cheap && !ucr->i_am_cheap && !ucr->fallback) {
+ uwsgi_gateway_go_cheap("uWSGI fastrouter", ucr->queue, &ucr->i_am_cheap);
+ }
+
+ }
+ else if (cr_session->static_node) {
+ cr_session->static_node->custom = uwsgi_now();
+ uwsgi_log("[uwsgi-fastrouter] %.*s => marking %.*s as failed\n", (int) cr_session->hostname_len, cr_session->hostname, (int) cr_session->instance_address_len, cr_session->instance_address);
+ }
+
+
+ if (cr_session->tmp_socket_name) {
+ free(cr_session->tmp_socket_name);
+ cr_session->tmp_socket_name = NULL;
+ }
+
+ if (ucr->fallback) {
+ // ok let's try with the fallback nodes
+ if (!cr_session->fallback) {
+ cr_session->fallback = ucr->fallback;
+ }
+ else {
+ cr_session->fallback = cr_session->fallback->next;
+ if (!cr_session->fallback) goto end;
+ }
+
+ cr_session->instance_address = cr_session->fallback->value;
+ cr_session->instance_address_len = cr_session->fallback->len;
+
+ // reset error and timeout
+ cr_session->timeout = corerouter_reset_timeout(ucr, cr_session);
+ cr_session->timed_out = 0;
+ cr_session->soopt = 0;
+
+ // reset nodes
+ cr_session->un = NULL;
+ cr_session->static_node = NULL;
+
+ cr_session->pass_fd = is_unix(cr_session->instance_address, cr_session->instance_address_len);
+
+
+ cr_session->instance_fd = uwsgi_connectn(cr_session->instance_address, cr_session->instance_address_len, 0, 1);
+
+ if (cr_session->instance_fd < 0) {
+ cr_session->instance_failed = 1;
+ cr_session->soopt = errno;
+ corerouter_close_session(ucr, cr_session);
+ return;
+ }
+
+ ucr->cr_table[cr_session->instance_fd] = cr_session;
+
+ cr_session->status = COREROUTER_STATUS_CONNECTING;
+ ucr->cr_table[cr_session->instance_fd] = cr_session;
+ event_queue_add_fd_write(ucr->queue, cr_session->instance_fd);
+ return;
+
+ }
+ }
+
+end:
+
+ if (cr_session->tmp_socket_name) {
+ free(cr_session->tmp_socket_name);
+ }
+
+ if (cr_session->buf_file)
+ fclose(cr_session->buf_file);
+
+ if (cr_session->buf_file_name) {
+ if (unlink(cr_session->buf_file_name)) {
+ uwsgi_error("unlink()");
+ }
+ free(cr_session->buf_file_name);
+ }
+
+ // could be used to free additional resources
+ if (cr_session->close)
+ cr_session->close(ucr, cr_session);
+
+ close(cr_session->fd);
+ ucr->cr_table[cr_session->fd] = NULL;
+
+ cr_del_timeout(ucr, cr_session);
+ free(cr_session);
+}
+
+static struct uwsgi_rb_timer *corerouter_reset_timeout(struct uwsgi_corerouter *ucr, struct corerouter_session *cr_session) {
+ cr_del_timeout(ucr, cr_session);
+ return cr_add_timeout(ucr, cr_session);
+}
+
+static void corerouter_expire_timeouts(struct uwsgi_corerouter *ucr) {
+
+ time_t current = time(NULL);
+ struct uwsgi_rb_timer *urbt;
+ struct corerouter_session *cr_session;
+
+ for (;;) {
+ urbt = uwsgi_min_rb_timer(ucr->timeouts);
+ if (urbt == NULL)
+ return;
+
+ if (urbt->key <= current) {
+ cr_session = (struct corerouter_session *) urbt->data;
+ cr_session->timed_out = 1;
+ if (cr_session->retry) {
+ cr_session->retry = 0;
+ ucr->switch_events(ucr, cr_session, -1);
+ if (cr_session->retry) {
+ cr_del_timeout(ucr, cr_session);
+ cr_session->timeout = cr_add_fake_timeout(ucr, cr_session);
+ }
+ else {
+ cr_session->timeout = corerouter_reset_timeout(ucr, cr_session);
+ }
+ }
+ else {
+ corerouter_close_session(ucr, cr_session);
+ }
+ continue;
+ }
+
+ break;
+ }
+}
+
+struct corerouter_session *corerouter_alloc_session(struct uwsgi_corerouter *ucr, struct uwsgi_gateway_socket *ugs, int new_connection, struct sockaddr *cr_addr, socklen_t cr_addr_len) {
+
+ ucr->cr_table[new_connection] = uwsgi_calloc(ucr->session_size);
+ ucr->cr_table[new_connection]->fd = new_connection;
+ ucr->cr_table[new_connection]->instance_fd = -1;
+ ucr->cr_table[new_connection]->status = COREROUTER_STATUS_RECV_HDR;
+
+ ucr->cr_table[new_connection]->timeout = cr_add_timeout(ucr, ucr->cr_table[new_connection]);
+ ucr->cr_table[new_connection]->ugs = ugs;
+
+ ucr->alloc_session(ucr, ugs, ucr->cr_table[new_connection], cr_addr, cr_addr_len);
+ event_queue_add_fd_read(ucr->queue, new_connection);
+
+ return ucr->cr_table[new_connection];
+}
+
+void uwsgi_corerouter_loop(int id, void *data) {
+
+ int i;
+
+ struct uwsgi_corerouter *ucr = (struct uwsgi_corerouter *) data;
+
+ ucr->cr_stats_server = -1;
+
+ ucr->cr_table = uwsgi_malloc(sizeof(struct corerouter_session *) * uwsgi.max_fd);
+
+ for (i = 0; i < (int) uwsgi.max_fd; i++) {
+ ucr->cr_table[i] = NULL;
+ }
+
+ ucr->i_am_cheap = ucr->cheap;
+
+ void *events = uwsgi_corerouter_setup_event_queue(ucr, id);
+
+ if (ucr->has_subscription_sockets)
+ event_queue_add_fd_read(ucr->queue, ushared->gateways[id].internal_subscription_pipe[1]);
+
+
+ if (!ucr->socket_timeout)
+ ucr->socket_timeout = 30;
+
+ if (!ucr->static_node_gracetime)
+ ucr->static_node_gracetime = 30;
+
+ int i_am_the_first = 1;
+ for(i=0;i<id;i++) {
+ if (!strcmp(ushared->gateways[i].name, ucr->name)) {
+ i_am_the_first = 0;
+ break;
+ }
+ }
+
+ if (ucr->stats_server && i_am_the_first) {
+ char *tcp_port = strchr(ucr->stats_server, ':');
+ if (tcp_port) {
+ // disable deferred accept for this socket
+ int current_defer_accept = uwsgi.no_defer_accept;
+ uwsgi.no_defer_accept = 1;
+ ucr->cr_stats_server = bind_to_tcp(ucr->stats_server, uwsgi.listen_queue, tcp_port);
+ uwsgi.no_defer_accept = current_defer_accept;
+ }
+ else {
+ ucr->cr_stats_server = bind_to_unix(ucr->stats_server, uwsgi.listen_queue, uwsgi.chmod_socket, uwsgi.abstract_socket);
+ }
+
+ event_queue_add_fd_read(ucr->queue, ucr->cr_stats_server);
+ uwsgi_log("*** FastRouter stats server enabled on %s fd: %d ***\n", ucr->stats_server, ucr->cr_stats_server);
+ }
+
+
+ if (ucr->use_socket) {
+ ucr->to_socket = uwsgi_get_socket_by_num(ucr->socket_num);
+ if (ucr->to_socket) {
+ // fix socket name_len
+ if (ucr->to_socket->name_len == 0 && ucr->to_socket->name) {
+ ucr->to_socket->name_len = strlen(ucr->to_socket->name);
+ }
+ }
+ }
+
+ if (!ucr->pb_base_dir) {
+ ucr->pb_base_dir = getenv("TMPDIR");
+ if (!ucr->pb_base_dir)
+ ucr->pb_base_dir = "/tmp";
+ }
+
+ int nevents;
+
+ time_t delta;
+
+ struct uwsgi_rb_timer *min_timeout;
+
+ int interesting_fd;
+ int new_connection;
+
+
+ if (ucr->pattern) {
+ init_magic_table(ucr->magic_table);
+ }
+
+#ifdef UWSGI_SCTP
+ uwsgi_fastrouter_sctp_nodes = uwsgi_calloc(sizeof(struct uwsgi_fastrouter_sctp_nodes*));
+ uwsgi_fastrouter_sctp_nodes_current = uwsgi_calloc(sizeof(struct uwsgi_fastrouter_sctp_nodes*));
+#endif
+
+ union uwsgi_sockaddr cr_addr;
+ socklen_t cr_addr_len = sizeof(struct sockaddr_un);
+
+ struct corerouter_session *cr_session;
+
+ ucr->mapper = uwsgi_cr_map_use_void;
+
+ if (ucr->use_cache) {
+ ucr->mapper = uwsgi_cr_map_use_cache;
+ }
+ else if (ucr->pattern) {
+ ucr->mapper = uwsgi_cr_map_use_pattern;
+ }
+ else if (ucr->has_subscription_sockets) {
+ ucr->mapper = uwsgi_cr_map_use_subscription;
+ }
+ else if (ucr->base) {
+ ucr->mapper = uwsgi_cr_map_use_base;
+ }
+ else if (ucr->code_string_code && ucr->code_string_function) {
+ ucr->mapper = uwsgi_cr_map_use_cs;
+ }
+ else if (ucr->to_socket) {
+ ucr->mapper = uwsgi_cr_map_use_to;
+ }
+ else if (ucr->static_nodes) {
+ ucr->mapper = uwsgi_cr_map_use_static_nodes;
+ }
+ else if (ucr->use_cluster) {
+ ucr->mapper = uwsgi_cr_map_use_cluster;
+ }
+#ifdef UWSGI_SCTP
+ else if (ucr->has_sctp_sockets > 0) {
+ ucr->mapper = uwsgi_cr_map_use_sctp;
+ }
+#endif
+
+
+
+ ucr->timeouts = uwsgi_init_rb_timer();
+
+ for (;;) {
+
+ min_timeout = uwsgi_min_rb_timer(ucr->timeouts);
+ if (min_timeout == NULL) {
+ delta = -1;
+ }
+ else {
+ delta = min_timeout->key - time(NULL);
+ if (delta <= 0) {
+ corerouter_expire_timeouts(ucr);
+ delta = 0;
+ }
+ }
+
+ if (uwsgi.master_process && ucr->harakiri > 0) {
+ ushared->gateways_harakiri[id] = 0;
+ }
+
+ nevents = event_queue_wait_multi(ucr->queue, delta, events, ucr->nevents);
+
+ if (uwsgi.master_process && ucr->harakiri > 0) {
+ ushared->gateways_harakiri[id] = time(NULL) + ucr->harakiri;
+ }
+
+ if (nevents == 0) {
+ corerouter_expire_timeouts(ucr);
+ }
+
+ for (i = 0; i < nevents; i++) {
+
+ interesting_fd = event_queue_interesting_fd(events, i);
+
+ struct uwsgi_gateway_socket *ugs = uwsgi.gateway_sockets;
+ int taken = 0;
+ while (ugs) {
+ if (ugs->gateway == &ushared->gateways[id] && interesting_fd == ugs->fd) {
+#ifdef UWSGI_SCTP
+ if (!ugs->subscription && !ugs->sctp) {
+#else
+ if (!ugs->subscription) {
+#endif
+
+ new_connection = accept(interesting_fd, (struct sockaddr *) &cr_addr, &cr_addr_len);
+#ifdef UWSGI_EVENT_USE_PORT
+ event_queue_add_fd_read(ucr->queue, interesting_fd);
+#endif
+ if (new_connection < 0) {
+ taken = 1;
+ break;
+ }
+
+ // set socket blocking mode, on non-linux platforms, clients get the server mode
+#ifndef __linux__
+ if (!ugs->nb) {
+ uwsgi_socket_b(new_connection);
+ }
+#else
+ if (ugs->nb) {
+ uwsgi_socket_nb(new_connection);
+ }
+#endif
+
+ corerouter_alloc_session(ucr, ugs, new_connection, (struct sockaddr *) &cr_addr, cr_addr_len);
+ }
+ else if (ugs->subscription) {
+ uwsgi_corerouter_manage_subscription(ucr, id, ugs);
+ }
+#ifdef UWSGI_SCTP
+ else if (ugs->sctp) {
+ new_connection = accept(interesting_fd, (struct sockaddr *) &cr_addr, &cr_addr_len);
+#ifdef UWSGI_EVENT_USE_PORT
+ event_queue_add_fd_read(ucr->queue, interesting_fd);
+#endif
+ if (new_connection < 0) {
+ taken = 1;
+ break;
+ }
+ struct uwsgi_fr_sctp_node *sctp_node = uwsgi_fr_sctp_add_node(new_connection);
+ snprintf(sctp_node->name, 64, "%s:%d", inet_ntoa(((struct sockaddr_in *)&cr_addr)->sin_addr), ntohs(((struct sockaddr_in *) &cr_addr)->sin_port));
+ uwsgi_log("new SCTP peer: %s:%d\n", inet_ntoa(((struct sockaddr_in *)&cr_addr)->sin_addr), ntohs(((struct sockaddr_in *) &cr_addr)->sin_port));
+
+ ucr->cr_table[new_connection] = alloc_cr_session();
+ ucr->cr_table[new_connection]->instance_fd = new_connection;
+ ucr->cr_table[new_connection]->fd = -1;
+ ucr->cr_table[new_connection]->persistent = 1;
+ ucr->cr_table[new_connection]->status = FASTROUTER_STATUS_SCTP_NODE_FREE;
+
+ struct sctp_event_subscribe events;
+ memset(&events, 0, sizeof(events) );
+ events.sctp_data_io_event = 1;
+ // check for errors
+ setsockopt(new_connection, SOL_SCTP, SCTP_EVENTS, &events, sizeof(events) );
+
+ event_queue_add_fd_read(ucr->queue, new_connection);
+ }
+#endif
+
+ taken = 1;
+ break;
+ }
+
+
+ ugs = ugs->next;
+ }
+
+ if (taken) {
+ continue;
+ }
+
+ if (interesting_fd == ushared->gateways[id].internal_subscription_pipe[1]) {
+ uwsgi_corerouter_manage_internal_subscription(ucr, interesting_fd);
+ }
+ else if (interesting_fd == ucr->cr_stats_server) {
+ corerouter_send_stats(ucr);
+ }
+ else {
+ cr_session = ucr->cr_table[interesting_fd];
+
+ // something is going wrong...
+ if (cr_session == NULL)
+ continue;
+
+ if (event_queue_interesting_fd_has_error(events, i)) {
+#ifdef UWSGI_SCTP
+ if (!cr_session->persistent) {
+#endif
+ corerouter_close_session(ucr, cr_session);
+ continue;
+#ifdef UWSGI_SCTP
+ }
+#endif
+ }
+
+#ifdef UWSGI_SCTP
+ if (!cr_session->persistent) {
+#endif
+ cr_session->timeout = corerouter_reset_timeout(ucr, cr_session);
+#ifdef UWSGI_SCTP
+ }
+#endif
+
+ // mplementation specific cycle;
+ ucr->switch_events(ucr, cr_session, interesting_fd);
+
+
+ }
+ }
+ }
+
+}
+
+int uwsgi_courerouter_has_has_backends(struct uwsgi_corerouter *ucr) {
+
+ if (ucr->has_backends) return 1;
+
+ // check if the router has configured backends
+ if (ucr->use_cache ||
+ ucr->pattern ||
+ ucr->has_subscription_sockets ||
+ ucr->base ||
+ (ucr->code_string_code && ucr->code_string_function) ||
+ ucr->to_socket ||
+ ucr->static_nodes ||
+ ucr->use_cluster
+#ifdef UWSGI_SCTP
+ || ucr->has_sctp_sockets
+#endif
+ ) {