Skip to content

Commit

Permalink
Merge pull request #68 from grycap/dev-calarcon
Browse files Browse the repository at this point in the history
dCache integration as input storage provider
  • Loading branch information
catttam committed Mar 14, 2023
2 parents 44f6fcc + 69f1eda commit 47337f1
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 13 deletions.
20 changes: 18 additions & 2 deletions faassupervisor/events/__init__.py
Expand Up @@ -38,16 +38,16 @@
from faassupervisor.events.minio import MinioEvent
from faassupervisor.events.onedata import OnedataEvent
from faassupervisor.events.s3 import S3Event
from faassupervisor.events.dCache import DCacheEvent
from faassupervisor.events.unknown import UnknownEvent
from faassupervisor.logger import get_logger
from faassupervisor.exceptions import exception, UnknowStorageEventWarning
from faassupervisor.utils import SysUtils
from faassupervisor.utils import SysUtils, ConfigUtils

_S3_EVENT = "aws:s3"
_MINIO_EVENT = "minio:s3"
_ONEDATA_EVENT = "OneTrigger"


def _is_api_gateway_event(event_info):
return 'httpMethod' in event_info

Expand All @@ -65,6 +65,8 @@ def _is_storage_event(event_info):
def _is_delegated_event(event_info):
return 'event' in event_info

def _is_dcache_event(event_info):
return 'event' in event_info and 'subscription' in event_info

@exception()
def _parse_storage_event(event, storage_provider='default'):
Expand All @@ -82,6 +84,16 @@ def _parse_storage_event(event, storage_provider='default'):
raise UnknowStorageEventWarning()
return parsed_event

def _parse_dcache_event(event, storage_provider='dcache'):
input_values = ConfigUtils.read_cfg_var('input')
provider = list(filter(lambda x: (x['storage_provider'] == 'webdav.dcache'),input_values))
input_path = ''
if len(provider): input_path = provider[0]['path']
else: get_logger().warning('There is no dcache input defined for this function.')
parsed_event = DCacheEvent(event, storage_provider)
parsed_event.set_path(input_path)
get_logger().info("DCACHE event created")
return parsed_event

def _set_storage_env_vars(parsed_event, event):
# Store 'object_key' in environment variable
Expand All @@ -107,6 +119,10 @@ def parse_event(event, storage_provider="default"):
get_logger().info("API Gateway event found.")
parsed_event = ApiGatewayEvent(event)
return parse_event(parsed_event.body)
if _is_dcache_event(event):
get_logger().info("Dcache event found.")
parsed_event = _parse_dcache_event(event)
return parsed_event
if _is_delegated_event(event):
get_logger().info("Delegated event found.")
if 'storage_provider' in event:
Expand Down
38 changes: 38 additions & 0 deletions faassupervisor/events/dCache.py
@@ -0,0 +1,38 @@
# Copyright (C) GRyCAP - I3M - UPV
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""dCache event example:
{"event":
{"name":"image2.jpg",
"mask":["IN_CREATE"]},
"subscription":"https://prometheus.desy.de:3880/api/v1/events/channels/oyGcraV_6abmXQU0_yMApQ/subscriptions/inotify/AACvM"}
"""

from faassupervisor.events.unknown import UnknownEvent
from faassupervisor.utils import SysUtils

class DCacheEvent(UnknownEvent):
"""Class to parse the dCache event."""

_TYPE = 'DCACHE'

def __init__(self, event, provider_id='dcache'):
super().__init__(event.get('event') or event)
self.provider_id = provider_id

def _set_event_params(self):
self.file_name = self.event['name']
self.event_time = None

def set_path(self, path):
self.object_key = SysUtils.join_paths(path, self.file_name)
3 changes: 2 additions & 1 deletion faassupervisor/events/unknown.py
Expand Up @@ -17,7 +17,7 @@
import base64
import uuid
from faassupervisor.utils import SysUtils, FileUtils

from faassupervisor.logger import get_logger

class UnknownEvent():
"""Class to manage unknown events."""
Expand All @@ -31,6 +31,7 @@ def __init__(self, event):
records = event.get('Records')
if records:
self.event_records = records[0]
get_logger().info("event_records = %s \n", self.event_records)
self._set_event_params()

def _set_event_params(self):
Expand Down
3 changes: 3 additions & 0 deletions faassupervisor/storage/config.py
Expand Up @@ -189,6 +189,9 @@ def _get_input_auth_data(self, parsed_event):
This methods allows to filter ONEDATA provider when multiple inputs are defined."""
storage_type = parsed_event.get_type()
# Change storage type for dCache events to use WebDav
if storage_type == 'DCACHE':
storage_type = 'WEBDAV'
if storage_type == 'ONEDATA':
# Check input path and event object_key
if hasattr(parsed_event, 'object_key'):
Expand Down
2 changes: 1 addition & 1 deletion faassupervisor/storage/providers/__init__.py
Expand Up @@ -53,5 +53,5 @@ def upload_file(self, file_path, file_name, output_path):

