Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
task: reimplement safe_late_task with future task (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
shengofsun authored May 17, 2018
1 parent 3dbda51 commit e9b1dde
Show file tree
Hide file tree
Showing 18 changed files with 271 additions and 249 deletions.
23 changes: 0 additions & 23 deletions include/dsn/cpp/clientlet.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,29 +130,6 @@ inline task_ptr enqueue_timer(task_code evt,
tsk->enqueue();
return tsk;
}

template <typename TCallback>
inline dsn::ref_ptr<dsn::safe_late_task<TCallback>> create_late_task(
dsn::task_code code, const TCallback &callback, int hash = 0, task_tracker *tracker = nullptr)
{
using result_task_type = safe_late_task<typename std::remove_cv<TCallback>::type>;
dsn::ref_ptr<result_task_type> ptr(
new result_task_type(code, std::move(callback), hash, nullptr));
ptr->set_tracker(tracker);
ptr->spec().on_task_create.execute(::dsn::task::get_current_task(), ptr);
return ptr;
}

template <typename TResponse>
void call_safe_late_task(const dsn::task_ptr &t, TResponse &&response)
{
typedef std::function<void(const TResponse &)> TCallback;
typedef dsn::safe_late_task<TCallback> task_type;
task_type *real_task = reinterpret_cast<task_type *>(t.get());
real_task->bind_and_enqueue([r = std::move(response)](TCallback & callback) {
return std::bind(callback, std::move(r));
});
}
}
/*@}*/

Expand Down
18 changes: 18 additions & 0 deletions include/dsn/dist/block_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ struct ls_response
ls_response() : entries(std::make_shared<std::vector<ls_entry>>()) {}
};
typedef std::function<void(const ls_response &)> ls_callback;
typedef future_task<ls_response> ls_future;
typedef dsn::ref_ptr<ls_future> ls_future_ptr;

