Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance loader plugin #17

Merged
merged 24 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
d340d87
add new parameter databus_base_url
msaipraneeth Aug 23, 2023
a78167e
add new parameter DatabusSearch type
msaipraneeth Aug 28, 2023
898d48c
add FacetSearch parameter type
msaipraneeth Aug 28, 2023
b72b6e2
add parameter artifact version
msaipraneeth Aug 30, 2023
a9c67fd
filter databus file based on format and version
msaipraneeth Aug 30, 2023
edaae53
fix lint issues
msaipraneeth Aug 30, 2023
3bccaa7
rename parameter name to databus artifact
msaipraneeth Aug 30, 2023
9ec68fe
switch to user context instead of super user context
msaipraneeth Aug 30, 2023
28fe656
update query to remove filtering of file from latest version
msaipraneeth Sep 1, 2023
be52706
add new resource parameter type
msaipraneeth Sep 3, 2023
aa95baa
use create resource to upload databus file to project
msaipraneeth Sep 3, 2023
9303c22
optimze imports
msaipraneeth Sep 3, 2023
f532376
update dependency
msaipraneeth Sep 3, 2023
309a81d
add query term to option, if that is not part of project files
msaipraneeth Sep 4, 2023
aae68e1
enable dynamic versioning
msaipraneeth Sep 4, 2023
c982c07
filter option based on query terms
msaipraneeth Sep 4, 2023
947e1ec
fix linter issues
msaipraneeth Sep 4, 2023
9e4a001
default chunk_size
msaipraneeth Sep 10, 2023
b174ab9
add test for loader
msaipraneeth Sep 10, 2023
bba0f05
fix label for Databus File ID parameter
msaipraneeth Sep 11, 2023
234cc57
minor modifications to test Constants
msaipraneeth Sep 11, 2023
d474534
extend documentation, sort auto-complete values, set default values
seebi Sep 11, 2023
3ec9f1d
add plugin_id=cmem-plugin-databus-Download
msaipraneeth Sep 12, 2023
cecf783
fix typo in plugin_id
msaipraneeth Sep 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
329 changes: 282 additions & 47 deletions cmem_plugin_databus/loader.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,313 @@
"""Plugin for loading one file from the databus and write it ino a dataset"""
from typing import Any, Optional

import requests
from cmem.cmempy.workspace.tasks import get_task
from cmem_plugin_base.dataintegration.context import ExecutionContext, ExecutionReport
from cmem.cmempy.workspace.projects.resources import get_resources
from cmem.cmempy.workspace.projects.resources.resource import create_resource
from cmem_plugin_base.dataintegration.context import (
ExecutionContext,
ExecutionReport,
PluginContext
)
from cmem_plugin_base.dataintegration.description import Plugin, PluginParameter
from cmem_plugin_base.dataintegration.parameter.dataset import DatasetParameterType
from cmem_plugin_base.dataintegration.plugins import WorkflowPlugin
from cmem_plugin_base.dataintegration.utils import setup_cmempy_super_user_access
from cmem_plugin_base.dataintegration.types import StringParameterType, Autocompletion
from cmem_plugin_base.dataintegration.utils import setup_cmempy_user_access

from cmem_plugin_databus.utils import (
byte_iterator_context_update,
get_clock,
fetch_api_search_result, fetch_facets_options, fetch_databus_files,
)
from cmem_plugin_databus.cmem_wrappers import post_streamed_bytes


class DatabusSearch(StringParameterType):
    """Parameter type that auto-completes Databus artifact URLs via the search API."""

    autocompletion_depends_on_parameters: list[str] = ["databus_base_url"]

    # only values coming from the auto-completion list may be used
    allow_only_autocompleted_values: bool = True
    # completions carry a human-readable label alongside the value
    autocomplete_value_with_labels: bool = True

    def autocomplete(
        self,
        query_terms: list[str],
        depend_on_parameter_values: list[Any],
        context: PluginContext,
    ) -> list[Autocompletion]:
        """Query the Databus search endpoint and return matching artifacts.

        Without query terms there is nothing to search for yet, so a single
        hint entry is returned instead of hitting the API.
        """
        if not query_terms:
            label = "Search for Databus artifacts"
            return [Autocompletion(value="", label=f"{label}")]

        base_url = depend_on_parameter_values[0]
        search_hits = fetch_api_search_result(
            databus_base=base_url,
            url_parameters={
                "query": " ".join(query_terms),
                "typeNameWeight": 0,
                "minRelevance": 20,
                "maxResults": 25,
                "typeName": " Artifact",
            },
        )
        completions = []
        for hit in search_hits:
            completions.append(
                Autocompletion(value=f"{hit.resource}", label=f"{hit.label}")
            )
        return completions


