[BEAM-1542] Cloud Spanner Source #3395
R: @jkff
Retest this please
Changes Unknown when pulling 9460ad0 on mairbek:readapi-pr into apache:master.
Thanks! Here's a first round.
/** Creates a batch transaction. */
class CreateTransactionFn extends AbstractSpannerFn<Object, Transaction> {

  private static final long serialVersionUID = -4174426331092286581L;
You don't need serialVersionUID on DoFn's
Removed
    databaseClient().readOnlyTransaction(config.getTimestampBound())) {
  // Run a dummy sql statement to force the RPC and obtain the timestamp from the server.
  ResultSet resultSet = readOnlyTransaction.executeQuery(Statement.of("SELECT 1"));
  while (resultSet.next()) {
Is this necessary?
Yes, ResultSet#next is what actually sends the create transaction gRPC request.
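To illustrate why the loop is needed, here is a minimal, library-free sketch of a lazily evaluated result set. `LazyResultSetSketch` and its RPC counter are hypothetical stand-ins, not the Cloud Spanner client; the sketch only assumes, as the reply above describes, that no request goes out until `next()` is first called.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical lazy result set: the simulated RPC is issued on the first next() call. */
final class LazyResultSetSketch {
  private final List<String> serverRows; // what the "server" would return
  private Iterator<String> rows;         // stays null until the request is sent
  private String current;
  private int rpcCount = 0;

  LazyResultSetSketch(List<String> serverRows) {
    this.serverRows = serverRows;
  }

  /** Advances to the next row, sending the simulated request on first use. */
  boolean next() {
    if (rows == null) {
      rpcCount++; // stands in for the real network call
      rows = serverRows.iterator();
    }
    if (!rows.hasNext()) {
      return false;
    }
    current = rows.next();
    return true;
  }

  String getCurrent() {
    return current;
  }

  int getRpcCount() {
    return rpcCount;
  }
}
```

Constructing the object alone leaves `getRpcCount()` at 0; only draining it with `while (rs.next()) {}` triggers the single simulated request.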
    return builder().build();
  }

  public static Builder builder() {
You're already exposing the withBlah() methods - no need to expose the Builder class as public.
Done
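The suggestion above could look roughly like the following hand-written sketch. `ConfigSketch` and its fields are hypothetical and do not reproduce the PR's `SpannerConfig` (which uses AutoValue); the point is only that callers see `create()` and `withX()` methods while the builder stays private.

```java
/** Hypothetical immutable config in the withX() style; not the PR's SpannerConfig. */
final class ConfigSketch {
  private final String projectId;
  private final String instanceId;

  private ConfigSketch(Builder builder) {
    this.projectId = builder.projectId;
    this.instanceId = builder.instanceId;
  }

  /** Entry point for callers; the builder itself is never exposed. */
  static ConfigSketch create() {
    return new Builder().build();
  }

  String getProjectId() {
    return projectId;
  }

  String getInstanceId() {
    return instanceId;
  }

  /** Specifies the project. Returns a modified copy. */
  ConfigSketch withProjectId(String projectId) {
    Builder builder = toBuilder();
    builder.projectId = projectId;
    return builder.build();
  }

  /** Specifies the instance. Returns a modified copy. */
  ConfigSketch withInstanceId(String instanceId) {
    Builder builder = toBuilder();
    builder.instanceId = instanceId;
    return builder.build();
  }

  private Builder toBuilder() {
    Builder builder = new Builder();
    builder.projectId = projectId;
    builder.instanceId = instanceId;
    return builder;
  }

  /** Private builder: an implementation detail of the withX() methods. */
  private static final class Builder {
    private String projectId;
    private String instanceId;

    ConfigSketch build() {
      return new ConfigSketch(this);
    }
  }
}
```

A caller would write something like `ConfigSketch.create().withProjectId("my-project").withInstanceId("my-instance")`, and the original object is left unchanged by each call.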
/** Builder for {@link SpannerConfig}. */
@AutoValue.Builder
public abstract static class Builder {

A lot of unnecessary blank lines
Cleaned this up
/**
 * Returns a new {@link SpannerIO.Read} that will write to the specified Cloud Spanner project.
 *
 * <p>Does not modify this object.
Remove all the "Does not modify this object" comments - this is true for all withBlah() methods in Beam and specifying it is redundant.
You can also drop "Returns a new SpannerIO.Read that..." - usually we phrase these comments as simply "Specifies the Cloud Spanner project" (note: your comment currently says write - that's a copy-paste artifact I suppose; same is true for a bunch of other builder methods below)
Done
}

/**
 * Returns a new {@link SpannerIO.Read} that will read from the specified Cloud Spanner
This comment is incorrect
public abstract static class CreateTransaction
    extends PTransform<PBegin, PCollectionView<Transaction>> {

  private static final long serialVersionUID = 9201734106453817417L;
serialVersionUID's on PTransforms are also unnecessary. We don't serialize PTransform's using Serializable.
Removed
/** Unit tests for {@link SpannerIO}. */
@RunWith(JUnit4.class)
public class SpannerIOReadTest implements Serializable {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();
Seems like you're not using this pipeline. Were you planning to add some tests reading from a mock spanner instance?
/** Pipeline options for this test. */
public interface SpannerTestPipelineOptions extends TestPipelineOptions {
  @Description("Project ID for Spanner")
  @Default.String("apache-beam-testing")
Hmm, looks like this test can only be run by people who are members of the apache-beam-testing project (e.g. I'm not).
Better implement tests with mocks, like we do for BigQuery and many other connectors. If you want to also have ITs that's fine, but let's discuss it separately after the mocks.
This is a project that Beam CI uses. @dhalperi set up the Spanner config.
Agreed. I'm keeping this test and will add more tests to SpannerIOReadTest.
@davorbonaci or @jasonkuster can say more about the ITs. It is standard for the ITs to default to a project that Jenkins can access, but that is not public if the underlying service doesn't make it easy. (BigQuery's ReadITs can use public datasets, but not for writing... etc).
Humans can run these by overriding the arguments on the command line / in the test config.
Added more tests, please take a look.
Thanks! Here's another round; starting to look pretty good.
    databaseClient().readOnlyTransaction(timestampBound)) {
  ResultSet resultSet = execute(readOnlyTransaction);
  while (resultSet.next()) {
    c.output(resultSet.getCurrentRowAsStruct());
Btw, is this efficient? Or is it normally a lot more efficient for users to construct their own objects from the ResultSet directly, like people normally do with JDBC ResultSet's?
It should be reasonably efficient.
We might want to introduce a Schema abstraction to avoid serialization overhead (currently, we serialize (name, value) pairs for both structs and mutations). Something for a future PR.
abstract SpannerConfig getSpannerConfig();

@Nullable
abstract Long getBatchSizeBytes();
Change it back to long? :)
Merged with master
Write withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
  return toBuilder().setServiceFactory(serviceFactory).build();
/** Specifies the batch size limit. */
public Write withBatchSizeBytes(long batchSizeBytes) {
Making this public should not be part of the current PR either, right?
It was public before. I've moved methods around.
}
getSpannerConfig().populateDisplayData(builder);
builder.add(
    DisplayData.item("batchSizeBytes", getBatchSizeBytes()).withLabel("Batch Size in Bytes"));
Since it's not public and not controllable by the user, it should not be in display data.
It's public.
@@ -0,0 +1,125 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
I welcome this move, but it should not be part of the current PR.
Would you prefer that I send e9528e1 as a separate PR?
import org.mockito.Matchers;

/**
 * A serialization friendly type service factory that maintains a mock {@link Spanner} and
Why does it need to be serializable?
It's part of the Transform which gets serialized by TestPipeline.
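One common way to make a factory that hands out mocks survive serialization is to keep the mock in a static registry and serialize only an index into it. The sketch below uses plain Java serialization; `ClientSketch` and `FakeFactorySketch` are hypothetical names, and this is not necessarily how the factory in this PR is implemented.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a mocked client; not the Spanner API. */
interface ClientSketch {
  String query(String sql);
}

/** Serializable factory sketch: only the index is serialized, the mock stays in the JVM. */
final class FakeFactorySketch implements Serializable {
  // Mocks are usually not serializable, so they live in a static, per-JVM registry.
  private static final List<ClientSketch> REGISTRY = new ArrayList<>();

  private final int index;

  FakeFactorySketch(ClientSketch mock) {
    synchronized (REGISTRY) {
      index = REGISTRY.size();
      REGISTRY.add(mock);
    }
  }

  /** Returns the registered mock, also when called on a deserialized copy. */
  ClientSketch create() {
    synchronized (REGISTRY) {
      return REGISTRY.get(index);
    }
  }

  /** Serializes and deserializes the factory, roughly as a test runner might. */
  static FakeFactorySketch roundTrip(FakeFactorySketch factory) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(factory);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (FakeFactorySketch) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException("round trip failed", e);
    }
  }
}
```

After a round trip the copy is a different factory object, yet `create()` still returns the very same mock instance, so the test can verify interactions on it. The approach only works when the pipeline runs in the same JVM as the test.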
Thanks, I'll merge soon.
Defines Cloud Spanner Source API, and a naive implementation, similar to JdbcIO.
Also includes #3358