Skip to content

Commit

Permalink
Fixed #235.
Browse files Browse the repository at this point in the history
Some epilogue stage clients may ignore Direct I/O's input attributes.
  • Loading branch information
ashigeru committed Jul 31, 2012
1 parent aa4232a commit 061696e
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 124 deletions.
Expand Up @@ -29,6 +29,7 @@

import com.asakusafw.compiler.common.Naming;
import com.asakusafw.compiler.common.Precondition;
import com.asakusafw.compiler.flow.ExternalIoDescriptionProcessor.SourceInfo;
import com.asakusafw.compiler.flow.FlowCompilingEnvironment;
import com.asakusafw.compiler.flow.Location;
import com.asakusafw.compiler.flow.jobflow.CompiledStage;
Expand Down Expand Up @@ -230,24 +231,25 @@ private MethodDeclaration createStageInputsMethod() throws IOException {
.newObject()
.toExpression())
.toStatement());
// TODO add attributes
for (Map.Entry<String, String> entry : Collections.<String, String>emptyMap().entrySet()) {
statements.add(new ExpressionBuilder(factory, attributes)
.method("put",
Models.toLiteral(factory, entry.getKey()),
Models.toLiteral(factory, entry.getValue()))
.toStatement());
}
for (Slot.Input input : slot.getSource().getInputs()) {
statements.add(new ExpressionBuilder(factory, list)
.method("add", new TypeBuilder(factory, t(StageInput.class))
.newObject(
Models.toLiteral(factory, input.getLocation().toPath(PATH_SEPARATOR)),
factory.newClassLiteral(t(input.getFormatType())),
factory.newClassLiteral(mapperType),
attributes)
.toExpression())
.toStatement());
for (SourceInfo input : slot.getSource().getInputs()) {
for (Map.Entry<String, String> entry : input.getAttributes().entrySet()) {
statements.add(new ExpressionBuilder(factory, attributes)
.method("put",
Models.toLiteral(factory, entry.getKey()),
Models.toLiteral(factory, entry.getValue()))
.toStatement());
}
for (Location location : input.getLocations()) {
statements.add(new ExpressionBuilder(factory, list)
.method("add", new TypeBuilder(factory, t(StageInput.class))
.newObject(
Models.toLiteral(factory, location.toPath(PATH_SEPARATOR)),
factory.newClassLiteral(t(input.getFormat())),
factory.newClassLiteral(mapperType),
attributes)
.toExpression())
.toStatement());
}
}
}
statements.add(new ExpressionBuilder(factory, list)
Expand Down
Expand Up @@ -18,25 +18,23 @@
import java.lang.reflect.Type;
import java.util.List;

import org.apache.hadoop.mapreduce.InputFormat;

import com.asakusafw.compiler.common.Precondition;
import com.asakusafw.compiler.flow.Location;
import com.asakusafw.compiler.flow.ExternalIoDescriptionProcessor.SourceInfo;

/**
* ソートする出力のスロット。
*/
public class Slot {

private String outputName;
private final String outputName;

private Type type;
private final Type type;

private List<String> propertyNames;
private final List<String> propertyNames;

private List<Slot.Input> inputs;
private final List<SourceInfo> inputs;

private Class<?> outputFormatType;
private final Class<?> outputFormatType;

/**
* インスタンスを生成する。
Expand All @@ -51,7 +49,7 @@ public Slot(
String outputName,
Type type,
List<String> propertyNames,
List<Input> inputs,
List<SourceInfo> inputs,
Class<?> outputFormatType) {
Precondition.checkMustNotBeNull(outputName, "outputName"); //$NON-NLS-1$
Precondition.checkMustNotBeNull(type, "type"); //$NON-NLS-1$
Expand Down Expand Up @@ -93,7 +91,7 @@ public List<String> getSortPropertyNames() {
* このスロットへの入力の一覧を返す。
* @return このスロットへの入力の一覧
*/
public List<Slot.Input> getInputs() {
public List<SourceInfo> getInputs() {
return inputs;
}

Expand All @@ -104,44 +102,4 @@ public List<Slot.Input> getInputs() {
public Class<?> getOutputFormatType() {
return outputFormatType;
}

/**
* スロットへの入力。
*/
public static class Input {

private Location location;

private Class<? extends InputFormat<?, ?>> formatType;

/**
* インスタンスを生成する。
* @param location この入力が配置された位置
* @param formatType この入力のフォーマットを表す型
* @throws IllegalArgumentException 引数に{@code null}が指定された場合
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public Input(Location location, Class<? extends InputFormat> formatType) {
Precondition.checkMustNotBeNull(location, "location"); //$NON-NLS-1$
Precondition.checkMustNotBeNull(formatType, "formatType"); //$NON-NLS-1$
this.location = location;
this.formatType = (Class<? extends InputFormat<?, ?>>) formatType;
}

/**
* この入力が配置された位置を返す。
* @return この入力が配置された位置
*/
public Location getLocation() {
return location;
}

/**
* この入力のフォーマットを表す型を返す。
* @return この入力のフォーマットを表す型
*/
public Class<? extends InputFormat<?, ?>> getFormatType() {
return formatType;
}
}
}
Expand Up @@ -25,8 +25,6 @@
import java.util.TreeMap;
import java.util.regex.Pattern;

import org.apache.hadoop.mapreduce.InputFormat;

import com.asakusafw.compiler.flow.ExternalIoDescriptionProcessor;
import com.asakusafw.compiler.flow.Location;
import com.asakusafw.compiler.flow.jobflow.CompiledStage;
Expand Down Expand Up @@ -208,18 +206,11 @@ public int compare(Location o1, Location o2) {
private Slot toSlot(Output output, String name) {
assert output != null;
assert name != null;
List<Slot.Input> inputs = Lists.create();
for (SourceInfo source : output.getSources()) {
Class<? extends InputFormat<?, ?>> format = source.getFormat();
for (Location location : source.getLocations()) {
inputs.add(new Slot.Input(location, format));
}
}
return new Slot(
name,
output.getDescription().getDataType(),
Collections.<String>emptyList(),
inputs,
output.getSources(),
TemporaryOutputFormat.class);
}

Expand Down
Expand Up @@ -19,11 +19,13 @@
import static org.junit.Assert.*;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.junit.Test;

import com.asakusafw.compiler.flow.DataClass.Property;
import com.asakusafw.compiler.flow.ExternalIoDescriptionProcessor.SourceInfo;
import com.asakusafw.compiler.flow.JobflowCompilerTestRoot;
import com.asakusafw.compiler.flow.Location;
import com.asakusafw.compiler.flow.testing.model.Ex1;
Expand All @@ -46,9 +48,7 @@ public void single() {
"out",
Ex1.class,
Arrays.asList("sid"),
Arrays.asList(new Slot.Input[] {
new Slot.Input(Location.fromPath("a", '/'), TemporaryInputFormat.class),
}),
Arrays.asList(input("a")),
TemporaryOutputFormat.class);
List<ResolvedSlot> resolved = resolver.resolve(Arrays.asList(slot));
assertThat(environment.hasError(), is(false));
Expand All @@ -74,25 +74,19 @@ public void multiple() {
"out1",
Ex1.class,
Arrays.asList("sid"),
Arrays.asList(new Slot.Input[] {
new Slot.Input(Location.fromPath("a/1", '/'), TemporaryInputFormat.class),
}),
Arrays.asList(input("a/1")),
TemporaryOutputFormat.class),
new Slot(
"out2",
Ex2.class,
Arrays.asList("sid"),
Arrays.asList(new Slot.Input[] {
new Slot.Input(Location.fromPath("a/2", '/'), TemporaryInputFormat.class),
}),
Arrays.asList(input("a/2")),
TemporaryOutputFormat.class),
new Slot(
"out3",
Ex1.class,
Arrays.asList("sid"),
Arrays.asList(new Slot.Input[] {
new Slot.Input(Location.fromPath("a/3", '/'), TemporaryInputFormat.class),
}),
Arrays.asList(input("a/3")),
TemporaryOutputFormat.class),
});
List<ResolvedSlot> resolved = resolver.resolve(slots);
Expand All @@ -117,9 +111,7 @@ public void invalid_class() {
"out",
Void.class,
Arrays.asList("sid"),
Arrays.asList(new Slot.Input[] {
new Slot.Input(Location.fromPath("a", '/'), TemporaryInputFormat.class),
}),
Arrays.asList(input("a")),
TemporaryOutputFormat.class);
resolver.resolve(Arrays.asList(slot));
assertThat(environment.hasError(), is(true));
Expand All @@ -135,11 +127,13 @@ public void invalid_property() {
"out",
Ex1.class,
Arrays.asList("missing"),
Arrays.asList(new Slot.Input[] {
new Slot.Input(Location.fromPath("a", '/'), TemporaryInputFormat.class),
}),
Arrays.asList(input("a")),
TemporaryOutputFormat.class);
resolver.resolve(Arrays.asList(slot));
assertThat(environment.hasError(), is(true));
}

private SourceInfo input(String path) {
return new SourceInfo(Collections.singleton(Location.fromPath(path, '/')), TemporaryInputFormat.class);
}
}
Expand Up @@ -25,8 +25,6 @@
import java.util.TreeMap;
import java.util.regex.Pattern;

