From 23637cbd1d0fa008851829e46b392de84352c10a Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 11 Jul 2016 21:29:26 -0700 Subject: [PATCH] Remove getDataflowClient() from DataflowPipelineJob --- .../beam/runners/dataflow/DataflowPipelineJob.java | 7 ------- .../runners/dataflow/testing/TestDataflowRunner.java | 2 +- .../dataflow/testing/TestDataflowRunnerTest.java | 12 ------------ 3 files changed, 1 insertion(+), 20 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java index 0c79a920261e..1b3dd434d949 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineJob.java @@ -152,13 +152,6 @@ public DataflowPipelineJob getReplacedByJob() { return replacedByJob; } - /** - * Get the Cloud Dataflow API Client used by this job. - */ - public Dataflow getDataflowClient() { - return dataflowClient; - } - /** * Waits for the job to finish and return the final status. * diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java index 19a21782ff2f..1325cf3dd8db 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunner.java @@ -182,7 +182,7 @@ Optional checkForSuccess(DataflowPipelineJob job) return Optional.of(false); } - JobMetrics metrics = job.getDataflowClient().projects().jobs() + JobMetrics metrics = options.getDataflowClient().projects().jobs() .getMetrics(job.getProjectId(), job.getJobId()).execute(); if (metrics == null || metrics.getMetrics() == null) { diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java index cd99643b6da3..221cd0d6e068 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/testing/TestDataflowRunnerTest.java @@ -127,7 +127,6 @@ public void testRunBatchJobThatSucceeds() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.DONE); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -148,7 +147,6 @@ public void testRunBatchJobThatFails() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.FAILED); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -174,7 +172,6 @@ public void testBatchPipelineFailsIfException() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.RUNNING); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -218,7 +215,6 @@ public void testRunStreamingJobThatSucceeds() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.RUNNING); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -240,7 +236,6 @@ public void testRunStreamingJobThatFails() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.RUNNING); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -350,7 +345,6 @@ public void testStreamingPipelineFailsIfException() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.RUNNING); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -393,7 +387,6 @@ public void testBatchOnCreateMatcher() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.DONE); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -418,7 +411,6 @@ public void testStreamingOnCreateMatcher() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.DONE); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -445,7 +437,6 @@ public void testBatchOnSuccessMatcherWhenPipelineSucceeds() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.DONE); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -470,7 +461,6 @@ public void testStreamingOnSuccessMatcherWhenPipelineSucceeds() throws Exception PAssert.that(pc).containsInAnyOrder(1, 2, 3); final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.DONE); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -497,7 +487,6 @@ public void testBatchOnSuccessMatcherWhenPipelineFails() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.FAILED); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job"); @@ -529,7 +518,6 @@ public void testStreamingOnSuccessMatcherWhenPipelineFails() throws Exception { PAssert.that(pc).containsInAnyOrder(1, 2, 3); final DataflowPipelineJob mockJob = Mockito.mock(DataflowPipelineJob.class); - when(mockJob.getDataflowClient()).thenReturn(service); when(mockJob.getState()).thenReturn(State.FAILED); when(mockJob.getProjectId()).thenReturn("test-project"); when(mockJob.getJobId()).thenReturn("test-job");