Permalink
Browse files

thread pool (#102)

* thread pool
* crofsock: use TLS_method() only for openssl versions newer than 1.1.0f, use TLSv1_2_method() for older versions
* added configure options --enable-asan and --enable-tsan for google's address and thread sanitizers
* moved global OpenSSL initialization and termination to class cthread
* removed state STATE_CLOSED from crofsock
* revised openssl deinitialization for tests with google's address sanitizer
* disabled test crofsocktest::test_tls(), when ASAN is enabled
* crofsock: fixed integration of openssl library
  • Loading branch information...
akoepsel committed Nov 13, 2017
1 parent 444b50f commit 0f67ee9e90bcf2b90c60c64979351085b7c24d1f
Showing with 3,086 additions and 1,344 deletions.
  1. +1 −2 .travis.yml
  2. +3 −0 CHANGES
  3. +1 −1 VERSION
  4. +2 −4 config/debug.m4
  5. +36 −0 config/google_sanitizers.m4
  6. +5 −0 config/openssl.m4
  7. +3 −0 configure.ac
  8. +0 −1 examples/ethswctld/cdaemon.h
  9. +16 −13 examples/ethswctld/cetherswitch.cc
  10. +1 −1 examples/ethswctld/cetherswitch.h
  11. +9 −6 examples/ethswctld/cfibentry.cc
  12. +1 −1 examples/ethswctld/cfibentry.h
  13. +9 −6 examples/ethswctld/cflowentry.cc
  14. +1 −1 examples/ethswctld/cflowentry.h
  15. +0 −1 examples/ethswctld/cunixenv.h
  16. +5 −2 src/rofl/common/Makefile.am
  17. +27 −0 src/rofl/common/cbuffer.cpp
  18. +209 −0 src/rofl/common/cbuffer.hpp
  19. +70 −11 src/rofl/common/crofbase.cc
  20. +54 −20 src/rofl/common/crofbase.h
  21. +135 −34 src/rofl/common/crofchan.h
  22. +311 −202 src/rofl/common/crofconn.cc
  23. +95 −55 src/rofl/common/crofconn.h
  24. +7 −2 src/rofl/common/crofctl.cc
  25. +33 −0 src/rofl/common/crofctl.h
  26. +16 −12 src/rofl/common/crofdpt.cc
  27. +33 −0 src/rofl/common/crofdpt.h
  28. +828 −499 src/rofl/common/crofsock.cc
  29. +73 −81 src/rofl/common/crofsock.h
  30. +329 −45 src/rofl/common/cthread.cpp
  31. +241 −23 src/rofl/common/cthread.hpp
  32. +44 −5 src/rofl/common/ctimer.hpp
  33. +18 −0 src/rofl/common/locking.hpp
  34. +0 −2 src/rofl/common/openflow/cofmatch.h
  35. +14 −9 test/rofl/common/crofbase/crofbasetest.cpp
  36. +17 −3 test/rofl/common/crofbase/crofbasetest.hpp
  37. +1 −1 test/rofl/common/crofbase/unittest.cpp
  38. +159 −159 test/rofl/common/crofchan/crofchantest.cpp
  39. +51 −40 test/rofl/common/crofchan/crofchantest.hpp
  40. +1 −1 test/rofl/common/crofchan/unittest.cpp
  41. +49 −27 test/rofl/common/crofconn/crofconntest.cpp
  42. +4 −0 test/rofl/common/crofconn/crofconntest.hpp
  43. +137 −62 test/rofl/common/crofsock/crofsocktest.cpp
  44. +25 −4 test/rofl/common/crofsock/crofsocktest.hpp
  45. +1 −1 test/rofl/common/crofsock/unittest.cpp
  46. +6 −4 test/rofl/common/cthread/cthread_test.cc
  47. +5 −3 test/rofl/common/cthread/cthread_test.h
View
@@ -26,14 +26,13 @@ env:
matrix:
include:
- compiler: clang
env: FLAGS="--disable-silent-rules" ASAN=true
env: FLAGS="--disable-silent-rules --enable-asan"
before_install:
- sudo apt-get update -qq
install:
- if [[ "$CXX" == "clang++" ]]; then export CXX="clang++-4.0" CC="clang-4.0"; fi
- if [[ "$ASAN" == "true" ]]; then export CXXFLAGS="-g -fno-omit-frame-pointer -fsanitize=address"; export CFLAGS="$CXXFLAGS"; fi
- ./autogen.sh
script:
View
@@ -9,6 +9,9 @@ Legend:
Change log
==========
v0.13.0
[+] thread pool
v0.12.1
[A] queue_type be public enum
[B] fix ctimespec operator
View
@@ -1 +1 @@
v0.12.1
v0.13.0
View
@@ -5,13 +5,11 @@ AC_ARG_ENABLE(debug,
AS_HELP_STRING([--enable-debug], [turn on debug mode [default=no]])
, , enable_debug=$debug_default)
if test "$enable_debug" = "yes"; then
CFLAGS="$CFLAGS -g -O0"
CXXFLAGS="$CXXFLAGS -g -O0 -fno-inline"
CFLAGS="$CFLAGS -g "
CXXFLAGS="$CXXFLAGS -g "
AC_DEFINE([DEBUG], [], [Description])
AC_MSG_RESULT(yes)
else
CFLAGS="$CFLAGS -O3" #--compiler-options -fno-strict-aliasing --compiler-options -fno-inline
CXXFLAGS="$CXXFLAGS -O3" #-fomit-frame-pointer"
AC_DEFINE([NDEBUG], [], [Description])
AC_MSG_RESULT(no)
fi
@@ -0,0 +1,36 @@
# Check for google's address sanitizer - MUST BE THE FIRST CHECK
AC_MSG_CHECKING(whether to enable google's address sanitizer)
asan_default="no"
AC_ARG_ENABLE(asan,
AS_HELP_STRING([--enable-asan], [compile with google's address sanitizer [default=no]])
, , enable_asan=$asan_default)
if test "$enable_asan" = "yes"; then
CFLAGS="$CFLAGS -g -fsanitize=address -fno-omit-frame-pointer"
CXXFLAGS="$CXXFLAGS -g -fno-inline -fsanitize=address -fno-omit-frame-pointer"
LDFLAGS="$LDFLAGS -g -fsanitize=address -fno-omit-frame-pointer"
AC_DEFINE([ASAN], [], [Description])
AC_MSG_RESULT(yes)
else
AC_MSG_RESULT(no)
fi
AM_CONDITIONAL(ASAN, test "$enable_asan" = yes)
# Check for google's thread sanitizer - MUST BE THE FIRST CHECK
AC_MSG_CHECKING(whether to enable google's thread sanitizer)
tsan_default="no"
AC_ARG_ENABLE(tsan,
AS_HELP_STRING([--enable-tsan], [compile with google's thread sanitizer [default=no]])
, , enable_tsan=$tsan_default)
if test "$enable_tsan" = "yes"; then
CFLAGS="$CFLAGS -g -fsanitize=thread"
CXXFLAGS="$CXXFLAGS -g -fno-inline -fsanitize=thread"
LDFLAGS="$LDFLAGS -g -fsanitize=thread"
AC_DEFINE([TSAN], [], [Description])
AC_MSG_RESULT(yes)
else
AC_MSG_RESULT(no)
fi
AM_CONDITIONAL(TSAN, test "$enable_tsan" = yes)
View
@@ -7,6 +7,11 @@ AC_CHECK_LIB(crypto, ERR_get_error, , ssl_detected="no")
AC_MSG_CHECKING(for availability of openssl and crypto libraries(SSL/TLS))
if test "$ssl_detected" = "yes"; then
AC_MSG_RESULT(found)
AC_DEFINE_UNQUOTED([CACERT],["$(pwd)/../tools/xca/ca.rofl-core.crt.pem"], [""])
AC_DEFINE_UNQUOTED([CLICERT],["$(pwd)/../tools/xca/client.crt.pem"], [""])
AC_DEFINE_UNQUOTED([CLIKEY],["$(pwd)/../tools/xca/client.key.pem"], [""])
AC_DEFINE_UNQUOTED([SRVCERT],["$(pwd)/../tools/xca/server.crt.pem"], [""])
AC_DEFINE_UNQUOTED([SRVKEY],["$(pwd)/../tools/xca/server.key.pem"], [""])
else
AC_MSG_RESULT(not found)
fi
View
@@ -31,6 +31,9 @@ AC_DEFINE([__STDC_FORMAT_MACROS], [], [Description])
# Perform GCC checkings
m4_include([config/gcc.m4])
# Google's sanitizers
m4_include([config/google_sanitizers.m4])
# Debug
m4_include([config/debug.m4])
@@ -17,7 +17,6 @@
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
@@ -73,11 +73,10 @@ int cetherswitch::run(int argc, char **argv) {
cetherswitch::cetherswitch(
const rofl::openflow::cofhello_elem_versionbitmap &versionbitmap)
: thread(this), dump_fib_interval(DUMP_FIB_DEFAULT_INTERVAL),
: thread_num(rofl::cthread::get_mgt_thread_num_from_pool()),
dump_fib_interval(DUMP_FIB_DEFAULT_INTERVAL),
get_flow_stats_interval(GET_FLOW_STATS_DEFAULT_INTERVAL) {
rofl::crofbase::set_versionbitmap(versionbitmap);
thread.start("cetherswitch");
}
cetherswitch::~cetherswitch() {}
@@ -88,8 +87,9 @@ void cetherswitch::handle_timeout(cthread &thread, uint32_t timer_id) {
case TIMER_ID_DUMP_FIB: {
// re-register timer for next round
thread.add_timer(TIMER_ID_DUMP_FIB,
rofl::ctimespec().expire_in(dump_fib_interval));
rofl::cthread::thread(thread_num)
.add_timer(this, TIMER_ID_DUMP_FIB,
rofl::ctimespec().expire_in(dump_fib_interval));
std::cerr << "****************************************" << std::endl;
std::cerr << *this;
@@ -99,8 +99,9 @@ void cetherswitch::handle_timeout(cthread &thread, uint32_t timer_id) {
case TIMER_ID_GET_FLOW_STATS: {
// re-register timer for next round
thread.add_timer(TIMER_ID_GET_FLOW_STATS,
rofl::ctimespec().expire_in(get_flow_stats_interval));
rofl::cthread::thread(thread_num)
.add_timer(this, TIMER_ID_GET_FLOW_STATS,
rofl::ctimespec().expire_in(get_flow_stats_interval));
rofl::crofdpt &dpt = rofl::crofbase::set_dpt(dptid);
@@ -132,12 +133,14 @@ void cetherswitch::handle_timeout(cthread &thread, uint32_t timer_id) {
*/
void cetherswitch::handle_dpt_open(rofl::crofdpt &dpt) {
// register timer for dumping ethswitch's internal state
thread.add_timer(TIMER_ID_DUMP_FIB,
rofl::ctimespec().expire_in(dump_fib_interval));
rofl::cthread::thread(thread_num)
.add_timer(this, TIMER_ID_DUMP_FIB,
rofl::ctimespec().expire_in(dump_fib_interval));
// start periodic timer for querying datapath for all flow table entries
thread.add_timer(TIMER_ID_GET_FLOW_STATS,
rofl::ctimespec().expire_in(get_flow_stats_interval));
rofl::cthread::thread(thread_num)
.add_timer(this, TIMER_ID_GET_FLOW_STATS,
rofl::ctimespec().expire_in(get_flow_stats_interval));
dptid = dpt.get_dptid();
@@ -183,9 +186,9 @@ void cetherswitch::handle_dpt_open(rofl::crofdpt &dpt) {
}
void cetherswitch::handle_dpt_close(const rofl::cdptid &dptid) {
thread.drop_timer(TIMER_ID_DUMP_FIB);
rofl::cthread::thread(thread_num).drop_timer(this, TIMER_ID_DUMP_FIB);
thread.drop_timer(TIMER_ID_GET_FLOW_STATS);
rofl::cthread::thread(thread_num).drop_timer(this, TIMER_ID_GET_FLOW_STATS);
std::cerr << "[cetherswitch] datapath detached, dptid: " << dptid << std::endl
<< cfibtable::get_fib(dptid);
@@ -170,7 +170,7 @@ class cetherswitch : public cflowtable_env,
static bool keep_on_running;
rofl::cthread thread;
uint32_t thread_num;
rofl::cdptid dptid;
@@ -13,11 +13,13 @@ using namespace rofl::examples::ethswctld;
cfibentry::cfibentry(cfibentry_env *fibenv, const rofl::caddress_ll &hwaddr,
uint32_t port_no)
: env(fibenv), port_no(port_no), hwaddr(hwaddr),
entry_timeout(CFIBENTRY_DEFAULT_TIMEOUT), thread(this) {
thread.add_timer(TIMER_ID_ENTRY_EXPIRED,
rofl::ctimespec().expire_in(entry_timeout));
entry_timeout(CFIBENTRY_DEFAULT_TIMEOUT),
thread_num(rofl::cthread::get_mgt_thread_num_from_pool()) {
rofl::cthread::thread(thread_num)
.add_timer(this, TIMER_ID_ENTRY_EXPIRED,
rofl::ctimespec().expire_in(entry_timeout));
std::cerr << "[cfibentry] created" << std::endl << *this;
thread.start("cfibentry");
rofl::cthread::thread(thread_num).start("cfibentry");
}
cfibentry::~cfibentry() {
@@ -39,6 +41,7 @@ void cfibentry::set_port_no(uint32_t port_no) {
env->fib_port_update(*this);
}
thread.add_timer(TIMER_ID_ENTRY_EXPIRED,
rofl::ctimespec().expire_in(entry_timeout));
rofl::cthread::thread(thread_num)
.add_timer(this, TIMER_ID_ENTRY_EXPIRED,
rofl::ctimespec().expire_in(entry_timeout));
}
@@ -191,7 +191,7 @@ class cfibentry : public rofl::cthread_env {
time_t entry_timeout;
static const time_t CFIBENTRY_DEFAULT_TIMEOUT;
rofl::cthread thread;
uint32_t thread_num;
};
}; // namespace ethswctld
@@ -13,12 +13,14 @@ cflowentry::cflowentry(cflowentry_env *flowenv, const rofl::cdptid &dptid,
const rofl::caddress_ll &src,
const rofl::caddress_ll &dst, uint32_t port_no)
: env(flowenv), dptid(dptid), port_no(port_no), src(src), dst(dst),
entry_timeout(CFLOWENTRY_DEFAULT_TIMEOUT), thread(this) {
entry_timeout(CFLOWENTRY_DEFAULT_TIMEOUT),
thread_num(rofl::cthread::get_mgt_thread_num_from_pool()) {
flow_mod_add();
thread.add_timer(CFLOWENTRY_ENTRY_EXPIRED,
rofl::ctimespec().expire_in(entry_timeout));
rofl::cthread::thread(thread_num)
.add_timer(this, CFLOWENTRY_ENTRY_EXPIRED,
rofl::ctimespec().expire_in(entry_timeout));
std::cerr << "[cflowentry] created" << std::endl << *this;
thread.start("cflowentry");
rofl::cthread::thread(thread_num).start("cflowentry");
}
cflowentry::~cflowentry() {
@@ -40,8 +42,9 @@ void cflowentry::set_out_port_no(uint32_t port_no) {
flow_mod_modify();
}
thread.add_timer(CFLOWENTRY_ENTRY_EXPIRED,
rofl::ctimespec().expire_in(entry_timeout));
rofl::cthread::thread(thread_num)
.add_timer(this, CFLOWENTRY_ENTRY_EXPIRED,
rofl::ctimespec().expire_in(entry_timeout));
}
void cflowentry::flow_mod_add() {
@@ -201,7 +201,7 @@ class cflowentry : public rofl::cthread_env {
rofl::caddress_ll src;
rofl::caddress_ll dst;
int entry_timeout;
rofl::cthread thread;
uint32_t thread_num;
};
}; // namespace ethswctld
@@ -16,7 +16,6 @@
#include <string>
#include <sys/resource.h>
#include <sys/stat.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
@@ -48,7 +48,9 @@ librofl_common_base_la_SOURCES = \
cindex.h \
cdpid.h \
csegment.hpp \
csegment.cpp
csegment.cpp \
cbuffer.hpp \
cbuffer.cpp
@@ -82,7 +84,8 @@ library_include_HEADERS= \
caddrinfos.h \
cindex.h \
cdpid.h \
csegment.hpp
csegment.hpp \
cbuffer.hpp
@@ -0,0 +1,27 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
#include "cbuffer.hpp"
using namespace rofl;
cbuffer::cbuffer(size_t len) : cmemory(len), wbytes(0), rbytes(0) {}
cbuffer::cbuffer(uint8_t *data, size_t datalen)
: cmemory(data, datalen), wbytes(datalen), rbytes(0) {}
cbuffer::cbuffer(const cbuffer &b) { *this = b; }
cbuffer::~cbuffer() {}
cbuffer &cbuffer::operator=(const cbuffer &b) {
if (this == &b)
return *this;
cmemory::operator=(b);
this->wbytes = b.wbytes;
this->rbytes = b.rbytes;
return *this;
}
Oops, something went wrong.

0 comments on commit 0f67ee9

Please sign in to comment.