Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions plugins/multiplexer/ats-multiplexer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,27 @@ DoRemap(const Instance &i, TSHttpTxn t)
assert(buffer != nullptr);
assert(location != nullptr);

int length;
const char *const method = TSHttpHdrMethodGet(buffer, location, &length);

Dbg(dbg_ctl, "Method is %s.", std::string(method, length).c_str());

if (i.skipPostPut && ((length == TS_HTTP_LEN_POST && memcmp(TS_HTTP_METHOD_POST, method, TS_HTTP_LEN_POST) == 0) ||
(length == TS_HTTP_LEN_PUT && memcmp(TS_HTTP_METHOD_PUT, method, TS_HTTP_LEN_PUT) == 0))) {
TSHandleMLocRelease(buffer, TS_NULL_MLOC, location);
int method_length;
const char *const method = TSHttpHdrMethodGet(buffer, location, &method_length);

Dbg(dbg_ctl, "Method is %s.", std::string(method, method_length).c_str());

// A value of -1 is used to indicate there was no Content-Length header.
int content_length = -1;
// Retrieve the value of the Content-Length header.
auto field_loc = TSMimeHdrFieldFind(buffer, location, TS_MIME_FIELD_CONTENT_LENGTH, TS_MIME_LEN_CONTENT_LENGTH);
if (field_loc != TS_NULL_MLOC) {
content_length = TSMimeHdrFieldValueUintGet(buffer, location, field_loc, -1);
TSHandleMLocRelease(buffer, location, field_loc);
}
bool const is_post_or_put = (method_length == TS_HTTP_LEN_POST && memcmp(TS_HTTP_METHOD_POST, method, TS_HTTP_LEN_POST) == 0) ||
(method_length == TS_HTTP_LEN_PUT && memcmp(TS_HTTP_METHOD_PUT, method, TS_HTTP_LEN_PUT) == 0);
if (i.skipPostPut && is_post_or_put) {
Dbg(dbg_ctl, "skip_post_put: skipping a POST or PUT request.");
} else if (content_length < 0 && is_post_or_put) {
// HttpSM would need an update for POST request transforms to support
// chunked request bodies. It currently does not support this.
Dbg(dbg_ctl, "Skipping a non-Content-Length POST or PUT request.");
} else {
{
TSMLoc field;
Expand All @@ -145,21 +158,20 @@ DoRemap(const Instance &i, TSHttpTxn t)
generateRequests(i.origins, buffer, location, requests);
assert(requests.size() == i.origins.size());

if ((length == TS_HTTP_LEN_POST && memcmp(TS_HTTP_METHOD_POST, method, TS_HTTP_LEN_POST) == 0) ||
(length == TS_HTTP_LEN_PUT && memcmp(TS_HTTP_METHOD_PUT, method, TS_HTTP_LEN_PUT) == 0)) {
if (is_post_or_put) {
const TSVConn vconnection = TSTransformCreate(handlePost, t);
assert(vconnection != nullptr);
TSContDataSet(vconnection, new PostState(requests));
PostState *state = new PostState(requests, content_length);
TSContDataSet(vconnection, state);
assert(requests.empty());
TSHttpTxnHookAdd(t, TS_HTTP_REQUEST_TRANSFORM_HOOK, vconnection);
} else {
dispatch(requests, timeout);
}

TSHandleMLocRelease(buffer, TS_NULL_MLOC, location);

TSStatIntIncrement(statistics.requests, 1);
}
TSHandleMLocRelease(buffer, TS_NULL_MLOC, location);
}

TSRemapStatus
Expand Down
8 changes: 6 additions & 2 deletions plugins/multiplexer/post.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ PostState::~PostState()
}
}

PostState::PostState(Requests &r) : origin_buffer(nullptr), clone_reader(nullptr), output_vio(nullptr)
PostState::PostState(Requests &r, int content_length)
: content_length{content_length}, origin_buffer(nullptr), clone_reader(nullptr), output_vio(nullptr)
{
assert(!r.empty());
requests.swap(r);
Expand Down Expand Up @@ -72,7 +73,10 @@ postTransform(const TSCont c, PostState &s)
s.clone_reader = TSIOBufferReaderClone(origin_reader);
assert(s.clone_reader != nullptr);

s.output_vio = TSVConnWrite(output_vconn, c, origin_reader, std::numeric_limits<int64_t>::max());
// A future patch should support chunked POST bodies. In those cases, we
// can use INT64_MAX instead of s.content_length.
assert(s.content_length > 0);
s.output_vio = TSVConnWrite(output_vconn, c, origin_reader, s.content_length);
assert(s.output_vio != nullptr);
}

Expand Down
5 changes: 4 additions & 1 deletion plugins/multiplexer/post.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@
struct PostState {
Requests requests;

/// The Content-Length value of the POST/PUT request.
int content_length = -1;

TSIOBuffer origin_buffer;
TSIOBufferReader clone_reader;
/// The VIO for the original (non-clone) origin.
TSVIO output_vio;

~PostState();
PostState(Requests &);
PostState(Requests &, int content_length);
};

