Permalink
Browse files

wip

  • Loading branch information...
1 parent 76bba8a commit 60c8b206f344d18e431166b8886520f67aee4f57 Brian Guarraci committed Oct 27, 2012
@@ -4,11 +4,12 @@ import _root_.io.viper.core.server.router._
import common._
import ext.storage.H2IndexStorage
import io.viper.common.{NestServer, RestServer}
-import org.jboss.netty.handler.codec.http.HttpResponseStatus
+import org.jboss.netty.handler.codec.http._
import org.json.{JSONArray, JSONObject}
import net.sf.jsqlparser.JSQLParserException
import collection.immutable._
import sql.QueryInfo
+import org.jboss.netty.buffer.ChannelBuffers
class Node(val localNode : IndexStorage, val projections: ProjectionsConfig) {
@@ -49,6 +50,24 @@ class Node(val localNode : IndexStorage, val projections: ProjectionsConfig) {
}
}
+ def applyJsonSelectItems(selectedItems: List[String], records: JSONArray) : JSONArray = {
+ if (selectedItems == null || selectedItems.size == 0 || selectedItems(0).equals("*")) return records
+
+ /*
+ val selSet = selectedItems.map(_.toUpperCase).toSet
+ (0 until records.length()).flatMap { record =>
+ if (record.colMap == null || record.colMap.keySet.intersect(selSet).size > 0) {
+ val dst = new JSONObject()
+ selectedItems.foreach(rawPath => copyJsonObjectPath(record.rawData, dst, rawPath.split("__").toList))
+ if (dst.length > 0) { List(new Record(record.id, null, dst)) } else { Nil }
+ } else {
+ Nil
+ }
+ }
+ */
+ records
+ }
+
def insert(projectionName: String, record: JSONObject) : String = {
val projection = projections.getProjection(projectionName)
if (projection == null) throw new IllegalArgumentException("using unregistered projection: " + projectionName)
@@ -131,6 +150,30 @@ class Node(val localNode : IndexStorage, val projections: ProjectionsConfig) {
applySelectItems(queryInfo.selectedItems, results)
}
+ private def doJsonQuery(queryInfo: QueryInfo, projection: Projection, nodeMap: Map[IndexStorage, Set[Int]], nodeIds: Set[Int]) : String = {
+ val results = projection.getNodeIndexStorage(nodeIds.toList(0)).jsonQuery(projection, nodeIds, queryInfo.nodeSql)
+/*
+ val results = if (nodeMap.keySet.size == 1 && queryInfo.nodeSql.equals(queryInfo.finalSql)) {
+ projection.getNodeIndexStorage(nodeIds.toList(0)).jsonQuery(projection, nodeIds, queryInfo.nodeSql)
+ } else {
+ val qr = nodeMap.keySet.par.flatMap(node => node.query(projection, nodeMap.get(node).get, queryInfo.nodeSql)).toList
+ if (queryInfo.postSqlRequired) {
+ val mergeDb = H2IndexStorage.createInMemoryDb
+ try {
+ mergeDb.addAll(projection, null, qr)
+ mergeDb.query(projection, nodeIds, queryInfo.finalSql)
+ } finally {
+ mergeDb.shutdown()
+ }
+ } else {
+ qr
+ }
+ }
+*/
+
+ results
+ }
+
private def getRequestedProjection(args: java.util.Map[String, String], defaultProjName: String) : Projection = {
if (args.containsKey("projection") && (projections.hasProjection(args.get("projection")))) {
projections.getProjection(args.get("projection"))
@@ -233,17 +276,22 @@ object Node {
val nodeIds = JsonUtil.intSetFromJsonArray(args.get("nodeIds"))
val nodeMap = Map(node.localNode -> nodeIds)
val queryInfo = new QueryInfo(projection.name, nodeSql, nodeSql, List(), Map(), false)
- node.doQuery(queryInfo, projection, nodeMap, nodeIds)
+ node.doJsonQuery(queryInfo, projection, nodeMap, nodeIds)
} else {
val queryInfo = QueryInfo.create(args.get("sql"))
val projection = node.getRequestedProjection(args, queryInfo.projectionName)
if (projection == null) return new StatusResponse(HttpResponseStatus.BAD_REQUEST)
val nodeIds = projection.getNodeIds(queryInfo.whereItems)
val nodeMap = projection.getNodeStores(nodeIds)
- node.doQuery(queryInfo, projection, nodeMap, nodeIds)
+ node.doJsonQuery(queryInfo, projection, nodeMap, nodeIds)
}
- JsonUtil.jsonResponse("records", JsonUtil.toJsonArray(results, { record: Record => record.rawData }))
+ val time = System.currentTimeMillis()
+ val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
+ response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "application/javascript; charset=UTF-8")
+ response.setContent(ChannelBuffers.wrappedBuffer(results.getBytes("UTF-8")))
+ println("encoding time: %d".format(System.currentTimeMillis() - time))
+ new RouteResponse(response)
} catch {
case e: JSQLParserException => return new StatusResponse(HttpResponseStatus.BAD_REQUEST)
}
@@ -1,11 +1,13 @@
package io.stored.server.common
+
trait IndexStorage {
def init()
def shutdown()
def purge()
+ def jsonQuery(projection: Projection, nodeIds: Set[Int], sql: String) : String
def query(projection: Projection, nodeIds: Set[Int], sql: String) : List[Record]
def add(projection: Projection, nodeIdMap: Map[Int, Set[String]], record: Record) : String = {
@@ -2,14 +2,14 @@ package io.stored.server.ext.storage
import org.apache.log4j.Logger
import org.h2.jdbcx.JdbcConnectionPool
-import org.json.JSONException
-import org.json.JSONObject
+import org.json.{JSONArray, JSONException, JSONObject}
import java.io.File
import java.sql._
import io.stored.common.SqlUtil
import collection.mutable.{ListBuffer, SynchronizedSet, HashSet}
import io.stored.server.common.{Projection, Record, IndexStorage}
import java.util.UUID
+import collection.mutable
object H2IndexStorage {
@@ -312,7 +312,7 @@ class H2IndexStorage(configRoot: String) extends IndexStorage {
var db: Connection = null
var statement: PreparedStatement = null
try {
- db = getReadOnlyDbConnection
+ db = getDbConnection
statement = db.prepareStatement(sql)
val rs: ResultSet = statement.executeQuery
while (rs.next) results.append(new Record(rs.getString("HASH"), null, new JSONObject(rs.getString("RAWDATA"))))
@@ -326,4 +326,39 @@ class H2IndexStorage(configRoot: String) extends IndexStorage {
results.toList
}
+
+ def jsonQuery(projection: Projection, nodeIds: Set[Int], sql: String): String = {
+ val results = new mutable.StringBuilder
+
+ val time = System.currentTimeMillis()
+ var db: Connection = null
+ var statement: PreparedStatement = null
+ try {
+ db = getDbConnection
+
+// createIndex(db, _tableName, "CLOSE")
+
+ statement = db.prepareStatement(sql)
+ val rs: ResultSet = statement.executeQuery
+ println("query took: %d".format(System.currentTimeMillis() - time))
+ results.append("{\"results\":[");
+ while (rs.next) {
+ results.append(rs.getString("RAWDATA"))
+ if (!rs.isLast()) {
+ results.append(",")
+ }
+ }
+ results.append("]}")
+ } catch {
+ case e: JSONException => H2IndexStorage.log.error(e)
+ case e: SQLException => H2IndexStorage.log.error(e)
+ } finally {
+ SqlUtil.SafeClose(statement)
+ SqlUtil.SafeClose(db)
+ }
+
+ println("took: %d".format(System.currentTimeMillis() - time))
+
+ results.toString
+ }
}
@@ -2,6 +2,7 @@ package io.stored.server.ext.storage
import io.stored.server.common.{Record, Projection, IndexStorage}
import collection.mutable.{SynchronizedMap, HashMap}
+import org.json.JSONArray
/***
* This is an experimental IndexStorage that creates a new DB for each projection.
@@ -34,6 +35,10 @@ class H2ProjectionStorageRouter(configRoot: String) extends IndexStorage {
getProjStore(projection.name).query(projection, nodeIds, sql)
}
+ def jsonQuery(projection: Projection, nodeIds: Set[Int], sql: String) : String = {
+ getProjStore(projection.name).jsonQuery(projection, nodeIds, sql)
+ }
+
def addAll(projection: Projection, nodeIdMap: Map[Int, Set[String]], recordMap: Map[String, Record]): List[String] = {
getProjStore(projection.name).addAll(projection, nodeIdMap, recordMap)
}
@@ -47,6 +47,10 @@ class HttpIndexStorage(host: String) extends IndexStorage
}
}
+ def jsonQuery(projection: Projection, nodeIds: Set[Int], sql: String) : String = {
+ ""
+ }
+
def add(projection: Projection, nodeIds: Set[Int], record: Record) : String = {
val response = asyncHttpClient
.preparePost("%s/records".format(host))
@@ -9,6 +9,7 @@ import java.util.Arrays
import collection.mutable.HashMap
import net.sf.jsqlparser.statement.select._
import net.sf.jsqlparser.schema.{Table, Column}
+import java.util
class SqlRequestProcessor extends SelectVisitor with ExpressionVisitor
@@ -83,8 +84,23 @@ class SqlRequestProcessor extends SelectVisitor with ExpressionVisitor
if (!(sqlSelectItems.size() == 1 && sqlSelectItems.get(0).equals("*")))
{
(0 until sqlSelectItems.size()).foreach(i => sqlSelectItems.get(i).asInstanceOf[SelectItem].accept(selectItemExtractor))
- // Node SQL requires *
- plainSelect.setSelectItems(Arrays.asList("*"))
+
+ // if we are doing a non-distinct call, then overwrite the select items and filter on the collect phase
+ if (plainSelect.getDistinct == null) {
+ // Node SQL requires *
+ plainSelect.setSelectItems(Arrays.asList("*"))
+ } else {
+ val newSelectItems = new java.util.ArrayList[String]()
+ import collection.JavaConversions._
+
+ sqlSelectItems.foreach{item => {
+ newSelectItems.add(item.toString)
+ }}
+
+ if (!newSelectItems.contains("HASH")) newSelectItems.add("HASH")
+ if (!newSelectItems.contains("RAWDATA")) newSelectItems.add("RAWDATA")
+ plainSelect.setSelectItems(newSelectItems)
+ }
}
selectItems = selectItemExtractor.selectItems.toList

0 comments on commit 60c8b20

Please sign in to comment.