Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop taking locks when reading from unique indexes #5988

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import org.neo4j.cypher.internal.compiler.v2_2.planDescription.Argument
import org.neo4j.cypher.internal.compiler.v2_2.spi.PlanContext
import org.neo4j.graphdb.{Node, PropertyContainer, Relationship}

import scala.collection.GenTraversableOnce

class EntityProducerFactory extends GraphElementPropertyFunctions {

private def asProducer[T <: PropertyContainer](startItem: StartItem)
Expand All @@ -39,11 +41,19 @@ class EntityProducerFactory extends GraphElementPropertyFunctions {
def arguments: Seq[Argument] = startItem.arguments
}

def nodeStartItems: PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] =
def readNodeStartItems: PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] =
nodeById orElse
nodeByIndex orElse
nodeByIndexQuery orElse
nodeByIndexHint(read = true) orElse
nodeByLabel orElse
nodesAll

def updateNodeStartItems: PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] =
nodeById orElse
nodeByIndex orElse
nodeByIndexQuery orElse
nodeByIndexHint orElse
nodeByIndexHint(read = false) orElse
nodeByLabel orElse
nodesAll

Expand Down Expand Up @@ -116,7 +126,7 @@ class EntityProducerFactory extends GraphElementPropertyFunctions {
state.query.relationshipOps.all }
}