import org.apache.hadoop.mapreduce.InputFormat;

import com.asakusafw.compiler.flow.ExternalIoDescriptionProcessor;
import com.asakusafw.compiler.flow.FlowCompilerOptions.GenericOptionValue;
import com.asakusafw.compiler.flow.Location;
Expand Down Expand Up @@ -326,18 +324,11 @@ public int compare(Location o1, Location o2) {
private Slot toSlot(Output output, String name) {
assert output != null;
assert name != null;
List<Slot.Input> inputs = Lists.create();
for (SourceInfo source : output.getSources()) {
Class<? extends InputFormat<?, ?>> format = source.getFormat();
for (Location location : source.getLocations()) {
inputs.add(new Slot.Input(location, format));
}
}
return new Slot(
name,
output.getDescription().getDataType(),
Collections.<String>emptyList(),
inputs,
output.getSources(),
extract(output.getDescription()).getOutputFormat());
}

Expand Down
Expand Up @@ -26,7 +26,6 @@
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.mapreduce.InputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -286,19 +285,12 @@ public List<CompiledStage> emitEpilogue(IoContext context) throws IOException {

private Slot toSlot(Output output) {
BulkLoadExporterDescription desc = extract(output.getDescription());
List<Slot.Input> inputs = Lists.create();
for (SourceInfo source : output.getSources()) {
Class<? extends InputFormat<?, ?>> format = source.getFormat();
for (Location location : source.getLocations()) {
inputs.add(new Slot.Input(location, format));
}
}
String name = normalize(output.getDescription().getName());
return new Slot(
name,
output.getDescription().getDataType(),
desc.getPrimaryKeyNames(),
inputs,
output.getSources(),
TemporaryOutputFormat.class);
}

Expand Down
Expand Up @@ -28,7 +28,6 @@
import java.util.TreeMap;
import java.util.TreeSet;

import org.apache.hadoop.mapreduce.InputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -177,19 +176,13 @@ public List<CompiledStage> emitEpilogue(IoContext context) throws IOException {
}

private Slot toSlot(Output output) {
List<Slot.Input> inputs = Lists.create();
for (SourceInfo source : output.getSources()) {
Class<? extends InputFormat<?, ?>> format = source.getFormat();
for (Location location : source.getLocations()) {
inputs.add(new Slot.Input(location, format));
}
}
assert output != null;
String name = normalize(output.getDescription().getName());
return new Slot(
name,
output.getDescription().getDataType(),
Collections.<String>emptyList(),
inputs,
output.getSources(),
TemporaryOutputFormat.class);
}

Expand Down

0 comments on commit 061696e

Please sign in to comment.