Skip to content
This repository has been archived by the owner on Oct 30, 2018. It is now read-only.

Commit

Permalink
* Added bulk get / fetch capabilities to the caching interface
Browse files Browse the repository at this point in the history
* Switched internal representation to use gnu.trove collections, because they have a much smaller memory footprint in bulk.
  • Loading branch information
Chris Bissell committed Sep 14, 2012
1 parent 36c6550 commit 6b1944c
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 14 deletions.
16 changes: 15 additions & 1 deletion pom.xml
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.gravity</groupId>
<artifactId>gravity-hpaste</artifactId>
<version>0.1.13-SNAPSHOT</version>
<version>0.1.24-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hpaste</name>
<url>http://github.com/GravityLabs/HPaste</url>
Expand Down Expand Up @@ -77,11 +77,20 @@
<artifactId>joda-time</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.90.4</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
Expand All @@ -96,6 +105,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.trove4j</groupId>
<artifactId>trove4j</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
Expand Down
15 changes: 8 additions & 7 deletions src/main/scala/com/gravity/hbase/schema/DeserializedResult.scala
Expand Up @@ -22,9 +22,9 @@ case class DeserializedResult(rowid: AnyRef, famCount: Int) {
/**
 * Returns the qualifier -> value map stored for `fam`, viewed as a map from K to V.
 *
 * When `family(fam)` yields null, a fresh empty map is returned instead, so the result is
 * never null. The cast is unchecked: K and V are erased at runtime, so a wrong choice of
 * type arguments is not detected here.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers; in each branch the
 * `java.util.HashMap` line looks like the removed version of the line below it - confirm
 * against the repository.
 *
 * @param fam the column family whose values are wanted
 * @tparam K  key type the caller expects
 * @tparam V  value type the caller expects
 */
def familyValueMap[K, V](fam: ColumnFamily[_, _, _, _, _]) = {
val famMap = family(fam)
if (famMap != null) {
famMap.asInstanceOf[java.util.HashMap[K, V]]
famMap.asInstanceOf[java.util.Map[K, V]]
} else {
// No map for this family: hand back an empty one that is not stored anywhere.
new java.util.HashMap[K, V]()
new gnu.trove.map.hash.THashMap[K, V]()
}
}

Expand All @@ -33,7 +33,7 @@ case class DeserializedResult(rowid: AnyRef, famCount: Int) {
if (famMap != null) {
famMap.keySet.asInstanceOf[java.util.Set[K]]
} else {
new java.util.HashSet[K]()
new gnu.trove.set.hash.THashSet[K]()
}
}

Expand Down Expand Up @@ -101,9 +101,10 @@ case class DeserializedResult(rowid: AnyRef, famCount: Int) {
}


// Per-family column values: slot i holds the qualifier -> value map of the family whose
// `index` is i. `add` creates a slot's map lazily the first time it sees that family.
var values = new Array[java.util.HashMap[AnyRef, AnyRef]](famCount)
var values = new Array[java.util.Map[AnyRef, AnyRef]](famCount)

// Per-family timestamps, laid out like `values`: slot i maps a qualifier to the timestamp
// that was passed to `add` together with its value.
private val timestampLookaside = new Array[gnu.trove.map.TObjectLongMap[AnyRef]](famCount)

var timestampLookaside = new Array[java.util.HashMap[AnyRef, Long]](famCount)


/** This is a map whose key is the family type, and whose values are maps of column keys to column values paired with their timestamps. */
Expand All @@ -114,14 +115,14 @@ case class DeserializedResult(rowid: AnyRef, famCount: Int) {
def add(family: ColumnFamily[_, _, _, _, _], qualifier: AnyRef, value: AnyRef, timeStamp: Long) {
var map = values(family.index)
if (map == null) {
map = new java.util.HashMap[AnyRef, AnyRef]()
map = new gnu.trove.map.hash.THashMap[AnyRef, AnyRef]()
values(family.index) = map
}
map.put(qualifier, value)

var tsMap = timestampLookaside(family.index)
if (tsMap == null) {
tsMap = new java.util.HashMap[AnyRef, Long]()
tsMap = new gnu.trove.map.hash.TObjectLongHashMap[AnyRef]()
timestampLookaside(family.index) = tsMap
}
tsMap.put(qualifier, timeStamp)
Expand Down
15 changes: 13 additions & 2 deletions src/main/scala/com/gravity/hbase/schema/Query.scala
Expand Up @@ -207,7 +207,9 @@ class Query[T <: HbaseTable[T, R,RR], R, RR <: HRow[T,R]](table: HbaseTable[T, R
resultMap // DONE!
}

private def buildGetsAndCheckCache(skipCache: Boolean)(receiveGetAndKey: (Get, Array[Byte]) => Unit = (get, key) => {})(receiveCachedResult: (Option[RR], Get) => Unit = (qr, get) => {}): Seq[Get] = {
private def buildGetsAndCheckCache(skipCache: Boolean)
(receiveGetAndKey: (Get, Array[Byte]) => Unit = (get, key) => {})
(receiveCachedResult: (Option[RR], Get) => Unit = (qr, get) => {}): Seq[Get] = {
if (keys.isEmpty) return Seq.empty[Get] // no keys..? nothing to see here... move along... move along.

val gets = Buffer[Get]() // buffer for the raw `Get's
Expand Down Expand Up @@ -244,9 +246,18 @@ class Query[T <: HbaseTable[T, R,RR], R, RR <: HRow[T,R]](table: HbaseTable[T, R
}

// try the cache with this filled in get
if (!skipCache) receiveCachedResult(table.cache.getResult(get), get)
// if (!skipCache) receiveCachedResult(table.cache.getResult(get), get)
}

if(!skipCache) {
val cacheResults = table.cache.getResults(gets)

cacheResults.foreach {case (get, rowOpt) =>
receiveCachedResult(rowOpt,get)
}
}


gets
}

Expand Down
9 changes: 8 additions & 1 deletion src/main/scala/com/gravity/hbase/schema/Query2.scala
Expand Up @@ -698,9 +698,16 @@ class Query2[T <: HbaseTable[T, R, RR], R, RR <: HRow[T, R]] private(
}

// try the cache with this filled in get
if (!skipCache) receiveCachedResult(table.cache.getResult(get), get)
// if (!skipCache) receiveCachedResult(table.cache.getResult(get), get)
}

if(!skipCache) {
val cacheResults = table.cache.getResults(gets)

cacheResults.foreach {case (get, rowOpt) =>
receiveCachedResult(rowOpt,get)
}
}
gets
}

Expand Down
8 changes: 5 additions & 3 deletions src/main/scala/com/gravity/hbase/schema/ResultCaching.scala
Expand Up @@ -41,6 +41,8 @@ trait QueryResultCache[T <: HbaseTable[T, R, RR], R, RR <: HRow[T,R]] {

/**
 * Looks up the cached row for a single `Get`.
 *
 * NOTE(review): `None` presumably denotes a cache miss - confirm with the implementations.
 */
def getResult(key: Get): Option[RR]

/**
 * Bulk counterpart of `getResult`: looks up many `Get`s in one call.
 *
 * The query classes iterate over the returned entries and pass each `(Get, Option[RR])`
 * pair to their cached-result callback, so a `Get` that is absent from the map is never
 * reported to that callback.
 *
 * NOTE(review): presumably a miss should be returned as `get -> None` rather than left
 * out of the map - confirm the intended contract.
 */
def getResults(keys:Iterable[Get]) : Map[Get,Option[RR]]

/**
 * Offers a row to the cache under the `Get` it belongs to.
 *
 * NOTE(review): `ttl` is presumably a time-to-live; its unit is not visible here - confirm.
 */
def putResult(key: Get, value: RR, ttl: Int)
}

Expand All @@ -56,8 +58,8 @@ class NoOpCache[T <: HbaseTable[T, R,RR], R, RR <: HRow[T,R]] extends QueryResul

/** Deliberately does nothing: the no-op cache discards scan results. */
override def putScanResult(key: Scan, value: Seq[RR], ttl: Int): Unit = ()

/** Always answers `None`, whatever `Get` is passed in. */
override def getResult(key: Get): Option[RR] = None


/** Deliberately does nothing: the no-op cache discards the row. */
override def putResult(key: Get, value: RR, ttl: Int): Unit = ()

/**
 * Bulk lookup that never hits: every requested `Get` is mapped to `None`.
 *
 * An empty map would not be equivalent. The query classes invoke their cached-result
 * callback once per returned entry, so with an empty map no miss is ever reported, whereas
 * the per-key `getResult` path reported `None` for each `Get`. Returning one entry per key
 * keeps the bulk path consistent with the single-key path.
 *
 * @param keys the `Get`s being looked up
 * @return a map holding `key -> None` for each element of `keys`
 */
override def getResults(keys: Iterable[Get]): Map[Get, Option[RR]] =
  keys.map(key => key -> (None: Option[RR])).toMap
/**
 * Always answers `None`, whatever `Get` is passed in.
 *
 * The result type is spelled out on purpose: without it the member's type is inferred from
 * the right-hand side as `None.type` rather than the `Option[RR]` declared by the trait.
 */
override def getResult(key: Get): Option[RR] = None
}

0 comments on commit 6b1944c

Please sign in to comment.