Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop taking locks when reading from unique indexes #5988

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.neo4j.cypher.internal.compiler.v2_2.planDescription.Argument
import org.neo4j.cypher.internal.compiler.v2_2.spi.PlanContext
import org.neo4j.graphdb.{Node, PropertyContainer, Relationship}

import scala.collection.GenTraversableOnce

class EntityProducerFactory extends GraphElementPropertyFunctions {

private def asProducer[T <: PropertyContainer](startItem: StartItem)
Expand All @@ -39,11 +41,19 @@ class EntityProducerFactory extends GraphElementPropertyFunctions {
def arguments: Seq[Argument] = startItem.arguments
}

def nodeStartItems: PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] =
def readNodeStartItems: PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] =
nodeById orElse
nodeByIndex orElse
nodeByIndexQuery orElse
nodeByIndexHint(read = true) orElse
nodeByLabel orElse
nodesAll

def updateNodeStartItems: PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] =
nodeById orElse
nodeByIndex orElse
nodeByIndexQuery orElse
nodeByIndexHint orElse
nodeByIndexHint(read = false) orElse
nodeByLabel orElse
nodesAll

Expand Down Expand Up @@ -116,7 +126,7 @@ class EntityProducerFactory extends GraphElementPropertyFunctions {
state.query.relationshipOps.all }
}

