Skip to content

Commit

Permalink
Centaur blob filesystem (#7104)
Browse files Browse the repository at this point in the history
  • Loading branch information
THWiseman committed Apr 4, 2023
1 parent 2768cca commit f01cc4b
Show file tree
Hide file tree
Showing 10 changed files with 128 additions and 35 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ lazy val azureBlobFileSystem = (project in file("filesystems/blob"))
.dependsOn(core)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")
.dependsOn(cloudSupport)

lazy val awsS3FileSystem = (project in file("filesystems/s3"))
.withLibrarySettings("cromwell-aws-s3filesystem", s3FileSystemDependencies)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,11 @@ metadata {
status: Succeeded
"outputs.azure_blob_storage_read.s1": "This is my test file! Did it work??"
}

# az:// is the root of the container specified in reference.conf.
# Here, we verify that exactly one log was written.

fileSystemCheck: "blob"
outputExpectations: {
"az://test-cromwell-workflow-logs/workflow.<<UUID>>.log" : 1
}
6 changes: 6 additions & 0 deletions centaur/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,5 +86,11 @@ centaur {
include "centaur_aws_credentials.conf"
}

azure {
container: "test-blob"
endpoint: "https://centaurtesting.blob.core.windows.net"
subscription: "62b22893-6bc1-46d9-8a90-806bb3cce3c9"
}

log-request-failures = false
}
11 changes: 11 additions & 0 deletions centaur/src/main/scala/centaur/test/FilesChecker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ object AWSFilesChecker extends FilesChecker {

override def countObjectsAtPath: String => Int = s3Client.countObjects(s3PrefixRegex)
}

object BlobFilesChecker extends FilesChecker {
import ObjectCounterInstances.blobObjectCounter
import ObjectCounterSyntax._

private lazy val containerClient = Operations.blobContainerClient

// The root of the endpoint + container specified in reference.conf will be substituted for az://
private val azurePrefixRange = "^az:\\/\\/.*"
override def countObjectsAtPath: String => Int = ObjectCounterSyntax(containerClient).countObjects(azurePrefixRange)
}
20 changes: 20 additions & 0 deletions centaur/src/main/scala/centaur/test/ObjectCounter.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package centaur.test

import com.azure.storage.blob.BlobContainerClient
import com.google.cloud.storage.Storage.BlobListOption
import com.google.cloud.storage.{Blob, Storage}
import software.amazon.awssdk.services.s3.S3Client
Expand Down Expand Up @@ -38,6 +39,25 @@ object ObjectCounterInstances {
storage.list(g.bucket, BlobListOption.prefix(g.directory)).iterateAll.asScala
listObjectsAtPath(_).size
}

implicit val blobObjectCounter: ObjectCounter[BlobContainerClient] = (containerClient : BlobContainerClient) => {
val pathToInt: Path => Int = providedPath => {
//Our path parsing is somewhat GCP centric. Convert to a blob path starting from the container root.
def pathToBlobPath(parsedPath : Path) : String = {
(Option(parsedPath.bucket), Option(parsedPath.directory)) match {
case (None, _) => ""
case (Some(_), None) => parsedPath.bucket
case (Some(_), Some(_)) => parsedPath.bucket + "/" + parsedPath.directory
}
}

val fullPath = pathToBlobPath(providedPath)
val blobsInFolder = containerClient.listBlobsByHierarchy(fullPath)
//if something "isPrefix", it's a directory. Otherwise, its a file. We just want to count files.
blobsInFolder.asScala.count(!_.isPrefix)
}
pathToInt(_)
}
}

