Filtered Reader Implementation, Avro Specific Support #68
Conversation
JacobMetcalf added some commits on Jun 22, 2013

dvryaboy and 2 others commented on an outdated diff on Jun 23, 2013:

```diff
@@ -44,6 +45,23 @@ public AvroParquetWriter(Path file, Schema avroSchema,
         compressionCodecName, blockSize, pageSize);
   }
+  /** Create a new {@link AvroParquetWriter}.
+   *
+   * @param file
+   * @param avroSchema
+   * @param compressionCodecName
+   * @param blockSize
+   * @param pageSize
+   * @throws IOException
+   */
+  public AvroParquetWriter(Path file, Schema avroSchema,
+      CompressionCodecName compressionCodecName, int blockSize,
+      int pageSize, boolean enableDictionary) throws IOException {
+    super(file, (WriteSupport<T>)
+        new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema),
+        compressionCodecName, blockSize, pageSize, enableDictionary, false);
```
JacobMetcalf (Contributor)

dvryaboy and 1 other commented on an outdated diff on Jun 23, 2013:

```diff
@@ -18,6 +18,7 @@
 import java.util.Map;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
```
dvryaboy (Contributor)

dvryaboy commented on the diff on Jun 23, 2013:

```diff
@@ -25,7 +26,7 @@
 /**
  * A Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} for Parquet files.
  */
-public class AvroParquetOutputFormat extends ParquetOutputFormat<GenericRecord> {
+public class AvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> {
```
dvryaboy (Contributor)

dvryaboy commented on the diff on Jun 23, 2013:

```diff
@@ -25,7 +26,7 @@
 /**
  * Write Avro records to a Parquet file.
  */
-public class AvroParquetWriter<T> extends ParquetWriter<T> {
+public class AvroParquetWriter<T extends IndexedRecord> extends ParquetWriter<T> {
```
JacobMetcalf (Contributor)

dvryaboy and 1 other commented on an outdated diff on Jun 23, 2013:

```diff
+import org.junit.Test;
+import parquet.hadoop.ParquetReader;
+import parquet.hadoop.ParquetWriter;
+import parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static parquet.filter.ColumnRecordFilter.column;
+import static parquet.filter.ColumnRecordFilter.equalTo;
+import static parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
+import static parquet.hadoop.ParquetWriter.DEFAULT_PAGE_SIZE;
+
+public class TestSpecificReadWrite {
```
dvryaboy (Contributor)

dvryaboy and 1 other commented on an outdated diff on Jun 23, 2013
dvryaboy and 1 other commented on an outdated diff on Jun 23, 2013:

```diff
+  }
+
+  private Path writeCarsToParquetFile(int num, boolean varyYear,
+      CompressionCodecName compression, boolean enableDictionary) throws IOException {
+    File tmp = File.createTempFile(getClass().getSimpleName(), ".tmp");
+    tmp.deleteOnExit();
+    tmp.delete();
+    Path path = new Path(tmp.getPath());
+
+    Car vwPolo = getVwPolo();
+    Car vwPassat = getVwPassat();
+    Car bmwMini = getBmwMini();
+
+    ParquetWriter<Car> writer = new AvroParquetWriter<Car>(path, Car.SCHEMA$, compression,
+        DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary);
+    for (int i = 0; i < num; i++) {
+      if (varyYear) {
```
dvryaboy and 1 other commented on an outdated diff on Jun 23, 2013:

```diff
+  @Test
+  public void testFilterMatchesMultiple() throws IOException {
+
+    Path path = writeCarsToParquetFile(10, false, CompressionCodecName.UNCOMPRESSED, false);
+
+    ParquetReader<Car> reader = new AvroParquetReader<Car>(path, column("make", equalTo("Volkswagen")));
+    for (int i = 0; i < 10; i++) {
+      assertEquals(reader.read().toString(), getVwPolo().toString());
+      assertEquals(reader.read().toString(), getVwPassat().toString());
+    }
+    assertNull(reader.read());
+  }
```
dvryaboy (Contributor)

dvryaboy and 2 others commented on an outdated diff on Jun 23, 2013:

```diff
@@ -25,6 +25,11 @@
     <version>1.7</version>
     <scope>compile</scope>
   </dependency>
+  <dependency>
```
dvryaboy (Contributor)

dvryaboy commented on the diff on Jun 23, 2013:

```diff
  * reads the current value
  */
 public void readCurrentValue() {
   binding.read();
 }
-protected void checkValueRead() {
+/**
+ * Reads the value into the binding or skips forwards.
```
dvryaboy (Contributor)

dvryaboy and 1 other commented on an outdated diff on Jun 23, 2013:

```diff
+
+    return (filterOnColumn.isFullyConsumed()) ? false : filterPredicate.apply(filterOnColumn);
+  }
+
+  /**
+   * @return true if the column we are filtering on has no more values.
+   */
+  @Override
+  public boolean isFullyConsumed() {
+    return filterOnColumn.isFullyConsumed();
+  }
+
+  /**
+   * Predicate for string equality
+   */
+  public static final Predicate<ColumnReader> equalTo(final String value) {
```
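The hunk above builds record filters from predicates over a column reader, and guards against applying a predicate once the column is fully consumed. The composition pattern can be sketched without parquet-mr at all; the names below (`FakeColumnReader`, `isMatch`) are illustrative stand-ins, not the library's API:

```java
import java.util.function.Predicate;

public class ColumnFilterSketch {
    // Stands in for parquet's ColumnReader: a cursor over one column's values.
    static final class FakeColumnReader {
        private final String[] values;
        private int pos = 0;
        FakeColumnReader(String... values) { this.values = values; }
        String current()          { return values[pos]; }
        void advance()            { pos++; }
        boolean isFullyConsumed() { return pos >= values.length; }
    }

    // Analogue of ColumnRecordFilter.equalTo: a predicate for string equality.
    static Predicate<FakeColumnReader> equalTo(String value) {
        return reader -> reader.current().equals(value);
    }

    // Analogue of the isMatch logic in the diff: never apply the predicate
    // once the column has no more values.
    static boolean isMatch(FakeColumnReader reader, Predicate<FakeColumnReader> p) {
        return !reader.isFullyConsumed() && p.test(reader);
    }

    public static void main(String[] args) {
        FakeColumnReader makes = new FakeColumnReader("Volkswagen", "BMW");
        Predicate<FakeColumnReader> vw = equalTo("Volkswagen");
        System.out.println(isMatch(makes, vw)); // true
        makes.advance();
        System.out.println(isMatch(makes, vw)); // false: "BMW"
        makes.advance();
        System.out.println(isMatch(makes, vw)); // false: fully consumed
    }
}
```

Because the consumed check short-circuits before the predicate runs, the predicate never reads past the end of the column.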
dvryaboy (Contributor)

dvryaboy commented on an outdated diff on Jun 23, 2013:

```diff
+      return new PagedRecordFilter(startPos, pageSize);
+    }
+  };
+}
+
+/**
+ * Private constructor, use column() instead.
+ */
+private PagedRecordFilter(long startPos, long pageSize) {
+  this.startPos = startPos;
+  this.endPos = startPos + pageSize;
+}
+
+/**
+ * Terminate early when we have got our page. Later we will want a row count and this
+ * will be no good.
```
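The paging filter above keeps only a window of records: it matches positions from `startPos` up to `startPos + pageSize`. A self-contained sketch of that idea, assuming 1-based record positions and an exclusive upper bound (the exact boundary convention in `PagedRecordFilter` may differ):

```java
public class PagedFilterSketch {
    private final long startPos;
    private final long endPos;   // exclusive: startPos + pageSize
    private long currentPos = 0; // records seen so far

    PagedFilterSketch(long startPos, long pageSize) {
        this.startPos = startPos;
        this.endPos = startPos + pageSize;
    }

    // True only while the current record's position falls inside the page window.
    boolean isMatch() {
        currentPos++;
        return currentPos >= startPos && currentPos < endPos;
    }

    public static void main(String[] args) {
        PagedFilterSketch page = new PagedFilterSketch(3, 2); // keep records 3 and 4
        StringBuilder kept = new StringBuilder();
        for (int rec = 1; rec <= 6; rec++) {
            if (page.isMatch()) kept.append(rec).append(' ');
        }
        System.out.println(kept.toString().trim()); // 3 4
    }
}
```

The diff's comment about terminating early fits naturally here: once `currentPos >= endPos`, a reader can stop scanning entirely instead of testing every remaining record.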
dvryaboy (Contributor)

dvryaboy and 1 other commented on an outdated diff on Jun 23, 2013:

```diff
@@ -58,8 +60,19 @@
         this,
         recordMaterializer,
         validating,
-        new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType())
-    );
+        new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()),
+        RecordFilter.NULL_FILTER
```
dvryaboy (Contributor)

dvryaboy and 1 other commented on an outdated diff on Jun 23, 2013:

```diff
@@ -356,6 +361,10 @@ public int compare(Case o1, Case o2) {
       Collections.sort(state.definedCases, caseComparator);
       Collections.sort(state.undefinedCases, caseComparator);
     }
+
+    // We need to make defensive copy to stop interference but as an optimisation don't bother if null
```
dvryaboy (Contributor)

dvryaboy and 1 other commented on an outdated diff on Jun 23, 2013:

```diff
+import parquet.example.data.simple.convert.GroupRecordConverter;
+import parquet.io.api.RecordMaterializer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static parquet.example.Paper.r1;
+import static parquet.example.Paper.r2;
+import static parquet.example.Paper.schema;
+import static parquet.filter.AndRecordFilter.and;
+import static parquet.filter.PagedRecordFilter.page;
+import static parquet.filter.ColumnRecordFilter.equalTo;
+import static parquet.filter.ColumnRecordFilter.column;
+
+public class TestFiltered {
+
+  private static final Log LOG = Log.getLog(TestColumnIO.class);
```
dvryaboy and 1 other commented on an outdated diff on Jun 23, 2013:

```diff
@@ -53,7 +59,7 @@ public ParquetReader(Path file, ReadSupport<T> readSupport) throws IOException {
     MessageType schema = fileMetaData.getSchema();
     Map<String, String> extraMetadata = fileMetaData.getKeyValueMetaData();
     final ReadContext readContext = readSupport.init(conf, extraMetadata, schema);
-    reader = new ParquetRecordReader<T>(readSupport);
+    reader = new ParquetRecordReader<T>(readSupport, filter);
```

This is very nice, Jacob!
julienledem and 2 others commented on an outdated diff on Jun 29, 2013:

```diff
@@ -111,7 +113,9 @@ public Converter getConverter(int fieldIndex) {
   @Override
   public void start() {
-    this.currentRecord = new GenericData.Record(avroSchema);
+    // Should do the right thing whether it is generic or specific
+    this.currentRecord =
+        (IndexedRecord) SpecificData.get().newRecord((IndexedRecord) null, avroSchema);
```
julienledem (Owner)

julienledem commented on the diff on Jun 29, 2013:

```diff
@@ -98,4 +98,14 @@
    * @return the current value
    */
   double getDouble();
+
+  /**
+   * @return Descriptor of the column.
+   */
+  ColumnDescriptor getDescriptor();
+
+  /**
+   * Skip the current value
+   */
+  void skip();
```
JacobMetcalf (Contributor)

julienledem and 1 other commented on an outdated diff on Jun 29, 2013

julienledem commented on the diff on Jun 29, 2013:

```diff
@@ -46,6 +46,15 @@ public float readFloat() {
   }
   @Override
+  public void skipFloat() {
```
julienledem (Owner)

julienledem and 1 other commented on an outdated diff on Jun 29, 2013:

```diff
@@ -375,6 +384,12 @@ private RecordConsumer wrap(RecordConsumer recordConsumer) {
    */
   @Override
   public T read() {
+    // Skip forwards until the filter matches a record
+    if (!skipToMatch()) {
+      return null;
+    }
```
julienledem (Owner)

julienledem and 1 other commented on an outdated diff on Jun 29, 2013
julienledem (Owner)

Great work Jacob.
dvryaboy commented on the diff on Jun 30, 2013
```diff
@@ -0,0 +1,51 @@
+package parquet.filter;
```

dvryaboy (Contributor)

Have tried to incorporate all your comments. Let me know what you think.
JacobMetcalf added some commits on Jul 6, 2013

Have updated for the above comments, included APL headers and caught up with master. Let me know if there is anything else needed to pull this into master.
julienledem commented on the diff on Jul 8, 2013:

```diff
@@ -239,6 +249,26 @@ final public void addBinary(Binary value) {
   }
+  static final class FieldEnumConverter extends PrimitiveConverter {
+
+    private final ParentValueContainer parent;
+    private final Class<? extends Enum> enumClass;
+
+    public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema) {
+      this.parent = parent;
+      this.enumClass = SpecificData.get().getClass(enumSchema);
+    }
+
+    @Override
+    final public void addBinary(Binary value) {
+      Object enumValue = value.toStringUsingUTF8();
+      if (enumClass != null) {
```
julienledem commented on the diff on Jul 8, 2013:

```diff
   }
   throw new UnsupportedOperationException("Cannot convert Avro type " + type);
 }
+private Type convertUnion(String fieldName, Schema schema, Type.Repetition repetition) {
+  List<Schema> nonNullSchemas = new ArrayList(schema.getTypes().size());
+  for (Schema childSchema : schema.getTypes()) {
+    if (childSchema.getType().equals(Schema.Type.NULL)) {
+      repetition = Type.Repetition.OPTIONAL;
+    } else {
+      nonNullSchemas.add(childSchema);
+    }
+  }
+  // If we only get a null and one other type then its a simple optional field
+  // otherwise construct a union container
+  switch (nonNullSchemas.size()) {
+    case 0:
+      throw new UnsupportedOperationException("Cannot convert Avro union of only nulls");
```

julienledem (Owner)
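The union conversion above partitions a union's branches into null and non-null schemas: a union of null and exactly one other type collapses to an optional field, while multiple non-null branches need a union container. That classification can be shown standalone; this sketch models schemas as type-name strings and returns a descriptive label rather than real Parquet types (all names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class UnionConversionSketch {
    // Classifies an Avro union, modelled here as a list of branch type names.
    static String classifyUnion(List<String> branches) {
        boolean nullable = false;
        List<String> nonNull = new ArrayList<>();
        for (String branch : branches) {
            if (branch.equals("null")) {
                nullable = true;      // a null branch makes the field optional
            } else {
                nonNull.add(branch);
            }
        }
        switch (nonNull.size()) {
            case 0:
                throw new UnsupportedOperationException(
                    "Cannot convert Avro union of only nulls");
            case 1:  // null + one type: a simple optional (or required) field
                return (nullable ? "optional " : "required ") + nonNull.get(0);
            default: // several non-null branches: a union container group
                return (nullable ? "optional" : "required") + " group of " + nonNull;
        }
    }

    public static void main(String[] args) {
        System.out.println(classifyUnion(List.of("null", "string")));
        // optional string
        System.out.println(classifyUnion(List.of("null", "int", "string")));
        // optional group of [int, string]
    }
}
```

This mirrors the diff's structure: the loop flips repetition to OPTIONAL on seeing a null branch, and the switch on the non-null count decides between an error, a plain field, and a group.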
julienledem commented on the diff on Jul 8, 2013:

```diff
-    new AvroSchemaConverter().convert(schema);
+    // Avro union is modelled using optional data members of the different types
+    testConversion(
+        schema,
+        "message record2 {\n" +
+        "  optional group myunion {\n" +
```

julienledem (Owner)
julienledem commented on the diff on Jul 8, 2013:

```diff
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.io;
+
+import parquet.column.ColumnReader;
+import parquet.column.impl.ColumnReadStoreImpl;
+import parquet.filter.RecordFilter;
+import parquet.filter.UnboundRecordFilter;
+import parquet.io.api.RecordMaterializer;
+
+import java.util.Arrays;
+
+/**
+ * Extends the
```
Great contribution @JacobMetcalf!

Folks, what's left to be done to merge this? Looks like some refactoring diverged it from master.
The only missing part is to integrate the change to PlainValuesReader so that ValuesReader has a single split() method. Also I had a comment about making the parent of the union required.
@julienledem Please integrate with master if you have time. If not I will do it at the weekend. The comment for FilteredRecordReader should be: "Extends the record reader to add filtering capabilities. The user supplies a filter; if the current record being read does not match the filter, the reader skips to the next record." The comment for FieldEnumConverter would be "enumClass will be blank in the case of AvroGeneric".
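The skip-to-match behaviour described in that comment, advancing past non-matching records and returning null at end of input, can be sketched independently of parquet-mr. The class and method names below are illustrative only:

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

public class FilteredReaderSketch<T> {
    private final Iterator<T> records;
    private final Predicate<T> filter;

    public FilteredReaderSketch(List<T> records, Predicate<T> filter) {
        this.records = records.iterator();
        this.filter = filter;
    }

    /** Skips forwards until the filter matches; returns null when input is exhausted. */
    public T read() {
        while (records.hasNext()) {
            T candidate = records.next();
            if (filter.test(candidate)) {
                return candidate; // filter matched: materialize this record
            }
            // no match: skip this record and keep scanning
        }
        return null; // nothing left to read
    }

    public static void main(String[] args) {
        FilteredReaderSketch<String> reader = new FilteredReaderSketch<>(
            List.of("Polo", "Mini", "Passat"), s -> s.startsWith("P"));
        System.out.println(reader.read()); // Polo
        System.out.println(reader.read()); // Passat
        System.out.println(reader.read()); // null
    }
}
```

In the real reader the skipping happens at the column level (via the `skip()` methods added in this PR) so non-matching records are never materialized; this sketch only shows the control flow.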
I just merged master into @JacobMetcalf's contribution there: #89
julienledem closed this on Jul 13, 2013

I just merged. Thank you @JacobMetcalf, I'm looking forward to your next contribution!
Thanks @JacobMetcalf!
julienledem added a commit that referenced this pull request on Oct 2, 2014 (rdblue + julienledem, be1222e)

abayer pushed a commit to cloudera/parquet-mr that referenced this pull request on Nov 24, 2014 (rdblue, 5e6f922)

abayer pushed a commit to cloudera/parquet-mr that referenced this pull request on Dec 21, 2014 (rdblue + tomwhite, 2d63011)

cloudera-hudson pushed a commit to cloudera/parquet-mr that referenced this pull request on Apr 27, 2015 (rdblue + rdblue, 70ddf00)
JacobMetcalf commented on Jun 23, 2013
As discussed on groups I have implemented: