Commit

Support DefaultAWSCredentialsProviderChain for passing in AWS S3 credentials (#292)

* Support DefaultAWSCredentialsProviderChain for the AWS SDK

* Sync for rollback

* Sync for rollback

* Retest with minio, slight refactoring, turn down logging

* Fix a bug where failed file uploads would incorrectly decrement the files/byte counts

* Updated CHANGELOG.md

Co-authored-by: Luigi Marini <lmarini@users.noreply.github.com>
bodom0015 and lmarini committed Oct 19, 2021
1 parent 73e52ef commit 276434f
Showing 5 changed files with 165 additions and 107 deletions.
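
For context, here is a minimal sketch (not part of this commit) of what the DefaultAWSCredentialsProviderChain does on its own with the AWS SDK for Java v1 that Clowder uses. It assumes credentials are available through one of the chain's standard sources, e.g. the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables, Java system properties, the shared ~/.aws/credentials file, or an EC2/ECS instance profile:

```scala
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}

object DefaultChainSketch {
  def main(args: Array[String]): Unit = {
    // The default chain resolves credentials in order: environment variables,
    // Java system properties, the shared credentials file, then instance/container profiles.
    val s3: AmazonS3 = AmazonS3ClientBuilder.standard()
      .withRegion("us-east-1") // hypothetical region; Clowder reads clowder.s3.region instead
      .withCredentials(DefaultAWSCredentialsProviderChain.getInstance)
      .build()
    println("Buckets visible with resolved credentials: " + s3.listBuckets().size())
  }
}
```

With this in place, a deployment can omit clowder.s3.accessKey and clowder.s3.secretKey and let the environment supply credentials, which is the fallback the new buildS3Client uses.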
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -6,11 +6,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## Unreleased

### Added
- Support the [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
for passing in credentials to the S3ByteStorageService.

### Fixed
- Upgraded extractor parameters jsonform to version `2.2.5`.
- Cleaning up after a failed upload should no longer decrement the file + byte counts.

### Changed
- now building mongo-init and monitor docker containers with python 3.8
- Upgraded extractor parameters jsonform to version `2.2.5`.

### Removed
- check image is now part of [ncsa/checks](https://github.com/ncsa/checks/)
8 changes: 6 additions & 2 deletions app/services/mongodb/MongoDBFileService.scala
@@ -847,20 +847,24 @@ class MongoDBFileService @Inject() (
}
}


// delete the actual file
if(isLastPointingToLoader(file.loader, file.loader_id)) {
val fileSize = if(isLastPointingToLoader(file.loader, file.loader_id)) {
for(preview <- previews.findByFileId(file.id)){
previews.removePreview(preview)
}
if(!file.thumbnail_id.isEmpty)
thumbnails.remove(UUID(file.thumbnail_id.get))
ByteStorageService.delete(file.loader, file.loader_id, FileDAO.COLLECTION)
file.length
} else {
0
}

import UUIDConversions._
FileDAO.removeById(file.id)
appConfig.incrementCount('files, -1)
appConfig.incrementCount('bytes, -file.length)
appConfig.incrementCount('bytes, -1 * fileSize)
current.plugin[ElasticsearchPlugin].foreach {
_.delete(id.stringify)
}
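
The fix itself reduces to computing how many bytes were actually freed before decrementing the counter. A schematic sketch of that rule (isLastReference and decrementBytes are hypothetical stand-ins, not Clowder's real APIs):

```scala
// Only charge the byte counter for bytes that were physically deleted.
def bytesFreedOnRemove(isLastReference: Boolean, fileLength: Long): Long =
  if (isLastReference) fileLength // the storage backend deleted the bytes
  else 0L                         // other records still point at the same bytes

// The counter is then decremented by the computed amount rather than
// unconditionally by fileLength, as it was before this commit.
def removeFile(isLastReference: Boolean, fileLength: Long, decrementBytes: Long => Unit): Unit =
  decrementBytes(bytesFreedOnRemove(isLastReference, fileLength))
```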
215 changes: 124 additions & 91 deletions app/services/s3/S3ByteStorageService.scala
@@ -1,22 +1,41 @@
package services.s3

import java.io.{File, FileOutputStream, IOException, InputStream}
import java.io.{IOException, InputStream}
import models.UUID

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.auth.{AWSCredentialsProviderChain, AWSStaticCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.regions.Regions
import com.amazonaws.services.s3.model.{HeadBucketRequest, CreateBucketRequest, GetObjectRequest, ObjectMetadata}
import com.amazonaws.services.s3.model.{CreateBucketRequest, GetObjectRequest, HeadBucketRequest, ObjectMetadata}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import com.amazonaws.{AmazonClientException, ClientConfiguration}
import com.google.inject.Inject
import play.Logger
import play.api.Play
import services.ByteStorageService
import com.amazonaws.AmazonServiceException
import com.amazonaws.services.s3.transfer.TransferManager
import com.amazonaws.services.s3.transfer.TransferManagerBuilder
import com.amazonaws.services.s3.transfer.Upload
import services.s3.S3ByteStorageService.{handleACE, handleASE, handleIOE, handleUnknownError}


/**
* A ByteStorageService for Clowder that enables use of S3-compatible
* object stores to serve as the file backing for Clowder. This allows
* you to use an S3 bucket on AWS or Minio to store your files.
*
*
* Available Configuration Options:
* clowder.s3.serviceEndpoint - Host/port of the service to use for storage
* clowder.s3.bucketName - the name of the bucket that should be used to store files
* clowder.s3.accessKey - access key with which to access the bucket
* clowder.s3.secretKey - secret key associated with the access key above
* clowder.s3.region - the region where your S3 bucket lives
* clowder.s3.depth - the number of sub-paths to insert (default: 3)
* NOTE: this will randomly distribute files into smaller subdirectories and is recommended for performance reasons
*
* @author Mike Lambert
*
*/

/** Available configuration options for s3 storage */
object S3ByteStorageService {
@@ -25,42 +44,120 @@ object S3ByteStorageService {
val AccessKey: String = "clowder.s3.accessKey"
val SecretKey: String = "clowder.s3.secretKey"
val Region: String = "clowder.s3.region"

/* Reusable handlers for various Exception types */
def handleUnknownError(err: Exception = null) = {
if (err != null) {
Logger.error("An unknown error occurred in the S3ByteStorageService: " + err.toString)
} else {
Logger.error("An unknown error occurred in the S3ByteStorageService.")
}
}

/* Reusable handlers for various Exception types */
def handleIOE(err: IOException) = {
Logger.error("IOException occurred in the S3ByteStorageService: " + err)
}

/* Reusable handlers for various Exception types */
def handleACE(ace: AmazonClientException) = {
Logger.error("Caught an AmazonClientException, which " + "means the client encountered " + "an internal error while trying to " + "communicate with S3, " + "such as not being able to access the network.")
Logger.error("Error Message: " + ace.getMessage)
}

/* Reusable handlers for various Exception types */
def handleASE(ase: AmazonServiceException) = {
Logger.error("Caught an AmazonServiceException, which " + "means your request made it " + "to Amazon S3, but was rejected with an error response" + " for some reason.")
Logger.error("Error Message: " + ase.getMessage)
Logger.error("HTTP Status Code: " + ase.getStatusCode)
Logger.error("AWS Error Code: " + ase.getErrorCode)
Logger.error("Error Type: " + ase.getErrorType)
Logger.error("Request ID: " + ase.getRequestId)
}
}

/**
*
* A ByteStorageService for Clowder that enables use of S3-compatible
* object stores to serve as the file backing for Clowder. This allows
* you to use an S3 bucket on AWS or Minio to store your files.
*
*
* Available Configuration Options:
* clowder.s3.serviceEndpoint - Host/port of the service to use for storage
* clowder.s3.bucketName - the name of the bucket that should be used to store files
* clowder.s3.accessKey - access key with which to access the bucket
* clowder.s3.secretKey - secret key associated with the access key above
* clowder.s3.region - the region where your S3 bucket lives (currently unused)
*
*
* @author Mike Lambert
*
*/
class S3ByteStorageService @Inject()() extends ByteStorageService {
val s3: AmazonS3 = this.buildS3Client()
this.ensureBucketAccessAndExists()

def buildS3Client(): AmazonS3 = {
// NOTE: Region is ignored for MinIO case
val s3client = (Play.current.configuration.getString(S3ByteStorageService.ServiceEndpoint), Play.current.configuration.getString(S3ByteStorageService.Region)) match {
case (Some(serviceEndpoint), Some(region)) => {
Logger.debug("Creating S3 Client with custom endpoint and region: " + serviceEndpoint + " in region " + region)
AmazonS3ClientBuilder.standard().withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region))
}
case (Some(serviceEndpoint), None) => {
Logger.debug("Creating S3 Client with custom endpoint: " + serviceEndpoint + " (using default region)")
AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, Regions.US_EAST_1.name()))
}
case (None, Some(region)) => {
Logger.debug("Creating S3 Client with custom region: " + region)
AmazonS3ClientBuilder.standard().withRegion(region)
}
case (None, None) => {
Logger.debug("Creating S3 Client with default region.")
AmazonS3ClientBuilder.standard()
}
}

// Search for AccessKey / SecretKey in config (envvar values will be populated here by default)
(Play.current.configuration.getString(S3ByteStorageService.AccessKey),
Play.current.configuration.getString(S3ByteStorageService.SecretKey)) match {
case (Some(accessKey), Some(secretKey)) => {
val credentials = new BasicAWSCredentials(accessKey, secretKey)
val clientConfiguration = new ClientConfiguration
clientConfiguration.setSignerOverride("AWSS3V4SignerType")

Logger.debug("Creating S3 Client with custom credentials.")

// Only run a single thread at a time when verifying bucket existence
synchronized {
return s3client.withClientConfiguration(clientConfiguration)
.withPathStyleAccessEnabled(true)
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.build()
}
}
case (None, None) => {
Logger.debug("Creating S3 Client with default credentials.")

return s3client.withCredentials(DefaultAWSCredentialsProviderChain.getInstance)
.withPathStyleAccessEnabled(true)
.build()
}

case _ => {
val errMsg = "Bad S3 configuration: accessKey and secretKey are both required if one is given. Falling back to default credentials..."
Logger.warn(errMsg)

return s3client.withCredentials(DefaultAWSCredentialsProviderChain.getInstance)
.withPathStyleAccessEnabled(true)
.build()
}
}
}

def ensureBucketAccessAndExists() = {
// Ensure that bucket exists and that we have access to it before continuing
Play.current.configuration.getString(S3ByteStorageService.BucketName) match {
case Some(bucketName) => {
try {
// Validate configuration by checking for bucket existence on startup
this.s3Bucket.headBucket(new HeadBucketRequest(bucketName))
this.s3.headBucket(new HeadBucketRequest(bucketName))
Logger.debug("Confirmed access to configured S3 bucket. S3ByteStorageService loading is complete.")
} catch {
case sdke @ (_: AmazonClientException | _: AmazonServiceException) => {
case sdke@(_: AmazonClientException | _: AmazonServiceException) => {
if (sdke.getMessage.contains("Status Code: 404")) {
Logger.warn("Configured S3 bucket does not exist, attempting to create it now...")
try {
// Bucket does not exist - create the bucket
this.s3Bucket.createBucket(new CreateBucketRequest(bucketName))
this.s3.createBucket(new CreateBucketRequest(bucketName))
Logger.debug("Created configured S3 bucket. S3ByteStorageService loading is complete.")
} catch {
// Bucket could not be created - abort
case _: Throwable => throw new RuntimeException("Bad S3 configuration: Bucket does not exist and could not be created.")
Expand All @@ -75,50 +172,13 @@ class S3ByteStorageService @Inject()() extends ByteStorageService {
throw new RuntimeException("Bad S3 configuration: an unknown error has occurred - " + errMsg)
}
}
case _: Throwable => handleUnknownError(_)
case _: Throwable => throw new RuntimeException("Bad S3 configuration: an unknown error has occurred.")
}
}
case _ => throw new RuntimeException("Bad S3 configuration: verify that you have set all configuration options.")
}
}
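
The startup check above follows a probe-then-create pattern: headBucket verifies access, and a 404 triggers bucket creation. A condensed sketch of just that pattern, with the logging and the remaining error branches omitted:

```scala
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.{CreateBucketRequest, HeadBucketRequest}
import com.amazonaws.{AmazonClientException, AmazonServiceException}

// Probe the configured bucket; create it only when the probe reports a 404.
def ensureBucket(s3: AmazonS3, bucketName: String): Unit =
  try {
    s3.headBucket(new HeadBucketRequest(bucketName))
  } catch {
    case e @ (_: AmazonClientException | _: AmazonServiceException)
      if e.getMessage.contains("Status Code: 404") =>
      s3.createBucket(new CreateBucketRequest(bucketName))
  }
```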

/**
* Grabs config parameters from Clowder to return a
* AmazonS3 pointing at the configured service endpoint.
*/
def s3Bucket(): AmazonS3 = {
(Play.current.configuration.getString(S3ByteStorageService.ServiceEndpoint),
Play.current.configuration.getString(S3ByteStorageService.AccessKey),
Play.current.configuration.getString(S3ByteStorageService.SecretKey)) match {
case (Some(serviceEndpoint), Some(accessKey), Some(secretKey)) => {
val credentials = new BasicAWSCredentials(accessKey, secretKey)
val clientConfiguration = new ClientConfiguration
clientConfiguration.setSignerOverride("AWSS3V4SignerType")

Logger.debug("Created S3 Client for " + serviceEndpoint)

val region = Play.current.configuration.getString(S3ByteStorageService.Region) match {
case Some(region) => region
case _ => Regions.US_EAST_1.name()
}

return AmazonS3ClientBuilder.standard()
// NOTE: Region is ignored for MinIO case?
// TODO: Allow user to set region for AWS case?
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(serviceEndpoint, region))
.withPathStyleAccessEnabled(true)
.withClientConfiguration(clientConfiguration)
.withCredentials(new AWSStaticCredentialsProvider(credentials))
.build()
}
case _ => {
val errMsg = "Bad S3 configuration: verify that you have set all configuration options."
Logger.error(errMsg)
throw new RuntimeException(errMsg)
}
}
}

/**
* Store bytes to the specified path within the configured S3 bucket.
*
@@ -131,8 +191,8 @@ class S3ByteStorageService @Inject()() extends ByteStorageService {
Play.current.configuration.getString(S3ByteStorageService.BucketName) match {
case None => Logger.error("Failed saving bytes: failed to find configured S3 bucketName.")
case Some(bucketName) => {
val xferManager: TransferManager = TransferManagerBuilder.standard().withS3Client(this.s3Bucket).build
try {
val xferManager = TransferManagerBuilder.standard().withS3Client(this.s3).build
Logger.debug("Saving file to: /" + bucketName + "/" + prefix)

val id = UUID.generate.stringify
@@ -163,7 +223,7 @@ class S3ByteStorageService @Inject()() extends ByteStorageService {
//XferMgrProgress.showTransferProgress(xfer)
// or block with Transfer.waitForCompletion()
xfer.waitForCompletion()
xferManager.shutdownNow()
xferManager.shutdownNow(false)

Logger.debug("File saved to: /" + bucketName + "/" + prefix + "/" + targetPath)

@@ -199,7 +259,7 @@ class S3ByteStorageService @Inject()() extends ByteStorageService {
// Download object from S3 bucket
// NOTE: path should already contain the prefix
val rangeObjectRequest = new GetObjectRequest(bucketName, path)
val objectPortion = this.s3Bucket.getObject(rangeObjectRequest)
val objectPortion = s3.getObject(rangeObjectRequest)

return Option(objectPortion.getObjectContent)
} catch {
@@ -231,7 +291,7 @@ class S3ByteStorageService @Inject()() extends ByteStorageService {
try {
// Delete object from S3 bucket
// NOTE: path should already contain the prefix
this.s3Bucket.deleteObject(bucketName, path)
s3.deleteObject(bucketName, path)
return true
} catch {
case ase: AmazonServiceException => handleASE(ase)
@@ -245,31 +305,4 @@ class S3ByteStorageService @Inject()() extends ByteStorageService {
// Return false (in case of failure)
return false
}

/* Reusable handlers for various Exception types */
def handleUnknownError(err: Exception = null) = {
if (err != null) {
Logger.error("An unknown error occurred in the S3ByteStorageService: " + err.toString)
} else {
Logger.error("An unknown error occurred in the S3ByteStorageService.")
}
}

def handleIOE(err: IOException) = {
Logger.error("IOException occurred in the S3ByteStorageService: " + err)
}

def handleACE(ace: AmazonClientException) = {
Logger.error("Caught an AmazonClientException, which " + "means the client encountered " + "an internal error while trying to " + "communicate with S3, " + "such as not being able to access the network.")
Logger.error("Error Message: " + ace.getMessage)
}

def handleASE(ase: AmazonServiceException) = {
Logger.error("Caught an AmazonServiceException, which " + "means your request made it " + "to Amazon S3, but was rejected with an error response" + " for some reason.")
Logger.error("Error Message: " + ase.getMessage)
Logger.error("HTTP Status Code: " + ase.getStatusCode)
Logger.error("AWS Error Code: " + ase.getErrorCode)
Logger.error("Error Type: " + ase.getErrorType)
Logger.error("Request ID: " + ase.getRequestId)
}
}
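
Taken together, the credential selection introduced by buildS3Client can be restated in a few lines. This is an illustration of the rule, not Clowder's actual API: static keys are used only when both are configured, and everything else falls back to the default provider chain:

```scala
import com.amazonaws.auth.{AWSCredentialsProvider, AWSStaticCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}

// Choose a provider from the optional clowder.s3.accessKey / clowder.s3.secretKey values.
def chooseProvider(accessKey: Option[String], secretKey: Option[String]): AWSCredentialsProvider =
  (accessKey, secretKey) match {
    case (Some(ak), Some(sk)) =>
      new AWSStaticCredentialsProvider(new BasicAWSCredentials(ak, sk))
    case (None, None) =>
      DefaultAWSCredentialsProviderChain.getInstance
    case _ =>
      // Only one of the two keys is set: warn and fall back, as the service does.
      DefaultAWSCredentialsProviderChain.getInstance
  }
```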