/**
* @brief The create_file_request struct, used to create a block_file_ptr
Expand Down Expand Up @@ -83,6 +85,8 @@ struct create_file_response
block_file_ptr file_handle;
};
typedef std::function<void(const create_file_response &)> create_file_callback;
typedef future_task<create_file_response> create_file_future;
typedef dsn::ref_ptr<create_file_future> create_file_future_ptr;

/**
* @brief The delete_file_request struct, use to delete a file
Expand All @@ -106,6 +110,8 @@ struct delete_file_response
dsn::error_code err;
};
typedef std::function<void(const delete_file_response &)> delete_file_callback;
typedef future_task<delete_file_response> delete_file_future;
typedef dsn::ref_ptr<delete_file_future> delete_file_future_ptr;

/**
* @brief The exist_request struct
Expand All @@ -129,6 +135,8 @@ struct exist_response
dsn::error_code err;
};
typedef std::function<void(const exist_response &)> exist_callback;
typedef future_task<exist_response> exist_future;
typedef dsn::ref_ptr<exist_future> exist_future_ptr;

/**
* @brief The remove_path_request struct
Expand Down Expand Up @@ -157,6 +165,8 @@ struct remove_path_response
dsn::error_code err;
};
typedef std::function<void(const remove_path_response &)> remove_path_callback;
typedef future_task<remove_path_response> remove_path_future;
typedef dsn::ref_ptr<remove_path_future> remove_path_future_ptr;

/**
* @brief The read_request struct
Expand Down Expand Up @@ -191,6 +201,8 @@ struct read_response
dsn::blob buffer;
};
typedef std::function<void(const read_response &)> read_callback;
typedef future_task<read_response> read_future;
typedef dsn::ref_ptr<read_future> read_future_ptr;

/**
* @brief The write_request struct
Expand Down Expand Up @@ -219,6 +231,8 @@ struct write_response
uint64_t written_size;
};
typedef std::function<void(const write_response &)> write_callback;
typedef future_task<write_response> write_future;
typedef dsn::ref_ptr<write_future> write_future_ptr;

/**
* @brief The upload_request struct
Expand All @@ -242,6 +256,8 @@ struct upload_response
uint64_t uploaded_size;
};
typedef std::function<void(const upload_response &)> upload_callback;
typedef future_task<upload_response> upload_future;
typedef dsn::ref_ptr<upload_future> upload_future_ptr;

/**
* @brief The download_request struct
Expand All @@ -266,6 +282,8 @@ struct download_response
uint64_t downloaded_size;
};
typedef std::function<void(const download_response &)> download_callback;
typedef future_task<download_response> download_future;
typedef dsn::ref_ptr<download_future> download_future_ptr;

class block_filesystem
{
Expand Down
11 changes: 7 additions & 4 deletions include/dsn/dist/distributed_lock_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,19 @@

#include <dsn/service_api_cpp.h>
#include <dsn/utility/error_code.h>
#include <dsn/tool-api/future_types.h>
#include <string>
#include <functional>
#include <utility>

namespace dsn {
namespace dist {

typedef std::function<void(error_code ec, const std::string &owner_id, uint64_t version)>
lock_callback;
typedef future_task<error_code, std::string, uint64_t> lock_future;
typedef dsn::ref_ptr<lock_future> lock_future_ptr;

class distributed_lock_service
{
public:
Expand All @@ -61,10 +68,6 @@ class distributed_lock_service
typedef distributed_lock_service *(*factory)();

public:
typedef std::function<void(error_code ec)> err_callback;
typedef std::function<void(error_code ec, const std::string &owner_id, uint64_t version)>
lock_callback;

struct lock_options
{
bool create_if_not_exist;
Expand Down
15 changes: 10 additions & 5 deletions include/dsn/dist/meta_state_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,21 @@

#include <dsn/service_api_cpp.h>
#include <dsn/utility/error_code.h>
#include <dsn/tool-api/future_types.h>
#include <string>
#include <functional>

namespace dsn {
namespace dist {
typedef std::function<void(error_code ec, const blob &val)> err_value_callback;
typedef future_task<error_code, blob> err_value_future;
typedef dsn::ref_ptr<err_value_future> err_value_future_ptr;

typedef std::function<void(error_code ec, const std::vector<std::string> &ret_strv)>
err_stringv_callback;
typedef future_task<error_code, std::vector<std::string>> err_stringv_future;
typedef dsn::ref_ptr<err_stringv_future> err_stringv_future_ptr;

class meta_state_service
{
public:
Expand All @@ -62,11 +72,6 @@ class meta_state_service
typedef meta_state_service *(*factory)();

public:
typedef std::function<void(error_code ec, const blob &val)> err_value_callback;
typedef std::function<void(error_code ec, const std::vector<std::string> &ret_strv)>
err_stringv_callback;
typedef std::function<void(error_code ec)> err_callback;

/* providers should implement this to support transaction */
class transaction_entries
{
Expand Down
36 changes: 36 additions & 0 deletions include/dsn/tool-api/future_types.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#pragma once

#include <functional>
#include <dsn/utility/error_code.h>
#include <dsn/tool-api/task.h>

namespace dsn {
typedef std::function<void(dsn::error_code)> err_callback;
typedef future_task<dsn::error_code> error_code_future;
typedef dsn::ref_ptr<error_code_future> error_code_future_ptr;
}
62 changes: 36 additions & 26 deletions include/dsn/tool-api/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
#pragma once

#include <functional>
#include <tuple>
#include <dsn/utility/ports.h>
#include <dsn/utility/extensible_object.h>
#include <dsn/utility/callocator.h>
#include <dsn/utility/utils.h>
#include <dsn/utility/apply.h>
#include <dsn/utility/binary_writer.h>
#include <dsn/tool-api/task_spec.h>
#include <dsn/tool-api/task_tracker.h>
Expand Down Expand Up @@ -272,9 +274,10 @@ class task : public ref_counter, public extensible_object<task, 4>, public trans

virtual void exec() = 0;
//
// this function is used for clearing the callback assigned to this task.
// as the callback is usually a functor, circular reference may occur if
// we don't clear manually, for example:
// this function is used for clearing the non-trivial objects assigned to this task, like
// callback functors and some task-specific values.
//
// circular reference may occur if we don't clear them manually, for example:
//
// class A: public dsn::ref_counter {
// public:
Expand All @@ -287,18 +290,19 @@ class task : public ref_counter, public extensible_object<task, 4>, public trans
// [a_obj](){ std::cout << value << std::endl; });
//
// in the case above, a_obj holds a ref_counter for my_task,
// my task holds a ref_counter for a_obj because it owns a lambda
// my task holds a ref_counter for a_obj because it owns a functor
// which captures a_obj by value
//
// in order to prevent this case, we let the task to clear the calback functor when
// in order to prevent this case, we let the task to clear these non-trival objects when
// a task is finished or cancelled.
//
// we may call this function in "exec_internal" or "cancel". however, it's still subclass's
// duty to define "how to clear the callback".
//
// don't declare this as pure virtual function, coz it is not necessary for every subclass
// to have a callback to clear.
// to have non trivial objects to clear.
//
virtual void clear_callback() {}
virtual void clear_non_trivial_on_task_end() {}

bool _is_null;
error_code _error;
Expand Down Expand Up @@ -349,7 +353,7 @@ class raw_task : public task
}

