diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
new file mode 100644
index 0000000..2f5223e
--- /dev/null
+++ b/.github/workflows/build.yml
@@ -0,0 +1,54 @@
+name: Build & Test
+
+on:
+ push:
+ branches: ['*']
+ pull_request:
+ branches: [master]
+
+jobs:
+ build:
+ runs-on: ubuntu-24.04
+ steps:
+ - name: Checkout
+ uses: actions/checkout@v2
+
+ - name: Liberate disk space
+ uses: jlumbroso/free-disk-space@main
+ with:
+ tool-cache: true
+ large-packages: false
+ docker-images: false
+ swap-storage: false
+
+ - name: Checkout Lean Same Branch
+ id: lean-same-branch
+ uses: actions/checkout@v2
+ continue-on-error: true
+ with:
+ ref: ${{ github.ref }}
+ repository: QuantConnect/Lean
+ path: Lean
+
+ - name: Checkout Lean Master
+ if: steps.lean-same-branch.outcome != 'success'
+ uses: actions/checkout@v2
+ with:
+ repository: QuantConnect/Lean
+ path: Lean
+
+ - name: Move Lean
+ run: mv Lean ../Lean
+
+ - uses: addnab/docker-run-action@v3
+ with:
+ image: quantconnect/lean:foundation
+ options: --workdir /__w/Lean.DataSource.SDK/Lean.DataSource.SDK -v /home/runner/work:/__w
+ shell: bash
+ run: |
+ # BuildDataSource
+ dotnet build ./QuantConnect.DataSource.csproj /p:Configuration=Release /v:quiet /p:WarningLevel=1 && \
+ # BuildTests
+ dotnet build ./tests/Tests.csproj /p:Configuration=Release /v:quiet /p:WarningLevel=1 && \
+ # Run Tests
+ dotnet test ./tests/bin/Release/net9.0/Tests.dll
diff --git a/.gitignore b/.gitignore
index c6127b3..bd0f10e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -50,3 +50,16 @@ modules.order
Module.symvers
Mkfile.old
dkms.conf
+
+# Build results
+[Dd]ebug/
+[Dd]ebugPublic/
+[Rr]elease/
+[Rr]eleases/
+x64/
+x86/
+.vs/
+build/
+bld/
+[Bb]in/
+[Oo]bj/
diff --git a/Demonstration.cs b/Demonstration.cs
new file mode 100644
index 0000000..f5a1fe1
--- /dev/null
+++ b/Demonstration.cs
@@ -0,0 +1,82 @@
+/*
+ * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
+ * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+*/
+
+using QuantConnect.Algorithm;
+using QuantConnect.Data.Market;
+using QuantConnect.Interfaces;
+using QuantConnect;
+using QuantConnect.Data;
+using QuantConnect.Securities.Future;
+using QuantConnect.Util;
+using System;
+
+namespace QuantConnect.Algorithm.CSharp
+{
+ public class DatabentoFuturesTestAlgorithm : QCAlgorithm
+ {
+ private Future _es;
+
+ public override void Initialize()
+ {
+ Log("Algorithm Initialize");
+
+ SetStartDate(2025, 10, 1);
+ SetEndDate(2025, 10, 16);
+ SetCash(100000);
+
+ var exp = new DateTime(2025, 12, 19);
+ var symbol = QuantConnect.Symbol.CreateFuture("ES", Market.CME, exp);
+ _es = AddFutureContract(symbol, Resolution.Second, true, 1, true);
+ }
+
+ public override void OnData(Slice slice)
+ {
+ if (!slice.HasData)
+ {
+ Log("Slice has no data");
+ return;
+ }
+
+ Log($"OnData: Slice has {slice.Count} data points");
+
+ // For Tick resolution, check Ticks collection
+ if (slice.Ticks.ContainsKey(_es.Symbol))
+ {
+ var ticks = slice.Ticks[_es.Symbol];
+ Log($"Received {ticks.Count} ticks for {_es.Symbol}");
+
+ foreach (var tick in ticks)
+ {
+ if (tick.TickType == TickType.Trade)
+ {
+ Log($"Trade Tick - Price: {tick.Price}, Quantity: {tick.Quantity}, Time: {tick.Time}");
+ }
+ else if (tick.TickType == TickType.Quote)
+ {
+ Log($"Quote Tick - Bid: {tick.BidPrice}x{tick.BidSize}, Ask: {tick.AskPrice}x{tick.AskSize}, Time: {tick.Time}");
+ }
+ }
+ }
+
+ // These won't have data for Tick resolution
+ if (slice.Bars.ContainsKey(_es.Symbol))
+ {
+ var bar = slice.Bars[_es.Symbol];
+ Log($"Bar - O:{bar.Open} H:{bar.High} L:{bar.Low} C:{bar.Close} V:{bar.Volume}");
+ }
+ }
+ }
+}
diff --git a/DemonstrationUniverse.cs b/DemonstrationUniverse.cs
new file mode 100644
index 0000000..86a0d7b
--- /dev/null
+++ b/DemonstrationUniverse.cs
@@ -0,0 +1,28 @@
+/*
+ * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
+ * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+*/
+
+using System.Linq;
+using QuantConnect.Data.UniverseSelection;
+using QuantConnect.DataSource;
+
+namespace QuantConnect.Algorithm.CSharp
+{
+ ///
+ /// Example algorithm using the custom data type as a source of alpha
+ ///
+ public class CustomDataUniverse : QCAlgorithm
+ { }
+}
\ No newline at end of file
diff --git a/Lean.DataSource.DataBento.sln b/Lean.DataSource.DataBento.sln
new file mode 100644
index 0000000..f2f03d6
--- /dev/null
+++ b/Lean.DataSource.DataBento.sln
@@ -0,0 +1,34 @@
+Microsoft Visual Studio Solution File, Format Version 12.00
+# Visual Studio Version 17
+VisualStudioVersion = 17.5.2.0
+MinimumVisualStudioVersion = 10.0.40219.1
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QuantConnect.DataBento", "QuantConnect.DataBento\QuantConnect.DataSource.DataBento.csproj", "{367AEEDC-F0B3-7F47-539D-10E5EC242C2A}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "QuantConnect.DataBento.Tests", "QuantConnect.DataBento.Tests\QuantConnect.DataSource.DataBento.Tests.csproj", "{9CF47860-2CEA-F379-09D8-9AEF27965D12}"
+EndProject
+Global
+ GlobalSection(SolutionConfigurationPlatforms) = preSolution
+ Debug|Any CPU = Debug|Any CPU
+ Release|Any CPU = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(ProjectConfigurationPlatforms) = postSolution
+ {367AEEDC-F0B3-7F47-539D-10E5EC242C2A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {367AEEDC-F0B3-7F47-539D-10E5EC242C2A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {367AEEDC-F0B3-7F47-539D-10E5EC242C2A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {367AEEDC-F0B3-7F47-539D-10E5EC242C2A}.Release|Any CPU.Build.0 = Release|Any CPU
+ {4B379C8F-16CE-1972-73E3-C14F6410D428}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {4B379C8F-16CE-1972-73E3-C14F6410D428}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {4B379C8F-16CE-1972-73E3-C14F6410D428}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {4B379C8F-16CE-1972-73E3-C14F6410D428}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9CF47860-2CEA-F379-09D8-9AEF27965D12}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9CF47860-2CEA-F379-09D8-9AEF27965D12}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9CF47860-2CEA-F379-09D8-9AEF27965D12}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9CF47860-2CEA-F379-09D8-9AEF27965D12}.Release|Any CPU.Build.0 = Release|Any CPU
+ EndGlobalSection
+ GlobalSection(SolutionProperties) = preSolution
+ HideSolutionNode = FALSE
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ SolutionGuid = {CE272A88-90FF-4452-B402-D3AD0D08FB15}
+ EndGlobalSection
+EndGlobal
diff --git a/QuantConnect.DataBento.Tests/DataBentoDataDownloaderTests.cs b/QuantConnect.DataBento.Tests/DataBentoDataDownloaderTests.cs
new file mode 100644
index 0000000..d9d8071
--- /dev/null
+++ b/QuantConnect.DataBento.Tests/DataBentoDataDownloaderTests.cs
@@ -0,0 +1,183 @@
+/*
+ * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
+ * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+*/
+
+using System;
+using System.Linq;
+using NUnit.Framework;
+using QuantConnect.Data;
+using QuantConnect.Data.Market;
+using QuantConnect.Lean.DataSource.DataBento;
+using QuantConnect.Logging;
+using QuantConnect.Configuration;
+
+namespace QuantConnect.Lean.DataSource.DataBento.Tests
+{
+ [TestFixture]
+ public class DataBentoDataDownloaderTests
+ {
+ private DataBentoDataDownloader _downloader;
+ private readonly string _apiKey = Config.Get("databento-api-key");
+
+ [SetUp]
+ public void SetUp()
+ {
+ _downloader = new DataBentoDataDownloader(_apiKey);
+ }
+
+ [TearDown]
+ public void TearDown()
+ {
+ _downloader?.Dispose();
+ }
+
+ [Test]
+ [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Daily, TickType.Trade)]
+ [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Hour, TickType.Trade)]
+ [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Minute, TickType.Trade)]
+ [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Second, TickType.Trade)]
+ [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Tick, TickType.Trade)]
+ [Explicit("This test requires a configured DataBento API key")]
+ public void DownloadsHistoricalData(string ticker, SecurityType securityType, string market, Resolution resolution, TickType tickType)
+ {
+ var symbol = Symbol.Create(ticker, securityType, market);
+ var startTime = new DateTime(2024, 1, 15);
+ var endTime = new DateTime(2024, 1, 16);
+ var param = new DataDownloaderGetParameters(symbol, resolution, startTime, endTime, tickType);
+
+ var downloadResponse = _downloader.Get(param).ToList();
+
+ Log.Trace($"Downloaded {downloadResponse.Count} data points for {symbol} at {resolution} resolution");
+
+ Assert.IsTrue(downloadResponse.Any(), "Expected to download at least one data point");
+
+ foreach (var data in downloadResponse)
+ {
+ Assert.IsNotNull(data, "Data point should not be null");
+ Assert.AreEqual(symbol, data.Symbol, "Symbol should match requested symbol");
+ Assert.IsTrue(data.Time >= startTime && data.Time <= endTime, "Data time should be within requested range");
+
+ if (data is TradeBar tradeBar)
+ {
+ Assert.Greater(tradeBar.Close, 0, "Close price should be positive");
+ Assert.GreaterOrEqual(tradeBar.Volume, 0, "Volume should be non-negative");
+ Assert.Greater(tradeBar.High, 0, "High price should be positive");
+ Assert.Greater(tradeBar.Low, 0, "Low price should be positive");
+ Assert.Greater(tradeBar.Open, 0, "Open price should be positive");
+ Assert.GreaterOrEqual(tradeBar.High, tradeBar.Low, "High should be >= Low");
+ Assert.GreaterOrEqual(tradeBar.High, tradeBar.Open, "High should be >= Open");
+ Assert.GreaterOrEqual(tradeBar.High, tradeBar.Close, "High should be >= Close");
+ Assert.LessOrEqual(tradeBar.Low, tradeBar.Open, "Low should be <= Open");
+ Assert.LessOrEqual(tradeBar.Low, tradeBar.Close, "Low should be <= Close");
+ }
+ else if (data is QuoteBar quoteBar)
+ {
+ Assert.Greater(quoteBar.Close, 0, "Quote close price should be positive");
+ if (quoteBar.Bid != null)
+ {
+ Assert.Greater(quoteBar.Bid.Close, 0, "Bid price should be positive");
+ }
+ if (quoteBar.Ask != null)
+ {
+ Assert.Greater(quoteBar.Ask.Close, 0, "Ask price should be positive");
+ }
+ }
+ else if (data is Tick tick)
+ {
+ Assert.Greater(tick.Value, 0, "Tick value should be positive");
+ Assert.GreaterOrEqual(tick.Quantity, 0, "Tick quantity should be non-negative");
+ }
+ }
+ }
+
+ [Test]
+ [TestCase("ZNM3", SecurityType.Future, Market.CME, Resolution.Daily, TickType.Trade)]
+ [TestCase("ZNM3", SecurityType.Future, Market.CME, Resolution.Hour, TickType.Trade)]
+ [Explicit("This test requires a configured DataBento API key")]
+ public void DownloadsFuturesHistoricalData(string ticker, SecurityType securityType, string market, Resolution resolution, TickType tickType)
+ {
+ var symbol = Symbol.Create(ticker, securityType, market);
+ var startTime = new DateTime(2024, 1, 15);
+ var endTime = new DateTime(2024, 1, 16);
+ var param = new DataDownloaderGetParameters(symbol, resolution, startTime, endTime, tickType);
+
+ var downloadResponse = _downloader.Get(param).ToList();
+
+ Log.Trace($"Downloaded {downloadResponse.Count} data points for futures {symbol}");
+
+ Assert.IsTrue(downloadResponse.Any(), "Expected to download futures data");
+
+ foreach (var data in downloadResponse)
+ {
+ Assert.AreEqual(symbol, data.Symbol, "Symbol should match requested futures symbol");
+ Assert.Greater(data.Value, 0, "Data value should be positive");
+ }
+ }
+
+ [Test]
+ [TestCase("ESM3", SecurityType.Future, Market.CME, Resolution.Tick, TickType.Quote)]
+ [Explicit("This test requires a configured DataBento API key and advanced subscription")]
+ public void DownloadsQuoteData(string ticker, SecurityType securityType, string market, Resolution resolution, TickType tickType)
+ {
+ var symbol = Symbol.Create(ticker, securityType, market);
+ var startTime = new DateTime(2024, 1, 15, 9, 30, 0);
+ var endTime = new DateTime(2024, 1, 15, 9, 45, 0);
+ var param = new DataDownloaderGetParameters(symbol, resolution, startTime, endTime, tickType);
+
+ var downloadResponse = _downloader.Get(param).ToList();
+
+ Log.Trace($"Downloaded {downloadResponse.Count} quote data points for {symbol}");
+
+ Assert.IsTrue(downloadResponse.Any(), "Expected to download quote data");
+
+ foreach (var data in downloadResponse)
+ {
+ Assert.AreEqual(symbol, data.Symbol, "Symbol should match requested symbol");
+ if (data is QuoteBar quoteBar)
+ {
+ Assert.IsTrue(quoteBar.Bid != null || quoteBar.Ask != null, "Quote should have bid or ask data");
+ }
+ }
+ }
+
+ [Test]
+ [Explicit("This test requires a configured DataBento API key")]
+ public void DataIsSortedByTime()
+ {
+ var symbol = Symbol.Create("ESM3", SecurityType.Future, Market.CME);
+ var startTime = new DateTime(2024, 1, 15);
+ var endTime = new DateTime(2024, 1, 16);
+ var param = new DataDownloaderGetParameters(symbol, Resolution.Minute, startTime, endTime, TickType.Trade);
+
+ var downloadResponse = _downloader.Get(param).ToList();
+
+ Assert.IsTrue(downloadResponse.Any(), "Expected to download data for time sorting test");
+
+ for (int i = 1; i < downloadResponse.Count; i++)
+ {
+ Assert.GreaterOrEqual(downloadResponse[i].Time, downloadResponse[i - 1].Time,
+ $"Data should be sorted by time. Item {i} time {downloadResponse[i].Time} should be >= item {i - 1} time {downloadResponse[i - 1].Time}");
+ }
+ }
+
+ [Test]
+ public void DisposesCorrectly()
+ {
+ var downloader = new DataBentoDataDownloader();
+ Assert.DoesNotThrow(() => downloader.Dispose(), "Dispose should not throw");
+ Assert.DoesNotThrow(() => downloader.Dispose(), "Multiple dispose calls should not throw");
+ }
+ }
+}
diff --git a/QuantConnect.DataBento.Tests/DataBentoDataProviderHistoryTests.cs b/QuantConnect.DataBento.Tests/DataBentoDataProviderHistoryTests.cs
new file mode 100644
index 0000000..2da5b8f
--- /dev/null
+++ b/QuantConnect.DataBento.Tests/DataBentoDataProviderHistoryTests.cs
@@ -0,0 +1,210 @@
+/*
+ * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
+ * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+*/
+
+using System;
+using System.Linq;
+using NUnit.Framework;
+using QuantConnect.Data;
+using QuantConnect.Util;
+using QuantConnect.Lean.DataSource.DataBento;
+using QuantConnect.Securities;
+using System.Collections.Generic;
+using QuantConnect.Logging;
+using QuantConnect.Data.Market;
+using QuantConnect.Configuration;
+
+namespace QuantConnect.Lean.DataSource.DataBento.Tests
+{
+ [TestFixture]
+ public class DataBentoDataProviderHistoryTests
+ {
+ private DataBentoProvider _historyDataProvider;
+ private readonly string _apiKey = Config.Get("databento-api-key");
+
+ [SetUp]
+ public void SetUp()
+ {
+ _historyDataProvider = new DataBentoProvider(_apiKey);
+ }
+
+ [TearDown]
+ public void TearDown()
+ {
+ _historyDataProvider?.Dispose();
+ }
+
+ internal static IEnumerable TestParameters
+ {
+ get
+ {
+
+ // DataBento futures
+ var esMini = Symbol.Create("ESM3", SecurityType.Future, Market.CME);
+ var znNote = Symbol.Create("ZNM3", SecurityType.Future, Market.CME);
+ var gcGold = Symbol.Create("GCM3", SecurityType.Future, Market.CME);
+
+ // test cases for supported futures
+ yield return new TestCaseData(esMini, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(5), false)
+ .SetDescription("Valid ES futures - Daily resolution, 5 days period")
+ .SetCategory("Valid");
+
+ yield return new TestCaseData(esMini, Resolution.Hour, TickType.Trade, TimeSpan.FromDays(2), false)
+ .SetDescription("Valid ES futures - Hour resolution, 2 days period")
+ .SetCategory("Valid");
+
+ yield return new TestCaseData(esMini, Resolution.Minute, TickType.Trade, TimeSpan.FromHours(4), false)
+ .SetDescription("Valid ES futures - Minute resolution, 4 hours period")
+ .SetCategory("Valid");
+
+ yield return new TestCaseData(znNote, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(3), false)
+ .SetDescription("Valid ZN futures - Daily resolution, 3 days period")
+ .SetCategory("Valid");
+
+ yield return new TestCaseData(gcGold, Resolution.Hour, TickType.Trade, TimeSpan.FromDays(1), false)
+ .SetDescription("Valid GC futures - Hour resolution, 1 day period")
+ .SetCategory("Valid");
+
+ // Test cases for quote data (may require advanced subscription)
+ yield return new TestCaseData(esMini, Resolution.Tick, TickType.Quote, TimeSpan.FromMinutes(15), false)
+ .SetDescription("ES futures quote data - Tick resolution")
+ .SetCategory("Quote");
+
+ // Unsupported security types
+ var equity = Symbol.Create("SPY", SecurityType.Equity, Market.USA);
+ var option = Symbol.Create("SPY", SecurityType.Option, Market.USA);
+
+ yield return new TestCaseData(equity, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(5), true)
+ .SetDescription("Invalid - Equity not supported by DataBento")
+ .SetCategory("Invalid");
+
+ yield return new TestCaseData(option, Resolution.Daily, TickType.Trade, TimeSpan.FromDays(5), true)
+ .SetDescription("Invalid - Option not supported by DataBento")
+ .SetCategory("Invalid");
+ }
+ }
+
+ [Test, TestCaseSource(nameof(TestParameters))]
+ [Explicit("This test requires a configured DataBento API key")]
+ public void GetsHistory(Symbol symbol, Resolution resolution, TickType tickType, TimeSpan period, bool expectsNoData)
+ {
+ var request = GetHistoryRequest(resolution, tickType, symbol, period);
+
+ try
+ {
+ var slices = _historyDataProvider.GetHistory(request)?.Select(data => new Slice(data.Time, new[] { data }, data.Time.ConvertToUtc(request.DataTimeZone))).ToList();
+
+ if (expectsNoData)
+ {
+ Assert.IsTrue(slices == null || !slices.Any(),
+ $"Expected no data for unsupported symbol/security type: {symbol}");
+ }
+ else
+ {
+ Assert.IsNotNull(slices, "Expected to receive history data");
+
+ if (slices.Any())
+ {
+ Log.Trace($"Received {slices.Count} slices for {symbol} at {resolution} resolution");
+
+ foreach (var slice in slices.Take(5)) // Check first 5 slices
+ {
+ Assert.IsNotNull(slice, "Slice should not be null");
+ Assert.IsTrue(slice.Time >= request.StartTimeUtc && slice.Time <= request.EndTimeUtc,
+ "Slice time should be within requested range");
+
+ if (slice.Bars.ContainsKey(symbol))
+ {
+ var bar = slice.Bars[symbol];
+ Assert.Greater(bar.Close, 0, "Bar close price should be positive");
+ Assert.GreaterOrEqual(bar.Volume, 0, "Bar volume should be non-negative");
+ }
+ }
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"Error getting history for {symbol}: {ex.Message}");
+
+ if (!expectsNoData)
+ {
+ throw;
+ }
+ }
+ }
+
+ [Test]
+ [Explicit("This test requires a configured DataBento API key")]
+ public void GetHistoryWithMultipleSymbols()
+ {
+ var symbol1 = Symbol.Create("ESM3", SecurityType.Future, Market.CME);
+ var symbol2 = Symbol.Create("ZNM3", SecurityType.Future, Market.CME);
+
+ var request1 = GetHistoryRequest(Resolution.Daily, TickType.Trade, symbol1, TimeSpan.FromDays(3));
+ var request2 = GetHistoryRequest(Resolution.Daily, TickType.Trade, symbol2, TimeSpan.FromDays(3));
+
+ var history1 = _historyDataProvider.GetHistory(request1);
+ var history2 = _historyDataProvider.GetHistory(request2);
+
+ var allData = new List();
+ if (history1 != null) allData.AddRange(history1);
+ if (history2 != null) allData.AddRange(history2);
+
+ // timezone from the first request
+ var slices = allData.GroupBy(d => d.Time)
+ .Select(g => new Slice(g.Key, g.ToList(), g.Key.ConvertToUtc(request1.DataTimeZone)))
+ .ToList();
+
+ Assert.IsNotNull(slices, "Expected to receive history data for multiple symbols");
+
+ if (slices.Any())
+ {
+ Log.Trace($"Received {slices.Count} slices for multiple symbols");
+
+ var hasSymbol1Data = slices.Any(s => s.Bars.ContainsKey(symbol1));
+ var hasSymbol2Data = slices.Any(s => s.Bars.ContainsKey(symbol2));
+
+ Assert.IsTrue(hasSymbol1Data || hasSymbol2Data,
+ "Expected data for at least one of the requested symbols");
+ }
+ }
+
+ internal static HistoryRequest GetHistoryRequest(Resolution resolution, TickType tickType, Symbol symbol, TimeSpan period)
+ {
+ var utcNow = DateTime.UtcNow;
+ var dataType = LeanData.GetDataType(resolution, tickType);
+ var marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
+
+ var exchangeHours = marketHoursDatabase.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType);
+ var dataTimeZone = marketHoursDatabase.GetDataTimeZone(symbol.ID.Market, symbol, symbol.SecurityType);
+
+ return new HistoryRequest(
+ startTimeUtc: utcNow.Add(-period),
+ endTimeUtc: utcNow,
+ dataType: dataType,
+ symbol: symbol,
+ resolution: resolution,
+ exchangeHours: exchangeHours,
+ dataTimeZone: dataTimeZone,
+ fillForwardResolution: resolution,
+ includeExtendedMarketHours: true,
+ isCustomData: false,
+ DataNormalizationMode.Raw,
+ tickType: tickType
+ );
+ }
+ }
+}
diff --git a/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs b/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs
new file mode 100644
index 0000000..7df71a4
--- /dev/null
+++ b/QuantConnect.DataBento.Tests/DataBentoRawLiveClientTests.cs
@@ -0,0 +1,253 @@
+/*
+ * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
+ * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+*/
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using NUnit.Framework;
+using QuantConnect.Data;
+using QuantConnect.Data.Market;
+using QuantConnect.Lean.DataSource.DataBento;
+using QuantConnect.Logging;
+using QuantConnect.Configuration;
+
+namespace QuantConnect.Lean.DataSource.DataBento.Tests
+{
+ [TestFixture]
+ public class DataBentoRawLiveClientTests
+ {
+ private DatabentoRawClient _client;
+ private readonly string _apiKey = Config.Get("databento-api-key");
+
+ [SetUp]
+ public void SetUp()
+ {
+ _client = new DatabentoRawClient(_apiKey);
+ }
+
+ [TearDown]
+ public void TearDown()
+ {
+ _client?.Dispose();
+ }
+
+ [Test]
+ [Explicit("This test requires a configured DataBento API key and live connection")]
+ public async Task ConnectsToGateway()
+ {
+ if (string.IsNullOrEmpty(_apiKey))
+ {
+ Assert.Ignore("DataBento API key not configured");
+ return;
+ }
+
+ var connected = await _client.ConnectAsync();
+
+ Assert.IsTrue(connected, "Should successfully connect to DataBento gateway");
+ Assert.IsTrue(_client.IsConnected, "IsConnected should return true after successful connection");
+
+ Log.Trace("Successfully connected to DataBento gateway");
+ }
+
+ [Test]
+ [Explicit("This test requires a configured DataBento API key and live connection")]
+ public async Task SubscribesToSymbol()
+ {
+ if (string.IsNullOrEmpty(_apiKey))
+ {
+ Assert.Ignore("DataBento API key not configured");
+ return;
+ }
+
+ var connected = await _client.ConnectAsync();
+ Assert.IsTrue(connected, "Must be connected to test subscription");
+
+ var symbol = Symbol.Create("ESM3", SecurityType.Future, Market.CME);
+ var subscribed = _client.Subscribe(symbol, Resolution.Minute, TickType.Trade);
+
+ Assert.IsTrue(subscribed, "Should successfully subscribe to symbol");
+
+ Log.Trace($"Successfully subscribed to {symbol}");
+
+ // Wait a moment to ensure subscription is active
+ await Task.Delay(2000);
+
+ var unsubscribed = _client.Unsubscribe(symbol);
+ Assert.IsTrue(unsubscribed, "Should successfully unsubscribe from symbol");
+
+ Log.Trace($"Successfully unsubscribed from {symbol}");
+ }
+
+ [Test]
+ [Explicit("This test requires a configured DataBento API key and live connection")]
+ public async Task ReceivesLiveData()
+ {
+ if (string.IsNullOrEmpty(_apiKey))
+ {
+ Assert.Ignore("DataBento API key not configured");
+ return;
+ }
+
+ var dataReceived = false;
+ var dataReceivedEvent = new ManualResetEventSlim(false);
+ BaseData receivedData = null;
+
+ _client.DataReceived += (sender, data) =>
+ {
+ receivedData = data;
+ dataReceived = true;
+ dataReceivedEvent.Set();
+ Log.Trace($"Received data: {data}");
+ };
+
+ var connected = await _client.ConnectAsync();
+ Assert.IsTrue(connected, "Must be connected to test data reception");
+
+ var symbol = Symbol.Create("ESM3", SecurityType.Future, Market.CME);
+ var subscribed = _client.Subscribe(symbol, Resolution.Tick, TickType.Trade);
+ Assert.IsTrue(subscribed, "Must be subscribed to receive data");
+
+ // Wait for data with timeout
+ var dataReceiptTimeout = TimeSpan.FromMinutes(2);
+ var receivedWithinTimeout = dataReceivedEvent.Wait(dataReceiptTimeout);
+
+ if (receivedWithinTimeout)
+ {
+ Assert.IsTrue(dataReceived, "Should have received data");
+ Assert.IsNotNull(receivedData, "Received data should not be null");
+ Assert.AreEqual(symbol, receivedData.Symbol, "Received data symbol should match subscription");
+ Assert.Greater(receivedData.Value, 0, "Received data value should be positive");
+
+ Log.Trace($"Successfully received live data: {receivedData}");
+ }
+ else
+ {
+ Log.Trace("No data received within timeout period - this may be expected during non-market hours");
+ }
+
+ _client.Unsubscribe(symbol);
+ }
+
+ [Test]
+ [Explicit("This test requires a configured DataBento API key and live connection")]
+ public async Task HandlesConnectionEvents()
+ {
+ if (string.IsNullOrEmpty(_apiKey))
+ {
+ Assert.Ignore("DataBento API key not configured");
+ return;
+ }
+
+ var connectionStatusChanged = false;
+ var connectionStatusEvent = new ManualResetEventSlim(false);
+
+ _client.ConnectionStatusChanged += (sender, isConnected) =>
+ {
+ connectionStatusChanged = true;
+ connectionStatusEvent.Set();
+ Log.Trace($"Connection status changed: {isConnected}");
+ };
+
+ var connected = await _client.ConnectAsync();
+ Assert.IsTrue(connected, "Should connect successfully");
+
+ // Connection status event should fire on connect
+ var eventFiredWithinTimeout = connectionStatusEvent.Wait(TimeSpan.FromSeconds(10));
+ Assert.IsTrue(eventFiredWithinTimeout || connectionStatusChanged,
+ "Connection status changed event should fire");
+
+ _client.Disconnect();
+ Assert.IsFalse(_client.IsConnected, "Should be disconnected after calling Disconnect()");
+ }
+
+ [Test]
+ public void HandlesInvalidApiKey()
+ {
+ var invalidClient = new DatabentoRawClient("invalid-api-key");
+
+ // Connection with invalid API key should fail gracefully
+ Assert.DoesNotThrowAsync(async () =>
+ {
+ var connected = await invalidClient.ConnectAsync();
+ Assert.IsFalse(connected, "Connection should fail with invalid API key");
+ });
+
+ invalidClient.Dispose();
+ }
+
+ [Test]
+ public void DisposesCorrectly()
+ {
+ var client = new DatabentoRawClient(_apiKey);
+ Assert.DoesNotThrow(() => client.Dispose(), "Dispose should not throw");
+ Assert.DoesNotThrow(() => client.Dispose(), "Multiple dispose calls should not throw");
+ }
+
+ [Test]
+ public void SymbolMappingWorksCorrectly()
+ {
+ // Test that futures are mapped correctly to DataBento format
+ var esFuture = Symbol.Create("ESM3", SecurityType.Future, Market.CME);
+
+ // Since the mapping method is private, we test indirectly through subscription
+ Assert.DoesNotThrowAsync(async () =>
+ {
+ if (!string.IsNullOrEmpty(_apiKey))
+ {
+ var connected = await _client.ConnectAsync();
+ if (connected)
+ {
+ _client.Subscribe(esFuture, Resolution.Minute, TickType.Trade);
+ _client.Unsubscribe(esFuture);
+ }
+ }
+ });
+ }
+
+ [Test]
+ public void SchemaResolutionMappingWorksCorrectly()
+ {
+ // Test that resolution mappings work correctly
+ var symbol = Symbol.Create("ESM3", SecurityType.Future, Market.CME);
+
+ Assert.DoesNotThrowAsync(async () =>
+ {
+ if (!string.IsNullOrEmpty(_apiKey))
+ {
+ var connected = await _client.ConnectAsync();
+ if (connected)
+ {
+ // Test different resolutions
+ _client.Subscribe(symbol, Resolution.Tick, TickType.Trade);
+ _client.Unsubscribe(symbol);
+
+ _client.Subscribe(symbol, Resolution.Second, TickType.Trade);
+ _client.Unsubscribe(symbol);
+
+ _client.Subscribe(symbol, Resolution.Minute, TickType.Trade);
+ _client.Unsubscribe(symbol);
+
+ _client.Subscribe(symbol, Resolution.Hour, TickType.Trade);
+ _client.Unsubscribe(symbol);
+
+ _client.Subscribe(symbol, Resolution.Daily, TickType.Trade);
+ _client.Unsubscribe(symbol);
+ }
+ }
+ });
+ }
+ }
+}
diff --git a/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj b/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj
new file mode 100644
index 0000000..8e212ee
--- /dev/null
+++ b/QuantConnect.DataBento.Tests/QuantConnect.DataSource.DataBento.Tests.csproj
@@ -0,0 +1,35 @@
+
+
+ net9.0
+ QuantConnect.DataLibrary.Tests
+
+
+
+
+
+
+
+
+
+ all
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ PreserveNewest
+
+
+
diff --git a/QuantConnect.DataBento.Tests/TestSetup.cs b/QuantConnect.DataBento.Tests/TestSetup.cs
new file mode 100644
index 0000000..b0e07c1
--- /dev/null
+++ b/QuantConnect.DataBento.Tests/TestSetup.cs
@@ -0,0 +1,83 @@
+/*
+ * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
+ * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ *
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+using System;
+using System.Collections;
+using System.IO;
+using NUnit.Framework;
+using QuantConnect.Configuration;
+using QuantConnect.Logging;
+
+namespace QuantConnect.Lean.DataSource.DataBento.Tests
+{
+ [SetUpFixture]
+ public class TestSetup
+ {
+ [OneTimeSetUp]
+ public void GlobalSetup()
+ {
+ Log.DebuggingEnabled = true;
+ Log.LogHandler = new CompositeLogHandler();
+ Log.Trace("TestSetup(): starting...");
+ ReloadConfiguration();
+ }
+
+ private static void ReloadConfiguration()
+ {
+ // nunit 3 sets the current folder to a temp folder we need it to be the test bin output folder
+ var dir = TestContext.CurrentContext.TestDirectory;
+ Environment.CurrentDirectory = dir;
+ Directory.SetCurrentDirectory(dir);
+ // reload config from current path
+ Config.Reset();
+
+ var environment = Environment.GetEnvironmentVariables();
+ foreach (DictionaryEntry entry in environment)
+ {
+ var envKey = entry.Key.ToString();
+ var value = entry.Value.ToString();
+
+ if (envKey.StartsWith("QC_"))
+ {
+ var key = envKey.Substring(3).Replace("_", "-").ToLower();
+
+ Log.Trace($"TestSetup(): Updating config setting '{key}' from environment var '{envKey}'");
+ Config.Set(key, value);
+ }
+ }
+
+ // resets the version among other things
+ Globals.Reset();
+ }
+
+ private static void SetUp()
+ {
+ Log.LogHandler = new CompositeLogHandler();
+ Log.Trace("TestSetup(): starting...");
+ ReloadConfiguration();
+ Log.DebuggingEnabled = Config.GetBool("debug-mode");
+ }
+
+ private static TestCaseData[] TestParameters
+ {
+ get
+ {
+ SetUp();
+ return new [] { new TestCaseData() };
+ }
+ }
+ }
+}
diff --git a/QuantConnect.DataBento.Tests/config.json b/QuantConnect.DataBento.Tests/config.json
new file mode 100644
index 0000000..ff861ca
--- /dev/null
+++ b/QuantConnect.DataBento.Tests/config.json
@@ -0,0 +1,9 @@
+{
+ "data-folder":"../../../../../Lean/Data/",
+
+ "job-user-id": "0",
+ "api-access-token": "",
+ "job-organization-id": "",
+
+ "databento-api-key":""
+}
\ No newline at end of file
diff --git a/QuantConnect.DataBento/DataBentoDataDownloader.cs b/QuantConnect.DataBento/DataBentoDataDownloader.cs
new file mode 100644
index 0000000..c8a1783
--- /dev/null
+++ b/QuantConnect.DataBento/DataBentoDataDownloader.cs
@@ -0,0 +1,307 @@
+/*
+ * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
+ * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+*/
+
+using System;
+using System.IO;
+using System.Text;
+using System.Net.Http;
+using System.Net.Http.Headers;
+using System.Globalization;
+using System.Collections.Generic;
+using CsvHelper;
+using CsvHelper.Configuration.Attributes;
+using QuantConnect.Data;
+using QuantConnect.Data.Market;
+using QuantConnect.Util;
+using QuantConnect.Configuration;
+using QuantConnect.Interfaces;
+using QuantConnect.Securities;
+
+namespace QuantConnect.Lean.DataSource.DataBento
+{
+ ///
+ /// Data downloader class for pulling data from Data Provider
+ ///
+ public class DataBentoDataDownloader : IDataDownloader, IDisposable
+ {
+ ///
+ private readonly HttpClient _httpClient;
+
+ private readonly string _apiKey;
+
+ private const decimal PriceScaleFactor = 1e-9m;
+
+ ///
+ /// Initializes a new instance of the
+ ///
+ public DataBentoDataDownloader(string apiKey)
+ {
+ _apiKey = apiKey;
+ _httpClient = new HttpClient();
+
+ // Set up HTTP Basic Authentication
+ var credentials = Convert.ToBase64String(Encoding.ASCII.GetBytes($"{_apiKey}:"));
+ _httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Basic", credentials);
+ }
+
+ public DataBentoDataDownloader()
+ : this(Config.Get("databento-api-key"))
+ {
+ }
+
+ ///
+ /// Get historical data enumerable for a single symbol, type and resolution given this start and end time (in UTC).
+ ///
+ /// Parameters for the historical data request
+ /// Enumerable of base data for this symbol
+ ///
+ public IEnumerable Get(DataDownloaderGetParameters parameters)
+ {
+ var symbol = parameters.Symbol;
+ var resolution = parameters.Resolution;
+ var startUtc = parameters.StartUtc;
+ var endUtc = parameters.EndUtc;
+ var tickType = parameters.TickType;
+
+ var dataset = "GLBX.MDP3"; // hard coded for now. Later on can add equities and options with different mapping
+ var schema = GetSchema(resolution, tickType);
+ var dbSymbol = MapSymbolToDataBento(symbol);
+
+ // prepare body for Raw HTTP request
+ var body = new StringBuilder();
+ body.Append($"dataset={dataset}");
+ body.Append($"&symbols={dbSymbol}");
+ body.Append($"&schema={schema}");
+ body.Append($"&start={startUtc:yyyy-MM-ddTHH:mm}");
+ body.Append($"&end={endUtc:yyyy-MM-ddTHH:mm}");
+ body.Append("&stype_in=parent");
+ body.Append("&encoding=csv");
+
+ var request = new HttpRequestMessage(
+ HttpMethod.Post,
+ "https://hist.databento.com/v0/timeseries.get_range")
+ {
+ Content = new StringContent(body.ToString(), Encoding.UTF8, "application/x-www-form-urlencoded")
+ };
+
+ // send the request with the get range url
+ var response = _httpClient.Send(request);
+
+ // Add error handling to see the actual error message
+ if (!response.IsSuccessStatusCode)
+ {
+ var errorContent = response.Content.ReadAsStringAsync().Result;
+ throw new HttpRequestException($"DataBento API error ({response.StatusCode}): {errorContent}");
+ }
+
+ response.EnsureSuccessStatusCode();
+
+ using var stream = response.Content.ReadAsStream();
+ using var reader = new StreamReader(stream);
+ using var csv = new CsvReader(reader, CultureInfo.InvariantCulture);
+
+ if (tickType == TickType.Trade)
+ {
+ if (resolution == Resolution.Tick)
+ {
+ // For tick data, use the trades schema which returns individual trades
+ foreach (var record in csv.GetRecords())
+ {
+ yield return new Tick
+ {
+ Time = record.Timestamp,
+ Symbol = symbol,
+ Value = record.Price,
+ Quantity = record.Size
+ };
+ }
+ }
+ else
+ {
+ // For aggregated data, use the ohlcv schema which returns bars
+ foreach (var record in csv.GetRecords())
+ {
+ yield return new TradeBar
+ {
+ Symbol = symbol,
+ Time = record.Timestamp,
+ Open = record.Open,
+ High = record.High,
+ Low = record.Low,
+ Close = record.Close,
+ Volume = record.Volume
+ };
+ }
+ }
+ }
+ else if (tickType == TickType.Quote)
+ {
+ foreach (var record in csv.GetRecords())
+ {
+ var bidPrice = record.BidPrice * PriceScaleFactor;
+ var askPrice = record.AskPrice * PriceScaleFactor;
+
+ if (resolution == Resolution.Tick)
+ {
+ yield return new Tick
+ {
+ Time = record.Timestamp,
+ Symbol = symbol,
+ AskPrice = askPrice,
+ BidPrice = bidPrice,
+ AskSize = record.AskSize,
+ BidSize = record.BidSize,
+ TickType = TickType.Quote
+ };
+ }
+ else
+ {
+ var bidBar = new Bar(bidPrice, bidPrice, bidPrice, bidPrice);
+ var askBar = new Bar(askPrice, askPrice, askPrice, askPrice);
+ yield return new QuoteBar(
+ record.Timestamp,
+ symbol,
+ bidBar,
+ record.BidSize,
+ askBar,
+ record.AskSize
+ );
+ }
+ }
+ }
+ }
+
+ ///
+ /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
+ ///
+ public void Dispose()
+ {
+ _httpClient?.DisposeSafely();
+ }
+
+ ///
+ /// Pick Databento schema from Lean resolution/ticktype
+ ///
+ private string GetSchema(Resolution resolution, TickType tickType)
+ {
+ if (tickType == TickType.Trade)
+ {
+ if (resolution == Resolution.Tick)
+ return "trades";
+ if (resolution == Resolution.Second)
+ return "ohlcv-1s";
+ if (resolution == Resolution.Minute)
+ return "ohlcv-1m";
+ if (resolution == Resolution.Hour)
+ return "ohlcv-1h";
+ if (resolution == Resolution.Daily)
+ return "ohlcv-1d";
+ }
+ else if (tickType == TickType.Quote)
+ {
+ // top of book
+ if (resolution == Resolution.Tick || resolution == Resolution.Second || resolution == Resolution.Minute || resolution == Resolution.Hour || resolution == Resolution.Daily)
+ return "mbp-1";
+ }
+
+ throw new NotSupportedException($"Unsupported resolution {resolution} / {tickType}");
+ }
+
+ ///
+ /// Maps a LEAN symbol to DataBento symbol format
+ ///
+ private string MapSymbolToDataBento(Symbol symbol)
+ {
+ if (symbol.SecurityType == SecurityType.Future)
+ {
+ // For DataBento, use the root symbol with .FUT suffix for parent subscription
+ // ES19Z25 -> ES.FUT
+ var value = symbol.Value;
+
+ // Extract root by removing digits and month codes
+ var root = new string(value.TakeWhile(c => !char.IsDigit(c)).ToArray());
+
+ return $"{root}.FUT";
+ }
+
+ return symbol.Value;
+ }
+
+ /// Class for parsing trade data from Databento
+ /// Really used as a map from the http request to then get it in QC data structures
+ private class DatabentoBar
+ {
+ [Name("ts_event")]
+ public long TimestampNanos { get; set; }
+
+ public DateTime Timestamp => DateTimeOffset.FromUnixTimeSeconds(TimestampNanos / 1_000_000_000)
+ .AddTicks((TimestampNanos % 1_000_000_000) / 100).UtcDateTime;
+
+ [Name("open")]
+ public decimal Open { get; set; }
+
+ [Name("high")]
+ public decimal High { get; set; }
+
+ [Name("low")]
+ public decimal Low { get; set; }
+
+ [Name("close")]
+ public decimal Close { get; set; }
+
+ [Name("volume")]
+ public decimal Volume { get; set; }
+ }
+
+ private class DatabentoTrade
+ {
+ [Name("ts_event")]
+ public long TimestampNanos { get; set; }
+
+ public DateTime Timestamp => DateTimeOffset.FromUnixTimeSeconds(TimestampNanos / 1_000_000_000)
+ .AddTicks((TimestampNanos % 1_000_000_000) / 100).UtcDateTime;
+
+ [Name("price")]
+ public long PriceRaw { get; set; }
+
+ public decimal Price => PriceRaw * PriceScaleFactor;
+
+ [Name("size")]
+ public int Size { get; set; }
+ }
+
+ private class DatabentoQuote
+ {
+ [Name("ts_event")]
+ public long TimestampNanos { get; set; }
+
+ public DateTime Timestamp => DateTimeOffset.FromUnixTimeSeconds(TimestampNanos / 1_000_000_000)
+ .AddTicks((TimestampNanos % 1_000_000_000) / 100).UtcDateTime;
+
+ [Name("bid_px_00")]
+ public long BidPrice { get; set; }
+
+ [Name("bid_sz_00")]
+ public int BidSize { get; set; }
+
+ [Name("ask_px_00")]
+ public long AskPrice { get; set; }
+
+ [Name("ask_sz_00")]
+ public int AskSize { get; set; }
+ }
+ }
+}
diff --git a/QuantConnect.DataBento/DataBentoDataProvider.cs b/QuantConnect.DataBento/DataBentoDataProvider.cs
new file mode 100644
index 0000000..f1ed3c0
--- /dev/null
+++ b/QuantConnect.DataBento/DataBentoDataProvider.cs
@@ -0,0 +1,489 @@
+/*
+ * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
+ * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+*/
+
+using System;
+using NodaTime;
+using QuantConnect.Data;
+using QuantConnect.Util;
+using QuantConnect.Interfaces;
+using System.Collections.Generic;
+using QuantConnect.Configuration;
+using QuantConnect.Logging;
+using QuantConnect.Packets;
+using QuantConnect.Securities;
+using System.Threading.Tasks;
+using QuantConnect.Data.Market;
+
+namespace QuantConnect.Lean.DataSource.DataBento
+{
+ ///
+ /// Implementation of Custom Data Provider
+ ///
+ public class DataBentoProvider : IDataQueueHandler
+ {
+ ///
+ ///
+ ///
+ private readonly IDataAggregator _dataAggregator = Composer.Instance.GetExportedValueByTypeName(
+ Config.Get("data-aggregator", "QuantConnect.Lean.Engine.DataFeeds.AggregationManager"), forceTypeNameOnExisting: false);
+
+ ///
+ ///
+ ///
+ private EventBasedDataQueueHandlerSubscriptionManager _subscriptionManager = null!;
+
+ private readonly List _activeSubscriptionConfigs = new();
+
+ private readonly System.Collections.Concurrent.ConcurrentDictionary _subscriptionConfigs = new();
+
+ ///
+ ///
+ ///
+ private DatabentoRawClient _client = null!;
+
+ ///
+ /// DataBento API key
+ ///
+ private readonly string _apiKey;
+
+ ///
+ /// DataBento historical data downloader
+ ///
+ private readonly DataBentoDataDownloader _dataDownloader;
+
+ private bool _unsupportedSecurityTypeMessageLogged;
+ private bool _unsupportedDataTypeMessageLogged;
+ private bool _potentialUnsupportedResolutionMessageLogged;
+
+ private bool _sessionStarted = false;
+ private readonly object _sessionLock = new object();
+
+ private readonly MarketHoursDatabase _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
+ private readonly System.Collections.Concurrent.ConcurrentDictionary _symbolExchangeTimeZones = new();
+
+ ///
+ /// Returns true if we're currently connected to the Data Provider
+ ///
+ public bool IsConnected => _client?.IsConnected == true;
+
+ ///
+ /// Initializes a new instance of the DataBentoProvider
+ ///
+ public DataBentoProvider()
+ {
+ Log.Trace("From Plugin DataBentoProvider.DataBentoProvider() being initialized 1");
+ _apiKey = Config.Get("databento-api-key");
+ if (string.IsNullOrEmpty(_apiKey))
+ {
+ throw new ArgumentException("DataBento API key is required. Set 'databento-api-key' in configuration.");
+ }
+
+ _dataDownloader = new DataBentoDataDownloader(_apiKey);
+ Initialize();
+ }
+
+ ///
+ /// Initializes a new instance of the DataBentoProvider with custom API key
+ ///
+ /// DataBento API key
+ public DataBentoProvider(string apiKey)
+ {
+ Log.Trace("From Plugin DataBentoProvider.DataBentoProvider() being initialized 2");
+ _apiKey = apiKey ?? throw new ArgumentNullException(nameof(apiKey));
+ _dataDownloader = new DataBentoDataDownloader(_apiKey);
+ Initialize();
+ }
+
+ ///
+ /// Common initialization logic
+ ///
+ private void Initialize()
+ {
+ Log.Trace("DataBentoProvider.Initialize(): Starting initialization");
+
+ _subscriptionManager = new EventBasedDataQueueHandlerSubscriptionManager();
+ _subscriptionManager.SubscribeImpl = (symbols, tickType) =>
+ {
+ Log.Trace($"DataBentoProvider.SubscribeImpl(): Received subscription request for {symbols.Count()} symbols, TickType={tickType}");
+
+ foreach (var symbol in symbols)
+ {
+ Log.Trace($"DataBentoProvider.SubscribeImpl(): Processing symbol {symbol}");
+
+ if (_subscriptionConfigs.TryGetValue(symbol, out var config))
+ {
+ Log.Trace($"DataBentoProvider.SubscribeImpl(): Found config for {symbol}, Resolution={config.Resolution}, TickType={config.TickType}");
+
+ if (_client?.IsConnected == true)
+ {
+ Log.Trace($"DataBentoProvider.SubscribeImpl(): Client is connected, attempting async subscribe for {symbol}");
+ Task.Run(async () =>
+ {
+ // If the requested resolution is higher than tick, we subscribe to ticks and let the aggregator handle it.
+ var resolutionToSubscribe = config.Resolution > Resolution.Tick ? Resolution.Tick : config.Resolution;
+ var success = _client.Subscribe(config.Symbol, resolutionToSubscribe, config.TickType);
+ if (success)
+ {
+ Log.Trace($"DataBentoProvider.SubscribeImpl(): Successfully subscribed to {config.Symbol} at {resolutionToSubscribe} resolution");
+
+ // Start session after first successful subscription
+ lock (_sessionLock)
+ {
+ if (!_sessionStarted)
+ {
+ Log.Trace("DataBentoProvider.SubscribeImpl(): Starting DataBento session to receive data");
+ _sessionStarted = _client.StartSession();
+ if (_sessionStarted)
+ {
+ Log.Trace("DataBentoProvider.SubscribeImpl(): Session started successfully - data should begin flowing");
+ }
+ else
+ {
+ Log.Error("DataBentoProvider.SubscribeImpl(): Failed to start session");
+ }
+ }
+ }
+ }
+ else
+ {
+ Log.Error($"DataBentoProvider.SubscribeImpl(): Failed to subscribe to live data for {config.Symbol}");
+ }
+ });
+ }
+ else
+ {
+ Log.Trace($"DataBentoProvider.SubscribeImpl(): Client not connected, skipping subscription for {symbol}");
+ }
+ }
+ else
+ {
+ Log.Trace($"DataBentoProvider.SubscribeImpl(): No config found for {symbol}, skipping");
+ }
+ }
+
+ return true;
+ };
+
+ _subscriptionManager.UnsubscribeImpl = (symbols, tickType) =>
+ {
+ Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Received unsubscribe request for {symbols.Count()} symbols, TickType={tickType}");
+
+ foreach (var symbol in symbols)
+ {
+ Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Processing symbol {symbol}");
+
+ if (_client?.IsConnected == true)
+ {
+ Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Client is connected, unsubscribing from {symbol}");
+ Task.Run(() =>
+ {
+ _client.Unsubscribe(symbol);
+ Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Unsubscribed from {symbol}");
+ });
+ }
+ else
+ {
+ Log.Trace($"DataBentoProvider.UnsubscribeImpl(): Client not connected, skipping unsubscribe for {symbol}");
+ }
+ }
+
+ return true;
+ };
+
+ // Initialize the live client
+ Log.Trace("DataBentoProvider.Initialize(): Creating DatabentoRawClient");
+ _client = new DatabentoRawClient(_apiKey);
+ _client.DataReceived += OnDataReceived;
+ _client.ConnectionStatusChanged += OnConnectionStatusChanged;
+
+ // Connect to live gateway
+ Log.Trace("DataBentoProvider.Initialize(): Attempting async connection to DataBento live gateway");
+ Task.Run(async () =>
+ {
+ var connected = await _client.ConnectAsync();
+ Log.Trace($"DataBentoProvider.Initialize(): ConnectAsync() returned {connected}");
+
+ if (connected)
+ {
+ Log.Trace("DataBentoProvider.Initialize(): Successfully connected to DataBento live gateway");
+ }
+ else
+ {
+ Log.Error("DataBentoProvider.Initialize(): Failed to connect to DataBento live gateway");
+ }
+ });
+
+ Log.Trace("DataBentoProvider.Initialize(): Initialization complete");
+ }
+
+ ///
+ /// Subscribe to the specified configuration
+ ///
+ /// defines the parameters to subscribe to a data feed
+ /// handler to be fired on new data available
+ /// The new enumerator for this subscription request
+ public IEnumerator? Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
+ {
+ Log.Trace("From Plugin Subscribed ENTER");
+ if (!CanSubscribe(dataConfig)){
+ return null;
+ }
+
+ _subscriptionConfigs[dataConfig.Symbol] = dataConfig;
+ var enumerator = _dataAggregator.Add(dataConfig, newDataAvailableHandler);
+ _subscriptionManager.Subscribe(dataConfig);
+ _activeSubscriptionConfigs.Add(dataConfig);
+
+ Log.Trace("From Plugin Subscribed DONE");
+ return enumerator;
+ }
+
+ ///
+ /// Removes the specified configuration
+ ///
+ /// Subscription config to be removed
+ public void Unsubscribe(SubscriptionDataConfig dataConfig)
+ {
+ _subscriptionConfigs.TryRemove(dataConfig.Symbol, out _);
+ _subscriptionManager.Unsubscribe(dataConfig);
+ var toRemove = _activeSubscriptionConfigs.FirstOrDefault(c => c.Symbol == dataConfig.Symbol && c.TickType == dataConfig.TickType);
+ if (toRemove != null)
+ {
+ _activeSubscriptionConfigs.Remove(toRemove);
+ }
+ _dataAggregator.Remove(dataConfig);
+ }
+
+ ///
+ /// Sets the job we're subscribing for
+ ///
+ /// Job we're subscribing for
+ public void SetJob(LiveNodePacket job)
+ {
+ // No action required for DataBento since the job details are not used in the subscription process.
+ }
+
+ ///
+ /// Dispose of unmanaged resources.
+ ///
+ public void Dispose()
+ {
+ _dataAggregator?.DisposeSafely();
+ _subscriptionManager?.DisposeSafely();
+ _client?.Dispose();
+ _dataDownloader?.Dispose();
+ }
+
+ ///
+ /// Gets the history for the requested security
+ ///
+ /// The historical data request
+ /// An enumerable of BaseData points
+ public IEnumerable? GetHistory(Data.HistoryRequest request)
+ {
+ if (!CanSubscribe(request.Symbol))
+ {
+ return null;
+ }
+
+ try
+ {
+ // Use the data downloader to get historical data
+ var parameters = new DataDownloaderGetParameters(
+ request.Symbol,
+ request.Resolution,
+ request.StartTimeUtc,
+ request.EndTimeUtc,
+ request.TickType);
+
+ return _dataDownloader.Get(parameters);
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DataBentoProvider.GetHistory(): Failed to get history for {request.Symbol}: {ex.Message}");
+ return null;
+ }
+ }
+
+ ///
+ /// Checks if this Data provider supports the specified symbol
+ ///
+ /// The symbol
+ /// returns true if Data Provider supports the specified symbol; otherwise false
+ private bool CanSubscribe(Symbol symbol)
+ {
+ return !symbol.Value.Contains("universe", StringComparison.InvariantCultureIgnoreCase) &&
+ !symbol.IsCanonical() &&
+ IsSecurityTypeSupported(symbol.SecurityType);
+ }
+
+ ///
+ /// Determines whether or not the specified config can be subscribed to
+ ///
+ private bool CanSubscribe(SubscriptionDataConfig config)
+ {
+ return CanSubscribe(config.Symbol) &&
+ IsSupported(config.SecurityType, config.Type, config.TickType, config.Resolution);
+ }
+
+ ///
+ /// Checks if the security type is supported
+ ///
+ /// Security type to check
+ /// True if supported
+ private bool IsSecurityTypeSupported(SecurityType securityType)
+ {
+ // DataBento primarily supports futures, but also has equity and option coverage
+ return securityType == SecurityType.Future;
+ }
+
+ ///
+ /// Determines if the specified subscription is supported
+ ///
+ private bool IsSupported(SecurityType securityType, Type dataType, TickType tickType, Resolution resolution)
+ {
+ // Check supported security types
+ if (!IsSecurityTypeSupported(securityType))
+ {
+ if (!_unsupportedSecurityTypeMessageLogged)
+ {
+ _unsupportedSecurityTypeMessageLogged = true;
+ Log.Trace($"DataBentoDataProvider.IsSupported(): Unsupported security type: {securityType}");
+ }
+ return false;
+ }
+
+ // Check supported data types
+ if (dataType != typeof(TradeBar) &&
+ dataType != typeof(QuoteBar) &&
+ dataType != typeof(Tick) &&
+ dataType != typeof(OpenInterest))
+ {
+ if (!_unsupportedDataTypeMessageLogged)
+ {
+ _unsupportedDataTypeMessageLogged = true;
+ Log.Trace($"DataBentoDataProvider.IsSupported(): Unsupported data type: {dataType}");
+ }
+ return false;
+ }
+
+ // Warn about potential limitations for tick data
+ // I'm mimicing polygon implementation with this
+ if (!_potentialUnsupportedResolutionMessageLogged)
+ {
+ _potentialUnsupportedResolutionMessageLogged = true;
+ Log.Trace("DataBentoDataProvider.IsSupported(): " +
+ $"Subscription for {securityType}-{dataType}-{tickType}-{resolution} will be attempted. " +
+ $"An Advanced DataBento subscription plan is required to stream tick data.");
+ }
+
+ return true;
+ }
+
+ ///
+ /// Converts the given UTC time into the symbol security exchange time zone
+ ///
+ private DateTime GetTickTime(Symbol symbol, DateTime utcTime)
+ {
+ var exchangeTimeZone = _symbolExchangeTimeZones.GetOrAdd(symbol, sym =>
+ {
+ if (_marketHoursDatabase.TryGetEntry(sym.ID.Market, sym, sym.SecurityType, out var entry))
+ {
+ return entry.ExchangeHours.TimeZone;
+ }
+ // Futures default to Chicago
+ return TimeZones.Chicago;
+ });
+
+ return utcTime.ConvertFromUtc(exchangeTimeZone);
+ }
+
+ //
+ /// Handles data received from the live client
+ ///
+ private void OnDataReceived(object? sender, BaseData data)
+ {
+ try
+ {
+ if (data is Tick tick)
+ {
+ tick.Time = GetTickTime(tick.Symbol, tick.Time);
+ _dataAggregator.Update(tick);
+
+ Log.Trace($"DataBentoProvider.OnDataReceived(): Updated tick - Symbol: {tick.Symbol}, " +
+ $"TickType: {tick.TickType}, Price: {tick.Value}, Quantity: {tick.Quantity}");
+ }
+ else if (data is TradeBar tradeBar)
+ {
+ tradeBar.Time = GetTickTime(tradeBar.Symbol, tradeBar.Time);
+ tradeBar.EndTime = GetTickTime(tradeBar.Symbol, tradeBar.EndTime);
+ _dataAggregator.Update(tradeBar);
+
+ Log.Trace($"DataBentoProvider.OnDataReceived(): Updated TradeBar - Symbol: {tradeBar.Symbol}, " +
+ $"O:{tradeBar.Open} H:{tradeBar.High} L:{tradeBar.Low} C:{tradeBar.Close} V:{tradeBar.Volume}");
+ }
+ else
+ {
+ data.Time = GetTickTime(data.Symbol, data.Time);
+ _dataAggregator.Update(data);
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DataBentoProvider.OnDataReceived(): Error updating data aggregator: {ex.Message}\n{ex.StackTrace}");
+ }
+ }
+
+ ///
+ /// Handles connection status changes from the live client
+ ///
+ private void OnConnectionStatusChanged(object? sender, bool isConnected)
+ {
+ Log.Trace($"DataBentoProvider.OnConnectionStatusChanged(): Connection status changed to: {isConnected}");
+
+ if (isConnected)
+ {
+ // Reset session flag on reconnection
+ lock (_sessionLock)
+ {
+ _sessionStarted = false;
+ }
+
+ // Resubscribe to all active subscriptions
+ Task.Run(() =>
+ {
+ foreach (var config in _activeSubscriptionConfigs)
+ {
+ _client.Subscribe(config.Symbol, config.Resolution, config.TickType);
+ }
+
+ // Start session after resubscribing
+ if (_activeSubscriptionConfigs.Any())
+ {
+ lock (_sessionLock)
+ {
+ if (!_sessionStarted)
+ {
+ Log.Trace("DataBentoProvider.OnConnectionStatusChanged(): Starting session after reconnection");
+ _sessionStarted = _client.StartSession();
+ }
+ }
+ }
+ });
+ }
+ }
+ }
+}
diff --git a/QuantConnect.DataBento/DataBentoHistoryProivder.cs b/QuantConnect.DataBento/DataBentoHistoryProivder.cs
new file mode 100644
index 0000000..64b1da8
--- /dev/null
+++ b/QuantConnect.DataBento/DataBentoHistoryProivder.cs
@@ -0,0 +1,306 @@
+/*
+ * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
+ * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+*/
+
+using System;
+using NodaTime;
+using QuantConnect.Data;
+using QuantConnect.Data.Market;
+using QuantConnect.Lean.Engine.DataFeeds;
+using QuantConnect.Lean.Engine.HistoricalData;
+using QuantConnect.Logging;
+using QuantConnect.Util;
+using QuantConnect.Lean.DataSource.DataBento;
+using QuantConnect.Interfaces;
+using System.Collections.Generic;
+using QuantConnect.Configuration;
+using QuantConnect.Securities;
+using System.Threading.Tasks;
+using QuantConnect.Data.Consolidators;
+
+namespace QuantConnect.Lean.DataSource.DataBento
+{
+ ///
+ /// DataBento implementation of
+ ///
+ public partial class DataBentoHistoryProvider : SynchronizingHistoryProvider
+ {
+ private int _dataPointCount;
+ private DataBentoDataDownloader _dataDownloader;
+
+ ///
+ /// Indicates whether a error for an invalid start time has been fired, where the start time is greater than or equal to the end time in UTC.
+ ///
+ private volatile bool _invalidStartTimeErrorFired;
+
+ ///
+ /// Indicates whether an error has been fired due to invalid conditions if the TickType is and the is greater than one second.
+ ///
+ private volatile bool _invalidTickTypeAndResolutionErrorFired;
+
+ ///
+ /// Indicates whether unsupported tick type message has been logged
+ ///
+ private volatile bool _unsupportedTickTypeMessagedLogged;
+
+ ///
+ /// Market hours database instance
+ ///
+ private MarketHoursDatabase _marketHoursDatabase;
+
+ ///
+ /// Gets the total number of data points emitted by this history provider
+ ///
+ public override int DataPointCount => _dataPointCount;
+
+ private bool _unsupportedSecurityTypeMessageLogged;
+ private bool _unsupportedDataTypeMessageLogged;
+ private bool _potentialUnsupportedResolutionMessageLogged;
+
+ ///
+ /// Initializes this history provider to work for the specified job
+ ///
+ /// The initialization parameters
+ public override void Initialize(HistoryProviderInitializeParameters parameters)
+ {
+ _dataDownloader = new DataBentoDataDownloader();
+ _marketHoursDatabase = MarketHoursDatabase.FromDataFolder();
+ }
+
+ ///
+ /// Gets the history for the requested securities
+ ///
+ /// The historical data requests
+ /// The time zone used when time stamping the slice instances
+ /// An enumerable of the slices of data covering the span specified in each request
+ public override IEnumerable? GetHistory(IEnumerable requests, DateTimeZone sliceTimeZone)
+ {
+ var subscriptions = new List();
+ foreach (var request in requests)
+ {
+ var history = GetHistory(request);
+ if (history == null)
+ {
+ continue;
+ }
+
+ var subscription = CreateSubscription(request, history);
+ if (!subscription.MoveNext())
+ {
+ continue;
+ }
+
+ subscriptions.Add(subscription);
+ }
+
+ if (subscriptions.Count == 0)
+ {
+ return null;
+ }
+ return CreateSliceEnumerableFromSubscriptions(subscriptions, sliceTimeZone);
+ }
+
+ ///
+ /// Gets the history for the requested security
+ ///
+ /// The historical data request
+ /// An enumerable of BaseData points
+ public IEnumerable? GetHistory(HistoryRequest request)
+ {
+ if (request.Symbol.IsCanonical() ||
+ !IsSupported(request.Symbol.SecurityType, request.DataType, request.TickType, request.Resolution))
+ {
+ // It is Logged in IsSupported(...)
+ return null;
+ }
+
+ if (request.TickType == TickType.OpenInterest)
+ {
+ if (!_unsupportedTickTypeMessagedLogged)
+ {
+ _unsupportedTickTypeMessagedLogged = true;
+ Log.Trace($"DataBentoHistoryProvider.GetHistory(): Unsupported tick type: {TickType.OpenInterest}");
+ }
+ return null;
+ }
+
+ if (request.EndTimeUtc < request.StartTimeUtc)
+ {
+ if (!_invalidStartTimeErrorFired)
+ {
+ _invalidStartTimeErrorFired = true;
+ Log.Error($"{nameof(DataBentoHistoryProvider)}.{nameof(GetHistory)}:InvalidDateRange. The history request start date must precede the end date, no history returned");
+ }
+ return null;
+ }
+
+
+ // Use the trade aggregates API for resolutions above tick for fastest results
+ if (request.TickType == TickType.Trade && request.Resolution > Resolution.Tick)
+ {
+ var data = GetAggregates(request);
+
+ if (data == null)
+ {
+ return null;
+ }
+
+ return data;
+ }
+
+ return GetHistoryThroughDataConsolidator(request);
+ }
+
+ private IEnumerable? GetHistoryThroughDataConsolidator(HistoryRequest request)
+ {
+ IDataConsolidator consolidator;
+ IEnumerable history;
+
+ if (request.TickType == TickType.Trade)
+ {
+ consolidator = request.Resolution != Resolution.Tick
+ ? new TickConsolidator(request.Resolution.ToTimeSpan())
+ : FilteredIdentityDataConsolidator.ForTickType(request.TickType);
+ history = GetTrades(request);
+ }
+ else
+ {
+ consolidator = request.Resolution != Resolution.Tick
+ ? new TickQuoteBarConsolidator(request.Resolution.ToTimeSpan())
+ : FilteredIdentityDataConsolidator.ForTickType(request.TickType);
+ history = GetQuotes(request);
+ }
+
+ BaseData? consolidatedData = null;
+ DataConsolidatedHandler onDataConsolidated = (s, e) =>
+ {
+ consolidatedData = (BaseData)e;
+ };
+ consolidator.DataConsolidated += onDataConsolidated;
+
+ foreach (var data in history)
+ {
+ consolidator.Update(data);
+ if (consolidatedData != null)
+ {
+ Interlocked.Increment(ref _dataPointCount);
+ yield return consolidatedData;
+ consolidatedData = null;
+ }
+ }
+
+ consolidator.DataConsolidated -= onDataConsolidated;
+ consolidator.DisposeSafely();
+ }
+
+ ///
+ /// Gets the trade bars for the specified history request
+ ///
+ private IEnumerable GetAggregates(HistoryRequest request)
+ {
+ var resolutionTimeSpan = request.Resolution.ToTimeSpan();
+ foreach (var date in Time.EachDay(request.StartTimeUtc, request.EndTimeUtc))
+ {
+ var start = date;
+ var end = date + Time.OneDay;
+
+ var parameters = new DataDownloaderGetParameters(request.Symbol, request.Resolution, start, end, request.TickType);
+ var data = _dataDownloader.Get(parameters);
+ if (data == null) continue;
+
+ foreach (var bar in data)
+ {
+ var tradeBar = (TradeBar)bar;
+ if (tradeBar.Time >= request.StartTimeUtc && tradeBar.EndTime <= request.EndTimeUtc)
+ {
+ yield return tradeBar;
+ }
+ }
+ }
+ }
+
+ ///
+ /// Gets the trade ticks that will potentially be aggregated for the specified history request
+ ///
+ private IEnumerable GetTrades(HistoryRequest request)
+ {
+ var parameters = new DataDownloaderGetParameters(request.Symbol, Resolution.Tick, request.StartTimeUtc, request.EndTimeUtc, request.TickType);
+ return _dataDownloader.Get(parameters);
+ }
+
+ ///
+ /// Gets the quote ticks that will potentially be aggregated for the specified history request
+ ///
+ private IEnumerable GetQuotes(HistoryRequest request)
+ {
+ var parameters = new DataDownloaderGetParameters(request.Symbol, Resolution.Tick, request.StartTimeUtc, request.EndTimeUtc, request.TickType);
+ return _dataDownloader.Get(parameters);
+ }
+
+ ///
+ /// Checks if the security type is supported
+ ///
+ /// Security type to check
+ /// True if supported
+ private bool IsSecurityTypeSupported(SecurityType securityType)
+ {
+ // DataBento primarily supports futures, but also has equity and option coverage
+ return securityType == SecurityType.Future;
+ }
+
+ ///
+ /// Determines if the specified subscription is supported
+ ///
+ private bool IsSupported(SecurityType securityType, Type dataType, TickType tickType, Resolution resolution)
+ {
+ // Check supported security types
+ if (!IsSecurityTypeSupported(securityType))
+ {
+ if (!_unsupportedSecurityTypeMessageLogged)
+ {
+ _unsupportedSecurityTypeMessageLogged = true;
+ Log.Trace($"DataBentoDataProvider.IsSupported(): Unsupported security type: {securityType}");
+ }
+ return false;
+ }
+
+ // Check supported data types
+ if (dataType != typeof(TradeBar) &&
+ dataType != typeof(QuoteBar) &&
+ dataType != typeof(Tick) &&
+ dataType != typeof(OpenInterest))
+ {
+ if (!_unsupportedDataTypeMessageLogged)
+ {
+ _unsupportedDataTypeMessageLogged = true;
+ Log.Trace($"DataBentoDataProvider.IsSupported(): Unsupported data type: {dataType}");
+ }
+ return false;
+ }
+
+ // Warn about potential limitations for tick data
+ // I'm mimicing polygon implementation with this
+ if (!_potentialUnsupportedResolutionMessageLogged)
+ {
+ _potentialUnsupportedResolutionMessageLogged = true;
+ Log.Trace("DataBentoDataProvider.IsSupported(): " +
+ $"Subscription for {securityType}-{dataType}-{tickType}-{resolution} will be attempted. " +
+ $"An Advanced DataBento subscription plan is required to stream tick data.");
+ }
+
+ return true;
+ }
+ }
+}
diff --git a/QuantConnect.DataBento/DataBentoRawLiveClient.cs b/QuantConnect.DataBento/DataBentoRawLiveClient.cs
new file mode 100644
index 0000000..9a5c8b8
--- /dev/null
+++ b/QuantConnect.DataBento/DataBentoRawLiveClient.cs
@@ -0,0 +1,761 @@
+/*
+ * QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
+ * Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+*/
+
+using System;
+using System.IO;
+using System.Text;
+using System.Net.Sockets;
+using System.Security.Cryptography;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Concurrent;
+using System.Text.Json;
+using QuantConnect.Data;
+using QuantConnect.Data.Market;
+using QuantConnect.Logging;
+
+namespace QuantConnect.Lean.DataSource.DataBento
+{
+ ///
+ /// DataBento Raw TCP client for live streaming data
+ ///
+ public class DatabentoRawClient : IDisposable
+ {
+ private readonly string _apiKey;
+ private readonly string _gateway;
+ private readonly string _dataset;
+ private TcpClient? _tcpClient;
+ private NetworkStream? _stream;
+ private StreamReader? _reader;
+ private StreamWriter? _writer;
+ private CancellationTokenSource _cancellationTokenSource;
+ private readonly ConcurrentDictionary _subscriptions;
+ private readonly object _connectionLock = new object();
+ private bool _isConnected;
+ private bool _disposed;
+ private const decimal PriceScaleFactor = 1e-9m;
+ private readonly ConcurrentDictionary _instrumentIdToSymbol = new ConcurrentDictionary();
+ private readonly ConcurrentDictionary _lastTicks = new ConcurrentDictionary();
+
+ ///
+ /// Event fired when new data is received
+ ///
+ public event EventHandler? DataReceived;
+
+ ///
+ /// Event fired when connection status changes
+ ///
+ public event EventHandler? ConnectionStatusChanged;
+
+ ///
+ /// Gets whether the client is currently connected
+ ///
+ public bool IsConnected => _isConnected && _tcpClient?.Connected == true;
+
+ ///
+ /// Initializes a new instance of the DatabentoRawClient
+ ///
+ public DatabentoRawClient(string apiKey, string gateway = "glbx-mdp3.lsg.databento.com:13000", string dataset = "GLBX.MDP3")
+ {
+ _apiKey = apiKey ?? throw new ArgumentNullException(nameof(apiKey));
+ _gateway = gateway ?? throw new ArgumentNullException(nameof(gateway));
+ _dataset = dataset;
+ _subscriptions = new ConcurrentDictionary();
+ _cancellationTokenSource = new CancellationTokenSource();
+ }
+
+ ///
+ /// Connects to the DataBento live gateway
+ ///
+ public async Task ConnectAsync()
+ {
+ Log.Trace("DatabentoRawClient.ConnectAsync(): Connecting to DataBento live gateway");
+ if (_isConnected || _disposed)
+ {
+ return _isConnected;
+ }
+
+ try
+ {
+ var parts = _gateway.Split(':');
+ var host = parts[0];
+ var port = parts.Length > 1 ? int.Parse(parts[1]) : 13000;
+
+ _tcpClient = new TcpClient();
+ await _tcpClient.ConnectAsync(host, port).ConfigureAwait(false);
+ _stream = _tcpClient.GetStream();
+ _reader = new StreamReader(_stream, Encoding.ASCII);
+ _writer = new StreamWriter(_stream, Encoding.ASCII) { AutoFlush = true };
+
+ // Perform authentication handshake
+ if (await AuthenticateAsync().ConfigureAwait(false))
+ {
+ _isConnected = true;
+ ConnectionStatusChanged?.Invoke(this, true);
+
+ // Start message processing task
+ _ = Task.Run(() => ProcessMessagesAsync(_cancellationTokenSource.Token));
+
+ Log.Trace("DatabentoRawClient.ConnectAsync(): Connected and authenticated to DataBento live gateway");
+ return true;
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DatabentoRawClient.ConnectAsync(): Failed to connect: {ex.Message}");
+ Disconnect();
+ }
+
+ return false;
+ }
+
+ ///
+ /// Authenticates with the DataBento gateway using CRAM-SHA256
+ ///
+ private async Task AuthenticateAsync()
+ {
+ if (_reader == null || _writer == null)
+ return false;
+
+ try
+ {
+ // Read greeting and challenge
+ string? versionLine = await _reader.ReadLineAsync();
+ string? cramLine = await _reader.ReadLineAsync();
+
+ if (string.IsNullOrEmpty(versionLine) || string.IsNullOrEmpty(cramLine))
+ {
+ Log.Error("DatabentoRawClient.AuthenticateAsync(): Failed to receive greeting or challenge");
+ return false;
+ }
+
+ Log.Trace($"DatabentoRawClient.AuthenticateAsync(): Version: {versionLine}");
+ Log.Trace($"DatabentoRawClient.AuthenticateAsync(): Challenge: {cramLine}");
+
+ // Parse challenge
+ string[] cramParts = cramLine.Split('=');
+ if (cramParts.Length != 2 || cramParts[0] != "cram")
+ {
+ Log.Error("DatabentoRawClient.AuthenticateAsync(): Invalid challenge format");
+ return false;
+ }
+ string cram = cramParts[1].Trim();
+
+ // Compute auth hash
+ string concat = $"{cram}|{_apiKey}";
+ string hashHex = ComputeSHA256(concat);
+ string bucketId = _apiKey.Length >= 5 ? _apiKey.Substring(_apiKey.Length - 5) : _apiKey;
+ string authString = $"{hashHex}-{bucketId}";
+
+ // Send auth message
+ string authMsg = $"auth={authString}|dataset={_dataset}|encoding=json|ts_out=0";
+ Log.Trace($"DatabentoRawClient.AuthenticateAsync(): Sending auth");
+ await _writer.WriteLineAsync(authMsg);
+
+ // Read auth response
+ string? authResp = await _reader.ReadLineAsync();
+ if (string.IsNullOrEmpty(authResp))
+ {
+ Log.Error("DatabentoRawClient.AuthenticateAsync(): No authentication response received");
+ return false;
+ }
+
+ Log.Trace($"DatabentoRawClient.AuthenticateAsync(): Auth response: {authResp}");
+
+ if (!authResp.Contains("success=1"))
+ {
+ Log.Error($"DatabentoRawClient.AuthenticateAsync(): Authentication failed: {authResp}");
+ return false;
+ }
+
+ Log.Trace("DatabentoRawClient.AuthenticateAsync(): Authentication successful");
+ return true;
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DatabentoRawClient.AuthenticateAsync(): Authentication failed: {ex.Message}");
+ return false;
+ }
+ }
+ private static string ComputeSHA256(string input)
+ {
+ using var sha = SHA256.Create();
+ byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(input));
+ var sb = new StringBuilder();
+ foreach (byte b in hash)
+ {
+ sb.Append(b.ToString("x2"));
+ }
+ return sb.ToString();
+ }
+
+ ///
+ /// Subscribes to live data for a symbol
+ ///
+ public bool Subscribe(Symbol symbol, Resolution resolution, TickType tickType)
+ {
+ if (!IsConnected || _writer == null)
+ {
+ Log.Error("DatabentoRawClient.Subscribe(): Not connected to gateway");
+ return false;
+ }
+
+ try
+ {
+ // Get the databento symbol form LEAN symbol
+ // Get schema from the resolution
+ var databentoSymbol = MapSymbolToDataBento(symbol);
+ var schema = GetSchema(resolution, tickType);
+
+ // subscribe
+ var subscribeMessage = $"schema={schema}|stype_in=parent|symbols={databentoSymbol}";
+ Log.Trace($"DatabentoRawClient.Subscribe(): Subscribing with message: {subscribeMessage}");
+
+ // Send subscribe message
+ _writer.WriteLine(subscribeMessage);
+
+ // Store subscription
+ _subscriptions.TryAdd(symbol, (resolution, tickType));
+ Log.Trace($"DatabentoRawClient.Subscribe(): Subscribed to {symbol} ({databentoSymbol}) at {resolution} resolution for {tickType}");
+
+ // If subscribing to quote ticks, also subscribe to trade ticks
+ if (tickType == TickType.Quote && resolution == Resolution.Tick)
+ {
+ var tradeSchema = GetSchema(resolution, TickType.Trade);
+ var tradeSubscribeMessage = $"schema={tradeSchema}|stype_in=parent|symbols={databentoSymbol}";
+ Log.Trace($"DatabentoRawClient.Subscribe(): Also subscribing to trades with message: {tradeSubscribeMessage}");
+ _writer.WriteLine(tradeSubscribeMessage);
+ }
+
+ return true;
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DatabentoRawClient.Subscribe(): Failed to subscribe to {symbol}: {ex.Message}");
+ return false;
+ }
+ }
+
+ ///
+ /// Starts the session to begin receiving data
+ ///
+ public bool StartSession()
+ {
+ if (!IsConnected || _writer == null)
+ {
+ Log.Error("DatabentoRawClient.StartSession(): Not connected");
+ return false;
+ }
+
+ try
+ {
+ Log.Trace("DatabentoRawClient.StartSession(): Starting session");
+ _writer.WriteLine("start_session=1");
+ return true;
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DatabentoRawClient.StartSession(): Failed to start session: {ex.Message}");
+ return false;
+ }
+ }
+
+ ///
+ /// Unsubscribes from live data for a symbol
+ ///
+ public bool Unsubscribe(Symbol symbol)
+ {
+ try
+ {
+ if (_subscriptions.TryRemove(symbol, out _))
+ {
+ Log.Trace($"DatabentoRawClient.Unsubscribe(): Unsubscribed from {symbol}");
+ }
+ return true;
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DatabentoRawClient.Unsubscribe(): Failed to unsubscribe from {symbol}: {ex.Message}");
+ return false;
+ }
+ }
+
+ ///
+ /// Processes incoming messages from the DataBento gateway
+ ///
+ private async Task ProcessMessagesAsync(CancellationToken cancellationToken)
+ {
+ Log.Trace("DatabentoRawClient.ProcessMessagesAsync(): Starting message processing");
+ if (_reader == null)
+ {
+ Log.Error("DatabentoRawClient.ProcessMessagesAsync(): No reader available");
+ return;
+ }
+
+ var messageCount = 0;
+
+ try
+ {
+ while (!cancellationToken.IsCancellationRequested && IsConnected)
+ {
+ var line = await _reader.ReadLineAsync();
+ if (line == null)
+ {
+ Log.Trace("DatabentoRawClient.ProcessMessagesAsync(): Connection closed by server");
+ break;
+ }
+
+ if (string.IsNullOrWhiteSpace(line))
+ continue;
+
+ messageCount++;
+ if (messageCount <= 50 || messageCount % 100 == 0)
+ {
+ Log.Trace($"DatabentoRawClient.ProcessMessagesAsync(): Message #{messageCount}: {line.Substring(0, Math.Min(150, line.Length))}...");
+ }
+
+ await ProcessSingleMessage(line);
+ }
+ }
+ catch (OperationCanceledException)
+ {
+ Log.Trace("DatabentoRawClient.ProcessMessagesAsync(): Message processing cancelled");
+ }
+ catch (IOException ex) when (ex.InnerException is SocketException)
+ {
+ Log.Trace($"DatabentoRawClient.ProcessMessagesAsync(): Socket exception: {ex.Message}");
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DatabentoRawClient.ProcessMessagesAsync(): Error processing messages: {ex.Message}\n{ex.StackTrace}");
+ }
+ finally
+ {
+ Log.Trace($"DatabentoRawClient.ProcessMessagesAsync(): Exiting. Total messages processed: {messageCount}");
+ Disconnect();
+ }
+ }
+
+ ///
+ /// Processes a single message from DataBento
+ ///
+ private async Task ProcessSingleMessage(string message)
+ {
+ await Task.CompletedTask;
+
+ try
+ {
+ using var document = JsonDocument.Parse(message);
+ var root = document.RootElement;
+
+ // Check for error messages
+ if (root.TryGetProperty("hd", out var headerElement))
+ {
+ if (headerElement.TryGetProperty("rtype", out var rtypeElement))
+ {
+ var rtype = rtypeElement.GetInt32();
+
+ if (rtype == 23)
+ {
+ if (root.TryGetProperty("msg", out var msgElement))
+ {
+ Log.Trace($"DatabentoRawClient: System message: {msgElement.GetString()}");
+ }
+ return;
+ }
+ else if (rtype == 22)
+ {
+ // Symbol mapping message
+ if (root.TryGetProperty("stype_in_symbol", out var inSymbol) &&
+ root.TryGetProperty("stype_out_symbol", out var outSymbol) &&
+ headerElement.TryGetProperty("instrument_id", out var instId))
+ {
+ var instrumentId = instId.GetInt64();
+ var outSymbolStr = outSymbol.GetString();
+
+ Log.Trace($"DatabentoRawClient: Symbol mapping: {inSymbol.GetString()} -> {outSymbolStr} (instrument_id: {instrumentId})");
+
+ // Find the subscription that matches this symbol
+ foreach (var kvp in _subscriptions)
+ {
+ var leanSymbol = kvp.Key;
+ if (outSymbolStr != null)
+ {
+ _instrumentIdToSymbol[instrumentId] = leanSymbol;
+ Log.Trace($"DatabentoRawClient: Mapped instrument_id {instrumentId} to {leanSymbol}");
+ break;
+ }
+ }
+ }
+ return;
+ }
+ else if (rtype == 1)
+ {
+ // MBP-1 (Market By Price) - Quote ticks
+ await HandleMBPMessage(root, headerElement);
+ return;
+ }
+ else if (rtype == 0)
+ {
+ // Trade messages - Trade ticks
+ await HandleTradeTickMessage(root, headerElement);
+ return;
+ }
+ else if (rtype == 32 || rtype == 33 || rtype == 34 || rtype == 35)
+ {
+ // OHLCV bar messages
+ await HandleOHLCVMessage(root, headerElement);
+ return;
+ }
+ }
+ }
+
+ // Handle other message types if needed
+ if (root.TryGetProperty("error", out var errorElement))
+ {
+ Log.Error($"DatabentoRawClient: Server error: {errorElement.GetString()}");
+ }
+ }
+ catch (JsonException ex)
+ {
+ Log.Error($"DatabentoRawClient.ProcessSingleMessage(): JSON parse error: {ex.Message}");
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DatabentoRawClient.ProcessSingleMessage(): Error: {ex.Message}");
+ }
+ }
+
+ ///
+ /// Handles OHLCV messages and converts to LEAN TradeBar data
+ ///
+ private async Task HandleOHLCVMessage(JsonElement root, JsonElement header)
+ {
+ await Task.CompletedTask;
+
+ try
+ {
+ if (!header.TryGetProperty("ts_event", out var tsElement) ||
+ !header.TryGetProperty("instrument_id", out var instIdElement))
+ {
+ return;
+ }
+
+ // Convert timestamp from nanoseconds to DateTime
+ var timestampNs = long.Parse(tsElement.GetString()!);
+ var unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+ var timestamp = unixEpoch.AddTicks(timestampNs / 100);
+
+ var instrumentId = instIdElement.GetInt64();
+
+ if (!_instrumentIdToSymbol.TryGetValue(instrumentId, out var matchedSymbol))
+ {
+ Log.Trace($"DatabentoRawClient: No mapping for instrument_id {instrumentId} in OHLCV message.");
+ return;
+ }
+
+ // Get the resolution for this symbol
+ if (!_subscriptions.TryGetValue(matchedSymbol, out var subscription))
+ {
+ return;
+ }
+
+ var resolution = subscription.Item1;
+
+ // Extract OHLCV data
+ if (root.TryGetProperty("open", out var openElement) &&
+ root.TryGetProperty("high", out var highElement) &&
+ root.TryGetProperty("low", out var lowElement) &&
+ root.TryGetProperty("close", out var closeElement) &&
+ root.TryGetProperty("volume", out var volumeElement))
+ {
+ // Parse prices
+ var openRaw = long.Parse(openElement.GetString()!);
+ var highRaw = long.Parse(highElement.GetString()!);
+ var lowRaw = long.Parse(lowElement.GetString()!);
+ var closeRaw = long.Parse(closeElement.GetString()!);
+ var volume = volumeElement.GetInt64();
+
+ var open = openRaw * PriceScaleFactor;
+ var high = highRaw * PriceScaleFactor;
+ var low = lowRaw * PriceScaleFactor;
+ var close = closeRaw * PriceScaleFactor;
+
+ // Determine the period based on resolution
+ TimeSpan period = resolution switch
+ {
+ Resolution.Second => TimeSpan.FromSeconds(1),
+ Resolution.Minute => TimeSpan.FromMinutes(1),
+ Resolution.Hour => TimeSpan.FromHours(1),
+ Resolution.Daily => TimeSpan.FromDays(1),
+ _ => TimeSpan.FromMinutes(1)
+ };
+
+ // Create TradeBar
+ var tradeBar = new TradeBar(
+ timestamp,
+ matchedSymbol,
+ open,
+ high,
+ low,
+ close,
+ volume,
+ period
+ );
+
+ Log.Trace($"DatabentoRawClient: OHLCV bar: {matchedSymbol} O={open} H={high} L={low} C={close} V={volume} at {timestamp}");
+ DataReceived?.Invoke(this, tradeBar);
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DatabentoRawClient.HandleOHLCVMessage(): Error: {ex.Message}");
+ }
+ }
+
+ ///
+ /// Handles MBP messages for quote ticks
+ ///
+ private async Task HandleMBPMessage(JsonElement root, JsonElement header)
+ {
+ await Task.CompletedTask;
+
+ try
+ {
+ if (!header.TryGetProperty("ts_event", out var tsElement) ||
+ !header.TryGetProperty("instrument_id", out var instIdElement))
+ {
+ return;
+ }
+
+ // Convert timestamp from nanoseconds to DateTime
+ var timestampNs = long.Parse(tsElement.GetString()!);
+ var unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+ var timestamp = unixEpoch.AddTicks(timestampNs / 100);
+
+ var instrumentId = instIdElement.GetInt64();
+
+ if (!_instrumentIdToSymbol.TryGetValue(instrumentId, out var matchedSymbol))
+ {
+ Log.Trace($"DatabentoRawClient: No mapping for instrument_id {instrumentId} in MBP message.");
+ return;
+ }
+
+ // For MBP-1, bid/ask data is in the levels array at index 0
+ if (root.TryGetProperty("levels", out var levelsElement) &&
+ levelsElement.GetArrayLength() > 0)
+ {
+ var level0 = levelsElement[0];
+
+ var quoteTick = new Tick
+ {
+ Symbol = matchedSymbol,
+ Time = timestamp,
+ TickType = TickType.Quote
+ };
+
+ if (level0.TryGetProperty("ask_px", out var askPxElement) &&
+ level0.TryGetProperty("ask_sz", out var askSzElement))
+ {
+ var askPriceRaw = long.Parse(askPxElement.GetString()!);
+ quoteTick.AskPrice = askPriceRaw * PriceScaleFactor;
+ quoteTick.AskSize = askSzElement.GetInt32();
+ }
+
+ if (level0.TryGetProperty("bid_px", out var bidPxElement) &&
+ level0.TryGetProperty("bid_sz", out var bidSzElement))
+ {
+ var bidPriceRaw = long.Parse(bidPxElement.GetString()!);
+ quoteTick.BidPrice = bidPriceRaw * PriceScaleFactor;
+ quoteTick.BidSize = bidSzElement.GetInt32();
+ }
+
+ // Set the tick value to the mid price
+ quoteTick.Value = (quoteTick.BidPrice + quoteTick.AskPrice) / 2;
+
+ // QuantConnect convention: Quote ticks should have zero Price and Quantity
+ quoteTick.Quantity = 0;
+
+ Log.Trace($"DatabentoRawClient: Quote tick: {matchedSymbol} Bid={quoteTick.BidPrice}x{quoteTick.BidSize} Ask={quoteTick.AskPrice}x{quoteTick.AskSize}");
+ DataReceived?.Invoke(this, quoteTick);
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DatabentoRawClient.HandleMBPMessage(): Error: {ex.Message}");
+ }
+ }
+
+ ///
+ /// Handles trade tick messages. Aggressor fills
+ ///
+ private async Task HandleTradeTickMessage(JsonElement root, JsonElement header)
+ {
+ await Task.CompletedTask;
+
+ try
+ {
+ if (!header.TryGetProperty("ts_event", out var tsElement) ||
+ !header.TryGetProperty("instrument_id", out var instIdElement))
+ {
+ return;
+ }
+
+ // Convert timestamp from nanoseconds to DateTime
+ var timestampNs = long.Parse(tsElement.GetString()!);
+ var unixEpoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+ var timestamp = unixEpoch.AddTicks(timestampNs / 100);
+
+ var instrumentId = instIdElement.GetInt64();
+
+ if (!_instrumentIdToSymbol.TryGetValue(instrumentId, out var matchedSymbol))
+ {
+ Log.Trace($"DatabentoRawClient: No mapping for instrument_id {instrumentId} in trade message.");
+ return;
+ }
+
+ if (root.TryGetProperty("price", out var priceElement) &&
+ root.TryGetProperty("size", out var sizeElement))
+ {
+ var priceRaw = long.Parse(priceElement.GetString()!);
+ var size = sizeElement.GetInt32();
+ var price = priceRaw * PriceScaleFactor;
+
+ var tradeTick = new Tick
+ {
+ Symbol = matchedSymbol,
+ Time = timestamp,
+ Value = price,
+ Quantity = size,
+ TickType = TickType.Trade,
+ // Trade ticks should have zero bid/ask values
+ BidPrice = 0,
+ BidSize = 0,
+ AskPrice = 0,
+ AskSize = 0
+ };
+
+ Log.Trace($"DatabentoRawClient: Trade tick: {matchedSymbol} Price={price} Quantity={size}");
+ DataReceived?.Invoke(this, tradeTick);
+ }
+ }
+ catch (Exception ex)
+ {
+ Log.Error($"DatabentoRawClient.HandleTradeTickMessage(): Error: {ex.Message}");
+ }
+ }
+
+ ///
+ /// Maps a LEAN symbol to DataBento symbol format
+ ///
+ private string MapSymbolToDataBento(Symbol symbol)
+ {
+ if (symbol.SecurityType == SecurityType.Future)
+ {
+ // For DataBento, use the root symbol with .FUT suffix for parent subscription
+ // ES19Z25 -> ES.FUT
+ var value = symbol.Value;
+
+ // Extract root by removing digits and month codes
+ var root = new string(value.TakeWhile(c => !char.IsDigit(c)).ToArray());
+
+ return $"{root}.FUT";
+ }
+
+ return symbol.Value;
+ }
+
+ ///
+ /// Pick Databento schema from Lean resolution/ticktype
+ ///
+ private string GetSchema(Resolution resolution, TickType tickType)
+ {
+ if (tickType == TickType.Trade)
+ {
+ if (resolution == Resolution.Tick)
+ return "trades";
+ if (resolution == Resolution.Second)
+ return "ohlcv-1s";
+ if (resolution == Resolution.Minute)
+ return "ohlcv-1m";
+ if (resolution == Resolution.Hour)
+ return "ohlcv-1h";
+ if (resolution == Resolution.Daily)
+ return "ohlcv-1d";
+ }
+ else if (tickType == TickType.Quote)
+ {
+ // top of book
+ if (resolution == Resolution.Tick || resolution == Resolution.Second || resolution == Resolution.Minute || resolution == Resolution.Hour || resolution == Resolution.Daily)
+ return "mbp-1";
+ }
+ else if (tickType == TickType.OpenInterest)
+ {
+ return "statistics";
+ }
+
+ throw new NotSupportedException($"Unsupported resolution {resolution} / {tickType}");
+ }
+
+ ///
+ /// Disconnects from the DataBento gateway
+ ///
+ public void Disconnect()
+ {
+ lock (_connectionLock)
+ {
+ if (!_isConnected)
+ return;
+
+ _isConnected = false;
+ _cancellationTokenSource?.Cancel();
+
+ try
+ {
+ _reader?.Dispose();
+ _writer?.Dispose();
+ _stream?.Close();
+ _tcpClient?.Close();
+ }
+ catch (Exception ex)
+ {
+ Log.Trace($"DatabentoRawClient.Disconnect(): Error during disconnect: {ex.Message}");
+ }
+
+ ConnectionStatusChanged?.Invoke(this, false);
+ Log.Trace("DatabentoRawClient.Disconnect(): Disconnected from DataBento gateway");
+ }
+ }
+
+ ///
+ /// Disposes of resources
+ ///
+ public void Dispose()
+ {
+ if (_disposed)
+ return;
+
+ _disposed = true;
+ Disconnect();
+
+ _cancellationTokenSource?.Dispose();
+ _reader?.Dispose();
+ _writer?.Dispose();
+ _stream?.Dispose();
+ _tcpClient?.Dispose();
+ }
+ }
+}
diff --git a/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj b/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj
new file mode 100644
index 0000000..718002c
--- /dev/null
+++ b/QuantConnect.DataBento/QuantConnect.DataSource.DataBento.csproj
@@ -0,0 +1,42 @@
+
+
+
+ Release
+ AnyCPU
+ net9.0
+ QuantConnect.Lean.DataSource.DataBento
+ QuantConnect.Lean.DataSource.DataBento
+ QuantConnect.Lean.DataSource.DataBento
+ QuantConnect.Lean.DataSource.DataBento
+ Library
+ bin\$(Configuration)\
+ false
+ true
+ false
+ QuantConnect LEAN DataBento Data Source: DataBento Data Source plugin for Lean
+
+ enable
+ enable
+
+
+
+ full
+ bin\Debug\
+
+
+
+ pdbonly
+ bin\Release\
+
+
+
+
+
+
+
+
+
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..642fc49
--- /dev/null
+++ b/README.md
@@ -0,0 +1,47 @@
+
+
+# Lean DataSource SDK
+
+[](https://github.com/QuantConnect/LeanDataSdk/actions?query=workflow%3A%22Build%20%26%20Test%22)
+
+### Introduction
+
+The Lean Data SDK is a cross-platform template repository for developing custom data types for Lean.
+These data types will be consumed by [QuantConnect](https://www.quantconnect.com/) trading algorithms and research environment, locally or in the cloud.
+
+It is composed by example .Net solution for the data type and converter scripts.
+
+### Prerequisites
+
+The solution targets dotnet 5, for installation instructions please follow [dotnet download](https://dotnet.microsoft.com/download).
+
+The data downloader and converter script can be developed in different ways: C# executable, Python script, Python Jupyter notebook or even a bash script.
+- The python script should be compatible with python 3.6.8
+- Bash script will run on Ubuntu Bionic
+
+Specifically, the enviroment where these scripts will be run is [quantconnect/research](https://hub.docker.com/repository/docker/quantconnect/research) based on [quantconnect/lean:foundation](https://hub.docker.com/repository/docker/quantconnect/lean).
+
+### Installation
+
+The "Use this template" feature should be used for each unique data source which requires its own data processing. Once it is cloned locally, you should be able to successfully build the solution, run all tests and execute the downloader and/or conveter scripts. The final version should pass all CI tests of GitHub Actions.
+
+Once ready, please contact support@quantconnect.com and we will create a listing in the QuantConnect Data Market for your company and link to your public repository and commit hash.
+
+### Datasets Vendor Requirements
+
+Key requirements for new vendors include:
+
+ - A well-defined dataset with a clear and static vision for the data to minimize churn or changes as people will be building systems from it. This is easiest with "raw" data (e.g. sunshine hours vs a sentiment algorithm)
+ - Robust ticker and security links to ensure the tickers are tracked well through time, or accurately point in time. ISIN, FIGI, or point in time ticker supported
+ - Robust funding to ensure viable for at least 1 year
+ - Robust API to ensure reliable up-time. No dead links on site or and 502 servers while using API
+ - Consistent delivery schedule, on time and in time for market trading
+ - Consistent data format with notifications and lead time on data format updates
+ - At least 1 year of historical point in time data
+ - Survivorship bias free data
+ - Good documentation for the dataset
+
+
+### Tutorials
+
+ - See [Tutorials](https://www.quantconnect.com/docs/v2/our-platform/datasets/contributing-datasets) for a step by step guide for creating a new LEAN Data Source.
\ No newline at end of file
diff --git a/databento.json b/databento.json
new file mode 100644
index 0000000..c8efbf9
--- /dev/null
+++ b/databento.json
@@ -0,0 +1,37 @@
+{
+ "description": "DataBento provides historical and live market data across a variety of exchanges and asset classes.",
+
+ "platforms-features": [
+ {
+ "Platforms": ["Cloud Platform", "Local Platform", "LEAN CLI"],
+ "Download Data": [0,1,1],
+ "Backtesting": [0,1,1],
+ "Optimization": [0,1,1],
+ "Live Trading": [0,1,1]
+ }
+ ],
+ "data-supported": [ "TradeBar", "QuoteBar" ],
+ "documentation": "https://databento.com/docs/",
+ "module-specification": {
+ "download": {
+ "data-types": [
+ "Trade",
+ "Quote"
+ ],
+ "resolutions": [
+ "Tick",
+ "Second",
+ "Minute",
+ "Hour",
+ "Daily"
+ ],
+ "security-types": [
+ "Future"
+ ],
+ "markets": [
+ "CME",
+ "Globex"
+ ]
+ }
+ }
+}
\ No newline at end of file