Skip to content
This repository has been archived by the owner on May 25, 2021. It is now read-only.

Commit

Permalink
rounded up a few things for issue #40
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Berardi committed Jun 20, 2012
1 parent 38b1243 commit 6851535
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 31 deletions.
16 changes: 16 additions & 0 deletions src/Connections/CassandraConnectionException.cs
@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace FluentCassandra.Connections
{
public class CassandraConnectionException : CassandraException
{
public CassandraConnectionException(string message)
: base(message) { }

public CassandraConnectionException(string message, Exception innerException)
: base(message, innerException) { }
}
}
102 changes: 72 additions & 30 deletions src/Connections/Connection.cs
Expand Up @@ -12,35 +12,49 @@ namespace FluentCassandra.Connections
/// <see href="http://github.com/robconery/NoRM/tree/master/NoRM/Connections/"/> /// <see href="http://github.com/robconery/NoRM/tree/master/NoRM/Connections/"/>
public class Connection : IConnection, IDisposable public class Connection : IConnection, IDisposable
{ {
private readonly ConnectionType _connectionType; private TTransport _transport;
private readonly int _bufferSize; private Cassandra.Client _client;

private readonly object _lock = new object();
private readonly TTransport _transport;
private readonly TProtocol _protocol;
private readonly Cassandra.Client _client;


/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
/// <param name="server"></param>
/// <param name="builder"></param> /// <param name="builder"></param>
internal Connection(Server server, IConnectionBuilder builder) public Connection(Server server, IConnectionBuilder builder)
{ : this(server, builder.ConnectionType, builder.BufferSize) { }
_connectionType = builder.ConnectionType;
_bufferSize = builder.BufferSize;


/// <summary>
///
/// </summary>
/// <param name="server"></param>
/// <param name="connectionType"></param>
/// <param name="bufferSize"></param>
public Connection(Server server, ConnectionType connectionType, int bufferSize)
{
Created = DateTime.UtcNow; Created = DateTime.UtcNow;
Server = server; Server = server;
ConnectionType = connectionType;
BufferSize = bufferSize;


var socket = new TSocket(server.Host, server.Port, server.Timeout * 1000); InitTransportAndClient();
}

/// <summary>
///
/// </summary>
private void InitTransportAndClient()
{
var socket = new TSocket(Server.Host, Server.Port, Server.Timeout * 1000);


switch (_connectionType) switch (ConnectionType)
{ {
case ConnectionType.Simple: case ConnectionType.Simple:
_transport = socket; _transport = socket;
break; break;


case ConnectionType.Buffered: case ConnectionType.Buffered:
_transport = new TBufferedTransport(socket, _bufferSize); _transport = new TBufferedTransport(socket, BufferSize);
break; break;


case ConnectionType.Framed: case ConnectionType.Framed:
Expand All @@ -51,8 +65,8 @@ internal Connection(Server server, IConnectionBuilder builder)
goto case ConnectionType.Framed; goto case ConnectionType.Framed;
} }


_protocol = new TBinaryProtocol(_transport); var protocol = new TBinaryProtocol(_transport);
_client = new Cassandra.Client(_protocol); _client = new Cassandra.Client(protocol);
} }


/// <summary> /// <summary>
Expand All @@ -73,6 +87,24 @@ public Server Server
private set; private set;
} }


/// <summary>
///
/// </summary>
public ConnectionType ConnectionType
{
get;
private set;
}

/// <summary>
///
/// </summary>
public int BufferSize
{
get;
private set;
}

/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
Expand All @@ -83,11 +115,8 @@ public bool IsOpen
if (_transport == null) if (_transport == null)
return false; return false;


lock (_transport) lock (_lock)
{ return _transport.IsOpen;
try { return _transport.IsOpen; }
catch { return false; }
}
} }
} }


Expand All @@ -101,7 +130,10 @@ public void Open()
if (IsOpen) if (IsOpen)
return; return;


lock (_transport) if (_transport == null)
InitTransportAndClient();

lock (_lock)
_transport.Open(); _transport.Open();
} }


Expand All @@ -115,8 +147,12 @@ public void Close()
if (!IsOpen) if (!IsOpen)
return; return;


