From 7c0d86cbcecadb67fce623b8e6218154d1f6d04e Mon Sep 17 00:00:00 2001 From: Carter Green Date: Tue, 10 Mar 2026 16:55:23 -0500 Subject: [PATCH 1/5] MOD: Change skip repr to match variant name --- CHANGELOG.md | 5 +++++ src/enums.cpp | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 405669b..73257f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ # Changelog +## 0.52.0 - TBD + +### Enhancements +- Changed `SlowReaderBehavior::Skip` to send "skip" instead of "drop" to the gateway + ## 0.51.0 - 2026-03-17 ### Enhancements diff --git a/src/enums.cpp b/src/enums.cpp index 8fd3148..d3ab534 100644 --- a/src/enums.cpp +++ b/src/enums.cpp @@ -114,7 +114,7 @@ const char* ToString(SlowReaderBehavior slow_reader_behavior) { return "warn"; } case SlowReaderBehavior::Skip: { - return "drop"; + return "skip"; } default: { return "Unknown"; From 21ddca3ea976897a3d51d863104340648679d146 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Fri, 13 Mar 2026 15:06:20 -0500 Subject: [PATCH 2/5] ADD: Add pull-based TimeseriesGetRange overloads --- CHANGELOG.md | 6 ++ CMakeLists.txt | 2 +- README.md | 22 ++--- cmake/SourcesAndHeaders.cmake | 7 +- examples/historical/readme.cpp | 18 ++-- examples/historical/timeseries_get_range.cpp | 15 ++- .../timeseries_get_range_to_file.cpp | 11 +-- include/databento/dbn_file_store.hpp | 39 +------- include/databento/dbn_store.hpp | 43 ++++++++ include/databento/detail/http_client.hpp | 4 + .../databento/detail/http_stream_reader.hpp | 26 +++++ include/databento/historical.hpp | 73 +++++++++----- src/batch.cpp | 16 +-- src/datetime.cpp | 10 +- src/dbn.cpp | 24 +++-- src/{dbn_file_store.cpp => dbn_store.cpp} | 27 +++--- src/detail/buffer.cpp | 2 +- src/detail/dbn_buffer_decoder.cpp | 2 +- src/detail/http_client.cpp | 41 +++++++- src/detail/http_stream_reader.cpp | 44 +++++++++ src/detail/http_stream_reader.hpp | 26 +++++ src/{ => detail}/stream_op_helper.hpp | 4 +- src/flag_set.cpp | 4 +- src/historical.cpp | 64 ++++++++++-- src/log.cpp | 4 +- src/metadata.cpp | 24 ++--- src/record.cpp | 97 +++++++++++-------- src/symbology.cpp | 16 +-- src/v1.cpp | 26 ++--- src/v2.cpp | 6 +- tests/include/mock/mock_http_server.hpp | 5 +- tests/src/historical_tests.cpp | 54 +++++++++++ tests/src/stream_op_helper_tests.cpp | 6 +- 33 files changed, 539 insertions(+), 229 deletions(-) create mode 100644 include/databento/dbn_store.hpp create mode 100644 include/databento/detail/http_stream_reader.hpp rename src/{dbn_file_store.cpp => dbn_store.cpp} (51%) create mode 100644 src/detail/http_stream_reader.cpp create mode 100644 src/detail/http_stream_reader.hpp rename src/{ => detail}/stream_op_helper.hpp (98%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73257f7..72792da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,12 @@ ### Enhancements - Added support for `progress` field in `BatchJob` response +- Added blocking (pull-based) `TimeseriesGetRange` overloads that return a `DbnStore` + with `GetMetadata()` and `NextRecord()` methods, unifying the consumption pattern + across historical, live, and file sources +- Generalized `DbnFileStore` into `DbnStore` which accepts any `IReadable` source. + `DbnFileStore` is retained as a type alias for backwards compatibility +- Upgraded default `httplib` version to 0.37.2 ## 0.50.0 - 2026-03-03 diff --git a/CMakeLists.txt b/CMakeLists.txt index 06cedcc..01373d6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -178,7 +178,7 @@ if(${PROJECT_NAME_UPPERCASE}_USE_EXTERNAL_HTTPLIB) find_package(httplib REQUIRED) endif() else() - set(httplib_version 0.30.1) + set(httplib_version 0.37.2) FetchContent_Declare( httplib URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v${httplib_version}.tar.gz diff --git a/README.md b/README.md index d9c8410..4fbe1fc 100644 --- a/README.md +++ b/README.md @@ -138,19 +138,15 @@ namespace db = databento; int main() { auto client = db::Historical::Builder().SetKey("$YOUR_API_KEY").Build(); - db::TsSymbolMap symbol_map; - auto decode_symbols = [&symbol_map](const db::Metadata& metadata) { - symbol_map = metadata.CreateSymbolMap(); - }; - auto print_trades = [&symbol_map](const db::Record& record) { - const auto& trade_msg = record.Get(); - std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " << trade_msg - << '\n'; - return db::KeepGoing::Continue; - }; - client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"}, - {"ESM2", "NQZ2"}, db::Schema::Trades, db::SType::RawSymbol, - db::SType::InstrumentId, {}, decode_symbols, print_trades); + auto store = client.TimeseriesGetRange( + "GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"}, + {"ESM2", "NQZ2"}, db::Schema::Trades); + auto symbol_map = store.GetMetadata().CreateSymbolMap(); + while (const auto* record = store.NextRecord()) { + const auto& trade_msg = record->Get(); + std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " + << trade_msg << '\n'; + } } ``` diff --git a/cmake/SourcesAndHeaders.cmake b/cmake/SourcesAndHeaders.cmake index bbe24ca..a7b917f 100644 --- a/cmake/SourcesAndHeaders.cmake +++ b/cmake/SourcesAndHeaders.cmake @@ -7,6 +7,7 @@ set(headers include/databento/dbn_decoder.hpp include/databento/dbn_encoder.hpp include/databento/dbn_file_store.hpp + include/databento/dbn_store.hpp include/databento/detail/buffer.hpp include/databento/detail/dbn_buffer_decoder.hpp include/databento/detail/http_client.hpp @@ -39,7 +40,8 @@ set(headers include/databento/v2.hpp include/databento/v3.hpp include/databento/with_ts_out.hpp - src/stream_op_helper.hpp + src/detail/http_stream_reader.hpp + src/detail/stream_op_helper.hpp ) set(sources @@ -49,10 +51,11 @@ set(sources src/dbn_constants.hpp src/dbn_decoder.cpp src/dbn_encoder.cpp - src/dbn_file_store.cpp + src/dbn_store.cpp src/detail/buffer.cpp src/detail/dbn_buffer_decoder.cpp src/detail/http_client.cpp + src/detail/http_stream_reader.cpp src/detail/json_helpers.cpp src/detail/live_connection.cpp src/detail/scoped_fd.cpp diff --git a/examples/historical/readme.cpp b/examples/historical/readme.cpp index cdad594..d1ed66d 100644 --- a/examples/historical/readme.cpp +++ b/examples/historical/readme.cpp @@ -9,17 +9,13 @@ namespace db = databento; int main() { auto client = db::Historical::Builder().SetKey("$YOUR_API_KEY").Build(); - db::TsSymbolMap symbol_map; - auto decode_symbols = [&symbol_map](const db::Metadata& metadata) { - symbol_map = metadata.CreateSymbolMap(); - }; - auto print_trades = [&symbol_map](const db::Record& record) { - const auto& trade_msg = record.Get(); + auto store = + client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"}, + {"ESM2", "NQZ2"}, db::Schema::Trades); + auto symbol_map = store.GetMetadata().CreateSymbolMap(); + while (const auto* record = store.NextRecord()) { + const auto& trade_msg = record->Get(); std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " << trade_msg << '\n'; - return db::KeepGoing::Continue; - }; - client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"}, - {"ESM2", "NQZ2"}, db::Schema::Trades, db::SType::RawSymbol, - db::SType::InstrumentId, {}, decode_symbols, print_trades); + } } diff --git a/examples/historical/timeseries_get_range.cpp b/examples/historical/timeseries_get_range.cpp index 1ee9fe9..602854c 100644 --- a/examples/historical/timeseries_get_range.cpp +++ b/examples/historical/timeseries_get_range.cpp @@ -9,15 +9,14 @@ namespace db = databento; int main() { auto client = db::Historical::Builder().SetKeyFromEnv().Build(); const auto limit = 1000; - client.TimeseriesGetRange( + auto store = client.TimeseriesGetRange( db::dataset::kGlbxMdp3, db::DateTimeRange{"2022-10-03"}, {"ESZ2"}, - db::Schema::Trades, db::SType::RawSymbol, db::SType::InstrumentId, limit, - [](db::Metadata&& metadata) { std::cout << metadata << '\n'; }, - [](const db::Record& record) { - const auto& trade_msg = record.Get(); - std::cout << trade_msg << '\n'; - return db::KeepGoing::Continue; - }); + db::Schema::Trades, db::SType::RawSymbol, db::SType::InstrumentId, limit); + std::cout << store.GetMetadata() << '\n'; + while (const auto* record = store.NextRecord()) { + const auto& trade_msg = record->Get(); + std::cout << trade_msg << '\n'; + } return 0; } diff --git a/examples/historical/timeseries_get_range_to_file.cpp b/examples/historical/timeseries_get_range_to_file.cpp index 5165b0d..ffea4e3 100644 --- a/examples/historical/timeseries_get_range_to_file.cpp +++ b/examples/historical/timeseries_get_range_to_file.cpp @@ -1,6 +1,6 @@ #include "databento/constants.hpp" #include "databento/datetime.hpp" -#include "databento/dbn_file_store.hpp" +#include "databento/dbn_store.hpp" #include "databento/enums.hpp" #include "databento/historical.hpp" #include "databento/record.hpp" @@ -10,14 +10,13 @@ namespace db = databento; int main() { auto client = db::Historical::Builder().SetKeyFromEnv().Build(); const auto limit = 1000; - db::DbnFileStore dbn_file_store = client.TimeseriesGetRangeToFile( + db::DbnStore dbn_store = client.TimeseriesGetRangeToFile( db::dataset::kGlbxMdp3, {"2022-10-03T00:00", "2022-10-04T00:00"}, {"ESZ2"}, db::Schema::Ohlcv1M, db::SType::RawSymbol, db::SType::InstrumentId, limit, "ESZ2-ohlcv1m-20201003-20201004.dbn.zst"); - dbn_file_store.Replay([](const db::Record record) { - const auto& ohlcv_bar = record.Get(); + while (const auto* record = dbn_store.NextRecord()) { + const auto& ohlcv_bar = record->Get(); std::cout << ohlcv_bar << '\n'; - return db::KeepGoing::Continue; - }); + } return 0; } diff --git a/include/databento/dbn_file_store.hpp b/include/databento/dbn_file_store.hpp index 935cf64..e2bd085 100644 --- a/include/databento/dbn_file_store.hpp +++ b/include/databento/dbn_file_store.hpp @@ -1,40 +1,7 @@ #pragma once -#include // path - -#include "databento/dbn.hpp" // DecodeMetadata -#include "databento/dbn_decoder.hpp" // DbnDecoder -#include "databento/enums.hpp" // VersionUpgradePolicy -#include "databento/log.hpp" -#include "databento/record.hpp" -#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback +#include "databento/dbn_store.hpp" namespace databento { -// A reader for DBN files. This class provides both a callback API similar to -// TimeseriesGetRange in historical data and LiveThreaded for live data as well -// as a blocking API similar to that of LiveBlocking. Only one API should be -// used on a given instance. -class DbnFileStore { - public: - explicit DbnFileStore(const std::filesystem::path& file_path); - DbnFileStore(ILogReceiver* log_receiver, const std::filesystem::path& file_path, - VersionUpgradePolicy upgrade_policy); - - // Callback API: calling Replay consumes the input. - void Replay(const MetadataCallback& metadata_callback, - const RecordCallback& record_callback); - void Replay(const RecordCallback& record_callback); - - // Blocking API - const Metadata& GetMetadata(); - // Returns the next record or `nullptr` if there are no remaining records. - const Record* NextRecord(); - - private: - void MaybeDecodeMetadata(); - - DbnDecoder decoder_; - Metadata metadata_{}; - bool has_decoded_metadata_{false}; -}; -} // namespace databento +using DbnFileStore = DbnStore; +} diff --git a/include/databento/dbn_store.hpp b/include/databento/dbn_store.hpp new file mode 100644 index 0000000..05681b6 --- /dev/null +++ b/include/databento/dbn_store.hpp @@ -0,0 +1,43 @@ +#pragma once + +#include // path +#include // unique_ptr + +#include "databento/dbn.hpp" // DecodeMetadata +#include "databento/dbn_decoder.hpp" // DbnDecoder +#include "databento/enums.hpp" // VersionUpgradePolicy +#include "databento/ireadable.hpp" +#include "databento/log.hpp" +#include "databento/record.hpp" +#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback + +namespace databento { +// A reader for DBN data from files or streams. This class provides both a callback API +// similar to `LiveThreaded` for live data as well as a blocking API similar to that of +// `LiveBlocking`. Only one API should be used on a given instance. +class DbnStore { + public: + explicit DbnStore(const std::filesystem::path& file_path); + DbnStore(ILogReceiver* log_receiver, const std::filesystem::path& file_path, + VersionUpgradePolicy upgrade_policy); + DbnStore(ILogReceiver* log_receiver, std::unique_ptr input, + VersionUpgradePolicy upgrade_policy); + + // Callback API: calling Replay consumes the input. + void Replay(const MetadataCallback& metadata_callback, + const RecordCallback& record_callback); + void Replay(const RecordCallback& record_callback); + + // Blocking API + const databento::Metadata& GetMetadata(); + // Returns the next record or `nullptr` if there are no remaining records. + const Record* NextRecord(); + + private: + void MaybeDecodeMetadata(); + + DbnDecoder decoder_; + databento::Metadata metadata_{}; + bool has_decoded_metadata_{false}; +}; +} // namespace databento diff --git a/include/databento/detail/http_client.hpp b/include/databento/detail/http_client.hpp index 0b9df4e..0b740dc 100644 --- a/include/databento/detail/http_client.hpp +++ b/include/databento/detail/http_client.hpp @@ -9,10 +9,12 @@ #include #include +#include // unique_ptr #include namespace databento { class ILogReceiver; +class IReadable; namespace detail { class HttpClient { public: @@ -27,6 +29,8 @@ class HttpClient { const httplib::ContentReceiver& callback); void PostRawStream(const std::string& path, const httplib::Params& form_params, const httplib::ContentReceiver& callback); + std::unique_ptr OpenPostStream(const std::string& path, + const httplib::Params& form_params); private: static bool IsErrorStatus(int status_code); diff --git a/include/databento/detail/http_stream_reader.hpp b/include/databento/detail/http_stream_reader.hpp new file mode 100644 index 0000000..4b3223a --- /dev/null +++ b/include/databento/detail/http_stream_reader.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include + +#include // milliseconds +#include // byte, size_t + +#include "databento/ireadable.hpp" + +namespace databento::detail { +// Adapts an httplib StreamHandle to the IReadable interface, allowing +// HTTP response bodies to be read incrementally by DbnDecoder. +class HttpStreamReader : public IReadable { + public: + explicit HttpStreamReader(httplib::ClientImpl::StreamHandle handle); + + void ReadExact(std::byte* buffer, std::size_t length) override; + std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; + // timeout is ignored; historical data is always immediately available or EOF + Result ReadSome(std::byte* buffer, std::size_t max_length, + std::chrono::milliseconds timeout) override; + + private: + httplib::ClientImpl::StreamHandle handle_; +}; +} // namespace databento::detail diff --git a/include/databento/historical.hpp b/include/databento/historical.hpp index d0a4850..9c05cfd 100644 --- a/include/databento/historical.hpp +++ b/include/databento/historical.hpp @@ -7,9 +7,9 @@ #include #include -#include "databento/batch.hpp" // BatchJob -#include "databento/datetime.hpp" // DateRange, DateTimeRange, UnixNanos -#include "databento/dbn_file_store.hpp" +#include "databento/batch.hpp" // BatchJob +#include "databento/datetime.hpp" // DateRange, DateTimeRange, UnixNanos +#include "databento/dbn_store.hpp" // DbnStore #include "databento/detail/http_client.hpp" // HttpClient #include "databento/enums.hpp" // BatchState, Delivery, DurationInterval, Schema, SType, VersionUpgradePolicy #include "databento/metadata.hpp" // DatasetConditionDetail, DatasetRange, FieldDetail, PublisherDetail, UnitPricesForMode @@ -188,31 +188,53 @@ class Historical { SType stype_in, SType stype_out, std::uint64_t limit, const MetadataCallback& metadata_callback, const RecordCallback& record_callback); - // Stream historical market data to a file at `path`. Returns a `DbnFileStore` + // Stream historical market data and return a `DbnStore` that can be consumed + // using `Metadata()` / `NextRecord()`. + // + // WARNING: Calling this method will incur a cost. + DbnStore TimeseriesGetRange(const std::string& dataset, + const DateTimeRange& datetime_range, + const std::vector& symbols, Schema schema); + DbnStore TimeseriesGetRange(const std::string& dataset, + const DateTimeRange& datetime_range, + const std::vector& symbols, Schema schema); + DbnStore TimeseriesGetRange(const std::string& dataset, + const DateTimeRange& datetime_range, + const std::vector& symbols, Schema schema, + SType stype_in, SType stype_out, std::uint64_t limit); + DbnStore TimeseriesGetRange(const std::string& dataset, + const DateTimeRange& datetime_range, + const std::vector& symbols, Schema schema, + SType stype_in, SType stype_out, std::uint64_t limit); + + // Stream historical market data to a file at `path`. Returns a `DbnStore` // object for replaying the data in `file_path`. // // If a file at `file_path` already exists, it will be overwritten. // // WARNING: Calling this method will incur a cost. - DbnFileStore TimeseriesGetRangeToFile(const std::string& dataset, - const DateTimeRange& datetime_range, - const std::vector& symbols, - Schema schema, - const std::filesystem::path& file_path); - DbnFileStore TimeseriesGetRangeToFile( - const std::string& dataset, const DateTimeRange& datetime_range, - const std::vector& symbols, Schema schema, - const std::filesystem::path& file_path); - DbnFileStore TimeseriesGetRangeToFile(const std::string& dataset, - const DateTimeRange& datetime_range, - const std::vector& symbols, - Schema schema, SType stype_in, SType stype_out, - std::uint64_t limit, - const std::filesystem::path& file_path); - DbnFileStore TimeseriesGetRangeToFile( - const std::string& dataset, const DateTimeRange& datetime_range, - const std::vector& symbols, Schema schema, SType stype_in, - SType stype_out, std::uint64_t limit, const std::filesystem::path& file_path); + DbnStore TimeseriesGetRangeToFile(const std::string& dataset, + const DateTimeRange& datetime_range, + const std::vector& symbols, + Schema schema, + const std::filesystem::path& file_path); + DbnStore TimeseriesGetRangeToFile(const std::string& dataset, + const DateTimeRange& datetime_range, + const std::vector& symbols, + Schema schema, + const std::filesystem::path& file_path); + DbnStore TimeseriesGetRangeToFile(const std::string& dataset, + const DateTimeRange& datetime_range, + const std::vector& symbols, + Schema schema, SType stype_in, SType stype_out, + std::uint64_t limit, + const std::filesystem::path& file_path); + DbnStore TimeseriesGetRangeToFile(const std::string& dataset, + const DateTimeRange& datetime_range, + const std::vector& symbols, + Schema schema, SType stype_in, SType stype_out, + std::uint64_t limit, + const std::filesystem::path& file_path); private: friend HistoricalBuilder; @@ -237,8 +259,9 @@ class Historical { void TimeseriesGetRange(const HttplibParams& params, const MetadataCallback& metadata_callback, const RecordCallback& record_callback); - DbnFileStore TimeseriesGetRangeToFile(const HttplibParams& params, - const std::filesystem::path& file_path); + DbnStore TimeseriesGetRange(const HttplibParams& params); + DbnStore TimeseriesGetRangeToFile(const HttplibParams& params, + const std::filesystem::path& file_path); ILogReceiver* log_receiver_; const std::string key_; diff --git a/src/batch.cpp b/src/batch.cpp index 75c529b..2f6f303 100644 --- a/src/batch.cpp +++ b/src/batch.cpp @@ -2,18 +2,20 @@ #include -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" namespace databento { -std::string ToString(const BatchJob& batch_job) { return MakeString(batch_job); } +std::string ToString(const BatchJob& batch_job) { + return detail::MakeString(batch_job); +} std::ostream& operator<<(std::ostream& stream, const BatchJob& batch_job) { std::ostringstream symbol_stream; - auto symbol_helper = StreamOpBuilder{symbol_stream}.SetSpacer(" ").Build(); + auto symbol_helper = detail::StreamOpBuilder{symbol_stream}.SetSpacer(" ").Build(); for (const auto& symbol : batch_job.symbols) { symbol_helper.AddItem(symbol); } - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("BatchJob") .Build() @@ -51,10 +53,12 @@ std::ostream& operator<<(std::ostream& stream, const BatchJob& batch_job) { .Finish(); } -std::string ToString(const BatchFileDesc& file_desc) { return MakeString(file_desc); } +std::string ToString(const BatchFileDesc& file_desc) { + return detail::MakeString(file_desc); +} std::ostream& operator<<(std::ostream& stream, const BatchFileDesc& file_desc) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("BatchFileDesc") .Build() diff --git a/src/datetime.cpp b/src/datetime.cpp index 36b7299..1a87d16 100644 --- a/src/datetime.cpp +++ b/src/datetime.cpp @@ -10,7 +10,7 @@ #include // ostringstream #include "databento/constants.hpp" // kUndefTimestamp -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" namespace databento { std::string ToIso8601(UnixNanos unix_nanos) { @@ -63,11 +63,11 @@ std::string DateFromIso8601Int(std::uint32_t date_int) { } std::string ToString(const DateTimeRange& dt_range) { - return MakeString(dt_range); + return detail::MakeString(dt_range); } std::ostream& operator<<(std::ostream& stream, const DateTimeRange& dt_range) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("DateTimeRange") .Build() @@ -77,11 +77,11 @@ std::ostream& operator<<(std::ostream& stream, } std::string ToString(const DateTimeRange& dt_range) { - return MakeString(dt_range); + return detail::MakeString(dt_range); } std::ostream& operator<<(std::ostream& stream, const DateTimeRange& dt_range) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("DateTimeRange") .Build() diff --git a/src/dbn.cpp b/src/dbn.cpp index 92757ae..f688755 100644 --- a/src/dbn.cpp +++ b/src/dbn.cpp @@ -5,7 +5,7 @@ #include "databento/constants.hpp" #include "databento/symbol_map.hpp" -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" namespace databento { @@ -25,9 +25,9 @@ void Metadata::Upgrade(VersionUpgradePolicy upgrade_policy) { } } -std::string ToString(const Metadata& metadata) { return MakeString(metadata); } +std::string ToString(const Metadata& metadata) { return detail::MakeString(metadata); } std::ostream& operator<<(std::ostream& stream, const Metadata& metadata) { - auto helper = StreamOpBuilder{stream} + auto helper = detail::StreamOpBuilder{stream} .SetTypeName("Metadata") .SetSpacer("\n ") .Build() @@ -50,7 +50,7 @@ std::ostream& operator<<(std::ostream& stream, const Metadata& metadata) { "not_found"}; for (std::size_t i = 0; i < kVecCount; ++i) { std::ostringstream vec_stream; - auto vec_helper = StreamOpBuilder{vec_stream}.SetSpacer(" ").Build(); + auto vec_helper = detail::StreamOpBuilder{vec_stream}.SetSpacer(" ").Build(); for (const auto& str : metadata.*(kStrVecs[i])) { vec_helper.AddItem(str); } @@ -61,7 +61,7 @@ std::ostream& operator<<(std::ostream& stream, const Metadata& metadata) { // format mappings std::ostringstream mappings; auto mappings_helper = - StreamOpBuilder{mappings}.SetIndent(" ").SetSpacer("\n ").Build(); + detail::StreamOpBuilder{mappings}.SetIndent(" ").SetSpacer("\n ").Build(); for (const auto& mapping : metadata.mappings) { mappings_helper.AddItem(mapping); } @@ -70,14 +70,16 @@ std::ostream& operator<<(std::ostream& stream, const Metadata& metadata) { .Finish(); } -std::string ToString(const SymbolMapping& mapping) { return MakeString(mapping); } +std::string ToString(const SymbolMapping& mapping) { + return detail::MakeString(mapping); +} std::ostream& operator<<(std::ostream& stream, const SymbolMapping& mapping) { std::ostringstream intervals; - auto intervals_helper = StreamOpBuilder{intervals}.SetSpacer(" ").Build(); + auto intervals_helper = detail::StreamOpBuilder{intervals}.SetSpacer(" ").Build(); for (const auto& interval : mapping.intervals) { intervals_helper.AddItem(interval); } - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("SymbolMapping") .Build() @@ -87,9 +89,11 @@ std::ostream& operator<<(std::ostream& stream, const SymbolMapping& mapping) { .Finish(); } -std::string ToString(const MappingInterval& interval) { return MakeString(interval); } +std::string ToString(const MappingInterval& interval) { + return detail::MakeString(interval); +} std::ostream& operator<<(std::ostream& stream, const MappingInterval& interval) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("MappingInterval") .Build() diff --git a/src/dbn_file_store.cpp b/src/dbn_store.cpp similarity index 51% rename from src/dbn_file_store.cpp rename to src/dbn_store.cpp index 28c211b..13a9944 100644 --- a/src/dbn_file_store.cpp +++ b/src/dbn_store.cpp @@ -1,4 +1,4 @@ -#include "databento/dbn_file_store.hpp" +#include "databento/dbn_store.hpp" #include // unique_ptr #include // move @@ -6,19 +6,22 @@ #include "databento/file_stream.hpp" #include "databento/record.hpp" -using databento::DbnFileStore; +using databento::DbnStore; -DbnFileStore::DbnFileStore(const std::filesystem::path& file_path) +DbnStore::DbnStore(const std::filesystem::path& file_path) : decoder_{ILogReceiver::Default(), InFileStream{file_path}} {} -DbnFileStore::DbnFileStore(ILogReceiver* log_receiver, - const std::filesystem::path& file_path, - VersionUpgradePolicy upgrade_policy) +DbnStore::DbnStore(ILogReceiver* log_receiver, const std::filesystem::path& file_path, + VersionUpgradePolicy upgrade_policy) : decoder_{log_receiver, std::make_unique(file_path), upgrade_policy} {} -void DbnFileStore::Replay(const MetadataCallback& metadata_callback, - const RecordCallback& record_callback) { +DbnStore::DbnStore(ILogReceiver* log_receiver, std::unique_ptr input, + VersionUpgradePolicy upgrade_policy) + : decoder_{log_receiver, std::move(input), upgrade_policy} {} + +void DbnStore::Replay(const MetadataCallback& metadata_callback, + const RecordCallback& record_callback) { auto metadata = decoder_.DecodeMetadata(); if (metadata_callback) { metadata_callback(std::move(metadata)); @@ -31,21 +34,21 @@ void DbnFileStore::Replay(const MetadataCallback& metadata_callback, } } -void DbnFileStore::Replay(const RecordCallback& record_callback) { +void DbnStore::Replay(const RecordCallback& record_callback) { Replay({}, record_callback); } -const databento::Metadata& DbnFileStore::GetMetadata() { +const databento::Metadata& DbnStore::GetMetadata() { MaybeDecodeMetadata(); return metadata_; } -const databento::Record* DbnFileStore::NextRecord() { +const databento::Record* DbnStore::NextRecord() { MaybeDecodeMetadata(); return decoder_.DecodeRecord(); } -void DbnFileStore::MaybeDecodeMetadata() { +void DbnStore::MaybeDecodeMetadata() { if (!has_decoded_metadata_) { metadata_ = decoder_.DecodeMetadata(); has_decoded_metadata_ = true; diff --git a/src/detail/buffer.cpp b/src/detail/buffer.cpp index 22dec0d..88198d4 100644 --- a/src/detail/buffer.cpp +++ b/src/detail/buffer.cpp @@ -4,7 +4,7 @@ #include #include "databento/exceptions.hpp" -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" using databento::detail::Buffer; using Status = databento::IReadable::Status; diff --git a/src/detail/dbn_buffer_decoder.cpp b/src/detail/dbn_buffer_decoder.cpp index 7693b25..15f0633 100644 --- a/src/detail/dbn_buffer_decoder.cpp +++ b/src/detail/dbn_buffer_decoder.cpp @@ -3,7 +3,7 @@ #include "databento/dbn_decoder.hpp" #include "databento/timeseries.hpp" #include "dbn_constants.hpp" -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" using databento::detail::DbnBufferDecoder; diff --git a/src/detail/http_client.cpp b/src/detail/http_client.cpp index 9d257bc..77a148c 100644 --- a/src/detail/http_client.cpp +++ b/src/detail/http_client.cpp @@ -1,11 +1,16 @@ #include "databento/detail/http_client.hpp" +#include + #include // seconds +#include // make_unique #include // ostringstream #include "databento/constants.hpp" // kUserAgent #include "databento/exceptions.hpp" // HttpResponseError, HttpRequestError, JsonResponseError -#include "databento/log.hpp" // ILogReceiver, LogLevel +#include "databento/ireadable.hpp" +#include "databento/log.hpp" // ILogReceiver, LogLevel +#include "detail/http_stream_reader.hpp" // HttpStreamReader using databento::detail::HttpClient; @@ -18,7 +23,9 @@ const httplib::Headers HttpClient::kHeaders{ HttpClient::HttpClient(databento::ILogReceiver* log_receiver, const std::string& key, const std::string& gateway) : log_receiver_{log_receiver}, client_{gateway} { - client_.set_default_headers(HttpClient::kHeaders); + auto headers = HttpClient::kHeaders; + headers.insert(httplib::make_basic_authentication_header(key, "")); + client_.set_default_headers(headers); client_.set_basic_auth(key, ""); client_.set_read_timeout(kTimeout); client_.set_write_timeout(kTimeout); @@ -27,7 +34,9 @@ HttpClient::HttpClient(databento::ILogReceiver* log_receiver, const std::string& HttpClient::HttpClient(databento::ILogReceiver* log_receiver, const std::string& key, const std::string& gateway, std::uint16_t port) : log_receiver_{log_receiver}, client_{gateway, port} { - client_.set_default_headers(HttpClient::kHeaders); + auto headers = HttpClient::kHeaders; + headers.insert(httplib::make_basic_authentication_header(key, "")); + client_.set_default_headers(headers); client_.set_basic_auth(key, ""); client_.set_read_timeout(kTimeout); client_.set_write_timeout(kTimeout); @@ -91,6 +100,32 @@ void HttpClient::PostRawStream(const std::string& path, CheckStatusAndStreamRes(path, err_status, std::move(err_body), res); } +std::unique_ptr HttpClient::OpenPostStream( + const std::string& path, const httplib::Params& form_params) { + const auto body = httplib::detail::params_to_query_str(form_params); + // NOLINTNEXTLINE(clang-analyzer-unix.BlockInCriticalSection): dependency code + auto handle = client_.open_stream("POST", path, {}, {}, body, + "application/x-www-form-urlencoded"); + if (handle.error != httplib::Error::Success) { + throw HttpRequestError{path, handle.error}; + } + if (!handle.is_valid()) { + throw HttpRequestError{path, httplib::Error::Connection}; + } + CheckWarnings(*handle.response); + if (IsErrorStatus(handle.response->status)) { + // Read the full error body + std::string err_body; + std::array buf{}; + ssize_t n; + while ((n = handle.read(buf.data(), buf.size())) > 0) { + err_body.append(buf.data(), static_cast(n)); + } + throw HttpResponseError{path, handle.response->status, std::move(err_body)}; + } + return std::make_unique(std::move(handle)); +} + httplib::ResponseHandler HttpClient::MakeStreamResponseHandler(int& out_status) { return [this, &out_status](const httplib::Response& resp) { if (HttpClient::IsErrorStatus(resp.status)) { diff --git a/src/detail/http_stream_reader.cpp b/src/detail/http_stream_reader.cpp new file mode 100644 index 0000000..edbfb30 --- /dev/null +++ b/src/detail/http_stream_reader.cpp @@ -0,0 +1,44 @@ +#include "databento/detail/http_stream_reader.hpp" + +#include // byte, size_t +#include +#include // move + +#include "databento/exceptions.hpp" + +using databento::detail::HttpStreamReader; + +HttpStreamReader::HttpStreamReader(httplib::ClientImpl::StreamHandle handle) + : handle_{std::move(handle)} {} + +void HttpStreamReader::ReadExact(std::byte* buffer, std::size_t length) { + std::size_t total_read = 0; + while (total_read < length) { + const auto n = + handle_.read(reinterpret_cast(buffer) + total_read, length - total_read); + if (n <= 0) { + std::ostringstream ss; + ss << "[HttpStreamReader::ReadExact] Expected " << length + << " bytes but only read " << total_read; + throw DbnResponseError{ss.str()}; + } + total_read += static_cast(n); + } +} + +std::size_t HttpStreamReader::ReadSome(std::byte* buffer, std::size_t max_length) { + const auto n = handle_.read(reinterpret_cast(buffer), max_length); + if (n < 0) { + return 0; + } + return static_cast(n); +} + +databento::IReadable::Result HttpStreamReader::ReadSome( + std::byte* buffer, std::size_t max_length, std::chrono::milliseconds /*timeout*/) { + const auto read_size = ReadSome(buffer, max_length); + if (read_size == 0) { + return {0, Status::Closed}; + } + return {read_size, Status::Ok}; +} diff --git a/src/detail/http_stream_reader.hpp b/src/detail/http_stream_reader.hpp new file mode 100644 index 0000000..4b3223a --- /dev/null +++ b/src/detail/http_stream_reader.hpp @@ -0,0 +1,26 @@ +#pragma once + +#include + +#include // milliseconds +#include // byte, size_t + +#include "databento/ireadable.hpp" + +namespace databento::detail { +// Adapts an httplib StreamHandle to the IReadable interface, allowing +// HTTP response bodies to be read incrementally by DbnDecoder. +class HttpStreamReader : public IReadable { + public: + explicit HttpStreamReader(httplib::ClientImpl::StreamHandle handle); + + void ReadExact(std::byte* buffer, std::size_t length) override; + std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override; + // timeout is ignored; historical data is always immediately available or EOF + Result ReadSome(std::byte* buffer, std::size_t max_length, + std::chrono::milliseconds timeout) override; + + private: + httplib::ClientImpl::StreamHandle handle_; +}; +} // namespace databento::detail diff --git a/src/stream_op_helper.hpp b/src/detail/stream_op_helper.hpp similarity index 98% rename from src/stream_op_helper.hpp rename to src/detail/stream_op_helper.hpp index 9b53823..29886e2 100644 --- a/src/stream_op_helper.hpp +++ b/src/detail/stream_op_helper.hpp @@ -14,7 +14,7 @@ #include "databento/datetime.hpp" // TimeDeltaNanos, UnixNanos -namespace databento { +namespace databento::detail { template // Helper for types that implement a stream operator std::string MakeString(const T& val) { @@ -172,4 +172,4 @@ class StreamOpBuilder { std::string type_name_; std::string spacer_; }; -} // namespace databento +} // namespace databento::detail diff --git a/src/flag_set.cpp b/src/flag_set.cpp index fd2a650..dd6539e 100644 --- a/src/flag_set.cpp +++ b/src/flag_set.cpp @@ -2,7 +2,7 @@ #include -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" namespace databento { std::ostream& operator<<(std::ostream& stream, FlagSet flag_set) { @@ -38,5 +38,5 @@ std::ostream& operator<<(std::ostream& stream, FlagSet flag_set) { return stream; } -std::string ToString(FlagSet flags) { return MakeString(flags); } +std::string ToString(FlagSet flags) { return detail::MakeString(flags); } } // namespace databento diff --git a/src/historical.cpp b/src/historical.cpp index 399ae2c..8b53d89 100644 --- a/src/historical.cpp +++ b/src/historical.cpp @@ -20,6 +20,7 @@ #include "databento/constants.hpp" #include "databento/datetime.hpp" #include "databento/dbn_file_store.hpp" +#include "databento/dbn_store.hpp" #include "databento/detail/dbn_buffer_decoder.hpp" #include "databento/detail/json_helpers.hpp" #include "databento/detail/sha256_hasher.hpp" @@ -1002,10 +1003,61 @@ void Historical::TimeseriesGetRange(const HttplibParams& params, } } +databento::DbnStore Historical::TimeseriesGetRange( + const std::string& dataset, const DateTimeRange& datetime_range, + const std::vector& symbols, Schema schema) { + return this->TimeseriesGetRange(dataset, datetime_range, symbols, schema, + kDefaultSTypeIn, kDefaultSTypeOut, {}); +} +databento::DbnStore Historical::TimeseriesGetRange( + const std::string& dataset, const DateTimeRange& datetime_range, + const std::vector& symbols, Schema schema) { + return this->TimeseriesGetRange(dataset, datetime_range, symbols, schema, + kDefaultSTypeIn, kDefaultSTypeOut, {}); +} +databento::DbnStore Historical::TimeseriesGetRange( + const std::string& dataset, const DateTimeRange& datetime_range, + const std::vector& symbols, Schema schema, SType stype_in, + SType stype_out, std::uint64_t limit) { + httplib::Params params{ + {"dataset", dataset}, + {"encoding", "dbn"}, + {"compression", "zstd"}, + {"start", ToString(datetime_range.start)}, + {"symbols", JoinSymbolStrings(kTimeseriesGetRangeEndpoint, symbols)}, + {"schema", ToString(schema)}, + {"stype_in", ToString(stype_in)}, + {"stype_out", ToString(stype_out)}}; + detail::SetIfPositive(¶ms, "end", datetime_range.end); + detail::SetIfPositive(¶ms, "limit", limit); + return this->TimeseriesGetRange(params); +} +databento::DbnStore Historical::TimeseriesGetRange( + const std::string& dataset, const DateTimeRange& datetime_range, + const std::vector& symbols, Schema schema, SType stype_in, + SType stype_out, std::uint64_t limit) { + httplib::Params params{ + {"dataset", dataset}, + {"encoding", "dbn"}, + {"compression", "zstd"}, + {"start", datetime_range.start}, + {"symbols", JoinSymbolStrings(kTimeseriesGetRangeEndpoint, symbols)}, + {"schema", ToString(schema)}, + {"stype_in", ToString(stype_in)}, + {"stype_out", ToString(stype_out)}}; + detail::SetIfNotEmpty(¶ms, "end", datetime_range.end); + detail::SetIfPositive(¶ms, "limit", limit); + return this->TimeseriesGetRange(params); +} +databento::DbnStore Historical::TimeseriesGetRange(const HttplibParams& params) { + auto stream = client_.OpenPostStream(kTimeseriesGetRangePath, params); + return DbnStore{log_receiver_, std::move(stream), upgrade_policy_}; +} + static const std::string kTimeseriesGetRangeToFileEndpoint = "Historical::TimeseriesGetRangeToFile"; -databento::DbnFileStore Historical::TimeseriesGetRangeToFile( +databento::DbnStore Historical::TimeseriesGetRangeToFile( const std::string& dataset, const DateTimeRange& datetime_range, const std::vector& symbols, Schema schema, const std::filesystem::path& file_path) { @@ -1013,7 +1065,7 @@ databento::DbnFileStore Historical::TimeseriesGetRangeToFile( kDefaultSTypeIn, kDefaultSTypeOut, {}, file_path); } -databento::DbnFileStore Historical::TimeseriesGetRangeToFile( +databento::DbnStore Historical::TimeseriesGetRangeToFile( const std::string& dataset, const DateTimeRange& datetime_range, const std::vector& symbols, Schema schema, const std::filesystem::path& file_path) { @@ -1021,7 +1073,7 @@ databento::DbnFileStore Historical::TimeseriesGetRangeToFile( kDefaultSTypeIn, kDefaultSTypeOut, {}, file_path); } -databento::DbnFileStore Historical::TimeseriesGetRangeToFile( +databento::DbnStore Historical::TimeseriesGetRangeToFile( const std::string& dataset, const DateTimeRange& datetime_range, const std::vector& symbols, Schema schema, SType stype_in, SType stype_out, std::uint64_t limit, const std::filesystem::path& file_path) { @@ -1038,7 +1090,7 @@ databento::DbnFileStore Historical::TimeseriesGetRangeToFile( detail::SetIfPositive(¶ms, "limit", limit); return this->TimeseriesGetRangeToFile(params, file_path); } -databento::DbnFileStore Historical::TimeseriesGetRangeToFile( +databento::DbnStore Historical::TimeseriesGetRangeToFile( const std::string& dataset, const DateTimeRange& datetime_range, const std::vector& symbols, Schema schema, SType stype_in, SType stype_out, std::uint64_t limit, const std::filesystem::path& file_path) { @@ -1055,7 +1107,7 @@ databento::DbnFileStore Historical::TimeseriesGetRangeToFile( detail::SetIfPositive(¶ms, "limit", limit); return this->TimeseriesGetRangeToFile(params, file_path); } -databento::DbnFileStore Historical::TimeseriesGetRangeToFile( +databento::DbnStore Historical::TimeseriesGetRangeToFile( const HttplibParams& params, const std::filesystem::path& file_path) { { OutFileStream out_file{file_path}; @@ -1066,7 +1118,7 @@ databento::DbnFileStore Historical::TimeseriesGetRangeToFile( return true; }); } // Flush out_file - return DbnFileStore{log_receiver_, file_path, upgrade_policy_}; + return DbnStore{log_receiver_, file_path, upgrade_policy_}; } using databento::HistoricalBuilder; diff --git a/src/log.cpp b/src/log.cpp index e244575..9857a39 100644 --- a/src/log.cpp +++ b/src/log.cpp @@ -6,7 +6,7 @@ #include "databento/system.hpp" #include "databento/version.hpp" -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" databento::ILogReceiver* databento::ILogReceiver::Default() { static const std::unique_ptr gDefaultLogger{ @@ -64,7 +64,7 @@ void LogPlatformInfo() { LogPlatformInfo(ILogReceiver::Default()); } void LogPlatformInfo(ILogReceiver* log_receiver) { std::ostringstream ss; - StreamOpBuilder{ss} + detail::StreamOpBuilder{ss} .SetSpacer(" ") .Build() .AddField("client_version", DATABENTO_VERSION) diff --git a/src/metadata.cpp b/src/metadata.cpp index 23e5f31..e4257ab 100644 --- a/src/metadata.cpp +++ b/src/metadata.cpp @@ -3,15 +3,15 @@ #include #include -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" namespace databento { std::string ToString(const PublisherDetail& publisher_detail) { - return MakeString(publisher_detail); + return detail::MakeString(publisher_detail); } std::ostream& operator<<(std::ostream& stream, const PublisherDetail& publisher_detail) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("PublisherDetail") .Build() @@ -23,10 +23,10 @@ std::ostream& operator<<(std::ostream& stream, } std::string ToString(const FieldDetail& field_detail) { - return MakeString(field_detail); + return detail::MakeString(field_detail); } std::ostream& operator<<(std::ostream& stream, const FieldDetail& field_detail) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("FieldDetail") .Build() @@ -36,11 +36,11 @@ std::ostream& operator<<(std::ostream& stream, const FieldDetail& field_detail) } std::string ToString(const DatasetConditionDetail& condition_detail) { - return MakeString(condition_detail); + return detail::MakeString(condition_detail); } std::ostream& operator<<(std::ostream& stream, const DatasetConditionDetail& condition_detail) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("DatasetConditionDetail") .Build() @@ -51,17 +51,19 @@ std::ostream& operator<<(std::ostream& stream, } std::string ToString(const DatasetRange& dataset_range) { - return MakeString(dataset_range); + return detail::MakeString(dataset_range); } std::ostream& operator<<(std::ostream& stream, const DatasetRange& dataset_range) { std::ostringstream range_by_schema_ss; - auto range_by_schema_helper = - StreamOpBuilder{range_by_schema_ss}.SetSpacer("\n ").SetIndent(" ").Build(); + auto range_by_schema_helper = detail::StreamOpBuilder{range_by_schema_ss} + .SetSpacer("\n ") + .SetIndent(" ") + .Build(); for (const auto& [schema, range] : dataset_range.range_by_schema) { range_by_schema_helper.AddKeyVal(schema, range); } range_by_schema_helper.Finish(); - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("DatasetRange") .Build() diff --git a/src/record.cpp b/src/record.cpp index be0f29a..d7f169b 100644 --- a/src/record.cpp +++ b/src/record.cpp @@ -5,13 +5,13 @@ #include "databento/enums.hpp" #include "databento/exceptions.hpp" // InvalidArgumentError #include "databento/pretty.hpp" // Px -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" using databento::Record; using databento::RecordHeader; std::size_t RecordHeader::Size() const { - return static_cast(length) * kLengthMultiplier; + return static_cast(length) * kLengthMultiplier; } std::size_t Record::Size() const { return record_->Size(); } @@ -212,18 +212,18 @@ bool databento::operator==(const ImbalanceMsg& lhs, const ImbalanceMsg& rhs) { } namespace databento { -std::string ToString(const Record& record) { return MakeString(record); } +std::string ToString(const Record& record) { return detail::MakeString(record); } std::ostream& operator<<(std::ostream& stream, const Record& record) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("Record") .Build() .AddField("ptr", record.Header()) .Finish(); } -std::string ToString(const RecordHeader& header) { return MakeString(header); } +std::string ToString(const RecordHeader& header) { return detail::MakeString(header); } std::ostream& operator<<(std::ostream& stream, const RecordHeader& header) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("RecordHeader") .Build() @@ -235,9 +235,9 @@ std::ostream& operator<<(std::ostream& stream, const RecordHeader& header) { .Finish(); } -std::string ToString(const MboMsg& mbo_msg) { return MakeString(mbo_msg); } +std::string ToString(const MboMsg& mbo_msg) { return detail::MakeString(mbo_msg); } std::ostream& operator<<(std::ostream& stream, const MboMsg& mbo_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("MboMsg") .Build() @@ -256,10 +256,10 @@ std::ostream& operator<<(std::ostream& stream, const MboMsg& mbo_msg) { } std::string ToString(const BidAskPair& bid_ask_pair) { - return MakeString(bid_ask_pair); + return detail::MakeString(bid_ask_pair); } std::ostream& operator<<(std::ostream& stream, const BidAskPair& bid_ask_pair) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("BidAskPair") .Build() @@ -273,11 +273,11 @@ std::ostream& operator<<(std::ostream& stream, const BidAskPair& bid_ask_pair) { } std::string ToString(const ConsolidatedBidAskPair& consolidated_bid_ask_pair) { - return MakeString(consolidated_bid_ask_pair); + return detail::MakeString(consolidated_bid_ask_pair); } std::ostream& operator<<(std::ostream& stream, const ConsolidatedBidAskPair& consolidated_bid_ask_pair) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer(" ") .SetTypeName("ConsolidatedBidAskPair") .Build() @@ -290,9 +290,11 @@ std::ostream& operator<<(std::ostream& stream, .Finish(); } -std::string ToString(const TradeMsg& trade_msg) { return MakeString(trade_msg); } +std::string ToString(const TradeMsg& trade_msg) { + return detail::MakeString(trade_msg); +} std::ostream& operator<<(std::ostream& stream, const TradeMsg& trade_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("TradeMsg") .Build() @@ -309,9 +311,9 @@ std::ostream& operator<<(std::ostream& stream, const TradeMsg& trade_msg) { .Finish(); } -std::string ToString(const Mbp1Msg& mbp1_msg) { return MakeString(mbp1_msg); } +std::string ToString(const Mbp1Msg& mbp1_msg) { return detail::MakeString(mbp1_msg); } std::ostream& operator<<(std::ostream& stream, const Mbp1Msg& mbp1_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("Mbp1Msg") .Build() @@ -330,15 +332,18 @@ std::ostream& operator<<(std::ostream& stream, const Mbp1Msg& mbp1_msg) { .Finish(); } -std::string ToString(const Mbp10Msg& mbp10_msg) { return MakeString(mbp10_msg); } +std::string ToString(const Mbp10Msg& mbp10_msg) { + return detail::MakeString(mbp10_msg); +} std::ostream& operator<<(std::ostream& stream, const Mbp10Msg& mbp10_msg) { std::ostringstream sub_ss; - auto helper = StreamOpBuilder{sub_ss}.SetSpacer("\n ").SetIndent(" ").Build(); + auto helper = + detail::StreamOpBuilder{sub_ss}.SetSpacer("\n ").SetIndent(" ").Build(); for (const auto& level : mbp10_msg.levels) { helper.AddItem(level); } - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("Mbp10Msg") .Build() @@ -356,9 +361,9 @@ std::ostream& operator<<(std::ostream& stream, const Mbp10Msg& mbp10_msg) { .Finish(); } -std::string ToString(const BboMsg& bbo_msg) { return MakeString(bbo_msg); } +std::string ToString(const BboMsg& bbo_msg) { return detail::MakeString(bbo_msg); } std::ostream& operator<<(std::ostream& stream, const BboMsg& bbo_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("BboMsg") .Build() @@ -374,9 +379,11 @@ std::ostream& operator<<(std::ostream& stream, const BboMsg& bbo_msg) { .Finish(); } -std::string ToString(const Cmbp1Msg& cmbp1_msg) { return MakeString(cmbp1_msg); } +std::string ToString(const Cmbp1Msg& cmbp1_msg) { + return detail::MakeString(cmbp1_msg); +} std::ostream& operator<<(std::ostream& stream, const Cmbp1Msg& cmbp1_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("Cmbp1Msg") .Build() @@ -393,9 +400,9 @@ std::ostream& operator<<(std::ostream& stream, const Cmbp1Msg& cmbp1_msg) { .Finish(); } -std::string ToString(const CbboMsg& cbbo_msg) { return MakeString(cbbo_msg); } +std::string ToString(const CbboMsg& cbbo_msg) { return detail::MakeString(cbbo_msg); } std::ostream& operator<<(std::ostream& stream, const CbboMsg& cbbo_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("CbboMsg") .Build() @@ -410,9 +417,11 @@ std::ostream& operator<<(std::ostream& stream, const CbboMsg& cbbo_msg) { .Finish(); } -std::string ToString(const OhlcvMsg& ohlcv_msg) { return MakeString(ohlcv_msg); } +std::string ToString(const OhlcvMsg& ohlcv_msg) { + return detail::MakeString(ohlcv_msg); +} std::ostream& operator<<(std::ostream& stream, const OhlcvMsg& ohlcv_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("OhlcvMsg") .Build() @@ -425,9 +434,11 @@ std::ostream& operator<<(std::ostream& stream, const OhlcvMsg& ohlcv_msg) { .Finish(); } -std::string ToString(const StatusMsg& status_msg) { return MakeString(status_msg); } +std::string ToString(const StatusMsg& status_msg) { + return detail::MakeString(status_msg); +} std::ostream& operator<<(std::ostream& stream, const StatusMsg& status_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("StatusMsg") .Build() @@ -443,11 +454,11 @@ std::ostream& operator<<(std::ostream& stream, const StatusMsg& status_msg) { } std::string ToString(const InstrumentDefMsg& instrument_def_msg) { - return MakeString(instrument_def_msg); + return detail::MakeString(instrument_def_msg); } std::ostream& operator<<(std::ostream& stream, const InstrumentDefMsg& instrument_def_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("InstrumentDefMsg") .Build() @@ -532,10 +543,10 @@ std::ostream& operator<<(std::ostream& stream, } std::string ToString(const ImbalanceMsg& imbalance_msg) { - return MakeString(imbalance_msg); + return detail::MakeString(imbalance_msg); } std::ostream& operator<<(std::ostream& stream, const ImbalanceMsg& imbalance_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("ImbalanceMsg") .Build() @@ -564,9 +575,9 @@ std::ostream& operator<<(std::ostream& stream, const ImbalanceMsg& imbalance_msg .Finish(); } -std::string ToString(const StatMsg& stat_msg) { return MakeString(stat_msg); } +std::string ToString(const StatMsg& stat_msg) { return detail::MakeString(stat_msg); } std::ostream& operator<<(std::ostream& stream, const StatMsg& stat_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("StatMsg") .Build() @@ -584,9 +595,11 @@ std::ostream& operator<<(std::ostream& stream, const StatMsg& stat_msg) { .Finish(); } -std::string ToString(const ErrorMsg& error_msg) { return MakeString(error_msg); } +std::string ToString(const ErrorMsg& error_msg) { + return detail::MakeString(error_msg); +} std::ostream& operator<<(std::ostream& stream, const ErrorMsg& error_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("ErrorMsg") .Build() @@ -598,11 +611,11 @@ std::ostream& operator<<(std::ostream& stream, const ErrorMsg& error_msg) { } std::string ToString(const SymbolMappingMsg& symbol_mapping_msg) { - return MakeString(symbol_mapping_msg); + return detail::MakeString(symbol_mapping_msg); } std::ostream& operator<<(std::ostream& stream, const SymbolMappingMsg& symbol_mapping_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("SymbolMappingMsg") .Build() @@ -616,9 +629,11 @@ std::ostream& operator<<(std::ostream& stream, .Finish(); } -std::string ToString(const SystemMsg& system_msg) { return MakeString(system_msg); } +std::string ToString(const SystemMsg& system_msg) { + return detail::MakeString(system_msg); +} std::ostream& operator<<(std::ostream& stream, const SystemMsg& system_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("SystemMsg") .Build() diff --git a/src/symbology.cpp b/src/symbology.cpp index aa93e4f..4877b56 100644 --- a/src/symbology.cpp +++ b/src/symbology.cpp @@ -7,8 +7,8 @@ #include #include -#include "databento/exceptions.hpp" // InvalidArgumentError -#include "stream_op_helper.hpp" // StreamOpBuilder +#include "databento/exceptions.hpp" // InvalidArgumentError +#include "detail/stream_op_helper.hpp" // StreamOpBuilder namespace databento { TsSymbolMap SymbologyResolution::CreateSymbolMap() const { @@ -53,24 +53,26 @@ std::string JoinSymbolStrings(const std::string& method_name, return JoinSymbolStrings(method_name, symbols.begin(), symbols.end()); } -std::string ToString(const SymbologyResolution& sym_res) { return MakeString(sym_res); } +std::string ToString(const SymbologyResolution& sym_res) { + return detail::MakeString(sym_res); +} std::ostream& operator<<(std::ostream& stream, const SymbologyResolution& sym_res) { - auto stream_helper = StreamOpBuilder{stream} + auto stream_helper = detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("SymbologyResolution") .Build(); std::ostringstream intermediate; std::ostringstream key_value; auto intermediate_builder = - StreamOpBuilder{intermediate}.SetIndent(" ").SetSpacer("\n "); + detail::StreamOpBuilder{intermediate}.SetIndent(" ").SetSpacer("\n "); auto mappings_helper = intermediate_builder.Build(); - auto key_value_builder = StreamOpBuilder{key_value}.SetSpacer(" "); + auto key_value_builder = detail::StreamOpBuilder{key_value}.SetSpacer(" "); for (const auto& [symbol, intervals] : sym_res.mappings) { // empty stream key_value.str(""); std::ostringstream interval_ss; - auto interval_helper = StreamOpBuilder{interval_ss}.SetSpacer(" ").Build(); + auto interval_helper = detail::StreamOpBuilder{interval_ss}.SetSpacer(" ").Build(); for (const auto& interval : intervals) { interval_helper.AddItem(interval); } diff --git a/src/v1.cpp b/src/v1.cpp index 93460e0..678368b 100644 --- a/src/v1.cpp +++ b/src/v1.cpp @@ -11,7 +11,7 @@ #include "databento/record.hpp" #include "databento/v2.hpp" #include "databento/v3.hpp" -#include "stream_op_helper.hpp" // MakeString, StreamOpBuilder +#include "detail/stream_op_helper.hpp" // MakeString, StreamOpBuilder namespace databento::v1 { v2::InstrumentDefMsg InstrumentDefMsg::ToV2() const { @@ -299,9 +299,11 @@ v2::ErrorMsg ErrorMsg::Upgrade() const { return ToV2(); } -std::string ToString(const ErrorMsg& error_msg) { return MakeString(error_msg); } +std::string ToString(const ErrorMsg& error_msg) { + return detail::MakeString(error_msg); +} std::ostream& operator<<(std::ostream& stream, const ErrorMsg& error_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("ErrorMsg") .Build() @@ -377,11 +379,11 @@ v3::InstrumentDefMsg InstrumentDefMsg::Upgrade() const { } std::string ToString(const InstrumentDefMsg& instrument_def_msg) { - return MakeString(instrument_def_msg); + return detail::MakeString(instrument_def_msg); } std::ostream& operator<<(std::ostream& stream, const InstrumentDefMsg& instrument_def_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("InstrumentDefMsg") .Build() @@ -460,9 +462,9 @@ v3::StatMsg StatMsg::Upgrade() const { return ToV3(); } -std::string ToString(const StatMsg& stat_msg) { return MakeString(stat_msg); } +std::string ToString(const StatMsg& stat_msg) { return detail::MakeString(stat_msg); } std::ostream& operator<<(std::ostream& stream, const StatMsg& stat_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("StatMsg") .Build() @@ -486,11 +488,11 @@ v2::SymbolMappingMsg SymbolMappingMsg::Upgrade() const { } std::string ToString(const SymbolMappingMsg& symbol_mapping_msg) { - return MakeString(symbol_mapping_msg); + return detail::MakeString(symbol_mapping_msg); } std::ostream& operator<<(std::ostream& stream, const SymbolMappingMsg& symbol_mapping_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("SymbolMappingMsg") .Build() @@ -507,9 +509,11 @@ v2::SystemMsg SystemMsg::Upgrade() const { return ToV2(); } -std::string ToString(const SystemMsg& system_msg) { return MakeString(system_msg); } +std::string ToString(const SystemMsg& system_msg) { + return detail::MakeString(system_msg); +} std::ostream& operator<<(std::ostream& stream, const SystemMsg& system_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("SystemMsg") .Build() diff --git a/src/v2.cpp b/src/v2.cpp index e6f006c..b4f2f4a 100644 --- a/src/v2.cpp +++ b/src/v2.cpp @@ -2,7 +2,7 @@ #include "databento/pretty.hpp" // Px #include "databento/v3.hpp" -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" // MakeString, StreamOpBuilder namespace databento::v2 { databento::v3::InstrumentDefMsg InstrumentDefMsg::ToV3() const { @@ -155,11 +155,11 @@ databento::InstrumentDefMsg InstrumentDefMsg::Upgrade() const { } std::string ToString(const InstrumentDefMsg& instrument_def_msg) { - return MakeString(instrument_def_msg); + return detail::MakeString(instrument_def_msg); } std::ostream& operator<<(std::ostream& stream, const InstrumentDefMsg& instrument_def_msg) { - return StreamOpBuilder{stream} + return detail::StreamOpBuilder{stream} .SetSpacer("\n ") .SetTypeName("InstrumentDefMsg") .Build() diff --git a/tests/include/mock/mock_http_server.hpp b/tests/include/mock/mock_http_server.hpp index 0189348..02cc9b6 100644 --- a/tests/include/mock/mock_http_server.hpp +++ b/tests/include/mock/mock_http_server.hpp @@ -16,7 +16,10 @@ namespace databento::tests::mock { class MockHttpServer { public: explicit MockHttpServer(std::string api_key) - : port_{server_.bind_to_any_port("localhost")}, api_key_{std::move(api_key)} {} + : port_{server_.bind_to_any_port("localhost")}, api_key_{std::move(api_key)} { + // Default 5s timeout not needed for local testing + server_.set_keep_alive_timeout(1); + } MockHttpServer(MockHttpServer&&) = delete; MockHttpServer& operator=(MockHttpServer&&) = delete; MockHttpServer(const MockHttpServer&) = delete; diff --git a/tests/src/historical_tests.cpp b/tests/src/historical_tests.cpp index 9ada439..614ba1b 100644 --- a/tests/src/historical_tests.cpp +++ b/tests/src/historical_tests.cpp @@ -821,6 +821,60 @@ TEST_F(HistoricalTests, TestTimeseriesGetRange_UnreadBytes) { ASSERT_EQ(logger_.CallCount(), 1); } +TEST_F(HistoricalTests, TestTimeseriesGetRange_Blocking) { + mock_server_.MockPostDbn("/v0/timeseries.get_range", + {{"dataset", dataset::kGlbxMdp3}, + {"symbols", "ESH1"}, + {"schema", "mbo"}, + {"start", "1609160400000711344"}, + {"end", "1609160800000711344"}, + {"encoding", "dbn"}, + {"stype_in", "raw_symbol"}, + {"stype_out", "instrument_id"}, + {"limit", "2"}}, + TEST_DATA_DIR "/test_data.mbo.v3.dbn.zst"); + const auto port = mock_server_.ListenOnThread(); + + databento::Historical target = Client(port); + DbnStore store = target.TimeseriesGetRange( + dataset::kGlbxMdp3, + {UnixNanos{std::chrono::nanoseconds{1609160400000711344}}, + UnixNanos{std::chrono::nanoseconds{1609160800000711344}}}, + {"ESH1"}, Schema::Mbo, SType::RawSymbol, SType::InstrumentId, 2); + const auto& metadata = store.GetMetadata(); + EXPECT_EQ(metadata.limit, 2); + EXPECT_EQ(metadata.schema, Schema::Mbo); + std::vector mbo_records; + while (const auto* record = store.NextRecord()) { + mbo_records.emplace_back(record->Get()); + } + EXPECT_EQ(mbo_records.size(), 2); +} + +TEST_F(HistoricalTests, TestTimeseriesGetRange_BlockingSimple) { + mock_server_.MockPostDbn("/v0/timeseries.get_range", + {{"dataset", dataset::kGlbxMdp3}, + {"start", "2022-10-21T13:30"}, + {"end", "2022-10-21T20:00"}, + {"symbols", "CYZ2"}, + {"schema", "tbbo"}, + {"encoding", "dbn"}, + {"stype_in", "raw_symbol"}, + {"stype_out", "instrument_id"}}, + TEST_DATA_DIR "/test_data.tbbo.v3.dbn.zst"); + const auto port = mock_server_.ListenOnThread(); + + databento::Historical target = Client(port); + DbnStore store = target.TimeseriesGetRange(dataset::kGlbxMdp3, + {"2022-10-21T13:30", "2022-10-21T20:00"}, + {"CYZ2"}, Schema::Tbbo); + std::vector tbbo_records; + while (const auto* record = store.NextRecord()) { + tbbo_records.emplace_back(record->Get()); + } + EXPECT_EQ(tbbo_records.size(), 2); +} + TEST_F(HistoricalTests, TestTimeseriesGetRangeToFile) { mock_server_.MockPostDbn("/v0/timeseries.get_range", {{"dataset", dataset::kGlbxMdp3}, diff --git a/tests/src/stream_op_helper_tests.cpp b/tests/src/stream_op_helper_tests.cpp index 02a55a0..d88a840 100644 --- a/tests/src/stream_op_helper_tests.cpp +++ b/tests/src/stream_op_helper_tests.cpp @@ -7,9 +7,9 @@ #include "databento/constants.hpp" #include "databento/enums.hpp" -#include "stream_op_helper.hpp" +#include "detail/stream_op_helper.hpp" -namespace databento::tests { +namespace databento::detail::tests { TEST(StreamOpHelperTests, TestEmpty) { std::ostringstream stream; @@ -87,4 +87,4 @@ TEST(StreamOpHelperTests, TestCharArray) { target.AddField("array", test_data).Finish(); ASSERT_EQ(stream.str(), R"({array = "USD"})"); } -} // namespace databento::tests +} // namespace databento::detail::tests From dad2c0f98f3924f95824504ca95cf3282deefd9d Mon Sep 17 00:00:00 2001 From: Rob Maierle Date: Tue, 24 Mar 2026 16:35:24 -0400 Subject: [PATCH 3/5] MOD: Update Nasdaq Texas --- include/databento/publishers.hpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/include/databento/publishers.hpp b/include/databento/publishers.hpp index 28bb8af..10ca10a 100644 --- a/include/databento/publishers.hpp +++ b/include/databento/publishers.hpp @@ -71,7 +71,7 @@ enum class Venue : std::uint16_t { Mprl = 31, // Nasdaq Options Xndq = 32, - // Nasdaq BX Options + // Nasdaq Texas Options Xbxo = 33, // Cboe C2 Options C2Ox = 34, @@ -121,7 +121,7 @@ enum class Dataset : std::uint16_t { GlbxMdp3 = 1, // Nasdaq TotalView-ITCH XnasItch = 2, - // Nasdaq BX TotalView-ITCH + // Nasdaq Texas TotalView-ITCH XbosItch = 3, // Nasdaq PSX TotalView-ITCH XpsxItch = 4, @@ -207,7 +207,7 @@ enum class Publisher : std::uint16_t { GlbxMdp3Glbx = 1, // Nasdaq TotalView-ITCH XnasItchXnas = 2, - // Nasdaq BX TotalView-ITCH + // Nasdaq Texas TotalView-ITCH XbosItchXbos = 3, // Nasdaq PSX TotalView-ITCH XpsxItchXpsx = 4, @@ -267,7 +267,7 @@ enum class Publisher : std::uint16_t { OpraPillarMprl = 31, // OPRA - Nasdaq Options OpraPillarXndq = 32, - // OPRA - Nasdaq BX Options + // OPRA - Nasdaq Texas Options OpraPillarXbxo = 33, // OPRA - Cboe C2 Options OpraPillarC2Ox = 34, @@ -351,7 +351,7 @@ enum class Publisher : std::uint16_t { EqusAllEdga = 73, // Databento US Equities (All Feeds) - Cboe EDGX EqusAllEdgx = 74, - // Databento US Equities (All Feeds) - Nasdaq BX + // Databento US Equities (All Feeds) - Nasdaq Texas EqusAllXbos = 75, // Databento US Equities (All Feeds) - Nasdaq PSX EqusAllXpsx = 76, @@ -373,11 +373,11 @@ enum class Publisher : std::uint16_t { IfeuImpactXoff = 84, // ICE Endex - Off-Market Trades NdexImpactXoff = 85, - // Nasdaq NLS - Nasdaq BX + // Nasdaq NLS - Nasdaq Texas XnasNlsXbos = 86, // Nasdaq NLS - Nasdaq PSX XnasNlsXpsx = 87, - // Nasdaq Basic - Nasdaq BX + // Nasdaq Basic - Nasdaq Texas XnasBasicXbos = 88, // Nasdaq Basic - Nasdaq PSX XnasBasicXpsx = 89, From 49cf530243461e1669be699768e0a6b05d236537 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Mon, 30 Mar 2026 08:22:47 -0500 Subject: [PATCH 4/5] ADD: Add live heartbeat monitoring --- CHANGELOG.md | 6 ++ CMakeLists.txt | 5 +- cmake/Findzstd.cmake | 14 +++++ include/databento/detail/live_connection.hpp | 8 ++- include/databento/detail/tcp_client.hpp | 14 +++-- include/databento/exceptions.hpp | 15 +++++ include/databento/live_blocking.hpp | 6 +- src/detail/live_connection.cpp | 5 +- src/detail/tcp_client.cpp | 28 +++++++--- src/exceptions.cpp | 9 +++ src/live_blocking.cpp | 51 +++++++++++++++-- tests/src/live_blocking_tests.cpp | 59 +++++++++++++++++++- tests/src/live_threaded_tests.cpp | 4 +- tests/src/tcp_client_tests.cpp | 10 +++- 14 files changed, 203 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 72792da..cd5363e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,8 +3,14 @@ ## 0.52.0 - TBD ### Enhancements +- Added client-side heartbeat timeout detection: `NextRecord` throws `HeartbeatTimeoutError` + if no data is received for `heartbeat_interval` + 5 seconds (defaults to 35 seconds) - Changed `SlowReaderBehavior::Skip` to send "skip" instead of "drop" to the gateway +### Breaking changes +- `NextRecord` now throws `LiveApiError` instead of `DbnResponseError` when the gateway + closes the session. Code catching `DbnResponseError` for this case should be updated + ## 0.51.0 - 2026-03-17 ### Enhancements diff --git a/CMakeLists.txt b/CMakeLists.txt index 01373d6..8e2a6de 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -202,13 +202,16 @@ if(${PROJECT_NAME_UPPERCASE}_USE_EXTERNAL_DATE) find_package(date REQUIRED) endif() else() - set(date_version 3.0.3) + set(date_version 3.0.4) + set(CMAKE_WARN_DEPRECATED OFF CACHE BOOL "" FORCE) FetchContent_Declare( date_src URL https://github.com/HowardHinnant/date/archive/refs/tags/v${date_version}.tar.gz DOWNLOAD_EXTRACT_TIMESTAMP TRUE ) + # Suppress deprecation warning from date's old cmake_minimum_required FetchContent_MakeAvailable(date_src) + set(CMAKE_WARN_DEPRECATED ON CACHE BOOL "" FORCE) # Ignore compiler warnings in headers add_system_include_property(date) endif() diff --git a/cmake/Findzstd.cmake b/cmake/Findzstd.cmake index 7bb95fc..8710d04 100644 --- a/cmake/Findzstd.cmake +++ b/cmake/Findzstd.cmake @@ -1,3 +1,17 @@ +# Try config mode first. If the system provides a zstd CMake config (e.g. Homebrew), +# use it directly so all targets are defined consistently. +find_package(zstd CONFIG QUIET) +if(zstd_FOUND) + if(NOT TARGET zstd::libzstd) + if(TARGET zstd::libzstd_shared) + add_library(zstd::libzstd ALIAS zstd::libzstd_shared) + elseif(TARGET zstd::libzstd_static) + add_library(zstd::libzstd ALIAS zstd::libzstd_static) + endif() + endif() + return() +endif() + include(FindPackageHandleStandardArgs) if(WIN32) diff --git a/include/databento/detail/live_connection.hpp b/include/databento/detail/live_connection.hpp index 3d74abe..ae389fd 100644 --- a/include/databento/detail/live_connection.hpp +++ b/include/databento/detail/live_connection.hpp @@ -13,12 +13,18 @@ #include "databento/ireadable.hpp" #include "databento/iwritable.hpp" +// Forward declare +namespace databento { +class ILogReceiver; +} + namespace databento::detail { // Manages the TCP connection to the live gateway with optionally compressed reads for // the DBN data. class LiveConnection : IWritable { public: - LiveConnection(const std::string& gateway, std::uint16_t port); + LiveConnection(ILogReceiver* log_receiver, const std::string& gateway, + std::uint16_t port); void WriteAll(std::string_view str); void WriteAll(const std::byte* buffer, std::size_t size); diff --git a/include/databento/detail/tcp_client.hpp b/include/databento/detail/tcp_client.hpp index ea2b764..13a9989 100644 --- a/include/databento/detail/tcp_client.hpp +++ b/include/databento/detail/tcp_client.hpp @@ -9,6 +9,11 @@ #include "databento/detail/scoped_fd.hpp" // ScopedFd #include "databento/ireadable.hpp" +// Forward declare +namespace databento { +class ILogReceiver; +} + namespace databento::detail { class TcpClient { public: @@ -17,8 +22,9 @@ class TcpClient { std::chrono::seconds max_wait{std::chrono::minutes{1}}; }; - TcpClient(const std::string& gateway, std::uint16_t port); - TcpClient(const std::string& gateway, std::uint16_t port, RetryConf retry_conf); + TcpClient(ILogReceiver* log_receiver, const std::string& gateway, std::uint16_t port); + TcpClient(ILogReceiver* log_receiver, const std::string& gateway, std::uint16_t port, + RetryConf retry_conf); void WriteAll(std::string_view str); void WriteAll(const std::byte* buffer, std::size_t size); @@ -32,8 +38,8 @@ class TcpClient { void Close(); private: - static ScopedFd InitSocket(const std::string& gateway, std::uint16_t port, - RetryConf retry_conf); + static ScopedFd InitSocket(ILogReceiver* log_receiver, const std::string& gateway, + std::uint16_t port, RetryConf retry_conf); ScopedFd socket_; }; diff --git a/include/databento/exceptions.hpp b/include/databento/exceptions.hpp index b10d0a7..88a9b78 100644 --- a/include/databento/exceptions.hpp +++ b/include/databento/exceptions.hpp @@ -6,6 +6,7 @@ #include // Error #include // json, parse_error +#include #include #include #include @@ -132,6 +133,20 @@ class DbnResponseError : public Exception { explicit DbnResponseError(std::string message) : Exception{std::move(message)} {} }; +// Exception indicating no data was received within the heartbeat timeout window. +class HeartbeatTimeoutError : public Exception { + public: + explicit HeartbeatTimeoutError(std::chrono::seconds elapsed) + : Exception{BuildMessage(elapsed)}, elapsed_{elapsed} {} + + std::chrono::seconds Elapsed() const { return elapsed_; } + + private: + static std::string BuildMessage(std::chrono::seconds elapsed); + + const std::chrono::seconds elapsed_; +}; + // Exception indicating something internal to the live API, but unrelated to TCP // went wrong. class LiveApiError : public Exception { diff --git a/include/databento/live_blocking.hpp b/include/databento/live_blocking.hpp index f6ef7ea..d8ae6ef 100644 --- a/include/databento/live_blocking.hpp +++ b/include/databento/live_blocking.hpp @@ -1,7 +1,7 @@ #pragma once #include -#include // milliseconds +#include // milliseconds, steady_clock #include #include #include @@ -119,6 +119,8 @@ class LiveBlocking { bool use_snapshot); IReadable::Result FillBuffer(std::chrono::milliseconds timeout); RecordHeader* BufferRecordHeader(); + std::chrono::milliseconds HeartbeatTimeout() const; + void CheckHeartbeatTimeout() const; static constexpr std::size_t kMaxStrLen = 24L * 1024; @@ -142,5 +144,7 @@ class LiveBlocking { alignas(RecordHeader) std::array compat_buffer_{}; std::uint64_t session_id_; Record current_record_{nullptr}; + std::chrono::steady_clock::time_point last_read_time_{ + std::chrono::steady_clock::now()}; }; } // namespace databento diff --git a/src/detail/live_connection.cpp b/src/detail/live_connection.cpp index c2b9d43..dd5a057 100644 --- a/src/detail/live_connection.cpp +++ b/src/detail/live_connection.cpp @@ -6,8 +6,9 @@ using databento::detail::LiveConnection; -LiveConnection::LiveConnection(const std::string& gateway, std::uint16_t port) - : client_{gateway, port} {} +LiveConnection::LiveConnection(ILogReceiver* log_receiver, const std::string& gateway, + std::uint16_t port) + : client_{log_receiver, gateway, port} {} void LiveConnection::WriteAll(std::string_view str) { client_.WriteAll(str); } diff --git a/src/detail/tcp_client.cpp b/src/detail/tcp_client.cpp index 9d39615..e1941fa 100644 --- a/src/detail/tcp_client.cpp +++ b/src/detail/tcp_client.cpp @@ -5,7 +5,7 @@ #else #include // addrinfo, gai_strerror, getaddrinfo, freeaddrinfo #include // htons, IPPROTO_TCP -#include // pollfd, POLLHUP +#include // pollfd #include // AF_INET, connect, recv, send, sockaddr, sockaddr_in, socket, SOCK_STREAM #include // close, ssize_t @@ -18,6 +18,7 @@ #include #include "databento/exceptions.hpp" // TcpError +#include "databento/log.hpp" // ILogReceiver using databento::detail::TcpClient; using Status = databento::IReadable::Status; @@ -32,12 +33,13 @@ int GetErrNo() { } } // namespace -TcpClient::TcpClient(const std::string& gateway, std::uint16_t port) - : TcpClient{gateway, port, {}} {} +TcpClient::TcpClient(ILogReceiver* log_receiver, const std::string& gateway, + std::uint16_t port) + : TcpClient{log_receiver, gateway, port, {}} {} -TcpClient::TcpClient(const std::string& gateway, std::uint16_t port, - RetryConf retry_conf) - : socket_{InitSocket(gateway, port, retry_conf)} {} +TcpClient::TcpClient(ILogReceiver* log_receiver, const std::string& gateway, + std::uint16_t port, RetryConf retry_conf) + : socket_{InitSocket(log_receiver, gateway, port, retry_conf)} {} void TcpClient::WriteAll(std::string_view str) { WriteAll(reinterpret_cast(str.data()), str.length()); @@ -103,9 +105,12 @@ databento::IReadable::Result TcpClient::ReadSome(std::byte* buffer, void TcpClient::Close() { socket_.Close(); } -databento::detail::ScopedFd TcpClient::InitSocket(const std::string& gateway, +databento::detail::ScopedFd TcpClient::InitSocket(ILogReceiver* log_receiver, + const std::string& gateway, std::uint16_t port, RetryConf retry_conf) { + static constexpr auto kMethod = "TcpClient::TcpClient"; + const detail::Socket fd = ::socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if (fd == -1) { throw TcpError{::GetErrNo(), "Failed to create socket"}; @@ -121,7 +126,7 @@ databento::detail::ScopedFd TcpClient::InitSocket(const std::string& gateway, const auto ret = ::getaddrinfo(gateway.c_str(), std::to_string(port).c_str(), &hints, &out); if (ret != 0) { - throw InvalidArgumentError{"TcpClient::TcpClient", "addr", ::gai_strerror(ret)}; + throw InvalidArgumentError{kMethod, "addr", ::gai_strerror(ret)}; } std::unique_ptr res{out, &::freeaddrinfo}; const auto max_attempts = std::max(retry_conf.max_attempts, 1); @@ -134,7 +139,12 @@ databento::detail::ScopedFd TcpClient::InitSocket(const std::string& gateway, err_msg << "Socket failed to connect after " << max_attempts << " attempts"; throw TcpError{::GetErrNo(), err_msg.str()}; } - // TODO(cg): Log + std::ostringstream log_msg; + log_msg << '[' << kMethod << "] Connection attempt " << (attempt + 1) << " to " + << gateway << ':' << port << " failed, retrying in " << backoff.count() + << " seconds"; + log_receiver->Receive(LogLevel::Warning, log_msg.str()); + std::this_thread::sleep_for(backoff); backoff = (std::min)(backoff * 2, retry_conf.max_wait); } diff --git a/src/exceptions.cpp b/src/exceptions.cpp index 1fc5ada..14790c3 100644 --- a/src/exceptions.cpp +++ b/src/exceptions.cpp @@ -89,6 +89,15 @@ JsonResponseError JsonResponseError::TypeMismatch(std::string_view method_name, return JsonResponseError{err_msg.str()}; } +using databento::HeartbeatTimeoutError; + +std::string HeartbeatTimeoutError::BuildMessage(std::chrono::seconds elapsed) { + std::ostringstream err_msg; + err_msg << "Heartbeat timeout: no data received for " << elapsed.count() + << " seconds"; + return err_msg.str(); +} + using databento::LiveApiError; LiveApiError LiveApiError::UnexpectedMsg(std::string_view message, diff --git a/src/live_blocking.cpp b/src/live_blocking.cpp index 3e4b9e1..85101f4 100644 --- a/src/live_blocking.cpp +++ b/src/live_blocking.cpp @@ -24,6 +24,9 @@ using Status = databento::IReadable::Status; namespace { constexpr std::size_t kBucketIdLength = 5; +constexpr std::chrono::seconds kHeartbeatTimeoutMargin{5}; +constexpr auto kDefaultHeartbeatTimeout = + std::chrono::seconds{30} + kHeartbeatTimeoutMargin; } // namespace databento::LiveBuilder LiveBlocking::Builder() { return databento::LiveBuilder{}; } @@ -46,7 +49,7 @@ LiveBlocking::LiveBlocking( heartbeat_interval_{heartbeat_interval}, compression_{compression}, slow_reader_behavior_{slow_reader_behavior}, - connection_{gateway_, port_}, + connection_{log_receiver_, gateway_, port_}, buffer_{buffer_size}, session_id_{this->Authenticate()} {} @@ -68,7 +71,7 @@ LiveBlocking::LiveBlocking( heartbeat_interval_{heartbeat_interval}, compression_{compression}, slow_reader_behavior_{slow_reader_behavior}, - connection_{gateway_, port_}, + connection_{log_receiver_, gateway_, port_}, buffer_{buffer_size}, session_id_{this->Authenticate()} {} @@ -175,10 +178,22 @@ databento::Metadata LiveBlocking::Start() { buffer_.Shift(); version_ = metadata.version; metadata.Upgrade(upgrade_policy_); + last_read_time_ = std::chrono::steady_clock::now(); return metadata; } -const databento::Record& LiveBlocking::NextRecord() { return *NextRecord({}); } +const databento::Record& LiveBlocking::NextRecord() { + while (true) { + // Use heartbeat timeout as the effective poll timeout + const auto hb_timeout = HeartbeatTimeout(); + const auto* rec = NextRecord(hb_timeout); + if (rec) { + return *rec; + } + // throw if the heartbeat deadline has been exceeded + CheckHeartbeatTimeout(); + } +} const databento::Record* LiveBlocking::NextRecord(std::chrono::milliseconds timeout) { // need some unread_bytes @@ -186,20 +201,22 @@ const databento::Record* LiveBlocking::NextRecord(std::chrono::milliseconds time if (unread_bytes == 0) { const auto read_res = FillBuffer(timeout); if (read_res.status == Status::Timeout) { + CheckHeartbeatTimeout(); return nullptr; } if (read_res.status == Status::Closed) { - throw DbnResponseError{"Gateway closed the session"}; + throw LiveApiError{"Gateway closed the session"}; } } // check length while (buffer_.ReadCapacity() < BufferRecordHeader()->Size()) { const auto read_res = FillBuffer(timeout); if (read_res.status == Status::Timeout) { + CheckHeartbeatTimeout(); return nullptr; } if (read_res.status == Status::Closed) { - throw DbnResponseError{"Gateway closed the session"}; + throw LiveApiError{"Gateway closed the session"}; } } current_record_ = Record{BufferRecordHeader()}; @@ -218,10 +235,11 @@ void LiveBlocking::Reconnect() { log_msg << "Reconnecting to " << gateway_ << ':' << port_; log_receiver_->Receive(LogLevel::Info, log_msg.str()); } - connection_ = detail::LiveConnection{gateway_, port_}; + connection_ = detail::LiveConnection{log_receiver_, gateway_, port_}; buffer_.Clear(); sub_counter_ = 0; session_id_ = this->Authenticate(); + last_read_time_ = std::chrono::steady_clock::now(); } void LiveBlocking::Resubscribe() { @@ -435,9 +453,30 @@ databento::IReadable::Result LiveBlocking::FillBuffer( const auto read_res = connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity(), timeout); buffer_.Fill(read_res.read_size); + if (read_res.read_size > 0) { + last_read_time_ = std::chrono::steady_clock::now(); + } return read_res; } databento::RecordHeader* LiveBlocking::BufferRecordHeader() { return reinterpret_cast(buffer_.ReadBegin()); } + +std::chrono::milliseconds LiveBlocking::HeartbeatTimeout() const { + if (heartbeat_interval_) { + return std::chrono::milliseconds{*heartbeat_interval_ + kHeartbeatTimeoutMargin}; + } + return std::chrono::milliseconds{kDefaultHeartbeatTimeout}; +} + +void LiveBlocking::CheckHeartbeatTimeout() const { + const auto timeout = heartbeat_interval_.has_value() + ? *heartbeat_interval_ + kHeartbeatTimeoutMargin + : kDefaultHeartbeatTimeout; + const auto elapsed = std::chrono::steady_clock::now() - last_read_time_; + if (elapsed > timeout) { + throw HeartbeatTimeoutError{ + std::chrono::duration_cast(elapsed)}; + } +} diff --git a/tests/src/live_blocking_tests.cpp b/tests/src/live_blocking_tests.cpp index 5f1cf2b..fabcfc8 100644 --- a/tests/src/live_blocking_tests.cpp +++ b/tests/src/live_blocking_tests.cpp @@ -625,7 +625,7 @@ TEST_F(LiveBlockingTests, TestNextRecordThrowsOnGatewayClose) { std::unique_lock lock{has_closed_mutex}; has_closed_cv.wait(lock, [&has_closed] { return has_closed; }); } - ASSERT_THROW(target.NextRecord(), databento::DbnResponseError); + ASSERT_THROW(target.NextRecord(), databento::LiveApiError); } TEST_F(LiveBlockingTests, TestReconnectAndResubscribe) { @@ -698,7 +698,7 @@ TEST_F(LiveBlockingTests, TestReconnectAndResubscribe) { std::unique_lock lock{has_closed_mutex}; has_closed_cv.wait(lock, [&has_closed] { return has_closed; }); } - ASSERT_THROW(target.NextRecord(), databento::DbnResponseError); + ASSERT_THROW(target.NextRecord(), databento::LiveApiError); target.Reconnect(); target.Resubscribe(); ASSERT_EQ(target.Subscriptions().size(), 1); @@ -710,4 +710,59 @@ TEST_F(LiveBlockingTests, TestReconnectAndResubscribe) { ASSERT_TRUE(rec2.Holds()); ASSERT_EQ(rec2.Get(), kRec); } + +TEST_F(LiveBlockingTests, TestHeartbeatTimeoutOnNextRecord) { + constexpr auto kTsOut = false; + constexpr auto kHeartbeatInterval = std::chrono::seconds{1}; + const mock::MockLsgServer mock_server{ + dataset::kXnasItch, kTsOut, kHeartbeatInterval, [](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + // Let the heartbeat timeout trigger + std::this_thread::sleep_for(std::chrono::seconds{8}); + }}; + + LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) + .SetSendTsOut(kTsOut) + .SetHeartbeatInterval(kHeartbeatInterval) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); + ASSERT_THROW(target.NextRecord(), databento::HeartbeatTimeoutError); +} + +TEST_F(LiveBlockingTests, TestHeartbeatTimeoutOnNextRecordWithTimeout) { + constexpr auto kTsOut = false; + constexpr auto kHeartbeatInterval = std::chrono::seconds{1}; + constexpr std::chrono::milliseconds kTimeout{50}; + const mock::MockLsgServer mock_server{ + dataset::kXnasItch, kTsOut, kHeartbeatInterval, [](mock::MockLsgServer& self) { + self.Accept(); + self.Authenticate(); + // Let the heartbeat timeout trigger + std::this_thread::sleep_for(std::chrono::seconds{8}); + }}; + + LiveBlocking target = builder_.SetDataset(dataset::kXnasItch) + .SetSendTsOut(kTsOut) + .SetHeartbeatInterval(kHeartbeatInterval) + .SetAddress(kLocalhost, mock_server.Port()) + .BuildBlocking(); + // With 50ms timeout and 1s+10s heartbeat interval, we should get nullptr + // returns initially, then eventually a heartbeat timeout exception + bool got_timeout_exception = false; + for (int i = 0; i < 500; ++i) { + try { + const auto* rec = target.NextRecord(kTimeout); + if (rec != nullptr) { + FAIL() << "Expected nullptr or exception, got a record"; + } + } catch (const databento::HeartbeatTimeoutError& e) { + EXPECT_NE(std::string{e.what()}.find("Heartbeat timeout"), std::string::npos); + got_timeout_exception = true; + break; + } + } + EXPECT_TRUE(got_timeout_exception) << "Expected heartbeat timeout exception"; +} + } // namespace databento::tests diff --git a/tests/src/live_threaded_tests.cpp b/tests/src/live_threaded_tests.cpp index a2df018..cd174ff 100644 --- a/tests/src/live_threaded_tests.cpp +++ b/tests/src/live_threaded_tests.cpp @@ -322,7 +322,7 @@ TEST_F(LiveThreadedTests, TestExceptionCallbackReconnectAndResubscribe) { kSType](const std::exception& exc) { ++exception_calls; if (exception_calls == 1) { - EXPECT_NE(dynamic_cast(&exc), nullptr) + EXPECT_NE(dynamic_cast(&exc), nullptr) << "Unexpected exception type"; target.Reconnect(); target.Resubscribe(); @@ -399,7 +399,7 @@ TEST_F(LiveThreadedTests, TestDeadlockPrevention) { }; const auto exception_cb = [&target, &metadata_cb, &record_cb, &kSymbols, kSchema, kSType](const std::exception& exc) { - EXPECT_NE(dynamic_cast(&exc), nullptr) + EXPECT_NE(dynamic_cast(&exc), nullptr) << "Unexpected exception type"; target.Reconnect(); target.Subscribe(kSymbols, kSchema, kSType); diff --git a/tests/src/tcp_client_tests.cpp b/tests/src/tcp_client_tests.cpp index e191b7b..063243b 100644 --- a/tests/src/tcp_client_tests.cpp +++ b/tests/src/tcp_client_tests.cpp @@ -9,14 +9,18 @@ #include "databento/detail/tcp_client.hpp" #include "databento/exceptions.hpp" +#include "databento/log.hpp" #include "mock/mock_tcp_server.hpp" namespace databento::tests { class TcpClientTests : public testing::Test { protected: TcpClientTests() - : testing::Test(), mock_server_{}, target_{"127.0.0.1", mock_server_.Port()} {} + : testing::Test(), + mock_server_{}, + target_{&logger_, "127.0.0.1", mock_server_.Port()} {} + NullLogReceiver logger_; mock::MockTcpServer mock_server_; detail::TcpClient target_; }; @@ -104,7 +108,7 @@ TEST_F(TcpClientTests, TestReadSomeTimeout) { server.Send(); server.Close(); }}; - target_ = {"127.0.0.1", mock_server.Port()}; + target_ = detail::TcpClient{&logger_, "127.0.0.1", mock_server.Port()}; std::array buffer{}; const auto res = @@ -123,7 +127,7 @@ TEST_F(TcpClientTests, TestReadCloseNoTimeout) { server.Accept(); server.Close(); }}; - target_ = {"127.0.0.1", mock_server.Port()}; + target_ = detail::TcpClient{&logger_, "127.0.0.1", mock_server.Port()}; constexpr std::chrono::milliseconds kTimeout{5}; From 30172d5276e682e16f1b9d1338a0a46174750524 Mon Sep 17 00:00:00 2001 From: Carter Green Date: Mon, 30 Mar 2026 16:56:12 -0500 Subject: [PATCH 5/5] VER: Release C++ client 0.52.0 --- CHANGELOG.md | 2 +- CMakeLists.txt | 2 +- pkg/PKGBUILD | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd5363e..7935fd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ # Changelog -## 0.52.0 - TBD +## 0.52.0 - 2026-03-31 ### Enhancements - Added client-side heartbeat timeout detection: `NextRecord` throws `HeartbeatTimeoutError` diff --git a/CMakeLists.txt b/CMakeLists.txt index 8e2a6de..0647dc6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.2) project( databento - VERSION 0.51.0 + VERSION 0.52.0 LANGUAGES CXX DESCRIPTION "Official Databento client library" ) diff --git a/pkg/PKGBUILD b/pkg/PKGBUILD index 2bfc88e..f062726 100644 --- a/pkg/PKGBUILD +++ b/pkg/PKGBUILD @@ -1,7 +1,7 @@ # Maintainer: Databento _pkgname=databento-cpp pkgname=databento-cpp-git -pkgver=0.51.0 +pkgver=0.52.0 pkgrel=1 pkgdesc="Official C++ client for Databento" arch=('any')