Skip to content
This repository has been archived by the owner on May 25, 2021. It is now read-only.

Added support for MultiGetSlice for Counter columns and super columns #101

Merged
merged 4 commits into from Jan 6, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Operations/MultiGetColumnFamilySlice.cs
Expand Up @@ -36,7 +36,9 @@ public override IEnumerable<FluentColumnFamily> Execute()
var key = CassandraObject.GetCassandraObjectFromDatabaseByteArray(result.Key, schema.KeyValueType); var key = CassandraObject.GetCassandraObjectFromDatabaseByteArray(result.Key, schema.KeyValueType);


var r = new FluentColumnFamily(key, ColumnFamily.FamilyName, schema, result.Value.Select(col => { var r = new FluentColumnFamily(key, ColumnFamily.FamilyName, schema, result.Value.Select(col => {
return Helper.ConvertColumnToFluentColumn(col.Column, schema); if(col.Counter_column != null)
return Helper.ConvertColumnToFluentCounterColumn(col.Counter_column, schema);
return Helper.ConvertColumnToFluentColumn(col.Column, schema);
})); }));
ColumnFamily.Context.Attach(r); ColumnFamily.Context.Attach(r);
r.MutationTracker.Clear(); r.MutationTracker.Clear();
Expand Down
7 changes: 6 additions & 1 deletion src/Operations/MultiGetSuperColumnFamilySlice.cs
Expand Up @@ -53,7 +53,12 @@ public override IEnumerable<FluentSuperColumnFamily> Execute()
} }
else else
{ {
superColumns = result.Value.Select(col => Helper.ConvertSuperColumnToFluentSuperColumn(col.Super_column, schema)); superColumns = result.Value.Select(col =>
{
if (col.Counter_super_column != null)
return Helper.ConvertSuperColumnToFluentCounterSuperColumn(col.Counter_super_column, schema);
return Helper.ConvertSuperColumnToFluentSuperColumn(col.Super_column, schema);
});
} }


var familyName = ColumnFamily.FamilyName; var familyName = ColumnFamily.FamilyName;
Expand Down
35 changes: 31 additions & 4 deletions test/FluentCassandra.Tests/CassandraDatabaseSetup.cs
Expand Up @@ -13,8 +13,11 @@ public class CassandraDatabaseSetup
public CassandraContext DB; public CassandraContext DB;


public CassandraColumnFamily<AsciiType> Family; public CassandraColumnFamily<AsciiType> Family;

public CassandraSuperColumnFamily<AsciiType, AsciiType> SuperFamily; public CassandraSuperColumnFamily<AsciiType, AsciiType> SuperFamily;

public CassandraColumnFamily UserFamily; public CassandraColumnFamily UserFamily;
public CassandraColumnFamily CounterFamily;


public User[] Users = new[] { public User[] Users = new[] {
new User { Id = 1, Name = "Darren Gemmell", Email = "darren@somewhere.com", Age = 32 }, new User { Id = 1, Name = "Darren Gemmell", Email = "darren@somewhere.com", Age = 32 },
Expand Down Expand Up @@ -51,6 +54,7 @@ public CassandraDatabaseSetup(bool reset = false)
Family = DB.GetColumnFamily<AsciiType>("Standard"); Family = DB.GetColumnFamily<AsciiType>("Standard");
SuperFamily = DB.GetColumnFamily<AsciiType, AsciiType>("Super"); SuperFamily = DB.GetColumnFamily<AsciiType, AsciiType>("Super");
UserFamily = DB.GetColumnFamily("Users"); UserFamily = DB.GetColumnFamily("Users");
CounterFamily = DB.GetColumnFamily("Counters");


if (exists && !reset) if (exists && !reset)
return; return;
Expand Down Expand Up @@ -82,6 +86,12 @@ public void ResetDatabase()
keyspace.TryCreateColumnFamily<TimeUUIDType>("StandardTimeUUIDType"); keyspace.TryCreateColumnFamily<TimeUUIDType>("StandardTimeUUIDType");
keyspace.TryCreateColumnFamily<UTF8Type>("StandardUTF8Type"); keyspace.TryCreateColumnFamily<UTF8Type>("StandardUTF8Type");
keyspace.TryCreateColumnFamily<UUIDType>("StandardUUIDType"); keyspace.TryCreateColumnFamily<UUIDType>("StandardUUIDType");
keyspace.TryCreateColumnFamily(new CassandraColumnFamilySchema()
{
FamilyName = "Counters",
ColumnNameType = CassandraType.AsciiType,
DefaultColumnValueType = CassandraType.CounterColumnType
});
keyspace.TryCreateColumnFamily(new CassandraColumnFamilySchema { keyspace.TryCreateColumnFamily(new CassandraColumnFamilySchema {
FamilyName = "StandardDecimalType", FamilyName = "StandardDecimalType",
ColumnNameType = CassandraType.DecimalType ColumnNameType = CassandraType.DecimalType
Expand All @@ -95,23 +105,25 @@ public void ResetDatabase()
ColumnNameType = CassandraType.DynamicCompositeType(new Dictionary<char, CassandraType> { { 'a', CassandraType.AsciiType }, { 'd', CassandraType.DoubleType } }) ColumnNameType = CassandraType.DynamicCompositeType(new Dictionary<char, CassandraType> { { 'a', CassandraType.AsciiType }, { 'd', CassandraType.DoubleType } })
}); });


db.ExecuteNonQuery(@" db.ExecuteNonQuery(@"
CREATE COLUMNFAMILY Users ( CREATE COLUMNFAMILY Users (
Id int PRIMARY KEY, Id int PRIMARY KEY,
Name ascii, Name ascii,
Email ascii, Email ascii,
Age int Age int
);"); );");
db.ExecuteNonQuery(@"CREATE INDEX User_Age ON Users (Age);"); db.ExecuteNonQuery(@"CREATE INDEX User_Age ON Users (Age);");
db.Keyspace.ClearCachedKeyspaceSchema(); db.Keyspace.ClearCachedKeyspaceSchema();


var family = db.GetColumnFamily<AsciiType>("Standard"); var family = db.GetColumnFamily<AsciiType>("Standard");
var superFamily = db.GetColumnFamily<AsciiType, AsciiType>("Super"); var superFamily = db.GetColumnFamily<AsciiType, AsciiType>("Super");
var userFamily = db.GetColumnFamily("Users"); var userFamily = db.GetColumnFamily("Users");
var counterFamily = db.GetColumnFamily("Counter");


ResetFamily(family); ResetFamily(family);
ResetSuperFamily(superFamily); ResetSuperFamily(superFamily);
ResetUsersFamily(userFamily); ResetUsersFamily(userFamily);
ResetCounterColumnFamily(counterFamily);
} }
} }


