Skip to content

Commit

Permalink
refactor: Load native module dynamically. So the in-compatible electr…
Browse files Browse the repository at this point in the history
…on won't break directly
  • Loading branch information
ci010 committed Apr 20, 2024
1 parent a21a8f8 commit 64a9d15
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 44 deletions.
17 changes: 17 additions & 0 deletions xmcl-runtime/peer/NodeDataChannel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { NativeModuleLoader } from '~/util/NativeModule'
import { dependencies } from '../package.json'

type NodeDataChannel = typeof import('node-datachannel')

const version = dependencies['node-datachannel']
const os = process.platform
const arch = process.arch
const url = `https://github.com/murat-dogan/node-datachannel/releases/download/v${version}/node-datachannel-v${version}-napi-v8-${os}-${arch}.tar.gz`

export const NodeDataChannelModule = new NativeModuleLoader('node_datachannel.node', () => [url, url], (_, binding) => {
if (!binding) {
// eslint-disable-next-line @typescript-eslint/no-var-requires
return require('node-datachannel') as NodeDataChannel
}
return binding as NodeDataChannel
})
45 changes: 25 additions & 20 deletions xmcl-runtime/peer/PeerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { AbortableTask, BaseTask } from '@xmcl/task'
import { randomUUID } from 'crypto'
import { createWriteStream } from 'fs'
import { ensureFile } from 'fs-extra'
import { IceServer, initLogger } from 'node-datachannel'
import type { IceServer } from 'node-datachannel'
import { join } from 'path'
import { Readable } from 'stream'
import { pipeline } from 'stream/promises'
Expand All @@ -24,6 +24,7 @@ import { PeerSession } from './connection'
import { mapLocalPort, parseCandidate } from './mapAndGetPortCanidate'
import { MessageShareManifest } from './messages/download'
import { MessageLan } from './messages/lan'
import { NodeDataChannelModule } from './NodeDataChannel'

const pBrotliDecompress = promisify(brotliDecompress)
const pBrotliCompress = promisify(brotliCompress)
Expand Down Expand Up @@ -79,27 +80,31 @@ export class PeerService extends StatefulService<PeerState> implements IPeerServ
this.discoverV6.destroy()
})

