Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
321 lines (275 sloc) 9.71 KB
use std::cmp::{max, min};
use std::collections::hash_map::{DefaultHasher, HashMap};
use std::fs::File;
use std::hash::Hasher;
use std::io::Error;
use std::io::{BufRead, Read};
use std::path::Path;
use std::str::from_utf8_unchecked;
use std::time::{Instant, SystemTime};
use clap::{App, Arg};
use hdrhistogram::Histogram;
use nom::{bytes::complete::take_until, IResult};
use crate::iex::{IexParser, IexPayload};
// Cap'n Proto and Flatbuffers typically ask that you generate code on the fly to match
// the schemas. For auto-complete and easy browsing in the repository, we instead
// generate the code ahead of time and copy it into the src/ tree.
pub mod marketdata_capnp;
#[allow(unused_imports)]
pub mod marketdata_generated; // Flatbuffers
#[allow(dead_code)]
pub mod marketdata_sbe;
mod capnp_runner;
mod flatbuffers_runner;
mod iex;
mod parsers;
mod sbe_runner;
/// Entry point: runs every serialization format over one IEX DEEP capture.
///
/// The file named by `-f/--file` is read fully into memory, then `run_analysis`
/// is executed four times: Cap'n Proto unpacked, Cap'n Proto packed,
/// Flatbuffers and SBE. After each run the wall-clock total (whole seconds)
/// and the per-message timing statistics are printed.
///
/// # Panics
///
/// Panics if the input file cannot be opened or read, or if the summary
/// statistics of one format differ from those of the previous format.
fn main() {
    let matches = App::new("Marketdata Shootout")
        .arg(
            Arg::with_name("file")
                .short("f")
                .long("file")
                .value_name("FILE")
                .help("IEX DEEP file to process")
                .required(true)
                .takes_value(true),
        )
        .get_matches();

    // `file` is declared `required(true)`, so a value is always present here.
    let deep = matches.value_of("file").unwrap();
    let path = Path::new(deep);

    // `unwrap_or_else` only builds the message on failure; the resulting panic text
    // matches what `expect` would have produced.
    let mut file = File::open(path)
        .unwrap_or_else(|e| panic!("Unable to open file={}: {:?}", path.display(), e));
    let mut buf = Vec::new();
    file.read_to_end(&mut buf)
        .unwrap_or_else(|e| panic!("Unable to read file={}: {:?}", path.display(), e));

    // `Instant` is monotonic, so measuring elapsed time cannot fail if the system
    // clock is adjusted while a run is in progress.
    let analysis_start = Instant::now();
    let capnp_unpacked = run_analysis(
        &buf,
        &mut capnp_runner::CapnpWriter::new(false),
        &mut capnp_runner::CapnpReader::new(false),
    );
    let total_secs = analysis_start.elapsed().as_secs();
    println!("Cap'n Proto Unpacked total time={}s", total_secs);
    println!("Cap'n Proto Unpacked:\n{}\n", capnp_unpacked.timing_stats());

    let analysis_start = Instant::now();
    let capnp_packed = run_analysis(
        &buf,
        &mut capnp_runner::CapnpWriter::new(true),
        &mut capnp_runner::CapnpReader::new(true),
    );
    let total_secs = analysis_start.elapsed().as_secs();
    println!("Cap'n Proto Packed total time={}s", total_secs);
    // Every format must arrive at the same per-symbol summary.
    assert_eq!(capnp_unpacked.summary_stats, capnp_packed.summary_stats);
    println!("Cap'n Proto Packed:\n{}\n", capnp_packed.timing_stats());

    let analysis_start = Instant::now();
    let flatbuffers = run_analysis(
        &buf,
        &mut flatbuffers_runner::FlatbuffersWriter::new(),
        &mut flatbuffers_runner::FlatbuffersReader::new(),
    );
    let total_secs = analysis_start.elapsed().as_secs();
    println!("Flatbuffers total time={}s", total_secs);
    assert_eq!(capnp_packed.summary_stats, flatbuffers.summary_stats);
    println!("Flatbuffers:\n{}\n", flatbuffers.timing_stats());

    let analysis_start = Instant::now();
    let sbe = run_analysis(
        &buf,
        &mut sbe_runner::SBEWriter::new(),
        &mut sbe_runner::SBEReader::new(),
    );
    let total_secs = analysis_start.elapsed().as_secs();
    println!("SBE total time={}s", total_secs);
    assert_eq!(flatbuffers.summary_stats, sbe.summary_stats);
    println!("SBE:\n{}\n", sbe.timing_stats());
}
/// Running totals for a single symbol.
#[derive(Debug, PartialEq)]
pub struct SummaryStats {
    /// Symbol text the entry was created for.
    symbol: String,
    /// Sum of every volume passed to `append_trade_volume`.
    trade_volume: u64,
    /// Highest buy-side price seen; starts at 0.
    bid_high: u64,
    /// Lowest buy-side price seen; starts at `u64::max_value()` until the first quote.
    bid_low: u64,
    /// Highest sell-side price seen; starts at 0.
    ask_high: u64,
    /// Lowest sell-side price seen; starts at `u64::max_value()` until the first quote.
    ask_low: u64,
}

