Skip to content

Commit 69de595

Browse files
dustmopreconbot
authored andcommitted
fix(windows): Asynchronous callbacks for reading and writing (#1328)
Instead of ReadFile and WriteFile, which block and transfer data synchronously, use ReadFileEx and WriteFileEx, which both allow async callbacks. In addition, change how timeouts are used for ReadFile*, using an unlimited timeout for the first byte, and no timeout for the rest of the data in the input buffer. This removes the need to poll entirely, while still retrieving all data available in the input buffer. In both cases, the I/O operations happen in their own threads, since Windows requires IOCompletion callbacks to wait for their calling thread to be in an "alertable wait state". Fixes #1221
1 parent c7a3be4 commit 69de595

File tree

2 files changed

+165
-113
lines changed

2 files changed

+165
-113
lines changed

src/serialport_win.cpp

Lines changed: 155 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ void EIO_Open(uv_work_t* req) {
8989
dcb.Parity = NOPARITY;
9090
dcb.ByteSize = 8;
9191
dcb.StopBits = ONESTOPBIT;
92-
93-
92+
93+
9494
dcb.fOutxDsrFlow = FALSE;
9595
dcb.fOutxCtsFlow = FALSE;
9696

@@ -152,11 +152,10 @@ void EIO_Open(uv_work_t* req) {
152152
return;
153153
}
154154

155-
// Set the timeouts for read and write operations read operation is to return
156-
// immediately with the bytes that have already been received, even if no bytes
157-
// have been received.
158-
COMMTIMEOUTS commTimeouts = {0};
159-
commTimeouts.ReadIntervalTimeout = MAXDWORD; // Never timeout
155+
// Set the timeouts for read and write operations.
156+
// Read operation will wait for at least 1 byte to be received.
157+
COMMTIMEOUTS commTimeouts = {};
158+
commTimeouts.ReadIntervalTimeout = 0; // Never timeout, always wait for data.
160159
commTimeouts.ReadTotalTimeoutMultiplier = 0; // Do not allow big read timeout when big read buffer used
161160
commTimeouts.ReadTotalTimeoutConstant = 0; // Total read timeout (period of read loop)
162161
commTimeouts.WriteTotalTimeoutConstant = 0; // Const part of write timeout
@@ -291,58 +290,63 @@ NAN_METHOD(Write) {
291290
baton->bufferLength = bufferLength;
292291
baton->offset = 0;
293292
baton->callback.Reset(info[2].As<v8::Function>());
293+
baton->complete = false;
294+
// WriteFileEx requires a thread that can block. Create a new thread to
295+
// run the write operation, saving the handle so it can be deallocated later.
296+
baton->hThread = CreateThread(NULL, 0, WriteThread, baton, 0, NULL);
297+
}
294298

295-
uv_work_t* req = new uv_work_t();
296-
req->data = baton;
297-
298-
uv_queue_work(uv_default_loop(), req, EIO_Write, (uv_after_work_cb)EIO_AfterWrite);
299+
void __stdcall WriteIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) {
300+
WriteBaton* baton = static_cast<WriteBaton*>(ov->hEvent);
301+
DWORD bytesWritten;
302+
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesWritten, TRUE)) {
303+
errorCode = GetLastError();
304+
ErrorCodeToString("Writing to COM port (GetOverlappedResult)", errorCode, baton->errorString);
305+
baton->complete = true;
306+
return;
307+
}
308+
if (bytesWritten) {
309+
baton->offset += bytesWritten;
310+
if (baton->offset >= baton->bufferLength) {
311+
baton->complete = true;
312+
}
313+
}
299314
}
300315

