Skip to content

Commit

Permalink
feat: add websocket client example (philips-software#235)
Browse files Browse the repository at this point in the history
* hal/generic: Add SerialCommunicationConsole

* services/network/WebSocketServerConnectionObserver: Take into account max stream size of upstream connection; Allow RequestSendStream on the SendStreamAvailable call

* Add examples/websocket_client

* Apply suggestions from code review

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

* protobuf/echo/Echo: Add ServiceForwarderAll

* protobuf/echo/test: Extract TestEchoOnConnection from protobuf/protoc_echo_plugin/test/TestProtoCEchoPlugin

(cherry picked from commit 37a9695)

* protobuf/echo/test: Add TestEchoOnMessageCommunication

(cherry picked from commit 78f714e)

* protobuf/echo/test: Extract ServiceStub and EchoMock, adapt to new interface

* protobuf/echo/test: Refactor

* protobuf/echo: Rename test_helper to test_doubles

* protobuf/echo/Echo: Extract ServiceForwarder

* Update protobuf/echo/test/TestServiceForwarder.cpp

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
richardapeters and github-actions[bot] committed Apr 17, 2023
1 parent e3ebac9 commit d4aa174
Show file tree
Hide file tree
Showing 28 changed files with 922 additions and 349 deletions.
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ if (EMIL_HOST_BUILD)
add_subdirectory(serial_output)
add_subdirectory(sntp)
add_subdirectory(threading)
add_subdirectory(websocket_client)
endif()
2 changes: 1 addition & 1 deletion examples/rpc/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class RpcServer
void ConnectionAccepted(infra::AutoResetFunction<void(infra::SharedPtr<services::ConnectionObserver> connectionObserver)>&& createdObserver, services::IPAddress address) override
{
if (connection)
connection->::services::EchoOnConnection::Subject().AbortAndDestroy();
connection->::services::ConnectionObserver::Subject().AbortAndDestroy();

if (connection.Allocatable())
createdObserver(connection.Emplace());
Expand Down
3 changes: 3 additions & 0 deletions examples/websocket_client/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
add_executable(examples.websocket_client)
target_sources(examples.websocket_client PRIVATE Client.cpp)
target_link_libraries(examples.websocket_client PRIVATE args services.network_instantiations hal.generic)
184 changes: 184 additions & 0 deletions examples/websocket_client/Client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#define NOMINMAX
#include "args.hxx"
#include "hal/generic/SynchronousRandomDataGeneratorGeneric.hpp"
#include "hal/generic/TimerServiceGeneric.hpp"
#include "services/network/ConnectionFactoryWithNameResolver.hpp"
#include "services/network/HttpClientBasic.hpp"
#include "services/network/HttpClientImpl.hpp"
#include "services/network/WebSocketClientConnectionObserver.hpp"
#include "services/network_instantiations/NetworkAdapter.hpp"
#include "services/tracer/TracerOnIoOutputInfrastructure.hpp"
#include <mutex>

namespace application
{
class ConsoleClientConnection
: public services::ConnectionObserver
{
public:
explicit ConsoleClientConnection();

// Implementation of ConnectionObserver
void SendStreamAvailable(infra::SharedPtr<infra::StreamWriter>&& writer) override;
void DataReceived() override;
void Attached() override;

private:
void CheckDataToBeSent();

private:
std::mutex mutex;
std::string dataToBeSent;
infra::SharedPtr<infra::StreamWriter> writer;
};

ConsoleClientConnection::ConsoleClientConnection()
{
std::thread([this]()
{
while (true)
{
std::string data;
std::getline(std::cin, data);
data += "\n";

std::scoped_lock lock(mutex);
dataToBeSent += data;
infra::EventDispatcher::Instance().Schedule([this]() { CheckDataToBeSent(); });
} })
.detach();
}

void ConsoleClientConnection::SendStreamAvailable(infra::SharedPtr<infra::StreamWriter>&& writer)
{
this->writer = writer;
writer = nullptr;

CheckDataToBeSent();
}

void ConsoleClientConnection::DataReceived()
{
auto reader = Subject().ReceiveStream();
infra::DataInputStream::WithErrorPolicy stream(*reader);

while (!stream.Empty())
std::cout << infra::ByteRangeAsStdString(stream.ContiguousRange()) << std::flush;

Subject().AckReceived();
}

void ConsoleClientConnection::Attached()
{
services::ConnectionObserver::Subject().RequestSendStream(services::ConnectionObserver::Subject().MaxSendStreamSize());
}

void ConsoleClientConnection::CheckDataToBeSent()
{
std::scoped_lock lock(mutex);

if (writer != nullptr && !dataToBeSent.empty())
{
infra::TextOutputStream::WithErrorPolicy stream(*writer);
std::size_t amount = std::min(stream.Available(), dataToBeSent.size());
stream << dataToBeSent.substr(0, amount);
dataToBeSent.erase(0, amount);

writer = nullptr;
services::ConnectionObserver::Subject().RequestSendStream(services::ConnectionObserver::Subject().MaxSendStreamSize());
}
}

class ConsoleFactory
: public services::WebSocketClientObserverFactory
{
public:
explicit ConsoleFactory(infra::BoundedString url, services::Tracer& tracer);

public:
virtual infra::BoundedString Url() const override;
virtual uint16_t Port() const override;
virtual void ConnectionEstablished(infra::AutoResetFunction<void(infra::SharedPtr<services::ConnectionObserver> client)>&& createdClientObserver) override;
virtual void ConnectionFailed(ConnectFailReason reason) override;

private:
infra::BoundedString url;
services::Tracer& tracer;
infra::SharedOptional<ConsoleClientConnection> consoleClientConnection;
};

ConsoleFactory::ConsoleFactory(infra::BoundedString url, services::Tracer& tracer)
: url(url)
, tracer(tracer)
{}

infra::BoundedString ConsoleFactory::Url() const
{
return url;
}

uint16_t ConsoleFactory::Port() const
{
return 80;
}

void ConsoleFactory::ConnectionEstablished(infra::AutoResetFunction<void(infra::SharedPtr<services::ConnectionObserver> client)>&& createdClientObserver)
{
tracer.Trace() << "Connection established";
tracer.Trace();
createdClientObserver(consoleClientConnection.Emplace());
}

void ConsoleFactory::ConnectionFailed(ConnectFailReason reason)
{
tracer.Trace() << "Connection failed";
exit(1);
}
}

int main(int argc, const char* argv[], const char* env[])
{
std::string toolname = argv[0];
args::ArgumentParser parser(toolname + " is a tool to demonstrate a websocket client.");
args::Group arguments(parser, "Arguments:");
args::Positional<std::string> url(arguments, "url", "URL of a websocket", args::Options::Required);
args::Group flags(parser, "Optional flags:");
args::HelpFlag help(flags, "help", "Display this help menu.", { 'h', "help" });

try
{
parser.Prog(toolname);
parser.ParseCLI(argc, argv);

static hal::TimerServiceGeneric timerService;
static hal::SynchronousRandomDataGeneratorGeneric randomDataGenerator;
static main_::TracerOnIoOutputInfrastructure tracer;
static main_::NetworkAdapter network;

static services::ConnectionFactoryWithNameResolverImpl::WithStorage<1> connectionFactory(network.ConnectionFactory(), network.NameResolver());
static services::HttpClientConnectorWithNameResolverImpl<> clientConnector{ connectionFactory };
static infra::Creator<services::Stoppable, services::HttpClientWebSocketInitiation, void(services::WebSocketClientObserverFactory & clientObserverFactory, services::HttpClientWebSocketInitiationResult & result, hal::SynchronousRandomDataGenerator & randomDataGenerator)> httpClientInitiationCreator{ [](infra::Optional<services::HttpClientWebSocketInitiation>& value, services::WebSocketClientObserverFactory& clientObserverFactory,
services::HttpClientWebSocketInitiationResult& result, hal::SynchronousRandomDataGenerator& randomDataGenerator)
{
value.Emplace(clientObserverFactory, clientConnector, result, randomDataGenerator);
} };
static services::WebSocketClientFactorySingleConnection webSocketFactory{ randomDataGenerator, { httpClientInitiationCreator } };

application::ConsoleFactory consoleFactory(get(url), tracer.tracer);
webSocketFactory.Connect(consoleFactory);

network.Run();
}
catch (const args::Help&)
{
std::cout << parser;
return 1;
}
catch (const std::exception& ex)
{
std::cerr << ex.what() << std::endl;
return 1;
}

return 0;
}
2 changes: 2 additions & 0 deletions hal/generic/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ if (EMIL_HOST_BUILD)
target_sources(hal.generic PRIVATE
FileSystemGeneric.cpp
FileSystemGeneric.hpp
SerialCommunicationConsole.cpp
SerialCommunicationConsole.hpp
SynchronousRandomDataGeneratorGeneric.cpp
SynchronousRandomDataGeneratorGeneric.hpp
TimeKeeperGeneric.cpp
Expand Down
32 changes: 32 additions & 0 deletions hal/generic/SerialCommunicationConsole.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "hal/generic/SerialCommunicationConsole.hpp"
#include "infra/event/EventDispatcher.hpp"
#include "infra/util/BoundedString.hpp"

namespace hal
{
SerialCommunicationConsole::SerialCommunicationConsole()
{
std::thread([this]()
{
while (true)
{
std::string data;
std::getline(std::cin, data);
data += "\n";
if (dataReceived)
dataReceived(infra::StdStringAsByteRange(data));
} })
.detach();
}

void SerialCommunicationConsole::SendData(infra::ConstByteRange data, infra::Function<void()> actionOnCompletion)
{
std::cout << infra::ByteRangeAsStdString(data);
infra::EventDispatcher::Instance().Schedule(actionOnCompletion);
}

void SerialCommunicationConsole::ReceiveData(infra::Function<void(infra::ConstByteRange data)> dataReceived)
{
this->dataReceived = dataReceived;
}
}
24 changes: 24 additions & 0 deletions hal/generic/SerialCommunicationConsole.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#ifndef HAL_SERIAL_COMMUNICATION_CONSOLE_HPP
#define HAL_SERIAL_COMMUNICATION_CONSOLE_HPP

#include "hal/interfaces/SerialCommunication.hpp"
#include <iostream>
#include <thread>

namespace hal
{
class SerialCommunicationConsole
: public SerialCommunication
{
public:
SerialCommunicationConsole();

virtual void SendData(infra::ConstByteRange data, infra::Function<void()> actionOnCompletion) override;
virtual void ReceiveData(infra::Function<void(infra::ConstByteRange data)> dataReceived) override;

private:
infra::Function<void(infra::ConstByteRange data)> dataReceived;
};
}

#endif
8 changes: 7 additions & 1 deletion protobuf/echo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ target_sources(protobuf.echo PRIVATE
Echo.cpp
Echo.hpp
EchoInstantiation.hpp
ServiceForwarder.cpp
ServiceForwarder.hpp
TracingEcho.cpp
TracingEcho.hpp
)

if (BUILD_TESTING)
add_subdirectory(test_helper)
add_subdirectory(test_doubles)
endif()

include(protocol_buffer_echo.cmake)

if (EMIL_BUILD_TESTS)
add_subdirectory(test)
endif()
Loading

0 comments on commit d4aa174

Please sign in to comment.