diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 9fdfe45..67dde7f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -10,10 +10,6 @@ on:
     tags: ['v[0-9]*', '[0-9]+.[0-9]+*']  # Match tags that resemble a version
   pull_request:  # Run in every PR
   workflow_dispatch:  # Allow manually triggering the workflow
-  schedule:
-    # Run roughly every 15 days at 00:00 UTC
-    # (useful to check if updates on dependencies break the package)
-    - cron: '0 0 1,16 * *'
 
 concurrency:
   group: >-
@@ -30,7 +26,7 @@ jobs:
       - uses: actions/checkout@v3
         with: {fetch-depth: 0}  # deep clone for setuptools-scm
       - uses: actions/setup-python@v4
-        with: {python-version: "3.9"}
+        with: {python-version: "3.10"}
       - name: Build package distribution files
         run: pipx run --spec tox==3.27.1 tox -e clean,build
       - name: Record the path of wheel distribution
@@ -50,9 +46,7 @@ jobs:
     strategy:
       matrix:
         python:
-        #- "3.7"  # oldest Python supported by PSF
-        - "3.9"
-        #- "3.10"
+        - "3.10"
         platform:
         - ubuntu-latest
         - macos-latest
diff --git a/.github/workflows/run_tests.yaml b/.github/workflows/run_tests.yaml
index 2ecf948..5ad12df 100644
--- a/.github/workflows/run_tests.yaml
+++ b/.github/workflows/run_tests.yaml
@@ -7,7 +7,7 @@ jobs:
     strategy:
       matrix:
         os: [ubuntu-latest, macos-latest, windows-latest]
-        python-version: ['3.9']
+        python-version: ['3.9', '3.10']
 
     steps:
       - uses: actions/checkout@v2
diff --git a/README.md b/README.md
index 56652f3..b9e2a76 100644
--- a/README.md
+++ b/README.md
@@ -78,6 +78,14 @@ tox -e publish  # Publish the package you have been developing to a package ind
 tox -av  # List all available tasks
 ```
 
+To create a pinned `requirements.txt` file of the package dependencies, [pip-tools](https://github.com/jazzband/pip-tools) is used:
+
+```commandline
+pip-compile --extra transfer --resolver=backtracking
+```
+
+
+
 ## Notes
 
diff --git a/docs/conf.py b/docs/conf.py
index 74ac9a9..0bfcbc9 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -298,4 +298,4 @@
     "pyscaffold": ("https://pyscaffold.org/en/stable", None),
 }
 
-print(f"loading configurations for {project} {version} ...", file=sys.stderr)
\ No newline at end of file
+print(f"loading configurations for {project} {version} ...", file=sys.stderr)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..5e7990d
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,175 @@
+#
+# This file is autogenerated by pip-compile with Python 3.10
+# by the following command:
+#
+#    pip-compile --extra=transfer
+#
+aiobotocore[boto3]==2.5.0
+    # via
+    #   s3fs
+    #   vptstools (setup.py)
+aiohttp==3.8.4
+    # via
+    #   aiobotocore
+    #   s3fs
+aioitertools==0.11.0
+    # via aiobotocore
+aiosignal==1.3.1
+    # via aiohttp
+async-timeout==4.0.2
+    # via aiohttp
+attrs==23.1.0
+    # via
+    #   aiohttp
+    #   frictionless
+    #   jsonschema
+bcrypt==4.0.1
+    # via paramiko
+boto3==1.26.76
+    # via aiobotocore
+botocore==1.29.76
+    # via
+    #   aiobotocore
+    #   boto3
+    #   s3transfer
+certifi==2023.5.7
+    # via requests
+cffi==1.15.1
+    # via
+    #   cryptography
+    #   pynacl
+chardet==5.1.0
+    # via frictionless
+charset-normalizer==3.1.0
+    # via
+    #   aiohttp
+    #   requests
+click==8.1.3
+    # via
+    #   typer
+    #   vptstools (setup.py)
+colorama==0.4.6
+    # via typer
+cryptography==40.0.2
+    # via paramiko
+decorator==5.1.1
+    # via validators
+frictionless==5.13.1
+    # via vptstools (setup.py)
+frozenlist==1.3.3
+    # via
+    #   aiohttp
+    #   aiosignal
+fsspec==2023.5.0
+    # via
+    #   s3fs
+    #   vptstools (setup.py)
+h5py==3.8.0
+    # via vptstools (setup.py)
+humanize==4.6.0
+    # via frictionless
+idna==3.4
+    # via
+    #   
requests
+    #   yarl
+isodate==0.6.1
+    # via frictionless
+jinja2==3.1.2
+    # via frictionless
+jmespath==1.0.1
+    # via
+    #   boto3
+    #   botocore
+jsonschema==4.17.3
+    # via frictionless
+markdown-it-py==2.2.0
+    # via rich
+marko==1.3.0
+    # via frictionless
+markupsafe==2.1.2
+    # via jinja2
+mdurl==0.1.2
+    # via markdown-it-py
+multidict==6.0.4
+    # via
+    #   aiohttp
+    #   yarl
+numpy==1.24.3
+    # via
+    #   h5py
+    #   pandas
+    #   pyarrow
+pandas==2.0.1
+    # via vptstools (setup.py)
+paramiko==3.1.0
+    # via vptstools (setup.py)
+petl==1.7.12
+    # via frictionless
+pyarrow==12.0.0
+    # via vptstools (setup.py)
+pycparser==2.21
+    # via cffi
+pydantic==1.10.7
+    # via frictionless
+pygments==2.15.1
+    # via rich
+pynacl==1.5.0
+    # via paramiko
+pyrsistent==0.19.3
+    # via jsonschema
+python-dateutil==2.8.2
+    # via
+    #   botocore
+    #   frictionless
+    #   pandas
+python-slugify==8.0.1
+    # via frictionless
+pytz==2023.3
+    # via
+    #   pandas
+    #   vptstools (setup.py)
+pyyaml==6.0
+    # via frictionless
+requests==2.30.0
+    # via frictionless
+rfc3986==2.0.0
+    # via frictionless
+rich==13.3.5
+    # via typer
+s3fs[boto3]==2023.5.0
+    # via vptstools (setup.py)
+s3transfer==0.6.1
+    # via boto3
+shellingham==1.5.0.post1
+    # via typer
+simpleeval==0.9.13
+    # via frictionless
+six==1.16.0
+    # via
+    #   isodate
+    #   python-dateutil
+stringcase==1.2.0
+    # via frictionless
+tabulate==0.9.0
+    # via frictionless
+text-unidecode==1.3
+    # via python-slugify
+typer[all]==0.9.0
+    # via frictionless
+typing-extensions==4.5.0
+    # via
+    #   frictionless
+    #   pydantic
+    #   typer
+tzdata==2023.3
+    # via pandas
+urllib3==1.26.15
+    # via
+    #   botocore
+    #   requests
+validators==0.20.0
+    # via frictionless
+wrapt==1.15.0
+    # via aiobotocore
+yarl==1.9.2
+    # via aiohttp
diff --git a/setup.cfg b/setup.cfg
index 590c5b0..2c6fcfd 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -60,10 +60,9 @@ exclude =
 
 [options.extras_require]
 # Requirements to work with the transfer functionalities (FTP/S3)
 transfer =
-    boto3
+    s3fs[boto3]
     paramiko
     fsspec
-    s3fs
     pyarrow
 
 # Develop requirements (semicolon/line-separated)
diff --git a/src/vptstools/bin/transfer_baltrad.py b/src/vptstools/bin/transfer_baltrad.py
index 05151c5..9d45f9b 100644
--- a/src/vptstools/bin/transfer_baltrad.py
+++ b/src/vptstools/bin/transfer_baltrad.py
@@ -1,9 +1,12 @@
 # Simple Python script that:
 # - Connects via SFTP to the BALTRAD server
-# - For each vp file (pvol gets ignored), download the file from the server and upload it to the "aloft" S3 bucket
+# - For each vp file (pvol gets ignored), downloads the file from the server and
+#   uploads it to the "aloft" S3 bucket
 
-# Designed to be executed daily via a simple cronjob (files disappear after a few days on the BALTRAD server)
-# Use a simple config file named config.ini. Create one by copying config.template.ini and filling in the values.
+# Designed to be executed daily via a simple cronjob (files disappear after a few
+# days on the BALTRAD server)
+# Use a simple config file named config.ini. Create one by copying config.template.ini
+# and filling in the values.
 # If file already exists at destination => do nothing
 import os
 import tempfile
@@ -37,9 +40,11 @@ def s3_key_exists(key: str, bucket: str, s3_client) -> bool:
 
 
 def extract_metadata_from_filename(filename: str) -> tuple:
-    """Extract the metadata from the filename (format such as 'fropo_vp_20220809T051000Z_0xb')
+    """Extract the metadata from the filename (format
+    such as 'fropo_vp_20220809T051000Z_0xb')
 
-    All returned values are strings, month and days are 0-prefixed if they are single-digit. 
+    All returned values are strings; month and day are 0-prefixed if
+    they are single-digit.
 
     Parameters
     ----------
@@ -99,10 +104,14 @@ def cli():
         radar_code, year, month_str, day_str = extract_metadata_from_filename(
             entry.filename
         )
-        destination_key = f"baltrad/hdf5/{radar_code}/{year}/{month_str}/{day_str}/{entry.filename}"
+        destination_key = (
+            f"baltrad/hdf5/{radar_code}/{year}/"
+            f"{month_str}/{day_str}/{entry.filename}"
+        )
         if not s3_key_exists(destination_key, destination_bucket, s3_client):
             click.echo(
-                f"{destination_key} does not exist at {destination_bucket}, transfer it...",
+                f"{destination_key} does not exist at {destination_bucket}, "
+                f"transfer it...",
                 end="",
             )
             with tempfile.TemporaryDirectory() as tmpdirname:
diff --git a/src/vptstools/bin/vph5_to_vpts.py b/src/vptstools/bin/vph5_to_vpts.py
index 8177b2a..bbd3f83 100644
--- a/src/vptstools/bin/vph5_to_vpts.py
+++ b/src/vptstools/bin/vph5_to_vpts.py
@@ -23,7 +23,7 @@
     default=2,
     type=int,
     help="Range of h5 vp files to include, i.e. files modified between now and N"
-    "modified-days-ago. If 0, all h5 files in the bucket will be included.",
+         " modified-days-ago. If 0, all h5 files in the bucket will be included.",
 )
 @click.option(
     "--aws-profile",
@@ -40,21 +40,26 @@ def cli(modified_days_ago, aws_profile):
     """
     if aws_profile:
         storage_options = {"profile": aws_profile}
+        boto3_options = {"profile_name": aws_profile}
     else:
         storage_options = dict()
+        boto3_options = dict()
 
+    # Load the S3 manifest of today
     click.echo(f"Load the S3 manifest of {date.today()}.")
     manifest_parent_key = (
         pd.Timestamp.now(tz="utc").date() - pd.Timedelta("1day")
     ).strftime(f"%Y-%m-%dT{MANIFEST_HOUR_OF_DAY}Z")
-    s3_url = f"{MANIFEST_URL}/{manifest_parent_key}/manifest.json"  # define manifest of today
+    # define manifest of today
+    s3_url = f"{MANIFEST_URL}/{manifest_parent_key}/manifest.json"
 
     click.echo(f"Extract coverage and days to recreate from manifest {s3_url}.")
 
     if modified_days_ago == 0:
         modified_days_ago = (pd.Timestamp.now(tz="utc") - S3_BUCKET_CREATION).days + 1
         click.echo(
-            f"Recreate the full set of bucket files (files modified since {modified_days_ago}days). "
+            f"Recreate the full set of bucket files (files "
+            f"modified since {modified_days_ago} days). "
             f"This will take a while!"
         )
 
@@ -73,10 +78,14 @@ def cli():
 
     # Run vpts daily conversion for each radar-day with modified files
     inbo_s3 = s3fs.S3FileSystem(**storage_options)
+    # Patch to overcome recursive s3fs calls in the wrapped (moto) context
+    import boto3
+
+    session = boto3.Session(**boto3_options)
+    s3_client = session.client("s3")
     click.echo(f"Create {days_to_create_vpts.shape[0]} daily vpts files.")
     for j, daily_vpts in enumerate(days_to_create_vpts["directory"]):
-        # Enlist files of the day to rerun (all the given day)
         source, _, radar_code, year, month, day = daily_vpts
         odim_path = OdimFilePath(source, radar_code, "vp", year, month, day)
@@ -88,9 +97,15 @@
             # - download the files of the day
             h5_file_local_paths = []
             for i, file_key in enumerate(odim5_files):
-                h5_path = OdimFilePath.from_inventory(file_key)
+                h5_path = OdimFilePath.from_s3fs_enlisting(file_key)
                 h5_local_path = str(temp_folder_path / h5_path.file_name)
-                inbo_s3.download(file_key, h5_local_path)
+                # inbo_s3.get_file(file_key, h5_local_path)
+                # s3fs fails in the wrapped moto environment; fall back to boto3
+                s3_client.download_file(
+                    S3_BUCKET,
+                    f"{h5_path.s3_folder_path_h5}/{h5_path.file_name}",
+                    h5_local_path,
+                )
                 h5_file_local_paths.append(h5_local_path)
 
             # - run vpts on all locally downloaded files
diff --git a/src/vptstools/s3.py b/src/vptstools/s3.py
index 6fcb5d4..a123958 100644
--- a/src/vptstools/s3.py
+++ b/src/vptstools/s3.py
@@ -61,9 +61,20 @@ def from_inventory(cls, h5_file_path):
             h5_file_path.split("/")[1],
         )
 
+    @classmethod
+    def from_s3fs_enlisting(cls, h5_file_path):
+        """Initialize class from an s3fs file listing, whose paths contain
+        the bucket, source and file type"""
+        return cls(
+            h5_file_path.split("/")[1],
+            *cls.parse_file_name(str(h5_file_path)),
+            h5_file_path.split("/")[1],
+        )
+
     @staticmethod
     def parse_file_name(file_name):
-        """Parse an hdf5 file name radar_code, data_type, year, month, day, hour, minute and file_name.
+        """Parse an hdf5 file name into radar_code, data_type, year, month, day,
+        hour, minute and file_name.
 
         Parameters
         ----------
@@ -120,7 +131,10 @@ def s3_path_setup(self, file_output):
 
     def s3_url_h5(self, bucket="aloft"):
         """Full S3 URL for the stored h5 file"""
-        return f"s3://{bucket}/{self.s3_path_setup('hdf5')}/{self.month}/{self.day}/{self.file_name}"
+        return (
+            f"s3://{bucket}/{self.s3_path_setup('hdf5')}/"
+            f"{self.month}/{self.day}/{self.file_name}"
+        )
 
     @property
     def s3_folder_path_h5(self):
@@ -135,7 +149,10 @@ def s3_file_path_daily_vpts(self):
     @property
     def s3_file_path_monthly_vpts(self):
         """S3 key of the monthly concatenated vpts file corresponding to the h5 file"""
-        return f"{self.s3_path_setup('monthly')}/{self.radar_code}_vpts_{self.year}{self.month}.csv.gz"
+        return (
+            f"{self.s3_path_setup('monthly')}/"
+            f"{self.radar_code}_vpts_{self.year}{self.month}.csv.gz"
+        )
 
 
 def list_manifest_file_keys(s3_manifest_url, storage_options=None):
@@ -247,6 +264,9 @@ def _handle_inventory(
     """
 
     # Filter for h5 files and extract source
+    df["modified"] = pd.to_datetime(
+        df["modified"], format="%Y-%m-%dT%H:%M:%S.%fZ", utc=True
+    )
     df["file_items"] = df["file"].str.split("/")
     df["suffix"] = df["file_items"].str.get(-1).str.split(".").str.get(-1)
     df = df[df["suffix"] == "h5"]
@@ -296,21 +316,25 @@ def handle_manifest(manifest_url, modified_days_ago="2day", storage_options=None
     for j, obj in enumerate(list_manifest_file_keys(manifest_url, storage_options)):
         # Read the manifest referenced file
         parsed_url = urllib.parse.urlparse(manifest_url)
-        df = pd.read_csv(
+
+        with pd.read_csv(
             f"s3://{parsed_url.netloc}/{obj['key']}",
-            engine="pyarrow",
+            engine="c",
             names=["repo", "file", "size", "modified"],
             storage_options=storage_options,
-        )
-
-        # Extract counts per group and groups within defined time window
-        df_co, df_last = _handle_inventory(
-            df, modified_days_ago, group_func=extract_daily_group_from_inventory
-        )
-        # Extract IDs latest N days modified files
-        df_last_n_days.append(df_last)
-        # Count occurrences per radar-day -> coverage input
-        df_coverage.append(df_co)
+            chunksize=50000,
+        ) as reader:
+            for chunk in reader:
+                # Extract counts per group and groups within defined time window
+                df_co, df_last = _handle_inventory(
+                    chunk,
+                    modified_days_ago,
+                    group_func=extract_daily_group_from_inventory,
+                )
+                # Extract IDs of the files modified during the last N days
+                df_last_n_days.append(df_last)
+                # Count occurrences per radar-day -> coverage input
+                df_coverage.append(df_co)
 
     # Create coverage file DataFrame
     df_cov = pd.concat(df_coverage)
diff --git a/src/vptstools/vpts.py b/src/vptstools/vpts.py
index 6b6271b..613f233 100644
--- a/src/vptstools/vpts.py
+++ b/src/vptstools/vpts.py
@@ -194,9 +194,12 @@ def vp(file_path, vpts_csv_version="v1.0", source_file=""):
     --------
     >>> file_path = Path("bejab_vp_20221111T233000Z_0x9.h5")
     >>> vp(file_path)
-    >>> vp(file_path, source_file="s3://aloft/baltrad/hdf5/2022/11/11/bejab_vp_20221111T233000Z_0x9.h5")
+    >>> vp(file_path,
+    ...    source_file="s3://aloft/baltrad/hdf5/2022/11/11/"
+    ...    "bejab_vp_20221111T233000Z_0x9.h5")
 
-    Use file name itself as source_file representation in vp file using a custom callable function
+    Use the file name itself as the source_file representation in the vp file,
+    using a custom callable function
 
     >>> vp(file_path, source_file=lambda x: Path(x).name)
     """
@@ -230,14 +233,16 @@ def vpts(file_paths, vpts_csv_version="v1.0", source_file=None):
 
     Notes
     -----
-    Due tot the multiprocessing support, the source_file as a callable can not be a anonymous lambda function. 
+    Due to the multiprocessing support, the source_file callable cannot be
+    an anonymous lambda function.
 
     Examples
     --------
     >>> file_paths = sorted(Path("../data/raw/baltrad/").rglob("*.h5"))
     >>> vpts(file_paths)
 
-    Use file name itself as source_file representation in vp file using a custom callable function
+    Use the file name itself as the source_file representation in the vp file,
+    using a custom callable function
 
     >>> def path_to_source(file_path):
     ...     return Path(file_path).name
 
@@ -247,7 +252,8 @@
     if not source_file:
         source_file = _convert_to_source
 
-    with multiprocessing.Pool(processes=(multiprocessing.cpu_count() - 1)) as pool:
+    cpu_count = max(multiprocessing.cpu_count() - 1, 1)
+    with multiprocessing.Pool(processes=cpu_count) as pool:
         data = pool.map(
             functools.partial(
                 vp, vpts_csv_version=vpts_csv_version, source_file=source_file
@@ -326,7 +332,8 @@ def _write_resource_descriptor(vpts_file_path: Path, schema_version="v1.0"):
         "mediatype": "text/csv",
         "encoding": CSV_ENCODING,
         "dialect": {"delimiter": CSV_FIELD_DELIMITER},
-        "schema": f"https://raw.githubusercontent.com/enram/vpts-csv/{schema_version}/vpts-csv-table-schema.json",
+        "schema": f"https://raw.githubusercontent.com/enram/vpts-csv/"
+        f"{schema_version}/vpts-csv-table-schema.json",
     }
 
     vpts_file_path.parent.mkdir(parents=True, exist_ok=True)
diff --git a/src/vptstools/vpts_csv.py b/src/vptstools/vpts_csv.py
index 05c90f0..3721ccc 100644
--- a/src/vptstools/vpts_csv.py
+++ b/src/vptstools/vpts_csv.py
@@ -114,7 +114,9 @@ def check_source_file(source_file, regex):
 
     Examples
     --------
-    >>> check_source_file("s3://alof/baltrad/2023/01/01/bejab_vp_20230101T000500Z_0x9.h5", r".*h5")
+    >>> check_source_file("s3://alof/baltrad/2023/01/01/"
+    ...                   "bejab_vp_20230101T000500Z_0x9.h5",
+    ...                   r".*h5")
     's3://alof/baltrad/2023/01/01/bejab_vp_20230101T000500Z_0x9.h5'
     """
     sf_regex = re.compile(regex)
@@ -130,9 +132,10 @@
 
 """
 VPTS CSV version abstract version and individual version mapping implementations
 
-To create a new version of the VPTS CSV implementation, create a new class `VptsCsvVX` inherited from the
-`AbstractVptsCsv` class and provide the `abstractmethod`. See the `mapping` method for the conversion
-functionality. Make sure to add the mapping to the `_get_vpts_version` function
+To create a new version of the VPTS CSV implementation, create a new class `VptsCsvVX`
+inherited from the `AbstractVptsCsv` class and provide the `abstractmethod`. See the
+`mapping` method for the conversion functionality. Make sure to add the mapping
+to the `_get_vpts_version` function.
 """
 
@@ -235,7 +238,6 @@ def mapping(self, bird_profile) -> dict:
 
 
 class VptsCsvV1(AbstractVptsCsv):
-
     source_file_regex = r"^(?=^[^.\/~])(^((?!\.{2}).)*$).*$"
 
     @property
diff --git a/tests/conftest.py b/tests/conftest.py
index 5c7ba51..be278af 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -1,10 +1,19 @@
 import os
 import datetime
-from unittest.mock import MagicMock
 from pathlib import Path
+from typing import Callable, Any
+from unittest.mock import MagicMock
 
 import pytest
 import boto3
+import aiobotocore.awsrequest
+import aiobotocore.endpoint
+import aiohttp
+import aiohttp.client_reqrep
+import aiohttp.typedefs
+import botocore.awsrequest
+import botocore.model
+
 from moto import mock_s3
 
 from vptstools.vpts import BirdProfile
@@ -14,6 +23,87 @@
 SAMPlE_DATA_DIR = CURRENT_DIR / "data"
 
 
+## Patch as described in https://github.com/aio-libs/aiobotocore/issues/755#issuecomment-1424945194 --------------------
+class MockAWSResponse(aiobotocore.awsrequest.AioAWSResponse):
+    """
+    Mocked AWS Response.
+
+    https://github.com/aio-libs/aiobotocore/issues/755
+    https://gist.github.com/giles-betteromics/12e68b88e261402fbe31c2e918ea4168
+    """
+
+    def __init__(self, response: botocore.awsrequest.AWSResponse):
+        self._moto_response = response
+        self.status_code = response.status_code
+        self.raw = MockHttpClientResponse(response)
+
+    # adapt async methods to use moto's response
+    async def _content_prop(self) -> bytes:
+        return self._moto_response.content
+
+    async def _text_prop(self) -> str:
+        return self._moto_response.text
+
+
+class MockHttpClientResponse(aiohttp.client_reqrep.ClientResponse):
+    """
+    Mocked HTTP Response.
+
+    See Notes
+    """
+
+    def __init__(self, response: botocore.awsrequest.AWSResponse):
+        """
+        Mocked Response Init.
+        """
+
+        async def read(self: MockHttpClientResponse, n: int = -1) -> bytes:
+            return response.content
+
+        self.content = MagicMock(aiohttp.StreamReader)
+        self.content.read = read
+        self.response = response
+
+    @property
+    def raw_headers(self) -> Any:
+        """
+        Return the headers encoded the way that aiobotocore expects them.
+        """
+        return {
+            k.encode("utf-8"): str(v).encode("utf-8")
+            for k, v in self.response.headers.items()
+        }.items()
+
+
+@pytest.fixture(scope="session", autouse=True)
+def patch_aiobotocore() -> None:
+    """
+    Pytest Fixture Supporting S3FS Mocks.
+
+    See Notes
+    """
+
+    def factory(original: Callable[[Any, Any], Any]) -> Callable[[Any, Any], Any]:
+        """
+        Response Conversion Factory. 
+ """ + + def patched_convert_to_response_dict( + http_response: botocore.awsrequest.AWSResponse, + operation_model: botocore.model.OperationModel, + ) -> Any: + return original(MockAWSResponse(http_response), operation_model) + + return patched_convert_to_response_dict + + aiobotocore.endpoint.convert_to_response_dict = factory( + aiobotocore.endpoint.convert_to_response_dict + ) + + +# ---------------------------------------------------------------------------------------------------------------------- + + @pytest.fixture def path_with_vp(): """Return the folder containing minimal unit test files""" diff --git a/tests/test_vph5_to_vpts.py b/tests/test_vph5_to_vpts.py index 72a8532..a80b878 100644 --- a/tests/test_vph5_to_vpts.py +++ b/tests/test_vph5_to_vpts.py @@ -28,7 +28,6 @@ def test_e2e_cli(s3_inventory, path_inventory, tmp_path): "pandas.Timestamp.now", return_value=pd.Timestamp("2023-02-02 00:00:00", tz="UTC"), ): - # Run CLI command `vph5_to_vpts` with limited modified period check to 3 days runner = CliRunner() result = runner.invoke(cli, ["--modified-days-ago", 3]) @@ -78,7 +77,6 @@ def test_e2e_cli_all(s3_inventory, path_inventory, tmp_path): "pandas.Timestamp.now", return_value=pd.Timestamp("2023-02-02 00:00:00", tz="UTC"), ): - # Run CLI command `vph5_to_vpts` with limited modified period check to 3 days runner = CliRunner() result = runner.invoke(cli, ["--modified-days-ago", 0]) diff --git a/tox.ini b/tox.ini index 08292b4..888bfbf 100644 --- a/tox.ini +++ b/tox.ini @@ -5,7 +5,7 @@ name = vptstools [tox] minversion = 3.24 -envlist = py39 +envlist = py310 isolated_build = True @@ -95,7 +95,7 @@ commands = [testenv:dev] description = Create development environment with venv and register ipykernel -basepython = python3.9 +basepython = python3.10 usedevelop = true envdir = {toxinidir}/venv extras = @@ -104,4 +104,4 @@ extras = deps = ipykernel commands = - python -m ipykernel install --user --name {[main]name} --display-name 'Python 3.9 ({[main]name})' + python -m ipykernel install --user --name {[main]name} --display-name 'Python 3.10 ({[main]name})'
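
For reference, a minimal sketch of how the `patch_aiobotocore` fixture added in `tests/conftest.py` is meant to be exercised: with the session-scoped patch active, `s3fs` reads are served by the same moto-mocked bucket that plain `boto3` writes to. This sketch is not part of the changeset; the bucket and key names are hypothetical, and it assumes moto 4.x (`mock_s3`) with the s3fs/aiobotocore versions pinned in `requirements.txt`.

```python
# Hypothetical usage sketch for the aiobotocore/moto patch (not in this diff).
# Assumes: moto 4.x (`mock_s3`), s3fs/aiobotocore as pinned in requirements.txt,
# and the session-scoped `patch_aiobotocore` fixture from tests/conftest.py active.
import os

import boto3
import s3fs
from moto import mock_s3

# moto never talks to AWS, but botocore still resolves credentials,
# so provide fake ones for the mocked environment
os.environ.setdefault("AWS_ACCESS_KEY_ID", "testing")
os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "testing")


@mock_s3
def test_s3fs_reads_moto_bucket():
    # Write through plain boto3: moto intercepts the synchronous botocore
    # HTTP layer and keeps the object in memory.
    client = boto3.client("s3", region_name="us-east-1")
    client.create_bucket(Bucket="aloft-test")
    client.put_object(Bucket="aloft-test", Key="baltrad/hdf5/demo.h5", Body=b"\x89HDF")

    # Read through s3fs: the conftest patch wraps moto's AWSResponse in an
    # AioAWSResponse so aiobotocore can consume it instead of failing.
    fs = s3fs.S3FileSystem()
    assert fs.cat("aloft-test/baltrad/hdf5/demo.h5") == b"\x89HDF"
```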