Conversation

@EdgarLGB (Member) commented Mar 27, 2019

Hi all,

Here is the PR for an initial version of CouchbaseIO. Currently, the read part is complete. The general idea is to read the data with a set of sources, each of which reads a portion of the data according to a key range. Since Couchbase does not provide a way to split up a bucket, I first fetch the total number of keys and then divide the key range by the number of threads. Each source is assigned a part of the key range so that the whole set of sources can read in parallel.
I'm looking forward to the review. Thanks! (R: @aromanenko-dev @iemejia )

Regards,
Guobao


@iemejia (Member) left a comment

Excellent work @EdgarLGB! I left some comments that should be easy to fix.
I am going to update the ticket so we can get the Read part merged eagerly, and we will do the Write part in a subsequent issue if you agree.
Please tell me when the fixes are done.

LOG.debug(query);
N1qlQueryResult result = getBucket().query(N1qlQuery.simple(query));
if (!result.finalSuccess()) {
throw new CouchbaseIOException(result.errors().get(0).getString("msg"));
Member:

If you cannot estimate the size you can return 0 (following the specification of the parent), or throw the exception, but use just a plain IOException.
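For illustration, a minimal sketch of that fallback, keeping the PR's existing query; extractCount is a hypothetical helper for parsing the single RAW row:

```java
@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
  String query = String.format("SELECT RAW COUNT(META().id) FROM `%s`", getBucket().name());
  N1qlQueryResult result = getBucket().query(N1qlQuery.simple(query));
  if (!result.finalSuccess()) {
    // No reliable estimate available: returning 0 is permitted by the
    // BoundedSource contract and avoids a custom exception type.
    return 0;
  }
  return extractCount(result); // hypothetical helper parsing the single RAW row
}
```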

@Override
public boolean advance() {
if (keyIterator.hasNext()) {
currentKey = new String(keyIterator.next().byteValue(), Charset.defaultCharset());
Member:

Prefer UTF-8 to defaultCharset for reliable tests.
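For example, the suggested change (StandardCharsets is from java.nio.charset):

```java
// Use a fixed charset so results do not depend on the JVM's platform default.
currentKey = new String(keyIterator.next().byteValue(), StandardCharsets.UTF_8);
```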

import java.io.IOException;

/** Exception of {@link CouchbaseIO}. */
class CouchbaseIOException extends IOException {
Member:

Remove this class; a plain IOException suffices, as noted above.

* @param port the http port
* @return a {@link PTransform} reading data from Couchbase
*/
public Read withHttpPort(int port) {
Member:

Please add ValueProvider versions of the with methods
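For illustration, a possible shape for the ValueProvider variant, following the pattern other Beam IOs use. The builder()/setHttpPort() calls assume an AutoValue-style builder on Read and should be treated as placeholders:

```java
// Hypothetical ValueProvider overload; StaticValueProvider is from
// org.apache.beam.sdk.options.ValueProvider.
public Read withHttpPort(ValueProvider<Integer> port) {
  checkArgument(port != null, "port cannot be null");
  return builder().setHttpPort(port).build();
}

public Read withHttpPort(int port) {
  // The plain overload can simply delegate to the ValueProvider version.
  return withHttpPort(ValueProvider.StaticValueProvider.of(port));
}
```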

private Bucket getBucket() {
if (cluster == null) {
DefaultCouchbaseEnvironment.Builder builder = DefaultCouchbaseEnvironment.builder();
if (spec.httpPort() != null) {
Member:

Isn't httpPort mandatory? If so, remove the @Nullable annotation and you won't need this check.

static class CouchbaseSource extends BoundedSource<JsonDocument> {

private final Read spec;
private int itemCount;
Member:

@Nullable private Long estimatedSizeBytes;

*/
@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
String query = String.format("SELECT RAW COUNT(META().id) FROM `%s`", getBucket().name());
Member:

Guard this with if (estimatedSizeBytes == null): estimating the size of the collection could be a heavy operation, so caching the result will save you extra calls.
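A sketch of that memoization, combined with the @Nullable field suggested above; runCountQuery is a hypothetical helper wrapping the N1QL COUNT query:

```java
@Nullable private Long estimatedSizeBytes;

@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
  if (estimatedSizeBytes == null) {
    // Run the potentially heavy COUNT query only once and cache the result.
    String query = String.format("SELECT RAW COUNT(META().id) FROM `%s`", getBucket().name());
    estimatedSizeBytes = runCountQuery(query); // hypothetical helper
  }
  return estimatedSizeBytes;
}
```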

Contributor:

I don't think we need to use the BoundedSource API, since there is no benefit to it here. This could be implemented with just a ParDo. I'll elaborate on my point in the top-level comment of this review.

long desiredBundleSize, PipelineOptions options) {
// If the desiredBundleSize equals to 0, it means that there will be only one bundle of data
// to be read.
int totalBundle =
Member:

Since itemCount/estimatedSizeBytes could be null, you may need to call long estimatedSizeBytes = getEstimatedSizeBytes(options); first.


@Override
public void close() {
// Delegate the disconnection of Couchbase to the method "expand" of CouchbaseSource,
Member:

It would be better to open the connection and manage its lifecycle at the reader level, because the Source does not have a lifecycle.
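A rough sketch of what reader-level lifecycle management could look like. CouchbaseReader is hypothetical here, hosts() refers to the PR's Read spec, and the unrelated reader methods are stubbed for brevity:

```java
static class CouchbaseReader extends BoundedSource.BoundedReader<JsonDocument> {
  private final CouchbaseSource source;
  private Cluster cluster;

  CouchbaseReader(CouchbaseSource source) {
    this.source = source;
  }

  @Override
  public boolean start() throws IOException {
    // Open the connection when the runner starts this reader,
    // not when the Source object is constructed.
    cluster = CouchbaseCluster.create(source.spec.hosts());
    return advance();
  }

  @Override
  public boolean advance() { return false; } // elided

  @Override
  public JsonDocument getCurrent() { return null; } // elided

  @Override
  public void close() {
    // Release the connection when the runner is done with this reader.
    if (cluster != null) {
      cluster.disconnect();
    }
  }

  @Override
  public BoundedSource<JsonDocument> getCurrentSource() {
    return source;
  }
}
```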

}

@Override
public BoundedSource<JsonDocument> getCurrentSource() {
Member:

Is JsonDocument the only supported type? Maybe Document, or even arbitrary objects?

@aromanenko-dev (Contributor) left a comment

Thank you @EdgarLGB for the contribution, great job!

Since @iemejia has already performed a code-level review, I'd add my thoughts on the current implementation approach. I suggest not using BoundedSource in this case, because its benefits cannot be applied here: we can't estimate the size of the collection in bytes (only the number of records, as I understand it), so using the API in this manner is misleading. Instead, we could create an implementation based on a simple ParDo (like we have in RedisIO and JdbcIO, which handle similar cases).

The main principle would be the following (a sketch follows below):

  • Fetch the total number of keys from the bucket;
  • Create a collection of offsets based on the bundle size (the default value should be reasonable, but we can also let the user configure it);
  • Create a new ParDo which reads a bundle of records based on the provided offset;
  • Break fusion, as in the RedisIO and JdbcIO implementations.

Is the Write part going to be added later, as a separate Jira/PR?
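For illustration, here is a minimal sketch of that pattern, loosely modeled on JdbcIO/RedisIO. Everything named here is a placeholder: fetchTotalKeyCount() stands for the N1QL COUNT query, ReadRangeFn for a DoFn that reads one offset range, and the bundle size default is arbitrary.

```java
PCollection<JsonDocument> docs =
    pipeline
        .apply("Seed", Create.of("seed"))
        .apply("SplitIntoRanges", ParDo.of(new DoFn<String, KV<Long, Long>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            long total = fetchTotalKeyCount(); // hypothetical: SELECT RAW COUNT(META().id) ...
            long bundleSize = 1000L;           // arbitrary default; user-configurable
            for (long start = 0; start < total; start += bundleSize) {
              c.output(KV.of(start, Math.min(start + bundleSize, total)));
            }
          }
        }))
        // Reshuffle breaks fusion so the ranges are redistributed across workers
        // instead of all being read on the single worker that produced them.
        .apply("BreakFusion", Reshuffle.viaRandomKey())
        // Hypothetical DoFn that reads the keys in [start, end), e.g. via OFFSET/LIMIT.
        .apply("ReadRange", ParDo.of(new ReadRangeFn()));
```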


@Override
public PCollection<JsonDocument> expand(PBegin input) {
checkArgument((hosts() != null), "WithHosts()is required");
Contributor:

nit: "WithHosts()is" - please add a space before "is".


@EdgarLGB (Member, Author)

Thank you @iemejia and @aromanenko-dev for the code review. I really appreciate it! I share @aromanenko-dev's view that it needs some rework. What do you think, @iemejia? Meanwhile, I need to handle the points that @iemejia has mentioned.

@iemejia (Member) commented Mar 28, 2019

I would be surprised if there is not a way to estimate the size of a collection in Couchbase. For example, couch_total_disk_size, or something like that?

I am definitely pro changing it to a ParDo-based approach in any case. I just thought the exercise of writing the IO on the BoundedSource API was worthwhile, and had thought about doing this next, but why not. Remember, however, that partitioning should be properly done in this case too (you can take a look at SolrIO for an example).

For the ParDo-based implementation it is probably a good idea to implement readAll first as a basis for read; for an example, take a look at HBaseIO.readAll. The idea is to pass a PCollection of queries to make it composable with only ParDos, and with this produce the transform. Notice that HBaseIO.readAll uses annotations of the new IO API (SplittableDoFn). PLEASE DON'T USE THOSE, because they are not supported by all runners! However, do produce the splits with a RestrictionTracker (ByteKey, OffsetRange, or whatever Couchbase uses to represent ranges); this will help us evolve towards SDF in the future if the need arises. If you have doubts, don't hesitate to ask me @EdgarLGB.
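For illustration, read-on-top-of-readAll could look roughly like this, following the HBaseIO pattern. CouchbaseIO.readAll(), query(), and the configuration setters are hypothetical placeholders:

```java
// Read.expand() delegates to a ParDo-based ReadAll, as HBaseIO does.
@Override
public PCollection<JsonDocument> expand(PBegin input) {
  return input
      .apply("CreateQuery", Create.of(query()))   // a single seed query element
      .apply("ReadAll", CouchbaseIO.readAll()     // composable: queries in, documents out,
          .withHosts(hosts())                     // splits produced via a RestrictionTracker
          .withBucket(bucket()));
}
```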

@aromanenko-dev (Contributor)

@iemejia I'm not sure that couch_total_disk_size works in this case, but it's possible to get bucket statistics, which provide dataUsed per bucket. So we could potentially use that, but there is another problem: there is no way (or did I miss something?) to obtain mappings between bucket shards and offset/key ranges in order to create proper splits. If there were such a mapping, then I believe we could use the BoundedSource API.


PAssert.thatSingleton(output.apply("Count", Count.globally()))
.isEqualTo(Long.valueOf(SAMPLE_SIZE));

Contributor:

Please add the following assertion to check that all records were properly read:
PAssert.that(output).containsInAnyOrder(records);

@iemejia (Member) commented Mar 28, 2019

@aromanenko-dev If there is no way to obtain mappings between bucket shards and offset/key ranges, how do they partition, then? Without this we may even have problems applying a ReadAll-like pattern with Restrictions (ranges).

@EdgarLGB (Member, Author)

@iemejia Well, it seems that these mappings are not exposed to the user, according to the documentation.

@aromanenko-dev (Contributor) commented Mar 29, 2019

@iemejia These mappings are low-level, and this API is not supposed to be exposed to users, as @EdgarLGB said. So I think we need to follow an approach based on splitting by number of records and offset, and implement it using ParDo.

@EdgarLGB (Member, Author)

I have finished the rework using ParDo. I'm looking forward to a review. Thanks! @iemejia @aromanenko-dev

@aromanenko-dev (Contributor)

Run Java PreCommit

@iemejia (Member) commented Jun 3, 2019

For info, I did a review round of this one and will probably take @EdgarLGB's commit and add some fixes on top, because I think we can still improve it.

@EdgarLGB (Member, Author) commented Jun 4, 2019

Run Java PreCommit

@EdgarLGB (Member, Author) commented Jun 4, 2019

Thanks @iemejia! Additionally, I have made some minor changes while resolving the conflicts with the master branch.

stale bot commented Aug 3, 2019

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.

The stale bot added the stale label on Aug 3, 2019.

stale bot commented Aug 10, 2019

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

The stale bot closed this on Aug 10, 2019.