Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

add support for TAP request/response with checkpoint info

Change-Id: If646c70aaa345f3dd1191257dfcedc1aaacdc7b1
Reviewed-on: http://review.couchbase.org/13571
Reviewed-by: Matt Ingenthron <matt@couchbase.com>
Tested-by: Matt Ingenthron <matt@couchbase.com>
  • Loading branch information...
commit 8fa20606020d440d755f2aed473011db17d7c3bf 1 parent b9cf1f4
Marty Schoch mschoch authored ingenthr committed
28 src/main/java/net/spy/memcached/tapmessage/RequestMessage.java
@@ -23,8 +23,10 @@
23 23 package net.spy.memcached.tapmessage;
24 24
25 25 import java.nio.ByteBuffer;
  26 +import java.util.HashMap;
26 27 import java.util.LinkedList;
27 28 import java.util.List;
  29 +import java.util.Map;
28 30 import java.util.UUID;
29 31
30 32 /**
@@ -34,11 +36,13 @@
34 36 public class RequestMessage extends BaseMessage{
35 37 private boolean hasBackfill;
36 38 private boolean hasVBucketList;
  39 + private boolean hasVBucketCheckpoints;
37 40 private boolean hasFlags;
38 41 private List<TapRequestFlag> flagList;
39 42 private short[] vblist;
40 43 private String name;
41 44 private long backfilldate;
  45 + private Map<Short, Long> vBucketCheckpoints;
42 46
43 47 /**
44 48 * Create a tap request message. These messages are used to start tap streams.
@@ -46,6 +50,7 @@
46 50 public RequestMessage() {
47 51 flagList = new LinkedList<TapRequestFlag>();
48 52 vblist = new short[0];
  53 + vBucketCheckpoints = new HashMap<Short, Long>();
49 54 name = UUID.randomUUID().toString();
50 55 backfilldate = -1;
51 56 totalbody += name.length();
@@ -74,6 +79,10 @@ public void setFlags(TapRequestFlag f) {
74 79 hasVBucketList = true;
75 80 totalbody += 2;
76 81 }
  82 + if (f.equals(TapRequestFlag.CHECKPOINT)) {
  83 + hasVBucketCheckpoints = true;
  84 + totalbody += 2;
  85 + }
77 86 flagList.add(f);
78 87 }
79 88 }
@@ -109,6 +118,18 @@ public void setVbucketlist(short[] vbs) {
109 118 }
110 119
111 120 /**
  121 + * Sets a map of vbucket checkpoints.
  122 + *
  123 + * @param vbchkpnts - A map of vbucket checkpoint identifiers
  124 + */
  125 + public void setvBucketCheckpoints(Map<Short, Long> vbchkpnts) {
  126 + int oldSize = (vBucketCheckpoints.size()) * 10;
  127 + int newSize = (vbchkpnts.size()) * 10;
  128 + totalbody += newSize - oldSize;
  129 + vBucketCheckpoints = vbchkpnts;
  130 + }
  131 +
  132 + /**
112 133 * Sets a name for this tap stream. If the tap stream fails this name can be
113 134 * used to try to restart the tap stream from where it last left off.
114 135 *
@@ -156,6 +177,13 @@ public ByteBuffer getBytes() {
156 177 bb.putShort(vblist[i]);
157 178 }
158 179 }
  180 + if (hasVBucketCheckpoints) {
  181 + bb.putShort((short)vBucketCheckpoints.size());
  182 + for (Short vBucket : vBucketCheckpoints.keySet()) {
  183 + bb.putShort(vBucket);
  184 + bb.putLong(vBucketCheckpoints.get(vBucket));
  185 + }
  186 + }
159 187
160 188 return (ByteBuffer) bb.flip();
161 189 }
32 src/main/java/net/spy/memcached/tapmessage/ResponseMessage.java
@@ -52,6 +52,7 @@
52 52 private final int itemflags;
53 53 private int itemexpiry;
54 54 private final int vbucketstate;
  55 + private final long checkpoint;
55 56 private final byte[] key;
56 57 private final byte[] value;
57 58 private final byte[] revid;
@@ -90,6 +91,7 @@ public ResponseMessage(byte[] b) {
90 91 }
91 92 itemexpiry = decodeInt(b, ITEM_EXPIRY_OFFSET);
92 93 vbucketstate = 0;
  94 + checkpoint = 0;
93 95 revid = new byte[engineprivate];
94 96 System.arraycopy(b, KEY_OFFSET, revid, 0, engineprivate);
95 97 key = new byte[keylength];
@@ -102,6 +104,7 @@ public ResponseMessage(byte[] b) {
102 104 vbucketstate = 0;
103 105 revid = new byte[engineprivate];
104 106 System.arraycopy(b, 32, revid, 0, engineprivate);
  107 + checkpoint = 0;
105 108 key = new byte[keylength];
106 109 System.arraycopy(b, 32 + engineprivate, key, 0, keylength);
107 110 value = new byte[0];
@@ -109,6 +112,24 @@ public ResponseMessage(byte[] b) {
109 112 itemflags = 0;
110 113 itemexpiry = 0;
111 114 vbucketstate = decodeInt(b, ITEM_FLAGS_OFFSET);
  115 + checkpoint = 0;
  116 + key = new byte[0];
  117 + value = new byte[0];
  118 + revid = new byte[0];
  119 + } else if (opcode.equals(TapOpcode.START_CHECKPOINT)
  120 + || opcode.equals(TapOpcode.END_CHECKPOINT)) {
  121 + itemflags = 0;
  122 + itemexpiry = 0;
  123 + vbucketstate = 0;
  124 + checkpoint = decodeLong(b, KEY_OFFSET);
  125 + key = new byte[0];
  126 + value = new byte[0];
  127 + revid = new byte[0];
  128 + } else if (opcode.equals(TapOpcode.OPAQUE)) {
  129 + itemflags = 0;
  130 + itemexpiry = 0;
  131 + vbucketstate = decodeInt(b, ITEM_FLAGS_OFFSET);
  132 + checkpoint = 0;
112 133 key = new byte[0];
113 134 value = new byte[0];
114 135 revid = new byte[0];
@@ -116,6 +137,7 @@ public ResponseMessage(byte[] b) {
116 137 itemflags = 0;
117 138 itemexpiry = 0;
118 139 vbucketstate = 0;
  140 + checkpoint = 0;
119 141 key = new byte[0];
120 142 value = new byte[0];
121 143 revid = new byte[0];
@@ -188,6 +210,16 @@ public int getVBucketState() {
188 210 }
189 211
190 212 /**
  213 + * Gets the checkpoint of the vbucket. Only returned with a start/end
  214 + * checkpoint message.
  215 + *
  216 + * @return the checkpoint
  217 + */
  218 + public long getCheckpoint() {
  219 + return checkpoint;
  220 + }
  221 +
  222 + /**
191 223 * Gets the value of the items flag field. Only returned with a tap mutation
192 224 * message.
193 225 *

0 comments on commit 8fa2060

Please sign in to comment.
Something went wrong with that request. Please try again.