Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update kfp component files to include optional parameter #1854

Merged
merged 5 commits into from Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
37 changes: 37 additions & 0 deletions LICENSE
Expand Up @@ -241,3 +241,40 @@ OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

-------------

Various Kubeflow Pipelines component examples (elyra/pipeline/resources/kfp)

Copyright 2021 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-------------

Kubeflow Pipelines kfserving example (elyra/pipeline/resources/kfp)

# Copyright 2019 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

-------------
21 changes: 10 additions & 11 deletions elyra/pipeline/component.py
Expand Up @@ -242,7 +242,7 @@ def type(self) -> str:
return self.type

@abstractmethod
def read_component_definition(self, component_id: str, location: str) -> str:
def read_component_definition(self, registry_entry: dict) -> str:
raise NotImplementedError()


Expand All @@ -252,13 +252,12 @@ class FilesystemComponentReader(ComponentReader):
"""
type = 'filename'

def read_component_definition(self, component_id: str, location: str) -> str:
component_location = os.path.join(os.path.dirname(__file__), location)
if not os.path.exists(component_location):
self.log.error(f'Invalid location for component: {component_id} -> {component_location}')
raise FileNotFoundError(f'Invalid location for component: {component_id} -> {component_location}')
def read_component_definition(self, registry_entry: dict) -> str:
if not os.path.exists(registry_entry.location):
self.log.error(f'Invalid location for component: {registry_entry.id} -> {registry_entry.location}')
raise FileNotFoundError(f'Invalid location for component: {registry_entry.id} -> {registry_entry.location}')

with open(component_location, 'r') as f:
with open(registry_entry.location, 'r') as f:
return f.read()


Expand All @@ -268,11 +267,11 @@ class UrlComponentReader(ComponentReader):
"""
type = 'url'

def read_component_definition(self, component_id: str, location: str) -> str:
res = requests.get(location)
def read_component_definition(self, registry_entry: dict) -> str:
res = requests.get(registry_entry.location)
if res.status_code != HTTPStatus.OK:
self.log.error (f'Invalid location for component: {component_id} -> {location} (HTTP code {res.status_code})') # noqa: E211 E501
raise FileNotFoundError (f'Invalid location for component: {component_id} -> {location} (HTTP code {res.status_code})') # noqa: E211 E501
self.log.error (f'Invalid location for component: {registry_entry.id} -> {registry_entry.location} (HTTP code {res.status_code})') # noqa: E211 E501
raise FileNotFoundError (f'Invalid location for component: {registry_entry.id} -> {registry_entry.location} (HTTP code {res.status_code})') # noqa: E211 E501

return res.text

Expand Down
9 changes: 1 addition & 8 deletions elyra/pipeline/component_parser_airflow.py
Expand Up @@ -14,7 +14,6 @@
# limitations under the License.
#
import ast
import os
import re
from typing import List

Expand All @@ -40,11 +39,6 @@ def parse(self, registry_entry) -> List[Component]:

component_definition = self._read_component_definition(registry_entry)

# Adjust filename for display on frontend
if registry_entry.type == "filename":
registry_entry.location = os.path.join(os.path.dirname(__file__),
registry_entry.location)

# If id is prepended with elyra_op_, only parse for the class specified in the id.
# Else, parse the component definition for all classes
if registry_entry.adjusted_id:
Expand Down Expand Up @@ -188,7 +182,6 @@ def _read_component_definition(self, registry_entry):
Delegate to ComponentReader to read component definition
"""
reader = self._get_reader(registry_entry)
component_definition = \
reader.read_component_definition(registry_entry.id, registry_entry.location)
component_definition = reader.read_component_definition(registry_entry)

return component_definition
10 changes: 1 addition & 9 deletions elyra/pipeline/component_parser_kfp.py
Expand Up @@ -13,15 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
from typing import List
kiersten-stokes marked this conversation as resolved.
Show resolved Hide resolved

import yaml

from elyra.pipeline.component import Component
from elyra.pipeline.component import ComponentParser
from elyra.pipeline.component import ComponentProperty
from elyra.pipeline.component import FilesystemComponentReader


class KfpComponentParser(ComponentParser):
Expand All @@ -36,11 +34,6 @@ def get_adjusted_component_id(self, component_id):
def parse(self, registry_entry) -> List[Component]:
component_yaml = self._read_component_yaml(registry_entry)

# Adjust filename for display on frontend
if registry_entry.type == FilesystemComponentReader.type:
registry_entry.location = os.path.join(os.path.dirname(__file__),
registry_entry.location)

description = ""
if component_yaml.get('description'):
description = ' '.join(component_yaml.get('description').split())
Expand Down Expand Up @@ -136,8 +129,7 @@ def _read_component_yaml(self, registry_entry):
"""
try:
reader = self._get_reader(registry_entry)
component_definition = \
reader.read_component_definition(registry_entry.id, registry_entry.location)
component_definition = reader.read_component_definition(registry_entry)

