diff --git a/V3WebSocketProtocolGuide.md b/V3WebSocketProtocolGuide.md new file mode 100644 index 00000000..2046fea3 --- /dev/null +++ b/V3WebSocketProtocolGuide.md @@ -0,0 +1,327 @@ +The reference implementation of the local proxy provides features that may require OS facilities not available on all device runtime environments in the industry. This guide provides details about the communication that occurs between the service and client to enable integration without or beyond the local proxy reference implementation choices. This protocol guide is only applicable for v2 local proxy. + +## Core implementation requirements + +In order to properly connect with and interpret messages from the AWS IoT Secure Tunneling service, the bare minimum is required: + +**Communications Protocols:** +* Websocket protocol ([RFC6455](https://tools.ietf.org/html/rfc6455)) over TCP/IP +* TLS 1.1+ + +**Data processing** +* ProtocolBuffers library + * Message size requirements are dependent on tunnel peer message sizes + +## Protocol Design + +The AWS IoT Secure Tunneling's usage of WebSocket is in part a subprotocol as defined by [RFC6455](https://tools.ietf.org/html/rfc6455), and there are additional restrictions when communicating with the service called out in this document. The data messages on top of WebSocket use [ProtocolBuffers](https://developers.google.com/protocol-buffers/) with a 2-byte length prefix. The messages themselves carry data and communicate tunnel connectivity information to enable tunnel clients to leverage full duplex communication. The protocol is designed to adapt TCP socket operations over a tunnel, but it is not limited to being used only for TCP based client or server applications. It is possible to implement the protocol directly and provide a network library or API to use directly in a napplication rather than a standalone process. This guide is intended to assist in those interested in directly interfacing with the WebSocket layer of AWS IoT Secure Tunneling. This document is not a programming guide so it is expected that you are familiar with the following: + +- AWS IoT Secure Tunneling service and its major concepts. Particularly the local proxy +- HTTP and WebSocket and how to use it in your preferred language and API (connect, send, and receive data) +- ProtocolBuffers and how to use it in your preferred language (generate code, parse messages, create messages) +- Conceptual familiarity with TCP sockets, and ideally API familiarity in your preferred language + +## Connecting to the proxy server and tunnel: WebSocket handshake + +The handshake performed to connect to a AWS IoT Secure Tunneling server is a standard WebSocket protocol handshake with additional requirements on the HTTP request. These requirements ensure proper access to a tunnel given a client access token: + +- The tunneling service only accepts connections secured with TLS 1.1 or higher +- The HTTP path of the upgrade request must be `/tunnel`. Requests made to any other path will result in a 400 HTTP response +- There must be a URL parameter `local-proxy-mode` specifying the tunnel connection (local proxy) mode. The value of this parameter must be `source` or `destination` +- There must be an access token specified in the request either via cookie, or an HTTP request header + - Set the access token via HTTP request header named 'access-token' or via cookie named 'awsiot-tunnel-token' + - Only one token value may be present in the request. Supplying multiple values for either the access-token header or the cookie, or both combined will cause the handshake to fail. + - Local proxy mode must match the mode of the access token or the handshake will fail. +- The HTTP request size must not exceed 4k bytes in length. Requests larger than this will be rejected +- The 'Sec-WebSocket-Protocol' header must contain at least one valid protocol string based on what is supported by the service + - Valid value: 'aws.iot.securetunneling-3.0' +- The AWS IoT Secure Tunneling server accepts a `client-token` header for specifying the client token. + - The client token is an added security layer to protect the tunnel by ensuring that only the agent that generated the client token can use a particular access token to connect to a tunnel. + - Only one client token value may be present in the request. Supplying multiple values will cause the handshake to fail. + - The client token is optional. + - The client token must be unique across all the open tunnels per AWS account + - It's recommended to use a UUIDv4 to generate the client token. + - The client token can be any string that matches the regex `^[a-zA-Z0-9-]{32,128}$` + - If a client token is provided, then local proxy needs to pass the same client token for subsequent retries + - If a client token is not provided, then the access token will become invalid after a successful handshake, and localproxy won't be able to reconnect using the same access token. + +An example URI of where to connect is as follows: + +`wss://data.tunneling.iot.us-east-1.amazonaws.com:443` + +The regional endpoint selected must match the region where the OpenTunnel call was made to acquire the client access tokens. + +An example WebSocket handshake request coming from a local proxy: + +``` +GET /tunnel?local-proxy-mode=source HTTP/1.1 +Host: data.tunneling.iot.us-east-1.amazonaws.com +Upgrade: websocket +Connection: upgrade +Sec-WebSocket-Key: 9/h0zvwMEXrg06G+RjnmcA== +Sec-WebSocket-Version: 13 +Sec-WebSocket-Protocol: aws.iot.securetunneling-3.0 +access-token: AQGAAXiVzSmRL1VaJ22G7eRb\_CrPABsAAgABQQAMOTAwNTgyMDkxNTM4AAFUAANDQVQAAQAHYXdzLWttcwBLYXJuOmF3czprbXM6dXMtZWFzdC0xOjcwMTU0NTg5ODcwNzprZXkvMmU4ZTAxMDEtYzE3YS00NjU1LTlhYWQtNjA2N2I2NGVhZWQyALgBAgEAeAJ2EsT4f5oCWm65Y8zRx\_nNaCjcG4FIeNV\_zMyhoOslAVAr521wChjzvogy-2-mxyoAAAB-MHwGCSqGSIb3DQEHBqBvMG0CAQAwaAYJKoZIhvcNAQcBMB4GCWCGSAFlAwQBLjARBAwfBUUjMYI9gDEp0xwCARCAO1VX0NAiSjfU-Ar9PWYaNI5j9v77CxLcucht3tWZd57-Zq3aRQZBM4SQiy-D0Cgv31IfZ8pgWu8asm5FAgAAAAAMAAAQAAAAAAAAAAAAAAAAACniTwIAksExcMygMJ2uHs3\_\_\_\_\_AAAAAQAAAAAAAAAAAAAAAQAAAC9e5K3Isg5gHqO9LYX0geH4hrfthPEUhdrl9ZLksPxcVrk6XC4VugzrmUvEUPuR00J3etgVQZH\_RfxWrVt7Jmg= +User-Agent: localproxy Mac OS 64-bit/boost-1.68.0/openssl-3.0.0/protobuf-3.17.3 +``` + +An example of a handshake request coming from a browser's WebSocket client may specify the following: + +``` +GET /tunnel?local-proxy-mode=source HTTP/1.1 +Host: data.tunneling.iot.us-east-1.amazonaws.com +Upgrade: websocket +Connection: upgrade +Sec-WebSocket-Key: 9/h0zvwMEXrg06G+RjnmcA== +Sec-WebSocket-Version: 13 +Sec-WebSocket-Protocol: aws.iot.securetunneling-3.0 +Cookie: awsiot-tunnel-token=AQGAAXiVzSmRL1VaJ22G7eRb\_CrPABsAAgABQQAMOTAwNTgyMDkxNTM4AAFUAANDQVQAAQAHYXdzLWttcwBLYXJuOmF3czprbXM6dXMtZWFzdC0xOjcwMTU0NTg5ODcwNzprZXkvMmU4ZTAxMDEtYzE3YS00NjU1LTlhYWQtNjA2N2I2NGVhZWQyALgBAgEAeAJ2EsT4f5oCWm65Y8zRx\_nNaCjcG4FIeNV\_zMyhoOslAVAr521wChjzvogy-2-mxyoAAAB-MHwGCSqGSIb3DQEHBqBvMG0CAQAwaAYJKoZIhvcNAQcBMB4GCWCGSAFlAwQBLjARBAwfBUUjMYI9gDEp0xwCARCAO1VX0NAiSjfU-Ar9PWYaNI5j9v77CxLcucht3tWZd57-Zq3aRQZBM4SQiy-D0Cgv31IfZ8pgWu8asm5FAgAAAAAMAAAQAAAAAAAAAAAAAAAAACniTwIAksExcMygMJ2uHs3\_\_\_\_\_AAAAAQAAAAAAAAAAAAAAAQAAAC9e5K3Isg5gHqO9LYX0geH4hrfthPEUhdrl9ZLksPxcVrk6XC4VugzrmUvEUPuR00J3etgVQZH\_RfxWrVt7Jmg= +User-Agent: Mozilla/5.0 (X11; Linux x86_64; rv:10.0) Gecko/20100101 Firefox/10.0 +``` + +On success, an example of a successful handshake response is: + +``` +HTTP/1.1 101 Switching Protocols +Date: Thu, 16 May 2019 20:56:03 GMT +Content-Length: 0 +Connection: upgrade +channel-id: 0ea2b3fffe6adc0e-0000125a-00005adb-c2f218c35b921565-17c807e1 +upgrade: websocket +sec-websocket-accept: akN+XFrGEeDLcMVNKV9HkQCOLaE= +sec-websocket-protocol: aws.iot.securetunneling-3.0 +``` + +The aspects of the response to consider above a standard successful WebSocket handshake response are: + +- The `channel-id` response header is a unique identifier for the WebSocket session with the service. It may be useful when troubleshooting any suspected issues through AWS Support +- The 'sec-websocket-protocol' response header will contain one of the values specified in the request. That the proxy Clients must understand and properly implement the subprotocol returned in this response header to ensure valid communication over the tunnel. + +After a successful WebSocket handshake with the tunneling service, full duplex communication is possible over WebSocket. Tunnel communication messages are delivered reliably and in order. + +### Handshake error responses + +* If the handshake HTTP response code is within the 500-599 range, the client should retry using an exponential backoff retry strategy. +* If the handshake HTTP response code is within the 400-499 range, the service is rejecting the clients request, or access to the tunnel is not possible or denied. Do not retry unless the problem is understood and the request changes (i.e. use another region endpoint or different client access token) +* Many handshake error responses will contain the `channel-id` header which may be helpful for AWS Support troubleshooting + +## WebSocket Subprotocol: aws.iot.securetunneling-3.0 + +While connected to the service with this protocol selected, the following restrictions apply or capabilities must be supported by clients. Violations may result in the server closing the connection abnormally, or your WebSocket client interface behaving improperly and crashing: + +- WebSocket frames will not have a payload exceeding 131076 bytes from the service +- The server will not accept WebSocket frames with a payload over 131076 bytes +- WebSocket frames of up to 131076 bytes may be sent to clients + - The peer tunnel clients do not dictate WebSocket frame sizes. The service may aggregate data and construct frames of different sizes than sent from the tunnel peer +- The service will respond to WebSocket ping frames with a pong reply containing a copy of the ping frame payload + - The local proxy reference implementation uses this to measure server response latency + - Clients may need to send ping frames to keep the connection alive + - It is not an error for the proxy server to not respond to a ping frame +- Pong frames sent to the service will not illicit a response +- Ping/pong frames received by the service are included in bandwidth consumption for traffic rate limiting +- The server will not normally initiate ping requests to clients, but clients should send a pong reply +- The proxy server will not send text WebSocket frames. This protocol operates entirely with binary messages. If any text frames are received, clients SHOULD close the WebSocket connection +- All non-control WebSocket frames sent to the service must be binary + +### Protocol behavior model: Tunneling data streams + +The core activity during tunneling is sending ProtocolBuffers messages back and forth carrying either data, or messages that manage the connection state (called _control messages_) over the WebSocket connection to the service. This WebSocket connection to the service is synonymous with being connected to the tunnel. The process to support an application data transfer successfully over the tunnel can be divided into three steps. + +#### Step 1: Establish tunnel connection and perform validations +Local proxy will initiate a web socket handshake to connect to the tunnel, using Sec-WebSocket-Protocol _aws.iot.securetunneling-3.0_. The Secure Tunneling service will acknowledge this request after authentication and validation. At this point, we can say the tunnel connection is established. After this, the Secure Tunneling service will send back control message _SERVICE_IDS_, containing a list of service IDs used in OpenTunnel API call, specified by **services** in [DestinationConfig](https://docs.aws.amazon.com/iot/latest/apireference/API_iot-secure-tunneling_DestinationConfig.html). These service IDs will be used as the source of truth of what service IDs are allowed to start local proxy. Upon receving these service IDs, local proxy will validate the service IDs provided through either configuration files or command line arguements. A validation failure on service IDs will cause local proxy fails to start. Below are two possible cases: +1. Service IDs received from the AWS IoT Secure Tunneling server does not match service IDs used to start local proxy. + For example, in OpenTunnel API call, service IDs SSH1, SSH2 are provided. When local proxy starts, it specifies the service IDs as SSH3, through _-s_ or _-d_ parameter. In this case, since SSH3 does not match SSH1 and SSH2, local proxy will fail to start. Even though there is no enforcement on the naming convetion of service ID, the value of service IDs and number of service IDs have to match between OpenTunnel call and local proxy. +2. Local proxy cannot find the port mapping for all the service IDs. + This is more likely to happen in the destination local proxy with invalid configuration files. For example, in OpenTunnel API call, service ID SSH1, SSH2 are provided. However, no port mapping is configured for service ID SSH1, neither through configuration files nor _-d_ parameter. Failing to find a port mapping for SSH1 will cause local proxy fails to start, as local proxy does not know where to route the traffic to. For source local proxy, this is unlikely to happen. When source local proxy fails to find port mapping for certain service ID, it will automatically pick up available port to use when it starts. + +#### Step 2: Start a stream +Once started successfully, source local proxy will listen for incoming connections on the configured ports. Destination local proxy, on the other hand, will wait for control message _StreamStart_. When client application connecting to a configured listening port, source local proxy will accept the TCP connection and sends a _StreamStart_ message with `connection_id = 1` to destination local proxy, for this specific service ID. When preparing to send _StreamStart_ message, source local proxy will also store service ID -> stream ID mapping for book keeping. +If multiple ports are used to start local proxy, each stream will send its own _StreamStart_ message when the TCP connection on the configured port is accepted. A _StreamStart_ message contains _streamID_, _serviceID_, and _connectionID_. _serviceID_ helps uniquely identify a service transferred over a tunnel . _streamID_ helps to reset a stream and identify stale data. _connectionID_ is uniquely mapped to each `boost::tcp_connection` object within a _serviceID_. + +#### Step 3: End to end data transfer over the tunnel + +On receiving a StreamStart, the destination local proxy will update the service ID --> Stream ID mapping, add a new connection ID --> tcp_connection mapping, and connect to the configured destination service for a service ID. The destination local proxy does not send a reply to the source local proxy on successful connection. Immediately after the source local proxy sends StreamStart and immediately after the destination establishes a valid TCP connection, each side respectively can begin to send and receive incoming messages on the active data stream. When the data stream is closed or disrupted (for the local proxy, this is a TCP close or I/O error on the TCP socket), a ConnectionReset control message with the currently stored stream ID, service ID, and connection ID should be sent through the tunnel so the tunnel peer can react appropriately and end the data stream. Control messages associated with a stream should be processed with the same stream ID filter. +Protocol V3 now supports the dynamic creation of more than one TCP connection at a time. The first TCP SYN packet sent by the client will initiate creation of the stream with a StreamStart message. Any subsequent TCP SYN packets sent while the stream is alive will build a new TCP connection a forward the signal to the destination with a ConnectionStart message. + +Here are some important things to know for a high-level understanding of tunneling data stream handling: + +- The service may use the Service ID to decide how to route traffic between connected tunnel clients. + - For example, when local proxy received a data packet with Service ID SSH1, it will look up the configuration for SSH1 and see which port this service ID is mapped to. If SSH1 is mapped to port 22 on local host, then this data packet will be forward to port 22 on local host. +- The local proxy uses the service ID -> stream ID mapping to check the current active stream ID for a specific service ID. +- The stream ID validation for a certain stream(service ID) will only be performed on message type _StreamReset_ and _Data_. If a received message failed the stream ID validation, this message is considered to be stale and will be discarded by local proxy. +- The local proxy, and library clients may use stream ID to determine how to respond to or filter incoming messages + - For example: if a source sends a _StreamStart_ with a stream ID of 345 in response to a newly accepted TCP connection, and afterwards receives a _Data_ message marked with stream ID of 565, that data should be ignored. It's origin is tied to a prior connection over the tunnel from the perspective of the tunnel peer that originated it + - Another example: if a source local proxy sends a _StreamStart_ with a stream ID of 345 in response to a newly accepted TCP connection, and afterwards receives a _StreamReset_ message marked with stream ID of 565, that message should be ignored. Only a _StreamReset_ with a stream ID of 345 should cause the client to close its local connection +- The local proxy, and library clients may use connection ID to determine how to respond to or filter incoming messages in a similar manner to that of stream id. +- Ending a TCP Connection (normally or abnormally) is accomplished by either side sending a _ConnectionReset_ with the stream ID and connection ID that is meant to be closed. +- Locally detected network failures are communicated by sending _StreamReset_ over the tunnel using the active stream ID if one is active. +- If there is a network issue with the WebSocket connection, no control message is necessary to send. However, the active stream should be considered invalid and closed. The localproxy will then reconnect to the tunnel via the service and start a new stream. +- StreamReset will immediately close all connections associated with the service. + + +### Tunneling message frames + +WebSocket binary frames contain a sequence of tunnel frames or messages. Each data message has a **2-byte unsigned short, big endian** data length prefix, followed by sequence of bytes whose length is specified by the data length. These bytes must be parsed into a ProtocolBuffers object that uses the schema shown in this document. Every message received must be processed, and should be processed in order for data stream integrity. If the order of messages is lost or cannot be understood during processing by the client, it should end the data stream with a _StreamReset_. Messages may control the state of the data stream, or it may contain actual stream data. Inspecting the message's type is the first step in processing a message. A single data length + bytes parsed into a ProtocolBuffers message represents an entire tunneling message frame, and the beginning of the next frame's length prefix follows immediately. This is a visual diagram of a single frame: + + |-----------------------------------------------------------------| + | 2-byte data length | N byte ProtocolBuffers message | + |-----------------------------------------------------------------| + +Tunneling message frames are very loosely coupled with WebSocket frames. It is not required that a WebSocket frame contain an entire tunneling message frame. The start and end of a WebSocket frame does not have to be aligned with a tunneling frame and vice versa. A WebSocket frame may contain multiple tunneling frames, or it may contain only a slice of a tunneling frame started in a previous WebSocket frame and will finish in a later WebSocket frame. This means that processing the WebSocket data must be done as pure a sequence of bytes that sequentially construct tunneling frames regardless of what the WebSocket fragmentation is. + +Additionally, the WebSocket framing decided by one tunnel peer is not guaranteed to be the same as those received by the other side. For example, the maximum WebSocket frame size in the `aws.iot.securetunneling-3.0` protocol is 131076 bytes, and the service may aggregate data to a point that aggregates multiple messages to this size into a single frame. The tunneling message frames generated by a tunnel peer are maintained by the service and cannot be aggregated or fragmented. This enables known tunnel peers to operate under more restrictive guidelines than what is valid in this protocol guide. One example of this is reducing the maximum payload of a tunneling message to 16kb down from 64kb to enable local proxy implementations to reduce the size of processing buffers. + +### ProtocolBuffers Message Schema + +The data that must be parsed into a ProtocolBuffers message conforms to the following schema: + +``` +syntax = "proto3"; + +package com.amazonaws.iot.securedtunneling; + +option java_outer_classname = "Protobuf"; +option optimize_for = LITE_RUNTIME; + +message Message { + Type type = 1; + int32 streamId = 2; + bool ignorable = 3; + bytes payload = 4; + string serviceId = 5; + repeated string availableServiceIds = 6; + uint32 connectionId = 7; + + enum Type { + UNKNOWN = 0; + DATA = 1; + STREAM_START = 2; + STREAM_RESET = 3; + SESSION_RESET = 4; + SERVICE_IDS = 5; + CONNECTION_START = 6; + CONNECTION_RESET = 7; + } +} +``` + +Tunneling frames (without the data length prefix) must parse into a _Message_ object and satisfy the following rules: + +- _Type_ field must be set to a non-zero enum value. Due to ProtocolBuffers schema recommendation, the keyword 'required' is not used in the actual schema +- It is invalid for a client connected with mode=destination to send a message with _Type_ = _StreamStart_ over the tunnel. +- It is invalid for any client to send message types associated with a stream (_StreamStart_, _ConnectionStart_, _Data_, _StreamReset_, _ConnectionReset_) with a stream ID of 0 +- Sending a message type (_StreamStart_, _ConnectionStart_, _Data_, _ConnectionReset_) without a connection ID or with a connection ID of 0 will always make destination v3 localproxy reinterpret it as connection ID set to 1. This is intended behavior. +- It is invalid for any client to send _SessionReset_. +- They payload of any message may not contain more than 63kb (64512 bytes) of data. +- It is invalid to extend the schema with additional fields and send them through the tunnel. The service will close the WebSocket connection if this occurs. +- Avoid negative stream ID numbers for message size efficiency. Stream ID of 0 is invalid. Connection ID of 0 will be ignored. +- It is invalid for any local proxy to send message types _SERVICE_IDS_. It can only be sent from the Secure Tunneling service. +- Change the tag numbers of existing field of ProtocolBuffers will cause backward compatibility issue between V1 and V2 local proxy. Fore more information, please read [Extending a Protocol Buffer](https://developers.google.com/protocol-buffers/docs/cpptutorial#extending-a-protocol-buffer). + +### Backward compatibility + +#### Backward compatibility between V2 and V3 local proxy +V2 local proxy protocol uses Sec-WebSocket-Protocol _aws.iot.securetunneling-2.0_ when communicates with AWS IoT Tunneling Service. +V3 local proxy protocol uses Sec-WebSocket-Protocol _aws.iot.securetunneling-3.0_ when communicates with AWS IoT Tunneling Service. +The communication between V2 and V3 local proxy is supported for a multiplexed tunnel with a single TCP connection per stream. +- _aws.iot.securetunneling-2.0_ and _aws.iot.securetunneling-3.0_ subprotocol are interoperable. +- An empty connection ID field or connection ID set to 0 in a message should be interpreted as the connection ID field is not present. This is because in protocol buffers _proto3_, it can not tell if a field is set with 0 or a field is not present at all.\ +- If existing local proxy receives a StreamStart message from proposed local proxy, it will ignore the connection ID field. +- The local proxy should not use a connection ID when sending to v2 protocol. Using the local proxy with simultaneous TCP services to communicate with v2 local proxy is not supported. +- If a v3 local proxy sends a StreamStart to a v2 local proxy, the first TCP connection will be established and the tunnel functions as existing until the v3 local proxy sends ConnectionStart or ConnectionReset. In this case, destination local proxy does not recognize the new message type and sends StreamReset. +- An empty connection ID field in a StreamStart message should be interpreted as a message sent from a v2 local proxy. In that case, v3 local proxy should ignore the connection ID field. All the subsequent messages should not contain connection ID (will be ignored by the v3 local proxy if they do). ConnectionStart or ConnectionReset should be treated as error. And vice versa, if the local proxy sends the first StreamStart with connection ID, then all subsequent messages should contain the connection ID field. If the subsequent messages does not contain connection ID, local proxy should see the peer as non-compliant and close the stream. + +### Message type reference + +#### StreamStart + +* _StreamStart_ is the first message sent to start and establish the new and active data stream. For local proxies, this message carries across similar meaning to a TCP SYN packet. +* When to send + * When the source tunnel client wants to initiate a new data stream with the destination, it does this by sending a _StreamStart_ with a temporally unique stream ID and service ID. Stream ID should be chosen in a way that is unlikely to repeat through a tunnel's lifetime. Service ID is determined by which port accepts the TCP connection. For example, if you configure SSH1 to listen on port 5555, SSH2 to listen on port 6666, when the connection from port 5555 is accepted, service ID will be choosen to be SSH1. +* Behavior on receive: + * Destination local proxy should treat this as a request to initiate a new stream to a configured destination service and establish the given stream ID as the current. + * If the destination mode tunnel client already has an already open/active stream and receives a _StreamStart_, it should consider the current data stream to have closed and immediately start a new active stream with the new stream ID. A _StreamReset_ MAY be sent for the replaced stream ID. + * Source mode tunnel clients SHOULD treat receiving _StreamStart_ as an error and close the active data stream and WebSocket connection. +* Notes + * After the source client sends _StreamStart_, it may immediately send request data and assume the destination will connect. Failure will result in a _StreamReset_ coming back, and success (with data response) results in receiving data on the stream ID +* Example: Message(type=STREAM_START, streamId=1, connectionId=1, payload=, serviceId=ssh1, availableServiceIds=, ignorable=) + +#### StreamReset + +* _StreamReset_ messages conveys that the data stream has ended, either in error, or closed intentionally for the tunnel peer. It is also sent to the source tunnel peer if an attempt to establish a new data stream fails on the destination side. +* When to send: + * During a stream's data transmission, if anything happens that makes it impossible to process a data stream's data correctly or in order (I/O error, logic error), a _StreamReset_ should be sent with the active stream ID and valid service ID. + * While attempting to establish a new data stream, if the destination tunnel client fails to establish a local connection, it should send a _StreamReset_ back over the tunnel with the requested stream ID and service ID. +* Behavior on receive: + * Both tunnel client modes should respond to a _StreamReset_ message by closing the active data stream or connection when the stream ID matches the current stream + * After closing the current stream, the current stream ID should be unset internally + * The tunnel client SHOULD perform an orderly shutdown of the data stream or connection and flush any local connection buffers before closing + * If the receiver does not have an active stream, it is safe to ignore a _StreamReset_ message +* Notes + * The proxy server may generate _StreamReset_ messages in the following scenarios: + * The tunnel peer is replaced (likely has reconnected) by a new peer bearing a valid access token + * An internal error has disrupted the internal routing for the tunnel +* Example: Message(type=STREAM_RESET, streamId=1, payload=, serviceId=ssh1, availableServiceIds=, ignorable=) + +#### ConnectionStart + +* _ConnectionStart_ is the message sent to start and establish a new and active connection when the stream has been established and there’s one active connection in the stream. To start the first connection, use StreamStart. For local proxies, this message carries across similar meaning to a TCP SYN packet. +* When to send + * When the source tunnel client wants to initiate a new data stream with the destination, local proxy does this by sending a ConnectionStart with the current stream ID and connection ID. Connection ID should be chosen in a way that is unlikely to repeat through a tunnel's lifetime. Connection ID is used to identify the TCP connection at the source tunnel client. + +* Behavior on receive: + * Destination local proxy should treat this as a request to initiate a TCP connection to a configured destination service and associate the new TCP connection with the given connection ID. + * If the destination mode tunnel client already has an already open/active TCP connection with the given connection ID, it should consider it an error and send ConnectionReset for the given connection ID to tunnel peer. + * Source mode tunnel clients SHOULD treat receiving ConnectionStart as an error and close the active connection for the given connection ID. +* Example: Message(type=CONNECTION_START, streamId=1, connectionId=1, payload=, serviceId=ssh1, availableServiceIds=, ignorable=) + +#### ConnectionReset + +* _ConnectionReset_ message conveys that the connection has ended, either in error, or closed intentionally for the tunnel peer. It is also sent to the source tunnel peer if an attempt to establish a new connection fails on the destination side. For local proxies, this message carries across similar meaning to a TCP RST packet. +* When to send: + * During a connection's data transmission, if anything happens that makes it impossible to process a connection's data correctly or in order (I/O error, logic error), a ConnectionReset should be sent with the active stream ID and valid connection ID. + * While attempting to establish a new connection, if the destination tunnel client fails to establish a local connection, it should send a ConnectionReset back over the tunnel with the requested stream ID and connection ID. + +* Behavior on receive: + * Both tunnel client modes should respond to a ConnectionReset message by closing the active connection when the stream ID matches the current stream and connection ID matches an active connection. + * After closing the connection, the connection ID should be unset internally + * The tunnel client SHOULD perform an orderly shutdown of the connection and flush any local connection buffers before closing + * If the receiver does not have an active stream or matching connection, it is safe to ignore a ConnectionReset message +* Example: Message(type=CONNECTION_RESET, streamId=1, connectionId=1, payload=, serviceId=ssh1, availableServiceIds=, ignorable=) + +#### SessionReset + +* _SessionReset_ messages can only originate from Secure Tunneling service if an internal data transmission error is detected +* When to send: + * N/A - tunnel client cannot send this message through the service +* Behavior on receive: + * This message should be handled the same as _StreamReset_ except that it carries no stream ID association so any active stream should be closed +* Notes + * This message type should rarely be observed. + * If the receiver does not have an active stream, it is safe to ignore a _SessionReset_ message +* Example: Message(type=SESSION_RESET, streamId=, payload=, serviceId=, availableServiceIds=, ignorable=) + + +#### Data + +* _Data_ messages carry a payload with a sequence of bytes to write to the active data stream when received by local proxy. When local proxy reads data from its local connection, those bytes should be inserted into the payload of a _Data_ message and sent over a tunnel +* When to send: + * When a tunnel client reads data on the (non-WebSocket) data stream (e.g. the TCP connection for the local proxy), it must construct _Data_ messages with the sequence of bytes put into the payload - up to 63kb in size - and set the active stream ID and valid service ID on the message. +* Behavior on receive: + * When a local proxy receives _Data_ messages, it must write the payload data directly to the (non-WebSocket) data stream +* Example: Message(type=DATA, streamId=1, connectionId=1, payload=[byte sequence], serviceId=ssh1, availableServiceIds=, ignorable=) + +#### ServiceIDs +* _ServiceIDs_ message carry a list of unique service IDs used when open a tunnel with **services** in [DestinationConfig](https://docs.aws.amazon.com/iot/latest/apireference/API_iot-secure-tunneling_DestinationConfig.html) parameter. It's sent to local proxy for providing the source of truth of what service IDs can be used in local proxy. +* When to send: + * N/A - tunnel client cannot send this message through the service +* Behavior on receive: + * Validate user input. If there is a mismatch between service ID list in OpenTunnel API and local proxy, local proxy will fail to start. + * Build or update in-memory ports mapping for book keeping. It will build or update an unordered map, with service ID as the key, and the configured port as the value. In the future, when a data packet is received from the tunnel, local proxy will extract the service ID from a data packet and find which port should this packet be forwarded using this map. + +* Example: Message(type=SERVICE_IDS, streamId=, payload=, serviceId=, availableServiceIds=ssh1, ssh2, ignorable=) + + +### Ignorable field + +If a message is received and its type is unrecognized, and this field is set to true, it is ok for the tunnel client to ignore the message safely. The tunnel client MAY still treat the unrecognized message as an error out of caution. If this field is unset, it must be considered as false. diff --git a/resources/Message.proto b/resources/Message.proto index 70ed7e92..68846303 100644 --- a/resources/Message.proto +++ b/resources/Message.proto @@ -14,6 +14,7 @@ message Message { bytes payload = 4; string serviceId = 5; repeated string availableServiceIds = 6; + uint32 connectionId = 7; enum Type { UNKNOWN = 0; @@ -22,5 +23,7 @@ message Message { STREAM_RESET = 3; SESSION_RESET = 4; SERVICE_IDS = 5; + CONNECTION_START = 6; + CONNECTION_RESET = 7; } } diff --git a/src/LocalproxyConfig.h b/src/LocalproxyConfig.h index bdf7b8ad..7fc7e048 100644 --- a/src/LocalproxyConfig.h +++ b/src/LocalproxyConfig.h @@ -57,7 +57,7 @@ namespace aws { /** * The web proxy endpoint port. This will be set only if a web proxy is necessary. defaults to 3128. */ - std::uint16_t web_proxy_port {0 }; + std::uint16_t web_proxy_port { 0 }; /** * The web proxy authN. This will be set only if an web proxy is necessary and it requires authN. */ @@ -105,6 +105,10 @@ namespace aws { * If this is set to true, it means that v2 local proxy won't validate service id field. */ bool is_v1_message_format {false}; + /** + * A flag to judge if v3 local proxy needs to fallback to communicate using v2 local proxy message format. + */ + bool is_v2_message_format {false}; }; } } diff --git a/src/ProxySettings.cpp b/src/ProxySettings.cpp index 393067cc..d713be4d 100644 --- a/src/ProxySettings.cpp +++ b/src/ProxySettings.cpp @@ -23,7 +23,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings std::size_t const DEFAULT_MAX_DATA_FRAME_SIZE = DEFAULT_MESSAGE_MAX_SIZE + DEFAULT_DATA_LENGTH_SIZE; char const * const KEY_TCP_CONNECTION_RETRY_COUNT = "tunneling.proxy.tcp.connection_retry_count"; - std::int32_t const DEFAULT_TCP_CONNECTION_RETRY_COUNT = 5; + std::int32_t const DEFAULT_TCP_CONNECTION_RETRY_COUNT = -1; char const * const KEY_TCP_CONNECTION_RETRY_DELAY_MS = "tunneling.proxy.tcp.connection_retry_delay_ms"; std::uint32_t const DEFAULT_TCP_CONNECTION_RETRY_DELAY_MS = 1000; @@ -34,6 +34,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings char const * const KEY_MESSAGE_MAX_SIZE = "tunneling.proxy.message.max_size"; std::size_t const DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024; + + char const * const KEY_MAX_ACTIVE_CONNECTIONS = "tunneling.proxy.tcp.max_active_connections"; + std::uint32_t const DEFAULT_MAX_ACTIVE_CONNECTIONS = 128; char const * const KEY_WEB_SOCKET_PING_PERIOD_MS = "tunneling.proxy.websocket.ping_period_ms"; std::uint32_t const DEFAULT_WEB_SOCKET_PING_PERIOD_MS = 5000; @@ -48,7 +51,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings bool const DEFAULT_WEB_SOCKET_DATA_ERROR_RETRY = true; char const * const KEY_WEB_SOCKET_SUBPROTOCOL = "tunneling.proxy.websocket.subprotocol"; - std::string const DEFAULT_WEB_SOCKET_SUBPROTOCOL = "aws.iot.securetunneling-2.0"; + std::string const DEFAULT_WEB_SOCKET_SUBPROTOCOL = "aws.iot.securetunneling-3.0"; char const * const KEY_WEB_SOCKET_MAX_FRAME_SIZE = "tunneling.proxy.websocket.max_frame_size"; std::size_t const DEFAULT_WEB_SOCKET_MAX_FRAME_SIZE = DEFAULT_MAX_DATA_FRAME_SIZE * 2; @@ -83,6 +86,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings ADD_SETTING_DEFAULT(settings, TCP_READ_BUFFER_SIZE); ADD_SETTING_DEFAULT(settings, MESSAGE_MAX_PAYLOAD_SIZE); ADD_SETTING_DEFAULT(settings, MESSAGE_MAX_SIZE); + ADD_SETTING_DEFAULT(settings, MAX_ACTIVE_CONNECTIONS); ADD_SETTING_DEFAULT(settings, WEB_SOCKET_PING_PERIOD_MS); ADD_SETTING_DEFAULT(settings, WEB_SOCKET_CONNECT_RETRY_DELAY_MS); ADD_SETTING_DEFAULT(settings, WEB_SOCKET_CONNECT_RETRY_COUNT); diff --git a/src/ProxySettings.h b/src/ProxySettings.h index 8d7fee42..9dc4e101 100644 --- a/src/ProxySettings.h +++ b/src/ProxySettings.h @@ -32,6 +32,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace settings extern char const * const KEY_MESSAGE_MAX_SIZE; extern std::size_t const DEFAULT_MESSAGE_MAX_SIZE; + extern char const * const KEY_MAX_ACTIVE_CONNECTIONS; + extern std::uint32_t const DEFAULT_MAX_ACTIVE_CONNECTIONS; + extern char const * const KEY_WEB_SOCKET_PING_PERIOD_MS; extern std::uint32_t const DEFAULT_WEB_SOCKET_PING_PERIOD_MS; diff --git a/src/TcpAdapterProxy.cpp b/src/TcpAdapterProxy.cpp index 86d1a765..d377a1a7 100644 --- a/src/TcpAdapterProxy.cpp +++ b/src/TcpAdapterProxy.cpp @@ -189,52 +189,18 @@ namespace aws { namespace iot { namespace securedtunneling { } } - void tcp_adapter_proxy::initialize_tcp_clients(tcp_adapter_context &tac) - { - BOOST_LOG_SEV(log, trace) << "Initializing tcp clients ..."; - for (auto m: tac.adapter_config.serviceId_to_endpoint_map) - { - string service_id = m.first; - // create new tcp clients if needed - if (tac.serviceId_to_tcp_client_map.find(service_id) == tac.serviceId_to_tcp_client_map.end()) - { - tac.serviceId_to_tcp_client_map[service_id] = tcp_client::create(tac.io_ctx, - GET_SETTING(settings, TCP_WRITE_BUFFER_SIZE), - GET_SETTING(settings, TCP_READ_BUFFER_SIZE), - GET_SETTING(settings, WEB_SOCKET_WRITE_BUFFER_SIZE)); - } - } - } - - void tcp_adapter_proxy::initialize_tcp_servers(tcp_adapter_context &tac) - { - BOOST_LOG_SEV(log, trace) << "Initializing tcp servers ..."; - for (auto m: tac.adapter_config.serviceId_to_endpoint_map) - { - string service_id = m.first; - // create new tcp servers if needed - if (tac.serviceId_to_tcp_server_map.find(service_id) == tac.serviceId_to_tcp_server_map.end()) - { - tac.serviceId_to_tcp_server_map[service_id] = tcp_server::create(tac.io_ctx, - GET_SETTING(settings, TCP_WRITE_BUFFER_SIZE), - GET_SETTING(settings, TCP_READ_BUFFER_SIZE), - GET_SETTING(settings, WEB_SOCKET_WRITE_BUFFER_SIZE)); - } - } - } - void tcp_adapter_proxy::setup_tcp_sockets(tcp_adapter_context &tac) { BOOST_LOG_SEV(log, trace) << "Setting up tcp sockets "; clear_ws_buffers(tac); if (localproxy_config.mode == proxy_mode::DESTINATION) { - initialize_tcp_clients(tac); + BOOST_LOG_SEV(log, trace) << "Initializing tcp clients ..."; async_setup_destination_tcp_sockets(tac); } else { - initialize_tcp_servers(tac); + BOOST_LOG_SEV(log, trace) << "Initializing tcp servers ..."; async_setup_source_tcp_sockets(tac); } } @@ -242,19 +208,19 @@ namespace aws { namespace iot { namespace securedtunneling { void tcp_adapter_proxy::setup_tcp_socket(tcp_adapter_context &tac, std::string const & service_id) { BOOST_LOG_SEV(log, trace) << "Setting up tcp socket for service id: " << service_id; - tcp_connection::pointer connection = get_tcp_connection(tac, service_id); + + tac.serviceId_to_data_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); + tac.serviceId_to_control_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); if (localproxy_config.mode == proxy_mode::DESTINATION) { tcp_client::pointer client = tac.serviceId_to_tcp_client_map[service_id]; - client->on_receive_stream_start = std::bind(&tcp_adapter_proxy::async_setup_dest_tcp_socket, this, std::ref(tac), service_id); - client->after_setup_tcp_socket = std::bind(&tcp_adapter_proxy::async_setup_bidirectional_data_transfers, this, std::ref(tac), service_id); + client->connectionId_to_tcp_connection_map.clear(); async_web_socket_read_until_stream_start(tac, service_id); } else { tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id]; - server->connection_->after_send_message = std::bind(&tcp_adapter_proxy::async_setup_bidirectional_data_transfers, this, std::ref(tac), service_id); - server->after_setup_tcp_socket = std::bind(&tcp_adapter_proxy::async_send_stream_start, this, std::ref(tac), service_id); + server->connectionId_to_tcp_connection_map.clear(); std::shared_ptr retry_config = std::make_shared(tac.io_ctx, GET_SETTING(settings, TCP_CONNECTION_RETRY_COUNT), @@ -277,13 +243,14 @@ namespace aws { namespace iot { namespace securedtunneling { for (auto m: tac.adapter_config.serviceId_to_endpoint_map) { string service_id = m.first; - tcp_adapter_proxy::tcp_socket_reset(tac, service_id, post_reset_operation); + tcp_adapter_proxy::tcp_socket_reset_init(tac, service_id, post_reset_operation); } } - tcp_connection::pointer tcp_adapter_proxy::get_tcp_connection(tcp_adapter_context &tac, string service_id) + tcp_connection::pointer tcp_adapter_proxy::get_tcp_connection(tcp_adapter_context &tac, string service_id, uint32_t connection_id) { - tcp_connection::pointer connection_ptr; + BOOST_LOG_SEV(log, trace) << "Getting tcp connection with id: " << connection_id; + tcp_connection::pointer connection_ptr = nullptr; if (tac.adapter_config.mode == proxy_mode::SOURCE) { if (tac.serviceId_to_tcp_server_map.find(service_id) == tac.serviceId_to_tcp_server_map.end()) @@ -291,17 +258,36 @@ namespace aws { namespace iot { namespace securedtunneling { BOOST_LOG_SEV(log, debug) << "No serviceId_to_tcp_server mapping for service_id: " << service_id; return connection_ptr; } - connection_ptr = tac.serviceId_to_tcp_server_map[service_id]->connection_; + tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id]; + BOOST_LOG_SEV(log, trace) << "num active connections: " << server->connectionId_to_tcp_connection_map.size(); + if (server->connectionId_to_tcp_connection_map.find(connection_id) == server->connectionId_to_tcp_connection_map.end()) + { + BOOST_LOG_SEV(log, debug) << "No connectionId_to_tcp_connection mapping for connection id: " << connection_id; + return connection_ptr; + } + else + { + connection_ptr = tac.serviceId_to_tcp_server_map[service_id]->connectionId_to_tcp_connection_map[connection_id]; + } + } else if (tac.adapter_config.mode == proxy_mode::DESTINATION) { if (tac.serviceId_to_tcp_client_map.find(service_id) == tac.serviceId_to_tcp_client_map.end()) { BOOST_LOG_SEV(log, debug) << "No serviceId_to_tcp_client mapping for service_id: " << service_id; - return connection_ptr; } - connection_ptr = tac.serviceId_to_tcp_client_map[service_id]->connection_; + tcp_client::pointer client = tac.serviceId_to_tcp_client_map[service_id]; + BOOST_LOG_SEV(log, trace) << "num active connections: " << client->connectionId_to_tcp_connection_map.size(); + if (client->connectionId_to_tcp_connection_map.find(connection_id) == client->connectionId_to_tcp_connection_map.end()) + { + return connection_ptr; + } + else + { + connection_ptr = tac.serviceId_to_tcp_client_map[service_id]->connectionId_to_tcp_connection_map[connection_id]; + } } else { @@ -310,15 +296,51 @@ namespace aws { namespace iot { namespace securedtunneling { return connection_ptr; } - void tcp_adapter_proxy::tcp_socket_reset(tcp_adapter_context &tac, string service_id, std::function post_reset_operation) + void tcp_adapter_proxy::tcp_socket_reset_init(tcp_adapter_context &tac, string service_id, std::function post_reset_operation) { - tcp_connection::pointer connection = get_tcp_connection(tac, service_id); - if (!connection->socket_.is_open()) + BOOST_LOG_SEV(log, trace) << "Init tcp socket reset"; + std::unordered_map connection_map; + if (tac.adapter_config.mode == proxy_mode::SOURCE) + { + tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id]; + connection_map = server->connectionId_to_tcp_connection_map; + tac.num_active_connections = connection_map.size(); + } + else if (tac.adapter_config.mode == proxy_mode::DESTINATION) + { + tcp_client::pointer client = tac.serviceId_to_tcp_client_map[service_id]; + connection_map = client->connectionId_to_tcp_connection_map; + tac.num_active_connections = connection_map.size(); + } + + if (!tac.num_active_connections) + { + tac.num_active_connections = 1; + tcp_adapter_proxy::tcp_socket_reset(tac, service_id, 1, post_reset_operation); + } + else + { + for (auto m: connection_map) + { + uint32_t connection_id = m.first; + tcp_adapter_proxy::tcp_socket_reset(tac, service_id, connection_id, post_reset_operation); + } + } + } + void tcp_adapter_proxy::tcp_socket_reset(tcp_adapter_context &tac, string service_id, uint32_t connection_id, std::function post_reset_operation) + { + tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id); + if (!connection || !connection->socket_.is_open()) { BOOST_LOG_SEV(log, debug) << "Ignoring explicit reset because TCP socket is already closed"; + --tac.num_active_connections; + if (!tac.num_active_connections) + { + post_reset_operation(); // setup_tcp_socket + } return; } - BOOST_LOG_SEV(log, debug) << "Handling explicit reset by closing TCP for service id: " << service_id; + BOOST_LOG_SEV(log, debug) << "Handling explicit reset by closing TCP for service id: " << service_id << " connection id: " << connection_id; connection->socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_receive); std::shared_ptr web_socket_write_buffer_drain_complete = std::make_shared(false); @@ -333,36 +355,69 @@ namespace aws { namespace iot { namespace securedtunneling { //anyways given we know we are closing the tcp socket to create a new one anyways BOOST_LOG_SEV(this->log, trace) << "Received expected TCP socket error and ignoring it. TCP socket read loop has been canceled for service id: " << service_id; }; - connection->on_data_message = std::bind(&tcp_adapter_proxy::ignore_message_and_stop, this, std::ref(tac), std::placeholders::_1); - connection->on_control_message = std::bind(&tcp_adapter_proxy::ignore_message_and_stop, this, std::ref(tac), std::placeholders::_1); connection->on_web_socket_write_buffer_drain_complete = - [=]() + [=, &tac]() { BOOST_LOG_SEV(this->log, trace) << "Post-reset web socket drain complete"; *web_socket_write_buffer_drain_complete = true; if (*tcp_write_buffer_drain_complete) { BOOST_LOG_SEV(this->log, trace) << "Both socket drains complete."; - post_reset_operation(); + --tac.num_active_connections; + if (!tac.num_active_connections) + { + post_reset_operation(); // setup_tcp_socket + } } }; connection->on_tcp_write_buffer_drain_complete = [=, &tac]() { - tcp_connection::pointer connection_to_reset = get_tcp_connection(tac, service_id); - BOOST_LOG_SEV(this->log, trace) << "Post-reset TCP drain complete. Closing TCP socket for service id " << service_id; + tcp_connection::pointer connection_to_reset = get_tcp_connection(tac, service_id, connection_id); + BOOST_LOG_SEV(this->log, trace) << "Post-reset TCP drain complete. Closing TCP socket for service id " << service_id << " connection id " << connection_id; BOOST_LOG_SEV(this->log, info) << "Disconnected from: " << connection_to_reset->socket().remote_endpoint(); connection_to_reset->socket_.close(); *tcp_write_buffer_drain_complete = true; if (*web_socket_write_buffer_drain_complete) { BOOST_LOG_SEV(this->log, trace) << "Both socket drains complete. Setting up TCP socket again"; - post_reset_operation(); + --tac.num_active_connections; + if (!tac.num_active_connections) + { + post_reset_operation(); // setup_tcp_socket + } } }; - async_setup_web_socket_write_buffer_drain(tac, service_id); - async_tcp_write_buffer_drain(tac, service_id); + async_setup_web_socket_write_buffer_drain(tac, service_id, connection_id); + async_tcp_write_buffer_drain(tac, service_id, connection_id); + } + + void tcp_adapter_proxy::tcp_socket_close(tcp_adapter_context &tac, string service_id, uint32_t connection_id) + { + tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id); + if (!connection) + { + BOOST_LOG_SEV(log, debug) << "Ignoring explicit reset... TCP socket has been deleted"; + return; + } + if (!connection->socket_.is_open()) + { + BOOST_LOG_SEV(log, debug) << "Ignoring explicit reset because TCP socket is already closed"; + return; + } + BOOST_LOG_SEV(log, debug) << "Handling explicit reset by closing TCP for service id: " << service_id << " connection id: " << connection_id; + + connection->socket_.shutdown(boost::asio::ip::tcp::socket::shutdown_receive); + connection->on_tcp_write_buffer_drain_complete = + [=, &tac]() + { + tcp_connection::pointer connection_to_reset = get_tcp_connection(tac, service_id, connection_id); + BOOST_LOG_SEV(this->log, trace) << "Post-reset TCP drain complete. Closing TCP socket for service id " << service_id << " connection id " << connection_id; + BOOST_LOG_SEV(this->log, info) << "Disconnected from: " << connection_to_reset->socket().remote_endpoint(); + connection_to_reset->socket_.close(); + delete_tcp_socket(tac, service_id, connection_id); + }; } void tcp_adapter_proxy::web_socket_close_and_stop(tcp_adapter_context &tac) @@ -397,43 +452,80 @@ namespace aws { namespace iot { namespace securedtunneling { tac.io_ctx.stop(); } - void tcp_adapter_proxy::tcp_socket_error(tcp_adapter_context &tac, boost::system::error_code const &ec, string const & service_id) + void tcp_adapter_proxy::delete_tcp_socket(tcp_adapter_context &tac, string const & service_id, uint32_t const & connection_id) { - BOOST_LOG_SEV(log, debug) << "Handling tcp socket error for service id: " << service_id << " . error message:" << ec.message(); - tcp_connection::pointer connection = get_tcp_connection(tac, service_id); - BOOST_LOG_SEV(this->log, info) << "Disconnected from: " << connection->socket().remote_endpoint(); + if (tac.adapter_config.mode == proxy_mode::SOURCE) + { + tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id]; + server->connectionId_to_tcp_connection_map.erase(connection_id); + } + else if (tac.adapter_config.mode == proxy_mode::DESTINATION) + { + tcp_client::pointer client = tac.serviceId_to_tcp_client_map[service_id]; + client->connectionId_to_tcp_connection_map.erase(connection_id); + } + } + + void tcp_adapter_proxy::tcp_socket_error(tcp_adapter_context &tac, boost::system::error_code const &ec, string const & service_id, uint32_t const & connection_id) + { + BOOST_LOG_SEV(log, debug) << "Handling tcp socket error for service id: " << service_id << " connection id: " << connection_id << " . error message:" << ec.message(); + tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id); + try + { + BOOST_LOG_SEV(this->log, info) << "Disconnected from: " << connection->socket().remote_endpoint(); + } + catch (std::exception& e) + { + BOOST_LOG_SEV(this->log, info) << "Disconnecting... remote endpoint not found"; + } connection->socket_.close(); connection->tcp_write_buffer_.consume(connection->tcp_write_buffer_.max_size()); - connection->on_data_message = std::bind(&tcp_adapter_proxy::ignore_message_and_stop, this, std::ref(tac), std::placeholders::_1); - connection->on_control_message = std::bind(&tcp_adapter_proxy::ignore_message_and_stop, this, std::ref(tac), std::placeholders::_1); - - connection->on_web_socket_write_buffer_drain_complete = [&, service_id]() + connection->on_web_socket_write_buffer_drain_complete = [&, service_id, connection_id]() { - tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id); + BOOST_LOG_SEV(this->log, trace) << "on_web_socket_write_buffer_drain_complete callback"; + tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id, connection_id); socket_connection->after_send_message = std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id); - async_send_stream_reset(tac, service_id); + + if (tac.adapter_config.is_v2_message_format) + { + tac.serviceId_to_control_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message_and_stop, this, std::ref(tac), std::placeholders::_1); + tac.serviceId_to_data_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message_and_stop, this, std::ref(tac), std::placeholders::_1); + async_send_stream_reset(tac, service_id, connection_id); + } + else + { + delete_tcp_socket(tac, service_id, connection_id); + async_send_connection_reset(tac, service_id, connection_id); + } + }; - async_setup_web_socket_write_buffer_drain(tac, service_id); + async_setup_web_socket_write_buffer_drain(tac, service_id, connection_id); } void tcp_adapter_proxy::async_send_message(tcp_adapter_context &tac, message const &message) + { + string service_id = message.serviceid(); + uint32_t connection_id = static_cast(message.connectionid()); + async_send_message(tac, message, service_id, connection_id); + } + + void tcp_adapter_proxy::async_send_message(tcp_adapter_context &tac, message const &message, string const &service_id, uint32_t const &connection_id) { boost::beast::flat_buffer outgoing_message_buffer; std::size_t const frame_size = static_cast(message.ByteSizeLong()) + - GET_SETTING(settings, DATA_LENGTH_SIZE); + GET_SETTING(settings, DATA_LENGTH_SIZE); void *frame_data = outgoing_message_buffer.prepare(frame_size).data(); - void *frame_data_msg_offset = reinterpret_cast(reinterpret_cast(frame_data) - + GET_SETTING(settings, DATA_LENGTH_SIZE)); + void *frame_data_msg_offset = reinterpret_cast(reinterpret_cast(frame_data) + + GET_SETTING(settings, DATA_LENGTH_SIZE)); std::uint16_t data_length = static_cast(message.ByteSizeLong()); *reinterpret_cast(frame_data) = boost::endian::native_to_big(data_length); message.SerializeToArray(frame_data_msg_offset, static_cast(GET_SETTING(settings, MESSAGE_MAX_SIZE))); outgoing_message_buffer.commit(frame_size); - string service_id = message.serviceid(); - async_send_message_to_web_socket(tac, std::make_shared(outgoing_message_buffer), service_id); + async_send_message_to_web_socket(tac, std::make_shared(outgoing_message_buffer), service_id, connection_id); } - void tcp_adapter_proxy::async_send_stream_start(tcp_adapter_context &tac, string const & service_id) + void tcp_adapter_proxy::async_send_stream_start(tcp_adapter_context &tac, string const & service_id, uint32_t const & connection_id) { using namespace com::amazonaws::iot::securedtunneling; if (!tac.is_service_ids_received) @@ -442,16 +534,20 @@ namespace aws { namespace iot { namespace securedtunneling { std::make_shared(tac.io_ctx, GET_SETTING(settings, TCP_CONNECTION_RETRY_COUNT), GET_SETTING(settings, TCP_CONNECTION_RETRY_DELAY_MS), - std::bind(&tcp_adapter_proxy::async_send_stream_start, this, std::ref(tac), service_id)); + std::bind(&tcp_adapter_proxy::async_send_stream_start, this, std::ref(tac), service_id, connection_id)); BOOST_LOG_SEV(log, error) << "No service ids received. Will retry."; basic_retry_execute(log, retry_config, []() { throw std::runtime_error("Fail all the retries to get service ids before stream start. Exit."); }); return; } - std::string src_listening_port = boost::lexical_cast(tac.serviceId_to_tcp_server_map[service_id]->acceptor().local_endpoint().port()); - if (tac.adapter_config.serviceId_to_endpoint_map.find(service_id) == tac.adapter_config.serviceId_to_endpoint_map.end() || - tac.adapter_config.serviceId_to_endpoint_map.at(service_id) != src_listening_port) + + if (tac.adapter_config.mode == proxy_mode::SOURCE) { - throw std::runtime_error((boost::format("Receive incoming connection from non-configured port: %1%") % src_listening_port).str()); + std::string src_listening_port = boost::lexical_cast(tac.serviceId_to_tcp_server_map[service_id]->acceptor().local_endpoint().port()); + if (tac.adapter_config.serviceId_to_endpoint_map.find(service_id) == tac.adapter_config.serviceId_to_endpoint_map.end() || + tac.adapter_config.serviceId_to_endpoint_map.at(service_id) != src_listening_port) + { + throw std::runtime_error((boost::format("Receive incoming connection from non-configured port: %1%") % src_listening_port).str()); + } } /** @@ -474,65 +570,126 @@ namespace aws { namespace iot { namespace securedtunneling { // Update streamId <-> serviceId mapping for future book keeping tac.serviceId_to_streamId_map[service_id] = new_stream_id; - BOOST_LOG_SEV(log, debug) << "Setting new stream ID to: " << new_stream_id << ", service id: " << service_id; + BOOST_LOG_SEV(log, debug) << "Sending stream start, setting new stream ID to: " << new_stream_id << ", service id: " << service_id; outgoing_message.set_type(Message_Type_STREAM_START); outgoing_message.set_serviceid(service_id); outgoing_message.set_streamid(new_stream_id); + outgoing_message.set_connectionid(connection_id); outgoing_message.set_ignorable(false); outgoing_message.clear_payload(); async_send_message(tac, outgoing_message); } - void tcp_adapter_proxy::async_send_stream_reset(tcp_adapter_context &tac, std::string const & service_id) + void tcp_adapter_proxy::async_send_connection_start(tcp_adapter_context &tac, string const & service_id, uint32_t const & connection_id) { + BOOST_LOG_SEV(log, info) << " sending connection start for service id: " << service_id << " connection id: " << connection_id; using namespace com::amazonaws::iot::securedtunneling; - BOOST_LOG_SEV(log, trace) << "Reset stream for service id: " << service_id; + if (!tac.is_service_ids_received) + { + std::shared_ptr retry_config = + std::make_shared(tac.io_ctx, + GET_SETTING(settings, TCP_CONNECTION_RETRY_COUNT), + GET_SETTING(settings, TCP_CONNECTION_RETRY_DELAY_MS), + std::bind(&tcp_adapter_proxy::async_send_connection_start, this, std::ref(tac), service_id, connection_id)); + BOOST_LOG_SEV(log, error) << "No service ids received. Will retry."; + basic_retry_execute(log, retry_config, []() { throw std::runtime_error("Fail all the retries to get service ids before stream start. Exit."); }); + return; + } + std::string src_listening_port = boost::lexical_cast(tac.serviceId_to_tcp_server_map[service_id]->acceptor().local_endpoint().port()); + if (tac.adapter_config.serviceId_to_endpoint_map.find(service_id) == tac.adapter_config.serviceId_to_endpoint_map.end() || + tac.adapter_config.serviceId_to_endpoint_map.at(service_id) != src_listening_port) + { + throw std::runtime_error((boost::format("Receive incoming connection from non-configured port: %1%") % src_listening_port).str()); + } + + if(tac.serviceId_to_streamId_map.find(service_id) == tac.serviceId_to_streamId_map.end()) + { + BOOST_LOG_SEV(log, error) << "No stream id found for service id:" << service_id << " stopping."; + return; + } + std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id]; + + outgoing_message.set_type(Message_Type_CONNECTION_START); + outgoing_message.set_serviceid(service_id); + outgoing_message.set_streamid(stream_id); + outgoing_message.set_connectionid(connection_id); + outgoing_message.set_ignorable(false); + outgoing_message.clear_payload(); + async_send_message(tac, outgoing_message); + } + + void tcp_adapter_proxy::async_send_stream_reset(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id) + { + using namespace com::amazonaws::iot::securedtunneling; + BOOST_LOG_SEV(log, trace) << "Send reset stream for service id: " << service_id; if (tac.serviceId_to_streamId_map.find(service_id) == tac.serviceId_to_streamId_map.end()) { BOOST_LOG_SEV(log, warning) << "No stream id mapping found for service id " << service_id << " . Skip stream reset."; return; } + // NOTE: serviceIds -> streamId mapping will be updated when send/receive stream start, no action needed now. std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id]; outgoing_message.set_type(Message_Type_STREAM_RESET); outgoing_message.set_serviceid(service_id); outgoing_message.set_streamid(stream_id); + outgoing_message.set_connectionid(0); + outgoing_message.set_ignorable(false); + outgoing_message.clear_payload(); + async_send_message(tac, outgoing_message, service_id, connection_id); + } + + void tcp_adapter_proxy::async_send_connection_reset(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id) + { + using namespace com::amazonaws::iot::securedtunneling; + BOOST_LOG_SEV(log, trace) << "Reset connection for service id: " << service_id << " connection id: " << connection_id; + if (tac.serviceId_to_streamId_map.find(service_id) == tac.serviceId_to_streamId_map.end()) + { + BOOST_LOG_SEV(log, warning) << "No stream id mapping found for service id " << service_id << " . Skip connection reset."; + return; + } + // NOTE: serviceIds -> streamId mapping will be updated when send/receive stream start, no action needed now. + std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id]; + outgoing_message.set_type(Message_Type_CONNECTION_RESET); + outgoing_message.set_serviceid(service_id); + outgoing_message.set_streamid(stream_id); + outgoing_message.set_connectionid(connection_id); outgoing_message.set_ignorable(false); outgoing_message.clear_payload(); async_send_message(tac, outgoing_message); } - void tcp_adapter_proxy::async_setup_bidirectional_data_transfers(tcp_adapter_context &tac, string const & service_id) + void tcp_adapter_proxy::async_setup_bidirectional_data_transfers(tcp_adapter_context &tac, string const & service_id, uint32_t const & connection_id) { - BOOST_LOG_SEV(log, trace) << "Setting up bi-directional data transfer for service id: " << service_id; + BOOST_LOG_SEV(log, trace) << "Setting up bi-directional data transfer for service id: " << service_id << " connection id: " << connection_id; // clear tcp_buffers for this stream - tcp_connection::pointer connection = get_tcp_connection(tac, service_id); + tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id); if (!connection) { BOOST_LOG_SEV(log, trace) << "Null connection pointers, skip"; return; } clear_tcp_connection_buffers(connection); - connection->on_control_message = std::bind(&tcp_adapter_proxy::handle_control_message_data_transfer, this, std::ref(tac), std::placeholders::_1); - connection->on_data_message = std::bind(&tcp_adapter_proxy::forward_data_message_to_tcp_write, this, std::ref(tac), std::placeholders::_1); + tac.serviceId_to_control_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::handle_control_message_data_transfer, this, std::ref(tac), std::placeholders::_1); + tac.serviceId_to_data_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::forward_data_message_to_tcp_write, this, std::ref(tac), std::placeholders::_1); this->async_web_socket_read_loop(tac); - this->async_tcp_socket_read_loop(tac, service_id); + this->async_tcp_socket_read_loop(tac, service_id, connection_id); } void tcp_adapter_proxy::async_web_socket_read_until_stream_start(tcp_adapter_context &tac, string const & service_id) { BOOST_LOG_SEV(log, trace) << "Waiting for stream start..."; tcp_client::pointer client = tac.serviceId_to_tcp_client_map[service_id]; - client->connection_->on_control_message = std::bind(&tcp_adapter_proxy::async_wait_for_stream_start, this, std::ref(tac), std::placeholders::_1); - client->connection_->on_data_message = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); + tac.serviceId_to_control_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::async_wait_for_stream_start, this, std::ref(tac), std::placeholders::_1); + tac.serviceId_to_data_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); this->async_web_socket_read_loop(tac); } void tcp_adapter_proxy::handle_web_socket_control_message(tcp_adapter_context &tac, boost::beast::websocket::frame_type ws_message_type, boost::beast::string_view payload) { #ifdef DEBUG - BOOST_LOG_SEV(log, debug) << "Control message recieved enum(close=0, ping=1, pong=2): " << static_cast(ws_message_type); + BOOST_LOG_SEV(log, debug) << "Control message received enum(close=0, ping=1, pong=2): " << static_cast(ws_message_type); #endif boost::beast::websocket::ping_data pd{ payload }; long long now_millis = 0; @@ -546,7 +703,7 @@ namespace aws { namespace iot { namespace securedtunneling { break; case boost::beast::websocket::frame_type::ping: #ifdef DEBUG - BOOST_LOG_SEV(log, debug) << "Websocket ping recieved: " << pd; + BOOST_LOG_SEV(log, debug) << "Websocket ping received: " << pd; #endif tac.wss->async_pong(pd, [&](boost::system::error_code const &ec) { @@ -773,13 +930,18 @@ namespace aws { namespace iot { namespace securedtunneling { } } - void tcp_adapter_proxy::async_tcp_socket_read_loop(tcp_adapter_context & tac, string const & service_id) + void tcp_adapter_proxy::async_tcp_socket_read_loop(tcp_adapter_context & tac, string const & service_id, uint32_t const & connection_id) { - BOOST_LOG_SEV(log, trace) << "Begin tcp socket read loop for service id : " << service_id; - tcp_connection::pointer connection = get_tcp_connection(tac, service_id); + BOOST_LOG_SEV(log, trace) << "Begin tcp socket read loop for service id : " << service_id << " connection id : " << connection_id; + tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id); + if (!connection) + { + BOOST_LOG_SEV(log, trace) << "socket for service id : " << service_id << " connection id: " << connection_id << " does not exist, skip reading"; + return; + } if (!connection->socket().is_open()) { - BOOST_LOG_SEV(log, trace) << "socket for service id : " << service_id << " is not open yet, skip reading"; + BOOST_LOG_SEV(log, trace) << "socket for service id : " << service_id << " connection id: " << connection_id << " is not open yet, skip reading"; return; } if (connection->is_tcp_socket_reading_) @@ -794,13 +956,18 @@ namespace aws { namespace iot { namespace securedtunneling { std::size_t max_bytes_to_read = std::min(connection->web_socket_data_write_buffer_.max_size() - connection->web_socket_data_write_buffer_.size(), connection->tcp_read_buffer_.max_size()); connection->is_tcp_socket_reading_ = true; connection->socket_.async_read_some(connection->tcp_read_buffer_.prepare(max_bytes_to_read), - [&, service_id](boost::system::error_code const &ec, std::size_t const bytes_read) + [&, service_id, connection_id](boost::system::error_code const &ec, std::size_t const bytes_read) { - BOOST_LOG_SEV(log, trace) << "Reading from tcp socket for service id " << service_id; - tcp_connection::pointer socket_read_connection = get_tcp_connection(tac, service_id); + BOOST_LOG_SEV(log, trace) << "Reading from tcp socket for service id " << service_id << " connection id " << connection_id; + tcp_connection::pointer socket_read_connection = get_tcp_connection(tac, service_id, connection_id); + if (!socket_read_connection) + { + return; + } socket_read_connection->is_tcp_socket_reading_ = false; if (ec) { + BOOST_LOG_SEV(log, trace) << "received error code: " << ec; if (socket_read_connection->on_tcp_error) { socket_read_connection->on_tcp_error(ec); @@ -808,7 +975,7 @@ namespace aws { namespace iot { namespace securedtunneling { } else { - tcp_socket_error(tac, ec, service_id); + tcp_socket_error(tac, ec, service_id, connection_id); } } else @@ -824,14 +991,14 @@ namespace aws { namespace iot { namespace securedtunneling { if (wss_has_enough_write_buffer_space(socket_read_connection->web_socket_data_write_buffer_)) { - async_tcp_socket_read_loop(tac, service_id); + async_tcp_socket_read_loop(tac, service_id, connection_id); } else { BOOST_LOG_SEV(log, debug) << "No more space in web socket write buffer or tcp socket is closed. Stopping tcp read loop"; } if (socket_read_connection->web_socket_data_write_buffer_.size() > 0) { - async_setup_web_socket_write_buffer_drain(tac, service_id); + async_setup_web_socket_write_buffer_drain(tac, service_id, connection_id); } } }); @@ -876,26 +1043,40 @@ namespace aws { namespace iot { namespace securedtunneling { using namespace com::amazonaws::iot::securedtunneling; BOOST_LOG_SEV(log, trace) << "Wait for control message stream start, receive message type:" << message.type(); std::int32_t stream_id = static_cast(message.streamid()); + uint32_t connection_id = static_cast(message.connectionid()); + + // backwards compatiblity with v2 + if (!connection_id) + { + connection_id = 1; + tac.adapter_config.is_v2_message_format = true; + } string service_id = message.serviceid(); switch (message.type()) { case Message_Type_SESSION_RESET: #ifdef DEBUG - BOOST_LOG_SEV(log, trace) << "Session reset recieved"; + BOOST_LOG_SEV(log, trace) << "Session reset received"; #endif return true; case Message_Type_STREAM_RESET: - //while waiting for stream start (destination mode implied), no TCP socket is present so these - //messages are no-op + // while waiting for stream start (destination mode implied), no TCP socket is present so these + // messages are no-op #ifdef DEBUG - BOOST_LOG_SEV(log, trace) << "Stream reset recieved"; + BOOST_LOG_SEV(log, trace) << "Stream reset received"; + #endif + return true; + case Message_Type_CONNECTION_RESET: + // while waiting for stream start (destination mode implied), no TCP socket is present so these + // messages are no-op + #ifdef DEBUG + BOOST_LOG_SEV(log, trace) << "Connection reset received"; #endif return true; case Message_Type_STREAM_START: #ifdef DEBUG - BOOST_LOG_SEV(log, debug) << "Stream start recieved"; + BOOST_LOG_SEV(log, debug) << "Stream start received"; #endif - stream_id = static_cast(message.streamid()); if (!stream_id) { throw proxy_exception("No stream ID set for stream start message!"); @@ -912,10 +1093,17 @@ namespace aws { namespace iot { namespace securedtunneling { } tac.serviceId_to_streamId_map[service_id] = stream_id; - tac.serviceId_to_tcp_client_map[service_id]->on_receive_stream_start(); + async_setup_dest_tcp_socket(tac, service_id, connection_id, true); return false; + case Message_Type_CONNECTION_START: + // while waiting for stream start (destination mode implied), no TCP socket is present so these + // messages are no-op + #ifdef DEBUG + BOOST_LOG_SEV(log, trace) << "Connection start received"; + #endif + return true; case Message_Type_DATA: //handling the following cases alleviates clang compiler warnings - throw std::logic_error("Data message recieved in control message handler"); + throw std::logic_error("Data message received in control message handler"); case Message_Type_SERVICE_IDS: // service ids should already be received at this point, no actions to process again. return true; @@ -923,8 +1111,7 @@ namespace aws { namespace iot { namespace securedtunneling { case Message_Type_Message_Type_INT_MIN_SENTINEL_DO_NOT_USE_: case Message_Type_Message_Type_INT_MAX_SENTINEL_DO_NOT_USE_: //Can only use the following when linked to full ProtocolBuffers library rather than LITE - //throw proxy_exception((boost::format("Unexpected message type recieved during control message handling during data transfer: %1%") % External_MessageType_Name(message.messagetype())).str()); - throw proxy_exception((boost::format("Unexpected message type recieved while waiting for stream start: %1%") % message.type()).str()); + throw proxy_exception((boost::format("Unexpected message type received while waiting for stream start: %1%") % message.type()).str()); default: if (message.ignorable()) { return true; @@ -1111,6 +1298,14 @@ namespace aws { namespace iot { namespace securedtunneling { using namespace com::amazonaws::iot::securedtunneling; BOOST_LOG_SEV(log, trace) << "Handling control message..."; std::int32_t stream_id = static_cast(message.streamid()); + uint32_t connection_id = static_cast(message.connectionid()); + + //for backwards compatibility with v2 + if (!connection_id) + { + connection_id = 1; + tac.adapter_config.is_v2_message_format = true; + } string service_id = message.serviceid(); // v1 message format does not need to validate service id. Set to the one service id stored in memory. if (tac.adapter_config.is_v1_message_format) @@ -1121,18 +1316,24 @@ namespace aws { namespace iot { namespace securedtunneling { { case Message_Type_SESSION_RESET: #ifdef DEBUG - BOOST_LOG_SEV(log, trace) << "Session reset recieved"; + BOOST_LOG_SEV(log, trace) << "Session reset received"; #endif //validation has already been done on stream_id before calling this, so we can just listen tcp_socket_reset_all(tac, std::bind(&tcp_adapter_proxy::setup_tcp_sockets, this, std::ref(tac))); return true; //indicates we should stop reading from the web socket after processing this message case Message_Type_STREAM_RESET: - #ifdef DEBUG - BOOST_LOG_SEV(log, trace) << "Stream reset recieved"; - #endif + BOOST_LOG_SEV(log, trace) << "Stream reset received"; //validation has already been done on stream_id before calling this, so we can just listen - tcp_socket_reset(tac, service_id, std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id)); - return true; //indicates we should stop reading from the web socket after processing this message + tcp_socket_reset_init(tac, service_id, + std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), + service_id)); + return false; //indicates we should stop reading from the web socket after processing this message + + case Message_Type_CONNECTION_RESET: + BOOST_LOG_SEV(log, trace) << "Connection reset received for connection id: " << connection_id; + tcp_socket_close(tac, service_id, connection_id); + return true; + case Message_Type_STREAM_START: //could verify that this is a destination mode local proxy. Source mode shouldn't receive stream start if (!stream_id) { @@ -1148,26 +1349,48 @@ namespace aws { namespace iot { namespace securedtunneling { { BOOST_LOG_SEV(log, warning) << "Stream start received during data transfer for service id :" << service_id << "with new stream id: " << message.streamid(); BOOST_LOG_SEV(log, warning) << "Reset this stream"; - tcp_socket_reset(tac, service_id, std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id)); + tcp_socket_reset_init(tac, service_id, + std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), + service_id)); + } + return true; + + case Message_Type_CONNECTION_START: + if (!stream_id) + { + throw proxy_exception("No stream ID set for connection start message!"); + } + BOOST_LOG_SEV(log, debug) << "Received service id :" << service_id << " ,stream id: " << message.streamid() << " , connection id: " << message.connectionid() << " for connection start"; + // v1 message format does not need to validate service id. Set to the one service id stored in memory. + if (tac.adapter_config.is_v1_message_format) + { + service_id = tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first; + } + else if (tac.adapter_config.serviceId_to_endpoint_map.find(service_id) == tac.adapter_config.serviceId_to_endpoint_map.end()) + { + throw proxy_exception((boost::format("Invalid service id received for connection start: %1%") % service_id).str()); } + + async_setup_dest_tcp_socket(tac, service_id, connection_id, false); return true; + case Message_Type_SERVICE_IDS: // service ids should be received and validate before any stream can start. Ignore this control message if receive after stream already start. BOOST_LOG_SEV(log, info) << "Receive service Ids during data transfer. ignore"; return true; case Message_Type_DATA: //handling the following cases alleviates clang compiler warnings - throw std::logic_error("Data message recieved in control message handler"); + throw std::logic_error("Data message received in control message handler"); case Message_Type_UNKNOWN: case Message_Type_Message_Type_INT_MIN_SENTINEL_DO_NOT_USE_: case Message_Type_Message_Type_INT_MAX_SENTINEL_DO_NOT_USE_: //message-lite in C++ (gcc) generates a far far smaller executable. Likely a gcc issue since msvc generates reasonably sized executable either way - //throw proxy_exception((boost::format("Unexpected message type recieved during control message handling during data transfer: %1%") % External_MessageType_Name(message.messagetype())).str()); - throw proxy_exception((boost::format("Unexpected message type recieved during control message handling during data transfer: %1%") % message.type()).str()); + //throw proxy_exception((boost::format("Unexpected message type received during control message handling during data transfer: %1%") % External_MessageType_Name(message.messagetype())).str()); + throw proxy_exception((boost::format("Unexpected message type received during control message handling during data transfer: %1%") % message.type()).str()); default: if (message.ignorable()) { return true; } - throw std::logic_error((boost::format("Unrecognized message type recieved during control message handling during data transfer: %1%") % message.type()).str()); + throw std::logic_error((boost::format("Unrecognized message type received during control message handling during data transfer: %1%") % message.type()).str()); } } @@ -1176,6 +1399,15 @@ namespace aws { namespace iot { namespace securedtunneling { // Get the endpoint information based on the service id mapping // Validate if this mapping exists, if not, discard the message string service_id = message.serviceid(); + uint32_t connection_id = static_cast(message.connectionid()); + + //for backwards compatiblity with v2 + if (!connection_id) + { + connection_id = 1; + tac.adapter_config.is_v2_message_format = true; + } + BOOST_LOG_SEV(log, trace) << "Forwarding message to tcp socket with connection id: " << connection_id; /** * v1 message format does not need to have service id field, so we don't need to do validation on this field. * Fill the service id with the current one used in the local proxy mapping. @@ -1189,7 +1421,12 @@ namespace aws { namespace iot { namespace securedtunneling { BOOST_LOG_SEV(log, error) << "Received non exist service Id, ignore"; return false; } - tcp_connection::pointer connection = get_tcp_connection(tac, service_id);; + tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id); + if (!connection) + { + BOOST_LOG_SEV(log, debug) << "Received non exist connection Id, skipping..."; + return true; + } //capture write buffer size (we care if it is empty, that means we will need to trigger a drain) size_t write_buffer_size_before = connection->tcp_write_buffer_.size(); boost::asio::buffer_copy(connection->tcp_write_buffer_.prepare(message.payload().size()), boost::asio::buffer(message.payload())); @@ -1197,18 +1434,15 @@ namespace aws { namespace iot { namespace securedtunneling { if (write_buffer_size_before == 0) { - async_tcp_write_buffer_drain(tac, service_id); - } - - if (tcp_has_enough_write_buffer_space(connection)) - { - return true; - } - else //tcp write buffer is full, instruct caller to not perform subsequent read - { - BOOST_LOG_SEV(log, debug) << "TCP write buffer full. Stopping web socket read loop"; - return false; + try { + async_tcp_write_buffer_drain(tac, service_id, connection_id); + } + catch (proxy_exception &e) + { + BOOST_LOG_SEV(log, info) << "Message from tunnel peer targets socket already closed: " << e.what(); + } } + return true; } void tcp_adapter_proxy::on_web_socket_read(tcp_adapter_context &tac, boost::system::error_code const &ec, size_t bytes_read) @@ -1260,14 +1494,14 @@ namespace aws { namespace iot { namespace securedtunneling { throw proxy_exception((boost::format("Could not parse web socket binary frame into message: %1%") % incoming_message.InitializationErrorString()).str()); } #ifdef DEBUG - //BOOST_LOG_SEV(log, trace) << "Message recieved:\n" << message.DebugString(); //re-add when linked to protobuf instead of protobuf-lite + //BOOST_LOG_SEV(log, trace) << "Message received:\n" << message.DebugString(); //re-add when linked to protobuf instead of protobuf-lite BOOST_LOG_SEV(log, trace) << "Message parsed successfully , type :" << incoming_message.type(); #endif if (!is_valid_stream_id(tac, incoming_message)) { continue_reading = true; #ifdef DEBUG - BOOST_LOG_SEV(log, trace) << "Stale message recieved. Dropping"; + BOOST_LOG_SEV(log, trace) << "Stale message received. Dropping"; #endif } else @@ -1278,27 +1512,33 @@ namespace aws { namespace iot { namespace securedtunneling { { service_id = tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first; } - tcp_connection::pointer connection = get_tcp_connection(tac, service_id); - // if per connection handler is available, trigger them. + + // if per service handler is available, trigger them. if (incoming_message.type() != Message_Type_DATA) { - if (connection != nullptr && connection->on_control_message != nullptr) + BOOST_LOG_SEV(log, trace) << "Processing control message"; + if (tac.serviceId_to_control_message_handler_map.find(service_id) != tac.serviceId_to_control_message_handler_map.end()) { - continue_reading = connection->on_control_message(incoming_message); + BOOST_LOG_SEV(log, trace) << "Using service-specific handler for service id: " << service_id; + continue_reading = tac.serviceId_to_control_message_handler_map[service_id](incoming_message); } else { + BOOST_LOG_SEV(log, trace) << "Using global handler"; continue_reading = on_web_socket_control_message(incoming_message); } } else if (incoming_message.type() == Message_Type_DATA) { - if (connection != nullptr && connection->on_data_message != nullptr) + BOOST_LOG_SEV(log, trace) << "Processing data message"; + if (tac.serviceId_to_data_message_handler_map.find(service_id) != tac.serviceId_to_data_message_handler_map.end()) { - continue_reading = connection->on_data_message(incoming_message); + BOOST_LOG_SEV(log, trace) << "Using service-specific handler for service id: " << service_id; + continue_reading = tac.serviceId_to_data_message_handler_map[service_id](incoming_message); } else { + BOOST_LOG_SEV(log, trace) << "Using global handler"; continue_reading = on_web_socket_data_message(incoming_message); } @@ -1312,6 +1552,7 @@ namespace aws { namespace iot { namespace securedtunneling { } } + BOOST_LOG_SEV(log, trace) << "return continue_reading " << continue_reading; return continue_reading; } @@ -1331,13 +1572,6 @@ namespace aws { namespace iot { namespace securedtunneling { { throw std::logic_error("Cannot run web socket read loop without handlers in place for control messages and data messages"); } - if (!tcp_has_enough_write_buffer_space(tac)) - { - BOOST_LOG_SEV(log, trace) << "Scheduled async web socket read into tcp write buffer and it does not have enough space!"; - #ifdef DEBUG - BOOST_LOG_SEV(log, trace) << "Scheduled async web socket read into tcp write buffer and it does not have enough space!"; - #endif - } else if (tac.is_web_socket_reading) { @@ -1384,18 +1618,28 @@ namespace aws { namespace iot { namespace securedtunneling { } } - void tcp_adapter_proxy::async_tcp_write_buffer_drain(tcp_adapter_context &tac, string service_id) + void tcp_adapter_proxy::async_tcp_write_buffer_drain(tcp_adapter_context &tac, string service_id, uint32_t connection_id) { - tcp_connection::pointer connection = get_tcp_connection(tac, service_id); + tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id); + if (!connection) + { + BOOST_LOG_SEV(log, trace) << "tcp socket does not exist for connection id: " << connection_id << ". Skipping..."; + return; + } if (!connection->socket_.is_open()) { + BOOST_LOG_SEV(log, trace) << "not open for service id " << service_id << ", connection id: " << connection_id << ". Skipping..."; throw proxy_exception((boost::format("TCP socket is not open service id: %1%") % service_id).str()); } static std::function write_done; - write_done = [&, service_id](boost::system::error_code const &ec, size_t bytes_written) + write_done = [&, service_id, connection_id](boost::system::error_code const &ec, size_t bytes_written) { - BOOST_LOG_SEV(log, trace) << "write done service id " << service_id; - tcp_connection::pointer socket_write_connection = get_tcp_connection(tac, service_id); + BOOST_LOG_SEV(log, trace) << "write done service id " << service_id << ", connection id: " << connection_id; + tcp_connection::pointer socket_write_connection = get_tcp_connection(tac, service_id, connection_id); + if (!socket_write_connection) + { + return; + } socket_write_connection->is_tcp_socket_writing_ = false; if (ec) { @@ -1406,7 +1650,7 @@ namespace aws { namespace iot { namespace securedtunneling { } else { - tcp_socket_error(tac, ec, service_id); + tcp_socket_error(tac, ec, service_id, connection_id); } } else @@ -1457,11 +1701,16 @@ namespace aws { namespace iot { namespace securedtunneling { } } - void tcp_adapter_proxy::async_setup_web_socket_write_buffer_drain(tcp_adapter_context &tac, std::string const & service_id) + void tcp_adapter_proxy::async_setup_web_socket_write_buffer_drain(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id) { - BOOST_LOG_SEV(log, trace) << "Web socket write buffer drain for service id: " << service_id; + BOOST_LOG_SEV(log, trace) << "Web socket write buffer drain for service id: " << service_id << ", connection id: " << connection_id; boost::beast::flat_buffer outgoing_message_buffer; - tcp_connection::pointer connection = get_tcp_connection(tac, service_id); + tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id); + if (!connection) + { + BOOST_LOG_SEV(log, trace) << "Tcp connection service id: " << service_id << ", connection id: " << connection_id << " does not exist anymore. Returning."; + return; + } using namespace com::amazonaws::iot::securedtunneling; if (connection->web_socket_data_write_buffer_.size() > 0) { @@ -1476,10 +1725,11 @@ namespace aws { namespace iot { namespace securedtunneling { { throw proxy_exception((boost::format("No streamId exists for the service Id %1%") % service_id).str()); } - BOOST_LOG_SEV(log, debug) << "Prepare to send data message: service id: " << service_id << " stream id: " << tac.serviceId_to_streamId_map[service_id]; + BOOST_LOG_SEV(log, debug) << "Prepare to send data message: service id: " << service_id << " stream id: " << tac.serviceId_to_streamId_map[service_id] << " connection id: " << connection_id; // Construct outgoing message outgoing_message.set_serviceid(service_id); outgoing_message.set_streamid(tac.serviceId_to_streamId_map[service_id]); + outgoing_message.set_connectionid(connection_id); size_t const send_size = std::min(GET_SETTING(settings, MESSAGE_MAX_PAYLOAD_SIZE), connection->web_socket_data_write_buffer_.size()); boost::asio::buffer_copy(outgoing_message_buffer.prepare(send_size), connection->web_socket_data_write_buffer_.data(), send_size); @@ -1491,14 +1741,14 @@ namespace aws { namespace iot { namespace securedtunneling { outgoing_message_buffer.consume(outgoing_message_buffer.max_size()); //after message is sent, continue with the loop - connection->after_send_message = std::bind(&tcp_adapter_proxy::async_setup_web_socket_write_buffer_drain, this, std::ref(tac), service_id); + connection->after_send_message = std::bind(&tcp_adapter_proxy::async_setup_web_socket_write_buffer_drain, this, std::ref(tac), service_id, connection_id); async_send_message(tac, outgoing_message); //if this write cleared up enough space if (wss_has_enough_write_buffer_space(connection->web_socket_data_write_buffer_)) { - BOOST_LOG_SEV(log, debug) << "Write buffer has enough space, continue tcp read loop for " << service_id ; - async_tcp_socket_read_loop(tac, service_id); + BOOST_LOG_SEV(log, debug) << "Write buffer has enough space, continue tcp read loop for " << service_id << " connection id: " << connection_id; + async_tcp_socket_read_loop(tac, service_id, connection_id); } else { @@ -1517,6 +1767,14 @@ namespace aws { namespace iot { namespace securedtunneling { for (auto m: tac.adapter_config.serviceId_to_endpoint_map) { string service_id = m.first; + if (tac.serviceId_to_tcp_server_map.find(service_id) == tac.serviceId_to_tcp_server_map.end()) + { + tac.serviceId_to_tcp_server_map[service_id] = tcp_server::create(tac.io_ctx, + GET_SETTING(settings, TCP_WRITE_BUFFER_SIZE), + GET_SETTING(settings, TCP_READ_BUFFER_SIZE), + GET_SETTING(settings, WEB_SOCKET_WRITE_BUFFER_SIZE)); + } + BOOST_LOG_SEV(log, info) << "calling setup from loop"; setup_tcp_socket(tac, service_id); } } @@ -1526,23 +1784,38 @@ namespace aws { namespace iot { namespace securedtunneling { for (auto m: tac.adapter_config.serviceId_to_endpoint_map) { string service_id = m.first; + if (tac.serviceId_to_tcp_client_map.find(service_id) == tac.serviceId_to_tcp_client_map.end()) + { + tac.serviceId_to_tcp_client_map[service_id] = tcp_client::create(tac.io_ctx, + GET_SETTING(settings, TCP_WRITE_BUFFER_SIZE), + GET_SETTING(settings, TCP_READ_BUFFER_SIZE), + GET_SETTING(settings, WEB_SOCKET_WRITE_BUFFER_SIZE)); + } setup_tcp_socket(tac, service_id); } } - void tcp_adapter_proxy::async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr const& data_to_send, std::string const & service_id) + void tcp_adapter_proxy::async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr const& data_to_send, std::string const & service_id, uint32_t const & connection_id) { - BOOST_LOG_SEV(log, trace) << "Sending messages over web socket for service id: " << service_id; + BOOST_LOG_SEV(log, trace) << "Sending messages over web socket for service id: " << service_id << " connection id: " << connection_id; BOOST_LOG_SEV(log, trace) << "Current queue size: " << tac.web_socket_outgoing_message_queue.size(); // Always add to queue and invoke the send message complete if (data_to_send != nullptr) { - BOOST_LOG_SEV(log, trace) << "Put data " << data_to_send->size() << " bytes into the web_socket_outgoing_message_queue for service id: " << service_id; - tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id); - data_message temp = std::make_pair(data_to_send, socket_connection->after_send_message); + BOOST_LOG_SEV(log, trace) << "Put data " << data_to_send->size() << " bytes into the web_socket_outgoing_message_queue for service id: " << service_id << " connection id: " << connection_id; + data_message temp_msg; + tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id, connection_id); + if (socket_connection) + { + temp_msg = std::make_pair(data_to_send, socket_connection->after_send_message); + } + else + { + temp_msg = std::make_pair(data_to_send, nullptr); + } const std::lock_guard lock(tac.web_socket_outgoing_message_queue_mutex); - tac.web_socket_outgoing_message_queue.push(temp); + tac.web_socket_outgoing_message_queue.push(temp_msg); // Are we already writing? if(tac.web_socket_outgoing_message_queue.size() > 1) { @@ -1559,11 +1832,12 @@ namespace aws { namespace iot { namespace securedtunneling { { throw proxy_exception("Error sending web socket message", ec); } - BOOST_LOG_SEV(log, trace) << "Sent " << bytes_sent << " bytes over websocket for service id: " << service_id; + BOOST_LOG_SEV(log, trace) << "Sent " << bytes_sent << " bytes over websocket for service id: " << service_id << " connection id: " << connection_id; std::function capture_after_send_message = message_to_send.second; if(capture_after_send_message) { + BOOST_LOG_SEV(log, trace) << "capturing after_send_message"; capture_after_send_message(); } @@ -1574,14 +1848,13 @@ namespace aws { namespace iot { namespace securedtunneling { BOOST_LOG_SEV(log, trace) << "web_socket_outgoing_message_queue is empty, no more messages to send."; return; } - async_send_message_to_web_socket(tac, nullptr, service_id); + async_send_message_to_web_socket(tac, nullptr, service_id, connection_id); }); } void tcp_adapter_proxy::async_setup_source_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr retry_config, string service_id) { tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id]; - tcp_socket_ensure_closed(server->connection_->socket()); server->acceptor_.close(); static boost::asio::socket_base::reuse_address reuse_addr_option(true); @@ -1639,51 +1912,93 @@ namespace aws { namespace iot { namespace securedtunneling { { tac.adapter_config.on_listen_port_assigned(local_port, service_id); } - server->acceptor_.async_accept( - [=, &tac](boost::system::error_code const &ec, boost::asio::ip::tcp::socket new_socket) - { - - if (ec) - { - BOOST_LOG_SEV(log, error) << (boost::format("Could not listen/accept incoming connection on %1%:%2% -- %3%") - % tac.bind_address_actual % local_port % ec.message()).str(); - basic_retry_execute(log, retry_config, - [=, &ec]() { throw std::runtime_error((boost::format("Failed to accept new connection on %1% -- %2%") % local_port % ec.message()).str()); }); - } - else - { - BOOST_LOG_SEV(log, debug) << "socket port " << new_socket.local_endpoint().port(); - string endpoint = boost::lexical_cast(new_socket.local_endpoint().port()); - BOOST_LOG_SEV(log, debug) << "endpoint mapping:"; - for (auto m: tac.adapter_config.serviceId_to_endpoint_map) - { - BOOST_LOG_SEV(log, debug) << m.first << " = " << m.second; - } - tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id]; - server->connection_->socket() = std::move(new_socket); - BOOST_LOG_SEV(log, info) << "Accepted tcp connection on port " << server->connection_->socket().local_endpoint().port() << " from " << server->connection_->socket().remote_endpoint(); - invoke_and_clear_handler(server->after_setup_tcp_socket); - } - }); + do_accept_tcp_connection(tac, retry_config, service_id, local_port, true); } } } }); } - void tcp_adapter_proxy::async_resolve_destination_for_connect(tcp_adapter_context &tac, std::shared_ptr retry_config, string const & service_id, boost::system::error_code const &ec, tcp::resolver::results_type results) + void tcp_adapter_proxy::do_accept_tcp_connection(tcp_adapter_context &tac, std::shared_ptr retry_config, string service_id, std::uint16_t local_port, bool is_first_connection) { - BOOST_LOG_SEV(log, trace) << "Resolve destination to connect for service id: " << service_id; + retry_config->operation = std::bind(&tcp_adapter_proxy::do_accept_tcp_connection, this, std::ref(tac), retry_config, service_id, local_port, is_first_connection); + tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id]; + + server->acceptor_.async_accept(tac.io_ctx, + [=, &tac](boost::system::error_code const &ec, boost::asio::ip::tcp::socket new_socket) + { + if (ec) + { + BOOST_LOG_SEV(log, error) << (boost::format("Could not listen/accept incoming connection on %1%:%2% -- %3%") + % tac.bind_address_actual % local_port % ec.message()).str(); + basic_retry_execute(log, retry_config, + [=, &ec]() { throw std::runtime_error((boost::format("Failed to accept new connection on %1% -- %2%") % local_port % ec.message()).str()); }); + } + else + { + BOOST_LOG_SEV(log, debug) << "socket port " << new_socket.local_endpoint().port(); + string endpoint = boost::lexical_cast(new_socket.local_endpoint().port()); + BOOST_LOG_SEV(log, debug) << "endpoint mapping:"; + for (auto m: tac.adapter_config.serviceId_to_endpoint_map) + { + BOOST_LOG_SEV(log, debug) << m.first << " = " << m.second; + } + tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id]; + + uint32_t new_connection_id = ++server->highest_connection_id; + + // backwards compatibility + if (tac.adapter_config.is_v2_message_format) + { + new_connection_id = 1; + } + BOOST_LOG_SEV(log, info) << "creating tcp connection id " << new_connection_id; + + if (server->connectionId_to_tcp_connection_map.find(new_connection_id) == server->connectionId_to_tcp_connection_map.end() && + server->connectionId_to_tcp_connection_map.size() < GET_SETTING(settings, MAX_ACTIVE_CONNECTIONS)) + { + server->connectionId_to_tcp_connection_map[new_connection_id] = tcp_connection::create(tac.io_ctx, + GET_SETTING(settings, TCP_WRITE_BUFFER_SIZE), + GET_SETTING(settings, TCP_READ_BUFFER_SIZE), + GET_SETTING(settings, WEB_SOCKET_WRITE_BUFFER_SIZE), + new_connection_id); + } + + server->connectionId_to_tcp_connection_map[new_connection_id]->after_send_message = std::bind(&tcp_adapter_proxy::async_setup_bidirectional_data_transfers, this, std::ref(tac), service_id, new_connection_id); + + server->connectionId_to_tcp_connection_map[new_connection_id]->socket() = std::move(new_socket); + BOOST_LOG_SEV(log, info) << "Accepted tcp connection on port " << server->connectionId_to_tcp_connection_map[new_connection_id]->socket().local_endpoint().port() << " from " << server->connectionId_to_tcp_connection_map[new_connection_id]->socket().remote_endpoint(); + + if (is_first_connection) + { + async_send_stream_start(tac, service_id, new_connection_id); + } + else + { + async_send_connection_start(tac, service_id, new_connection_id); + } + + do_accept_tcp_connection(tac, retry_config, service_id, local_port, false); + } + }); + + } + + void tcp_adapter_proxy::async_resolve_destination_for_connect(tcp_adapter_context &tac, std::shared_ptr retry_config, string const & service_id, uint32_t const & connection_id, boost::system::error_code const &ec, tcp::resolver::results_type results) + { + BOOST_LOG_SEV(log, trace) << "Resolve destination to connect for service id: " << service_id << " connection id: " << connection_id; if (ec) { string endpoint = tac.adapter_config.serviceId_to_endpoint_map[service_id]; BOOST_LOG_SEV(log, error) << (boost::format("Could not resolve endpoint %1%. Error message: %2%") % endpoint % ec.message()).str(); basic_retry_execute(log, retry_config, - [this, &tac, service_id]() + [this, &tac, service_id, connection_id]() { - tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id); + tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id, connection_id); + tac.serviceId_to_control_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); + tac.serviceId_to_data_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); socket_connection->after_send_message = std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id); - async_send_stream_reset(tac, service_id); + async_send_stream_reset(tac, service_id, connection_id); }); } else { @@ -1691,32 +2006,34 @@ namespace aws { namespace iot { namespace securedtunneling { std::string dst_host = results->endpoint().address().to_string(); unsigned short dst_port = results->endpoint().port(); BOOST_LOG_SEV(log, debug) << "Resolved destination host to IP: " << dst_host << " , connecting ..."; - client->connection_->socket().async_connect(*results.begin(), - [=, &tac](boost::system::error_code const &ec) + client->connectionId_to_tcp_connection_map[connection_id]->socket().async_connect(*results.begin(), + [=, &tac](boost::system::error_code const &ec) { if (ec) { BOOST_LOG_SEV(log, error) << (boost::format("Could not connect to destination %1%:%2% -- %3%") % dst_host % dst_host % ec.message()).str(); basic_retry_execute(log, retry_config, - [this, &tac, service_id]() + [this, &tac, service_id, connection_id]() { - tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id); + BOOST_LOG_SEV(log, trace) << "ignoring all messages: "; + tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id, connection_id); + tac.serviceId_to_control_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); + tac.serviceId_to_data_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); socket_connection->after_send_message = std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id); - async_send_stream_reset(tac, service_id); + async_send_stream_reset(tac, service_id, connection_id); }); } else { BOOST_LOG_SEV(log, info) << "Connected to " << dst_host << ", port: " << dst_port; - tcp_client::pointer client = tac.serviceId_to_tcp_client_map[service_id]; - invoke_and_clear_handler(client->after_setup_tcp_socket); + async_setup_bidirectional_data_transfers(tac, service_id, connection_id); } } ); } } - void tcp_adapter_proxy::async_setup_dest_tcp_socket(tcp_adapter_context &tac, string const & service_id) + void tcp_adapter_proxy::async_setup_dest_tcp_socket(tcp_adapter_context &tac, string const & service_id, uint32_t const & connection_id, bool is_first_connection) { BOOST_LOG_SEV(log, trace) << "Setup destination tcp socket for service id" << service_id; std::shared_ptr retry_config = @@ -1724,14 +2041,13 @@ namespace aws { namespace iot { namespace securedtunneling { GET_SETTING(settings, TCP_CONNECTION_RETRY_COUNT), GET_SETTING(settings, TCP_CONNECTION_RETRY_DELAY_MS), nullptr); - retry_config->operation = std::bind(&tcp_adapter_proxy::async_setup_dest_tcp_socket_retry, this, std::ref(tac), retry_config, service_id); - async_setup_dest_tcp_socket_retry(tac, retry_config, service_id); + retry_config->operation = std::bind(&tcp_adapter_proxy::async_setup_dest_tcp_socket_retry, this, std::ref(tac), retry_config, service_id, connection_id, is_first_connection); + async_setup_dest_tcp_socket_retry(tac, retry_config, service_id, connection_id, is_first_connection); } - void tcp_adapter_proxy::async_setup_dest_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr retry_config, string const & service_id) + void tcp_adapter_proxy::async_setup_dest_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr retry_config, string const & service_id, uint32_t const & connection_id, bool is_first_connection) { tcp_client::pointer client = tac.serviceId_to_tcp_client_map[service_id]; - tcp_socket_ensure_closed(client->connection_->socket()); if (tac.adapter_config.serviceId_to_endpoint_map.find((service_id)) == tac.adapter_config.serviceId_to_endpoint_map.end()) { throw std::runtime_error((boost::format("Receive invalid service id %1%") % service_id).str()); @@ -1740,6 +2056,17 @@ namespace aws { namespace iot { namespace securedtunneling { BOOST_LOG_SEV(log, info) << "Attempting to establish tcp socket connection to: " << endpoint; + BOOST_LOG_SEV(log, info) << "Setting up dest socket with tcp connection id " << connection_id; + + if (client->connectionId_to_tcp_connection_map.find(connection_id) == client->connectionId_to_tcp_connection_map.end()) + { + client->connectionId_to_tcp_connection_map[connection_id] = tcp_connection::create(tac.io_ctx, + GET_SETTING(settings, TCP_WRITE_BUFFER_SIZE), + GET_SETTING(settings, TCP_READ_BUFFER_SIZE), + GET_SETTING(settings, WEB_SOCKET_WRITE_BUFFER_SIZE), + connection_id); + } + if (tac.adapter_config.bind_address.has_value()) { BOOST_LOG_SEV(log, debug) << "Resolving local address host: " << tac.adapter_config.bind_address.get(); @@ -1751,11 +2078,13 @@ namespace aws { namespace iot { namespace securedtunneling { { BOOST_LOG_SEV(log, error) << (boost::format("Could not resolve bind address: %1% -- %2%") % tac.adapter_config.bind_address.get() % ec.message()).str(); basic_retry_execute(log, retry_config, - [this, &tac, service_id]() + [this, &tac, service_id, connection_id]() { - tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id); + tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id, connection_id); + tac.serviceId_to_control_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); + tac.serviceId_to_data_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); socket_connection->after_send_message = std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id); - async_send_stream_reset(tac, service_id); + async_send_stream_reset(tac, service_id, connection_id); }); } else @@ -1763,17 +2092,19 @@ namespace aws { namespace iot { namespace securedtunneling { BOOST_LOG_SEV(log, debug) << "Resolved bind IP: " << results->endpoint().address().to_string(); boost::system::error_code bind_ec; - client->connection_->socket().open(results->endpoint().protocol()); - client->connection_->socket().bind({results->endpoint().address(), 0}, bind_ec); + client->connectionId_to_tcp_connection_map[connection_id]->socket().open(results->endpoint().protocol()); + client->connectionId_to_tcp_connection_map[connection_id]->socket().bind({results->endpoint().address(), 0}, bind_ec); if (bind_ec) { BOOST_LOG_SEV(log, error) << (boost::format("Could not bind to address: %1% -- %2%") % results->endpoint().address().to_string() % bind_ec.message()).str(); basic_retry_execute(log, retry_config, - [this, &tac, service_id]() + [this, &tac, service_id, connection_id]() { - tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id); + tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id, connection_id); + tac.serviceId_to_control_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); + tac.serviceId_to_data_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message, this, std::ref(tac), std::placeholders::_1); socket_connection->after_send_message = std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id); - async_send_stream_reset(tac, service_id); + async_send_stream_reset(tac, service_id, connection_id); }); } else @@ -1781,8 +2112,9 @@ namespace aws { namespace iot { namespace securedtunneling { tuple endpoint_to_connect = tcp_adapter_proxy::get_host_and_port(endpoint, tac.adapter_config.bind_address.get()); std::string dst_host = std::get<0>(endpoint_to_connect); std::string dst_port = std::get<1>(endpoint_to_connect); + BOOST_LOG_SEV(log, trace) << "Resolving destination host: " << dst_host << " port: " << dst_port; client->resolver_.async_resolve(dst_host, dst_port, - std::bind(&tcp_adapter_proxy::async_resolve_destination_for_connect, this, std::ref(tac), retry_config, service_id, std::placeholders::_1, std::placeholders::_2)); + std::bind(&tcp_adapter_proxy::async_resolve_destination_for_connect, this, std::ref(tac), retry_config, service_id, connection_id, std::placeholders::_1, std::placeholders::_2)); } } }); @@ -1794,22 +2126,7 @@ namespace aws { namespace iot { namespace securedtunneling { std::string dst_port = std::get<1>(endpoint_to_connect); BOOST_LOG_SEV(log, trace) << "Resolving destination host: " << dst_host << " port: " << dst_port; client->resolver_.async_resolve(dst_host, dst_port, - std::bind(&tcp_adapter_proxy::async_resolve_destination_for_connect, this, std::ref(tac), retry_config, service_id, std::placeholders::_1, std::placeholders::_2)); - } - } - - void tcp_adapter_proxy::tcp_socket_ensure_closed(tcp::socket & tcp_socket) - { - boost::system::error_code ec; - if (tcp_socket.is_open()) - { - BOOST_LOG_SEV(log, debug) << "Previously open connection detected. Closing..."; - auto remote_endpoint = tcp_socket.remote_endpoint(ec); - if (!ec) - { - BOOST_LOG_SEV(this->log, info) << "Disconnected from: " << remote_endpoint; - } - tcp_socket.close(); + std::bind(&tcp_adapter_proxy::async_resolve_destination_for_connect, this, std::ref(tac), retry_config, service_id, connection_id, std::placeholders::_1, std::placeholders::_2)); } } @@ -1847,7 +2164,7 @@ namespace aws { namespace iot { namespace securedtunneling { int32_t stream_id = tac.serviceId_to_streamId_map.at(service_id); if (message.streamid() == 0) { - BOOST_LOG_SEV(log, warning) << "Message recieved with streamid not set"; + BOOST_LOG_SEV(log, warning) << "Message received with streamid not set"; return false; } return stream_id == message.streamid(); @@ -1857,30 +2174,13 @@ namespace aws { namespace iot { namespace securedtunneling { bool tcp_adapter_proxy::tcp_has_enough_write_buffer_space(tcp_connection::pointer connection) { //tcp write buffer needs at least enough space to hold a max data size web socket message - //because we can't limit how much data we might recieve next frame + //because we can't limit how much data we might receive next frame return (connection->tcp_write_buffer_.max_size() - connection->tcp_write_buffer_.size()) >= GET_SETTING(settings, MESSAGE_MAX_PAYLOAD_SIZE); } - // Check if all tcp write buffers have space. If one of them does not have enough, return false - bool tcp_adapter_proxy::tcp_has_enough_write_buffer_space(tcp_adapter_context const &tac) - { - bool has_enough_space = true; - for (auto m : tac.serviceId_to_tcp_client_map) - { - string service_id = m.first; - tcp_connection::pointer connection = m.second->connection_; - if ( (connection->tcp_write_buffer_.max_size() - connection->tcp_write_buffer_.size()) < GET_SETTING(settings, MESSAGE_MAX_PAYLOAD_SIZE) ) - { - has_enough_space = false; - break; - } - } - return has_enough_space; - } - bool tcp_adapter_proxy::wss_has_enough_write_buffer_space(boost::beast::multi_buffer const &buffer) { //web socket write buffer only needs non-zero space because we can make TCP read - //calls that limit the data recieved + //calls that limit the data received return (buffer.max_size() - buffer.size()) > 0; } diff --git a/src/TcpAdapterProxy.h b/src/TcpAdapterProxy.h index 719498e8..0e4a648d 100644 --- a/src/TcpAdapterProxy.h +++ b/src/TcpAdapterProxy.h @@ -80,11 +80,14 @@ namespace aws { namespace iot { namespace securedtunneling { wss{ nullptr }, wss_resolver{ io_ctx }, wss_response{ }, + num_active_connections{ 0 }, stream_id{ -1 }, service_id{ "" }, serviceId_to_streamId_map{}, serviceId_to_tcp_server_map{}, serviceId_to_tcp_client_map{}, + serviceId_to_control_message_handler_map{}, + serviceId_to_data_message_handler_map{}, bind_address_actual{ }, is_web_socket_reading{ false }, is_service_ids_received{ false }, @@ -105,6 +108,8 @@ namespace aws { namespace iot { namespace securedtunneling { //debuggability. boost::beast::websocket::response_type wss_response; + std::atomic_uint16_t num_active_connections; + //represents the current stream ID to expect data from //care should be taken how(if) this is updated directly // To be deleted @@ -113,6 +118,8 @@ namespace aws { namespace iot { namespace securedtunneling { std::unordered_map serviceId_to_streamId_map; std::unordered_map serviceId_to_tcp_server_map; std::unordered_map serviceId_to_tcp_client_map; + std::unordered_map> serviceId_to_control_message_handler_map; + std::unordered_map> serviceId_to_data_message_handler_map; std::string bind_address_actual; //flag set to true while web socket data is being drained //necessary for better TCP socket recovery rather than destroying @@ -152,15 +159,15 @@ namespace aws { namespace iot { namespace securedtunneling { int run_proxy(); private: + void update_message_handlers(tcp_adapter_context &tac, std::function handler); void setup_tcp_socket(tcp_adapter_context &tac, std::string const & service_id); void setup_tcp_sockets(tcp_adapter_context &tac); //setup async io flow to connect tcp socket to the adapter config's data host/port - void async_setup_dest_tcp_socket(tcp_adapter_context &tac, std::string const & service_id); - void async_setup_dest_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr retry_config, std::string const & service_id); + void async_setup_dest_tcp_socket(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id, bool is_first_connection); + void async_setup_dest_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr retry_config, std::string const & service_id, uint32_t const & connection_id, bool is_first_connection); void async_setup_source_tcp_sockets(tcp_adapter_context &tac); void async_setup_source_tcp_socket_retry(tcp_adapter_context &tac, std::shared_ptr retry_config, std::string service_id); - void initialize_tcp_clients(tcp_adapter_context &tac); - void initialize_tcp_servers(tcp_adapter_context &tac); + void do_accept_tcp_connection(tcp_adapter_context &tac, std::shared_ptr retry_config, std::string service_id, std::uint16_t local_port, bool is_first_connection); void setup_web_socket(tcp_adapter_context &tac); //setup async web socket, and as soon as connection is up, setup async ping schedule void async_setup_web_socket(tcp_adapter_context &tac); @@ -169,10 +176,13 @@ namespace aws { namespace iot { namespace securedtunneling { //then the reset is intentionally reset via web socket, and retries //occur definitely (regardless of retry configuration) void tcp_socket_reset_all(tcp_adapter_context &tac, std::function post_reset_operation); - void tcp_socket_reset(tcp_adapter_context &tac, std::string service_id, std::function post_reset_operation); - tcp_connection::pointer get_tcp_connection(tcp_adapter_context &tac, std::string service_id); + void tcp_socket_reset_init(tcp_adapter_context &tac, std::string service_id, std::function post_reset_operation); + void tcp_socket_reset(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id, std::function post_reset_operation); + void tcp_socket_close(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id); + tcp_connection::pointer get_tcp_connection(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id); - void tcp_socket_error(tcp_adapter_context &tac, boost::system::error_code const &_ec, std::string const & service_id); + void delete_tcp_socket(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id); + void tcp_socket_error(tcp_adapter_context &tac, boost::system::error_code const &_ec, std::string const & service_id, uint32_t const & connection_id); //sets up a web socket read loop that will read, and ignore most messages until a stream start //is read and then do something with it (likely, connect to configured endpoint) @@ -197,22 +207,21 @@ namespace aws { namespace iot { namespace securedtunneling { //invokes after_setup_web_socket_read_until_stream_start() after stream start is encountered bool async_wait_for_stream_start(tcp_adapter_context &tac, message const &message); bool async_wait_for_service_ids(tcp_adapter_context &tac); - void async_tcp_socket_read_loop(tcp_adapter_context &tac, std::string const & service_id); + void async_tcp_socket_read_loop(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id); //below loop does continuous writes to TCP socket from the TCP adapter //context's tcp_write_buffer. After consuming chunks out of the buffer - //the behavior will be to check - void async_tcp_write_buffer_drain(tcp_adapter_context &tac, std::string service_id); + //the behavior will be to check + void async_tcp_write_buffer_drain(tcp_adapter_context &tac, std::string service_id, uint32_t connection_id); - void async_setup_bidirectional_data_transfers(tcp_adapter_context &tac, std::string const & service_id); - void async_setup_web_socket_write_buffer_drain(tcp_adapter_context &tac, std::string const & service_id); + void async_setup_bidirectional_data_transfers(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id); + void async_setup_web_socket_write_buffer_drain(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id); //returns a boolean that indicates if another web socket data read message can be put //onto the tcp write buffer. We have no way of knowing what the next message is and if //it will be too big to process, thus we don't do the read applying back pressure on //the socket. Implicitly, this means that an async_read is not happening on the web socket bool tcp_has_enough_write_buffer_space(tcp_connection::pointer connection); - bool tcp_has_enough_write_buffer_space(tcp_adapter_context const &tac); //returns a boolean that indicates if another tcp socket read's data can be put on the //web socket write buffer. It's a bit different from tcp write buffer space requirements @@ -226,8 +235,11 @@ namespace aws { namespace iot { namespace securedtunneling { bool is_valid_stream_id(tcp_adapter_context const& tac, message const &message); void async_send_message(tcp_adapter_context &tac, message const &message); - void async_send_stream_start(tcp_adapter_context &tac, std::string const & service_id); - void async_send_stream_reset(tcp_adapter_context &tac, std::string const & service_id); + void async_send_message(tcp_adapter_context &tac, message const &message, std::string const & service_id, uint32_t const & connection_id); + void async_send_stream_start(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id); + void async_send_stream_reset(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id); + void async_send_connection_start(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id); + void async_send_connection_reset(tcp_adapter_context &tac, std::string const & service_id, uint32_t const & connection_id); //handler for successfully sent ping will delay the next one void async_ping_handler_loop(tcp_adapter_context &tac, @@ -239,8 +251,6 @@ namespace aws { namespace iot { namespace securedtunneling { void clear_ws_buffers(tcp_adapter_context &tac); void clear_tcp_connection_buffers(tcp_connection::pointer connection); - void tcp_socket_ensure_closed(boost::asio::ip::tcp::socket & tcp_socket); - //closes the websocket connection //1 - shutdown the receive side of TCP //2 - drain the web socket write buffer @@ -248,7 +258,7 @@ namespace aws { namespace iot { namespace securedtunneling { //4 - perform teardown procedure on websocket void web_socket_close_and_stop(tcp_adapter_context &tac); - void async_resolve_destination_for_connect(tcp_adapter_context &tac, std::shared_ptr retry_config, std::string const & service_id, boost::system::error_code const &ec, tcp::resolver::results_type results); + void async_resolve_destination_for_connect(tcp_adapter_context &tac, std::shared_ptr retry_config, std::string const & service_id, uint32_t const & connection_id, boost::system::error_code const &ec, tcp::resolver::results_type results); bool process_incoming_websocket_buffer(tcp_adapter_context &tac, boost::beast::multi_buffer &message_buffer); @@ -264,7 +274,7 @@ namespace aws { namespace iot { namespace securedtunneling { bool fall_back_to_v1_message_format(std::unordered_map const & serviceId_to_endpoint_map); - void async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr const& ss, std::string const & service_id); + void async_send_message_to_web_socket(tcp_adapter_context &tac, std::shared_ptr const& ss, std::string const & service_id, uint32_t const & connection_id); void async_setup_destination_tcp_sockets(tcp_adapter_context &tac); diff --git a/src/TcpClient.h b/src/TcpClient.h index 0b7162a2..4180451a 100644 --- a/src/TcpClient.h +++ b/src/TcpClient.h @@ -14,18 +14,20 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio tcp_client(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size) : resolver_(io_context) { - connection_ = - tcp_connection::create(io_context, write_buf_size, read_buf_size, ws_write_buf_size); + } static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t const & ws_write_buf_size) { return pointer(new tcp_client(io_context, write_buf_size, read_buf_size, ws_write_buf_size)); } - tcp_connection::pointer connection_; tcp::resolver resolver_; + + std::unordered_map connectionId_to_tcp_connection_map; + // function object defines what to do after set up a tcp socket std::function after_setup_tcp_socket = nullptr; + // function object defines what to do receiving control message: stream start std::function on_receive_stream_start = nullptr; }; diff --git a/src/TcpConnection.h b/src/TcpConnection.h index a9fe0c3f..7d15fc22 100644 --- a/src/TcpConnection.h +++ b/src/TcpConnection.h @@ -17,9 +17,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio public: typedef boost::shared_ptr pointer; - static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t ws_write_buf_size) + static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t ws_write_buf_size, uint32_t connection_id) { - return pointer(new tcp_connection(io_context, write_buf_size, read_buf_size, ws_write_buf_size)); + return pointer(new tcp_connection(io_context, write_buf_size, read_buf_size, ws_write_buf_size, connection_id)); } tcp::socket& socket() @@ -27,11 +27,13 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio return socket_; } - tcp_connection(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size) + tcp_connection(boost::asio::io_context & io_context, std::size_t write_buf_size, std::size_t read_buf_size, std::size_t ws_write_buf_size, uint32_t connection_id) : socket_(io_context) , tcp_write_buffer_(write_buf_size) , tcp_read_buffer_(read_buf_size) , web_socket_data_write_buffer_(ws_write_buf_size) + , connection_id_(connection_id) + { } @@ -51,6 +53,9 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio //condense smaller TCP read chunks to bigger web socket writes. It also makes //it impossible to "inject" a non-data message in data sequence order boost::beast::multi_buffer web_socket_data_write_buffer_; + + uint32_t connection_id_; // assigned connection_id for tcp connection + // Is this tcp socket currently writing bool is_tcp_socket_writing_{ false }; // Is this tcp socket currently reading diff --git a/src/TcpServer.h b/src/TcpServer.h index 65c32673..e39a0862 100644 --- a/src/TcpServer.h +++ b/src/TcpServer.h @@ -4,6 +4,7 @@ #include #include #include +#include #include "TcpConnection.h" namespace aws { namespace iot { namespace securedtunneling { namespace connection { @@ -16,8 +17,7 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio : acceptor_(io_context) , resolver_(io_context) { - connection_ = - tcp_connection::create(io_context, write_buf_size, read_buf_size, ws_write_buf_size); + highest_connection_id = 0; } static pointer create(boost::asio::io_context& io_context, std::size_t const & write_buf_size, std::size_t const & read_buf_size, std::size_t const & ws_write_buf_size) @@ -32,7 +32,11 @@ namespace aws { namespace iot { namespace securedtunneling { namespace connectio tcp::acceptor acceptor_; tcp::resolver resolver_; - tcp_connection::pointer connection_; + + std::unordered_map connectionId_to_tcp_connection_map; + + std::atomic_uint32_t highest_connection_id; + // function object defines what to do after set up a tcp socket std::function after_setup_tcp_socket = nullptr; }; diff --git a/test/AdapterTests.cpp b/test/AdapterTests.cpp index 1f569942..dcd14ccc 100644 --- a/test/AdapterTests.cpp +++ b/test/AdapterTests.cpp @@ -217,7 +217,7 @@ TEST_CASE( "Test source mode", "[source]") { this_thread::sleep_for(chrono::milliseconds(IO_PAUSE_MS)); CHECK( ws_server.get_handshake_request().method() == boost::beast::http::verb::get ); CHECK( ws_server.get_handshake_request().target() == "/tunnel?local-proxy-mode=source" ); - CHECK( ws_server.get_handshake_request().base()["sec-websocket-protocol"] == "aws.iot.securetunneling-2.0" ); + CHECK( ws_server.get_handshake_request().base()["sec-websocket-protocol"] == "aws.iot.securetunneling-3.0" ); CHECK( ws_server.get_handshake_request().base()["access-token"] == adapter_cfg.access_token ); // Simulate cloud side sends control message Message_Type_SERVICE_IDS @@ -248,7 +248,7 @@ TEST_CASE( "Test source mode", "[source]") { ws_server.expect_next_message( [](message const&msg) { - return (msg.type() == com::amazonaws::iot::securedtunneling::Message_Type_STREAM_RESET) && msg.streamid() == 1; + return (msg.type() == com::amazonaws::iot::securedtunneling::Message_Type_CONNECTION_RESET) && msg.streamid() == 1; }); client_socket.close(); @@ -316,7 +316,7 @@ TEST_CASE( "Test source mode with client token", "[source]") { this_thread::sleep_for(chrono::milliseconds(IO_PAUSE_MS)); CHECK( ws_server.get_handshake_request().method() == boost::beast::http::verb::get ); CHECK( ws_server.get_handshake_request().target() == "/tunnel?local-proxy-mode=source" ); - CHECK( ws_server.get_handshake_request().base()["sec-websocket-protocol"] == "aws.iot.securetunneling-2.0" ); + CHECK( ws_server.get_handshake_request().base()["sec-websocket-protocol"] == "aws.iot.securetunneling-3.0" ); CHECK( ws_server.get_handshake_request().base()["access-token"] == adapter_cfg.access_token ); CHECK( ws_server.get_handshake_request().base()["client-token"] == adapter_cfg.client_token ); @@ -348,7 +348,7 @@ TEST_CASE( "Test source mode with client token", "[source]") { ws_server.expect_next_message( [](message const&msg) { - return (msg.type() == com::amazonaws::iot::securedtunneling::Message_Type_STREAM_RESET) && msg.streamid() == 1; + return (msg.type() == com::amazonaws::iot::securedtunneling::Message_Type_CONNECTION_RESET) && msg.streamid() == 1; }); client_socket.close(); @@ -427,7 +427,7 @@ TEST_CASE( "Test destination mode", "[destination]") { // Verify web socket handshake request from local proxy CHECK( ws_server.get_handshake_request().method() == boost::beast::http::verb::get ); CHECK( ws_server.get_handshake_request().target() == "/tunnel?local-proxy-mode=destination" ); - CHECK( ws_server.get_handshake_request().base()["sec-websocket-protocol"] == "aws.iot.securetunneling-2.0" ); + CHECK( ws_server.get_handshake_request().base()["sec-websocket-protocol"] == "aws.iot.securetunneling-3.0" ); CHECK( ws_server.get_handshake_request().base()["access-token"] == adapter_cfg.access_token ); // Simulate cloud side sends control message Message_Type_SERVICE_IDS