Skip to content

Commit

Permalink
NCBC-1066: Support for async K/V method over TLS
Browse files Browse the repository at this point in the history
Motivation
----------
The async Key/Value methods for async operations were not implemented if
UseSsl was enabled.

Modifications
-------------
Added implementation for SSL/TLS to SslConnection for async memcached
requests.

Results
-------
The client now supports async operation while encrypting/decrypting
communication between cluster and app servers.

Change-Id: I4febf4e45a2a4472f32faac40a8f3f7528575061
Reviewed-on: http://review.couchbase.org/58710
Reviewed-by: Jeffry Morris <jeffrymorris@gmail.com>
Tested-by: Jeffry Morris <jeffrymorris@gmail.com>
  • Loading branch information
jeffrymorris committed Jan 26, 2016
1 parent fca6c6b commit b6b5de3
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@
</ItemGroup>
<ItemGroup>
<Compile Include="CouchbaseBucket_KeyValue_Tests.cs" />
<Compile Include="CouchbaseBucket_Ssl_Tests.cs">
<SubType>Code</SubType>
</Compile>
<Compile Include="GlobalSetup.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Utils\TestConfiguration.cs" />
Expand Down
193 changes: 193 additions & 0 deletions Src/Couchbase.IntegrationTests/CouchbaseBucket_Ssl_Tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@

using Couchbase.Core;
using Couchbase.IO;
using NUnit.Framework;

namespace Couchbase.IntegrationTests
{
[TestFixture]
public class CouchbaseBucketSslTests
{
private ICluster _cluster;
private IBucket _bucket;

[TestFixtureSetUp]
public void TestFixtureSetUp()
{
_cluster = new Cluster(Utils.TestConfiguration.GetConfiguration("ssl"));
_bucket = _cluster.OpenBucket();
}

[Test]
public async void Test_GetAsync()
{
var key = "thekey";
var value = "thevalue";

await _bucket.RemoveAsync(key);
await _bucket.InsertAsync(key, value);
var result = await _bucket.GetAsync<string>(key);
Assert.AreEqual(ResponseStatus.Success, result.Status);
}

[Test]
public async void Test_UpsertAsync()
{
var key = "thekey";
var value = "thevalue";

//await _bucket.RemoveAsync(key);
var result = await _bucket.UpsertAsync(key, value);
Assert.AreEqual(ResponseStatus.Success, result.Status);
}

[Test]
public async void Test_InsertAsync()
{
var key = "thekey";
var value = "thevalue";

await _bucket.RemoveAsync(key);
var result = await _bucket.InsertAsync(key, value);
Assert.AreEqual(ResponseStatus.Success, result.Status);
}

[Test]
public async void Test_RemoveAsync()
{
var key = "thekey";
var value = "thevalue";

await _bucket.RemoveAsync(key);
var result = await _bucket.GetAsync<string>(key);
Assert.AreEqual(ResponseStatus.KeyNotFound, result.Status);
}

[Test]
public void Test_Get()
{
var key = "thekey";
var value = "thevalue";

_bucket.Remove(key);
_bucket.Insert(key, value);
var result = _bucket.Get<string>(key);
Assert.AreEqual(ResponseStatus.Success, result.Status);
}

[Test]
public void Test_Upsert()
{
var key = "thekey";
var value = "thevalue";

_bucket.Remove(key);
var result = _bucket.Upsert(key, value);
Assert.AreEqual(ResponseStatus.Success, result.Status);
}

[Test]
public void Test_Insert()
{
var key = "thekey";
var value = "thevalue";

_bucket.Remove(key);
var result = _bucket.Insert(key, value);
Assert.AreEqual(ResponseStatus.Success, result.Status);
}

[Test]
public void Test_Remove()
{
var key = "thekey";
var value = "thevalue";

_bucket.Remove(key);
var result = _bucket.Get<string>(key);
Assert.AreEqual(ResponseStatus.KeyNotFound, result.Status);
}

[Test]
public void Insert_When_Buffer_Is_Smaller_Than_Payload_Return_Success()
{
var key = "thekey";
var value = new string[1024*17];//default buffer is ~16kb

_bucket.Remove(key);
var result = _bucket.Insert(key, value);
Assert.AreEqual(ResponseStatus.Success, result.Status);
}

[Test]
public async void InsertAsync_When_Buffer_Is_Smaller_Than_Payload_Return_Success()
{
var key = "thekey";
var value = new string[1024 * 17];//default buffer is ~16kb

await _bucket.RemoveAsync(key);
var result = await _bucket.InsertAsync(key, value);
Assert.AreEqual(ResponseStatus.Success, result.Status);
}

[Test]
public void Insert_And_Get_When_Buffer_Is_Smaller_Than_Payload_Return_Success()
{
var key = "thekey";
var value = new string[1024 * 17];//default buffer is ~16kb

_bucket.Remove(key);
var result = _bucket.Insert(key, value);
Assert.AreEqual(ResponseStatus.Success, result.Status);

result = _bucket.Get<string[]>(key);
Assert.AreEqual(ResponseStatus.Success, result.Status);
Assert.AreEqual(value, result.Value);
}

[Test]
public async void InsertAsync_And_GetAsync_When_Buffer_Is_Smaller_Than_Payload_Return_Success()
{
var key = "thekey";
var value = new string[1024 * 17];//default buffer is ~16kb

_bucket.Remove(key);
var result = await _bucket.InsertAsync(key, value);
Assert.AreEqual(ResponseStatus.Success, result.Status);

result = await _bucket.GetAsync<string[]>(key);
Assert.AreEqual(ResponseStatus.Success, result.Status);
Assert.AreEqual(value, result.Value);
}

[TestFixtureTearDown]
public void TestFixtureTearDown()
{
_cluster.CloseBucket(_bucket);
_cluster.Dispose();
}
}
}

