Add airflow package catalog connector (#2437)
ptitzler committed Feb 7, 2022
1 parent ca04453 commit 9de4439
Showing 8 changed files with 425 additions and 3 deletions.
3 changes: 3 additions & 0 deletions MANIFEST.in
@@ -44,3 +44,6 @@ recursive-include etc/config/jupyter_server_config.d *.json
recursive-include etc/config/metadata/runtime-images *.json
recursive-include etc/config/metadata/component-registries *.json
recursive-include etc/config/settings/ *.json

# Include Airflow catalog connector schemas
include elyra/pipeline/airflow/package_catalog_connector/airflow-package-catalog.json
19 changes: 19 additions & 0 deletions elyra/pipeline/airflow/package_catalog_connector/README.md
@@ -0,0 +1,19 @@
### Catalog connector for Apache Airflow packages

This catalog connector enables Elyra to load operator definitions from [Apache Airflow Python packages](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html). Only built distributions ('`.whl`') are supported.

### Use the connector

1. Launch JupyterLab.
1. [Open the '`Manage Components`' panel](https://elyra.readthedocs.io/en/stable/user_guide/pipeline-components.html#managing-custom-components-using-the-jupyterlab-ui).
1. Add a new Airflow package catalog ('`+`' > '`New Apache Airflow package operator catalog`').
1. Specify a catalog name, e.g. '`Airflow 1.10.15 wheel`'.
1. (Optional) Specify a category under which the loaded operators will be organized in the palette.
1. Configure the '`Airflow package download URL`'. The URL must reference a location that Elyra can access using an HTTP GET request, without the need to authenticate; a quick way to verify this is sketched after this list. If the Airflow package is stored on PyPI:
1. Search for the Apache Airflow package on PyPI.
1. Open the package's release history and choose the desired version.
1. Open the `Download files` link.
1. Copy the download link for the package's wheel. ([Example download URL for Apache Airflow 1.10.15](https://files.pythonhosted.org/packages/f0/3a/f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/apache_airflow-1.10.15-py2.py3-none-any.whl))
1. Save the catalog entry.
1. Open the Visual Pipeline Editor and expand the palette. The loaded Apache Airflow operators are displayed.
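
A quick way to confirm that a download URL satisfies the anonymous-access requirement from step 6 is to request it with the same settings the connector uses (30-second timeout, redirects followed, no credentials). The following is a minimal sketch for illustration only; it is not part of this commit:

```python
# Minimal sketch: check that a candidate 'Airflow package download URL'
# is reachable anonymously before saving the catalog entry.
import requests

# Example wheel URL from the steps above.
url = ('https://files.pythonhosted.org/packages/f0/3a/'
       'f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/'
       'apache_airflow-1.10.15-py2.py3-none-any.whl')

assert url.endswith('.whl'), 'only built distributions (wheels) are supported'

# A HEAD request keeps the check cheap; the connector itself issues a GET.
response = requests.head(url, timeout=30, allow_redirects=True)
print(response.status_code)  # 200 means Elyra will be able to download it
```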
15 changes: 15 additions & 0 deletions elyra/pipeline/airflow/package_catalog_connector/__init__.py
@@ -0,0 +1,15 @@
#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
74 changes: 74 additions & 0 deletions elyra/pipeline/airflow/package_catalog_connector/airflow-package-catalog.json
@@ -0,0 +1,74 @@
{
"$schema": "https://raw.githubusercontent.com/elyra-ai/elyra/master/elyra/metadata/schemas/meta-schema.json",
"$id": "https://raw.githubusercontent.com/elyra-ai/elyra/master/elyra/pipeline/airflow/package_catalog_connector/airflow-package-catalog.json",
"title": "Apache Airflow package operator catalog",
"name": "airflow-package-catalog",
"schemaspace": "component-catalogs",
"schemaspace_id": "8dc89ca3-4b90-41fd-adb9-9510ad346620",
"metadata_class_name": "elyra.pipeline.component_metadata.ComponentCatalogMetadata",
"uihints": {
"title": "Apache Airflow core operator catalog",
"icon": "",
"reference_url": "https://github.com/elyra-ai/elyra/tree/master/elyra/pipeline/airflow/package_catalog_connector"
},
"properties": {
"schema_name": {
"title": "Schema Name",
"description": "The schema associated with this instance",
"type": "string",
"const": "airflow-package-catalog"
},
"display_name": {
"title": "Display Name",
"description": "Display name of this Component Catalog",
"type": "string",
"minLength": 1
},
"metadata": {
"description": "Additional data specific to this metadata",
"type": "object",
"properties": {
"description": {
"title": "Description",
"description": "Description of this Component Catalog",
"type": "string",
"default": "Airflow package operator catalog"
},
"runtime_type": {
"title": "Runtime",
"description": "List of runtime types this catalog supports",
"type": "string",
"enum": ["APACHE_AIRFLOW"],
"default": "APACHE_AIRFLOW",
"uihints": {
"field_type": "dropdown"
}
},
"categories": {
"title": "Category Names",
"description": "Assign the operators in the catalog to one or more categories, to group them in the visual pipeline editor palette.",
"type": "array",
"items": {
"type": "string",
"maxLength": 18
},
"default": ["Core packages"],
"uihints": {
"field_type": "array",
"category": "Component Categories"
}
},
"airflow_package_download_url": {
"title": "Airflow package download URL",
"description": "URL where the Apache Airflow package wheel can be downloaded.",
"type": "string",
"uihints": {
"category": "Source"
}
}
},
"required": ["runtime_type", "airflow_package_download_url"]
}
},
"required": ["schema_name", "display_name", "metadata"]
}
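
For orientation, here is a sketch (not part of this commit) of a catalog instance that satisfies the schema's `required` and `const` constraints, validated with the third-party `jsonschema` package; it assumes the schema shown above has been saved locally as `airflow-package-catalog.json`:

```python
# Illustration only: validate a sample catalog instance against the
# 'airflow-package-catalog.json' schema shown above.
import json

from jsonschema import Draft7Validator  # third-party; pip install jsonschema

instance = {
    "schema_name": "airflow-package-catalog",  # must match the 'const'
    "display_name": "Airflow 1.10.15 wheel",
    "metadata": {
        "runtime_type": "APACHE_AIRFLOW",
        "categories": ["Core packages"],       # optional, <= 18 chars each
        "airflow_package_download_url": (
            "https://files.pythonhosted.org/packages/f0/3a/"
            "f5ce74b2bdbbe59c925bb3398ec0781b66a64b8a23e2f6adc7ab9f1005d9/"
            "apache_airflow-1.10.15-py2.py3-none-any.whl"
        ),
    },
}

with open("airflow-package-catalog.json") as schema_file:
    schema = json.load(schema_file)

# Draft7Validator ignores the custom '$schema' URI used by Elyra's
# meta-schema and applies the standard keywords (type, required, const, ...).
Draft7Validator(schema).validate(instance)  # raises ValidationError on failure
print("instance is valid")
```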
@@ -0,0 +1,256 @@
#
# Copyright 2018-2022 Elyra Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import ast
import os
from pathlib import Path
import shutil
from tempfile import mkdtemp
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from urllib.parse import urlparse
import zipfile

import requests

from elyra.pipeline.catalog_connector import ComponentCatalogConnector


class AirflowPackageCatalogConnector(ComponentCatalogConnector):
"""
Provides access to operators that are defined in Apache Airflow wheel archives.
"""

    def get_catalog_entries(self, catalog_metadata: Dict[str, Any]) -> List[Dict[str, Any]]:
        """
Returns a list containing an entry for each Apache Airflow operator that was
detected in the specified Apache Airflow wheel archive.
:param catalog_metadata: contains information needed to download the archive using an HTTP GET request
"""

        # The returned data structure contains a list of the Python scripts in the
        # referenced Apache Airflow wheel that appear to implement at least one operator.
# Each entry defines two keys:
# - 'airflow_package': The name of the Airflow package the user specified.
# This is included for informational purposes only.
# - 'file': Python script name
operator_key_list = []

        # Read the user-supplied 'airflow_package_download_url', which is a required
        # input defined in the 'airflow-package-catalog.json' schema file.
        # Example value: https://archive.apache.org/dist/airflow/1.10.15/apache_airflow-1.10.15-py2.py3-none-any.whl
airflow_package_download_url = catalog_metadata['airflow_package_download_url']
# extract the package name, e.g. 'apache_airflow-1.10.15-py2.py3-none-any.whl'
airflow_package_name = Path(urlparse(airflow_package_download_url).path).name

if not airflow_package_name:
self.log.error('Error. The Airflow package connector is not configured properly. '
f'The package download URL \'{airflow_package_download_url}\' '
'does not include a file name.')
return operator_key_list

# tmp_archive_dir is used to store the downloaded archive and as working directory
if hasattr(self, 'tmp_archive_dir'):
# if the directory exists remove it in case the archive content has changed
shutil.rmtree(self.tmp_archive_dir.name, ignore_errors=True)
self.tmp_archive_dir = Path(mkdtemp())

try:
self.log.debug(f'Downloading Apache Airflow package from \'{airflow_package_download_url}\' ...')

# download archive; abort after 30 seconds
response = requests.get(airflow_package_download_url,
timeout=30,
allow_redirects=True)
if response.status_code != 200:
# download failed. Log error and abort processing
self.log.error('Error. The Airflow package connector is not configured properly. '
f'Download of archive \'{airflow_package_download_url}\' '
f'failed. HTTP response code: {response.status_code}')
return operator_key_list

# save downloaded archive
archive = str(self.tmp_archive_dir / airflow_package_name)
self.log.debug(f'Saving downloaded archive in \'{archive}\' ...')
with open(archive, 'wb') as archive_fh:
archive_fh.write(response.content)

# extract archive
self.log.debug(f'Extracting Airflow archive \'{archive}\' ...')
try:
with zipfile.ZipFile(archive, 'r') as zip_ref:
zip_ref.extractall(self.tmp_archive_dir)
except Exception as ex:
self.log.error('Error. The Airflow package connector is not configured properly. '
f'Error extracting downloaded Airflow archive \'{archive}\': '
f'{ex}')
os.remove(archive)
return operator_key_list

# delete archive
self.log.debug(f'Deleting downloaded Airflow archive \'{archive}\' ...')
os.remove(archive)

# Locate Python scripts that are stored in the 'airflow/operators' directory
python_scripts = [str(s) for s in self.tmp_archive_dir.glob('airflow/operators/*.py')]

#
# Identify Python scripts that define classes that extend the
# airflow.models.BaseOperator class
#
scripts_with_operator_class: List[str] = [] # Python scripts that contain operator definitions
extends_baseoperator: List[str] = [] # Classes that extend BaseOperator
classes_to_analyze = {}
imported_operator_classes: List[str] = [] # Imported operator classes
offset = len(str(self.tmp_archive_dir)) + 1
script_count = 0 # used for stats collection
# process each Python script ...
for script in python_scripts:
script_id = script[offset:]
if script_id == 'airflow/operators/__init__.py':
continue
script_count += 1
self.log.debug(f'Parsing \'{script}\' ...')
with open(script, 'r') as source_code:
# parse source code
tree = ast.parse(source_code.read())
# identify imports and class definitions
for node in ast.walk(tree):
if isinstance(node, ast.Import):
# Need to handle 'import airflow' in the future,
# should this ever surface in operator source code files
pass
elif isinstance(node, ast.ImportFrom):
node_module = node.module
for name in node.names:
                                if node_module == 'airflow.models' and name.name == 'BaseOperator':
imported_operator_classes.append(name.name)
elif isinstance(node, ast.ClassDef):
# determine whether this class extends the BaseOperator class
self.log.debug(f'Analyzing class \'{node.name}\' in {script_id} ...')
                            self.log.debug(f' Class {node.name} extends '
                                           f'{[n.id for n in node.bases if isinstance(n, ast.Name)]}')
# determine whether class extends one of the imported operator classes
if len(node.bases) == 0:
# class does not extend other classes; it therefore does
# not extend the Airflow BaseOperator; skip class
continue
                        for base in node.bases:
                            extends = False
                            # guard against dotted bases (ast.Attribute), which have no 'id'
                            if isinstance(base, ast.Name) and base.id in imported_operator_classes:
# class extends Airflow BaseOperator
extends = True
extends_baseoperator.append(node.name)
if script_id not in scripts_with_operator_class:
scripts_with_operator_class.append(script_id)
break
if extends is False:
# need to further analyze whether this class
# extends Airflow BaseOperator
classes_to_analyze[node.name] = {
'node': node,
'file': script_id
}

# Identify classes that extend BaseOperator by extending
# classes that were identified as extending BaseOperator
# Example:
# class MyBaseOperator(BaseOperator)
# class MyOperator(MyBaseOperator)
analysis_complete = len(classes_to_analyze) == 0
while analysis_complete is False:
# assume that analysis is complete until proven otherwise
analysis_complete = True
for class_name in list(classes_to_analyze.keys()):
self.log.debug(f'Re-analyzing class \'{class_name}\' in '
f"'{classes_to_analyze[class_name]['file']}\'... ")
                    for base in classes_to_analyze[class_name]['node'].bases:
                        if isinstance(base, ast.Name) and base.id in extends_baseoperator:
# this class extends BaseOperator
extends_baseoperator.append(class_name)
if classes_to_analyze[class_name]['file'] not in scripts_with_operator_class:
scripts_with_operator_class.append(classes_to_analyze[class_name]['file'])
# remove class from todo list
del classes_to_analyze[class_name]
# A new class was discovered that implements
# BaseOperator. Analysis needs to be repeated because
# OTHER classes might extend THIS class.
analysis_complete = False
break

# Populate return data structure
for script in scripts_with_operator_class:
operator_key_list.append({
'airflow_package': airflow_package_name,
'file': script})

# Dump stats
            self.log.info(f'Analysis of \'{airflow_package_download_url}\' completed. '
                          f'Located {len(extends_baseoperator)} operator classes '
                          f'in {len(scripts_with_operator_class)} of {script_count} analyzed Python scripts.')
self.log.debug(f'Operator key list: {operator_key_list}')
except Exception as ex:
self.log.error('Error retrieving operator list from Airflow package '
f'{airflow_package_download_url}: {ex}')

return operator_key_list

def read_catalog_entry(self,
catalog_entry_data: Dict[str, Any],
catalog_metadata: Dict[str, Any]) -> Optional[str]:
"""
Fetch the component that is identified by catalog_entry_data from
the downloaded Apache Airflow package.
:param catalog_entry_data: a dictionary that contains the information needed to read the content
of the component definition
:param catalog_metadata: the metadata associated with the catalog in which this catalog entry is
stored; in addition to catalog_entry_data, catalog_metadata may also be
needed to read the component definition for certain types of catalogs
:returns: the content of the given catalog entry's definition in string form
"""
operator_file_name = catalog_entry_data.get('file')

        if not hasattr(self, 'tmp_archive_dir'):
            # log error and return None
            self.log.error('Error. Cannot fetch operator definition. The '
                           'downloaded Airflow package archive was not found.')
            return None

# load operator source using the provided key
operator_source = self.tmp_archive_dir / operator_file_name
self.log.debug(f'Reading operator source \'{operator_source}\' ...')
try:
with open(operator_source, 'r') as source:
return source.read()
except Exception as ex:
self.log.error(f'Error reading operator source \'{operator_source}\': {ex}')

return None

def get_hash_keys(self) -> List[Any]:
"""
        Instructs Elyra to use the specified keys to generate a unique
        hash value for each item returned by get_catalog_entries.
:returns: a list of keys
"""
        # Example key values:
        # - file: 'airflow/operators/bash_operator.py'
return ['file']
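
The heart of `get_catalog_entries()` is the two-phase subclass detection: one AST pass records classes that directly extend the imported `BaseOperator`, and a fixed-point loop then promotes indirect subclasses. The following standalone sketch (illustration only, not part of this commit) shows the technique on a toy source string; it simplifies by treating any name imported from `airflow.models` as an operator base, whereas the connector checks specifically for `BaseOperator`:

```python
# Standalone sketch of the detection technique that get_catalog_entries()
# applies to each script in the wheel: a first AST pass finds direct
# subclasses of the imported base class, then a fixed-point loop promotes
# indirect subclasses, because OTHER classes might extend a class that was
# promoted in THIS sweep.
import ast

SOURCE = '''
from airflow.models import BaseOperator

class MyBaseOperator(BaseOperator):
    pass

class MyOperator(MyBaseOperator):
    pass

class Unrelated:
    pass
'''

imported = set()   # names imported from airflow.models
extends = set()    # classes known (so far) to extend BaseOperator
pending = {}       # class name -> set of base names, not yet classified

for node in ast.walk(ast.parse(SOURCE)):
    if isinstance(node, ast.ImportFrom) and node.module == 'airflow.models':
        imported.update(alias.name for alias in node.names)
    elif isinstance(node, ast.ClassDef):
        # dotted bases (ast.Attribute) are skipped, as in the connector
        bases = {b.id for b in node.bases if isinstance(b, ast.Name)}
        if bases & imported:
            extends.add(node.name)      # direct subclass
        elif bases:
            pending[node.name] = bases  # needs further analysis

changed = True
while changed:                          # sweep until a fixed point is reached
    changed = False
    for name in list(pending):
        if pending[name] & extends:     # extends an already-promoted class
            extends.add(name)
            del pending[name]
            changed = True

print(sorted(extends))  # ['MyBaseOperator', 'MyOperator']
```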
