Detect and partition sparse region of UIDs #224

Merged 21 commits on Jan 8, 2024
Changes from all commits
59 changes: 53 additions & 6 deletions .github/actions/test-integrate/action.yml
@@ -41,7 +41,7 @@ runs:
uses: actions/cache@v3
with:
path: ~/.m2/repository
key: ${{ runner.os }}-mvn-integrate-${{ inputs.spark-version }}-${{ inputs.scala-compat-version }}-${{ hashFiles('**/pom.xml') }}
key: ${{ runner.os }}-mvn-integrate-${{ inputs.spark-version }}-${{ inputs.scala-compat-version }}-${{ hashFiles('pom.xml') }}
restore-keys: |
${{ runner.os }}-mvn-integrate-${{ inputs.spark-version }}-${{ inputs.scala-compat-version }}-
${{ runner.os }}-mvn-build-${{ inputs.spark-version }}-${{ inputs.scala-compat-version }}-
@@ -85,8 +85,8 @@ runs:
mvn --batch-mode dependency:get -DgroupId=org.slf4j -DartifactId=slf4j-api -Dversion=1.7.16
shell: bash

- name: Start Dgraph cluster
id: dgraph
- name: Start Dgraph cluster (Small)
id: dgraph-small
env:
DGRAPH_TEST_CLUSTER_VERSION: ${{ inputs.dgraph-version }}
run: |
@@ -101,15 +101,62 @@
/tmp/dgraph-instance.insert.sh
shell: bash

- name: Integration Tests
- name: Integration Test (Example)
env:
SPARK_LOCAL_IP: 127.0.0.1
SPARK_HOME: ${{ steps.params.outputs.home }}/spark
ARTIFACT_ID: ${{ steps.params.outputs.artifact-id }}
VERSION: ${{ steps.params.outputs.version }}
run: |
${SPARK_HOME}/bin/spark-submit --packages uk.co.gresearch.spark:${ARTIFACT_ID}:${VERSION},graphframes:graphframes:${{ steps.params.outputs.graphframes-version }}-s_${{ inputs.scala-compat-version }},org.scalactic:scalactic_${{ inputs.scala-compat-version }}:3.2.15 --class uk.co.gresearch.spark.dgraph.connector.example.ExampleApp examples/scala/target/spark-dgraph-connector-examples_*.jar
shell: bash

- name: Stop Dgraph cluster
if: inputs.dgraph-version != '21.12.0'
run: docker stop ${{ steps.dgraph-small.outputs.docker }}
shell: bash

- name: Start Dgraph cluster (Large)
id: dgraph
if: inputs.dgraph-version != '21.12.0'
run: |
if [[ "${{ inputs.dgraph-version }}" < "21.03.0" ]]
then
cache=""
whitelist="--whitelist=0.0.0.0/0"
maxUID="maxLeaseId"
else
cache="--cache size-mb=2048"
whitelist="--security whitelist=0.0.0.0/0"
maxUID="maxUID"
fi

mkdir dgraph
curl -L -o dgraph/1million.rdf.gz "https://github.com/dgraph-io/tutorial/blob/master/resources/1million.rdf.gz?raw=true"
docker run --rm -p 5080:5080 -p 6080:6080 -p 8080:8080 -p 9080:9080 -v $(pwd)/dgraph:/dgraph --name dgraph dgraph/dgraph:v${{ inputs.dgraph-version }} dgraph zero &
sleep 2
docker exec dgraph dgraph alpha $cache --zero localhost:5080 $whitelist &

for attempt in {1..10}
do
sleep 10
echo "attempt $attempt"
if curl --data-binary @dgraph-instance.schema.live-loader.dql -H "Content-Type: text/plain;charset=UTF-8" http://localhost:8080/alter; then break; fi
if [ $attempt -eq 10 ]; then exit 1; fi
done

docker exec dgraph dgraph live -f 1million.rdf.gz --alpha localhost:9080 --zero localhost:5080 -c 1
shell: bash

- name: Integration Test (Sparse)
if: inputs.dgraph-version != '21.12.0'
env:
SPARK_LOCAL_IP: 127.0.0.1
SPARK_HOME: ${{ steps.params.outputs.home }}/spark
ARTIFACT_ID: ${{ steps.params.outputs.artifact-id }}
VERSION: ${{ steps.params.outputs.version }}
# There is only a Spark 3.0 release of graphframes
run: |
${SPARK_HOME}/bin/spark-submit --packages uk.co.gresearch.spark:${ARTIFACT_ID}:${VERSION},graphframes:graphframes:${{ steps.params.outputs.graphframes-version }}-s_${{ inputs.scala-compat-version }} --class uk.co.gresearch.spark.dgraph.connector.example.ExampleApp examples/scala/target/spark-dgraph-connector-examples_*.jar
${SPARK_HOME}/bin/spark-submit --packages uk.co.gresearch.spark:${ARTIFACT_ID}:${VERSION},org.scalactic:scalactic_2.12:3.2.15 --class uk.co.gresearch.spark.dgraph.connector.example.SparseApp examples/scala/target/spark-dgraph-connector-examples_*.jar
shell: bash

branding:
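The new "Integration Test (Sparse)" step above submits a SparseApp example class whose source is not part of the hunks shown here. As a rough, hypothetical sketch only — the class name comes from the spark-submit command, everything else is assumed — such an example could read the live-loaded 1million dataset through the connector's reader API, the same `spark.read.dgraph.*` calls that ExampleApp uses further down in this diff:

```scala
// Hypothetical sketch; the actual SparseApp in this PR may differ.
import org.apache.spark.sql.SparkSession
import uk.co.gresearch.spark.dgraph.connector._

object SparseAppSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Dgraph sparse UID example").getOrCreate()
    val target = "localhost:9080" // the alpha started by the workflow step above

    // Read all triples of the 1million dataset. With sparse UID regions
    // detected and partitioned by the connector, a full scan over the much
    // larger UID range should still complete with sensible partitions.
    val triples = spark.read.dgraph.triples(target)
    println(s"${triples.count()} triples in ${triples.rdd.getNumPartitions} partitions")

    spark.stop()
  }
}
```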
40 changes: 40 additions & 0 deletions dgraph-instance.schema.live-loader.dql
@@ -0,0 +1,40 @@
# Define Directives and index

director.film: [uid] @reverse .
actor.film: [uid] @count .
genre: [uid] @reverse .
initial_release_date: dateTime @index(year) .
name: string @index(exact, term) @lang .
starring: [uid] .
performance.film: [uid] .
performance.character_note: string .
performance.character: [uid] .
performance.actor: [uid] .
performance.special_performance_type: [uid] .
type: [uid] .

# Define Types

type Person {
name
director.film
actor.film
}

type Movie {
name
initial_release_date
genre
starring
}

type Genre {
name
}

type Performance {
performance.film
performance.character
performance.actor
}

2 changes: 1 addition & 1 deletion examples/scala/README.md
@@ -4,4 +4,4 @@ Run these examples as follows:

(cd ../../; mvn install)
mvn package
${SPARK_HOME}/bin/spark-submit --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.5.0-3.0-SNAPSHOT,graphframes:graphframes:0.8.0-spark3.0-s_2.12 --class uk.co.gresearch.spark.dgraph.connector.example.ExampleApp examples/scala/target/spark-dgraph-connector-examples_2.12-*.jar
${SPARK_HOME}/bin/spark-submit --packages uk.co.gresearch.spark:spark-dgraph-connector_2.12:0.5.0-3.0-SNAPSHOT,graphframes:graphframes:0.8.0-spark3.0-s_2.12,org.scalactic:scalactic_2.12:3.2.15 --class uk.co.gresearch.spark.dgraph.connector.example.ExampleApp examples/scala/target/spark-dgraph-connector-examples_2.12-*.jar
10 changes: 8 additions & 2 deletions examples/scala/pom.xml
@@ -15,6 +15,7 @@
<!-- spark.compat.version must be reflected in project version -->
<spark.compat.version>3.5</spark.compat.version>
<spark.version>${spark.compat.version}.0</spark.version>
<graphframes.version>0.8.3-spark3.5</graphframes.version>
</properties>

<!-- required to resolve GraphFrames dependency -->
@@ -33,6 +34,12 @@
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.scalactic</groupId>
<artifactId>scalactic_${scala.compat.version}</artifactId>
<version>3.2.15</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
@@ -63,8 +70,7 @@
<dependency>
<groupId>graphframes</groupId>
<artifactId>graphframes</artifactId>
<!-- no Spark 3.3 release for graphframes yet -->
<version>0.8.1-spark3.0-s_${scala.compat.version}</version>
<version>${graphframes.version}-s_${scala.compat.version}</version>
</dependency>
</dependencies>

43 changes: 43 additions & 0 deletions examples/scala/src/main/resources/log4j.properties
@@ -0,0 +1,43 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

# Set Spark Dgraph Connector logging to INFO
log4j.logger.uk.co.gresearch.spark.dgraph=INFO
72 changes: 72 additions & 0 deletions examples/scala/src/main/resources/log4j2.properties
@@ -0,0 +1,72 @@
#
# Copyright 2020 G-Research
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
rootLogger.level = warn
rootLogger.appenderRef.stdout.ref = console

# In the pattern layout configuration below, we specify an explicit `%ex` conversion
# pattern for logging Throwables. If this was omitted, then (by default) Log4J would
# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional
# class packaging information. That extra information can sometimes add a substantial
# performance overhead, so we disable it in our default logging config.
# For more information, see SPARK-39361.
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = warn

logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
logger.thriftserver.level = warn

# Settings to quiet third party logs that are too verbose
logger.jetty1.name = org.sparkproject.jetty
logger.jetty1.level = warn
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.replexprTyper.level = info
logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.replSparkILoopInterpreter.level = info
logger.parquet1.name = org.apache.parquet
logger.parquet1.level = error
logger.parquet2.name = parquet
logger.parquet2.level = error

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.RetryingHMSHandler.level = fatal
logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.FunctionRegistry.level = error

# For deploying Spark ThriftServer
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
appender.console.filter.1.type = RegexFilter
appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
appender.console.filter.1.onMatch = deny
appender.console.filter.1.onMismatch = neutral

# Set G-Research Spark logging to INFO
logger.GRSpark.name = uk.co.gresearch.spark
logger.GRSpark.level = info
examples/scala/src/main/scala/uk/co/gresearch/spark/dgraph/connector/example/ExampleApp.scala
@@ -19,6 +19,7 @@ package uk.co.gresearch.spark.dgraph.connector.example
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.scalactic.TripleEquals
import org.graphframes.GraphFrame
import uk.co.gresearch.spark.dgraph.connector.{IncludeReservedPredicatesOption, TypedNode}
import uk.co.gresearch.spark.dgraph.graphx.{EdgeProperty, VertexProperty}
@@ -29,6 +30,7 @@ object ExampleApp {
property.property == "dgraph.type" && Option(property.value).exists(_.toString.startsWith("dgraph."))

def main(args: Array[String]): Unit = {
import TripleEquals._

val spark: SparkSession = {
SparkSession
@@ -65,10 +67,10 @@
val dgraphVertexIds = reader.dgraph.vertices(target).filter(v => dgraphVertex(v._2)).map(_._1).collect().toSet
val edges: RDD[Edge[EdgeProperty]] = removeDgraphEdges(reader.dgraph.edges(target), dgraphVertexIds)

assert(graph.edges.count() == 12, graph.edges.count())
assert(graph.vertices.count() == 10, graph.vertices.count())
assert(edges.count() == 12, edges.count())
assert(vertices.count() == 49, vertices.count())
assert(graph.edges.count() === 12)
assert(graph.vertices.count() === 10)
assert(edges.count() === 12)
assert(vertices.count() === 49)
}

{
@@ -95,9 +97,9 @@
val edges: DataFrame = removeDgraphEdges(reader.dgraph.edges(target), dgraphNodes)

val triangles = graph.triangleCount.run().select($"id", $"count").orderBy($"id").as[(Long, Long)].collect().toSeq
assert(triangles.length == 10, triangles)
assert(edges.count() == 12, edges.count())
assert(vertices.count() == 10, vertices.count())
assert(triangles.length === 10)
assert(edges.count() === 12)
assert(vertices.count() === 10)
}

{
@@ -120,9 +122,9 @@
val edges: DataFrame = reader.dgraph.edges(target)
val nodes: DataFrame = removeDgraphTypedNodes(reader.dgraph.nodes(target).as[TypedNode]).toDF()

assert(triples.count() == 61, triples.count())
assert(edges.count() == 12, edges.count())
assert(nodes.count() == 49, nodes.count())
assert(triples.count() === 61)
assert(edges.count() === 12)
assert(nodes.count() === 49)
}

}
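The assertion changes above pair with the Scalactic dependency added to examples/scala/pom.xml and to the spark-submit --packages lists: `==` with an explicit clue argument is replaced by Scalactic's `===` from TripleEquals. A minimal, self-contained sketch of that style (the object name here is illustrative only):

```scala
// Sketch of the Scalactic TripleEquals style adopted in ExampleApp above.
// Requires org.scalactic:scalactic_<scala.compat.version> on the classpath.
import org.scalactic.TripleEquals._

object TripleEqualsSketch {
  def main(args: Array[String]): Unit = {
    val expected = 12L
    val actual = 12L
    // `===` resolves equality through Scalactic's Equality type class and
    // returns a Boolean, so it drops straight into a plain assert like `==`.
    assert(actual === expected)
  }
}
```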