Improve javadoc & fix input format to handle sub-schemas.
cutting committed Apr 18, 2012
1 parent f22eb77 commit d02bdf5
Showing 6 changed files with 43 additions and 20 deletions.
AvroColumnWriter.java
@@ -35,8 +35,14 @@
 
 import static org.apache.trevni.avro.AvroColumnator.isSimple;
 
-/** Write Avro records to a Trevni column file. Each primitive type is written
- * to a separate column. */
+/** Write Avro records to a Trevni column file.
+ *
+ * <p>Each primitive type is written to a separate column.
+ *
+ * <p>Output is buffered until {@link #writeTo(OutputStream)} is called. The
+ * {@link #sizeEstimate()} indicates both the amount of data buffered and the
+ * size of the file that will be written.
+ */
 public class AvroColumnWriter<D> {
   private Schema schema;
   private GenericData model;
@@ -60,8 +66,10 @@ public AvroColumnWriter(Schema s, ColumnFileMetaData meta, GenericData model)
     this.model = model;
   }
 
-  /** Return the approximate size of the file that will be written.
-   * Tries to over-estimate. */
+  /** Return the approximate size of the file that will be written. Tries to
+   * slightly over-estimate. Indicates both the size in memory of the buffered
+   * data as well as the size of the file that will be written by {@link
+   * #writeTo(OutputStream)}. */
   public long sizeEstimate() { return writer.sizeEstimate(); }
 
   /** Write all rows added to the named output stream. */
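For illustration, a minimal sketch of the buffering behavior the new javadoc describes. The schema, file name, and class name are invented for the example, and write(D) is assumed to be the row-adding method the "rows added" javadoc refers to; the constructor and writeTo(OutputStream) signatures are the ones visible in this diff.

    import java.io.FileOutputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.trevni.ColumnFileMetaData;
    import org.apache.trevni.avro.AvroColumnWriter;

    public class WriterSketch {
      public static void main(String[] args) throws Exception {
        // One-field record schema, so the file gets a single column.
        Schema schema = Schema.parse("{\"type\":\"record\",\"name\":\"Row\","
            + "\"fields\":[{\"name\":\"value\",\"type\":\"long\"}]}");

        AvroColumnWriter<GenericRecord> writer =
          new AvroColumnWriter<GenericRecord>(schema, new ColumnFileMetaData(),
                                              GenericData.get());

        GenericRecord row = new GenericData.Record(schema);
        row.put("value", 1L);
        writer.write(row);                      // buffered in memory, nothing on disk yet

        long estimate = writer.sizeEstimate();  // ~ buffered data ~ final file size
        writer.writeTo(new FileOutputStream("rows.trv"));  // write the whole file
      }
    }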
AvroTrevniInputFormat.java
@@ -34,9 +34,14 @@
 import org.apache.hadoop.mapred.RecordReader;
 
 import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.mapred.AvroJob;
 import org.apache.avro.mapred.AvroWrapper;
 
-/** An {@link org.apache.hadoop.mapred.InputFormat} for Trevni files */
+/** An {@link org.apache.hadoop.mapred.InputFormat} for Trevni files.
+ *
+ * <p>A subset schema to be read may be specified with {@link
+ * AvroJob#setInputSchema(Schema)}.
+ */
 public class AvroTrevniInputFormat<T>
   extends FileInputFormat<AvroWrapper<T>, NullWritable> {
 
@@ -61,11 +66,15 @@ protected FileStatus[] listStatus(JobConf job) throws IOException {
                              Reporter reporter) throws IOException {
     final FileSplit file = (FileSplit)split;
     reporter.setStatus(file.toString());
+
+    final AvroColumnReader.Params params =
+      new AvroColumnReader.Params(new HadoopInput(file.getPath(), job));
+    params.setModel(ReflectData.get());
+    if (job.get(AvroJob.INPUT_SCHEMA) != null)
+      params.setSchema(AvroJob.getInputSchema(job));
+
     return new RecordReader<AvroWrapper<T>, NullWritable>() {
-      private AvroColumnReader<T> reader =
-        new AvroColumnReader<T>
-        (new AvroColumnReader.Params(new HadoopInput(file.getPath(), job))
-         .setModel(ReflectData.get()));
+      private AvroColumnReader<T> reader = new AvroColumnReader<T>(params);
       private float rows = reader.getRowCount();
       private long row;
 
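As a usage sketch, here is how a job might request a subset schema. The class and method wiring around it is illustrative, but AvroJob.setInputSchema is the hook named in the javadoc above, and the subset schema mirrors the one in the updated test below.

    import org.apache.avro.Schema;
    import org.apache.avro.mapred.AvroJob;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.trevni.avro.AvroTrevniInputFormat;

    public class SubSchemaJobSetup {
      public static void configure(JobConf job) {
        job.setInputFormat(AvroTrevniInputFormat.class);
        // Project out just the "value" field; the reader resolves this
        // against the full schema stored in the Trevni file.
        Schema subSchema = Schema.parse("{\"type\":\"record\",\"name\":\"PairValue\","
            + "\"fields\":[{\"name\":\"value\",\"type\":\"long\"}]}");
        AvroJob.setInputSchema(job, subSchema);
      }
    }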
AvroTrevniOutputFormat.java
@@ -40,7 +40,13 @@
 import org.apache.trevni.ColumnFileMetaData;
 
 /** An {@link org.apache.hadoop.mapred.OutputFormat} that writes Avro data to
- * Trevni files. */
+ * Trevni files.
+ *
+ * <p>Writes a directory of files per task, each comprising a single filesystem
+ * block. To reduce the number of files, increase the default filesystem block
+ * size for the job. Each task also requires enough memory to buffer a
+ * filesystem block.
+ */
 public class AvroTrevniOutputFormat <T>
   extends FileOutputFormat<AvroWrapper<T>, NullWritable> {
 
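A hedged sketch of the tuning advice in that javadoc: a larger filesystem block size means fewer, larger Trevni files, at the price of correspondingly more buffer memory per task. "dfs.block.size" is the classic Hadoop property of this era; the right key depends on your Hadoop version, and the 512 MB figure is arbitrary.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.trevni.avro.AvroTrevniOutputFormat;

    public class BlockSizeSetup {
      public static void configure(JobConf job) {
        job.setOutputFormat(AvroTrevniOutputFormat.class);
        // One Trevni file per filesystem block: each task may buffer
        // up to ~512 MB before rolling to a new file.
        job.setLong("dfs.block.size", 512L * 1024 * 1024);
      }
    }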
@@ -38,6 +38,7 @@
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapred.AvroJob;
 import org.apache.avro.mapred.Pair;
 import org.apache.avro.mapred.AvroMapper;
@@ -110,18 +111,18 @@ public void testOutputFormat() throws Exception {
 
   private static long total;
 
-  public static class Counter extends AvroMapper<Pair<Void,Long>,Void> {
-    @Override public void map(Pair<Void,Long> p, AvroCollector<Void> collector,
+  public static class Counter extends AvroMapper<GenericRecord,Void> {
+    @Override public void map(GenericRecord r, AvroCollector<Void> collector,
                                Reporter reporter) throws IOException {
-      total += p.value();
+      total += (Long)r.get("value");
     }
   }
 
   public void testInputFormat() throws Exception {
     JobConf job = new JobConf();
 
     Schema subSchema = Schema.parse("{\"type\":\"record\"," +
-                                    "\"name\":\"org.apache.avro.mapred.Pair\","+
+                                    "\"name\":\"PairValue\","+
                                     "\"fields\": [ " +
                                     "{\"name\":\"value\", \"type\":\"long\"}" +
                                     "]}");
ColumnFileWriter.java
@@ -72,8 +72,10 @@ private void checkColumns(ColumnMetaData[] columnMeta) {
 
   void incrementSize(int n) { size += n; }
 
-  /** Return the approximate size of the file that will be written.
-   * Tries to over-estimate. */
+  /** Return the approximate size of the file that will be written. Tries to
+   * slightly over-estimate. Indicates both the size in memory of the buffered
+   * data as well as the size of the file that will be written by {@link
+   * #writeTo(OutputStream)}. */
   public long sizeEstimate() { return size; }
 
   /** Return this file's metadata. */
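The same wording lands on the core writer because this is presumably the estimate the output format above consults when deciding to start a new file. A sketch of that pattern, with BLOCK_SIZE and the supplied stream as hypothetical stand-ins; writeTo(OutputStream) is the method linked from the javadoc.

    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.trevni.ColumnFileWriter;

    public class RolloverSketch {
      static final long BLOCK_SIZE = 64L * 1024 * 1024;  // hypothetical threshold

      /** Flush the buffered file once its estimate reaches one block.
       * Returns true if the caller should begin a fresh writer. */
      static boolean maybeRoll(ColumnFileWriter writer, OutputStream out)
          throws IOException {
        if (writer.sizeEstimate() < BLOCK_SIZE)
          return false;            // keep buffering rows in memory
        writer.writeTo(out);       // write one block-sized file
        return true;
      }
    }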
MetadataTool.java
@@ -35,10 +35,7 @@
 import org.codehaus.jackson.JsonEncoding;
 import org.codehaus.jackson.util.MinimalPrettyPrinter;
 
-/** Tool to read Trevni files and print them as JSON.
- * This can read any Trevni file. Nested structure is reconstructed from the
- * columns rather than any schema information.
- */
+/** Tool to print Trevni file metadata as JSON. */
 public class MetadataTool implements Tool {
   static final JsonFactory FACTORY = new JsonFactory();
 
