Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/LocalproxyConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ namespace aws {
* The end point will store either source listening or destination service depends on the mode of local proxy.
*/
std::unordered_map<std::string, std::string> serviceId_to_endpoint_map;

/**
* A flag to judge if v2 local proxy needs to fallback to communicate using v1 local proxy message format.
* v1 local proxy format fallback will be enabled when a tunnel is opened with no or 1 service id.
Expand Down
82 changes: 66 additions & 16 deletions src/TcpAdapterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,12 @@ namespace aws { namespace iot { namespace securedtunneling {
{
if (tac.serviceId_to_tcp_server_map.find(service_id) == tac.serviceId_to_tcp_server_map.end())
{
BOOST_LOG_SEV(log, debug) << "No serviceId_to_tcp_server mapping for service_id: " << service_id;
return connection_ptr;
if (tac.serviceId_to_tcp_server_map.find(tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first) == tac.serviceId_to_tcp_server_map.end())
{
BOOST_LOG_SEV(log, debug) << "No serviceId_to_tcp_server mapping for service_id: " << service_id;
return connection_ptr;
}
service_id = tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first;;
}
tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id];
BOOST_LOG_SEV(log, trace) << "num active connections for service id " << service_id << ": " << server->connectionId_to_tcp_connection_map.size();
Expand Down Expand Up @@ -376,6 +380,7 @@ namespace aws { namespace iot { namespace securedtunneling {
BOOST_LOG_SEV(this->log, trace) << "Post-reset TCP drain complete. Closing TCP socket for service id " << service_id << " connection id " << connection_id;
BOOST_LOG_SEV(this->log, info) << "Disconnected from: " << connection_to_reset->socket().remote_endpoint();
connection_to_reset->socket_.close();
delete_tcp_socket(tac, service_id, connection_id);
*tcp_write_buffer_drain_complete = true;
if (*web_socket_write_buffer_drain_complete)
{
Expand Down Expand Up @@ -515,8 +520,9 @@ namespace aws { namespace iot { namespace securedtunneling {
tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id, connection_id);

// if simultaneous connections are not enabled, then send a stream reset
if (tac.adapter_config.is_v2_message_format)
if (tac.adapter_config.is_v2_message_format || tac.adapter_config.is_v1_message_format)
{
BOOST_LOG_SEV(log, info) << "simultaneous connections are not enabled, sending stream reset";
socket_connection->after_send_message = std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id);
tac.serviceId_to_control_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message_and_stop, this, std::ref(tac), std::placeholders::_1);
async_send_stream_reset(tac, service_id, connection_id);
Expand Down Expand Up @@ -600,8 +606,16 @@ namespace aws { namespace iot { namespace securedtunneling {

BOOST_LOG_SEV(log, debug) << "Sending stream start, setting new stream ID to: " << new_stream_id << ", service id: " << service_id;

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

outgoing_message.set_type(Message_Type_STREAM_START);
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(new_stream_id);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
Expand Down Expand Up @@ -638,8 +652,16 @@ namespace aws { namespace iot { namespace securedtunneling {
}
std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id];

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

outgoing_message.set_type(Message_Type_CONNECTION_START);
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(stream_id);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
Expand All @@ -657,12 +679,20 @@ namespace aws { namespace iot { namespace securedtunneling {
return;
}

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

// NOTE: serviceIds -> streamId mapping will be updated when send/receive stream start, no action needed now.
std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id];
outgoing_message.set_type(Message_Type_STREAM_RESET);
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(stream_id);
outgoing_message.set_connectionid(0);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
outgoing_message.clear_payload();
async_send_message(tac, outgoing_message, service_id, connection_id);
Expand All @@ -677,10 +707,19 @@ namespace aws { namespace iot { namespace securedtunneling {
BOOST_LOG_SEV(log, warning) << "No stream id mapping found for service id " << service_id << " . Skip connection reset.";
return;
}

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

// NOTE: serviceIds -> streamId mapping will be updated when send/receive stream start, no action needed now.
std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id];
outgoing_message.set_type(Message_Type_CONNECTION_RESET);
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(stream_id);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
Expand Down Expand Up @@ -1073,7 +1112,7 @@ namespace aws { namespace iot { namespace securedtunneling {
// backward compatibility: set connection id to 1 if first received a message with no connection id (id value will be 0)
if (!connection_id)
{
connection_id = 1;
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
tac.adapter_config.is_v2_message_format = true;
}
string service_id = message.serviceid();
Expand Down Expand Up @@ -1305,6 +1344,7 @@ namespace aws { namespace iot { namespace securedtunneling {
// Remove empty string map and put new mapping
tac.adapter_config.serviceId_to_endpoint_map.erase("");
tac.adapter_config.serviceId_to_endpoint_map[service_id] = endpoint;

BOOST_LOG_SEV(log, info) << "Updated port mapping for v1 format: ";
for (auto m : tac.adapter_config.serviceId_to_endpoint_map)
{
Expand All @@ -1328,7 +1368,7 @@ namespace aws { namespace iot { namespace securedtunneling {
// backward compatibility: set connection id to 1 if first received a message with no connection id (id value will be 0)
if (!connection_id)
{
connection_id = 1;
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
tac.adapter_config.is_v2_message_format = true;
}
string service_id = message.serviceid();
Expand Down Expand Up @@ -1431,7 +1471,7 @@ namespace aws { namespace iot { namespace securedtunneling {
// backward compatibility: set connection id to 1 if first received a message with no connection id (id value will be 0)
if (!connection_id)
{
connection_id = 1;
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
tac.adapter_config.is_v2_message_format = true;
}
/**
Expand Down Expand Up @@ -1562,7 +1602,7 @@ namespace aws { namespace iot { namespace securedtunneling {
// backward compatibility: set connection id to 1 if first received a message with no connection id (id value will be 0)
if (!connection_id)
{
connection_id = 1;
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
tac.adapter_config.is_v2_message_format = true;
}
tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id);
Expand Down Expand Up @@ -1762,8 +1802,17 @@ namespace aws { namespace iot { namespace securedtunneling {
throw proxy_exception((boost::format("No streamId exists for the service Id %1%") % service_id).str());
}
BOOST_LOG_SEV(log, debug) << "Prepare to send data message: service id: " << service_id << " stream id: " << tac.serviceId_to_streamId_map[service_id] << " connection id: " << connection_id;

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

// Construct outgoing message
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(tac.serviceId_to_streamId_map[service_id]);
outgoing_message.set_connectionid(connection_id);
size_t const send_size = std::min<std::size_t>(GET_SETTING(settings, MESSAGE_MAX_PAYLOAD_SIZE),
Expand Down Expand Up @@ -1988,9 +2037,10 @@ namespace aws { namespace iot { namespace securedtunneling {
uint32_t new_connection_id = ++server->highest_connection_id;

// backward compatibility: set connection id to 1 if simultaneous connections is not enabled
if (tac.adapter_config.is_v2_message_format)
if (tac.adapter_config.is_v2_message_format || tac.adapter_config.is_v1_message_format)
{
new_connection_id = 1;
BOOST_LOG_SEV(log, info) << "Falling back to older protocol, setting new connection id to 0";
new_connection_id = 0;
}
BOOST_LOG_SEV(log, info) << "creating tcp connection id " << new_connection_id;

Expand All @@ -2009,7 +2059,7 @@ namespace aws { namespace iot { namespace securedtunneling {
server->connectionId_to_tcp_connection_map[new_connection_id]->socket() = std::move(new_socket);
BOOST_LOG_SEV(log, info) << "Accepted tcp connection on port " << server->connectionId_to_tcp_connection_map[new_connection_id]->socket().local_endpoint().port() << " from " << server->connectionId_to_tcp_connection_map[new_connection_id]->socket().remote_endpoint();

if (is_first_connection)
if (is_first_connection || tac.adapter_config.is_v1_message_format || tac.adapter_config.is_v2_message_format)
{
async_send_stream_start(tac, service_id, new_connection_id);
}
Expand Down