From b2e12fe4b24e5df671d3c5f4d3ff4c8f1ba72ccd Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Thu, 30 Dec 2021 12:51:36 +0100 Subject: [PATCH 01/17] fixed ComparingUnrelatedTypes --- .../apache/spark/sql/arangodb/datasource/BaseSparkTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala index 6e52835c..ff8d417e 100644 --- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala +++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/datasource/BaseSparkTest.scala @@ -186,7 +186,7 @@ object BaseSparkTest { arangoDB.getUser(user) } catch { case e: ArangoDBException => - if (e.getResponseCode == 404 && e.getErrorNum == 1703) + if (e.getResponseCode.toInt == 404 && e.getErrorNum.toInt == 1703) arangoDB.createUser(user, password) else throw e } From 3195729391a34ec3ee66cfcbc8c868a254627a19 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Thu, 30 Dec 2021 12:57:15 +0100 Subject: [PATCH 02/17] fixed FinalModifierOnCaseClass --- .../apache/spark/sql/arangodb/examples/DataTypesExample.scala | 2 +- .../apache/spark/sql/arangodb/examples/PushdownExample.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/examples/DataTypesExample.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/examples/DataTypesExample.scala index 94151b86..ea23433b 100644 --- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/examples/DataTypesExample.scala +++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/examples/DataTypesExample.scala @@ -7,7 +7,7 @@ import java.time.{LocalDate, LocalDateTime} object DataTypesExample { - case class Order( + final case class Order( userId: String, price: Double, shipped: Boolean, diff --git a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/examples/PushdownExample.scala b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/examples/PushdownExample.scala index bacbec02..b35fcc71 100644 --- a/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/examples/PushdownExample.scala +++ b/integration-tests/src/test/scala/org/apache/spark/sql/arangodb/examples/PushdownExample.scala @@ -5,7 +5,7 @@ import org.apache.spark.sql.{Dataset, Encoders, SparkSession} object PushdownExample { - case class User(name: String, age: Int) + final case class User(name: String, age: Int) def main(args: Array[String]): Unit = { prepareDB() From 7b5fd919876fcd0cbfbd962c867d23f508db9c0c Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Thu, 30 Dec 2021 14:05:27 +0100 Subject: [PATCH 03/17] fixed EmptyInterpolatedString --- .../sql/arangodb/commons/filter/EqualToFilterTest.scala | 6 +++--- .../sql/arangodb/commons/filter/GreaterThanFilterTest.scala | 6 +++--- .../commons/filter/GreaterThanOrEqualFilterTest.scala | 6 +++--- .../spark/sql/arangodb/commons/filter/IsNullTest.scala | 4 ++-- .../sql/arangodb/commons/filter/LessThanFilterTest.scala | 6 +++--- .../arangodb/commons/filter/LessThanOrEqualFilterTest.scala | 6 +++--- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/EqualToFilterTest.scala 
b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/EqualToFilterTest.scala index 7d80b091..94eabab9 100644 --- a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/EqualToFilterTest.scala +++ b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/EqualToFilterTest.scala @@ -133,7 +133,7 @@ class EqualToFilterTest { val value = Seq("a", "b", "c") val filter = PushableFilter(EqualTo(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`stringArray` == ["a","b","c"]""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`stringArray` == ["a","b","c"]""") } @Test @@ -154,7 +154,7 @@ class EqualToFilterTest { ) val filter = PushableFilter(EqualTo(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`intMap` == {"a":1,"b":2,"c":3}""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`intMap` == {"a":1,"b":2,"c":3}""") } @Test @@ -181,7 +181,7 @@ class EqualToFilterTest { ) val filter = PushableFilter(EqualTo(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`struct` == {"a":"str","b":22}""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`struct` == {"a":"str","b":22}""") } @Test diff --git a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/GreaterThanFilterTest.scala b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/GreaterThanFilterTest.scala index bd1e9f42..38d863e0 100644 --- a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/GreaterThanFilterTest.scala +++ b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/GreaterThanFilterTest.scala @@ -126,7 +126,7 @@ class GreaterThanFilterTest { val value = Seq("a", "b", "c") val filter = PushableFilter(GreaterThan(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`array` > ["a","b","c"]""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`array` > ["a","b","c"]""") } @Test @@ -135,7 +135,7 @@ class GreaterThanFilterTest { val value = Map("a" -> 1, "b" -> 2, "c" -> 3) val filter = PushableFilter(GreaterThan(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`intMap` > {"a":1,"b":2,"c":3}""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`intMap` > {"a":1,"b":2,"c":3}""") } @Test @@ -150,7 +150,7 @@ class GreaterThanFilterTest { ) val filter = PushableFilter(GreaterThan(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`struct` > {"a":"str","b":22}""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`struct` > {"a":"str","b":22}""") } } diff --git a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/GreaterThanOrEqualFilterTest.scala b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/GreaterThanOrEqualFilterTest.scala index e412df3f..7961f334 100644 --- a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/GreaterThanOrEqualFilterTest.scala +++ 
b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/GreaterThanOrEqualFilterTest.scala @@ -117,7 +117,7 @@ class GreaterThanOrEqualFilterTest { val value = Seq("a", "b", "c") val filter = PushableFilter(GreaterThanOrEqual(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`array` >= ["a","b","c"]""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`array` >= ["a","b","c"]""") } @Test @@ -126,7 +126,7 @@ class GreaterThanOrEqualFilterTest { val value = Map("a" -> 1, "b" -> 2, "c" -> 3) val filter = PushableFilter(GreaterThanOrEqual(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`intMap` >= {"a":1,"b":2,"c":3}""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`intMap` >= {"a":1,"b":2,"c":3}""") } @Test @@ -141,7 +141,7 @@ class GreaterThanOrEqualFilterTest { ) val filter = PushableFilter(GreaterThanOrEqual(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`struct` >= {"a":"str","b":22}""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`struct` >= {"a":"str","b":22}""") } } diff --git a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/IsNullTest.scala b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/IsNullTest.scala index 32f3ca50..d509a525 100644 --- a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/IsNullTest.scala +++ b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/IsNullTest.scala @@ -16,14 +16,14 @@ class IsNullTest { def isNull(): Unit = { val isNullFilter = PushableFilter(IsNull("a.b"), schema) assertThat(isNullFilter.support()).isEqualTo(FilterSupport.FULL) - assertThat(isNullFilter.aql("d")).isEqualTo(s"""`d`.`a`.`b` == null""") + assertThat(isNullFilter.aql("d")).isEqualTo("""`d`.`a`.`b` == null""") } @Test def isNotNull(): Unit = { val isNotNullFilter = PushableFilter(IsNotNull("a.b"), schema) assertThat(isNotNullFilter.support()).isEqualTo(FilterSupport.FULL) - assertThat(isNotNullFilter.aql("d")).isEqualTo(s"""`d`.`a`.`b` != null""") + assertThat(isNotNullFilter.aql("d")).isEqualTo("""`d`.`a`.`b` != null""") } } diff --git a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/LessThanFilterTest.scala b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/LessThanFilterTest.scala index 04f0e84d..fd11179c 100644 --- a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/LessThanFilterTest.scala +++ b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/LessThanFilterTest.scala @@ -126,7 +126,7 @@ class LessThanFilterTest { val value = Seq("a", "b", "c") val filter = PushableFilter(LessThan(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`array` < ["a","b","c"]""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`array` < ["a","b","c"]""") } @Test @@ -135,7 +135,7 @@ class LessThanFilterTest { val value = Map("a" -> 1, "b" -> 2, "c" -> 3) val filter = PushableFilter(LessThan(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`intMap` < 
{"a":1,"b":2,"c":3}""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`intMap` < {"a":1,"b":2,"c":3}""") } @Test @@ -150,7 +150,7 @@ class LessThanFilterTest { ) val filter = PushableFilter(LessThan(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`struct` < {"a":"str","b":22}""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`struct` < {"a":"str","b":22}""") } } diff --git a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/LessThanOrEqualFilterTest.scala b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/LessThanOrEqualFilterTest.scala index 64acffd5..8f14f15d 100644 --- a/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/LessThanOrEqualFilterTest.scala +++ b/arangodb-spark-commons/src/test/scala/org/apache/spark/sql/arangodb/commons/filter/LessThanOrEqualFilterTest.scala @@ -126,7 +126,7 @@ class LessThanOrEqualFilterTest { val value = Seq("a", "b", "c") val filter = PushableFilter(LessThanOrEqual(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`array` <= ["a","b","c"]""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`array` <= ["a","b","c"]""") } @Test @@ -135,7 +135,7 @@ class LessThanOrEqualFilterTest { val value = Map("a" -> 1, "b" -> 2, "c" -> 3) val filter = PushableFilter(LessThanOrEqual(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`intMap` <= {"a":1,"b":2,"c":3}""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`intMap` <= {"a":1,"b":2,"c":3}""") } @Test @@ -150,7 +150,7 @@ class LessThanOrEqualFilterTest { ) val filter = PushableFilter(LessThanOrEqual(field, value), schema: StructType) assertThat(filter.support()).isEqualTo(FilterSupport.FULL) - assertThat(filter.aql("d")).isEqualTo(s"""`d`.`struct` <= {"a":"str","b":22}""") + assertThat(filter.aql("d")).isEqualTo("""`d`.`struct` <= {"a":"str","b":22}""") } } From adedad310b86eba0a6d31aa29f28f6f7a12b0a1a Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Thu, 30 Dec 2021 15:36:31 +0100 Subject: [PATCH 04/17] disabled scapegoat for tests --- pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/pom.xml b/pom.xml index 05df196f..6c5a66ef 100644 --- a/pom.xml +++ b/pom.xml @@ -280,6 +280,7 @@ -target:jvm-1.8 -P:scapegoat:dataDir:${project.build.directory}/scapegoat -P:scapegoat:overrideLevels:all=Warning + -P:scapegoat:ignoredFiles:.*/src/test/scala/.* From a8121088becc152ae744e30603e8c1018a31062a Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Thu, 30 Dec 2021 15:47:21 +0100 Subject: [PATCH 05/17] disabled HeaderMatchesChecker --- lib/scalastyle_config.xml | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/lib/scalastyle_config.xml b/lib/scalastyle_config.xml index 0b67cca0..635b5717 100644 --- a/lib/scalastyle_config.xml +++ b/lib/scalastyle_config.xml @@ -6,25 +6,6 @@ - - - - - From 5facd82a5ff7e2f79d41b0b06ad9df79514d5bb8 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Thu, 30 Dec 2021 15:55:24 +0100 Subject: [PATCH 06/17] fixed: If block needs braces --- .../sql/arangodb/commons/ArangoClient.scala | 7 +++-- .../sql/arangodb/commons/ArangoDBConf.scala | 10 ++++--- .../sql/arangodb/commons/ArangoUtils.scala | 5 ++-- .../commons/filter/PushableFilter.scala | 27 +++++++++++++------ 4 files changed, 34 
insertions(+), 15 deletions(-) diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala index 4d519986..677710e5 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala @@ -175,8 +175,11 @@ object ArangoClient { case e: ArangoDBException => // single server < 3.8 returns Response: 500, Error: 4 - internal error // single server >= 3.8 returns Response: 501, Error: 9 - shards API is only available in a cluster - if (e.getErrorNum == 9 || e.getErrorNum == 4) Array("") - else throw e + if (e.getErrorNum == 9 || e.getErrorNum == 4) { + Array("") + } else { + throw e + } } } diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala index 5c6c4557..7bfbaf7c 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala @@ -480,9 +480,13 @@ class ArangoDBReadConf(opts: Map[String, String]) extends ArangoDBConf(opts) { val columnNameOfCorruptRecord: String = getConf(columnNameOfCorruptRecordConf).getOrElse("") val readMode: ReadMode = - if (query.isDefined) ReadMode.Query - else if (collection.isDefined) ReadMode.Collection - else throw new IllegalArgumentException("Either collection or query must be defined") + if (query.isDefined) { + ReadMode.Query + } else if (collection.isDefined) { + ReadMode.Collection + } else { + throw new IllegalArgumentException("Either collection or query must be defined") + } } diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoUtils.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoUtils.scala index 073520f5..9addacc0 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoUtils.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoUtils.scala @@ -22,10 +22,11 @@ object ArangoUtils { .json(spark.createDataset(sampleEntries)(Encoders.STRING)) .schema - if (options.readOptions.columnNameOfCorruptRecord.isEmpty) + if (options.readOptions.columnNameOfCorruptRecord.isEmpty) { schema - else + } else { schema.add(StructField(options.readOptions.columnNameOfCorruptRecord, StringType, nullable = true)) + } } } diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala index 3d0bc677..616b2ddb 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala @@ -48,9 +48,13 @@ private class OrFilter(parts: PushableFilter*) extends PushableFilter { * +---------++---------+---------+------+ */ override def support(): FilterSupport = - if (parts.exists(_.support == FilterSupport.NONE)) FilterSupport.NONE - else if (parts.forall(_.support == FilterSupport.FULL)) FilterSupport.FULL - else FilterSupport.PARTIAL + if 
(parts.exists(_.support == FilterSupport.NONE)) { + FilterSupport.NONE + } else if (parts.forall(_.support == FilterSupport.FULL)) { + FilterSupport.FULL + } else { + FilterSupport.PARTIAL + } override def aql(v: String): String = parts .map(_.aql(v)) @@ -69,9 +73,13 @@ private class AndFilter(parts: PushableFilter*) extends PushableFilter { * +---------++---------+---------+---------+ */ override def support(): FilterSupport = - if (parts.forall(_.support == FilterSupport.NONE)) FilterSupport.NONE - else if (parts.forall(_.support == FilterSupport.FULL)) FilterSupport.FULL - else FilterSupport.PARTIAL + if (parts.forall(_.support == FilterSupport.NONE)) { + FilterSupport.NONE + } else if (parts.forall(_.support == FilterSupport.FULL)) { + FilterSupport.FULL + } else { + FilterSupport.PARTIAL + } override def aql(v: String): String = parts .filter(_.support() != FilterSupport.NONE) @@ -91,8 +99,11 @@ private class NotFilter(child: PushableFilter) extends PushableFilter { * +---------++---------+ */ override def support(): FilterSupport = - if (child.support() == FilterSupport.FULL) FilterSupport.FULL - else FilterSupport.NONE + if (child.support() == FilterSupport.FULL) { + FilterSupport.FULL + } else { + FilterSupport.NONE + } override def aql(v: String): String = s"NOT (${child.aql(v)})" } From 4f9c5d2eaf00ee88c8850f8908dcd1778aaa7459 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Thu, 30 Dec 2021 16:22:14 +0100 Subject: [PATCH 07/17] commons scalastyle fixes --- .../sql/arangodb/commons/ArangoDBConf.scala | 11 ++++--- .../commons/filter/PushableFilter.scala | 32 +++++++++++-------- .../spark/sql/arangodb/commons/package.scala | 6 ++-- .../arangodb/commons/utils/PushDownCtx.scala | 2 +- lib/scalastyle_config.xml | 2 -- 5 files changed, 29 insertions(+), 24 deletions(-) diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala index 7bfbaf7c..79cee131 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala @@ -110,10 +110,11 @@ object ArangoDBConf { .createOptional val BATCH_SIZE = "batchSize" + val DEFAULT_BATCH_SIZE = 10000 val batchSizeConf: ConfigEntry[Int] = ConfigBuilder(BATCH_SIZE) .doc("batch size") .intConf - .createWithDefault(10000) + .createWithDefault(DEFAULT_BATCH_SIZE) val QUERY = "query" val queryConf: OptionalConfigEntry[String] = ConfigBuilder(QUERY) @@ -122,10 +123,11 @@ object ArangoDBConf { .createOptional val SAMPLE_SIZE = "sampleSize" + val DEFAULT_SAMPLE_SIZE = 1000 val sampleSizeConf: ConfigEntry[Int] = ConfigBuilder(SAMPLE_SIZE) .doc("sample size prefetched for schema inference") .intConf - .createWithDefault(1000) + .createWithDefault(DEFAULT_SAMPLE_SIZE) val FILL_BLOCK_CACHE = "fillBlockCache" val fillBlockCacheConf: ConfigEntry[Boolean] = ConfigBuilder(FILL_BLOCK_CACHE) @@ -153,10 +155,11 @@ object ArangoDBConf { .createOptional val NUMBER_OF_SHARDS = "table.shards" + val DEFAULT_NUMBER_OF_SHARDS = 1 val numberOfShardsConf: ConfigEntry[Int] = ConfigBuilder(NUMBER_OF_SHARDS) .doc("number of shards of the created collection (in case of SaveMode Append or Overwrite)") .intConf - .createWithDefault(1) + .createWithDefault(DEFAULT_NUMBER_OF_SHARDS) val COLLECTION_TYPE = "table.type" val collectionTypeConf: ConfigEntry[String] = ConfigBuilder(COLLECTION_TYPE) @@ 
-515,4 +518,4 @@ class ArangoDBWriteConf(opts: Map[String, String]) extends ArangoDBConf(opts) { val keepNull: Boolean = getConf(keepNullConf) -} \ No newline at end of file +} diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala index 616b2ddb..e970dbbc 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala @@ -2,7 +2,7 @@ package org.apache.spark.sql.arangodb.commons.filter import org.apache.spark.sql.arangodb.commons.PushdownUtils.getStructField import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DateType, StringType, StructType, TimestampType} +import org.apache.spark.sql.types.{DataType, DateType, StringType, StructType, TimestampType} sealed trait PushableFilter extends Serializable { def support(): FilterSupport @@ -117,14 +117,14 @@ private class EqualToFilter(attribute: String, value: Any, schema: StructType) e override def support(): FilterSupport = dataType match { case _: DateType => FilterSupport.FULL case _: TimestampType => FilterSupport.PARTIAL // microseconds are ignored in AQL - case t if isTypeAqlCompatible(t) => FilterSupport.FULL + case t: DataType if isTypeAqlCompatible(t) => FilterSupport.FULL case _ => FilterSupport.NONE } override def aql(v: String): String = dataType match { case t: DateType => s"""DATE_TIMESTAMP(`$v`.$escapedFieldName) == DATE_TIMESTAMP(${getValue(t, value)})""" case t: TimestampType => s"""DATE_TIMESTAMP(`$v`.$escapedFieldName) == DATE_TIMESTAMP(${getValue(t, value)})""" - case t => s"""`$v`.$escapedFieldName == ${getValue(t, value)}""" + case t: DataType => s"""`$v`.$escapedFieldName == ${getValue(t, value)}""" } } @@ -137,14 +137,14 @@ private class GreaterThanFilter(attribute: String, value: Any, schema: StructTyp override def support(): FilterSupport = dataType match { case _: DateType => FilterSupport.FULL case _: TimestampType => FilterSupport.PARTIAL // microseconds are ignored in AQL - case t if isTypeAqlCompatible(t) => FilterSupport.FULL + case t: DataType if isTypeAqlCompatible(t) => FilterSupport.FULL case _ => FilterSupport.NONE } override def aql(v: String): String = dataType match { case t: DateType => s"""DATE_TIMESTAMP(`$v`.$escapedFieldName) > DATE_TIMESTAMP(${getValue(t, value)})""" case t: TimestampType => s"""DATE_TIMESTAMP(`$v`.$escapedFieldName) >= DATE_TIMESTAMP(${getValue(t, value)})""" // microseconds are ignored in AQL - case t => s"""`$v`.$escapedFieldName > ${getValue(t, value)}""" + case t: DataType => s"""`$v`.$escapedFieldName > ${getValue(t, value)}""" } } @@ -157,14 +157,14 @@ private class GreaterThanOrEqualFilter(attribute: String, value: Any, schema: St override def support(): FilterSupport = dataType match { case _: DateType => FilterSupport.FULL case _: TimestampType => FilterSupport.PARTIAL // microseconds are ignored in AQL - case t if isTypeAqlCompatible(t) => FilterSupport.FULL + case t: DataType if isTypeAqlCompatible(t) => FilterSupport.FULL case _ => FilterSupport.NONE } override def aql(v: String): String = dataType match { case t: DateType => s"""DATE_TIMESTAMP(`$v`.$escapedFieldName) >= DATE_TIMESTAMP(${getValue(t, value)})""" case t: TimestampType => s"""DATE_TIMESTAMP(`$v`.$escapedFieldName) >= DATE_TIMESTAMP(${getValue(t, value)})""" - 
case t => s"""`$v`.$escapedFieldName >= ${getValue(t, value)}""" + case t: DataType => s"""`$v`.$escapedFieldName >= ${getValue(t, value)}""" } } @@ -177,14 +177,14 @@ private class LessThanFilter(attribute: String, value: Any, schema: StructType) override def support(): FilterSupport = dataType match { case _: DateType => FilterSupport.FULL case _: TimestampType => FilterSupport.PARTIAL // microseconds are ignored in AQL - case t if isTypeAqlCompatible(t) => FilterSupport.FULL + case t: DataType if isTypeAqlCompatible(t) => FilterSupport.FULL case _ => FilterSupport.NONE } override def aql(v: String): String = dataType match { case t: DateType => s"""DATE_TIMESTAMP(`$v`.$escapedFieldName) < DATE_TIMESTAMP(${getValue(t, value)})""" case t: TimestampType => s"""DATE_TIMESTAMP(`$v`.$escapedFieldName) <= DATE_TIMESTAMP(${getValue(t, value)})""" // microseconds are ignored in AQL - case t => s"""`$v`.$escapedFieldName < ${getValue(t, value)}""" + case t: DataType => s"""`$v`.$escapedFieldName < ${getValue(t, value)}""" } } @@ -197,14 +197,14 @@ private class LessThanOrEqualFilter(attribute: String, value: Any, schema: Struc override def support(): FilterSupport = dataType match { case _: DateType => FilterSupport.FULL case _: TimestampType => FilterSupport.PARTIAL // microseconds are ignored in AQL - case t if isTypeAqlCompatible(t) => FilterSupport.FULL + case t: DataType if isTypeAqlCompatible(t) => FilterSupport.FULL case _ => FilterSupport.NONE } override def aql(v: String): String = dataType match { case t: DateType => s"""DATE_TIMESTAMP(`$v`.$escapedFieldName) <= DATE_TIMESTAMP(${getValue(t, value)})""" case t: TimestampType => s"""DATE_TIMESTAMP(`$v`.$escapedFieldName) <= DATE_TIMESTAMP(${getValue(t, value)})""" - case t => s"""`$v`.$escapedFieldName <= ${getValue(t, value)}""" + case t: DataType => s"""`$v`.$escapedFieldName <= ${getValue(t, value)}""" } } @@ -285,12 +285,16 @@ private class InFilter(attribute: String, values: Array[Any], schema: StructType override def support(): FilterSupport = dataType match { case _: DateType => FilterSupport.FULL case _: TimestampType => FilterSupport.PARTIAL // microseconds are ignored in AQL - case t if isTypeAqlCompatible(t) => FilterSupport.FULL + case t: DataType if isTypeAqlCompatible(t) => FilterSupport.FULL case _ => FilterSupport.NONE } override def aql(v: String): String = dataType match { - case _: TimestampType | DateType => s"""LENGTH([${values.map(getValue(dataType, _)).mkString(",")}][* FILTER DATE_TIMESTAMP(`$v`.$escapedFieldName) == DATE_TIMESTAMP(CURRENT)]) > 0""" - case _ => s"""LENGTH([${values.map(getValue(dataType, _)).mkString(",")}][* FILTER `$v`.$escapedFieldName == CURRENT]) > 0""" + case _: TimestampType | DateType => s"""LENGTH([${ + values.map(getValue(dataType, _)).mkString(",") + }][* FILTER DATE_TIMESTAMP(`$v`.$escapedFieldName) == DATE_TIMESTAMP(CURRENT)]) > 0""" + case _ => s"""LENGTH([${ + values.map(getValue(dataType, _)).mkString(",") + }][* FILTER `$v`.$escapedFieldName == CURRENT]) > 0""" } } diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/package.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/package.scala index 041973de..80cbc12d 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/package.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/package.scala @@ -86,13 +86,13 @@ object CollectionType { case object DOCUMENT extends CollectionType { override val 
name: String = "document" - override def get() = entity.CollectionType.DOCUMENT + override def get(): entity.CollectionType = entity.CollectionType.DOCUMENT } case object EDGE extends CollectionType { override val name: String = "edge" - override def get() = entity.CollectionType.EDGES + override def get(): entity.CollectionType = entity.CollectionType.EDGES } def apply(value: String): CollectionType = value match { @@ -100,4 +100,4 @@ object CollectionType { case EDGE.name => EDGE case _ => throw new IllegalArgumentException(s"${ArangoDBConf.COLLECTION_TYPE}: $value") } -} \ No newline at end of file +} diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/utils/PushDownCtx.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/utils/PushDownCtx.scala index 8e94bc7e..dfd0291b 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/utils/PushDownCtx.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/utils/PushDownCtx.scala @@ -10,4 +10,4 @@ class PushDownCtx( // filters to push down val filters: Array[PushableFilter] ) - extends Serializable \ No newline at end of file + extends Serializable diff --git a/lib/scalastyle_config.xml b/lib/scalastyle_config.xml index 635b5717..33d9cbbb 100644 --- a/lib/scalastyle_config.xml +++ b/lib/scalastyle_config.xml @@ -111,7 +111,6 @@ - @@ -119,5 +118,4 @@ - \ No newline at end of file From 3c3153f985857cff157fb7b320d1fe421ca5550a Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Thu, 30 Dec 2021 19:26:25 +0100 Subject: [PATCH 08/17] commons scalastyle fixes --- .../apache/spark/sql/arangodb/commons/ArangoClient.scala | 2 +- .../apache/spark/sql/arangodb/commons/ArangoDBConf.scala | 6 +++--- .../spark/sql/arangodb/commons/filter/PushableFilter.scala | 2 ++ lib/scalastyle_config.xml | 6 ------ 4 files changed, 6 insertions(+), 10 deletions(-) diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala index 677710e5..72fca82d 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala @@ -195,4 +195,4 @@ object ArangoClient { res } -} \ No newline at end of file +} diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala index 79cee131..6869d3cc 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoDBConf.scala @@ -354,7 +354,7 @@ class ArangoDBConf(opts: Map[String, String]) extends Serializable with Logging */ def getAllDefinedConfigs: Seq[(String, String, String)] = confEntries.values.filter(_.isPublic).map { entry => - val displayValue = Option(getConfString(entry.key, null)).getOrElse(entry.defaultValueString) + val displayValue = settings.get(entry.key).getOrElse(entry.defaultValueString) (entry.key, displayValue, entry.doc) }.toSeq @@ -446,12 +446,12 @@ class ArangoDBDriverConf(opts: Map[String, String]) extends ArangoDBConf(opts) { val is = new ByteArrayInputStream(Base64.getDecoder.decode(b64cert)) val cert = 
CertificateFactory.getInstance(sslCertType).generateCertificate(is) val ks = KeyStore.getInstance(sslKeystoreType) - ks.load(null) + ks.load(null) // scalastyle:ignore null ks.setCertificateEntry(sslCertAlias, cert) val tmf = TrustManagerFactory.getInstance(sslAlgorithm) tmf.init(ks) val sc = SSLContext.getInstance(sslProtocol) - sc.init(null, tmf.getTrustManagers, null) + sc.init(null, tmf.getTrustManagers, null) // scalastyle:ignore null sc case None => SSLContext.getDefault } diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala index e970dbbc..7f5db8b3 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/filter/PushableFilter.scala @@ -11,6 +11,7 @@ sealed trait PushableFilter extends Serializable { } object PushableFilter { + // scalastyle:off cyclomatic.complexity def apply(filter: Filter, schema: StructType): PushableFilter = filter match { // @formatter:off case f: And => new AndFilter(apply(f.left, schema), apply(f.right, schema)) @@ -34,6 +35,7 @@ object PushableFilter { } // @formatter:on } + // scalastyle:on cyclomatic.complexity } private class OrFilter(parts: PushableFilter*) extends PushableFilter { diff --git a/lib/scalastyle_config.xml b/lib/scalastyle_config.xml index 33d9cbbb..f8e3e182 100644 --- a/lib/scalastyle_config.xml +++ b/lib/scalastyle_config.xml @@ -112,10 +112,4 @@ - - - - - - \ No newline at end of file From 073403455ca1203697bf42f340032a7d352f19fb Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Thu, 30 Dec 2021 19:43:08 +0100 Subject: [PATCH 09/17] datasource-3.1 scalastyle fixes --- .../main/scala/com/arangodb/spark/DefaultSource.scala | 3 ++- .../datasource/mapping/json/CreateJacksonParser.scala | 2 ++ .../datasource/mapping/json/JacksonGenerator.scala | 2 ++ .../arangodb/datasource/mapping/json/JacksonParser.scala | 2 ++ .../arangodb/datasource/mapping/json/JacksonUtils.scala | 2 ++ .../arangodb/datasource/mapping/json/JsonFilters.scala | 2 +- .../datasource/mapping/json/JsonInferSchema.scala | 2 ++ .../reader/ArangoCollectionPartitionReader.scala | 7 +++++-- .../datasource/reader/ArangoPartitionReaderFactory.scala | 2 +- .../arangodb/datasource/reader/ArangoQueryReader.scala | 7 +++++-- .../sql/arangodb/datasource/reader/ArangoScan.scala | 2 +- .../arangodb/datasource/reader/ArangoScanBuilder.scala | 9 ++++++--- .../arangodb/datasource/writer/ArangoDataWriter.scala | 9 ++++++--- .../arangodb/datasource/writer/ArangoWriterBuilder.scala | 8 +++++--- 14 files changed, 42 insertions(+), 17 deletions(-) diff --git a/arangodb-spark-datasource-3.1/src/main/scala/com/arangodb/spark/DefaultSource.scala b/arangodb-spark-datasource-3.1/src/main/scala/com/arangodb/spark/DefaultSource.scala index 73678489..94bcecd4 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/com/arangodb/spark/DefaultSource.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/com/arangodb/spark/DefaultSource.scala @@ -26,7 +26,8 @@ class DefaultSource extends TableProvider with DataSourceRegister { override def inferSchema(options: CaseInsensitiveStringMap): StructType = getTable(options).schema() - private def getTable(options: CaseInsensitiveStringMap): Table = getTable(null, null, options.asCaseSensitiveMap()) + private def getTable(options: 
CaseInsensitiveStringMap): Table = + getTable(null, null, options.asCaseSensitiveMap()) // scalastyle:ignore null override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = { if (table == null) { diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/CreateJacksonParser.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/CreateJacksonParser.scala index a58d4bc2..0fa095f1 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/CreateJacksonParser.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/CreateJacksonParser.scala @@ -15,6 +15,8 @@ * limitations under the License. */ +// scalastyle:off + package org.apache.spark.sql.arangodb.datasource.mapping.json import com.fasterxml.jackson.core.{JsonFactory, JsonParser} diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonGenerator.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonGenerator.scala index cff20cec..6c12f01c 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonGenerator.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonGenerator.scala @@ -15,6 +15,8 @@ * limitations under the License. */ +// scalastyle:off + package org.apache.spark.sql.arangodb.datasource.mapping.json import com.fasterxml.jackson.core._ diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonParser.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonParser.scala index e448fc87..c74c6e66 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonParser.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonParser.scala @@ -15,6 +15,8 @@ * limitations under the License. */ +// scalastyle:off + package org.apache.spark.sql.arangodb.datasource.mapping.json import com.fasterxml.jackson.core._ diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonUtils.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonUtils.scala index 3f2ea389..59249701 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonUtils.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonUtils.scala @@ -15,6 +15,8 @@ * limitations under the License. 
*/ +// scalastyle:off + package org.apache.spark.sql.arangodb.datasource.mapping.json import com.fasterxml.jackson.core.{JsonParser, JsonToken} diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonFilters.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonFilters.scala index dda7d56c..d48aaa31 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonFilters.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonFilters.scala @@ -106,7 +106,7 @@ class JsonFilters(pushedFilters: Seq[sources.Filter], schema: StructType) val withLiterals: Map[Set[String], JsonPredicate] = groupedByRefSet.map { case (refSet, pred) if refSet.isEmpty => (schema.fields.map(_.name).toSet, pred.copy(totalRefs = 1)) - case others => others + case others: (Set[String], JsonPredicate) => others } // Build a map where key is only one field and value is seq of predicates refer to the field // "i" -> Seq(AlwaysTrue, IsNotNull("i"), Or(EqualTo("i", 0), StringStartsWith("s", "abc"))) diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonInferSchema.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonInferSchema.scala index da2dd758..24dddda7 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonInferSchema.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonInferSchema.scala @@ -15,6 +15,8 @@ * limitations under the License. 
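// A minimal sketch (hypothetical object) of the scalastyle pragmas this patch
// series relies on; every pragma form below appears verbatim in the patches,
// while the surrounding declarations are illustrative stand-ins only.
object ScalastylePragmas {
  // File-wide suppression, as added to the mapping/json sources copied from
  // Apache Spark — the upstream code is kept unchanged instead of restyled:
  // scalastyle:off
  // ... vendored upstream code ...
  // scalastyle:on

  // Rule-scoped suppression, as used around PushableFilter.apply:
  // scalastyle:off cyclomatic.complexity
  def classify(x: Int): String = x match {
    case 0 => "zero"
    case _ => "other"
  }
  // scalastyle:on cyclomatic.complexity

  // Single-line suppression, as used for the SSL trust store setup:
  val ref: String = null // scalastyle:ignore null
}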
*/ +// scalastyle:off + package org.apache.spark.sql.arangodb.datasource.mapping.json import com.fasterxml.jackson.core._ diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartitionReader.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartitionReader.scala index 2d1c7a03..e61cf5f7 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartitionReader.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartitionReader.scala @@ -43,8 +43,11 @@ class ArangoCollectionPartitionReader(inputPartition: ArangoCollectionPartition, case ContentType.VPACK => current.toByteArray case ContentType.JSON => current.toString.getBytes(StandardCharsets.UTF_8) }) - if (rowIterator.hasNext) true - else next + if (rowIterator.hasNext) { + true + } else { + next + } } else { // FIXME: https://arangodb.atlassian.net/browse/BTS-671 // stream AQL cursors' warnings are only returned along with the final batch diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoPartitionReaderFactory.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoPartitionReaderFactory.scala index 10f70b35..feacc04b 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoPartitionReaderFactory.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoPartitionReaderFactory.scala @@ -10,4 +10,4 @@ class ArangoPartitionReaderFactory(ctx: PushDownCtx, options: ArangoDBConf) exte case p: ArangoCollectionPartition => new ArangoCollectionPartitionReader(p, ctx, options) case SingletonPartition => new ArangoQueryReader(ctx.requiredSchema, options) } -} \ No newline at end of file +} diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoQueryReader.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoQueryReader.scala index 1d62b1d3..be967483 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoQueryReader.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoQueryReader.scala @@ -39,8 +39,11 @@ class ArangoQueryReader(schema: StructType, options: ArangoDBConf) extends Parti case ContentType.VPACK => current.toByteArray case ContentType.JSON => current.toString.getBytes(StandardCharsets.UTF_8) }) - if (rowIterator.hasNext) true - else next + if (rowIterator.hasNext) { + true + } else { + next + } } else { // FIXME: https://arangodb.atlassian.net/browse/BTS-671 // stream AQL cursors' warnings are only returned along with the final batch diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScan.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScan.scala index 3fe1e7fd..79b3c14e 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScan.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScan.scala @@ -25,4 
+25,4 @@ class ArangoScan(ctx: PushDownCtx, options: ArangoDBConf) extends Scan with Batc .zip(Stream.continually(options.driverOptions.endpoints).flatten) .map(it => new ArangoCollectionPartition(it._1, it._2)) -} \ No newline at end of file +} diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScanBuilder.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScanBuilder.scala index fb4ea22b..beec80b0 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScanBuilder.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScanBuilder.scala @@ -38,12 +38,15 @@ class ArangoScanBuilder(options: ArangoDBConf, tableSchema: StructType) extends appliedPushableFilters = appliedFilters.map(_._2) appliedSparkFilters = appliedFilters.map(_._1) - if (fullSupp.nonEmpty) + if (fullSupp.nonEmpty) { logInfo(s"Filters fully applied in AQL:\n\t${fullSupp.map(_._1).mkString("\n\t")}") - if (partialSupp.nonEmpty) + } + if (partialSupp.nonEmpty) { logInfo(s"Filters partially applied in AQL:\n\t${partialSupp.map(_._1).mkString("\n\t")}") - if (noneSupp.nonEmpty) + } + if (noneSupp.nonEmpty) { logInfo(s"Filters not applied in AQL:\n\t${noneSupp.mkString("\n\t")}") + } partialSupp.map(_._1) ++ noneSupp } diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala index 430269dc..ed36f13c 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala @@ -37,16 +37,17 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I override def commit(): WriterCommitMessage = { flushBatch() - null + null // scalastyle:ignore null } /** * Data cleanup will happen in [[ArangoBatchWriter.abort()]] */ - override def abort(): Unit = if (!canRetry) + override def abort(): Unit = if (!canRetry) { throw new DataWriteAbortException( "Task cannot be retried. 
To make batch writes idempotent, so that they can be retried, consider using " + "'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.") + } override def close(): Unit = { client.shutdown() @@ -93,7 +94,9 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I logWarning("Got exception while saving documents: ", e) client = createClient() saveDocuments(payload) - } else throw e + } else { + throw e + } } } diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoWriterBuilder.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoWriterBuilder.scala index e8de5a1e..6c895d49 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoWriterBuilder.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoWriterBuilder.scala @@ -11,8 +11,9 @@ class ArangoWriterBuilder(schema: StructType, options: ArangoDBConf) extends Wri override def buildForBatch(): BatchWrite = { val client = ArangoClient(options) - if (!client.collectionExists()) + if (!client.collectionExists()) { client.createCollection() + } client.shutdown() new ArangoBatchWriter(schema, options, mode) } @@ -21,10 +22,11 @@ class ArangoWriterBuilder(schema: StructType, options: ArangoDBConf) extends Wri mode = SaveMode.Overwrite if (options.writeOptions.confirmTruncate) { val client = ArangoClient(options) - if (client.collectionExists()) + if (client.collectionExists()) { client.truncate() - else + } else { client.createCollection() + } client.shutdown() this } else { From 9d288e6bdfccde731d42a9ccd671c7cf3386ee7f Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Fri, 31 Dec 2021 14:17:19 +0100 Subject: [PATCH 10/17] datasource-3.1 scapegoat fixes --- .../scala/com/arangodb/spark/DefaultSource.scala | 9 +-------- .../scala/com/arangodb/spark/DefaultSource.scala | 15 ++++++--------- .../sql/arangodb/datasource/ArangoTable.scala | 10 +++------- 3 files changed, 10 insertions(+), 24 deletions(-) diff --git a/arangodb-spark-datasource-2.4/src/main/scala/com/arangodb/spark/DefaultSource.scala b/arangodb-spark-datasource-2.4/src/main/scala/com/arangodb/spark/DefaultSource.scala index 5feffd2e..c1f7be4e 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/com/arangodb/spark/DefaultSource.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/com/arangodb/spark/DefaultSource.scala @@ -16,14 +16,7 @@ class DefaultSource extends DataSourceV2 with DataSourceRegister with ReadSupport with WriteSupport { - private var inferredSchema: StructType = _ - - private def inferSchema(options: ArangoDBConf): StructType = { - if (inferredSchema == null) { - inferredSchema = ArangoUtils.inferSchema(options) - } - inferredSchema - } + private def inferSchema(options: ArangoDBConf): StructType = ArangoUtils.inferSchema(options) private def extractOptions(options: DataSourceOptions): ArangoDBConf = { val opts: ArangoDBConf = ArangoDBConf(options.asMap()) diff --git a/arangodb-spark-datasource-3.1/src/main/scala/com/arangodb/spark/DefaultSource.scala b/arangodb-spark-datasource-3.1/src/main/scala/com/arangodb/spark/DefaultSource.scala index 94bcecd4..38c0925c 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/com/arangodb/spark/DefaultSource.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/com/arangodb/spark/DefaultSource.scala @@ -12,8 +12,6 @@ import java.util class 
DefaultSource extends TableProvider with DataSourceRegister { - private var table: ArangoTable = _ - private def extractOptions(options: util.Map[String, String]): ArangoDBConf = { val opts: ArangoDBConf = ArangoDBConf(options) if (opts.driverOptions.acquireHostList) { @@ -27,17 +25,16 @@ class DefaultSource extends TableProvider with DataSourceRegister { override def inferSchema(options: CaseInsensitiveStringMap): StructType = getTable(options).schema() private def getTable(options: CaseInsensitiveStringMap): Table = - getTable(null, null, options.asCaseSensitiveMap()) // scalastyle:ignore null + getTable(None, options.asCaseSensitiveMap()) // scalastyle:ignore null - override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = { - if (table == null) { - table = new ArangoTable(schema, extractOptions(properties)) - } - table - } + override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = + getTable(Option(schema), properties) override def supportsExternalMetadata(): Boolean = true override def shortName(): String = "arangodb" + private def getTable(schema: Option[StructType], properties: util.Map[String, String]) = + new ArangoTable(schema, extractOptions(properties)) + } diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/ArangoTable.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/ArangoTable.scala index 040915a3..e8f4d5a8 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/ArangoTable.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/ArangoTable.scala @@ -12,16 +12,12 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import java.util import scala.collection.JavaConverters.setAsJavaSetConverter -class ArangoTable(private var tableSchema: StructType, options: ArangoDBConf) extends Table with SupportsRead with SupportsWrite { +class ArangoTable(private var schemaOpt: Option[StructType], options: ArangoDBConf) extends Table with SupportsRead with SupportsWrite { + private lazy val tableSchema = schemaOpt.getOrElse(ArangoUtils.inferSchema(options)) override def name(): String = this.getClass.toString - override def schema(): StructType = { - if (tableSchema == null) { - tableSchema = ArangoUtils.inferSchema(options) - } - tableSchema - } + override def schema(): StructType = tableSchema override def capabilities(): util.Set[TableCapability] = Set( TableCapability.BATCH_READ, From bfb130b86f559a7bd94002ef6b658024c80bf697 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 10 Jan 2022 12:54:24 +0100 Subject: [PATCH 11/17] updated dev-README.md --- dev-README.md | 29 ++++++++++++++++++++++++++++- pom.xml | 3 +++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/dev-README.md b/dev-README.md index 73513be4..0b2bd7ce 100644 --- a/dev-README.md +++ b/dev-README.md @@ -7,6 +7,33 @@ Check results [here](https://github.com/arangodb/arangodb-spark-datasource/actio Check results [here](https://sonarcloud.io/project/overview?id=arangodb_arangodb-spark-datasource). 
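As context for the ArangoTable change above: replacing a nullable mutable field with an `Option` plus a `lazy val` keeps the compute-once behaviour without hand-written null checks, and `lazy val` initialization is thread-safe where the racy null-check was not. A minimal self-contained sketch — `Config`, `inferSchema`, and the `String` schema type are hypothetical stand-ins for the real `ArangoDBConf`, `ArangoUtils.inferSchema`, and `StructType`:

```scala
final case class Config(collection: String)

class Table(schemaOpt: Option[String], config: Config) {
  // Stand-in for ArangoUtils.inferSchema: runs only when no user-supplied
  // schema exists, and at most once thanks to lazy val memoization.
  private def inferSchema(c: Config): String = s"inferred(${c.collection})"

  private lazy val tableSchema: String = schemaOpt.getOrElse(inferSchema(config))

  def schema: String = tableSchema
}

object TableDemo extends App {
  println(new Table(None, Config("users")).schema)             // inferred(users)
  println(new Table(Some("explicit"), Config("users")).schema) // explicit
}
```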
## check dependencies updates -```shell script +```shell mvn -Pspark-${sparkVersion} -Pscala-${scalaVersion} versions:display-dependency-updates ``` + +## analysis tools + +### scalastyle +```shell +mvn -Pspark-${sparkVersion} -Pscala-${scalaVersion} process-sources +``` +Reports: +- [arangodb-spark-commons](arangodb-spark-commons/target/scalastyle-output.xml) +- [arangodb-spark-datasource-2.4](arangodb-spark-datasource-2.4/target/scalastyle-output.xml) +- [arangodb-spark-datasource-3.1](arangodb-spark-datasource-3.1/target/scalastyle-output.xml) + +### scapegoat +```shell +mvn -Pspark-${sparkVersion} -Pscala-${scalaVersion} test-compile +``` +Reports: +- [arangodb-spark-commons](arangodb-spark-commons/target/scapegoat/scapegoat.html) +- [arangodb-spark-datasource-2.4](arangodb-spark-datasource-2.4/target/scapegoat/scapegoat.html) +- [arangodb-spark-datasource-3.1](arangodb-spark-datasource-3.1/target/scapegoat/scapegoat.html) + +### JaCoCo +```shell +mvn -Pspark-${sparkVersion} -Pscala-${scalaVersion} verify +``` +Report: +- [integration-tests](integration-tests/target/site/jacoco-aggregate/index.html) diff --git a/pom.xml b/pom.xml index 6c5a66ef..2e7cb6be 100644 --- a/pom.xml +++ b/pom.xml @@ -297,6 +297,9 @@ 0.8.7 true + + org/apache/spark/sql/arangodb/datasource/mapping/json/**/* + From 2e5a2bec4cfbad1d371f6d685df12e837c57ab6a Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 10 Jan 2022 13:15:25 +0100 Subject: [PATCH 12/17] datasource-3.1 scapegoat fixes --- .../spark/sql/arangodb/datasource/reader/ArangoScan.scala | 4 ++-- .../sql/arangodb/datasource/reader/ArangoScanBuilder.scala | 6 +++--- .../sql/arangodb/datasource/writer/ArangoDataWriter.scala | 2 +- pom.xml | 3 ++- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScan.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScan.scala index 79b3c14e..3feedac5 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScan.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScan.scala @@ -15,12 +15,12 @@ class ArangoScan(ctx: PushDownCtx, options: ArangoDBConf) extends Scan with Batc override def planInputPartitions(): Array[InputPartition] = options.readOptions.readMode match { case ReadMode.Query => Array(SingletonPartition) - case ReadMode.Collection => planCollectionPartitions().asInstanceOf[Array[InputPartition]] + case ReadMode.Collection => planCollectionPartitions() } override def createReaderFactory(): PartitionReaderFactory = new ArangoPartitionReaderFactory(ctx, options) - private def planCollectionPartitions() = + private def planCollectionPartitions(): Array[InputPartition] = ArangoClient.getCollectionShardIds(options) .zip(Stream.continually(options.driverOptions.endpoints).flatten) .map(it => new ArangoCollectionPartition(it._1, it._2)) diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScanBuilder.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScanBuilder.scala index beec80b0..c990ff80 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScanBuilder.scala +++ 
b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoScanBuilder.scala @@ -13,13 +13,13 @@ class ArangoScanBuilder(options: ArangoDBConf, tableSchema: StructType) extends with SupportsPushDownRequiredColumns with Logging { - private var requiredSchema: StructType = _ + private var readSchema: StructType = _ // fully or partially applied filters private var appliedPushableFilters: Array[PushableFilter] = Array() private var appliedSparkFilters: Array[Filter] = Array() - override def build(): Scan = new ArangoScan(new PushDownCtx(requiredSchema, appliedPushableFilters), options) + override def build(): Scan = new ArangoScan(new PushDownCtx(readSchema, appliedPushableFilters), options) override def pushFilters(filters: Array[Filter]): Array[Filter] = { // filters related to columnNameOfCorruptRecord are not pushed down @@ -54,6 +54,6 @@ class ArangoScanBuilder(options: ArangoDBConf, tableSchema: StructType) extends override def pushedFilters(): Array[Filter] = appliedSparkFilters override def pruneColumns(requiredSchema: StructType): Unit = { - this.requiredSchema = requiredSchema + this.readSchema = requiredSchema } } diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala index ed36f13c..5caa95b1 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala @@ -18,7 +18,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I private var failures = 0 private var endpointIdx = partitionId - private val endpoints = Stream.continually(options.driverOptions.endpoints).flatten + private val endpoints = Stream.continually(options.driverOptions.endpoints).flatten.toIndexedSeq private var client: ArangoClient = createClient() private var batchCount: Int = _ private var outVPack: ByteArrayOutputStream = _ diff --git a/pom.xml b/pom.xml index 2e7cb6be..39aefb1c 100644 --- a/pom.xml +++ b/pom.xml @@ -280,7 +280,8 @@ -target:jvm-1.8 -P:scapegoat:dataDir:${project.build.directory}/scapegoat -P:scapegoat:overrideLevels:all=Warning - -P:scapegoat:ignoredFiles:.*/src/test/scala/.* + -P:scapegoat:ignoredFiles:.*/src/test/scala/.*:.*/org/apache/spark/sql/arangodb/datasource/mapping/json/.* + -P:scapegoat:disabledInspections:CatchException From 6726c7c1b773df253628b22e6dc298d2bbc6e3d0 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 10 Jan 2022 14:21:45 +0100 Subject: [PATCH 13/17] datasource-2.4 scalastyle fixes --- .../datasource/mapping/json/CreateJacksonParser.scala | 2 ++ .../datasource/mapping/json/JacksonGenerator.scala | 2 ++ .../datasource/mapping/json/JacksonParser.scala | 2 ++ .../datasource/mapping/json/JacksonUtils.scala | 2 ++ .../datasource/mapping/json/JsonInferSchema.scala | 2 ++ .../reader/ArangoCollectionPartitionReader.scala | 11 ++++++++--- .../datasource/reader/ArangoDataSourceReader.scala | 11 +++++++---- .../datasource/reader/ArangoQueryReader.scala | 11 ++++++++--- .../datasource/writer/ArangoDataSourceWriter.scala | 2 +- .../arangodb/datasource/writer/ArangoDataWriter.scala | 9 ++++++--- .../arangodb/datasource/writer/NoOpDataWriter.scala | 2 +- 11 files changed, 41 insertions(+), 15 deletions(-) diff --git 
a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/CreateJacksonParser.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/CreateJacksonParser.scala index 5dc9dec2..a97a59bb 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/CreateJacksonParser.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/CreateJacksonParser.scala @@ -15,6 +15,8 @@ * limitations under the License. */ +// scalastyle:off + package org.apache.spark.sql.arangodb.datasource.mapping.json import com.fasterxml.jackson.core.{JsonFactory, JsonParser} diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonGenerator.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonGenerator.scala index b29aa37a..04fb893d 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonGenerator.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonGenerator.scala @@ -15,6 +15,8 @@ * limitations under the License. */ +// scalastyle:off + package org.apache.spark.sql.arangodb.datasource.mapping.json import com.fasterxml.jackson.core._ diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonParser.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonParser.scala index 4173acd4..9e14e283 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonParser.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonParser.scala @@ -15,6 +15,8 @@ * limitations under the License. */ +// scalastyle:off + package org.apache.spark.sql.arangodb.datasource.mapping.json import com.fasterxml.jackson.core._ diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonUtils.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonUtils.scala index 42eb5d7e..065eb60c 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonUtils.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JacksonUtils.scala @@ -15,6 +15,8 @@ * limitations under the License. */ +// scalastyle:off + package org.apache.spark.sql.arangodb.datasource.mapping.json import com.fasterxml.jackson.core.{JsonParser, JsonToken} diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonInferSchema.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonInferSchema.scala index de991587..1b5185c7 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonInferSchema.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/mapping/json/JsonInferSchema.scala @@ -15,6 +15,8 @@ * limitations under the License. 
*/ +// scalastyle:off + package org.apache.spark.sql.arangodb.datasource.mapping.json import com.fasterxml.jackson.core._ diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartitionReader.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartitionReader.scala index 4482a97f..3b8d0da8 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartitionReader.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartitionReader.scala @@ -46,12 +46,17 @@ class ArangoCollectionPartitionReader( case ContentType.VPACK => current.toByteArray case ContentType.JSON => current.toString.getBytes(StandardCharsets.UTF_8) }) - if (rowIterator.hasNext) true - else next + if (rowIterator.hasNext) { + true + } else { + next + } } else { // FIXME: https://arangodb.atlassian.net/browse/BTS-671 // stream AQL cursors' warnings are only returned along with the final batch - if (options.readOptions.stream) logWarns() + if (options.readOptions.stream) { + logWarns() + } false } diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoDataSourceReader.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoDataSourceReader.scala index d91643cd..25869cbe 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoDataSourceReader.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoDataSourceReader.scala @@ -55,12 +55,15 @@ class ArangoDataSourceReader(tableSchema: StructType, options: ArangoDBConf) ext appliedPushableFilters = appliedFilters.map(_._2) appliedSparkFilters = appliedFilters.map(_._1) - if (fullSupp.nonEmpty) + if (fullSupp.nonEmpty) { logInfo(s"Filters fully applied in AQL:\n\t${fullSupp.map(_._1).mkString("\n\t")}") - if (partialSupp.nonEmpty) + } + if (partialSupp.nonEmpty) { logInfo(s"Filters partially applied in AQL:\n\t${partialSupp.map(_._1).mkString("\n\t")}") - if (noneSupp.nonEmpty) + } + if (noneSupp.nonEmpty) { logInfo(s"Filters not applied in AQL:\n\t${noneSupp.mkString("\n\t")}") + } partialSupp.map(_._1) ++ noneSupp } @@ -87,4 +90,4 @@ class ArangoDataSourceReader(tableSchema: StructType, options: ArangoDBConf) ext } } -} \ No newline at end of file +} diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoQueryReader.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoQueryReader.scala index 7f3a1948..09b09b60 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoQueryReader.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoQueryReader.scala @@ -40,12 +40,17 @@ class ArangoQueryReader(schema: StructType, options: ArangoDBConf) extends Input case ContentType.VPACK => current.toByteArray case ContentType.JSON => current.toString.getBytes(StandardCharsets.UTF_8) }) - if (rowIterator.hasNext) true - else next + if (rowIterator.hasNext) { + true + } else { + next + } } else { // FIXME: https://arangodb.atlassian.net/browse/BTS-671 // stream AQL cursors' warnings are only returned along 
with the final batch - if (options.readOptions.stream) logWarns() + if (options.readOptions.stream) { + logWarns() + } false } diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataSourceWriter.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataSourceWriter.scala index e249b47f..3469084d 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataSourceWriter.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataSourceWriter.scala @@ -25,7 +25,7 @@ class ArangoDataSourceWriter(writeUUID: String, schema: StructType, mode: SaveMo case SaveMode.Overwrite => ArangoClient(options).truncate() case SaveMode.ErrorIfExists => throw new AnalysisException( s"Collection '${options.writeOptions.collection}' already exists. SaveMode: ErrorIfExists.") - case SaveMode.Ignore => return new NoOpDataWriterFactory + case SaveMode.Ignore => new NoOpDataWriterFactory } } else { client.createCollection() diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala index 8bf73c83..db724570 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala @@ -38,16 +38,17 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I override def commit(): WriterCommitMessage = { flushBatch() client.shutdown() - null + null // scalastyle:ignore null } /** * Data cleanup will happen in [[ArangoDataSourceWriter.abort()]] */ - override def abort(): Unit = if (!canRetry) + override def abort(): Unit = if (!canRetry) { throw new DataWriteAbortException( "Task cannot be retried. 
To make batch writes idempotent, so that they can be retried, consider using " + "'keep.null=true' (default) and 'overwrite.mode=(ignore|replace|update)'.") + } private def createClient() = ArangoClient(options.updated(ArangoDBConf.ENDPOINTS, endpoints(endpointIdx))) @@ -90,7 +91,9 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I logWarning("Got exception while saving documents: ", e) client = createClient() saveDocuments(payload) - } else throw e + } else { + throw e + } } } diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/NoOpDataWriter.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/NoOpDataWriter.scala index ab048684..930c0f24 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/NoOpDataWriter.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/NoOpDataWriter.scala @@ -9,7 +9,7 @@ class NoOpDataWriter() extends DataWriter[InternalRow] { // do nothing } - override def commit(): WriterCommitMessage = null + override def commit(): WriterCommitMessage = null // scalastyle:ignore null override def abort(): Unit = { // do nothing From 192da6ba05c6afc04941b6890d409edc34b16d54 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 10 Jan 2022 16:53:40 +0100 Subject: [PATCH 14/17] datasource-2.4 scalastyle fixes --- .../sql/arangodb/datasource/writer/ArangoDataSourceWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataSourceWriter.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataSourceWriter.scala index 3469084d..04039c8c 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataSourceWriter.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataSourceWriter.scala @@ -25,7 +25,7 @@ class ArangoDataSourceWriter(writeUUID: String, schema: StructType, mode: SaveMo case SaveMode.Overwrite => ArangoClient(options).truncate() case SaveMode.ErrorIfExists => throw new AnalysisException( s"Collection '${options.writeOptions.collection}' already exists. 
SaveMode: ErrorIfExists.") - case SaveMode.Ignore => new NoOpDataWriterFactory + case SaveMode.Ignore => return new NoOpDataWriterFactory // scalastyle:ignore return } } else { client.createCollection() From b533a86408007f9bfa9194d1f906ea5367232ed2 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 10 Jan 2022 17:26:42 +0100 Subject: [PATCH 15/17] datasource-2.4 scapegoat fixes --- .../reader/ArangoCollectionPartition.scala | 12 ++++++++++++ .../reader/ArangoDataSourceReader.scala | 16 ++++++++-------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartition.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartition.scala index b5489032..c83fb6c2 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartition.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoCollectionPartition.scala @@ -33,3 +33,15 @@ class SingletonPartition( override def createPartitionReader(): InputPartitionReader[InternalRow] = new ArangoQueryReader(schema, options) } + +object ArangoPartition { + def ofCollection( + shardId: String, + endpoint: String, + ctx: PushDownCtx, + options: ArangoDBConf + ): InputPartition[InternalRow] = new ArangoCollectionPartition(shardId, endpoint, ctx, options) + + def ofSingleton(schema: StructType, options: ArangoDBConf): InputPartition[InternalRow] = + new SingletonPartition(schema, options) +} \ No newline at end of file
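The new `ArangoPartition` factory object types each partition as the `InputPartition[InternalRow]` supertype at construction, which is what lets the `asInstanceOf` cast on the invariant `java.util.List` be dropped in `planInputPartitions` in the ArangoDataSourceReader hunk that follows. A minimal sketch of this widening-by-construction pattern, using hypothetical stand-in types rather than the Spark/ArangoDB ones:

```scala
// Hypothetical stand-ins for InputPartition[InternalRow] and its implementations.
trait Partition
final class CollectionPartition(val shardId: String) extends Partition
final class QueryPartition extends Partition

object PartitionFactory {
  // Declaring the supertype as the return type widens each element at creation,
  // so no asInstanceOf is needed on the resulting collection later.
  def ofCollection(shardId: String): Partition = new CollectionPartition(shardId)
  def ofSingleton(): Partition = new QueryPartition
}

object FactorySketch {
  def main(args: Array[String]): Unit = {
    // Yields List[Partition] directly, instead of List[CollectionPartition]
    // followed by a cast to an invariant java.util.List[Partition].
    val partitions: List[Partition] = List("s1", "s2").map(PartitionFactory.ofCollection)
    println(partitions.size) // 2
  }
}
```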
diff --git a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoDataSourceReader.scala b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoDataSourceReader.scala index 25869cbe..9edad49a 100644 --- a/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoDataSourceReader.scala +++ b/arangodb-spark-datasource-2.4/src/main/scala/org/apache/spark/sql/arangodb/datasource/reader/ArangoDataSourceReader.scala @@ -24,19 +24,19 @@ class ArangoDataSourceReader(tableSchema: StructType, options: ArangoDBConf) ext private var appliedPushableFilters: Array[PushableFilter] = Array() private var appliedSparkFilters: Array[Filter] = Array() - private var requiredSchema: StructType = _ + private var readingSchema: StructType = _ - override def readSchema(): StructType = Option(requiredSchema).getOrElse(tableSchema) + override def readSchema(): StructType = Option(readingSchema).getOrElse(tableSchema) - override def planInputPartitions(): util.List[InputPartition[InternalRow]] = (options.readOptions.readMode match { - case ReadMode.Query => List(new SingletonPartition(readSchema(), options)).asJava + override def planInputPartitions(): util.List[InputPartition[InternalRow]] = options.readOptions.readMode match { + case ReadMode.Query => List(ArangoPartition.ofSingleton(readSchema(), options)).asJava case ReadMode.Collection => planCollectionPartitions().toList.asJava - }).asInstanceOf[util.List[InputPartition[InternalRow]]] + } private def planCollectionPartitions() = ArangoClient.getCollectionShardIds(options) .zip(Stream.continually(options.driverOptions.endpoints).flatten) - .map(it => new ArangoCollectionPartition(it._1, it._2, new PushDownCtx(readSchema(), appliedPushableFilters), options)) + .map(it => ArangoPartition.ofCollection(it._1, it._2, new PushDownCtx(readSchema(), appliedPushableFilters), options)) override def pushFilters(filters: Array[Filter]): Array[Filter] = { // filters related to columnNameOfCorruptRecord are not pushed down @@ -71,7 +71,7 @@ class ArangoDataSourceReader(tableSchema: StructType, options: ArangoDBConf) ext override def pushedFilters(): Array[Filter] = appliedSparkFilters override def pruneColumns(requiredSchema: StructType): Unit = { - this.requiredSchema = requiredSchema + this.readingSchema = requiredSchema } /** @@ -82,7 +82,7 @@ schema: StructType, columnNameOfCorruptRecord: String): Unit = { schema.getFieldIndex(columnNameOfCorruptRecord).foreach { corruptFieldIndex => - val f = schema(corruptFieldIndex) + val f = schema.toIndexedSeq(corruptFieldIndex) if (f.dataType != StringType || !f.nullable) { throw new AnalysisException( "The field for corrupt records must be string type and nullable") From 437fdb6f2a5b6c00e448aa0ef2be1a3beadcdb24 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 10 Jan 2022 21:05:14 +0100 Subject: [PATCH 16/17] reverted toIndexedSeq change --- .../spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala index 5caa95b1..ed36f13c 100644 --- a/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala +++ b/arangodb-spark-datasource-3.1/src/main/scala/org/apache/spark/sql/arangodb/datasource/writer/ArangoDataWriter.scala @@ -18,7 +18,7 @@ class ArangoDataWriter(schema: StructType, options: ArangoDBConf, partitionId: I private var failures = 0 private var endpointIdx = partitionId - private val endpoints = Stream.continually(options.driverOptions.endpoints).flatten.toIndexedSeq + private val endpoints = Stream.continually(options.driverOptions.endpoints).flatten private var client: ArangoClient = createClient() private var batchCount: Int = _ private var outVPack: ByteArrayOutputStream = _ From 8f2df6b8c063f0e60063a3f07cdd07e8f9f307d7 Mon Sep 17 00:00:00 2001 From: Michele Rastelli Date: Mon, 10 Jan 2022 21:31:05 +0100 Subject: [PATCH 17/17] commons scapegoat fixes --- .../sql/arangodb/commons/ArangoClient.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala index 72fca82d..6b2207e6 100644 --- a/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala +++ b/arangodb-spark-commons/src/main/scala/org/apache/spark/sql/arangodb/commons/ArangoClient.scala @@ -17,6 +17,7 @@ import org.apache.spark.sql.types.StructType import scala.collection.JavaConverters.mapAsJavaMapConverter +@SuppressWarnings(Array("OptionGet")) class ArangoClient(options: ArangoDBConf) extends Logging { private def aqlOptions(): AqlQueryOptions = { @@ -68,11 +69,10 @@ class ArangoClient(options: ArangoDBConf) extends Logging { def readCollectionSample(): Seq[String] = { val query = "FOR d IN @@col LIMIT @size RETURN d" - val params = Map( + val params: 
Map[String, AnyRef] = Map( "@col" -> options.readOptions.collection.get, - "size" -> options.readOptions.sampleSize + "size" -> (options.readOptions.sampleSize: java.lang.Integer) ) - .asInstanceOf[Map[String, AnyRef]] val opts = aqlOptions() logDebug(s"""Executing AQL query: \n\t$query ${if (params.nonEmpty) s"\n\t with params: $params" else ""}""") @@ -147,7 +147,7 @@ class ArangoClient(options: ArangoDBConf) extends Logging { if (response.getBody.isArray) { val errors = response.getBody.arrayIterator.asScala .filter(it => it.get(ArangoResponseField.ERROR).isTrue) - .map(arangoDB.util().deserialize(_, classOf[ErrorEntity]).asInstanceOf[ErrorEntity]) + .map(arangoDB.util().deserialize[ErrorEntity](_, classOf[ErrorEntity])) .toIterable if (errors.nonEmpty) { throw new ArangoDBMultiException(errors) @@ -157,7 +157,10 @@ class ArangoClient(options: ArangoDBConf) extends Logging { } +@SuppressWarnings(Array("OptionGet")) object ArangoClient { + private val INTERNAL_ERROR_CODE = 4 + private val SHARDS_API_UNAVAILABLE_CODE = 9 def apply(options: ArangoDBConf): ArangoClient = new ArangoClient(options) @@ -175,7 +178,7 @@ object ArangoClient { case e: ArangoDBException => // single server < 3.8 returns Response: 500, Error: 4 - internal error // single server >= 3.8 returns Response: 501, Error: 9 - shards API is only available in a cluster - if (e.getErrorNum == 9 || e.getErrorNum == 4) { + if (INTERNAL_ERROR_CODE.equals(e.getErrorNum) || SHARDS_API_UNAVAILABLE_CODE.equals(e.getErrorNum)) { Array("") } else { throw e @@ -188,8 +191,7 @@ object ArangoClient { val response = client.execute(new Request(ArangoRequestParam.SYSTEM, RequestType.GET, "/_api/cluster/endpoints")) val field = response.getBody.get("endpoints") val res = client.util(Serializer.CUSTOM) - .deserialize(field, classOf[Seq[Map[String, String]]]) - .asInstanceOf[Seq[Map[String, String]]] + .deserialize[Seq[Map[String, String]]](field, classOf[Seq[Map[String, String]]]) .map(it => it("endpoint").replaceFirst(".*://", "")) client.shutdown() res
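The `readCollectionSample()` hunk above removes the scapegoat-flagged `asInstanceOf` cast by giving the bind-parameter map an explicit `Map[String, AnyRef]` type and boxing the integer value. A standalone sketch of the same pattern, where `collection` and `sampleSize` are hypothetical stand-ins for the driver's read options:

```scala
object BindParamsSketch {
  def main(args: Array[String]): Unit = {
    val collection = "users" // hypothetical values, not the driver's real options
    val sampleSize = 1000

    // Scala's Int is an AnyVal, not an AnyRef, so a Map[String, AnyRef] cannot
    // hold it directly; ascribing java.lang.Integer boxes the value explicitly
    // instead of casting the whole map with asInstanceOf[Map[String, AnyRef]].
    val params: Map[String, AnyRef] = Map(
      "@col" -> collection,
      "size" -> (sampleSize: java.lang.Integer)
    )
    println(params) // Map(@col -> users, size -> 1000)
  }
}
```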