-
Notifications
You must be signed in to change notification settings - Fork 4.2k
/
wasm.go
202 lines (172 loc) · 7.54 KB
/
wasm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// wasm is a simple example that loads a WebAssembly (wasm) module and executes one of its exported functions.
// greet.wasm, Cargo.toml and greet.rs were copied from the example provided by the wazero library:
// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
//
// New Concepts:
// 1. Load a wasm file compiled from: cargo build --release --target wasm32-unknown-unknown
// 2. Execute a wasm function within a DoFn
package main
import (
"context"
_ "embed"
"flag"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/api"
)
// Names of the functions exported by greet.wasm that this example calls.
const (
// wasmFunctionName is the export that builds the greeting for an input string.
wasmFunctionName = "greeting"
// wasmAllocateFunctionName is the export used to reserve a buffer in wasm linear memory.
wasmAllocateFunctionName = "allocate"
// wasmDeallocateFunctionName is the export used to free a buffer obtained from allocate.
wasmDeallocateFunctionName = "deallocate"
)
// greetWasm holds the compiled Rust module, embedded into the binary at
// build time from greet.wasm.
//
//go:embed greet.wasm
var greetWasm []byte
var (
// output is the destination path passed to textio.Write in run; preRun
// rejects an empty value.
output = flag.String("output", "", "Output file (required).")
)
func init() {
	// Struct DoFns must be registered for a pipeline to run. register.DoFnXxY
	// registers a struct DoFn so that it can be serialized correctly and so
	// that some runtime reflection is avoided. X and Y are the number of
	// ProcessElement inputs and outputs: embeddedWasmFn.ProcessElement takes
	// (context.Context, string) and returns (string, error), so DoFn2x2 is
	// used with those four types, in order, as its type arguments.
	register.DoFn2x2[context.Context, string, string, error](new(embeddedWasmFn))
}
// preRun validates the command-line flags before the pipeline is built.
// It reports an error when the required --output flag is empty.
func preRun() error {
	if len(*output) != 0 {
		return nil
	}
	return fmt.Errorf("--output is required")
}
// main parses flags, initializes Beam, and then runs the flag validation
// followed by the pipeline, stopping with log.Fatal at the first step that
// returns an error.
func main() {
	flag.Parse()
	beam.Init()

	ctx := context.Background()
	steps := []func() error{
		preRun,
		func() error { return run(ctx) },
	}
	for _, step := range steps {
		if err := step(); err != nil {
			log.Fatal(ctx, err)
		}
	}
}
// run builds and executes the pipeline. It creates a fixed set of input
// strings, passes each through the embedded wasm greeting function via
// embeddedWasmFn, and writes the results to the --output path with
// textio.Write. Inputs include multi-byte UTF-8 text to exercise the
// byte-length handling in ProcessElement.
func run(ctx context.Context) error {
	p, s := beam.NewPipelineWithRoot()
	in := beam.Create(s, "Ada", "Lovelace", "World", "Beam", "Senior López", "Random unicorn emoji 🦄")
	out := beam.ParDo(s, &embeddedWasmFn{}, in)
	textio.Write(s, *output, out)
	if err := beamx.Run(ctx, p); err != nil {
		// %w (rather than %v) keeps the runner's error available to
		// errors.Is / errors.As in callers; the message text is unchanged.
		return fmt.Errorf("failed to run pipeline: %w", err)
	}
	return nil
}
// Concept #2: wrap wasm function execution within a DoFn.
// embeddedWasmFn is a DoFn that executes a Rust-compiled wasm function.
// All of its fields are populated by Setup; the runtime is closed by
// Teardown.
type embeddedWasmFn struct {
	r   wazero.Runtime // runtime hosting the wasm modules
	mod api.Module     // instantiated greet.wasm module

	// Functions exported by mod and called from ProcessElement.
	greeting   api.Function
	allocate   api.Function
	deallocate api.Function
}
// Setup creates the wazero runtime, instantiates the host "env" module and
// the embedded greet.wasm module, and resolves the wasm functions that
// ProcessElement calls.
// Concept #1: Load a compiled wasm file []byte content and function.
// This example is derived from
// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
//
// Setup returns an error if either module fails to instantiate or if the
// wasm module does not export one of the required functions. On any error
// the runtime is closed before returning so a failed Setup does not leak it.
// fn.r is left set in that case.
// NOTE(review): a later Teardown would then Close the runtime a second time;
// confirm that is harmless for the wazero version in use.
func (fn *embeddedWasmFn) Setup(ctx context.Context) (err error) {
	// Create a new WebAssembly Runtime.
	// Typically, a defer r.Close() would follow immediately. Here the runtime
	// must stay in memory for the whole DoFn lifecycle, so on success it is
	// closed in Teardown instead.
	fn.r = wazero.NewRuntime(ctx)
	defer func() {
		if err != nil {
			// Best-effort cleanup on the failure path: the setup error is what
			// the caller needs, so any error from Close is deliberately dropped.
			_ = fn.r.Close(ctx)
		}
	}()

	// Instantiate a Go-defined module named "env" that exports a "log"
	// function backed by logString.
	_, err = fn.r.NewHostModuleBuilder("env").
		NewFunctionBuilder().WithFunc(logString).Export("log").
		Instantiate(ctx)
	if err != nil {
		return fmt.Errorf("failed to instantiate host module: %w", err)
	}

	// Instantiate the WebAssembly module, which imports "log" from "env" and
	// exports "memory" plus the functions used in this example.
	fn.mod, err = fn.r.Instantiate(ctx, greetWasm)
	if err != nil {
		return fmt.Errorf("failed to instantiate wasm module: %w", err)
	}

	// Resolve the exported functions. ExportedFunction returns nil for a
	// missing export, so fail here with a clear message instead of with a
	// nil dereference later in ProcessElement.
	exports := []struct {
		name string
		dst  *api.Function
	}{
		{wasmFunctionName, &fn.greeting},
		{wasmAllocateFunctionName, &fn.allocate},
		{wasmDeallocateFunctionName, &fn.deallocate},
	}
	for _, export := range exports {
		f := fn.mod.ExportedFunction(export.name)
		if f == nil {
			return fmt.Errorf("wasm module does not export function %q", export.name)
		}
		*export.dst = f
	}
	return nil
}
// ProcessElement passes s to the wasm "greeting" function, which is written
// in Rust, and returns the string it produces.
//
// The input is copied into wasm linear memory through a buffer obtained from
// the module's allocate export. The result is read back from the
// pointer/length pair that greeting returns. Both buffers are released with
// deallocate when the method returns.
//
// This example is derived from
// https://github.com/tetratelabs/wazero/blob/v1.0.0-pre.3/examples/allocation/rust/greet.go
func (fn *embeddedWasmFn) ProcessElement(ctx context.Context, s string) (string, error) {
	// Rust's allocator needs the size of s in bytes (not runes); multi-byte
	// UTF-8 inputs need more bytes than characters.
	size := uint64(len(s))

	// Instead of an arbitrary memory offset, use Rust's allocator. Notice
	// there is nothing string-specific in this allocation function. The same
	// function could be used to pass binary serialized data to Wasm.
	results, err := fn.allocate.Call(ctx, size)
	if err != nil {
		return "", fmt.Errorf("error calling allocate: %w", err)
	}
	ptr := results[0]
	// This buffer was allocated by Rust but is owned by Go, so Go must free
	// it. The deferred call runs when ProcessElement returns. Deallocation is
	// best-effort, and its error is ignored.
	defer fn.deallocate.Call(ctx, ptr, size)

	// ptr is an offset into the module's linear memory; copy s there.
	if !fn.mod.Memory().Write(uint32(ptr), []byte(s)) {
		return "", fmt.Errorf("Memory.Write(%d, %d) out of range of memory size %d",
			ptr, size, fn.mod.Memory().Size())
	}

	// greeting returns the greeting message "Hello" concatenated with s, as a
	// pointer/length pair packed into one uint64: pointer in the high 32 bits,
	// length in the low 32 bits.
	ptrSize, err := fn.greeting.Call(ctx, ptr, size)
	if err != nil {
		// Without this check, a failed call could leave ptrSize empty and
		// ptrSize[0] below would panic.
		return "", fmt.Errorf("error calling greeting: %w", err)
	}
	resultPtr := uint32(ptrSize[0] >> 32)
	resultSize := uint32(ptrSize[0])
	// The result buffer is also allocated by Rust and owned by Go; free it
	// when the method returns (best-effort, error ignored).
	defer fn.deallocate.Call(ctx, uint64(resultPtr), uint64(resultSize))

	// Read the concatenated string back from linear memory.
	buf, ok := fn.mod.Memory().Read(resultPtr, resultSize)
	if !ok {
		return "", fmt.Errorf("Memory.Read(%d, %d) out of range of memory size %d",
			resultPtr, resultSize, fn.mod.Memory().Size())
	}
	// Memory.Read returns a view of wasm memory rather than a copy (per the
	// wazero api.Memory docs). string(buf) copies it, and return values are
	// evaluated before deferred calls run, so the emitted string remains
	// valid after the buffers are deallocated.
	return string(buf), nil
}
// Teardown closes the wazero.Runtime created in Setup at the end of the DoFn
// lifecycle.
//
// Typically Close would be deferred right after the runtime is created.
// However, the runtime must stay in memory until the end of the DoFn
// lifecycle, so it is closed here instead.
func (fn *embeddedWasmFn) Teardown(ctx context.Context) error {
	// If Setup never ran on this instance there is no runtime to close;
	// calling Close on the nil interface would panic.
	if fn.r == nil {
		return nil
	}
	if err := fn.r.Close(ctx); err != nil {
		return fmt.Errorf("failed to close runtime: %w", err)
	}
	return nil
}
// logString is the host function exported to the wasm module as env.log.
// It reads byteCount bytes at offset from the calling module's memory and
// writes them to the Beam log at info level. An out-of-range read is
// reported with log.Fatalf.
func logString(ctx context.Context, m api.Module, offset, byteCount uint32) {
	mem := m.Memory()
	msg, inRange := mem.Read(offset, byteCount)
	if !inRange {
		log.Fatalf(ctx, "Memory.Read(%d, %d) out of range", offset, byteCount)
	}
	log.Info(ctx, string(msg))
}