Permalink
Browse files

Add AvroTrevniOutputFormat, still untested.

  • Loading branch information...
1 parent 7da4d6b commit e2dfe29e3f5e7a1265f20cbba0c690adbb783770 @cutting committed Apr 16, 2012
View
@@ -52,10 +52,16 @@
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
+ <artifactId>avro-mapred</artifactId>
<version>${avro-version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>${hadoop-version}</version>
+ <scope>compile</scope>
+ </dependency>
</dependencies>
<build>
@@ -60,6 +60,10 @@ public AvroColumnWriter(Schema s, ColumnFileMetaData meta, GenericData model)
this.model = model;
}
+ /** Return the approximate size of the file that will be written.
+ * Tries to over-estimate. */
+ public long sizeEstimate() { return writer.sizeEstimate(); }
+
/** Write all rows added to the named output stream. */
public void writeTo(OutputStream out) throws IOException {
writer.writeTo(out);
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.trevni.avro;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+
+import org.apache.avro.Schema;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.mapred.AvroJob;
+import org.apache.avro.mapred.AvroWrapper;
+
+import org.apache.trevni.MetaData;
+import org.apache.trevni.ColumnFileMetaData;
+
+/** An {@link org.apache.hadoop.mapred.OutputFormat} that writes Avro data to
+ * Trevni files. */
+public class AvroTrevniOutputFormat <T>
+ extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
+
+ /** The file name extension for trevni files. */
+ public final static String EXT = ".trv";
+
+ public static final String META_PREFIX = "trevni.meta.";
+
+ /** Add metadata to job output files.*/
+ public static void setMeta(JobConf job, String key, String value) {
+ job.set(META_PREFIX+key, value);
+ }
+
+ @Override
+ public RecordWriter<AvroWrapper<T>, NullWritable>
+ getRecordWriter(FileSystem ignore, final JobConf job,
+ final String name, Progressable prog)
+ throws IOException {
+
+ boolean isMapOnly = job.getNumReduceTasks() == 0;
+ final Schema schema = isMapOnly
+ ? AvroJob.getMapOutputSchema(job)
+ : AvroJob.getOutputSchema(job);
+
+ final ColumnFileMetaData meta = new ColumnFileMetaData();
+ for (Map.Entry<String,String> e : job)
+ if (e.getKey().startsWith(META_PREFIX))
+ meta.put(e.getKey().substring(AvroJob.TEXT_PREFIX.length()),
+ e.getValue().getBytes(MetaData.UTF8));
+
+ final Path dir = FileOutputFormat.getTaskOutputPath(job, name);
+ final FileSystem fs = dir.getFileSystem(job);
+ if (!fs.mkdirs(dir))
+ throw new IOException("Failed to create directory: " + dir);
+ final long blockSize = fs.getDefaultBlockSize();
+
+ return new RecordWriter<AvroWrapper<T>, NullWritable>() {
+ private int part = 0;
+
+ private AvroColumnWriter<T> writer =
+ new AvroColumnWriter<T>(schema, meta, ReflectData.get());
+
+ private void flush() throws IOException {
+ if (writer != null)
+ writer.writeTo(fs.create(new Path(dir, "part-"+(part++)+EXT)));
+ writer = new AvroColumnWriter<T>(schema, meta, ReflectData.get());
+ }
+
+ public void write(AvroWrapper<T> wrapper, NullWritable ignore)
+ throws IOException {
+ writer.write(wrapper.datum());
+ if (writer.sizeEstimate() >= blockSize) // block full
+ flush();
+ }
+ public void close(Reporter reporter) throws IOException {
+ flush();
+ }
+ };
+ }
+
+}
@@ -16,9 +16,8 @@
* limitations under the License.
*/
-package org.apache.trevni.tool;
+package org.apache.trevni.avro;
-import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
@@ -23,8 +23,9 @@
class ArrayColumnOutputBuffer extends ColumnOutputBuffer {
private int length; // remaining in current array
- public ArrayColumnOutputBuffer(ColumnMetaData meta) throws IOException {
- super(meta);
+ public ArrayColumnOutputBuffer(ColumnFileWriter writer, ColumnMetaData meta)
+ throws IOException {
+ super(writer, meta);
assert getMeta().isArray() || getMeta().getParent() != null;
assert !getMeta().hasIndexValues();
}
@@ -37,6 +37,7 @@
private long rowCount;
private int columnCount;
+ private long size;
/** Construct given metadata for each column in the file. */
public ColumnFileWriter(ColumnFileMetaData fileMeta,
@@ -49,8 +50,9 @@ public ColumnFileWriter(ColumnFileMetaData fileMeta,
ColumnMetaData c = columnMeta[i];
c.setDefaults(metaData);
columns[i] = c.isArray()
- ? new ArrayColumnOutputBuffer(c)
- : new ColumnOutputBuffer(c);
+ ? new ArrayColumnOutputBuffer(this, c)
+ : new ColumnOutputBuffer(this, c);
+ size += OutputBuffer.BLOCK_SIZE; // over-estimate
}
}
@@ -68,6 +70,12 @@ private void checkColumns(ColumnMetaData[] columnMeta) {
}
}
+ void incrementSize(int n) { size += n; }
+
+ /** Return the approximate size of the file that will be written.
+ * Tries to over-estimate. */
+ public long sizeEstimate() { return size; }
+
/** Return this file's metadata. */
public ColumnFileMetaData getMetaData() { return metaData; }
@@ -24,6 +24,7 @@
import java.util.List;
class ColumnOutputBuffer {
+ private ColumnFileWriter writer;
private ColumnMetaData meta;
private Codec codec;
private Checksum checksum;
@@ -32,8 +33,11 @@
private List<byte[]> blockData;
private List<byte[]> firstValues;
private int rowCount;
+ private long size = 4; // room for block count
- public ColumnOutputBuffer(ColumnMetaData meta) throws IOException {
+ public ColumnOutputBuffer(ColumnFileWriter writer, ColumnMetaData meta)
+ throws IOException {
+ this.writer = writer;
this.meta = meta;
this.codec = Codec.get(meta);
this.checksum = Checksum.get(meta);
@@ -80,21 +84,22 @@ private void flushBuffer() throws IOException {
data.put(checksum.compute(raw));
blockData.add(data.array());
+ int sizeIncrement =
+ (4*3) // descriptor
+ + (firstValues != null // firstValue
+ ? firstValues.get(firstValues.size()-1).length
+ : 0)
+ + data.position(); // data
+
+ writer.incrementSize(sizeIncrement);
+ size += sizeIncrement;
+
buffer = new OutputBuffer();
rowCount = 0;
}
public long size() throws IOException {
flushBuffer();
- long size = 4; // count of blocks
- size += 4 * 3 * blockDescriptors.size(); // descriptors
-
- if (meta.hasIndexValues()) // first values
- for (byte[] value : firstValues)
- size += value.length;
-
- for (byte[] data : blockData)
- size += data.length; // data
return size;
}
@@ -30,7 +30,7 @@
static final String CODEC_KEY = RESERVED_KEY_PREFIX + "codec";
static final String CHECKSUM_KEY = RESERVED_KEY_PREFIX + "checksum";
- private static final Charset UTF8 = Charset.forName("UTF-8");
+ public static final Charset UTF8 = Charset.forName("UTF-8");
private MetaData<?> defaults;
View
@@ -82,6 +82,11 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>trevni-avro</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>${avro-version}</version>
@@ -91,86 +96,6 @@
<artifactId>jackson-core-asl</artifactId>
<version>${jackson-version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- <version>${hadoop-version}</version>
- <optional>true</optional>
- <exclusions>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jetty-util</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-api-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>jsp-2.1</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-compiler</artifactId>
- </exclusion>
- <exclusion>
- <groupId>tomcat</groupId>
- <artifactId>jasper-runtime</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.java.dev.jets3t</groupId>
- <artifactId>jets3t</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-httpclient</groupId>
- <artifactId>commons-httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-el</groupId>
- <artifactId>commons-el</artifactId>
- </exclusion>
- <exclusion>
- <groupId>xmlenc</groupId>
- <artifactId>xmlenc</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.mortbay.jetty</groupId>
- <artifactId>servlet-api-2.5</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.eclipse.jdt</groupId>
- <artifactId>core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>hsqldb</groupId>
- <artifactId>hsqldb</artifactId>
- </exclusion>
- <exclusion>
- <groupId>oro</groupId>
- <artifactId>oro</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
</project>
@@ -29,6 +29,7 @@
import java.net.URI;
import org.apache.trevni.Input;
+import org.apache.trevni.avro.HadoopInput;
import org.apache.trevni.InputFile;
import org.apache.hadoop.conf.Configuration;

0 comments on commit e2dfe29

Please sign in to comment.