From 236bbc6e5d02e42016290812318c1d62179cb6e1 Mon Sep 17 00:00:00 2001 From: Thomas Dupont Date: Fri, 13 May 2022 11:27:31 +0200 Subject: [PATCH] feat: initial proposal for multithreaded execution --- README.md | 11 +- RELEASE_NOTES.md | 5 + config/app/init/base/init.json | 45 +++++- config/app/init/default.json | 10 +- .../app/init/initialize-prefilled-root.json | 10 +- config/app/init/initialize-root.json | 10 +- config/app/init/initializers/workers.json | 11 ++ config/app/main/default.json | 7 +- config/app/variables/cli/cli.json | 10 ++ config/app/variables/resolver/resolver.json | 8 ++ config/identity/access/restricted.json | 2 +- config/identity/pod/dynamic.json | 4 +- config/util/variables/default.json | 5 + package-lock.json | 17 ++- package.json | 3 +- src/index.ts | 8 +- src/init/App.ts | 5 +- src/init/AppRunner.ts | 22 ++- src/init/ModuleVersionVerifier.ts | 7 +- src/init/cluster/ClusterManager.ts | 120 ++++++++++++++++ src/init/cluster/SingleThreaded.ts | 57 ++++++++ src/init/cluster/WorkerManager.ts | 20 +++ src/logging/Logger.ts | 34 +++-- src/logging/WinstonLoggerFactory.ts | 15 +- src/server/BaseHttpServerFactory.ts | 2 +- src/storage/accessors/InMemoryDataAccessor.ts | 3 +- src/util/PathUtil.ts | 8 ++ src/util/handlers/ProcessHandler.ts | 46 ++++++ src/util/locking/MemoryResourceLocker.ts | 8 +- test/integration/Config.ts | 1 + test/unit/init/App.test.ts | 10 +- test/unit/init/AppRunner.test.ts | 135 +++++++++++++++++- test/unit/init/cluster/ClusterManager.test.ts | 106 ++++++++++++++ test/unit/init/cluster/SingleThreaded.test.ts | 61 ++++++++ test/unit/init/cluster/WorkerManager.test.ts | 15 ++ test/unit/logging/LazyLoggerFactory.test.ts | 10 +- test/unit/logging/Logger.test.ts | 35 +++-- .../unit/logging/WinstonLoggerFactory.test.ts | 1 + .../unit/util/handlers/ProcessHandler.test.ts | 72 ++++++++++ .../util/locking/MemoryResourceLocker.test.ts | 18 --- 40 files changed, 880 insertions(+), 97 deletions(-) create mode 100644 config/app/init/initializers/workers.json create mode 100644 src/init/cluster/ClusterManager.ts create mode 100644 src/init/cluster/SingleThreaded.ts create mode 100644 src/init/cluster/WorkerManager.ts create mode 100644 src/util/handlers/ProcessHandler.ts create mode 100644 test/unit/init/cluster/ClusterManager.test.ts create mode 100644 test/unit/init/cluster/SingleThreaded.test.ts create mode 100644 test/unit/init/cluster/WorkerManager.test.ts create mode 100644 test/unit/util/handlers/ProcessHandler.test.ts diff --git a/README.md b/README.md index 84ef5761f7..cb067e4d63 100644 --- a/README.md +++ b/README.md @@ -116,8 +116,17 @@ to some commonly used settings: | `--sparqlEndpoint, -s` | | URL of the SPARQL endpoint, when using a quadstore-based configuration. | | `--showStackTrace, -t` | false | Enables detailed logging on error output. | | `--podConfigJson` | `./pod-config.json` | Path to the file that keeps track of dynamic Pod configurations. Only relevant when using `@css:config/dynamic.json`. | -| `--seededPodConfigJson` | | Path to the file that keeps track of seeded Pod configurations. | +| `--seededPodConfigJson`| | Path to the file that keeps track of seeded Pod configurations. | | `--mainModulePath, -m` | | Path from where Components.js will start its lookup when initializing configurations. | +| `--workers, -w` | `1` | Run in multithreaded mode using workers. Special values are `-1` (scale to `num_cores-1`), `0` (scale to `num_cores`) and 1 (singlethreaded). | + +### ๐Ÿ”€ Multithreading +The Community Solid Server can be started in multithreaded mode with any config. The config must only contain components that are threadsafe though. If a non-threadsafe component is used in multithreaded mode, the server will describe with an error which class is the culprit. + +```node +# Running multithreaded with autoscaling to number of logical cores minus 1 +npm start -- -c config/file.json -w -1 +``` ### ๐Ÿงถ Custom configurations More substantial changes to server behavior can be achieved diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 0ca8bfd8f7..758a84b662 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -6,6 +6,7 @@ - Components.js was upgraded to v5. If you have created an external component you should also upgrade to prevent warnings and conflicts. - A new FileSystemResourceLocker has been added. It allows for true threadsafe locking without external dependencies. +- The CSS can now run multithreaded with multiple workers, this is done with the `--workers` or `-w` flag. ### Data migration The following actions are required if you are upgrading from a v4 server and want to retain your data. @@ -30,6 +31,8 @@ The following changes are relevant for v3 custom configs that replaced certain f - `/ldp/metadata-parser/default.json` - `/storage/backend/*-quota-file.json` - `/storage/backend/quota/quota-file.json` +- The structure of the init configs has changed significantly to support worker threads. + - `/app/init/*` ### Interface changes These changes are relevant if you wrote custom modules for the server that depend on existing interfaces. @@ -37,6 +40,8 @@ These changes are relevant if you wrote custom modules for the server that depen - `RedirectAllHttpHandler` was removed and fully replaced by `RedirectingHttpHandler`. - `SingleThreadedResourceLocker` has been renamed to `MemoryResourceLocker`. +A new interface `SingleThreaded` has been added. This empty interface can be implemented to mark a component as not-threadsafe. When the CSS starts in multithreaded mode, it will error and halt if any SingleThreaded components are instantiated. + ## V4.0.1 Freezes the `oidc-provider` dependency to prevent a potential issue with the solid authn client as described in https://github.com/inrupt/solid-client-authn-js/issues/2103. diff --git a/config/app/init/base/init.json b/config/app/init/base/init.json index 4d40abe179..a0d04f30d3 100644 --- a/config/app/init/base/init.json +++ b/config/app/init/base/init.json @@ -5,7 +5,8 @@ "css:config/app/init/initializers/logger.json", "css:config/app/init/initializers/server.json", "css:config/app/init/initializers/seeded-pod.json", - "css:config/app/init/initializers/version.json" + "css:config/app/init/initializers/version.json", + "css:config/app/init/initializers/workers.json" ], "@graph": [ { @@ -14,12 +15,44 @@ "@type": "SequenceHandler", "handlers": [ { "@id": "urn:solid-server:default:LoggerInitializer" }, - { "@id": "urn:solid-server:default:BaseUrlVerifier" }, - { "@id": "urn:solid-server:default:ParallelInitializer" }, - { "@id": "urn:solid-server:default:SeededPodInitializer" }, - { "@id": "urn:solid-server:default:ServerInitializer" }, - { "@id": "urn:solid-server:default:ModuleVersionVerifier" } + { "@id": "urn:solid-server:default:PrimaryInitializer" }, + { "@id": "urn:solid-server:default:WorkerInitializer" } ] + }, + { + "comment": "This wrapped sequence handler will be executed ONLY BY THE PRIMARY PROCESS when starting the server.", + "@id": "urn:solid-server:default:PrimaryInitializer", + "@type": "ProcessHandler", + "executeOnPrimary": true, + "clusterManager": { "@id": "urn:solid-server:default:ClusterManager" }, + "source": { + "comment": "These initializers will all be executed sequentially when starting the server.", + "@id": "urn:solid-server:default:PrimarySequenceInitializer", + "@type":"SequenceHandler", + "handlers": [ + { "@id": "urn:solid-server:default:BaseUrlVerifier" }, + { "@id": "urn:solid-server:default:PrimaryParallelInitializer" }, + { "@id": "urn:solid-server:default:SeededPodInitializer" }, + { "@id": "urn:solid-server:default:ModuleVersionVerifier" }, + { "@id": "urn:solid-server:default:WorkerManager" } + ] + } + }, + { + "comment": "This wrapped sequence handler will be executed ONLY BY THE WORKER PROCESSES when starting the server.", + "@id": "urn:solid-server:default:WorkerInitializer", + "@type": "ProcessHandler", + "executeOnPrimary": false, + "clusterManager": { "@id": "urn:solid-server:default:ClusterManager" }, + "source": { + "comment": "These initializers will all be executed sequentially when starting the server.", + "@id": "urn:solid-server:default:WorkerSequenceInitializer", + "@type": "SequenceHandler", + "handlers": [ + { "@id": "urn:solid-server:default:WorkerParallelInitializer" }, + { "@id": "urn:solid-server:default:ServerInitializer" } + ] + } } ] } diff --git a/config/app/init/default.json b/config/app/init/default.json index bd8734a54e..810d1ff345 100644 --- a/config/app/init/default.json +++ b/config/app/init/default.json @@ -5,8 +5,14 @@ ], "@graph": [ { - "comment": "These handlers are called whenever the server is started, and can be used to ensure that all necessary resources for booting are available.", - "@id": "urn:solid-server:default:ParallelInitializer", + "comment": "These handlers are called only for the Primary process whenever the server is started, and can be used to ensure that all necessary resources for booting are available. (in singlethreaded mode, these are always called)", + "@id": "urn:solid-server:default:PrimaryParallelInitializer", + "@type": "ParallelHandler", + "handlers": [ ] + }, + { + "comment": "These handlers are called only for the workers processes whenever the server is started, and can be used to ensure that all necessary resources for booting are available. (in singlethreaded mode, these are always called)", + "@id": "urn:solid-server:default:WorkerParallelInitializer", "@type": "ParallelHandler", "handlers": [ ] } diff --git a/config/app/init/initialize-prefilled-root.json b/config/app/init/initialize-prefilled-root.json index 34260e860e..149c49e2ae 100644 --- a/config/app/init/initialize-prefilled-root.json +++ b/config/app/init/initialize-prefilled-root.json @@ -6,12 +6,18 @@ ], "@graph": [ { - "comment": "These handlers are called whenever the server is started, and can be used to ensure that all necessary resources for booting are available.", - "@id": "urn:solid-server:default:ParallelInitializer", + "comment": "These handlers are called only for the Primary process whenever the server is started, and can be used to ensure that all necessary resources for booting are available. (in singlethreaded mode, these are always called)", + "@id": "urn:solid-server:default:PrimaryParallelInitializer", "@type": "ParallelHandler", "handlers": [ { "@id": "urn:solid-server:default:RootInitializer" } ] + }, + { + "comment": "These handlers are called only for the workers processes whenever the server is started, and can be used to ensure that all necessary resources for booting are available. (in singlethreaded mode, these are always called)", + "@id": "urn:solid-server:default:WorkerParallelInitializer", + "@type": "ParallelHandler", + "handlers": [ ] } ] } diff --git a/config/app/init/initialize-root.json b/config/app/init/initialize-root.json index ae3d60d9ee..f3305188b9 100644 --- a/config/app/init/initialize-root.json +++ b/config/app/init/initialize-root.json @@ -6,12 +6,18 @@ ], "@graph": [ { - "comment": "These handlers are called whenever the server is started, and can be used to ensure that all necessary resources for booting are available.", - "@id": "urn:solid-server:default:ParallelInitializer", + "comment": "These handlers are called only for the Primary process whenever the server is started, and can be used to ensure that all necessary resources for booting are available. (in singlethreaded mode, these are always called)", + "@id": "urn:solid-server:default:PrimaryParallelInitializer", "@type": "ParallelHandler", "handlers": [ { "@id": "urn:solid-server:default:RootInitializer" } ] + }, + { + "comment": "These handlers are called only for the workers processes whenever the server is started, and can be used to ensure that all necessary resources for booting are available. (in singlethreaded mode, these are always called)", + "@id": "urn:solid-server:default:WorkerParallelInitializer", + "@type": "ParallelHandler", + "handlers": [ ] } ] } diff --git a/config/app/init/initializers/workers.json b/config/app/init/initializers/workers.json new file mode 100644 index 0000000000..bff30944b9 --- /dev/null +++ b/config/app/init/initializers/workers.json @@ -0,0 +1,11 @@ +{ + "@context": "https://linkedsoftwaredependencies.org/bundles/npm/@solid/community-server/^5.0.0/components/context.jsonld", + "@graph": [ + { + "comment": "Spawns the required amount of workers", + "@id": "urn:solid-server:default:WorkerManager", + "@type": "WorkerManager", + "clusterManager": { "@id": "urn:solid-server:default:ClusterManager" } + } + ] +} diff --git a/config/app/main/default.json b/config/app/main/default.json index 394987674f..ac846c05bd 100644 --- a/config/app/main/default.json +++ b/config/app/main/default.json @@ -13,7 +13,12 @@ "finalizers": [ { "@id": "urn:solid-server:default:ServerInitializer" } ] - } + }, + "clusterManager": { + "@id": "urn:solid-server:default:ClusterManager", + "@type": "ClusterManager", + "workers": { "@id": "urn:solid-server:default:variable:workers" } + } } ] } diff --git a/config/app/variables/cli/cli.json b/config/app/variables/cli/cli.json index 0f67927160..af172b6029 100644 --- a/config/app/variables/cli/cli.json +++ b/config/app/variables/cli/cli.json @@ -102,6 +102,16 @@ "type": "string", "describe": "Path to the file that will be used to seed pods." } + }, + { + "@type": "YargsParameter", + "name": "workers", + "options": { + "alias": "w", + "requiresArg": true, + "type": "number", + "describe": "Run the server in multithreaded mode using workers. (special values: -1: num_cores-1, 0: num_cores). Defaults to 1 (singlethreaded)" + } } ], "options": { diff --git a/config/app/variables/resolver/resolver.json b/config/app/variables/resolver/resolver.json index 3ab0b41c76..ad98b68505 100644 --- a/config/app/variables/resolver/resolver.json +++ b/config/app/variables/resolver/resolver.json @@ -65,6 +65,14 @@ "@type": "AssetPathExtractor", "key": "seededPodConfigJson" } + }, + { + "CombinedSettingsResolver:_resolvers_key": "urn:solid-server:default:variable:workers", + "CombinedSettingsResolver:_resolvers_value": { + "@type": "KeyExtractor", + "key": "workers", + "defaultValue": 1 + } } ] } diff --git a/config/identity/access/restricted.json b/config/identity/access/restricted.json index 53a693ae6a..30070c8909 100644 --- a/config/identity/access/restricted.json +++ b/config/identity/access/restricted.json @@ -13,7 +13,7 @@ }, { "comment": "IDP-related containers require initialized resources to support authorization.", - "@id": "urn:solid-server:default:ParallelInitializer", + "@id": "urn:solid-server:default:PrimaryParallelInitializer", "@type": "ParallelHandler", "handlers": [ { "@id": "urn:solid-server:default:IdpContainerInitializer" }, diff --git a/config/identity/pod/dynamic.json b/config/identity/pod/dynamic.json index fc564c0799..70347c229b 100644 --- a/config/identity/pod/dynamic.json +++ b/config/identity/pod/dynamic.json @@ -23,8 +23,8 @@ }, { - "comment": "Add to the list of initializers.", - "@id": "urn:solid-server:default:ParallelInitializer", + "comment": "Add to the list of primary initializers.", + "@id": "urn:solid-server:default:PrimaryParallelInitializer", "@type": "ParallelHandler", "handlers": [ { diff --git a/config/util/variables/default.json b/config/util/variables/default.json index ce07b22ada..ec37676402 100644 --- a/config/util/variables/default.json +++ b/config/util/variables/default.json @@ -41,6 +41,11 @@ "comment": "Path to the JSON file used to seed pods.", "@id": "urn:solid-server:default:variable:seededPodConfigJson", "@type": "Variable" + }, + { + "comment": "Run the server in multithreaded mode with the set amount of workers.", + "@id": "urn:solid-server:default:variable:workers", + "@type": "Variable" } ] } diff --git a/package-lock.json b/package-lock.json index 4651bae095..e660920ed2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -35,7 +35,7 @@ "arrayify-stream": "^2.0.0", "async-lock": "^1.3.0", "bcrypt": "^5.0.1", - "componentsjs": "^5.0.1", + "componentsjs": "^5.1.0", "cors": "^2.8.5", "cross-fetch": "^3.1.5", "ejs": "^3.1.6", @@ -46,6 +46,7 @@ "handlebars": "^4.7.7", "ioredis": "^5.0.4", "jose": "^4.4.0", + "jsonld-context-parser": "^2.1.5", "lodash.orderby": "^4.6.0", "marked": "^4.0.12", "mime-types": "^2.1.34", @@ -5878,9 +5879,9 @@ "dev": true }, "node_modules/componentsjs": { - "version": "5.0.1", - "resolved": "https://registry.npmjs.org/componentsjs/-/componentsjs-5.0.1.tgz", - "integrity": "sha512-FMpAYBTJk+/Lsq0mgL6ugyabFjFy4H9d37GkgeGFxFt45GPxe2SKNoujoUDRNCQolajFSngisl105Ju8qzp+LQ==", + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/componentsjs/-/componentsjs-5.1.0.tgz", + "integrity": "sha512-Ev2xgnBub9NEIa6Yt9fz48TvUBceO2IE9CqxE9jtujAHplyk+0PxIBaGeTaD1tVK+DrtK5Gs5ChcTKfdDY2hMA==", "dependencies": { "@rdfjs/types": "*", "@types/minimist": "^1.2.0", @@ -5892,6 +5893,7 @@ "rdf-object": "^1.13.1", "rdf-parse": "^2.0.0", "rdf-quad": "^1.5.0", + "rdf-string": "^1.6.0", "rdf-terms": "^1.7.0", "semver": "^7.3.2", "winston": "^3.3.3" @@ -19740,9 +19742,9 @@ "dev": true }, "componentsjs": { - "version": "5.0.1", - "resolved": "https://registry.npmjs.org/componentsjs/-/componentsjs-5.0.1.tgz", - "integrity": "sha512-FMpAYBTJk+/Lsq0mgL6ugyabFjFy4H9d37GkgeGFxFt45GPxe2SKNoujoUDRNCQolajFSngisl105Ju8qzp+LQ==", + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/componentsjs/-/componentsjs-5.1.0.tgz", + "integrity": "sha512-Ev2xgnBub9NEIa6Yt9fz48TvUBceO2IE9CqxE9jtujAHplyk+0PxIBaGeTaD1tVK+DrtK5Gs5ChcTKfdDY2hMA==", "requires": { "@rdfjs/types": "*", "@types/minimist": "^1.2.0", @@ -19754,6 +19756,7 @@ "rdf-object": "^1.13.1", "rdf-parse": "^2.0.0", "rdf-quad": "^1.5.0", + "rdf-string": "^1.6.0", "rdf-terms": "^1.7.0", "semver": "^7.3.2", "winston": "^3.3.3" diff --git a/package.json b/package.json index 0917971961..733c8f6f2f 100644 --- a/package.json +++ b/package.json @@ -102,7 +102,7 @@ "arrayify-stream": "^2.0.0", "async-lock": "^1.3.0", "bcrypt": "^5.0.1", - "componentsjs": "^5.0.1", + "componentsjs": "^5.1.0", "cors": "^2.8.5", "cross-fetch": "^3.1.5", "ejs": "^3.1.6", @@ -113,6 +113,7 @@ "handlebars": "^4.7.7", "ioredis": "^5.0.4", "jose": "^4.4.0", + "jsonld-context-parser": "^2.1.5", "lodash.orderby": "^4.6.0", "marked": "^4.0.12", "mime-types": "^2.1.34", diff --git a/src/index.ts b/src/index.ts index 0f3fb398ed..55f26fa3ce 100644 --- a/src/index.ts +++ b/src/index.ts @@ -187,6 +187,11 @@ export * from './identity/storage/WebIdAdapterFactory'; export * from './identity/IdentityProviderHttpHandler'; export * from './identity/OidcHttpHandler'; +// Init/Cluster +export * from './init/cluster/ClusterManager'; +export * from './init/cluster/SingleThreaded'; +export * from './init/cluster/WorkerManager'; + // Init/Final export * from './init/final/Finalizable'; export * from './init/final/ParallelFinalizer'; @@ -218,9 +223,9 @@ export * from './init/ConfigPodInitializer'; export * from './init/ContainerInitializer'; export * from './init/Initializer'; export * from './init/LoggerInitializer'; +export * from './init/ModuleVersionVerifier'; export * from './init/SeededPodInitializer'; export * from './init/ServerInitializer'; -export * from './init/ModuleVersionVerifier'; // Logging export * from './logging/LazyLoggerFactory'; @@ -401,6 +406,7 @@ export * from './util/handlers/ConditionalHandler'; export * from './util/handlers/HandlerUtil'; export * from './util/handlers/MethodFilterHandler'; export * from './util/handlers/ParallelHandler'; +export * from './util/handlers/ProcessHandler'; export * from './util/handlers/SequenceHandler'; export * from './util/handlers/StaticHandler'; export * from './util/handlers/StaticThrowHandler'; diff --git a/src/init/App.ts b/src/init/App.ts index 2d112ac106..3929a00fbe 100644 --- a/src/init/App.ts +++ b/src/init/App.ts @@ -1,3 +1,4 @@ +import type { ClusterManager } from './cluster/ClusterManager'; import type { Finalizable } from './final/Finalizable'; import type { Initializer } from './Initializer'; @@ -7,10 +8,12 @@ import type { Initializer } from './Initializer'; export class App { private readonly initializer: Initializer; private readonly finalizer: Finalizable; + public readonly clusterManager: ClusterManager; - public constructor(initializer: Initializer, finalizer: Finalizable) { + public constructor(initializer: Initializer, finalizer: Finalizable, clusterManager: ClusterManager) { this.initializer = initializer; this.finalizer = finalizer; + this.clusterManager = clusterManager; } /** diff --git a/src/init/AppRunner.ts b/src/init/AppRunner.ts index e576fc4bef..6247e9cc2d 100644 --- a/src/init/AppRunner.ts +++ b/src/init/AppRunner.ts @@ -6,9 +6,11 @@ import yargs from 'yargs'; import { LOG_LEVELS } from '../logging/LogLevel'; import { getLoggerFor } from '../logging/LogUtil'; import { createErrorMessage, isError } from '../util/errors/ErrorUtil'; +import { InternalServerError } from '../util/errors/InternalServerError'; import { resolveModulePath, resolveAssetPath } from '../util/PathUtil'; import type { App } from './App'; import type { CliResolver } from './CliResolver'; +import { listSingleThreadedComponents } from './cluster/SingleThreaded'; import type { CliArgv, VariableBindings } from './variables/Types'; const DEFAULT_CONFIG = resolveModulePath('config/default.json'); @@ -65,7 +67,7 @@ export class AppRunner { const componentsManager = await this.createComponentsManager(loaderProperties, configFile); // Create the application using the translated variable values - return componentsManager.instantiate(DEFAULT_APP, { variables: variableBindings }); + return await this.createApp(componentsManager, variableBindings); } /** @@ -177,12 +179,26 @@ export class AppRunner { * where the App is created and started using the variable mappings. */ private async createApp(componentsManager: ComponentsManager, variables: Record): Promise { + let app: App; + // Create the app try { - // Create the app - return await componentsManager.instantiate(DEFAULT_APP, { variables }); + app = await componentsManager.instantiate(DEFAULT_APP, { variables }); } catch (error: unknown) { this.resolveError(`Could not create the server`, error); } + + // Ensure thread safety + if (!app.clusterManager.isSingleThreaded()) { + const violatingClasses = await listSingleThreadedComponents(componentsManager); + if (violatingClasses.length > 0) { + const verb = violatingClasses.length > 1 ? 'are' : 'is'; + const detailedError = new InternalServerError( + `[${violatingClasses.join(', ')}] ${verb} not threadsafe and should not be run in multithreaded setups!`, + ); + this.resolveError('Cannot run a singlethreaded-only component in a multithreaded setup!', detailedError); + } + } + return app; } /** diff --git a/src/init/ModuleVersionVerifier.ts b/src/init/ModuleVersionVerifier.ts index dd17922dee..1f7a9ee245 100644 --- a/src/init/ModuleVersionVerifier.ts +++ b/src/init/ModuleVersionVerifier.ts @@ -1,10 +1,7 @@ -import { readJson } from 'fs-extra'; import type { KeyValueStorage } from '../storage/keyvalue/KeyValueStorage'; -import { resolveModulePath } from '../util/PathUtil'; +import { readPackageJson } from '../util/PathUtil'; import { Initializer } from './Initializer'; -const PACKAGE_JSON_PATH = resolveModulePath('package.json'); - /** * This initializer simply writes the version number of the server to the storage. * This will be relevant in the future when we look into migration initializers. @@ -22,7 +19,7 @@ export class ModuleVersionVerifier extends Initializer { } public async handle(): Promise { - const pkg = await readJson(PACKAGE_JSON_PATH); + const pkg = await readPackageJson(); await this.storage.set(this.storageKey, pkg.version); } } diff --git a/src/init/cluster/ClusterManager.ts b/src/init/cluster/ClusterManager.ts new file mode 100644 index 0000000000..608da28869 --- /dev/null +++ b/src/init/cluster/ClusterManager.ts @@ -0,0 +1,120 @@ +import type { Worker } from 'cluster'; +import cluster from 'cluster'; +import { cpus } from 'os'; +import { getLoggerFor } from '../../logging/LogUtil'; +import { InternalServerError } from '../../util/errors/InternalServerError'; + +/** + * Different cluster modes. + */ +enum ClusterMode { + /** Scales in relation to `core_count`. */ + autoScale, + /** Single threaded mode, no clustering */ + singleThreaded, + /** Fixed amount of workers being forked. (limited to core_count) */ + fixed +} + +/** + * Convert workers amount to {@link ClusterMode} + * @param workers - Amount of workers + * @returns ClusterMode enum value + */ +function toClusterMode(workers: number): ClusterMode { + if (workers <= 0) { + return ClusterMode.autoScale; + } + if (workers === 1) { + return ClusterMode.singleThreaded; + } + return ClusterMode.fixed; +} + +/** + * This class is responsible for deciding how many affective workers are needed. + * It also contains the logic for respawning workers when they are killed by the os. + * + * The workers values are interpreted as follows: + * value | actual workers | + * ------|--------------| + * `-m` | `num_cores - m` workers _(autoscale)_ (`m < num_cores`) | + * `-1` | `num_cores - 1` workers _(autoscale)_ | + * `0` | `num_cores` workers _(autoscale)_ | + * `1` | `single threaded mode` _(default)_ | + * `n` | `n` workers | + */ +export class ClusterManager { + private readonly logger = getLoggerFor(this); + private readonly workers: number; + private readonly clusterMode: ClusterMode; + + public constructor(workers: number | string) { + const cores = cpus().length; + // Workaround for https://github.com/CommunitySolidServer/CommunitySolidServer/issues/1182 + if (typeof workers === 'string') { + workers = Number.parseInt(workers, 10); + } + + if (workers <= -cores) { + throw new InternalServerError('Invalid workers value (should be in the interval ]-num_cores, +โˆž).'); + } + + this.workers = toClusterMode(workers) === ClusterMode.autoScale ? cores + workers : workers; + this.clusterMode = toClusterMode(this.workers); + } + + /** + * Spawn all required workers. + */ + public spawnWorkers(): void { + let counter = 0; + this.logger.info(`Setting up ${this.workers} workers`); + + for (let i = 0; i < this.workers; i++) { + cluster.fork().on('message', (msg: string): void => { + this.logger.info(msg); + }); + } + + cluster.on('online', (worker: Worker): void => { + this.logger.info(`Worker ${worker.process.pid} is listening`); + counter += 1; + if (counter === this.workers) { + this.logger.info(`All ${this.workers} requested workers have been started.`); + } + }); + + cluster.on('exit', (worker: Worker, code: number, signal: string): void => { + this.logger.warn(`Worker ${worker.process.pid} died with code ${code} and signal ${signal}`); + this.logger.warn('Starting a new worker'); + cluster.fork().on('message', (msg: string): void => { + this.logger.info(msg); + }); + }); + } + + /** + * Check whether the CSS server was booted in single threaded mode. + * @returns True is single threaded. + */ + public isSingleThreaded(): boolean { + return this.clusterMode === ClusterMode.singleThreaded; + } + + /** + * Whether the calling process is the primary process. + * @returns True if primary + */ + public isPrimary(): boolean { + return cluster.isMaster; + } + + /** + * Whether the calling process is a worker process. + * @returns True if worker + */ + public isWorker(): boolean { + return cluster.isWorker; + } +} diff --git a/src/init/cluster/SingleThreaded.ts b/src/init/cluster/SingleThreaded.ts new file mode 100644 index 0000000000..90b5cb9a3f --- /dev/null +++ b/src/init/cluster/SingleThreaded.ts @@ -0,0 +1,57 @@ +import type { ComponentsManager } from 'componentsjs'; +import { PrefetchedDocumentLoader } from 'componentsjs'; +import { ContextParser } from 'jsonld-context-parser'; +import { InternalServerError } from '../../util/errors/InternalServerError'; +import { readPackageJson } from '../../util/PathUtil'; + +/** + * Indicates a class is only meant to work in singlethreaded setups and is thus not threadsafe. + */ +export interface SingleThreaded {} + +/** + * Convert an exported interface name to the properly expected Components.js type URI. + * @param componentsManager - The currently used ComponentsManager + * @param interfaceName - An interface name + * @returns A Components.js type URI + */ +export async function toComponentsJsType(componentsManager: ComponentsManager, interfaceName: string): +Promise { + const pkg = await readPackageJson(); + const contextParser = new ContextParser({ + documentLoader: new PrefetchedDocumentLoader({ contexts: componentsManager.moduleState.contexts }), + skipValidation: true, + }); + // The keys of the package.json `lsd:contexts` array contains all the IRIs of the relevant contexts; + const lsdContexts = Object.keys(pkg['lsd:contexts']); + // Feed the lsd:context IRIs to the ContextParser + const cssContext = await contextParser.parse(lsdContexts); + // We can now expand a simple interface name, to its full Components.js type identifier. + const interfaceIRI = cssContext.expandTerm(interfaceName, true); + + if (!interfaceIRI) { + throw new InternalServerError(`Could not expand ${interfaceName} to IRI!`); + } + return interfaceIRI; +} + +/** + * Will list class names of components instantiated implementing the {@link SingleThreaded} + * interface while the application is being run in multithreaded mode. + * @param componentsManager - The componentsManager being used to set up the application + */ +export async function listSingleThreadedComponents(componentsManager: ComponentsManager): Promise { + const interfaceType = await toComponentsJsType(componentsManager, 'SingleThreaded'); + const violatingClasses: string[] = []; + + // Loop through all instantiated Resources + for (const resource of componentsManager.getInstantiatedResources()) { + // If implementing interfaceType, while not being the interfaceType itself. + if (resource?.isA(interfaceType) && resource.value !== interfaceType) { + // Part after the # in an IRI is the actual class name + const name = resource.property?.type?.value?.split('#')?.[1]; + violatingClasses.push(name); + } + } + return violatingClasses; +} diff --git a/src/init/cluster/WorkerManager.ts b/src/init/cluster/WorkerManager.ts new file mode 100644 index 0000000000..4f281185e0 --- /dev/null +++ b/src/init/cluster/WorkerManager.ts @@ -0,0 +1,20 @@ +import { Initializer } from '../Initializer'; +import type { ClusterManager } from './ClusterManager'; + +/** + * Spawns the necessary workers when starting in multithreaded mode. + */ +export class WorkerManager extends Initializer { + private readonly clusterManager: ClusterManager; + + public constructor(clusterManager: ClusterManager) { + super(); + this.clusterManager = clusterManager; + } + + public async handle(): Promise { + if (!this.clusterManager.isSingleThreaded()) { + this.clusterManager.spawnWorkers(); + } + } +} diff --git a/src/logging/Logger.ts b/src/logging/Logger.ts index 60c5df058d..040a79e4b9 100644 --- a/src/logging/Logger.ts +++ b/src/logging/Logger.ts @@ -1,5 +1,14 @@ +import cluster from 'cluster'; +import process from 'process'; import type { LogLevel } from './LogLevel'; +export interface LogMetadata { + /** Is the current process the Primary process */ + isPrimary: boolean; + /** The process id of the current process */ + pid: number; +} + /** * Logs messages on a specific level. * @@ -13,7 +22,7 @@ export interface SimpleLogger { * @param message - The message to log. * @param meta - Optional metadata to include in the log message. */ - log: (level: LogLevel, message: string) => SimpleLogger; + log: (level: LogLevel, message: string, meta?: LogMetadata) => SimpleLogger; } /** @@ -79,30 +88,35 @@ export interface Logger extends SimpleLogger { * leaving only the implementation of {@link SimpleLogger}. */ export abstract class BaseLogger implements Logger { - public abstract log(level: LogLevel, message: string): Logger; + public abstract log(level: LogLevel, message: string, meta?: LogMetadata): Logger; + + private readonly getMeta = (): LogMetadata => ({ + pid: process.pid, + isPrimary: cluster.isMaster, + }); public error(message: string): Logger { - return this.log('error', message); + return this.log('error', message, this.getMeta()); } public warn(message: string): Logger { - return this.log('warn', message); + return this.log('warn', message, this.getMeta()); } public info(message: string): Logger { - return this.log('info', message); + return this.log('info', message, this.getMeta()); } public verbose(message: string): Logger { - return this.log('verbose', message); + return this.log('verbose', message, this.getMeta()); } public debug(message: string): Logger { - return this.log('debug', message); + return this.log('debug', message, this.getMeta()); } public silly(message: string): Logger { - return this.log('silly', message); + return this.log('silly', message, this.getMeta()); } } @@ -118,8 +132,8 @@ export class WrappingLogger extends BaseLogger { this.logger = logger; } - public log(level: LogLevel, message: string): this { - this.logger.log(level, message); + public log(level: LogLevel, message: string, meta?: LogMetadata): this { + this.logger.log(level, message, meta); return this; } } diff --git a/src/logging/WinstonLoggerFactory.ts b/src/logging/WinstonLoggerFactory.ts index f3c97a740b..d920d3a384 100644 --- a/src/logging/WinstonLoggerFactory.ts +++ b/src/logging/WinstonLoggerFactory.ts @@ -1,6 +1,6 @@ import { createLogger, format, transports } from 'winston'; import type * as Transport from 'winston-transport'; -import type { Logger } from './Logger'; +import type { Logger, LogMetadata } from './Logger'; import type { LoggerFactory } from './LoggerFactory'; import { WinstonLogger } from './WinstonLogger'; @@ -17,6 +17,13 @@ export class WinstonLoggerFactory implements LoggerFactory { this.level = level; } + private readonly clusterInfo = (meta: LogMetadata): string => { + if (meta.isPrimary) { + return 'Primary'; + } + return `W-${meta.pid ?? '???'}`; + }; + public createLogger(label: string): Logger { return new WinstonLogger(createLogger({ level: this.level, @@ -24,8 +31,10 @@ export class WinstonLoggerFactory implements LoggerFactory { format.label({ label }), format.colorize(), format.timestamp(), - format.printf(({ level: levelInner, message, label: labelInner, timestamp }: Record): string => - `${timestamp} [${labelInner}] ${levelInner}: ${message}`), + format.metadata({ fillExcept: [ 'level', 'timestamp', 'label', 'message' ]}), + format.printf(({ level: levelInner, message, label: labelInner, timestamp, metadata: meta }: + Record): string => + `${timestamp} [${labelInner}] {${this.clusterInfo(meta)}} ${levelInner}: ${message}`), ), transports: this.createTransports(), })); diff --git a/src/server/BaseHttpServerFactory.ts b/src/server/BaseHttpServerFactory.ts index d840f57cda..25e702db9c 100644 --- a/src/server/BaseHttpServerFactory.ts +++ b/src/server/BaseHttpServerFactory.ts @@ -55,7 +55,7 @@ export class BaseHttpServerFactory implements HttpServerFactory { public startServer(port: number): Server { const protocol = this.options.https ? 'https' : 'http'; const url = new URL(`${protocol}://localhost:${port}/`).href; - this.logger.info(`Starting server at ${url}`); + this.logger.info(`Listening to server at ${url}`); const createServer = this.options.https ? createHttpsServer : createHttpServer; const options = this.createServerOptions(); diff --git a/src/storage/accessors/InMemoryDataAccessor.ts b/src/storage/accessors/InMemoryDataAccessor.ts index c5937f2de2..7c2c13a6fe 100644 --- a/src/storage/accessors/InMemoryDataAccessor.ts +++ b/src/storage/accessors/InMemoryDataAccessor.ts @@ -2,6 +2,7 @@ import type { Readable } from 'stream'; import arrayifyStream from 'arrayify-stream'; import { RepresentationMetadata } from '../../http/representation/RepresentationMetadata'; import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier'; +import type { SingleThreaded } from '../../init/cluster/SingleThreaded'; import { InternalServerError } from '../../util/errors/InternalServerError'; import { NotFoundHttpError } from '../../util/errors/NotFoundHttpError'; import type { Guarded } from '../../util/GuardedStream'; @@ -19,7 +20,7 @@ interface ContainerEntry { } type CacheEntry = DataEntry | ContainerEntry; -export class InMemoryDataAccessor implements DataAccessor { +export class InMemoryDataAccessor implements DataAccessor, SingleThreaded { private readonly identifierStrategy: IdentifierStrategy; // A dummy container where every entry corresponds to a root container private readonly store: { entries: Record }; diff --git a/src/util/PathUtil.ts b/src/util/PathUtil.ts index b5745f3513..1e8015aa31 100644 --- a/src/util/PathUtil.ts +++ b/src/util/PathUtil.ts @@ -1,4 +1,5 @@ import { posix, win32 } from 'path'; +import { readJson } from 'fs-extra'; import urljoin from 'url-join'; import type { TargetExtractor } from '../http/input/identifier/TargetExtractor'; import type { ResourceIdentifier } from '../http/representation/ResourceIdentifier'; @@ -270,6 +271,13 @@ export function resolveAssetPath(path = modulePathPlaceholder): string { return absoluteFilePath(path); } +/** + * Reads the project package.json and returns it. + */ +export async function readPackageJson(): Promise> { + return readJson(resolveModulePath('package.json')); +} + /** * Concatenates all the given strings into a normalized URL. * Will place slashes between input strings if necessary. diff --git a/src/util/handlers/ProcessHandler.ts b/src/util/handlers/ProcessHandler.ts new file mode 100644 index 0000000000..70144de93d --- /dev/null +++ b/src/util/handlers/ProcessHandler.ts @@ -0,0 +1,46 @@ +import type { ClusterManager } from '../../init/cluster/ClusterManager'; +import { NotImplementedHttpError } from '../errors/NotImplementedHttpError'; +import { AsyncHandler } from './AsyncHandler'; + +/** + * A wrapper handler that will only run the wrapped handler if it is executed from: + * * when running multithreaded: either the **primary** or a **worker process** + * * when running singlethreaded: **the only process** (i.e. always) + */ +export class ProcessHandler extends AsyncHandler { + private readonly clusterManager: ClusterManager; + private readonly source: AsyncHandler; + private readonly executeOnPrimary: boolean; + + /** + * Creates a new ProcessHandler + * @param source - The wrapped handler + * @param clusterManager - The ClusterManager in use + * @param executeOnPrimary - Whether to execute the source handler when the process is the _primary_ or a _worker_. + */ + public constructor(source: AsyncHandler, clusterManager: ClusterManager, executeOnPrimary: boolean) { + super(); + this.source = source; + this.clusterManager = clusterManager; + this.executeOnPrimary = executeOnPrimary; + } + + public async canHandle(input: TIn): Promise { + if (!this.canExecute()) { + throw new NotImplementedHttpError(`Will not execute on ${this.executeOnPrimary ? 'worker' : 'primary'} process.`); + } + await this.source.canHandle(input); + } + + public async handle(input: TIn): Promise { + return this.source.handle(input); + } + + /** + * Checks if the condition has already been fulfilled. + */ + private canExecute(): boolean { + return this.clusterManager.isSingleThreaded() || + (this.executeOnPrimary ? this.clusterManager.isPrimary() : this.clusterManager.isWorker()); + } +} diff --git a/src/util/locking/MemoryResourceLocker.ts b/src/util/locking/MemoryResourceLocker.ts index 74ec33a6e6..c322e2ff06 100644 --- a/src/util/locking/MemoryResourceLocker.ts +++ b/src/util/locking/MemoryResourceLocker.ts @@ -1,6 +1,6 @@ -import cluster from 'cluster'; import AsyncLock from 'async-lock'; import type { ResourceIdentifier } from '../../http/representation/ResourceIdentifier'; +import type { SingleThreaded } from '../../init/cluster/SingleThreaded'; import { getLoggerFor } from '../../logging/LogUtil'; import { InternalServerError } from '../errors/InternalServerError'; import type { ResourceLocker } from './ResourceLocker'; @@ -11,7 +11,7 @@ import type { ResourceLocker } from './ResourceLocker'; * in a memory leak if locks are never unlocked, so make sure this is covered with expiring locks for example, * and/or proper `finally` handles. */ -export class MemoryResourceLocker implements ResourceLocker { +export class MemoryResourceLocker implements ResourceLocker, SingleThreaded { protected readonly logger = getLoggerFor(this); private readonly locker: AsyncLock; @@ -20,10 +20,6 @@ export class MemoryResourceLocker implements ResourceLocker { public constructor() { this.locker = new AsyncLock(); this.unlockCallbacks = {}; - if (cluster.isWorker) { - this.logger.warn(`MemoryResourceLocker is not thread-safe/process-safe! - You should only use this locker in a single-thread/single-process CSS setup.`); - } } public async acquire(identifier: ResourceIdentifier): Promise { diff --git a/test/integration/Config.ts b/test/integration/Config.ts index 6302c26e75..7556e9e3d7 100644 --- a/test/integration/Config.ts +++ b/test/integration/Config.ts @@ -54,5 +54,6 @@ export function getDefaultVariables(port: number, baseUrl?: string): Record { let initializer: Initializer; let finalizer: Finalizable; + let clusterManager: ClusterManager; let app: App; beforeEach(async(): Promise => { initializer = { handleSafe: jest.fn() } as any; finalizer = { finalize: jest.fn() }; - app = new App(initializer, finalizer); + clusterManager = {} as any; + app = new App(initializer, finalizer, clusterManager); }); it('can start with the initializer.', async(): Promise => { @@ -22,4 +25,9 @@ describe('An App', (): void => { await expect(app.stop()).resolves.toBeUndefined(); expect(finalizer.finalize).toHaveBeenCalledTimes(1); }); + + it('can check its clusterManager for the threading mode.', async(): Promise => { + await expect(app.start()).resolves.toBeUndefined(); + expect(app.clusterManager).toBe(clusterManager); + }); }); diff --git a/test/unit/init/AppRunner.test.ts b/test/unit/init/AppRunner.test.ts index c90dcd9bc9..eb0c9f89a9 100644 --- a/test/unit/init/AppRunner.test.ts +++ b/test/unit/init/AppRunner.test.ts @@ -1,4 +1,5 @@ import { ComponentsManager } from 'componentsjs'; +import type { ClusterManager } from '../../../src'; import type { App } from '../../../src/init/App'; import { AppRunner } from '../../../src/init/AppRunner'; import type { CliExtractor } from '../../../src/init/cli/CliExtractor'; @@ -6,10 +7,6 @@ import type { SettingsResolver } from '../../../src/init/variables/SettingsResol import { joinFilePath } from '../../../src/util/PathUtil'; import { flushPromises } from '../../util/Util'; -const app: jest.Mocked = { - start: jest.fn(), -} as any; - const defaultParameters = { port: 3000, logLevel: 'info', @@ -26,6 +23,31 @@ const settingsResolver: jest.Mocked = { handleSafe: jest.fn().mockResolvedValue(defaultVariables), } as any; +const mockLogger = { + info: jest.fn(), + warn: jest.fn(), + silly: jest.fn(), + error: jest.fn(), + verbose: jest.fn(), + debug: jest.fn(), + log: jest.fn(), +}; + +const clusterManager: jest.Mocked = { + isSingleThreaded: jest.fn().mockReturnValue(false), + spawnWorkers: jest.fn(), + isPrimary: jest.fn().mockReturnValue(true), + isWorker: jest.fn().mockReturnValue(false), + logger: mockLogger, + workers: 1, + clusterMode: 1, +} as any; + +const app: jest.Mocked = { + start: jest.fn(), + clusterManager, +} as any; + const manager: jest.Mocked> = { instantiate: jest.fn(async(iri: string): Promise => { switch (iri) { @@ -39,6 +61,12 @@ const manager: jest.Mocked> = { }, } as any; +const listSingleThreadedComponentsMock = jest.fn().mockResolvedValue([]); + +jest.mock('../../../src/init/cluster/SingleThreaded', (): any => ({ + listSingleThreadedComponents: (): any => listSingleThreadedComponentsMock(), +})); + jest.mock('componentsjs', (): any => ({ // eslint-disable-next-line @typescript-eslint/naming-convention ComponentsManager: { @@ -90,6 +118,75 @@ describe('AppRunner', (): void => { expect(cliExtractor.handleSafe).toHaveBeenCalledTimes(0); expect(settingsResolver.handleSafe).toHaveBeenCalledTimes(0); expect(app.start).toHaveBeenCalledTimes(0); + expect(app.clusterManager.isSingleThreaded()).toBeFalsy(); + }); + + it('throws an error if threading issues are detected with 1 class.', async(): Promise => { + listSingleThreadedComponentsMock.mockImplementationOnce((): string[] => [ 'ViolatingClass' ]); + const variables = { + 'urn:solid-server:default:variable:port': 3000, + 'urn:solid-server:default:variable:loggingLevel': 'info', + 'urn:solid-server:default:variable:rootFilePath': '/var/cwd/', + 'urn:solid-server:default:variable:showStackTrace': false, + 'urn:solid-server:default:variable:podConfigJson': '/var/cwd/pod-config.json', + 'urn:solid-server:default:variable:seededPodConfigJson': '/var/cwd/seeded-pod-config.json', + }; + + let caughtError: Error | undefined; + try { + await new AppRunner().create( + { + mainModulePath: joinFilePath(__dirname, '../../../'), + dumpErrorState: true, + logLevel: 'info', + }, + joinFilePath(__dirname, '../../../config/default.json'), + variables, + ); + } catch (error: unknown) { + caughtError = error as Error; + } + expect(caughtError?.message).toMatch(/^Cannot run a singlethreaded-only component in a multithreaded setup!/mu); + expect(caughtError?.message).toMatch( + /\[ViolatingClass\] is not threadsafe and should not be run in multithreaded setups!/mu, + ); + + expect(write).toHaveBeenCalledTimes(0); + expect(exit).toHaveBeenCalledTimes(0); + }); + + it('throws an error if threading issues are detected with 2 class.', async(): Promise => { + listSingleThreadedComponentsMock.mockImplementationOnce((): string[] => [ 'ViolatingClass1', 'ViolatingClass2' ]); + const variables = { + 'urn:solid-server:default:variable:port': 3000, + 'urn:solid-server:default:variable:loggingLevel': 'info', + 'urn:solid-server:default:variable:rootFilePath': '/var/cwd/', + 'urn:solid-server:default:variable:showStackTrace': false, + 'urn:solid-server:default:variable:podConfigJson': '/var/cwd/pod-config.json', + 'urn:solid-server:default:variable:seededPodConfigJson': '/var/cwd/seeded-pod-config.json', + }; + + let caughtError: Error | undefined; + try { + await new AppRunner().create( + { + mainModulePath: joinFilePath(__dirname, '../../../'), + dumpErrorState: true, + logLevel: 'info', + }, + joinFilePath(__dirname, '../../../config/default.json'), + variables, + ); + } catch (error: unknown) { + caughtError = error as Error; + } + expect(caughtError?.message).toMatch(/^Cannot run a singlethreaded-only component in a multithreaded setup!/mu); + expect(caughtError?.message).toMatch( + /\[ViolatingClass1, ViolatingClass2\] are not threadsafe and should not be run in multithreaded setups!/mu, + ); + + expect(write).toHaveBeenCalledTimes(0); + expect(exit).toHaveBeenCalledTimes(0); }); }); @@ -128,6 +225,7 @@ describe('AppRunner', (): void => { expect(settingsResolver.handleSafe).toHaveBeenCalledTimes(0); expect(app.start).toHaveBeenCalledTimes(1); expect(app.start).toHaveBeenCalledWith(); + expect(app.clusterManager.isSingleThreaded()).toBeFalsy(); }); }); @@ -154,6 +252,7 @@ describe('AppRunner', (): void => { expect(manager.instantiate).toHaveBeenNthCalledWith(2, 'urn:solid-server:default:App', { variables: defaultVariables }); + expect(app.clusterManager.isSingleThreaded()).toBeFalsy(); expect(app.start).toHaveBeenCalledTimes(0); }); @@ -171,6 +270,7 @@ describe('AppRunner', (): void => { '-t', '--podConfigJson', '/different-path.json', '--seededPodConfigJson', '/different-path.json', + '-w', '1', ]; process.argv = argvParameters; @@ -195,10 +295,35 @@ describe('AppRunner', (): void => { 'urn:solid-server:default:App', { variables: defaultVariables }); expect(app.start).toHaveBeenCalledTimes(0); + expect(app.clusterManager.isSingleThreaded()).toBeFalsy(); process.argv = argv; }); + it('checks for threading issues when starting in multithreaded mode.', async(): Promise => { + const createdApp = await new AppRunner().createCli(); + expect(createdApp).toBe(app); + expect(listSingleThreadedComponentsMock).toHaveBeenCalled(); + }); + + it('throws an error if there are threading issues detected.', async(): Promise => { + listSingleThreadedComponentsMock.mockImplementationOnce((): string[] => [ 'ViolatingClass' ]); + + let caughtError: Error = new Error('should disappear'); + try { + await new AppRunner().createCli([ 'node', 'script' ]); + } catch (error: unknown) { + caughtError = error as Error; + } + expect(caughtError.message).toMatch(/^Cannot run a singlethreaded-only component in a multithreaded setup!/mu); + expect(caughtError?.message).toMatch( + /\[ViolatingClass\] is not threadsafe and should not be run in multithreaded setups!/mu, + ); + + expect(write).toHaveBeenCalledTimes(0); + expect(exit).toHaveBeenCalledTimes(0); + }); + it('throws an error if creating a ComponentsManager fails.', async(): Promise => { (manager.configRegistry.register as jest.Mock).mockRejectedValueOnce(new Error('Fatal')); @@ -291,6 +416,7 @@ describe('AppRunner', (): void => { { variables: defaultVariables }); expect(app.start).toHaveBeenCalledTimes(1); expect(app.start).toHaveBeenLastCalledWith(); + expect(app.clusterManager.isSingleThreaded()).toBeFalsy(); }); it('throws an error if the server could not start.', async(): Promise => { @@ -342,6 +468,7 @@ describe('AppRunner', (): void => { { variables: defaultVariables }); expect(app.start).toHaveBeenCalledTimes(1); expect(app.start).toHaveBeenLastCalledWith(); + expect(app.clusterManager.isSingleThreaded()).toBeFalsy(); }); it('exits the process and writes to stderr if there was an error.', async(): Promise => { diff --git a/test/unit/init/cluster/ClusterManager.test.ts b/test/unit/init/cluster/ClusterManager.test.ts new file mode 100644 index 0000000000..4541bcd8f8 --- /dev/null +++ b/test/unit/init/cluster/ClusterManager.test.ts @@ -0,0 +1,106 @@ +import cluster from 'cluster'; +import EventEmitter from 'events'; +import { cpus } from 'os'; +import { ClusterManager } from '../../../../src'; +import * as LogUtil from '../../../../src/logging/LogUtil'; + +jest.mock('cluster'); +jest.mock('os', (): any => ({ + ...jest.requireActual('os'), + cpus: jest.fn().mockImplementation((): any => [{}, {}, {}, {}, {}, {}]), +})); + +const mockWorker = new EventEmitter() as any; +mockWorker.process = { pid: 666 }; + +describe('A ClusterManager', (): void => { + const emitter = new EventEmitter(); + const mockCluster = jest.requireMock('cluster'); + const mockLogger = { info: jest.fn(), warn: jest.fn() }; + jest.spyOn(LogUtil, 'getLoggerFor').mockImplementation((): any => mockLogger); + + beforeAll((): void => { + Object.assign(mockCluster, { + fork: jest.fn().mockImplementation((): any => mockWorker), + on: jest.fn().mockImplementation(emitter.on), + emit: jest.fn().mockImplementation(emitter.emit), + isMaster: true, + isWorker: false, + }); + }); + + it('can handle workers input as string.', (): void => { + const cm = new ClusterManager('4'); + expect(cm.isSingleThreaded()).toBeFalsy(); + }); + + it('can distinguish between ClusterModes.', (): void => { + const cm1 = new ClusterManager(-1); + const cm2 = new ClusterManager(0); + const cm3 = new ClusterManager(1); + const cm4 = new ClusterManager(2); + expect(cm1.isSingleThreaded()).toBeFalsy(); + expect(cm2.isSingleThreaded()).toBeFalsy(); + expect(cm3.isSingleThreaded()).toBeTruthy(); + expect(cm4.isSingleThreaded()).toBeFalsy(); + }); + + it('errors on invalid workers amount.', (): void => { + expect((): ClusterManager => new ClusterManager('10')).toBeDefined(); + expect((): ClusterManager => new ClusterManager('2')).toBeDefined(); + expect((): ClusterManager => new ClusterManager('1')).toBeDefined(); + expect((): ClusterManager => new ClusterManager('0')).toBeDefined(); + expect((): ClusterManager => new ClusterManager('-1')).toBeDefined(); + expect((): ClusterManager => new ClusterManager('-5')).toBeDefined(); + expect((): ClusterManager => new ClusterManager('-6')).toThrow('Invalid workers value'); + expect((): ClusterManager => new ClusterManager('-10')).toThrow('Invalid workers value'); + }); + + it('has an isPrimary() that works.', (): void => { + const cm = new ClusterManager(-1); + expect(cm.isPrimary()).toBeTruthy(); + }); + + it('has an isWorker() that works.', (): void => { + const cm = new ClusterManager(-1); + expect(cm.isWorker()).toBeFalsy(); + }); + + it('can autoscale to num_cpu and applies proper logging.', (): void => { + const cm = new ClusterManager(-1); + const workers = cpus().length - 1; + expect(cpus()).toHaveLength(workers + 1); + Object.assign(cm, { logger: mockLogger }); + + cm.spawnWorkers(); + + expect(mockLogger.info).toHaveBeenCalledWith(`Setting up ${workers} workers`); + + for (let i = 0; i < workers; i++) { + mockCluster.emit('online', mockWorker); + } + + expect(cluster.on).toHaveBeenCalledWith('online', expect.any(Function)); + expect(cluster.fork).toHaveBeenCalledTimes(workers); + expect(mockLogger.info).toHaveBeenLastCalledWith(`All ${workers} requested workers have been started.`); + + expect(cluster.on).toHaveBeenCalledWith('exit', expect.any(Function)); + const code = 333; + const signal = 'exiting'; + mockCluster.emit('exit', mockWorker, code, signal); + expect(mockLogger.warn).toHaveBeenCalledWith( + `Worker ${mockWorker.process.pid} died with code ${code} and signal ${signal}`, + ); + expect(mockLogger.warn).toHaveBeenCalledWith(`Starting a new worker`); + }); + + it('can receive message from spawned workers.', (): void => { + const cm = new ClusterManager(2); + Object.assign(cm, { logger: mockLogger }); + + cm.spawnWorkers(); + const msg = 'Hi from worker!'; + mockWorker.emit('message', msg); + expect(mockLogger.info).toHaveBeenCalledWith(msg); + }); +}); diff --git a/test/unit/init/cluster/SingleThreaded.test.ts b/test/unit/init/cluster/SingleThreaded.test.ts new file mode 100644 index 0000000000..eca189121a --- /dev/null +++ b/test/unit/init/cluster/SingleThreaded.test.ts @@ -0,0 +1,61 @@ +import { ComponentsManager } from 'componentsjs'; +import type { Resource } from 'rdf-object'; +import { listSingleThreadedComponents } from '../../../../src'; + +const moduleState = { + contexts: { + 'https://linkedsoftwaredependencies.org/bundles/npm/@solid/community-server/^5.0.0/components/context.jsonld': 'dist/components/context.jsonld', + }, +}; + +const mockResource: Resource = { + isA: jest.fn().mockReturnValue(true), + value: '#ViolatingClass', + property: { type: { value: '#ViolatingClass' }}, +} as any; + +const myExpandTerm = jest.fn().mockImplementation((): any => 'http://myFullIRI'); + +function mockComponentsManagerFn(length: number): jest.Mocked> { + const resources: Resource[] = Array.from({ length }).fill(mockResource); + return { moduleState, getInstantiatedResources: jest.fn((): any => resources) } as any; +} + +jest.mock('jsonld-context-parser/lib/ContextParser', (): any => ({ + // eslint-disable-next-line @typescript-eslint/naming-convention + ContextParser: jest.fn().mockImplementation((): any => ({ + parse: jest.fn(async(): Promise => ({ + expandTerm: jest.fn((): any => myExpandTerm()), + })), + })), +})); + +jest.mock('componentsjs', (): any => ({ + // eslint-disable-next-line @typescript-eslint/naming-convention + ComponentsManager: { + build: jest.fn(async(props: any): Promise> => mockComponentsManagerFn(props.length)), + }, + // eslint-disable-next-line @typescript-eslint/naming-convention + PrefetchedDocumentLoader: jest.fn().mockImplementation((): any => ({ + load: jest.fn(), + })), +})); + +describe('A SingleThreaded', (): void => { + it('has a listSingleThreadedComponents that works with 1 resource.', async(): Promise => { + const comp = await ComponentsManager.build({ length: 1 } as any); + await expect(listSingleThreadedComponents(comp)).resolves.toEqual([ 'ViolatingClass' ]); + }); + + it('has a listSingleThreadedComponents that works with multiple resources.', async(): Promise => { + const comp = await ComponentsManager.build({ length: 2 } as any); + await expect(listSingleThreadedComponents(comp)).resolves.toEqual([ 'ViolatingClass', 'ViolatingClass' ]); + }); + + it('errors when the interface IRI cannot be expanded.', async(): Promise => { + myExpandTerm.mockReturnValueOnce(null); + const comp = await ComponentsManager.build({} as any); + await expect(listSingleThreadedComponents(comp)).rejects + .toThrow(/^Could not expand .* to IRI!/u); + }); +}); diff --git a/test/unit/init/cluster/WorkerManager.test.ts b/test/unit/init/cluster/WorkerManager.test.ts new file mode 100644 index 0000000000..cc159aea33 --- /dev/null +++ b/test/unit/init/cluster/WorkerManager.test.ts @@ -0,0 +1,15 @@ +import { ClusterManager, WorkerManager } from '../../../../src'; + +describe('A WorkerManager', (): void => { + it('can be created from a ClusterManager.', (): void => { + expect((): WorkerManager => new WorkerManager(new ClusterManager(4))).toBeDefined(); + }); + + it('can call handle.', async(): Promise => { + const cm = new ClusterManager(4); + const wm = new WorkerManager(cm); + Object.assign(cm, { spawnWorkers: jest.fn() }); + await wm.handle(); + expect(cm.spawnWorkers).toHaveBeenCalled(); + }); +}); diff --git a/test/unit/logging/LazyLoggerFactory.test.ts b/test/unit/logging/LazyLoggerFactory.test.ts index 239dd80ab8..814a71fd5f 100644 --- a/test/unit/logging/LazyLoggerFactory.test.ts +++ b/test/unit/logging/LazyLoggerFactory.test.ts @@ -53,13 +53,13 @@ describe('LazyLoggerFactory', (): void => { const wrappedA = dummyLoggerFactory.createLogger.mock.results[0].value as jest.Mocked; expect(wrappedA.log).toHaveBeenCalledTimes(2); - expect(wrappedA.log).toHaveBeenNthCalledWith(1, 'warn', 'message1'); - expect(wrappedA.log).toHaveBeenNthCalledWith(2, 'error', 'message4'); + expect(wrappedA.log).toHaveBeenNthCalledWith(1, 'warn', 'message1', undefined); + expect(wrappedA.log).toHaveBeenNthCalledWith(2, 'error', 'message4', undefined); const wrappedB = dummyLoggerFactory.createLogger.mock.results[1].value as jest.Mocked; expect(wrappedB.log).toHaveBeenCalledTimes(2); - expect(wrappedB.log).toHaveBeenNthCalledWith(1, 'warn', 'message2'); - expect(wrappedB.log).toHaveBeenNthCalledWith(2, 'error', 'message3'); + expect(wrappedB.log).toHaveBeenNthCalledWith(1, 'warn', 'message2', undefined); + expect(wrappedB.log).toHaveBeenNthCalledWith(2, 'error', 'message3', undefined); }); it('does not store more messages than the buffer limit.', (): void => { @@ -84,6 +84,6 @@ describe('LazyLoggerFactory', (): void => { expect(wrappedA.log).toHaveBeenCalledTimes(50); expect(wrappedB.log).toHaveBeenCalledTimes(49); expect(warningLogger.log).toHaveBeenCalledTimes(1); - expect(warningLogger.log).toHaveBeenCalledWith('warn', 'Memory-buffered logging limit of 100 reached'); + expect(warningLogger.log).toHaveBeenCalledWith('warn', 'Memory-buffered logging limit of 100 reached', undefined); }); }); diff --git a/test/unit/logging/Logger.test.ts b/test/unit/logging/Logger.test.ts index 136c52bac3..888ad4c38e 100644 --- a/test/unit/logging/Logger.test.ts +++ b/test/unit/logging/Logger.test.ts @@ -1,9 +1,14 @@ +import process from 'process'; import { BaseLogger, WrappingLogger } from '../../../src/logging/Logger'; -import type { SimpleLogger } from '../../../src/logging/Logger'; +import type { SimpleLogger, LogMetadata } from '../../../src/logging/Logger'; describe('Logger', (): void => { describe('a BaseLogger', (): void => { let logger: BaseLogger; + const metadata: LogMetadata = { + isPrimary: true, + pid: process.pid, + }; beforeEach(async(): Promise => { logger = new (BaseLogger as any)(); @@ -13,43 +18,47 @@ describe('Logger', (): void => { it('delegates error to log.', async(): Promise => { logger.error('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('error', 'my message'); + expect(logger.log).toHaveBeenCalledWith('error', 'my message', metadata); }); it('warn delegates to log.', async(): Promise => { logger.warn('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('warn', 'my message'); + expect(logger.log).toHaveBeenCalledWith('warn', 'my message', metadata); }); it('info delegates to log.', async(): Promise => { logger.info('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('info', 'my message'); + expect(logger.log).toHaveBeenCalledWith('info', 'my message', metadata); }); it('verbose delegates to log.', async(): Promise => { logger.verbose('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('verbose', 'my message'); + expect(logger.log).toHaveBeenCalledWith('verbose', 'my message', metadata); }); it('debug delegates to log.', async(): Promise => { logger.debug('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('debug', 'my message'); + expect(logger.log).toHaveBeenCalledWith('debug', 'my message', metadata); }); it('silly delegates to log.', async(): Promise => { logger.silly('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('silly', 'my message'); + expect(logger.log).toHaveBeenCalledWith('silly', 'my message', metadata); }); }); describe('a WrappingLogger', (): void => { let logger: SimpleLogger; let wrapper: WrappingLogger; + const metadata: LogMetadata = { + isPrimary: true, + pid: process.pid, + }; beforeEach(async(): Promise => { logger = { log: jest.fn() }; @@ -59,37 +68,37 @@ describe('Logger', (): void => { it('error delegates to the internal logger.', async(): Promise => { wrapper.error('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('error', 'my message'); + expect(logger.log).toHaveBeenCalledWith('error', 'my message', metadata); }); it('warn delegates to the internal logger.', async(): Promise => { wrapper.warn('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('warn', 'my message'); + expect(logger.log).toHaveBeenCalledWith('warn', 'my message', metadata); }); it('info delegates to the internal logger.', async(): Promise => { wrapper.info('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('info', 'my message'); + expect(logger.log).toHaveBeenCalledWith('info', 'my message', metadata); }); it('verbose delegates to the internal logger.', async(): Promise => { wrapper.verbose('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('verbose', 'my message'); + expect(logger.log).toHaveBeenCalledWith('verbose', 'my message', metadata); }); it('debug delegates to the internal logger.', async(): Promise => { wrapper.debug('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('debug', 'my message'); + expect(logger.log).toHaveBeenCalledWith('debug', 'my message', metadata); }); it('silly delegates to the internal logger.', async(): Promise => { wrapper.silly('my message'); expect(logger.log).toHaveBeenCalledTimes(1); - expect(logger.log).toHaveBeenCalledWith('silly', 'my message'); + expect(logger.log).toHaveBeenCalledWith('silly', 'my message', metadata); }); }); }); diff --git a/test/unit/logging/WinstonLoggerFactory.test.ts b/test/unit/logging/WinstonLoggerFactory.test.ts index 4d0defdbdf..a25497f046 100644 --- a/test/unit/logging/WinstonLoggerFactory.test.ts +++ b/test/unit/logging/WinstonLoggerFactory.test.ts @@ -35,6 +35,7 @@ describe('WinstonLoggerFactory', (): void => { level: expect.stringContaining('debug'), message: 'my message', timestamp: expect.any(String), + metadata: expect.any(Object), [Symbol.for('level')]: 'debug', [Symbol.for('splat')]: [ undefined ], [Symbol.for('message')]: expect.any(String), diff --git a/test/unit/util/handlers/ProcessHandler.test.ts b/test/unit/util/handlers/ProcessHandler.test.ts new file mode 100644 index 0000000000..2d3bd7c829 --- /dev/null +++ b/test/unit/util/handlers/ProcessHandler.test.ts @@ -0,0 +1,72 @@ +import type { AsyncHandler, ClusterManager } from '../../../../src'; +import { NotImplementedHttpError, ProcessHandler } from '../../../../src'; + +function createClusterManager(workers: number, primary: boolean): jest.Mocked { + return { + isSingleThreaded: jest.fn().mockReturnValue(workers === 1), + isWorker: jest.fn().mockReturnValue(!primary), + isPrimary: jest.fn().mockReturnValue(primary), + } as any; +} + +describe('A ProcessHandler', (): void => { + const source: jest.Mocked> = { + canHandle: jest.fn(), + handleSafe: jest.fn().mockResolvedValue('handledSafely'), + handle: jest.fn().mockResolvedValue('handled'), + }; + + describe('allowing only worker processes', (): void => { + it('can create a ProcessHandler.', (): void => { + expect((): ProcessHandler => + new ProcessHandler(source, createClusterManager(1, true), false)).toBeDefined(); + }); + + it('can delegate to its source when run singlethreaded from worker.', async(): Promise => { + const ph = new ProcessHandler(source, createClusterManager(1, false), false); + await expect(ph.handleSafe('test')).resolves.toBe('handled'); + }); + + it('can delegate to its source when run singlethreaded from primary.', async(): Promise => { + const ph = new ProcessHandler(source, createClusterManager(1, true), false); + await expect(ph.handleSafe('test')).resolves.toBe('handled'); + }); + + it('can delegate to its source when run multithreaded from worker.', async(): Promise => { + const ph = new ProcessHandler(source, createClusterManager(2, false), false); + await expect(ph.handleSafe('test')).resolves.toBe('handled'); + }); + + it('errors when run multithreaded from primary.', async(): Promise => { + const ph = new ProcessHandler(source, createClusterManager(2, true), false); + await expect(ph.handleSafe('test')).rejects.toThrow(NotImplementedHttpError); + }); + }); + + describe('allowing only the primary process', (): void => { + it('can create a ProcessHandler.', (): void => { + expect((): ProcessHandler => + new ProcessHandler(source, createClusterManager(1, true), true)).toBeDefined(); + }); + + it('can delegate to its source when run singlethreaded from worker.', async(): Promise => { + const ph = new ProcessHandler(source, createClusterManager(1, false), true); + await expect(ph.handleSafe('test')).resolves.toBe('handled'); + }); + + it('can delegate to its source when run singlethreaded from primary.', async(): Promise => { + const ph = new ProcessHandler(source, createClusterManager(1, true), true); + await expect(ph.handleSafe('test')).resolves.toBe('handled'); + }); + + it('can delegate to its source when run multithreaded from primary.', async(): Promise => { + const ph = new ProcessHandler(source, createClusterManager(2, true), true); + await expect(ph.handleSafe('test')).resolves.toBe('handled'); + }); + + it('errors when run multithreaded from worker.', async(): Promise => { + const ph = new ProcessHandler(source, createClusterManager(2, false), true); + await expect(ph.handleSafe('test')).rejects.toThrow(NotImplementedHttpError); + }); + }); +}); diff --git a/test/unit/util/locking/MemoryResourceLocker.test.ts b/test/unit/util/locking/MemoryResourceLocker.test.ts index 9914e4e730..e5088cbfc2 100644 --- a/test/unit/util/locking/MemoryResourceLocker.test.ts +++ b/test/unit/util/locking/MemoryResourceLocker.test.ts @@ -1,19 +1,6 @@ -import type { Logger } from '../../../../src'; -import { getLoggerFor } from '../../../../src'; import { InternalServerError } from '../../../../src/util/errors/InternalServerError'; import { MemoryResourceLocker } from '../../../../src/util/locking/MemoryResourceLocker'; -jest.mock('../../../../src/logging/LogUtil', (): any => { - const logger: Logger = - { error: jest.fn(), debug: jest.fn(), warn: jest.fn(), info: jest.fn(), log: jest.fn() } as any; - return { getLoggerFor: (): Logger => logger }; -}); -const logger: jest.Mocked = getLoggerFor('MemoryResourceLocker') as any; - -jest.mock('cluster', (): any => ({ - isWorker: true, -})); - describe('A MemoryResourceLocker', (): void => { let locker: MemoryResourceLocker; const identifier = { path: 'http://test.com/foo' }; @@ -21,11 +8,6 @@ describe('A MemoryResourceLocker', (): void => { locker = new MemoryResourceLocker(); }); - it('logs a warning when constructed on a worker process.', (): void => { - expect((): MemoryResourceLocker => new MemoryResourceLocker()).toBeDefined(); - expect(logger.warn).toHaveBeenCalled(); - }); - it('can lock and unlock a resource.', async(): Promise => { await expect(locker.acquire(identifier)).resolves.toBeUndefined(); await expect(locker.release(identifier)).resolves.toBeUndefined();