Skip to content

Commit

Permalink
- feat(faas-adapter+fc-adapter) supports mns topic message queue trig…
Browse files Browse the repository at this point in the history
…ger, which is enabled by the mode attribute `mode: [mns-topic] `. The sample configuration is as follows:

```yaml
mode: [ mns-topic ]
malagu:
  faas-adapter:
    trigger:
      invocationRole: acs:ram::123456:role/app-mns-role
      sourceARN: acs:mns:cn-hangzhou:123456:/topics/test
      triggerConfig:
        topicName: test
        # filterTag: foo
```
- fix(web) replace `alscontext` with `cls-hooked` package to fix `undefined` error.
- feat(faas-adapter) adds the `FaaSEventListener` interface, which can be used to monitor the event when the function is triggered to execute.
  • Loading branch information
muxiangqiu committed Jul 7, 2021
1 parent 5d1c06b commit c48d2a7
Show file tree
Hide file tree
Showing 24 changed files with 221 additions and 18 deletions.
32 changes: 32 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,38 @@

## v1.24.1

- feat(faas-adapter+fc-adapter) supports mns topic message queue trigger, which is enabled by the mode attribute `mode: [mns-topic] `. The sample configuration is as follows:
```yaml
mode: [ mns-topic ]
malagu:
faas-adapter:
trigger:
invocationRole: acs:ram::123456:role/app-mns-role
sourceARN: acs:mns:cn-hangzhou:123456:/topics/test
triggerConfig:
topicName: test
# filterTag: foo
```
- fix(web) replace `alscontext` with `cls-hooked` package to fix `undefined` error.
- feat(faas-adapter) adds the `FaaSEventListener` interface, which can be used to monitor the event when the function is triggered to execute.

- feat(faas-adapter+fc-adapter) 支持 mns topic 消息队列触发器,通过模式属性 `mode: [ mns-topic ] ` 开启。示例配置如下:
```yaml
mode: [ mns-topic ]
malagu:
faas-adapter:
trigger:
invocationRole: acs:ram::123456:role/app-mns-role
sourceARN: acs:mns:cn-hangzhou:123456:/topics/test
triggerConfig:
topicName: test
# filterTag: foo
```
- fix(web) 使用 `cls-hooked` 包替换 `alscontext`,修复 `undefined` 错误。
- feat(faas-adapter) 添加 `FaaSEventListener` 接口,可用于监听函数被触发执行的事件。

## v1.24.1

- fix(cli) 修复 monorepo 应用模板
- refactor(serve-static) 重构 serve-static 组件(#24)

Expand Down
10 changes: 10 additions & 0 deletions packages/faas-adapter/src/node/event/event-listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Component } from '@malagu/core';
import { FaaSEventListener } from './event-protocol';

@Component(FaaSEventListener)
export class NoOpEventListener implements FaaSEventListener<{}> {
async onTrigger(event: {}): Promise<void> {
// NoOp
}

}
5 changes: 5 additions & 0 deletions packages/faas-adapter/src/node/event/event-protocol.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
export const FaaSEventListener = Symbol('FaaSEventListener');

export interface FaaSEventListener<T> {
onTrigger(event: T): Promise<void>
}
2 changes: 2 additions & 0 deletions packages/faas-adapter/src/node/event/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './event-listener';
export * from './event-protocol';
1 change: 1 addition & 0 deletions packages/faas-adapter/src/node/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
export * from './credentials-provider';
export * from './regin-provider';
export * from './timer';
export * from './event';
export * from './factory';
2 changes: 1 addition & 1 deletion packages/faas-adapter/src/node/timer/clock.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Component } from '@malagu/core';
import { Emitter, Event } from './events';
import { Clock } from './timer-protocal';
import { Clock } from './timer-protocol';

@Component(Clock)
export class ClockImpl implements Clock {
Expand Down
2 changes: 1 addition & 1 deletion packages/faas-adapter/src/node/timer/cron-job.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Disposable } from '@malagu/core';
import { CronJob, CronTime } from 'cron';
import { CronJobOptions } from '@malagu/schedule';
import { Clock } from './timer-protocal';
import { Clock } from './timer-protocol';

