Skip to content

Commit

Permalink
Lots of commenting and some output prep.
Browse files Browse the repository at this point in the history
  • Loading branch information
coryan committed Jun 19, 2017
1 parent 96abd72 commit 95c6728
Showing 1 changed file with 160 additions and 13 deletions.
173 changes: 160 additions & 13 deletions jb/itch5/moldfeedhandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
#include <jb/ehs/acceptor.hpp>
#include <jb/itch5/array_based_order_book.hpp>
#include <jb/itch5/generate_inside.hpp>
#include <jb/itch5/make_socket_udp_send.hpp>
#include <jb/itch5/mold_udp_channel.hpp>
#include <jb/itch5/process_iostream.hpp>
#include <jb/itch5/udp_receiver_config.hpp>
#include <jb/itch5/udp_sender_config.hpp>
#include <jb/fileio.hpp>
#include <jb/log.hpp>

#include <sstream>
Expand Down Expand Up @@ -54,6 +56,72 @@ std::unique_ptr<jb::itch5::mold_udp_channel> create_udp_channel(
return std::make_unique<jb::itch5::mold_udp_channel>(io, std::move(cb), cfg);
}

/// Define the type of order book used in the program.
using order_book = jb::itch5::order_book<jb::itch5::array_based_order_book>;

/// The output layer is composed of multiple instances of this
/// function type.
using output_function = std::function<void(
jb::itch5::message_header const& header, order_book const& updated_book,
jb::itch5::book_update const& update)>;

/// Create the output function for the file option.
output_function create_output_file(config const& cfg) {
// ... if there is no file option, just return a trivial output
// function ...
if (cfg.output_file() == "") {
return
[](jb::itch5::message_header const&, order_book const&,
jb::itch5::book_update const&) {};
}
// ... otherwise create an output iostream and use it ...
auto out = std::make_shared<boost::iostreams::filtering_ostream>();
;
jb::open_output_file(*out, cfg.output_file());
return [out](
jb::itch5::message_header const& header, order_book const& updated_book,
jb::itch5::book_update const& update) {
auto bid = updated_book.best_bid();
auto offer = updated_book.best_offer();
*out << header.timestamp.ts.count() << " " << header.stock_locate << " "
<< update.stock << " " << bid.first.as_integer() << " " << bid.second
<< " " << offer.first.as_integer() << " " << offer.second << "\n";
};
}

/// Create an output function for a single socket
output_function create_output_socket(
boost::asio::io_service& io, jb::itch5::udp_sender_config const& cfg) {
auto s = jb::itch5::make_socket_udp_send<>(io, cfg);
auto socket = std::make_shared<decltype(s)>(std::move(s));
return [socket, cfg](
jb::itch5::message_header const& header, order_book const& updated_book,
jb::itch5::book_update const& update) {
std::cout << "should send to " << cfg.address() << "\n";
};
}

/// Create a composite output function aggregating all the different
/// configured outputs
output_function
create_output_layer(boost::asio::io_service& io, config const& cfg) {
std::vector<output_function> outs;
outs.push_back(create_output_file(cfg));
for (auto const& outcfg : cfg.output()) {
if (outcfg.port() == 0 and outcfg.address() == "") {
continue;
}
outs.push_back(create_output_socket(io, outcfg));
}
return [outputs = std::move(outs)](
jb::itch5::message_header const& header, order_book const& updated_book,
jb::itch5::book_update const& update) {
for (auto f : outputs) {
f(header, updated_book, update);
}
};
}

} // anonymous namespace

