From 129695da2a20a9c586c7cd9b50f0bd96b95ea310 Mon Sep 17 00:00:00 2001
From: Andrew Cleland
Date: Wed, 24 Jan 2024 21:17:59 +1000
Subject: [PATCH 1/4] Use 07D instead of 01W for IGS week span

---
 gnssanalysis/filenames.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/gnssanalysis/filenames.py b/gnssanalysis/filenames.py
index f45acfa..77c0caf 100644
--- a/gnssanalysis/filenames.py
+++ b/gnssanalysis/filenames.py
@@ -306,9 +306,12 @@ def nominal_span_string(span_seconds: float) -> str:
         unit = "W"
         span_unit_counts = int(span_seconds // gn_const.SEC_IN_WEEK)
     elif span_seconds >= gn_const.SEC_IN_WEEK:
-        if (span_seconds % gn_const.SEC_IN_WEEK) < gn_const.SEC_IN_DAY:
+        num_weeks = int(span_seconds // gn_const.SEC_IN_WEEK)
+        # IGS uses 07D to represent a week
+        # TODO: Handle JPL - uses 01W for a week
+        if (span_seconds % gn_const.SEC_IN_WEEK) < gn_const.SEC_IN_DAY and num_weeks > 1:
             unit = "W"
-            span_unit_counts = int(span_seconds // gn_const.SEC_IN_WEEK)
+            span_unit_counts = num_weeks
         else:
             unit = "D"
             span_unit_counts = int(span_seconds // gn_const.SEC_IN_DAY)

From 1f0a227904ee1f3ac1a1078150e45524bbdc6ea8 Mon Sep 17 00:00:00 2001
From: Andrew Cleland
Date: Wed, 24 Jan 2024 21:18:41 +1000
Subject: [PATCH 2/4] Add ftp download functions from auto_download_PPP

---
 gnssanalysis/gn_download.py | 65 +++++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/gnssanalysis/gn_download.py b/gnssanalysis/gn_download.py
index 6a197da..5bd25e2 100644
--- a/gnssanalysis/gn_download.py
+++ b/gnssanalysis/gn_download.py
@@ -5,14 +5,19 @@
 clk
 rnx (including transformation from crx to rnx)
 """
+import concurrent.futures as _concurrent_futures
+from contextlib import contextmanager as _contextmanager
 import datetime as _datetime
+from itertools import repeat as _repeat
 import logging
 import os as _os
+import random as _random
 import shutil
 import subprocess as _subprocess
 import sys as _sys
 import threading
 import time as _time
+import ftplib as _ftplib
 from ftplib import FTP_TLS as _FTP_TLS
 from pathlib import Path as _Path
 from typing import Optional as _Optional, Union as _Union
@@ -28,6 +33,8 @@
 
 MB = 1024 * 1024
 
+CDDIS_FTP = "gdc.cddis.eosdis.nasa.gov"
+
 # s3client = boto3.client('s3', region_name='eu-central-1')
 
 
@@ -339,6 +346,7 @@ def get_install_crx2rnx(override=False, verbose=False):
         logging.info(f"crx2rnx already present in {_sys.path[0]}")
 
 
+# TODO: Deprecate in favour of the contextmanager?
 def connect_cddis(verbose=False):
     """
     Output an FTP_TLS object connected to the cddis server root
@@ -356,6 +364,25 @@ def connect_cddis(verbose=False):
     return ftps
 
 
+@_contextmanager
+def ftp_tls(url: str, **kwargs):
+    kwargs.setdefault("timeout", 30)
+    with _FTP_TLS(url, **kwargs) as ftps:
+        ftps.login()
+        ftps.prot_p()
+        yield ftps
+        ftps.quit()
+
+
+@_contextmanager
+def ftp_tls_cddis(connection=None, **kwargs):
+    if connection is None:
+        with ftp_tls(CDDIS_FTP, **kwargs) as ftps:
+            yield ftps
+    else:
+        yield connection
+
+
 def select_mr_file(mr_files, f_typ, ac):
     """
     Given a list of most recent files, find files matching type and AC of interest
@@ -445,6 +472,44 @@ def download_most_recent(
     return ret_vars
 
 
+def download_file_from_cddis(
+    filename: str,
+    ftp_folder: str,
+    output_folder: _Path,
+    ftps: _Optional[_FTP_TLS] = None,
+    max_retries: int = 3,
+    uncomp: bool = True,
+) -> None:
+    with ftp_tls_cddis(ftps) as ftps:
+        ftps.cwd(ftp_folder)
+        retries = 0
+        download_done = False
+        while not download_done and retries <= max_retries:
+            try:
+                logging.info(f"Attempting Download of: {filename}")
+                check_n_download(filename, str(output_folder) + "/", ftps, uncomp=uncomp)
+                download_done = True
+                logging.info(f"Downloaded {filename}")
+            except _ftplib.all_errors as e:
+                retries += 1
+                if retries > max_retries:
+                    logging.warning(f"Failed to download {filename} and reached maximum retry count ({max_retries}).")
+                    if (output_folder / filename).is_file():
+                        (output_folder / filename).unlink()
+                    raise e
+
+                logging.debug(f"Received an error ({e}) while trying to download {filename}, retrying ({retries}).")
+                # Add some backoff time (exponential random as it appears to be contention based?)
+                _time.sleep(_random.uniform(0.0, 2.0**retries))
+
+
+def download_multiple_files_from_cddis(files: list, ftp_folder: str, output_folder: _Path) -> None:
+    with _concurrent_futures.ThreadPoolExecutor() as executor:
+        # Wrap this in a list to force iteration of results and so get the first exception if any were raised
+        list(executor.map(download_file_from_cddis, files, _repeat(ftp_folder), _repeat(output_folder)))
+
+
+# TODO: Deprecate? Only supports legacy filenames
 def download_prod(
     dates,
     dest,

From 1ee9439de37e151b4e47bae23c6e78f2fd31de76 Mon Sep 17 00:00:00 2001
From: Andrew Cleland
Date: Thu, 25 Jan 2024 08:58:47 +1000
Subject: [PATCH 3/4] Black formatting

---
 gnssanalysis/filenames.py   | 8 ++++----
 gnssanalysis/gn_download.py | 4 ----
 2 files changed, 4 insertions(+), 8 deletions(-)

diff --git a/gnssanalysis/filenames.py b/gnssanalysis/filenames.py
index 77c0caf..148afaa 100644
--- a/gnssanalysis/filenames.py
+++ b/gnssanalysis/filenames.py
@@ -251,11 +251,11 @@ def generate_IGS_long_filename(
 
     if isinstance(timespan, str):
         timespan_str = timespan
-    else: 
+    else:
         if end_epoch is None:
             if timespan is None:
                 raise ValueError("Either end_epoch or timespan must be supplied")
-            else :
+            else:
                 timespan = end_epoch - start_epoch
 
         timespan_str = nominal_span_string(timespan.total_seconds())
@@ -406,7 +406,7 @@ def determine_clk_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
     #     clk_df.reset_index("J2000").groupby(level="CODE")["J2000"].diff().groupby(level="CODE").median().median()
     # )
     # The pandas stubs seem to assume .index returns an Index (not MultiIndex), so we need to ignore the typing for now
-    sampling_rate = np.median(np.diff(clk_df.index.levels[1])) #type: ignore
+    sampling_rate = np.median(np.diff(clk_df.index.levels[1]))  # type: ignore
     # Alternatively: sampling_rate = np.median(np.diff(clk_df.index.get_level_values("J2000").unique()))
 
     end_epoch = gn_datetime.j2000_to_pydatetime(end_j2000sec + sampling_rate)
@@ -609,7 +609,7 @@ def determine_sp3_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
     #     sp3_df.reset_index(0, names="Epoch").groupby(level=0)["Epoch"].diff().groupby(level=0).median().median()
     # )
     # The pandas stubs seem to assume .index returns an Index (not MultiIndex), so we need to ignore the typing for now
-    sampling_rate = np.median(np.diff(sp3_df.index.levels[0])) # type: ignore
+    sampling_rate = np.median(np.diff(sp3_df.index.levels[0]))  # type: ignore
     # Alternatively:
     # sampling_rate = np.median(np.diff(sp3_df.index.get_level_values(0).unique()))
     name_props["start_epoch"] = start_epoch
diff --git a/gnssanalysis/gn_download.py b/gnssanalysis/gn_download.py
index 5bd25e2..2a09b29 100644
--- a/gnssanalysis/gn_download.py
+++ b/gnssanalysis/gn_download.py
@@ -285,7 +285,6 @@ def check_n_download(comp_filename, dwndir, ftps, uncomp=True, remove_crx=False,
         dwndir += "/"
 
     if no_check or (not check_file_present(comp_filename, dwndir)):
-
         logging.debug(f"Downloading {comp_filename}")
 
         with open(comp_file, "wb") as local_f:
@@ -569,9 +568,7 @@ def download_prod(
 
     for dt in dt_list:
         for f_typ in f_types:
-
             if dwn_src == "cddis":
-
                 if repro3:
                     f, gpswk = gen_prod_filename(dt, pref=ac, suff=suff, f_type=f_typ, repro3=True)
                 elif (ac == "igs") and (f_typ == "erp"):
@@ -788,7 +785,6 @@ def download_rinex3(dates, stations, dest, dwn_src="cddis", ftps=False, f_dict=F
 
     for dt in dt_list:
         for station in stations:
-
             f_pref = f"{station}_R_"
             f_suff_crx = f"0000_01D_30S_MO.crx.gz"
             f = f_pref + dt.strftime("%Y%j") + f_suff_crx

From 1b6dfc70af02cbc5cde151495c2cbbce1c16b578 Mon Sep 17 00:00:00 2001
From: Andrew Cleland
Date: Thu, 25 Jan 2024 09:38:04 +1000
Subject: [PATCH 4/4] Add comments

---
 gnssanalysis/gn_download.py | 25 ++++++++++++++++++++++++-
 1 file changed, 24 insertions(+), 1 deletion(-)

diff --git a/gnssanalysis/gn_download.py b/gnssanalysis/gn_download.py
index 2a09b29..a591d3a 100644
--- a/gnssanalysis/gn_download.py
+++ b/gnssanalysis/gn_download.py
@@ -365,6 +365,10 @@ def connect_cddis(verbose=False):
 
 @_contextmanager
 def ftp_tls(url: str, **kwargs):
+    """Opens a connection to the specified FTP server over TLS.
+
+    :param url: Remote FTP URL
+    """
     kwargs.setdefault("timeout", 30)
     with _FTP_TLS(url, **kwargs) as ftps:
         ftps.login()
@@ -374,7 +378,11 @@ def ftp_tls(url: str, **kwargs):
 
 
 @_contextmanager
-def ftp_tls_cddis(connection=None, **kwargs):
+def ftp_tls_cddis(connection: _Optional[_FTP_TLS] = None, **kwargs):
+    """Establishes an FTP TLS connection to CDDIS, or passes through an existing one for reuse.
+
+    :param connection: Optional active connection to reuse instead of opening a new one
+    """
     if connection is None:
         with ftp_tls(CDDIS_FTP, **kwargs) as ftps:
             yield ftps
@@ -479,6 +487,15 @@ def download_file_from_cddis(
     max_retries: int = 3,
     uncomp: bool = True,
 ) -> None:
+    """Downloads a single file from the CDDIS FTP server.
+
+    :param filename: Name of the file to download
+    :param ftp_folder: Folder where the file is stored on the remote server
+    :param output_folder: Local folder in which to store the downloaded file
+    :param ftps: Optional active connection object to reuse
+    :param max_retries: Number of retries before raising an error
+    :param uncomp: If True, uncompress the file on download
+    """
     with ftp_tls_cddis(ftps) as ftps:
         ftps.cwd(ftp_folder)
         retries = 0
@@ -502,7 +519,13 @@ def download_file_from_cddis(
         _time.sleep(_random.uniform(0.0, 2.0**retries))
 
 
 def download_multiple_files_from_cddis(files: list, ftp_folder: str, output_folder: _Path) -> None:
+    """Downloads multiple files from a single folder on CDDIS using a thread pool.
+
+    :param files: List of filename strings
+    :param ftp_folder: Folder where the files are stored on the remote server
+    :param output_folder: Local folder in which to store the downloaded files
+    """
     with _concurrent_futures.ThreadPoolExecutor() as executor:
         # Wrap this in a list to force iteration of results and so get the first exception if any were raised
         list(executor.map(download_file_from_cddis, files, _repeat(ftp_folder), _repeat(output_folder)))
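
Sanity check of the patch 1 behaviour (a sketch: SEC_IN_DAY = 86400 and SEC_IN_WEEK = 604800 are assumed from the constants' names, as is the usual two-digit zero-padded IGS span format; return values are inferred from the diff, not from a test run):

    from gnssanalysis.filenames import nominal_span_string

    nominal_span_string(7 * 86400)   # "07D" - a single week is now expressed in days (IGS convention)
    nominal_span_string(14 * 86400)  # "02W" - two or more whole weeks still use the week unit
    nominal_span_string(10 * 86400)  # "10D" - a remainder of a day or more falls back to days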
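
Usage sketch for the CDDIS helpers added in patch 2 (not part of the patches; the product filenames and the gnss/products/<wwww> folder path below are illustrative assumptions, not values taken from this series):

    from pathlib import Path
    import gnssanalysis.gn_download as gn_download

    # Hypothetical IGS long product filenames, for illustration only
    files = [
        "IGS0OPSFIN_20230010000_01D_15M_ORB.SP3.gz",
        "IGS0OPSFIN_20230020000_01D_15M_ORB.SP3.gz",
    ]

    # Threaded download: each call opens its own TLS connection since no ftps object is passed
    gn_download.download_multiple_files_from_cddis(files, "gnss/products/2243", Path("products"))

    # Single file, explicitly reusing one connection via the context manager
    with gn_download.ftp_tls_cddis() as ftps:
        gn_download.download_file_from_cddis(files[0], "gnss/products/2243", Path("products"), ftps=ftps)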