Permalink
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
609 lines (518 sloc) 16.7 KB
/*
* The MIT License (MIT)
*
* Copyright (c) 2014-2018, Erik Moqvist
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
* This file is part of the Simba project.
*/
#include "simba.h"
/* Command sent from a test case to the server thread over
   qserverin. When buf_p is NULL the server forwards `size` bytes
   from qout to qserverout; otherwise it writes the `size` bytes at
   buf_p to qin. */
struct message_t {
void *buf_p;
size_t size;
};
/* MQTT client under test and the options passed to
   mqtt_client_connect(). */
static struct mqtt_client_t client;
static struct mqtt_conn_options_t conn_options;
/* Queues handed to mqtt_client_init(). The server thread reads qout
   and writes qin. */
static struct queue_t qout;
static struct queue_t qin;
/* Queues between the test cases and the server thread: bytes the
   server read from qout are written to qserverout, and struct
   message_t commands are read from qserverin. */
static struct queue_t qserverout;
static struct queue_t qserverin;
/* Backing storage for the four queues above. */
static char qoutbuf[64];
static char qinbuf[64];
static char qserveroutbuf[64];
static char qserverinbuf[64];
/* Thread running the test cases. Assigned in main() and resumed from
   on_publish(). */
static struct thrd_t *self_p;
/* Stacks for the MQTT client thread and the server thread spawned in
   test_init(). */
THRD_STACK(stack, 1024);
THRD_STACK(server_stack, 512);
/**
 * Entry point of the fake MQTT server thread.
 *
 * Loops forever reading struct message_t commands from qserverin. A
 * command with a NULL buffer makes the server move `size` bytes, one
 * at a time, from qout to qserverout so the test case can inspect
 * them. Any other command makes the server write the given buffer to
 * qin.
 *
 * @param[in] arg_p Unused.
 *
 * @return Never returns in practice; the trailing return only
 *         satisfies the function signature.
 */
static void *server_main(void *arg_p)
{
    /* Same type as message.size, so the loop comparison below does
       not mix signed and unsigned operands. */
    size_t i;
    struct message_t message;
    char byte;

    thrd_set_name("mqtt_server");

    while (1) {
        /* Read a message from the test case. */
        queue_read(&qserverin, &message, sizeof(message));

        if (message.buf_p == NULL) {
            /* Read the message written by the MQTT client, and write
               it to the test thread queue. */
            for (i = 0; i < message.size; i++) {
                chan_read(&qout, &byte, sizeof(byte));
                chan_write(&qserverout, &byte, sizeof(byte));
            }
        } else {
            /* Write the response to the MQTT client. */
            chan_write(&qin, message.buf_p, message.size);
        }
    }

    return (NULL);
}
/* Topic, payload and payload size of the latest publish, written by
   on_publish() and checked by the test_incoming_publish_*() test
   cases. */
static char published_topic[16];
static uint8_t published_message[16];
static size_t published_message_size;
/**
 * Publish callback given to the MQTT client in test_init().
 *
 * Prints the topic and payload size, stores the topic in
 * published_topic and the payload in published_message, records the
 * payload size in published_message_size and resumes the test
 * thread.
 *
 * All `size` payload bytes are always consumed from the channel. If
 * the payload is larger than published_message, only the first
 * sizeof(published_message) bytes are stored and the rest are read
 * and dropped, while published_message_size still holds the full
 * `size`.
 *
 * @param[in] client_p Unused.
 * @param[in] topic_p Topic as a null-terminated string (it is
 *                    printed with %s).
 * @param[in] chin_p Channel to read the payload from.
 * @param[in] size Payload size in bytes.
 *
 * @return Always zero(0).
 */
