diff --git a/generated/src/aws-cpp-sdk-s3-crt/include/aws/s3-crt/S3CrtClient.h b/generated/src/aws-cpp-sdk-s3-crt/include/aws/s3-crt/S3CrtClient.h index 4e148850e49..68efcf4c9d2 100644 --- a/generated/src/aws-cpp-sdk-s3-crt/include/aws/s3-crt/S3CrtClient.h +++ b/generated/src/aws-cpp-sdk-s3-crt/include/aws/s3-crt/S3CrtClient.h @@ -6976,14 +6976,12 @@ namespace Aws std::shared_ptr clientShutdownSem; }; - static void CrtClientShutdownCallback(void *data) { - auto *wrappedData = static_cast(data); - if (wrappedData->fn) - { - wrappedData->fn(wrappedData->data); - } - wrappedData->clientShutdownSem->Release(); - } + static void CrtClientShutdownCallback(void *data); + void CancelCrtRequestAsync(aws_s3_meta_request *meta_request) const; + static int S3CrtRequestHeadersCallback(aws_s3_meta_request *meta_request, const struct aws_http_headers *headers, int response_status, void *user_data); + static int S3CrtRequestGetBodyCallback(struct aws_s3_meta_request *meta_request, const struct aws_byte_cursor *body, uint64_t range_start, void *user_data); + static void S3CrtRequestProgressCallback(struct aws_s3_meta_request *meta_request, const struct aws_s3_meta_request_progress *progress, void *user_data); + static void S3CrtRequestFinishCallback(struct aws_s3_meta_request *meta_request, const struct aws_s3_meta_request_result *meta_request_result, void *user_data); void InitCrtEndpointFromUri(aws_uri &endpoint_uri, const Aws::Http::URI &uri) const; diff --git a/generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp b/generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp index 820fa54224a..d72275f1f09 100644 --- a/generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp +++ b/generated/src/aws-cpp-sdk-s3-crt/source/S3CrtClient.cpp @@ -517,28 +517,59 @@ void S3CrtClient::OverrideEndpoint(const Aws::String& endpoint) m_endpointProvider->OverrideEndpoint(endpoint); } +void S3CrtClient::CrtClientShutdownCallback(void *data) +{ + AWS_CHECK_PTR(SERVICE_NAME, data); + auto *wrappedData = static_cast(data); + if (wrappedData->fn) + { + wrappedData->fn(wrappedData->data); + } + AWS_CHECK_PTR(SERVICE_NAME, wrappedData->clientShutdownSem); + wrappedData->clientShutdownSem->Release(); +} + +void S3CrtClient::CancelCrtRequestAsync(aws_s3_meta_request *meta_request) const { + assert(meta_request); + m_clientConfiguration.executor->Submit([meta_request]() { + aws_s3_meta_request_cancel(meta_request); + }); +} -static int S3CrtRequestHeadersCallback(struct aws_s3_meta_request *meta_request, const struct aws_http_headers *headers, +int S3CrtClient::S3CrtRequestHeadersCallback(struct aws_s3_meta_request *meta_request, const struct aws_http_headers *headers, int response_status, void *user_data) { AWS_UNREFERENCED_PARAM(meta_request); auto *userData = static_cast(user_data); + if (!userData || !userData->response || !userData->originalRequest) { + return AWS_OP_ERR; + } + size_t headersCount = aws_http_headers_count(headers); - for (size_t i = 0; i < headersCount; i++) - { + for (size_t i = 0; i < headersCount; i++) { struct aws_http_header header; aws_http_headers_get_index(headers, i, &header); userData->response->AddHeader(StringUtils::FromByteCursor(header.name), StringUtils::FromByteCursor(header.value)); } userData->response->SetResponseCode(static_cast(response_status)); + + auto& shouldContinueFn = userData->originalRequest->GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + userData->s3CrtClient->CancelCrtRequestAsync(meta_request); + } + return AWS_OP_SUCCESS; } -static int S3CrtRequestGetBodyCallback(struct aws_s3_meta_request *meta_request, const struct aws_byte_cursor *body, uint64_t range_start, void *user_data) +int S3CrtClient::S3CrtRequestGetBodyCallback(struct aws_s3_meta_request *meta_request, const struct aws_byte_cursor *body, uint64_t range_start, void *user_data) { AWS_UNREFERENCED_PARAM(range_start); auto *userData = static_cast(user_data); + if (!userData || !userData->response || !userData->request) { + return AWS_OP_ERR; + } auto& bodyStream = userData->response->GetResponseBody(); bodyStream.write(reinterpret_cast(body->ptr), static_cast(body->len)); @@ -557,26 +588,90 @@ static int S3CrtRequestGetBodyCallback(struct aws_s3_meta_request *meta_request, receivedHandler(userData->request.get(), userData->response.get(), static_cast(body->len)); } AWS_LOGSTREAM_TRACE(ALLOCATION_TAG, body->len << " bytes written to response."); + auto& shouldContinueFn = userData->originalRequest->GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + userData->s3CrtClient->CancelCrtRequestAsync(meta_request); + } return AWS_OP_SUCCESS; } -static void S3CrtRequestProgressCallback(struct aws_s3_meta_request *meta_request, const struct aws_s3_meta_request_progress *progress, void *user_data) +void S3CrtClient::S3CrtRequestProgressCallback(struct aws_s3_meta_request *meta_request, const struct aws_s3_meta_request_progress *progress, void *user_data) { AWS_UNREFERENCED_PARAM(meta_request); + AWS_CHECK_PTR(SERVICE_NAME, user_data); auto *userData = static_cast(user_data); + AWS_CHECK_PTR(SERVICE_NAME, userData->request); auto& progressHandler = userData->request->GetDataSentEventHandler(); - if (progressHandler) - { - progressHandler(userData->request.get(), static_cast(progress->bytes_transferred)); + if (progressHandler) { + progressHandler(userData->request.get(), static_cast(progress->bytes_transferred)); } AWS_LOGSTREAM_TRACE(ALLOCATION_TAG, progress->bytes_transferred << " bytes transferred."); + AWS_CHECK_PTR(SERVICE_NAME, userData->originalRequest); + auto& shouldContinueFn = userData->originalRequest->GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + userData->s3CrtClient->CancelCrtRequestAsync(meta_request); + } return; } -static void S3CrtRequestFinishCallback(struct aws_s3_meta_request *meta_request, +CoreErrors MapCrtError(const int crtErrorCode) { + switch (crtErrorCode) { + case aws_s3_errors::AWS_ERROR_S3_REQUEST_HAS_COMPLETED: + return CoreErrors::OK; + case aws_s3_errors::AWS_ERROR_S3_MISSING_CONTENT_RANGE_HEADER: + case aws_s3_errors::AWS_ERROR_S3_MISSING_CONTENT_LENGTH_HEADER: + case aws_s3_errors::AWS_ERROR_S3_MISSING_ETAG: + case aws_s3_errors::AWS_ERROR_S3_MISSING_UPLOAD_ID: + return CoreErrors::MISSING_PARAMETER; + case aws_s3_errors::AWS_ERROR_S3_INVALID_CONTENT_RANGE_HEADER: + case aws_s3_errors::AWS_ERROR_S3_INVALID_CONTENT_LENGTH_HEADER: + case aws_s3_errors::AWS_ERROR_S3_INVALID_RANGE_HEADER: + case aws_s3_errors::AWS_ERROR_S3_MULTIRANGE_HEADER_UNSUPPORTED: + case aws_s3_errors::AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH: + case aws_s3_errors::AWS_ERROR_S3_INVALID_MEMORY_LIMIT_CONFIG: + return CoreErrors::INVALID_PARAMETER_VALUE; + case aws_s3_errors::AWS_ERROR_S3_INTERNAL_ERROR: + case aws_s3_errors::AWS_ERROR_S3_PROXY_PARSE_FAILED: + case aws_s3_errors::AWS_ERROR_S3_UNSUPPORTED_PROXY_SCHEME: + case aws_s3_errors::AWS_ERROR_S3_NON_RECOVERABLE_ASYNC_ERROR: + case aws_s3_errors::AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE: + case aws_s3_errors::AWS_ERROR_S3_EXCEEDS_MEMORY_LIMIT: + return CoreErrors::INTERNAL_FAILURE; + case aws_s3_errors::AWS_ERROR_S3_SLOW_DOWN: + return CoreErrors::SLOW_DOWN; + case aws_s3_errors::AWS_ERROR_S3_INVALID_RESPONSE_STATUS: + case aws_s3_errors::AWS_ERROR_S3_RESPONSE_CHECKSUM_MISMATCH: + case aws_s3_errors::AWS_ERROR_S3_CHECKSUM_CALCULATION_FAILED: + case aws_s3_errors::AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED: + case aws_s3_errors::AWS_ERROR_S3_RESUMED_PART_CHECKSUM_MISMATCH: + case aws_s3_errors::AWS_ERROR_S3_FILE_MODIFIED: + case aws_s3_errors::AWS_ERROR_S3_INTERNAL_PART_SIZE_MISMATCH_RETRYING_WITH_RANGE: + case aws_s3_errors::AWS_ERROR_S3_RECV_FILE_ALREADY_EXISTS: + case aws_s3_errors::AWS_ERROR_S3_RECV_FILE_NOT_FOUND: + return CoreErrors::VALIDATION; + case aws_s3_errors::AWS_ERROR_S3_CANCELED: + return CoreErrors::USER_CANCELLED; + case aws_s3_errors::AWS_ERROR_S3_PAUSED: + case aws_s3_errors::AWS_ERROR_S3_RESUME_FAILED: + case aws_s3_errors::AWS_ERROR_S3_OBJECT_MODIFIED: + return CoreErrors::UNKNOWN; + case aws_s3_errors::AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED: + return CoreErrors::REQUEST_TIME_TOO_SKEWED; + case aws_s3_errors::AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED: + return CoreErrors::CLIENT_SIGNING_FAILURE; + case aws_s3_errors::AWS_ERROR_S3_REQUEST_TIMEOUT: + return CoreErrors::REQUEST_TIMEOUT; + default: + return CoreErrors::INTERNAL_FAILURE; + } +} + +void S3CrtClient::S3CrtRequestFinishCallback(struct aws_s3_meta_request *meta_request, const struct aws_s3_meta_request_result *meta_request_result, void *user_data) { AWS_UNREFERENCED_PARAM(meta_request); @@ -623,7 +718,7 @@ static void S3CrtRequestFinishCallback(struct aws_s3_meta_request *meta_request, << " (" << aws_error_lib_name(meta_request_result->error_code) << ": " << aws_error_name(meta_request_result->error_code) << ")"; userData->response->SetClientErrorMessage(ss.str()); - userData->response->SetClientErrorType(CoreErrors::INTERNAL_FAILURE); + userData->response->SetClientErrorType(MapCrtError(meta_request_result->error_code)); } aws_s3_meta_request_release(meta_request); @@ -876,9 +971,16 @@ void S3CrtClient::CopyObjectAsync(const CopyObjectRequest& request, const CopyOb options.message= crtHttpRequest->GetUnderlyingMessage(); userData->crtHttpRequest = crtHttpRequest; - if (aws_s3_client_make_meta_request(m_s3CrtClient, &options) == nullptr) - { - return handler(this, request, CopyObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext); + aws_s3_meta_request* meta_request = aws_s3_client_make_meta_request(m_s3CrtClient, &options); + if (meta_request == nullptr) { + return handler(this, request, + CopyObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", + "Unable to create s3 meta request", false)), handlerContext); + } + auto& shouldContinueFn = request.GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + aws_s3_meta_request_cancel(meta_request); } } @@ -1021,9 +1123,16 @@ void S3CrtClient::GetObjectAsync(const GetObjectRequest& request, const GetObjec options.message= crtHttpRequest->GetUnderlyingMessage(); userData->crtHttpRequest = crtHttpRequest; - if (aws_s3_client_make_meta_request(m_s3CrtClient, &options) == nullptr) - { - return handler(this, request, GetObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext); + aws_s3_meta_request* meta_request = aws_s3_client_make_meta_request(m_s3CrtClient, &options); + if (meta_request == nullptr) { + return handler(this, request, + GetObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", + "Unable to create s3 meta request", false)), handlerContext); + } + auto& shouldContinueFn = request.GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + aws_s3_meta_request_cancel(meta_request); } } @@ -1201,9 +1310,16 @@ void S3CrtClient::PutObjectAsync(const PutObjectRequest& request, const PutObjec options.message= crtHttpRequest->GetUnderlyingMessage(); userData->crtHttpRequest = crtHttpRequest; - if (aws_s3_client_make_meta_request(m_s3CrtClient, &options) == nullptr) - { - return handler(this, request, PutObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext); + aws_s3_meta_request* meta_request = aws_s3_client_make_meta_request(m_s3CrtClient, &options); + if (meta_request == nullptr) { + return handler(this, request, + PutObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", + "Unable to create s3 meta request", false)), handlerContext); + } + auto& shouldContinueFn = request.GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + aws_s3_meta_request_cancel(meta_request); } } diff --git a/tests/aws-cpp-sdk-s3-crt-integration-tests/CMakeLists.txt b/tests/aws-cpp-sdk-s3-crt-integration-tests/CMakeLists.txt index e0b70c69f01..a87c8b3ed89 100644 --- a/tests/aws-cpp-sdk-s3-crt-integration-tests/CMakeLists.txt +++ b/tests/aws-cpp-sdk-s3-crt-integration-tests/CMakeLists.txt @@ -20,6 +20,7 @@ foreach(TEST IN LISTS TEST_LIST) "${TEST_MAIN_FILE}" "BucketAndObjectOperationTest.cpp" "S3ExpressTest.cpp" + "CancelCrtRequestTest.cpp" ) file(GLOB AWS_S3_CRT_INTEGRATION_TESTS_SRC diff --git a/tests/aws-cpp-sdk-s3-crt-integration-tests/CancelCrtRequestTest.cpp b/tests/aws-cpp-sdk-s3-crt-integration-tests/CancelCrtRequestTest.cpp new file mode 100644 index 00000000000..df80374d3f4 --- /dev/null +++ b/tests/aws-cpp-sdk-s3-crt-integration-tests/CancelCrtRequestTest.cpp @@ -0,0 +1,273 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#ifdef _WIN32 +#pragma warning(disable : 4127) +#ifdef GetObject +#undef GetObject +#endif +#endif + +using namespace Aws; +using namespace Aws::Client; +using namespace Aws::Http; +using namespace Aws::S3Crt; +using namespace Aws::S3Crt::Model; +using namespace Aws::Utils; + +namespace { +const char ALLOCATION_TAG[] = "CancelCrtRequestTest"; +const char BUCKET_NAME[] = "test-cancel-crt-request"; + +class CancelCrtRequestTest : public ::testing::Test { + public: + protected: + void SetUp() override { + S3Crt::ClientConfiguration configuration; + configuration.region = "us-east-1"; + m_client = Aws::MakeShared(ALLOCATION_TAG, configuration); + + m_bucketName = CalculateBucketName(BUCKET_NAME); + SCOPED_TRACE(Aws::String("FullBucketName ") + m_bucketName); + CreateBucketRequest createBucketRequest; + createBucketRequest.SetBucket(m_bucketName); + createBucketRequest.SetACL(BucketCannedACL::private_); + + CreateBucketOutcome createBucketOutcome = m_client->CreateBucket(createBucketRequest); + AWS_EXPECT_SUCCESS(createBucketOutcome); + EXPECT_TRUE(WaitForBucketToPropagate(m_bucketName)); + TagTestBucket(m_bucketName); + } + + void TearDown() override { + DeleteBucket(m_bucketName); + m_client.reset(); + } + + void EmptyBucket(const Aws::String& bucketName) { + ListObjectsRequest listObjectsRequest; + listObjectsRequest.SetBucket(bucketName); + + ListObjectsOutcome listObjectsOutcome = m_client->ListObjects(listObjectsRequest); + + if (!listObjectsOutcome.IsSuccess()) return; + + for (const auto& object : listObjectsOutcome.GetResult().GetContents()) { + DeleteObjectRequest deleteObjectRequest; + deleteObjectRequest.SetBucket(bucketName); + deleteObjectRequest.SetKey(object.GetKey()); + auto deleteOutcome = m_client->DeleteObject(deleteObjectRequest); + AWS_UNREFERENCED_PARAM(deleteOutcome); + } + } + + void WaitForBucketToEmpty(const Aws::String& bucketName) { + ListObjectsRequest listObjectsRequest; + listObjectsRequest.SetBucket(bucketName); + + unsigned checkForObjectsCount = 0; + static const int TIMEOUT_MAX = 20; + while (checkForObjectsCount++ < TIMEOUT_MAX) { + ListObjectsOutcome listObjectsOutcome = m_client->ListObjects(listObjectsRequest); + AWS_ASSERT_SUCCESS(listObjectsOutcome); + + if (!listObjectsOutcome.GetResult().GetContents().empty()) { + std::this_thread::sleep_for(std::chrono::seconds(5)); + } else { + break; + } + } + } + + void DeleteBucket(const Aws::String& bucketName) { + HeadBucketRequest headBucketRequest; + headBucketRequest.SetBucket(bucketName); + HeadBucketOutcome bucketOutcome = m_client->HeadBucket(headBucketRequest); + + if (bucketOutcome.IsSuccess()) { + EmptyBucket(bucketName); + WaitForBucketToEmpty(bucketName); + + DeleteBucketRequest deleteBucketRequest; + deleteBucketRequest.SetBucket(bucketName); + + auto deleteBucketOutcome = + CallOperationWithUnconditionalRetry(m_client.get(), &Aws::S3Crt::S3CrtClient::DeleteBucket, deleteBucketRequest); + AWS_ASSERT_SUCCESS(deleteBucketOutcome); + } + } + + bool WaitForBucketToPropagate(const Aws::String& bucketName) { + unsigned timeoutCount = 0; + static const int TIMEOUT_MAX = 20; + while (timeoutCount++ < TIMEOUT_MAX) { + ListObjectsRequest listObjectsRequest; + listObjectsRequest.SetBucket(bucketName); + ListObjectsOutcome listObjectsOutcome = m_client->ListObjects(listObjectsRequest); + if (listObjectsOutcome.IsSuccess()) { + return true; + } + + std::this_thread::sleep_for(std::chrono::seconds(10)); + } + + return false; + } + + void TagTestBucket(const Aws::String& bucketName) { + ASSERT_TRUE(!bucketName.empty()); + ASSERT_TRUE(m_client); + + PutBucketTaggingRequest taggingRequest; + taggingRequest.SetBucket(bucketName); + Tag tag; + static const char* TEST_BUCKET_TAG = "IntegrationTestResource"; + tag.SetKey(TEST_BUCKET_TAG); + tag.SetValue(TEST_BUCKET_TAG); + Tagging tagging; + tagging.AddTagSet(tag); + taggingRequest.SetTagging(tagging); + + auto taggingOutcome = CallOperationWithUnconditionalRetry(m_client.get(), &Aws::S3Crt::S3CrtClient::PutBucketTagging, taggingRequest); + AWS_ASSERT_SUCCESS(taggingOutcome); + } + + static Aws::String CalculateBucketName(const Aws::String& bucketPrefix) { return Aws::Testing::GetAwsResourcePrefix() + bucketPrefix; } + + static Aws::String randomString() { return StringUtils::ToLower(Aws::String(UUID::RandomUUID()).c_str()); } + + std::shared_ptr m_client; + Aws::String m_bucketName; +}; + +TEST_F(CancelCrtRequestTest, ShouldCancelCrtRequest) { + const char TEST_KEY[] = "should-cancel-crt-request"; + // Put something + { + PutObjectRequest putObjectRequest1; + putObjectRequest1.SetBucket(m_bucketName); + std::shared_ptr objectStream = Aws::MakeShared("CancelCrtRequestTest"); + *objectStream << "Test Object First Call"; + putObjectRequest1.SetBody(objectStream); + putObjectRequest1.SetKey(TEST_KEY); + putObjectRequest1.SetContentType("text/plain"); + + PutObjectOutcome putObjectOutcome1 = m_client->PutObject(putObjectRequest1); + AWS_ASSERT_SUCCESS(putObjectOutcome1); + } + + // Try to put something else but abort the operation + { + /** + * Just a test StreamBuf that won't give anything until "ready" + */ + class NotifyingTestStream : public Aws::Utils::Stream::SimpleStreamBuf { + public: + std::function m_onReadCallback; + + explicit NotifyingTestStream(const Aws::String& value, std::function&& onReadCallback) + : SimpleStreamBuf(value), m_onReadCallback(std::move(onReadCallback)) {} + + protected: + std::streamsize xsgetn(char_type* s, std::streamsize count) override { + m_onReadCallback(); + + return SimpleStreamBuf::xsgetn(s, count); + } + }; + + PutObjectRequest putObjectRequest2; + putObjectRequest2.SetBucket(m_bucketName); + putObjectRequest2.SetKey(TEST_KEY); + std::atomic shouldContinueAtomic{true}; + putObjectRequest2.SetContinueRequestHandler([&shouldContinueAtomic](const HttpRequest*) { return shouldContinueAtomic.load(); }); + + static const uint32_t tenMB = 5 * 1024 * 1024; + Aws::String largePayloadToBeSplitIntoMultiPart; + while (largePayloadToBeSplitIntoMultiPart.size() < tenMB) { + largePayloadToBeSplitIntoMultiPart += "Test Object Second Call\n"; + } + NotifyingTestStream testStream(largePayloadToBeSplitIntoMultiPart, [&shouldContinueAtomic]() { shouldContinueAtomic.store(false); }); + std::shared_ptr objectStream = Aws::MakeShared("CancelCrtRequestTest", &testStream); + + putObjectRequest2.SetBody(objectStream); + putObjectRequest2.SetContentType("text/plain"); + + PutObjectOutcome putObjectOutcome2; + std::mutex mtx; + std::condition_variable cv; + bool handlerCalled = false; + auto asyncHandler = [&putObjectOutcome2, &mtx, &cv, &handlerCalled](const S3CrtClient*, const Model::PutObjectRequest&, + const Model::PutObjectOutcome& outcome, + const std::shared_ptr&) { + std::unique_lock lock(mtx); + + putObjectOutcome2 = outcome; + handlerCalled = true; + cv.notify_one(); + }; + m_client->PutObjectAsync(putObjectRequest2, asyncHandler); + if (!handlerCalled) { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(60), [&handlerCalled]() { return handlerCalled; }); + } + ASSERT_TRUE(handlerCalled) << "User handler of async operatioin PutObjectAsync was not called within 60 seconds!"; + + ASSERT_FALSE(putObjectOutcome2.IsSuccess()); + ASSERT_EQ((CoreErrors)putObjectOutcome2.GetError().GetErrorType(), CoreErrors::USER_CANCELLED); + ASSERT_EQ(putObjectOutcome2.GetError().GetMessage(), "Request successfully cancelled (aws-c-s3: AWS_ERROR_S3_CANCELED)"); + } + + // Try to get already cancelled request + { + GetObjectRequest getObjectRequest; + getObjectRequest.SetBucket(m_bucketName); + getObjectRequest.SetKey(TEST_KEY); + std::atomic shouldContinueAtomic{false}; + getObjectRequest.SetContinueRequestHandler([&shouldContinueAtomic](const HttpRequest*) { return shouldContinueAtomic.load(); }); + + GetObjectOutcome getObjectOutcome; + std::mutex mtx; + std::condition_variable cv; + bool handlerCalled = false; + auto asyncHandler = [&getObjectOutcome, &mtx, &cv, &handlerCalled](const S3CrtClient*, const Model::GetObjectRequest&, + const Model::GetObjectOutcome& outcome, + const std::shared_ptr&) { + std::unique_lock lock(mtx); + + getObjectOutcome = outcome.GetError(); + handlerCalled = true; + cv.notify_one(); + }; + m_client->GetObjectAsync(getObjectRequest, asyncHandler); + if (!handlerCalled) { + std::unique_lock lock(mtx); + cv.wait_for(lock, std::chrono::seconds(60), [&handlerCalled]() { return handlerCalled; }); + } + ASSERT_TRUE(handlerCalled) << "User handler of async operatioin GetObjectAsync was not called within 60 seconds!"; + + ASSERT_FALSE(getObjectOutcome.IsSuccess()); + ASSERT_EQ((CoreErrors)getObjectOutcome.GetError().GetErrorType(), CoreErrors::USER_CANCELLED); + ASSERT_EQ(getObjectOutcome.GetError().GetMessage(), "Request successfully cancelled (aws-c-s3: AWS_ERROR_S3_CANCELED)"); + } +} +} // namespace diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/S3ClientHeader.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/S3ClientHeader.vm index 22d8500e65d..ebb82274db3 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/S3ClientHeader.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/S3ClientHeader.vm @@ -225,14 +225,12 @@ namespace ${rootNamespace} std::shared_ptr clientShutdownSem; }; - static void CrtClientShutdownCallback(void *data) { - auto *wrappedData = static_cast(data); - if (wrappedData->fn) - { - wrappedData->fn(wrappedData->data); - } - wrappedData->clientShutdownSem->Release(); - } + static void CrtClientShutdownCallback(void *data); + void CancelCrtRequestAsync(aws_s3_meta_request *meta_request) const; + static int S3CrtRequestHeadersCallback(aws_s3_meta_request *meta_request, const struct aws_http_headers *headers, int response_status, void *user_data); + static int S3CrtRequestGetBodyCallback(struct aws_s3_meta_request *meta_request, const struct aws_byte_cursor *body, uint64_t range_start, void *user_data); + static void S3CrtRequestProgressCallback(struct aws_s3_meta_request *meta_request, const struct aws_s3_meta_request_progress *progress, void *user_data); + static void S3CrtRequestFinishCallback(struct aws_s3_meta_request *meta_request, const struct aws_s3_meta_request_result *meta_request_result, void *user_data); void InitCrtEndpointFromUri(aws_uri &endpoint_uri, const Aws::Http::URI &uri) const; diff --git a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtSpecificOperations.vm b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtSpecificOperations.vm index 5e286cc5e63..eee83350be0 100644 --- a/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtSpecificOperations.vm +++ b/tools/code-generation/generator/src/main/resources/com/amazonaws/util/awsclientgenerator/velocity/cpp/s3/s3-crt/S3CrtSpecificOperations.vm @@ -1,26 +1,57 @@ #if(!${onlyGeneratedOperations}) +void S3CrtClient::CrtClientShutdownCallback(void *data) +{ + AWS_CHECK_PTR(SERVICE_NAME, data); + auto *wrappedData = static_cast(data); + if (wrappedData->fn) + { + wrappedData->fn(wrappedData->data); + } + AWS_CHECK_PTR(SERVICE_NAME, wrappedData->clientShutdownSem); + wrappedData->clientShutdownSem->Release(); +} -static int S3CrtRequestHeadersCallback(struct aws_s3_meta_request *meta_request, const struct aws_http_headers *headers, +void S3CrtClient::CancelCrtRequestAsync(aws_s3_meta_request *meta_request) const { + assert(meta_request); + m_clientConfiguration.executor->Submit([meta_request]() { + aws_s3_meta_request_cancel(meta_request); + }); +} + +int S3CrtClient::S3CrtRequestHeadersCallback(struct aws_s3_meta_request *meta_request, const struct aws_http_headers *headers, int response_status, void *user_data) { AWS_UNREFERENCED_PARAM(meta_request); auto *userData = static_cast(user_data); + if (!userData || !userData->response || !userData->originalRequest) { + return AWS_OP_ERR; + } + size_t headersCount = aws_http_headers_count(headers); - for (size_t i = 0; i < headersCount; i++) - { + for (size_t i = 0; i < headersCount; i++) { struct aws_http_header header; aws_http_headers_get_index(headers, i, &header); userData->response->AddHeader(StringUtils::FromByteCursor(header.name), StringUtils::FromByteCursor(header.value)); } userData->response->SetResponseCode(static_cast(response_status)); + + auto& shouldContinueFn = userData->originalRequest->GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + userData->s3CrtClient->CancelCrtRequestAsync(meta_request); + } + return AWS_OP_SUCCESS; } -static int S3CrtRequestGetBodyCallback(struct aws_s3_meta_request *meta_request, const struct aws_byte_cursor *body, uint64_t range_start, void *user_data) +int S3CrtClient::S3CrtRequestGetBodyCallback(struct aws_s3_meta_request *meta_request, const struct aws_byte_cursor *body, uint64_t range_start, void *user_data) { AWS_UNREFERENCED_PARAM(range_start); auto *userData = static_cast(user_data); + if (!userData || !userData->response || !userData->request) { + return AWS_OP_ERR; + } auto& bodyStream = userData->response->GetResponseBody(); bodyStream.write(reinterpret_cast(body->ptr), static_cast(body->len)); @@ -39,26 +70,90 @@ static int S3CrtRequestGetBodyCallback(struct aws_s3_meta_request *meta_request, receivedHandler(userData->request.get(), userData->response.get(), static_cast(body->len)); } AWS_LOGSTREAM_TRACE(ALLOCATION_TAG, body->len << " bytes written to response."); + auto& shouldContinueFn = userData->originalRequest->GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + userData->s3CrtClient->CancelCrtRequestAsync(meta_request); + } return AWS_OP_SUCCESS; } -static void S3CrtRequestProgressCallback(struct aws_s3_meta_request *meta_request, const struct aws_s3_meta_request_progress *progress, void *user_data) +void S3CrtClient::S3CrtRequestProgressCallback(struct aws_s3_meta_request *meta_request, const struct aws_s3_meta_request_progress *progress, void *user_data) { AWS_UNREFERENCED_PARAM(meta_request); + AWS_CHECK_PTR(SERVICE_NAME, user_data); auto *userData = static_cast(user_data); + AWS_CHECK_PTR(SERVICE_NAME, userData->request); auto& progressHandler = userData->request->GetDataSentEventHandler(); - if (progressHandler) - { - progressHandler(userData->request.get(), static_cast(progress->bytes_transferred)); + if (progressHandler) { + progressHandler(userData->request.get(), static_cast(progress->bytes_transferred)); } AWS_LOGSTREAM_TRACE(ALLOCATION_TAG, progress->bytes_transferred << " bytes transferred."); + AWS_CHECK_PTR(SERVICE_NAME, userData->originalRequest); + auto& shouldContinueFn = userData->originalRequest->GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + userData->s3CrtClient->CancelCrtRequestAsync(meta_request); + } return; } -static void S3CrtRequestFinishCallback(struct aws_s3_meta_request *meta_request, +CoreErrors MapCrtError(const int crtErrorCode) { + switch (crtErrorCode) { + case aws_s3_errors::AWS_ERROR_S3_REQUEST_HAS_COMPLETED: + return CoreErrors::OK; + case aws_s3_errors::AWS_ERROR_S3_MISSING_CONTENT_RANGE_HEADER: + case aws_s3_errors::AWS_ERROR_S3_MISSING_CONTENT_LENGTH_HEADER: + case aws_s3_errors::AWS_ERROR_S3_MISSING_ETAG: + case aws_s3_errors::AWS_ERROR_S3_MISSING_UPLOAD_ID: + return CoreErrors::MISSING_PARAMETER; + case aws_s3_errors::AWS_ERROR_S3_INVALID_CONTENT_RANGE_HEADER: + case aws_s3_errors::AWS_ERROR_S3_INVALID_CONTENT_LENGTH_HEADER: + case aws_s3_errors::AWS_ERROR_S3_INVALID_RANGE_HEADER: + case aws_s3_errors::AWS_ERROR_S3_MULTIRANGE_HEADER_UNSUPPORTED: + case aws_s3_errors::AWS_ERROR_S3_INCORRECT_CONTENT_LENGTH: + case aws_s3_errors::AWS_ERROR_S3_INVALID_MEMORY_LIMIT_CONFIG: + return CoreErrors::INVALID_PARAMETER_VALUE; + case aws_s3_errors::AWS_ERROR_S3_INTERNAL_ERROR: + case aws_s3_errors::AWS_ERROR_S3_PROXY_PARSE_FAILED: + case aws_s3_errors::AWS_ERROR_S3_UNSUPPORTED_PROXY_SCHEME: + case aws_s3_errors::AWS_ERROR_S3_NON_RECOVERABLE_ASYNC_ERROR: + case aws_s3_errors::AWS_ERROR_S3_METRIC_DATA_NOT_AVAILABLE: + case aws_s3_errors::AWS_ERROR_S3_EXCEEDS_MEMORY_LIMIT: + return CoreErrors::INTERNAL_FAILURE; + case aws_s3_errors::AWS_ERROR_S3_SLOW_DOWN: + return CoreErrors::SLOW_DOWN; + case aws_s3_errors::AWS_ERROR_S3_INVALID_RESPONSE_STATUS: + case aws_s3_errors::AWS_ERROR_S3_RESPONSE_CHECKSUM_MISMATCH: + case aws_s3_errors::AWS_ERROR_S3_CHECKSUM_CALCULATION_FAILED: + case aws_s3_errors::AWS_ERROR_S3_LIST_PARTS_PARSE_FAILED: + case aws_s3_errors::AWS_ERROR_S3_RESUMED_PART_CHECKSUM_MISMATCH: + case aws_s3_errors::AWS_ERROR_S3_FILE_MODIFIED: + case aws_s3_errors::AWS_ERROR_S3_INTERNAL_PART_SIZE_MISMATCH_RETRYING_WITH_RANGE: + case aws_s3_errors::AWS_ERROR_S3_RECV_FILE_ALREADY_EXISTS: + case aws_s3_errors::AWS_ERROR_S3_RECV_FILE_NOT_FOUND: + return CoreErrors::VALIDATION; + case aws_s3_errors::AWS_ERROR_S3_CANCELED: + return CoreErrors::USER_CANCELLED; + case aws_s3_errors::AWS_ERROR_S3_PAUSED: + case aws_s3_errors::AWS_ERROR_S3_RESUME_FAILED: + case aws_s3_errors::AWS_ERROR_S3_OBJECT_MODIFIED: + return CoreErrors::UNKNOWN; + case aws_s3_errors::AWS_ERROR_S3_REQUEST_TIME_TOO_SKEWED: + return CoreErrors::REQUEST_TIME_TOO_SKEWED; + case aws_s3_errors::AWS_ERROR_S3EXPRESS_CREATE_SESSION_FAILED: + return CoreErrors::CLIENT_SIGNING_FAILURE; + case aws_s3_errors::AWS_ERROR_S3_REQUEST_TIMEOUT: + return CoreErrors::REQUEST_TIMEOUT; + default: + return CoreErrors::INTERNAL_FAILURE; + } +} + +void S3CrtClient::S3CrtRequestFinishCallback(struct aws_s3_meta_request *meta_request, const struct aws_s3_meta_request_result *meta_request_result, void *user_data) { AWS_UNREFERENCED_PARAM(meta_request); @@ -105,7 +200,7 @@ static void S3CrtRequestFinishCallback(struct aws_s3_meta_request *meta_request, << " (" << aws_error_lib_name(meta_request_result->error_code) << ": " << aws_error_name(meta_request_result->error_code) << ")"; userData->response->SetClientErrorMessage(ss.str()); - userData->response->SetClientErrorType(CoreErrors::INTERNAL_FAILURE); + userData->response->SetClientErrorType(MapCrtError(meta_request_result->error_code)); } aws_s3_meta_request_release(meta_request); @@ -418,15 +513,16 @@ void ${className}::${operation.name}Async(${constText}${operation.request.shape. options.message= crtHttpRequest->GetUnderlyingMessage(); userData->crtHttpRequest = crtHttpRequest; - if (aws_s3_client_make_meta_request(m_s3CrtClient, &options) == nullptr) - { -#if($operation.name == "PutObject") - return handler(this, request, PutObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext); -#elseif($operation.name == "GetObject") - return handler(this, request, GetObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext); -#elseif($operation.name == "CopyObject") - return handler(this, request, CopyObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext); -#end + aws_s3_meta_request* meta_request = aws_s3_client_make_meta_request(m_s3CrtClient, &options); + if (meta_request == nullptr) { + return handler(this, request, + ${operation.name}Outcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", + "Unable to create s3 meta request", false)), handlerContext); + } + auto& shouldContinueFn = request.GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + aws_s3_meta_request_cancel(meta_request); } } @@ -505,15 +601,16 @@ void ${className}::${operation.name}Async(${constText}${operation.name}ResponseR options.message= crtHttpRequest->GetUnderlyingMessage(); userData->crtHttpRequest = crtHttpRequest; - if (aws_s3_client_make_meta_request(m_s3CrtClient, &options) == nullptr) - { -#if($operation.name == "PutObject") - return handler(this, request, PutObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext); -#elseif($operation.name == "GetObject") - return handler(this, request, GetObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext); -#elseif($operation.name == "CopyObject") - return handler(this, request, CopyObjectOutcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", "Unable to create s3 meta request", false)), handlerContext); -#end + aws_s3_meta_request* meta_request = aws_s3_client_make_meta_request(m_s3CrtClient, &options); + if (meta_request == nullptr) { + return handler(this, request, + ${operation.name}Outcome(Aws::Client::AWSError(S3CrtErrors::INTERNAL_FAILURE, "INTERNAL_FAILURE", + "Unable to create s3 meta request", false)), handlerContext); + } + auto& shouldContinueFn = request.GetContinueRequestHandler(); + const HttpRequest* httpRequest = userData->request ? userData->request.get() : nullptr; + if (shouldContinueFn && !shouldContinueFn(httpRequest)) { + aws_s3_meta_request_cancel(meta_request); } }