Permalink
Browse files

NCBC-27: Client should support OBSERVE command

Change-Id: Ie9414d2675f2ad14814d7feb20a94e5cae3d1328
Reviewed-on: http://review.couchbase.org/19126
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: John C. Zablocki <john@couchbase.com>
  • Loading branch information...
1 parent c104a08 commit bd8bb2e21f40ff51b9259b90138d3994edcf0c62 @johnzablocki johnzablocki committed with John C. Zablocki Jul 27, 2012
Showing with 1,418 additions and 15 deletions.
  1. +1 −0 src/Couchbase.Tests/Couchbase.Tests.csproj
  2. +149 −0 src/Couchbase.Tests/CouchbaseClientObserveTests.cs
  3. +3 −1 src/Couchbase.Tests/CouchbaseClientTestsBase.cs
  4. +6 −0 src/Couchbase/BasicCouchbaseOperationFactory.cs
  5. +8 −0 src/Couchbase/Configuration/CouchbaseClientConfiguration.cs
  6. +5 −0 src/Couchbase/Configuration/CouchbaseClientSection.cs
  7. +1 −0 src/Couchbase/Configuration/ICouchbaseClientConfiguration.cs
  8. +8 −0 src/Couchbase/Configuration/ServersElement.cs
  9. +13 −0 src/Couchbase/Couchbase.csproj
  10. +135 −13 src/Couchbase/CouchbaseClient.cs
  11. +48 −0 src/Couchbase/CouchbaseNode.cs
  12. +14 −1 src/Couchbase/CouchbasePool.cs
  13. +1 −0 src/Couchbase/ICouchbaseOperationFactory.cs
  14. +14 −0 src/Couchbase/ICouchbaseResultsClient.cs
  15. +4 −0 src/Couchbase/ICouchbaseServerPool.cs
  16. +240 −0 src/Couchbase/ObserveHandler.cs
  17. +37 −0 src/Couchbase/ObservedNode.cs
  18. +1 −0 src/Couchbase/OperationInterfaces.cs
  19. +34 −0 src/Couchbase/Operations/Constants/ObserveOperationConstants.cs
  20. +36 −0 src/Couchbase/Operations/ObserveKeyState.cs
  21. +93 −0 src/Couchbase/Operations/ObserveOperation.cs
  22. +40 −0 src/Couchbase/Operations/ObservedKey.cs
  23. +33 −0 src/Couchbase/Operations/PersistTo.cs
  24. +33 −0 src/Couchbase/Operations/ReplicateTo.cs
  25. +38 −0 src/Couchbase/Protocol/CouchbaseOpCode.cs
  26. +130 −0 src/Couchbase/Protocol/ObserveRequest.cs
  27. +141 −0 src/Couchbase/Protocol/ObserveResponse.cs
  28. +58 −0 src/Couchbase/Results/IObserveOperationResult.cs
  29. +43 −0 src/Couchbase/Results/ObserveOperationResult.cs
  30. +45 −0 src/Couchbase/Settings/ObserveSettings.cs
  31. +6 −0 src/Couchbase/VBucketAwareOperationFactory.cs
