Skip to content

Commit

Permalink
PartialDownloadMessageAdapter, CandleBuilderMessageAdapter. Skip and …
Browse files Browse the repository at this point in the history
…Count properties supported.
  • Loading branch information
mikasoukhov committed Aug 19, 2020
1 parent 8c9a39f commit 0285d51
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 57 deletions.
54 changes: 47 additions & 7 deletions Algo/Candles/Compression/CandleBuilderMessageAdapter.cs
Expand Up @@ -56,6 +56,8 @@ public MarketDataMessage Current

public DateTimeOffset? LastTime { get; set; }

public long? Count { get; set; }

public CandleMessage CurrentCandle { get; set; }

public CandleMessage NonFinishedCandle { get; set; }
Expand Down Expand Up @@ -208,6 +210,7 @@ protected override bool OnSendInMessage(Message message)
{
State = SeriesStates.Regular,
LastTime = original.From,
Count = original.Count,
});
}

Expand Down Expand Up @@ -244,6 +247,7 @@ protected override bool OnSendInMessage(Message message)
State = SeriesStates.SmallTimeFrame,
BigTimeFrameCompressor = new BiggerTimeFrameCandleCompressor(original, _candleBuilderProvider.Get(typeof(TimeFrameCandleMessage))),
LastTime = original.From,
Count = original.Count,
});
}

Expand All @@ -270,6 +274,7 @@ protected override bool OnSendInMessage(Message message)
{
State = SeriesStates.Regular,
LastTime = original.From,
Count = original.Count,
});
}

Expand Down Expand Up @@ -359,7 +364,7 @@ private SeriesInfo TryRemoveSeries(long id)
}
}

