Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Development #33

Merged
merged 42 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
8708142
configure sentry when a process inherits from FinchProcess
davidcaron May 15, 2019
c837a5f
move the bccaqv2_link constant in base.py
davidcaron May 15, 2019
d3f6b88
extract function to create a zipfile from a metalink
davidcaron May 15, 2019
a4a8c05
extract function to get bccaqv2 datasets
davidcaron May 15, 2019
f376f92
extract subset function for gridpoint subsetting and add heatwave pro…
davidcaron May 15, 2019
70ac4f6
extract base subsetting process
davidcaron May 15, 2019
e29ebd8
reuse functions and methods for bccaqv2 heatwave process
davidcaron May 15, 2019
30f6eb3
move bccaqv2_link constant
davidcaron May 15, 2019
e139bfb
fix bccaqv2_link reference
davidcaron May 15, 2019
c55e231
typo
davidcaron May 15, 2019
f8f8683
simpler handler wrapper
davidcaron May 15, 2019
6c7dcb3
add BCCAQV2HeatWave process
davidcaron May 15, 2019
afa8a93
fix handler wrapper
davidcaron May 16, 2019
180a4d5
fix tests
davidcaron May 16, 2019
37fd8b9
if rcp is None, all rcp are returned
davidcaron May 16, 2019
a7c620c
extract compute_indices method
davidcaron May 16, 2019
eb00c08
finish first implementation of heat wave frequency bccaqv2
davidcaron May 16, 2019
dc8081d
add zip function to heat_wave_frequency
davidcaron May 16, 2019
52e8bbb
fix for rcp is None
davidcaron May 16, 2019
f2e0dc0
fix handler
davidcaron May 16, 2019
729baf0
add tests for bccaqv2 heat wave frequency gridpoint
davidcaron May 16, 2019
ed46d98
add workdir for generated inputs
davidcaron May 16, 2019
bcb39bd
fix result variable in test
davidcaron May 16, 2019
e5a6e0d
fix test
davidcaron May 16, 2019
adabe37
flake8
davidcaron May 16, 2019
97b33df
output an empty csv file for now, so that the interface can be tested
davidcaron May 16, 2019
7cf9862
temporarily limit the number of datasets to request
davidcaron May 16, 2019
9fec50e
fix index computation
davidcaron May 16, 2019
2497961
change required and default values for bccaqv2 subset process
davidcaron May 16, 2019
2b0b0fe
make variable optional for bccaqv2 datasets query
davidcaron May 16, 2019
80ef095
black formatting
davidcaron May 16, 2019
1b8e5e2
default constant for "ALL"
davidcaron May 16, 2019
350d5a8
use subset_gridpoint function if lat1 and lon1 are omitted
davidcaron May 16, 2019
20447f5
choose output format, with netcdf by default for bccaqv2 download
davidcaron May 16, 2019
935733c
don't repeat output filename generation
davidcaron May 16, 2019
d959394
change output name for bccaqv2 subset
davidcaron May 16, 2019
dc90668
fix pathlib concatenation
davidcaron May 16, 2019
c531d90
input ordering
davidcaron May 16, 2019
991bd6b
required inputs for tests
davidcaron May 16, 2019
fb461fc
better handling of optional inputs
davidcaron May 21, 2019
6707ce0
fix output filename
davidcaron May 21, 2019
5b55a0d
0.2.2
davidcaron May 21, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion finch/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@

__author__ = """David Huard"""
__email__ = 'huard.david@ouranos.ca'
__version__ = '0.2.1'
__version__ = '0.2.2'
14 changes: 11 additions & 3 deletions finch/processes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from .wps_xsubsetpoint import SubsetGridPointProcess
from .wps_xsubset_bccaqv2 import SubsetBCCAQV2Process
from .wps_xclim_indices import make_xclim_indicator_process
from .wps_bccaqv2_heatwave import BCCAQV2HeatWave
import xclim
import xclim.atmos

Expand All @@ -24,7 +25,14 @@ def get_indicators(*args):

# Create PyWPS.Process subclasses
processes = [make_xclim_indicator_process(ind) for ind in indicators]
processes.extend([SubsetBboxProcess(), SubsetGridPointProcess(), SubsetBCCAQV2Process()])
processes.extend(
[
SubsetBboxProcess(),
SubsetGridPointProcess(),
SubsetBCCAQV2Process(),
BCCAQV2HeatWave(),
]
)


# Create virtual module for indicators so Sphinx can find it.
Expand All @@ -33,8 +41,8 @@ def _build_xclim():

objs = {p.__class__.__name__: p.__class__ for p in processes}

mod = xclim.build_module('finch.processes.xclim', objs, doc="""XCLIM Processes""")
sys.modules['finch.processes.xclim'] = mod
mod = xclim.build_module("finch.processes.xclim", objs, doc="""XCLIM Processes""")
sys.modules["finch.processes.xclim"] = mod
return mod


