Skip to content

Commit

Permalink
DPL: improve WebSockets support (#5301)
Browse files Browse the repository at this point in the history
* add ability to parse websocket url
* add beginChunk() and endChunk() method to have bulk action done
  on incoming data before / after all the frames have been processed.
  • Loading branch information
ktf committed Jan 26, 2021
1 parent 358876d commit 8002c6f
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 5 deletions.
26 changes: 24 additions & 2 deletions Framework/Core/src/HTTPParser.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@
// or submit itself to any jurisdiction.

#include "HTTPParser.h"
#include "Framework/RuntimeError.h"
#include <string_view>
#include "SHA1.h"
#include "Base64.h"
#include <regex>

using namespace o2::framework::internal;
namespace o2::framework
Expand Down Expand Up @@ -87,6 +89,7 @@ void encode_websocket_frames(std::vector<uv_buf_t>& outputs, char const* src, si
void decode_websocket(char* start, size_t size, WebSocketHandler& handler)
{
char* cur = start;
handler.beginChunk();
while (cur - start < size) {
WebSocketFrameTiny* header = (WebSocketFrameTiny*)cur;
size_t payloadSize = 0;
Expand All @@ -110,6 +113,7 @@ void decode_websocket(char* start, size_t size, WebSocketHandler& handler)
handler.frame(cur + headerSize, payloadSize);
cur += headerSize + payloadSize;
}
handler.endChunk();
}

std::string encode_websocket_handshake_request(const char* endpoint, const char* protocol, int version, char const* nonce,
Expand Down Expand Up @@ -364,9 +368,27 @@ void parse_http_request(char const* start, size_t size, HTTPParser* parser)
break;
default:
parser->states.push_back(HTTPState::IN_ERROR);
;
;
break;
}
}
}

std::pair<std::string, unsigned short> parse_websocket_url(char const* url)
{
std::string s = url;
if (s == "ws://") {
s = "ws://127.0.0.1:8080";
}
const std::regex urlMatcher("^ws://([0-9-_.]+)[:]([0-9]+)$");
std::smatch parts;
if (!std::regex_match(s, parts, urlMatcher)) {
throw runtime_error_f(
"Unable to parse driver client url: %s.\n"
"Format should be ws://[<driver ip>:<port>] e.g. ws://127.0.0.1:8080 or just ws://");
}
std::string ip = std::string{parts[1]};
auto portS = std::string(parts[2]);
unsigned short port = std::stoul(portS);
return {ip, port};
}
} // namespace o2::framework
12 changes: 9 additions & 3 deletions Framework/Core/src/HTTPParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,15 @@ struct WebSocketHandler {
virtual void beginFragmentation(){};
/// Invoked when a frame it's parsed. Notice you do not own the data and you must
/// not free the memory.
virtual void frame(char const* frame, size_t s){};
virtual void frame(char const* frame, size_t s) {}
/// Invoked before processing the next round of input
virtual void beginChunk() {}
/// Invoked whenever we have no more input to process
virtual void endChunk() {}
/// FIXME: not implemented
virtual void endFragmentation(){};
virtual void endFragmentation() {}
/// FIXME: not implemented
virtual void control(char const* frame, size_t s){};
virtual void control(char const* frame, size_t s) {}
};

/// Decoder for websocket data. For now we assume that the frame was not split. However multiple
Expand Down Expand Up @@ -158,5 +162,7 @@ struct HTTPParserHelpers {
};

void parse_http_request(char const* start, size_t size, HTTPParser* parser);

std::pair<std::string, unsigned short> parse_websocket_url(const char* s);
} // namespace o2::framework
#endif
24 changes: 24 additions & 0 deletions Framework/Core/test/test_HTTPParser.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,27 @@ BOOST_AUTO_TEST_CASE(HTTPParser1)
BOOST_CHECK_EQUAL(reply, checkReply);
}
}

BOOST_AUTO_TEST_CASE(URLParser)
{
{
auto [ip, port] = o2::framework::parse_websocket_url("ws://");
BOOST_CHECK_EQUAL(ip, "127.0.0.1");
BOOST_CHECK_EQUAL(port, 8080);
}
{
auto [ip, port] = o2::framework::parse_websocket_url("ws://127.0.0.1:8080");
BOOST_CHECK_EQUAL(ip, "127.0.0.1");
BOOST_CHECK_EQUAL(port, 8080);
}
{
auto [ip, port] = o2::framework::parse_websocket_url("ws://0.0.0.0:8080");
BOOST_CHECK_EQUAL(ip, "0.0.0.0");
BOOST_CHECK_EQUAL(port, 8080);
}
{
auto [ip, port] = o2::framework::parse_websocket_url("ws://0.0.0.0:8081");
BOOST_CHECK_EQUAL(ip, "0.0.0.0");
BOOST_CHECK_EQUAL(port, 8081);
}
}

0 comments on commit 8002c6f

Please sign in to comment.