Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Brokerages/Paper/PaperBrokerage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ public class PaperBrokerage : BacktestingBrokerage
private DateTime _lastScanTime;
private readonly LiveNodePacket _job;

/// <summary>
/// Enables or disables concurrent processing of messages to and from the brokerage.
/// </summary>
public override bool ConcurrencyEnabled { get; set; } = true;

/// <summary>
/// Creates a new PaperBrokerage
/// </summary>
Expand Down
33 changes: 26 additions & 7 deletions Engine/TransactionHandlers/BacktestingTransactionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class BacktestingTransactionHandler : BrokerageTransactionHandler
private BacktestingBrokerage _brokerage;
private IAlgorithm _algorithm;
private Delistings _lastestDelistings;
private bool _enableConcurrency;

/// <summary>
/// Gets current time UTC. This is here to facilitate testing
Expand All @@ -52,22 +53,29 @@ public override void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IRes
throw new ArgumentException("Brokerage must be of type BacktestingBrokerage for use wth the BacktestingTransactionHandler");
}

_brokerage = (BacktestingBrokerage) brokerage;
_brokerage = (BacktestingBrokerage)brokerage;
_algorithm = algorithm;
_enableConcurrency = _brokerage.ConcurrencyEnabled && _algorithm.LiveMode;

base.Initialize(algorithm, brokerage, resultHandler);

// non blocking implementation
_orderRequestQueues = new() { new BusyCollection<OrderRequest>() };
if (!_enableConcurrency)
{
// non blocking implementation
_orderRequestQueues = new() { new BusyCollection<OrderRequest>() };
}
}

/// <summary>
/// Processes all synchronous events that must take place before the next time loop for the algorithm
/// </summary>
public override void ProcessSynchronousEvents()
{
// we process pending order requests our selves
Run(0);
if (!_enableConcurrency)
{
// we process pending order requests our selves
Run(0);
}

base.ProcessSynchronousEvents();

Expand Down Expand Up @@ -97,6 +105,13 @@ public override void ProcessAsynchronousEvents()
/// <param name="ticket">The <see cref="OrderTicket"/> expecting to be submitted</param>
protected override void WaitForOrderSubmission(OrderTicket ticket)
{
if (_enableConcurrency)
{
// let the base class handle this
base.WaitForOrderSubmission(ticket);
return;
}

// we submit the order request our selves
Run(0);

Expand All @@ -114,9 +129,13 @@ protected override void WaitForOrderSubmission(OrderTicket ticket)
/// For backtesting order requests will be processed by the algorithm thread
/// sequentially at <see cref="WaitForOrderSubmission"/> and <see cref="ProcessSynchronousEvents"/>
/// </summary>
protected override void InitializeTransactionThread(int threadId)
protected override void InitializeTransactionThread()
{
// nop
if (_enableConcurrency)
{
// let the base class handle this
base.InitializeTransactionThread();
}
}
}
}
36 changes: 18 additions & 18 deletions Engine/TransactionHandlers/BrokerageTransactionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,14 +160,6 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu
_brokerageIsBacktesting = brokerage is BacktestingBrokerage;
_algorithm = algorithm;

// multi threaded queue, used for live deployments
var processingThreadsCount = _brokerage.ConcurrencyEnabled
? Config.GetInt("maximum-transaction-threads", 4)
: 1;
_orderRequestQueues = Enumerable.Range(0, processingThreadsCount)
.Select(_ => new BusyBlockingCollection<OrderRequest>())
.ToList<IBusyCollection<OrderRequest>>();