static size_t on_publish(struct mqtt_client_t *client_p,
                         const char *topic_p,
                         void *chin_p,
                         size_t size)
{
    size_t stored;
    size_t remaining;
    uint8_t dropped;

    /* The conversion is %d, so hand it an int rather than a
       size_t. */
    std_printf(OSTR("Published topic '%s' of size %d.\r\n"),
               topic_p,
               (int)size);

    /* strncpy() does not terminate the destination when the source
       is too long, so keep the last byte for the terminator. */
    strncpy(&published_topic[0], topic_p, sizeof(published_topic) - 1);
    published_topic[sizeof(published_topic) - 1] = '\0';

    /* Never write past the end of published_message. */
    stored = size;

    if (stored > sizeof(published_message)) {
        stored = sizeof(published_message);
    }

    chan_read(chin_p, &published_message[0], stored);

    /* Consume whatever did not fit so that exactly `size` bytes are
       read from the channel, as before. */
    for (remaining = size - stored; remaining > 0; remaining--) {
        chan_read(chin_p, &dropped, sizeof(dropped));
    }

    published_message_size = size;
    thrd_resume(self_p, 0);

    return (0);
}
/**
 * Error callback given to the MQTT client in test_init(). Prints the
 * error code and nothing else.
 *
 * @param[in] client_p Unused.
 * @param[in] error Error code to print.
 *
 * @return Always zero(0).
 */
static int on_error(struct mqtt_client_t *client_p,
                    int error)
{
    std_printf(FSTR("error = %d\r\n"), error);

    return (0);
}
/**
 * Initialize the four queues and the MQTT client, then start the
 * MQTT client thread and the fake server thread.
 *
 * @return zero(0) when every assertion passes.
 */
