Permalink
Browse files

Using long instead of int for checkpoint numbers in sql and mongo sto…

…rage.
  • Loading branch information...
1 parent c6cbca1 commit 8635f710beaa13da1ad6e54af2babebcbed1c3a9 @damianh damianh committed Sep 24, 2013
@@ -26,7 +26,7 @@ public static class ExtensionMethods
}
}
- public static BsonDocument ToMongoCommit(this CommitAttempt commit, Func<int> getNextCheckpointNumber, IDocumentSerializer serializer)
+ public static BsonDocument ToMongoCommit(this CommitAttempt commit, Func<long> getNextCheckpointNumber, IDocumentSerializer serializer)
{
int streamRevision = commit.StreamRevision - (commit.Events.Count - 1);
IEnumerable<BsonDocument> events = commit
@@ -80,7 +80,7 @@ public static ICommit ToCommit(this BsonDocument doc, IDocumentSerializer serial
doc[MongoFields.CommitId].AsGuid,
commitSequence,
doc[MongoFields.CommitStamp].ToUniversalTime(),
- new IntCheckpoint(doc[MongoFields.CheckpointNumber].ToInt32()).Value,
+ new LongCheckpoint(doc[MongoFields.CheckpointNumber].ToInt64()).Value,
doc[MongoFields.Headers].AsDictionary<string, object>(),
events);
}
@@ -22,7 +22,7 @@ public class MongoPersistenceEngine : IPersistStreams
private readonly MongoCollectionSettings _countersSettings;
private bool _disposed;
private int _initialized;
- private readonly Func<int> _getNextCheckpointNumber;
+ private readonly Func<long> _getNextCheckpointNumber;
public MongoPersistenceEngine(MongoDatabase store, IDocumentSerializer serializer)
{
@@ -50,9 +50,9 @@ public MongoPersistenceEngine(MongoDatabase store, IDocumentSerializer serialize
_getNextCheckpointNumber = () => TryMongo(() =>
{
IMongoQuery query = Query.EQ("_id", "CheckpointNumber");
- IMongoUpdate update = Update.Inc("seq", 1);
+ IMongoUpdate update = Update.Inc("seq", 1L);
FindAndModifyResult result = Counters.FindAndModify(query, null, update, true, true);
- return result.ModifiedDocument["seq"].ToInt32();
+ return result.ModifiedDocument["seq"].ToInt64();
});
}
@@ -106,7 +106,7 @@ public virtual void Initialize()
IndexOptions.SetName("Unsnapshotted_Index").SetUnique(false));
IMongoQuery query = Query.EQ("_id", MongoFields.CheckpointNumber);
- IMongoUpdate update = Update.Replace(new BsonDocument {{"_id", "CheckpointNumber"}, {"seq", 0}});
+ IMongoUpdate update = Update.Replace(new BsonDocument {{"_id", "CheckpointNumber"}, {"seq", 0L}});
Counters.Update(query, update, UpdateFlags.Upsert, WriteConcern.Acknowledged);
});
}
@@ -142,17 +142,17 @@ public virtual IEnumerable<ICommit> GetFrom(string bucketId, DateTime start)
public IEnumerable<ICommit> GetFrom(string checkpointToken)
{
- var intCheckpoint = IntCheckpoint.Parse(checkpointToken);
+ var intCheckpoint = LongCheckpoint.Parse(checkpointToken);
Logger.Debug(Messages.GettingAllCommitsFromCheckpoint, intCheckpoint.Value);
return TryMongo(() => PersistedCommits
- .Find(Query.GTE(MongoFields.CheckpointNumber, intCheckpoint.IntValue)))
+ .Find(Query.GTE(MongoFields.CheckpointNumber, intCheckpoint.LongValue)))
.SetSortOrder(MongoFields.CheckpointNumber)
.Select(x => x.ToCommit(_serializer));
}
public ICheckpoint GetCheckpoint(string checkpointToken = null)
{
- return IntCheckpoint.Parse(checkpointToken);
+ return LongCheckpoint.Parse(checkpointToken);
}
public virtual IEnumerable<ICommit> GetFromTo(string bucketId, DateTime start, DateTime end)
@@ -293,7 +293,7 @@ public IEnumerable<ICommit> GetFrom(ICheckpoint checkpoint)
throw new NotImplementedException("Engine to be rewritten");
}
- public ICheckpoint StartCheckpoint { get { return new IntCheckpoint(0); } }
+ public ICheckpoint StartCheckpoint { get { return new LongCheckpoint(0); } }
public bool IsDisposed
{
@@ -7,7 +7,7 @@ internal static class CommitHelper
{
public static ICommit Create()
{
- return new Commit(Bucket.Default, "defaultstream", 0, Guid.NewGuid(), 0, DateTime.MinValue, new IntCheckpoint(0).Value, null, null);
+ return new Commit(Bucket.Default, "defaultstream", 0, Guid.NewGuid(), 0, DateTime.MinValue, new LongCheckpoint(0).Value, null, null);
}
}
}
@@ -155,7 +155,7 @@ protected static ICommit CreateCommit(EventMessage eventMessage)
Guid.NewGuid(),
0,
DateTime.MinValue,
- new IntCheckpoint(0).Value,
+ new LongCheckpoint(0).Value,
null,
new[] { eventMessage });
}
@@ -14,7 +14,7 @@ namespace NEventStore
public class when_a_commit_has_been_persisted : SpecificationBase
{
- private readonly ICommit _commit = new Commit(Bucket.Default, Guid.NewGuid().ToString(), 0, Guid.NewGuid(), 0, DateTime.MinValue, new IntCheckpoint(0).Value, null, null);
+ private readonly ICommit _commit = new Commit(Bucket.Default, Guid.NewGuid().ToString(), 0, Guid.NewGuid(), 0, DateTime.MinValue, new LongCheckpoint(0).Value, null, null);
private readonly Mock<IScheduleDispatches> _dispatcher = new Mock<IScheduleDispatches>();
private DispatchSchedulerPipelineHook _dispatchSchedulerHook;
@@ -41,7 +41,7 @@ public class when_the_hook_has_no_dispatcher_configured : SpecificationBase
{
private readonly DispatchSchedulerPipelineHook _dispatchSchedulerHook = new DispatchSchedulerPipelineHook();
- private readonly ICommit _commit = new Commit(Bucket.Default, Guid.NewGuid().ToString(), 0, Guid.NewGuid(), 0, DateTime.MinValue, new IntCheckpoint(0).Value, null, null);
+ private readonly ICommit _commit = new Commit(Bucket.Default, Guid.NewGuid().ToString(), 0, Guid.NewGuid(), 0, DateTime.MinValue, new LongCheckpoint(0).Value, null, null);
private Exception _thrown;
@@ -61,7 +61,7 @@ public class when_a_commit_is_selected : SpecificationBase
{
private readonly DispatchSchedulerPipelineHook _dispatchSchedulerHook = new DispatchSchedulerPipelineHook();
- private readonly ICommit _commit = new Commit(Bucket.Default, Guid.NewGuid().ToString(), 0, Guid.NewGuid(), 0, DateTime.MinValue, new IntCheckpoint(0).Value, null, null);
+ private readonly ICommit _commit = new Commit(Bucket.Default, Guid.NewGuid().ToString(), 0, Guid.NewGuid(), 0, DateTime.MinValue, new LongCheckpoint(0).Value, null, null);
private ICommit _selected;
@@ -244,7 +244,7 @@ private ICommit BuildCommit(Guid streamId, Guid commitId)
private ICommit BuildCommit(string streamId, Guid commitId)
{
- return new Commit(Bucket.Default, streamId, 0, commitId, 0, SystemTime.UtcNow, new IntCheckpoint(0).Value, null, null);
+ return new Commit(Bucket.Default, streamId, 0, commitId, 0, SystemTime.UtcNow, new LongCheckpoint(0).Value, null, null);
}
}
@@ -261,7 +261,7 @@ protected CommitAttempt BuildCommitStub(Guid commitId)
protected ICommit BuildCommitStub(int streamRevision, int commitSequence)
{
List<EventMessage> events = new[] {new EventMessage()}.ToList();
- return new Commit(Bucket.Default, _streamId, streamRevision, Guid.NewGuid(), commitSequence, SystemTime.UtcNow, new IntCheckpoint(0).Value, null, events);
+ return new Commit(Bucket.Default, _streamId, streamRevision, Guid.NewGuid(), commitSequence, SystemTime.UtcNow, new LongCheckpoint(0).Value, null, events);
}
protected CommitAttempt BuildCommitAttemptStub(int streamRevision, int commitSequence)
@@ -273,7 +273,7 @@ protected CommitAttempt BuildCommitAttemptStub(int streamRevision, int commitSeq
protected ICommit BuildCommitStub(Guid commitId, int streamRevision, int commitSequence)
{
List<EventMessage> events = new[] {new EventMessage()}.ToList();
- return new Commit(Bucket.Default, _streamId, streamRevision, commitId, commitSequence, SystemTime.UtcNow, new IntCheckpoint(0).Value, null, events);
+ return new Commit(Bucket.Default, _streamId, streamRevision, commitId, commitSequence, SystemTime.UtcNow, new LongCheckpoint(0).Value, null, events);
}
}
}
@@ -391,7 +391,7 @@ protected override void Context()
attempt.CommitId,
attempt.CommitSequence,
attempt.CommitStamp,
- new IntCheckpoint(0).Value,
+ new LongCheckpoint(0).Value,
attempt.Headers,
attempt.Events);
return _populatedCommit;
@@ -515,7 +515,7 @@ protected CommitAttempt BuildCommitAttemptStub(Guid commitId)
protected ICommit BuildCommitStub(int streamRevision, int commitSequence)
{
List<EventMessage> events = new[] {new EventMessage()}.ToList();
- return new Commit(Bucket.Default, streamId, streamRevision, Guid.NewGuid(), commitSequence, SystemTime.UtcNow, new IntCheckpoint(0).Value, null, events);
+ return new Commit(Bucket.Default, streamId, streamRevision, Guid.NewGuid(), commitSequence, SystemTime.UtcNow, new LongCheckpoint(0).Value, null, events);
}
protected CommitAttempt BuildCommitAttemptStub(int streamRevision, int commitSequence)
@@ -527,7 +527,7 @@ protected CommitAttempt BuildCommitAttemptStub(int streamRevision, int commitSeq
protected ICommit BuildCommitStub(Guid commitId, int streamRevision, int commitSequence)
{
List<EventMessage> events = new[] {new EventMessage()}.ToList();
- return new Commit(Bucket.Default, streamId, streamRevision, commitId, commitSequence, SystemTime.UtcNow, new IntCheckpoint(0).Value, null, events);
+ return new Commit(Bucket.Default, streamId, streamRevision, commitId, commitSequence, SystemTime.UtcNow, new LongCheckpoint(0).Value, null, events);
}
}
}
@@ -263,7 +263,7 @@ protected override void Context()
attempt.CommitId,
attempt.CommitSequence,
attempt.CommitStamp,
- new IntCheckpoint(0).Value,
+ new LongCheckpoint(0).Value,
attempt.Headers,
attempt.Events));
Stream.Add(_uncommitted);
@@ -570,7 +570,7 @@ protected ICommit BuildCommitStub(int revision, int sequence, int eventCount)
events.Add(new EventMessage());
}
- return new Commit(Bucket.Default, StreamId, revision, Guid.NewGuid(), sequence, SystemTime.UtcNow, new IntCheckpoint(0).Value, null, events);
+ return new Commit(Bucket.Default, StreamId, revision, Guid.NewGuid(), sequence, SystemTime.UtcNow, new LongCheckpoint(0).Value, null, events);
}
}
@@ -36,7 +36,7 @@ public class when_reading_the_all_events_from_date : using_underlying_persistenc
protected override void Context()
{
date = DateTime.Now;
- _commit = new Commit(Bucket.Default, streamId, 1, Guid.NewGuid(), 1, DateTime.Now, new IntCheckpoint(0).Value, null, null);
+ _commit = new Commit(Bucket.Default, streamId, 1, Guid.NewGuid(), 1, DateTime.Now, new LongCheckpoint(0).Value, null, null);
hook1 = new Mock<IPipelineHook>();
hook1.Setup(h => h.Select(_commit)).Returns(_commit);
@@ -81,7 +81,7 @@ protected override void Context()
{
start = DateTime.Now;
end = DateTime.Now;
- _commit = new Commit(Bucket.Default, streamId, 1, Guid.NewGuid(), 1, DateTime.Now, new IntCheckpoint(0).Value, null, null);
+ _commit = new Commit(Bucket.Default, streamId, 1, Guid.NewGuid(), 1, DateTime.Now, new LongCheckpoint(0).Value, null, null);
hook1 = new Mock<IPipelineHook>();
hook1.Setup(h => h.Select(_commit)).Returns(_commit);
@@ -38,7 +38,7 @@ public void MarkCommitAsDispatched(ICommit commit)
public ICheckpoint ParseCheckpoint(string checkpointValue)
{
- return IntCheckpoint.Parse(checkpointValue);
+ return LongCheckpoint.Parse(checkpointValue);
}
public ICheckpoint GetCheckpoint(string checkpointToken = null)
@@ -3,36 +3,36 @@ namespace NEventStore
using System;
using System.Globalization;
- public sealed class IntCheckpoint : ICheckpoint
+ public sealed class LongCheckpoint : ICheckpoint
{
- private readonly int _value;
+ private readonly long _value;
- public IntCheckpoint(int value)
+ public LongCheckpoint(long value)
{
_value = value;
}
public string Value { get { return _value.ToString(CultureInfo.InvariantCulture); }}
- public int IntValue { get { return _value; } }
+ public long LongValue { get { return _value; } }
public int CompareTo(ICheckpoint other)
{
if (other == null)
{
return 1;
}
- var intCheckpoint = other as IntCheckpoint;
+ var intCheckpoint = other as LongCheckpoint;
if (intCheckpoint == null)
{
throw new InvalidOperationException("Can only compare with {0} but compared with {1}".FormatWith());
}
- return _value.CompareTo(intCheckpoint.IntValue);
+ return _value.CompareTo(intCheckpoint.LongValue);
}
- public static IntCheckpoint Parse(string checkpointValue)
+ public static LongCheckpoint Parse(string checkpointValue)
{
- return string.IsNullOrWhiteSpace(checkpointValue) ? new IntCheckpoint(-1) : new IntCheckpoint(int.Parse(checkpointValue));
+ return string.IsNullOrWhiteSpace(checkpointValue) ? new LongCheckpoint(-1) : new LongCheckpoint(long.Parse(checkpointValue));
}
}
}
@@ -87,7 +87,7 @@
<Compile Include="ICheckpoint.cs" />
<Compile Include="ICommit.cs" />
<Compile Include="ImmutableCollection.cs" />
- <Compile Include="IntCheckpoint.cs" />
+ <Compile Include="LongCheckpoint.cs" />
<Compile Include="ISnapshot.cs" />
<Compile Include="LoggingWireupExtensions.cs" />
<Compile Include="Logging\ConsoleWindowLogger.cs" />
@@ -47,7 +47,7 @@ public IEnumerable<ICommit> GetFrom(string bucketId, DateTime start)
public IEnumerable<ICommit> GetFrom(string checkpointToken)
{
Logger.Debug(Resources.GettingAllCommitsFromCheckpoint, checkpointToken);
- ICheckpoint checkpoint = IntCheckpoint.Parse(checkpointToken);
+ ICheckpoint checkpoint = LongCheckpoint.Parse(checkpointToken);
return _buckets
.Values
.SelectMany(b => b.GetCommits())
@@ -58,7 +58,7 @@ public IEnumerable<ICommit> GetFrom(string checkpointToken)
public ICheckpoint GetCheckpoint(string checkpointToken = null)
{
- return IntCheckpoint.Parse(checkpointToken);
+ return LongCheckpoint.Parse(checkpointToken);
}
public IEnumerable<ICommit> GetFromTo(string bucketId, DateTime start, DateTime end)
@@ -72,7 +72,7 @@ public ICommit Commit(CommitAttempt attempt)
{
ThrowWhenDisposed();
Logger.Debug(Resources.AttemptingToCommit, attempt.CommitId, attempt.StreamId, attempt.CommitSequence);
- return this[attempt.BucketId].Commit(attempt, new IntCheckpoint(Interlocked.Increment(ref _checkpoint)));
+ return this[attempt.BucketId].Commit(attempt, new LongCheckpoint(Interlocked.Increment(ref _checkpoint)));
}
public IEnumerable<ICommit> GetUndispatchedCommits()
@@ -32,7 +32,7 @@ public static ICommit GetCommit(this IDataRecord record, ISerialize serializer)
record[CommitIdIndex].ToGuid(),
record[CommitSequenceIndex].ToInt(),
record[CommitStampIndex].ToDateTime(),
- new IntCheckpoint(record[CheckpointIndex].ToInt()).Value,
+ new LongCheckpoint(record[CheckpointIndex].ToLong()).Value,
headers,
events);
}
@@ -28,6 +28,15 @@ public static int ToInt(this object value)
: value is decimal ? (int) (decimal) value : Convert.ToInt32(value);
}
+ public static long ToLong(this object value)
+ {
+ return value is long
+ ? (long) value
+ : value is int
+ ? (int) value
+ : value is decimal ? (long) (decimal) value : Convert.ToInt32(value);
+ }
+
public static DateTime ToDateTime(this object value)
{
value = value is decimal ? (long) (decimal) value : value;
@@ -113,7 +113,7 @@ public virtual IEnumerable<ICommit> GetFrom(string bucketId, DateTime start)
public ICheckpoint GetCheckpoint(string checkpointToken)
{
- return string.IsNullOrWhiteSpace(checkpointToken) ? new IntCheckpoint(-1) : IntCheckpoint.Parse(checkpointToken);
+ return string.IsNullOrWhiteSpace(checkpointToken) ? new LongCheckpoint(-1) : LongCheckpoint.Parse(checkpointToken);
}
public virtual IEnumerable<ICommit> GetFromTo(string bucketId, DateTime start, DateTime end)
@@ -304,7 +304,7 @@ private ICommit PersistCommit(CommitAttempt attempt)
cmd.AddParameter(_dialect.CommitStamp, attempt.CommitStamp);
cmd.AddParameter(_dialect.Headers, _serializer.Serialize(attempt.Headers));
cmd.AddParameter(_dialect.Payload, _serializer.Serialize(attempt.Events.ToList()));
- var checkpointNumber = cmd.ExecuteScalar(_dialect.PersistCommit).ToInt();
+ var checkpointNumber = cmd.ExecuteScalar(_dialect.PersistCommit).ToLong();
return new Commit(
attempt.BucketId,
attempt.StreamId,

0 comments on commit 8635f71

Please sign in to comment.