From 06017fbfc12bbd2eb7fec022701972d011ea936d Mon Sep 17 00:00:00 2001 From: Gluzskiy Alexandr Date: Wed, 24 Jun 2020 06:30:07 +0300 Subject: [PATCH] first portion of changes use writeback to external io handler in transport_write if external io is in use logic fix implemented write handler for external io added support for external io mode into transport_read_pdu instead of duplicating it. added missed EOL in new file logic fix dropped unneeded 'else' WIP: making whole io switchable moved streampool related code to separate function implemented switchable Read implemented switchable write changes requested by akallabeth DataHandler inplementation should not be switched, it is intended to be used from external user use SSIZE_T type for signed size type improoved 'freerdp_set_transport_callbacks' logic a bit overwrite already allocated rdpIoUpdate instead of changing pointer to new one dropped noisy log messages --- include/freerdp/freerdp.h | 1 + include/freerdp/io.h | 51 +++++++ include/freerdp/settings.h | 1 + include/freerdp/update.h | 4 +- libfreerdp/core/freerdp.c | 2 + libfreerdp/core/peer.c | 1 + libfreerdp/core/transport.c | 278 +++++++++++++++++++++++++++--------- libfreerdp/core/transport.h | 4 + libfreerdp/core/update.c | 16 ++- 9 files changed, 288 insertions(+), 70 deletions(-) create mode 100644 include/freerdp/io.h diff --git a/include/freerdp/freerdp.h b/include/freerdp/freerdp.h index 2302341897385..6627ef1320b42 100644 --- a/include/freerdp/freerdp.h +++ b/include/freerdp/freerdp.h @@ -503,6 +503,7 @@ extern "C" FREERDP_API const char* freerdp_get_logon_error_info_data(UINT32 data); FREERDP_API ULONG freerdp_get_transport_sent(rdpContext* context, BOOL resetCount); + FREERDP_API void freerdp_set_transport_callbacks(rdpContext* context, void *io_callbacks); FREERDP_API BOOL freerdp_nla_impersonate(rdpContext* context); FREERDP_API BOOL freerdp_nla_revert_to_self(rdpContext* context); diff --git a/include/freerdp/io.h b/include/freerdp/io.h new file mode 100644 index 0000000000000..f3c956ad37cae --- /dev/null +++ b/include/freerdp/io.h @@ -0,0 +1,51 @@ +/** + * FreeRDP: A Remote Desktop Protocol Implementation + * IO Update Interface API + * + * Copyright 2020 Gluzskiy Alexandr + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FREERDP_UPDATE_IO_H +#define FREERDP_UPDATE_IO_H + +#include + +typedef int (*pRead)(rdpContext* context, const uint8_t* buf, size_t buf_size); +typedef int (*pWrite)(rdpContext* context, const uint8_t* buf, size_t buf_size); +typedef int (*pDataHandler)(rdpContext* context, const uint8_t* buf, size_t buf_size); + +struct rdp_io_update +{ + rdpContext* context; /* 0 */ + UINT32 paddingA[16 - 1]; /* 1 */ + + /* switchable read + * used to read bytes from IO backend */ + pWrite Read; /* 16 */ + + /* switchable write + * used to write bytes to IO backend */ + pWrite Write; /* 17 */ + + /* switchable data handler + * used if IO backed doing internal polling and reading + * and just passing recieved data to freerdp */ + pDataHandler DataHandler; /* 18 */ + UINT32 paddingB[32 - 19]; /* 19 */ +}; +typedef struct rdp_io_update rdpIoUpdate; + + +#endif /* FREERDP_UPDATE_IO_H */ diff --git a/include/freerdp/settings.h b/include/freerdp/settings.h index 99bc0d2ddbe8c..aa4f3699001cd 100644 --- a/include/freerdp/settings.h +++ b/include/freerdp/settings.h @@ -1559,6 +1559,7 @@ struct rdp_settings default value - currently UNUSED! */ ALIGN64 char* ActionScript; ALIGN64 DWORD Floatbar; + }; typedef struct rdp_settings rdpSettings; diff --git a/include/freerdp/update.h b/include/freerdp/update.h index d2797a28e320b..6d4434d2ac261 100644 --- a/include/freerdp/update.h +++ b/include/freerdp/update.h @@ -40,6 +40,7 @@ typedef struct rdp_update rdpUpdate; #include #include #include +#include /* Bitmap Updates */ #define EX_COMPRESSED_BITMAP_HEADER_PRESENT 0x01 @@ -223,7 +224,8 @@ struct rdp_update rdpSecondaryUpdate* secondary; /* 34 */ rdpAltSecUpdate* altsec; /* 35 */ rdpWindowUpdate* window; /* 36 */ - UINT32 paddingC[48 - 37]; /* 37 */ + rdpIoUpdate* io; /* 37 */ + UINT32 paddingC[48 - 38]; /* 38 */ pRefreshRect RefreshRect; /* 48 */ pSuppressOutput SuppressOutput; /* 49 */ diff --git a/libfreerdp/core/freerdp.c b/libfreerdp/core/freerdp.c index 9094bc346713f..34d0e2df1a90c 100644 --- a/libfreerdp/core/freerdp.c +++ b/libfreerdp/core/freerdp.c @@ -706,6 +706,8 @@ BOOL freerdp_context_new(freerdp* instance) update_register_client_callbacks(rdp->update); instance->context->abortEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + transport_register_default_io_callbacks(rdp->update); + if (!instance->context->abortEvent) goto fail; diff --git a/libfreerdp/core/peer.c b/libfreerdp/core/peer.c index 9a69900dd5af4..47ec85e9f6d94 100644 --- a/libfreerdp/core/peer.c +++ b/libfreerdp/core/peer.c @@ -811,6 +811,7 @@ BOOL freerdp_peer_context_new(freerdp_peer* client) client->autodetect->context = context; update_register_server_callbacks(client->update); autodetect_register_server_callbacks(client->autodetect); + transport_register_default_io_callbacks(rdp->update); if (!(context->errorDescription = calloc(1, 500))) { diff --git a/libfreerdp/core/transport.c b/libfreerdp/core/transport.c index 40795e941e916..ef79f44aff846 100644 --- a/libfreerdp/core/transport.c +++ b/libfreerdp/core/transport.c @@ -626,15 +626,19 @@ static SSIZE_T transport_read_layer(rdpTransport* transport, BYTE* data, size_t static SSIZE_T transport_read_layer_bytes(rdpTransport* transport, wStream* s, size_t toRead) { SSIZE_T status; - if (toRead > SSIZE_MAX) + rdpContext *context = NULL; + if (!transport) return 0; - - status = transport_read_layer(transport, Stream_Pointer(s), toRead); + context = transport->context; + if (!context) + return 0; + status = context->update->io->Read(context, Stream_Pointer(s), toRead); if (status <= 0) return status; Stream_Seek(s, (size_t)status); + return status == (SSIZE_T)toRead ? 1 : 0; } @@ -652,7 +656,30 @@ static SSIZE_T transport_read_layer_bytes(rdpTransport* transport, wStream* s, s */ int transport_read_pdu(rdpTransport* transport, wStream* s) { - int status; + int status = 0, read = 0; + SSIZE_T left = 0; + for (status = transport_handle_pdu(transport, s, &left); !status; + status = transport_handle_pdu(transport, s, &left)) + { + read = transport_read_layer_bytes(transport, s, left ? left : 1); + WLog_Print(transport->log, WLOG_DEBUG, "read: %d, left to read: %d", read, left); + if (read < 0) + return read; + } + WLog_Print(transport->log, WLOG_DEBUG, "status: %d", status); + return status; +} + +/** + * @brief Handle a complete PDU (NLA, fast-path or tpkt) from the filled wStream buffer. + * + * @param[in] transport rdpTransport + * @param[in] s wStream + * @return < 0 on error; 0 if not enough data is available; > 0 number of + * bytes of the *complete* pdu + */ +int transport_handle_pdu(rdpTransport* transport, wStream* s, SSIZE_T *left_to_read) +{ size_t position; size_t pduLength; BYTE* header; @@ -671,11 +698,8 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) return -1; /* Make sure at least two bytes are read for further processing */ - if (position < 2 && (status = transport_read_layer_bytes(transport, s, 2 - position)) != 1) - { - /* No data available at the moment */ - return status; - } + if (position < 2) + return 0; /* update position value for further checks */ position = Stream_GetPosition(s); @@ -697,9 +721,8 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) if ((header[1] & ~(0x80)) == 1) { /* check for header bytes already was readed in previous calls */ - if (position < 3 && - (status = transport_read_layer_bytes(transport, s, 3 - position)) != 1) - return status; + if (position < 3) + return 0; pduLength = header[2]; pduLength += 3; @@ -707,9 +730,8 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) else if ((header[1] & ~(0x80)) == 2) { /* check for header bytes already was readed in previous calls */ - if (position < 4 && - (status = transport_read_layer_bytes(transport, s, 4 - position)) != 1) - return status; + if (position < 4) + return 0; pduLength = (header[2] << 8) | header[3]; pduLength += 4; @@ -733,9 +755,8 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) { /* TPKT header */ /* check for header bytes already was readed in previous calls */ - if (position < 4 && - (status = transport_read_layer_bytes(transport, s, 4 - position)) != 1) - return status; + if (position < 4) + return 0; pduLength = (header[2] << 8) | header[3]; @@ -753,10 +774,8 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) if (header[1] & 0x80) { /* check for header bytes already was readed in previous calls */ - if (position < 3 && - (status = transport_read_layer_bytes(transport, s, 3 - position)) != 1) - return status; - + if (position < 3) + return 0; pduLength = ((header[1] & 0x7F) << 8) | header[2]; } else @@ -779,10 +798,13 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) if (!Stream_EnsureCapacity(s, Stream_GetPosition(s) + pduLength)) return -1; - status = transport_read_layer_bytes(transport, s, pduLength - Stream_GetPosition(s)); + if (left_to_read) + *left_to_read = pduLength - Stream_GetPosition(s); - if (status != 1) - return status; + + /* pdu not yet complete */ + if (Stream_GetPosition(s) < pduLength) + return 0; if (Stream_GetPosition(s) >= pduLength) WLog_Packet(transport->log, WLOG_TRACE, Stream_Buffer(s), pduLength, WLOG_PACKET_INBOUND); @@ -792,18 +814,97 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) return Stream_Length(s); } -int transport_write(rdpTransport* transport, wStream* s) +static BOOL transport_prepare_stream(rdpTransport *transport, wStream *s) { - size_t length; - int status = -1; - int writtenlength = 0; - rdpRdp* rdp = transport->context->rdp; + if (!(transport->ReceiveBuffer = StreamPool_Take(transport->ReceivePool, 0))) + { + Stream_Release(s); + return FALSE; + } + if (transport->NextPDUBytesLeft < 0) + { + /* if we currenlty have part of next pdu in buffer, + * we need to move it into next pdu's buffer */ + Stream_SetPosition(s, Stream_Length(s) + transport->NextPDUBytesLeft); + Stream_Read(s, transport->ReceiveBuffer->pointer, -(transport->NextPDUBytesLeft)); + transport->NextPDUBytesLeft = 0; + } + Stream_Release(s); + return TRUE; +} - if (!s) +/** + * @brief Callback function to handle bytes from io backends + * + * @param[in] context rdpContext + * @param[in] buf const uint8_t* + * @param[in] buf_size size_t + * @return count of bytes succesfully handled, -1 on error + */ +static int transport_io_data_handler(rdpContext* context, + const uint8_t* buf, size_t buf_size) +{ + rdpTransport *transport = NULL; + wStream *pdu = NULL; + if (buf_size > SSIZE_MAX) return -1; - + if (!context->rdp) + return -1; + transport = context->rdp->transport; if (!transport) - goto fail; + return -1; + + Stream_Write(transport->ReceiveBuffer, buf, buf_size); + pdu = transport->ReceiveBuffer; + if (transport_handle_pdu(transport, pdu, 0) < 1) + return 0; + transport->ReceiveCallback(transport, pdu, transport->ReceiveExtra); + + if (!transport_prepare_stream(transport, pdu)) + return -1; + + return buf_size; +} + +/** + * @brief Callback function for io backends read implementation + * + * @param[in] context rdpContext + * @param[out] buf const uint8_t* + * @param[in] buf_size size_t + * @return count of bytes succesfully handled + */ +static int transport_io_data_read(rdpContext* context, + const uint8_t* buf, size_t buf_size) +{ + SSIZE_T status; + rdpTransport *transport = NULL; + if (buf_size > SSIZE_MAX) + return 0; + + transport = context->rdp->transport; + + status = transport_read_layer(transport, (BYTE*)buf, buf_size); + + return status; +} + +/** + * @brief Callback function for io backend write implementation + * + * @param[in] context rdpContext + * @param[in] buf const uint8_t* + * @param[in] buf_size size_t + * @return count of bytes succesfully handled + */ +static int transport_io_data_write(rdpContext* context, + const uint8_t* buf, size_t buf_size) +{ + int status = -1; + size_t length; + int writtenlength = 0; + rdpRdp *rdp = context->rdp; + rdpTransport *transport = context->rdp->transport; if (!transport->frontBio) { @@ -813,19 +914,19 @@ int transport_write(rdpTransport* transport, wStream* s) } EnterCriticalSection(&(transport->WriteLock)); - length = Stream_GetPosition(s); + length = buf_size; writtenlength = length; - Stream_SetPosition(s, 0); if (length > 0) { rdp->outBytes += length; - WLog_Packet(transport->log, WLOG_TRACE, Stream_Buffer(s), length, WLOG_PACKET_OUTBOUND); + WLog_Packet(transport->log, WLOG_TRACE, buf, length, WLOG_PACKET_OUTBOUND); } while (length > 0) { - status = BIO_write(transport->frontBio, Stream_Pointer(s), length); + status = BIO_write(transport->frontBio, buf + writtenlength - length, + length); if (status <= 0) { @@ -877,10 +978,8 @@ int transport_write(rdpTransport* transport, wStream* s) } length -= status; - Stream_Seek(s, status); } - transport->written += writtenlength; out_cleanup: if (status < 0) @@ -891,6 +990,51 @@ int transport_write(rdpTransport* transport, wStream* s) } LeaveCriticalSection(&(transport->WriteLock)); +fail: + if (!length) + return writtenlength; + return status; +} + +void transport_register_default_io_callbacks(rdpUpdate* update) +{ + rdpIoUpdate io = { 0 }; + io.context = update->context; + io.DataHandler = transport_io_data_handler; + io.Read = transport_io_data_read; + io.Write = transport_io_data_write; + freerdp_set_transport_callbacks(update->context, &io); +} + + +void freerdp_set_transport_callbacks(rdpContext* context, void *io_callbacks) +{ + rdpIoUpdate* io = ((rdpIoUpdate*)io_callbacks); + /* NOTE: DataHandler should not be switched */ + io->DataHandler = transport_io_data_handler; + /* NOTE: add context ponter in case user forgot to do so */ + io->context = context; + *(context->update->io) = *io; +} + + +int transport_write(rdpTransport* transport, wStream* s) +{ + int status = -1; + rdpRdp* rdp = NULL; + + if (!s) + return -1; + + if (!transport) + goto fail; + + rdp = transport->context->rdp; + + status = rdp->update->io->Write(rdp->context, Stream_Buffer(s), Stream_GetPosition(s)); + + if (status > 0) + transport->written += status; fail: Stream_Release(s); return status; @@ -1002,8 +1146,9 @@ int transport_check_fds(rdpTransport* transport) int status; int recv_status; wStream* received; - UINT64 now = GetTickCount64(); - UINT64 dueDate = 0; +// UINT64 now = GetTickCount64(); +// UINT64 dueDate = 0; +// WLog_Print(transport->log, WLOG_DEBUG, "transport_check_fds"); if (!transport) return -1; @@ -1015,43 +1160,44 @@ int transport_check_fds(rdpTransport* transport) return -1; } - dueDate = now + transport->settings->MaxTimeInCheckLoop; + // dueDate = now + transport->settings->MaxTimeInCheckLoop; - if (transport->haveMoreBytesToRead) +/* if (transport->haveMoreBytesToRead) { transport->haveMoreBytesToRead = FALSE; ResetEvent(transport->rereadEvent); - } + } */ - while (now < dueDate) +// while (now < dueDate) { if (freerdp_shall_disconnect(transport->context->instance)) { return -1; } - /** - * Note: transport_read_pdu tries to read one PDU from - * the transport layer. - * The ReceiveBuffer might have a position > 0 in case of a non blocking - * transport. If transport_read_pdu returns 0 the pdu couldn't be read at - * this point. - * Note that transport->ReceiveBuffer is replaced after each iteration - * of this loop with a fresh stream instance from a pool. - */ - if ((status = transport_read_pdu(transport, transport->ReceiveBuffer)) <= 0) + if ((status = transport_read_layer_bytes(transport, transport->ReceiveBuffer, + transport->NextPDUBytesLeft ? transport->NextPDUBytesLeft : 1)) <= 0) { if (status < 0) WLog_Print(transport->log, WLOG_DEBUG, - "transport_check_fds: transport_read_pdu() - %i", status); + "transport_check_fds: transport_read_layer_bytes() - %i", status); + + return status; + } + if ((status = transport_handle_pdu(transport, transport->ReceiveBuffer, + &transport->NextPDUBytesLeft)) <= 0) + { + if (status < 0) + WLog_Print(transport->log, WLOG_DEBUG, + "transport_check_fds: transport_handle_pdu() - %i", status); return status; } received = transport->ReceiveBuffer; - if (!(transport->ReceiveBuffer = StreamPool_Take(transport->ReceivePool, 0))) - return -1; +/* if (!(transport->ReceiveBuffer = StreamPool_Take(transport->ReceivePool, 0))) + return -1; */ /** * status: @@ -1060,7 +1206,9 @@ int transport_check_fds(rdpTransport* transport) * 1: redirection */ recv_status = transport->ReceiveCallback(transport, received, transport->ReceiveExtra); - Stream_Release(received); + + if (!transport_prepare_stream(transport, received)) + return -1; /* session redirection or activation */ if (recv_status == 1 || recv_status == 2) @@ -1075,14 +1223,14 @@ int transport_check_fds(rdpTransport* transport) return -1; } - now = GetTickCount64(); + //now = GetTickCount64(); } - if (now >= dueDate) - { - SetEvent(transport->rereadEvent); - transport->haveMoreBytesToRead = TRUE; - } +/* if (now >= dueDate) + { */ +// SetEvent(transport->rereadEvent); +// transport->haveMoreBytesToRead = TRUE; +// } return 0; } diff --git a/libfreerdp/core/transport.h b/libfreerdp/core/transport.h index 944f5ce30ded1..feb63b35af615 100644 --- a/libfreerdp/core/transport.h +++ b/libfreerdp/core/transport.h @@ -65,6 +65,7 @@ struct rdp_transport rdpSettings* settings; void* ReceiveExtra; wStream* ReceiveBuffer; + SSIZE_T NextPDUBytesLeft; TransportRecv ReceiveCallback; wStreamPool* ReceivePool; HANDLE connectedEvent; @@ -92,6 +93,7 @@ FREERDP_LOCAL BOOL transport_accept_tls(rdpTransport* transport); FREERDP_LOCAL BOOL transport_accept_nla(rdpTransport* transport); FREERDP_LOCAL int transport_read_pdu(rdpTransport* transport, wStream* s); +FREERDP_LOCAL int transport_handle_pdu(rdpTransport* transport, wStream* s, SSIZE_T *left_to_read); FREERDP_LOCAL int transport_write(rdpTransport* transport, wStream* s); FREERDP_LOCAL void transport_get_fds(rdpTransport* transport, void** rfds, int* rcount); @@ -112,4 +114,6 @@ FREERDP_LOCAL int transport_receive_pool_return(rdpTransport* transport, wStream FREERDP_LOCAL rdpTransport* transport_new(rdpContext* context); FREERDP_LOCAL void transport_free(rdpTransport* transport); +FREERDP_LOCAL void transport_register_default_io_callbacks(rdpUpdate* update); + #endif /* FREERDP_LIB_CORE_TRANSPORT_H */ diff --git a/libfreerdp/core/update.c b/libfreerdp/core/update.c index ebb82fc2df7ea..780a0eb9851b4 100644 --- a/libfreerdp/core/update.c +++ b/libfreerdp/core/update.c @@ -2936,6 +2936,11 @@ rdpUpdate* update_new(rdpRdp* rdp) if (!update->window) goto fail; + update->io = (rdpIoUpdate*)calloc(1, sizeof(rdpIoUpdate)); + + if (!update->io) + goto fail; + deleteList = &(update->altsec->create_offscreen_bitmap.deleteList); deleteList->sIndices = 64; deleteList->indices = calloc(deleteList->sIndices, 2); @@ -2978,12 +2983,15 @@ void update_free(rdpUpdate* update) } free(update->secondary); + update->secondary = NULL; free(update->altsec); + update->altsec = NULL; - if (update->window) - { - free(update->window); - } + free(update->window); + update->window = NULL; + + free(update->io); + update->io = NULL; MessageQueue_Free(update->queue); DeleteCriticalSection(&update->mux);