Skip to content

Commit

Permalink
switch-off uthread on not supported platform (#66)
Browse files Browse the repository at this point in the history
Signed-off-by: Rain Mark <rain.by.zhou@gmail.com>
  • Loading branch information
RainMark committed Mar 23, 2022
1 parent 0144f11 commit b7b5e1b
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 83 deletions.
60 changes: 36 additions & 24 deletions async_simple/CMakeLists.txt
@@ -1,53 +1,65 @@
file(GLOB coro_src "coro/*.cpp")
file(GLOB executors_src "executors/*.cpp")
file(GLOB uthread_src "uthread/internal/*.cc")

EXECUTE_PROCESS(COMMAND uname -m COMMAND tr -d '\n' OUTPUT_VARIABLE ARCHITECTURE)
if ("${ARCHITECTURE}" STREQUAL "aarch64")
if("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux") # uname -s
file(GLOB uthread_src "uthread/internal/*.cc")
if ("${CMAKE_SYSTEM_PROCESSOR}" STREQUAL "aarch64") # uname -p
file(GLOB uthread_asm_src "uthread/internal/*arm64_aapcs_elf*")
elseif ("${ARCHITECTURE}" STREQUAL "x86_64")
set(UTHREAD ON)
elseif ("${CMAKE_SYSTEM_PROCESSOR}" STREQUAL "x86_64")
file(GLOB uthread_asm_src "uthread/internal/*x86_64_sysv_elf*")
else()
message(FATAL_ERROR "Unsupported Target: ${ARCHITECTURE}")
set(UTHREAD ON)
else()
message(STATUS "Uthread Unsupported Target: ${CMAKE_SYSTEM_PROCESSOR}")
endif()
endif()

file(GLOB headers "*.h")
file(GLOB coro_header "coro/*.h")
file(GLOB executors_header "executors/*.h")
file(GLOB experimental_header "experimental/*.h")
file(GLOB util_header "util/*.h")
file(GLOB uthread_header "uthread/*.h")
file(GLOB uthread_internal_header "uthread/internal/*.h")
if(UTHREAD)
file(GLOB uthread_header "uthread/*.h")
file(GLOB uthread_internal_header "uthread/internal/*.h")
endif()

message(STATUS "uthread_asm_src: ${uthread_asm_src}")

set(SRCS
${coro_src}
${executors_src}
${uthread_asm_src}
${uthread_src}
)
${coro_src}
${executors_src}
)
if(UTHREAD)
list(APPEND SRCS ${uthread_src})
list(APPEND SRCS ${uthread_asm_src})
endif()

add_library(async_simple_static STATIC ${SRCS})
add_library(async_simple SHARED ${SRCS})
target_link_libraries(async_simple ${deplibs})
target_link_libraries(async_simple_static ${deplibs})
if(SRCS)
add_library(async_simple_static STATIC ${SRCS})
add_library(async_simple SHARED ${SRCS})
target_link_libraries(async_simple ${deplibs})
target_link_libraries(async_simple_static ${deplibs})

set_target_properties(async_simple_static PROPERTIES OUTPUT_NAME "async_simple")
set_target_properties(async_simple_static PROPERTIES OUTPUT_NAME "async_simple")

install(TARGETS async_simple DESTINATION lib/)
install(TARGETS async_simple_static DESTINATION lib/)
install(TARGETS async_simple DESTINATION lib/)
install(TARGETS async_simple_static DESTINATION lib/)
endif()

install(FILES ${headers} DESTINATION include/async_simple)
install(FILES ${coro_header} DESTINATION include/async_simple/coro)
install(FILES ${executors_header} DESTINATION include/async_simple/executors)
install(FILES ${experimental_header} DESTINATION include/async_simple/experimental)
install(FILES ${util_header} DESTINATION include/async_simple/util)
install(FILES ${uthread_header} DESTINATION include/async_simple/uthread)
install(FILES ${uthread_internal_header} DESTINATION include/async_simple/uthread/internal)
if(UTHREAD)
install(FILES ${uthread_header} DESTINATION include/async_simple/uthread)
install(FILES ${uthread_internal_header} DESTINATION include/async_simple/uthread/internal)
endif()

