Skip to content
Browse files

Judomerged udp-admin branch into master.

Admin is now udp based.
Goodbye Libevent, two years together we went a long way but libuv is younger,
more agile and gets along better with granny windows.
Implemented a salsa20 based random number generator.
  • Loading branch information...
1 parent 2d77836 commit 03f4df8428abfb995dbea5ef29c84695e299359a Caleb James DeLisle committed Jan 31, 2013
Showing with 4,119 additions and 1,727 deletions.
  1. +12 −6 CMakeLists.txt
  2. +233 −253 admin/Admin.c
  3. +7 −9 admin/Admin.h
  4. +56 −85 admin/AdminClient.c
  5. +8 −2 admin/AdminClient.h
  6. +11 −11 admin/AdminLog.c
  7. +1 −1 admin/AdminLog.h
  8. +6 −1 admin/CMakeLists.txt
  9. +15 −13 admin/Configurator.c
  10. +2 −2 admin/Configurator.h
  11. +22 −174 admin/angel/Angel.c
  12. +2 −9 admin/angel/Angel.h
  13. +18 −117 admin/angel/AngelInit.c
  14. +7 −3 admin/angel/CMakeLists.txt
  15. +61 −18 admin/angel/Core.c
  16. +1 −1 admin/angel/Core.h
  17. +2 −2 admin/angel/Core_admin.c
  18. +1 −1 admin/angel/Core_admin.h
  19. +202 −0 admin/angel/Hermes.c
  20. +67 −0 admin/angel/Hermes.h
  21. +15 −10 admin/angel/InterfaceWaiter.c
  22. +1 −1 admin/angel/InterfaceWaiter.h
  23. +30 −23 admin/angel/Waiter.c
  24. +6 −12 admin/angel/cjdroute2.c
  25. +1 −106 admin/test/Admin_test.c
  26. +23 −36 admin/testframework/AdminTestFramework.c
  27. +2 −2 admin/testframework/AdminTestFramework.h
  28. BIN cmake/externals/node-v0.9.7.tar.gz
  29. +0 −134 cmake/modules/FindLibevent2.cmake
  30. +122 −0 cmake/modules/FindLibuv.cmake
  31. +3 −0 cmake/modules/Test.cmake
  32. +3 −3 contrib/perl/CJDNS/lib/CJDNS.pm
  33. +100 −27 contrib/python/cjdns.py
  34. +3 −19 contrib/python/cjdnslog
  35. +2 −14 crypto/CMakeLists.txt
  36. +3 −4 crypto/CryptoAuth.c
  37. +2 −2 crypto/CryptoAuth.h
  38. +5 −5 crypto/CryptoAuth_benchmark.c
  39. +1 −1 crypto/CryptoAuth_benchmark.h
  40. +0 −64 crypto/Random.c
  41. +35 −0 crypto/random/CMakeLists.txt
  42. +250 −0 crypto/random/Random.c
  43. +14 −5 crypto/{ → random}/Random.h
  44. +28 −0 crypto/random/libuv/CMakeLists.txt
  45. +75 −0 crypto/random/libuv/LibuvEntropyProvider.c
  46. +35 −0 crypto/random/libuv/LibuvEntropyProvider.h
  47. +14 −16 util/events/libevent/EventBase.c → crypto/random/randombytes.c
  48. +68 −0 crypto/random/seed/BsdKernArndSysctlRandomSeed.c
  49. +23 −0 crypto/random/seed/BsdKernArndSysctlRandomSeed.h
  50. +28 −0 crypto/random/seed/CMakeLists.txt
  51. +67 −0 crypto/random/seed/DevUrandomRandomSeed.c
  52. +23 −0 crypto/random/seed/DevUrandomRandomSeed.h
  53. +51 −0 crypto/random/seed/LinuxRandomUuidSysctlRandomSeed.c
  54. +23 −0 crypto/random/seed/LinuxRandomUuidSysctlRandomSeed.h
  55. +99 −0 crypto/random/seed/ProcSysKernelRandomUuidRandomSeed.c
  56. +23 −0 crypto/random/seed/ProcSysKernelRandomUuidRandomSeed.h
  57. +130 −0 crypto/random/seed/RandomSeed.c
  58. +49 −0 crypto/random/seed/RandomSeed.h
  59. +44 −0 crypto/random/seed/RtlGenRandomSeed.c
  60. +23 −0 crypto/random/seed/RtlGenRandomSeed.h
  61. +25 −0 crypto/random/test/CMakeLists.txt
  62. +35 −0 crypto/random/test/DeterminentRandomSeed.c
  63. +23 −0 crypto/random/test/DeterminentRandomSeed.h
  64. +115 −0 crypto/random/test/Random_test.c
  65. +1 −1 crypto/test/CMakeLists.txt
  66. +2 −2 crypto/test/CryptoAuth_test.c
  67. +2 −2 crypto/test/ReplayProtector_test.c
  68. +0 −2 dht/CMakeLists.txt
  69. +0 −3 dht/dhtcore/CMakeLists.txt
  70. +3 −3 dht/dhtcore/Janitor.c
  71. +1 −1 dht/dhtcore/Janitor.h
  72. +1 −1 dht/dhtcore/NodeStore.c
  73. +4 −4 dht/dhtcore/RouterModule.c
  74. +2 −2 dht/dhtcore/RouterModule.h
  75. +1 −1 dht/dhtcore/RouterModule_pvt.h
  76. +2 −2 dht/dhtcore/SearchStore.c
  77. +2 −2 dht/dhtcore/SearchStore.h
  78. +1 −1 dht/dhtcore/test/CMakeLists.txt
  79. +0 −1 dht/test/CMakeLists.txt
  80. +2 −2 interface/CMakeLists.txt
  81. +1 −1 interface/ETHInterface.h
  82. +1 −1 interface/ETHInterface_Linux.c
  83. +2 −2 interface/ETHInterface_admin.c
  84. +1 −1 interface/ETHInterface_admin.h
  85. +11 −18 interface/PipeInterface.c
  86. +2 −2 interface/PipeInterface.h
  87. +4 −5 interface/SessionManager.c
  88. +1 −1 interface/SessionManager.h
  89. +1 −1 interface/TUNInterface_W32.c
  90. +13 −142 interface/UDPInterface.c
  91. +4 −6 interface/UDPInterface.h
  92. +165 −0 interface/UDPInterfaceBase.c
  93. +57 −0 interface/UDPInterfaceBase.h
  94. +23 −13 interface/UDPInterface_admin.c
  95. +1 −1 interface/UDPInterface_admin.h
  96. +4 −12 interface/UDPInterface_pvt.h
  97. +39 −0 interface/addressable/AddrInterface.h
  98. +25 −0 interface/addressable/CMakeLists.txt
  99. +158 −0 interface/addressable/UDPAddrInterface.c
  100. +54 −0 interface/addressable/UDPAddrInterface.h
  101. +2 −2 interface/test/ICMP6Generator_test.c
  102. +2 −2 interface/test/MultiInterface_test.c
  103. +11 −29 interface/test/TUNInterface_ipv4_root_test.c
  104. +17 −32 interface/test/TUNInterface_ipv6_root_test.c
  105. +12 −18 interface/test/UDPInterface_communication_test.c
  106. +3 −4 io/CMakeLists.txt
  107. +0 −1 io/test/CMakeLists.txt
  108. +2 −2 io/test/FileReader_test.c
  109. +0 −4 memory/CMakeLists.txt
  110. +1 −0 memory/CanaryAllocator.c
  111. +1 −1 memory/CanaryAllocator.h
  112. +1 −1 memory/CanaryAllocator_pvt.h
  113. +2 −2 memory/test/MallocAllocator_oom_test.c
  114. +3 −3 net/DefaultInterfaceController.c
  115. +1 −1 net/DefaultInterfaceController.h
  116. +2 −3 net/Ducttape.c
  117. +1 −1 net/Ducttape.h
  118. +1 −1 net/Ducttape_pvt.h
  119. +1 −1 net/SwitchPinger.c
  120. +1 −1 net/SwitchPinger.h
  121. +1 −1 net/test/DefaultInterfaceController_multiIface_test.c
  122. +2 −2 net/test/DefaultInterfaceController_test.c
  123. +85 −0 privatetopublic.c
  124. +1 −1 test/CMakeLists.txt
  125. +7 −7 test/TestFramework.c
  126. +15 −6 tunnel/IpTunnel.c
  127. +6 −5 tunnel/IpTunnel.h
  128. +25 −13 tunnel/IpTunnel_admin.c
  129. +1 −1 tunnel/test/CMakeLists.txt
  130. +7 −5 tunnel/test/IpTunnel_test.c
  131. +20 −0 util/AddrTools.c
  132. +3 −4 util/AverageRoller.c
  133. +1 −1 util/AverageRoller.h
  134. +0 −8 util/CMakeLists.txt
  135. +1 −1 util/Events.h
  136. +3 −3 util/Pinger.c
  137. +1 −1 util/Pinger.h
  138. +1 −1 util/events/CMakeLists.txt
  139. +1 −4 util/events/Event.h
  140. +8 −2 util/events/EventBase.h
  141. 0 util/{ → events}/Time.h
  142. +3 −2 util/{ → events}/Timeout.h
  143. +31 −0 util/events/libuv/CMakeLists.txt
  144. +42 −24 util/events/{libevent → libuv}/Event.c
  145. +110 −0 util/events/libuv/EventBase.c
  146. +51 −0 util/events/libuv/EventBase_pvt.h
  147. +5 −9 util/{ → events/libuv}/Time.c
  148. +42 −57 util/{ → events/libuv}/Timeout.c
  149. +11 −1 util/platform/CMakeLists.txt
  150. +260 −0 util/platform/Sockaddr.c
  151. +115 −0 util/platform/Sockaddr.h
  152. +123 −5 util/platform/Socket.c
  153. +36 −8 util/platform/Socket.h
  154. +5 −5 util/{events/libevent → platform/test}/CMakeLists.txt
  155. +72 −0 util/platform/test/Sockaddr_test.c
  156. +3 −3 util/test/Base32_test.c
  157. +3 −3 util/test/Bits_test.c
  158. +1 −1 util/test/CMakeLists.txt
  159. +3 −3 util/test/Hex_test.c
  160. +3 −3 util/test/Map_test.c
