Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #120 from haizaar/master

Fixes to re-connect logic
commit d209d000e20ae2d4ce9d813d7dbbc3fdfd435e69 2 parents 42562db + 101f312
@haizaar haizaar authored
View
7 .travis.yml
@@ -9,15 +9,20 @@ python:
- pypy
addons:
- postgresql: "9.2"
+ postgresql: "9.4"
before_script:
- psql -c 'CREATE DATABASE momoko_test;' -U postgres
- psql -U postgres momoko_test -c 'CREATE EXTENSION IF NOT EXISTS hstore'
+ - make -C tcproxy
+ - env
env:
global:
- MOMOKO_TEST_HSTORE=1
+ - MOMOKO_TEST_JSON=1
+ - MOMOKO_TEST_HOST=127.0.0.1
+ - PGHOST=127.0.0.1
matrix:
- MOMOKO_PSYCOPG2_IMPL=psycopg2
- MOMOKO_PSYCOPG2_IMPL=psycopg2cffi
View
1  README.rst
@@ -40,6 +40,7 @@ Testing
Set the following environment variables with your own values before running the
unit tests::
+ make -C tcpproxy
export MOMOKO_TEST_DB='your_db'
export MOMOKO_TEST_USER='your_user'
export MOMOKO_TEST_PASSWORD='your_password'
View
10 docs/installation.rst
@@ -22,7 +22,7 @@ can also be used by setting the ``MOMOKO_PSYCOPG2_IMPL`` environment variable to
# 'psycopg2' or 'psycopg2cffi'
export MOMOKO_PSYCOPG2_IMPL='psycopg2cffi'
-The unit tests als use this variable. It needs to be set if something else is used
+The unit tests all use this variable. It needs to be set if something else is used
instead of Psycopg2 when running the unit tests. Besides ``MOMOKO_PSYCOPG2_IMPL``
there are also other variables that need to be set for the unit tests.
@@ -37,11 +37,17 @@ Here's an example for the environment variables::
# Set to '0' if hstore extension isn't enabled
export MOMOKO_TEST_HSTORE='1' # Default: 0
-And running the tests is easy::
+Momoko tests use tcproxy_ for simulating Postgres server unavailablity. The copy
+of tcproxy is bundled with Momoko, but you need to build it first::
+
+ make -C tcproxy
+
+Finally, running the tests is easy::
python setup.py test
+.. _tcproxy: https://github.com/dccmx/tcproxy
.. _psycopg2cffi: http://pypi.python.org/pypi/psycopg2cffi
.. _Tornado: http://www.tornadoweb.org/
.. _Psycopg2: http://initd.org/psycopg/
View
82 momoko/connection.py
@@ -173,7 +173,7 @@ class Pool(object):
:param list setsession:
List of intial sql commands to be executed once connection is established.
- If any of the commands failes, the connection will be closed.
+ If any of the commands fails, the connection will be closed.
**NOTE:** The commands will be executed as one transaction block.
:param bool auto_shrink:
@@ -195,6 +195,10 @@ class Pool(object):
.. _psycopg2.extensions.connection: http://initd.org/psycopg/docs/connection.html#connection
.. _Connection and cursor factories: http://initd.org/psycopg/docs/advanced.html#subclassing-cursor
"""
+
+ class DatabaseNotAvailable(psycopg2.DatabaseError):
+ """Raised when Pool can not connect to database server"""
+
def __init__(self,
dsn,
connection_factory=None,
@@ -232,7 +236,7 @@ def __init__(self,
self.conns = ConnectionContainer()
self._last_connect_time = 0
- self._no_conn_availble_error = psycopg2.DatabaseError("No database connection available")
+ self._no_conn_availble_error = self.DatabaseNotAvailable("No database connection available")
self.shrink_period = shrink_period
self.shrink_delay = shrink_delay
self.auto_shrink = auto_shrink
@@ -440,7 +444,7 @@ def when_available(fut):
conn = fut.result()
except psycopg2.Error as error:
future.set_exc_info(sys.exc_info())
- if retry:
+ if retry and not keep:
self.putconn(retry[0])
return
@@ -448,28 +452,27 @@ def when_available(fut):
try:
future_or_result = method(conn, *args, **kwargs)
except psycopg2.Error as error:
- if conn.closed:
- if not retry:
- retry.append(conn)
- self.ioloop.add_future(conn.connect(), when_available)
- return
- else:
- future.set_exception(self._no_conn_availble_error)
- else:
- future.set_exc_info(sys.exc_info())
- log.debug(2)
- self.putconn(conn)
- return
+ log.debug("Method failed synchronously")
+ return self._retry(retry, when_available, conn, keep, future)
if not async:
+ if not keep:
+ self.putconn(conn)
future.set_result(future_or_result)
- log.debug(3)
- self.putconn(conn)
return
- chain_future(future_or_result, future)
- if not keep:
- future.add_done_callback(lambda f: self.putconn(conn))
+ def when_done(rfut):
+ try:
+ result = rfut.result()
+ except psycopg2.Error as error:
+ log.debug("Method failed Asynchronously")
+ return self._retry(retry, when_available, conn, keep, future)
+
+ if not keep:
+ self.putconn(conn)
+ future.set_result(result)
+
+ self.ioloop.add_future(future_or_result, when_done)
if not connection:
self.ioloop.add_future(self.getconn(ping=False), when_available)
@@ -479,6 +482,20 @@ def when_available(fut):
when_available(f)
return future
+ def _retry(self, retry, what, conn, keep, future):
+ if conn.closed:
+ if not retry:
+ retry.append(conn)
+ self.ioloop.add_future(conn.connect(), what)
+ return
+ else:
+ future.set_exception(self._no_conn_availble_error)
+ else:
+ future.set_exc_info(sys.exc_info())
+ if not keep:
+ self.putconn(conn)
+ return
+
def _reanimate(self):
assert self.conns.dead, "BUG: don't call reanimate when there is no one to reanimate"
@@ -556,7 +573,10 @@ def on_ping_done(ping_fut):
try:
ping_fut.result()
except psycopg2.Error as error:
- ping_future.set_exc_info(error)
+ if conn.closed:
+ ping_future.set_exception(self._no_conn_availble_error)
+ else:
+ ping_future.set_exc_info(sys.exc_info())
self.putconn(conn)
else:
ping_future.set_result(conn)
@@ -598,7 +618,7 @@ class Connection(object):
:param list setsession:
List of intial sql commands to be executed once connection is established.
- If any of the commands failes, the connection will be closed.
+ If any of the commands fails, the connection will be closed.
**NOTE:** The commands will be executed as one transaction block.
.. _Data Source Name: http://en.wikipedia.org/wiki/Data_Source_Name
@@ -636,6 +656,7 @@ def connect(self):
try:
self.connection = psycopg2.connect(self.dsn, **kwargs)
except psycopg2.Error as error:
+ self.connection = None
future.set_exc_info(sys.exc_info())
return future
@@ -654,6 +675,7 @@ def on_connect(on_connect_future):
self.ioloop.add_handler(self.fileno, callback, IOLoop.WRITE)
self.ioloop.add_future(future, self._set_server_version)
+ self.ioloop.add_future(future, self._close_on_fail)
return future
@@ -662,6 +684,12 @@ def _set_server_version(self, future):
return
self.server_version = self.connection.server_version
+ def _close_on_fail(self, future):
+ # If connection attempt evetually fails - marks connection as closed by ourselves
+ # since psycopg2 does not do that for us (on connection attempts)
+ if future.exception():
+ self.connection = None
+
def _io_callback(self, future, result, fd=None, events=None):
try:
state = self.connection.poll()
@@ -677,7 +705,7 @@ def _io_callback(self, future, result, fd=None, events=None):
elif state == POLL_WRITE:
self.ioloop.update_handler(self.fileno, IOLoop.WRITE)
else:
- future.set_exception(psycopg2.OperationalError('poll() returned {0}'.format(state)))
+ future.set_exception(psycopg2.OperationalError("poll() returned %s" % state))
def ping(self):
"""
@@ -797,7 +825,7 @@ def transaction(self,
The class returned must be a subclass of `psycopg2.extensions.cursor`_.
See `Connection and cursor factories`_ for details. Defaults to ``None``.
:param bool auto_rollback:
- If one of the transaction statements failes, try to automatically
+ If one of the transaction statements fails, try to automatically
execute ROLLBACK to abort the transaction. If ROLLBACK fails, it would
not be raised, but only logged.
@@ -818,10 +846,10 @@ def exec_statements(future):
cursor = future.result()
cursors.append(cursor)
except Exception as error:
- if not auto_rollback:
- transaction_future.set_exc_info(sys.exc_info())
- else:
+ if auto_rollback and not self.closed:
self._rollback(transaction_future, error)
+ else:
+ transaction_future.set_exc_info(sys.exc_info())
return
try:
View
1  setup.py
@@ -44,7 +44,6 @@
license='MIT',
test_suite='tests',
install_requires=dependencies,
- test_require=dependencies + ["unittest2"],
classifiers = [
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
View
9 tcproxy/.gitignore
@@ -0,0 +1,9 @@
+*.o
+*.DS_Store
+*.log
+cachegrind.out.*
+callgrind.out.*
+massif.out.*
+memcheck.out
+tcproxy
+core
View
18 tcproxy/ChangeLog
@@ -0,0 +1,18 @@
+2011-06-2 Release 0.2.2
+ * fix a bug that causes infinit loop when process write
+
+2011-05-21 Release 0.2.1
+ * fix ip address parse bug
+
+2011-05-20 Release 0.2
+ * nonblock connect implemented
+ * semicolon bind to address not port now
+ * some performance optimization
+ * some bugfix
+
+2011-05-18 Release 0.1.1
+ * -l option added to write log file
+ * minor code refinement
+
+2011-05-16 Release 0.1
+ * first public beta release
View
19 tcproxy/LICENSE
@@ -0,0 +1,19 @@
+Copyright dccmx. All rights reserved.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to
+deal in the Software without restriction, including without limitation the
+rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+sell copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
+IN THE SOFTWARE.
View
10 tcproxy/Makefile
@@ -0,0 +1,10 @@
+#
+# tcproxy - Makefile
+#
+# Author: dccmx <dccmx@dccmx.com>
+#
+
+default: all
+
+.DEFAULT:
+ cd src && $(MAKE) $@
View
15 tcproxy/README.md
@@ -0,0 +1,15 @@
+**Copied from https://github.com/dccmx/tcproxy.git. Used for connection issues simulation.**
+
+tcproxy
+=======
+tcproxy is a small efficient tcp proxy that can be used for port forwarding or load balancing.
+
+sample usage
+------------
+ tcproxy "11212 -> 11211"
+ tcproxy "192.168.0.1:11212 -> 192.168.0.2:11211"
+
+not implemented yet
+---------------
+ tcproxy "any:11212 -> rr{192.168.0.100:11211 192.168.0.101:11211 192.168.0.102:11211}"
+ tcproxy "any:11212 -> hash{192.168.0.100:11211 192.168.0.101:11211 192.168.0.102:11211}"
View
10 tcproxy/TODO
@@ -0,0 +1,10 @@
+failover
+tcp pool
+thread
+
+tcproxy "any:11212\
+<-> rr{\
+192.168.0.100:11211 -> 192.168.0.100:11212\
+192.168.0.101:11211\
+192.168.0.102:11211}\
+-> 192.168.0.103:11211"
View
37 tcproxy/src/Makefile
@@ -0,0 +1,37 @@
+PROGNAME = tcproxy
+
+OBJS = tcproxy.o ae.o util.o policy.o zmalloc.o anet.o
+
+CFLAGS_GEN = -Wall -Werror -g $(CFLAGS)
+CFLAGS_DBG = -ggdb $(CFLAGS_GEN)
+CFLAGS_OPT = -O3 -Wno-format $(CFLAGS_GEN)
+DEBUG ?=
+
+CCCOLOR="\033[34m"
+LINKCOLOR="\033[34;1m"
+SRCCOLOR="\033[33m"
+BINCOLOR="\033[37;1m"
+MAKECOLOR="\033[32;1m"
+ENDCOLOR="\033[0m"
+
+QUIET_CC = @printf ' %b %b\n' $(CCCOLOR)CC$(ENDCOLOR) $(SRCCOLOR)$@$(ENDCOLOR);
+QUIET_LINK = @printf ' %b %b\n' $(LINKCOLOR)LINK$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR);
+
+LDFLAGS +=
+LIBS +=
+
+all: $(PROGNAME)
+
+%.o: %.c
+ $(QUIET_CC)$(CC) -c $(CFLAGS) $(CFLAGS_OPT) $(DEBUG) $(COMPILE_TIME) $<
+
+$(PROGNAME): $(OBJS)
+ $(QUIET_LINK)$(CC) -o $(PROGNAME) $(CFLAGS_OPT) $(DEBUG) $(OBJS) $(CCLINK)
+ @echo
+ @echo "Make Complete. Read README for how to use."
+ @echo
+ @echo "Having problems with it? Send complains and bugs to dccmx@dccmx.com"
+ @echo
+
+clean:
+ rm -f $(PROGNAME) core core.[1-9][0-9]* *.o memcheck.out callgrind.out.[1-9][0-9]* massif.out.[1-9][0-9]*
View
405 tcproxy/src/ae.c
@@ -0,0 +1,405 @@
+/* A simple event-driven programming library. Originally I wrote this code
+ * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
+ * it in form of a library for easy reuse.
+ *
+ * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <poll.h>
+#include <string.h>
+
+#include "ae.h"
+#include "zmalloc.h"
+#include "config.h"
+
+/* Include the best multiplexing layer supported by this system.
+ * The following should be ordered by performances, descending. */
+#ifdef HAVE_EPOLL
+#include "ae_epoll.c"
+#else
+ #ifdef HAVE_KQUEUE
+ #include "ae_kqueue.c"
+ #else
+ #include "ae_select.c"
+ #endif
+#endif
+
+aeEventLoop *aeCreateEventLoop(int setsize) {
+ aeEventLoop *eventLoop;
+ int i;
+
+ if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
+ eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
+ eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
+ if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
+ eventLoop->setsize = setsize;
+ eventLoop->timeEventHead = NULL;
+ eventLoop->timeEventNextId = 0;
+ eventLoop->stop = 0;
+ eventLoop->maxfd = -1;
+ eventLoop->beforesleep = NULL;
+ if (aeApiCreate(eventLoop) == -1) goto err;
+ /* Events with mask == AE_NONE are not set. So let's initialize the
+ * vector with it. */
+ for (i = 0; i < setsize; i++)
+ eventLoop->events[i].mask = AE_NONE;
+ return eventLoop;
+
+err:
+ if (eventLoop) {
+ zfree(eventLoop->events);
+ zfree(eventLoop->fired);
+ zfree(eventLoop);
+ }
+ return NULL;
+}
+
+void aeDeleteEventLoop(aeEventLoop *eventLoop) {
+ aeApiFree(eventLoop);
+ zfree(eventLoop->events);
+ zfree(eventLoop->fired);
+ zfree(eventLoop);
+}
+
+void aeStop(aeEventLoop *eventLoop) {
+ eventLoop->stop = 1;
+}
+
+int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
+ aeFileProc *proc, void *clientData)
+{
+ if (fd >= eventLoop->setsize) return AE_ERR;
+ aeFileEvent *fe = &eventLoop->events[fd];
+
+ if (aeApiAddEvent(eventLoop, fd, mask) == -1)
+ return AE_ERR;
+ fe->mask |= mask;
+ if (mask & AE_READABLE) fe->rfileProc = proc;
+ if (mask & AE_WRITABLE) fe->wfileProc = proc;
+ fe->clientData = clientData;
+ if (fd > eventLoop->maxfd)
+ eventLoop->maxfd = fd;
+ return AE_OK;
+}
+
+void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
+{
+ if (fd >= eventLoop->setsize) return;
+ aeFileEvent *fe = &eventLoop->events[fd];
+
+ if (fe->mask == AE_NONE) return;
+ fe->mask = fe->mask & (~mask);
+ if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
+ /* Update the max fd */
+ int j;
+
+ for (j = eventLoop->maxfd-1; j >= 0; j--)
+ if (eventLoop->events[j].mask != AE_NONE) break;
+ eventLoop->maxfd = j;
+ }
+ aeApiDelEvent(eventLoop, fd, mask);
+}
+
+int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
+ if (fd >= eventLoop->setsize) return 0;
+ aeFileEvent *fe = &eventLoop->events[fd];
+
+ return fe->mask;
+}
+
+static void aeGetTime(long *seconds, long *milliseconds)
+{
+ struct timeval tv;
+
+ gettimeofday(&tv, NULL);
+ *seconds = tv.tv_sec;
+ *milliseconds = tv.tv_usec/1000;
+}
+
+static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
+ long cur_sec, cur_ms, when_sec, when_ms;
+
+ aeGetTime(&cur_sec, &cur_ms);
+ when_sec = cur_sec + milliseconds/1000;
+ when_ms = cur_ms + milliseconds%1000;
+ if (when_ms >= 1000) {
+ when_sec ++;
+ when_ms -= 1000;
+ }
+ *sec = when_sec;
+ *ms = when_ms;
+}
+
+long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
+ aeTimeProc *proc, void *clientData,
+ aeEventFinalizerProc *finalizerProc)
+{
+ long long id = eventLoop->timeEventNextId++;
+ aeTimeEvent *te;
+
+ te = zmalloc(sizeof(*te));
+ if (te == NULL) return AE_ERR;
+ te->id = id;
+ aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
+ te->timeProc = proc;
+ te->finalizerProc = finalizerProc;
+ te->clientData = clientData;
+ te->next = eventLoop->timeEventHead;
+ eventLoop->timeEventHead = te;
+ return id;
+}
+
+int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
+{
+ aeTimeEvent *te, *prev = NULL;
+
+ te = eventLoop->timeEventHead;
+ while(te) {
+ if (te->id == id) {
+ if (prev == NULL)
+ eventLoop->timeEventHead = te->next;
+ else
+ prev->next = te->next;
+ if (te->finalizerProc)
+ te->finalizerProc(eventLoop, te->clientData);
+ zfree(te);
+ return AE_OK;
+ }
+ prev = te;
+ te = te->next;
+ }
+ return AE_ERR; /* NO event with the specified ID found */
+}
+
+/* Search the first timer to fire.
+ * This operation is useful to know how many time the select can be
+ * put in sleep without to delay any event.
+ * If there are no timers NULL is returned.
+ *
+ * Note that's O(N) since time events are unsorted.
+ * Possible optimizations (not needed by Redis so far, but...):
+ * 1) Insert the event in order, so that the nearest is just the head.
+ * Much better but still insertion or deletion of timers is O(N).
+ * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
+ */
+static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
+{
+ aeTimeEvent *te = eventLoop->timeEventHead;
+ aeTimeEvent *nearest = NULL;
+
+ while(te) {
+ if (!nearest || te->when_sec < nearest->when_sec ||
+ (te->when_sec == nearest->when_sec &&
+ te->when_ms < nearest->when_ms))
+ nearest = te;
+ te = te->next;
+ }
+ return nearest;
+}
+
+/* Process time events */
+static int processTimeEvents(aeEventLoop *eventLoop) {
+ int processed = 0;
+ aeTimeEvent *te;
+ long long maxId;
+
+ te = eventLoop->timeEventHead;
+ maxId = eventLoop->timeEventNextId-1;
+ while(te) {
+ long now_sec, now_ms;
+ long long id;
+
+ if (te->id > maxId) {
+ te = te->next;
+ continue;
+ }
+ aeGetTime(&now_sec, &now_ms);
+ if (now_sec > te->when_sec ||
+ (now_sec == te->when_sec && now_ms >= te->when_ms))
+ {
+ int retval;
+
+ id = te->id;
+ retval = te->timeProc(eventLoop, id, te->clientData);
+ processed++;
+ /* After an event is processed our time event list may
+ * no longer be the same, so we restart from head.
+ * Still we make sure to don't process events registered
+ * by event handlers itself in order to don't loop forever.
+ * To do so we saved the max ID we want to handle.
+ *
+ * FUTURE OPTIMIZATIONS:
+ * Note that this is NOT great algorithmically. Redis uses
+ * a single time event so it's not a problem but the right
+ * way to do this is to add the new elements on head, and
+ * to flag deleted elements in a special way for later
+ * deletion (putting references to the nodes to delete into
+ * another linked list). */
+ if (retval != AE_NOMORE) {
+ aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
+ } else {
+ aeDeleteTimeEvent(eventLoop, id);
+ }
+ te = eventLoop->timeEventHead;
+ } else {
+ te = te->next;
+ }
+ }
+ return processed;
+}
+
+/* Process every pending time event, then every pending file event
+ * (that may be registered by time event callbacks just processed).
+ * Without special flags the function sleeps until some file event
+ * fires, or when the next time event occurrs (if any).
+ *
+ * If flags is 0, the function does nothing and returns.
+ * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
+ * if flags has AE_FILE_EVENTS set, file events are processed.
+ * if flags has AE_TIME_EVENTS set, time events are processed.
+ * if flags has AE_DONT_WAIT set the function returns ASAP until all
+ * the events that's possible to process without to wait are processed.
+ *
+ * The function returns the number of events processed. */
+int aeProcessEvents(aeEventLoop *eventLoop, int flags)
+{
+ int processed = 0, numevents;
+
+ /* Nothing to do? return ASAP */
+ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
+
+ /* Note that we want call select() even if there are no
+ * file events to process as long as we want to process time
+ * events, in order to sleep until the next time event is ready
+ * to fire. */
+ if (eventLoop->maxfd != -1 ||
+ ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
+ int j;
+ aeTimeEvent *shortest = NULL;
+ struct timeval tv, *tvp;
+
+ if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
+ shortest = aeSearchNearestTimer(eventLoop);
+ if (shortest) {
+ long now_sec, now_ms;
+
+ /* Calculate the time missing for the nearest
+ * timer to fire. */
+ aeGetTime(&now_sec, &now_ms);
+ tvp = &tv;
+ tvp->tv_sec = shortest->when_sec - now_sec;
+ if (shortest->when_ms < now_ms) {
+ tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
+ tvp->tv_sec --;
+ } else {
+ tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
+ }
+ if (tvp->tv_sec < 0) tvp->tv_sec = 0;
+ if (tvp->tv_usec < 0) tvp->tv_usec = 0;
+ } else {
+ /* If we have to check for events but need to return
+ * ASAP because of AE_DONT_WAIT we need to se the timeout
+ * to zero */
+ if (flags & AE_DONT_WAIT) {
+ tv.tv_sec = tv.tv_usec = 0;
+ tvp = &tv;
+ } else {
+ /* Otherwise we can block */
+ tvp = NULL; /* wait forever */
+ }
+ }
+
+ numevents = aeApiPoll(eventLoop, tvp);
+ for (j = 0; j < numevents; j++) {
+ aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
+ int mask = eventLoop->fired[j].mask;
+ int fd = eventLoop->fired[j].fd;
+ int rfired = 0;
+
+ /* note the fe->mask & mask & ... code: maybe an already processed
+ * event removed an element that fired and we still didn't
+ * processed, so we check if the event is still valid. */
+ if (fe->mask & mask & AE_READABLE) {
+ rfired = 1;
+ fe->rfileProc(eventLoop,fd,fe->clientData,mask);
+ }
+ if (fe->mask & mask & AE_WRITABLE) {
+ if (!rfired || fe->wfileProc != fe->rfileProc)
+ fe->wfileProc(eventLoop,fd,fe->clientData,mask);
+ }
+ processed++;
+ }
+ }
+ /* Check time events */
+ if (flags & AE_TIME_EVENTS)
+ processed += processTimeEvents(eventLoop);
+
+ return processed; /* return the number of processed file/time events */
+}
+
+/* Wait for millseconds until the given file descriptor becomes
+ * writable/readable/exception */
+int aeWait(int fd, int mask, long long milliseconds) {
+ struct pollfd pfd;
+ int retmask = 0, retval;
+
+ memset(&pfd, 0, sizeof(pfd));
+ pfd.fd = fd;
+ if (mask & AE_READABLE) pfd.events |= POLLIN;
+ if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
+
+ if ((retval = poll(&pfd, 1, milliseconds))== 1) {
+ if (pfd.revents & POLLIN) retmask |= AE_READABLE;
+ if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
+ return retmask;
+ } else {
+ return retval;
+ }
+}
+
+void aeMain(aeEventLoop *eventLoop) {
+ eventLoop->stop = 0;
+ while (!eventLoop->stop) {
+ if (eventLoop->beforesleep != NULL)
+ eventLoop->beforesleep(eventLoop);
+ aeProcessEvents(eventLoop, AE_ALL_EVENTS);
+ }
+}
+
+char *aeGetApiName(void) {
+ return aeApiName();
+}
+
+void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
+ eventLoop->beforesleep = beforesleep;
+}
View
117 tcproxy/src/ae.h
@@ -0,0 +1,117 @@
+/* A simple event-driven programming library. Originally I wrote this code
+ * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
+ * it in form of a library for easy reuse.
+ *
+ * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __AE_H__
+#define __AE_H__
+
+#define AE_OK 0
+#define AE_ERR -1
+
+#define AE_NONE 0
+#define AE_READABLE 1
+#define AE_WRITABLE 2
+
+#define AE_FILE_EVENTS 1
+#define AE_TIME_EVENTS 2
+#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
+#define AE_DONT_WAIT 4
+
+#define AE_NOMORE -1
+
+/* Macros */
+#define AE_NOTUSED(V) ((void) V)
+
+struct aeEventLoop;
+
+/* Types and data structures */
+typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
+typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
+typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
+typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
+
+/* File event structure */
+typedef struct aeFileEvent {
+ int mask; /* one of AE_(READABLE|WRITABLE) */
+ aeFileProc *rfileProc;
+ aeFileProc *wfileProc;
+ void *clientData;
+} aeFileEvent;
+
+/* Time event structure */
+typedef struct aeTimeEvent {
+ long long id; /* time event identifier. */
+ long when_sec; /* seconds */
+ long when_ms; /* milliseconds */
+ aeTimeProc *timeProc;
+ aeEventFinalizerProc *finalizerProc;
+ void *clientData;
+ struct aeTimeEvent *next;
+} aeTimeEvent;
+
+/* A fired event */
+typedef struct aeFiredEvent {
+ int fd;
+ int mask;
+} aeFiredEvent;
+
+/* State of an event based program */
+typedef struct aeEventLoop {
+ int maxfd; /* highest file descriptor currently registered */
+ int setsize; /* max number of file descriptors tracked */
+ long long timeEventNextId;
+ aeFileEvent *events; /* Registered events */
+ aeFiredEvent *fired; /* Fired events */
+ aeTimeEvent *timeEventHead;
+ int stop;
+ void *apidata; /* This is used for polling API specific data */
+ aeBeforeSleepProc *beforesleep;
+} aeEventLoop;
+
+/* Prototypes */
+aeEventLoop *aeCreateEventLoop(int setsize);
+void aeDeleteEventLoop(aeEventLoop *eventLoop);
+void aeStop(aeEventLoop *eventLoop);
+int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
+ aeFileProc *proc, void *clientData);
+void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
+int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
+long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
+ aeTimeProc *proc, void *clientData,
+ aeEventFinalizerProc *finalizerProc);
+int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
+int aeProcessEvents(aeEventLoop *eventLoop, int flags);
+int aeWait(int fd, int mask, long long milliseconds);
+void aeMain(aeEventLoop *eventLoop);
+char *aeGetApiName(void);
+void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
+
+#endif
View
101 tcproxy/src/ae_epoll.c
@@ -0,0 +1,101 @@
+/* Linux epoll(2) based ae.c module
+ * Copyright (C) 2009-2010 Salvatore Sanfilippo - antirez@gmail.com
+ * Released under the BSD license. See the COPYING file for more info. */
+
+#include <sys/epoll.h>
+
+typedef struct aeApiState {
+ int epfd;
+ struct epoll_event *events;
+} aeApiState;
+
+static int aeApiCreate(aeEventLoop *eventLoop) {
+ aeApiState *state = zmalloc(sizeof(aeApiState));
+
+ if (!state) return -1;
+ state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
+ if (!state->events) {
+ zfree(state);
+ return -1;
+ }
+ state->epfd = epoll_create(1024); /* 1024 is just an hint for the kernel */
+ if (state->epfd == -1) {
+ zfree(state->events);
+ zfree(state);
+ return -1;
+ }
+ eventLoop->apidata = state;
+ return 0;
+}
+
+static void aeApiFree(aeEventLoop *eventLoop) {
+ aeApiState *state = eventLoop->apidata;
+
+ close(state->epfd);
+ zfree(state->events);
+ zfree(state);
+}
+
+static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
+ aeApiState *state = eventLoop->apidata;
+ struct epoll_event ee;
+ /* If the fd was already monitored for some event, we need a MOD
+ * operation. Otherwise we need an ADD operation. */
+ int op = eventLoop->events[fd].mask == AE_NONE ?
+ EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+
+ ee.events = 0;
+ mask |= eventLoop->events[fd].mask; /* Merge old events */
+ if (mask & AE_READABLE) ee.events |= EPOLLIN;
+ if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
+ ee.data.u64 = 0; /* avoid valgrind warning */
+ ee.data.fd = fd;
+ if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
+ return 0;
+}
+
+static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
+ aeApiState *state = eventLoop->apidata;
+ struct epoll_event ee;
+ int mask = eventLoop->events[fd].mask & (~delmask);
+
+ ee.events = 0;
+ if (mask & AE_READABLE) ee.events |= EPOLLIN;
+ if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
+ ee.data.u64 = 0; /* avoid valgrind warning */
+ ee.data.fd = fd;
+ if (mask != AE_NONE) {
+ epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
+ } else {
+ /* Note, Kernel < 2.6.9 requires a non null event pointer even for
+ * EPOLL_CTL_DEL. */
+ epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
+ }
+}
+
+static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
+ aeApiState *state = eventLoop->apidata;
+ int retval, numevents = 0;
+
+ retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
+ tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
+ if (retval > 0) {
+ int j;
+
+ numevents = retval;
+ for (j = 0; j < numevents; j++) {
+ int mask = 0;
+ struct epoll_event *e = state->events+j;
+
+ if (e->events & EPOLLIN) mask |= AE_READABLE;
+ if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
+ eventLoop->fired[j].fd = e->data.fd;
+ eventLoop->fired[j].mask = mask;
+ }
+ }
+ return numevents;
+}
+
+static char *aeApiName(void) {
+ return "epoll";
+}
View
105 tcproxy/src/ae_kqueue.c
@@ -0,0 +1,105 @@
+/* Kqueue(2)-based ae.c module
+ * Copyright (C) 2009 Harish Mallipeddi - harish.mallipeddi@gmail.com
+ * Released under the BSD license. See the COPYING file for more info. */
+
+#include <sys/types.h>
+#include <sys/event.h>
+#include <sys/time.h>
+
+typedef struct aeApiState {
+ int kqfd;
+ struct kevent *events;
+} aeApiState;
+
+static int aeApiCreate(aeEventLoop *eventLoop) {
+ aeApiState *state = zmalloc(sizeof(aeApiState));
+
+ if (!state) return -1;
+ state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize);
+ if (!state->events) {
+ zfree(state);
+ return -1;
+ }
+ state->kqfd = kqueue();
+ if (state->kqfd == -1) {
+ zfree(state->events);
+ zfree(state);
+ return -1;
+ }
+ eventLoop->apidata = state;
+
+ return 0;
+}
+
+static void aeApiFree(aeEventLoop *eventLoop) {
+ aeApiState *state = eventLoop->apidata;
+
+ close(state->kqfd);
+ zfree(state->events);
+ zfree(state);
+}
+
+static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
+ aeApiState *state = eventLoop->apidata;
+ struct kevent ke;
+
+ if (mask & AE_READABLE) {
+ EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
+ if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
+ }
+ if (mask & AE_WRITABLE) {
+ EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL);
+ if (kevent(state->kqfd, &ke, 1, NULL, 0, NULL) == -1) return -1;
+ }
+ return 0;
+}
+
+static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
+ aeApiState *state = eventLoop->apidata;
+ struct kevent ke;
+
+ if (mask & AE_READABLE) {
+ EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
+ kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
+ }
+ if (mask & AE_WRITABLE) {
+ EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
+ kevent(state->kqfd, &ke, 1, NULL, 0, NULL);
+ }
+}
+
+static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
+ aeApiState *state = eventLoop->apidata;
+ int retval, numevents = 0;
+
+ if (tvp != NULL) {
+ struct timespec timeout;
+ timeout.tv_sec = tvp->tv_sec;
+ timeout.tv_nsec = tvp->tv_usec * 1000;
+ retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
+ &timeout);
+ } else {
+ retval = kevent(state->kqfd, NULL, 0, state->events, eventLoop->setsize,
+ NULL);
+ }
+
+ if (retval > 0) {
+ int j;
+
+ numevents = retval;
+ for(j = 0; j < numevents; j++) {
+ int mask = 0;
+ struct kevent *e = state->events+j;
+
+ if (e->filter == EVFILT_READ) mask |= AE_READABLE;
+ if (e->filter == EVFILT_WRITE) mask |= AE_WRITABLE;
+ eventLoop->fired[j].fd = e->ident;
+ eventLoop->fired[j].mask = mask;
+ }
+ }
+ return numevents;
+}
+
+static char *aeApiName(void) {
+ return "kqueue";
+}
View
72 tcproxy/src/ae_select.c
@@ -0,0 +1,72 @@
+/* Select()-based ae.c module
+ * Copyright (C) 2009-2010 Salvatore Sanfilippo - antirez@gmail.com
+ * Released under the BSD license. See the COPYING file for more info. */
+
+#include <string.h>
+
+typedef struct aeApiState {
+ fd_set rfds, wfds;
+ /* We need to have a copy of the fd sets as it's not safe to reuse
+ * FD sets after select(). */
+ fd_set _rfds, _wfds;
+} aeApiState;
+
+static int aeApiCreate(aeEventLoop *eventLoop) {
+ aeApiState *state = zmalloc(sizeof(aeApiState));
+
+ if (!state) return -1;
+ FD_ZERO(&state->rfds);
+ FD_ZERO(&state->wfds);
+ eventLoop->apidata = state;
+ return 0;
+}
+
+static void aeApiFree(aeEventLoop *eventLoop) {
+ zfree(eventLoop->apidata);
+}
+
+static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
+ aeApiState *state = eventLoop->apidata;
+
+ if (mask & AE_READABLE) FD_SET(fd,&state->rfds);
+ if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds);
+ return 0;
+}
+
+static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
+ aeApiState *state = eventLoop->apidata;
+
+ if (mask & AE_READABLE) FD_CLR(fd,&state->rfds);
+ if (mask & AE_WRITABLE) FD_CLR(fd,&state->wfds);
+}
+
+static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
+ aeApiState *state = eventLoop->apidata;
+ int retval, j, numevents = 0;
+
+ memcpy(&state->_rfds,&state->rfds,sizeof(fd_set));
+ memcpy(&state->_wfds,&state->wfds,sizeof(fd_set));
+
+ retval = select(eventLoop->maxfd+1,
+ &state->_rfds,&state->_wfds,NULL,tvp);
+ if (retval > 0) {
+ for (j = 0; j <= eventLoop->maxfd; j++) {
+ int mask = 0;
+ aeFileEvent *fe = &eventLoop->events[j];
+
+ if (fe->mask == AE_NONE) continue;
+ if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds))
+ mask |= AE_READABLE;
+ if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds))
+ mask |= AE_WRITABLE;
+ eventLoop->fired[numevents].fd = j;
+ eventLoop->fired[numevents].mask = mask;
+ numevents++;
+ }
+ }
+ return numevents;
+}
+
+static char *aeApiName(void) {
+ return "select";
+}
View
369 tcproxy/src/anet.c
@@ -0,0 +1,369 @@
+/* anet.c -- Basic TCP socket stuff made a bit less boring
+ *
+ * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "fmacros.h"
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <string.h>
+#include <netdb.h>
+#include <errno.h>
+#include <stdarg.h>
+#include <stdio.h>
+
+#include "anet.h"
+
+static void anetSetError(char *err, const char *fmt, ...)
+{
+ va_list ap;
+
+ if (!err) return;
+ va_start(ap, fmt);
+ vsnprintf(err, ANET_ERR_LEN, fmt, ap);
+ va_end(ap);
+}
+
+int anetNonBlock(char *err, int fd)
+{
+ int flags;
+
+ /* Set the socket nonblocking.
+ * Note that fcntl(2) for F_GETFL and F_SETFL can't be
+ * interrupted by a signal. */
+ if ((flags = fcntl(fd, F_GETFL)) == -1) {
+ anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
+ return ANET_ERR;
+ }
+ if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
+ anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
+ return ANET_ERR;
+ }
+ return ANET_OK;
+}
+
+int anetTcpNoDelay(char *err, int fd)
+{
+ int yes = 1;
+ if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) == -1)
+ {
+ anetSetError(err, "setsockopt TCP_NODELAY: %s", strerror(errno));
+ return ANET_ERR;
+ }
+ return ANET_OK;
+}
+
+int anetSetSendBuffer(char *err, int fd, int buffsize)
+{
+ if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &buffsize, sizeof(buffsize)) == -1)
+ {
+ anetSetError(err, "setsockopt SO_SNDBUF: %s", strerror(errno));
+ return ANET_ERR;
+ }
+ return ANET_OK;
+}
+
+int anetTcpKeepAlive(char *err, int fd)
+{
+ int yes = 1;
+ if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)) == -1) {
+ anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));
+ return ANET_ERR;
+ }
+ return ANET_OK;
+}
+
+int anetResolve(char *err, char *host, char *ipbuf)
+{
+ struct sockaddr_in sa;
+
+ sa.sin_family = AF_INET;
+ if (inet_aton(host, &sa.sin_addr) == 0) {
+ struct hostent *he;
+
+ he = gethostbyname(host);
+ if (he == NULL) {
+ anetSetError(err, "can't resolve: %s", host);
+ return ANET_ERR;
+ }
+ memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr));
+ }
+ strcpy(ipbuf,inet_ntoa(sa.sin_addr));
+ return ANET_OK;
+}
+
+static int anetCreateSocket(char *err, int domain) {
+ int s, on = 1;
+ if ((s = socket(domain, SOCK_STREAM, 0)) == -1) {
+ anetSetError(err, "creating socket: %s", strerror(errno));
+ return ANET_ERR;
+ }
+
+ /* Make sure connection-intensive things like the redis benckmark
+ * will be able to close/open sockets a zillion of times */
+ if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
+ anetSetError(err, "setsockopt SO_REUSEADDR: %s", strerror(errno));
+ return ANET_ERR;
+ }
+ return s;
+}
+
+#define ANET_CONNECT_NONE 0
+#define ANET_CONNECT_NONBLOCK 1
+static int anetTcpGenericConnect(char *err, char *addr, int port, int flags)
+{
+ int s;
+ struct sockaddr_in sa;
+
+ if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR)
+ return ANET_ERR;
+
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons(port);
+ if (inet_aton(addr, &sa.sin_addr) == 0) {
+ struct hostent *he;
+
+ he = gethostbyname(addr);
+ if (he == NULL) {
+ anetSetError(err, "can't resolve: %s", addr);
+ close(s);
+ return ANET_ERR;
+ }
+ memcpy(&sa.sin_addr, he->h_addr, sizeof(struct in_addr));
+ }
+ if (flags & ANET_CONNECT_NONBLOCK) {
+ if (anetNonBlock(err,s) != ANET_OK)
+ return ANET_ERR;
+ }
+ if (connect(s, (struct sockaddr*)&sa, sizeof(sa)) == -1) {
+ if (errno == EINPROGRESS &&
+ flags & ANET_CONNECT_NONBLOCK)
+ return s;
+
+ anetSetError(err, "connect: %s", strerror(errno));
+ close(s);
+ return ANET_ERR;
+ }
+ return s;
+}
+
+int anetTcpConnect(char *err, char *addr, int port)
+{
+ return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONE);
+}
+
+int anetTcpNonBlockConnect(char *err, char *addr, int port)
+{
+ return anetTcpGenericConnect(err,addr,port,ANET_CONNECT_NONBLOCK);
+}
+
+int anetUnixGenericConnect(char *err, char *path, int flags)
+{
+ int s;
+ struct sockaddr_un sa;
+
+ if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR)
+ return ANET_ERR;
+
+ sa.sun_family = AF_LOCAL;
+ strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1);
+ if (flags & ANET_CONNECT_NONBLOCK) {
+ if (anetNonBlock(err,s) != ANET_OK)
+ return ANET_ERR;
+ }
+ if (connect(s,(struct sockaddr*)&sa,sizeof(sa)) == -1) {
+ if (errno == EINPROGRESS &&
+ flags & ANET_CONNECT_NONBLOCK)
+ return s;
+
+ anetSetError(err, "connect: %s", strerror(errno));
+ close(s);
+ return ANET_ERR;
+ }
+ return s;
+}
+
+int anetUnixConnect(char *err, char *path)
+{
+ return anetUnixGenericConnect(err,path,ANET_CONNECT_NONE);
+}
+
+int anetUnixNonBlockConnect(char *err, char *path)
+{
+ return anetUnixGenericConnect(err,path,ANET_CONNECT_NONBLOCK);
+}
+
+/* Like read(2) but make sure 'count' is read before to return
+ * (unless error or EOF condition is encountered) */
+int anetRead(int fd, char *buf, int count)
+{
+ int nread, totlen = 0;
+ while(totlen != count) {
+ nread = read(fd,buf,count-totlen);
+ if (nread == 0) return totlen;
+ if (nread == -1) return -1;
+ totlen += nread;
+ buf += nread;
+ }
+ return totlen;
+}
+
+/* Like write(2) but make sure 'count' is read before to return
+ * (unless error is encountered) */
+int anetWrite(int fd, char *buf, int count)
+{
+ int nwritten, totlen = 0;
+ while(totlen != count) {
+ nwritten = write(fd,buf,count-totlen);
+ if (nwritten == 0) return totlen;
+ if (nwritten == -1) return -1;
+ totlen += nwritten;
+ buf += nwritten;
+ }
+ return totlen;
+}
+
+static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len) {
+ if (bind(s,sa,len) == -1) {
+ anetSetError(err, "bind: %s", strerror(errno));
+ close(s);
+ return ANET_ERR;
+ }
+
+ /* Use a backlog of 512 entries. We pass 511 to the listen() call because
+ * the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1);
+ * which will thus give us a backlog of 512 entries */
+ if (listen(s, 511) == -1) {
+ anetSetError(err, "listen: %s", strerror(errno));
+ close(s);
+ return ANET_ERR;
+ }
+ return ANET_OK;
+}
+
+int anetTcpServer(char *err, int port, char *bindaddr)
+{
+ int s;
+ struct sockaddr_in sa;
+
+ if ((s = anetCreateSocket(err,AF_INET)) == ANET_ERR)
+ return ANET_ERR;
+
+ memset(&sa,0,sizeof(sa));
+ sa.sin_family = AF_INET;
+ sa.sin_port = htons(port);
+ sa.sin_addr.s_addr = htonl(INADDR_ANY);
+ if (bindaddr && inet_aton(bindaddr, &sa.sin_addr) == 0) {
+ anetSetError(err, "invalid bind address");
+ close(s);
+ return ANET_ERR;
+ }
+ if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR)
+ return ANET_ERR;
+ return s;
+}
+
+int anetUnixServer(char *err, char *path, mode_t perm)
+{
+ int s;
+ struct sockaddr_un sa;
+
+ if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR)
+ return ANET_ERR;
+
+ memset(&sa,0,sizeof(sa));
+ sa.sun_family = AF_LOCAL;
+ strncpy(sa.sun_path,path,sizeof(sa.sun_path)-1);
+ if (anetListen(err,s,(struct sockaddr*)&sa,sizeof(sa)) == ANET_ERR)
+ return ANET_ERR;
+ if (perm)
+ chmod(sa.sun_path, perm);
+ return s;
+}
+
+static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
+ int fd;
+ while(1) {
+ fd = accept(s,sa,len);
+ if (fd == -1) {
+ if (errno == EINTR)
+ continue;
+ else {
+ anetSetError(err, "accept: %s", strerror(errno));
+ return ANET_ERR;
+ }
+ }
+ break;
+ }
+ return fd;
+}
+
+int anetTcpAccept(char *err, int s, char *ip, int *port) {
+ int fd;
+ struct sockaddr_in sa;
+ socklen_t salen = sizeof(sa);
+ if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)
+ return ANET_ERR;
+
+ if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
+ if (port) *port = ntohs(sa.sin_port);
+ return fd;
+}
+
+int anetUnixAccept(char *err, int s) {
+ int fd;
+ struct sockaddr_un sa;
+ socklen_t salen = sizeof(sa);
+ if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == ANET_ERR)
+ return ANET_ERR;
+
+ return fd;
+}
+
+int anetPeerToString(int fd, char *ip, int *port) {
+ struct sockaddr_in sa;
+ socklen_t salen = sizeof(sa);
+
+ if (getpeername(fd,(struct sockaddr*)&sa,&salen) == -1) {
+ *port = 0;
+ ip[0] = '?';
+ ip[1] = '\0';
+ return -1;
+ }
+ if (ip) strcpy(ip,inet_ntoa(sa.sin_addr));
+ if (port) *port = ntohs(sa.sin_port);
+ return 0;
+}
View
58 tcproxy/src/anet.h
@@ -0,0 +1,58 @@
+/* anet.c -- Basic TCP socket stuff made a bit less boring
+ *
+ * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef ANET_H
+#define ANET_H
+
+#define ANET_OK 0
+#define ANET_ERR -1
+#define ANET_ERR_LEN 256
+
+#if defined(__sun)
+#define AF_LOCAL AF_UNIX
+#endif
+
+int anetTcpConnect(char *err, char *addr, int port);
+int anetTcpNonBlockConnect(char *err, char *addr, int port);
+int anetUnixConnect(char *err, char *path);
+int anetUnixNonBlockConnect(char *err, char *path);
+int anetRead(int fd, char *buf, int count);
+int anetResolve(char *err, char *host, char *ipbuf);
+int anetTcpServer(char *err, int port, char *bindaddr);
+int anetUnixServer(char *err, char *path, mode_t perm);
+int anetTcpAccept(char *err, int serversock, char *ip, int *port);
+int anetUnixAccept(char *err, int serversock);
+int anetWrite(int fd, char *buf, int count);
+int anetNonBlock(char *err, int fd);
+int anetTcpNoDelay(char *err, int fd);
+int anetTcpKeepAlive(char *err, int fd);
+int anetPeerToString(int fd, char *ip, int *port);
+
+#endif
View
90 tcproxy/src/config.h
@@ -0,0 +1,90 @@
+#ifndef __CONFIG_H
+#define __CONFIG_H
+
+#ifdef __APPLE__
+#include <AvailabilityMacros.h>
+#endif
+
+/* Test for proc filesystem */
+#ifdef __linux__
+#define HAVE_PROCFS 1
+#endif
+
+/* Test for task_info() */
+#if defined(__APPLE__)
+#define HAVE_TASKINFO 1
+#endif
+
+/* Test for backtrace() */
+#if defined(__APPLE__) || defined(__linux__)
+#define HAVE_BACKTRACE 1
+#endif
+
+/* Test for polling API */
+#ifdef __linux__
+#define HAVE_EPOLL 1
+#endif
+
+#if (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined (__NetBSD__)
+#define HAVE_KQUEUE 1
+#endif
+
+/* Define aof_fsync to fdatasync() in Linux and fsync() for all the rest */
+#ifdef __linux__
+#define aof_fsync fdatasync
+#else
+#define aof_fsync fsync
+#endif
+
+/* Byte ordering detection */
+#include <sys/types.h> /* This will likely define BYTE_ORDER */
+
+#ifndef BYTE_ORDER
+#if (BSD >= 199103)
+# include <machine/endian.h>
+#else
+#if defined(linux) || defined(__linux__)
+# include <endian.h>
+#else
+#define LITTLE_ENDIAN 1234 /* least-significant byte first (vax, pc) */
+#define BIG_ENDIAN 4321 /* most-significant byte first (IBM, net) */
+#define PDP_ENDIAN 3412 /* LSB first in word, MSW first in long (pdp)*/
+
+#if defined(vax) || defined(ns32000) || defined(sun386) || defined(__i386__) || \
+ defined(MIPSEL) || defined(_MIPSEL) || defined(BIT_ZERO_ON_RIGHT) || \
+ defined(__alpha__) || defined(__alpha)
+#define BYTE_ORDER LITTLE_ENDIAN
+#endif
+
+#if defined(sel) || defined(pyr) || defined(mc68000) || defined(sparc) || \
+ defined(is68k) || defined(tahoe) || defined(ibm032) || defined(ibm370) || \
+ defined(MIPSEB) || defined(_MIPSEB) || defined(_IBMR2) || defined(DGUX) ||\
+ defined(apollo) || defined(__convex__) || defined(_CRAY) || \
+ defined(__hppa) || defined(__hp9000) || \
+ defined(__hp9000s300) || defined(__hp9000s700) || \
+ defined (BIT_ZERO_ON_LEFT) || defined(m68k) || defined(__sparc)
+#define BYTE_ORDER BIG_ENDIAN
+#endif
+#endif /* linux */
+#endif /* BSD */
+#endif /* BYTE_ORDER */
+
+#if defined(__BYTE_ORDER) && !defined(BYTE_ORDER)
+#if (__BYTE_ORDER == __LITTLE_ENDIAN)
+#define BYTE_ORDER LITTLE_ENDIAN
+#else
+#define BYTE_ORDER BIG_ENDIAN
+#endif
+#endif
+
+#if !defined(BYTE_ORDER) || \
+ (BYTE_ORDER != BIG_ENDIAN && BYTE_ORDER != LITTLE_ENDIAN)
+ /* you must determine what the correct bit order is for
+ * your compiler - the next line is an intentional error
+ * which will force your compiles to bomb until you fix
+ * the above macros.
+ */
+#error "Undefined or invalid BYTE_ORDER"
+#endif
+
+#endif
View
15 tcproxy/src/fmacros.h
@@ -0,0 +1,15 @@
+#ifndef _REDIS_FMACRO_H
+#define _REDIS_FMACRO_H
+
+#define _BSD_SOURCE
+
+#if defined(__linux__) || defined(__OpenBSD__)
+#define _XOPEN_SOURCE 700
+#else
+#define _XOPEN_SOURCE
+#endif
+
+#define _LARGEFILE_SOURCE
+#define _FILE_OFFSET_BITS 64
+
+#endif
View
471 tcproxy/src/policy.c
@@ -0,0 +1,471 @@
+
+#line 1 "policy.rl"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "policy.h"
+
+static Hostent host;
+static int addr_p;
+static int have_addr;
+
+
+#line 92 "policy.rl"
+
+
+
+#line 19 "policy.c"
+static const char _policy_parser_actions[] = {
+ 0, 1, 3, 1, 4, 1, 6, 1,
+ 7, 1, 8, 1, 9, 1, 10, 2,
+ 0, 3, 2, 2, 4, 2, 3, 4,
+ 2, 5, 1, 3, 8, 0, 3, 4,
+ 0, 3, 2, 4, 5, 8, 0, 3,
+ 2, 4
+};
+
+static const unsigned char _policy_parser_key_offsets[] = {
+ 0, 0, 4, 9, 11, 12, 19, 21,
+ 24, 26, 29, 31, 34, 37, 38, 40,
+ 43, 44, 47, 48, 49, 50, 51, 52,
+ 53, 55, 57, 62, 67, 73, 74, 75,
+ 76, 78, 82, 86, 90, 94, 96, 97,
+ 98, 99, 100, 101, 102, 103, 104, 106,
+ 109, 111, 114, 116, 119, 122, 125, 126,
+ 129, 130, 135, 140, 141, 142, 143, 144,
+ 145, 146, 147, 148, 149, 151, 153, 156,
+ 158, 161, 163, 166, 169, 170, 172, 176,
+ 180, 184, 188, 190, 193, 194, 197, 198,
+ 203, 208, 209, 210, 211, 212, 213, 214,
+ 215, 216, 217, 218, 221, 223, 225, 227,
+ 229, 229, 232, 235
+};
+
+static const char _policy_parser_trans_keys[] = {
+ 97, 108, 48, 57, 32, 45, 46, 48,
+ 57, 32, 45, 62, 32, 97, 104, 108,
+ 114, 48, 57, 48, 57, 46, 48, 57,
+ 48, 57, 46, 48, 57, 48, 57, 58,
+ 48, 57, 58, 48, 57, 58, 48, 57,
+ 46, 48, 57, 46, 46, 48, 57, 46,
+ 110, 121, 97, 115, 104, 32, 123, 32,
+ 123, 32, 97, 108, 48, 57, 32, 46,
+ 125, 48, 57, 32, 97, 108, 125, 48,
+ 57, 110, 121, 58, 48, 57, 32, 125,
+ 48, 57, 32, 125, 48, 57, 32, 125,
+ 48, 57, 32, 125, 48, 57, 32, 125,
+ 111, 99, 97, 108, 104, 111, 115, 116,
+ 48, 57, 46, 48, 57, 48, 57, 46,
+ 48, 57, 48, 57, 58, 48, 57, 58,
+ 48, 57, 46, 48, 57, 46, 46, 48,
+ 57, 46, 32, 46, 125, 48, 57, 32,
+ 46, 125, 48, 57, 111, 99, 97, 108,
+ 104, 111, 115, 116, 114, 32, 123, 48,
+ 57, 46, 48, 57, 48, 57, 46, 48,
+ 57, 48, 57, 58, 48, 57, 58, 48,
+ 57, 58, 48, 57, 32, 45, 48, 57,
+ 32, 45, 48, 57, 32, 45, 48, 57,
+ 32, 45, 48, 57, 32, 45, 46, 48,
+ 57, 46, 46, 48, 57, 46, 32, 45,
+ 46, 48, 57, 32, 45, 46, 48, 57,
+ 110, 121, 111, 99, 97, 108, 104, 111,
+ 115, 116, 46, 48, 57, 48, 57, 48,
+ 57, 48, 57, 48, 57, 46, 48, 57,
+ 46, 48, 57, 0
+};
+
+static const char _policy_parser_single_lengths[] = {
+ 0, 2, 3, 2, 1, 5, 0, 1,
+ 0, 1, 0, 1, 1, 1, 0, 1,
+ 1, 1, 1, 1, 1, 1, 1, 1,
+ 2, 2, 3, 3, 4, 1, 1, 1,
+ 0, 2, 2, 2, 2, 2, 1, 1,
+ 1, 1, 1, 1, 1, 1, 0, 1,
+ 0, 1, 0, 1, 1, 1, 1, 1,
+ 1, 3, 3, 1, 1, 1, 1, 1,
+ 1, 1, 1, 1, 2, 0, 1, 0,
+ 1, 0, 1, 1, 1, 0, 2, 2,
+ 2, 2, 2, 1, 1, 1, 1, 3,
+ 3, 1, 1, 1, 1, 1, 1, 1,
+ 1, 1, 1, 1, 0, 0, 0, 0,
+ 0, 1, 1, 0
+};
+
+static const char _policy_parser_range_lengths[] = {
+ 0, 1, 1, 0, 0, 1, 1, 1,
+ 1, 1, 1, 1, 1, 0, 1, 1,
+ 0, 1, 0, 0, 0, 0, 0, 0,
+ 0, 0, 1, 1, 1, 0, 0, 0,
+ 1, 1, 1, 1, 1, 0, 0, 0,
+ 0, 0, 0, 0, 0, 0, 1, 1,
+ 1, 1, 1, 1, 1, 1, 0, 1,
+ 0, 1, 1, 0, 0, 0, 0, 0,
+ 0, 0, 0, 0, 0, 1, 1, 1,
+ 1, 1, 1, 1, 0, 1, 1, 1,
+ 1, 1, 0, 1, 0, 1, 0, 1,
+ 1, 0, 0, 0, 0, 0, 0, 0,
+ 0, 0, 0, 1, 1, 1, 1, 1,
+ 0, 1, 1, 0
+};
+
+static const short _policy_parser_index_offsets[] = {
+ 0, 0, 4, 9, 12, 14, 21, 23,
+ 26, 28, 31, 33, 36, 39, 41, 43,
+ 46, 48, 51, 53, 55, 57, 59, 61,
+ 63, 66, 69, 74, 79, 85, 87, 89,
+ 91, 93, 97, 101, 105, 109, 112, 114,
+ 116, 118, 120, 122, 124, 126, 128, 130,
+ 133, 135, 138, 140, 143, 146, 149, 151,
+ 154, 156, 161, 166, 168, 170, 172, 174,
+ 176, 178, 180, 182, 184, 187, 189, 192,
+ 194, 197, 199, 202, 205, 207, 209, 213,
+ 217, 221, 225, 228, 231, 233, 236, 238,
+ 243, 248, 250, 252, 254, 256, 258, 260,
+ 262, 264, 266, 268, 271, 273, 275, 277,
+ 279, 280, 283, 286
+};
+
+static const char _policy_parser_indicies[] = {
+ 2, 3, 1, 0, 4, 5, 6, 7,
+ 0, 8, 9, 0, 10, 0, 10, 12,
+ 13, 14, 15, 11, 0, 16, 0, 17,
+ 18, 0, 19, 0, 20, 21, 0, 22,
+ 0, 24, 23, 0, 24, 25, 0, 24,
+ 0, 26, 0, 20, 27, 0, 20, 0,
+ 17, 28, 0, 17, 0, 29, 0, 25,
+ 0, 30, 0, 31, 0, 32, 0, 33,
+ 34, 0, 35, 36, 0, 36, 38, 39,
+ 37, 0, 40, 41, 43, 42, 0, 44,
+ 38, 39, 43, 37, 0, 45, 0, 46,
+ 0, 47, 0, 48, 0, 40, 43, 49,
+ 0, 40, 43, 50, 0, 40, 43, 51,
+ 0, 40, 43, 52, 0, 40, 43, 0,
+ 53, 0, 54, 0, 55, 0, 56, 0,
+ 57, 0, 58, 0, 59, 0, 46, 0,
+ 60, 0, 61, 62, 0, 63, 0, 64,
+ 65, 0, 66, 0, 47, 67, 0, 47,
+ 46, 0, 64, 68, 0, 64, 0, 61,
+ 69, 0, 61, 0, 40, 41, 43, 70,
+ 0, 40, 41, 43, 51, 0, 71, 0,
+ 72, 0, 73, 0, 74, 0, 75, 0,
+ 76, 0, 77, 0, 25, 0, 78, 0,
+ 79, 80, 0, 81, 0, 82, 83, 0,
+ 84, 0, 85, 86, 0, 87, 0, 89,
+ 88, 0, 89, 90, 0, 89, 0, 91,
+ 0, 4, 5, 92, 0, 4, 5, 93,
+ 0, 4, 5, 94, 0, 4, 5, 95,
+ 0, 4, 5, 0, 85, 96, 0, 85,
+ 0, 82, 97, 0, 82, 0, 4, 5,
+ 6, 98, 0, 4, 5, 6, 94, 0,
+ 99, 0, 90, 0, 100, 0, 101, 0,
+ 102, 0, 103, 0, 104, 0, 105, 0,
+ 106, 0, 90, 0, 107, 108, 0, 109,
+ 0, 110, 0, 111, 0, 112, 0, 0,
+ 107, 113, 0, 107, 111, 0, 0, 0
+};
+
+static const char _policy_parser_trans_targs[] = {
+ 0, 2, 89, 91, 3, 4, 69, 87,
+ 3, 4, 5, 99, 19, 21, 59, 67,
+ 7, 8, 17, 9, 10, 15, 11, 12,
+ 14, 13, 100, 16, 18, 20, 22, 23,
+ 24, 25, 26, 25, 26, 27, 29, 38,
+ 28, 46, 57, 107, 28, 30, 31, 32,
+ 33, 34, 35, 36, 37, 39, 40, 41,
+ 42, 43, 44, 45, 47, 48, 55, 49,
+ 50, 53, 51, 52, 54, 56, 58, 60,
+ 61, 62, 63, 64, 65, 66, 68, 25,
+ 26, 70, 71, 85, 72, 73, 83, 74,
+ 75, 77, 76, 78, 79, 80, 81, 82,
+ 84, 86, 88, 90, 92, 93, 94, 95,
+ 96, 97, 98, 6, 105, 101, 102, 103,
+ 104, 106
+};
+
+static const char _policy_parser_trans_actions[] = {
+ 13, 31, 15, 15, 5, 5, 1, 21,
+ 0, 0, 0, 36, 27, 0, 27, 0,
+ 1, 1, 1, 1, 1, 1, 1, 1,
+ 24, 1, 18, 1, 1, 1, 0, 0,
+ 0, 11, 11, 0, 0, 31, 15, 15,
+ 7, 1, 21, 7, 0, 1, 1, 24,
+ 18, 3, 3, 3, 3, 1, 1, 1,
+ 1, 1, 1, 1, 1, 1, 1, 1,
+ 1, 1, 1, 1, 1, 1, 21, 1,
+ 1, 1, 1, 1, 1, 1, 0, 9,
+ 9, 1, 1, 1, 1, 1, 1, 1,
+ 1, 24, 1, 18, 3, 3, 3, 3,
+ 1, 1, 21, 1, 1, 1, 1, 1,
+ 1, 1, 1, 1, 21, 3, 3, 3,
+ 3, 21
+};
+
+static const char _policy_parser_eof_actions[] = {
+ 0, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 13, 13, 13, 13, 13,
+ 13, 13, 13, 7, 7, 7, 7, 7,
+ 7, 7, 7, 0
+};
+
+static const int policy_parser_start = 1;
+static const int policy_parser_first_final = 99;
+static const int policy_parser_error = 0;
+
+static const int policy_parser_en_main = 1;
+
+
+#line 95 "policy.rl"
+
+Policy *ParsePolicy(const char *p) {
+ Policy *policy = malloc(sizeof(Policy));
+
+ memset(policy, 0, sizeof(Policy));
+ host.addr = NULL;
+
+#line 237 "policy.c"
+ {
+ policy->cs = policy_parser_start;
+ }
+
+#line 102 "policy.rl"
+
+ policy->p = p;
+ policy->pe = p + strlen(p);
+ policy->eof = policy->pe;
+
+
+#line 249 "policy.c"
+ {
+ int _klen;
+ unsigned int _trans;
+ const char *_acts;
+ unsigned int _nacts;
+ const char *_keys;
+
+ if ( ( policy->p) == ( policy->pe) )
+ goto _test_eof;
+ if ( policy->cs == 0 )
+ goto _out;
+_resume:
+ _keys = _policy_parser_trans_keys + _policy_parser_key_offsets[ policy->cs];
+ _trans = _policy_parser_index_offsets[ policy->cs];
+
+ _klen = _policy_parser_single_lengths[ policy->cs];
+ if ( _klen > 0 ) {
+ const char *_lower = _keys;
+ const char *_mid;
+ const char *_upper = _keys + _klen - 1;
+ while (1) {
+ if ( _upper < _lower )
+ break;
+
+ _mid = _lower + ((_upper-_lower) >> 1);
+ if ( (*( policy->p)) < *_mid )
+ _upper = _mid - 1;
+ else if ( (*( policy->p)) > *_mid )
+ _lower = _mid + 1;
+ else {
+ _trans += (unsigned int)(_mid - _keys);
+ goto _match;
+ }
+ }
+ _keys += _klen;
+ _trans += _klen;
+ }
+
+ _klen = _policy_parser_range_lengths[ policy->cs];
+ if ( _klen > 0 ) {
+ const char *_lower = _keys;
+ const char *_mid;
+ const char *_upper = _keys + (_klen<<1) - 2;
+ while (1) {
+ if ( _upper < _lower )
+ break;
+
+ _mid = _lower + (((_upper-_lower) >> 1) & ~1);
+ if ( (*( policy->p)) < _mid[0] )
+ _upper = _mid - 2;
+ else if ( (*( policy->p)) > _mid[1] )
+ _lower = _mid + 2;
+ else {
+ _trans += (unsigned int)((_mid - _keys)>>1);
+ goto _match;
+ }
+ }
+ _trans += _klen;
+ }
+
+_match:
+ _trans = _policy_parser_indicies[_trans];
+ policy->cs = _policy_parser_trans_targs[_trans];
+
+ if ( _policy_parser_trans_actions[_trans] == 0 )
+ goto _again;
+
+ _acts = _policy_parser_actions + _policy_parser_trans_actions[_trans];
+ _nacts = (unsigned int) *_acts++;
+ while ( _nacts-- > 0 )
+ {
+ switch ( *_acts++ )
+ {
+ case 0:
+#line 18 "policy.rl"
+ {
+ addr_p = 0;
+ host.addr = NULL;
+ have_addr = 0;
+ }
+ break;
+ case 1:
+#line 24 "policy.rl"
+ {
+ have_addr = 1;
+ }
+ break;
+ case 2:
+#line 28 "policy.rl"
+ {
+ host.port = 0;
+ }
+ break;
+ case 3:
+#line 32 "policy.rl"
+ {
+ if (host.addr == NULL) {
+ host.addr = malloc(16 * sizeof(char));
+ }
+ host.addr[addr_p] = (*( policy->p));
+ addr_p++;
+ }
+ break;
+ case 4:
+#line 40 "policy.rl"
+ {
+ host.port = host.port * 10 + ((*( policy->p)) - '0');
+ }
+ break;
+ case 5:
+#line 44 "policy.rl"
+ {
+ host.addr[addr_p] = '\0';
+ }
+ break;
+ case 6:
+#line 48 "policy.rl"
+ {
+ if (!have_addr) {
+ free(host.addr);
+ host.addr = NULL;
+ }
+ policy->listen = host;
+ host.addr = NULL;
+ }
+ break;
+ case 7:
+#line 57 "policy.rl"
+ {
+ if (!have_addr) {
+ free(host.addr);
+ host.addr = NULL;
+ }
+ policy->nhost++;
+ policy->hosts = realloc(policy->hosts, sizeof(Hostent) * policy->nhost);
+ policy->hosts[policy->nhost - 1] = host;
+ host.addr = NULL;
+ }
+ break;
+ case 8:
+#line 68 "policy.rl"
+ {
+ policy->type = PROXY_RR;
+ }
+ break;
+ case 9:
+#line 72 "policy.rl"
+ {
+ policy->type = PROXY_HASH;
+ }
+ break;
+ case 10:
+#line 76 "policy.rl"
+ {
+ LogFatal("policy syntax error around:\"%s\"\n", ( policy->p));
+ }
+ break;
+#line 407 "policy.c"
+ }
+ }
+
+_again:
+ if ( policy->cs == 0 )
+ goto _out;
+ if ( ++( policy->p) != ( policy->pe) )
+ goto _resume;
+ _test_eof: {}
+ if ( ( policy->p) == ( policy->eof) )
+ {
+ const char *__acts = _policy_parser_actions + _policy_parser_eof_actions[ policy->cs];
+ unsigned int __nacts = (unsigned int) *__acts++;
+ while ( __nacts-- > 0 ) {
+ switch ( *__acts++ ) {
+ case 7:
+#line 57 "policy.rl"
+ {
+ if (!have_addr) {
+ free(host.addr);
+ host.addr = NULL;
+ }
+ policy->nhost++;
+ policy->hosts = realloc(policy->hosts, sizeof(Hostent) * policy->nhost);
+ policy->hosts[policy->nhost - 1] = host;
+ host.addr = NULL;
+ }
+ break;
+ case 10:
+#line 76 "policy.rl"
+ {
+ LogFatal("policy syntax error around:\"%s\"\n", ( policy->p));
+ }
+ break;
+#line 442 "policy.c"
+ }
+ }
+ }
+
+ _out: {}
+ }
+
+#line 108 "policy.rl"
+
+ if (policy->cs ==
+#line 453 "policy.c"
+0
+#line 109 "policy.rl"
+) {
+ free(policy);
+ return NULL;
+ }
+
+ return policy;
+}
+
+void FreePolicy(Policy *policy) {
+ int i;
+ free(policy->listen.addr);
+ for (i = 0; i < policy->nhost; i++) {
+ free(policy->hosts[i].addr);
+ }
+ free(policy->hosts);
+ free(policy);
+}
View
33 tcproxy/src/policy.h
@@ -0,0 +1,33 @@
+#ifndef _POLICY_H_
+#define _POLICY_H_
+
+#include "util.h"
+
+#define PROXY_RR 0
+#define PROXY_HASH 1
+
+typedef struct Hostent {
+ char *addr;
+ int port;
+} Hostent;
+
+typedef struct Policy {
+ Hostent listen;
+
+ int type;
+
+ Hostent *hosts;
+ int nhost;
+
+ int curhost;
+
+ //ragel stuff
+ const char *p, *pe, *eof;
+ int cs;
+} Policy;
+
+void FreePolicy(Policy *policy);
+Policy *ParsePolicy(const char *str);
+
+#endif /* _POLICY_H_ */
+
View
125 tcproxy/src/policy.rl
@@ -0,0 +1,125 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "policy.h"
+
+static Hostent host;
+static int addr_p;
+static int have_addr;
+
+%%{
+ machine policy_parser;
+ access policy->;
+ variable p policy->p;
+ variable pe policy->pe;
+ variable eof policy->eof;
+
+ action init_host {
+ addr_p = 0;
+ host.addr = NULL;
+ have_addr = 0;
+ }
+
+ action have_addr {
+ have_addr = 1;
+ }
+
+ action init_port {
+ host.port = 0;
+ }
+
+ action append_addr {
+ if (host.addr == NULL) {
+ host.addr = malloc(16 * sizeof(char));
+ }
+ host.addr[addr_p] = fc;
+ addr_p++;
+ }
+
+ action append_port {
+ host.port = host.port * 10 + (fc - '0');
+ }
+
+ action finish_addr {
+ host.addr[addr_p] = '\0';
+ }
+
+ action listen_addr {
+ if (!have_addr) {
+ free(host.addr);
+ host.addr = NULL;
+ }
+ policy->listen = host;
+ host.addr = NULL;
+ }
+
+ action append_host {
+ if (!have_addr) {
+ free(host.addr);
+ host.addr = NULL;
+ }
+ policy->nhost++;
+ policy->hosts = realloc(policy->hosts, sizeof(Hostent) * policy->nhost);
+ policy->hosts[policy->nhost - 1] = host;
+ host.addr = NULL;
+ }
+
+ action set_rr {
+ policy->type = PROXY_RR;
+ }
+
+ action set_hash {
+ policy->type = PROXY_HASH;
+ }
+
+ action error {
+ LogFatal("policy syntax error around:\"%s\"\n", fpc);
+ }
+
+ ws = (' ');
+ port = (digit {1,5});
+ dottedip = (digit {1,3} '.' digit {1,3} '.' digit {1,3} '.' digit {1,3});
+ addr = ('localhost' | 'any' | dottedip) $append_addr %finish_addr;
+ host = ((addr ':' >have_addr)? port >init_port $append_port) >init_host;
+
+ type = ('rr' %set_rr | 'hash' %set_hash);
+ group = (type ws* '{' ws* host (ws+ >append_host host)* ws* '}' >append_host);
+
+ policy = (host %listen_addr ws* '->' ws* (host >set_rr %append_host | group));
+
+ main := (policy) $!error;
+}%%
+
+%% write data;
+
+Policy *ParsePolicy(const char *p) {
+ Policy *policy = malloc(sizeof(Policy));
+
+ memset(policy, 0, sizeof(Policy));
+ host.addr = NULL;
+ %% write init;
+
+ policy->p = p;
+ policy->pe = p + strlen(p);
+ policy->eof = policy->pe;
+
+ %% write exec;
+
+ if (policy->cs == %%{writ