Expand Down
44 changes: 43 additions & 1 deletion finch/processes/base.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
import zipfile

from dask.diagnostics import ProgressBar
from dask.diagnostics.progress import format_time
from pywps import Process
from pathlib import Path
from pywps import Process, ComplexInput, LiteralInput
from sentry_sdk import configure_scope
import xarray as xr
import logging
import os
from functools import wraps

from finch.processes.utils import is_opendap_url

LOGGER = logging.getLogger("PYWPS")


class FinchProcess(Process):
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)

    # Wrap the subclass-provided handler so every request first configures
    # the Sentry error-reporting scope (see _handler_wrapped).
    # Must be assigned to the instance so that
    # it's also copied over when the process is deepcopied
    self.old_handler = self.handler
    self.handler = self._handler_wrapped

def _handler_wrapped(self, request, response):
    # Attach request metadata to the Sentry scope, then delegate to the
    # original handler saved by __init__.
    self.sentry_configure_scope(request)
    return self.old_handler(request, response)

def get_input_or_none(self, inputs, identifier):
    """Return the data of the first input named `identifier`, or None.

    `inputs` maps identifiers to lists of WPS inputs. A missing key or an
    empty list both mean the input was not provided.
    """
    try:
        return inputs[identifier][0].data
    except (KeyError, IndexError):
        # KeyError: identifier absent; IndexError: present but empty list.
        return None

def try_opendap(self, input, chunks=None):
"""Try to open the file as an OPeNDAP url and chunk it. If OPeNDAP fails, access the file directly. In both
cases, return an xarray.Dataset.
Expand All @@ -30,6 +52,18 @@ def try_opendap(self, input, chunks=None):

return ds

def compute_indices(self, func, inputs):
    """Call the indicator `func` with keyword args built from WPS inputs.

    For each input name, only the first queued input is used. LiteralInput
    values are passed as plain data; ComplexInput values are opened as a
    dataset and the variable matching the input name is passed. Inputs of
    any other kind are silently ignored.
    """
    kwargs = {}
    for name, input_queue in inputs.items():
        wps_input = input_queue[0]
        if isinstance(wps_input, LiteralInput):
            kwargs[name] = wps_input.data
        elif isinstance(wps_input, ComplexInput):
            dataset = self.try_opendap(wps_input)
            kwargs[name] = dataset.data_vars[name]
    return func(**kwargs)

def log_file_path(self):
    """Return the path of this process' log file inside its working directory."""
    return str(Path(self.workdir) / "log.txt")

Expand All @@ -53,6 +87,14 @@ def sentry_configure_scope(self, request):
scope.set_extra("remote_addr", request.http_request.remote_addr)
scope.set_extra("xml_request", request.http_request.data)

def zip_files(self, output_filename, files, response, start_percentage=90):
    """Write `files` into a zip archive at `output_filename`.

    Progress is reported through write_log, scaled linearly from
    `start_percentage` towards 100 over the list of files. Archive members
    keep only the file's base name.
    """
    total = len(files)
    span = 100 - start_percentage
    with zipfile.ZipFile(output_filename, mode="w") as archive:
        for count, filename in enumerate(files, start=1):
            progress = start_percentage + int((count - 1) / total * span)
            self.write_log(f"Zipping file {count} of {total}", response, progress)
            archive.write(filename, arcname=Path(filename).name)


