Skip to content

Commit

Permalink
HistoryEmulationConnector. Fix external sources processing.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikasoukhov committed Apr 5, 2018
1 parent 2ec4de1 commit db0dfb3
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 239 deletions.
26 changes: 0 additions & 26 deletions Algo/Helper.cs
Expand Up @@ -66,32 +66,6 @@ public static Security CheckPriceStep(this Security security)
return security;
}

public static int ChangeSubscribers<T>(this CachedSynchronizedDictionary<T, int> subscribers, T subscriber, bool isSubscribe)
{
if (subscribers == null)
throw new ArgumentNullException(nameof(subscribers));

lock (subscribers.SyncRoot)
{
var value = subscribers.TryGetValue2(subscriber) ?? 0;

if (isSubscribe)
value++;
else
{
if (value > 0)
value--;
}

if (value > 0)
subscribers[subscriber] = value;
else
subscribers.Remove(subscriber);

return value;
}
}

public static long GetTradeId(this ExecutionMessage message)
{
if (message == null)
Expand Down
196 changes: 14 additions & 182 deletions Algo/Testing/HistoryEmulationConnector.cs
Expand Up @@ -34,7 +34,7 @@ namespace StockSharp.Algo.Testing
/// <summary>
/// The emulation connection. It uses historical data and/or occasionally generated.
/// </summary>
public class HistoryEmulationConnector : BaseEmulationConnector//, IExternalCandleSource
public class HistoryEmulationConnector : BaseEmulationConnector
{
private class EmulationEntityFactory : EntityFactory
{
Expand Down Expand Up @@ -187,10 +187,6 @@ public override IMessageChannel Clone()
}
}

//private readonly CachedSynchronizedDictionary<Tuple<SecurityId, MarketDataTypes, object>, int> _subscribedCandles = new CachedSynchronizedDictionary<Tuple<SecurityId, MarketDataTypes, object>, int>();
private readonly CachedSynchronizedDictionary<Tuple<SecurityId, MarketDataTypes, object>, int> _historySourceSubscriptions = new CachedSynchronizedDictionary<Tuple<SecurityId, MarketDataTypes, object>, int>();
//private readonly SynchronizedDictionary<Tuple<SecurityId, MarketDataTypes, object>, CachedSynchronizedSet<CandleSeries>> _series = new SynchronizedDictionary<Tuple<SecurityId, MarketDataTypes, object>, CachedSynchronizedSet<CandleSeries>>();

/// <summary>
/// Initializes a new instance of the <see cref="HistoryEmulationConnector"/>.
/// </summary>
Expand Down Expand Up @@ -309,23 +305,23 @@ private set
switch (value)
{
case EmulationStates.Stopped:
throwError = (_state != EmulationStates.Stopping);
throwError = _state != EmulationStates.Stopping;
break;
case EmulationStates.Stopping:
throwError = (_state != EmulationStates.Started && _state != EmulationStates.Suspended
&& State != EmulationStates.Starting); // при ошибках при запуске эмуляции состояние может быть Starting
throwError = _state != EmulationStates.Started && _state != EmulationStates.Suspended
&& State != EmulationStates.Starting; // при ошибках при запуске эмуляции состояние может быть Starting
break;
case EmulationStates.Starting:
throwError = (_state != EmulationStates.Stopped && _state != EmulationStates.Suspended);
throwError = _state != EmulationStates.Stopped && _state != EmulationStates.Suspended;
break;
case EmulationStates.Started:
throwError = (_state != EmulationStates.Starting);
throwError = _state != EmulationStates.Starting;
break;
case EmulationStates.Suspending:
throwError = (_state != EmulationStates.Started);
throwError = _state != EmulationStates.Started;
break;
case EmulationStates.Suspended:
throwError = (_state != EmulationStates.Suspending);
throwError = _state != EmulationStates.Suspending;
break;
default:
throw new ArgumentOutOfRangeException(nameof(value), value, LocalizedStrings.Str1219);
Expand Down Expand Up @@ -364,11 +360,6 @@ public override TimeSpan MarketTimeChangedInterval
set => HistoryMessageAdapter.MarketTimeChangedInterval = value;
}

///// <summary>
///// To enable the possibility to give out candles directly into <see cref="ICandleManager"/>. It accelerates operation, but candle change events will not be available. By default it is disabled.
///// </summary>
//public bool UseExternalCandleSource { get; set; }

/// <summary>
/// Clear cache.
/// </summary>
Expand All @@ -377,7 +368,6 @@ public override void ClearCache()
base.ClearCache();

//_series.Clear();
_historySourceSubscriptions.Clear();
//_subscribedCandles.Clear();

IsFinished = false;
Expand Down Expand Up @@ -539,54 +529,6 @@ private void SendPortfolio(Portfolio portfolio)
.Add(PositionChangeTypes.BlockedValue, 0m));
}