val nodeByIndexHint: PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] = {
def nodeByIndexHint(read: Boolean): PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] = {
case (planContext, startItem @ SchemaIndex(identifier, labelName, propertyName, AnyIndex, valueExp)) =>

val indexGetter = planContext.getIndexRule(labelName, propertyName)
Expand All @@ -142,7 +152,12 @@ class EntityProducerFactory extends GraphElementPropertyFunctions {
(throw new InternalException("Something went wrong trying to build your query."))

asProducer[Node](startItem) { (m: ExecutionContext, state: QueryState) =>
indexQuery(expression, m, state, state.query.exactUniqueIndexSearch(index, _), labelName, propertyName)
val search: (Any) => GenTraversableOnce[Node] = if (read)
state.query.exactIndexSearch(index, _)
else
state.query.lockingIndexSearch(index, _)

indexQuery(expression, m, state, search, labelName, propertyName)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class MergeStartPointBuilder extends PlanBuilder {
case other => other
}

val nodeProducer = PlainMergeNodeProducer(entityProducerFactory.nodeStartItems((ctx, startItem.s)))
val nodeProducer = PlainMergeNodeProducer(entityProducerFactory.updateNodeStartItems((ctx, startItem.s)))
val solvedPredicates = startItem.solvedPredicates
val predicatesLeft = where.toSet -- solvedPredicates

Expand All @@ -98,7 +98,7 @@ class MergeStartPointBuilder extends PlanBuilder {
}

val nodeProducer = UniqueMergeNodeProducers(startItems.map {
case (label: KeyToken, propertyKey: KeyToken, item: RatedStartItem) => IndexNodeProducer(label, propertyKey, entityProducerFactory.nodeStartItems((ctx, item.s)))
case (label: KeyToken, propertyKey: KeyToken, item: RatedStartItem) => IndexNodeProducer(label, propertyKey, entityProducerFactory.updateNodeStartItems((ctx, item.s)))
})
val solvedPredicates = startItems.flatMap {
case (_, _, item: RatedStartItem) => item.solvedPredicates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class StartPointBuilder extends PlanBuilder {
private def genNodeStart(entityFactory: EntityProducerFactory): PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] =
entityFactory.nodeByIndex orElse
entityFactory.nodeByIndexQuery orElse
entityFactory.nodeByIndexHint orElse
entityFactory.nodeByIndexHint(read = true) orElse
entityFactory.nodeById orElse
entityFactory.nodesAll orElse
entityFactory.nodeByLabel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class TraversalMatcherBuilder extends PlanBuilder with PatternGraphBuilder {
val entityFactory = new EntityProducerFactory

private def mapNodeStartCreator(): PartialFunction[(PlanContext, StartItem), EntityProducer[Node]] =
entityFactory.nodeStartItems
entityFactory.readNodeStartItems

def canWorkWith(plan: ExecutionPlanInProgress, ctx: PlanContext)(implicit pipeMonitor: PipeMonitor): Boolean = {
val (longest,_) = extractExpanderStepsFromQuery(plan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ case class NodeIndexSeekPipe(ident: String,
val descriptor = new IndexDescriptor(label.nameId.id, propertyKey.nameId.id)

val indexFactory: (QueryState) => (Any) => Iterator[Node] =
if (unique)
(state: QueryState) => (x: Any) => state.query.exactUniqueIndexSearch(descriptor, x).toIterator
else
(state: QueryState) => (x: Any) => state.query.exactIndexSearch(descriptor, x)

protected def internalCreateResults(state: QueryState): Iterator[ExecutionContext] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ class PipeExecutionPlanBuilder(clock: Clock, monitors: Monitors) {

case LegacyIndexSeek(id, hint: NodeStartItem, _) =>
val source = new SingleRowPipe()
val ep = entityProducerFactory.nodeStartItems((planContext, StatementConverters.StartItemConverter(hint).asCommandStartItem))
val ep = entityProducerFactory.readNodeStartItems((planContext, StatementConverters.StartItemConverter(hint).asCommandStartItem))
NodeStartPipe(source, id.name, ep)()

case x =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class DelegatingQueryContext(inner: QueryContext) extends QueryContext {

def withAnyOpenQueryContext[T](work: (QueryContext) => T): T = inner.withAnyOpenQueryContext(work)

def exactUniqueIndexSearch(index: IndexDescriptor, value: Any): Option[Node] = singleDbHit(inner.exactUniqueIndexSearch(index, value))
def lockingIndexSearch(index: IndexDescriptor, value: Any): Option[Node] = singleDbHit(inner.lockingIndexSearch(index, value))

override def commitAndRestartTx() {
inner.commitAndRestartTx()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ trait QueryContext extends TokenContext {

def exactIndexSearch(index: IndexDescriptor, value: Any): Iterator[Node]

def exactUniqueIndexSearch(index: IndexDescriptor, value: Any): Option[Node]
def lockingIndexSearch(index: IndexDescriptor, value: Any): Option[Node]

def getNodesByLabel(id: Int): Iterator[Node]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class EntityProducerFactoryTest extends CypherFunSuite {
when(planContext.getIndexRule(label, prop)).thenReturn(None)

//WHEN
intercept[IndexHintException](factory.nodeByIndexHint(planContext -> SchemaIndex("id", label, prop, AnyIndex, None)))
intercept[IndexHintException](factory.nodeByIndexHint(read = true)(planContext -> SchemaIndex("id", label, prop, AnyIndex, None)))
}

test("calls_the_right_methods") {
Expand All @@ -62,7 +62,7 @@ class EntityProducerFactoryTest extends CypherFunSuite {
val state = QueryStateHelper.emptyWith(query = queryContext)

//WHEN
val func = factory.nodeByIndexHint(planContext -> SchemaIndex("id", label, prop, AnyIndex, Some(SingleQueryExpression(Literal(value)))))
val func = factory.nodeByIndexHint(read = true)(planContext -> SchemaIndex("id", label, prop, AnyIndex, Some(SingleQueryExpression(Literal(value)))))
func(context, state) should equal(indexResult)
}

Expand All @@ -88,7 +88,7 @@ class EntityProducerFactoryTest extends CypherFunSuite {
val propertyKey = "prop"
val index: IndexDescriptor = new IndexDescriptor(123, 456)
when(planContext.getIndexRule(labelName, propertyKey)).thenReturn(Some(index))
val producer = factory.nodeByIndexHint(planContext -> SchemaIndex("x", labelName, propertyKey, AnyIndex, Some(SingleQueryExpression(Literal(Seq(1,2,3))))))
val producer = factory.nodeByIndexHint(read = true)(planContext -> SchemaIndex("x", labelName, propertyKey, AnyIndex, Some(SingleQueryExpression(Literal(Seq(1,2,3))))))
val queryContext: QueryContext = mock[QueryContext]
val state = QueryStateHelper.emptyWith(query = queryContext)
when(queryContext.exactIndexSearch(index, Array(1,2,3))).thenReturn(Iterator.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ class NodeIndexSeekPipeTest extends CypherFunSuite with AstConstructionTestSuppo
test("should handle unique index lookups for multiple values") {
// given
val queryState = QueryStateHelper.emptyWith(
query = exactUniqueIndexFor(
"hello" -> Some(node),
"world" -> Some(node2)
query = exactIndexFor(
"hello" -> Iterator(node),
"world" -> Iterator(node2)
)
)

Expand Down Expand Up @@ -110,8 +110,8 @@ class NodeIndexSeekPipeTest extends CypherFunSuite with AstConstructionTestSuppo
test("should handle unique index lookups for multiple values when some are null") {
// given
val queryState = QueryStateHelper.emptyWith(
query = exactUniqueIndexFor(
"hello" -> Some(node)
query = exactIndexFor(
"hello" -> Iterator(node)
)
)

Expand Down Expand Up @@ -194,7 +194,7 @@ class NodeIndexSeekPipeTest extends CypherFunSuite with AstConstructionTestSuppo

test("should return the node found by the unique index lookup when both labelId and property key id are solved at compile time") {
// given
val queryState = QueryStateHelper.emptyWith( query = exactUniqueIndexFor("hello"->Some(node)) )
val queryState = QueryStateHelper.emptyWith( query = exactIndexFor("hello"->Iterator(node)) )

// when
val pipe = NodeIndexSeekPipe("n", label, propertyKey, SingleQueryExpression(Literal("hello")), unique = true)()
Expand All @@ -219,18 +219,6 @@ class NodeIndexSeekPipeTest extends CypherFunSuite with AstConstructionTestSuppo
result.map(_("n")).toList should equal(List(node))
}


private def exactUniqueIndexFor(values : (Any, Option[Node])*): QueryContext = {
val query = mock[QueryContext]
when(query.exactUniqueIndexSearch(any(), any())).thenReturn(None)

values.foreach {
case (searchTerm, result) => when(query.exactUniqueIndexSearch(descriptor, searchTerm)).thenReturn(result)
}

query
}

private def exactIndexFor(values : (Any, Iterator[Node])*): QueryContext = {
val query = mock[QueryContext]
when(query.exactIndexSearch(any(), any())).thenReturn(Iterator.empty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class StartPipePlanDescriptionTest extends CypherFunSuite {
}

private def createPlanDescription(startItem: StartItem): InternalPlanDescription = {
val producer = factory.nodeStartItems((planContext, startItem))
val producer = factory.readNodeStartItems((planContext, startItem))
val pipe = new NodeStartPipe(SingleRowPipe(), "n", producer)()
pipe.planDescription
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.neo4j.cypher.internal.compiler.v2_2.planDescription.{Argument, Intern
import org.neo4j.cypher.internal.compiler.v2_2.spi.MapToPublicExceptions
import org.neo4j.cypher.internal.compiler.v2_2.tracing.rewriters.RewriterStepSequencer
import org.neo4j.cypher.internal.compiler.v2_2.{CypherException => CypherException_v2_2, _}
import org.neo4j.cypher.internal.spi.v2_2.TransactionBoundQueryContext.IndexSearchMonitor
import org.neo4j.cypher.internal.spi.v2_2.{TransactionBoundGraphStatistics, TransactionBoundPlanContext, TransactionBoundQueryContext}
import org.neo4j.cypher.javacompat.ProfilerStatistics
import org.neo4j.cypher.{ArithmeticException, CypherTypeException, EntityNotFoundException, FailedIndexException, HintException, IncomparableValuesException, IndexHintException, InternalException, InvalidArgumentException, InvalidSemanticsException, LabelScanHintException, LoadCsvStatusWrapCypherException, LoadExternalResourceException, MergeConstraintConflictException, NodeStillHasRelationshipsException, ParameterNotFoundException, ParameterWrongTypeException, PatternException, PeriodicCommitInOpenTransactionException, ProfilerStatisticsNotReadyException, SyntaxException, UniquePathNotUniqueException, UnknownLabelException, _}
Expand Down Expand Up @@ -150,6 +151,7 @@ trait CompatibilityFor2_2 {
protected val compiler: v2_2.CypherCompiler

implicit val executionMonitor = kernelMonitors.newMonitor(classOf[QueryExecutionMonitor])
implicit val indexSearchMonitor = kernelMonitors.newMonitor(classOf[IndexSearchMonitor])

def produceParsedQuery(statementAsText: String, rawStatement: String, offset: InputPosition) = new ParsedQuery {
val preparedQueryForV_2_2 = Try(compiler.prepareQuery(statementAsText, rawStatement, Some(offset)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ class ExceptionTranslatingQueryContext(inner: QueryContext) extends DelegatingQu
override def getRelTypeName(id: Int) =
translateException(super.getRelTypeName(id))

override def exactUniqueIndexSearch(index: IndexDescriptor, value: Any) =
translateException(super.exactUniqueIndexSearch(index, value))
override def lockingIndexSearch(index: IndexDescriptor, value: Any) =
translateException(super.lockingIndexSearch(index, value))

override def commitAndRestartTx() =
translateException(super.commitAndRestartTx())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.neo4j.cypher.internal.compiler.v2_2.{EntityNotFoundException, FailedI
import org.neo4j.cypher.internal.compiler.v2_2.spi._
import org.neo4j.cypher.internal.helpers.JavaConversionSupport
import org.neo4j.cypher.internal.helpers.JavaConversionSupport._
import org.neo4j.cypher.internal.spi.v2_2.TransactionBoundQueryContext.IndexSearchMonitor
import org.neo4j.graphdb.DynamicRelationshipType._
import org.neo4j.graphdb._
import org.neo4j.graphdb.factory.GraphDatabaseSettings
Expand All @@ -45,7 +46,7 @@ import scala.collection.{Iterator, mutable}
final class TransactionBoundQueryContext(graph: GraphDatabaseAPI,
var tx: Transaction,
val isTopLevelTx: Boolean,
initialStatement: Statement)
initialStatement: Statement)(implicit indexSearchMonitor: IndexSearchMonitor)
extends TransactionBoundTokenContext(initialStatement) with QueryContext {

private var open = true
Expand Down Expand Up @@ -131,10 +132,13 @@ final class TransactionBoundQueryContext(graph: GraphDatabaseAPI,
case Some(typeIds) => JavaConversionSupport.asScala(statement.readOperations().nodeGetRelationships(node.getId, dir, typeIds: _* )).map(relationshipOps.getById)
}

def exactIndexSearch(index: IndexDescriptor, value: Any) =
def exactIndexSearch(index: IndexDescriptor, value: Any) = {
indexSearchMonitor.exactIndexSearch(index, value)
mapToScala(statement.readOperations().nodesGetFromIndexLookup(index, value))(nodeOps.getById)
}

def exactUniqueIndexSearch(index: IndexDescriptor, value: Any): Option[Node] = {
def lockingIndexSearch(index: IndexDescriptor, value: Any): Option[Node] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naming of these methods is a bit inconsistent. Maybe call this one lockingExactIndexSearch?
Or maybe we can name them exactIndexSearchForRead and exactIndexSearchForWrite.

indexSearchMonitor.lockingIndexSearch(index, value)
val nodeId: Long = statement.readOperations().nodeGetUniqueFromIndexLookup(index, value)
if (StatementConstants.NO_SUCH_NODE == nodeId) None else Some(nodeOps.getById(nodeId))
}
Expand Down Expand Up @@ -321,3 +325,11 @@ final class TransactionBoundQueryContext(graph: GraphDatabaseAPI,
statement = txBridge.instance()
}
}

object TransactionBoundQueryContext {
trait IndexSearchMonitor {
def exactIndexSearch(index: IndexDescriptor, value: Any): Unit

def lockingIndexSearch(index: IndexDescriptor, value: Any): Unit
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.neo4j.cypher.internal.commons.{CypherFunSuite, CypherTestSupport}
import org.neo4j.cypher.internal.compiler.v2_2.spi.PlanContext
import org.neo4j.cypher.internal.helpers.GraphIcing
import org.neo4j.cypher.internal.spi.v2_2.TransactionBoundPlanContext
import org.neo4j.cypher.internal.spi.v2_2.TransactionBoundQueryContext.IndexSearchMonitor
import org.neo4j.graphdb._
import org.neo4j.graphdb.factory.GraphDatabaseSettings
import org.neo4j.kernel.api.{DataWriteOperations, KernelAPI}
Expand Down Expand Up @@ -221,6 +222,8 @@ trait GraphDatabaseTestSupport extends CypherTestSupport with GraphIcing {
def kernelAPI = graph.getDependencyResolver.resolveDependency(classOf[KernelAPI])

def planContext: PlanContext = new TransactionBoundPlanContext(statement, graph)

def indexSearchMonitor = kernelMonitors.newMonitor(classOf[IndexSearchMonitor])
}

trait Snitch extends GraphDatabaseAPI {
Expand Down