Skip to content

Commit

Permalink
composite column
Browse files Browse the repository at this point in the history
  • Loading branch information
Shimi committed Jan 9, 2012
1 parent ba882c2 commit 067224e
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package com.shorrockin.cascal.serialization

import java.nio.ByteBuffer
import java.util.{Date, UUID}

object TupleSerializer {

def extractType[T](bytes: ByteBuffer, mf: Manifest[T]): T = {
val length = (bytes.get() & 0xFF) << 8 | (bytes.get() & 0xFF)
val typeBuffer = bytes.duplicate
typeBuffer.limit(typeBuffer.position + length)

bytes.position(typeBuffer.position + length + 1)

val ser = Serializer.Default(mf.erasure)
ser.fromByteBuffer(typeBuffer).asInstanceOf[T]
}

def byteBuffer[T](value: T)(implicit mf: Manifest[T]): ByteBuffer = {
value match {
case x: String if mf.erasure == classOf[String] => StringSerializer.toByteBuffer(x)
case x: UUID if mf.erasure == classOf[UUID] => UUIDSerializer.toByteBuffer(x)
case x: Int if mf.erasure == classOf[Int] => IntSerializer.toByteBuffer(x)
case x: Long if mf.erasure == classOf[Long] => LongSerializer.toByteBuffer(x)
case x: Boolean if mf.erasure == classOf[Boolean] => BooleanSerializer.toByteBuffer(x)
case x: Float if mf.erasure == classOf[Float] => FloatSerializer.toByteBuffer(x)
case x: Double if mf.erasure == classOf[Double] => DoubleSerializer.toByteBuffer(x)
case x: Date if mf.erasure == classOf[Date] => DateSerializer.toByteBuffer(x)
case None => ByteBuffer.allocate(0)
}
}
}

class CompositeBuffer(val buffers: ByteBuffer*) {

val lengthBytesSize = 2
val endOfComponentSize = 1
val compositeOverheadSize = lengthBytesSize + endOfComponentSize

def buffer(): ByteBuffer = {
val buffersSize = buffers.foldLeft(0){(sum, buffer) => sum + buffer.remaining}
val requiredSize = buffersSize + buffers.size * compositeOverheadSize
val buffer = ByteBuffer.allocate(requiredSize)

buffers foreach {buff =>
buffer.putShort(buff.remaining.asInstanceOf[Short]).put(buff).put(0.toByte)
}
buffer.rewind
buffer
}
}

object Tuple2Serializer {
import TupleSerializer._

def toByteBuffer[T1: Manifest, T2: Manifest](tuple: Tuple2[T1, T2]): ByteBuffer = {
val buffer = new CompositeBuffer(byteBuffer(tuple._1), byteBuffer(tuple._2))
buffer.buffer
}

def fromByteBuffer[T1, T2](bytes:ByteBuffer, mf1: Manifest[T1], mf2: Manifest[T2]): Tuple2[T1, T2] = {
(extractType(bytes, mf1), extractType(bytes, mf2))
}
}

object Tuple3Serializer {
import TupleSerializer._

def toByteBuffer[T1: Manifest, T2: Manifest, T3: Manifest](tuple: Tuple3[T1, T2, T3]): ByteBuffer = {
val buffer = new CompositeBuffer(byteBuffer(tuple._1), byteBuffer(tuple._2), byteBuffer(tuple._3))
buffer.buffer
}

def fromByteBuffer[T1, T2, T3](bytes:ByteBuffer, mf1: Manifest[T1], mf2: Manifest[T2], mf3: Manifest[T3]): Tuple3[T1, T2, T3] = {
(extractType(bytes, mf1), extractType(bytes, mf2), extractType(bytes, mf3))
}
}

object Tuple4Serializer {
import TupleSerializer._

def toByteBuffer[T1: Manifest, T2: Manifest, T3: Manifest, T4: Manifest](tuple: Tuple4[T1, T2, T3, T4]): ByteBuffer = {
val buffer = new CompositeBuffer(byteBuffer(tuple._1), byteBuffer(tuple._2), byteBuffer(tuple._3), byteBuffer(tuple._4))
buffer.buffer
}

def fromByteBuffer[T1, T2, T3, T4](bytes:ByteBuffer, mf1: Manifest[T1], mf2: Manifest[T2], mf3: Manifest[T3], mf4: Manifest[T4]): Tuple4[T1, T2, T3, T4] = {
(extractType(bytes, mf1), extractType(bytes, mf2), extractType(bytes, mf3), extractType(bytes, mf4))
}
}