View
18 CMakeLists.txt
@@ -10,8 +10,7 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
project(cjdns C)
-cmake_minimum_required(VERSION 2.4)
-
+cmake_minimum_required(VERSION 2.8)
message(${CMAKE_VERSION})
if(CMAKE_BINARY_DIR STREQUAL ${CMAKE_SOURCE_DIR} AND NOT OPENWRT)
@@ -157,7 +156,7 @@ else()
set(PIE "")
endif()
-if (NOT "$ENV{DEBUG}" STREQUAL "")
+if ("$ENV{NO_DEBUG}" STREQUAL "")
add_definitions(-g)
endif()
@@ -183,7 +182,7 @@ if (${Log_LEVEL} STREQUAL "KEYS")
endif (${Log_LEVEL} STREQUAL "KEYS")
add_definitions("-D Log_${Log_LEVEL}")
-# https://github.com/cjdelisle/cjdns/issues/178
+# vrooooooom
add_definitions(-O2)
#IF(NOT CMAKE_BUILD_TYPE)
@@ -212,8 +211,7 @@ add_definitions("-D PARANOIA=1")
include_directories(${CMAKE_SOURCE_DIR})
-find_package(Libevent2 REQUIRED)
-include_directories(${LIBEVENT2_INCLUDE_DIRS})
+find_package(Libuv REQUIRED)
find_package(NACL REQUIRED)
include_directories(${NACL_INCLUDE_DIRS})
@@ -250,6 +248,14 @@ target_link_libraries(benc2json
cjdmemory
cjdio
)
+add_executable(privatetopublic
+ privatetopublic.c
+)
+target_link_libraries(privatetopublic
+ util
+ crypto
+)
+
#INSTALL(TARGETS cjdroute RUNTIME DESTINATION bin)
View
486 admin/Admin.c
@@ -13,15 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "admin/Admin.h"
-#include "admin/angel/Angel.h"
#include "benc/String.h"
+#include "benc/Int.h"
#include "benc/Dict.h"
-#include "benc/List.h"
#include "benc/serialization/BencSerializer.h"
#include "benc/serialization/standard/StandardBencSerializer.h"
-#include "dht/CJDHTConstants.h"
-#include "exception/Except.h"
-#include "interface/Interface.h"
+#include "interface/addressable/AddrInterface.h"
#include "io/Reader.h"
#include "io/ArrayReader.h"
#include "io/ArrayWriter.h"
@@ -32,24 +29,17 @@
#include "util/Bits.h"
#include "util/Hex.h"
#include "util/log/Log.h"
-#include "util/Security.h"
-#include "util/Time.h"
-#include "util/Timeout.h"
+#include "util/events/Time.h"
+#include "util/events/Timeout.h"
+#include "util/Identity.h"
+#include "util/platform/Sockaddr.h"
#define string_strstr
#define string_strcmp
#define string_strlen
#include "util/platform/libc/string.h"
#include <crypto_hash_sha256.h>
-#include <event2/event.h>
-#include <limits.h>
-#include <stdbool.h>
-#include <unistd.h>
-
-#ifdef WIN32
- #define EWOULDBLOCK WSAEWOULDBLOCK
-#endif
static String* TYPE = String_CONST_SO("type");
static String* REQUIRED = String_CONST_SO("required");
@@ -59,48 +49,53 @@ static String* DICT = String_CONST_SO("Dict");
static String* LIST = String_CONST_SO("List");
static String* TXID = String_CONST_SO("txid");
-struct Function
+/** Number of milliseconds before a session times out and outgoing messages are failed. */
+#define TIMEOUT_MILLISECONDS 30000
+
+/** map values for tracking time of last message by source address */
+struct MapValue
{
- String* name;
- Admin_FUNCTION(call);
- void* context;
- bool needsAuth;
- Dict* args;
-};
+ /** time when the last incoming message was received. */
+ uint64_t timeOfLastMessage;
-/**
- * This txid prefix is the inter-process communication txid.
- * the txid which is passed to the functions is these bytes followed by the bytes
- * that the user supplied in their "txid" entry. If the user didn't supply a txid
- * the txid which the function gets will be just these bytes and when it is sent back
- * the user will not get a txid back.
- */
-union Admin_TxidPrefix {
- uint8_t raw[8];
- struct {
- uint32_t channelNum;
- uint32_t serial;
- } values;
+ /** used to allocate the memory for the key (Sockaddr) and value (this). */
+ struct Allocator* allocator;
};
-#define Admin_TxidPrefix_SIZE 8
-Assert_compileTime(sizeof(union Admin_TxidPrefix) == Admin_TxidPrefix_SIZE);
-struct Channel
+//////// generate time-of-last-message-by-address map
+
+#define Map_USE_HASH
+#define Map_USE_COMPARATOR
+#define Map_NAME LastMessageTimeByAddr
+#define Map_KEY_TYPE struct Sockaddr*
+#define Map_VALUE_TYPE struct MapValue*
+#include "util/Map.h"
+
+static inline uint32_t Map_LastMessageTimeByAddr_hash(struct Sockaddr** key)
{
- /** True if the channel is waiting for the other end to close. */
- bool isClosing;
+ uint32_t* k = (uint32_t*) *key;
+ return k[ ((*key)->addrLen / 4)-1 ];
+}
- /** The index of this channel in the admin->clientChannels array. */
- uint32_t number;
+static inline int Map_LastMessageTimeByAddr_compare(struct Sockaddr** keyA, struct Sockaddr** keyB)
+{
+ return Bits_memcmp(*keyA, *keyB, (*keyA)->addrLen);
+}
+
+/////// end map
- uint32_t partialMessageLength;
- uint8_t partialMessageBuffer[Admin_MAX_REQUEST_SIZE];
- struct Allocator* alloc;
+struct Function
+{
+ String* name;
+ Admin_FUNCTION(call);
+ void* context;
+ bool needsAuth;
+ Dict* args;
};
struct Admin
{
- struct event_base* eventBase;
+ struct EventBase* eventBase;
struct Function* functions;
int functionCount;
@@ -110,52 +105,31 @@ struct Admin
String* password;
struct Log* logger;
- struct Interface* toAngelInterface;
+ struct AddrInterface* iface;
- struct Channel* clientChannels[Angel_MAX_CONNECTIONS];
-};
+ struct Map_LastMessageTimeByAddr map;
-static void freeChannel(void* vchannel)
-{
- struct Channel** channel = vchannel;
- *channel = NULL;
-}
+ /** non-zero if we are currently in an admin request. */
+ int inRequest;
-static void newChannel(struct Channel** location, struct Allocator* alloc)
-{
- struct Allocator* childAlloc = Allocator_child(alloc);
- struct Channel* out = childAlloc->clone(sizeof(struct Channel), alloc, &(struct Channel) {
- .alloc = childAlloc
- });
- childAlloc->onFree(freeChannel, location, childAlloc);
- *location = out;
-}
+ /** non-zero if this session able to receive asynchronous messages. */
+ int asyncEnabled;
-/**
- * find a channel by number; could allocate channels on the fly if needed
- * right now only 0..Angel_MAX_CONNECTIONS-1 numbers are valid and allocated in the Admin struct
- */
-static struct Channel* channelForNum(uint32_t channelNum, bool create, struct Admin* admin)
-{
- Assert_true(channelNum < Angel_MAX_CONNECTIONS);
- struct Channel** chanPtr = &admin->clientChannels[channelNum];
- if (*chanPtr == NULL && create) {
- newChannel(chanPtr, admin->allocator);
- (*chanPtr)->number = channelNum;
- }
- return *chanPtr;
-}
+ /** Length of addresses of clients which communicate with admin. */
+ uint32_t addrLen;
-static void sendMessage(struct Message* message, uint32_t channelNum, struct Admin* admin)
+ Identity
+};
+
+static uint8_t sendMessage(struct Message* message, struct Sockaddr* dest, struct Admin* admin)
{
// stack overflow when used with admin logger.
//Log_keys(admin->logger, "sending message to angel [%s]", message->bytes);
- Message_shift(message, 4);
- Bits_memcpyConst(message->bytes, &channelNum, 4);
- admin->toAngelInterface->sendMessage(message, admin->toAngelInterface);
+ Message_push(message, dest, dest->addrLen);
+ return admin->iface->generic.sendMessage(message, &admin->iface->generic);
}
-static int sendBenc(Dict* message, uint32_t channelNum, struct Admin* admin)
+static int sendBenc(Dict* message, struct Sockaddr* dest, struct Admin* admin)
{
struct Allocator* allocator;
BufferAllocator_STACK(allocator, 256);
@@ -173,13 +147,38 @@ static int sendBenc(Dict* message, uint32_t channelNum, struct Admin* admin)
.length = w->bytesWritten(w),
.padding = SEND_MESSAGE_PADDING
};
- sendMessage(&m, channelNum, admin);
+ return sendMessage(&m, dest, admin);
+}
+
+/**
+ * If no incoming data has been sent by this address in TIMEOUT_MILLISECONDS
+ * then Admin_sendMessage() should fail so that it doesn't endlessly send
+ * udp packets into outer space after a logging client disconnects.
+ */
+static int checkAddress(struct Admin* admin, int index, uint64_t now)
+{
+ uint64_t diff = now - admin->map.values[index]->timeOfLastMessage;
+ // check for backwards time
+ if (diff > TIMEOUT_MILLISECONDS && diff < ((uint64_t)INT64_MAX)) {
+ Allocator_free(admin->map.values[index]->allocator);
+ Map_LastMessageTimeByAddr_remove(index, &admin->map);
+ return -1;
+ }
+
return 0;
}
-int Admin_sendMessageToAngel(Dict* message, struct Admin* admin)
+static void clearExpiredAddresses(void* vAdmin)
{
- return sendBenc(message, 0xFFFFFFFF, admin);
+ struct Admin* admin = Identity_cast((struct Admin*) vAdmin);
+ uint64_t now = Time_currentTimeMilliseconds(admin->eventBase);
+ int count = 0;
+ for (int i = admin->map.count - 1; i >= 0; i--) {
+ if (checkAddress(admin, i, now)) {
+ count++;
+ }
+ }
+ Log_debug(admin->logger, "Cleared [%d] expired sessions", count);
}
/**
@@ -190,45 +189,37 @@ int Admin_sendMessage(Dict* message, String* txid, struct Admin* admin)
if (!admin) {
return 0;
}
- Assert_true(txid && txid->len >= 4);
-
- uint32_t channelNum;
- Bits_memcpyConst(&channelNum, txid->bytes, 4);
- struct Channel* channel = channelForNum(channelNum, false, admin);
-
- if (!channel) {
- return Admin_sendMessage_CHANNEL_CLOSED;
+ Identity_check(admin);
+ Assert_true(txid && txid->len >= admin->addrLen);
+
+ struct Sockaddr_storage addr;
+ Bits_memcpy(&addr, txid->bytes, admin->addrLen);
+
+ // if this is an async call, check if we've got any input from that client.
+ // if the client is nresponsive then fail the call so logs don't get sent
+ // out forever after a disconnection.
+ if (!admin->inRequest) {
+ struct Sockaddr* addrPtr = (struct Sockaddr*) &addr.addr;
+ int index = Map_LastMessageTimeByAddr_indexForKey(&addrPtr, &admin->map);
+ uint64_t now = Time_currentTimeMilliseconds(admin->eventBase);
+ if (index < 0 || checkAddress(admin, index, now)) {
+ return -1;
+ }
}
struct Allocator* allocator;
BufferAllocator_STACK(allocator, 256);
// Bounce back the user-supplied txid.
String userTxid = {
- .bytes = txid->bytes + 4,
- .len = txid->len - 4
+ .bytes = txid->bytes + admin->addrLen,
+ .len = txid->len - admin->addrLen
};
- if (txid->len > 4) {
+ if (txid->len > admin->addrLen) {
Dict_putString(message, TXID, &userTxid, allocator);
}
- return sendBenc(message, channelNum, admin);
-}
-
-/**
- * close a channel (for example if an error happened or we received non-empty
- * messages on a invalid channel number)
- * also used to cleanup if we receive a close message
- */
-static void closeChannel(struct Channel* channel, struct Admin* admin)
-{
- if (channel && !channel->isClosing) {
- channel->isClosing = true;
- uint8_t buff[128];
- struct Message m = { .bytes = &buff[124], .length = 0, .padding = 128 };
- sendMessage(&m, channel->number, admin);
- channel->isClosing = true;
- }
+ return sendBenc(message, &addr.addr, admin);
}
static inline bool authValid(Dict* message, struct Message* messageBytes, struct Admin* admin)
@@ -296,29 +287,58 @@ static bool checkArgs(Dict* args, struct Function* func, String* txid, struct Ad
return !error;
}
+static void asyncEnabled(Dict* args, void* vAdmin, String* txid)
+{
+ struct Admin* admin = Identity_cast((struct Admin*) vAdmin);
+ int64_t enabled = admin->asyncEnabled;
+ Dict d = Dict_CONST(String_CONST("asyncEnabled"), Int_OBJ(enabled), NULL);
+ Admin_sendMessage(&d, txid, admin);
+}
+
+#define ENTRIES_PER_PAGE 8
+static void availableFunctions(Dict* args, void* vAdmin, String* txid)
+{
+ struct Admin* admin = Identity_cast((struct Admin*) vAdmin);
+ int64_t* page = Dict_getInt(args, String_CONST("page"));
+ uint32_t i = (page) ? *page * ENTRIES_PER_PAGE : 0;
+ struct Allocator* tempAlloc = Allocator_child(admin->allocator);
+
+ Dict* d = Dict_new(tempAlloc);
+ Dict* functions = Dict_new(tempAlloc);
+ int count = 0;
+ for (; i < (uint32_t)admin->functionCount && count++ < ENTRIES_PER_PAGE; i++) {
+ Dict_putDict(functions, admin->functions[i].name, admin->functions[i].args, tempAlloc);
+ }
+ String* more = String_CONST("more");
+ if (count >= ENTRIES_PER_PAGE) {
+ Dict_putInt(d, more, 1, tempAlloc);
+ }
+ Dict_putDict(d, String_CONST("availableFunctions"), functions, tempAlloc);
+
+ Admin_sendMessage(d, txid, admin);
+ Allocator_free(tempAlloc);
+ return;
+}
+
static void handleRequest(Dict* messageDict,
struct Message* message,
- struct Channel* channel,
+ struct Sockaddr* src,
struct Allocator* allocator,
struct Admin* admin)
{
- Log_debug(admin->logger, "Got a request on channel [%u]", channel->number);
- String* query = Dict_getString(messageDict, CJDHTConstants_QUERY);
+ String* query = Dict_getString(messageDict, String_CONST("q"));
if (!query) {
- Log_info(admin->logger,
- "Got a non-query from admin interface on channel [%u].",
- channel->number);
- closeChannel(channel, admin);
+ Log_info(admin->logger, "Got a non-query from admin interface");
return;
}
// txid becomes the user supplied txid combined with the channel num.
String* userTxid = Dict_getString(messageDict, TXID);
- uint32_t txidlen = ((userTxid) ? userTxid->len : 0) + 4;
+ uint32_t txidlen = ((userTxid) ? userTxid->len : 0) + src->addrLen;
String* txid = String_newBinary(NULL, txidlen, allocator);
- Bits_memcpyConst(txid->bytes, &channel->number, 4);
+ Bits_memcpy(txid->bytes, src, src->addrLen);
if (userTxid) {
- Bits_memcpy(txid->bytes + 4, userTxid->bytes, userTxid->len);
+ Bits_memcpy(txid->bytes + src->addrLen, userTxid->bytes, userTxid->len);
}
// If they're asking for a cookie then lets give them one.
@@ -348,6 +368,24 @@ static void handleRequest(Dict* messageDict,
authed = true;
}
+ // Then sent a valid authed query, lets track their address so they can receive
+ // asynchronous messages.
+ int index = Map_LastMessageTimeByAddr_indexForKey(&src, &admin->map);
+ uint64_t now = Time_currentTimeMilliseconds(admin->eventBase);
+ admin->asyncEnabled = 1;
+ if (index >= 0) {
+ admin->map.values[index]->timeOfLastMessage = now;
+ } else if (authed) {
+ struct Allocator* entryAlloc = Allocator_child(admin->allocator);
+ struct MapValue* mv = Allocator_calloc(entryAlloc, sizeof(struct MapValue), 1);
+ mv->timeOfLastMessage = now;
+ mv->allocator = entryAlloc;
+ struct Sockaddr* storedAddr = Sockaddr_clone(src, entryAlloc);
+ Map_LastMessageTimeByAddr_put(&storedAddr, &mv, &admin->map);
+ } else {
+ admin->asyncEnabled = 0;
+ }
+
Dict* args = Dict_getDict(messageDict, String_CONST("args"));
bool noFunctionsCalled = true;
for (int i = 0; i < admin->functionCount; i++) {
@@ -362,141 +400,72 @@ static void handleRequest(Dict* messageDict,
}
if (noFunctionsCalled) {
- Dict* d = Dict_new(allocator);
- String* list = String_CONST("availableFunctions");
- if (!String_equals(query, list)) {
- Dict_putString(d,
- String_CONST("error"),
- String_CONST("No functions matched your request."),
- allocator);
- }
- Dict* functions = Dict_new(allocator);
- for (int i = 0; i < admin->functionCount; i++) {
- Dict_putDict(functions, admin->functions[i].name, admin->functions[i].args, allocator);
- }
- if (functions) {
- Dict_putDict(d, String_CONST("availableFunctions"), functions, allocator);
- }
- Admin_sendMessage(d, txid, admin);
- return;
+ Dict d = Dict_CONST(
+ String_CONST("error"),
+ String_OBJ(String_CONST("No functions matched your request, "
+ "try Admin_availableFunctions()")),
+ NULL
+ );
+ Admin_sendMessage(&d, txid, admin);
}
return;
}
-#define handleFrame_PARTIAL 1 // got a valid partial request.
-#define handleFrame_INVALID 2 // invalid request.
-#define handleFrame_CONTINUE 0 // parsed frame, keep parsing to get next frame.
-static int handleFrame(struct Message* message,
- struct Channel* channel,
- struct Allocator* tempAlloc,
- struct Admin* admin)
+static void handleMessage(struct Message* message,
+ struct Sockaddr* src,
+ struct Allocator* alloc,
+ struct Admin* admin)
{
- struct Reader* reader = ArrayReader_new(message->bytes, message->length, tempAlloc);
-
- char nextByte;
- int ret;
- while (!(ret = reader->read(&nextByte, 1, reader)) && nextByte != 'd');
+ message->bytes[message->length] = '\0';
+ Log_keys(admin->logger, "Got message from [%s] [%s]",
+ Sockaddr_print(src, alloc), message->bytes);
- if (ret) {
- // out of data.
- channel->partialMessageLength = 0;
- return handleFrame_INVALID;
+ // handle non empty message data
+ if (message->length > Admin_MAX_REQUEST_SIZE) {
+ #define TOO_BIG "d5:error16:Request too big.e"
+ #define TOO_BIG_STRLEN (sizeof(TOO_BIG) - 1)
+ Bits_memcpyConst(message->bytes, TOO_BIG, TOO_BIG_STRLEN);
+ message->length = TOO_BIG_STRLEN;
+ sendMessage(message, src, admin);
+ return;
}
- // back the reader up by 1.
- reader->skip(-1, reader);
-
+ struct Reader* reader = ArrayReader_new(message->bytes, message->length, alloc);
Dict messageDict;
- ret = StandardBencSerializer_get()->parseDictionary(reader, tempAlloc, &messageDict);
- if (ret == -2) {
- // couldn't parse any more data.
- return handleFrame_PARTIAL;
- }
- if (ret) {
+ if (StandardBencSerializer_get()->parseDictionary(reader, alloc, &messageDict)) {
Log_warn(admin->logger,
- "Got unparsable data from admin interface on channel [%u] content: [%s].",
- channel->number, message->bytes);
- closeChannel(channel, admin);
- return handleFrame_INVALID;
- }
-
- uint32_t amount = reader->bytesRead(reader);
-
- struct Message tmpMessage = { .bytes = message->bytes, .length = amount, .padding = 0 };
- handleRequest(&messageDict, &tmpMessage, channel, tempAlloc, admin);
-
- Message_shift(message, -amount);
- return handleFrame_CONTINUE;
-}
-
-static void handleMessage(struct Message* message,
- struct Channel* channel,
- struct Admin* admin)
-{
- Log_debug(admin->logger, "Handling message.");
- if (channel->partialMessageLength > 0) {
- Log_debug(admin->logger, "Channel [%d] has [%u] bytes of existing data on it.",
- channel->number, channel->partialMessageLength);
- Message_shift(message, channel->partialMessageLength);
- Bits_memcpy(message->bytes, channel->partialMessageBuffer, channel->partialMessageLength);
- channel->partialMessageLength = 0;
+ "Unparsable data from [%s] content: [%s]",
+ Sockaddr_print(src, alloc), message->bytes);
+ return;
}
- // Try to handle multiple stacked requests in the same frame.
- for (;;) {
- struct Allocator* allocator = Allocator_child(admin->allocator);
- int ret = handleFrame(message, channel, allocator, admin);
- Allocator_free(allocator);
- if (ret == handleFrame_PARTIAL) {
- break;
- }
- if (ret == handleFrame_INVALID) {
- return;
- }
+ int amount = reader->bytesRead(reader);
+ if (amount < message->length) {
+ Log_warn(admin->logger,
+ "Message from [%s] contained garbage after byte [%d] content: [%s]",
+ Sockaddr_print(src, alloc), amount - 1, message->bytes);
+ return;
}
- // This is an idea of just how much the incoming message can be shifted.
- // Pathological messages full of massive requests will fail.
- if (0 != message->length && message->length <= Admin_MAX_REQUEST_SIZE) {
- // move data to start of buffer, so we can append new data
- Bits_memcpy(channel->partialMessageBuffer, message->bytes, message->length);
- channel->partialMessageLength = message->length;
- }
+ handleRequest(&messageDict, message, src, alloc, admin);
}
static uint8_t receiveMessage(struct Message* message, struct Interface* iface)
{
- struct Admin* admin = iface->receiverContext;
-
- Assert_true(message->length >= 4);
+ struct Admin* admin = Identity_cast((struct Admin*) iface->receiverContext);
- uint32_t channelNum;
- Bits_memcpyConst(&channelNum, message->bytes, 4);
- Message_shift(message, -4);
+ Assert_true(message->length >= (int)admin->addrLen);
+ struct Sockaddr_storage addrStore;
+ Message_pop(message, &addrStore, admin->addrLen);
- Log_debug(admin->logger, "Got data from [%u]", channelNum);
+ struct Allocator* alloc = Allocator_child(admin->allocator);
+ admin->inRequest = 1;
- struct Channel* channel = channelForNum(channelNum, true, admin);
-
- if (0 == message->length) {
- // empty message -> close channel
- channel->alloc->free(channel->alloc);
- return 0;
- }
+ handleMessage(message, &addrStore.addr, alloc, admin);
- // handle non empty message data
- if (message->length > Admin_MAX_REQUEST_SIZE) {
- #define TOO_BIG "d5:error16:Request too big.e"
- #define TOO_BIG_STRLEN (sizeof(TOO_BIG) - 1)
- Bits_memcpyConst(message->bytes, TOO_BIG, TOO_BIG_STRLEN);
- message->length = TOO_BIG_STRLEN;
- sendMessage(message, channelNum, admin);
- } else if (channel->isClosing) {
- // Do nothing because the channel is in closing state.
- } else {
- handleMessage(message, channel, admin);
- }
+ admin->inRequest = 0;
+ Allocator_free(alloc);
return 0;
}
@@ -511,15 +480,13 @@ void Admin_registerFunctionWithArgCount(char* name,
if (!admin) {
return;
}
+ Identity_check(admin);
+
String* str = String_new(name, admin->allocator);
- if (!admin->functionCount) {
- admin->functions = admin->allocator->malloc(sizeof(struct Function), admin->allocator);
- } else {
- admin->functions =
- admin->allocator->realloc(admin->functions,
- sizeof(struct Function) * (admin->functionCount + 1),
- admin->allocator);
- }
+ admin->functions =
+ Allocator_realloc(admin->allocator,
+ admin->functions,
+ sizeof(struct Function) * (admin->functionCount + 1));
struct Function* fu = &admin->functions[admin->functionCount];
admin->functionCount++;
@@ -550,23 +517,36 @@ void Admin_registerFunctionWithArgCount(char* name,
}
}
-struct Admin* Admin_new(struct Interface* toAngelInterface,
+struct Admin* Admin_new(struct AddrInterface* iface,
struct Allocator* alloc,
struct Log* logger,
- struct event_base* eventBase,
+ struct EventBase* eventBase,
String* password)
{
- struct Admin* admin = alloc->clone(sizeof(struct Admin), alloc, &(struct Admin) {
- .toAngelInterface = toAngelInterface,
+ struct Admin* admin = Allocator_clone(alloc, (&(struct Admin) {
+ .iface = iface,
.allocator = alloc,
.logger = logger,
- .functionCount = 0,
.eventBase = eventBase,
- .password = String_clone(password, alloc)
- });
+ .addrLen = iface->addr->addrLen,
+ .map = {
+ .allocator = alloc
+ }
+ }));
+ Identity_set(admin);
+
+ admin->password = String_clone(password, alloc);
+
+ Timeout_setInterval(clearExpiredAddresses, admin, TIMEOUT_MILLISECONDS * 3, eventBase, alloc);
+
+ iface->generic.receiveMessage = receiveMessage;
+ iface->generic.receiverContext = admin;
- toAngelInterface->receiveMessage = receiveMessage;
- toAngelInterface->receiverContext = admin;
+ Admin_registerFunction("Admin_asyncEnabled", asyncEnabled, admin, false, NULL, admin);
+ Admin_registerFunction("Admin_availableFunctions", availableFunctions, admin, false,
+ ((struct Admin_FunctionArg[]) {
+ { .name = "page", .required = 0, .type = "Int" }
+ }), admin);
return admin;
}
View
16 admin/Admin.h
@@ -17,7 +17,7 @@
#include "benc/Dict.h"
#include "exception/Except.h"
-#include "interface/Interface.h"
+#include "interface/addressable/AddrInterface.h"
#include "memory/Allocator.h"
#include "util/log/Log.h"
#include "util/UniqueName.h"
@@ -45,11 +45,11 @@ struct Admin_FunctionArg
* @param arguments an array of struct Admin_FunctionArg specifying what functions are available
* and of those, which are required.
* Example C code:
- * struct Admin_FunctionArg adma[2] = {
- * { .name = "password", .required = 1, .type = "String" },
- * { .name = "authType", .required = 0, .type = "Int" }
- * };
- * Admin_registerFunction("AuthorizedPasswords_add", add, context, true, adma, admin);
+ * Admin_registerFunction("AuthorizedPasswords_add", addPass, ctx, true,
+ * ((struct Admin_FunctionArg[]) {
+ * { .name = "password", .required = 1, .type = "String" },
+ * { .name = "authType", .required = 0, .type = "Int" }
+ * }), admin);
*/
void Admin_registerFunctionWithArgCount(char* name,
Admin_FUNCTION(callback),
@@ -65,9 +65,7 @@ void Admin_registerFunctionWithArgCount(char* name,
#define Admin_sendMessage_CHANNEL_CLOSED -1
int Admin_sendMessage(Dict* message, String* txid, struct Admin* admin);
-int Admin_sendMessageToAngel(Dict* message, struct Admin* admin);
-
-struct Admin* Admin_new(struct Interface* toAngelInterface,
+struct Admin* Admin_new(struct AddrInterface* iface,
struct Allocator* alloc,
struct Log* logger,
struct EventBase* eventBase,
View
141 admin/AdminClient.c
@@ -15,22 +15,24 @@
#include "admin/AdminClient.h"
#include "benc/serialization/BencSerializer.h"
#include "benc/serialization/standard/StandardBencSerializer.h"
+#include "interface/addressable/AddrInterface.h"
+#include "interface/addressable/UDPAddrInterface.h"
+#include "exception/Except.h"
#include "io/ArrayReader.h"
#include "io/ArrayWriter.h"
#include "io/Reader.h"
#include "io/Writer.h"
+#include "util/Bits.h"
#include "util/platform/libc/strlen.h"
#include "util/Endian.h"
#include "util/Errno.h"
#include "util/Hex.h"
+#include "util/events/Timeout.h"
+#include "util/Identity.h"
+#include "wire/Message.h"
+#include <stdio.h>
#include <crypto_hash_sha256.h>
-#include <unistd.h>
-#include <event2/event.h>
-
-#ifdef BSD
- #include <netinet/in.h> // sizeof(struct sockaddr_in) on BSD
-#endif
struct AdminClient
{
@@ -52,12 +54,13 @@ struct Result
struct Context
{
struct AdminClient public;
+ struct EventBase* eventBase;
+ struct Sockaddr* targetAddr;
struct Result* result;
- evutil_socket_t socket;
- struct event* socketEvent;
- struct event_base* eventBase;
+ struct AddrInterface* addrIface;
struct Log* logger;
String* password;
+ Identity
};
static int calculateAuth(Dict* message,
@@ -97,10 +100,10 @@ static int calculateAuth(Dict* message,
static void done(struct Context* ctx, enum AdminClient_Error err)
{
ctx->result->public.err = err;
- event_base_loopexit(ctx->eventBase, NULL);
+ EventBase_endLoop(ctx->eventBase);
}
-static void timeout(evutil_socket_t socket, short eventType, void* vcontext)
+static void timeout(void* vcontext)
{
done((struct Context*) vcontext, AdminClient_Error_TIMEOUT);
}
@@ -139,58 +142,56 @@ static void doCall(Dict* message, struct Result* res, bool getCookie)
}
}
- send(res->ctx->socket, res->public.messageBytes, writer->bytesWritten(writer), 0);
+ struct Timeout* to =
+ Timeout_setTimeout(timeout, res->ctx, 5000, res->ctx->eventBase, res->alloc);
- struct event* timeoutEvent = evtimer_new(res->ctx->eventBase, timeout, res->ctx);
- evtimer_add(timeoutEvent, (&(struct timeval) { .tv_sec = 5, .tv_usec = 0 }));
+ struct Message m = {
+ .bytes = res->public.messageBytes,
+ .padding = AdminClient_Result_PADDING_SIZE,
+ .length = writer->bytesWritten(writer)
+ };
+ Message_push(&m, res->ctx->targetAddr, res->ctx->targetAddr->addrLen);
+ res->ctx->addrIface->generic.sendMessage(&m, &res->ctx->addrIface->generic);
- event_base_dispatch(res->ctx->eventBase);
+ EventBase_beginLoop(res->ctx->eventBase);
- evtimer_del(timeoutEvent);
+ Timeout_clearTimeout(to);
}
-static void incoming(evutil_socket_t socket, short eventType, void* vcontext)
+static uint8_t receiveMessage(struct Message* msg, struct Interface* iface)
{
- struct Context* ctx = vcontext;
+ struct Context* ctx = Identity_cast((struct Context*) iface->receiverContext);
// Since this is a blocking api, one result per context.
struct Result* res = ctx->result;
- ssize_t length = recv(socket, res->public.messageBytes, AdminClient_MAX_MESSAGE_SIZE, 0);
- if (length == AdminClient_MAX_MESSAGE_SIZE) {
- while (length) {
- // purge the socket.
- length = recv(socket, res->public.messageBytes, AdminClient_MAX_MESSAGE_SIZE, 0);
- }
- done(ctx, AdminClient_Error_OVERLONG_RESPONSE);
- return;
- }
- if (length < 1) {
- if (length < 0) {
- done(ctx, AdminClient_Error_ERROR_READING_FROM_SOCKET);
- } else {
- done(ctx, AdminClient_Error_SOCKET_NOT_READY);
- }
- return;
+ struct Sockaddr_storage source;
+ Message_pop(msg, &source, ctx->targetAddr->addrLen);
+ if (Bits_memcmp(&source, ctx->targetAddr, ctx->targetAddr->addrLen)) {
+ return 0;
}
- res->public.messageBytes[length] = '\0';
- struct Reader* reader = ArrayReader_new(res->public.messageBytes, length, res->alloc);
+ struct Reader* reader = ArrayReader_new(msg->bytes, msg->length, res->alloc);
Dict* d = Dict_new(res->alloc);
if (StandardBencSerializer_get()->parseDictionary(reader, res->alloc, d)) {
done(ctx, AdminClient_Error_DESERIALIZATION_FAILED);
- return;
+ return 0;
}
res->public.responseDict = d;
+ int len =
+ (msg->length > AdminClient_MAX_MESSAGE_SIZE) ? AdminClient_MAX_MESSAGE_SIZE : msg->length;
+ Bits_memset(res->public.messageBytes, 0, AdminClient_MAX_MESSAGE_SIZE);
+ Bits_memcpy(res->public.messageBytes, msg->bytes, len);
done(ctx, AdminClient_Error_NONE);
+ return 0;
}
struct AdminClient_Result* AdminClient_rpcCall(String* function,
Dict* args,
struct AdminClient* client,
struct Allocator* alloc)
{
- struct Context* ctx = (struct Context*) client;
+ struct Context* ctx = Identity_cast((struct Context*) client);
Dict a = (args) ? *args : NULL;
Dict message = Dict_CONST(
String_CONST("q"), String_OBJ(String_CONST("auth")), Dict_CONST(
@@ -233,64 +234,34 @@ char* AdminClient_errorString(enum AdminClient_Error err)
};
}
-static void disconnect(void* vcontext)
-{
- struct Context* context = vcontext;
- EVUTIL_CLOSESOCKET(context->socket);
- event_del(context->socketEvent);
-}
-
-struct AdminClient* AdminClient_new(uint8_t* sockAddr,
- int addrLen,
+struct AdminClient* AdminClient_new(struct Sockaddr* connectToAddress,
String* adminPassword,
- struct event_base* eventBase,
+ struct EventBase* eventBase,
struct Log* logger,
struct Allocator* alloc)
{
- struct sockaddr_storage* addr = (struct sockaddr_storage*) sockAddr;
-
- struct Context* context = alloc->clone(sizeof(struct Context), alloc, &(struct Context) {
+ struct Context* context = Allocator_clone(alloc, (&(struct Context) {
.eventBase = eventBase,
.logger = logger,
.password = adminPassword,
- });
-
- context->socket = socket(addr->ss_family, SOCK_STREAM, 0);
-
- if (context->socket < 0) {
- Log_error(logger, "Failed to allocate socket [%s]", Errno_getString());
- return NULL;
- }
-
- evutil_make_listen_socket_reuseable(context->socket);
-
- if (addr->ss_family == AF_INET) {
- struct sockaddr_in* inAddr = (struct sockaddr_in*) addr;
- if (inAddr->sin_addr.s_addr == 0) {
+ }));
+ Identity_set(context);
+
+ context->targetAddr = Sockaddr_clone(connectToAddress, alloc);
+ if (Sockaddr_getFamily(context->targetAddr) == Sockaddr_AF_INET) {
+ uint8_t* addrBytes;
+ int len = Sockaddr_getAddress(context->targetAddr, &addrBytes);
+ if (Bits_isZero(addrBytes, len)) {
// 127.0.0.1
- inAddr->sin_addr.s_addr = Endian_hostToBigEndian32(0x7f000001);
+ uint32_t loopback = Endian_hostToBigEndian32(0x7f000001);
+ Bits_memcpyConst(addrBytes, &loopback, 4);
}
}
+ Log_debug(logger, "Connecting to [%s]", Sockaddr_print(context->targetAddr, alloc));
- if (connect(context->socket, (struct sockaddr*)addr, addrLen)) {
- #ifdef Log_ERROR
- enum Errno err = Errno_get();
- char printedAddr[128];
- uint16_t port = Endian_bigEndianToHost16(((struct sockaddr_in*)addr)->sin_port);
- evutil_inet_ntop(AF_INET, &((struct sockaddr_in*)addr)->sin_addr, printedAddr, 128);
- Log_error(logger, "Failed to connect to admin port at [%s:%u], [%s]",
- printedAddr, port, Errno_strerror(err));
- #endif
- return NULL;
- }
-
- evutil_make_socket_nonblocking(context->socket);
-
- context->socketEvent =
- event_new(context->eventBase, context->socket, EV_READ | EV_PERSIST, incoming, context);
- event_add(context->socketEvent, NULL);
-
- alloc->onFree(disconnect, context, alloc);
+ context->addrIface = UDPAddrInterface_new(eventBase, NULL, alloc, NULL, logger);
+ context->addrIface->generic.receiveMessage = receiveMessage;
+ context->addrIface->generic.receiverContext = context;
return &context->public;
}
View
10 admin/AdminClient.h
@@ -20,6 +20,7 @@
#include "memory/Allocator.h"
#include "util/log/Log.h"
#include "util/events/EventBase.h"
+#include "util/platform/Sockaddr.h"
enum AdminClient_Error
{
@@ -51,11 +52,17 @@ enum AdminClient_Error
/** The biggest message that can be sent or received. */
#define AdminClient_MAX_MESSAGE_SIZE 1023
+/** The amount of message padding. */
+#define AdminClient_Result_PADDING_SIZE (sizeof(struct Sockaddr_storage))
+
struct AdminClient_Result
{
/** The error type of AdminClient_Error_NONE if there was no error. */
enum AdminClient_Error err;
+ /** Space to put the address of the node which the response is being sent to. */
+ uint8_t padding[AdminClient_Result_PADDING_SIZE];
+
/**
* When the request is made, this will hold the request bytes,
* after it will hold the response bytes. If there is an error
@@ -79,8 +86,7 @@ struct AdminClient_Result* AdminClient_rpcCall(String* function,
struct Allocator* requestAlloc);
-struct AdminClient* AdminClient_new(uint8_t* sockAddr,
- int addrLen,
+struct AdminClient* AdminClient_new(struct Sockaddr* addr,
String* adminPassword,
struct EventBase* eventBase,
struct Log* logger,
View
22 admin/AdminLog.c
@@ -16,7 +16,7 @@
#include "admin/AdminLog.h"
#include "benc/Dict.h"
#include "benc/String.h"
-#include "crypto/Random.h"
+#include "crypto/random/Random.h"
#include "io/Writer.h"
#include "memory/BufferAllocator.h"
#include "util/log/Log.h"
@@ -296,17 +296,17 @@ struct Log* AdminLog_registerNew(struct Admin* admin, struct Allocator* alloc, s
.rand = rand
});
- struct Admin_FunctionArg subscribeArgs[] = {
- { .name = "level", .required = 0, .type = "String" },
- { .name = "line", .required = 0, .type = "Int" },
- { .name = "file", .required = 0, .type = "String" }
- };
- Admin_registerFunction("AdminLog_subscribe", subscribe, log, true, subscribeArgs, admin);
+ Admin_registerFunction("AdminLog_subscribe", subscribe, log, true,
+ ((struct Admin_FunctionArg[]) {
+ { .name = "level", .required = 0, .type = "String" },
+ { .name = "line", .required = 0, .type = "Int" },
+ { .name = "file", .required = 0, .type = "String" }
+ }), admin);
- struct Admin_FunctionArg unsubscribeArgs[] = {
- { .name = "streamId", .required = 1, .type = "String" }
- };
- Admin_registerFunction("AdminLog_unsubscribe", unsubscribe, log, true, unsubscribeArgs, admin);
+ Admin_registerFunction("AdminLog_unsubscribe", unsubscribe, log, true,
+ ((struct Admin_FunctionArg[]) {
+ { .name = "streamId", .required = 1, .type = "String" }
+ }), admin);
return &log->pub;
}
View
2 admin/AdminLog.h
@@ -16,7 +16,7 @@
#define AdminLog_H
#include "admin/Admin.h"
-#include "crypto/Random.h"
+#include "crypto/random/Random.h"
#include "memory/Allocator.h"
#include "util/log/Log.h"
View
7 admin/CMakeLists.txt
@@ -29,7 +29,12 @@ add_library(cjdns-admin-client
AdminClient.c
Configurator.c
)
-target_link_libraries(cjdns-admin-client crypto cjdbenc_StandardBencSerializer)
+target_link_libraries(cjdns-admin-client
+ crypto
+ cjdbenc_StandardBencSerializer
+ cjdns-util-platform-socket
+ cjdns-interface-addressable
+)
add_subdirectory(testframework)
add_subdirectory(angel)
View
28 admin/Configurator.c
@@ -18,11 +18,10 @@
#include "benc/Dict.h"
#include "benc/Int.h"
#include "memory/Allocator.h"
+#include "util/events/Event.h"
#include "util/platform/libc/strlen.h"
#include "util/log/Log.h"
-#include <event2/event.h>
-
struct Context
{
struct Log* logger;
@@ -48,11 +47,11 @@ static void die(struct AdminClient_Result* res, struct Context* ctx, struct Allo
exit(1);
}
-static void rpcCall0(String* function,
- Dict* args,
- struct Context* ctx,
- struct Allocator* alloc,
- bool exitIfError)
+static int rpcCall0(String* function,
+ Dict* args,
+ struct Context* ctx,
+ struct Allocator* alloc,
+ bool exitIfError)
{
struct AdminClient_Result* res = AdminClient_rpcCall(function, args, ctx->client, alloc);
if (res->err) {
@@ -73,7 +72,9 @@ static void rpcCall0(String* function,
}
Log_warn(ctx->logger, "Got error [%s] calling [%s], ignoring.",
error->bytes, function->bytes);
+ return 1;
}
+ return 0;
}
static void rpcCall(String* function, Dict* args, struct Context* ctx, struct Allocator* alloc)
@@ -241,7 +242,9 @@ static void ethInterface(Dict* config, struct Context* ctx)
if (deviceStr) {
Dict_putString(d, String_CONST("bindDevice"), deviceStr, ctx->alloc);
}
- rpcCall(String_CONST("ETHInterface_new"), d, ctx, ctx->alloc);
+ if (rpcCall0(String_CONST("ETHInterface_new"), d, ctx, ctx->alloc, false)) {
+ continue;
+ }
// Make the connections.
Dict* connectTo = Dict_getDict(eth, String_CONST("connectTo"));
@@ -304,16 +307,15 @@ static void security(List* securityConf, struct Allocator* tempAlloc, struct Con
}
void Configurator_config(Dict* config,
- uint8_t* sockAddr,
- int addrLen,
+ struct Sockaddr* sockAddr,
String* adminPassword,
- struct event_base* eventBase,
+ struct EventBase* eventBase,
struct Log* logger,
struct Allocator* alloc)
{
struct Allocator* tempAlloc = Allocator_child(alloc);
struct AdminClient* client =
- AdminClient_new(sockAddr, addrLen, adminPassword, eventBase, logger, tempAlloc);
+ AdminClient_new(sockAddr, adminPassword, eventBase, logger, tempAlloc);
struct Context ctx = { .logger = logger, .alloc = tempAlloc, .client = client };
@@ -335,5 +337,5 @@ void Configurator_config(Dict* config,
List* securityList = Dict_getList(config, String_CONST("security"));
security(securityList, tempAlloc, &ctx);
- tempAlloc->free(tempAlloc);
+ Allocator_free(tempAlloc);
}
View
4 admin/Configurator.h
@@ -20,12 +20,12 @@
#include "memory/Allocator.h"
#include "util/log/Log.h"
#include "util/events/EventBase.h"
+#include "util/platform/Sockaddr.h"
#include <stdint.h>
void Configurator_config(Dict* config,
- uint8_t* sockAddr,
- int addrLen,
+ struct Sockaddr* addr,
String* adminPassword,
struct EventBase* eventBase,
struct Log* logger,
View
196 admin/angel/Angel.c
@@ -19,35 +19,21 @@
#include "interface/PipeInterface.h"
#include "interface/Interface.h"
#include "util/platform/libc/strlen.h"
+#include "util/platform/Socket.h"
+#include "util/events/Event.h"
#include "util/Bits.h"
#include "util/Errno.h"
#include "util/log/Log.h"
-#include "util/Time.h"
-#include "util/Timeout.h"
+#include "util/events/Time.h"
+#include "util/events/Timeout.h"
#include "wire/Message.h"
#include "wire/Error.h"
-#include <event2/event.h>
-#include <unistd.h>
-
-struct AngelContext;
-
-struct Connection {
- struct event* read; /** NULL: socket closed */
- evutil_socket_t socket; /** -1: channel (to core) closed */
- struct AngelContext* context;
-};
-
struct AngelContext
{
- struct Connection connections[Angel_MAX_CONNECTIONS];
-
- /** The event which listens for new connections. */
- struct event* socketEvent;
-
struct Interface* coreIface;
-
- struct event_base* eventBase;
+ struct EventBase* eventBase;
+ struct Allocator* alloc;
struct Log* logger;
};
@@ -65,14 +51,14 @@ static void handleMessageForAngel(struct Message* message, struct AngelContext*
/**
* send message via pipe to core process
- */
+ *
static void sendToCore(struct Message* message, uint32_t connNumber, struct AngelContext* context)
{
Message_shift(message, 4);
Bits_memcpyConst(message->bytes, &connNumber, 4);
//Log_debug(context->logger, "sending Message to core");
context->coreIface->sendMessage(message, context->coreIface);
-}
+}*/
/**
* handle message on the pipe from core process
@@ -83,170 +69,32 @@ static uint8_t receiveMessage(struct Message* message, struct Interface* iface)
Assert_true(message->length >= 4);
- uint32_t connNumber;
- Bits_memcpyConst(&connNumber, message->bytes, 4);
+ uint32_t handle;
+ Bits_memcpyConst(&handle, message->bytes, 4);
Message_shift(message, -4);
- if (connNumber == 0xffffffff) {
+ if (handle == 0xffffffff) {
handleMessageForAngel(message, context);
return Error_NONE;
}
- //Log_debug(context->logger, "Got incoming message from [%u] with content [%s]",
- // connNumber, message->bytes);
-
- if (connNumber <= 0xffff) {
- if (connNumber >= Angel_MAX_CONNECTIONS) {
- fprintf(stderr, "got message for connection #%u, max connections is %d\n",
- connNumber, Angel_MAX_CONNECTIONS);
- return Error_NONE;
- }
-
- struct Connection* conn = &context->connections[connNumber];
- if (-1 == conn->socket) {
- fprintf(stderr, "got message for closed channel #%u", connNumber);
- return Error_NONE;
- }
-
- if (0 == message->length) {
- /* close channel / recv ACK for close */
- if (NULL != conn->read) {
- // send close ACK
- message->length = 0;
- sendToCore(message, connNumber, context);
- EVUTIL_CLOSESOCKET(conn->socket);
- event_free(conn->read);
- conn->read = NULL;
- }
- conn->socket = -1;
- } else if (NULL == conn->read) {
- /* drop message - channel is closed, wait for close ACK */
- } else {
- ssize_t sent;
- sent = send(conn->socket, message->bytes, message->length, 0);
- if (sent != (ssize_t) message->length) {
- // All errors lead to closing the socket.
- EVUTIL_CLOSESOCKET(conn->socket);
- event_free(conn->read);
- conn->read = NULL;
- // send close channel
- message->length = 0;
- sendToCore(message, connNumber, context);
- // set conn->socket = -1 later when we recv close ACK
- }
- }
- }
return Error_NONE;
}
-/**
- * handle incoming tcp data from client connections in the admin process
- */
-static void incomingFromClient(evutil_socket_t socket, short eventType, void* vconn)
-{
- struct Connection* conn = (struct Connection*) vconn;
- struct AngelContext* context = conn->context;
-
- uint8_t buf[PipeInterface_MAX_MESSAGE_SIZE + PipeInterface_PADDING];
- uint32_t connNumber = conn - context->connections;
-
- struct Message message = {
- .bytes = buf + PipeInterface_PADDING,
- .length = PipeInterface_MAX_MESSAGE_SIZE,
- .padding = PipeInterface_PADDING
- };
- ssize_t result = recv(socket, message.bytes, message.length, 0);
-
- if (result > 0) {
- message.length = result;
- sendToCore(&message, connNumber, context);
- } else if (result < 0 && Errno_get() == Errno_EAGAIN) {
- return;
- } else {
- // The return value will be 0 when the peer has performed an orderly shutdown.
- EVUTIL_CLOSESOCKET(conn->socket);
- event_free(conn->read);
- conn->read = NULL;
- // send close channel
- message.length = 0;
- sendToCore(&message, connNumber, context);
- // set conn->socket = -1 later when we recv close ACK
- }
-}
-
-static struct Connection* newConnection(struct AngelContext* context, evutil_socket_t fd)
-{
- struct Connection* conn = NULL;
- for (int i = 0; i < Angel_MAX_CONNECTIONS; i++) {
- if (context->connections[i].read == NULL && context->connections[i].socket == -1) {
- conn = &context->connections[i];
- break;
- }
- }
-
- if (!conn) {
- return NULL;
- }
-
- conn->read = event_new(context->eventBase, fd, EV_READ | EV_PERSIST, incomingFromClient, conn);
- conn->socket = fd;
- conn->context = context;
-
- if (!conn->read) {
- return NULL;
- }
-
- event_add(conn->read, NULL);
- return conn;
-}
-
-static void acceptConn(evutil_socket_t socket, short eventType, void* vcontext)
-{
- struct AngelContext* context = (struct AngelContext*) vcontext;
-
- struct sockaddr_storage ss;
- ev_socklen_t slen = sizeof(ss);
- evutil_socket_t fd = accept(socket, (struct sockaddr*)&ss, &slen);
- if (fd < 0) {
- perror("acceptConn() fd < 0");
- return;
- } else if (fd > (evutil_socket_t) FD_SETSIZE) {
- EVUTIL_CLOSESOCKET(fd);
- return;
- }
-
- evutil_make_socket_nonblocking(fd);
-
- struct Connection* conn = newConnection(context, fd);
- if (!conn) {
- EVUTIL_CLOSESOCKET(fd);
- }
-}
-
-void Angel_start(String* pass,
- evutil_socket_t tcpSocket,
- struct Interface* coreIface,
- struct event_base* eventBase,
+void Angel_start(struct Interface* coreIface,
+ struct EventBase* eventBase,
struct Log* logger,
struct Allocator* alloc)
{
- struct AngelContext contextStore;
- struct AngelContext* context = &contextStore;
- Bits_memset(context, 0, sizeof(struct AngelContext));
-
- for (int i = 0; i < Angel_MAX_CONNECTIONS; i++) {
- context->connections[i].socket = -1;
- }
- context->eventBase = eventBase;
- context->logger = logger;
-
- context->coreIface = coreIface;
- context->coreIface->receiveMessage = receiveMessage;
- context->coreIface->receiverContext = context;
+ struct AngelContext ctx = {
+ .eventBase = eventBase,
+ .logger = logger,
+ .coreIface = coreIface,
+ .alloc = alloc
+ };
- context->socketEvent =
- event_new(context->eventBase, tcpSocket, EV_READ | EV_PERSIST, acceptConn, context);
- event_add(context->socketEvent, NULL);
+ coreIface->receiveMessage = receiveMessage;
+ coreIface->receiverContext = &ctx;
- event_base_dispatch(context->eventBase);
+ EventBase_beginLoop(eventBase);
}
View
11 admin/angel/Angel.h
@@ -15,20 +15,13 @@
#ifndef Angel_H
#define Angel_H
-#include "benc/String.h"
#include "memory/Allocator.h"
#include "util/log/Log.h"
#include "interface/Interface.h"
-
#include "util/events/EventBase.h"
-#define Angel_MAX_CONNECTIONS 64
-#define Angel_INITIAL_CONF_BUFF_SIZE 1024
-
-void Angel_start(String* pass,
- int tcpSocket,
- struct Interface* coreIface,
- struct event_base* eventBase,
+void Angel_start(struct Interface* coreIface,
+ struct EventBase* eventBase,
struct Log* logger,
struct Allocator* alloc);
View
135 admin/angel/AngelInit.c
@@ -44,13 +44,6 @@
#include <unistd.h>
#include <stdint.h>
-#include <event2/event.h>
-
-#ifdef BSD
- #include <netinet/in.h>
-#elif defined(WIN32)
- #include <ws2tcpip.h> /* sockaddr_in6 */
-#endif
/**
* Initialize the core.
@@ -60,13 +53,11 @@
* file descriptor for writing to the core.
* @param fromCore a pointer to an int which will be set to the
* file descriptor for reading from the core.
- * @param alloc an allocator.
* @param eh an exception handler in case something goes wrong.
*/
static void initCore(char* coreBinaryPath,
int* toCore,
int* fromCore,
- struct Allocator* alloc,
struct Except* eh)
{
int pipes[2][2];
@@ -126,89 +117,6 @@ static void sendConfToCore(struct Interface* toCoreInterface,
toCoreInterface->sendMessage(&m, toCoreInterface);
}
-/**
- * @param bindAddr a string representation of the address to bind to, if 0.0.0.0:0,
- * an address will be chosen automatically.
- * @param alloc an allocator to build the output.
- * @param eh an exception handler which will be triggered if anything goes wrong.
- * @param sock a pointer which will be set to the bound socket.
- * @return a dictionary containing: "addr" which is a string representation of the ip address
- * which was bound and "port" which is the bound port as an integer.
- */
-static String* bindListener(String* bindAddr,
- struct Allocator* alloc,
- struct Except* eh,
- evutil_socket_t* sock)
-{
- struct sockaddr_storage addr;
- int addrLen = sizeof(struct sockaddr_storage);
- Bits_memset(&addr, 0, sizeof(struct sockaddr_storage));
- if (evutil_parse_sockaddr_port(bindAddr->bytes, (struct sockaddr*) &addr, &addrLen)) {
- Except_raise(eh, -1, "Unable to parse [%s] as an ip address and port, "
- "eg: 127.0.0.1:11234", bindAddr->bytes);
- }
-
- if (!addr.ss_family) {
- addr.ss_family = AF_INET;
- // Apple gets mad if the length is wrong.
- addrLen = sizeof(struct sockaddr_in);
- }
-
- evutil_socket_t listener = socket(addr.ss_family, SOCK_STREAM, 0);
- if (listener < 0) {
- Except_raise(eh, -1, "Failed to allocate socket() [%s]", Errno_getString());
- }
-
- evutil_make_listen_socket_reuseable(listener);
-
- if (bind(listener, (struct sockaddr*) &addr, addrLen) < 0) {
- enum Errno err = Errno_get();
- EVUTIL_CLOSESOCKET(listener);
- Except_raise(eh, -1, "Failed to bind() socket [%s]", Errno_strerror(err));
- }
-
- if (getsockname(listener, (struct sockaddr*) &addr, (ev_socklen_t*) &addrLen)) {
- enum Errno err = Errno_get();
- EVUTIL_CLOSESOCKET(listener);
- Except_raise(eh, -1, "Failed to get socket name [%s]", Errno_strerror(err));
- }
-
- if (listen(listener, 16) < 0) {
- enum Errno err = Errno_get();
- EVUTIL_CLOSESOCKET(listener);
- Except_raise(eh, -1, "Failed to listen on socket [%s]", Errno_strerror(err));
- }
-
- #define ADDR_BUFF_SZ 64
- char addrStr[ADDR_BUFF_SZ] = {0};
- uint16_t port;
- switch(addr.ss_family) {
- case AF_INET:
- evutil_inet_ntop(AF_INET,
- &(((struct sockaddr_in*)&addr)->sin_addr),
- addrStr,
- ADDR_BUFF_SZ);
- Bits_memcpyConst(&port, &((struct sockaddr_in*)&addr)->sin_port, 2);
- break;
- case AF_INET6:
- evutil_inet_ntop(AF_INET6,
- &(((struct sockaddr_in6*)&addr)->sin6_addr),
- addrStr + 1,
- ADDR_BUFF_SZ - 2);
- addrStr[0] = '[';
- addrStr[strlen(addrStr)] = ']';
- Bits_memcpyConst(&port, &((struct sockaddr_in6*)&addr)->sin6_port, 2);
- break;
- default:
- Assert_always(false);
- }
- port = Endian_bigEndianToHost16(port);
- snprintf(addrStr + strlen(addrStr), 48 - strlen(addrStr), ":%u", port);
-
- *sock = listener;
- return String_new(addrStr, alloc);
-}
-
static void setUser(char* user, struct Log* logger, struct Except* eh)
{
struct Jmp jmp;
@@ -268,12 +176,13 @@ int AngelInit_main(int argc, char** argv)
}
struct Allocator* alloc = MallocAllocator_new(1<<20);
- struct Random* rand = Random_new(alloc, eh);
+ struct Writer* logWriter = FileWriter_new(stdout, alloc);
+ struct Log* logger = WriterLog_new(logWriter, alloc);
+ struct Random* rand = Random_new(alloc, logger, eh);
alloc = CanaryAllocator_new(alloc, rand);
+ struct Allocator* tempAlloc = Allocator_child(alloc);
struct EventBase* eventBase = EventBase_new(alloc);
- struct Writer* logWriter = FileWriter_new(stdout, alloc);
- struct Log* logger = WriterLog_new(logWriter, alloc);
Log_debug(logger, "Initializing angel with input [%d] and output [%d]",
inFromClientNo, outToClientNo);
@@ -285,9 +194,9 @@ int AngelInit_main(int argc, char** argv)
Log_debug(logger, "Finished getting pre-configuration from client");
- struct Reader* reader = ArrayReader_new(buff, CONFIG_BUFF_SIZE, alloc);
+ struct Reader* reader = ArrayReader_new(buff, CONFIG_BUFF_SIZE, tempAlloc);
Dict config;
- if (StandardBencSerializer_get()->parseDictionary(reader, alloc, &config)) {
+ if (StandardBencSerializer_get()->parseDictionary(reader, tempAlloc, &config)) {
Except_raise(eh, -1, "Failed to parse configuration.");
}
@@ -311,12 +220,9 @@ int AngelInit_main(int argc, char** argv)
Except_raise(eh, -1, "missing configuration params in preconfig. [%s]", buff);
}
- evutil_socket_t tcpSocket;
- String* boundAddr = bindListener(bind, alloc, eh, &tcpSocket);
-
if (core) {
Log_info(logger, "Initializing core [%s]", core->bytes);
- initCore(core->bytes, &toCore, &fromCore, alloc, eh);
+ initCore(core->bytes, &toCore, &fromCore, eh);
}
Log_debug(logger, "Sending pre-configuration to core.");
@@ -325,28 +231,23 @@ int AngelInit_main(int argc, char** argv)
struct Interface* coreIface = &pif->generic;
PipeInterface_waitUntilReady(pif);
- sendConfToCore(coreIface, alloc, &config, eh, logger);
+ sendConfToCore(coreIface, tempAlloc, &config, eh, logger);
- struct Allocator* tempAlloc = Allocator_child(alloc);
- InterfaceWaiter_waitForData(coreIface, eventBase, tempAlloc, eh);
- tempAlloc->free(tempAlloc);
+ struct Message* coreResponse = InterfaceWaiter_waitForData(coreIface, eventBase, tempAlloc, eh);
+ write(outToClientNo, coreResponse->bytes, coreResponse->length);
- Dict* configResp = Dict_new(alloc);
- Dict* adminResp = Dict_new(alloc);
- Dict_putDict(configResp, String_CONST("admin"), adminResp, alloc);
- Dict_putString(adminResp, String_CONST("bind"), boundAddr, alloc);
+ #ifdef Log_KEYS
+ uint8_t lastChar = coreResponse->bytes[coreResponse->length-1];
+ coreResponse->bytes[coreResponse->length-1] = 0;
+ Log_keys(logger, "Sent [%s%c] to client.", coreResponse->bytes, lastChar);
+ coreResponse->bytes[coreResponse->length-1] = lastChar;
+ #endif
- struct Writer* toClientWriter = ArrayWriter_new(buff, CONFIG_BUFF_SIZE, alloc);
- if (StandardBencSerializer_get()->serializeDictionary(toClientWriter, configResp)) {
- Except_raise(eh, -1, "Failed to serialize response");
- }
- write(outToClientNo, buff, toClientWriter->bytesWritten(toClientWriter));
- buff[toClientWriter->bytesWritten(toClientWriter)] = 0;
- Log_keys(logger, "Sent [%s] to client.", buff);
if (user) {
setUser(user->bytes, logger, eh);
}
- Angel_start(pass, tcpSocket, coreIface, eventBase, logger, alloc);
+ Allocator_free(tempAlloc);
+ Angel_start(coreIface, eventBase, logger, alloc);
return 0;
}
View
10 admin/angel/CMakeLists.txt
@@ -27,7 +27,7 @@ target_link_libraries(cjdns-angel
cjdbenc
cjdbenc_StandardBencSerializer
cjdmemory
- cjdns-util-events-libevent
+ cjdns-util-events-libuv
cjdns-security
cjdns-process
cjdns-interface-waiter
@@ -36,11 +36,13 @@ target_link_libraries(cjdns-angel
cjdns-util-log-writer
cjdns-crypto-random
cjdns-memory-canary
+ cjdns-util-platform-socket
)
add_library(cjdns-core
Core.c
Core_admin.c
+ Hermes.c
)
target_link_libraries(cjdns-core
@@ -54,14 +56,16 @@ target_link_libraries(cjdns-core
cjdmemory
cjdns-admin
cjdnet
- cjdns-util-events-libevent
+ cjdns-util-events-libuv
cjdns-interface-waiter
cjdns-util-log-writer
cjdns-admin-logger
cjdns-util-log-indirect
cjdns-tunnel-iptunnel
cjdns-crypto-random
+ cjdns-crypto-random-libuv
cjdns-memory-canary
+ cjdns-interface-addressable
)
add_executable(cjdns
@@ -78,7 +82,7 @@ target_link_libraries(cjdroute2
cjdns-admin-client
crypto
cjdns-crypto-bench
- cjdns-util-events-libevent
+ cjdns-util-events-libuv
cjdbenc_StandardBencSerializer
cjdbenc_JsonBencSerializer
cjdns-process
View
79 admin/angel/Core.c
@@ -19,15 +19,19 @@
#include "admin/angel/Core.h"
#include "admin/angel/Core_admin.h"
#include "admin/angel/InterfaceWaiter.h"
+#include "admin/angel/Hermes.h"
#include "admin/AuthorizedPasswords.h"
#include "benc/Int.h"
#include "benc/serialization/BencSerializer.h"
#include "benc/serialization/standard/StandardBencSerializer.h"
#include "crypto/AddressCalc.h"
-#include "crypto/Random.h"
+#include "crypto/random/Random.h"
+#include "crypto/random/libuv/LibuvEntropyProvider.h"
#include "dht/ReplyModule.h"
#include "dht/SerializationModule.h"
#include "dht/dhtcore/RouterModule_admin.h"
+#include "interface/addressable/AddrInterface.h"
+#include "interface/addressable/UDPAddrInterface.h"
#include "interface/UDPInterface_admin.h"
#ifdef HAS_ETH_INTERFACE
#include "interface/ETHInterface_admin.h"
@@ -126,7 +130,7 @@ static void adminMemory(Dict* input, void* vcontext, String* txid)
static void adminExit(Dict* input, void* vcontext, String* txid)
{
Log_info((struct Log*) vcontext, "Got request to exit.");
- exit(1);
+ exit(0);
}
static Dict* getInitialConfig(struct Interface* iface,
@@ -149,7 +153,7 @@ void Core_initTunnel(String* desiredDeviceName,
uint8_t addressPrefix,
struct Ducttape* dt,
struct Log* logger,
- struct event_base* eventBase,
+ struct EventBase* eventBase,
struct Allocator* alloc,
struct Except* eh)
{
@@ -173,6 +177,12 @@ void Core_initTunnel(String* desiredDeviceName,
TUNConfigurator_setMTU(assignedTunName, DEFAULT_MTU, logger, eh);
}
+/** This is a response from a call which is intended only to send information to the angel. */
+static void angelResponse(Dict* resp, void* vNULL)
+{
+ // do nothing
+}
+
/*
* This process is started with 2 parameters, they must all be numeric in base 10.
* toAngel the pipe which is used to send data back to the angel process.
@@ -193,28 +203,46 @@ int Core_main(int argc, char** argv)
Except_raise(eh, -1, "This is internal to cjdns and shouldn't started manually.");
}
- struct Allocator* alloc = MallocAllocator_new(ALLOCATOR_FAILSAFE);
- struct Random* rand = Random_new(alloc, eh);
- alloc = CanaryAllocator_new(alloc, rand);
- struct EventBase* eventBase = EventBase_new(alloc);
+ struct Allocator* unsafeAlloc = MallocAllocator_new(ALLOCATOR_FAILSAFE);
+ struct Writer* logWriter = FileWriter_new(stderr, unsafeAlloc);
+ struct Log* preLogger = WriterLog_new(logWriter, unsafeAlloc);
+ struct EventBase* eventBase = EventBase_new(unsafeAlloc);
// -------------------- Setup the Pre-Logger ---------------------- //
- struct Writer* logWriter = FileWriter_new(stdout, alloc);
- struct Log* preLogger = WriterLog_new(logWriter, alloc);
- struct IndirectLog* indirectLogger = IndirectLog_new(alloc);
+ struct IndirectLog* indirectLogger = IndirectLog_new(unsafeAlloc);
indirectLogger->wrappedLog = preLogger;
struct Log* logger = &indirectLogger->pub;
+ // -------------------- Setup the PRNG ---------------------- //
+ struct Random* rand =
+ LibuvEntropyProvider_newDefaultRandom(eventBase, logger, eh, unsafeAlloc);
+
+ // -------------------- Setup Protected Allocator ---------------------- //
+ struct Allocator* alloc = CanaryAllocator_new(unsafeAlloc, rand);
+ struct Allocator* tempAlloc = Allocator_child(alloc);
+
+
// The first read inside of getInitialConfig() will begin it waiting.
struct PipeInterface* pi =
PipeInterface_new(fromAngel, toAngel, eventBase, logger, alloc, rand);
+ struct Hermes* hermes = Hermes_new(&pi->generic, eventBase, logger, alloc);
- Dict* config = getInitialConfig(&pi->generic, eventBase, alloc, eh);
+ Dict* config = getInitialConfig(&pi->generic, eventBase, tempAlloc, eh);
String* privateKeyHex = Dict_getString(config, String_CONST("privateKey"));
Dict* adminConf = Dict_getDict(config, String_CONST("admin"));
String* pass = Dict_getString(adminConf, String_CONST("pass"));
- if (!pass || !privateKeyHex) {
- Except_raise(eh, -1, "Expected 'pass' and 'privateKey' in configuration.");
+ String* bind = Dict_getString(adminConf, String_CONST("bind"));
+ if (!(pass && privateKeyHex && bind)) {
+ if (!pass) {
+ Except_raise(eh, -1, "Expected 'pass'");
+ }
+ if (!bind) {
+ Except_raise(eh, -1, "Expected 'bind'");
+ }
+ if (!privateKeyHex) {
+ Except_raise(eh, -1, "Expected 'privateKey'");
+ }
+ Except_raise(eh, -1, "Expected 'pass', 'privateKey' and 'bind' in configuration.");
}
Log_keys(logger, "Starting core with admin password [%s]", pass->bytes);
uint8_t privateKey[32];
@@ -224,18 +252,33 @@ int Core_main(int argc, char** argv)
Except_raise(eh, -1, "privateKey must be 64 bytes of hex.");
}
- struct Admin* admin = Admin_new(&pi->generic, alloc, logger, eventBase, pass);
+ struct Sockaddr_storage bindAddr;
+ if (Sockaddr_parse(bind->bytes, &bindAddr)) {
+ Except_raise(eh, -1, "bind address [%s] unparsable", bind->bytes);
+ }
+
+ struct AddrInterface* udpAdmin =
+ UDPAddrInterface_new(eventBase, &bindAddr.addr, alloc, eh, logger);
- Dict adminResponse = Dict_CONST(String_CONST("error"), String_OBJ(String_CONST("none")), NULL);
- Admin_sendMessageToAngel(&adminResponse, admin);
+ struct Admin* admin = Admin_new(udpAdmin, alloc, logger, eventBase, pass);
+
+ char* boundAddr = Sockaddr_print(udpAdmin->addr, tempAlloc);
+ Dict adminResponse = Dict_CONST(
+ String_CONST("bind"), String_OBJ(String_CONST(boundAddr)), NULL
+ );
+ Dict response = Dict_CONST(
+ String_CONST("error"), String_OBJ(String_CONST("none")), Dict_CONST(
+ String_CONST("admin"), Dict_OBJ(&adminResponse), NULL
+ ));
+ // This always times out because the angel doesn't respond.
+ Hermes_callAngel(&response, angelResponse, NULL, alloc, eh, hermes);
// --------------------- Setup the Logger --------------------- //
// the prelogger will nolonger be used.
struct Log* adminLogger = AdminLog_registerNew(admin, alloc, rand);
indirectLogger->wrappedLog =