Skip to content

Commit

Permalink
Merge 1c9e26d into 69b60ab
Browse files Browse the repository at this point in the history
  • Loading branch information
davidangb committed May 4, 2020
2 parents 69b60ab + 1c9e26d commit f314ef0
Show file tree
Hide file tree
Showing 12 changed files with 234 additions and 300 deletions.
40 changes: 33 additions & 7 deletions src/main/resources/swagger/api-docs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3527,6 +3527,38 @@ paths:
description: Internal Error

/api/workspaces/{workspaceNamespace}/{workspaceName}/importPFB:
get:
x-passthrough: false
tags:
- Entities
operationId: listImportPFBJobs
summary: List PFB import jobs in this workspace
description: >
Lists all imports for this workspace, optionally filtered to only those imports currently in progress
produces:
- application/json
parameters:
- $ref: '#/parameters/workspaceNamespaceParam'
- $ref: '#/parameters/workspaceNameParam'
- in: query
description: When true, filters to only those imports currently in progress
name: running_only
required: false
type: boolean
default: false
responses:
200:
description: Successful Request
schema:
type: array
items:
$ref: '#/definitions/PFBStatusResponse'
400:
description: Bad Request
404:
description: Workspace not found
500:
description: Internal Error
post:
x-passthrough: false
tags:
Expand Down Expand Up @@ -3589,7 +3621,7 @@ paths:
schema:
$ref: '#/definitions/PFBStatusResponse'
404:
description: job ID not found
description: workspace or job ID not found
500:
description: Internal Error

Expand Down Expand Up @@ -6821,15 +6853,9 @@ definitions:
message:
type: string
description: extended explanation for the import job.
timestamp:
type: integer
format: int64
description: time of the response
required:
- jobId
- status
- message
- timestamp

Profile:
type: object
Expand Down
160 changes: 31 additions & 129 deletions src/main/scala/org/broadinstitute/dsde/firecloud/EntityClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import org.broadinstitute.dsde.firecloud.model.ModelSchema
import org.broadinstitute.dsde.rawls.model._
import org.broadinstitute.dsde.firecloud.model.ModelJsonProtocol._
import org.broadinstitute.dsde.firecloud.model._
import org.broadinstitute.dsde.firecloud.service.{FireCloudDirectiveUtils, FireCloudRequestBuilding, TSVFileSupport, TsvTypes}
import org.broadinstitute.dsde.firecloud.service.{FireCloudDirectiveUtils, TSVFileSupport, TsvTypes}
import org.broadinstitute.dsde.firecloud.service.PerRequest.{PerRequestMessage, RequestComplete}
import org.broadinstitute.dsde.firecloud.service.TsvTypes.TsvType
import org.broadinstitute.dsde.firecloud.utils.TSVLoadFile
import org.broadinstitute.dsde.firecloud.utils.{RestJsonClient, TSVLoadFile}
import spray.client.pipelining._
import spray.http.HttpEncodings._
import spray.http.HttpHeaders.`Accept-Encoding`
Expand All @@ -41,18 +41,13 @@ import scala.util.{Failure, Success, Try}

