Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
f2f08e7
ready for demo
RogerZhongAWS Jun 16, 2022
cf46451
fix log level to trace
RogerZhongAWS Aug 16, 2022
91b86ab
continue tcp socket read loop if connection does not exist
RogerZhongAWS Aug 16, 2022
06bac66
add cflag
RogerZhongAWS Aug 17, 2022
daa638e
add log
RogerZhongAWS Aug 17, 2022
96b36ff
fix bug with remote endpoint not connected
RogerZhongAWS Aug 17, 2022
1969751
add try catch block
RogerZhongAWS Aug 17, 2022
2d41367
remove extra line
RogerZhongAWS Aug 17, 2022
f8bf3d3
try sending stream start upon destination startup
RogerZhongAWS Aug 17, 2022
d7f5611
wrap conditional around server pointer dereference
RogerZhongAWS Aug 17, 2022
5d818b8
change connection id to nonzero
RogerZhongAWS Aug 17, 2022
a5d0b19
remove commented code
RogerZhongAWS Aug 17, 2022
d3f6ae1
update test protocol version and fix backwards compatability hang
RogerZhongAWS Aug 19, 2022
1fdeb30
partial fix for backwards compatiblity issue
RogerZhongAWS Aug 19, 2022
639e0e1
add missing line for backwards compatibility fix
RogerZhongAWS Aug 19, 2022
7461bbd
add log
RogerZhongAWS Aug 19, 2022
1d86854
add backwards compatiblity to stream reset handler
RogerZhongAWS Aug 19, 2022
728db3a
add v2 message flag
RogerZhongAWS Aug 19, 2022
cdc0ac7
keep connection_id at 1 if v2 flag is set
RogerZhongAWS Aug 19, 2022
e107ff5
fix tests
RogerZhongAWS Aug 24, 2022
ad1cfa1
clean up code and comments
RogerZhongAWS Aug 24, 2022
b488cc2
remove more comments, changed to uint32 for connection id member var …
RogerZhongAWS Aug 24, 2022
d4f10fa
update readme
RogerZhongAWS Sep 2, 2022
02448dd
touch up protocol readme
RogerZhongAWS Sep 2, 2022
5aec363
updated protocol guide to address comments
RogerZhongAWS Sep 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
327 changes: 327 additions & 0 deletions V3WebSocketProtocolGuide.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions resources/Message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ message Message {
bytes payload = 4;
string serviceId = 5;
repeated string availableServiceIds = 6;
uint32 connectionId = 7;

enum Type {
UNKNOWN = 0;
Expand All @@ -22,5 +23,7 @@ message Message {
STREAM_RESET = 3;
SESSION_RESET = 4;
SERVICE_IDS = 5;
CONNECTION_START = 6;
CONNECTION_RESET = 7;
}
}
6 changes: 5 additions & 1 deletion src/LocalproxyConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ namespace aws {
/**
* The web proxy endpoint port. This will be set only if a web proxy is necessary. defaults to 3128.
*/
std::uint16_t web_proxy_port {0 };
std::uint16_t web_proxy_port { 0 };
/**
* The web proxy authN. This will be set only if an web proxy is necessary and it requires authN.
*/
Expand Down Expand Up @@ -105,6 +105,10 @@ namespace aws {
* If this is set to true, it means that v2 local proxy won't validate service id field.
*/
bool is_v1_message_format {false};
/**
* A flag to judge if v3 local proxy needs to fallback to communicate using v2 local proxy message format.
*/
bool is_v2_message_format {false};
};
}
}
Expand Down
8 changes: 6 additions & 2 deletions src/ProxySettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
std::size_t const DEFAULT_MAX_DATA_FRAME_SIZE = DEFAULT_MESSAGE_MAX_SIZE + DEFAULT_DATA_LENGTH_SIZE;

char const * const KEY_TCP_CONNECTION_RETRY_COUNT = "tunneling.proxy.tcp.connection_retry_count";
std::int32_t const DEFAULT_TCP_CONNECTION_RETRY_COUNT = 5;
std::int32_t const DEFAULT_TCP_CONNECTION_RETRY_COUNT = -1;

char const * const KEY_TCP_CONNECTION_RETRY_DELAY_MS = "tunneling.proxy.tcp.connection_retry_delay_ms";
std::uint32_t const DEFAULT_TCP_CONNECTION_RETRY_DELAY_MS = 1000;
Expand All @@ -34,6 +34,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings

char const * const KEY_MESSAGE_MAX_SIZE = "tunneling.proxy.message.max_size";
std::size_t const DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024;