_brokerage.OrdersStatusChanged += (sender, orderEvents) =>
{
HandleOrderEvents(orderEvents);
Expand Down Expand Up @@ -215,23 +207,31 @@ public virtual void Initialize(IAlgorithm algorithm, IBrokerage brokerage, IResu
: (_algorithm as AlgorithmPythonWrapper).SignalExport;

NewOrderEvent += (s, e) => _signalExport.OnOrderEvent(e);
InitializeTransactionThread(processingThreadsCount);
InitializeTransactionThread();
}

/// <summary>
/// Create and start the transaction thread, who will be in charge of processing
/// the order requests
/// </summary>
protected virtual void InitializeTransactionThread(int processingThreadsCount)
protected virtual void InitializeTransactionThread()
{
_processingThreads = Enumerable.Range(0, processingThreadsCount)
.Select(i =>
{
var thread = new Thread(() => Run(i)) { IsBackground = true, Name = $"Transaction Thread {i}" };
thread.Start();
return thread;
})
.ToList();
// multi threaded queue, used for live deployments
var processingThreadsCount = _brokerage.ConcurrencyEnabled
? Config.GetInt("maximum-transaction-threads", 4)
: 1;
_orderRequestQueues = new(processingThreadsCount);
_processingThreads = new(processingThreadsCount);
for (var i = 0; i < processingThreadsCount; i++)
{
_orderRequestQueues.Add(new BusyBlockingCollection<OrderRequest>());
var threadId = i; // avoid modified closure
_processingThreads.Add(new Thread(() => Run(threadId)) { IsBackground = true, Name = $"Transaction Thread {i}" });
}
foreach (var thread in _processingThreads)
{
thread.Start();
}
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion Launcher/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@
"data-feed-handler": "QuantConnect.Lean.Engine.DataFeeds.LiveTradingDataFeed",
"data-queue-handler": [ "QuantConnect.Lean.Engine.DataFeeds.Queues.LiveDataQueue" ],
"real-time-handler": "QuantConnect.Lean.Engine.RealTime.LiveTradingRealTimeHandler",
"transaction-handler": "QuantConnect.Lean.Engine.TransactionHandlers.BacktestingTransactionHandler"
"transaction-handler": "QuantConnect.Lean.Engine.TransactionHandlers.PaperBrokerageTransactionHandler"
},

// defines 'live-zerodha' environment
Expand Down
2 changes: 2 additions & 0 deletions Tests/Brokerages/Paper/PaperBrokerageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ public void AppliesDividendsOnce()
realTime.Exit();
results.Exit();
Assert.AreEqual(initializedCash + dividend.Distribution, postDividendCash);

transactions.Exit();
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,15 @@
*/

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Moq;
using NUnit.Framework;
using QuantConnect.Brokerages;
using QuantConnect.Brokerages.Backtesting;
using QuantConnect.Brokerages.Paper;
using QuantConnect.Data.Market;
using QuantConnect.Lean.Engine.Results;
using QuantConnect.Lean.Engine.TransactionHandlers;
Expand Down Expand Up @@ -232,6 +236,168 @@ public void SendingNewOrderFromPartiallyFilledOnOrderEvent()
Assert.AreEqual(6, orderEventCalls);
}