object ObjectCounterSyntax {
Expand Down
9 changes: 9 additions & 0 deletions centaur/src/main/scala/centaur/test/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import centaur.test.metadata.WorkflowFlatMetadata
import centaur.test.metadata.WorkflowFlatMetadata._
import centaur.test.submit.SubmitHttpResponse
import centaur.test.workflow.Workflow
import com.azure.storage.blob.BlobContainerClient
import com.google.api.services.genomics.v2alpha1.{Genomics, GenomicsScopes}
import com.google.api.services.storage.StorageScopes
import com.google.auth.Credentials
Expand All @@ -23,6 +24,7 @@ import configs.syntax._
import cromwell.api.CromwellClient.UnsuccessfulRequestException
import cromwell.api.model.{CallCacheDiff, Failed, HashDifference, SubmittedWorkflow, Succeeded, TerminalStatus, WaasDescription, WorkflowId, WorkflowMetadata, WorkflowStatus}
import cromwell.cloudsupport.aws.AwsConfiguration
import cromwell.cloudsupport.azure.{AzureUtils}
import cromwell.cloudsupport.gcp.GoogleConfiguration
import cromwell.cloudsupport.gcp.auth.GoogleAuthMode
import io.circe.parser._
Expand Down Expand Up @@ -150,6 +152,13 @@ object Operations extends StrictLogging {
.build()
}

lazy val azureConfig: Config = CentaurConfig.conf.getConfig("azure")
val azureSubscription = azureConfig.getString("subscription")
val blobContainer = azureConfig.getString("container")
val azureEndpoint = azureConfig.getString("endpoint")
//NB: Centaur will throw an exception if it isn't able to authenticate with Azure blob storage via the local environment.
lazy val blobContainerClient: BlobContainerClient = AzureUtils.buildContainerClientFromLocalEnvironment(blobContainer, azureEndpoint, Option(azureSubscription)).get

def submitWorkflow(workflow: Workflow): Test[SubmittedWorkflow] = {
new Test[SubmittedWorkflow] {
override def run: IO[SubmittedWorkflow] = for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package centaur.test.workflow

import cats.data.Validated._
import cats.syntax.all._
import centaur.test.{AWSFilesChecker, FilesChecker, LocalFilesChecker, PipelinesFilesChecker}
import centaur.test.{AWSFilesChecker, FilesChecker, LocalFilesChecker, PipelinesFilesChecker, BlobFilesChecker}
import com.typesafe.config.Config
import common.validation.ErrorOr.ErrorOr
import configs.Result
Expand All @@ -25,8 +25,9 @@ object DirectoryContentCountCheck {
case Result.Success("gcs") => valid(PipelinesFilesChecker)
case Result.Success("local") => valid(LocalFilesChecker)
case Result.Success("aws") => valid(AWSFilesChecker)
case Result.Success(_) => invalidNel(s"Test '$name': Invalid 'fileSystemCheck' value (must be either 'local', 'gcs' or 'aws'")
case Result.Failure(_) => invalidNel(s"Test '$name': Must specify a 'fileSystemCheck' value (must be either 'local', 'gcs' or 'aws'")
case Result.Success("blob") => valid(BlobFilesChecker)
case Result.Success(_) => invalidNel(s"Test '$name': Invalid 'fileSystemCheck' value (must be either 'local', 'gcs', 'blob', or 'aws'")
case Result.Failure(_) => invalidNel(s"Test '$name': Must specify a 'fileSystemCheck' value (must be either 'local', 'gcs', 'blob', or 'aws'")
}

(directoryContentCountsValidation, fileSystemChecker) mapN { (d, f) => Option(DirectoryContentCountCheck(d, f)) }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cromwell.cloudsupport.azure

import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.resourcemanager.AzureResourceManager
import com.azure.resourcemanager.storage.models.StorageAccountKey
import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential
import com.google.common.net.UrlEscapers

import java.net.URI
import scala.jdk.CollectionConverters.IterableHasAsScala
import scala.util.{Failure, Success, Try}

object AzureUtils {
/**
* Generates a BlobContainerClient that can interact with the specified container. Authenticates using the local azure client running on the same machine.
* @param blobContainer Name of the blob container. Looks something like "my-blob-container".
* @param azureEndpoint Azure endpoint of the container. Looks something like https://somedomain.blob.core.windows.net.
* @param subscription Azure subscription. A globally unique identifier. If not provided, a default subscription will be used.
* @return A blob container client capable of interacting with the specified container.
*/
def buildContainerClientFromLocalEnvironment(blobContainer: String, azureEndpoint: String, subscription : Option[String]): Try[BlobContainerClient] = {
def parseURI(string: String): Try[URI] = Try(URI.create(UrlEscapers.urlFragmentEscaper().escape(string)))
def parseStorageAccount(uri: URI): Try[String] = uri.getHost.split("\\.").find(_.nonEmpty)
.map(Success(_)).getOrElse(Failure(new Exception("Could not parse storage account")))

val azureProfile = new AzureProfile(AzureEnvironment.AZURE)

def azureCredentialBuilder = new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)
.build

def authenticateWithSubscription(sub: String) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub)

def authenticateWithDefaultSubscription = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription()

def azure = subscription.map(authenticateWithSubscription(_)).getOrElse(authenticateWithDefaultSubscription)

def findAzureStorageAccount(storageAccountName: String) = azure.storageAccounts.list.asScala.find(_.name.equals(storageAccountName))
.map(Success(_)).getOrElse(Failure(new Exception("Azure Storage Account not found.")))

def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpointURL: String, blobContainerName: String): BlobContainerClient = {
new BlobContainerClientBuilder()
.credential(credential)
.endpoint(endpointURL)
.containerName(blobContainerName)
.buildClient()
}

def generateBlobContainerClient: Try[BlobContainerClient] = for {
uri <- parseURI(azureEndpoint)
configuredAccount <- parseStorageAccount(uri)
azureAccount <- findAzureStorageAccount(configuredAccount)
keys = azureAccount.getKeys.asScala
key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_))
first = key.value
sskc = new StorageSharedKeyCredential(configuredAccount, first)
bcc = buildBlobContainerClient(sskc, azureEndpoint, blobContainer)
} yield bcc

