Skip to content

Commit

Permalink
refactor: pulling cluster services to top level and removing handler …
Browse files Browse the repository at this point in the history
…naming as isnt just everything?
  • Loading branch information
yasserf committed Jun 25, 2019
1 parent aec59f0 commit 9d919c5
Show file tree
Hide file tree
Showing 112 changed files with 694 additions and 802 deletions.
70 changes: 70 additions & 0 deletions bin/deepstream-cluster.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// @ts-ignore
import { Command } from 'commander'
import * as cluster from 'cluster'
import { EVENT } from '../src/constants'

export const command = (program: Command) => {
program
.command('cluster')
.description('start a daemon for deepstream server')

.option('-c, --config [file]', 'configuration file, parent directory will be used as prefix for other config files')
.option('-l, --lib-dir [directory]', 'path where to lookup for plugins like connectors and logger')

.option('--cluster-size <amount>', 'the amount of nodes to run in the cluster')
.option('--host <host>', 'host for the HTTP/websocket server')
.option('--port <port>', 'port for the HTTP/websocket server')
.option('--http-host <host>', 'host for the HTTP server')
.option('--http-port <port>', 'port for the HTTP server')
.option('--disable-auth', 'Force deepstream to use "none" auth type')
.option('--disable-permissions', 'Force deepstream to use "none" permissions')
.option('--log-level <level>', 'Log messages with this level and above')
.action(action)
}

function action () {
// @ts-ignore
global.deepstreamCLI = this
const workers = new Set<any>()

const setupWorkerProcesses = () => {
console.log('Master cluster setting up ' + global.deepstreamCLI.clusterSize + ' deepstream nodes')

for (let i = 0; i < global.deepstreamCLI.clusterSize; i++) {
workers.add(cluster.fork())
}

// process is clustered on a core and process id is assigned
cluster.on('online', (worker) => {
console.log(`Deepstream ${worker.process.pid} is listening`)
})

// if any of the worker process dies then start a new one by simply forking another one
cluster.on('exit', (worker, code, signal) => {
console.log(`Deepstream ${worker.process.pid} died with code: ${code}, and signal: ${signal}`)
console.log('Starting a new worker')
workers.delete(worker)
workers.add(cluster.fork())
})
}

// if it is a master process then call setting up worker process
if (cluster.isMaster) {
setupWorkerProcesses()
} else {
const { Deepstream } = require('../src/deepstream.io')
try {
const ds = new Deepstream(null)
ds.on(EVENT.PLUGIN_INITIALIZATION_ERROR, () => process.exit(1))
ds.start()
process
.removeAllListeners('SIGINT').on('SIGINT', () => {
ds.on('stopped', () => process.exit(0))
ds.stop()
})
} catch (err) {
console.error(err.toString())
process.exit(1)
}
}
}
4 changes: 2 additions & 2 deletions bin/deepstream-hash.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import FileAuthenticationHandler from '../src/authentication/file-based-authentication-handler'
import * as jsYamlLoader from '../src/config/js-yaml-loader'
import { Command } from 'commander'
import { FileBasedAuthentication } from '../src/services/authentication/file/file-based-authentication'