val nodeByIndexHint: PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] = {
def nodeByIndexHint(read: Boolean): PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] = {
case (planContext, startItem @ SchemaIndex(identifier, labelName, propertyName, AnyIndex, valueExp)) =>

val indexGetter = planContext.getIndexRule(labelName, propertyName)
Expand All @@ -142,7 +152,12 @@ class EntityProducerFactory extends GraphElementPropertyFunctions {
(throw new InternalException("Something went wrong trying to build your query."))

asProducer[Node](startItem) { (m: ExecutionContext, state: QueryState) =>
indexQuery(expression, m, state, state.query.exactUniqueIndexSearch(index, _), labelName, propertyName)
val search: (Any) => GenTraversableOnce[Node] = if (read)
state.query.exactIndexSearch(index, _)
else
state.query.lockingIndexSearch(index, _)

indexQuery(expression, m, state, search, labelName, propertyName)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class MergeStartPointBuilder extends PlanBuilder {
case other => other
}

val nodeProducer = PlainMergeNodeProducer(entityProducerFactory.nodeStartItems((ctx, startItem.s)))
val nodeProducer = PlainMergeNodeProducer(entityProducerFactory.updateNodeStartItems((ctx, startItem.s)))
val solvedPredicates = startItem.solvedPredicates
val predicatesLeft = where.toSet -- solvedPredicates

Expand All @@ -98,7 +98,7 @@ class MergeStartPointBuilder extends PlanBuilder {
}

val nodeProducer = UniqueMergeNodeProducers(startItems.map {
case (label: KeyToken, propertyKey: KeyToken, item: RatedStartItem) => IndexNodeProducer(label, propertyKey, entityProducerFactory.nodeStartItems((ctx, item.s)))
case (label: KeyToken, propertyKey: KeyToken, item: RatedStartItem) => IndexNodeProducer(label, propertyKey, entityProducerFactory.updateNodeStartItems((ctx, item.s)))
})
val solvedPredicates = startItems.flatMap {
case (_, _, item: RatedStartItem) => item.solvedPredicates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class StartPointBuilder extends PlanBuilder {
private def genNodeStart(entityFactory: EntityProducerFactory): PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] =
entityFactory.nodeByIndex orElse
entityFactory.nodeByIndexQuery orElse
entityFactory.nodeByIndexHint orElse
entityFactory.nodeByIndexHint(read = true) orElse
entityFactory.nodeById orElse
entityFactory.nodesAll orElse
entityFactory.nodeByLabel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class TraversalMatcherBuilder extends PlanBuilder with PatternGraphBuilder {
val entityFactory = new EntityProducerFactory

private def mapNodeStartCreator(): PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] =
entityFactory.nodeStartItems
entityFactory.readNodeStartItems

def canWorkWith(plan: ExecutionPlanInProgress, ctx: PlanContext)(implicit pipeMonitor: PipeMonitor): Boolean = {
val (longest,_) = extractExpanderStepsFromQuery(plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ case class NodeIndexSeekPipe(ident: String,
val descriptor = new IndexDescriptor(label.nameId.id, propertyKey.nameId.id)

val indexFactory: (QueryState) => (Any) => Iterator[Node] =
if (unique)
(state: QueryState) => (x: Any) => state.query.exactUniqueIndexSearch(descriptor, x).toIterator
else
(state: QueryState) => (x: Any) => state.query.exactIndexSearch(descriptor, x)

protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class PipeExecutionPlanBuilder(clock: Clock, monitors: Monitors) {

case LegacyIndexSeek(id, hint: NodeStartItem, _) =>
val source = new SingleRowPipe()
val ep = entityProducerFactory.nodeStartItems((planContext, StatementConverters.StartItemConverter(hint).asCommandStartItem))
val ep = entityProducerFactory.readNodeStartItems((planContext, StatementConverters.StartItemConverter(hint).asCommandStartItem))
NodeStartPipe(source, id.name, ep)()

case x =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class DelegatingQueryContext(inner: QueryContext) extends QueryContext {

def withAnyOpenQueryContext[T](work: (QueryContext) => T): T = inner.withAnyOpenQueryContext(work)

def exactUniqueIndexSearch(index: IndexDescriptor, value: Any): Option[Node] = singleDbHit(inner.exactUniqueIndexSearch(index, value))
def lockingIndexSearch(index: IndexDescriptor, value: Any): Option[Node] = singleDbHit(inner.lockingIndexSearch(index, value))

override def commitAndRestartTx() {
inner.commitAndRestartTx()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ trait QueryContext extends TokenContext {

def exactIndexSearch(index: IndexDescriptor, value: Any): Iterator[Node]

def exactUniqueIndexSearch(index: IndexDescriptor, value: Any): Option[Node]
def lockingIndexSearch(index: IndexDescriptor, value: Any): Option[Node]

def getNodesByLabel(id: Int): Iterator[Node]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class EntityProducerFactoryTest extends CypherFunSuite {
when(planContext.getIndexRule(label, prop)).thenReturn(None)

//WHEN
intercept[IndexHintException](factory.nodeByIndexHint(planContext -> SchemaIndex("id", label, prop, AnyIndex, None)))
intercept[IndexHintException](factory.nodeByIndexHint(read = true)(planContext -> SchemaIndex("id", label, prop, AnyIndex, None)))
}

test("calls_the_right_methods") {
Expand All @@ -62,7 +62,7 @@ class EntityProducerFactoryTest extends CypherFunSuite {
val state = QueryStateHelper.emptyWith(query = queryContext)

//WHEN
val func = factory.nodeByIndexHint(planContext -> SchemaIndex("id", label, prop, AnyIndex, Some(SingleQueryExpression(Literal(value)))))
val func = factory.nodeByIndexHint(read = true)(planContext -> SchemaIndex("id", label, prop, AnyIndex, Some(SingleQueryExpression(Literal(value)))))
func(context, state) should equal(indexResult)
}

Expand All @@ -88,7 +88,7 @@ class EntityProducerFactoryTest extends CypherFunSuite {
val propertyKey = "prop"
val index: IndexDescriptor = new IndexDescriptor(123, 456)
when(planContext.getIndexRule(labelName, propertyKey)).thenReturn(Some(index))
val producer = factory.nodeByIndexHint(planContext -> SchemaIndex("x", labelName, propertyKey, AnyIndex, Some(SingleQueryExpression(Literal(Seq(1,2,3))))))
val producer = factory.nodeByIndexHint(read = true)(planContext -> SchemaIndex("x", labelName, propertyKey, AnyIndex, Some(SingleQueryExpression(Literal(Seq(1,2,3))))))
val queryContext: QueryContext = mock[QueryContext]
val state = QueryStateHelper.emptyWith(query = queryContext)
when(queryContext.exactIndexSearch(index, Array(1,2,3))).thenReturn(Iterator.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ class NodeIndexSeekPipeTest extends CypherFunSuite with AstConstructionTestSuppo
test("should handle unique index lookups for multiple values") {
// given
val queryState = QueryStateHelper.emptyWith(
query = exactUniqueIndexFor(
"hello" -> Some(node),
"world" -> Some(node2)
query = exactIndexFor(
"hello" -> Iterator(node),
"world" -> Iterator(node2)
)
)

Expand Down Expand Up @@ -110,8 +110,8 @@ class NodeIndexSeekPipeTest extends CypherFunSuite with AstConstructionTestSuppo
test("should handle unique index lookups for multiple values when some are null") {
// given
val queryState = QueryStateHelper.emptyWith(
query = exactUniqueIndexFor(
"hello" -> Some(node)
query = exactIndexFor(
"hello" -> Iterator(node)
)
)

Expand Down Expand Up @@ -194,7 +194,7 @@ class NodeIndexSeekPipeTest extends CypherFunSuite with AstConstructionTestSuppo

test("should return the node found by the unique index lookup when both labelId and property key id are solved at compile time") {
// given
val queryState = QueryStateHelper.emptyWith( query = exactUniqueIndexFor("hello"->Some(node)) )
val queryState = QueryStateHelper.emptyWith( query = exactIndexFor("hello"->Iterator(node)) )

// when
val pipe = NodeIndexSeekPipe("n", label, propertyKey, SingleQueryExpression(Literal("hello")), unique = true)()
Expand All @@ -219,18 +219,6 @@ class NodeIndexSeekPipeTest extends CypherFunSuite with AstConstructionTestSuppo
result.map(_("n")).toList should equal(List(node))
}


private def exactUniqueIndexFor(values : (Any, Option[Node])*): QueryContext = {
val query = mock[QueryContext]
when(query.exactUniqueIndexSearch(any(), any())).thenReturn(None)

values.foreach {
case (searchTerm, result) => when(query.exactUniqueIndexSearch(descriptor, searchTerm)).thenReturn(result)
}

query
}

private def exactIndexFor(values : (Any, Iterator[Node])*): QueryContext = {
val query = mock[QueryContext]
when(query.exactIndexSearch(any(), any())).thenReturn(Iterator.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class StartPipePlanDescriptionTest extends CypherFunSuite {
}

private def createPlanDescription(startItem: StartItem): InternalPlanDescription = {
val producer = factory.nodeStartItems((planContext, startItem))
val producer = factory.readNodeStartItems((planContext, startItem))
val pipe = new NodeStartPipe(SingleRowPipe(), "n", producer)()
pipe.planDescription
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ class ExceptionTranslatingQueryContext(inner: QueryContext) extends DelegatingQu
override def getRelTypeName(id: Int) =
translateException(super.getRelTypeName(id))

override def exactUniqueIndexSearch(index: IndexDescriptor, value: Any) =
translateException(super.exactUniqueIndexSearch(index, value))
override def lockingIndexSearch(index: IndexDescriptor, value: Any) =
translateException(super.lockingIndexSearch(index, value))

override def commitAndRestartTx() =
translateException(super.commitAndRestartTx())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ final class TransactionBoundQueryContext(graph: GraphDatabaseAPI,
def exactIndexSearch(index: IndexDescriptor, value: Any) =
mapToScala(statement.readOperations().nodesGetFromIndexLookup(index, value))(nodeOps.getById)

def exactUniqueIndexSearch(index: IndexDescriptor, value: Any): Option[Node] = {
def lockingIndexSearch(index: IndexDescriptor, value: Any): Option[Node] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming of these methods is a bit inconsistent. Maybe call this one lockingExactIndexSearch?
Or maybe we can name them exactIndexSearchForRead and exactIndexSearchForWrite.

val nodeId: Long = statement.readOperations().nodeGetUniqueFromIndexLookup(index, value)
if (StatementConstants.NO_SUCH_NODE == nodeId) None else Some(nodeOps.getById(nodeId))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Copyright (c) 2002-2015 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.cypher

class UniqueIndexUsageAcceptanceTest extends ExecutionEngineFunSuite with NewPlannerTestSupport {
test("should be able to use indexes") {
given()

// When
val result = executeWithAllPlanners("MATCH (n:Crew) WHERE n.name = 'Neo' RETURN n")

// Then
result.executionPlanDescription().toString should include("NodeUniqueIndexSeek")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use result should use("NodeUniqueIndexSeek")

(likewise for all tests in this class)

Probably not in 2.2

result should have size 1
}

test("should not forget predicates") {
given()

// When
val result = executeWithAllPlanners("MATCH (n:Crew) WHERE n.name = 'Neo' AND n.name = 'Morpheus' RETURN n")

// Then
result shouldBe empty
result.executionPlanDescription().toString should include("NodeUniqueIndexSeek")
}

test("should use index when there are multiple labels on the node") {
given()

// When
val result = executeWithAllPlanners("MATCH (n:Matrix:Crew) WHERE n.name = 'Cypher' RETURN n")

// Then
result.executionPlanDescription().toString should include("NodeUniqueIndexSeek")
result should have size 1
}

test("should be able to use value coming from UNWIND for index seek") {
// Given
graph.createConstraint("Prop", "id")
val n1 = createLabeledNode(Map("id" -> 1), "Prop")
val n2 = createLabeledNode(Map("id" -> 2), "Prop")
val n3 = createLabeledNode(Map("id" -> 3), "Prop")
for (i <- 4 to 30) createLabeledNode(Map("id" -> i), "Prop")

// When
val result = executeWithAllPlanners("unwind [1,2,3] as x match (n:Prop) where n.id = x return n;")

// Then
result.toList should equal(List(Map("n" -> n1), Map("n" -> n2), Map("n" -> n3)))
result.executionPlanDescription().toString should include("NodeUniqueIndexSeek")
}

test("should handle nulls in index lookup") {
// Given
val cat = createLabeledNode("Cat")
val dog = createLabeledNode("Dog")
relate(cat, dog, "FRIEND_OF")

// create many nodes with label 'Place' to make sure index seek is planned
(1 to 100).foreach(i => createLabeledNode(Map("name" -> s"Area $i"), "Place"))

graph.createConstraint("Place", "name")

// When
val result = executeWithCostPlannerOnly(
"""
|MATCH ()-[f:FRIEND_OF]->()
|WITH f.placeName AS placeName
|OPTIONAL MATCH (p:Place)
|WHERE p.name = placeName
|RETURN p, placeName
""".stripMargin)

// Then
result.toList should equal(List(Map("p" -> null, "placeName" -> null)))
}

test("should not use indexes when RHS of property comparison depends on the node searched for (equality)") {
// Given
val n1 = createLabeledNode(Map("a" -> 0, "b" -> 1), "MyNodes")
val n2 = createLabeledNode(Map("a" -> 1, "b" -> 1), "MyNodes")
val n3 = createLabeledNode(Map("a" -> 2, "b" -> 2), "MyNodes")
val n4 = createLabeledNode(Map("a" -> 3, "b" -> 5), "MyNodes")

graph.createConstraint("MyNodes", "a")

val query =
"""|MATCH (m:MyNodes)
|WHERE m.a = m.b
|RETURN m""".stripMargin

// When
val result = executeWithAllPlanners(query)

// Then
result.toList should equal(List(
Map("m" -> n2),
Map("m" -> n3)
))
result.executionPlanDescription().toString shouldNot include("Index")
}

private def given() {
executeWithRulePlannerOnly(
"""CREATE (architect:Matrix { name:'The Architect' }),
| (smith:Matrix { name:'Agent Smith' }),
| (cypher:Matrix:Crew { name:'Cypher' }),
| (trinity:Crew { name:'Trinity' }),
| (morpheus:Crew { name:'Morpheus' }),
| (neo:Crew { name:'Neo' }),
| smith-[:CODED_BY]->architect,
| cypher-[:KNOWS]->smith,
| morpheus-[:KNOWS]->trinity,
| morpheus-[:KNOWS]->cypher,
| neo-[:KNOWS]->morpheus,
| neo-[:LOVES]->trinity""".stripMargin)

for (i <- 1 to 10) createLabeledNode(Map("name" -> ("Joe" + i)), "Crew")

for (i <- 1 to 10) createLabeledNode(Map("name" -> ("Smith" + i)), "Matrix")

graph.createConstraint("Crew", "name")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class SnitchingQueryContext extends QueryContext {

def withAnyOpenQueryContext[T](work: (QueryContext) => T): T = ???

def exactUniqueIndexSearch(index: IndexDescriptor, value: Any): Option[Node] = ???
def lockingIndexSearch(index: IndexDescriptor, value: Any): Option[Node] = ???

def commitAndRestartTx() { ??? }

Expand Down