private MarketDataMessage TryCreateBuildSubscription(MarketDataMessage original, DateTimeOffset? lastTime)
private MarketDataMessage TryCreateBuildSubscription(MarketDataMessage original, DateTimeOffset? lastTime, long? count, bool needCalcCount)
{
if (original == null)
throw new ArgumentNullException(nameof(original));
Expand All @@ -369,12 +374,15 @@ private MarketDataMessage TryCreateBuildSubscription(MarketDataMessage original,
if (buildFrom == null)
return null;

if (needCalcCount && count is null)
count = GetMaxCount(buildFrom);

var current = new MarketDataMessage
{
DataType2 = buildFrom,
From = lastTime,
To = original.To,
Count = original.Count,
Count = count,
MaxDepth = original.MaxDepth,
BuildField = original.BuildField,
IsSubscribe = true,
Expand All @@ -389,7 +397,7 @@ private MarketDataMessage TryCreateBuildSubscription(MarketDataMessage original,

private bool TrySubscribeBuild(MarketDataMessage original)
{
var current = TryCreateBuildSubscription(original, original.From);
var current = TryCreateBuildSubscription(original, original.From, original.Count, false);

if (current == null)
return false;
Expand All @@ -399,6 +407,7 @@ private bool TrySubscribeBuild(MarketDataMessage original)
var series = new SeriesInfo(original.TypedClone(), current)
{
LastTime = current.From,
Count = current.Count,
Transform = CreateTransform(current),
State = SeriesStates.Compress,
};
Expand Down Expand Up @@ -562,6 +571,15 @@ protected override void OnInnerAdapterNewOutMessage(Message message)
bigCandle.SetSubscriptionIds(subscriptionId: series.Id);
bigCandle.Adapter = candleMsg.Adapter;
series.LastTime = bigCandle.CloseTime;

if (series.Count != null)
{
if (series.Count <= 0)
break;

series.Count--;
}

base.OnInnerAdapterNewOutMessage(bigCandle);
}

Expand Down Expand Up @@ -643,6 +661,12 @@ void Finish()
return;
}

if (original.Count != null && series.Count <= 0)
{
Finish();
return;
}

switch (series.State)
{
case SeriesStates.Regular:
Expand Down Expand Up @@ -720,7 +744,7 @@ void Finish()

series.NonFinishedCandle = null;

var current = TryCreateBuildSubscription(original, series.LastTime);
var current = TryCreateBuildSubscription(original, series.LastTime, null, series.Count != null);

if (current == null)
{
Expand Down Expand Up @@ -748,6 +772,9 @@ private void ProcessCandle(SeriesInfo info, CandleMessage candleMsg)
if (info.LastTime != null && info.LastTime > candleMsg.OpenTime)
return;

if (info.Count <= 0)
return;

info.LastTime = candleMsg.OpenTime;

var nonFinished = info.NonFinishedCandle;
Expand All @@ -756,7 +783,7 @@ private void ProcessCandle(SeriesInfo info, CandleMessage candleMsg)
{
nonFinished.State = CandleStates.Finished;
nonFinished.LocalTime = candleMsg.LocalTime;
RaiseNewOutMessage(nonFinished);
RaiseNewOutCandle(info, nonFinished);
info.NonFinishedCandle = null;
}

Expand Down Expand Up @@ -818,6 +845,7 @@ private bool ProcessValue(ISubscriptionIdMessage message)
return new SeriesInfo(allMsg, allMsg)
{
LastTime = allMsg.From,
Count = allMsg.Count,
Transform = CreateTransform(series.Current),
State = SeriesStates.Compress,
};
Expand All @@ -828,9 +856,7 @@ private bool ProcessValue(ISubscriptionIdMessage message)
}

if (allMsg != null)
{
RaiseNewOutMessage(allMsg);
}
}

var transform = series.Transform;
Expand All @@ -851,6 +877,9 @@ private bool ProcessValue(ISubscriptionIdMessage message)
if (series.LastTime != null && series.LastTime.Value > time)
continue;

if (series.Count <= 0)
continue;

series.LastTime = time;

var builder = _candleBuilderProvider.Get(origin.DataType2.MessageType);
Expand Down Expand Up @@ -883,6 +912,17 @@ private void SendCandle(SeriesInfo info, CandleMessage candleMsg)
candleMsg.OriginalTransactionId = info.Id;
candleMsg.SetSubscriptionIds(subscriptionId: info.Id);

RaiseNewOutCandle(info, candleMsg);
}

private void RaiseNewOutCandle(SeriesInfo info, CandleMessage candleMsg)
{
if (candleMsg.State == CandleStates.Finished)
{
if (info.Count != null)
info.Count--;
}

RaiseNewOutMessage(candleMsg);
}

Expand Down
49 changes: 29 additions & 20 deletions Algo/PartialDownloadMessageAdapter.cs
Expand Up @@ -68,7 +68,7 @@ private class DownloadInfo
public MarketDataMessage Origin { get; }

public long CurrTransId { get; private set; }
public bool LastIteration => Origin.To != null && _nextFrom >= Origin.To.Value;
public bool LastIteration => Origin.To != null && _nextFrom >= Origin.To.Value || (_count <= 0);

public bool ReplyReceived { get; set; }
public long? UnsubscribingId { get; set; }
Expand All @@ -81,6 +81,7 @@ private class DownloadInfo
private bool _firstIteration;
private DateTimeOffset _nextFrom;
private readonly DateTimeOffset _to;
private long? _count;

private bool IsStepMax => _step == TimeSpan.MaxValue;

Expand All @@ -101,12 +102,33 @@ public DownloadInfo(PartialDownloadMessageAdapter adapter, MarketDataMessage ori
_currFrom = origin.From ?? _to - (IsStepMax ? TimeSpan.FromDays(1) : step);

_firstIteration = true;

_count = origin.Count;
}

public void TryUpdateNextFrom(DateTimeOffset last)
{
if (_currFrom < last)
_nextFrom = last;

if (_count != default)
_count--;
}

private void Init(MarketDataMessage mdMsg)
{
if (_nextFrom > _to)
_nextFrom = _to;

mdMsg.TransactionId = _adapter.TransactionIdGenerator.GetNextId();
mdMsg.Count = _adapter.GetMaxCount(mdMsg.DataType2);
mdMsg.From = _currFrom;
mdMsg.To = _nextFrom;

if (mdMsg.Count > _count)
mdMsg.Count = _count;

CurrTransId = mdMsg.TransactionId;
}

public MarketDataMessage InitNext()
Expand All @@ -122,39 +144,26 @@ public MarketDataMessage InitNext()

_nextFrom = IsStepMax ? _to : _currFrom + _step;

if (_nextFrom > _to)
_nextFrom = _to;

mdMsg.TransactionId = _adapter.TransactionIdGenerator.GetNextId();
mdMsg.Count = _adapter.GetMaxCount(mdMsg.DataType2);
mdMsg.From = _currFrom;
mdMsg.To = _nextFrom;

CurrTransId = mdMsg.TransactionId;
Init(mdMsg);
}
else
{
_iterationInterval.Sleep();

mdMsg.Skip = null;

if (Origin.To == null && _nextFrom >= _to)
{
// on-line
mdMsg.From = null;
mdMsg.Count = null;
}
else
{
_currFrom = _nextFrom;
_nextFrom += _step;

if (_nextFrom > _to)
_nextFrom = _to;

mdMsg.TransactionId = _adapter.TransactionIdGenerator.GetNextId();
mdMsg.Count = _adapter.GetMaxCount(mdMsg.DataType2);
mdMsg.From = _currFrom;
mdMsg.To = _nextFrom;

CurrTransId = mdMsg.TransactionId;
Init(mdMsg);
}
}

Expand Down Expand Up @@ -210,7 +219,7 @@ protected override bool OnSendInMessage(Message message)
var from = subscriptionMsg.From;
var to = subscriptionMsg.To;

if (from != null || to != null)
if (from != null || to != null || subscriptionMsg.Count != null)
{
var step = InnerAdapter.GetHistoryStepSize(DataType.Transactions, out _);

Expand Down

0 comments on commit 0285d51

Please sign in to comment.