From e141f53902244d1911e8521557a251bc178f4c48 Mon Sep 17 00:00:00 2001
From: Michele Rastelli
Date: Fri, 22 Sep 2023 10:17:26 +0200
Subject: [PATCH 1/7] updated test docker images

---
 .github/workflows/test.yml | 21 +++++++++------------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index e0c3d926..938ce38b 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -47,9 +47,8 @@ jobs:
           - 8
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.9.10
-          - docker.io/arangodb/arangodb:3.10.6
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.10.10
+          - docker.io/arangodb/arangodb:3.11.3
         exclude:
           - scala-version: 2.11
             spark-version: 3.1
@@ -65,11 +64,9 @@ jobs:
             spark-version: 2.4
           - scala-version: 2.13
             spark-version: 3.1
-          - docker-img: docker.io/arangodb/arangodb:3.9.10
+          - docker-img: docker.io/arangodb/arangodb:3.10.10
             java-version: 8
-          - docker-img: docker.io/arangodb/arangodb:3.10.6
-            java-version: 8
-          - docker-img: docker.io/arangodb/arangodb:3.11.0
+          - docker-img: docker.io/arangodb/arangodb:3.11.3
             java-version: 11
 
     steps:
@@ -111,7 +108,7 @@ jobs:
         java-version:
          - 8
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.11.3
         exclude:
           - scala-version: 2.11
             spark-version: 3.1
@@ -157,7 +154,7 @@ jobs:
         spark-version: [3.1, 3.2, 3.3, 3.4]
         topology: [single, cluster]
         java-version: [8, 11]
-        docker-img: ["docker.io/arangodb/arangodb:3.11.0"]
+        docker-img: ["docker.io/arangodb/arangodb:3.11.3"]
         exclude:
           - topology: cluster
            java-version: 8
@@ -218,7 +215,7 @@ jobs:
           - 8
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.11.3
         exclude:
           - scala-version: 2.11
             spark-version: 3.1
@@ -344,7 +341,7 @@ jobs:
         run: ./docker/start_db.sh
         env:
           STARTER_MODE: cluster
-          DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.0
+          DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.3
       - name: Info
         run: mvn -version
       - name: Install
@@ -368,7 +365,7 @@ jobs:
         java-version:
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.11.3
 
     steps:
       - uses: actions/checkout@v2

From 56712a95dab2a518581395370a9294b0832b22c5 Mon Sep 17 00:00:00 2001
From: Michele Rastelli
Date: Fri, 22 Sep 2023 10:20:50 +0200
Subject: [PATCH 2/7] updated dependencies

---
 pom.xml | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 4ad60f91..5e1a7bab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,7 +111,7 @@
         <profile>
            <id>spark-3.3</id>
            <properties>
-                <spark.version>3.3.2</spark.version>
+                <spark.version>3.3.3</spark.version>
                <spark.compat.version>3.3</spark.compat.version>
                <jackson.vpack.version>4.1.0</jackson.vpack.version>
            </properties>
@@ -120,7 +120,7 @@
        <profile>
            <id>spark-3.4</id>
            <properties>
-                <spark.version>3.4.0</spark.version>
+                <spark.version>3.4.1</spark.version>
                <spark.compat.version>3.4</spark.compat.version>
                <jackson.vpack.version>4.1.0</jackson.vpack.version>
            </properties>

From 85f193b261163912c23d8a163d93fb5123fbc190 Mon Sep 17 00:00:00 2001
From: Michele Rastelli
Date: Fri, 22 Sep 2023 13:40:59 +0200
Subject: [PATCH 3/7] 3.10: read from smart edge collection

---
 .../sql/arangodb/commons/ArangoClient.scala   | 13 ++-
 .../arangodb/datasource/BaseSparkTest.scala   | 12 ++-
 .../ReadSmartEdgeCollectionTest.scala         | 80 +++++++++++++++++++
 3 files changed, 98 insertions(+), 7 deletions(-)
 create mode 100644 integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala

diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
index e92a2e9b..d539c13d 100644
--- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
+++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
@@ -206,22 +206,29 @@ object ArangoClient extends Logging {
     val client = ArangoClient(options)
     val adb = client.arangoDB
     try {
+      val colName = options.readOptions.collection.get
       val res = adb.execute(new Request.Builder[Void]()
         .db(options.readOptions.db)
         .method(Request.Method.GET)
-        .path(s"/_api/collection/${options.readOptions.collection.get}/shards")
+        .path(s"/_api/collection/$colName/shards")
         .build(),
         classOf[RawBytes])
       val shardIds: Array[String] = adb.getSerde.deserialize(res.getBody.get, "/shards", classOf[Array[String]])
       client.shutdown()
-      shardIds
+      if (shardIds.isEmpty) {
+        // Smart Edge collections return empty shardIds (BTS-1596)
+        logWarning(s"Got empty shardIds for collection '$colName', reading will not be parallelized.")
+        Array(null)
+      } else {
+        shardIds
+      }
     } catch {
       case e: ArangoDBException =>
         client.shutdown()
         // single server < 3.8 returns Response: 500, Error: 4 - internal error
         // single server >= 3.8 returns Response: 501, Error: 9 - shards API is only available in a cluster
         if (INTERNAL_ERROR_CODE.equals(e.getErrorNum) || SHARDS_API_UNAVAILABLE_CODE.equals(e.getErrorNum)) {
-          Array("")
+          Array(null)
         } else {
           throw e
         }

diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala
index 8df24f33..359b0a57 100644
--- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala
+++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala
@@ -1,6 +1,6 @@
 package org.apache.spark.sql.arangodb.datasource
 
-import com.arangodb.entity.ServerRole
+import com.arangodb.entity.{License, ServerRole}
 import com.arangodb.model.CollectionCreateOptions
 import com.arangodb.serde.jackson.JacksonSerde
 import com.arangodb.spark.DefaultSource
@@ -33,7 +33,9 @@ class BaseSparkTest {
 
   def isSingle: Boolean = BaseSparkTest.isSingle
 
-  def isCluster: Boolean = !BaseSparkTest.isSingle
+  def isCluster: Boolean = BaseSparkTest.isCluster
+
+  def isEnterprise: Boolean = BaseSparkTest.isEnterprise
 }
 
 object BaseSparkTest {
@@ -78,8 +80,10 @@ object BaseSparkTest {
       .serde(serde)
       .build()
   }
-  private val db: ArangoDatabase = arangoDB.db(database)
-  private val isSingle: Boolean = arangoDB.getRole == ServerRole.SINGLE
+  val db: ArangoDatabase = arangoDB.db(database)
+  val isSingle: Boolean = arangoDB.getRole == ServerRole.SINGLE
+  val isCluster: Boolean = !isSingle
+  val isEnterprise: Boolean = arangoDB.getVersion.getLicense == License.ENTERPRISE
   private val options = Map(
     "database" -> database,
     "user" -> user,

diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala
new file mode 100644
index 00000000..d44abe8e
--- /dev/null
+++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala
@@ -0,0 +1,80 @@
+package org.apache.spark.sql.arangodb.datasource
+
+import com.arangodb.entity.EdgeDefinition
+import com.arangodb.model.GraphCreateOptions
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.arangodb.commons.ArangoDBConf
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Assumptions.assumeTrue
+import org.junit.jupiter.api.BeforeAll
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util
+import scala.collection.JavaConverters.asJavaIterableConverter
+import scala.collection.immutable
+import scala.jdk.CollectionConverters.asJavaCollectionConverter
+
+class ReadSmartEdgeCollectionTest extends BaseSparkTest {
+
+  @ParameterizedTest
+  @MethodSource(Array("provideProtocolAndContentType"))
+  def readSmartEdgeCollection(protocol: String, contentType: String): Unit = {
+    val df: DataFrame = spark.read
+      .format(BaseSparkTest.arangoDatasource)
+      .options(options + (
+        ArangoDBConf.COLLECTION -> ReadSmartEdgeCollectionTest.name,
+        ArangoDBConf.PROTOCOL -> protocol,
+        ArangoDBConf.CONTENT_TYPE -> contentType
+      ))
+      .load()
+
+
+    import spark.implicits._
+    val read = df
+      .as[Edge]
+      .collect()
+
+    assertThat(read.map(_.name)).containsAll(ReadSmartEdgeCollectionTest.data.map(d => d("name")).asJava)
+  }
+
+}
+
+object ReadSmartEdgeCollectionTest {
+  val name = "smartEdgeCol"
+  val from = s"from-$name"
+  val to = s"to-$name"
+
+  val data: immutable.Seq[Map[String, String]] = (1 to 10)
+    .map(x => Map(
+      "name" -> s"name-$x",
+      "_from" -> s"$from/a:$x",
+      "_to" -> s"$to/b:$x"
+    ))
+
+  @BeforeAll
+  def init(): Unit = {
+    assumeTrue(BaseSparkTest.isCluster && BaseSparkTest.isEnterprise)
+
+    if (BaseSparkTest.db.graph(name).exists()) {
+      BaseSparkTest.db.graph(name).drop(true)
+    }
+
+    val ed = new EdgeDefinition()
+      .collection(name)
+      .from(from)
+      .to(to)
+    val opts = new GraphCreateOptions()
+      .numberOfShards(2)
+      .isSmart(true)
+      .smartGraphAttribute("name")
+    BaseSparkTest.db.createGraph(name, List(ed).asJavaCollection, opts)
+    BaseSparkTest.db.collection(name).insertDocuments(data.asJava.asInstanceOf[util.Collection[Any]])
+  }
+}
+
+case class Edge(
+                 name: String,
+                 _from: String,
+                 _to: String
+               )
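Note: the test above goes through the same public read path an application would use. A minimal sketch, assuming an active SparkSession named spark, a running ArangoDB Enterprise cluster, and the endpoints/credentials map named options that BaseSparkTest assembles (hypothetical usage for illustration, not part of the patch):

  // Sketch only: read the smart edge collection created by the test setup above.
  import org.apache.spark.sql.arangodb.commons.ArangoDBConf

  val edges = spark.read
    .format(BaseSparkTest.arangoDatasource)
    .options(options + (ArangoDBConf.COLLECTION -> "smartEdgeCol"))
    .load()

  // _from, _to and name match the fields of the Edge case class above.
  edges.select("_from", "_to", "name").show()
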
From e6b151cde0d2a0b50b5cace53c4dbeaa5787c4cf Mon Sep 17 00:00:00 2001
From: Michele Rastelli
Date: Fri, 22 Sep 2023 13:58:15 +0200
Subject: [PATCH 4/7] fixed tests for Scala 2.13

---
 .../sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala
index d44abe8e..739738be 100644
--- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala
+++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala
@@ -13,7 +13,6 @@ import org.junit.jupiter.params.provider.MethodSource
 import java.util
 import scala.collection.JavaConverters.asJavaIterableConverter
 import scala.collection.immutable
-import scala.jdk.CollectionConverters.asJavaCollectionConverter
 
 class ReadSmartEdgeCollectionTest extends BaseSparkTest {
 
@@ -68,7 +67,7 @@ object ReadSmartEdgeCollectionTest {
       .numberOfShards(2)
       .isSmart(true)
       .smartGraphAttribute("name")
-    BaseSparkTest.db.createGraph(name, List(ed).asJavaCollection, opts)
+    BaseSparkTest.db.createGraph(name, List(ed).asJava.asInstanceOf[util.Collection[EdgeDefinition]], opts)
     BaseSparkTest.db.collection(name).insertDocuments(data.asJava.asInstanceOf[util.Collection[Any]])
   }
 }
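A word on the cast above: with only asJavaIterableConverter in scope, List(ed).asJava is typed as java.lang.Iterable rather than the java.util.Collection that createGraph expects, and the scala.jdk.CollectionConverters API used before only exists since Scala 2.13, while this build also targets 2.11 and 2.12. A hedged sketch of why the cast is nevertheless safe (assumptions: cross-built Scala 2.11-2.13 and the current wrapper implementation):

  import scala.collection.JavaConverters.asJavaIterableConverter
  import java.util

  // asJava on a Scala collection is typed as java.lang.Iterable here...
  val it: java.lang.Iterable[String] = List("a", "b").asJava
  // ...but the wrapper it returns extends java.util.AbstractCollection,
  // so narrowing it to util.Collection succeeds at runtime:
  val col: util.Collection[String] = it.asInstanceOf[util.Collection[String]]
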
From 2b6e7820e71a843cab120ce524b48654e7e5a750 Mon Sep 17 00:00:00 2001
From: Michele Rastelli
Date: Fri, 22 Sep 2023 14:01:12 +0200
Subject: [PATCH 5/7] CI: added enterprise tests

---
 .github/workflows/test.yml | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 938ce38b..e445f427 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -48,6 +48,7 @@ jobs:
           - 11
         docker-img:
           - docker.io/arangodb/arangodb:3.10.10
+          - docker.io/arangodb/enterprise:3.10.10
           - docker.io/arangodb/arangodb:3.11.3
         exclude:
           - scala-version: 2.11
@@ -66,6 +67,9 @@ jobs:
             spark-version: 3.1
           - docker-img: docker.io/arangodb/arangodb:3.10.10
             java-version: 8
+          - docker-img: docker.io/arangodb/enterprise:3.10.10
+            java-version: 8
+            topology: single
           - docker-img: docker.io/arangodb/arangodb:3.11.3
             java-version: 11
 

From eff189da83b5da9c8778036d11803cbd4822219d Mon Sep 17 00:00:00 2001
From: Michele Rastelli
Date: Fri, 22 Sep 2023 14:53:02 +0200
Subject: [PATCH 6/7] 3.11: read from smart edge collection

---
 .github/workflows/test.yml                  |  6 ++++
 .../sql/arangodb/commons/ArangoClient.scala | 35 +++++++++++++------
 2 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index e445f427..81853702 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -50,6 +50,7 @@ jobs:
           - docker.io/arangodb/arangodb:3.10.10
           - docker.io/arangodb/enterprise:3.10.10
           - docker.io/arangodb/arangodb:3.11.3
+          - docker.io/arangodb/enterprise:3.11.3
         exclude:
           - scala-version: 2.11
             spark-version: 3.1
@@ -69,9 +70,14 @@ jobs:
             java-version: 8
           - docker-img: docker.io/arangodb/enterprise:3.10.10
             java-version: 8
+          - docker-img: docker.io/arangodb/enterprise:3.10.10
             topology: single
           - docker-img: docker.io/arangodb/arangodb:3.11.3
             java-version: 11
+          - docker-img: docker.io/arangodb/enterprise:3.11.3
+            java-version: 8
+          - docker-img: docker.io/arangodb/enterprise:3.11.3
+            topology: single
 
     steps:
       - uses: actions/checkout@v2

diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
index d539c13d..8f65ed1a 100644
--- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
+++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
@@ -14,6 +14,7 @@ import java.util.UUID
 import java.util.concurrent.TimeoutException
 import scala.annotation.tailrec
 import scala.collection.JavaConverters.mapAsJavaMapConverter
+import scala.jdk.CollectionConverters.MapHasAsScala
 
 @SuppressWarnings(Array("OptionGet"))
 class ArangoClient(options: ArangoDBConf) extends Logging {
@@ -207,21 +208,23 @@ object ArangoClient extends Logging {
     val adb = client.arangoDB
     try {
       val colName = options.readOptions.collection.get
-      val res = adb.execute(new Request.Builder[Void]()
+      val props = adb.execute(new Request.Builder[Void]()
         .db(options.readOptions.db)
         .method(Request.Method.GET)
-        .path(s"/_api/collection/$colName/shards")
+        .path(s"/_api/collection/$colName/properties")
         .build(),
-        classOf[RawBytes])
-      val shardIds: Array[String] = adb.getSerde.deserialize(res.getBody.get, "/shards", classOf[Array[String]])
+        classOf[java.util.Map[String, Any]]).getBody.asScala
+
+      val shardIds: Array[String] =
+        if (props("isSmart") == true && props("type") == 3) {
+          // Smart Edge collection (BTS-1595, BTS-1596)
+          requestShards(adb, options.readOptions.db, s"_local_$colName") ++
+            requestShards(adb, options.readOptions.db, s"_from_$colName")
+        } else {
+          requestShards(adb, options.readOptions.db, colName)
+        }
       client.shutdown()
-      if (shardIds.isEmpty) {
-        // Smart Edge collections return empty shardIds (BTS-1596)
-        logWarning(s"Got empty shardIds for collection '$colName', reading will not be parallelized.")
-        Array(null)
-      } else {
-        shardIds
-      }
+      shardIds
     } catch {
       case e: ArangoDBException =>
         client.shutdown()
@@ -235,6 +238,16 @@ object ArangoClient extends Logging {
     }
   }
 
+  private def requestShards(adb: ArangoDB, db: String, col: String): Array[String] = {
+    val res = adb.execute(new Request.Builder[Void]()
+      .db(db)
+      .method(Request.Method.GET)
+      .path(s"/_api/collection/$col/shards")
+      .build(),
+      classOf[RawBytes])
+    adb.getSerde.deserialize(res.getBody.get, "/shards", classOf[Array[String]])
+  }
+
   def acquireHostList(options: ArangoDBConf): Iterable[String] = {
     logDebug("acquiring host list")
     val client = ArangoClient(options)
From a8cdb4cc14336339e113f33cb093a797e016a510 Mon Sep 17 00:00:00 2001
From: Michele Rastelli
Date: Fri, 22 Sep 2023 15:05:11 +0200
Subject: [PATCH 7/7] fixed compilation for Scala 2.12

---
 .../org/apache/spark/sql/arangodb/commons/ArangoClient.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
index 8f65ed1a..c0908551 100644
--- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
+++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
@@ -14,7 +14,6 @@ import java.util.UUID
 import java.util.concurrent.TimeoutException
 import scala.annotation.tailrec
 import scala.collection.JavaConverters.mapAsJavaMapConverter
-import scala.jdk.CollectionConverters.MapHasAsScala
 
 @SuppressWarnings(Array("OptionGet"))
 class ArangoClient(options: ArangoDBConf) extends Logging {
@@ -213,10 +212,10 @@ object ArangoClient extends Logging {
         .method(Request.Method.GET)
         .path(s"/_api/collection/$colName/properties")
         .build(),
-        classOf[java.util.Map[String, Any]]).getBody.asScala
+        classOf[java.util.Map[String, Any]]).getBody
 
       val shardIds: Array[String] =
-        if (props("isSmart") == true && props("type") == 3) {
+        if (props.get("isSmart") == true && props.get("type") == 3) {
           // Smart Edge collection (BTS-1595, BTS-1596)
           requestShards(adb, options.readOptions.db, s"_local_$colName") ++
             requestShards(adb, options.readOptions.db, s"_from_$colName")
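Taken together, patches 6 and 7 change shard resolution as follows: the client first fetches /_api/collection/<name>/properties, and if the collection is a smart edge collection (isSmart == true with type == 3, ArangoDB's numeric code for edge collections; document collections are type 2), it lists the shards of the hidden backing collections _local_<name> and _from_<name> instead of the collection itself. A standalone sketch of that decision, assuming the properties response has already been parsed into a plain Scala map (an assumption for illustration; the real code goes through adb.execute and requestShards):

  // Sketch only: shard-source selection equivalent to what patches 6-7 implement.
  def shardSourceCollections(colName: String, props: Map[String, Any]): Seq[String] =
    if (props.get("isSmart").contains(true) && props.get("type").contains(3)) {
      // Smart edge collections keep their data in hidden system collections,
      // so shards must be listed through _local_<name> and _from_<name>.
      Seq(s"_local_$colName", s"_from_$colName")
    } else {
      Seq(colName)
    }

  // e.g. shardSourceCollections("smartEdgeCol", Map("isSmart" -> true, "type" -> 3))
  //      returns Seq("_local_smartEdgeCol", "_from_smartEdgeCol")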