Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New manifest structure and added support for BQ manifest. #295

Merged
merged 27 commits into from Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4ce2e63
New manifest structure and added support for BQ manifest.
mahrsee1997 Feb 9, 2023
987ba94
Merge branch 'main' into dl-bq-manifest
mahrsee1997 Feb 9, 2023
aa69563
simplified Enum class names & sytle nit: fluent interface
mahrsee1997 Feb 10, 2023
b2e4e1f
Added description for BQ Schema fields.
mahrsee1997 Feb 10, 2023
3509000
Implemented strategy pattern for getting the file size & remove lock …
mahrsee1997 Feb 10, 2023
b132048
Changes as follows:
mahrsee1997 Feb 13, 2023
fbfdb4a
minor fix in from_dict() class method w.r.t to Stage.
mahrsee1997 Feb 13, 2023
ed0a15a
Added config file name in manifest.
mahrsee1997 Feb 13, 2023
dc364fb
Added validation for non existed file in LocalSystemFileSizeStrategy(…
mahrsee1997 Feb 13, 2023
dc8c30f
Implemented set_stage() method in manifest system.
mahrsee1997 Feb 13, 2023
245e1aa
Attempt to reduce the impact of _read() and _update() calls on the st…
mahrsee1997 Feb 13, 2023
f50b37a
Avoid making an update call in manifest.transact() call.
mahrsee1997 Feb 14, 2023
e1dbd67
Attempt to fix GCS Manifest.
mahrsee1997 Feb 14, 2023
0379ad4
Changes as follows:
mahrsee1997 Feb 15, 2023
b4d62fc
Implementation changes w.r.t. GCS manifest.
mahrsee1997 Feb 15, 2023
0312a03
Made changes w.r.t to getting file size for manifest.
mahrsee1997 Feb 15, 2023
6c945b8
Fix GCS Manifest lock.
mahrsee1997 Feb 15, 2023
82bfdbc
Protection against SQL injection.
mahrsee1997 Feb 16, 2023
15e9b38
Include area in manifest schema.
mahrsee1997 Feb 21, 2023
bae619c
Moved down the BQ manifest note in README.md
mahrsee1997 Feb 21, 2023
5eedf12
Merge branch 'main' into dl-bq-manifest
mahrsee1997 Feb 21, 2023
2230050
Changed the type of 'area' field in BQ schema.
mahrsee1997 Feb 21, 2023
280fe1f
Remove support for GCS manifest.
mahrsee1997 Feb 21, 2023
b2db4cc
Incorporate new manifest system in ECMWFPublicClient.
mahrsee1997 Feb 21, 2023
a430b90
Sanitize BQ manifest _read() inputs while placing select query.
mahrsee1997 Feb 22, 2023
c6461e5
Bumped weather-dl version to 0.1.13.
mahrsee1997 Feb 22, 2023
7ebeea1
Updated unit tests in util_test.py(weather-dl).
mahrsee1997 Feb 22, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions weather_dl/README.md
Expand Up @@ -44,8 +44,11 @@ _Common options_:
and `--direct_num_workers`
* `-m, --manifest-location MANIFEST_LOCATION`: Location of the manifest. By default, it will use Cloud Logging
(stdout for direct runner). You can set the name of the manifest as the hostname of a URL with the 'cli' protocol.
For example, 'cli://manifest' will prefix all the manifest logs as '[manifest]'. In addition, users can specify a GCS
bucket URI, or 'noop://<name>' for an in-memory location.
For example, `cli://manifest` will prefix all the manifest logs as '[manifest]'. In addition, users can specify
either a BigQuery table (`bq://<project-id>.<dataset-name>.<table-name>`) or GCS bucket URI, or `noop://<name>`
mahrsee1997 marked this conversation as resolved.
Show resolved Hide resolved
for an in-memory location.
> Note: Tool will create the BQ table itself, if not already present.
alxmrs marked this conversation as resolved.
Show resolved Hide resolved
Or it will use the existing table but can report errors in case of schema mismatch.
* `-n, --num-requests-per-key`: Number of concurrent requests to make per API key. Default: make an educated guess per
client & config. Please see the client documentation for more details.
* `-p, --partition-chunks`: Group shards into chunks of this size when computing the partitions. Specifically, this
Expand Down
40 changes: 35 additions & 5 deletions weather_dl/download_pipeline/clients.py
Expand Up @@ -16,6 +16,7 @@
import abc
import collections
import contextlib
import datetime
import io
import json
import logging
Expand All @@ -30,6 +31,7 @@
from ecmwfapi import ECMWFService, api

from .config import Config, optimize_selection_partition
from .manifest import Manifest, Stage
from .util import retry_with_exponential_backoff

warnings.simplefilter(
Expand All @@ -54,7 +56,7 @@ def __init__(self, config: Config, level: int = logging.INFO) -> None:
self.logger.setLevel(level)

@abc.abstractmethod
def retrieve(self, dataset: str, selection: t.Dict, output: str) -> None:
def retrieve(self, dataset: str, selection: t.Dict, output: str, manifest: Manifest) -> None:
alxmrs marked this conversation as resolved.
Show resolved Hide resolved
"""Download from data source."""
pass

Expand Down Expand Up @@ -104,9 +106,16 @@ def __init__(self, config: Config, level: int = logging.INFO) -> None:
error_callback=self.logger.error,
)

def retrieve(self, dataset: str, selection: t.Dict, target: str) -> None:
def retrieve(self, dataset: str, selection: t.Dict, output: str, manifest: Manifest) -> None:
selection_ = optimize_selection_partition(selection)
self.c.retrieve(dataset, selection_, target)
manifest.set_stage(Stage.RETRIEVE)
precise_retrieve_start_time = (
datetime.datetime.utcnow()
.replace(tzinfo=datetime.timezone.utc)
.isoformat(timespec='seconds')
)
manifest.prev_stage_precise_start_time = precise_retrieve_start_time
self.c.retrieve(dataset, selection_, output)

@property
def license_url(self):
Expand Down Expand Up @@ -256,7 +265,7 @@ class MarsClient(Client):
def __init__(self, config: Config, level: int = logging.INFO) -> None:
super().__init__(config, level)

def retrieve(self, dataset: str, selection: t.Dict, output: str) -> None:
def retrieve(self, dataset: str, selection: t.Dict, output: str, manifest: Manifest) -> None:
self.c = MARSECMWFServiceExtended(
"mars",
key=self.config.kwargs.get('api_key', os.environ.get("MARSAPI_KEY")),
Expand All @@ -267,7 +276,21 @@ def retrieve(self, dataset: str, selection: t.Dict, output: str) -> None:
)
selection_ = optimize_selection_partition(selection)
with StdoutLogger(self.logger, level=logging.DEBUG):
manifest.set_stage(Stage.FETCH)
precise_fetch_start_time = (
datetime.datetime.utcnow()
.replace(tzinfo=datetime.timezone.utc)
.isoformat(timespec='seconds')
)
manifest.prev_stage_precise_start_time = precise_fetch_start_time
result = self.c.fetch(req=selection_)
manifest.set_stage(Stage.DOWNLOAD)
precise_download_start_time = (
datetime.datetime.utcnow()
.replace(tzinfo=datetime.timezone.utc)
.isoformat(timespec='seconds')
)
manifest.prev_stage_precise_start_time = precise_download_start_time
mahrsee1997 marked this conversation as resolved.
Show resolved Hide resolved
self.c.download(result, target=output)

@property
Expand All @@ -293,7 +316,14 @@ def num_requests_per_key(cls, dataset: str) -> int:
class FakeClient(Client):
"""A client that writes the selection arguments to the output file."""

def retrieve(self, dataset: str, selection: t.Dict, output: str) -> None:
def retrieve(self, dataset: str, selection: t.Dict, output: str, manifest: Manifest) -> None:
manifest.set_stage(Stage.RETRIEVE)
precise_retrieve_start_time = (
datetime.datetime.utcnow()
.replace(tzinfo=datetime.timezone.utc)
.isoformat(timespec='seconds')
)
manifest.prev_stage_precise_start_time = precise_retrieve_start_time
self.logger.debug(f'Downloading {dataset} to {output}')
with open(output, 'w') as f:
json.dump({dataset: selection}, f)
Expand Down
3 changes: 3 additions & 0 deletions weather_dl/download_pipeline/config.py
Expand Up @@ -24,6 +24,8 @@ class Config:
"""Contains pipeline parameters.

Attributes:
config_name:
Name of the config file.
client:
Name of the Weather-API-client. Supported clients are mentioned in the 'CLIENTS' variable.
dataset (optional):
Expand All @@ -48,6 +50,7 @@ class Config:
Contains parameters used to select desired data.
"""

config_name: str = ""
client: str = ""
dataset: t.Optional[str] = ""
target_path: str = ""
Expand Down
18 changes: 13 additions & 5 deletions weather_dl/download_pipeline/fetcher.py
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import dataclasses
import datetime
import logging
import tempfile
import typing as t
Expand All @@ -20,7 +21,7 @@

from .clients import CLIENTS, Client
from .config import Config
from .manifest import Manifest, NoOpManifest, Location
from .manifest import Manifest, NoOpManifest, Location, Stage
from .parsers import prepare_target_name
from .partition import skip_partition
from .stores import Store, FSStore
Expand Down Expand Up @@ -55,7 +56,7 @@ def __post_init__(self):
@retry_with_exponential_backoff
def retrieve(self, client: Client, dataset: str, selection: t.Dict, dest: str) -> None:
"""Retrieve from download client, with retries."""
client.retrieve(dataset, selection, dest)
client.retrieve(dataset, selection, dest, self.manifest)

def fetch_data(self, config: Config, *, worker_name: str = 'default') -> None:
"""Download data from a client to a temp file, then upload to Cloud Storage."""
Expand All @@ -68,11 +69,18 @@ def fetch_data(self, config: Config, *, worker_name: str = 'default') -> None:
client = CLIENTS[self.client_name](config)
target = prepare_target_name(config)

with self.manifest.transact(config.selection, target, config.user_id):
with tempfile.NamedTemporaryFile() as temp:
logger.info(f'[{worker_name}] Fetching data for {target!r}.')
with tempfile.NamedTemporaryFile() as temp:
logger.info(f'[{worker_name}] Fetching data for {target!r}.')
with self.manifest.transact(config.config_name, config.selection, target, config.user_id):
self.retrieve(client, config.dataset, config.selection, temp.name)

self.manifest.set_stage(Stage.UPLOAD)
precise_upload_start_time = (
datetime.datetime.utcnow()
.replace(tzinfo=datetime.timezone.utc)
.isoformat(timespec='seconds')
)
self.manifest.prev_stage_precise_start_time = precise_upload_start_time
alxmrs marked this conversation as resolved.
Show resolved Hide resolved
logger.info(f'[{worker_name}] Uploading to store for {target!r}.')

# In dry-run mode we actually aren't required to upload a file.
Expand Down
19 changes: 12 additions & 7 deletions weather_dl/download_pipeline/fetcher_test.py
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import io
import json
import os
import tempfile
import unittest
Expand Down Expand Up @@ -79,12 +80,13 @@ def test_fetch_data__manifest__returns_success(self, mock_retrieve):
fetcher.fetch_data(config)

self.assertDictContainsSubset(dict(
selection=config.selection,
selection=json.dumps(config.selection),
location=os.path.join(tmpdir, 'download-01-12.nc'),
stage='upload',
status='success',
error=None,
error=json.dumps(None),
user='unknown',
), list(self.dummy_manifest.records.values())[0]._asdict())
), list(self.dummy_manifest.records.values())[0])

