[BEAM-1542] Introduced SpannerIO.readAll #3591

Closed
mairbek wants to merge 10 commits into apache:master from mairbek:readrefactor

mairbek (Contributor) commented Jul 19, 2017

SpannerIO.readAll lets a pipeline drive Cloud Spanner reads from the output of an upstream transform. The motivating use case is exporting data from a Spanner database when the list of tables is generated by a previous transform.

R: @jkff

@mairbek mairbek changed the title [BEAM-1542] Introduced SpannerIO.readFn [BEAM-1542] Introduced SpannerIO.readFn Jul 19, 2017
mairbek (Contributor Author) commented Jul 19, 2017

Also includes https://github.com/apache/beam/pull/3574

jkff (Contributor) left a comment

Thanks! Exciting to get more connectors following this pattern (reading a PCollection of sources).

* {@link Read#createTransaction()}. If null, the doesn't provide a transactional guarantees.
* @return a {@link DoFn} that concurrently reads from multiple Cloud Spanner sources.
*/
@Experimental
Contributor

DoFns should not be part of the public API of a PTransform; see https://beam.apache.org/contribute/ptransform-style-guide/#exposing-a-ptransform-vs-something-else

Instead, please follow the example of TextIO.readAll() (https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java#L170) and create a new PTransform, SpannerIO.readAll().
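
To illustrate the style-guide point, here is a minimal plain-JDK sketch, not Beam code. `CollectionTransform`, `ReadAllSketch`, and the `source` function are hypothetical stand-ins for a PTransform, the IO class, and a Spanner read. The only idea shown is that callers receive a transform while the per-element logic stays private, so the internals can change later without breaking users.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical stand-in for a transform over a whole collection; not a Beam type. */
interface CollectionTransform<I, O> {
  List<O> expand(List<I> input);
}

/** Hypothetical IO-style entry point that exposes a transform, never the per-element function. */
final class ReadAllSketch {
  private ReadAllSketch() {}

  /** Public factory: the returned transform is the only thing callers depend on. */
  static CollectionTransform<String, String> readAll(Function<String, List<String>> source) {
    return new ReadAll(source);
  }

  /** Private composite; it could later gain extra steps without changing the public API. */
  private static final class ReadAll implements CollectionTransform<String, String> {
    private final Function<String, List<String>> source;

    ReadAll(Function<String, List<String>> source) {
      this.source = source;
    }

    @Override
    public List<String> expand(List<String> queries) {
      List<String> rows = new ArrayList<>();
      // Run each query through the (assumed) source and collect all rows.
      for (String query : queries) {
        rows.addAll(source.apply(query));
      }
      return rows;
    }
  }
}
```

A caller would write something like `ReadAllSketch.readAll(source).expand(queries)` and never reference the `ReadAll` class directly.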

Contributor Author

Done

mairbek changed the title from "[BEAM-1542] Introduced SpannerIO.readFn" to "[BEAM-1542] Introduced SpannerIO.readAll" on Jul 19, 2017
jkff (Contributor) left a comment

Thanks, the approach looks good.

sideInputs = Collections.singletonList(getTransaction());
}
NaiveSpannerReadFn fn = new NaiveSpannerReadFn(getSpannerConfig(), getTransaction());
return input.apply("Execute query", ParDo.of(fn).withSideInputs(sideInputs));
Contributor

You might want to reshuffle the input collection before applying this fn. Otherwise it may be fused with an upstream fn that produces multiple queries per element, and those queries will all be read sequentially.
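
The reshuffle suggested here can be modeled outside Beam as "pair each element with a random key, group by key, then flatten". The sketch below is a plain-JDK illustration under that assumption. `ReshuffleSketch`, `redistribute`, and `flatten` are hypothetical names, and each bucket merely stands in for a bundle that a separate worker could pick up; in a real pipeline it is the grouping step that lets the runner break fusion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

/** Plain-JDK model of the "pair with random key, then group" idea; not Beam code. */
final class ReshuffleSketch {
  private ReshuffleSketch() {}

  /**
   * Assigns each element a pseudo-random key in [0, buckets) and groups by that key.
   * The seed is a parameter only so the sketch is reproducible.
   */
  static <T> Map<Integer, List<T>> redistribute(List<T> elements, int buckets, long seed) {
    if (buckets <= 0) {
      throw new IllegalArgumentException("buckets must be positive");
    }
    Random random = new Random(seed);
    Map<Integer, List<T>> grouped = new TreeMap<>();
    for (T element : elements) {
      int key = random.nextInt(buckets);
      grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
    }
    return grouped;
  }

  /** Flattens the buckets back into one list; element order may differ from the input. */
  static <T> List<T> flatten(Map<Integer, List<T>> grouped) {
    List<T> out = new ArrayList<>();
    for (List<T> bucket : grouped.values()) {
      out.addAll(bucket);
    }
    return out;
  }
}
```

Every input element ends up in exactly one bucket, so flattening returns the same elements, possibly in a different order.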


NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig());
Contributor

Add a TestPipeline test for readAll() to this file?

Contributor Author

Done

@coveralls

Coverage decreased (-0.02%) to 70.62% when pulling 250ea5f on mairbek:readrefactor into be5b934 on apache:master.

jkff (Contributor) left a comment

Thanks, I'll make these changes myself and merge tomorrow.

.apply("Pair wth random key",
WithKeys.of(new SerializableFunction<ReadOperation, String>() {

@Override public String apply(ReadOperation input1) {
Contributor

Put @Override on a separate line.

@Override
public PCollection<Struct> expand(PCollection<ReadOperation> input) {
PCollection<ReadOperation> reshuffled = input
.apply("Pair wth random key",
Contributor

Typo: "wth" should be "with".

sideInputs = Collections.singletonList(getTransaction());
}
NaiveSpannerReadFn fn = new NaiveSpannerReadFn(getSpannerConfig(), getTransaction());
return reshuffled.apply("Execute all query", ParDo.of(fn).withSideInputs(sideInputs));
Contributor

"query" should be "queries".

}
}

/**
Contributor

Indentation is off

}

/**
* A {@link PTransform} that reads data from Google Cloud Spanner.
Contributor

Usually we document this as "Implementation of {@link #read}" or something like that, to avoid redundancy.

private ReadOnlyTransaction mockTx;

private Type fakeType = Type.struct(Type.StructField.of("id", Type.int64()),
private transient Type fakeType = Type.struct(Type.StructField.of("id", Type.int64()),
Contributor

You could also make these static final.

@asfgit asfgit closed this in afeba37 Jul 20, 2017
3 participants