[AIRFLOW-6982] add native python exasol support (#7621)
* [AIRFLOW-6982] add native python exasol support

This adds exasol DB support, including a hook, connection type &
operator. The [pyexasol](https://github.com/badoo/pyexasol) library is
used to interact with the database.

* Add exasol to EXTRAS documentation

* Add exasol requirements to requirements files

* Add exasol to backport packages setup

Co-authored-by: Jan Omar <jan.omar@wooga.net>
inytar and jomar83 committed Apr 2, 2020
1 parent be1451b commit 69dc91b
Showing 20 changed files with 600 additions and 18 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
@@ -316,7 +316,7 @@ This is the full list of those extras:
.. START EXTRAS HERE
all, all_dbs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, dask, databricks,
-datadog, devel, devel_ci, devel_hadoop, doc, docker, druid, elasticsearch, gcp, gcp_api,
+datadog, devel, devel_ci, devel_hadoop, doc, docker, druid, elasticsearch, exasol, gcp, gcp_api,
github_enterprise, google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap,
mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, password, pinot, postgres, presto, qds,
rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, singularity, slack, snowflake, ssh,
2 changes: 1 addition & 1 deletion INSTALL
@@ -45,7 +45,7 @@ pip install . --constraint requirements/requirements-python3.7.txt
# START EXTRAS HERE

all, all_dbs, async, atlas, aws, azure, cassandra, celery, cgroups, cloudant, dask, databricks,
-datadog, devel, devel_ci, devel_hadoop, doc, docker, druid, elasticsearch, gcp, gcp_api,
+datadog, devel, devel_ci, devel_hadoop, doc, docker, druid, elasticsearch, exasol, gcp, gcp_api,
github_enterprise, google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap,
mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, password, pinot, postgres, presto, qds,
rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, singularity, slack, snowflake, ssh,
2 changes: 2 additions & 0 deletions airflow/models/connection.py
@@ -48,6 +48,7 @@
"airflow.providers.elasticsearch.hooks.elasticsearch.ElasticsearchHook",
"elasticsearch_conn_id"
),
"exasol": ("airflow.providers.exasol.hooks.exasol.ExasolHook", "exasol_conn_id"),
"gcpcloudsql": (
"airflow.providers.google.cloud.hooks.cloud_sql.CloudSQLDatabaseHook",
"gcp_cloudsql_conn_id",
@@ -117,6 +118,7 @@ class Connection(Base, LoggingMixin):
_types = [
('docker', 'Docker Registry'),
('elasticsearch', 'Elasticsearch'),
('exasol', 'Exasol'),
('fs', 'File (path)'),
('ftp', 'FTP'),
('google_cloud_platform', 'Google Cloud Platform'),
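
The first hunk above registers the `exasol` connection type as a pair of a dotted hook class path and the name of the hook's connection-id argument. How Airflow consumes that mapping is not shown in this diff, so the following is only a minimal sketch of how such a dotted path could be resolved. The `CONN_TYPE_TO_HOOK` dict and `resolve_hook` helper here are hypothetical, and a standard-library class stands in for the hook so the sketch runs without Airflow installed.

```python
from importlib import import_module

# Hypothetical stand-in for the mapping in airflow/models/connection.py.
# Real entries point at Airflow hook classes; a stdlib class is used here
# so the sketch runs without Airflow installed.
CONN_TYPE_TO_HOOK = {
    "exasol_demo": ("collections.OrderedDict", "exasol_conn_id"),
}


def resolve_hook(conn_type):
    """Return (hook_class, conn_id_param) for a registered connection type."""
    class_path, conn_id_param = CONN_TYPE_TO_HOOK[conn_type]
    # Split "package.module.ClassName" into module path and class name.
    module_name, class_name = class_path.rsplit(".", 1)
    hook_class = getattr(import_module(module_name), class_name)
    return hook_class, conn_id_param


hook_class, conn_id_param = resolve_hook("exasol_demo")
print(hook_class.__name__, conn_id_param)
```

With the real entry, the resolved class would be `ExasolHook` and the second element tells the caller to pass the connection id as `exasol_conn_id`.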
16 changes: 16 additions & 0 deletions airflow/providers/exasol/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
17 changes: 17 additions & 0 deletions airflow/providers/exasol/hooks/__init__.py
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
179 changes: 179 additions & 0 deletions airflow/providers/exasol/hooks/exasol.py
@@ -0,0 +1,179 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from contextlib import closing

import pyexasol
from past.builtins import basestring

from airflow.hooks.dbapi_hook import DbApiHook


class ExasolHook(DbApiHook):
    """
    Interact with Exasol.
    You can specify the pyexasol ``compression``, ``encryption``, ``json_lib``
    and ``client_name`` parameters in the extra field of your connection
    as ``{"compression": true, "json_lib": "rapidjson", etc}``.
    See `pyexasol reference
    <https://github.com/badoo/pyexasol/blob/master/docs/REFERENCE.md#connect>`_
    for more details.
    """
    conn_name_attr = 'exasol_conn_id'
    default_conn_name = 'exasol_default'
    supports_autocommit = True

    def __init__(self, *args, **kwargs):
        super(ExasolHook, self).__init__(*args, **kwargs)
        self.schema = kwargs.pop("schema", None)

    def get_conn(self):
        conn_id = getattr(self, self.conn_name_attr)
        conn = self.get_connection(conn_id)
        conn_args = dict(
            dsn='%s:%s' % (conn.host, conn.port),
            user=conn.login,
            password=conn.password,
            schema=self.schema or conn.schema)
        # check for parameters in conn.extra
        for arg_name, arg_val in conn.extra_dejson.items():
            if arg_name in ['compression', 'encryption', 'json_lib', 'client_name']:
                conn_args[arg_name] = arg_val

        conn = pyexasol.connect(**conn_args)
        return conn

    def get_pandas_df(self, sql, parameters=None):
        """
        Executes the sql and returns a pandas dataframe
        :param sql: the sql statement to be executed (str) or a list of
            sql statements to execute
        :type sql: str or list
        :param parameters: The parameters to render the SQL query with.
        :type parameters: mapping or iterable
        """
        with closing(self.get_conn()) as conn:
            # Return the dataframe; without the return the caller gets None.
            return conn.export_to_pandas(sql, query_params=parameters)

    def get_records(self, sql, parameters=None):
        """
        Executes the sql and returns a set of records.
        :param sql: the sql statement to be executed (str) or a list of
            sql statements to execute
        :type sql: str or list
        :param parameters: The parameters to render the SQL query with.
        :type parameters: mapping or iterable
        """
        with closing(self.get_conn()) as conn:
            with closing(conn.execute(sql, parameters)) as cur:
                return cur.fetchall()

    def get_first(self, sql, parameters=None):
        """
        Executes the sql and returns the first resulting row.
        :param sql: the sql statement to be executed (str) or a list of
            sql statements to execute
        :type sql: str or list
        :param parameters: The parameters to render the SQL query with.
        :type parameters: mapping or iterable
        """
        with closing(self.get_conn()) as conn:
            with closing(conn.execute(sql, parameters)) as cur:
                return cur.fetchone()

    def run(self, sql, autocommit=False, parameters=None):
        """
        Runs a command or a list of commands. Pass a list of sql
        statements to the sql parameter to get them to execute
        sequentially
        :param sql: the sql statement to be executed (str) or a list of
            sql statements to execute
        :type sql: str or list
        :param autocommit: What to set the connection's autocommit setting to
            before executing the query.
        :type autocommit: bool
        :param parameters: The parameters to render the SQL query with.
        :type parameters: mapping or iterable
        """
        if isinstance(sql, basestring):
            sql = [sql]

        with closing(self.get_conn()) as conn:
            if self.supports_autocommit:
                self.set_autocommit(conn, autocommit)

            for query in sql:
                self.log.info(query)
                with closing(conn.execute(query, parameters)) as cur:
                    self.log.info(cur.row_count)
            # If autocommit was set to False for a db that supports autocommit,
            # or if the db does not support autocommit, we do a manual commit.
            if not self.get_autocommit(conn):
                conn.commit()

    def set_autocommit(self, conn, autocommit):
        """
        Sets the autocommit flag on the connection
        :param conn: Connection to set autocommit setting to.
        :type conn: connection object
        :param autocommit: The autocommit setting to set.
        :type autocommit: bool
        """
        if not self.supports_autocommit and autocommit:
            self.log.warning(
                ("%s connection doesn't support "
                 "autocommit but autocommit activated."),
                getattr(self, self.conn_name_attr))
        conn.set_autocommit(autocommit)

    def get_autocommit(self, conn):
        """
        Get autocommit setting for the provided connection.
        Return True if autocommit is set.
        Return False if autocommit is not set or set to False or conn
        does not support autocommit.
        :param conn: Connection to get autocommit setting from.
        :type conn: connection object
        :return: connection autocommit setting.
        :rtype: bool
        """
        autocommit = conn.attr.get('autocommit')
        if autocommit is None:
            autocommit = super(ExasolHook, self).get_autocommit(conn)
        return autocommit

    @staticmethod
    def _serialize_cell(cell, conn=None):
        """
        Exasol will adapt all arguments to the execute() method internally,
        hence we return cell without any conversion.
        :param cell: The cell to insert into the table
        :type cell: object
        :param conn: The database connection
        :type conn: connection object
        :return: The cell
        :rtype: object
        """
        return cell
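
To make the connection handling in `get_conn` easier to follow, here is a small standalone sketch of how the pyexasol keyword arguments are assembled from the connection fields and the JSON extra field. The `build_conn_args` helper, its parameters, and the example host and credentials are hypothetical and exist only for illustration; the sketch mirrors the filtering logic in the hook but never calls `pyexasol.connect`.

```python
import json

# The only extra keys that the hook forwards to pyexasol.
ALLOWED_EXTRAS = ('compression', 'encryption', 'json_lib', 'client_name')


def build_conn_args(host, port, login, password, schema, extra_json,
                    schema_override=None):
    """Hypothetical helper mirroring how ExasolHook.get_conn builds kwargs."""
    conn_args = dict(
        dsn='%s:%s' % (host, port),
        user=login,
        password=password,
        # A schema passed to the hook wins over the one stored on the connection.
        schema=schema_override or schema,
    )
    extras = json.loads(extra_json) if extra_json else {}
    for name, value in extras.items():
        # Unknown keys in the extra field are silently ignored.
        if name in ALLOWED_EXTRAS:
            conn_args[name] = value
    return conn_args


args = build_conn_args(
    host='exasol.example.com', port=8563, login='etl', password='secret',
    schema='STAGE',
    extra_json='{"compression": true, "json_lib": "rapidjson", "ignored": 1}',
    schema_override='REPORTING',
)
print(args)
```

Because the extra field is parsed as JSON, booleans are written as `true`/`false` there and arrive in Python as `True`/`False`.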
17 changes: 17 additions & 0 deletions airflow/providers/exasol/operators/__init__.py
@@ -0,0 +1,17 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
71 changes: 71 additions & 0 deletions airflow/providers/exasol/operators/exasol.py
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import Mapping, Optional

from airflow.models import BaseOperator
from airflow.providers.exasol.hooks.exasol import ExasolHook
from airflow.utils.decorators import apply_defaults


class ExasolOperator(BaseOperator):
    """
    Executes sql code in a specific Exasol database
    :param sql: the sql code to be executed. (templated)
    :type sql: Can receive a str representing a sql statement,
        a list of str (sql statements), or reference to a template file.
        Template references are recognized by str ending in '.sql'
    :param exasol_conn_id: reference to a specific Exasol database
    :type exasol_conn_id: string
    :param autocommit: if True, each command is automatically committed.
        (default value: False)
    :type autocommit: bool
    :param parameters: (optional) the parameters to render the SQL query with.
    :type parameters: mapping
    :param schema: (optional) name of the schema that overrides the one defined in the connection
    :type schema: string
    """

    template_fields = ('sql',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    @apply_defaults
    def __init__(
            self,
            sql: str,
            exasol_conn_id: str = 'exasol_default',
            autocommit: bool = False,
            parameters: Optional[Mapping] = None,
            schema: Optional[str] = None,
            *args, **kwargs):
        super(ExasolOperator, self).__init__(*args, **kwargs)
        self.exasol_conn_id = exasol_conn_id
        self.sql = sql
        self.autocommit = autocommit
        self.parameters = parameters
        self.schema = schema

    def execute(self, context):
        self.log.info('Executing: %s', self.sql)
        hook = ExasolHook(exasol_conn_id=self.exasol_conn_id,
                          schema=self.schema)
        hook.run(
            self.sql,
            autocommit=self.autocommit,
            parameters=self.parameters)
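
The operator passes its `autocommit` flag straight through to `ExasolHook.run`, so the commit behaviour is decided in the hook. The sketch below imitates that control flow with stand-in objects to show what happens when `autocommit=False`: every statement runs on one connection and a single commit follows. `FakeConnection`, `FakeStatement`, and the free function `run` are hypothetical and are not part of Airflow or pyexasol.

```python
from contextlib import closing


class FakeStatement:
    """Stand-in for the statement object returned by execute()."""

    def __init__(self, row_count):
        self.row_count = row_count

    def close(self):
        pass


class FakeConnection:
    """Stand-in that records calls instead of talking to a database."""

    def __init__(self):
        self.autocommit = True
        self.calls = []

    def set_autocommit(self, value):
        self.autocommit = value

    def execute(self, query, params=None):
        self.calls.append(('execute', query))
        return FakeStatement(row_count=0)

    def commit(self):
        self.calls.append(('commit',))

    def close(self):
        self.calls.append(('close',))


def run(conn, sql, autocommit=False, parameters=None):
    """Simplified imitation of the control flow in ExasolHook.run."""
    if isinstance(sql, str):
        sql = [sql]  # a single statement is treated as a one-element list
    with closing(conn):
        conn.set_autocommit(autocommit)
        for query in sql:
            with closing(conn.execute(query, parameters)):
                pass
        # With autocommit off, commit once after all statements have run.
        if not conn.autocommit:
            conn.commit()


conn = FakeConnection()
run(conn, ['TRUNCATE TABLE t', 'INSERT INTO t VALUES (1)'], autocommit=False)
print(conn.calls)
```

With `autocommit=True` the explicit commit is skipped, since each statement is then committed by the database as it executes.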
1 change: 1 addition & 0 deletions backport_packages/setup_backport_packages.py
@@ -99,6 +99,7 @@ def run(self):
"docker": setup.docker,
"email": [],
"elasticsearch": [],
"exasol": setup.exasol,
"ftp": [],
"google": setup.gcp,
"grpc": setup.grpc,
4 changes: 4 additions & 0 deletions docs/autoapi_templates/index.rst
@@ -108,6 +108,8 @@ All operators are in the following packages:

airflow/providers/email/operators/index

airflow/providers/exasol/operators/index

airflow/providers/ftp/sensors/index

airflow/providers/google/ads/operators/index
@@ -247,6 +249,8 @@ All hooks are in the following packages:

airflow/providers/elasticsearch/hooks/index

airflow/providers/exasol/hooks/index

airflow/providers/ftp/hooks/index

airflow/providers/google/ads/hooks/index
2 changes: 2 additions & 0 deletions docs/installation.rst
@@ -157,6 +157,8 @@ Here's the list of the subpackages and what they enable:
+---------------------+-----------------------------------------------------+-----------------------------------------------------------------------------------+
| elasticsearch | ``pip install 'apache-airflow[elasticsearch]'`` | Elasticsearch hooks and Log Handler |
+---------------------+-----------------------------------------------------+-----------------------------------------------------------------------------------+
| exasol | ``pip install 'apache-airflow[exasol]'`` | Exasol hooks and operators |
+---------------------+-----------------------------------------------------+-----------------------------------------------------------------------------------+
| kubernetes | ``pip install 'apache-airflow[kubernetes]'`` | Kubernetes Executor and operator |
+---------------------+-----------------------------------------------------+-----------------------------------------------------------------------------------+
| mongo | ``pip install 'apache-airflow[mongo]'`` | Mongo hooks and operators |
6 changes: 6 additions & 0 deletions docs/operators-and-hooks-ref.rst
@@ -1176,6 +1176,12 @@ These integrations allow you to perform various operations using various softwar
-
-

* - `Exasol <https://docs.exasol.com/home.htm>`__
-
- :mod:`airflow.providers.exasol.hooks.exasol`
- :mod:`airflow.providers.exasol.operators.exasol`
-

* - `GNU Bash <https://www.gnu.org/software/bash/>`__
- :doc:`How to use <howto/operator/bash>`
-