Skip to content

Commit

Permalink
Mapper insert not exits support CSHARP-251
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorge Bay Gondra committed Mar 17, 2015
1 parent 9a15215 commit 3191174
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 32 deletions.
66 changes: 64 additions & 2 deletions src/Cassandra.Tests/Mapping/InsertTests.cs
Expand Up @@ -70,7 +70,7 @@ public void Insert_Poco()
var sessionMock = new Mock<ISession>(MockBehavior.Strict);
sessionMock
.Setup(s => s.ExecuteAsync(It.IsAny<BoundStatement>()))
.Returns(TaskHelper.ToTask(new RowSet()))
.Returns(TestHelper.DelayedTask(new RowSet()))
.Verifiable();
sessionMock
.Setup(s => s.PrepareAsync(It.IsAny<string>()))
Expand Down Expand Up @@ -115,7 +115,7 @@ public void InsertAsync_FluentPoco()
.Verifiable();
sessionMock
.Setup(s => s.PrepareAsync(It.IsAny<string>()))
.Returns<string>(cql => TaskHelper.ToTask(GetPrepared(cql)))
.Returns<string>(cql => TestHelper.DelayedTask(GetPrepared(cql)))
.Verifiable();

// Insert the new user
Expand Down Expand Up @@ -190,5 +190,67 @@ public void Insert_Poco_Returns_WhenResponse_IsReceived()
Assert.True(rowsetReturned);
sessionMock.Verify();
}

[Test]
public void InsertIfNotExists_Poco_AppliedInfo_True_Test()
{
//Just a few props as it is just to test that it runs
var user = TestDataHelper.GetUserList().First();
var newUser = new InsertUser
{
Id = Guid.NewGuid(),
Name = user.Name
};
string query = null;
var sessionMock = new Mock<ISession>(MockBehavior.Strict);
sessionMock
.Setup(s => s.ExecuteAsync(It.IsAny<BoundStatement>()))
.Returns(TestHelper.DelayedTask(TestDataHelper.CreateMultipleValuesRowSet(new [] {"[applied]"}, new [] { true})))
.Callback<BoundStatement>(b => query = b.PreparedStatement.Cql)
.Verifiable();
sessionMock
.Setup(s => s.PrepareAsync(It.IsAny<string>()))
.Returns<string>(cql => TestHelper.DelayedTask(GetPrepared(cql)))
.Verifiable();
var mappingClient = GetMappingClient(sessionMock);
//Execute
var appliedInfo = mappingClient.InsertIfNotExists(newUser);
sessionMock.Verify();
StringAssert.StartsWith("INSERT INTO users (", query);
StringAssert.EndsWith(") IF NOT EXISTS", query);
Assert.True(appliedInfo.Applied);
}

