Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .github/workflows/maven-deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,26 @@ jobs:

strategy:
fail-fast: false
matrix:
include:
- scala-version: 2.11
spark-version: 2.4
- scala-version: 2.12
spark-version: 2.4
- scala-version: 2.12
spark-version: 3.1
- scala-version: 2.12
spark-version: 3.2
- scala-version: 2.13
spark-version: 3.2
- scala-version: 2.12
spark-version: 3.3
- scala-version: 2.13
spark-version: 3.3
- scala-version: 2.12
spark-version: 3.4
- scala-version: 2.13
spark-version: 3.4

steps:
- uses: actions/checkout@v2
Expand Down
8 changes: 8 additions & 0 deletions .github/workflows/maven-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ jobs:
spark-version: 3.2
- scala-version: 2.13
spark-version: 3.2
- scala-version: 2.12
spark-version: 3.3
- scala-version: 2.13
spark-version: 3.3
- scala-version: 2.12
spark-version: 3.4
- scala-version: 2.13
spark-version: 3.4

steps:
- uses: actions/checkout@v2
Expand Down
35 changes: 33 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ jobs:
- 2.4
- 3.1
- 3.2
- 3.3
- 3.4
topology:
- single
- cluster
Expand All @@ -53,12 +55,18 @@ jobs:
spark-version: 3.1
- scala-version: 2.11
spark-version: 3.2
- scala-version: 2.11
spark-version: 3.3
- scala-version: 2.11
spark-version: 3.4
- scala-version: 2.11
java-version: 11
- scala-version: 2.13
spark-version: 2.4
- scala-version: 2.13
spark-version: 3.1
- docker-img: docker.io/arangodb/arangodb:3.9.10
java-version: 8
- docker-img: docker.io/arangodb/arangodb:3.10.6
java-version: 8
- docker-img: docker.io/arangodb/arangodb:3.11.0
Expand Down Expand Up @@ -96,6 +104,8 @@ jobs:
- 2.4
- 3.1
- 3.2
- 3.3
- 3.4
topology:
- cluster
java-version:
Expand All @@ -107,6 +117,10 @@ jobs:
spark-version: 3.1
- scala-version: 2.11
spark-version: 3.2
- scala-version: 2.11
spark-version: 3.3
- scala-version: 2.11
spark-version: 3.4
- scala-version: 2.13
spark-version: 2.4
- scala-version: 2.13
Expand Down Expand Up @@ -140,10 +154,15 @@ jobs:
matrix:
python-version: [3.9]
scala-version: [2.12]
spark-version: [3.1, 3.2]
spark-version: [3.1, 3.2, 3.3, 3.4]
topology: [single, cluster]
java-version: [8, 11]
docker-img: ["docker.io/arangodb/arangodb:3.11.0"]
exclude:
- topology: cluster
java-version: 8
- topology: single
java-version: 11

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -191,6 +210,8 @@ jobs:
- 2.4
- 3.1
- 3.2
- 3.3
- 3.4
topology:
- single
java-version:
Expand All @@ -203,6 +224,10 @@ jobs:
spark-version: 3.1
- scala-version: 2.11
spark-version: 3.2
- scala-version: 2.11
spark-version: 3.3
- scala-version: 2.11
spark-version: 3.4
- scala-version: 2.13
spark-version: 2.4
- scala-version: 2.13
Expand Down Expand Up @@ -301,6 +326,12 @@ jobs:
- spark-version: 3.3
scala-version: 2.13
spark-full-version: 3.3.2
- spark-version: 3.4
scala-version: 2.12
spark-full-version: 3.4.0
- spark-version: 3.4
scala-version: 2.13
spark-full-version: 3.4.0

