Skip to content
This repository has been archived by the owner on May 25, 2021. It is now read-only.

Commit

Permalink
got rid of the CassandraSession.Current since it was an anti-pattern …
Browse files Browse the repository at this point in the history
…that didn't enforce best practices, and caused issues in threading environments
  • Loading branch information
Nick Berardi committed May 25, 2012
1 parent 8876f7e commit bfe118e
Show file tree
Hide file tree
Showing 18 changed files with 192 additions and 198 deletions.
35 changes: 2 additions & 33 deletions src/BaseCassandraColumnFamily.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace FluentCassandra
/// <seealso href="http://wiki.apache.org/cassandra/API"/> /// <seealso href="http://wiki.apache.org/cassandra/API"/>
public abstract class BaseCassandraColumnFamily public abstract class BaseCassandraColumnFamily
{ {
private CassandraContext _context; private readonly CassandraContext _context;


/// <summary> /// <summary>
/// ///
Expand All @@ -21,7 +21,6 @@ public BaseCassandraColumnFamily(CassandraContext context, string columnFamily)
{ {
_context = context; _context = context;
FamilyName = columnFamily; FamilyName = columnFamily;
ThrowErrors = context.ThrowErrors;
} }


/// <summary> /// <summary>
Expand All @@ -34,16 +33,6 @@ public BaseCassandraColumnFamily(CassandraContext context, string columnFamily)
/// </summary> /// </summary>
public string FamilyName { get; private set; } public string FamilyName { get; private set; }


/// <summary>
/// The last error that occured during the execution of an operation.
/// </summary>
public CassandraException LastError { get; private set; }

/// <summary>
/// Indicates if errors should be thrown when occuring on opperation.
/// </summary>
public bool ThrowErrors { get; set; }

/// <summary> /// <summary>
/// Verifies that the family passed in is part of this family. /// Verifies that the family passed in is part of this family.
/// </summary> /// </summary>
Expand Down Expand Up @@ -109,29 +98,9 @@ public void RemoveAllRows()
/// <returns></returns> /// <returns></returns>
public TResult ExecuteOperation<TResult>(ColumnFamilyOperation<TResult> action, bool? throwOnError = null) public TResult ExecuteOperation<TResult>(ColumnFamilyOperation<TResult> action, bool? throwOnError = null)
{ {
if (!throwOnError.HasValue)
throwOnError = ThrowErrors;

var localSession = CassandraSession.Current == null;
var session = CassandraSession.Current;
if (session == null)
session = _context.OpenSession();

action.Context = _context;
action.ColumnFamily = this; action.ColumnFamily = this;


try return _context.ExecuteOperation(action, throwOnError);
{
var result = session.ExecuteOperation(action, throwOnError);
LastError = session.LastError;

return result;
}
finally
{
if (localSession && session != null)
session.Dispose();
}
} }


public CassandraSlicePredicateQuery<TResult> CreateCassandraSlicePredicateQuery<TResult>(Expression expression) public CassandraSlicePredicateQuery<TResult> CreateCassandraSlicePredicateQuery<TResult>(Expression expression)
Expand Down
7 changes: 0 additions & 7 deletions src/CassandraColumnFamilyOperations.cs
Original file line number Original file line Diff line number Diff line change
@@ -1,6 +1,5 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using FluentCassandra.Linq;
using FluentCassandra.Operations; using FluentCassandra.Operations;
using FluentCassandra.Types; using FluentCassandra.Types;


Expand Down Expand Up @@ -128,11 +127,5 @@ public static CassandraSlicePredicateQuery<FluentColumnFamily> Get(this Cassandr
} }


#endregion #endregion

public static IEnumerable<ICqlRow> Cql(this CassandraColumnFamily family, UTF8Type cqlQuery)
{
var op = new ExecuteCqlQuery(cqlQuery, family.Context.ConnectionBuilder.CompressCqlQueries);
return family.ExecuteOperation(op);
}
} }
} }
61 changes: 28 additions & 33 deletions src/CassandraContext.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ namespace FluentCassandra
{ {
public class CassandraContext : IDisposable public class CassandraContext : IDisposable
{ {
private IList<IFluentMutationTracker> _trackers; private readonly IList<IFluentMutationTracker> _trackers;
private readonly ConnectionBuilder _connectionBuilder;
private CassandraSession _session;


/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
/// <param name="keyspace"></param> /// <param name="keyspace"></param>
/// <param name="server"></param> /// <param name="server"></param>
/// <param name="timeout"></param> /// <param name="timeout"></param>
public CassandraContext(string keyspace, Server server, string username = null, string password = null) public CassandraContext(string keyspace, Server server)
: this(keyspace, server.Host, server.Port, server.Timeout, username, password) { } : this(keyspace, server.Host, server.Port, server.Timeout) { }


/// <summary> /// <summary>
/// ///
Expand All @@ -30,8 +32,8 @@ public CassandraContext(string keyspace, Server server, string username = null,
/// <param name="port"></param> /// <param name="port"></param>
/// <param name="timeout"></param> /// <param name="timeout"></param>
/// <param name="provider"></param> /// <param name="provider"></param>
public CassandraContext(string keyspace, string host, int port = Server.DefaultPort, int timeout = Server.DefaultTimeout, string username = null, string password = null) public CassandraContext(string keyspace, string host, int port = Server.DefaultPort, int timeout = Server.DefaultTimeout)
: this(new ConnectionBuilder(keyspace, host, port, timeout, username: username, password: password)) { } : this(new ConnectionBuilder(keyspace, host, port, timeout)) { }


/// <summary> /// <summary>
/// ///
Expand All @@ -40,6 +42,16 @@ public CassandraContext(string keyspace, string host, int port = Server.DefaultP
public CassandraContext(string connectionString) public CassandraContext(string connectionString)
: this(new ConnectionBuilder(connectionString)) { } : this(new ConnectionBuilder(connectionString)) { }


/// <summary>
///
/// </summary>
/// <param name="session"></param>
public CassandraContext(CassandraSession session)
: this(session.ConnectionBuilder)
{
_session = session;
}

/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
Expand All @@ -49,9 +61,9 @@ public CassandraContext(ConnectionBuilder connectionBuilder)
ThrowErrors = true; ThrowErrors = true;


_trackers = new List<IFluentMutationTracker>(); _trackers = new List<IFluentMutationTracker>();
_connectionBuilder = connectionBuilder;


ConnectionBuilder = connectionBuilder; Keyspace = new CassandraKeyspace(_connectionBuilder.Keyspace, this);
Keyspace = new CassandraKeyspace(connectionBuilder.Keyspace, this);
} }


/// <summary> /// <summary>
Expand Down Expand Up @@ -265,7 +277,7 @@ public void SaveChanges(IFluentRecord record)
/// <param name="cqlQuery"></param> /// <param name="cqlQuery"></param>
public IEnumerable<ICqlRow> ExecuteQuery(UTF8Type cqlQuery) public IEnumerable<ICqlRow> ExecuteQuery(UTF8Type cqlQuery)
{ {
var op = new ExecuteCqlQuery(cqlQuery, ConnectionBuilder.CompressCqlQueries); var op = new ExecuteCqlQuery(cqlQuery);
return ExecuteOperation(op); return ExecuteOperation(op);
} }


Expand All @@ -275,24 +287,10 @@ public IEnumerable<ICqlRow> ExecuteQuery(UTF8Type cqlQuery)
/// <param name="cqlQuery"></param> /// <param name="cqlQuery"></param>
public void ExecuteNonQuery(UTF8Type cqlQuery) public void ExecuteNonQuery(UTF8Type cqlQuery)
{ {
var op = new ExecuteCqlNonQuery(cqlQuery, ConnectionBuilder.CompressCqlQueries); var op = new ExecuteCqlNonQuery(cqlQuery);
ExecuteOperation(op); ExecuteOperation(op);
} }


/// <summary>
/// Open a session against the database.
/// </summary>
/// <returns></returns>
public CassandraSession OpenSession()
{
return new CassandraSession(ConnectionBuilder);
}

/// <summary>
/// The connection builder that is currently in use for this context.
/// </summary>
public ConnectionBuilder ConnectionBuilder { get; private set; }

/// <summary> /// <summary>
/// The last error that occured during the execution of an operation. /// The last error that occured during the execution of an operation.
/// </summary> /// </summary>
Expand All @@ -315,19 +313,16 @@ public TResult ExecuteOperation<TResult>(Operation<TResult> action, bool? throwO
if (WasDisposed) if (WasDisposed)
throw new ObjectDisposedException(GetType().FullName); throw new ObjectDisposedException(GetType().FullName);


if (!throwOnError.HasValue) var localSession = _session == null;
throwOnError = ThrowErrors; var session = _session;

var localSession = CassandraSession.Current == null;
var session = CassandraSession.Current;
if (session == null) if (session == null)
session = OpenSession(); session = new CassandraSession(_connectionBuilder);


action.Context = this; action.Context = this;


try try
{ {
var result = session.ExecuteOperation(action, throwOnError); var result = session.ExecuteOperation(action, throwOnError ?? ThrowErrors);
LastError = session.LastError; LastError = session.LastError;


return result; return result;
Expand Down Expand Up @@ -367,10 +362,10 @@ public void Dispose()
/// </param> /// </param>
protected virtual void Dispose(bool disposing) protected virtual void Dispose(bool disposing)
{ {
if (!WasDisposed && disposing && CassandraSession.Current != null) if (!WasDisposed && disposing && _session != null)
{ {
CassandraSession.Current.Dispose(); _session.Dispose();
CassandraSession.Current = null; _session = null;
} }


WasDisposed = true; WasDisposed = true;
Expand Down
11 changes: 6 additions & 5 deletions src/CassandraKeyspace.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ namespace FluentCassandra
public class CassandraKeyspace public class CassandraKeyspace
{ {
private readonly string _keyspaceName; private readonly string _keyspaceName;
private readonly CassandraContext _context;

private CassandraKeyspaceSchema _cachedSchema; private CassandraKeyspaceSchema _cachedSchema;
private CassandraContext _context;


public CassandraKeyspace(string keyspaceName, CassandraContext context) public CassandraKeyspace(string keyspaceName, CassandraContext context)
{ {
Expand Down Expand Up @@ -64,8 +65,8 @@ public void TryCreateSelf()
} }
catch(Exception exc) catch(Exception exc)
{ {
Debug.WriteLine(exc); if (_context.ThrowErrors)
Debug.WriteLine(schema.Name + " already exists", "keyspace setup"); throw exc;
} }
} }


Expand All @@ -80,8 +81,8 @@ public void TryCreateColumnFamily(CassandraColumnFamilySchema schema)
} }
catch (Exception exc) catch (Exception exc)
{ {
Debug.WriteLine(exc); if (_context.ThrowErrors)
Debug.WriteLine(schema.FamilyName + " already exists", "column family setup"); throw exc;
} }
} }


Expand Down
72 changes: 48 additions & 24 deletions src/CassandraSession.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -9,36 +9,66 @@ namespace FluentCassandra
{ {
public class CassandraSession : IDisposable public class CassandraSession : IDisposable
{ {
[ThreadStatic] private IConnection _connection;
private static CassandraSession _current;


public static CassandraSession Current /// <summary>
{ ///
get { return _current; } /// </summary>
internal set { _current = value; } /// <param name="keyspace"></param>
} /// <param name="server"></param>
/// <param name="timeout"></param>
public CassandraSession(string keyspace, Server server, string username = null, string password = null)
: this(keyspace, server.Host, server.Port, server.Timeout, username, password) { }


private IConnection _connection; /// <summary>
///
/// </summary>
/// <param name="keyspace"></param>
/// <param name="host"></param>
/// <param name="port"></param>
/// <param name="timeout"></param>
/// <param name="provider"></param>
public CassandraSession(string keyspace, string host, int port = Server.DefaultPort, int timeout = Server.DefaultTimeout, string username = null, string password = null)
: this(new ConnectionBuilder(keyspace, host, port, timeout, username: username, password: password)) { }

/// <summary>
///
/// </summary>
/// <param name="connectionString"></param>
public CassandraSession(string connectionString)
: this(new ConnectionBuilder(connectionString)) { }


/// <summary>
///
/// </summary>
/// <param name="connectionBuilder"></param>
public CassandraSession(ConnectionBuilder connectionBuilder) public CassandraSession(ConnectionBuilder connectionBuilder)
: this(ConnectionProviderFactory.Get(connectionBuilder), connectionBuilder.ReadConsistency, connectionBuilder.WriteConsistency) { } : this(ConnectionProviderFactory.Get(connectionBuilder), connectionBuilder.ReadConsistency, connectionBuilder.WriteConsistency) { }


public CassandraSession(ConnectionBuilder connectionBuilder, ConsistencyLevel read, ConsistencyLevel write) /// <summary>
: this(ConnectionProviderFactory.Get(connectionBuilder), read, write) { } ///

/// </summary>
/// <param name="connectionProvider"></param>
/// <param name="read"></param>
/// <param name="write"></param>
public CassandraSession(IConnectionProvider connectionProvider, ConsistencyLevel read, ConsistencyLevel write) public CassandraSession(IConnectionProvider connectionProvider, ConsistencyLevel read, ConsistencyLevel write)
{ {
if (Current != null) if (connectionProvider == null)
throw new CassandraException("Cannot create a new session while there is one already active."); throw new ArgumentNullException("connectionProvider");


ConnectionBuilder = connectionProvider.ConnectionBuilder;
ConnectionProvider = connectionProvider; ConnectionProvider = connectionProvider;
ReadConsistency = read; ReadConsistency = read;
WriteConsistency = write; WriteConsistency = write;


IsAuthenticated = false; IsAuthenticated = false;
Current = this;
} }


/// <summary>
/// The connection builder that is currently in use for this session.
/// </summary>
public ConnectionBuilder ConnectionBuilder { get; private set; }

/// <summary> /// <summary>
/// ///
/// </summary> /// </summary>
Expand Down Expand Up @@ -66,7 +96,7 @@ public CassandraSession(IConnectionProvider connectionProvider, ConsistencyLevel
/// <returns></returns> /// <returns></returns>
internal CassandraClientWrapper GetClient(bool setKeyspace = true, bool? setCqlVersion = null) internal CassandraClientWrapper GetClient(bool setKeyspace = true, bool? setCqlVersion = null)
{ {
var builder = ConnectionProvider.Builder; var builder = ConnectionProvider.ConnectionBuilder;
setCqlVersion = setCqlVersion ?? (builder.CqlVersion != null); setCqlVersion = setCqlVersion ?? (builder.CqlVersion != null);


if (_connection == null) if (_connection == null)
Expand All @@ -92,7 +122,7 @@ internal CassandraClientWrapper GetClient(bool setKeyspace = true, bool? setCqlV
/// </summary> /// </summary>
public void Login() public void Login()
{ {
var builder = ConnectionProvider.Builder; var builder = ConnectionProvider.ConnectionBuilder;


if (String.IsNullOrWhiteSpace(builder.Username) || String.IsNullOrWhiteSpace(builder.Password)) if (String.IsNullOrWhiteSpace(builder.Username) || String.IsNullOrWhiteSpace(builder.Password))
throw new CassandraException("No username and/or password was set in the connection string, please use Login(username, password) method."); throw new CassandraException("No username and/or password was set in the connection string, please use Login(username, password) method.");
Expand Down Expand Up @@ -191,14 +221,8 @@ public void Dispose()
/// </param> /// </param>
protected virtual void Dispose(bool disposing) protected virtual void Dispose(bool disposing)
{ {
if (!WasDisposed && disposing) if (!WasDisposed && disposing && _connection != null)
{ ConnectionProvider.Close(_connection);
if (_connection != null)
ConnectionProvider.Close(_connection);

if (Current == this)
Current = null;
}


WasDisposed = true; WasDisposed = true;
} }
Expand Down
6 changes: 3 additions & 3 deletions src/Connections/ConnectionBuilder.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public class ConnectionBuilder
/// <param name="host"></param> /// <param name="host"></param>
/// <param name="port"></param> /// <param name="port"></param>
/// <param name="timeout"></param> /// <param name="timeout"></param>
public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = false, string username = null, string password = null) public ConnectionBuilder(string keyspace, string host, int port = Server.DefaultPort, int connectionTimeout = Server.DefaultTimeout, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null)
{ {
Keyspace = keyspace; Keyspace = keyspace;
Servers = new List<Server>() { new Server(host, port) }; Servers = new List<Server>() { new Server(host, port) };
Expand All @@ -35,7 +35,7 @@ public ConnectionBuilder(string keyspace, string host, int port = Server.Default
ConnectionString = GetConnectionString(); ConnectionString = GetConnectionString();
} }


public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = false, string username = null, string password = null) public ConnectionBuilder(string keyspace, Server server, bool pooling = false, int minPoolSize = 0, int maxPoolSize = 100, int connectionLifetime = 0, ConnectionType connectionType = ConnectionType.Framed, int bufferSize = 1024, ConsistencyLevel read = ConsistencyLevel.QUORUM, ConsistencyLevel write = ConsistencyLevel.QUORUM, string cqlVersion = FluentCassandra.Connections.CqlVersion.ServerDefault, bool compressCqlQueries = true, string username = null, string password = null)
{ {
Keyspace = keyspace; Keyspace = keyspace;
Servers = new List<Server>() { server }; Servers = new List<Server>() { server };
Expand Down Expand Up @@ -316,7 +316,7 @@ private void InitializeConnectionString(string connectionString)


if (!pairs.ContainsKey("Compress CQL Queries")) if (!pairs.ContainsKey("Compress CQL Queries"))
{ {
CompressCqlQueries = false; CompressCqlQueries = true;
} }
else else
{ {
Expand Down
Loading

0 comments on commit bfe118e

Please sign in to comment.