char const * const KEY_MAX_ACTIVE_CONNECTIONS = "tunneling.proxy.tcp.max_active_connections";
std::uint32_t const DEFAULT_MAX_ACTIVE_CONNECTIONS = 128;

char const * const KEY_WEB_SOCKET_PING_PERIOD_MS = "tunneling.proxy.websocket.ping_period_ms";
std::uint32_t const DEFAULT_WEB_SOCKET_PING_PERIOD_MS = 5000;
Expand All @@ -48,7 +51,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
bool const DEFAULT_WEB_SOCKET_DATA_ERROR_RETRY = true;

char const * const KEY_WEB_SOCKET_SUBPROTOCOL = "tunneling.proxy.websocket.subprotocol";
std::string const DEFAULT_WEB_SOCKET_SUBPROTOCOL = "aws.iot.securetunneling-2.0";
std::string const DEFAULT_WEB_SOCKET_SUBPROTOCOL = "aws.iot.securetunneling-3.0";

char const * const KEY_WEB_SOCKET_MAX_FRAME_SIZE = "tunneling.proxy.websocket.max_frame_size";
std::size_t const DEFAULT_WEB_SOCKET_MAX_FRAME_SIZE = DEFAULT_MAX_DATA_FRAME_SIZE * 2;
Expand Down Expand Up @@ -83,6 +86,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
ADD_SETTING_DEFAULT(settings, TCP_READ_BUFFER_SIZE);
ADD_SETTING_DEFAULT(settings, MESSAGE_MAX_PAYLOAD_SIZE);
ADD_SETTING_DEFAULT(settings, MESSAGE_MAX_SIZE);
ADD_SETTING_DEFAULT(settings, MAX_ACTIVE_CONNECTIONS);
ADD_SETTING_DEFAULT(settings, WEB_SOCKET_PING_PERIOD_MS);
ADD_SETTING_DEFAULT(settings, WEB_SOCKET_CONNECT_RETRY_DELAY_MS);
ADD_SETTING_DEFAULT(settings, WEB_SOCKET_CONNECT_RETRY_COUNT);
Expand Down
3 changes: 3 additions & 0 deletions src/ProxySettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings
extern char const * const KEY_MESSAGE_MAX_SIZE;
extern std::size_t const DEFAULT_MESSAGE_MAX_SIZE;

extern char const * const KEY_MAX_ACTIVE_CONNECTIONS;
extern std::uint32_t const DEFAULT_MAX_ACTIVE_CONNECTIONS;

extern char const * const KEY_WEB_SOCKET_PING_PERIOD_MS;
extern std::uint32_t const DEFAULT_WEB_SOCKET_PING_PERIOD_MS;

Expand Down
826 changes: 563 additions & 263 deletions src/TcpAdapterProxy.cpp

Large diffs are not rendered by default.

