diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index c463fbd3..36ff9cb4 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -198,7 +198,7 @@ jobs:
       - name: Info
         run: mvn -version
       - name: Install
-        run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -Dgpg.skip=true install
+        run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -Dgpg.skip=true -DskipTests=true install
       - name: Deployment Test
         run: mvn -f ./demo/pom.xml -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -DimportPath=docker/import test
       - name: Collect docker logs on failure
diff --git a/arangodb-spark-commons/pom.xml b/arangodb-spark-commons/pom.xml
index a3d056f6..e3489742 100644
--- a/arangodb-spark-commons/pom.xml
+++ b/arangodb-spark-commons/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>arangodb-spark-datasource</artifactId>
         <groupId>com.arangodb</groupId>
-        <version>1.1.1</version>
+        <version>1.2.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
index cf171ee3..ca92f695 100644
--- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
+++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
@@ -40,7 +40,10 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
})
.build()
- def shutdown(): Unit = arangoDB.shutdown()
+ def shutdown(): Unit = {
+ logDebug("closing db client")
+ arangoDB.shutdown()
+ }
def readCollectionPartition(shardId: String, filters: Array[PushableFilter], schema: StructType): ArangoCursor[VPackSlice] = {
val query =
@@ -101,12 +104,16 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
cursor.asScala.take(options.readOptions.sampleSize).toSeq
}
- def collectionExists(): Boolean = arangoDB
- .db(options.writeOptions.db)
- .collection(options.writeOptions.collection)
- .exists()
+ def collectionExists(): Boolean = {
+ logDebug("checking collection")
+ arangoDB
+ .db(options.writeOptions.db)
+ .collection(options.writeOptions.collection)
+ .exists()
+ }
def createCollection(): Unit = {
+ logDebug("creating collection")
val opts = new CollectionCreateOptions()
.numberOfShards(options.writeOptions.numberOfShards)
.`type`(options.writeOptions.collectionType)
@@ -119,11 +126,13 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
@tailrec
final def truncate(): Unit = {
+ logDebug("truncating collection")
try {
arangoDB
.db(options.writeOptions.db)
.collection(options.writeOptions.collection)
.truncate()
+ logDebug("truncated collection")
} catch {
case e: ArangoDBException =>
if (e.getCause.isInstanceOf[TimeoutException]) {
@@ -136,12 +145,16 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
}
}
- def drop(): Unit = arangoDB
- .db(options.writeOptions.db)
- .collection(options.writeOptions.collection)
- .drop()
+ def drop(): Unit = {
+ logDebug("dropping collection")
+ arangoDB
+ .db(options.writeOptions.db)
+ .collection(options.writeOptions.collection)
+ .drop()
+ }
def saveDocuments(data: VPackSlice): Unit = {
+ logDebug("saving batch")
val request = new Request(
options.writeOptions.db,
RequestType.POST,
@@ -176,24 +189,30 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
@SuppressWarnings(Array("OptionGet"))
-object ArangoClient {
+object ArangoClient extends Logging {
private val INTERNAL_ERROR_CODE = 4
private val SHARDS_API_UNAVAILABLE_CODE = 9
- def apply(options: ArangoDBConf): ArangoClient = new ArangoClient(options)
+ def apply(options: ArangoDBConf): ArangoClient = {
+ logDebug("creating db client")
+ new ArangoClient(options)
+ }
def getCollectionShardIds(options: ArangoDBConf): Array[String] = {
+ logDebug("reading collection shards")
+ val client = ArangoClient(options)
+ val adb = client.arangoDB
try {
- val client = ArangoClient(options).arangoDB
- val res = client.execute(new Request(
+ val res = adb.execute(new Request(
options.readOptions.db,
RequestType.GET,
s"/_api/collection/${options.readOptions.collection.get}/shards"))
- val shardIds: Array[String] = client.util().deserialize(res.getBody.get("shards"), classOf[Array[String]])
+ val shardIds: Array[String] = adb.util().deserialize(res.getBody.get("shards"), classOf[Array[String]])
client.shutdown()
shardIds
} catch {
case e: ArangoDBException =>
+ client.shutdown()
// single server < 3.8 returns Response: 500, Error: 4 - internal error
// single server >= 3.8 returns Response: 501, Error: 9 - shards API is only available in a cluster
if (INTERNAL_ERROR_CODE.equals(e.getErrorNum) || SHARDS_API_UNAVAILABLE_CODE.equals(e.getErrorNum)) {
@@ -205,10 +224,12 @@ object ArangoClient {
}
def acquireHostList(options: ArangoDBConf): Iterable[String] = {
- val client = ArangoClient(options).arangoDB
- val response = client.execute(new Request(ArangoRequestParam.SYSTEM, RequestType.GET, "/_api/cluster/endpoints"))
+ logDebug("acquiring host list")
+ val client = ArangoClient(options)
+ val adb = client.arangoDB
+ val response = adb.execute(new Request(ArangoRequestParam.SYSTEM, RequestType.GET, "/_api/cluster/endpoints"))
val field = response.getBody.get("endpoints")
- val res = client.util(Serializer.CUSTOM)
+ val res = adb.util(Serializer.CUSTOM)
.deserialize[Seq[Map[String, String]]](field, classOf[Seq[Map[String, String]]])
.map(it => it("endpoint").replaceFirst(".*://", ""))
client.shutdown()
diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala
index e5d3f907..640906ba 100644
--- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala
+++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala
@@ -437,6 +437,7 @@ class ArangoDBDriverConf(opts: Map[String, String]) extends ArangoDBConf(opts) {
.useProtocol(arangoProtocol)
.timeout(timeout)
.user(user)
+ .maxConnections(1)
password.foreach(builder.password)
if (sslEnabled) {
diff --git a/arangodb-spark-datasource-2.4/pom.xml b/arangodb-spark-datasource-2.4/pom.xml
index f6175fe5..5c517e19 100644
--- a/arangodb-spark-datasource-2.4/pom.xml
+++ b/arangodb-spark-datasource-2.4/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>arangodb-spark-datasource</artifactId>
         <groupId>com.arangodb</groupId>
-        <version>1.1.1</version>
+        <version>1.2.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala
index 3fddd6c6..6b125bcd 100644
--- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala
+++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala
@@ -45,10 +45,13 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
/**
* Data cleanup will happen in [[ArangoDataSourceWriter.abort()]]
*/
- override def abort(): Unit = if (!canRetry) {
- throw new DataWriteAbortException(
- "Task cannot be retried. To make batch writes idempotent, so that they can be retried, consider using " +
- "'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.")
+ override def abort(): Unit = {
+ client.shutdown()
+ if (!canRetry) {
+ throw new DataWriteAbortException(
+ "Task cannot be retried. To make batch writes idempotent, so that they can be retried, consider using " +
+ "'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.")
+ }
}
private def createClient() = ArangoClient(options.updated(ArangoDBConf.ENDPOINTS, endpoints(endpointIdx)))
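
For reference, a minimal sketch (not part of this patch) of the idempotent write configuration hinted at in the abort message above, so that failed write tasks can be retried. It reuses the option constants and `OverwriteMode` value shown in `WriteResiliencyTest` below; the datasource name, endpoint, and collection are assumed placeholder values, and `keep.null` is passed as a literal key exactly as the message spells it.

```scala
import com.arangodb.model.OverwriteMode
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.arangodb.commons.ArangoDBConf

// Sketch only: assumes an existing DataFrame `df` and a reachable ArangoDB deployment.
def idempotentWrite(df: DataFrame): Unit =
  df.write
    .format("com.arangodb.spark")                                    // assumed datasource provider name
    .mode(SaveMode.Append)
    .options(Map(
      ArangoDBConf.ENDPOINTS      -> "localhost:8529",               // placeholder endpoint
      ArangoDBConf.COLLECTION     -> "chessPlayers",                 // placeholder collection
      ArangoDBConf.OVERWRITE_MODE -> OverwriteMode.replace.getValue, // ignore | replace | update allow retries
      "keep.null"                 -> "true"                          // default; named literally in the abort message
    ))
    .save()
```
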
diff --git a/arangodb-spark-datasource-3.1/pom.xml b/arangodb-spark-datasource-3.1/pom.xml
index 7d4dda3d..bf6badc3 100644
--- a/arangodb-spark-datasource-3.1/pom.xml
+++ b/arangodb-spark-datasource-3.1/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>arangodb-spark-datasource</artifactId>
         <groupId>com.arangodb</groupId>
-        <version>1.1.1</version>
+        <version>1.2.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala
index 4ddea119..05aebf18 100644
--- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala
+++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala
@@ -45,6 +45,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
* Data cleanup will happen in [[ArangoBatchWriter.abort()]]
*/
override def abort(): Unit = if (!canRetry) {
+ client.shutdown()
throw new DataWriteAbortException(
"Task cannot be retried. To make batch writes idempotent, so that they can be retried, consider using " +
"'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.")
diff --git a/demo/README.md b/demo/README.md
index 0c45acc7..395d3834 100644
--- a/demo/README.md
+++ b/demo/README.md
@@ -23,7 +23,7 @@ This demo requires:
Set environment variables:
```shell
-export ARANGO_SPARK_VERSION=1.1.1
+export ARANGO_SPARK_VERSION=1.2.0-SNAPSHOT
```
Start ArangoDB cluster with docker:
diff --git a/demo/pom.xml b/demo/pom.xml
index 45c1ac62..a18de077 100644
--- a/demo/pom.xml
+++ b/demo/pom.xml
@@ -6,7 +6,7 @@
     <groupId>com.arangodb</groupId>
     <artifactId>demo</artifactId>
-    <version>1.1.1</version>
+    <version>1.2.0-SNAPSHOT</version>
1.8
@@ -55,7 +55,7 @@
         <dependency>
             <groupId>com.arangodb</groupId>
             <artifactId>arangodb-spark-datasource-${spark.compat.version}_${scala.compat.version}</artifactId>
-            <version>1.1.1</version>
+            <version>1.2.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index be715dc8..208fbf9f 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -5,7 +5,7 @@
     <parent>
         <artifactId>arangodb-spark-datasource</artifactId>
         <groupId>com.arangodb</groupId>
-        <version>1.1.1</version>
+        <version>1.2.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala
index ca7243bc..df96e0b9 100644
--- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala
+++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala
@@ -6,7 +6,7 @@ import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.arangodb.commons.ArangoDBConf
import org.apache.spark.sql.arangodb.datasource.BaseSparkTest
import org.assertj.core.api.Assertions.assertThat
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, Disabled}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
@@ -29,22 +29,45 @@ class WriteResiliencyTest extends BaseSparkTest {
("Mamedyarov", "Shakhriyar"),
("So", "Wesley"),
("Radjabov", "Teimour")
- ).toDF("surname", "name")
+ ).toDF("_key", "name")
.repartition(6)
@BeforeEach
def beforeEach(): Unit = {
if (collection.exists()) {
- collection.drop()
+ collection.truncate()
+ } else {
+ collection.create()
}
}
+ @Disabled("manual test only")
@ParameterizedTest
@MethodSource(Array("provideProtocolAndContentType"))
- def save(protocol: String, contentType: String): Unit = {
+ def retryOnTimeout(protocol: String, contentType: String): Unit = {
df.write
.format(BaseSparkTest.arangoDatasource)
- .mode(SaveMode.Overwrite)
+ .mode(SaveMode.Append)
+ .options(options + (
+ ArangoDBConf.TIMEOUT -> "1",
+ ArangoDBConf.ENDPOINTS -> BaseSparkTest.endpoints,
+ ArangoDBConf.COLLECTION -> collectionName,
+ ArangoDBConf.PROTOCOL -> protocol,
+ ArangoDBConf.CONTENT_TYPE -> contentType,
+ ArangoDBConf.CONFIRM_TRUNCATE -> "true",
+ ArangoDBConf.OVERWRITE_MODE -> OverwriteMode.replace.getValue
+ ))
+ .save()
+
+ assertThat(collection.count().getCount).isEqualTo(10L)
+ }
+
+ @ParameterizedTest
+ @MethodSource(Array("provideProtocolAndContentType"))
+ def retryOnWrongHost(protocol: String, contentType: String): Unit = {
+ df.write
+ .format(BaseSparkTest.arangoDatasource)
+ .mode(SaveMode.Append)
.options(options + (
ArangoDBConf.ENDPOINTS -> (BaseSparkTest.endpoints + ",wrongHost:8529"),
ArangoDBConf.COLLECTION -> collectionName,
diff --git a/pom.xml b/pom.xml
index 501939e7..e55976c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
     <groupId>com.arangodb</groupId>
     <artifactId>arangodb-spark-datasource</artifactId>
     <packaging>pom</packaging>
-    <version>1.1.1</version>
+    <version>1.2.0-SNAPSHOT</version>
     <name>${project.artifactId}</name>
     <description>ArangoDB Datasource for Apache Spark</description>