/
AddTasks.h
135 lines (113 loc) · 3.63 KB
/
AddTasks.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <functional>
#include <vector>
#include <folly/Optional.h>
#include <folly/Try.h>
#include <folly/fibers/FiberManagerInternal.h>
#include <folly/fibers/Promise.h>
namespace folly {
namespace fibers {
template <typename T>
class TaskIterator;
/**
* Schedules several tasks and immediately returns an iterator, that
* allow one to traverse tasks in the order of their completion. All results
* and exceptions thrown are stored alongside with the task id and are
* accessible via iterator.
*
* @param first Range of tasks to be scheduled
* @param last
*
* @return movable, non-copyable iterator
*/
template <class InputIterator>
TaskIterator<invoke_result_t<
typename std::iterator_traits<InputIterator>::
value_type>> inline addTasks(InputIterator first, InputIterator last);
template <typename T>
class TaskIterator {
public:
typedef T value_type;
TaskIterator() : fm_(FiberManager::getFiberManager()) {}
// not copyable
TaskIterator(const TaskIterator& other) = delete;
TaskIterator& operator=(const TaskIterator& other) = delete;
// movable
TaskIterator(TaskIterator&& other) noexcept;
TaskIterator& operator=(TaskIterator&& other) = delete;
/**
* Add one more task to the TaskIterator.
*
* @param func task to be added, will be scheduled on current FiberManager
*/
template <typename F>
void addTask(F&& func);
/**
* @return True if there are tasks immediately available to be consumed (no
* need to await on them).
*/
bool hasCompleted() const;
/**
* @return True if there are tasks pending execution (need to awaited on).
*/
bool hasPending() const;
/**
* @return True if there are any tasks (hasCompleted() || hasPending()).
*/
bool hasNext() const;
/**
* Await for another task to complete. Will not await if the result is
* already available.
*
* @return result of the task completed.
* @throw exception thrown by the task.
*/
T awaitNext();
/**
* Await until the specified number of tasks completes or there are no
* tasks left to await for.
* Note: Will not await if there are already the specified number of tasks
* available.
*
* @param n Number of tasks to await for completition.
*/
void reserve(size_t n);
/**
* @return id of the last task that was processed by awaitNext().
*/
size_t getTaskID() const;
private:
template <class InputIterator>
friend TaskIterator<
invoke_result_t<typename std::iterator_traits<InputIterator>::value_type>>
addTasks(InputIterator first, InputIterator last);
struct Context {
std::vector<std::pair<size_t, folly::Try<T>>> results;
folly::Optional<Promise<void>> promise;
size_t totalTasks{0};
size_t tasksConsumed{0};
size_t tasksToFulfillPromise{0};
};
std::shared_ptr<Context> context_{std::make_shared<Context>()};
size_t id_{std::numeric_limits<size_t>::max()};
FiberManager& fm_;
folly::Try<T> awaitNextResult();
};
} // namespace fibers
} // namespace folly
#include <folly/fibers/AddTasks-inl.h>