#define KNOWN_ITCH5_MESSAGES \
Expand All @@ -73,49 +141,109 @@ std::unique_ptr<jb::itch5::mold_udp_channel> create_udp_channel(
jb::itch5::system_event_message, jb::itch5::trade_message

int main(int argc, char* argv[]) try {
// All JayBeam programs read their configuration from a YAML file,
// the values can be overriden by the command-line arguments, but it
// is not recommended to set all the values via command-line flags
// ...
// TODO() - make it possible to read the YAML file from a etcd
// path. That way we can keep all the configurations in a single
// place ...
config cfg;
cfg.load_overrides(
argc, argv, std::string("moldfeedhandler.yaml"), "JB_ROOT");
jb::log::init(cfg.log());

// ... this program basically has a single control loop. A future
// version should separate performance critical code to its own
// threads with their own io_service ...
boost::asio::io_service io;

// ... define the classes used to build the book ...
using compute_book =
jb::itch5::compute_book<jb::itch5::array_based_order_book>;
using order_book = jb::itch5::order_book<jb::itch5::array_based_order_book>;
auto cb =
[](jb::itch5::message_header const& header,
order_book const& updated_book, jb::itch5::book_update const& update) {
std::cout << update.stock << " " << update.buy_sell_indicator << "\n";
};

compute_book handler(std::move(cb), cfg.book());
// ... the data path is implemented as a series of stages, each one
// calls the next using lambdas. The last lambda to be called --
// where the data is sent to a file or a socket -- is the first to
// be constructed ...
// TODO() - actually output the messages to UDP sockets and files
// TODO() - run a master election via etcd and only output to
// sockets if this is the master
auto output_layer = create_output_layer(io, cfg);

auto process_buffer = [&handler](
// ... here we should have a layer to arbitrage between the ITCH-5.x
// feed and the UQDF/CQS feeds. Normally ITCH-5.x is a better feed,
// richer data, more accurate, and lower latency. But the ITCH-5.x
// feed depends on never losing a message. When you do, there are
// multiple alternatives (e.g. requesting a retransmission from the
// exchange, using a sync+tell feed from the exchange). We propose
// to fallback to the UQDF/CQS feeds, which are stateless. The
// recovery using those feeds is almost immediate.
// The ITCH-5.x book can be cleared and rebuilt using only new
// messages, for most tickers the freshly constructed book is
// accurate enough within seconds. Switching back to ITCH-5.x after
// falling back to UQDF/CQS will require detecting when the two
// feeds are synchronized again ...
// TODO() - implement all the fallback / recovery complexity ...

// ... in this layer we compute the book, i.e., assemble the list of
// orders received from the feed into a quantity at each price level
// ...
compute_book book_build_layer(std::move(output_layer), cfg.book());

// ... in this layer we decode the raw ITCH messages into objects
// that can be more easily manipulated ...
// TODO() - we need to break out the non-book-building messages and
// bypass the book_build_layer for them, send them directly to the
// output layer. Or maybe have a separate output layer for
// non-book-build messages, which can be running at lower priority
// ...
auto itch_decoding_layer = [&book_build_layer](
std::chrono::steady_clock::time_point recv_ts, std::uint64_t msgcnt,
std::size_t msgoffset, char const* msgbuf, std::size_t msglen) {
jb::itch5::process_buffer_mlist<compute_book, KNOWN_ITCH5_MESSAGES>::
process(handler, recv_ts, msgcnt, msgoffset, msgbuf, msglen);
process(book_build_layer, recv_ts, msgcnt, msgoffset, msgbuf, msglen);
};

/// ... here we are missing a layer to arbitrage between the two UDP
// message sources, something like ...
// auto sequencing_layer = [&itch_decoding_layer](...) {};
// TODO() - we need to refactor the mold_udp_channel class to
// support multiple input sockets and to handle out-of-order,
// duplicate, and gaps in the message stream.
auto primary = create_udp_channel(io, process_buffer, cfg.primary());
auto data_source_layer =
create_udp_channel(io, itch_decoding_layer, cfg.primary());

// ... that was it for the critical data path. There are several
// TODO() entries there ...

// In this section we create the control and monitoring path for the
// application. The control and monitoring path is implemented by a
// HTTP server that responds to simple GET requests. Adding new
// control methods is easy, as we will see ...
// TODO() - this should be refactored to a "application" class, they
// are very repetitive. We need to solve the counter problem first.
using endpoint = boost::asio::ip::tcp::endpoint;
using address = boost::asio::ip::address;
endpoint ep{address::from_string(cfg.control_host()), cfg.control_port()};

// ... create a dispatcher to process the HTTP requests, register
// some basic handlers ...
// ... the request and response types are simple in-memory strings,
// this is suitable for our purposes as the payloads will generally
// be small ...
using jb::ehs::request_type;
using jb::ehs::response_type;
// ... this object keeps track of all the handlers for each "path"
// in the HTTP request. The application registers a number of
// handlers for monitoring and control ...
auto dispatcher = std::make_shared<jb::ehs::request_dispatcher>("moldreplay");
// ... the root serves simply as a indication that the server is
// running ...
dispatcher->add_handler("/", [](request_type const&, response_type& res) {
res.fields.insert("Content-type", "text/plain");
res.body = "Server running...\r\n";
});
// ... this prints out the system configuration (command-line
// parameters and the YAML configuration file), in YAML format ...
dispatcher->add_handler(
"/config", [cfg](request_type const&, response_type& res) {
res.fields.insert("Content-type", "text/plain");
Expand All @@ -125,6 +253,13 @@ int main(int argc, char* argv[]) try {
});
// ... we need to use a weak_ptr to avoid a cycle of shared_ptr ...
std::weak_ptr<jb::ehs::request_dispatcher> disp = dispatcher;
// ... this handler collects the metrics and reports them in human
// readable form ...
// TODO() - we need a separate handler to serve the metrics in
// protobuf form for efficiency
// TODO() - once we solve the counter problem we should show the
// counter values here, not just whatever the dispatcher collects
// about itself
dispatcher->add_handler(
"/metrics", [disp](request_type const&, response_type& res) {
std::shared_ptr<jb::ehs::request_dispatcher> d(disp);
Expand Down Expand Up @@ -182,7 +317,9 @@ config::config()
desc("primary"), this, jb::itch5::udp_receiver_config()
.address(defaults::mold_address)
.port(defaults::mold_port))
, secondary(desc("secondary"), this)
, secondary(
desc("secondary"), this,
jb::itch5::udp_receiver_config().address(defaults::mold_address))
, output_file(
desc("output-file")
.help(
Expand Down Expand Up @@ -229,6 +366,16 @@ void config::validate() const {
"Either the primary or secondary receiving address must be configured.",
1);
}
int cnt = 0;
for (auto const& outcfg : output()) {
if ((outcfg.port() != 0 and outcfg.address() == "") or
(outcfg.port() == 0 and outcfg.address() != "")) {
std::ostringstream os;
os << "Partially configured output socket #" << cnt << " ("
<< outcfg.address() << " / " << outcfg.port();
throw jb::usage(os.str(), 1);
}
}
log().validate();
}

Expand Down

0 comments on commit 95c6728

Please sign in to comment.