diff --git a/src/acquisition_master.R b/src/acquisition_master.R index 9ed48b28..56ee4b46 100644 --- a/src/acquisition_master.R +++ b/src/acquisition_master.R @@ -218,6 +218,16 @@ ms_init <- function(use_gpu = FALSE, op_system <- NA } + + res <- try(setwd('C:/Users/Dell/Documents/Projects/data_processing'), silent=TRUE) #server + if(! 'try-error' %in% class(res)){ + successes <- successes + 1 + which_machine <- 'bini' + instance_type <- 'dev' + machine_status <- 'noob' + op_system <- 'windows' + } + if(successes > 1){ stop(glue('more than one working directory was available. must set the ', 'correct one manually')) @@ -267,8 +277,7 @@ googledrive::drive_auth(email = gee_login) #initialize and authorize GEE account try(rgee::ee_Initialize(user = gee_login, drive = TRUE)) - - + #set up global logger. network-domain loggers are set up later logging::basicConfig() logging::addHandler(logging::writeToFile, diff --git a/src/webb/panola/domain_helpers.R b/src/webb/panola/domain_helpers.R new file mode 100644 index 00000000..7fbd9e37 --- /dev/null +++ b/src/webb/panola/domain_helpers.R @@ -0,0 +1,81 @@ +## run these before working inside retrieve_product func +## network = network +## domain = domain +## prodname_ms = prodname_ms +## site_code = site_code +## tracker = held_data +## url = prod_info$url[i] + +retrieve_sleepers_product <- function(network, + domain, + prodname_ms, + site_code, + tracker, + url){ + # creating a string which matches the names of processing kernels + processing_func <- get(paste0('process_0_', + # these names are based off of prod names in products.csv + prodcode_from_prodname_ms(prodname_ms))) + + # tracking the "version" of the product + rt <- tracker[[prodname_ms]][[site_code]]$retrieve + + held_dt <- as.POSIXct(rt$held_version, + tz = 'UTC') + + # "deets" is a list of all the information originally from a row in products.csv + deets <- list(prodname_ms = prodname_ms, + site_code = site_code, + component = rt$component, + last_mod_dt = 
held_dt, url = url) + + # these "deets" are fed as arguments to whatever processing kernel is currently being called + # remember, this "retrieve_product" function is being called, in the retrieve.R script, + # in a loop over the product names from products.csv -- this is why the products.csv prod names + # must match the end of the processing kernels which are written to retrieve that product + + # if you're working on pkernels and not actually running this func, uncomment and run these lines: + ## set_details = deets + ## network = network + ## domain = domain + + result <- do.call(processing_func, + args = list(set_details = deets, + network = network, + domain = domain)) + + + new_status <- evaluate_result_status(result) + + if('access_time' %in% names(result) && any(! is.na(result$access_time))){ + deets$last_mod_dt <- result$access_time[! is.na(result$access_time)][1] + } + + update_data_tracker_r(network = network, + domain = domain, + tracker_name = 'held_data', + set_details = deets, + new_status = new_status) + + source_urls <- get_source_urls(result_obj = result, + processing_func = processing_func) + + write_metadata_r(murl = source_urls, + network = network, + domain = domain, + prodname_ms = prodname_ms) + +} + +retrieve_usgs_sleeper_daily_q <- function(set_details) { + if(grepl("w5", set_details$component) == TRUE) { + q <- dataRetrieval::readNWISdv(siteNumbers = "01135300", + parameterCd = "00060") + } else if(grepl("w3", set_details$component) == TRUE) { + q <- dataRetrieval::readNWISdv(siteNumbers = "01135150", + parameterCd = "00060") + } + + return(q) +} diff --git a/src/webb/panola/processing_kernels.R b/src/webb/panola/processing_kernels.R new file mode 100644 index 00000000..f24a81fc --- /dev/null +++ b/src/webb/panola/processing_kernels.R @@ -0,0 +1,85 @@ +# source('src/webb/sleeper/domain_helpers.R') +#retrieval kernels #### + + + +#discharge: STATUS=READY +#. 
handle_errors +process_0_VERSIONLESS001 <- function(set_details, network, domain) { + + raw_data_dest <- glue('data/{n}/{d}/raw/{p}/{s}', + n = network, + d = domain, + p = prodname_ms, + s = set_details$site_code) + + dir.create(path = raw_data_dest, + showWarnings = FALSE, + recursive = TRUE) + + rawfile <- glue('{rd}/{c}.csv', + rd = raw_data_dest, + c = set_details$component) + + # call our dataRetrieval function + q <- retrieve_usgs_sleeper_daily_q(set_details) + + # download it to the raw file location + write_csv(q, file = rawfile) + + res <- httr::HEAD(set_details$url) + + last_mod_dt <- strptime(x = substr(res$headers$`last-modified`, + start = 1, + stop = 19), + format = '%Y-%m-%dT%H:%M:%S') %>% + with_tz(tzone = 'UTC') + + deets_out <- list(url = paste(set_details$url, '(requires authentication)'), + access_time = as.character(with_tz(Sys.time(), + tzone = 'UTC')), + last_mod_dt = last_mod_dt) + + return(deets_out) +} + +#stream_chemistry: STATUS=READY +#. handle_errors +process_0_VERSIONLESS002 <- function(set_details, network, domain) { + + raw_data_dest <- glue('data/{n}/{d}/raw/{p}/{s}', + n = network, + d = domain, + p = prodname_ms, + s = set_details$site_code) + + dir.create(path = raw_data_dest, + showWarnings = FALSE, + recursive = TRUE) + + rawfile <- glue('{rd}/{c}.zip', + rd = raw_data_dest, + c = set_details$component) + + R.utils::downloadFile(url = set_details$url, + filename = rawfile, + skip = FALSE, + overwrite = TRUE) + + res <- httr::HEAD(set_details$url) + + last_mod_dt <- strptime(x = substr(res$headers$`last-modified`, + start = 1, + stop = 19), + format = '%Y-%m-%dT%H:%M:%S') %>% + with_tz(tzone = 'UTC') + + deets_out <- list(url = paste(set_details$url, '(requires authentication)'), + access_time = as.character(with_tz(Sys.time(), + tzone = 'UTC')), + last_mod_dt = last_mod_dt) + + return(deets_out) +} + + diff --git a/src/webb/panola/products.csv b/src/webb/panola/products.csv new file mode 100644 index 00000000..49d2e918 --- 
/dev/null +++ b/src/webb/panola/products.csv @@ -0,0 +1,4 @@ +prodcode,prodname,retrieve_status,munge_status,derive_status,precursor_of,notes,components,url +VERSIONLESS_001,discharge,,,,stream_flux_inst_ms001,,panola_discharge,http://catalog/file/get/5c5b0b83e4b070828902ac9b?f=__disk__b4%2F4d%2Fc1%2Fb44dc1405627810cbf6ef48e0a9ad77e7a3d7f62 +VERSIONLESS_002,stream_chemistry,,,,stream_flux_inst_ms001,,panola_chem,http://catalog/file/get/5c5b0b83e4b070828902ac9b?f=__disk__b4%2F4d%2Fc1%2Fb44dc1405627810cbf6ef48e0a9ad77e7a3d7f62 +ms001,stream_flux_inst,,,,NA,NA,NA,NA diff --git a/src/webb/panola/retrieve_versionless.R b/src/webb/panola/retrieve_versionless.R new file mode 100644 index 00000000..c72483b9 --- /dev/null +++ b/src/webb/panola/retrieve_versionless.R @@ -0,0 +1,101 @@ +#this is for unorganized versionless data (e.g. a single zip file for all +#sites). we could turn this into a function, and make a separate function for +#versionless data that's separated into several files. + +loginfo('Beginning retrieve (versionless products)', + logger = logger_module) + +prod_info <- get_product_info(network = network, + domain = domain, + status_level = 'retrieve', + get_statuses = 'ready') %>% + filter(grepl(pattern = '^VERSIONLESS', + x = prodcode)) + +if(! is.null(prodname_filter)){ + prod_info <- filter(prod_info, prodname %in% prodname_filter) +} + +if(nrow(prod_info) == 0) return() + +site_code <- 'sitename_NA' + +## i = 4 +for(i in seq_len(nrow(prod_info))){ + + prodcode <- prod_info$prodcode[i] + + prodname_ms <<- paste0(prod_info$prodname[i], + '__', + prodcode) + + held_data <<- get_data_tracker(network = network, + domain = domain) + + if(! product_is_tracked(tracker = held_data, + prodname_ms = prodname_ms)){ + + held_data <<- track_new_product(tracker = held_data, + prodname_ms = prodname_ms) + } + + if(! 
site_is_tracked(tracker = held_data, + prodname_ms = prodname_ms, + site_code = site_code)){ + + held_data <<- insert_site_skeleton( + tracker = held_data, + prodname_ms = prodname_ms, + site_code = site_code, + site_components = prod_info$components[i], + versionless = TRUE + ) + } + + update_data_tracker_r(network = network, + domain = domain, + tracker = held_data) + + dest_dir <- glue('data/{n}/{d}/raw/{p}/{s}', + n = network, + d = domain, + p = prodname_ms, + s = site_code) + + dir.create(path = dest_dir, + showWarnings = FALSE, + recursive = TRUE) + + retrieval_s <- held_data[[prodname_ms]][['sitename_NA']][['retrieve']][['status']] + + if(retrieval_s == 'ok'){ + loginfo(glue('Nothing to do for {s} {p}', + s=site_code, p=prodname_ms), logger=logger_module) + next + } else { + loginfo(glue('Retrieving {s} {p}', + s=site_code, p=prodname_ms), logger=logger_module) + } + + retrieve_sleepers_product(network = network, + domain = domain, + prodname_ms = prodname_ms, + site_code = site_code, + tracker = held_data, + url = prod_info$url[i]) + + if(! is.na(prod_info$munge_status[i])){ + update_data_tracker_m(network = network, + domain = domain, + tracker_name = 'held_data', + prodname_ms = prodname_ms, + site_code = site_code, + new_status = 'pending') + } + # } + + gc() +} + +loginfo('Retrieval complete for all versionless products', + logger = logger_module)