Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changes/34fc4fc2-ad2d-4264-976c-5013324c10b7.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"id": "34fc4fc2-ad2d-4264-976c-5013324c10b7",
"type": "feature",
"description": "Add support for event streams",
"issues": [
"awslabs/aws-sdk-kotlin#543"
]
}
2 changes: 0 additions & 2 deletions codegen/sdk/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ data class AwsService(


val disabledServices = setOf(
// Only contains event streams
"transcribe-streaming",
// timestream requires endpoint discovery
// https://github.com/awslabs/smithy-kotlin/issues/146
"timestream-write",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@

package aws.sdk.kotlin.codegen.customization

import software.amazon.smithy.aws.traits.protocols.RestJson1Trait
import software.amazon.smithy.aws.traits.protocols.RestXmlTrait
import software.amazon.smithy.kotlin.codegen.KotlinSettings
import software.amazon.smithy.kotlin.codegen.integration.KotlinIntegration
import software.amazon.smithy.kotlin.codegen.model.expectShape
import software.amazon.smithy.kotlin.codegen.model.findStreamingMember
import software.amazon.smithy.kotlin.codegen.utils.getOrNull
import software.amazon.smithy.model.Model
import software.amazon.smithy.model.knowledge.ServiceIndex
import software.amazon.smithy.model.shapes.OperationShape
import software.amazon.smithy.model.shapes.ShapeId
import software.amazon.smithy.model.shapes.StructureShape
import software.amazon.smithy.model.transform.ModelTransformer
import java.util.logging.Logger
Expand All @@ -25,13 +27,18 @@ class RemoveEventStreamOperations : KotlinIntegration {
override val order: Byte = -127
private val logger = Logger.getLogger(javaClass.name)

private val supportedServiceIds = setOf(
// integration tests
"aws.sdk.kotlin.test.eventstream#TestService",
).map(ShapeId::from).toSet()
private val supportedProtocols = setOf(
RestXmlTrait.ID,
RestJson1Trait.ID,
)
override fun enabledForService(model: Model, settings: KotlinSettings): Boolean {
val serviceIndex = ServiceIndex(model)
val protocols = serviceIndex.getProtocols(settings.service)
.values
.map { it.toShapeId() }

override fun enabledForService(model: Model, settings: KotlinSettings): Boolean =
settings.service !in supportedServiceIds
return protocols.any { it !in supportedProtocols }
}

override fun preprocessModel(model: Model, settings: KotlinSettings): Model =
ModelTransformer.create().filterShapes(model) { parentShape ->
Expand Down
9 changes: 4 additions & 5 deletions services/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,14 @@ subprojects {

if (project.file("e2eTest").exists()) {
jvm().compilations {
val main by getting
val e2eTest by creating {
defaultSourceSet {
kotlin.srcDir("e2eTest")
kotlin.srcDir("e2eTest/src")
resources.srcDir("e2eTest/test-resources")
dependsOn(sourceSets.getByName("commonMain"))
dependsOn(sourceSets.getByName("jvmMain"))

dependencies {
// Compile against the main compilation's compile classpath and outputs:
implementation(main.compileDependencyFiles + main.runtimeDependencyFiles + main.output.classesDirs)

implementation(kotlin("test"))
implementation(kotlin("test-junit5"))
implementation(project(":aws-runtime:testing"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,8 @@
*/
package aws.sdk.kotlin.e2etest

import aws.sdk.kotlin.services.s3.S3Client
import aws.sdk.kotlin.services.s3.completeMultipartUpload
import aws.sdk.kotlin.services.s3.createMultipartUpload
import aws.sdk.kotlin.services.s3.listObjects
import aws.sdk.kotlin.services.s3.model.CompletedPart
import aws.sdk.kotlin.services.s3.model.GetObjectRequest
import aws.sdk.kotlin.services.s3.putObject
import aws.sdk.kotlin.services.s3.uploadPart
import aws.sdk.kotlin.services.s3.*
import aws.sdk.kotlin.services.s3.model.*
import aws.sdk.kotlin.testing.PRINTABLE_CHARS
import aws.sdk.kotlin.testing.withAllEngines
import aws.smithy.kotlin.runtime.content.ByteStream
Expand All @@ -23,13 +17,15 @@ import aws.smithy.kotlin.runtime.hashing.sha256
import aws.smithy.kotlin.runtime.testing.RandomTempFile
import aws.smithy.kotlin.runtime.util.encodeToHex
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.toList
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.TestInstance
import java.io.File
import java.util.UUID
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.time.Duration.Companion.seconds
import kotlin.time.ExperimentalTime

Expand Down Expand Up @@ -217,6 +213,62 @@ class S3BucketOpsIntegrationTest {
assertEquals(expectedSha256, actualSha256)
}
}

@Test
fun testSelectObjectEventStream(): Unit = runBlocking {
S3Client.fromEnvironment().use { s3 ->
// upload our content to select from
val objKey = "developers.csv"

val content = """
Name,PhoneNumber,City,Occupation
Sam,(949) 555-6701,Irvine,Solutions Architect
Vinod,(949) 555-6702,Los Angeles,Solutions Architect
Jeff,(949) 555-6703,Seattle,AWS Evangelist
Jane,(949) 555-6704,Chicago,Developer
Sean,(949) 555-6705,Indianapolis,Developer
Mary,(949) 555-6706,Detroit,Developer
Kate,(949) 555-6707,Boston,Solutions Architect
""".trimIndent()

s3.putObject {
bucket = testBucket
key = objKey
body = ByteStream.fromString(content)
}

// select content as an event stream
val req = SelectObjectContentRequest {
bucket = testBucket
key = objKey
expressionType = ExpressionType.Sql
expression = """SELECT * FROM s3object s where s."Name" = 'Jane'"""
inputSerialization {
csv {
fileHeaderInfo = FileHeaderInfo.Use
}
compressionType = CompressionType.None
}
outputSerialization {
csv { }
}
}

val events = s3.selectObjectContent(req) { resp ->
// collect flow to list
resp.payload!!.toList()
}

assertEquals(3, events.size)

val records = assertIs<SelectObjectContentEventStream.Records>(events[0])
assertIs<SelectObjectContentEventStream.Stats>(events[1])
assertIs<SelectObjectContentEventStream.End>(events[2])

val expectedRecord = "Jane,(949) 555-6704,Chicago,Developer\n"
assertEquals(expectedRecord, records.value.payload?.decodeToString())
}
}
}

// generate sequence of "chunks" where each range defines the inclusive start and end bytes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
package aws.sdk.kotlin.e2etest

import aws.sdk.kotlin.services.transcribestreaming.TranscribeStreamingClient
import aws.sdk.kotlin.services.transcribestreaming.model.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.TestInstance
import java.io.File
import java.nio.file.Paths
import javax.sound.sampled.AudioSystem
import kotlin.test.assertTrue

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class TranscribeStreamingIntegrationTest {

@Test
fun testTranscribeEventStream(): Unit = runBlocking {
val url = this::class.java.classLoader.getResource("hello-kotlin-8000.wav") ?: error("failed to load test resource")
val audioFile = Paths.get(url.toURI()).toFile()

TranscribeStreamingClient { region = "us-east-2" }.use { client ->
val transcript = getTranscript(client, audioFile)
assertTrue(transcript.startsWith("Hello from", true), "full transcript: $transcript")
}
}
}

private const val FRAMES_PER_CHUNK = 4096

private fun audioStreamFromFile(file: File): Flow<AudioStream> {
val format = AudioSystem.getAudioFileFormat(file)
val ais = AudioSystem.getAudioInputStream(file)
val bytesPerFrame = ais.format.frameSize
println("audio stream format of $file: $format; bytesPerFrame=$bytesPerFrame")

return flow {
while (true) {
val frameBuffer = ByteArray(FRAMES_PER_CHUNK * bytesPerFrame)
val rc = ais.read(frameBuffer)
if (rc <= 0) {
break
}

val chunk = if (rc < frameBuffer.size) frameBuffer.sliceArray(0 until rc) else frameBuffer
val event = AudioStream.AudioEvent(
AudioEvent {
audioChunk = chunk
},
)

println("emitting event")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Remove println.

emit(event)
}
}.flowOn(Dispatchers.IO)
}

private suspend fun getTranscript(client: TranscribeStreamingClient, audioFile: File): String {
val req = StartStreamTranscriptionRequest {
languageCode = LanguageCode.EnUs
mediaSampleRateHertz = 8000
mediaEncoding = MediaEncoding.Pcm
audioStream = audioStreamFromFile(audioFile)
}

val transcript = client.startStreamTranscription(req) { resp ->
val fullMessage = StringBuilder()
resp.transcriptResultStream?.collect { event ->
when (event) {
is TranscriptResultStream.TranscriptEvent -> {
event.value.transcript?.results?.forEach { result ->
val transcript = result.alternatives?.firstOrNull()?.transcript
println("received TranscriptEvent: isPartial=${result.isPartial}; transcript=$transcript")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Remove println.

if (!result.isPartial) {
transcript?.let { fullMessage.append(it) }
}
}
}
else -> error("unknown event $event")
}
}
fullMessage.toString()
}

return transcript
}
Binary file not shown.