diff --git a/include/freerdp/freerdp.h b/include/freerdp/freerdp.h index 2302341897385..6627ef1320b42 100644 --- a/include/freerdp/freerdp.h +++ b/include/freerdp/freerdp.h @@ -503,6 +503,7 @@ extern "C" FREERDP_API const char* freerdp_get_logon_error_info_data(UINT32 data); FREERDP_API ULONG freerdp_get_transport_sent(rdpContext* context, BOOL resetCount); + FREERDP_API void freerdp_set_transport_callbacks(rdpContext* context, void *io_callbacks); FREERDP_API BOOL freerdp_nla_impersonate(rdpContext* context); FREERDP_API BOOL freerdp_nla_revert_to_self(rdpContext* context); diff --git a/include/freerdp/io.h b/include/freerdp/io.h new file mode 100644 index 0000000000000..f3c956ad37cae --- /dev/null +++ b/include/freerdp/io.h @@ -0,0 +1,51 @@ +/** + * FreeRDP: A Remote Desktop Protocol Implementation + * IO Update Interface API + * + * Copyright 2020 Gluzskiy Alexandr + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FREERDP_UPDATE_IO_H +#define FREERDP_UPDATE_IO_H + +#include + +typedef int (*pRead)(rdpContext* context, const uint8_t* buf, size_t buf_size); +typedef int (*pWrite)(rdpContext* context, const uint8_t* buf, size_t buf_size); +typedef int (*pDataHandler)(rdpContext* context, const uint8_t* buf, size_t buf_size); + +struct rdp_io_update +{ + rdpContext* context; /* 0 */ + UINT32 paddingA[16 - 1]; /* 1 */ + + /* switchable read + * used to read bytes from IO backend */ + pWrite Read; /* 16 */ + + /* switchable write + * used to write bytes to IO backend */ + pWrite Write; /* 17 */ + + /* switchable data handler + * used if IO backed doing internal polling and reading + * and just passing recieved data to freerdp */ + pDataHandler DataHandler; /* 18 */ + UINT32 paddingB[32 - 19]; /* 19 */ +}; +typedef struct rdp_io_update rdpIoUpdate; + + +#endif /* FREERDP_UPDATE_IO_H */ diff --git a/include/freerdp/settings.h b/include/freerdp/settings.h index 99bc0d2ddbe8c..aa4f3699001cd 100644 --- a/include/freerdp/settings.h +++ b/include/freerdp/settings.h @@ -1559,6 +1559,7 @@ struct rdp_settings default value - currently UNUSED! */ ALIGN64 char* ActionScript; ALIGN64 DWORD Floatbar; + }; typedef struct rdp_settings rdpSettings; diff --git a/include/freerdp/update.h b/include/freerdp/update.h index d2797a28e320b..6d4434d2ac261 100644 --- a/include/freerdp/update.h +++ b/include/freerdp/update.h @@ -40,6 +40,7 @@ typedef struct rdp_update rdpUpdate; #include #include #include +#include /* Bitmap Updates */ #define EX_COMPRESSED_BITMAP_HEADER_PRESENT 0x01 @@ -223,7 +224,8 @@ struct rdp_update rdpSecondaryUpdate* secondary; /* 34 */ rdpAltSecUpdate* altsec; /* 35 */ rdpWindowUpdate* window; /* 36 */ - UINT32 paddingC[48 - 37]; /* 37 */ + rdpIoUpdate* io; /* 37 */ + UINT32 paddingC[48 - 38]; /* 38 */ pRefreshRect RefreshRect; /* 48 */ pSuppressOutput SuppressOutput; /* 49 */ diff --git a/libfreerdp/core/freerdp.c b/libfreerdp/core/freerdp.c index 9094bc346713f..34d0e2df1a90c 100644 --- a/libfreerdp/core/freerdp.c +++ b/libfreerdp/core/freerdp.c @@ -706,6 +706,8 @@ BOOL freerdp_context_new(freerdp* instance) update_register_client_callbacks(rdp->update); instance->context->abortEvent = CreateEvent(NULL, TRUE, FALSE, NULL); + transport_register_default_io_callbacks(rdp->update); + if (!instance->context->abortEvent) goto fail; diff --git a/libfreerdp/core/peer.c b/libfreerdp/core/peer.c index 9a69900dd5af4..47ec85e9f6d94 100644 --- a/libfreerdp/core/peer.c +++ b/libfreerdp/core/peer.c @@ -811,6 +811,7 @@ BOOL freerdp_peer_context_new(freerdp_peer* client) client->autodetect->context = context; update_register_server_callbacks(client->update); autodetect_register_server_callbacks(client->autodetect); + transport_register_default_io_callbacks(rdp->update); if (!(context->errorDescription = calloc(1, 500))) { diff --git a/libfreerdp/core/transport.c b/libfreerdp/core/transport.c index 40795e941e916..ef79f44aff846 100644 --- a/libfreerdp/core/transport.c +++ b/libfreerdp/core/transport.c @@ -626,15 +626,19 @@ static SSIZE_T transport_read_layer(rdpTransport* transport, BYTE* data, size_t static SSIZE_T transport_read_layer_bytes(rdpTransport* transport, wStream* s, size_t toRead) { SSIZE_T status; - if (toRead > SSIZE_MAX) + rdpContext *context = NULL; + if (!transport) return 0; - - status = transport_read_layer(transport, Stream_Pointer(s), toRead); + context = transport->context; + if (!context) + return 0; + status = context->update->io->Read(context, Stream_Pointer(s), toRead); if (status <= 0) return status; Stream_Seek(s, (size_t)status); + return status == (SSIZE_T)toRead ? 1 : 0; } @@ -652,7 +656,30 @@ static SSIZE_T transport_read_layer_bytes(rdpTransport* transport, wStream* s, s */ int transport_read_pdu(rdpTransport* transport, wStream* s) { - int status; + int status = 0, read = 0; + SSIZE_T left = 0; + for (status = transport_handle_pdu(transport, s, &left); !status; + status = transport_handle_pdu(transport, s, &left)) + { + read = transport_read_layer_bytes(transport, s, left ? left : 1); + WLog_Print(transport->log, WLOG_DEBUG, "read: %d, left to read: %d", read, left); + if (read < 0) + return read; + } + WLog_Print(transport->log, WLOG_DEBUG, "status: %d", status); + return status; +} + +/** + * @brief Handle a complete PDU (NLA, fast-path or tpkt) from the filled wStream buffer. + * + * @param[in] transport rdpTransport + * @param[in] s wStream + * @return < 0 on error; 0 if not enough data is available; > 0 number of + * bytes of the *complete* pdu + */ +int transport_handle_pdu(rdpTransport* transport, wStream* s, SSIZE_T *left_to_read) +{ size_t position; size_t pduLength; BYTE* header; @@ -671,11 +698,8 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) return -1; /* Make sure at least two bytes are read for further processing */ - if (position < 2 && (status = transport_read_layer_bytes(transport, s, 2 - position)) != 1) - { - /* No data available at the moment */ - return status; - } + if (position < 2) + return 0; /* update position value for further checks */ position = Stream_GetPosition(s); @@ -697,9 +721,8 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) if ((header[1] & ~(0x80)) == 1) { /* check for header bytes already was readed in previous calls */ - if (position < 3 && - (status = transport_read_layer_bytes(transport, s, 3 - position)) != 1) - return status; + if (position < 3) + return 0; pduLength = header[2]; pduLength += 3; @@ -707,9 +730,8 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) else if ((header[1] & ~(0x80)) == 2) { /* check for header bytes already was readed in previous calls */ - if (position < 4 && - (status = transport_read_layer_bytes(transport, s, 4 - position)) != 1) - return status; + if (position < 4) + return 0; pduLength = (header[2] << 8) | header[3]; pduLength += 4; @@ -733,9 +755,8 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) { /* TPKT header */ /* check for header bytes already was readed in previous calls */ - if (position < 4 && - (status = transport_read_layer_bytes(transport, s, 4 - position)) != 1) - return status; + if (position < 4) + return 0; pduLength = (header[2] << 8) | header[3]; @@ -753,10 +774,8 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) if (header[1] & 0x80) { /* check for header bytes already was readed in previous calls */ - if (position < 3 && - (status = transport_read_layer_bytes(transport, s, 3 - position)) != 1) - return status; - + if (position < 3) + return 0; pduLength = ((header[1] & 0x7F) << 8) | header[2]; } else @@ -779,10 +798,13 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) if (!Stream_EnsureCapacity(s, Stream_GetPosition(s) + pduLength)) return -1; - status = transport_read_layer_bytes(transport, s, pduLength - Stream_GetPosition(s)); + if (left_to_read) + *left_to_read = pduLength - Stream_GetPosition(s); - if (status != 1) - return status; + + /* pdu not yet complete */ + if (Stream_GetPosition(s) < pduLength) + return 0; if (Stream_GetPosition(s) >= pduLength) WLog_Packet(transport->log, WLOG_TRACE, Stream_Buffer(s), pduLength, WLOG_PACKET_INBOUND); @@ -792,18 +814,97 @@ int transport_read_pdu(rdpTransport* transport, wStream* s) return Stream_Length(s); } -int transport_write(rdpTransport* transport, wStream* s) +static BOOL transport_prepare_stream(rdpTransport *transport, wStream *s) { - size_t length; - int status = -1; - int writtenlength = 0; - rdpRdp* rdp = transport->context->rdp; + if (!(transport->ReceiveBuffer = StreamPool_Take(transport->ReceivePool, 0))) + { + Stream_Release(s); + return FALSE; + } + if (transport->NextPDUBytesLeft < 0) + { + /* if we currenlty have part of next pdu in buffer, + * we need to move it into next pdu's buffer */ + Stream_SetPosition(s, Stream_Length(s) + transport->NextPDUBytesLeft); + Stream_Read(s, transport->ReceiveBuffer->pointer, -(transport->NextPDUBytesLeft)); + transport->NextPDUBytesLeft = 0; + } + Stream_Release(s); + return TRUE; +} - if (!s) +/** + * @brief Callback function to handle bytes from io backends + * + * @param[in] context rdpContext + * @param[in] buf const uint8_t* + * @param[in] buf_size size_t + * @return count of bytes succesfully handled, -1 on error + */ +static int transport_io_data_handler(rdpContext* context, + const uint8_t* buf, size_t buf_size) +{ + rdpTransport *transport = NULL; + wStream *pdu = NULL; + if (buf_size > SSIZE_MAX) return -1; - + if (!context->rdp) + return -1; + transport = context->rdp->transport; if (!transport) - goto fail; + return -1; + + Stream_Write(transport->ReceiveBuffer, buf, buf_size); + pdu = transport->ReceiveBuffer; + if (transport_handle_pdu(transport, pdu, 0) < 1) + return 0; + transport->ReceiveCallback(transport, pdu, transport->ReceiveExtra); + + if (!transport_prepare_stream(transport, pdu)) + return -1; + + return buf_size; +} + +/** + * @brief Callback function for io backends read implementation + * + * @param[in] context rdpContext + * @param[out] buf const uint8_t* + * @param[in] buf_size size_t + * @return count of bytes succesfully handled + */ +static int transport_io_data_read(rdpContext* context, + const uint8_t* buf, size_t buf_size) +{ + SSIZE_T status; + rdpTransport *transport = NULL; + if (buf_size > SSIZE_MAX) + return 0; + + transport = context->rdp->transport; + + status = transport_read_layer(transport, (BYTE*)buf, buf_size); + + return status; +} + +/** + * @brief Callback function for io backend write implementation + * + * @param[in] context rdpContext + * @param[in] buf const uint8_t* + * @param[in] buf_size size_t + * @return count of bytes succesfully handled + */ +static int transport_io_data_write(rdpContext* context, + const uint8_t* buf, size_t buf_size) +{ + int status = -1; + size_t length; + int writtenlength = 0; + rdpRdp *rdp = context->rdp; + rdpTransport *transport = context->rdp->transport; if (!transport->frontBio) { @@ -813,19 +914,19 @@ int transport_write(rdpTransport* transport, wStream* s) } EnterCriticalSection(&(transport->WriteLock)); - length = Stream_GetPosition(s); + length = buf_size; writtenlength = length; - Stream_SetPosition(s, 0); if (length > 0) { rdp->outBytes += length; - WLog_Packet(transport->log, WLOG_TRACE, Stream_Buffer(s), length, WLOG_PACKET_OUTBOUND); + WLog_Packet(transport->log, WLOG_TRACE, buf, length, WLOG_PACKET_OUTBOUND); } while (length > 0) { - status = BIO_write(transport->frontBio, Stream_Pointer(s), length); + status = BIO_write(transport->frontBio, buf + writtenlength - length, + length); if (status <= 0) { @@ -877,10 +978,8 @@ int transport_write(rdpTransport* transport, wStream* s) } length -= status; - Stream_Seek(s, status); } - transport->written += writtenlength; out_cleanup: if (status < 0) @@ -891,6 +990,51 @@ int transport_write(rdpTransport* transport, wStream* s) } LeaveCriticalSection(&(transport->WriteLock)); +fail: + if (!length) + return writtenlength; + return status; +} + +void transport_register_default_io_callbacks(rdpUpdate* update) +{ + rdpIoUpdate io = { 0 }; + io.context = update->context; + io.DataHandler = transport_io_data_handler; + io.Read = transport_io_data_read; + io.Write = transport_io_data_write; + freerdp_set_transport_callbacks(update->context, &io); +} + + +void freerdp_set_transport_callbacks(rdpContext* context, void *io_callbacks) +{ + rdpIoUpdate* io = ((rdpIoUpdate*)io_callbacks); + /* NOTE: DataHandler should not be switched */ + io->DataHandler = transport_io_data_handler; + /* NOTE: add context ponter in case user forgot to do so */ + io->context = context; + *(context->update->io) = *io; +} + + +int transport_write(rdpTransport* transport, wStream* s) +{ + int status = -1; + rdpRdp* rdp = NULL; + + if (!s) + return -1; + + if (!transport) + goto fail; + + rdp = transport->context->rdp; + + status = rdp->update->io->Write(rdp->context, Stream_Buffer(s), Stream_GetPosition(s)); + + if (status > 0) + transport->written += status; fail: Stream_Release(s); return status; @@ -1002,8 +1146,9 @@ int transport_check_fds(rdpTransport* transport) int status; int recv_status; wStream* received; - UINT64 now = GetTickCount64(); - UINT64 dueDate = 0; +// UINT64 now = GetTickCount64(); +// UINT64 dueDate = 0; +// WLog_Print(transport->log, WLOG_DEBUG, "transport_check_fds"); if (!transport) return -1; @@ -1015,43 +1160,44 @@ int transport_check_fds(rdpTransport* transport) return -1; } - dueDate = now + transport->settings->MaxTimeInCheckLoop; + // dueDate = now + transport->settings->MaxTimeInCheckLoop; - if (transport->haveMoreBytesToRead) +/* if (transport->haveMoreBytesToRead) { transport->haveMoreBytesToRead = FALSE; ResetEvent(transport->rereadEvent); - } + } */ - while (now < dueDate) +// while (now < dueDate) { if (freerdp_shall_disconnect(transport->context->instance)) { return -1; } - /** - * Note: transport_read_pdu tries to read one PDU from - * the transport layer. - * The ReceiveBuffer might have a position > 0 in case of a non blocking - * transport. If transport_read_pdu returns 0 the pdu couldn't be read at - * this point. - * Note that transport->ReceiveBuffer is replaced after each iteration - * of this loop with a fresh stream instance from a pool. - */ - if ((status = transport_read_pdu(transport, transport->ReceiveBuffer)) <= 0) + if ((status = transport_read_layer_bytes(transport, transport->ReceiveBuffer, + transport->NextPDUBytesLeft ? transport->NextPDUBytesLeft : 1)) <= 0) { if (status < 0) WLog_Print(transport->log, WLOG_DEBUG, - "transport_check_fds: transport_read_pdu() - %i", status); + "transport_check_fds: transport_read_layer_bytes() - %i", status); + + return status; + } + if ((status = transport_handle_pdu(transport, transport->ReceiveBuffer, + &transport->NextPDUBytesLeft)) <= 0) + { + if (status < 0) + WLog_Print(transport->log, WLOG_DEBUG, + "transport_check_fds: transport_handle_pdu() - %i", status); return status; } received = transport->ReceiveBuffer; - if (!(transport->ReceiveBuffer = StreamPool_Take(transport->ReceivePool, 0))) - return -1; +/* if (!(transport->ReceiveBuffer = StreamPool_Take(transport->ReceivePool, 0))) + return -1; */ /** * status: @@ -1060,7 +1206,9 @@ int transport_check_fds(rdpTransport* transport) * 1: redirection */ recv_status = transport->ReceiveCallback(transport, received, transport->ReceiveExtra); - Stream_Release(received); + + if (!transport_prepare_stream(transport, received)) + return -1; /* session redirection or activation */ if (recv_status == 1 || recv_status == 2) @@ -1075,14 +1223,14 @@ int transport_check_fds(rdpTransport* transport) return -1; } - now = GetTickCount64(); + //now = GetTickCount64(); } - if (now >= dueDate) - { - SetEvent(transport->rereadEvent); - transport->haveMoreBytesToRead = TRUE; - } +/* if (now >= dueDate) + { */ +// SetEvent(transport->rereadEvent); +// transport->haveMoreBytesToRead = TRUE; +// } return 0; } diff --git a/libfreerdp/core/transport.h b/libfreerdp/core/transport.h index 944f5ce30ded1..feb63b35af615 100644 --- a/libfreerdp/core/transport.h +++ b/libfreerdp/core/transport.h @@ -65,6 +65,7 @@ struct rdp_transport rdpSettings* settings; void* ReceiveExtra; wStream* ReceiveBuffer; + SSIZE_T NextPDUBytesLeft; TransportRecv ReceiveCallback; wStreamPool* ReceivePool; HANDLE connectedEvent; @@ -92,6 +93,7 @@ FREERDP_LOCAL BOOL transport_accept_tls(rdpTransport* transport); FREERDP_LOCAL BOOL transport_accept_nla(rdpTransport* transport); FREERDP_LOCAL int transport_read_pdu(rdpTransport* transport, wStream* s); +FREERDP_LOCAL int transport_handle_pdu(rdpTransport* transport, wStream* s, SSIZE_T *left_to_read); FREERDP_LOCAL int transport_write(rdpTransport* transport, wStream* s); FREERDP_LOCAL void transport_get_fds(rdpTransport* transport, void** rfds, int* rcount); @@ -112,4 +114,6 @@ FREERDP_LOCAL int transport_receive_pool_return(rdpTransport* transport, wStream FREERDP_LOCAL rdpTransport* transport_new(rdpContext* context); FREERDP_LOCAL void transport_free(rdpTransport* transport); +FREERDP_LOCAL void transport_register_default_io_callbacks(rdpUpdate* update); + #endif /* FREERDP_LIB_CORE_TRANSPORT_H */ diff --git a/libfreerdp/core/update.c b/libfreerdp/core/update.c index ebb82fc2df7ea..780a0eb9851b4 100644 --- a/libfreerdp/core/update.c +++ b/libfreerdp/core/update.c @@ -2936,6 +2936,11 @@ rdpUpdate* update_new(rdpRdp* rdp) if (!update->window) goto fail; + update->io = (rdpIoUpdate*)calloc(1, sizeof(rdpIoUpdate)); + + if (!update->io) + goto fail; + deleteList = &(update->altsec->create_offscreen_bitmap.deleteList); deleteList->sIndices = 64; deleteList->indices = calloc(deleteList->sIndices, 2); @@ -2978,12 +2983,15 @@ void update_free(rdpUpdate* update) } free(update->secondary); + update->secondary = NULL; free(update->altsec); + update->altsec = NULL; - if (update->window) - { - free(update->window); - } + free(update->window); + update->window = NULL; + + free(update->io); + update->io = NULL; MessageQueue_Free(update->queue); DeleteCriticalSection(&update->mux);