Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC wasm as upstream filter #33119

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion source/extensions/common/wasm/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ WasmResult Context::setHeaderMapPairs(WasmHeaderMapType type, const Pairs& pairs
const Http::LowerCaseString lower_key{std::string(p.first)};
map->addCopy(lower_key, std::string(p.second));
}
if (type == WasmHeaderMapType::RequestHeaders && decoder_callbacks_) {
if (type == WasmHeaderMapType::RequestHeaders && decoder_callbacks_ && decoder_callbacks_->downstreamCallbacks()) {
decoder_callbacks_->downstreamCallbacks()->clearRouteCache();
}
return WasmResult::Ok;
Expand Down
12 changes: 8 additions & 4 deletions source/extensions/filters/http/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ namespace Extensions {
namespace HttpFilters {
namespace Wasm {

Http::FilterFactoryCb WasmFilterConfig::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::wasm::v3::Wasm& proto_config, const std::string&,
Server::Configuration::FactoryContext& context) {
context.serverFactoryContext().api().customStatNamespaces().registerStatNamespace(
absl::StatusOr<Http::FilterFactoryCb> WasmFilterConfig::createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::wasm::v3::Wasm& proto_config,
const std::string&, DualInfo,
Server::Configuration::ServerFactoryContext& context) {

context.api().customStatNamespaces().registerStatNamespace(
Extensions::Common::Wasm::CustomStatNamespace);
auto filter_config = std::make_shared<FilterConfig>(proto_config, context);
return [filter_config](Http::FilterChainFactoryCallbacks& callbacks) -> void {
Expand All @@ -33,6 +35,8 @@ Http::FilterFactoryCb WasmFilterConfig::createFilterFactoryFromProtoTyped(
* Static registration for the Wasm filter. @see RegisterFactory.
*/
REGISTER_FACTORY(WasmFilterConfig, Server::Configuration::NamedHttpFilterConfigFactory);
REGISTER_FACTORY(UpstreamWasmFilterConfig,
Server::Configuration::UpstreamHttpFilterConfigFactory);

} // namespace Wasm
} // namespace HttpFilters
Expand Down
13 changes: 8 additions & 5 deletions source/extensions/filters/http/wasm/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,19 @@ namespace Wasm {
* Config registration for the Wasm filter. @see NamedHttpFilterConfigFactory.
*/
class WasmFilterConfig
: public Common::FactoryBase<envoy::extensions::filters::http::wasm::v3::Wasm> {
: public Common::DualFactoryBase<envoy::extensions::filters::http::wasm::v3::Wasm> {
public:
WasmFilterConfig() : FactoryBase("envoy.filters.http.wasm") {}
WasmFilterConfig() : DualFactoryBase("envoy.filters.http.wasm") {}

private:
Http::FilterFactoryCb createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::wasm::v3::Wasm& proto_config, const std::string&,
Server::Configuration::FactoryContext& context) override;
absl::StatusOr<Http::FilterFactoryCb> createFilterFactoryFromProtoTyped(
const envoy::extensions::filters::http::wasm::v3::Wasm& proto_config,
const std::string& stats_prefix, DualInfo dual_info,
Server::Configuration::ServerFactoryContext& context) override;
};

using UpstreamWasmFilterConfig = WasmFilterConfig;

} // namespace Wasm
} // namespace HttpFilters
} // namespace Extensions
Expand Down
136 changes: 131 additions & 5 deletions source/extensions/filters/http/wasm/wasm_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,141 @@ namespace Extensions {
namespace HttpFilters {
namespace Wasm {

struct MyAsyncClientManager: public Grpc::AsyncClientManager{
absl::StatusOr<Grpc::RawAsyncClientSharedPtr>
getOrCreateRawAsyncClient(const envoy::config::core::v3::GrpcService& ,
Stats::Scope& , bool ) override {return absl::StatusOr<Grpc::RawAsyncClientSharedPtr>();};

absl::StatusOr<Grpc::RawAsyncClientSharedPtr>
getOrCreateRawAsyncClientWithHashKey(const Grpc::GrpcServiceConfigWithHashKey& ,
Stats::Scope& , bool ) override {return absl::StatusOr<Grpc::RawAsyncClientSharedPtr>();}

absl::StatusOr<Grpc::AsyncClientFactoryPtr>
factoryForGrpcService(const envoy::config::core::v3::GrpcService& ,
Stats::Scope& , bool ) override {return absl::StatusOr<Grpc::AsyncClientFactoryPtr>();}

};

class MyClusterManagerFactory: public Upstream::ClusterManagerFactory {
public:

Upstream::ClusterManagerPtr
clusterManagerFromProto(const envoy::config::bootstrap::v3::Bootstrap& ) override {return nullptr;}

virtual Http::ConnectionPool::InstancePtr
allocateConnPool(Event::Dispatcher& , Upstream::HostConstSharedPtr ,
Upstream::ResourcePriority , std::vector<Http::Protocol>& ,
const absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>&
,
const Network::ConnectionSocket::OptionsSharedPtr& ,
const Network::TransportSocketOptionsConstSharedPtr& ,
TimeSource& , Upstream::ClusterConnectivityState& ,
Http::PersistentQuicInfoPtr& ) override {return nullptr;}

virtual Tcp::ConnectionPool::InstancePtr
allocateTcpConnPool(Event::Dispatcher& , Upstream::HostConstSharedPtr ,
Upstream::ResourcePriority ,
const Network::ConnectionSocket::OptionsSharedPtr& ,
Network::TransportSocketOptionsConstSharedPtr ,
Upstream::ClusterConnectivityState& ,
absl::optional<std::chrono::milliseconds> ) override {return nullptr;}

virtual absl::StatusOr<std::pair<Upstream::ClusterSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
clusterFromProto(const envoy::config::cluster::v3::Cluster& , Upstream::ClusterManager& ,
Upstream::Outlier::EventLoggerSharedPtr , bool ) override {return absl::StatusOr<std::pair<Upstream::ClusterSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>();};

virtual Upstream::CdsApiPtr createCds(const envoy::config::core::v3::ConfigSource& ,
const xds::core::v3::ResourceLocator* ,
Upstream::ClusterManager& ) override {return nullptr;}

virtual Secret::SecretManager& secretManager() override { throw 1; }

virtual Singleton::Manager& singletonManager() override { throw 1; }
};

struct MyClusterManager : public Upstream::ClusterManager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this used?

bool addOrUpdateCluster(const envoy::config::cluster::v3::Cluster& ,const std::string& ) override {return true;}


const Upstream::ClusterLbStatNames& clusterLbStatNames() const override { throw 1; }
const Upstream::ClusterEndpointStatNames& clusterEndpointStatNames() const override {
throw 1;
}
const Upstream::ClusterLoadReportStatNames& clusterLoadReportStatNames() const override {
throw 1;
}
const Upstream::ClusterCircuitBreakersStatNames& clusterCircuitBreakersStatNames() const override {
throw 1;
}
const Upstream::ClusterRequestResponseSizeStatNames& clusterRequestResponseSizeStatNames() const override {
throw 1;
}
const Upstream::ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const override {
throw 1;
}
void drainConnections(const std::string& ,
DrainConnectionsHostPredicate ) override{};
void drainConnections(DrainConnectionsHostPredicate ) override{};
absl::Status checkActiveStaticCluster(const std::string& ) override{ return absl::OkStatus(); };
//void notifyMissingCluster(absl::string_view) override{};

std::shared_ptr<const envoy::config::cluster::v3::Cluster::CommonLbConfig> getCommonLbConfigPtr(
const envoy::config::cluster::v3::Cluster::CommonLbConfig& ) override {
return nullptr;
}
Config::EdsResourcesCacheOptRef edsResourcesCache() override { return Config::EdsResourcesCacheOptRef();}
void setPrimaryClustersInitializedCb(PrimaryClustersReadyCallback ) override {}
void setInitializedCb(InitializationCompleteCallback ) override {}
bool removeCluster(const std::string& ) override {return true;}
void shutdown() override {}
bool isShutdown() override {return true;}
const absl::optional<std::string>& localClusterName() const override {return any;}

ClusterInfoMaps clusters() const override {return clusters_;}
const absl::optional<envoy::config::core::v3::BindConfig>& bindConfig() const override {return config_;}
Upstream::ThreadLocalCluster* getThreadLocalCluster(absl::string_view ) override {return nullptr;}
Grpc::AsyncClientManager& grpcAsyncClientManager() override {return async_manager;}
Config::GrpcMuxSharedPtr adsMux() override {return nullptr;}
Upstream::OdCdsApiHandlePtr
allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& ,
OptRef<xds::core::v3::ResourceLocator> ,
ProtobufMessage::ValidationVisitor& ) override {return nullptr;}
absl::Status
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& ) override {return absl::Status();}
const Upstream::ClusterConfigUpdateStatNames& clusterConfigUpdateStatNames() const override {throw 1;}
Upstream::ClusterUpdateCallbacksHandlePtr
addThreadLocalClusterUpdateCallbacks(Upstream::ClusterUpdateCallbacks& ) override {return nullptr;}
const Upstream::ClusterTrafficStatNames& clusterStatNames() const override {throw 1;}
const ClusterSet& primaryClusters() override {return clusterset_;}
Upstream::ClusterManagerFactory& clusterManagerFactory() override { return c_manager_; }
Config::SubscriptionFactory& subscriptionFactory() override { throw 1;}

//Upstream::ClusterConfigUpdateStatNames cluster_config_update_stat_names_;
//Upstream::ClusterLbStatNames cluster_lb_stat_names_;
//Upstream::ClusterEndpointStatNames cluster_endpoint_stat_names_;
//Upstream::ClusterLoadReportStatNames cluster_load_report_stat_names_;
//Upstream::ClusterCircuitBreakersStatNames cluster_circuit_breakers_stat_names_;
//Upstream::ClusterRequestResponseSizeStatNames cluster_request_response_size_stat_names_;
//Upstream::ClusterTimeoutBudgetStatNames cluster_timeout_budget_stat_names_;
//Upstream::ClusterTrafficStatNames cluster_traffic_stat_name_;
absl::optional<std::string> any{""};

ClusterInfoMaps clusters_;
ClusterSet clusterset_;
absl::optional<envoy::config::core::v3::BindConfig> config_;
MyAsyncClientManager async_manager;
MyClusterManagerFactory c_manager_;
} mycluster;

FilterConfig::FilterConfig(const envoy::extensions::filters::http::wasm::v3::Wasm& config,
Server::Configuration::FactoryContext& context) {
auto& server = context.serverFactoryContext();
Server::Configuration::ServerFactoryContext& context) {
auto& server = context;
tls_slot_ = ThreadLocal::TypedSlot<Common::Wasm::PluginHandleSharedPtrThreadLocal>::makeUnique(
server.threadLocal());

const auto plugin = std::make_shared<Common::Wasm::Plugin>(
config.config(), context.listenerInfo().direction(), server.localInfo(),
&context.listenerInfo().metadata());
config.config(), envoy::config::core::v3::TrafficDirection::INBOUND, server.localInfo(),
nullptr);

auto callback = [plugin, this](const Common::Wasm::WasmHandleSharedPtr& base_wasm) {
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
Expand All @@ -23,7 +149,7 @@ FilterConfig::FilterConfig(const envoy::extensions::filters::http::wasm::v3::Was
});
};

if (!Common::Wasm::createWasm(plugin, context.scope().createScope(""), server.clusterManager(),
if (!Common::Wasm::createWasm(plugin, context.scope().createScope(""), mycluster /*server.clusterManager()*/,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is the main issue I found. server.clusterManager() assert due to the manager is still not initialized (I guess) at this moment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this assert failing? was it a manual run of Envoy with upstream filter? If it's in a unit test, the unit test is probably just not set up correctly. if you add an e2e test with wasm upstream filters it will give me something Ican patch and run.

Usually I suggest folks doing upstream filter work parameterize the e2e test using prependFilter like we did for test/integration/filter_integration_test.cc

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alyssawilk Just creating the filter in normal envoy work, it tried to get the cluster manager from the server factory and it is not initialized. I guess @yanjunxiang-google get the same issue while working to convert composite filter and propose a change to solve it #33221

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's the same issue: #33218, which is resolved now.

context.initManager(), server.mainThreadDispatcher(), server.api(),
server.lifecycleNotifier(), remote_data_provider_,
std::move(callback))) {
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/wasm/wasm_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ using Envoy::Extensions::Common::Wasm::Wasm;
class FilterConfig : Logger::Loggable<Logger::Id::wasm> {
public:
FilterConfig(const envoy::extensions::filters::http::wasm::v3::Wasm& config,
Server::Configuration::FactoryContext& context);
Server::Configuration::ServerFactoryContext& context);

std::shared_ptr<Context> createFilter() {
Wasm* wasm = nullptr;
Expand Down
26 changes: 25 additions & 1 deletion test/extensions/common/aws/credentials_provider_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ TEST_F(EvironmentCredentialsProviderTest, NoSessionToken) {
class CredentialsFileCredentialsProviderTest : public testing::Test {
public:
CredentialsFileCredentialsProviderTest()
: api_(Api::createApiForTest(time_system_)), provider_(*api_) {}
: api_(Api::createApiForTest(time_system_)), provider_(*api_, "") {}

~CredentialsFileCredentialsProviderTest() override {
TestEnvironment::unsetEnvVar("AWS_SHARED_CREDENTIALS_FILE");
Expand Down Expand Up @@ -183,6 +183,30 @@ TEST_F(CredentialsFileCredentialsProviderTest, DefaultCredentialsFile) {
EXPECT_EQ("profile1_token", credentials.sessionToken().value());
}

TEST_F(CredentialsFileCredentialsProviderTest, CustomProfileFromConfigShouldBeHonored) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore this, belong to other Patch i was working on

auto file_path =
TestEnvironment::writeStringToFileForTest(CREDENTIALS_FILE, CREDENTIALS_FILE_CONTENTS);
TestEnvironment::setEnvVar("AWS_SHARED_CREDENTIALS_FILE", file_path, 1);

auto provider = CredentialsFileCredentialsProvider(*api_, "profile4");
const auto credentials = provider.getCredentials();
EXPECT_EQ("profile4_access_key", credentials.accessKeyId().value());
EXPECT_EQ("profile4_secret", credentials.secretAccessKey().value());
EXPECT_EQ("profile4_token", credentials.sessionToken().value());
}

TEST_F(CredentialsFileCredentialsProviderTest, UnexistingCustomProfileFomConfig) {
auto file_path =
TestEnvironment::writeStringToFileForTest(CREDENTIALS_FILE, CREDENTIALS_FILE_CONTENTS);
TestEnvironment::setEnvVar("AWS_SHARED_CREDENTIALS_FILE", file_path, 1);

auto provider = CredentialsFileCredentialsProvider(*api_, "unexistening_profile");
const auto credentials = provider.getCredentials();
EXPECT_FALSE(credentials.accessKeyId().has_value());
EXPECT_FALSE(credentials.secretAccessKey().has_value());
EXPECT_FALSE(credentials.sessionToken().has_value());
}

TEST_F(CredentialsFileCredentialsProviderTest, ProfileDoesNotExist) {
setUpTest(CREDENTIALS_FILE_CONTENTS, "invalid_profile");

Expand Down
Loading