From c8dc5ce163e61d92b947d89611824f66b8385682 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 9 Apr 2024 20:56:42 +0530 Subject: [PATCH 1/9] Add sqlalchemy_url property to DbApiHook Signed-off-by: kalyanr --- airflow/providers/common/sql/hooks/sql.py | 33 ++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index 7f1536a39b47e..52fc1f298aa48 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -39,6 +39,7 @@ import sqlparse from more_itertools import chunked from sqlalchemy import create_engine +from sqlalchemy.engine import URL from airflow.exceptions import ( AirflowException, @@ -192,6 +193,17 @@ def placeholder(self): ) return self._placeholder + @property + def drivername(self) -> str: + """ + Return the drivername for the connection. + + Should be implemented in the derived class to return database backend and drivername. + + :return: the drivername for the connection. + """ + raise NotImplementedError("drivername property should be implemented in the provider subclass.") + def get_conn(self): """Return a connection object.""" db = self.get_connection(getattr(self, cast(str, self.conn_name_attr))) @@ -207,6 +219,24 @@ def get_uri(self) -> str: conn.schema = self.__schema or conn.schema return conn.get_uri() + def get_sqlalchemy_url(self) -> URL: + """ + Extract the SQLAlchemy URI from the connection. + + :return: the extracted uri. + """ + conn = self.get_connection(getattr(self, self.conn_name_attr)) + conn.schema = self.__schema or conn.schema + conn.port = conn.port or 5432 + return URL.create( + drivername=self.drivername, + username=conn.login, + password=conn.password, + host=conn.host, + port=conn.port, + database=conn.schema, + ) + def get_sqlalchemy_engine(self, engine_kwargs=None): """ Get an sqlalchemy_engine object. @@ -216,7 +246,8 @@ def get_sqlalchemy_engine(self, engine_kwargs=None): """ if engine_kwargs is None: engine_kwargs = {} - return create_engine(self.get_uri(), **engine_kwargs) + sa_uri = self.get_sqlalchemy_url().render_as_string(hide_password=False) + return create_engine(sa_uri, **engine_kwargs) def get_pandas_df( self, From d7a8a0788342df7775fe15a6c6a5f53ec36f4b25 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 9 Apr 2024 21:16:09 +0530 Subject: [PATCH 2/9] update get_sqlalchemy_url docstring --- airflow/providers/common/sql/hooks/sql.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index 52fc1f298aa48..6564b41b8a547 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -221,9 +221,9 @@ def get_uri(self) -> str: def get_sqlalchemy_url(self) -> URL: """ - Extract the SQLAlchemy URI from the connection. + Return a Sqlalchemy.engine.URL object from the connection. - :return: the extracted uri. + :return: the extracted sqlalchemy.engine.URL object. """ conn = self.get_connection(getattr(self, self.conn_name_attr)) conn.schema = self.__schema or conn.schema From f5208675a677b74bd2449c3d3e7597d3d43273d8 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 9 Apr 2024 21:24:06 +0530 Subject: [PATCH 3/9] update drivername property docstring --- airflow/providers/common/sql/hooks/sql.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index 6564b41b8a547..3b976e4a484bb 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -196,9 +196,9 @@ def placeholder(self): @property def drivername(self) -> str: """ - Return the drivername for the connection. + Return the database drivername. - Should be implemented in the derived class to return database backend and drivername. + Should be implemented in the derived class to return database drivername. :return: the drivername for the connection. """ From f93e0f10e670289093804ee86e0dd7b963e9a07c Mon Sep 17 00:00:00 2001 From: kalyanr Date: Tue, 9 Apr 2024 21:40:58 +0530 Subject: [PATCH 4/9] fix spelling --- airflow/providers/common/sql/hooks/sql.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index 3b976e4a484bb..45acb6243b144 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -196,11 +196,11 @@ def placeholder(self): @property def drivername(self) -> str: """ - Return the database drivername. + Return the database driver name. - Should be implemented in the derived class to return database drivername. + Should be implemented in the derived class to return database driver name. - :return: the drivername for the connection. + :return: the driver name for the connection. """ raise NotImplementedError("drivername property should be implemented in the provider subclass.") From c9358d0ebcf5cfe7412fb3afd0990e05ebf9cec9 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Wed, 10 Apr 2024 13:52:58 +0530 Subject: [PATCH 5/9] add sqlalchemy_url property to DbApiHook --- airflow/providers/common/sql/hooks/sql.py | 33 +++++------------------ 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index 45acb6243b144..3fe917fdd60e7 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -39,7 +39,6 @@ import sqlparse from more_itertools import chunked from sqlalchemy import create_engine -from sqlalchemy.engine import URL from airflow.exceptions import ( AirflowException, @@ -50,6 +49,7 @@ if TYPE_CHECKING: from pandas import DataFrame + from sqlalchemy.engine import URL from airflow.providers.openlineage.extractors import OperatorLineage from airflow.providers.openlineage.sqlparser import DatabaseInfo @@ -193,17 +193,6 @@ def placeholder(self): ) return self._placeholder - @property - def drivername(self) -> str: - """ - Return the database driver name. - - Should be implemented in the derived class to return database driver name. - - :return: the driver name for the connection. - """ - raise NotImplementedError("drivername property should be implemented in the provider subclass.") - def get_conn(self): """Return a connection object.""" db = self.get_connection(getattr(self, cast(str, self.conn_name_attr))) @@ -219,23 +208,16 @@ def get_uri(self) -> str: conn.schema = self.__schema or conn.schema return conn.get_uri() - def get_sqlalchemy_url(self) -> URL: + @property + def sqlalchemy_url(self) -> URL: """ Return a Sqlalchemy.engine.URL object from the connection. + Needs to be implemented in the provider subclass to return the sqlalchemy.engine.URL object. + :return: the extracted sqlalchemy.engine.URL object. """ - conn = self.get_connection(getattr(self, self.conn_name_attr)) - conn.schema = self.__schema or conn.schema - conn.port = conn.port or 5432 - return URL.create( - drivername=self.drivername, - username=conn.login, - password=conn.password, - host=conn.host, - port=conn.port, - database=conn.schema, - ) + raise NotImplementedError("drivername property should be implemented in the provider subclass.") def get_sqlalchemy_engine(self, engine_kwargs=None): """ @@ -246,8 +228,7 @@ def get_sqlalchemy_engine(self, engine_kwargs=None): """ if engine_kwargs is None: engine_kwargs = {} - sa_uri = self.get_sqlalchemy_url().render_as_string(hide_password=False) - return create_engine(sa_uri, **engine_kwargs) + return create_engine(self.get_uri(), **engine_kwargs) def get_pandas_df( self, From 8e04a5ca577512b72b6f276b937419c87a5dc262 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Wed, 10 Apr 2024 13:55:04 +0530 Subject: [PATCH 6/9] fix typo --- airflow/providers/common/sql/hooks/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index 3fe917fdd60e7..96b38be45079c 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -217,7 +217,7 @@ def sqlalchemy_url(self) -> URL: :return: the extracted sqlalchemy.engine.URL object. """ - raise NotImplementedError("drivername property should be implemented in the provider subclass.") + raise NotImplementedError("sqlalchemy_url property should be implemented in the provider subclass.") def get_sqlalchemy_engine(self, engine_kwargs=None): """ From 6df807c898f9bf4628547445f755941e1323a550 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Thu, 11 Apr 2024 21:48:44 +0530 Subject: [PATCH 7/9] precommit failure fix Signed-off-by: kalyanr --- airflow/providers/common/sql/hooks/sql.pyi | 3 +++ 1 file changed, 3 insertions(+) diff --git a/airflow/providers/common/sql/hooks/sql.pyi b/airflow/providers/common/sql/hooks/sql.pyi index 83135a235bf97..0af6d7f59d53c 100644 --- a/airflow/providers/common/sql/hooks/sql.pyi +++ b/airflow/providers/common/sql/hooks/sql.pyi @@ -41,6 +41,7 @@ from airflow.hooks.base import BaseHook as BaseHook from airflow.providers.openlineage.extractors import OperatorLineage as OperatorLineage from airflow.providers.openlineage.sqlparser import DatabaseInfo as DatabaseInfo from pandas import DataFrame as DataFrame +from sqlalchemy.engine import URL as URL from typing import Any, Callable, Generator, Iterable, Mapping, Protocol, Sequence, TypeVar, overload T = TypeVar("T") @@ -65,6 +66,8 @@ class DbApiHook(BaseHook): def placeholder(self): ... def get_conn(self): ... def get_uri(self) -> str: ... + @property + def sqlalchemy_url(self) -> URL: ... def get_sqlalchemy_engine(self, engine_kwargs: Incomplete | None = None): ... def get_pandas_df( self, sql, parameters: list | tuple | Mapping[str, Any] | None = None, **kwargs From 4bfae8cde415b5170e776eeffd53e9f788ccbf90 Mon Sep 17 00:00:00 2001 From: Kalyan Date: Fri, 12 Apr 2024 17:25:36 +0530 Subject: [PATCH 8/9] Update sql.py Co-authored-by: Andrey Anshin --- airflow/providers/common/sql/hooks/sql.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index 96b38be45079c..7add9574c7ac4 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -217,7 +217,12 @@ def sqlalchemy_url(self) -> URL: :return: the extracted sqlalchemy.engine.URL object. """ - raise NotImplementedError("sqlalchemy_url property should be implemented in the provider subclass.") + qualname = f"{self.__class__.__module__}.{self.__class__.__qualname__}" + if qualname != "airflow.providers.common.sql.hooks.sql.DbApiHook": + msg = f"{qualname!r} not implements/supports built SQLAlchemy URI." + else: + msg = "`sqlalchemy_url` property should be implemented in the provider subclass." + raise NotImplementedError(msg) def get_sqlalchemy_engine(self, engine_kwargs=None): """ From 62591f5589f3aa57fec95d2a106d9e51be492a52 Mon Sep 17 00:00:00 2001 From: kalyanr Date: Fri, 12 Apr 2024 18:09:15 +0530 Subject: [PATCH 9/9] fix error message --- airflow/providers/common/sql/hooks/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/providers/common/sql/hooks/sql.py b/airflow/providers/common/sql/hooks/sql.py index 299ea2ffb786c..c6197f91454fd 100644 --- a/airflow/providers/common/sql/hooks/sql.py +++ b/airflow/providers/common/sql/hooks/sql.py @@ -221,7 +221,7 @@ def sqlalchemy_url(self) -> URL: """ qualname = f"{self.__class__.__module__}.{self.__class__.__qualname__}" if qualname != "airflow.providers.common.sql.hooks.sql.DbApiHook": - msg = f"{qualname!r} not implements/supports built SQLAlchemy URI." + msg = f"{qualname!r} does not implement/support built SQLAlchemy URL." else: msg = "`sqlalchemy_url` property should be implemented in the provider subclass." raise NotImplementedError(msg)