2 changes: 1 addition & 1 deletion .github/workflows/test.yml

@@ -198,7 +198,7 @@ jobs:
       - name: Info
         run: mvn -version
       - name: Install
-        run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -Dgpg.skip=true install
+        run: mvn -e --no-transfer-progress -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -Dgpg.skip=true -DskipTests=true install
       - name: Deployment Test
         run: mvn -f ./demo/pom.xml -Pscala-${{matrix.scala-version}} -Pspark-${{matrix.spark-version}} -Dspark.version=${{matrix.spark-full-version}} -DimportPath=docker/import test
       - name: Collect docker logs on failure
2 changes: 1 addition & 1 deletion arangodb-spark-commons/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>arangodb-spark-datasource</artifactId>
         <groupId>com.arangodb</groupId>
-        <version>1.1.1</version>
+        <version>1.2.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
ArangoClient.scala

@@ -40,7 +40,10 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
     })
     .build()
 
-  def shutdown(): Unit = arangoDB.shutdown()
+  def shutdown(): Unit = {
+    logDebug("closing db client")
+    arangoDB.shutdown()
+  }
 
   def readCollectionPartition(shardId: String, filters: Array[PushableFilter], schema: StructType): ArangoCursor[VPackSlice] = {
     val query =
@@ -101,12 +104,16 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
     cursor.asScala.take(options.readOptions.sampleSize).toSeq
   }
 
-  def collectionExists(): Boolean = arangoDB
-    .db(options.writeOptions.db)
-    .collection(options.writeOptions.collection)
-    .exists()
+  def collectionExists(): Boolean = {
+    logDebug("checking collection")
+    arangoDB
+      .db(options.writeOptions.db)
+      .collection(options.writeOptions.collection)
+      .exists()
+  }
 
   def createCollection(): Unit = {
+    logDebug("creating collection")
     val opts = new CollectionCreateOptions()
       .numberOfShards(options.writeOptions.numberOfShards)
       .`type`(options.writeOptions.collectionType)
@@ -119,11 +126,13 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
 
   @tailrec
   final def truncate(): Unit = {
+    logDebug("truncating collection")
     try {
       arangoDB
         .db(options.writeOptions.db)
         .collection(options.writeOptions.collection)
         .truncate()
+      logDebug("truncated collection")
     } catch {
       case e: ArangoDBException =>
         if (e.getCause.isInstanceOf[TimeoutException]) {
@@ -136,12 +145,16 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
     }
   }
 
-  def drop(): Unit = arangoDB
-    .db(options.writeOptions.db)
-    .collection(options.writeOptions.collection)
-    .drop()
+  def drop(): Unit = {
+    logDebug("dropping collection")
+    arangoDB
+      .db(options.writeOptions.db)
+      .collection(options.writeOptions.collection)
+      .drop()
+  }
 
   def saveDocuments(data: VPackSlice): Unit = {
+    logDebug("saving batch")
     val request = new Request(
       options.writeOptions.db,
       RequestType.POST,
@@ -176,24 +189,30 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
 
 
 @SuppressWarnings(Array("OptionGet"))
-object ArangoClient {
+object ArangoClient extends Logging {
   private val INTERNAL_ERROR_CODE = 4
   private val SHARDS_API_UNAVAILABLE_CODE = 9
 
-  def apply(options: ArangoDBConf): ArangoClient = new ArangoClient(options)
+  def apply(options: ArangoDBConf): ArangoClient = {
+    logDebug("creating db client")
+    new ArangoClient(options)
+  }
 
   def getCollectionShardIds(options: ArangoDBConf): Array[String] = {
+    logDebug("reading collection shards")
+    val client = ArangoClient(options)
+    val adb = client.arangoDB
     try {
-      val client = ArangoClient(options).arangoDB
-      val res = client.execute(new Request(
+      val res = adb.execute(new Request(
        options.readOptions.db,
        RequestType.GET,
        s"/_api/collection/${options.readOptions.collection.get}/shards"))
-      val shardIds: Array[String] = client.util().deserialize(res.getBody.get("shards"), classOf[Array[String]])
+      val shardIds: Array[String] = adb.util().deserialize(res.getBody.get("shards"), classOf[Array[String]])
+      client.shutdown()
       shardIds
     } catch {
      case e: ArangoDBException =>
+        client.shutdown()
        // single server < 3.8 returns Response: 500, Error: 4 - internal error
        // single server >= 3.8 returns Response: 501, Error: 9 - shards API is only available in a cluster
        if (INTERNAL_ERROR_CODE.equals(e.getErrorNum) || SHARDS_API_UNAVAILABLE_CODE.equals(e.getErrorNum)) {
@@ -205,10 +224,12 @@ object ArangoClient {
   }
 
   def acquireHostList(options: ArangoDBConf): Iterable[String] = {
-    val client = ArangoClient(options).arangoDB
-    val response = client.execute(new Request(ArangoRequestParam.SYSTEM, RequestType.GET, "/_api/cluster/endpoints"))
+    logDebug("acquiring host list")
+    val client = ArangoClient(options)
+    val adb = client.arangoDB
+    val response = adb.execute(new Request(ArangoRequestParam.SYSTEM, RequestType.GET, "/_api/cluster/endpoints"))
     val field = response.getBody.get("endpoints")
-    val res = client.util(Serializer.CUSTOM)
+    val res = adb.util(Serializer.CUSTOM)
      .deserialize[Seq[Map[String, String]]](field, classOf[Seq[Map[String, String]]])
      .map(it => it("endpoint").replaceFirst(".*://", ""))
     client.shutdown()
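
Both object methods above now shut the client down on the success path and on the error path. As a standalone sketch (not code from this changeset), the same acquire-use-release pattern could be factored into a hypothetical helper; the import paths are assumed from the surrounding module:

```scala
import org.apache.spark.sql.arangodb.commons.{ArangoClient, ArangoDBConf}

// Hypothetical helper, not part of this diff: runs `f` against a fresh client
// and guarantees the underlying driver is shut down afterwards, mirroring the
// explicit client.shutdown() calls added above.
def withClient[A](options: ArangoDBConf)(f: ArangoClient => A): A = {
  val client = ArangoClient(options)
  try f(client)
  finally client.shutdown()
}
```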
ArangoDBConf.scala

@@ -437,6 +437,7 @@ class ArangoDBDriverConf(opts: Map[String, String]) extends ArangoDBConf(opts) {
       .useProtocol(arangoProtocol)
       .timeout(timeout)
       .user(user)
+      .maxConnections(1)
     password.foreach(builder.password)
 
     if (sslEnabled) {
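
This caps each driver instance at a single connection. A minimal sketch of the same setting on the underlying Java driver builder, with placeholder endpoint and credentials:

```scala
import com.arangodb.ArangoDB

// Sketch only: builds a driver capped at one connection, as the change above
// does inside ArangoDBDriverConf; host and user are placeholder values.
val arango: ArangoDB = new ArangoDB.Builder()
  .host("127.0.0.1", 8529) // placeholder endpoint
  .user("root")            // placeholder user
  .maxConnections(1)       // single connection per client instance
  .build()
```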
2 changes: 1 addition & 1 deletion arangodb-spark-datasource-2.4/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>arangodb-spark-datasource</artifactId>
         <groupId>com.arangodb</groupId>
-        <version>1.1.1</version>
+        <version>1.2.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
ArangoDataWriter.scala (arangodb-spark-datasource-2.4)

@@ -45,10 +45,13 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
   /**
    * Data cleanup will happen in [[ArangoDataSourceWriter.abort()]]
    */
-  override def abort(): Unit = if (!canRetry) {
-    throw new DataWriteAbortException(
-      "Task cannot be retried. To make batch writes idempotent, so that they can be retried, consider using " +
-        "'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.")
+  override def abort(): Unit = {
+    client.shutdown()
+    if (!canRetry) {
+      throw new DataWriteAbortException(
+        "Task cannot be retried. To make batch writes idempotent, so that they can be retried, consider using " +
+          "'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.")
+    }
   }
 
   private def createClient() = ArangoClient(options.updated(ArangoDBConf.ENDPOINTS, endpoints(endpointIdx)))
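
The abort message names a configuration under which retried batches are idempotent. A user-side sketch of such a write follows; the format name and the "table" option key are assumptions for illustration, not taken from this diff:

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch of the idempotent configuration named in the abort message above;
// only keep.null and overwrite.mode come from the message, the rest is assumed.
def idempotentWrite(df: DataFrame): Unit =
  df.write
    .format("com.arangodb.spark")        // assumed datasource name
    .mode(SaveMode.Append)
    .option("table", "myCollection")     // assumed collection option key
    .option("keep.null", "true")         // default, per the message
    .option("overwrite.mode", "replace") // retried batches replace documents
    .save()
```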
2 changes: 1 addition & 1 deletion arangodb-spark-datasource-3.1/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>arangodb-spark-datasource</artifactId>
         <groupId>com.arangodb</groupId>
-        <version>1.1.1</version>
+        <version>1.2.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
ArangoDataWriter.scala (arangodb-spark-datasource-3.1)

@@ -45,6 +45,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I
    * Data cleanup will happen in [[ArangoBatchWriter.abort()]]
    */
   override def abort(): Unit = if (!canRetry) {
+    client.shutdown()
     throw new DataWriteAbortException(
       "Task cannot be retried. To make batch writes idempotent, so that they can be retried, consider using " +
         "'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.")
2 changes: 1 addition & 1 deletion demo/README.md

@@ -23,7 +23,7 @@ This demo requires:
 Set environment variables:
 
 ```shell
-export ARANGO_SPARK_VERSION=1.1.1
+export ARANGO_SPARK_VERSION=1.2.0-SNAPSHOT
 ```
 
 Start ArangoDB cluster with docker:
4 changes: 2 additions & 2 deletions demo/pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>com.arangodb</groupId>
     <artifactId>demo</artifactId>
-    <version>1.1.1</version>
+    <version>1.2.0-SNAPSHOT</version>
 
     <properties>
         <maven.compiler.source>1.8</maven.compiler.source>
@@ -55,7 +55,7 @@
         <dependency>
             <groupId>com.arangodb</groupId>
             <artifactId>arangodb-spark-datasource-${spark.compat.version}_${scala.compat.version}</artifactId>
-            <version>1.1.1</version>
+            <version>1.2.0-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
2 changes: 1 addition & 1 deletion integration-tests/pom.xml

@@ -5,7 +5,7 @@
     <parent>
         <artifactId>arangodb-spark-datasource</artifactId>
         <groupId>com.arangodb</groupId>
-        <version>1.1.1</version>
+        <version>1.2.0-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
WriteResiliencyTest.scala

@@ -6,7 +6,7 @@ import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.arangodb.commons.ArangoDBConf
 import org.apache.spark.sql.arangodb.datasource.BaseSparkTest
 import org.assertj.core.api.Assertions.assertThat
-import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.{BeforeEach, Disabled}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.MethodSource
 
@@ -29,22 +29,45 @@ class WriteResiliencyTest extends BaseSparkTest {
     ("Mamedyarov", "Shakhriyar"),
     ("So", "Wesley"),
     ("Radjabov", "Teimour")
-  ).toDF("surname", "name")
+  ).toDF("_key", "name")
     .repartition(6)
 
   @BeforeEach
   def beforeEach(): Unit = {
     if (collection.exists()) {
-      collection.drop()
+      collection.truncate()
     } else {
       collection.create()
     }
   }
 
+  @Disabled("manual test only")
   @ParameterizedTest
   @MethodSource(Array("provideProtocolAndContentType"))
-  def save(protocol: String, contentType: String): Unit = {
+  def retryOnTimeout(protocol: String, contentType: String): Unit = {
     df.write
       .format(BaseSparkTest.arangoDatasource)
-      .mode(SaveMode.Overwrite)
+      .mode(SaveMode.Append)
       .options(options + (
+        ArangoDBConf.TIMEOUT -> "1",
+        ArangoDBConf.ENDPOINTS -> BaseSparkTest.endpoints,
+        ArangoDBConf.COLLECTION -> collectionName,
+        ArangoDBConf.PROTOCOL -> protocol,
+        ArangoDBConf.CONTENT_TYPE -> contentType,
+        ArangoDBConf.CONFIRM_TRUNCATE -> "true",
+        ArangoDBConf.OVERWRITE_MODE -> OverwriteMode.replace.getValue
+      ))
+      .save()
+
+    assertThat(collection.count().getCount).isEqualTo(10L)
+  }
+
+  @ParameterizedTest
+  @MethodSource(Array("provideProtocolAndContentType"))
+  def retryOnWrongHost(protocol: String, contentType: String): Unit = {
+    df.write
+      .format(BaseSparkTest.arangoDatasource)
+      .mode(SaveMode.Append)
+      .options(options + (
+        ArangoDBConf.ENDPOINTS -> (BaseSparkTest.endpoints + ",wrongHost:8529"),
+        ArangoDBConf.COLLECTION -> collectionName,
2 changes: 1 addition & 1 deletion pom.xml

@@ -5,7 +5,7 @@
     <groupId>com.arangodb</groupId>
    <artifactId>arangodb-spark-datasource</artifactId>
    <packaging>pom</packaging>
-    <version>1.1.1</version>
+    <version>1.2.0-SNAPSHOT</version>
 
    <name>${project.artifactId}</name>
    <description>ArangoDB Datasource for Apache Spark</description>