Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,26 @@
# Changelog

## 0.52.0 - 2026-03-31

### Enhancements
- Added client-side heartbeat timeout detection: `NextRecord` throws `HeartbeatTimeoutError`
if no data is received for `heartbeat_interval` + 5 seconds (defaults to 35 seconds)
- Changed `SlowReaderBehavior::Skip` to send "skip" instead of "drop" to the gateway

### Breaking changes
- `NextRecord` now throws `LiveApiError` instead of `DbnResponseError` when the gateway
closes the session. Code catching `DbnResponseError` for this case should be updated

## 0.51.0 - 2026-03-17

### Enhancements
- Added support for `progress` field in `BatchJob` response
- Added blocking (pull-based) `TimeseriesGetRange` overloads that return a `DbnStore`
with `GetMetadata()` and `NextRecord()` methods, unifying the consumption pattern
across historical, live, and file sources
- Generalized `DbnFileStore` into `DbnStore` which accepts any `IReadable` source.
`DbnFileStore` is retained as a type alias for backwards compatibility
- Upgraded default `httplib` version to 0.37.2

## 0.50.0 - 2026-03-03

Expand Down
9 changes: 6 additions & 3 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.24..4.2)

project(
databento
VERSION 0.51.0
VERSION 0.52.0
LANGUAGES CXX
DESCRIPTION "Official Databento client library"
)
Expand Down Expand Up @@ -178,7 +178,7 @@ if(${PROJECT_NAME_UPPERCASE}_USE_EXTERNAL_HTTPLIB)
find_package(httplib REQUIRED)
endif()
else()
set(httplib_version 0.30.1)
set(httplib_version 0.37.2)
FetchContent_Declare(
httplib
URL https://github.com/yhirose/cpp-httplib/archive/refs/tags/v${httplib_version}.tar.gz
Expand All @@ -202,13 +202,16 @@ if(${PROJECT_NAME_UPPERCASE}_USE_EXTERNAL_DATE)
find_package(date REQUIRED)
endif()
else()
set(date_version 3.0.3)
set(date_version 3.0.4)
set(CMAKE_WARN_DEPRECATED OFF CACHE BOOL "" FORCE)
FetchContent_Declare(
date_src
URL https://github.com/HowardHinnant/date/archive/refs/tags/v${date_version}.tar.gz
DOWNLOAD_EXTRACT_TIMESTAMP TRUE
)
# Suppress deprecation warning from date's old cmake_minimum_required
FetchContent_MakeAvailable(date_src)
set(CMAKE_WARN_DEPRECATED ON CACHE BOOL "" FORCE)
# Ignore compiler warnings in headers
add_system_include_property(date)
endif()
Expand Down
22 changes: 9 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,19 +138,15 @@ namespace db = databento;

int main() {
auto client = db::Historical::Builder().SetKey("$YOUR_API_KEY").Build();
db::TsSymbolMap symbol_map;
auto decode_symbols = [&symbol_map](const db::Metadata& metadata) {
symbol_map = metadata.CreateSymbolMap();
};
auto print_trades = [&symbol_map](const db::Record& record) {
const auto& trade_msg = record.Get<db::TradeMsg>();
std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " << trade_msg
<< '\n';
return db::KeepGoing::Continue;
};
client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"},
{"ESM2", "NQZ2"}, db::Schema::Trades, db::SType::RawSymbol,
db::SType::InstrumentId, {}, decode_symbols, print_trades);
auto store = client.TimeseriesGetRange(
"GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"},
{"ESM2", "NQZ2"}, db::Schema::Trades);
auto symbol_map = store.GetMetadata().CreateSymbolMap();
while (const auto* record = store.NextRecord()) {
const auto& trade_msg = record->Get<db::TradeMsg>();
std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": "
<< trade_msg << '\n';
}
}
```

Expand Down
14 changes: 14 additions & 0 deletions cmake/Findzstd.cmake
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# Try config mode first. If the system provides a zstd CMake config (e.g. Homebrew),
# use it directly so all targets are defined consistently.
find_package(zstd CONFIG QUIET)
if(zstd_FOUND)
if(NOT TARGET zstd::libzstd)
if(TARGET zstd::libzstd_shared)
add_library(zstd::libzstd ALIAS zstd::libzstd_shared)
elseif(TARGET zstd::libzstd_static)
add_library(zstd::libzstd ALIAS zstd::libzstd_static)
endif()
endif()
return()
endif()

include(FindPackageHandleStandardArgs)

if(WIN32)
Expand Down
7 changes: 5 additions & 2 deletions cmake/SourcesAndHeaders.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ set(headers
include/databento/dbn_decoder.hpp
include/databento/dbn_encoder.hpp
include/databento/dbn_file_store.hpp
include/databento/dbn_store.hpp
include/databento/detail/buffer.hpp
include/databento/detail/dbn_buffer_decoder.hpp
include/databento/detail/http_client.hpp
Expand Down Expand Up @@ -39,7 +40,8 @@ set(headers
include/databento/v2.hpp
include/databento/v3.hpp
include/databento/with_ts_out.hpp
src/stream_op_helper.hpp
src/detail/http_stream_reader.hpp
src/detail/stream_op_helper.hpp
)

set(sources
Expand All @@ -49,10 +51,11 @@ set(sources
src/dbn_constants.hpp
src/dbn_decoder.cpp
src/dbn_encoder.cpp
src/dbn_file_store.cpp
src/dbn_store.cpp
src/detail/buffer.cpp
src/detail/dbn_buffer_decoder.cpp
src/detail/http_client.cpp
src/detail/http_stream_reader.cpp
src/detail/json_helpers.cpp
src/detail/live_connection.cpp
src/detail/scoped_fd.cpp
Expand Down
18 changes: 7 additions & 11 deletions examples/historical/readme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,13 @@ namespace db = databento;

int main() {
auto client = db::Historical::Builder().SetKey("$YOUR_API_KEY").Build();
db::TsSymbolMap symbol_map;
auto decode_symbols = [&symbol_map](const db::Metadata& metadata) {
symbol_map = metadata.CreateSymbolMap();
};
auto print_trades = [&symbol_map](const db::Record& record) {
const auto& trade_msg = record.Get<db::TradeMsg>();
auto store =
client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"},
{"ESM2", "NQZ2"}, db::Schema::Trades);
auto symbol_map = store.GetMetadata().CreateSymbolMap();
while (const auto* record = store.NextRecord()) {
const auto& trade_msg = record->Get<db::TradeMsg>();
std::cout << "Received trade for " << symbol_map.At(trade_msg) << ": " << trade_msg
<< '\n';
return db::KeepGoing::Continue;
};
client.TimeseriesGetRange("GLBX.MDP3", {"2022-06-10T14:30", "2022-06-10T14:40"},
{"ESM2", "NQZ2"}, db::Schema::Trades, db::SType::RawSymbol,
db::SType::InstrumentId, {}, decode_symbols, print_trades);
}
}
15 changes: 7 additions & 8 deletions examples/historical/timeseries_get_range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,14 @@ namespace db = databento;
int main() {
auto client = db::Historical::Builder().SetKeyFromEnv().Build();
const auto limit = 1000;
client.TimeseriesGetRange(
auto store = client.TimeseriesGetRange(
db::dataset::kGlbxMdp3, db::DateTimeRange<std::string>{"2022-10-03"}, {"ESZ2"},
db::Schema::Trades, db::SType::RawSymbol, db::SType::InstrumentId, limit,
[](db::Metadata&& metadata) { std::cout << metadata << '\n'; },
[](const db::Record& record) {
const auto& trade_msg = record.Get<db::TradeMsg>();
std::cout << trade_msg << '\n';
return db::KeepGoing::Continue;
});
db::Schema::Trades, db::SType::RawSymbol, db::SType::InstrumentId, limit);
std::cout << store.GetMetadata() << '\n';
while (const auto* record = store.NextRecord()) {
const auto& trade_msg = record->Get<db::TradeMsg>();
std::cout << trade_msg << '\n';
}

return 0;
}
11 changes: 5 additions & 6 deletions examples/historical/timeseries_get_range_to_file.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "databento/constants.hpp"
#include "databento/datetime.hpp"
#include "databento/dbn_file_store.hpp"
#include "databento/dbn_store.hpp"
#include "databento/enums.hpp"
#include "databento/historical.hpp"
#include "databento/record.hpp"
Expand All @@ -10,14 +10,13 @@ namespace db = databento;
int main() {
auto client = db::Historical::Builder().SetKeyFromEnv().Build();
const auto limit = 1000;
db::DbnFileStore dbn_file_store = client.TimeseriesGetRangeToFile(
db::DbnStore dbn_store = client.TimeseriesGetRangeToFile(
db::dataset::kGlbxMdp3, {"2022-10-03T00:00", "2022-10-04T00:00"}, {"ESZ2"},
db::Schema::Ohlcv1M, db::SType::RawSymbol, db::SType::InstrumentId, limit,
"ESZ2-ohlcv1m-20201003-20201004.dbn.zst");
dbn_file_store.Replay([](const db::Record record) {
const auto& ohlcv_bar = record.Get<db::OhlcvMsg>();
while (const auto* record = dbn_store.NextRecord()) {
const auto& ohlcv_bar = record->Get<db::OhlcvMsg>();
std::cout << ohlcv_bar << '\n';
return db::KeepGoing::Continue;
});
}
return 0;
}
39 changes: 3 additions & 36 deletions include/databento/dbn_file_store.hpp
Original file line number Diff line number Diff line change
@@ -1,40 +1,7 @@
#pragma once

#include <filesystem> // path

#include "databento/dbn.hpp" // DecodeMetadata
#include "databento/dbn_decoder.hpp" // DbnDecoder
#include "databento/enums.hpp" // VersionUpgradePolicy
#include "databento/log.hpp"
#include "databento/record.hpp"
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback
#include "databento/dbn_store.hpp"

namespace databento {
// A reader for DBN files. This class provides both a callback API similar to
// TimeseriesGetRange in historical data and LiveThreaded for live data as well
// as a blocking API similar to that of LiveBlocking. Only one API should be
// used on a given instance.
class DbnFileStore {
public:
explicit DbnFileStore(const std::filesystem::path& file_path);
DbnFileStore(ILogReceiver* log_receiver, const std::filesystem::path& file_path,
VersionUpgradePolicy upgrade_policy);

// Callback API: calling Replay consumes the input.
void Replay(const MetadataCallback& metadata_callback,
const RecordCallback& record_callback);
void Replay(const RecordCallback& record_callback);

// Blocking API
const Metadata& GetMetadata();
// Returns the next record or `nullptr` if there are no remaining records.
const Record* NextRecord();

private:
void MaybeDecodeMetadata();

DbnDecoder decoder_;
Metadata metadata_{};
bool has_decoded_metadata_{false};
};
} // namespace databento
using DbnFileStore = DbnStore;
}
43 changes: 43 additions & 0 deletions include/databento/dbn_store.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#pragma once

#include <filesystem> // path
#include <memory> // unique_ptr

#include "databento/dbn.hpp" // DecodeMetadata
#include "databento/dbn_decoder.hpp" // DbnDecoder
#include "databento/enums.hpp" // VersionUpgradePolicy
#include "databento/ireadable.hpp"
#include "databento/log.hpp"
#include "databento/record.hpp"
#include "databento/timeseries.hpp" // MetadataCallback, RecordCallback

namespace databento {
// A reader for DBN data from files or streams. This class provides both a callback API
// similar to `LiveThreaded` for live data as well as a blocking API similar to that of
// `LiveBlocking`. Only one API should be used on a given instance.
class DbnStore {
public:
explicit DbnStore(const std::filesystem::path& file_path);
DbnStore(ILogReceiver* log_receiver, const std::filesystem::path& file_path,
VersionUpgradePolicy upgrade_policy);
DbnStore(ILogReceiver* log_receiver, std::unique_ptr<IReadable> input,
VersionUpgradePolicy upgrade_policy);

// Callback API: calling Replay consumes the input.
void Replay(const MetadataCallback& metadata_callback,
const RecordCallback& record_callback);
void Replay(const RecordCallback& record_callback);

// Blocking API
const databento::Metadata& GetMetadata();
// Returns the next record or `nullptr` if there are no remaining records.
const Record* NextRecord();

private:
void MaybeDecodeMetadata();

DbnDecoder decoder_;
databento::Metadata metadata_{};
bool has_decoded_metadata_{false};
};
} // namespace databento
4 changes: 4 additions & 0 deletions include/databento/detail/http_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
#include <nlohmann/json.hpp>

#include <cstdint>
#include <memory> // unique_ptr
#include <string>

namespace databento {
class ILogReceiver;
class IReadable;
namespace detail {
class HttpClient {
public:
Expand All @@ -27,6 +29,8 @@ class HttpClient {
const httplib::ContentReceiver& callback);
void PostRawStream(const std::string& path, const httplib::Params& form_params,
const httplib::ContentReceiver& callback);
std::unique_ptr<IReadable> OpenPostStream(const std::string& path,
const httplib::Params& form_params);

private:
static bool IsErrorStatus(int status_code);
Expand Down
26 changes: 26 additions & 0 deletions include/databento/detail/http_stream_reader.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#pragma once

#include <httplib.h>

#include <chrono> // milliseconds
#include <cstddef> // byte, size_t

#include "databento/ireadable.hpp"

namespace databento::detail {
// Adapts an httplib StreamHandle to the IReadable interface, allowing
// HTTP response bodies to be read incrementally by DbnDecoder.
class HttpStreamReader : public IReadable {
public:
explicit HttpStreamReader(httplib::ClientImpl::StreamHandle handle);

void ReadExact(std::byte* buffer, std::size_t length) override;
std::size_t ReadSome(std::byte* buffer, std::size_t max_length) override;
// timeout is ignored; historical data is always immediately available or EOF
Result ReadSome(std::byte* buffer, std::size_t max_length,
std::chrono::milliseconds timeout) override;

private:
httplib::ClientImpl::StreamHandle handle_;
};
} // namespace databento::detail
8 changes: 7 additions & 1 deletion include/databento/detail/live_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@
#include "databento/ireadable.hpp"
#include "databento/iwritable.hpp"

// Forward declare
namespace databento {
class ILogReceiver;
}

namespace databento::detail {
// Manages the TCP connection to the live gateway with optionally compressed reads for
// the DBN data.
class LiveConnection : IWritable {
public:
LiveConnection(const std::string& gateway, std::uint16_t port);
LiveConnection(ILogReceiver* log_receiver, const std::string& gateway,
std::uint16_t port);

void WriteAll(std::string_view str);
void WriteAll(const std::byte* buffer, std::size_t size);
Expand Down
Loading
Loading