def chunk_dataset(ds, max_size=1000000):
"""Ensures the chunked size of a xarray.Dataset is below a certain size
Expand Down
43 changes: 43 additions & 0 deletions finch/processes/subset.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging
from pathlib import Path

from pywps import FORMATS
from pywps.inout.outputs import MetaLink4, MetaFile

from finch.processes.base import FinchProcess

LOGGER = logging.getLogger("PYWPS")


class SubsetProcess(FinchProcess):
    """Base class for WPS processes that subset a list of netCDF resources.

    Note: the redundant __init__ that only forwarded to super() was removed;
    behavior is unchanged.
    """

    def subset_resources(self, resources, subset_function) -> MetaLink4:
        """Apply `subset_function` to each resource and collect the results.

        Each resource is opened (OPeNDAP when possible), subsetted, written
        to a "<stem>_sub" netCDF file in the workdir, and appended to a
        MetaLink4 document. Empty subsets are skipped with a warning.
        """
        metalink = MetaLink4(
            identity="subset_bbox",
            description="Subsetted netCDF files",
            publisher="Finch",
            workdir=self.workdir,
        )
        for resource in resources:
            ds = self.try_opendap(resource)
            out = subset_function(ds)

            # A zero-length dimension means the subset selected no data.
            if not all(out.dims.values()):
                LOGGER.warning(f"Subset is empty for dataset: {resource.url}")
                continue

            p = Path(resource._file or resource._build_file_name(resource.url))
            out_fn = Path(self.workdir) / (p.stem + "_sub" + p.suffix)

            out.to_netcdf(out_fn)

            mf = MetaFile(
                identity=p.stem,
                fmt=FORMATS.NETCDF,
            )
            mf.file = out_fn
            metalink.append(mf)

        return metalink
75 changes: 58 additions & 17 deletions finch/processes/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import re
from copy import deepcopy

from typing import List
from enum import Enum

import requests
from pywps import ComplexInput, FORMATS
from siphon.catalog import TDSCatalog

# Thredds catalog URL listing the BCCAQv2 downscaled datasets used by the
# bccaqv2 subset/heat-wave processes.
bccaqv2_link = (
    "https://boreas.ouranos.ca/thredds/catalog/birdhouse/pcic/BCCAQv2/catalog.xml"
)
BCCAQV2_LIMIT = 3  # Todo: remove-me. Temporary limit the number of datasets to request


def is_opendap_url(url):
if url and not url.startswith("file"):
Expand All @@ -26,8 +33,8 @@ class ParsingMethod(Enum):
xarray = 3


def get_bcca2v2_opendap_datasets(
catalog_url, variable, rcp, method: ParsingMethod = ParsingMethod.filename
def get_bccaqv2_opendap_datasets(
catalog_url, variable=None, rcp=None, method: ParsingMethod = ParsingMethod.filename
) -> List[str]:
"""Get a list of urls corresponding to variable and rcp on a Thredds server.

Expand All @@ -42,30 +49,64 @@ def get_bcca2v2_opendap_datasets(
for dataset in catalog.datasets.values():
opendap_url = dataset.access_urls["OPENDAP"]

variable_ok = variable is None
rcp_ok = rcp is None

if method == ParsingMethod.filename:
if rcp in dataset.name and dataset.name.startswith(variable):
urls.append(opendap_url)
variable_ok = variable_ok or dataset.name.startswith(variable)
rcp_ok = rcp_ok or rcp in dataset.name

elif method == ParsingMethod.opendap_das:
re_experiment = re.compile(r'String driving_experiment_id "(.+)"')
lines = requests.get(opendap_url + ".das").content.decode().split("\n")

has_variable = any(line.startswith(f" {variable} {{") for line in lines)
is_good_rcp = False
for line in lines:
match = re_experiment.search(line)
if match and rcp in match.group(1).split(","):
is_good_rcp = True

if has_variable and is_good_rcp:
urls.append(opendap_url)
variable_ok = variable_ok or any(
line.startswith(f" {variable} {{") for line in lines
)
if not rcp_ok:
for line in lines:
match = re_experiment.search(line)
if match and rcp in match.group(1).split(","):
rcp_ok = True

elif method == ParsingMethod.xarray:
import xarray as xr

ds = xr.open_dataset(opendap_url, decode_times=False)
rcps = [r for r in ds.attrs.get('driving_experiment_id', '').split(',') if 'rcp' in r]
if rcp in rcps and variable in ds.data_vars:
urls.append(opendap_url)

rcps = [
r
for r in ds.attrs.get("driving_experiment_id", "").split(",")
if "rcp" in r
]
variable_ok = variable_ok or variable in ds.data_vars
rcp_ok = rcp_ok or rcp in rcps

if variable_ok and rcp_ok:
urls.append(opendap_url)
urls = urls[:BCCAQV2_LIMIT] # Todo: remove-me
return urls


def get_bccaqv2_inputs(wps_inputs, variable=None, rcp=None, catalog_url=bccaqv2_link):
    """Adds a 'resource' input list with bccaqv2 urls to WPS inputs."""
    new_inputs = deepcopy(wps_inputs)

    # Reuse the workdir of an arbitrary existing input for the generated ones.
    first_queue = next(iter(wps_inputs.values()))
    workdir = first_queue[0].workdir

    resources = []
    for url in get_bccaqv2_opendap_datasets(catalog_url, variable, rcp):
        resource = _make_bccaqv2_resource_input(url)
        resource.workdir = workdir
        resources.append(resource)

    new_inputs["resource"] = resources
    return new_inputs


def _make_bccaqv2_resource_input(url):
    """Build a ComplexInput named 'resource' pointing at a bccaqv2 OPeNDAP url."""
    # Renamed local from `input` to avoid shadowing the builtin.
    resource_input = ComplexInput(
        "resource",
        "NetCDF resource",
        max_occurs=1000,
        supported_formats=[FORMATS.NETCDF, FORMATS.DODS],
    )
    resource_input.url = url
    return resource_input
Loading