export class FaaSCronJob extends CronJob {

Expand Down
2 changes: 1 addition & 1 deletion packages/faas-adapter/src/node/timer/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from './clock';
export * from './cron-job';
export * from './timer-protocal';
export * from './timer-protocol';
20 changes: 20 additions & 0 deletions packages/fc-adapter/malagu-mns-topic.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
backend:
entry: lib/node/event-application-entry
malagu:
faas-adapter:
function:
runtime: nodejs12
trigger:
name: ${malagu['faas-adapter'].function.name}-${malagu['faas-adapter'].alias.name}-mns_topic
qualifier: ${malagu['faas-adapter'].alias.name}
functionName: ${malagu['faas-adapter'].function.name}
serviceName: ${malagu['faas-adapter'].service.name}
triggerType: mns_topic
# invocationRole: acs:ram::123456:role/app-mns-role
# sourceARN: acs:mns:cn-hangzhou:123456:/topics/test
triggerConfig:
# topicName: test-topic
notifyContentFormat: JSON
notifyStrategy: BACKOFF_RETRY
# filterTag: foo

2 changes: 1 addition & 1 deletion packages/fc-adapter/malagu-remote.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
mode: "${'api-gateway' in currentMode ? 'api-gateway' : 'sample-http' in currentMode ? 'sample-http' : 'timer' in currentMode ? 'timer' : 'http'}"
mode: "${'api-gateway' in currentMode ? 'api-gateway' : 'sample-http' in currentMode ? 'sample-http' : 'timer' in currentMode ? 'timer' : 'mns-topic' in currentMode ? 'mns-topic' : 'http'}"
malagu:
faas-adapter:
secure: true
Expand Down
2 changes: 1 addition & 1 deletion packages/fc-adapter/src/hooks/build.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { FaaSAdapterUtils } from '@malagu/faas-adapter/lib/hooks';
export default async (context: BuildContext) => {
const { pkg, cfg } = context;
const adapterConfig = FaaSAdapterUtils.getConfiguration<any>(cfg);
if (adapterConfig.type === 'custom') {
if (adapterConfig.function?.runtime === 'custom') {
const destDir = join(getHomePath(pkg), 'bootstrap');
const bootstrap = adapterConfig.function.bootstrap;
delete adapterConfig.function.bootstrap;
Expand Down
11 changes: 10 additions & 1 deletion packages/fc-adapter/src/hooks/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,16 @@ async function createOrUpdateTrigger(trigger: any) {
try {
await fcClient.getTrigger(serviceName, functionName, name);
await spinner(`Update ${name} trigger`, async () => {
await fcClient.updateTrigger(serviceName, functionName, name, opts);
try {
await fcClient.updateTrigger(serviceName, functionName, name, opts);
} catch (error) {
if (error.message?.includes('Updating trigger is not supported yet')) {
await fcClient.deleteTrigger(serviceName, functionName, name);
await fcClient.createTrigger(serviceName, functionName, opts);
return;
}
throw error;
}
});

} catch (ex) {
Expand Down
5 changes: 5 additions & 0 deletions packages/fc-adapter/src/node/api-gateway-application-entry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ import { Application } from '@malagu/core/lib/common/application/application-pro
import { ContainerProvider } from '@malagu/core/lib/common/container/container-provider';
import { Dispatcher } from '@malagu/web/lib/node/dispatcher/dispatcher-protocol';
import { Context, HttpContext } from '@malagu/web/lib/node/context';
import { FaaSEventListener } from '@malagu/faas-adapter/lib/node/event/event-protocol';
import * as express from 'express';
import * as proxy from '@webserverless/fc-express';

let listeners: FaaSEventListener<any>[];

const app = express();
app.use(express.json());
app.use(express.raw());
Expand All @@ -22,6 +25,7 @@ async function start() {
const httpContext = new HttpContext(req, res);
Context.run(() => dispatcher.dispatch(httpContext));
});
listeners = c.getAll<FaaSEventListener<any>>(FaaSEventListener);

return c.get<Application>(Application).start();
}
Expand All @@ -39,6 +43,7 @@ export async function handler(event: string, context: any, callback: any) {
process.env.IGNORE_CONTEXT_HEADER = 'true';
try {
await startPromise;
await Promise.all(listeners.map(l => l.onTrigger(event)));
} catch (error) {
callback(undefined, error);
}
Expand Down
32 changes: 32 additions & 0 deletions packages/fc-adapter/src/node/event-application-entry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { container } from '@malagu/core/lib/common/container/dynamic-container';
import { Application } from '@malagu/core/lib/common/application/application-protocol';
import { ContainerProvider } from '@malagu/core/lib/common/container/container-provider';
import { FaaSEventListener } from '@malagu/faas-adapter/lib/node/event/event-protocol';

let listeners: FaaSEventListener<any>[];

async function start() {
const c = await container;
ContainerProvider.set(c);
await c.get<Application>(Application).start();
listeners = c.getAll<FaaSEventListener<any>>(FaaSEventListener);
}

const startPromise = start();

export async function handler(event: string, context: any, callback: any) {
process.env.ALIBABA_ACCOUNT_ID = context.accountId;
process.env.ALIBABA_ACCESS_KEY_ID = context.credentials?.accessKeyId;
process.env.ALIBABA_ACCESS_KEY_SECRET = context.credentials?.accessKeySecret;
process.env.ALIBABA_SECURITY_TOKEN = context.credentials?.securityToken;
process.env.ALIBABA_REQUEST_ID = context.requestId;
process.env.ALIBABA_REGION = context.region;
try {
await startPromise;
await Promise.all(listeners.map(l => l.onTrigger(event)));
callback();
} catch (error) {
callback(undefined, error);
}

}
6 changes: 5 additions & 1 deletion packages/fc-adapter/src/node/timer-application-entry.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import { container } from '@malagu/core/lib/common/container/dynamic-container';
import { Application } from '@malagu/core/lib/common/application/application-protocol';
import { ContainerProvider } from '@malagu/core/lib/common/container/container-provider';
import { Clock } from '@malagu/faas-adapter/lib/node/timer/timer-protocal';
import { Clock } from '@malagu/faas-adapter/lib/node/timer/timer-protocol';
import { FaaSEventListener } from '@malagu/faas-adapter/lib/node/event/event-protocol';

let clock: Clock;
let listeners: FaaSEventListener<any>[];

async function start() {
const c = await container;
ContainerProvider.set(c);
await c.get<Application>(Application).start();
clock = c.get<Clock>(Clock);
listeners = c.getAll<FaaSEventListener<any>>(FaaSEventListener);
}

const startPromise = start();
Expand All @@ -23,6 +26,7 @@ export async function handler(event: string, context: any, callback: any) {
process.env.ALIBABA_REGION = context.region;
try {
await startPromise;
await Promise.all(listeners.map(l => l.onTrigger(event)));
await clock.tick();
callback();
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Application } from '@malagu/core/lib/common/application/application-pro
import { ContainerProvider } from '@malagu/core/lib/common/container/container-provider';
import { Dispatcher } from '@malagu/web/lib/node/dispatcher/dispatcher-protocol';
import { Context, HttpContext } from '@malagu/web/lib/node/context';
import { FaaSEventListener } from '@malagu/faas-adapter/lib/node/event/event-protocol';
import * as express from 'express';
const { createServer, proxy } = require('@vendia/serverless-express');

Expand All @@ -14,6 +15,8 @@ app.use(express.urlencoded({ extended: true }));

const server = createServer(app);

let listeners: FaaSEventListener<any>[];

async function start() {
const c = await container;
ContainerProvider.set(c);
Expand All @@ -22,6 +25,7 @@ async function start() {
const httpContext = new HttpContext(req, res);
Context.run(() => dispatcher.dispatch(httpContext));
});
listeners = c.getAll<FaaSEventListener<any>>(FaaSEventListener);

return c.get<Application>(Application).start();
}
Expand All @@ -30,5 +34,6 @@ const startPromise = start();

export async function handler(event: string, context: any) {
await startPromise;
await Promise.all(listeners.map(l => l.onTrigger(event)));
return proxy(server, event, context, 'PROMISE').promise;
}
20 changes: 20 additions & 0 deletions packages/lambda-adapter/src/node/event-application-entry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { container } from '@malagu/core/lib/common/container/dynamic-container';
import { Application } from '@malagu/core/lib/common/application/application-protocol';
import { ContainerProvider } from '@malagu/core/lib/common/container/container-provider';
import { FaaSEventListener } from '@malagu/faas-adapter/lib/node/event/event-protocol';

let listeners: FaaSEventListener<any>[];

async function start() {
const c = await container;
ContainerProvider.set(c);
await c.get<Application>(Application).start();
listeners = c.getAll<FaaSEventListener<any>>(FaaSEventListener);
}

const startPromise = start();

export async function handler(event: string, context: any, callback: any) {
await startPromise;
await Promise.all(listeners.map(l => l.onTrigger(event)));
}
6 changes: 5 additions & 1 deletion packages/lambda-adapter/src/node/timer-application-entry.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import { container } from '@malagu/core/lib/common/container/dynamic-container';
import { Application } from '@malagu/core/lib/common/application/application-protocol';
import { ContainerProvider } from '@malagu/core/lib/common/container/container-provider';
import { Clock } from '@malagu/faas-adapter/lib/node/timer/timer-protocal';
import { Clock } from '@malagu/faas-adapter/lib/node/timer/timer-protocol';
import { FaaSEventListener } from '@malagu/faas-adapter/lib/node/event/event-protocol';

let clock: Clock;
let listeners: FaaSEventListener<any>[];

async function start() {
const c = await container;
ContainerProvider.set(c);
await c.get<Application>(Application).start();
clock = c.get<Clock>(Clock);
listeners = c.getAll<FaaSEventListener<any>>(FaaSEventListener);
}

const startPromise = start();

export async function handler(event: string, context: any) {
await startPromise;
await Promise.all(listeners.map(l => l.onTrigger(event)));
return clock.tick();
}

22 changes: 22 additions & 0 deletions packages/scf-adapter/src/node/event-application-entry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { container } from '@malagu/core/lib/common/container/dynamic-container';
import { Application } from '@malagu/core/lib/common/application/application-protocol';
import { ContainerProvider } from '@malagu/core/lib/common/container/container-provider';
import { FaaSEventListener } from '@malagu/faas-adapter/lib/node/event/event-protocol';

let listeners: FaaSEventListener<any>[];

async function start() {
const c = await container;
ContainerProvider.set(c);
await c.get<Application>(Application).start();
listeners = c.getAll<FaaSEventListener<any>>(FaaSEventListener);
}

const startPromise = start();

export async function handler(event: string, context: any, callback: any) {
await startPromise;
await Promise.all(listeners.map(l => l.onTrigger(event)));
context.callbackWaitsForEmptyEventLoop = false;

}
6 changes: 5 additions & 1 deletion packages/scf-adapter/src/node/timer-application-entry.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import { container } from '@malagu/core/lib/common/container/dynamic-container';
import { Application } from '@malagu/core/lib/common/application/application-protocol';
import { ContainerProvider } from '@malagu/core/lib/common/container/container-provider';
import { Clock } from '@malagu/faas-adapter/lib/node/timer/timer-protocal';
import { Clock } from '@malagu/faas-adapter/lib/node/timer/timer-protocol';
import { FaaSEventListener } from '@malagu/faas-adapter/lib/node/event/event-protocol';

let clock: Clock;
let listeners: FaaSEventListener<any>[];

async function start() {
const c = await container;
ContainerProvider.set(c);
await c.get<Application>(Application).start();
clock = c.get<Clock>(Clock);
listeners = c.getAll<FaaSEventListener<any>>(FaaSEventListener);
}

const startPromise = start();

export async function handler(event: string, context: any) {
await startPromise;
await Promise.all(listeners.map(l => l.onTrigger(event)));
await clock.tick();
context.callbackWaitsForEmptyEventLoop = false;
}
Expand Down
2 changes: 1 addition & 1 deletion packages/web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
"dependencies": {
"@malagu/core": "^1.24.1",
"@types/express": "^4.17.9",
"alscontext": "^0.0.3",
"axios": "^0.21.1",
"cls-hooked": "^4.2.2",
"cookies": "^0.8.0",
"cors": "^2.8.5",
"crc": "^3.8.0",
Expand Down
6 changes: 3 additions & 3 deletions packages/web/src/node/context.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import ALS from 'alscontext';
const createNamespace = require('cls-hooked').createNamespace;
import { Session } from './session/session-protocol';
import { Cookies } from './cookies';
import { Request, Response } from './http/http-protocol';
Expand All @@ -15,7 +15,7 @@ export const CURRENT_TENANT_REQUEST_KEY = 'CurrentTenantRequest';

const appAttrs = new Map<string, any>();

const store = new ALS();
const store = createNamespace('3f45efdf-383c-4152-877b-1e98a410e0da');

export interface Context {

Expand All @@ -30,7 +30,7 @@ export interface Context {
export namespace Context {

export function run(fn: (...args: any[]) => void) {
store.run({}, fn);
store.run(() => fn());
}

export function setCurrent(context: Context) {
Expand Down

0 comments on commit c48d2a7

Please sign in to comment.