Support an option to set POI IOUtils max byte array size #613

Merged (6 commits) on Jul 14, 2022

README.md: 2 additions, 0 deletions

@@ -91,6 +91,7 @@ val df = spark.read
     .option("addColorColumns", "true") // Optional, default: false
     .option("timestampFormat", "MM-dd-yyyy HH:mm:ss") // Optional, default: yyyy-mm-dd hh:mm:ss[.fffffffff]
     .option("maxRowsInMemory", 20) // Optional, default None. If set, uses a streaming reader which can help with big files (will fail if used with xls format files)
+    .option("maxByteArraySize", 2147483647) // Optional, default None. See https://poi.apache.org/apidocs/5.0/org/apache/poi/util/IOUtils.html#setByteArrayMaxOverride-int-
     .option("excerptSize", 10) // Optional, default: 10. If set and if schema inferred, number of rows to infer schema from
     .option("workbookPassword", "pass") // Optional, default None. Requires unlimited strength JCE for older JVMs
     .schema(myCustomSchema) // Optional, default: Either inferred schema, or all columns are Strings
@@ -115,6 +116,7 @@ val df = spark.read.excel(
     addColorColumns = true, // Optional, default: false
     timestampFormat = "MM-dd-yyyy HH:mm:ss", // Optional, default: yyyy-mm-dd hh:mm:ss[.fffffffff]
     maxRowsInMemory = 20, // Optional, default None. If set, uses a streaming reader which can help with big files (will fail if used with xls format files)
+    maxByteArraySize = 2147483647, // Optional, default None. See https://poi.apache.org/apidocs/5.0/org/apache/poi/util/IOUtils.html#setByteArrayMaxOverride-int-
     excerptSize = 10, // Optional, default: 10. If set and if schema inferred, number of rows to infer schema from
     workbookPassword = "pass" // Optional, default None. Requires unlimited strength JCE for older JVMs
 ).schema(myCustomSchema) // Optional, default: Either inferred schema, or all columns are Strings
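
For context, a minimal end-to-end sketch of the v1 read path with the new option (the file path, app name, and header option here are illustrative, not taken from this diff; 2147483647 is Int.MaxValue):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("excel-demo").getOrCreate()

    // Raise POI's byte-array allocation cap before reading a very large sheet.
    val df = spark.read
      .format("com.crealytics.spark.excel")
      .option("header", "true")
      .option("maxByteArraySize", 2147483647)
      .load("/tmp/large-workbook.xlsx") // hypothetical path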

src/main/scala/com/crealytics/spark/excel/WorkbookReader.scala: 33 additions, 11 deletions

@@ -17,13 +17,13 @@
 package com.crealytics.spark.excel

 import java.io.InputStream
-
 import com.crealytics.spark.excel.Utils.MapIncluding
 import com.github.pjfanning.xlsx.StreamingReader
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.poi.ss.usermodel.{Workbook, WorkbookFactory}
 import org.apache.poi.hssf.usermodel.HSSFWorkbookFactory
+import org.apache.poi.util.IOUtils
 import org.apache.poi.xssf.usermodel.XSSFWorkbookFactory

 trait WorkbookReader {
@@ -45,7 +45,7 @@ trait WorkbookReader {

 object WorkbookReader {
   val WithLocationMaxRowsInMemoryAndPassword =
-    MapIncluding(Seq("path"), optionally = Seq("maxRowsInMemory", "workbookPassword"))
+    MapIncluding(Seq("path"), optionally = Seq("maxRowsInMemory", "workbookPassword", "maxByteArraySize"))

   WorkbookFactory.addProvider(new HSSFWorkbookFactory)
   WorkbookFactory.addProvider(new XSSFWorkbookFactory)
@@ -56,26 +56,48 @@ object WorkbookReader {
       FileSystem.get(path.toUri, hadoopConfiguration).open(path)
     }
     parameters match {
-      case WithLocationMaxRowsInMemoryAndPassword(Seq(location), Seq(Some(maxRowsInMemory), passwordOption)) =>
-        new StreamingWorkbookReader(readFromHadoop(location), passwordOption, maxRowsInMemory.toInt)
-      case WithLocationMaxRowsInMemoryAndPassword(Seq(location), Seq(None, passwordOption)) =>
-        new DefaultWorkbookReader(readFromHadoop(location), passwordOption)
+      case WithLocationMaxRowsInMemoryAndPassword(
+            Seq(location),
+            Seq(Some(maxRowsInMemory), passwordOption, maxByteArraySizeOption)
+          ) =>
+        new StreamingWorkbookReader(
+          readFromHadoop(location),
+          passwordOption,
+          maxRowsInMemory.toInt,
+          maxByteArraySizeOption.map(_.toInt)
+        )
+      case WithLocationMaxRowsInMemoryAndPassword(Seq(location), Seq(None, passwordOption, maxByteArraySizeOption)) =>
+        new DefaultWorkbookReader(readFromHadoop(location), passwordOption, maxByteArraySizeOption.map(_.toInt))
     }
   }
 }
-class DefaultWorkbookReader(inputStreamProvider: => InputStream, workbookPassword: Option[String])
-    extends WorkbookReader {
+class DefaultWorkbookReader(
+  inputStreamProvider: => InputStream,
+  workbookPassword: Option[String],
+  maxByteArraySize: Option[Int]
+) extends WorkbookReader {

-  protected def openWorkbook(): Workbook =
+  protected def openWorkbook(): Workbook = {
+    maxByteArraySize.foreach { maxSize =>
+      IOUtils.setByteArrayMaxOverride(maxSize)
+    }
     workbookPassword
       .fold(WorkbookFactory.create(inputStreamProvider))(password =>
         WorkbookFactory.create(inputStreamProvider, password)
       )
+  }
 }

-class StreamingWorkbookReader(inputStreamProvider: => InputStream, workbookPassword: Option[String], maxRowsInMem: Int)
-    extends WorkbookReader {
+class StreamingWorkbookReader(
+  inputStreamProvider: => InputStream,
+  workbookPassword: Option[String],
+  maxRowsInMem: Int,
+  maxByteArraySize: Option[Int]
+) extends WorkbookReader {
   override protected def openWorkbook(): Workbook = {
+    maxByteArraySize.foreach { maxSize =>
+      IOUtils.setByteArrayMaxOverride(maxSize)
  [Inline review thread on the line above]
  Owner: Does this parameter work in StreamingReader as well?
  Author: yes - excel-streaming-reader uses some POI code under the hood
+    }
     val builder = StreamingReader
       .builder()
       .rowCacheSize(maxRowsInMem)
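
One behavioral note: IOUtils.setByteArrayMaxOverride is a static, JVM-wide POI setting rather than a per-workbook one, so setting it in one reader affects every workbook opened afterwards in the same process. A minimal sketch of the call both readers now make (the limit value is illustrative):

    import org.apache.poi.util.IOUtils

    // POI refuses byte-array allocations above its built-in safety limit;
    // this override raises that ceiling process-wide for subsequent reads.
    IOUtils.setByteArrayMaxOverride(100000000) // roughly 100 MB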

src/main/scala/com/crealytics/spark/excel/package.scala: 2 additions, 0 deletions

@@ -82,6 +82,7 @@ package object excel {
       dataAddress: String = null,
       timestampFormat: String = null,
       maxRowsInMemory: java.lang.Integer = null,
+      maxByteArraySize: java.lang.Integer = null,
       excerptSize: Int = 10,
       workbookPassword: String = null
     ): DataFrameReader = {
@@ -95,6 +96,7 @@
         "dataAddress" -> dataAddress,
         "timestampFormat" -> timestampFormat,
         "maxRowsInMemory" -> maxRowsInMemory,
+        "maxByteArraySize" -> maxByteArraySize,
         "excerptSize" -> excerptSize,
         "workbookPassword" -> workbookPassword
       ).foldLeft(dataFrameReader.format("com.crealytics.spark.excel")) { case (dfReader, (key, value)) =>
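
The body of the foldLeft is collapsed in this view. Presumably each (key, value) pair becomes a reader option only when its value is non-null, since most parameters default to null; a hypothetical reconstruction under that assumption:

    import org.apache.spark.sql.DataFrameReader

    // Hypothetical sketch only; the real fold body is collapsed in the hunk above.
    // Null-defaulted parameters are skipped, everything else is stringified.
    def applyOptions(reader: DataFrameReader, options: Map[String, Any]): DataFrameReader =
      options.foldLeft(reader) { case (r, (key, value)) =>
        Option(value).fold(r)(v => r.option(key, v.toString))
      }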

src/main/scala/com/crealytics/spark/v2/excel/ExcelHelper.scala: 10 additions, 16 deletions

@@ -17,26 +17,17 @@
 package com.crealytics.spark.v2.excel

 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-import org.apache.poi.ss.usermodel.Cell
-import org.apache.poi.ss.usermodel.CellType
-import org.apache.poi.ss.usermodel.DataFormatter
-import org.apache.poi.ss.usermodel.FormulaError
-import org.apache.poi.ss.usermodel.Workbook
-import org.apache.poi.ss.usermodel.WorkbookFactory
-import java.math.BigDecimal
-import java.text.FieldPosition
-import java.text.Format
-import java.text.ParsePosition
-
-import java.net.URI
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.poi.hssf.usermodel.HSSFWorkbookFactory
-import org.apache.poi.ss.util.AreaReference
-import org.apache.poi.ss.util.CellReference
 import org.apache.poi.ss.SpreadsheetVersion
+import org.apache.poi.ss.usermodel.{Cell, CellType, DataFormatter, FormulaError, Workbook, WorkbookFactory}
+import org.apache.poi.ss.util.{AreaReference, CellReference}
+import org.apache.poi.util.IOUtils
 import org.apache.poi.xssf.usermodel.XSSFWorkbookFactory

+import java.math.BigDecimal
+import java.net.URI
+import java.text.{FieldPosition, Format, ParsePosition}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.util.Try
@@ -209,6 +200,9 @@ object ExcelHelper {

   def apply(options: ExcelOptions): ExcelHelper = {
     configureProvidersOnce() // ExcelHelper ctor is private, so we guarantee that this is called!
+    options.maxByteArraySize.foreach { maxSize =>
+      IOUtils.setByteArrayMaxOverride(maxSize)
+    }
     new ExcelHelper(options)
   }
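
configureProvidersOnce is outside this hunk; judging from the AtomicBoolean import above and the provider registrations seen in WorkbookReader, it presumably guards the POI provider setup with a compare-and-set. A hedged sketch of that assumed pattern:

    import java.util.concurrent.atomic.AtomicBoolean
    import org.apache.poi.hssf.usermodel.HSSFWorkbookFactory
    import org.apache.poi.ss.usermodel.WorkbookFactory
    import org.apache.poi.xssf.usermodel.XSSFWorkbookFactory

    // Assumed implementation, inferred from the imports and sibling code in
    // this PR: register the HSSF/XSSF providers exactly once per JVM.
    object ProviderSetup {
      private val configured = new AtomicBoolean(false)

      def configureProvidersOnce(): Unit =
        if (configured.compareAndSet(false, true)) {
          WorkbookFactory.addProvider(new HSSFWorkbookFactory)
          WorkbookFactory.addProvider(new XSSFWorkbookFactory)
        }
    }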

ExcelOptions.scala

@@ -140,4 +140,10 @@ class ExcelOptions(
     val r = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
     if (r > 1.0 || r <= 0.0) 1.0 else r
   }
+
+  //scalastyle:off
+  /**
+   * Optional parameter for <a href="https://poi.apache.org/apidocs/5.0/org/apache/poi/util/IOUtils.html#setByteArrayMaxOverride-int-">maxByteArraySize</a>
+   */
+  val maxByteArraySize = getInt("maxByteArraySize")
 }
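
getInt itself is defined outside this hunk. Its assumed contract, sketched under the obvious reading (look the key up among the parsed options and convert when present; the real helper may report malformed values more carefully):

    // Hedged sketch of the assumed getInt helper used by maxByteArraySize.
    class OptionsLookup(parameters: Map[String, String]) {
      def getInt(name: String): Option[Int] =
        parameters.get(name).map(_.toInt)
    }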

IntegrationSuite.scala

@@ -72,7 +72,7 @@ class IntegrationSuite
       .map { case ((f, sf), idx) => sf.name -> f(data.map(_.get(idx))) }
   }

-  def runTests(maxRowsInMemory: Option[Int]) {
+  def runTests(maxRowsInMemory: Option[Int], maxByteArraySize: Option[Int] = None) {
     def writeThenRead(
       df: DataFrame,
       schema: Option[StructType] = Some(exampleDataSchema),
@@ -94,6 +94,7 @@
       val reader = spark.read.excel(dataAddress = s"'$sheetName'!A1", header = header)
       val configuredReader = Map(
         "maxRowsInMemory" -> maxRowsInMemory,
+        "maxByteArraySize" -> maxByteArraySize,
         "inferSchema" -> Some(schema.isEmpty),
         "excerptSize" -> Some(10),
         "dataAddress" -> dataAddress
@@ -116,7 +117,7 @@
       assertDataFrameEquals(expected, inferred)
     }

-    describe(s"with maxRowsInMemory = $maxRowsInMemory") {
+    describe(s"with maxRowsInMemory = $maxRowsInMemory; maxByteArraySize = $maxByteArraySize") {
       it("parses known datatypes correctly") {
         forAll(rowsGen) { rows =>
           val expected = spark.createDataset(rows).toDF
@@ -344,6 +345,8 @@ class IntegrationSuite
     differencesInNonOverwrittenData shouldBe empty
   }
   runTests(maxRowsInMemory = None)
+  runTests(maxRowsInMemory = None, maxByteArraySize = Some(100000000))
   runTests(maxRowsInMemory = Some(20))
   runTests(maxRowsInMemory = Some(1))
+  runTests(maxRowsInMemory = Some(1), maxByteArraySize = Some(100000000))
 }
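
The matrix above exercises both the default (POI) and streaming readers, each with and without the new option. In user code, combining both options looks roughly like this (values and path are illustrative):

    import com.crealytics.spark.excel._ // brings the .excel(...) reader extension into scope

    val df = spark.read
      .excel(
        header = true,
        maxRowsInMemory = 20, // forces the streaming reader path
        maxByteArraySize = 100000000 // raises POI's allocation cap
      )
      .load("/tmp/huge.xlsx") // hypothetical path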