Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BUFR file creation script and fix unit test bug in test_get_bufr.py #263

Merged
merged 16 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions .github/workflows/unit_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
workflow_dispatch:

jobs:
build:
test:
name: unit_test
runs-on: ubuntu-latest
strategy:
Expand All @@ -19,6 +19,9 @@ jobs:
uses: actions/checkout@v3
with:
token: ${{ secrets.GITHUB_TOKEN }}
- name: Install eccodes
run : |
sudo apt-get install -y libeccodes-dev
- name: Install dependencies
shell: bash
run: |
Expand All @@ -30,4 +33,4 @@ jobs:
- name: Run unit tests
shell: bash
run: |
python3 -m unittest discover tests.e2e
python3 -m unittest discover tests
4 changes: 3 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
package_data={
"pypromice.tx": ["payload_formats.csv", "payload_types.csv"],
"pypromice.qc.percentiles": ["thresholds.csv"],
"pypromice.postprocess": ["station_configurations.toml", "positions_seed.csv"],
"pypromice.postprocess": ["positions_seed.csv"],
},
install_requires=['numpy~=1.23', 'pandas>=1.5.0', 'xarray>=2022.6.0', 'toml', 'scipy>=1.9.0', 'Bottleneck', 'netcdf4', 'pyDataverse==0.3.1', 'eccodes', 'scikit-learn>=1.1.0'],
# extras_require={'postprocess': ['eccodes','scikit-learn>=1.1.0']},
Expand All @@ -47,6 +47,8 @@
'get_l2tol3 = pypromice.process.get_l2tol3:main',
'get_watsontx = pypromice.tx.get_watsontx:get_watsontx',
'get_bufr = pypromice.postprocess.get_bufr:main',
'create_bufr_files = pypromice.postprocess.create_bufr_files:main',
'bufr_to_csv = pypromice.postprocess.bufr_to_csv:main',
'get_msg = pypromice.tx.get_msg:get_msg'
],
},
Expand Down
7 changes: 6 additions & 1 deletion src/pypromice/postprocess/bufr_to_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@

from pypromice.postprocess.bufr_utilities import read_bufr_file

if __name__ == "__main__":

def main():
parser = argparse.ArgumentParser("BUFR to CSV converter")
parser.add_argument("path", type=Path)
args = parser.parse_args()

print(read_bufr_file(args.path).to_csv())


if __name__ == "__main__":
main()
93 changes: 78 additions & 15 deletions src/pypromice/postprocess/bufr_utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def round(value: float):

return round


# Enforce precision
# Note the sensor accuracies listed here:
# https://essd.copernicus.org/articles/13/3819/2021/#section8
Expand All @@ -64,28 +65,82 @@ class BUFRVariables:
* heightOfSensorAboveLocalGroundOrDeckOfMarinePlatformWSPD: Corresponds to "#7#heightOfSensorAboveLocalGroundOrDeckOfMarinePlatform" which is height if anemometer relative to ground or deck of marine platform.