protected:
void clear_callback() override { _cb = nullptr; }
void clear_non_trivial_on_task_end() override { _cb = nullptr; }

protected:
task_handler _cb;
Expand All @@ -376,7 +380,7 @@ class timer_task : public task
void enqueue() override;

protected:
void clear_callback() override { _cb = nullptr; }
void clear_non_trivial_on_task_end() override { _cb = nullptr; }

private:
// ATTENTION: if _interval_milliseconds <= 0, then timer task will just be executed once;
Expand All @@ -385,38 +389,44 @@ class timer_task : public task
task_handler _cb;
};

template <typename TCallback>
class safe_late_task : public raw_task
template <typename First, typename... Remaining>
class future_task : public task
{
public:
typedef std::function<task_handler(TCallback &)> currying;
safe_late_task(task_code code, const TCallback &cb, int hash = 0, service_node *node = nullptr)
: raw_task(code, nullptr, hash, node), _user_cb(cb)
typedef std::function<void(const First, const Remaining &...)> TCallback;
future_task(task_code code, const TCallback &cb, int hash, service_node *node = nullptr)
: task(code, hash, node), _cb(cb)
{
}
safe_late_task(task_code code, TCallback &&cb, int hash = 0, service_node *node = nullptr)
: raw_task(code, nullptr, hash, node), _user_cb(std::move(cb))
future_task(task_code code, TCallback &&cb, int hash, service_node *node = nullptr)
: task(code, hash, node), _cb(std::move(cb))
{
}
virtual void exec() override { dsn::apply(_cb, std::move(_values)); }

void bind_and_enqueue(const currying &c, int delay_ms = 0)
void enqueue_with(const First &t, const Remaining &... r, int delay_ms = 0)
{
if (dsn_likely(_user_cb != nullptr)) {
raw_task::_cb = c(_user_cb);
}
_values = std::make_tuple(t, r...);
set_delay(delay_ms);
enqueue();
}
void enqueue_with(First &&t, Remaining &&... r, int delay_ms = 0)
{
_values = std::make_tuple(std::move(t), std::forward<Remaining>(r)...);
set_delay(delay_ms);
enqueue();
}

protected:
void clear_callback() override
void clear_non_trivial_on_task_end() override
{
_cb = nullptr;
_user_cb = nullptr;
_values = {};
}

private:
TCallback _user_cb;
TCallback _cb;
std::tuple<First, Remaining...> _values;
};

class rpc_request_task : public task
Expand Down Expand Up @@ -446,7 +456,7 @@ class rpc_request_task : public task
}

protected:
void clear_callback() override { _handler = nullptr; }
void clear_non_trivial_on_task_end() override { _handler = nullptr; }

protected:
message_ex *_request;
Expand Down Expand Up @@ -521,7 +531,7 @@ class rpc_response_task : public task
void set_caller_pool(task_worker_pool *pl) { _caller_pool = pl; }

protected:
void clear_callback() override { _cb = nullptr; }
void clear_non_trivial_on_task_end() override { _cb = nullptr; }

private:
message_ex *_request;
Expand Down Expand Up @@ -614,7 +624,7 @@ class aio_task : public task
blob _merged_write_buffer_holder;

protected:
void clear_callback() override { _cb = nullptr; }
void clear_non_trivial_on_task_end() override { _cb = nullptr; }

protected:
disk_aio *_aio;
Expand Down
6 changes: 3 additions & 3 deletions src/core/core/task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ void task::exec_internal()
std::memory_order_release,
std::memory_order_relaxed)) {
_spec->on_task_end.execute(this);
clear_callback();
clear_non_trivial_on_task_end();
} else {
if (!_wait_for_cancel) {
// for retried tasks such as timer or rpc_response_task
Expand All @@ -219,7 +219,7 @@ void task::exec_internal()

// for timer task, we must call reset_callback after cancelled, because we don't
// reset callback after exec()
clear_callback();
clear_non_trivial_on_task_end();
}
}

Expand Down Expand Up @@ -343,7 +343,7 @@ bool task::cancel(bool wait_until_finished, /*out*/ bool *finished /*= nullptr*/
// we call clear_callback only cancelling succeed.
// otherwise, task will successfully exececuted and clear_callback will be called
// in "exec_internal".
clear_callback();
clear_non_trivial_on_task_end();
}

if (finished)
Expand Down
Loading

0 comments on commit e9b1dde

Please sign in to comment.