[Test]
public void InsertIfNotExists_Poco_AppliedInfo_False_Test()
{
//Just a few props as it is just to test that it runs
var user = TestDataHelper.GetUserList().First();
var newUser = new InsertUser
{
Id = Guid.NewGuid(),
Name = user.Name
};
string query = null;
var sessionMock = new Mock<ISession>(MockBehavior.Strict);
sessionMock
.Setup(s => s.ExecuteAsync(It.IsAny<BoundStatement>()))
.Returns(TestHelper.DelayedTask(TestDataHelper.CreateMultipleValuesRowSet(new[] { "[applied]", "userid", "name" }, new object[] { false, newUser.Id, "existing-name"})))
.Callback<BoundStatement>(b => query = b.PreparedStatement.Cql)
.Verifiable();
sessionMock
.Setup(s => s.PrepareAsync(It.IsAny<string>()))
.Returns<string>(cql => TestHelper.DelayedTask(GetPrepared(cql)))
.Verifiable();
var mappingClient = GetMappingClient(sessionMock);
//Execute
var appliedInfo = mappingClient.InsertIfNotExists(newUser);
sessionMock.Verify();
StringAssert.StartsWith("INSERT INTO users (", query);
StringAssert.EndsWith(") IF NOT EXISTS", query);
Assert.False(appliedInfo.Applied);
Assert.AreEqual(newUser.Id, appliedInfo.Existing.Id);
Assert.AreEqual("existing-name", appliedInfo.Existing.Name);
}
}
}
12 changes: 11 additions & 1 deletion src/Cassandra.Tests/Mapping/TestData/TestDataHelper.cs
Expand Up @@ -68,7 +68,17 @@ public static RowSet CreateMultipleValuesRowSet<T>(string[] columnNames, T[] gen
for (var i = 0; i < columnNames.Length; i++)
{
IColumnInfo typeInfo;
var typeCode = TypeCodec.GetColumnTypeCodeInfo(typeof(T), out typeInfo);
var type = typeof (T);
if (type == typeof (Object))
{
//Try to guess by value
if (genericValues[i] == null)
{
throw new Exception("Test data could not be generated, value at index " + i + " could not be encoded");
}
type = genericValues[i].GetType();
}
var typeCode = TypeCodec.GetColumnTypeCodeInfo(type, out typeInfo);
rs.Columns[i] =
new CqlColumn { Name = columnNames[i], TypeCode = typeCode, TypeInfo = typeInfo, Type = typeof(T), Index = i };
}
Expand Down
2 changes: 1 addition & 1 deletion src/Cassandra.Tests/TestHelper.cs
Expand Up @@ -142,7 +142,7 @@ public static DateTimeOffset ToMillisecondPrecision(this DateTimeOffset dateTime
return dateTime.Value.ToMillisecondPrecision();
}

public static Task<T> DelayedTask<T>(T result, int dueTimeMs)
public static Task<T> DelayedTask<T>(T result, int dueTimeMs = 50)
{
var tcs = new TaskCompletionSource<T>();
var timer = new Timer(delegate(object self)
Expand Down
24 changes: 1 addition & 23 deletions src/Cassandra/Data/Linq/CqlConditionalCommand.cs
Expand Up @@ -48,33 +48,11 @@ public new Task<AppliedInfo<TEntity>> ExecuteAsync()
this.CopyQueryPropertiesTo(stmt);
return session
.ExecuteAsync(stmt)
.Continue(t => AdaptRowSet(cql, t.Result));
.Continue(t => AppliedInfo<TEntity>.FromRowSet(_mapperFactory, cql, t.Result));
})
.Unwrap();
}

/// <summary>
/// Adapts a LWT RowSet to an AppliedInfo
/// </summary>
private AppliedInfo<TEntity> AdaptRowSet(string cql, RowSet rs)
{
var row = rs.FirstOrDefault();
const string appliedColumn = "[applied]";
if (row == null || row.GetColumn(appliedColumn) == null || row.GetValue<bool>(appliedColumn))
{
//The change was applied correctly
return new AppliedInfo<TEntity>(true);
}
if (rs.Columns.Length == 1)
{
//There isn't more information on why it was not applied
return new AppliedInfo<TEntity>(false);
}
//It was not applied, map the information returned
var mapper = _mapperFactory.GetMapper<TEntity>(cql, rs);
return new AppliedInfo<TEntity>(mapper(row));
}

/// <summary>
/// Executes a conditional query and returns information whether it was applied.
/// </summary>
Expand Down
22 changes: 22 additions & 0 deletions src/Cassandra/Mapping/AppliedInfo.cs
Expand Up @@ -37,5 +37,27 @@ public AppliedInfo(T existing)
Applied = false;
Existing = existing;
}

/// <summary>
/// Adapts a LWT RowSet and returns a new AppliedInfo
/// </summary>
internal static AppliedInfo<T> FromRowSet(MapperFactory mapperFactory, string cql, RowSet rs)
{
var row = rs.FirstOrDefault();
const string appliedColumn = "[applied]";
if (row == null || row.GetColumn(appliedColumn) == null || row.GetValue<bool>(appliedColumn))
{
//The change was applied correctly
return new AppliedInfo<T>(true);
}
if (rs.Columns.Length == 1)
{
//There isn't more information on why it was not applied
return new AppliedInfo<T>(false);
}
//It was not applied, map the information returned
var mapper = mapperFactory.GetMapper<T>(cql, rs);
return new AppliedInfo<T>(mapper(row));
}
}
}
8 changes: 8 additions & 0 deletions src/Cassandra/Mapping/CqlBatch.cs
Expand Up @@ -95,5 +95,13 @@ public void Execute(Cql cql)
{
return _mapperFactory.TypeConverter.ConvertCqlArgument<TValue, TDatabase>(value);
}