301-
void EIO_Write(uv_work_t* req) {
302-
WriteBaton* data = static_cast<WriteBaton*>(req->data);
303-
data->result = 0;
304-
305-
do {
306-
OVERLAPPED ov = {0};
307-
// Event used by GetOverlappedResult(..., TRUE) to wait for outgoing data or timeout
308-
// Event MUST be used if program has several simultaneous asynchronous operations
309-
// on the same handle (i.e. ReadFile and WriteFile)
310-
ov.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
311-
312-
// Start write operation - synchronous or asynchronous
313-
DWORD bytesWritten = 0;
314-
if (!WriteFile((HANDLE)data->fd, data->bufferData, static_cast<DWORD>(data->bufferLength), &bytesWritten, &ov)) {
315-
DWORD lastError = GetLastError();
316-
if (lastError != ERROR_IO_PENDING) {
317-
// Write operation error
318-
ErrorCodeToString("Writing to COM port (WriteFile)", lastError, data->errorString);
319-
CloseHandle(ov.hEvent);
320-
return;
321-
}
322-
// Write operation is completing asynchronously
323-
// We MUST wait for the operation completion before deallocation of OVERLAPPED struct
324-
// or write data buffer
325-
326-
// block for async write operation completion
327-
bytesWritten = 0;
328-
if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesWritten, TRUE)) {
329-
// Write operation error
330-
DWORD lastError = GetLastError();
331-
ErrorCodeToString("Writing to COM port (GetOverlappedResult)", lastError, data->errorString);
332-
CloseHandle(ov.hEvent);
333-
return;
334-
}
316+
DWORD __stdcall WriteThread(LPVOID param) {
317+
WriteBaton* baton = static_cast<WriteBaton*>(param);
318+
319+
OVERLAPPED* ov = new OVERLAPPED;
320+
memset(ov, 0, sizeof(ov));
321+
ov->hEvent = static_cast<void*>(baton);
322+
323+
while (!baton->complete) {
324+
char* offsetPtr = baton->bufferData + baton->offset;
325+
// WriteFileEx requires calling GetLastError even upon success. Clear the error beforehand.
326+
SetLastError(0);
327+
WriteFileEx((HANDLE)baton->fd, offsetPtr, static_cast<DWORD>(baton->bufferLength), ov, WriteIOCompletion);
328+
// Error codes when call is successful, such as ERROR_MORE_DATA.
329+
DWORD lastError = GetLastError();
330+
if (lastError != ERROR_SUCCESS) {
331+
ErrorCodeToString("Writing to COM port (WriteFileEx)", lastError, baton->errorString);
332+
break;
335333
}
336-
// Write operation completed synchronously
337-
data->result = bytesWritten;
338-
data->offset += data->result;
339-
CloseHandle(ov.hEvent);
340-
} while (data->bufferLength > data->offset);
334+
// IOCompletion routine is only called once this thread is in an alertable wait state.
335+
SleepEx(INFINITE, TRUE);
336+
}
337+
delete ov;
338+
// Signal the main thread to run the callback.
339+
uv_async_t* async = new uv_async_t;
340+
uv_async_init(uv_default_loop(), async, EIO_AfterWrite);
341+
async->data = baton;
342+
uv_async_send(async);
343+
ExitThread(0);
341344
}
342345

