Skip to content

Commit

Permalink
Validate inside GpsProcessing evaluate. Changes in python-driver too. #…
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Jun 25, 2024
1 parent 96546fd commit ee9fc5f
Showing 1 changed file with 29 additions and 2 deletions.
31 changes: 29 additions & 2 deletions openeogeotrellis/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from openeo_driver.dry_run import SourceConstraint
from openeo_driver.errors import (InternalException, JobNotFinishedException, OpenEOApiException,
ServiceUnsupportedException,
ProcessParameterInvalidException, )
ProcessParameterInvalidException, ProcessGraphComplexityException, )
from openeo_driver.jobregistry import (DEPENDENCY_STATUS, JOB_STATUS, ElasticJobRegistry, PARTIAL_JOB_STATUS,
get_ejr_credentials_from_env)
from openeo_driver.ProcessGraphDeserializer import ENV_SAVE_RESULT, ConcreteProcessing
Expand Down Expand Up @@ -1193,10 +1193,36 @@ def request_costs(


class GpsProcessing(ConcreteProcessing):
def evaluate(self, process_graph: dict, env: EvalEnv = None):
if smart_bool((env.get('job_options') or {}).get("extent_size_check", True)):
env_validate = env.push({
"allow_check_missing_products": False,
})
issues = self.validate(process_graph=process_graph, env=env_validate)
# Only care for certain errors and make list of strings:
issues = [e["message"] for e in issues if e["code"] == "ExtentTooLarge"]
if issues:
if env.get("sync_job", False):
raise ProcessGraphComplexityException(
ProcessGraphComplexityException.message + f" Reasons: {' '.join(issues)}"
)
else:
raise ProcessGraphComplexityException(
"Found errors in process graph. Disable this check with 'extent_size_check': " +
" ".join(issues))

return super().evaluate(process_graph, env)

def extra_validation(
self, process_graph: dict, env: EvalEnv, result, source_constraints: List[SourceConstraint]
) -> Iterable[dict]:
try:
return self.extra_validation_impl(env, source_constraints)
except Exception as e:
return [{"code": "Internal", "message": str(e)}]

@staticmethod
def extra_validation_impl(env: EvalEnv, source_constraints: List[SourceConstraint]) -> Iterable[dict]:
catalog = env.backend_implementation.catalog
allow_check_missing_products = smart_bool(env.get("allow_check_missing_products", True))
sync_job = smart_bool(env.get("sync_job", False))
Expand Down Expand Up @@ -1318,7 +1344,6 @@ def extra_validation(
def verify_for_synchronous_processing(self, process_graph: dict, env: EvalEnv = None) -> Iterable[str]:
env_validate = env.push({
"allow_check_missing_products": False,
"sync_job": True,
})
errors = self.validate(process_graph=process_graph, env=env_validate)

Expand Down Expand Up @@ -1753,6 +1778,8 @@ def _start_job(self, job_id: str, user: User, get_vault_token: Callable[[str], s
job_options = job_info.get("job_options") or {} # can be None
job_specification_json = json.dumps({"process_graph": job_process_graph, "job_options": job_options})

# Could also validate here

job_title = job_info.get('title', '')
sentinel_hub_client_alias = deep_get(job_options, 'sentinel-hub', 'client-alias', default="default")

Expand Down

0 comments on commit ee9fc5f

Please sign in to comment.