
Commit

Merge pull request #11 from aclel/ftp-cddis
Move download functions from ginan/scripts/auto_download_PPP to gnssanalysis
ronaldmaj committed Jan 28, 2024
2 parents 6abad62 + 1b6dfc7 commit bf0a093
Showing 2 changed files with 97 additions and 10 deletions.
gnssanalysis/filenames.py (15 changes: 9 additions & 6 deletions)
@@ -251,11 +251,11 @@ def generate_IGS_long_filename(
 
     if isinstance(timespan, str):
         timespan_str = timespan
-    else:
+    else:
         if end_epoch is None:
             if timespan is None:
                 raise ValueError("Either end_epoch or timespan must be supplied")
-        else :
+        else:
             timespan = end_epoch - start_epoch
         timespan_str = nominal_span_string(timespan.total_seconds())
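
The resolution above takes a string timespan verbatim and otherwise derives a timedelta, either supplied directly or computed from the two epochs. A minimal sketch of the derived case, using plain datetimes (the "01D" token assumes the zero-padded count-plus-unit format produced by nominal_span_string in the next hunk):

```python
from datetime import datetime

start_epoch = datetime(2024, 1, 1)
end_epoch = datetime(2024, 1, 2)

# No explicit timespan supplied, so it is derived from the two epochs:
timespan = end_epoch - start_epoch  # timedelta(days=1) -> 86400.0 seconds
# nominal_span_string(timespan.total_seconds()) would then return "01D"
```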

@@ -306,9 +306,12 @@ def nominal_span_string(span_seconds: float) -> str:
         unit = "W"
         span_unit_counts = int(span_seconds // gn_const.SEC_IN_WEEK)
     elif span_seconds >= gn_const.SEC_IN_WEEK:
-        if (span_seconds % gn_const.SEC_IN_WEEK) < gn_const.SEC_IN_DAY:
+        num_weeks = int(span_seconds // gn_const.SEC_IN_WEEK)
+        # IGS uses 07D to represent a week
+        # TODO: Handle JPL - uses 01W for a week
+        if (span_seconds % gn_const.SEC_IN_WEEK) < gn_const.SEC_IN_DAY and num_weeks > 1:
             unit = "W"
-            span_unit_counts = int(span_seconds // gn_const.SEC_IN_WEEK)
+            span_unit_counts = num_weeks
         else:
             unit = "D"
             span_unit_counts = int(span_seconds // gn_const.SEC_IN_DAY)
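
The new num_weeks > 1 guard encodes the IGS convention that a single week is written as 07D, while only spans covering more than one whole week get the W unit. Illustrative expected tokens, assuming the function renders a zero-padded two-digit count followed by the unit letter:

```python
SEC_IN_DAY = 86400
SEC_IN_WEEK = 7 * SEC_IN_DAY

# nominal_span_string(SEC_IN_DAY)       -> "01D"
# nominal_span_string(SEC_IN_WEEK)      -> "07D"  (one week: num_weeks == 1, stays in days)
# nominal_span_string(2 * SEC_IN_WEEK)  -> "02W"  (whole multiple of a week, num_weeks > 1)
# nominal_span_string(8 * SEC_IN_DAY)   -> "08D"  (a day or more past a whole week)
```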
@@ -403,7 +406,7 @@ def determine_clk_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
     # clk_df.reset_index("J2000").groupby(level="CODE")["J2000"].diff().groupby(level="CODE").median().median()
     # )
     # The pandas stubs seem to assume .index returns an Index (not MultiIndex), so we need to ignore the typing for now
-    sampling_rate = np.median(np.diff(clk_df.index.levels[1])) #type: ignore
+    sampling_rate = np.median(np.diff(clk_df.index.levels[1]))  # type: ignore
     # Alternatively:
     sampling_rate = np.median(np.diff(clk_df.index.get_level_values("J2000").unique()))
     end_epoch = gn_datetime.j2000_to_pydatetime(end_j2000sec + sampling_rate)
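
The sampling rate here is the median gap between consecutive values on the J2000 epoch level of the index. A small self-contained sketch of the same idea, with made-up epoch values:

```python
import numpy as np

# Median of consecutive differences is robust to an occasional missing epoch:
# one epoch is absent below, yet the nominal 30 s rate is still recovered.
epochs = np.array([0.0, 30.0, 60.0, 120.0, 150.0])
sampling_rate = np.median(np.diff(epochs))  # -> 30.0
```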
@@ -606,7 +609,7 @@ def determine_sp3_name_props(file_path: pathlib.Path) -> Dict[str, Any]:
     # sp3_df.reset_index(0, names="Epoch").groupby(level=0)["Epoch"].diff().groupby(level=0).median().median()
     # )
     # The pandas stubs seem to assume .index returns an Index (not MultiIndex), so we need to ignore the typing for now
-    sampling_rate = np.median(np.diff(sp3_df.index.levels[0])) # type: ignore
+    sampling_rate = np.median(np.diff(sp3_df.index.levels[0]))  # type: ignore
     # Alternatively:
     # sampling_rate = np.median(np.diff(sp3_df.index.get_level_values(0).unique()))
     name_props["start_epoch"] = start_epoch
gnssanalysis/gn_download.py (92 changes: 88 additions & 4 deletions)
@@ -5,14 +5,19 @@
 clk
 rnx (including transformation from crx to rnx)
 """
+import concurrent.futures as _concurrent_futures
+from contextlib import contextmanager as _contextmanager
 import datetime as _datetime
+from itertools import repeat as _repeat
 import logging
 import os as _os
+import random as _random
 import shutil
 import subprocess as _subprocess
 import sys as _sys
 import threading
 import time as _time
+import ftplib as _ftplib
 from ftplib import FTP_TLS as _FTP_TLS
 from pathlib import Path as _Path
 from typing import Optional as _Optional, Union as _Union
@@ -28,6 +33,8 @@
 
 MB = 1024 * 1024
 
+CDDIS_FTP = "gdc.cddis.eosdis.nasa.gov"
+
 # s3client = boto3.client('s3', region_name='eu-central-1')
 
 
@@ -278,7 +285,6 @@ def check_n_download(comp_filename, dwndir, ftps, uncomp=True, remove_crx=False,
         dwndir += "/"
 
     if no_check or (not check_file_present(comp_filename, dwndir)):
-
         logging.debug(f"Downloading {comp_filename}")
 
         with open(comp_file, "wb") as local_f:
@@ -339,6 +345,7 @@ def get_install_crx2rnx(override=False, verbose=False):
         logging.info(f"crx2rnx already present in {_sys.path[0]}")
 
 
+# TODO: Deprecate in favour of the contextmanager?
 def connect_cddis(verbose=False):
     """
     Output an FTP_TLS object connected to the cddis server root
@@ -356,6 +363,33 @@
     return ftps
 
 
+@_contextmanager
+def ftp_tls(url: str, **kwargs):
+    """Opens a connection to the specified FTP server over TLS.
+    :param url: Remote FTP URL
+    """
+    kwargs.setdefault("timeout", 30)
+    with _FTP_TLS(url, **kwargs) as ftps:
+        ftps.login()
+        ftps.prot_p()
+        yield ftps
+        ftps.quit()
+
+
+@_contextmanager
+def ftp_tls_cddis(connection: _Optional[_FTP_TLS] = None, **kwargs):
+    """Establish an FTP TLS connection to CDDIS. Opens a new connection if one does not already exist.
+    :param connection: Active connection which is passed through to allow reuse
+    """
+    if connection is None:
+        with ftp_tls(CDDIS_FTP, **kwargs) as ftps:
+            yield ftps
+    else:
+        yield connection

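Because ftp_tls_cddis passes an existing connection straight through, batch helpers can share one TLS session while one-off calls still open and clean up their own. A hypothetical usage sketch (the folder path is illustrative):

```python
# One-off: opens a fresh TLS connection to CDDIS and closes it on exit.
with ftp_tls_cddis() as ftps:
    ftps.cwd("gnss/products")

# Reuse: an outer connection is passed through unchanged.
with ftp_tls(CDDIS_FTP) as shared:
    with ftp_tls_cddis(shared) as ftps:
        assert ftps is shared  # no second connection is opened
```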

 def select_mr_file(mr_files, f_typ, ac):
     """
     Given a list of most recent files, find files matching type and AC of interest
@@ -445,6 +479,59 @@ def download_most_recent(
     return ret_vars
 
 
+def download_file_from_cddis(
+    filename: str,
+    ftp_folder: str,
+    output_folder: _Path,
+    ftps: _Optional[_FTP_TLS] = None,
+    max_retries: int = 3,
+    uncomp: bool = True,
+) -> None:
+    """Downloads a single file from the CDDIS FTP server.
+    :param filename: Name of the file to download
+    :param ftp_folder: Folder where the file is stored on the remote
+    :param output_folder: Folder to store the output file
+    :param ftps: Optional active connection object which is reused
+    :param max_retries: Number of retries before raising an error
+    :param uncomp: If true, uncompress files on download
+    """
+    with ftp_tls_cddis(ftps) as ftps:
+        ftps.cwd(ftp_folder)
+        retries = 0
+        download_done = False
+        while not download_done and retries <= max_retries:
+            try:
+                logging.info(f"Attempting download of: {filename}")
+                check_n_download(filename, str(output_folder) + "/", ftps, uncomp=uncomp)
+                download_done = True
+                logging.info(f"Downloaded {filename}")
+            except _ftplib.all_errors as e:
+                retries += 1
+                if retries > max_retries:
+                    logging.warning(f"Failed to download {filename} and reached maximum retry count ({max_retries}).")
+                    if (output_folder / filename).is_file():
+                        (output_folder / filename).unlink()
+                    raise e
+
+                logging.debug(f"Received an error ({e}) while trying to download {filename}, retrying ({retries}).")
+                # Add some backoff time (exponential random as it appears to be contention based?)
+                _time.sleep(_random.uniform(0.0, 2.0**retries))
+
+
+def download_multiple_files_from_cddis(files: list[str], ftp_folder: str, output_folder: _Path) -> None:
+    """Downloads multiple files in a single folder from CDDIS in a thread pool.
+    :param files: List of str filenames
+    :param ftp_folder: Folder where the files are stored on the remote
+    :param output_folder: Folder to store the output files
+    """
+    with _concurrent_futures.ThreadPoolExecutor() as executor:
+        # Wrap this in a list to force iteration of results and so get the first exception if any were raised
+        list(executor.map(download_file_from_cddis, files, _repeat(ftp_folder), _repeat(output_folder)))
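
A hypothetical end-to-end call, with the GPS-week folder and product filenames made up for illustration; each file is fetched on its own thread and decompressed on arrival (uncomp defaults to True):

```python
from pathlib import Path

download_multiple_files_from_cddis(
    files=[
        "IGS0OPSFIN_20232940000_01D_05M_ORB.SP3.gz",
        "IGS0OPSFIN_20232950000_01D_05M_ORB.SP3.gz",
    ],
    ftp_folder="gnss/products/2284",
    output_folder=Path("."),
)
```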


+# TODO: Deprecate? Only supports legacy filenames
 def download_prod(
     dates,
     dest,
@@ -504,9 +591,7 @@ def download_prod(
 
     for dt in dt_list:
         for f_typ in f_types:
-
             if dwn_src == "cddis":
-
                 if repro3:
                     f, gpswk = gen_prod_filename(dt, pref=ac, suff=suff, f_type=f_typ, repro3=True)
                 elif (ac == "igs") and (f_typ == "erp"):
@@ -723,7 +808,6 @@ def download_rinex3(dates, stations, dest, dwn_src="cddis", ftps=False, f_dict=F
 
     for dt in dt_list:
         for station in stations:
-
             f_pref = f"{station}_R_"
             f_suff_crx = f"0000_01D_30S_MO.crx.gz"
             f = f_pref + dt.strftime("%Y%j") + f_suff_crx
