Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

EventStore source code

  • Loading branch information...
commit 1b4764bc13fa6cba6b7e2c22ded6a8c61c91bad8 0 parents
@ysw ysw authored
Showing with 25,453 additions and 0 deletions.
  1. +9 −0 LICENSE
  2. +11 −0 src/.gitignore
  3. +143 −0 src/EventStore/EventStore.BufferManagement.Tests/BufferManagerTests.cs
  4. +149 −0 src/EventStore/EventStore.BufferManagement.Tests/BufferPoolStreamTests.cs
  5. +555 −0 src/EventStore/EventStore.BufferManagement.Tests/BufferPoolTests.cs
  6. +73 −0 src/EventStore/EventStore.BufferManagement.Tests/EventStore.BufferManagement.Tests.csproj
  7. +66 −0 src/EventStore/EventStore.BufferManagement.Tests/Properties/AssemblyInfo.cs
  8. +267 −0 src/EventStore/EventStore.BufferManagement/BufferManager.cs
  9. +351 −0 src/EventStore/EventStore.BufferManagement/BufferPool.cs
  10. +94 −0 src/EventStore/EventStore.BufferManagement/BufferPoolStream.cs
  11. +73 −0 src/EventStore/EventStore.BufferManagement/EventStore.BufferManagement.csproj
  12. +67 −0 src/EventStore/EventStore.BufferManagement/Properties/AssemblyInfo.cs
  13. +39 −0 src/EventStore/EventStore.BufferManagement/UnableToAllocateBufferException.cs
  14. +39 −0 src/EventStore/EventStore.BufferManagement/UnableToCreateMemoryException.cs
  15. +115 −0 src/EventStore/EventStore.Client/Commands/CreateStreamProcessor.cs
  16. +115 −0 src/EventStore/EventStore.Client/Commands/DeleteProcessor.cs
  17. +180 −0 src/EventStore/EventStore.Client/Commands/DvuAdvanced/Coordinator.cs
  18. +197 −0 src/EventStore/EventStore.Client/Commands/DvuAdvanced/DvuAdvancedProcessor.cs
  19. +34 −0 src/EventStore/EventStore.Client/Commands/DvuAdvanced/IProducer.cs
  20. +108 −0 src/EventStore/EventStore.Client/Commands/DvuAdvanced/VerificationEvent.cs
  21. +62 −0 src/EventStore/EventStore.Client/Commands/DvuAdvanced/Workers/BankAccountProducer.cs
  22. +54 −0 src/EventStore/EventStore.Client/Commands/DvuAdvanced/Workers/TestProducer.cs
  23. +132 −0 src/EventStore/EventStore.Client/Commands/DvuAdvanced/Workers/VerificationWorker.cs
  24. +280 −0 src/EventStore/EventStore.Client/Commands/DvuAdvanced/Workers/Worker.cs
  25. +51 −0 src/EventStore/EventStore.Client/Commands/DvuAdvanced/Workers/WorkerItem.cs
  26. +135 −0 src/EventStore/EventStore.Client/Commands/DvuAdvanced/Workers/WriteWorker.cs
  27. +299 −0 src/EventStore/EventStore.Client/Commands/DvuBasic/BankAccountBasicProducer.cs
  28. +77 −0 src/EventStore/EventStore.Client/Commands/DvuBasic/BankAccountEventFactory.cs
  29. +476 −0 src/EventStore/EventStore.Client/Commands/DvuBasic/DvuBasicProcessor.cs
  30. +39 −0 src/EventStore/EventStore.Client/Commands/DvuBasic/IBasicProducer.cs
  31. +110 −0 src/EventStore/EventStore.Client/Commands/DvuBasic/Status.cs
  32. +44 −0 src/EventStore/EventStore.Client/Commands/DvuBasic/StreamNamesGenerator.cs
  33. +46 −0 src/EventStore/EventStore.Client/Commands/ExitProcessor.cs
  34. +193 −0 src/EventStore/EventStore.Client/Commands/MultiWriteFloodWaiting.cs
  35. +125 −0 src/EventStore/EventStore.Client/Commands/MultiWriteProcessor.cs
  36. +162 −0 src/EventStore/EventStore.Client/Commands/PingFloodHttpProcessor.cs
  37. +153 −0 src/EventStore/EventStore.Client/Commands/PingFloodProcessor.cs
  38. +139 −0 src/EventStore/EventStore.Client/Commands/PingFloodWaitingProcessor.cs
  39. +210 −0 src/EventStore/EventStore.Client/Commands/PingHttpLongTermProcessor.cs
  40. +72 −0 src/EventStore/EventStore.Client/Commands/PingProcessor.cs
  41. +152 −0 src/EventStore/EventStore.Client/Commands/ReadFloodHttpProcessor.cs
  42. +173 −0 src/EventStore/EventStore.Client/Commands/ReadFloodProcessor.cs
  43. +77 −0 src/EventStore/EventStore.Client/Commands/ReadHttpProcessor.cs
  44. +123 −0 src/EventStore/EventStore.Client/Commands/ReadProcessor.cs
  45. +88 −0 src/EventStore/EventStore.Client/Commands/SubscribeToStreamProcessor.cs
  46. +206 −0 src/EventStore/EventStore.Client/Commands/TransactionWriteProcessor.cs
  47. +51 −0 src/EventStore/EventStore.Client/Commands/UsageProcessor.cs
  48. +183 −0 src/EventStore/EventStore.Client/Commands/WriteFloodHttpProcessor.cs
  49. +235 −0 src/EventStore/EventStore.Client/Commands/WriteFloodProcessor.cs
  50. +183 −0 src/EventStore/EventStore.Client/Commands/WriteFloodWaitingHttpProcessor.cs
  51. +184 −0 src/EventStore/EventStore.Client/Commands/WriteFloodWaitingProcessor.cs
  52. +98 −0 src/EventStore/EventStore.Client/Commands/WriteHttpProcessor.cs
  53. +260 −0 src/EventStore/EventStore.Client/Commands/WriteLongTermHttpProcessor.cs
  54. +255 −0 src/EventStore/EventStore.Client/Commands/WriteLongTermProcessor.cs
  55. +126 −0 src/EventStore/EventStore.Client/Commands/WriteProcessor.cs
  56. +167 −0 src/EventStore/EventStore.Client/EventStore.Client.csproj
  57. +63 −0 src/EventStore/EventStore.Client/Properties/AssemblyInfo.cs
  58. +142 −0 src/EventStore/EventStore.ClientAPI/Commands/CreateStreamCompletionWrapper.cs
  59. +131 −0 src/EventStore/EventStore.ClientAPI/Commands/DeleteTaskCompletionWrapper.cs
  60. +44 −0 src/EventStore/EventStore.ClientAPI/Commands/ITaskCompletionWrapper.cs
  61. +23 −0 src/EventStore/EventStore.ClientAPI/Commands/ProcessResultStatus.cs
  62. +107 −0 src/EventStore/EventStore.ClientAPI/Commands/ReadFromBeginningTaskCompletionWrapper.cs
  63. +130 −0 src/EventStore/EventStore.ClientAPI/Commands/WriteTaskCompletionWrapper.cs
  64. +439 −0 src/EventStore/EventStore.ClientAPI/Common/ConcurrentCollections/ConcurrentQueue.cs
  65. +439 −0 src/EventStore/EventStore.ClientAPI/Common/ConcurrentCollections/ConcurrentStack.cs
  66. +67 −0 src/EventStore/EventStore.ClientAPI/Common/Log/LogManager.cs
  67. +16 −0 src/EventStore/EventStore.ClientAPI/Common/Log/Logger.cs
  68. +117 −0 src/EventStore/EventStore.ClientAPI/Common/Utils/BytesFormatter.cs
  69. +82 −0 src/EventStore/EventStore.ClientAPI/Common/Utils/Ensure.cs
  70. +71 −0 src/EventStore/EventStore.ClientAPI/Configure.cs
  71. +58 −0 src/EventStore/EventStore.ClientAPI/Data/Event.cs
  72. +184 −0 src/EventStore/EventStore.ClientAPI/Data/EventRecord.cs
  73. +36 −0 src/EventStore/EventStore.ClientAPI/Data/ExpectedVersion.cs
  74. +140 −0 src/EventStore/EventStore.ClientAPI/EventStore.ClientAPI.csproj
  75. +116 −0 src/EventStore/EventStore.ClientAPI/EventStore.cs
  76. +461 −0 src/EventStore/EventStore.ClientAPI/EventStoreConnection.cs
  77. +52 −0 src/EventStore/EventStore.ClientAPI/EventStream.cs
  78. +40 −0 src/EventStore/EventStore.ClientAPI/Exceptions/ConnectionClosingException.cs
  79. +43 −0 src/EventStore/EventStore.ClientAPI/Exceptions/FailedConnectionException.cs
  80. +40 −0 src/EventStore/EventStore.ClientAPI/Exceptions/UnknownPackageReturned.cs
  81. +718 −0 src/EventStore/EventStore.ClientAPI/Messages/ClientMessageDto.cs
  82. +40 −0 src/EventStore/EventStore.ClientAPI/Messages/OperationErrorCode.cs
  83. +63 −0 src/EventStore/EventStore.ClientAPI/Properties/AssemblyInfo.cs
  84. +69 −0 src/EventStore/EventStore.ClientAPI/Results.cs
  85. +44 −0 src/EventStore/EventStore.ClientAPI/Services.Storage.ReadIndex/ReadResult.cs
  86. +79 −0 src/EventStore/EventStore.ClientAPI/Services.Transport.Tcp/TcpCommand.cs
  87. +78 −0 src/EventStore/EventStore.ClientAPI/Services.Transport.Tcp/TcpPackage.cs
  88. +122 −0 src/EventStore/EventStore.ClientAPI/TcpConnector.cs
  89. +152 −0 src/EventStore/EventStore.ClientAPI/TransactionLog.LogRecords/CommitLogRecord.cs
  90. +130 −0 src/EventStore/EventStore.ClientAPI/TransactionLog.LogRecords/LogRecord.cs
  91. +35 −0 src/EventStore/EventStore.ClientAPI/TransactionLog.LogRecords/LogRecordType.cs
  92. +252 −0 src/EventStore/EventStore.ClientAPI/TransactionLog.LogRecords/PrepareLogRecord.cs
  93. +56 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/IMonitoredTcpConnection.cs
  94. +45 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/ITcpConnection.cs
  95. +135 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/LengthPrefixFramer.cs
  96. +64 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/ProtobufExtensions.cs
  97. +86 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/SocketArgsPool.cs
  98. +155 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/TcpClientConnector.cs
  99. +44 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/TcpConfiguratin.cs
  100. +476 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/TcpConnection.cs
  101. +317 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/TcpConnectionBase.cs
  102. +324 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/TcpConnectionMonitor.cs
  103. +109 −0 src/EventStore/EventStore.ClientAPI/Transport.Tcp/TcpTypedConnection.cs
  104. +274 −0 src/EventStore/EventStore.ClientAPI/Usage.cs
  105. +61 −0 src/EventStore/EventStore.Common/CommandLine/EventStoreCmdLineOptionsBase.cs
  106. +2,131 −0 src/EventStore/EventStore.Common/CommandLine/lib/CommandLine.cs
  107. +1,374 −0 src/EventStore/EventStore.Common/CommandLine/lib/CommandLineText.cs
  108. +439 −0 src/EventStore/EventStore.Common/ConcurrentCollections/ConcurrentQueue.cs
  109. +439 −0 src/EventStore/EventStore.Common/ConcurrentCollections/ConcurrentStack.cs
  110. +35 −0 src/EventStore/EventStore.Common/Configuration/Constants.cs
  111. +87 −0 src/EventStore/EventStore.Common/EventStore.Common.csproj
  112. +40 −0 src/EventStore/EventStore.Common/Exceptions/ApplicationInitializationException.cs
  113. +58 −0 src/EventStore/EventStore.Common/Log/ILogger.cs
  114. +143 −0 src/EventStore/EventStore.Common/Log/LazyLogger.cs
  115. +115 −0 src/EventStore/EventStore.Common/Log/LogManager.cs
  116. +145 −0 src/EventStore/EventStore.Common/Log/NLogger.cs
  117. +63 −0 src/EventStore/EventStore.Common/Properties/AssemblyInfo.cs
  118. +43 −0 src/EventStore/EventStore.Common/Settings/ApplicationSettings.cs
  119. +57 −0 src/EventStore/EventStore.Common/Settings/VNodeSettings.cs
  120. +68 −0 src/EventStore/EventStore.Common/Utils/Application.cs
  121. +118 −0 src/EventStore/EventStore.Common/Utils/BytesFormatter.cs
  122. +82 −0 src/EventStore/EventStore.Common/Utils/Ensure.cs
  123. +52 −0 src/EventStore/EventStore.Common/Utils/EnumerableExtensions.cs
  124. +51 −0 src/EventStore/EventStore.Common/Utils/Helper.cs
  125. +51 −0 src/EventStore/EventStore.Common/Utils/IPEndpointExtensions.cs
  126. +53 −0 src/EventStore/EventStore.Common/Utils/OS.cs
  127. +39 −0 src/EventStore/EventStore.Common/Utils/Runtime.cs
  128. +49 −0 src/EventStore/EventStore.Core.Tests/Bus/Helpers/BusTestBase.cs
  129. +140 −0 src/EventStore/EventStore.Core.Tests/Bus/Helpers/BusTestHandlers.cs
  130. +48 −0 src/EventStore/EventStore.Core.Tests/Bus/Helpers/BusTestMessages.cs
  131. +57 −0 src/EventStore/EventStore.Core.Tests/Bus/Helpers/TimeoutHelper.cs
  132. +96 −0 src/EventStore/EventStore.Core.Tests/Bus/QueuedHandler/Helpers/Consumers.cs
  133. +49 −0 src/EventStore/EventStore.Core.Tests/Bus/QueuedHandler/Helpers/Messages.cs
  134. +79 −0 src/EventStore/EventStore.Core.Tests/Bus/QueuedHandler/Helpers/QueuedHandlerTestBase.cs
  135. +49 −0 src/EventStore/EventStore.Core.Tests/Bus/QueuedHandler/queued_handler_should.cs
  136. +160 −0 src/EventStore/EventStore.Core.Tests/Bus/QueuedHandler/when_publishing.cs
  137. +100 −0 src/EventStore/EventStore.Core.Tests/Bus/QueuedHandler/when_publishing_before_starting.cs
  138. +68 −0 src/EventStore/EventStore.Core.Tests/Bus/QueuedHandler/when_starting.cs
  139. +111 −0 src/EventStore/EventStore.Core.Tests/Bus/QueuedHandler/when_stopping.cs
  140. +165 −0 src/EventStore/EventStore.Core.Tests/Bus/when_publishing.cs
  141. +193 −0 src/EventStore/EventStore.Core.Tests/Bus/when_subscribing.cs
  142. +198 −0 src/EventStore/EventStore.Core.Tests/Bus/when_unsubscribing.cs
  143. +71 −0 src/EventStore/EventStore.Core.Tests/Common/CollectionsExtensions.cs
  144. +49 −0 src/EventStore/EventStore.Core.Tests/Common/HelperExtensions.cs
  145. +162 −0 src/EventStore/EventStore.Core.Tests/DataStructures/pairing_heap_should.cs
  146. +242 −0 src/EventStore/EventStore.Core.Tests/EventStore.Core.Tests.csproj
  147. +188 −0 src/EventStore/EventStore.Core.Tests/EventStore.Core.Tests.csproj.orig
  148. +118 −0 src/EventStore/EventStore.Core.Tests/Fakes/FakeLogger.cs
  149. +48 −0 src/EventStore/EventStore.Core.Tests/Fakes/FakePublisher.cs
  150. +40 −0 src/EventStore/EventStore.Core.Tests/Fakes/NoopPublisher.cs
  151. +51 −0 src/EventStore/EventStore.Core.Tests/Index/FakeFilenameProvider.cs
  152. +68 −0 src/EventStore/EventStore.Core.Tests/Index/IndexEntryTests.cs
  153. +664 −0 src/EventStore/EventStore.Core.Tests/Index/MemTableTests.cs
  154. +67 −0 src/EventStore/EventStore.Core.Tests/Index/PTableReadScenario.cs
  155. +54 −0 src/EventStore/EventStore.Core.Tests/Index/ReverseComparerTests.cs
  156. +117 −0 ...EventStore.Core.Tests/Index/adding_four_items_to_empty_index_map_with_four_tables_per_level_causes_merge.cs
  157. +114 −0 ...tore.Core.Tests/Index/adding_four_items_to_empty_index_map_with_two_tables_per_level_causes_double_merge.cs
  158. +110 −0 src/EventStore/EventStore.Core.Tests/Index/adding_item_to_empty_index_map.cs
  159. +139 −0 ....Core.Tests/Index/adding_sixteen_items_to_empty_index_map_with_four_tables_per_level_causes_double_merge.cs
  160. +109 −0 ...e/EventStore.Core.Tests/Index/adding_two_items_to_empty_index_map_with_two_tables_per_level_causes_merge.cs
  161. +69 −0 src/EventStore/EventStore.Core.Tests/Index/create_index_map_from_non_existing_file.cs
  162. +69 −0 src/EventStore/EventStore.Core.Tests/Index/destroying_ptable.cs
  163. +163 −0 src/EventStore/EventStore.Core.Tests/Index/index_map_should.cs
  164. +239 −0 src/EventStore/EventStore.Core.Tests/Index/index_map_should_detect_corruption.cs
  165. +183 −0 src/EventStore/EventStore.Core.Tests/Index/ptable_range_query_tests.cs
