-
Notifications
You must be signed in to change notification settings - Fork 4.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC wasm as upstream filter #33119
POC wasm as upstream filter #33119
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,15 +5,141 @@ namespace Extensions { | |
namespace HttpFilters { | ||
namespace Wasm { | ||
|
||
struct MyAsyncClientManager: public Grpc::AsyncClientManager{ | ||
absl::StatusOr<Grpc::RawAsyncClientSharedPtr> | ||
getOrCreateRawAsyncClient(const envoy::config::core::v3::GrpcService& , | ||
Stats::Scope& , bool ) override {return absl::StatusOr<Grpc::RawAsyncClientSharedPtr>();}; | ||
|
||
absl::StatusOr<Grpc::RawAsyncClientSharedPtr> | ||
getOrCreateRawAsyncClientWithHashKey(const Grpc::GrpcServiceConfigWithHashKey& , | ||
Stats::Scope& , bool ) override {return absl::StatusOr<Grpc::RawAsyncClientSharedPtr>();} | ||
|
||
absl::StatusOr<Grpc::AsyncClientFactoryPtr> | ||
factoryForGrpcService(const envoy::config::core::v3::GrpcService& , | ||
Stats::Scope& , bool ) override {return absl::StatusOr<Grpc::AsyncClientFactoryPtr>();} | ||
|
||
}; | ||
|
||
class MyClusterManagerFactory: public Upstream::ClusterManagerFactory { | ||
public: | ||
|
||
Upstream::ClusterManagerPtr | ||
clusterManagerFromProto(const envoy::config::bootstrap::v3::Bootstrap& ) override {return nullptr;} | ||
|
||
virtual Http::ConnectionPool::InstancePtr | ||
allocateConnPool(Event::Dispatcher& , Upstream::HostConstSharedPtr , | ||
Upstream::ResourcePriority , std::vector<Http::Protocol>& , | ||
const absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>& | ||
, | ||
const Network::ConnectionSocket::OptionsSharedPtr& , | ||
const Network::TransportSocketOptionsConstSharedPtr& , | ||
TimeSource& , Upstream::ClusterConnectivityState& , | ||
Http::PersistentQuicInfoPtr& ) override {return nullptr;} | ||
|
||
virtual Tcp::ConnectionPool::InstancePtr | ||
allocateTcpConnPool(Event::Dispatcher& , Upstream::HostConstSharedPtr , | ||
Upstream::ResourcePriority , | ||
const Network::ConnectionSocket::OptionsSharedPtr& , | ||
Network::TransportSocketOptionsConstSharedPtr , | ||
Upstream::ClusterConnectivityState& , | ||
absl::optional<std::chrono::milliseconds> ) override {return nullptr;} | ||
|
||
virtual absl::StatusOr<std::pair<Upstream::ClusterSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>> | ||
clusterFromProto(const envoy::config::cluster::v3::Cluster& , Upstream::ClusterManager& , | ||
Upstream::Outlier::EventLoggerSharedPtr , bool ) override {return absl::StatusOr<std::pair<Upstream::ClusterSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>();}; | ||
|
||
virtual Upstream::CdsApiPtr createCds(const envoy::config::core::v3::ConfigSource& , | ||
const xds::core::v3::ResourceLocator* , | ||
Upstream::ClusterManager& ) override {return nullptr;} | ||
|
||
virtual Secret::SecretManager& secretManager() override { throw 1; } | ||
|
||
virtual Singleton::Manager& singletonManager() override { throw 1; } | ||
}; | ||
|
||
struct MyClusterManager : public Upstream::ClusterManager { | ||
bool addOrUpdateCluster(const envoy::config::cluster::v3::Cluster& ,const std::string& ) override {return true;} | ||
|
||
|
||
const Upstream::ClusterLbStatNames& clusterLbStatNames() const override { throw 1; } | ||
const Upstream::ClusterEndpointStatNames& clusterEndpointStatNames() const override { | ||
throw 1; | ||
} | ||
const Upstream::ClusterLoadReportStatNames& clusterLoadReportStatNames() const override { | ||
throw 1; | ||
} | ||
const Upstream::ClusterCircuitBreakersStatNames& clusterCircuitBreakersStatNames() const override { | ||
throw 1; | ||
} | ||
const Upstream::ClusterRequestResponseSizeStatNames& clusterRequestResponseSizeStatNames() const override { | ||
throw 1; | ||
} | ||
const Upstream::ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const override { | ||
throw 1; | ||
} | ||
void drainConnections(const std::string& , | ||
DrainConnectionsHostPredicate ) override{}; | ||
void drainConnections(DrainConnectionsHostPredicate ) override{}; | ||
absl::Status checkActiveStaticCluster(const std::string& ) override{ return absl::OkStatus(); }; | ||
//void notifyMissingCluster(absl::string_view) override{}; | ||
|
||
std::shared_ptr<const envoy::config::cluster::v3::Cluster::CommonLbConfig> getCommonLbConfigPtr( | ||
const envoy::config::cluster::v3::Cluster::CommonLbConfig& ) override { | ||
return nullptr; | ||
} | ||
Config::EdsResourcesCacheOptRef edsResourcesCache() override { return Config::EdsResourcesCacheOptRef();} | ||
void setPrimaryClustersInitializedCb(PrimaryClustersReadyCallback ) override {} | ||
void setInitializedCb(InitializationCompleteCallback ) override {} | ||
bool removeCluster(const std::string& ) override {return true;} | ||
void shutdown() override {} | ||
bool isShutdown() override {return true;} | ||
const absl::optional<std::string>& localClusterName() const override {return any;} | ||
|
||
ClusterInfoMaps clusters() const override {return clusters_;} | ||
const absl::optional<envoy::config::core::v3::BindConfig>& bindConfig() const override {return config_;} | ||
Upstream::ThreadLocalCluster* getThreadLocalCluster(absl::string_view ) override {return nullptr;} | ||
Grpc::AsyncClientManager& grpcAsyncClientManager() override {return async_manager;} | ||
Config::GrpcMuxSharedPtr adsMux() override {return nullptr;} | ||
Upstream::OdCdsApiHandlePtr | ||
allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& , | ||
OptRef<xds::core::v3::ResourceLocator> , | ||
ProtobufMessage::ValidationVisitor& ) override {return nullptr;} | ||
absl::Status | ||
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& ) override {return absl::Status();} | ||
const Upstream::ClusterConfigUpdateStatNames& clusterConfigUpdateStatNames() const override {throw 1;} | ||
Upstream::ClusterUpdateCallbacksHandlePtr | ||
addThreadLocalClusterUpdateCallbacks(Upstream::ClusterUpdateCallbacks& ) override {return nullptr;} | ||
const Upstream::ClusterTrafficStatNames& clusterStatNames() const override {throw 1;} | ||
const ClusterSet& primaryClusters() override {return clusterset_;} | ||
Upstream::ClusterManagerFactory& clusterManagerFactory() override { return c_manager_; } | ||
Config::SubscriptionFactory& subscriptionFactory() override { throw 1;} | ||
|
||
//Upstream::ClusterConfigUpdateStatNames cluster_config_update_stat_names_; | ||
//Upstream::ClusterLbStatNames cluster_lb_stat_names_; | ||
//Upstream::ClusterEndpointStatNames cluster_endpoint_stat_names_; | ||
//Upstream::ClusterLoadReportStatNames cluster_load_report_stat_names_; | ||
//Upstream::ClusterCircuitBreakersStatNames cluster_circuit_breakers_stat_names_; | ||
//Upstream::ClusterRequestResponseSizeStatNames cluster_request_response_size_stat_names_; | ||
//Upstream::ClusterTimeoutBudgetStatNames cluster_timeout_budget_stat_names_; | ||
//Upstream::ClusterTrafficStatNames cluster_traffic_stat_name_; | ||
absl::optional<std::string> any{""}; | ||
|
||
ClusterInfoMaps clusters_; | ||
ClusterSet clusterset_; | ||
absl::optional<envoy::config::core::v3::BindConfig> config_; | ||
MyAsyncClientManager async_manager; | ||
MyClusterManagerFactory c_manager_; | ||
} mycluster; | ||
|
||
FilterConfig::FilterConfig(const envoy::extensions::filters::http::wasm::v3::Wasm& config, | ||
Server::Configuration::FactoryContext& context) { | ||
auto& server = context.serverFactoryContext(); | ||
Server::Configuration::ServerFactoryContext& context) { | ||
auto& server = context; | ||
tls_slot_ = ThreadLocal::TypedSlot<Common::Wasm::PluginHandleSharedPtrThreadLocal>::makeUnique( | ||
server.threadLocal()); | ||
|
||
const auto plugin = std::make_shared<Common::Wasm::Plugin>( | ||
config.config(), context.listenerInfo().direction(), server.localInfo(), | ||
&context.listenerInfo().metadata()); | ||
config.config(), envoy::config::core::v3::TrafficDirection::INBOUND, server.localInfo(), | ||
nullptr); | ||
|
||
auto callback = [plugin, this](const Common::Wasm::WasmHandleSharedPtr& base_wasm) { | ||
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call. | ||
|
@@ -23,7 +149,7 @@ FilterConfig::FilterConfig(const envoy::extensions::filters::http::wasm::v3::Was | |
}); | ||
}; | ||
|
||
if (!Common::Wasm::createWasm(plugin, context.scope().createScope(""), server.clusterManager(), | ||
if (!Common::Wasm::createWasm(plugin, context.scope().createScope(""), mycluster /*server.clusterManager()*/, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here is the main issue I found. server.clusterManager() assert due to the manager is still not initialized (I guess) at this moment There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where is this assert failing? was it a manual run of Envoy with upstream filter? If it's in a unit test, the unit test is probably just not set up correctly. if you add an e2e test with wasm upstream filters it will give me something Ican patch and run. Usually I suggest folks doing upstream filter work parameterize the e2e test using prependFilter like we did for test/integration/filter_integration_test.cc There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @alyssawilk Just creating the filter in normal envoy work, it tried to get the cluster manager from the server factory and it is not initialized. I guess @yanjunxiang-google get the same issue while working to convert composite filter and propose a change to solve it #33221 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that's the same issue: #33218, which is resolved now. |
||
context.initManager(), server.mainThreadDispatcher(), server.api(), | ||
server.lifecycleNotifier(), remote_data_provider_, | ||
std::move(callback))) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -139,7 +139,7 @@ TEST_F(EvironmentCredentialsProviderTest, NoSessionToken) { | |
class CredentialsFileCredentialsProviderTest : public testing::Test { | ||
public: | ||
CredentialsFileCredentialsProviderTest() | ||
: api_(Api::createApiForTest(time_system_)), provider_(*api_) {} | ||
: api_(Api::createApiForTest(time_system_)), provider_(*api_, "") {} | ||
|
||
~CredentialsFileCredentialsProviderTest() override { | ||
TestEnvironment::unsetEnvVar("AWS_SHARED_CREDENTIALS_FILE"); | ||
|
@@ -183,6 +183,30 @@ TEST_F(CredentialsFileCredentialsProviderTest, DefaultCredentialsFile) { | |
EXPECT_EQ("profile1_token", credentials.sessionToken().value()); | ||
} | ||
|
||
TEST_F(CredentialsFileCredentialsProviderTest, CustomProfileFromConfigShouldBeHonored) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ignore this, belong to other Patch i was working on |
||
auto file_path = | ||
TestEnvironment::writeStringToFileForTest(CREDENTIALS_FILE, CREDENTIALS_FILE_CONTENTS); | ||
TestEnvironment::setEnvVar("AWS_SHARED_CREDENTIALS_FILE", file_path, 1); | ||
|
||
auto provider = CredentialsFileCredentialsProvider(*api_, "profile4"); | ||
const auto credentials = provider.getCredentials(); | ||
EXPECT_EQ("profile4_access_key", credentials.accessKeyId().value()); | ||
EXPECT_EQ("profile4_secret", credentials.secretAccessKey().value()); | ||
EXPECT_EQ("profile4_token", credentials.sessionToken().value()); | ||
} | ||
|
||
TEST_F(CredentialsFileCredentialsProviderTest, UnexistingCustomProfileFomConfig) { | ||
auto file_path = | ||
TestEnvironment::writeStringToFileForTest(CREDENTIALS_FILE, CREDENTIALS_FILE_CONTENTS); | ||
TestEnvironment::setEnvVar("AWS_SHARED_CREDENTIALS_FILE", file_path, 1); | ||
|
||
auto provider = CredentialsFileCredentialsProvider(*api_, "unexistening_profile"); | ||
const auto credentials = provider.getCredentials(); | ||
EXPECT_FALSE(credentials.accessKeyId().has_value()); | ||
EXPECT_FALSE(credentials.secretAccessKey().has_value()); | ||
EXPECT_FALSE(credentials.sessionToken().has_value()); | ||
} | ||
|
||
TEST_F(CredentialsFileCredentialsProviderTest, ProfileDoesNotExist) { | ||
setUpTest(CREDENTIALS_FILE_CONTENTS, "invalid_profile"); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this used?