"""
wmo_id: str

# Station type: "mobile" or "land"
# ===============================
# Fixed land station schema: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/307080
# Mobile station schema: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/307090

station_type: str

# WMO station identifier
# Land stations: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/301090
# Mobile stations: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/301092
# ======================================================================================================
wmo_id: str
timestamp: datetime.datetime
relativeHumidity: float = attrs.field(converter=round_converter(0))
airTemperature: float = attrs.field(converter=round_converter(1))
pressure: float = attrs.field(converter=round_converter(1))
windDirection: float = attrs.field(converter=round_converter(0))
windSpeed: float = attrs.field(converter=round_converter(1))
latitude: float = attrs.field(converter=round_converter(6))
longitude: float = attrs.field(converter=round_converter(6))

# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/005001
# Scale: 5, unit: degrees
# TODO: Test if eccodes does the rounding as well. The rounding is was 6 which is larger that the scale.
latitude: float = attrs.field(converter=round_converter(5))
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/006001
# Scale: 5, unit: degrees
longitude: float = attrs.field(converter=round_converter(5))

# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/007030
# Scale: 1, unit: m
heightOfStationGroundAboveMeanSeaLevel: float = attrs.field(
converter=round_converter(2)
converter=round_converter(1)
)
#
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/007031
# Scale: 1, unit: m
heightOfBarometerAboveMeanSeaLevel: float = attrs.field(
converter=round_converter(2),
converter=round_converter(1),
)

# Pressure information
# ====================
# Definition table: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/302031
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/007004
# Scale: -1, unit: Pa
pressure: float = attrs.field(converter=round_converter(-1))
# There are two other pressure variables in the template: 302001 and 010062.

# Basic synoptic "instantaneous" data
# ===================================
# Definition table: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/302035
# This section only include the temperature and humidity data (302032).
# Precipitation and cloud data are currently ignored.
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/007032
# Scale: 2, unit: m
# This is the first appearance of this variable id.
heightOfSensorAboveLocalGroundOrDeckOfMarinePlatformTempRH: float = attrs.field(
converter=round_converter(4),
converter=round_converter(2),
)
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/012101
# Scale: 2, unit: K
airTemperature: float = attrs.field(converter=round_converter(2))
# There is also a Dewpoint temperature in this template: 012103 which is currently unused.
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/012103
# Scale: 0, unit: %
relativeHumidity: float = attrs.field(converter=round_converter(0))

# Basic synoptic "period" data
# ============================
# Definition table: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/302043
# Wind data: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/302042
# Wind direction: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/011001
# Scale: 0, unit: degrees
windDirection: float = attrs.field(converter=round_converter(0))
# Wind speed: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/011002
# Scale: 1, unit: m/s
windSpeed: float = attrs.field(converter=round_converter(1))
# https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/007032
# Scale: 2, unit: m
# This is the 7th appearance of this variable id.
heightOfSensorAboveLocalGroundOrDeckOfMarinePlatformWSPD: float = attrs.field(
converter=round_converter(4)
converter=round_converter(2)
)

def as_series(self) -> pd.Series:
Expand Down Expand Up @@ -129,6 +184,7 @@ def __eq__(self, other: "BUFRVariables"):

BUFR_TEMPLATES = {
"mobile": {
# Template definition: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/307090
"unexpandedDescriptors": (307090), # message template, "synopMobil"
"edition": 4, # latest edition
"masterTableNumber": 0,
Expand All @@ -144,6 +200,7 @@ def __eq__(self, other: "BUFRVariables"):
"compressedData": 0,
},
"land": {
# Template definition: https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_D/307080
"unexpandedDescriptors": (307080), # message template, "synopLand"
"edition": 4, # latest edition
"masterTableNumber": 0,
Expand Down Expand Up @@ -246,6 +303,11 @@ def set_station(ibufr, station_type: str, wmo_id: str):
elif station_type == "land":
# StationNumber for land stations are integeres
wmo_id_int = int(wmo_id)
if wmo_id_int >= 1024:
raise ValueError(
f"Invalid WMO ID {wmo_id}. Land station number must be less than 1024."
"See https://vocabulary-manager.eumetsat.int/vocabularies/BUFR/WMO/32/TABLE_B/001002"
)
station_config = dict(stationNumber=wmo_id_int)
else:
raise Exception(f"Unsupported station station type {station_type}")
Expand Down Expand Up @@ -485,5 +547,6 @@ def read_bufr_file(path: PathLike) -> pd.DataFrame:
message_vars = read_bufr_message(fp)
if message_vars is None:
break
lines.append(message_vars)
return pd.DataFrame(lines).rename_axis("message_index")
lines.append(message_vars.as_series())
data_frame = pd.DataFrame(lines).set_index("wmo_id")
return data_frame
178 changes: 178 additions & 0 deletions src/pypromice/postprocess/create_bufr_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import logging
from pathlib import Path
from typing import Sequence, List

import pandas as pd
from pypromice.station_configuration import load_station_configuration_mapping

from pypromice.postprocess.get_bufr import (
get_bufr,
DEFAULT_LIN_REG_TIME_LIMIT,
DEFAULT_POSITION_SEED_PATH,
)

main_logger = logging.getLogger(__name__)


def create_bufr_files(
input_files: Sequence[Path],
station_configuration_root: Path,
period_start: str,
period_end: str,
output_root: Path,
override: bool,
break_on_error: bool = False,
output_filename_suffix: str = "geus_",
):
"""
Generate hourly bufr files from the for all input files