object Tuple5Serializer {
import TupleSerializer._

def toByteBuffer[T1: Manifest, T2: Manifest, T3: Manifest, T4: Manifest, T5: Manifest](tuple: Tuple5[T1, T2, T3, T4, T5]): ByteBuffer = {
val buffer = new CompositeBuffer(byteBuffer(tuple._1), byteBuffer(tuple._2), byteBuffer(tuple._3), byteBuffer(tuple._4), byteBuffer(tuple._5))
buffer.buffer
}

def fromByteBuffer[T1, T2, T3, T4, T5](bytes:ByteBuffer, mf1: Manifest[T1], mf2: Manifest[T2], mf3: Manifest[T3], mf4: Manifest[T4], mf5: Manifest[T5]): Tuple5[T1, T2, T3, T4, T5] = {
(extractType(bytes, mf1), extractType(bytes, mf2), extractType(bytes, mf3), extractType(bytes, mf4), extractType(bytes, mf5))
}
}
12 changes: 12 additions & 0 deletions src/main/scala/com/shorrockin/cascal/utils/Conversions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,16 @@ object Conversions {

implicit def toSeqBytes(values:Seq[String]) = values.map { (s) => Conversions.byteBuffer(s) }
implicit def toJavaList[T](l: Seq[T]):java.util.List[T] = l.foldLeft(new java.util.ArrayList[T](l.size)){(al, e) => al.add(e); al}

implicit def byteBuffer[T1: Manifest, T2: Manifest](tuple: Tuple2[T1, T2]):ByteBuffer = Tuple2Serializer.toByteBuffer(tuple)
implicit def tuple[T1, T2](bytes:ByteBuffer)(implicit mf1: Manifest[T1], mf2: Manifest[T2]):Tuple2[T1, T2] = Tuple2Serializer.fromByteBuffer[T1, T2](bytes, mf1, mf2)

implicit def byteBuffer[T1: Manifest, T2: Manifest, T3: Manifest](tuple: Tuple3[T1, T2, T3]):ByteBuffer = Tuple3Serializer.toByteBuffer(tuple)
implicit def tuple[T1, T2, T3](bytes:ByteBuffer)(implicit mf1: Manifest[T1], mf2: Manifest[T2], mf3: Manifest[T3]):Tuple3[T1, T2, T3] = Tuple3Serializer.fromByteBuffer[T1, T2, T3](bytes, mf1, mf2, mf3)

implicit def byteBuffer[T1: Manifest, T2: Manifest, T3: Manifest, T4: Manifest](tuple: Tuple4[T1, T2, T3, T4]):ByteBuffer = Tuple4Serializer.toByteBuffer(tuple)
implicit def tuple[T1, T2, T3, T4](bytes:ByteBuffer)(implicit mf1: Manifest[T1], mf2: Manifest[T2], mf3: Manifest[T3], mf4: Manifest[T4]):Tuple4[T1, T2, T3, T4] = Tuple4Serializer.fromByteBuffer[T1, T2, T3, T4](bytes, mf1, mf2, mf3, mf4)

implicit def byteBuffer[T1: Manifest, T2: Manifest, T3: Manifest, T4: Manifest, T5: Manifest](tuple: Tuple5[T1, T2, T3, T4, T5]):ByteBuffer = Tuple5Serializer.toByteBuffer(tuple)
implicit def tuple[T1, T2, T3, T4, T5](bytes:ByteBuffer)(implicit mf1: Manifest[T1], mf2: Manifest[T2], mf3: Manifest[T3], mf4: Manifest[T4], mf5: Manifest[T5]):Tuple5[T1, T2, T3, T4, T5] = Tuple5Serializer.fromByteBuffer[T1, T2, T3, T4, T5](bytes, mf1, mf2, mf3, mf4, mf5)
}
4 changes: 3 additions & 1 deletion src/test/scala/com/shorrockin/cascal/CascalSchema.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.shorrockin.cascal