/// <summary>
/// Not supported for batches
/// </summary>
public AppliedInfo<T> InsertIfNotExists<T>(T poco, CqlQueryOptions queryOptions = null)
{
throw new NotSupportedException("It is not possible to include lightweight transactions in batches");
}
}
}
6 changes: 6 additions & 0 deletions src/Cassandra/Mapping/ICqlWriteAsyncClient.cs
Expand Up @@ -12,6 +12,12 @@ public interface ICqlWriteAsyncClient
/// </summary>
Task InsertAsync<T>(T poco, CqlQueryOptions queryOptions = null);

/// <summary>
/// Inserts the specified POCO in Cassandra, if not exists.
/// It returns information regarding it was applied or not.
/// </summary>
Task<AppliedInfo<T>> InsertIfNotExistsAsync<T>(T poco, CqlQueryOptions queryOptions = null);

/// <summary>
/// Updates the POCO specified in Cassandra.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/Cassandra/Mapping/ICqlWriteClient.cs
Expand Up @@ -10,6 +10,12 @@ public interface ICqlWriteClient
/// </summary>
void Insert<T>(T poco, CqlQueryOptions queryOptions = null);

/// <summary>
/// Inserts the specified POCO in Cassandra, if not exists.
/// It returns information regarding it was applied or not.
/// </summary>
AppliedInfo<T> InsertIfNotExists<T>(T poco, CqlQueryOptions queryOptions = null);

