-
Notifications
You must be signed in to change notification settings - Fork 39
/
temp_hooks.py
58 lines (47 loc) · 1.88 KB
/
temp_hooks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""
Copyright Astronomer, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from urllib.parse import quote_plus
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
class TempSnowflakeHook(SnowflakeHook):
    """
    Temporary class to get around a bug in the SnowflakeHook when creating URIs.

    Only ``get_uri`` is overridden; all other behaviour is inherited from
    ``SnowflakeHook``.
    """

    def get_uri(self) -> str:
        """
        Override DbApiHook get_uri method for get_sqlalchemy_engine().

        The connection parameters are read from ``self._get_conn_params()`` and
        must provide the keys ``user``, ``password``, ``account``, ``database``,
        ``schema``, ``warehouse``, ``role`` and ``authenticator``.

        :return: a ``snowflake://`` URI in which the user and password are
            percent-encoded.
        :raises KeyError: if one of the keys listed above is missing.
        """
        # Work on a copy so the mapping returned by the parent hook is not mutated.
        conn_config = dict(self._get_conn_params())
        # Credentials may contain URI delimiters ("@", ":", "/", "?", "%").
        # Left raw, they would change how the URI is split into its parts.
        # str() keeps the rendering of non-string values identical to what
        # str.format produced before.
        for key in ("user", "password"):
            conn_config[key] = quote_plus(str(conn_config[key]))
        uri = (
            "snowflake://{user}:{password}@{account}/{database}/{schema}"
            "?warehouse={warehouse}&role={role}&authenticator={authenticator}"
        )
        # Extra keys in conn_config are ignored by str.format.
        return uri.format(**conn_config)
class TempPostgresHook(PostgresHook):
    """
    Temporary class to get around a bug in the PostgresHook when creating URIs.

    Only ``get_uri`` is overridden; all other behaviour is inherited from
    ``PostgresHook``.
    """

    def get_uri(self) -> str:
        """
        Extract the URI from the connection.

        The URI always uses the ``postgresql://`` scheme. Login and password are
        percent-encoded, the port is appended when the connection defines one,
        and ``self.schema`` is used as the database path when it is set.

        :return: the extracted uri.
        """
        conn = self.get_connection(getattr(self, self.conn_name_attr))

        login = ""
        if conn.login:
            login = quote_plus(conn.login)
            # A login may come without a password; quote_plus(None) would
            # raise TypeError, so the password part is only added when present.
            if conn.password is not None:
                login += f":{quote_plus(conn.password)}"
            login += "@"

        # An unset host must not raise on "+=" or render as the text "None".
        host = conn.host or ""
        if conn.port is not None:
            host += f":{conn.port}"

        uri = f"postgresql://{login}{host}/"
        if self.schema:
            uri += self.schema
        return uri