@@ -50,6 +50,7 @@
<Compile Include="ConfigHelperTests.cs" />
<Compile Include="CouchbaseAuthenticatedViewTests.cs" />
<Compile Include="CouchbaseClientGenericViewTests.cs" />
+ <Compile Include="CouchbaseClientObserveTests.cs" />
<Compile Include="CouchbaseClientViewNameTransformerTests.cs" />
<Compile Include="CouchbaseClientViewPagingTests.cs" />
<Compile Include="CouchbaseClientViewParameterTests.cs">
@@ -0,0 +1,149 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using NUnit.Framework;
+using Enyim.Caching.Memcached;
+using Couchbase.Operations;
+using Couchbase.Operations.Constants;
+
+namespace Couchbase.Tests
+{
+ [TestFixture]
+ public class CouchbaseClientObserveTests : CouchbaseClientTestsBase
+ {
+ [Test]
+ public void When_Storing_A_New_Key_Observe_Will_Succeed_When_Persist_Is_One_And_Replicate_Is_Default_Cas_Is_Same()
+ {
+ var key = GetUniqueKey("store");
+ var value = GetRandomString();
+ var storeResult = _Client.ExecuteStore(StoreMode.Set, key, value, PersistTo.One);
+ StoreAssertPass(storeResult);
+ }
+
+ [Test]
+ public void When_Storing_A_New_Key_Observe_Will_Succeed_When_Persist_Is_One_And_Cas_Is_Same()
+ {
+ var key = GetUniqueKey("store");
+ var value = GetRandomString();
+ var storeResult = _Client.ExecuteStore(StoreMode.Set, key, value);
+ StoreAssertPass(storeResult);
+
+ var observeResult = _Client.Observe(key, storeResult.Cas, PersistTo.One, ReplicateTo.Zero);
+ Assert.That(observeResult.Success, Is.True, observeResult.Message);
+ }
+
+ [Test]
+ public void When_Storing_A_New_Key_Observe_Will_Fail_When_Persist_Is_One_And_Cas_Is_Different()
+ {
+ var key = GetUniqueKey("store");
+ var value = GetRandomString();
+ var storeResult = _Client.ExecuteStore(StoreMode.Set, key, value);
+ StoreAssertPass(storeResult);
+
+ var observeResult = _Client.Observe(key, storeResult.Cas - 1, PersistTo.One, ReplicateTo.Zero);
+ Assert.That(observeResult.Success, Is.Not.True);
+ Assert.That(observeResult.Message, Is.StringMatching(ObserveOperationConstants.MESSAGE_MODIFIED));
+
+ }
+
+ [Test]
+ public void When_Storing_A_New_Key_Observe_Will_Succeed_When_Persist_Is_Two_And_Cas_Is_Same()
+ {
+ var key = GetUniqueKey("store");
+ var value = GetRandomString();
+ var storeResult = _Client.ExecuteStore(StoreMode.Set, key, value);
+ StoreAssertPass(storeResult);
+
+ var observeResult = _Client.Observe(key, storeResult.Cas, PersistTo.Two, ReplicateTo.Zero);
+ Assert.That(observeResult.Success, Is.True, observeResult.Message);
+ }
+
+ [Test]
+ public void When_Storing_A_New_Key_Observe_Will_Fail_When_Persist_Is_Two_And_Cas_Is_Different()
+ {
+ var key = GetUniqueKey("store");
+ var value = GetRandomString();
+ var storeResult = _Client.ExecuteStore(StoreMode.Set, key, value);
+ StoreAssertPass(storeResult);
+
+ var observeResult = _Client.Observe(key, storeResult.Cas - 1, PersistTo.Two, ReplicateTo.Zero);
+ Assert.That(observeResult.Success, Is.Not.True);
+ Assert.That(observeResult.Message, Is.StringMatching(ObserveOperationConstants.MESSAGE_MODIFIED));
+
+ }
+
+ [Test]
+ public void When_Storing_A_New_Key_Observe_Will_Succeed_When_Persist_Is_Two_And_Replicate_Is_Two_And_Cas_Is_Same()
+ {
+ var key = GetUniqueKey("store");
+ var value = GetRandomString();
+ var storeResult = _Client.ExecuteStore(StoreMode.Set, key, value);
+ StoreAssertPass(storeResult);
+
+ var observeResult = _Client.Observe(key, storeResult.Cas, PersistTo.Two, ReplicateTo.Two);
+ Assert.That(observeResult.Success, Is.True, observeResult.Message);
+ }
+
+ [Test]
+ public void When_Storing_A_New_Key_Observe_Will_Fail_When_Persist_Is_Two_And_Replicate_Is_Two_And_Cas_Is_Different()
+ {
+ var key = GetUniqueKey("store");
+ var value = GetRandomString();
+ var storeResult = _Client.ExecuteStore(StoreMode.Set, key, value);
+ StoreAssertPass(storeResult);
+
+ var observeResult = _Client.Observe(key, storeResult.Cas - 1, PersistTo.Two, ReplicateTo.Two);
+ Assert.That(observeResult.Success, Is.Not.True);
+ Assert.That(observeResult.Message, Is.StringMatching(ObserveOperationConstants.MESSAGE_MODIFIED));
+
+ }
+
+ [Test]
+ public void When_Storing_A_New_Key_Observe_Will_Succeed_When_Persist_Is_Zero_And_Replicate_Is_Two_And_Cas_Is_Same()
+ {
+ var key = GetUniqueKey("store");
+ var value = GetRandomString();
+ var storeResult = _Client.ExecuteStore(StoreMode.Set, key, value);
+ StoreAssertPass(storeResult);
+
+ var observeResult = _Client.Observe(key, storeResult.Cas, PersistTo.Two, ReplicateTo.Two);
+ Assert.That(observeResult.Success, Is.True, observeResult.Message);
+ }
+
+ [Test]
+ public void When_Storing_A_New_Key_Observe_Will_Fail_When_Persist_Is_Zero_And_Replicate_Is_Two_And_Cas_Is_Different()
+ {
+ var key = GetUniqueKey("store");
+ var value = GetRandomString();
+ var storeResult = _Client.ExecuteStore(StoreMode.Set, key, value);
+ StoreAssertPass(storeResult);
+
+ var observeResult = _Client.Observe(key, storeResult.Cas - 1, PersistTo.Two, ReplicateTo.Two);
+ Assert.That(observeResult.Success, Is.Not.True);
+ Assert.That(observeResult.Message, Is.StringMatching(ObserveOperationConstants.MESSAGE_MODIFIED));
+
+ }
+ }
+}
+
+#region [ License information ]
+/* ************************************************************
+ *
+ * @author Couchbase <info@couchbase.com>
+ * @copyright 2012 Couchbase, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * ************************************************************/
+#endregion
@@ -21,10 +21,12 @@ public abstract class CouchbaseClientTestsBase
public void SetUp()
{
var config = new CouchbaseClientConfiguration();
- config.Urls.Add(new Uri("http://localhost:8091/pools"));
+ config.Urls.Add(new Uri("http://127.0.0.1:8091/pools"));
config.DesignDocumentNameTransformer = new ProductionModeNameTransformer();
config.HttpClientFactory = new HammockHttpClientFactory();
config.Bucket = "default";
+ config.SocketPool.ConnectionTimeout = TimeSpan.FromMilliseconds(20000);
+ config.ObserveTimeout = TimeSpan.FromSeconds(5);
_Client = new CouchbaseClient(config);
}
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using Couchbase.Operations;
namespace Couchbase
{
@@ -21,6 +22,11 @@ ISyncOperation ICouchbaseOperationFactory.Sync(SyncMode mode, IList<KeyValuePair
{
throw new NotSupportedException("Sync is not supported on memcached buckets.");
}
+
+ IObserveOperation ICouchbaseOperationFactory.Observe(string key, int vbucket, ulong cas)
+ {
+ return new ObserveOperation(key, vbucket, cas);
+ }
}
}
@@ -124,6 +124,7 @@ public ITranscoder Transcoder
public int RetryCount { get; set; }
public TimeSpan RetryTimeout { get; set; }
+ public TimeSpan ObserveTimeout { get; set; }
#region [ interface ]
@@ -194,6 +195,7 @@ internal class ReadOnlyConfig : ICouchbaseClientConfiguration
private Uri[] urls;
private TimeSpan retryTimeout;
private int retryCount;
+ private TimeSpan observeTimeout;
private ISocketPoolConfiguration spc;
private IHeartbeatMonitorConfiguration hbm;
@@ -207,6 +209,7 @@ public ReadOnlyConfig(ICouchbaseClientConfiguration original)
this.retryCount = original.RetryCount;
this.retryTimeout = original.RetryTimeout;
+ this.observeTimeout = original.ObserveTimeout;
this.spc = new SPC(original.SocketPool);
this.hbm = new HBM(original.HeartbeatMonitor);
@@ -265,6 +268,11 @@ IPerformanceMonitor ICouchbaseClientConfiguration.CreatePerformanceMonitor()
return this.original.CreatePerformanceMonitor();
}
+ TimeSpan ICouchbaseClientConfiguration.ObserveTimeout
+ {
+ get { return this.observeTimeout; }
+ }
+
TimeSpan ICouchbaseClientConfiguration.RetryTimeout
{
get { return this.retryTimeout; }
@@ -199,6 +199,11 @@ TimeSpan ICouchbaseClientConfiguration.RetryTimeout
get { return this.Servers.RetryTimeout; }
}
+ TimeSpan ICouchbaseClientConfiguration.ObserveTimeout
+ {
+ get { return this.Servers.ObserveTimeout; }
+ }
+
#endregion
}
}
@@ -65,6 +65,7 @@ public interface ICouchbaseClientConfiguration
IHttpClient CreateHttpClient(Uri baseUri);
TimeSpan RetryTimeout { get; }
+ TimeSpan ObserveTimeout { get; }
int RetryCount { get; }
}
}
@@ -95,6 +95,14 @@ public TimeSpan RetryTimeout
get { return (TimeSpan)base["retryTimeout"]; }
set { base["retryTimeout"] = value; }
}
+
+ [ConfigurationProperty("observeTimeout", IsRequired = false, DefaultValue = "00:01:00"), PositiveTimeSpanValidator]
+ [TypeConverter(typeof(TimeSpanConverter))]
+ public TimeSpan ObserveTimeout
+ {
+ get { return (TimeSpan)base["observeTimeout"]; }
+ set { base["observeTimeout"] = value; }
+ }
}
}
@@ -103,6 +103,9 @@
<Compile Include="HeartbeatSettings.cs" />
<Compile Include="ICouchbaseClient.cs" />
<Compile Include="ICouchbaseResultsClient.cs" />
+ <Compile Include="ObservedNode.cs" />
+ <Compile Include="ObserveHandler.cs" />
+ <Compile Include="Operations\Constants\ObserveOperationConstants.cs" />
<Compile Include="Operations\GetAndTouchOperation.cs" />
<Compile Include="BasicCouchbaseOperationFactory.cs" />
<Compile Include="OperationInterfaces.cs" />
@@ -112,10 +115,20 @@
<Compile Include="MessageStreamListener.cs" />
<Compile Include="CouchbaseClient.cs" />
<Compile Include="CouchbasePool.cs" />
+ <Compile Include="Operations\ObserveKeyState.cs" />
+ <Compile Include="Operations\ObserveOperation.cs" />
+ <Compile Include="Operations\PersistTo.cs" />
+ <Compile Include="Operations\ReplicateTo.cs" />
<Compile Include="Operations\SyncOperation.cs" />
<Compile Include="PagedView.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Operations\TouchOperation.cs" />
+ <Compile Include="Protocol\CouchbaseOpCode.cs" />
+ <Compile Include="Protocol\ObserveResponse.cs" />
+ <Compile Include="Protocol\ObserveRequest.cs" />
+ <Compile Include="Results\IObserveOperationResult.cs" />
+ <Compile Include="Results\ObserveOperationResult.cs" />
+ <Compile Include="Settings\ObserveSettings.cs" />
<Compile Include="VBucketAwareOperationFactory.cs" />
<Compile Include="WebClientWithTimeout.cs">
<SubType>Component</SubType>
Oops, something went wrong.

0 comments on commit bd8bb2e

Please sign in to comment.