steps:
- uses: actions/checkout@v2
Expand Down Expand Up @@ -331,7 +362,7 @@ jobs:
scala-version:
- 2.12
spark-version:
- 3.2
- 3.4
topology:
- single
java-version:
Expand Down
79 changes: 79 additions & 0 deletions arangodb-spark-datasource-3.4/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven module building the ArangoDB Spark Datasource against Spark 3.4.
     Follows the same layout as the sibling 3.x modules; the Scala binary
     version is injected via ${scala.compat.version} from the parent build. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>arangodb-spark-datasource</artifactId>
        <groupId>com.arangodb</groupId>
        <version>1.4.3</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <!-- Artifact id is cross-built per Scala binary version (2.12 / 2.13). -->
    <artifactId>arangodb-spark-datasource-3.4_${scala.compat.version}</artifactId>

    <name>arangodb-spark-datasource-3.4</name>
    <description>ArangoDB Datasource for Apache Spark 3.4</description>
    <url>https://github.com/arangodb/arangodb-spark-datasource</url>

    <developers>
        <developer>
            <name>Michele Rastelli</name>
            <url>https://github.com/rashtao</url>
        </developer>
    </developers>

    <scm>
        <url>https://github.com/arangodb/arangodb-spark-datasource</url>
    </scm>

    <properties>
        <!-- This module is published (the parent skips deploy by default). -->
        <maven.deploy.skip>false</maven.deploy.skip>
        <sonar.coverage.jacoco.xmlReportPaths>../integration-tests/target/site/jacoco-aggregate/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
        <!-- The mapping/json sources are copied from Apache Spark; exclude them
             from Sonar analysis and coverage to avoid flagging upstream code. -->
        <sonar.coverage.exclusions>src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/*</sonar.coverage.exclusions>
        <sonar.exclusions>src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/*</sonar.exclusions>
        <scalastyle.skip>false</scalastyle.skip>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.arangodb</groupId>
            <artifactId>arangodb-spark-commons-${spark.compat.version}_${scala.compat.version}</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Produce an additional fat jar (jar-with-dependencies) at package time. -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Enable Sonatype staging deployment for this module. -->
            <plugin>
                <groupId>org.sonatype.plugins</groupId>
                <artifactId>nexus-staging-maven-plugin</artifactId>
                <extensions>true</extensions>
                <configuration>
                    <skipNexusStagingDeployMojo>false</skipNexusStagingDeployMojo>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.spark.sql.arangodb.datasource.mapping.ArangoGeneratorProviderImpl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
org.apache.spark.sql.arangodb.datasource.mapping.ArangoParserProviderImpl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.arangodb.spark.DefaultSource
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.arangodb.spark

import org.apache.spark.sql.arangodb.commons.{ArangoClient, ArangoDBConf}
import org.apache.spark.sql.arangodb.datasource.ArangoTable
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util

/**
 * Entry point of the ArangoDB DataSource V2 connector for Spark 3.4.
 *
 * Registered under the short name "arangodb"; resolves connector options
 * (optionally acquiring the cluster host list from the server) and hands
 * them to [[ArangoTable]].
 */
class DefaultSource extends TableProvider with DataSourceRegister {

  override def shortName(): String = "arangodb"

  // The user may supply a schema instead of relying on inference.
  override def supportsExternalMetadata(): Boolean = true

  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    makeTable(None, options.asCaseSensitiveMap()).schema()

  override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table =
    makeTable(Option(schema), properties)

  // Builds the table with an optional user-provided schema.
  private def makeTable(schema: Option[StructType], properties: util.Map[String, String]): Table =
    new ArangoTable(schema, resolveOptions(properties))

  // Parses the raw options; when acquireHostList is enabled, replaces the
  // configured endpoints with the host list reported by the ArangoDB server.
  private def resolveOptions(properties: util.Map[String, String]): ArangoDBConf = {
    val conf: ArangoDBConf = ArangoDBConf(properties)
    if (conf.driverOptions.acquireHostList) {
      val hosts = ArangoClient.acquireHostList(conf)
      conf.updated(ArangoDBConf.ENDPOINTS, hosts.mkString(","))
    } else {
      conf
    }
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.apache.spark.sql.arangodb.datasource

import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ArangoUtils}
import org.apache.spark.sql.arangodb.datasource.reader.ArangoScanBuilder
import org.apache.spark.sql.arangodb.datasource.writer.ArangoWriterBuilder
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import scala.collection.JavaConverters.setAsJavaSetConverter

/**
 * DataSource V2 [[Table]] implementation backed by an ArangoDB collection or query.
 *
 * @param schemaOpt user-provided schema, if any; when absent the schema is
 *                  inferred from the data via [[ArangoUtils.inferSchema]]
 * @param options   resolved connector configuration
 */
// NOTE: schemaOpt was declared `var` but is never reassigned — narrowed to `val`.
class ArangoTable(private val schemaOpt: Option[StructType], options: ArangoDBConf) extends Table with SupportsRead with SupportsWrite {
  // Lazy so that schema inference (a server round-trip) only happens when
  // no schema was supplied and schema() is actually requested.
  private lazy val tableSchema = schemaOpt.getOrElse(ArangoUtils.inferSchema(options))