//private void InitOrderLogBuilders(DateTime loadDate)
//{
// if (StorageRegistry == null || !MarketEmulator.Settings.UseMarketDepth)
// return;

// foreach (var security in RegisteredMarketDepths)
// {
// var builder = _orderLogBuilders.TryGetValue(security);

// if (builder == null)
// continue;

// // стакан из ОЛ строиться начиная с 18.45 предыдущей торговой сессии
// var olDate = loadDate.Date;

// do
// {
// olDate -= TimeSpan.FromDays(1);
// }
// while (!ExchangeBoard.Forts.WorkingTime.IsTradeDate(olDate));

// olDate += new TimeSpan(18, 45, 0);

// foreach (var item in StorageRegistry.GetOrderLogStorage(security, Drive).Load(olDate, loadDate - TimeSpan.FromTicks(1)))
// {
// builder.Update(item);
// }
// }
//}

///// <summary>
///// Найти инструменты, соответствующие фильтру <paramref name="criteria"/>.
///// </summary>
///// <param name="criteria">Инструмент, поля которого будут использоваться в качестве фильтра.</param>
///// <returns>Найденные инструменты.</returns>
//public override IEnumerable<Security> Lookup(Security criteria)
//{
// var securities = _historyAdapter.SecurityProvider.Lookup(criteria);

// if (State == EmulationStates.Started)
// {
// foreach (var security in securities)
// SendSecurity(security);
// }

// return securities;
//}

/// <summary>
/// Subscribe on the portfolio changes.
/// </summary>
Expand All @@ -600,9 +542,9 @@ protected override void OnRegisterPortfolio(Portfolio portfolio)
}

/// <summary>
/// Register historical data.
/// Register historical data source.
/// </summary>
/// <param name="security">Instrument.</param>
/// <param name="security">Instrument. If passed <see langword="null"/> the source will be applied for all subscriptions.</param>
/// <param name="dataType">Data type.</param>
/// <param name="arg">The parameter associated with the <paramref name="dataType"/> type. For example, <see cref="CandleMessage.Arg"/>.</param>
/// <param name="getMessages">Historical data source.</param>
Expand All @@ -612,9 +554,9 @@ public void RegisterHistorySource(Security security, MarketDataTypes dataType, o
}

