Skip to content

Commit

Permalink
[Fixes #7] Improve test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
mattiagiupponi committed May 31, 2022
1 parent 1290ebb commit 3ad2372
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 104 deletions.
3 changes: 0 additions & 3 deletions importer/admin.py

This file was deleted.

9 changes: 5 additions & 4 deletions importer/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@ def run_setup_hooks(*args, **kwargs):
from django.conf.urls import include, url
from geonode.urls import urlpatterns
from geonode.settings import CELERY_TASK_QUEUES, GEONODE_EXCHANGE, Queue

url_already_injected = any(
[
'importer.urls' in x.urlconf_name.__name__
"importer.urls" in x.urlconf_name.__name__
for x in urlpatterns
if hasattr(x, 'urlconf_name') and not isinstance(x.urlconf_name, list)
if hasattr(x, "urlconf_name") and not isinstance(x.urlconf_name, list)
]
)

if not url_already_injected:
urlpatterns.insert(
0, url(r"^api/v2/", include("importer.api.urls")),
0,
url(r"^api/v2/", include("importer.api.urls")),
)
2 changes: 1 addition & 1 deletion importer/celery_app.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from celery import Celery

app = Celery('importer')
app = Celery("importer")
6 changes: 3 additions & 3 deletions importer/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def __init__(self, files: list, resource_type: str) -> None:
self.handler = SUPPORTED_TYPES.get(resource_type)

def is_valid(self):
'''
"""
Perform basic validation steps
'''
return self.handler.is_valid(self.files)
"""
return self.handler.is_valid(self.files)
9 changes: 5 additions & 4 deletions importer/handlers.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from abc import ABC
import os


class AbstractHandler(ABC):
TASKS_LIST = []

def step_list(self):
return self.TASKS_LIST

def is_valid(self):
'''
"""
Define basic validation steps
'''
"""
return NotImplementedError


Expand All @@ -23,4 +24,4 @@ class GPKGFileHandler(AbstractHandler):
)

def is_valid(self, files):
return all([os.path.exists(x) for x in files])
return all([os.path.exists(x) for x in files])
3 changes: 0 additions & 3 deletions importer/models.py

This file was deleted.

71 changes: 52 additions & 19 deletions importer/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,38 @@
from importer.api.exception import ImportException
from importer.handlers import GPKGFileHandler
from importer.celery_app import app
from geonode.upload.models import Upload
from geonode.base.enumerations import STATE_RUNNING

logger = logging.getLogger(__name__)


SUPPORTED_TYPES = {
"gpkg": GPKGFileHandler()
#"vector": VectorFileHandler
# "vector": VectorFileHandler
}

class ImportOrchestrator:

class ImportOrchestrator:
def __init__(self, enable_legacy_upload_status=True) -> None:
self.enable_legacy_upload_status = enable_legacy_upload_status

@property
def supported_type(self):
'''
"""
Returns the supported types for the import
'''
"""
return SUPPORTED_TYPES.keys()

def get_file_handler(self, file_type):
'''
"""
Returns the supported types for the import
'''
"""
_type = SUPPORTED_TYPES.get(file_type)
if not _type:
raise ImportException(detail=f"The requested filetype is not supported: {file_type}")
raise ImportException(
detail=f"The requested filetype is not supported: {file_type}"
)
return _type

def get_execution_object(self, exec_id):
Expand All @@ -48,15 +52,20 @@ def perform_next_import_step(self, resource_type: str, execution_id: str) -> Non
tasks = self.get_file_handler(resource_type).TASKS_LIST
# getting the index
try:
_index = tasks.index(_exec.step)+1
_index = tasks.index(_exec.step) + 1
# finding in the task_list the last step done
remaining_tasks = tasks[_index:] if not _index >= len(tasks) else []
if not remaining_tasks:
return
# getting the next step to perform
next_step = next(iter(remaining_tasks))
# calling the next step for the resource
app.tasks.get(next_step).apply_async((resource_type, str(execution_id),))
app.tasks.get(next_step).apply_async(
(
resource_type,
str(execution_id),
)
)

except StopIteration:
# means that the expected list of steps is completed
Expand All @@ -67,23 +76,47 @@ def perform_next_import_step(self, resource_type: str, execution_id: str) -> Non
execution_id=str(execution_id),
status=ExecutionRequest.STATUS_FAILED,
finished=datetime.utcnow(),
last_updated=datetime.utcnow()
)
last_updated=datetime.utcnow(),
)
raise ImportException(detail=e.args[0])


def create_execution_request(self, user: get_user_model, func_name: str, step: str, input_params: dict, resource=None) -> str:
'''
def create_execution_request(
self,
user: get_user_model,
func_name: str,
step: str,
input_params: dict,
resource=None,
) -> str:
"""
Create an execution request for the user. Return the UUID of the request
'''
"""
execution = ExecutionRequest.objects.create(
user=user,
geonode_resource=resource,
func_name=func_name,
step=step,
input_params=input_params
input_params=input_params,
)
if self.enable_legacy_upload_status:
Upload.objects.create(
state=STATE_RUNNING,
metadata={
**{
"func_name": func_name,
"step": step,
"exec_id": str(execution.exec_id),
},
**input_params,
},
)
return execution.exec_id

def update_execution_request_status(self, execution_id, **kwargs):
ExecutionRequest.objects.filter(exec_id=execution_id).update(**kwargs)

def update_execution_request_status(self, execution_id, status, **kwargs):
ExecutionRequest.objects.filter(exec_id=execution_id).update(
status=status, **kwargs
)
if self.enable_legacy_upload_status:
Upload.objects.filter(metadata__contains=execution_id).update(
state=status, metadata={**kwargs, **{"exec_id": execution_id}}
)
Loading

0 comments on commit 3ad2372

Please sign in to comment.