class FacetSearch(StringParameterType):
    """Parameter type that auto-completes one facet (e.g. format, version) of an artifact."""

    autocompletion_depends_on_parameters: list[str] = [
        "databus_base_url",
        "databus_artifact"
    ]

    # only values coming from the auto-completion list may be used
    allow_only_autocompleted_values: bool = True
    # plain values, no extra labels
    autocomplete_value_with_labels: bool = False

    def __init__(self, facet_option: str):
        # which facet of the artifact to complete, e.g. "format" or "version"
        self.facet_option = facet_option

    def autocomplete(
        self,
        query_terms: list[str],
        depend_on_parameter_values: list[Any],
        context: PluginContext,
    ) -> list[Autocompletion]:
        """Fetch the facet options of the selected artifact and filter by the query."""
        base_url = depend_on_parameter_values[0]
        artifact_uri = depend_on_parameter_values[1]
        facet_options = fetch_facets_options(
            databus_base=base_url,
            url_parameters={"type": "artifact", "uri": artifact_uri},
        )
        values = facet_options[self.facet_option]
        # versions are listed newest-first; every other facet alphabetically
        newest_first = self.facet_option == "version"
        values = sorted(values, reverse=newest_first)

        completions = [
            Autocompletion(value=f"{value}", label=f"{value}") for value in values
        ]
        if query_terms:
            term = query_terms[0]
            completions = [c for c in completions if c.value.find(term) > -1]

        return completions


class ResourceParameterType(StringParameterType):
    """Parameter type that auto-completes project file resources.

    If the query matches no existing resource, the query term itself is
    offered as the name for a new resource."""

    # only values coming from the auto-completion list may be used
    allow_only_autocompleted_values: bool = True

    # completions carry a human-readable label alongside the value
    autocomplete_value_with_labels: bool = True

    # optional file type restriction (stored only; not used for filtering here)
    file_type: Optional[str] = None

    def __init__(self, file_type: Optional[str] = None):
        """Dataset parameter type."""
        self.file_type = file_type

    def autocomplete(
        self,
        query_terms: list[str],
        depend_on_parameter_values: list[Any],
        context: PluginContext,
    ) -> list[Autocompletion]:
        """List the project's file resources, filtered by the first query term."""
        setup_cmempy_user_access(context.user)
        completions = [
            Autocompletion(value=f"{res['fullPath']}", label=f"{res['name']}")
            for res in get_resources(context.project_id)
        ]
        if query_terms:
            term = query_terms[0]
            completions = [c for c in completions if c.value.find(term) > -1]
            if not completions:
                # nothing matched: offer the query term as a new resource name
                completions = [
                    Autocompletion(
                        value=f"{term}",
                        label=f"{term} (New resource)"
                    )
                ]

        return completions


class DatabusFile(StringParameterType):
    """Parameter type that auto-completes concrete Databus file IDs.

    The candidate list is narrowed by the already selected base URL,
    artifact, format and version."""

    autocompletion_depends_on_parameters: list[str] = [
        "databus_base_url",
        "databus_artifact",
        "artifact_format",
        "artifact_version"
    ]

    # only values coming from the auto-completion list may be used
    allow_only_autocompleted_values: bool = True
    # plain values, no extra labels
    autocomplete_value_with_labels: bool = False

    def autocomplete(
        self,
        query_terms: list[str],
        depend_on_parameter_values: list[Any],
        context: PluginContext,
    ) -> list[Autocompletion]:
        """Query the Databus for files of the selected artifact/format/version."""
        base_url = depend_on_parameter_values[0]
        artifact = depend_on_parameter_values[1]
        file_format = depend_on_parameter_values[2]
        version = depend_on_parameter_values[3]
        files = fetch_databus_files(
            endpoint=base_url,
            artifact=artifact,
            version=version,
            file_format=file_format
        )
        completions = []
        for row in files:
            variant = row["variant"]["value"]
            # variants come back with a leading ", " separator — strip it once
            if variant.startswith(", "):
                variant = variant.replace(", ", "", 1)
            completions.append(
                Autocompletion(
                    value=f"{row['file']['value']}",
                    label=f'Version={row["version"]["value"]}, '
                    f'Variant={variant}, '
                    f'Format={row["format"]["value"]}, '
                    f'Compression={row["compression"]["value"]}, '
                    f'Size={row["size"]["value"]} Bytes',
                )
            )
        return completions


class ResponseStream:
    """A context manager that yields the content of an HTTP response in chunks.

    Streams the response body piece by piece so the full payload never has to
    be held in memory at once. Entering the context returns an iterator of
    byte chunks."""

    def __init__(self, response, chunk_size=1048576):
        # the HTTP response object to stream from (must provide iter_content)
        self.response = response
        # chunk size in bytes (default 1 MiB)
        self.chunk_size = chunk_size

    def __enter__(self):
        return self._read()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # no cleanup needed; the response lifetime is managed by the caller
        pass

    def _read(self):
        # generator over the response body, one chunk at a time
        yield from self.response.iter_content(chunk_size=self.chunk_size)


