
Cannot read row group larger than 2GB #2057

@asfimport

Description


Parquet MR 1.8.2 does not support reading row groups which are larger than 2 GB. See: https://github.com/apache/parquet-mr/blob/parquet-1.8.x/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java#L1064
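
For reference, this is the usual 32-bit length overflow: the reader ends up with the total chunk length of the row group in an int (either by accumulating into an int or by a narrowing cast) and then allocates a single byte array of that size. Above Integer.MAX_VALUE the value wraps to a negative number, which is exactly the NegativeArraySizeException in the stack trace further down. A minimal sketch of the effect (illustrative only, not the actual parquet-mr code):

// A row group somewhat over 2 GB, e.g. ~3 GB.
val totalChunkSize: Long = 3L * 1024 * 1024 * 1024

// JVM arrays are indexed by Int, so the size has to be narrowed; for values
// above Integer.MAX_VALUE the cast wraps around to a negative number and the
// allocation throws java.lang.NegativeArraySizeException.
val buffer = new Array[Byte](totalChunkSize.toInt)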

We are seeing this when writing skewed records: the skew throws off the estimate of the memory check interval in the InternalParquetRecordWriter. The following Spark code illustrates this:

import org.apache.spark.sql.functions._
import spark.implicits._

/**
 * Create a data frame that will make parquet write a file with a row group larger than 2 GB. Parquet
 * only checks the size of the row group after writing a number of records. This number is based on
 * average row size of the already written records. This is problematic in the following scenario:
 * - The initial (100) records in the row group are relatively small.
 * - The InternalParquetRecordWriter checks if it needs to write to disk (it should not), it assumes
 *   that the remaining records have a similar size, and (greatly) increases the check interval (usually
 *   to 10000).
 * - The remaining records are much larger than expected, making the row group larger than 2 GB (which
 *   makes reading the row group impossible).
 *
 * The data frame below illustrates such a scenario. This creates a row group of approximately 4GB.
 */
val badDf = spark.range(0, 2200, 1, 1).mapPartitions { iterator =>
  var i = 0
  val random = new scala.util.Random(42)
  val buffer = new Array[Char](750000)
  iterator.map { id =>
    // the first 200 records have a length of 1K and the remaining 2000 have a length of 750K.
    val numChars = if (i < 200) 1000 else 750000
    i += 1

    // create a random array
    var j = 0
    while (j < numChars) {
      // Generate a char (borrowed from scala.util.Random)
      buffer(j) = (random.nextInt(0xD800 - 1) + 1).toChar
      j += 1
    }

    // create a string: the string constructor will copy the buffer.
    new String(buffer, 0, numChars)
  }
}
badDf.write.parquet("somefile")
val corruptedDf = spark.read.parquet("somefile")
corruptedDf.select(count(lit(1)), max(length($"value"))).show()

The latter fails with the following exception:

java.lang.NegativeArraySizeException
	at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1064)
	at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:698)
...
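
For context on why the writer lets the row group grow this far: InternalParquetRecordWriter only checks the buffered size every so many records, and it extrapolates the next check point from the average size of the records written so far. A rough sketch of that scheduling (simplified, with approximate names and constants, not the actual parquet-mr source):

// After each record, decide when to check the buffered row group size again.
val MinimumRecordCountForCheck = 100L   // first check happens after 100 records
val MaximumRecordCountForCheck = 10000L // never postpone a check by more than 10000 records

var recordCount = 0L
var recordCountForNextCheck = MinimumRecordCountForCheck

def scheduleNextCheck(bufferedSize: Long, rowGroupSizeTarget: Long): Unit = {
  if (recordCount >= recordCountForNextCheck) {
    if (bufferedSize >= rowGroupSizeTarget) {
      // flush the row group to disk here
    } else {
      // Extrapolate, from the average record size so far, the record count at which
      // the target size would be reached, and check again halfway to that point.
      // With 100 small initial records this estimate is far too optimistic, so the
      // next check can be deferred by up to 10000 much larger records, letting the
      // row group blow well past 2 GB before anything is flushed.
      val avgRecordSize = bufferedSize.toDouble / recordCount
      val extrapolatedTotal = (rowGroupSizeTarget / avgRecordSize).toLong
      recordCountForNextCheck = math.min(
        math.max(MinimumRecordCountForCheck, (recordCount + extrapolatedTotal) / 2),
        recordCount + MaximumRecordCountForCheck)
    }
  }
}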

This seems to be fixed by commit 6b605a4 in parquet 1.9.x. Is there any chance that we can fix this in 1.8.x?

Reporter: Herman van Hövell

Note: This issue was originally created as PARQUET-980. Please see the migration documentation for further details.
