Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Send NAK when the command is filtered out

  • Loading branch information...
commit 5442f46c270685e72f39d09ba453d800ee5eadef 1 parent a71f4d9
@hurtonm authored
View
6 libmdp/include/mdp_common.h
@@ -33,11 +33,11 @@
// MDP/Client commands, as strings
#define MDPC_REQUEST "\001"
-#define MDPC_PARTIAL "\002"
-#define MDPC_FINAL "\003"
+#define MDPC_REPORT "\002"
+#define MDPC_NAK "\003"
static char *mdpc_commands [] = {
- NULL, "REQUEST", "PARTIAL", "FINAL",
+ NULL, "REQUEST", "REPORT", "NAK",
};
// This is the version of MDP/Worker we implement
View
59 libmdp/src/mdp_broker.c
@@ -79,7 +79,7 @@ static service_t *
static void
s_service_destroy (void *argument);
static void
- s_service_dispatch (service_t *service, zframe_t *sender, zmsg_t *msg);
+ s_service_dispatch (service_t *service);
static void
s_service_enable_command (service_t *self, const char *command);
static void
@@ -183,7 +183,7 @@ s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
zlist_append (worker->service->waiting, worker);
worker->service->workers++;
worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
- s_service_dispatch (worker->service, NULL, NULL);
+ s_service_dispatch (worker->service);
zframe_destroy (&service_frame);
zclock_log ("worker created");
}
@@ -195,6 +195,7 @@ s_broker_worker_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
// protocol header and service name, then rewrap envelope.
zframe_t *client = zmsg_unwrap (msg);
zmsg_pushstr (msg, worker->service->name);
+ zmsg_pushstr (msg, MDPC_REPORT);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, client);
zmsg_send (&msg, self->socket);
@@ -280,13 +281,35 @@ s_broker_client_msg (broker_t *self, zframe_t *sender, zmsg_t *msg)
// Insert the protocol header and service name, then rewrap envelope.
zmsg_push (msg, zframe_dup (service_frame));
+ zmsg_pushstr (msg, MDPC_REPORT);
zmsg_pushstr (msg, MDPC_CLIENT);
zmsg_wrap (msg, zframe_dup (sender));
zmsg_send (&msg, self->socket);
}
- else
- // Else dispatch the message to the requested service
- s_service_dispatch (service, sender, msg);
+ else {
+ int enabled = 1;
+ if (zmsg_size (msg) >= 1) {
+ zframe_t *cmd_frame = zmsg_first (msg);
+ char *cmd = zframe_strdup (cmd_frame);
+ enabled = s_service_is_command_enabled (service, cmd);
+ free (cmd);
+ }
+
+ // Forward the message to the worker.
+ if (enabled) {
+ zmsg_wrap (msg, zframe_dup (sender));
+ zlist_append (service->requests, msg);
+ s_service_dispatch (service);
+ }
+ // Send a NAK message back to the client.
+ else {
+ zmsg_push (msg, zframe_dup (service_frame));
+ zmsg_pushstr (msg, MDPC_NAK);
+ zmsg_pushstr (msg, MDPC_CLIENT);
+ zmsg_wrap (msg, zframe_dup (sender));
+ zmsg_send (&msg, self->socket);
+ }
+ }
zframe_destroy (&service_frame);
}
@@ -368,33 +391,11 @@ s_service_destroy (void *argument)
}
// The dispatch method sends request to the worker.
-// If no worker is available, we put the request into
-// the queue.
-
static void
-s_service_dispatch (service_t *self, zframe_t *sender, zmsg_t *msg)
+s_service_dispatch (service_t *self)
{
assert (self);
- if (msg) {
- int enabled = 1;
- if (zmsg_size (msg) >= 1) {
- zframe_t *cmd_frame = zmsg_first (msg);
- char *cmd = zframe_strdup (cmd_frame);
- enabled = s_service_is_command_enabled (self, cmd);
- free (cmd);
- }
- // If the command is blacklisted, we drop the message;
- // otherwise, we queue the message.
- if (enabled) {
- // Set reply return address to client sender
- zmsg_wrap (msg, zframe_dup (sender));
- zlist_append (self->requests, msg);
- }
- else
- zmsg_destroy (&msg);
- }
-
s_broker_purge (self->broker);
if (zlist_size (self->waiting) == 0)
return;
@@ -404,7 +405,7 @@ s_service_dispatch (service_t *self, zframe_t *sender, zmsg_t *msg)
zlist_remove (self->waiting, worker);
zmsg_t *msg = (zmsg_t*)zlist_pop (self->requests);
s_worker_send (worker, MDPW_REQUEST, NULL, msg);
- // Workers are scheduled in the RR fashion
+ // Workers are scheduled in the round-robin fashion
zlist_append (self->waiting, worker);
zmsg_destroy (&msg);
}
View
7 libmdp/src/mdp_client.c
@@ -170,11 +170,12 @@ mdp_client_recv (mdp_client_t *self, char **service_p)
// Message format:
// Frame 1: empty frame (delimiter)
// Frame 2: "MDPCxy" (six bytes, MDP/Client x.y)
- // Frame 3: Service name (printable string)
- // Frame 4..n: Application frames
+ // Frame 3: REPORT|NAK
+ // Frame 4: Service name (printable string)
+ // Frame 5..n: Application frames
// We would handle malformed replies better in real code
- assert (zmsg_size (msg) >= 4);
+ assert (zmsg_size (msg) >= 5);
zframe_t *empty = zmsg_pop (msg);
assert (zframe_streq (empty, ""));
Please sign in to comment.
Something went wrong with that request. Please try again.