feat: Support Spark 3.3.X (#679)
* build: Add Spark 3.3.0

* build: Add Spark 3.3

* make it compile for 3.3; will fail on all other 3.x

* try to make it compile

* try to make it compile

* try to make it compile

* try to make it compile

* try to make it compile

* compiles with sbt for all versions; some unit tests with 3.3.0 still fail, but most pass

* use Spark 3.3.1 instead of 3.3.0, add log4j props for log4j 1 and 2, revert the change in WorkbookReader

* build.sbt: added log4j handling for versions earlier than 3.3.x

* fixed locale issue

* merged main including module rename

* re-add license that got lost

* reorg folders

* reorg folders incl. mill

* fixed log4j in mill

* fixed log4j in mill

* fixed log4j in mill

Co-authored-by: Martin Mauch <martin.mauch@gmail.com>
christianknoepfle and nightscape committed Nov 9, 2022
1 parent 79e398e commit db65e19
Showing 17 changed files with 240 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -31,7 +31,7 @@ jobs:
os: [ubuntu-latest]
scala: [2.12.17, 2.13.10]
java: [temurin@8]
spark: [2.4.1, 2.4.7, 2.4.8, 3.0.1, 3.0.3, 3.1.1, 3.1.2, 3.1.3, 3.2.2]
spark: [2.4.1, 2.4.7, 2.4.8, 3.0.1, 3.0.3, 3.1.1, 3.1.2, 3.1.3, 3.2.2, 3.3.1]
exclude:
- spark: 2.4.1
scala: 2.13.10
43 changes: 30 additions & 13 deletions build.sbt
@@ -36,7 +36,7 @@ inThisBuild(
)
)

lazy val sparkVersion = "3.2.2"
lazy val sparkVersion = "3.3.1"
val poiVersion = "5.2.3"

val testSparkVersion = settingKey[String]("The version of Spark to test against.")
@@ -48,9 +48,6 @@ version := testSparkVersion.value + "_" + version.value

resolvers ++= Seq("jitpack" at "https://jitpack.io")

libraryDependencies ++= Seq("org.slf4j" % "slf4j-api" % "1.7.36" % "provided")
.map(_.excludeAll(ExclusionRule(organization = "stax")))

enablePlugins(ThinFatJar)
shadedDeps ++= Seq(
"org.apache.poi" % "poi" % poiVersion,
@@ -91,29 +88,49 @@ libraryDependencies ++= Seq(
"org.scalamock" %% "scalamock" % "5.2.0" % Test
)

// spark >= 3.3 uses log4j 2.x while previous version relied on log4j 1.x
libraryDependencies ++= {
if (testSparkVersion.value >= "3.3.0") {
Seq("org.apache.logging.log4j" % "log4j-core" % "2.19.0" % Test)
} else {
Seq("org.slf4j" % "slf4j-api" % "1.7.36" % "provided")
}
}

// Custom source layout for Spark Data Source API 2
Compile / unmanagedSourceDirectories := {
if (testSparkVersion.value >= "3.2.2") {
if (testSparkVersion.value >= "3.3.0") {
Seq(
(Compile / sourceDirectory)(_ / "scala"),
(Compile / sourceDirectory)(_ / "3.x/scala"),
(Compile / sourceDirectory)(_ / "3.1_3.2/scala"),
(Compile / sourceDirectory)(_ / "3.2/scala")
(Compile / sourceDirectory)(_ / "3.3/scala"),
(Compile / sourceDirectory)(_ / "3.0_and_up/scala"),
(Compile / sourceDirectory)(_ / "3.1_and_up/scala"),
(Compile / sourceDirectory)(_ / "3.2_and_up/scala")
).join.value
} else if (testSparkVersion.value >= "3.2.0") {
Seq(
(Compile / sourceDirectory)(_ / "scala"),
(Compile / sourceDirectory)(_ / "3.0_3.1_3.2/scala"),
(Compile / sourceDirectory)(_ / "3.0_and_up/scala"),
(Compile / sourceDirectory)(_ / "3.1_and_up/scala"),
(Compile / sourceDirectory)(_ / "3.2_and_up/scala")
).join.value
} else if (testSparkVersion.value >= "3.1.0") {
Seq(
(Compile / sourceDirectory)(_ / "scala"),
(Compile / sourceDirectory)(_ / "3.x/scala"),
(Compile / sourceDirectory)(_ / "3.0_3.1/scala"),
(Compile / sourceDirectory)(_ / "3.1/scala"),
(Compile / sourceDirectory)(_ / "3.1_3.2/scala")
(Compile / sourceDirectory)(_ / "3.0_3.1/scala"),
(Compile / sourceDirectory)(_ / "3.0_3.1_3.2/scala"),
(Compile / sourceDirectory)(_ / "3.0_and_up/scala"),
(Compile / sourceDirectory)(_ / "3.1_and_up/scala")
).join.value
} else if (testSparkVersion.value >= "3.0.0") {
Seq(
(Compile / sourceDirectory)(_ / "scala"),
(Compile / sourceDirectory)(_ / "3.x/scala"),
(Compile / sourceDirectory)(_ / "3.0/scala"),
(Compile / sourceDirectory)(_ / "3.0_3.1/scala")
(Compile / sourceDirectory)(_ / "3.0_3.1/scala"),
(Compile / sourceDirectory)(_ / "3.0_3.1_3.2/scala"),
(Compile / sourceDirectory)(_ / "3.0_and_up/scala")
).join.value
} else if (testSparkVersion.value >= "2.4.0") {
Seq((Compile / sourceDirectory)(_ / "scala"), (Compile / sourceDirectory)(_ / "2.4/scala")).join.value
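The sbt and Mill builds share the same layering: every Spark line compiles the common scala sources plus a stack of version-pair and *_and_up directories, with a dedicated 3.3/scala directory for the new line. The following standalone sketch is not part of the commit; it merely mirrors the branches above (including their plain string comparison of version numbers, which happens to order the versions in this matrix correctly) so the mapping can be read in one place:

object SourceLayoutSketch {
  // Mirrors the unmanagedSourceDirectories / sparkVersionSpecificSources branches.
  def sourceDirsFor(sparkVersion: String): Seq[String] =
    if (sparkVersion >= "3.3.0")
      Seq("scala", "3.3/scala", "3.0_and_up/scala", "3.1_and_up/scala", "3.2_and_up/scala")
    else if (sparkVersion >= "3.2.0")
      Seq("scala", "3.0_3.1_3.2/scala", "3.0_and_up/scala", "3.1_and_up/scala", "3.2_and_up/scala")
    else if (sparkVersion >= "3.1.0")
      Seq("scala", "3.1/scala", "3.0_3.1/scala", "3.0_3.1_3.2/scala", "3.0_and_up/scala", "3.1_and_up/scala")
    else if (sparkVersion >= "3.0.0")
      Seq("scala", "3.0/scala", "3.0_3.1/scala", "3.0_3.1_3.2/scala", "3.0_and_up/scala")
    else if (sparkVersion >= "2.4.0")
      Seq("scala", "2.4/scala")
    else
      sys.error(s"Unsupported Spark version: $sparkVersion")

  def main(args: Array[String]): Unit =
    // For the new default 3.3.1 this prints the five directories of the 3.3 branch.
    println(sourceDirsFor("3.3.1").mkString(", "))
}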
64 changes: 40 additions & 24 deletions build.sc
@@ -10,12 +10,14 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
override def millSourcePath = super.millSourcePath / os.up / os.up / os.up

// Custom source layout for Spark Data Source API 2
val sparkVersionSpecificSources = if (sparkVersion >= "3.2.2") {
Seq("scala", "3.x/scala", "3.1_3.2/scala", "3.2/scala")
val sparkVersionSpecificSources = if (sparkVersion >= "3.3.0") {
Seq("scala", "3.3/scala", "3.0_and_up/scala", "3.1_and_up/scala", "3.2_and_up/scala")
} else if (sparkVersion >= "3.2.0") {
Seq("scala", "3.0_3.1_3.2/scala", "3.0_and_up/scala", "3.1_and_up/scala", "3.2_and_up/scala")
} else if (sparkVersion >= "3.1.0") {
Seq("scala", "3.x/scala", "3.0_3.1/scala", "3.1/scala", "3.1_3.2/scala")
Seq("scala", "3.1/scala", "3.0_3.1/scala", "3.0_3.1_3.2/scala", "3.0_and_up/scala", "3.1_and_up/scala")
} else if (sparkVersion >= "3.0.0") {
Seq("scala", "3.x/scala", "3.0/scala", "3.0_3.1/scala")
Seq("scala", "3.0/scala", "3.0_3.1/scala", "3.0_3.1_3.2/scala", "3.0_and_up/scala")
} else if (sparkVersion >= "2.4.0") {
Seq("scala", "2.4/scala")
} else {
@@ -50,26 +52,39 @@ class SparkModule(_scalaVersion: String, sparkVersion: String) extends SbtModule
ivy"org.apache.spark::spark-sql:$sparkVersion",
ivy"org.apache.spark::spark-hive:$sparkVersion"
)
override def compileIvyDeps = sparkDeps ++ Agg(ivy"org.slf4j:slf4j-api:1.7.36".excludeOrg("stax"))
override def compileIvyDeps = if (sparkVersion < "3.3.0") {
sparkDeps ++ Agg(ivy"org.slf4j:slf4j-api:1.7.36".excludeOrg("stax"))
} else {
sparkDeps
}


val poiVersion = "5.2.3"
override def ivyDeps = Agg(
ivy"org.apache.poi:poi:$poiVersion",
ivy"org.apache.poi:poi-ooxml:$poiVersion",
ivy"org.apache.poi:poi-ooxml-lite:$poiVersion",
ivy"org.apache.xmlbeans:xmlbeans:5.1.1",
ivy"com.norbitltd::spoiwo:2.2.1",
ivy"com.github.pjfanning:excel-streaming-reader:4.0.4",
ivy"com.github.pjfanning:poi-shared-strings:2.5.5",
ivy"commons-io:commons-io:2.11.0",
ivy"org.apache.commons:commons-compress:1.22",
ivy"org.apache.logging.log4j:log4j-api:2.19.0",
ivy"com.zaxxer:SparseBitSet:1.2",
ivy"org.apache.commons:commons-collections4:4.4",
ivy"com.github.virtuald:curvesapi:1.07",
ivy"commons-codec:commons-codec:1.15",
ivy"org.apache.commons:commons-math3:3.6.1",
ivy"org.scala-lang.modules::scala-collection-compat:2.8.1"
)
override def ivyDeps = {
val base = Agg(
ivy"org.apache.poi:poi:$poiVersion",
ivy"org.apache.poi:poi-ooxml:$poiVersion",
ivy"org.apache.poi:poi-ooxml-lite:$poiVersion",
ivy"org.apache.xmlbeans:xmlbeans:5.1.1",
ivy"com.norbitltd::spoiwo:2.2.1",
ivy"com.github.pjfanning:excel-streaming-reader:4.0.4",
ivy"com.github.pjfanning:poi-shared-strings:2.5.5",
ivy"commons-io:commons-io:2.11.0",
ivy"org.apache.commons:commons-compress:1.22",
ivy"org.apache.logging.log4j:log4j-api:2.19.0",
ivy"com.zaxxer:SparseBitSet:1.2",
ivy"org.apache.commons:commons-collections4:4.4",
ivy"com.github.virtuald:curvesapi:1.07",
ivy"commons-codec:commons-codec:1.15",
ivy"org.apache.commons:commons-math3:3.6.1",
ivy"org.scala-lang.modules::scala-collection-compat:2.8.1"
)
if (sparkVersion >= "3.3.0") {
base ++ Agg(ivy"org.apache.logging.log4j:log4j-core:2.19.0")
} else {
base
}
}
object test extends Tests with SbtModule with TestModule.ScalaTest {

override def millSourcePath = super.millSourcePath
@@ -97,8 +112,9 @@ val spark24 = List("2.4.1", "2.4.7", "2.4.8")
val spark30 = List("3.0.1", "3.0.3")
val spark31 = List("3.1.1", "3.1.2", "3.1.3")
val spark32 = List("3.2.2")
val spark33 = List("3.3.1")

val crossMatrix =
(spark24 ++ spark30 ++ spark31 ++ spark32).map(spark => (scala212, spark)) ++ spark32.map(spark => (scala213, spark))
(spark24 ++ spark30 ++ spark31 ++ spark32 ++ spark33).map(spark => (scala212, spark)) ++ (spark32 ++ spark33).map(spark => (scala213, spark))

object `spark-excel` extends Cross[SparkModule](crossMatrix: _*) {}
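With Spark 3.3.1 added, Scala 2.12 is cross-built against every supported Spark line while Scala 2.13 stays limited to the 3.2.x and 3.3.x lines. A small self-contained sketch of the resulting matrix (the Scala patch versions are defined elsewhere in build.sc and are assumed here to match the CI matrix above):

object CrossMatrixSketch {
  val scala212 = "2.12.17" // assumed, taken from the CI matrix
  val scala213 = "2.13.10" // assumed, taken from the CI matrix

  val spark24 = List("2.4.1", "2.4.7", "2.4.8")
  val spark30 = List("3.0.1", "3.0.3")
  val spark31 = List("3.1.1", "3.1.2", "3.1.3")
  val spark32 = List("3.2.2")
  val spark33 = List("3.3.1")

  // Same shape as the crossMatrix above: 2.12 x all lines, 2.13 x (3.2.x ++ 3.3.x).
  val crossMatrix: List[(String, String)] =
    (spark24 ++ spark30 ++ spark31 ++ spark32 ++ spark33).map(spark => (scala212, spark)) ++
      (spark32 ++ spark33).map(spark => (scala213, spark))

  def main(args: Array[String]): Unit = {
    crossMatrix.foreach { case (scala, spark) => println(s"Scala $scala / Spark $spark") }
    println(s"${crossMatrix.size} cross-build targets") // 10 for 2.12 + 2 for 2.13 = 12
  }
}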
1 change: 1 addition & 0 deletions project/plugins.sbt
@@ -6,3 +6,4 @@ addSbtPlugin(
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.0.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.2")
addDependencyTreePlugin
@@ -18,12 +18,10 @@ package com.crealytics.spark.excel.v2

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, Expression}
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScan
import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
import org.apache.spark.sql.execution.datasources.v2.{FileScan, TextBasedFileScan}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -17,8 +17,7 @@
package com.crealytics.spark.excel.v2

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.connector.read.SupportsPushDownFilters
import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownFilters}
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.sources.Filter
107 changes: 107 additions & 0 deletions src/main/3.3/scala/com/crealytics/spark/excel/v2/ExcelScan.scala
@@ -0,0 +1,107 @@
/*
* Copyright 2022 Martin Mauch (@nightscape)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.crealytics.spark.excel.v2

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, Expression}
import org.apache.spark.sql.connector.read.PartitionReaderFactory
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.TextBasedFileScan
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.SerializableConfiguration

import scala.collection.compat.immutable.ArraySeq
import scala.jdk.CollectionConverters._

case class ExcelScan(
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
dataSchema: StructType,
readDataSchema: StructType,
readPartitionSchema: StructType,
options: CaseInsensitiveStringMap,
pushedFilters: Array[Filter],
partitionFilters: Seq[Expression] = Seq.empty,
dataFilters: Seq[Expression] = Seq.empty
) extends TextBasedFileScan(sparkSession, options) {

private lazy val parsedOptions: ExcelOptions = new ExcelOptions(
options.asScala.toMap,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord
)

override def isSplitable(path: Path): Boolean = false

override def getFileUnSplittableReason(path: Path): String = {
"No practical method of splitting an excel file"
}

override def createReaderFactory(): PartitionReaderFactory = {

/* Check a field requirement for corrupt records here to throw an exception in a driver side
*/
ExprUtils.verifyColumnNameOfCorruptRecord(dataSchema, parsedOptions.columnNameOfCorruptRecord)

if (
readDataSchema.length == 1 &&
readDataSchema.head.name == parsedOptions.columnNameOfCorruptRecord
) {
throw new RuntimeException(
"Queries from raw Excel files are disallowed when the referenced " +
"columns only include the internal corrupt record column"
)
}

val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap

/* Hadoop Configurations are case sensitive. */
val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)

val broadcastedConf = sparkSession.sparkContext
.broadcast(new SerializableConfiguration(hadoopConf))

/* The partition values are already truncated in `FileScan.partitions`. We should use `readPartitionSchema` as the
* partition schema here.
*/
ExcelPartitionReaderFactory(
sparkSession.sessionState.conf,
broadcastedConf,
dataSchema,
readDataSchema,
readPartitionSchema,
parsedOptions,
ArraySeq.unsafeWrapArray(pushedFilters)
)
}

override def equals(obj: Any): Boolean = obj match {
case c: ExcelScan =>
super.equals(c) && dataSchema == c.dataSchema && options == c.options &&
equivalentFilters(pushedFilters, c.pushedFilters)
case _ => false
}

override def hashCode(): Int = super.hashCode()

override def description(): String = {
super.description() + ", PushedFilters: " + pushedFilters.mkString("[", ", ", "]")
}
}
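For orientation, a hedged sketch of how this scan is reached from user code. The excel short name and the header option come from the library's general documentation rather than from this commit, and the path and column names are placeholders. Note the guard in createReaderFactory(): as with Spark's CSV and JSON sources, a query whose only referenced column is the corrupt-record column is rejected.

import org.apache.spark.sql.SparkSession

object ExcelReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("excel-read").getOrCreate()

    // V2 read path: ExcelScanBuilder -> ExcelScan -> createReaderFactory()
    val df = spark.read
      .format("excel")           // short name assumed from the library docs
      .option("header", "true")
      .load("/tmp/sample.xlsx")  // placeholder path

    df.show()

    // If the schema carries a corrupt-record column (default name "_corrupt_record"),
    // selecting it on its own hits the RuntimeException above; pair it with a data
    // column or cache the DataFrame first, as with Spark's built-in file sources.

    spark.stop()
  }
}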
@@ -0,0 +1,40 @@
/*
* Copyright 2022 Martin Mauch (@nightscape)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.crealytics.spark.excel.v2

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution.datasources.PartitioningAwareFileIndex
import org.apache.spark.sql.execution.datasources.v2.FileScanBuilder
import org.apache.spark.sql.internal.connector.SupportsPushDownCatalystFilters
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

case class ExcelScanBuilder(
sparkSession: SparkSession,
fileIndex: PartitioningAwareFileIndex,
schema: StructType,
dataSchema: StructType,
options: CaseInsensitiveStringMap
) extends FileScanBuilder(sparkSession, fileIndex, dataSchema)
with SupportsPushDownCatalystFilters {

override def build(): Scan = {
ExcelScan(sparkSession, fileIndex, dataSchema, readDataSchema(), readPartitionSchema(), options, pushedDataFilters)
}

}
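Because ExcelScan.description() appends the pushed filters, the effect of the Catalyst filter pushdown wired up by this builder can be checked directly from a query plan. A sketch under the same assumptions as above (excel short name, placeholder path and column):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object ExcelPushdownSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("excel-pushdown").getOrCreate()

    spark.read
      .format("excel")            // assumed short name
      .option("header", "true")
      .load("/tmp/sample.xlsx")   // placeholder path
      .where(col("Amount") > 100) // placeholder column; translated into pushed data filters
      .explain()                  // the scan's description should list the PushedFilters

    spark.stop()
  }
}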
18 changes: 18 additions & 0 deletions src/test/resources/log4j.properties
@@ -1,5 +1,23 @@
# config for log4j 1.x (spark < 3.3)
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.out
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# config for log4j 2.x (spark >= 3.3)
# Extra logging related to initialization of Log4j
# Set to debug or trace if log4j initialization is failing
status = warn


# Console appender configuration
appender.console.type = Console
appender.console.name = consoleLogger
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

# Root logger level
rootLogger.level = debug
# Root logger referring to console appender
rootLogger.appenderRef.stdout.ref = consoleLogger
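The file now carries both generations of configuration side by side: the log4j 1.x block for Spark < 3.3 and the log4j 2.x block for Spark >= 3.3. When a single test needs a different verbosity regardless of which backend is on the classpath, the level can also be forced through Spark itself; a minimal sketch, assuming a local session:

import org.apache.spark.sql.SparkSession

object LogLevelSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("log-level").getOrCreate()
    // Works with both log4j 1.x and log4j 2.x backed Spark builds.
    // Valid levels: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN.
    spark.sparkContext.setLogLevel("ERROR")
    spark.stop()
  }
}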
