Skip to content

Commit

Permalink
rename WasmFilter to WasmHost
Browse files Browse the repository at this point in the history
  • Loading branch information
localvar committed Jul 15, 2021
1 parent 0463813 commit c96d0ff
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 83 deletions.
4 changes: 2 additions & 2 deletions Makefile
Expand Up @@ -28,8 +28,8 @@ ENABLE_CGO= CGO_ENABLED=0
# Check Go build tags, the tags are from command line of make
ifdef GOTAGS
GO_BUILD_TAGS= -tags ${GOTAGS}
# Must enable Cgo when wasmfilter is included
ifeq ($(findstring wasmfilter,${GOTAGS}), wasmfilter)
# Must enable Cgo when wasmhost is included
ifeq ($(findstring wasmhost,${GOTAGS}), wasmhost)
ENABLE_CGO= CGO_ENABLED=1
endif
endif
Expand Down
14 changes: 7 additions & 7 deletions doc/filters.md
Expand Up @@ -43,7 +43,7 @@
- [Validator](#validator)
- [Configuration](#configuration-13)
- [Results](#results-13)
- [WasmFilter](#wasmfilter)
- [WasmHost](#wasmhost)
- [Configuration](#configuration-14)
- [Results](#results-14)
- [Common Types](#common-types)
Expand Down Expand Up @@ -601,13 +601,13 @@ oauth2:
| ------- | ----------------------------------- |
| invalid | The request doesn't pass validation |

## WasmFilter
## WasmHost

The WasmFilter executes user-developed [WebAssembly](https://webassembly.org/) code. Below is an example configuration that loads wasm code from a file.
The WasmHost filter implements a host environment for user-developed [WebAssembly](https://webassembly.org/) code. Below is an example configuration that loads wasm code from a file.

```yaml
name: wasm-filter-example
kind: WasmFilter
name: wasm-host-example
kind: WasmHost
maxConcurrency: 2
code: /home/megaease/wasm/hello.wasm
timeout: 200ms
Expand All @@ -616,13 +616,13 @@ timeout: 200ms
Note: this filter is disabled in the default build of `Easegress`, it can be enabled by:

```bash
$ make GOTAGS=wasmfilter
$ make GOTAGS=wasmhost
```

or

```bash
$ go build -tags=wasmfilter
$ go build -tags=wasmhost
```

### Configuration
Expand Down
2 changes: 1 addition & 1 deletion doc/reference.md
Expand Up @@ -13,4 +13,4 @@
* [Retryer](./filters.md#Retryer)
* [ResponseAdaptor](./filters.md#ResponseAdaptor)
* [Validator](./filters.md#Validator)
* [WasmFilter](./filters.md#WasmFilter)
* [WasmHost](./filters.md#WasmHost)
2 changes: 1 addition & 1 deletion pkg/api/wasm.go
@@ -1,4 +1,4 @@
// +build wasmfilter
// +build wasmhost

/*
* Copyright (c) 2017, MegaEase
Expand Down
2 changes: 1 addition & 1 deletion pkg/filter/wasm/doc.go → pkg/filter/wasmhost/doc.go
Expand Up @@ -15,4 +15,4 @@
* limitations under the License.
*/

package wasm
package wasmhost
@@ -1,4 +1,4 @@
// +build wasmfilter
// +build wasmhost

/*
* Copyright (c) 2017, MegaEase
Expand All @@ -17,7 +17,7 @@
* limitations under the License.
*/

package wasm
package wasmhost

import (
"bufio"
Expand Down
4 changes: 2 additions & 2 deletions pkg/filter/wasm/vm.go → pkg/filter/wasmhost/vm.go
@@ -1,4 +1,4 @@
// +build wasmfilter
// +build wasmhost

/*
* Copyright (c) 2017, MegaEase
Expand All @@ -17,7 +17,7 @@
* limitations under the License.
*/

package wasm
package wasmhost

import (
"fmt"
Expand Down
132 changes: 66 additions & 66 deletions pkg/filter/wasm/wasm.go → pkg/filter/wasmhost/wasmhost.go
@@ -1,4 +1,4 @@
// +build wasmfilter
/// +build wasmhost

/*
* Copyright (c) 2017, MegaEase
Expand All @@ -17,7 +17,7 @@
* limitations under the License.
*/

package wasm
package wasmhost

import (
"bytes"
Expand All @@ -38,8 +38,8 @@ import (
)

const (
// Kind is the kind of WasmFilter.
Kind = "WasmFilter"
// Kind is the kind of WasmHost.
Kind = "WasmHost"
maxWasmResult = 9
)

Expand All @@ -60,7 +60,7 @@ func init() {
for i := int32(1); i <= maxWasmResult; i++ {
results = append(results, wasmResultToFilterResult(i))
}
httppipeline.Register(&WasmFilter{})
httppipeline.Register(&WasmHost{})
}

type (
Expand All @@ -71,45 +71,45 @@ type (
timeout time.Duration
}

WasmFilter struct {
WasmHost struct {
pipeSpec *httppipeline.FilterSpec
spec *Spec

code []byte
vmPool atomic.Value
chStop chan struct{}

totalRequest int64
wasmError int64
numOfRequest int64
numOfWasmError int64
}

Status struct {
Health string `yaml:"health"`
TotalRequest int64 `yaml:"totalRequest"`
WasmError int64 `yaml:"wasmError"`
Health string `yaml:"health"`
NumOfRequest int64 `yaml:"numOfRequest"`
NumOfWasmError int64 `yaml:"numOfWasmError"`
}
)

// Kind returns the kind of WasmFilter.
func (f *WasmFilter) Kind() string {
// Kind returns the kind of WasmHost.
func (wh *WasmHost) Kind() string {
return Kind
}

// DefaultSpec returns the default spec of WasmFilter.
func (f *WasmFilter) DefaultSpec() interface{} {
// DefaultSpec returns the default spec of WasmHost.
func (wh *WasmHost) DefaultSpec() interface{} {
return &Spec{
MaxConcurrency: 10,
Timeout: "100ms",
}
}

// Description returns the description of WasmFilter
func (f *WasmFilter) Description() string {
return "WasmFilter implements a filter which runs web assembly"
// Description returns the description of WasmHost
func (wh *WasmHost) Description() string {
return "WasmHost implements a host environment for WebAssembly"
}

// Results returns the results of WasmFilter.
func (f *WasmFilter) Results() []string {
// Results returns the results of WasmHost.
func (wh *WasmHost) Results() []string {
return results
}

Expand All @@ -135,47 +135,47 @@ func isURL(str string) bool {
return false
}

func (f *WasmFilter) readWasmCode() ([]byte, error) {
if isURL(f.spec.Code) {
return readWasmCodeFromURL(f.spec.Code)
func (wh *WasmHost) readWasmCode() ([]byte, error) {
if isURL(wh.spec.Code) {
return readWasmCodeFromURL(wh.spec.Code)
}
if _, e := os.Stat(f.spec.Code); e == nil {
return os.ReadFile(f.spec.Code)
if _, e := os.Stat(wh.spec.Code); e == nil {
return os.ReadFile(wh.spec.Code)
}
return base64.StdEncoding.DecodeString(f.spec.Code)
return base64.StdEncoding.DecodeString(wh.spec.Code)
}

func (f *WasmFilter) loadWasmCode() error {
code, e := f.readWasmCode()
func (wh *WasmHost) loadWasmCode() error {
code, e := wh.readWasmCode()
if e != nil {
logger.Errorf("failed to load wasm code: %v", e)
return e
}

if len(f.code) > 0 && bytes.Equal(f.code, code) {
if len(wh.code) > 0 && bytes.Equal(wh.code, code) {
return nil
}

p, e := NewWasmVMPool(f.spec.MaxConcurrency, code)
p, e := NewWasmVMPool(wh.spec.MaxConcurrency, code)
if e != nil {
logger.Errorf("failed to create wasm VM pool: %v", e)
return e
}
f.code = code
wh.code = code

f.vmPool.Store(p)
wh.vmPool.Store(p)
return nil
}

func (f *WasmFilter) watchWasmCode() {
func (wh *WasmHost) watchWasmCode() {
var (
chWasm <-chan *string
syncer *cluster.Syncer
err error
)

for {
c := f.pipeSpec.Super().Cluster()
c := wh.pipeSpec.Super().Cluster()
syncer, err = c.Syncer(time.Minute)
if err == nil {
chWasm, err = syncer.Sync(c.Layout().WasmCodeEvent())
Expand All @@ -186,60 +186,60 @@ func (f *WasmFilter) watchWasmCode() {
logger.Errorf("failed to watch wasm code event: %v", err)
select {
case <-time.After(10 * time.Second):
case <-f.chStop:
case <-wh.chStop:
return
}
}

for {
select {
case <-chWasm:
err = f.loadWasmCode()
err = wh.loadWasmCode()

case <-time.After(30 * time.Second):
if err != nil || len(f.code) == 0 {
err = f.loadWasmCode()
if err != nil || len(wh.code) == 0 {
err = wh.loadWasmCode()
}

case <-f.chStop:
case <-wh.chStop:
return
}
}
}

func (f *WasmFilter) reload(pipeSpec *httppipeline.FilterSpec) {
f.pipeSpec = pipeSpec
f.spec = pipeSpec.FilterSpec().(*Spec)
func (wh *WasmHost) reload(pipeSpec *httppipeline.FilterSpec) {
wh.pipeSpec = pipeSpec
wh.spec = pipeSpec.FilterSpec().(*Spec)

f.spec.timeout, _ = time.ParseDuration(f.spec.Timeout)
f.chStop = make(chan struct{})
wh.spec.timeout, _ = time.ParseDuration(wh.spec.Timeout)
wh.chStop = make(chan struct{})

f.loadWasmCode()
go f.watchWasmCode()
wh.loadWasmCode()
go wh.watchWasmCode()
}

// Init initializes WasmFilter.
func (f *WasmFilter) Init(pipeSpec *httppipeline.FilterSpec) {
f.reload(pipeSpec)
// Init initializes WasmHost.
func (wh *WasmHost) Init(pipeSpec *httppipeline.FilterSpec) {
wh.reload(pipeSpec)
}

// Inherit inherits previous generation of WasmFilter.
func (f *WasmFilter) Inherit(pipeSpec *httppipeline.FilterSpec, previousGeneration httppipeline.Filter) {
// Inherit inherits previous generation of WasmHost.
func (wh *WasmHost) Inherit(pipeSpec *httppipeline.FilterSpec, previousGeneration httppipeline.Filter) {
previousGeneration.Close()
f.reload(pipeSpec)
wh.reload(pipeSpec)
}

// Handle handles HTTP request
func (f *WasmFilter) Handle(ctx context.HTTPContext) string {
result := f.handle(ctx)
func (wh *WasmHost) Handle(ctx context.HTTPContext) string {
result := wh.handle(ctx)
return ctx.CallNextHandler(result)
}

func (f *WasmFilter) handle(ctx context.HTTPContext) (result string) {
func (wh *WasmHost) handle(ctx context.HTTPContext) (result string) {
// we must save the pool to a local variable for later use as it will be
// replaced when updating the wasm code
var pool *WasmVMPool
if p := f.vmPool.Load(); p == nil {
if p := wh.vmPool.Load(); p == nil {
ctx.AddTag("wasm VM pool is not initialized")
return resultOutOfVM
} else {
Expand All @@ -253,7 +253,7 @@ func (f *WasmFilter) handle(ctx context.HTTPContext) (result string) {
return resultOutOfVM
}
vm.ctx = ctx
atomic.AddInt64(&f.totalRequest, 1)
atomic.AddInt64(&wh.numOfRequest, 1)

var wg sync.WaitGroup
chCancelInterrupt := make(chan struct{})
Expand All @@ -266,7 +266,7 @@ func (f *WasmFilter) handle(ctx context.HTTPContext) (result string) {
if e := recover(); e != nil {
logger.Errorf("recovered from wasm error: %v", e)
result = resultWasmError
atomic.AddInt64(&f.wasmError, 1)
atomic.AddInt64(&wh.numOfWasmError, 1)
vm = nil
}

Expand All @@ -278,7 +278,7 @@ func (f *WasmFilter) handle(ctx context.HTTPContext) (result string) {
go func() {
defer wg.Done()

timer := time.NewTimer(f.spec.timeout)
timer := time.NewTimer(wh.spec.timeout)

select {
case <-chCancelInterrupt:
Expand Down Expand Up @@ -308,21 +308,21 @@ func (f *WasmFilter) handle(ctx context.HTTPContext) (result string) {
}

// Status returns Status genreated by the filter.
func (f *WasmFilter) Status() interface{} {
p := f.vmPool.Load()
func (wh *WasmHost) Status() interface{} {
p := wh.vmPool.Load()
s := &Status{}
if p == nil {
s.Health = "VM pool is not initialized"
} else {
s.Health = "ready"
}

s.TotalRequest = atomic.LoadInt64(&f.totalRequest)
s.WasmError = atomic.LoadInt64(&f.wasmError)
s.NumOfRequest = atomic.LoadInt64(&wh.numOfRequest)
s.NumOfWasmError = atomic.LoadInt64(&wh.numOfWasmError)
return s
}

// Close closes WasmFilter.
func (f *WasmFilter) Close() {
close(f.chStop)
// Close closes WasmHost.
func (wh *WasmHost) Close() {
close(wh.chStop)
}

0 comments on commit c96d0ff

Please sign in to comment.