From b7b5e1bdc4181ff2dd198de4640e94f30323b024 Mon Sep 17 00:00:00 2001 From: RainMark Date: Wed, 23 Mar 2022 14:56:12 +0800 Subject: [PATCH] switch-off uthread on not supported platform (#66) Signed-off-by: Rain Mark --- async_simple/CMakeLists.txt | 60 +++++++++++-------- async_simple/Future.h | 40 ++----------- async_simple/coro/test/CMakeLists.txt | 2 +- async_simple/executors/test/CMakeLists.txt | 2 +- .../executors/test/SimpleIOExecutorTest.cpp | 2 + async_simple/test/CMakeLists.txt | 2 +- async_simple/uthread/Await.h | 38 ++++++++++-- async_simple/uthread/Latch.h | 4 +- async_simple/uthread/test/UthreadTest.cpp | 10 ++-- async_simple/util/test/CMakeLists.txt | 2 +- demo_example/CMakeLists.txt | 22 ++++--- 11 files changed, 101 insertions(+), 83 deletions(-) diff --git a/async_simple/CMakeLists.txt b/async_simple/CMakeLists.txt index 4e614770d..92c006dd6 100644 --- a/async_simple/CMakeLists.txt +++ b/async_simple/CMakeLists.txt @@ -1,14 +1,17 @@ file(GLOB coro_src "coro/*.cpp") file(GLOB executors_src "executors/*.cpp") -file(GLOB uthread_src "uthread/internal/*.cc") -EXECUTE_PROCESS(COMMAND uname -m COMMAND tr -d '\n' OUTPUT_VARIABLE ARCHITECTURE) -if ("${ARCHITECTURE}" STREQUAL "aarch64") +if("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") # uname -s + file(GLOB uthread_src "uthread/internal/*.cc") + if ("${CMAKE_SYSTEM_PROCESSOR}" STREQUAL "aarch64") # uname -p file(GLOB uthread_asm_src "uthread/internal/*arm64_aapcs_elf*") -elseif ("${ARCHITECTURE}" STREQUAL "x86_64") + set(UTHREAD ON) + elseif ("${CMAKE_SYSTEM_PROCESSOR}" STREQUAL "x86_64") file(GLOB uthread_asm_src "uthread/internal/*x86_64_sysv_elf*") -else() - message(FATAL_ERROR "Unsupported Target: ${ARCHITECTURE}") + set(UTHREAD ON) + else() + message(STATUS "Uthread Unsupported Target: ${CMAKE_SYSTEM_PROCESSOR}") + endif() endif() file(GLOB headers "*.h") @@ -16,38 +19,47 @@ file(GLOB coro_header "coro/*.h") file(GLOB executors_header "executors/*.h") file(GLOB experimental_header "experimental/*.h") file(GLOB util_header "util/*.h") -file(GLOB uthread_header "uthread/*.h") -file(GLOB uthread_internal_header "uthread/internal/*.h") +if(UTHREAD) + file(GLOB uthread_header "uthread/*.h") + file(GLOB uthread_internal_header "uthread/internal/*.h") +endif() -message(STATUS "uthread_asm_src: ${uthread_asm_src}") set(SRCS - ${coro_src} - ${executors_src} - ${uthread_asm_src} - ${uthread_src} - ) + ${coro_src} + ${executors_src} + ) +if(UTHREAD) + list(APPEND SRCS ${uthread_src}) + list(APPEND SRCS ${uthread_asm_src}) +endif() -add_library(async_simple_static STATIC ${SRCS}) -add_library(async_simple SHARED ${SRCS}) -target_link_libraries(async_simple ${deplibs}) -target_link_libraries(async_simple_static ${deplibs}) +if(SRCS) + add_library(async_simple_static STATIC ${SRCS}) + add_library(async_simple SHARED ${SRCS}) + target_link_libraries(async_simple ${deplibs}) + target_link_libraries(async_simple_static ${deplibs}) -set_target_properties(async_simple_static PROPERTIES OUTPUT_NAME "async_simple") + set_target_properties(async_simple_static PROPERTIES OUTPUT_NAME "async_simple") -install(TARGETS async_simple DESTINATION lib/) -install(TARGETS async_simple_static DESTINATION lib/) + install(TARGETS async_simple DESTINATION lib/) + install(TARGETS async_simple_static DESTINATION lib/) +endif() install(FILES ${headers} DESTINATION include/async_simple) install(FILES ${coro_header} DESTINATION include/async_simple/coro) install(FILES ${executors_header} DESTINATION include/async_simple/executors) install(FILES ${experimental_header} DESTINATION include/async_simple/experimental) install(FILES ${util_header} DESTINATION include/async_simple/util) -install(FILES ${uthread_header} DESTINATION include/async_simple/uthread) -install(FILES ${uthread_internal_header} DESTINATION include/async_simple/uthread/internal) +if(UTHREAD) + install(FILES ${uthread_header} DESTINATION include/async_simple/uthread) + install(FILES ${uthread_internal_header} DESTINATION include/async_simple/uthread/internal) +endif() add_subdirectory(test) add_subdirectory(util/test) add_subdirectory(coro/test) add_subdirectory(executors/test) -add_subdirectory(uthread/test) +if(UTHREAD) + add_subdirectory(uthread/test) +endif() diff --git a/async_simple/Future.h b/async_simple/Future.h index 2ed1f3e1e..ea289928e 100644 --- a/async_simple/Future.h +++ b/async_simple/Future.h @@ -23,8 +23,6 @@ #include #include -#include - namespace async_simple { // The well-known Future/Promise pairs mimic a producer/consuerm pair. @@ -34,9 +32,7 @@ namespace async_simple { // could be able to appear in different thread. // // To get the value of Future synchronously, user should use `get()` -// method. In case that Uthread (stackful coroutine) is enabled, -// `get()` would checkout the current Uthread. Otherwise, the it -// would blocking the current thread by using condition variable. +// method. It would blocking the current thread by using condition variable. // // To get the value of Future asynchronously, user could use `thenValue(F)` // or `thenTry(F)`. See the seperate comments for details. @@ -98,40 +94,14 @@ class Future { Try& result() & { return getTry(*this); } const Try& result() const& { return getTry(*this); } - // Implementation for get() to wait asynchronously. - void await() { - logicAssert(valid(), "Future is broken"); - if (hasResult()) { - return; - } - assert(currentThreadInExecutor()); - - auto ctx = uthread::internal::thread_impl::get(); - _sharedState->checkout(); - _sharedState->setForceSched(); - _sharedState->setContinuation([ctx](Try&& t) mutable { - uthread::internal::thread_impl::switch_in(ctx); - }); - - do { - uthread::internal::thread_impl::switch_out(ctx); - assert(_sharedState->hasResult()); - } while (!_sharedState->hasResult()); - } - // get is only allowed on rvalue, aka, Future is not valid after get // invoked. // - // When the future doesn't have a value, if the future is in a Uthread, - // the Uhtread would be checked out until the future gets a value. And if - // the future is not in a Uthread, it would block the current thread until - // the future gets a value. + // Get value blocked thread when the future doesn't have a value. + // If future in uthread context, use await(future) to get value without + // thread blocked. T get() && { - if (uthread::internal::thread_impl::can_switch_out()) { - await(); - } else { - wait(); - } + wait(); return (std::move(*this)).value(); } // Implementation for get() to wait synchronously. diff --git a/async_simple/coro/test/CMakeLists.txt b/async_simple/coro/test/CMakeLists.txt index c33930218..209becc37 100644 --- a/async_simple/coro/test/CMakeLists.txt +++ b/async_simple/coro/test/CMakeLists.txt @@ -1,7 +1,7 @@ file(GLOB coro_test_src "*.cpp") add_executable(async_simple_coro_test ${coro_test_src} ${PROJECT_SOURCE_DIR}/async_simple/test/dotest.cpp) -target_link_libraries(async_simple_coro_test async_simple ${deplibs} ${testdeplibs}) +target_link_libraries(async_simple_coro_test ${deplibs} ${testdeplibs}) add_test(NAME run_async_simple_coro_test COMMAND async_simple_coro_test) diff --git a/async_simple/executors/test/CMakeLists.txt b/async_simple/executors/test/CMakeLists.txt index 9f3fe081e..e4eb02bfc 100644 --- a/async_simple/executors/test/CMakeLists.txt +++ b/async_simple/executors/test/CMakeLists.txt @@ -1,7 +1,7 @@ file(GLOB executor_test_src "*.cpp") add_executable(async_simple_executor_test ${executor_test_src} ${PROJECT_SOURCE_DIR}/async_simple/test/dotest.cpp) -target_link_libraries(async_simple_executor_test async_simple ${deplibs} ${testdeplibs}) +target_link_libraries(async_simple_executor_test ${deplibs} ${testdeplibs}) add_test(NAME run_async_simple_executor_test COMMAND async_simple_executor_test) diff --git a/async_simple/executors/test/SimpleIOExecutorTest.cpp b/async_simple/executors/test/SimpleIOExecutorTest.cpp index 3604d0521..973da681c 100644 --- a/async_simple/executors/test/SimpleIOExecutorTest.cpp +++ b/async_simple/executors/test/SimpleIOExecutorTest.cpp @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +#ifndef ASYNC_SIMPLE_HAS_NOT_AIO #include #include #include @@ -83,3 +84,4 @@ TEST_F(SimpleIOExecutorTest, testException) { } } // namespace async_simple +#endif diff --git a/async_simple/test/CMakeLists.txt b/async_simple/test/CMakeLists.txt index 03e48968e..f496c63a0 100644 --- a/async_simple/test/CMakeLists.txt +++ b/async_simple/test/CMakeLists.txt @@ -1,6 +1,6 @@ file(GLOB test_src "*.cpp") add_executable(async_simple_test ${test_src}) -target_link_libraries(async_simple_test async_simple ${deplibs} ${testdeplibs}) +target_link_libraries(async_simple_test ${deplibs} ${testdeplibs}) add_test(NAME run_async_simple_test COMMAND async_simple_test) diff --git a/async_simple/uthread/Await.h b/async_simple/uthread/Await.h index 16522fec9..0121ce27c 100644 --- a/async_simple/uthread/Await.h +++ b/async_simple/uthread/Await.h @@ -26,11 +26,41 @@ #include #include +#include #include namespace async_simple { namespace uthread { +// Use to async get future value in uthread context. +// Invoke await will not block current thread. +// The current uthread will be suspend until promise.setValue() be called. +template +T await(Future&& fut) { + logicAssert(fut.valid(), "Future is broken"); + if (fut.hasResult()) { + return fut.value(); + } + assert(currentThreadInExecutor()); + assert(fut.getExecutor()); + Promise p; + auto f = p.getFuture().via(fut.getExecutor()); + p.checkout(); + p.forceSched(); + + auto ctx = uthread::internal::thread_impl::get(); + f.setContinuation( + [ctx](auto&&) { uthread::internal::thread_impl::switch_in(ctx); }); + + std::move(fut).thenTry( + [p = std::move(p)](Try&& t) mutable { p.setValue(std::move(t)); }); + do { + uthread::internal::thread_impl::switch_out(ctx); + assert(f.hasResult()); + } while (!f.hasResult()); + return f.value(); +} + // This await interface focus on await non-static member function of an object. // Here is an example: // @@ -55,7 +85,7 @@ decltype(auto) await(Executor* ex, Fn B::*fn, C* cls, Ts&&... ts) { co_return; }; lazy(std::forward(ts)...).setEx(ex).start([](auto&&) {}); - return std::move(f).get(); + return await(std::move(f)); } // This await interface focus on await non-member functions. Here is the @@ -80,7 +110,7 @@ decltype(auto) await(Executor* ex, Fn&& fn, Ts&&... ts) { co_return; }; lazy(std::forward(ts)...).setEx(ex).start([](auto&&) {}); - return std::move(f).get(); + return await(std::move(f)); } // This await interface is special. It would accept the function who receive an @@ -100,10 +130,10 @@ T await(Executor* ex, Fn&& fn) { Promise p; auto f = p.getFuture().via(ex); fn(std::move(p)); - return std::move(f).get(); + return await(std::move(f)); } } // namespace uthread } // namespace async_simple -#endif // ASYNC_SIMPLE_UTHREAD_AWAIT_H \ No newline at end of file +#endif // ASYNC_SIMPLE_UTHREAD_AWAIT_H diff --git a/async_simple/uthread/Latch.h b/async_simple/uthread/Latch.h index 0cc9f55b2..1b11bf36f 100644 --- a/async_simple/uthread/Latch.h +++ b/async_simple/uthread/Latch.h @@ -60,7 +60,7 @@ class Latch { if (_skip) { return; } - _promise.getFuture().via(ex).get(); + uthread::await(_promise.getFuture().via(ex)); } std::size_t currentCount() const { return _count.load(std::memory_order_acquire); @@ -75,4 +75,4 @@ class Latch { } // namespace uthread } // namespace async_simple -#endif // ASYNC_SIMPLE_UTHREAD_LATCH_H \ No newline at end of file +#endif // ASYNC_SIMPLE_UTHREAD_LATCH_H diff --git a/async_simple/uthread/test/UthreadTest.cpp b/async_simple/uthread/test/UthreadTest.cpp index 45e895561..8fb6032e9 100644 --- a/async_simple/uthread/test/UthreadTest.cpp +++ b/async_simple/uthread/test/UthreadTest.cpp @@ -122,7 +122,7 @@ TEST_F(UthreadTest, testSwitch) { _executor.schedule([ex, &running, &show, &ioJob]() mutable { Uthread task1(Attribute{ex}, [&running, &show, &ioJob]() { show("task1 start"); - auto value = ioJob().get(); + auto value = await(ioJob()); EXPECT_EQ(1024, value); show("task1 done"); running--; @@ -178,7 +178,7 @@ TEST_F(UthreadTest, testScheduleInTwoThread) { ex->schedule([ex, &running, &show, &ioJob]() mutable { Uthread task(Attribute{ex}, [&running, &show, &ioJob]() { show("task start"); - auto value = ioJob().get(); + auto value = await(ioJob()); EXPECT_EQ(1024, value); show("task done"); running--; @@ -212,7 +212,7 @@ TEST_F(UthreadTest, testAsync) { async( [&running, &show, &ioJob]() { show("task1 start"); - auto value = ioJob().get(); + auto value = await(ioJob()); EXPECT_EQ(1024, value); show("task1 done"); running--; @@ -409,7 +409,7 @@ TEST_F(UthreadTest, testCollectAllSlow) { std::vector> fs; for (size_t i = 0; i < kMaxTask; ++i) { fs.emplace_back([i, &ioJob]() -> std::size_t { - return i + ioJob(kMaxTask - i).get(); + return i + await(ioJob(kMaxTask - i)); }); } @@ -448,7 +448,7 @@ TEST_F(UthreadTest, testCollectAllSlowSingleThread) { std::vector> fs; for (size_t i = 0; i < kMaxTask; ++i) { fs.emplace_back([i, &ioJob]() -> std::size_t { - return i + ioJob(kMaxTask - i).get(); + return i + await(ioJob(kMaxTask - i)); }); } diff --git a/async_simple/util/test/CMakeLists.txt b/async_simple/util/test/CMakeLists.txt index 91e36f5a9..a51d7d77a 100644 --- a/async_simple/util/test/CMakeLists.txt +++ b/async_simple/util/test/CMakeLists.txt @@ -1,6 +1,6 @@ file(GLOB util_test_src "*.cpp") add_executable(async_simple_util_test ${util_test_src} ${PROJECT_SOURCE_DIR}/async_simple/test/dotest.cpp) -target_link_libraries(async_simple_util_test async_simple ${deplibs} ${testdeplibs}) +target_link_libraries(async_simple_util_test ${deplibs} ${testdeplibs}) add_test(NAME run_async_simple_util_test COMMAND async_simple_util_test) diff --git a/demo_example/CMakeLists.txt b/demo_example/CMakeLists.txt index 7722b62d9..8d0e391ac 100644 --- a/demo_example/CMakeLists.txt +++ b/demo_example/CMakeLists.txt @@ -1,8 +1,12 @@ add_executable(CountChar CountChar.cpp) -target_link_libraries(CountChar async_simple) +target_link_libraries(CountChar) add_executable(ReadFiles ReadFiles.cpp) -target_link_libraries(ReadFiles async_simple) +if(LIBAIO_INCLUDE_DIR AND LIBAIO_LIBRARIES) + target_link_libraries(ReadFiles pthread aio) +else() + target_link_libraries(ReadFiles pthread) +endif() add_custom_command( TARGET ReadFiles POST_BUILD @@ -13,10 +17,10 @@ add_custom_command( include_directories(asio) add_executable(async_echo_server async_echo_server.cpp) -target_link_libraries(async_echo_server async_simple) +target_link_libraries(async_echo_server pthread) add_executable(async_echo_client async_echo_client.cpp) -target_link_libraries(async_echo_client async_simple) +target_link_libraries(async_echo_client pthread) add_executable(block_echo_server block_echo_server.cpp) target_link_libraries(block_echo_server pthread) @@ -25,13 +29,13 @@ add_executable(block_echo_client block_echo_client.cpp) target_link_libraries(block_echo_client pthread) add_executable(http_server http/coroutine_http/http_server.cpp) -target_link_libraries(http_server async_simple) +target_link_libraries(http_server pthread) add_executable(http_client http/coroutine_http/http_client.cpp) -target_link_libraries(http_client async_simple) +target_link_libraries(http_client pthread) add_executable(block_http_server http/block_http/block_http_server.cpp) -target_link_libraries(block_http_server async_simple) +target_link_libraries(block_http_server pthread) SET(ENABLE_SSL OFF) @@ -43,7 +47,7 @@ endif() add_executable(smtp_client smtp/smtp_client.cpp) if (ENABLE_SSL) - target_link_libraries(smtp_client stdc++fs async_simple OpenSSL::SSL) + target_link_libraries(smtp_client stdc++fs pthread OpenSSL::SSL) else() - target_link_libraries(smtp_client stdc++fs async_simple) + target_link_libraries(smtp_client stdc++fs pthread) endif()