Merge pull request #33 from bird-house/development
[WIP] Development
David Caron committed May 21, 2019
2 parents c6ad26c + 5b55a0d commit 1bdb5bb
Showing 14 changed files with 550 additions and 217 deletions.
2 changes: 1 addition & 1 deletion finch/__version__.py
@@ -6,4 +6,4 @@

__author__ = """David Huard"""
__email__ = 'huard.david@ouranos.ca'
-__version__ = '0.2.1'
+__version__ = '0.2.2'
14 changes: 11 additions & 3 deletions finch/processes/__init__.py
@@ -2,6 +2,7 @@
from .wps_xsubsetpoint import SubsetGridPointProcess
from .wps_xsubset_bccaqv2 import SubsetBCCAQV2Process
from .wps_xclim_indices import make_xclim_indicator_process
+from .wps_bccaqv2_heatwave import BCCAQV2HeatWave
import xclim
import xclim.atmos

@@ -24,7 +25,14 @@ def get_indicators(*args):

# Create PyWPS.Process subclasses
processes = [make_xclim_indicator_process(ind) for ind in indicators]
-processes.extend([SubsetBboxProcess(), SubsetGridPointProcess(), SubsetBCCAQV2Process()])
+processes.extend(
+    [
+        SubsetBboxProcess(),
+        SubsetGridPointProcess(),
+        SubsetBCCAQV2Process(),
+        BCCAQV2HeatWave(),
+    ]
+)


# Create virtual module for indicators so Sphinx can find it.
@@ -33,8 +41,8 @@ def _build_xclim():

    objs = {p.__class__.__name__: p.__class__ for p in processes}

-    mod = xclim.build_module('finch.processes.xclim', objs, doc="""XCLIM Processes""")
-    sys.modules['finch.processes.xclim'] = mod
+    mod = xclim.build_module("finch.processes.xclim", objs, doc="""XCLIM Processes""")
+    sys.modules["finch.processes.xclim"] = mod
    return mod


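For context, the `processes` list assembled above is what finch hands to PyWPS. A minimal wiring sketch, assuming a birdhouse-style WSGI entry point; the config path is illustrative and not from this commit:

# Hypothetical wiring, not part of this commit: PyWPS serves the process
# list built in finch/processes/__init__.py.
from pywps.app import Service

from finch.processes import processes

# The cfgfiles path is an assumption for illustration.
application = Service(processes, cfgfiles=["/path/to/finch.cfg"])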
44 changes: 43 additions & 1 deletion finch/processes/base.py
@@ -1,17 +1,39 @@
+import zipfile
+
from dask.diagnostics import ProgressBar
from dask.diagnostics.progress import format_time
-from pywps import Process
+from pathlib import Path
+from pywps import Process, ComplexInput, LiteralInput
from sentry_sdk import configure_scope
import xarray as xr
import logging
import os
from functools import wraps

from finch.processes.utils import is_opendap_url

LOGGER = logging.getLogger("PYWPS")


class FinchProcess(Process):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        # Must be assigned to the instance so that
+        # it's also copied over when the process is deepcopied
+        self.old_handler = self.handler
+        self.handler = self._handler_wrapped
+
+    def _handler_wrapped(self, request, response):
+        self.sentry_configure_scope(request)
+        return self.old_handler(request, response)
+
+    def get_input_or_none(self, inputs, identifier):
+        try:
+            return inputs[identifier][0].data
+        except KeyError:
+            return None

    def try_opendap(self, input, chunks=None):
        """Try to open the file as an OPeNDAP url and chunk it. If OPeNDAP fails, access the file directly. In both
        cases, return an xarray.Dataset.
@@ -30,6 +52,18 @@ def try_opendap(self, input, chunks=None):

        return ds

+    def compute_indices(self, func, inputs):
+        kwds = {}
+        for name, input_queue in inputs.items():
+            input = input_queue[0]
+            if isinstance(input, ComplexInput):
+                ds = self.try_opendap(input)
+                kwds[name] = ds.data_vars[name]
+            elif isinstance(input, LiteralInput):
+                kwds[name] = input.data
+
+        return func(**kwds)

    def log_file_path(self):
        return os.path.join(self.workdir, "log.txt")

@@ -53,6 +87,14 @@ def sentry_configure_scope(self, request):
            scope.set_extra("remote_addr", request.http_request.remote_addr)
            scope.set_extra("xml_request", request.http_request.data)

+    def zip_files(self, output_filename, files, response, start_percentage=90):
+        with zipfile.ZipFile(output_filename, mode="w") as z:
+            n_files = len(files)
+            for n, filename in enumerate(files):
+                percentage = start_percentage + int(n / n_files * (100 - start_percentage))
+                self.write_log(f"Zipping file {n + 1} of {n_files}", response, percentage)
+                z.write(filename, arcname=Path(filename).name)


def chunk_dataset(ds, max_size=1000000):
    """Ensures the chunked size of an xarray.Dataset is below a certain size
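To show how the new `FinchProcess` helpers fit together, here is a hedged handler sketch. `xclim.atmos.tx_max` is a real xclim indicator, but its use here, the input layout, and the `output` identifier are assumptions, not code from this commit:

# Illustrative handler for a FinchProcess subclass, not from this commit.
import os

import xclim.atmos


def _handler(self, request, response):
    # compute_indices maps each WPS input onto the indicator's keyword
    # arguments: ComplexInputs are opened via try_opendap (OPeNDAP first,
    # direct access as fallback), LiteralInputs pass their .data through.
    # A ComplexInput's identifier must match the variable name in the file,
    # since compute_indices looks it up with ds.data_vars[name].
    out = self.compute_indices(xclim.atmos.tx_max, request.inputs)

    out_fn = os.path.join(self.workdir, "out.nc")
    out.to_netcdf(out_fn)

    # zip_files reports progress from start_percentage (default 90) to 100.
    zip_fn = os.path.join(self.workdir, "output.zip")
    self.zip_files(zip_fn, [out_fn], response)

    response.outputs["output"].file = zip_fn  # "output" is an assumed identifier
    return response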
43 changes: 43 additions & 0 deletions finch/processes/subset.py
@@ -0,0 +1,43 @@
+import logging
+from pathlib import Path
+
+from pywps import FORMATS
+from pywps.inout.outputs import MetaLink4, MetaFile
+
+from finch.processes.base import FinchProcess
+
+LOGGER = logging.getLogger("PYWPS")
+
+
+class SubsetProcess(FinchProcess):
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+    def subset_resources(self, resources, subset_function) -> MetaLink4:
+        metalink = MetaLink4(
+            identity="subset_bbox",
+            description="Subsetted netCDF files",
+            publisher="Finch",
+            workdir=self.workdir,
+        )
+        for n, resource in enumerate(resources):
+            ds = self.try_opendap(resource)
+            out = subset_function(ds)
+
+            if not all(out.dims.values()):
+                LOGGER.warning(f"Subset is empty for dataset: {resource.url}")
+                continue
+
+            p = Path(resource._file or resource._build_file_name(resource.url))
+            out_fn = Path(self.workdir) / (p.stem + "_sub" + p.suffix)
+
+            out.to_netcdf(out_fn)
+
+            mf = MetaFile(
+                identity=p.stem,
+                fmt=FORMATS.NETCDF,
+            )
+            mf.file = out_fn
+            metalink.append(mf)
+
+        return metalink
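A hedged usage sketch for `subset_resources`. The coordinate names and the bounding box inside `subset_function` are assumptions standing in for a real subsetting routine:

# Illustrative handler for a SubsetProcess subclass, not from this commit.
def _handler(self, request, response):
    resources = request.inputs["resource"]  # list of ComplexInput

    def subset_function(ds):
        # Assumed coordinate names and an arbitrary bounding box.
        return ds.sel(lat=slice(45.0, 50.0), lon=slice(-75.0, -70.0))

    # Empty subsets are skipped with a warning; each remaining dataset is
    # written to <workdir>/<stem>_sub.nc and collected into a metalink.
    metalink = self.subset_resources(resources, subset_function)
    response.outputs["output"].data = metalink.xml
    return response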
75 changes: 58 additions & 17 deletions finch/processes/utils.py
@@ -1,11 +1,18 @@
import re
+from copy import deepcopy

from typing import List
from enum import Enum

import requests
+from pywps import ComplexInput, FORMATS
from siphon.catalog import TDSCatalog

+bccaqv2_link = (
+    "https://boreas.ouranos.ca/thredds/catalog/birdhouse/pcic/BCCAQv2/catalog.xml"
+)
+BCCAQV2_LIMIT = 3  # Todo: remove-me. Temporarily limit the number of datasets to request


def is_opendap_url(url):
    if url and not url.startswith("file"):
@@ -26,8 +33,8 @@ class ParsingMethod(Enum):
    xarray = 3


-def get_bcca2v2_opendap_datasets(
-    catalog_url, variable, rcp, method: ParsingMethod = ParsingMethod.filename
+def get_bccaqv2_opendap_datasets(
+    catalog_url, variable=None, rcp=None, method: ParsingMethod = ParsingMethod.filename
) -> List[str]:
    """Get a list of urls corresponding to variable and rcp on a Thredds server.
@@ -42,30 +49,64 @@
    for dataset in catalog.datasets.values():
        opendap_url = dataset.access_urls["OPENDAP"]

+        variable_ok = variable is None
+        rcp_ok = rcp is None
+
        if method == ParsingMethod.filename:
-            if rcp in dataset.name and dataset.name.startswith(variable):
-                urls.append(opendap_url)
+            variable_ok = variable_ok or dataset.name.startswith(variable)
+            rcp_ok = rcp_ok or rcp in dataset.name

        elif method == ParsingMethod.opendap_das:
            re_experiment = re.compile(r'String driving_experiment_id "(.+)"')
            lines = requests.get(opendap_url + ".das").content.decode().split("\n")

-            has_variable = any(line.startswith(f" {variable} {{") for line in lines)
-            is_good_rcp = False
-            for line in lines:
-                match = re_experiment.search(line)
-                if match and rcp in match.group(1).split(","):
-                    is_good_rcp = True
-
-            if has_variable and is_good_rcp:
-                urls.append(opendap_url)
+            variable_ok = variable_ok or any(
+                line.startswith(f" {variable} {{") for line in lines
+            )
+            if not rcp_ok:
+                for line in lines:
+                    match = re_experiment.search(line)
+                    if match and rcp in match.group(1).split(","):
+                        rcp_ok = True

        elif method == ParsingMethod.xarray:
            import xarray as xr

            ds = xr.open_dataset(opendap_url, decode_times=False)
-            rcps = [r for r in ds.attrs.get('driving_experiment_id', '').split(',') if 'rcp' in r]
-            if rcp in rcps and variable in ds.data_vars:
-                urls.append(opendap_url)
+            rcps = [
+                r
+                for r in ds.attrs.get("driving_experiment_id", "").split(",")
+                if "rcp" in r
+            ]
+            variable_ok = variable_ok or variable in ds.data_vars
+            rcp_ok = rcp_ok or rcp in rcps
+
+        if variable_ok and rcp_ok:
+            urls.append(opendap_url)
+    urls = urls[:BCCAQV2_LIMIT]  # Todo: remove-me
    return urls


+def get_bccaqv2_inputs(wps_inputs, variable=None, rcp=None, catalog_url=bccaqv2_link):
+    """Adds a 'resource' input list with bccaqv2 urls to WPS inputs."""
+    new_inputs = deepcopy(wps_inputs)
+    workdir = next(iter(wps_inputs.values()))[0].workdir
+
+    new_inputs["resource"] = []
+    for url in get_bccaqv2_opendap_datasets(catalog_url, variable, rcp):
+        resource = _make_bccaqv2_resource_input(url)
+        resource.workdir = workdir
+        new_inputs["resource"].append(resource)
+
+    return new_inputs
+
+
+def _make_bccaqv2_resource_input(url):
+    input = ComplexInput(
+        "resource",
+        "NetCDF resource",
+        max_occurs=1000,
+        supported_formats=[FORMATS.NETCDF, FORMATS.DODS],
+    )
+    input.url = url
+    return input
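Finally, a sketch of how the new BCCAQv2 helpers might be driven from a process handler. The input identifiers and example values are assumptions:

# Illustrative, not from this commit: resolve BCCAQv2 OPeNDAP datasets on
# the THREDDS server and inject them as 'resource' inputs.
from finch.processes.utils import get_bccaqv2_inputs


def _handler(self, request, response):
    variable = request.inputs["variable"][0].data  # e.g. "tasmin"
    rcp = request.inputs["rcp"][0].data  # e.g. "rcp85"

    # While BCCAQV2_LIMIT is in place, at most 3 matching datasets come back.
    request.inputs = get_bccaqv2_inputs(request.inputs, variable=variable, rcp=rcp)

    # ...continue with the usual subsetting or indicator logic on
    # request.inputs["resource"].
    return response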