return yaml.safe_load(component_definition)
except yaml.YAMLError as e:
Expand Down
14 changes: 13 additions & 1 deletion elyra/pipeline/component_registry.py
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
#
import json
import os
import time
from types import SimpleNamespace
from typing import Dict
Expand Down Expand Up @@ -145,17 +146,28 @@ def _read_component_registry(self) -> List:
self.log.debug(f"Component registry: processing component {component_entry.get('name')}")

component_type = next(iter(component_entry.get('location')))
component_location = self._get_absolute_location(component_type,
component_entry["location"][component_type])
entry = {
"id": component_id,
"name": component_entry["name"],
"type": component_type,
"location": component_entry["location"][component_type],
"location": component_location,
"adjusted_id": None
}
component_entries.append(SimpleNamespace(**entry))

return component_entries

def _get_absolute_location(self, component_type: str, component_path: str):
    """
    Resolve the location of a component listed in the registry.

    Entries whose type is "filename" are joined onto the directory of this
    module, under ``resources/<parser type>``. Entries of any other type
    are handed back exactly as given.

    :param component_type: location type taken from the registry entry
    :param component_path: location value taken from the registry entry
    :return: the resolved location string
    """
    # Only file-based entries are stored relative to the package resources.
    if component_type != "filename":
        return component_path

    resource_subdir = "resources/" + self._parser._type
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, resource_subdir, component_path)

