Skip to content

Commit

Permalink
Full command forwarding (#75)
Browse files Browse the repository at this point in the history
  • Loading branch information
jumaffre committed May 20, 2019
1 parent 9594cf1 commit cea3ae6
Show file tree
Hide file tree
Showing 25 changed files with 507 additions and 280 deletions.
6 changes: 0 additions & 6 deletions CMakeLists.txt
Expand Up @@ -43,12 +43,6 @@ if(RECORD_TRACE)
add_definitions(-DNO_COMMITTED_TX_HISTORY)
endif()

option(RPC_FORWARD_TO_LEADER "Followers verify signatures and forward RPC to leader" OFF)
if (RPC_FORWARD_TO_LEADER)
add_definitions(-DRPC_FORWARD_TO_LEADER)
set(TEST_FORWARD_TO_LEADER "--leader-forwarding")
endif()

option(BUILD_SMALLBANK "Build SmallBank sample app and clients" ON)

# MemberClient executable
Expand Down
32 changes: 18 additions & 14 deletions src/apps/luageneric/luageneric_test.cpp
Expand Up @@ -114,13 +114,15 @@ template <typename F, typename K, typename V>
void check_store_load(F frontend, K k, V v)
{
const Cert u0 = {0};
enclave::RpcContext rpc_ctx(0, u0);

// store
const auto pc0 = make_pc("store", {{"k", k}, {"v", v}});
check_success(frontend->process(u0, pc0), true);
check_success(frontend->process(rpc_ctx, pc0), true);

// load and check that we get the right result
const auto pc1 = make_pc("load", {{"k", k}});
check_success(frontend->process(u0, pc1), v);
check_success(frontend->process(rpc_ctx, pc1), v);
}

TEST_CASE("simple lua apps")
Expand All @@ -130,6 +132,7 @@ TEST_CASE("simple lua apps")
// create network with 1 user and 3 active members
auto frontend = init_frontend(network, notifier, 1, 3);
const Cert u0 = {0};
enclave::RpcContext rpc_ctx(0, u0);

SUBCASE("echo")
{
Expand All @@ -149,7 +152,7 @@ TEST_CASE("simple lua apps")
// call "echo" function with "hello"
const string verb = "hello";
const auto pc = make_pc("echo", {{"verb", verb}});
check_success(frontend->process(u0, pc), verb);
check_success(frontend->process(rpc_ctx, pc), verb);
}

SUBCASE("store/load different types in generic table")
Expand Down Expand Up @@ -191,7 +194,7 @@ TEST_CASE("simple lua apps")

// (3) attempt to read non-existing key (set of integers)
const auto pc = make_pc("load", {{"k", set{5, 6, 7}}});
check_error(frontend->process(u0, pc), ErrorCodes::INVALID_PARAMS);
check_error(frontend->process(rpc_ctx, pc), ErrorCodes::INVALID_PARAMS);
}

SUBCASE("access gov tables")
Expand Down Expand Up @@ -223,12 +226,12 @@ TEST_CASE("simple lua apps")
map<string, MemberInfo> expected = {{"0", {MemberStatus::ACTIVE}},
{"1", {MemberStatus::ACTIVE}},
{"2", {MemberStatus::ACTIVE}}};
check_success(frontend->process(u0, pc), expected);
check_success(frontend->process(rpc_ctx, pc), expected);

// (2) try to write to members table
const auto pc1 = make_pc(
"put_member", {{"k", 99}, {"v", MemberInfo{MemberStatus::ACTIVE}}});
check_error(frontend->process(u0, pc1), ErrorCodes::SCRIPT_ERROR);
check_error(frontend->process(rpc_ctx, pc1), ErrorCodes::SCRIPT_ERROR);
}
}

Expand All @@ -239,6 +242,7 @@ TEST_CASE("simple bank")
// create network with 1 user and 3 active members
auto frontend = init_frontend(network, notifier, 1, 3);
const Cert u0 = {0};
enclave::RpcContext rpc_ctx(0, u0);

constexpr auto app = R"xxx(
tables, gov_tables, caller_id, method, params = ...
Expand Down Expand Up @@ -293,34 +297,34 @@ TEST_CASE("simple bank")

{
const auto pc = make_pc("SB_create", {{"dst", 1}, {"amt", 123}});
check_success<string>(frontend->process(u0, pc), "OK");
check_success<string>(frontend->process(rpc_ctx, pc), "OK");

const auto pc1 = make_pc("SB_read", {{"account", 1}});
check_success(frontend->process(u0, pc1), 123);
check_success(frontend->process(rpc_ctx, pc1), 123);
}

{
const auto pc = make_pc("SB_create", {{"dst", 2}, {"amt", 999}});
check_success<string>(frontend->process(u0, pc), "OK");
check_success<string>(frontend->process(rpc_ctx, pc), "OK");

const auto pc1 = make_pc("SB_read", {{"account", 2}});
check_success(frontend->process(u0, pc1), 999);
check_success(frontend->process(rpc_ctx, pc1), 999);
}

