Skip to content

Commit cdcd2a8

Browse files
authored
Add dynamic message handlers. (#1)
1 parent 689a197 commit cdcd2a8

8 files changed

+357
-216
lines changed

Diff for: cppesphomeapi/src/api_connection.cpp

+139-46
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#include "api_connection.hpp"
2+
#include <print>
3+
#include <boost/asio.hpp>
24
#include "api.pb.h"
35
#include "make_unexpected_result.hpp"
4-
5-
#include <print>
6+
#include "plain_text_protocol.hpp"
67

78
namespace asio = boost::asio;
89
namespace this_coro = asio::this_coro;
@@ -38,17 +39,20 @@ AsyncResult<void> ApiConnection::connect()
3839
co_await socket_.async_connect(resolved->endpoint());
3940
socket_.set_option(asio::socket_base::keep_alive{true});
4041

42+
asio::co_spawn(strand_, std::bind(&ApiConnection::async_receive, this), asio::detached);
43+
4144
REQUIRE_SUCCESS(co_await send_message_hello());
4245
REQUIRE_SUCCESS(co_await send_message_connect());
4346

47+
asio::co_spawn(executor, std::bind(&ApiConnection::subscribe_logs, this), asio::detached);
4448
co_return Result<void>{};
4549
}
4650

4751
AsyncResult<void> ApiConnection::disconnect()
4852
{
4953
proto::DisconnectRequest request;
5054
REQUIRE_SUCCESS(co_await send_message(request));
51-
REQUIRE_SUCCESS(co_await receive_message<proto::DisconnectResponse>());
55+
REQUIRE_SUCCESS(co_await receive_message<proto::DisconnectResponse>(asio::use_awaitable));
5256
co_return Result<void>{};
5357
}
5458

@@ -57,10 +61,11 @@ AsyncResult<void> ApiConnection::send_message_hello()
5761
proto::HelloRequest request;
5862
request.set_client_info(std::string{"cppapi"});
5963
REQUIRE_SUCCESS(co_await send_message(request));
60-
const auto response = co_await receive_message<proto::HelloResponse>();
64+
auto response = co_await receive_message<proto::HelloResponse>(asio::use_awaitable);
6165
REQUIRE_SUCCESS(response);
62-
device_name_ = response->name();
63-
api_version_ = ApiVersion{.major = response->api_version_major(), .minor = response->api_version_minor()};
66+
auto message = std::move(response.value());
67+
device_name_ = message->name();
68+
api_version_ = ApiVersion{.major = message->api_version_major(), .minor = message->api_version_minor()};
6469
co_return Result<void>{};
6570
}
6671

@@ -69,8 +74,10 @@ AsyncResult<void> ApiConnection::send_message_connect()
6974
proto::ConnectRequest request;
7075
request.set_password(password_);
7176
REQUIRE_SUCCESS(co_await send_message(request));
72-
const auto response = co_await receive_message<proto::ConnectResponse>();
73-
if (response->invalid_password())
77+
auto response = co_await receive_message<proto::ConnectResponse>(asio::use_awaitable);
78+
REQUIRE_SUCCESS(response);
79+
auto message = std::move(response.value());
80+
if (message->invalid_password())
7481
{
7582
co_return make_unexpected_result(ApiErrorCode::AuthentificationError, "Invalid password");
7683
}
@@ -81,57 +88,59 @@ AsyncResult<DeviceInfo> ApiConnection::request_device_info()
8188
{
8289
proto::DeviceInfoRequest device_request{};
8390
REQUIRE_SUCCESS(co_await send_message(device_request));
84-
const auto response = co_await receive_message<proto::DeviceInfoResponse>();
91+
const auto response = co_await receive_message<proto::DeviceInfoResponse>(asio::use_awaitable);
8592
REQUIRE_SUCCESS(response);
93+
auto message = std::move(response.value());
8694

8795
co_return DeviceInfo{
88-
.uses_password = response->uses_password(),
89-
.has_deep_sleep = response->has_deep_sleep(),
90-
.name = response->name(),
91-
.friendly_name = response->friendly_name(),
92-
.mac_address = response->mac_address(),
93-
.compilation_time = response->compilation_time(), // todo: maybe parse directly into std::chrono?
94-
.model = response->model(),
95-
.manufacturer = response->manufacturer(),
96-
.esphome_version = response->esphome_version(),
97-
.webserver_port = static_cast<uint16_t>(response->webserver_port()),
98-
.suggested_area = response->suggested_area(),
96+
.uses_password = message->uses_password(),
97+
.has_deep_sleep = message->has_deep_sleep(),
98+
.name = message->name(),
99+
.friendly_name = message->friendly_name(),
100+
.mac_address = message->mac_address(),
101+
.compilation_time = message->compilation_time(), // todo: maybe parse directly into std::chrono?
102+
.model = message->model(),
103+
.manufacturer = message->manufacturer(),
104+
.esphome_version = message->esphome_version(),
105+
.webserver_port = static_cast<uint16_t>(message->webserver_port()),
106+
.suggested_area = message->suggested_area(),
99107
};
100108
}
101109

