diff --git a/Makefile b/Makefile index c50430a..cb8d85d 100644 --- a/Makefile +++ b/Makefile @@ -12,4 +12,4 @@ endif # Targets build: setup - @xk6 build --with github.com/golioth/xk6-coap=. + @xk6 build --with github.com/golioth/xk6-coap=. --with github.com/grafana/xk6-timers diff --git a/coap/coap.go b/coap/coap.go index ffef775..fac31fa 100644 --- a/coap/coap.go +++ b/coap/coap.go @@ -8,6 +8,7 @@ import ( "time" "github.com/dop251/goja" + "github.com/mstoykov/k6-taskqueue-lib/taskqueue" piondtls "github.com/pion/dtls/v2" "github.com/plgd-dev/go-coap/v3/dtls" "github.com/plgd-dev/go-coap/v3/message" @@ -22,8 +23,8 @@ const ( defaultPSKIDEnv = "COAP_PSK_ID" ) -// Response is a CoAP response message. -type Response struct { +// Message is a CoAP message. +type Message struct { Code string Body []byte } @@ -89,6 +90,7 @@ func (c *CoAP) client(cc goja.ConstructorCall) *goja.Object { client := &client{ vu: c.vu, + tq: taskqueue.New(c.vu.RegisterCallback), conn: conn, obj: rt.NewObject(), } @@ -117,119 +119,139 @@ func (c *CoAP) client(cc goja.ConstructorCall) *goja.Object { // client is a CoAP client with a DTLS connection. type client struct { vu modules.VU + tq *taskqueue.TaskQueue conn *udp.Conn obj *goja.Object } // Get sends a GET message to the specified path. -func (c *client) Get(path string, timeout int) Response { +func (c *client) Get(path string, timeout int) Message { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) defer cancel() - resp, err := c.conn.Get(ctx, path) + msg, err := c.conn.Get(ctx, path) if err != nil { common.Throw(c.vu.Runtime(), err) } var b []byte - if body := resp.Body(); body != nil { + if body := msg.Body(); body != nil { if b, err = io.ReadAll(body); err != nil { common.Throw(c.vu.Runtime(), err) } } - return Response{ - Code: resp.Code().String(), + return Message{ + Code: msg.Code().String(), Body: b, } } // Observe sends an OBSERVE message to the specified path. It waits for messages // until the specified timeout. -func (c *client) Observe(path string, timeout int) { +func (c *client) Observe(path string, timeout int, listener func(Message)) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) - defer cancel() - obs, err := c.conn.Observe(ctx, path, func(req *pool.Message) { - // TODO(hasheddan): emit metrics on observed messages. + + obs, err := c.conn.Observe(ctx, path, func(msg *pool.Message) { + var b []byte + var err error + if body := msg.Body(); body != nil { + if b, err = io.ReadAll(body); err != nil { + common.Throw(c.vu.Runtime(), err) + return + } + } + c.tq.Queue(func() error { + listener(Message{ + Code: msg.Code().String(), + Body: b, + }) + return nil + }) }) if err != nil { + defer cancel() common.Throw(c.vu.Runtime(), err) + return } - <-ctx.Done() - ctx, cancel = context.WithTimeout(context.Background(), time.Second) - defer cancel() - if err := obs.Cancel(ctx); err != nil { - common.Throw(c.vu.Runtime(), err) - } + go func() { + <-ctx.Done() + ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := obs.Cancel(ctx); err != nil { + c.vu.State().Logger.Warnf("failed to cancel observation: %v", err) + } + }() } // Put sends a PUT message with the provided content to the specified path. -func (c *client) Put(path, mediaType string, content []byte, timeout int) Response { +func (c *client) Put(path, mediaType string, content []byte, timeout int) Message { mt, err := message.ToMediaType(mediaType) if err != nil { common.Throw(c.vu.Runtime(), err) } ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) defer cancel() - resp, err := c.conn.Put(ctx, path, mt, bytes.NewReader(content)) + msg, err := c.conn.Put(ctx, path, mt, bytes.NewReader(content)) if err != nil { common.Throw(c.vu.Runtime(), err) } var b []byte - if body := resp.Body(); body != nil { + if body := msg.Body(); body != nil { if b, err = io.ReadAll(body); err != nil { common.Throw(c.vu.Runtime(), err) } } - return Response{ - Code: resp.Code().String(), + return Message{ + Code: msg.Code().String(), Body: b, } } // Post sends a POST message with the provided content to the specified path. -func (c *client) Post(path, mediaType string, content []byte, timeout int) Response { +func (c *client) Post(path, mediaType string, content []byte, timeout int) Message { mt, err := message.ToMediaType(mediaType) if err != nil { common.Throw(c.vu.Runtime(), err) } ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) defer cancel() - resp, err := c.conn.Post(ctx, path, mt, bytes.NewReader(content)) + msg, err := c.conn.Post(ctx, path, mt, bytes.NewReader(content)) if err != nil { common.Throw(c.vu.Runtime(), err) } var b []byte - if body := resp.Body(); body != nil { + if body := msg.Body(); body != nil { if b, err = io.ReadAll(body); err != nil { common.Throw(c.vu.Runtime(), err) } } - return Response{ - Code: resp.Code().String(), + return Message{ + Code: msg.Code().String(), Body: b, } } // Post sends a POST message with the provided content to the specified path. -func (c *client) Delete(path string, timeout int) Response { +func (c *client) Delete(path string, timeout int) Message { ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(timeout)) defer cancel() - resp, err := c.conn.Delete(ctx, path) + msg, err := c.conn.Delete(ctx, path) if err != nil { common.Throw(c.vu.Runtime(), err) } var b []byte - if body := resp.Body(); body != nil { + if body := msg.Body(); body != nil { if b, err = io.ReadAll(body); err != nil { common.Throw(c.vu.Runtime(), err) } } - return Response{ - Code: resp.Code().String(), + return Message{ + Code: msg.Code().String(), Body: b, } } // Close closes the underlying connection. func (c *client) Close() { + defer c.tq.Close() if err := c.conn.Close(); err != nil { common.Throw(c.vu.Runtime(), err) } diff --git a/examples/simple.js b/examples/simple.js index b649011..1ae5bd8 100644 --- a/examples/simple.js +++ b/examples/simple.js @@ -1,12 +1,15 @@ +import { fail } from 'k6'; +import { setTimeout } from "k6/x/timers" import { Client } from 'k6/x/coap'; + export default function() { // Create new client and connect. let client; try { - client = new Client("coap.golioth.io:5684"); + client = new Client("coap.golioth.dev:5684"); } catch (e) { - console.log(e); + fail(e); } // Verify connection. @@ -14,7 +17,7 @@ export default function() { let res = client.get("/hello", 10); console.log(String.fromCharCode(...res.body)); } catch (e) { - console.log(e); + fail(e); } // Send data. @@ -22,7 +25,7 @@ export default function() { let res = client.post("/.s", "application/json", '{"hello": "world"}', 10); console.log(res.code); } catch (e) { - console.log(e); + fail(e); } // Get JSON data. @@ -31,13 +34,54 @@ export default function() { let json = JSON.parse(String.fromCharCode(...res.body)); console.log(json.sequenceNumber); } catch (e) { - console.log(e); + fail(e); + } + + // Start RPC observation. + try { + client.observe("/.rpc", 15, (req) => { + let json; + try { + json = JSON.parse(String.fromCharCode(...req.body)); + } catch (e) { + // First message is acknowledgement of + // observation. + console.log(e); + return; + } + try { + console.log(json); + client.post("/.rpc/status", "application/json", '{"id": "' + json.id + '", "statusCode": 0, "detail":"ack"}', 10); + } catch (e) { + fail(e); + } + }); + } catch (e) { + fail(e); } - // Close connection. + // Start OTA observation. try { - client.close(); + client.observe("/.u/desired", 15, (req) => { + let json; + try { + json = JSON.parse(String.fromCharCode(...req.body)); + } catch (e) { + return; + } + console.log(json); + }); } catch (e) { - console.log(e); + fail(e); } + + // Wait for observations to complete. + setTimeout(() => { + // Close connection. + try { + client.close(); + } catch (e) { + fail(e); + } + }, 20000) } diff --git a/go.mod b/go.mod index 84ea2a9..02588c9 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.20 require ( github.com/dop251/goja v0.0.0-20230402114112-623f9dda9079 + github.com/mstoykov/k6-taskqueue-lib v0.1.0 github.com/pion/dtls/v2 v2.2.6 github.com/plgd-dev/go-coap/v3 v3.1.2 go.k6.io/k6 v0.44.1 diff --git a/go.sum b/go.sum index 327563d..8955b36 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,8 @@ github.com/mccutchen/go-httpbin v1.1.2-0.20190116014521-c5cb2f4802fa h1:lx8ZnNPw github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd h1:AC3N94irbx2kWGA8f/2Ks7EQl2LxKIRQYuT9IJDwgiI= github.com/mstoykov/atlas v0.0.0-20220811071828-388f114305dd/go.mod h1:9vRHVuLCjoFfE3GT06X0spdOAO+Zzo4AMjdIwUHBvAk= github.com/mstoykov/envconfig v1.4.1-0.20220114105314-765c6d8c76f1 h1:94EkGmhXrVUEal+uLwFUf4fMXPhZpM5tYxuIsxrCCbI= +github.com/mstoykov/k6-taskqueue-lib v0.1.0 h1:M3eww1HSOLEN6rIkbNOJHhOVhlqnqkhYj7GTieiMBz4= +github.com/mstoykov/k6-taskqueue-lib v0.1.0/go.mod h1:PXdINulapvmzF545Auw++SCD69942FeNvUztaa9dVe4= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=