Skip to content

Commit

Permalink
Merge ab7f6f6 into 99b5b05
Browse files Browse the repository at this point in the history
  • Loading branch information
ssisk committed Jun 29, 2017
2 parents 99b5b05 + ab7f6f6 commit e885f24
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 23 deletions.
Expand Up @@ -20,6 +20,7 @@
import com.google.bigtable.v2.Row;
import com.google.cloud.bigtable.config.BigtableOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
Expand All @@ -41,8 +42,10 @@ public void testE2EBigtableRead() throws Exception {
BigtableTestOptions options = TestPipeline.testingPipelineOptions()
.as(BigtableTestOptions.class);

String project = options.as(GcpOptions.class).getProject();

BigtableOptions.Builder bigtableOptionsBuilder = new BigtableOptions.Builder()
.setProjectId(options.getProjectId())
.setProjectId(project)
.setInstanceId(options.getInstanceId());

final String tableId = "BigtableReadTest";
Expand Down
Expand Up @@ -25,11 +25,6 @@
* Properties needed when using Bigtable with the Beam SDK.
*/
public interface BigtableTestOptions extends TestPipelineOptions {
@Description("Project ID for Bigtable")
@Default.String("apache-beam-testing")
String getProjectId();
void setProjectId(String value);

@Description("Instance ID for Bigtable")
@Default.String("beam-test")
String getInstanceId();
Expand Down
Expand Up @@ -73,15 +73,17 @@ public class BigtableWriteIT implements Serializable {
private static BigtableTableAdminClient tableAdminClient;
private final String tableId =
String.format("BigtableWriteIT-%tF-%<tH-%<tM-%<tS-%<tL", new Date());
private String project;

@Before
public void setup() throws Exception {
PipelineOptionsFactory.register(BigtableTestOptions.class);
options = TestPipeline.testingPipelineOptions().as(BigtableTestOptions.class);
project = options.as(GcpOptions.class).getProject();

bigtableOptions =
new Builder()
.setProjectId(options.getProjectId())
.setProjectId(project)
.setInstanceId(options.getInstanceId())
.setUserAgent("apache-beam-test")
.build();
Expand Down
Expand Up @@ -31,6 +31,8 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
Expand Down Expand Up @@ -59,11 +61,6 @@ public class SpannerReadIT {

/** Pipeline options for this test. */
public interface SpannerTestPipelineOptions extends TestPipelineOptions {
@Description("Project ID for Spanner")
@Default.String("apache-beam-testing")
String getProjectId();
void setProjectId(String value);

@Description("Instance ID to write to in Spanner")
@Default.String("beam-test")
String getInstanceId();
Expand All @@ -84,13 +81,16 @@ public interface SpannerTestPipelineOptions extends TestPipelineOptions {
private DatabaseAdminClient databaseAdminClient;
private SpannerTestPipelineOptions options;
private String databaseName;
private String project;

@Before
public void setUp() throws Exception {
PipelineOptionsFactory.register(SpannerTestPipelineOptions.class);
options = TestPipeline.testingPipelineOptions().as(SpannerTestPipelineOptions.class);

spanner = SpannerOptions.newBuilder().setProjectId(options.getProjectId()).build().getService();
project = options.as(GcpOptions.class).getProject();

spanner = SpannerOptions.newBuilder().setProjectId(project).build().getService();

databaseName = generateDatabaseName();

Expand Down Expand Up @@ -118,7 +118,7 @@ public void testRead() throws Exception {
DatabaseClient databaseClient =
spanner.getDatabaseClient(
DatabaseId.of(
options.getProjectId(), options.getInstanceId(), databaseName));
project, options.getInstanceId(), databaseName));

List<Mutation> mutations = new ArrayList<>();
for (int i = 0; i < 5L; i++) {
Expand All @@ -134,7 +134,7 @@ public void testRead() throws Exception {
databaseClient.writeAtLeastOnce(mutations);

SpannerConfig spannerConfig = SpannerConfig.create()
.withProjectId(options.getProjectId())
.withProjectId(project)
.withInstanceId(options.getInstanceId())
.withDatabaseId(databaseName);

Expand Down
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.spanner.Statement;
import com.google.spanner.admin.database.v1.CreateDatabaseMetadata;
import java.util.Collections;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
Expand All @@ -59,11 +60,6 @@ public class SpannerWriteIT {

/** Pipeline options for this test. */
public interface SpannerTestPipelineOptions extends TestPipelineOptions {
@Description("Project ID for Spanner")
@Default.String("apache-beam-testing")
String getProjectId();
void setProjectId(String value);

@Description("Instance ID to write to in Spanner")
@Default.String("beam-test")
String getInstanceId();
Expand All @@ -84,13 +80,16 @@ public interface SpannerTestPipelineOptions extends TestPipelineOptions {
private DatabaseAdminClient databaseAdminClient;
private SpannerTestPipelineOptions options;
private String databaseName;
private String project;

@Before
public void setUp() throws Exception {
PipelineOptionsFactory.register(SpannerTestPipelineOptions.class);
options = TestPipeline.testingPipelineOptions().as(SpannerTestPipelineOptions.class);

spanner = SpannerOptions.newBuilder().setProjectId(options.getProjectId()).build().getService();
project = options.as(GcpOptions.class).getProject();

spanner = SpannerOptions.newBuilder().setProjectId(project).build().getService();

databaseName = generateDatabaseName();

Expand Down Expand Up @@ -128,15 +127,15 @@ public void testWrite() throws Exception {
.apply(ParDo.of(new GenerateMutations(options.getTable())))
.apply(
SpannerIO.write()
.withProjectId(options.getProjectId())
.withProjectId(project)
.withInstanceId(options.getInstanceId())
.withDatabaseId(databaseName));

p.run();
DatabaseClient databaseClient =
spanner.getDatabaseClient(
DatabaseId.of(
options.getProjectId(), options.getInstanceId(), databaseName));
project, options.getInstanceId(), databaseName));

ResultSet resultSet =
databaseClient
Expand Down

0 comments on commit e885f24

Please sign in to comment.