Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ jobs:
- 8
- 11
docker-img:
- docker.io/arangodb/arangodb:3.9.10
- docker.io/arangodb/arangodb:3.10.6
- docker.io/arangodb/arangodb:3.11.0
- docker.io/arangodb/arangodb:3.10.10
- docker.io/arangodb/enterprise:3.10.10
- docker.io/arangodb/arangodb:3.11.3
- docker.io/arangodb/enterprise:3.11.3
exclude:
- scala-version: 2.11
spark-version: 3.1
Expand All @@ -65,12 +66,18 @@ jobs:
spark-version: 2.4
- scala-version: 2.13
spark-version: 3.1
- docker-img: docker.io/arangodb/arangodb:3.9.10
- docker-img: docker.io/arangodb/arangodb:3.10.10
java-version: 8
- docker-img: docker.io/arangodb/arangodb:3.10.6
- docker-img: docker.io/arangodb/enterprise:3.10.10
java-version: 8
- docker-img: docker.io/arangodb/arangodb:3.11.0
- docker-img: docker.io/arangodb/enterprise:3.10.10
topology: single
- docker-img: docker.io/arangodb/arangodb:3.11.3
java-version: 11
- docker-img: docker.io/arangodb/enterprise:3.11.3
java-version: 8
- docker-img: docker.io/arangodb/enterprise:3.11.3
topology: single

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -111,7 +118,7 @@ jobs:
java-version:
- 8
docker-img:
- docker.io/arangodb/arangodb:3.11.0
- docker.io/arangodb/arangodb:3.11.3
exclude:
- scala-version: 2.11
spark-version: 3.1
Expand Down Expand Up @@ -157,7 +164,7 @@ jobs:
spark-version: [3.1, 3.2, 3.3, 3.4]
topology: [single, cluster]
java-version: [8, 11]
docker-img: ["docker.io/arangodb/arangodb:3.11.0"]
docker-img: ["docker.io/arangodb/arangodb:3.11.3"]
exclude:
- topology: cluster
java-version: 8
Expand Down Expand Up @@ -218,7 +225,7 @@ jobs:
- 8
- 11
docker-img:
- docker.io/arangodb/arangodb:3.11.0
- docker.io/arangodb/arangodb:3.11.3
exclude:
- scala-version: 2.11
spark-version: 3.1
Expand Down Expand Up @@ -344,7 +351,7 @@ jobs:
run: ./docker/start_db.sh
env:
STARTER_MODE: cluster
DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.0
DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.3
- name: Info
run: mvn -version
- name: Install
Expand All @@ -368,7 +375,7 @@ jobs:
java-version:
- 11
docker-img:
- docker.io/arangodb/arangodb:3.11.0
- docker.io/arangodb/arangodb:3.11.3

