<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array">
    <added>
      <filename>ms.c</filename>
    </added>
    <added>
      <filename>ms.h</filename>
    </added>
    <added>
      <filename>tube.c</filename>
    </added>
    <added>
      <filename>tube.h</filename>
    </added>
  </added>
  <modified type="array">
    <modified>
      <diff>@@ -42,11 +42,24 @@ static void
 conn_free(conn c)
 {
     c-&gt;fd = 0;
+    TUBE_ASSIGN(c-&gt;use, NULL);
     conn_insert(&amp;pool, c);
 }
 
+static void
+inc(ms a, tube t, size_t i)
+{
+    tube_iref(t);
+}
+
+static void
+dec(ms a, tube t, size_t i)
+{
+    tube_dref(t);
+}
+
 conn
-make_conn(int fd, char start_state)
+make_conn(int fd, char start_state, tube use, tube watch)
 {
     job j;
     conn c;
@@ -54,6 +67,15 @@ make_conn(int fd, char start_state)
     c = conn_alloc();
     if (!c) return twarn(&quot;OOM&quot;), NULL;
 
+    ms_init(&amp;c-&gt;watch, (ms_event_fn) inc, (ms_event_fn) dec);
+    if (!ms_append(&amp;c-&gt;watch, watch)) {
+        conn_free(c);
+        return twarn(&quot;OOM&quot;), NULL;
+    }
+
+    c-&gt;use = NULL; /* initialize */
+    TUBE_ASSIGN(c-&gt;use, use);
+
     c-&gt;fd = fd;
     c-&gt;state = start_state;
     c-&gt;type = 0;
@@ -149,7 +171,7 @@ conn_update_evq(conn c, const int events)
     return conn_set_evq(c, events, c-&gt;evq.ev_callback);
 }
 
-int
+static int
 conn_list_any_p(conn head)
 {
     return head-&gt;next != head || head-&gt;prev != head;
@@ -209,10 +231,10 @@ conn_close(conn c)
 
     close(c-&gt;fd);
 
-    free(c-&gt;in_job);
+    job_free(c-&gt;in_job);
 
     /* was this a peek or stats command? */
-    if (!has_reserved_this_job(c, c-&gt;out_job)) free(c-&gt;out_job);
+    if (!has_reserved_this_job(c, c-&gt;out_job)) job_free(c-&gt;out_job);
 
     c-&gt;in_job = c-&gt;out_job = NULL;
 
@@ -226,5 +248,7 @@ conn_close(conn c)
     conn_remove(c);
     if (has_reserved_job(c)) enqueue_reserved_jobs(c);
 
+    ms_clear(&amp;c-&gt;watch);
+
     conn_free(c);
 }</diff>
      <filename>conn.c</filename>
    </modified>
    <modified>
      <diff>@@ -20,7 +20,9 @@
 #define conn_h
 
 #include &quot;event.h&quot;
+#include &quot;ms.h&quot;
 #include &quot;job.h&quot;
+#include &quot;tube.h&quot;
 
 #define STATE_WANTCOMMAND 0
 #define STATE_WANTDATA 1
@@ -44,6 +46,9 @@
 #define OP_STATS 8
 #define OP_JOBSTATS 9
 #define OP_PEEK 10
+#define OP_USE 11
+#define OP_WATCH 12
+#define OP_IGNORE 13
 
 /* CONN_TYPE_* are bit masks */
 #define CONN_TYPE_PRODUCER 1
@@ -74,16 +79,17 @@ struct conn {
     job out_job;
     int out_job_sent;
     struct job reserved_jobs; /* doubly-linked list header */
+    tube use;
+    struct ms watch;
 };
 
-conn make_conn(int fd, char start_state);
+conn make_conn(int fd, char start_state, tube use, tube watch);
 
 int conn_set_evq(conn c, const int events, evh handler);
 int conn_update_evq(conn c, const int flags);
 
 void conn_close(conn c);
 
-int conn_list_any_p(conn head);
 conn conn_remove(conn c);
 void conn_insert(conn head, conn c);
 </diff>
      <filename>conn.h</filename>
    </modified>
    <modified>
      <diff>@@ -10,6 +10,12 @@ were received and sends responses in the same order. All integers in the
 protocol are formatted in decimal and (unless otherwise indicated)
 nonnegative.
 
+Names, in this protocol, are ASCII strings. They may contain letters (A-Z and
+a-z), numerals (0-9), hyphen (&quot;-&quot;), plus (&quot;+&quot;), slash (&quot;/&quot;), semicolon (&quot;;&quot;),
+dot (&quot;.&quot;), dollar-sign (&quot;$&quot;), and parentheses (&quot;(&quot; and &quot;)&quot;), but they may not
+begin with a hyphen. They are terminated by white space (either a space char or
+end of line). Each name must be at least one character long.
+
 The protocol contains two kinds of data: text lines and unstructured chunks of
 data. Text lines are used for client commands and server responses. Chunks are
 used to transfer job bodies and stats information. Each job body is an opaque
