Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

postgres: support for upstream SSL #23990

Merged
merged 19 commits into from
Dec 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ option (udpa.annotations.file_status).package_version_status = ACTIVE;
// [#extension: envoy.filters.network.postgres_proxy]

message PostgresProxy {
// Upstream SSL operational modes.
enum SSLMode {
// Do not encrypt upstream connection to the server.
DISABLE = 0;

// Establish upstream SSL connection to the server. If the server does not
// accept the request for SSL connection, the session is terminated.
REQUIRE = 1;
}

// The human readable prefix to use when emitting :ref:`statistics
// <config_network_filters_postgres_proxy_stats>`.
string stat_prefix = 1 [(validate.rules).string = {min_len: 1}];
Expand All @@ -39,4 +49,12 @@ message PostgresProxy {
// Refer to official documentation for details
// `SSL Session Encryption Message Flow <https://www.postgresql.org/docs/current/protocol-flow.html#id-1.10.5.7.11>`_.
bool terminate_ssl = 3;

// Controls whether to establish upstream SSL connection to the server.
// Envoy will try to establish upstream SSL connection to the server only when
// Postgres filter is able to read Postgres payload in clear-text. It happens when
// a client established a clear-text connection to Envoy or when a client established
// SSL connection to Envoy and Postgres filter is configured to terminate SSL.
// Defaults to SSL_DISABLE.
SSLMode upstream_ssl = 4;
}
3 changes: 3 additions & 0 deletions changelogs/current.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ new_features:
- area: matching
change: |
support filter chain selection based on the dynamic metadata and the filter state using :ref:`formatter actions <extension_envoy.matching.actions.format_string>`.
- area: postgres
change: |
added support for upstream SSL.
- area: redis
change: |
extended :ref:`cluster support <arch_overview_redis_cluster_support>` by adding a :ref:`dns_cache_config <envoy_v3_api_field_extensions.filters.network.redis_proxy.v3.RedisProxy.ConnPoolSettings.dns_cache_config>` option that can be used to resolve hostnames returned by MOVED/ASK responses.
Expand Down
1 change: 1 addition & 0 deletions contrib/postgres_proxy/filters/network/source/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ envoy_cc_library(
"//source/common/buffer:buffer_lib",
"//source/common/network:filter_lib",
"//source/extensions/filters/network:well_known_names",
"@envoy_api//contrib/envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg_cc_proto",
],
)

Expand Down
1 change: 1 addition & 0 deletions contrib/postgres_proxy/filters/network/source/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ NetworkFilters::PostgresProxy::PostgresConfigFactory::createFilterFactoryFromPro
config_options.enable_sql_parsing_ =
PROTOBUF_GET_WRAPPED_OR_DEFAULT(proto_config, enable_sql_parsing, true);
config_options.terminate_ssl_ = proto_config.terminate_ssl();
config_options.upstream_ssl_ = proto_config.upstream_ssl();

PostgresFilterConfigSharedPtr filter_config(
std::make_shared<PostgresFilterConfig>(config_options, context.scope()));
Expand Down
58 changes: 56 additions & 2 deletions contrib/postgres_proxy/filters/network/source/postgres_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ Decoder::Result DecoderImpl::onData(Buffer::Instance& data, bool frontend) {
return onDataIgnore(data, frontend);
case State::InSyncState:
return onDataInSync(data, frontend);
case State::NegotiatingUpstreamSSL:
return onDataInNegotiating(data, frontend);
default:
PANIC("not implemented");
}
Expand Down Expand Up @@ -240,7 +242,6 @@ Decoder::Result DecoderImpl::onDataInit(Buffer::Instance& data, bool) {

Decoder::Result result = Decoder::Result::ReadyForNext;
uint32_t code = data.peekBEInt<uint32_t>(4);
data.drain(4);
// Startup message with 1234 in the most significant 16 bits
// indicate request to encrypt.
if (code >= 0x04d20000) {
Expand Down Expand Up @@ -268,8 +269,24 @@ Decoder::Result DecoderImpl::onDataInit(Buffer::Instance& data, bool) {
}
} else {
ENVOY_LOG(debug, "Detected version {}.{} of Postgres", code >> 16, code & 0x0000FFFF);
state_ = State::InSyncState;
if (callbacks_->shouldEncryptUpstream()) {
// Copy the received initial request.
temp_storage_.add(data.linearize(data.length()), data.length());
// Send SSL request to upstream.
Buffer::OwnedImpl ssl_request;
uint32_t len = 8;
ssl_request.writeBEInt<uint32_t>(len);
uint32_t ssl_code = 0x04d2162f;
ssl_request.writeBEInt<uint32_t>(ssl_code);

callbacks_->sendUpstream(ssl_request);
result = Decoder::Result::Stopped;
state_ = State::NegotiatingUpstreamSSL;
} else {
state_ = State::InSyncState;
}
}
data.drain(4);

processMessageBody(data, FRONTEND, message_len_ - 4, first_, msgParser);
data.drain(message_len_);
Expand Down Expand Up @@ -412,6 +429,43 @@ void DecoderImpl::decodeBackendStatements() {
}
}

