Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid some unnecessary copies in generic actors #3024

Merged
merged 1 commit into from Apr 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion flow/genericactors.actor.cpp
Expand Up @@ -69,6 +69,8 @@ ACTOR Future<Void> timeoutWarningCollector( FutureStream<Void> input, double log
ACTOR Future<bool> quorumEqualsTrue( std::vector<Future<bool>> futures, int required ) {
state std::vector< Future<Void> > true_futures;
state std::vector< Future<Void> > false_futures;
true_futures.reserve(futures.size());
false_futures.reserve(futures.size());
for(int i=0; i<futures.size(); i++) {
true_futures.push_back( onEqual( futures[i], true ) );
false_futures.push_back( onEqual( futures[i], false ) );
Expand All @@ -87,6 +89,7 @@ ACTOR Future<bool> quorumEqualsTrue( std::vector<Future<bool>> futures, int requ
ACTOR Future<bool> shortCircuitAny( std::vector<Future<bool>> f )
{
std::vector<Future<Void>> sc;
sc.reserve(f.size());
for(Future<bool> fut : f) {
sc.push_back(returnIfTrue(fut));
}
Expand All @@ -96,7 +99,7 @@ ACTOR Future<bool> shortCircuitAny( std::vector<Future<bool>> f )
// Handle a possible race condition? If the _last_ term to
// be evaluated triggers the waitForAll before bubbling
// out of the returnIfTrue quorum
for ( auto fut : f ) {
for (const auto& fut : f) {
if ( fut.get() ) {
return true;
}
Expand Down
25 changes: 15 additions & 10 deletions flow/genericactors.actor.h
Expand Up @@ -312,8 +312,8 @@ template<class T, class F>
std::vector<Future<std::invoke_result_t<F, T>>> mapAsync(std::vector<Future<T>> const& what, F const& actorFunc)
{
std::vector<std::invoke_result_t<F, T>> ret;
for(auto f : what)
ret.push_back(mapAsync( f, actorFunc ));
ret.reserve(what.size());
for (const auto& f : what) ret.push_back(mapAsync(f, actorFunc));
return ret;
}

Expand Down Expand Up @@ -371,8 +371,8 @@ template<class T, class F>
std::vector<Future<std::invoke_result_t<F, T>>> map(std::vector<Future<T>> const& what, F const& func)
{
std::vector<Future<std::invoke_result_t<F, T>>> ret;
for(auto f : what)
ret.push_back(map( f, func ));
ret.reserve(what.size());
for (const auto& f : what) ret.push_back(map(f, func));
return ret;
}

Expand Down Expand Up @@ -585,6 +585,7 @@ class AsyncMap : NonCopyable {
}
std::vector<K> getKeys() {
std::vector<K> keys;
keys.reserve(items.size());
for(auto i = items.begin(); i != items.end(); ++i)
keys.push_back( i->first );
return keys;
Expand Down Expand Up @@ -887,6 +888,7 @@ Future<Void> streamHelper( PromiseStream<T> output, PromiseStream<Error> errors,
template <class T>
Future<Void> makeStream( const std::vector<Future<T>>& futures, PromiseStream<T>& stream, PromiseStream<Error>& errors ) {
std::vector<Future<Void>> forwarders;
forwarders.reserve(futures.size());
for(int f=0; f<futures.size(); f++)
forwarders.push_back( streamHelper( stream, errors, futures[f] ) );
return cancelOnly(forwarders);
Expand Down Expand Up @@ -1016,6 +1018,7 @@ Future<std::vector<T>> getAll( std::vector<Future<T>> input ) {
wait( quorum( input, input.size() ) );

std::vector<T> output;
output.reserve(input.size());
for(int i=0; i<input.size(); i++)
output.push_back( input[i].get() );
return output;
Expand All @@ -1026,6 +1029,12 @@ Future<std::vector<T>> appendAll( std::vector<Future<std::vector<T>>> input ) {
wait( quorum( input, input.size() ) );

std::vector<T> output;
size_t sz = 0;
for (const auto& f : input) {
sz += f.get().size();
}
output.reserve(sz);

for(int i=0; i<input.size(); i++) {
auto const& r = input[i].get();
output.insert( output.end(), r.begin(), r.end() );
Expand Down Expand Up @@ -1167,10 +1176,7 @@ inline Future<Void> operator &&( Future<Void> const& lhs, Future<Void> const& rh
else return lhs;
}

std::vector<Future<Void>> v;
v.push_back( lhs );
v.push_back( rhs );
return waitForAll(v);
return waitForAll(std::vector<Future<Void>>{ lhs, rhs });
}

// error || unset -> error
Expand Down Expand Up @@ -1626,8 +1632,7 @@ class AndFuture {
return futures[0];

Future<Void> f = waitForAll(futures);
futures = std::vector<Future<Void>>();
futures.push_back(f);
futures = std::vector<Future<Void>>{ f };
return f;
}

Expand Down