diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java index 75dde19ea1..bd1fbff285 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/DataflowProfilingOptions.java @@ -27,11 +27,17 @@ @Hidden public interface DataflowProfilingOptions { - @Description("Whether to periodically dump profiling information to local disk.\n" - + "WARNING: Enabling this option may fill local disk with profiling information.") + @Description( + "This option is deprecated and ignored. Using --saveProfilesToGcs= is preferred.") boolean getEnableProfilingAgent(); void setEnableProfilingAgent(boolean enabled); + @Description( + "When set to a non-empty value, enables recording profiles and saving them to GCS.\n" + + "Profiles will continue until the pipeline is stopped or updated without this option.\n") + String getSaveProfilesToGcs(); + void setSaveProfilesToGcs(String gcsPath); + @Description( "[INTERNAL] Additional configuration for the profiling agent. 
Not typically necessary.") @Hidden diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java index fd6c1ea608..5f57872b76 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java @@ -275,6 +275,14 @@ public static DataflowPipelineRunner fromOptions(PipelineOptions options) { if (dataflowOptions.getTempLocation() != null) { validator.validateOutputFilePrefixSupported(dataflowOptions.getTempLocation()); } + if (!Strings.isNullOrEmpty(dataflowOptions.getSaveProfilesToGcs())) { + validator.validateOutputFilePrefixSupported(dataflowOptions.getSaveProfilesToGcs()); + } + if (dataflowOptions.getEnableProfilingAgent()) { + LOG.error("--enableProfilingAgent is no longer supported, and will be ignored. " + + "Use --saveProfilesToGcs instead."); + } + if (Strings.isNullOrEmpty(dataflowOptions.getTempLocation())) { dataflowOptions.setTempLocation(dataflowOptions.getStagingLocation()); } else if (Strings.isNullOrEmpty(dataflowOptions.getStagingLocation())) { diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java index 1dac6c4cf2..24cb9eca69 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunnerTest.java @@ -32,7 +32,9 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.google.api.services.dataflow.Dataflow; @@ -94,6 
+96,7 @@ import org.hamcrest.Matchers; import org.hamcrest.TypeSafeMatcher; import org.joda.time.Instant; +import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.internal.matchers.ThrowableMessageMatcher; @@ -129,12 +132,44 @@ public class DataflowPipelineRunnerTest { private static final String PROJECT_ID = "some-project"; + public static final String VALID_GS_SPAM_HAM_EGGS = "gs://spam/ham/eggs"; + public static final String VALID_SOMEBUCKET_SOME_PATH = "gs://somebucket/some/path"; + public static final String VALID_BUCKET_OBJECT = "gs://bucket/object"; @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); @Rule public ExpectedException thrown = ExpectedException.none(); + private GcsUtil mockGcsUtil; + private Dataflow.Projects.Jobs mockJobs; + + @Before + public void setUp() throws IOException { + this.mockGcsUtil = mock(GcsUtil.class); + this.mockJobs = mock(Dataflow.Projects.Jobs.class); + when(mockGcsUtil.create(any(GcsPath.class), anyString())) + .then(new Answer<SeekableByteChannel>() { + @Override + public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { + return FileChannel.open( + Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); + } + }); + + when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true); + when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() { + @Override + public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { + return ImmutableList.of((GcsPath) invocation.getArguments()[0]); + } + }); + when(mockGcsUtil.bucketExists(GcsPath.fromUri(VALID_GS_SPAM_HAM_EGGS))).thenReturn(true); + when(mockGcsUtil.bucketExists(GcsPath.fromUri(VALID_SOMEBUCKET_SOME_PATH))).thenReturn(true); + when(mockGcsUtil.bucketExists(GcsPath.fromUri(VALID_BUCKET_OBJECT))).thenReturn(true); + } + // Asserts that the given Job has all expected fields set. 
private static void assertValidJob(Job job) { assertNull(job.getId()); @@ -146,25 +181,22 @@ private DataflowPipeline buildDataflowPipeline(DataflowPipelineOptions options) { options.setStableUniqueNames(CheckEnabled.ERROR); DataflowPipeline p = DataflowPipeline.create(options); - p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object")) - .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object")); + p.apply(TextIO.Read.named("ReadMyFile").from(VALID_BUCKET_OBJECT)) + .apply(TextIO.Write.named("WriteMyFile").to(VALID_BUCKET_OBJECT)); return p; } - private static Dataflow buildMockDataflow( - final ArgumentCaptor<Job> jobCaptor) throws IOException { + private Dataflow buildMockDataflow() throws IOException { Dataflow mockDataflowClient = mock(Dataflow.class); Dataflow.Projects mockProjects = mock(Dataflow.Projects.class); - Dataflow.Projects.Jobs mockJobs = mock(Dataflow.Projects.Jobs.class); Dataflow.Projects.Jobs.Create mockRequest = mock(Dataflow.Projects.Jobs.Create.class); Dataflow.Projects.Jobs.List mockList = mock(Dataflow.Projects.Jobs.List.class); when(mockDataflowClient.projects()).thenReturn(mockProjects); when(mockProjects.jobs()).thenReturn(mockJobs); - when(mockJobs.create(eq(PROJECT_ID), jobCaptor.capture())) - .thenReturn(mockRequest); + when(mockJobs.create(eq(PROJECT_ID), isA(Job.class))).thenReturn(mockRequest); when(mockJobs.list(eq(PROJECT_ID))).thenReturn(mockList); when(mockList.setPageToken(anyString())).thenReturn(mockList); when(mockList.execute()) @@ -183,43 +215,14 @@ private static Dataflow buildMockDataflow( return mockDataflowClient; } - private GcsUtil buildMockGcsUtil(boolean bucketExists) throws IOException { - GcsUtil mockGcsUtil = mock(GcsUtil.class); - when(mockGcsUtil.create(any(GcsPath.class), anyString())) - .then(new Answer<SeekableByteChannel>() { - @Override - public SeekableByteChannel answer(InvocationOnMock invocation) throws Throwable { - return FileChannel.open( - Files.createTempFile("channel-", ".tmp"), - 
StandardOpenOption.CREATE, StandardOpenOption.DELETE_ON_CLOSE); - } - }); - - when(mockGcsUtil.isGcsPatternSupported(anyString())).thenReturn(true); - when(mockGcsUtil.expand(any(GcsPath.class))).then(new Answer<List<GcsPath>>() { - @Override - public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { - return ImmutableList.of((GcsPath) invocation.getArguments()[0]); - } - }); - when(mockGcsUtil.bucketExists(any(GcsPath.class))).thenReturn(bucketExists); - return mockGcsUtil; - } - private DataflowPipelineOptions buildPipelineOptions() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - return buildPipelineOptions(jobCaptor); - } - - private DataflowPipelineOptions buildPipelineOptions( - ArgumentCaptor<Job> jobCaptor) throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setProject(PROJECT_ID); - options.setTempLocation("gs://somebucket/some/path"); + options.setTempLocation(VALID_SOMEBUCKET_SOME_PATH); // Set FILES_PROPERTY to empty to prevent a default value calculated from classpath. 
options.setFilesToStage(new LinkedList<String>()); - options.setDataflowClient(buildMockDataflow(jobCaptor)); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setDataflowClient(buildMockDataflow()); + options.setGcsUtil(mockGcsUtil); options.setGcpCredential(new TestCredential()); return options; } @@ -227,8 +230,7 @@ private DataflowPipelineOptions buildPipelineOptions( @Test public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception { String mixedCase = "ThisJobNameHasMixedCase"; - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); DataflowPipelineRunner runner = DataflowPipelineRunner.fromOptions(options); @@ -237,12 +239,13 @@ public void testFromOptionsWithUppercaseConvertsToLowercase() throws Exception { @Test public void testRun() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); DataflowPipeline p = buildDataflowPipeline(options); DataflowPipelineJob job = p.run(); assertEquals("newid", job.getJobId()); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @@ -273,14 +276,15 @@ public void testRunReturnDifferentRequestId() throws IOException { @Test public void testUpdate() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); options.setUpdate(true); options.setJobName("oldJobName"); DataflowPipeline p = buildDataflowPipeline(options); DataflowPipelineJob job = p.run(); assertEquals("newid", job.getJobId()); + 
+ ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @@ -334,8 +338,7 @@ protected boolean matchesSafely(DataflowJobAlreadyUpdatedException item) { public void testRunWithFiles() throws IOException { // Test that the function DataflowPipelineRunner.stageFiles works as // expected. - GcsUtil mockGcsUtil = buildMockGcsUtil(true /* bucket exists */); - final String gcsStaging = "gs://somebucket/some/path"; + final String gcsStaging = VALID_SOMEBUCKET_SOME_PATH; final String gcsTemp = "gs://somebucket/some/temp/path"; final String cloudDataflowDataset = "somedataset"; @@ -347,8 +350,8 @@ public void testRunWithFiles() throws IOException { String overridePackageName = "alias.txt"; - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setGcsUtil(mockGcsUtil); options.setFilesToStage(ImmutableList.of( temp1.getAbsolutePath(), overridePackageName + "=" + temp2.getAbsolutePath())); @@ -357,15 +360,17 @@ public void testRunWithFiles() throws IOException { options.setTempDatasetId(cloudDataflowDataset); options.setProject(PROJECT_ID); options.setJobName("job"); - options.setDataflowClient(buildMockDataflow(jobCaptor)); - options.setGcsUtil(mockGcsUtil); + options.setDataflowClient(buildMockDataflow()); options.setGcpCredential(new TestCredential()); + when(mockGcsUtil.bucketExists(GcsPath.fromUri(gcsTemp))).thenReturn(true); DataflowPipeline p = buildDataflowPipeline(options); DataflowPipelineJob job = p.run(); assertEquals("newid", job.getJobId()); + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); Job workflowJob = jobCaptor.getValue(); assertValidJob(workflowJob); @@ -442,11 +447,12 @@ public void testGcsStagingLocationInitialization() throws Exception { // Set temp 
location (required), and check that staging location is set. DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setGcsUtil(mockGcsUtil); options.setTempLocation(gcsTemp); options.setProject(PROJECT_ID); options.setGcpCredential(new TestCredential()); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + when(mockGcsUtil.bucketExists(GcsPath.fromUri(gcsTemp))).thenReturn(true); DataflowPipelineRunner.fromOptions(options); assertNotNull(options.getStagingLocation()); @@ -454,9 +460,7 @@ @Test public void testNonGcsFilePathInReadFailure() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); + Pipeline p = buildDataflowPipeline(buildPipelineOptions()); p.apply(TextIO.Read.named("ReadMyNonGcsFile").from(tmpFolder.newFile().getPath())); thrown.expectCause(Matchers.allOf( @@ -464,13 +468,12 @@ instanceOf(IllegalArgumentException.class), ThrowableMessageMatcher.hasMessage( containsString("expected a valid 'gs://' path but was given")))); p.run(); - assertValidJob(jobCaptor.getValue()); } @Test public void testNonGcsFilePathInWriteFailure() throws IOException { Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); + PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from(VALID_BUCKET_OBJECT)); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString("expected a valid 'gs://' path but was given")); @@ -479,9 +482,7 @@ @Test public void testMultiSlashGcsFileReadPath() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - Pipeline p = 
buildDataflowPipeline(buildPipelineOptions(jobCaptor)); + Pipeline p = buildDataflowPipeline(buildPipelineOptions()); p.apply(TextIO.Read.named("ReadInvalidGcsFile") .from("gs://bucket/tmp//file")); @@ -489,13 +490,16 @@ instanceOf(IllegalArgumentException.class), ThrowableMessageMatcher.hasMessage(containsString("consecutive slashes")))); p.run(); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @Test public void testMultiSlashGcsFileWritePath() throws IOException { Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); + PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from(VALID_BUCKET_OBJECT)); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("consecutive slashes"); @@ -504,14 +508,15 @@ @Test public void testInvalidTempLocation() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); options.setTempLocation("file://temp/location"); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString("expected a valid 'gs://' path but was given")); DataflowPipelineRunner.fromOptions(options); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @@ -535,34 +540,67 @@ public void testInvalidStagingLocation() throws IOException { } @Test - public void testNonExistentTempLocation() throws IOException { - ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + public void 
testInvalidProfileLocation() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setSaveProfilesToGcs("file://my/staging/location"); + try { + DataflowPipelineRunner.fromOptions(options); + fail("fromOptions should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given")); + } - GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */); - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - options.setGcsUtil(mockGcsUtil); + options.setSaveProfilesToGcs("my/staging/location"); + try { + DataflowPipelineRunner.fromOptions(options); + fail("fromOptions should have failed"); + } catch (IllegalArgumentException e) { + assertThat(e.getMessage(), containsString("expected a valid 'gs://' path but was given")); + } + } + + @Test + public void testNonExistentTempLocation() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); options.setTempLocation("gs://non-existent-bucket/location"); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString( "Output path does not exist or is not writeable: gs://non-existent-bucket/location")); DataflowPipelineRunner.fromOptions(options); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @Test public void testNonExistentStagingLocation() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setStagingLocation("gs://non-existent-bucket/location"); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage(containsString( + "Output path does not exist or is not writeable: gs://non-existent-bucket/location")); + DataflowPipelineRunner.fromOptions(options); + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + verify(mockJobs).create(eq(PROJECT_ID), 
jobCaptor.capture()); + assertValidJob(jobCaptor.getValue()); + } - GcsUtil mockGcsUtil = buildMockGcsUtil(false /* bucket exists */); - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); - options.setGcsUtil(mockGcsUtil); - options.setStagingLocation("gs://non-existent-bucket/location"); + @Test + public void testNonExistentProfileLocation() throws IOException { + DataflowPipelineOptions options = buildPipelineOptions(); + options.setSaveProfilesToGcs("gs://non-existent-bucket/location"); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString( "Output path does not exist or is not writeable: gs://non-existent-bucket/location")); DataflowPipelineRunner.fromOptions(options); + + ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); + verify(mockJobs).create(eq(PROJECT_ID), jobCaptor.capture()); assertValidJob(jobCaptor.getValue()); } @@ -587,9 +625,9 @@ public void testProjectId() throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowPipelineRunner.class); options.setProject("foo-12345"); + options.setGcsUtil(mockGcsUtil); - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setStagingLocation(VALID_GS_SPAM_HAM_EGGS); options.setGcpCredential(new TestCredential()); DataflowPipelineRunner.fromOptions(options); @@ -598,11 +636,11 @@ @Test public void testProjectPrefix() throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setGcsUtil(mockGcsUtil); options.setRunner(DataflowPipelineRunner.class); options.setProject("google.com:some-project-12345"); - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setStagingLocation(VALID_GS_SPAM_HAM_EGGS); 
options.setGcpCredential(new TestCredential()); DataflowPipelineRunner.fromOptions(options); @@ -611,11 +649,11 @@ public void testProjectPrefix() throws IOException { @Test public void testProjectNumber() throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setGcsUtil(mockGcsUtil); options.setRunner(DataflowPipelineRunner.class); options.setProject("12345"); - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setStagingLocation(VALID_GS_SPAM_HAM_EGGS); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Project ID"); @@ -627,11 +665,12 @@ public void testProjectNumber() throws IOException { @Test public void testProjectDescription() throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setGcsUtil(mockGcsUtil); + options.setGcsUtil(mockGcsUtil); options.setRunner(DataflowPipelineRunner.class); options.setProject("some project"); - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setStagingLocation(VALID_GS_SPAM_HAM_EGGS); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Project ID"); @@ -645,10 +684,9 @@ public void testInvalidNumberOfWorkerHarnessThreads() throws IOException { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); options.setRunner(DataflowPipelineRunner.class); options.setProject("foo-12345"); + options.setGcsUtil(mockGcsUtil); - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); - + options.setStagingLocation(VALID_GS_SPAM_HAM_EGGS); options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(-1); thrown.expect(IllegalArgumentException.class); @@ -673,11 +711,12 @@ public void 
testNoStagingLocationAndNoTempLocationFails() { @Test public void testStagingLocationAndNoTempLocationSucceeds() throws Exception { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setGcsUtil(mockGcsUtil); options.setRunner(DataflowPipelineRunner.class); options.setGcpCredential(new TestCredential()); options.setProject("foo-project"); - options.setStagingLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setStagingLocation(VALID_GS_SPAM_HAM_EGGS); + DataflowPipelineRunner.fromOptions(options); } @@ -685,11 +724,11 @@ public void testStagingLocationAndNoTempLocationSucceeds() throws Exception { @Test public void testTempLocationAndNoStagingLocationSucceeds() throws Exception { DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setGcsUtil(mockGcsUtil); options.setRunner(DataflowPipelineRunner.class); options.setGcpCredential(new TestCredential()); options.setProject("foo-project"); - options.setTempLocation("gs://spam/ham/eggs"); - options.setGcsUtil(buildMockGcsUtil(true /* bucket exists */)); + options.setTempLocation(VALID_GS_SPAM_HAM_EGGS); DataflowPipelineRunner.fromOptions(options); } @@ -795,10 +834,7 @@ protected Coder getDefaultOutputCoder(PCollection input) { @Test public void testTransformTranslatorMissing() throws IOException { - // Test that we throw if we don't provide a translation. 
- ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); - - DataflowPipelineOptions options = buildPipelineOptions(jobCaptor); + DataflowPipelineOptions options = buildPipelineOptions(); DataflowPipeline p = DataflowPipeline.create(options); p.apply(Create.of(Arrays.asList(1, 2, 3))) @@ -808,13 +844,14 @@ public void testTransformTranslatorMissing() throws IOException { thrown.expectMessage(Matchers.containsString("no translator registered")); DataflowPipelineTranslator.fromOptions(options) .translate(p, p.getRunner(), Collections.emptyList()); - assertValidJob(jobCaptor.getValue()); } @Test public void testTransformTranslator() throws IOException { // Test that we can provide a custom translation DataflowPipelineOptions options = buildPipelineOptions(); + when(mockGcsUtil.bucketExists(GcsPath.fromUri("gs://somebucket/some/path/staging"))) + .thenReturn(true); DataflowPipeline p = DataflowPipeline.create(options); TestTransform transform = new TestTransform(); diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java index a65bc0a077..0c4b1e37b2 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineTranslatorTest.java @@ -19,9 +19,9 @@ import static com.google.cloud.dataflow.sdk.util.Structs.addObject; import static com.google.cloud.dataflow.sdk.util.Structs.getDictionary; import static com.google.cloud.dataflow.sdk.util.Structs.getString; - import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasKey; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; @@ -60,6 +60,7 @@ import 
com.google.cloud.dataflow.sdk.transforms.Sum; import com.google.cloud.dataflow.sdk.transforms.View; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; +import com.google.cloud.dataflow.sdk.util.DataflowPathValidator; import com.google.cloud.dataflow.sdk.util.GcsUtil; import com.google.cloud.dataflow.sdk.util.OutputReference; import com.google.cloud.dataflow.sdk.util.PropertyNames; @@ -91,7 +92,6 @@ import java.io.Serializable; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -184,25 +184,23 @@ public void testSettingOfSdkPipelineOptions() throws IOException { .translate(p, p.getRunner(), Collections.emptyList()) .getJob(); - // Note that the contents of this materialized map may be changed by the act of reading an - // option, which will cause the default to get materialized whereas it would otherwise be - // left absent. It is permissible to simply alter this test to reflect current behavior. 
- Map<String, Object> settings = new HashMap<>(); - settings.put("appName", "DataflowPipelineTranslatorTest"); - settings.put("project", "some-project"); - settings.put("pathValidatorClass", "com.google.cloud.dataflow.sdk.util.DataflowPathValidator"); - settings.put("runner", "com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner"); - settings.put("jobName", "some-job-name"); - settings.put("tempLocation", "gs://somebucket/some/path"); - settings.put("stagingLocation", "gs://somebucket/some/path/staging"); - settings.put("stableUniqueNames", "WARNING"); - settings.put("streaming", false); - settings.put("numberOfWorkerHarnessThreads", 0); - settings.put("experiments", null); - Map<String, Object> sdkPipelineOptions = job.getEnvironment().getSdkPipelineOptions(); assertThat(sdkPipelineOptions, hasKey("options")); - assertEquals(settings, sdkPipelineOptions.get("options")); + Map<String, Object> optionsMap = (Map<String, Object>) sdkPipelineOptions.get("options"); + + assertThat(optionsMap, hasEntry("appName", (Object) "DataflowPipelineTranslatorTest")); + assertThat(optionsMap, hasEntry("project", (Object) "some-project")); + assertThat(optionsMap, + hasEntry("pathValidatorClass", (Object) DataflowPathValidator.class.getName())); + assertThat(optionsMap, + hasEntry("runner", (Object) DataflowPipelineRunner.class.getName())); + assertThat(optionsMap, hasEntry("jobName", (Object) "some-job-name")); + assertThat(optionsMap, hasEntry("tempLocation", (Object) "gs://somebucket/some/path")); + assertThat(optionsMap, + hasEntry("stagingLocation", (Object) "gs://somebucket/some/path/staging")); + assertThat(optionsMap, hasEntry("stableUniqueNames", (Object) "WARNING")); + assertThat(optionsMap, hasEntry("streaming", (Object) false)); + assertThat(optionsMap, hasEntry("numberOfWorkerHarnessThreads", (Object) 0)); } @Test