From 731a70f539efada4d899ffbea9a931b19e9f46b4 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Wed, 9 Mar 2022 09:57:59 +0100 Subject: [PATCH 1/5] test retry on timeout --- .../write/WriteResiliencyTest.scala | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala index ca7243bc..dd645fcd 100644 --- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala +++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala @@ -29,22 +29,44 @@ class WriteResiliencyTest extends BaseSparkTest { ("Mamedyarov", "Shakhriyar"), ("So", "Wesley"), ("Radjabov", "Teimour") - ).toDF("surname", "name") + ).toDF("_key", "name") .repartition(6) @BeforeEach def beforeEach(): Unit = { if (collection.exists()) { - collection.drop() + collection.truncate() + } else { + collection.create() } } @ParameterizedTest @MethodSource(Array("provideProtocolAndContentType")) - def save(protocol: String, contentType: String): Unit = { + def retryOnTimeout(protocol: String, contentType: String): Unit = { df.write .format(BaseSparkTest.arangoDatasource) - .mode(SaveMode.Overwrite) + .mode(SaveMode.Append) + .options(options + ( + ArangoDBConf.TIMEOUT -> "1", + ArangoDBConf.ENDPOINTS -> BaseSparkTest.endpoints, + ArangoDBConf.COLLECTION -> collectionName, + ArangoDBConf.PROTOCOL -> protocol, + ArangoDBConf.CONTENT_TYPE -> contentType, + ArangoDBConf.CONFIRM_TRUNCATE -> "true", + ArangoDBConf.OVERWRITE_MODE -> OverwriteMode.replace.getValue + )) + .save() + + assertThat(collection.count().getCount).isEqualTo(10L) + } + + @ParameterizedTest + @MethodSource(Array("provideProtocolAndContentType")) + def retryOnWrongHost(protocol: String, contentType: String): Unit = { + df.write + .format(BaseSparkTest.arangoDatasource) + .mode(SaveMode.Append) .options(options + ( ArangoDBConf.ENDPOINTS -> (BaseSparkTest.endpoints + ",wrongHost:8529"), ArangoDBConf.COLLECTION -> collectionName, From 9f6ab76c27ed894aad9d7f7e517602a5a5e583b1 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 14 Mar 2022 13:40:24 +0100 Subject: [PATCH 2/5] shutdown connection when aborting write task --- .../arangodb/datasource/writer/ArangoDataWriter.scala | 11 +++++++---- .../arangodb/datasource/writer/ArangoDataWriter.scala | 1 + 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala index 3fddd6c6..6b125bcd 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala @@ -45,10 +45,13 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I /** * Data cleanup will happen in [[ArangoDataSourceWriter.abort()]] */ - override def abort(): Unit = if (!canRetry) { - throw new DataWriteAbortException( - "Task cannot be retried. 
To make batch writes idempotent, so that they can be retried, consider using " + - "'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.") + override def abort(): Unit = { + client.shutdown() + if (!canRetry) { + throw new DataWriteAbortException( + "Task cannot be retried. To make batch writes idempotent, so that they can be retried, consider using " + + "'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.") + } } private def createClient() = ArangoClient(options.updated(ArangoDBConf.ENDPOINTS, endpoints(endpointIdx))) diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala index 4ddea119..05aebf18 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala @@ -45,6 +45,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I * Data cleanup will happen in [[ArangoBatchWriter.abort()]] */ override def abort(): Unit = if (!canRetry) { + client.shutdown() throw new DataWriteAbortException( "Task cannot be retried. To make batch writes idempotent, so that they can be retried, consider using " + "'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.") From 34bdb08d464e0275dcbdcaf0a06e9356d194130c Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 14 Mar 2022 13:43:15 +0100 Subject: [PATCH 3/5] set max 1 db connection per client instance --- .../org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala index e5d3f907..640906ba 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala @@ -437,6 +437,7 @@ class ArangoDBDriverConf(opts: Map[String, String]) extends ArangoDBConf(opts) { .useProtocol(arangoProtocol) .timeout(timeout) .user(user) + .maxConnections(1) password.foreach(builder.password) if (sslEnabled) { From 25e8ad01bc89ef666c39b3ef1c02904355bdf1ab Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 14 Mar 2022 13:56:22 +0100 Subject: [PATCH 4/5] prepare next release --- .github/workflows/test.yml | 2 +- arangodb-spark-commons/pom.xml | 2 +- arangodb-spark-datasource-2.4/pom.xml | 2 +- arangodb-spark-datasource-3.1/pom.xml | 2 +- demo/README.md | 2 +- demo/pom.xml | 4 ++-- integration-tests/pom.xml | 2 +- .../sql/arangodb/datasource/write/WriteResiliencyTest.scala | 3 ++- pom.xml | 2 +- 9 files changed, 11 insertions(+), 10 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c463fbd3..36ff9cb4 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -198,7 +198,7 @@ jobs: - name: Info run: mvn -version - name: Install - run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -Dgpg.skip=true install + run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} 
-Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -Dgpg.skip=true -DskipTests=true install - name: Deployment Test run: mvn -f ./demo/pom.xml -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -DimportPath=docker/import test - name: Collect docker logs on failure diff --git a/arangodb-spark-commons/pom.xml b/arangodb-spark-commons/pom.xml index a3d056f6..e3489742 100644 --- a/arangodb-spark-commons/pom.xml +++ b/arangodb-spark-commons/pom.xml @@ -5,7 +5,7 @@ arangodb-spark-datasource com.arangodb - 1.1.1 + 1.2.0-SNAPSHOT 4.0.0 diff --git a/arangodb-spark-datasource-2.4/pom.xml b/arangodb-spark-datasource-2.4/pom.xml index f6175fe5..5c517e19 100644 --- a/arangodb-spark-datasource-2.4/pom.xml +++ b/arangodb-spark-datasource-2.4/pom.xml @@ -5,7 +5,7 @@ arangodb-spark-datasource com.arangodb - 1.1.1 + 1.2.0-SNAPSHOT 4.0.0 diff --git a/arangodb-spark-datasource-3.1/pom.xml b/arangodb-spark-datasource-3.1/pom.xml index 7d4dda3d..bf6badc3 100644 --- a/arangodb-spark-datasource-3.1/pom.xml +++ b/arangodb-spark-datasource-3.1/pom.xml @@ -5,7 +5,7 @@ arangodb-spark-datasource com.arangodb - 1.1.1 + 1.2.0-SNAPSHOT 4.0.0 diff --git a/demo/README.md b/demo/README.md index 0c45acc7..395d3834 100644 --- a/demo/README.md +++ b/demo/README.md @@ -23,7 +23,7 @@ This demo requires: Set environment variables: ```shell -export ARANGO_SPARK_VERSION=1.1.1 +export ARANGO_SPARK_VERSION=1.2.0-SNAPSHOT ``` Start ArangoDB cluster with docker: diff --git a/demo/pom.xml b/demo/pom.xml index 45c1ac62..a18de077 100644 --- a/demo/pom.xml +++ b/demo/pom.xml @@ -6,7 +6,7 @@ com.arangodb demo - 1.1.1 + 1.2.0-SNAPSHOT 1.8 @@ -55,7 +55,7 @@ com.arangodb arangodb-spark-datasource-${spark.compat.version}_${scala.compat.version} - 1.1.1 + 1.2.0-SNAPSHOT org.apache.spark diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index be715dc8..208fbf9f 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -5,7 +5,7 @@ arangodb-spark-datasource com.arangodb - 1.1.1 + 1.2.0-SNAPSHOT 4.0.0 diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala index dd645fcd..df96e0b9 100644 --- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala +++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/write/WriteResiliencyTest.scala @@ -6,7 +6,7 @@ import org.apache.spark.sql.SaveMode import org.apache.spark.sql.arangodb.commons.ArangoDBConf import org.apache.spark.sql.arangodb.datasource.BaseSparkTest import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.{BeforeEach, Disabled} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -41,6 +41,7 @@ class WriteResiliencyTest extends BaseSparkTest { } } + @Disabled("manual test only") @ParameterizedTest @MethodSource(Array("provideProtocolAndContentType")) def retryOnTimeout(protocol: String, contentType: String): Unit = { diff --git a/pom.xml b/pom.xml index 501939e7..e55976c1 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ com.arangodb arangodb-spark-datasource pom - 1.1.1 + 1.2.0-SNAPSHOT ${project.artifactId} ArangoDB Datasource for Apache Spark From d7db86fcf7f9a14ce770e93fcb223aa918aac905 Mon Sep 17 00:00:00 
2001 From: Michele Rastelli Date: Mon, 14 Mar 2022 14:19:16 +0100 Subject: [PATCH 5/5] added ArangoClient debug logs --- .../sql/arangodb/commons/ArangoClient.scala | 55 +++++++++++++------ 1 file changed, 38 insertions(+), 17 deletions(-) diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala index cf171ee3..ca92f695 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala @@ -40,7 +40,10 @@ class ArangoClient(options: ArangoDBConf) extends Logging { }) .build() - def shutdown(): Unit = arangoDB.shutdown() + def shutdown(): Unit = { + logDebug("closing db client") + arangoDB.shutdown() + } def readCollectionPartition(shardId: String, filters: Array[PushableFilter], schema: StructType): ArangoCursor[VPackSlice] = { val query = @@ -101,12 +104,16 @@ class ArangoClient(options: ArangoDBConf) extends Logging { cursor.asScala.take(options.readOptions.sampleSize).toSeq } - def collectionExists(): Boolean = arangoDB - .db(options.writeOptions.db) - .collection(options.writeOptions.collection) - .exists() + def collectionExists(): Boolean = { + logDebug("checking collection") + arangoDB + .db(options.writeOptions.db) + .collection(options.writeOptions.collection) + .exists() + } def createCollection(): Unit = { + logDebug("creating collection") val opts = new CollectionCreateOptions() .numberOfShards(options.writeOptions.numberOfShards) .`type`(options.writeOptions.collectionType) @@ -119,11 +126,13 @@ class ArangoClient(options: ArangoDBConf) extends Logging { @tailrec final def truncate(): Unit = { + logDebug("truncating collection") try { arangoDB .db(options.writeOptions.db) .collection(options.writeOptions.collection) .truncate() + logDebug("truncated collection") } catch { case e: ArangoDBException => if (e.getCause.isInstanceOf[TimeoutException]) { @@ -136,12 +145,16 @@ class ArangoClient(options: ArangoDBConf) extends Logging { } } - def drop(): Unit = arangoDB - .db(options.writeOptions.db) - .collection(options.writeOptions.collection) - .drop() + def drop(): Unit = { + logDebug("dropping collection") + arangoDB + .db(options.writeOptions.db) + .collection(options.writeOptions.collection) + .drop() + } def saveDocuments(data: VPackSlice): Unit = { + logDebug("saving batch") val request = new Request( options.writeOptions.db, RequestType.POST, @@ -176,24 +189,30 @@ class ArangoClient(options: ArangoDBConf) extends Logging { @SuppressWarnings(Array("OptionGet")) -object ArangoClient { +object ArangoClient extends Logging { private val INTERNAL_ERROR_CODE = 4 private val SHARDS_API_UNAVAILABLE_CODE = 9 - def apply(options: ArangoDBConf): ArangoClient = new ArangoClient(options) + def apply(options: ArangoDBConf): ArangoClient = { + logDebug("creating db client") + new ArangoClient(options) + } def getCollectionShardIds(options: ArangoDBConf): Array[String] = { + logDebug("reading collection shards") + val client = ArangoClient(options) + val adb = client.arangoDB try { - val client = ArangoClient(options).arangoDB - val res = client.execute(new Request( + val res = adb.execute(new Request( options.readOptions.db, RequestType.GET, s"/_api/collection/${options.readOptions.collection.get}/shards")) - val shardIds: Array[String] = client.util().deserialize(res.getBody.get("shards"), 
classOf[Array[String]]) + val shardIds: Array[String] = adb.util().deserialize(res.getBody.get("shards"), classOf[Array[String]]) client.shutdown() shardIds } catch { case e: ArangoDBException => + client.shutdown() // single server < 3.8 returns Response: 500, Error: 4 - internal error // single server >= 3.8 returns Response: 501, Error: 9 - shards API is only available in a cluster if (INTERNAL_ERROR_CODE.equals(e.getErrorNum) || SHARDS_API_UNAVAILABLE_CODE.equals(e.getErrorNum)) { @@ -205,10 +224,12 @@ object ArangoClient { } def acquireHostList(options: ArangoDBConf): Iterable[String] = { - val client = ArangoClient(options).arangoDB - val response = client.execute(new Request(ArangoRequestParam.SYSTEM, RequestType.GET, "/_api/cluster/endpoints")) + logDebug("acquiring host list") + val client = ArangoClient(options) + val adb = client.arangoDB + val response = adb.execute(new Request(ArangoRequestParam.SYSTEM, RequestType.GET, "/_api/cluster/endpoints")) val field = response.getBody.get("endpoints") - val res = client.util(Serializer.CUSTOM) + val res = adb.util(Serializer.CUSTOM) .deserialize[Seq[Map[String, String]]](field, classOf[Seq[Map[String, String]]]) .map(it => it("endpoint").replaceFirst(".*://", "")) client.shutdown()
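
Taken together, patches 2/5, 3/5, and 5/5 move every `ArangoClient` user toward the same pattern: one connection per client instance, shut down on success, on task abort, and on error paths alike. The loan-pattern sketch below restates that discipline; `withClient` is a hypothetical helper written for illustration only and is not part of this series.

```scala
import org.apache.spark.sql.arangodb.commons.{ArangoClient, ArangoDBConf}

// Hypothetical helper (not part of the patches) illustrating the shutdown
// discipline the series converges on: every path that creates an ArangoClient
// releases its connection whether the body succeeds or throws.
object ClientLoan {
  def withClient[T](options: ArangoDBConf)(body: ArangoClient => T): T = {
    val client = ArangoClient(options) // logs "creating db client" at debug level after PATCH 5/5
    try body(client)
    finally client.shutdown()          // logs "closing db client"; frees the single connection configured in PATCH 3/5
  }
}
```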
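
For the write path, the standalone sketch below mirrors the options that the new `retryOnTimeout` test passes through `ArangoDBConf`. The datasource format name, endpoint, collection name, and the omission of credentials are assumptions made for illustration; only the option keys, the `SaveMode.Append` mode, and the `_key`-based idempotency mirror the diff above.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.arangodb.commons.ArangoDBConf

object WriteRetrySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("arangodb-write-retry-sketch")
      .getOrCreate()
    import spark.implicits._

    // Same shape as the test data: writing "_key" plus overwrite.mode=replace
    // makes a retried batch idempotent (cf. the DataWriteAbortException message).
    val df = Seq(("Carlsen", "Magnus"), ("Caruana", "Fabiano")).toDF("_key", "name")

    df.write
      .format("com.arangodb.spark") // assumed short name; the tests reference it via BaseSparkTest.arangoDatasource
      .mode(SaveMode.Append)
      .options(Map(
        ArangoDBConf.ENDPOINTS -> "localhost:8529",   // placeholder endpoint
        ArangoDBConf.COLLECTION -> "chessPlayers",    // placeholder collection name
        ArangoDBConf.TIMEOUT -> "1",                  // deliberately tiny timeout, as in the test, to provoke retries; omit in a real job
        ArangoDBConf.OVERWRITE_MODE -> "replace",     // same value the test supplies via OverwriteMode.replace.getValue
        ArangoDBConf.CONFIRM_TRUNCATE -> "true"
      ))
      .save()

    spark.stop()
  }
}
```

No user, password, or database options are shown here; the tests inherit those from the shared `options` map in `BaseSparkTest`.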