#region [ License information ]

/* ************************************************************
*
* @author Couchbase <info@couchbase.com>
* @copyright 2015 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ************************************************************/

#endregion
2 changes: 1 addition & 1 deletion Src/Couchbase/Configuration/Client/ClientConfiguration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public ClientConfiguration(CouchbaseClientSection section)
MinSize = section.ConnectionPool.MinSize,
WaitTimeout = section.ConnectionPool.WaitTimeout,
ShutdownTimeout = section.ConnectionPool.ShutdownTimeout,
UseSsl = section.ConnectionPool.UseSsl,
UseSsl = UseSsl ? UseSsl : section.ConnectionPool.UseSsl,
BufferSize = section.ConnectionPool.BufferSize,
BufferAllocator = (p) => new BufferAllocator(p.MaxSize * p.BufferSize, p.BufferSize),
ConnectTimeout = section.ConnectionPool.ConnectTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,32 @@ namespace Couchbase.Configuration.Client.Providers
/// </summary>
public class ConnectionPoolElement : ConfigurationElement
{
private const string DefaultTypeName =
"Couchbase.IO.ConnectionPool`1[Couchbase.IO.Connection], Couchbase.NetClient";

private const string DefaultSslTypeName =
"Couchbase.IO.ConnectionPool`1[Couchbase.IO.SslConnection], Couchbase.NetClient";

/// <summary>
/// Gets or sets the <see cref="Type"/> of the custom <see cref="IConnectionPool"/>
/// </summary>
/// <value>
/// The type.
/// </value>
[ConfigurationProperty("type", DefaultValue = "Couchbase.IO.ConnectionPool`1[Couchbase.IO.Connection], Couchbase.NetClient", IsRequired = false, IsKey = false)]
[ConfigurationProperty("type", DefaultValue = DefaultTypeName, IsRequired = false, IsKey = false)]
public string Type
{
get { return (string)this["type"]; }
get
{
var typeName = (string) this["type"];

//if ssl is enabled and no custom type is being used, default to the SslConnection class.
if (UseSsl && typeName.Equals(DefaultTypeName))
{
typeName = DefaultSslTypeName;
}
return typeName;
}
set { this["type"] = value; }
}

Expand Down
65 changes: 65 additions & 0 deletions Src/Couchbase/IO/SslConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Net.Sockets;
using System.Security.Authentication;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using Couchbase.Core.Diagnostics;
using Couchbase.IO.Converters;
using Couchbase.IO.Operations;
Expand Down Expand Up @@ -53,6 +54,70 @@ public override void Authenticate()
}
}

public override async void SendAsync(byte[] request, Func<SocketAsyncState, Task> callback)
{
SocketAsyncState state = null;
byte[] buffer = null;
try
{
state = new SocketAsyncState
{
Data = new MemoryStream(),
Opaque = Converter.ToUInt32(request, HeaderIndexFor.Opaque),
Buffer = request,
Completed = callback
};

await _sslStream.WriteAsync(request, 0, request.Length);

state.Buffer = BufferManager.TakeBuffer(Configuration.BufferSize);
state.BytesReceived = await _sslStream.ReadAsync(state.Buffer, 0, Configuration.BufferSize);

//write the received buffer to the state obj
await state.Data.WriteAsync(state.Buffer, 0, state.BytesReceived);

state.BodyLength = Converter.ToInt32(state.Buffer, HeaderIndexFor.BodyLength);
while (state.BytesReceived < state.BodyLength + 24)
{
var bufferLength = state.Buffer.Length - state.BytesSent < Configuration.BufferSize
? state.Buffer.Length - state.BytesSent
: Configuration.BufferSize;

state.BytesReceived += await _sslStream.ReadAsync(state.Buffer, 0, bufferLength);
await state.Data.WriteAsync(state.Buffer, 0, state.BytesReceived - (int)state.Data.Length);
}
await callback(state);
}
catch (Exception e)
{
IsDead = true;
if (state == null)
{
callback(new SocketAsyncState
{
Exception = e,
Status = (e is SocketException)
? ResponseStatus.TransportFailure
: ResponseStatus.ClientFailure
});
}
else
{
state.Exception = e;
state.Completed(state);
Log.Debug(e);
}
}
finally
{
ConnectionPool.Release(this);
if (buffer != null)
{
BufferManager.ReturnBuffer(buffer);
}
}
}

public override byte[] Send(byte[] buffer)
{
var state = new SocketAsyncState
Expand Down

0 comments on commit b6b5de3

Please sign in to comment.