Merge pull request #285 from mickaellcr/cascading_sink

[CASCADING] Provide the sink implementation for ParquetTupleScheme
2 parents d356578 + 509e268 · commit 6063921a37a77d9cd29eab64be3f91146ac52a15 · julienledem committed Feb 28, 2014
@@ -20,12 +20,9 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
- import org.apache.hadoop.fs.Path;
import parquet.hadoop.ParquetInputFormat;
- import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.Footer;
- import parquet.hadoop.metadata.ParquetMetadata;
import parquet.hadoop.mapred.Container;
import parquet.hadoop.mapred.DeprecatedParquetInputFormat;
import parquet.schema.MessageType;
@@ -40,6 +37,9 @@
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Tuple;
import cascading.tuple.Fields;
+ import cascading.tuple.TupleEntry;
+ import parquet.hadoop.ParquetOutputFormat;
+ import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
/**
* A Cascading Scheme that converts Parquet groups into Cascading tuples.
@@ -56,6 +56,7 @@
public class ParquetTupleScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]>{
private static final long serialVersionUID = 0L;
+ private String parquetSchema;
public ParquetTupleScheme() {
super();
@@ -65,6 +66,20 @@ public ParquetTupleScheme(Fields sourceFields) {
super(sourceFields);
}
+ /**
+ * ParquetTupleScheme constructor to use when the scheme must also act as a sink.
+ *
+ * @param sourceFields fields used for the reading step
+ * @param sinkFields fields used for the writing step
+ * @param schema mandatory when sinkFields are provided; must be the
+ * toString() of a MessageType. This value is parsed when the
+ * Parquet file is created.
+ */
+ public ParquetTupleScheme(Fields sourceFields, Fields sinkFields, final String schema) {
+ super(sourceFields, sinkFields);
+ parquetSchema = schema;
+ }
+
@SuppressWarnings("rawtypes")
@Override
public void sourceConfInit(FlowProcess<JobConf> fp,
@@ -130,19 +145,23 @@ public boolean source(FlowProcess<JobConf> fp, SourceCall<Object[], RecordReader
@SuppressWarnings("rawtypes")
@Override
- public void sinkConfInit(FlowProcess<JobConf> arg0,
- Tap<JobConf, RecordReader, OutputCollector> arg1, JobConf arg2) {
- throw new UnsupportedOperationException("ParquetTupleScheme does not support Sinks");
-
+ public void sinkConfInit(FlowProcess<JobConf> fp,
+ Tap<JobConf, RecordReader, OutputCollector> tap, JobConf jobConf) {
+ jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
+ jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA, parquetSchema);
+ ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class);
}
@Override
- public boolean isSink() { return false; }
-
+ public boolean isSink() {
+ return parquetSchema != null;
+ }
@Override
- public void sink(FlowProcess<JobConf> arg0, SinkCall<Object[], OutputCollector> arg1)
- throws IOException {
- throw new UnsupportedOperationException("ParquetTupleScheme does not support Sinks");
+ public void sink(FlowProcess<JobConf> fp, SinkCall<Object[], OutputCollector> sink)
+ throws IOException {
+ TupleEntry tuple = sink.getOutgoingEntry();
+ OutputCollector outputCollector = sink.getOutput();
+ outputCollector.collect(null, tuple);
}
}
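
For context, a minimal sketch of how the new sink constructor might be wired into a Cascading flow. The field names, paths, and pipe are hypothetical and only illustrate the API added above; the schema string must be the toString() of a Parquet MessageType.

import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import parquet.cascading.ParquetTupleScheme;

public class ParquetSinkSketch {
  public static void main(String[] args) {
    // Hypothetical field names and paths, for illustration only.
    Fields fields = new Fields("name", "age");
    // Must be the toString() of a MessageType; parsed by TupleWriteSupport.init().
    String schema = "message example { optional binary name; optional int32 age; }";

    ParquetTupleScheme sourceScheme = new ParquetTupleScheme(fields);
    ParquetTupleScheme sinkScheme = new ParquetTupleScheme(fields, fields, schema);

    Hfs source = new Hfs(sourceScheme, "input/path");
    Hfs sink = new Hfs(sinkScheme, "output/path");

    Pipe pipe = new Pipe("copy");
    new HadoopFlowConnector().connect(source, sink, pipe).complete();
  }
}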
@@ -0,0 +1,103 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package parquet.cascading;
+
+import cascading.tuple.TupleEntry;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Type;
+
+/**
+ * WriteSupport implementation that writes Cascading TupleEntry records as
+ * Parquet records, using the schema stored under PARQUET_CASCADING_SCHEMA.
+ *
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ */
+public class TupleWriteSupport extends WriteSupport<TupleEntry> {
+
+ private RecordConsumer recordConsumer;
+ private MessageType rootSchema;
+ public static final String PARQUET_CASCADING_SCHEMA = "parquet.cascading.schema";
+
+ @Override
+ public WriteContext init(Configuration configuration) {
+ String schema = configuration.get(PARQUET_CASCADING_SCHEMA);
+ rootSchema = MessageTypeParser.parseMessageType(schema);
+ return new WriteContext(rootSchema, new HashMap<String, String>());
+ }
+
+ @Override
+ public void prepareForWrite(RecordConsumer recordConsumer) {
+ this.recordConsumer = recordConsumer;
+ }
+
+ @Override
+ public void write(TupleEntry record) {
+ recordConsumer.startMessage();
+ final List<Type> fields = rootSchema.getFields();
+
+ for (int i = 0; i < fields.size(); i++) {
+ Type field = fields.get(i);
+
+ if (record == null || record.getObject(field.getName()) == null) {
+ continue;
+ }
+ recordConsumer.startField(field.getName(), i);
+ if (field.isPrimitive()) {
+ writePrimitive(record, field.asPrimitiveType());
+ } else {
+ throw new UnsupportedOperationException("Complex type not implemented");
+ }
+ recordConsumer.endField(field.getName(), i);
+ }
+ recordConsumer.endMessage();
+ }
+
+ private void writePrimitive(TupleEntry record, PrimitiveType field) {
+ switch (field.getPrimitiveTypeName()) {
+ case BINARY:
+ recordConsumer.addBinary(Binary.fromString(record.getString(field.getName())));
+ break;
+ case BOOLEAN:
+ recordConsumer.addBoolean(record.getBoolean(field.getName()));
+ break;
+ case INT32:
+ recordConsumer.addInteger(record.getInteger(field.getName()));
+ break;
+ case INT64:
+ recordConsumer.addLong(record.getLong(field.getName()));
+ break;
+ case DOUBLE:
+ recordConsumer.addDouble(record.getDouble(field.getName()));
+ break;
+ case FLOAT:
+ recordConsumer.addFloat(record.getFloat(field.getName()));
+ break;
+ case FIXED_LEN_BYTE_ARRAY:
+ throw new UnsupportedOperationException("Fixed len byte array type not implemented");
+ case INT96:
+ throw new UnsupportedOperationException("Int96 type not implemented");
+ default:
+ throw new UnsupportedOperationException(field.getName() + " type not implemented");
+ }
+ }
+}
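
Since TupleWriteSupport.init() parses the configured string with MessageTypeParser, one way to produce the schema expected by the new constructor is to build a MessageType programmatically and call toString() on it. A small sketch, with hypothetical field names:

import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
import parquet.schema.PrimitiveType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
import parquet.schema.Type.Repetition;

public class SchemaStringSketch {
  public static void main(String[] args) {
    // Build the schema in code, then hand schema.toString() to the
    // ParquetTupleScheme sink constructor.
    MessageType schema = new MessageType("example",
        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "name"),
        new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "age"));

    // This mirrors what TupleWriteSupport.init() does with the value stored
    // under PARQUET_CASCADING_SCHEMA.
    MessageType parsed = MessageTypeParser.parseMessageType(schema.toString());
    System.out.println(parsed);
  }
}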
@@ -24,6 +24,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -101,6 +102,10 @@ public static void setWriteSupportClass(Job job, Class<?> writeSupportClass) {
getConfiguration(job).set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
}
+ public static void setWriteSupportClass(JobConf job, Class<?> writeSupportClass) {
+ job.set(WRITE_SUPPORT_CLASS, writeSupportClass.getName());
+ }
+
public static Class<?> getWriteSupportClass(Configuration configuration) {
final String className = configuration.get(WRITE_SUPPORT_CLASS);
if (className == null) {
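
The JobConf overload of setWriteSupportClass added above also makes the same configuration possible outside of Cascading's sinkConfInit. A rough sketch of the equivalent manual setup; the schema string is hypothetical:

import org.apache.hadoop.mapred.JobConf;
import parquet.cascading.TupleWriteSupport;
import parquet.hadoop.ParquetOutputFormat;
import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;

public class ManualSinkConfSketch {
  public static void main(String[] args) {
    // The same three settings that ParquetTupleScheme.sinkConfInit applies.
    JobConf jobConf = new JobConf();
    jobConf.setOutputFormat(DeprecatedParquetOutputFormat.class);
    jobConf.set(TupleWriteSupport.PARQUET_CASCADING_SCHEMA,
        "message example { optional binary name; optional int32 age; }");
    ParquetOutputFormat.setWriteSupportClass(jobConf, TupleWriteSupport.class);
  }
}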
