Skip to content

Commit

Permalink
Add kylin operator (#9149)
Browse files Browse the repository at this point in the history
Co-authored-by: yongheng.liu <yongheng.liu@kyligence.io>
  • Loading branch information
liuyonghengheng and yongheng.liu committed Jul 14, 2020
1 parent ed5004c commit a2c5389
Show file tree
Hide file tree
Showing 24 changed files with 780 additions and 24 deletions.
16 changes: 8 additions & 8 deletions CONTRIBUTING.rst
Expand Up @@ -314,14 +314,14 @@ This is the full list of those extras:
.. START EXTRAS HERE
all_dbs, amazon, apache.atlas, apache_beam, apache.cassandra, apache.druid, apache.hdfs,
apache.hive, apache.pinot, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups,
cloudant, cncf.kubernetes, dask, databricks, datadog, devel, devel_hadoop, doc, docker, druid,
elasticsearch, exasol, facebook, gcp, gcp_api, github_enterprise, google, google_auth, grpc,
hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql,
microsoft.winrm, mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, password, pinot, postgres,
presto, qds, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, singularity, slack,
snowflake, spark, ssh, statsd, tableau, vertica, virtualenv, webhdfs, winrm, yandexcloud, all,
devel_ci
apache.hive, apache.kylin, apache.pinot, apache.webhdfs, async, atlas, aws, azure, cassandra,
celery, cgroups, cloudant, cncf.kubernetes, dask, databricks, datadog, devel, devel_hadoop, doc,
docker, druid, elasticsearch, exasol, facebook, gcp, gcp_api, github_enterprise, google,
google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap, microsoft.azure,
microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, password,
pinot, postgres, presto, qds, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry,
singularity, slack, snowflake, spark, ssh, statsd, tableau, vertica, virtualenv, webhdfs, winrm,
yandexcloud, all, devel_ci

.. END EXTRAS HERE
Expand Down
16 changes: 8 additions & 8 deletions INSTALL
Expand Up @@ -45,14 +45,14 @@ pip install . --constraint requirements/requirements-python3.7.txt
# START EXTRAS HERE

all_dbs, amazon, apache.atlas, apache_beam, apache.cassandra, apache.druid, apache.hdfs,
apache.hive, apache.pinot, apache.webhdfs, async, atlas, aws, azure, cassandra, celery, cgroups,
cloudant, cncf.kubernetes, dask, databricks, datadog, devel, devel_hadoop, doc, docker, druid,
elasticsearch, exasol, facebook, gcp, gcp_api, github_enterprise, google, google_auth, grpc,
hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap, microsoft.azure, microsoft.mssql,
microsoft.winrm, mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, password, pinot, postgres,
presto, qds, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry, singularity, slack,
snowflake, spark, ssh, statsd, tableau, vertica, virtualenv, webhdfs, winrm, yandexcloud, all,
devel_ci
apache.hive, apache.kylin, apache.pinot, apache.webhdfs, async, atlas, aws, azure, cassandra,
celery, cgroups, cloudant, cncf.kubernetes, dask, databricks, datadog, devel, devel_hadoop, doc,
docker, druid, elasticsearch, exasol, facebook, gcp, gcp_api, github_enterprise, google,
google_auth, grpc, hashicorp, hdfs, hive, jdbc, jira, kerberos, kubernetes, ldap, microsoft.azure,
microsoft.mssql, microsoft.winrm, mongo, mssql, mysql, odbc, oracle, pagerduty, papermill, password,
pinot, postgres, presto, qds, rabbitmq, redis, salesforce, samba, segment, sendgrid, sentry,
singularity, slack, snowflake, spark, ssh, statsd, tableau, vertica, virtualenv, webhdfs, winrm,
yandexcloud, all, devel_ci

# END EXTRAS HERE

Expand Down
16 changes: 16 additions & 0 deletions airflow/providers/apache/kylin/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
16 changes: 16 additions & 0 deletions airflow/providers/apache/kylin/example_dags/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
136 changes: 136 additions & 0 deletions airflow/providers/apache/kylin/example_dags/example_kylin_dag.py
@@ -0,0 +1,136 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This is an example DAG which uses the KylinCubeOperator.
The tasks below include kylin build, refresh, merge operation.
"""
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kylin.operators.kylin_cube import KylinCubeOperator
from airflow.utils.dates import days_ago

args = {
'owner': 'airflow',
'start_date': days_ago(1),
}

dag = DAG(
dag_id='example_kylin_operator',
default_args=args,
schedule_interval=None,
tags=['example']
)


def gen_build_time(**kwargs):
"""
gen build time and push to xcom
:param kwargs:
:return:
"""
ti = kwargs['ti']
ti.xcom_push(key='date_start', value='1325347200000')
ti.xcom_push(key='date_end', value='1325433600000')


gen_build_time_task = PythonOperator(
python_callable=gen_build_time,
task_id='gen_build_time',
dag=dag
)

build_task1 = KylinCubeOperator(
task_id="kylin_build_1",
kylin_conn_id='kylin_default',
project='learn_kylin',
cube='kylin_sales_cube',
command='build',
start_time="{{ task_instance.xcom_pull(task_ids='gen_build_time',key='date_start') }}",
end_time="{{ task_instance.xcom_pull(task_ids='gen_build_time',key='date_end') }}",
is_track_job=True,
dag=dag,
)

build_task2 = KylinCubeOperator(
task_id="kylin_build_2",
kylin_conn_id='kylin_default',
project='learn_kylin',
cube='kylin_sales_cube',
command='build',
start_time='1325433600000',
end_time='1325520000000',
is_track_job=True,
dag=dag,
)

refresh_task1 = KylinCubeOperator(
task_id="kylin_refresh_1",
kylin_conn_id='kylin_default',
project='learn_kylin',
cube='kylin_sales_cube',
command='refresh',
start_time='1325347200000',
end_time='1325433600000',
is_track_job=True,
dag=dag,
)

merge_task = KylinCubeOperator(
task_id="kylin_merge",
kylin_conn_id='kylin_default',
project='learn_kylin',
cube='kylin_sales_cube',
command='merge',
start_time='1325347200000',
end_time='1325520000000',
is_track_job=True,
dag=dag,
)

disable_task = KylinCubeOperator(
task_id="kylin_disable",
kylin_conn_id='kylin_default',
project='learn_kylin',
cube='kylin_sales_cube',
command='disable',
dag=dag,
)

purge_task = KylinCubeOperator(
task_id="kylin_purge",
kylin_conn_id='kylin_default',
project='learn_kylin',
cube='kylin_sales_cube',
command='purge',
dag=dag,
)

build_task3 = KylinCubeOperator(
task_id="kylin_build_3",
kylin_conn_id='kylin_default',
project='learn_kylin',
cube='kylin_sales_cube',
command='build',
start_time='1325433600000',
end_time='1325520000000',
dag=dag,
)

gen_build_time_task >> build_task1 >> build_task2 >> refresh_task1 >> merge_task
merge_task >> disable_task >> purge_task >> build_task3
16 changes: 16 additions & 0 deletions airflow/providers/apache/kylin/hooks/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
77 changes: 77 additions & 0 deletions airflow/providers/apache/kylin/hooks/kylin.py
@@ -0,0 +1,77 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from typing import Optional

from kylinpy import exceptions, kylinpy

from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook


class KylinHook(BaseHook):
"""
:param kylin_conn_id: The connection id as configured in Airflow administration.
:type kylin_conn_id: str
:param project: porject name
:type project: Optional[str]
:param dsn: dsn
:type dsn: Optional[str]
"""
def __init__(self,
kylin_conn_id: Optional[str] = 'kylin_default',
project: Optional[str] = None,
dsn: Optional[str] = None
):
super().__init__()
self.kylin_conn_id = kylin_conn_id
self.project = project
self.dsn = dsn

def get_conn(self):
conn = self.get_connection(self.kylin_conn_id)
if self.dsn:
return kylinpy.create_kylin(self.dsn)
else:
self.project = self.project if self.project else conn.schema
return kylinpy.Kylin(conn.host, username=conn.login,
password=conn.password, port=conn.port,
project=self.project, **conn.extra_dejson)

def cube_run(self, datasource_name, op, **op_args):
"""
run CubeSource command whitch in CubeSource.support_invoke_command
:param datasource_name:
:param op: command
:param op_args: command args
:return: response
"""
cube_source = self.get_conn().get_datasource(datasource_name)
try:
response = cube_source.invoke_command(op, **op_args)
return response
except exceptions.KylinError as err:
raise AirflowException("Cube operation {} error , Message: {}".format(op, err))

def get_job_status(self, job_id):
"""
get job status
:param job_id: kylin job id
:return: job status
"""
return self.get_conn().get_job(job_id).status
16 changes: 16 additions & 0 deletions airflow/providers/apache/kylin/operators/__init__.py
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

0 comments on commit a2c5389

Please sign in to comment.