Skip to content

Commit

Permalink
crypto/scheduler: optimize crypto op ordering
Browse files Browse the repository at this point in the history
This patch optimizes the crypto op ordering by replacing the
ordering method from using rte_reorder library to using rte_ring
to avoid unnecessary crypto op storing and recovering cost.

Signed-off-by: Fan Zhang <roy.fan.zhang@intel.com>
Signed-off-by: Sergio Gonzalez Monroy <sergio.gonzalez.monroy@intel.com>
Acked-by: Declan Doherty <declan.doherty@intel.com>
  • Loading branch information
Fan Zhang authored and pablodelara committed Apr 5, 2017
1 parent 211e27a commit 8a48e03
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 191 deletions.
42 changes: 22 additions & 20 deletions drivers/crypto/scheduler/scheduler_pmd_ops.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,41 +63,43 @@ scheduler_pmd_config(struct rte_cryptodev *dev)
}

static int
update_reorder_buff(struct rte_cryptodev *dev, uint16_t qp_id)
update_order_ring(struct rte_cryptodev *dev, uint16_t qp_id)
{
struct scheduler_ctx *sched_ctx = dev->data->dev_private;
struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[qp_id];

if (sched_ctx->reordering_enabled) {
char reorder_buff_name[RTE_CRYPTODEV_NAME_MAX_LEN];
uint32_t buff_size = sched_ctx->nb_slaves * PER_SLAVE_BUFF_SIZE;
char order_ring_name[RTE_CRYPTODEV_NAME_MAX_LEN];
uint32_t buff_size = rte_align32pow2(
sched_ctx->nb_slaves * PER_SLAVE_BUFF_SIZE);

if (qp_ctx->reorder_buf) {
rte_reorder_free(qp_ctx->reorder_buf);
qp_ctx->reorder_buf = NULL;
if (qp_ctx->order_ring) {
rte_ring_free(qp_ctx->order_ring);
qp_ctx->order_ring = NULL;
}

if (!buff_size)
return 0;

if (snprintf(reorder_buff_name, RTE_CRYPTODEV_NAME_MAX_LEN,
if (snprintf(order_ring_name, RTE_CRYPTODEV_NAME_MAX_LEN,
"%s_rb_%u_%u", RTE_STR(CRYPTODEV_NAME_SCHEDULER_PMD),
dev->data->dev_id, qp_id) < 0) {
CS_LOG_ERR("failed to create unique reorder buffer "
"name");
return -ENOMEM;
}

qp_ctx->reorder_buf = rte_reorder_create(reorder_buff_name,
rte_socket_id(), buff_size);
if (!qp_ctx->reorder_buf) {
CS_LOG_ERR("failed to create reorder buffer");
qp_ctx->order_ring = rte_ring_create(order_ring_name,
buff_size, rte_socket_id(),
RING_F_SP_ENQ | RING_F_SC_DEQ);
if (!qp_ctx->order_ring) {
CS_LOG_ERR("failed to create order ring");
return -ENOMEM;
}
} else {
if (qp_ctx->reorder_buf) {
rte_reorder_free(qp_ctx->reorder_buf);
qp_ctx->reorder_buf = NULL;
if (qp_ctx->order_ring) {
rte_ring_free(qp_ctx->order_ring);
qp_ctx->order_ring = NULL;
}
}

Expand All @@ -116,7 +118,7 @@ scheduler_pmd_start(struct rte_cryptodev *dev)
return 0;

for (i = 0; i < dev->data->nb_queue_pairs; i++) {
ret = update_reorder_buff(dev, i);
ret = update_order_ring(dev, i);
if (ret < 0) {
CS_LOG_ERR("Failed to update reorder buffer");
return ret;
Expand Down Expand Up @@ -224,9 +226,9 @@ scheduler_pmd_close(struct rte_cryptodev *dev)
for (i = 0; i < dev->data->nb_queue_pairs; i++) {
struct scheduler_qp_ctx *qp_ctx = dev->data->queue_pairs[i];

if (qp_ctx->reorder_buf) {
rte_reorder_free(qp_ctx->reorder_buf);
qp_ctx->reorder_buf = NULL;
if (qp_ctx->order_ring) {
rte_ring_free(qp_ctx->order_ring);
qp_ctx->order_ring = NULL;
}

if (qp_ctx->private_qp_ctx) {
Expand Down Expand Up @@ -324,8 +326,8 @@ scheduler_pmd_qp_release(struct rte_cryptodev *dev, uint16_t qp_id)
if (!qp_ctx)
return 0;

if (qp_ctx->reorder_buf)
rte_reorder_free(qp_ctx->reorder_buf);
if (qp_ctx->order_ring)
rte_ring_free(qp_ctx->order_ring);
if (qp_ctx->private_qp_ctx)
rte_free(qp_ctx->private_qp_ctx);

Expand Down
49 changes: 46 additions & 3 deletions drivers/crypto/scheduler/scheduler_pmd_private.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
#ifndef _SCHEDULER_PMD_PRIVATE_H
#define _SCHEDULER_PMD_PRIVATE_H

#include <rte_hash.h>
#include <rte_reorder.h>
#include "rte_cryptodev_scheduler.h"

/**< Maximum number of bonded devices per devices */
Expand Down Expand Up @@ -98,14 +96,59 @@ struct scheduler_ctx {
struct scheduler_qp_ctx {
void *private_qp_ctx;

struct rte_reorder_buffer *reorder_buf;
struct rte_ring *order_ring;
uint32_t seqn;
} __rte_cache_aligned;

struct scheduler_session {
struct rte_cryptodev_sym_session *sessions[MAX_SLAVES_NUM];
};

static inline uint16_t __attribute__((always_inline))
get_max_enqueue_order_count(struct rte_ring *order_ring, uint16_t nb_ops)
{
uint32_t count = rte_ring_free_count(order_ring);

return count > nb_ops ? nb_ops : count;
}

static inline void __attribute__((always_inline))
scheduler_order_insert(struct rte_ring *order_ring,
struct rte_crypto_op **ops, uint16_t nb_ops)
{
rte_ring_sp_enqueue_burst(order_ring, (void **)ops, nb_ops, NULL);
}

#define SCHEDULER_GET_RING_OBJ(order_ring, pos, op) do { \
struct rte_crypto_op **ring = (void *)&order_ring[1]; \
op = ring[(order_ring->cons.head + pos) & order_ring->mask]; \
} while (0)

static inline uint16_t __attribute__((always_inline))
scheduler_order_drain(struct rte_ring *order_ring,
struct rte_crypto_op **ops, uint16_t nb_ops)
{
struct rte_crypto_op *op;
uint32_t nb_objs = rte_ring_count(order_ring);
uint32_t nb_ops_to_deq = 0;
int status = -1;

if (nb_objs > nb_ops)
nb_objs = nb_ops;

while (nb_ops_to_deq < nb_objs) {
SCHEDULER_GET_RING_OBJ(order_ring, nb_ops_to_deq, op);
if (op->status == RTE_CRYPTO_OP_STATUS_NOT_PROCESSED)
break;
nb_ops_to_deq++;
}

if (nb_ops_to_deq)
status = rte_ring_sc_dequeue_bulk(order_ring, (void **)ops,
nb_ops_to_deq, NULL);

return (status == 0) ? nb_ops_to_deq : 0;
}
/** device specific operations function pointer structure */
extern struct rte_cryptodev_ops *rte_crypto_scheduler_pmd_ops;

Expand Down
180 changes: 12 additions & 168 deletions drivers/crypto/scheduler/scheduler_roundrobin.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,79 +115,16 @@ static uint16_t
schedule_enqueue_ordering(void *qp, struct rte_crypto_op **ops,
uint16_t nb_ops)
{
struct scheduler_qp_ctx *qp_ctx = qp;
struct rr_scheduler_qp_ctx *rr_qp_ctx = qp_ctx->private_qp_ctx;
uint32_t slave_idx = rr_qp_ctx->last_enq_slave_idx;
struct scheduler_slave *slave = &rr_qp_ctx->slaves[slave_idx];
uint16_t i, processed_ops;
struct rte_cryptodev_sym_session *sessions[nb_ops];
struct scheduler_session *sess0, *sess1, *sess2, *sess3;

if (unlikely(nb_ops == 0))
return 0;

for (i = 0; i < nb_ops && i < 4; i++) {
rte_prefetch0(ops[i]->sym->session);
rte_prefetch0(ops[i]->sym->m_src);
}

for (i = 0; (i < (nb_ops - 8)) && (nb_ops > 8); i += 4) {
sess0 = (struct scheduler_session *)
ops[i]->sym->session->_private;
sess1 = (struct scheduler_session *)
ops[i+1]->sym->session->_private;
sess2 = (struct scheduler_session *)
ops[i+2]->sym->session->_private;
sess3 = (struct scheduler_session *)
ops[i+3]->sym->session->_private;

sessions[i] = ops[i]->sym->session;
sessions[i + 1] = ops[i + 1]->sym->session;
sessions[i + 2] = ops[i + 2]->sym->session;
sessions[i + 3] = ops[i + 3]->sym->session;

ops[i]->sym->session = sess0->sessions[slave_idx];
ops[i]->sym->m_src->seqn = qp_ctx->seqn++;
ops[i + 1]->sym->session = sess1->sessions[slave_idx];
ops[i + 1]->sym->m_src->seqn = qp_ctx->seqn++;
ops[i + 2]->sym->session = sess2->sessions[slave_idx];
ops[i + 2]->sym->m_src->seqn = qp_ctx->seqn++;
ops[i + 3]->sym->session = sess3->sessions[slave_idx];
ops[i + 3]->sym->m_src->seqn = qp_ctx->seqn++;

rte_prefetch0(ops[i + 4]->sym->session);
rte_prefetch0(ops[i + 4]->sym->m_src);
rte_prefetch0(ops[i + 5]->sym->session);
rte_prefetch0(ops[i + 5]->sym->m_src);
rte_prefetch0(ops[i + 6]->sym->session);
rte_prefetch0(ops[i + 6]->sym->m_src);
rte_prefetch0(ops[i + 7]->sym->session);
rte_prefetch0(ops[i + 7]->sym->m_src);
}

for (; i < nb_ops; i++) {
sess0 = (struct scheduler_session *)
ops[i]->sym->session->_private;
sessions[i] = ops[i]->sym->session;
ops[i]->sym->session = sess0->sessions[slave_idx];
ops[i]->sym->m_src->seqn = qp_ctx->seqn++;
}

processed_ops = rte_cryptodev_enqueue_burst(slave->dev_id,
slave->qp_id, ops, nb_ops);

slave->nb_inflight_cops += processed_ops;
struct rte_ring *order_ring =
((struct scheduler_qp_ctx *)qp)->order_ring;
uint16_t nb_ops_to_enq = get_max_enqueue_order_count(order_ring,
nb_ops);
uint16_t nb_ops_enqd = schedule_enqueue(qp, ops,
nb_ops_to_enq);

rr_qp_ctx->last_enq_slave_idx += 1;
rr_qp_ctx->last_enq_slave_idx %= rr_qp_ctx->nb_slaves;
scheduler_order_insert(order_ring, ops, nb_ops_enqd);

/* recover session if enqueue is failed */
if (unlikely(processed_ops < nb_ops)) {
for (i = processed_ops; i < nb_ops; i++)
ops[i]->sym->session = sessions[i];
}

return processed_ops;
return nb_ops_enqd;
}


Expand Down Expand Up @@ -232,105 +169,12 @@ static uint16_t
schedule_dequeue_ordering(void *qp, struct rte_crypto_op **ops,
uint16_t nb_ops)
{
struct scheduler_qp_ctx *qp_ctx = (struct scheduler_qp_ctx *)qp;
struct rr_scheduler_qp_ctx *rr_qp_ctx = (qp_ctx->private_qp_ctx);
struct scheduler_slave *slave;
struct rte_reorder_buffer *reorder_buff = qp_ctx->reorder_buf;
struct rte_mbuf *mbuf0, *mbuf1, *mbuf2, *mbuf3;
uint16_t nb_deq_ops, nb_drained_mbufs;
const uint16_t nb_op_ops = nb_ops;
struct rte_crypto_op *op_ops[nb_op_ops];
struct rte_mbuf *reorder_mbufs[nb_op_ops];
uint32_t last_slave_idx = rr_qp_ctx->last_deq_slave_idx;
uint16_t i;
struct rte_ring *order_ring =
((struct scheduler_qp_ctx *)qp)->order_ring;

if (unlikely(rr_qp_ctx->slaves[last_slave_idx].nb_inflight_cops == 0)) {
do {
last_slave_idx += 1;

if (unlikely(last_slave_idx >= rr_qp_ctx->nb_slaves))
last_slave_idx = 0;
/* looped back, means no inflight cops in the queue */
if (last_slave_idx == rr_qp_ctx->last_deq_slave_idx)
return 0;
} while (rr_qp_ctx->slaves[last_slave_idx].nb_inflight_cops
== 0);
}

slave = &rr_qp_ctx->slaves[last_slave_idx];

nb_deq_ops = rte_cryptodev_dequeue_burst(slave->dev_id,
slave->qp_id, op_ops, nb_ops);

rr_qp_ctx->last_deq_slave_idx += 1;
rr_qp_ctx->last_deq_slave_idx %= rr_qp_ctx->nb_slaves;

slave->nb_inflight_cops -= nb_deq_ops;

for (i = 0; i < nb_deq_ops && i < 4; i++)
rte_prefetch0(op_ops[i]->sym->m_src);

for (i = 0; (i < (nb_deq_ops - 8)) && (nb_deq_ops > 8); i += 4) {
mbuf0 = op_ops[i]->sym->m_src;
mbuf1 = op_ops[i + 1]->sym->m_src;
mbuf2 = op_ops[i + 2]->sym->m_src;
mbuf3 = op_ops[i + 3]->sym->m_src;

mbuf0->userdata = op_ops[i];
mbuf1->userdata = op_ops[i + 1];
mbuf2->userdata = op_ops[i + 2];
mbuf3->userdata = op_ops[i + 3];

rte_reorder_insert(reorder_buff, mbuf0);
rte_reorder_insert(reorder_buff, mbuf1);
rte_reorder_insert(reorder_buff, mbuf2);
rte_reorder_insert(reorder_buff, mbuf3);

rte_prefetch0(op_ops[i + 4]->sym->m_src);
rte_prefetch0(op_ops[i + 5]->sym->m_src);
rte_prefetch0(op_ops[i + 6]->sym->m_src);
rte_prefetch0(op_ops[i + 7]->sym->m_src);
}

for (; i < nb_deq_ops; i++) {
mbuf0 = op_ops[i]->sym->m_src;
mbuf0->userdata = op_ops[i];
rte_reorder_insert(reorder_buff, mbuf0);
}

nb_drained_mbufs = rte_reorder_drain(reorder_buff, reorder_mbufs,
nb_ops);
for (i = 0; i < nb_drained_mbufs && i < 4; i++)
rte_prefetch0(reorder_mbufs[i]);

for (i = 0; (i < (nb_drained_mbufs - 8)) && (nb_drained_mbufs > 8);
i += 4) {
ops[i] = *(struct rte_crypto_op **)reorder_mbufs[i]->userdata;
ops[i + 1] = *(struct rte_crypto_op **)
reorder_mbufs[i + 1]->userdata;
ops[i + 2] = *(struct rte_crypto_op **)
reorder_mbufs[i + 2]->userdata;
ops[i + 3] = *(struct rte_crypto_op **)
reorder_mbufs[i + 3]->userdata;

reorder_mbufs[i]->userdata = NULL;
reorder_mbufs[i + 1]->userdata = NULL;
reorder_mbufs[i + 2]->userdata = NULL;
reorder_mbufs[i + 3]->userdata = NULL;

rte_prefetch0(reorder_mbufs[i + 4]);
rte_prefetch0(reorder_mbufs[i + 5]);
rte_prefetch0(reorder_mbufs[i + 6]);
rte_prefetch0(reorder_mbufs[i + 7]);
}

for (; i < nb_drained_mbufs; i++) {
ops[i] = *(struct rte_crypto_op **)
reorder_mbufs[i]->userdata;
reorder_mbufs[i]->userdata = NULL;
}
schedule_dequeue(qp, ops, nb_ops);

return nb_drained_mbufs;
return scheduler_order_drain(order_ring, ops, nb_ops);
}

static int
Expand Down

0 comments on commit 8a48e03

Please sign in to comment.