Permalink
Browse files

afamqp: New AMQP destination driver

This driver implements an AMQP destination (based on the rabbitmq-c
library), supporting persistence, all the exchange types, and uses a
creative way to get messages accross: all the name-value pairs
selected with the value-pairs() syntax will be sent as headers, while
the message payload can be set with the body() option (empty by
default).

Most settings have sensible defaults, except for a few, noted below:

@module afamqp
destination d_amqp {
    amqp(
        vhost("/")
        host("127.0.0.1")
        port(5672)
        username("guest") # mandatory, no default
        password("guest") # mandatory, no default
        exchange("syslog")
        exchange_type("fanout")
        #routing_key("")
        #body("")
        persistent(yes)
        value-pairs(
            scope("selected-macros" "nv-pairs" "sdata")
        )
    );
};

Publishing the name-value pairs as headers makes it possible to use a
headers exchange type and subscribe only to interesting log streams,
in a much more flexible way than using the routing_key() option.

The routing_key() and body() options can contain any template, they
will be expanded before publication.

Signed-off-by: Attila Nagy <bra@fsn.hu>
Signed-off-by: Gergely Nagy <algernon@balabit.hu>
  • Loading branch information...
1 parent f870f6c commit efdee1072316674459b354f58b7621d9d97be4da @algernon algernon committed Oct 8, 2012
View
@@ -4,3 +4,6 @@
[submodule "lib/ivykis"]
path = lib/ivykis
url = https://github.com/buytenh/ivykis.git
+[submodule "modules/afamqp/rabbitmq-c"]
+ path = modules/afamqp/rabbitmq-c
+ url = https://github.com/alanxz/rabbitmq-c.git
View
@@ -3,7 +3,7 @@
# This script is needed to setup build environment from checked out
# source tree.
#
-SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client"
+SUBMODULES="lib/ivykis modules/afmongodb/libmongo-client modules/afamqp/rabbitmq-c"
GIT=`which git`
autogen_submodules()
View
@@ -28,6 +28,7 @@ IVYKIS_MIN_VERSION="0.30.1"
JSON_C_MIN_VERSION="0.9"
PCRE_MIN_VERSION="6.1"
LMC_MIN_VERSION="0.1.6"
+LRMQ_MIN_VERSION="0.0.1"
dnl ***************************************************************************
dnl Initial setup
@@ -163,6 +164,15 @@ AC_ARG_WITH(libmongo-client,
Link against the system supplied or the built-in libmongo-client library.]
,,with_libmongo_client="internal")
+AC_ARG_ENABLE(amqp,
+ [ --enable-amqp Enable amqp destination (default: auto)]
+ ,,enable_amqp="auto")
+
+AC_ARG_WITH(librabbitmq,
+ [ --with-librabbitmq-client=[system/internal]
+ Link against the system supplied or the built-in librabbitmq library.]
+ ,,with_librabbitmq_client="internal")
+
AC_ARG_WITH(ivykis,
[ --with-ivykis=[system/internal]
Link against the system supplied or the built-in ivykis library.]
@@ -827,6 +837,31 @@ if test "x$enable_smtp" != "xno" && test "x$with_libesmtp" != "no"; then
fi
dnl ***************************************************************************
+dnl rabbitmq-c headers/libraries
+dnl ***************************************************************************
+
+if test "x$with_librabbitmq_client" = "xinternal"; then
+ if test -f "$srcdir/modules/afamqp/rabbitmq-c/librabbitmq/amqp.h"; then
+ AC_CONFIG_SUBDIRS([modules/afamqp/rabbitmq-c])
+ # these can only be used in modules/amqp as it assumes
+ # the current directory just one below rabbitmq-c
+
+ LIBRABBITMQ_LIBS="-L\$(builddir)/rabbitmq-c/librabbitmq -lrabbitmq"
+ LIBRABBITMQ_CFLAGS="-I\$(srcdir)/rabbitmq-c/librabbitmq -I\$(builddir)/rabbitmq-c/librabbitmq"
+ LIBRABBITMQ_SUBDIRS="rabbitmq-c"
+ else
+ AC_MSG_WARN([Internal librabbitmq-client sources not found in modules/afamqp/rabbitmq-c])
+ with_librabbitmq_client="no"
+ fi
+elif test "x$with_librabbitmq_client" = "xsystem"; then
+ PKG_CHECK_MODULES(LIBRABBITMQ, librabbitmq >= $LRMQ_MIN_VERSION,with_librabbitmq_client="yes",with_librabbitmq_client="no")
+fi
+
+if test "x$with_librabbitmq_client" = "xno"; then
+ enable_amqp="no"
+fi
+
+dnl ***************************************************************************
dnl misc features to be enabled
dnl ***************************************************************************
@@ -889,6 +924,16 @@ if test "x$enable_mongodb" = "xauto"; then
AC_MSG_RESULT([$enable_mongodb])
fi
+if test "x$enable_amqp" = "xauto"; then
+ AC_MSG_CHECKING(whether to enable amqp destination support)
+ if test "x$with_librabbitmq_client" != "no"; then
+ enable_amqp="yes"
+ else
+ enable_amqp="no"
+ fi
+ AC_MSG_RESULT([$enable_amqp])
+fi
+
if test "x$enable_json" != "xno"; then
JSON_LIBS=$JSON_C_LIBS
JSON_CFLAGS=$JSON_C_CFLAGS
@@ -1068,6 +1113,7 @@ AM_CONDITIONAL(ENABLE_SUN_STREAMS, [test "$enable_sun_streams" = "yes"])
AM_CONDITIONAL(ENABLE_PACCT, [test "$enable_pacct" = "yes"])
AM_CONDITIONAL(ENABLE_MONGODB, [test "$enable_mongodb" = "yes"])
AM_CONDITIONAL(ENABLE_SMTP, [test "$enable_smtp" = "yes"])
+AM_CONDITIONAL(ENABLE_AMQP, [test "$enable_amqp" = "yes"])
AM_CONDITIONAL(ENABLE_JSON, [test "$enable_json" = "yes"])
AM_CONDITIONAL(WITH_LIBSYSTEMD, [test "$with_libsystemd" = "yes"])
@@ -1102,6 +1148,9 @@ AC_SUBST(LIBMONGO_CFLAGS)
AC_SUBST(LIBMONGO_SUBDIRS)
AC_SUBST(LIBESMTP_CFLAGS)
AC_SUBST(LIBESMTP_LIBS)
+AC_SUBST(LIBRABBITMQ_LIBS)
+AC_SUBST(LIBRABBITMQ_CFLAGS)
+AC_SUBST(LIBRABBITMQ_SUBDIRS)
AC_SUBST(JSON_LIBS)
AC_SUBST(JSON_CFLAGS)
AC_SUBST(IVYKIS_SUBDIRS)
@@ -1132,6 +1181,7 @@ AC_OUTPUT(dist.conf
modules/afuser/Makefile
modules/afmongodb/Makefile
modules/afsmtp/Makefile
+ modules/afamqp/Makefile
modules/dbparser/Makefile
modules/dbparser/pdbtool/Makefile
modules/dbparser/tests/Makefile
@@ -1172,6 +1222,7 @@ echo " __thread keyword : ${ac_cv_have_tls:=no}"
echo " Submodules:"
echo " ivykis : $with_ivykis"
echo " libmongo-client : $with_libmongo_client"
+echo " librabbitmq : $with_librabbitmq_client"
echo " Features:"
echo " Debug symbols : ${enable_debug:=no}"
echo " GCC profiling : ${enable_gprof:=no}"
@@ -1192,4 +1243,5 @@ echo " PACCT module (EXPERIMENTAL) : ${enable_pacct:=no}"
echo " MongoDB destination (module): ${enable_mongodb:=no}"
echo " JSON support (module) : ${enable_json:=no}"
echo " SMTP support (module) : ${enable_smtp:=no}"
+echo " AMQP destination (module) : ${enable_amqp:=no}"
View
@@ -383,6 +383,8 @@ const gchar *source_names[SCS_MAX] =
"severity",
"facility",
"sender",
+ "smtp",
+ "amqp",
};
View
@@ -71,6 +71,7 @@ enum
SCS_FACILITY = 25,
SCS_SENDER = 26,
SCS_SMTP = 27,
+ SCS_AMQP = 28,
SCS_MAX,
SCS_SOURCE_MASK = 0xff
};
View
@@ -1 +1 @@
-SUBDIRS = afsocket afsql afstreams affile afprog afuser afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json
+SUBDIRS = afsocket afsql afstreams affile afprog afuser afamqp afmongodb afsmtp csvparser confgen system-source syslogformat pacctformat basicfuncs cryptofuncs dbparser json
View
@@ -0,0 +1,35 @@
+
+SUBDIRS = @LIBRABBITMQ_SUBDIRS@
+DIST_SUBDIRS = rabbitmq-c
+
+moduledir = @moduledir@
+AM_CPPFLAGS = -I$(top_srcdir)/lib -I../../lib
+module_LTLIBRARIES = libafamqp.la
+
+export top_srcdir
+
+if ENABLE_AMQP
+
+libafamqp_la_CFLAGS = $(LIBRABBITMQ_CFLAGS)
+libafamqp_la_SOURCES = afamqp-grammar.y afamqp.c afamqp.h afamqp-parser.c afamqp-parser.h
+libafamqp_la_LIBADD = $(MODULE_DEPS_LIBS) $(LIBRABBITMQ_LIBS)
+libafamqp_la_LDFLAGS = $(MODULE_LDFLAGS)
+
+endif
+
+BUILT_SOURCES = afamqp-grammar.y afamqp-grammar.c afamqp-grammar.h
+EXTRA_DIST = $(BUILT_SOURCES) afamqp-grammar.ym
+
+include $(top_srcdir)/build/lex-rules.am
+
+# divert install/uninstall targets to avoid recursing into $(SUBDIRS)
+
+install:
+ $(MAKE) $(AM_MAKEFLAGS) all
+ $(MAKE) $(AM_MAKEFLAGS) install-am
+
+uninstall:
+ $(MAKE) $(AM_MAKEFLAGS) uninstall-am
+
+check:
+ echo "Make check disabled, since it requires a newer glib"
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+%code requires {
+
+#include "afamqp-parser.h"
+
+}
+
+%code {
+
+#include "cfg-parser.h"
+#include "afamqp-grammar.h"
+#include "plugin.h"
+#include "vptransform.h"
+
+extern LogDriver *last_driver;
+extern ValuePairs *last_value_pairs;
+extern ValuePairsTransformSet *last_vp_transset;
+}
+
+%name-prefix "afamqp_"
+%lex-param {CfgLexer *lexer}
+%parse-param {CfgLexer *lexer}
+%parse-param {LogDriver **instance}
+%parse-param {gpointer arg}
+
+
+/* INCLUDE_DECLS */
+
+%token KW_AMQP
+%token KW_EXCHANGE
+%token KW_EXCHANGE_TYPE
+%token KW_PERSISTENT
+%token KW_VHOST
+%token KW_ROUTING_KEY
+%token KW_BODY
+
+%%
+
+start
+ : LL_CONTEXT_DESTINATION KW_AMQP
+ {
+ last_driver = *instance = afamqp_dd_new();
+ }
+ '(' afamqp_options ')' { YYACCEPT; }
+ ;
+
+afamqp_options
+ : afamqp_option afamqp_options
+ |
+ ;
+
+afamqp_option
+ : KW_HOST '(' string ')' { afamqp_dd_set_host(last_driver, $3); free($3); }
+ | KW_PORT '(' LL_NUMBER ')' { afamqp_dd_set_port(last_driver, $3); }
+ | KW_VHOST '(' string ')' { afamqp_dd_set_vhost(last_driver, $3); free($3); }
+ | KW_EXCHANGE '(' string ')' { afamqp_dd_set_exchange(last_driver, $3); free($3); }
+ | KW_EXCHANGE_TYPE '(' string ')' { afamqp_dd_set_exchange_type(last_driver, $3); free($3); }
+ | KW_ROUTING_KEY '(' string ')' { afamqp_dd_set_routing_key(last_driver, $3); free($3); }
+ | KW_BODY '(' string ')' { afamqp_dd_set_body(last_driver, $3); free($3); }
+ | KW_PERSISTENT '(' yesno ')' { afamqp_dd_set_persistent(last_driver, $3); }
+ | KW_USERNAME '(' string ')' { afamqp_dd_set_user(last_driver, $3); free($3); }
+ | KW_PASSWORD '(' string ')' { afamqp_dd_set_password(last_driver, $3); free($3); }
+ | value_pair_option { afamqp_dd_set_value_pairs(last_driver, $1); }
+ | dest_driver_option
+ ;
+
+/* INCLUDE_RULES */
+
+%%
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#include "afamqp.h"
+#include "cfg-parser.h"
+#include "afamqp-grammar.h"
+
+extern int afamqp_debug;
+int afamqp_parse(CfgLexer *lexer, LogDriver **instance, gpointer arg);
+
+static CfgLexerKeyword afamqp_keywords[] = {
+ { "amqp", KW_AMQP },
+ { "vhost", KW_VHOST },
+ { "host", KW_HOST },
+ { "port", KW_PORT },
+ { "exchange", KW_EXCHANGE },
+ { "exchange_type", KW_EXCHANGE_TYPE },
+ { "routing_key", KW_ROUTING_KEY },
+ { "persistent", KW_PERSISTENT },
+ { "username", KW_USERNAME },
+ { "password", KW_PASSWORD },
+ { "log_fifo_size", KW_LOG_FIFO_SIZE },
+ { "body", KW_BODY },
+ { NULL }
+};
+
+CfgParser afamqp_parser =
+{
+#if ENABLE_DEBUG
+ .debug_flag = &afamqp_debug,
+#endif
+ .name = "afamqp",
+ .keywords = afamqp_keywords,
+ .parse = (int (*)(CfgLexer *lexer, gpointer *instance, gpointer)) afamqp_parse,
+ .cleanup = (void (*)(gpointer)) log_pipe_unref,
+};
+
+CFG_PARSER_IMPLEMENT_LEXER_BINDING(afamqp_, LogDriver **)
@@ -0,0 +1,36 @@
+/*
+ * Copyright (c) 2012 Nagy, Attila <bra@fsn.hu>
+ * Copyright (c) 2012 BalaBit IT Ltd, Budapest, Hungary
+ * Copyright (c) 2012 Gergely Nagy <algernon@balabit.hu>
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU General Public License version 2 as published
+ * by the Free Software Foundation, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * As an additional exemption you are allowed to compile & link against the
+ * OpenSSL libraries as published by the OpenSSL project. See the file
+ * COPYING for details.
+ *
+ */
+
+#ifndef AFAMQP_PARSER_H_INCLUDED
+#define AFAMQP_PARSER_H_INCLUDED
+
+#include "cfg-parser.h"
+#include "cfg-lexer.h"
+#include "afamqp.h"
+
+extern CfgParser afamqp_parser;
+
+CFG_PARSER_DECLARE_LEXER_BINDING(afamqp_, LogDriver **)
+
+#endif
Oops, something went wrong.

0 comments on commit efdee10

Please sign in to comment.