From 4400794776a747ba789189165a2811a3e326870e Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Tue, 19 Mar 2024 14:38:56 +0100 Subject: [PATCH 1/3] query ttl --- .../sql/arangodb/commons/ArangoClient.scala | 1 + .../sql/arangodb/commons/ArangoDBConf.scala | 10 ++++ .../sql/arangodb/datasource/ReadTest.scala | 54 ++++++++++++++++--- 3 files changed, 57 insertions(+), 8 deletions(-) diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala index c0908551..3e1a307f 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala @@ -21,6 +21,7 @@ class ArangoClient(options: ArangoDBConf) extends Logging { private def aqlOptions(): AqlQueryOptions = { val opt = new AqlQueryOptions() .stream(options.readOptions.stream) + .ttl(options.readOptions.ttl) .fillBlockCache(options.readOptions.fillBlockCache) .batchSize(options.readOptions.batchSize) opt diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala index 01c5d488..c92aaa4a 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala @@ -155,6 +155,13 @@ object ArangoDBConf { .booleanConf .createWithDefault(true) + val TTL = "ttl" + val DEFAULT_TTL = 30 + val ttlConf: ConfigEntry[Int] = ConfigBuilder(TTL) + .doc("cursor ttl in seconds") + .intConf + .createWithDefault(DEFAULT_TTL) + val PARSE_MODE = "mode" val parseModeConf: ConfigEntry[String] = ConfigBuilder(PARSE_MODE) .doc("allows a mode for dealing with corrupt records during parsing") @@ -273,6 +280,7 @@ object ArangoDBConf { SAMPLE_SIZE -> sampleSizeConf, FILL_BLOCK_CACHE -> fillBlockCacheConf, STREAM -> streamConf, + TTL -> ttlConf, PARSE_MODE -> parseModeConf, COLUMN_NAME_OF_CORRUPT_RECORD -> columnNameOfCorruptRecordConf, @@ -531,6 +539,8 @@ class ArangoDBReadConf(opts: Map[String, String]) extends ArangoDBConf(opts) { val stream: Boolean = getConf(streamConf) + val ttl: Int = getConf(ttlConf) + val parseMode: ParseMode = ParseMode.fromString(getConf(parseModeConf)) val columnNameOfCorruptRecord: String = getConf(columnNameOfCorruptRecordConf).getOrElse("") diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala index 24f9c481..70780245 100644 --- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala +++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala @@ -49,12 +49,12 @@ class ReadTest extends BaseSparkTest { @Test def readCollectionSql(): Unit = { val litalien = spark.sql( - """ - |SELECT likes - |FROM users - |WHERE name == ("Prudence" AS first, "Litalien" AS last) - | AND likes == ARRAY("swimming", "chess") - |""".stripMargin) + """ + |SELECT likes + |FROM users + |WHERE name == ("Prudence" AS first, "Litalien" AS last) + | AND likes == ARRAY("swimming", "chess") + |""".stripMargin) .first() assertThat(litalien.get(0)).isEqualTo(Seq("swimming", "chess")) } @@ -139,7 +139,7 @@ class 
ReadTest extends BaseSparkTest { val badRecords = df.filter("badRecord IS NOT NULL").persist() .select("badRecord") .collect() - .map(_ (0).asInstanceOf[String]) + .map(_(0).asInstanceOf[String]) assertThat(badRecords).hasSize(1) assertThat(badRecords.head).contains(""""v":"3""") @@ -191,7 +191,7 @@ class ReadTest extends BaseSparkTest { @ParameterizedTest @MethodSource(Array("provideProtocolAndContentType")) - def reatTimeout(protocol: String, contentType: String): Unit = { + def readTimeout(protocol: String, contentType: String): Unit = { val query = """ |RETURN { value: SLEEP(5) } @@ -224,4 +224,42 @@ class ReadTest extends BaseSparkTest { .isInstanceOf(classOf[TimeoutException]) } + @ParameterizedTest + @MethodSource(Array("provideProtocolAndContentType")) + def readTtl(protocol: String, contentType: String): Unit = { + val query = + """ + |FOR i IN 1..3 + |RETURN { value: SLEEP(2) } + |""".stripMargin.replaceAll("\n", "") + + val df = spark.read + .format(BaseSparkTest.arangoDatasource) + .schema(StructType(Array( + StructField("value", NullType) + ))) + .options(options + ( + ArangoDBConf.QUERY -> query, + ArangoDBConf.PROTOCOL -> protocol, + ArangoDBConf.CONTENT_TYPE -> contentType, + ArangoDBConf.BATCH_SIZE -> "1", + ArangoDBConf.TTL -> "1" + )) + .load() + + val thrown: Throwable = catchThrowable(new ThrowingCallable() { + override def call(): Unit = df.show() + }) + + assertThat(thrown) + .isInstanceOf(classOf[SparkException]) + + assertThat(thrown.getCause.getSuppressed).isNotEmpty + assertThat(thrown.getCause.getSuppressed.head).isInstanceOf(classOf[ArangoDBException]) + + val aEx = thrown.getCause.getSuppressed.head.asInstanceOf[ArangoDBException] + assertThat(aEx.getResponseCode).isEqualTo(404) + assertThat(aEx.getErrorNum).isEqualTo(1600) + assertThat(aEx.getMessage).contains("cursor not found") + } } From 15ba40ac73ad432c87974512d599d5ba32bdf7c4 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Wed, 20 Mar 2024 10:12:44 +0100 Subject: [PATCH 2/3] fix ttl test --- .../sql/arangodb/datasource/ReadTest.scala | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala index 70780245..823cb64e 100644 --- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala +++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala @@ -229,9 +229,9 @@ class ReadTest extends BaseSparkTest { def readTtl(protocol: String, contentType: String): Unit = { val query = """ - |FOR i IN 1..3 - |RETURN { value: SLEEP(2) } - |""".stripMargin.replaceAll("\n", "") + |FOR i IN 1..10 + |RETURN { value: null } + |""".stripMargin.replaceAll("\n", " ") val df = spark.read .format(BaseSparkTest.arangoDatasource) @@ -248,18 +248,11 @@ class ReadTest extends BaseSparkTest { .load() val thrown: Throwable = catchThrowable(new ThrowingCallable() { - override def call(): Unit = df.show() + override def call(): Unit = df.foreach(_ => Thread.sleep(2000L)) }) - assertThat(thrown) - .isInstanceOf(classOf[SparkException]) - - assertThat(thrown.getCause.getSuppressed).isNotEmpty - assertThat(thrown.getCause.getSuppressed.head).isInstanceOf(classOf[ArangoDBException]) - - val aEx = thrown.getCause.getSuppressed.head.asInstanceOf[ArangoDBException] - assertThat(aEx.getResponseCode).isEqualTo(404) - assertThat(aEx.getErrorNum).isEqualTo(1600) - 
assertThat(aEx.getMessage).contains("cursor not found") + assertThat(thrown).isInstanceOf(classOf[SparkException]) + assertThat(thrown.getMessage).contains("Error: 1600") + assertThat(thrown.getMessage).contains("cursor not found") } } From 0db7779adafd767136266d9e888df9bfba1cfb05 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Wed, 20 Mar 2024 10:40:45 +0100 Subject: [PATCH 3/3] CI: updated test docker images --- .github/workflows/test.yml | 119 +++++++++++++++++++------------------ 1 file changed, 60 insertions(+), 59 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 62980009..6857f40e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -45,24 +45,24 @@ jobs: - 8 - 11 docker-img: - - docker.io/arangodb/arangodb:3.10.10 - - docker.io/arangodb/enterprise:3.10.10 - - docker.io/arangodb/arangodb:3.11.3 - - docker.io/arangodb/enterprise:3.11.3 + - docker.io/arangodb/arangodb:3.10.13 + - docker.io/arangodb/enterprise:3.10.13 + - docker.io/arangodb/arangodb:3.11.8 + - docker.io/arangodb/enterprise:3.11.8 exclude: - scala-version: 2.13 spark-version: 3.1 - - docker-img: docker.io/arangodb/arangodb:3.10.10 + - docker-img: docker.io/arangodb/arangodb:3.10.13 java-version: 8 - - docker-img: docker.io/arangodb/enterprise:3.10.10 + - docker-img: docker.io/arangodb/enterprise:3.10.13 java-version: 8 - - docker-img: docker.io/arangodb/enterprise:3.10.10 + - docker-img: docker.io/arangodb/enterprise:3.10.13 topology: single - - docker-img: docker.io/arangodb/arangodb:3.11.3 + - docker-img: docker.io/arangodb/arangodb:3.11.8 java-version: 11 - - docker-img: docker.io/arangodb/enterprise:3.11.3 + - docker-img: docker.io/arangodb/enterprise:3.11.8 java-version: 8 - - docker-img: docker.io/arangodb/enterprise:3.11.3 + - docker-img: docker.io/arangodb/enterprise:3.11.8 topology: single steps: @@ -102,7 +102,7 @@ jobs: java-version: - 8 docker-img: - - docker.io/arangodb/arangodb:3.11.3 + - docker.io/arangodb/arangodb:3.11.8 exclude: - scala-version: 2.13 spark-version: 3.1 @@ -138,7 +138,7 @@ jobs: spark-version: [3.1, 3.2, 3.3, 3.4] topology: [single, cluster] java-version: [8, 11] - docker-img: ["docker.io/arangodb/arangodb:3.11.3"] + docker-img: ["docker.io/arangodb/arangodb:3.11.8"] exclude: - topology: cluster java-version: 8 @@ -197,7 +197,7 @@ jobs: - 8 - 11 docker-img: - - docker.io/arangodb/arangodb:3.11.3 + - docker.io/arangodb/arangodb:3.11.8 exclude: - scala-version: 2.13 spark-version: 3.1 @@ -305,7 +305,7 @@ jobs: run: ./docker/start_db.sh env: STARTER_MODE: cluster - DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.3 + DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.8 - name: Info run: mvn -version - name: Install @@ -313,48 +313,49 @@ jobs: - name: Deployment Test run: mvn -f ./demo/pom.xml -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -DimportPath=docker/import test - sonar: - timeout-minutes: 10 - runs-on: ubuntu-latest - - strategy: - fail-fast: false - matrix: - scala-version: - - 2.12 - spark-version: - - 3.4 - topology: - - single - java-version: - - 11 - docker-img: - - docker.io/arangodb/arangodb:3.11.3 - - steps: - - uses: actions/checkout@v2 - with: - fetch-depth: 0 - - uses: actions/setup-java@v2 - with: - java-version: ${{matrix.java-version}} - distribution: 'adopt' - cache: maven - - name: Start Database - run: ./docker/start_db.sh - env: - STARTER_MODE: ${{matrix.topology}} - DOCKER_IMAGE: ${{matrix.docker-img}} - - name: Cache SonarCloud 
packages - uses: actions/cache@v2 - with: - path: ~/.sonar/cache - key: ${{ runner.os }}-sonar - restore-keys: ${{ runner.os }}-sonar - - name: Info - run: mvn -version - - name: Build and analyze - env: - GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any - SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} - run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dgpg.skip=true -B verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Dsonar.projectKey=arangodb_arangodb-spark-datasource +## FIXME: sonar plugin requires Java 17 +# sonar: +# timeout-minutes: 10 +# runs-on: ubuntu-latest +# +# strategy: +# fail-fast: false +# matrix: +# scala-version: +# - 2.12 +# spark-version: +# - 3.4 +# topology: +# - single +# java-version: +# - 11 +# docker-img: +# - docker.io/arangodb/arangodb:3.11.8 +# +# steps: +# - uses: actions/checkout@v2 +# with: +# fetch-depth: 0 +# - uses: actions/setup-java@v2 +# with: +# java-version: ${{matrix.java-version}} +# distribution: 'adopt' +# cache: maven +# - name: Start Database +# run: ./docker/start_db.sh +# env: +# STARTER_MODE: ${{matrix.topology}} +# DOCKER_IMAGE: ${{matrix.docker-img}} +# - name: Cache SonarCloud packages +# uses: actions/cache@v2 +# with: +# path: ~/.sonar/cache +# key: ${{ runner.os }}-sonar +# restore-keys: ${{ runner.os }}-sonar +# - name: Info +# run: mvn -version +# - name: Build and analyze +# env: +# GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any +# SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} +# run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dgpg.skip=true -B verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Dsonar.projectKey=arangodb_arangodb-spark-datasource
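
---

Usage note (not part of the patches above): once patches 1 and 2 are applied, the new `ttl` read option is set like any other datasource option. Below is a minimal sketch; only the `ttl` key, its integer-seconds type, and its default of 30 are taken from the patches themselves, while the `com.arangodb.spark` format name, the `endpoints`/`password`/`table`/`batchSize` option keys, and the connection values ("localhost:8529", password "test", collection "users") are assumptions for illustration:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("ttl-example")
      .master("local[*]")
      .getOrCreate()

    val usersDF = spark.read
      .format("com.arangodb.spark")            // assumed datasource short name
      .option("endpoints", "localhost:8529")   // placeholder deployment values
      .option("password", "test")
      .option("table", "users")                // assumed existing collection
      .option("batchSize", "1000")
      .option("ttl", "120")                    // keep the server-side cursor alive for
                                               // 120 s between batch fetches (default: 30)
      .load()

    usersDF.show()

Raising `ttl` above the default helps when per-partition processing is slow relative to batch fetches: as the new `readTtl` test demonstrates, a cursor whose TTL elapses between fetches is garbage-collected server-side, and the next fetch fails with error 1600 ("cursor not found").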