static int test_init(void)
{
    struct thrd_t *thrd_p;

    BTASSERT(queue_init(&qout, qoutbuf, sizeof(qoutbuf)) == 0);
    BTASSERT(queue_init(&qin, qinbuf, sizeof(qinbuf)) == 0);
    BTASSERT(queue_init(&qserverout, qserveroutbuf, sizeof(qserveroutbuf)) == 0);
    BTASSERT(queue_init(&qserverin, qserverinbuf, sizeof(qserverinbuf)) == 0);

    BTASSERT(mqtt_client_init(&client,
                              "mqtt_client",
                              NULL,
                              &qout,
                              &qin,
                              on_publish,
                              on_error) == 0);

    thrd_p = thrd_spawn(mqtt_client_main,
                        &client,
                        0,
                        stack,
                        sizeof(stack));

    /* Verify the spawn result before using it, in the same way as
       for the server thread below. */
    BTASSERT(thrd_p != NULL);

    thrd_set_log_mask(thrd_p, LOG_UPTO(DEBUG));

    BTASSERT(thrd_spawn(server_main,
                        NULL,
                        0,
                        server_stack,
                        sizeof(server_stack)) != NULL);

    return (0);
}
static int test_connect(void)
{
struct message_t message;
uint8_t buf[16];
/* Setup */
memset(&conn_options, 0, sizeof(conn_options));
conn_options.keep_alive_s = 4242;
conn_options.client_id.buf_p = "cid";
conn_options.client_id.size = 3;
conn_options.will.topic.buf_p = "wtop";
conn_options.will.topic.size = 4;
conn_options.will.payload.buf_p = "wpay";
conn_options.will.payload.size = 4;
conn_options.user_name.buf_p = "john";
conn_options.user_name.size = 4;
conn_options.password.buf_p = "secret";
conn_options.password.size = 6;
/* Prepare the server to receive the connection message. */
message.buf_p = NULL;
message.size = 2 + 10 + 5 + 6 + 6 + 6 + 8;
BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));
/* Prepare the server to send the connection ack message. */
buf[0] = 0x20;
buf[1] = 2;
buf[2] = 0;
buf[3] = 0;
message.buf_p = buf;
message.size = 4;
BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));
/* Connect. */
BTASSERTI(mqtt_client_connect(&client, &conn_options), ==, 0);
BTASSERTI(queue_read(&qserverout, buf, 2), ==, 2);
BTASSERTI(buf[0], ==, 0x10);
BTASSERTI(buf[1], ==, 10 + 5 + 6 + 6 + 6 + 8);
BTASSERTI(queue_read(&qserverout, buf, 10), ==, 10);
BTASSERTI(buf[0], ==, 0);
BTASSERTI(buf[1], ==, 4);
BTASSERTI(buf[2], ==, 'M');
BTASSERTI(buf[3], ==, 'Q');
BTASSERTI(buf[4], ==, 'T');
BTASSERTI(buf[5], ==, 'T');
BTASSERTI(buf[6], ==, 4);
BTASSERTI(buf[7], ==, 0xc6);
BTASSERTI(buf[8], ==, 0x10);
BTASSERTI(buf[9], ==, 0x92);
BTASSERTI(queue_read(&qserverout, buf, 5), ==, 5);
BTASSERTI(buf[0], ==, 0);
BTASSERTI(buf[1], ==, 3);
BTASSERTI(buf[2], ==, 'c');
BTASSERTI(buf[3], ==, 'i');
BTASSERTI(buf[4], ==, 'd');
BTASSERTI(queue_read(&qserverout, buf, 6), ==, 6);
BTASSERTI(buf[0], ==, 0);
BTASSERTI(buf[1], ==, 4);
BTASSERTI(buf[2], ==, 'w');
BTASSERTI(buf[3], ==, 't');
BTASSERTI(buf[4], ==, 'o');
BTASSERTI(buf[5], ==, 'p');
BTASSERTI(queue_read(&qserverout, buf, 6), ==, 6);
BTASSERTI(buf[0], ==, 0);
BTASSERTI(buf[1], ==, 4);
BTASSERTI(buf[2], ==, 'w');
BTASSERTI(buf[3], ==, 'p');
BTASSERTI(buf[4], ==, 'a');
BTASSERTI(buf[5], ==, 'y');
BTASSERTI(queue_read(&qserverout, buf, 6), ==, 6);
BTASSERTI(buf[0], ==, 0);
BTASSERTI(buf[1], ==, 4);
BTASSERTI(buf[2], ==, 'j');
BTASSERTI(buf[3], ==, 'o');
BTASSERTI(buf[4], ==, 'h');
BTASSERTI(buf[5], ==, 'n');
BTASSERTI(queue_read(&qserverout, buf, 8), ==, 8);
BTASSERTI(buf[0], ==, 0);
BTASSERTI(buf[1], ==, 6);
BTASSERTI(buf[2], ==, 's');
BTASSERTI(buf[3], ==, 'e');
BTASSERTI(buf[4], ==, 'c');
BTASSERTI(buf[5], ==, 'r');
BTASSERTI(buf[6], ==, 'e');
BTASSERTI(buf[7], ==, 't');
return (0);
}
/**
 * Ping the fake server and verify the two byte ping request written
 * by the client.
 *
 * @return zero(0) when every assertion passes.
 */
static int test_ping(void)
{
    struct message_t message;
    /* Ping response handed to the server to send to the client. */
    uint8_t response[2] = { (13 << 4), 0 };
    uint8_t request[2];

    /* First server command: forward the ping request to the test. */
    message.buf_p = NULL;
    message.size = sizeof(request);
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Second server command: send the ping response to the
       client. */
    message.buf_p = response;
    message.size = sizeof(response);
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Send the ping. */
    BTASSERT(mqtt_client_ping(&client) == 0);

    /* Verify the ping message. */
    BTASSERT(queue_read(&qserverout, request, 2) == 2);
    BTASSERT(request[0] == (12 << 4));
    BTASSERT(request[1] == 0);

    return (0);
}
/**
 * Publish "fie" on topic "foo/bar" with QoS 1 and verify the sixteen
 * byte publish packet written by the client.
 *
 * @return zero(0) when every assertion passes.
 */