  override def name(): String = this.getClass.toString

  override def schema(): StructType = tableSchema

  override def capabilities(): util.Set[TableCapability] = Set(
    TableCapability.BATCH_READ,
    TableCapability.BATCH_WRITE,
    // TableCapability.STREAMING_WRITE,
    TableCapability.ACCEPT_ANY_SCHEMA,
    TableCapability.TRUNCATE
    // TableCapability.OVERWRITE_BY_FILTER,
    // TableCapability.OVERWRITE_DYNAMIC,
  ).asJava

  // Scan/write options provided at read/write time are merged over the
  // table-level options, with the more specific ones taking precedence.
  override def newScanBuilder(scanOptions: CaseInsensitiveStringMap): ScanBuilder =
    new ArangoScanBuilder(options.updated(ArangoDBConf(scanOptions)), schema())

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    new ArangoWriterBuilder(info.schema(), options.updated(ArangoDBConf(info.options())))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.apache.spark.sql.arangodb.datasource.mapping

import com.arangodb.jackson.dataformat.velocypack.VPackFactoryBuilder
import com.fasterxml.jackson.core.JsonFactoryBuilder
import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.arangodb.commons.mapping.{ArangoGenerator, ArangoGeneratorProvider}
import org.apache.spark.sql.arangodb.datasource.mapping.json.{JSONOptions, JacksonGenerator}
import org.apache.spark.sql.types.{DataType, StructType}

import java.io.OutputStream

/**
 * Base generator adapting Spark's [[JacksonGenerator]] to the connector's
 * [[ArangoGenerator]] interface: serializes rows of the given schema to the
 * supplied output stream through a Jackson generator built from `options`.
 * Sealed: the only concrete variants are the JSON and VPACK ones below.
 */
abstract sealed class ArangoGeneratorImpl(
                                           schema: DataType,
                                           writer: OutputStream,
                                           options: JSONOptions)
  extends JacksonGenerator(
    schema,
    options.buildJsonFactory().createGenerator(writer),
    options) with ArangoGenerator

/** Factory selecting the generator implementation by content type. */
class ArangoGeneratorProviderImpl extends ArangoGeneratorProvider {
  override def of(
                   contentType: ContentType,
                   schema: StructType,
                   outputStream: OutputStream,
                   conf: ArangoDBConf
                 ): ArangoGeneratorImpl = contentType match {
    case ContentType.JSON => new JsonArangoGenerator(schema, outputStream, conf)
    case ContentType.VPACK => new VPackArangoGenerator(schema, outputStream, conf)
    // Defensive: ContentType is expected to be exhaustive above; any new
    // variant must be wired in explicitly.
    case _ => throw new IllegalArgumentException
  }
}

// Generator emitting plain JSON.
// NOTE(review): `createOptions` is not defined in this file — presumably
// provided by the enclosing package object; verify it applies `conf` to the
// given Jackson factory.
class JsonArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
  extends ArangoGeneratorImpl(
    schema,
    outputStream,
    createOptions(new JsonFactoryBuilder().build(), conf)
  )

// Generator emitting VelocyPack (ArangoDB's binary format) via the
// jackson-dataformat-velocypack factory.
class VPackArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
  extends ArangoGeneratorImpl(
    schema,
    outputStream,
    createOptions(new VPackFactoryBuilder().build(), conf)
  )
Loading