@Plugin(
label="Simple Databus Loading Plugin",
description="Loads a specfic file from the Databus to a local directory",
documentation="""
This CMEM task loads a file from the defined Databus to a RDF dataset.
""",
label="Download File from a DBpedia Databus",
plugin_id="cmem_plugin_databus-Download",
description="Download a file artifact listed in a Databus Catalog.",
documentation="This workflow task allows for selecting and downloading a file"
" artifact from a DBpedia Databus to a Corporate Memory dataset"
" resource.",
parameters=[
PluginParameter(
name="target_graph",
label="Target Graph",
description="Graph name to save the response from the Databus.",
param_type=DatasetParameterType(dataset_type="eccencaDataPlatform"),
name="databus_base_url",
label="Databus Base URL",
description="The deployment URL of a Databus service,"
" e.g. https://databus.dbpedia.org/",
),
PluginParameter(
name="databus_artifact",
label="Artifact URL",
description="The URL of the Databus artifact. You can search by name.",
param_type=DatabusSearch(),
default_value=""
),
PluginParameter(
name="artifact_format",
label="Format",
description="The format of the Databus artifact. You can select the"
" format, after you have a proper Artifact URL selected.",
param_type=FacetSearch(facet_option="format"),
default_value=""
),
PluginParameter(
name="artifact_version",
label="Version",
description="The version of Databus artifact. You can select the"
" version, after you have a proper Artifact URL selected.",
param_type=FacetSearch(facet_option="version"),
default_value=""
),
PluginParameter(
name="databus_file_id",
label="Databus File ID",
description="The Databus file id of the file to download",
description="The Databus file ID of the file to download.",
param_type=DatabusFile()
),
PluginParameter(
name="target_file",
label="File",
description="File name where you want to save the dowloaded file"
" from the Databus.",
param_type=ResourceParameterType(),
),
PluginParameter(
name="chunk_size",
label="Chunk Size",
description="Chunksize during up/downloading the graph",
description="Chunksize during up/downloading the graph.",
default_value=1048576,
advanced=True,
),
],
)
class SimpleDatabusLoadingPlugin(WorkflowPlugin):
"""Implementation of loading one file from the Databus into a given dataset"""
"""Implementation of downloading one file from the Databus to a dataset resource."""

# pylint: disable=too-many-arguments
def __init__(
self, databus_file_id: str, target_graph: str, chunk_size: int
self,
databus_base_url: str = "https://databus.dbpedia.org/",
databus_artifact: str = "",
artifact_format: str = "",
artifact_version: str = "",
databus_file_id: str = "",
target_file: str = "",
chunk_size: int = 1048576
) -> None:
self.databus_url = databus_base_url
self.databus_file_id = databus_file_id
self.target_graph = target_graph
self.target_file = target_file
self.chunk_size = chunk_size

def __get_graph_uri(self, context: ExecutionContext):
task_info = get_task(project=context.task.project_id(), task=self.target_graph)
return task_info["data"]["parameters"]["graph"]["value"]
# to get rid of unused-argument
_ = databus_artifact
_ = artifact_format
_ = artifact_version

def execute(
self, inputs=(), context: ExecutionContext = ExecutionContext()
) -> None:
setup_cmempy_super_user_access()
setup_cmempy_user_access(context.user)
self.log.info(f"Loading file from {self.databus_file_id}")

data: bytearray = bytearray()
databus_file_resp = requests.get(
self.databus_file_id,
allow_redirects=True,
Expand All @@ -78,28 +323,18 @@ def execute(
)
return

with databus_file_resp as resp:
for _, chunk in enumerate(resp.iter_content(chunk_size=self.chunk_size)):
data += bytearray(chunk)
desc = f"Downloading File {get_clock(_)}"
context.report.update(
ExecutionReport(
entity_count=len(data) // 1000000,
operation="load",
operation_desc=desc,
)
)
graph_uri = self.__get_graph_uri(context)
post_resp = post_streamed_bytes(
str(graph_uri),
byte_iterator_context_update(
bytes(data), context, self.chunk_size, "Uploading File"
),
replace=True,
upload_response = create_resource(
project_name=context.task.project_id(),
resource_name=self.target_file,
file_resource=ResponseStream(databus_file_resp),
replace=True
)
if post_resp.status_code < 400:
if upload_response.status_code < 400:
context.report.update(ExecutionReport(operation_desc="Upload Successful ✓"))
else:
context.report.update(
ExecutionReport(operation_desc="Upload Failed ❌", error=post_resp.text)
ExecutionReport(
operation_desc="Upload Failed ❌",
error=upload_response.text
)
)
Loading