Build correct SQLAlchemy URI in hooks based on DbApiHook #38195

Open
1 of 17 tasks
Taragolis opened this issue Mar 15, 2024 · 9 comments

@Taragolis
Contributor

Taragolis commented Mar 15, 2024

Body

Original discussion: https://lists.apache.org/thread/8rhmz3qh30hvkondct4sfmgk4vd07mn5

tl;dr: DbApiHook assumes that the Airflow Connection URI representation is a valid SQLAlchemy (SA) URI, but this is not true and never has been. It might work in simple cases, but it fails in more complex ones.

    def get_uri(self) -> str:
        """
        Extract the URI from the connection.
        :return: the extracted uri.
        """
        conn = self.get_connection(getattr(self, self.conn_name_attr))
        conn.schema = self.__schema or conn.schema
        return conn.get_uri()

This task is intended to track fixes to the hooks based on DbApiHook.

Hooks that don't override the invalid DbApiHook.get_uri implementation

  • MySqlHook from mysql provider. This hook supports two different drivers.
  • OracleHook from oracle provider
  • JdbcHook from jdbc provider
  • TrinoHook from trino provider
  • HiveServer2Hook from apache.hive provider
  • ImpalaHook from apache.impala provider
  • DatabricksSqlHook from databricks provider
  • ExasolHook from exasol provider
  • PrestoHook from presto provider
  • VerticaHook from vertica provider
  • TeradataHook from teradata provider
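Part of the problem is visible in the scheme alone: Connection.get_uri uses the conn_type as the URI scheme, while SQLAlchemy expects `dialect[+driver]`, and for some hooks the right driver depends on extras. A minimal stdlib-only sketch of a per-hook mapping, assuming MySqlHook's `client` extra (`mysqlclient` by default, or `mysql-connector-python`) and that OracleHook uses python-oracledb. The helper `sqlalchemy_drivername` and its mapping table are hypothetical, not part of any provider:

```python
from __future__ import annotations

# Hypothetical mapping from the MySqlHook "client" extra to a SQLAlchemy
# "dialect+driver" name. Not an Airflow API.
_MYSQL_DRIVERS = {
    "mysqlclient": "mysql+mysqldb",
    "mysql-connector-python": "mysql+mysqlconnector",
}


def sqlalchemy_drivername(conn_type: str, extra: dict | None = None) -> str:
    """Return a SQLAlchemy drivername for a connection, or raise if there is none."""
    extra = extra or {}
    if conn_type == "mysql":
        # The same conn_type maps to different SQLAlchemy drivers.
        client = extra.get("client", "mysqlclient")
        try:
            return _MYSQL_DRIVERS[client]
        except KeyError:
            raise ValueError(f"Unknown MySQL client: {client!r}") from None
    if conn_type == "oracle":
        # Plain "oracle://" defaults to cx_Oracle in SQLAlchemy 2.0,
        # so the python-oracledb driver has to be spelled out.
        return "oracle+oracledb"
    if conn_type == "jdbc":
        # A generic JDBC connection has no SQLAlchemy dialect at all.
        raise NotImplementedError("JDBC connections have no SQLAlchemy dialect")
    raise NotImplementedError(f"No SQLAlchemy mapping for conn_type {conn_type!r}")


print(sqlalchemy_drivername("mysql"))  # mysql+mysqldb
print(sqlalchemy_drivername("mysql", {"client": "mysql-connector-python"}))  # mysql+mysqlconnector
```

A scheme taken verbatim from conn_type ("mysql", "oracle", "jdbc") carries none of this information, which is why simple cases work and others silently pick the wrong driver or fail.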

Hooks that use Connection.get_uri to construct the SQLAlchemy URI

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.
Taragolis added the good first issue and kind:meta labels Mar 15, 2024
Taragolis changed the title "Fix build SQLAlchemy URI in" to "Fix build SQLAlchemy URI in hooks based on DbApiHook" Mar 15, 2024
Taragolis changed the title "Fix build SQLAlchemy URI in hooks based on DbApiHook" to "Build correct SQLAlchemy URI in hooks based on DbApiHook" Mar 16, 2024
@Bowrna
Contributor

Bowrna commented Apr 5, 2024

Hello @Taragolis, I was looking at this issue to see if I could fix it.

    def get_uri(self) -> str:
        """Return connection in URI format."""
        if self.conn_type and "_" in self.conn_type:
            self.log.warning(
                "Connection schemes (type: %s) shall not contain '_' according to RFC3986.",
                self.conn_type,
            )
        if self.conn_type:
            uri = f"{self.conn_type.lower().replace('_', '-')}://"
        else:
            uri = "//"
        if self.host and "://" in self.host:
            protocol, host = self.host.split("://", 1)
        else:
            protocol, host = None, self.host
        if protocol:
            uri += f"{protocol}://"
        authority_block = ""
        if self.login is not None:
            authority_block += quote(self.login, safe="")
        if self.password is not None:
            authority_block += ":" + quote(self.password, safe="")
        if authority_block > "":
            authority_block += "@"
        uri += authority_block
        host_block = ""
        if host:
            host_block += quote(host, safe="")
        if self.port:
            if host_block == "" and authority_block == "":
                host_block += f"@:{self.port}"
            else:
                host_block += f":{self.port}"
        if self.schema:
            host_block += f"/{quote(self.schema, safe='')}"
        uri += host_block
        if self.extra:
            try:
                query: str | None = urlencode(self.extra_dejson)
            except TypeError:
                query = None
            if query and self.extra_dejson == dict(parse_qsl(query, keep_blank_values=True)):
                uri += ("?" if self.schema else "/?") + query
            else:
                uri += ("?" if self.schema else "/?") + urlencode({self.EXTRA_KEY: self.extra})
        return uri

Is fixing this get_uri so that the Airflow Connection URI representation becomes a valid SA URI the right way to resolve this issue?
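One concrete place where the output of the method above stops being something SQLAlchemy can consume is the extras branch: any extra whose value is not a plain string fails the round-trip check, and the whole JSON blob is packed into a single `__extra__` query parameter that SQLAlchemy dialects do not expect. A stdlib-only sketch that reproduces just that branch; the function name `encode_extra` is hypothetical, and `"__extra__"` mirrors the value of `Connection.EXTRA_KEY`:

```python
import json
from urllib.parse import parse_qsl, urlencode

EXTRA_KEY = "__extra__"  # same value as Connection.EXTRA_KEY


def encode_extra(extra: str, has_schema: bool = True) -> str:
    """Reproduce only the `if self.extra:` branch of Connection.get_uri."""
    extra_dejson = json.loads(extra)
    try:
        query = urlencode(extra_dejson)
    except TypeError:
        query = None
    prefix = "?" if has_schema else "/?"
    # Only a flat dict of string values survives the round trip through
    # the query string; anything else falls back to one opaque parameter.
    if query and extra_dejson == dict(parse_qsl(query, keep_blank_values=True)):
        return prefix + query
    return prefix + urlencode({EXTRA_KEY: extra})


print(encode_extra('{"charset": "utf8"}'))      # ?charset=utf8
print(encode_extra('{"connect_timeout": 10}'))  # ?__extra__=%7B%22connect_timeout%22%3A+10%7D
```

Even a simple integer extra such as a timeout takes the fallback path, so patching the string building here would not by itself give a usable SA URI.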

@Taragolis
Contributor Author

As far as I remember, we recently discussed this case in Slack. I think it is better to introduce an sa_uri property (or something similarly named) in DbApiHook that returns a sqlalchemy.engine.URL, with no implementation in the base class (i.e. it raises NotImplementedError), and to implement it in each hook that has a SQLAlchemy implementation.

The problem with get_uri is that it is not clear which URI it implements: DB-API? SQLAlchemy? Or even some internal one (ElasticsearchSQLHook)?

cc: @potiuk @dstandish
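A minimal sketch of that proposal, assuming SQLAlchemy 1.4+ (for `URL.create` and `render_as_string`). The class names, the `sqlalchemy_url` property name (the one used later in this thread) and the `ConnectionStub` stand-in for an Airflow Connection are hypothetical simplifications, not the real DbApiHook:

```python
from __future__ import annotations

from dataclasses import dataclass

from sqlalchemy.engine import URL


@dataclass
class ConnectionStub:
    """Hypothetical stand-in for the few Airflow Connection fields used here."""

    host: str | None = None
    login: str | None = None
    password: str | None = None
    port: int | None = None
    schema: str | None = None


class BaseSqlHookSketch:
    """Base hook: no generic SQLAlchemy URL, because conn_type is not a dialect."""

    def __init__(self, conn: ConnectionStub) -> None:
        self.conn = conn

    @property
    def sqlalchemy_url(self) -> URL:
        raise NotImplementedError(f"{type(self).__name__} does not provide a SQLAlchemy URL")


class MySqlHookSketch(BaseSqlHookSketch):
    """A hook whose provider ships a SQLAlchemy dialect opts in explicitly."""

    @property
    def sqlalchemy_url(self) -> URL:
        # URL.create keeps every component separate; no string concatenation.
        return URL.create(
            drivername="mysql+mysqldb",
            username=self.conn.login,
            password=self.conn.password,
            host=self.conn.host,
            port=self.conn.port,
            database=self.conn.schema,
        )


class JdbcHookSketch(BaseSqlHookSketch):
    """No SQLAlchemy dialect exists, so the base NotImplementedError is kept."""


conn = ConnectionStub(host="db.example.com", login="airflow", password="p@ss", port=3306, schema="sales")
print(MySqlHookSketch(conn).sqlalchemy_url.render_as_string(hide_password=False))
# mysql+mysqldb://airflow:p%40ss@db.example.com:3306/sales
```

The explicit property name answers the "which URI?" question, and hooks without a dialect fail loudly instead of returning a string that only looks like an SA URI.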

@rawwar
Contributor

rawwar commented Apr 9, 2024

@Taragolis, I am working on PR #38871. It is a separate PR that only updates DbApiHook to include a method returning a reusable SQLAlchemy URL.

@dabla
Contributor

dabla commented Apr 15, 2024

> As far as I remember, we recently discussed this case in Slack. I think it is better to introduce an sa_uri property (or something similarly named) in DbApiHook that returns a sqlalchemy.engine.URL, with no implementation in the base class (i.e. it raises NotImplementedError), and to implement it in each hook that has a SQLAlchemy implementation.
>
> The problem with get_uri is that it is not clear which URI it implements: DB-API? SQLAlchemy? Or even some internal one (ElasticsearchSQLHook)?
>
> cc: @potiuk @dstandish

@Taragolis Why not implement another method named get_url and use it to instantiate the SQLAlchemy engine, since the engine can be created from a SQLAlchemy URL object? The SQLAlchemy URL class also has a factory method, create, which lets you build the URL without the hassle of string manipulation. This is how I did it for my custom SQL operator (the one I mentioned on Slack):

    from sqlalchemy.engine import URL

    def get_url(self) -> URL:
        return URL.create(
            self.scheme,
            username=self.connection.login,
            password=self.connection.password,
            host=self.connection.host,
            port=self.connection.port,
        )

See also the SQLAlchemy documentation on creating URLs programmatically: https://docs.sqlalchemy.org/en/20/core/engines.html#creating-urls-programmatically

I see @rawwar mentioned this first. I think it is a good idea: it would simplify the code and make it less bug-prone.
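The "hassle of string manipulation" is easy to demonstrate with the standard library alone: a password containing `@`, `/` or `:` silently corrupts a hand-built URI, while percent-encoding each component (which `URL.create` followed by `render_as_string` does for the username and password) keeps it intact. The helper names `naive_uri` and `quoted_uri` are hypothetical:

```python
from urllib.parse import quote, unquote, urlsplit


def naive_uri(scheme, login, password, host, port, schema):
    # Plain f-string: reserved characters in the password are not escaped.
    return f"{scheme}://{login}:{password}@{host}:{port}/{schema}"


def quoted_uri(scheme, login, password, host, port, schema):
    # Percent-encode user info and database name, as Connection.get_uri does.
    return (
        f"{scheme}://{quote(login, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{quote(schema, safe='')}"
    )


args = ("mysql", "airflow", "p@ss/w:rd", "db.example.com", 3306, "sales")

broken = urlsplit(naive_uri(*args))
print(broken.hostname, broken.password)  # ss p  (wrong host and password)

fixed = urlsplit(quoted_uri(*args))
print(fixed.hostname, unquote(fixed.password))  # db.example.com p@ss/w:rd
```

Building a `URL` object from separate fields avoids having to get this escaping right by hand in every hook.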

@Taragolis
Contributor Author

There is a new property intended to return a SQLAlchemy URL. The current behaviour of get_url is ambiguous and not every hook returns an SA URI there; for some databases no SA dialect even exists. So it is better to keep get_url as is and migrate step by step to the appropriate ones. If a dialect exists and is installed with the provider, I have no objection to adding its implementation as sqlalchemy_url.

@dabla
Contributor

dabla commented Apr 16, 2024

> There is a new property intended to return a SQLAlchemy URL. The current behaviour of get_url is ambiguous and not every hook returns an SA URI there; for some databases no SA dialect even exists. So it is better to keep get_url as is and migrate step by step to the appropriate ones. If a dialect exists and is installed with the provider, I have no objection to adding its implementation as sqlalchemy_url.

Hey Andrey, I think there is some confusion here. Yes, we should definitely leave the get_uri method as it is (not get_url, as that doesn’t exist yet) and implement a new get_url method, which would then be called by the existing get_sqlalchemy_engine.
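A sketch of that split, assuming SQLAlchemy 1.4+: `get_uri` is left untouched for backward compatibility, and `get_sqlalchemy_engine` builds the engine from a `URL` object instead of a string. The hook class is hypothetical, and SQLite is used only so the example runs without a database server:

```python
from __future__ import annotations

from typing import Any

from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine


class SqliteHookSketch:
    """Hypothetical hook showing get_uri and get_url side by side."""

    def __init__(self, database: str = ":memory:") -> None:
        self.database = database

    def get_uri(self) -> str:
        # Existing string-based method, kept as is for backward compatibility.
        return f"sqlite:///{self.database}"

    def get_url(self) -> URL:
        # New method: a structured SQLAlchemy URL, no string concatenation.
        return URL.create("sqlite", database=self.database)

    def get_sqlalchemy_engine(self, engine_kwargs: dict[str, Any] | None = None) -> Engine:
        # create_engine accepts a URL object directly.
        return create_engine(self.get_url(), **(engine_kwargs or {}))


engine = SqliteHookSketch().get_sqlalchemy_engine()
with engine.connect() as conn:
    print(conn.execute(text("SELECT 1")).scalar())  # 1
```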

@dabla
Contributor

dabla commented Apr 16, 2024

This is how I did it in my custom SQLRowsInsertOperator:

    from functools import cached_property

    from airflow.hooks.base import BaseHook
    from airflow.models import Connection
    from sqlalchemy import create_engine, inspect
    from sqlalchemy.engine import URL

    @cached_property
    def connection(self) -> Connection:
        return BaseHook.get_connection(self.conn_id)  # type: ignore

    @cached_property
    def scheme(self) -> str:
        return self.connection.extra_dejson["sqlalchemy_scheme"]

    @cached_property
    def table_name(self) -> str:
        return self.get_columns().table_name_with_schema()

    @cached_property
    def driver(self) -> str:
        return self.connection.extra_dejson["driver"]

    # TODO: Maybe this should be moved to DbApiHook
    def get_url(self) -> URL:
        self.log.info("Connection schema: %s", self.schema)
        self.log.info("Connection scheme: %s", self.scheme)
        self.log.info("Connection driver: %s", self.driver)
        self.log.info("Connection type: %s", self.connection.conn_type)
        self.log.info("Connection login: %s", self.connection.login)
        # The password is deliberately not logged.
        self.log.info("Connection host: %s", self.connection.host)
        self.log.info("Connection port: %s", self.connection.port)

        return URL.create(
            self.scheme,
            username=self.connection.login,
            password=self.connection.password,
            host=self.connection.host,
            port=self.connection.port,
        )

    # TODO: Maybe this method in DbApiHook should be changed to this instead of calling get_uri()
    def get_sqlalchemy_engine(self, engine_kwargs=None):
        if engine_kwargs is None:
            engine_kwargs = {}
        return create_engine(self.get_url(), **engine_kwargs)

    # TODO: Maybe this should be moved to DbApiHook
    @cached_property
    def inspector(self):
        engine = self.get_sqlalchemy_engine()
        self.log.debug("Engine drivername: %s", engine.url.drivername)
        self.log.debug("Engine username: %s", engine.url.username)
        # The password is deliberately not logged.
        self.log.debug("Engine host: %s", engine.url.host)
        self.log.debug("Engine port: %s", engine.url.port)
        self.log.debug("Engine database: %s", engine.url.database)
        self.log.debug("Engine dialect: %s", engine.dialect.name)

        # Inspector.from_engine() is deprecated since SQLAlchemy 1.4; inspect() replaces it.
        return inspect(engine)

@potiuk
Member

potiuk commented Apr 18, 2024

> Hey Andrey, I think there is some confusion here. Yes, we should definitely leave the get_uri method as it is (not get_url, as that doesn’t exist yet) and implement a new get_url method, which would then be called by the existing get_sqlalchemy_engine.

Yes, adding a new method is the way to go. How to do it should be proposed and discussed in the PR: we have 20+ SQL providers, and I think the proposal should be complete and explain how to deal with cases that can and cannot be mapped. Also, IMHO, the method name should explicitly mention SQLAlchemy (getSqlAlchemyURI?), and the method should likely allow variations: from what I understand, SQLAlchemy can have different URLs not only for different databases but also for different drivers. Also, some of our Connections allow adding different kinds of extras and parameters, not only hosts but other parameters that might be used and passed differently by different drivers.

If we decide to introduce a new "standard" method in a product like Airflow and a common API like DbApiHook, then to be mergeable the proposal should answer all the questions users might ask. It does not have to handle every case, but it should at least have enough error handling and interface specification to explain the transition from the Connection form (in all the cases where we have a DB-API implementation) to a SQLAlchemy URL: which variations are handled, how it should work, how to map engine args, etc.

I don't think it can be handled by a single implementation. It is more scaffolding plus an implementation for every hook, and it should be designed carefully, together with at least a handful (if not all) of the implementations from the list above, to be reasonably sure that we will be able to handle those cases.
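One possible way to make the Connection to SQLAlchemy mapping explicit, including the error handling asked for above, is for each hook to declare which extras belong in the URL query string, which must go to the driver as `connect_args`, and which are hook-only settings to skip. A stdlib-only sketch; the function name `split_extras` and all three key sets are hypothetical placeholders, not an agreed design:

```python
from __future__ import annotations

from typing import Any


def split_extras(
    extras: dict[str, Any],
    url_query_keys: frozenset[str],
    connect_arg_keys: frozenset[str],
    ignore_keys: frozenset[str] = frozenset(),
) -> tuple[dict[str, str], dict[str, Any]]:
    """Split Connection extras into URL query params and driver connect_args.

    Keys in ignore_keys (hook-only settings) are skipped; any other unknown
    key raises, so each hook has to decide explicitly how its extras map.
    """
    query: dict[str, str] = {}
    connect_args: dict[str, Any] = {}
    for key, value in extras.items():
        if key in ignore_keys:
            continue
        if key in url_query_keys:
            query[key] = str(value)  # URL query values are strings
        elif key in connect_arg_keys:
            connect_args[key] = value  # keep the original Python type
        else:
            raise ValueError(f"Extra {key!r} has no SQLAlchemy mapping")
    return query, connect_args


query, connect_args = split_extras(
    {"client": "mysqlclient", "charset": "utf8mb4", "ssl": {"ca": "/path/to/ca.pem"}},
    url_query_keys=frozenset({"charset"}),
    connect_arg_keys=frozenset({"ssl"}),
    ignore_keys=frozenset({"client"}),
)
print(query)         # {'charset': 'utf8mb4'}
print(connect_args)  # {'ssl': {'ca': '/path/to/ca.pem'}}
```

The two results would then feed `URL.create(..., query=query)` and `create_engine(url, connect_args=connect_args)` respectively, so typed values such as nested dicts never have to pass through a string URL.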

@rawwar
Contributor

rawwar commented May 6, 2024

> Hey Andrey, I think there is some confusion here. Yes, we should definitely leave the get_uri method as it is (not get_url, as that doesn’t exist yet) and implement a new get_url method, which would then be called by the existing get_sqlalchemy_engine.

@dabla, I think we are indeed adding the get_url functionality via the sqlalchemy_url property.
