Permalink
Browse files

Anjay 1.14.0

Improvements:
- Added anjay_configuration_t::stored_notification_limit configuration
  option for limiting the maximum number of notifications stored when
  the client is offline.
  • Loading branch information...
dextero committed Dec 4, 2018
1 parent 4d12539 commit 21896ffaa10aa74d0394023ab591f9c9a2c2c461
@@ -15,7 +15,7 @@
cmake_minimum_required(VERSION 2.8.12)
project(anjay C)
set(ANJAY_VERSION "1.13.1" CACHE STRING "Anjay library version")
set(ANJAY_VERSION "1.14.0" CACHE STRING "Anjay library version")
set(ANJAY_BINARY_VERSION 1.0.0)
set(ANJAY_SOURCE_DIR "${CMAKE_CURRENT_SOURCE_DIR}")
@@ -1,4 +1,4 @@
# Anjay LwM2M library [<img align="right" height="50px" src="https://avsystem.github.io/Anjay-doc/_images/avsystem_logo.png">](http://www.avsystem.com/)
# Anjay LwM2M library [<img align="right" height="50px" src="https://encrypted-tbn2.gstatic.com/images?q=tbn:ANd9GcSoiMy6rnzARUEdR0OjHmPGxTeiAMLBFlUYwIB9baWYWmuUwTbo">](http://www.avsystem.com/)
[![Build Status](https://travis-ci.org/AVSystem/Anjay.svg?branch=master)](https://travis-ci.org/AVSystem/Anjay)
[![Coverity Status](https://scan.coverity.com/projects/13206/badge.svg)](https://scan.coverity.com/projects/avsystem-anjay)
@@ -280,7 +280,8 @@ static int demo_init(anjay_demo_t *demo, cmdline_args_t *cmdline_args) {
.disable_server_initiated_bootstrap =
cmdline_args->disable_server_initiated_bootstrap,
.udp_tx_params = &cmdline_args->tx_params,
.udp_dtls_hs_tx_params = &cmdline_args->dtls_hs_tx_params
.udp_dtls_hs_tx_params = &cmdline_args->dtls_hs_tx_params,
.stored_notification_limit = cmdline_args->stored_notification_limit
};
const avs_net_security_info_t *fw_security_info_ptr = NULL;
@@ -145,6 +145,9 @@ static void print_option_help(const struct option *opt) {
{ 'l', "SECONDS", "86400",
"set registration lifetime. If SECONDS <= 0, use default value and "
"don't send lifetime in Register/Update messages." },
{ 'L', "MAX_NOTIFICATIONS", "0",
"set limit of queued notifications in queue/offline mode. 0: "
"unlimited; >0: keep that much newest ones" },
{ 'c', "CSV_FILE", NULL, "file to load location CSV from" },
{ 'f', "SECONDS", "1", "location update frequency in seconds" },
{ 'p', "PORT", NULL, "bind all sockets to the specified UDP port." },
@@ -282,6 +285,18 @@ static int parse_u16(const char *str, uint16_t *out_value) {
return 0;
}
static int parse_size(const char *str, size_t *out_value) {
long long_value;
if (demo_parse_long(str, &long_value) || long_value < 0
|| (size_t) long_value > SIZE_MAX) {
demo_log(ERROR, "value out of range: %s", str);
return -1;
}
*out_value = (size_t) long_value;
return 0;
}
static int parse_double(const char *str, double *out_value) {
errno = 0;
char *endptr = NULL;
@@ -408,6 +423,7 @@ int demo_parse_argv(cmdline_args_t *parsed_args, int argc, char *argv[]) {
{ "endpoint-name", required_argument, 0, 'e' },
{ "help", no_argument, 0, 'h' },
{ "lifetime", required_argument, 0, 'l' },
{ "stored-notification-limit", required_argument, 0, 'L' },
{ "location-csv", required_argument, 0, 'c' },
{ "location-update-freq-s", required_argument, 0, 'f' },
{ "port", required_argument, 0, 'p' },
@@ -543,6 +559,11 @@ int demo_parse_argv(cmdline_args_t *parsed_args, int argc, char *argv[]) {
goto finish;
}
break;
case 'L':
if (parse_size(optarg, &parsed_args->stored_notification_limit)) {
goto finish;
}
break;
case 'c':
parsed_args->location_csv = optarg;
break;
@@ -53,6 +53,7 @@ typedef struct cmdline_args {
*/
bool fwu_tx_params_modified;
avs_coap_tx_params_t fwu_tx_params;
size_t stored_notification_limit;
} cmdline_args_t;
int demo_parse_argv(cmdline_args_t *parsed_args, int argc, char **argv);
@@ -361,10 +361,18 @@ static void cmd_set_attrs(anjay_demo_t *demo, const char *args_string) {
const char *args = NULL, *pmin = NULL, *pmax = NULL, *lt = NULL, *gt = NULL,
*st = NULL;
anjay_dm_resource_attributes_t attrs;
int ssid;
if (sscanf(args_string, "%s%n", path, &path_len) != 1) {
if (sscanf(args_string, "%s %d%n", path, &ssid, &path_len) != 2) {
goto error;
}
if (ssid < 0 || UINT16_MAX <= ssid) {
demo_log(ERROR, "invalid SSID: expected 0 <= ssid < 65535, got %d",
ssid);
goto error;
}
args = args_string + path_len;
attrs = ANJAY_RES_ATTRIBS_EMPTY;
pmin = strstr(args, "pmin=");
@@ -392,21 +400,22 @@ static void cmd_set_attrs(anjay_demo_t *demo, const char *args_string) {
switch (sscanf(path, "/%d/%d/%d", &oid, &iid, &rid)) {
case 3:
if (anjay_attr_storage_set_resource_attrs(
demo->anjay, 1, (anjay_oid_t) oid, (anjay_iid_t) iid,
(anjay_rid_t) rid, &attrs)) {
demo->anjay, (anjay_ssid_t) ssid, (anjay_oid_t) oid,
(anjay_iid_t) iid, (anjay_rid_t) rid, &attrs)) {
demo_log(ERROR, "failed to set resource level attributes");
}
goto finish;
case 2:
if (anjay_attr_storage_set_instance_attrs(
demo->anjay, 1, (anjay_oid_t) oid, (anjay_iid_t) iid,
&attrs.common)) {
demo->anjay, (anjay_ssid_t) ssid, (anjay_oid_t) oid,
(anjay_iid_t) iid, &attrs.common)) {
demo_log(ERROR, "failed to set instance level attributes");
}
goto finish;
case 1:
if (anjay_attr_storage_set_object_attrs(
demo->anjay, 1, (anjay_oid_t) oid, &attrs.common)) {
demo->anjay, (anjay_ssid_t) ssid, (anjay_oid_t) oid,
&attrs.common)) {
demo_log(ERROR, "failed to set object level attributes");
}
goto finish;
@@ -268,6 +268,22 @@ typedef struct anjay_configuration {
* bootstrap sequence.
*/
bool disable_server_initiated_bootstrap;
/**
* If "Notification Storing When Disabled or Offline" resource is set to
* true and either the client is in offline mode, or uses Queue Mode,
* Notify messages are enqueued and sent whenever the client is online
* again. This value allows one to limit the size of said notification
* queue. The limit applies to notifications queued for all servers.
*
* If set to 0, size of the stored notification queue is only limited by
* the amount of available RAM.
*
* If set to a positive value, that much *most recent* notifications are
* stored. Attempting to add a notification to the queue while it is
* already full drops the oldest one to make room for new one.
*/
size_t stored_notification_limit;
} anjay_configuration_t;
/**
@@ -129,7 +129,8 @@ static int init(anjay_t *anjay, const anjay_configuration_t *config) {
}
if (_anjay_observe_init(&anjay->observe,
config->confirmable_notifications)) {
config->confirmable_notifications,
config->stored_notification_limit)) {
return -1;
}
@@ -81,13 +81,22 @@ int _anjay_observe_entry_cmp(const void *left, const void *right) {
}
int _anjay_observe_init(anjay_observe_state_t *observe,
bool confirmable_notifications) {
bool confirmable_notifications,
size_t stored_notification_limit) {
if (!(observe->connection_entries = AVS_RBTREE_NEW(
anjay_observe_connection_entry_t, connection_state_cmp))) {
anjay_log(ERROR, "Could not initialize Observe structures");
return -1;
}
observe->confirmable_notifications = confirmable_notifications;
if (stored_notification_limit == 0) {
observe->notify_queue_limit_mode = NOTIFY_QUEUE_UNLIMITED;
} else {
observe->notify_queue_limit = stored_notification_limit;
observe->notify_queue_limit_mode = NOTIFY_QUEUE_DROP_OLDEST;
}
return 0;
}
@@ -273,19 +282,103 @@ create_resource_value(const anjay_msg_details_t *details,
return result;
}
static int insert_new_value(anjay_observe_connection_entry_t *conn_state,
static size_t count_queued_notifications(const anjay_observe_state_t *observe) {
size_t count = 0;
AVS_RBTREE_ELEM(anjay_observe_connection_entry_t) conn;
AVS_RBTREE_FOREACH(conn, observe->connection_entries) {
count += AVS_LIST_SIZE(conn->unsent);
}
return count;
}
static bool is_observe_queue_full(const anjay_observe_state_t *observe) {
if (observe->notify_queue_limit_mode == NOTIFY_QUEUE_UNLIMITED) {
return false;
}
size_t num_queued = count_queued_notifications(observe);
anjay_log(TRACE, "%u/%u queued notifications", (unsigned) num_queued,
(unsigned) observe->notify_queue_limit);
assert(num_queued <= observe->notify_queue_limit);
return num_queued >= observe->notify_queue_limit;
}
static AVS_LIST(anjay_observe_connection_entry_t)
find_oldest_queued_notification(anjay_observe_state_t *observe) {
AVS_LIST(anjay_observe_connection_entry_t) oldest = NULL;
AVS_RBTREE_ELEM(anjay_observe_connection_entry_t) conn;
AVS_RBTREE_FOREACH(conn, observe->connection_entries) {
if (conn->unsent) {
if (!oldest
|| avs_time_real_before(conn->unsent->timestamp,
oldest->unsent->timestamp)) {
oldest = conn;
}
}
}
return oldest;
}
static anjay_observe_resource_value_t *
detach_first_unsent_value(anjay_observe_connection_entry_t *conn_state) {
assert(conn_state->unsent);
anjay_observe_entry_t *entry = conn_state->unsent->ref;
if (entry->last_unsent == conn_state->unsent) {
entry->last_unsent = NULL;
}
anjay_observe_resource_value_t *result =
AVS_LIST_DETACH(&conn_state->unsent);
if (conn_state->unsent_last == result) {
assert(!conn_state->unsent);
conn_state->unsent_last = NULL;
}
return result;
}
static void drop_oldest_queued_notification(anjay_observe_state_t *observe) {
AVS_LIST(anjay_observe_connection_entry_t) oldest =
find_oldest_queued_notification(observe);
AVS_ASSERT(oldest, "function is not supposed to be called when there are "
"no queued notifications");
anjay_observe_resource_value_t *entry = detach_first_unsent_value(oldest);
AVS_LIST_DELETE(&entry);
}
static int insert_new_value(anjay_observe_state_t *observe,
anjay_observe_connection_entry_t *conn_state,
anjay_observe_entry_t *entry,
const anjay_msg_details_t *details,
const avs_coap_msg_identity_t *identity,
double numeric,
const void *data,
size_t size) {
if (is_observe_queue_full(observe)) {
switch (observe->notify_queue_limit_mode) {
case NOTIFY_QUEUE_UNLIMITED:
AVS_UNREACHABLE("is_observe_queue_full broken");
return -1;
case NOTIFY_QUEUE_DROP_OLDEST:
assert(observe->notify_queue_limit != 0);
drop_oldest_queued_notification(observe);
break;
}
}
AVS_LIST(anjay_observe_resource_value_t) res_value =
create_resource_value(details, entry, identity, numeric, data,
size);
if (!res_value) {
return -1;
}
AVS_LIST_APPEND(&conn_state->unsent_last, res_value);
conn_state->unsent_last = res_value;
if (!conn_state->unsent) {
@@ -306,8 +399,8 @@ static int insert_error(anjay_t *anjay,
.msg_code = _anjay_make_error_response_code(outer_result),
.format = AVS_COAP_FORMAT_NONE
};
return insert_new_value(conn_state, entry, &details, identity, NAN, NULL,
0);
return insert_new_value(&anjay->observe, conn_state, entry, &details,
identity, NAN, NULL, 0);
}
static int get_effective_attrs(anjay_t *anjay,
@@ -644,22 +737,6 @@ static bool confirmable_required(const avs_time_real_t now,
avs_time_duration_from_scalar(1, AVS_TIME_DAY));
}
static anjay_observe_resource_value_t *
detach_first_unsent_value(anjay_observe_connection_entry_t *conn_state) {
assert(conn_state->unsent);
anjay_observe_entry_t *entry = conn_state->unsent->ref;
if (entry->last_unsent == conn_state->unsent) {
entry->last_unsent = NULL;
}
anjay_observe_resource_value_t *result =
AVS_LIST_DETACH(&conn_state->unsent);
if (conn_state->unsent_last == result) {
assert(!conn_state->unsent);
conn_state->unsent_last = NULL;
}
return result;
}
static void value_sent(anjay_observe_connection_entry_t *conn_state) {
anjay_observe_resource_value_t *sent =
detach_first_unsent_value(conn_state);
@@ -932,7 +1009,8 @@ update_notification_value(anjay_t *anjay,
if (pmax_expired
|| should_update(newest_value(entry), &attrs.standard,
&observe_details, numeric, buf, (size_t) size)) {
result = insert_new_value(conn_state, entry, &observe_details,
result = insert_new_value(&anjay->observe, conn_state, entry,
&observe_details,
&newest_value(entry)->identity, numeric, buf,
(size_t) size);
}
@@ -48,9 +48,17 @@ typedef struct anjay_observe_entry_struct anjay_observe_entry_t;
typedef struct anjay_observe_connection_entry_struct
anjay_observe_connection_entry_t;
typedef enum {
NOTIFY_QUEUE_UNLIMITED,
NOTIFY_QUEUE_DROP_OLDEST
} notify_queue_limit_mode_t;
typedef struct {
AVS_RBTREE(anjay_observe_connection_entry_t) connection_entries;
bool confirmable_notifications;
notify_queue_limit_mode_t notify_queue_limit_mode;
size_t notify_queue_limit;
} anjay_observe_state_t;
typedef struct {
@@ -72,7 +80,8 @@ typedef struct {
} anjay_observe_key_t;
int _anjay_observe_init(anjay_observe_state_t *observe,
bool confirmable_notifications);
bool confirmable_notifications,
size_t stored_notification_limit);
void _anjay_observe_cleanup(anjay_observe_state_t *observe,
anjay_sched_t *sched);
@@ -1322,7 +1322,7 @@ static void test_observe_entry(anjay_t *anjay,
static anjay_t *create_test_env(void) {
anjay_t *anjay = (anjay_t *) avs_calloc(1, sizeof(anjay_t));
_anjay_observe_init(&anjay->observe, false);
_anjay_observe_init(&anjay->observe, false, 0);
test_observe_entry(anjay, 1, ANJAY_CONNECTION_UDP, 2, 3, 1);
test_observe_entry(anjay, 1, ANJAY_CONNECTION_UDP, 2, 3, 2);
test_observe_entry(anjay, 1, ANJAY_CONNECTION_UDP, 2, 9, 4);
Oops, something went wrong.

0 comments on commit 21896ff

Please sign in to comment.