From db0dfb39d05140db7565acdb05a771248e4b2202 Mon Sep 17 00:00:00 2001 From: StockSharp Dev1 Date: Thu, 5 Apr 2018 23:27:43 +0300 Subject: [PATCH] HistoryEmulationConnector. Fix external sources processing. --- Algo/Helper.cs | 26 --- Algo/Testing/HistoryEmulationConnector.cs | 196 ++---------------- Algo/Testing/HistoryMessageAdapter.cs | 72 +++++-- Localization/text.csv | 4 +- .../SampleHistoryTesting/MainWindow.xaml.cs | 30 +-- _ReleaseNotes/CHANGE_LOG_API.md | 1 + 6 files changed, 90 insertions(+), 239 deletions(-) diff --git a/Algo/Helper.cs b/Algo/Helper.cs index d04cdec8e4..7f56b371d0 100644 --- a/Algo/Helper.cs +++ b/Algo/Helper.cs @@ -66,32 +66,6 @@ public static Security CheckPriceStep(this Security security) return security; } - public static int ChangeSubscribers(this CachedSynchronizedDictionary subscribers, T subscriber, bool isSubscribe) - { - if (subscribers == null) - throw new ArgumentNullException(nameof(subscribers)); - - lock (subscribers.SyncRoot) - { - var value = subscribers.TryGetValue2(subscriber) ?? 0; - - if (isSubscribe) - value++; - else - { - if (value > 0) - value--; - } - - if (value > 0) - subscribers[subscriber] = value; - else - subscribers.Remove(subscriber); - - return value; - } - } - public static long GetTradeId(this ExecutionMessage message) { if (message == null) diff --git a/Algo/Testing/HistoryEmulationConnector.cs b/Algo/Testing/HistoryEmulationConnector.cs index c1efd3b310..be52b9e8ac 100644 --- a/Algo/Testing/HistoryEmulationConnector.cs +++ b/Algo/Testing/HistoryEmulationConnector.cs @@ -34,7 +34,7 @@ namespace StockSharp.Algo.Testing /// /// The emulation connection. It uses historical data and/or occasionally generated. /// - public class HistoryEmulationConnector : BaseEmulationConnector//, IExternalCandleSource + public class HistoryEmulationConnector : BaseEmulationConnector { private class EmulationEntityFactory : EntityFactory { @@ -187,10 +187,6 @@ public override IMessageChannel Clone() } } - //private readonly CachedSynchronizedDictionary, int> _subscribedCandles = new CachedSynchronizedDictionary, int>(); - private readonly CachedSynchronizedDictionary, int> _historySourceSubscriptions = new CachedSynchronizedDictionary, int>(); - //private readonly SynchronizedDictionary, CachedSynchronizedSet> _series = new SynchronizedDictionary, CachedSynchronizedSet>(); - /// /// Initializes a new instance of the . /// @@ -309,23 +305,23 @@ private set switch (value) { case EmulationStates.Stopped: - throwError = (_state != EmulationStates.Stopping); + throwError = _state != EmulationStates.Stopping; break; case EmulationStates.Stopping: - throwError = (_state != EmulationStates.Started && _state != EmulationStates.Suspended - && State != EmulationStates.Starting); // при ошибках при запуске эмуляции состояние может быть Starting + throwError = _state != EmulationStates.Started && _state != EmulationStates.Suspended + && State != EmulationStates.Starting; // при ошибках при запуске эмуляции состояние может быть Starting break; case EmulationStates.Starting: - throwError = (_state != EmulationStates.Stopped && _state != EmulationStates.Suspended); + throwError = _state != EmulationStates.Stopped && _state != EmulationStates.Suspended; break; case EmulationStates.Started: - throwError = (_state != EmulationStates.Starting); + throwError = _state != EmulationStates.Starting; break; case EmulationStates.Suspending: - throwError = (_state != EmulationStates.Started); + throwError = _state != EmulationStates.Started; break; case EmulationStates.Suspended: - throwError = (_state != EmulationStates.Suspending); + throwError = _state != EmulationStates.Suspending; break; default: throw new ArgumentOutOfRangeException(nameof(value), value, LocalizedStrings.Str1219); @@ -364,11 +360,6 @@ public override TimeSpan MarketTimeChangedInterval set => HistoryMessageAdapter.MarketTimeChangedInterval = value; } - ///// - ///// To enable the possibility to give out candles directly into . It accelerates operation, but candle change events will not be available. By default it is disabled. - ///// - //public bool UseExternalCandleSource { get; set; } - /// /// Clear cache. /// @@ -377,7 +368,6 @@ public override void ClearCache() base.ClearCache(); //_series.Clear(); - _historySourceSubscriptions.Clear(); //_subscribedCandles.Clear(); IsFinished = false; @@ -539,54 +529,6 @@ private void SendPortfolio(Portfolio portfolio) .Add(PositionChangeTypes.BlockedValue, 0m)); } - //private void InitOrderLogBuilders(DateTime loadDate) - //{ - // if (StorageRegistry == null || !MarketEmulator.Settings.UseMarketDepth) - // return; - - // foreach (var security in RegisteredMarketDepths) - // { - // var builder = _orderLogBuilders.TryGetValue(security); - - // if (builder == null) - // continue; - - // // стакан из ОЛ строиться начиная с 18.45 предыдущей торговой сессии - // var olDate = loadDate.Date; - - // do - // { - // olDate -= TimeSpan.FromDays(1); - // } - // while (!ExchangeBoard.Forts.WorkingTime.IsTradeDate(olDate)); - - // olDate += new TimeSpan(18, 45, 0); - - // foreach (var item in StorageRegistry.GetOrderLogStorage(security, Drive).Load(olDate, loadDate - TimeSpan.FromTicks(1))) - // { - // builder.Update(item); - // } - // } - //} - - ///// - ///// Найти инструменты, соответствующие фильтру . - ///// - ///// Инструмент, поля которого будут использоваться в качестве фильтра. - ///// Найденные инструменты. - //public override IEnumerable Lookup(Security criteria) - //{ - // var securities = _historyAdapter.SecurityProvider.Lookup(criteria); - - // if (State == EmulationStates.Started) - // { - // foreach (var security in securities) - // SendSecurity(security); - // } - - // return securities; - //} - /// /// Subscribe on the portfolio changes. /// @@ -600,9 +542,9 @@ protected override void OnRegisterPortfolio(Portfolio portfolio) } /// - /// Register historical data. + /// Register historical data source. /// - /// Instrument. + /// Instrument. If passed the source will be applied for all subscriptions. /// Data type. /// The parameter associated with the type. For example, . /// Historical data source. @@ -612,9 +554,9 @@ public void RegisterHistorySource(Security security, MarketDataTypes dataType, o } /// - /// Unregister the subscription, previously made by . + /// Unregister historical data source, previously registeted by . /// - /// Instrument. + /// Instrument. If passed the source will be removed for all subscriptions. /// Data type. /// The parameter associated with the type. For example, . public void UnRegisterHistorySource(Security security, MarketDataTypes dataType, object arg) @@ -624,124 +566,14 @@ public void UnRegisterHistorySource(Security security, MarketDataTypes dataType, private void SendInHistorySourceMessage(Security security, MarketDataTypes dataType, object arg, Func> getMessages) { - var isSubscribe = getMessages != null; - - if (isSubscribe) - { - if (_historySourceSubscriptions.ChangeSubscribers(Tuple.Create(security.ToSecurityId(), dataType, arg), true) != 1) - return; - } - else - { - if (_historySourceSubscriptions.ChangeSubscribers(Tuple.Create(security.ToSecurityId(), dataType, arg), false) != 0) - return; - } - SendInMessage(new HistorySourceMessage { - IsSubscribe = isSubscribe, - SecurityId = security.ToSecurityId(), + IsSubscribe = getMessages != null, + SecurityId = security?.ToSecurityId() ?? default(SecurityId), DataType = dataType, Arg = arg, GetMessages = getMessages }); } - - #region IExternalCandleSource - - //IEnumerable> IExternalCandleSource.GetSupportedRanges(CandleSeries series) - //{ - // if (!UseExternalCandleSource) - // yield break; - - // var securityId = series.Security.ToSecurityId(); - // var messageType = series.CandleType.ToCandleMessageType(); - // var dataType = messageType.ToCandleMarketDataType(); - - // if (_historySourceSubscriptions.ContainsKey(Tuple.Create(securityId, dataType, series.Arg))) - // { - // yield return new Range(DateTimeOffset.MinValue, DateTimeOffset.MaxValue); - // yield break; - // } - - // var types = HistoryMessageAdapter.Drive.GetAvailableDataTypes(securityId, HistoryMessageAdapter.StorageFormat); - - // foreach (var tuple in types) - // { - // if (tuple.MessageType != messageType || !tuple.Arg.Equals(series.Arg)) - // continue; - - // var dates = HistoryMessageAdapter.StorageRegistry.GetCandleMessageStorage(tuple.MessageType, series.Security, series.Arg, HistoryMessageAdapter.Drive, HistoryMessageAdapter.StorageFormat).Dates.ToArray(); - - // if (dates.Any()) - // yield return new Range(dates.First().ApplyTimeZone(TimeZoneInfo.Utc), dates.Last().ApplyTimeZone(TimeZoneInfo.Utc)); - - // break; - // } - //} - - //private Action> _newCandles; - - //event Action> IExternalCandleSource.NewCandles - //{ - // add { _newCandles += value; } - // remove { _newCandles -= value; } - //} - - //private Action _stopped; - - //event Action IExternalCandleSource.Stopped - //{ - // add { _stopped += value; } - // remove { _stopped -= value; } - //} - - //void IExternalCandleSource.SubscribeCandles(CandleSeries series, DateTimeOffset from, DateTimeOffset to) - //{ - // var securityId = GetSecurityId(series.Security); - // var dataType = series.CandleType.ToCandleMessageType().ToCandleMarketDataType(); - // var key = Tuple.Create(securityId, dataType, series.Arg); - - // _series.SafeAdd(key).Add(series); - - // if (!_historySourceSubscriptions.ContainsKey(key)) - // { - // if (_subscribedCandles.ChangeSubscribers(key, true) != 1) - // return; - - // MarketDataAdapter.SendInMessage(new MarketDataMessage - // { - // //SecurityId = securityId, - // DataType = dataType, - // Arg = series.Arg, - // IsSubscribe = true, - // }.FillSecurityInfo(this, series.Security)); - // } - //} - - //void IExternalCandleSource.UnSubscribeCandles(CandleSeries series) - //{ - // var securityId = GetSecurityId(series.Security); - // var dataType = series.CandleType.ToCandleMessageType().ToCandleMarketDataType(); - // var key = Tuple.Create(securityId, dataType, series.Arg); - - // _series.SafeAdd(key).Remove(series); - - // if (!_historySourceSubscriptions.ContainsKey(key)) - // { - // if (_subscribedCandles.ChangeSubscribers(key, false) != 0) - // return; - - // MarketDataAdapter.SendInMessage(new MarketDataMessage - // { - // //SecurityId = securityId, - // DataType = MarketDataTypes.CandleTimeFrame, - // Arg = series.Arg, - // IsSubscribe = false, - // }.FillSecurityInfo(this, series.Security)); - // } - //} - - #endregion } } \ No newline at end of file diff --git a/Algo/Testing/HistoryMessageAdapter.cs b/Algo/Testing/HistoryMessageAdapter.cs index 38eeb757a4..4233f81edd 100644 --- a/Algo/Testing/HistoryMessageAdapter.cs +++ b/Algo/Testing/HistoryMessageAdapter.cs @@ -19,6 +19,7 @@ namespace StockSharp.Algo.Testing using System.Collections.Generic; using System.Linq; + using Ecng.Collections; using Ecng.Common; using MoreLinq; @@ -34,6 +35,7 @@ namespace StockSharp.Algo.Testing public class HistoryMessageAdapter : MessageAdapter { private readonly HashSet> _generators = new HashSet>(); + private readonly Dictionary, Func>> _historySources = new Dictionary, Func>>(); private bool _isSuspended; private bool _isStarted; @@ -245,7 +247,10 @@ protected override void OnSendInMessage(Message message) { _isSuspended = false; _currentTime = default(DateTimeOffset); + _generators.Clear(); + _historySources.Clear(); + BasketStorage.Reset(); LoadedMessageCount = 0; @@ -262,7 +267,7 @@ protected override void OnSendInMessage(Message message) throw new InvalidOperationException(LocalizedStrings.Str1116); SendOutMessage(new ConnectMessage { LocalTime = StartDate }); - return; + break; } case MessageTypes.Disconnect: @@ -277,7 +282,8 @@ protected override void OnSendInMessage(Message message) BasketStorage.Reset(); _isStarted = false; - return; + + break; } case MessageTypes.SecurityLookup: @@ -306,15 +312,29 @@ protected override void OnSendInMessage(Message message) SendOutMessage(new SecurityLookupResultMessage { OriginalTransactionId = lookupMsg.TransactionId }); - return; + break; } case MessageTypes.MarketData: - case ExtendedMessageTypes.HistorySource: ProcessMarketDataMessage((MarketDataMessage)message); - return; + break; + + case ExtendedMessageTypes.HistorySource: + { + var sourceMsg = (HistorySourceMessage)message; + + var key = Tuple.Create(sourceMsg.SecurityId, sourceMsg.DataType, sourceMsg.Arg); + + if (sourceMsg.IsSubscribe) + _historySources[key] = sourceMsg.GetMessages; + else + _historySources.Remove(key); + + break; + } case ExtendedMessageTypes.EmulationState: + { var stateMsg = (EmulationStateMessage)message; var isSuspended = false; @@ -349,7 +369,8 @@ protected override void OnSendInMessage(Message message) if (isSuspended) SendOutMessage(new EmulationStateMessage { State = EmulationStates.Suspended }); - return; + break; + } case ExtendedMessageTypes.Generator: { @@ -385,7 +406,15 @@ private void ProcessMarketDataMessage(MarketDataMessage message) return; } - var history = message as HistorySourceMessage; + Func> GetHistorySource() + { + Func> GetHistorySource2(SecurityId s) + { + return _historySources.TryGetValue(Tuple.Create(s, message.DataType, message.Arg)); + } + + return GetHistorySource2(message.SecurityId) ?? GetHistorySource2(default(SecurityId)); + } Exception error = null; @@ -398,7 +427,9 @@ private void ProcessMarketDataMessage(MarketDataMessage message) if (message.IsSubscribe) { - if (history == null) + var historySource = GetHistorySource(); + + if (historySource == null) { BasketStorage.AddStorage(StorageRegistry.GetLevel1MessageStorage(security, Drive, StorageFormat)); @@ -414,7 +445,7 @@ private void ProcessMarketDataMessage(MarketDataMessage message) } else { - BasketStorage.AddStorage(new InMemoryMarketDataStorage(security, null, history.GetMessages)); + BasketStorage.AddStorage(new InMemoryMarketDataStorage(security, null, historySource)); } } else @@ -433,9 +464,11 @@ private void ProcessMarketDataMessage(MarketDataMessage message) if (message.IsSubscribe) { - BasketStorage.AddStorage(history == null + var historySource = GetHistorySource(); + + BasketStorage.AddStorage(historySource == null ? StorageRegistry.GetQuoteMessageStorage(security, Drive, StorageFormat) - : new InMemoryMarketDataStorage(security, null, history.GetMessages)); + : new InMemoryMarketDataStorage(security, null, historySource)); } else BasketStorage.RemoveStorage>(security, MessageTypes.QuoteChange, null); @@ -450,9 +483,11 @@ private void ProcessMarketDataMessage(MarketDataMessage message) if (message.IsSubscribe) { - BasketStorage.AddStorage(history == null + var historySource = GetHistorySource(); + + BasketStorage.AddStorage(historySource == null ? StorageRegistry.GetTickMessageStorage(security, Drive, StorageFormat) - : new InMemoryMarketDataStorage(security, null, history.GetMessages)); + : new InMemoryMarketDataStorage(security, null, historySource)); } else BasketStorage.RemoveStorage>(security, MessageTypes.Execution, ExecutionTypes.Tick); @@ -467,9 +502,11 @@ private void ProcessMarketDataMessage(MarketDataMessage message) if (message.IsSubscribe) { - BasketStorage.AddStorage(history == null + var historySource = GetHistorySource(); + + BasketStorage.AddStorage(historySource == null ? StorageRegistry.GetOrderLogMessageStorage(security, Drive, StorageFormat) - : new InMemoryMarketDataStorage(security, null, history.GetMessages)); + : new InMemoryMarketDataStorage(security, null, historySource)); } else BasketStorage.RemoveStorage>(security, MessageTypes.Execution, ExecutionTypes.OrderLog); @@ -496,11 +533,12 @@ private void ProcessMarketDataMessage(MarketDataMessage message) if (message.IsSubscribe) { + var historySource = GetHistorySource(); var candleType = message.DataType.ToCandleMessage(); - BasketStorage.AddStorage(history == null + BasketStorage.AddStorage(historySource == null ? StorageRegistry.GetCandleMessageStorage(candleType, security, message.Arg, Drive, StorageFormat) - : new InMemoryMarketDataStorage(security, message.Arg, history.GetMessages, candleType)); + : new InMemoryMarketDataStorage(security, message.Arg, historySource, candleType)); } else BasketStorage.RemoveStorage>(security, msgType, message.Arg); diff --git a/Localization/text.csv b/Localization/text.csv index 2e7d72a14d..18fd54dd6e 100644 --- a/Localization/text.csv +++ b/Localization/text.csv @@ -13260,9 +13260,9 @@ DocStr8666;Bits flag;Битовый флаг DocStr8667;Extract system attribute from the bits flag;Получить системные атрибуты из флага DocStr8668;{0} if an order is system, otherwise, {1};{0} если заявка системная, иначе, {1} DocStr8669;Instrument;Инструмент -DocStr8670;Register historical data;Подписаться на исторические данные +DocStr8670;Register historical data source;Зарегистрировать источник исторических данных DocStr8672;Historical data source;Источник исторических данных -DocStr8673;Unregister the subscription, previously made by {0};Удалить подписку, сделанную ранее через {0} +DocStr8673;Unregister historical data source, previously registeted by {0};Удалить источник исторических данных, ранее добавленный через {0} DocStr8674;Candle type;Тип свечи DocStr8675;Message type {0};Тип сообщения {0} DocStr8677;The candle message type;Тип сообщения свечи diff --git a/Samples/Testing/SampleHistoryTesting/MainWindow.xaml.cs b/Samples/Testing/SampleHistoryTesting/MainWindow.xaml.cs index 406a583a18..b7f68cac18 100644 --- a/Samples/Testing/SampleHistoryTesting/MainWindow.xaml.cs +++ b/Samples/Testing/SampleHistoryTesting/MainWindow.xaml.cs @@ -479,37 +479,43 @@ private void StartBtnClick(object sender, RoutedEventArgs e) logManager.Sources.Add(strategy); - connector.NewSecurity += s => + connector.Connected += () => { - if (s != security) - return; - - // fill level1 values - connector.HistoryMessageAdapter.SendOutMessage(level1Info); - if (emulationInfo.HistorySource != null) { + // passing null value as security to register the source for all securities + if (emulationInfo.UseCandleTimeFrame != null) { - connector.RegisterHistorySource(security, MarketDataTypes.CandleTimeFrame, emulationInfo.UseCandleTimeFrame.Value, emulationInfo.HistorySource); + connector.RegisterHistorySource(null, MarketDataTypes.CandleTimeFrame, emulationInfo.UseCandleTimeFrame.Value, emulationInfo.HistorySource); } if (emulationInfo.UseTicks) { - connector.RegisterHistorySource(security, MarketDataTypes.Trades, null, emulationInfo.HistorySource); + connector.RegisterHistorySource(null, MarketDataTypes.Trades, null, emulationInfo.HistorySource); } if (emulationInfo.UseLevel1) { - connector.RegisterHistorySource(security, MarketDataTypes.Level1, null, emulationInfo.HistorySource); + connector.RegisterHistorySource(null, MarketDataTypes.Level1, null, emulationInfo.HistorySource); } if (emulationInfo.UseMarketDepth) { - connector.RegisterHistorySource(security, MarketDataTypes.MarketDepth, null, emulationInfo.HistorySource); + connector.RegisterHistorySource(null, MarketDataTypes.MarketDepth, null, emulationInfo.HistorySource); } } - else + }; + + connector.NewSecurity += s => + { + if (s != security) + return; + + // fill level1 values + connector.HistoryMessageAdapter.SendOutMessage(level1Info); + + if (emulationInfo.HistorySource == null) { if (emulationInfo.UseMarketDepth) { diff --git a/_ReleaseNotes/CHANGE_LOG_API.md b/_ReleaseNotes/CHANGE_LOG_API.md index eb07bd1a3f..690e03c6a8 100644 --- a/_ReleaseNotes/CHANGE_LOG_API.md +++ b/_ReleaseNotes/CHANGE_LOG_API.md @@ -19,6 +19,7 @@ StockSharp API Change log * (bug) AlfaDirect, Transaq. PortfolioMessage processing fixes. * (bug) Charting. OptionPositionChart. Legend binding fix. * (bug) Charting. Options charts theme binding fix. +* (bug) HistoryEmulationConnector. Fix external sources processing. ## v4.4.6: * (feature) Binance, Liqui, CEX.IO, Cryptopia, OKEx, BitMEX, YoBit, Livecoin, EXMO, Deribit, Huobi, Kucoin, BITEXBOOK, CoinExchange stubs.