Skip to content

Commit

Permalink
Make attachWorker return a promise by default
Browse files Browse the repository at this point in the history
Last week, we've seen an issue on some Samsung and LG TVs when relying
on the RxPlayer new experimental `MULTI_THREAD` feature due to specific
opcodes found in our WebAssembly files which were not compatible with
some of those TVs' browser.

Though we isolated and fixed the issue in #1372, it might be better to
also find a longer term solution to rollback the `MULTI_THREAD` feature
when an issue is detected with it preventing us from playing.

This could be done in several ways, from throwing errors, to new events,
to just return a rejecting Promise in the `attachWorker` method.

I chose to go with the latter of those solutions now because it appears
logical API-wise and implementation-wise to have that method return a
Promise which resolves only if we're able to communicate with a
WebWorker (and reject either if the browser does not support it, if a
security policy prevent from running the worker, if the request for the
worker's code fail or if the code evualation itself fails).

I've also added a specialized error just for that API to give more
context about what failed (missing feature? etc.).

I was afraid that relying on this new Promise to indicate an issue at
WebAssembly-compilation-time for our MPD parser would bother us in the
future if we ever add other WebAssembly modules (e.g. a smooth parser),
which could also independently fail (should we reject the Promise when
either compilation fails? Even if we could theoretically still play DASH
contents? How would we mix this way with a potentially lazy-loading of
features where we wouldn't be compiling right away? and so on...), but
after exposing all the potential future paths I could see this
`MULTI_THREAD` feature taking, I was able to find an adapted solution
still compatible with returning a Promise on the `attachWorker` API.

I also tried to automatically fallback from a "multithread mode" to the
regular monothread one inside the RxPlayer but doing this was complex.
So for now, if `attachWorker` fails, the RxPlayer will remove the worker
from its state (new `loadVideo` calls won't depend on it) but it is the
responsibility of the application to reload if a content was loaded in
"multithread mode" was loaded in the meantime.

If an application doesn't want to handle that supplementary complexity,
it can just await the Promise returned by `attachWorker` before loading
the first content (and catching eventual errors). As the RxPlayer
automatically removes the worker if its initialization fails, this
will lead to automatically fallback on main thread. At the cost of some
time compared to load and initialize the worker parallely.
  • Loading branch information
peaBerberian committed Feb 5, 2024
1 parent bec07b3 commit 77a389a
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 44 deletions.
119 changes: 81 additions & 38 deletions src/core/api/public_api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
IErrorType,
MediaError,
} from "../../errors";
import WorkerInitializationError from "../../errors/worker_initialization_error";
import features, {
addFeatures,
IFeature,
Expand All @@ -54,7 +55,11 @@ import Manifest, {
ManifestMetadataFormat,
createRepresentationFilterFromFnString,
} from "../../manifest";
import { MainThreadMessageType } from "../../multithread_types";
import {
IWorkerMessage,
MainThreadMessageType,
WorkerMessageType,
} from "../../multithread_types";
import {
IAudioRepresentation,
IAudioRepresentationsSwitchingMode,
Expand Down Expand Up @@ -435,50 +440,88 @@ class Player extends EventEmitter<IPublicAPIEvent> {
*/
public attachWorker(
workerSettings : IWorkerSettings
) : void {
if (!hasWebassembly) {
log.warn("API: Cannot rely on a WebWorker: WebAssembly unavailable");
} else {
if (typeof workerSettings.workerUrl === "string") {
this._priv_worker = new Worker(workerSettings.workerUrl);
) : Promise<void> {
return new Promise((res, rej) => {
if (typeof Worker !== "function") {
log.warn("API: Cannot rely on a WebWorker: Worker API unavailable");
return rej(new WorkerInitializationError("INCOMPATIBLE_ERROR",
"Worker unavailable"));
} else if (!hasWebassembly) {
log.warn("API: Cannot rely on a WebWorker: WebAssembly unavailable");
return rej(new WorkerInitializationError("INCOMPATIBLE_ERROR",
"WebAssembly unavailable"));
} else {
const blobUrl = URL.createObjectURL(workerSettings.workerUrl);
this._priv_worker = new Worker(blobUrl);
URL.revokeObjectURL(blobUrl);
}

this._priv_worker.onerror = (evt: ErrorEvent) => {
this._priv_worker = null;
log.error("Unexpected worker error",
evt.error instanceof Error ? evt.error : undefined);
};
log.debug("---> Sending To Worker:", MainThreadMessageType.Init);
this._priv_worker.postMessage({
type: MainThreadMessageType.Init,
value: {
dashWasmUrl: workerSettings.dashWasmUrl,
logLevel: log.getLevel(),
sendBackLogs: isDebugModeEnabled(),
date: Date.now(),
timestamp: getMonotonicTimeStamp(),
hasVideo: this.videoElement?.nodeName.toLowerCase() === "video",
hasMseInWorker,
},
});
log.addEventListener("onLogLevelChange", (logLevel) => {
if (this._priv_worker === null) {
return;
if (typeof workerSettings.workerUrl === "string") {
this._priv_worker = new Worker(workerSettings.workerUrl);
} else {
const blobUrl = URL.createObjectURL(workerSettings.workerUrl);
this._priv_worker = new Worker(blobUrl);
URL.revokeObjectURL(blobUrl);
}
log.debug("---> Sending To Worker:", MainThreadMessageType.LogLevelUpdate);

this._priv_worker.onerror = (evt: ErrorEvent) => {
if (this._priv_worker !== null) {
this._priv_worker.terminate();
this._priv_worker = null;
}
log.error("API: Unexpected worker error",
evt.error instanceof Error ? evt.error : undefined);
rej(new WorkerInitializationError("UNKNOWN_ERROR",
"Unexpected Worker \"error\" event"));
};
const handleInitMessages = (msg: MessageEvent) => {
const msgData = msg.data as unknown as IWorkerMessage;
if (msgData.type === WorkerMessageType.InitError) {
log.warn("API: Processing InitError worker message: detaching worker");
if (this._priv_worker !== null) {
this._priv_worker.removeEventListener("message", handleInitMessages);
this._priv_worker.terminate();
this._priv_worker = null;
}
rej(
new WorkerInitializationError(
"SETUP_ERROR",
"Worker parser initialization failed: " + msgData.value.errorMessage
)
);
} else if (msgData.type === WorkerMessageType.InitSuccess) {
log.info("API: InitSuccess received from worker.");
if (this._priv_worker !== null) {
this._priv_worker.removeEventListener("message", handleInitMessages);
}
res();
}
};
this._priv_worker.addEventListener("message", handleInitMessages);

log.debug("---> Sending To Worker:", MainThreadMessageType.Init);
this._priv_worker.postMessage({
type: MainThreadMessageType.LogLevelUpdate,
type: MainThreadMessageType.Init,
value: {
logLevel,
dashWasmUrl: workerSettings.dashWasmUrl,
logLevel: log.getLevel(),
sendBackLogs: isDebugModeEnabled(),
date: Date.now(),
timestamp: getMonotonicTimeStamp(),
hasVideo: this.videoElement?.nodeName.toLowerCase() === "video",
hasMseInWorker,
},
});
}, this._destroyCanceller.signal);
}
log.addEventListener("onLogLevelChange", (logLevel) => {
if (this._priv_worker === null) {
return;
}
log.debug("---> Sending To Worker:", MainThreadMessageType.LogLevelUpdate);
this._priv_worker.postMessage({
type: MainThreadMessageType.LogLevelUpdate,
value: {
logLevel,
sendBackLogs: isDebugModeEnabled(),
},
});
}, this._destroyCanceller.signal);
}
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,11 @@ export default class MultiThreadContentInitializer extends ContentInitializer {
break;
}

case WorkerMessageType.InitSuccess:
case WorkerMessageType.InitError:
// Should already be handled by the API
break;

default:
assertUnreachable(msgData);
}
Expand Down
20 changes: 14 additions & 6 deletions src/core/init/multithread/worker/worker_portal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,20 @@ export default function initializeWorkerPortal() {
const diffWorker = Date.now() - performance.now();
mainThreadTimestampDiff.setValueIfChanged(diffWorker - diffMain);
updateLoggerLevel(msg.value.logLevel, msg.value.sendBackLogs);
dashWasmParser.initialize({ wasmUrl: msg.value.dashWasmUrl }).catch((err) => {
const error = err instanceof Error ?
err.toString() :
"Unknown Error";
log.error("Worker: Could not initialize DASH_WASM parser", error);
});
dashWasmParser.initialize({ wasmUrl: msg.value.dashWasmUrl }).then(
() => {
sendMessage({ type: WorkerMessageType.InitSuccess,
value: null });
}, (err) => {
const error = err instanceof Error ?
err.toString() :
"Unknown Error";
log.error("Worker: Could not initialize DASH_WASM parser", error);
sendMessage({ type: WorkerMessageType.InitError,
value: { errorMessage: error,
kind: "dashWasmInitialization" } });

});

if (!msg.value.hasVideo || msg.value.hasMseInWorker) {
contentPreparer.disposeCurrentContent();
Expand Down
36 changes: 36 additions & 0 deletions src/errors/worker_initialization_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import errorMessage from "./error_message";

type IWorkerInitializationErrorCode = "UNKNOWN_ERROR" |
"SETUP_ERROR" |
"INCOMPATIBLE_ERROR";

/**
* Error linked to the WebWorker initialization.
*
* @class WorkerInitializationError
* @extends Error
*/
export default class WorkerInitializationError extends Error {
public readonly name : "WorkerInitializationError";
public readonly type : "WORKER_INITIALIZATION_ERROR";
public readonly message : string;
public readonly code : IWorkerInitializationErrorCode;

/**
* @param {string} code
* @param {string} message
*/
constructor(
code : IWorkerInitializationErrorCode,
message : string
) {
super();
// @see https://stackoverflow.com/questions/41102060/typescript-extending-error-class
Object.setPrototypeOf(this, WorkerInitializationError.prototype);

this.name = "WorkerInitializationError";
this.type = "WORKER_INITIALIZATION_ERROR";
this.code = code;
this.message = errorMessage(this.code, message);
}
}
42 changes: 42 additions & 0 deletions src/multithread_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -573,6 +573,44 @@ export type ISentError = ISerializedNetworkError |
ISerializedEncryptedMediaError |
ISerializedOtherError;

/**
* Message sent by the WebWorker when its initialization, started implicitely
* as soon as the `new Worker` call was made for it, has finished and succeeded.
*
* Once that message has been received, you can ensure that no
* `IInitErrorWorkerMessage` will ever be received for the same worker.
*
* Note that receiving this message is not a requirement before preparing and
* loading a content, both initialization and content loading can be started in
* parallel.
*/
export interface IInitSuccessWorkerMessage {
type: WorkerMessageType.InitSuccess;
value: null;
}

/**
* Message sent by the WebWorker when its initialization, started implicitely
* as soon as the `new Worker` call was made for it, has finished and failed.
*
* Once that message has been received, you can ensure that no
* `IInitErrorWorkerMessage` will ever be received for the same worker.
*
* Note that you may received this message while preparing and/or loading a
* content, both initialization and content loading can be started in
* parallel.
* As such, this message may be coupled with a content error.
*/
export interface IInitErrorWorkerMessage {
type: WorkerMessageType.InitError;
value: {
/** A string describing the error encountered. */
errorMessage: string;

kind: "dashWasmInitialization";
};
}

export interface INeedsBufferFlushWorkerMessage {
type: WorkerMessageType.NeedsBufferFlush;
contentId: string;
Expand Down Expand Up @@ -883,6 +921,8 @@ export const enum WorkerMessageType {
EndOfStream = "end-of-stream",
Error = "error",
InbandEvent = "inband-event",
InitError = "init-error",
InitSuccess = "init-success",
InterruptEndOfStream = "stop-end-of-stream",
InterruptMediaSourceDurationUpdate = "stop-media-source-duration",
LockedStream = "locked-stream",
Expand Down Expand Up @@ -921,6 +961,8 @@ export type IWorkerMessage = IAbortBufferWorkerMessage |
IEndOfStreamWorkerMessage |
IErrorWorkerMessage |
IInbandEventWorkerMessage |
IInitSuccessWorkerMessage |
IInitErrorWorkerMessage |
IInterruptMediaSourceDurationWorkerMessage |
ILockedStreamWorkerMessage |
ILogMessageWorkerMessage |
Expand Down

0 comments on commit 77a389a

Please sign in to comment.