343-
void EIO_AfterWrite(uv_work_t* req) {
346+
void EIO_AfterWrite(uv_async_t* req) {
344347
Nan::HandleScope scope;
345348
WriteBaton* baton = static_cast<WriteBaton*>(req->data);
349+
WaitForSingleObject(baton->hThread, INFINITE);
346350
delete req;
347351

348352
v8::Local<v8::Value> argv[1];
@@ -404,81 +408,121 @@ NAN_METHOD(Read) {
404408
baton->bufferLength = bufferLength;
405409
baton->bufferData = node::Buffer::Data(buffer);
406410
baton->callback.Reset(info[4].As<v8::Function>());
407-
408-
uv_work_t* req = new uv_work_t();
409-
req->data = baton;
410-
uv_queue_work(uv_default_loop(), req, EIO_Read, (uv_after_work_cb)EIO_AfterRead);
411+
baton->complete = false;
412+
// ReadFileEx requires a thread that can block. Create a new thread to
413+
// run the read operation, saving the handle so it can be deallocated later.
414+
baton->hThread = CreateThread(NULL, 0, ReadThread, baton, 0, NULL);
411415
}
412416

413-
void EIO_Read(uv_work_t* req) {
414-
ReadBaton* data = static_cast<ReadBaton*>(req->data);
415-
data->bytesRead = 0;
416-
int errorCode = ERROR_SUCCESS;
417+
void __stdcall ReadIOCompletion(DWORD errorCode, DWORD bytesTransferred, OVERLAPPED* ov) {
418+
ReadBaton* baton = static_cast<ReadBaton*>(ov->hEvent);
417419

418-
char* offsetPtr = data->bufferData;
419-
offsetPtr += data->offset;
420+
if (errorCode) {
421+
ErrorCodeToString("Reading from COM port (ReadIOCompletion)", errorCode, baton->errorString);
422+
baton->complete = true;
423+
return;
424+
}
420425

421-
// Event used by GetOverlappedResult(..., TRUE) to wait for incoming data or timeout
422-
// Event MUST be used if program has several simultaneous asynchronous operations
423-
// on the same handle (i.e. ReadFile and WriteFile)
424-
HANDLE hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
426+
DWORD lastError;
427+
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) {
428+
lastError = GetLastError();
429+
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString);
430+
baton->complete = true;
431+
return;
432+
}
433+
if (bytesTransferred) {
434+
baton->bytesRead += bytesTransferred;
435+
baton->offset += bytesTransferred;
436+
}
425437

426-
while (true) {
427-
OVERLAPPED ov = {0};
428-
ov.hEvent = hEvent;
429-
430-
// Start read operation - synchrounous or asynchronous
431-
DWORD bytesReadSync = 0;
432-
if (!ReadFile((HANDLE)data->fd, offsetPtr, data->bytesToRead, &bytesReadSync, &ov)) {
433-
errorCode = GetLastError();
434-
if (errorCode != ERROR_IO_PENDING) {
435-
// Read operation error
436-
if (errorCode == ERROR_OPERATION_ABORTED) {
437-
} else {
438-
ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, data->errorString);
439-
CloseHandle(hEvent);
440-
return;
441-
}
442-
break;
443-
}
438+
// ReadFileEx and GetOverlappedResult retrieved only 1 byte. Read any additional data in the input
439+
// buffer. Set the timeout to MAXDWORD in order to disable timeouts, so the read operation will
440+
// return immediately no matter how much data is available.
441+
COMMTIMEOUTS commTimeouts = {};
442+
commTimeouts.ReadIntervalTimeout = MAXDWORD;
443+
if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) {
444+
lastError = GetLastError();
445+
ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString);
446+
baton->complete = true;
447+
return;
448+
}
444449

445-
// Read operation is asynchronous and is pending
446-
// We MUST wait for operation completion before deallocation of OVERLAPPED struct
447-
// or read data buffer
448-
449-
// Wait for async read operation completion or timeout
450-
DWORD bytesReadAsync = 0;
451-
if (!GetOverlappedResult((HANDLE)data->fd, &ov, &bytesReadAsync, TRUE)) {
452-
// Read operation error
453-
errorCode = GetLastError();
454-
if (errorCode == ERROR_OPERATION_ABORTED) {
455-
} else {
456-
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", errorCode, data->errorString);
457-
CloseHandle(hEvent);
458-
return;
459-
}
460-
break;
461-
} else {
462-
// Read operation completed asynchronously
463-
data->bytesRead = bytesReadAsync;
464-
}
465-
} else {
466-
// Read operation completed synchronously
467-
data->bytesRead = bytesReadSync;
450+
// Store additional data after whatever data has already been read.
451+
char* offsetPtr = baton->bufferData + baton->offset;
452+
453+
// ReadFile, unlike ReadFileEx, needs an event in the overlapped structure.
454+
ov->hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
455+
if (!ReadFile((HANDLE)baton->fd, offsetPtr, baton->bytesToRead, &bytesTransferred, ov)) {
456+
errorCode = GetLastError();
457+
458+
if (errorCode != ERROR_IO_PENDING) {
459+
ErrorCodeToString("Reading from COM port (ReadFile)", errorCode, baton->errorString);
460+
baton->complete = true;
461+
CloseHandle(ov->hEvent);
462+
return;
468463
}
469464

470-
// Return data received if any
471-
if (data->bytesRead > 0) {
472-
break;
465+
if (!GetOverlappedResult((HANDLE)baton->fd, ov, &bytesTransferred, TRUE)) {
466+
lastError = GetLastError();
467+
ErrorCodeToString("Reading from COM port (GetOverlappedResult)", lastError, baton->errorString);
468+
baton->complete = true;
469+
CloseHandle(ov->hEvent);
470+
return;
473471
}
474472
}
473+
CloseHandle(ov->hEvent);
474+
475+
baton->bytesToRead -= bytesTransferred;
476+
baton->bytesRead += bytesTransferred;
477+
baton->complete = true;
478+
}
475479

476-
CloseHandle(hEvent);
480+
DWORD __stdcall ReadThread(LPVOID param) {
481+
ReadBaton* baton = static_cast<ReadBaton*>(param);
482+
DWORD lastError;
483+
484+
OVERLAPPED* ov = new OVERLAPPED;
485+
memset(ov, 0, sizeof(OVERLAPPED));
486+
ov->hEvent = static_cast<void*>(baton);
487+
488+
while (!baton->complete) {
489+
// Reset the read timeout to 0, so that it will block until more data arrives.
490+
COMMTIMEOUTS commTimeouts = {};
491+
commTimeouts.ReadIntervalTimeout = 0;
492+
if (!SetCommTimeouts((HANDLE)baton->fd, &commTimeouts)) {
493+
lastError = GetLastError();
494+
ErrorCodeToString("Setting COM timeout (SetCommTimeouts)", lastError, baton->errorString);
495+
break;
496+
}
497+
// ReadFileEx doesn't use overlapped's hEvent, so it is reserved for user data.
498+
ov->hEvent = static_cast<HANDLE>(baton);
499+
char* offsetPtr = baton->bufferData + baton->offset;
500+
// ReadFileEx requires calling GetLastError even upon success. Clear the error beforehand.
501+
SetLastError(0);
502+
// Only read 1 byte, so that the callback will be triggered once any data arrives.
503+
ReadFileEx((HANDLE)baton->fd, offsetPtr, 1, ov, ReadIOCompletion);
504+
// Error codes when call is successful, such as ERROR_MORE_DATA.
505+
lastError = GetLastError();
506+
if (lastError != ERROR_SUCCESS) {
507+
ErrorCodeToString("Reading from COM port (ReadFileEx)", lastError, baton->errorString);
508+
break;
509+
}
510+
// IOCompletion routine is only called once this thread is in an alertable wait state.
511+
SleepEx(INFINITE, TRUE);
512+
}
513+
delete ov;
514+
// Signal the main thread to run the callback.
515+
uv_async_t* async = new uv_async_t;
516+
uv_async_init(uv_default_loop(), async, EIO_AfterRead);
517+
async->data = baton;
518+
uv_async_send(async);
519+
ExitThread(0);
477520
}
478521

479-
void EIO_AfterRead(uv_work_t* req) {
522+
void EIO_AfterRead(uv_async_t* req) {
480523
Nan::HandleScope scope;
481524
ReadBaton* baton = static_cast<ReadBaton*>(req->data);
525+
WaitForSingleObject(baton->hThread, INFINITE);
482526
delete req;
483527

484528
v8::Local<v8::Value> argv[2];

src/serialport_win.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ struct WriteBaton {
1515
size_t bufferLength;
1616
size_t offset;
1717
size_t bytesWritten;
18+
void* hThread;
19+
bool complete;
1820
Nan::Persistent<v8::Object> buffer;
1921
Nan::Callback callback;
2022
int result;
@@ -23,7 +25,9 @@ struct WriteBaton {
2325

2426
NAN_METHOD(Write);
2527
void EIO_Write(uv_work_t* req);
26-
void EIO_AfterWrite(uv_work_t* req);
28+
void EIO_AfterWrite(uv_async_t* req);
29+
DWORD __stdcall WriteThread(LPVOID param);
30+
2731

2832
struct ReadBaton {
2933
int fd;
@@ -32,13 +36,17 @@ struct ReadBaton {
3236
size_t bytesRead;
3337
size_t bytesToRead;
3438
size_t offset;
39+
void* hThread;
40+
bool complete;
3541
char errorString[ERROR_STRING_SIZE];
3642
Nan::Callback callback;
3743
};
3844

3945
NAN_METHOD(Read);
4046
void EIO_Read(uv_work_t* req);
41-
void EIO_AfterRead(uv_work_t* req);
47+
void EIO_AfterRead(uv_async_t* req);
48+
DWORD __stdcall ReadThread(LPVOID param);
49+
4250

4351
NAN_METHOD(List);
4452
void EIO_List(uv_work_t* req);

0 commit comments

Comments
 (0)