Skip to content

Commit

Permalink
Merge pull request #7 from golioth/feat/observe-events
Browse files Browse the repository at this point in the history
feat: support handling observe messages
  • Loading branch information
hasheddan committed Jul 20, 2023
2 parents 22b2a45 + 7408faa commit 498b585
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 41 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ endif
# Targets

build: setup
@xk6 build --with github.com/golioth/xk6-coap=.
@xk6 build --with github.com/golioth/xk6-coap=. --with github.com/grafana/xk6-timers
86 changes: 54 additions & 32 deletions coap/coap.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/dop251/goja"
"github.com/mstoykov/k6-taskqueue-lib/taskqueue"
piondtls "github.com/pion/dtls/v2"
"github.com/plgd-dev/go-coap/v3/dtls"
"github.com/plgd-dev/go-coap/v3/message"
Expand All @@ -22,8 +23,8 @@ const (
defaultPSKIDEnv = "COAP_PSK_ID"
)

// Response is a CoAP response message.
type Response struct {
// Message is a CoAP message.
type Message struct {
Code string
Body []byte
}
Expand Down Expand Up @@ -89,6 +90,7 @@ func (c *CoAP) client(cc goja.ConstructorCall) *goja.Object {

client := &client{
vu: c.vu,
tq: taskqueue.New(c.vu.RegisterCallback),
conn: conn,
obj: rt.NewObject(),
}
Expand Down Expand Up @@ -117,119 +119,139 @@ func (c *CoAP) client(cc goja.ConstructorCall) *goja.Object {
// client is a CoAP client with a DTLS connection.
type client struct {
vu modules.VU
tq *taskqueue.TaskQueue
conn *udp.Conn
obj *goja.Object
}

// Get sends a GET message to the specified path.
func (c *client) Get(path string, timeout int) Response {
func (c *client) Get(path string, timeout int) Message {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout))
defer cancel()
resp, err := c.conn.Get(ctx, path)
msg, err := c.conn.Get(ctx, path)
if err != nil {
common.Throw(c.vu.Runtime(), err)
}
var b []byte
if body := resp.Body(); body != nil {
if body := msg.Body(); body != nil {
if b, err = io.ReadAll(body); err != nil {
common.Throw(c.vu.Runtime(), err)
}
}
return Response{
Code: resp.Code().String(),
return Message{
Code: msg.Code().String(),
Body: b,
}
}

// Observe sends an OBSERVE message to the specified path. It waits for messages
// until the specified timeout.
func (c *client) Observe(path string, timeout int) {
func (c *client) Observe(path string, timeout int, listener func(Message)) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout))
defer cancel()
obs, err := c.conn.Observe(ctx, path, func(req *pool.Message) {
// TODO(hasheddan): emit metrics on observed messages.

obs, err := c.conn.Observe(ctx, path, func(msg *pool.Message) {
var b []byte
var err error
if body := msg.Body(); body != nil {
if b, err = io.ReadAll(body); err != nil {
common.Throw(c.vu.Runtime(), err)
return
}
}
c.tq.Queue(func() error {
listener(Message{
Code: msg.Code().String(),
Body: b,
})
return nil
})
})
if err != nil {
defer cancel()
common.Throw(c.vu.Runtime(), err)
return
}
<-ctx.Done()
ctx, cancel = context.WithTimeout(context.Background(), time.Second)
defer cancel()
if err := obs.Cancel(ctx); err != nil {
common.Throw(c.vu.Runtime(), err)
}
go func() {
<-ctx.Done()
ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := obs.Cancel(ctx); err != nil {
c.vu.State().Logger.Warnf("failed to cancel observation: %v", err)
}
}()
}

// Put sends a PUT message with the provided content to the specified path.
func (c *client) Put(path, mediaType string, content []byte, timeout int) Response {
func (c *client) Put(path, mediaType string, content []byte, timeout int) Message {
mt, err := message.ToMediaType(mediaType)
if err != nil {
common.Throw(c.vu.Runtime(), err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout))
defer cancel()
resp, err := c.conn.Put(ctx, path, mt, bytes.NewReader(content))
msg, err := c.conn.Put(ctx, path, mt, bytes.NewReader(content))
if err != nil {
common.Throw(c.vu.Runtime(), err)
}
var b []byte
if body := resp.Body(); body != nil {
if body := msg.Body(); body != nil {
if b, err = io.ReadAll(body); err != nil {
common.Throw(c.vu.Runtime(), err)
}
}
return Response{
Code: resp.Code().String(),
return Message{
Code: msg.Code().String(),
Body: b,
}
}

// Post sends a POST message with the provided content to the specified path.
func (c *client) Post(path, mediaType string, content []byte, timeout int) Response {
func (c *client) Post(path, mediaType string, content []byte, timeout int) Message {
mt, err := message.ToMediaType(mediaType)
if err != nil {
common.Throw(c.vu.Runtime(), err)
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout))
defer cancel()
resp, err := c.conn.Post(ctx, path, mt, bytes.NewReader(content))
msg, err := c.conn.Post(ctx, path, mt, bytes.NewReader(content))
if err != nil {
common.Throw(c.vu.Runtime(), err)
}
var b []byte
if body := resp.Body(); body != nil {
if body := msg.Body(); body != nil {
if b, err = io.ReadAll(body); err != nil {
common.Throw(c.vu.Runtime(), err)
}
}
return Response{
Code: resp.Code().String(),
return Message{
Code: msg.Code().String(),
Body: b,
}
}

// Post sends a POST message with the provided content to the specified path.
func (c *client) Delete(path string, timeout int) Response {
func (c *client) Delete(path string, timeout int) Message {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout))
defer cancel()
resp, err := c.conn.Delete(ctx, path)
msg, err := c.conn.Delete(ctx, path)
if err != nil {
common.Throw(c.vu.Runtime(), err)
}
var b []byte
if body := resp.Body(); body != nil {
if body := msg.Body(); body != nil {
if b, err = io.ReadAll(body); err != nil {
common.Throw(c.vu.Runtime(), err)
}
}
return Response{
Code: resp.Code().String(),
return Message{
Code: msg.Code().String(),
Body: b,
}
}

// Close closes the underlying connection.
func (c *client) Close() {
defer c.tq.Close()
if err := c.conn.Close(); err != nil {
common.Throw(c.vu.Runtime(), err)
}
Expand Down
60 changes: 52 additions & 8 deletions examples/simple.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
import { fail } from 'k6';
import { setTimeout } from "k6/x/timers"
import { Client } from 'k6/x/coap';


export default function() {
// Create new client and connect.
let client;
try {
client = new Client("coap.golioth.io:5684");
client = new Client("coap.golioth.dev:5684");
} catch (e) {
console.log(e);
fail(e);
}

// Verify connection.
try {
let res = client.get("/hello", 10);
console.log(String.fromCharCode(...res.body));
} catch (e) {
console.log(e);
fail(e);
}

// Send data.
try {
let res = client.post("/.s", "application/json", '{"hello": "world"}', 10);
console.log(res.code);
} catch (e) {
console.log(e);
fail(e);
}

// Get JSON data.
Expand All @@ -31,13 +34,54 @@ export default function() {
let json = JSON.parse(String.fromCharCode(...res.body));
console.log(json.sequenceNumber);
} catch (e) {
console.log(e);
fail(e);
}

// Start RPC observation.
try {
client.observe("/.rpc", 15, (req) => {
let json;
try {
json = JSON.parse(String.fromCharCode(...req.body));
} catch (e) {
// First message is acknowledgement of
// observation.
console.log(e);
return;
}
try {
console.log(json);
client.post("/.rpc/status", "application/json", '{"id": "' + json.id + '", "statusCode": 0, "detail":"ack"}', 10);
} catch (e) {
fail(e);
}
});
} catch (e) {
fail(e);
}

// Close connection.
// Start OTA observation.
try {
client.close();
client.observe("/.u/desired", 15, (req) => {
let json;
try {
json = JSON.parse(String.fromCharCode(...req.body));
} catch (e) {
return;
}
console.log(json);
});
} catch (e) {
console.log(e);
fail(e);
}

// Wait for observations to complete.
setTimeout(() => {
// Close connection.
try {
client.close();
} catch (e) {
fail(e);
}
}, 20000)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.20

require (
github.com/dop251/goja v0.0.0-20230402114112-623f9dda9079
github.com/mstoykov/k6-taskqueue-lib v0.1.0
github.com/pion/dtls/v2 v2.2.6
github.com/plgd-dev/go-coap/v3 v3.1.2
go.k6.io/k6 v0.44.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ github.com/mccutchen/go-httpbin v1.1.2-0.20190116014521-c5cb2f4802fa h1:lx8ZnNPw
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd h1:AC3N94irbx2kWGA8f/2Ks7EQl2LxKIRQYuT9IJDwgiI=
github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd/go.mod h1:9vRHVuLCjoFfE3GT06X0spdOAO+Zzo4AMjdIwUHBvAk=
github.com/mstoykov/envconfig v1.4.1-0.20220114105314-765c6d8c76f1 h1:94EkGmhXrVUEal+uLwFUf4fMXPhZpM5tYxuIsxrCCbI=
github.com/mstoykov/k6-taskqueue-lib v0.1.0 h1:M3eww1HSOLEN6rIkbNOJHhOVhlqnqkhYj7GTieiMBz4=
github.com/mstoykov/k6-taskqueue-lib v0.1.0/go.mod h1:PXdINulapvmzF545Auw++SCD69942FeNvUztaa9dVe4=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
Expand Down

0 comments on commit 498b585

Please sign in to comment.