static int test_publish(void)
{
    struct mqtt_application_message_t foobar;
    struct message_t message;
    /* Publish ack handed to the server to send to the client. */
    uint8_t puback[4] = { (4 << 4), 2, 0, 1 };
    uint8_t buf[16];

    /* First server command: forward the publish packet to the
       test. */
    message.buf_p = NULL;
    message.size = sizeof(buf);
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Second server command: send the publish ack to the client. */
    message.buf_p = puback;
    message.size = sizeof(puback);
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Publish a message. */
    foobar.qos = 1;
    foobar.topic.buf_p = "foo/bar";
    foobar.topic.size = 7;
    foobar.payload.buf_p = "fie";
    foobar.payload.size = 3;
    BTASSERT(mqtt_client_publish(&client, &foobar) == 0);

    /* Fixed header. */
    BTASSERT(queue_read(&qserverout, buf, 16) == 16);
    BTASSERT(buf[0] == ((3 << 4) | (1 << 1)));
    BTASSERT(buf[1] == 14);

    /* Topic length and topic. */
    BTASSERT(buf[2] == 0);
    BTASSERT(buf[3] == 7);
    BTASSERT(buf[4] == 'f');
    BTASSERT(buf[5] == 'o');
    BTASSERT(buf[6] == 'o');
    BTASSERT(buf[7] == '/');
    BTASSERT(buf[8] == 'b');
    BTASSERT(buf[9] == 'a');
    BTASSERT(buf[10] == 'r');

    /* Two bytes matching the last two bytes of the publish ack
       above. */
    BTASSERT(buf[11] == 0);
    BTASSERT(buf[12] == 1);

    /* Payload. */
    BTASSERT(buf[13] == 'f');
    BTASSERT(buf[14] == 'i');
    BTASSERT(buf[15] == 'e');

    return (0);
}
/**
 * Subscribe to "foo/bar", let the server publish on that topic, wait
 * for on_publish() to resume this thread, and finally unsubscribe.
 * The subscribe and unsubscribe packets written by the client are
 * verified byte by byte.
 *
 * @return zero(0) when every assertion passes.
 */