/// <summary>
/// Unregister the subscription, previously made by <see cref="RegisterHistorySource"/>.
/// Unregister historical data source, previously registeted by <see cref="RegisterHistorySource"/>.
/// </summary>
/// <param name="security">Instrument.</param>
/// <param name="security">Instrument. If passed <see langword="null"/> the source will be removed for all subscriptions.</param>
/// <param name="dataType">Data type.</param>
/// <param name="arg">The parameter associated with the <paramref name="dataType"/> type. For example, <see cref="CandleMessage.Arg"/>.</param>
public void UnRegisterHistorySource(Security security, MarketDataTypes dataType, object arg)
Expand All @@ -624,124 +566,14 @@ public void UnRegisterHistorySource(Security security, MarketDataTypes dataType,

private void SendInHistorySourceMessage(Security security, MarketDataTypes dataType, object arg, Func<DateTimeOffset, IEnumerable<Message>> getMessages)
{
var isSubscribe = getMessages != null;

if (isSubscribe)
{
if (_historySourceSubscriptions.ChangeSubscribers(Tuple.Create(security.ToSecurityId(), dataType, arg), true) != 1)
return;
}
else
{
if (_historySourceSubscriptions.ChangeSubscribers(Tuple.Create(security.ToSecurityId(), dataType, arg), false) != 0)
return;
}

SendInMessage(new HistorySourceMessage
{
IsSubscribe = isSubscribe,
SecurityId = security.ToSecurityId(),
IsSubscribe = getMessages != null,
SecurityId = security?.ToSecurityId() ?? default(SecurityId),
DataType = dataType,
Arg = arg,
GetMessages = getMessages
});
}

#region IExternalCandleSource

//IEnumerable<Range<DateTimeOffset>> IExternalCandleSource.GetSupportedRanges(CandleSeries series)
//{
// if (!UseExternalCandleSource)
// yield break;

// var securityId = series.Security.ToSecurityId();
// var messageType = series.CandleType.ToCandleMessageType();
// var dataType = messageType.ToCandleMarketDataType();

// if (_historySourceSubscriptions.ContainsKey(Tuple.Create(securityId, dataType, series.Arg)))
// {
// yield return new Range<DateTimeOffset>(DateTimeOffset.MinValue, DateTimeOffset.MaxValue);
// yield break;
// }

// var types = HistoryMessageAdapter.Drive.GetAvailableDataTypes(securityId, HistoryMessageAdapter.StorageFormat);

// foreach (var tuple in types)
// {
// if (tuple.MessageType != messageType || !tuple.Arg.Equals(series.Arg))
// continue;

// var dates = HistoryMessageAdapter.StorageRegistry.GetCandleMessageStorage(tuple.MessageType, series.Security, series.Arg, HistoryMessageAdapter.Drive, HistoryMessageAdapter.StorageFormat).Dates.ToArray();

// if (dates.Any())
// yield return new Range<DateTimeOffset>(dates.First().ApplyTimeZone(TimeZoneInfo.Utc), dates.Last().ApplyTimeZone(TimeZoneInfo.Utc));

// break;
// }
//}

//private Action<CandleSeries, IEnumerable<Candle>> _newCandles;

//event Action<CandleSeries, IEnumerable<Candle>> IExternalCandleSource.NewCandles
//{
// add { _newCandles += value; }
// remove { _newCandles -= value; }
//}

//private Action<CandleSeries> _stopped;

//event Action<CandleSeries> IExternalCandleSource.Stopped
//{
// add { _stopped += value; }
// remove { _stopped -= value; }
//}

//void IExternalCandleSource.SubscribeCandles(CandleSeries series, DateTimeOffset from, DateTimeOffset to)
//{
// var securityId = GetSecurityId(series.Security);
// var dataType = series.CandleType.ToCandleMessageType().ToCandleMarketDataType();
// var key = Tuple.Create(securityId, dataType, series.Arg);

// _series.SafeAdd(key).Add(series);

// if (!_historySourceSubscriptions.ContainsKey(key))
// {
// if (_subscribedCandles.ChangeSubscribers(key, true) != 1)
// return;

// MarketDataAdapter.SendInMessage(new MarketDataMessage
// {
// //SecurityId = securityId,
// DataType = dataType,
// Arg = series.Arg,
// IsSubscribe = true,
// }.FillSecurityInfo(this, series.Security));
// }
//}

//void IExternalCandleSource.UnSubscribeCandles(CandleSeries series)
//{
// var securityId = GetSecurityId(series.Security);
// var dataType = series.CandleType.ToCandleMessageType().ToCandleMarketDataType();
// var key = Tuple.Create(securityId, dataType, series.Arg);

// _series.SafeAdd(key).Remove(series);

// if (!_historySourceSubscriptions.ContainsKey(key))
// {
// if (_subscribedCandles.ChangeSubscribers(key, false) != 0)
// return;

// MarketDataAdapter.SendInMessage(new MarketDataMessage
// {
// //SecurityId = securityId,
// DataType = MarketDataTypes.CandleTimeFrame,
// Arg = series.Arg,
// IsSubscribe = false,
// }.FillSecurityInfo(this, series.Security));
// }
//}

#endregion
}
}

0 comments on commit db0dfb3

Please sign in to comment.