object EntityClient {

lazy val arrowRoot: String = FireCloudConfig.Arrow.baseUrl
lazy val arrowAppName: String = FireCloudConfig.Arrow.appName
lazy val avroToRawlsURL: String = s"$arrowRoot/$arrowAppName"

case class ImportEntitiesFromTSV(workspaceNamespace: String,
workspaceName: String,
tsvString: String)

case class ImportBagit(workspaceNamespace: String, workspaceName: String, bagitRq: BagitImportRequest)

case class ImportPFB(workspaceNamespace: String, workspaceName: String, pfbRequest: PfbImportRequest, userInfo: UserInfo)
case class PFBImportStatus(workspaceNamespace: String, workspaceName: String, jobId: String, userInfo: UserInfo)

def props(entityClientConstructor: (RequestContext, ModelSchema) => EntityClient, requestContext: RequestContext,
modelSchema: ModelSchema)(implicit executionContext: ExecutionContext): Props = {
Expand Down Expand Up @@ -129,22 +124,28 @@ object EntityClient {

}

class EntityClient (requestContext: RequestContext, modelSchema: ModelSchema, googleServicesDAO: GoogleServicesDAO)(implicit protected val executionContext: ExecutionContext)
extends Actor with FireCloudRequestBuilding with TSVFileSupport with LazyLogging {
class EntityClient(requestContext: RequestContext, modelSchema: ModelSchema, googleServicesDAO: GoogleServicesDAO)(implicit val executionContext: ExecutionContext)
extends Actor with RestJsonClient with TSVFileSupport with LazyLogging {

val format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZZ")

// ========================
// we have ambiguous implicit ActorRefFactories from Actor and RestJsonClient, so we need to tell sendReceive which to use,
// and we have to satisfy RestJsonClient's implicit
override implicit val system = context.system
private def sendRec = sendReceive(context, executionContext)
// ========================


override def receive: Receive = {
case ImportEntitiesFromTSV(workspaceNamespace: String, workspaceName: String, tsvString: String) =>
val pipeline = authHeaders(requestContext) ~> sendReceive
val pipeline = authHeaders(requestContext) ~> sendRec
importEntitiesFromTSV(pipeline, workspaceNamespace, workspaceName, tsvString) pipeTo sender
case ImportBagit(workspaceNamespace: String, workspaceName: String, bagitRq: BagitImportRequest) =>
val pipeline = authHeaders(requestContext) ~> sendReceive
val pipeline = authHeaders(requestContext) ~> sendRec
importBagit(pipeline, workspaceNamespace, workspaceName, bagitRq) pipeTo sender
case ImportPFB(workspaceNamespace: String, workspaceName: String, pfbRequest: PfbImportRequest, userInfo: UserInfo) =>
importPFB(workspaceNamespace, workspaceName, pfbRequest, userInfo) pipeTo sender
case PFBImportStatus(workspaceNamespace: String, workspaceName: String, jobId: String, userInfo: UserInfo) =>
pfbImportStatus(workspaceNamespace, workspaceName, jobId, userInfo) pipeTo sender
}


Expand Down Expand Up @@ -368,129 +369,30 @@ class EntityClient (requestContext: RequestContext, modelSchema: ModelSchema, go

def importPFB(workspaceNamespace: String, workspaceName: String, pfbRequest: PfbImportRequest, userInfo: UserInfo): Future[PerRequestMessage] = {

// initial validation of the presigned url provided as an argument
def validatePfbUrl(pfbRequest: PfbImportRequest)(op: URL => Future[PerRequestMessage]) = {
val pfbUrl = try {
new URL(pfbRequest.url)
} catch {
case e: Exception => throw new FireCloudExceptionWithErrorReport(ErrorReport(BadRequest, s"Invalid URL: ${pfbRequest.url}", e))
}
val acceptableProtocols = Seq("https") //for when we inevitably change our mind and need to support others

if (!acceptableProtocols.contains(pfbUrl.getProtocol)) {
throw new FireCloudExceptionWithErrorReport(ErrorReport(BadRequest, "Invalid URL protocol: must be https only"))
}
def enc(str: String) = java.net.URLEncoder.encode(str, "utf-8")

// TODO: add real (semantic) validation for origins, to ensure caller isn't supplying malice
op(pfbUrl)
}
// the payload to Import Service sends "path" and filetype. Here, we force-hardcode filetype because this API
// should only be used for PFBs.
val importServicePayload: ImportServiceRequest = ImportServiceRequest(path = pfbRequest.url.getOrElse(""), filetype = "pfb")

// probe Rawls with an empty upsert to this workspace. This checks that the workspace exists and that the user has
// permissions to perform an upsert.
def validateUpsertPermissions()(op: Boolean => PerRequestMessage) = {
val pipeline: WithTransformerConcatenation[HttpRequest, Future[HttpResponse]] = authHeaders(requestContext) ~> sendReceive
val importServiceUrl = s"${FireCloudConfig.ImportService.server}/${enc(workspaceNamespace)}/${enc(workspaceName)}/imports"

val probeUpsert = pipeline {
Post(FireCloudDirectiveUtils.encodeUri(Rawls.entityPathFromWorkspace(workspaceNamespace, workspaceName) + "/batchUpsert"), List.empty[String])
}
userAuthedRequest(Post(importServiceUrl, importServicePayload))(userInfo) map {
case resp if resp.status == Created =>
val importServiceResponse = unmarshal[ImportServiceResponse].apply(resp)
// for backwards compatibility, we return Accepted(202), even though import service returns Created(201),
// and we return a different response payload than what import service returns.

probeUpsert.map {
case resp if resp.status == NoContent => op(true)
case resp => RequestComplete(resp) // bubble up errors
}
}

// validate presigned URL
validatePfbUrl(pfbRequest) { pfbUrl =>
// empty-array batchUpsert to Rawls to verify workspace existence and permissions
validateUpsertPermissions() { _ =>
// generate job UUID
val jobId = java.util.UUID.randomUUID()

// the payload to Arrow needs to include the PFB url, job id, workspace info, and user info
val user = WorkbenchUserInfo(userInfo.id, userInfo.userEmail)
val arrowPayload = pfbRequest.copy(jobId = Option(jobId.toString),
workspace = Option(WorkspaceName(workspaceNamespace, workspaceName)),
user = Option(user))

// fire-and-forget call Arrow with UUID, workspace info, and presigned URL
val idToken = googleServicesDAO.getAdminIdentityToken
val gzipPipeline = addHeader (`Accept-Encoding`(gzip)) ~> addCredentials(OAuth2BearerToken(idToken)) ~> sendReceive ~> decode(Gzip)
gzipPipeline { Post(avroToRawlsURL, arrowPayload) }

// the response payload to the end user omits the userid/email
val responsePayload = arrowPayload.copy(user = None)
val responsePayload:PfbImportResponse = PfbImportResponse(
jobId = importServiceResponse.jobId,
url = pfbRequest.url.getOrElse(""),
workspace = WorkspaceName(workspaceNamespace, workspaceName)
)

// return Accepted with UUID
RequestComplete(Accepted, responsePayload)
}
}
}

def pfbImportStatus(workspaceNamespace: String, workspaceName: String, jobId: String, userInfo: UserInfo): Future[PerRequestMessage] = {

val bucketName = FireCloudConfig.Arrow.bucketName

def readStatusFile(filename: String, default: String): String = {
Try(googleServicesDAO.getObjectContentsAsRawlsSA(bucketName, s"$jobId/$filename")) match {
case Success(msg) =>
if (msg.trim.nonEmpty) msg else default
case Failure(ex) =>
logger.warn(s"error reading GCS object gs://$bucketName/$jobId/$filename", ex)
default
}
}

// list all files in the jobId's subdirectory and strip jobId prefix so we have just the filenames
Try(googleServicesDAO.listObjectsAsRawlsSA(bucketName, jobId + "/").map(_.replace(jobId + "/",""))) match {

case Failure(ex) =>
logger.warn(s"Error listing GCS objects for prefix gs://$bucketName/$jobId", ex)
// do not expose the bucket name in the error report we send to users
Future.successful(RequestCompleteWithErrorReport(InternalServerError, "Error listing bucket"))

case Success(files) =>
// what files do we have?
val (statusCode, statusString, message) = files.toSet match {

// no files yet, or subdirectory doesn't exist
case nothing if nothing.isEmpty =>
(NotFound, "NOT FOUND", "jobId not found or job not yet started")

// somebody wrote an error.json: it's an error.
case error if error.contains("error.json") =>
// try to read the error contents
val errorMessage = readStatusFile("error.json", "Error!")
(OK, "ERROR", errorMessage)

// we finished with a success.json: it's a success. Yay!
case success if success.contains("success.json") =>
// try to read the success contents
val successMessage = readStatusFile("success.json", "Success! Import completed.")
(OK, "SUCCESS", successMessage)

// we have a start marker, but do not yet have a success or error marker. It's in progress.
case started if started.contains("running.json") =>
val runningMessage = readStatusFile("running.json", "import in progress")
(OK, "RUNNING", runningMessage)

// we've got either/both of the upsert and the metadata, but no other markers - it's in progress.
case alsostarted if alsostarted == Set("upsert.json") || alsostarted == Set("metadata.json") || alsostarted == Set("upsert.json", "metadata.json") =>
(OK, "RUNNING", "import in progress")

// the files don't make sense; give up.
case _ =>
logger.warn(s"could not determine importPFB status for $jobId: $files")
(InternalServerError, "UNKNOWN", "could not determine status")
}

val responsePayload = JsObject(
"status" -> JsString(statusString),
"message" -> JsString(message),
"jobId" -> JsString(jobId)
)
case otherResp =>
RequestCompleteWithErrorReport(otherResp.status, otherResp.toString)

Future.successful(RequestComplete(statusCode, responsePayload))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,10 +229,8 @@ object FireCloudConfig {
val baseUrl: String = staticNotebooks.getString("baseUrl")
}

object Arrow {
private val arrow = config.getConfig("arrow")
val appName: String = if (arrow.hasPath("appName")) arrow.getString("appName") else "avro-import"
val baseUrl: String = arrow.getString("baseUrl")
val bucketName: String = arrow.getString("bucketName")
/** Configuration for the external import service.
  *
  * `server` is read lazily from the `importService.server` config path. When that
  * path is absent it falls back to the empty string rather than failing.
  */
// NOTE(review): with the empty-string fallback, callers that build
// s"${FireCloudConfig.ImportService.server}/..." will produce a URL with no
// scheme/host instead of failing fast — confirm this is intended.
object ImportService {
lazy val server: String = if (config.hasPath("importService.server")) config.getString("importService.server") else ""
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ trait GoogleServicesDAO extends ReportsSubsystemStatus {
implicit val errorReportSource = ErrorReportSource(GoogleServicesDAO.serviceName)

def getAdminUserAccessToken: String
def getAdminIdentityToken: String
def getTrialBillingManagerAccessToken: String
def getTrialBillingManagerEmail: String
def getTrialSpreadsheetAccessToken: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,6 @@ object HttpGoogleServicesDAO extends GoogleServicesDAO with FireCloudRequestBuil
.refreshAccessToken().getTokenValue
}

override def getAdminIdentityToken: String = {
getScopedServiceAccountCredentials(firecloudAdminSACreds, Seq("openid", "email"))
.idTokenWithAudience(EntityClient.avroToRawlsURL, List()).getTokenValue
}

def getTrialBillingManagerAccessToken = {
getScopedServiceAccountCredentials(trialBillingSACreds, authScopes ++ billingScope)
.refreshAccessToken().getTokenValue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ object ModelJsonProtocol extends WorkspaceJsonSupport {
implicit val impUserImportPermission = jsonFormat2(UserImportPermission)

implicit val impBagitImportRequest = jsonFormat2(BagitImportRequest)
implicit val impPfbImportRequest = jsonFormat4(PfbImportRequest)
implicit val impPfbImportRequest = jsonFormat1(PfbImportRequest)
implicit val impPfbImportResponse = jsonFormat3(PfbImportResponse)
implicit val impImportServiceRequest = jsonFormat2(ImportServiceRequest)
implicit val impImportServiceResponse = jsonFormat3(ImportServiceResponse)

implicit val impWorkspaceStorageCostEstimate = jsonFormat1(WorkspaceStorageCostEstimate)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,20 @@ case class EntityId(entityType: String, entityName: String)

case class BagitImportRequest(bagitURL: String, format: String)

case class PfbImportRequest(
url: String,
jobId: Option[String] = None,
workspace: Option[WorkspaceName] = None,
user: Option[WorkbenchUserInfo] = None)
/** Request body accepted by the importPFB API.
  *
  * @param url location of the PFB file to import; optional in the payload.
  *            NOTE(review): the visible caller substitutes "" when this is None —
  *            confirm whether a missing url should instead be rejected.
  */
case class PfbImportRequest(url: Option[String])

/** Response body returned to the caller of the importPFB API once an import has been requested.
  *
  * Field order matters: this class is serialized with a positional `jsonFormat3`.
  *
  * @param url       the PFB url that was submitted for import
  * @param jobId     identifier of the import job
  * @param workspace namespace/name of the workspace the import targets
  */
case class PfbImportResponse(url: String,
jobId: String,
workspace: WorkspaceName)

/** Payload posted to the import service to request an import.
  *
  * Field order matters: this class is serialized with a positional `jsonFormat2`.
  *
  * @param path     location of the file to import (for PFB imports, the PFB url)
  * @param filetype type of the file at `path`; the PFB import API sends "pfb"
  */
case class ImportServiceRequest(
path: String,
filetype: String)

/** Payload returned by the import service for an import job.
  *
  * Field order matters: this class is (de)serialized with a positional `jsonFormat3`.
  *
  * @param jobId   identifier the import service assigned to the job
  * @param status  job status as reported by the import service
  *                (set of possible values not visible here — TODO confirm)
  * @param message optional explanatory message from the import service
  */
case class ImportServiceResponse(
jobId: String,
status: String,
message: Option[String])

case class MethodConfigurationId(
name: Option[String] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,19 @@ trait WorkspaceApiService extends HttpService with FireCloudRequestBuilding
}
}
}
} ~
get {
requireUserInfo() { _ =>
extract(_.request.uri.query) { query =>
passthrough(Uri(s"${FireCloudConfig.ImportService.server}/$workspaceNamespace/$workspaceName/imports").withQuery(query), HttpMethods.GET)
}
}
}
} ~
path("importPFB" / Segment) { jobId =>
get {
requireUserInfo() { userInfo =>
respondWithJSON { requestContext =>
perRequest(requestContext, EntityClient.props(entityClientConstructor, requestContext, FlexibleModelSchema),
EntityClient.PFBImportStatus(workspaceNamespace, workspaceName, jobId, userInfo))
}
passthrough(Uri(s"${FireCloudConfig.ImportService.server}/$workspaceNamespace/$workspaceName/imports/$jobId"), HttpMethods.GET)
}
}
} ~
Expand Down
6 changes: 2 additions & 4 deletions src/test/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ notification {
fullyQualifiedNotificationTopic = "dummy"
}

arrow {
baseUrl = "http://localhost:9394"
appName = "avro-import"
bucketName = "unit-test-arrow-bucket"
importService {
server = "http://localhost:9394"
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class MockGoogleServicesDAO extends GoogleServicesDAO {
val pubsubMessages = new LinkedBlockingQueue[String]()

override def getAdminUserAccessToken: String = "adminUserAccessToken"
override def getAdminIdentityToken: String = "adminIdentityToken"
override def getTrialBillingManagerAccessToken: String = "billingManagerAccessToken"
override def getTrialBillingManagerEmail: String = "mock-trial-billing-mgr-email"
override def getTrialSpreadsheetAccessToken: String = "trialSpreadsheetAccessToken"
Expand Down
Loading

0 comments on commit f314ef0

Please sign in to comment.