Expand Down Expand Up @@ -164,5 +176,20 @@ public void ResetSuperFamily(CassandraSuperColumnFamily superFamily = null)
superFamily.InsertColumn(TestKey2, TestSuperName, "Test2", Math.PI); superFamily.InsertColumn(TestKey2, TestSuperName, "Test2", Math.PI);
superFamily.InsertColumn(TestKey2, TestSuperName, "Test3", Math.PI); superFamily.InsertColumn(TestKey2, TestSuperName, "Test3", Math.PI);
} }

public void ResetCounterColumnFamily(CassandraColumnFamily counterFamily = null)
{
counterFamily = counterFamily ?? CounterFamily;

counterFamily.RemoveAllRows();

counterFamily.InsertCounterColumn(TestKey1, "Test1", 1);
counterFamily.InsertCounterColumn(TestKey1, "Test2", 2);
counterFamily.InsertCounterColumn(TestKey1, "Test3", 3);

counterFamily.InsertCounterColumn(TestKey2, "Test1", 2);
counterFamily.InsertCounterColumn(TestKey2, "Test2", 4);
counterFamily.InsertCounterColumn(TestKey2, "Test3", 6);
}
} }
} }
19 changes: 19 additions & 0 deletions test/FluentCassandra.Tests/Operations/MultiGetSliceTest.cs
Expand Up @@ -10,6 +10,8 @@ public class MultiGetSliceTest : IUseFixture<CassandraDatabaseSetupFixture>, IDi
{ {
private CassandraContext _db; private CassandraContext _db;
private CassandraColumnFamily<AsciiType> _family; private CassandraColumnFamily<AsciiType> _family;
private CassandraColumnFamily _counterFamily;

private CassandraSuperColumnFamily<AsciiType, AsciiType> _superFamily; private CassandraSuperColumnFamily<AsciiType, AsciiType> _superFamily;


public void SetFixture(CassandraDatabaseSetupFixture data) public void SetFixture(CassandraDatabaseSetupFixture data)
Expand All @@ -18,6 +20,7 @@ public void SetFixture(CassandraDatabaseSetupFixture data)
_db = setup.DB; _db = setup.DB;
_family = setup.Family; _family = setup.Family;
_superFamily = setup.SuperFamily; _superFamily = setup.SuperFamily;
_counterFamily = setup.CounterFamily;
} }


public void Dispose() public void Dispose()
Expand Down Expand Up @@ -46,6 +49,22 @@ public void Standard_GetSlice_Columns()
Assert.Equal(expectedCount, columns.Count()); Assert.Equal(expectedCount, columns.Count());
} }


[Fact]
public void Counter_GetSlice_Columns()
{
// arrange
int expectedCount = 2;

// act
var columns = _counterFamily
.Get(new BytesType[] { _testKey, _testKey2 })
.FetchColumns(new AsciiType[] { "Test1", "Test2" })
.Execute();

// assert
Assert.Equal(expectedCount, columns.Count());
}

[Fact] [Fact]
public void Super_GetSlice_Columns() public void Super_GetSlice_Columns()
{ {
Expand Down