
Add GCSToTrinoOperator #21704

Merged
merged 1 commit into from
Feb 27, 2022

Conversation

rsg17
Contributor

@rsg17 rsg17 commented Feb 21, 2022

Follow-up PR as discussed in #21084; the logic is similar to that PR.
This operator loads a CSV file from Google Cloud Storage into a Trino table.
Assumptions:

  1. The first row of the CSV contains the headers
  2. A Trino table with the requisite columns has already been created
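Under those two assumptions, the core of such an operator can be sketched as follows (a sketch only: the helper name `load_csv_into_trino` and the `insert_rows` callback are illustrative, modeled on Airflow's `DbApiHook.insert_rows` convention, not the merged code):

```python
import csv

def load_csv_into_trino(path, table, insert_rows):
    """Stream a CSV file into an existing table via an insert callback."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        # Assumption 1: the first row of the CSV contains the headers.
        fields = tuple(next(reader))
        # Stream the remaining rows lazily to keep memory usage flat.
        rows = (tuple(r) for r in reader)
        # Assumption 2: `table` already exists with the requisite columns.
        insert_rows(table=table, rows=rows, target_fields=fields)
```

With a DbApiHook-style hook, the callback would be something like `trino_hook.insert_rows`.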


@rsg17
Contributor Author

rsg17 commented Feb 21, 2022

r? @eladkal
Here is the PR for GCSToTrinoOperator that we discussed on the GCSToPrestoOperator PR.

Comment on lines 96 to 100
    data = list(csv.reader(temp_file))
    fields = tuple(data[0])
    rows = []
    for row in data[1:]:
        rows.append(tuple(row))
Member

Suggested change

    data = csv.reader(temp_file)
    fields = tuple(next(data))
    rows = (tuple(row) for row in data)

This saves significant memory usage when the downloaded file is large.

Contributor Author

@uranusjr
fields = tuple(next(data)) fails with TypeError: 'tuple' object is not an iterator when I run the unit test.
The line flagged in the test file is op.execute(None).

I am not really sure why this fails. The error indicates that a tuple is being passed in place of a csv.reader, but given the line in the test file where the failure occurs, I do not see how a tuple could end up there.

Member

TypeError: 'tuple' object is not an iterator

This doesn’t sound right. The next call is on a csv.reader(), which is not a tuple. Did you mistype this as next(tuple(data))?
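One plausible explanation (an assumption; it would need to be confirmed against the actual test module): if the unit test patches csv.reader with a mock whose return value is a plain tuple, then next() fails exactly like this, because a tuple is iterable but is not itself an iterator. A minimal reproduction:

```python
import csv
import io
from unittest import mock

# A real csv.reader is an iterator, so next() works on it:
reader = csv.reader(io.StringIO("a,b\n1,2\n"))
assert tuple(next(reader)) == ("a", "b")

# A plain tuple is iterable but NOT an iterator:
try:
    next((("a", "b"), ("1", "2")))
except TypeError as exc:
    assert "not an iterator" in str(exc)

# A hypothetical test that patched csv.reader to return a tuple would
# reproduce the observed traceback:
with mock.patch("csv.reader", return_value=(("a", "b"), ("1", "2"))):
    data = csv.reader(io.StringIO(""))  # actually the mocked tuple
    try:
        next(data)  # TypeError: 'tuple' object is not an iterator
    except TypeError:
        pass
    # wrapping in iter() restores iterator semantics:
    assert tuple(next(iter(data))) == ("a", "b")
```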

Contributor Author

No - I did not mistype that. I was surprised it failed too!

I have pushed the change (I expect CI would fail).

@eladkal
Contributor

eladkal commented Feb 21, 2022

Tests are failing:


  self = <Task(GCSToTrinoOperator): test_gcs_to_trino>, context = None
  
      def execute(self, context: 'Context') -> None:
          gcs_hook = GCSHook(
              gcp_conn_id=self.gcp_conn_id,
              delegate_to=self.delegate_to,
              impersonation_chain=self.impersonation_chain,
          )
      
          trino_hook = TrinoHook(trino_conn_id=self.trino_conn_id)
      
          with NamedTemporaryFile("w+") as temp_file:
              self.log.info("Downloading data from %s", self.source_object)
              gcs_hook.download(
                  bucket_name=self.source_bucket,
                  object_name=self.source_object,
                  filename=temp_file.name,
              )
      
              data = csv.reader(temp_file)
  >           fields = tuple(next(data))
  E           TypeError: 'tuple' object is not an iterator
  
  airflow/providers/trino/transfers/gcs_to_trino.py:97: TypeError

@rsg17
Contributor Author

rsg17 commented Feb 21, 2022

> Tests are failing: (traceback quoted above)

Yes. I made the change based on @uranusjr's suggestion. This logic is more efficient than reading the whole CSV into memory at once, but it causes the unit tests to fail.

I pushed the change upstream so that I could get suggestions on why this happens.

@rsg17
Contributor Author

rsg17 commented Feb 22, 2022

Here is an updated version that does not take target_fields from the first row of the CSV. I think all tests should pass now.

I am still not sure why fields = tuple(next(data)) failed with TypeError: 'tuple' object is not an iterator. I ended up excluding it from the code.

cc - @eladkal, @uranusjr

@uranusjr
Member

But fields is gone entirely in your new commit. Is it intended?

@rsg17
Contributor Author

rsg17 commented Feb 22, 2022

But fields is gone entirely in your new commit. Is it intended?

Yes, it is intended, because the fields logic does not work with this approach.

I can add a different way to specify the schema (maybe a separate JSON file), but not fields taken from the CSV itself.

    from airflow.utils.context import Context


    class GCSToTrinoOperator(BaseOperator):
Contributor

Are there args for this operator that could be dynamically generated and should therefore be template_fields? I'm thinking source_bucket, source_object, and trino_table(?) might be good candidates here.

And, on a related note, probably worth also adding the same template_fields to GCSToPrestoOperator in a separate PR too.

Contributor Author

Can you give an example of template fields / dynamically generated fields?

I can make the change here and for the presto operator after that

Contributor

Within operators and sensors you can specify template_fields which allow the values for those args to be Jinja templated and unlock some functionality with TaskFlow API. The Concepts docs have some information on Jinja templating in operators.

So you'd want to think about how users might interact with this operator. Could they potentially use any of the built-in Jinja templates as part of the arg for certain parameters or even want to use an output from an upstream task as the value?

Storage buckets/containers and object names are classic examples where Jinja templating is used frequently, especially for date-partitioned paths. This is why I suggested source_bucket and source_object as template_fields. Adding trino_table as a templatable arg can't hurt either.
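The idea can be sketched with a toy operator (a stand-alone sketch: Airflow's real rendering is done with Jinja inside BaseOperator, and the toy string substitution below merely stands in for the Jinja engine; the attribute names mirror this discussion):

```python
class FakeOperator:
    # Airflow renders each attribute named in template_fields with Jinja
    # before execute() runs; here a naive substitution stands in for Jinja.
    template_fields = ("source_bucket", "source_object", "trino_table")

    def __init__(self, source_bucket, source_object, trino_table):
        self.source_bucket = source_bucket
        self.source_object = source_object
        self.trino_table = trino_table

    def render(self, context):
        for name in self.template_fields:
            value = getattr(self, name)
            for key, val in context.items():
                value = value.replace("{{ %s }}" % key, val)
            setattr(self, name, value)

# Date-partitioned paths are the classic use case:
op = FakeOperator("my-bucket", "exports/{{ ds }}/data.csv", "sales_{{ ds_nodash }}")
op.render({"ds": "2022-02-25", "ds_nodash": "20220225"})
```

After rendering, `op.source_object` is the concrete dated path, so each daily run reads its own partition.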

Contributor Author

@rsg17 rsg17 Feb 25, 2022

Added template_fields. Thank you!!

@rsg17
Contributor Author

rsg17 commented Feb 25, 2022

@eladkal, @uranusjr: I have added alternate ways to provide fields. Users can either provide them as a list or point to a JSON file on GCS that contains the fields. I referred to how this was done for gcs_to_bigquery.

Do you think these will be enough?
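The two options might look something like this (a hypothetical helper; the parameter names echo the discussion and the gcs_to_bigquery precedent, not necessarily the merged operator):

```python
import json

def resolve_target_fields(schema_fields=None, schema_object_bytes=None):
    """Return the column list from either a literal list or a JSON blob
    (e.g. bytes downloaded from GCS with GCSHook.download pointing at a
    file containing something like ["col_a", "col_b"]).

    Returning None means: fall back to the table's own column order.
    """
    if schema_fields is not None:
        return list(schema_fields)
    if schema_object_bytes is not None:
        return json.loads(schema_object_bytes.decode("utf-8"))
    return None
```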

@github-actions github-actions bot added the okay to merge It's ok to merge this PR as it does not require more tests label Feb 25, 2022
@github-actions

The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest main or amend the last commit of the PR, and push it with --force-with-lease.
