19 changes: 19 additions & 0 deletions .github/workflows/test.yml
@@ -283,6 +283,25 @@ jobs:
- spark-version: 3.2
  scala-version: 2.13
  spark-full-version: 3.2.4
- spark-version: 3.3
  scala-version: 2.12
  spark-full-version: 3.3.0
- spark-version: 3.3
  scala-version: 2.13
  spark-full-version: 3.3.0
- spark-version: 3.3
  scala-version: 2.12
  spark-full-version: 3.3.1
- spark-version: 3.3
  scala-version: 2.13
  spark-full-version: 3.3.1
- spark-version: 3.3
  scala-version: 2.12
  spark-full-version: 3.3.2
- spark-version: 3.3
  scala-version: 2.13
  spark-full-version: 3.3.2

steps:
- uses: actions/checkout@v2
- uses: actions/setup-java@v2
@@ -2,9 +2,8 @@ package org.apache.spark.sql.arangodb.commons

import com.arangodb.{ArangoCursor, ArangoDB, ArangoDBException, Request}
import com.arangodb.entity.ErrorEntity
import com.arangodb.internal.serde.InternalSerdeProvider
import com.arangodb.internal.serde.{InternalSerde, InternalSerdeProvider}
import com.arangodb.model.{AqlQueryOptions, CollectionCreateOptions}
import com.arangodb.serde.ArangoSerde
import com.arangodb.util.{RawBytes, RawJson}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.arangodb.commons.exceptions.ArangoDBMultiException
@@ -38,7 +37,7 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
.build()
}

lazy val serde: ArangoSerde = arangoDB.getSerde
lazy val serde: InternalSerde = arangoDB.getSerde
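Narrowing serde from the generic ArangoSerde to InternalSerde exposes the driver's parse method directly on the field, which is why the later hunk can drop its local val serde = arangoDB.getSerde. A minimal sketch of that usage, assuming InternalSerde.parse returns a Jackson JsonNode (an assumption; only the parse call itself appears in this diff):

// sketch only: parse raw response bytes and read a field
val node = serde.parse("""{"error":true,"errorNum":1210}""".getBytes)
val errorNum = node.get("errorNum").asInt()  // 1210, assuming a JsonNode result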

def shutdown(): Unit = {
logDebug("closing db client")
@@ -174,7 +173,6 @@ class ArangoClient(options: ArangoDBConf) extends Logging {
// request.putQueryParam("silent", true)

val response = arangoDB.execute(request, classOf[RawBytes])
val serde = arangoDB.getSerde

import scala.collection.JavaConverters.asScalaIteratorConverter
val errors = serde.parse(response.getBody.get).iterator().asScala
@@ -298,7 +298,7 @@ object ArangoDBConf {
* @param comment Additional info regarding to the removed config. For example,
* reasons of config deprecation, what users should use instead of it.
*/
case class DeprecatedConfig(key: String, version: String, comment: String)
final case class DeprecatedConfig(key: String, version: String, comment: String)

/**
* Maps deprecated Spark ArangoDB config keys to information about the deprecation.
@@ -320,7 +320,7 @@ object ArangoDBConf {
* users that they set non-default value to an already removed config.
* @param comment Additional info regarding to the removed config.
*/
case class RemovedConfig(key: String, version: String, defaultValue: String, comment: String)
final case class RemovedConfig(key: String, version: String, defaultValue: String, comment: String)
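Marking these case classes final only tightens the API; their shape is unchanged. For illustration, entries could be declared as below (the keys and messages are invented, not taken from the actual deprecation/removal maps in this object):

// hypothetical entries, for illustration only
val deprecatedExample = DeprecatedConfig(
  key = "arangodb.some.old.option",        // deprecated key (invented)
  version = "1.0.0",                       // version in which it was deprecated
  comment = "Use arangodb.some.new.option instead.")

val removedExample = RemovedConfig(
  key = "arangodb.some.removed.option",    // removed key (invented)
  version = "1.2.0",
  defaultValue = "false",                  // the only value users may still set, per the doc above
  comment = "This option no longer has any effect.")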

/**
* The map contains info about removed Spark ArangoDB configs. Keys are Spark ArangoDB config names,
@@ -363,7 +363,7 @@ class ArangoDBConf(opts: Map[String, String]) extends Serializable with Logging

/** Return the value of Spark ArangoDB configuration property for the given key. */
@throws[NoSuchElementException]("if key is not set")
def getConfString(key: String): String = settings.get(key).getOrElse(throw new NoSuchElementException(key))
def getConfString(key: String): String = settings.getOrElse(key, throw new NoSuchElementException(key))

/**
* Return the value of Spark ArangoDB configuration property for the given key. If the key is not set
@@ -389,7 +389,7 @@ class ArangoDBConf(opts: Map[String, String]) extends Serializable with Logging
* Return the `string` value of Spark ArangoDB configuration property for the given key. If the key is
* not set, return `defaultValue`.
*/
def getConfString(key: String, defaultValue: String): String = settings.get(key).getOrElse(defaultValue)
def getConfString(key: String, defaultValue: String): String = settings.getOrElse(key, defaultValue)
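A brief usage sketch of the two overloads touched here; the option key is invented, and it assumes settings mirrors the constructor map:

val conf = new ArangoDBConf(Map("arangodb.some.option" -> "value"))
conf.getConfString("arangodb.some.option")             // "value"
conf.getConfString("arangodb.missing.option", "123")   // "123" via the default-value overload
// conf.getConfString("arangodb.missing.option")       // would throw NoSuchElementException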

/**
* Return all the configuration properties that have been set (i.e. not the default).
@@ -403,7 +403,7 @@ class ArangoDBConf(opts: Map[String, String]) extends Serializable with Logging
*/
def getAllDefinedConfigs: Seq[(String, String, String)] =
confEntries.values.filter(_.isPublic).map { entry =>
val displayValue = settings.get(entry.key).getOrElse(entry.defaultValueString)
val displayValue = settings.getOrElse(entry.key, entry.defaultValueString)
(entry.key, displayValue, entry.doc)
}.toSeq
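Continuing the sketch above, the (key, value, doc) triples returned here can be listed directly:

conf.getAllDefinedConfigs.foreach { case (key, value, doc) =>
  println(s"$key = $value  // $doc")
}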

79 changes: 79 additions & 0 deletions arangodb-spark-datasource-3.3/pom.xml
@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>arangodb-spark-datasource</artifactId>
<groupId>com.arangodb</groupId>
<version>1.4.3</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>arangodb-spark-datasource-3.3_${scala.compat.version}</artifactId>

<name>arangodb-spark-datasource-3.3</name>
<description>ArangoDB Datasource for Apache Spark 3.3</description>
<url>https://github.com/arangodb/arangodb-spark-datasource</url>

<developers>
<developer>
<name>Michele Rastelli</name>
<url>https://github.com/rashtao</url>
</developer>
</developers>

<scm>
<url>https://github.com/arangodb/arangodb-spark-datasource</url>
</scm>

<properties>
<maven.deploy.skip>false</maven.deploy.skip>
<sonar.coverage.jacoco.xmlReportPaths>../integration-tests/target/site/jacoco-aggregate/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
<sonar.coverage.exclusions>src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/*</sonar.coverage.exclusions>
<sonar.exclusions>src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/*</sonar.exclusions>
<scalastyle.skip>false</scalastyle.skip>
</properties>

<dependencies>
<dependency>
<groupId>com.arangodb</groupId>
<artifactId>arangodb-spark-commons-${spark.compat.version}_${scala.compat.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.13</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<extensions>true</extensions>
<configuration>
<skipNexusStagingDeployMojo>false</skipNexusStagingDeployMojo>
</configuration>
</plugin>
</plugins>
</build>

</project>
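Once published, the new module resolves under the usual coordinates; a hedged sbt sketch, where %% appends the _2.12/_2.13 suffix that the artifactId template above encodes:

// build.sbt sketch; the version follows the parent POM shown above
libraryDependencies += "com.arangodb" %% "arangodb-spark-datasource-3.3" % "1.4.3"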
@@ -0,0 +1 @@
org.apache.spark.sql.arangodb.datasource.mapping.ArangoGeneratorProviderImpl
@@ -0,0 +1 @@
org.apache.spark.sql.arangodb.datasource.mapping.ArangoParserProviderImpl
@@ -0,0 +1 @@
com.arangodb.spark.DefaultSource
@@ -0,0 +1,40 @@
package com.arangodb.spark

import org.apache.spark.sql.arangodb.commons.{ArangoClient, ArangoDBConf}
import org.apache.spark.sql.arangodb.datasource.ArangoTable
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util

class DefaultSource extends TableProvider with DataSourceRegister {

private def extractOptions(options: util.Map[String, String]): ArangoDBConf = {
val opts: ArangoDBConf = ArangoDBConf(options)
if (opts.driverOptions.acquireHostList) {
val hosts = ArangoClient.acquireHostList(opts)
opts.updated(ArangoDBConf.ENDPOINTS, hosts.mkString(","))
} else {
opts
}
}

override def inferSchema(options: CaseInsensitiveStringMap): StructType = getTable(options).schema()

private def getTable(options: CaseInsensitiveStringMap): Table =
getTable(None, options.asCaseSensitiveMap()) // scalastyle:ignore null

override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table =
getTable(Option(schema), properties)

override def supportsExternalMetadata(): Boolean = true

override def shortName(): String = "arangodb"

private def getTable(schema: Option[StructType], properties: util.Map[String, String]) =
new ArangoTable(schema, extractOptions(properties))

}
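Because the class registers the short name "arangodb" through DataSourceRegister, it can be addressed from spark.read. A minimal sketch follows; the option keys ("endpoints", "database", "table") are assumptions about ArangoDBConf keys rather than something this diff confirms:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("arangodb-3.3-example").getOrCreate()

// option keys below are assumed, not confirmed by this diff
val df = spark.read
  .format("arangodb")
  .option("endpoints", "localhost:8529")
  .option("database", "mydb")
  .option("table", "users")
  .load()

df.printSchema()  // schema comes from inferSchema unless one is supplied explicitly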
@@ -0,0 +1,37 @@
package org.apache.spark.sql.arangodb.datasource

import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ArangoUtils}
import org.apache.spark.sql.arangodb.datasource.reader.ArangoScanBuilder
import org.apache.spark.sql.arangodb.datasource.writer.ArangoWriterBuilder
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import scala.collection.JavaConverters.setAsJavaSetConverter

class ArangoTable(private var schemaOpt: Option[StructType], options: ArangoDBConf) extends Table with SupportsRead with SupportsWrite {
private lazy val tableSchema = schemaOpt.getOrElse(ArangoUtils.inferSchema(options))

override def name(): String = this.getClass.toString

override def schema(): StructType = tableSchema

override def capabilities(): util.Set[TableCapability] = Set(
TableCapability.BATCH_READ,
TableCapability.BATCH_WRITE,
// TableCapability.STREAMING_WRITE,
TableCapability.ACCEPT_ANY_SCHEMA,
TableCapability.TRUNCATE
// TableCapability.OVERWRITE_BY_FILTER,
// TableCapability.OVERWRITE_DYNAMIC,
).asJava

override def newScanBuilder(scanOptions: CaseInsensitiveStringMap): ScanBuilder =
new ArangoScanBuilder(options.updated(ArangoDBConf(scanOptions)), schema())

override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
new ArangoWriterBuilder(info.schema(), options.updated(ArangoDBConf(info.options())))
}
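Since tableSchema falls back to ArangoUtils.inferSchema only when no schema was passed in (and DefaultSource reports supportsExternalMetadata()), supplying a schema up front skips inference. A sketch under the same assumed option keys as above:

import org.apache.spark.sql.types._

val userSchema = StructType(Seq(
  StructField("_key", StringType),   // illustrative fields
  StructField("name", StringType),
  StructField("age", IntegerType)))

val dfWithSchema = spark.read
  .format("arangodb")
  .schema(userSchema)                // routed to ArangoTable via getTable(schema, ...)
  .option("table", "users")          // assumed option key
  .load()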
@@ -0,0 +1,46 @@
package org.apache.spark.sql.arangodb.datasource.mapping

import com.arangodb.jackson.dataformat.velocypack.VPackFactoryBuilder
import com.fasterxml.jackson.core.JsonFactoryBuilder
import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.arangodb.commons.mapping.{ArangoGenerator, ArangoGeneratorProvider}
import org.apache.spark.sql.arangodb.datasource.mapping.json.{JSONOptions, JacksonGenerator}
import org.apache.spark.sql.types.{DataType, StructType}

import java.io.OutputStream

abstract sealed class ArangoGeneratorImpl(
schema: DataType,
writer: OutputStream,
options: JSONOptions)
extends JacksonGenerator(
schema,
options.buildJsonFactory().createGenerator(writer),
options) with ArangoGenerator

class ArangoGeneratorProviderImpl extends ArangoGeneratorProvider {
override def of(
contentType: ContentType,
schema: StructType,
outputStream: OutputStream,
conf: ArangoDBConf
): ArangoGeneratorImpl = contentType match {
case ContentType.JSON => new JsonArangoGenerator(schema, outputStream, conf)
case ContentType.VPACK => new VPackArangoGenerator(schema, outputStream, conf)
case _ => throw new IllegalArgumentException
}
}

class JsonArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
extends ArangoGeneratorImpl(
schema,
outputStream,
createOptions(new JsonFactoryBuilder().build(), conf)
)

class VPackArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
extends ArangoGeneratorImpl(
schema,
outputStream,
createOptions(new VPackFactoryBuilder().build(), conf)
)
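A sketch of obtaining a generator from this provider; it assumes the vendored JacksonGenerator exposes Spark's usual write/flush methods (not shown in this diff) and that a StructType schema and an ArangoDBConf conf are in scope:

import java.io.ByteArrayOutputStream
import org.apache.spark.sql.arangodb.commons.ContentType

val out = new ByteArrayOutputStream()
val generator = new ArangoGeneratorProviderImpl().of(ContentType.JSON, schema, out, conf)
// generator.write(row)   // assumed JacksonGenerator API (InternalRow => JSON)
// generator.flush()      // serialized bytes would then be available in `out`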
@@ -0,0 +1,47 @@
package org.apache.spark.sql.arangodb.datasource.mapping

import com.arangodb.jackson.dataformat.velocypack.VPackFactoryBuilder
import com.fasterxml.jackson.core.json.JsonReadFeature
import com.fasterxml.jackson.core.{JsonFactory, JsonFactoryBuilder}
import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.arangodb.commons.mapping.{ArangoParser, ArangoParserProvider, MappingUtils}
import org.apache.spark.sql.arangodb.datasource.mapping.json.{JSONOptions, JacksonParser}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.DataType
import org.apache.spark.unsafe.types.UTF8String

abstract sealed class ArangoParserImpl(
schema: DataType,
options: JSONOptions,
recordLiteral: Array[Byte] => UTF8String)
extends JacksonParser(schema, options) with ArangoParser {
override def parse(data: Array[Byte]): Iterable[InternalRow] = super.parse(
data,
(jsonFactory: JsonFactory, record: Array[Byte]) => jsonFactory.createParser(record),
recordLiteral
)
}

class ArangoParserProviderImpl extends ArangoParserProvider {
override def of(contentType: ContentType, schema: DataType, conf: ArangoDBConf): ArangoParserImpl = contentType match {
case ContentType.JSON => new JsonArangoParser(schema, conf)
case ContentType.VPACK => new VPackArangoParser(schema, conf)
case _ => throw new IllegalArgumentException
}
}

class JsonArangoParser(schema: DataType, conf: ArangoDBConf)
extends ArangoParserImpl(
schema,
createOptions(new JsonFactoryBuilder()
.configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS, true)
.build(), conf),
(bytes: Array[Byte]) => UTF8String.fromBytes(bytes)
)

class VPackArangoParser(schema: DataType, conf: ArangoDBConf)
extends ArangoParserImpl(
schema,
createOptions(new VPackFactoryBuilder().build(), conf),
(bytes: Array[Byte]) => UTF8String.fromString(MappingUtils.vpackToJson(bytes))
)
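And correspondingly for the parser side, whose parse(Array[Byte]): Iterable[InternalRow] signature is shown above; a DataType schema and an ArangoDBConf conf are assumed in scope:

import org.apache.spark.sql.arangodb.commons.ContentType
import org.apache.spark.sql.catalyst.InternalRow

val parser = new ArangoParserProviderImpl().of(ContentType.JSON, schema, conf)
val rows: Iterable[InternalRow] = parser.parse("""{"name":"x"}""".getBytes("UTF-8"))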