
Commit bc48ab1

Spark 3.3
1 parent 48eda7f commit bc48ab1

31 files changed: +2656 −18 lines

.github/workflows/test.yml

Lines changed: 19 additions & 0 deletions
@@ -282,6 +282,25 @@ jobs:
           - spark-version: 3.2
             scala-version: 2.13
             spark-full-version: 3.2.4
+          - spark-version: 3.3
+            scala-version: 2.12
+            spark-full-version: 3.3.0
+          - spark-version: 3.3
+            scala-version: 2.13
+            spark-full-version: 3.3.0
+          - spark-version: 3.3
+            scala-version: 2.12
+            spark-full-version: 3.3.1
+          - spark-version: 3.3
+            scala-version: 2.13
+            spark-full-version: 3.3.1
+          - spark-version: 3.3
+            scala-version: 2.12
+            spark-full-version: 3.3.2
+          - spark-version: 3.3
+            scala-version: 2.13
+            spark-full-version: 3.3.2
+
     steps:
       - uses: actions/checkout@v2
       - uses: actions/setup-java@v2
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>arangodb-spark-datasource</artifactId>
        <groupId>com.arangodb</groupId>
        <version>1.4.3</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>arangodb-spark-datasource-3.3_${scala.compat.version}</artifactId>

    <name>arangodb-spark-datasource-3.3</name>
    <description>ArangoDB Datasource for Apache Spark 3.3</description>
    <url>https://github.com/arangodb/arangodb-spark-datasource</url>

    <developers>
        <developer>
            <name>Michele Rastelli</name>
            <url>https://github.com/rashtao</url>
        </developer>
    </developers>

    <scm>
        <url>https://github.com/arangodb/arangodb-spark-datasource</url>
    </scm>

    <properties>
        <maven.deploy.skip>false</maven.deploy.skip>
        <sonar.coverage.jacoco.xmlReportPaths>../integration-tests/target/site/jacoco-aggregate/jacoco.xml</sonar.coverage.jacoco.xmlReportPaths>
        <sonar.coverage.exclusions>src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/*</sonar.coverage.exclusions>
        <sonar.exclusions>src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/*</sonar.exclusions>
        <scalastyle.skip>false</scalastyle.skip>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.arangodb</groupId>
            <artifactId>arangodb-spark-commons-${spark.compat.version}_${scala.compat.version}</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.13</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.sonatype.plugins</groupId>
                <artifactId>nexus-staging-maven-plugin</artifactId>
                <extensions>true</extensions>
                <configuration>
                    <skipNexusStagingDeployMojo>false</skipNexusStagingDeployMojo>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
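
The new module is wired for deployment (maven.deploy.skip is false and the nexus-staging plugin is enabled), so downstream builds should be able to depend on it like the existing 3.1/3.2 modules. A minimal build.sbt sketch, assuming the artifact is published under the coordinates defined above (the %% operator supplies the _2.12/_2.13 suffix of the artifactId):

// build.sbt sketch; coordinates follow the POM above, actual publication is an assumption.
scalaVersion := "2.12.17"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "3.3.2" % "provided",
  "com.arangodb"     %% "arangodb-spark-datasource-3.3" % "1.4.3"
)
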
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
org.apache.spark.sql.arangodb.datasource.mapping.ArangoGeneratorProviderImpl
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
org.apache.spark.sql.arangodb.datasource.mapping.ArangoParserProviderImpl
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
com.arangodb.spark.DefaultSource
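
The three one-line files above look like Java ServiceLoader registrations (their paths are not shown in this view): the first two register the generator and parser providers, and the last one registers com.arangodb.spark.DefaultSource as a DataSourceRegister so Spark can resolve the format by its short name. A hedged sketch for verifying the registration from the driver classpath:

import java.util.ServiceLoader

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.DataSourceRegister

object ListDataSources {
  def main(args: Array[String]): Unit = {
    // "arangodb" should be listed when the connector jar is on the classpath.
    ServiceLoader.load(classOf[DataSourceRegister]).asScala
      .foreach(ds => println(ds.shortName()))
  }
}
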
Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
package com.arangodb.spark

import org.apache.spark.sql.arangodb.commons.{ArangoClient, ArangoDBConf}
import org.apache.spark.sql.arangodb.datasource.ArangoTable
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util

class DefaultSource extends TableProvider with DataSourceRegister {

  private def extractOptions(options: util.Map[String, String]): ArangoDBConf = {
    val opts: ArangoDBConf = ArangoDBConf(options)
    if (opts.driverOptions.acquireHostList) {
      val hosts = ArangoClient.acquireHostList(opts)
      opts.updated(ArangoDBConf.ENDPOINTS, hosts.mkString(","))
    } else {
      opts
    }
  }

  override def inferSchema(options: CaseInsensitiveStringMap): StructType = getTable(options).schema()

  private def getTable(options: CaseInsensitiveStringMap): Table =
    getTable(None, options.asCaseSensitiveMap()) // scalastyle:ignore null

  override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table =
    getTable(Option(schema), properties)

  override def supportsExternalMetadata(): Boolean = true

  override def shortName(): String = "arangodb"

  private def getTable(schema: Option[StructType], properties: util.Map[String, String]) =
    new ArangoTable(schema, extractOptions(properties))

}
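
With shortName() returning "arangodb" and supportsExternalMetadata() enabled, the source can be addressed by name, with or without a user-provided schema. A minimal read sketch; the option keys (endpoints, database, table, user, password) are assumptions based on the connector's documentation, not on this diff:

import org.apache.spark.sql.SparkSession

object ArangoReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("arangodb-read").getOrCreate()

    val df = spark.read
      .format("arangodb")                     // resolved via DataSourceRegister.shortName()
      .option("endpoints", "localhost:8529")  // assumed option keys, see note above
      .option("database", "_system")
      .option("table", "users")
      .option("user", "root")
      .option("password", "test")
      .load()                                 // schema inferred through TableProvider.inferSchema

    df.printSchema()
    df.show()
    spark.stop()
  }
}
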
Lines changed: 37 additions & 0 deletions
@@ -0,0 +1,37 @@
package org.apache.spark.sql.arangodb.datasource

import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ArangoUtils}
import org.apache.spark.sql.arangodb.datasource.reader.ArangoScanBuilder
import org.apache.spark.sql.arangodb.datasource.writer.ArangoWriterBuilder
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

import java.util
import scala.collection.JavaConverters.setAsJavaSetConverter

class ArangoTable(private var schemaOpt: Option[StructType], options: ArangoDBConf) extends Table with SupportsRead with SupportsWrite {
  private lazy val tableSchema = schemaOpt.getOrElse(ArangoUtils.inferSchema(options))

  override def name(): String = this.getClass.toString

  override def schema(): StructType = tableSchema

  override def capabilities(): util.Set[TableCapability] = Set(
    TableCapability.BATCH_READ,
    TableCapability.BATCH_WRITE,
    // TableCapability.STREAMING_WRITE,
    TableCapability.ACCEPT_ANY_SCHEMA,
    TableCapability.TRUNCATE
    // TableCapability.OVERWRITE_BY_FILTER,
    // TableCapability.OVERWRITE_DYNAMIC,
  ).asJava

  override def newScanBuilder(scanOptions: CaseInsensitiveStringMap): ScanBuilder =
    new ArangoScanBuilder(options.updated(ArangoDBConf(scanOptions)), schema())

  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder =
    new ArangoWriterBuilder(info.schema(), options.updated(ArangoDBConf(info.options())))
}
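
The table advertises BATCH_READ, BATCH_WRITE, ACCEPT_ANY_SCHEMA and TRUNCATE, so plain append writes work and overwrite-style writes can be satisfied by truncation. A hedged write sketch, reusing the assumed option keys from the read example:

import org.apache.spark.sql.{SaveMode, SparkSession}

object ArangoWriteExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("arangodb-write").getOrCreate()
    import spark.implicits._

    val df = Seq(("alice", 42), ("bob", 7)).toDF("name", "age")

    df.write
      .format("arangodb")
      .option("endpoints", "localhost:8529")  // assumed option keys, as in the read sketch
      .option("database", "_system")
      .option("table", "users_copy")
      .option("user", "root")
      .option("password", "test")
      .mode(SaveMode.Append)                  // BATCH_WRITE; TRUNCATE also covers overwrite-style writes
      .save()

    spark.stop()
  }
}
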
Lines changed: 46 additions & 0 deletions
@@ -0,0 +1,46 @@
package org.apache.spark.sql.arangodb.datasource.mapping

import com.arangodb.jackson.dataformat.velocypack.VPackFactoryBuilder
import com.fasterxml.jackson.core.JsonFactoryBuilder
import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.arangodb.commons.mapping.{ArangoGenerator, ArangoGeneratorProvider}
import org.apache.spark.sql.arangodb.datasource.mapping.json.{JSONOptions, JacksonGenerator}
import org.apache.spark.sql.types.{DataType, StructType}

import java.io.OutputStream

abstract sealed class ArangoGeneratorImpl(
    schema: DataType,
    writer: OutputStream,
    options: JSONOptions)
  extends JacksonGenerator(
    schema,
    options.buildJsonFactory().createGenerator(writer),
    options) with ArangoGenerator

class ArangoGeneratorProviderImpl extends ArangoGeneratorProvider {
  override def of(
      contentType: ContentType,
      schema: StructType,
      outputStream: OutputStream,
      conf: ArangoDBConf
  ): ArangoGeneratorImpl = contentType match {
    case ContentType.JSON => new JsonArangoGenerator(schema, outputStream, conf)
    case ContentType.VPACK => new VPackArangoGenerator(schema, outputStream, conf)
    case _ => throw new IllegalArgumentException
  }
}

class JsonArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
  extends ArangoGeneratorImpl(
    schema,
    outputStream,
    createOptions(new JsonFactoryBuilder().build(), conf)
  )

class VPackArangoGenerator(schema: StructType, outputStream: OutputStream, conf: ArangoDBConf)
  extends ArangoGeneratorImpl(
    schema,
    outputStream,
    createOptions(new VPackFactoryBuilder().build(), conf)
  )
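
The provider selects a JSON- or VelocyPack-backed generator from the content type; createOptions is expected to come from the enclosing mapping package (it is not part of this file). A usage sketch, under the assumption that the vendored JacksonGenerator keeps Spark's write/flush API for InternalRow:

import java.io.ByteArrayOutputStream
import java.util.Collections

import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.arangodb.datasource.mapping.ArangoGeneratorProviderImpl
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

object GeneratorSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("name", StringType), StructField("age", IntegerType)))
    val out = new ByteArrayOutputStream()
    val conf = ArangoDBConf(Collections.emptyMap[String, String]())

    // ContentType.VPACK would pick the VelocyPack factory instead.
    val gen = new ArangoGeneratorProviderImpl().of(ContentType.JSON, schema, out, conf)

    // Assumption: write(InternalRow) and flush() are inherited unchanged from Spark's JacksonGenerator.
    gen.write(InternalRow(UTF8String.fromString("alice"), 42))
    gen.flush()
    println(out.toString("UTF-8"))
  }
}
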
Lines changed: 47 additions & 0 deletions
@@ -0,0 +1,47 @@
package org.apache.spark.sql.arangodb.datasource.mapping

import com.arangodb.jackson.dataformat.velocypack.VPackFactoryBuilder
import com.fasterxml.jackson.core.json.JsonReadFeature
import com.fasterxml.jackson.core.{JsonFactory, JsonFactoryBuilder}
import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.arangodb.commons.mapping.{ArangoParser, ArangoParserProvider, MappingUtils}
import org.apache.spark.sql.arangodb.datasource.mapping.json.{JSONOptions, JacksonParser}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.DataType
import org.apache.spark.unsafe.types.UTF8String

abstract sealed class ArangoParserImpl(
    schema: DataType,
    options: JSONOptions,
    recordLiteral: Array[Byte] => UTF8String)
  extends JacksonParser(schema, options) with ArangoParser {
  override def parse(data: Array[Byte]): Iterable[InternalRow] = super.parse(
    data,
    (jsonFactory: JsonFactory, record: Array[Byte]) => jsonFactory.createParser(record),
    recordLiteral
  )
}

class ArangoParserProviderImpl extends ArangoParserProvider {
  override def of(contentType: ContentType, schema: DataType, conf: ArangoDBConf): ArangoParserImpl = contentType match {
    case ContentType.JSON => new JsonArangoParser(schema, conf)
    case ContentType.VPACK => new VPackArangoParser(schema, conf)
    case _ => throw new IllegalArgumentException
  }
}

class JsonArangoParser(schema: DataType, conf: ArangoDBConf)
  extends ArangoParserImpl(
    schema,
    createOptions(new JsonFactoryBuilder()
      .configure(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS, true)
      .build(), conf),
    (bytes: Array[Byte]) => UTF8String.fromBytes(bytes)
  )

class VPackArangoParser(schema: DataType, conf: ArangoDBConf)
  extends ArangoParserImpl(
    schema,
    createOptions(new VPackFactoryBuilder().build(), conf),
    (bytes: Array[Byte]) => UTF8String.fromString(MappingUtils.vpackToJson(bytes))
  )
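
The parser mirrors the generator: the provider returns a JSON- or VelocyPack-backed implementation, and parse() yields one InternalRow per document, with the record-literal function used to surface the raw record for corrupt input. A small sketch based only on the signatures shown here:

import java.nio.charset.StandardCharsets
import java.util.Collections

import org.apache.spark.sql.arangodb.commons.{ArangoDBConf, ContentType}
import org.apache.spark.sql.arangodb.datasource.mapping.ArangoParserProviderImpl
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ParserSketch {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("name", StringType)))
    val conf = ArangoDBConf(Collections.emptyMap[String, String]())

    // JSON-backed parser; the VPACK variant converts VelocyPack to JSON for the record literal.
    val parser = new ArangoParserProviderImpl().of(ContentType.JSON, schema, conf)

    val rows = parser.parse("""{"name":"alice"}""".getBytes(StandardCharsets.UTF_8))
    rows.foreach(row => println(row.getUTF8String(0)))   // prints: alice
  }
}
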
Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off

package org.apache.spark.sql.arangodb.datasource.mapping.json

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.hadoop.io.Text
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.unsafe.types.UTF8String
import sun.nio.cs.StreamDecoder

import java.io.{ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.channels.Channels
import java.nio.charset.{Charset, StandardCharsets}

private[sql] object CreateJacksonParser extends Serializable {
  def string(jsonFactory: JsonFactory, record: String): JsonParser = {
    jsonFactory.createParser(record)
  }

  def utf8String(jsonFactory: JsonFactory, record: UTF8String): JsonParser = {
    val bb = record.getByteBuffer
    assert(bb.hasArray)

    val bain = new ByteArrayInputStream(
      bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())

    jsonFactory.createParser(new InputStreamReader(bain, StandardCharsets.UTF_8))
  }

  def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
    jsonFactory.createParser(record.getBytes, 0, record.getLength)
  }

  // Jackson parsers can be ranked according to their performance:
  // 1. Array based with actual encoding UTF-8 in the array. This is the fastest parser
  //    but it doesn't allow to set encoding explicitly. Actual encoding is detected automatically
  //    by checking leading bytes of the array.
  // 2. InputStream based with actual encoding UTF-8 in the stream. Encoding is detected
  //    automatically by analyzing first bytes of the input stream.
  // 3. Reader based parser. This is the slowest parser used here but it allows to create
  //    a reader with specific encoding.
  // The method creates a reader for an array with given encoding and sets size of internal
  // decoding buffer according to size of input array.
  private def getStreamDecoder(enc: String, in: Array[Byte], length: Int): StreamDecoder = {
    val bais = new ByteArrayInputStream(in, 0, length)
    val byteChannel = Channels.newChannel(bais)
    val decodingBufferSize = Math.min(length, 8192)
    val decoder = Charset.forName(enc).newDecoder()

    StreamDecoder.forDecoder(byteChannel, decoder, decodingBufferSize)
  }

  def text(enc: String, jsonFactory: JsonFactory, record: Text): JsonParser = {
    val sd = getStreamDecoder(enc, record.getBytes, record.getLength)
    jsonFactory.createParser(sd)
  }

  def inputStream(jsonFactory: JsonFactory, is: InputStream): JsonParser = {
    jsonFactory.createParser(is)
  }

  def inputStream(enc: String, jsonFactory: JsonFactory, is: InputStream): JsonParser = {
    jsonFactory.createParser(new InputStreamReader(is, enc))
  }

  def internalRow(jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
    val ba = row.getBinary(0)

    jsonFactory.createParser(ba, 0, ba.length)
  }

  def internalRow(enc: String, jsonFactory: JsonFactory, row: InternalRow): JsonParser = {
    val binary = row.getBinary(0)
    val sd = getStreamDecoder(enc, binary, binary.length)

    jsonFactory.createParser(sd)
  }
}
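
CreateJacksonParser is private[sql], so it is only reachable from code compiled under org.apache.spark.sql. An illustrative sketch of the UTF-8 array-backed path versus the reader-based path with an explicit charset, placed in a hypothetical sub-package purely to satisfy that access modifier:

package org.apache.spark.sql.arangodb.example // hypothetical package, only to satisfy private[sql]

import java.io.ByteArrayInputStream

import com.fasterxml.jackson.core.JsonFactory
import org.apache.spark.sql.arangodb.datasource.mapping.json.CreateJacksonParser
import org.apache.spark.unsafe.types.UTF8String

object JacksonParserPaths {
  def main(args: Array[String]): Unit = {
    val factory = new JsonFactory()

    // UTF-8 text read straight from the UTF8String's backing array (no copy of the bytes).
    val p1 = CreateJacksonParser.utf8String(factory, UTF8String.fromString("""{"a":1}"""))
    println(p1.nextToken())   // START_OBJECT

    // Reader-based path for an explicit, non-UTF-8 charset.
    val utf16 = """{"a":1}""".getBytes("UTF-16LE")
    val p2 = CreateJacksonParser.inputStream("UTF-16LE", factory, new ByteArrayInputStream(utf16))
    println(p2.nextToken())   // START_OBJECT
  }
}
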
