Skip to content

Commit

Permalink
Merge pull request #1471 from EventStore/persistent-subscription-config
Browse files Browse the repository at this point in the history
Convert timeout of 0 to no timeout when loading config on startup.
  • Loading branch information
shaan1337 committed Oct 20, 2017
2 parents 116e027 + 75442ae commit 53e6ca7
Showing 1 changed file with 11 additions and 35 deletions.
Expand Up @@ -171,8 +171,6 @@ public void Handle(ClientMessage.CreatePersistentSubscription message)
return;
}

var msgTimeout = message.MessageTimeoutMilliseconds == 0 ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(message.MessageTimeoutMilliseconds);
var checkPointAfter = message.CheckPointAfterMilliseconds == 0 ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(message.CheckPointAfterMilliseconds);
CreateSubscriptionGroup(message.EventStreamId,
message.GroupName,
message.ResolveLinkTos,
Expand All @@ -182,12 +180,12 @@ public void Handle(ClientMessage.CreatePersistentSubscription message)
message.LiveBufferSize,
message.BufferSize,
message.ReadBatchSize,
checkPointAfter,
ToTimeout(message.CheckPointAfterMilliseconds),
message.MinCheckPointCount,
message.MaxCheckPointCount,
message.MaxSubscriberCount,
message.NamedConsumerStrategy,
msgTimeout
ToTimeout(message.MessageTimeoutMilliseconds)
);
Log.Debug("New persistent subscription {0}.", message.GroupName);
_config.Updated = DateTime.Now;
Expand Down Expand Up @@ -247,8 +245,6 @@ public void Handle(ClientMessage.UpdatePersistentSubscription message)

RemoveSubscription(message.EventStreamId, message.GroupName);
RemoveSubscriptionConfig(message.User.Identity.Name, message.EventStreamId, message.GroupName);
var msgTimeout = message.MessageTimeoutMilliseconds == 0 ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(message.MessageTimeoutMilliseconds);
var checkPointAfter = message.CheckPointAfterMilliseconds == 0 ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(message.CheckPointAfterMilliseconds);
CreateSubscriptionGroup(message.EventStreamId,
message.GroupName,
message.ResolveLinkTos,
Expand All @@ -258,12 +254,12 @@ public void Handle(ClientMessage.UpdatePersistentSubscription message)
message.LiveBufferSize,
message.BufferSize,
message.ReadBatchSize,
checkPointAfter,
ToTimeout(message.CheckPointAfterMilliseconds),
message.MinCheckPointCount,
message.MaxCheckPointCount,
message.MaxSubscriberCount,
message.NamedConsumerStrategy,
msgTimeout
ToTimeout(message.MessageTimeoutMilliseconds)
);
_config.Updated = DateTime.Now;
_config.UpdatedBy = message.User.Identity.Name;
Expand Down Expand Up @@ -668,12 +664,12 @@ private void HandleLoadCompleted(Action continueWith, ClientMessage.ReadStreamEv
entry.LiveBufferSize,
entry.HistoryBufferSize,
entry.ReadBatchSize,
TimeSpan.FromMilliseconds(entry.CheckPointAfter),
ToTimeout(entry.CheckPointAfter),
entry.MinCheckPointCount,
entry.MaxCheckPointCount,
entry.MaxSubscriberCount,
entry.NamedConsumerStrategy,
TimeSpan.FromMilliseconds(entry.MessageTimeout));
ToTimeout(entry.MessageTimeout));
}
continueWith();
}
Expand Down Expand Up @@ -716,31 +712,6 @@ private void HandleSaveConfigurationCompleted(Action continueWith, ClientMessage
}
}

public void LoadSubscriptionsFromConfig()
{
Log.Debug("Loading subscriptions from persisted config.");
InitToEmpty();
if(_config.Entries == null) throw new Exception("Subscription Entries should never be null.");
foreach (var sub in _config.Entries)
{
CreateSubscriptionGroup(sub.Stream,
sub.Group,
sub.ResolveLinkTos,
sub.StartFrom,
sub.ExtraStatistics,
sub.MaxRetryCount,
sub.LiveBufferSize,
sub.HistoryBufferSize,
sub.ReadBatchSize,
TimeSpan.FromMilliseconds(sub.CheckPointAfter),
sub.MinCheckPointCount,
sub.MaxCheckPointCount,
sub.MaxSubscriberCount,
sub.NamedConsumerStrategy,
TimeSpan.FromMilliseconds(sub.MessageTimeout));
}
}

public void Handle(MonitoringMessage.GetPersistentSubscriptionStats message)
{
if (!_started)
Expand Down Expand Up @@ -835,5 +806,10 @@ private void WakeSubscriptions()
subscription.NotifyClockTick(now);
}
}

private TimeSpan ToTimeout(int milliseconds)
{
return milliseconds == 0 ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(milliseconds);
}
}
}

0 comments on commit 53e6ca7

Please sign in to comment.