@@ -106,9 +112,23 @@ Here is a picture with more possibilities:
                         `--------&gt; *poof*
 
 
+The system has one or more tubes. Each tube consists of a ready queue and a
+delay queue. Each job spends its entire life in one tube. Consumers can show
+interest in tubes by sending the &quot;watch&quot; command; they can show disinterest by
+sending the &quot;ignore&quot; command. This set of interesting tubes is said to be a
+consumer's &quot;watch list&quot;. When a client reserves a job, it may come from any of
+the tubes in its watch list.
+
+When a client connects, its watch list is initially just the tube named
+&quot;default&quot;. If it submits jobs without having sent a &quot;use&quot; command, they will
+live in the tube named &quot;default&quot;.
+
+Tubes are created on demand whenever they are referenced. If a tube is empty
+(that is, it contains no ready, delayed, or buried jobs) and no client refers
+to it, it will be deleted.
 
-Producer Command
-----------------
+Producer Commands
+-----------------
 
 The &quot;put&quot; command is for any process that wants to insert a job into the queue.
 It comprises a command line followed by the job body:
@@ -116,7 +136,8 @@ It comprises a command line followed by the job body:
 put &lt;pri&gt; &lt;delay&gt; &lt;ttr&gt; &lt;bytes&gt;\r\n
 &lt;data&gt;\r\n
 
-It inserts a job into the queue.
+It inserts a job into the client's currently used tube (see the &quot;use&quot; command
+below).
 
  - &lt;pri&gt; is an integer &lt; 2**32. Jobs with smaller priority values will be
    scheduled before jobs with larger priorities. The most urgent priority is 0;
@@ -150,6 +171,19 @@ may be:
 
    - &lt;id&gt; is the integer id of the new job
 
+The &quot;use&quot; command is for producers. Subsequent put commands will put jobs into
+the tube specified by this command. If no use command has been issued, jobs
+will be put into the tube named &quot;default&quot;.
+
+use &lt;tube&gt;\r\n
+
+ - &lt;tube&gt; is a name at most 200 bytes. It specifies the tube to use. If the
+   tube does not exist, it will be created.
+
+The only reply is:
+
+USING\r\n
+
 Worker Commands
 ---------------
 
@@ -239,6 +273,36 @@ There are two possible responses:
 
  - &quot;NOT_FOUND\r\n&quot; if the job does not exist or is not reserved by the client.
 
+The &quot;watch&quot; command adds the named tube to the watch list for the current
+connection. A reserve command will take a job from any of the tubes in the
+watch list. For each new connection, the watch list initially consists of one
+tube, named &quot;default&quot;.
+
+watch &lt;tube&gt;\r\n
+
+ - &lt;tube&gt; is a name at most 200 bytes. It specifies a tube to add to the watch
+   list.
+
+The reply is:
+
+WATCHING &lt;count&gt;\r\n
+
+ - &lt;count&gt; is the integer number of tubes currently in the watch list.
+
+The &quot;ignore&quot; command is for consumers. It removes the named tube from the
+watch list for the current connection.
+
+ignore &lt;tube&gt;\r\n
+
+The reply is one of:
+
+ - &quot;WATCHING &lt;count&gt;\r\n&quot; to indicate success.
+
+   - &lt;count&gt; is the integer number of tubes currently in the watch list.
+
+ - &quot;NOT_IGNORED\r\n&quot; if the client attempts to ignore the only tube in its
+   watch list.
+
 Other Commands
 --------------
 
@@ -315,6 +379,8 @@ scalars.
 
    - &quot;id&quot; is the job id
 
+   - &quot;tube&quot; is the name of the tube that contains this job
+
    - &quot;state&quot; is &quot;ready&quot; or &quot;delayed&quot; or &quot;reserved&quot; or &quot;buried&quot;
 
    - &quot;age&quot; is the time in seconds since the put command that created this job.
@@ -366,6 +432,8 @@ scalars.
 
    - &quot;total-jobs&quot; is the cumulative count of jobs created.
 
+   - &quot;current-tubes&quot; is the number of currently-existing tubes.
+
    - &quot;current-connections&quot; is the number of currently open connections.
 
    - &quot;current-producers&quot; is the number of open connections that have each</diff>
      <filename>doc/protocol.txt</filename>
    </modified>
    <modified>
      <diff>@@ -20,6 +20,7 @@
 #include &lt;string.h&gt;
 
 #include &quot;job.h&quot;
+#include &quot;tube.h&quot;
 #include &quot;util.h&quot;
 
 static unsigned long long int next_id = 1;
@@ -37,12 +38,14 @@ allocate_job(int body_size)
     j-&gt;timeout_ct = j-&gt;release_ct = j-&gt;bury_ct = j-&gt;kick_ct = 0;
     j-&gt;body_size = body_size;
     j-&gt;next = j-&gt;prev = j; /* not in a linked list */
+    j-&gt;tube = NULL;
 
     return j;
 }
 
 job
-make_job(unsigned int pri, unsigned int delay, unsigned int ttr, int body_size)
+make_job(unsigned int pri, unsigned int delay, unsigned int ttr, int body_size,
+         tube tube)
 {
     job j;
 
@@ -53,10 +56,18 @@ make_job(unsigned int pri, unsigned int delay, unsigned int ttr, int body_size)
     j-&gt;pri = pri;
     j-&gt;delay = delay;
     j-&gt;ttr = ttr;
+    TUBE_ASSIGN(j-&gt;tube, tube);
 
     return j;
 }
 
+void
+job_free(job j)
+{
+    if (j) TUBE_ASSIGN(j-&gt;tube, NULL);
+    free(j);
+}
+
 int
 job_pri_cmp(job a, job b)
 {
@@ -91,10 +102,9 @@ job_copy(job j)
     n = malloc(sizeof(struct job) + j-&gt;body_size);
     if (!n) return twarnx(&quot;OOM&quot;), NULL;
 
-    n-&gt;id = j-&gt;id;
-    n-&gt;pri = j-&gt;pri;
-    n-&gt;body_size = j-&gt;body_size;
-    memcpy(n-&gt;body, j-&gt;body, j-&gt;body_size);
+    memcpy(n, j, sizeof(struct job) + j-&gt;body_size);
+    n-&gt;next = n-&gt;prev = n; /* not in a linked list */
+    TUBE_ASSIGN(n-&gt;tube, j-&gt;tube);
 
     return n;
 }</diff>
      <filename>job.c</filename>
    </modified>
    <modified>
      <diff>@@ -21,14 +21,17 @@
 
 #include &lt;time.h&gt;
 
+typedef struct job *job;
+typedef int(*job_cmp_fn)(job, job);
+
+#include &quot;tube.h&quot;
+
 #define JOB_STATE_INVALID 0
 #define JOB_STATE_READY 1
 #define JOB_STATE_RESERVED 2
 #define JOB_STATE_BURIED 3
 #define JOB_STATE_DELAYED 4
 
-typedef struct job *job;
-
 struct job {
     job prev, next; /* linked list of jobs */
     unsigned long long int id;
@@ -42,15 +45,16 @@ struct job {
     unsigned int release_ct;
     unsigned int bury_ct;
     unsigned int kick_ct;
+    tube tube;
     char state;
     char body[];
 };
 
 job allocate_job(int body_size);
 job make_job(unsigned int pri, unsigned int delay, unsigned int ttr,
-        int body_size);
+             int body_size, tube tube);
+void job_free(job j);
 
-typedef int(*job_cmp_fn)(job, job);
 int job_pri_cmp(job a, job b);
 int job_delay_cmp(job a, job b);
 </diff>
      <filename>job.h</filename>
    </modified>
    <modified>
      <diff>@@ -20,35 +20,39 @@
 #include &lt;stdio.h&gt;
 #include &lt;string.h&gt;
 
+#include &quot;tube.h&quot; /* hack to make cpp happy */
 #include &quot;pq.h&quot;
 
-pq
-make_pq(unsigned int initial_cap, job_cmp_fn cmp)
+void
+pq_init(pq q, job_cmp_fn cmp)
 {
-    pq q;
+    if (!q) return;
 
-    q = malloc(sizeof(struct pq));
-    if (!q) return NULL;
-
-    q-&gt;cap = initial_cap;
+    q-&gt;cap = 0;
     q-&gt;used = 0;
     q-&gt;cmp = cmp;
-    q-&gt;heap = malloc(initial_cap * sizeof(job));
-    if (!q-&gt;heap) return free(q), NULL;
+    q-&gt;heap = NULL;
+
+    return;
+}
 
-    return q;
+void
+pq_clear(pq q)
+{
+    free(q-&gt;heap);
+    pq_init(q, q-&gt;cmp);
 }
 
 static void
 pq_grow(pq q)
 {
     job *nheap;
-    unsigned int ncap = q-&gt;cap &lt;&lt; 1;
+    unsigned int ncap = q-&gt;cap &lt;&lt; 1 ? : 1;
 
     nheap = malloc(ncap * sizeof(job));
     if (!nheap) return;
 
-    memcpy(nheap, q-&gt;heap, q-&gt;used * sizeof(job));
+    if (q-&gt;heap) memcpy(nheap, q-&gt;heap, q-&gt;used * sizeof(job));
     free(q-&gt;heap);
     q-&gt;heap = nheap;
     q-&gt;cap = ncap;</diff>
      <filename>pq.c</filename>
    </modified>
    <modified>
      <diff>@@ -19,17 +19,21 @@
 #ifndef q_h
 #define q_h
 
+typedef struct pq *pq;
+
 #include &quot;job.h&quot;
 
-typedef struct pq {
+struct pq {
     unsigned int cap;
     unsigned int used;
     job_cmp_fn cmp;
     job *heap;
-} *pq;
+};
+
+/* initialize a priority queue */
+void pq_init(pq q, job_cmp_fn cmp);
 
-/* make a priority queue with initial capacity initial_cap */
-pq make_pq(unsigned int initial_cap, job_cmp_fn cmp);
+void pq_clear(pq q);
 
 /* return 1 if the job was inserted, else 0 */
 int pq_give(pq q, job j);</diff>
      <filename>pq.h</filename>
    </modified>
    <modified>
      <diff>@@ -27,7 +27,9 @@
 
 #include &quot;prot.h&quot;
 #include &quot;pq.h&quot;
+#include &quot;ms.h&quot;
 #include &quot;job.h&quot;
+#include &quot;tube.h&quot;
 #include &quot;conn.h&quot;
 #include &quot;util.h&quot;
 #include &quot;net.h&quot;
@@ -36,6 +38,11 @@
 /* job body cannot be greater than this many bytes long */
 #define JOB_DATA_SIZE_LIMIT ((1 &lt;&lt; 16) - 1)
 
+#define NAME_CHARS \
+    &quot;ABCDEFGHIJKLMNOPQRSTUVWXYZ&quot; \
+    &quot;abcdefghijklmnopqrstuvwxyz&quot; \
+    &quot;0123456789-+/;.$()&quot;
+
 #define CMD_PUT &quot;put &quot;
 #define CMD_PEEK &quot;peek&quot;
 #define CMD_PEEKJOB &quot;peek &quot;
@@ -46,6 +53,9 @@
 #define CMD_KICK &quot;kick &quot;
 #define CMD_STATS &quot;stats&quot;
 #define CMD_JOBSTATS &quot;stats &quot;
+#define CMD_USE &quot;use &quot;
+#define CMD_WATCH &quot;watch &quot;
+#define CMD_IGNORE &quot;ignore &quot;
 
 #define CONSTSTRLEN(m) (sizeof(m) - 1)
 
@@ -58,6 +68,9 @@
 #define CMD_KICK_LEN CONSTSTRLEN(CMD_KICK)
 #define CMD_STATS_LEN CONSTSTRLEN(CMD_STATS)
 #define CMD_JOBSTATS_LEN CONSTSTRLEN(CMD_JOBSTATS)
+#define CMD_USE_LEN CONSTSTRLEN(CMD_USE)
+#define CMD_WATCH_LEN CONSTSTRLEN(CMD_WATCH)
+#define CMD_IGNORE_LEN CONSTSTRLEN(CMD_IGNORE)
 
 #define MSG_FOUND &quot;FOUND&quot;
 #define MSG_NOTFOUND &quot;NOT_FOUND\r\n&quot;
@@ -67,11 +80,13 @@
 #define MSG_BURIED &quot;BURIED\r\n&quot;
 #define MSG_BURIED_FMT &quot;BURIED %llu\r\n&quot;
 #define MSG_INSERTED_FMT &quot;INSERTED %llu\r\n&quot;
+#define MSG_NOT_IGNORED &quot;NOT_IGNORED\r\n&quot;
 
 #define MSG_NOTFOUND_LEN CONSTSTRLEN(MSG_NOTFOUND)
 #define MSG_DELETED_LEN CONSTSTRLEN(MSG_DELETED)
 #define MSG_RELEASED_LEN CONSTSTRLEN(MSG_RELEASED)
 #define MSG_BURIED_LEN CONSTSTRLEN(MSG_BURIED)
+#define MSG_NOT_IGNORED_LEN CONSTSTRLEN(MSG_NOT_IGNORED)
 
 #define MSG_OUT_OF_MEMORY &quot;OUT_OF_MEMORY\r\n&quot;
 #define MSG_INTERNAL_ERROR &quot;INTERNAL_ERROR\r\n&quot;
@@ -97,6 +112,7 @@
     &quot;cmd-stats: %llu\n&quot; \
     &quot;job-timeouts: %llu\n&quot; \
     &quot;total-jobs: %llu\n&quot; \
+    &quot;current-tubes: %u\n&quot; \
     &quot;current-connections: %u\n&quot; \
     &quot;current-producers: %u\n&quot; \
     &quot;current-workers: %u\n&quot; \
@@ -111,6 +127,7 @@
 
 #define JOB_STATS_FMT &quot;---\n&quot; \
     &quot;id: %llu\n&quot; \
+    &quot;tube: %s\n&quot; \
     &quot;state: %s\n&quot; \
     &quot;age: %u\n&quot; \
     &quot;delay: %u\n&quot; \
@@ -122,13 +139,15 @@
     &quot;kicks: %u\n&quot; \
     &quot;\r\n&quot;
 
-static pq ready_q;
-static pq delay_q;
+static struct pq ready_q;
+static struct pq delay_q;
 
 /* Doubly-linked list of waiting connections. */
-static struct conn wait_queue = { &amp;wait_queue, &amp;wait_queue, 0 };
 static struct job graveyard = { &amp;graveyard, &amp;graveyard, 0 };
-static unsigned int buried_ct = 0, urgent_ct = 0, waiting_ct = 0;
+static unsigned int buried_ct = 0, ready_ct = 0, urgent_ct = 0, waiting_ct = 0;
+
+static tube default_tube;
+static struct ms tubes;
 
 static int drain_mode = 0;
 static time_t start_time;
@@ -155,16 +174,13 @@ static const char * op_names[] = {
     CMD_STATS,
     CMD_JOBSTATS,
     CMD_PEEK,
+    CMD_USE,
+    CMD_WATCH,
+    CMD_IGNORE,
 };
 #endif
 
 static int
-waiting_conn_p()
-{
-    return conn_list_any_p(&amp;wait_queue);
-}
-
-static int
 buried_job_p()
 {
     return job_list_any_p(&amp;graveyard);
@@ -220,10 +236,15 @@ reply_job(conn c, job j, const char *word)
 conn
 remove_waiting_conn(conn c)
 {
+    size_t i;
+
     if (!(c-&gt;type &amp; CONN_TYPE_WAITING)) return NULL;
     c-&gt;type &amp;= ~CONN_TYPE_WAITING;
     waiting_ct--;
-    return conn_remove(c);
+    for (i = 0; i &lt; c-&gt;watch.used; i++) {
+        ms_remove(&amp;((tube) c-&gt;watch.items[i])-&gt;waiting, c);
+    }
+    return c;
 }
 
 static void
@@ -237,16 +258,40 @@ reserve_job(conn c, job j)
     return reply_job(c, j, MSG_RESERVED);
 }
 
+static job
+next_eligible_job()
+{
+    tube t;
+    size_t i;
+    job j = NULL, candidate;
+
+    dprintf(&quot;tubes.used = %d\n&quot;, tubes.used);
+    for (i = 0; i &lt; tubes.used; i++) {
+        t = tubes.items[i];
+        dprintf(&quot;for %s t-&gt;waiting.used=%d t-&gt;ready.used=%d\n&quot;,
+                t-&gt;name, t-&gt;waiting.used, t-&gt;ready.used);
+        if (t-&gt;waiting.used &amp;&amp; t-&gt;ready.used) {
+            candidate = pq_peek(&amp;t-&gt;ready);
+            if (!j || candidate-&gt;id &lt; j-&gt;id) j = candidate;
+        }
+        dprintf(&quot;i = %d, tubes.used = %d\n&quot;, i, tubes.used);
+    }
+
+    return j;
+}
+
 static void
 process_queue()
 {
     job j;
 
-    while (waiting_conn_p()) {
-        j = pq_take(ready_q);
-        if (!j) return;
+    dprintf(&quot;processing queue\n&quot;);
+    while ((j = next_eligible_job())) {
+        dprintf(&quot;got eligible job %llu in %s\n&quot;, j-&gt;id, j-&gt;tube-&gt;name);
+        j = pq_take(&amp;j-&gt;tube-&gt;ready);
+        ready_ct--;
         if (j-&gt;pri &lt; URGENT_THRESHOLD) urgent_ct--;
-        reserve_job(remove_waiting_conn(wait_queue.next), j);
+        reserve_job(remove_waiting_conn(ms_take(&amp;j-&gt;tube-&gt;waiting)), j);
     }
 }
 
@@ -257,14 +302,15 @@ enqueue_job(job j, unsigned int delay)
 
     if (delay) {
         j-&gt;deadline = time(NULL) + delay;
-        r = pq_give(delay_q, j);
+        r = pq_give(&amp;delay_q, j);
         if (!r) return 0;
         j-&gt;state = JOB_STATE_DELAYED;
-        set_main_timeout(pq_peek(delay_q)-&gt;deadline);
+        set_main_timeout(pq_peek(&amp;delay_q)-&gt;deadline);
     } else {
-        r = pq_give(ready_q, j);
+        r = pq_give(&amp;j-&gt;tube-&gt;ready, j);
         if (!r) return 0;
         j-&gt;state = JOB_STATE_READY;
+        ready_ct++;
         if (j-&gt;pri &lt; URGENT_THRESHOLD) urgent_ct++;
     }
     process_queue();
@@ -298,13 +344,13 @@ enqueue_reserved_jobs(conn c)
 static job
 delay_q_peek()
 {
-    return pq_peek(delay_q);
+    return pq_peek(&amp;delay_q);
 }
 
 static job
 delay_q_take()
 {
-    return pq_take(delay_q);
+    return pq_take(&amp;delay_q);
 }
 
 static job
@@ -327,7 +373,7 @@ kick_buried_job()
     r = enqueue_job(j, 0);
     if (r) return 1;
 
-    /* ready_q is full, so bury it */
+    /* ready queue is full, so bury it */
     bury_job(j);
     return 0;
 }
@@ -335,7 +381,7 @@ kick_buried_job()
 static unsigned int
 get_delayed_job_ct()
 {
-    return pq_used(delay_q);
+    return pq_used(&amp;delay_q);
 }
 
 static int
@@ -350,7 +396,7 @@ kick_delayed_job()
     r = enqueue_job(j, 0);
     if (r) return 1;
 
-    /* ready_q is full, so delay it again */
+    /* ready queue is full, so delay it again */
     r = enqueue_job(j, j-&gt;delay);
     if (r) return 0;
 
@@ -410,9 +456,13 @@ remove_buried_job(unsigned long long int id)
 static void
 enqueue_waiting_conn(conn c)
 {
+    size_t i;
+
     waiting_ct++;
     c-&gt;type |= CONN_TYPE_WAITING;
-    conn_insert(&amp;wait_queue, conn_remove(c) ? : c);
+    for (i = 0; i &lt; c-&gt;watch.used; i++) {
+        ms_append(&amp;((tube) c-&gt;watch.items[i])-&gt;waiting, c);
+    }
 }
 
 static job
@@ -446,19 +496,33 @@ find_reserved_job(unsigned long long int id)
 }
 
 static job
+peek_ready_job(unsigned long long int id)
+{
+
+    job j;
+    size_t i;
+
+    for (i = 0; i &lt; tubes.used; i++) {
+        j = pq_find(&amp;((tube) tubes.items[i])-&gt;ready, id);
+        if (j) return j;
+    }
+    return NULL;
+}
+
+/* TODO: make a global hashtable of jobs because this is slow */
+static job
 peek_job(unsigned long long int id)
 {
-    return pq_find(ready_q, id) ? :
-           pq_find(delay_q, id) ? :
+    return peek_ready_job(id) ? :
+           pq_find(&amp;delay_q, id) ? :
            find_reserved_job(id) ? :
-           find_reserved_job_in_list(&amp;wait_queue, id) ? :
            find_buried_job(id);
 }
 
 static unsigned int
 get_ready_job_ct()
 {
-    return pq_used(ready_q);
+    return ready_ct;
 }
 
 static unsigned int
@@ -528,6 +592,9 @@ which_cmd(conn c)
     TEST_CMD(c-&gt;cmd, CMD_KICK, OP_KICK);
     TEST_CMD(c-&gt;cmd, CMD_JOBSTATS, OP_JOBSTATS);
     TEST_CMD(c-&gt;cmd, CMD_STATS, OP_STATS);
+    TEST_CMD(c-&gt;cmd, CMD_USE, OP_USE);
+    TEST_CMD(c-&gt;cmd, CMD_WATCH, OP_WATCH);
+    TEST_CMD(c-&gt;cmd, CMD_IGNORE, OP_IGNORE);
     return OP_UNKNOWN;
 }
 
@@ -570,7 +637,7 @@ enqueue_incoming_job(conn c)
 
     /* check if the trailer is present and correct */
     if (memcmp(j-&gt;body + j-&gt;body_size - 2, &quot;\r\n&quot;, 2)) {
-        free(j);
+        job_free(j);
         return reply_msg(c, MSG_EXPECTED_CRLF);
     }
 
@@ -612,6 +679,7 @@ fmt_stats(char *buf, size_t size, void *x)
             stats_ct,
             timeout_ct,
             total_jobs(),
+            tubes.used,
             count_cur_conns(),
             count_cur_producers(),
             count_cur_workers(),
@@ -710,6 +778,7 @@ fmt_job_stats(char *buf, size_t size, void *jp)
     t = time(NULL);
     return snprintf(buf, size, JOB_STATS_FMT,
             j-&gt;id,
+            j-&gt;tube-&gt;name,
             job_state(j),
             (unsigned int) (t - j-&gt;creation),
             j-&gt;delay,
@@ -749,6 +818,56 @@ remove_reserved_job(conn c, unsigned long long int id)
     return remove_this_reserved_job(c, find_reserved_job_in_conn(c, id));
 }
 
+static int
+name_is_ok(const char *name, size_t max)
+{
+    size_t len = strlen(name);
+    return len &gt; 0 &amp;&amp; len &lt;= max &amp;&amp;
+        strspn(name, NAME_CHARS) == len &amp;&amp; name[0] != '-';
+}
+
+static tube
+find_tube(const char *name)
+{
+    tube t;
+    size_t i;
+
+    for (i = 0; i &lt; tubes.used; i++) {
+        t = tubes.items[i];
+        if (strncmp(t-&gt;name, name, MAX_TUBE_NAME_LEN) == 0) return t;
+    }
+    return NULL;
+}
+
+void
+prot_remove_tube(tube t)
+{
+    ms_remove(&amp;tubes, t);
+}
+
+static tube
+make_and_insert_tube(const char *name)
+{
+    int r;
+    tube t = NULL;
+
+    t = make_tube(name);
+    if (!t) return NULL;
+
+    /* We want this global tube list to behave like &quot;weak&quot; refs, so don't
+     * increment the ref count. */
+    r = ms_append(&amp;tubes, t);
+    if (!r) return tube_dref(t), NULL;
+
+    return t;
+}
+
+static tube
+find_or_make_tube(const char *name)
+{
+    return find_tube(name) ? : make_and_insert_tube(name);
+}
+
 static void
 dispatch_cmd(conn c)
 {
@@ -756,9 +875,10 @@ dispatch_cmd(conn c)
     unsigned int count;
     job j;
     char type;
-    char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf;
+    char *size_buf, *delay_buf, *ttr_buf, *pri_buf, *end_buf, *name;
     unsigned int pri, delay, ttr, body_size;
     unsigned long long int id;
+    tube t = NULL;
 
     /* NUL-terminate this string so we can use strtol and friends */
     c-&gt;cmd[c-&gt;cmd_len - 2] = '\0';
@@ -797,7 +917,7 @@ dispatch_cmd(conn c)
 
         conn_set_producer(c);
 
-        c-&gt;in_job = make_job(pri, delay, ttr ? : 1, body_size + 2);
+        c-&gt;in_job = make_job(pri, delay, ttr ? : 1, body_size + 2, c-&gt;use);
 
         fill_extra_data(c);
 
@@ -856,7 +976,7 @@ dispatch_cmd(conn c)
         if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
 
         delete_ct++; /* stats */
-        free(j);
+        job_free(j);
 
         reply(c, MSG_DELETED, MSG_DELETED_LEN, STATE_SENDWORD);
         break;
@@ -935,8 +1055,54 @@ dispatch_cmd(conn c)
         if (!j) return reply(c, MSG_NOTFOUND, MSG_NOTFOUND_LEN, STATE_SENDWORD);
 
         stats_ct++; /* stats */
+
+        if (!j-&gt;tube) return reply_serr(c, MSG_INTERNAL_ERROR);
         do_stats(c, fmt_job_stats, j);
         break;
+    case OP_USE:
+        name = c-&gt;cmd + CMD_USE_LEN;
+        if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
+
+        TUBE_ASSIGN(t, find_or_make_tube(name));
+        if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
+
+        TUBE_ASSIGN(c-&gt;use, t);
+        TUBE_ASSIGN(t, NULL);
+
+        reply_line(c, STATE_SENDWORD, &quot;USING %s\r\n&quot;, c-&gt;use-&gt;name);
+        break;
+    case OP_WATCH:
+        name = c-&gt;cmd + CMD_WATCH_LEN;
+        if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
+
+        TUBE_ASSIGN(t, find_or_make_tube(name));
+        if (!t) return reply_serr(c, MSG_OUT_OF_MEMORY);
+
+        r = 1;
+        if (!ms_contains(&amp;c-&gt;watch, t)) r = ms_append(&amp;c-&gt;watch, t);
+        TUBE_ASSIGN(t, NULL);
+        if (!r) return reply_serr(c, MSG_OUT_OF_MEMORY);
+
+        reply_line(c, STATE_SENDWORD, &quot;WATCHING %d\r\n&quot;, c-&gt;watch.used);
+        break;
+    case OP_IGNORE:
+        name = c-&gt;cmd + CMD_IGNORE_LEN;
+        if (!name_is_ok(name, 200)) return reply_msg(c, MSG_BAD_FORMAT);
+
+        t = NULL;
+        for (i = 0; i &lt; c-&gt;watch.used; i++) {
+            t = c-&gt;watch.items[i];
+            if (strncmp(t-&gt;name, name, MAX_TUBE_NAME_LEN) == 0) break;
+            t = NULL;
+        }
+
+        if (t &amp;&amp; c-&gt;watch.used &lt; 2) return reply_msg(c, MSG_NOT_IGNORED);
+
+        if (t) ms_remove(&amp;c-&gt;watch, t); /* may free t if refcount =&gt; 0 */
+        t = NULL;
+
+        reply_line(c, STATE_SENDWORD, &quot;WATCHING %d\r\n&quot;, c-&gt;watch.used);
+        break;
     default:
         return reply_msg(c, MSG_UNKNOWN_COMMAND);
     }
@@ -983,7 +1149,7 @@ reset_conn(conn c)
     if (r == -1) return twarnx(&quot;update events failed&quot;), conn_close(c);
 
     /* was this a peek or stats command? */
-    if (!has_reserved_this_job(c, c-&gt;out_job)) free(c-&gt;out_job);
+    if (!has_reserved_this_job(c, c-&gt;out_job)) job_free(c-&gt;out_job);
     c-&gt;out_job = NULL;
 
     c-&gt;reply_sent = 0; /* now that we're done, reset this */
@@ -1006,12 +1172,14 @@ h_conn_data(conn c)
         c-&gt;cmd_read += r; /* we got some bytes */
 
         c-&gt;cmd_len = cmd_len(c); /* find the EOL */
+        dprintf(&quot;cmd_len is %d\n&quot;, c-&gt;cmd_len);
 
         /* yay, complete command line */
         if (c-&gt;cmd_len) return do_cmd(c);
 
         /* c-&gt;cmd_read &gt; LINE_BUF_SIZE can't happen */
 
+        dprintf(&quot;cmd_read is %d\n&quot;, c-&gt;cmd_read);
         /* command line too long? */
         if (c-&gt;cmd_read == LINE_BUF_SIZE) {
             return reply_msg(c, MSG_BAD_FORMAT);
@@ -1152,7 +1320,7 @@ h_accept(const int fd, const short which, struct event *ev)
     r = fcntl(cfd, F_SETFL, flags | O_NONBLOCK);
     if (r &lt; 0) return twarn(&quot;setting O_NONBLOCK&quot;), close(cfd), v();
 
-    c = make_conn(cfd, STATE_WANTCOMMAND);
+    c = make_conn(cfd, STATE_WANTCOMMAND, default_tube, default_tube);
     if (!c) return twarnx(&quot;make_conn() failed&quot;), close(cfd), brake();
 
     dprintf(&quot;accepted conn, fd=%d\n&quot;, cfd);
@@ -1164,6 +1332,11 @@ void
 prot_init()
 {
     start_time = time(NULL);
-    ready_q = make_pq(16, job_pri_cmp);
-    delay_q = make_pq(16, job_delay_cmp);
+    pq_init(&amp;ready_q, job_pri_cmp);
+    pq_init(&amp;delay_q, job_delay_cmp);
+
+    ms_init(&amp;tubes, NULL, NULL);
+
+    TUBE_ASSIGN(default_tube, find_or_make_tube(&quot;default&quot;));
+    if (!default_tube) twarnx(&quot;Out of memory during startup!&quot;);
 }</diff>
      <filename>prot.c</filename>
    </modified>
    <modified>
      <diff>@@ -31,5 +31,6 @@ void enqueue_reserved_jobs(conn c);
 
 void enter_drain_mode(int sig);
 void h_accept(const int fd, const short which, struct event *ev);
+void prot_remove_tube(tube t);
 
 #endif /*prot_h*/</diff>
      <filename>prot.h</filename>
    </modified>
  </modified>
  <removed type="array"/>
  <parents type="array">
    <parent>
      <id>35594240aec055321e97065f778be5ec6196cbfa</id>
    </parent>
  </parents>
  <author>
    <name>Keith Rarick</name>
    <email>kr@causes.com</email>
  </author>
  <url>http://github.com/kr/beanstalkd/commit/c1b09c4fd41f4496fc147bd273d0b823b400f35b</url>
  <id>c1b09c4fd41f4496fc147bd273d0b823b400f35b</id>
  <committed-date>2008-02-26T00:58:36-08:00</committed-date>
  <authored-date>2008-02-20T15:08:02-08:00</authored-date>
  <message>Implement tubes.</message>
  <tree>477a3cd539a7d01603ace54ae1b8d107b0af4ef6</tree>
  <committer>
    <name>Keith Rarick</name>
    <email>kr@causes.com</email>
  </committer>
</commit>
