Skip to content

Commit

Permalink
Adapt to changes in SPIP PR
Browse files Browse the repository at this point in the history
  • Loading branch information
s1ck committed Jun 28, 2019
1 parent 432389c commit 038a8ce
Showing 1 changed file with 4 additions and 7 deletions.
Expand Up @@ -18,7 +18,7 @@

package org.apache.spark.graph.api

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession, functions}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object CypherSession {
val ID_COLUMN = "$ID"
Expand Down Expand Up @@ -91,14 +91,11 @@ trait CypherSession {
val labelColumns = nodes.columns.filter(_.startsWith(":")).toSet
val nodeProperties = (nodes.columns.toSet - idColumn -- labelColumns).map(col => col -> col).toMap

val trueLit = functions.lit(true)
val falseLit = functions.lit(false)

val labelSets = labelColumns.subsets().toSet + Set.empty
val nodeFrames = labelSets.map { labelSet =>
val predicate = labelColumns.map {
case labelColumn if labelSet.contains(labelColumn) => nodes.col(labelColumn) === trueLit
case labelColumn => nodes.col(labelColumn) === falseLit
case labelColumn if labelSet.contains(labelColumn) => nodes.col(labelColumn)
case labelColumn => !nodes.col(labelColumn)
}.reduce(_ && _)

NodeFrame(nodes.filter(predicate), idColumn, labelSet.map(_.substring(1)), nodeProperties)
Expand All @@ -107,7 +104,7 @@ trait CypherSession {
val relTypeColumns = relationships.columns.filter(_.startsWith(":")).toSet
val relProperties = (relationships.columns.toSet - idColumn - sourceIdColumn - targetIdColumn -- relTypeColumns).map(col => col -> col).toMap
val relFrames = relTypeColumns.map { relTypeColumn =>
val predicate = relationships.col(relTypeColumn) === trueLit
val predicate = relationships.col(relTypeColumn)

RelationshipFrame(relationships.filter(predicate), idColumn, sourceIdColumn, targetIdColumn, relTypeColumn.substring(1), relProperties)
}
Expand Down

0 comments on commit 038a8ce

Please sign in to comment.