48 changes: 29 additions & 19 deletions src/TcpAdapterProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,14 @@ namespace aws { namespace iot { namespace securedtunneling {
wss{ nullptr },
wss_resolver{ io_ctx },
wss_response{ },
num_active_connections{ 0 },
stream_id{ -1 },
service_id{ "" },
serviceId_to_streamId_map{},
serviceId_to_tcp_server_map{},
serviceId_to_tcp_client_map{},
serviceId_to_control_message_handler_map{},
serviceId_to_data_message_handler_map{},
bind_address_actual{ },
is_web_socket_reading{ false },
is_service_ids_received{ false },
Expand All @@ -105,6 +108,8 @@ namespace aws { namespace iot { namespace securedtunneling {
//debuggability.
boost::beast::websocket::response_type wss_response;

std::atomic_uint16_t num_active_connections;

//represents the current stream ID to expect data from
//care should be taken how(if) this is updated directly
// To be deleted
Expand All @@ -113,6 +118,8 @@ namespace aws { namespace iot { namespace securedtunneling {
std::unordered_map<std::string, std::int32_t> serviceId_to_streamId_map;
std::unordered_map<std::string, tcp_server::pointer> serviceId_to_tcp_server_map;
std::unordered_map<std::string, tcp_client::pointer> serviceId_to_tcp_client_map;
std::unordered_map<std::string, std::function<bool(message const &)>> serviceId_to_control_message_handler_map;
std::unordered_map<std::string, std::function<bool(message const &)>> serviceId_to_data_message_handler_map;
std::string bind_address_actual;
//flag set to true while web socket data is being drained
//necessary for better TCP socket recovery rather than destroying
Expand Down Expand Up @@ -152,15 +159,15 @@ namespace aws { namespace iot { namespace securedtunneling {

int run_proxy();
private:
void update_message_handlers(tcp_adapter_context &tac, std::function<bool(message const &)> handler);
void setup_tcp_socket(tcp_adapter_context &tac, std::string const & service_id);
void setup_tcp_sockets(tcp_adapter_context &tac);
//setup async io flow to connect tcp socket to the adapter config's data host/port
void async_setup_dest_tcp_socket(tcp_adapter_context &tac, std::string const & service_id);
void async_setup_dest_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id);
void async_setup_dest_tcp_socket(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id, bool is_first_connection);
void async_setup_dest_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id, uint32_t const & connection_id, bool is_first_connection);
void async_setup_source_tcp_sockets(tcp_adapter_context &tac);
void async_setup_source_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string service_id);
void initialize_tcp_clients(tcp_adapter_context &tac);
void initialize_tcp_servers(tcp_adapter_context &tac);
void do_accept_tcp_connection(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string service_id, std::uint16_t local_port, bool is_first_connection);
void setup_web_socket(tcp_adapter_context &tac);
//setup async web socket, and as soon as connection is up, setup async ping schedule
void async_setup_web_socket(tcp_adapter_context &tac);
Expand All @@ -169,10 +176,13 @@ namespace aws { namespace iot { namespace securedtunneling {
//then the reset is intentionally reset via web socket, and retries
//occur definitely (regardless of retry configuration)
void tcp_socket_reset_all(tcp_adapter_context &tac, std::function<void()> post_reset_operation);
void tcp_socket_reset(tcp_adapter_context &tac, std::string service_id, std::function<void()> post_reset_operation);
tcp_connection::pointer get_tcp_connection(tcp_adapter_context &tac, std::string service_id);
void tcp_socket_reset_init(tcp_adapter_context &tac, std::string service_id, std::function<void()> post_reset_operation);
void tcp_socket_reset(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id, std::function<void()> post_reset_operation);
void tcp_socket_close(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id);
tcp_connection::pointer get_tcp_connection(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id);

void tcp_socket_error(tcp_adapter_context &tac, boost::system::error_code const &_ec, std::string const & service_id);
void delete_tcp_socket(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
void tcp_socket_error(tcp_adapter_context &tac, boost::system::error_code const &_ec, std::string const & service_id, uint32_t const & connection_id);

//sets up a web socket read loop that will read, and ignore most messages until a stream start
//is read and then do something with it (likely, connect to configured endpoint)
Expand All @@ -197,22 +207,21 @@ namespace aws { namespace iot { namespace securedtunneling {
//invokes after_setup_web_socket_read_until_stream_start() after stream start is encountered
bool async_wait_for_stream_start(tcp_adapter_context &tac, message const &message);
bool async_wait_for_service_ids(tcp_adapter_context &tac);
void async_tcp_socket_read_loop(tcp_adapter_context &tac, std::string const & service_id);
void async_tcp_socket_read_loop(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);

//below loop does continuous writes to TCP socket from the TCP adapter
//context's tcp_write_buffer. After consuming chunks out of the buffer
//the behavior will be to check
void async_tcp_write_buffer_drain(tcp_adapter_context &tac, std::string service_id);
//the behavior will be to check
void async_tcp_write_buffer_drain(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id);

void async_setup_bidirectional_data_transfers(tcp_adapter_context &tac, std::string const & service_id);
void async_setup_web_socket_write_buffer_drain(tcp_adapter_context &tac, std::string const & service_id);
void async_setup_bidirectional_data_transfers(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
void async_setup_web_socket_write_buffer_drain(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);

//returns a boolean that indicates if another web socket data read message can be put
//onto the tcp write buffer. We have no way of knowing what the next message is and if
//it will be too big to process, thus we don't do the read applying back pressure on
//the socket. Implicitly, this means that an async_read is not happening on the web socket
bool tcp_has_enough_write_buffer_space(tcp_connection::pointer connection);
bool tcp_has_enough_write_buffer_space(tcp_adapter_context const &tac);

//returns a boolean that indicates if another tcp socket read's data can be put on the
//web socket write buffer. It's a bit different from tcp write buffer space requirements
Expand All @@ -226,8 +235,11 @@ namespace aws { namespace iot { namespace securedtunneling {
bool is_valid_stream_id(tcp_adapter_context const& tac, message const &message);

void async_send_message(tcp_adapter_context &tac, message const &message);
void async_send_stream_start(tcp_adapter_context &tac, std::string const & service_id);
void async_send_stream_reset(tcp_adapter_context &tac, std::string const & service_id);
void async_send_message(tcp_adapter_context &tac, message const &message, std::string const & service_id, uint32_t const & connection_id);
void async_send_stream_start(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
void async_send_stream_reset(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
void async_send_connection_start(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);
void async_send_connection_reset(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id);

//handler for successfully sent ping will delay the next one
void async_ping_handler_loop(tcp_adapter_context &tac,
Expand All @@ -239,16 +251,14 @@ namespace aws { namespace iot { namespace securedtunneling {
void clear_ws_buffers(tcp_adapter_context &tac);
void clear_tcp_connection_buffers(tcp_connection::pointer connection);

void tcp_socket_ensure_closed(boost::asio::ip::tcp::socket & tcp_socket);

//closes the websocket connection
//1 - shutdown the receive side of TCP
//2 - drain the web socket write buffer
//3 - send a web socket close frame
//4 - perform teardown procedure on websocket
void web_socket_close_and_stop(tcp_adapter_context &tac);

void async_resolve_destination_for_connect(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id, boost::system::error_code const &ec, tcp::resolver::results_type results);
void async_resolve_destination_for_connect(tcp_adapter_context &tac, std::shared_ptr<basic_retry_config> retry_config, std::string const & service_id, uint32_t const & connection_id, boost::system::error_code const &ec, tcp::resolver::results_type results);

bool process_incoming_websocket_buffer(tcp_adapter_context &tac, boost::beast::multi_buffer &message_buffer);

Expand All @@ -264,7 +274,7 @@ namespace aws { namespace iot { namespace securedtunneling {

bool fall_back_to_v1_message_format(std::unordered_map<std::string, std::string> const & serviceId_to_endpoint_map);

void async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr<boost::beast::flat_buffer> const& ss, std::string const & service_id);
void async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr<boost::beast::flat_buffer> const& ss, std::string const & service_id, uint32_t const & connection_id);

void async_setup_destination_tcp_sockets(tcp_adapter_context &tac);

Expand Down
8 changes: 5 additions & 3 deletions src/TcpClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,20 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
tcp_client(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size)
: resolver_(io_context)
{
connection_ =
tcp_connection::create(io_context, write_buf_size, read_buf_size, ws_write_buf_size);

}
static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t const & ws_write_buf_size)
{
return pointer(new tcp_client(io_context, write_buf_size, read_buf_size, ws_write_buf_size));
}

tcp_connection::pointer connection_;
tcp::resolver resolver_;

std::unordered_map<uint32_t, tcp_connection::pointer> connectionId_to_tcp_connection_map;

// function object defines what to do after set up a tcp socket
std::function<void()> after_setup_tcp_socket = nullptr;

// function object defines what to do receiving control message: stream start
std::function<void()> on_receive_stream_start = nullptr;
};
Expand Down
11 changes: 8 additions & 3 deletions src/TcpConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,23 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
public:
typedef boost::shared_ptr<tcp_connection> pointer;

static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t ws_write_buf_size)
static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t ws_write_buf_size, uint32_t connection_id)
{
return pointer(new tcp_connection(io_context, write_buf_size, read_buf_size, ws_write_buf_size));
return pointer(new tcp_connection(io_context, write_buf_size, read_buf_size, ws_write_buf_size, connection_id));
}

tcp::socket& socket()
{
return socket_;
}

tcp_connection(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size)
tcp_connection(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size, uint32_t connection_id)
: socket_(io_context)
, tcp_write_buffer_(write_buf_size)
, tcp_read_buffer_(read_buf_size)
, web_socket_data_write_buffer_(ws_write_buf_size)
, connection_id_(connection_id)

{
}

Expand All @@ -51,6 +53,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
//condense smaller TCP read chunks to bigger web socket writes. It also makes
//it impossible to "inject" a non-data message in data sequence order
boost::beast::multi_buffer web_socket_data_write_buffer_;

uint32_t connection_id_; // assigned connection_id for tcp connection

// Is this tcp socket currently writing
bool is_tcp_socket_writing_{ false };
// Is this tcp socket currently reading
Expand Down
10 changes: 7 additions & 3 deletions src/TcpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <unordered_map>
#include "TcpConnection.h"

namespace aws { namespace iot { namespace securedtunneling { namespace connection {
Expand All @@ -16,8 +17,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio
: acceptor_(io_context)
, resolver_(io_context)
{
connection_ =
tcp_connection::create(io_context, write_buf_size, read_buf_size, ws_write_buf_size);
highest_connection_id = 0;
}

static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t const & ws_write_buf_size)
Expand All @@ -32,7 +32,11 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio

tcp::acceptor acceptor_;
tcp::resolver resolver_;
tcp_connection::pointer connection_;

std::unordered_map<uint32_t, tcp_connection::pointer> connectionId_to_tcp_connection_map;

std::atomic_uint32_t highest_connection_id;

// function object defines what to do after set up a tcp socket
std::function<void()> after_setup_tcp_socket = nullptr;
};
Expand Down
Loading