add_subdirectory(test)
add_subdirectory(util/test)
add_subdirectory(coro/test)
add_subdirectory(executors/test)
add_subdirectory(uthread/test)
if(UTHREAD)
add_subdirectory(uthread/test)
endif()
40 changes: 5 additions & 35 deletions async_simple/Future.h
Expand Up @@ -23,8 +23,6 @@
#include <async_simple/Traits.h>
#include <type_traits>

#include <async_simple/uthread/internal/thread_impl.h>

namespace async_simple {

// The well-known Future/Promise pairs mimic a producer/consuerm pair.
Expand All @@ -34,9 +32,7 @@ namespace async_simple {
// could be able to appear in different thread.
//
// To get the value of Future synchronously, user should use `get()`
// method. In case that Uthread (stackful coroutine) is enabled,
// `get()` would checkout the current Uthread. Otherwise, the it
// would blocking the current thread by using condition variable.
// method. It would blocking the current thread by using condition variable.
//
// To get the value of Future asynchronously, user could use `thenValue(F)`
// or `thenTry(F)`. See the seperate comments for details.
Expand Down Expand Up @@ -98,40 +94,14 @@ class Future {
Try<T>& result() & { return getTry(*this); }
const Try<T>& result() const& { return getTry(*this); }

// Implementation for get() to wait asynchronously.
void await() {
logicAssert(valid(), "Future is broken");
if (hasResult()) {
return;
}
assert(currentThreadInExecutor());

auto ctx = uthread::internal::thread_impl::get();
_sharedState->checkout();
_sharedState->setForceSched();
_sharedState->setContinuation([ctx](Try<T>&& t) mutable {
uthread::internal::thread_impl::switch_in(ctx);
});

do {
uthread::internal::thread_impl::switch_out(ctx);
assert(_sharedState->hasResult());
} while (!_sharedState->hasResult());
}

// get is only allowed on rvalue, aka, Future is not valid after get
// invoked.
//
// When the future doesn't have a value, if the future is in a Uthread,
// the Uhtread would be checked out until the future gets a value. And if
// the future is not in a Uthread, it would block the current thread until
// the future gets a value.
// Get value blocked thread when the future doesn't have a value.
// If future in uthread context, use await(future) to get value without
// thread blocked.
T get() && {
if (uthread::internal::thread_impl::can_switch_out()) {
await();
} else {
wait();
}
wait();
return (std::move(*this)).value();
}
// Implementation for get() to wait synchronously.
Expand Down
2 changes: 1 addition & 1 deletion async_simple/coro/test/CMakeLists.txt
@@ -1,7 +1,7 @@
file(GLOB coro_test_src "*.cpp")
add_executable(async_simple_coro_test ${coro_test_src} ${PROJECT_SOURCE_DIR}/async_simple/test/dotest.cpp)

target_link_libraries(async_simple_coro_test async_simple ${deplibs} ${testdeplibs})
target_link_libraries(async_simple_coro_test ${deplibs} ${testdeplibs})

add_test(NAME run_async_simple_coro_test COMMAND async_simple_coro_test)

2 changes: 1 addition & 1 deletion async_simple/executors/test/CMakeLists.txt
@@ -1,7 +1,7 @@
file(GLOB executor_test_src "*.cpp")
add_executable(async_simple_executor_test ${executor_test_src} ${PROJECT_SOURCE_DIR}/async_simple/test/dotest.cpp)

target_link_libraries(async_simple_executor_test async_simple ${deplibs} ${testdeplibs})
target_link_libraries(async_simple_executor_test ${deplibs} ${testdeplibs})

add_test(NAME run_async_simple_executor_test COMMAND async_simple_executor_test)

2 changes: 2 additions & 0 deletions async_simple/executors/test/SimpleIOExecutorTest.cpp
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ASYNC_SIMPLE_HAS_NOT_AIO
#include <fcntl.h>
#include <gtest/gtest.h>
#include <malloc.h>
Expand Down Expand Up @@ -83,3 +84,4 @@ TEST_F(SimpleIOExecutorTest, testException) {
}

} // namespace async_simple
#endif
2 changes: 1 addition & 1 deletion async_simple/test/CMakeLists.txt
@@ -1,6 +1,6 @@
file(GLOB test_src "*.cpp")
add_executable(async_simple_test ${test_src})

target_link_libraries(async_simple_test async_simple ${deplibs} ${testdeplibs})
target_link_libraries(async_simple_test ${deplibs} ${testdeplibs})

add_test(NAME run_async_simple_test COMMAND async_simple_test)
38 changes: 34 additions & 4 deletions async_simple/uthread/Await.h
Expand Up @@ -26,11 +26,41 @@

#include <async_simple/Future.h>
#include <async_simple/coro/Lazy.h>
#include <async_simple/uthread/internal/thread_impl.h>
#include <type_traits>

namespace async_simple {
namespace uthread {

// Use to async get future value in uthread context.
// Invoke await will not block current thread.
// The current uthread will be suspend until promise.setValue() be called.
template <class T>
T await(Future<T>&& fut) {
logicAssert(fut.valid(), "Future is broken");
if (fut.hasResult()) {
return fut.value();
}
assert(currentThreadInExecutor());
assert(fut.getExecutor());
Promise<T> p;
auto f = p.getFuture().via(fut.getExecutor());
p.checkout();
p.forceSched();

auto ctx = uthread::internal::thread_impl::get();
f.setContinuation(
[ctx](auto&&) { uthread::internal::thread_impl::switch_in(ctx); });

std::move(fut).thenTry(
[p = std::move(p)](Try<T>&& t) mutable { p.setValue(std::move(t)); });
do {
uthread::internal::thread_impl::switch_out(ctx);
assert(f.hasResult());
} while (!f.hasResult());
return f.value();
}

// This await interface focus on await non-static member function of an object.
// Here is an example:
//
Expand All @@ -55,7 +85,7 @@ decltype(auto) await(Executor* ex, Fn B::*fn, C* cls, Ts&&... ts) {
co_return;
};
lazy(std::forward<Ts&&>(ts)...).setEx(ex).start([](auto&&) {});
return std::move(f).get();
return await(std::move(f));
}

// This await interface focus on await non-member functions. Here is the
Expand All @@ -80,7 +110,7 @@ decltype(auto) await(Executor* ex, Fn&& fn, Ts&&... ts) {
co_return;
};
lazy(std::forward<Ts&&>(ts)...).setEx(ex).start([](auto&&) {});
return std::move(f).get();
return await(std::move(f));
}

// This await interface is special. It would accept the function who receive an
Expand All @@ -100,10 +130,10 @@ T await(Executor* ex, Fn&& fn) {
Promise<T> p;
auto f = p.getFuture().via(ex);
fn(std::move(p));
return std::move(f).get();
return await(std::move(f));
}

} // namespace uthread
} // namespace async_simple

