Skip to content

Commit

Permalink
airbyte-lib snowflake integration (#34356)
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe Reuter committed Jan 19, 2024
1 parent 3a7f00f commit d88104e
Show file tree
Hide file tree
Showing 14 changed files with 1,224 additions and 445 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/airbyte-ci-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ jobs:
docker_hub_password: ${{ secrets.DOCKER_HUB_PASSWORD }}
docker_hub_username: ${{ secrets.DOCKER_HUB_USERNAME }}
gcs_credentials: ${{ secrets.METADATA_SERVICE_PROD_GCS_CREDENTIALS }}
gcp_gsm_credentials: ${{ secrets.GCP_GSM_CREDENTIALS }}
sentry_dsn: ${{ secrets.SENTRY_AIRBYTE_CI_DSN }}
github_token: ${{ secrets.GH_PAT_MAINTENANCE_OCTAVIA }}
subcommand: "test airbyte-lib"
subcommand: "test airbyte-lib --pass-env-var=GCP_GSM_CREDENTIALS --poetry-run-command='pytest'"
tailscale_auth_key: ${{ secrets.TAILSCALE_AUTH_KEY }}
3 changes: 3 additions & 0 deletions airbyte-lib/airbyte_lib/caches/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from airbyte_lib.caches.base import SQLCacheBase
from airbyte_lib.caches.duckdb import DuckDBCache, DuckDBCacheConfig
from airbyte_lib.caches.postgres import PostgresCache, PostgresCacheConfig
from airbyte_lib.caches.snowflake import SnowflakeCacheConfig, SnowflakeSQLCache


# We export these classes for easy access: `airbyte_lib.caches...`
Expand All @@ -12,4 +13,6 @@
"PostgresCache",
"PostgresCacheConfig",
"SQLCacheBase",
"SnowflakeCacheConfig",
"SnowflakeSQLCache",
]
7 changes: 5 additions & 2 deletions airbyte-lib/airbyte_lib/caches/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
import sqlalchemy
import ulid
from overrides import overrides
from sqlalchemy import CursorResult, Executable, TextClause, create_engine, text
from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool
from sqlalchemy.sql.elements import TextClause

from airbyte_lib._file_writers.base import FileWriterBase, FileWriterBatchHandle
from airbyte_lib._processors import BatchHandle, RecordProcessor
Expand All @@ -28,7 +29,9 @@
from pathlib import Path

from sqlalchemy.engine import Connection, Engine
from sqlalchemy.engine.cursor import CursorResult
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql.base import Executable

from airbyte_protocol.models import ConfiguredAirbyteStream

Expand Down Expand Up @@ -593,7 +596,7 @@ def _write_files_to_new_table(
schema=self.config.schema_name,
if_exists="append",
index=False,
dtype=self._get_sql_column_definitions(stream_name), # type: ignore[arg-type]
dtype=self._get_sql_column_definitions(stream_name),
)
return temp_table_name

Expand Down
4 changes: 3 additions & 1 deletion airbyte-lib/airbyte_lib/caches/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ class PostgresCacheConfig(SQLCacheConfigBase, ParquetWriterConfig):
@overrides
def get_sql_alchemy_url(self) -> str:
"""Return the SQLAlchemy URL to use."""
return f"postgresql+psycopg://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
return (
f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
)

def get_database_name(self) -> str:
"""Return the name of the database."""
Expand Down
19 changes: 15 additions & 4 deletions airbyte-lib/airbyte_lib/caches/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
from typing import TYPE_CHECKING

from overrides import overrides
from snowflake.sqlalchemy import URL

from airbyte_lib._file_writers import ParquetWriter, ParquetWriterConfig
from airbyte_lib.caches.base import SQLCacheBase, SQLCacheConfigBase
from airbyte_lib.caches.base import RecordDedupeMode, SQLCacheBase, SQLCacheConfigBase
from airbyte_lib.telemetry import CacheTelemetryInfo


Expand All @@ -31,16 +32,25 @@ class SnowflakeCacheConfig(SQLCacheConfigBase, ParquetWriterConfig):
password: str
warehouse: str
database: str
role: str

dedupe_mode = RecordDedupeMode.APPEND

# Already defined in base class:
# schema_name: str

@overrides
def get_sql_alchemy_url(self) -> str:
"""Return the SQLAlchemy URL to use."""
return (
f"snowflake://{self.username}:{self.password}@{self.account}/"
f"?warehouse={self.warehouse}&database={self.database}&schema={self.schema_name}"
return str(
URL(
account=self.account,
user=self.username,
password=self.password,
database=self.database,
warehouse=self.warehouse,
role=self.role,
)
)

def get_database_name(self) -> str:
Expand All @@ -67,6 +77,7 @@ def _write_files_to_new_table(
"""Write a file(s) to a new table.
TODO: Override the base implementation to use the COPY command.
TODO: Make sure this works for all data types.
"""
return super()._write_files_to_new_table(files, stream_name, batch_id)

Expand Down
2 changes: 1 addition & 1 deletion airbyte-lib/airbyte_lib/results.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from sqlalchemy import Engine
from sqlalchemy.engine import Engine

from airbyte_lib.caches import SQLCacheBase
from airbyte_lib.datasets import CachedDataset
Expand Down
246 changes: 246 additions & 0 deletions airbyte-lib/docs/generated/airbyte_lib/caches.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d88104e

Please sign in to comment.