[Test]
public void ProcessesOrdersInLivePaperTrading()
{
//Initializes the transaction handler
var transactionHandler = new BacktestingTransactionHandler();
using var brokerage = new PaperBrokerage(_algorithm, null);
_algorithm.SetLiveMode(true);
transactionHandler.Initialize(_algorithm, brokerage, new BacktestingResultHandler());

// Creates a market order
var security = _algorithm.Securities[Ticker];
var price = 1.12m;
security.SetMarketPrice(new Tick(DateTime.UtcNow.AddDays(-1), security.Symbol, price, price, price));
var reference = new DateTime(2025, 07, 03, 10, 0, 0);
var orderRequest = new SubmitOrderRequest(OrderType.Market, security.Type, security.Symbol, 1000, 0, 0, 0, reference, "");
var orderRequest2 = new SubmitOrderRequest(OrderType.Market, security.Type, security.Symbol, -1000, 0, 0, 0, reference.AddSeconds(1), "");
orderRequest.SetOrderId(1);
orderRequest2.SetOrderId(2);

// Mock the the order processor
var orderProcessorMock = new Mock<IOrderProcessor>();
orderProcessorMock.Setup(m => m.GetOrderTicket(It.Is<int>(i => i == 1))).Returns(new OrderTicket(_algorithm.Transactions, orderRequest));
orderProcessorMock.Setup(m => m.GetOrderTicket(It.Is<int>(i => i == 2))).Returns(new OrderTicket(_algorithm.Transactions, orderRequest2));
_algorithm.Transactions.SetOrderProcessor(orderProcessorMock.Object);

var allOrderEvents = new List<OrderEvent>();
using var eventsReceived = new AutoResetEvent(false);

brokerage.OrdersStatusChanged += (sender, orderEvents) =>
{
var orderEvent = orderEvents[0];
lock (allOrderEvents)
{
allOrderEvents.Add(orderEvent);
if (allOrderEvents.Count == 4)
{
eventsReceived.Set();
}
}

// Let's place another order before this one is filled
if (orderEvent.OrderId == 1 && orderEvent.Status == OrderStatus.Submitted)
{
var ticket2 = transactionHandler.Process(orderRequest2);
}

Log.Debug($"{orderEvent}");
};

var ticket = transactionHandler.Process(orderRequest);

if (!eventsReceived.WaitOne(10000))
{
Assert.Fail($"Did not receive all order events, received {allOrderEvents.Count} order events: {string.Join(", ", allOrderEvents)}");
}

Assert.IsTrue(orderRequest.Response.IsProcessed);
Assert.IsTrue(orderRequest.Response.IsSuccess);
Assert.AreEqual(OrderRequestStatus.Processed, orderRequest.Status);

Assert.IsTrue(orderRequest2.Response.IsProcessed);
Assert.IsTrue(orderRequest2.Response.IsSuccess);
Assert.AreEqual(OrderRequestStatus.Processed, orderRequest2.Status);

var order1 = transactionHandler.GetOrderById(1);
Assert.AreEqual(OrderStatus.Filled, order1.Status);

var order2 = transactionHandler.GetOrderById(2);
Assert.AreEqual(OrderStatus.Filled, order2.Status);

// 2 submitted and 2 filled
Assert.AreEqual(4, allOrderEvents.Count);

var firstOrderSubmittedEvent = allOrderEvents.FirstOrDefault(x => x.OrderId == 1 && x.Status == OrderStatus.Submitted);
Assert.IsNotNull(firstOrderSubmittedEvent);
var firstOrderFilledEvent = allOrderEvents.FirstOrDefault(x => x.OrderId == 1 && x.Status == OrderStatus.Filled);
Assert.IsNotNull(firstOrderFilledEvent);

var secondOrderSubmittedEvent = allOrderEvents.FirstOrDefault(x => x.OrderId == 2 && x.Status == OrderStatus.Submitted);
Assert.IsNotNull(secondOrderSubmittedEvent);
var secondOrderFilledEvent = allOrderEvents.FirstOrDefault(x => x.OrderId == 2 && x.Status == OrderStatus.Filled);
Assert.IsNotNull(secondOrderFilledEvent);

transactionHandler.Exit();
}