static int test_subscribe(void)
{
    uint8_t buf[16];
    struct message_t message;
    struct mqtt_application_message_t foobar;

    /* Only the topic and QoS are assigned below. Clear the structure
       first so the payload members are not passed to the client with
       indeterminate values. */
    memset(&foobar, 0, sizeof(foobar));

    /* Prepare the server to receive the subscribe message. */
    message.buf_p = NULL;
    message.size = 14;
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Prepare the server to send the subscribe ack message. */
    buf[0] = (9 << 4);
    buf[1] = 3;
    buf[2] = 0;
    buf[3] = 1;
    buf[4] = 0;
    message.buf_p = buf;
    message.size = 5;
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Subscribe. */
    foobar.topic.buf_p = "foo/bar";
    foobar.topic.size = 7;
    foobar.qos = mqtt_qos_1_t;
    BTASSERT(mqtt_client_subscribe(&client, &foobar) == 0);

    BTASSERT(queue_read(&qserverout, buf, 14) == 14);
    BTASSERT(buf[0] == ((8 << 4) | 2));
    BTASSERT(buf[1] == 12);
    BTASSERT(buf[2] == 0);
    BTASSERT(buf[3] == 1);
    BTASSERT(buf[4] == 0);
    BTASSERT(buf[5] == 7);
    BTASSERT(buf[6] == 'f');
    BTASSERT(buf[7] == 'o');
    BTASSERT(buf[8] == 'o');
    BTASSERT(buf[9] == '/');
    BTASSERT(buf[10] == 'b');
    BTASSERT(buf[11] == 'a');
    BTASSERT(buf[12] == 'r');
    BTASSERT(buf[13] == 0x01);

    /* Prepare the server to send a publish message. */
    buf[0] = (3 << 4);
    buf[1] = 14;
    buf[2] = 0;
    buf[3] = 7;
    buf[4] = 'f';
    buf[5] = 'o';
    buf[6] = 'o';
    buf[7] = '/';
    buf[8] = 'b';
    buf[9] = 'a';
    buf[10] = 'r';
    buf[11] = 0;
    buf[12] = 1;
    buf[13] = 'f';
    buf[14] = 'i';
    buf[15] = 'e';
    message.buf_p = buf;
    message.size = 16;
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Resumed from the callback. */
    thrd_suspend(NULL);

    /* Prepare the server to receive the unsubscribe message. */
    message.buf_p = NULL;
    message.size = 13;
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Prepare the server to send the unsubscribe ack message. */
    buf[0] = (11 << 4);
    buf[1] = 2;
    buf[2] = 0;
    buf[3] = 2;
    message.buf_p = buf;
    message.size = 4;
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Unsubscribe. */
    foobar.topic.buf_p = "foo/bar";
    foobar.topic.size = 7;
    BTASSERT(mqtt_client_unsubscribe(&client, &foobar) == 0);

    BTASSERT(queue_read(&qserverout, buf, 13) == 13);
    BTASSERT(buf[0] == ((10 << 4) | 2));
    BTASSERT(buf[1] == 11);
    BTASSERT(buf[2] == 0);
    BTASSERT(buf[3] == 2);
    BTASSERT(buf[4] == 0);
    BTASSERT(buf[5] == 7);
    BTASSERT(buf[6] == 'f');
    BTASSERT(buf[7] == 'o');
    BTASSERT(buf[8] == 'o');
    BTASSERT(buf[9] == '/');
    BTASSERT(buf[10] == 'b');
    BTASSERT(buf[11] == 'a');
    BTASSERT(buf[12] == 'r');

    return (0);
}
/**
 * Let the server send a QoS 0 publish of "fie" on "foo/bar" and
 * verify what on_publish() stored.
 *
 * @return zero(0) when every assertion passes.
 */
static int test_incoming_publish_qos0(void)
{
    /* Publish packet handed to the server to send to the client. */
    uint8_t buf[] = {
        /* Packet fixed header, QoS 0. */
        ((3 << 4) | (0 << 1)), 12,
        /* Variable header */
        0, 7, 'f', 'o', 'o', '/', 'b', 'a', 'r',
        /* Payload */
        'f', 'i', 'e'
    };
    struct message_t message;

    /* Prepare the server to send the publish message. */
    message.buf_p = buf;
    message.size = sizeof(buf);
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Resumed from the callback. */
    thrd_suspend(NULL);

    /* Check the received message. */
    BTASSERTM(&published_topic[0], "foo/bar", 8);
    BTASSERTM(&published_message[0], "fie", 3);
    BTASSERT(published_message_size == 3);

    return (0);
}
/**
 * Let the server send a QoS 1 publish of "fie" on "foo/bar", verify
 * what on_publish() stored, and verify the four byte ack the client
 * wrote in response.
 *
 * @return zero(0) when every assertion passes.
 */
static int test_incoming_publish_qos1(void)
{
    /* Publish packet handed to the server to send to the client. */
    uint8_t publish[] = {
        /* Packet fixed header, QoS 1. */
        ((3 << 4) | (1 << 1)), 14,
        /* Variable header */
        0, 7, 'f', 'o', 'o', '/', 'b', 'a', 'r',
        /* Packet Identifier */
        0, 1,
        /* Payload */
        'f', 'i', 'e'
    };
    uint8_t buf[16];
    struct message_t message;

    /* Prepare the server to send the publish message. */
    message.buf_p = publish;
    message.size = sizeof(publish);
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Resumed from the callback. */
    thrd_suspend(NULL);

    /* Check the received message. */
    BTASSERTM(&published_topic[0], "foo/bar", 8);
    BTASSERTM(&published_message[0], "fie", 3);
    BTASSERT(published_message_size == 3);

    /* Prepare the server to receive the ACK message. */
    message.buf_p = NULL;
    message.size = 4;
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Read the ACK packet; its last two bytes match the packet
       identifier sent above. */
    BTASSERT(queue_read(&qserverout, buf, 4) == 4);
    BTASSERT(buf[0] == (4 << 4));
    BTASSERT(buf[1] == 2);
    BTASSERT(buf[2] == 0);
    BTASSERT(buf[3] == 1);

    return (0);
}
/**
 * Let the server send a QoS 2 publish of "fie" on "foo/bar", verify
 * what on_publish() stored, and verify the four byte REC packet the
 * client wrote in response.
 *
 * @return zero(0) when every assertion passes.
 */
