119 changes: 60 additions & 59 deletions .github/workflows/test.yml
@@ -45,24 +45,24 @@ jobs:
           - 8
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.10.10
-          - docker.io/arangodb/enterprise:3.10.10
-          - docker.io/arangodb/arangodb:3.11.3
-          - docker.io/arangodb/enterprise:3.11.3
+          - docker.io/arangodb/arangodb:3.10.13
+          - docker.io/arangodb/enterprise:3.10.13
+          - docker.io/arangodb/arangodb:3.11.8
+          - docker.io/arangodb/enterprise:3.11.8
         exclude:
           - scala-version: 2.13
             spark-version: 3.1
-          - docker-img: docker.io/arangodb/arangodb:3.10.10
+          - docker-img: docker.io/arangodb/arangodb:3.10.13
             java-version: 8
-          - docker-img: docker.io/arangodb/enterprise:3.10.10
+          - docker-img: docker.io/arangodb/enterprise:3.10.13
             java-version: 8
-          - docker-img: docker.io/arangodb/enterprise:3.10.10
+          - docker-img: docker.io/arangodb/enterprise:3.10.13
             topology: single
-          - docker-img: docker.io/arangodb/arangodb:3.11.3
+          - docker-img: docker.io/arangodb/arangodb:3.11.8
             java-version: 11
-          - docker-img: docker.io/arangodb/enterprise:3.11.3
+          - docker-img: docker.io/arangodb/enterprise:3.11.8
             java-version: 8
-          - docker-img: docker.io/arangodb/enterprise:3.11.3
+          - docker-img: docker.io/arangodb/enterprise:3.11.8
             topology: single

     steps:
@@ -102,7 +102,7 @@ jobs:
         java-version:
           - 8
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.3
+          - docker.io/arangodb/arangodb:3.11.8
         exclude:
           - scala-version: 2.13
             spark-version: 3.1
@@ -138,7 +138,7 @@ jobs:
         spark-version: [3.1, 3.2, 3.3, 3.4]
         topology: [single, cluster]
         java-version: [8, 11]
-        docker-img: ["docker.io/arangodb/arangodb:3.11.3"]
+        docker-img: ["docker.io/arangodb/arangodb:3.11.8"]
         exclude:
           - topology: cluster
             java-version: 8
@@ -197,7 +197,7 @@ jobs:
           - 8
           - 11
         docker-img:
-          - docker.io/arangodb/arangodb:3.11.3
+          - docker.io/arangodb/arangodb:3.11.8
         exclude:
           - scala-version: 2.13
             spark-version: 3.1
@@ -305,56 +305,57 @@ jobs:
         run: ./docker/start_db.sh
         env:
           STARTER_MODE: cluster
-          DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.3
+          DOCKER_IMAGE: docker.io/arangodb/arangodb:3.11.8
       - name: Info
         run: mvn -version
       - name: Install
         run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dgpg.skip=true -DskipTests=true install
       - name: Deployment Test
         run: mvn -f ./demo/pom.xml -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -DimportPath=docker/import test

-  sonar:
-    timeout-minutes: 10
-    runs-on: ubuntu-latest
-
-    strategy:
-      fail-fast: false
-      matrix:
-        scala-version:
-          - 2.12
-        spark-version:
-          - 3.4
-        topology:
-          - single
-        java-version:
-          - 11
-        docker-img:
-          - docker.io/arangodb/arangodb:3.11.3
-
-    steps:
-      - uses: actions/checkout@v2
-        with:
-          fetch-depth: 0
-      - uses: actions/setup-java@v2
-        with:
-          java-version: ${{matrix.java-version}}
-          distribution: 'adopt'
-          cache: maven
-      - name: Start Database
-        run: ./docker/start_db.sh
-        env:
-          STARTER_MODE: ${{matrix.topology}}
-          DOCKER_IMAGE: ${{matrix.docker-img}}
-      - name: Cache SonarCloud packages
-        uses: actions/cache@v2
-        with:
-          path: ~/.sonar/cache
-          key: ${{ runner.os }}-sonar
-          restore-keys: ${{ runner.os }}-sonar
-      - name: Info
-        run: mvn -version
-      - name: Build and analyze
-        env:
-          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any
-          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
-        run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dgpg.skip=true -B verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Dsonar.projectKey=arangodb_arangodb-spark-datasource
+## FIXME: sonar plugin requires Java 17
+#  sonar:
+#    timeout-minutes: 10
+#    runs-on: ubuntu-latest
+#
+#    strategy:
+#      fail-fast: false
+#      matrix:
+#        scala-version:
+#          - 2.12
+#        spark-version:
+#          - 3.4
+#        topology:
+#          - single
+#        java-version:
+#          - 11
+#        docker-img:
+#          - docker.io/arangodb/arangodb:3.11.8
+#
+#    steps:
+#      - uses: actions/checkout@v2
+#        with:
+#          fetch-depth: 0
+#      - uses: actions/setup-java@v2
+#        with:
+#          java-version: ${{matrix.java-version}}
+#          distribution: 'adopt'
+#          cache: maven
+#      - name: Start Database
+#        run: ./docker/start_db.sh
+#        env:
+#          STARTER_MODE: ${{matrix.topology}}
+#          DOCKER_IMAGE: ${{matrix.docker-img}}
+#      - name: Cache SonarCloud packages
+#        uses: actions/cache@v2
+#        with:
+#          path: ~/.sonar/cache
+#          key: ${{ runner.os }}-sonar
+#          restore-keys: ${{ runner.os }}-sonar
+#      - name: Info
+#        run: mvn -version
+#      - name: Build and analyze
+#        env:
+#          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} # Needed to get PR information, if any
+#          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
+#        run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dgpg.skip=true -B verify org.sonarsource.scanner.maven:sonar-maven-plugin:sonar -Dsonar.projectKey=arangodb_arangodb-spark-datasource
@@ -21,6 +21,7 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
   private def aqlOptions(): AqlQueryOptions = {
     val opt = new AqlQueryOptions()
       .stream(options.readOptions.stream)
+      .ttl(options.readOptions.ttl)
       .fillBlockCache(options.readOptions.fillBlockCache)
       .batchSize(options.readOptions.batchSize)
     opt
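The new read option is threaded through to the Java driver here: every AQL cursor the connector opens now carries the configured time-to-live. A minimal sketch of the per-query options object being assembled, with hard-coded illustrative values standing in for the real ArangoDBConf.readOptions lookups:

  import com.arangodb.model.AqlQueryOptions

  // Sketch of the options the connector builds for each read query;
  // the real code pulls these values from ArangoDBConf.readOptions
  // instead of using literals.
  val opt: AqlQueryOptions = new AqlQueryOptions()
    .stream(true)          // stream the cursor instead of materializing it
    .ttl(30)               // server discards the cursor after 30 s of inactivity
    .fillBlockCache(false) // avoid filling the RocksDB block cache
    .batchSize(1000)       // rows fetched per round trip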
@@ -155,6 +155,13 @@ object ArangoDBConf {
     .booleanConf
     .createWithDefault(true)

+  val TTL = "ttl"
+  val DEFAULT_TTL = 30
+  val ttlConf: ConfigEntry[Int] = ConfigBuilder(TTL)
+    .doc("cursor ttl in seconds")
+    .intConf
+    .createWithDefault(DEFAULT_TTL)
+
   val PARSE_MODE = "mode"
   val parseModeConf: ConfigEntry[String] = ConfigBuilder(PARSE_MODE)
     .doc("allows a mode for dealing with corrupt records during parsing")
Expand Down Expand Up @@ -273,6 +280,7 @@ object ArangoDBConf {
SAMPLE_SIZE -> sampleSizeConf,
FILL_BLOCK_CACHE -> fillBlockCacheConf,
STREAM -> streamConf,
TTL -> ttlConf,
PARSE_MODE -> parseModeConf,
COLUMN_NAME_OF_CORRUPT_RECORD -> columnNameOfCorruptRecordConf,

@@ -531,6 +539,8 @@ class ArangoDBReadConf(opts: Map[String, String]) extends ArangoDBConf(opts) {

   val stream: Boolean = getConf(streamConf)

+  val ttl: Int = getConf(ttlConf)
+
   val parseMode: ParseMode = ParseMode.fromString(getConf(parseModeConf))

   val columnNameOfCorruptRecord: String = getConf(columnNameOfCorruptRecordConf).getOrElse("")
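Taken together, the three ArangoDBConf hunks register the option, expose it through ArangoDBReadConf, and give it a 30-second default. From the user side it then behaves like any other read option. A hypothetical usage sketch, assuming the datasource is registered as "com.arangodb.spark"; the endpoint and collection values are placeholders:

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().master("local[*]").getOrCreate()

  // Keep each server-side cursor alive for 60 s between batch fetches
  // instead of the 30 s default (DEFAULT_TTL above).
  val df = spark.read
    .format("com.arangodb.spark")          // assumed datasource name
    .option("endpoints", "localhost:8529") // placeholder deployment
    .option("table", "users")              // placeholder collection
    .option("ttl", "60")                   // the option added in this PR
    .load()

Slow consumers (long per-partition processing, as exercised by the test below) are exactly the case where raising the TTL avoids spurious "cursor not found" failures.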
@@ -49,12 +49,12 @@ class ReadTest extends BaseSparkTest {
   @Test
   def readCollectionSql(): Unit = {
     val litalien = spark.sql(
-      """
-        |SELECT likes
-        |FROM users
-        |WHERE name == ("Prudence" AS first, "Litalien" AS last)
-        |  AND likes == ARRAY("swimming", "chess")
-        |""".stripMargin)
+        """
+          |SELECT likes
+          |FROM users
+          |WHERE name == ("Prudence" AS first, "Litalien" AS last)
+          |  AND likes == ARRAY("swimming", "chess")
+          |""".stripMargin)
       .first()
     assertThat(litalien.get(0)).isEqualTo(Seq("swimming", "chess"))
   }
@@ -139,7 +139,7 @@ class ReadTest extends BaseSparkTest {
     val badRecords = df.filter("badRecord IS NOT NULL").persist()
       .select("badRecord")
      .collect()
-      .map(_ (0).asInstanceOf[String])
+      .map(_(0).asInstanceOf[String])

     assertThat(badRecords).hasSize(1)
     assertThat(badRecords.head).contains(""""v":"3""")
@@ -191,7 +191,7 @@ class ReadTest extends BaseSparkTest {

   @ParameterizedTest
   @MethodSource(Array("provideProtocolAndContentType"))
-  def reatTimeout(protocol: String, contentType: String): Unit = {
+  def readTimeout(protocol: String, contentType: String): Unit = {
     val query =
       """
         |RETURN { value: SLEEP(5) }
@@ -224,4 +224,35 @@ class ReadTest extends BaseSparkTest {
       .isInstanceOf(classOf[TimeoutException])
   }

+  @ParameterizedTest
+  @MethodSource(Array("provideProtocolAndContentType"))
+  def readTtl(protocol: String, contentType: String): Unit = {
+    val query =
+      """
+        |FOR i IN 1..10
+        |RETURN { value: null }
+        |""".stripMargin.replaceAll("\n", " ")
+
+    val df = spark.read
+      .format(BaseSparkTest.arangoDatasource)
+      .schema(StructType(Array(
+        StructField("value", NullType)
+      )))
+      .options(options + (
+        ArangoDBConf.QUERY -> query,
+        ArangoDBConf.PROTOCOL -> protocol,
+        ArangoDBConf.CONTENT_TYPE -> contentType,
+        ArangoDBConf.BATCH_SIZE -> "1",
+        ArangoDBConf.TTL -> "1"
+      ))
+      .load()
+
+    val thrown: Throwable = catchThrowable(new ThrowingCallable() {
+      override def call(): Unit = df.foreach(_ => Thread.sleep(2000L))
+    })
+
+    assertThat(thrown).isInstanceOf(classOf[SparkException])
+    assertThat(thrown.getMessage).contains("Error: 1600")
+    assertThat(thrown.getMessage).contains("cursor not found")
+  }
 }
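A note on how the new test provokes the failure: BATCH_SIZE of "1" forces one server round trip per row, and the Thread.sleep(2000L) inside foreach lets the one-second cursor TTL expire between fetches, so the next batch request fails server-side with ArangoDB error 1600 ("cursor not found"), which Spark surfaces wrapped in a SparkException.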