Skip to content

Commit

Permalink
Revert fill forward enumerator change
Browse files Browse the repository at this point in the history
- Revert FillForward enumerator causing stats changing, enhancing unit
  tests
  • Loading branch information
Martin-Molinero committed May 17, 2024
1 parent b132b09 commit b560e87
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
using QuantConnect.Interfaces;
using QuantConnect.Data.Market;
using System.Collections.Generic;
using QuantConnect.Data;

namespace QuantConnect.Algorithm.CSharp
{
Expand Down
4 changes: 2 additions & 2 deletions Common/Data/Consolidators/MarketHourAwareConsolidator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ protected void Initialize(IBaseData data)
ExchangeHours = marketHoursDatabase.GetExchangeHours(symbol.ID.Market, symbol, symbol.SecurityType);
DataTimeZone = marketHoursDatabase.GetDataTimeZone(symbol.ID.Market, symbol, symbol.SecurityType);

_useStrictEndTime = !ExchangeHours.IsMarketAlwaysOpen && UseStrictEndTime(data.Symbol);
_useStrictEndTime = UseStrictEndTime(data.Symbol);
}
}

Expand All @@ -189,7 +189,7 @@ protected virtual CalendarInfo DailyStrictEndTime(DateTime dateTime)
/// </summary>
protected virtual bool UseStrictEndTime(Symbol symbol)
{
return LeanData.UseStrictEndTime(_dailyStrictEndTimeEnabled, symbol, Period);
return LeanData.UseStrictEndTime(_dailyStrictEndTimeEnabled, symbol, Period, ExchangeHours);
}

/// <summary>
Expand Down
2 changes: 2 additions & 0 deletions Common/Data/IDataAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@

using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;

