Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

isjson added to writes / stream creation

  • Loading branch information...
commit ddb1849b5fd9f40c796146579b13a7a8df786a12 1 parent 3216532
Taras Roshko TarasRoshko authored
Showing with 181 additions and 133 deletions.
  1. +1 −1  src/EventStore/EventStore.ClientAPI/ClientOperations/AppendToStreamOperation.cs
  2. +4 −1 src/EventStore/EventStore.ClientAPI/ClientOperations/CreateStreamOperation.cs
  3. +1 −1  src/EventStore/EventStore.ClientAPI/ClientOperations/TransactionalWriteOperation.cs
  4. +4 −4 src/EventStore/EventStore.ClientAPI/EventStoreConnection.cs
  5. +1 −0  src/EventStore/EventStore.ClientAPI/IEvent.cs
  6. +13 −5 src/EventStore/EventStore.ClientAPI/Messages/ClientMessage.cs
  7. +12 −12 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_backward_should.cs
  8. +15 −15 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_forward_should.cs
  9. +2 −2 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/subscribe_to_all_should.cs
  10. +3 −0  src/EventStore/EventStore.Core.Tests/ClientAPI/TestEvent.cs
  11. +7 −7 src/EventStore/EventStore.Core.Tests/ClientAPI/append_to_stream.cs
  12. +6 −6 src/EventStore/EventStore.Core.Tests/ClientAPI/creating_stream.cs
  13. +4 −4 src/EventStore/EventStore.Core.Tests/ClientAPI/deleting_stream.cs
  14. +8 −8 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_backward_should.cs
  15. +9 −9 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_forward_should.cs
  16. +5 −5 src/EventStore/EventStore.Core.Tests/ClientAPI/subscribe_should.cs
  17. +4 −4 src/EventStore/EventStore.Core.Tests/ClientAPI/transaction.cs
  18. +1 −1  src/EventStore/EventStore.Core.Tests/Services/Replication/CreateStream/when_create_stream_accepts_request.cs
  19. +1 −1  ...tStore/EventStore.Core.Tests/Services/Replication/CreateStream/when_create_stream_completes_successfully.cs
  20. +1 −1  ...tStore/EventStore.Core.Tests/Services/Replication/CreateStream/when_create_stream_gets_already_committed.cs
  21. +1 −1  ....Core.Tests/Services/Replication/CreateStream/when_create_stream_gets_commit_timeout_before_commit_stage.cs
  22. +1 −1  ....Core.Tests/Services/Replication/CreateStream/when_create_stream_gets_commit_timeout_before_final_commit.cs
  23. +1 −1  ...tore.Core.Tests/Services/Replication/CreateStream/when_create_stream_gets_prepare_timeout_after_prepares.cs
  24. +1 −1  ...ore.Core.Tests/Services/Replication/CreateStream/when_create_stream_gets_prepare_timeout_before_prepares.cs
  25. +1 −1  ...ventStore/EventStore.Core.Tests/Services/Replication/CreateStream/when_create_stream_gets_stream_deleted.cs
  26. +1 −1  src/EventStore/EventStore.Core.Tests/Services/Replication/RequestManagerSpecification.cs
  27. +3 −1 src/EventStore/EventStore.Core/Messages/ClientMessage.cs
  28. +3 −0  src/EventStore/EventStore.Core/Messages/StorageMessage.cs
  29. +13 −4 src/EventStore/EventStore.Core/Messages/TcpClientMessageDto.cs
  30. +1 −1  src/EventStore/EventStore.Core/Services/RequestManager/Managers/CreateStreamTwoPhaseRequestManager.cs
  31. +1 −0  src/EventStore/EventStore.Core/Services/Transport/Http/Controllers/AtomControllerDefinitions.cs
  32. +6 −6 src/EventStore/EventStore.Core/Services/Transport/Tcp/ClientTcpDispatcher.cs
  33. +21 −20 src/EventStore/EventStore.Core/Services/VNode/SingleVNodeController.cs
  34. +2 −1  src/EventStore/EventStore.TestClient/Commands/CreateStreamProcessor.cs
  35. +1 −1  src/EventStore/EventStore.TestClient/Commands/DvuBasic/BankAccountBasicProducer.cs
  36. +1 −0  src/EventStore/EventStore.TestClient/Commands/MultiWriteFloodWaiting.cs
  37. +1 −0  src/EventStore/EventStore.TestClient/Commands/MultiWriteProcessor.cs
  38. +4 −0 src/EventStore/EventStore.TestClient/Commands/RunTestScenarios/BankAccountEvent.cs
  39. +3 −3 src/EventStore/EventStore.TestClient/Commands/RunTestScenarios/ScenarioBase.cs
  40. +3 −0  src/EventStore/EventStore.TestClient/Commands/RunTestScenarios/TestEvent.cs
  41. +1 −0  src/EventStore/EventStore.TestClient/Commands/TransactionWriteProcessor.cs
  42. +1 −0  src/EventStore/EventStore.TestClient/Commands/WriteFloodProcessor.cs
  43. +1 −0  src/EventStore/EventStore.TestClient/Commands/WriteFloodWaitingProcessor.cs
  44. +1 −0  src/EventStore/EventStore.TestClient/Commands/WriteLongTermProcessor.cs
  45. +1 −0  src/EventStore/EventStore.TestClient/Commands/WriteProcessor.cs
  46. +1 −1  src/EventStore/EventStore.TestClient/MessageUtil.cs
  47. +4 −2 src/EventStore/Protos/ClientAPI/ClientMessageDtos.proto