NodeDataChannelModule.init(this.getAppDataPath())
const logger = this.app.getLogger('wrtc', 'wrtc')
if (IS_DEV) {
const logger = this.app.getLogger('wrtc', 'wrtc')
initLogger('Verbose', (level, message) => {
if (level === 'Info' || level === 'Debug' || level === 'Verbose') {
logger.log(message)
} else if (level === 'Fatal' || level === 'Error') {
logger.warn(message)
} else if (level === 'Warning') {
logger.warn(message)
}
NodeDataChannelModule.getInstance().then(m => {
m.initLogger('Verbose', (level, message) => {
if (level === 'Info' || level === 'Debug' || level === 'Verbose') {
logger.log(message)
} else if (level === 'Fatal' || level === 'Error') {
logger.warn(message)
} else if (level === 'Warning') {
logger.warn(message)
}
})
})
} else {
const logger = this.app.getLogger('wrtc', 'wrtc')
initLogger('Info', (level, message) => {
if (level === 'Info' || level === 'Debug' || level === 'Verbose') {
logger.log(message)
} else if (level === 'Fatal' || level === 'Error') {
logger.warn(message)
} else if (level === 'Warning') {
logger.warn(message)
}
NodeDataChannelModule.getInstance().then(m => {
m.initLogger('Info', (level, message) => {
if (level === 'Info' || level === 'Debug' || level === 'Verbose') {
logger.log(message)
} else if (level === 'Fatal' || level === 'Error') {
logger.warn(message)
} else if (level === 'Warning') {
logger.warn(message)
}
})
})
}

Expand Down Expand Up @@ -262,7 +267,7 @@ export class PeerService extends StatefulService<PeerState> implements IPeerServ
const natService = this.natService
const privatePort = this.portCandidate

const conn = new PeerSession(sessionId, this.settings.allowTurn ? this.iceServers : [], {
const conn = await PeerSession.createPeerSession(sessionId, this.settings.allowTurn ? this.iceServers : [], {
onHeartbeat: (session, ping) => {
this.state.connectionPing({ id: session, ping })
},
Expand Down
32 changes: 20 additions & 12 deletions xmcl-runtime/peer/connection.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { createReadStream, existsSync } from 'fs'
import debounce from 'lodash.debounce'
import { createConnection } from 'net'
import { DataChannel, DescriptionType, IceServer, PeerConnection } from 'node-datachannel'
import type { DataChannel, DescriptionType, IceServer, PeerConnection } from 'node-datachannel'
import { join } from 'path'
import { Readable } from 'stream'
import { Logger } from '~/logger'
Expand All @@ -13,6 +13,7 @@ import { MessageIdentity, MessageIdentityEntry } from './messages/identity'
import { MessageLanEntry } from './messages/lan'
import { MessageEntry, MessageHandler, MessageType } from './messages/message'
import { iceServers as _iceServers } from './stun'
import { NodeDataChannelModule } from './NodeDataChannel'

const getRegistry = (entries: MessageEntry<any>[]) => {
const reg: Record<string, MessageHandler<any>> = {}
Expand All @@ -32,7 +33,6 @@ const handlers = getRegistry([
])

export class PeerSession {
readonly connection: PeerConnection
/**
* The basic communicate channel
*/
Expand All @@ -47,24 +47,32 @@ export class PeerSession {

public lastGameChannelId = undefined as undefined | number

constructor(
/**
* The session id
*/
readonly id: string,
readonly iceServers: (string | IceServer)[],
readonly host: PeerHost,
readonly logger: Logger,
static async createPeerSession(
id: string,
iceServers: (string | IceServer)[],
host: PeerHost,
logger: Logger,
portBegin?: number,
) {
this.connection = new PeerConnection(this.id, {
const { PeerConnection } = await NodeDataChannelModule.getInstance()
return new PeerSession(new PeerConnection(id, {
iceServers: [..._iceServers, ...iceServers],
iceTransportPolicy: 'all',
portRangeBegin: portBegin,
portRangeEnd: portBegin,
enableIceUdpMux: true,
})
}), id, host, logger)
}

constructor(
readonly connection: PeerConnection,
/**
* The session id
*/
readonly id: string,
readonly host: PeerHost,
readonly logger: Logger,
) {
const updateDescriptor = debounce(() => {
const description = this.description!
host.onDescriptorUpdate(this.id, description.sdp, description.type, this.candidates)
Expand Down
2 changes: 1 addition & 1 deletion xmcl-runtime/peer/messages/lan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { LanServerInfo } from '@xmcl/client'
import { createServer } from 'net'
import { defineMessage, MessageType } from './message'
import { ServerProxy } from '../ServerProxy'
import { DataChannelInitConfig } from 'node-datachannel'
import type { DataChannelInitConfig } from 'node-datachannel'
import { listen } from '~/util/server'

export const MessageLan: MessageType<LanServerInfo> = 'lan'
Expand Down
3 changes: 2 additions & 1 deletion xmcl-runtime/resource/core/ResourceContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import type { Database as SQLite } from 'better-sqlite3'

export interface ResourceContext {
readonly db: Kysely<Database>
readonly sqlite: SQLite

getSqlite(): Promise<SQLite>

readonly image: ImageStorage

Expand Down
14 changes: 6 additions & 8 deletions xmcl-runtime/resource/core/createResourceContext.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import SQLite from 'better-sqlite3'
import EventEmitter from 'events'
import { Kysely, KyselyPlugin, OperationNodeTransformer, ParseJSONResultsPlugin, PluginTransformQueryArgs, PluginTransformResultArgs, PrimitiveValueListNode, QueryResult, RootOperationNode, SqliteDialect, UnknownRow, ValueNode } from 'kysely'
import { ResourceContext } from './ResourceContext'
import { Database } from './schema'
import { ImageStorage } from '~/imageStore'
import { Logger } from '~/logger'
import { SQLiteModule } from '../sqlite'
import { ResourceContext } from './ResourceContext'
import { Database } from './schema'

class JSONPlugin implements KyselyPlugin {
#tranformer = new JSONTransformer()
Expand Down Expand Up @@ -39,11 +39,9 @@ class JSONTransformer extends OperationNodeTransformer {
}
}

export function createResourceContext(root: string, imageStore: ImageStorage, eventBus: EventEmitter, logger: Logger, delegates: Pick<ResourceContext, 'hash' | 'parse' | 'hashAndFileType'>) {
const sqlite = new SQLite(root, {
})
export function createResourceContext(imageStore: ImageStorage, eventBus: EventEmitter, logger: Logger, delegates: Pick<ResourceContext, 'hash' | 'parse' | 'hashAndFileType'>) {
const dialect = new SqliteDialect({
database: sqlite,
database: () => SQLiteModule.getInstance(),
})

// Database interface is passed to Kysely's constructor, and from now on, Kysely
Expand All @@ -62,7 +60,7 @@ export function createResourceContext(root: string, imageStore: ImageStorage, ev

const context: ResourceContext = {
db,
sqlite,
getSqlite: SQLiteModule.getInstance,
image: imageStore,
hash: delegates.hash,
hashAndFileType: delegates.hashAndFileType,
Expand Down
8 changes: 6 additions & 2 deletions xmcl-runtime/resource/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { resolveDomain } from './core/resolveDomain'
import { tryPersistResource } from './core/tryPersistResource'
import { upsertMetadata } from './core/upsertMetadata'
import { watchResources } from './core/watchResources'
import { SQLiteModule } from './sqlite'
import { ResourceWorker, kResourceWorker } from './worker'

const EMPTY_RESOURCE_SHA1 = 'da39a3ee5e6b4b0d3255bfef95601890afd80709'
Expand Down Expand Up @@ -106,7 +107,9 @@ export class ResourceService extends AbstractService implements IResourceService
this.error(new Error('Fail to initialize resource-images folder', { cause: e }))
})
})
this.context = createResourceContext(this.getAppDataPath('resources.sqlite'), imageStore, this, this, worker)

SQLiteModule.init(this.getAppDataPath())
this.context = createResourceContext(imageStore, this, this, worker)

app.registryDisposer(async () => {
for (const watcher of Object.values(this.watchers)) {
Expand All @@ -117,7 +120,8 @@ export class ResourceService extends AbstractService implements IResourceService
}

async isResourceDatabaseOpened() {
return this.context.sqlite.open
const db = await this.context.getSqlite()
return db.open
}

async getResourceMetadataByUri(uri: string): Promise<ResourceMetadata[]> {
Expand Down
14 changes: 14 additions & 0 deletions xmcl-runtime/resource/sqlite.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import SQLite from 'better-sqlite3'
import { join } from 'path'
import { NativeModuleLoader } from '~/util/NativeModule'
import { dependencies } from '../package.json'

const version = dependencies['better-sqlite3']
const os = process.platform
const arch = process.arch
const modules = process.versions.modules
const url = `https://github.com/WiseLibs/better-sqlite3/releases/download/v${version}/better-sqlite3-v${version}-electron-v${modules}-${os}-${arch}.tar.gz`

export const SQLiteModule = new NativeModuleLoader('better_sqlite3.node', () => [url, url], (root, nativeBinding) => new SQLite(join(root, 'resources.sqlite'), {
nativeBinding,
}))
103 changes: 103 additions & 0 deletions xmcl-runtime/util/NativeModule.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { createPromiseSignal } from '@xmcl/runtime-api'
import { writeFile } from 'fs-extra'
import { unlink } from 'fs/promises'
import { join } from 'path'
import { PassThrough } from 'stream'
import { extract } from 'tar-stream'
import { stream } from 'undici'
import { createGunzip } from 'zlib'
import { AnyError } from '~/util/error'

export class NativeModuleLoader<T> {
#retryCount = 0
#signal = createPromiseSignal<T>()
#initPromise: Promise<void> | undefined

constructor(
readonly nodeFileName: string,
readonly getUrl: () => [string, string],
readonly loader: (root: string, mod: any) => T,
) { }

#tryResolve = async (root: string): Promise<void> => {
try {
const nativeModule = getDependencyIfExists(root, this.nodeFileName)
const result = this.loader(root, nativeModule)
this.#signal.resolve(result)
} catch {
if (this.#retryCount > 3) {
this.#signal.reject(new AnyError('NativeInitError', 'Failed to load ' + this.nodeFileName))
return
}
await downloadNative(root, ...this.getUrl(), this.nodeFileName)
this.#retryCount++
return this.#tryResolve(root)
}
}

init(root: string) {
if (this.#initPromise) return this.#initPromise
this.#initPromise = this.#tryResolve(root)
}

get retryCount() {
return this.#retryCount
}

getInstance = () => {
return this.#signal.promise
}
}

function getDependencyIfExists(dir: string, fileName: string) {
const dest = join(dir, fileName)
try {
// eslint-disable-next-line @typescript-eslint/no-var-requires
const mod = require(dest)
return mod
} catch {
unlink(dest).catch(() => { })
return undefined
}
}

async function downloadNative(dir: string, primary: string, fallback: string, fileName: string) {
// Download the tarball and extract it to the specified directory
const dest = join(dir, fileName)

const unzip = createGunzip()
const extractStream = extract()
unzip.pipe(extractStream)

const download = (u: string) => stream(u, { opaque: { unzip }, method: 'GET' }, ({ opaque, headers, statusCode }) => {
if (statusCode === 200) return (opaque as any).unzip
Object.assign(opaque as any, {
failed: true,
headers,
statusCode,
})
return new PassThrough()
})

const { opaque } = await download(primary)

if ((opaque as any).failed) {
const { opaque } = await download(fallback)

if ((opaque as any).failed) {
throw new AnyError('NativeDownloadError', 'Failed to download ' + fileName)
}
}

for await (const e of extractStream) {
if (e.header.name.endsWith(fileName)) {
const bufs = [] as Buffer[]
for await (const d of e) {
bufs.push(d)
}
await writeFile(dest, Buffer.concat(bufs))
}
}

return dest
}

0 comments on commit 64a9d15

Please sign in to comment.