diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index e0c3d926..81853702 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -47,9 +47,10 @@ jobs:
           - 8
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.9.10
-          - docker.io/arangodb/arangodb:3.10.6
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.10.10
+          - docker.io/arangodb/enterprise:3.10.10
+          - docker.io/arangodb/arangodb:3.11.3
+          - docker.io/arangodb/enterprise:3.11.3
         exclude:
           - scala-version: 2.11
             spark-version: 3.1
@@ -65,12 +66,18 @@ jobs:
             spark-version: 2.4
           - scala-version: 2.13
             spark-version: 3.1
-          - docker-img: docker.io/arangodb/arangodb:3.9.10
+          - docker-img: docker.io/arangodb/arangodb:3.10.10
             java-version: 8
-          - docker-img: docker.io/arangodb/arangodb:3.10.6
+          - docker-img: docker.io/arangodb/enterprise:3.10.10
             java-version: 8
-          - docker-img: docker.io/arangodb/arangodb:3.11.0
+          - docker-img: docker.io/arangodb/enterprise:3.10.10
+            topology: single
+          - docker-img: docker.io/arangodb/arangodb:3.11.3
             java-version: 11
+          - docker-img: docker.io/arangodb/enterprise:3.11.3
+            java-version: 8
+          - docker-img: docker.io/arangodb/enterprise:3.11.3
+            topology: single
 
     steps:
       - uses: actions/checkout@v2
@@ -111,7 +118,7 @@ jobs:
         java-version:
          - 8
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.11.3
         exclude:
           - scala-version: 2.11
             spark-version: 3.1
@@ -157,7 +164,7 @@ jobs:
        spark-version: [3.1, 3.2, 3.3, 3.4]
        topology: [single, cluster]
        java-version: [8, 11]
-        docker-img: ["docker.io/arangodb/arangodb:3.11.0"]
+        docker-img: ["docker.io/arangodb/arangodb:3.11.3"]
        exclude:
          - topology: cluster
            java-version: 8
@@ -218,7 +225,7 @@ jobs:
           - 8
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.11.3
         exclude:
           - scala-version: 2.11
             spark-version: 3.1
@@ -344,7 +351,7 @@ jobs:
         run: ./docker/start_db.sh
         env:
           STARTER_MODE: cluster
-          DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.0
+          DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.3
       - name: Info
         run: mvn -version
       - name: Install
@@ -368,7 +375,7 @@ jobs:
         java-version:
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.0
+          - docker.io/arangodb/arangodb:3.11.3
 
     steps:
       - uses: actions/checkout@v2

diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
index e92a2e9b..c0908551 100644
--- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
+++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
@@ -206,13 +206,22 @@ object ArangoClient extends Logging {
     val client = ArangoClient(options)
     val adb = client.arangoDB
     try {
-      val res = adb.execute(new Request.Builder[Void]()
+      val colName = options.readOptions.collection.get
+      val props = adb.execute(new Request.Builder[Void]()
         .db(options.readOptions.db)
         .method(Request.Method.GET)
-        .path(s"/_api/collection/${options.readOptions.collection.get}/shards")
+        .path(s"/_api/collection/$colName/properties")
         .build(),
-        classOf[RawBytes])
-      val shardIds: Array[String] = adb.getSerde.deserialize(res.getBody.get, "/shards", classOf[Array[String]])
+        classOf[java.util.Map[String, Any]]).getBody
+
+      val shardIds: Array[String] =
+        if (props.get("isSmart") == true && props.get("type") == 3) {
+          // Smart Edge collection (BTS-1595, BTS-1596)
+          requestShards(adb, options.readOptions.db, s"_local_$colName") ++
+            requestShards(adb, options.readOptions.db, s"_from_$colName")
+        } else {
+          requestShards(adb, options.readOptions.db, colName)
+        }
       client.shutdown()
       shardIds
     } catch {
@@ -221,13 +230,23 @@
         // single server < 3.8 returns Response: 500, Error: 4 - internal error
         // single server >= 3.8 returns Response: 501, Error: 9 - shards API is only available in a cluster
         if (INTERNAL_ERROR_CODE.equals(e.getErrorNum) || SHARDS_API_UNAVAILABLE_CODE.equals(e.getErrorNum)) {
-          Array("")
+          Array(null)
         } else {
           throw e
         }
     }
   }
 
+  private def requestShards(adb: ArangoDB, db: String, col: String): Array[String] = {
+    val res = adb.execute(new Request.Builder[Void]()
+      .db(db)
+      .method(Request.Method.GET)
+      .path(s"/_api/collection/$col/shards")
+      .build(),
+      classOf[RawBytes])
+    adb.getSerde.deserialize(res.getBody.get, "/shards", classOf[Array[String]])
+  }
+
   def acquireHostList(options: ArangoDBConf): Iterable[String] = {
     logDebug("acquiring host list")
     val client = ArangoClient(options)
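Note on the ArangoClient change above: a Smart Edge collection is virtual, its edges are physically stored in the hidden `_local_<name>` and `_from_<name>` system collections, so the connector now reads `/_api/collection/<name>/properties` first (an edge collection reports `type == 3`) and unions the shards of both backing collections (BTS-1595, BTS-1596). Below is a minimal standalone sketch of the same logic using the Java driver; the endpoint, credentials, and collection name are illustrative assumptions, not values from this patch.

```scala
import com.arangodb.{ArangoDB, Request}
import com.arangodb.util.RawBytes

// Sketch of the shard resolution above, outside the connector. Endpoint,
// credentials and collection name are illustrative assumptions.
object SmartEdgeShardsSketch extends App {
  val adb = new ArangoDB.Builder().host("172.28.0.1", 8529).password("test").build()
  val dbName = "_system"
  val col = "smartEdgeCol"

  // GET /_api/collection/<name>/shards, deserialized from the "/shards" field
  def shardsOf(c: String): Array[String] = {
    val res = adb.execute(new Request.Builder[Void]()
      .db(dbName)
      .method(Request.Method.GET)
      .path(s"/_api/collection/$c/shards")
      .build(),
      classOf[RawBytes])
    adb.getSerde.deserialize(res.getBody.get, "/shards", classOf[Array[String]])
  }

  // type == 3 marks an edge collection; a smart edge collection is virtual,
  // so its shards must be read from the hidden backing collections instead.
  val props = adb.execute(new Request.Builder[Void]()
    .db(dbName)
    .method(Request.Method.GET)
    .path(s"/_api/collection/$col/properties")
    .build(),
    classOf[java.util.Map[String, Any]]).getBody

  val shards =
    if (props.get("isSmart") == true && props.get("type") == 3)
      shardsOf(s"_local_$col") ++ shardsOf(s"_from_$col")
    else
      shardsOf(col)

  shards.foreach(println)
  adb.shutdown()
}
```

On a single server the shards API is unavailable, which is exactly the error path (`INTERNAL_ERROR_CODE` / `SHARDS_API_UNAVAILABLE_CODE`) handled in the patched method.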
diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala
index 8df24f33..359b0a57 100644
--- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala
+++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala
@@ -1,6 +1,6 @@
 package org.apache.spark.sql.arangodb.datasource
 
-import com.arangodb.entity.ServerRole
+import com.arangodb.entity.{License, ServerRole}
 import com.arangodb.model.CollectionCreateOptions
 import com.arangodb.serde.jackson.JacksonSerde
 import com.arangodb.spark.DefaultSource
@@ -33,7 +33,9 @@ class BaseSparkTest {
 
   def isSingle: Boolean = BaseSparkTest.isSingle
 
-  def isCluster: Boolean = !BaseSparkTest.isSingle
+  def isCluster: Boolean = BaseSparkTest.isCluster
+
+  def isEnterprise: Boolean = BaseSparkTest.isEnterprise
 }
 
 object BaseSparkTest {
@@ -78,8 +80,10 @@ object BaseSparkTest {
       .serde(serde)
       .build()
   }
-  private val db: ArangoDatabase = arangoDB.db(database)
-  private val isSingle: Boolean = arangoDB.getRole == ServerRole.SINGLE
+  val db: ArangoDatabase = arangoDB.db(database)
+  val isSingle: Boolean = arangoDB.getRole == ServerRole.SINGLE
+  val isCluster: Boolean = !isSingle
+  val isEnterprise: Boolean = arangoDB.getVersion.getLicense == License.ENTERPRISE
   private val options = Map(
     "database" -> database,
     "user" -> user,
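The `isEnterprise` flag added above gates the new test below: smart graphs require an Enterprise Edition cluster. The same deployment checks in a standalone sketch, with endpoint and credentials as illustrative assumptions:

```scala
import com.arangodb.ArangoDB
import com.arangodb.entity.{License, ServerRole}

// Standalone version of the deployment checks BaseSparkTest now exposes.
object DeploymentChecksSketch extends App {
  val adb = new ArangoDB.Builder().host("172.28.0.1", 8529).password("test").build()
  val isSingle = adb.getRole == ServerRole.SINGLE // coordinators report a non-SINGLE role
  val isEnterprise = adb.getVersion.getLicense == License.ENTERPRISE
  println(s"single=$isSingle, enterprise=$isEnterprise")
  adb.shutdown()
}
```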
diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala
new file mode 100644
index 00000000..739738be
--- /dev/null
+++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadSmartEdgeCollectionTest.scala
@@ -0,0 +1,79 @@
+package org.apache.spark.sql.arangodb.datasource
+
+import com.arangodb.entity.EdgeDefinition
+import com.arangodb.model.GraphCreateOptions
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.arangodb.commons.ArangoDBConf
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Assumptions.assumeTrue
+import org.junit.jupiter.api.BeforeAll
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+
+import java.util
+import scala.collection.JavaConverters.asJavaIterableConverter
+import scala.collection.immutable
+
+class ReadSmartEdgeCollectionTest extends BaseSparkTest {
+
+  @ParameterizedTest
+  @MethodSource(Array("provideProtocolAndContentType"))
+  def readSmartEdgeCollection(protocol: String, contentType: String): Unit = {
+    val df: DataFrame = spark.read
+      .format(BaseSparkTest.arangoDatasource)
+      .options(options + (
+        ArangoDBConf.COLLECTION -> ReadSmartEdgeCollectionTest.name,
+        ArangoDBConf.PROTOCOL -> protocol,
+        ArangoDBConf.CONTENT_TYPE -> contentType
+      ))
+      .load()
+
+
+    import spark.implicits._
+    val read = df
+      .as[Edge]
+      .collect()
+
+    assertThat(read.map(_.name)).containsAll(ReadSmartEdgeCollectionTest.data.map(d => d("name")).asJava)
+  }
+
+}
+
+object ReadSmartEdgeCollectionTest {
+  val name = "smartEdgeCol"
+  val from = s"from-$name"
+  val to = s"to-$name"
+
+  val data: immutable.Seq[Map[String, String]] = (1 to 10)
+    .map(x => Map(
+      "name" -> s"name-$x",
+      "_from" -> s"$from/a:$x",
+      "_to" -> s"$to/b:$x"
+    ))
+
+  @BeforeAll
+  def init(): Unit = {
+    assumeTrue(BaseSparkTest.isCluster && BaseSparkTest.isEnterprise)
+
+    if (BaseSparkTest.db.graph(name).exists()) {
+      BaseSparkTest.db.graph(name).drop(true)
+    }
+
+    val ed = new EdgeDefinition()
+      .collection(name)
+      .from(from)
+      .to(to)
+    val opts = new GraphCreateOptions()
+      .numberOfShards(2)
+      .isSmart(true)
+      .smartGraphAttribute("name")
+    BaseSparkTest.db.createGraph(name, List(ed).asJava.asInstanceOf[util.Collection[EdgeDefinition]], opts)
+    BaseSparkTest.db.collection(name).insertDocuments(data.asJava.asInstanceOf[util.Collection[Any]])
+  }
+}
+
+case class Edge(
                 name: String,
                 _from: String,
                 _to: String
               )

diff --git a/pom.xml b/pom.xml
index 4ad60f91..5e1a7bab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,7 +111,7 @@
         <profile>
             <id>spark-3.3</id>
             <properties>
-                <spark.version>3.3.2</spark.version>
+                <spark.version>3.3.3</spark.version>
                 <spark.compat.version>3.3</spark.compat.version>
                 <jackson.vpack.version>4.1.0</jackson.vpack.version>
             </properties>
@@ -120,7 +120,7 @@
         <profile>
             <id>spark-3.4</id>
             <properties>
-                <spark.version>3.4.0</spark.version>
+                <spark.version>3.4.1</spark.version>
                 <spark.compat.version>3.4</spark.compat.version>
                 <jackson.vpack.version>4.1.0</jackson.vpack.version>
             </properties>
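For reference, reading the collection created by this test outside the test harness would look roughly like the following sketch. The format name and the `endpoints`/`table` option keys follow the datasource documentation; the endpoint address, credentials, and database name are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Reading the smart edge collection through the datasource; endpoint,
// credentials and database name are illustrative assumptions.
object ReadSmartEdgeSketch extends App {
  val spark = SparkSession.builder()
    .appName("readSmartEdgeCollection")
    .master("local[*]")
    .getOrCreate()

  val df = spark.read
    .format("com.arangodb.spark")
    .options(Map(
      "endpoints" -> "172.28.0.1:8529",
      "password" -> "test",
      "database" -> "sparkConnectorTest",
      "table" -> "smartEdgeCol"
    ))
    .load()

  // _from and _to keep the smart-graph key format "<smartGraphAttribute value>:<key>"
  df.select("name", "_from", "_to").show(truncate = false)
  spark.stop()
}
```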