Decoder::Result DecoderImpl::onDataInNegotiating(Buffer::Instance& data, bool frontend) {
if (frontend) {
// No data from downstream is allowed when negotiating upstream SSL
// with the server.
data.drain(data.length());
state_ = State::OutOfSyncState;
return Decoder::Result::ReadyForNext;
}

// This should be reply from the server indicating if it accepted
// request to use SSL. It is only one character long packet, where
// 'S' means use SSL, 'E' means do not use.

// Indicate to the filter, the response and give the initial
// packet temporarily buffered to be sent upstream.
bool upstreamSSL = false;
state_ = State::InitState;
if (data.length() == 1) {
const char c = data.peekInt<char, ByteOrder::Host, 1>(0);
if (c == 'S') {
upstreamSSL = true;
} else {
if (c != 'E') {
state_ = State::OutOfSyncState;
}
}
} else {
state_ = State::OutOfSyncState;
}

data.drain(data.length());

callbacks_->encryptUpstream(upstreamSSL, temp_storage_);

return Decoder::Result::Stopped;
}

// Method is called when X (Terminate) message
// is encountered by the decoder.
void DecoderImpl::decodeFrontendTerminate() {
Expand Down
17 changes: 16 additions & 1 deletion contrib/postgres_proxy/filters/network/source/postgres_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class DecoderCallbacks {
virtual void processQuery(const std::string&) PURE;

virtual bool onSSLRequest() PURE;
virtual bool shouldEncryptUpstream() const PURE;
virtual void sendUpstream(Buffer::Instance&) PURE;
virtual void encryptUpstream(bool, Buffer::Instance&) PURE;
};

// Postgres message decoder.
Expand Down Expand Up @@ -88,7 +91,13 @@ class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {

bool encrypted() const { return encrypted_; }

enum class State { InitState, InSyncState, OutOfSyncState, EncryptedState };
enum class State {
InitState,
InSyncState,
OutOfSyncState,
EncryptedState,
NegotiatingUpstreamSSL
};
State state() const { return state_; }
void state(State state) { state_ = state; }

Expand All @@ -98,6 +107,7 @@ class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {
Result onDataInit(Buffer::Instance& data, bool frontend);
Result onDataInSync(Buffer::Instance& data, bool frontend);
Result onDataIgnore(Buffer::Instance& data, bool frontend);
Result onDataInNegotiating(Buffer::Instance& data, bool frontend);

// MsgAction defines the Decoder's method which will be invoked
// when a specific message has been decoded.
Expand Down Expand Up @@ -188,6 +198,11 @@ class DecoderImpl : public Decoder, Logger::Loggable<Logger::Id::filter> {
MsgParserDict BE_errors_;
MsgParserDict BE_notices_;

// Buffer used to temporarily store a downstream postgres packet
// while sending other packets. Currently used only when negotiating
// upstream SSL.
Buffer::OwnedImpl temp_storage_;

// MAX_STARTUP_PACKET_LENGTH is defined in Postgres source code
// as maximum size of initial packet.
// https://github.com/postgres/postgres/search?q=MAX_STARTUP_PACKET_LENGTH&type=code
Expand Down
56 changes: 51 additions & 5 deletions contrib/postgres_proxy/filters/network/source/postgres_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/buffer/buffer.h"
#include "envoy/network/connection.h"

#include "source/common/common/assert.h"
#include "source/extensions/filters/network/well_known_names.h"

#include "contrib/postgres_proxy/filters/network/source/postgres_decoder.h"
Expand All @@ -15,8 +16,8 @@ namespace PostgresProxy {
PostgresFilterConfig::PostgresFilterConfig(const PostgresFilterConfigOptions& config_options,
Stats::Scope& scope)
: enable_sql_parsing_(config_options.enable_sql_parsing_),
terminate_ssl_(config_options.terminate_ssl_), scope_{scope},
stats_{generateStats(config_options.stats_prefix_, scope)} {}
terminate_ssl_(config_options.terminate_ssl_), upstream_ssl_(config_options.upstream_ssl_),
scope_{scope}, stats_{generateStats(config_options.stats_prefix_, scope)} {}

PostgresFilter::PostgresFilter(PostgresFilterConfigSharedPtr config) : config_{config} {
if (!decoder_) {
Expand Down Expand Up @@ -50,7 +51,12 @@ Network::FilterStatus PostgresFilter::onWrite(Buffer::Instance& data, bool) {

// Backend Buffer
backend_buffer_.add(data);
return doDecode(backend_buffer_, false);
Network::FilterStatus result = doDecode(backend_buffer_, false);
if (result == Network::FilterStatus::StopIteration) {
ASSERT(backend_buffer_.length() == 0);
data.drain(data.length());
}
return result;
}

DecoderPtr PostgresFilter::createDecoder(DecoderCallbacks* callbacks) {
Expand Down Expand Up @@ -205,8 +211,9 @@ bool PostgresFilter::onSSLRequest() {
// Wait until 'S' has been transmitted.
if (bytes >= 1) {
if (!read_callbacks_->connection().startSecureTransport()) {
ENVOY_CONN_LOG(info, "postgres_proxy: cannot enable secure transport. Check configuration.",
read_callbacks_->connection());
ENVOY_CONN_LOG(
info, "postgres_proxy: cannot enable downstream secure transport. Check configuration.",
read_callbacks_->connection());
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
} else {
// Unsubscribe the callback.
Expand All @@ -227,6 +234,45 @@ bool PostgresFilter::onSSLRequest() {
return false;
}

bool PostgresFilter::shouldEncryptUpstream() const {
return (config_->upstream_ssl_ ==
envoy::extensions::filters::network::postgres_proxy::v3alpha::PostgresProxy::REQUIRE);
}

void PostgresFilter::sendUpstream(Buffer::Instance& data) {
read_callbacks_->injectReadDataToFilterChain(data, false);
}

void PostgresFilter::encryptUpstream(bool upstream_agreed, Buffer::Instance& data) {
RELEASE_ASSERT(
config_->upstream_ssl_ !=
envoy::extensions::filters::network::postgres_proxy::v3alpha::PostgresProxy::DISABLE,
"encryptUpstream should not be called when upstream SSL is disabled.");
if (!upstream_agreed) {
ENVOY_CONN_LOG(info,
"postgres_proxy: upstream server rejected request to establish SSL connection. "
"Terminating.",
read_callbacks_->connection());
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);

config_->stats_.sessions_upstream_ssl_failed_.inc();
} else {
// Try to switch upstream connection to use a secure channel.
if (read_callbacks_->startUpstreamSecureTransport()) {
config_->stats_.sessions_upstream_ssl_success_.inc();
read_callbacks_->injectReadDataToFilterChain(data, false);
ENVOY_CONN_LOG(trace, "postgres_proxy: upstream SSL enabled.", read_callbacks_->connection());
} else {
ENVOY_CONN_LOG(info,
"postgres_proxy: cannot enable upstream secure transport. Check "
"configuration. Terminating.",
read_callbacks_->connection());
read_callbacks_->connection().close(Network::ConnectionCloseType::NoFlush);
config_->stats_.sessions_upstream_ssl_failed_.inc();
}
}
}

Network::FilterStatus PostgresFilter::doDecode(Buffer::Instance& data, bool frontend) {
// Keep processing data until buffer is empty or decoder says
// that it cannot process data in the buffer.
Expand Down
11 changes: 11 additions & 0 deletions contrib/postgres_proxy/filters/network/source/postgres_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "source/common/buffer/buffer_impl.h"
#include "source/common/common/logger.h"

#include "contrib/envoy/extensions/filters/network/postgres_proxy/v3alpha/postgres_proxy.pb.h"
#include "contrib/postgres_proxy/filters/network/source/postgres_decoder.h"

namespace Envoy {
Expand All @@ -32,6 +33,8 @@ namespace PostgresProxy {
COUNTER(sessions_encrypted) \
COUNTER(sessions_terminated_ssl) \
COUNTER(sessions_unencrypted) \
COUNTER(sessions_upstream_ssl_success) \
COUNTER(sessions_upstream_ssl_failed) \
COUNTER(statements) \
COUNTER(statements_insert) \
COUNTER(statements_delete) \
Expand Down Expand Up @@ -67,11 +70,16 @@ class PostgresFilterConfig {
std::string stats_prefix_;
bool enable_sql_parsing_;
bool terminate_ssl_;
envoy::extensions::filters::network::postgres_proxy::v3alpha::PostgresProxy::SSLMode
upstream_ssl_;
};
PostgresFilterConfig(const PostgresFilterConfigOptions& config_options, Stats::Scope& scope);

bool enable_sql_parsing_{true};
bool terminate_ssl_{false};
envoy::extensions::filters::network::postgres_proxy::v3alpha::PostgresProxy::SSLMode
upstream_ssl_{
envoy::extensions::filters::network::postgres_proxy::v3alpha::PostgresProxy::DISABLE};
Stats::Scope& scope_;
PostgresProxyStats stats_;

Expand Down Expand Up @@ -112,6 +120,9 @@ class PostgresFilter : public Network::Filter,
void incTransactionsRollback() override;
void processQuery(const std::string&) override;
bool onSSLRequest() override;
bool shouldEncryptUpstream() const override;
void sendUpstream(Buffer::Instance&) override;
void encryptUpstream(bool, Buffer::Instance&) override;

Network::FilterStatus doDecode(Buffer::Instance& data, bool);
DecoderPtr createDecoder(DecoderCallbacks* callbacks);
Expand Down
11 changes: 10 additions & 1 deletion contrib/postgres_proxy/filters/network/test/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ load(
"envoy_cc_test",
"envoy_cc_test_library",
"envoy_contrib_package",
"envoy_proto_library",
)

licenses(["notice"]) # Apache 2
Expand Down Expand Up @@ -54,22 +55,30 @@ envoy_cc_test(
],
)

envoy_proto_library(
name = "postgres_integration_proto",
srcs = [":postgres_integration_test.proto"],
)

envoy_cc_test(
name = "postgres_integration_test",
srcs = [
"postgres_integration_test.cc",
],
data = [
"postgres_test_config.yaml",
"postgres_test_config.yaml-template",
"//test/config/integration/certs",
],
deps = [
":postgres_integration_proto_cc_proto",
":postgres_test_utils_lib",
"//contrib/postgres_proxy/filters/network/source:config",
"//contrib/postgres_proxy/filters/network/source:filter",
"//source/common/tcp_proxy",
"//source/extensions/filters/network/tcp_proxy:config",
"//source/extensions/transport_sockets/starttls:config",
"//test/integration:integration_lib",
"//test/test_common:registry_lib",
"@envoy_api//contrib/envoy/extensions/filters/network/postgres_proxy/v3alpha:pkg_cc_proto",
],
)
Loading