{
const auto pc = make_pc("SB_read", {{"account", 3}});
check_error(frontend->process(u0, pc), ErrorCodes::INVALID_PARAMS);
check_error(frontend->process(rpc_ctx, pc), ErrorCodes::INVALID_PARAMS);
}

{
const auto pc =
make_pc("SB_transfer", {{"src", 1}, {"dst", 2}, {"amt", 5}});
check_success<string>(frontend->process(u0, pc), "OK");
check_success<string>(frontend->process(rpc_ctx, pc), "OK");

const auto pc1 = make_pc("SB_read", {{"account", 1}});
check_success(frontend->process(u0, pc1), 123 - 5);
check_success(frontend->process(rpc_ctx, pc1), 123 - 5);

const auto pc2 = make_pc("SB_read", {{"account", 2}});
check_success(frontend->process(u0, pc2), 999 + 5);
check_success(frontend->process(rpc_ctx, pc2), 999 + 5);
}
}
28 changes: 16 additions & 12 deletions src/enclave/enclave.h
Expand Up @@ -11,6 +11,7 @@
#include "node/nodestate.h"
#include "node/nodetypes.h"
#include "node/notifier.h"
#include "node/rpc/forwarder.h"
#include "node/rpc/managementfrontend.h"
#include "node/rpc/memberfrontend.h"
#include "node/rpc/nodefrontend.h"
Expand All @@ -28,6 +29,7 @@ namespace enclave
ccf::NetworkState network;
ccf::NodeState node;
std::shared_ptr<ccf::NodeToNode> n2n_channels;
std::shared_ptr<ccf::Forwarder> cmd_forwarder;
ccf::Notifier notifier;
std::shared_ptr<RpcMap> rpc_map;
bool recover = false;
Expand All @@ -38,8 +40,9 @@ namespace enclave
writer_factory(circuit, config->writer_config),
rpcsessions(writer_factory),
n2n_channels(std::make_shared<ccf::NodeToNode>(writer_factory)),
node(writer_factory, network, rpcsessions, notifier),
notifier(writer_factory)
node(writer_factory, network, rpcsessions),
notifier(writer_factory),
cmd_forwarder(std::make_shared<ccf::Forwarder>(rpcsessions, n2n_channels))
{
rpc_map = std::make_shared<RpcMap>();
rpc_map->emplace(
Expand All @@ -62,14 +65,20 @@ namespace enclave
frontend->set_sig_intervals(
config->signature_intervals.sig_max_tx,
config->signature_intervals.sig_max_ms);
frontend->set_n2n_channels(n2n_channels);

// TODO: All frontends should be able to forward to/be forwarded to.
if (r.first == ccf::Actors::USERS)
{
frontend->set_cmd_forwarder(cmd_forwarder);
}
}

logger::config::msg() = AdminMessage::log_msg;
logger::config::writer() = writer_factory.create_writer_to_outside();

node.initialize(config->raft_config, n2n_channels);
rpcsessions.initialize(rpc_map);
cmd_forwarder->initialize(rpc_map);
}

bool create_node(
Expand Down Expand Up @@ -132,22 +141,17 @@ namespace enclave

DISPATCHER_SET_MESSAGE_HANDLER(
bp, ccf::node_inbound, [this](const uint8_t* data, size_t size) {
auto [body] =
const auto [body] =
ringbuffer::read_message<ccf::node_inbound>(data, size);

const auto& body_ = body;
auto p = body_.data();
auto psize = body_.size();
auto p = body.data();
auto psize = body.size();

if (
serialized::peek<ccf::NodeMsgType>(p, psize) ==
ccf::NodeMsgType::forwarded_msg)
{
serialized::skip(p, psize, sizeof(ccf::NodeMsgType));
LOG_DEBUG << "RPC forwarded: " << ccf::Actors::USERS << std::endl;

rpc_map->at(std::string(ccf::Actors::USERS))
->process_forwarded(p, psize);
cmd_forwarder->recv_message(p, psize);
}
else
{
Expand Down
21 changes: 19 additions & 2 deletions src/enclave/rpcendpoint.h
Expand Up @@ -14,6 +14,7 @@ namespace enclave
private:
std::shared_ptr<RpcMap> rpc_map;
std::shared_ptr<RpcHandler> handler;
size_t session_id;
CBuffer caller;

public:
Expand All @@ -23,7 +24,8 @@ namespace enclave
ringbuffer::AbstractWriterFactory& writer_factory,
std::unique_ptr<tls::Context> ctx) :
FramedTLSEndpoint(session_id, writer_factory, move(ctx)),
rpc_map(rpc_map_)
rpc_map(rpc_map_),
session_id(session_id)
{}

bool handle_data(const std::vector<uint8_t>& data)
Expand All @@ -42,7 +44,22 @@ namespace enclave
caller = peer_cert();
}

send(handler->process(caller, data));
// Create a new RPC context for each command since some may require
// forwarding to the leader.
RpcContext rpc_ctx(session_id, caller);
auto rep = handler->process(rpc_ctx, data);

if (rpc_ctx.is_forwarded)
{
// If the RPC has been forwarded, hold the connection.
return true;
}
else
{
// Otherwise, reply to the client synchronously.
send(rep);
}

return true;
}
};
Expand Down
22 changes: 17 additions & 5 deletions src/enclave/rpchandler.h
@@ -1,24 +1,36 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once
#include "../ds/buffer.h"
#include "ds/buffer.h"