def get_type(self):
"""Returns the storage type.
Can be LOCAL, MINIO, ONEDATA, S3."""
Can be LOCAL, MINIO, ONEDATA, S3, WEBDAV."""
return self._TYPE
25 changes: 22 additions & 3 deletions faassupervisor/storage/providers/webdav.py
@@ -1,8 +1,26 @@
# Copyright (C) GRyCAP - I3M - UPV
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Module containing all the classes and methods
related with the WebDav storage provider. """

from faassupervisor.storage.providers import DefaultStorageProvider
from webdav3.client import Client

from faassupervisor.utils import SysUtils

class WebDav(DefaultStorageProvider):
"""Class that manages downloads and uploads from providers that use WebDav."""

_TYPE = "WEBDAV"

def __init__(self, stg_auth):
Expand All @@ -18,9 +36,10 @@ def _get_client(self):
}
return Client(options=options)

# a webdav storage provider as input is not suported but the method has to be created
def download_file(self, parsed_event, input_dir_path):
pass
file_download_path = SysUtils.join_paths(input_dir_path, parsed_event.file_name)
self.client.download_sync(remote_path=parsed_event.object_key, local_path=file_download_path)
return file_download_path

def upload_file(self, file_path, file_name, output_path):
if self.client.check(output_path):
Expand Down
10 changes: 5 additions & 5 deletions faassupervisor/supervisor.py
Expand Up @@ -66,12 +66,12 @@ def _parse_input(self):
but one event always represents only one file (so far), so only
one provider is going to be used for each event received.
"""
# Parse the 'download_input' config var
download_input = ConfigUtils.read_cfg_var('file_stage_in')
if download_input == '':
download_input = True
# Parse the 'file_stage_in' config var
skip_download = ConfigUtils.read_cfg_var('file_stage_in')
if skip_download == '':
skip_download = False
# Parse input file
if download_input is False:
if skip_download is True:
get_logger().info('Skipping download of input file.')
else:
input_file_path = self.stg_config.download_input(self.parsed_event,
Expand Down
2 changes: 1 addition & 1 deletion faassupervisor/version.py
Expand Up @@ -14,4 +14,4 @@
"""Stores the package version."""


__version__ = '1.5.6'
__version__ = '1.5.7'
14 changes: 14 additions & 0 deletions test/unit/faassupervisor/events.py
Expand Up @@ -22,6 +22,7 @@
from faassupervisor.events.minio import MinioEvent
from faassupervisor.events.onedata import OnedataEvent
from faassupervisor.events.unknown import UnknownEvent
from faassupervisor.events.dCache import DCacheEvent
from faassupervisor.events.apigateway import ApiGatewayEvent

# pylint: disable=missing-docstring
Expand Down Expand Up @@ -71,6 +72,9 @@
'isBase64Encoded': False,
'queryStringParameters': {'q1':'v1', 'q2':'v2'}}

DCACHE_EVENT = {"event": {"name":"image2.jpg",
"mask":["IN_CREATE"]},
"subscription":"https://prometheus.desy.de:3880/api/v1/events/channels/oyGcraV_6abmXQU0_yMApQ/subscriptions/inotify/AAC"}

class EventModuleTest(unittest.TestCase):

Expand Down Expand Up @@ -98,6 +102,10 @@ def test_parse_storage_event_onedata(self):
result = events._parse_storage_event(ONEDATA_EVENT)
self.assertIsInstance(result, OnedataEvent)

def test_parse_event_dcache(self):
result = events.parse_event(DCACHE_EVENT)
self.assertIsInstance(result, DCacheEvent)

def test_parse_storage_event_unknown(self):
result = events._parse_storage_event(UNKNOWN_EVENT)
self.assertIsNone(result)
Expand Down Expand Up @@ -204,6 +212,12 @@ def test_s3_event_creation(self):
self.assertEqual(event.file_name, "dog.jpg")
self.assertEqual(event.get_type(), "S3")

class DCacheEventTest(unittest.TestCase):

def test_dcache_event_creation(self):
event = DCacheEvent(DCACHE_EVENT)
self.assertEqual(event.file_name, "image2.jpg")
self.assertEqual(event.get_type(), "DCACHE")

class UnknownEventTest(unittest.TestCase):

Expand Down

0 comments on commit 47337f1

Please sign in to comment.