generateBlobContainerClient
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package cromwell.filesystems.blob

import com.azure.core.credential.AzureSasCredential
import com.azure.core.management.AzureEnvironment
import com.azure.core.management.profile.AzureProfile
import com.azure.identity.DefaultAzureCredentialBuilder
import com.azure.resourcemanager.AzureResourceManager
import com.azure.resourcemanager.storage.models.StorageAccountKey
import com.azure.storage.blob.nio.AzureFileSystem
import com.azure.storage.blob.sas.{BlobContainerSasPermission, BlobServiceSasSignatureValues}
import com.azure.storage.blob.{BlobContainerClient, BlobContainerClientBuilder}
import com.azure.storage.common.StorageSharedKeyCredential
import com.typesafe.config.Config
import com.typesafe.scalalogging.LazyLogging
import common.validation.Validation._
import cromwell.cloudsupport.azure.AzureUtils

import java.net.URI
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems}
Expand Down Expand Up @@ -223,22 +217,6 @@ case class WSMBlobSasTokenGenerator(container: BlobContainerName,
}

case class NativeBlobSasTokenGenerator(container: BlobContainerName, endpoint: EndpointURL, subscription: Option[SubscriptionId] = None) extends BlobSasTokenGenerator {
private val azureProfile = new AzureProfile(AzureEnvironment.AZURE)
private def azureCredentialBuilder = new DefaultAzureCredentialBuilder()
.authorityHost(azureProfile.getEnvironment.getActiveDirectoryEndpoint)
.build
private def authenticateWithSubscription(sub: SubscriptionId) = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withSubscription(sub.toString)
private def authenticateWithDefaultSubscription = AzureResourceManager.authenticate(azureCredentialBuilder, azureProfile).withDefaultSubscription()
private def azure = subscription.map(authenticateWithSubscription(_)).getOrElse(authenticateWithDefaultSubscription)
private def findAzureStorageAccount(name: StorageAccountName) = azure.storageAccounts.list.asScala.find(_.name.equals(name.value))
.map(Success(_)).getOrElse(Failure(new Exception("Azure Storage Account not found")))
private def buildBlobContainerClient(credential: StorageSharedKeyCredential, endpoint: EndpointURL, container: BlobContainerName): BlobContainerClient = {
new BlobContainerClientBuilder()
.credential(credential)
.endpoint(endpoint.value)
.containerName(container.value)
.buildClient()
}
private val bcsp = new BlobContainerSasPermission()
.setReadPermission(true)
.setCreatePermission(true)
Expand All @@ -252,14 +230,7 @@ case class NativeBlobSasTokenGenerator(container: BlobContainerName, endpoint: E
* @return an AzureSasCredential for accessing a blob container
*/
def generateBlobSasToken: Try[AzureSasCredential] = for {
uri <- BlobPathBuilder.parseURI(endpoint.value)
configuredAccount <- BlobPathBuilder.parseStorageAccount(uri)
azureAccount <- findAzureStorageAccount(configuredAccount)
keys = azureAccount.getKeys.asScala
key <- keys.headOption.fold[Try[StorageAccountKey]](Failure(new Exception("Storage account has no keys")))(Success(_))
first = key.value
sskc = new StorageSharedKeyCredential(configuredAccount.value, first)
bcc = buildBlobContainerClient(sskc, endpoint, container)
bcc <- AzureUtils.buildContainerClientFromLocalEnvironment(container.toString, endpoint.toString, subscription.map(_.toString))
bsssv = new BlobServiceSasSignatureValues(OffsetDateTime.now.plusDays(1), bcsp)
asc = new AzureSasCredential(bcc.generateSas(bsssv))
} yield asc
Expand Down
3 changes: 2 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ object Dependencies {
exclude("jakarta.xml.bind", "jakarta.xml.bind-api")
exclude("jakarta.activation", "jakarta.activation-api"),
"com.azure" % "azure-core-management" % "1.7.1",
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-xml" % jacksonV,
"com.azure.resourcemanager" % "azure-resourcemanager" % "2.18.0"
)

Expand Down Expand Up @@ -409,7 +410,7 @@ object Dependencies {
"com.lihaoyi" %% "pprint" % pprintV,
) ++ catsDependencies ++ configDependencies ++ slf4jFacadeDependencies ++ refinedTypeDependenciesList

val cloudSupportDependencies: List[ModuleID] = googleApiClientDependencies ++ googleCloudDependencies ++ betterFilesDependencies ++ awsCloudDependencies
val cloudSupportDependencies: List[ModuleID] = googleApiClientDependencies ++ googleCloudDependencies ++ betterFilesDependencies ++ awsCloudDependencies ++ azureDependencies

val databaseSqlDependencies: List[ModuleID] = List(
"commons-io" % "commons-io" % commonsIoV,
Expand Down

0 comments on commit f01cc4b

Please sign in to comment.