[BEAM-3342] Create a Cloud Bigtable IO connector for Python #8457
nit: PEP 8 recommends using parentheses for line continuation instead of backslashes.
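For reference, the two continuation styles look like this (the expression is just an illustration):

```python
# Backslash continuation (works, but PEP 8 discourages it):
total = 1 + 2 + \
        3 + 4

# Implicit continuation inside parentheses (preferred by PEP 8):
total = (1 + 2 +
         3 + 4)
```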
You don't need to define this method.
Totally, "inherited" from attempts to implement a splittable DoFn. Removed.
This method doesn't need to exist. I would think that you could use bundles.
This is the way it's implemented in Apache Beam's iobase.Read(PTransform). The FlatMap needs a callable object to process the elements in parallel, and split_source makes up that callable. I'd also suggest we use a similar naming convention for better unification/readability. What do you think?
Got it. Would it make sense to call this _split_source?
I noticed it too. The iobase.Read version doesn't have the underscore. It's a question of whether we prefer the "proper" underscored way or the "unified" non-underscored one.
I had a chat with Cham, and he rightly suggested it would be good to have unit testing of this splitting function, as it could cause data to be dropped if it has any bugs. So maybe make it static, and add tests?
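A unit test along these lines could pin down the splitting behavior (the helper, its name, and its signature are hypothetical, just to show the shape of the check):

```python
def split_into_ranges(sample_row_keys):
    # Hypothetical splitting helper under test; names are illustrative.
    # Prepends/appends b"" so the open-ended head and tail are covered.
    keys = [b""] + list(sample_row_keys) + [b""]
    return [(keys[i], keys[i + 1]) for i in range(len(keys) - 1)]

def test_no_rows_dropped():
    ranges = split_into_ranges([b"k1", b"k2"])
    # The ranges must cover the open-ended head and tail of the table...
    assert ranges[0][0] == b"" and ranges[-1][1] == b""
    # ...and be contiguous, so no row can fall between two ranges.
    for prev, cur in zip(ranges, ranges[1:]):
        assert prev[1] == cur[0]

test_no_rows_dropped()
```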
+1 on a unit test.
In the case that sample_row_keys has length 1, split_source will return an empty list and no data will be processed from the table. In addition, the documentation for table (https://googleapis.dev/python/bigtable/latest/table.html) says "The table might have contents before the first row key in the list and after the last one", which I don't think is taken into consideration here. What you may want to do is only do the Reshuffle + FlatMap if len(sample_row_keys) > 1. That worked for me.
["", "") means read the entire table; "" is used as a placeholder.
split_source must return shuffle([["", <key1>), [<key1>, <key2>), ..., [<keyN>, "")])
If sample_row_keys() on line 240 returns a list of length one, then line 250 (in split_source) will be skipped (as it is trying to iterate over range(1, 1)), and shuffle will be called as shuffle([]) (which my Python shell returns as None). Maybe line 242 should read as if len(sample_row_keys) >= 1 and sample_row_keys[0].row_key != b''.
Also, I don't see a place where [, ""] is appended to sample_row_keys.
This might have been implemented incorrectly. 'sample_row_keys' is computed in the outer scope but 'split_source' is used as input to a FlatMap transform (hence split_source will become a function wrapping a DoFn). Instead of using FlatMap here, implement a DoFn class and provide whatever state it needs to perform splitting to the constructor of the DoFn. Also note that this class (with its state) has to be picklable.
One difference between the Direct and Dataflow runners is that the Dataflow runner pickles DoFn objects to send to remote workers. That might explain why your code doesn't work on Dataflow.
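The suggested pattern might look roughly like this. The class and attribute names are hypothetical; in real code the class would subclass apache_beam.DoFn, which is omitted here so the sketch stays self-contained:

```python
import pickle

class SplitRangesDoFn:
    """Sketch of the suggested pattern: keep the state needed for
    splitting in the constructor, so the Dataflow runner can pickle
    the whole object and ship it to remote workers."""

    def __init__(self, table_id):
        self.table_id = table_id  # picklable state only

    def process(self, key_range):
        start, end = key_range
        # Real code would read rows in [start, end) from self.table_id;
        # here we just echo the work item to keep the sketch runnable.
        yield (self.table_id, start, end)

# The whole object (with its state) survives a pickle round trip,
# which is what the Dataflow runner requires of a DoFn:
fn = pickle.loads(pickle.dumps(SplitRangesDoFn("my-table")))
```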
@chamikaramj It might, but then how could it work a few months back? The code below works now, with Dataflow, albeit from a standalone file, meaning that pickling is probably not an issue:
When packaged, it becomes more like this:
The latter breaks with Dataflow while still running under Direct. As you can see, the logic is nearly identical, suggesting that some magic might happen during [un]packaging.
What do you mean by "when packaged"?
Note that if you are trying to utilize other modules that are not included with the SDK, you have to build a local tarball before testing:
python setup.py sdist
When running, include the SDK tarball with your module using the --sdk_location option (since the default tarball downloaded by Dataflow will not have your module). For example, for wordcount:
python -m apache_beam.examples.wordcount --input <input file path in GCS> --output <output path in GCS> --runner DataflowRunner --temp_location <temp location in GCS> --project <project> --sdk_location <local path to beam>/beam/sdks/python/dist/apache-beam-2.17.0.dev0.tar.gz
"When packaged" means when packed into a local tarball, precisely via python setup.py sdist.
So far we've been using the extra_package option, and it seems to work fine for the write part, and at some point it worked for the read part too. The main functional difference between the write and read tests is that for the write part the rows are generated locally [within the test script] and then sent to workers without intermediate steps, while during the read sequence we first try splitting the database based on the sample row keys within the Dataflow engine, and then read the chunks of rows, as you may see from the sample script above.
But I don't disagree, we could try specifying the --sdk_location option and see what happens.