Permalink
Switch branches/tags
algo-timeout-stacktrace alpaca-brokerage alpha-value-monthly bitfinex bug-337-add-support-for-python-project-directories bug-2187-remove-framework-pop-avg-score-warmup-period bug-2189-parralelize-fine-fundamental-reading bug-2288-pythonnet-memory-leak bug-2357-fix-insight-close-time bug-2381-daily-benchmark-for-backtesting bug-2381-setbenchmark-fix bug-2504-ib-always-restore-data-subscriptions bug-2513-performcashsync-once bug-2532-apply-splits-in-live-mode bug-2541-live-mode-always-apply-splits-never-apply-dividends bug-2569-apply-dividends-in-live-paper bug-2569-double-dividend-application bug-2611-live-trading-sync-algorithm-status-update bug-2762-market-order-fills-stale-prices bugfix-insight-close-time crypto-symbol-length debugging-api-logging desktop-mk-ii docker-file-lean-foundation-updates feature-452-net-core feature-1040-object-store feature-1093-vwap-order-type feature-1418-buying-power-order-fee-contexts feature-1418-fee-model-units feature-2003-kraken-exchange feature-2047-split-dividend-api feature-2060-multi-leg-currency-conversion feature-2068-refactor-regression-test-suite feature-2271-IRegressionAlgorithmDefinition-CanRunLocally feature-2378-fix-stream-reader-disposal feature-2378-generator-factors feature-2378-live-factor-files feature-2378-minor-split-dividend-fixes feature-2378-split-dividend-improvements feature-2581-multiple-risk-models feature-timestamped-packets feature/1418-fee-model-context feature/2606-custom-brokerage-message-handler features-1998-2219-portfolio-implementation-v2 fix-python-algorithm-loading fix-vix-futures-scale-factor fsdf-thread-count-logging ib-restart-handler-fix ibrokeragemodel-isshortsellingallowed live-test-move-addsubscription-removesubscription log-splits-dividends master multiple-brokerages quandl-live-extra-logging refactor-1418-buying-power-model-context refactor-2491-livetradingdatafeed-will-use-subscriptionsynchronizer refactor-2567-fill-fee-model-invocation refactor-remove-job-packet release-engine-test smarter-live-chart-subscriptions symbol-tostring-no-subscription test-insight-scoring tick_not_decimal track-all-security-subscriptions trade-crypto-history-requests tweak-2530-log-split-dividend-prices tweak-configure-await-extension tweak-improve-map-file-read-time tweak-make-InsightFromSerializedInsight-public tweak-minor-changes-splits-dividends
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
151 lines (130 sloc) 5.58 KB
/*
* QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
* Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Collections.Generic;
using System.Linq;
using com.fxcm.fix;
using com.fxcm.fix.pretrade;
using NodaTime;
using QuantConnect.Data;
using QuantConnect.Data.Market;
using QuantConnect.Logging;
using QuantConnect.Packets;
using QuantConnect.Securities;
namespace QuantConnect.Brokerages.Fxcm
{
/// <summary>
/// FXCM brokerage - implementation of IDataQueueHandler interface
/// </summary>
public partial class FxcmBrokerage
{
private readonly List<Tick> _ticks = new List<Tick>();
private readonly HashSet<Symbol> _subscribedSymbols = new HashSet<Symbol>();
#region IDataQueueHandler implementation
/// <summary>
/// Get the next ticks from the live trading data queue
/// </summary>
/// <returns>IEnumerable list of ticks since the last update.</returns>
public IEnumerable<BaseData> GetNextTicks()
{
lock (_ticks)
{
var copy = _ticks.ToArray();
_ticks.Clear();
return copy;
}
}
/// <summary>
/// Adds the specified symbols to the subscription
/// </summary>
/// <param name="job">Job we're subscribing for:</param>
/// <param name="symbols">The symbols to be added keyed by SecurityType</param>
public void Subscribe(LiveNodePacket job, IEnumerable<Symbol> symbols)
{
var symbolsToSubscribe = (from symbol in symbols
where !_subscribedSymbols.Contains(symbol) && CanSubscribe(symbol)
select symbol).ToList();
if (symbolsToSubscribe.Count == 0)
return;
Log.Trace("FxcmBrokerage.Subscribe(): {0}", string.Join(",", symbolsToSubscribe));
var request = new MarketDataRequest();
foreach (var symbol in symbolsToSubscribe)
{
TradingSecurity fxcmSecurity;
if (_fxcmInstruments.TryGetValue(_symbolMapper.GetBrokerageSymbol(symbol), out fxcmSecurity))
{
request.addRelatedSymbol(fxcmSecurity);
// cache exchange time zone for symbol
DateTimeZone exchangeTimeZone;
if (!_symbolExchangeTimeZones.TryGetValue(symbol, out exchangeTimeZone))
{
exchangeTimeZone = MarketHoursDatabase.FromDataFolder().GetExchangeHours(Market.FXCM, symbol, symbol.SecurityType).TimeZone;
_symbolExchangeTimeZones.Add(symbol, exchangeTimeZone);
}
}
}
request.setSubscriptionRequestType(SubscriptionRequestTypeFactory.SUBSCRIBE);
request.setMDEntryTypeSet(MarketDataRequest.MDENTRYTYPESET_ALL);
lock (_locker)
{
_gateway.sendMessage(request);
}
foreach (var symbol in symbolsToSubscribe)
{
_subscribedSymbols.Add(symbol);
}
}
/// <summary>
/// Removes the specified symbols to the subscription
/// </summary>
/// <param name="job">Job we're processing.</param>
/// <param name="symbols">The symbols to be removed keyed by SecurityType</param>
public void Unsubscribe(LiveNodePacket job, IEnumerable<Symbol> symbols)
{
var symbolsToUnsubscribe = (from symbol in symbols
where _subscribedSymbols.Contains(symbol)
select symbol).ToList();
if (symbolsToUnsubscribe.Count == 0)
return;
Log.Trace("FxcmBrokerage.Unsubscribe(): {0}", string.Join(",", symbolsToUnsubscribe));
var request = new MarketDataRequest();
foreach (var symbol in symbolsToUnsubscribe)
{
request.addRelatedSymbol(_fxcmInstruments[_symbolMapper.GetBrokerageSymbol(symbol)]);
}
request.setSubscriptionRequestType(SubscriptionRequestTypeFactory.UNSUBSCRIBE);
request.setMDEntryTypeSet(MarketDataRequest.MDENTRYTYPESET_ALL);
lock (_locker)
{
_gateway.sendMessage(request);
}
foreach (var symbol in symbolsToUnsubscribe)
{
_subscribedSymbols.Remove(symbol);
}
}
/// <summary>
/// Returns true if this brokerage supports the specified symbol
/// </summary>
private static bool CanSubscribe(Symbol symbol)
{
// ignore unsupported security types
if (symbol.ID.SecurityType != SecurityType.Forex && symbol.ID.SecurityType != SecurityType.Cfd)
return false;
// ignore universe symbols
return !symbol.Value.Contains("-UNIVERSE-");
}
#endregion
}
}