From eeca7cf03b2fb2fd4111b00f364a85ebad1d7a8e Mon Sep 17 00:00:00 2001 From: Pasha Shaik Date: Thu, 2 May 2024 11:09:17 +0200 Subject: [PATCH 1/6] perf(neo4j): improve query performance by using node labels --- .../graph/neo4j/Neo4jGraphService.java | 564 +++++++++--------- 1 file changed, 290 insertions(+), 274 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index e2806f093f77e..0256ea1144cc3 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -9,9 +9,7 @@ import com.linkedin.common.UrnArray; import com.linkedin.common.UrnArrayArray; import com.linkedin.common.urn.Urn; -import com.linkedin.metadata.aspect.models.graph.Edge; -import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult; -import com.linkedin.metadata.aspect.models.graph.RelatedEntity; +import com.linkedin.metadata.graph.Edge; import com.linkedin.metadata.graph.EntityLineageResult; import com.linkedin.metadata.graph.GraphFilters; import com.linkedin.metadata.graph.GraphService; @@ -19,8 +17,9 @@ import com.linkedin.metadata.graph.LineageRelationship; import com.linkedin.metadata.graph.LineageRelationshipArray; import com.linkedin.metadata.graph.RelatedEntitiesResult; +import com.linkedin.metadata.graph.RelatedEntitiesScrollResult; +import com.linkedin.metadata.graph.RelatedEntity; import com.linkedin.metadata.models.registry.LineageRegistry; -import com.linkedin.metadata.query.LineageFlags; import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; import com.linkedin.metadata.query.filter.CriterionArray; @@ -72,9 +71,9 @@ public Neo4jGraphService(@Nonnull LineageRegistry lineageRegistry, @Nonnull Driv } public Neo4jGraphService( - @Nonnull LineageRegistry lineageRegistry, - @Nonnull Driver driver, - @Nonnull SessionConfig sessionConfig) { + @Nonnull LineageRegistry lineageRegistry, + @Nonnull Driver driver, + @Nonnull SessionConfig sessionConfig) { this._lineageRegistry = lineageRegistry; this._driver = driver; this._sessionConfig = sessionConfig; @@ -89,9 +88,9 @@ public LineageRegistry getLineageRegistry() { public void addEdge(@Nonnull final Edge edge) { log.debug( - String.format( - "Adding Edge source: %s, destination: %s, type: %s", - edge.getSource(), edge.getDestination(), edge.getRelationshipType())); + String.format( + "Adding Edge source: %s, destination: %s, type: %s", + edge.getSource(), edge.getDestination(), edge.getRelationshipType())); final String sourceType = edge.getSource().getEntityType(); final String destinationType = edge.getDestination().getEntityType(); @@ -123,24 +122,24 @@ public void addEdge(@Nonnull final Edge edge) { // Add/Update relationship final String mergeRelationshipTemplate = - "MATCH (source:%s {urn: '%s'}),(destination:%s {urn: '%s'}) MERGE (source)-[r:%s]->(destination) "; + "MATCH (source:%s {urn: '%s'}),(destination:%s {urn: '%s'}) MERGE (source)-[r:%s]->(destination) "; String statement = - String.format( - mergeRelationshipTemplate, - sourceType, - sourceUrn, - destinationType, - destinationUrn, - edge.getRelationshipType()); + String.format( + mergeRelationshipTemplate, + sourceType, + sourceUrn, + destinationType, + destinationUrn, + edge.getRelationshipType()); String statementR = - String.format( - mergeRelationshipTemplate, - startType, - startUrn, - endType, - endUrn, - reverseRelationshipType); + String.format( + mergeRelationshipTemplate, + startType, + startUrn, + endType, + endUrn, + reverseRelationshipType); // Add/Update relationship properties String setCreatedOnTemplate; @@ -169,26 +168,26 @@ public void addEdge(@Nonnull final Edge edge) { for (Map.Entry entry : edge.getProperties().entrySet()) { // Make sure extra keys in properties are not preserved final Set preservedKeySet = - Set.of("createdOn", "createdActor", "updatedOn", "updatedActor", "startUrn", "endUrn"); + Set.of("createdOn", "createdActor", "updatedOn", "updatedActor", "startUrn", "endUrn"); if (preservedKeySet.contains(entry.getKey())) { throw new UnsupportedOperationException( - String.format( - "Tried setting properties on graph edge but property key is preserved. Key: %s", - entry.getKey())); + String.format( + "Tried setting properties on graph edge but property key is preserved. Key: %s", + entry.getKey())); } if (entry.getValue() instanceof String) { setPropertyTemplate = String.format("r.%s = '%s'", entry.getKey(), entry.getValue()); propertiesTemplateJoiner.add(setPropertyTemplate); } else { throw new UnsupportedOperationException( - String.format( - "Tried setting properties on graph edge but property value type is not supported. Key: %s, Value: %s ", - entry.getKey(), entry.getValue())); + String.format( + "Tried setting properties on graph edge but property value type is not supported. Key: %s, Value: %s ", + entry.getKey(), entry.getValue())); } } } final String setStartEndUrnTemplate = - String.format("r.startUrn = '%s', r.endUrn = '%s'", startUrn, endUrn); + String.format("r.startUrn = '%s', r.endUrn = '%s'", startUrn, endUrn); propertiesTemplateJoiner.add(setStartEndUrnTemplate); if (!StringUtils.isEmpty(propertiesTemplateJoiner.toString())) { statementR = String.format("%s SET %s", statementR, propertiesTemplateJoiner); @@ -207,9 +206,9 @@ public void upsertEdge(final Edge edge) { @Override public void removeEdge(final Edge edge) { log.debug( - String.format( - "Deleting Edge source: %s, destination: %s, type: %s", - edge.getSource(), edge.getDestination(), edge.getRelationshipType())); + String.format( + "Deleting Edge source: %s, destination: %s, type: %s", + edge.getSource(), edge.getDestination(), edge.getRelationshipType())); final String sourceType = edge.getSource().getEntityType(); final String destinationType = edge.getDestination().getEntityType(); @@ -233,23 +232,23 @@ public void removeEdge(final Edge edge) { // DELETE relationship final String mergeRelationshipTemplate = - "MATCH (source:%s {urn: '%s'})-[r:%s]->(destination:%s {urn: '%s'}) DELETE r"; + "MATCH (source:%s {urn: '%s'})-[r:%s]->(destination:%s {urn: '%s'}) DELETE r"; final String statement = - String.format( - mergeRelationshipTemplate, - sourceType, - sourceUrn, - edge.getRelationshipType(), - destinationType, - destinationUrn); + String.format( + mergeRelationshipTemplate, + sourceType, + sourceUrn, + edge.getRelationshipType(), + destinationType, + destinationUrn); final String statementR = - String.format( - mergeRelationshipTemplate, - startType, - startUrn, - reverseRelationshipType, - endType, - endUrn); + String.format( + mergeRelationshipTemplate, + startType, + startUrn, + reverseRelationshipType, + endType, + endUrn); statements.add(buildStatement(statement, new HashMap<>())); statements.add(buildStatement(statementR, new HashMap<>())); @@ -260,75 +259,76 @@ public void removeEdge(final Edge edge) { @WithSpan @Override public EntityLineageResult getLineage( - @Nonnull Urn entityUrn, - @Nonnull LineageDirection direction, - GraphFilters graphFilters, - int offset, - int count, - int maxHops) { - return getLineage(entityUrn, direction, graphFilters, offset, count, maxHops, null); + @Nonnull Urn entityUrn, + @Nonnull LineageDirection direction, + GraphFilters graphFilters, + int offset, + int count, + int maxHops) { + return getLineage(entityUrn, direction, graphFilters, offset, count, maxHops, null, null); } @Nonnull @Override public EntityLineageResult getLineage( - @Nonnull Urn entityUrn, - @Nonnull LineageDirection direction, - GraphFilters graphFilters, - int offset, - int count, - int maxHops, - @Nullable LineageFlags lineageFlags) { + @Nonnull Urn entityUrn, + @Nonnull LineageDirection direction, + GraphFilters graphFilters, + int offset, + int count, + int maxHops, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis) { log.debug(String.format("Neo4j getLineage maxHops = %d", maxHops)); final var statementAndParams = - generateLineageStatementAndParameters( - entityUrn, direction, graphFilters, maxHops, lineageFlags); + generateLineageStatementAndParameters( + entityUrn, direction, graphFilters, maxHops, startTimeMillis, endTimeMillis); final var statement = statementAndParams.getFirst(); final var parameters = statementAndParams.getSecond(); List neo4jResult = - statement != null - ? runQuery(buildStatement(statement, parameters)).list() - : new ArrayList<>(); + statement != null + ? runQuery(buildStatement(statement, parameters)).list() + : new ArrayList<>(); LineageRelationshipArray relations = new LineageRelationshipArray(); neo4jResult.stream() - .skip(offset) - .limit(count) - .forEach( - item -> { - String urn = item.values().get(2).asNode().get("urn").asString(); - try { - final var path = item.get(1).asPath(); - final List nodeListAsPath = - StreamSupport.stream(path.nodes().spliterator(), false) - .map(node -> createFromString(node.get("urn").asString())) - .collect(Collectors.toList()); - - final var firstRelationship = - Optional.ofNullable(Iterables.getFirst(path.relationships(), null)); - - relations.add( - new LineageRelationship() - .setEntity(Urn.createFromString(urn)) - // although firstRelationship should never be absent, provide "" as fallback - // value - .setType(firstRelationship.map(Relationship::type).orElse("")) - .setDegree(path.length()) - .setPaths(new UrnArrayArray(new UrnArray(nodeListAsPath)))); - } catch (URISyntaxException ignored) { - log.warn( - String.format("Can't convert urn = %s, Error = %s", urn, ignored.getMessage())); - } - }); + .skip(offset) + .limit(count) + .forEach( + item -> { + String urn = item.values().get(2).asNode().get("urn").asString(); + try { + final var path = item.get(1).asPath(); + final List nodeListAsPath = + StreamSupport.stream(path.nodes().spliterator(), false) + .map(node -> createFromString(node.get("urn").asString())) + .collect(Collectors.toList()); + + final var firstRelationship = + Optional.ofNullable(Iterables.getFirst(path.relationships(), null)); + + relations.add( + new LineageRelationship() + .setEntity(Urn.createFromString(urn)) + // although firstRelationship should never be absent, provide "" as fallback + // value + .setType(firstRelationship.map(Relationship::type).orElse("")) + .setDegree(path.length()) + .setPaths(new UrnArrayArray(new UrnArray(nodeListAsPath)))); + } catch (URISyntaxException ignored) { + log.warn( + String.format("Can't convert urn = %s, Error = %s", urn, ignored.getMessage())); + } + }); EntityLineageResult result = - new EntityLineageResult() - .setStart(offset) - .setCount(relations.size()) - .setRelationships(relations) - .setTotal(neo4jResult.size()); + new EntityLineageResult() + .setStart(offset) + .setCount(relations.size()) + .setRelationships(relations) + .setTotal(neo4jResult.size()); log.debug(String.format("Neo4j getLineage results = %s", result)); return result; @@ -339,7 +339,7 @@ private String getPathFindingLabelFilter(List entityNames) { } private String getPathFindingRelationshipFilter( - @Nonnull List entityNames, @Nullable LineageDirection direction) { + @Nonnull List entityNames, @Nullable LineageDirection direction) { // relationshipFilter supports mixing different directions for various relation types, // so simply transform entries lineage registry into format of filter final var filterComponents = new HashSet(); @@ -356,9 +356,9 @@ private String getPathFindingRelationshipFilter( } else { // return disjunctive combination of edge types regardless of direction for (final var direction1 : - List.of(LineageDirection.UPSTREAM, LineageDirection.DOWNSTREAM)) { + List.of(LineageDirection.UPSTREAM, LineageDirection.DOWNSTREAM)) { for (final var edgeInfo : - _lineageRegistry.getLineageRelationships(entityName, direction1)) { + _lineageRegistry.getLineageRelationships(entityName, direction1)) { filterComponents.add(edgeInfo.getType()); } } @@ -368,36 +368,40 @@ private String getPathFindingRelationshipFilter( } private Pair> generateLineageStatementAndParameters( - @Nonnull Urn entityUrn, - @Nonnull LineageDirection direction, - GraphFilters graphFilters, - int maxHops, - @Nullable LineageFlags lineageFlags) { + @Nonnull Urn entityUrn, + @Nonnull LineageDirection direction, + GraphFilters graphFilters, + int maxHops, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis) { final var parameterMap = - new HashMap( - Map.of( - "urn", entityUrn.toString(), - "labelFilter", getPathFindingLabelFilter(graphFilters.getAllowedEntityTypes()), - "relationshipFilter", - getPathFindingRelationshipFilter( - graphFilters.getAllowedEntityTypes(), direction), - "maxHops", maxHops)); - - if (lineageFlags == null - || (lineageFlags.getStartTimeMillis() == null && lineageFlags.getEndTimeMillis() == null)) { + new HashMap( + Map.of( + "urn", entityUrn.toString(), + "labelFilter", getPathFindingLabelFilter(graphFilters.getAllowedEntityTypes()), + "relationshipFilter", + getPathFindingRelationshipFilter( + graphFilters.getAllowedEntityTypes(), direction), + "maxHops", maxHops)); + + // Get the entity type from the URN + final String entityType = entityUrn.getEntityType(); + + if (startTimeMillis == null && endTimeMillis == null) { // if no time filtering required, simply find all expansion paths to other nodes - final var statement = - "MATCH (a {urn: $urn}) " - + "CALL apoc.path.spanningTree(a, { " - + " relationshipFilter: $relationshipFilter, " - + " labelFilter: $labelFilter, " - + " minLevel: 1, " - + " maxLevel: $maxHops " - + "}) " - + "YIELD path " - + "WITH a, path AS path " - + "RETURN a, path, last(nodes(path));"; + final var statement = String.format( + "MATCH (a:%s {urn: $urn}) " + + "CALL apoc.path.spanningTree(a, { " + + " relationshipFilter: $relationshipFilter, " + + " labelFilter: $labelFilter, " + + " minLevel: 1, " + + " maxLevel: $maxHops " + + "}) " + + "YIELD path " + + "WITH a, path AS path " + + "RETURN a, path, last(nodes(path));", + entityType); return Pair.of(statement, parameterMap); } else { // when needing time filtering, possibility on multiple paths between two @@ -405,13 +409,13 @@ private Pair> generateLineageStatementAndParameters( // use r_ edges until they are no longer useful final var relationFilter = - getPathFindingRelationshipFilter(graphFilters.getAllowedEntityTypes(), null) - .replaceAll("(\\w+)", "r_$1"); + getPathFindingRelationshipFilter(graphFilters.getAllowedEntityTypes(), null) + .replaceAll("(\\w+)", "r_$1"); final var relationshipPattern = - String.format( - (direction == LineageDirection.UPSTREAM ? "<-[:%s*1..%d]-" : "-[:%s*1..%d]->"), - relationFilter, - maxHops); + String.format( + (direction == LineageDirection.UPSTREAM ? "<-[:%s*1..%d]-" : "-[:%s*1..%d]->"), + relationFilter, + maxHops); // two steps: // 1. find list of nodes reachable within maxHops @@ -419,38 +423,34 @@ private Pair> generateLineageStatementAndParameters( // (note: according to the docs of shortestPath, WHERE conditions are applied during path // exploration, not // after path exploration is done) - final var statement = - "MATCH (a {urn: $urn}) " - + "CALL apoc.path.subgraphNodes(a, { " - + " relationshipFilter: $relationshipFilter, " - + " labelFilter: $labelFilter, " - + " minLevel: 1, " - + " maxLevel: $maxHops " - + "}) " - + "YIELD node AS b " - + "WITH a, b " - + "MATCH path = shortestPath((a)" - + relationshipPattern - + "(b)) " - + "WHERE a <> b " - + " AND ALL(rt IN relationships(path) WHERE " - + " (rt.source IS NOT NULL AND rt.source = 'UI') OR " - + " (rt.createdOn IS NULL AND rt.updatedOn IS NULL) OR " - + " ($startTimeMillis <= rt.createdOn <= $endTimeMillis OR " - + " $startTimeMillis <= rt.updatedOn <= $endTimeMillis) " - + " ) " - + "RETURN a, path, b;"; + final var statement = String.format( + "MATCH (a:%s {urn: $urn}) " + + "CALL apoc.path.subgraphNodes(a, { " + + " relationshipFilter: $relationshipFilter, " + + " labelFilter: $labelFilter, " + + " minLevel: 1, " + + " maxLevel: $maxHops " + + "}) " + + "YIELD node AS b " + + "WITH a, b " + + "MATCH path = shortestPath((a)" + + relationshipPattern + + "(b)) " + + "WHERE a <> b " + + " AND ALL(rt IN relationships(path) WHERE " + + " (rt.source IS NOT NULL AND rt.source = 'UI') OR " + + " (rt.createdOn IS NULL AND rt.updatedOn IS NULL) OR " + + " ($startTimeMillis <= rt.createdOn <= $endTimeMillis OR " + + " $startTimeMillis <= rt.updatedOn <= $endTimeMillis) " + + " ) " + + "RETURN a, path, b;", + entityType); // provide dummy start/end time when not provided, so no need to // format clause differently if either of them is missing + parameterMap.put("startTimeMillis", startTimeMillis == null ? 0 : startTimeMillis); parameterMap.put( - "startTimeMillis", - lineageFlags.getStartTimeMillis() == null ? 0 : lineageFlags.getStartTimeMillis()); - parameterMap.put( - "endTimeMillis", - lineageFlags.getEndTimeMillis() == null - ? System.currentTimeMillis() - : lineageFlags.getEndTimeMillis()); + "endTimeMillis", endTimeMillis == null ? System.currentTimeMillis() : endTimeMillis); return Pair.of(statement, parameterMap); } @@ -458,26 +458,26 @@ private Pair> generateLineageStatementAndParameters( @Nonnull public RelatedEntitiesResult findRelatedEntities( - @Nullable final List sourceTypes, - @Nonnull final Filter sourceEntityFilter, - @Nullable final List destinationTypes, - @Nonnull final Filter destinationEntityFilter, - @Nonnull final List relationshipTypes, - @Nonnull final RelationshipFilter relationshipFilter, - final int offset, - final int count) { + @Nullable final List sourceTypes, + @Nonnull final Filter sourceEntityFilter, + @Nullable final List destinationTypes, + @Nonnull final Filter destinationEntityFilter, + @Nonnull final List relationshipTypes, + @Nonnull final RelationshipFilter relationshipFilter, + final int offset, + final int count) { log.debug( - String.format( - "Finding related Neo4j nodes sourceType: %s, sourceEntityFilter: %s, destinationType: %s, ", - sourceTypes, sourceEntityFilter, destinationTypes) - + String.format( - "destinationEntityFilter: %s, relationshipTypes: %s, relationshipFilter: %s, ", - destinationEntityFilter, relationshipTypes, relationshipFilter) - + String.format("offset: %s, count: %s", offset, count)); + String.format( + "Finding related Neo4j nodes sourceType: %s, sourceEntityFilter: %s, destinationType: %s, ", + sourceTypes, sourceEntityFilter, destinationTypes) + + String.format( + "destinationEntityFilter: %s, relationshipTypes: %s, relationshipFilter: %s, ", + destinationEntityFilter, relationshipTypes, relationshipFilter) + + String.format("offset: %s, count: %s", offset, count)); if (sourceTypes != null && sourceTypes.isEmpty() - || destinationTypes != null && destinationTypes.isEmpty()) { + || destinationTypes != null && destinationTypes.isEmpty()) { return new RelatedEntitiesResult(offset, 0, 0, Collections.emptyList()); } @@ -486,17 +486,28 @@ public RelatedEntitiesResult findRelatedEntities( final String edgeCriteria = relationshipFilterToCriteria(relationshipFilter); final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); + String srcNodeLabel = ""; + // Create a URN from the String. Only proceed if srcCriteria is not null or empty + if (srcCriteria != null && !srcCriteria.isEmpty()) { + final String urnValue = sourceEntityFilter.getOr().get(0).getAnd().get(0).getValue().toString(); + try { + final Urn urn = Urn.createFromString(urnValue); + srcNodeLabel = urn.getEntityType(); + } catch (URISyntaxException e) { + log.error("Failed to parse URN: {} ", urnValue, e); + } + } - String matchTemplate = "MATCH (src %s)-[r%s %s]-(dest %s)%s"; + String matchTemplate = "MATCH (src:%s %s)-[r%s %s]-(dest %s)%s"; if (relationshipDirection == RelationshipDirection.INCOMING) { - matchTemplate = "MATCH (src %s)<-[r%s %s]-(dest %s)%s"; + matchTemplate = "MATCH (src:%s %s)<-[r%s %s]-(dest %s)%s"; } else if (relationshipDirection == RelationshipDirection.OUTGOING) { - matchTemplate = "MATCH (src %s)-[r%s %s]->(dest %s)%s"; + matchTemplate = "MATCH (src:%s %s)-[r%s %s]->(dest %s)%s"; } final String returnNodes = - String.format( - "RETURN dest, type(r)"); // Return both related entity and the relationship type. + String.format( + "RETURN dest, type(r)"); // Return both related entity and the relationship type. final String returnCount = "RETURN count(*)"; // For getting the total results. String relationshipTypeFilter = ""; @@ -508,69 +519,70 @@ public RelatedEntitiesResult findRelatedEntities( // Build Statement strings String baseStatementString = - String.format( - matchTemplate, - srcCriteria, - relationshipTypeFilter, - edgeCriteria, - destCriteria, - whereClause); + String.format( + matchTemplate, + srcNodeLabel, + srcCriteria, + relationshipTypeFilter, + edgeCriteria, + destCriteria, + whereClause); log.info(baseStatementString); final String resultStatementString = - String.format("%s %s SKIP $offset LIMIT $count", baseStatementString, returnNodes); + String.format("%s %s SKIP $offset LIMIT $count", baseStatementString, returnNodes); final String countStatementString = String.format("%s %s", baseStatementString, returnCount); // Build Statements final Statement resultStatement = - new Statement(resultStatementString, ImmutableMap.of("offset", offset, "count", count)); + new Statement(resultStatementString, ImmutableMap.of("offset", offset, "count", count)); final Statement countStatement = new Statement(countStatementString, Collections.emptyMap()); // Execute Queries final List relatedEntities = - runQuery(resultStatement) - .list( - record -> - new RelatedEntity( - record.values().get(1).asString(), // Relationship Type - record - .values() - .get(0) - .asNode() - .get("urn") - .asString(), // Urn TODO: Validate this works against Neo4j. - null)); + runQuery(resultStatement) + .list( + record -> + new RelatedEntity( + record.values().get(1).asString(), // Relationship Type + record + .values() + .get(0) + .asNode() + .get("urn") + .asString(), // Urn TODO: Validate this works against Neo4j. + null)); final int totalCount = runQuery(countStatement).single().get(0).asInt(); return new RelatedEntitiesResult(offset, relatedEntities.size(), totalCount, relatedEntities); } private String computeEntityTypeWhereClause( - @Nonnull final List sourceTypes, @Nonnull final List destinationTypes) { + @Nonnull final List sourceTypes, @Nonnull final List destinationTypes) { String whereClause = " WHERE left(type(r), 2)<>'r_' "; Boolean hasSourceTypes = sourceTypes != null && !sourceTypes.isEmpty(); Boolean hasDestTypes = destinationTypes != null && !destinationTypes.isEmpty(); if (hasSourceTypes && hasDestTypes) { whereClause = - String.format( - " WHERE left(type(r), 2)<>'r_' AND %s AND %s", - sourceTypes.stream().map(type -> "src:" + type).collect(Collectors.joining(" OR ")), - destinationTypes.stream() - .map(type -> "dest:" + type) - .collect(Collectors.joining(" OR "))); + String.format( + " WHERE left(type(r), 2)<>'r_' AND %s AND %s", + sourceTypes.stream().map(type -> "src:" + type).collect(Collectors.joining(" OR ")), + destinationTypes.stream() + .map(type -> "dest:" + type) + .collect(Collectors.joining(" OR "))); } else if (hasSourceTypes) { whereClause = - String.format( - " WHERE left(type(r), 2)<>'r_' AND %s", - sourceTypes.stream().map(type -> "src:" + type).collect(Collectors.joining(" OR "))); + String.format( + " WHERE left(type(r), 2)<>'r_' AND %s", + sourceTypes.stream().map(type -> "src:" + type).collect(Collectors.joining(" OR "))); } else if (hasDestTypes) { whereClause = - String.format( - " WHERE left(type(r), 2)<>'r_' AND %s", - destinationTypes.stream() - .map(type -> "dest:" + type) - .collect(Collectors.joining(" OR "))); + String.format( + " WHERE left(type(r), 2)<>'r_' AND %s", + destinationTypes.stream() + .map(type -> "dest:" + type) + .collect(Collectors.joining(" OR "))); } return whereClause; } @@ -579,8 +591,10 @@ public void removeNode(@Nonnull final Urn urn) { log.debug(String.format("Removing Neo4j node with urn: %s", urn)); + final String srcNodeLabel = urn.getEntityType(); + // also delete any relationship going to or from it - final String matchTemplate = "MATCH (node {urn: $urn}) DETACH DELETE node"; + final String matchTemplate = String.format("MATCH (node:%s {urn: $urn}) DETACH DELETE node", srcNodeLabel); final String statement = String.format(matchTemplate); final Map params = new HashMap<>(); @@ -601,23 +615,25 @@ public void removeNode(@Nonnull final Urn urn) { * @param relationshipFilter Query relationship filter */ public void removeEdgesFromNode( - @Nonnull final Urn urn, - @Nonnull final List relationshipTypes, - @Nonnull final RelationshipFilter relationshipFilter) { + @Nonnull final Urn urn, + @Nonnull final List relationshipTypes, + @Nonnull final RelationshipFilter relationshipFilter) { log.debug( - String.format( - "Removing Neo4j edge types from node with urn: %s, types: %s, filter: %s", - urn, relationshipTypes, relationshipFilter)); + String.format( + "Removing Neo4j edge types from node with urn: %s, types: %s, filter: %s", + urn, relationshipTypes, relationshipFilter)); - // also delete any relationship going to or from it + final String srcNodeLabel = urn.getEntityType(); + + // Determine the direction of the relationship final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); - String matchTemplate = "MATCH (src {urn: $urn})-[r%s]-(dest) RETURN type(r), dest, 2"; + String matchTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%%s]-(dest) RETURN type(r), dest, 2", srcNodeLabel); if (relationshipDirection == RelationshipDirection.INCOMING) { - matchTemplate = "MATCH (src {urn: $urn})<-[r%s]-(dest) RETURN type(r), dest, 0"; + matchTemplate = String.format("MATCH (src:%s {urn: $urn})<-[r%%s]-(dest) RETURN type(r), dest, 0", srcNodeLabel); } else if (relationshipDirection == RelationshipDirection.OUTGOING) { - matchTemplate = "MATCH (src {urn: $urn})-[r%s]->(dest) RETURN type(r), dest, 1"; + matchTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%%s]->(dest) RETURN type(r), dest, 1", srcNodeLabel); } String relationshipTypeFilter = ""; @@ -629,21 +645,21 @@ public void removeEdgesFromNode( final Map params = new HashMap<>(); params.put("urn", urn.toString()); List neo4jResult = - statement != null ? runQuery(buildStatement(statement, params)).list() : new ArrayList<>(); + statement != null ? runQuery(buildStatement(statement, params)).list() : new ArrayList<>(); if (!neo4jResult.isEmpty()) { String removeMode = neo4jResult.get(0).values().get(2).toString(); if (removeMode.equals("2")) { - final String matchDeleteTemplate = "MATCH (src {urn: $urn})-[r%s]-(dest) DELETE r"; + final String matchDeleteTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%%s]-(dest) DELETE r", srcNodeLabel); relationshipTypeFilter = ""; if (!relationshipTypes.isEmpty()) { relationshipTypeFilter = - ":" - + StringUtils.join(relationshipTypes, "|") - + "|r_" - + StringUtils.join(relationshipTypes, "|r_"); + ":" + + StringUtils.join(relationshipTypes, "|") + + "|r_" + + StringUtils.join(relationshipTypes, "|r_"); } final String statementNoDirection = - String.format(matchDeleteTemplate, relationshipTypeFilter); + String.format(matchDeleteTemplate, relationshipTypeFilter); runQuery(buildStatement(statementNoDirection, params)).consume(); } else { for (Record typeDest : neo4jResult) { @@ -663,7 +679,7 @@ public void removeEdgesFromNode( public void removeNodesMatchingLabel(@Nonnull String labelPattern) { log.debug(String.format("Removing Neo4j nodes matching label %s", labelPattern)); final String matchTemplate = - "MATCH (n) WHERE any(l IN labels(n) WHERE l=~'%s') DETACH DELETE n"; + "MATCH (n) WHERE any(l IN labels(n) WHERE l=~'%s') DETACH DELETE n"; final String statement = String.format(matchTemplate, labelPattern); final Map params = new HashMap<>(); @@ -726,12 +742,12 @@ private synchronized ExecutionResult executeStatements(@Nonnull List do { try { session.writeTransaction( - tx -> { - for (Statement statement : statements) { - tx.run(statement.getCommandText(), statement.getParams()); - } - return 0; - }); + tx -> { + for (Statement statement : statements) { + tx.run(statement.getCommandText(), statement.getParams()); + } + return 0; + }); lastException = null; break; } catch (Neo4jException e) { @@ -742,8 +758,8 @@ private synchronized ExecutionResult executeStatements(@Nonnull List if (lastException != null) { throw new RetryLimitReached( - "Failed to execute Neo4j write transaction after " + MAX_TRANSACTION_RETRY + " retries", - lastException); + "Failed to execute Neo4j write transaction after " + MAX_TRANSACTION_RETRY + " retries", + lastException); } stopWatch.stop(); @@ -802,10 +818,10 @@ private static String disjunctionToCriteria(final ConjunctiveCriterionArray disj if (disjunction.size() > 1) { // TODO: Support disjunctions (ORs). throw new UnsupportedOperationException( - "Neo4j query filter only supports 1 set of conjunction criteria"); + "Neo4j query filter only supports 1 set of conjunction criteria"); } final CriterionArray criterionArray = - disjunction.size() > 0 ? disjunction.get(0).getAnd() : new CriterionArray(); + disjunction.size() > 0 ? disjunction.get(0).getAnd() : new CriterionArray(); return criterionToString(criterionArray); } @@ -818,15 +834,15 @@ private static String disjunctionToCriteria(final ConjunctiveCriterionArray disj @Nonnull private static String criterionToString(@Nonnull CriterionArray criterionArray) { if (!criterionArray.stream() - .allMatch(criterion -> Condition.EQUAL.equals(criterion.getCondition()))) { + .allMatch(criterion -> Condition.EQUAL.equals(criterion.getCondition()))) { throw new RuntimeException( - "Neo4j query filter only support EQUAL condition " + criterionArray); + "Neo4j query filter only support EQUAL condition " + criterionArray); } final StringJoiner joiner = new StringJoiner(",", "{", "}"); criterionArray.forEach( - criterion -> joiner.add(toCriterionString(criterion.getField(), criterion.getValue()))); + criterion -> joiner.add(toCriterionString(criterion.getField(), criterion.getValue()))); return joiner.length() <= 2 ? "" : joiner.toString(); } @@ -865,17 +881,17 @@ public boolean supportsMultiHop() { * @param relationshipType Entity relationship type */ private boolean isSourceDestReversed( - @Nonnull String sourceType, @Nonnull String relationshipType) { + @Nonnull String sourceType, @Nonnull String relationshipType) { // Get real direction by check INCOMING/OUTGOING direction and RelationshipType LineageRegistry.LineageSpec sourceLineageSpec = getLineageRegistry().getLineageSpec(sourceType); if (sourceLineageSpec != null) { List upstreamCheck = - sourceLineageSpec.getUpstreamEdges().stream() - .filter( - t -> - t.getDirection() == RelationshipDirection.OUTGOING - && t.getType().equals(relationshipType)) - .collect(Collectors.toList()); + sourceLineageSpec.getUpstreamEdges().stream() + .filter( + t -> + t.getDirection() == RelationshipDirection.OUTGOING + && t.getType().equals(relationshipType)) + .collect(Collectors.toList()); if (!upstreamCheck.isEmpty() || sourceType.equals("schemaField")) { return true; } @@ -894,17 +910,17 @@ private boolean isSourceDestReversed( @Nonnull @Override public RelatedEntitiesScrollResult scrollRelatedEntities( - @Nullable List sourceTypes, - @Nonnull Filter sourceEntityFilter, - @Nullable List destinationTypes, - @Nonnull Filter destinationEntityFilter, - @Nonnull List relationshipTypes, - @Nonnull RelationshipFilter relationshipFilter, - @Nonnull List sortCriterion, - @Nullable String scrollId, - int count, - @Nullable Long startTimeMillis, - @Nullable Long endTimeMillis) { + @Nullable List sourceTypes, + @Nonnull Filter sourceEntityFilter, + @Nullable List destinationTypes, + @Nonnull Filter destinationEntityFilter, + @Nonnull List relationshipTypes, + @Nonnull RelationshipFilter relationshipFilter, + @Nonnull List sortCriterion, + @Nullable String scrollId, + int count, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis) { throw new IllegalArgumentException("Not implemented"); } } From eb8b05a5d8710f278368ea4894c94b52ec808b14 Mon Sep 17 00:00:00 2001 From: Pasha Shaik Date: Thu, 2 May 2024 12:53:41 +0200 Subject: [PATCH 2/6] perf(neo4j): improve query performance by using node labels --- .../graph/neo4j/Neo4jGraphService.java | 539 +++++++++--------- 1 file changed, 272 insertions(+), 267 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index 0256ea1144cc3..ef126ceb2734f 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -9,7 +9,9 @@ import com.linkedin.common.UrnArray; import com.linkedin.common.UrnArrayArray; import com.linkedin.common.urn.Urn; -import com.linkedin.metadata.graph.Edge; +import com.linkedin.metadata.aspect.models.graph.Edge; +import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult; +import com.linkedin.metadata.aspect.models.graph.RelatedEntity; import com.linkedin.metadata.graph.EntityLineageResult; import com.linkedin.metadata.graph.GraphFilters; import com.linkedin.metadata.graph.GraphService; @@ -17,9 +19,8 @@ import com.linkedin.metadata.graph.LineageRelationship; import com.linkedin.metadata.graph.LineageRelationshipArray; import com.linkedin.metadata.graph.RelatedEntitiesResult; -import com.linkedin.metadata.graph.RelatedEntitiesScrollResult; -import com.linkedin.metadata.graph.RelatedEntity; import com.linkedin.metadata.models.registry.LineageRegistry; +import com.linkedin.metadata.query.LineageFlags; import com.linkedin.metadata.query.filter.Condition; import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray; import com.linkedin.metadata.query.filter.CriterionArray; @@ -71,9 +72,9 @@ public Neo4jGraphService(@Nonnull LineageRegistry lineageRegistry, @Nonnull Driv } public Neo4jGraphService( - @Nonnull LineageRegistry lineageRegistry, - @Nonnull Driver driver, - @Nonnull SessionConfig sessionConfig) { + @Nonnull LineageRegistry lineageRegistry, + @Nonnull Driver driver, + @Nonnull SessionConfig sessionConfig) { this._lineageRegistry = lineageRegistry; this._driver = driver; this._sessionConfig = sessionConfig; @@ -88,9 +89,9 @@ public LineageRegistry getLineageRegistry() { public void addEdge(@Nonnull final Edge edge) { log.debug( - String.format( - "Adding Edge source: %s, destination: %s, type: %s", - edge.getSource(), edge.getDestination(), edge.getRelationshipType())); + String.format( + "Adding Edge source: %s, destination: %s, type: %s", + edge.getSource(), edge.getDestination(), edge.getRelationshipType())); final String sourceType = edge.getSource().getEntityType(); final String destinationType = edge.getDestination().getEntityType(); @@ -122,24 +123,24 @@ public void addEdge(@Nonnull final Edge edge) { // Add/Update relationship final String mergeRelationshipTemplate = - "MATCH (source:%s {urn: '%s'}),(destination:%s {urn: '%s'}) MERGE (source)-[r:%s]->(destination) "; + "MATCH (source:%s {urn: '%s'}),(destination:%s {urn: '%s'}) MERGE (source)-[r:%s]->(destination) "; String statement = - String.format( - mergeRelationshipTemplate, - sourceType, - sourceUrn, - destinationType, - destinationUrn, - edge.getRelationshipType()); + String.format( + mergeRelationshipTemplate, + sourceType, + sourceUrn, + destinationType, + destinationUrn, + edge.getRelationshipType()); String statementR = - String.format( - mergeRelationshipTemplate, - startType, - startUrn, - endType, - endUrn, - reverseRelationshipType); + String.format( + mergeRelationshipTemplate, + startType, + startUrn, + endType, + endUrn, + reverseRelationshipType); // Add/Update relationship properties String setCreatedOnTemplate; @@ -168,26 +169,26 @@ public void addEdge(@Nonnull final Edge edge) { for (Map.Entry entry : edge.getProperties().entrySet()) { // Make sure extra keys in properties are not preserved final Set preservedKeySet = - Set.of("createdOn", "createdActor", "updatedOn", "updatedActor", "startUrn", "endUrn"); + Set.of("createdOn", "createdActor", "updatedOn", "updatedActor", "startUrn", "endUrn"); if (preservedKeySet.contains(entry.getKey())) { throw new UnsupportedOperationException( - String.format( - "Tried setting properties on graph edge but property key is preserved. Key: %s", - entry.getKey())); + String.format( + "Tried setting properties on graph edge but property key is preserved. Key: %s", + entry.getKey())); } if (entry.getValue() instanceof String) { setPropertyTemplate = String.format("r.%s = '%s'", entry.getKey(), entry.getValue()); propertiesTemplateJoiner.add(setPropertyTemplate); } else { throw new UnsupportedOperationException( - String.format( - "Tried setting properties on graph edge but property value type is not supported. Key: %s, Value: %s ", - entry.getKey(), entry.getValue())); + String.format( + "Tried setting properties on graph edge but property value type is not supported. Key: %s, Value: %s ", + entry.getKey(), entry.getValue())); } } } final String setStartEndUrnTemplate = - String.format("r.startUrn = '%s', r.endUrn = '%s'", startUrn, endUrn); + String.format("r.startUrn = '%s', r.endUrn = '%s'", startUrn, endUrn); propertiesTemplateJoiner.add(setStartEndUrnTemplate); if (!StringUtils.isEmpty(propertiesTemplateJoiner.toString())) { statementR = String.format("%s SET %s", statementR, propertiesTemplateJoiner); @@ -206,9 +207,9 @@ public void upsertEdge(final Edge edge) { @Override public void removeEdge(final Edge edge) { log.debug( - String.format( - "Deleting Edge source: %s, destination: %s, type: %s", - edge.getSource(), edge.getDestination(), edge.getRelationshipType())); + String.format( + "Deleting Edge source: %s, destination: %s, type: %s", + edge.getSource(), edge.getDestination(), edge.getRelationshipType())); final String sourceType = edge.getSource().getEntityType(); final String destinationType = edge.getDestination().getEntityType(); @@ -232,23 +233,23 @@ public void removeEdge(final Edge edge) { // DELETE relationship final String mergeRelationshipTemplate = - "MATCH (source:%s {urn: '%s'})-[r:%s]->(destination:%s {urn: '%s'}) DELETE r"; + "MATCH (source:%s {urn: '%s'})-[r:%s]->(destination:%s {urn: '%s'}) DELETE r"; final String statement = - String.format( - mergeRelationshipTemplate, - sourceType, - sourceUrn, - edge.getRelationshipType(), - destinationType, - destinationUrn); + String.format( + mergeRelationshipTemplate, + sourceType, + sourceUrn, + edge.getRelationshipType(), + destinationType, + destinationUrn); final String statementR = - String.format( - mergeRelationshipTemplate, - startType, - startUrn, - reverseRelationshipType, - endType, - endUrn); + String.format( + mergeRelationshipTemplate, + startType, + startUrn, + reverseRelationshipType, + endType, + endUrn); statements.add(buildStatement(statement, new HashMap<>())); statements.add(buildStatement(statementR, new HashMap<>())); @@ -259,76 +260,75 @@ public void removeEdge(final Edge edge) { @WithSpan @Override public EntityLineageResult getLineage( - @Nonnull Urn entityUrn, - @Nonnull LineageDirection direction, - GraphFilters graphFilters, - int offset, - int count, - int maxHops) { - return getLineage(entityUrn, direction, graphFilters, offset, count, maxHops, null, null); + @Nonnull Urn entityUrn, + @Nonnull LineageDirection direction, + GraphFilters graphFilters, + int offset, + int count, + int maxHops) { + return getLineage(entityUrn, direction, graphFilters, offset, count, maxHops, null); } @Nonnull @Override public EntityLineageResult getLineage( - @Nonnull Urn entityUrn, - @Nonnull LineageDirection direction, - GraphFilters graphFilters, - int offset, - int count, - int maxHops, - @Nullable Long startTimeMillis, - @Nullable Long endTimeMillis) { + @Nonnull Urn entityUrn, + @Nonnull LineageDirection direction, + GraphFilters graphFilters, + int offset, + int count, + int maxHops, + @Nullable LineageFlags lineageFlags) { log.debug(String.format("Neo4j getLineage maxHops = %d", maxHops)); final var statementAndParams = - generateLineageStatementAndParameters( - entityUrn, direction, graphFilters, maxHops, startTimeMillis, endTimeMillis); + generateLineageStatementAndParameters( + entityUrn, direction, graphFilters, maxHops, lineageFlags); final var statement = statementAndParams.getFirst(); final var parameters = statementAndParams.getSecond(); List neo4jResult = - statement != null - ? runQuery(buildStatement(statement, parameters)).list() - : new ArrayList<>(); + statement != null + ? runQuery(buildStatement(statement, parameters)).list() + : new ArrayList<>(); LineageRelationshipArray relations = new LineageRelationshipArray(); neo4jResult.stream() - .skip(offset) - .limit(count) - .forEach( - item -> { - String urn = item.values().get(2).asNode().get("urn").asString(); - try { - final var path = item.get(1).asPath(); - final List nodeListAsPath = - StreamSupport.stream(path.nodes().spliterator(), false) - .map(node -> createFromString(node.get("urn").asString())) - .collect(Collectors.toList()); - - final var firstRelationship = - Optional.ofNullable(Iterables.getFirst(path.relationships(), null)); - - relations.add( - new LineageRelationship() - .setEntity(Urn.createFromString(urn)) - // although firstRelationship should never be absent, provide "" as fallback - // value - .setType(firstRelationship.map(Relationship::type).orElse("")) - .setDegree(path.length()) - .setPaths(new UrnArrayArray(new UrnArray(nodeListAsPath)))); - } catch (URISyntaxException ignored) { - log.warn( - String.format("Can't convert urn = %s, Error = %s", urn, ignored.getMessage())); - } - }); + .skip(offset) + .limit(count) + .forEach( + item -> { + String urn = item.values().get(2).asNode().get("urn").asString(); + try { + final var path = item.get(1).asPath(); + final List nodeListAsPath = + StreamSupport.stream(path.nodes().spliterator(), false) + .map(node -> createFromString(node.get("urn").asString())) + .collect(Collectors.toList()); + + final var firstRelationship = + Optional.ofNullable(Iterables.getFirst(path.relationships(), null)); + + relations.add( + new LineageRelationship() + .setEntity(Urn.createFromString(urn)) + // although firstRelationship should never be absent, provide "" as fallback + // value + .setType(firstRelationship.map(Relationship::type).orElse("")) + .setDegree(path.length()) + .setPaths(new UrnArrayArray(new UrnArray(nodeListAsPath)))); + } catch (URISyntaxException ignored) { + log.warn( + String.format("Can't convert urn = %s, Error = %s", urn, ignored.getMessage())); + } + }); EntityLineageResult result = - new EntityLineageResult() - .setStart(offset) - .setCount(relations.size()) - .setRelationships(relations) - .setTotal(neo4jResult.size()); + new EntityLineageResult() + .setStart(offset) + .setCount(relations.size()) + .setRelationships(relations) + .setTotal(neo4jResult.size()); log.debug(String.format("Neo4j getLineage results = %s", result)); return result; @@ -339,7 +339,7 @@ private String getPathFindingLabelFilter(List entityNames) { } private String getPathFindingRelationshipFilter( - @Nonnull List entityNames, @Nullable LineageDirection direction) { + @Nonnull List entityNames, @Nullable LineageDirection direction) { // relationshipFilter supports mixing different directions for various relation types, // so simply transform entries lineage registry into format of filter final var filterComponents = new HashSet(); @@ -356,9 +356,9 @@ private String getPathFindingRelationshipFilter( } else { // return disjunctive combination of edge types regardless of direction for (final var direction1 : - List.of(LineageDirection.UPSTREAM, LineageDirection.DOWNSTREAM)) { + List.of(LineageDirection.UPSTREAM, LineageDirection.DOWNSTREAM)) { for (final var edgeInfo : - _lineageRegistry.getLineageRelationships(entityName, direction1)) { + _lineageRegistry.getLineageRelationships(entityName, direction1)) { filterComponents.add(edgeInfo.getType()); } } @@ -368,40 +368,39 @@ private String getPathFindingRelationshipFilter( } private Pair> generateLineageStatementAndParameters( - @Nonnull Urn entityUrn, - @Nonnull LineageDirection direction, - GraphFilters graphFilters, - int maxHops, - @Nullable Long startTimeMillis, - @Nullable Long endTimeMillis) { + @Nonnull Urn entityUrn, + @Nonnull LineageDirection direction, + GraphFilters graphFilters, + int maxHops, + @Nullable LineageFlags lineageFlags) { final var parameterMap = - new HashMap( - Map.of( - "urn", entityUrn.toString(), - "labelFilter", getPathFindingLabelFilter(graphFilters.getAllowedEntityTypes()), - "relationshipFilter", - getPathFindingRelationshipFilter( - graphFilters.getAllowedEntityTypes(), direction), - "maxHops", maxHops)); + new HashMap( + Map.of( + "urn", entityUrn.toString(), + "labelFilter", getPathFindingLabelFilter(graphFilters.getAllowedEntityTypes()), + "relationshipFilter", + getPathFindingRelationshipFilter( + graphFilters.getAllowedEntityTypes(), direction), + "maxHops", maxHops)); // Get the entity type from the URN final String entityType = entityUrn.getEntityType(); - if (startTimeMillis == null && endTimeMillis == null) { + if (lineageFlags == null + || (lineageFlags.getStartTimeMillis() == null && lineageFlags.getEndTimeMillis() == null)) { // if no time filtering required, simply find all expansion paths to other nodes final var statement = String.format( - "MATCH (a:%s {urn: $urn}) " - + "CALL apoc.path.spanningTree(a, { " - + " relationshipFilter: $relationshipFilter, " - + " labelFilter: $labelFilter, " - + " minLevel: 1, " - + " maxLevel: $maxHops " - + "}) " - + "YIELD path " - + "WITH a, path AS path " - + "RETURN a, path, last(nodes(path));", - entityType); + "MATCH (a:%s {urn: $urn}) " + + "CALL apoc.path.spanningTree(a, { " + + " relationshipFilter: $relationshipFilter, " + + " labelFilter: $labelFilter, " + + " minLevel: 1, " + + " maxLevel: $maxHops " + + "}) " + + "YIELD path " + + "WITH a, path AS path " + + "RETURN a, path, last(nodes(path));", entityType); return Pair.of(statement, parameterMap); } else { // when needing time filtering, possibility on multiple paths between two @@ -409,13 +408,13 @@ private Pair> generateLineageStatementAndParameters( // use r_ edges until they are no longer useful final var relationFilter = - getPathFindingRelationshipFilter(graphFilters.getAllowedEntityTypes(), null) - .replaceAll("(\\w+)", "r_$1"); + getPathFindingRelationshipFilter(graphFilters.getAllowedEntityTypes(), null) + .replaceAll("(\\w+)", "r_$1"); final var relationshipPattern = - String.format( - (direction == LineageDirection.UPSTREAM ? "<-[:%s*1..%d]-" : "-[:%s*1..%d]->"), - relationFilter, - maxHops); + String.format( + (direction == LineageDirection.UPSTREAM ? "<-[:%s*1..%d]-" : "-[:%s*1..%d]->"), + relationFilter, + maxHops); // two steps: // 1. find list of nodes reachable within maxHops @@ -424,33 +423,37 @@ private Pair> generateLineageStatementAndParameters( // exploration, not // after path exploration is done) final var statement = String.format( - "MATCH (a:%s {urn: $urn}) " - + "CALL apoc.path.subgraphNodes(a, { " - + " relationshipFilter: $relationshipFilter, " - + " labelFilter: $labelFilter, " - + " minLevel: 1, " - + " maxLevel: $maxHops " - + "}) " - + "YIELD node AS b " - + "WITH a, b " - + "MATCH path = shortestPath((a)" - + relationshipPattern - + "(b)) " - + "WHERE a <> b " - + " AND ALL(rt IN relationships(path) WHERE " - + " (rt.source IS NOT NULL AND rt.source = 'UI') OR " - + " (rt.createdOn IS NULL AND rt.updatedOn IS NULL) OR " - + " ($startTimeMillis <= rt.createdOn <= $endTimeMillis OR " - + " $startTimeMillis <= rt.updatedOn <= $endTimeMillis) " - + " ) " - + "RETURN a, path, b;", - entityType); + "MATCH (a:%s {urn: $urn}) " + + "CALL apoc.path.subgraphNodes(a, { " + + " relationshipFilter: $relationshipFilter, " + + " labelFilter: $labelFilter, " + + " minLevel: 1, " + + " maxLevel: $maxHops " + + "}) " + + "YIELD node AS b " + + "WITH a, b " + + "MATCH path = shortestPath((a)" + + relationshipPattern + + "(b)) " + + "WHERE a <> b " + + " AND ALL(rt IN relationships(path) WHERE " + + " (rt.source IS NOT NULL AND rt.source = 'UI') OR " + + " (rt.createdOn IS NULL AND rt.updatedOn IS NULL) OR " + + " ($startTimeMillis <= rt.createdOn <= $endTimeMillis OR " + + " $startTimeMillis <= rt.updatedOn <= $endTimeMillis) " + + " ) " + + "RETURN a, path, b;", entityType); // provide dummy start/end time when not provided, so no need to // format clause differently if either of them is missing - parameterMap.put("startTimeMillis", startTimeMillis == null ? 0 : startTimeMillis); parameterMap.put( - "endTimeMillis", endTimeMillis == null ? System.currentTimeMillis() : endTimeMillis); + "startTimeMillis", + lineageFlags.getStartTimeMillis() == null ? 0 : lineageFlags.getStartTimeMillis()); + parameterMap.put( + "endTimeMillis", + lineageFlags.getEndTimeMillis() == null + ? System.currentTimeMillis() + : lineageFlags.getEndTimeMillis()); return Pair.of(statement, parameterMap); } @@ -458,26 +461,26 @@ private Pair> generateLineageStatementAndParameters( @Nonnull public RelatedEntitiesResult findRelatedEntities( - @Nullable final List sourceTypes, - @Nonnull final Filter sourceEntityFilter, - @Nullable final List destinationTypes, - @Nonnull final Filter destinationEntityFilter, - @Nonnull final List relationshipTypes, - @Nonnull final RelationshipFilter relationshipFilter, - final int offset, - final int count) { + @Nullable final List sourceTypes, + @Nonnull final Filter sourceEntityFilter, + @Nullable final List destinationTypes, + @Nonnull final Filter destinationEntityFilter, + @Nonnull final List relationshipTypes, + @Nonnull final RelationshipFilter relationshipFilter, + final int offset, + final int count) { log.debug( - String.format( - "Finding related Neo4j nodes sourceType: %s, sourceEntityFilter: %s, destinationType: %s, ", - sourceTypes, sourceEntityFilter, destinationTypes) - + String.format( - "destinationEntityFilter: %s, relationshipTypes: %s, relationshipFilter: %s, ", - destinationEntityFilter, relationshipTypes, relationshipFilter) - + String.format("offset: %s, count: %s", offset, count)); + String.format( + "Finding related Neo4j nodes sourceType: %s, sourceEntityFilter: %s, destinationType: %s, ", + sourceTypes, sourceEntityFilter, destinationTypes) + + String.format( + "destinationEntityFilter: %s, relationshipTypes: %s, relationshipFilter: %s, ", + destinationEntityFilter, relationshipTypes, relationshipFilter) + + String.format("offset: %s, count: %s", offset, count)); if (sourceTypes != null && sourceTypes.isEmpty() - || destinationTypes != null && destinationTypes.isEmpty()) { + || destinationTypes != null && destinationTypes.isEmpty()) { return new RelatedEntitiesResult(offset, 0, 0, Collections.emptyList()); } @@ -486,6 +489,7 @@ public RelatedEntitiesResult findRelatedEntities( final String edgeCriteria = relationshipFilterToCriteria(relationshipFilter); final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); + String srcNodeLabel = ""; // Create a URN from the String. Only proceed if srcCriteria is not null or empty if (srcCriteria != null && !srcCriteria.isEmpty()) { @@ -506,8 +510,8 @@ public RelatedEntitiesResult findRelatedEntities( } final String returnNodes = - String.format( - "RETURN dest, type(r)"); // Return both related entity and the relationship type. + String.format( + "RETURN dest, type(r)"); // Return both related entity and the relationship type. final String returnCount = "RETURN count(*)"; // For getting the total results. String relationshipTypeFilter = ""; @@ -519,70 +523,70 @@ public RelatedEntitiesResult findRelatedEntities( // Build Statement strings String baseStatementString = - String.format( - matchTemplate, - srcNodeLabel, - srcCriteria, - relationshipTypeFilter, - edgeCriteria, - destCriteria, - whereClause); + String.format( + matchTemplate, + srcNodeLabel, + srcCriteria, + relationshipTypeFilter, + edgeCriteria, + destCriteria, + whereClause); log.info(baseStatementString); final String resultStatementString = - String.format("%s %s SKIP $offset LIMIT $count", baseStatementString, returnNodes); + String.format("%s %s SKIP $offset LIMIT $count", baseStatementString, returnNodes); final String countStatementString = String.format("%s %s", baseStatementString, returnCount); // Build Statements final Statement resultStatement = - new Statement(resultStatementString, ImmutableMap.of("offset", offset, "count", count)); + new Statement(resultStatementString, ImmutableMap.of("offset", offset, "count", count)); final Statement countStatement = new Statement(countStatementString, Collections.emptyMap()); // Execute Queries final List relatedEntities = - runQuery(resultStatement) - .list( - record -> - new RelatedEntity( - record.values().get(1).asString(), // Relationship Type - record - .values() - .get(0) - .asNode() - .get("urn") - .asString(), // Urn TODO: Validate this works against Neo4j. - null)); + runQuery(resultStatement) + .list( + record -> + new RelatedEntity( + record.values().get(1).asString(), // Relationship Type + record + .values() + .get(0) + .asNode() + .get("urn") + .asString(), // Urn TODO: Validate this works against Neo4j. + null)); final int totalCount = runQuery(countStatement).single().get(0).asInt(); return new RelatedEntitiesResult(offset, relatedEntities.size(), totalCount, relatedEntities); } private String computeEntityTypeWhereClause( - @Nonnull final List sourceTypes, @Nonnull final List destinationTypes) { + @Nonnull final List sourceTypes, @Nonnull final List destinationTypes) { String whereClause = " WHERE left(type(r), 2)<>'r_' "; Boolean hasSourceTypes = sourceTypes != null && !sourceTypes.isEmpty(); Boolean hasDestTypes = destinationTypes != null && !destinationTypes.isEmpty(); if (hasSourceTypes && hasDestTypes) { whereClause = - String.format( - " WHERE left(type(r), 2)<>'r_' AND %s AND %s", - sourceTypes.stream().map(type -> "src:" + type).collect(Collectors.joining(" OR ")), - destinationTypes.stream() - .map(type -> "dest:" + type) - .collect(Collectors.joining(" OR "))); + String.format( + " WHERE left(type(r), 2)<>'r_' AND %s AND %s", + sourceTypes.stream().map(type -> "src:" + type).collect(Collectors.joining(" OR ")), + destinationTypes.stream() + .map(type -> "dest:" + type) + .collect(Collectors.joining(" OR "))); } else if (hasSourceTypes) { whereClause = - String.format( - " WHERE left(type(r), 2)<>'r_' AND %s", - sourceTypes.stream().map(type -> "src:" + type).collect(Collectors.joining(" OR "))); + String.format( + " WHERE left(type(r), 2)<>'r_' AND %s", + sourceTypes.stream().map(type -> "src:" + type).collect(Collectors.joining(" OR "))); } else if (hasDestTypes) { whereClause = - String.format( - " WHERE left(type(r), 2)<>'r_' AND %s", - destinationTypes.stream() - .map(type -> "dest:" + type) - .collect(Collectors.joining(" OR "))); + String.format( + " WHERE left(type(r), 2)<>'r_' AND %s", + destinationTypes.stream() + .map(type -> "dest:" + type) + .collect(Collectors.joining(" OR "))); } return whereClause; } @@ -615,25 +619,26 @@ public void removeNode(@Nonnull final Urn urn) { * @param relationshipFilter Query relationship filter */ public void removeEdgesFromNode( - @Nonnull final Urn urn, - @Nonnull final List relationshipTypes, - @Nonnull final RelationshipFilter relationshipFilter) { + @Nonnull final Urn urn, + @Nonnull final List relationshipTypes, + @Nonnull final RelationshipFilter relationshipFilter) { log.debug( - String.format( - "Removing Neo4j edge types from node with urn: %s, types: %s, filter: %s", - urn, relationshipTypes, relationshipFilter)); - - final String srcNodeLabel = urn.getEntityType(); + String.format( + "Removing Neo4j edge types from node with urn: %s, types: %s, filter: %s", + urn, relationshipTypes, relationshipFilter)); - // Determine the direction of the relationship + // also delete any relationship going to or from it final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); - String matchTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%%s]-(dest) RETURN type(r), dest, 2", srcNodeLabel); + // build node label from entity type + final String srcNodeLabel = urn.getEntityType(); + + String matchTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%s]-(dest) RETURN type(r), dest, 2", srcNodeLabel); if (relationshipDirection == RelationshipDirection.INCOMING) { - matchTemplate = String.format("MATCH (src:%s {urn: $urn})<-[r%%s]-(dest) RETURN type(r), dest, 0", srcNodeLabel); + matchTemplate = String.format("MATCH (src:%s {urn: $urn})<-[r%s]-(dest) RETURN type(r), dest, 0", srcNodeLabel); } else if (relationshipDirection == RelationshipDirection.OUTGOING) { - matchTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%%s]->(dest) RETURN type(r), dest, 1", srcNodeLabel); + matchTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%s]->(dest) RETURN type(r), dest, 1", srcNodeLabel); } String relationshipTypeFilter = ""; @@ -645,21 +650,21 @@ public void removeEdgesFromNode( final Map params = new HashMap<>(); params.put("urn", urn.toString()); List neo4jResult = - statement != null ? runQuery(buildStatement(statement, params)).list() : new ArrayList<>(); + statement != null ? runQuery(buildStatement(statement, params)).list() : new ArrayList<>(); if (!neo4jResult.isEmpty()) { String removeMode = neo4jResult.get(0).values().get(2).toString(); if (removeMode.equals("2")) { - final String matchDeleteTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%%s]-(dest) DELETE r", srcNodeLabel); + final String matchDeleteTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%s]-(dest) DELETE r", srcNodeLabel); relationshipTypeFilter = ""; if (!relationshipTypes.isEmpty()) { relationshipTypeFilter = - ":" - + StringUtils.join(relationshipTypes, "|") - + "|r_" - + StringUtils.join(relationshipTypes, "|r_"); + ":" + + StringUtils.join(relationshipTypes, "|") + + "|r_" + + StringUtils.join(relationshipTypes, "|r_"); } final String statementNoDirection = - String.format(matchDeleteTemplate, relationshipTypeFilter); + String.format(matchDeleteTemplate, relationshipTypeFilter); runQuery(buildStatement(statementNoDirection, params)).consume(); } else { for (Record typeDest : neo4jResult) { @@ -679,7 +684,7 @@ public void removeEdgesFromNode( public void removeNodesMatchingLabel(@Nonnull String labelPattern) { log.debug(String.format("Removing Neo4j nodes matching label %s", labelPattern)); final String matchTemplate = - "MATCH (n) WHERE any(l IN labels(n) WHERE l=~'%s') DETACH DELETE n"; + "MATCH (n) WHERE any(l IN labels(n) WHERE l=~'%s') DETACH DELETE n"; final String statement = String.format(matchTemplate, labelPattern); final Map params = new HashMap<>(); @@ -742,12 +747,12 @@ private synchronized ExecutionResult executeStatements(@Nonnull List do { try { session.writeTransaction( - tx -> { - for (Statement statement : statements) { - tx.run(statement.getCommandText(), statement.getParams()); - } - return 0; - }); + tx -> { + for (Statement statement : statements) { + tx.run(statement.getCommandText(), statement.getParams()); + } + return 0; + }); lastException = null; break; } catch (Neo4jException e) { @@ -758,8 +763,8 @@ private synchronized ExecutionResult executeStatements(@Nonnull List if (lastException != null) { throw new RetryLimitReached( - "Failed to execute Neo4j write transaction after " + MAX_TRANSACTION_RETRY + " retries", - lastException); + "Failed to execute Neo4j write transaction after " + MAX_TRANSACTION_RETRY + " retries", + lastException); } stopWatch.stop(); @@ -818,10 +823,10 @@ private static String disjunctionToCriteria(final ConjunctiveCriterionArray disj if (disjunction.size() > 1) { // TODO: Support disjunctions (ORs). throw new UnsupportedOperationException( - "Neo4j query filter only supports 1 set of conjunction criteria"); + "Neo4j query filter only supports 1 set of conjunction criteria"); } final CriterionArray criterionArray = - disjunction.size() > 0 ? disjunction.get(0).getAnd() : new CriterionArray(); + disjunction.size() > 0 ? disjunction.get(0).getAnd() : new CriterionArray(); return criterionToString(criterionArray); } @@ -834,15 +839,15 @@ private static String disjunctionToCriteria(final ConjunctiveCriterionArray disj @Nonnull private static String criterionToString(@Nonnull CriterionArray criterionArray) { if (!criterionArray.stream() - .allMatch(criterion -> Condition.EQUAL.equals(criterion.getCondition()))) { + .allMatch(criterion -> Condition.EQUAL.equals(criterion.getCondition()))) { throw new RuntimeException( - "Neo4j query filter only support EQUAL condition " + criterionArray); + "Neo4j query filter only support EQUAL condition " + criterionArray); } final StringJoiner joiner = new StringJoiner(",", "{", "}"); criterionArray.forEach( - criterion -> joiner.add(toCriterionString(criterion.getField(), criterion.getValue()))); + criterion -> joiner.add(toCriterionString(criterion.getField(), criterion.getValue()))); return joiner.length() <= 2 ? "" : joiner.toString(); } @@ -881,17 +886,17 @@ public boolean supportsMultiHop() { * @param relationshipType Entity relationship type */ private boolean isSourceDestReversed( - @Nonnull String sourceType, @Nonnull String relationshipType) { + @Nonnull String sourceType, @Nonnull String relationshipType) { // Get real direction by check INCOMING/OUTGOING direction and RelationshipType LineageRegistry.LineageSpec sourceLineageSpec = getLineageRegistry().getLineageSpec(sourceType); if (sourceLineageSpec != null) { List upstreamCheck = - sourceLineageSpec.getUpstreamEdges().stream() - .filter( - t -> - t.getDirection() == RelationshipDirection.OUTGOING - && t.getType().equals(relationshipType)) - .collect(Collectors.toList()); + sourceLineageSpec.getUpstreamEdges().stream() + .filter( + t -> + t.getDirection() == RelationshipDirection.OUTGOING + && t.getType().equals(relationshipType)) + .collect(Collectors.toList()); if (!upstreamCheck.isEmpty() || sourceType.equals("schemaField")) { return true; } @@ -910,17 +915,17 @@ private boolean isSourceDestReversed( @Nonnull @Override public RelatedEntitiesScrollResult scrollRelatedEntities( - @Nullable List sourceTypes, - @Nonnull Filter sourceEntityFilter, - @Nullable List destinationTypes, - @Nonnull Filter destinationEntityFilter, - @Nonnull List relationshipTypes, - @Nonnull RelationshipFilter relationshipFilter, - @Nonnull List sortCriterion, - @Nullable String scrollId, - int count, - @Nullable Long startTimeMillis, - @Nullable Long endTimeMillis) { + @Nullable List sourceTypes, + @Nonnull Filter sourceEntityFilter, + @Nullable List destinationTypes, + @Nonnull Filter destinationEntityFilter, + @Nonnull List relationshipTypes, + @Nonnull RelationshipFilter relationshipFilter, + @Nonnull List sortCriterion, + @Nullable String scrollId, + int count, + @Nullable Long startTimeMillis, + @Nullable Long endTimeMillis) { throw new IllegalArgumentException("Not implemented"); } } From 7346e42bbfcb38a864aa2605a6ec081bb9325221 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CPasha?= <“shaikp@mediamarktsaturn.com”> Date: Thu, 23 May 2024 16:49:12 +0200 Subject: [PATCH 3/6] perf(neo4j): fix the gradle build issues occurred while implementing for Neo4j Query performance --- .../graph/neo4j/Neo4jGraphService.java | 89 +++++++++++-------- 1 file changed, 51 insertions(+), 38 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index 75e694ffcc45d..16ff6df07e799 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -393,17 +393,19 @@ private Pair> generateLineageStatementAndParameters( if (lineageFlags == null || (lineageFlags.getStartTimeMillis() == null && lineageFlags.getEndTimeMillis() == null)) { // if no time filtering required, simply find all expansion paths to other nodes - final var statement = String.format( - "MATCH (a:%s {urn: $urn}) " - + "CALL apoc.path.spanningTree(a, { " - + " relationshipFilter: $relationshipFilter, " - + " labelFilter: $labelFilter, " - + " minLevel: 1, " - + " maxLevel: $maxHops " - + "}) " - + "YIELD path " - + "WITH a, path AS path " - + "RETURN a, path, last(nodes(path));", entityType); + final var statement = + String.format( + "MATCH (a:%s {urn: $urn}) " + + "CALL apoc.path.spanningTree(a, { " + + " relationshipFilter: $relationshipFilter, " + + " labelFilter: $labelFilter, " + + " minLevel: 1, " + + " maxLevel: $maxHops " + + "}) " + + "YIELD path " + + "WITH a, path AS path " + + "RETURN a, path, last(nodes(path));", + entityType); return Pair.of(statement, parameterMap); } else { // when needing time filtering, possibility on multiple paths between two @@ -425,27 +427,29 @@ private Pair> generateLineageStatementAndParameters( // (note: according to the docs of shortestPath, WHERE conditions are applied during path // exploration, not // after path exploration is done) - final var statement = String.format( - "MATCH (a:%s {urn: $urn}) " - + "CALL apoc.path.subgraphNodes(a, { " - + " relationshipFilter: $relationshipFilter, " - + " labelFilter: $labelFilter, " - + " minLevel: 1, " - + " maxLevel: $maxHops " - + "}) " - + "YIELD node AS b " - + "WITH a, b " - + "MATCH path = shortestPath((a)" - + relationshipPattern - + "(b)) " - + "WHERE a <> b " - + " AND ALL(rt IN relationships(path) WHERE " - + " (rt.source IS NOT NULL AND rt.source = 'UI') OR " - + " (rt.createdOn IS NULL AND rt.updatedOn IS NULL) OR " - + " ($startTimeMillis <= rt.createdOn <= $endTimeMillis OR " - + " $startTimeMillis <= rt.updatedOn <= $endTimeMillis) " - + " ) " - + "RETURN a, path, b;", entityType); + final var statement = + String.format( + "MATCH (a:%s {urn: $urn}) " + + "CALL apoc.path.subgraphNodes(a, { " + + " relationshipFilter: $relationshipFilter, " + + " labelFilter: $labelFilter, " + + " minLevel: 1, " + + " maxLevel: $maxHops " + + "}) " + + "YIELD node AS b " + + "WITH a, b " + + "MATCH path = shortestPath((a)" + + relationshipPattern + + "(b)) " + + "WHERE a <> b " + + " AND ALL(rt IN relationships(path) WHERE " + + " (rt.source IS NOT NULL AND rt.source = 'UI') OR " + + " (rt.createdOn IS NULL AND rt.updatedOn IS NULL) OR " + + " ($startTimeMillis <= rt.createdOn <= $endTimeMillis OR " + + " $startTimeMillis <= rt.updatedOn <= $endTimeMillis) " + + " ) " + + "RETURN a, path, b;", + entityType); // provide dummy start/end time when not provided, so no need to // format clause differently if either of them is missing @@ -496,7 +500,8 @@ public RelatedEntitiesResult findRelatedEntities( String srcNodeLabel = ""; // Create a URN from the String. Only proceed if srcCriteria is not null or empty if (srcCriteria != null && !srcCriteria.isEmpty()) { - final String urnValue = sourceEntityFilter.getOr().get(0).getAnd().get(0).getValue().toString(); + final String urnValue = + sourceEntityFilter.getOr().get(0).getAnd().get(0).getValue().toString(); try { final Urn urn = Urn.createFromString(urnValue); srcNodeLabel = urn.getEntityType(); @@ -601,7 +606,8 @@ public void removeNode(@Nonnull final Urn urn) { final String srcNodeLabel = urn.getEntityType(); // also delete any relationship going to or from it - final String matchTemplate = String.format("MATCH (node:%s {urn: $urn}) DETACH DELETE node", srcNodeLabel); + final String matchTemplate = + String.format("MATCH (node:%s {urn: $urn}) DETACH DELETE node", srcNodeLabel); final String statement = String.format(matchTemplate); final Map params = new HashMap<>(); @@ -637,11 +643,17 @@ public void removeEdgesFromNode( // build node label from entity type final String srcNodeLabel = urn.getEntityType(); - String matchTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%s]-(dest) RETURN type(r), dest, 2", srcNodeLabel); + String matchTemplate = + String.format( + "MATCH (src:%s {urn: $urn})-[r%s]-(dest) RETURN type(r), dest, 2", srcNodeLabel); if (relationshipDirection == RelationshipDirection.INCOMING) { - matchTemplate = String.format("MATCH (src:%s {urn: $urn})<-[r%s]-(dest) RETURN type(r), dest, 0", srcNodeLabel); + matchTemplate = + String.format( + "MATCH (src:%s {urn: $urn})<-[r%s]-(dest) RETURN type(r), dest, 0", srcNodeLabel); } else if (relationshipDirection == RelationshipDirection.OUTGOING) { - matchTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%s]->(dest) RETURN type(r), dest, 1", srcNodeLabel); + matchTemplate = + String.format( + "MATCH (src:%s {urn: $urn})-[r%s]->(dest) RETURN type(r), dest, 1", srcNodeLabel); } String relationshipTypeFilter = ""; @@ -657,7 +669,8 @@ public void removeEdgesFromNode( if (!neo4jResult.isEmpty()) { String removeMode = neo4jResult.get(0).values().get(2).toString(); if (removeMode.equals("2")) { - final String matchDeleteTemplate = String.format("MATCH (src:%s {urn: $urn})-[r%s]-(dest) DELETE r", srcNodeLabel); + final String matchDeleteTemplate = + String.format("MATCH (src:%s {urn: $urn})-[r%s]-(dest) DELETE r", srcNodeLabel); relationshipTypeFilter = ""; if (!relationshipTypes.isEmpty()) { relationshipTypeFilter = From 389082b3f5f639398bab2998693c75d4203420fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CPasha?= <“shaikp@mediamarktsaturn.com”> Date: Fri, 24 May 2024 16:19:45 +0200 Subject: [PATCH 4/6] perf(neo4j): have a null check to fix the unit test for neo4j performance fix --- .../linkedin/metadata/graph/neo4j/Neo4jGraphService.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index 16ff6df07e799..acd68d3f8ef53 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -499,7 +499,13 @@ public RelatedEntitiesResult findRelatedEntities( String srcNodeLabel = ""; // Create a URN from the String. Only proceed if srcCriteria is not null or empty - if (srcCriteria != null && !srcCriteria.isEmpty()) { + if (srcCriteria != null + && !srcCriteria.isEmpty() + && sourceEntityFilter != null + && sourceEntityFilter.getOr() != null + && !sourceEntityFilter.getOr().isEmpty() + && sourceEntityFilter.getOr().get(0).getAnd() != null + && !sourceEntityFilter.getOr().get(0).getAnd().isEmpty()) { final String urnValue = sourceEntityFilter.getOr().get(0).getAnd().get(0).getValue().toString(); try { From 69d9fa88500ff59ab1d19b91bbf710c56efb00b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CPasha?= <“shaikp@mediamarktsaturn.com”> Date: Fri, 24 May 2024 18:39:22 +0200 Subject: [PATCH 5/6] perf(neo4j): have a null check to fix the unit test for neo4j performance issues --- .../graph/neo4j/Neo4jGraphService.java | 82 +++++++++++++------ 1 file changed, 58 insertions(+), 24 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index acd68d3f8ef53..9350d8a5057c2 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -498,6 +498,7 @@ public RelatedEntitiesResult findRelatedEntities( final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); String srcNodeLabel = ""; + String baseStatementString = ""; // Create a URN from the String. Only proceed if srcCriteria is not null or empty if (srcCriteria != null && !srcCriteria.isEmpty() @@ -516,13 +517,6 @@ public RelatedEntitiesResult findRelatedEntities( } } - String matchTemplate = "MATCH (src:%s %s)-[r%s %s]-(dest %s)%s"; - if (relationshipDirection == RelationshipDirection.INCOMING) { - matchTemplate = "MATCH (src:%s %s)<-[r%s %s]-(dest %s)%s"; - } else if (relationshipDirection == RelationshipDirection.OUTGOING) { - matchTemplate = "MATCH (src:%s %s)-[r%s %s]->(dest %s)%s"; - } - final String returnNodes = String.format( "RETURN dest, type(r)"); // Return both related entity and the relationship type. @@ -535,16 +529,40 @@ public RelatedEntitiesResult findRelatedEntities( String whereClause = computeEntityTypeWhereClause(sourceTypes, destinationTypes); - // Build Statement strings - String baseStatementString = - String.format( - matchTemplate, - srcNodeLabel, - srcCriteria, - relationshipTypeFilter, - edgeCriteria, - destCriteria, - whereClause); + if (srcNodeLabel != null && !srcNodeLabel.isEmpty()) { + String matchTemplate = "MATCH (src:%s %s)-[r%s %s]-(dest %s)%s"; + if (relationshipDirection == RelationshipDirection.INCOMING) { + matchTemplate = "MATCH (src:%s %s)<-[r%s %s]-(dest %s)%s"; + } else if (relationshipDirection == RelationshipDirection.OUTGOING) { + matchTemplate = "MATCH (src:%s %s)-[r%s %s]->(dest %s)%s"; + } + // Build Statement strings + baseStatementString = + String.format( + matchTemplate, + srcNodeLabel, + srcCriteria, + relationshipTypeFilter, + edgeCriteria, + destCriteria, + whereClause); + } else { + String matchTemplate = "MATCH (src %s)-[r%s %s]-(dest %s)%s"; + if (relationshipDirection == RelationshipDirection.INCOMING) { + matchTemplate = "MATCH (src %s)<-[r%s %s]-(dest %s)%s"; + } else if (relationshipDirection == RelationshipDirection.OUTGOING) { + matchTemplate = "MATCH (src %s)-[r%s %s]->(dest %s)%s"; + } + // Build Statement strings + baseStatementString = + String.format( + matchTemplate, + srcCriteria, + relationshipTypeFilter, + edgeCriteria, + destCriteria, + whereClause); + } log.info(baseStatementString); @@ -648,18 +666,34 @@ public void removeEdgesFromNode( // build node label from entity type final String srcNodeLabel = urn.getEntityType(); + String matchTemplate = ""; - String matchTemplate = - String.format( - "MATCH (src:%s {urn: $urn})-[r%s]-(dest) RETURN type(r), dest, 2", srcNodeLabel); - if (relationshipDirection == RelationshipDirection.INCOMING) { + if (srcNodeLabel != null && !srcNodeLabel.isEmpty()) { matchTemplate = String.format( - "MATCH (src:%s {urn: $urn})<-[r%s]-(dest) RETURN type(r), dest, 0", srcNodeLabel); - } else if (relationshipDirection == RelationshipDirection.OUTGOING) { + "MATCH (src:%s {urn: $urn})-[r%s]-(dest) RETURN type(r), dest, 2", srcNodeLabel); + if (relationshipDirection == RelationshipDirection.INCOMING) { + matchTemplate = + String.format( + "MATCH (src:%s {urn: $urn})<-[r%s]-(dest) RETURN type(r), dest, 0", srcNodeLabel); + } else if (relationshipDirection == RelationshipDirection.OUTGOING) { + matchTemplate = + String.format( + "MATCH (src:%s {urn: $urn})-[r%s]->(dest) RETURN type(r), dest, 1", srcNodeLabel); + } + } else { matchTemplate = String.format( - "MATCH (src:%s {urn: $urn})-[r%s]->(dest) RETURN type(r), dest, 1", srcNodeLabel); + "MATCH (src {urn: $urn})-[r%s]-(dest) RETURN type(r), dest, 2", srcNodeLabel); + if (relationshipDirection == RelationshipDirection.INCOMING) { + matchTemplate = + String.format( + "MATCH (src {urn: $urn})<-[r%s]-(dest) RETURN type(r), dest, 0", srcNodeLabel); + } else if (relationshipDirection == RelationshipDirection.OUTGOING) { + matchTemplate = + String.format( + "MATCH (src {urn: $urn})-[r%s]->(dest) RETURN type(r), dest, 1", srcNodeLabel); + } } String relationshipTypeFilter = ""; From 5a0c830460c6f14841c46396610c87e397799032 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E2=80=9CPasha?= <“shaikp@mediamarktsaturn.com”> Date: Wed, 29 May 2024 09:28:33 +0200 Subject: [PATCH 6/6] perf(neo4j): code changes to speedup Query performance of Ne04j using Nodelabels --- .../graph/neo4j/Neo4jGraphService.java | 185 +++++++----------- 1 file changed, 67 insertions(+), 118 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java index 9350d8a5057c2..70b30f27553c7 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphService.java @@ -105,46 +105,26 @@ public void addEdge(@Nonnull final Edge edge) { // or indirect pattern match String endUrn = destinationUrn; String startUrn = sourceUrn; - String endType = destinationType; - String startType = sourceType; // Extra relationship typename start with r_ for // direct-outgoing-downstream/indirect-incoming-upstream relationships String reverseRelationshipType = "r_" + edge.getRelationshipType(); + final String createOrFindSourceNode = + String.format("MERGE (source:%s {urn: '%s'})", sourceType, sourceUrn); + final String createOrFindDestinationNode = + String.format("MERGE (destination:%s {urn: '%s'})", destinationType, destinationUrn); + final String createSourceToDestinationRelationShip = + String.format("MERGE (source)-[:%s]->(destination)", edge.getRelationshipType()); + String createReverseRelationShip = + String.format("MERGE (source)-[r:%s]->(destination)", reverseRelationshipType); + if (isSourceDestReversed(sourceType, edge.getRelationshipType())) { endUrn = sourceUrn; - endType = sourceType; startUrn = destinationUrn; - startType = destinationType; + createReverseRelationShip = + String.format("MERGE (destination)-[r:%s]->(source)", reverseRelationshipType); } - final List statements = new ArrayList<>(); - - // Add/Update source & destination node first - statements.add(getOrInsertNode(edge.getSource())); - statements.add(getOrInsertNode(edge.getDestination())); - - // Add/Update relationship - final String mergeRelationshipTemplate = - "MATCH (source:%s {urn: '%s'}),(destination:%s {urn: '%s'}) MERGE (source)-[r:%s]->(destination) "; - String statement = - String.format( - mergeRelationshipTemplate, - sourceType, - sourceUrn, - destinationType, - destinationUrn, - edge.getRelationshipType()); - - String statementR = - String.format( - mergeRelationshipTemplate, - startType, - startUrn, - endType, - endUrn, - reverseRelationshipType); - // Add/Update relationship properties String setCreatedOnTemplate; String setcreatedActorTemplate; @@ -193,12 +173,22 @@ public void addEdge(@Nonnull final Edge edge) { final String setStartEndUrnTemplate = String.format("r.startUrn = '%s', r.endUrn = '%s'", startUrn, endUrn); propertiesTemplateJoiner.add(setStartEndUrnTemplate); + + StringBuilder finalStatement = new StringBuilder(); + finalStatement + .append(createOrFindSourceNode) + .append(" ") + .append(createOrFindDestinationNode) + .append(" ") + .append(createSourceToDestinationRelationShip) + .append(" ") + .append(createReverseRelationShip) + .append(" "); if (!StringUtils.isEmpty(propertiesTemplateJoiner.toString())) { - statementR = String.format("%s SET %s", statementR, propertiesTemplateJoiner); + finalStatement.append("SET ").append(propertiesTemplateJoiner); } - - statements.add(buildStatement(statement, new HashMap<>())); - statements.add(buildStatement(statementR, new HashMap<>())); + final List statements = new ArrayList<>(); + statements.add(buildStatement(finalStatement.toString(), new HashMap<>())); executeStatements(statements); } @@ -387,7 +377,6 @@ private Pair> generateLineageStatementAndParameters( graphFilters.getAllowedEntityTypes(), direction), "maxHops", maxHops)); - // Get the entity type from the URN final String entityType = entityUrn.getEntityType(); if (lineageFlags == null @@ -497,31 +486,27 @@ public RelatedEntitiesResult findRelatedEntities( final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); - String srcNodeLabel = ""; - String baseStatementString = ""; + String matchTemplate = "MATCH (src %s)-[r%s %s]-(dest %s)%s"; + if (relationshipDirection == RelationshipDirection.INCOMING) { + matchTemplate = "MATCH (src %s)<-[r%s %s]-(dest %s)%s"; + } else if (relationshipDirection == RelationshipDirection.OUTGOING) { + matchTemplate = "MATCH (src %s)-[r%s %s]->(dest %s)%s"; + } + + String srcNodeLabel = StringUtils.EMPTY; // Create a URN from the String. Only proceed if srcCriteria is not null or empty - if (srcCriteria != null - && !srcCriteria.isEmpty() - && sourceEntityFilter != null - && sourceEntityFilter.getOr() != null - && !sourceEntityFilter.getOr().isEmpty() - && sourceEntityFilter.getOr().get(0).getAnd() != null - && !sourceEntityFilter.getOr().get(0).getAnd().isEmpty()) { + if (StringUtils.isNotEmpty(srcCriteria)) { final String urnValue = sourceEntityFilter.getOr().get(0).getAnd().get(0).getValue().toString(); try { final Urn urn = Urn.createFromString(urnValue); srcNodeLabel = urn.getEntityType(); + matchTemplate = matchTemplate.replace("(src ", "(src:%s "); } catch (URISyntaxException e) { log.error("Failed to parse URN: {} ", urnValue, e); } } - final String returnNodes = - String.format( - "RETURN dest, type(r)"); // Return both related entity and the relationship type. - final String returnCount = "RETURN count(*)"; // For getting the total results. - String relationshipTypeFilter = ""; if (!relationshipTypes.isEmpty()) { relationshipTypeFilter = ":" + StringUtils.join(relationshipTypes, "|"); @@ -529,14 +514,10 @@ public RelatedEntitiesResult findRelatedEntities( String whereClause = computeEntityTypeWhereClause(sourceTypes, destinationTypes); - if (srcNodeLabel != null && !srcNodeLabel.isEmpty()) { - String matchTemplate = "MATCH (src:%s %s)-[r%s %s]-(dest %s)%s"; - if (relationshipDirection == RelationshipDirection.INCOMING) { - matchTemplate = "MATCH (src:%s %s)<-[r%s %s]-(dest %s)%s"; - } else if (relationshipDirection == RelationshipDirection.OUTGOING) { - matchTemplate = "MATCH (src:%s %s)-[r%s %s]->(dest %s)%s"; - } - // Build Statement strings + // Build Statement strings + String baseStatementString; + + if (StringUtils.isNotEmpty(srcNodeLabel)) { baseStatementString = String.format( matchTemplate, @@ -547,13 +528,6 @@ public RelatedEntitiesResult findRelatedEntities( destCriteria, whereClause); } else { - String matchTemplate = "MATCH (src %s)-[r%s %s]-(dest %s)%s"; - if (relationshipDirection == RelationshipDirection.INCOMING) { - matchTemplate = "MATCH (src %s)<-[r%s %s]-(dest %s)%s"; - } else if (relationshipDirection == RelationshipDirection.OUTGOING) { - matchTemplate = "MATCH (src %s)-[r%s %s]->(dest %s)%s"; - } - // Build Statement strings baseStatementString = String.format( matchTemplate, @@ -563,9 +537,12 @@ public RelatedEntitiesResult findRelatedEntities( destCriteria, whereClause); } - log.info(baseStatementString); + final String returnNodes = + "RETURN dest, type(r)"; // Return both related entity and the relationship type. + final String returnCount = "RETURN count(*)"; // For getting the total results. + final String resultStatementString = String.format("%s %s SKIP $offset LIMIT $count", baseStatementString, returnNodes); final String countStatementString = String.format("%s %s", baseStatementString, returnCount); @@ -626,13 +603,11 @@ private String computeEntityTypeWhereClause( public void removeNode(@Nonnull final Urn urn) { log.debug(String.format("Removing Neo4j node with urn: %s", urn)); - final String srcNodeLabel = urn.getEntityType(); // also delete any relationship going to or from it - final String matchTemplate = - String.format("MATCH (node:%s {urn: $urn}) DETACH DELETE node", srcNodeLabel); - final String statement = String.format(matchTemplate); + final String matchTemplate = "MATCH (node:%s {urn: $urn}) DETACH DELETE node"; + final String statement = String.format(matchTemplate, srcNodeLabel); final Map params = new HashMap<>(); params.put("urn", urn.toString()); @@ -663,44 +638,20 @@ public void removeEdgesFromNode( // also delete any relationship going to or from it final RelationshipDirection relationshipDirection = relationshipFilter.getDirection(); - - // build node label from entity type final String srcNodeLabel = urn.getEntityType(); - String matchTemplate = ""; - if (srcNodeLabel != null && !srcNodeLabel.isEmpty()) { - matchTemplate = - String.format( - "MATCH (src:%s {urn: $urn})-[r%s]-(dest) RETURN type(r), dest, 2", srcNodeLabel); - if (relationshipDirection == RelationshipDirection.INCOMING) { - matchTemplate = - String.format( - "MATCH (src:%s {urn: $urn})<-[r%s]-(dest) RETURN type(r), dest, 0", srcNodeLabel); - } else if (relationshipDirection == RelationshipDirection.OUTGOING) { - matchTemplate = - String.format( - "MATCH (src:%s {urn: $urn})-[r%s]->(dest) RETURN type(r), dest, 1", srcNodeLabel); - } - } else { - matchTemplate = - String.format( - "MATCH (src {urn: $urn})-[r%s]-(dest) RETURN type(r), dest, 2", srcNodeLabel); - if (relationshipDirection == RelationshipDirection.INCOMING) { - matchTemplate = - String.format( - "MATCH (src {urn: $urn})<-[r%s]-(dest) RETURN type(r), dest, 0", srcNodeLabel); - } else if (relationshipDirection == RelationshipDirection.OUTGOING) { - matchTemplate = - String.format( - "MATCH (src {urn: $urn})-[r%s]->(dest) RETURN type(r), dest, 1", srcNodeLabel); - } + String matchTemplate = "MATCH (src:%s {urn: $urn})-[r%s]-(dest) RETURN type(r), dest, 2"; + if (relationshipDirection == RelationshipDirection.INCOMING) { + matchTemplate = "MATCH (src:%s {urn: $urn})<-[r%s]-(dest) RETURN type(r), dest, 0"; + } else if (relationshipDirection == RelationshipDirection.OUTGOING) { + matchTemplate = "MATCH (src:%s {urn: $urn})-[r%s]->(dest) RETURN type(r), dest, 1"; } String relationshipTypeFilter = ""; if (!relationshipTypes.isEmpty()) { relationshipTypeFilter = ":" + StringUtils.join(relationshipTypes, "|"); } - final String statement = String.format(matchTemplate, relationshipTypeFilter); + final String statement = String.format(matchTemplate, srcNodeLabel, relationshipTypeFilter); final Map params = new HashMap<>(); params.put("urn", urn.toString()); @@ -709,8 +660,7 @@ public void removeEdgesFromNode( if (!neo4jResult.isEmpty()) { String removeMode = neo4jResult.get(0).values().get(2).toString(); if (removeMode.equals("2")) { - final String matchDeleteTemplate = - String.format("MATCH (src:%s {urn: $urn})-[r%s]-(dest) DELETE r", srcNodeLabel); + final String matchDeleteTemplate = "MATCH (src:%s {urn: $urn})-[r%s]-(dest) DELETE r"; relationshipTypeFilter = ""; if (!relationshipTypes.isEmpty()) { relationshipTypeFilter = @@ -720,7 +670,7 @@ public void removeEdgesFromNode( + StringUtils.join(relationshipTypes, "|r_"); } final String statementNoDirection = - String.format(matchDeleteTemplate, relationshipTypeFilter); + String.format(matchDeleteTemplate, srcNodeLabel, relationshipTypeFilter); runQuery(buildStatement(statementNoDirection, params)).consume(); } else { for (Record typeDest : neo4jResult) { @@ -794,33 +744,32 @@ private static final class ExecutionResult { * * @param statements List of statements with parameters to be executed in order */ - private synchronized ExecutionResult executeStatements(@Nonnull List statements) { - int retry = 0; + private ExecutionResult executeStatements(@Nonnull List statements) { final StopWatch stopWatch = new StopWatch(); stopWatch.start(); - Exception lastException; + int retry = 0; try (final Session session = _driver.session(_sessionConfig)) { - do { + for (retry = 0; retry <= MAX_TRANSACTION_RETRY; retry++) { try { - session.writeTransaction( + session.executeWrite( tx -> { for (Statement statement : statements) { tx.run(statement.getCommandText(), statement.getParams()); } - return 0; + return null; }); - lastException = null; break; } catch (Neo4jException e) { - lastException = e; + log.warn("Failed to execute Neo4j write transaction. Retry count: {}", retry, e); + if (retry == MAX_TRANSACTION_RETRY) { + throw new RetryLimitReached( + "Failed to execute Neo4j write transaction after " + + MAX_TRANSACTION_RETRY + + " retries", + e); + } } - } while (++retry <= MAX_TRANSACTION_RETRY); - } - - if (lastException != null) { - throw new RetryLimitReached( - "Failed to execute Neo4j write transaction after " + MAX_TRANSACTION_RETRY + " retries", - lastException); + } } stopWatch.stop();