Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

XML Workflow submission API endpoint #2662

Merged
merged 10 commits into from
Jul 27, 2020
16 changes: 9 additions & 7 deletions apps/api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ EXPOSE 8000
# --------------------------------------------------------------------------
# Variables to store in docker image (most of them come from the base image)
# --------------------------------------------------------------------------
ENV AUTH_REQ="yes" \
HOST_ONLY="no" \
PGHOST="postgres"


# COMMAND TO RUN
RUN apt-get update \
&& apt-get install libsodium-dev -y \
Expand All @@ -26,10 +23,15 @@ RUN apt-get update \
&& Rscript -e "devtools::install_github('rstudio/swagger')" \
&& Rscript -e "devtools::install_github('rstudio/plumber')"

ENV AUTH_REQ="TRUE" \
HOST_ONLY="FALSE" \
PGHOST="postgres"\
RABBITMQ_URI="amqp://guest:guest@rabbitmq/%2F"\
DATA_DIR="/data/"\
DBFILES_DIR="/data/dbfiles/"

WORKDIR /api/R

CMD Rscript entrypoint.R

COPY ./ /api


COPY ./ /api
15 changes: 10 additions & 5 deletions apps/api/R/auth.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,15 @@ validate_crypt_pass <- function(username, crypt_pass) {
res <- tbl(dbcon, "users") %>%
filter(login == username,
crypted_password == crypt_pass) %>%
count() %>%
collect()

PEcAn.DB::db.close(dbcon)

if (res == 1) {
return(TRUE)
if (nrow(res) == 1) {
return(res$id)
}

return(FALSE)
return(NA)
}

#* Filter to authenticate a user calling the PEcAn API
Expand All @@ -59,6 +58,8 @@ authenticate_user <- function(req, res) {
grepl("ping", req$PATH_INFO, ignore.case = TRUE) ||
grepl("status", req$PATH_INFO, ignore.case = TRUE))
{
req$user$userid <- NA
req$user$username <- ""
return(plumber::forward())
}

Expand All @@ -70,7 +71,11 @@ authenticate_user <- function(req, res) {
password <- auth_details[2]
crypt_pass <- get_crypt_pass(username, password)

if(validate_crypt_pass(username, crypt_pass)){
userid <- validate_crypt_pass(username, crypt_pass)

if(! is.na(userid)){
req$user$userid <- userid
req$user$username <- username
return(plumber::forward())
}

Expand Down
186 changes: 186 additions & 0 deletions apps/api/R/submit.workflow.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
library(dplyr)

#* Submit a workflow submitted as XML
#* @param workflowXmlString String containing the XML workflow from request body
#* @param userDetails List containing userid & username
#* @return ID & status of the submitted workflow
#* @author Tezan Sahu
submit.workflow.xml <- function(workflowXmlString, userDetails){

workflowXml <- XML::xmlParseString(stringr::str_replace(workflowXmlString, "<?.*?>\n", ""))
workflowList <- XML::xmlToList(workflowXml)

# Fix details about the database
workflowList$database <- list(bety = PEcAn.DB::get_postgres_envvars(
host = "localhost",
dbname = "bety",
user = "bety",
password = "bety",
driver = "PostgreSQL"
)
)

# Fix RabbitMQ details
dbcon <- PEcAn.DB::betyConnect()
hostInfo <- PEcAn.DB::dbHostInfo(dbcon)
PEcAn.DB::db.close(dbcon)
workflowList$host <- list(
rabbitmq = list(
uri = Sys.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost/%2F"),
queue = paste0(workflowList$model$type, "_", workflowList$model$revision)
)
)
workflowList$host$name <- if(hostInfo$hostname == "") "localhost" else hostInfo$hostname

# Fix the info
workflowList$info$notes <- workflowList$info$notes
if(is.null(workflowList$info$userid)){
workflowList$info$userid <- userDetails$userid
}
if(is.null(workflowList$info$username)){
workflowList$info$username <- userDetails$username
}
if(is.null(workflowList$info$date)){
workflowList$info$date <- Sys.time()
}

# Add entry to workflows table in database
workflow_id <- insert.workflow(workflowList)
workflowList$workflow$id <- workflow_id

# Add entry to attributes table in database
insert.attribute(workflowList)

# Fix the output directory
outdir <- paste0(Sys.getenv("DATA_DIR", "/data/"), "workflows/PEcAn_", workflow_id)
workflowList$outdir <- outdir

# Create output diretory
dir.create(outdir, recursive=TRUE)

# Modify the `dbfiles` path & create the directory if needed
workflowList$run$dbfiles <- Sys.getenv("DBFILES_DIR", "/data/dbfiles/")
if(! dir.exists(workflowList$run$dbfiles)){
dir.create(workflowList$run$dbfiles, recursive = TRUE)
}

# Convert settings list to XML & save it into outdir
workflowXml <- PEcAn.settings::listToXml(workflowList, "pecan")
XML::saveXML(workflowXml, paste0(outdir, "/pecan.xml"))
res <- file.copy("/work/workflow.R", outdir)

# Post workflow to RabbitMQ
message <- list(folder = outdir, workflowid = workflow_id)
res <- PEcAn.remote::rabbitmq_post_message(workflowList$host$rabbitmq$uri, "pecan", message, "rabbitmq")

if(res$routed){
return(list(workflow_id = as.character(workflow_id), status = "Submitted successfully"))
}
else{
return(list(status = "Error", message = "Could not submit to RabbitMQ"))
}


}


#* Insert the workflow into workflows table to obtain the workflow_id
#* @param workflowList List containing the workflow details
#* @return ID of the submitted workflow
#* @author Tezan Sahu
insert.workflow <- function(workflowList){
dbcon <- PEcAn.DB::betyConnect()

model_id <- workflowList$model$id
if(is.null(model_id)){
model_id <- PEcAn.DB::get.id("models", c("model_name", "revision"), c(workflowList$model$type, workflowList$model$revision), dbcon)
}

start_time <- Sys.time()

workflow_df <- tibble::tibble(
"site_id" = c(bit64::as.integer64(workflowList$run$site$id)),
"model_id" = c(bit64::as.integer64(model_id)),
"folder" = "temp_dir",
"hostname" = c("docker"),
"start_date" = c(as.POSIXct(workflowList$run$start.date)),
"end_date" = c(as.POSIXct(workflowList$run$end.date)),
"advanced_edit" = c(FALSE),
"started_at" = c(start_time),
stringsAsFactors = FALSE
)

if(! is.na(workflowList$info$userid)){
workflow_df <- workflow_df %>% tibble::add_column("user_id" = c(bit64::as.integer64(workflowList$info$userid)))
}

insert <- PEcAn.DB::insert_table(workflow_df, "workflows", dbcon)
workflow_id <- dplyr::tbl(dbcon, "workflows") %>%
filter(started_at == start_time
&& site_id == bit64::as.integer64(workflowList$run$site$id)
&& model_id == bit64::as.integer64(model_id)
) %>%
pull(id)

update_qry <- paste0("UPDATE workflows SET folder = 'data/workflows/PEcAn_", workflow_id, "' WHERE id = '", workflow_id, "';")
PEcAn.DB::db.query(update_qry, dbcon)

PEcAn.DB::db.close(dbcon)

return(workflow_id)
}


#* Insert the workflow into attributes table
#* @param workflowList List containing the workflow details
#* @author Tezan Sahu
insert.attribute <- function(workflowList){
dbcon <- PEcAn.DB::betyConnect()

# Create an array of PFTs
pfts <- c()
for(i in seq(length(workflowList$pfts))){
pfts <- c(pfts, workflowList$pfts[i]$pft$name)
}

# Obtain the model_id
model_id <- workflowList$model$id
if(is.null(model_id)){
model_id <- PEcAn.DB::get.id("models", c("model_name", "revision"), c(workflowList$model$type, workflowList$model$revision), dbcon)
}

# Fill in the properties
properties <- list(
start = as.POSIXct(workflowList$run$start.date),
end = as.POSIXct(workflowList$run$end.date),
pfts = pfts,
runs = if(is.null(workflowList$ensemble$size)) 1 else workflowList$ensemble$size,
modelid = model_id,
siteid = bit64::as.integer64(workflowList$run$site$id),
sitename = dplyr::tbl(dbcon, "sites") %>% filter(id == bit64::as.integer64(workflowList$run$site$id)) %>% pull(sitename),
#sitegroupid <-
lat = if(is.null(workflowList$run$site$lat)) "" else workflowList$run$site$lat,
lon = if(is.null(workflowList$run$site$lon)) "" else workflowList$run$site$lon,
email = if(is.na(workflowList$info$userid) || workflowList$info$userid == -1) "" else
dplyr::tbl(dbcon, "users") %>% filter(id == bit64::as.integer64(workflowList$info$userid)) %>% pull(email),
notes = if(is.null(workflowList$info$notes)) "" else workflowList$info$notes,
input_met = workflowList$run$inputs$met$id,
variables = workflowList$ensemble$variable
)
if(! is.null(workflowList$ensemble$parameters$method)) properties$parm_method <- workflowList$ensemble$parameters$method
if(! is.null(workflowList$sensitivity.analysis$quantiles)){
sensitivity <- c()
for(i in seq(length(workflowList$sensitivity.analysis$quantiles))){
sensitivity <- c(sensitivity, workflowList$sensitivity.analysis$quantiles[i]$sigma)
}
properties$sensitivity <- paste0(sensitivity, collapse=",")
}
# More variables can be added later

# Insert properties into attributes table
value_json <- as.character(jsonlite::toJSON(properties, auto_unbox = TRUE))

res <- DBI::dbSendStatement(dbcon,
"INSERT INTO attributes (container_type, container_id, value) VALUES ($1, $2, $3)",
list("workflows", bit64::as.integer64(workflowList$workflow$id), value_json))
}
24 changes: 24 additions & 0 deletions apps/api/R/workflows.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
library(dplyr)
source("submit.workflow.R")

#' Get the list of workflows (using a particular model & site, if specified)
#' @param model_id Model id (character)
Expand Down Expand Up @@ -125,4 +126,27 @@ getWorkflowDetails <- function(id, res){

return(res)
}
}

#################################################################################################

#' Post a workflow for execution
#' @param req Request sent
#' @return ID & status of the submitted workflow
#' @author Tezan Sahu
#* @post /
submitWorkflow <- function(req, res){
if(req$HTTP_CONTENT_TYPE == "application/xml"){
submission_res <- submit.workflow.xml(req$postBody, req$user)
if(submission_res$status == "Error"){
res$status <- 400
return(submission_res)
}
res$status <- 201
return(submission_res)
}
else{
res$status <- 415
return(paste("Unsupported request content type:", req$HTTP_CONTENT_TYPE))
}
}