#endif // ASYNC_SIMPLE_UTHREAD_AWAIT_H
#endif // ASYNC_SIMPLE_UTHREAD_AWAIT_H
4 changes: 2 additions & 2 deletions async_simple/uthread/Latch.h
Expand Up @@ -60,7 +60,7 @@ class Latch {
if (_skip) {
return;
}
_promise.getFuture().via(ex).get();
uthread::await(_promise.getFuture().via(ex));
}
std::size_t currentCount() const {
return _count.load(std::memory_order_acquire);
Expand All @@ -75,4 +75,4 @@ class Latch {
} // namespace uthread
} // namespace async_simple

#endif // ASYNC_SIMPLE_UTHREAD_LATCH_H
#endif // ASYNC_SIMPLE_UTHREAD_LATCH_H
10 changes: 5 additions & 5 deletions async_simple/uthread/test/UthreadTest.cpp
Expand Up @@ -122,7 +122,7 @@ TEST_F(UthreadTest, testSwitch) {
_executor.schedule([ex, &running, &show, &ioJob]() mutable {
Uthread task1(Attribute{ex}, [&running, &show, &ioJob]() {
show("task1 start");
auto value = ioJob().get();
auto value = await(ioJob());
EXPECT_EQ(1024, value);
show("task1 done");
running--;
Expand Down Expand Up @@ -178,7 +178,7 @@ TEST_F(UthreadTest, testScheduleInTwoThread) {
ex->schedule([ex, &running, &show, &ioJob]() mutable {
Uthread task(Attribute{ex}, [&running, &show, &ioJob]() {
show("task start");
auto value = ioJob().get();
auto value = await(ioJob());
EXPECT_EQ(1024, value);
show("task done");
running--;
Expand Down Expand Up @@ -212,7 +212,7 @@ TEST_F(UthreadTest, testAsync) {
async<Launch::Schedule>(
[&running, &show, &ioJob]() {
show("task1 start");
auto value = ioJob().get();
auto value = await(ioJob());
EXPECT_EQ(1024, value);
show("task1 done");
running--;
Expand Down Expand Up @@ -409,7 +409,7 @@ TEST_F(UthreadTest, testCollectAllSlow) {
std::vector<std::function<std::size_t()>> fs;
for (size_t i = 0; i < kMaxTask; ++i) {
fs.emplace_back([i, &ioJob]() -> std::size_t {
return i + ioJob(kMaxTask - i).get();
return i + await(ioJob(kMaxTask - i));
});
}

Expand Down Expand Up @@ -448,7 +448,7 @@ TEST_F(UthreadTest, testCollectAllSlowSingleThread) {
std::vector<std::function<std::size_t()>> fs;
for (size_t i = 0; i < kMaxTask; ++i) {
fs.emplace_back([i, &ioJob]() -> std::size_t {
return i + ioJob(kMaxTask - i).get();
return i + await(ioJob(kMaxTask - i));
});
}

Expand Down
2 changes: 1 addition & 1 deletion async_simple/util/test/CMakeLists.txt
@@ -1,6 +1,6 @@
file(GLOB util_test_src "*.cpp")
add_executable(async_simple_util_test ${util_test_src} ${PROJECT_SOURCE_DIR}/async_simple/test/dotest.cpp)

target_link_libraries(async_simple_util_test async_simple ${deplibs} ${testdeplibs})
target_link_libraries(async_simple_util_test ${deplibs} ${testdeplibs})

add_test(NAME run_async_simple_util_test COMMAND async_simple_util_test)
22 changes: 13 additions & 9 deletions demo_example/CMakeLists.txt
@@ -1,8 +1,12 @@
add_executable(CountChar CountChar.cpp)
target_link_libraries(CountChar async_simple)
target_link_libraries(CountChar)

add_executable(ReadFiles ReadFiles.cpp)
target_link_libraries(ReadFiles async_simple)
if(LIBAIO_INCLUDE_DIR AND LIBAIO_LIBRARIES)
target_link_libraries(ReadFiles pthread aio)
else()
target_link_libraries(ReadFiles pthread)
endif()

add_custom_command(
TARGET ReadFiles POST_BUILD
Expand All @@ -13,10 +17,10 @@ add_custom_command(
include_directories(asio)

add_executable(async_echo_server async_echo_server.cpp)
target_link_libraries(async_echo_server async_simple)
target_link_libraries(async_echo_server pthread)

add_executable(async_echo_client async_echo_client.cpp)
target_link_libraries(async_echo_client async_simple)
target_link_libraries(async_echo_client pthread)

add_executable(block_echo_server block_echo_server.cpp)
target_link_libraries(block_echo_server pthread)
Expand All @@ -25,13 +29,13 @@ add_executable(block_echo_client block_echo_client.cpp)
target_link_libraries(block_echo_client pthread)

add_executable(http_server http/coroutine_http/http_server.cpp)
target_link_libraries(http_server async_simple)
target_link_libraries(http_server pthread)

add_executable(http_client http/coroutine_http/http_client.cpp)
target_link_libraries(http_client async_simple)
target_link_libraries(http_client pthread)

add_executable(block_http_server http/block_http/block_http_server.cpp)
target_link_libraries(block_http_server async_simple)
target_link_libraries(block_http_server pthread)

SET(ENABLE_SSL OFF)

Expand All @@ -43,7 +47,7 @@ endif()

add_executable(smtp_client smtp/smtp_client.cpp)
if (ENABLE_SSL)
target_link_libraries(smtp_client stdc++fs async_simple OpenSSL::SSL)
target_link_libraries(smtp_client stdc++fs pthread OpenSSL::SSL)
else()
target_link_libraries(smtp_client stdc++fs async_simple)
target_link_libraries(smtp_client stdc++fs pthread)
endif()

1 comment on commit b7b5e1b

@ChuanqiXu9
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

It looks like this one hit an infinite loop.

Please sign in to comment.