Skip to content

Commit

Permalink
Make partition-collapsing job support joins on inconsistent schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Hayes committed Oct 17, 2013
1 parent bbbd24b commit 6b1f496
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
Expand All @@ -35,6 +36,8 @@

import datafu.hourglass.avro.AvroDateRangeMetadata;
import datafu.hourglass.avro.AvroKeyWithMetadataOutputFormat;
import datafu.hourglass.avro.AvroMultipleInputsKeyInputFormat;
import datafu.hourglass.avro.AvroMultipleInputsUtil;
import datafu.hourglass.fs.DatePath;
import datafu.hourglass.fs.DateRange;
import datafu.hourglass.fs.PathUtils;
Expand Down Expand Up @@ -436,7 +439,7 @@ private void execute() throws IOException, InterruptedException, ClassNotFoundEx
for (DatePath inputPath : planner.getNewInputsToProcess())
{
_log.info(inputPath.getPath());
MultipleInputs.addInputPath(job, inputPath.getPath(), AvroKeyInputFormat.class, DelegatingMapper.class);
MultipleInputs.addInputPath(job, inputPath.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
}
}

Expand All @@ -446,7 +449,7 @@ private void execute() throws IOException, InterruptedException, ClassNotFoundEx
for (DatePath inputPath : planner.getOldInputsToProcess())
{
_log.info(inputPath.getPath());
MultipleInputs.addInputPath(job, inputPath.getPath(), AvroKeyInputFormat.class, DelegatingMapper.class);
MultipleInputs.addInputPath(job, inputPath.getPath(), AvroMultipleInputsKeyInputFormat.class, DelegatingMapper.class);
}
}

Expand All @@ -463,11 +466,19 @@ private void execute() throws IOException, InterruptedException, ClassNotFoundEx

AvroDateRangeMetadata.configureOutputDateRange(conf, planner.getCurrentDateRange());

PartitionCollapsingSchemas spSchemas = new PartitionCollapsingSchemas(getSchemas(), planner.getInputSchemas(), getOutputSchemaName(), getOutputSchemaNamespace());
PartitionCollapsingSchemas spSchemas = new PartitionCollapsingSchemas(getSchemas(), planner.getInputSchemasByPath(), getOutputSchemaName(), getOutputSchemaNamespace());

job.setOutputFormatClass(AvroKeyWithMetadataOutputFormat.class);

AvroJob.setInputKeySchema(job, spSchemas.getMapInputSchema());
_log.info("Setting input path to schema mappings");
for (String path : spSchemas.getMapInputSchemas().keySet())
{
Schema schema = spSchemas.getMapInputSchemas().get(path);
_log.info("*** " + path);
_log.info("*** => " + schema.toString());
AvroMultipleInputsUtil.setInputKeySchemaForPath(job, schema, path);
}

AvroJob.setMapOutputKeySchema(job, spSchemas.getMapOutputKeySchema());
AvroJob.setMapOutputValueSchema(job, spSchemas.getMapOutputValueSchema());
AvroJob.setOutputKeySchema(job, spSchemas.getReduceOutputSchema());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class PartitionCollapsingExecutionPlanner extends ExecutionPlanner
private Map<Date,List<DatePath>> _inputsToProcessByDate = new HashMap<Date,List<DatePath>>();
private DatePath _previousOutputToProcess;
private List<Schema> _inputSchemas = new ArrayList<Schema>();
private Map<String,Schema> _inputSchemasByPath = new HashMap<String,Schema>();
private boolean _needAnotherPass;
private DateRange _currentDateRange;
private boolean _planExists;
Expand Down Expand Up @@ -234,6 +235,18 @@ public List<Schema> getInputSchemas()
return _inputSchemas;
}

/**
 * Gets a map from input path to schema.  Because multiple inputs are allowed, there may be multiple schemas.
 * Must call {@link #createPlan()} first.
 *
 * <p>NOTE(review): this returns the planner's internal map directly, not a defensive copy,
 * so callers should treat it as read-only — confirm against how the sibling
 * {@code getInputSchemas()} accessor is used.</p>
 *
 * @return map from input path (as a string) to the Avro schema read from that path
 */
public Map<String,Schema> getInputSchemasByPath()
{
// Guard: the map is only populated during plan creation, so fail fast if no plan exists yet.
checkPlanExists();
return _inputSchemasByPath;
}