2  src/EventStore/EventStore.ClientAPI/ClientOperations/AppendToStreamOperation.cs
View
@@ -87,7 +87,7 @@ public TcpPackage CreateNetworkPackage()
{
lock (_corrIdLock)
{
- var dtos = _events.Select(x => new ClientMessage.ClientEvent(x.EventId.ToByteArray(), x.Type, x.Data, x.Metadata)).ToArray();
+ var dtos = _events.Select(x => new ClientMessage.ClientEvent(x.EventId.ToByteArray(), x.Type, x.IsJson, x.Data, x.Metadata)).ToArray();
var write = new ClientMessage.WriteEvents(_stream, _expectedVersion, dtos, _forward);
return new TcpPackage(TcpCommand.WriteEvents, _correlationId, write.Serialize());
}
5 src/EventStore/EventStore.ClientAPI/ClientOperations/CreateStreamOperation.cs
View
@@ -47,6 +47,7 @@ internal class CreateStreamOperation : IClientOperation
private readonly bool _forward;
private readonly string _stream;
+ private readonly bool _isJson;
private readonly byte[] _metadata;
public Guid CorrelationId
@@ -62,6 +63,7 @@ public Guid CorrelationId
Guid correlationId,
bool forward,
string stream,
+ bool isJson,
byte[] metadata)
{
_source = source;
@@ -69,6 +71,7 @@ public Guid CorrelationId
_correlationId = correlationId;
_forward = forward;
_stream = stream;
+ _isJson = isJson;
_metadata = metadata;
}
@@ -82,7 +85,7 @@ public TcpPackage CreateNetworkPackage()
{
lock (_corrIdLock)
{
- var dto = new ClientMessage.CreateStream(_stream, _metadata, _forward);
+ var dto = new ClientMessage.CreateStream(_stream, _metadata, _forward, _isJson);
return new TcpPackage(TcpCommand.CreateStream, _correlationId, dto.Serialize());
}
}
2  src/EventStore/EventStore.ClientAPI/ClientOperations/TransactionalWriteOperation.cs
View
@@ -87,7 +87,7 @@ public TcpPackage CreateNetworkPackage()
{
lock (_corrIdLock)
{
- var dtos = _events.Select(x => new ClientMessage.ClientEvent(x.EventId.ToByteArray(), x.Type, x.Data, x.Metadata)).ToArray();
+ var dtos = _events.Select(x => new ClientMessage.ClientEvent(x.EventId.ToByteArray(), x.Type, x.IsJson, x.Data, x.Metadata)).ToArray();
var write = new ClientMessage.TransactionWrite(_transactionId, _stream, dtos, _forward);
return new TcpPackage(TcpCommand.TransactionWrite, _corrId, write.Serialize());
}
8 src/EventStore/EventStore.ClientAPI/EventStoreConnection.cs
View
@@ -230,22 +230,22 @@ void IDisposable.Dispose()
Close();
}
- public void CreateStream(string stream, byte[] metadata)
+ public void CreateStream(string stream, bool isJson, byte[] metadata)
{
Ensure.NotNullOrEmpty(stream, "stream");
EnsureActive();
- var task = CreateStreamAsync(stream, metadata);
+ var task = CreateStreamAsync(stream, isJson, metadata);
task.Wait();
}
- public Task CreateStreamAsync(string stream, byte[] metadata)
+ public Task CreateStreamAsync(string stream, bool isJson, byte[] metadata)
{
Ensure.NotNullOrEmpty(stream, "stream");
EnsureActive();
var source = new TaskCompletionSource<object>();
- var operation = new CreateStreamOperation(source, Guid.NewGuid(), _allowForwarding, stream, metadata);
+ var operation = new CreateStreamOperation(source, Guid.NewGuid(), _allowForwarding, stream, isJson, metadata);
EnqueueOperation(operation);
return source.Task;
1  src/EventStore/EventStore.ClientAPI/IEvent.cs
View
@@ -33,6 +33,7 @@ public interface IEvent
{
Guid EventId { get; }
string Type { get; }
+ bool IsJson { get; }
byte[] Data { get; }
byte[] Metadata { get; }
18 src/EventStore/EventStore.ClientAPI/Messages/ClientMessage.cs
View
@@ -44,18 +44,22 @@ public partial class ClientEvent
[ProtoMember(2, IsRequired = false, Name=@"event_type", DataFormat = DataFormat.Default)]
public readonly string EventType;
- [ProtoMember(3, IsRequired = true, Name=@"data", DataFormat = DataFormat.Default)]
+ [ProtoMember(3, IsRequired = true, Name=@"is_json", DataFormat = DataFormat.Default)]
+ public readonly bool IsJson;
+
+ [ProtoMember(4, IsRequired = true, Name=@"data", DataFormat = DataFormat.Default)]
public readonly byte[] Data;
- [ProtoMember(4, IsRequired = false, Name=@"metadata", DataFormat = DataFormat.Default)]
+ [ProtoMember(5, IsRequired = false, Name=@"metadata", DataFormat = DataFormat.Default)]
public readonly byte[] Metadata;
private ClientEvent() {}
- public ClientEvent(byte[] eventId, string eventType, byte[] data, byte[] metadata)
+ public ClientEvent(byte[] eventId, string eventType, bool isJson, byte[] data, byte[] metadata)
{
EventId = eventId;
EventType = eventType;
+ IsJson = isJson;
Data = data;
Metadata = metadata;
}
@@ -132,14 +136,18 @@ public partial class CreateStream
[ProtoMember(3, IsRequired = true, Name=@"allow_forwarding", DataFormat = DataFormat.Default)]
public readonly bool AllowForwarding;
+
+ [ProtoMember(4, IsRequired = true, Name = @"is_json", DataFormat = DataFormat.Default)]
+ public readonly bool IsJson;
private CreateStream() {}
-
- public CreateStream(string eventStreamId, byte[] metadata, bool allowForwarding)
+
+ public CreateStream(string eventStreamId, byte[] metadata, bool allowForwarding, bool isJson)
{
EventStreamId = eventStreamId;
Metadata = metadata;
AllowForwarding = allowForwarding;
+ IsJson = isJson;
}
}
24 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_backward_should.cs
View
@@ -57,7 +57,7 @@ public void return_empty_slice_if_asked_to_read_from_start()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var read = store.ReadAllEventsBackwardAsync(Position.Start, 1);
@@ -94,7 +94,7 @@ public void return_partial_slice_if_not_enough_events()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 20).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -118,7 +118,7 @@ public void return_events_in_reversed_order_compared_to_written()
store.Connect(Node.TcpEndPoint);
var testEvents = Enumerable.Range(0, 5).Select(x => new TestEvent((x + 1).ToString())).ToArray();
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var write = store.AppendToStreamAsync(stream, ExpectedVersion.EmptyStream, testEvents);
@@ -138,10 +138,10 @@ public void read_stream_created_events_as_well()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
+ var create1 = store.CreateStreamAsync(stream + 1, false, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
- var create2 = store.CreateStreamAsync(stream + 2, new byte[0]);
+ var create2 = store.CreateStreamAsync(stream + 2, false, new byte[0]);
Assert.DoesNotThrow(create2.Wait);
var read = store.ReadAllEventsBackwardAsync(Position.End, 2);
@@ -159,7 +159,7 @@ public void be_able_to_read_all_one_by_one_and_return_empty_slice_at_last()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 5).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -188,7 +188,7 @@ public void be_able_to_read_events_slice_at_time()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -217,10 +217,10 @@ public void not_return_events_from_deleted_streams()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
+ var create1 = store.CreateStreamAsync(stream + 1, false, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
- var create2 = store.CreateStreamAsync(stream + 2, new byte[0]);
+ var create2 = store.CreateStreamAsync(stream + 2, false, new byte[0]);
Assert.DoesNotThrow(create2.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -256,10 +256,10 @@ public void not_return_stream_deleted_records()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
+ var create1 = store.CreateStreamAsync(stream + 1, false, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
- var create2 = store.CreateStreamAsync(stream + 2, new byte[0]);
+ var create2 = store.CreateStreamAsync(stream + 2, false, new byte[0]);
Assert.DoesNotThrow(create2.Wait);
var delete1 = store.DeleteStreamAsync(stream + 1, ExpectedVersion.EmptyStream);
@@ -280,7 +280,7 @@ public void return_no_records_if_stream_created_than_deleted()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
30 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/read_all_events_forward_should.cs
View
@@ -60,7 +60,7 @@ public void return_empty_slice_if_asked_to_read_from_end()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 5).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -104,13 +104,13 @@ public void return_events_in_same_order_as_written()
store.Connect(Node.TcpEndPoint);
var testEvents = Enumerable.Range(0, 5).Select(x => new TestEvent((x + 1).ToString())).ToArray();
- var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
+ var create1 = store.CreateStreamAsync(stream + 1, false, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
var write5to1 = store.AppendToStreamAsync(stream + 1, ExpectedVersion.EmptyStream, testEvents);
Assert.DoesNotThrow(write5to1.Wait);
- var create2 = store.CreateStreamAsync(stream + 2, new byte[0]);
+ var create2 = store.CreateStreamAsync(stream + 2, false, new byte[0]);
Assert.DoesNotThrow(create2.Wait);
var write5to2 = store.AppendToStreamAsync(stream + 2, ExpectedVersion.EmptyStream, testEvents);
@@ -131,10 +131,10 @@ public void read_stream_created_events_as_well()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
+ var create1 = store.CreateStreamAsync(stream + 1, false, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
- var create2 = store.CreateStreamAsync(stream + 2, new byte[0]);
+ var create2 = store.CreateStreamAsync(stream + 2, false, new byte[0]);
Assert.DoesNotThrow(create2.Wait);
var read = store.ReadAllEventsForwardAsync(Position.Start, 2);
@@ -152,7 +152,7 @@ public void be_able_to_read_all_one_by_one_and_return_empty_slice_at_last()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 5).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -181,7 +181,7 @@ public void be_able_to_read_events_slice_at_time()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 20).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -210,7 +210,7 @@ public void return_partial_slice_if_not_enough_events()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 20).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -232,10 +232,10 @@ public void not_return_events_from_deleted_streams()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
+ var create1 = store.CreateStreamAsync(stream + 1, false, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
- var create2 = store.CreateStreamAsync(stream + 2, new byte[0]);
+ var create2 = store.CreateStreamAsync(stream + 2, false, new byte[0]);
Assert.DoesNotThrow(create2.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -271,10 +271,10 @@ public void not_return_stream_deleted_records()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
+ var create1 = store.CreateStreamAsync(stream + 1, false, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
- var create2 = store.CreateStreamAsync(stream + 2, new byte[0]);
+ var create2 = store.CreateStreamAsync(stream + 2, false, new byte[0]);
Assert.DoesNotThrow(create2.Wait);
var delete1 = store.DeleteStreamAsync(stream + 1, ExpectedVersion.EmptyStream);
@@ -295,10 +295,10 @@ public void return_no_records_if_stream_created_than_deleted()
using (var store = EventStoreConnection.Create())
{
store.Connect(Node.TcpEndPoint);
- var create1 = store.CreateStreamAsync(stream + 1, new byte[0]);
+ var create1 = store.CreateStreamAsync(stream + 1, false, new byte[0]);
Assert.DoesNotThrow(create1.Wait);
- var create2 = store.CreateStreamAsync(stream + 2, new byte[0]);
+ var create2 = store.CreateStreamAsync(stream + 2, false, new byte[0]);
Assert.DoesNotThrow(create2.Wait);
var delete1 = store.DeleteStreamAsync(stream + 1, ExpectedVersion.EmptyStream);
@@ -326,7 +326,7 @@ public void recover_from_dropped_subscription_state_using_last_known_position()
Position lastKnonwPosition = null;
var dropped = new AutoResetEvent(false);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
store.SubscribeAsync(stream,
4 src/EventStore/EventStore.Core.Tests/ClientAPI/AllEvents/subscribe_to_all_should.cs
View
@@ -67,7 +67,7 @@ public void allow_multiple_subscriptions()
store.SubscribeToAllStreamsAsync(eventAppeared, subscriptionDropped);
store.SubscribeToAllStreamsAsync(eventAppeared, subscriptionDropped);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.That(create.Wait(Timeout));
Assert.That(appeared.Wait(Timeout));
@@ -109,7 +109,7 @@ public void catch_created_and_deleted_events_as_well()
store.SubscribeToAllStreamsAsync(eventAppeared, subscriptionDropped);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.That(create.Wait(Timeout));
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
Assert.That(delete.Wait(Timeout));
3  src/EventStore/EventStore.Core.Tests/ClientAPI/TestEvent.cs
View
@@ -36,6 +36,8 @@ internal class TestEvent : IEvent
public Guid EventId { get; private set; }
public string Type { get; private set; }
+ public bool IsJson { get; private set; }
+
public byte[] Data { get; private set; }
public byte[] Metadata { get; private set; }
@@ -44,6 +46,7 @@ public TestEvent(string data = null, string metadata = null)
EventId = Guid.NewGuid();
Type = GetType().FullName;
+ IsJson = false;
Data = Encoding.UTF8.GetBytes(data ?? EventId.ToString());
Metadata = Encoding.UTF8.GetBytes(metadata ?? "metadata");
}
14 src/EventStore/EventStore.Core.Tests/ClientAPI/append_to_stream.cs
View
@@ -91,7 +91,7 @@ public void should_fail_writing_with_correct_exp_ver_to_deleted_stream()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
@@ -110,7 +110,7 @@ public void should_fail_writing_with_any_exp_ver_to_deleted_stream()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
@@ -129,7 +129,7 @@ public void should_fail_writing_with_invalid_exp_ver_to_deleted_stream()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
@@ -148,7 +148,7 @@ public void should_append_with_correct_exp_ver_to_existing_stream()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var append = store.AppendToStreamAsync(stream, ExpectedVersion.EmptyStream, new[] { new TestEvent() });
@@ -164,7 +164,7 @@ public void should_append_with_any_exp_ver_to_existing_stream()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var append = store.AppendToStreamAsync(stream, ExpectedVersion.Any, new[] { new TestEvent() });
@@ -180,7 +180,7 @@ public void should_fail_appending_with_wrong_exp_ver_to_existing_stream()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var append = store.AppendToStreamAsync(stream, 1, new[] { new TestEvent() });
@@ -196,7 +196,7 @@ public void can_append_multiple_events_at_once()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var events = Enumerable.Range(0, 100).Select(i => new TestEvent(i.ToString(), i.ToString()));
12 src/EventStore/EventStore.Core.Tests/ClientAPI/creating_stream.cs
View
@@ -43,7 +43,7 @@ public void which_does_not_exist_should_be_successfull()
using (var connection = EventStoreConnection.Create())
{
connection.Connect(MiniNode.Instance.TcpEndPoint);
- var create = connection.CreateStreamAsync(stream, new byte[0]);
+ var create = connection.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
}
}
@@ -56,7 +56,7 @@ public void which_supposed_to_be_system_should_succees__but_on_your_own_risk()
using (var connection = EventStoreConnection.Create())
{
connection.Connect(MiniNode.Instance.TcpEndPoint);
- var create = connection.CreateStreamAsync(stream, new byte[0]);
+ var create = connection.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
}
}
@@ -69,10 +69,10 @@ public void which_already_exists_should_fail()
using (var connection = EventStoreConnection.Create())
{
connection.Connect(MiniNode.Instance.TcpEndPoint);
- var initialCreate = connection.CreateStreamAsync(stream, new byte[0]);
+ var initialCreate = connection.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(initialCreate.Wait);
- var secondCreate = connection.CreateStreamAsync(stream, new byte[0]);
+ var secondCreate = connection.CreateStreamAsync(stream, false, new byte[0]);
Assert.Inconclusive();
//Assert.That(() => secondCreate.Wait(), Throws.Exception.TypeOf<AggregateException>().With.InnerException.TypeOf<WrongExpectedVersionException>());
}
@@ -86,13 +86,13 @@ public void which_was_deleted_should_fail()
using (var connection = EventStoreConnection.Create())
{
connection.Connect(MiniNode.Instance.TcpEndPoint);
- var create = connection.CreateStreamAsync(stream, new byte[0]);
+ var create = connection.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = connection.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
Assert.DoesNotThrow(delete.Wait);
- var secondCreate = connection.CreateStreamAsync(stream, new byte[0]);
+ var secondCreate = connection.CreateStreamAsync(stream, false, new byte[0]);
Assert.That(() => secondCreate.Wait(), Throws.Exception.TypeOf<AggregateException>().With.InnerException.TypeOf<StreamDeletedException>());
}
}
8 src/EventStore/EventStore.Core.Tests/ClientAPI/deleting_stream.cs
View
@@ -43,7 +43,7 @@ public void which_already_exists_should_success_when_passed_empty_stream_expecte
using (var connection = EventStoreConnection.Create())
{
connection.Connect(MiniNode.Instance.TcpEndPoint);
- var create = connection.CreateStreamAsync(stream, new byte[0]);
+ var create = connection.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = connection.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
@@ -59,7 +59,7 @@ public void which_already_exists_should_success_when_passed_any_for_expected_ver
using (var connection = EventStoreConnection.Create())
{
connection.Connect(MiniNode.Instance.TcpEndPoint);
- var create = connection.CreateStreamAsync(stream, new byte[0]);
+ var create = connection.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = connection.DeleteStreamAsync(stream, ExpectedVersion.Any);
@@ -75,7 +75,7 @@ public void with_invalid_expected_version_should_fail()
using (var connection = EventStoreConnection.Create())
{
connection.Connect(MiniNode.Instance.TcpEndPoint);
- var create = connection.CreateStreamAsync(stream, new byte[0]);
+ var create = connection.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = connection.DeleteStreamAsync(stream, 1);
@@ -105,7 +105,7 @@ public void which_was_allready_deleted_should_fail()
using (var connection = EventStoreConnection.Create())
{
connection.Connect(MiniNode.Instance.TcpEndPoint);
- var create = connection.CreateStreamAsync(stream, new byte[0]);
+ var create = connection.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = connection.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
16 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_backward_should.cs
View
@@ -71,7 +71,7 @@ public void notify_using_status_code_if_stream_was_deleted()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
Assert.DoesNotThrow(delete.Wait);
@@ -91,7 +91,7 @@ public void return_single_event_when_called_on_empty_stream()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var read = store.ReadEventStreamBackwardAsync(stream, StreamPosition.End, 1);
@@ -109,7 +109,7 @@ public void return_partial_slice_if_no_enough_events_in_stream()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -131,7 +131,7 @@ public void return_events_reversed_compared_to_written()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -153,7 +153,7 @@ public void be_able_to_read_single_event_from_arbitrary_position()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -175,7 +175,7 @@ public void be_able_to_read_first_event()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -197,7 +197,7 @@ public void be_able_to_read_last_event()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -219,7 +219,7 @@ public void be_able_to_read_slice_from_arbitrary_position()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
18 src/EventStore/EventStore.Core.Tests/ClientAPI/read_event_stream_forward_should.cs
View
@@ -81,7 +81,7 @@ public void notify_using_status_code_if_stream_was_deleted()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
Assert.DoesNotThrow(delete.Wait);
@@ -101,7 +101,7 @@ public void return_single_event_when_called_on_empty_stream()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var read = store.ReadEventStreamForwardAsync(stream, 0, 1);
@@ -119,7 +119,7 @@ public void return_empty_slice_when_called_on_empty_stream_starting_at_position_
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var read = store.ReadEventStreamForwardAsync(stream, 1, 1);
@@ -137,7 +137,7 @@ public void return_empty_slice_when_called_on_non_existing_range()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var write10 = store.AppendToStreamAsync(stream, ExpectedVersion.EmptyStream, Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())));
@@ -158,7 +158,7 @@ public void return_partial_slice_if_no_enough_events_in_stream()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var write10 = store.AppendToStreamAsync(stream, ExpectedVersion.EmptyStream, Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())));
@@ -179,7 +179,7 @@ public void return_partial_slice_when_got_int_max_value_as_maxcount()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var write10 = store.AppendToStreamAsync(stream, ExpectedVersion.EmptyStream, Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())));
@@ -200,7 +200,7 @@ public void return_events_in_same_order_as_written()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -222,7 +222,7 @@ public void be_able_to_read_single_event_from_arbitrary_position()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
@@ -244,7 +244,7 @@ public void be_able_to_read_slice_from_arbitrary_position()
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.DoesNotThrow(create.Wait);
var testEvents = Enumerable.Range(0, 10).Select(x => new TestEvent((x + 1).ToString())).ToArray();
10 src/EventStore/EventStore.Core.Tests/ClientAPI/subscribe_should.cs
View
@@ -52,7 +52,7 @@ public void be_able_to_subscribe_to_non_existing_stream_and_then_catch_created_e
store.SubscribeAsync(stream, eventAppeared, subscriptionDropped);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.That(create.Wait(Timeout));
Assert.That(appeared.Wait(Timeout));
@@ -75,7 +75,7 @@ public void allow_multiple_subscriptions_to_same_stream()
store.SubscribeAsync(stream, eventAppeared, subscriptionDropped);
store.SubscribeAsync(stream, eventAppeared, subscriptionDropped);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.That(create.Wait(Timeout));
Assert.That(appeared.Wait(Timeout));
@@ -116,7 +116,7 @@ public void subscribe_to_deleted_stream_as_well_but_never_invoke_user_callbacks(
Action<RecordedEvent, Position> eventAppeared = (x, p) => appeared.Signal();
Action subscriptionDropped = () => dropped.Signal();
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.That(create.Wait(Timeout));
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
Assert.That(delete.Wait(Timeout));
@@ -140,7 +140,7 @@ public void not_call_dropped_if_stream_was_deleted()
Action<RecordedEvent, Position> eventAppeared = (x, p) => appeared.Signal();
Action subscriptionDropped = () => dropped.Signal();
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.That(create.Wait(Timeout));
var subscribe = store.SubscribeAsync(stream, eventAppeared, subscriptionDropped);
@@ -170,7 +170,7 @@ public void catch_created_and_deleted_events_as_well()
store.SubscribeAsync(stream, eventAppeared, subscriptionDropped);
- var create = store.CreateStreamAsync(stream, new byte[0]);
+ var create = store.CreateStreamAsync(stream, false, new byte[0]);
Assert.That(create.Wait(Timeout));
var delete = store.DeleteStreamAsync(stream, ExpectedVersion.EmptyStream);
Assert.That(delete.Wait(Timeout));
8 src/EventStore/EventStore.Core.Tests/ClientAPI/transaction.cs
View
@@ -144,7 +144,7 @@ public void should_commit_when_writing_with_exp_ver_any_even_while_somene_is_wri
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- store.CreateStream(stream, new byte[0]);
+ store.CreateStream(stream, false, new byte[0]);
}
//500 events during transaction
@@ -218,7 +218,7 @@ public void should_fail_to_commit_if_started_with_correct_ver_but_committing_wit
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- store.CreateStream(stream, new byte[0]);
+ store.CreateStream(stream, false, new byte[0]);
}
using (var store = EventStoreConnection.Create())
@@ -246,7 +246,7 @@ public void should_not_fail_to_commit_if_started_with_wrong_ver_but_committing_w
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- store.CreateStream(stream, new byte[0]);
+ store.CreateStream(stream, false, new byte[0]);
}
using (var store = EventStoreConnection.Create())
@@ -274,7 +274,7 @@ public void should_fail_to_commit_if_started_with_correct_ver_but_on_commit_stre
using (var store = EventStoreConnection.Create())
{
store.Connect(MiniNode.Instance.TcpEndPoint);
- store.CreateStream(stream, new byte[0]);
+ store.CreateStream(stream, false, new byte[0]);
}
using (var store = EventStoreConnection.Create())
2  ...EventStore/EventStore.Core.Tests/Services/Replication/CreateStream/when_create_stream_accepts_request.cs
View
@@ -50,7 +50,7 @@ protected override IEnumerable<Message> WithInitialMessages()
protected override Message When()
{
- return new StorageMessage.CreateStreamRequestCreated(CorrelationId, new NoopEnvelope(), "test123", Metadata);
+ return new StorageMessage.CreateStreamRequestCreated(CorrelationId, new NoopEnvelope(), "test123", false, Metadata);
}
[Test]
2  ...ore/EventStore.Core.Tests/Services/Replication/CreateStream/when_create_stream_completes_successfully.cs
View
@@ -46,7 +46,7 @@ protected override TwoPhaseRequestManagerBase OnManager(FakePublisher publisher)
protected override IEnumerable<Message> WithInitialMessages()
{
- yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", Metadata);
+ yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", false, Metadata);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
2  ...ore/EventStore.Core.Tests/Services/Replication/CreateStream/when_create_stream_gets_already_committed.cs
View
@@ -45,7 +45,7 @@ protected override TwoPhaseRequestManagerBase OnManager(FakePublisher publisher)
protected override IEnumerable<Message> WithInitialMessages()
{
- yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", Metadata);
+ yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", false, Metadata);
}
protected override Message When()
2  ...re.Tests/Services/Replication/CreateStream/when_create_stream_gets_commit_timeout_before_commit_stage.cs
View
@@ -45,7 +45,7 @@ protected override TwoPhaseRequestManagerBase OnManager(FakePublisher publisher)
protected override IEnumerable<Message> WithInitialMessages()
{
- yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", Metadata);
+ yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", false, Metadata);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
}
2  ...re.Tests/Services/Replication/CreateStream/when_create_stream_gets_commit_timeout_before_final_commit.cs
View
@@ -46,7 +46,7 @@ protected override TwoPhaseRequestManagerBase OnManager(FakePublisher publisher)
protected override IEnumerable<Message> WithInitialMessages()
{
- yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", Metadata);
+ yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", false, Metadata);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
2  ...e.Core.Tests/Services/Replication/CreateStream/when_create_stream_gets_prepare_timeout_after_prepares.cs
View
@@ -45,7 +45,7 @@ protected override TwoPhaseRequestManagerBase OnManager(FakePublisher publisher)
protected override IEnumerable<Message> WithInitialMessages()
{
- yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", Metadata);
+ yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", false, Metadata);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
yield return new StorageMessage.PrepareAck(CorrelationId, 1, PrepareFlags.SingleWrite);
2  ....Core.Tests/Services/Replication/CreateStream/when_create_stream_gets_prepare_timeout_before_prepares.cs
View
@@ -45,7 +45,7 @@ protected override TwoPhaseRequestManagerBase OnManager(FakePublisher publisher)
protected override IEnumerable<Message> WithInitialMessages()
{
- yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", Metadata);
+ yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", false, Metadata);
}
protected override Message When()
2  ...tStore/EventStore.Core.Tests/Services/Replication/CreateStream/when_create_stream_gets_stream_deleted.cs
View
@@ -45,7 +45,7 @@ protected override TwoPhaseRequestManagerBase OnManager(FakePublisher publisher)
protected override IEnumerable<Message> WithInitialMessages()
{
- yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", Metadata);
+ yield return new StorageMessage.CreateStreamRequestCreated(CorrelationId, Envelope, "test123", false, Metadata);
}
protected override Message When()
2  src/EventStore/EventStore.Core.Tests/Services/Replication/RequestManagerSpecification.cs
View
@@ -52,7 +52,7 @@ public abstract class RequestManagerSpecification
protected Event DummyEvent()
{
- return new Event(Guid.NewGuid(), "test", true, EventData, Metadata);
+ return new Event(Guid.NewGuid(), "test", false, EventData, Metadata);
}
[SetUp]
4 src/EventStore/EventStore.Core/Messages/ClientMessage.cs
View
@@ -103,9 +103,10 @@ public class CreateStream: WriteRequestMessage
public readonly bool AllowForwarding;
public readonly string EventStreamId;
+ public readonly bool IsJson;
public readonly byte[] Metadata;
- public CreateStream(Guid correlationId, IEnvelope envelope, bool allowForwarding, string eventStreamId, byte[] metadata)
+ public CreateStream(Guid correlationId, IEnvelope envelope, bool allowForwarding, string eventStreamId, bool isJson, byte[] metadata)
{
Ensure.NotNull(envelope, "envelope");
Ensure.NotNull(eventStreamId, "eventStreamId");
@@ -114,6 +115,7 @@ public CreateStream(Guid correlationId, IEnvelope envelope, bool allowForwarding
Envelope = envelope;
AllowForwarding = allowForwarding;
EventStreamId = eventStreamId;
+ IsJson = isJson;
Metadata = metadata;
}
}
3  src/EventStore/EventStore.Core/Messages/StorageMessage.cs
View
@@ -261,11 +261,13 @@ public class CreateStreamRequestCreated : Message
public readonly Guid CorrelationId;
public readonly IEnvelope Envelope;
public readonly string EventStreamId;
+ public readonly bool IsJson;
public readonly byte[] Metadata;
public CreateStreamRequestCreated(Guid correlationId,
IEnvelope envelope,
string eventStreamId,
+ bool isJson,
byte[] metadata)
{
Ensure.NotEmptyGuid(correlationId, "correlationId");
@@ -276,6 +278,7 @@ public class CreateStreamRequestCreated : Message
CorrelationId = correlationId;
Envelope = envelope;
EventStreamId = eventStreamId;
+ IsJson = isJson;
Metadata = metadata;
}
}
17 src/EventStore/EventStore.Core/Messages/TcpClientMessageDto.cs
View
@@ -44,18 +44,22 @@ public partial class ClientEvent
[ProtoMember(2, IsRequired = false, Name=@"event_type", DataFormat = DataFormat.Default)]
public readonly string EventType;
- [ProtoMember(3, IsRequired = true, Name=@"data", DataFormat = DataFormat.Default)]
+ [ProtoMember(3, IsRequired = true, Name=@"is_json", DataFormat = DataFormat.Default)]
+ public readonly bool IsJson;
+
+ [ProtoMember(4, IsRequired = true, Name=@"data", DataFormat = DataFormat.Default)]
public readonly byte[] Data;
- [ProtoMember(4, IsRequired = false, Name=@"metadata", DataFormat = DataFormat.Default)]
+ [ProtoMember(5, IsRequired = false, Name=@"metadata", DataFormat = DataFormat.Default)]
public readonly byte[] Metadata;
private ClientEvent() {}
- public ClientEvent(byte[] eventId, string eventType, byte[] data, byte[] metadata)
+ public ClientEvent(byte[] eventId, string eventType, bool isJson, byte[] data, byte[] metadata)
{
EventId = eventId;
EventType = eventType;
+ IsJson = isJson;
Data = data;
Metadata = metadata;
}
@@ -132,14 +136,19 @@ public partial class CreateStream
[ProtoMember(3, IsRequired = true, Name=@"allow_forwarding", DataFormat = DataFormat.Default)]
public readonly bool AllowForwarding;
+
+ [ProtoMember(4, IsRequired = true, Name = @"is_json", DataFormat = DataFormat.Default)]
+ public readonly bool IsJson;
+
private CreateStream() {}
- public CreateStream(string eventStreamId, byte[] metadata, bool allowForwarding)
+ public CreateStream(string eventStreamId, byte[] metadata, bool allowForwarding, bool isJson)
{
EventStreamId = eventStreamId;
Metadata = metadata;
AllowForwarding = allowForwarding;
+ IsJson = isJson;
}
}
2  src/EventStore/EventStore.Core/Services/RequestManager/Managers/CreateStreamTwoPhaseRequestManager.cs
View
@@ -56,7 +56,7 @@ public void Handle(StorageMessage.CreateStreamRequestCreated request)
_publishEnvelope,
request.EventStreamId,
ExpectedVersion.NoStream,
- new[] { new Event(Guid.NewGuid(), "StreamCreated", true, LogRecord.NoData, request.Metadata) },
+ new[] { new Event(Guid.NewGuid(), "StreamCreated", request.IsJson, LogRecord.NoData, request.Metadata) },
allowImplicitStreamCreation: false,
liveUntil: DateTime.UtcNow + Timeouts.PrepareWriteMessageTimeout));
Publisher.Publish(TimerMessage.Schedule.Create(Timeouts.PrepareTimeout,
1  src/EventStore/EventStore.Core/Services/Transport/Http/Controllers/AtomControllerDefinitions.cs
View
@@ -331,6 +331,7 @@ private void CreateStreamBodyRead(HttpEntityManager manager, string body)
envelope,
true,
create.EventStreamId,
+ false,//TODO TR discover
Encoding.UTF8.GetBytes(create.Metadata ?? string.Empty));
Publish(msg);
}
12 src/EventStore/EventStore.Core/Services/Transport/Tcp/ClientTcpDispatcher.cs
View
@@ -114,12 +114,12 @@ private static ClientMessage.CreateStream UnwrapCreateStream(TcpPackage package,
{
var dto = package.Data.Deserialize<TcpClientMessageDto.CreateStream>();
if (dto == null) return null;
- return new ClientMessage.CreateStream(package.CorrelationId, envelope, dto.AllowForwarding, dto.EventStreamId, dto.Metadata);
+ return new ClientMessage.CreateStream(package.CorrelationId, envelope, dto.AllowForwarding, dto.EventStreamId, dto.IsJson, dto.Metadata);
}
private static TcpPackage WrapCreateStream(ClientMessage.CreateStream msg)
{
- var dto = new TcpClientMessageDto.CreateStream(msg.EventStreamId, msg.Metadata, msg.AllowForwarding);
+ var dto = new TcpClientMessageDto.CreateStream(msg.EventStreamId, msg.Metadata, msg.AllowForwarding, msg.IsJson);
return new TcpPackage(TcpCommand.CreateStream, msg.CorrelationId, dto.Serialize());
}
@@ -146,7 +146,7 @@ private static ClientMessage.WriteEvents UnwrapWriteEvents(TcpPackage package, I
dto.AllowForwarding,
dto.EventStreamId,
dto.ExpectedVersion,
- dto.Events.Select(x => new Event(new Guid(x.EventId), x.EventType, false, x.Data, x.Metadata)).ToArray());
+ dto.Events.Select(x => new Event(new Guid(x.EventId), x.EventType, x.IsJson, x.Data, x.Metadata)).ToArray());
}
private static TcpPackage WrapWriteEvents(ClientMessage.WriteEvents msg)
@@ -154,7 +154,7 @@ private static TcpPackage WrapWriteEvents(ClientMessage.WriteEvents msg)
var dto = new TcpClientMessageDto.WriteEvents(
msg.EventStreamId,
msg.ExpectedVersion,
- msg.Events.Select(x => new TcpClientMessageDto.ClientEvent(x.EventId.ToByteArray(), x.EventType, x.Data, x.Metadata)).ToArray(),
+ msg.Events.Select(x => new TcpClientMessageDto.ClientEvent(x.EventId.ToByteArray(), x.EventType, x.IsJson, x.Data, x.Metadata)).ToArray(),
msg.AllowForwarding);
return new TcpPackage(TcpCommand.WriteEvents, msg.CorrelationId, dto.Serialize());
}
@@ -221,14 +221,14 @@ private static ClientMessage.TransactionWrite UnwrapTransactionWrite(TcpPackage
dto.AllowForwarding,
dto.TransactionId,
dto.EventStreamId,
- dto.Events.Select(x => new Event(new Guid(x.EventId), x.EventType, false, x.Data, x.Metadata)).ToArray());
+ dto.Events.Select(x => new Event(new Guid(x.EventId), x.EventType, x.IsJson, x.Data, x.Metadata)).ToArray());
}
private static TcpPackage WrapTransactionWrite(ClientMessage.TransactionWrite msg)
{
var dto = new TcpClientMessageDto.TransactionWrite(msg.TransactionId,
msg.EventStreamId,
- msg.Events.Select(x => new TcpClientMessageDto.ClientEvent(x.EventId.ToByteArray(), x.EventType, x.Data, x.Metadata)).ToArray(),
+ msg.Events.Select(x => new TcpClientMessageDto.ClientEvent(x.EventId.ToByteArray(), x.EventType, x.IsJson, x.Data, x.Metadata)).ToArray(),
msg.AllowForwarding);
return new TcpPackage(TcpCommand.TransactionWrite, msg.CorrelationId, dto.Serialize());
}
41 src/EventStore/EventStore.Core/Services/VNode/SingleVNodeController.cs
View
@@ -187,51 +187,52 @@ private void CheckInitializationDone()
private void Handle(ClientMessage.CreateStream message)
{
_outputBus.Publish(new StorageMessage.CreateStreamRequestCreated(message.CorrelationId,
- message.Envelope,
- message.EventStreamId,
- message.Metadata));
+ message.Envelope,
+ message.EventStreamId,
+ message.IsJson,
+ message.Metadata));
}
private void Handle(ClientMessage.WriteEvents message)
{
_outputBus.Publish(new StorageMessage.WriteRequestCreated(message.CorrelationId,
- message.Envelope,
- message.EventStreamId,
- message.ExpectedVersion,
- message.Events));
+ message.Envelope,
+ message.EventStreamId,
+ message.ExpectedVersion,
+ message.Events));
}
private void Handle(ClientMessage.TransactionStart message)
{
_outputBus.Publish(new StorageMessage.TransactionStartRequestCreated(message.CorrelationId,
- message.Envelope,
- message.EventStreamId,
- message.ExpectedVersion));
+ message.Envelope,
+ message.EventStreamId,
+ message.ExpectedVersion));
}
private void Handle(ClientMessage.TransactionWrite message)
{
_outputBus.Publish(new StorageMessage.TransactionWriteRequestCreated(message.CorrelationId,
- message.Envelope,
- message.TransactionId,
- message.EventStreamId,
- message.Events));
+ message.Envelope,
+ message.TransactionId,
+ message.EventStreamId,
+ message.Events));
}
private void Handle(ClientMessage.TransactionCommit message)
{
_outputBus.Publish(new StorageMessage.TransactionCommitRequestCreated(message.CorrelationId,
- message.Envelope,
- message.TransactionId,
- message.EventStreamId));
+ message.Envelope,
+ message.TransactionId,
+ message.EventStreamId));
}
private void Handle(ClientMessage.DeleteStream message)
{
_outputBus.Publish(new StorageMessage.DeleteStreamRequestCreated(message.CorrelationId,
- message.Envelope,
- message.EventStreamId,
- message.ExpectedVersion));
+ message.Envelope,
+ message.EventStreamId,
+ message.ExpectedVersion));
}
private void Handle(ClientMessage.RequestShutdown message)
3  src/EventStore/EventStore.TestClient/Commands/CreateStreamProcessor.cs
View
@@ -58,7 +58,8 @@ public bool Execute(CommandProcessorContext context, string[] args)
var createStreamDto = new TcpClientMessageDto.CreateStream(
eventStreamId,
Encoding.UTF8.GetBytes(metadata ?? string.Format("{{\"StreamName\": \"{0}\"}}", eventStreamId)),
- true);
+ true,
+ metadata == null);
var package = new TcpPackage(TcpCommand.CreateStream, Guid.NewGuid(), createStreamDto.Serialize());
var sw = new Stopwatch();
2  src/EventStore/EventStore.TestClient/Commands/DvuBasic/BankAccountBasicProducer.cs
View
@@ -50,7 +50,7 @@ public Event Create(int version)
var accountObject = BankAccountEventFactory.CreateAccountObject(version);
var serializedObject = Codec.Json.To(accountObject);
- var @event = new Event(Guid.NewGuid(), accountObject.GetType().FullName, false, Encoding.UTF8.GetBytes(serializedObject), new byte[0]);
+ var @event = new Event(Guid.NewGuid(), accountObject.GetType().FullName, true, Encoding.UTF8.GetBytes(serializedObject), new byte[0]);
return @event;
}
1  src/EventStore/EventStore.TestClient/Commands/MultiWriteFloodWaiting.cs
View
@@ -138,6 +138,7 @@ private void WriteFlood(CommandProcessorContext context, int writeCnt, int clien
new TcpClientMessageDto.ClientEvent(
Guid.NewGuid().ToByteArray(),
"type",
+ false,
Encoding.UTF8.GetBytes(data),
new byte[0])).ToArray(),
true);
1  src/EventStore/EventStore.TestClient/Commands/MultiWriteProcessor.cs
View
@@ -66,6 +66,7 @@ public bool Execute(CommandProcessorContext context, string[] args)
Enumerable.Range(0, writeCount).Select(x =>
new TcpClientMessageDto.ClientEvent(Guid.NewGuid().ToByteArray(),
"type",
+ false,
Encoding.UTF8.GetBytes(data),
new byte[0])).ToArray(),
true);
4 src/EventStore/EventStore.TestClient/Commands/RunTestScenarios/BankAccountEvent.cs
View
@@ -38,6 +38,9 @@ internal class BankAccountEvent : IEvent
{
public Guid EventId { get; private set; }
public string Type { get; private set; }
+
+ public bool IsJson { get; private set; }
+
public byte[] Data { get; private set; }
public byte[] Metadata { get; private set; }
@@ -49,6 +52,7 @@ public BankAccountEvent(object accountObject)
EventId = Guid.NewGuid();
Type = accountObject.GetType().Name;
+ IsJson = true;
Data = Encoding.UTF8.GetBytes(Codec.Json.To(accountObject));
Metadata = Encoding.UTF8.GetBytes(Codec.Json.To(new Dictionary<string, object> { { "IsEmpty", true } }));
}
6 src/EventStore/EventStore.TestClient/Commands/RunTestScenarios/ScenarioBase.cs
View
@@ -444,7 +444,7 @@ private Task WriteSingleEventAtTime(string stream, int events, Func<int, IEvent>
Log.Info("Starting to write {0} events to [{1}]", events, stream);
var store = GetConnection();
int eventVersion = 0;
- var createTask = store.CreateStreamAsync(stream, Encoding.UTF8.GetBytes("metadata"));
+ var createTask = store.CreateStreamAsync(stream, false, Encoding.UTF8.GetBytes("metadata"));
Action<Task> fail = prevTask =>
{
@@ -484,7 +484,7 @@ private Task WriteBucketOfEventsAtTime(string stream, int eventCount, Func<int,
var resSource = new TaskCompletionSource<object>();
var store = GetConnection();
int writtenCount = 0;
- var createTask = store.CreateStreamAsync(stream, Encoding.UTF8.GetBytes("metadata"));
+ var createTask = store.CreateStreamAsync(stream, false, Encoding.UTF8.GetBytes("metadata"));
Action<Task> fail = prevTask =>
{
@@ -534,7 +534,7 @@ private Task WriteEventsInTransactionalWay(string stream, int eventCount, Func<i
int writtenCount = 0;
long transactionId = -1;
- var createTask = store.CreateStreamAsync(stream, Encoding.UTF8.GetBytes("metadata"));
+ var createTask = store.CreateStreamAsync(stream, false, Encoding.UTF8.GetBytes("metadata"));
createTask.ContinueWith(fail, TaskContinuationOptions.OnlyOnFaulted);
Action<Task> writeTransactionEvent = null;
3  src/EventStore/EventStore.TestClient/Commands/RunTestScenarios/TestEvent.cs
View
@@ -37,6 +37,8 @@ internal class TestEvent : IEvent
public Guid EventId { get; private set; }
public string Type { get; private set; }
+ public bool IsJson { get; private set; }
+
public byte[] Data { get; private set; }
public byte[] Metadata { get; private set; }
@@ -49,6 +51,7 @@ public TestEvent(int index)
var body = new string('#', 1 + 17 * subIndex * subIndex);
+ IsJson = false;
Data = Encoding.UTF8.GetBytes(string.Format("{0}-{1}-{2}", index, body.Length, body));
Metadata = new byte[0];
}
1  src/EventStore/EventStore.TestClient/Commands/TransactionWriteProcessor.cs
View
@@ -109,6 +109,7 @@ public bool Execute(CommandProcessorContext context, string[] args)
{
new TcpClientMessageDto.ClientEvent(Guid.NewGuid().ToByteArray() ,
"TakeSomeSpaceEvent",
+ false,
Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()),
Encoding.UTF8.GetBytes(Guid.NewGuid().ToString()))
},
1  src/EventStore/EventStore.TestClient/Commands/WriteFloodProcessor.cs
View
@@ -172,6 +172,7 @@ private void WriteFlood(CommandProcessorContext context, int clientsCnt, long re
{
new TcpClientMessageDto.ClientEvent(Guid.NewGuid().ToByteArray(),
"TakeSomeSpaceEvent",
+ false,
Encoding.UTF8.GetBytes("DATA" + new string('*', 256)),
Encoding.UTF8.GetBytes("METADATA" + new string('$', 100)))
},
1  src/EventStore/EventStore.TestClient/Commands/WriteFloodWaitingProcessor.cs
View
@@ -130,6 +130,7 @@ private void WriteFlood(CommandProcessorContext context, int clientsCnt, int req
{
new TcpClientMessageDto.ClientEvent(Guid.NewGuid().ToByteArray(),
"TakeSomeSpaceEvent",
+ false,
Encoding.UTF8.GetBytes("DATA" + new string('*', 256)),
Encoding.UTF8.GetBytes("METADATA" + new string('$', 100)))
},
1  src/EventStore/EventStore.TestClient/Commands/WriteLongTermProcessor.cs
View
@@ -196,6 +196,7 @@ public bool Execute(CommandProcessorContext context, string[] args)
new TcpClientMessageDto.ClientEvent(
Guid.NewGuid().ToByteArray(),
"TakeSomeSpaceEvent",
+ false,
Encoding.UTF8.GetBytes("DATA" + dataSize.ToString(" 00000 ") + new string('*', dataSize)),
Encoding.UTF8.GetBytes("METADATA" + new string('$', 100)))
},
1  src/EventStore/EventStore.TestClient/Commands/WriteProcessor.cs
View
@@ -67,6 +67,7 @@ public bool Execute(CommandProcessorContext context, string[] args)
{
new TcpClientMessageDto.ClientEvent(Guid.NewGuid().ToByteArray(),
"TakeSomeSpaceEvent",
+ false,
Encoding.UTF8.GetBytes(data),
Encoding.UTF8.GetBytes(metadata ?? string.Empty))
},
2  src/EventStore/EventStore.TestClient/MessageUtil.cs
View
@@ -6,7 +6,7 @@ internal static class ClientEventUtil
{
public static TcpClientMessageDto.ClientEvent FromDataEvent(Core.Data.Event evnt)
{
- return new TcpClientMessageDto.ClientEvent(evnt.EventId.ToByteArray(), evnt.EventType, evnt.Data,
+ return new TcpClientMessageDto.ClientEvent(evnt.EventId.ToByteArray(), evnt.EventType, evnt.IsJson, evnt.Data,
evnt.Metadata);
}
}
6 src/EventStore/Protos/ClientAPI/ClientMessageDtos.proto
View
@@ -8,8 +8,9 @@ message EventLinkPair {
message ClientEvent {
required bytes event_id = 1;
optional string event_type = 2;
- required bytes data = 3;
- optional bytes metadata = 4;
+ required bool is_json = 3;
+ required bytes data = 4;
+ optional bytes metadata = 5;
}
message EventRecord {
@@ -32,6 +33,7 @@ message CreateStream {
required string event_stream_id = 1;
optional bytes metadata = 2;
required bool allow_forwarding = 3;
+ required bool is_json = 4;
}
message CreateStreamCompleted {
Please sign in to comment.
Something went wrong with that request. Please try again.