Skip to content

Commit

Permalink
fix: Catch more unhandled promise rejections for EZSP adapter (#853)
Browse files Browse the repository at this point in the history
* Fix unhandled promise rejection in ezsp driver

* Other fixes that ended up working

* Fix CI with a slight workaround
  • Loading branch information
mikekap committed Jan 5, 2024
1 parent 23157e7 commit 33fa223
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 51 deletions.
2 changes: 2 additions & 0 deletions src/adapter/ezsp/driver/driver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,8 @@ export class Driver extends EventEmitter {
const frame = this.makeApsFrame(requestCmd as number, false);
const payload = this.makeZDOframe(requestCmd as number, {transId: frame.sequence, ...params});
const waiter = this.waitFor(networkAddress, responseCmd as number, frame.sequence).start();
// if the request takes longer than the timeout, avoid an unhandled promise rejection.
waiter.promise.catch(() => {});
const res = await this.request(networkAddress, frame, payload);
if (!res) {
debug.error(`zdoRequest error`);
Expand Down
4 changes: 3 additions & 1 deletion src/adapter/ezsp/driver/ezsp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ export class Ezsp extends EventEmitter {
}

public async connect(path: string, options: Record<string, number|boolean>): Promise<void> {
let lastError = null;
for (let i = 1; i < 5; i += 1) {
try {
await this.serialDriver.connect(path, options);
Expand All @@ -271,10 +272,11 @@ export class Ezsp extends EventEmitter {
debug.error(`Connection attempt ${i} error: ${error.stack}`);
await Wait(5000);
debug.log(`Next attempt ${i+1}`);
lastError = error;
}
}
if (!this.serialDriver.isInitialized()) {
throw new Error("Failure to connect");
throw new Error("Failure to connect", {cause: lastError});
}
if (WATCHDOG_WAKE_PERIOD) {
this.watchdogTimer = setInterval(
Expand Down
50 changes: 28 additions & 22 deletions src/adapter/ezsp/driver/uart.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,29 +83,35 @@ export class SerialDriver extends EventEmitter {
this.parser = new Parser();
this.serialPort.pipe(this.parser);
this.parser.on('parsed', this.onParsed.bind(this));

return new Promise((resolve, reject): void => {
this.serialPort.open(async (error): Promise<void> => {
if (error) {
this.initialized = false;
if (this.serialPort.isOpen) {
this.serialPort.close();
try {
await new Promise((resolve, reject): void => {
this.serialPort.open(async (error): Promise<void> => {
if (error) {
reject(new Error(`Error while opening serialport '${error}'`));
} else {
resolve(null);
}
reject(new Error(`Error while opening serialport '${error}'`));
} else {
debug('Serialport opened');
this.serialPort.once('close', this.onPortClose.bind(this));
this.serialPort.once('error', (error) => {
debug(`Serialport error: ${error}`);
});
// reset
await this.reset();
this.initialized = true;
this.emit('connected');
resolve();
}
});
});
});

debug('Serialport opened');
this.serialPort.once('close', this.onPortClose.bind(this));
this.serialPort.once('error', (error) => {
debug(`Serialport error: ${error}`);
});

// reset
await this.reset();
this.initialized = true;
this.emit('connected');
} catch (e) {
this.initialized = false;
if (this.serialPort.isOpen) {
this.serialPort.close();
}
throw e;
}

}

private async openSocketPort(path: string): Promise<void> {
Expand Down Expand Up @@ -196,7 +202,7 @@ export class SerialDriver extends EventEmitter {
case (data[0] === 0xC2):
debug(`<-- Error: ${data.toString('hex')}`);
// send reset
this.reset();
this.reset().catch((e) => debug(`Failed to reset: ${e}`));
break;
default:
debug("UNKNOWN FRAME RECEIVED: %r", data);
Expand Down
6 changes: 4 additions & 2 deletions src/controller/controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,10 @@ class Controller extends events.EventEmitter {

// Zigbee 3 networks automatically close after max 255 seconds, keep network open.
this.permitJoinNetworkClosedTimer = setInterval(async (): Promise<void> => {
await this.adapter.permitJoin(254, !device ? null : device.networkAddress);
await this.greenPower.permitJoin(254, !device ? null : device.networkAddress);
await catcho(async () => {
await this.adapter.permitJoin(254, !device ? null : device.networkAddress);
await this.greenPower.permitJoin(254, !device ? null : device.networkAddress);
}, "Failed to keep permit join alive");
}, 200 * 1000);

if (typeof time === 'number') {
Expand Down
54 changes: 30 additions & 24 deletions src/utils/queue.ts
Original file line number Diff line number Diff line change
@@ -1,47 +1,53 @@
interface Job<T> {
interface Job {
key: string | number;
func: () => Promise<T>;
running: boolean;
resolve: (result: T) => void;
reject: (error: Error) => void;
start: () => void;
}

class Queue {
private jobs: Job<unknown>[];
private jobs: Job[];
private readonly concurrent: number;

constructor(concurrent = 1) {
this.jobs = [];
this.concurrent = concurrent;
}

public execute<T>(func: () => Promise<T>, key: string | number = null): Promise<T> {
return new Promise((resolve, reject): void => {
this.jobs.push({key, func, running: false, resolve, reject});
public async execute<T>(func: () => Promise<T>, key: string | number = null): Promise<T> {
const job : Job = {key, running: false, start: null};
// Minor optimization/workaround: various tests like the idea that a job that is
// immediately runnable is run without an event loop spin. This also helps with stack
// traces in some cases, so avoid an `await` if we can help it.
this.jobs.push(job);
if (this.getNext() !== job) {
await new Promise((resolve): void => {
job.start = (): void => {
job.running = true;
resolve(null);
};
this.executeNext();
});
} else {
job.running = true;
}

try {
return await func();
} finally {
this.jobs.splice(this.jobs.indexOf(job), 1);
this.executeNext();
});
}
}

private async executeNext(): Promise<void> {
private executeNext(): void {
const job = this.getNext();

if (job) {
job.running = true;

try {
const result = await job.func();
this.jobs.splice(this.jobs.indexOf(job), 1);
job.resolve(result);
this.executeNext();
} catch (error) {
this.jobs.splice(this.jobs.indexOf(job), 1);
job.reject(error);
this.executeNext();
}
job.start();
}
}

private getNext(): Job<unknown> {
private getNext(): Job {
if (this.jobs.filter((j) => j.running).length > (this.concurrent - 1)) {
return null;
}
Expand All @@ -66,4 +72,4 @@ class Queue {
}
}

export default Queue;
export default Queue;
7 changes: 5 additions & 2 deletions src/utils/waitress.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,13 @@ class Waitress<TPayload, TMatcher> {
const start = (): {promise: Promise<TPayload>; ID: number} => {
const waiter = this.waiters.get(ID);
if (waiter && !waiter.resolved && !waiter.timer) {
// Capture the stack trace from the caller of start()
const error = new Error();
Error.captureStackTrace(error);
waiter.timer = setTimeout((): void => {
const message = this.timeoutFormatter(matcher, timeout);
error.message = this.timeoutFormatter(matcher, timeout);
waiter.timedout = true;
waiter.reject(new Error(message));
waiter.reject(error);
}, timeout);
}

Expand Down

0 comments on commit 33fa223

Please sign in to comment.