Skip to content

Commit

Permalink
cypher performance - mutable map creation limited to a single place
Browse files Browse the repository at this point in the history
  • Loading branch information
jexp committed May 9, 2012
1 parent c32b45b commit 6afc3b7
Show file tree
Hide file tree
Showing 21 changed files with 84 additions and 42 deletions.
Expand Up @@ -44,11 +44,11 @@ case class AllIdentifiers() extends ReturnColumn {
case class ReturnItem(expression: Expression, name: String, renamed: Boolean = false) extends ReturnColumn {
def expressions(symbols: SymbolTable) = Seq(expression)

def dependencies = expression.dependencies(AnyType())
val dependencies = expression.dependencies(AnyType())

def identifier = Identifier(name, expression.identifier.typ)
val identifier = Identifier(name, expression.identifier.typ)

def columnName = identifier.name
val columnName = identifier.name

override def toString() = identifier.name

Expand Down
Expand Up @@ -61,7 +61,7 @@ case class CreateNodeStartItem(key: String, props: Map[String, Expression])
val node = db.createNode()
state.createdNodes.increase()
setProps(node, m, context, state)
context.copy(m = context.m ++ Map(key -> node))
context.newWith( key -> node )
})
} else {
val node = db.createNode()
Expand Down
Expand Up @@ -91,7 +91,7 @@ class ExecutionPlanImpl(inputQuery: Query, graph: GraphDatabaseService) extends

private def getLazyReadonlyQuery(pipe: Pipe, columns: List[String]): Map[String, Any] => ExecutionResult = {
val func = (params: Map[String, Any]) => {
val state = new QueryState(graph, MutableMap() ++ params)
val state = new QueryState(graph, MutableMaps.create ++ params)
new PipeExecutionResult(pipe.createResults(state), pipe.symbols, columns)
}

Expand All @@ -100,7 +100,7 @@ class ExecutionPlanImpl(inputQuery: Query, graph: GraphDatabaseService) extends

private def getEagerReadWriteQuery(pipe: Pipe, columns: List[String]): Map[String, Any] => ExecutionResult = {
val func = (params: Map[String, Any]) => {
val state = new QueryState(graph, MutableMap() ++ params)
val state = new QueryState(graph, MutableMaps.create ++ params)
new EagerPipeExecutionResult(pipe.createResults(state), pipe.symbols, columns, state)
}

Expand Down
Expand Up @@ -35,8 +35,7 @@ class AllShortestPathsPipe(source: Pipe, ast: ShortestPath) extends ShortestPath
case (List(), true) => Seq(ctx += pathName -> null)
case (List(), false) => Seq()
case (paths, _) => paths.map(path => {
val result = ctx.m.clone()
ctx.copy(m = result += pathName -> path)
ctx.newWith(pathName -> path)
})
}
}
Expand Down
Expand Up @@ -19,9 +19,9 @@
*/
package org.neo4j.cypher.internal.pipes

import collection.mutable.Map
import org.neo4j.cypher.internal.commands.{ParameterValue, ReturnItem}
import org.neo4j.cypher.internal.symbols.{Identifier, SymbolTable}
import scala.collection.JavaConverters._

class ColumnFilterPipe(source: Pipe, val returnItems: Seq[ReturnItem], lastPipe: Boolean)
extends PipeWithSource(source) {
Expand All @@ -37,20 +37,17 @@ class ColumnFilterPipe(source: Pipe, val returnItems: Seq[ReturnItem], lastPipe:

def createResults(state: QueryState) = {
source.createResults(state).map(ctx => {
val newMap = Map[String, Any]()
val newMap = MutableMaps.create(ctx.size)

ctx.foreach {
case (k, p) => if (p.isInstanceOf[ParameterValue] && !lastPipe) {
newMap.put(k, p)
} else {
val ri = returnItems.find(_.expression.identifier.name == k)
if (ri.nonEmpty) {
newMap.put(ri.get.columnName, p)
}
returnItems.foreach( ri => if (ri.expression.identifier.name == k) { newMap.put(ri.columnName, p) } )
}
}

ctx.copy(m = newMap)
ctx.newFrom( newMap )
})
}

Expand Down
Expand Up @@ -22,7 +22,7 @@ package org.neo4j.cypher.internal.pipes
import aggregation.AggregationFunction
import org.neo4j.cypher.internal.symbols.{AnyType, Identifier, SymbolTable}
import org.neo4j.cypher.internal.commands.{Expression, AggregationExpression}
import collection.mutable.Map
import collection.mutable.{Map => MutableMap}

// Eager aggregation means that this pipe will eagerly load the whole resulting sub graphs before starting
// to emit aggregated results.
Expand All @@ -42,7 +42,7 @@ class EagerAggregationPipe(source: Pipe, val keyExpressions: Seq[Expression], ag

def createResults(state: QueryState): Traversable[ExecutionContext] = {
// This is the temporary storage used while the aggregation is going on
val result = Map[NiceHasher, (ExecutionContext,Seq[AggregationFunction])]()
val result = MutableMap[NiceHasher, (ExecutionContext,Seq[AggregationFunction])]()
val keyNames = keyExpressions.map(_.identifier.name)
val aggregationNames = aggregations.map(_.identifier.name)

Expand All @@ -54,15 +54,16 @@ class EagerAggregationPipe(source: Pipe, val keyExpressions: Seq[Expression], ag

result.map {
case (key, (ctx,aggregator)) => {
val newMap = Map[String,Any]()

val newMap = MutableMaps.create

//add key values
keyNames.zip(key.original).foreach( newMap += _)

//add aggregated values
aggregationNames.zip(aggregator.map(_.result)).foreach( newMap += _ )

ctx.copy(m=newMap)
ctx.newFrom(newMap)
}
}
}
Expand Down
Expand Up @@ -29,7 +29,7 @@ class MatchPipe(source: Pipe, patterns: Seq[Pattern], predicates: Seq[Predicate]

def createResults(state: QueryState) =
source.createResults(state).flatMap(ctx => {
matchingContext.getMatches(ctx.toMap).map(pm => ctx.copy(m = ctx.m ++ pm) )
matchingContext.getMatches(ctx.toMap).map(pm => ctx.newWith( pm ) )
})

override def executionPlan(): String = source.executionPlan() + "\r\nPatternMatch(" + patterns.mkString(",") + ")"
Expand Down
Expand Up @@ -60,7 +60,15 @@ private class OrderedAggregator(source: Traversable[ExecutionContext],
val aggregateColumns = aggregations.map(_.identifier.name)

def getIntermediateResults[U](ctx: ExecutionContext) = {
ctx.copy(m = (keyColumns.zip(currentKey.get) ++ aggregateColumns.zip(aggregationSpool.map(_.result))).foldLeft(Map[String, Any]())(_ += _))
val newMap = MutableMaps.create

//add key values
keyColumns.zip(currentKey.get).foreach( newMap += _)

//add aggregated values
aggregateColumns.zip(aggregationSpool.map(_.result)).foreach( newMap += _ )

ctx.newFrom(newMap)
}

def foreach[U](f: ExecutionContext => U) {
Expand Down
45 changes: 42 additions & 3 deletions cypher/src/main/scala/org/neo4j/cypher/internal/pipes/Pipe.scala
Expand Up @@ -25,6 +25,8 @@ import collection.Iterator
import org.neo4j.cypher.internal.mutation.UpdateAction
import org.neo4j.graphdb.{GraphDatabaseService, Transaction}
import collection.mutable.{Queue, Map => MutableMap}
import scala.collection.JavaConverters._
import java.util.HashMap

/**
* Pipe is a central part of Cypher. Most pipes are decorators - they
Expand All @@ -48,8 +50,20 @@ class NullPipe extends Pipe {
def executionPlan(): String = ""
}

object MutableMaps {
// def create = new java.util.HashMap[String,Any](100).asScala
// def create = new java.util.HashMap[String,Any](100).asScala
def create = collection.mutable.Map[String,Any]() // new java.util.HashMap[String,Any](100).asScala
def create(size : Int) = new java.util.HashMap[String,Any](size).asScala
def create(input : scala.collection.Map[String,Any]) = new java.util.HashMap[String,Any](input.asJava).asScala
def create(input : Seq[(String,Any)]) = {
val m: HashMap[String, Any] = new java.util.HashMap[String, Any]()
input.foreach( { case (k,v) => m.put(k,v) })
m.asScala
}
}
object QueryState {
def apply() = new QueryState(null, MutableMap())
def apply() = new QueryState(null, MutableMaps.create)
}

class QueryState(val db:GraphDatabaseService,
Expand All @@ -69,16 +83,20 @@ class Counter {
}

object ExecutionContext {
def empty = new ExecutionContext(MutableMap())
def empty = new ExecutionContext()
}

case class ExecutionContext(m: MutableMap[String, Any],
case class ExecutionContext(m: MutableMap[String, Any] = MutableMaps.create,
mutationCommands: Queue[UpdateAction] = Queue[UpdateAction]())
extends MutableMap[String, Any] {
def get(key: String): Option[Any] = m.get(key)

def iterator: Iterator[(String, Any)] = m.iterator

override def size = m.size

override def foreach[U](f: ((String, Any)) => U) { m.foreach(f) }

def +=(kv: (String, Any)) = {
m += kv
this
Expand All @@ -88,4 +106,25 @@ case class ExecutionContext(m: MutableMap[String, Any],
m -= key
this
}
def newWith(newEntries : Seq[(String,Any)]) = {
copy(m = (MutableMaps.create(this.m) ++= newEntries))
}
def newWith(newEntries : scala.collection.Map[String,Any]) = {
copy(m = (MutableMaps.create(this.m) ++= newEntries))
}
def newFrom(newEntries : Seq[(String,Any)]) = {
copy(m = MutableMaps.create(newEntries))
}
def newFrom(newEntries : scala.collection.Map[String,Any]) = {
copy(m = MutableMaps.create(newEntries))
}

def newWith(newEntry : (String,Any)) = {
copy(m = (MutableMaps.create(this.m) += newEntry))
}
/*
def newWith(newEntries : (String,Any)*) = {
copy(m = this.m.clone() ++ newEntries)
}
*/
}
Expand Up @@ -37,8 +37,7 @@ abstract class StartPipe[T <: PropertyContainer](inner: Pipe, name: String, crea
val map = inner.createResults(state).flatMap(ctx => {
val source: Iterable[T] = createSource(ctx)
source.map(x => {
val newMap = ctx.m.clone().asInstanceOf[Map[String, Any]]
ctx.copy(m = newMap += name -> x)
ctx.newWith(name -> x)
})
})
map
Expand Down
Expand Up @@ -25,6 +25,7 @@ import org.neo4j.graphmatching.{PatternMatcher => SimplePatternMatcher, PatternN
import collection.JavaConverters._
import org.neo4j.cypher.internal.commands.{Predicate, True}
import org.neo4j.cypher.internal.symbols.SymbolTable
import org.neo4j.cypher.internal.pipes.MutableMaps

class SimplePatternMatcherBuilder(pattern: PatternGraph, predicates: Seq[Predicate], symbolTable: SymbolTable) extends MatcherBuilder {
val patternNodes = pattern.patternNodes.map {
Expand Down Expand Up @@ -72,7 +73,7 @@ class SimplePatternMatcherBuilder(pattern: PatternGraph, predicates: Seq[Predica

def getMatches(sourceRow: Map[String, Any]) = {
setAssociations(sourceRow)
val result = collection.mutable.Map(sourceRow.toSeq: _*)
val result = MutableMaps.create(sourceRow)
val validPredicates = predicates.filter(p => symbolTable.satisfies(p.dependencies))
val startPoint = patternNodes.values.find(_.getAssociation != null).get
SimplePatternMatcher.getMatcher.`match`(startPoint, startPoint.getAssociation).asScala.map(patternMatch => {
Expand All @@ -84,7 +85,7 @@ class SimplePatternMatcherBuilder(pattern: PatternGraph, predicates: Seq[Predica
}

if (validPredicates.forall(p => p.isMatch(result)))
result.clone()
MutableMaps.create(result)
else
null
}).filter(_ != null)
Expand Down
Expand Up @@ -20,6 +20,6 @@
package org.neo4j.cypher.internal.symbols

object AnyIterableType {
lazy val instance = new IterableType(AnyType())
val instance = new IterableType(AnyType())
def apply() = instance
}
Expand Up @@ -39,7 +39,7 @@ object AnyType {
ScalarType()
}

lazy val instance = new AnyType()
val instance = new AnyType()

def apply() = instance
}
Expand Down
Expand Up @@ -20,7 +20,7 @@
package org.neo4j.cypher.internal.symbols

object BooleanType {
lazy val instance = new BooleanType()
val instance = new BooleanType()

def apply() = instance
}
Expand Down
Expand Up @@ -21,7 +21,7 @@ package org.neo4j.cypher.internal.symbols


object NumberType {
lazy val instance = new NumberType()
val instance = new NumberType()

def apply(): NumberType = instance
}
Expand Down
Expand Up @@ -24,7 +24,7 @@ package org.neo4j.cypher.internal.symbols
*/
object ScalarType
{
lazy val instance = new ScalarType()
val instance = new ScalarType()

def apply() = instance
}
Expand Down
Expand Up @@ -25,7 +25,7 @@ package org.neo4j.cypher.internal.symbols

object StringType
{
lazy val instance = new StringType()
val instance = new StringType()

def apply() = instance
}
Expand Down
Expand Up @@ -1475,7 +1475,7 @@ RETURN x0.name?
relate(b, y, "X", "rBY")

val result = parseAndExecute("""START a=node(1), b=node(2) match a-[r1?]->x<-[r2?]-b return x""")
assert(List(z, x, y) === result.columnAs[Node]("x").toList)
assert(List(x, y, z) === result.columnAs[Node]("x").toList)
}

private def createTriangle(number: Int): (Node, Node, Node) = {
Expand Down
Expand Up @@ -23,14 +23,14 @@ import org.scalatest.Assertions
import org.neo4j.cypher.internal.symbols.{SymbolTable, RelationshipType, NodeType, Identifier}
import collection.mutable.{Map => MutableMap}
import org.neo4j.cypher.internal.executionplan.{ExecutionPlanInProgress, PartiallySolvedQuery}
import org.neo4j.cypher.internal.pipes.{Pipe, NullPipe, FakePipe}
import org.neo4j.cypher.internal.pipes.{MutableMaps, Pipe, NullPipe, FakePipe}

abstract class BuilderTest extends Assertions {
def createPipe(nodes: Seq[String] = Seq(), relationships: Seq[String] = Seq()) = {
val nodeIdentifiers = nodes.map(x => Identifier(x, NodeType()))
val relIdentifiers = relationships.map(x => Identifier(x, RelationshipType()))

new FakePipe(Seq(MutableMap()), new SymbolTable(nodeIdentifiers ++ relIdentifiers: _*))
new FakePipe(Seq(MutableMaps.create), new SymbolTable(nodeIdentifiers ++ relIdentifiers: _*))
}

def plan(q: PartiallySolvedQuery):ExecutionPlanInProgress = plan(new NullPipe, q)
Expand Down
Expand Up @@ -22,18 +22,16 @@ package org.neo4j.cypher.internal.mutation
import org.scalatest.Assertions
import org.neo4j.cypher.ExecutionEngineHelper
import org.junit.Test
import org.neo4j.cypher.internal.pipes.{ExecutionContext, QueryState}
import org.neo4j.cypher.internal.commands.{CreateNodeStartItem, Literal}
import collection.mutable.{Map => MutableMap}

import org.neo4j.cypher.internal.pipes.{MutableMaps, ExecutionContext, QueryState}

class CreateNodeActionTest extends ExecutionEngineHelper with Assertions {

@Test def mixed_types_are_not_ok() {
val action = CreateNodeStartItem("id", Map("*" -> Literal(Map("name" -> "Andres", "age" -> 37))))

val tx = graph.beginTx()
action.exec(ExecutionContext.empty, new QueryState(graph, MutableMap()))
action.exec(ExecutionContext.empty, new QueryState(graph, MutableMaps.create))
tx.success()
tx.finish()

Expand Down
Expand Up @@ -32,7 +32,7 @@ import org.neo4j.cypher.internal.commands.{CreateRelationshipStartItem, CreateNo

class MutationTest extends ExecutionEngineHelper with Assertions {

def createQueryState = new QueryState(graph, MutableMap())
def createQueryState = new QueryState(graph, MutableMaps.create)

@Test
def create_node() {
Expand Down

0 comments on commit 6afc3b7

Please sign in to comment.