Skip to content
Browse files

THRIFT-1500: d programming language support

Client: D
Patch: David Nadlinger

D program language library and additions



git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1304085 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent f485e2b commit b95b0ffa720ffdebd829861ed420d10deab6c852 @jfarrell jfarrell committed Mar 22, 2012
Sorry, we could not display the entire diff because it was too big.
View
124 aclocal/ax_check_openssl.m4
@@ -0,0 +1,124 @@
+# ===========================================================================
+# http://www.gnu.org/software/autoconf-archive/ax_check_openssl.html
+# ===========================================================================
+#
+# SYNOPSIS
+#
+# AX_CHECK_OPENSSL([action-if-found[, action-if-not-found]])
+#
+# DESCRIPTION
+#
+# Look for OpenSSL in a number of default spots, or in a user-selected
+# spot (via --with-openssl). Sets
+#
+# OPENSSL_INCLUDES to the include directives required
+# OPENSSL_LIBS to the -l directives required
+# OPENSSL_LDFLAGS to the -L or -R flags required
+#
+# and calls ACTION-IF-FOUND or ACTION-IF-NOT-FOUND appropriately
+#
+# This macro sets OPENSSL_INCLUDES such that source files should use the
+# openssl/ directory in include directives:
+#
+# #include <openssl/hmac.h>
+#
+# LICENSE
+#
+# Copyright (c) 2009,2010 Zmanda Inc. <http://www.zmanda.com/>
+# Copyright (c) 2009,2010 Dustin J. Mitchell <dustin@zmanda.com>
+#
+# Copying and distribution of this file, with or without modification, are
+# permitted in any medium without royalty provided the copyright notice
+# and this notice are preserved. This file is offered as-is, without any
+# warranty.
+
+#serial 8
+
+AU_ALIAS([CHECK_SSL], [AX_CHECK_OPENSSL])
+AC_DEFUN([AX_CHECK_OPENSSL], [
+ found=false
+ AC_ARG_WITH([openssl],
+ [AS_HELP_STRING([--with-openssl=DIR],
+ [root of the OpenSSL directory])],
+ [
+ case "$withval" in
+ "" | y | ye | yes | n | no)
+ AC_MSG_ERROR([Invalid --with-openssl value])
+ ;;
+ *) ssldirs="$withval"
+ ;;
+ esac
+ ], [
+ # if pkg-config is installed and openssl has installed a .pc file,
+ # then use that information and don't search ssldirs
+ AC_PATH_PROG([PKG_CONFIG], [pkg-config])
+ if test x"$PKG_CONFIG" != x""; then
+ OPENSSL_LDFLAGS=`$PKG_CONFIG openssl --libs-only-L 2>/dev/null`
+ if test $? = 0; then
+ OPENSSL_LIBS=`$PKG_CONFIG openssl --libs-only-l 2>/dev/null`
+ OPENSSL_INCLUDES=`$PKG_CONFIG openssl --cflags-only-I 2>/dev/null`
+ found=true
+ fi
+ fi
+
+ # no such luck; use some default ssldirs
+ if ! $found; then
+ ssldirs="/usr/local/ssl /usr/lib/ssl /usr/ssl /usr/pkg /usr/local /usr"
+ fi
+ ]
+ )
+
+
+ # note that we #include <openssl/foo.h>, so the OpenSSL headers have to be in
+ # an 'openssl' subdirectory
+
+ if ! $found; then
+ OPENSSL_INCLUDES=
+ for ssldir in $ssldirs; do
+ AC_MSG_CHECKING([for openssl/ssl.h in $ssldir])
+ if test -f "$ssldir/include/openssl/ssl.h"; then
+ OPENSSL_INCLUDES="-I$ssldir/include"
+ OPENSSL_LDFLAGS="-L$ssldir/lib"
+ OPENSSL_LIBS="-lssl -lcrypto"
+ found=true
+ AC_MSG_RESULT([yes])
+ break
+ else
+ AC_MSG_RESULT([no])
+ fi
+ done
+
+ # if the file wasn't found, well, go ahead and try the link anyway -- maybe
+ # it will just work!
+ fi
+
+ # try the preprocessor and linker with our new flags,
+ # being careful not to pollute the global LIBS, LDFLAGS, and CPPFLAGS
+
+ AC_MSG_CHECKING([whether compiling and linking against OpenSSL works])
+ echo "Trying link with OPENSSL_LDFLAGS=$OPENSSL_LDFLAGS;" \
+ "OPENSSL_LIBS=$OPENSSL_LIBS; OPENSSL_INCLUDES=$OPENSSL_INCLUDES" >&AS_MESSAGE_LOG_FD
+
+ save_LIBS="$LIBS"
+ save_LDFLAGS="$LDFLAGS"
+ save_CPPFLAGS="$CPPFLAGS"
+ LDFLAGS="$LDFLAGS $OPENSSL_LDFLAGS"
+ LIBS="$OPENSSL_LIBS $LIBS"
+ CPPFLAGS="$OPENSSL_INCLUDES $CPPFLAGS"
+ AC_LINK_IFELSE(
+ [AC_LANG_PROGRAM([#include <openssl/ssl.h>], [SSL_new(NULL)])],
+ [
+ AC_MSG_RESULT([yes])
+ $1
+ ], [
+ AC_MSG_RESULT([no])
+ $2
+ ])
+ CPPFLAGS="$save_CPPFLAGS"
+ LDFLAGS="$save_LDFLAGS"
+ LIBS="$save_LIBS"
+
+ AC_SUBST([OPENSSL_INCLUDES])
+ AC_SUBST([OPENSSL_LIBS])
+ AC_SUBST([OPENSSL_LDFLAGS])
+])
View
107 aclocal/ax_dmd.m4
@@ -0,0 +1,107 @@
+dnl @synopsis AX_DMD
+dnl
+dnl Test for the presence of a DMD-compatible D2 compiler, and (optionally)
+dnl specified modules on the import path.
+dnl
+dnl If "DMD" is defined in the environment, that will be the only
+dnl dmd command tested. Otherwise, a hard-coded list will be used.
+dnl
+dnl After AX_DMD runs, the shell variables "success" and "ax_dmd" are set to
+dnl "yes" or "no", and "DMD" is set to the appropriate command. Furthermore,
+dnl "dmd_optlink" will be set to "yes" or "no" depending on whether OPTLINK is
+dnl used as the linker (DMD/Windows), and "dmd_of_dirsep" will be set to the
+dnl directory separator to use when passing -of to DMD (OPTLINK requires a
+dnl backslash).
+dnl
+dnl AX_CHECK_D_MODULE must be run after AX_DMD. It tests for the presence of a
+dnl module in the import path of the chosen compiler, and sets the shell
+dnl variable "success" to "yes" or "no".
+dnl
+dnl @category D
+dnl @version 2011-05-31
+dnl @license AllPermissive
+dnl
+dnl Copyright (C) 2009 David Reiss
+dnl Copyright (C) 2011 David Nadlinger
+dnl Copying and distribution of this file, with or without modification,
+dnl are permitted in any medium without royalty provided the copyright
+dnl notice and this notice are preserved.
+
+
+AC_DEFUN([AX_DMD],
+ [
+ dnl Hard-coded default commands to test.
+ DMD_PROGS="dmd,gdmd,ldmd"
+
+ dnl Allow the user to specify an alternative.
+ if test -n "$DMD" ; then
+ DMD_PROGS="$DMD"
+ fi
+
+ AC_MSG_CHECKING(for DMD)
+
+ # std.algorithm as a quick way to check for D2/Phobos.
+ echo "import std.algorithm; void main() {}" > configtest_ax_dmd.d
+ success=no
+ oIFS="$IFS"
+
+ IFS=","
+ for DMD in $DMD_PROGS ; do
+ IFS="$oIFS"
+
+ echo "Running \"$DMD configtest_ax_dmd.d\"" >&AS_MESSAGE_LOG_FD
+ if $DMD configtest_ax_dmd.d >&AS_MESSAGE_LOG_FD 2>&1 ; then
+ success=yes
+ break
+ fi
+ done
+
+ if test "$success" != "yes" ; then
+ AC_MSG_RESULT(no)
+ DMD=""
+ else
+ AC_MSG_RESULT(yes)
+ fi
+
+ ax_dmd="$success"
+
+ # Test whether OPTLINK is used by trying if DMD accepts -L/? without
+ # erroring out.
+ if test "$success" == "yes" ; then
+ AC_MSG_CHECKING(whether DMD uses OPTLINK)
+ echo "Running \”$DMD -L/? configtest_ax_dmd.d\"" >&AS_MESSAGE_LOG_FD
+ if $DMD -L/? configtest_ax_dmd.d >&AS_MESSAGE_LOG_FD 2>&1 ; then
+ AC_MSG_RESULT(yes)
+ dmd_optlink="yes"
+
+ # This actually produces double slashes in the final configure
+ # output, but at least it works.
+ dmd_of_dirsep="\\\\"
+ else
+ AC_MSG_RESULT(no)
+ dmd_optlink="no"
+ dmd_of_dirsep="/"
+ fi
+ fi
+
+ rm -f configtest_ax_dmd*
+ ])
+
+
+AC_DEFUN([AX_CHECK_D_MODULE],
+ [
+ AC_MSG_CHECKING(for D module [$1])
+
+ echo "import $1; void main() {}" > configtest_ax_dmd.d
+
+ echo "Running \"$DMD configtest_ax_dmd.d\"" >&AS_MESSAGE_LOG_FD
+ if $DMD -c configtest_ax_dmd.d >&AS_MESSAGE_LOG_FD 2>&1 ; then
+ AC_MSG_RESULT(yes)
+ success=yes
+ else
+ AC_MSG_RESULT(no)
+ success=no
+ fi
+
+ rm -f configtest_ax_dmd*
+ ])
View
3 compiler/cpp/Makefile.am
@@ -83,7 +83,8 @@ thrift_SOURCES += src/generate/t_c_glib_generator.cc \
src/generate/t_js_generator.cc \
src/generate/t_javame_generator.cc \
src/generate/t_delphi_generator.cc \
- src/generate/t_go_generator.cc
+ src/generate/t_go_generator.cc \
+ src/generate/t_d_generator.cc
thrift_CPPFLAGS = -I$(srcdir)/src
thrift_CXXFLAGS = -Wall
View
772 compiler/cpp/src/generate/t_d_generator.cc
@@ -0,0 +1,772 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * Contains some contributions under the Thrift Software License.
+ * Please see doc/old-thrift-license.txt in the Thrift distribution for
+ * details.
+ */
+
+#include <cassert>
+
+#include <fstream>
+#include <iostream>
+#include <set>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <sys/stat.h>
+
+#include "platform.h"
+#include "t_oop_generator.h"
+using namespace std;
+
+
+/**
+ * D code generator.
+ *
+ * generate_*() functions are called by the base class to emit code for the
+ * given entity, print_*() functions write a piece of code to the passed
+ * stream, and render_*() return a string containing the D representation of
+ * the passed entity.
+ */
+class t_d_generator : public t_oop_generator {
+ public:
+ t_d_generator(
+ t_program* program,
+ const std::map<string, string>& parsed_options,
+ const string& option_string)
+ : t_oop_generator(program)
+ {
+ (void) parsed_options;
+ (void) option_string;
+ out_dir_base_ = "gen-d";
+ }
+
+ protected:
+ virtual void init_generator() {
+ // Make output directory
+ MKDIR(get_out_dir().c_str());
+
+ string dir = program_->get_namespace("d");
+ string subdir = get_out_dir();
+ string::size_type loc;
+ while ((loc = dir.find(".")) != string::npos) {
+ subdir = subdir + "/" + dir.substr(0, loc);
+ MKDIR(subdir.c_str());
+ dir = dir.substr(loc+1);
+ }
+ if (!dir.empty()) {
+ subdir = subdir + "/" + dir;
+ MKDIR(subdir.c_str());
+ }
+
+ package_dir_ = subdir + "/";
+
+ // Make output file
+ string f_types_name = package_dir_ + program_name_ + "_types.d";
+ f_types_.open(f_types_name.c_str());
+
+ // Print header
+ f_types_ <<
+ autogen_comment() <<
+ "module " << render_package(*program_) << program_name_ << "_types;" << endl <<
+ endl;
+
+ print_default_imports(f_types_);
+
+ // Include type modules from other imported programs.
+ const vector<t_program*>& includes = program_->get_includes();
+ for (size_t i = 0; i < includes.size(); ++i) {
+ f_types_ <<
+ "import " << render_package(*(includes[i])) <<
+ includes[i]->get_name() << "_types;" << endl;
+ }
+ if (!includes.empty()) f_types_ << endl;
+ }
+
+ virtual void close_generator() {
+ // Close output file
+ f_types_.close();
+ }
+
+ virtual void generate_consts(std::vector<t_const*> consts) {
+ if (!consts.empty()) {
+ string f_consts_name = package_dir_+program_name_+"_constants.d";
+ ofstream f_consts;
+ f_consts.open(f_consts_name.c_str());
+
+ f_consts <<
+ autogen_comment() <<
+ "module " << render_package(*program_) << program_name_ << "_constants;" << endl
+ << endl;
+
+ print_default_imports(f_consts);
+
+ f_consts <<
+ "import " << render_package(*get_program()) << program_name_ << "_types;" << endl <<
+ endl;
+
+ vector<t_const*>::iterator c_iter;
+ for (c_iter = consts.begin(); c_iter != consts.end(); ++c_iter) {
+ string name = (*c_iter)->get_name();
+ t_type* type = (*c_iter)->get_type();
+ indent(f_consts) << "immutable(" << render_type_name(type) << ") " <<
+ name << ";" << endl;
+ }
+
+ f_consts <<
+ endl <<
+ "static this() {" << endl;
+ indent_up();
+
+ bool first = true;
+ for (c_iter = consts.begin(); c_iter != consts.end(); ++c_iter) {
+ if (first) {
+ first = false;
+ } else {
+ f_consts << endl;
+ }
+ t_type* type = (*c_iter)->get_type();
+ indent(f_consts) << (*c_iter)->get_name() << " = ";
+ if (!is_immutable_type(type)) {
+ f_consts << "cast(immutable(" << render_type_name(type) << ")) ";
+ }
+ f_consts <<
+ render_const_value(type, (*c_iter)->get_value()) << ";" << endl;
+ }
+ indent_down();
+ indent(f_consts) <<
+ "}" << endl;
+ }
+ }
+
+ virtual void generate_typedef(t_typedef* ttypedef) {
+ f_types_ <<
+ indent() << "alias " << render_type_name(ttypedef->get_type()) << " " <<
+ ttypedef->get_symbolic() << ";" << endl << endl;
+ }
+
+ virtual void generate_enum(t_enum* tenum) {
+ vector<t_enum_value*> constants = tenum->get_constants();
+
+ string enum_name = tenum->get_name();
+ f_types_ <<
+ indent() << "enum " << enum_name << " {" << endl;
+
+ indent_up();
+
+ vector<t_enum_value*>::const_iterator c_iter;
+ bool first = true;
+ for (c_iter = constants.begin(); c_iter != constants.end(); ++c_iter) {
+ if (first) {
+ first = false;
+ } else {
+ f_types_ << "," << endl;
+ }
+ indent(f_types_) << (*c_iter)->get_name();
+ if ((*c_iter)->has_value()) {
+ f_types_ << " = " << (*c_iter)->get_value();
+ }
+ }
+
+ f_types_ << endl;
+ indent_down();
+ indent(f_types_) << "}" << endl;
+
+ f_types_ << endl;
+ }
+
+ virtual void generate_struct(t_struct* tstruct) {
+ print_struct_definition(f_types_, tstruct, false);
+ }
+
+ virtual void generate_xception(t_struct* txception) {
+ print_struct_definition(f_types_, txception, true);
+ }
+
+ virtual void generate_service(t_service* tservice) {
+ string svc_name = tservice->get_name();
+
+ // Service implementation file includes
+ string f_servicename = package_dir_ + svc_name + ".d";
+ std::ofstream f_service;
+ f_service.open(f_servicename.c_str());
+ f_service <<
+ autogen_comment() <<
+ "module " << render_package(*program_) << svc_name << ";" << endl <<
+ endl;
+
+ print_default_imports(f_service);
+
+ f_service << "import " << render_package(*get_program()) << program_name_ <<
+ "_types;" << endl;
+
+ t_service* extends_service = tservice->get_extends();
+ if (extends_service != NULL) {
+ f_service <<
+ "import " << render_package(*(extends_service->get_program())) <<
+ extends_service->get_name() << ";" << endl;
+ }
+
+ f_service << endl;
+
+ string extends = "";
+ if (tservice->get_extends() != NULL) {
+ extends = " : " + render_type_name(tservice->get_extends());
+ }
+
+ f_service <<
+ indent() << "interface " << svc_name << extends << " {" << endl;
+ indent_up();
+
+ // Collect all the exception types service methods can throw so we can
+ // emit the necessary aliases later.
+ set<t_type*> exception_types;
+
+ // Print the method signatures.
+ vector<t_function*> functions = tservice->get_functions();
+ vector<t_function*>::iterator fn_iter;
+ for (fn_iter = functions.begin(); fn_iter != functions.end(); ++fn_iter) {
+ f_service << indent();
+ print_function_signature(f_service, *fn_iter);
+ f_service << ";" << endl;
+
+ const vector<t_field*>& exceptions = (*fn_iter)->get_xceptions()->get_members();
+ vector<t_field*>::const_iterator ex_iter;
+ for (ex_iter = exceptions.begin(); ex_iter != exceptions.end(); ++ex_iter) {
+ exception_types.insert((*ex_iter)->get_type());
+ }
+ }
+
+ // Alias the exception types into the current scope.
+ if (!exception_types.empty()) f_service << endl;
+ set<t_type*>::const_iterator et_iter;
+ for (et_iter = exception_types.begin(); et_iter != exception_types.end(); ++et_iter) {
+ indent(f_service) << "alias " << render_package(*(*et_iter)->get_program()) <<
+ (*et_iter)->get_program()->get_name() << "_types" << "." <<
+ (*et_iter)->get_name() << " " << (*et_iter)->get_name() << ";" << endl;
+ }
+
+ // Write the method metadata.
+ ostringstream meta;
+ indent_up();
+ bool first = true;
+ for (fn_iter = functions.begin(); fn_iter != functions.end(); ++fn_iter) {
+ if ((*fn_iter)->get_arglist()->get_members().empty() &&
+ (*fn_iter)->get_xceptions()->get_members().empty() &&
+ !(*fn_iter)->is_oneway()) {
+ continue;
+ }
+
+ if (first) {
+ first = false;
+ } else {
+ meta << ",";
+ }
+
+ meta << endl <<
+ indent() << "TMethodMeta(`" << (*fn_iter)->get_name() << "`, " << endl;
+ indent_up();
+ indent(meta) << "[";
+
+ bool first = true;
+ const vector<t_field*> &params = (*fn_iter)->get_arglist()->get_members();
+ vector<t_field*>::const_iterator p_iter;
+ for (p_iter = params.begin(); p_iter != params.end(); ++p_iter) {
+ if (first) {
+ first = false;
+ } else {
+ meta << ", ";
+ }
+
+ meta << "TParamMeta(`" << (*p_iter)->get_name() << "`, " << (*p_iter)->get_key();
+
+ t_const_value* cv = (*p_iter)->get_value();
+ if (cv != NULL) {
+ meta << ", q{" << render_const_value((*p_iter)->get_type(), cv) << "}";
+ }
+ meta << ")";
+ }
+
+ meta << "]";
+
+ if (!(*fn_iter)->get_xceptions()->get_members().empty() ||
+ (*fn_iter)->is_oneway()) {
+ meta << "," << endl <<
+ indent() << "[";
+
+ bool first = true;
+ const vector<t_field*>& exceptions =
+ (*fn_iter)->get_xceptions()->get_members();
+ vector<t_field*>::const_iterator ex_iter;
+ for (ex_iter = exceptions.begin(); ex_iter != exceptions.end(); ++ex_iter) {
+ if (first) {
+ first = false;
+ } else {
+ meta << ", ";
+ }
+
+ meta << "TExceptionMeta(`" << (*ex_iter)->get_name() <<
+ "`, " << (*ex_iter)->get_key() << ", `" <<
+ (*ex_iter)->get_type()->get_name() << "`)";
+ }
+
+ meta << "]";
+ }
+
+ if ((*fn_iter)->is_oneway()) {
+ meta << "," << endl <<
+ indent() << "TMethodType.ONEWAY";
+ }
+
+ indent_down();
+ meta << endl <<
+ indent() << ")";
+ }
+ indent_down();
+
+ string meta_str(meta.str());
+ if (!meta_str.empty()) {
+ f_service << endl <<
+ indent() << "enum methodMeta = [" << meta_str << endl <<
+ indent() << "];" << endl;
+ }
+
+ indent_down();
+ indent(f_service) << "}" << endl;
+
+
+ // Server skeleton generation.
+ string f_skeletonname = package_dir_ + svc_name + "_server.skeleton.d";
+ std::ofstream f_skeleton;
+ f_skeleton.open(f_skeletonname.c_str());
+ print_server_skeleton(f_skeleton, tservice);
+ f_skeleton.close();
+ }
+
+ private:
+ /**
+ * Writes a server skeleton for the passed service to out.
+ */
+ void print_server_skeleton(ostream &out, t_service* tservice) {
+ string svc_name = tservice->get_name();
+
+ out <<
+ "/*" << endl <<
+ " * This auto-generated skeleton file illustrates how to build a server. If you" << endl <<
+ " * intend to customize it, you should edit a copy with another file name to " << endl <<
+ " * avoid overwriting it when running the generator again." << endl <<
+ " */" << endl <<
+ "module " << render_package(*tservice->get_program()) << svc_name << "_server;" << endl <<
+ endl <<
+ "import std.stdio;" << endl <<
+ "import thrift.codegen.processor;" << endl <<
+ "import thrift.protocol.binary;" << endl <<
+ "import thrift.server.simple;" << endl <<
+ "import thrift.server.transport.socket;" << endl <<
+ "import thrift.transport.buffered;" << endl <<
+ "import thrift.util.hashset;" << endl <<
+ endl <<
+ "import " << render_package(*tservice->get_program()) << svc_name << ";" << endl <<
+ "import " << render_package(*get_program()) << program_name_ << "_types;" << endl <<
+ endl <<
+ endl <<
+ "class " << svc_name << "Handler : " << svc_name << " {" << endl;
+
+ indent_up();
+ out <<
+ indent() << "this() {" << endl <<
+ indent() << " // Your initialization goes here." << endl <<
+ indent() << "}" << endl <<
+ endl;
+
+ vector<t_function*> functions = tservice->get_functions();
+ vector<t_function*>::iterator f_iter;
+ for (f_iter = functions.begin(); f_iter != functions.end(); ++f_iter) {
+ out << indent();
+ print_function_signature(out, *f_iter);
+ out << " {" << endl;
+
+ indent_up();
+
+ out <<
+ indent() << "// Your implementation goes here." << endl <<
+ indent() << "writeln(\"" << (*f_iter)->get_name() << " called\");" << endl;
+
+ t_base_type* rt = (t_base_type*)(*f_iter)->get_returntype();
+ if (rt->get_base() != t_base_type::TYPE_VOID) {
+ indent(out) << "return typeof(return).init;" << endl;
+ }
+
+ indent_down();
+
+ out <<
+ indent() << "}" << endl <<
+ endl;
+ }
+
+ indent_down();
+ out <<
+ "}" << endl <<
+ endl;
+
+ out <<
+ indent() << "void main() {" << endl;
+ indent_up();
+ out <<
+ indent() << "auto protocolFactory = new TBinaryProtocolFactory!();" << endl <<
+ indent() << "auto processor = new TServiceProcessor!" << svc_name << "(new " << svc_name << "Handler);" << endl <<
+ indent() << "auto serverTransport = new TServerSocket(9090);" << endl <<
+ indent() << "auto transportFactory = new TBufferedTransportFactory;" << endl <<
+
+ indent() << "auto server = new TSimpleServer(" << endl <<
+ indent() << " processor, serverTransport, transportFactory, protocolFactory);" << endl <<
+ indent() << "server.serve();" << endl;
+ indent_down();
+ out <<
+ "}" << endl;
+ }
+
+ /**
+ * Writes the definition of a struct or an exception type to out.
+ */
+ void print_struct_definition(ostream& out, t_struct* tstruct, bool is_exception) {
+ const vector<t_field*>& members = tstruct->get_members();
+
+ if (is_exception) {
+ indent(out) << "class " << tstruct->get_name() << " : TException {" << endl;
+ } else {
+ indent(out) << "struct " << tstruct->get_name() << " {" << endl;
+ }
+ indent_up();
+
+ // Declare all fields.
+ vector<t_field*>::const_iterator m_iter;
+ for (m_iter = members.begin(); m_iter != members.end(); ++m_iter) {
+ indent(out) << render_type_name((*m_iter)->get_type()) << " " <<
+ (*m_iter)->get_name() << ";" << endl;
+ }
+
+ if (!members.empty()) indent(out) << endl;
+ indent(out) << "mixin TStructHelpers!(";
+
+ if (!members.empty()) {
+ // If there are any fields, construct the TFieldMeta array to pass to
+ // TStructHelpers. We can't just pass an empty array if not because []
+ // doesn't pass the TFieldMeta[] constraint.
+ out << "[";
+ indent_up();
+
+ bool first = true;
+ vector<t_field*>::const_iterator m_iter;
+ for (m_iter = members.begin(); m_iter != members.end(); ++m_iter) {
+ if (first) {
+ first = false;
+ } else {
+ out << ",";
+ }
+ out << endl;
+
+ indent(out) << "TFieldMeta(`" << (*m_iter)->get_name() << "`, " <<
+ (*m_iter)->get_key();
+
+ t_const_value* cv = (*m_iter)->get_value();
+ t_field::e_req req = (*m_iter)->get_req();
+ out << ", " << render_req(req);
+ if (cv != NULL) {
+ out << ", q{" << render_const_value((*m_iter)->get_type(), cv) << "}";
+ }
+ out << ")";
+ }
+
+ indent_down();
+ out << endl << indent() << "]";
+ }
+
+ out << ");" << endl;
+
+ indent_down();
+ indent(out) <<
+ "}" << endl <<
+ endl;
+ }
+
+ /**
+ * Prints the D function signature (including return type) for the given
+ * method.
+ */
+ void print_function_signature(ostream& out, t_function* fn) {
+ out << render_type_name(fn->get_returntype()) <<
+ " " << fn->get_name() << "(";
+
+ const vector<t_field*>& fields = fn->get_arglist()->get_members();
+ vector<t_field*>::const_iterator f_iter;
+ bool first = true;
+ for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+ if (first) {
+ first = false;
+ } else {
+ out << ", ";
+ }
+ out << render_type_name((*f_iter)->get_type(), true) << " " <<
+ (*f_iter)->get_name();
+ }
+
+ out << ")";
+ }
+
+ /**
+ * Returns the D representation of value. The result is guaranteed to be a
+ * single expression; for complex types, immediately called delegate
+ * literals are used to achieve this.
+ */
+ string render_const_value(t_type* type, t_const_value* value) {
+ // Resolve any typedefs.
+ type = get_true_type(type);
+
+ ostringstream out;
+ if (type->is_base_type()) {
+ t_base_type::t_base tbase = ((t_base_type*)type)->get_base();
+ switch (tbase) {
+ case t_base_type::TYPE_STRING:
+ out << '"' << get_escaped_string(value) << '"';
+ break;
+ case t_base_type::TYPE_BOOL:
+ out << ((value->get_integer() > 0) ? "true" : "false");
+ break;
+ case t_base_type::TYPE_BYTE:
+ case t_base_type::TYPE_I16:
+ out << "cast(" << render_type_name(type) << ")" << value->get_integer();
+ break;
+ case t_base_type::TYPE_I32:
+ out << value->get_integer();
+ break;
+ case t_base_type::TYPE_I64:
+ out << value->get_integer() << "L";
+ break;
+ case t_base_type::TYPE_DOUBLE:
+ if (value->get_type() == t_const_value::CV_INTEGER) {
+ out << value->get_integer();
+ } else {
+ out << value->get_double();
+ }
+ break;
+ default:
+ throw "Compiler error: No const of base type " +
+ t_base_type::t_base_name(tbase);
+ }
+ } else if (type->is_enum()) {
+ out << "cast(" << render_type_name(type) << ")" << value->get_integer();
+ } else {
+ out << "{" << endl;
+ indent_up();
+
+ indent(out) << render_type_name(type) << " v;" << endl;
+ if (type->is_struct() || type->is_xception()) {
+ indent(out) << "v = " << (type->is_xception() ? "new " : "") <<
+ render_type_name(type) << "();" << endl;
+
+ const vector<t_field*>& fields = ((t_struct*)type)->get_members();
+ vector<t_field*>::const_iterator f_iter;
+ const map<t_const_value*, t_const_value*>& val = value->get_map();
+ map<t_const_value*, t_const_value*>::const_iterator v_iter;
+ for (v_iter = val.begin(); v_iter != val.end(); ++v_iter) {
+ t_type* field_type = NULL;
+ for (f_iter = fields.begin(); f_iter != fields.end(); ++f_iter) {
+ if ((*f_iter)->get_name() == v_iter->first->get_string()) {
+ field_type = (*f_iter)->get_type();
+ }
+ }
+ if (field_type == NULL) {
+ throw "Type error: " + type->get_name() + " has no field " +
+ v_iter->first->get_string();
+ }
+ string val = render_const_value(field_type, v_iter->second);
+ indent(out) << "v.set!`" << v_iter->first->get_string() <<
+ "`(" << val << ");" << endl;
+ }
+ } else if (type->is_map()) {
+ t_type* ktype = ((t_map*)type)->get_key_type();
+ t_type* vtype = ((t_map*)type)->get_val_type();
+ const map<t_const_value*, t_const_value*>& val = value->get_map();
+ map<t_const_value*, t_const_value*>::const_iterator v_iter;
+ for (v_iter = val.begin(); v_iter != val.end(); ++v_iter) {
+ string key = render_const_value(ktype, v_iter->first);
+ string val = render_const_value(vtype, v_iter->second);
+ indent(out) << "v[";
+ if (!is_immutable_type(ktype)) {
+ out << "cast(immutable(" << render_type_name(ktype) << "))";
+ }
+ out << key << "] = " << val << ";" << endl;
+ }
+ } else if (type->is_list()) {
+ t_type* etype = ((t_list*)type)->get_elem_type();
+ const vector<t_const_value*>& val = value->get_list();
+ vector<t_const_value*>::const_iterator v_iter;
+ for (v_iter = val.begin(); v_iter != val.end(); ++v_iter) {
+ string val = render_const_value(etype, *v_iter);
+ indent(out) << "v ~= " << val << ";" << endl;
+ }
+ } else if (type->is_set()) {
+ t_type* etype = ((t_set*)type)->get_elem_type();
+ const vector<t_const_value*>& val = value->get_list();
+ vector<t_const_value*>::const_iterator v_iter;
+ for (v_iter = val.begin(); v_iter != val.end(); ++v_iter) {
+ string val = render_const_value(etype, *v_iter);
+ indent(out) << "v ~= " << val << ";" << endl;
+ }
+ } else {
+ throw "Compiler error: Invalid type in render_const_value: " +
+ type->get_name();
+ }
+ indent(out) << "return v;" << endl;
+
+ indent_down();
+ indent(out) << "}()";
+ }
+
+ return out.str();
+ }
+
+ /**
+ * Returns the D package to which modules for program are written (with a
+ * trailing dot, if not empty).
+ */
+ string render_package(const t_program& program) const {
+ string package = program.get_namespace("d");
+ if (package.size() == 0) return "";
+ return package + ".";
+ }
+
+ /**
+ * Returns the name of the D repesentation of ttype.
+ *
+ * If isArg is true, a const reference to the type will be returned for
+ * structs.
+ */
+ string render_type_name(const t_type* ttype, bool isArg = false) const {
+ if (ttype->is_base_type()) {
+ t_base_type::t_base tbase = ((t_base_type*)ttype)->get_base();
+ switch (tbase) {
+ case t_base_type::TYPE_VOID:
+ return "void";
+ case t_base_type::TYPE_STRING:
+ return "string";
+ case t_base_type::TYPE_BOOL:
+ return "bool";
+ case t_base_type::TYPE_BYTE:
+ return "byte";
+ case t_base_type::TYPE_I16:
+ return "short";
+ case t_base_type::TYPE_I32:
+ return "int";
+ case t_base_type::TYPE_I64:
+ return "long";
+ case t_base_type::TYPE_DOUBLE:
+ return "double";
+ default:
+ throw "Compiler error: No D type name for base type " +
+ t_base_type::t_base_name(tbase);
+ }
+ }
+
+ if (ttype->is_container()) {
+ t_container* tcontainer = (t_container*) ttype;
+ if (tcontainer->has_cpp_name()) {
+ return tcontainer->get_cpp_name();
+ } else if (ttype->is_map()) {
+ t_map* tmap = (t_map*) ttype;
+ t_type* ktype = tmap->get_key_type();
+
+ string name = render_type_name(tmap->get_val_type()) + "[";
+ if (!is_immutable_type(ktype)) {
+ name += "immutable(";
+ }
+ name += render_type_name(ktype);
+ if (!is_immutable_type(ktype)) {
+ name += ")";
+ }
+ name += "]";
+ return name;
+ } else if (ttype->is_set()) {
+ t_set* tset = (t_set*) ttype;
+ return "HashSet!(" + render_type_name(tset->get_elem_type()) + ")";
+ } else if (ttype->is_list()) {
+ t_list* tlist = (t_list*) ttype;
+ return render_type_name(tlist->get_elem_type()) + "[]";
+ }
+ }
+
+ if (ttype->is_struct() && isArg) {
+ return "ref const(" + ttype->get_name() + ")";
+ } else {
+ return ttype->get_name();
+ }
+ }
+
+ /**
+ * Returns the D TReq enum member corresponding to req.
+ */
+ string render_req(t_field::e_req req) const {
+ switch (req) {
+ case t_field::T_OPT_IN_REQ_OUT:
+ return "TReq.OPT_IN_REQ_OUT";
+ case t_field::T_OPTIONAL:
+ return "TReq.OPTIONAL";
+ case t_field::T_REQUIRED:
+ return "TReq.REQUIRED";
+ default:
+ throw "Compiler error: Invalid requirement level: " + req;
+ }
+ }
+
+ /**
+ * Writes the default list of imports (which are written to every generated
+ * module) to f.
+ */
+ void print_default_imports(ostream& out) {
+ indent(out) <<
+ "import thrift.base;" << endl <<
+ "import thrift.codegen.base;" << endl <<
+ "import thrift.util.hashset;" << endl <<
+ endl;
+ }
+
+ /**
+ * Returns whether type is »intrinsically immutable«, in the sense that
+ * a value of that type is implicitly castable to immutable(type), and it is
+ * allowed for AA keys without an immutable() qualifier.
+ */
+ bool is_immutable_type(t_type* type) const {
+ t_type* ttype = get_true_type(type);
+ return ttype->is_base_type() || ttype->is_enum();
+ }
+
+ /*
+ * File streams, stored here to avoid passing them as parameters to every
+ * function.
+ */
+ ofstream f_types_;
+ ofstream f_header_;
+
+ string package_dir_;
+};
+
+THRIFT_REGISTER_GENERATOR(d, "D", "")
+
View
104 configure.ac
@@ -89,6 +89,13 @@ AC_ARG_VAR([GOBIN], [Binary directory for Go.
Default = "/usr/local/bin"])
AS_IF([test "x$GOBIN" = x], [GOBIN="/usr/local/bin"])
+AC_ARG_VAR([D_IMPORT_PREFIX], [Prefix for installing D modules.
+ [INCLUDEDIR/d2]])
+AS_IF([test "x$D_IMPORT_PREFIX" = x], [D_IMPORT_PREFIX="${includedir}/d2"])
+
+AC_ARG_VAR([DMD_LIBEVENT_FLAGS], [DMD flags for linking libevent (auto-detected if not set).])
+AC_ARG_VAR([DMD_OPENSSL_FLAGS], [DMD flags for linking OpenSSL (auto-detected if not set).])
+
AC_PROG_CC
AC_PROG_CPP
AC_PROG_CXX
@@ -287,6 +294,94 @@ if test "$with_go" = "yes"; then
fi
AM_CONDITIONAL(WITH_GO, [test "$have_go" = "yes"])
+
+AX_THRIFT_LIB(d, [D], yes)
+if test "$with_d" = "yes"; then
+ AX_DMD
+ AC_SUBST(DMD)
+ if test "x$DMD" != "x"; then
+ have_d="yes"
+ fi
+fi
+
+# Determine actual name of the generated D library for use in the command line
+# when compiling tests. This is needed because the -l<lib> syntax doesn't work
+# with OPTLINK (Windows).
+lib_prefix=lib
+lib_suffix=a
+case "$host_os" in
+ cygwin* | mingw* | pw32* | cegcc*)
+ lib_prefix=""
+ lib_suffix=lib
+ ;;
+esac
+D_LIB_NAME="${lib_prefix}thriftd.${lib_suffix}"
+AC_SUBST(D_LIB_NAME)
+D_EVENT_LIB_NAME="${lib_prefix}thriftd-event.${lib_suffix}"
+AC_SUBST(D_EVENT_LIB_NAME)
+D_SSL_LIB_NAME="${lib_prefix}thriftd-ssl.${lib_suffix}"
+AC_SUBST(D_SSL_LIB_NAME)
+
+if test "$have_d" = "yes"; then
+ AX_CHECK_D_MODULE(deimos.event2.event)
+ have_deimos_event2=$success
+
+ with_d_event_tests="no"
+ if test "$have_deimos_event2" = "yes"; then
+ if test "x$DMD_LIBEVENT_FLAGS" = "x"; then
+ if test "$dmd_optlink" = "yes"; then
+ AC_MSG_WARN([D libevent interface found, but cannot auto-detect \
+linker flags for OPTLINK. Please set DMD_LIBEVENT_FLAGS manually.])
+ else
+ AX_LIB_EVENT([2.0])
+ if test "$success" = "yes"; then
+ DMD_LIBEVENT_FLAGS=$(echo "$LIBEVENT_LDFLAGS $LIBEVENT_LIBS" | \
+ sed -e 's/^ *//g;s/ *$//g;s/^\(.\)/-L\1/g;s/ */ -L/g')
+ with_d_event_tests="yes"
+ else
+ AC_MSG_WARN([D libevent interface present, but libevent library not found.])
+ fi
+ fi
+ else
+ with_d_event_tests="yes"
+ fi
+ fi
+
+ AX_CHECK_D_MODULE(deimos.openssl.ssl)
+ have_deimos_openssl=$success
+
+ with_d_ssl_tests="no"
+ if test "$have_deimos_openssl" = "yes"; then
+ if test "x$DMD_OPENSSL_FLAGS" = "x"; then
+ if test "$dmd_optlink" = "yes"; then
+ AC_MSG_WARN([D OpenSSL interface found, but cannot auto-detect \
+linker flags for OPTLINK. Please set DMD_OPENSSL_FLAGS manually.])
+ else
+ AX_CHECK_OPENSSL([with_d_ssl_tests="yes"])
+ if test "$with_d_ssl_tests" = "yes"; then
+ DMD_OPENSSL_FLAGS=$(echo "$OPENSSL_LDFLAGS $OPENSSL_LIBS" | \
+ sed -e 's/^ *//g;s/ *$//g;s/^\(.\)/-L\1/g;s/ */ -L/g')
+ else
+ AC_MSG_WARN([D OpenSSL interface present, but OpenSSL library not found.])
+ fi
+ fi
+ else
+ with_d_ssl_tests="yes"
+ fi
+ fi
+fi
+
+AM_CONDITIONAL(WITH_D, [test "$have_d" = "yes"])
+AM_CONDITIONAL(DMD_OPTLINK, [test "$dmd_optlink" = "yes"])
+AC_SUBST(DMD_OF_DIRSEP, "$dmd_of_dirsep")
+AM_CONDITIONAL(HAVE_DEIMOS_EVENT2, [test "$have_deimos_event2" = "yes"])
+AM_CONDITIONAL(WITH_D_EVENT_TESTS, [test "$with_d_event_tests" = "yes"])
+AC_SUBST(DMD_LIBEVENT_FLAGS)
+AM_CONDITIONAL(HAVE_DEIMOS_OPENSSL, [test "$have_deimos_openssl" = "yes"])
+AM_CONDITIONAL(WITH_D_SSL_TESTS, [test "$with_d_ssl_tests" = "yes"])
+AC_SUBST(DMD_OPENSSL_FLAGS)
+
+
have_tests=yes
if test "$with_tests" = "no"; then
have_tests="no"
@@ -490,6 +585,8 @@ AC_CONFIG_FILES([
lib/c_glib/thrift_c_glib.pc
lib/c_glib/test/Makefile
lib/csharp/Makefile
+ lib/d/Makefile
+ lib/d/test/Makefile
lib/erl/Makefile
lib/hs/Makefile
lib/java/Makefile
@@ -530,6 +627,7 @@ echo "Building Perl Library ........ : $have_perl"
echo "Building PHP Library ......... : $have_php"
echo "Building Erlang Library ...... : $have_erlang"
echo "Building Go Library .......... : $have_go"
+echo "Building D Library ........... : $have_d"
if test "$have_cpp" = "yes" ; then
echo
echo "C++ Library:"
@@ -589,6 +687,12 @@ if test "$have_go" = "yes" ; then
echo " Using GO Compiler.......... : $GO_C"
echo " Using GO Linker............ : $GO_L"
fi
+if test "$have_d" = "yes" ; then
+ echo
+ echo "Using D Compiler ............. : $DMD"
+ echo "Building D libevent tests .... : $with_d_event_tests"
+ echo "Building D SSL tests ......... : $with_d_ssl_tests"
+fi
echo
echo "If something is missing that you think should be present,"
echo "please skim the output of configure to find the missing"
View
4 lib/Makefile.am
@@ -62,6 +62,10 @@ if WITH_PHP
SUBDIRS += php
endif
+if WITH_D
+SUBDIRS += d
+endif
+
# All of the libs that don't use Automake need to go in here
# so they will end up in our release tarballs.
EXTRA_DIST = \
View
181 lib/d/Makefile.am
@@ -0,0 +1,181 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+SUBDIRS = . test
+
+#
+# Enumeration of all the public and private modules.
+#
+# We unconditionally install all of them, even if libevent or OpenSSL are
+# not available, but build the respective libraries only if the Deimos headers
+# could be found.
+#
+d_thriftmodules = $(addprefix thrift/, base)
+d_thriftdir = $(D_IMPORT_PREFIX)/thrift
+d_thrift_DATA = $(addprefix src/, $(addsuffix .d, $(d_thriftmodules)))
+
+d_asyncmodules = $(addprefix thrift/async/, base libevent socket ssl)
+d_asyncdir = $(d_thriftdir)/async
+d_async_DATA = $(addprefix src/, $(addsuffix .d, $(d_asyncmodules)))
+
+d_codegenmodules = $(addprefix thrift/codegen/, async_client \
+ async_client_pool base client client_pool idlgen processor)
+d_codegendir = $(d_thriftdir)/codegen
+d_codegen_DATA = $(addprefix src/, $(addsuffix .d, $(d_codegenmodules)))
+
+d_protocolmodules = $(addprefix thrift/protocol/, base binary compact json \
+ processor)
+d_protocoldir = $(d_thriftdir)/protocol
+d_protocol_DATA = $(addprefix src/, $(addsuffix .d, $(d_protocolmodules)))
+
+d_servermodules = $(addprefix thrift/server/, base simple nonblocking \
+ taskpool threaded)
+d_serverdir = $(d_thriftdir)/server
+d_server_DATA = $(addprefix src/, $(addsuffix .d, $(d_servermodules)))
+
+d_servertransportmodules = $(addprefix thrift/server/transport/, base socket ssl)
+d_servertransportdir = $(d_thriftdir)/server/transport
+d_servertransport_DATA = $(addprefix src/, $(addsuffix .d, \
+ $(d_servertransportmodules)))
+
+d_transportmodules = $(addprefix thrift/transport/, base buffered file \
+ framed http memory piped range socket ssl zlib)
+d_transportdir = $(d_thriftdir)/transport
+d_transport_DATA = $(addprefix src/, $(addsuffix .d, $(d_transportmodules)))
+
+d_utilmodules = $(addprefix thrift/util/, awaitable cancellation future \
+ hashset)
+d_utildir = $(d_thriftdir)/util
+d_util_DATA = $(addprefix src/, $(addsuffix .d, $(d_utilmodules)))
+
+d_internalmodules = $(addprefix thrift/internal/, algorithm codegen ctfe \
+ endian resource_pool socket ssl ssl_bio traits)
+d_internaldir = $(d_thriftdir)/internal
+d_internal_DATA = $(addprefix src/, $(addsuffix .d, $(d_internalmodules)))
+
+d_testmodules = $(addprefix thrift/internal/test/, protocol server)
+d_testdir = $(d_internaldir)/test
+d_test_DATA = $(addprefix src/, $(addsuffix .d, $(d_testmodules)))
+
+d_publicmodules = $(d_thriftmodules) $(d_asyncmodules) \
+ $(d_codegenmodules) $(d_protocolmodules) $(d_servermodules) \
+ $(d_servertransportmodules) $(d_transportmodules) $(d_utilmodules)
+d_publicsources = $(addprefix src/, $(addsuffix .d, $(d_publicmodules)))
+
+d_modules = $(d_publicmodules) $(d_internalmodules) $(d_testmodules)
+
+# List modules with external dependencies and remove them from the main list
+d_libevent_dependent_modules = thrift/async/libevent thrift/server/nonblocking
+d_openssl_dependent_modules = thrift/async/ssl thrift/internal/ssl \
+ thrift/internal/ssl_bio thrift/transport/ssl thrift/server/transport/ssl
+d_main_modules = $(filter-out $(d_libevent_dependent_modules) \
+ $(d_openssl_dependent_modules),$(d_modules))
+
+
+d_lib_flags = -w -wi -Isrc -lib
+all_targets =
+
+#
+# libevent-dependent modules.
+#
+if HAVE_DEIMOS_EVENT2
+$(D_EVENT_LIB_NAME): $(addprefix src/, $(addsuffix .d, $(d_libevent_dependent_modules)))
+ $(DMD) -of$(D_EVENT_LIB_NAME) $(d_lib_flags) $^
+all_targets += $(D_EVENT_LIB_NAME)
+endif
+
+#
+# OpenSSL-dependent modules.
+#
+if HAVE_DEIMOS_OPENSSL
+$(D_SSL_LIB_NAME): $(addprefix src/, $(addsuffix .d, $(d_openssl_dependent_modules)))
+ $(DMD) -of$(D_SSL_LIB_NAME) $(d_lib_flags) $^
+all_targets += $(D_SSL_LIB_NAME)
+endif
+
+#
+# Main library target.
+#
+$(D_LIB_NAME): $(addprefix src/, $(addsuffix .d, $(d_main_modules)))
+ $(DMD) -of$(D_LIB_NAME) $(d_lib_flags) $^
+all_targets += $(D_LIB_NAME)
+
+
+#
+# Documentation target (requires Dil).
+#
+docs: $(d_publicsources) src/thrift/index.d
+ dil ddoc docs -hl --kandil $^
+
+
+#
+# Hook custom library targets into the automake all/install targets.
+#
+all-local: $(all_targets)
+
+install-exec-local:
+ $(INSTALL_PROGRAM) $(all_targets) $(DESTDIR)$(libdir)
+
+
+clean-local:
+ $(RM) -rf docs $(D_LIB_NAME) $(D_EVENT_LIB_NAME) $(D_SSL_LIB_NAME) unittest
+
+
+#
+# Unit tests (built both in debug and release mode).
+#
+d_test_flags = -unittest -w -wi -I$(top_srcdir)/lib/d/src
+
+# There just must be some way to reassign a variable without warnings in
+# Automake...
+d_test_modules__ = $(d_modules)
+
+if WITH_D_EVENT_TESTS
+d_test_flags += $(DMD_LIBEVENT_FLAGS)
+d_test_modules_ = $(d_test_modules__)
+else
+d_test_modules_ = $(filter-out $(d_libevent_dependent_modules), $(d_test_modules__))
+endif
+
+if WITH_D_SSL_TESTS
+d_test_flags += $(DMD_OPENSSL_FLAGS)
+d_test_modules = $(d_test_modules_)
+else
+d_test_modules = $(filter-out $(d_openssl_dependent_modules), $(d_test_modules_))
+endif
+
+unittest/emptymain.d: unittest/.directory
+ @echo 'void main(){}' >$@
+
+unittest/.directory:
+ mkdir -p unittest || exists unittest
+ touch $@
+
+unittest/debug/%: src/%.d $(all_targets) unittest/emptymain.d
+ $(DMD) -gc -of$(subst /,$(DMD_OF_DIRSEP),$@) $(d_test_flags) $^
+
+unittest/release/%: src/%.d $(all_targets) unittest/emptymain.d
+ $(DMD) -O -release -of$(subst /,$(DMD_OF_DIRSEP),$@) $(d_test_flags) $^
+
+TESTS = $(addprefix unittest/debug/, $(d_test_modules)) \
+ $(addprefix unittest/release/, $(d_test_modules))
+
+
+EXTRA_DIST = \
+ README
View
58 lib/d/README
@@ -0,0 +1,58 @@
+Thrift D Software Library
+=========================
+
+License
+-------
+
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+
+Testing
+-------
+
+D support in Thrift is covered by two sets of tests: first,
+the unit test blocks contained in the D source files, and
+second, the more extensive testing applications in the test/
+subdirectory, which also make use of the Thrift compiler.
+Both are built when running "make check", but only the
+unit tests are immediately run, however – the separate test
+cases typically run longer or require manual intervention.
+It might also be prudent to run the independent tests,
+which typically consist of a server and a client part,
+against the other language implementations.
+
+To build the unit tests on Windows, the easiest way might
+be to manually create a file containing an empty main() and
+invoke the compiler by running the following in the src/
+directory (PowerShell syntax):
+
+dmd -ofunittest -unittest -w $(dir -r -filter '*.d' -name)
+
+If you want to run the test clients/servers in OpenSSL
+mode, you have to provide »server-private-key.pem« and
+»server-certificate.pem« files in the directory the server
+executable resides in, and a »trusted-ca-certificate.pem«
+file for the client. The easiest way is to generate a new
+self signed certificate using the provided config file
+(openssl.test.cnf):
+
+openssl req -new -x509 -nodes -config openssl.test.cnf \
+ -out server-certificate.pem
+cat server-certificate.pem > trusted-ca-certificate.pem
+
+This steps are also performed automatically by the
+Autotools build system if the files are not present.
View
228 lib/d/src/thrift/async/base.d
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Defines the interface used for client-side handling of asynchronous
+ * I/O operations, based on coroutines.
+ *
+ * The main piece of the »client side« (e.g. for TAsyncClient users) of the
+ * API is TFuture, which represents an asynchronously executed operation,
+ * which can have a return value, throw exceptions, and which can be waited
+ * upon.
+ *
+ * On the »implementation side«, the idea is that by using a TAsyncTransport
+ * instead of a normal TTransport and executing the work through a
+ * TAsyncManager, the same code as for synchronous I/O can be used for
+ * asynchronous operation as well, for example:
+ *
+ * ---
+ * auto socket = new TAsyncSocket(someTAsyncSocketManager(), host, port);
+ * // …
+ * socket.asyncManager.execute(socket, {
+ * SomeThriftStruct s;
+ *
+ * // Waiting for socket I/O will not block an entire thread but cause
+ * // the async manager to execute another task in the meantime, because
+ * // we are using TAsyncSocket instead of TSocket.
+ * s.read(socket);
+ *
+ * // Do something with s, e.g. set a TPromise result to it.
+ * writeln(s);
+ * });
+ * ---
+ */
+module thrift.async.base;
+
+import core.time : Duration, dur;
+import std.socket/+ : Socket+/; // DMD @@BUG314@@
+import thrift.base;
+import thrift.transport.base;
+import thrift.util.cancellation;
+
+/**
+ * Manages one or more asynchronous transport resources (e.g. sockets in the
+ * case of TAsyncSocketManager) and allows work items to be submitted for them.
+ *
+ * Implementations will typically run one or more background threads for
+ * executing the work, which is one of the reasons for a TAsyncManager to be
+ * used. Each work item is run in its own fiber and is expected to yield() away
+ * while waiting for time-consuming operations.
+ *
+ * The second important purpose of TAsyncManager is to serialize access to
+ * the transport resources – without taking care of that, e.g. issuing multiple
+ * RPC calls over the same connection in rapid succession would likely lead to
+ * more than one request being written at the same time, causing only garbage
+ * to arrive at the remote end.
+ *
+ * All methods are thread-safe.
+ */
+interface TAsyncManager {
+ /**
+ * Submits a work item to be executed asynchronously.
+ *
+ * Access to asnyc transports is serialized – if two work items associated
+ * with the same transport are submitted, the second delegate will not be
+ * invoked until the first has returned, even it the latter context-switches
+ * away (because it is waiting for I/O) and the async manager is idle
+ * otherwise.
+ *
+ * Optionally, a TCancellation instance can be specified. If present,
+ * triggering it will be considered a request to cancel the work item, if it
+ * is still waiting for the associated transport to become available.
+ * Delegates which are already being processed (i.e. waiting for I/O) are not
+ * affected because this would bring the connection into an undefined state
+ * (as probably half-written request or a half-read response would be left
+ * behind).
+ *
+ * Params:
+ * transport = The TAsyncTransport the work delegate will operate on. Must
+ * be associated with this TAsyncManager instance.
+ * work = The operations to execute on the given transport. Must never
+ * throw, errors should be handled in another way. nothrow semantics are
+ * difficult to enforce in combination with fibres though, so currently
+ * exceptions are just swallowed by TAsyncManager implementations.
+ * cancellation = If set, can be used to request cancellatinon of this work
+ * item if it is still waiting to be executed.
+ *
+ * Note: The work item will likely be executed in a different thread, so make
+ * sure the code it relies on is thread-safe. An exception are the async
+ * transports themselves, to which access is serialized as noted above.
+ */
+ void execute(TAsyncTransport transport, void delegate() work,
+ TCancellation cancellation = null
+ ) in {
+ assert(transport.asyncManager is this,
+ "The given transport must be associated with this TAsyncManager.");
+ }
+
+ /**
+ * Submits a delegate to be executed after a certain amount of time has
+ * passed.
+ *
+ * The actual amount of time elapsed can be higher if the async manager
+ * instance is busy and thus should not be relied on. The
+ *
+ * Params:
+ * duration = The amount of time to wait before starting to execute the
+ * work delegate.
+ * work = The code to execute after the specified amount of time has passed.
+ *
+ * Example:
+ * ---
+ * // A very basic example – usually, the actuall work item would enqueue
+ * // some async transport operation.
+ * auto asyncMangager = someAsyncManager();
+ *
+ * TFuture!int calculate() {
+ * // Create a promise and asynchronously set its value after three
+ * // seconds have passed.
+ * auto promise = new TPromise!int;
+ * asyncManager.delay(dur!"seconds"(3), {
+ * promise.succeed(42);
+ * });
+ *
+ * // Immediately return it to the caller.
+ * return promise;
+ * }
+ *
+ * // This will wait until the result is available and then print it.
+ * writeln(calculate().waitGet());
+ * ---
+ */
+ void delay(Duration duration, void delegate() work);
+
+ /**
+ * Shuts down all background threads or other facilities that might have
+ * been started in order to execute work items. This function is typically
+ * called during program shutdown.
+ *
+ * If there are still tasks to be executed when the timeout expires, any
+ * currently executed work items will never receive any notifications
+ * for async transports managed by this instance, queued work items will
+ * be silently dropped, and implementations are allowed to leak resources.
+ *
+ * Params:
+ * waitFinishTimeout = If positive, waits for all work items to be
+ * finished for the specified amount of time, if negative, waits for
+ * completion without ever timing out, if zero, immediately shuts down
+ * the background facilities.
+ */
+ bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1));
+}
+
+/**
+ * A TTransport which uses a TAsyncManager to schedule non-blocking operations.
+ *
+ * The actual type of device is not specified; typically, implementations will
+ * depend on an interface derived from TAsyncManager to be notified of changes
+ * in the transport state.
+ *
+ * The peeking, reading, writing and flushing methods must always be called
+ * from within the associated async manager.
+ */
+interface TAsyncTransport : TTransport {
+ /**
+ * The TAsyncManager associated with this transport.
+ */
+ TAsyncManager asyncManager() @property;
+}
+
+/**
+ * A TAsyncManager providing notificiations for socket events.
+ */
+interface TAsyncSocketManager : TAsyncManager {
+ /**
+ * Adds a listener that is triggered once when an event of the specified type
+ * occurs, and removed afterwards.
+ *
+ * Params:
+ * socket = The socket to listen for events at.
+ * eventType = The type of the event to listen for.
+ * timeout = The period of time after which the listener will be called
+ * with TAsyncEventReason.TIMED_OUT if no event happened.
+ * listener = The delegate to call when an event happened.
+ */
+ void addOneshotListener(Socket socket, TAsyncEventType eventType,
+ Duration timeout, TSocketEventListener listener);
+
+ /// Ditto
+ void addOneshotListener(Socket socket, TAsyncEventType eventType,
+ TSocketEventListener listener);
+}
+
+/**
+ * Types of events that can happen for an asynchronous transport.
+ */
+enum TAsyncEventType {
+ READ, /// New data became available to read.
+ WRITE /// The transport became ready to be written to.
+}
+
+/**
+ * The type of the delegates used to register socket event handlers.
+ */
+alias void delegate(TAsyncEventReason callReason) TSocketEventListener;
+
+/**
+ * The reason a listener was called.
+ */
+enum TAsyncEventReason : byte {
+ NORMAL, /// The event listened for was triggered normally.
+ TIMED_OUT /// A timeout for the event was set, and it expired.
+}
View
461 lib/d/src/thrift/async/libevent.d
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.async.libevent;
+
+import core.atomic;
+import core.time : Duration, dur;
+import core.exception : onOutOfMemoryError;
+import core.memory : GC;
+import core.thread : Fiber, Thread;
+import core.sync.condition;
+import core.sync.mutex;
+import core.stdc.stdlib : free, malloc;
+import deimos.event2.event;
+import std.array : empty, front, popFront;
+import std.conv : text, to;
+import std.exception : enforce;
+import std.socket : Socket, socketPair;
+import thrift.base;
+import thrift.async.base;
+import thrift.internal.socket;
+import thrift.internal.traits;
+import thrift.util.cancellation;
+
+// To avoid DMD @@BUG6395@@.
+import thrift.internal.algorithm;
+
+/**
+ * A TAsyncManager implementation based on libevent.
+ *
+ * The libevent loop for handling non-blocking sockets is run in a background
+ * thread, which is lazily spawned. The thread is not daemonized to avoid
+ * crashes on program shutdown, it is only stopped when the manager instance
+ * is destroyed. So, to ensure a clean program teardown, either make sure this
+ * instance gets destroyed (e.g. by using scope), or manually call stop() at
+ * the end.
+ */
+class TLibeventAsyncManager : TAsyncSocketManager {
+ this() {
+ eventBase_ = event_base_new();
+
+ // Set up the socket pair for transferring control messages to the event
+ // loop.
+ auto pair = socketPair();
+ controlSendSocket_ = pair[0];
+ controlReceiveSocket_ = pair[1];
+ controlReceiveSocket_.blocking = false;
+
+ // Register an event for receiving control messages.
+ controlReceiveEvent_ = event_new(eventBase_, controlReceiveSocket_.handle,
+ EV_READ | EV_PERSIST | EV_ET, assumeNothrow(&controlMsgReceiveCallback),
+ cast(void*)this);
+ event_add(controlReceiveEvent_, null);
+
+ queuedCountMutex_ = new Mutex;
+ zeroQueuedCondition_ = new Condition(queuedCountMutex_);
+ }
+
+ ~this() {
+ // stop() should be safe to call, because either we don't have a worker
+ // thread running and it is a no-op anyway, or it is guaranteed to be
+ // still running (blocked in event_base_loop), and thus guaranteed not to
+ // be garbage collected yet.
+ stop(dur!"hnsecs"(0));
+
+ event_free(controlReceiveEvent_);
+ event_base_free(eventBase_);
+ eventBase_ = null;
+ }
+
+ override void execute(TAsyncTransport transport, Work work,
+ TCancellation cancellation = null
+ ) {
+ if (cancellation && cancellation.triggered) return;
+
+ // Keep track that there is a new work item to be processed.
+ incrementQueuedCount();
+
+ ensureWorkerThreadRunning();
+
+ // We should be able to send the control message as a whole – we currently
+ // assume to be able to receive it at once as well. If this proves to be
+ // unstable (e.g. send could possibly return early if the receiving buffer
+ // is full and the blocking call gets interrupted by a signal), it could
+ // be changed to a more sophisticated scheme.
+
+ // Make sure the delegate context doesn't get GCd while the work item is
+ // on the wire.
+ GC.addRoot(work.ptr);
+
+ // Send work message.
+ sendControlMsg(ControlMsg(MsgType.WORK, work, transport));
+
+ if (cancellation) {
+ cancellation.triggering.addCallback({
+ sendControlMsg(ControlMsg(MsgType.CANCEL, work, transport));
+ });
+ }
+ }
+
+ override void delay(Duration duration, void delegate() work) {
+ incrementQueuedCount();
+
+ ensureWorkerThreadRunning();
+
+ const tv = toTimeval(duration);
+
+ // DMD @@BUG@@: Cannot deduce T to void delegate() here.
+ registerOneshotEvent!(void delegate())(
+ -1, 0, assumeNothrow(&delayCallback), &tv,
+ {
+ work();
+ decrementQueuedCount();
+ }
+ );
+ }
+
+ override bool stop(Duration waitFinishTimeout = dur!"hnsecs"(-1)) {
+ bool cleanExit = true;
+
+ synchronized (this) {
+ if (workerThread_) {
+ synchronized (queuedCountMutex_) {
+ if (waitFinishTimeout > dur!"hnsecs"(0)) {
+ if (queuedCount_ > 0) {
+ zeroQueuedCondition_.wait(waitFinishTimeout);
+ }
+ } else if (waitFinishTimeout < dur!"hnsecs"(0)) {
+ while (queuedCount_ > 0) zeroQueuedCondition_.wait();
+ } else {
+ // waitFinishTimeout is zero, immediately exit in all cases.
+ }
+ cleanExit = (queuedCount_ == 0);
+ }
+
+ event_base_loopbreak(eventBase_);
+ sendControlMsg(ControlMsg(MsgType.SHUTDOWN));
+ workerThread_.join();
+ workQueues_ = null;
+ // We have nuked all currently enqueued items, so set the count to
+ // zero. This is safe to do without locking, since the worker thread
+ // is down.
+ queuedCount_ = 0;
+ atomicStore(*(cast(shared)&workerThread_), cast(shared(Thread))null);
+ }
+ }
+
+ return cleanExit;
+ }
+
+ override void addOneshotListener(Socket socket, TAsyncEventType eventType,
+ TSocketEventListener listener
+ ) {
+ addOneshotListenerImpl(socket, eventType, null, listener);
+ }
+
+ override void addOneshotListener(Socket socket, TAsyncEventType eventType,
+ Duration timeout, TSocketEventListener listener
+ ) {
+ if (timeout <= dur!"hnsecs"(0)) {
+ addOneshotListenerImpl(socket, eventType, null, listener);
+ } else {
+ // This is not really documented well, but libevent does not require to
+ // keep the timeval around after the event was added.
+ auto tv = toTimeval(timeout);
+ addOneshotListenerImpl(socket, eventType, &tv, listener);
+ }
+ }
+
+private:
+ alias void delegate() Work;
+
+ void addOneshotListenerImpl(Socket socket, TAsyncEventType eventType,
+ const(timeval)* timeout, TSocketEventListener listener
+ ) {
+ registerOneshotEvent(socket.handle, libeventEventType(eventType),
+ assumeNothrow(&socketCallback), timeout, listener);
+ }
+
+ void registerOneshotEvent(T)(evutil_socket_t fd, short type,
+ event_callback_fn callback, const(timeval)* timeout, T payload
+ ) {
+ // Create a copy of the payload on the C heap.
+ auto payloadMem = malloc(payload.sizeof);
+ if (!payloadMem) onOutOfMemoryError();
+ (cast(T*)payloadMem)[0 .. 1] = payload;
+ GC.addRange(payloadMem, payload.sizeof);
+
+ auto result = event_base_once(eventBase_, fd, type, callback,
+ payloadMem, timeout);
+
+ // Assuming that we didn't get our arguments wrong above, the only other
+ // situation in which event_base_once can fail is when it can't allocate
+ // memory.
+ if (result != 0) onOutOfMemoryError();
+ }
+
+ enum MsgType : ubyte {
+ SHUTDOWN,
+ WORK,
+ CANCEL
+ }
+
+ struct ControlMsg {
+ MsgType type;
+ Work work;
+ TAsyncTransport transport;
+ }
+
+ /**
+ * Starts the worker thread if it is not already running.
+ */
+ void ensureWorkerThreadRunning() {
+ // Technically, only half barriers would be required here, but adding the
+ // argument seems to trigger a DMD template argument deduction @@BUG@@.
+ if (!atomicLoad(*(cast(shared)&workerThread_))) {
+ synchronized (this) {
+ if (!workerThread_) {
+ auto thread = new Thread({ event_base_loop(eventBase_, 0); });
+ thread.start();
+ atomicStore(*(cast(shared)&workerThread_), cast(shared)thread);
+ }
+ }
+ }
+ }
+
+ /**
+ * Sends a control message to the worker thread.
+ */
+ void sendControlMsg(const(ControlMsg) msg) {
+ auto result = controlSendSocket_.send((&msg)[0 .. 1]);
+ enum size = msg.sizeof;
+ enforce(result == size, new TException(text(
+ "Sending control message of type ", msg.type, " failed (", result,
+ " bytes instead of ", size, " transmitted).")));
+ }
+
+ /**
+ * Receives messages from the control message socket and acts on them. Called
+ * from the worker thread.
+ */
+ void receiveControlMsg() {
+ // Read as many new work items off the socket as possible (at least one
+ // should be available, as we got notified by libevent).
+ ControlMsg msg;
+ ptrdiff_t bytesRead;
+ while (true) {
+ bytesRead = controlReceiveSocket_.receive(cast(ubyte[])((&msg)[0 .. 1]));
+
+ if (bytesRead < 0) {
+ auto errno = getSocketErrno();
+ if (errno != WOULD_BLOCK_ERRNO) {
+ logError("Reading control message, some work item will possibly " ~
+ "never be executed: %s", socketErrnoString(errno));
+ }
+ }
+ if (bytesRead != msg.sizeof) break;
+
+ // Everything went fine, we received a new control message.
+ final switch (msg.type) {
+ case MsgType.SHUTDOWN:
+ // The message was just intended to wake us up for shutdown.
+ break;
+
+ case MsgType.CANCEL:
+ // When processing a cancellation, we must not touch the first item,
+ // since it is already being processed.
+ auto queue = workQueues_[msg.transport];
+ if (queue.length > 0) {
+ workQueues_[msg.transport] = [queue[0]] ~
+ removeEqual(queue[1 .. $], msg.work);
+ }
+ break;
+
+ case MsgType.WORK:
+ // Now that the work item is back in the D world, we don't need the
+ // extra GC root for the context pointer anymore (see execute()).
+ GC.removeRoot(msg.work.ptr);
+
+ // Add the work item to the queue and execute it.
+ auto queue = msg.transport in workQueues_;
+ if (queue is null || (*queue).empty) {
+ // If the queue is empty, add the new work item to the queue as well,
+ // but immediately start executing it.
+ workQueues_[msg.transport] = [msg.work];
+ executeWork(msg.transport, msg.work);
+ } else {
+ (*queue) ~= msg.work;
+ }
+ break;
+ }
+ }
+
+ // If the last read was successful, but didn't read enough bytes, we got
+ // a problem.
+ if (bytesRead > 0) {
+ logError("Unexpected partial control message read (%s byte(s) " ~
+ "instead of %s), some work item will possibly never be executed.",
+ bytesRead, msg.sizeof);
+ }
+ }
+
+ /**
+ * Executes the given work item and all others enqueued for the same
+ * transport in a new fiber. Called from the worker thread.
+ */
+ void executeWork(TAsyncTransport transport, Work work) {
+ (new Fiber({
+ auto item = work;
+ while (true) {
+ try {
+ // Execute the actual work. It will possibly add listeners to the
+ // event loop and yield away if it has to wait for blocking
+ // operations. It is quite possible that another fiber will modify
+ // the work queue for the current transport.
+ item();
+ } catch (Exception e) {
+ // This should never happen, just to be sure the worker thread
+ // doesn't stop working in mysterious ways because of an unhandled
+ // exception.
+ logError("Exception thrown by work item: %s", e);
+ }
+
+ // Remove the item from the work queue.
+ // Note: Due to the value semantics of array slices, we have to
+ // re-lookup this on every iteration. This could be solved, but I'd
+ // rather replace this directly with a queue type once one becomes
+ // available in Phobos.
+ auto queue = workQueues_[transport];
+ assert(queue.front == item);
+ queue.popFront();
+ workQueues_[transport] = queue;
+
+ // Now that the work item is done, no longer count it as queued.
+ decrementQueuedCount();
+
+ if (queue.empty) break;
+
+ // If the queue is not empty, execute the next waiting item.
+ item = queue.front;
+ }
+ })).call();
+ }
+
+ /**
+ * Increments the amount of queued items.
+ */
+ void incrementQueuedCount() {
+ synchronized (queuedCountMutex_) {
+ ++queuedCount_;
+ }
+ }
+
+ /**
+ * Decrements the amount of queued items.
+ */
+ void decrementQueuedCount() {
+ synchronized (queuedCountMutex_) {
+ assert(queuedCount_ > 0);
+ --queuedCount_;
+ if (queuedCount_ == 0) {
+ zeroQueuedCondition_.notifyAll();
+ }
+ }
+ }
+
+ static extern(C) void controlMsgReceiveCallback(evutil_socket_t, short,
+ void *managerThis
+ ) {
+ (cast(TLibeventAsyncManager)managerThis).receiveControlMsg();
+ }
+
+ static extern(C) void socketCallback(evutil_socket_t, short flags,
+ void *arg
+ ) {
+ auto reason = (flags & EV_TIMEOUT) ? TAsyncEventReason.TIMED_OUT :
+ TAsyncEventReason.NORMAL;
+ (*(cast(TSocketEventListener*)arg))(reason);
+ GC.removeRange(arg);
+ clear(arg);
+ free(arg);
+ }
+
+ static extern(C) void delayCallback(evutil_socket_t, short flags,
+ void *arg
+ ) {
+ assert(flags & EV_TIMEOUT);
+ (*(cast(void delegate()*)arg))();
+ GC.removeRange(arg);
+ clear(arg);
+ free(arg);
+ }
+
+ Thread workerThread_;
+
+ event_base* eventBase_;
+
+ /// The socket used for receiving new work items in the event loop. Paired
+ /// with controlSendSocket_. Invalid (i.e. TAsyncWorkItem.init) items are
+ /// ignored and can be used to wake up the worker thread.
+ Socket controlReceiveSocket_;
+ event* controlReceiveEvent_;
+
+ /// The socket used to send new work items to the event loop. It is
+ /// expected that work items can always be read at once from it, i.e. that
+ /// there will never be short reads.
+ Socket controlSendSocket_;
+
+ /// Queued up work delegates for async transports. This also includes
+ /// currently active ones, they are removed from the queue on completion,
+ /// which is relied on by the control message receive fiber (the main one)
+ /// to decide whether to immediately start executing items or not.
+ // TODO: This should really be of some queue type, not an array slice, but
+ // std.container doesn't have anything.
+ Work[][TAsyncTransport] workQueues_;
+
+ /// The total number of work items not yet finished (queued and currently
+ /// excecuted) and delays not yet executed.
+ uint queuedCount_;
+
+ /// Protects queuedCount_.
+ Mutex queuedCountMutex_;
+
+ /// Triggered when queuedCount_ reaches zero, protected by queuedCountMutex_.
+ Condition zeroQueuedCondition_;
+}
+
+private {
+ timeval toTimeval(const(Duration) dur) {
+ timeval tv = {tv_sec: cast(int)dur.total!"seconds"(),
+ tv_usec: dur.fracSec.usecs};
+ return tv;
+ }
+
+ /**
+ * Returns the libevent flags combination to represent a given TAsyncEventType.
+ */
+ short libeventEventType(TAsyncEventType type) {
+ final switch (type) {
+ case TAsyncEventType.READ:
+ return EV_READ | EV_ET;
+ case TAsyncEventType.WRITE:
+ return EV_WRITE | EV_ET;
+ }
+ }
+}
View
357 lib/d/src/thrift/async/socket.d
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+module thrift.async.socket;
+
+import core.thread : Fiber;
+import core.time : dur, Duration;
+import std.array : empty;
+import std.conv : to;
+import std.exception : enforce;
+import std.socket;
+import thrift.base;
+import thrift.async.base;
+import thrift.transport.base;
+import thrift.transport.socket : TSocketBase;
+import thrift.internal.endian;
+import thrift.internal.socket;
+
+version (Windows) {
+ import std.c.windows.winsock : connect;
+} else version (Posix) {
+ import core.sys.posix.sys.socket : connect;
+} else static assert(0, "Don't know connect on this platform.");
+
+/**
+ * Non-blocking socket implementation of the TTransport interface.
+ *
+ * Whenever a socket operation would block, TAsyncSocket registers a callback
+ * with the specified TAsyncSocketManager and yields.
+ *
+ * As for thrift.transport.socket, due to the limitations of std.socket,
+ * currently only TCP/IP sockets are supported (i.e. Unix domain sockets are
+ * not).
+ */
+class TAsyncSocket : TSocketBase, TAsyncTransport {
+ /**
+ * Constructor that takes an already created, connected (!) socket.
+ *
+ * Params:
+ * asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
+ * socket = Already created, connected socket object. Will be switched to
+ * non-blocking mode if it isn't already.
+ */
+ this(TAsyncSocketManager asyncManager, Socket socket) {
+ asyncManager_ = asyncManager;
+ socket.blocking = false;
+ super(socket);
+ }
+
+ /**
+ * Creates a new unconnected socket that will connect to the given host
+ * on the given port.
+ *
+ * Params:
+ * asyncManager = The TAsyncSocketManager to use for non-blocking I/O.
+ * host = Remote host.
+ * port = Remote port.
+ */
+ this(TAsyncSocketManager asyncManager, string host, ushort port) {
+ asyncManager_ = asyncManager;
+ super(host, port);
+ }
+
+ override TAsyncManager asyncManager() @property {
+ return asyncManager_;
+ }
+
+ /**
+ * Asynchronously connects the socket.
+ *
+ * Completes without blocking and defers further operations on the socket
+ * until the connection is established. If connecting fails, this is
+ * currently not indicated in any way other than every call to read/write
+ * failing.
+ */
+ override void open() {
+ if (isOpen) return;
+
+ enforce(!host_.empty, new TTransportException(
+ "Cannot open null host.", TTransportException.Type.NOT_OPEN));
+ enforce(port_ != 0, new TTransportException(
+ "Cannot open with null port.", TTransportException.Type.NOT_OPEN));
+
+
+ // Cannot use std.socket.Socket.connect here because it hides away
+ // EINPROGRESS/WSAWOULDBLOCK.
+ Address addr;
+ try {
+ // Currently, we just go with the first address returned, could be made
+ // more intelligent though – IPv6?
+ addr = getAddress(host_, port_)[0];
+ } catch (Exception e) {
+ throw new TTransportException(`Unable to resolve host "` ~ host_ ~ `".`,
+ TTransportException.Type.NOT_OPEN, __FILE__, __LINE__, e);
+ }
+
+ socket_ = new TcpSocket(addr.addressFamily);
+ socket_.blocking = false;
+ setSocketOpts();
+
+ auto errorCode = connect(socket_.handle, addr.name(), addr.nameLen());
+ if (errorCode == 0) {
+ // If the connection could be established immediately, just return. I
+ // don't know if this ever happens.
+ return;
+ }
+
+ auto errno = getSocketErrno();
+ if (errno != CONNECT_INPROGRESS_ERRNO) {
+ throw new TTransportException(`Could not establish connection to "` ~
+ host_ ~ `": ` ~ socketErrnoString(errno),
+ TTransportException.Type.NOT_OPEN);
+ }
+
+ // This is the expected case: connect() signalled that the connection
+ // is being established in the background. Queue up a work item with the
+ // async manager which just defers any other operations on this
+ // TAsyncSocket instance until the socket is ready.
+ asyncManager_.execute(this,
+ {
+ auto fiber = Fiber.getThis();
+ TAsyncEventReason reason = void;
+ asyncManager_.addOneshotListener(socket_, TAsyncEventType.WRITE,
+ connectTimeout,
+ scopedDelegate((TAsyncEventReason r){ reason = r; fiber.call(); })
+ );
+ Fiber.yield();
+
+ if (reason == TAsyncEventReason.TIMED_OUT) {
+ // Close the connection, so that subsequent work items fail immediately.
+ closeImmediately();
+ return;
+ }
+
+ int errorCode = void;
+ socket_.getOption(SocketOptionLevel.SOCKET, cast(SocketOption)SO_ERROR,
+ errorCode);
+
+ if (errorCode) {
+ logInfo("Could not connect TAsyncSocket: %s",
+ socketErrnoString(errorCode));
+
+ // Close the connection, so that subsequent work items fail immediately.
+ closeImmediately();
+ return;
+ }
+
+ }
+ );
+ }
+
+ /**
+ * Closes the socket.
+ *
+ * Will block until all currently active operations are finished before the
+ * socket is closed.
+ */
+ override void close() {
+ if (!isOpen) return;
+
+ import core.sync.condition;
+ import core.sync.mutex;
+
+ auto doneMutex = new Mutex;
+ auto doneCond = new Condition(doneMutex);
+ synchronized (doneMutex) {
+ asyncManager_.execute(this,
+ scopedDelegate(
+ {
+ closeImmediately();
+ synchronized (doneMutex) doneCond.notifyAll();
+ }
+ )
+ );
+ doneCond.wait();
+ }
+ }
+
+ override bool peek() {
+ if (!isOpen) return false;
+
+ ubyte buf;
+ auto r = socket_.receive((&buf)[0..1], SocketFlags.PEEK);
+ if (r == Socket.ERROR) {
+ auto lastErrno = getSocketErrno();
+ static if (connresetOnPeerShutdown) {
+ if (lastErrno == ECONNRESET) {
+ closeImmediately();
+ return false;
+ }
+ }
+ throw new TTransportException("Peeking into socket failed: " ~
+ socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
+ }
+ return (r > 0);
+ }
+
+ override size_t read(ubyte[] buf) {
+ enforce(isOpen, new TTransportException(
+ "Cannot read if socket is not open.", TTransportException.Type.NOT_OPEN));
+
+ typeof(getSocketErrno()) lastErrno;
+
+ auto r = yieldOnBlock(socket_.receive(cast(void[])buf),
+ TAsyncEventType.READ);
+
+ // If recv went fine, immediately return.
+ if (r >= 0) return r;
+
+ // Something went wrong, find out how to handle it.
+ lastErrno = getSocketErrno();
+
+ static if (connresetOnPeerShutdown) {
+ // See top comment.
+ if (lastErrno == ECONNRESET) {
+ return 0;
+ }
+ }
+
+ throw new TTransportException("Receiving from socket failed: " ~
+ socketErrnoString(lastErrno), TTransportException.Type.UNKNOWN);
+ }
+
+ override void write(in ubyte[] buf) {
+ size_t sent;
+ while (sent < buf.length) {
+ sent += writeSome(buf[sent .. $]);
+ }
+ assert(sent == buf.length);
+ }
+
+ override size_t writeSome(in ubyte[] buf) {
+ enforce(isOpen, new TTransportException(
+ "Cannot write if socket is not open.", TTransportException.Type.NOT_OPEN));
+
+ auto r = yieldOnBlock(socket_.send(buf), TAsyncEventType.WRITE);
+
+ // Everything went well, just return the number of bytes written.
+ if (r > 0) return r;
+
+ // Handle error conditions.
+ if (r < 0) {
+ auto lastErrno = getSocketErrno();
+
+ auto type = TTransportException.Type.UNKNOWN;
+ if (isSocketCloseErrno(lastErrno)) {
+ type = TTransportException.Type.NOT_OPEN;
+ closeImmediately();
+ }
+
+ throw new TTransportException("Sending to socket failed: " ~
+ socketErrnoString(lastErrno), type);
+ }
+
+ // send() should never return 0.
+ throw new TTransportException("Sending to socket failed (0 bytes written).",
+ TTransportException.Type.UNKNOWN);
+ }
+
+ /// The amount of time in which a conncetion must be established before the
+ /// open() call times out.
+ Duration connectTimeout = dur!"seconds"(5);
+
+private:
+ void closeImmediately() {
+ socket_.close();
+ socket_ = null;
+ }
+
+ T yieldOnBlock(T)(lazy T call, TAsyncEventType eventType) {
+ while (true) {
+ auto result = call();
+ if (result != Socket.ERROR || getSocketErrno() != WOULD_BLOCK_ERRNO) return result;
+
+ // We got an EAGAIN result, register a callback to return here once some
+ // event happens and yield.
+
+ Duration timeout = void;
+ final switch (eventType) {
+ case TAsyncEventType.READ:
+ timeout = recvTimeout_;
+ break;
+ case TAsyncEventType.WRITE:
+ timeout = sendTimeout_;
+ break;
+ }
+
+ auto fiber = Fiber.getThis();
+ assert(fiber, "Current fiber null – not running in TAsyncManager?");
+ TAsyncEventReason eventReason = void;
+ asyncManager_.addOneshotListener(socket_, eventType, timeout,
+ scopedDelegate((TAsyncEventReason reason) {
+ eventReason = reason;
+ fiber.call();
+ })
+ );
+
+ // Yields execution back to the async manager, will return back here once
+ // the above listener is called.
+ Fiber.yield();
+
+ if (eventReason == TAsyncEventReason.TIMED_OUT) {
+ // If we are cancelling the request due to a timed out operation, the
+ // connection is in an undefined state, because the server could decide
+ // to send the requested data later, or we could have already been half-
+ // way into writing a request. Thus, we close the connection to make any
+ // possibly queued up work items fail immediately. Besides, the server
+ // is not very likely to immediately recover after a socket-level
+ // timeout has expired anyway.
+ closeImmediately();
+
+ throw new TTransportException("Timed out while waiting for socket " ~
+ "to get ready to " ~ to!string(eventType) ~ ".",
+ TTransportException.Type.TIMED_OUT);
+ }
+ }
+ }
+
+ /// The TAsyncSocketManager to use for non-blocking I/O.
+ TAsyncSocketManager asyncManager_;
+}
+
+private {
+ // std.socket doesn't include SO_ERROR for reasons unknown.
+ version (linux) {
+ enum SO_ERROR = 4;
+ } else version (OSX) {
+ enum SO_ERROR = 0x1007;
+ } else version (FreeBSD) {
+ enum SO_ERROR = 0x1007;
+ } else version (Win32) {
+ import std.c.windows.winsock : SO_ERROR;
+ } else static assert(false, "Don't know SO_ERROR on this platform.");
+
+ // This hack forces a delegate literal to be scoped, even if it is passed to