lock (_transport) lock (_lock)
{
_transport.Close(); _transport.Close();
_transport = null;
_client = null;
}
} }


/// <summary> /// <summary>
Expand All @@ -125,6 +161,11 @@ public void Close()
/// <param name="keyspace"></param> /// <param name="keyspace"></param>
public void SetKeyspace(string keyspace) public void SetKeyspace(string keyspace)
{ {
CheckWasDisposed();

if (!IsOpen)
throw new CassandraConnectionException("A connection to Cassandra has not been opened.");

Client.set_keyspace(keyspace); Client.set_keyspace(keyspace);
} }


Expand All @@ -134,6 +175,11 @@ public void SetKeyspace(string keyspace)
/// <param name="cqlVersion"></param> /// <param name="cqlVersion"></param>
public void SetCqlVersion(string cqlVersion) public void SetCqlVersion(string cqlVersion)
{ {
CheckWasDisposed();

if (!IsOpen)
throw new CassandraConnectionException("A connection to Cassandra has not been opened.");

Client.set_cql_version(cqlVersion); Client.set_cql_version(cqlVersion);
} }


Expand All @@ -142,10 +188,10 @@ public void SetCqlVersion(string cqlVersion)
/// </summary> /// </summary>
public Cassandra.Client Client public Cassandra.Client Client
{ {
get get
{ {
CheckWasDisposed(); lock(_lock)
return _client; return _client;
} }
} }


Expand Down Expand Up @@ -195,10 +241,6 @@ protected virtual void Dispose(bool disposing)
if (!WasDisposed && disposing && _transport != null) if (!WasDisposed && disposing && _transport != null)
{ {
Close(); Close();

_client = null;
_protocol = null;
_transport = null;
} }


WasDisposed = true; WasDisposed = true;
Expand Down
1 change: 1 addition & 0 deletions src/FluentCassandra.csproj
Expand Up @@ -106,6 +106,7 @@
<Compile Include="CassandraSuperColumnFamilyOperations.cs" /> <Compile Include="CassandraSuperColumnFamilyOperations.cs" />
<Compile Include="CassandraSuperColumnFamily.cs" /> <Compile Include="CassandraSuperColumnFamily.cs" />
<Compile Include="CassandraTokenRange.cs" /> <Compile Include="CassandraTokenRange.cs" />
<Compile Include="Connections\CassandraConnectionException.cs" />
<Compile Include="Connections\Connection.cs" /> <Compile Include="Connections\Connection.cs" />
<Compile Include="Connections\ConnectionBuilder.cs" /> <Compile Include="Connections\ConnectionBuilder.cs" />
<Compile Include="Connections\ConnectionProvider.cs" /> <Compile Include="Connections\ConnectionProvider.cs" />
Expand Down
2 changes: 1 addition & 1 deletion test/FluentCassandra.Tests/FluentCassandra.Tests.csproj
Expand Up @@ -53,7 +53,7 @@
<Compile Include="Bugs\Issue28GuidGeneratorInParallelContext.cs" /> <Compile Include="Bugs\Issue28GuidGeneratorInParallelContext.cs" />
<Compile Include="Bugs\Issue25JavaBigDecimalBinaryConversion.cs" /> <Compile Include="Bugs\Issue25JavaBigDecimalBinaryConversion.cs" />
<Compile Include="Bugs\Issue36KeyAliasSupport.cs" /> <Compile Include="Bugs\Issue36KeyAliasSupport.cs" />
<Compile Include="Bugs\IssueXXCompositeTypeAsKey.cs" /> <Compile Include="Bugs\Issue39CompositeTypeAsKey.cs" />
<Compile Include="CassandraDatabaseSetupFixture.cs" /> <Compile Include="CassandraDatabaseSetupFixture.cs" />
<Compile Include="CassandraQueryTest.cs" /> <Compile Include="CassandraQueryTest.cs" />
<Compile Include="Connections\ConnectionProviderTests.cs" /> <Compile Include="Connections\ConnectionProviderTests.cs" />
Expand Down

0 comments on commit 6851535

Please sign in to comment.