Permalink
Browse files

Postgres custom PersistCommit statement.

  • Loading branch information...
1 parent 6b2aa10 commit e111a4751119ad82aefbc6891ab64b8373ec410b @damianh damianh committed Sep 3, 2013
View
2 src/NEventStore/Persistence/SqlPersistence/SqlDialects/PostgreSqlDialect.cs
@@ -14,7 +14,7 @@ public override string MarkCommitAsDispatched
public override string PersistCommit
{
- get { throw new System.NotImplementedException(); }
+ get { return PostgreSqlStatements.PersistCommits; }
}
public override string GetUndispatchedCommits
View
13 src/NEventStore/Persistence/SqlPersistence/SqlDialects/PostgreSqlStatements.Designer.cs
@@ -81,5 +81,18 @@ internal class PostgreSqlStatements {
return ResourceManager.GetString("InitializeStorage", resourceCulture);
}
}
+
+ /// <summary>
+ /// Looks up a localized string similar to INSERT
+ /// INTO Commits
+ /// ( BucketId, StreamId, StreamIdOriginal, CommitId, CommitSequence, StreamRevision, Items, CommitStamp, Headers, Payload )
+ ///VALUES (@BucketId, @StreamId, @StreamIdOriginal, @CommitId, @CommitSequence, @StreamRevision, @Items, @CommitStamp, @Headers, @Payload)
+ ///RETURNING CheckpointNumber;.
+ /// </summary>
+ internal static string PersistCommits {
+ get {
+ return ResourceManager.GetString("PersistCommits", resourceCulture);
+ }
+ }
}
}
View
7 src/NEventStore/Persistence/SqlPersistence/SqlDialects/PostgreSqlStatements.resx
@@ -148,4 +148,11 @@ CREATE TABLE Snapshots
CONSTRAINT PK_Snapshots PRIMARY KEY (BucketId, StreamId, StreamRevision)
);</value>
</data>
+ <data name="PersistCommits" xml:space="preserve">
+ <value>INSERT
+ INTO Commits
+ ( BucketId, StreamId, StreamIdOriginal, CommitId, CommitSequence, StreamRevision, Items, CommitStamp, Headers, Payload )
+VALUES (@BucketId, @StreamId, @StreamIdOriginal, @CommitId, @CommitSequence, @StreamRevision, @Items, @CommitStamp, @Headers, @Payload)
+RETURNING CheckpointNumber;</value>
+ </data>
</root>
View
5 src/NEventStore/Persistence/SqlPersistence/SqlPersistenceEngine.cs
@@ -256,10 +256,13 @@ public void DeleteStream(string bucketId, string streamId)
public IEnumerable<ICommit> GetFrom(ICheckpoint checkpoint)
{
Logger.Debug(Messages.GettingAllCommitsFromCheckpoint, checkpoint);
+ //Assuming all SqlEngines are using IntCheckpoint, this will break when they don't.
+ //Postgres requires the CheckpointNumber query paramater to be of type int, the others where happy with a string...
+ var intCheckpoint = (IntCheckpoint) checkpoint;
return ExecuteQuery(query =>
{
string statement = _dialect.GetCommitsFromCheckpoint;
- query.AddParameter(_dialect.CheckpointNumber, checkpoint.Value);
+ query.AddParameter(_dialect.CheckpointNumber, intCheckpoint.IntValue);
return query.ExecutePagedQuery(statement, (q, r) => q.SetParameter(_dialect.CheckpointNumber, r.CheckpointNumber())).Select(x => x.GetCommit(_serializer));
});
}

0 comments on commit e111a47

Please sign in to comment.