@patch('cdsapi.Client.retrieve')
def test_fetch_data__manifest__records_retrieve_failure(self, mock_retrieve):
Expand All @@ -111,11 +113,12 @@ def test_fetch_data__manifest__records_retrieve_failure(self, mock_retrieve):
fetcher = Fetcher('cds', self.dummy_manifest, InMemoryStore())
fetcher.fetch_data(config)

actual = list(self.dummy_manifest.records.values())[0]._asdict()
actual = list(self.dummy_manifest.records.values())[0]

self.assertDictContainsSubset(dict(
selection=config.selection,
selection=json.dumps(config.selection),
location=os.path.join(tmpdir, 'download-01-12.nc'),
stage='retrieve',
status='failure',
user='unknown',
), actual)
Expand Down Expand Up @@ -148,10 +151,12 @@ def test_fetch_data__manifest__records_gcs_failure(self, mock_retrieve):
fetcher = Fetcher('cds', self.dummy_manifest, InMemoryStore())
fetcher.fetch_data(config)

actual = list(self.dummy_manifest.records.values())[0]._asdict()
actual = list(self.dummy_manifest.records.values())[0]

self.assertDictContainsSubset(dict(
selection=config.selection,
selection=json.dumps(config.selection),
location=os.path.join(tmpdir, 'download-01-12.nc'),
stage='retrieve',
status='failure',
user='unknown',
), actual)
Expand Down