Skip to content

Commit

Permalink
Fix issues found when creating integration tests (#951)
Browse files Browse the repository at this point in the history
The resulting PR was too large, so splitting these fixes out from the full integration PR:
1. FakeUserDefinedOutputPlugin is now thread safe.
2. AggregateGlobalOutput takes in UserDefinedOutput so that it can account for errors
  - Previously there was no way for the global aggregate to know about missing outputs which could have created confusion for users
3. FakeUserDefinedOutput - handleResponseData now ignores empty data responses
  - We uncovered an issue where handleData gets called twice per request, once with empty data - more investigation required in #950 
4. fake_user_defined_output plugin can no longer be test only because of a dependency chain that leads to non-test code, such as benchmarks and dynamic_config. It is too complex to break this dependency chain at the moment.

Signed-off-by: Nathan Perry <nbperry@google.com>
  • Loading branch information
dubious90 committed Dec 2, 2022
1 parent 201b828 commit a8cf502
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 56 deletions.
1 change: 1 addition & 0 deletions include/nighthawk/user_defined_output/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ envoy_basic_cc_library(
],
include_prefix = "nighthawk/user_defined_output",
deps = [
"//api/client:base_cc_proto",
"@envoy//envoy/buffer:buffer_interface",
"@envoy//envoy/common:base_includes",
"@envoy//source/common/common:statusor_lib_with_external_headers",
Expand Down
23 changes: 13 additions & 10 deletions include/nighthawk/user_defined_output/user_defined_output_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
#include "external/envoy/source/common/http/header_map_impl.h"
#include "external/envoy_api/envoy/config/core/v3/extension.pb.h"

#include "api/client/output.pb.h"

namespace Nighthawk {

// Information about a Nighthawk worker thread. May expand to contain more fields over time as
Expand Down Expand Up @@ -77,9 +79,8 @@ class UserDefinedOutputPlugin {
* processes.
*
* Plugins should return statuses for invalid data or when they fail to process the data. Any
* non-ok status will be logged and increment a counter that will be added to the worker Result.
* Callers can also provide a failure predicate for this counter that will abort the request
* after n plugin failures.
* non-ok status will be logged and included as a UserDefinedOutput with an error_message instead
* of a typed_output. Standard nighthawk processing will be unaffected.
*
* @return output Any-packed per_worker output to add to the worker's Result.
*/
Expand Down Expand Up @@ -117,12 +118,14 @@ class UserDefinedOutputPluginFactory : public Envoy::Config::TypedFactory {
* Aggregates the outputs from every worker's UserDefinedOutputPlugin instance into a global
* output, representing the cumulative data across all of the plugins combined.
*
* The protobuf type of the inputs and output must all be the same type.
* If a plugin returned an error when generating its per-worker output, it will still be included
* in per_worker_outputs as a UserDefinedOutput with an error message. It is up to the plugin
* author the correct thing to do on aggregation if one or more of the per worker outputs
* contains errors.
*
* This method should return statuses for invalid data or when they fail to process the data. Any
* non-ok status will be logged and increment a counter that will be added to the worker Result.
* Callers can also provide a failure predicate for this counter that will abort the request
* after n plugin failures.
* non-ok status will be logged and included as a UserDefinedOutput with an error_message instead
* of a typed_output. Standard nighthawk processing will be unaffected.
*
* Pseudocode Example:
* AggregateGlobalOutput(
Expand All @@ -132,12 +135,12 @@ class UserDefinedOutputPluginFactory : public Envoy::Config::TypedFactory {
* {int_value: 3, array_value: ["a","b","c"]}
*
* @param per_worker_outputs List of the outputs that every per-worker instance of the User
* Defined Output Plugin created.
* Defined Output Plugin created, including errors in generating that output.
* @return global_output Any-packed aggregated output to add to the global Result.
*/
virtual absl::StatusOr<Envoy::ProtobufWkt::Any>
AggregateGlobalOutput(absl::Span<const Envoy::ProtobufWkt::Any> per_worker_outputs) PURE;
virtual absl::StatusOr<Envoy::ProtobufWkt::Any> AggregateGlobalOutput(
absl::Span<const nighthawk::client::UserDefinedOutput> per_worker_outputs) PURE;
};

using UserDefinedOutputConfigFactoryPair =
Expand Down
4 changes: 2 additions & 2 deletions source/user_defined_output/log_response_headers_plugin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ LogResponseHeadersPluginFactory::createUserDefinedOutputPlugin(
return std::make_unique<LogResponseHeadersPlugin>(config, worker_metadata);
}

absl::StatusOr<Envoy::ProtobufWkt::Any>
LogResponseHeadersPluginFactory::AggregateGlobalOutput(absl::Span<const Envoy::ProtobufWkt::Any>) {
absl::StatusOr<Envoy::ProtobufWkt::Any> LogResponseHeadersPluginFactory::AggregateGlobalOutput(
absl::Span<const nighthawk::client::UserDefinedOutput>) {
return createEmptyOutput();
}

Expand Down
4 changes: 2 additions & 2 deletions source/user_defined_output/log_response_headers_plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ class LogResponseHeadersPluginFactory : public UserDefinedOutputPluginFactory {
createUserDefinedOutputPlugin(const Envoy::ProtobufWkt::Any& config_any,
const WorkerMetadata& worker_metadata) override;

absl::StatusOr<Envoy::ProtobufWkt::Any>
AggregateGlobalOutput(absl::Span<const Envoy::ProtobufWkt::Any> per_worker_outputs) override;
absl::StatusOr<Envoy::ProtobufWkt::Any> AggregateGlobalOutput(
absl::Span<const nighthawk::client::UserDefinedOutput> per_worker_outputs) override;
};

DECLARE_FACTORY(LogResponseHeadersPluginFactory);
Expand Down
3 changes: 3 additions & 0 deletions test/benchmark_http_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ TEST_F(BenchmarkClientHttpTest, IncrementsCounterWhenUserDefinedPluginHandleHead
TEST_F(BenchmarkClientHttpTest, CallsUserDefinedPluginHandleData) {
RequestGenerator default_request_generator = getDefaultRequestGenerator();
Envoy::MockBuffer buffer;
buffer.add("notempty");
UserDefinedOutputPluginPtr plugin = CreateTestUserDefinedOutputPlugin(R"(
name: "nighthawk.fake_user_defined_output",
typed_config {
Expand All @@ -557,6 +558,7 @@ TEST_F(BenchmarkClientHttpTest, CallsUserDefinedPluginHandleData) {
TEST_F(BenchmarkClientHttpTest, IncrementsCounterWhenUserDefinedPluginHandleDataFails) {
RequestGenerator default_request_generator = getDefaultRequestGenerator();
Envoy::MockBuffer buffer;
buffer.add("notempty");
UserDefinedOutputPluginPtr plugin = CreateTestUserDefinedOutputPlugin(R"(
name: "nighthawk.fake_user_defined_output",
typed_config {
Expand Down Expand Up @@ -589,6 +591,7 @@ TEST_F(BenchmarkClientHttpTest, GetUserDefinedOutputResultsReturnsResults) {
{"test_header_name", "test_header_value"},
});
Envoy::MockBuffer buffer;
buffer.add("notempty");
UserDefinedOutputPluginPtr plugin = CreateTestUserDefinedOutputPlugin(R"(
name: "nighthawk.fake_user_defined_output",
typed_config {
Expand Down
2 changes: 1 addition & 1 deletion test/mocks/common/mock_termination_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ class MockTerminationPredicate : public TerminationPredicate {
MOCK_METHOD(TerminationPredicate::Status, evaluate, (), (override));
};

} // namespace Nighthawk
} // namespace Nighthawk
6 changes: 4 additions & 2 deletions test/user_defined_output/fake_plugin/BUILD
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
load("@envoy_api//bazel:api_build_system.bzl", "api_cc_py_proto_library")
load(
"@envoy//bazel:envoy_build_system.bzl",
"envoy_cc_library",
"envoy_cc_test",
"envoy_cc_test_library",
"envoy_package",
)

Expand All @@ -22,7 +22,7 @@ api_cc_py_proto_library(
],
)

envoy_cc_test_library(
envoy_cc_library(
name = "fake_user_defined_output",
srcs = [
"fake_user_defined_output.cc",
Expand All @@ -35,7 +35,9 @@ envoy_cc_test_library(
":fake_user_defined_output_proto_cc_proto",
"//include/nighthawk/user_defined_output:user_defined_output_plugin",
"@envoy//source/common/common:assert_lib_with_external_headers",
"@envoy//source/common/common:lock_guard_lib_with_external_headers",
"@envoy//source/common/common:statusor_lib_with_external_headers",
"@envoy//source/common/common:thread_lib_with_external_headers",
"@envoy//source/common/config:utility_lib_with_external_headers",
"@envoy//source/common/protobuf:protobuf_with_external_headers",
],
Expand Down
39 changes: 27 additions & 12 deletions test/user_defined_output/fake_plugin/fake_user_defined_output.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ FakeUserDefinedOutputPlugin::FakeUserDefinedOutputPlugin(FakeUserDefinedOutputCo

absl::Status
FakeUserDefinedOutputPlugin::handleResponseHeaders(const Envoy::Http::ResponseHeaderMap&) {
Envoy::Thread::LockGuard guard(lock_);
headers_called_++;
if (config_.fail_headers()) {
if (headers_called_ > config_.header_failure_countdown()) {
Expand All @@ -23,18 +24,24 @@ FakeUserDefinedOutputPlugin::handleResponseHeaders(const Envoy::Http::ResponseHe
return absl::OkStatus();
}

absl::Status FakeUserDefinedOutputPlugin::handleResponseData(const Envoy::Buffer::Instance&) {
absl::Status FakeUserDefinedOutputPlugin::handleResponseData(const Envoy::Buffer::Instance& data) {
Envoy::Thread::LockGuard guard(lock_);
if (data.toString().empty()) {
// TODO(950): handleResponseData seemingly gets called twice per request, once always empty,
// once with the expected data.
return absl::OkStatus();
}
data_called_++;
if (config_.fail_data()) {
if (data_called_ > config_.data_failure_countdown()) {
return absl::InternalError("Intentional FakeUserDefinedOutputPlugin failure on data");
}
}

return absl::OkStatus();
}

absl::StatusOr<Envoy::ProtobufWkt::Any> FakeUserDefinedOutputPlugin::getPerWorkerOutput() const {
Envoy::Thread::LockGuard guard(lock_);
if (config_.fail_per_worker_output()) {
return absl::InternalError(
"Intentional FakeUserDefinedOutputPlugin failure on getting PerWorkerOutput");
Expand Down Expand Up @@ -69,28 +76,36 @@ FakeUserDefinedOutputPluginFactory::createUserDefinedOutputPlugin(
}

absl::StatusOr<Envoy::ProtobufWkt::Any> FakeUserDefinedOutputPluginFactory::AggregateGlobalOutput(
absl::Span<const Envoy::ProtobufWkt::Any> per_worker_outputs) {
absl::Span<const nighthawk::client::UserDefinedOutput> per_worker_outputs) {
FakeUserDefinedOutput global_output;
global_output.set_worker_name("global");
int data_called = 0;
int headers_called = 0;
for (const Envoy::ProtobufWkt::Any& any : per_worker_outputs) {
FakeUserDefinedOutput output;
absl::Status status = Envoy::MessageUtil::unpackToNoThrow(any, output);
if (status.ok()) {
data_called += output.data_called();
headers_called += output.headers_called();
for (const nighthawk::client::UserDefinedOutput& user_defined_output : per_worker_outputs) {
if (user_defined_output.has_typed_output()) {
Envoy::ProtobufWkt::Any any = user_defined_output.typed_output();
FakeUserDefinedOutput output;
absl::Status status = Envoy::MessageUtil::unpackToNoThrow(any, output);
if (status.ok()) {
data_called += output.data_called();
headers_called += output.headers_called();
} else {
return status;
}
} else {
return status;
// This does not exit NH execution, but the UserDefinedOutput on the global output will return
// this error message instead of a typed_output.
return absl::InvalidArgumentError(
absl::StrCat("Cannot aggregate if any per_worker_outputs failed. See per worker outputs "
"for full failure information. First failure was: ",
user_defined_output.error_message()));
}
}

global_output.set_data_called(data_called);
global_output.set_headers_called(headers_called);

Envoy::ProtobufWkt::Any global_any;
global_any.PackFrom(global_output);

return global_any;
}

Expand Down
14 changes: 8 additions & 6 deletions test/user_defined_output/fake_plugin/fake_user_defined_output.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

#include "nighthawk/user_defined_output/user_defined_output_plugin.h"

#include "external/envoy/source/common/common/lock_guard.h"
#include "external/envoy/source/common/common/statusor.h"
#include "external/envoy/source/common/common/thread.h"

#include "api/client/options.pb.h"

Expand All @@ -19,8 +21,7 @@ namespace Nighthawk {
* This plugin should be used in tests to prove that plugins receive the correct calls and can
* handle failures appropriately.
*
* This class is not thread-safe.
* TODO(dubious90): This plugin must be thread safe.
* This class is thread-safe.
*/
class FakeUserDefinedOutputPlugin : public UserDefinedOutputPlugin {
public:
Expand Down Expand Up @@ -49,8 +50,9 @@ class FakeUserDefinedOutputPlugin : public UserDefinedOutputPlugin {
absl::StatusOr<Envoy::ProtobufWkt::Any> getPerWorkerOutput() const override;

private:
int data_called_ = 0;
int headers_called_ = 0;
mutable Envoy::Thread::MutexBasicLockable lock_;
int data_called_ ABSL_GUARDED_BY(lock_) = 0;
int headers_called_ ABSL_GUARDED_BY(lock_) = 0;
const nighthawk::FakeUserDefinedOutputConfig config_;
const WorkerMetadata worker_metadata_;
};
Expand All @@ -67,8 +69,8 @@ class FakeUserDefinedOutputPluginFactory : public UserDefinedOutputPluginFactory
createUserDefinedOutputPlugin(const Envoy::ProtobufWkt::Any& config_any,
const WorkerMetadata& worker_metadata) override;

absl::StatusOr<Envoy::ProtobufWkt::Any>
AggregateGlobalOutput(absl::Span<const Envoy::ProtobufWkt::Any> per_worker_outputs) override;
absl::StatusOr<Envoy::ProtobufWkt::Any> AggregateGlobalOutput(
absl::Span<const nighthawk::client::UserDefinedOutput> per_worker_outputs) override;

/**
* Returns the number of times this factory was called to make a plugin.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include "external/envoy/source/common/protobuf/protobuf.h"
#include "external/envoy/test/mocks/buffer/mocks.h"

#include "api/client/output.pb.h"

#include "test/test_common/proto_matchers.h"
#include "test/user_defined_output/fake_plugin/fake_user_defined_output.h"
#include "test/user_defined_output/fake_plugin/fake_user_defined_output.pb.h"
Expand Down Expand Up @@ -37,7 +39,8 @@ absl::StatusOr<UserDefinedOutputPluginPtr> CreatePlugin(const std::string& confi
return factory.createUserDefinedOutputPlugin(config_any, metadata);
}

Envoy::ProtobufWkt::Any CreateOutput(const std::string& textproto) {
// Packs a FakeUserDefinedOutput into an Any.
Envoy::ProtobufWkt::Any CreateOutputAny(const std::string& textproto) {
FakeUserDefinedOutput output;
TextFormat::ParseFromString(textproto, &output);

Expand All @@ -47,6 +50,14 @@ Envoy::ProtobufWkt::Any CreateOutput(const std::string& textproto) {
return output_any;
}

// Packs a FakeUserDefinedOutput into a UserDefinedOutput
nighthawk::client::UserDefinedOutput CreateUserDefinedOutput(const std::string& textproto) {
nighthawk::client::UserDefinedOutput output;
*output.mutable_plugin_name() = "nighthawk.fake_user_defined_output";
*output.mutable_typed_output() = CreateOutputAny(textproto);
return output;
}

TEST(FakeUserDefinedOutputPluginFactory, CreateEmptyConfigProtoCreatesCorrectType) {
auto& factory = Envoy::Config::Utility::getAndCheckFactoryByName<UserDefinedOutputPluginFactory>(
"nighthawk.fake_user_defined_output");
Expand Down Expand Up @@ -89,7 +100,7 @@ TEST(GetPerWorkerOutput, ReturnsCorrectWorkerNumber) {
absl::StatusOr<UserDefinedOutputPluginPtr> plugin = CreatePlugin("", /*worker_number=*/13);
ASSERT_TRUE(plugin.ok());

Envoy::ProtobufWkt::Any expected_output = CreateOutput(R"pb(
Envoy::ProtobufWkt::Any expected_output = CreateOutputAny(R"pb(
worker_name: "worker_13"
)pb");

Expand All @@ -113,7 +124,7 @@ TEST(HandleResponseHeaders, IncrementsHeadersCalledCount) {
EXPECT_TRUE((*plugin)->handleResponseHeaders(headers).ok());
EXPECT_TRUE((*plugin)->handleResponseHeaders(headers).ok());

Envoy::ProtobufWkt::Any expected_output = CreateOutput(R"pb(
Envoy::ProtobufWkt::Any expected_output = CreateOutputAny(R"pb(
headers_called: 2
worker_name: "worker_0"
)pb");
Expand All @@ -132,14 +143,18 @@ TEST(HandleResponseHeaders, FailsAfterCorrectIterationsIfConfigured) {
EXPECT_EQ((*plugin)->handleResponseHeaders(headers).code(), absl::StatusCode::kInternal);
}

TEST(HandleResponseData, IncrementsDataCalledCount) {
TEST(HandleResponseData, IncrementsDataCalledCountIfNotEmpty) {
absl::StatusOr<UserDefinedOutputPluginPtr> plugin = CreatePlugin("", /*worker_number=*/0);
ASSERT_TRUE(plugin.ok());
Envoy::MockBuffer buffer;
EXPECT_TRUE((*plugin)->handleResponseData(buffer).ok());
EXPECT_TRUE((*plugin)->handleResponseData(buffer).ok());

Envoy::ProtobufWkt::Any expected_output = CreateOutput(R"pb(
Envoy::MockBuffer filled_buffer;
filled_buffer.add("notempty");
Envoy::MockBuffer empty_buffer;
EXPECT_TRUE((*plugin)->handleResponseData(filled_buffer).ok());
EXPECT_TRUE((*plugin)->handleResponseData(filled_buffer).ok());
EXPECT_TRUE((*plugin)->handleResponseData(empty_buffer).ok());
EXPECT_TRUE((*plugin)->handleResponseData(empty_buffer).ok());

Envoy::ProtobufWkt::Any expected_output = CreateOutputAny(R"pb(
data_called: 2
worker_name: "worker_0"
)pb");
Expand All @@ -153,26 +168,27 @@ TEST(HandleResponseData, FailsAfterCorrectIterationsIfConfigured) {
CreatePlugin("fail_data: true data_failure_countdown: 2", /*worker_number=*/0);
ASSERT_TRUE(plugin.ok());
Envoy::MockBuffer buffer;
buffer.add("notempty");
EXPECT_TRUE((*plugin)->handleResponseData(buffer).ok());
EXPECT_TRUE((*plugin)->handleResponseData(buffer).ok());
EXPECT_EQ((*plugin)->handleResponseData(buffer).code(), absl::StatusCode::kInternal);
}

TEST(AggregateGlobalOutput, BuildsOutputsCorrectly) {
std::vector<Envoy::ProtobufWkt::Any> per_worker_outputs({
CreateOutput(R"pb(
std::vector<nighthawk::client::UserDefinedOutput> per_worker_outputs({
CreateUserDefinedOutput(R"pb(
data_called: 1
headers_called: 3
worker_name: "worker_0"
)pb"),
CreateOutput(R"pb(
CreateUserDefinedOutput(R"pb(
data_called: 5
headers_called: 7
worker_name: "worker_1"
)pb"),
});

Envoy::ProtobufWkt::Any expected_aggregate = CreateOutput(R"pb(
Envoy::ProtobufWkt::Any expected_aggregate = CreateOutputAny(R"pb(
data_called: 6
headers_called: 10
worker_name: "global"
Expand All @@ -191,7 +207,9 @@ TEST(AggregateGlobalOutput, FailsElegantlyWithIncorrectInput) {
Envoy::ProtobufWkt::Any invalid_any;
FakeUserDefinedOutputConfig wrong_type;
invalid_any.PackFrom(wrong_type);
std::vector<Envoy::ProtobufWkt::Any> per_worker_outputs = {invalid_any};
nighthawk::client::UserDefinedOutput user_defined_output;
*user_defined_output.mutable_typed_output() = invalid_any;
std::vector<nighthawk::client::UserDefinedOutput> per_worker_outputs = {user_defined_output};

auto& factory = Envoy::Config::Utility::getAndCheckFactoryByName<UserDefinedOutputPluginFactory>(
"nighthawk.fake_user_defined_output");
Expand All @@ -202,4 +220,4 @@ TEST(AggregateGlobalOutput, FailsElegantlyWithIncorrectInput) {
}

} // namespace
} // namespace Nighthawk
} // namespace Nighthawk

0 comments on commit a8cf502

Please sign in to comment.