/// Accumulates [`SummaryStats`] per symbol.
///
/// Entries are keyed by a hash of the symbol rather than the symbol itself, so a
/// lookup never has to build an owned key.
// NOTE(review): because only the 64-bit hash is used as the key, two different symbols
// whose hashes collide would share one entry.
#[derive(Default, Debug, PartialEq)]
pub struct Summarizer {
    data: HashMap<u64, SummaryStats>,
}

impl Summarizer {
    /// Returns the stats entry for `sym`, creating an empty one on first use.
    fn entry(&mut self, sym: &str) -> &mut SummaryStats {
        let mut hasher = DefaultHasher::new();
        hasher.write(sym.as_bytes());
        // `or_insert_with` defers building the default (and allocating the symbol
        // `String`) to the case where the entry is actually missing.
        self.data
            .entry(hasher.finish())
            .or_insert_with(|| SummaryStats {
                symbol: sym.to_string(),
                trade_volume: 0,
                bid_high: 0,
                bid_low: u64::max_value(),
                ask_high: 0,
                ask_low: u64::max_value(),
            })
    }

    /// Adds `volume` to the traded volume recorded for `sym`.
    pub fn append_trade_volume(&mut self, sym: &str, volume: u64) {
        self.entry(sym).trade_volume += volume;
    }

    /// Folds a quote `price` into the bid range (`is_buy == true`) or the ask
    /// range (`is_buy == false`) of `sym`.
    pub fn update_quote_prices(&mut self, sym: &str, price: u64, is_buy: bool) {
        let entry = self.entry(sym);
        if is_buy {
            entry.bid_low = min(entry.bid_low, price);
            entry.bid_high = max(entry.bid_high, price);
        } else {
            entry.ask_low = min(entry.ask_low, price);
            entry.ask_high = max(entry.ask_high, price);
        }
    }
}
/// An owned byte buffer with a read cursor, usable as both `Read` and `BufRead`.
pub struct StreamVec {
    /// Offset of the next unread byte in `inner`.
    pos: usize,
    /// The complete backing data.
    inner: Vec<u8>,
}

impl StreamVec {
    /// Wraps `buf` with the cursor positioned at its first byte.
    pub fn new(buf: Vec<u8>) -> StreamVec {
        StreamVec { pos: 0, inner: buf }
    }

    /// Bytes that have not been read or consumed yet; empty once the cursor is at
    /// (or somehow beyond) the end of the data.
    fn remaining(&self) -> &[u8] {
        let start = self.pos.min(self.inner.len());
        &self.inner[start..]
    }
}

impl Read for StreamVec {
    /// Copies as many unread bytes as fit into `buf` and advances the cursor.
    ///
    /// Returns `Ok(0)` when no data is left or `buf` is empty; never returns an error.
    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
        let remaining = self.remaining();
        let read_size = remaining.len().min(buf.len());
        buf[..read_size].copy_from_slice(&remaining[..read_size]);
        self.pos += read_size;
        Ok(read_size)
    }
}

