
Add the interface for universal transfer operator #1267

Closed
wants to merge 26 commits into from

Conversation

@sunank200 (Contributor) commented Nov 18, 2022

Description

What is the current behavior?

Add an interface for the universal transfer operator with a working example DAG.

closes: #1139

What is the new behavior?

  • Define an interface for the universal transfer operator
  • Implement a POC for Fivetran integration
  • Implement GCS to S3 transfer
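For illustration, here is a minimal sketch of what the GCS to S3 example DAG could look like with this interface. The module paths, the `UniversalDataset` alias, and the operator's argument names are assumptions drawn from the files touched in this PR, not a confirmed API:

```python
# Hypothetical example DAG; module paths and operator arguments are assumptions
# based on this PR's layout (python-sdk/src/transfers/...), not a final API.
from datetime import datetime

from airflow import DAG
from transfers.datasets.base import UniversalDataset as Dataset
from transfers.universal_transfer_operator import UniversalTransferOperator

with DAG(
    dag_id="example_universal_transfer_gcs_to_s3",
    start_date=datetime(2022, 11, 1),
    schedule=None,  # Airflow 2.4+ `schedule` argument
    catchup=False,
) as dag:
    # Transfer objects from a GCS bucket (source) to an S3 bucket (destination);
    # bucket details are assumed to travel via the connection and `extra`.
    UniversalTransferOperator(
        task_id="gcs_to_s3_transfer",
        source_dataset=Dataset(conn_id="google_cloud_default", extra={"prefix": "uto/"}),
        destination_dataset=Dataset(conn_id="aws_default"),
    )
```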

Does this introduce a breaking change?

No

Checklist

  • Created tests which fail without the change (if possible)
  • Extended the README / documentation, if necessary

codecov bot commented Nov 18, 2022

Codecov Report

Base: 97.36% // Head: 97.36% // No change to project coverage 👍

Coverage data is based on head (56763c8) compared to base (0925b32).
Patch has no changes to coverable lines.

Additional details and impacted files
@@           Coverage Diff           @@
##             main    #1267   +/-   ##
=======================================
  Coverage   97.36%   97.36%           
=======================================
  Files          19       19           
  Lines         682      682           
=======================================
  Hits          664      664           
  Misses         18       18           


python-sdk/dev/Dockerfile (outdated, resolved)
@pankajastro (Contributor) left a comment:

Should we keep class FileLocation and IngestorSupported in some constants.py, or use the existing one?

The create_dataprovider function should be part of the dataprovider module, so what about keeping it in the __init__.py of dataprovider?

The create_transfer_integration function should be part of the integrations module, so what about keeping it in the __init__.py of integrations?

get_dataset_connection_type should be part of the base dataset class.

I feel util.py should not be required.

Will UTO only be available if a native API is available from source to destination?

Do we need some load testing?

I feel there are a lot of "Universal" prefixes here, which is a bit confusing.

python-sdk/src/uto/data_providers/aws/s3.py (outdated, resolved)
python-sdk/src/uto/universal_transfer_operator.py (outdated, resolved)
python-sdk/src/uto/utils.py (outdated, resolved)
python-sdk/src/uto/datasets/base.py (outdated, resolved)
python-sdk/src/uto/universal_transfer_operator.py (outdated, resolved)
python-sdk/src/uto/universal_transfer_operator.py (outdated, resolved)
python-sdk/src/uto/data_providers/aws/s3.py (outdated, resolved)
python-sdk/src/uto/data_providers/aws/s3.py (outdated, resolved)
@sunank200 (Contributor, Author):

> Should we keep class FileLocation and IngestorSupported in some constants.py, or use the existing one?

> The create_dataprovider function should be part of the dataprovider module, so what about keeping it in the __init__.py of dataprovider?

> The create_transfer_integration function should be part of the integrations module, so what about keeping it in the __init__.py of integrations?

> get_dataset_connection_type should be part of the base dataset class.

> I feel util.py should not be required.

> Will UTO only be available if a native API is available from source to destination?

> Do we need some load testing?

> I feel there are a lot of "Universal" prefixes here, which is a bit confusing.

I have made the necessary changes or commented accordingly.

@@ -0,0 +1,27 @@
from enum import Enum
Collaborator:

@sunank200 will the UniversalTransferOperator be shipped as an independent package or as part of the python-sdk?
If it is part of the same package, it may be confusing to have a separate constants file - and a different FileLocation definition than the rest.

Collaborator:

That was exactly the question I had here

@sunank200 marked this pull request as ready for review on November 30, 2022, 07:30
python-sdk/src/transfers/universal_transfer_operator.py (outdated, resolved)
python-sdk/src/transfers/integrations/base.py (outdated, resolved)
Comment on lines 24 to 26
module = importlib.import_module(module_path)
class_name = get_class_name(module_ref=module, suffix="Integration")
transfer_integrations: TransferIntegrations = getattr(module, class_name)(transfer_params)
Member:

Do we really have to dynamically import? I think for simplicity we should map the type to the class directly. E.g.

from transfers.integrations.fivetran import FivetranIntegration

CUSTOM_INGESTION_TYPE_TO_MODULE_PATH = {"Fivetran": FivetranIntegration}

Because if you really want to do all of this dynamically, you need to take care of more things. For example, suffix="Integration" is very hidden, whereas the definition of the class name is in one place, i.e. class FivetranIntegration. One could easily forget that the class has to have an "Integration" suffix.

Note also that your exceptions on the module path only fail at run time, not during parsing.

As long as a dynamic import is not necessary, I would go with static imports instead.
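To make the suggestion concrete, a small sketch of the static mapping approach; the class names and the `transfer_params` argument are taken from this PR's diff, and the final signature may differ:

```python
# Sketch of the static type-to-class mapping suggested above; names mirror the
# diff (TransferIntegrations, FivetranIntegration), but the signature is assumed.
from transfers.integrations.base import TransferIntegrations
from transfers.integrations.fivetran import FivetranIntegration

CUSTOM_INGESTION_TYPE_TO_CLASS = {
    "Fivetran": FivetranIntegration,
}


def create_transfer_integration(ingestion_type: str, transfer_params: dict) -> TransferIntegrations:
    try:
        integration_class = CUSTOM_INGESTION_TYPE_TO_CLASS[ingestion_type]
    except KeyError:
        raise ValueError(f"Unsupported ingestion type: {ingestion_type!r}")
    # A missing or misnamed class now fails at import time rather than deep
    # inside a dynamic importlib call at run time.
    return integration_class(transfer_params)
```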

@dimberman (Collaborator) left a comment:

@sunank200 is there a way to break this up into smaller PRs? This is a lot to bite off at once.


Comment on lines +4 to +12
class FileLocation(Enum):
# [START filelocation]
LOCAL = "local"
HTTP = "http"
HTTPS = "https"
GS = "google_cloud_platform" # Google Cloud Storage
google_cloud_platform = "google_cloud_platform" # Google Cloud Storage
S3 = "s3" # Amazon S3
AWS = "aws"
Collaborator:

Is the plan to keep this static to what we "officially" support? Will there be a way for users to load custom locations/ingestors on top of this library?

Collaborator:

It would be nice if users could easily create custom ingestors and treat them as plugins
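As a speculative sketch of that plugin idea (not something implemented in this PR), custom ingestors could be discovered through Python entry points; the "uto.ingestors" group name below is made up for illustration:

```python
# Speculative plugin-discovery sketch; the "uto.ingestors" entry-point group is
# an invented name and is not defined anywhere in this PR.
from importlib.metadata import entry_points  # selectable API, Python 3.10+


def discover_custom_ingestors() -> dict:
    """Collect ingestor classes that third-party packages advertise as plugins."""
    discovered = {}
    for entry_point in entry_points(group="uto.ingestors"):
        discovered[entry_point.name] = entry_point.load()
    return discovered
```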

Comment on lines +18 to +37
def create_dataprovider(
dataset: Dataset,
transfer_params: dict = {},
transfer_mode: TransferMode = TransferMode.NONNATIVE,
if_exists: LoadExistStrategy = "replace",
) -> DataProviders:
from airflow.hooks.base import BaseHook

conn_type = BaseHook.get_connection(dataset.conn_id).conn_type
module_path = DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING[conn_type]
module = importlib.import_module(module_path)
class_name = get_class_name(module_ref=module, suffix="DataProvider")
data_provider: DataProviders = getattr(module, class_name)(
conn_id=dataset.conn_id,
extra=dataset.extra,
transfer_params=transfer_params,
transfer_mode=transfer_mode,
if_exists=if_exists,
)
return data_provider
Collaborator:

See, I think we could easily have something that checks for plugins in this function. This might be future work; I just wanted to bring it up.
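A rough sketch of what such a check could look like, layered in front of the conn_type lookup above; the registry and the register_dataprovider helper are hypothetical and not part of this PR:

```python
# Hypothetical plugin hook for create_dataprovider; the registry and helper are
# illustrative only. DataProviders is the base class imported in this diff.
from transfers.data_providers.base import DataProviders

PLUGIN_DATAPROVIDERS = {}  # conn_type -> user-supplied DataProviders subclass


def register_dataprovider(conn_type, provider_class):
    """Let users register a custom DataProviders subclass for a connection type."""
    PLUGIN_DATAPROVIDERS[conn_type] = provider_class


def resolve_plugin_dataprovider(conn_type):
    """Return a user-registered provider class, or None to use the built-in lookup."""
    return PLUGIN_DATAPROVIDERS.get(conn_type)
```

create_dataprovider could consult this registry first and only fall back to the existing DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING lookup when nothing is registered.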

from transfers.data_providers.base import DataProviders
from transfers.datasets.base import UniversalDataset as Dataset

from astro.constants import LoadExistStrategy
Collaborator:

If the plan is to separate this into its own package, it will be worth seeing how much (or how little) we actually import from other Astro SDK modules.

Collaborator:

(If this is separated, I imagine the Astro SDK would import this rather than the other way around.)

"""Return true if the dataset exists"""
raise NotImplementedError

def load_data_from_gcs(self, source_dataset: Dataset, destination_dataset: Dataset) -> None:
Collaborator:

Wait, are we going to have a function for each ingestor? This seems like it could blow up the classes, unless this is more meant for "special cases"?

source_dataprovider.get_bucket_name(source_dataset), # type: ignore
source_dataset.extra.get("delimiter", None),
source_dataset.extra.get("prefix", None),
)
Collaborator:

This function is WAY too long. Please break it up into sub-functions.

Comment on lines +99 to +110
if files:
for file in files:
with source_hook.provide_file(
object_name=file, bucket_name=source_dataprovider.get_bucket_name(source_dataset) # type: ignore
) as local_tmp_file:
dest_key = os.path.join(dest_s3_key, file)
logging.info("Saving file to %s", dest_key)
self.hook.load_file(
filename=local_tmp_file.name,
key=dest_key,
replace="replace",
acl_policy=destination_dataset.extra.get("s3_acl_policy", None),
Collaborator:

This level of nesting is problematic. This should be a subfunction.
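As a rough illustration of the refactor being asked for, the inner body could be pulled into a small helper. The hook methods mirror those in the diff, but the surrounding class layout is assumed:

```python
import logging
import os


def copy_object_to_s3(source_hook, s3_hook, file, source_bucket, dest_s3_key, acl_policy=None):
    """Stream one object from the source bucket to S3 via a local temporary file."""
    with source_hook.provide_file(object_name=file, bucket_name=source_bucket) as local_tmp_file:
        dest_key = os.path.join(dest_s3_key, file)
        logging.info("Saving file to %s", dest_key)
        # Note: the diff passes replace="replace"; S3Hook.load_file's `replace`
        # parameter is a boolean, so True is probably what was intended.
        s3_hook.load_file(
            filename=local_tmp_file.name,
            key=dest_key,
            replace=True,
            acl_policy=acl_policy,
        )
```

The caller in the diff then flattens to a single call per file inside the loop.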

@sunank200 (Contributor, Author):

Closing this PR, as the work is continuing in another PR: #1492

@sunank200 closed this on Dec 23, 2022
sunank200 added a commit that referenced this pull request Jan 13, 2023
# Description
## What is the current behavior?
Closing the PR #1267 and continuing the change in this fresh PR.

closes: #1139
closes: #1544
closes: #1545
closes: #1546
closes: #1551

## What is the new behavior?
- Define interfaces
    - Use the Airflow 2.4 Dataset concept to build more types of Datasets:
        - Table
        - File
        - Dataframe
        - API
    - Define an interface for the universal transfer operator
        - Add the `TransferParameters` class to pass transfer configurations.
        - Use the context manager from `DataProvider` for clean-up.
        - Introduce three transfer modes: `native`, `non-native` and `third-party`.
    - `DataProviders`
        - Add an interface for `DataProvider`.
        - Add an interface for `BaseFilesystemProviders`.
        - Add `read` and `write` methods in `DataProviders` with the context manager.
    - `TransferIntegrations` and third-party transfers
        - Add an interface for `TransferIntegrations` and introduce the third-party transfer approach.
- Non-native transfers
    - Add a `DataProvider` for S3 and GCS.
    - Add a transfer workflow for S3 to GCS using a non-native approach.
    - Add a transfer workflow for GCS to S3 using a non-native approach.
    - Add an example DAG for the S3 to GCS implementation.
    - Add an example DAG for the GCS to S3 implementation.
- Third-party transfers
    - Add a `FivetranTransferIntegration` class for all transfers using Fivetran.
    - Implement `FivetranOptions`, which inherits from the `TransferParameters` class, to pass transfer configurations.
    - Implement a POC for the Fivetran integration.
    - Add an example DAG for the Fivetran implementation.
    - Fivetran POC with a working DAG for a transfer example (S3 to Snowflake) when `connector_id` is passed.
- Document the APIs for Fivetran transfers on Notion: https://www.notion.so/astronomerio/Fivetran-3bd9ecfbdcae411faa49cb38595a4571
- Makefile and Dockerfile, along with docker-compose.yaml, to build it locally and in a container.

## Does this introduce a breaking change?
No

### Checklist
- [x] Example DAG
- [x] Created tests which fail without the change (if possible)
- [x] Extended the README/documentation, if necessary

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Co-authored-by: Felix Uellendall <feluelle@users.noreply.github.com>
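To illustrate the third-party mode described in that commit message, here is a hypothetical snippet. `TransferMode.THIRDPARTY`, the `FivetranOptions` import path, and the `connector_id` keyword are assumptions based on the description and have not been verified against the follow-up PR:

```python
# Hypothetical third-party (Fivetran) transfer; class names come from the PR
# description, but the import paths, TransferMode member, and keyword arguments
# are assumptions.
from transfers.constants import TransferMode
from transfers.datasets.base import UniversalDataset as Dataset
from transfers.integrations.fivetran import FivetranOptions
from transfers.universal_transfer_operator import UniversalTransferOperator

s3_to_snowflake = UniversalTransferOperator(
    task_id="s3_to_snowflake_via_fivetran",
    source_dataset=Dataset(conn_id="aws_default"),
    destination_dataset=Dataset(conn_id="snowflake_default"),
    transfer_mode=TransferMode.THIRDPARTY,
    transfer_params=FivetranOptions(connector_id="<fivetran-connector-id>"),
)
```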
utkarsharma2 pushed a commit that referenced this pull request Jan 17, 2023
sunank200 added a commit to astronomer/apache-airflow-providers-transfers that referenced this pull request Mar 24, 2023