Permalink
Browse files

Replace Schema/SchemaField with Projection/ProjectionField; route records to node ids via per-field md5 hashes (WARNING: unresolved merge-conflict markers remain in Node.scala)

  • Loading branch information...
1 parent 82325ba commit ffd1f9fc72d0356eaba651ff996c9f2dd3303d63 @briangu committed May 22, 2012
@@ -1,10 +1,9 @@
package io.stored.server
import _root_.io.viper.core.server.router._
-import common.{Schema, IndexStorage, Record}
+import common.{ProjectionField, Projection, IndexStorage, Record}
import io.viper.common.{NestServer, RestServer}
import org.jboss.netty.handler.codec.http.HttpResponseStatus
-import java.security.MessageDigest
import io.stored.common.FileUtil
import org.json.{JSONArray, JSONObject}
import io.stored.server.ext.storage.H2IndexStorage
@@ -17,7 +16,11 @@ import java.util.Arrays
import collection.mutable.{SynchronizedBuffer, ArrayBuffer, ListBuffer, SynchronizedMap}
+<<<<<<< Updated upstream
class Node(val localhost: String, val ids: Set[Int], val schema: Schema) {
+=======
+class Node(val localhost: String, val allNodeIds: Set[Int], val ids: Set[Int], val projection: Projection) {
+>>>>>>> Stashed changes
val storage = new collection.mutable.HashMap[Int, IndexStorage] with SynchronizedMap[Int, IndexStorage]
@@ -27,6 +30,7 @@ object Node {
var node: Node = null
+<<<<<<< Updated upstream
def getNodeId(hashCoords: Map[String, Array[Byte]], schema: Schema) : Int = {
0
}
@@ -38,21 +42,61 @@ object Node {
def getTargetNodeIds(sql: String) : Set[Int] = {
Set(0)
+=======
/** Resolves which node ids a request addresses.
  *
  * Packs each projected field's hash into a node id, most-significant field
  * first, taking the low `bitWeight` bits per field, then masks the result to
  * the projection's total bit budget (`dimensions`).
  *
  * Fan-out rules preserved from the original: no hash coordinates at all, or
  * no overlap with the projection's fields, targets every node; a PARTIAL
  * overlap is an error (every projection field must then be present).
  */
def getNodeIds(projection: Projection, hashCoords: Map[String, List[BigInt]]) : Set[Int] = {
  if (hashCoords == null || hashCoords.size == 0) return node.allNodeIds
  if (hashCoords.keySet.intersect(projection.getFields).size == 0) return node.allNodeIds

  var nodeId = BigInt(0)
  projection.getFields.foreach { key =>
    if (!hashCoords.contains(key)) throw new RuntimeException("missing expected projection field: " + key)
    val sf = projection.getFieldValue(key).get
    // BUG FIX: masks were computed as math.pow(2, n).toInt - 1, which
    // overflows and loses double precision for n >= 31. BigInt shifts are
    // exact for any bit width and identical for the previously valid range.
    val mask = (BigInt(1) << sf.bitWeight) - 1
    // NOTE(review): only the first hash value per field is used — confirm
    // multi-value coordinates are intended future work.
    nodeId = (nodeId << sf.bitWeight) | (hashCoords(key).head & mask)
  }

  Set((nodeId & ((BigInt(1) << projection.dimensions) - 1)).toInt)
}
+>>>>>>> Stashed changes
}
- def getTargetHosts(nodeIds: Set[Int]) : Map[String, Set[Int]] = {
/** Maps node ids to the hosts that own them.
  * Single-node deployment for now: every id resolves to the local host. */
def getNodeHosts(nodeIds: Set[Int]) : Map[String, Set[Int]] = Map(node.localhost -> nodeIds)
- def hashSchemaFields(data: Map[String, AnyRef], schema: Schema) : Map[String, Array[Byte]] = {
- null
// TODO: support multiple values for data map
/** Hashes every projection field present in the record data, in parallel.
  *
  * Returns field name -> single-element list holding the md5 hash of the
  * field's value. Fields absent from `data` are silently skipped (getNodeIds
  * decides later whether that is acceptable). Unsupported value types raise.
  */
def hashProjectionFields(projection: Projection, data: Map[String, AnyRef]) : Map[String, List[BigInt]] = {
  val result = new collection.mutable.HashMap[String, List[BigInt]] with SynchronizedMap[String, List[BigInt]]
  projection.getFields.par.foreach { key =>
    if (data.contains(key)) {
      // BUG FIX: the original kept data.get(key) — an Option — and ran
      // isInstanceOf checks on the Option itself, so no branch ever matched
      // and every present field threw "unknown field type". Unwrap the value.
      val dataVal = data(key)
      // BUG FIX: the String and Double branches both cast to Long
      // (copy-paste); dispatch each value to the matching md5Hash overload.
      dataVal match {
        case v: Long   => result.put(key, List(ProjectionField.md5Hash(v)))
        case v: String => result.put(key, List(ProjectionField.md5Hash(v)))
        case v: Double => result.put(key, List(ProjectionField.md5Hash(v)))
        case _ => throw new RuntimeException("unknown field type: " + dataVal.getClass)
      }
    }
  }
  result.toMap
}
- def indexRecord(nodeId: Int, record: Record) {
- if (node.ids.contains(nodeId)) {
- node.storage.get(nodeId).get.add(record)
- } else {
- indexRecord(findNodeHost(nodeId), record)
/** Writes the record to every addressed partition, in parallel: ids this
  * node does not own are forwarded to the owning host, the rest go straight
  * to local storage. */
def indexRecord(nodeIds: Set[Int], record: Record) {
  nodeIds.par.foreach { id =>
    if (!node.ids.contains(id)) {
      indexRecord(findNodeHost(id), record)
    } else {
      node.storage.get(id).get.add(record)
    }
  }
}
@@ -73,6 +117,7 @@ object Node {
ids.toSet
}
+<<<<<<< Updated upstream
def md5Hash(Record: Array[Byte]) : BigInt = {
val m = MessageDigest.getInstance("MD5");
m.reset();
@@ -108,6 +153,17 @@ object Node {
def initialize(localhost: String, schemaFile: String, storagePath: String) {
val schema = Schema.create(FileUtil.readJson(schemaFile))
node = new Node(localhost, determineNodeIds(schema.dimensions), schema)
+=======
/** Every node id addressable with the given bit budget: [0, 2^dimensions). */
def determineAllNodeIds(dimensions: Int) : Set[Int] =
  (0 until math.pow(2, dimensions).toInt).toSet
+
+ def initialize(localhost: String, schemaFile: String, storagePath: String) {
+ val schema = Projection.create(FileUtil.readJson(schemaFile))
+ node = new Node(localhost, determineAllNodeIds(schema.dimensions), determineNodeIds(schema.dimensions), schema)
+>>>>>>> Stashed changes
val storage = H2IndexStorage.create(storagePath)
node.ids.par.foreach(id => node.storage.put(id, storage))
}
@@ -122,7 +178,11 @@ object Node {
resultMap.toMap
}
+<<<<<<< Updated upstream
def testjsql {
+=======
+ def processSqlRequest(sql: String) : (String, List[String], Map[String, List[BigInt]]) = {
+>>>>>>> Stashed changes
val pm = new CCJSqlParserManager();
var sql = "SELECT * FROM MY_TABLE1, MY_TABLE2, (SELECT * FROM MY_TABLE3) LEFT OUTER JOIN MY_TABLE4 WHERE ID = (SELECT MAX(ID) FROM MY_TABLE5) AND ID2 IN (SELECT * FROM MY_TABLE6)" ;
@@ -193,8 +253,10 @@ object Node {
def exec(args: java.util.Map[String, String]): RouteResponse = {
if (!args.containsKey("record")) return new StatusResponse(HttpResponseStatus.BAD_REQUEST)
val record = Record.create(args.get("record"))
- val nodeId = getTargetNodeId(record.colMap, node.schema)
- indexRecord(nodeId, record)
+ if (!record.colMap.keySet.equals(node.projection.getFields)) return new StatusResponse(HttpResponseStatus.BAD_REQUEST)
+ val hashCoords = hashProjectionFields(node.projection, record.colMap)
+ val nodeIds = getNodeIds(node.projection, hashCoords)
+ indexRecord(nodeIds, record)
val response = new JSONObject();
response.put("id", record.id)
new JsonResponse(response)
@@ -206,8 +268,15 @@ object Node {
if (!args.containsKey("sql")) return new StatusResponse(HttpResponseStatus.BAD_REQUEST)
val sql = args.get("sql")
+<<<<<<< Updated upstream
val nodeIds = getTargetNodeIds(sql)
val hostsMap = getTargetHosts(nodeIds)
+=======
+ val (nodeSql, selectedItems, whereItems) = processSqlRequest(sql)
+
+ val nodeIds = getNodeIds(node.projection, whereItems)
+ val hostsMap = getNodeHosts(nodeIds)
+>>>>>>> Stashed changes
// TODO: rewrite sql for node queries, as it's almost certain what we want to sub query is not the same
//val resultMap = new HashMap[String, Map[Int, List[Record]]] with SynchronizedMap[String, Map[Int, List[Record]]]
@@ -0,0 +1,30 @@
package io.stored.server.common

import org.json.JSONObject
import collection.mutable.LinkedHashMap


/** Describes how records map onto the node-id space: a total bit budget
  * (dimensions) plus hashed fields that each contribute bitWeight bits.
  * Field order fixes the bit layout, hence the LinkedHashMap. */
class Projection(val dimensions: Int, fields: collection.mutable.LinkedHashMap[String, ProjectionField]) {

  /** Field names in insertion order. */
  def getFields = fields.keySet

  /** Descriptor for a field, or None when the projection lacks it. */
  def getFieldValue(key: String) = fields.get(key)
}

object Projection {

  /** Parses {"dimensions": n, "fields": {name: bitWeight, ...}} into a
    * Projection, iterating fields in whatever order keys() yields them. */
  def create(obj: JSONObject) : Projection = {
    val dimensions = obj.getInt("dimensions")
    val fields = obj.getJSONObject("fields")
    val fieldMap = new LinkedHashMap[String, ProjectionField]

    val names = fields.keys()
    while (names.hasNext) {
      val name = names.next().asInstanceOf[String]
      fieldMap += (name -> ProjectionField.create(name, fields.get(name)))
    }

    new Projection(dimensions, fieldMap)
  }
}
@@ -0,0 +1,33 @@
package io.stored.server.common

// NOTE(review): this import is the Scala JSON parser's JSONObject, but the
// values handed to create() come from org.json — it was only referenced by a
// dead branch (removed below); kept pending a confirmed cleanup.
import util.parsing.json.JSONObject
import java.security.MessageDigest

/** One projected field: its name plus the number of node-id bits
  * (bitWeight) its hash contributes. */
class ProjectionField(val name: String, val bitWeight: Int)

object ProjectionField {

  // Numeric values hash via their minimal two's-complement byte form.
  // NOTE(review): the Double overload truncates toLong before hashing, so
  // e.g. 1.0 and 1.9 collide — confirm that truncation is intended.
  def md5Hash(v: Double) : BigInt = md5Hash(BigInt(v.toLong).toByteArray)
  def md5Hash(v: Long) : BigInt = md5Hash(BigInt(v).toByteArray)
  def md5Hash(str: String) : BigInt = md5Hash(str.getBytes("UTF-8"))

  /** MD5 digest of the bytes, interpreted as a signed big-endian BigInt. */
  def md5Hash(record: Array[Byte]) : BigInt = {
    val m = MessageDigest.getInstance("MD5")
    m.reset()
    m.update(record)
    BigInt(m.digest())
  }

  /** Builds a field descriptor from a parsed JSON value; only integer bit
    * weights are supported. */
  def create(name: String, obj: Object) : ProjectionField = {
    // BUG FIX (dead branch): the original also tested isInstanceOf[JSONObject]
    // against util.parsing.json.JSONObject — the wrong JSON library, since the
    // values come from org.json — and then threw the exact same error as the
    // fallback. Collapsed into a single rejection; message and behavior are
    // unchanged for every input.
    if (obj.isInstanceOf[Int]) new ProjectionField(name, obj.asInstanceOf[Int])
    else throw new RuntimeException("obj type not supported")
  }
}
@@ -1,27 +0,0 @@
-package io.stored.server.common
-
-import org.json.JSONObject
-import collection.mutable.HashMap
-
-
-class Schema(val dimensions: Int, val fields: Map[String, SchemaField]) {
-
-}
-
-object Schema {
- def create(obj: JSONObject) : Schema = {
-
- val dimensions = obj.getInt("dimensions")
- val fields = obj.getJSONObject("fields")
-
- val fieldMap = new HashMap[String, SchemaField]
-
- val iter = fields.keys()
- while (iter.hasNext) {
- val key = iter.next().asInstanceOf[String];
- fieldMap.put(key, SchemaField.create(key, fields.get(key)))
- }
-
- new Schema(dimensions, fieldMap.toMap)
- }
-}
@@ -1,19 +0,0 @@
-package io.stored.server.common
-
-import util.parsing.json.JSONObject
-
-class SchemaField(name: String, bitWeight: Int) {
-
-}
-
-object SchemaField {
- def create(name: String, obj: Object) : SchemaField = {
- if (obj.isInstanceOf[Int]) {
- new SchemaField(name, obj.asInstanceOf[Int])
- } else if (obj.isInstanceOf[JSONObject]) {
- throw new RuntimeException("obj type not supported")
- } else {
- throw new RuntimeException("obj type not supported")
- }
- }
-}
Oops, something went wrong.

0 comments on commit ffd1f9f

Please sign in to comment.