Add the DatabaseDataProvider and SqliteDataProvider (#1777)
# Description
## What is the current behavior?
- There is no `DataProvider` for databases or for SQLite.
- Non-native transfers from S3 or GCS into a SQLite table are not supported.

closes: #1731

## What is the new behavior?

- Add a `DataProvider` for databases
- Add a `DataProvider` for SQLite
- Add a non-native transfer implementation for S3 to SQLite (usage sketched below)
- Add a non-native transfer implementation for GCS to SQLite
- Add an example DAG
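
A minimal usage sketch of the new S3 to SQLite transfer, mirroring the example DAG in this PR (the bucket path and connection IDs are the example defaults; a task like this runs inside a DAG context):

```python
import os

from universal_transfer_operator.constants import FileType
from universal_transfer_operator.datasets.file.base import File
from universal_transfer_operator.datasets.table import Table
from universal_transfer_operator.universal_transfer_operator import UniversalTransferOperator

s3_bucket = os.getenv("S3_BUCKET", "s3://astro-sdk-test")

# Default (non-native) transfer mode: CSV files are read from S3 and loaded
# into a SQLite table via the new SqliteDataProvider.
transfer_non_native_s3_to_sqlite = UniversalTransferOperator(
    task_id="transfer_non_native_s3_to_sqlite",
    source_dataset=File(path=f"{s3_bucket}/uto/csv_files/", conn_id="aws_default", filetype=FileType.CSV),
    destination_dataset=Table(name="uto_s3_table", conn_id="sqlite_default"),
)
```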

A recording of the example DAG run is available here:
https://astronomer.slack.com/archives/C03868KGF2Q/p1676662728169199

## Does this introduce a breaking change?
No

### Checklist
- [x] Created tests which fail without the change (if possible)
- [x] Extended the README / documentation, if necessary

---------

Co-authored-by: utkarsh sharma <utkarsharma2@gmail.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
3 people committed Mar 2, 2023
1 parent 8554233 commit 9a311e7
Showing 22 changed files with 1,472 additions and 43 deletions.
31 changes: 24 additions & 7 deletions example_dags/example_universal_transfer_operator.py
@@ -3,12 +3,15 @@

 from airflow import DAG

-from universal_transfer_operator.constants import TransferMode
+from universal_transfer_operator.constants import FileType, TransferMode
 from universal_transfer_operator.datasets.file.base import File
 from universal_transfer_operator.datasets.table import Metadata, Table
 from universal_transfer_operator.integrations.fivetran import Connector, Destination, FiveTranOptions, Group
 from universal_transfer_operator.universal_transfer_operator import UniversalTransferOperator

+s3_bucket = os.getenv("S3_BUCKET", "s3://astro-sdk-test")
+gcs_bucket = os.getenv("GCS_BUCKET", "gs://uto-test")
+
 with DAG(
     "example_universal_transfer_operator",
     schedule_interval=None,
@@ -17,30 +20,44 @@
 ) as dag:
     transfer_non_native_gs_to_s3 = UniversalTransferOperator(
         task_id="transfer_non_native_gs_to_s3",
-        source_dataset=File(path="gs://uto-test/uto/", conn_id="google_cloud_default"),
-        destination_dataset=File(path="s3://astro-sdk-test/uto/", conn_id="aws_default"),
+        source_dataset=File(path=f"{gcs_bucket}/uto/", conn_id="google_cloud_default"),
+        destination_dataset=File(path=f"{s3_bucket}/uto/", conn_id="aws_default"),
     )

     transfer_non_native_s3_to_gs = UniversalTransferOperator(
         task_id="transfer_non_native_s3_to_gs",
-        source_dataset=File(path="s3://astro-sdk-test/uto/", conn_id="aws_default"),
+        source_dataset=File(path=f"{s3_bucket}/uto/", conn_id="aws_default"),
         destination_dataset=File(
-            path="gs://uto-test/uto/",
+            path=f"{gcs_bucket}/uto/",
             conn_id="google_cloud_default",
         ),
     )

+    transfer_non_native_s3_to_sqlite = UniversalTransferOperator(
+        task_id="transfer_non_native_s3_to_sqlite",
+        source_dataset=File(path=f"{s3_bucket}/uto/csv_files/", conn_id="aws_default", filetype=FileType.CSV),
+        destination_dataset=Table(name="uto_s3_table", conn_id="sqlite_default"),
+    )
+
+    transfer_non_native_gs_to_sqlite = UniversalTransferOperator(
+        task_id="transfer_non_native_gs_to_sqlite",
+        source_dataset=File(
+            path=f"{gcs_bucket}/uto/csv_files/", conn_id="google_cloud_default", filetype=FileType.CSV
+        ),
+        destination_dataset=Table(name="uto_gs_table", conn_id="sqlite_default"),
+    )
+
     transfer_fivetran_with_connector_id = UniversalTransferOperator(
         task_id="transfer_fivetran_with_connector_id",
-        source_dataset=File(path="s3://astro-sdk-test/uto/", conn_id="aws_default"),
+        source_dataset=File(path=f"{s3_bucket}/uto/", conn_id="aws_default"),
         destination_dataset=Table(name="fivetran_test", conn_id="snowflake_default"),
         transfer_mode=TransferMode.THIRDPARTY,
         transfer_params=FiveTranOptions(conn_id="fivetran_default", connector_id="filing_muppet"),
     )

     transfer_fivetran_without_connector_id = UniversalTransferOperator(
         task_id="transfer_fivetran_without_connector_id",
-        source_dataset=File(path="s3://astro-sdk-test/uto/", conn_id="aws_default"),
+        source_dataset=File(path=f"{s3_bucket}/uto/", conn_id="aws_default"),
         destination_dataset=Table(
             name="fivetran_test",
             conn_id="snowflake_default",
10 changes: 8 additions & 2 deletions pyproject.toml
@@ -61,7 +61,10 @@ google = [
snowflake = [
    "apache-airflow-providers-snowflake",
    "snowflake-sqlalchemy>=1.2.0",
-    "snowflake-connector-python[pandas]",
+    "snowflake-connector-python[pandas]<3.0.0",
+    # pinning snowflake-connector-python[pandas]<3.0.0 due to a conflict in snowflake-connector-python/pyarrow/google
+    # packages and pandas-gbq/google packages which is forcing pandas-gbq of version 0.13.2 installed, which is not
+    # compatible with pandas 1.5.3
]

amazon = [
@@ -79,7 +82,10 @@ all = [
    "apache-airflow-providers-google>=6.4.0",
    "apache-airflow-providers-snowflake",
    "smart-open[all]>=5.2.1",
-    "snowflake-connector-python[pandas]",
+    "snowflake-connector-python[pandas]<3.0.0",
+    # pinning snowflake-connector-python[pandas]<3.0.0 due to a conflict in snowflake-connector-python/pyarrow/google
+    # packages and pandas-gbq/google packages which is forcing pandas-gbq of version 0.13.2 installed, which is not
+    # compatible with pandas 1.5.3
    "snowflake-sqlalchemy>=1.2.0",
    "sqlalchemy-bigquery>=1.3.0",
    "s3fs",
1 change: 1 addition & 0 deletions src/universal_transfer_operator/constants.py
@@ -99,3 +99,4 @@ def __repr__(self):
 LoadExistStrategy = Literal["replace", "append"]
 DEFAULT_CHUNK_SIZE = 1000000
 ColumnCapitalization = Literal["upper", "lower", "original"]
+DEFAULT_SCHEMA = "tmp_transfers"
16 changes: 10 additions & 6 deletions src/universal_transfer_operator/data_providers/__init__.py
@@ -5,14 +5,18 @@
 from universal_transfer_operator.constants import TransferMode
 from universal_transfer_operator.data_providers.base import DataProviders
 from universal_transfer_operator.datasets.base import Dataset
+from universal_transfer_operator.datasets.file.base import File
+from universal_transfer_operator.datasets.table import Table
 from universal_transfer_operator.utils import TransferParameters, get_class_name

 DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING = {
-    "s3": "universal_transfer_operator.data_providers.filesystem.aws.s3",
-    "aws": "universal_transfer_operator.data_providers.filesystem.aws.s3",
-    "gs": "universal_transfer_operator.data_providers.filesystem.google.cloud.gcs",
-    "google_cloud_platform": "universal_transfer_operator.data_providers.filesystem.google.cloud.gcs",
-    "sftp": "universal_transfer_operator.data_providers.filesystem.sftp",
+    ("s3", File): "universal_transfer_operator.data_providers.filesystem.aws.s3",
+    ("aws", File): "universal_transfer_operator.data_providers.filesystem.aws.s3",
+    ("gs", File): "universal_transfer_operator.data_providers.filesystem.google.cloud.gcs",
+    ("google_cloud_platform", File): "universal_transfer_operator.data_providers.filesystem.google.cloud.gcs",
+    ("sqlite", Table): "universal_transfer_operator.data_providers.database.sqlite",
+    ("sftp", File): "universal_transfer_operator.data_providers.filesystem.sftp",
+    ("sqlite", Table): "universal_transfer_operator.data_providers.database.sqlite",
 }


@@ -22,7 +26,7 @@ def create_dataprovider(
     transfer_mode: TransferMode = TransferMode.NONNATIVE,
 ) -> DataProviders:
     conn_type = BaseHook.get_connection(dataset.conn_id).conn_type
-    module_path = DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING[conn_type]
+    module_path = DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING[(conn_type, type(dataset))]
     module = importlib.import_module(module_path)
     class_name = get_class_name(module_ref=module, suffix="DataProvider")
     data_provider: DataProviders = getattr(module, class_name)(
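
The mapping is now keyed on the pair of Airflow connection type and dataset class, so the same connection type can resolve to different providers for `File` versus `Table` datasets. A minimal sketch of the lookup that `create_dataprovider` performs, assuming a SQLite connection whose Airflow `conn_type` is `"sqlite"`:

```python
import importlib

from universal_transfer_operator.data_providers import DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING
from universal_transfer_operator.datasets.table import Table

# ("sqlite", Table) selects the database provider module;
# ("s3", File) would select the S3 filesystem provider instead.
module_path = DATASET_CONN_ID_TO_DATAPROVIDER_MAPPING[("sqlite", Table)]
module = importlib.import_module(module_path)
# module is universal_transfer_operator.data_providers.database.sqlite;
# get_class_name(module_ref=module, suffix="DataProvider") then locates the
# class ending in "DataProvider" (here, SqliteDataProvider) and instantiates it.
```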
8 changes: 8 additions & 0 deletions src/universal_transfer_operator/data_providers/base.py
@@ -97,3 +97,11 @@ def openlineage_dataset_name(self) -> str:
         https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
         """
         raise NotImplementedError
+
+    @property
+    def openlineage_dataset_uri(self) -> str:
+        """
+        Returns the open lineage dataset uri as per
+        https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md
+        """
+        raise NotImplementedError
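
Concrete providers are expected to override these lineage properties. A hypothetical sketch of such an override (the class name, the name format, and the `file://` URI shape are illustrative assumptions, not part of this diff):

```python
from universal_transfer_operator.data_providers.base import DataProviders


class ExampleDataProvider(DataProviders):  # hypothetical subclass, for illustration only
    @property
    def openlineage_dataset_name(self) -> str:
        # Assumed name format; real providers derive this from the dataset.
        return "/tmp/example.db.table"

    @property
    def openlineage_dataset_uri(self) -> str:
        # Assumed URI shape following the OpenLineage naming spec linked above.
        return f"file://{self.openlineage_dataset_name}"
```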
