diff --git a/src/acquisition_master.R b/src/acquisition_master.R index 4cb9ec40..af018484 100644 --- a/src/acquisition_master.R +++ b/src/acquisition_master.R @@ -203,7 +203,7 @@ ms_init <- function(use_gpu = FALSE, return(instance_details) } -ms_instance <- ms_init(use_ms_error_handling = TRUE, +ms_instance <- ms_init(use_ms_error_handling = FALSE, # force_machine_status = 'n00b', config_storage_location = 'remote') @@ -264,7 +264,7 @@ ms_globals <- c(ls(all.names = TRUE), 'ms_globals') dir.create('logs', showWarnings = FALSE) -# dmnrow = 8 +# dmnrow = 12 # print(network_domain, n=50) for(dmnrow in 1:nrow(network_domain)){ @@ -281,7 +281,7 @@ for(dmnrow in 1:nrow(network_domain)){ # held_data = invalidate_tracked_data(network, domain, 'derive') # owrite_tracker(network, domain) - # held_data = invalidate_tracked_data(network, domain, 'munge', 'precipitation') + # held_data = invalidate_tracked_data(network, domain, 'munge', 'stream_chemistry') # owrite_tracker(network, domain) # held_data = invalidate_tracked_data(network, domain, 'derive', 'stream_flux_inst') # owrite_tracker(network, domain) @@ -297,7 +297,7 @@ for(dmnrow in 1:nrow(network_domain)){ update_product_statuses(network = network, domain = domain) get_all_local_helpers(network = network, - domain = domain) + domain = domain) ms_retrieve(network = network, # prodname_filter = c('stream_chemistry'), @@ -312,7 +312,7 @@ for(dmnrow in 1:nrow(network_domain)){ verbose = TRUE)) } ms_derive(network = network, - prodname_filter = c('discharge'), + prodname_filter = c('precip_pchem_pflux'), domain = domain) if(domain != 'mcmurdo'){ diff --git a/src/dev/dev_helpers.R b/src/dev/dev_helpers.R index 8c841aab..6f9afeda 100644 --- a/src/dev/dev_helpers.R +++ b/src/dev/dev_helpers.R @@ -1423,3 +1423,17 @@ insert_retrieval_datetimes <- function(){ write_lines(rt, f) } } + +get_nonnumerics <- function(d){ + + #gets unique nonnumeric values by column. 
useful for identifying quality codes + #within data columns + + nonnumerics = apply(d, 2, function(x){ + xx = as.numeric(x) + nonnumerics = is.na(xx) + out = unique(x[nonnumerics]) + }) + + return(nonnumerics) +} diff --git a/src/global/function_aliases.R b/src/global/function_aliases.R index 4e9a0ba9..383152a4 100644 --- a/src/global/function_aliases.R +++ b/src/global/function_aliases.R @@ -50,5 +50,9 @@ map = purrr::map map2 = purrr::map st_read = sf::st_read errors = errors::errors +drop_errors = errors::drop_errors +set_errors = errors::set_errors pivot_wider = tidyr::pivot_wider pivot_longer = tidyr::pivot_longer +rename = dplyr::rename +where = tidyselect:::where diff --git a/src/global/general_kernels.R b/src/global/general_kernels.R index ef2d545f..12b6debe 100644 --- a/src/global/general_kernels.R +++ b/src/global/general_kernels.R @@ -1822,36 +1822,24 @@ process_3_ms824 <- function(network, domain, prodname_ms, site_code, googledrive::drive_rm('GEE/rgee.csv', verbose = FALSE) - - final <- fin_table %>% + fin_table <- fin_table %>% select(date, site_code, dayl, prcp, srad, swe, tmax, tmin, vp) - if(nrow(final) == 0){ + if(nrow(fin_table) == 0){ return(generate_ms_exception(glue('No data was retrived for {s}', s = site_code))) } dir.create(glue('data/{n}/{d}/ws_traits/daymet/', n = network, - d = domain)) + d = domain), + showWarnings = FALSE) file_path <- glue('data/{n}/{d}/ws_traits/daymet/domain_climate.feather', n = network, d = domain) - write_feather(final, file_path) - - # type <- str_split_fixed(prodname_ms, '__', n = Inf)[,1] - # - # dir <- glue('data/{n}/{d}/ws_traits/{v}/', - # n = network, d = domain, v = type) - # - # final <- append_unprod_prefix(final, prodname_ms) - # final_sum <- append_unprod_prefix(final_sum, prodname_ms) - # - # save_general_files(final_file = final_sum, - # raw_file = final, - # domain_dir = dir) + write_feather(fin_table, file_path) return() } diff --git a/src/global/global_helpers.R b/src/global/global_helpers.R index 40047a99..b57ee38f 100644 --- a/src/global/global_helpers.R +++ b/src/global/global_helpers.R @@ -409,8 +409,8 @@ identify_sampling_bypass <- function(df, prodname_ms, sampling_type = NULL){ - #This case is used (primarily for neon) when use of d_raw and - # ms_cast_flag are not used because of incaomptable data structures + #This case is used (primarily for neon) when use of ms_read_raw_csv and + # ms_cast_and_reflag are prohibited because of incompatible data structures #checks if(!is.logical(is_sensor)){ @@ -700,8 +700,10 @@ ms_read_raw_csv <- function(filepath, #set_to_NA: For values such as 9999 that are proxies for NA values. #convert_to_BDL_flag: character vector of QC flags that should be interpreted # as "below detection limit". For numeric codes, e.g. -888, give their - # character representations, i.e. "-888". - # This is only for below-detection-limit flags within data columns. + # character representations, i.e. "-888". Accepts '#*#' as a wildcard that + # can stand in for any numeral or a decimal. Wildcard is useful for forms like + # "<0.03", "<0.05", etc. Instead of listing these, you can just pass "<#*#". + # This parameter is only for below-detection-limit flags within data columns. # Codes will be standardized to "BDL" and extracted into the variable-flag column # corresponding to each data variable. Variable-flag columns will be created # as necessary. See ms_cast_and_reflag for the next step in handling BDL data. 
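As a minimal standalone sketch of the '#*#' wildcard described above (toy values only; bdl_flag and vals are hypothetical objects, not part of ms_read_raw_csv), the wildcard can be expanded into a regex and matched against a data column like so:

bdl_flag <- '<#*#'                                  #wildcard form, intended to match "<0.03", "<0.05", etc.
vals <- c('0.12', '<0.03', '<0.05', '9999', 'u')    #toy data column

if(grepl('#*#', bdl_flag, fixed = TRUE)){
    #expand the wildcard into a regex that matches any numeral or decimal
    bdl_pattern <- sub('#*#', '[0-9\\.]+', bdl_flag, fixed = TRUE)
    bdl_inds <- ! is.na(vals) & grepl(bdl_pattern, vals)
} else {
    bdl_inds <- ! is.na(vals) & vals == bdl_flag
}

vals[bdl_inds]  #"<0.03" "<0.05"; these would then be standardized to "BDL"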
@@ -973,10 +975,17 @@ ms_read_raw_csv <- function(filepath, #which will be converted to 1/2 detlim downstream bdl_cols_do_not_drop <- c() new_varflag_cols <- c() + all_datacols <- c(data_cols, alt_datacols) for(i in seq_along(convert_to_BDL_flag)){ bdl_flag <- convert_to_BDL_flag[i] - all_datacols <- c(data_cols, alt_datacols) + if(grepl('#*#', bdl_flag)){ + bdl_flag <- sub('#*#', '[0-9\\.]+', bdl_flag, fixed = TRUE) + has_wildcard <- TRUE + } else { + has_wildcard <- FALSE + } + for(j in seq_along(all_datacols)){ d_varcode <- unname(all_datacols)[j] @@ -984,7 +993,12 @@ ms_read_raw_csv <- function(filepath, d_clm <- d[[d_colname]] if(is.null(d_clm)) next #column doesn't exist - bdl_inds <- ! is.na(d_clm) & d_clm == bdl_flag + if(has_wildcard){ + bdl_inds <- ! is.na(d_clm) & grepl(bdl_flag, d_clm) + } else { + bdl_inds <- ! is.na(d_clm) & d_clm == bdl_flag + } + if(! any(bdl_inds)) next #this bdl code doesn't exist in this column if(! (length(var_flagcols) == 1 && is.na(var_flagcols))){ @@ -995,6 +1009,10 @@ ms_read_raw_csv <- function(filepath, var_flagcol_already_exists <- FALSE } + if(candidate_flagcol %in% new_varflag_cols){ + var_flagcol_already_exists <- TRUE + } + if(! var_flagcol_already_exists){ d[[candidate_flagcol]] <- NA_character_ new_varflag_cols <- c(new_varflag_cols, candidate_flagcol) @@ -5935,7 +5953,8 @@ shortcut_idw <- function(encompassing_dem, d_elev <- tibble(site_code = rownames(dk), d = dk[,1]) %>% left_join(data_locations, - by = 'site_code') + by = 'site_code') %>% + mutate(d = errors::drop_errors(d)) mod <- lm(d ~ elevation, data = d_elev) ab <- as.list(mod$coefficients) @@ -5945,9 +5964,10 @@ shortcut_idw <- function(encompassing_dem, # Set all negative values to 0 d_from_elev[d_from_elev < 0] <- 0 - #average both approaches (this should be weighted toward idw - #when close to any data location, and weighted half and half when far) - d_idw <- (d_idw + d_from_elev) / 2 + #get weighted mean of both approaches: + #weight on idw is 1; weight on elev-predicted is R^2 + rsq <- cor(d_elev$d, mod$fitted.values)^2 + d_idw <- (d_idw + d_from_elev * rsq) / (1 + rsq) } ws_mean[k] <- mean(d_idw, na.rm=TRUE) @@ -6567,12 +6587,12 @@ ms_linear_interpolate <- function(d, interval){ ms_interp_column <- is.na(d$val) d_interp <- d %>% - mutate( + mutate(val_err = errors::errors(val), + val = errors::drop_errors(val), - #carry ms_status to any rows that have just been populated (probably - #redundant now, but can't hurt) ms_status = imputeTS::na_locf(ms_status, - na_remaining = 'rev'), + na_remaining = 'rev', + maxgap = max_samples_to_impute), # val = if(sum(! is.na(val)) > 2){ # @@ -6591,38 +6611,31 @@ ms_linear_interpolate <- function(d, interval){ maxgap = max_samples_to_impute) #unless not enough data in group; then do nothing - } else val + } else val, + val_err = if(sum(! is.na(val_err)) > 1){ + #do the same for uncertainty + imputeTS::na_interpolation(val_err, + maxgap = max_samples_to_impute) + } else val_err ) - err <- errors(d_interp$val) #extract error from data vals - err[err == 0] <- NA_real_ #change new uncerts (0s by default) to NA - if(sum(! 
is.na(err)) > 0){ - #and then carry error to interped rows - errors(d_interp$val) <- imputeTS::na_locf(err, na_remaining = 'rev') - } else { - errors(d_interp$val) <- 0 # #unless not enough error to interp - } + errors::errors(d_interp$val) <- d_interp$val_err + d_interp$val_err <- NULL + + # err <- errors(d_interp$val) #extract error from data vals + # err[err == 0] <- NA_real_ #change new uncerts (0s by default) to NA + # if(sum(! is.na(err)) > 0){ + # #and then carry error to interped rows + # errors(d_interp$val) <- imputeTS::na_locf(err, na_remaining = 'rev') + # } else { + # errors(d_interp$val) <- 0 # #unless not enough error to interp + # } d_interp <- d_interp %>% select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp'))) %>% arrange(site_code, var, datetime) - # mutate( - # err = errors(val), #extract error from data vals - # err = case_when( - # err == 0 ~ NA_real_, #change new uncerts (0s by default) to NA - # TRUE ~ err), - # val = if(sum(! is.na(err)) > 0){ - # set_errors(val, #and then carry error to interped rows - # imputeTS::na_locf(err, - # na_remaining = 'rev')) - # } else { - # set_errors(val, #unless not enough error to interp - # 0) - # }) %>% - # select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp'))) %>% - # arrange(site_code, var, datetime) - + d_interp$ms_status[is.na(d_interp$ms_status)] = 0 ms_interp_column <- ms_interp_column & ! is.na(d_interp$val) d_interp$ms_interp <- as.numeric(ms_interp_column) d_interp <- filter(d_interp, @@ -6666,13 +6679,14 @@ ms_nocb_interpolate <- function(d, interval){ ms_interp_column <- is.na(d$val) d_interp <- d %>% - mutate( + mutate(val_err = errors::errors(val), + val = errors::drop_errors(val), - #carry ms_status to any rows that have just been populated (probably - #redundant now, but can't hurt) + #carry ms_status to any rows that have just been populated ms_status = imputeTS::na_locf(ms_status, option = 'nocb', - na_remaining = 'rev'), + na_remaining = 'rev', + maxgap = max_samples_to_impute), val = if(sum(! is.na(val)) > 1){ @@ -6683,22 +6697,90 @@ ms_nocb_interpolate <- function(d, interval){ maxgap = max_samples_to_impute) #unless not enough data in group; then do nothing - } else val + } else val, + val_err = if(sum(! is.na(val_err)) > 1){ + + #do the same for uncertainty + imputeTS::na_locf(val_err, + option = 'nocb', + na_remaining = 'keep', + maxgap = max_samples_to_impute) + } else val_err ) - err <- errors(d_interp$val) #extract error from data vals - err[err == 0] <- NA_real_ #change new uncerts (0s by default) to NA - if(sum(! is.na(err)) > 0){ - #and then carry error to interped rows - errors(d_interp$val) <- imputeTS::na_locf(err, option = 'nocb') - } else { - errors(d_interp$val) <- 0 # #unless not enough error to interp + errors::errors(d_interp$val) <- d_interp$val_err + d_interp$val_err <- NULL + + d_interp <- d_interp %>% + select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp'))) %>% + arrange(site_code, var, datetime) + + + d_interp$ms_status[is.na(d_interp$ms_status)] = 0 + ms_interp_column <- ms_interp_column & ! is.na(d_interp$val) + d_interp$ms_interp <- as.numeric(ms_interp_column) + d_interp <- filter(d_interp, + ! is.na(val)) + + return(d_interp) +} + +ms_zero_interpolate <- function(d, interval){ + + #d: a ms tibble with no ms_interp column (this will be created) + #interval: the sampling interval (either '15 min' or '1 day'). 
+ + #for precip only, and only relevant at konza (so far) + + #fills gaps up to maxgap (determined automatically), then removes missing values + + if(length(unique(d$site_code)) > 1){ + stop(paste('ms_zero_interpolate is not designed to handle datasets', + 'with more than one site.')) } + if(length(unique(d$var)) > 1){ + stop(paste('ms_zero_interpolate is not designed to handle datasets', + 'with more than one variable')) + } + + if(! interval %in% c('15 min', '1 day')){ + stop('interval must be "15 min" or "1 day", unless we have decided otherwise') + } + + var <- drop_var_prefix(d$var[1]) + max_samples_to_impute <- 45 #fixed because this func is only called for precip + + if(interval == '15 min'){ + max_samples_to_impute <- max_samples_to_impute * 96 + } + + d <- arrange(d, datetime) + ms_interp_column <- is.na(d$val) + + d_interp <- d %>% + mutate( + + ms_status = imputeTS::na_replace(ms_status, + fill = 1, + maxgap = max_samples_to_impute), + + val = if(sum(! is.na(val)) > 1){ + + #nocb interp NA vals + imputeTS::na_replace(val, + fill = 0, + maxgap = max_samples_to_impute) + + #unless not enough data in group; then do nothing + } else val + ) + d_interp <- d_interp %>% select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp'))) %>% arrange(site_code, var, datetime) + d_interp$ms_status[is.na(d_interp$ms_status)] = 0 ms_interp_column <- ms_interp_column & ! is.na(d_interp$val) d_interp$ms_interp <- as.numeric(ms_interp_column) d_interp <- filter(d_interp, @@ -6744,13 +6826,15 @@ ms_nocb_mean_interpolate <- function(d, interval){ ms_interp_column <- is.na(d$val) d_interp <- d %>% - mutate( + mutate(val_err = errors::errors(val), + val = errors::drop_errors(val), #carry ms_status to any rows that have just been populated (probably #redundant now, but can't hurt) ms_status = imputeTS::na_locf(ms_status, option = 'nocb', - na_remaining = 'rev'), + na_remaining = 'rev', + maxgap = max_samples_to_impute), val = if(sum(! is.na(val)) > 1){ @@ -6761,17 +6845,28 @@ ms_nocb_mean_interpolate <- function(d, interval){ maxgap = max_samples_to_impute) #unless not enough data in group; then do nothing - } else val + } else val, + val_err = if(sum(! is.na(val_err)) > 1){ + + #do the same for uncertainty + imputeTS::na_locf(val_err, + option = 'nocb', + na_remaining = 'keep', + maxgap = max_samples_to_impute) + } else val_err ) - err <- errors(d_interp$val) #extract error from data vals - err[err == 0] <- NA_real_ #change new uncerts (0s by default) to NA - if(sum(! is.na(err)) > 0){ - #and then carry error to interped rows - errors(d_interp$val) <- imputeTS::na_locf(err, option = 'nocb') - } else { - errors(d_interp$val) <- 0 # #unless not enough error to interp - } + errors::errors(d_interp$val) <- d_interp$val_err + d_interp$val_err <- NULL + + # err <- errors(d_interp$val) #extract error from data vals + # err[err == 0] <- NA_real_ #change new uncerts (0s by default) to NA + # if(sum(! 
is.na(err)) > 0){ + # #and then carry error to interped rows + # errors(d_interp$val) <- imputeTS::na_locf(err, option = 'nocb') + # } else { + # errors(d_interp$val) <- 0 # #unless not enough error to interp + # } d_interp <- d_interp %>% select(any_of(c('datetime', 'site_code', 'var', 'val', 'ms_status', 'ms_interp'))) %>% @@ -6789,19 +6884,30 @@ ms_nocb_mean_interpolate <- function(d, interval){ err_ <- errors::errors(d_interp$val) d_interp$val <- errors::drop_errors(d_interp$val) - vals_interpted <- d_interp$val * laginterp + vals_interped <- d_interp$val * laginterp + err_interped <- err_ * laginterp #use run length encoding to do the division quickly - vals_new <- rle2(vals_interpted) %>% + vals_new <- rle2(vals_interped) %>% mutate(values = values / lengths) %>% select(lengths, values) %>% as.list() class(vals_new) <- 'rle' vals_new <- inverse.rle(vals_new) + #same for uncertainty + err_new <- rle2(err_interped) %>% + mutate(values = values / lengths) %>% + select(lengths, values) %>% + as.list() + class(err_new) <- 'rle' + err_new <- inverse.rle(err_new) + real_vals_new <- vals_new != 0 d_interp$val[real_vals_new] <- vals_new[real_vals_new] - errors::errors(d_interp$val) <- err_ + errors::errors(d_interp$val) <- err_new + + d_interp$ms_status[is.na(d_interp$ms_status)] = 0 return(d_interp) } @@ -6984,9 +7090,12 @@ synchronize_timestep <- function(d, prodname_ms_ = get('prodname_ms')){ d = sitevar_chunk, interval = rounding_intervals[i]) } else { #precip - d_split[[i]] <- ms_nocb_mean_interpolate( + d_split[[i]] <- ms_zero_interpolate( #so far only needed for konza d = sitevar_chunk, interval = rounding_intervals[i]) + # d_split[[i]] <- ms_nocb_mean_interpolate( #this might apply in some cases, but not yet. + # d = sitevar_chunk, + # interval = rounding_intervals[i]) } } @@ -11456,6 +11565,12 @@ approxjoin_datetime <- function(x, indices_only = FALSE){ #direction = 'forward'){ + #TODO: update to match nearest non-NA value by column if we ever go sub-daily. + #some code for this in place below, but note that implementing this will + #break incides_only. also there will be no good way to select the matching + #date in cases where there are multiple data columns. i.e. there will be + #seprate matched dates for each column... not sure how to handle. + #x and y: macrosheds standard tibbles with only one site_code, # which must be the same in x and y. Nonstandard tibbles may also work, # so long as they have datetime columns, but the only case where we need @@ -11515,37 +11630,47 @@ approxjoin_datetime <- function(x, } if(! is.logical(indices_only)) stop('indices_only must be a logical') - #deal with the case of x or y being a specialized "flow" tibble - # x_is_flowtibble <- y_is_flowtibble <- FALSE - # if('flow' %in% colnames(x)) x_is_flowtibble <- TRUE - # if('flow' %in% colnames(y)) y_is_flowtibble <- TRUE - # if(x_is_flowtibble && ! y_is_flowtibble){ - # varname <- y$var[1] - # y$var = NULL - # } else if(y_is_flowtibble && ! x_is_flowtibble){ - # varname <- x$var[1] - # x$var = NULL - # } else if(! x_is_flowtibble && ! y_is_flowtibble){ - # varname <- x$var[1] - # x$var = NULL - # y$var = NULL - # } else { - # stop('x and y are both "flow" tibbles. There should be no need for this') - # } - # if(x_is_flowtibble) x <- rename(x, val = flow) - # if(y_is_flowtibble) y <- rename(y, val = flow) - #data.table doesn't work with the errors package, so error needs - #to be separated into its own column. 
also give same-name columns suffixes + #to be separated into its own column and handled with care. + + # #this will be useful if we go sub-daily + # if(any(c('val', 'ms_status') %in% colnames(x))){ + # + # x <- x %>% + # mutate( + # # across(where(~inherits(., 'errors')), + # # ~case_when(! is.na(.) & is.na(errors(.)) ~ set_errors(., 0), TRUE ~ .)), + # across(where(~inherits(., 'errors')), + # ~errors(.), + # .names = '{.col}_err'), + # across(where(~inherits(., 'errors')), + # ~drop_errors(.))) %>% + # rename_with(.fn = ~paste0(., '_x'), + # .cols = everything()) %>% + # # rename(datetime_x = datetime) %>% + # as.data.table() + # + # y <- y %>% + # mutate( + # # across(where(~inherits(., 'errors')), + # # ~case_when(! is.na(.) & is.na(errors(.)) ~ set_errors(., 0), TRUE ~ .)), + # across(where(~inherits(., 'errors')), + # ~errors(.), + # .names = '{.col}_err'), + # across(where(~inherits(., 'errors')), + # ~drop_errors(.))) %>% + # rename_with(.fn = ~paste0(., '_y'), + # .cols = everything()) %>% + # # rename(datetime_y = datetime) %>% + # as.data.table() + + if('val' %in% colnames(x)){ - if('val' %in% colnames(x)){ #crude catch for nonstandard ms tibbles (fine for now) x <- x %>% mutate(err = errors(val), val = errors::drop_errors(val)) %>% rename_with(.fn = ~paste0(., '_x'), .cols = everything()) %>% - # .cols = any_of(c('site_code', 'var', 'val', - # 'ms_status', 'ms_interp'))) %>% as.data.table() y <- y %>% @@ -11554,9 +11679,23 @@ approxjoin_datetime <- function(x, rename_with(.fn = ~paste0(., '_y'), .cols = everything()) %>% as.data.table() + } else { - x <- dplyr::rename(x, datetime_x = datetime) %>% as.data.table() - y <- dplyr::rename(y, datetime_y = datetime) %>% as.data.table() + + if(indices_only){ + x <- rename(x, datetime_x = datetime) %>% + mutate(across(where(~inherits(., 'errors')), + ~drop_errors(.))) %>% + as.data.table() + + y <- rename(y, datetime_y = datetime) %>% + mutate(across(where(~inherits(., 'errors')), + ~drop_errors(.))) %>% + as.data.table() + } else { + stop('this case not yet handled') + } + } #alternative implementation of the "on" argument in data.table joins... @@ -11582,14 +11721,6 @@ approxjoin_datetime <- function(x, datetime_max = datetime_x + rollmax)] y[, `:=` (datetime_y_orig = datetime_y)] #datetime col will be dropped from y - # if(indices_only){ - # y_indices <- y[x, - # on = .(datetime_y <= datetime_max, - # datetime_y >= datetime_min), - # which = TRUE] - # return(y_indices) - # } - #join x rows to y if y's datetime falls within the x range joined <- y[x, on = .(datetime_y <= datetime_max, datetime_y >= datetime_min)] @@ -11600,6 +11731,10 @@ approxjoin_datetime <- function(x, joined[, `:=` (datetime_match_diff = abs(datetime_x - datetime_y_orig))] joined <- joined[, .SD[which.min(datetime_match_diff)], by = datetime_x] joined <- joined[, .SD[which.min(datetime_match_diff)], by = datetime_y_orig] + #this will grab the nearest non-NA for each column, but that messes up the datatime indices + # joined = joined[order(datetime_match_diff), + # lapply(.SD, function(z) dplyr::first(na.omit(z))), + # by = datetime_x] if(indices_only){ y_indices <- which(y$datetime_y %in% joined$datetime_y_orig) @@ -11616,31 +11751,25 @@ approxjoin_datetime <- function(x, setnames(joined, 'datetime_y_orig', 'datetime') } - #restore error objects, var column, original column names (with suffixes). - #original column order + # #restore error objects, var column, original column names (with suffixes). + # #original column order (incomplete. 
execution always returns before this point + # #in idw, which is the only place where it would be necessary) + # ernames = grep('_err_[xy]$', colnames(joined), value = TRUE) + # ernames = ernames[sub('err_', '', ernames) %in% colnames(joined)] + # for(erc in ernames){ + # dac = sub('err_', '', erc) + # if(dac %in% + # set(joined, j = dac, + # value = set_errors(joined[[dac]], joined[[erc]])) + # } + joined <- as_tibble(joined) %>% mutate(val_x = errors::set_errors(val_x, err_x), val_y = errors::set_errors(val_y, err_y)) %>% select(-err_x, -err_y) - # mutate(var = !!varname) - - # if(x_is_flowtibble) joined <- rename(joined, - # flow = val_x, - # ms_status_flow = ms_status_x, - # ms_interp_flow = ms_interp_x) - # if(y_is_flowtibble) joined <- rename(joined, - # flow = val_y, - # ms_status_flow = ms_status_y, - # ms_interp_flow = ms_interp_y) - - # if(! sum(grepl('^val_[xy]$', colnames(joined))) > 1){ - # joined <- rename(joined, val = matches('^val_[xy]$')) - # } joined <- select(joined, datetime, - # matches('^val_?[xy]?$'), - # any_of('flow'), starts_with('site_code'), any_of(c(starts_with('var_'), matches('^var$'))), any_of(c(starts_with('val_'), matches('^val$'))), diff --git a/src/lter/arctic/processing_kernels.R b/src/lter/arctic/processing_kernels.R index 50532900..904aff01 100644 --- a/src/lter/arctic/processing_kernels.R +++ b/src/lter/arctic/processing_kernels.R @@ -344,9 +344,7 @@ process_1_10303 <- function(network, domain, prodname_ms, site_code, d <- d %>% full_join(var_names, by = 'Type') %>% select(site_code, datetime=Date, val=Value, Comments, var) %>% - mutate(ms_status = case_when(Comments == 'ISCO' ~ 0, - Comments == ' ' ~ 0, - TRUE ~ 1)) %>% + mutate(ms_status = ifelse(Flag == 1, 0, 1)) %>% select(-Comments) %>% mutate(date = str_split_fixed(datetime, ' ', n = Inf)[,1]) %>% # mutate(time = str_split_fixed(datetime, ' ', n = Inf)[,2]) %>% @@ -456,7 +454,7 @@ process_1_10303 <- function(network, domain, prodname_ms, site_code, E_pheophy = 'mg/cm2', T_ECHL = 'mg/cm2', CH4 = 'mg/l')) - + remove_1_vars <- d %>% group_by(site_code, var) %>% summarise(n = n()) %>% @@ -495,7 +493,7 @@ process_1_1489 <- function(network, domain, prodname_ms, site_code, summary_flagcols = 'Flag_Daily_Precip_Total_mm') d <- ms_cast_and_reflag(d, - summary_flags_clean = list('Flag_Daily_Precip_Total_mm' = ''), + summary_flags_to_drop = list('Flag_Daily_Precip_Total_mm' = 'ensuring any other flag gets ms_status of 0'), summary_flags_dirty = list('Flag_Daily_Precip_Total_mm' = 'E'), varflag_col_pattern = NA) @@ -536,7 +534,7 @@ process_1_20120 <- function(network, domain, prodname_ms, site_code, sampling_type = 'I') d <- ms_cast_and_reflag(d, - summary_flags_clean = list('Comments' = ''), + summary_flags_to_drop = list('Comments' = 'ensuring any other comment gets ms_status of 0'), summary_flags_dirty = list('Comments' = c('Late season staff gauge', 'WINTER')), varflag_col_pattern = NA) @@ -561,7 +559,7 @@ process_1_20120 <- function(network, domain, prodname_ms, site_code, sampling_type = 'I') d <- ms_cast_and_reflag(d, - summary_flags_clean = list('Comments' = ''), + summary_flags_to_drop = list('Comments' = 'ensuring any other comment gets ms_status of 0'), summary_flags_dirty = list('Comments' = c('Late season staff gauge', 'WINTER')), varflag_col_pattern = NA) diff --git a/src/lter/hbef/processing_kernels.R b/src/lter/hbef/processing_kernels.R index 4adbbd7c..79c8a25f 100644 --- a/src/lter/hbef/processing_kernels.R +++ b/src/lter/hbef/processing_kernels.R @@ -230,8 +230,8 @@ process_1_208 <- 
function(network, domain, prodname_ms, site_code, 'w9' = c('9', 'W9')), data_cols = c('pH', 'DIC', 'spCond', 'temp', 'ANC960', 'ANCMet', 'Ca', 'Mg', 'K', - 'Na', 'TMAl', 'OMAl', 'Al_ICP', 'NH4', - 'SO4', 'NO3', 'Cl', + 'Na', 'TMAl', 'OMAl', 'Al_ICP', 'Al_ferron', + 'NH4', 'SO4', 'NO3', 'Cl', 'PO4', 'DOC', 'TDN', 'DON', 'SiO2', 'Mn', 'Fe', 'F', 'cationCharge', 'anionCharge', @@ -243,7 +243,7 @@ process_1_208 <- function(network, domain, prodname_ms, site_code, summary_flagcols = 'fieldCode') d <- ms_cast_and_reflag(d, - summary_flags_clean = list(fieldCode = NA), + summary_flags_to_drop = list(fieldCode = 9999), #not a real code summary_flags_dirty = list(fieldCode = c(969, 970)), varflag_col_pattern = NA) diff --git a/src/lter/konza/processing_kernels.R b/src/lter/konza/processing_kernels.R index b0e7b94b..c73c8289 100644 --- a/src/lter/konza/processing_kernels.R +++ b/src/lter/konza/processing_kernels.R @@ -459,7 +459,7 @@ process_1_50 <- function(network, domain, prodname_ms, site_code, is_sensor = FALSE) d <- ms_cast_and_reflag(d, - # variable_flags_clean = 'FALSE', + variable_flags_to_drop = 'ensuring other flags get ms_status of 0', variable_flags_dirty = 'dirty', variable_flags_bdl = 'BDL', summary_flags_clean = list('comments' = c(0, NA)), @@ -499,18 +499,17 @@ process_1_51 <- function(network, domain, prodname_ms, site_code, datetime_tz = 'US/Central', site_code_col = 'Site', alt_site_code = list('N04D' = 'n04d', - 'N02B'='n02b', + 'N02B' = 'n02b', 'N20B' = 'n20b', 'N01B' = 'n01b'), data_cols = c('Conduct' = 'spCond'), data_col_pattern = '#V#', is_sensor = FALSE) - d <- d %>% - rename(val = 3) %>% - mutate(var = 'GN_spCond', - ms_status = 0) %>% - filter(site_code %in% c('N04D', 'N02B', 'N20B', 'N01B')) + d <- ms_cast_and_reflag(d, + varflag_col_pattern = NA) + + d <- filter(d, site_code %in% c('N04D', 'N02B', 'N20B', 'N01B')) return(d) } @@ -556,8 +555,8 @@ process_1_20 <- function(network, domain, prodname_ms, site_code, d <- ms_cast_and_reflag(d, varflag_col_pattern = NA, - summary_flags_to_drop = list(comments = 'bad'), - summary_flags_dirty = list(comments = 'remove')) + summary_flags_to_drop = list(comments = 'ensuring all get ms_status=0'), + summary_flags_dirty = list(comments = 'ensuring all get ms_status = 0')) return(d) } @@ -592,6 +591,7 @@ process_1_21 <- function(network, domain, prodname_ms, site_code, 'ODO' = 'DO', 'ODOsat' = 'DO_sat'), data_col_pattern = '#V#', + set_to_NA = '', summary_flagcols = 'Comments', is_sensor = TRUE) @@ -638,10 +638,6 @@ process_1_16 <- function(network, domain, prodname_ms, site_code, sampling_type = 'I', is_sensor = TRUE) - d %>% - ggplot(aes(datetime, `IS_temp__|dat`, colour = site_code)) + - geom_line() - d <- ms_cast_and_reflag(d, varflag_col_pattern = NA) @@ -666,9 +662,7 @@ process_1_43 <- function(network, domain, prodname_ms, site_code, mutate(num_d = nchar(RecDay)) %>% mutate(day = ifelse(num_d == 1, paste0('0', as.character(RecDay)), as.character(RecDay))) %>% select(-num_d, -RecDay) %>% - filter(Watershed != '001d', - Watershed != 'n01d', - Watershed != '') + filter(! 
Watershed %in% c('001d', 'n01d', '')) d <- ms_read_raw_csv(preprocessed_tibble = d, datetime_cols = list('RecYear' = '%Y', @@ -678,25 +672,26 @@ process_1_43 <- function(network, domain, prodname_ms, site_code, site_code_col = 'Watershed', alt_site_code = list('002C' = c('R20B', '001c', 'r20b', '001c'), '020B' = '020b', - 'HQ02' = c('00HQ', '00hq', 'hq'), + 'HQ' = c('00HQ', '00hq', 'hq'), 'N4DF' = c('N04D', 'n04d'), 'N01B' = 'n01b'), data_cols = c('NO3'='NO3_N', 'NH4'='NH4_N', 'TPsN'='TPN', 'SRP', 'TPP'), data_col_pattern = '#V#', + set_to_NA = c('', '.', ' '), summary_flagcols = 'Comments', is_sensor = FALSE) - # Could use a search function to look for words that indicates bad data. - # over all the data with comments seem to indicate the data is in some way - # compromised - d <- d %>% - mutate(Comments = ifelse(Comments != '', 'bad', 'ok')) + clean_comments <- is.na(d$Comments) | + grepl('^(?:rain|snow|cloudy ?=? ?)?[0-9; \\)\\(\\/\\.=\\-]*$', d$Comments) + # unique(na.omit(d[clean_comments,]$Comments)) #verify + d$Comments <- 'dirty' + d$Comments[clean_comments] <- 'clean' d <- ms_cast_and_reflag(d, varflag_col_pattern = NA, - summary_flags_dirty = list(Comments = 'bad'), - summary_flags_to_drop = list(Comments = 'remove')) + summary_flags_dirty = list(Comments = 'dirty'), + summary_flags_clean = list(Comments = 'clean')) d <- ms_conversions(d, convert_units_from = c(NO3_N = 'ug/l', diff --git a/src/lter/niwot/processing_kernels.R b/src/lter/niwot/processing_kernels.R index 334eb9ca..e8396190 100644 --- a/src/lter/niwot/processing_kernels.R +++ b/src/lter/niwot/processing_kernels.R @@ -155,14 +155,11 @@ process_1_213 <- function(network, domain, prodname_ms, site_code, alt_site_code = list('SODDIE' = 'SODDIE STREAM'), data_cols = data_cols, data_col_pattern = '#V#', - set_to_NA = c('NP', 'DNS', 'QNS', 'trace'), - convert_to_BDL_flag = c('u', '<0.50', 'trace','<0.02', - '<0.06', '<0.03', '<0.04'), + set_to_NA = c('NP', 'DNS', 'QNS'), + convert_to_BDL_flag = c('u', '<#*#', 'trace'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -262,22 +259,10 @@ process_1_103 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS', 'EQCL'), - convert_to_BDL_flag = unique(c('u', 'trace', '<0.71', '<0.38', - '<0.22', '<0.1428', '<0.143', - '<0.449', '<0.082', '<0.087', - '<0.102', '<0.28', '<0.846', - '<0.16', '<0.323', '<0.833', - '<0.063', '<0.03', '<0.08', - '<0.0387', '<0.0388', '<0.0214', - '<0.571', '<0.04', '<0.01', - '<0.0258', '<0.0258', '<0.0258', - '<0.03', '<0.01', '<0.04', - '<0.50', '<0.02', '<0.06')), + convert_to_BDL_flag = c('u', 'trace', '<#*#'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -382,12 +367,10 @@ process_1_107 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS'), - convert_to_BDL_flag = c('u', 'trace'), + convert_to_BDL_flag = c('u', 'trace', '<#*#'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = 
c(NH4 = 'ueq/l', @@ -489,15 +472,10 @@ process_1_108 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS', 'NV', 'EQCL', 'NSS'), - convert_to_BDL_flag = c('<0.38', '<0.4', '<0.22', '<0.1428', - '<0.323', '<0.03', '<0.0388', '<0.0214', - '<0.04', '<0.0258', '<0.01', 'trace', - 'u'), + convert_to_BDL_flag = c('<#*#', 'trace', 'u'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -598,15 +576,10 @@ process_1_109 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS', 'NV', 'EQCL'), - convert_to_BDL_flag = c('<0.4', '<0.22', '<0.1428', - '<0.087', '<0.28', '<0.161', '<0.063', - '<0.0387', '<0.0214', '<0.56', '<0.0258', - '<0.03', 'u', 'trace'), + convert_to_BDL_flag = c('<#*#', 'u', 'trace'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -711,12 +684,10 @@ process_1_110 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS'), - convert_to_BDL_flag = c('<0.71', 'u', 'trace'), + convert_to_BDL_flag = c('<#*#', 'u', 'trace'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -816,16 +787,10 @@ process_1_112 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS', 'EQCL'), - convert_to_BDL_flag = c('<0.71', '<0.38', '<0.4', '<0.22', - '<0.1428', '<0.063', '<0.03', '<0.08', - '<0.0388', '<0.0214', '<0.04', - '<0.01', '<0.0258', 'trace', - 'u'), + convert_to_BDL_flag = c('<#*#', 'trace', 'u'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -925,13 +890,10 @@ process_1_113 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS', 'EQCL', 'NSS'), - convert_to_BDL_flag = c('<0.71', '<0.38', '<0.4', '<0.22', - '<0.1428', 'u', 'trace'), + convert_to_BDL_flag = c('<#*#', 'u', 'trace'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -1032,14 +994,10 @@ process_1_9 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS'), - convert_to_BDL_flag = c('<0.50', '<0.02', '<0.323', '<0.06', - '<0.0214', '<0.03', '<0.0258', - 'u', 'trace'), + convert_to_BDL_flag = c('<#*#', 'u', 'trace'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -1140,15 +1098,10 @@ process_1_160 <- function(network, domain, 
prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS', 'EQCL', 'NV'), - convert_to_BDL_flag = c('<0.4', '<0.22', '<0.1428', - '<0.02', '<0.323', '<0.063', '<0.0388', - '<0.0214', '<0.04', '<0.0258', '<0.03', - 'trace', 'u'), + convert_to_BDL_flag = c('<#*#', 'trace', 'u'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -1248,15 +1201,10 @@ process_1_162 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS', 'EQCL'), - convert_to_BDL_flag = c('<0.4', '<0.22', '<0.063', '<0.03', - '<0.08', '<0.04', '<0.0096855', - '<-0.042314', '<0.01', 'trace', - 'u'), + convert_to_BDL_flag = c('<#*#', 'trace', 'u'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -1357,14 +1305,10 @@ process_1_163 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS', 'EQCL', 'NV'), - convert_to_BDL_flag = c('<0.4', '<0.1428', '<0.143', - '<0.0214', '<0.0258', 'trace', - 'u'), + convert_to_BDL_flag = c('<#*#', 'trace', 'u'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l', @@ -1470,13 +1414,10 @@ process_1_278 <- function(network, domain, prodname_ms, site_code, 'POC' = 'POC'), data_col_pattern = '#V#', set_to_NA = c('NP', 'DNS', 'QNS', 'EQCL'), - convert_to_BDL_flag = c('<0.47', '<0.50', '<0.02', '<0.05', - '<0.06', '<0.04', 'trace', 'u'), + convert_to_BDL_flag = c('<#*#', 'trace', 'u'), is_sensor = FALSE) - d <- ms_cast_and_reflag(d, - variable_flags_bdl = 'BDL', - variable_flags_dirty = 'dirty') + d <- ms_cast_and_reflag(d, variable_flags_bdl = 'BDL') d <- ms_conversions(d, convert_units_from = c(NH4 = 'ueq/l',