#include <chrono>
#include <limits>
#include <stdint.h>
#include <vector>

namespace enclave
{
static constexpr size_t InvalidSessionId = std::numeric_limits<size_t>::max();

struct RpcContext
{
const size_t session_id;
const CBuffer caller;
bool is_forwarded = false;

RpcContext(size_t session_id_, CBuffer caller_) :
session_id(session_id_),
caller(caller_)
{}
};

class RpcHandler
{
public:
virtual ~RpcHandler() {}

virtual std::vector<uint8_t> process(
CBuffer caller, const std::vector<uint8_t>& input) = 0;

virtual std::vector<uint8_t> process_forwarded(
const uint8_t* data, size_t size) = 0;
RpcContext& rpc_ctx, const std::vector<uint8_t>& input) = 0;

virtual void tick(std::chrono::milliseconds elapsed_ms_count) {}
};
Expand Down
14 changes: 14 additions & 0 deletions src/enclave/rpcsessions.h
Expand Up @@ -78,6 +78,20 @@ namespace enclave
sessions.insert(std::make_pair(id, std::move(session)));
}

void reply_forwarded(size_t id, const std::vector<uint8_t>& data)
{
std::lock_guard<SpinLock> guard(lock);

auto search = sessions.find(id);
if (search == sessions.end())
{
throw std::logic_error(
"reply forwarded for unknown session: " + std::to_string(id));
}

search->second->send(data);
}

void remove_session(size_t id)
{
std::lock_guard<SpinLock> guard(lock);
Expand Down
4 changes: 2 additions & 2 deletions src/enclave/tlsendpoint.h
Expand Up @@ -153,8 +153,8 @@ namespace enclave
// MBEDTLS_ERR_SSL_WANT_READ. Probably hit a size limit - try again
if (exact && (total < up_to))
{
LOG_INFO << "Asked for exactly " << up_to << ", received " << total
<< ", retrying" << std::endl;
LOG_DEBUG << "Asked for exactly " << up_to << ", received " << total
<< ", retrying" << std::endl;
read_buffer = move(data);
return read(up_to, exact);
}
Expand Down
2 changes: 1 addition & 1 deletion src/node/channels.h
Expand Up @@ -112,7 +112,7 @@ namespace ccf
const GcmHdr& header, CBuffer aad, CBuffer cipher, Buffer plain)
{
if (status != ESTABLISHED)
throw std::logic_error("Channel is not established for encrypting");
throw std::logic_error("Channel is not established for decrypting");

return key->decrypt(header.getIv(), header.tag, cipher, aad, plain.p);
}
Expand Down
13 changes: 4 additions & 9 deletions src/node/nodestate.h
Expand Up @@ -19,7 +19,6 @@
#include "kv/replicator.h"
#include "networkstate.h"
#include "nodetonode.h"
#include "notifier.h"
#include "rpc/consts.h"
#include "rpc/frontend.h"
#include "rpc/serialization.h"
Expand Down Expand Up @@ -126,7 +125,6 @@ namespace ccf
enclave::RPCSessions& rpcsessions;
std::shared_ptr<kv::TxHistory> history;
std::shared_ptr<kv::AbstractTxEncryptor> encryptor;
Notifier& notifier;

//
// join protocol
Expand Down Expand Up @@ -160,15 +158,13 @@ namespace ccf
NodeState(
ringbuffer::AbstractWriterFactory& writer_factory,
NetworkState& network,
enclave::RPCSessions& rpcsessions,
Notifier& notifier) :
enclave::RPCSessions& rpcsessions) :
sm(State::uninitialized),
self(INVALID_ID),
writer_factory(writer_factory),
to_host(writer_factory.create_writer_to_outside()),
network(network),
rpcsessions(rpcsessions),
notifier(notifier)
rpcsessions(rpcsessions)
{
::EverCrypt_AutoConfig2_init();
}
Expand Down Expand Up @@ -880,14 +876,14 @@ namespace ccf
#endif
}

bool node_msg(const std::vector<uint8_t>& data)
void node_msg(const std::vector<uint8_t>& data)
{
// Only process messages once part of network
if (
!sm.check(State::partOfNetwork) &&
!sm.check(State::partOfPublicNetwork))
{
return false;
return;
}

auto p = data.data();
Expand All @@ -912,7 +908,6 @@ namespace ccf
default:
{}
}
return true;
}

bool pbft_msg(const uint8_t* data, size_t size)
Expand Down

0 comments on commit cea3ae6

Please sign in to comment.