Provisional upgrade to Cassandra 0.7 API and dependencies

commit a0a7dfb7a103ad08c64ddf74c5aa93dec4873184 1 parent eba8bb6
@azinman authored
Showing with 226 additions and 181 deletions.
  1. +11 −5 pom.xml
  2. +4 −4 src/main/scala/com/shorrockin/cascal/jmx/CascalStatistics.scala
  3. +15 −13 src/main/scala/com/shorrockin/cascal/model/Column.scala
  4. +2 −1  src/main/scala/com/shorrockin/cascal/model/ColumnContainer.scala
  5. +2 −1  src/main/scala/com/shorrockin/cascal/model/Keyspace.scala
  6. +3 −2 src/main/scala/com/shorrockin/cascal/model/PathComponent.scala
  7. +5 −3 src/main/scala/com/shorrockin/cascal/model/StandardColumnContainer.scala
  8. +6 −5 src/main/scala/com/shorrockin/cascal/model/StandardKey.scala
  9. +13 −12 src/main/scala/com/shorrockin/cascal/model/SuperColumn.scala
  10. +5 −4 src/main/scala/com/shorrockin/cascal/model/SuperKey.scala
  11. +23 −23 src/main/scala/com/shorrockin/cascal/serialization/Converter.scala
  12. +20 −18 src/main/scala/com/shorrockin/cascal/serialization/Serializer.scala
  13. +0 −8 src/main/scala/com/shorrockin/cascal/session/Consistency.scala
  14. +17 −8 src/main/scala/com/shorrockin/cascal/session/KeyRange.scala
  15. +9 −8 src/main/scala/com/shorrockin/cascal/session/Predicate.scala
  16. +67 −37 src/main/scala/com/shorrockin/cascal/session/Session.scala
  17. +1 −3 src/main/scala/com/shorrockin/cascal/session/SessionPool.scala
  18. +2 −8 src/main/scala/com/shorrockin/cascal/session/SessionTemplate.scala
  19. +4 −2 src/main/scala/com/shorrockin/cascal/testing/CassandraTestPool.scala
  20. +17 −16 src/main/scala/com/shorrockin/cascal/utils/Conversions.scala
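
The bulk of the changes below follow from one API shift: Cassandra 0.7's Thrift interface addresses column names, column values, and row keys as java.nio.ByteBuffer where 0.6 used Array[Byte]. A minimal sketch of the two conversion patterns that recur throughout this commit (the identifiers here are illustrative, not from the diff):

import java.nio.ByteBuffer

// Outbound: wrap an existing byte array; this is a zero-copy view.
val name: ByteBuffer = ByteBuffer.wrap("column-name".getBytes("UTF-8"))

// Inbound: Thrift getters such as Column.getName still hand back
// Array[Byte], so results are re-wrapped before re-entering the model.
def toBuffer(raw: Array[Byte]): ByteBuffer = ByteBuffer.wrap(raw)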
16 pom.xml
@@ -59,7 +59,7 @@
</execution>
</executions>
<configuration>
- <!-- no luck <vscaladocVersion>1.1</vscaladocVersion> -->
+ <!-- no luck <vscaladocVersion>1.1</vscaladocVersion> -->
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-${java.compile.version}</arg>
@@ -133,6 +133,12 @@
<repositories>
<repository>
+ <id>maven</id>
+ <name>Maven Main Repo</name>
+ <url>http://repo1.maven.org/maven2/</url>
+ </repository>
+
+ <repository>
<id>scala-tools.org-snapshots</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-snapshots/</url>
@@ -160,9 +166,9 @@
</dependency>
<dependency>
- <groupId>org.apache.thrift</groupId>
+ <groupId>org.apache.cassandra.deps</groupId>
<artifactId>libthrift</artifactId>
- <version>917130</version>
+ <version>0.5.0</version>
</dependency>
<dependency>
@@ -173,8 +179,8 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
- <artifactId>cassandra</artifactId>
- <version>0.6.1</version>
+ <artifactId>cassandra-all</artifactId>
+ <version>0.7.0</version>
</dependency>
<dependency>
8 src/main/scala/com/shorrockin/cascal/jmx/CascalStatistics.scala
@@ -34,7 +34,7 @@ object CascalStatistics extends CascalStatistics$MBean {
mbeanServer.registerMBean(this, objectName)
}
-
+
/**
* retrieves the stats for the specified host, creating and registering them if they don't
* exist.
@@ -60,7 +60,7 @@ object CascalStatistics extends CascalStatistics$MBean {
}
def register(pool:SessionPool) = pools = pool :: pools
- def unregister(pool:SessionPool) = pools = pools - pool
+ def unregister(pool:SessionPool) = pools = pools.filterNot(_ == pool)
def creation(host:Host) = get(host).creation
def creationError(host:Host) = get(host).creationError
@@ -94,7 +94,7 @@ class HostStatistics(host:Host) extends HostStatisticsMBean {
def getTotalUsageTime() = usageTime
def getNumberOfCreationFailures() = createFails
def getNumberOfUsageExceptions() = usageErrors
- def getNumberOfSessionsCreated() = created
+ def getNumberOfSessionsCreated() = created
}
trait HostStatisticsMBean {
@@ -109,4 +109,4 @@ trait HostStatisticsMBean {
trait CascalStatistics$MBean extends HostStatisticsMBean {
def getNumberOfActiveConnections():Int
def getNumberOfIdleConnections():Int
-}
+}
28 src/main/scala/com/shorrockin/cascal/model/Column.scala
@@ -1,5 +1,6 @@
package com.shorrockin.cascal.model
+import java.nio.ByteBuffer
import java.util.Date
import org.apache.cassandra.thrift.{ColumnPath, ColumnOrSuperColumn}
import org.apache.cassandra.thrift.{Column => CassColumn}
@@ -15,14 +16,14 @@ import com.shorrockin.cascal.utils.Utils.now
* @author Chris Shorrock
* @param Owner the type of object which owns this column
*/
-case class Column[Owner](val name:Array[Byte],
- val value:Array[Byte],
+case class Column[Owner](val name:ByteBuffer,
+ val value:ByteBuffer,
val time:Long,
val owner:Owner) extends Gettable[Column[Owner]] {
- def this(name:Array[Byte], value:Array[Byte], owner:Owner) = this(name, value, now, owner)
- def this(name:Array[Byte], owner:Owner) = this(name, null, now, owner)
- def this(name:Array[Byte], value:Array[Byte], date:Date, owner:Owner) = this(name, value, date.getTime, owner)
+ def this(name:ByteBuffer, value:ByteBuffer, owner:Owner) = this(name, value, now, owner)
+ def this(name:ByteBuffer, owner:Owner) = this(name, null, now, owner)
+ def this(name:ByteBuffer, value:ByteBuffer, date:Date, owner:Owner) = this(name, value, date.getTime, owner)
val partial = (value == null)
@@ -44,7 +45,7 @@ case class Column[Owner](val name:Array[Byte],
case key:StandardKey => cosc.setColumn(new CassColumn(name, value, time))
case sup:SuperColumn =>
val list = Conversions.toJavaList(new CassColumn(name, value, time) :: Nil)
- cosc.setSuper_column(new CassSuperColumn(sup.value, list))
+ cosc.setSuper_column(new CassSuperColumn(sup.value, list))
}
}
@@ -53,7 +54,7 @@ case class Column[Owner](val name:Array[Byte],
* copy method to create a new instance of this column with a new value and
* the same other values.
*/
- def \(newValue:Array[Byte]) = new Column[Owner](name, newValue, time, owner)
+ def \(newValue:ByteBuffer) = new Column[Owner](name, newValue, time, owner)
/**
@@ -68,15 +69,16 @@ case class Column[Owner](val name:Array[Byte],
*/
def convertGetResult(colOrSuperCol:ColumnOrSuperColumn):Column[Owner] = {
val col = colOrSuperCol.getColumn
- Column(col.getName, col.getValue, col.getTimestamp, owner)
+ Column(ByteBuffer.wrap(col.getName), ByteBuffer.wrap(col.getValue), col.getTimestamp, owner)
}
- private def stringIfPossible(a:Array[Byte]):String = {
- if (a.length <= 4) return "Array (" + a.mkString(", ") + ")"
- if (a.length > 1000) return a.toString
- try { Conversions.string(a) } catch { case _ => a.toString }
+ private def stringIfPossible(a:ByteBuffer):String = {
+ if (a == null) return "NULL"
+ if (a.array.length <= 4) return "Array (" + a.array.mkString(", ") + ")"
+ if (a.array.length > 1000) return a.array.toString
+ try { Conversions.string(a) } catch { case _ => a.array.toString }
}
override def toString():String = "%s \\ Column(name = %s, value = %s, time = %s)".format(
owner.toString, stringIfPossible(name), stringIfPossible(value), time)
-}
+}
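
One caveat with the new stringIfPossible: ByteBuffer.array assumes an accessible backing array, throws for read-only or direct buffers, and ignores the buffer's position and arrayOffset. A defensive variant (hypothetical, not part of this commit) would copy through a duplicate instead:

import java.nio.ByteBuffer

// Hypothetical helper: extract bytes without assuming an array-backed buffer.
def bytesOf(buf: ByteBuffer): Array[Byte] = {
  val dup = buf.duplicate()          // leaves the caller's position untouched
  val out = new Array[Byte](dup.remaining)
  dup.get(out)
  out
}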
3  src/main/scala/com/shorrockin/cascal/model/ColumnContainer.scala
@@ -1,5 +1,6 @@
package com.shorrockin.cascal.model
+import java.nio.ByteBuffer
import org.apache.cassandra.thrift.{ColumnParent, ColumnPath, ColumnOrSuperColumn}
/**
@@ -13,7 +14,7 @@ import org.apache.cassandra.thrift.{ColumnParent, ColumnPath, ColumnOrSuperColum
* @param ListType when listed, what type of object does it return.
*/
trait ColumnContainer[ColumnType, ListType] {
- def \(value:Array[Byte]):ColumnType
+ def \(value:ByteBuffer):ColumnType
val family:ColumnFamily[_]
val key:Key[_, _]
3  src/main/scala/com/shorrockin/cascal/model/Keyspace.scala
@@ -1,4 +1,5 @@
package com.shorrockin.cascal.model
+import java.nio.ByteBuffer
/**
* provides the high level abstraction for the keyspace. can be thought
@@ -15,4 +16,4 @@ case class Keyspace(val value:String) extends StringValue {
def \(value:String):StandardColumnFamily = new StandardColumnFamily(value, this)
def \\(value:String):SuperColumnFamily = new SuperColumnFamily(value, this)
override def toString = "Keyspace(value = %s)".format(value)
-}
+}
5 src/main/scala/com/shorrockin/cascal/model/PathComponent.scala
@@ -1,4 +1,5 @@
package com.shorrockin.cascal.model
+import java.nio.ByteBuffer
/**
* categorization of a cassandra path component.
@@ -13,10 +14,10 @@ trait PathComponent[ValueType] { val value:ValueType }
* categorization of a path component who's value is a byte
* @author Chris Shorrock
*/
-trait ByteValue extends PathComponent[Array[Byte]]
+trait ByteValue extends PathComponent[ByteBuffer]
/**
* categorization of path component who's value is a string
* @author Chris Shorrock
*/
-trait StringValue extends PathComponent[String]
+trait StringValue extends PathComponent[String]
8 src/main/scala/com/shorrockin/cascal/model/StandardColumnContainer.scala
@@ -1,12 +1,14 @@
package com.shorrockin.cascal.model
+import java.nio.ByteBuffer
+
/**
* a type of column container which holds standard columns.
*
* @author Chris Shorrock
*/
trait StandardColumnContainer[ColumnType, SliceType] extends ColumnContainer[ColumnType, SliceType] {
- def \(name:Array[Byte]):ColumnType
- def \(name:Array[Byte], value:Array[Byte]):ColumnType
- def \(name:Array[Byte], value:Array[Byte], time:Long):ColumnType
+ def \(name:ByteBuffer):ColumnType
+ def \(name:ByteBuffer, value:ByteBuffer):ColumnType
+ def \(name:ByteBuffer, value:ByteBuffer, time:Long):ColumnType
}
11 src/main/scala/com/shorrockin/cascal/model/StandardKey.scala
@@ -1,5 +1,6 @@
package com.shorrockin.cascal.model
+import java.nio.ByteBuffer
import org.apache.cassandra.thrift.{ColumnOrSuperColumn}
/**
@@ -12,16 +13,16 @@ import org.apache.cassandra.thrift.{ColumnOrSuperColumn}
case class StandardKey(val value:String, val family:StandardColumnFamily) extends Key[Column[StandardKey], Seq[Column[StandardKey]]]
with StandardColumnContainer[Column[StandardKey], Seq[Column[StandardKey]]] {
- def \(name:Array[Byte]) = new Column(name, this)
- def \(name:Array[Byte], value:Array[Byte]) = new Column(name, value, this)
- def \(name:Array[Byte], value:Array[Byte], time:Long) = new Column(name, value, time, this)
+ def \(name:ByteBuffer) = new Column(name, this)
+ def \(name:ByteBuffer, value:ByteBuffer) = new Column(name, value, this)
+ def \(name:ByteBuffer, value:ByteBuffer, time:Long) = new Column(name, value, time, this)
def convertListResult(results:Seq[ColumnOrSuperColumn]):Seq[Column[StandardKey]] = {
results.map { (result) =>
val column = result.getColumn
- \(column.getName, column.getValue, column.getTimestamp)
+ \(ByteBuffer.wrap(column.getName), ByteBuffer.wrap(column.getValue), column.getTimestamp)
}
}
override def toString = "%s \\ StandardKey(value = %s)".format(family.toString, value)
-}
+}
25 src/main/scala/com/shorrockin/cascal/model/SuperColumn.scala
@@ -1,5 +1,6 @@
package com.shorrockin.cascal.model
+import java.nio.ByteBuffer
import org.apache.cassandra.thrift.{ColumnPath, ColumnParent, ColumnOrSuperColumn}
import com.shorrockin.cascal.utils.Conversions
@@ -10,11 +11,11 @@ import com.shorrockin.cascal.utils.Conversions
*
* @author Chris Shorrock
*/
-case class SuperColumn(val value:Array[Byte], val key:SuperKey) extends Gettable[Seq[Column[SuperColumn]]]()
+case class SuperColumn(val value:ByteBuffer, val key:SuperKey) extends Gettable[Seq[Column[SuperColumn]]]()
with StandardColumnContainer[Column[SuperColumn], Seq[Column[SuperColumn]]] {
- def \(name:Array[Byte]) = new Column(name, this)
- def \(name:Array[Byte], value:Array[Byte]) = new Column(name, value, this)
- def \(name:Array[Byte], value:Array[Byte], time:Long) = new Column(name, value, time, this)
+ def \(name:ByteBuffer) = new Column(name, this)
+ def \(name:ByteBuffer, value:ByteBuffer) = new Column(name, value, this)
+ def \(name:ByteBuffer, value:ByteBuffer, time:Long) = new Column(name, value, time, this)
val family = key.family
val keyspace = family.keyspace
@@ -22,7 +23,7 @@ case class SuperColumn(val value:Array[Byte], val key:SuperKey) extends Gettable
lazy val columnParent = new ColumnParent(family.value).setSuper_column(value)
lazy val columnPath = new ColumnPath(family.value).setSuper_column(value)
- def ::(other:SuperColumn):List[SuperColumn] = other :: this :: Nil
+ def ::(other:SuperColumn):List[SuperColumn] = other :: this :: Nil
private def convertList[T](v:java.util.List[T]):List[T] = {
scala.collection.JavaConversions.asBuffer(v).toList
@@ -34,7 +35,7 @@ case class SuperColumn(val value:Array[Byte], val key:SuperKey) extends Gettable
*/
def convertGetResult(colOrSuperCol:ColumnOrSuperColumn):Seq[Column[SuperColumn]] = {
val superCol = colOrSuperCol.getSuper_column
- convertList(superCol.getColumns).map { (column) => \(column.getName, column.getValue, column.getTimestamp) }
+ convertList(superCol.getColumns).map { (column) => \(ByteBuffer.wrap(column.getName), ByteBuffer.wrap(column.getValue), column.getTimestamp) }
}
@@ -45,16 +46,16 @@ case class SuperColumn(val value:Array[Byte], val key:SuperKey) extends Gettable
def convertListResult(results:Seq[ColumnOrSuperColumn]):Seq[Column[SuperColumn]] = {
results.map { (result) =>
val column = result.getColumn
- \(column.getName, column.getValue, column.getTimestamp)
+ \(ByteBuffer.wrap(column.getName), ByteBuffer.wrap(column.getValue), column.getTimestamp)
}
}
- private def stringIfPossible(a:Array[Byte]):String = {
- if (a.length <= 4) return "Array (" + a.mkString(", ") + ")"
- if (a.length > 1000) return a.toString
- try { Conversions.string(a) } catch { case _ => a.toString }
+ private def stringIfPossible(a:ByteBuffer):String = {
+ if (a.array.length <= 4) return "Array (" + a.array.mkString(", ") + ")"
+ if (a.array.length > 1000) return a.array.toString
+ try { Conversions.string(a) } catch { case _ => a.array.toString }
}
override def toString():String = "%s \\ SuperColumn(value = %s)".format(
key.toString, stringIfPossible(value))
-}
+}
9 src/main/scala/com/shorrockin/cascal/model/SuperKey.scala
@@ -1,10 +1,11 @@
package com.shorrockin.cascal.model
import org.apache.cassandra.thrift.{ColumnOrSuperColumn}
+import java.nio.ByteBuffer
case class SuperKey(val value:String, val family:SuperColumnFamily) extends Key[SuperColumn, Seq[(SuperColumn, Seq[Column[SuperColumn]])]] {
- def \(value:Array[Byte]) = new SuperColumn(value, this)
+ def \(value:ByteBuffer) = new SuperColumn(value, this)
/**
* converts a list of super columns to the specified return type
@@ -12,9 +13,9 @@ case class SuperKey(val value:String, val family:SuperColumnFamily) extends Key[
def convertListResult(results:Seq[ColumnOrSuperColumn]):Seq[(SuperColumn, Seq[Column[SuperColumn]])] = {
results.map { (result) =>
val nativeSuperCol = result.getSuper_column
- val superColumn = this \ nativeSuperCol.getName
+ val superColumn = this \ ByteBuffer.wrap(nativeSuperCol.getName)
val columns = convertList(nativeSuperCol.getColumns).map { (column) =>
- superColumn \ (column.getName, column.getValue, column.getTimestamp)
+ superColumn \ (ByteBuffer.wrap(column.getName), ByteBuffer.wrap(column.getValue), column.getTimestamp)
}
(superColumn -> columns)
}
@@ -25,4 +26,4 @@ case class SuperKey(val value:String, val family:SuperColumnFamily) extends Key[
}
override def toString = "%s \\ SuperKey(value = %s)".format(family.toString, value)
-}
+}
46 src/main/scala/com/shorrockin/cascal/serialization/Converter.scala
@@ -1,9 +1,10 @@
package com.shorrockin.cascal.serialization
+import java.nio.ByteBuffer
import reflect.Manifest
import java.lang.annotation.Annotation
import java.lang.reflect.{Field, Method}
-import java.util.{Arrays, Date, UUID}
+import java.util.{Date, UUID}
import annotations.{Columns, Optional}
import annotations.{Key => AKey, SuperColumn => ASuperColumn, Value => AValue}
import annotations.{Keyspace => AKeySpace, Super => ASuper, Family => AFamily}
@@ -25,7 +26,7 @@ object Converter extends Converter(Serializer.Default) with Logging {
class Converter(serializers:Map[Class[_], Serializer[_]]) {
private var reflectionCache = Map[Class[_], ReflectionInformation]()
-
+
/**
* converts all the column sequences in the provided map (which is returned from a list
@@ -99,7 +100,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
* Given a class type, a Method that returns that type, and a source object (Cascal ORM object),
* return the appropriate serialized byte array. Does not support Option.
*/
- private def getFieldSerialized[T](fieldType:Class[_], fieldGetter:Method, obj:T):Array[Byte] = {
+ private def getFieldSerialized[T](fieldType:Class[_], fieldGetter:Method, obj:T):ByteBuffer = {
// Couldn't figure out how to case match classes on a class obj with type erasure
if (fieldType == classOf[String]) Conversions.bytes(fieldGetter.invoke(obj).asInstanceOf[String])
else if (fieldType == classOf[UUID]) Conversions.bytes(fieldGetter.invoke(obj).asInstanceOf[UUID])
@@ -109,8 +110,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
else if (fieldType == classOf[Float]) Conversions.bytes(fieldGetter.invoke(obj).asInstanceOf[Float])
else if (fieldType == classOf[Double]) Conversions.bytes(fieldGetter.invoke(obj).asInstanceOf[Double])
else if (fieldType == classOf[Date]) Conversions.bytes(fieldGetter.invoke(obj).asInstanceOf[Date])
- else throw new IllegalStateException(
- "Type %s of getter %s is unknown".format(fieldGetter.getName, fieldType.toString))
+ else throw new IllegalStateException("Type %s of getter %s is unknown".format(fieldGetter.getName, fieldType.toString))
}
/**
@@ -118,7 +118,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
* return null if calling the method returns None, or otherwise the appropriate
* serialized byte array.
*/
- private def getOptionFieldSerialized[T](fieldGetter:Method, obj:T):Array[Byte] = {
+ private def getOptionFieldSerialized[T](fieldGetter:Method, obj:T):ByteBuffer = {
val opt = fieldGetter.invoke(obj).asInstanceOf[Option[_]]
opt match {
case None => null
@@ -139,8 +139,8 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
* Given an object of type T using the Cascal Annotations returns a list of columns
* complete with name/value. Uses the serializers to convert values in columns to their
* appropriate byte array.
- */
- def unapply[T](obj:T)(implicit manifest:Manifest[T]):Seq[Column[_]] = {
+ */
+ def unapply[T](obj:T)(implicit manifest:Manifest[T]):List[Column[_]] = {
val info = Converter.this.info(manifest.erasure)
val key:String = info.fieldGettersAndColumnNames.filter(tup => tup._2._2 match {
@@ -148,7 +148,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
case _ => false
}).head._1.invoke(obj).asInstanceOf[String]
- var superCol:Array[Byte] = null
+ var superCol:ByteBuffer = null
if (info.isSuper) {
val superTup = info.fieldGettersAndColumnNames.filter(tup => tup._2._2 match {
case a:ASuperColumn => true
@@ -159,11 +159,11 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
superCol = getFieldSerialized(superType, superGetter, obj)
}
- info.fieldGettersAndColumnNames.map((tup) => {
+ info.fieldGettersAndColumnNames.foldLeft(List[Column[_]]()) { (acc, tup) =>
val fieldGetter = tup._1
var optField = false
val fieldType = tup._2._2 match {
- case a:Optional =>
+ case a:Optional =>
optField = true
a.as
case _ => tup._2._1
@@ -174,17 +174,17 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
case _ => null
}
- val value:Array[Byte] = optField match {
+ val value:ByteBuffer = optField match {
case false => getFieldSerialized(fieldType, fieldGetter, obj)
case true => getOptionFieldSerialized(fieldGetter, obj)
}
- if (columnName == null || value == null) null
+ if (columnName == null || value == null) acc
else info.isSuper match {
- case true => info.family.asInstanceOf[SuperColumnFamily] \ key \ superCol \ (Conversions.bytes(columnName), value)
- case false => info.family.asInstanceOf[StandardColumnFamily] \ key \ (Conversions.bytes(columnName), value)
+ case true => (info.family.asInstanceOf[SuperColumnFamily] \ key \ superCol \ (Conversions.bytes(columnName), value)) :: acc
+ case false => (info.family.asInstanceOf[StandardColumnFamily] \ key \ (Conversions.bytes(columnName), value)) :: acc
}
- }).filter(_!=null)
+ }
}
/**
@@ -205,11 +205,11 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
/**
- * returns the column with the specified name, or
+ * returns the column with the specified name, or
*/
private def find(name:String, columns:Seq[Column[_]]):Option[Column[_]] = {
val nameBytes = Conversions.bytes(name)
- columns.find { (c) => Arrays.equals(nameBytes, c.name) }
+ columns.find { (c) => nameBytes.equals(c.name) }
}
@@ -217,7 +217,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
* converts the specified byte array to the specified type using the installed
* serializers.
*/
- private def bytesToObject[A](ofType:Class[A], bytes:Array[Byte]):A = {
+ private def bytesToObject[A](ofType:Class[A], bytes:ByteBuffer):A = {
serializers.get(ofType) match {
case None => throw new IllegalArgumentException("unable to find serializer for type: " + ofType)
case Some(s) =>
@@ -272,7 +272,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
case class ReflectionInformation(val cls:Class[_]) {
val keyspace = {
extract(cls, classOf[AKeySpace]) match {
- case None => throw new IllegalArgumentException("all mapped classes must contain @Keyspace annotation")
+ case None => throw new IllegalArgumentException("all mapped classes must contain @Keyspace annotation; not found in " + cls)
case Some(v) => Keyspace(v.value())
}
}
@@ -325,7 +325,7 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
cls.getDeclaredFields.foreach { field =>
val annotations = field.getDeclaredAnnotations
if (annotations.length > 0) annotations(0) match {
- case a:AKey => out = (field -> a) :: out
+ case a:AKey => out = (field -> a) :: out
case a:Optional => out = (field -> a) :: out
case a:ASuperColumn => out = (field -> a) :: out
case a:AValue => out = (field -> a) :: out
@@ -350,11 +350,11 @@ class Converter(serializers:Map[Class[_], Serializer[_]]) {
* returns all the fields matching the specified annotation
*/
def fields[A <: Annotation](cls:Class[A]):Seq[(Field, Annotation)] = fields.filter { (tup) => cls.equals(tup._2.getClass) }
-
+
private def extract[A <: Annotation](cls:Class[_], annot:Class[A]):Option[A] = {
val value = cls.getAnnotation(annot).asInstanceOf[A]
if (null == value) None
else Some(value)
}
}
-}
+}
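
The unapply rewrite above swaps a map-then-filter(_ != null) pass for a single foldLeft that simply skips fields with no column name or value. Reduced to a self-contained schematic (types simplified for illustration):

// Fields left unset produce no column; everything else is prepended to
// the accumulator, so no null placeholders are ever created.
case class Field(name: String, value: Option[String])

def columnsOf(fields: List[Field]): List[(String, String)] =
  fields.foldLeft(List[(String, String)]()) { (acc, field) =>
    field.value match {
      case Some(v) => (field.name -> v) :: acc
      case None    => acc
    }
  }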
38 src/main/scala/com/shorrockin/cascal/serialization/Serializer.scala
@@ -4,7 +4,7 @@ import com.shorrockin.cascal.utils.{UUID => UUIDUtils}
import java.util.UUID
import java.util.Date
import java.nio.charset.Charset
-import java.nio.ByteBuffer
+import java.nio.{ByteBuffer,CharBuffer}
object Serializer {
@@ -31,10 +31,10 @@ object Serializer {
*/
trait Serializer[A] {
/** converts this object to a byte array for entry into cassandra */
- def toBytes(obj:A):Array[Byte]
+ def toBytes(obj:A):ByteBuffer
/** converts the specified byte array into an object */
- def fromBytes(bytes:Array[Byte]):A
+ def fromBytes(bytes:ByteBuffer):A
/** converts the specified value to a string */
def toString(obj:A):String
@@ -45,15 +45,17 @@ trait Serializer[A] {
object StringSerializer extends Serializer[String] {
val utf8 = Charset.forName("UTF-8")
+ val decoder = utf8.newDecoder
+ val encoder = utf8.newEncoder
- def toBytes(str:String) = str.getBytes(utf8)
- def fromBytes(bytes:Array[Byte]) = new String(bytes, utf8)
+ def toBytes(str:String) = encoder.encode(CharBuffer.wrap(str.toCharArray))
+ def fromBytes(bytes:ByteBuffer) = decoder.decode(bytes).toString
def toString(str:String) = str
def fromString(str:String) = str
}
object UUIDSerializer extends Serializer[UUID] {
- def fromBytes(bytes:Array[Byte]) = UUIDUtils(bytes)
+ def fromBytes(bytes:ByteBuffer) = UUIDUtils(bytes.array)
def toString(uuid:UUID) = uuid.toString
def fromString(str:String) = UUID.fromString(str)
@@ -65,7 +67,7 @@ object UUIDSerializer extends Serializer[UUID] {
(0 until 8).foreach { (i) => buffer(i) = (msb >>> 8 * (7 - i)).asInstanceOf[Byte] }
(8 until 16).foreach { (i) => buffer(i) = (lsb >>> 8 * (7 - i)).asInstanceOf[Byte] }
- buffer
+ ByteBuffer.wrap(buffer)
}
}
@@ -73,8 +75,8 @@ object UUIDSerializer extends Serializer[UUID] {
object IntSerializer extends Serializer[Int] {
val bytesPerInt = java.lang.Integer.SIZE / java.lang.Byte.SIZE
- def toBytes(i:Int) = ByteBuffer.wrap(new Array[Byte](bytesPerInt)).putInt(i).array()
- def fromBytes(bytes:Array[Byte]) = ByteBuffer.wrap(bytes).getInt()
+ def toBytes(i:Int) = ByteBuffer.allocate(bytesPerInt).putInt(i)
+ def fromBytes(bytes:ByteBuffer) = bytes.getInt
def toString(obj:Int) = obj.toString
def fromString(str:String) = str.toInt
}
@@ -82,15 +84,15 @@ object IntSerializer extends Serializer[Int] {
object LongSerializer extends Serializer[Long] {
val bytesPerLong = java.lang.Long.SIZE / java.lang.Byte.SIZE
- def toBytes(l:Long) = ByteBuffer.wrap(new Array[Byte](bytesPerLong)).putLong(l).array()
- def fromBytes(bytes:Array[Byte]) = ByteBuffer.wrap(bytes).getLong()
+ def toBytes(l:Long) = ByteBuffer.allocate(bytesPerLong).putLong(l)
+ def fromBytes(bytes:ByteBuffer) = bytes.getLong()
def toString(obj:Long) = obj.toString
def fromString(str:String) = str.toLong
}
object BooleanSerializer extends Serializer[Boolean] {
def toBytes(b:Boolean) = StringSerializer.toBytes(b.toString)
- def fromBytes(bytes:Array[Byte]) = StringSerializer.fromBytes(bytes).toBoolean
+ def fromBytes(bytes:ByteBuffer) = StringSerializer.fromBytes(bytes).toBoolean
def toString(obj:Boolean) = obj.toString
def fromString(str:String) = str.toBoolean
}
@@ -98,8 +100,8 @@ object BooleanSerializer extends Serializer[Boolean] {
object FloatSerializer extends Serializer[Float] {
val bytesPerFloat = java.lang.Float.SIZE / java.lang.Byte.SIZE
- def toBytes(f:Float) = ByteBuffer.wrap(new Array[Byte](bytesPerFloat)).putFloat(f).array()
- def fromBytes(bytes:Array[Byte]) = ByteBuffer.wrap(bytes).getFloat()
+ def toBytes(f:Float) = ByteBuffer.allocate(bytesPerFloat).putFloat(f)
+ def fromBytes(bytes:ByteBuffer) = bytes.getFloat()
def toString(obj:Float) = obj.toString
def fromString(str:String) = str.toFloat
}
@@ -107,15 +109,15 @@ object FloatSerializer extends Serializer[Float] {
object DoubleSerializer extends Serializer[Double] {
val bytesPerDouble = java.lang.Double.SIZE / java.lang.Byte.SIZE
- def toBytes(d:Double) = ByteBuffer.wrap(new Array[Byte](bytesPerDouble)).putDouble(d).array()
- def fromBytes(bytes:Array[Byte]) = ByteBuffer.wrap(bytes).getDouble
+ def toBytes(d:Double) = ByteBuffer.allocate(bytesPerDouble).putDouble(d)
+ def fromBytes(bytes:ByteBuffer) = bytes.getDouble
def toString(obj:Double) = obj.toString
def fromString(str:String) = str.toDouble
}
object DateSerializer extends Serializer[Date] {
def toBytes(date:Date) = LongSerializer.toBytes(date.getTime)
- def fromBytes(bytes:Array[Byte]) = new Date(LongSerializer.fromBytes(bytes).longValue)
+ def fromBytes(bytes:ByteBuffer) = new Date(LongSerializer.fromBytes(bytes).longValue)
def toString(obj:Date) = obj.getTime.toString
def fromString(str:String) = new Date(str.toLong.longValue)
-}
+}
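
Two ByteBuffer subtleties are worth flagging for callers of these serializers. First, relative reads such as getInt and getLong advance the buffer's position, so a buffer can only be deserialized once unless the caller reads from a duplicate. Second, ByteBuffer.allocate(n).putInt(i) returns the buffer with its position at the end, so a consumer generally needs flip or rewind before reading. A small illustration (JDK behavior only, not code from the commit):

import java.nio.ByteBuffer

val buf = ByteBuffer.allocate(4).putInt(42)
buf.flip()                         // make the written bytes readable

// Reading through a duplicate leaves buf's own position intact,
// so repeated fromBytes-style calls keep working.
def readInt(b: ByteBuffer): Int = b.duplicate().getInt

assert(readInt(buf) == 42)
assert(readInt(buf) == 42)         // still 42: buf was never consumed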
8 src/main/scala/com/shorrockin/cascal/session/Consistency.scala
@@ -8,14 +8,6 @@ import org.apache.cassandra.thrift.ConsistencyLevel
* @author Chris Shorrock
*/
object Consistency {
-
- /**
- * WRITE: Ensure nothing. A write happens asynchronously in background
- *
- * READ: Not supported, because it doesn't make sense
- */
- val Zero = new Consistency { def thriftValue = ConsistencyLevel.ZERO }
-
/**
* WRITE: Ensure that the write has been written to at least 1 node,
* including hinted recipients.
25 src/main/scala/com/shorrockin/cascal/session/KeyRange.scala
@@ -1,6 +1,7 @@
package com.shorrockin.cascal.session
import org.apache.cassandra.thrift.{KeyRange => CassKeyRange}
+import java.nio.charset.Charset
/**
* a key range is used when you list by keys to specified the start and end
@@ -10,11 +11,19 @@ import org.apache.cassandra.thrift.{KeyRange => CassKeyRange}
*
* @author Chris Shorrock
*/
-case class KeyRange(start:String, end:String, limit:Int) {
+object KeyRange {
+ val utf8 = Charset.forName("UTF-8")
+}
+
+trait CassandraKeyRange {
+ lazy val cassandraRange:CassKeyRange = null
+}
+
+case class KeyRange(start:String, end:String, limit:Int) extends CassandraKeyRange {
lazy val cassandraRange = {
val range = new CassKeyRange(limit)
- range.setStart_key(start)
- range.setEnd_key(end)
+ range.setStart_key(KeyRange.utf8.encode(start))
+ range.setEnd_key(KeyRange.utf8.encode(end))
range
}
}
@@ -28,11 +37,11 @@ case class KeyRange(start:String, end:String, limit:Int) {
*
* @author Chris Shorrock
*/
-case class TokenRange(tokenStart:String, tokenEnd:String, tokenLimit:Int) extends KeyRange(tokenStart, tokenEnd, tokenLimit) {
+case class TokenRange(tokenStart:String, tokenEnd:String, tokenLimit:Int) extends CassandraKeyRange {
override lazy val cassandraRange = {
- val range = new CassKeyRange(limit)
- range.setStart_token(start)
- range.setEnd_token(end)
+ val range = new CassKeyRange(tokenLimit)
+ range.setStart_token(tokenStart)
+ range.setEnd_token(tokenEnd)
range
}
-}
+}
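
Note the structural change here: TokenRange no longer subclasses KeyRange; both now satisfy the new CassandraKeyRange trait and build their Thrift KeyRange independently, one from UTF-8-encoded row keys and one from raw partitioner tokens. A usage sketch (the token strings are made-up values whose format depends on the partitioner):

// Range by row key: start/end are encoded as UTF-8 before hitting Thrift.
val byKey = KeyRange("alice", "zed", 100)

// Range by token: strings pass straight to setStart_token/setEnd_token.
val byToken = TokenRange("0", "85070591730234615865843651857942052864", 100)

val thrift = byKey.cassandraRange   // org.apache.cassandra.thrift.KeyRange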
17 src/main/scala/com/shorrockin/cascal/session/Predicate.scala
@@ -1,5 +1,6 @@
package com.shorrockin.cascal.session
+import java.nio.ByteBuffer
import org.apache.cassandra.thrift.{SliceRange, SlicePredicate}
import com.shorrockin.cascal.utils.Conversions
@@ -20,7 +21,7 @@ trait Predicate {
*
* @author Chris Shorrock
*/
-case class ColumnPredicate(values:Seq[Array[Byte]]) extends Predicate {
+case class ColumnPredicate(values:Seq[ByteBuffer]) extends Predicate {
val slicePredicate = new SlicePredicate()
slicePredicate.setColumn_names(Conversions.toJavaList(values))
}
@@ -28,9 +29,9 @@ case class ColumnPredicate(values:Seq[Array[Byte]]) extends Predicate {
object RangePredicate {
def apply(limit:Int) = new RangePredicate(None, None, Order.Ascending, Some(limit))
def apply(order:Order, limit:Int) = new RangePredicate(None, None, order, Some(limit))
- def apply(start:Array[Byte], end:Array[Byte]) = new RangePredicate(Some(start), Some(end), Order.Ascending, None)
- def apply(start:Array[Byte], end:Array[Byte], limit:Int) = new RangePredicate(Some(start), Some(end), Order.Ascending, Some(limit))
- def apply(start:Option[Array[Byte]], end:Option[Array[Byte]], order:Order, limit:Option[Int]) = new RangePredicate(start, end, order, limit)
+ def apply(start:ByteBuffer, end:ByteBuffer) = new RangePredicate(Some(start), Some(end), Order.Ascending, None)
+ def apply(start:ByteBuffer, end:ByteBuffer, limit:Int) = new RangePredicate(Some(start), Some(end), Order.Ascending, Some(limit))
+ def apply(start:Option[ByteBuffer], end:Option[ByteBuffer], order:Order, limit:Option[Int]) = new RangePredicate(start, end, order, limit)
}
/**
@@ -38,10 +39,10 @@ object RangePredicate {
*
* @author Chris Shorrock
*/
-class RangePredicate(start:Option[Array[Byte]], end:Option[Array[Byte]], order:Order, limit:Option[Int]) extends Predicate {
- val emptyBytes = new Array[Byte](0)
+class RangePredicate(start:Option[ByteBuffer], end:Option[ByteBuffer], order:Order, limit:Option[Int]) extends Predicate {
+ val emptyBytes = ByteBuffer.wrap(new Array[Byte](0))
- def optBytesToBytes(opt:Option[Array[Byte]]) = opt match {
+ def optBytesToBytes(opt:Option[ByteBuffer]) = opt match {
case None => emptyBytes
case Some(array) => array
}
@@ -55,4 +56,4 @@ class RangePredicate(start:Option[Array[Byte]], end:Option[Array[Byte]], order:O
slicePredicate.setSlice_range(new SliceRange(optBytesToBytes(start), optBytesToBytes(end), order.reversed, limitVal))
}
-case object EmptyPredicate extends RangePredicate(None, None, Order.Ascending, None)
+case object EmptyPredicate extends RangePredicate(None, None, Order.Ascending, None)
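
With the predicate bounds now typed as ByteBuffer, building them by hand looks like the following (a small sketch; the bb helper and the column names are illustrative):

import java.nio.ByteBuffer

def bb(s: String) = ByteBuffer.wrap(s.getBytes("UTF-8"))

// Slice: at most 50 columns whose names fall between "a" and "m".
val slice = RangePredicate(bb("a"), bb("m"), 50)

// Explicit column selection is unchanged apart from the element type.
val picks = ColumnPredicate(Seq(bb("name"), bb("email")))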
104 src/main/scala/com/shorrockin/cascal/session/Session.scala
@@ -1,19 +1,21 @@
package com.shorrockin.cascal.session
-import org.apache.thrift.protocol.TBinaryProtocol
+import scala.collection.mutable
+import collection.immutable.HashSet
+import java.util.concurrent.atomic.AtomicLong
+import java.util.{Map => JMap, List => JList, HashMap, ArrayList}
+import java.nio.ByteBuffer
+import org.apache.thrift.protocol.TBinaryProtocol
+import org.apache.thrift.transport.{TFramedTransport, TSocket}
import org.apache.cassandra.thrift.{Mutation, Cassandra, NotFoundException, ConsistencyLevel}
-import java.util.{Map => JMap, List => JList, HashMap, ArrayList}
+import org.apache.cassandra.thrift.{Column => CassColumn}
+import org.apache.cassandra.thrift.{SuperColumn => CassSuperColumn}
+import com.shorrockin.cascal.model._
import com.shorrockin.cascal.utils.Conversions._
import com.shorrockin.cascal.utils.Utils.now
-import com.shorrockin.cascal.model._
-import org.apache.thrift.transport.{TFramedTransport, TSocket}
-import collection.immutable.HashSet
-
-import java.util.concurrent.atomic.AtomicLong
-
/**
* a cascal session is the entry point for interacting with the
* cassandra system through various path elements.
@@ -67,43 +69,52 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
/**
* return the current cluster name of the cassandra instance
*/
- lazy val clusterName = client.get_string_property("cluster name")
-
-
- /**
- * returns the configuration file of the connected cassandra instance
- */
- lazy val configFile = client.get_string_property("config file")
+ lazy val clusterName = client.describe_cluster_name()
/**
* returns the version of the cassandra instance
*/
- lazy val version = client.get_string_property("version")
+ lazy val version = client.describe_version()
/**
* returns all the keyspaces from the cassandra instance
*/
- lazy val keyspaces: Seq[String] = Buffer(client.get_string_list_property("keyspaces"))
+ lazy val keyspaces: Seq[String] = Buffer(client.describe_keyspaces.map { _.name })
/**
* returns the descriptors for all keyspaces
*/
lazy val keyspaceDescriptors: Set[Tuple3[String, String, String]] = {
var keyspaceDesc: Set[Tuple3[String, String, String]] = new HashSet[Tuple3[String, String, String]]
- convertSet(client.describe_keyspaces) foreach {
+ client.describe_keyspaces foreach {
space =>
- val familyMap = client.describe_keyspace(space)
- familyMap.keySet foreach {
+ val familyMap = space.cf_defs
+ familyMap foreach {
family =>
- keyspaceDesc = keyspaceDesc + ((space, family, familyMap.get(family).get("Type")))
+ keyspaceDesc = keyspaceDesc + ((space.name, family.name, family.column_type))
()
}
}
keyspaceDesc
}
+ /**
+ * Cassandra 0.7 requires you to set the keyspace before CRUD operations. We cache the last
+ * set keyspace and change it in the client if the new operation differs.
+ */
+ private var currentKeyspace:String = null
+
+ def verifyKeyspace(keyspace:String) = {
+ if (keyspace == null || keyspace.length == 0)
+ throw new IllegalArgumentException("Keyspace cannot be null")
+ if (currentKeyspace == null || !keyspace.equals(currentKeyspace)) {
+ client.set_keyspace(keyspace)
+ currentKeyspace = keyspace
+ }
+ }
+
def verifyInsert[E](col: Column[E]) {
var famType = if (col.owner.isInstanceOf[SuperColumn]) "Super" else "Standard"
if (!keyspaceDescriptors.contains(col.keyspace.value, col.family.value, famType)) {
@@ -131,7 +142,7 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
*/
def get[ResultType](col: Gettable[ResultType], consistency: Consistency): Option[ResultType] = detect {
try {
- val result = client.get(col.keyspace.value, col.key.value, col.columnPath, consistency)
+ val result = client.get(ByteBuffer.wrap(col.key.value.getBytes("UTF-8")), col.columnPath, consistency)
Some(col.convertGetResult(result))
} catch {
case nfe: NotFoundException => None
@@ -150,7 +161,9 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
*/
def insert[E](col: Column[E], consistency: Consistency) = detect {
verifyInsert(col)
- client.insert(col.keyspace.value, col.key.value, col.columnPath, col.value, col.time, consistency)
+ verifyKeyspace(col.keyspace.value)
+ val cassCol = new CassColumn(col.name, col.value, col.time)
+ client.insert(col.key.value, col.key.columnParent, cassCol, consistency)
col
}
@@ -164,11 +177,15 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
/**
* counts the number of columns in the specified column container
*/
- def count(container: ColumnContainer[_, _], consistency: Consistency): Int = detect {
- client.get_count(container.keyspace.value, container.key.value, container.columnParent, consistency)
+ def count(container: ColumnContainer[_, _], predicate:Predicate, consistency: Consistency): Int = detect {
+ verifyKeyspace(container.keyspace.value)
+ client.get_count(container.key.value, container.columnParent, predicate.slicePredicate, consistency)
}
-
+ /**
+ * counts the number of columns in the specified column container
+ */
+ def count(container: ColumnContainer[_, _], consistency: Consistency): Int = count(container, EmptyPredicate, consistency)
/**
* performs count on the specified column container
*/
@@ -179,8 +196,9 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
* removes the specified column container
*/
def remove(container: ColumnContainer[_, _], consistency: Consistency): Unit = detect {
+ verifyKeyspace(container.keyspace.value)
verifyRemove(container)
- client.remove(container.keyspace.value, container.key.value, container.columnPath, now, consistency)
+ client.remove(container.key.value, container.columnPath, now, consistency)
}
@@ -194,7 +212,8 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
* removes the specified column container
*/
def remove(column: Column[_], consistency: Consistency): Unit = detect {
- client.remove(column.keyspace.value, column.key.value, column.columnPath, now, consistency)
+ verifyKeyspace(column.keyspace.value)
+ client.remove(column.key.value, column.columnPath, now, consistency)
}
@@ -209,7 +228,8 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
* to determine which columns to return.
*/
def list[ResultType](container: ColumnContainer[_, ResultType], predicate: Predicate, consistency: Consistency): ResultType = detect {
- val results = client.get_slice(container.keyspace.value, container.key.value, container.columnParent, predicate.slicePredicate, consistency)
+ verifyKeyspace(container.keyspace.value)
+ val results = client.get_slice(container.key.value, container.columnParent, predicate.slicePredicate, consistency)
container.convertListResult(convertList(results))
}
@@ -239,10 +259,17 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
if (containers.size > 0) detect {
val firstContainer = containers(0)
val keyspace = firstContainer.keyspace
- val keyStrings = containers.map {_.key.value}
- val results = client.multiget_slice(keyspace.value, keyStrings, firstContainer.columnParent, predicate.slicePredicate, consistency)
+ val keyStrings = containers.map {container => ByteBuffer.wrap(container.key.value.getBytes("UTF-8"))}
+ verifyKeyspace(keyspace.value)
+ val results = client.multiget_slice(keyStrings, firstContainer.columnParent, predicate.slicePredicate, consistency)
+
+ val containersByKey = containers.foldLeft(
+ mutable.Map[String, ColumnContainer[ColumnType, ResultType]]())((acc, container) => {
+ acc += (container.key.value -> container)
+ })
- def locate(key: String) = (containers.find {_.key.value.equals(key)}).get
+ // def locate(key: String) = (containers.find {_.key.value.equals(key)}).get
+ def locate(key: String) = containersByKey(key)
convertMap(results).map { (tuple) =>
val key = locate(tuple._1)
@@ -273,7 +300,8 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
* of tokens. This list call is only available when using an order-preserving partition.
*/
def list[ColumnType, ListType](family: ColumnFamily[Key[ColumnType, ListType]], range: KeyRange, predicate: Predicate, consistency: Consistency): Map[Key[ColumnType, ListType], ListType] = detect {
- val results = client.get_range_slices(family.keyspace.value, family.columnParent, predicate.slicePredicate, range.cassandraRange, consistency)
+ verifyKeyspace(family.keyspace.value)
+ val results = client.get_range_slices(family.columnParent, predicate.slicePredicate, range.cassandraRange, consistency)
var map = Map[Key[ColumnType, ListType], ListType]()
convertList(results).foreach { (keyslice) =>
@@ -307,8 +335,9 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
*/
def batch(ops: Seq[Operation], consistency: Consistency): Unit = {
if (ops.size > 0) detect {
- val keyToFamilyMutations = new HashMap[String, JMap[String, JList[Mutation]]]()
+ val keyToFamilyMutations = new HashMap[ByteBuffer, JMap[String, JList[Mutation]]]()
val keyspace = ops(0).keyspace
+ verifyKeyspace(keyspace.value)
def getOrElse[A, B](map: JMap[A, B], key: A, f: => B): B = {
if (map.containsKey(key)) {
@@ -323,13 +352,13 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
ops.foreach {
(op) =>
verifyOperation(op)
- val familyToMutations = getOrElse(keyToFamilyMutations, op.key.value, new HashMap[String, JList[Mutation]]())
+ val familyToMutations = getOrElse(keyToFamilyMutations, ByteBuffer.wrap(op.key.value.getBytes("UTF-8")), new HashMap[String, JList[Mutation]]())
val mutationList = getOrElse(familyToMutations, op.family.value, new ArrayList[Mutation]())
mutationList.add(op.mutation)
}
// TODO may need to flatten duplicate super columns?
- client.batch_mutate(keyspace.value, keyToFamilyMutations, consistency)
+ client.batch_mutate(keyToFamilyMutations, consistency)
} else {
throw new IllegalArgumentException("cannot perform batch operation on 0 length operation sequence")
}
@@ -364,7 +393,7 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
* Given a code block, keep retrying it (up to maxTries) in the event of a
* Cassandra-related timeout exception.
*/
- private def timeoutTry(f: =>Unit, maxTries:Int=5):Unit = {
+ private def timeoutTry(f: =>Unit, maxTries:Int=5, timeoutWaitMsec:Int=200):Unit = {
var tries = maxTries
assert(maxTries > 0)
while(tries > 0) {
@@ -374,6 +403,7 @@ class Session(val host:Host, val defaultConsistency:Consistency, val framedTrans
case e:org.apache.cassandra.thrift.UnavailableException => tries -= 1; if (tries <= 0) throw e
case e:Exception => throw e
}
+ Thread.sleep(timeoutWaitMsec)
}
}
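
The verifyKeyspace helper added above amortizes 0.7's requirement that a keyspace be selected before any CRUD call: set_keyspace is re-issued only when the target keyspace actually changes. The same guard, isolated from the session (the setKeyspace function stands in for the Thrift client call):

// Minimal sketch of the keyspace-switch cache. Like the Session field it
// mirrors, this is deliberately not thread-safe: a session has one user.
class KeyspaceGuard(setKeyspace: String => Unit) {
  private var current: String = null

  def verify(keyspace: String): Unit = {
    require(keyspace != null && keyspace.length > 0, "Keyspace cannot be null or empty")
    if (current == null || current != keyspace) {
      setKeyspace(keyspace)      // one network round-trip, only on change
      current = keyspace
    }
  }
}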
4 src/main/scala/com/shorrockin/cascal/session/SessionPool.scala
@@ -159,8 +159,6 @@ class SessionPool(val hosts:Seq[Host], val params:PoolParams, consistency:Consis
def clusterName:String = borrow { _.clusterName }
- def configFile:String = borrow { _.configFile }
-
def version:String = borrow { _.version }
def keyspaces:Seq[String] = borrow { _.keyspaces }
@@ -276,4 +274,4 @@ case class PoolParams(maxActive:Int,
GenericObjectPool.DEFAULT_TEST_WHILE_IDLE,
GenericObjectPool.DEFAULT_SOFT_MIN_EVICTABLE_IDLE_TIME_MILLIS,
true)
-}
+}
10 src/main/scala/com/shorrockin/cascal/session/SessionTemplate.scala
@@ -10,7 +10,7 @@ import com.shorrockin.cascal.model._
* @author Chris Shorrock
*/
trait SessionTemplate {
-
+
/**
* return the current cluster name of the cassandra instance
*/
@@ -18,12 +18,6 @@ trait SessionTemplate {
/**
- * returns the configuration file of the connected cassandra instance
- */
- def configFile:String
-
-
- /**
* returns the version of the cassandra instance
*/
def version:String
@@ -169,4 +163,4 @@ trait SessionTemplate {
* performs the list of operations in batch using the default consistency
*/
def batch(ops:Seq[Operation]):Unit
-}
+}
6 src/main/scala/com/shorrockin/cascal/testing/CassandraTestPool.scala
@@ -1,3 +1,5 @@
+// TODO Need to update this for Cassandra 0.7
+
package com.shorrockin.cascal.testing
import org.apache.cassandra.thrift.CassandraDaemon
@@ -37,7 +39,7 @@ object EmbeddedTestCassandra extends Logging {
log.debug("creating cassandra instance at: " + homeDirectory.getCanonicalPath)
log.debug("copying cassandra configuration files to root directory")
-
+
val fileSep = System.getProperty("file.separator")
val storageFile = new File(homeDirectory, "storage-conf.xml")
val logFile = new File(homeDirectory, "log4j.properties")
@@ -49,7 +51,7 @@ object EmbeddedTestCassandra extends Logging {
log.debug("creating data file and log location directories")
DatabaseDescriptor.getAllDataFileLocations.foreach { (file) => new File(file).mkdirs }
- new File(DatabaseDescriptor.getLogFileLocation).mkdirs
+ // new File(DatabaseDescriptor.getLogFileLocation).mkdirs
val daemon = new CassandraDaemonThread
daemon.start
33 src/main/scala/com/shorrockin/cascal/utils/Conversions.scala
@@ -4,6 +4,7 @@ import java.nio.charset.Charset
import com.shorrockin.cascal.model.{Column, Keyspace}
import java.util.{Date, UUID => JavaUUID}
import com.shorrockin.cascal.serialization._
+import java.nio.ByteBuffer
/**
* some implicits to assist with common conversions
@@ -13,36 +14,36 @@ object Conversions {
implicit def keyspace(str:String) = new Keyspace(str)
- implicit def bytes(date:Date):Array[Byte] = DateSerializer.toBytes(date)
- implicit def date(bytes:Array[Byte]):Date = DateSerializer.fromBytes(bytes)
+ implicit def bytes(date:Date):ByteBuffer = DateSerializer.toBytes(date)
+ implicit def date(bytes:ByteBuffer):Date = DateSerializer.fromBytes(bytes)
implicit def string(date:Date):String = DateSerializer.toString(date)
- implicit def bytes(b:Boolean):Array[Byte] = BooleanSerializer.toBytes(b)
- implicit def boolean(bytes:Array[Byte]):Boolean = BooleanSerializer.fromBytes(bytes)
+ implicit def bytes(b:Boolean):ByteBuffer = BooleanSerializer.toBytes(b)
+ implicit def boolean(bytes:ByteBuffer):Boolean = BooleanSerializer.fromBytes(bytes)
implicit def string(b:Boolean):String = BooleanSerializer.toString(b)
- implicit def bytes(b:Float):Array[Byte] = FloatSerializer.toBytes(b)
- implicit def float(bytes:Array[Byte]):Float = FloatSerializer.fromBytes(bytes)
+ implicit def bytes(b:Float):ByteBuffer = FloatSerializer.toBytes(b)
+ implicit def float(bytes:ByteBuffer):Float = FloatSerializer.fromBytes(bytes)
implicit def string(b:Float):String = FloatSerializer.toString(b)
- implicit def bytes(b:Double):Array[Byte] = DoubleSerializer.toBytes(b)
- implicit def double(bytes:Array[Byte]):Double = DoubleSerializer.fromBytes(bytes)
+ implicit def bytes(b:Double):ByteBuffer = DoubleSerializer.toBytes(b)
+ implicit def double(bytes:ByteBuffer):Double = DoubleSerializer.fromBytes(bytes)
implicit def string(b:Double):String = DoubleSerializer.toString(b)
- implicit def bytes(l:Long):Array[Byte] = LongSerializer.toBytes(l)
- implicit def long(bytes:Array[Byte]):Long = LongSerializer.fromBytes(bytes)
+ implicit def bytes(l:Long):ByteBuffer = LongSerializer.toBytes(l)
+ implicit def long(bytes:ByteBuffer):Long = LongSerializer.fromBytes(bytes)
implicit def string(l:Long):String = LongSerializer.toString(l)
- implicit def bytes(i:Int):Array[Byte] = IntSerializer.toBytes(i)
- implicit def int(bytes:Array[Byte]):Int = IntSerializer.fromBytes(bytes)
+ implicit def bytes(i:Int):ByteBuffer = IntSerializer.toBytes(i)
+ implicit def int(bytes:ByteBuffer):Int = IntSerializer.fromBytes(bytes)
implicit def string(i:Int) = IntSerializer.toString(i)
- implicit def bytes(str:String):Array[Byte] = StringSerializer.toBytes(str)
- implicit def string(bytes:Array[Byte]):String = StringSerializer.fromBytes(bytes)
+ implicit def bytes(str:String):ByteBuffer = StringSerializer.toBytes(str)
+ implicit def string(bytes:ByteBuffer):String = StringSerializer.fromBytes(bytes)
implicit def string(source:JavaUUID) = UUIDSerializer.toString(source)
implicit def uuid(source:String) = UUIDSerializer.fromString(source)
- implicit def bytes(source:JavaUUID):Array[Byte] = UUIDSerializer.toBytes(source)
+ implicit def bytes(source:JavaUUID):ByteBuffer = UUIDSerializer.toBytes(source)
implicit def string(col:Column[_]):String = {
"%s -> %s (time: %s)".format(Conversions.string(col.name),
@@ -52,4 +53,4 @@ object Conversions {
implicit def toSeqBytes(values:Seq[String]) = values.map { (s) => Conversions.bytes(s) }
implicit def toJavaList[T](l: Seq[T]):java.util.List[T] = l.foldLeft(new java.util.ArrayList[T](l.size)){(al, e) => al.add(e); al}
-}
+}
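
Because the implicits above were retargeted from Array[Byte] to ByteBuffer in lockstep with the model classes, existing call sites that lean on them should continue to compile unchanged; a hedged usage sketch (keyspace, family, and key names are invented):

import com.shorrockin.cascal.utils.Conversions._

// The implicit bytes(String):ByteBuffer converts both arguments below,
// so string literals still flow into the ByteBuffer-typed \ overloads.
val column = keyspace("Example") \ "People" \ "row-1" \ ("name", "Alice")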