Sorry, we could not display the entire diff because too many files (885) changed.
9 LICENSE
@@ -0,0 +1,9 @@
+Copyright (c) 2012, Event Store LLP
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
+Neither the name of Event Store LLP nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
11 src/.gitignore
@@ -0,0 +1,11 @@
+/Logs
+/Data
+/v8/
+ipch/
+*.o
+*.ii
+*.s
+*.sdf
+*.opensdf
+*.DotSettings.user
+*.DotSettings.personal
143 src/EventStore/EventStore.BufferManagement.Tests/BufferManagerTests.cs
@@ -0,0 +1,143 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+using System;
+using NUnit.Framework;
+
+namespace EventStore.BufferManagement.Tests
+{
+ [TestFixture]
+ public class when_creating_a_buffer_manager
+ {
+ [Test, ExpectedException(typeof(ArgumentException))]
+ public void a_zero_chunk_size_causes_an_argumentexception()
+ {
+ BufferManager manager = new BufferManager(1024, 0, 1024);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentException))]
+ public void a_negative_chunk_size_causes_an_argumentexception()
+ {
+ BufferManager manager = new BufferManager(200, -1, 200);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentException))]
+ public void a_negative_chunks_per_segment_causes_an_argumentexception()
+ {
+ BufferManager manager = new BufferManager(-1, 1024, 8);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentException))]
+ public void a_zero_chunks_per_segment_causes_an_argumentexception()
+ {
+ BufferManager manager = new BufferManager(0, 1024, 8);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentException))]
+ public void a_negative_number_of_segments_causes_an_argumentexception()
+ {
+ BufferManager manager = new BufferManager(1024, 1024, -1);
+ }
+
+ [Test]
+ public void can_create_a_manager_with_zero_inital_segments()
+ {
+ BufferManager manager = new BufferManager(1024, 1024, 0);
+ }
+ }
+
+ [TestFixture]
+ public class when_checking_out_a_buffer
+ {
+ [Test]
+ public void should_return_a_valid_buffer_when_available()
+ {
+ BufferManager manager = new BufferManager(1, 1000, 1);
+ ArraySegment<byte> buffer = manager.CheckOut();
+ Assert.AreEqual(1000, buffer.Count);
+ }
+
+ [Test]
+ public void should_decrement_available_buffers()
+ {
+ BufferManager manager = new BufferManager(1, 1000, 1);
+ ArraySegment<byte> buffer = manager.CheckOut();
+ Assert.AreEqual(0, manager.AvailableBuffers);
+ }
+
+ [Test]
+ public void should_create_a_segment_if_none_are_availabke()
+ {
+ BufferManager manager = new BufferManager(10, 1000, 0);
+ ArraySegment<byte> buffer = manager.CheckOut();
+ Assert.AreEqual(9, manager.AvailableBuffers);
+ }
+
+ [Test, ExpectedException(typeof(UnableToCreateMemoryException))]
+ public void should_throw_an_unabletocreatememoryexception_if_acquiring_memory_is_disabled_and_out_of_memory()
+ {
+ BufferManager manager = new BufferManager(1, 1000, 1, false);
+ ArraySegment<byte> buffer = manager.CheckOut();
+ //should be none left, boom
+ manager.CheckOut();
+ }
+ }
+
+ [TestFixture]
+ public class when_checking_in_a_buffer
+ {
+ [Test]
+ public void should_accept_a_checked_out_buffer()
+ {
+ BufferManager manager = new BufferManager(10, 1000, 0);
+ manager.CheckIn(manager.CheckOut());
+ }
+
+ [Test]
+ public void should_increment_available_buffers()
+ {
+ BufferManager manager = new BufferManager(10, 1000, 0);
+ manager.CheckIn(manager.CheckOut());
+ Assert.AreEqual(10, manager.AvailableBuffers);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentNullException))]
+ public void should_throw_argumentnullexception_if_null_buffer()
+ {
+ BufferManager manager = new BufferManager(10, 1000, 0);
+ manager.CheckIn(null);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentException))]
+ public void should_throw_argumentexception_if_buffer_wrong_size()
+ {
+ BufferManager manager = new BufferManager(10, 1000, 0);
+ byte[] data = new byte[10000];
+ manager.CheckIn(new ArraySegment<byte>(data));
+ }
+ }
+}
149 src/EventStore/EventStore.BufferManagement.Tests/BufferPoolStreamTests.cs
@@ -0,0 +1,149 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+using System;
+using System.IO;
+using NUnit.Framework;
+
+namespace EventStore.BufferManagement.Tests
+{
+ public class has_buffer_pool_fixture : has_buffer_manager_fixture
+ {
+ protected BufferPool BufferPool;
+ [SetUp]
+ public override void Setup()
+ {
+ base.Setup();
+ BufferPool = new BufferPool(10, BufferManager);
+ }
+ }
+
+ [TestFixture]
+ public class when_insantiating_a_buffer_pool_stream : has_buffer_pool_fixture
+ {
+ [Test, ExpectedException(typeof(ArgumentNullException))]
+ public void a_null_buffer_pool_throws_an_argumentnullexception()
+ {
+ BufferPoolStream stream = new BufferPoolStream(null);
+ }
+
+ [Test]
+ public void the_internal_buffer_pool_is_set()
+ {
+ BufferPoolStream stream = new BufferPoolStream(BufferPool);
+ Assert.AreEqual(BufferPool, stream.BufferPool);
+ }
+
+ }
+
+ [TestFixture]
+ public class when_reading_from_the_stream : has_buffer_pool_fixture
+ {
+ [Test]
+ public void position_is_incremented()
+ {
+ BufferPoolStream stream = new BufferPoolStream(BufferPool);
+ stream.Write(new byte[500], 0, 500);
+ stream.Seek(0, SeekOrigin.Begin);
+ Assert.AreEqual(0, stream.Position);
+ int read = stream.Read(new byte[50], 0, 50);
+ Assert.AreEqual(50, stream.Position);
+ }
+
+ [Test]
+ public void a_read_past_the_end_of_the_stream_returns_zero()
+ {
+ BufferPoolStream stream = new BufferPoolStream(BufferPool);
+ stream.Write(new byte[500], 0, 500);
+ stream.Position = 0;
+ int read = stream.Read(new byte[500], 0, 500);
+ Assert.AreEqual(500, read);
+ read = stream.Read(new byte[500], 0, 500);
+ Assert.AreEqual(0, read);
+ }
+ }
+
+ [TestFixture]
+ public class when_writing_to_the_stream : has_buffer_pool_fixture
+ {
+ [Test]
+ public void position_is_incremented()
+ {
+ BufferPoolStream stream = new BufferPoolStream(BufferPool);
+ stream.Write(new byte[500], 0, 500);
+ Assert.AreEqual(500, stream.Position);
+ }
+ }
+
+ [TestFixture]
+ public class when_seeking_in_the_stream : has_buffer_pool_fixture
+ {
+ [Test]
+ public void from_begin_sets_relative_to_beginning()
+ {
+ BufferPoolStream stream = new BufferPoolStream(BufferPool);
+ stream.Write(new byte[500], 0, 500);
+ stream.Seek(22, SeekOrigin.Begin);
+ Assert.AreEqual(22, stream.Position);
+ }
+
+ [Test]
+ public void from_end_sets_relative_to_end()
+ {
+ BufferPoolStream stream = new BufferPoolStream(BufferPool);
+ stream.Write(new byte[500], 0, 500);
+ stream.Seek(-100, SeekOrigin.End);
+ Assert.AreEqual(400, stream.Position);
+ }
+
+ [Test]
+ public void from_current_sets_relative_to_current()
+ {
+ BufferPoolStream stream = new BufferPoolStream(BufferPool);
+ stream.Write(new byte[500], 0, 500);
+ stream.Seek(-2, SeekOrigin.Current);
+ stream.Seek(1, SeekOrigin.Current);
+ Assert.AreEqual(499, stream.Position);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void a_negative_position_throws_an_argumentexception()
+ {
+ BufferPoolStream stream = new BufferPoolStream(BufferPool);
+ stream.Seek(-1, SeekOrigin.Begin);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void seeking_past_end_of_stream_throws_an_argumentexception()
+ {
+ BufferPoolStream stream = new BufferPoolStream(BufferPool);
+ stream.Write(new byte[500], 0, 500);
+ stream.Seek(501, SeekOrigin.Begin);
+ }
+ }
+
+}
555 src/EventStore/EventStore.BufferManagement.Tests/BufferPoolTests.cs
@@ -0,0 +1,555 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+using System;
+using System.Collections.Generic;
+using NUnit.Framework;
+
+namespace EventStore.BufferManagement.Tests
+{
+ public class has_buffer_manager_fixture
+ {
+ protected BufferManager BufferManager;
+
+ [SetUp]
+ public virtual void Setup()
+ {
+ BufferManager = new BufferManager(128, 1024, 1);
+ }
+ }
+
+ [TestFixture]
+ public class when_instantiating_a_bufferpool : has_buffer_manager_fixture
+ {
+ [Test, ExpectedException(typeof(ArgumentException))]
+ public void a_negative_initial_buffers_throws_an_argumentexception()
+ {
+ BufferPool pool = new BufferPool(-1, BufferManager);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentNullException))]
+ public void a_null_buffer_manager_throws_an_argumentnullexception()
+ {
+ BufferPool pool = new BufferPool(12, null);
+ }
+
+ [Test]
+ public void an_empty_buffer_has_a_length_of_zero()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ Assert.AreEqual(0, pool.Length);
+ }
+
+ [Test]
+ public void the_requested_buffers_should_be_removed_from_the_buffer_manager()
+ {
+ int InitialBuffers = BufferManager.AvailableBuffers;
+ BufferPool pool = new BufferPool(10, BufferManager);
+ Assert.AreEqual(InitialBuffers - 10, BufferManager.AvailableBuffers);
+ }
+ }
+
+ [TestFixture]
+ public class when_changing_data_in_a_bufferpool_via_indexer : has_buffer_manager_fixture
+ {
+ [Test, ExpectedException(typeof(ArgumentException))]
+ public void an_index_under_zero_throws_an_argument_exception()
+ {
+ BufferPool pool = new BufferPool(12, BufferManager);
+ pool[-1] = 4;
+ }
+
+ [Test]
+ public void data_that_has_been_set_can_read()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool[3] = 5;
+ Assert.AreEqual(5, pool[3]);
+ }
+
+ [Test]
+ public void length_is_updated_when_index_higher_than_count_set()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ Assert.AreEqual(0, pool.Length);
+ pool[3] = 5;
+ Assert.AreEqual(4, pool.Length);
+ }
+
+ [Test]
+ public void a_write_will_automatically_grow_the_buffer_pool()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ int initialCapacity = pool.Capacity;
+ pool[initialCapacity + 14] = 5;
+ Assert.AreEqual(initialCapacity * 2, pool.Capacity);
+ }
+
+ [Test]
+ public void a_write_past_end_will_check_out_a_buffer_from_the_buffer_pool()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ int initial = BufferManager.AvailableBuffers;
+ pool[pool.Capacity + 14] = 5;
+ Assert.AreEqual(initial - 1, BufferManager.AvailableBuffers);
+ }
+ }
+
+ [TestFixture]
+ public class when_converting_to_a_byte_array : has_buffer_manager_fixture
+ {
+ [Test]
+ public void the_byte_array_should_be_the_same_length_as_the_pool_with_data()
+ {
+ BufferPool pool = new BufferPool(5, BufferManager);
+ for (int i = 0; i < 500; i++)
+ {
+ pool[i] = 12;
+ }
+ Assert.AreEqual(500, pool.ToByteArray().Length);
+ }
+
+ [Test]
+ public void the_byte_array_should_have_the_same_data_as_the_pool_with_multiple_buffers()
+ {
+ BufferPool pool = new BufferPool(5, BufferManager);
+ for (int i = 0; i < 5000; i++)
+ {
+ pool[i] = (byte)(i % 255);
+ }
+ byte[] data = pool.ToByteArray();
+ for (int i = 0; i < 5000; i++)
+ {
+ Assert.AreEqual((byte)(i % 255), data[i]);
+ }
+ }
+
+ [Test]
+ public void the_byte_array_should_have_the_same_data_as_the_pool_with_a_single_buffer()
+ {
+ BufferPool pool = new BufferPool(5, BufferManager);
+ for (int i = 0; i < 5; i++)
+ {
+ pool[i] = (byte)(i % 255);
+ }
+ byte[] data = pool.ToByteArray();
+ for (int i = 0; i < 5; i++)
+ {
+ Assert.AreEqual((byte)(i % 255), data[i]);
+ }
+ }
+
+
+ [Test]
+ public void an_empty_pool_should_return_an_empty_array()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ byte[] arr = pool.ToByteArray();
+ Assert.AreEqual(0, arr.Length);
+ }
+ }
+
+ [TestFixture]
+ public class when_converting_to_an_effective_IEnumerable_of_arraysegments : has_buffer_manager_fixture
+ {
+ [Test]
+ public void empty_returns_no_results()
+ {
+ BufferPool pool = new BufferPool(10, BufferManager);
+ foreach (ArraySegment<byte> effectiveBuffer in pool.EffectiveBuffers)
+ {
+ Assert.Fail("should not have been buffers");
+ }
+ }
+
+ [Test]
+ public void a_single_partial_segment_can_be_returned()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ for (byte i = 0; i < 10; i++)
+ {
+ pool[i] = i;
+ }
+ List<ArraySegment<byte>> buffers = new List<ArraySegment<byte>>(pool.EffectiveBuffers);
+ Assert.IsTrue(buffers.Count == 1);
+ for (byte i = 0; i < 10; i++)
+ {
+ Assert.IsTrue(buffers[0].Array[buffers[0].Offset + i] == i);
+ }
+ }
+
+ [Test]
+ public void multiple_segments_can_be_returned()
+ {
+ BufferManager manager = new BufferManager(3, 1000, 1);
+ BufferPool pool = new BufferPool(10, manager);
+ for (int i = 0; i < 2500; i++)
+ {
+ pool[i] = (byte)(i % 255);
+ }
+ List<ArraySegment<byte>> buffers = new List<ArraySegment<byte>>(pool.EffectiveBuffers);
+ Assert.IsTrue(buffers.Count == 3);
+ Assert.IsTrue(buffers[0].Count == 1000);
+ Assert.IsTrue(buffers[1].Count == 1000);
+ Assert.IsTrue(buffers[2].Count == 500);
+ }
+ }
+
+ [TestFixture]
+ public class when_reading_data_in_a_bufferpool_via_indexer : has_buffer_manager_fixture
+ {
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void if_the_index_is_past_the_length_an_argumentoutofrangeexception_is_thrown()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ byte b = pool[3];
+ }
+ }
+
+
+ [TestFixture]
+ public class when_disposing_a_buffer_pool : has_buffer_manager_fixture
+ {
+ [Test]
+ public void buffers_are_released_back_to_the_buffer_pool()
+ {
+ int initial = BufferManager.AvailableBuffers;
+ using (BufferPool pool = new BufferPool(20, BufferManager))
+ {
+ //sanity check (make sure they are actually gone)
+ Assert.AreEqual(initial - 20, BufferManager.AvailableBuffers);
+ }
+ Assert.AreEqual(initial, BufferManager.AvailableBuffers);
+ }
+ }
+
+ [TestFixture]
+ public class when_reading_multiple_bytes : has_buffer_manager_fixture
+ {
+ [Test, ExpectedException(typeof(ArgumentNullException))]
+ public void a_null_read_buffer_throws_an_argumentnullexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.ReadFrom(0, null, 0, 0);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void an_offset_larger_than_the_buffer_throws_an_argumentoutofrangeexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.ReadFrom(0, new byte[5], 8, 3);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void a_count_larger_than_the_buffer_throws_an_argumentoutofrangeexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.ReadFrom(0, new byte[5], 3, 5);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void a_negative_count_throws_an_argumentoutofrangeexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.ReadFrom(0, new byte[5], 3, -1);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void a_negative_offset_throws_an_argumentoutofrangeexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.ReadFrom(0, new byte[5], -1, 1);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void count_and_offset_together_lerger_than_buffer_throws_an_argumentoutofrangeexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.ReadFrom(0, new byte[5], 4, 2);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void a_count_and_offset_together_are_longer_than_pool_length_throws_an_argumentoutofrangeexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool[0] = 12;
+ pool[1] = 13;
+ pool.ReadFrom(3, new byte[5], 0, 5);
+ }
+
+ [Test]
+ public void can_read_within_a_single_buffer_with_no_offset()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ for (int i = 0; i < 255; i++)
+ {
+ pool[i] = (byte)i;
+ }
+
+ byte[] buffer = new byte[255];
+ pool.ReadFrom(0, buffer, 0, 255);
+ for (int i = 0; i < 255; i++)
+ {
+ Assert.AreEqual((byte)i, buffer[i]);
+ }
+ }
+
+ [Test]
+ public void can_read_from_multiple_buffers()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ for (int i = 0; i < 5000; i++)
+ {
+ pool[i] = (byte)(i % 255);
+ }
+
+ byte[] buffer = new byte[5000];
+ pool.ReadFrom(0, buffer, 0, 5000);
+ for (int i = 0; i < 5000; i++)
+ {
+ Assert.AreEqual((byte)(i % 255), buffer[i]);
+ }
+ }
+
+ [Test]
+ public void can_read_using_an_offset()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ for (int i = 5; i < 260; i++)
+ {
+ pool[i] = (byte)(i - 5);
+ }
+
+ byte[] buffer = new byte[255];
+ pool.ReadFrom(5, buffer, 0, 255);
+ for (int i = 0; i < 255; i++)
+ {
+ Assert.AreEqual((byte)i, buffer[i]);
+ }
+ }
+ }
+
+ [TestFixture]
+ public class when_writing_multiple_bytes : has_buffer_manager_fixture
+ {
+ [Test, ExpectedException(typeof(ArgumentNullException))]
+ public void a_null_byte_array_throws_an_argumentnullexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.Append(null);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void an_offset_larger_than_the_buffer_throws_an_argumentoutofrangeexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.Write(0, new byte[5], 8, 3);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void a_count_larger_than_the_buffer_throws_an_argumentoutofrangeexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.Write(0, new byte[5], 3, 5);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void a_negative_count_throws_an_argumentoutofrangeexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.Write(0, new byte[5], 3, -1);
+ }
+
+ [Test, ExpectedException(typeof(ArgumentOutOfRangeException))]
+ public void a_negative_offset_throws_an_argumentoutofrangeexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.Write(0, new byte[5], -1, 1);
+ }
+
+ [Test]
+ public void length_is_updated_to_include_bytes_written()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ byte[] data = { 1, 2, 3, 4, 5 };
+ pool.Append(data);
+ Assert.IsTrue(pool.Length == 5);
+ }
+
+ [Test]
+ public void data_is_written_to_the_internal_buffer()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ byte[] data = { 1, 2, 3, 4, 5 };
+ pool.Append(data);
+ for (byte i = 0; i < 5; i++)
+ {
+ Assert.AreEqual(i + 1, pool[i]);
+ }
+ }
+
+ [Test]
+ public void pool_can_expand_capacity()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ int initialCapacity = pool.Capacity;
+ byte[] data = new byte[initialCapacity + 25];
+ pool.Append(data);
+ Assert.AreEqual(initialCapacity * 2, pool.Capacity);
+ }
+
+ [Test]
+ public void can_write_given_a_self_offset()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ byte[] data = { 1, 2, 3, 4, 5 };
+ pool.Write(4, data, 0, 5); //start at position 4
+ for (byte i = 4; i < 9; i++)
+ {
+ Assert.AreEqual(i - 3, pool[i]);
+ }
+ }
+
+ [Test]
+ public void can_write_given_a_source_offset()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ byte[] data = { 1, 2, 3, 4, 5 };
+ pool.Write(0, data, 3, 2);
+ Assert.AreEqual(pool[0], 4);
+ Assert.AreEqual(pool[1], 5);
+ }
+ }
+
+ [TestFixture]
+ public class when_setting_the_length_of_the_pool : has_buffer_manager_fixture
+ {
+ [Test, ExpectedException(typeof(ArgumentException))]
+ public void a_negative_length_throws_an_argumentexception()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.SetLength(-1, false);
+ }
+
+ [Test]
+ public void a_larger_length_makes_capacity_larger()
+ {
+ BufferManager manager = new BufferManager(10, 1000, 1);
+ BufferPool pool = new BufferPool(1, manager);
+ pool.SetLength(5000);
+ Assert.AreNotEqual(5000, pool.Capacity);
+ }
+
+ [Test]
+ public void length_is_set_when_setting_length()
+ {
+ BufferPool pool = new BufferPool(1, BufferManager);
+ pool.SetLength(5000, false);
+ Assert.AreEqual(5000, pool.Length);
+ }
+
+ [Test]
+ public void a_smaller_length_lowers_capacity()
+ {
+ BufferManager manager = new BufferManager(10, 1000, 1);
+ BufferPool pool = new BufferPool(5, manager);
+ pool.SetLength(1);
+ Assert.AreEqual(9, manager.AvailableBuffers);
+ }
+
+ [Test]
+ public void a_smaller_length_checks_buffers_back_in_when_allowed()
+ {
+ BufferManager manager = new BufferManager(10, 1000, 1);
+ BufferPool pool = new BufferPool(5, manager);
+ pool.SetLength(1, true);
+ Assert.AreEqual(9, manager.AvailableBuffers);
+ }
+
+ [Test]
+ public void a_smaller_length_checks_buffers_back_in_when_not_allowed()
+ {
+ BufferManager manager = new BufferManager(10, 1000, 1);
+ BufferPool pool = new BufferPool(5, manager);
+ pool.SetLength(1, false);
+ Assert.AreEqual(5, manager.AvailableBuffers);
+ }
+ }
+
+ [TestFixture]
+ public class when_a_buffer_pool_has_been_disposed : has_buffer_manager_fixture
+ {
+ private BufferPool m_DisposedPool;
+
+ public override void Setup()
+ {
+ base.Setup();
+ m_DisposedPool = new BufferPool(10, BufferManager);
+ m_DisposedPool.Dispose();
+ }
+
+ [Test, ExpectedException(typeof(ObjectDisposedException))]
+ public void reading_indexer_throws_objectdisposedexception()
+ {
+ byte b = m_DisposedPool[0];
+ }
+
+ [Test, ExpectedException(typeof(ObjectDisposedException))]
+ public void writing_indexer_throws_objectdisposedexception()
+ {
+ m_DisposedPool[0] = 5;
+ }
+
+ [Test, ExpectedException(typeof(ObjectDisposedException))]
+ public void writing_multiple_bytes_throws_objectdisposedexception()
+ {
+ m_DisposedPool.Append(new byte[] { 1, 2, 3, 4 });
+ }
+
+
+ [Test, ExpectedException(typeof(ObjectDisposedException))]
+ public void effective_enumerator_throws_objectdisposedexception()
+ {
+ foreach (ArraySegment<byte> segment in m_DisposedPool.EffectiveBuffers)
+ {
+ }
+ }
+
+ [Test, ExpectedException(typeof(ObjectDisposedException))]
+ public void setting_length_throws_objectdisposedexception()
+ {
+ m_DisposedPool.SetLength(200);
+ }
+
+ [Test, ExpectedException(typeof(ObjectDisposedException))]
+ public void converting_to_a_byte_array_throws_objectdisposedexception()
+ {
+ m_DisposedPool.ToByteArray();
+ }
+
+ }
+}
73 src/EventStore/EventStore.BufferManagement.Tests/EventStore.BufferManagement.Tests.csproj
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.30703</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{05598608-CF42-49C1-A220-B1B7D1EBD83E}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>EventStore.BufferManagement.Tests</RootNamespace>
+ <AssemblyName>EventStore.BufferManagement.Tests</AssemblyName>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>..\..\..\bin\eventstore.tests\debug\anycpu\</OutputPath>
+ <DefineConstants>TRACE;DEBUG</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>..\..\..\bin\eventstore.tests\release\anycpu\</OutputPath>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Debug|x86'">
+ <PlatformTarget>x86</PlatformTarget>
+ <OutputPath>..\..\..\bin\eventstore.tests\debug\x86\</OutputPath>
+ <DefineConstants>TRACE;DEBUG</DefineConstants>
+ <DebugType>none</DebugType>
+ <WarningLevel>4</WarningLevel>
+ <Optimize>false</Optimize>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Release|x86'">
+ <PlatformTarget>x86</PlatformTarget>
+ <OutputPath>..\..\..\bin\eventstore.tests\release\x86\</OutputPath>
+ <Optimize>true</Optimize>
+ <DebugType>none</DebugType>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ <Reference Include="nunit.framework, Version=2.6.0.12051, Culture=neutral, PublicKeyToken=96d09a1eb7f44a77">
+ <HintPath>..\libs\nunit.framework.dll</HintPath>
+ </Reference>
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="BufferManagerTests.cs" />
+ <Compile Include="BufferPoolStreamTests.cs" />
+ <Compile Include="BufferPoolTests.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\EventStore.BufferManagement\EventStore.BufferManagement.csproj">
+ <Project>{a794d3fb-06ac-471f-ab8d-6e98cbfa0021}</Project>
+ <Name>EventStore.BufferManagement</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
66 src/EventStore/EventStore.BufferManagement.Tests/Properties/AssemblyInfo.cs
@@ -0,0 +1,66 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using NUnit.Framework;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("EventStore.BufferManagement.Tests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Eleks")]
+[assembly: AssemblyProduct("EventStore.BufferManagement.Tests")]
+[assembly: AssemblyCopyright("Copyright © Eleks 2012")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("255a5bd0-33d8-4734-8e12-ef3442bb3615")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
+
+[assembly: Category("All")]
267 src/EventStore/EventStore.BufferManagement/BufferManager.cs
@@ -0,0 +1,267 @@
+using System;
+using System.Collections.Generic;
+using EventStore.Common.Log;
+
+namespace EventStore.BufferManagement
+{
+ /// <summary>
+ /// A manager to handle buffers for the socket connections
+ /// </summary>
+ /// <remarks>
+ /// When used in an async call a buffer is pinned. Large numbers of pinned buffers
+ /// cause problem with the GC (in particular it causes heap fragmentation).
+ /// This class maintains a set of large segments and gives clients pieces of these
+ /// segments that they can use for their buffers. The alternative to this would be to
+ /// create many small arrays which it then maintained. This methodology should be slightly
+ /// better than the many small array methodology because in creating only a few very
+ /// large objects it will force these objects to be placed on the LOH. Since the
+ /// objects are on the LOH they are at this time not subject to compacting which would
+ /// require an update of all GC roots as would be the case with lots of smaller arrays
+ /// that were in the normal heap.
+ /// </remarks>
+ public class BufferManager
+ {
+ private const int TrialsCount = 100;
+
+ private static readonly ILogger Log = LogManager.GetLoggerFor<BufferManager>();
+ private static BufferManager _defaultBufferManager = null;
+
+ private readonly int _segmentChunks;
+ private readonly int _chunkSize;
+ private readonly int _segmentSize;
+ private readonly bool _allowedToCreateMemory;
+
+#if __MonoCS__
+ private readonly Common.ConcurrentCollections.ConcurrentStack<ArraySegment<byte>> _buffers = new Common.ConcurrentCollections.ConcurrentStack<ArraySegment<byte>>();
+#else
+ private readonly System.Collections.Concurrent.ConcurrentStack<ArraySegment<byte>> _buffers = new System.Collections.Concurrent.ConcurrentStack<ArraySegment<byte>>();
+#endif
+
+ private readonly List<byte[]> _segments;
+ private readonly object _creatingNewSegmentLock = new object();
+
+ /// <summary>
+ /// Gets the default buffer manager
+ /// </summary>
+ /// <remarks>You should only be using this method if you don't want to manage buffers on your own.</remarks>
+ /// <value>The default buffer manager.</value>
+ public static BufferManager Default
+ {
+ get
+ {
+ //default to 1024 1kb buffers if people don't want to manage it on their own;
+ if (_defaultBufferManager == null)
+ _defaultBufferManager = new BufferManager(1024, 1024, 1);
+ return _defaultBufferManager;
+ }
+ }
+
+ /// <summary>
+ /// Sets the default buffer manager.
+ /// </summary>
+ /// <param name="manager">The new default buffer manager.</param>
+ public static void SetDefaultBufferManager(BufferManager manager)
+ {
+ if (manager == null)
+ throw new ArgumentNullException("manager");
+ _defaultBufferManager = manager;
+ }
+
+ public int SegmentsCount
+ {
+ get { return _segments.Count; }
+ }
+
+ public int SegmentChunksCount
+ {
+ get { return _segmentChunks; }
+ }
+
+ /// <summary>
+ /// The current number of buffers available
+ /// </summary>
+ public int AvailableBuffers
+ {
+ get { return _buffers.Count; } //do we really care about volatility here?
+ }
+
+ /// <summary>
+ /// The total size of all buffers
+ /// </summary>
+ public int TotalBufferSize
+ {
+ get { return _segments.Count * _segmentSize; } //do we really care about volatility here?
+ }
+
+ /// <summary>
+ /// Constructs a new <see cref="BufferManager"></see> object
+ /// </summary>
+ /// <param name="segmentChunks">The number of chunks to create per segment</param>
+ /// <param name="chunkSize">The size of a chunk in bytes</param>
+ public BufferManager(int segmentChunks, int chunkSize)
+ : this(segmentChunks, chunkSize, 1) { }
+
+ /// <summary>
+ /// Constructs a new <see cref="BufferManager"></see> object
+ /// </summary>
+ /// <param name="segmentChunks">The number of chunks to create per segment</param>
+ /// <param name="chunkSize">The size of a chunk in bytes</param>
+ /// <param name="initialSegments">The initial number of segments to create</param>
+ public BufferManager(int segmentChunks, int chunkSize, int initialSegments)
+ : this(segmentChunks, chunkSize, initialSegments, true) { }
+
+ /// <summary>
+ /// Constructs a new <see cref="BufferManager"></see> object
+ /// </summary>
+ /// <param name="segmentChunks">The number of chunks to create per segment</param>
+ /// <param name="chunkSize">The size of a chunk in bytes</param>
+ /// <param name="initialSegments">The initial number of segments to create</param>
+ /// <param name="allowedToCreateMemory">If false when empty and checkout is called an exception will be thrown</param>
+ public BufferManager(int segmentChunks, int chunkSize, int initialSegments, bool allowedToCreateMemory)
+ {
+ if (segmentChunks <= 0)
+ throw new ArgumentException("segmentChunks");
+ if (chunkSize <= 0)
+ throw new ArgumentException("chunkSize");
+ if (initialSegments < 0)
+ throw new ArgumentException("initialSegments");
+
+ _segmentChunks = segmentChunks;
+ _chunkSize = chunkSize;
+ _segmentSize = _segmentChunks * _chunkSize;
+
+ _segments = new List<byte[]>();
+
+ _allowedToCreateMemory = true;
+ for (int i = 0; i < initialSegments; i++)
+ {
+ CreateNewSegment(true);
+ }
+ _allowedToCreateMemory = allowedToCreateMemory;
+ }
+
+ /// <summary>
+ /// Creates a new segment, makes buffers available
+ /// </summary>
+ private void CreateNewSegment(bool forceCreation)
+ {
+ if (!_allowedToCreateMemory)
+ throw new UnableToCreateMemoryException();
+ lock (_creatingNewSegmentLock)
+ {
+ if (!forceCreation && _buffers.Count > _segmentChunks / 2)
+ return;
+
+ var bytes = new byte[_segmentSize];
+ _segments.Add(bytes);
+ for (int i = 0; i < _segmentChunks; i++)
+ {
+ var chunk = new ArraySegment<byte>(bytes, i * _chunkSize, _chunkSize);
+ _buffers.Push(chunk);
+ }
+
+ Log.Info("Segments count: {0}, buffers count: {1}, should be when full: {2}",
+ _segments.Count,
+ _buffers.Count,
+ _segments.Count * _segmentChunks);
+ }
+ }
+
+ /// <summary>
+ /// Checks out a buffer from the manager
+ /// </summary>
+ /// <remarks>
+ /// It is the client's responsibility to return the buffer to the manager by
+ /// calling <see cref="CheckIn"></see> on the buffer
+ /// </remarks>
+ /// <returns>A <see cref="ArraySegment{T}"></see> that can be used as a buffer</returns>
+ public ArraySegment<byte> CheckOut()
+ {
+ int trial = 0;
+ while (trial < TrialsCount)
+ {
+ ArraySegment<byte> result;
+ if (_buffers.TryPop(out result))
+ return result;
+ CreateNewSegment(false);
+ trial++;
+ }
+ throw new UnableToAllocateBufferException();
+ }
+
+ /// <summary>
+ /// Checks out a buffer from the manager
+ /// </summary>
+ /// <remarks>
+ /// It is the client's responsibility to return the buffer to the manger by
+ /// calling <see cref="CheckIn"></see> on the buffer
+ /// </remarks>
+ /// <returns>A <see cref="ArraySegment{T}"></see> that can be used as a buffer</returns>
+ public IEnumerable<ArraySegment<byte>> CheckOut(int toGet)
+ {
+ var result = new ArraySegment<byte>[toGet];
+ var count = 0;
+ var totalReceived = 0;
+
+ while (count < TrialsCount)
+ {
+ ArraySegment<byte> piece;
+ while (totalReceived < toGet)
+ {
+ if (!_buffers.TryPop(out piece))
+ break;
+ result[totalReceived] = piece;
+ ++totalReceived;
+ }
+ if (totalReceived == toGet)
+ return result;
+ CreateNewSegment(false);
+ count++;
+ }
+ throw new UnableToAllocateBufferException();
+ }
+
+ /// <summary>
+ /// Returns a buffer to the control of the manager
+ /// </summary>
+ /// <remarks>
+ /// It is the client's responsibility to return the buffer to the manger by
+ /// calling <see cref="CheckIn"></see> on the buffer
+ /// </remarks>
+ /// <param name="buffer">The <see cref="ArraySegment{T}"></see> to return to the cache</param>
+ public void CheckIn(ArraySegment<byte> buffer)
+ {
+ CheckBuffer(buffer);
+ _buffers.Push(buffer);
+ }
+
+ /// <summary>
+ /// Returns a set of buffers to the control of the manager
+ /// </summary>
+ /// <remarks>
+ /// It is the client's responsibility to return the buffer to the manger by
+ /// calling <see cref="CheckIn"></see> on the buffer
+ /// </remarks>
+ /// <param name="buffersToReturn">The <see cref="ArraySegment{T}"></see> to return to the cache</param>
+ public void CheckIn(IEnumerable<ArraySegment<byte>> buffersToReturn)
+ {
+ if (buffersToReturn == null)
+ throw new ArgumentNullException("buffersToReturn");
+
+ foreach (var buf in buffersToReturn)
+ {
+ CheckBuffer(buf);
+ _buffers.Push(buf);
+ }
+ }
+
+ //[Conditional("DEBUG")]
+ private void CheckBuffer(ArraySegment<byte> buffer)
+ {
+ if (buffer.Array == null || buffer.Count == 0 || buffer.Array.Length < buffer.Offset + buffer.Count)
+ throw new Exception("Attempt to checking invalid buffer");
+ if (buffer.Count != _chunkSize)
+ throw new ArgumentException("Buffer was not of the same chunk size as the buffer manager", "buffer");
+ }
+ }
+}
351 src/EventStore/EventStore.BufferManagement/BufferPool.cs
@@ -0,0 +1,351 @@
+using System;
+using System.Collections.Generic;
+
+namespace EventStore.BufferManagement
+{
+ public class BufferPool : IDisposable
+ {
+ private List<ArraySegment<byte>> _buffers;
+ private readonly BufferManager _bufferManager;
+ private readonly int _chunkSize;
+ private int _length;
+ private bool _disposed;
+
+ /// <summary>
+ /// Structure to represent an index and an offset into the list of buffers allowing for quick access to a position
+ /// </summary>
+ private struct Position
+ {
+ public readonly int Index;
+ public readonly int Offset;
+
+ public Position(int index, int offset)
+ {
+ Index = index;
+ Offset = offset;
+ }
+ }
+
+ /// <summary>
+ /// Gets the capacity of the <see cref="BufferPool"></see>
+ /// </summary>
+ /// <value>The capacity.</value>
+ public int Capacity
+ {
+ get
+ {
+ CheckDisposed();
+ return _chunkSize * _buffers.Count;
+ }
+ }
+
+ /// <summary>
+ /// Gets the current length of the <see cref="BufferPool"></see>
+ /// </summary>
+ /// <value>The length.</value>
+ public int Length { get { return _length; } }
+
+ /// <summary>
+ /// Gets the effective buffers contained in this <see cref="BufferPool"></see>
+ /// </summary>
+ /// <value>The effective buffers.</value>
+ public IEnumerable<ArraySegment<byte>> EffectiveBuffers
+ {
+ get
+ {
+ CheckDisposed();
+ if (_length > 0)
+ {
+ Position l = GetPositionFor(_length);
+ //send full buffers
+ for (int i = 0; i < l.Index; i++)
+ {
+ yield return _buffers[i];
+ }
+ //send partial buffer
+ ArraySegment<byte> last = _buffers[l.Index];
+ yield return new ArraySegment<byte>(last.Array, last.Offset, l.Offset);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Gets or sets the <see cref="System.Byte"/> with the specified index.
+ /// </summary>
+ public byte this[int index]
+ {
+ get
+ {
+ CheckDisposed();
+ if (index < 0 || index > _length)
+ throw new ArgumentOutOfRangeException("index");
+ Position l = GetPositionFor(index);
+ ArraySegment<byte> buffer = _buffers[l.Index];
+ return buffer.Array[buffer.Offset + l.Offset];
+ }
+ set
+ {
+ CheckDisposed();
+ if (index < 0)
+ throw new ArgumentException("_Index");
+ Position l = GetPositionFor(index);
+ EnsureCapacity(l);
+ ArraySegment<byte> buffer = _buffers[l.Index];
+ buffer.Array[buffer.Offset + l.Offset] = value;
+ if (_length <= index)
+ _length = index + 1;
+ }
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="BufferPool"/> class.
+ /// </summary>
+ public BufferPool() : this(1, BufferManager.Default) { }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="BufferPool"/> class.
+ /// </summary>
+ /// <param name="bufferManager">The buffer manager.</param>
+ public BufferPool(BufferManager bufferManager) : this(1, bufferManager) { }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="BufferPool"/> class.
+ /// </summary>
+ /// <param name="initialBufferCount">The number of initial buffers.</param>
+ /// <param name="bufferManager">The buffer manager.</param>
+ public BufferPool(int initialBufferCount, BufferManager bufferManager)
+ {
+ if (initialBufferCount <= 0)
+ throw new ArgumentException("initialBufferCount");
+ if (bufferManager == null)
+ throw new ArgumentNullException("bufferManager");
+ _length = 0;
+ _buffers = new List<ArraySegment<byte>>(bufferManager.CheckOut(initialBufferCount));
+ // must have 1 buffer
+ _chunkSize = _buffers[0].Count;
+ _bufferManager = bufferManager;
+ _disposed = false;
+ }
+
+ /// <summary>
+ /// Appends the specified data.
+ /// </summary>
+ /// <param name="data">The data to write.</param>
+ public void Append(byte [] data)
+ {
+ if (data == null)
+ throw new ArgumentNullException("data");
+ Write(_length, data, 0, data.Length);
+ }
+
+ /// <summary>
+ /// Appends the specified data.
+ /// </summary>
+ /// <param name="data">The data.</param>
+ /// <param name="offset">The offset.</param>
+ /// <param name="count">The count.</param>
+ public void Append(byte [] data, int offset, int count)
+ {
+ Write(_length, data, offset, count);
+ }
+
+ /// <summary>
+ /// Writes data to the specified position.
+ /// </summary>
+ /// <param name="position">The position to write at.</param>
+ /// <param name="data">The data.</param>
+ /// <param name="offset">The offset.</param>
+ /// <param name="count">The count.</param>
+ public void Write(int position, byte[] data, int offset, int count)
+ {
+ if (data == null)
+ throw new ArgumentNullException("data");
+ if (offset < 0 || offset > data.Length)
+ throw new ArgumentOutOfRangeException("offset");
+ if (count < 0 || count + offset > data.Length)
+ throw new ArgumentOutOfRangeException("count");
+ Write(position, new ArraySegment<byte>(data, offset, count));
+ }
+
+ /// <summary>
+ /// Writes data to the specified position.
+ /// </summary>
+ /// <param name="position">The position to write at.</param>
+ /// <param name="data">The data.</param>
+ public void Write(int position, ArraySegment<byte> data)
+ {
+ CheckDisposed();
+ int written = 0;
+ int tmpLength = position;
+ do
+ {
+ Position loc = GetPositionFor(tmpLength);
+ EnsureCapacity(loc);
+ ArraySegment<byte> current = _buffers[loc.Index];
+ int canWrite = data.Count - written;
+ int available = current.Count - loc.Offset;
+ canWrite = canWrite > available ? available : canWrite;
+ if (canWrite > 0)
+ Buffer.BlockCopy(data.Array, written + data.Offset, current.Array, current.Offset + loc.Offset, canWrite);
+ written += canWrite;
+ tmpLength += canWrite;
+ } while (written < data.Count);
+
+ _length = tmpLength > _length ? tmpLength : _length;
+ }
+
+ /// <summary>
+ /// Reads data from a given position
+ /// </summary>
+ /// <param name="position">The position to read from.</param>
+ /// <param name="data">Where to read the data to.</param>
+ /// <param name="offset">The offset to start reading into.</param>
+ /// <param name="count">The number of bytes to read.</param>
+ /// <returns></returns>
+ public int ReadFrom(int position, byte[] data, int offset, int count)
+ {
+ if (data == null)
+ throw new ArgumentNullException("data");
+ if (offset < 0 || offset > data.Length)
+ throw new ArgumentOutOfRangeException("offset");
+ if (count < 0 || count + offset > data.Length)
+ throw new ArgumentOutOfRangeException("count");
+ return ReadFrom(position, new ArraySegment<byte>(data, offset, count));
+ }
+
+ /// <summary>
+ /// Reads data from a given position.
+ /// </summary>
+ /// <param name="position">The position to read from.</param>
+ /// <param name="data">Where to read the data to.</param>
+ /// <returns></returns>
+ public int ReadFrom(int position, ArraySegment<byte> data)
+ {
+ CheckDisposed();
+ if (position + data.Count > _length)
+ throw new ArgumentOutOfRangeException("data");
+ int copied = 0;
+ int currentLocation = position;
+ do
+ {
+ Position l = GetPositionFor(currentLocation);
+ ArraySegment<byte> current = _buffers[l.Index];
+ int bytesToRead = _chunkSize - l.Offset;
+ bytesToRead = bytesToRead > data.Count - copied ? data.Count - copied : bytesToRead;
+ if (bytesToRead > 0)
+ Buffer.BlockCopy(current.Array, current.Offset + l.Offset, data.Array, data.Offset + copied, bytesToRead);
+ copied += bytesToRead;
+ currentLocation += bytesToRead;
+ } while (copied < data.Count);
+ return copied;
+ }
+
+ /// <summary>
+ /// Sets the length of the <see cref="BufferPool"></see>
+ /// </summary>
+ /// <param name="newLength">The new length.</param>
+ public void SetLength(int newLength)
+ {
+ SetLength(newLength, true);
+ }
+
+ /// <summary>
+ /// Sets the length of the <see cref="BufferPool"></see>
+ /// </summary>
+ /// <param name="newLength">The new length</param>
+ /// <param name="releaseMemory">if set to <c>true</c> any memory no longer used will be released.</param>
+ public void SetLength(int newLength, bool releaseMemory)
+ {
+ CheckDisposed();
+ if (newLength < 0) throw new ArgumentException("newLength must be greater than 0");
+ int oldCapacity = Capacity;
+ _length = newLength;
+
+ if (_length < oldCapacity && releaseMemory)
+ RemoveCapacity(GetPositionFor(_length));
+ else if (_length > oldCapacity)
+ EnsureCapacity(GetPositionFor(_length));
+ }
+
+ private void RemoveCapacity(Position position)
+ {
+ while (_buffers.Count > position.Index + 1)
+ {
+ _bufferManager.CheckIn(_buffers[_buffers.Count - 1]);
+ _buffers.RemoveAt(_buffers.Count - 1);
+ }
+ }
+
+ private void EnsureCapacity(Position position)
+ {
+ if (position.Index >= _buffers.Count)
+ {
+ foreach (ArraySegment<byte> buffer in _bufferManager.CheckOut(position.Index + 1 - _buffers.Count))
+ {
+ if (buffer.Count != _chunkSize)
+ throw new Exception("Received a buffer of the wrong size: this shouldn't happen, ever.");
+ _buffers.Add(buffer);
+ }
+ }
+ }
+
+ /// <summary>
+ /// Converts this <see cref="BufferPool"></see> to a byte array.
+ /// </summary>
+ /// <returns></returns>
+ public byte[] ToByteArray()
+ {
+ CheckDisposed();
+ Position l = GetPositionFor(_length);
+ var result = new byte[_length];
+ for (int i=0; i<l.Index; i++)
+ {
+ ArraySegment<byte> current = _buffers[i];
+ //copy full buffers
+ Buffer.BlockCopy(current.Array, current.Offset, result, i * _chunkSize, _chunkSize);
+ }
+ //copy last partial buffer
+ if (l.Index < _buffers.Count)
+ {
+ ArraySegment<byte> last = _buffers[l.Index];
+ Buffer.BlockCopy(last.Array, last.Offset, result, l.Index*_chunkSize, l.Offset);
+ }
+ return result;
+ }
+
+ private Position GetPositionFor(int index)
+ {
+ //we could do this much faster if we restricted buffer sizes to powers of 2
+ var l = new Position(index / _chunkSize, index % _chunkSize);
+ return l;
+ }
+
+ /// <summary>
+ /// Returns any memory used buy this <see cref="BufferPool"></see> to the <see cref="BufferManager"></see>
+ /// </summary>
+ public void Dispose()
+ {
+ DisposeInternal();
+ GC.SuppressFinalize(this);
+ }
+
+ ~BufferPool()
+ {
+ DisposeInternal();
+ }
+
+ protected virtual void DisposeInternal()
+ {
+ if (_buffers != null)
+ _bufferManager.CheckIn(_buffers);
+ _disposed = true;
+ _buffers = null;
+ }
+
+ private void CheckDisposed()
+ {
+ if (_disposed)
+ throw new ObjectDisposedException("Object has been disposed.");
+ }
+ }
+}
94 src/EventStore/EventStore.BufferManagement/BufferPoolStream.cs
@@ -0,0 +1,94 @@
+using System;
+using System.IO;
+
+namespace EventStore.BufferManagement
+{
+ public class BufferPoolStream : Stream
+ {
+ private readonly BufferPool _bufferPool;
+ private long _position;
+
+ public BufferPool BufferPool { get { return _bufferPool; } }
+
+ public override bool CanRead { get { return true; } }
+ public override bool CanSeek { get { return true; } }
+ public override bool CanWrite { get { return true; } }
+ public override long Length { get { return _bufferPool.Length; } }
+ public int Capacity { get { return _bufferPool.Capacity; } }
+
+ public override long Position
+ {
+ get { return _position; }
+ set
+ {
+ if (value < 0 || value > _bufferPool.Length)
+ throw new ArgumentOutOfRangeException("value");
+ _position = value;
+ }
+ }
+
+ /// <summary>
+ /// Initializes a new instance of the <see cref="BufferPoolStream"/> class.
+ /// </summary>
+ /// <param name="bufferPool">The buffer pool used as underlying storage.</param>
+ public BufferPoolStream(BufferPool bufferPool)
+ {
+ if (bufferPool == null)
+ throw new ArgumentNullException("bufferPool");
+ _bufferPool = bufferPool;
+ }
+
+ public override void Flush()
+ {
+ //noop
+ }
+
+ public override long Seek(long offset, SeekOrigin origin)
+ {
+ switch(origin)
+ {
+ case SeekOrigin.Begin:
+ Position = offset;
+ break;
+ case SeekOrigin.End:
+ Position = _bufferPool.Length + offset;
+ break;
+ case SeekOrigin.Current:
+ Position = _position + offset;
+ break;
+ default:
+ throw new Exception("Unknown SeekOrigin: " + origin.ToString());
+ }
+ return Position;
+ }
+
+ public override void SetLength(long value)
+ {
+ _bufferPool.SetLength((int) value);
+ if (_position > value)
+ _position = value;
+ }
+
+ public override int Read(byte[] buffer, int offset, int count)
+ {
+ if (_position >= _bufferPool.Length)
+ return 0;
+ int ret = _bufferPool.ReadFrom((int) _position, buffer, offset, count);
+ _position += ret;
+ return ret;
+ }
+
+ public override void Write(byte[] buffer, int offset, int count)
+ {
+ _bufferPool.Write((int) _position, buffer, offset, count);
+ _position += count;
+ }
+
+ protected override void Dispose(bool disposing)
+ {
+ if (disposing)
+ _bufferPool.Dispose();
+ base.Dispose(disposing);
+ }
+ }
+}
73 src/EventStore/EventStore.BufferManagement/EventStore.BufferManagement.csproj
@@ -0,0 +1,73 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+ <PropertyGroup>
+ <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+ <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+ <ProductVersion>8.0.30703</ProductVersion>
+ <SchemaVersion>2.0</SchemaVersion>
+ <ProjectGuid>{A794D3FB-06AC-471F-AB8D-6E98CBFA0021}</ProjectGuid>
+ <OutputType>Library</OutputType>
+ <AppDesignerFolder>Properties</AppDesignerFolder>
+ <RootNamespace>EventStore.BufferManagement</RootNamespace>
+ <AssemblyName>EventStore.BufferManagement</AssemblyName>
+ <FileAlignment>512</FileAlignment>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+ <DebugSymbols>true</DebugSymbols>
+ <DebugType>full</DebugType>
+ <Optimize>false</Optimize>
+ <OutputPath>..\..\..\bin\eventstore\debug\anycpu\</OutputPath>
+ <DefineConstants>TRACE;DEBUG</DefineConstants>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+ <DebugType>pdbonly</DebugType>
+ <Optimize>true</Optimize>
+ <OutputPath>..\..\..\bin\eventstore\release\anycpu\</OutputPath>
+ <ErrorReport>prompt</ErrorReport>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Debug|x86'">
+ <PlatformTarget>x86</PlatformTarget>
+ <OutputPath>..\..\..\bin\eventstore\debug\x86\</OutputPath>
+ <DefineConstants>TRACE;DEBUG</DefineConstants>
+ <DebugType>full</DebugType>
+ <WarningLevel>4</WarningLevel>
+ <Optimize>false</Optimize>
+ <DebugSymbols>true</DebugSymbols>
+ </PropertyGroup>
+ <PropertyGroup Condition="'$(Configuration)|$(Platform)' == 'Release|x86'">
+ <PlatformTarget>x86</PlatformTarget>
+ <OutputPath>..\..\..\bin\eventstore\release\x86\</OutputPath>
+ <Optimize>true</Optimize>
+ <DebugType>none</DebugType>
+ <WarningLevel>4</WarningLevel>
+ </PropertyGroup>
+ <ItemGroup>
+ <Reference Include="System" />
+ <Reference Include="System.Core" />
+ </ItemGroup>
+ <ItemGroup>
+ <Compile Include="BufferManager.cs" />
+ <Compile Include="BufferPool.cs" />
+ <Compile Include="BufferPoolStream.cs" />
+ <Compile Include="UnableToCreateMemoryException.cs" />
+ <Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="UnableToAllocateBufferException.cs" />
+ </ItemGroup>
+ <ItemGroup>
+ <ProjectReference Include="..\EventStore.Common\EventStore.Common.csproj">
+ <Project>{B4C9BE3D-43B1-4049-A23A-5DC53DB3F0B0}</Project>
+ <Name>EventStore.Common</Name>
+ </ProjectReference>
+ </ItemGroup>
+ <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+ <!-- To modify your build process, add your task inside one of the targets below and uncomment it.
+ Other similar extension points exist, see Microsoft.Common.targets.
+ <Target Name="BeforeBuild">
+ </Target>
+ <Target Name="AfterBuild">
+ </Target>
+ -->
+</Project>
67 src/EventStore/EventStore.BufferManagement/Properties/AssemblyInfo.cs
@@ -0,0 +1,67 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+
+[assembly: AssemblyTitle("EventStore.BufferManagement")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Eleks")]
+[assembly: AssemblyProduct("EventStore.BufferManagement")]
+[assembly: AssemblyCopyright("Copyright © Eleks 2012")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+
+[assembly: Guid("e9d0be81-0543-4567-9d94-bfe84194433f")]
+
+// Version information for an assembly consists of the following four values:
+//
+// Major Version
+// Minor Version
+// Build Number
+// Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]
39 src/EventStore/EventStore.BufferManagement/UnableToAllocateBufferException.cs
@@ -0,0 +1,39 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+using System;
+
+namespace EventStore.BufferManagement
+{
+ public class UnableToAllocateBufferException : Exception
+ {
+ public UnableToAllocateBufferException()
+ : base("Couldn't allocate buffer after few trials.")
+ {
+ }
+ }
+}
39 src/EventStore/EventStore.BufferManagement/UnableToCreateMemoryException.cs
@@ -0,0 +1,39 @@
+// Copyright (c) 2012, Event Store LLP
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store LLP nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+using System;
+
+namespace EventStore.BufferManagement
+{
+ public class UnableToCreateMemoryException : Exception
+ {
+ public UnableToCreateMemoryException()
+ : base("All buffers were in use and acquiring more memory has been disabled.")
+ {
+ }
+ }
+}
115 src/EventStore/EventStore.Client/Commands/CreateStreamProcessor.cs
@@ -0,0 +1,115 @@
+// Copyright (c) 2012, Event Store Ltd
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store Ltd nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+using System;
+using System.Diagnostics;
+using System.Net.Sockets;
+using System.Text;
+using EventStore.Core.Data;
+using EventStore.Core.Messages;
+using EventStore.Core.Services.Transport.Tcp;
+
+namespace EventStore.TestClient.Commands
+{
+ internal class CreateStreamProcessor : ICmdProcessor
+ {
+ public string Usage { get { return "CR [<stream-id> [<metadata>]]"; } }
+ public string Keyword { get { return "CR"; } }
+
+ public bool Execute(CommandProcessorContext context, string[] args)
+ {
+ var eventStreamId = "test-stream";
+ string metadata = null;
+
+ if (args.Length > 0)
+ {
+ if (args.Length > 2)
+ return false;
+ eventStreamId = args[0];
+ if (args.Length > 1)
+ metadata = args[1];
+ }
+
+ context.IsAsync();
+
+ var createStreamDto = new ClientMessageDto.CreateStream(
+ Guid.Empty,
+ eventStreamId,
+ Encoding.UTF8.GetBytes(metadata ?? string.Format("{{\"StreamName\": \"{0}\"}}", eventStreamId)));
+ var package = new TcpPackage(TcpCommand.CreateStream, createStreamDto.Serialize());
+
+ var sw = new Stopwatch();
+
+ context.Client.CreateTcpConnection(
+ context,
+ connectionEstablished: conn =>
+ {
+ context.Log.Info("[{0}]: Trying to create stream '{1}'...", conn.EffectiveEndPoint, eventStreamId);
+ sw.Start();
+ conn.EnqueueSend(package.AsByteArray());
+ },
+ handlePackage: (conn, pkg) =>
+ {
+ if (pkg.Command != TcpCommand.CreateStreamCompleted)
+ {
+ context.Fail(reason: string.Format("Unexpected TCP package: {0}.", pkg.Command));
+ return;
+ }
+
+ sw.Stop();
+
+ var dto = pkg.Data.Deserialize<ClientMessageDto.CreateStreamCompleted>();
+ if ((OperationErrorCode)dto.ErrorCode == OperationErrorCode.Success)
+ {
+ context.Log.Info("Successfully created stream '{0}'.", dto.EventStreamId);
+ PerfUtils.LogTeamCityGraphData(string.Format("{0}-latency-ms", Keyword), (int)sw.ElapsedMilliseconds);
+ }
+ else
+ {
+ context.Log.Info("Error while creating stream {0}: {1} ({2}).",
+ eventStreamId,
+ dto.Error,
+ (OperationErrorCode)dto.ErrorCode);
+ }
+
+ context.Log.Info("Create stream request took: {0}.", sw.Elapsed);
+ conn.Close();
+ context.Success();
+ },
+ connectionClosed: (connection, error) =>
+ {
+ if (error == SocketError.Success)
+ context.Success();
+ else
+ context.Fail();
+ });
+
+ context.WaitForCompletion();
+ return true;
+ }
+ }
+}
115 src/EventStore/EventStore.Client/Commands/DeleteProcessor.cs
@@ -0,0 +1,115 @@
+// Copyright (c) 2012, Event Store Ltd
+// All rights reserved.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+// Redistributions of source code must retain the above copyright notice,
+// this list of conditions and the following disclaimer.
+// Redistributions in binary form must reproduce the above copyright
+// notice, this list of conditions and the following disclaimer in the
+// documentation and/or other materials provided with the distribution.
+// Neither the name of the Event Store Ltd nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+using System;
+using System.Diagnostics;
+using System.Net.Sockets;
+using EventStore.Core.Data;
+using EventStore.Core.Messages;
+using EventStore.Core.Services.Transport.Tcp;
+
+namespace EventStore.TestClient.Commands
+{
+ internal class DeleteProcessor : ICmdProcessor
+ {
+ public string Usage { get { return "DEL [<stream-id> [<expected-version>]]"; } }
+ public string Keyword { get { return "DEL"; } }
+
+ public bool Execute(CommandProcessorContext context, string[] args)
+ {
+ var eventStreamId = "test-stream";
+