static int test_incoming_publish_qos2(void)
{
    /* Publish packet handed to the server to send to the client. */
    uint8_t publish[] = {
        /* Packet fixed header, QoS 2. */
        ((3 << 4) | (2 << 1)), 14,
        /* Variable header */
        0, 7, 'f', 'o', 'o', '/', 'b', 'a', 'r',
        /* Packet Identifier */
        0, 1,
        /* Payload */
        'f', 'i', 'e'
    };
    uint8_t buf[16];
    struct message_t message;

    /* Prepare the server to send the publish message. */
    message.buf_p = publish;
    message.size = sizeof(publish);
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Resumed from the callback. */
    thrd_suspend(NULL);

    /* Check the received message. */
    BTASSERTM(&published_topic[0], "foo/bar", 8);
    BTASSERTM(&published_message[0], "fie", 3);
    BTASSERT(published_message_size == 3);

    /* Prepare the server to receive the REC message. */
    message.buf_p = NULL;
    message.size = 4;
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Read the REC packet; its last two bytes match the packet
       identifier sent above. */
    BTASSERT(queue_read(&qserverout, buf, 4) == 4);
    BTASSERT(buf[0] == (5 << 4));
    BTASSERT(buf[1] == 2);
    BTASSERT(buf[2] == 0);
    BTASSERT(buf[3] == 1);

    return (0);
}
/**
 * Disconnect from the fake server and verify the two byte disconnect
 * packet written by the client. No response is queued for the
 * server to send.
 *
 * @return zero(0) when every assertion passes.
 */
static int test_disconnect(void)
{
    uint8_t buf[16];
    struct message_t message;

    /* Single server command: forward the disconnect packet to the
       test. */
    message.size = 2;
    message.buf_p = NULL;
    BTASSERT(queue_write(&qserverin, &message, sizeof(message)) == sizeof(message));

    /* Disconnect. */
    BTASSERT(mqtt_client_disconnect(&client) == 0);

    /* Verify the disconnect message. */
    BTASSERT(queue_read(&qserverout, buf, 2) == 2);
    BTASSERT(buf[0] == (14 << 4));
    BTASSERT(buf[1] == 0);

    return (0);
}
/* Test suite entry point: starts the system, records the calling
   thread in self_p and hands the test case table to the harness. */
int main()
{
/* Test cases in the order they are listed to the harness; the table
   ends with a NULL entry. test_init comes first because it sets up
   the queues, the client and the threads the other test cases
   use. */
struct harness_testcase_t testcases[] = {
{ test_init, "test_init" },
{ test_connect, "test_connect" },
{ test_ping, "test_ping" },
{ test_publish, "test_publish" },
{ test_subscribe, "test_subscribe" },
{ test_incoming_publish_qos0, "test_incoming_publish_qos0" },
{ test_incoming_publish_qos1, "test_incoming_publish_qos1" },
{ test_incoming_publish_qos2, "test_incoming_publish_qos2" },
{ test_disconnect, "test_disconnect" },
{ NULL, NULL }
};
sys_start();
/* on_publish() resumes this thread through self_p. */
self_p = thrd_self();
harness_run(testcases);
return (0);
}