export const hash = (program: Command) => {
program
Expand Down Expand Up @@ -34,7 +34,7 @@ function action (this: any, password: string) {
// Mock file loading since a users.yml file is not required
// jsYamlLoader.readAndParseFile = function () {}

const fileAuthenticationHandler = new FileAuthenticationHandler(config.auth.options)
const fileAuthenticationHandler = new FileBasedAuthentication(config.auth.options)
fileAuthenticationHandler.createHash(password, (err: Error, passwordHash: string) => {
if (err) {
console.error('Hash could not be created', err)
Expand Down
2 changes: 1 addition & 1 deletion scripts/resources/package-conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ auth:

# Permissioning
permission:
# Only config or custom permissionHandler at the moment
# Only config or custom permission at the moment
type: config
options:
# Path to the permissionFile. Can be json, js or yml
Expand Down
32 changes: 0 additions & 32 deletions src/cluster/distributed-cluster-node.ts

This file was deleted.

24 changes: 12 additions & 12 deletions src/config/config-initialiser.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ describe('config-initialiser', () => {
type: 'none'
} as any
const result = configInitialiser.initialise(config)
expect(result.services.authenticationHandler.description).to.equal('Open Authentication')
expect(result.services.authentication.description).to.equal('Open Authentication')
})

it('works for authtype: user', () => {
Expand All @@ -119,8 +119,8 @@ describe('config-initialiser', () => {
}
}
const result = configInitialiser.initialise(config)
expect(result.services.authenticationHandler.description).to.contain('file using')
expect(result.services.authenticationHandler.description).to.contain(path.resolve('src/test/config/users.json'))
expect(result.services.authentication.description).to.contain('file using')
expect(result.services.authentication.description).to.contain(path.resolve('src/test/config/users.json'))
})

it('works for authtype: http', () => {
Expand All @@ -139,7 +139,7 @@ describe('config-initialiser', () => {
}

const result = configInitialiser.initialise(config)
expect(result.services.authenticationHandler.description).to.equal('http webhook to http://some-url.com')
expect(result.services.authentication.description).to.equal('http webhook to http://some-url.com')
})

it('fails for missing auth sections', () => {
Expand All @@ -163,7 +163,7 @@ describe('config-initialiser', () => {
}

const result = configInitialiser.initialise(config)
expect(result.services.authenticationHandler.isReady).to.equal(true)
expect(result.services.authentication.isReady).to.equal(true)
})

it('tries to find a custom authentication handler from name', () => {
Expand Down Expand Up @@ -202,13 +202,13 @@ describe('config-initialiser', () => {
}

const result = configInitialiser.initialise(config)
expect(result.services.authenticationHandler.description).to.equal('Open Authentication')
expect(result.services.authentication.description).to.equal('Open Authentication')
delete global.deepstreamCLI
})
})

describe('creates the permissionHandler', () => {
it('creates the config permission handler', () => {
describe('creates the permission service', () => {
it('creates the config permission service', () => {
global.deepstreamConfDir = './src/test/config'
const config = defaultConfig.get()

Expand All @@ -219,8 +219,8 @@ describe('config-initialiser', () => {
}
}
const result = configInitialiser.initialise(config)
expect(result.services.permissionHandler.description).to.contain('valve permissions loaded from')
expect(result.services.permissionHandler.description).to.contain(path.resolve('./src/test/config/basic-permission-config.json'))
expect(result.services.permission.description).to.contain('valve permissions loaded from')
expect(result.services.permission.description).to.contain(path.resolve('./src/test/config/basic-permission-config.json'))
})

it('fails for invalid permission types', () => {
Expand Down Expand Up @@ -248,7 +248,7 @@ describe('config-initialiser', () => {
}

const result = configInitialiser.initialise(config)
expect(result.services.permissionHandler.isReady).to.equal(true)
expect(result.services.permission.isReady).to.equal(true)
})

it('tries to find a custom authentication handler from name', () => {
Expand Down Expand Up @@ -283,7 +283,7 @@ describe('config-initialiser', () => {
}

const result = configInitialiser.initialise(config)
expect(result.services.permissionHandler.description).to.equal('none')
expect(result.services.permission.description).to.equal('none')
delete global.deepstreamCLI
})
})
Expand Down
85 changes: 42 additions & 43 deletions src/config/config-initialiser.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,39 @@
import * as fs from 'fs'
import FileAuthenticationHandler from '../authentication/file-based-authentication-handler'
import OpenAuthenticationHandler from '../authentication/open-authentication-handler'
import HTTPAuthenticationHAndler from '../authentication/http-authentication-handler'
import { LOG_LEVEL } from '../constants'
import DefaultCache from '../default-plugins/local-cache'
import DefaultStorage from '../default-plugins/noop-storage'
import DefaultMonitoring from '../default-plugins/noop-monitoring'
import DefaultLogger from '../default-plugins/std-out-logger'
import HTTPConnectionEndpoint from '../message/http/connection-endpoint'
import UWSConnectionEndpoint from '../message/uws/connection-endpoint'
import ConfigPermissionHandler from '../permission/config-permission-handler'
import OpenPermissionHandler from '../permission/open-permission-handler'
import * as utils from '../utils/utils'
import * as fileUtils from './file-utils'
import { DeepstreamConfig, DeepstreamServices, ConnectionEndpoint, PluginConfig, Logger, Storage, StorageReadCallback, StorageWriteCallback, AuthenticationHandler, PermissionHandler } from '../types'
import { DeepstreamConfig, DeepstreamServices, ConnectionEndpoint, PluginConfig, Logger, Storage, StorageReadCallback, StorageWriteCallback, Authentication, Permission, LOG_LEVEL } from '../types'
import { JSONObject } from '../../binary-protocol/src/message-constants'
import { DistributedLockRegistry } from '../cluster/distributed-lock-registry'
import { DistributedClusterRegistry } from '../cluster/distributed-cluster-registry'
import { DistributedStateRegistryFactory } from '../cluster/distributed-state-registry-factory'
import { SingleClusterNode } from '../cluster/single-cluster-node'
import { DefaultSubscriptionRegistryFactory } from '../utils/default-subscription-registry-factory';
import { DistributedClusterRegistry } from '../services/cluster-registry/distributed-cluster-registry'
import { SingleClusterNode } from '../services/cluster-node/single-cluster-node'
import { DefaultSubscriptionRegistryFactory } from '../services/subscription-registry/default-subscription-registry-factory'
import { HTTPConnectionEndpoint } from '../connection-endpoint/http/connection-endpoint'
import { OpenAuthentication } from '../services/authentication/open/open-authentication'
import { ConfigPermission } from '../services/permission/valve/config-permission'
import { OpenPermission } from '../services/permission/open/open-permission'
import { UWSConnectionEndpoint } from '../connection-endpoint/uws/connection-endpoint'
import { FileBasedAuthentication } from '../services/authentication/file/file-based-authentication'
import { HttpAuthentication } from '../services/authentication/http/http-authentication'
import { NoopStorage } from '../services/storage/noop-storage'
import { LocalCache } from '../services/cache/local-cache'
import { StdOutLogger } from '../services/logger/std-out-logger'
import { LocalMonitoring } from '../services/monitoring/noop-monitoring'
import { DistributedLockRegistry } from '../services/lock/distributed-lock-registry'
import { DistributedStateRegistryFactory } from '../services/cluster-state/distributed-state-registry-factory'

let commandLineArguments: any

const customPlugins = new Map()

const defaultPlugins = new Map<string, any>([
['cache', DefaultCache],
['storage', DefaultStorage],
['logger', DefaultLogger],
['monitoring', DefaultMonitoring],
['message', SingleClusterNode],
const defaultPlugins = new Map<keyof DeepstreamServices, any>([
['cache', LocalCache],
['storage', NoopStorage],
['logger', StdOutLogger],
['monitoring', LocalMonitoring],
['locks', DistributedLockRegistry],
['cluster', DistributedClusterRegistry],
['states', DistributedStateRegistryFactory],
['subscriptions', DefaultSubscriptionRegistryFactory]
['subscriptions', DefaultSubscriptionRegistryFactory],
['clusterRegistry', DistributedClusterRegistry],
['clusterStates', DistributedStateRegistryFactory],
['clusterNode', SingleClusterNode]
])

/**
Expand All @@ -58,17 +57,17 @@ export const initialise = function (config: DeepstreamConfig): { config: Deepstr
services.logger = handleLogger(config, services)

services.subscriptions = new (resolvePluginClass(config.subscriptions, 'subscriptions'))(config.subscriptions.options, services, config)
services.message = new (resolvePluginClass(config.cluster.message, 'message'))(config.cluster.message.options, services, config)
services.storage = new (resolvePluginClass(config.storage, 'storage'))(config.storage.options, services, config)
services.cache = new (resolvePluginClass(config.cache, 'cache'))(config.cache.options, services, config)
services.monitoring = new (resolvePluginClass(config.monitoring, 'monitoring'))(config.monitoring.options, services, config)
services.authenticationHandler = handleAuthStrategy(config, services)
services.permissionHandler = handlePermissionStrategy(config, services)
services.authentication = handleAuthStrategy(config, services)
services.permission = handlePermissionStrategy(config, services)
services.connectionEndpoints = handleConnectionEndpoints(config, services)
services.locks = new (resolvePluginClass(config.cluster.locks, 'locks'))(config.cluster.locks.options, services, config)
services.cluster = new (resolvePluginClass(config.cluster.registry, 'cluster'))(config.cluster.registry.options, services, config)
services.states = new (resolvePluginClass(config.cluster.states, 'states'))(config.cluster.states.options, services, config)

services.locks = new (resolvePluginClass(config.locks, 'locks'))(config.locks.options, services, config)
services.clusterNode = new (resolvePluginClass(config.clusterNode, 'clusterNode'))(config.clusterNode.options, services, config)
services.clusterRegistry = new (resolvePluginClass(config.clusterRegistry, 'clusterRegistry'))(config.clusterRegistry.options, services, config)
services.clusterStates = new (resolvePluginClass(config.clusterStates, 'clusterStates'))(config.clusterStates.options, services, config)

if (services.cache.apiVersion !== 2) {
storageCompatability(services.cache)
}
Expand Down Expand Up @@ -125,7 +124,7 @@ function handleLogger (config: DeepstreamConfig, services: DeepstreamServices):
}
let LoggerClass
if (config.logger.type === 'default') {
LoggerClass = DefaultLogger
LoggerClass = StdOutLogger
} else {
LoggerClass = resolvePluginClass(config.logger, 'logger')
}
Expand Down Expand Up @@ -232,7 +231,7 @@ function handleConnectionEndpoints (config: DeepstreamConfig, services: any): Co
*
* CLI arguments will be considered.
*/
function resolvePluginClass (plugin: PluginConfig, type: string): any {
function resolvePluginClass (plugin: PluginConfig, type: any): any {
if (customPlugins.has(plugin.name)) {
return customPlugins.get(plugin.name)
}
Expand Down Expand Up @@ -268,13 +267,13 @@ function resolvePluginClass (plugin: PluginConfig, type: string): any {
*
* CLI arguments will be considered.
*/
function handleAuthStrategy (config: DeepstreamConfig, services: DeepstreamServices): AuthenticationHandler {
function handleAuthStrategy (config: DeepstreamConfig, services: DeepstreamServices): Authentication {
let AuthenticationHandlerClass

const authStrategies = {
none: OpenAuthenticationHandler,
file: FileAuthenticationHandler, // eslint-disable-line
http: HTTPAuthenticationHAndler, // eslint-disable-line
none: OpenAuthentication,
file: FileBasedAuthentication,
http: HttpAuthentication,
}

if (!config.auth) {
Expand Down Expand Up @@ -309,12 +308,12 @@ function handleAuthStrategy (config: DeepstreamConfig, services: DeepstreamServi
*
* CLI arguments will be considered.
*/
function handlePermissionStrategy (config: DeepstreamConfig, services: DeepstreamServices): PermissionHandler {
function handlePermissionStrategy (config: DeepstreamConfig, services: DeepstreamServices): Permission {
let PermissionHandlerClass

const permissionStrategies = {
config: ConfigPermissionHandler,
none: OpenPermissionHandler,
config: ConfigPermission,
none: OpenPermission
}

if (!config.permission) {
Expand Down
2 changes: 1 addition & 1 deletion src/config/config-validator.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as Ajv from 'ajv'
import { LOG_LEVEL } from '../constants'
import { LOG_LEVEL } from '../types'

const generalOpts = {
libDir: { type: ['string', 'null'] },
Expand Down
3 changes: 1 addition & 2 deletions src/config/js-yaml-loader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ describe.skip('js-yaml-loader', () => {

defaultYamlConfig = utils.merge(defaultYamlConfig, {
permission: { type: 'none', options: null },
permissionHandler: null,
authenticationHandler: null,
authentication: null,
plugins: null,
serverName: null,
logger: null
Expand Down
Loading

0 comments on commit 9d919c5

Please sign in to comment.