def _get_component_registry_entry(self, component_id):
"""
Get the body of the component registry entry with the given id
Expand Down
1 change: 1 addition & 0 deletions elyra/pipeline/parser.py
Expand Up @@ -264,6 +264,7 @@ def _get_remaining_component_params(self, node: Dict):
if key in ["filename", "runtime_image", "cpu", "gpu", "memory", "dependencies", "env_vars", "outputs",
"include_subdirectories", "ui_data", "component_source", "component_source_type", "label",
"image", "description", "properties", "invalidNodeError", "runtime"]:

continue

component_params[key] = value
Expand Down
157 changes: 157 additions & 0 deletions elyra/pipeline/resources/airflow/slack_operator.py
@@ -0,0 +1,157 @@
# flake8: noqa
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.hooks.slack_hook import SlackHook
from airflow.exceptions import AirflowException


class SlackAPIOperator(BaseOperator):
    """
    Common base for operators that invoke a Slack API method.

    SlackAPIPostOperator derives from this class. Exactly one of
    ``token`` and ``slack_conn_id`` has to be supplied; the constructor
    raises ``AirflowException`` when neither or both are given.

    :param slack_conn_id: Slack connection ID which its password is Slack API token
    :type slack_conn_id: str
    :param token: Slack API token (https://api.slack.com/web)
    :type token: str
    :param method: The Slack API Method to Call (https://api.slack.com/methods)
    :type method: str
    :param api_params: API Method call parameters (https://api.slack.com/methods)
    :type api_params: dict
    """

    @apply_defaults
    def __init__(self, slack_conn_id=None, token=None, method=None, api_params=None, *args, **kwargs):
        super().__init__(*args, **kwargs)

        has_token = token is not None
        has_conn_id = slack_conn_id is not None

        # The credential source must be unambiguous: one, and only one.
        if not has_token and not has_conn_id:
            raise AirflowException('No valid Slack token nor slack_conn_id supplied.')
        if has_token and has_conn_id:
            raise AirflowException('Cannot determine Slack credential '
                                   'when both token and slack_conn_id are supplied.')

        self.token = token
        self.slack_conn_id = slack_conn_id
        self.method = method
        self.api_params = api_params

    def construct_api_call_params(self):
        """
        Hook for subclasses to populate ``self.api_params``.

        Called by ``execute`` when no ``api_params`` were set, which lets
        subclasses build the parameter dict from their (templated) source
        fields just before the call. The base implementation does nothing;
        each child class is expected to override it and assign a dict of
        API call parameters (https://api.slack.com/methods).
        """

    def execute(self, **kwargs):
        """
        Invoke the configured Slack API method through a ``SlackHook``.

        When ``self.api_params`` is empty or unset,
        ``construct_api_call_params`` is run first to fill it in.

        NOTE(review): upstream documentation states that an unsuccessful
        Slack call is not meant to fail the DAG; whether that holds depends
        on ``SlackHook.call`` -- confirm.
        """
        if not self.api_params:
            self.construct_api_call_params()

        hook = SlackHook(token=self.token, slack_conn_id=self.slack_conn_id)
        hook.call(self.method, self.api_params)


class SlackAPIPostOperator(SlackAPIOperator):
    """
    Operator that posts a message to a Slack channel via ``chat.postMessage``.

    :param slack_conn_id: Slack connection ID which its password is Slack API token
    :type slack_conn_id: str
    :param token: Slack API token (https://api.slack.com/web)
    :type token: str
    :param method: The Slack API Method to Call (https://api.slack.com/methods)
    :type method: str
    :param api_params: API Method call parameters (https://api.slack.com/methods)
    :type api_params: dict
    :param channel: channel in which to post message on slack name (#general) or
        ID (C12318391). (templated)
    :type channel: str
    :param username: Username that airflow will be posting to Slack as. (templated)
    :type username: str
    :param text: message to send to slack. (templated)
    :type text: str
    :param icon_url: url to icon used for this message
    :type icon_url: str
    :param attachments: extra formatting details. (templated)
        - see https://api.slack.com/docs/attachments.
    :type attachments: list of hashes
    :param blocks: extra block layouts. (templated)
        - see https://api.slack.com/reference/block-kit/blocks.
    :type blocks: list of hashes
    """

    template_fields = ('username', 'text', 'attachments', 'blocks', 'channel')
    ui_color = '#FFBA40'

    @apply_defaults
    def __init__(self,
                 slack_conn_id=None,
                 token=None,
                 api_params=None,
                 channel='#general',
                 username='Airflow',
                 text='No message has been set.\n'
                      'Here is a cat video instead\n'
                      'https://www.youtube.com/watch?v=J---aiyznGQ',
                 icon_url='https://raw.githubusercontent.com/apache/'
                          'airflow/master/airflow/www/static/pin_100.png',
                 attachments=None,
                 blocks=None,
                 *args, **kwargs):
        # Message fields are assigned before delegating to the parent
        # constructor, in the same order as they are declared above.
        self.method = 'chat.postMessage'
        self.channel = channel
        self.username = username
        self.text = text
        self.icon_url = icon_url
        # Fall back to fresh empty lists when nothing (or something falsy) is given.
        self.attachments = attachments or []
        self.blocks = blocks or []

        super().__init__(
            *args,
            method=self.method,
            token=token,
            slack_conn_id=slack_conn_id,
            api_params=api_params,
            **kwargs
        )

    def construct_api_call_params(self):
        """
        Build ``self.api_params`` for ``chat.postMessage`` from the message fields.

        ``attachments`` and ``blocks`` are serialised to JSON strings; the
        remaining fields are passed through as they are.
        """
        params = {}
        params['channel'] = self.channel
        params['username'] = self.username
        params['text'] = self.text
        params['icon_url'] = self.icon_url
        params['attachments'] = json.dumps(self.attachments)
        params['blocks'] = json.dumps(self.blocks)
        self.api_params = params
@@ -0,0 +1,41 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Component source location: https://raw.githubusercontent.com/kubeflow/pipelines/master/components/sample/Shell_script/component.yaml
# Component details: Takes a text file and a regex pattern filter to produce a filtered text file
name: Filter text
akchinSTC marked this conversation as resolved.
Show resolved Hide resolved
inputs:
- {name: Text, optional: false, description: 'Path to file to be filtered'}
- {name: Pattern, optional: true, default: '.*', description: 'Regex pattern'}
outputs:
- {name: Filtered text}
metadata:
annotations:
author: Alexey Volkov <alexey.volkov@ark-kun.com>
implementation:
container:
image: alpine
command:
- sh
- -ec
- |
text_path=$0
pattern=$1
filtered_text_path=$2
mkdir -p "$(dirname "$filtered_text_path")"

grep "$pattern" < "$text_path" > "$filtered_text_path"
- {inputPath: Text}
- {inputValue: Pattern}
- {outputPath: Filtered text}