/**
* Determines the number of reducers to use based on the input data size and the previous output,
* if it exists and is being reused.
Expand Down Expand Up @@ -277,7 +290,9 @@ private void determineInputSchemas() throws IOException
List<DatePath> lastInputs = _inputsToProcessByDate.get(lastDate);
for (DatePath input : lastInputs)
{
_inputSchemas.add(PathUtils.getSchemaFromPath(getFileSystem(),input.getPath()));
Schema schema = PathUtils.getSchemaFromPath(getFileSystem(),input.getPath());
_inputSchemas.add(schema);
_inputSchemasByPath.put(input.getPath().toString(), schema);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
Expand All @@ -39,28 +41,28 @@ public class PartitionCollapsingSchemas implements Serializable
{
private static String DATED_INTERMEDIATE_VALUE_SCHEMA_NAME = "DatedMapValue";
private static String KEY_SCHEMA = "key.schema";
private static String INPUT_SCHEMA = "input.schema";
private static String INTERMEDIATE_VALUE_SCHEMA = "intermediate.value.schema";
private static String OUTPUT_VALUE_SCHEMA = "output.value.schema";

private final String _outputSchemaName;
private final String _outputSchemaNamespace;
private transient Schema _keySchema;
private transient Schema _intermediateValueSchema;
private transient Schema _inputSchema;
private transient Schema _outputValueSchema;

// generated schemas
private transient Schema _mapOutputSchema;
private transient Schema _dateIntermediateValueSchema;
private transient Schema _mapOutputValueSchema;
private transient Schema _reduceOutputSchema;
private transient Schema _mapInputSchema;
private transient Map<String,Schema> _mapInputSchemas;

//schemas are stored here so the object can be serialized
private Map<String,String> conf;

private Map<String,String> _inputSchemas;

public PartitionCollapsingSchemas(TaskSchemas schemas, List<Schema> inputSchemas, String outputSchemaName, String outputSchemaNamespace)
public PartitionCollapsingSchemas(TaskSchemas schemas, Map<String,Schema> inputSchemas, String outputSchemaName, String outputSchemaNamespace)
{
if (schemas == null)
{
Expand All @@ -82,42 +84,47 @@ public PartitionCollapsingSchemas(TaskSchemas schemas, List<Schema> inputSchemas
_outputSchemaNamespace = outputSchemaNamespace;

conf = new HashMap<String,String>();
conf.put(INPUT_SCHEMA, Schema.createUnion(inputSchemas).toString());
conf.put(KEY_SCHEMA, schemas.getKeySchema().toString());
conf.put(INTERMEDIATE_VALUE_SCHEMA, schemas.getIntermediateValueSchema().toString());
conf.put(OUTPUT_VALUE_SCHEMA, schemas.getOutputValueSchema().toString());
}

public Schema getInputSchema()
{
if (_inputSchema == null)

_inputSchemas = new HashMap<String,String>();
for (Entry<String,Schema> schema : inputSchemas.entrySet())
{
_inputSchema = new Schema.Parser().parse(conf.get(INPUT_SCHEMA));
_inputSchemas.put(schema.getKey(), schema.getValue().toString());
}
return _inputSchema;
}

public Schema getMapInputSchema()
public Map<String,Schema> getMapInputSchemas()
{
if (_mapInputSchema == null)
if (_mapInputSchemas == null)
{
List<Schema> mapInputSchemas = new ArrayList<Schema>();
_mapInputSchemas = new HashMap<String,Schema>();

if (getInputSchema().getType() == Type.UNION)
{
mapInputSchemas.addAll(getInputSchema().getTypes());
}
else
for (Entry<String,String> schemaPair : _inputSchemas.entrySet())
{
mapInputSchemas.add(getInputSchema());
Schema schema = new Schema.Parser().parse(schemaPair.getValue());

List<Schema> mapInputSchemas = new ArrayList<Schema>();

if (schema.getType() == Type.UNION)
{
mapInputSchemas.addAll(schema.getTypes());
}
else
{
mapInputSchemas.add(schema);
}

// feedback from output (optional)
mapInputSchemas.add(getReduceOutputSchema());

_mapInputSchemas.put(schemaPair.getKey(), Schema.createUnion(mapInputSchemas));
}

// feedback from output (optional)
mapInputSchemas.add(getReduceOutputSchema());

_mapInputSchema = Schema.createUnion(mapInputSchemas);
}
return _mapInputSchema;
return Collections.unmodifiableMap(_mapInputSchemas);
}

public Schema getMapOutputSchema()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Properties;

Expand All @@ -34,6 +35,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.codehaus.jackson.node.NullNode;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -73,10 +75,22 @@ public class PartitionCollapsingJoinTest extends TestBase

static
{
// Create two schemas having the same name, but different types (int vs long). The fields using this schema are not used. This simply
// tests that we can join two conflicting schemas.
Schema intSchema = Schema.createRecord("Dummy", null, "datafu.hourglass", false);
intSchema.setFields(Arrays.asList(
new Field("dummy",Schema.create(Type.INT), null,null)));
Schema longSchema = Schema.createRecord("Dummy", null, "datafu.hourglass", false);
longSchema.setFields(Arrays.asList(
new Field("dummy",Schema.create(Type.LONG), null,null)));

IMPRESSION_SCHEMA = Schemas.createRecordSchema(PartitionPreservingJoinTests.class, "Impression",
new Field("id", Schema.create(Type.LONG), "ID", null));
new Field("id", Schema.create(Type.LONG), "ID", null),
new Field("dummy", Schema.createUnion(Arrays.asList(Schema.create(Type.NULL),intSchema)), null, NullNode.instance));

CLICK_SCHEMA = Schemas.createRecordSchema(PartitionPreservingJoinTests.class, "Click",
new Field("id", Schema.create(Type.LONG), "ID", null));
new Field("id", Schema.create(Type.LONG), "ID", null),
new Field("dummy", Schema.createUnion(Arrays.asList(Schema.create(Type.NULL),longSchema)), null, NullNode.instance));
}

public PartitionCollapsingJoinTest() throws IOException
Expand Down

0 comments on commit 6b1f496

Please sign in to comment.