diff --git a/.github/workflows/process_l2_test.yml b/.github/workflows/process_l2_test.yml new file mode 100644 index 00000000..810086cb --- /dev/null +++ b/.github/workflows/process_l2_test.yml @@ -0,0 +1,52 @@ +on: + pull_request: + types: [opened, reopened, synchronize, edited] + workflow_dispatch: + +jobs: + build: + runs-on: ubuntu-latest + name: process_l2_test + steps: + - name: Install Python + uses: actions/setup-python@v4 + with: + python-version: "3.8" + - name: Checkout repo + uses: actions/checkout@v3 + with: + path: "main" + token: ${{ secrets.GITHUB_TOKEN }} + - name: Install dependencies + shell: bash + run: | + python -m pip install --upgrade pip + pip install wheel + python3 -m pip install --upgrade setuptools + cd $GITHUB_WORKSPACE/main + pip install . + - name: Clone AWS Level 0 data repo for testing + env: + GITLAB_TOKEN : ${{ secrets.GITLAB_TOKEN }} + run: | + cd $GITHUB_WORKSPACE + git clone --depth 1 https://oauth2:${{ env.GITLAB_TOKEN }}@geusgitlab.geus.dk/glaciology-and-climate/promice/aws-l0.git + - name: Run L0 to L3 processing + env: + TEST_STATION: KAN_U HUM + shell: bash + run: | + mkdir $GITHUB_WORKSPACE/out/ + mkdir $GITHUB_WORKSPACE/out/L2/ + for i in $(echo ${{ env.TEST_STATION }} | tr ' ' '\n'); do + python3 $GITHUB_WORKSPACE/main/src/pypromice/process/get_l2.py -c $GITHUB_WORKSPACE/aws-l0/tx/config/$i.toml -i $GITHUB_WORKSPACE/aws-l0/tx -o $GITHUB_WORKSPACE/out/L2/ + done +# mkdir $GITHUB_WORKSPACE/out/L3/ +# for i in $(echo ${{ env.TEST_STATION }} | tr ' ' '\n'); do +# python3 $GITHUB_WORKSPACE/main/src/pypromice/process/get_l2tol3.py -i $GITHUB_WORKSPACE/out/L2/$i/$i_hour.nc -o $GITHUB_WORKSPACE/out/ -t 60min +# done + - name: Upload test output + uses: actions/upload-artifact@v3 + with: + name: result + path: out diff --git a/.github/workflows/process_test.yml b/.github/workflows/process_l3_test.yml similarity index 81% rename from .github/workflows/process_test.yml rename to .github/workflows/process_l3_test.yml index 5ba3d51a..97f8a034 100644 --- a/.github/workflows/process_test.yml +++ b/.github/workflows/process_l3_test.yml @@ -6,7 +6,7 @@ on: jobs: build: runs-on: ubuntu-latest - name: process_test + name: process_l3_test steps: - name: Install Python uses: actions/setup-python@v4 @@ -33,12 +33,12 @@ jobs: git clone --depth 1 https://oauth2:${{ env.GITLAB_TOKEN }}@geusgitlab.geus.dk/glaciology-and-climate/promice/aws-l0.git - name: Run data processing env: - TEST_STATION: KPC_U CEN2 JAR + TEST_STATION: KPC_U CEN2 shell: bash run: | mkdir $GITHUB_WORKSPACE/out/ for i in $(echo ${{ env.TEST_STATION }} | tr ' ' '\n'); do - python3 $GITHUB_WORKSPACE/main/src/pypromice/process/get_l3.py -v $GITHUB_WORKSPACE/main/src/pypromice/process/variables.csv -m $GITHUB_WORKSPACE/main/src/pypromice/process/metadata.csv -c $GITHUB_WORKSPACE/aws-l0/raw/config/$i.toml -i $GITHUB_WORKSPACE/aws-l0/raw -o $GITHUB_WORKSPACE/out/ + python3 $GITHUB_WORKSPACE/main/src/pypromice/process/get_l3.py -c $GITHUB_WORKSPACE/aws-l0/raw/config/$i.toml -i $GITHUB_WORKSPACE/aws-l0/raw -o $GITHUB_WORKSPACE/out/ done - name: Upload test output uses: actions/upload-artifact@v3 diff --git a/setup.py b/setup.py index 2a4638d3..5675d6f6 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setuptools.setup( name="pypromice", - version="1.3.6", + version="1.4.0", author="GEUS Glaciology and Climate", description="PROMICE/GC-Net data processing toolbox", long_description=long_description, @@ -36,14 +36,16 @@ "pypromice.qc.percentiles": ["thresholds.csv"], "pypromice.postprocess": 
["station_configurations.toml", "positions_seed.csv"], }, - install_requires=['numpy>=1.23.0', 'pandas>=1.5.0', 'xarray>=2022.6.0', 'toml', 'scipy>=1.9.0', 'Bottleneck', 'netcdf4', 'pyDataverse', 'eccodes','scikit-learn>=1.1.0'], + install_requires=['numpy>=1.23.0', 'pandas>=1.5.0', 'xarray>=2022.6.0', 'toml', 'scipy>=1.9.0', 'Bottleneck', 'netcdf4', 'pyDataverse==0.3.1', 'eccodes', 'scikit-learn>=1.1.0'], # extras_require={'postprocess': ['eccodes','scikit-learn>=1.1.0']}, entry_points={ 'console_scripts': [ 'get_promice_data = pypromice.get.get_promice_data:get_promice_data', 'get_l0tx = pypromice.tx.get_l0tx:get_l0tx', + 'join_l2 = pypromice.process.join_l2:join_l2', + 'get_l2 = pypromice.process.get_l2:get_l2', 'get_l3 = pypromice.process.get_l3:get_l3', - 'join_l3 = pypromice.process.join_l3:join_l3', + 'get_l2tol3 = pypromice.process.get_l2tol3:get_l2tol3', 'get_watsontx = pypromice.tx.get_watsontx:get_watsontx', 'get_bufr = pypromice.postprocess.get_bufr:main', 'get_msg = pypromice.tx.get_msg:get_msg' diff --git a/src/pypromice/process/L1toL2.py b/src/pypromice/process/L1toL2.py index 73ca1a7d..d9ee1e8e 100644 --- a/src/pypromice/process/L1toL2.py +++ b/src/pypromice/process/L1toL2.py @@ -30,7 +30,18 @@ def toL2( eps_clear=9.36508e-6, emissivity=0.97, ) -> xr.Dataset: - '''Process one Level 1 (L1) product to Level 2 + '''Process one Level 1 (L1) product to Level 2. + In this step we do: + - manual flagging and adjustments + - automated QC: persistence, percentile + - custom filter: gps_alt filter, NaN t_rad removed from dlr & ulr + - smoothing of tilt and rot + - calculation of rh with regards to ice in subfreezin conditions + - calculation of cloud coverage + - correction of dsr and usr for tilt + - filtering of dsr based on a theoritical TOA irradiance and grazing light + - calculation of albedo + - calculation of directional wind speed Parameters ---------- @@ -85,10 +96,20 @@ def toL2( ds['dlr'] = ds.dlr.where(ds.t_rad.notnull()) ds['ulr'] = ds.ulr.where(ds.t_rad.notnull()) + # calculating realtive humidity with regard to ice T_100 = _getTempK(T_0) ds['rh_u_cor'] = correctHumidity(ds['rh_u'], ds['t_u'], T_0, T_100, ews, ei0) + if ds.attrs['number_of_booms']==2: + ds['rh_l_cor'] = correctHumidity(ds['rh_l'], ds['t_l'], + T_0, T_100, ews, ei0) + + if hasattr(ds,'t_i'): + if ~ds['t_i'].isnull().all(): + ds['rh_i_cor'] = correctHumidity(ds['rh_i'], ds['t_i'], + T_0, T_100, ews, ei0) + # Determiune cloud cover for on-ice stations cc = calcCloudCoverage(ds['t_u'], T_0, eps_overcast, eps_clear, # Calculate cloud coverage ds['dlr'], ds.attrs['station_id']) @@ -176,22 +197,52 @@ def toL2( ds['precip_u_cor'], ds['precip_u_rate'] = correctPrecip(ds['precip_u'], ds['wspd_u']) if ds.attrs['number_of_booms']==2: - ds['rh_l_cor'] = correctHumidity(ds['rh_l'], ds['t_l'], # Correct relative humidity - T_0, T_100, ews, ei0) - if ~ds['precip_l'].isnull().all() and precip_flag: # Correct precipitation ds['precip_l_cor'], ds['precip_l_rate']= correctPrecip(ds['precip_l'], ds['wspd_l']) - if hasattr(ds,'t_i'): - if ~ds['t_i'].isnull().all(): # Instantaneous msg processing - ds['rh_i_cor'] = correctHumidity(ds['rh_i'], ds['t_i'], # Correct relative humidity - T_0, T_100, ews, ei0) + # Get directional wind speed + ds['wdir_u'] = ds['wdir_u'].where(ds['wspd_u'] != 0) + ds['wspd_x_u'], ds['wspd_y_u'] = calcDirWindSpeeds(ds['wspd_u'], ds['wdir_u']) + + if ds.attrs['number_of_booms']==2: + ds['wdir_l'] = ds['wdir_l'].where(ds['wspd_l'] != 0) + ds['wspd_x_l'], ds['wspd_y_l'] = 
calcDirWindSpeeds(ds['wspd_l'], ds['wdir_l']) + + if hasattr(ds, 'wdir_i'): + if ~ds['wdir_i'].isnull().all() and ~ds['wspd_i'].isnull().all(): + ds['wdir_i'] = ds['wdir_i'].where(ds['wspd_i'] != 0) + ds['wspd_x_i'], ds['wspd_y_i'] = calcDirWindSpeeds(ds['wspd_i'], ds['wdir_i']) + ds = clip_values(ds, vars_df) return ds +def calcDirWindSpeeds(wspd, wdir, deg2rad=np.pi/180): + '''Calculate directional wind speed from wind speed and direction + + Parameters + ---------- + wspd : xr.Dataarray + Wind speed data array + wdir : xr.Dataarray + Wind direction data array + deg2rad : float + Degree to radians coefficient. The default is np.pi/180 + + Returns + ------- + wspd_x : xr.Dataarray + Wind speed in X direction + wspd_y : xr.Datarray + Wind speed in Y direction + ''' + wspd_x = wspd * np.sin(wdir * deg2rad) + wspd_y = wspd * np.cos(wdir * deg2rad) + return wspd_x, wspd_y + + def calcCloudCoverage(T, T_0, eps_overcast, eps_clear, dlr, station_id): '''Calculate cloud cover from T and T_0 diff --git a/src/pypromice/process/L2toL3.py b/src/pypromice/process/L2toL3.py index a71a4028..9751122e 100755 --- a/src/pypromice/process/L2toL3.py +++ b/src/pypromice/process/L2toL3.py @@ -7,7 +7,10 @@ def toL3(L2, T_0=273.15, z_0=0.001, R_d=287.05, eps=0.622, es_0=6.1071, es_100=1013.246): - '''Process one Level 2 (L2) product to Level 3 (L3) + '''Process one Level 2 (L2) product to Level 3 (L3) meaning calculating all + derived variables: + - Sensible fluxes + Parameters ---------- @@ -32,9 +35,6 @@ def toL3(L2, T_0=273.15, z_0=0.001, R_d=287.05, eps=0.622, es_0=6.1071, T_100 = _getTempK(T_0) # Get steam point temperature as K - ds['wdir_u'] = ds['wdir_u'].where(ds['wspd_u'] != 0) # Get directional wind speed - ds['wspd_x_u'], ds['wspd_y_u'] = calcDirWindSpeeds(ds['wspd_u'], ds['wdir_u']) - # Upper boom bulk calculation T_h_u = ds['t_u'].copy() # Copy for processing p_h_u = ds['p_u'].copy() @@ -85,41 +85,9 @@ def toL3(L2, T_0=273.15, z_0=0.001, R_d=287.05, eps=0.622, es_0=6.1071, q_h_l = cleanSpHumid(q_h_l, T_h_l, Tsurf_h, p_h_l, RH_cor_h_l) # Clean sp.humid values ds['qh_l'] = (('time'), q_h_l.data) - ds['wdir_l'] = ds['wdir_l'].where(ds['wspd_l'] != 0) # Get directional wind speed - ds['wspd_x_l'], ds['wspd_y_l'] = calcDirWindSpeeds(ds['wspd_l'], ds['wdir_l']) - - if hasattr(ds, 'wdir_i'): - if ~ds['wdir_i'].isnull().all() and ~ds['wspd_i'].isnull().all(): # Instantaneous msg processing - ds['wdir_i'] = ds['wdir_i'].where(ds['wspd_i'] != 0) # Get directional wind speed - ds['wspd_x_i'], ds['wspd_y_i'] = calcDirWindSpeeds(ds['wspd_i'], ds['wdir_i']) - return ds -def calcDirWindSpeeds(wspd, wdir, deg2rad=np.pi/180): - '''Calculate directional wind speed from wind speed and direction - - Parameters - ---------- - wspd : xr.Dataarray - Wind speed data array - wdir : xr.Dataarray - Wind direction data array - deg2rad : float - Degree to radians coefficient. 
The default is np.pi/180 - - Returns - ------- - wspd_x : xr.Dataarray - Wind speed in X direction - wspd_y : xr.Datarray - Wind speed in Y direction - ''' - wspd_x = wspd * np.sin(wdir * deg2rad) - wspd_y = wspd * np.cos(wdir * deg2rad) - return wspd_x, wspd_y - - def calcHeatFlux(T_0, T_h, Tsurf_h, rho_atm, WS_h, z_WS, z_T, nu, q_h, p_h, kappa=0.4, WS_lim=1., z_0=0.001, g=9.82, es_0=6.1071, eps=0.622, gamma=16., L_sub=2.83e6, L_dif_max=0.01, c_pd=1005., aa=0.7, diff --git a/src/pypromice/process/aws.py b/src/pypromice/process/aws.py index 58440595..0d830f34 100644 --- a/src/pypromice/process/aws.py +++ b/src/pypromice/process/aws.py @@ -2,32 +2,24 @@ """ AWS data processing module """ -import logging -from functools import reduce -from importlib import metadata -import os, unittest, toml, datetime, uuid, pkg_resources -from typing import Sequence, Optional - -import numpy as np import warnings - warnings.simplefilter(action='ignore', category=FutureWarning) + +import logging, os import pandas as pd import xarray as xr -from datetime import timedelta +from functools import reduce from pypromice.process.L0toL1 import toL1 from pypromice.process.L1toL2 import toL2 from pypromice.process.L2toL3 import toL3 +from pypromice.process import write, load, utilities +from pypromice.process.resample import resample_dataset pd.set_option('display.precision', 2) xr.set_options(keep_attrs=True) - logger = logging.getLogger(__name__) -#------------------------------------------------------------------------------ - - class AWS(object): '''AWS object to load and process PROMICE AWS data''' @@ -53,15 +45,17 @@ def __init__(self, config_file, inpath, var_file=None, meta_file=None): # Load config, variables CSF standards, and L0 files self.config = self.loadConfig(config_file, inpath) - self.vars = getVars(var_file) - self.meta = getMeta(meta_file) + self.vars = load.getVars(var_file) + self.meta = load.getMeta(meta_file) # Load config file L0 = self.loadL0() self.L0=[] for l in L0: - n = getColNames(self.vars, l.attrs['number_of_booms'], l.attrs['format']) - self.L0.append(popCols(l, n)) + n = write.getColNames(self.vars, + l.attrs['number_of_booms'], + l.attrs['format']) + self.L0.append(utilities.popCols(l, n)) self.L1 = None self.L1A = None @@ -78,18 +72,26 @@ def process(self): self.getL2() self.getL3() - def write(self, outpath): - '''Write L3 data to .csv and .nc file''' + def writeL2(self, outpath): + '''Write L2 data to .csv and .nc file''' if os.path.isdir(outpath): - self.writeArr(outpath) + self.writeArr(self.L2, outpath) else: logger.info(f'Outpath f{outpath} does not exist. Unable to save to file') pass + def writeL3(self, outpath): + '''Write L3 data to .csv and .nc file''' + if os.path.isdir(outpath): + self.writeArr(self.L3, outpath) + else: + logger.info(f'Outpath f{outpath} does not exist. 
Unable to save to file') + pass + def getL1(self): '''Perform L0 to L1 data processing''' logger.info('Level 1 processing...') - self.L0 = [addBasicMeta(item, self.vars) for item in self.L0] + self.L0 = [utilities.addBasicMeta(item, self.vars) for item in self.L0] self.L1 = [toL1(item, self.vars) for item in self.L0] self.L1A = reduce(xr.Dataset.combine_first, self.L1) @@ -104,82 +106,58 @@ def getL3(self): logger.info('Level 3 processing...') self.L3 = toL3(self.L2) - # Resample L3 product - f = [l.attrs['format'] for l in self.L0] - if 'raw' in f or 'STM' in f: - logger.info('Resampling to 10 minute') - self.L3 = resampleL3(self.L3, '10min') - else: - self.L3 = resampleL3(self.L3, '60min') - logger.info('Resampling to hour') - - # Re-format time - t = self.L3['time'].values - self.L3['time'] = list(t) - - # Switch gps_lon to negative (degrees_east) - # Do this here, and NOT in addMeta, otherwise we switch back to positive - # when calling getMeta in joinL3! PJW - if self.L3.attrs['station_id'] not in ['UWN', 'Roof_GEUS', 'Roof_PROMICE']: - self.L3['gps_lon'] = self.L3['gps_lon'] * -1 - - # Add variable attributes and metadata - self.L3 = self.addAttributes(self.L3) - - # Round all values to specified decimals places - self.L3 = roundValues(self.L3, self.vars) - - def addAttributes(self, L3): - '''Add variable and attribute metadata - - Parameters - ---------- - L3 : xr.Dataset - Level-3 data object - - Returns - ------- - L3 : xr.Dataset - Level-3 data object with attributes - ''' - L3 = addVars(L3, self.vars) - L3 = addMeta(L3, self.meta) - return L3 - - def writeArr(self, outpath): + # def resample(self, dataset): + # '''Resample dataset to specific temporal resolution (based on input + # data type)''' + # f = [l.attrs['format'] for l in self.L0] + # if 'raw' in f or 'STM' in f: + # logger.info('Resampling to 10 minute') + # resampled = resample_dataset(dataset, '10min') + # else: + # resampled = resample_dataset(dataset, '60min') + # logger.info('Resampling to hour') + # return resampled + + def writeArr(self, dataset, outpath, t=None): '''Write L3 data to .nc and .csv hourly and daily files Parameters ---------- + dataset : xarray.Dataset + Dataset to write to file outpath : str Output directory - L3 : AWS.L3 - Level-3 data object + t : str + Resampling string. This is automatically defined based + on the data type if not given. The default is None. 
''' - outdir = os.path.join(outpath, self.L3.attrs['station_id']) - if not os.path.isdir(outdir): - os.mkdir(outdir) - - col_names = getColNames( - self.vars, - self.L3.attrs['number_of_booms'], - self.L3.attrs['format'], - self.L3.attrs['bedrock'], - ) - - t = int(pd.Timedelta((self.L3['time'][1] - self.L3['time'][0]).values).total_seconds()) - logger.info('Writing to files...') - if t == 600: - out_csv = os.path.join(outdir, self.L3.attrs['station_id']+'_10min.csv') - out_nc = os.path.join(outdir, self.L3.attrs['station_id']+'_10min.nc') + if t is not None: + write.prepare_and_write(dataset, outpath, self.vars, self.meta, t) else: - out_csv = os.path.join(outdir, self.L3.attrs['station_id']+'_hour.csv') - out_nc = os.path.join(outdir, self.L3.attrs['station_id']+'_hour.nc') - writeCSV(out_csv, self.L3, col_names) - col_names = col_names + ['lat', 'lon', 'alt'] - writeNC(out_nc, self.L3, col_names) - logger.info(f'Written to {out_csv}') - logger.info(f'Written to {out_nc}') + f = [l.attrs['format'] for l in self.L0] + if 'raw' in f or 'STM' in f: + write.prepare_and_write(dataset, outpath, self.vars, + self.meta, '10min') + else: + write.prepare_and_write(dataset, outpath, self.vars, + self.meta, '60min') + + # def addAttributes(self, dataset): + # '''Add variable and attribute metadata + + # Parameters + # ---------- + # dataset : xr.Dataset + # Dataset (i.e. L2 or L3) object + + # Returns + # ------- + # d2 : xr.Dataset + # Data object with attributes + # ''' + # d2 = utilities.addVars(dataset, self.vars) + # d2 = utilities.addMeta(d2, self.meta) + # return d2 def loadConfig(self, config_file, inpath): '''Load configuration from .toml file @@ -196,7 +174,7 @@ def loadConfig(self, config_file, inpath): conf : dict Configuration parameters ''' - conf = getConfig(config_file, inpath) + conf = load.getConfig(config_file, inpath) return conf def loadL0(self): @@ -246,687 +224,7 @@ def readL0file(self, conf): L0 data ''' file_version = conf.get('file_version', -1) - ds = getL0(conf['file'], conf['nodata'], conf['columns'], + ds = load.getL0(conf['file'], conf['nodata'], conf['columns'], conf["skiprows"], file_version, time_offset=conf.get('time_offset')) - ds = populateMeta(ds, conf, ["columns", "skiprows", "modem"]) + ds = utilities.populateMeta(ds, conf, ["columns", "skiprows", "modem"]) return ds - -#------------------------------------------------------------------------------ - -def getConfig(config_file, inpath, default_columns: Sequence[str] = ('msg_lat', 'msg_lon')): - '''Load configuration from .toml file. 
PROMICE .toml files support defining - features at the top level which apply to all nested properties, but do not - overwrite nested properties if they are defined - - Parameters - ---------- - config_file : str - TOML file path - inpath : str - Input folder directory where L0 files can be found - - Returns - ------- - conf : dict - Configuration dictionary - ''' - conf = toml.load(config_file) # Move all top level keys to nested properties, - top = [_ for _ in conf.keys() if not type(conf[_]) is dict] # if they are not already defined in the nested properties - subs = [_ for _ in conf.keys() if type(conf[_]) is dict] # Insert the section name (config_file) as a file property and config file - for s in subs: - for t in top: - if t not in conf[s].keys(): - conf[s][t] = conf[t] - - conf[s]['conf'] = config_file - conf[s]['file'] = os.path.join(inpath, s) - conf[s]["columns"].extend(default_columns) - - for t in top: conf.pop(t) # Delete all top level keys beause each file - # should carry all properties with it - for k in conf.keys(): # Check required fields are present - for field in ["columns", "station_id", "format", "skiprows"]: - assert(field in conf[k].keys()), field+" not in config keys" - return conf - -def getL0(infile, nodata, cols, skiprows, file_version, - delimiter=',', comment='#', time_offset: Optional[float] = None) -> xr.Dataset: - ''' Read L0 data file into pandas DataFrame object - - Parameters - ---------- - infile : str - L0 file path - nodata : list - List containing value for nan values and reassigned value - cols : list - List of columns in file - skiprows : int - Skip rows value - file_version : int - Version of L0 file - delimiter : str - String delimiter for L0 file - comment : str - Notifier of commented sections in L0 file - time_offset : Optional[float] - Time offset in hours for correcting for non utc time data. - Returns - ------- - ds : xarray.Dataset - L0 Dataset - ''' - if file_version == 1: - df = pd.read_csv(infile, comment=comment, index_col=0, - na_values=nodata, names=cols, - sep=delimiter, - skiprows=skiprows, skip_blank_lines=True, - usecols=range(len(cols)), - low_memory=False) - df['time'] = pd.to_datetime( - df.year.astype(str) \ - + df.doy.astype(str).str.zfill(3) \ - + df.hhmm.astype(str).str.zfill(4), - format='%Y%j%H%M' - ) - df = df.set_index('time') - - else: - df = pd.read_csv(infile, comment=comment, index_col=0, - na_values=nodata, names=cols, parse_dates=True, - sep=delimiter, skiprows=skiprows, - skip_blank_lines=True, - usecols=range(len(cols)), - low_memory=False) - try: - df.index = pd.to_datetime(df.index) - except ValueError as e: - logger.info("\n", infile) - logger.info("\nValueError:") - logger.info(e) - logger.info('\t\t> Trying pd.to_datetime with format=mixed') - try: - df.index = pd.to_datetime(df.index, format='mixed') - except Exception as e: - logger.info("\nDateParseError:") - logger.info(e) - logger.info('\t\t> Trying again removing apostrophes in timestamp (old files format)') - df.index = pd.to_datetime(df.index.str.replace("\"","")) - - if time_offset is not None: - df.index = df.index + timedelta(hours=time_offset) - - # Drop SKIP columns - for c in df.columns: - if c[0:4] == 'SKIP': - df.drop(columns=c, inplace=True) - - # Carry relevant metadata with ds - ds = xr.Dataset.from_dataframe(df) - return ds - -def addBasicMeta(ds, vars_df): - ''' Use a variable lookup table DataFrame to add the basic metadata - to the xarray dataset. 
This is later amended to finalise L3 - - Parameters - ---------- - ds : xr.Dataset - Dataset to add metadata to - vars_df : pd.DataFrame - Metadata dataframe - - Returns - ------- - ds : xr.Dataset - Dataset with added metadata - ''' - for v in vars_df.index: - if v == 'time': continue # coordinate variable, not normal var - if v not in list(ds.variables): continue - for c in ['standard_name', 'long_name', 'units']: - if isinstance(vars_df[c][v], float) and np.isnan(vars_df[c][v]): continue - ds[v].attrs[c] = vars_df[c][v] - return ds - -def populateMeta(ds, conf, skip): - '''Populate L0 Dataset with metadata dictionary - - Parameters - ---------- - ds : xarray.Dataset - L0 dataset - conf : dict - Metadata dictionary - skip : list - List of column names to skip parsing to metadata - - Returns - ------- - ds : xarray.Dataset - L0 dataset with metadata populated as Dataset attributes - ''' - meta = {} - # skip = ["columns", "skiprows"] - for k in conf.keys(): - if k not in skip: meta[k] = conf[k] - ds.attrs = meta - return ds - -def writeCSV(outfile, Lx, csv_order): - '''Write data product to CSV file - - Parameters - ---------- - outfile : str - Output file path - Lx : xr.Dataset - Dataset to write to file - csv_order : list - List order of variables - ''' - Lcsv = Lx.to_dataframe().dropna(how='all') - if csv_order is not None: - names = [c for c in csv_order if c in list(Lcsv.columns)] - Lcsv = Lcsv[names] - Lcsv.to_csv(outfile) - -def writeNC(outfile, Lx, col_names=None): - '''Write data product to NetCDF file - - Parameters - ---------- - outfile : str - Output file path - Lx : xr.Dataset - Dataset to write to file - ''' - if os.path.isfile(outfile): - os.remove(outfile) - if col_names is not None: - names = [c for c in col_names if c in list(Lx.keys())] - else: - names = list(Lx.keys()) - Lx[names].to_netcdf(outfile, mode='w', format='NETCDF4', compute=True) - -def writeAll(outpath, station_id, l3_h, l3_d, l3_m, csv_order=None): - '''Write L3 hourly, daily and monthly datasets to .nc and .csv - files - - outpath : str - Output file path - station_id : str - Station name - l3_h : xr.Dataset - L3 hourly data - l3_d : xr.Dataset - L3 daily data - l3_m : xr.Dataset - L3 monthly data - csv_order : list, optional - List order of variables - ''' - if not os.path.isdir(outpath): - os.mkdir(outpath) - outfile_h = os.path.join(outpath, station_id + '_hour') - outfile_d = os.path.join(outpath, station_id + '_day') - outfile_m = os.path.join(outpath, station_id + '_month') - for o,l in zip([outfile_h, outfile_d, outfile_m], [l3_h ,l3_d, l3_m]): - writeCSV(o+'.csv',l, csv_order) - writeNC(o+'.nc',l) - - -def popCols(ds, names): - '''Populate dataset with all given variable names - - Parammeters - ----------- - ds : xr.Dataset - Dataset - names : list - List of variable names to populate - ''' - for v in names: - if v not in list(ds.variables): - ds[v] = (('time'), np.arange(ds['time'].size)*np.nan) - return ds - -def getColNames(vars_df, booms=None, data_type=None, bedrock=False): - '''Get all variable names for a given data type, based on a variables - look-up table - - Parameters - ---------- - vars_df : pd.DataFrame - Variables look-up table - booms : int, optional - Number of booms. If this parameter is empty then all variables - regardless of boom type will be passed. The default is None. - data_type : str, optional - Data type, "tx", "STM" or "raw". If this parameter is empty then all - variables regardless of data type will be passed. The default is None. 
- - Returns - ------- - list - Variable names - ''' - if booms==1: - vars_df = vars_df.loc[vars_df['station_type'].isin(['one-boom','all'])] - elif booms==2: - vars_df = vars_df.loc[vars_df['station_type'].isin(['two-boom','all'])] - - if data_type=='TX': - vars_df = vars_df.loc[vars_df['data_type'].isin(['TX','all'])] - elif data_type=='STM' or data_type=='raw': - vars_df = vars_df.loc[vars_df['data_type'].isin(['raw','all'])] - - col_names = list(vars_df.index) - if isinstance(bedrock, str): - bedrock = (bedrock.lower() == 'true') - if bedrock == True: - col_names.remove('cc') - for v in ['dlhf_u', 'dlhf_l', 'dshf_u', 'dshf_l']: - try: - col_names.remove(v) - except: - pass - return col_names - -def roundValues(ds, df, col='max_decimals'): - '''Round all variable values in data array based on pre-defined rounding - value in variables look-up table DataFrame - - Parameters - ---------- - ds : xr.Dataset - Dataset to round values in - df : pd.Dataframe - Variable look-up table with rounding values - col : str - Column in variable look-up table that contains rounding values. The - default is "max_decimals" - ''' - df = df[col] - df = df.dropna(how='all') - for var in df.index: - if var not in list(ds.variables): - continue - if df[var] is not np.nan: - ds[var] = ds[var].round(decimals=int(df[var])) - return ds - -def addVars(ds, variables): - '''Add variable attributes from file to dataset - - Parameters - ---------- - ds : xarray.Dataset - Dataset to add variable attributes to - variables : pandas.DataFrame - Variables lookup table file - - Returns - ------- - ds : xarray.Dataset - Dataset with metadata - ''' - for k in ds.keys(): - if k not in variables.index: continue - ds[k].attrs['standard_name'] = variables.loc[k]['standard_name'] - ds[k].attrs['long_name'] = variables.loc[k]['long_name'] - ds[k].attrs['units'] = variables.loc[k]['units'] - ds[k].attrs['coverage_content_type'] = variables.loc[k]['coverage_content_type'] - ds[k].attrs['coordinates'] = variables.loc[k]['coordinates'] - return ds - -def addMeta(ds, meta): - '''Add metadata attributes from file to dataset - - Parameters - ---------- - ds : xarray.Dataset - Dataset to add metadata attributes to - meta : dict - Metadata file - - Returns - ------- - ds : xarray.Dataset - Dataset with metadata - ''' - ds['lon'] = ds['gps_lon'].mean() - ds['lon'].attrs = ds['gps_lon'].attrs - - ds['lat'] = ds['gps_lat'].mean() - ds['lat'].attrs = ds['gps_lat'].attrs - - ds['alt'] = ds['gps_alt'].mean() - ds['alt'].attrs = ds['gps_alt'].attrs - - # for k in ds.keys(): # for each var - # if 'units' in ds[k].attrs: - # if ds[k].attrs['units'] == 'C': - # ds[k].attrs['units'] = 'degrees_C' - - # https://wiki.esipfed.org/Attribute_Convention_for_Data_Discovery_1-3#geospatial_bounds - ds.attrs['id'] = 'dk.geus.promice:' + str(uuid.uuid3(uuid.NAMESPACE_DNS, ds.attrs['station_id'])) - ds.attrs['history'] = 'Generated on ' + datetime.datetime.utcnow().isoformat() - ds.attrs['date_created'] = str(datetime.datetime.now().isoformat()) - ds.attrs['date_modified'] = ds.attrs['date_created'] - ds.attrs['date_issued'] = ds.attrs['date_created'] - ds.attrs['date_metadata_modified'] = ds.attrs['date_created'] - - ds.attrs['geospatial_bounds'] = "POLYGON((" + \ - f"{ds['lat'].min().values} {ds['lon'].min().values}, " + \ - f"{ds['lat'].min().values} {ds['lon'].max().values}, " + \ - f"{ds['lat'].max().values} {ds['lon'].max().values}, " + \ - f"{ds['lat'].max().values} {ds['lon'].min().values}, " + \ - f"{ds['lat'].min().values} {ds['lon'].min().values}))" - - 
ds.attrs['geospatial_lat_min'] = str(ds['lat'].min().values) - ds.attrs['geospatial_lat_max'] = str(ds['lat'].max().values) - ds.attrs['geospatial_lon_min'] = str(ds['lon'].min().values) - ds.attrs['geospatial_lon_max'] = str(ds['lon'].max().values) - ds.attrs['geospatial_vertical_min'] = str(ds['alt'].min().values) - ds.attrs['geospatial_vertical_max'] = str(ds['alt'].max().values) - ds.attrs['geospatial_vertical_positive'] = 'up' - ds.attrs['time_coverage_start'] = str(ds['time'][0].values) - ds.attrs['time_coverage_end'] = str(ds['time'][-1].values) - - try: - ds.attrs['source']= 'pypromice v' + str(metadata.version('pypromice')) - except: - ds.attrs['source'] = 'pypromice' - - # https://www.digi.com/resources/documentation/digidocs/90001437-13/reference/r_iso_8601_duration_format.htm - try: - ds.attrs['time_coverage_duration'] = str(pd.Timedelta((ds['time'][-1] - ds['time'][0]).values).isoformat()) - ds.attrs['time_coverage_resolution'] = str(pd.Timedelta((ds['time'][1] - ds['time'][0]).values).isoformat()) - except: - ds.attrs['time_coverage_duration'] = str(pd.Timedelta(0).isoformat()) - ds.attrs['time_coverage_resolution'] = str(pd.Timedelta(0).isoformat()) - - # Note: int64 dtype (long int) is incompatible with OPeNDAP access via THREDDS for NetCDF files - # See https://stackoverflow.com/questions/48895227/output-int32-time-dimension-in-netcdf-using-xarray - ds.time.encoding["dtype"] = "i4" # 32-bit signed integer - #ds.time.encoding["calendar"] = 'proleptic_gregorian' # this is default - - # Load metadata attributes and add to Dataset - [_addAttr(ds, key, value) for key,value in meta.items()] - - # Check attribute formating - for k,v in ds.attrs.items(): - if not isinstance(v, str) or not isinstance(v, int): - ds.attrs[k]=str(v) - return ds - - -def getVars(v_file=None): - '''Load variables.csv file - - Parameters - ---------- - v_file : str - Variable lookup table file path - - Returns - ------- - pandas.DataFrame - Variables dataframe - ''' - if v_file is None: - with pkg_resources.resource_stream('pypromice', 'process/variables.csv') as stream: - return pd.read_csv(stream, index_col=0, comment="#", encoding='utf-8') - else: - return pd.read_csv(v_file, index_col=0, comment="#") - - -def getMeta(m_file=None, delimiter=','): #TODO change to DataFrame output to match variables.csv - '''Load metadata table - - Parameters - ---------- - m_file : str - Metadata file path - delimiter : str - Metadata character delimiter. The default is "," - - Returns - ------- - meta : dict - Metadata dictionary - ''' - meta={} - if m_file is None: - with pkg_resources.resource_stream('pypromice', 'process/metadata.csv') as stream: - lines = stream.read().decode("utf-8") - lines = lines.split("\n") - else: - with open(m_file, 'r') as f: - lines = f.readlines() - for l in lines[1:]: - try: - meta[l.split(',')[0]] = l.split(delimiter)[1].split('\n')[0].replace(';',',') - except IndexError: - pass - return meta - -def resampleL3(ds_h, t): - '''Resample L3 AWS data, e.g. hourly to daily average. This uses pandas - DataFrame resampling at the moment as a work-around to the xarray Dataset - resampling. 
As stated, xarray resampling is a lengthy process that takes - ~2-3 minutes per operation: ds_d = ds_h.resample({'time':"1D"}).mean() - This has now been fixed, so needs implementing: - https://github.com/pydata/xarray/issues/4498#event-6610799698 - - Parameters - ---------- - ds_h : xarray.Dataset - L3 AWS dataset either at 10 min (for raw data) or hourly (for tx data) - t : str - Resample factor, same variable definition as in - pandas.DataFrame.resample() - - Returns - ------- - ds_d : xarray.Dataset - L3 AWS dataset resampled to the frequency defined by t - ''' - df_d = ds_h.to_dataframe().resample(t).mean() - - # recalculating wind direction from averaged directional wind speeds - for var in ['wdir_u','wdir_l','wdir_i']: - if var in df_d.columns: - if ('wspd_x_'+var.split('_')[1] in df_d.columns) & ('wspd_x_'+var.split('_')[1] in df_d.columns): - df_d[var] = _calcWindDir(df_d['wspd_x_'+var.split('_')[1]], - df_d['wspd_y_'+var.split('_')[1]]) - else: - logger.info(var,'in dataframe but not','wspd_x_'+var.split('_')[1],'wspd_x_'+var.split('_')[1]) - - # recalculating relative humidity from average vapour pressure and average - # saturation vapor pressure - for var in ['rh_u','rh_l']: - lvl = var.split('_')[1] - if var in df_d.columns: - if ('t_'+lvl in ds_h.keys()): - es_wtr, es_cor = calculateSaturationVaporPressure(ds_h['t_'+lvl]) - p_vap = ds_h[var] / 100 * es_wtr - - df_d[var] = (p_vap.to_series().resample(t).mean() \ - / es_wtr.to_series().resample(t).mean())*100 - df_d[var+'_cor'] = (p_vap.to_series().resample(t).mean() \ - / es_cor.to_series().resample(t).mean())*100 - - vals = [xr.DataArray(data=df_d[c], dims=['time'], - coords={'time':df_d.index}, attrs=ds_h[c].attrs) for c in df_d.columns] - ds_d = xr.Dataset(dict(zip(df_d.columns,vals)), attrs=ds_h.attrs) - return ds_d - - -def calculateSaturationVaporPressure(t, T_0=273.15, T_100=373.15, es_0=6.1071, - es_100=1013.246, eps=0.622): - '''Calculate specific humidity - - Parameters - ---------- - T_0 : float - Steam point temperature. Default is 273.15. 
- T_100 : float - Steam point temperature in Kelvin - t : xarray.DataArray - Air temperature - es_0 : float - Saturation vapour pressure at the melting point (hPa) - es_100 : float - Saturation vapour pressure at steam point temperature (hPa) - - Returns - ------- - xarray.DataArray - Saturation vapour pressure with regard to water above 0 C (hPa) - xarray.DataArray - Saturation vapour pressure where subfreezing timestamps are with regards to ice (hPa) - ''' - # Saturation vapour pressure above 0 C (hPa) - es_wtr = 10**(-7.90298 * (T_100 / (t + T_0) - 1) + 5.02808 * np.log10(T_100 / (t + T_0)) - - 1.3816E-7 * (10**(11.344 * (1 - (t + T_0) / T_100)) - 1) - + 8.1328E-3 * (10**(-3.49149 * (T_100 / (t + T_0) -1)) - 1) + np.log10(es_100)) - - # Saturation vapour pressure below 0 C (hPa) - es_ice = 10**(-9.09718 * (T_0 / (t + T_0) - 1) - 3.56654 - * np.log10(T_0 / (t + T_0)) + 0.876793 - * (1 - (t + T_0) / T_0) - + np.log10(es_0)) - - # Saturation vapour pressure (hPa) - es_cor = xr.where(t < 0, es_ice, es_wtr) - - return es_wtr, es_cor - - -def _calcWindDir(wspd_x, wspd_y): - '''Calculate wind direction in degrees - - Parameters - ---------- - wspd_x : xarray.DataArray - Wind speed in X direction - wspd_y : xarray.DataArray - Wind speed in Y direction - - Returns - ------- - wdir : xarray.DataArray - Wind direction''' - deg2rad = np.pi / 180 - rad2deg = 1 / deg2rad - wdir = np.arctan2(wspd_x, wspd_y) * rad2deg - wdir = (wdir + 360) % 360 - return wdir - - -def _addAttr(ds, key, value): - '''Add attribute to xarray dataset - - ds : xr.Dataset - Dataset to add attribute to - key : str - Attribute name, with "." denoting variable attributes - value : str/int - Value for attribute''' - if len(key.split('.')) == 2: - try: - ds[key.split('.')[0]].attrs[key.split('.')[1]] = str(value) - except: - pass - # logger.info(f'Unable to add metadata to {key.split(".")[0]}') - else: - ds.attrs[key] = value - - -#------------------------------------------------------------------------------ - -class TestProcess(unittest.TestCase): - - def testgetVars(self): - '''Test variable table lookup retrieval''' - v = getVars() - self.assertIsInstance(v, pd.DataFrame) - self.assertTrue(v.columns[0] in 'standard_name') - self.assertTrue(v.columns[2] in 'units') - - def testgetMeta(self): - '''Test AWS names retrieval''' - m = getMeta() - self.assertIsInstance(m, dict) - self.assertTrue('references' in m) - - def testAddAll(self): - '''Test variable and metadata attributes added to Dataset''' - d = xr.Dataset() - v = getVars() - att = list(v.index) - att1 = ['gps_lon', 'gps_lat', 'gps_alt', 'albedo', 'p'] - for a in att: - d[a]=[0,1] - for a in att1: - d[a]=[0,1] - d['time'] = [datetime.datetime.now(), - datetime.datetime.now()-timedelta(days=365)] - d.attrs['station_id']='TEST' - meta = getMeta() - d = addVars(d, v) - d = addMeta(d, meta) - self.assertTrue(d.attrs['station_id']=='TEST') - self.assertIsInstance(d.attrs['references'], str) - - def testL0toL3(self): - '''Test L0 to L3 processing''' - try: - import pypromice - pAWS = AWS(os.path.join(os.path.dirname(pypromice.__file__),'test/test_config1.toml'), - os.path.join(os.path.dirname(pypromice.__file__),'test')) - except: - pAWS = AWS('../test/test_config1.toml', '../test/') - pAWS.process() - self.assertIsInstance(pAWS.L3, xr.Dataset) - self.assertTrue(pAWS.L3.attrs['station_id']=='TEST1') - - def testCLIgetl3(self): - '''Test get_l3 CLI''' - exit_status = os.system('get_l3 -h') - self.assertEqual(exit_status, 0) - - def testCLIjoinl3(self): - '''Test join_l3 CLI''' 
- exit_status = os.system('join_l3 -h') - self.assertEqual(exit_status, 0) - -#------------------------------------------------------------------------------ - -if __name__ == "__main__": - - # # Test an individual station - # test_station = 'xxx' - # # config_file = '../../../../aws-l0/raw/config/{}.toml'.format(test_station) - # config_file = '../../../../aws-l0/tx/config/{}.toml'.format(test_station) - # # inpath= '../../../../aws-l0/raw/{}/'.format(test_station) - # inpath= '../../../../aws-l0/tx/' - # vari = 'variables.csv' - # pAWS_gc = AWS(config_file, inpath, var_file=vari) - # pAWS_gc.process() - # pAWS_gc.getL1() - # pAWS_gc.getL2() - # pAWS_gc.getL3() - - # # Use test configs - # config_files = ['test/test_config1.toml', 'test/test_config2.toml'] - # inpath= 'test/' - # outpath = 'test/' - # vari = 'variables.csv' - # for cf in config_files: - # pAWS_gc = AWS(cf, inpath, var_file=vari) - # pAWS_gc.process() - - unittest.main() diff --git a/src/pypromice/process/get_l2.py b/src/pypromice/process/get_l2.py new file mode 100644 index 00000000..0122b53d --- /dev/null +++ b/src/pypromice/process/get_l2.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python +import logging, os, sys, unittest +from argparse import ArgumentParser +import pypromice +from pypromice.process.aws import AWS +from pypromice.process.write import prepare_and_write +from pypromice.process.load import getVars, getMeta + +def parse_arguments_l2(): + parser = ArgumentParser(description="AWS L2 processor") + + parser.add_argument('-c', '--config_file', type=str, required=True, + help='Path to config (TOML) file') + parser.add_argument('-i', '--inpath', type=str, required=True, + help='Path to input data') + parser.add_argument('-o', '--outpath', default=None, type=str, required=False, + help='Path where to write output') + parser.add_argument('-v', '--variables', default=None, type=str, + required=False, help='File path to variables look-up table') + parser.add_argument('-m', '--metadata', default=None, type=str, + required=False, help='File path to metadata') + args = parser.parse_args() + return args + +def get_l2(): + args = parse_arguments_l2() + + logging.basicConfig( + format="%(asctime)s; %(levelname)s; %(name)s; %(message)s", + level=logging.INFO, + stream=sys.stdout, + ) + + # Define variables (either from file or pypromice package defaults) + if args.variables is None: + v = os.path.join(os.path.dirname(pypromice.__file__),'process/variables.csv') + else: + v = args.variables + + # Define metadata (either from file or pypromice package defaults) + if args.variables is None: + m = os.path.join(os.path.dirname(pypromice.__file__),'process/metadata.csv') + else: + m = args.metadata + + # Define input path + station_name = args.config_file.split('/')[-1].split('.')[0] + station_path = os.path.join(args.inpath, station_name) + if os.path.exists(station_path): + aws = AWS(args.config_file, station_path, v, m) + else: + aws = AWS(args.config_file, args.inpath, v, m) + + # Perform level 1 and 2 processing + aws.getL1() + aws.getL2() + + # Write out level 2 + if args.outpath is not None: + if not os.path.isdir(args.outpath): + os.mkdir(args.outpath) + if aws.L2.attrs['format'] == 'raw': + prepare_and_write(aws.L2, args.outpath, getVars(), getMeta(), '10min') + prepare_and_write(aws.L2, args.outpath, getVars(), getMeta(), '60min') + + +if __name__ == "__main__": + get_l2() + \ No newline at end of file diff --git a/src/pypromice/process/get_l2tol3.py b/src/pypromice/process/get_l2tol3.py new file mode 100644 index 
00000000..e8ed5a96 --- /dev/null +++ b/src/pypromice/process/get_l2tol3.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +import os, logging, sys +import xarray as xr +from argparse import ArgumentParser +import pypromice +from pypromice.process.load import getVars, getMeta +from pypromice.process.L2toL3 import toL3 +from pypromice.process.write import prepare_and_write + +def parse_arguments_l2tol3(debug_args=None): + parser = ArgumentParser(description="AWS L3 script for the processing L3 "+ + "data from L2 and merging the L3 data with its "+ + "historical site. An hourly, daily and monthly L3 "+ + "data product is outputted to the defined output path") + parser.add_argument('-i', '--inpath', type=str, required=True, + help='Path to Level 2 .nc data file') + parser.add_argument('-o', '--outpath', default=None, type=str, required=False, + help='Path where to write output') + parser.add_argument('-v', '--variables', default=None, type=str, + required=False, help='File path to variables look-up table') + parser.add_argument('-m', '--metadata', default=None, type=str, + required=False, help='File path to metadata') + parser.add_argument('-g', '--gcnet_historical', default=None, type=str, + required=False, help='File path to historical GC-Net data file') + + # here will come additional arguments for the merging with historical stations + args = parser.parse_args(args=debug_args) + return args + +def get_l2tol3(): + args = parse_arguments_l2tol3() + logging.basicConfig( + format="%(asctime)s; %(levelname)s; %(name)s; %(message)s", + level=logging.INFO, + stream=sys.stdout, + ) + + # Define variables and metadata (either from file or pypromice package defaults) + v = getVars(args.variables) + m = getMeta(args.metadata) + + # Define Level 2 dataset from file + with xr.open_dataset(args.inpath) as l2: + l2.load() + if 'bedrock' in l2.attrs.keys(): + l2.attrs['bedrock'] = l2.attrs['bedrock'] == 'True' + if 'number_of_booms' in l2.attrs.keys(): + l2.attrs['number_of_booms'] = int(l2.attrs['number_of_booms']) + + # Perform Level 3 processing + l3 = toL3(l2) + + # Write Level 3 dataset to file if output directory given + if args.outpath is not None: + prepare_and_write(l3, args.outpath, v, m, '60min') + prepare_and_write(l3, args.outpath, v, m, '1D') + prepare_and_write(l3, args.outpath, v, m, 'M') + +if __name__ == "__main__": + get_l2tol3() diff --git a/src/pypromice/process/get_l3.py b/src/pypromice/process/get_l3.py old mode 100755 new mode 100644 index 4cc18436..688ada59 --- a/src/pypromice/process/get_l3.py +++ b/src/pypromice/process/get_l3.py @@ -1,6 +1,7 @@ #!/usr/bin/env python import logging, os, sys, unittest from argparse import ArgumentParser +import pypromice from pypromice.process.aws import AWS def parse_arguments_l3(): @@ -8,7 +9,7 @@ def parse_arguments_l3(): parser.add_argument('-c', '--config_file', type=str, required=True, help='Path to config (TOML) file') - parser.add_argument('-i', '--inpath', default='data', type=str, required=True, + parser.add_argument('-i', '--inpath', type=str, required=True, help='Path to input data') parser.add_argument('-o', '--outpath', default=None, type=str, required=False, help='Path where to write output') @@ -16,6 +17,8 @@ def parse_arguments_l3(): required=False, help='File path to variables look-up table') parser.add_argument('-m', '--metadata', default=None, type=str, required=False, help='File path to metadata') + parser.add_argument('-t', '--time', default=None, type=str, + required=False, help='Resampling frequency') args = parser.parse_args() 
return args @@ -28,18 +31,36 @@ def get_l3(): stream=sys.stdout, ) + # Define variables (either from file or pypromice package defaults) + if args.variables is None: + v = os.path.join(os.path.dirname(pypromice.__file__),'process/variables.csv') + else: + v = args.variables + + # Define metadata (either from file or pypromice package defaults) + if args.variables is None: + m = os.path.join(os.path.dirname(pypromice.__file__),'process/metadata.csv') + else: + m = args.metadata + + # Define input path station_name = args.config_file.split('/')[-1].split('.')[0] station_path = os.path.join(args.inpath, station_name) - if os.path.exists(station_path): - aws = AWS(args.config_file, station_path, args.variables, args.metadata) + aws = AWS(args.config_file, station_path, v, m) else: - aws = AWS(args.config_file, args.inpath, args.variables, args.metadata) + aws = AWS(args.config_file, args.inpath, v, m) - aws.process() - + # Perform level 1 to 3 processing + aws.getL1() + aws.getL2() + aws.getL3() + + # Write out level 3 if args.outpath is not None: - aws.write(args.outpath) + if not os.path.isdir(args.outpath): + os.mkdir(args.outpath) + aws.writeArr(aws.L3, args.outpath, args.time) if __name__ == "__main__": get_l3() diff --git a/src/pypromice/process/join_l3.py b/src/pypromice/process/join_l2.py similarity index 59% rename from src/pypromice/process/join_l3.py rename to src/pypromice/process/join_l2.py index 2e0d3fe3..28bf2e0e 100644 --- a/src/pypromice/process/join_l3.py +++ b/src/pypromice/process/join_l2.py @@ -3,47 +3,55 @@ import pandas as pd import xarray as xr from argparse import ArgumentParser -from pypromice.process import getVars, getMeta, addMeta, getColNames, \ - roundValues, resampleL3, writeAll +from pypromice.process.load import getVars, getMeta +from pypromice.process.utilities import addMeta, roundValues +from pypromice.process.write import prepare_and_write from pypromice.process.L1toL2 import correctPrecip def parse_arguments_join(): - parser = ArgumentParser(description="AWS L3 joiner for merging together two L3 products, for example an L3 RAW and L3 TX data product. An hourly, daily and monthly L3 data product is outputted to the defined output path") + parser = ArgumentParser(description="AWS L2 joiner for merging together two L2 products, for example an L2 RAW and L2 TX data product. 
An hourly, daily and monthly L2 data product is outputted to the defined output path") parser.add_argument('-s', '--file1', type=str, required=True, - help='Path to source L3 file, which will be preferenced in merge process') + help='Path to source L2 file, which will be preferenced in merge process') parser.add_argument('-t', '--file2', type=str, required=True, - help='Path to target L3 file, which will be used to fill gaps in merge process') + help='Path to target L2 file, which will be used to fill gaps in merge process') parser.add_argument('-o', '--outpath', default=os.getcwd(), type=str, required=True, help='Path where to write output') parser.add_argument('-v', '--variables', default=None, type=str, required=False, help='Path to variables look-up table .csv file for variable name retained'''), parser.add_argument('-m', '--metadata', default=None, type=str, required=False, help='Path to metadata table .csv file for metadata information'''), - parser.add_argument('-d', '--datatype', default='raw', type=str, required=False, - help='Data type to output, raw or tx') args = parser.parse_args() return args def loadArr(infile): - if infile.split('.')[-1].lower() in 'csv': + if infile.split('.')[-1].lower() == 'csv': df = pd.read_csv(infile, index_col=0, parse_dates=True) ds = xr.Dataset.from_dataframe(df) - - elif infile.split('.')[-1].lower() in 'nc': - ds = xr.open_dataset(infile) - + elif infile.split('.')[-1].lower() == 'nc': + with xr.open_dataset(infile) as ds: + ds.load() + try: - name = ds.attrs['station_name'] + name = ds.attrs['station_id'] except: name = infile.split('/')[-1].split('.')[0].split('_hour')[0].split('_10min')[0] - + ds.attrs['station_id'] = name + if 'bedrock' in ds.attrs.keys(): + ds.attrs['bedrock'] = ds.attrs['bedrock'] == 'True' + if 'number_of_booms' in ds.attrs.keys(): + ds.attrs['number_of_booms'] = int(ds.attrs['number_of_booms']) + print(f'{name} array loaded from {infile}') return ds, name -def join_l3(): +def join_l2(): args = parse_arguments_join() - + + # Define variables and metadata (either from file or pypromice package defaults) + v = getVars(args.variables) + m = getMeta(args.metadata) + # Check files if os.path.isfile(args.file1) and os.path.isfile(args.file2): @@ -85,46 +93,12 @@ def join_l3(): else: print(f'Invalid files {args.file1}, {args.file2}') exit() - - # Get hourly, daily and monthly datasets - print('Resampling L3 data to hourly, daily and monthly resolutions...') - l3_h = resampleL3(all_ds, '60min') - l3_d = resampleL3(all_ds, '1D') - l3_m = resampleL3(all_ds, 'M') - - print(f'Adding variable information from {args.variables}...') - - # Load variables look-up table - var = getVars(args.variables) - - # Round all values to specified decimals places - l3_h = roundValues(l3_h, var) - l3_d = roundValues(l3_d, var) - l3_m = roundValues(l3_m, var) - - # Get columns to keep - if hasattr(all_ds, 'p_l'): - col_names = getColNames(var, 2, args.datatype.lower()) - else: - col_names = getColNames(var, 1, args.datatype.lower()) - # Assign station id - for l in [l3_h, l3_d, l3_m]: - l.attrs['station_id'] = name - - # Assign metadata - print(f'Adding metadata from {args.metadata}...') - m = getMeta(args.metadata) - l3_h = addMeta(l3_h, m) - l3_d = addMeta(l3_d, m) - l3_m = addMeta(l3_m, m) - - # Set up output path - out = os.path.join(args.outpath, name) + + # Resample to hourly, daily and monthly datasets and write to file + prepare_and_write(all_ds, args.outpath, v, m, resample = False) - # Write to files - writeAll(out, name, l3_h, l3_d, l3_m, 
col_names) - print(f'Files saved to {os.path.join(out, name)}...') + print(f'Files saved to {os.path.join(args.outpath, name)}...') if __name__ == "__main__": - join_l3() + join_levels() diff --git a/src/pypromice/process/load.py b/src/pypromice/process/load.py new file mode 100644 index 00000000..186784c6 --- /dev/null +++ b/src/pypromice/process/load.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Load module +""" +import pkg_resources, toml, os, logging +import pandas as pd +import xarray as xr +from typing import Sequence, Optional +from datetime import timedelta +logger = logging.getLogger(__name__) + +def getConfig(config_file, inpath, default_columns: Sequence[str] = ('msg_lat', 'msg_lon')): + '''Load configuration from .toml file. PROMICE .toml files support defining + features at the top level which apply to all nested properties, but do not + overwrite nested properties if they are defined + + Parameters + ---------- + config_file : str + TOML file path + inpath : str + Input folder directory where L0 files can be found + + Returns + ------- + conf : dict + Configuration dictionary + ''' + conf = toml.load(config_file) # Move all top level keys to nested properties, + top = [_ for _ in conf.keys() if not type(conf[_]) is dict] # if they are not already defined in the nested properties + subs = [_ for _ in conf.keys() if type(conf[_]) is dict] # Insert the section name (config_file) as a file property and config file + for s in subs: + for t in top: + if t not in conf[s].keys(): + conf[s][t] = conf[t] + + conf[s]['conf'] = config_file + conf[s]['file'] = os.path.join(inpath, s) + conf[s]["columns"].extend(default_columns) + + for t in top: conf.pop(t) # Delete all top level keys beause each file + # should carry all properties with it + for k in conf.keys(): # Check required fields are present + for field in ["columns", "station_id", "format", "skiprows"]: + assert(field in conf[k].keys()), field+" not in config keys" + return conf + +def getL0(infile, nodata, cols, skiprows, file_version, + delimiter=',', comment='#', time_offset: Optional[float] = None) -> xr.Dataset: + ''' Read L0 data file into pandas DataFrame object + + Parameters + ---------- + infile : str + L0 file path + nodata : list + List containing value for nan values and reassigned value + cols : list + List of columns in file + skiprows : int + Skip rows value + file_version : int + Version of L0 file + delimiter : str + String delimiter for L0 file + comment : str + Notifier of commented sections in L0 file + time_offset : Optional[float] + Time offset in hours for correcting for non utc time data. 
+ Returns + ------- + ds : xarray.Dataset + L0 Dataset + ''' + if file_version == 1: + df = pd.read_csv(infile, comment=comment, index_col=0, + na_values=nodata, names=cols, + sep=delimiter, + skiprows=skiprows, skip_blank_lines=True, + usecols=range(len(cols)), + low_memory=False) + df['time'] = pd.to_datetime( + df.year.astype(str) \ + + df.doy.astype(str).str.zfill(3) \ + + df.hhmm.astype(str).str.zfill(4), + format='%Y%j%H%M' + ) + df = df.set_index('time') + + else: + df = pd.read_csv(infile, comment=comment, index_col=0, + na_values=nodata, names=cols, parse_dates=True, + sep=delimiter, skiprows=skiprows, + skip_blank_lines=True, + usecols=range(len(cols)), + low_memory=False) + try: + df.index = pd.to_datetime(df.index) + except ValueError as e: + logger.info("\n", infile) + logger.info("\nValueError:") + logger.info(e) + logger.info('\t\t> Trying pd.to_datetime with format=mixed') + try: + df.index = pd.to_datetime(df.index, format='mixed') + except Exception as e: + logger.info("\nDateParseError:") + logger.info(e) + logger.info('\t\t> Trying again removing apostrophes in timestamp (old files format)') + df.index = pd.to_datetime(df.index.str.replace("\"","")) + + if time_offset is not None: + df.index = df.index + timedelta(hours=time_offset) + + # Drop SKIP columns + for c in df.columns: + if c[0:4] == 'SKIP': + df.drop(columns=c, inplace=True) + + # Carry relevant metadata with ds + ds = xr.Dataset.from_dataframe(df) + return ds + +def getVars(v_file=None): + '''Load variables.csv file + + Parameters + ---------- + v_file : str + Variable lookup table file path + + Returns + ------- + pandas.DataFrame + Variables dataframe + ''' + if v_file is None: + with pkg_resources.resource_stream('pypromice', 'process/variables.csv') as stream: + return pd.read_csv(stream, index_col=0, comment="#", encoding='utf-8') + else: + return pd.read_csv(v_file, index_col=0, comment="#") + + +def getMeta(m_file=None, delimiter=','): #TODO change to DataFrame output to match variables.csv + '''Load metadata table + + Parameters + ---------- + m_file : str + Metadata file path + delimiter : str + Metadata character delimiter. The default is "," + + Returns + ------- + meta : dict + Metadata dictionary + ''' + meta={} + if m_file is None: + with pkg_resources.resource_stream('pypromice', 'process/metadata.csv') as stream: + lines = stream.read().decode("utf-8") + lines = lines.split("\n") + else: + with open(m_file, 'r') as f: + lines = f.readlines() + for l in lines[1:]: + try: + meta[l.split(',')[0]] = l.split(delimiter)[1].split('\n')[0].replace(';',',') + except IndexError: + pass + return meta \ No newline at end of file diff --git a/src/pypromice/process/resample.py b/src/pypromice/process/resample.py new file mode 100644 index 00000000..b4af07e2 --- /dev/null +++ b/src/pypromice/process/resample.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Mon Jun 10 10:58:39 2024 + +@author: pho +""" +import logging +import numpy as np +import xarray as xr +logger = logging.getLogger(__name__) + +def resample_dataset(ds_h, t): + '''Resample L2 AWS data, e.g. hourly to daily average. This uses pandas + DataFrame resampling at the moment as a work-around to the xarray Dataset + resampling. 
diff --git a/src/pypromice/process/resample.py b/src/pypromice/process/resample.py new file mode 100644 index 00000000..b4af07e2 --- /dev/null +++ b/src/pypromice/process/resample.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Mon Jun 10 10:58:39 2024 + +@author: pho +""" +import logging +import numpy as np +import xarray as xr +logger = logging.getLogger(__name__) + +def resample_dataset(ds_h, t): + '''Resample L2 AWS data, e.g. hourly to daily averages. This currently uses pandas + DataFrame resampling as a work-around, because native xarray Dataset resampling + (ds_d = ds_h.resample({'time':"1D"}).mean()) used to take ~2-3 minutes per operation. + That performance issue has since been fixed upstream, so the xarray approach could be + reinstated: https://github.com/pydata/xarray/issues/4498#event-6610799698 + + Parameters + ---------- + ds_h : xarray.Dataset + L3 AWS dataset either at 10 min (for raw data) or hourly (for tx data) + t : str + Resample factor, same variable definition as in + pandas.DataFrame.resample() + + Returns + ------- + ds_d : xarray.Dataset + L3 AWS dataset resampled to the frequency defined by t + ''' + df_d = ds_h.to_dataframe().resample(t).mean() + + # recalculating wind direction from averaged directional wind speeds + for var in ['wdir_u','wdir_l','wdir_i']: + if var in df_d.columns: + if ('wspd_x_'+var.split('_')[1] in df_d.columns) & ('wspd_y_'+var.split('_')[1] in df_d.columns): + df_d[var] = _calcWindDir(df_d['wspd_x_'+var.split('_')[1]], + df_d['wspd_y_'+var.split('_')[1]]) + else: + logger.info(f'{var} in dataframe but not wspd_x_{var.split("_")[1]} or wspd_y_{var.split("_")[1]}') + + # recalculating relative humidity from average vapour pressure and average + # saturation vapour pressure + for var in ['rh_u','rh_l']: + lvl = var.split('_')[1] + if var in df_d.columns: + if ('t_'+lvl in ds_h.keys()): + es_wtr, es_cor = calculateSaturationVaporPressure(ds_h['t_'+lvl]) + p_vap = ds_h[var] / 100 * es_wtr + + df_d[var] = (p_vap.to_series().resample(t).mean() \ + / es_wtr.to_series().resample(t).mean())*100 + df_d[var+'_cor'] = (p_vap.to_series().resample(t).mean() \ + / es_cor.to_series().resample(t).mean())*100 + + vals = [xr.DataArray(data=df_d[c], dims=['time'], + coords={'time':df_d.index}, attrs=ds_h[c].attrs) for c in df_d.columns] + ds_d = xr.Dataset(dict(zip(df_d.columns,vals)), attrs=ds_h.attrs) + return ds_d + + +def calculateSaturationVaporPressure(t, T_0=273.15, T_100=373.15, es_0=6.1071, + es_100=1013.246, eps=0.622): + '''Calculate saturation vapour pressure with respect to water and to ice + + Parameters + ---------- + t : xarray.DataArray + Air temperature (degrees Celsius) + T_0 : float + Freezing point temperature in Kelvin. Default is 273.15. + T_100 : float + Steam point temperature in Kelvin. Default is 373.15. + es_0 : float + Saturation vapour pressure at the melting point (hPa) + es_100 : float + Saturation vapour pressure at steam point temperature (hPa) + eps : float + Ratio of the molecular weights of water vapour and dry air. Default is 0.622. + + Returns + ------- + xarray.DataArray + Saturation vapour pressure with regard to water above 0 C (hPa) + xarray.DataArray + Saturation vapour pressure where subfreezing timestamps are with regards to ice (hPa) + ''' + # Saturation vapour pressure above 0 C (hPa) + es_wtr = 10**(-7.90298 * (T_100 / (t + T_0) - 1) + 5.02808 * np.log10(T_100 / (t + T_0)) + - 1.3816E-7 * (10**(11.344 * (1 - (t + T_0) / T_100)) - 1) + + 8.1328E-3 * (10**(-3.49149 * (T_100 / (t + T_0) -1)) - 1) + np.log10(es_100)) + + # Saturation vapour pressure below 0 C (hPa) + es_ice = 10**(-9.09718 * (T_0 / (t + T_0) - 1) - 3.56654 + * np.log10(T_0 / (t + T_0)) + 0.876793 + * (1 - (t + T_0) / T_0) + + np.log10(es_0)) + + # Saturation vapour pressure (hPa) + es_cor = xr.where(t < 0, es_ice, es_wtr) + + return es_wtr, es_cor + +def _calcWindDir(wspd_x, wspd_y): + '''Calculate wind direction in degrees + + Parameters + ---------- + wspd_x : xarray.DataArray + Wind speed in X direction + wspd_y : xarray.DataArray + Wind speed in Y direction + + Returns + ------- + wdir : xarray.DataArray + Wind direction''' + deg2rad = np.pi / 180 + rad2deg = 1 / deg2rad + wdir = np.arctan2(wspd_x, wspd_y) * rad2deg + wdir = (wdir + 360) % 360 + return wdir
diff --git a/src/pypromice/process/test.py b/src/pypromice/process/test.py new file mode 100644 index 00000000..df81e88d --- /dev/null +++ b/src/pypromice/process/test.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Processing test module +""" +from pypromice.process.aws import AWS +from pypromice.process.load import getVars, getMeta +from pypromice.process.utilities import addVars, addMeta +import xarray as xr +import pandas as pd +import unittest, datetime, os +from datetime import timedelta + +class TestProcess(unittest.TestCase): + + def testgetVars(self): + '''Test variable table lookup retrieval''' + v = getVars() + self.assertIsInstance(v, pd.DataFrame) + self.assertTrue(v.columns[0] in 'standard_name') + self.assertTrue(v.columns[2] in 'units') + + def testgetMeta(self): + '''Test metadata dictionary retrieval''' + m = getMeta() + self.assertIsInstance(m, dict) + self.assertTrue('references' in m) + + def testAddAll(self): + '''Test variable and metadata attributes added to Dataset''' + d = xr.Dataset() + v = getVars() + att = list(v.index) + att1 = ['gps_lon', 'gps_lat', 'gps_alt', 'albedo', 'p'] + for a in att: + d[a]=[0,1] + for a in att1: + d[a]=[0,1] + d['time'] = [datetime.datetime.now(), + datetime.datetime.now()-timedelta(days=365)] + d.attrs['station_id']='TEST' + meta = getMeta() + d = addVars(d, v) + d = addMeta(d, meta) + self.assertTrue(d.attrs['station_id']=='TEST') + self.assertIsInstance(d.attrs['references'], str) + + def testL0toL3(self): + '''Test L0 to L3 processing''' + try: + import pypromice + pAWS = AWS(os.path.join(os.path.dirname(pypromice.__file__),'test/test_config1.toml'), + os.path.join(os.path.dirname(pypromice.__file__),'test')) + except: + pAWS = AWS('../test/test_config1.toml', '../test/') + pAWS.process() + self.assertIsInstance(pAWS.L2, xr.Dataset) + self.assertTrue(pAWS.L2.attrs['station_id']=='TEST1') + + def testCLIgetl2(self): + '''Test get_l2 CLI''' + exit_status = os.system('get_l2 -h') + self.assertEqual(exit_status, 0) + + def testCLIgetl3(self): + '''Test get_l3 CLI'''
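
Outside the test suite, the same processing chain can be driven directly through the AWS class exercised in testL0toL3 above; a rough sketch, assuming only the behaviour that test checks (the configuration and data paths here are hypothetical):

    from pypromice.process.aws import AWS

    # Hypothetical station configuration TOML and L0 data directory
    aws = AWS('config/MY_STATION.toml', 'l0_data/')
    aws.process()

    l2 = aws.L2   # xarray.Dataset produced by the processing, as asserted in testL0toL3
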
exit_status = os.system('get_l3 -h') + self.assertEqual(exit_status, 0) + + def testCLIjoinl2(self): + '''Test join_l2 CLI''' + exit_status = os.system('join_l2 -h') + self.assertEqual(exit_status, 0) + + def testCLIgetl2tol3(self): + '''Test get_l2tol3 CLI''' + exit_status = os.system('get_l2tol3 -h') + self.assertEqual(exit_status, 0) + +if __name__ == "__main__": + + unittest.main()
This is later amended to finalise L3 + + Parameters + ---------- + ds : xr.Dataset + Dataset to add metadata to + vars_df : pd.DataFrame + Metadata dataframe + + Returns + ------- + ds : xr.Dataset + Dataset with added metadata + ''' + for v in vars_df.index: + if v == 'time': continue # coordinate variable, not normal var + if v not in list(ds.variables): continue + for c in ['standard_name', 'long_name', 'units']: + if isinstance(vars_df[c][v], float) and np.isnan(vars_df[c][v]): continue + ds[v].attrs[c] = vars_df[c][v] + return ds + +def populateMeta(ds, conf, skip): + '''Populate L0 Dataset with metadata dictionary + + Parameters + ---------- + ds : xarray.Dataset + L0 dataset + conf : dict + Metadata dictionary + skip : list + List of column names to skip parsing to metadata + + Returns + ------- + ds : xarray.Dataset + L0 dataset with metadata populated as Dataset attributes + ''' + meta = {} + # skip = ["columns", "skiprows"] + for k in conf.keys(): + if k not in skip: meta[k] = conf[k] + ds.attrs = meta + return ds + + +def addVars(ds, variables): + '''Add variable attributes from file to dataset + + Parameters + ---------- + ds : xarray.Dataset + Dataset to add variable attributes to + variables : pandas.DataFrame + Variables lookup table file + + Returns + ------- + ds : xarray.Dataset + Dataset with metadata + ''' + for k in ds.keys(): + if k not in variables.index: continue + ds[k].attrs['standard_name'] = variables.loc[k]['standard_name'] + ds[k].attrs['long_name'] = variables.loc[k]['long_name'] + ds[k].attrs['units'] = variables.loc[k]['units'] + ds[k].attrs['coverage_content_type'] = variables.loc[k]['coverage_content_type'] + ds[k].attrs['coordinates'] = variables.loc[k]['coordinates'] + return ds + +def addMeta(ds, meta): + '''Add metadata attributes from file to dataset + + Parameters + ---------- + ds : xarray.Dataset + Dataset to add metadata attributes to + meta : dict + Metadata file + + Returns + ------- + ds : xarray.Dataset + Dataset with metadata + ''' + ds['lon'] = ds['gps_lon'].mean() + ds['lon'].attrs = ds['gps_lon'].attrs + + ds['lat'] = ds['gps_lat'].mean() + ds['lat'].attrs = ds['gps_lat'].attrs + + ds['alt'] = ds['gps_alt'].mean() + ds['alt'].attrs = ds['gps_alt'].attrs + + # for k in ds.keys(): # for each var + # if 'units' in ds[k].attrs: + # if ds[k].attrs['units'] == 'C': + # ds[k].attrs['units'] = 'degrees_C' + + # https://wiki.esipfed.org/Attribute_Convention_for_Data_Discovery_1-3#geospatial_bounds + ds.attrs['id'] = 'dk.geus.promice:' + str(uuid.uuid3(uuid.NAMESPACE_DNS, ds.attrs['station_id'])) + ds.attrs['history'] = 'Generated on ' + datetime.datetime.utcnow().isoformat() + ds.attrs['date_created'] = str(datetime.datetime.now().isoformat()) + ds.attrs['date_modified'] = ds.attrs['date_created'] + ds.attrs['date_issued'] = ds.attrs['date_created'] + ds.attrs['date_metadata_modified'] = ds.attrs['date_created'] + + ds.attrs['geospatial_bounds'] = "POLYGON((" + \ + f"{ds['lat'].min().values} {ds['lon'].min().values}, " + \ + f"{ds['lat'].min().values} {ds['lon'].max().values}, " + \ + f"{ds['lat'].max().values} {ds['lon'].max().values}, " + \ + f"{ds['lat'].max().values} {ds['lon'].min().values}, " + \ + f"{ds['lat'].min().values} {ds['lon'].min().values}))" + + ds.attrs['geospatial_lat_min'] = str(ds['lat'].min().values) + ds.attrs['geospatial_lat_max'] = str(ds['lat'].max().values) + ds.attrs['geospatial_lon_min'] = str(ds['lon'].min().values) + ds.attrs['geospatial_lon_max'] = str(ds['lon'].max().values) + ds.attrs['geospatial_vertical_min'] = 
str(ds['alt'].min().values) + ds.attrs['geospatial_vertical_max'] = str(ds['alt'].max().values) + ds.attrs['geospatial_vertical_positive'] = 'up' + ds.attrs['time_coverage_start'] = str(ds['time'][0].values) + ds.attrs['time_coverage_end'] = str(ds['time'][-1].values) + + try: + ds.attrs['source']= 'pypromice v' + str(metadata.version('pypromice')) + except: + ds.attrs['source'] = 'pypromice' + + # https://www.digi.com/resources/documentation/digidocs/90001437-13/reference/r_iso_8601_duration_format.htm + try: + ds.attrs['time_coverage_duration'] = str(pd.Timedelta((ds['time'][-1] - ds['time'][0]).values).isoformat()) + ds.attrs['time_coverage_resolution'] = str(pd.Timedelta((ds['time'][1] - ds['time'][0]).values).isoformat()) + except: + ds.attrs['time_coverage_duration'] = str(pd.Timedelta(0).isoformat()) + ds.attrs['time_coverage_resolution'] = str(pd.Timedelta(0).isoformat()) + + # Note: int64 dtype (long int) is incompatible with OPeNDAP access via THREDDS for NetCDF files + # See https://stackoverflow.com/questions/48895227/output-int32-time-dimension-in-netcdf-using-xarray + ds.time.encoding["dtype"] = "i4" # 32-bit signed integer + #ds.time.encoding["calendar"] = 'proleptic_gregorian' # this is default + + # Load metadata attributes and add to Dataset + [_addAttr(ds, key, value) for key,value in meta.items()] + + # Check attribute formatting: coerce anything that is not a string or an integer to a string + for k,v in ds.attrs.items(): + if not isinstance(v, str) and not isinstance(v, int): + ds.attrs[k]=str(v) + return ds + +def _addAttr(ds, key, value): + '''Add attribute to xarray dataset + + ds : xr.Dataset + Dataset to add attribute to + key : str + Attribute name, with "." denoting variable attributes + value : str/int + Value for attribute''' + if len(key.split('.')) == 2: + try: + ds[key.split('.')[0]].attrs[key.split('.')[1]] = str(value) + except: + pass + # logger.info(f'Unable to add metadata to {key.split(".")[0]}') + else: + ds.attrs[key] = value \ No newline at end of file
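
As an illustration of how addMeta and _addAttr above treat metadata keys, a small self-contained sketch with made-up metadata values (in production the keys come from metadata.csv): plain keys become global attributes, while dotted "var.attr" keys are attached to the named variable.

    import numpy as np
    import pandas as pd
    import xarray as xr
    from pypromice.process.utilities import addMeta

    # Minimal dataset with the fields addMeta expects (values are made up)
    time = pd.date_range('2024-06-01', periods=24, freq='1h')
    ds = xr.Dataset(
        {
            'gps_lon': ('time', np.full(time.size, 47.0)),
            'gps_lat': ('time', np.full(time.size, 66.0)),
            'gps_alt': ('time', np.full(time.size, 1200.0)),
        },
        coords={'time': time},
        attrs={'station_id': 'TEST'},
    )

    meta = {'institution': 'GEUS', 'gps_lon.units': 'degrees_east'}
    ds = addMeta(ds, meta)

    print(ds.attrs['institution'])        # GEUS            (global attribute)
    print(ds['gps_lon'].attrs['units'])   # degrees_east    (variable attribute via _addAttr)
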
diff --git a/src/pypromice/process/write.py b/src/pypromice/process/write.py new file mode 100644 index 00000000..75c36485 --- /dev/null +++ b/src/pypromice/process/write.py @@ -0,0 +1,193 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Write dataset module +""" +import os, logging +import pandas as pd +logger = logging.getLogger(__name__) + +from pypromice.process.resample import resample_dataset +from pypromice.process import utilities, write + +def prepare_and_write(dataset, outpath, vars_df, meta_dict, time='60min', resample=True): + '''Prepare data with resampling, formatting and metadata population, then + write the data to .nc and .csv files + + Parameters + ---------- + dataset : xarray.Dataset + Dataset to write to file + outpath : str + Output directory + vars_df : pandas.DataFrame + Variables look-up table dataframe + meta_dict : dictionary + Metadata dictionary to write to dataset + time : str + Resampling interval for output dataset + resample : bool + If True (default), resample the dataset to the interval given by time + before writing + ''' + # Resample dataset + if resample: + d2 = resample_dataset(dataset, time) + logger.info('Resampling to '+str(time)) + else: + d2 = dataset.copy() + + # Reformat time + d2 = utilities.reformat_time(d2) + + # Reformat longitude (to negative values) + d2 = utilities.reformat_lon(d2) + + # Add variable attributes and metadata + d2 = utilities.addVars(d2, vars_df) + d2 = utilities.addMeta(d2, meta_dict) + + # Round all values to specified decimal places + d2 = utilities.roundValues(d2, vars_df) + + # Create out directory + outdir = os.path.join(outpath, d2.attrs['station_id']) + if not os.path.isdir(outdir): + os.mkdir(outdir) + + # Get variable names to write out + col_names = write.getColNames( + vars_df, + d2.attrs['number_of_booms'], + d2.attrs['format'], + d2.attrs['bedrock'], + ) + + # Define filename based on resample rate + t = int(pd.Timedelta((d2['time'][1] - d2['time'][0]).values).total_seconds()) + if t == 600: + out_csv = os.path.join(outdir, d2.attrs['station_id']+'_10min.csv') + out_nc = os.path.join(outdir, d2.attrs['station_id']+'_10min.nc') + elif t == 3600: + out_csv = os.path.join(outdir, d2.attrs['station_id']+'_hour.csv') + out_nc = os.path.join(outdir, d2.attrs['station_id']+'_hour.nc') + elif t == 86400: + out_csv = os.path.join(outdir, d2.attrs['station_id']+'_day.csv') + out_nc = os.path.join(outdir, d2.attrs['station_id']+'_day.nc') + else: + out_csv = os.path.join(outdir, d2.attrs['station_id']+'_month.csv') + out_nc = os.path.join(outdir, d2.attrs['station_id']+'_month.nc') + + # Write to csv file + logger.info('Writing to files...') + write.writeCSV(out_csv, d2, col_names) + + # Write to netcdf file + col_names = col_names + ['lat', 'lon', 'alt'] + write.writeNC(out_nc, d2, col_names) + logger.info(f'Written to {out_csv}') + logger.info(f'Written to {out_nc}') + +def writeAll(outpath, station_id, l3_h, l3_d, l3_m, csv_order=None): + '''Write L3 hourly, daily and monthly datasets to .nc and .csv + files + + Parameters + ---------- + outpath : str + Output file path + station_id : str + Station name + l3_h : xr.Dataset + L3 hourly data + l3_d : xr.Dataset + L3 daily data + l3_m : xr.Dataset + L3 monthly data + csv_order : list, optional + List order of variables + ''' + if not os.path.isdir(outpath): + os.mkdir(outpath) + outfile_h = os.path.join(outpath, station_id + '_hour') + outfile_d = os.path.join(outpath, station_id + '_day') + outfile_m = os.path.join(outpath, station_id + '_month') + for o,l in zip([outfile_h, outfile_d, outfile_m], [l3_h, l3_d, l3_m]): + writeCSV(o+'.csv',l, csv_order) + writeNC(o+'.nc',l) + +def writeCSV(outfile, Lx, csv_order): + '''Write data product to CSV file + + Parameters + ---------- + outfile : str + Output file path + Lx : xr.Dataset + Dataset to write to file + csv_order : list + List order of variables + ''' + Lcsv = Lx.to_dataframe().dropna(how='all') + if csv_order is not None: + names = [c for c in csv_order if c in list(Lcsv.columns)] + Lcsv = Lcsv[names] + Lcsv.to_csv(outfile) + +def writeNC(outfile, Lx, col_names=None): + '''Write data product to NetCDF file + + Parameters + ---------- + outfile : str + Output file path + Lx : xr.Dataset + Dataset to write to file + col_names : list, optional + Variable names to write. If None, all variables in Lx are written + ''' + if os.path.isfile(outfile): + os.remove(outfile) + if col_names is not None: + names = [c for c in col_names if c in list(Lx.keys())] + else: + names = list(Lx.keys()) + Lx[names].to_netcdf(outfile, mode='w', format='NETCDF4', compute=True) + +def getColNames(vars_df, booms=None, data_type=None, bedrock=False): + '''Get all variable names for a given data type, based on a variables + look-up table. This is mainly for exporting purposes + + Parameters + ---------- + vars_df : pd.DataFrame + Variables look-up table + booms : int, optional + Number of booms. If this parameter is empty then all variables + regardless of boom type will be passed. The default is None. + data_type : str, optional + Data type, "TX", "STM" or "raw". If this parameter is empty then all + variables regardless of data type will be passed. The default is None. + bedrock : bool or str, optional + If True (or the string "true"), variables that are not relevant for + bedrock sites (cloud cover and turbulent heat fluxes) are removed. + The default is False.
+ + Returns + ------- + list + Variable names + ''' + if booms==1: + vars_df = vars_df.loc[vars_df['station_type'].isin(['one-boom','all'])] + elif booms==2: + vars_df = vars_df.loc[vars_df['station_type'].isin(['two-boom','all'])] + + if data_type=='TX': + vars_df = vars_df.loc[vars_df['data_type'].isin(['TX','all'])] + elif data_type=='STM' or data_type=='raw': + vars_df = vars_df.loc[vars_df['data_type'].isin(['raw','all'])] + + col_names = list(vars_df.index) + if isinstance(bedrock, str): + bedrock = (bedrock.lower() == 'true') + if bedrock == True: + col_names.remove('cc') + for v in ['dlhf_u', 'dlhf_l', 'dshf_u', 'dshf_l']: + try: + col_names.remove(v) + except: + pass + return col_names diff --git a/src/pypromice/tx/get_l0tx.py b/src/pypromice/tx/get_l0tx.py index 05a9051e..5720a14f 100644 --- a/src/pypromice/tx/get_l0tx.py +++ b/src/pypromice/tx/get_l0tx.py @@ -6,7 +6,7 @@ from glob import glob from datetime import datetime, timedelta -from pypromice.tx import getMail, L0tx, sortLines +from pypromice.tx import getMail, L0tx, sortLines, isModified def parse_arguments_l0tx(): @@ -134,9 +134,12 @@ def get_l0tx(): #---------------------------------- if out_dir is not None: - # Sort L0tx files and add tails - print(out_dir+'/'+args.name+'*.txt') - for f in glob(out_dir+'/'+args.name+'*.txt'): + + # Find modified files (within the last hour) + mfiles = [mfile for mfile in glob(out_dir+'/'+args.name+'*.txt') if isModified(mfile, 1)] + + # Sort L0tx files and add tails + for f in mfiles: # Sort lines in L0tx file and remove duplicates in_dirn, in_fn = os.path.split(f) diff --git a/src/pypromice/tx/tx.py b/src/pypromice/tx/tx.py index 983f2239..2167b874 100644 --- a/src/pypromice/tx/tx.py +++ b/src/pypromice/tx/tx.py @@ -886,13 +886,13 @@ def sortLines(in_file, out_file, replace_unsorted=True): # lines = in_f.readlines() # Remove duplicate lines and sort - unique_lines = findDuplicates(lines) + unique_lines = findDuplicates(lines.copy()) unique_lines.sort() - - # Write sorted file - with open(out_file, 'w') as out_f: - # out_f.write(headers) - out_f.writelines(unique_lines) + if lines != unique_lines: + # Write sorted file + with open(out_file, 'w') as out_f: + # out_f.write(headers) + out_f.writelines(unique_lines) # Replace input file with new sorted file if replace_unsorted: @@ -930,6 +930,27 @@ def addTail(in_file, out_dir, aws_name, header_names='', lines_limit=100): out_f.writelines(tail) print(f'Tails file written to {out_pn}') +def isModified(filename, time_threshold=1): + '''Return flag denoting if file is modified within a certain timeframe + + Parameters + ---------- + filename : str + File path + time_threshold : int + Time threshold (provided in hours) + + Returns + ------- + bool + Flag denoting if modified (True) or not (False) + ''' + delta = time.time() - os.path.getmtime(filename) + delta = delta / (60*60) + if delta < time_threshold: + return True + return False + def _loadTestMsg(): '''Load test .msg email file''' with pkg_resources.resource_stream('pypromice', 'test/test_email') as stream: