Skip to content

Commit

Permalink
added fr_network_listen_inject()
Browse files Browse the repository at this point in the history
to inject data into a socket.

Mainly to be used for connected sockets
  • Loading branch information
alandekok committed Jan 11, 2018
1 parent c9fda34 commit b63c279
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/lib/io/application.h
Expand Up @@ -105,6 +105,7 @@ typedef struct fr_app_io_t {
fr_io_get_fd_t fd; //!< Return the file descriptor from the instance.
fr_io_data_read_t read; //!< Read from a socket to a data buffer
fr_io_data_write_t write; //!< Write from a data buffer to a socket
fr_io_data_inject_t inject; //!< Inject a packet into a socket.
fr_io_data_vnode_t vnode; //!< Handle notifications that the VNODE has changed
fr_io_decode_t decode; //!< Translate raw bytes into VALUE_PAIRs and metadata.
fr_io_encode_t encode; //!< Pack VALUE_PAIRs back into a byte array.
Expand Down
1 change: 1 addition & 0 deletions src/lib/io/control.h
Expand Up @@ -57,6 +57,7 @@ typedef void (*fr_control_callback_t)(void *ctx, void const *data, size_t data_s
#define FR_CONTROL_ID_SOCKET (2)
#define FR_CONTROL_ID_WORKER (3)
#define FR_CONTROL_ID_DIRECTORY (4)
#define FR_CONTROL_ID_INJECT (5)

fr_control_t *fr_control_create(TALLOC_CTX *ctx, int kq, fr_atomic_queue_t *aq, uintptr_t ident) CC_HINT(nonnull(3));
void fr_control_free(fr_control_t *c) CC_HINT(nonnull);
Expand Down
30 changes: 30 additions & 0 deletions src/lib/io/io.h
Expand Up @@ -216,6 +216,36 @@ typedef ssize_t (*fr_io_data_read_t)(void *instance, void **packet_ctx, fr_time_
typedef ssize_t (*fr_io_data_write_t)(void *instance, void *packet_ctx, fr_time_t request_time,
uint8_t *buffer, size_t buffer_len);

/** Inject data into a socket.
*
* This function allows callers to inject data into a socket, just as if the data
* was read from a socket.
*
* Note that this function is NOT an analog to fr_io_data_read_t.
* That is, the called function MUST copy the packet pointer into an
* internal list, so that subsequent calls to read() will return this
* packet.
*
* The network side ensures that the packet buffer remains available
* to the called function for the duration of an inject() and read()
* call. i.e. the packet contents do NOT have to be saved, and the
* code can instead save the pointer to the buffer.
*
* However, this buffer MUST be immediately returned on a subsequent
* call to read(). If it is not returned, the memory is still freed,
* and the pointer becomes invalid. Subsequent access to the buffer
* will result in crashes.
*
* @param[in] instance the context for this function
* @param[in] buffer the buffer where the raw packet to be injected
* @param[in] buffer_len the length of the buffer
* @param[in] recv_time when the packet was received
* @return
* - <0 on error
* - 0 on success
*/
typedef int (*fr_io_data_inject_t)(void *instance,uint8_t *buffer, size_t buffer_len, fr_time_t recv_time);

/** Tell the IO handler that a VNODE has changed
*
* @param[in] instance the context for this function
Expand Down
84 changes: 84 additions & 0 deletions src/lib/io/network.c
Expand Up @@ -60,6 +60,13 @@ RCSID("$Id$")

#define MAX_WORKERS 32

typedef struct fr_network_inject_t {
fr_listen_t *listen;
uint8_t *packet;
size_t packet_len;
fr_time_t recv_time;
} fr_network_inject_t;

typedef struct fr_network_worker_t {
int heap_id; //!< workers are in a heap
fr_time_t cpu_time; //!< how much CPU time this worker has spent
Expand Down Expand Up @@ -797,6 +804,42 @@ static void fr_network_worker_callback(void *ctx, void const *data, size_t data_
}


/** Handle a network control message callback for a packet sent to a socket
*
* @param[in] ctx the network
* @param[in] data the message
* @param[in] data_size size of the data
* @param[in] now the current time
*/
static void fr_network_inject_callback(void *ctx, void const *data, size_t data_size, UNUSED fr_time_t now)
{
fr_network_t *nr = ctx;
fr_network_inject_t my_inject;
fr_network_socket_t *s, my_socket;

rad_assert(data_size == sizeof(my_inject));

memcpy(&my_inject, data, data_size);

my_socket.listen = my_inject.listen;
s = rbtree_finddata(nr->sockets, &my_socket);
if (!s) {
talloc_free(my_inject.packet); /* MUST be it's own TALLOC_CTX */
return;
}

/*
* Inject the packet, and then read it back from the
* network.
*/
if (s->listen->app_io->inject(s->listen->app_io_instance, my_inject.packet, my_inject.packet_len, my_inject.recv_time) == 0) {
fr_network_read(nr->el, s->fd, 0, s);
}

talloc_free(my_inject.packet);
}


/** Service a control-plane event.
*
* @param[in] kq the kq to service
Expand Down Expand Up @@ -898,6 +941,11 @@ fr_network_t *fr_network_create(TALLOC_CTX *ctx, fr_event_list_t *el, fr_log_t c
goto fail2;
}

if (fr_control_callback_add(nr->control, FR_CONTROL_ID_INJECT, nr, fr_network_inject_callback) < 0) {
fr_strerror_printf("Failed adding packet injection callback: %s", fr_strerror());
goto fail2;
}

/*
* Create the various heaps.
*/
Expand Down Expand Up @@ -1248,3 +1296,39 @@ void fr_network_listen_read(fr_network_t *nr, fr_listen_t const *listen)
*/
fr_network_read(nr->el, s->fd, 0, s);
}

/** Inject a packet for a listener
*
* @param nr the network
* @param listen the listener where the packet is being injected
* @param packet the packet to be injected
* @param packet_len the length of the packet
* @param recv_time when the packet was received.
* @return
* - <0 on error
* - 0 on success
*/
int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *listen, uint8_t *packet, size_t packet_len, fr_time_t recv_time)
{
int rcode;
fr_network_inject_t my_inject;

(void) talloc_get_type_abort(nr, fr_network_t);
(void) talloc_get_type_abort(listen, fr_listen_t);

/*
* Can't inject to injection-less destinations.
*/
if (!listen->app_io->inject) return -1;

my_inject.listen = listen;
my_inject.packet = packet;
my_inject.packet_len = packet_len;
my_inject.recv_time = recv_time;

PTHREAD_MUTEX_LOCK(&nr->mutex);
rcode = fr_control_message_send(nr->control, nr->rb, FR_CONTROL_ID_INJECT, &my_inject, sizeof(my_inject));
PTHREAD_MUTEX_UNLOCK(&nr->mutex);

return rcode;
}
1 change: 1 addition & 0 deletions src/lib/io/network.h
Expand Up @@ -42,6 +42,7 @@ int fr_network_socket_add(fr_network_t *nr, fr_listen_t const *io) CC_HINT(nonnu
int fr_network_directory_add(fr_network_t *nr, fr_listen_t const *listen) CC_HINT(nonnull);
int fr_network_worker_add(fr_network_t *nr, fr_worker_t *worker) CC_HINT(nonnull);
void fr_network_listen_read(fr_network_t *nr, fr_listen_t const *listen) CC_HINT(nonnull);
int fr_network_listen_inject(fr_network_t *nr, fr_listen_t *listen, uint8_t *packet, size_t packet_len, fr_time_t recv_time);

#ifdef __cplusplus
}
Expand Down

0 comments on commit b63c279

Please sign in to comment.