Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added support for atomic batch operations through API for #95

  • Loading branch information...
commit 00d167725466e28f168bc00770728ffe61e7624e 1 parent bfa48ac
@nberardi nberardi authored
View
17 src/CassandraContext.cs
@@ -241,9 +241,9 @@ public string DescribeClusterName()
}));
}
- public string DescribeVersion()
+ public Version DescribeVersion()
{
- return ExecuteOperation(new SimpleOperation<string>(ctx => {
+ return ExecuteOperation(new SimpleOperation<Version>(ctx => {
return ctx.Session.GetClient(setKeyspace: false).describe_version();
}));
}
@@ -281,7 +281,9 @@ public void Attach(IFluentRecord record)
/// <summary>
/// Saves the pending changes.
/// </summary>
- public void SaveChanges()
+ /// <param name="atomic">in the database sense that if any part of the batch succeeds, all of it will. No other guarantees are implied; in particular, there is no isolation; other clients will be able to read the first updated rows from the batch, while others are in progress</param>
+ /// <seealso href="http://www.datastax.com/dev/blog/atomic-batches-in-cassandra-1-2"/>
+ public void SaveChanges(bool atomic = true)
@eplowe
eplowe added a note

Just a suggestion: Should this really be defaulted to true? I think we should let the user explicitly tell us that they want a transaction vs. just doing it outright. I know that atomic batches are now the default when you issue a BEGIN BATCH / APPLY BATCH CQL command -- but this is mostly for people who are using FluentCassandra already and have a certain expectation of performance with their apps as atomic batches, per datastax, incur a ~30% performance penalty.

@nberardi Owner
nberardi added a note

Keeping in line with the server, and it is on default in CQL. Plus I like to design frameworks as safe by default, so that new developers, don't inadvertently fall in to traps they shouldn't.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
{
lock (_trackers)
{
@@ -290,7 +292,7 @@ public void SaveChanges()
foreach (var tracker in _trackers)
mutations.AddRange(tracker.GetMutations());
- var op = new BatchMutate(mutations);
+ var op = new BatchMutate(mutations, atomic);
ExecuteOperation(op);
foreach (var tracker in _trackers)
@@ -304,11 +306,14 @@ public void SaveChanges()
///
/// </summary>
/// <param name="record"></param>
- public void SaveChanges(IFluentRecord record)
+ /// <param name="atomic">in the database sense that if any part of the batch succeeds, all of it will. No other guarantees are implied; in particular, there is no isolation; other clients will be able to read the first updated rows from the batch, while others are in progress</param>
+ /// <seealso href="http://www.datastax.com/dev/blog/atomic-batches-in-cassandra-1-2"/>
+ public void SaveChanges(IFluentRecord record, bool atomic = true)
{
var tracker = record.MutationTracker;
var mutations = tracker.GetMutations();
- var op = new BatchMutate(mutations);
+
+ var op = new BatchMutate(mutations, atomic);
ExecuteOperation(op);
tracker.Clear();
View
1  src/FluentCassandra.csproj
@@ -215,6 +215,7 @@
<Compile Include="Operations\QueryableColumnFamilyOperation.cs" />
<Compile Include="Operations\CassandraRangeSlicePredicate.cs" />
<Compile Include="Operations\Remove.cs" />
+ <Compile Include="Operations\RpcApiVersion.cs" />
<Compile Include="Operations\SimpleOperation.cs" />
<Compile Include="Operations\Void.cs">
<SubType>Code</SubType>
View
2  src/FluentCassandra.nuspec
@@ -2,7 +2,7 @@
<package>
<metadata>
<id>FluentCassandra</id>
- <version>1.1.10</version>
+ <version>1.2.0</version>
<title>Fluent Cassandra</title>
<authors>Nick Berardi</authors>
<owners>Managed Fusion, LLC</owners>
View
35 src/Operations/BatchMutate.cs
@@ -7,11 +7,8 @@ namespace FluentCassandra.Operations
{
public class BatchMutate : Operation<Void>
{
- /*
- * batch_mutate(keyspace, mutation_map, consistency_level)
- */
-
public IEnumerable<FluentMutation> Mutations { get; private set; }
+ public bool Atomic { get; private set; }
public override Void Execute()
{
@@ -20,12 +17,10 @@ public override Void Execute()
var mutationMap = new Dictionary<byte[], Dictionary<string, List<Mutation>>>();
- foreach (var key in Mutations.GroupBy(x => x.Column.Family.Key))
- {
+ foreach (var key in Mutations.GroupBy(x => x.Column.Family.Key)) {
var keyMutations = new Dictionary<string, List<Mutation>>();
- foreach (var columnFamily in key.GroupBy(x => x.Column.Family.FamilyName))
- {
+ foreach (var columnFamily in key.GroupBy(x => x.Column.Family.FamilyName)) {
var columnFamilyMutations = columnFamily
.Where(m => m.Type == MutationType.Added || m.Type == MutationType.Changed)
.Select(m => Helper.CreateInsertedOrChangedMutation(m))
@@ -49,18 +44,30 @@ public override Void Execute()
mutationMap.Add(key.Key.TryToBigEndian(), keyMutations);
}
- // Dictionary<string : key, Dicationary<string : columnFamily, List<Mutation>>>
- Session.GetClient().batch_mutate(
- mutationMap,
- Session.WriteConsistency
- );
+ var client = Session.GetClient();
+
+ if (Atomic && client.describe_version() >= RpcApiVersion.Cassandra120)
+ client.atomic_batch_mutate(
+ mutationMap,
+ Session.WriteConsistency);
+ else
+ client.batch_mutate(
+ mutationMap,
+ Session.WriteConsistency);
return new Void();
}
- public BatchMutate(IEnumerable<FluentMutation> mutations)
+ /// <summary>
+ ///
+ /// </summary>
+ /// <param name="mutations"></param>
+ /// <param name="atomic">in the database sense that if any part of the batch succeeds, all of it will. No other guarantees are implied; in particular, there is no isolation; other clients will be able to read the first updated rows from the batch, while others are in progress</param>
+ /// <seealso href="http://www.datastax.com/dev/blog/atomic-batches-in-cassandra-1-2"/>
+ public BatchMutate(IEnumerable<FluentMutation> mutations, bool atomic)
{
Mutations = mutations;
+ Atomic = atomic;
}
}
}
View
4 src/Operations/CassandraClientWrapper.cs
@@ -147,9 +147,9 @@ public string describe_cluster_name()
return _client.describe_cluster_name();
}
- public string describe_version()
+ public Version describe_version()
{
- return _client.describe_version();
+ return new Version(_client.describe_version());
}
public List<Apache.Cassandra.TokenRange> describe_ring(string keyspace)
View
9 src/Operations/RpcApiVersion.cs
@@ -0,0 +1,9 @@
+using System;
+
+namespace FluentCassandra.Operations
+{
+ public static class RpcApiVersion
+ {
+ public static readonly Version Cassandra120 = new Version("19.35.0");
+ }
+}

0 comments on commit 00d1677

Please sign in to comment.
Something went wrong with that request. Please try again.