namespace QuantConnect.Data
{
/// <summary>
/// Aggregates ticks and bars based on given subscriptions.
/// </summary>
[InheritedExport(typeof(IDataAggregator))]
public interface IDataAggregator : IDisposable
{
/// <summary>
Expand Down
16 changes: 8 additions & 8 deletions Common/Util/LeanData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1370,11 +1370,7 @@ public static CalendarInfo GetDailyCalendar(DateTime exchangeTimeZoneDate, Secur
public static DateTime GetNextDailyEndTime(Symbol symbol, DateTime exchangeTimeZoneDate, SecurityExchangeHours exchangeHours)
{
var nextMidnight = exchangeTimeZoneDate.Date.AddDays(1);
if (exchangeHours.IsMarketAlwaysOpen
// the cases have market hours which cross multiple days
|| symbol.SecurityType == SecurityType.Cfd && symbol.ID.Market == Market.Oanda
|| symbol.SecurityType == SecurityType.Forex
|| symbol.SecurityType == SecurityType.Base)
if (!UseStrictEndTime(true, symbol, Time.OneDay, exchangeHours))
{
return nextMidnight;
}
Expand Down Expand Up @@ -1407,9 +1403,10 @@ public static bool OptionUseScaleFactor(Symbol symbol)
/// <param name="symbol">The associated symbol</param>
/// <param name="increment">The datas time increment</param>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static bool UseStrictEndTime(bool dailyStrictEndTimeEnabled, Symbol symbol, TimeSpan increment)
public static bool UseStrictEndTime(bool dailyStrictEndTimeEnabled, Symbol symbol, TimeSpan increment, SecurityExchangeHours exchangeHours)
{
if (increment != Time.OneDay
if (exchangeHours.IsMarketAlwaysOpen
|| increment != Time.OneDay
|| symbol.SecurityType == SecurityType.Cfd && symbol.ID.Market == Market.Oanda
|| symbol.SecurityType == SecurityType.Forex
|| symbol.SecurityType == SecurityType.Base)
Expand All @@ -1419,9 +1416,12 @@ public static bool UseStrictEndTime(bool dailyStrictEndTimeEnabled, Symbol symbo
return dailyStrictEndTimeEnabled;
}

/// <summary>
/// Helper method to determine if we should use strict end time
/// </summary>
public static bool UseDailyStrictEndTimes(IAlgorithmSettings settings, BaseDataRequest request, Symbol symbol, TimeSpan increment)
{
return !request.ExchangeHours.IsMarketAlwaysOpen && UseStrictEndTime(settings.DailyStrictEndTimeEnabled, symbol, increment);
return UseStrictEndTime(settings.DailyStrictEndTimeEnabled, symbol, increment, request.ExchangeHours);
}

/// <summary>
Expand Down
17 changes: 17 additions & 0 deletions Engine/DataFeeds/DataQueueHandlerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using QuantConnect.Packets;
using QuantConnect.Logging;
using QuantConnect.Interfaces;
using QuantConnect.Securities;
using QuantConnect.Data.Market;
using System.Collections.Generic;
using QuantConnect.Lean.Engine.DataFeeds.Enumerators;
Expand All @@ -32,8 +33,17 @@ namespace QuantConnect.Lean.Engine.DataFeeds
public class DataQueueHandlerManager : IDataQueueHandler, IDataQueueUniverseProvider
{
private ITimeProvider _frontierTimeProvider;
private readonly IAlgorithmSettings _algorithmSettings;
private readonly Dictionary<SubscriptionDataConfig, Queue<IDataQueueHandler>> _dataConfigAndDataHandler = new();

/// <summary>
/// Creates a new instance
/// </summary>
public DataQueueHandlerManager(IAlgorithmSettings settings)
{
_algorithmSettings = settings;
}

/// <summary>
/// Collection of data queue handles being used
/// </summary>
Expand All @@ -58,6 +68,7 @@ public class DataQueueHandlerManager : IDataQueueHandler, IDataQueueUniverseProv
/// <returns>The new enumerator for this subscription request</returns>
public IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventHandler newDataAvailableHandler)
{
var exchangeHours = MarketHoursDatabase.FromDataFolder().GetExchangeHours(dataConfig.Symbol.ID.Market, dataConfig.Symbol, dataConfig.Symbol.SecurityType);
foreach (var dataHandler in DataHandlers)
{
// Emit ticks & custom data as soon as we get them, they don't need any kind of batching behavior applied to them
Expand Down Expand Up @@ -92,6 +103,12 @@ public IEnumerator<BaseData> Subscribe(SubscriptionDataConfig dataConfig, EventH
return enumerator;
}

if (LeanData.UseStrictEndTime(_algorithmSettings.DailyStrictEndTimeEnabled, dataConfig.Symbol, dataConfig.Increment, exchangeHours))
{
// before the first frontier enumerator we adjust the endtimes if required
enumerator = new StrictDailyEndTimesEnumerator(enumerator, exchangeHours);
}

return new FrontierAwareEnumerator(enumerator, _frontierTimeProvider,
new TimeZoneOffsetProvider(exchangeTimeZone, _frontierTimeProvider.GetUtcNow(), Time.EndOfTime)
);
Expand Down
5 changes: 4 additions & 1 deletion Engine/DataFeeds/Enumerators/FillForwardEnumerator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ public bool MoveNext()
return false;
}

if (Current != null && Current.EndTime == endOfSubscription.EndTime || endOfSubscription.EndTime > _subscriptionEndTime)
if (Current != null && Current.EndTime == endOfSubscription.EndTime
// TODO this changes stats, why would the FF enumerator emit a data point beyoned the end time he was requested
//|| endOfSubscription.EndTime > _subscriptionEndTime
)
{
return false;
}
Expand Down
2 changes: 1 addition & 1 deletion Engine/DataFeeds/LiveTradingDataFeed.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public override void Exit()
/// <returns>The loaded <see cref="IDataQueueHandler"/></returns>
protected virtual IDataQueueHandler GetDataQueueHandler()
{
var result = new DataQueueHandlerManager();
var result = new DataQueueHandlerManager(_algorithm.Settings);
result.UnsupportedConfiguration += HandleUnsupportedConfigurationEvent;
return result;
}
Expand Down
2 changes: 1 addition & 1 deletion Engine/DataFeeds/Queues/FakeDataQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class FakeDataQueue : IDataQueueHandler, IDataQueueUniverseProvider
/// Initializes a new instance of the <see cref="FakeDataQueue"/> class to randomly emit data for each symbol
/// </summary>
public FakeDataQueue()
: this(new AggregationManager())
: this(Composer.Instance.GetExportedValueByTypeName<IDataAggregator>(nameof(AggregationManager)))
{

}
Expand Down
2 changes: 1 addition & 1 deletion Engine/DataFeeds/SubscriptionData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static SubscriptionData Create(bool dailyStrictEndTimeEnabled, Subscripti
data = data.Clone(data.IsFillForward);
var emitTimeUtc = offsetProvider.ConvertToUtc(data.EndTime);
// rounding down does not make sense for daily increments using strict end times
if (!LeanData.UseStrictEndTime(dailyStrictEndTimeEnabled, configuration.Symbol, configuration.Increment))
if (!LeanData.UseStrictEndTime(dailyStrictEndTimeEnabled, configuration.Symbol, configuration.Increment, exchangeHours))
{
// Let's round down for any data source that implements a time delta between
// the start of the data and end of the data (usually used with Bars).
Expand Down
16 changes: 8 additions & 8 deletions Tests/Engine/DataFeeds/DataQueueHandlerManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ public void SetJob()
{
DataQueueHandler = dataHandlers
};
var compositeDataQueueHandler = new DataQueueHandlerManager();
var compositeDataQueueHandler = new DataQueueHandlerManager(new AlgorithmSettings());
compositeDataQueueHandler.SetJob(jobWithArrayIDQH);
compositeDataQueueHandler.Dispose();
}

[Test]
public void SubscribeReturnsNull()
{
var compositeDataQueueHandler = new DataQueueHandlerManager();
var compositeDataQueueHandler = new DataQueueHandlerManager(new AlgorithmSettings());
var enumerator = compositeDataQueueHandler.Subscribe(GetConfig(), (_, _) => {});
Assert.Null(enumerator);
compositeDataQueueHandler.Dispose();
Expand All @@ -69,7 +69,7 @@ public void SubscribeReturnsNotNull()
{
DataQueueHandler = dataHandlers
};
var compositeDataQueueHandler = new DataQueueHandlerManager();
var compositeDataQueueHandler = new DataQueueHandlerManager(new AlgorithmSettings());
compositeDataQueueHandler.SetJob(job);
var enumerator = compositeDataQueueHandler.Subscribe(GetConfig(), (_, _) => {});
Assert.NotNull(enumerator);
Expand All @@ -80,15 +80,15 @@ public void SubscribeReturnsNotNull()
[Test]
public void Unsubscribe()
{
var compositeDataQueueHandler = new DataQueueHandlerManager();
var compositeDataQueueHandler = new DataQueueHandlerManager(new AlgorithmSettings());
compositeDataQueueHandler.Unsubscribe(GetConfig());
compositeDataQueueHandler.Dispose();
}

[Test]
public void IsNotUniverseProvider()
{
var compositeDataQueueHandler = new DataQueueHandlerManager();
var compositeDataQueueHandler = new DataQueueHandlerManager(new AlgorithmSettings());
Assert.IsFalse(compositeDataQueueHandler.HasUniverseProvider);
Assert.Throws<NotSupportedException>(() => compositeDataQueueHandler.LookupSymbols(Symbols.ES_Future_Chain, false));
Assert.Throws<NotSupportedException>(() => compositeDataQueueHandler.CanPerformSelection());
Expand All @@ -98,7 +98,7 @@ public void IsNotUniverseProvider()
[Test]
public void DoubleSubscribe()
{
var compositeDataQueueHandler = new DataQueueHandlerManager();
var compositeDataQueueHandler = new DataQueueHandlerManager(new AlgorithmSettings());
compositeDataQueueHandler.SetJob(new LiveNodePacket { DataQueueHandler = "[ \"TestDataHandler\" ]" });

var dataConfig = GetConfig();
Expand All @@ -112,7 +112,7 @@ public void DoubleSubscribe()
public void SingleSubscribe()
{
TestDataHandler.UnsubscribeCounter = 0;
var compositeDataQueueHandler = new DataQueueHandlerManager();
var compositeDataQueueHandler = new DataQueueHandlerManager(new AlgorithmSettings());
compositeDataQueueHandler.SetJob(new LiveNodePacket { DataQueueHandler = "[ \"TestDataHandler\" ]" });

var dataConfig = GetConfig();
Expand All @@ -133,7 +133,7 @@ public void MappedConfig(bool canonicalUnsubscribeFirst)
{
TestDataHandler.UnsubscribeCounter = 0;
TestDataHandler.SubscribeCounter = 0;
var compositeDataQueueHandler = new DataQueueHandlerManager();
var compositeDataQueueHandler = new DataQueueHandlerManager(new AlgorithmSettings());
compositeDataQueueHandler.SetJob(new LiveNodePacket { DataQueueHandler = "[ \"TestDataHandler\" ]" });

var canonicalSymbol = Symbols.ES_Future_Chain.UpdateMappedSymbol(Symbols.Future_ESZ18_Dec2018.ID.ToString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ public void FillsForwardDailyMissingDays(bool strictEndTimes)
Assert.IsTrue(((TradeBar)fillForwardEnumerator.Current).Period == expectedBarPeriod);
Assert.AreEqual(0, ((TradeBar)fillForwardEnumerator.Current).Volume);

Assert.IsFalse(fillForwardEnumerator.MoveNext());
Assert.AreEqual(!strictEndTimes, fillForwardEnumerator.MoveNext());
fillForwardEnumerator.Dispose();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void HandlesSymbolMapping()
MappedSymbol = Symbols.Fut_SPY_Feb19_2016.ID.ToString()
};

var compositeDataQueueHandler = new TestDataQueueHandlerManager();
var compositeDataQueueHandler = new TestDataQueueHandlerManager(new AlgorithmSettings());
compositeDataQueueHandler.ExposedDataHandlers.Add(dataQueue);
var data = new LiveSubscriptionEnumerator(config, compositeDataQueueHandler, (_, _) => {}, (_) => false);

Expand Down Expand Up @@ -108,6 +108,9 @@ public void Dispose()
private class TestDataQueueHandlerManager : DataQueueHandlerManager
{
public List<IDataQueueHandler> ExposedDataHandlers => DataHandlers;
public TestDataQueueHandlerManager(IAlgorithmSettings settings) : base(settings)
{
}
}
}
}
66 changes: 49 additions & 17 deletions Tests/Engine/DataFeeds/LiveTradingDataFeedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,38 +86,66 @@ public void TearDown()
_dataQueueHandler?.DisposeSafely();
}

[TestCase(false)]
[TestCase(true)]
public void StreamsDailyData(bool strictEndTimes)
[TestCase(false, true)]
[TestCase(true, true)]
[TestCase(false, false)]
[TestCase(true, false)]
public void StreamsDailyData(bool strictEndTimes, bool warmup)
{
_startDate = new DateTime(2014, 3, 27, 10, 0, 0);
_algorithm.Settings.DailyStrictEndTimeEnabled = strictEndTimes;

_startDate = new DateTime(2024, 5, 15, 14, 0, 0);
_algorithm.SetStartDate(_startDate);
_manualTimeProvider.SetCurrentTimeUtc(_startDate.ConvertToUtc(TimeZones.NewYork));
var endDate = _startDate.AddDays(1);

_algorithm.SetBenchmark(x => 1);
if (warmup)
{
_algorithm.SetWarmUp(TimeSpan.FromDays(2));
}
var feed = RunDataFeed();
_algorithm.AddEquity("SPY", Resolution.Daily);
_algorithm.OnEndOfTimeStep();

DateTime _emissionTime = default;
var emittedData = false;
ConsumeBridge(feed, TimeSpan.FromSeconds(500), true, ts =>
List<DateTime> emittedDataTime = new();
List<BaseData> emittedData = new();
ConsumeBridge(feed, TimeSpan.FromSeconds(5), true, ts =>
{
if (ts.Slice.HasData)
{
_emissionTime = _manualTimeProvider.GetUtcNow().ConvertFromUtc(TimeZones.NewYork);
emittedData = true;
var data = ts.Slice[Symbols.SPY];
if (data == null)
{
return;
}
emittedDataTime.Add(_algorithm.Time);
emittedData.Add(data);
// we got what we wanted shortcut unit test
_manualTimeProvider.SetCurrentTimeUtc(Time.EndOfTime);
if (warmup && emittedData.Count == 3 || !warmup && emittedData.Count == 1)
{
// short cut
_manualTimeProvider.SetCurrentTimeUtc(Time.EndOfTime);
}
}
}, endDate: endDate,
secondsTimeStep: 60);
secondsTimeStep: 60 * 60);

Assert.IsTrue(emittedData);
Assert.AreEqual(strictEndTimes ? _startDate.Date.AddHours(16) : _startDate.Date.AddDays(1), _emissionTime);
for (var i = 0; i < emittedDataTime.Count; i++)
{
Assert.AreEqual(emittedDataTime[i], emittedData[i].EndTime);
}

if (warmup)
{
Assert.AreEqual(3, emittedData.Count);
Assert.AreEqual(strictEndTimes ? _startDate.Date.AddDays(-2).AddHours(16) : _startDate.Date.AddDays(-1), emittedData[0].EndTime);
Assert.AreEqual(strictEndTimes ? _startDate.Date.AddDays(-1).AddHours(16) : _startDate.Date, emittedData[1].EndTime);
}
else
{
Assert.AreEqual(1, emittedData.Count);
}
Assert.AreEqual(strictEndTimes ? _startDate.Date.AddHours(16) : _startDate.Date.AddDays(1), emittedData.Last().EndTime);
}

[TestCase(SecurityType.Option, Resolution.Daily, 0, true)]
Expand Down Expand Up @@ -1959,7 +1987,7 @@ public void UniverseDataIsHoldUntilTimeIsRight(string dateTime, Type universeDat
[Test]
public void CustomUniverseFineFundamentalDataGetsPipedCorrectly()
{
_startDate = new DateTime(2014, 10, 07);
_startDate = new DateTime(2014, 10, 07, 15, 0, 0);
_manualTimeProvider.SetCurrentTimeUtc(_startDate);

// we use test ConstituentsUniverse, we have daily data for it
Expand Down Expand Up @@ -2000,7 +2028,7 @@ public void CustomUniverseFineFundamentalDataGetsPipedCorrectly()
// short cut unit test
_manualTimeProvider.SetCurrentTimeUtc(Time.EndOfTime);
}
}, secondsTimeStep: 60 * 60 * 4, // 6 hour time step
}, secondsTimeStep: 60 * 60,
alwaysInvoke: true,
sendUniverseData: true,
endDate:_startDate.AddDays(10));
Expand Down Expand Up @@ -2685,6 +2713,10 @@ public void DoesNotAggregateTicksToTradeBar()
return fdqh.SubscriptionDataConfigs.Where(config => !_algorithm.UniverseManager.ContainsKey(config.Symbol)) // its not a universe
.SelectMany(config =>
{
if (_algorithm.IsWarmingUp)
{
return Enumerable.Empty<Tick>();
}
var ticks = new List<Tick>
{
new Tick(tickTimeUtc.ConvertFromUtc(config.ExchangeTimeZone), config.Symbol, 1, 2)
Expand Down

0 comments on commit b560e87

Please sign in to comment.