102110
AsyncResult<std::vector<EntityInfo>> ApiConnection::request_entities_and_services()
103111
{
104112
proto::ListEntitiesRequest request;
113+
auto message_receiver = receive_messages<proto::ListEntitiesDoneResponse,
114+
proto::ListEntitiesAlarmControlPanelResponse,
115+
proto::ListEntitiesBinarySensorResponse,
116+
proto::ListEntitiesButtonResponse,
117+
proto::ListEntitiesCameraResponse,
118+
proto::ListEntitiesClimateResponse,
119+
proto::ListEntitiesCoverResponse,
120+
proto::ListEntitiesDateResponse,
121+
proto::ListEntitiesDateTimeResponse,
122+
proto::ListEntitiesEventResponse,
123+
proto::ListEntitiesFanResponse,
124+
proto::ListEntitiesLightResponse,
125+
proto::ListEntitiesLockResponse,
126+
proto::ListEntitiesMediaPlayerResponse,
127+
proto::ListEntitiesNumberResponse,
128+
proto::ListEntitiesSelectResponse,
129+
proto::ListEntitiesSensorResponse,
130+
proto::ListEntitiesServicesResponse,
131+
proto::ListEntitiesSwitchResponse,
132+
proto::ListEntitiesTextResponse,
133+
proto::ListEntitiesTextSensorResponse,
134+
proto::ListEntitiesTimeResponse,
135+
proto::ListEntitiesUpdateResponse,
136+
proto::ListEntitiesValveResponse>(asio::deferred);
105137
REQUIRE_SUCCESS(co_await send_message(request));
106-
const auto messages = co_await receive_messages<proto::ListEntitiesDoneResponse,
107-
proto::ListEntitiesAlarmControlPanelResponse,
108-
proto::ListEntitiesBinarySensorResponse,
109-
proto::ListEntitiesButtonResponse,
110-
proto::ListEntitiesCameraResponse,
111-
proto::ListEntitiesClimateResponse,
112-
proto::ListEntitiesCoverResponse,
113-
proto::ListEntitiesDateResponse,
114-
proto::ListEntitiesDateTimeResponse,
115-
proto::ListEntitiesEventResponse,
116-
proto::ListEntitiesFanResponse,
117-
proto::ListEntitiesLightResponse,
118-
proto::ListEntitiesLockResponse,
119-
proto::ListEntitiesMediaPlayerResponse,
120-
proto::ListEntitiesNumberResponse,
121-
proto::ListEntitiesSelectResponse,
122-
proto::ListEntitiesSensorResponse,
123-
proto::ListEntitiesServicesResponse,
124-
proto::ListEntitiesSwitchResponse,
125-
proto::ListEntitiesTextResponse,
126-
proto::ListEntitiesTextSensorResponse,
127-
proto::ListEntitiesTimeResponse,
128-
proto::ListEntitiesUpdateResponse,
129-
proto::ListEntitiesValveResponse>();
138+
auto messages = co_await std::move(message_receiver);
130139
REQUIRE_SUCCESS(messages);
131140

132141
for (auto &&msg : messages.value())
133142
{
134-
std::println("GOT LIST .{}", std::visit([](auto &&msg) { return msg.key(); }, msg));
143+
std::println("GOT LIST .{}", std::visit([](auto &&msg) { return msg->key(); }, msg));
135144
}
136145
co_return std::vector<EntityInfo>{};
137146
}
@@ -150,7 +159,7 @@ AsyncResult<void> ApiConnection::light_command(LightCommand light_command)
150159