steps:
- uses: actions/checkout@v2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,13 +206,22 @@ object ArangoClient extends Logging {
val client = ArangoClient(options)
val adb = client.arangoDB
try {
val res = adb.execute(new Request.Builder[Void]()
val colName = options.readOptions.collection.get
val props = adb.execute(new Request.Builder[Void]()
.db(options.readOptions.db)
.method(Request.Method.GET)
.path(s"/_api/collection/${options.readOptions.collection.get}/shards")
.path(s"/_api/collection/$colName/properties")
.build(),
classOf[RawBytes])
val shardIds: Array[String] = adb.getSerde.deserialize(res.getBody.get, "/shards", classOf[Array[String]])
classOf[java.util.Map[String, Any]]).getBody

val shardIds: Array[String] =
if (props.get("isSmart") == true && props.get("type") == 3) {
// Smart Edge collection (BTS-1595, BTS-1596)
requestShards(adb, options.readOptions.db, s"_local_$colName") ++
requestShards(adb, options.readOptions.db, s"_from_$colName")
} else {
requestShards(adb, options.readOptions.db, colName)
}
client.shutdown()
shardIds
} catch {
Expand All @@ -221,13 +230,23 @@ object ArangoClient extends Logging {
// single server < 3.8 returns Response: 500, Error: 4 - internal error
// single server >= 3.8 returns Response: 501, Error: 9 - shards API is only available in a cluster
if (INTERNAL_ERROR_CODE.equals(e.getErrorNum) || SHARDS_API_UNAVAILABLE_CODE.equals(e.getErrorNum)) {
Array("")
Array(null)
} else {
throw e
}
}
}

/** Fetches the shard IDs of collection `col` in database `db` via the `/shards` API. */
private def requestShards(adb: ArangoDB, db: String, col: String): Array[String] = {
  val request = new Request.Builder[Void]()
    .db(db)
    .method(Request.Method.GET)
    .path(s"/_api/collection/$col/shards")
    .build()
  val response = adb.execute(request, classOf[RawBytes])
  // Deserialize only the "/shards" field of the raw response body.
  adb.getSerde.deserialize(response.getBody.get, "/shards", classOf[Array[String]])
}

def acquireHostList(options: ArangoDBConf): Iterable[String] = {
logDebug("acquiring host list")
val client = ArangoClient(options)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.apache.spark.sql.arangodb.datasource

import com.arangodb.entity.ServerRole
import com.arangodb.entity.{License, ServerRole}
import com.arangodb.model.CollectionCreateOptions
import com.arangodb.serde.jackson.JacksonSerde
import com.arangodb.spark.DefaultSource
Expand Down Expand Up @@ -33,7 +33,9 @@ class BaseSparkTest {

def isSingle: Boolean = BaseSparkTest.isSingle

def isCluster: Boolean = !BaseSparkTest.isSingle
def isCluster: Boolean = BaseSparkTest.isCluster

def isEnterprise: Boolean = BaseSparkTest.isEnterprise
}

object BaseSparkTest {
Expand Down Expand Up @@ -78,8 +80,10 @@ object BaseSparkTest {
.serde(serde)
.build()
}
private val db: ArangoDatabase = arangoDB.db(database)
private val isSingle: Boolean = arangoDB.getRole == ServerRole.SINGLE
val db: ArangoDatabase = arangoDB.db(database)
val isSingle: Boolean = arangoDB.getRole == ServerRole.SINGLE
val isCluster: Boolean = !isSingle
val isEnterprise: Boolean = arangoDB.getVersion.getLicense == License.ENTERPRISE
private val options = Map(
"database" -> database,
"user" -> user,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package org.apache.spark.sql.arangodb.datasource

import com.arangodb.entity.EdgeDefinition
import com.arangodb.model.GraphCreateOptions
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.arangodb.commons.ArangoDBConf
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Assumptions.assumeTrue
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

import java.util
import scala.collection.JavaConverters.asJavaIterableConverter
import scala.collection.immutable

class ReadSmartEdgeCollectionTest extends BaseSparkTest {

  /**
   * Reads the smart edge collection through the ArangoDB datasource and
   * checks that every inserted edge name comes back.
   */
  @ParameterizedTest
  @MethodSource(Array("provideProtocolAndContentType"))
  def readSmartEdgeCollection(protocol: String, contentType: String): Unit = {
    val readOptions = options + (
      ArangoDBConf.COLLECTION -> ReadSmartEdgeCollectionTest.name,
      ArangoDBConf.PROTOCOL -> protocol,
      ArangoDBConf.CONTENT_TYPE -> contentType
    )
    val df: DataFrame = spark.read
      .format(BaseSparkTest.arangoDatasource)
      .options(readOptions)
      .load()

    import spark.implicits._
    val edges = df.as[Edge].collect()

    val expectedNames = ReadSmartEdgeCollectionTest.data.map(_("name")).asJava
    assertThat(edges.map(_.name)).containsAll(expectedNames)
  }

}

object ReadSmartEdgeCollectionTest {
  /** Name of the smart edge collection under test. */
  val name = "smartEdgeCol"
  /** Source vertex collection of the edge definition. */
  val from = s"from-$name"
  // FIX: was s"from-$name" — copy-paste error that named the target vertex
  // collection identically to the source one.
  /** Target vertex collection of the edge definition. */
  val to = s"to-$name"

  /**
   * Edge documents to insert. `name` is also used as the smartGraphAttribute,
   * and `_from`/`_to` keys carry the `attr:key` smart-key format.
   */
  val data: immutable.Seq[Map[String, String]] = (1 to 10)
    .map(x => Map(
      "name" -> s"name-$x",
      "_from" -> s"$from/a:$x",
      "_to" -> s"$to/b:$x"
    ))

  @BeforeAll
  def init(): Unit = {
    // Smart graphs are only available on ArangoDB Enterprise clusters.
    assumeTrue(BaseSparkTest.isCluster && BaseSparkTest.isEnterprise)

    // Drop any leftover graph so the test starts from a clean state.
    if (BaseSparkTest.db.graph(name).exists()) {
      BaseSparkTest.db.graph(name).drop(true)
    }

    val ed = new EdgeDefinition()
      .collection(name)
      .from(from)
      .to(to)
    val opts = new GraphCreateOptions()
      .numberOfShards(2)
      .isSmart(true)
      .smartGraphAttribute("name")
    BaseSparkTest.db.createGraph(name, List(ed).asJava.asInstanceOf[util.Collection[EdgeDefinition]], opts)
    BaseSparkTest.db.collection(name).insertDocuments(data.asJava.asInstanceOf[util.Collection[Any]])
  }
}

// Typed projection of an edge document read back from the datasource;
// field names match the ArangoDB document attributes (_from/_to are the
// edge endpoint document IDs).
case class Edge(
name: String,
_from: String,
_to: String
)
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<profile>
<id>spark-3.3</id>
<properties>
<spark.version>3.3.2</spark.version>
<spark.version>3.3.3</spark.version>
<spark.compat.version>3.3</spark.compat.version>
<jackson.vpack.variant/>
<jackson.vpack.version>4.1.0</jackson.vpack.version>
Expand All @@ -120,7 +120,7 @@
<profile>
<id>spark-3.4</id>
<properties>
<spark.version>3.4.0</spark.version>
<spark.version>3.4.1</spark.version>
<spark.compat.version>3.4</spark.compat.version>
<jackson.vpack.variant/>
<jackson.vpack.version>4.1.0</jackson.vpack.version>
Expand Down