import scala.collection.JavaConversions._
import org.apache.cassandra.config.{CFMetaData, DatabaseDescriptor, KSMetaData}
import org.apache.cassandra.db.marshal.{AbstractType, BytesType, CounterColumnType, TimeUUIDType}
import org.apache.cassandra.db.marshal.{AbstractType, BytesType, CompositeType, CounterColumnType, IntegerType, TimeUUIDType}
import org.apache.cassandra.db.ColumnFamilyType
import org.apache.cassandra.locator.SimpleStrategy
import java.nio.ByteBuffer
Expand Down Expand Up @@ -31,5 +31,7 @@ trait CascalSchema extends Schema {
cfMetaData("SuperBytes", ColumnFamilyType.Super, BytesType.instance),
cfMetaData("StandardCounter", ColumnFamilyType.Standard, BytesType.instance).replicateOnWrite(true).defaultValidator(CounterColumnType.instance),
cfMetaData("SuperCounter", ColumnFamilyType.Super, BytesType.instance).replicateOnWrite(true).defaultValidator(CounterColumnType.instance),
cfMetaData("Composite2", ColumnFamilyType.Standard, CompositeType.getInstance(List(BytesType.instance, IntegerType.instance))),
cfMetaData("Composite3", ColumnFamilyType.Standard, CompositeType.getInstance(List(BytesType.instance, LongType.instance, BytesType.instance))),
standardIndexedCf)
}
78 changes: 78 additions & 0 deletions src/test/scala/com/shorrockin/cascal/CompositeTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package com.shorrockin.cascal

import org.junit.{Assert, Test}
import com.shorrockin.cascal.utils.Conversions._
import com.shorrockin.cascal.serialization.TupleSerializer
import Assert._
import com.shorrockin.cascal.session.Insert
import com.shorrockin.cascal.session.RangePredicate
import com.shorrockin.cascal.session.Order

class CompositeTest extends EmbeddedCassandra {

@Test def composite2InsertGet = borrow { session =>
val name = ("composite name", 1)
val col = "Test" \ "Composite2" \ "Insert Get" \ name
session.insert(col \ "composite value")

val colName = session.get(col).get.name
assertEquals(name, tuple[String, Int](session.get(col).get.name))
}

@Test def composite3InsertGet = borrow { session =>
val name = ("composite name", 1L, "name part 3")
val col = "Test" \ "Composite3" \ "Insert Get" \ name
session.insert(col \ "composite value")

val colName = session.get(col).get.name
assertEquals(name, tuple[String, Long, String](session.get(col).get.name))
}

@Test def composite2Range = borrow { session =>
val key = "Test" \ "Composite2" \ "Composite Range"
val col1 = key \ (("composite", 1)) \ 1
val col2 = key \ (("composite", 2)) \ 1
val col3 = key \ (("composite", 3)) \ 1
val col4 = key \ (("composite", 4)) \ 1
val col5 = key \ (("composite", 5)) \ 1
val col6 = key \ (("comcom", 5)) \ 1

session.batch(Insert(col1) :: Insert(col2) :: Insert(col3) :: Insert(col4) :: Insert(col5) :: Insert(col6))

val result1 = session.list(key, RangePredicate(Some(("c", None)), None, Order.Ascending, None))
assertEquals(6, result1.size)

val result2 = session.list(key, RangePredicate(Some(("composite", None)), None, Order.Ascending, None))
assertEquals(5, result2.size)

val result3 = session.list(key, RangePredicate(("composite", 2), ("composite", 4)))
assertEquals(3, result3.size)
assertEquals(col2, result3(0))
assertEquals(col3, result3(1))
assertEquals(col4, result3(2))
}

@Test def composite3Range = borrow { session =>
val key = "Test" \ "Composite3" \ "Composite Range"
val col1 = key \ (("composite", 1L, "A")) \ 1
val col2 = key \ (("composite", 1L, "a")) \ 1
val col3 = key \ (("composite", 10L, "A")) \ 1

session.batch(Insert(col1) :: Insert(col2) :: Insert(col3))

val result0 = session.list(key, RangePredicate(Some(("composite", None)), None, Order.Descending, None))
assertEquals(0, result0.size)

val result1 = session.list(key, RangePredicate(Some(("composite", None)), None, Order.Ascending, None))
assertEquals(3, result1.size)

val result2 = session.list(key, RangePredicate(("composite", 1L), ("composite", 2L)))
assertEquals(2, result2.size)
assertEquals(col1, result2(0))
assertEquals(col2, result2(1))

val result3 = session.list(key, RangePredicate(("composite", 1L, "B"), ("composite", 2L)))
assertEquals(1, result3.size)
assertEquals(col2, result3(0))
}
}

0 comments on commit 067224e

Please sign in to comment.