Skip to content

Commit

Permalink
Fixup after rebase.
Browse files Browse the repository at this point in the history
  • Loading branch information
Reuven Lax committed Jul 28, 2017
1 parent c1a6497 commit 3d64f3f
Show file tree
Hide file tree
Showing 7 changed files with 12 additions and 13 deletions.
Expand Up @@ -141,7 +141,7 @@ List<PCollectionView<?>> getDynamicDestinationSideInputs(
entry.getKey(),
originalPCollection,
transformProto,
sdkComponents.toComponents()));
RehydratedComponents.forComponents(sdkComponents.toComponents())));
}
return views;
}
Expand Down
Expand Up @@ -550,7 +550,7 @@ public void writeWithRunnerDeterminedSharding() {
WriteFiles.to(
new FileBasedSink<Integer, Void, Integer>(
StaticValueProvider.of(outputDirectory),
DynamicFileDestinations.<Integer>constant(null)) {
DynamicFileDestinations.<Integer>constant(new FakeFilenamePolicy())) {
@Override
public WriteOperation<Void, Integer> createWriteOperation() {
return null;
Expand Down
Expand Up @@ -144,7 +144,7 @@ public void withNoShardingSpecifiedReturnsNewTransform() {
WriteFiles.to(
new FileBasedSink<Object, Void, Object>(
StaticValueProvider.of(outputDirectory),
DynamicFileDestinations.constant(null)) {
DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
@Override
public WriteOperation<Void, Object> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
Expand Down
Expand Up @@ -1297,7 +1297,6 @@ public void validate(PipelineOptions options) {}
TestSink(String tmpFolder) {
super(
StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
<<<<<<< HEAD
DynamicFileDestinations.constant(
new FilenamePolicy() {
@Override
Expand All @@ -1317,9 +1316,6 @@ public ResourceId unwindowedFilename(
throw new UnsupportedOperationException("should not be called");
}
}, SerializableFunctions.identity()));
=======
DynamicFileDestinations.constant(null));
>>>>>>> Address comments.
}

@Override
Expand Down
Expand Up @@ -313,7 +313,7 @@ final Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
this,
DynamicDestinations.class,
new TypeVariableExtractor<
DynamicDestinations<UserT, DestinationT>, DestinationT>() {});
DynamicDestinations<UserT, DestinationT, OutputT>, DestinationT>() {});
checkArgument(
descriptor != null,
"Unable to infer a coder for DestinationT, "
Expand Down
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -71,6 +72,7 @@
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
Expand Down Expand Up @@ -623,15 +625,16 @@ public void testDynamicDestinations() throws Exception {
expectedElements.add(createRecord(element, prefix, new Schema.Parser().parse(jsonSchema)));
}
PCollectionView<Map<String, String>> schemaView =
p.apply("createSchemaView", Create.of(schemaMap)).apply(View.<String, String>asMap());
writePipeline.apply("createSchemaView", Create.of(schemaMap))
.apply(View.<String, String>asMap());

PCollection<String> input =
p.apply("createInput", Create.of(elements).withCoder(StringUtf8Coder.of()));
writePipeline.apply("createInput", Create.of(elements).withCoder(StringUtf8Coder.of()));
input.apply(AvroIO.<String>writeCustomTypeToGenericRecords()
.to(new TestDynamicDestinations(baseDir, schemaView))
.withoutSharding()
.withTempDirectory(baseDir));
p.run();
writePipeline.run();

// Validate that the data written matches the expected elements in the expected order.

Expand Down
Expand Up @@ -275,10 +275,10 @@ public void testDynamicDefaultFilenamePolicy() throws Exception {
new UserWriteType("caab", "sixth"));
PCollection<UserWriteType> input = p.apply(Create.of(elements));
input.apply(
TextIO.<UserWriteType>writeCustomType())
TextIO.<UserWriteType>writeCustomType()
.to(new UserWriteDestination(baseDir), new DefaultFilenamePolicy.Params())
.withFormatFunction(new SerializeUserWrite())
.withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true));
.withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
p.run();

String[] aElements =
Expand Down

0 comments on commit 3d64f3f

Please sign in to comment.