60 changes: 30 additions & 30 deletions README.md
@@ -47,17 +47,16 @@ To use in external Spark cluster, submit your application with the following par
- `user`: db user, default `root`
- `password`: db password
- `endpoints`: list of coordinators, eg. `c1:8529,c2:8529` (required)
- `acquire-host-list`: acquire the list of all known hosts in the cluster (`true`|`false`), default `false`
- `acquireHostList`: acquire the list of all known hosts in the cluster (`true`|`false`), default `false`
- `protocol`: communication protocol (`vst`|`http`), default `http`
- `content-type`: content type for driver communication (`json`|`vpack`), default `vpack`
- `contentType`: content type for driver communication (`json`|`vpack`), default `json`
- `ssl.enabled`: ssl secured driver connection (`true`|`false`), default `false`
- `ssl.cert.value`: base64 encoded certificate
- `ssl.cert.type`: certificate type, default `X.509`
- `ssl.cert.alias`: certificate alias name, default `arangodb`
- `ssl.algorithm`: trust manager algorithm, default `SunX509`
- `ssl.keystore.type`: keystore type, default `jks`
- `ssl.protocol`: SSLContext protocol, default `TLS`
- `database`: database name, default `_system`
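
These connection options are passed as ordinary datasource options. Below is a minimal read sketch, not part of this diff; the datasource name `com.arangodb.spark`, the endpoints, credentials, and the `users` collection are illustrative assumptions:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("arangodb-spark-demo")
  .master("local[*]")
  .getOrCreate()

// Connection options shared by reads and writes (values are placeholders).
val connectionOptions = Map(
  "endpoints"       -> "c1:8529,c2:8529",
  "user"            -> "root",
  "password"        -> "test",
  "acquireHostList" -> "true",
  "contentType"     -> "json",
  "database"        -> "_system"
)

val usersDF = spark.read
  .format("com.arangodb.spark")   // assumed datasource name
  .options(connectionOptions)
  .option("table", "users")       // read options can be mixed in the same way
  .load()
```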

### SSL

@@ -134,11 +133,12 @@ usersDF.filter(col("birthday") === "1982-12-15").show()

### Read Configuration

- `database`: database name, default `_system`
- `table`: datasource ArangoDB collection name, ignored if `query` is specified. Either `table` or `query` is required.
- `query`: custom AQL read query. If set, `table` will be ignored. Either `table` or `query` is required.
- `sample.size`: sample size prefetched for schema inference, only used if read schema is not provided, default `1000`
- `batch.size`: reading batch size, default `1000`
- `fill.cache`: whether the query should store the data it reads in the RocksDB block cache (`true`|`false`)
- `batchSize`: reading batch size, default `10000`
- `sampleSize`: sample size prefetched for schema inference, only used if read schema is not provided, default `1000`
- `fillBlockCache`: whether the query should store the data it reads in the RocksDB block cache (`true`|`false`), default `false`
- `stream`: whether the query should be executed lazily, default `true`
- `mode`: allows a mode for dealing with corrupt records during parsing:
- `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a field configured by
@@ -211,24 +211,24 @@ according to the related target collection definition and is different from the
### Write Configuration

- `table`: target ArangoDB collection name (required)
- `batchSize`: writing batch size, default `10000`
- `table.shards`: number of shards of the created collection (in case of SaveMode `Append` or `Overwrite`)
- `table.type`: type (`document`|`edge`) of the created collection (in case of SaveMode `Append` or `Overwrite`)
- `batch.size`: writing batch size, default `1000`
- `wait.sync`: whether to wait until the documents have been synced to disk (`true`|`false`)
- `confirm.truncate`: confirm to truncate table when using `SaveMode.Overwrite` mode, default `false`
- `overwrite.mode`: configures the behavior in case a document with the specified `_key` value exists already
- `table.type`: type (`document`|`edge`) of the created collection (in case of SaveMode `Append` or `Overwrite`), default `document`
- `waitForSync`: whether to wait until the documents have been synced to disk (`true`|`false`), default `false`
- `confirmTruncate`: confirm to truncate the table when using save mode `Overwrite`, default `false`
- `overwriteMode`: configures the behavior in case a document with the specified `_key` value exists already
- `ignore`: it will not be written
- `replace`: it will be overwritten with the specified document value
- `update`: it will be patched (partially updated) with the specified document value. The overwrite mode can be
further controlled via the `keep.null` and `merge.objects` parameter. `keep.null` will also be automatically set to
further controlled via the `keepNull` and `mergeObjects` parameters. `keepNull` will also be automatically set to
`true`, so that null values are kept in the saved documents and not used to remove existing document fields (as for
default ArangoDB upsert behavior).
- `conflict`: return a unique constraint violation error so that the insert operation fails
- `merge.objects`: in case `overwrite.mode` is set to `update`, controls whether objects (not arrays) will be merged.
- `true`: objects will be merged
- `conflict` (default): return a unique constraint violation error so that the insert operation fails
- `mergeObjects`: in case `overwriteMode` is set to `update`, controls whether objects (not arrays) will be merged.
- `true` (default): objects will be merged
- `false`: existing document fields will be overwritten
- `keep.null`: in case `overwrite.mode` is set to `update`
- `true`: `null` values are saved within the document (default)
- `keepNull`: in case `overwriteMode` is set to `update`
- `true` (default): `null` values are saved within the document
- `false`: `null` values are used to delete corresponding existing attributes
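
A rough sketch of how these write options combine, reusing the hypothetical `connectionOptions` and `usersDF` from the read example above (option values are illustrative):

```scala
usersDF.write
  .format("com.arangodb.spark")        // assumed datasource name, as above
  .options(connectionOptions)
  .option("table", "users")
  .option("table.shards", "3")         // only applied when the collection is created
  .option("waitForSync", "true")
  .option("overwriteMode", "update")   // patch documents that already exist with the same _key
  .option("keepNull", "true")
  .option("mergeObjects", "true")
  .mode("append")
  .save()
```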

### SaveMode
@@ -239,18 +239,18 @@ already exists.
Spark 2.4 implementation supports all save modes with the following semantics:
- `Append`: the target collection is created if it does not exist
- `Overwrite`: the target collection is created if it does not exist, it is truncated otherwise. Use in combination with
`confirm.truncate` write configuration parameter.
`confirmTruncate` write configuration parameter.
- `ErrorIfExists`: the target collection is created if it does not exist, an `AnalysisException` is thrown otherwise
- `Ignore`: the target collection is created if it does not exist, no write is performed otherwise

Spark 3.1 implementation supports:
- `Append`: the target collection is created if it does not exist
- `Overwrite`: the target collection is created if it does not exist, it is truncated otherwise. Use in combination with
`confirm.truncate` write configuration parameter.
`confirmTruncate` write configuration parameter.

`SaveMode.ErrorIfExists` and `SaveMode.Ignore` behave the same as `SaveMode.Append`.
In Spark 3.1, save modes `ErrorIfExists` and `Ignore` behave the same as `Append`.

Use the `overwrite.mode` write configuration parameter to specify the document overwrite behavior (in case a document with
Use the `overwriteMode` write configuration parameter to specify the document overwrite behavior (in case a document with
the same `_key` already exists).
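
For instance, a write with save mode `Overwrite` needs the explicit `confirmTruncate` opt-in described in the write configuration; a sketch, again reusing the hypothetical names from the earlier examples:

```scala
import org.apache.spark.sql.SaveMode

usersDF.write
  .format("com.arangodb.spark")        // assumed datasource name
  .options(connectionOptions)
  .option("table", "users")
  .option("confirmTruncate", "true")   // explicit opt-in to truncating an existing collection
  .mode(SaveMode.Overwrite)
  .save()
```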


@@ -260,34 +260,34 @@ The data of each partition is saved in batches using ArangoDB API for inserting
([create multiple documents](https://www.arangodb.com/docs/stable/http/document-working-with-documents.html#create-multiple-documents)).
This operation is not atomic, therefore some documents could be successfully written to the database, while others could
fail. To make the job more resilient to temporary errors (e.g. connectivity problems), in case of failure the request
will be retried (with another coordinator) if the configured `overwrite.mode` allows for idempotent requests, namely:
will be retried (with another coordinator) if the configured `overwriteMode` allows for idempotent requests, namely:
- `replace`
- `ignore`
- `update` with `keep.null=true`

These configurations of `overwrite.mode` would also be compatible with speculative execution of tasks.
These configurations of `overwriteMode` would also be compatible with speculative execution of tasks.

A failing batch-saving request is retried at most once for every coordinator. After that, if still failing, the write
task for the related partition is aborted. Depending on the Spark configuration, the task could be retried and
rescheduled on a different executor, if the `overwrite.mode` allows for idempotent requests (as above).
rescheduled on a different executor, if the `overwriteMode` allows for idempotent requests (as above).

If a task ultimately fails and is aborted, the entire write job will be aborted as well. Depending on the `SaveMode`
configuration, the following cleanup operations will be performed:
- `SaveMode.Append`: no cleanup is performed and the underlying data source may require manual cleanup.
- `Append`: no cleanup is performed and the underlying data source may require manual cleanup.
`DataWriteAbortException` is thrown.
- `SaveMode.Overwrite`: the target collection will be truncated
- `SaveMode.ErrorIfExists`: the target collection will be dropped
- `SaveMode.Ignore`: if the collection did not exist before it will be dropped, nothing otherwise
- `Overwrite`: the target collection will be truncated
- `ErrorIfExists`: the target collection will be dropped
- `Ignore`: if the collection did not exist before it will be dropped, nothing otherwise
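
The retry behavior combines with Spark's own task retries and speculation. Below is a sketch of a retry-friendly job under the same assumptions as the earlier examples (datasource name, `connectionOptions`, and the input path are hypothetical); `spark.speculation` is the standard Spark setting for speculative task execution:

```scala
import org.apache.spark.sql.SparkSession

// Speculative task execution is only safe with an idempotent overwriteMode
// (replace, ignore, or update with keepNull=true), as described above.
val spark = SparkSession.builder()
  .appName("arangodb-resilient-write")
  .config("spark.speculation", "true")   // standard Spark scheduler setting
  .getOrCreate()

val df = spark.read.parquet("/data/users")  // hypothetical input

df.write
  .format("com.arangodb.spark")            // assumed datasource name
  .options(connectionOptions)              // hypothetical connection options from the first example
  .option("table", "users")
  .option("overwriteMode", "replace")      // idempotent: a retried batch replaces the same documents
  .mode("append")
  .save()
```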


### Write Limitations

- Batch writes are not performed atomically, so in some cases (i.e. in case of `overwrite.mode: conflict`) some
documents in the batch may be written and some others may return an exception (i.e. due to conflicting key).
- Writing records with `_key` attribute is only allowed on collections sharded by `_key`.
- In case of `SaveMode.Append`, failed jobs cannot be rolled back and the underlying data source may require manual
- In case of save mode `Append`, failed jobs cannot be rolled back and the underlying data source may require manual
cleanup.
- Speculative execution of tasks would only work for idempotent `overwrite.mode` configurations
- Speculative execution of tasks would only work for idempotent `overwriteMode` configurations
(see [Write Resiliency](#write-resiliency)).


ArangoClient.scala
@@ -1,10 +1,10 @@
package org.apache.spark.sql.arangodb.commons

import com.arangodb.entity.ErrorEntity
import com.arangodb.internal.{ArangoRequestParam, ArangoResponseField}
import com.arangodb.internal.util.ArangoSerializationFactory.Serializer
import com.arangodb.internal.{ArangoRequestParam, ArangoResponseField}
import com.arangodb.mapping.ArangoJack
import com.arangodb.model.{AqlQueryOptions, CollectionCreateOptions, OverwriteMode}
import com.arangodb.model.{AqlQueryOptions, CollectionCreateOptions}
import com.arangodb.velocypack.VPackSlice
import com.arangodb.velocystream.{Request, RequestType}
import com.arangodb.{ArangoCursor, ArangoDB, ArangoDBException}
@@ -17,12 +17,13 @@ import org.apache.spark.sql.types.StructType

import scala.collection.JavaConverters.mapAsJavaMapConverter

class ArangoClient(options: ArangoOptions) extends Logging {
class ArangoClient(options: ArangoDBConf) extends Logging {

private def aqlOptions(): AqlQueryOptions = {
val opt = new AqlQueryOptions().stream(options.readOptions.stream)
options.readOptions.fillBlockCache.foreach(opt.fillBlockCache(_))
options.readOptions.batchSize.foreach(opt.batchSize(_))
val opt = new AqlQueryOptions()
.stream(options.readOptions.stream)
.fillBlockCache(options.readOptions.fillBlockCache)
.batchSize(options.readOptions.batchSize)
opt
}

@@ -105,8 +106,8 @@ class ArangoClient(options: ArangoOptions) extends Logging {

def createCollection(): Unit = {
val opts = new CollectionCreateOptions()
options.writeOptions.numberOfShards.foreach(opts.numberOfShards(_))
options.writeOptions.collectionType.foreach(ct => opts.`type`(ct.get()))
.numberOfShards(options.writeOptions.numberOfShards)
.`type`(options.writeOptions.collectionType)

arangoDB
.db(options.writeOptions.db)
@@ -131,14 +132,10 @@ class ArangoClient(options: ArangoOptions) extends Logging {
s"/_api/document/${options.writeOptions.collection}")

request.putQueryParam("silent", true)
options.writeOptions.waitForSync.foreach(request.putQueryParam("waitForSync", _))
options.writeOptions.overwriteMode.foreach(it => {
request.putQueryParam("overwriteMode", it)
if (it == OverwriteMode.update) {
request.putQueryParam("keepNull", options.writeOptions.keepNull)
options.writeOptions.mergeObjects.foreach(request.putQueryParam("mergeObjects", _))
}
})
request.putQueryParam("waitForSync", options.writeOptions.waitForSync)
request.putQueryParam("overwriteMode", options.writeOptions.overwriteMode.getValue)
request.putQueryParam("keepNull", options.writeOptions.keepNull)
request.putQueryParam("mergeObjects", options.writeOptions.mergeObjects)

request.setBody(data)
val response = arangoDB.execute(request)
@@ -162,9 +159,9 @@

object ArangoClient {

def apply(options: ArangoOptions): ArangoClient = new ArangoClient(options)
def apply(options: ArangoDBConf): ArangoClient = new ArangoClient(options)

def getCollectionShardIds(options: ArangoOptions): Array[String] = {
def getCollectionShardIds(options: ArangoDBConf): Array[String] = {
try {
val client = ArangoClient(options).arangoDB
val res = client.execute(new Request(
@@ -183,7 +180,7 @@ object ArangoClient {
}
}

def acquireHostList(options: ArangoOptions): Iterable[String] = {
def acquireHostList(options: ArangoDBConf): Iterable[String] = {
val client = ArangoClient(options).arangoDB
val response = client.execute(new Request(ArangoRequestParam.SYSTEM, RequestType.GET, "/_api/cluster/endpoints"))
val field = response.getBody.get("endpoints")