[Test]
public void ProcessesOrdersConcurrentlyInLivePaperTrading()
{
_algorithm.SetLiveMode(true);
using var brokerage = new PaperBrokerage(_algorithm, null);

const int expectedOrdersCount = 20;
using var finishedEvent = new ManualResetEventSlim(false);
var transactionHandler = new TestablePaperBrokerageTransactionHandler(expectedOrdersCount, finishedEvent);
transactionHandler.Initialize(_algorithm, brokerage, new BacktestingResultHandler());
_algorithm.Transactions.SetOrderProcessor(transactionHandler);

var security = (Security)_algorithm.AddEquity("SPY");
_algorithm.SetFinishedWarmingUp();

// Set up security
var reference = new DateTime(2025, 07, 03, 10, 0, 0);
security.SetMarketPrice(new Tick(reference, security.Symbol, 300, 300));

// Creates the order
var orderRequests = Enumerable.Range(0, expectedOrdersCount)
.Select(_ => new SubmitOrderRequest(OrderType.Market, security.Type, security.Symbol, 1000, 0, 0, 0, reference, ""))
.ToList();

// Act
for (var i = 0; i < orderRequests.Count; i++)
{
var orderRequest = orderRequests[i];
orderRequest.SetOrderId(i + 1);
transactionHandler.Process(orderRequest);
}

// Wait for all orders to be processed
Assert.IsTrue(finishedEvent.Wait(10000));
Assert.Greater(transactionHandler.ProcessingThreadNames.Count, 1);
CollectionAssert.AreEquivalent(orderRequests.Select(x => x.ToString()), transactionHandler.ProcessedRequests.Select(x => x.ToString()));

transactionHandler.Exit();
}

private class TestablePaperBrokerageTransactionHandler : BacktestingTransactionHandler
{
private readonly int _expectedOrdersCount;
private readonly ManualResetEventSlim _finishedEvent;
private int _currentOrdersCount;

public HashSet<string> ProcessingThreadNames = new();

public ConcurrentBag<OrderRequest> ProcessedRequests = new();

public TestablePaperBrokerageTransactionHandler(int expectedOrdersCount, ManualResetEventSlim finishedEvent)
{
_expectedOrdersCount = expectedOrdersCount;
_finishedEvent = finishedEvent;
}

public override void HandleOrderRequest(OrderRequest request)
{
base.HandleOrderRequest(request);

// Capture the thread name for debugging purposes
lock (ProcessingThreadNames)
{
ProcessingThreadNames.Add(Thread.CurrentThread.Name ?? Environment.CurrentManagedThreadId.ToStringInvariant());
}

ProcessedRequests.Add(request);

if (Interlocked.Increment(ref _currentOrdersCount) >= _expectedOrdersCount)
{
// Signal that we have processed the expected number of orders
_finishedEvent.Set();
}
}
}

internal class TestPartialFilledModel : IFillModel
{
public static Dictionary<int, Order> FilledOrders;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2665,9 +2665,9 @@ public DateTime GetLastSyncDate()
return _brokerage.LastSyncDateTimeUtc.ConvertFromUtc(TimeZones.NewYork);
}

protected override void InitializeTransactionThread(int threadId)
protected override void InitializeTransactionThread()
{
// nop
_orderRequestQueues = new() { new BusyCollection<OrderRequest>() };
}

public new void RoundOrderPrices(Order order, Security security)
Expand Down
8 changes: 8 additions & 0 deletions Tests/Engine/DataFeeds/InternalSubscriptionManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public class InternalSubscriptionManagerTests
private IDataFeed _dataFeed;
private AggregationManager _aggregationManager;
private PaperBrokerage _paperBrokerage;
private ITransactionHandler _transactionHandler;

[SetUp]
public void Setup()
Expand All @@ -55,6 +56,7 @@ public void Setup()
[TearDown]
public void TearDown()
{
_transactionHandler.Exit();
_dataFeed.Exit();
_dataManager.RemoveAllSubscriptions();
_resultHandler.Exit();
Expand Down Expand Up @@ -425,6 +427,12 @@ private void SetupImpl(IDataQueueHandler dataQueueHandler, Synchronizer synchron
_paperBrokerage = new PaperBrokerage(_algorithm, new LiveNodePacket());
backtestingTransactionHandler.Initialize(_algorithm, _paperBrokerage, _resultHandler);
_algorithm.Transactions.SetOrderProcessor(backtestingTransactionHandler);

if (_transactionHandler != null)
{
_transactionHandler.Exit();
}
_transactionHandler = backtestingTransactionHandler;
}
private class TestAggregationManager : AggregationManager
{
Expand Down
Loading