151160
AsyncResult<void> ApiConnection::send_message(const google::protobuf::Message &message)
152161
{
153-
const auto packet = plain_text_serialize(message);
162+
const auto packet = PlainTextProtocol::serialize(message);
154163
if (packet.has_value())
155164
{
156165
const auto written = co_await socket_.async_write_some(asio::buffer(packet.value()));
@@ -176,4 +185,88 @@ const std::string &ApiConnection::device_name() const
176185
{
177186
return device_name_;
178187
}
188+
189+
boost::asio::awaitable<void> ApiConnection::subscribe_logs()
190+
{
191+
proto::SubscribeLogsRequest request;
192+
request.set_dump_config(true);
193+
request.set_level(::cppesphomeapi::proto::LogLevel::LOG_LEVEL_VERY_VERBOSE);
194+
co_await send_message(request);
195+
while (true)
196+
{
197+
auto response = co_await receive_message<proto::SubscribeLogsResponse>(asio::use_awaitable);
198+
if (response.has_value())
199+
{
200+
std::println("Got log message {}", response.value()->message());
201+
}
202+
}
203+
}
204+
205+
boost::asio::awaitable<void> ApiConnection::async_receive()
206+
{
207+
namespace asio = boost::asio;
208+
std::array<std::uint8_t, 2048> buffer{};
209+
bool do_receive{true};
210+
211+
while (do_receive)
212+
{
213+
const auto received_bytes = co_await socket_.async_receive(asio::buffer(buffer));
214+
auto result = PlainTextProtocol{}
215+
.decode_multiple<proto::SubscribeLogsResponse,
216+
proto::DeviceInfoResponse,
217+
proto::ConnectResponse,
218+
proto::HelloResponse,
219+
proto::DisconnectResponse,
220+
proto::ListEntitiesDoneResponse,
221+
proto::ListEntitiesAlarmControlPanelResponse,
222+
proto::ListEntitiesBinarySensorResponse,
223+
proto::ListEntitiesButtonResponse,
224+
proto::ListEntitiesCameraResponse,
225+
proto::ListEntitiesClimateResponse,
226+
proto::ListEntitiesCoverResponse,
227+
proto::ListEntitiesDateResponse,
228+
proto::ListEntitiesDateTimeResponse,
229+
proto::ListEntitiesEventResponse,
230+
proto::ListEntitiesFanResponse,
231+
proto::ListEntitiesLightResponse,
232+
proto::ListEntitiesLockResponse,
233+
proto::ListEntitiesMediaPlayerResponse,
234+
proto::ListEntitiesNumberResponse,
235+
proto::ListEntitiesSelectResponse,
236+
proto::ListEntitiesSensorResponse,
237+
proto::ListEntitiesServicesResponse,
238+
proto::ListEntitiesSwitchResponse,
239+
proto::ListEntitiesTextResponse,
240+
proto::ListEntitiesTextSensorResponse,
241+
proto::ListEntitiesTimeResponse,
242+
proto::ListEntitiesUpdateResponse,
243+
proto::ListEntitiesValveResponse>(
244+
std::span{buffer.begin(), received_bytes}, [this](auto &&message) {
245+
std::vector<boost::asio::any_completion_handler<void(MessageWrapper)>> handlers;
246+
{
247+
std::unique_lock l{handler_mtx_};
248+
handlers = std::exchange(handlers_, {});
249+
}
250+
// todo: add small ring buffer if the handlers are empty or try to return the
251+
// acceptance from the handler.
252+
for (auto &&handler : handlers)
253+
{
254+
auto work = boost::asio::make_work_guard(handler);
255+
auto alloc = boost::asio::get_associated_allocator(
256+
handler, boost::asio::recycling_allocator<void>());
257+
258+
// Dispatch the completion handler through the handler's associated
259+
// executor, using the handler's associated allocator.
260+
boost::asio::dispatch(
261+
work.get_executor(),
262+
boost::asio::bind_allocator(
263+
alloc, [handler = std::move(handler), result = message]() mutable {
264+
std::move(handler)(std::move(result));
265+
}));
266+
}
267+
// std::println("Received message {}", message->GetTypeName());
268+
});
269+
}
270+
std::println("RECEIVE ENDED!");
271+
}
179272
} // namespace cppesphomeapi

0 commit comments

Comments
 (0)