impl BufRead for StreamVec {
    /// Exposes all unread bytes without copying; the cursor is not moved.
    fn fill_buf(&mut self) -> Result<&[u8], Error> {
        Ok(self.remaining())
    }

    /// Marks `amt` bytes as read.
    ///
    /// An `amt` larger than the unread data moves the cursor to the end instead of
    /// past it, so later `read`/`fill_buf` calls report end-of-data rather than panic.
    fn consume(&mut self, amt: usize) {
        self.pos = self.pos.saturating_add(amt).min(self.inner.len());
    }
}
/// Serialization side of a format under test.
trait RunnerSerialize {
/// Encodes `payload` into `output`.
///
/// `run_analysis` hands the same `output` vector to every call and compares its length
/// before and after, so a call that has nothing to write leaves `output` unchanged.
fn serialize(&mut self, payload: &IexPayload, output: &mut Vec<u8>);
}
/// Deserialization side of a format under test.
trait RunnerDeserialize {
/// Decodes data from `buf` and records what it found in `stats`.
///
/// `run_analysis` calls this repeatedly on the same `buf`, counts every `Ok(())` as one
/// parsed message and stops at the first `Err(())`; the error carries no detail.
// NOTE(review): implementations presumably return `Err` once `buf` is exhausted — confirm
// against the runner modules.
fn deserialize<'a>(&self, buf: &'a mut StreamVec, stats: &mut Summarizer) -> Result<(), ()>;
}
/// Applies nom's `take_until(tag)` to a byte slice.
///
/// The concrete signature fixes the input type to `&[u8]` and the error type to
/// `IResult`'s default, so callers do not need type annotations.
fn __take_until<'a>(tag: &'static str, input: &'a [u8]) -> IResult<&'a [u8], &'a [u8]> {
    let until_tag = take_until(tag);
    until_tag(input)
}
/// Extracts the symbol text from a fixed-width, space-padded 8-byte field.
///
/// Returns everything before the first ASCII space. A field that contains no
/// space (a symbol using all eight bytes) is returned whole, and a field that
/// starts with a space yields an empty string.
fn parse_symbol(sym: &[u8; 8]) -> &str {
    // TODO: Use the `jetscii` library for all that SIMD goodness
    let end = sym.iter().position(|&b| b == b' ').unwrap_or(sym.len());
    let sym_bytes = &sym[..end];

    debug_assert!(sym_bytes.is_ascii(), "symbol field is not ASCII");
    // SAFETY: IEX guarantees ASCII symbols, and ASCII is always valid UTF-8. The
    // debug assertion above catches violations of that guarantee in test builds.
    unsafe { from_utf8_unchecked(sym_bytes) }
}
/// Everything `run_analysis` measured for one serializer/deserializer pair.
struct RunAnalysis {
/// Duration in nanoseconds of each individual `serialize` call.
serialize_hist: Histogram<u64>,
/// Duration in nanoseconds of each `deserialize` call that returned `Ok`.
deserialize_hist: Histogram<u64>,
/// Per-symbol statistics gathered by the deserializer; `main` compares these across formats.
summary_stats: Summarizer,
/// Sum of all `serialize` call durations, in nanoseconds.
serialize_total_nanos: u128,
/// Sum of the successful `deserialize` call durations, in nanoseconds.
deserialize_total_nanos: u128,
/// Length in bytes of the output buffer once every payload had been serialized.
buf_len: usize,
}
impl RunAnalysis {
    /// Renders the measurements as a multi-line, human-readable report.
    ///
    /// The report lists the 0.5, 0.99 and 0.999 quantiles of the serialize and
    /// deserialize histograms, both nanosecond totals, and the number of bytes
    /// written. The final line has no trailing newline.
    fn timing_stats(&self) -> String {
        let ser = &self.serialize_hist;
        let de = &self.deserialize_hist;

        // Look the quantiles up first so the format call below stays readable.
        let ser_p50 = ser.value_at_quantile(0.5);
        let ser_p99 = ser.value_at_quantile(0.99);
        let ser_p999 = ser.value_at_quantile(0.999);
        let de_p50 = de.value_at_quantile(0.5);
        let de_p99 = de.value_at_quantile(0.99);
        let de_p999 = de.value_at_quantile(0.999);

        format!(
            concat!(
                " serialize_50={}ns\n",
                " serialize_99={}ns\n",
                " serialize_999={}ns\n",
                " deserialize_50={}ns\n",
                " deserialize_99={}ns\n",
                " deserialize_999={}ns\n",
                " serialize_total={}ns\n",
                " deserialize_total={}ns\n",
                " write_len={}b"
            ),
            ser_p50,
            ser_p99,
            ser_p999,
            de_p50,
            de_p99,
            de_p999,
            self.serialize_total_nanos,
            self.deserialize_total_nanos,
            self.buf_len
        )
    }
}
/// Round-trips an IEX capture through one format, timing every call.
///
/// Phase one feeds each payload produced by `IexParser` to `serializer`, which
/// writes into a single shared buffer. Phase two hands that buffer to
/// `deserializer` repeatedly until it reports an error, collecting per-symbol
/// statistics along the way.
///
/// # Panics
///
/// Panics if the number of payloads that produced output differs from the
/// number of messages successfully read back, or if a histogram cannot be
/// created or rejects a sample.
fn run_analysis<S, D>(iex_data: &Vec<u8>, serializer: &mut S, deserializer: &mut D) -> RunAnalysis
where
    S: RunnerSerialize,
    D: RunnerDeserialize,
{
    // ---- Phase one: serialize every payload into one buffer ----
    let mut output_buf = Vec::with_capacity(iex_data.len());

    // As things stand, the histogram could reallocate, but because that happens outside
    // the measurement critical path, not too worried.
    let mut serialize_hist = Histogram::<u64>::new(2).unwrap();
    let mut serialize_nanos_total = 0u128;
    let mut serialize_msgs = 0;

    for iex_payload in IexParser::new(iex_data) {
        let len_before = output_buf.len();

        let timer = Instant::now();
        serializer.serialize(&iex_payload, &mut output_buf);
        let nanos = timer.elapsed().as_nanos();

        serialize_hist.record(nanos as u64).unwrap();
        serialize_nanos_total += nanos;

        // If the IEX payload is made up of messages we don't care about
        // (a multi-message containing nothing but SystemEvent for example),
        // Cap'n Proto doesn't write anything into the output buffer.
        // As such, only count the payload when something was written
        // so that the read/write counts line up.
        let written = output_buf.len() - len_before;
        if written > 0 {
            serialize_msgs += 1;
        }
    }

    // ---- Phase two: read everything back ----
    let buf_len = output_buf.len();
    let mut read_buf = StreamVec::new(output_buf);
    let mut summary_stats = Summarizer::default();
    let mut deserialize_hist = Histogram::<u64>::new(2).unwrap();
    let mut deserialize_nanos_total = 0u128;
    let mut parsed_msgs = 0usize;

    loop {
        let timer = Instant::now();
        let outcome = deserializer.deserialize(&mut read_buf, &mut summary_stats);
        let nanos = timer.elapsed().as_nanos();

        // The first failed call ends the run; its duration is not recorded.
        if outcome.is_err() {
            break;
        }

        deserialize_hist.record(nanos as u64).unwrap();
        deserialize_nanos_total += nanos;
        parsed_msgs += 1;
    }

    assert_eq!(serialize_msgs, parsed_msgs);

    RunAnalysis {
        serialize_hist,
        deserialize_hist,
        summary_stats,
        serialize_total_nanos: serialize_nanos_total,
        deserialize_total_nanos: deserialize_nanos_total,
        buf_len,
    }
}
You can’t perform that action at this time.