PARQUET-227 Enforce that unions have only 1 set value, tolerate bad records in read path

See https://issues.apache.org/jira/browse/PARQUET-227

Author: Alex Levenson <alexlevenson@twitter.com>

Closes #153 from isnotinvain/alexlevenson/double-union and squashes the following commits:

ef4d36f [Alex Levenson] fix package names
e201deb [Alex Levenson] Merge branch 'master' into alexlevenson/double-union
01694fa [Alex Levenson] Forgot a break in a switch statement
2f31321 [Alex Levenson] Merge branch 'master' into alexlevenson/double-union
9292274 [Alex Levenson] Add in ShouldNeverHappenException which I forgot to check in
8d61515 [Alex Levenson] Address first round of comments
4d71bcb [Alex Levenson] Merge branch 'master' into alexlevenson/double-union
8f9334c [Alex Levenson] Some cleanup and fixes
8153bc9 [Alex Levenson] Enforce that unions have only 1 set value, tolerate bad records in read path
isnotinvain committed Apr 30, 2015
1 parent b287d35 commit 9993450
Showing 15 changed files with 665 additions and 35 deletions.
@@ -18,6 +18,8 @@
*/
package org.apache.parquet.io.api;

import org.apache.parquet.io.ParquetDecodingException;

/**
* Top-level class which should be implemented in order to materialize objects from
* a stream of Parquet data.
@@ -33,6 +35,7 @@ abstract public class RecordMaterializer<T> {

/**
* @return the result of the conversion
* @throws RecordMaterializationException to signal that a record cannot be materialized, but can be skipped
*/
abstract public T getCurrentRecord();

@@ -45,4 +48,28 @@ public void skipCurrentRecord() { }
* @return the root converter for this tree
*/
abstract public GroupConverter getRootConverter();

/**
* This exception signals that the current record cannot be converted from parquet columns to a materialized
* record, but can be skipped if requested. This exception should be used to signal errors like a union with no
* set values, or an error in converting parquet primitive values to a materialized record. It should not
* be used to signal unrecoverable errors, like a data column being corrupt or unreadable.
*/
public static class RecordMaterializationException extends ParquetDecodingException {
public RecordMaterializationException() {
super();
}

public RecordMaterializationException(String message, Throwable cause) {
super(message, cause);
}

public RecordMaterializationException(String message) {
super(message);
}

public RecordMaterializationException(Throwable cause) {
super(cause);
}
}
}
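For context, a minimal sketch of a materializer that uses the new exception; the converter and its validity flag are illustrative assumptions, not part of this commit:

import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.RecordMaterializer;

public class SkippingMaterializer extends RecordMaterializer<String> {

  // Hypothetical root converter that tracks whether the current record
  // could be assembled from the parquet columns.
  private static class Root extends GroupConverter {
    String value;
    boolean invalid;

    @Override
    public Converter getConverter(int fieldIndex) {
      throw new UnsupportedOperationException("no fields in this sketch");
    }

    @Override
    public void start() { value = null; invalid = false; }

    @Override
    public void end() { /* a real converter would validate the record here */ }
  }

  private final Root root = new Root();

  @Override
  public String getCurrentRecord() {
    if (root.invalid) {
      // a per-record, skippable failure: the read path may tolerate it
      // up to a configured threshold instead of failing the whole job
      throw new RecordMaterializationException("record could not be materialized");
    }
    return root.value;
  }

  @Override
  public GroupConverter getRootConverter() { return root; }
}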
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet;

/**
* Used in code blocks that should be unreachable, but the compiler does
* not know this, for example the default clause of an exhaustive switch statement.
*/
public class ShouldNeverHappenException extends ParquetRuntimeException {
public ShouldNeverHappenException() {
}

public ShouldNeverHappenException(String message, Throwable cause) {
super(message, cause);
}

public ShouldNeverHappenException(String message) {
super(message);
}

public ShouldNeverHappenException(Throwable cause) {
super(cause);
}
}
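An illustrative use of the default-clause pattern described in the javadoc (the enum and class here are hypothetical, not from this commit):

import org.apache.parquet.ShouldNeverHappenException;

public class KindDescriber {
  enum Kind { STRING, LONG, BOOL }

  static String describe(Kind kind) {
    switch (kind) {
      case STRING: return "a string";
      case LONG:   return "a long";
      case BOOL:   return "a bool";
      default:
        // unreachable while every Kind constant is handled above, but the
        // compiler still requires this path to return or throw
        throw new ShouldNeverHappenException("unhandled kind: " + kind);
    }
  }
}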
@@ -24,8 +24,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

@@ -43,6 +43,7 @@
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.RecordMaterializer;
import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
@@ -80,6 +81,7 @@ class InternalParquetRecordReader<T> {
private long totalCountLoadedSoFar = 0;

private Path file;
private UnmaterializableRecordCounter unmaterializableRecordCounter;

/**
* @param readSupport Object which helps reads files of the given type, e.g. Thrift, Avro.
@@ -179,6 +181,7 @@ public void initialize(MessageType fileSchema,
for (BlockMetaData block : blocks) {
total += block.getRowCount();
}
this.unmaterializableRecordCounter = new UnmaterializableRecordCounter(configuration, total);
LOG.info("RecordReader initialized will read a total of " + total + " records.");
}

@@ -206,8 +209,17 @@ public boolean nextKeyValue() throws IOException, InterruptedException {

try {
checkRead();
currentValue = recordReader.read();
current ++;

try {
currentValue = recordReader.read();
} catch (RecordMaterializationException e) {
// this might throw, but it's fatal if it does.
unmaterializableRecordCounter.incErrors(e);
if (DEBUG) LOG.debug("skipping a corrupt record");
continue;
}

if (recordReader.shouldSkipCurrentRecord()) {
// this record is being filtered via the filter2 package
if (DEBUG) LOG.debug("skipping record");
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;


import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.Log;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException;

// Essentially taken from:
// https://github.com/twitter/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java#L124

/**
* Tracks number of records that cannot be materialized and throws ParquetDecodingException
* if the rate of errors crosses a limit.<p> These types of errors are meant
* to be recoverable record conversion errors, such as a union missing a value, or schema
* mismatch and so on. It's not meant to recover from corruptions in the parquet
* columns themselves.
*
* The intention is to skip over very rare file corruption or bugs where
* the write path has allowed invalid records into the file, but still catch large
* numbers of failures. Not turned on by default (by default, no errors are tolerated).
*/
public class UnmaterializableRecordCounter {

/* Tolerated percent bad records */
public static final String BAD_RECORD_THRESHOLD_CONF_KEY = "parquet.read.bad.record.threshold";

private static final Log LOG = Log.getLog(UnmaterializableRecordCounter.class);

private static final float DEFAULT_THRESHOLD = 0f;

private long numErrors;

private final double errorThreshold; // max fraction of errors allowed
private final long totalNumRecords; // how many records are we going to see total?

public UnmaterializableRecordCounter(Configuration conf, long totalNumRecords) {
this(
conf.getFloat(BAD_RECORD_THRESHOLD_CONF_KEY, DEFAULT_THRESHOLD),
totalNumRecords
);
}

public UnmaterializableRecordCounter(double errorThreshold, long totalNumRecords) {
this.errorThreshold = errorThreshold;
this.totalNumRecords = totalNumRecords;
numErrors = 0;
}

public void incErrors(RecordMaterializationException cause) throws ParquetDecodingException {
numErrors++;

LOG.warn(String.format("Error while reading an input record (%s out of %s): ",
numErrors, totalNumRecords), cause);

if (numErrors > 0 && errorThreshold <= 0) { // no errors are tolerated
throw new ParquetDecodingException("Error while decoding records", cause);
}

double errRate = numErrors/(double)totalNumRecords;

if (errRate > errorThreshold) {
String message = String.format("Decoding error rate of at least %s/%s crosses configured threshold of %s",
numErrors, totalNumRecords, errorThreshold);
LOG.error(message);
throw new ParquetDecodingException(message, cause);
}
}
}
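A sketch of how a job could opt in to tolerating bad records; by default the threshold is 0, so the first RecordMaterializationException fails the read:

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.UnmaterializableRecordCounter;

public class ToleranceExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // tolerate up to 1% of records failing to materialize
    conf.setFloat(UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY, 0.01f);
  }
}

With 1000 total records and a threshold of 0.01, incErrors throws on the 11th error: 10/1000 = 0.01 does not exceed the threshold, but 11/1000 = 0.011 does.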
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.scrooge;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;

import org.apache.parquet.hadoop.thrift.TestCorruptThriftRecords;
import org.apache.parquet.hadoop.thrift.ThriftReadSupport;
import org.apache.parquet.scrooge.test.StructWithUnionV2;
import org.apache.parquet.scrooge.test.StructWithUnionV2$;

import static org.junit.Assert.assertEquals;

public class TestCorruptScroogeRecords extends TestCorruptThriftRecords {

@Override
public void setupJob(Job job, Path path) throws Exception {
job.setInputFormatClass(ParquetScroogeInputFormat.class);
ParquetScroogeInputFormat.setInputPaths(job, path);
ParquetScroogeInputFormat.setThriftClass(job.getConfiguration(), StructWithUnionV2.class);


ThriftReadSupport.setRecordConverterClass(job.getConfiguration(), ScroogeRecordConverter.class);

job.setMapperClass(ReadMapper.class);
job.setNumReduceTasks(0);
job.setOutputFormatClass(NullOutputFormat.class);
}

@Override
protected void assertEqualsExcepted(List<org.apache.parquet.thrift.test.compat.StructWithUnionV2> expected, List<Object> found) throws Exception {
List<StructWithUnionV2> scroogeExpected = new ArrayList<StructWithUnionV2>();
for (org.apache.parquet.thrift.test.compat.StructWithUnionV2 tbase : expected) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
TProtocol out = new Factory().getProtocol(new TIOStreamTransport(baos));
tbase.write(out);
TProtocol in = new Factory().getProtocol(new TIOStreamTransport(new ByteArrayInputStream(baos.toByteArray())));
scroogeExpected.add(StructWithUnionV2$.MODULE$.decode(in));
}
assertEquals(scroogeExpected, found);
}
}
34 changes: 34 additions & 0 deletions parquet-scrooge/src/test/thrift/test.thrift
@@ -172,3 +172,37 @@ struct StringAndBinary {
1: required string s;
2: required binary b;
}

struct AString {
1: required string s
}

struct ALong {
1: required i64 l
}

struct ABool {
1: required bool b
}

union UnionV2 {
1: AString aString,
2: ALong aLong,
3: ABool aNewBool
}

struct StructWithUnionV2 {
1: required string name,
2: required UnionV2 aUnion
}

struct AStructThatLooksLikeUnionV2 {
1: optional AString aString,
2: optional ALong aLong,
3: optional ABool aNewBool
}

struct StructWithAStructThatLooksLikeUnionV2 {
1: required string name,
2: required AStructThatLooksLikeUnionV2 aNotQuiteUnion
}
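To illustrate what the new check enforces, a hedged sketch using the thrift-generated Java classes for these definitions (standard generated constructors and setters assumed, not shown in this commit):

public class UnionInvariantSketch {
  public static void main(String[] args) {
    // UnionV2 may carry exactly one set value; the write path now rejects
    // records with zero or multiple values set. The look-alike struct has
    // no such constraint, which is exactly what these tests exercise.
    AStructThatLooksLikeUnionV2 notAUnion = new AStructThatLooksLikeUnionV2();
    notAUnion.setAString(new AString("s"));
    notAUnion.setALong(new ALong(1L)); // two fields set: legal for the struct,
                                       // but invalid if written as a UnionV2
  }
}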
@@ -67,12 +67,27 @@ public class ThriftReadSupport<T> extends ReadSupport<T> {
* implementation creates standard Apache Thrift {@link TBase} objects; to support alternatives, such
* as <a href="http://github.com/twitter/scrooge">Twitter's Scrooge</a>, a custom converter can be specified
* (for example, ScroogeRecordConverter from parquet-scrooge).
*
* @deprecated use {@link #setRecordConverterClass(Configuration, Class)} below
*/
@Deprecated
public static void setRecordConverterClass(JobConf conf,
Class<?> klass) {
setRecordConverterClass((Configuration) conf, klass);
}

/**
* A {@link ThriftRecordConverter} builds an object by working with {@link TProtocol}. The default
* implementation creates standard Apache Thrift {@link TBase} objects; to support alternatives, such
* as <a href="http://github.com/twitter/scrooge">Twitter's Scrooge</a>, a custom converter can be specified
* (for example, ScroogeRecordConverter from parquet-scrooge).
*/
public static void setRecordConverterClass(Configuration conf,
Class<?> klass) {
conf.set(RECORD_CONVERTER_CLASS_KEY, klass.getName());
}


/**
* used from hadoop
* the configuration must contain a "parquet.thrift.read.class" setting