Create salesforce to gcs transfer #10760

Merged
merged 10 commits into from
Oct 30, 2020

Conversation

chipmyersjr
Contributor

Add Salesforce to GCS transfer operator. Closes issue #8896



@boring-cyborg boring-cyborg bot added area:dev-tools area:docs provider:google Google (including GCP) related issues labels Sep 5, 2020
"""

def __init__(
self,
Contributor

Suggested change
self,
self, *,

nitpick here. Operator arguments are now keyword-only

Contributor Author

Thank you. Added this arg.
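
For readers unfamiliar with the suggestion above: a bare `*` after `self` makes every following parameter keyword-only. The sketch below is a minimal illustration; `ExampleTransferOperator` and its parameter values are hypothetical and are not the class added in this PR.

```python
class ExampleTransferOperator:
    """Hypothetical stand-in for an operator; not the class from this PR."""

    def __init__(self, *, query: str, bucket_name: str, object_name: str) -> None:
        # Everything after the bare `*` must be passed by keyword.
        self.query = query
        self.bucket_name = bucket_name
        self.object_name = object_name


# Keyword arguments work as usual.
op = ExampleTransferOperator(
    query="SELECT Id FROM Lead",
    bucket_name="my-bucket",
    object_name="leads.csv",
)

# Positional arguments are rejected with a TypeError.
try:
    ExampleTransferOperator("SELECT Id FROM Lead", "my-bucket", "leads.csv")
    positional_allowed = True
except TypeError:
    positional_allowed = False
```

Keyword-only signatures keep call sites readable and let parameters be reordered or added later without breaking existing DAGs.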

Comment on lines 36 to 37
:param query: The query to make to Salesforce.
:type query: str
Contributor

Suggested change
:param query: The query to make to Salesforce.
:type query: str
:param query: The query to make to Salesforce.
:type query: str

I think this would give an error if it is not on the same line as the param?

Contributor Author

Thanks, fixed formatting for all parameters in this doc string.

from airflow.providers.google.cloud.transfers.salesforce_to_gcs import SalesforceToGcsOperator
from airflow.utils.dates import days_ago

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/root/keyfile.json"
Contributor

@ephraimbuddy ephraimbuddy Sep 5, 2020

Nice work! 👍
A system test would make this easy with @pytest.mark.credential_file, and I think a system test is required for Google provider packages. Take a look:

@pytest.mark.credential_file(GCP_BIGQUERY_KEY)

Contributor Author

Thanks for the example! I am looking into implementing a system test for this operator.

Contributor

@eladkal eladkal left a comment

General comments:

  1. The Salesforce hook already supports multiple formats (json, csv, ndjson). Why limit this operator to csv only? It also has a write_object_to_file method, which you seem to have reimplemented in a different way here.

  2. I wonder how close we can get to the same behavior and parameters as BaseSQLToGCSOperator? (It has relevant params like approx_max_file_size_bytes.)

:type object_name: str
:param salesforce_conn_id: the name of the connection that has the parameters
we need to connect to Salesforce. The connection should be type `http` and
include a user's security token in the `Extras` field.
Contributor

No need to specify this. It's part of the Salesforce connection docs (#10482).
This can change when Airflow supports other connection options for Salesforce.

@chipmyersjr
Contributor Author

@eladkal Thanks for the feedback. I think using the write_object_to_file method is a good callout. It will make this simpler and more flexible, so I'm going to try to implement it that way.

In terms of adding other parameters, do you think it would make sense to add file-creation parameters like approx_max_file_size_bytes and field_delimiter to write_object_to_file itself? Alternatively, the execute method could chunk the results and call write_object_to_file multiple times.

@eladkal
Contributor

eladkal commented Sep 7, 2020

In terms of adding other parameters, do you think it would make sense to add file-creation parameters like approx_max_file_size_bytes and field_delimiter to write_object_to_file itself? Alternatively, the execute method could chunk the results and call write_object_to_file multiple times.

Ignore my previous comment.
I would say that providing the same GCP parameters as FacebookAdsReportToGcsOperator is OK, though I'm not sure why it doesn't have delegate_to. It could be that at some point there will be a BaseFileToGcsOperator, which would remove the need to add delegate_to and impersonation_chain to every operator.
I think @mik-laj or @turbaszek are better placed to answer which parameters are mandatory for GCP.

I'll review the salesforce part later this week.
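
To make the chunking alternative discussed above concrete, here is a minimal sketch of splitting query results into several CSV documents once an approximate size threshold is reached. It uses only the standard library; the function name `chunk_rows_to_csv` and the sample records are assumptions for illustration, and this is not how the merged operator or `BaseSQLToGCSOperator` is actually implemented.

```python
import csv
import io
from typing import Dict, Iterable, Iterator, List


def chunk_rows_to_csv(
    rows: Iterable[Dict[str, str]],
    fieldnames: List[str],
    approx_max_file_size_bytes: int,
    field_delimiter: str = ",",
) -> Iterator[str]:
    """Yield CSV documents, starting a new one once the size threshold is reached."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=fieldnames, delimiter=field_delimiter)
    writer.writeheader()
    rows_in_buffer = 0

    for row in rows:
        writer.writerow(row)
        rows_in_buffer += 1
        # The limit is approximate: it is checked only after a row is written,
        # so a chunk can exceed the threshold by up to one row.
        if len(buffer.getvalue().encode("utf-8")) >= approx_max_file_size_bytes:
            yield buffer.getvalue()
            buffer = io.StringIO()
            writer = csv.DictWriter(buffer, fieldnames=fieldnames, delimiter=field_delimiter)
            writer.writeheader()
            rows_in_buffer = 0

    # Emit the final partial chunk, if it holds any data rows.
    if rows_in_buffer:
        yield buffer.getvalue()


# Hypothetical records standing in for Salesforce query results.
records = [{"Id": str(i), "Name": f"lead-{i}"} for i in range(10)]
chunks = list(chunk_rows_to_csv(records, ["Id", "Name"], approx_max_file_size_bytes=40))
```

Each yielded chunk repeats the header row, so every resulting file can be loaded on its own. An operator could upload each chunk under a numbered object name.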



@contextmanager
def provide_facebook_connection(key_file_path: str):
Member

Would you mind moving this function to tests.test_utils.facebook_system_helpers?

Contributor Author

Sure. Moved to test_utils. This should also have been named salesforce, so I renamed it as well.

@pytest.mark.backend("mysql", "postgres")
@pytest.mark.credential_file(GCP_BIGQUERY_KEY)
@pytest.mark.credential_file(SALESFORCE_KEY)
@pytest.mark.system("google.cloud")
Member

Should we also mark it as salesforce?

Contributor Author

Sure. Marked as a salesforce system test.

@turbaszek
Member

@chipmyersjr can you please take a look at the CI issues? Pylint seems to be sad :<

@chipmyersjr
Contributor Author

@turbaszek It seems pylint was complaining about some unrelated files. I just rebased and the checks seem to be in good shape now.

Contributor

@eladkal eladkal left a comment

Salesforce logic looks good.
Can you please add support for template_fields and template_ext (reading the query from a file)?

@chipmyersjr
Contributor Author

@eladkal thanks for checking. I added templating support for parameters where I felt it was appropriate.
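
As background on the templating request: in Airflow, `template_fields` names the operator attributes that are rendered before `execute` runs, and `template_ext` lists file extensions whose contents are loaded and rendered when a templated value ends with one of them. The toy sketch below only mimics that shape with a regex substitution; Airflow itself uses Jinja, and the class, the chosen fields, and `render_template_fields` here are hypothetical rather than what this PR added.

```python
import re
from typing import Dict, Sequence


class ExampleOperator:
    """Hypothetical operator; only mimics the shape of Airflow's templating hooks."""

    # Attribute names whose values should be rendered before execution.
    template_fields: Sequence[str] = ("query", "object_name")
    # File extensions that would trigger loading the value from a file.
    template_ext: Sequence[str] = (".sql",)

    def __init__(self, *, query: str, object_name: str) -> None:
        self.query = query
        self.object_name = object_name


def render_template_fields(op: ExampleOperator, context: Dict[str, str]) -> None:
    """Toy renderer: replaces {{ name }} placeholders; Airflow itself uses Jinja."""
    pattern = re.compile(r"\{\{\s*(\w+)\s*\}\}")
    for field in op.template_fields:
        value = getattr(op, field)
        # A real implementation would first load the file if `value` ends with
        # one of `template_ext`; that step is omitted in this sketch.
        rendered = pattern.sub(lambda match: context[match.group(1)], value)
        setattr(op, field, rendered)


op = ExampleOperator(
    query="SELECT Id FROM Lead WHERE CreatedDate = {{ ds }}",
    object_name="leads/{{ ds }}.csv",
)
render_template_fields(op, {"ds": "2020-10-30"})
```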

filename=path,
gzip=self.gzip,
)
self.log.info("%s uploaded to GCS", path)
Member

It would be nice to return the path to the file so it can be easily referenced in downstream operators.

Contributor Author

Hi @turbaszek, the 'path' variable here represents the temporary file that gets created just to be uploaded to GCS, and it wouldn't be available outside of this context. I'm wondering if it would be more appropriate to pass this or, alternatively, the path to the actual GCS object? I'm also wondering which would be better to log.

Member

Sorry for not being precise. By returning the path, I meant returning the GCS URI.

Contributor Author

OK. I implemented a URI return value for the execute method.
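
The point of this exchange can be sketched as follows: the temporary file disappears once the upload finishes, so the durable value to hand to downstream tasks is the `gs://` location. `FakeGcsHook` and `transfer_to_gcs` are hypothetical stand-ins written for this illustration; they are not the hook or operator code from this PR.

```python
import os
import tempfile
from typing import Dict, Tuple


class FakeGcsHook:
    """Hypothetical in-memory stand-in for a GCS hook, used only for this sketch."""

    def __init__(self) -> None:
        self.uploaded: Dict[Tuple[str, str], bytes] = {}

    def upload(self, bucket_name: str, object_name: str, filename: str) -> None:
        # Read the local file and remember its contents under (bucket, object).
        with open(filename, "rb") as handle:
            self.uploaded[(bucket_name, object_name)] = handle.read()


def transfer_to_gcs(hook: FakeGcsHook, bucket_name: str, object_name: str, payload: bytes) -> str:
    """Write payload to a temporary file, upload it, and return the GCS URI."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "export.csv")
        with open(path, "wb") as handle:
            handle.write(payload)
        hook.upload(bucket_name=bucket_name, object_name=object_name, filename=path)
    # The temporary path no longer exists here, so return the durable location.
    return f"gs://{bucket_name}/{object_name}"


hook = FakeGcsHook()
uri = transfer_to_gcs(hook, "my-bucket", "leads.csv", b"Id\n1\n")
```

In Airflow, a value returned from `execute` is pushed to XCom by default, which is what makes the URI easy to reference downstream.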

@eladkal
Contributor

eladkal commented Oct 27, 2020

@turbaszek is something missing to get it merged?

@ryw ryw removed the area:docs label Oct 27, 2020
@turbaszek
Member

@chipmyersjr can you please rebase and solve the conflict?

@chipmyersjr
Contributor Author

chipmyersjr commented Oct 29, 2020

@turbaszek Should be good to go now.

@turbaszek turbaszek merged commit 2f703df into apache:master Oct 30, 2020
szn pushed a commit to szn/airflow that referenced this pull request Nov 1, 2020
Adds SalesforceToGcsOperator that allows users to transfer data from
Salesforce to GCS bucket.

Co-authored-by: Tomek Urbaszek <turbaszek@gmail.com>
@eladkal eladkal mentioned this pull request Nov 4, 2020
Labels
area:providers provider:google Google (including GCP) related issues

5 participants