
[BEAM-9722] added SnowflakeIO with Read operation #11360

Merged: 16 commits into apache:master on May 21, 2020

Conversation

@DariuszAniszewski (Contributor) commented Apr 9, 2020

This PR adds SnowflakeIO with a Read operation as part of BEAM-9722.

Snowflake is an analytic data warehouse provided as Software-as-a-Service (SaaS). It uses a new SQL database engine with a unique architecture designed for the cloud. More details are available in Snowflake's documentation.

SnowflakeIO.Read uses Snowflake's JDBC driver to run a COPY INTO statement that stages data on GCS as CSV files, which are then read via FileIO.

SnowflakeIO supports three authentication methods against Snowflake:

  • username and password
  • key-pair
  • pre-obtained OAuth token

This PR is the first of a series; once it is merged, subsequent PRs will follow with the Write operation, integration tests, etc. We're also working on cross-language support for the Python SDK. A usage sketch follows.
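
A hedged usage sketch of reading with this IO. The builder names (DataSourceConfiguration, fromTable, withStagingBucketName, withStorageIntegrationName, withCsvMapper) follow the SnowflakeIO javadoc that this work led to and may differ in detail from this first PR; the credentials setup is likewise an assumption (this PR carries its own SnowflakeCredentials classes).

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.snowflake.SnowflakeIO;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline pipeline = Pipeline.create();

    // Connection settings; one of username/password, key-pair, or OAuth auth.
    SnowflakeIO.DataSourceConfiguration dc =
        SnowflakeIO.DataSourceConfiguration.create()
            .withUsernamePasswordAuth("USERNAME", "PASSWORD")
            .withServerName("account.snowflakecomputing.com")
            .withDatabase("DATABASE")
            .withSchema("PUBLIC")
            .withWarehouse("WAREHOUSE");

    // COPY INTO stages CSV files under the staging bucket; FileIO then reads
    // them, and the CsvMapper turns each parsed CSV line into a user type.
    PCollection<String> rows =
        pipeline.apply(
            SnowflakeIO.<String>read()
                .withDataSourceConfiguration(dc)
                .fromTable("MY_TABLE") // or .fromQuery("SELECT ...")
                .withStagingBucketName("gs://my-bucket/tmp") // hypothetical bucket
                .withStorageIntegrationName("my_integration") // hypothetical name
                .withCsvMapper(parts -> String.join(",", parts))
                .withCoder(StringUtf8Coder.of()));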




@DariuszAniszewski DariuszAniszewski changed the title added SnowflakeIO with Read operation [WIP] added SnowflakeIO with Read operation Apr 9, 2020
@DariuszAniszewski DariuszAniszewski marked this pull request as draft April 9, 2020 09:51
@DariuszAniszewski DariuszAniszewski changed the title [WIP] added SnowflakeIO with Read operation [WIP][BEAM-9722] added SnowflakeIO with Read operation Apr 9, 2020
@DariuszAniszewski DariuszAniszewski changed the title [WIP][BEAM-9722] added SnowflakeIO with Read operation [BEAM-9722] added SnowflakeIO with Read operation Apr 10, 2020
@DariuszAniszewski DariuszAniszewski marked this pull request as ready for review April 10, 2020 14:45
@DariuszAniszewski (Contributor, Author) commented:

R: @aromanenko-dev
On the dev list you mentioned you were interested in reviewing PRs related to SnowflakeIO; could you please take a look, or point to another reviewer? Thanks!

@aromanenko-dev (Contributor) commented:

@DariuszAniszewski Thank you for the contribution! I'm a bit busy right now, but I'll try to take a look at this as soon as possible.

@DariuszAniszewski (Contributor, Author) commented:

@aromanenko-dev I don't want to push you, but do you have an ETA for the review? I'm OK with waiting, but it would be nice to know roughly how long it may take ;)

I also have a general question: as mentioned in the description of this PR, it's the first of a series. We have more pieces of SnowflakeIO done, but we wanted to make the PRs as small and atomic as possible to ease the review process and merge them one by one. But maybe this approach misfired, and it would be better to put all the pieces of the IO (in atomic commits, of course) into this one PR?

WDYT?

@aromanenko-dev (Contributor) commented:

@DariuszAniszewski I'm sorry for the delay with the review (quite a busy time at work). I'll try to do a first round by the end of this week.

Regarding the second part of your message: AFAIK there is no common rule for PR size in Beam, but personally I'd prefer the approach you proposed and move forward in logical, atomic steps, since I believe that will be easier for everyone. Also, I'd suggest creating separate Jiras in that case.

@aromanenko-dev (Contributor) commented:

@iemejia Would you have some time to take a look as well?

@aromanenko-dev (Contributor) left a review comment:

Thank you for the contribution again! I think this will be a useful and in-demand IO for Beam users.

I did a first round of review (only the main business logic, not the tests yet); please take a look at my comments.
Also, please fix the commit message: it has to start with the Jira ID, as the PR title does.

Review thread on:
    .apply(Wait.on(output))
        .apply(
            ParDo.of(
                new CleanTmpFilesFromGcsFn(
Contributor:

Will the temp directory be cleaned if the pipeline failed earlier?

Contributor Author:

It seems that it isn't at the moment. @kkucharc will share more info here.

Contributor:

Yes, we checked and it isn't. @aromanenko-dev, do you think it should be provided? In the case of testing, the tests themselves should probably take care of cleanup.

Contributor:

I'm afraid that many failed pipelines could lead to a waste of disk space in this case. It would be better to avoid such behavior, if possible.

Contributor Author:

While I generally agree that failing pipelines will eventually leave lots of garbage on GCS (or other storage in the future), I'm not sure how to ensure those files are deleted regardless of pipeline status.

AFAIK there is nothing like @After or @Teardown for the PTransform we have in our IO. Using the Wait transform gives us the ability to remove files once the data is read, and it seems a reasonable choice (see the sketch after this comment).

I've been checking how e.g. BigQueryIO handles that case, as it also needs to clean up; it also has a cleanup transform that is called once all rows are read. I assume that in its case, cleanup also won't run if something in between fails.

How about filing a JIRA issue for this case and looking for a solution in parallel with delivering the other pieces of the Snowflake connector? @aromanenko-dev @kkucharc WDYT?
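
A minimal sketch of the Wait.on cleanup pattern discussed here, assuming the staged file paths travel through the pipeline as a PCollection of strings (the helper name and wiring are illustrative, not this PR's actual code):

    import java.util.Collections;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    /** Hypothetical helper: deletes tmp files only after mainOutput is computed. */
    static PCollection<Void> cleanupAfter(
        PCollection<String> tmpFilePaths, PCollection<?> mainOutput) {
      return tmpFilePaths
          // Wait.on holds each window of tmpFilePaths back until the matching
          // window of mainOutput is done, so files are removed only after all
          // rows have been read from them.
          .apply(Wait.on(mainOutput))
          .apply(ParDo.of(new DoFn<String, Void>() {
            @ProcessElement
            public void process(@Element String path) throws Exception {
              FileSystems.delete(Collections.singletonList(
                  FileSystems.matchSingleFileSpec(path).resourceId()));
            }
          }));
    }

Note that if a step upstream of mainOutput fails, the cleanup ParDo never runs, which is exactly the gap discussed in this thread.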

Contributor:

This is a known issue with BQ source as well. Failed pipelines can leave temporary files behind. I'm afraid there is no good solution today. I think we need to introduce some sort of a generalized cleanup step to address this.

Contributor:

In this case, I'd suggest adding this to the IO class's Javadoc to make users aware that such a situation is possible and that cleaning up the temp dirs will require a manual procedure.

Contributor Author:

I've added a note here; would that be enough?


Review thread on:
    /** Interface which defines common methods for cloud providers. */
    public interface SnowflakeCloudProvider {
      void removeFiles(String bucketName, String pathOnBucket);
Member:

Why not use Apache Beam's notion of a FileSystem?

Contributor:

Thanks for the suggestion, @lukecwik. I changed it to GCSFileSystem.

I also tried changing removeFiles in the fake implementation to use LocalFileSystem, but I am a little concerned: LocalFileSystem doesn't match nested directories and fails when deleting a non-empty directory. That could mean the testing directory won't be cleaned and the tests will become flaky.
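
For reference, a hedged sketch of deleting staged files through Beam's FileSystems abstraction; the glob pattern and helper name are assumptions, and, as noted above, matching semantics differ per FileSystem implementation:

    import java.io.IOException;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.io.fs.MoveOptions;
    import org.apache.beam.sdk.io.fs.ResourceId;

    // Hypothetical helper: matches staged CSVs under a temp prefix and deletes
    // them via whichever FileSystem the spec's scheme (gs://, file:/, ...) selects.
    static void removeTmpFiles(String tmpDirSpec) throws IOException {
      MatchResult match = FileSystems.match(tmpDirSpec + "/*.csv");
      List<ResourceId> staged =
          match.metadata().stream()
              .map(MatchResult.Metadata::resourceId)
              .collect(Collectors.toList());
      FileSystems.delete(staged,
          MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
    }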

Review thread on:
    public class GCSProvider implements SnowflakeCloudProvider, Serializable {

      @Override
      public void removeFiles(String bucketName, String pathOnBucket) {
Member:

Why not use the Apache Beam GCS filesystem?

@DariuszAniszewski (Contributor, Author) commented:

Thanks @aromanenko-dev and @lukecwik for the review.
I rebased onto (current) master and already applied the simplest changes; I'll be back in a few days with the rest of them.

@DariuszAniszewski (Contributor, Author) commented:

I'm dealing with some personal matters this week and will be unavailable.
@kkucharc and @purbanow will continue working on this PR.

@DariuszAniszewski (Contributor, Author) commented:

@aromanenko-dev @lukecwik thanks again for the review. We've applied the changes, and all your comments/questions have been addressed: either changed or answered.

Can you please re-review? :)

@chamikaramj (Contributor) left a review comment:

Thanks. Just one comment.

Review thread on:
    emptyCollection
        .apply(
            ParDo.of(
                new CopyIntoStageFn(
Contributor:

It's better, but I don't think it's a good user experience to fail the pipeline whenever a bundle is retried. Bundle retries are a normal part of runner execution, and source/sink transforms should be able to handle them.

What is usually done in this case is to write data to a new temporary location each time the copy step runs. You then add a Reshuffle right after the copy operation to checkpoint the results. This way you can guarantee that the set of files handed to the next step comes from a single execution, and the job does not have to fail simply because a bundle is retried.

When you clean up, you have to clean all temporary locations, including those from retries.

I'm fine if you don't want to handle this on the first try, but let's at least add a TODO and a JIRA to fix this later. A sketch of the pattern follows.
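
A hedged sketch of this checkpoint pattern; Reshuffle.viaRandomKey() is Beam's own, while the copy step and temp-prefix scheme here are hypothetical stand-ins for the PR's CopyIntoStageFn:

    import java.util.UUID;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    // Each (possibly retried) bundle copies into its own unique temp prefix.
    PCollection<String> filePatterns =
        emptyCollection // the single-element trigger collection from the snippet above
            .apply("Copy into unique stage dir", ParDo.of(new DoFn<Void, String>() {
              @ProcessElement
              public void process(ProcessContext c) {
                String tmpPrefix = "gs://my-bucket/tmp/run-" + UUID.randomUUID();
                // run COPY INTO <tmpPrefix> via JDBC here (details omitted)
                c.output(tmpPrefix + "/*");
              }
            }))
            // Reshuffle checkpoints the emitted patterns: downstream steps see
            // the output of exactly one successful copy attempt, so a retried
            // bundle cannot leak a second set of files into the read.
            .apply(Reshuffle.viaRandomKey());
    // Cleanup must later remove all run-* prefixes, including abandoned retries.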

@chamikaramj (Contributor) commented:

LGTM. Thanks.

@chamikaramj (Contributor) commented:

Retest this please

@chamikaramj (Contributor) commented:

Retest this please

@chamikaramj (Contributor) commented:

I can "squash and merge" after tests pass.
Feel free to squash/fixup commits if you need more than one commit.

@chamikaramj (Contributor) commented:

Retest this please

@DariuszAniszewski (Contributor, Author) commented:

Thanks!
I'm OK with squash-and-merge; it was intended to go in as a single commit.

@chamikaramj (Contributor) commented:

Retest this please


@chamikaramj (Contributor) commented:

Having trouble re-triggering tests. Can someone else try?

@chamikaramj (Contributor) commented:

Retest this please

@chamikaramj (Contributor) commented:

Run Python PreCommit


@chamikaramj (Contributor) commented:

Run Python2_PVR_Flink PreCommit


@kkucharc (Contributor) commented:

Run Python2_PVR_Flink PreCommit

@DariuszAniszewski (Contributor, Author) commented:

Just a small comment about the force-push above: it was done by mistake, then reverted. The HEAD of this branch is still at 3ba192a, and the comment is a leftover.

@kkucharc (Contributor) commented:

I retested the failing test; the previous run was probably timing out.

@chamikaramj merged commit 73fa135 into apache:master on May 21, 2020.
@chamikaramj (Contributor) commented:

It seems this was not included in the Beam 2.22.0 cut, so I'll remove it from the CHANGES.md entry for Beam 2.22.0.

yirutang pushed a commit to yirutang/beam that referenced this pull request Jul 23, 2020
* [BEAM-9722] added SnowflakeIO with Read operation

* [BEAM-9722] Added SnowflakeCloudProvider to enable use various clouds with Snowflake

* [BEAM-9722] added docstrings for public methods

* [BEAM-9722] Added changed cleanup staged GCS files to Beam FileSystems

* [BEAM-9722] Added javadocs for public methods in DataSourceConfiguration

* add testing p8 file to RAT exclude
refactor SnowflakeCredentials
add information about possibly left files on cloud storage
small docs changes

* documentation changes

* [BEAM-9722] Added TestRule and changed Unit tests to use pipeline.run

* [BEAM-9722] Renamed Snowflake Read unit test and applied spotless

* [BEAM-9722] remove SnowflakeCloudProvider interface

* [BEAM-9722] doc changes

* [BEAM-9722] add `withoutValidation` to disable verifying connection to Snowflake during pipeline construction

* [BEAM-9722] added MoveOption and removed leftover file

* [BEAM-9722] fixed tests. Add tests for `withQuery`

* [BEAM-9722] make `CopyIntoStageFn` retryable

* [BEAM-9722] added `Reshuffle` step after `CopyIntoStageFn`

Co-authored-by: Kasia Kucharczyk <katarzyna.kucharczyk@polidea.com>
Co-authored-by: pawel.urbanowicz <pawel.urbanowicz@polidea.com>