/// <summary>
/// Updates the POCO specified in Cassandra.
/// </summary>
Expand Down
28 changes: 25 additions & 3 deletions src/Cassandra/Mapping/Mapper.cs
Expand Up @@ -8,9 +8,11 @@
namespace Cassandra.Mapping
{
/// <summary>
/// The default CQL client implementation which uses the DataStax driver ISession provided in the constructor
/// The default CQL client implementation which uses the DataStax driver <see cref="ISession"/> provided in the constructor
/// for running queries against a Cassandra cluster.
/// </summary>
/// <seealso cref="IMapper"/>
/// <inheritdoc />
public class Mapper : IMapper
{
private readonly ISession _session;
Expand Down Expand Up @@ -60,7 +62,6 @@ internal Mapper(ISession session, MapperFactory mapperFactory, StatementFactory
/// </summary>
private Task<TResult> ExecuteAsyncAndAdapt<T, TResult>(Cql cql, Func<Statement, RowSet, TResult> adaptation)
{
_cqlGenerator.AddSelect<T>(cql);
return _statementFactory
.GetStatementAsync(_session, cql)
.Continue(t1 =>
Expand All @@ -84,6 +85,7 @@ public Task<IEnumerable<T>> FetchAsync<T>(string cql, params object[] args)
public Task<IEnumerable<T>> FetchAsync<T>(Cql cql)
{
//Use ExecuteAsyncAndAdapt with a delegate to handle the adaptation from RowSet to IEnumerable<T>
_cqlGenerator.AddSelect<T>(cql);
return ExecuteAsyncAndAdapt<T, IEnumerable<T>>(cql, (s, rs) =>
{
var mapper = _mapperFactory.GetMapper<T>(cql.Statement, rs);
Expand All @@ -98,6 +100,7 @@ public Task<IPage<T>> FetchPageAsync<T>(Cql cql)
throw new ArgumentNullException("cql");
}
cql.AutoPage = true;
_cqlGenerator.AddSelect<T>(cql);
return ExecuteAsyncAndAdapt<T, IPage<T>>(cql, (stmt, rs) =>
{
var mapper = _mapperFactory.GetMapper<T>(cql.Statement, rs);
Expand All @@ -122,6 +125,7 @@ public Task<T> SingleAsync<T>(string cql, params object[] args)

public Task<T> SingleAsync<T>(Cql cql)
{
_cqlGenerator.AddSelect<T>(cql);
return ExecuteAsyncAndAdapt<T, T>(cql, (s, rs) =>
{
var mapper = _mapperFactory.GetMapper<T>(cql.Statement, rs);
Expand All @@ -136,6 +140,7 @@ public Task<T> SingleOrDefaultAsync<T>(string cql, params object[] args)

public Task<T> SingleOrDefaultAsync<T>(Cql cql)
{
_cqlGenerator.AddSelect<T>(cql);
return ExecuteAsyncAndAdapt<T, T>(cql, (s, rs) =>
{
var row = rs.SingleOrDefault();
Expand All @@ -156,6 +161,7 @@ public Task<T> FirstAsync<T>(string cql, params object[] args)

public Task<T> FirstAsync<T>(Cql cql)
{
_cqlGenerator.AddSelect<T>(cql);
return ExecuteAsyncAndAdapt<T, T>(cql, (s, rs) =>
{
var row = rs.First();
Expand All @@ -172,7 +178,7 @@ public Task<T> FirstOrDefaultAsync<T>(string cql, params object[] args)

public Task<T> FirstOrDefaultAsync<T>(Cql cql)
{

_cqlGenerator.AddSelect<T>(cql);
return ExecuteAsyncAndAdapt<T, T>(cql, (s, rs) =>
{
var row = rs.FirstOrDefault();
Expand All @@ -196,6 +202,17 @@ public Task InsertAsync<T>(T poco, CqlQueryOptions queryOptions = null)
return ExecuteAsync(Cql.New(cql, values, queryOptions ?? CqlQueryOptions.None));
}

public Task<AppliedInfo<T>> InsertIfNotExistsAsync<T>(T poco, CqlQueryOptions queryOptions = null)
{
// Get statement and bind values from POCO
var cql = _cqlGenerator.GenerateInsert<T>(true);
var getBindValues = _mapperFactory.GetValueCollector<T>(cql);
var values = getBindValues(poco);
return ExecuteAsyncAndAdapt<T, AppliedInfo<T>>(
Cql.New(cql, values, queryOptions ?? CqlQueryOptions.None),
(stmt, rs) => AppliedInfo<T>.FromRowSet(_mapperFactory, cql, rs));
}

public Task UpdateAsync<T>(T poco, CqlQueryOptions queryOptions = null)
{
// Get statement and bind values from POCO
Expand Down Expand Up @@ -387,6 +404,11 @@ public void Insert<T>(T poco, CqlQueryOptions queryOptions = null)
TaskHelper.WaitToComplete(InsertAsync(poco, queryOptions), _queryAbortTimeout);
}

public AppliedInfo<T> InsertIfNotExists<T>(T poco, CqlQueryOptions queryOptions = null)
{
return TaskHelper.WaitToComplete(InsertIfNotExistsAsync(poco, queryOptions), _queryAbortTimeout);
}

public void Update<T>(T poco, CqlQueryOptions queryOptions = null)
{
//Wait async method to be completed or throw
Expand Down
4 changes: 2 additions & 2 deletions src/Cassandra/Mapping/Statements/CqlGenerator.cs
Expand Up @@ -76,7 +76,7 @@ private static string Escape(string identifier, PocoData pocoData)
/// <summary>
/// Generates an "INSERT INTO tablename (columns) VALUES (?)" statement for a POCO of Type T.
/// </summary>
public string GenerateInsert<T>()
public string GenerateInsert<T>(bool ifNotExists = false)
{
var pocoData = _pocoDataFactory.GetPocoData<T>();

Expand All @@ -85,7 +85,7 @@ public string GenerateInsert<T>()

var columns = pocoData.Columns.Select(Escape(pocoData)).ToCommaDelimitedString();
var placeholders = Enumerable.Repeat("?", pocoData.Columns.Count).ToCommaDelimitedString();
return string.Format("INSERT INTO {0} ({1}) VALUES ({2})", Escape(pocoData.TableName, pocoData), columns, placeholders);
return string.Format("INSERT INTO {0} ({1}) VALUES ({2}){3}", Escape(pocoData.TableName, pocoData), columns, placeholders, ifNotExists ? " IF NOT EXISTS" : null);
}

/// <summary>
Expand Down

0 comments on commit 3191174

Please sign in to comment.