Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

v1.28 Backports 2024-06-01 #786

Merged
merged 8 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ git_repository(
"@//patches:0002-upstream-Add-callback-for-upstream-authorization.patch",
"@//patches:0003-tcp_proxy-Add-filter-state-proxy_read_before_connect.patch",
"@//patches:0004-listener-add-socket-options.patch",
"@//patches:0005-original_dst_cluster-Avoid-multiple-hosts-for-the-sa.patch",
],
# // clang-format off: Envoy's format check: Only repository_locations.bzl may contains URL references
remote = "https://github.com/envoyproxy/envoy.git",
Expand Down
1 change: 1 addition & 0 deletions cilium/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ envoy_cc_library(
"//cilium:accesslog_lib",
"//cilium:conntrack_lib",
"//cilium:grpc_subscription_lib",
"//cilium:ipcache_lib",
"//cilium/api:npds_cc_proto",
"@envoy//envoy/config:subscription_interface",
"@envoy//envoy/singleton:manager_interface",
Expand Down
13 changes: 13 additions & 0 deletions cilium/bpf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ void Bpf::close() {
bool Bpf::open(const std::string& path) {
bool log_on_error = ENVOY_LOG_CHECK_LEVEL(trace);

// close old fd if any
close();

// store the path for later
if (path != path_)
path_ = path;

auto& cilium_calls = PrivilegedService::Singleton::get();
auto ret = cilium_calls.bpf_open(path.c_str());
fd_ = ret.return_value_;
Expand Down Expand Up @@ -100,6 +107,12 @@ bool Bpf::open(const std::string& path) {
}

bool Bpf::lookup(const void* key, void* value) {
// Try reopen if open failed previously
if (fd_ < 0) {
if (!open(path_))
return false;
}

auto& cilium_calls = PrivilegedService::Singleton::get();
auto result = cilium_calls.bpf_lookup(fd_, key, key_size_, value, value_size_);

Expand Down
1 change: 1 addition & 0 deletions cilium/bpf.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class Bpf : public Logger::Loggable<Logger::Id::filter> {
bool lookup(const void* key, void* value);

protected:
std::string path_;
int fd_;

public:
Expand Down
21 changes: 5 additions & 16 deletions cilium/bpf_metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ namespace BpfMetadata {
// Singleton registration via macro defined in envoy/singleton/manager.h
SINGLETON_MANAGER_REGISTRATION(cilium_bpf_conntrack);
SINGLETON_MANAGER_REGISTRATION(cilium_host_map);
SINGLETON_MANAGER_REGISTRATION(cilium_ipcache);
SINGLETON_MANAGER_REGISTRATION(cilium_network_policy);

namespace {
Expand All @@ -97,7 +96,7 @@ createPolicyMap(Server::Configuration::FactoryContext& context, Cilium::CtMapSha
return context.singletonManager().getTyped<const Cilium::NetworkPolicyMap>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_network_policy), [&context, &ct] {
auto map = std::make_shared<Cilium::NetworkPolicyMap>(context, ct);
map->startSubscription(context);
map->startSubscription();
return map;
});
}
Expand Down Expand Up @@ -142,14 +141,7 @@ Config::Config(const ::cilium::BpfMetadata& config,
// later.
return std::make_shared<Cilium::CtMap>(bpf_root);
});
ipcache_ = context.singletonManager().getTyped<Cilium::IPCache>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_ipcache), [&bpf_root] {
auto ipcache = std::make_shared<Cilium::IPCache>(bpf_root);
if (!ipcache->Open()) {
ipcache.reset();
}
return ipcache;
});
ipcache_ = IPCache::NewIPCache(context.getServerFactoryContext(), bpf_root);
if (bpf_root != ct_maps_->bpfRoot()) {
// bpf root may not change during runtime
throw EnvoyException(fmt::format("cilium.bpf_metadata: Invalid bpf_root: {}", bpf_root));
Expand Down Expand Up @@ -384,12 +376,9 @@ bool Config::getMetadata(Network::ConnectionSocket& socket) {
// This means that a local host IP is used if no IP is configured to be used instead of it
// ('ip' above is null).
src_address = nullptr;
} else if (!(use_original_source_address_ &&
!(destination_identity & Cilium::ID::LocalIdentityFlag) &&
destination_identity != Cilium::ID::WORLD && !npmap_->exists(other_ip))) {
// Otherwise only use the original source address if permitted, destination identity is not a
// locally allocated identity, is not classified as WORLD, and the destination is not in the
// same node.
} else if (!use_original_source_address_ || npmap_->exists(other_ip)) {
// Otherwise only use the original source address if permitted and the destination is not
// in the same node.

// Original source address is not used
src_address = nullptr;
Expand Down
4 changes: 2 additions & 2 deletions cilium/grpc_subscription.cc
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ subscribe(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
};

return std::make_unique<Config::GrpcSubscriptionImpl>(
std::make_shared<Config::GrpcMuxImpl>(grpc_mux_context,
api_config_source.set_node_on_first_message_only()),
std::make_shared<GrpcMuxImpl>(grpc_mux_context,
api_config_source.set_node_on_first_message_only()),
callbacks, resource_decoder, stats, type_url, dispatcher, init_fetch_timeout,
/*is_aggregated*/ false, options);
}
Expand Down
25 changes: 25 additions & 0 deletions cilium/grpc_subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "envoy/local_info/local_info.h"
#include "envoy/upstream/cluster_manager.h"

#include "source/extensions/config_subscription/grpc/grpc_mux_impl.h"
#include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"

namespace Envoy {
Expand All @@ -16,6 +17,30 @@ namespace Cilium {
// Cilium XDS API config source. Used for all Cilium XDS.
extern envoy::config::core::v3::ConfigSource cilium_xds_api_config;

// GrpcMux wrapper to get access to control plane identifier
class GrpcMuxImpl : public Config::GrpcMuxImpl {
public:
GrpcMuxImpl(Config::GrpcMuxContext& grpc_mux_context, bool skip_subsequent_node)
: Config::GrpcMuxImpl(grpc_mux_context, skip_subsequent_node) {}

~GrpcMuxImpl() override {}

void onStreamEstablished() override {
new_stream_ = true;
Config::GrpcMuxImpl::onStreamEstablished();
}

// isNewStream returns true for the first call after a new stream has been established
bool isNewStream() {
bool new_stream = new_stream_;
new_stream_ = false;
return new_stream;
}

private:
bool new_stream_ = true;
};

std::unique_ptr<Config::GrpcSubscriptionImpl>
subscribe(const std::string& type_url, const LocalInfo::LocalInfo& local_info,
Upstream::ClusterManager& cm, Event::Dispatcher& dispatcher,
Expand Down
20 changes: 20 additions & 0 deletions cilium/ipcache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <arpa/inet.h>

#include "envoy/common/platform.h"
#include "envoy/singleton/manager.h"

#include "source/common/common/utility.h"

Expand Down Expand Up @@ -47,6 +48,25 @@ struct remote_endpoint_info {
#define ENDPOINT_KEY_IPV4 1
#define ENDPOINT_KEY_IPV6 2

SINGLETON_MANAGER_REGISTRATION(cilium_ipcache);

IPCacheSharedPtr IPCache::NewIPCache(Server::Configuration::ServerFactoryContext& context,
const std::string& bpf_root) {
return context.singletonManager().getTyped<Cilium::IPCache>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_ipcache), [&bpf_root] {
auto ipcache = std::make_shared<Cilium::IPCache>(bpf_root);
if (!ipcache->Open()) {
ipcache.reset();
}
return ipcache;
});
}

IPCacheSharedPtr IPCache::GetIPCache(Server::Configuration::ServerFactoryContext& context) {
return context.singletonManager().getTyped<Cilium::IPCache>(
SINGLETON_MANAGER_REGISTERED_NAME(cilium_ipcache));
}

IPCache::IPCache(const std::string& bpf_root)
: Bpf(BPF_MAP_TYPE_LPM_TRIE, sizeof(struct ipcache_key), sizeof(struct remote_endpoint_info)),
bpf_root_(bpf_root) {}
Expand Down
5 changes: 5 additions & 0 deletions cilium/ipcache.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "envoy/network/address.h"
#include "envoy/server/factory_context.h"
#include "envoy/singleton/instance.h"

#include "source/common/common/logger.h"
Expand All @@ -12,6 +13,10 @@ namespace Cilium {

class IPCache : public Singleton::Instance, public Bpf {
public:
static std::shared_ptr<IPCache> NewIPCache(Server::Configuration::ServerFactoryContext& context,
const std::string& bpf_root);
static std::shared_ptr<IPCache> GetIPCache(Server::Configuration::ServerFactoryContext& context);

IPCache(const std::string& bpf_root);
bool Open();

Expand Down
55 changes: 41 additions & 14 deletions cilium/network_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "absl/container/flat_hash_set.h"
#include "absl/container/node_hash_map.h"
#include "cilium/grpc_subscription.h"
#include "cilium/ipcache.h"
#include "cilium/secret_watcher.h"

namespace Envoy {
Expand Down Expand Up @@ -1063,28 +1064,27 @@ class PolicyInstanceImpl : public PolicyInstance {
// Common base constructor
// This is used directly for testing with a file-based subscription
NetworkPolicyMap::NetworkPolicyMap(Server::Configuration::FactoryContext& context)
: tls_map_(context.threadLocal()),
local_ip_str_(context.localInfo().address()->ip()->addressAsString()),
: context_(context.getServerFactoryContext()), tls_map_(context_.threadLocal()),
local_ip_str_(context_.localInfo().address()->ip()->addressAsString()),
name_(fmt::format("cilium.policymap.{}.{}.", local_ip_str_, ++instance_id_)),
scope_(context.serverScope().createScope(name_)),
scope_(context_.serverScope().createScope(name_)),
init_target_(fmt::format("Cilium Network Policy subscription start"),
[this]() { subscription_->start({}); }),
transport_factory_context_(
std::make_shared<Server::Configuration::TransportSocketFactoryContextImpl>(
context.getServerFactoryContext(),
context.getTransportSocketFactoryContext().sslContextManager(), *scope_,
context.getServerFactoryContext().clusterManager(),
context.messageValidationContext().dynamicValidationVisitor())) {
context_, context.getTransportSocketFactoryContext().sslContextManager(), *scope_,
context_.clusterManager(),
context_.messageValidationContext().dynamicValidationVisitor())) {
// Use listener init manager for the first initialization
transport_factory_context_->setInitManager(context.initManager());
context.initManager().add(init_target_);

ENVOY_LOG(trace, "NetworkPolicyMap({}) created.", name_);
tls_map_.set([&](Event::Dispatcher&) { return std::make_shared<ThreadLocalPolicyMap>(); });

if (context.admin().has_value()) {
if (context_.admin().has_value()) {
ENVOY_LOG(debug, "Registering NetworkPolicies to config tracker");
config_tracker_entry_ = context.admin()->getConfigTracker().add(
config_tracker_entry_ = context_.admin()->getConfigTracker().add(
"networkpolicies", [this](const Matchers::StringMatcher& name_matcher) {
return dumpNetworkPolicyConfigs(name_matcher);
});
Expand All @@ -1103,10 +1103,10 @@ NetworkPolicyMap::NetworkPolicyMap(Server::Configuration::FactoryContext& contex
// shared_from_this(), which cannot be called before a shared
// pointer is formed by the caller of the constructor, hence this
// can't be called from the constructor!
void NetworkPolicyMap::startSubscription(Server::Configuration::FactoryContext& context) {
subscription_ = subscribe("type.googleapis.com/cilium.NetworkPolicy", context.localInfo(),
context.clusterManager(), context.mainThreadDispatcher(),
context.api().randomGenerator(), *scope_, *this,
void NetworkPolicyMap::startSubscription() {
subscription_ = subscribe("type.googleapis.com/cilium.NetworkPolicy", context_.localInfo(),
context_.clusterManager(), context_.mainThreadDispatcher(),
context_.api().randomGenerator(), *scope_, *this,
std::make_shared<NetworkPolicyDecoder>());
}

Expand Down Expand Up @@ -1135,6 +1135,20 @@ void NetworkPolicyMap::pause() {

void NetworkPolicyMap::resume() { resume_.reset(); }

bool NetworkPolicyMap::isNewStream() {
auto sub = dynamic_cast<Config::GrpcSubscriptionImpl*>(subscription_.get());
if (!sub) {
ENVOY_LOG(error, "Cilium NetworkPolicyMap: Cannot get GrpcSubscriptionImpl");
return false;
}
auto mux = dynamic_cast<GrpcMuxImpl*>(sub->grpcMux().get());
if (!mux) {
ENVOY_LOG(error, "Cilium NetworkPolicyMap: Cannot get GrpcMuxImpl");
return false;
}
return mux->isNewStream();
}

void ThreadLocalPolicyMap::Update(std::vector<std::shared_ptr<PolicyInstanceImpl>>& added,
std::vector<std::string>& deleted,
const std::string& version_info) {
Expand Down Expand Up @@ -1241,6 +1255,19 @@ NetworkPolicyMap::onConfigUpdate(const std::vector<Envoy::Config::DecodedResourc
// First setting of this also causes future updates to use the local init manager.
version_init_target_ = std::make_shared<Init::TargetImpl>(version_name, []() {});

// Reopen IPcache for every new stream. Cilium agent re-creates IP cache on restart,
// and that is also when the old stream terminates and a new one is created.
// New security identities (e.g., for FQDN policies) only get inserted to the new IP cache,
// so open it before the workers get a chance to enforce policy on the new IDs.
if (isNewStream()) {
// Get ipcache singleton only if it was successfully created previously
IPCacheSharedPtr ipcache = IPCache::GetIPCache(context_);
if (ipcache != nullptr) {
ENVOY_LOG(trace, "Reopening ipcache on new stream");
ipcache->Open();
}
}

// Skip pausing if nothing to be done
if (to_be_added->size() == 0 && to_be_deleted->size() == 0 && cts_to_be_closed->size() == 0) {
ENVOY_LOG(trace, "Skipping empty or duplicate policy update.");
Expand Down Expand Up @@ -1270,7 +1297,7 @@ NetworkPolicyMap::onConfigUpdate(const std::vector<Envoy::Config::DecodedResourc
ENVOY_LOG(trace, "Resuming NPDS subscription");
shared_this->resume();
} else {
ENVOY_LOG_MISC(debug, "NetworkPolicyMap expired on watcher completion!");
ENVOY_LOG(debug, "NetworkPolicyMap expired on watcher completion!");
}
});

Expand Down
5 changes: 4 additions & 1 deletion cilium/network_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class NetworkPolicyMap : public Singleton::Instance,
// shared_from_this(), which cannot be called before a shared
// pointer is formed by the caller of the constructor, hence this
// can't be called from the constructor!
void startSubscription(Server::Configuration::FactoryContext& context);
void startSubscription();

// This is used for testing with a file-based subscription
void startSubscription(std::unique_ptr<Envoy::Config::Subscription>&& subscription) {
Expand Down Expand Up @@ -223,8 +223,11 @@ class NetworkPolicyMap : public Singleton::Instance,
void pause();
void resume();

bool isNewStream();

static uint64_t instance_id_;

Server::Configuration::ServerFactoryContext& context_;
ThreadLocal::TypedSlot<ThreadLocalPolicyMap> tls_map_;
const std::string local_ip_str_;
std::string name_;
Expand Down
12 changes: 9 additions & 3 deletions cilium/socket_option.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/network/listen_socket.h"

#include "source/common/common/hex.h"
#include "source/common/common/logger.h"
#include "source/common/common/utility.h"

Expand Down Expand Up @@ -189,9 +190,14 @@ class SocketMarkOption : public Network::Socket::Option,
absl::uint128 raw_address = ip->ipv6()->address();
addressIntoVector(key, raw_address);
}
// Add source port to the hash key
key.emplace_back(uint8_t(port >> 16));
key.emplace_back(uint8_t(port));
// Add source port to the hash key if defined
if (port != 0) {
ENVOY_LOG(trace, "hashKey port: {:x}", port);
key.emplace_back(uint8_t(port >> 8));
key.emplace_back(uint8_t(port));
}
ENVOY_LOG(trace, "hashKey after Cilium: {}, source: {}", Hex::encode(key),
original_source_address_->asString());
} else {
// Add the source identity to the hash key. This will separate upstream
// connection pools per security ID.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
github.com/petermattis/goid v0.0.0-20180202154549-b0b1615b78e5 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/proto/otlp v1.2.0 h1:pVeZGk7nXDC9O2hncA6nHldxEjm6LByfA2aN8IOkz94=
go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
Loading
Loading