Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/acquisition_master.R
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,16 @@ ms_init <- function(use_gpu = FALSE,
op_system <- NA
}


#Windows dev laptop ('bini'), not a server — note instance_type = 'dev' below
res <- try(setwd('C:/Users/Dell/Documents/Projects/data_processing'), silent=TRUE)
if(! inherits(res, 'try-error')){
    successes <- successes + 1
    which_machine <- 'bini'
    instance_type <- 'dev'
    machine_status <- 'noob'
    op_system <- 'windows'
}

if(successes > 1){
stop(glue('more than one working directory was available. must set the ',
'correct one manually'))
Expand Down Expand Up @@ -267,8 +277,7 @@ googledrive::drive_auth(email = gee_login)
#initialize and authorize GEE account
try(rgee::ee_Initialize(user = gee_login,
drive = TRUE))



#set up global logger. network-domain loggers are set up later
logging::basicConfig()
logging::addHandler(logging::writeToFile,
Expand Down
81 changes: 81 additions & 0 deletions src/webb/panola/domain_helpers.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
## run these before working inside retrieve_product func
## network = network
## domain = domain
## prodname_ms = prodname_ms
## site_code = site_code
## tracker = held_data
## url = prod_info$url[i]

#dispatch the matching process_0_ retrieval kernel for one versionless
#product, then record the outcome in the data tracker and write source
#metadata. called once per product row by retrieve_versionless.R.
#
#network, domain: character identifiers for this data source
#prodname_ms:     '<prodname>__<prodcode>' string from products.csv
#site_code:       placeholder site code for versionless products
#tracker:         the held_data tracker list
#url:             source url for this product (from products.csv)
retrieve_sleepers_product <- function(network,
                                      domain,
                                      prodname_ms,
                                      site_code,
                                      tracker,
                                      url){

    #kernel names follow the pattern process_0_<prodcode>; the prodcode is
    #recovered from prodname_ms, so products.csv prodnames must match the
    #suffixes of the processing kernels written to retrieve them
    processing_func <- get(paste0('process_0_',
                                  prodcode_from_prodname_ms(prodname_ms)))

    #the tracker records the "version" (held last-modified time) of this product
    retrieve_rec <- tracker[[prodname_ms]][[site_code]]$retrieve

    held_version_dt <- as.POSIXct(retrieve_rec$held_version,
                                  tz = 'UTC')

    #bundle the product information (originally a row of products.csv) that
    #every kernel receives as its set_details argument
    details <- list(prodname_ms = prodname_ms,
                    site_code = site_code,
                    component = retrieve_rec$component,
                    last_mod_dt = held_version_dt,
                    url = url)

    #if you're working on pkernels and not actually running this func,
    #uncomment and run these lines:
    ## set_details = deets
    ## network = network
    ## domain = domain

    result <- processing_func(set_details = details,
                              network = network,
                              domain = domain)

    new_status <- evaluate_result_status(result)

    #when the kernel reports when it accessed the source, use the first
    #non-missing access time as the newly held version
    if('access_time' %in% names(result) && any(! is.na(result$access_time))){
        usable_times <- result$access_time[! is.na(result$access_time)]
        details$last_mod_dt <- usable_times[1]
    }

    update_data_tracker_r(network = network,
                          domain = domain,
                          tracker_name = 'held_data',
                          set_details = details,
                          new_status = new_status)

    source_urls <- get_source_urls(result_obj = result,
                                   processing_func = processing_func)

    write_metadata_r(murl = source_urls,
                     network = network,
                     domain = domain,
                     prodname_ms = prodname_ms)
}

#retrieve the full daily mean discharge record (USGS parameter 00060) for the
#Sleepers River gauge matching this product component.
#
#set_details: list with at least $component, a string matched by substring
#   against the known watershed ids ('w5' -> 01135300, 'w3' -> 01135150)
#returns: a data.frame from dataRetrieval::readNWISdv
#raises: an error if the component matches neither known watershed (the
#   original silently left q unbound, yielding "object 'q' not found")
retrieve_usgs_sleeper_daily_q <- function(set_details) {

    if(grepl("w5", set_details$component)){
        site_no <- "01135300"
    } else if(grepl("w3", set_details$component)){
        site_no <- "01135150"
    } else {
        stop(paste0('retrieve_usgs_sleeper_daily_q: unrecognized component: ',
                    set_details$component))
    }

    q <- dataRetrieval::readNWISdv(siteNumbers = site_no,
                                   parameterCd = "00060")

    return(q)
}
85 changes: 85 additions & 0 deletions src/webb/panola/processing_kernels.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# source('src/webb/panola/domain_helpers.R')
#retrieval kernels ####



#discharge: STATUS=READY
#. handle_errors
process_0_VERSIONLESS001 <- function(set_details, network, domain) {

    #retrieval kernel for daily discharge: pulls the record from NWIS (via
    #retrieve_usgs_sleeper_daily_q, not from set_details$url) and writes it
    #to the raw data store.
    #
    #set_details: list built in retrieve_sleepers_product (prodname_ms,
    #   site_code, component, last_mod_dt, url)
    #returns: list(url, access_time, last_mod_dt) consumed by
    #   evaluate_result_status and the tracker update

    raw_data_dest <- glue('data/{n}/{d}/raw/{p}/{s}',
                          n = network,
                          d = domain,
                          p = set_details$prodname_ms, #was the global prodname_ms;
                                                       #set_details carries the same value
                          s = set_details$site_code)

    dir.create(path = raw_data_dest,
               showWarnings = FALSE,
               recursive = TRUE)

    rawfile <- glue('{rd}/{c}.csv',
                    rd = raw_data_dest,
                    c = set_details$component)

    # call our dataRetrieval function
    q <- retrieve_usgs_sleeper_daily_q(set_details)

    # download it to the raw file location
    write_csv(q, file = rawfile)

    res <- httr::HEAD(set_details$url)

    #NOTE(review): HTTP Last-Modified headers are normally RFC-1123 dates
    #('%a, %d %b %Y %H:%M:%S GMT'), not ISO-8601; if this server uses the
    #standard format, strptime below returns NA. confirm the server's format.
    last_mod_dt <- strptime(x = substr(res$headers$`last-modified`,
                                       start = 1,
                                       stop = 19),
                            format = '%Y-%m-%dT%H:%M:%S') %>%
        with_tz(tzone = 'UTC')

    deets_out <- list(url = paste(set_details$url, '(requires authentication)'),
                      access_time = as.character(with_tz(Sys.time(),
                                                         tzone = 'UTC')),
                      last_mod_dt = last_mod_dt)

    return(deets_out)
}

#stream_chemistry: STATUS=READY
#. handle_errors
process_0_VERSIONLESS002 <- function(set_details, network, domain) {

    #retrieval kernel for stream chemistry: downloads the zip archive from
    #set_details$url into the raw data store.
    #
    #set_details: list built in retrieve_sleepers_product (prodname_ms,
    #   site_code, component, last_mod_dt, url)
    #returns: list(url, access_time, last_mod_dt) consumed by
    #   evaluate_result_status and the tracker update

    raw_data_dest <- glue('data/{n}/{d}/raw/{p}/{s}',
                          n = network,
                          d = domain,
                          p = set_details$prodname_ms, #was the global prodname_ms;
                                                       #set_details carries the same value
                          s = set_details$site_code)

    dir.create(path = raw_data_dest,
               showWarnings = FALSE,
               recursive = TRUE)

    rawfile <- glue('{rd}/{c}.zip',
                    rd = raw_data_dest,
                    c = set_details$component)

    R.utils::downloadFile(url = set_details$url,
                          filename = rawfile,
                          skip = FALSE,
                          overwrite = TRUE)

    res <- httr::HEAD(set_details$url)

    #NOTE(review): HTTP Last-Modified headers are normally RFC-1123 dates
    #('%a, %d %b %Y %H:%M:%S GMT'), not ISO-8601; if this server uses the
    #standard format, strptime below returns NA. confirm the server's format.
    last_mod_dt <- strptime(x = substr(res$headers$`last-modified`,
                                       start = 1,
                                       stop = 19),
                            format = '%Y-%m-%dT%H:%M:%S') %>%
        with_tz(tzone = 'UTC')

    deets_out <- list(url = paste(set_details$url, '(requires authentication)'),
                      access_time = as.character(with_tz(Sys.time(),
                                                         tzone = 'UTC')),
                      last_mod_dt = last_mod_dt)

    return(deets_out)
}


4 changes: 4 additions & 0 deletions src/webb/panola/products.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
prodcode,prodname,retrieve_status,munge_status,derive_status,precursor_of,notes,components,url
VERSIONLESS001,discharge,,,,stream_flux_inst_ms001,,panola_discharge,http://catalog/file/get/5c5b0b83e4b070828902ac9b?f=__disk__b4%2F4d%2Fc1%2Fb44dc1405627810cbf6ef48e0a9ad77e7a3d7f62
VERSIONLESS002,stream_chemistry,,,,stream_flux_inst_ms001,,panola_chem,http://catalog/file/get/5c5b0b83e4b070828902ac9b?f=__disk__b4%2F4d%2Fc1%2Fb44dc1405627810cbf6ef48e0a9ad77e7a3d7f62
ms001,stream_flux_inst,,,,NA,NA,NA,NA
101 changes: 101 additions & 0 deletions src/webb/panola/retrieve_versionless.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#this is for unorganized versionless data (e.g. a single zip file for all
#sites). we could turn this into a function, and make a separate function for
#versionless data that's separated into several files.

#NOTE(review): the bare return() below is only legal if this script is
#source()d from inside a function call — confirm that's how it is invoked.

loginfo('Beginning retrieve (versionless products)',
        logger = logger_module)

#select products.csv rows that are ready for retrieval and whose prodcode
#marks them as versionless
prod_info <- get_product_info(network = network,
                              domain = domain,
                              status_level = 'retrieve',
                              get_statuses = 'ready') %>%
    filter(grepl(pattern = '^VERSIONLESS',
                 x = prodcode))

#optionally restrict to the products requested by the caller
if(! is.null(prodname_filter)){
    prod_info <- filter(prod_info, prodname %in% prodname_filter)
}

if(nrow(prod_info) == 0) return()

#versionless products aren't organized by site, so use a placeholder site code
site_code <- 'sitename_NA'

## i = 4
for(i in seq_len(nrow(prod_info))){

    prodcode <- prod_info$prodcode[i]

    #assigned globally (<<-) because downstream helpers read these from the
    #global environment
    prodname_ms <<- paste0(prod_info$prodname[i],
                           '__',
                           prodcode)

    held_data <<- get_data_tracker(network = network,
                                   domain = domain)

    #ensure the tracker has entries for this product and the placeholder site
    if(! product_is_tracked(tracker = held_data,
                            prodname_ms = prodname_ms)){

        held_data <<- track_new_product(tracker = held_data,
                                        prodname_ms = prodname_ms)
    }

    if(! site_is_tracked(tracker = held_data,
                         prodname_ms = prodname_ms,
                         site_code = site_code)){

        held_data <<- insert_site_skeleton(
            tracker = held_data,
            prodname_ms = prodname_ms,
            site_code = site_code,
            site_components = prod_info$components[i],
            versionless = TRUE
        )
    }

    #persist any tracker additions before attempting retrieval
    update_data_tracker_r(network = network,
                          domain = domain,
                          tracker = held_data)

    dest_dir <- glue('data/{n}/{d}/raw/{p}/{s}',
                     n = network,
                     d = domain,
                     p = prodname_ms,
                     s = site_code)

    dir.create(path = dest_dir,
               showWarnings = FALSE,
               recursive = TRUE)

    retrieval_s <- held_data[[prodname_ms]][['sitename_NA']][['retrieve']][['status']]

    #skip products whose held version is already marked current
    if(retrieval_s == 'ok'){
        loginfo(glue('Nothing to do for {s} {p}',
                     s=site_code, p=prodname_ms), logger=logger_module)
        next
    } else {
        loginfo(glue('Retrieving {s} {p}',
                     s=site_code, p=prodname_ms), logger=logger_module)
    }

    retrieve_sleepers_product(network = network,
                              domain = domain,
                              prodname_ms = prodname_ms,
                              site_code = site_code,
                              tracker = held_data,
                              url = prod_info$url[i])

    #flag the downstream munge step as pending for any product that has one
    if(! is.na(prod_info$munge_status[i])){
        update_data_tracker_m(network = network,
                              domain = domain,
                              tracker_name = 'held_data',
                              prodname_ms = prodname_ms,
                              site_code = site_code,
                              new_status = 'pending')
    }
    # }

    gc()
}

loginfo('Retrieval complete for all versionless products',
        logger = logger_module)