diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 62980009..6857f40e 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -45,24 +45,24 @@ jobs:
           - 8
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.10.10
-          - docker.io/arangodb/enterprise:3.10.10
-          - docker.io/arangodb/arangodb:3.11.3
-          - docker.io/arangodb/enterprise:3.11.3
+          - docker.io/arangodb/arangodb:3.10.13
+          - docker.io/arangodb/enterprise:3.10.13
+          - docker.io/arangodb/arangodb:3.11.8
+          - docker.io/arangodb/enterprise:3.11.8
         exclude:
           - scala-version: 2.13
             spark-version: 3.1
-          - docker-img: docker.io/arangodb/arangodb:3.10.10
+          - docker-img: docker.io/arangodb/arangodb:3.10.13
             java-version: 8
-          - docker-img: docker.io/arangodb/enterprise:3.10.10
+          - docker-img: docker.io/arangodb/enterprise:3.10.13
             java-version: 8
-          - docker-img: docker.io/arangodb/enterprise:3.10.10
+          - docker-img: docker.io/arangodb/enterprise:3.10.13
             topology: single
-          - docker-img: docker.io/arangodb/arangodb:3.11.3
+          - docker-img: docker.io/arangodb/arangodb:3.11.8
             java-version: 11
-          - docker-img: docker.io/arangodb/enterprise:3.11.3
+          - docker-img: docker.io/arangodb/enterprise:3.11.8
             java-version: 8
-          - docker-img: docker.io/arangodb/enterprise:3.11.3
+          - docker-img: docker.io/arangodb/enterprise:3.11.8
             topology: single
 
     steps:
@@ -102,7 +102,7 @@ jobs:
         java-version:
           - 8
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.3
+          - docker.io/arangodb/arangodb:3.11.8
         exclude:
           - scala-version: 2.13
             spark-version: 3.1
@@ -138,7 +138,7 @@ jobs:
         spark-version: [3.1, 3.2, 3.3, 3.4]
         topology: [single, cluster]
         java-version: [8, 11]
-        docker-img: ["docker.io/arangodb/arangodb:3.11.3"]
+        docker-img: ["docker.io/arangodb/arangodb:3.11.8"]
         exclude:
           - topology: cluster
             java-version: 8
@@ -197,7 +197,7 @@ jobs:
           - 8
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.3
+          - docker.io/arangodb/arangodb:3.11.8
         exclude:
           - scala-version: 2.13
             spark-version: 3.1
@@ -305,7 +305,7 @@ jobs:
         run: ./docker/start_db.sh
         env:
           STARTER_MODE: cluster
-          DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.3
+          DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.8
       - name: Info
         run: mvn -version
       - name: Install
@@ -313,48 +313,49 @@
       - name: Deployment Test
         run: mvn -f ./demo/pom.xml -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -DimportPath=docker/import test
 
-  sonar:
-    timeout-minutes: 10
-    runs-on: ubuntu-latest
-
-    strategy:
-      fail-fast: false
-      matrix:
-        scala-version:
-          - 2.12
-        spark-version:
-          - 3.4
-        topology:
-          - single
-        java-version:
-          - 11
-        docker-img:
-          - docker.io/arangodb/arangodb:3.11.3
-
-    steps:
-      - uses: actions/checkout@v2
-        with:
-          fetch-depth: 0
-      - uses: actions/setup-java@v2
-        with:
-          java-version: ${{matrix.java-version}}
-          distribution: 'adopt'
-          cache: maven
-      - name: Start Database
-        run: ./docker/start_db.sh
-        env:
-          STARTER_MODE: ${{matrix.topology}}
-          DOCKER_IMAGE: ${{matrix.docker-img}}
-      - name: Cache SonarCloud packages
-        uses: actions/cache@v2
-        with:
-          path: ~/.sonar/cache
-          key: ${{ runner.os }}-sonar
-          restore-keys: ${{ runner.os }}-sonar
-      - name: Info
-        run: mvn -version
-      - name: Build and analyze
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any
-          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
-        run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dgpg.skip=true -B verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Dsonar.projectKey=arangodb_arangodb-spark-datasource
+## FIXME: sonar plugin requires Java 17
+#  sonar:
+#    timeout-minutes: 10
+#    runs-on: ubuntu-latest
+#
+#    strategy:
+#      fail-fast: false
+#      matrix:
+#        scala-version:
+#          - 2.12
+#        spark-version:
+#          - 3.4
+#        topology:
+#          - single
+#        java-version:
+#          - 11
+#        docker-img:
+#          - docker.io/arangodb/arangodb:3.11.8
+#
+#    steps:
+#      - uses: actions/checkout@v2
+#        with:
+#          fetch-depth: 0
+#      - uses: actions/setup-java@v2
+#        with:
+#          java-version: ${{matrix.java-version}}
+#          distribution: 'adopt'
+#          cache: maven
+#      - name: Start Database
+#        run: ./docker/start_db.sh
+#        env:
+#          STARTER_MODE: ${{matrix.topology}}
+#          DOCKER_IMAGE: ${{matrix.docker-img}}
+#      - name: Cache SonarCloud packages
+#        uses: actions/cache@v2
+#        with:
+#          path: ~/.sonar/cache
+#          key: ${{ runner.os }}-sonar
+#          restore-keys: ${{ runner.os }}-sonar
+#      - name: Info
+#        run: mvn -version
+#      - name: Build and analyze
+#        env:
+#          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any
+#          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
+#        run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dgpg.skip=true -B verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Dsonar.projectKey=arangodb_arangodb-spark-datasource
diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
index c0908551..3e1a307f 100644
--- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
+++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala
@@ -21,6 +21,7 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
   private def aqlOptions(): AqlQueryOptions = {
     val opt = new AqlQueryOptions()
       .stream(options.readOptions.stream)
+      .ttl(options.readOptions.ttl)
       .fillBlockCache(options.readOptions.fillBlockCache)
       .batchSize(options.readOptions.batchSize)
     opt
diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala
index 01c5d488..c92aaa4a 100644
--- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala
+++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala
@@ -155,6 +155,13 @@ object ArangoDBConf {
     .booleanConf
     .createWithDefault(true)
 
+  val TTL = "ttl"
+  val DEFAULT_TTL = 30
+  val ttlConf: ConfigEntry[Int] = ConfigBuilder(TTL)
+    .doc("cursor ttl in seconds")
+    .intConf
+    .createWithDefault(DEFAULT_TTL)
+
   val PARSE_MODE = "mode"
   val parseModeConf: ConfigEntry[String] = ConfigBuilder(PARSE_MODE)
     .doc("allows a mode for dealing with corrupt records during parsing")
@@ -273,6 +280,7 @@ object ArangoDBConf {
     SAMPLE_SIZE -> sampleSizeConf,
     FILL_BLOCK_CACHE -> fillBlockCacheConf,
     STREAM -> streamConf,
+    TTL -> ttlConf,
     PARSE_MODE -> parseModeConf,
     COLUMN_NAME_OF_CORRUPT_RECORD -> columnNameOfCorruptRecordConf,
 
@@ -531,6 +539,8 @@ class ArangoDBReadConf(opts: Map[String, String]) extends ArangoDBConf(opts) {
 
   val stream: Boolean = getConf(streamConf)
 
+  val ttl: Int = getConf(ttlConf)
+
   val parseMode: ParseMode = ParseMode.fromString(getConf(parseModeConf))
 
   val columnNameOfCorruptRecord: String = getConf(columnNameOfCorruptRecordConf).getOrElse("")
diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala
index 24f9c481..823cb64e 100644
--- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala
+++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/ReadTest.scala
@@ -49,12 +49,12 @@ class ReadTest extends BaseSparkTest {
   @Test
   def readCollectionSql(): Unit = {
     val litalien = spark.sql(
-        """
-          |SELECT likes
-          |FROM users
-          |WHERE name == ("Prudence" AS first, "Litalien" AS last)
-          |  AND likes == ARRAY("swimming", "chess")
-          |""".stripMargin)
+      """
+        |SELECT likes
+        |FROM users
+        |WHERE name == ("Prudence" AS first, "Litalien" AS last)
+        |  AND likes == ARRAY("swimming", "chess")
+        |""".stripMargin)
       .first()
     assertThat(litalien.get(0)).isEqualTo(Seq("swimming", "chess"))
   }
@@ -139,7 +139,7 @@ class ReadTest extends BaseSparkTest {
     val badRecords = df.filter("badRecord IS NOT NULL").persist()
       .select("badRecord")
       .collect()
-      .map(_ (0).asInstanceOf[String])
+      .map(_(0).asInstanceOf[String])
 
     assertThat(badRecords).hasSize(1)
     assertThat(badRecords.head).contains(""""v":"3""")
@@ -191,7 +191,7 @@ class ReadTest extends BaseSparkTest {
 
   @ParameterizedTest
   @MethodSource(Array("provideProtocolAndContentType"))
-  def reatTimeout(protocol: String, contentType: String): Unit = {
+  def readTimeout(protocol: String, contentType: String): Unit = {
     val query =
       """
         |RETURN { value: SLEEP(5) }
@@ -224,4 +224,35 @@ class ReadTest extends BaseSparkTest {
       .isInstanceOf(classOf[TimeoutException])
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("provideProtocolAndContentType"))
+  def readTtl(protocol: String, contentType: String): Unit = {
+    val query =
+      """
+        |FOR i IN 1..10
+        |RETURN { value: null }
+        |""".stripMargin.replaceAll("\n", " ")
+
+    val df = spark.read
+      .format(BaseSparkTest.arangoDatasource)
+      .schema(StructType(Array(
+        StructField("value", NullType)
+      )))
+      .options(options + (
+        ArangoDBConf.QUERY -> query,
+        ArangoDBConf.PROTOCOL -> protocol,
+        ArangoDBConf.CONTENT_TYPE -> contentType,
+        ArangoDBConf.BATCH_SIZE -> "1",
+        ArangoDBConf.TTL -> "1"
+      ))
+      .load()
+
+    val thrown: Throwable = catchThrowable(new ThrowingCallable() {
+      override def call(): Unit = df.foreach(_ => Thread.sleep(2000L))
+    })
+
+    assertThat(thrown).isInstanceOf(classOf[SparkException])
+    assertThat(thrown.getMessage).contains("Error: 1600")
+    assertThat(thrown.getMessage).contains("cursor not found")
+  }
 }
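
Usage note (illustrative sketch, not part of the diff above): the changes wire a new cursor `ttl` read option from `ArangoDBConf` into the AQL cursor options. `ArangoDBConf.QUERY`, `ArangoDBConf.BATCH_SIZE`, `ArangoDBConf.TTL`, and the 30-second default all come from this diff; the `com.arangodb.spark` format name and the connection settings below are assumptions made for the example.

```scala
// Sketch: reading via an AQL query with a custom cursor TTL.
// Option constants are from the diff; connection details are assumed.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.arangodb.commons.ArangoDBConf

object ReadWithTtl {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("read-with-ttl")
      .master("local[*]")
      .getOrCreate()

    val df = spark.read
      .format("com.arangodb.spark")          // datasource name (assumed)
      .option("endpoints", "localhost:8529") // connection settings (assumed)
      .option("database", "_system")
      .option(ArangoDBConf.QUERY, "FOR i IN 1..1000 RETURN { value: i }")
      .option(ArangoDBConf.BATCH_SIZE, "100") // rows fetched per cursor round-trip
      .option(ArangoDBConf.TTL, "300")        // keep the server-side cursor alive 300s
      .load()

    df.show()
    spark.stop()
  }
}
```

Since results are paged from a server-side cursor in `batchSize` chunks, a TTL shorter than the time needed to consume one batch lets the cursor expire mid-read. The new `readTtl` test provokes exactly that (batch size 1, TTL 1 s, 2 s of work per row) and asserts that the resulting `SparkException` reports ArangoDB error 1600, "cursor not found".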