Parameters
----------
input_files
Paths to csv l3 hourly data files
station_configuration_root
Root directory containing station configuration toml files
period_start
Datetime string for period start. Eg '2024-01-01T00:00' or '20240101
period_end
Datetime string for period end
output_root
Output dir for both bufr files for individual stations and compiled. Organized in two sub directories.
override
If False: Skip a period if the compiled output file exists.
break_on_error
If True: Stop processing if an error occurs
output_filename_suffix
Suffix for the compiled output file

"""
periods = pd.date_range(period_start, period_end, freq="H")
output_individual_root = output_root / "individual"
output_compiled_root = output_root / "compiled"
output_individual_root.mkdir(parents=True, exist_ok=True)
output_compiled_root.mkdir(parents=True, exist_ok=True)

station_configuration_mapping = load_station_configuration_mapping(
station_configuration_root,
skip_unexpected_fields=True,
)

for period in periods:
period: pd.Timestamp
date_str = period.strftime("%Y%m%dT%H%M")
main_logger.info(f"Processing {date_str}")
output_dir_path = output_individual_root / f"{date_str}"
output_file_path = (
output_compiled_root / f"{output_filename_suffix}{date_str}.bufr"
)

main_logger.info(f"{period}, {date_str}")
if override or not output_file_path.exists():
get_bufr(
bufr_out=output_dir_path,
input_files=input_files,
store_positions=False,
positions_filepath=None,
linear_regression_time_limit=DEFAULT_LIN_REG_TIME_LIMIT,
timestamps_pickle_filepath=None,
target_timestamp=period,
station_configuration_mapping=station_configuration_mapping,
positions_seed_path=DEFAULT_POSITION_SEED_PATH,
break_on_error=break_on_error,
)

with output_file_path.open("wb") as fp_dst:
for src_path in output_dir_path.glob("*.bufr"):
with src_path.open("rb") as fp_src:
fp_dst.write(fp_src.read())
else:
main_logger.info(f"Output file exists. Skipping {output_file_path}")


# %%


def main():
import argparse
import glob
import sys

logger_format_string = "%(asctime)s; %(levelname)s; %(name)s; %(message)s"
logging.basicConfig(
level=logging.ERROR,
stream=sys.stdout,
format=logger_format_string,
)

main_handler = logging.StreamHandler(sys.stdout)
main_handler.setLevel(logging.INFO)
formatter = logging.Formatter(logger_format_string)
main_handler.setFormatter(formatter)
main_logger.addHandler(main_handler)
main_logger.setLevel(logging.INFO)
BaptisteVandecrux marked this conversation as resolved.
Show resolved Hide resolved

parser = argparse.ArgumentParser("Create BUFR files from L3 tx .csv files.")
parser.add_argument(
"--input_files",
"--l3-filepath",
"-i",
type=Path,
nargs="+",
required=True,
help="Path to L3 tx .csv files. Can be direct paths or glob patterns",
)
parser.add_argument(
"--period_start",
"-s",
required=True,
help="Datetime string for period start. Eg '2024-01-01T00:00' or '20240101",
)
parser.add_argument(
"--period_end", "-e", required=True, help="Datetime string for period end"
)
parser.add_argument(
"--output_root",
"-o",
required=True,
type=Path,
help="Output dir for both bufr files for individual stations and compiled. Organized in two sub directories.",
)
parser.add_argument(
"--station_configuration_root",
"-c",
required=True,
type=Path,
help="Root directory containing station configuration toml files",
)
parser.add_argument(
"--override",
"-f",
default=False,
action="store_true",
help="Recreate and overide existing output files",
)
args = parser.parse_args()

# Interpret all input file paths as glob patterns if they don't exist
input_files: List[Path] = list()
for path in args.input_files:
if path.exists():
input_files.append(path)
else:
# The input path might be a glob pattern
input_files += map(Path, glob.glob(path.as_posix()))

main_logger.info(f"Processing {len(input_files)} input files")
create_bufr_files(
input_files=input_files,
period_start=args.period_start,
period_end=args.period_end,
output_root=args.output_root,
override=args.override,
station_configuration_root=args.station_configuration_root,
)


if __name__ == "__main__":
main()
Loading
Loading