Permalink
Browse files

Add ParseCheckpoint ability to IPersistStreams to get an engine speci…

…fic checkpoint implementation from a string.
  • Loading branch information...
damianh committed Sep 4, 2013
1 parent 87ded16 commit 70e730db1ccaf908363e1ee1e1375be45d3fd0ca
@@ -140,6 +140,11 @@ public virtual IEnumerable<ICommit> GetFrom(string bucketId, DateTime start)
.Select(x => x.ToCommit(_serializer)));
}
+ public ICheckpoint ParseCheckpoint(string checkpointValue)
+ {
+ return IntCheckpoint.Parse(checkpointValue);
+ }
+
public virtual IEnumerable<ICommit> GetFromTo(string bucketId, DateTime start, DateTime end)
{
Logger.Debug(Messages.GettingAllCommitsFromTo, start, end, bucketId);
@@ -114,6 +114,11 @@ public virtual IEnumerable<ICommit> GetFrom(string bucketId, DateTime start)
return QueryCommits<RavenCommitByDate>(x => x.BucketId == bucketId && x.CommitStamp >= start).OrderBy(x => x.CommitStamp);
}
+ public ICheckpoint ParseCheckpoint(string checkpointValue)
+ {
+ throw new NotImplementedException("Engine to be rewritten");
+ }
+
public virtual IEnumerable<ICommit> GetFromTo(string bucketId, DateTime start, DateTime end)
{
Logger.Debug(Messages.GettingAllCommitsFromTo, start, end, bucketId);
@@ -259,4 +259,42 @@ public void should_observe_exception_on_subscription()
_onErrorException.ShouldBe(_subscriberException);
}
}
+
+ public class when_resuming : using_polling_client
+ {
+ private IObserveCommits _observeCommits;
+ private Task<ICommit> _commitObserved;
+
+ protected override void Context()
+ {
+ base.Context();
+ StoreEvents.Advanced.CommitSingle();
+ _observeCommits = PollingClient.ObserveFromStart();
+ _commitObserved = _observeCommits.FirstAsync().ToTask();
+ _observeCommits.Start();
+ _commitObserved.Wait(PollingInterval * 2);
+ _observeCommits.Dispose();
+
+ StoreEvents.Advanced.CommitSingle();
+ string checkpointValue = _commitObserved.Result.Checkpoint.Value;
+ _observeCommits = PollingClient.ObserveFrom(StoreEvents.Advanced.ParseCheckpoint(checkpointValue));
+ }
+
+ protected override void Because()
+ {
+ _observeCommits.Start();
+ _commitObserved = _observeCommits.FirstAsync().ToTask();
+ }
+
+ protected override void Cleanup()
+ {
+ _observeCommits.Dispose();
+ }
+
+ [Fact]
+ public void should_observe_commit()
+ {
+ _commitObserved.Wait(PollingInterval * 2).ShouldBe(true);
+ }
+ }
}
@@ -36,6 +36,11 @@ public void MarkCommitAsDispatched(ICommit commit)
_counters.CountCommitDispatched();
}
+ public ICheckpoint ParseCheckpoint(string checkpointValue)
+ {
+ return IntCheckpoint.Parse(checkpointValue);
+ }
+
public IEnumerable<ICommit> GetFromTo(string bucketId, DateTime start, DateTime end)
{
return _persistence.GetFromTo(bucketId, start, end);
@@ -29,5 +29,10 @@ public int CompareTo(ICheckpoint other)
}
return _value.CompareTo(intCheckpoint.IntValue);
}
+
+ public static ICheckpoint Parse(string checkpointValue)
+ {
+ return new IntCheckpoint(int.Parse(checkpointValue));
+ }
}
}
@@ -48,6 +48,13 @@ public interface IPersistStreams : IDisposable, ICommitEvents, IAccessSnapshots
/// </value>
ICheckpoint StartCheckpoint { get; }
+ /// <summary>
+ /// Tries the parse a checkpoint string value.
+ /// </summary>
+ /// <param name="checkpointValue">The checkpoint value to parse.</param>
+ /// <returns>An <see cref="ICheckpoint"/> instance.</returns>
+ ICheckpoint ParseCheckpoint(string checkpointValue);
+
/// <summary>
/// Gets all commits on or after from the specified starting time and before the specified end time.
/// </summary>
@@ -57,6 +57,11 @@ public IEnumerable<ICommit> GetFrom(ICheckpoint checkpoint)
public ICheckpoint StartCheckpoint { get { return new IntCheckpoint(0); } }
+ public ICheckpoint ParseCheckpoint(string checkpointValue)
+ {
+ return IntCheckpoint.Parse(checkpointValue);
+ }
+
public IEnumerable<ICommit> GetFromTo(string bucketId, DateTime start, DateTime end)
{
ThrowWhenDisposed();
@@ -65,6 +65,11 @@ public IEnumerable<ICommit> GetFrom(string bucketId, DateTime start)
return ExecuteHooks(_original.GetFrom(bucketId, start));
}
+ public ICheckpoint ParseCheckpoint(string checkpointValue)
+ {
+ return _original.ParseCheckpoint(checkpointValue);
+ }
+
public IEnumerable<ICommit> GetFromTo(string bucketId, DateTime start, DateTime end)
{
return ExecuteHooks(_original.GetFromTo(bucketId, start, end));
@@ -110,6 +110,11 @@ public virtual IEnumerable<ICommit> GetFrom(string bucketId, DateTime start)
});
}
+ public ICheckpoint ParseCheckpoint(string checkpointValue)
+ {
+ return IntCheckpoint.Parse(checkpointValue);
+ }
+
public virtual IEnumerable<ICommit> GetFromTo(string bucketId, DateTime start, DateTime end)
{
start = start.AddTicks(-(start.Ticks%TimeSpan.TicksPerSecond)); // Rounds down to the nearest second.

0 comments on commit 70e730d

Please sign in to comment.