int handlePost(TSCont, TSEvent, void *);
31 changes: 27 additions & 4 deletions tests/gold_tests/pluginTest/multiplexer/multiplexer.test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class MultiplexerTestBase:
"""

client_counter = 0
dns_counter = 0
server_counter = 0
ts_counter = 0

Expand All @@ -39,8 +40,14 @@ def __init__(self, replay_file, multiplexed_host_replay_file, skip_post):
self.multiplexed_host_replay_file = multiplexed_host_replay_file

self.setupServers()
self.setupDns()
self.setupTS(skip_post)

def setupDns(self):
counter = MultiplexerTestBase.dns_counter
MultiplexerTestBase.dns_counter += 1
self.dns = Test.MakeDNServer(f"dns_{counter}", default='127.0.0.1')

def setupServers(self):
counter = MultiplexerTestBase.server_counter
MultiplexerTestBase.server_counter += 1
Expand All @@ -57,13 +64,18 @@ def setupServers(self):
'X-Multiplexer: original', 'Verify the HTTP multiplexed host does not receive an "original".')
self.server_https.Streams.All += Testers.ExcludesExpression(
'X-Multiplexer: original', 'Verify the HTTPS multiplexed host does not receive an "original".')
self.server_https.Streams.All += Testers.ExcludesExpression(r'\[ERROR\]', 'Verify there were no errors in the replay.')

# In addition, the original server should always receive the POST and
# PUT requests.
self.server_origin.Streams.All += Testers.ContainsExpression(
'uuid: POST', "Verify the client's original target received the POST transaction.")
self.server_origin.Streams.All += Testers.ContainsExpression(
'uuid: PUT', "Verify the client's original target received the PUT transaction.")
self.server_origin.Streams.All += Testers.ExcludesExpression(r'\[ERROR\]', 'Verify there were no errors in the replay.')
# The chunked POST should go to the origin.
self.server_origin.Streams.All += Testers.ContainsExpression(
'uuid: CHUNKED_POST', "Verify the client's original target received the chunked POST transaction.")

# Under all configurations, the GET request should be multiplexed.
self.server_origin.Streams.All += Testers.ContainsExpression(
Expand All @@ -74,11 +86,18 @@ def setupServers(self):
self.server_http.Streams.All += Testers.ContainsExpression(
'X-Multiplexer: copy', 'Verify the HTTP server received a "copy" of the request.')
self.server_http.Streams.All += Testers.ContainsExpression('uuid: GET', "Verify the HTTP server received the GET request.")
self.server_http.Streams.All += Testers.ExcludesExpression(r'\[ERROR\]', 'Verify there were no errors in the replay.')
# Chunked POST requests are not supported for multiplexing.
self.server_http.Streams.All += Testers.ExcludesExpression(
'uuid: CHUNKED_POST', 'We do not expect a multiplexed chunked POST.')

self.server_https.Streams.All += Testers.ContainsExpression(
'X-Multiplexer: copy', 'Verify the HTTPS server received a "copy" of the request.')
self.server_https.Streams.All += Testers.ContainsExpression(
'uuid: GET', "Verify the HTTPS server received the GET request.")
# Chunked POST requests are not supported for multiplexing.
self.server_https.Streams.All += Testers.ExcludesExpression(
'uuid: CHUNKED_POST', 'We do not expect a multiplexed chunked POST.')

# Verify that the HTTPS server receives a TLS connection.
self.server_https.Streams.All += Testers.ContainsExpression(
Expand All @@ -96,33 +115,37 @@ def setupTS(self, skip_post):
"proxy.config.ssl.client.verify.server.policy": 'PERMISSIVE',
'proxy.config.diags.debug.enabled': 1,
'proxy.config.diags.debug.tags': 'http|multiplexer',
'proxy.config.dns.nameservers': f'127.0.0.1:{self.dns.Variables.Port}',
'proxy.config.dns.resolv_conf': 'NULL',
})
self.ts.Disk.ssl_multicert_config.AddLine('dest_ip=* ssl_cert_name=server.pem ssl_key_name=server.key')
skip_remap_param = ''
if skip_post:
skip_remap_param = ' @pparam=proxy.config.multiplexer.skip_post_put=1'
self.ts.Disk.remap_config.AddLines(
[
f'map https://origin.server.com https://127.0.0.1:{self.server_origin.Variables.https_port} '
f'map https://origin.server.com https://backend.origin.server.com:{self.server_origin.Variables.https_port} '
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does MicroDNS return the loopback address for any hostname it wasn't configured with? If so, does that mean we can't use MicroDNS to test DNS lookup failures?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically, we configure the default to resolve everything to 127.0.0.1. In this PR, you can see that here:

https://github.com/apache/trafficserver/pull/10965/files#diff-ab6abc922014fa32a8c9d9a16e3b9be6e89f16a45a671728146d270ba2d848cfR49

I think MicroDNS can be used to test failures, but I'm not sure offhand. It might require some modification of it.

f'@plugin=multiplexer.so @pparam=nontls.server.com @pparam=tls.server.com'
f'{skip_remap_param}',

# Now create remap entries for the multiplexed hosts: one that
# verifies HTTP, and another that verifies HTTPS.
f'map http://nontls.server.com http://127.0.0.1:{self.server_http.Variables.http_port}',
f'map http://tls.server.com https://127.0.0.1:{self.server_https.Variables.https_port}',
f'map http://nontls.server.com http://backend.nontls.server.com:{self.server_http.Variables.http_port}',
f'map http://tls.server.com https://backend.tls.server.com:{self.server_https.Variables.https_port}',
])

def run(self):
tr = Test.AddTestRun()
self.ts.StartBefore(self.dns)
tr.Processes.Default.StartBefore(self.server_origin)
tr.Processes.Default.StartBefore(self.server_http)
tr.Processes.Default.StartBefore(self.server_https)
tr.Processes.Default.StartBefore(self.ts)

counter = MultiplexerTestBase.client_counter
MultiplexerTestBase.client_counter += 1
tr.AddVerifierClientProcess(f"client_{counter}", self.replay_file, https_ports=[self.ts.Variables.ssl_port])
client = tr.AddVerifierClientProcess(f"client_{counter}", self.replay_file, https_ports=[self.ts.Variables.ssl_port])
client.Streams.All += Testers.ExcludesExpression(r'\[ERROR\]', 'Verify there were no errors in the replay.')


class MultiplexerTest(MultiplexerTestBase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ sessions:
reason: OK
headers:
fields:
- [ Content-Length, 32 ]
- [ Content-Length, 320000 ]
- [ X-Response, first ]

# There is no client since this response terminates at ATS, so no need for
Expand Down Expand Up @@ -77,7 +77,7 @@ sessions:
reason: OK
headers:
fields:
- [ Content-Length, 32 ]
- [ Content-Length, 320000 ]
- [ X-Response, second ]

# There is no client since this response terminates at ATS, so no need for
Expand Down Expand Up @@ -106,7 +106,7 @@ sessions:
reason: OK
headers:
fields:
- [ Content-Length, 32 ]
- [ Content-Length, 320000 ]
- [ X-Response, third ]

# There is no client since this response terminates at ATS, so no need for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ sessions:
reason: OK
headers:
fields:
- [ Content-Length, 32 ]
- [ Content-Length, 320000 ]
- [ X-Response, first ]

proxy-response:
Expand All @@ -64,7 +64,7 @@ sessions:
headers:
fields:
- [ Host, origin.server.com ]
- [ Content-Length, 8 ]
- [ Content-Length, 320000 ]
- [ X-Request, second ]
- [ uuid, POST ]

Expand All @@ -80,7 +80,7 @@ sessions:
reason: OK
headers:
fields:
- [ Content-Length, 32 ]
- [ Content-Length, 320000 ]
- [ X-Response, second ]

proxy-response:
Expand All @@ -96,7 +96,7 @@ sessions:
headers:
fields:
- [ Host, origin.server.com ]
- [ Content-Length, 8 ]
- [ Content-Length, 320000 ]
- [ X-Request, third ]
- [ uuid, PUT ]

Expand All @@ -112,11 +112,47 @@ sessions:
reason: OK
headers:
fields:
- [ Content-Length, 32 ]
- [ Content-Length, 320000 ]
- [ X-Response, third ]

proxy-response:
status: 200
headers:
fields:
- [ X-Response, { value: third, as: equal } ]

# POST that is chunked. We do not support multiplexing chunked request bodies,
# but it should go to the origin fine.
- client-request:
method: "POST"
version: "1.1"
url: /path/chunked_post
headers:
fields:
- [ Host, origin.server.com ]
- [ Transfer-Encoding, chunked ]
- [ X-Request, fourth ]
- [ uuid, CHUNKED_POST ]
content:
size: 320000

proxy-request:
method: "POST"
headers:
fields:
- [ X-Request, { value: fourth, as: equal } ]
- [ X-Multiplexer, { as: absent } ]

server-response:
status: 200
reason: OK
headers:
fields:
- [ Content-Length, 320000 ]
- [ X-Response, fourth ]

proxy-response:
status: 200
headers:
fields:
- [ X-Response, { value: fourth, as: equal } ]
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,39 @@ sessions:
headers:
fields:
- [ X-Response, { value: third, as: equal } ]

# POST that is chunked. We do not support multiplexing chunked request bodies,
# but it should go to the origin fine.
- client-request:
method: "POST"
version: "1.1"
url: /path/chunked_post
headers:
fields:
- [ Host, origin.server.com ]
- [ Transfer-Encoding, chunked ]
- [ X-Request, fourth ]
- [ uuid, CHUNKED_POST ]
content:
size: 320000

proxy-request:
method: "POST"
headers:
fields:
- [ X-Request, { value: fourth, as: equal } ]
- [ X-Multiplexer, { as: absent } ]

server-response:
status: 200
reason: OK
headers:
fields:
- [ Content-Length, 320000 ]
- [ X-Response, fourth ]

proxy-response:
status: 200
headers:
fields:
- [ X-Response, { value: fourth, as: equal } ]