From c3fd97e4d26bfae8bd53eeeff16468860073ac8f Mon Sep 17 00:00:00 2001 From: Ken Gilmer Date: Thu, 6 May 2021 19:35:56 -0700 Subject: [PATCH 1/5] Add s3 file ingestion sample to examples source tree for M1 release. --- examples/build.gradle.kts | 2 +- examples/gradle.properties | 2 +- examples/s3-media-ingestion/build.gradle.kts | 11 ++ .../src/main/kotlin/Main.kt | 122 ++++++++++++++++++ examples/settings.gradle.kts | 1 + 5 files changed, 136 insertions(+), 2 deletions(-) create mode 100644 examples/s3-media-ingestion/build.gradle.kts create mode 100644 examples/s3-media-ingestion/src/main/kotlin/Main.kt diff --git a/examples/build.gradle.kts b/examples/build.gradle.kts index ef6d35ca4a6..eeec1a5f9fa 100644 --- a/examples/build.gradle.kts +++ b/examples/build.gradle.kts @@ -4,7 +4,7 @@ plugins { allprojects { group = "aws.sdk.kotlin.example" - version = "1.0-SNAPSHOT" + version = "2.0-SNAPSHOT" repositories { maven { diff --git a/examples/gradle.properties b/examples/gradle.properties index a27c31caa6d..856b6c1aa15 100644 --- a/examples/gradle.properties +++ b/examples/gradle.properties @@ -1,4 +1,4 @@ # AWS SDK -awsSdkKotlinVersion=0.1.0 +awsSdkKotlinVersion=0.2.0 diff --git a/examples/s3-media-ingestion/build.gradle.kts b/examples/s3-media-ingestion/build.gradle.kts new file mode 100644 index 00000000000..601108f4aba --- /dev/null +++ b/examples/s3-media-ingestion/build.gradle.kts @@ -0,0 +1,11 @@ +plugins { + kotlin("jvm") +} + +val awsSdkKotlinVersion: String by project + +dependencies { + implementation(kotlin("stdlib")) + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.3") + implementation("aws.sdk.kotlin:s3:$awsSdkKotlinVersion") +} diff --git a/examples/s3-media-ingestion/src/main/kotlin/Main.kt b/examples/s3-media-ingestion/src/main/kotlin/Main.kt new file mode 100644 index 00000000000..0f905ab291b --- /dev/null +++ b/examples/s3-media-ingestion/src/main/kotlin/Main.kt @@ -0,0 +1,122 @@ +/* + * Copyright Amazon.com, Inc. 
or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ +import aws.sdk.kotlin.services.s3.S3Client +import aws.sdk.kotlin.services.s3.model.ListBucketsRequest +import aws.sdk.kotlin.services.s3.model.ListObjectsRequest +import aws.sdk.kotlin.services.s3.model.PutObjectRequest +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.runBlocking +import software.aws.clientrt.content.ByteStream +import software.aws.clientrt.content.fromFile +import java.io.File +import java.nio.file.Files + +const val bucketName = "s3-media-ingestion-example" +const val ingestionDirPath = "/tmp/media-in" +const val completedDirPath = "/tmp/media-processed" +const val failedDirPath = "/tmp/media-failed" + +// media metadata is extracted from filename: _<year>.avi +val filenameMetadataRegex = "([\\w\\s]+)_([\\d]+).avi".toRegex() + +/** + * This program reads media files from a specified directory and uploads media files to S3 + */ +fun main() = runBlocking { + val client = S3Client { region = "us-east-2" } + + try { + validateS3(client) + listOf(completedDirPath, failedDirPath).forEach { validateDirectory(it) } + val ingestionDir = validateDirectory(ingestionDirPath) + + val uploadResults = ingestionDir + .walk().asFlow() + .mapNotNull(::mediaMetadataExtractor) + .map { mediaMetadata -> + uploadToS3(client, mediaMetadata) + } + .toList() + + moveFiles(uploadResults) + + val (successes, failures) = uploadResults.partition { it is Success } + when (failures.isEmpty()) { + true -> println("Media uploaded successfully: $successes") + false -> println("Successfully uploaded: $successes \nFailed to upload: $failures") + } + } finally { + client.close() + } +} + +// Check for valid S3 configuration based on account +suspend fun validateS3(s3Client: S3Client) { + val listBucketResponse = s3Client.listBuckets(ListBucketsRequest { }) + check(listBucketResponse.buckets?.any { it.name == bucketName } ?: false) { "Bucket $bucketName does not exist" } +} + +// Move 
files to directories based on upload results +fun moveFiles(uploadResults: List<UploadResult>) = + uploadResults + .map { uploadResult -> uploadResult.mediaMetadata.file.toPath() to (uploadResult is Success) } + .forEach { (file, uploadSuccess) -> + val targetFilePath = if (uploadSuccess) completedDirPath else failedDirPath + val targetPath = File(targetFilePath) + Files.move(file, File(targetPath, file.fileName.toString()).toPath()) + } + +// Classes for S3 upload results +sealed class UploadResult { abstract val mediaMetadata: MediaMetadata } +data class Success(val location: String, override val mediaMetadata: MediaMetadata) : UploadResult() +data class UploadError(val error: Throwable, override val mediaMetadata: MediaMetadata) : UploadResult() +data class Failure(val reason: String, override val mediaMetadata: MediaMetadata) : UploadResult() + +// Upload to S3 if file not already uploaded +suspend fun uploadToS3(s3Client: S3Client, mediaMetadata: MediaMetadata): UploadResult { + val existsInS3 = s3Client + .listObjects(ListObjectsRequest { bucket = bucketName }) + .contents?.any { it.key == mediaMetadata.s3KeyName } ?: false + + if (existsInS3) return Failure("${mediaMetadata.s3KeyName} already uploaded.", mediaMetadata) + + return try { + s3Client.putObject( + PutObjectRequest { + bucket = bucketName + key = mediaMetadata.s3KeyName + body = ByteStream.fromFile(mediaMetadata.file) + metadata = mediaMetadata.toMap() + } + ) + Success("$bucketName/${mediaMetadata.s3KeyName}", mediaMetadata) + } catch (e: Exception) { + UploadError(e, mediaMetadata) + } +} + +// Classes, properties, and functions for media metadata +data class MediaMetadata(val title: String, val year: Int, val file: File) +val MediaMetadata.s3KeyName get() = "$title-$year" +fun MediaMetadata.toMap() = mapOf("title" to title, "year" to year.toString()) +fun mediaMetadataExtractor(file: File): MediaMetadata? 
{ + if (!file.isFile || file.length() == 0L) return null + + val matchResult = filenameMetadataRegex.find(file.name) ?: return null + + val (title, year) = matchResult.destructured + return MediaMetadata(title, year.toInt(), file) +} + +// Validate file path and optionally create directory +fun validateDirectory(dirPath: String): File { + val dir = File(dirPath) + + require(dir.isDirectory || !dir.exists()) { "Unable to use $dir" } + + if (!dir.exists()) require(dir.mkdirs()) { "Unable to create $dir" } + + return dir +} diff --git a/examples/settings.gradle.kts b/examples/settings.gradle.kts index 0cfef73a710..90b1085dd58 100644 --- a/examples/settings.gradle.kts +++ b/examples/settings.gradle.kts @@ -1,3 +1,4 @@ rootProject.name = "aws-sdk-kotlin-examples" include(":dynamodb-movies") +include(":s3-media-ingestion") \ No newline at end of file From af5fac41fdf39732daed432e110ca7f9ad78b2c3 Mon Sep 17 00:00:00 2001 From: Ken Gilmer <kggilmer@amazon.com> Date: Fri, 7 May 2021 14:53:12 -0700 Subject: [PATCH 2/5] Updates based on PR feedback. --- .../src/main/kotlin/Main.kt | 131 ++++++++++++------ 1 file changed, 87 insertions(+), 44 deletions(-) diff --git a/examples/s3-media-ingestion/src/main/kotlin/Main.kt b/examples/s3-media-ingestion/src/main/kotlin/Main.kt index 0f905ab291b..3e8bc0c2206 100644 --- a/examples/s3-media-ingestion/src/main/kotlin/Main.kt +++ b/examples/s3-media-ingestion/src/main/kotlin/Main.kt @@ -3,87 +3,91 @@ * SPDX-License-Identifier: Apache-2.0. 
*/ import aws.sdk.kotlin.services.s3.S3Client -import aws.sdk.kotlin.services.s3.model.ListBucketsRequest -import aws.sdk.kotlin.services.s3.model.ListObjectsRequest -import aws.sdk.kotlin.services.s3.model.PutObjectRequest +import aws.sdk.kotlin.services.s3.model.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.runBlocking import software.aws.clientrt.content.ByteStream import software.aws.clientrt.content.fromFile +import software.aws.clientrt.content.writeToFile import java.io.File import java.nio.file.Files -const val bucketName = "s3-media-ingestion-example" +/** + * This program reads media files from a specified directory and uploads media files to S3 + * + * This is purely an example. Any file with the extension `.avi` will be processed. To test + * create a text file and name it such that it matches the [filenameMetadataRegex] regex, ex: + * `title_2000.avi`. + * + * When running the sample adjust the following path constants as needed for your local environment. + */ +const val bucketName = "s3-media-ingestion-example2" const val ingestionDirPath = "/tmp/media-in" const val completedDirPath = "/tmp/media-processed" const val failedDirPath = "/tmp/media-failed" +const val downloadDirPath = "/tmp/media-failed" // media metadata is extracted from filename: <title>_<year>.avi val filenameMetadataRegex = "([\\w\\s]+)_([\\d]+).avi".toRegex() -/** - * This program reads media files from a specified directory and uploads media files to S3 - */ -fun main() = runBlocking { +fun main(): Unit = runBlocking { val client = S3Client { region = "us-east-2" } try { - validateS3(client) - listOf(completedDirPath, failedDirPath).forEach { validateDirectory(it) } + // Setup + client.ensureBucketExists(bucketName) + listOf(completedDirPath, failedDirPath, downloadDirPath).forEach { validateDirectory(it) } val ingestionDir = validateDirectory(ingestionDirPath) + // Upload files val uploadResults = ingestionDir - .walk().asFlow() + .walk() + .asFlow() 
.mapNotNull(::mediaMetadataExtractor) - .map { mediaMetadata -> - uploadToS3(client, mediaMetadata) - } + .map { mediaMetadata -> client.uploadToS3(mediaMetadata) } .toList() moveFiles(uploadResults) + // Print results of operation val (successes, failures) = uploadResults.partition { it is Success } when (failures.isEmpty()) { true -> println("Media uploaded successfully: $successes") false -> println("Successfully uploaded: $successes \nFailed to upload: $failures") } + + // Download files to verify + client.listObjects(ListObjectsRequest { bucket = bucketName }).contents?.forEach { obj -> + client.getObject(GetObjectRequest { key = obj.key; bucket = bucketName }) { response -> + response.body?.writeToFile(File(downloadDirPath, obj.key!!)) + } + } } finally { client.close() } } -// Check for valid S3 configuration based on account -suspend fun validateS3(s3Client: S3Client) { - val listBucketResponse = s3Client.listBuckets(ListBucketsRequest { }) - check(listBucketResponse.buckets?.any { it.name == bucketName } ?: false) { "Bucket $bucketName does not exist" } +/** Ensure the bucket exists, creating it in us-east-2 if it does not */ +suspend fun S3Client.ensureBucketExists(bucketName: String) { + if (!bucketExists(bucketName)) { + createBucket( + CreateBucketRequest { + bucket = bucketName + createBucketConfiguration { + locationConstraint = BucketLocationConstraint.UsEast2 + } + } + ) + } } -// Move files to directories based on upload results -fun moveFiles(uploadResults: List<UploadResult>) = - uploadResults - .map { uploadResult -> uploadResult.mediaMetadata.file.toPath() to (uploadResult is Success) } - .forEach { (file, uploadSuccess) -> - val targetFilePath = if (uploadSuccess) completedDirPath else failedDirPath - val targetPath = File(targetFilePath) - Files.move(file, File(targetPath, file.fileName.toString()).toPath()) - } - -// Classes for S3 upload results -sealed class UploadResult { abstract val mediaMetadata: MediaMetadata } -data class Success(val location: 
String, override val mediaMetadata: MediaMetadata) : UploadResult() -data class UploadError(val error: Throwable, override val mediaMetadata: MediaMetadata) : UploadResult() -data class Failure(val reason: String, override val mediaMetadata: MediaMetadata) : UploadResult() - -// Upload to S3 if file not already uploaded -suspend fun uploadToS3(s3Client: S3Client, mediaMetadata: MediaMetadata): UploadResult { - val existsInS3 = s3Client - .listObjects(ListObjectsRequest { bucket = bucketName }) - .contents?.any { it.key == mediaMetadata.s3KeyName } ?: false - - if (existsInS3) return Failure("${mediaMetadata.s3KeyName} already uploaded.", mediaMetadata) +/** Upload to S3 if file not already uploaded */ +suspend fun S3Client.uploadToS3(mediaMetadata: MediaMetadata): UploadResult { + if (keyExists(bucketName, mediaMetadata.s3KeyName)) + return FileExistsError("${mediaMetadata.s3KeyName} already uploaded.", mediaMetadata) return try { - s3Client.putObject( + putObject( PutObjectRequest { bucket = bucketName key = mediaMetadata.s3KeyName @@ -92,11 +96,50 @@ suspend fun uploadToS3(s3Client: S3Client, mediaMetadata: MediaMetadata): Upload } ) Success("$bucketName/${mediaMetadata.s3KeyName}", mediaMetadata) - } catch (e: Exception) { + } catch (e: Exception) { // Checking Service Exception coming in future release UploadError(e, mediaMetadata) } } +/** Determine if an object exists in a bucket */ +suspend fun S3Client.keyExists(s3bucket: String, s3key: String) = + try { + headObject( + HeadObjectRequest { + bucket = s3bucket + key = s3key + } + ) + true + } catch (e: Exception) { // Checking Service Exception coming in future release + false + } + +/** Determine if a bucket exists */ +suspend fun S3Client.bucketExists(s3bucket: String) = + try { + headBucket(HeadBucketRequest { bucket = s3bucket }) + true + } catch (e: Exception) { // Checking Service Exception coming in future release + false + } + +/** Move files to directories based on upload results */ +fun 
moveFiles(uploadResults: List<UploadResult>) = + uploadResults + .map { uploadResult -> uploadResult.mediaMetadata.file.toPath() to (uploadResult is Success) } + .forEach { (file, uploadSuccess) -> + val targetFilePath = if (uploadSuccess) completedDirPath else failedDirPath + val targetPath = File(targetFilePath) + Files.move(file, File(targetPath, file.fileName.toString()).toPath()) + } + +// Classes for S3 upload results +sealed class UploadResult { abstract val mediaMetadata: MediaMetadata } +data class Success(val location: String, override val mediaMetadata: MediaMetadata) : UploadResult() +data class UploadError(val error: Throwable, override val mediaMetadata: MediaMetadata) : UploadResult() +data class FileExistsError(val reason: String, override val mediaMetadata: MediaMetadata) : UploadResult() + // Classes, properties, and functions for media metadata data class MediaMetadata(val title: String, val year: Int, val file: File) val MediaMetadata.s3KeyName get() = "$title-$year" @@ -110,7 +153,7 @@ fun mediaMetadataExtractor(file: File): MediaMetadata? 
{ return MediaMetadata(title, year.toInt(), file) } -// Validate file path and optionally create directory +/** Validate file path and optionally create directory */ fun validateDirectory(dirPath: String): File { val dir = File(dirPath) From 9f198c98f82af7e12c20c2ea0623398ff4c8b845 Mon Sep 17 00:00:00 2001 From: Ken Gilmer <kggilmer@amazon.com> Date: Fri, 7 May 2021 14:56:44 -0700 Subject: [PATCH 3/5] Tweaks to comments and constants --- examples/s3-media-ingestion/src/main/kotlin/Main.kt | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/examples/s3-media-ingestion/src/main/kotlin/Main.kt b/examples/s3-media-ingestion/src/main/kotlin/Main.kt index 3e8bc0c2206..1c0e865e708 100644 --- a/examples/s3-media-ingestion/src/main/kotlin/Main.kt +++ b/examples/s3-media-ingestion/src/main/kotlin/Main.kt @@ -13,10 +13,11 @@ import java.io.File import java.nio.file.Files /** - * This program reads media files from a specified directory and uploads media files to S3 + * This program reads media files from a specified directory and uploads media files to S3. + * After uploading it will then download uploaded files back into a local directory. * - * This is purely an example. Any file with the extension `.avi` will be processed. To test - * create a text file and name it such that it matches the [filenameMetadataRegex] regex, ex: + * Any file with the extension `.avi` will be processed. To test create a text file and + * name it such that it matches the [filenameMetadataRegex] regex, ex: * `title_2000.avi`. * * When running the sample adjust the following path constants as needed for your local environment. 
@@ -25,7 +26,7 @@ const val bucketName = "s3-media-ingestion-example2" const val ingestionDirPath = "/tmp/media-in" const val completedDirPath = "/tmp/media-processed" const val failedDirPath = "/tmp/media-failed" -const val downloadDirPath = "/tmp/media-failed" +const val downloadDirPath = "/tmp/media-down" // media metadata is extracted from filename: <title>_<year>.avi val filenameMetadataRegex = "([\\w\\s]+)_([\\d]+).avi".toRegex() @@ -59,7 +60,9 @@ fun main(): Unit = runBlocking { // Download files to verify client.listObjects(ListObjectsRequest { bucket = bucketName }).contents?.forEach { obj -> client.getObject(GetObjectRequest { key = obj.key; bucket = bucketName }) { response -> - response.body?.writeToFile(File(downloadDirPath, obj.key!!)) + val outputFile = File(downloadDirPath, obj.key!!) + response.body?.writeToFile(outputFile) + println("Downloaded $outputFile from S3") } } } finally { From 03c18737d0888bf71e732609dbcfac73a72b71d5 Mon Sep 17 00:00:00 2001 From: Ken Gilmer <kggilmer@amazon.com> Date: Fri, 7 May 2021 15:01:23 -0700 Subject: [PATCH 4/5] Another tweak to avoid invalid log message in error case --- examples/s3-media-ingestion/src/main/kotlin/Main.kt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/s3-media-ingestion/src/main/kotlin/Main.kt b/examples/s3-media-ingestion/src/main/kotlin/Main.kt index 1c0e865e708..d4b78d0d694 100644 --- a/examples/s3-media-ingestion/src/main/kotlin/Main.kt +++ b/examples/s3-media-ingestion/src/main/kotlin/Main.kt @@ -61,8 +61,9 @@ fun main(): Unit = runBlocking { client.listObjects(ListObjectsRequest { bucket = bucketName }).contents?.forEach { obj -> client.getObject(GetObjectRequest { key = obj.key; bucket = bucketName }) { response -> val outputFile = File(downloadDirPath, obj.key!!) 
- response.body?.writeToFile(outputFile) - println("Downloaded $outputFile from S3") + response.body?.writeToFile(outputFile).also { + println("Downloaded $outputFile from S3") + } } } } finally { From 9e8973bafe61a9ad037ae953b9ca089046f61724 Mon Sep 17 00:00:00 2001 From: Ken Gilmer <kggilmer@amazon.com> Date: Fri, 7 May 2021 15:03:14 -0700 Subject: [PATCH 5/5] fix bucket name and log bytes downloaded --- examples/s3-media-ingestion/src/main/kotlin/Main.kt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/s3-media-ingestion/src/main/kotlin/Main.kt b/examples/s3-media-ingestion/src/main/kotlin/Main.kt index d4b78d0d694..0c405aa2284 100644 --- a/examples/s3-media-ingestion/src/main/kotlin/Main.kt +++ b/examples/s3-media-ingestion/src/main/kotlin/Main.kt @@ -22,7 +22,7 @@ import java.nio.file.Files * * When running the sample adjust the following path constants as needed for your local environment. */ -const val bucketName = "s3-media-ingestion-example2" +const val bucketName = "s3-media-ingestion-example" const val ingestionDirPath = "/tmp/media-in" const val completedDirPath = "/tmp/media-processed" const val failedDirPath = "/tmp/media-failed" @@ -61,8 +61,8 @@ fun main(): Unit = runBlocking { client.listObjects(ListObjectsRequest { bucket = bucketName }).contents?.forEach { obj -> client.getObject(GetObjectRequest { key = obj.key; bucket = bucketName }) { response -> val outputFile = File(downloadDirPath, obj.key!!) - response.body?.writeToFile(outputFile).also { - println("Downloaded $outputFile from S3") + response.body?.writeToFile(outputFile).also { size -> + println("Downloaded $outputFile ($size bytes) from S3") } } }