Skip to content
This repository
Browse code

Make recovery work better with many clients

Limit the number of parallel reads, early stop, etc.
  • Loading branch information...
commit 756568b0616e54a7fa6482f0ddf426b4bdfe4143 1 parent cc4de96
Daniel Gómez Ferro authored
33 src/main/java/com/yahoo/omid/client/TSOClient.java
@@ -333,8 +333,8 @@ public TSOClient(Configuration conf) throws IOException {
333 333
334 334 String host = conf.get("tso.host");
335 335 int port = conf.getInt("tso.port", 1234);
336   - max_retries = conf.getInt("tso.max_retries", 10);
337   - retry_delay_ms = conf.getInt("tso.retry_delay_ms", 3000);
  336 + max_retries = conf.getInt("tso.max_retries", 100);
  337 + retry_delay_ms = conf.getInt("tso.retry_delay_ms", 1000);
338 338
339 339 if (host == null) {
340 340 throw new IOException("tso.host missing from configuration");
@@ -359,7 +359,12 @@ private State connectIfNeeded() throws IOException {
359 359 throw e;
360 360 }
361 361 retries++;
362   - bootstrap.connect(addr);
  362 + bootstrap.connect(addr).addListener(new ChannelFutureListener() {
  363 + @Override
  364 + public void operationComplete(ChannelFuture future) throws Exception {
  365 + LOG.debug("Connection completed. Success: " + future.isSuccess());
  366 + }
  367 + });
363 368 state = State.CONNECTING;
364 369 return state;
365 370 }
@@ -422,6 +427,7 @@ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
422 427 retries = 0;
423 428 }
424 429 clearState();
  430 + LOG.debug("Channel connected");
425 431 Op o = queuedOps.poll();;
426 432 while (o != null && state == State.CONNECTED) {
427 433 o.execute(channel);
@@ -442,8 +448,24 @@ private void clearState() {
442 448 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e)
443 449 throws Exception {
444 450 synchronized(state) {
  451 + LOG.debug("Channel disconnected");
445 452 channel = null;
446 453 state = State.DISCONNECTED;
  454 + for (CreateCallback cb : createCallbacks) {
  455 + cb.error(new IOException("Channel Disconnected"));
  456 + }
  457 + for (CommitCallback cb : commitCallbacks.values()) {
  458 + cb.error(new IOException("Channel Disconnected"));
  459 + }
  460 + for (List<CommitQueryCallback> lcqb : isCommittedCallbacks.values()) {
  461 + for (CommitQueryCallback cqb : lcqb) {
  462 + cqb.error(new IOException("Channel Disconnected"));
  463 + }
  464 + }
  465 + createCallbacks.clear();
  466 + commitCallbacks.clear();
  467 + isCommittedCallbacks.clear();
  468 + connectIfNeeded();
447 469 }
448 470 }
449 471
@@ -547,13 +569,14 @@ public void exceptionCaught(ChannelHandlerContext ctx,
547 569 throws Exception {
548 570 System.out.println("Unexpected exception " + e.getCause());
549 571 e.getCause().printStackTrace();
  572 +// e.getChannel().disconnect();
550 573
551 574 synchronized(state) {
552 575
553 576 if (state == State.CONNECTING) {
554 577 state = State.RETRY_CONNECT_WAIT;
555   - if (LOG.isTraceEnabled()) {
556   - LOG.trace("Retrying connect in " + retry_delay_ms + "ms " + retries);
  578 + if (LOG.isDebugEnabled()) {
  579 + LOG.debug("Retrying connect in " + retry_delay_ms + "ms " + retries);
557 580 }
558 581 try {
559 582 retryTimer.schedule(new TimerTask() {
2  src/main/java/com/yahoo/omid/tso/Bucket.java
@@ -59,7 +59,7 @@ public boolean isUncommited(long id) {
59 59 return aborted;
60 60 }
61 61
62   - LOG.debug("Performing scanning...");
  62 + LOG.trace("Performing scanning...");
63 63
64 64 for (int i = transactions.nextClearBit(firstUncommited); i >= 0
65 65 && i <= lastCommited; i = transactions.nextClearBit(i + 1)) {
21 src/main/java/com/yahoo/omid/tso/ClientHandler.java
@@ -19,7 +19,6 @@
19 19 import java.io.IOException;
20 20 import java.util.Collections;
21 21 import java.util.Date;
22   -import java.util.HashMap;
23 22 import java.util.HashSet;
24 23 import java.util.Set;
25 24 import java.util.concurrent.BlockingQueue;
@@ -37,8 +36,6 @@
37 36 import org.jboss.netty.channel.ChannelFutureListener;
38 37 import org.jboss.netty.channel.ChannelHandlerContext;
39 38 import org.jboss.netty.channel.ChannelStateEvent;
40   -import org.jboss.netty.channel.Channels;
41   -import org.jboss.netty.channel.ExceptionEvent;
42 39
43 40 import com.yahoo.omid.client.SyncAbortCompleteCallback;
44 41 import com.yahoo.omid.client.SyncCommitCallback;
@@ -161,13 +158,9 @@ public ClientHandler(Configuration conf, int nbMessage, int inflight, boolean pa
161 158 @Override
162 159 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
163 160 super.channelConnected(ctx, e);
164   - try {
165   - Thread.sleep(15000);
166   - } catch (InterruptedException e1) {
167   - //ignore
168   - }
169 161 startDate = new Date();
170 162 channel = e.getChannel();
  163 + outstandingTransactions = 0;
171 164 startTransaction();
172 165 }
173 166
@@ -269,18 +262,6 @@ private long getSizeAborted() {
269 262 return aborted.size() * 8 * 8;
270 263 }
271 264
272   - @Override
273   - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
274   - if (e.getCause() instanceof IOException) {
275   - LOG.warn("IOException from downstream.", e.getCause());
276   - } else {
277   - LOG.warn("Unexpected exception from downstream.", e.getCause());
278   - }
279   - // Offer default object
280   - answer.offer(false);
281   - Channels.close(e.getChannel());
282   - }
283   -
284 265 private java.util.Random rnd;
285 266
286 267 private boolean pauseClient;
7 src/main/java/com/yahoo/omid/tso/CommitHashMap.java
@@ -16,6 +16,11 @@
16 16
17 17 package com.yahoo.omid.tso;
18 18
  19 +import java.util.Collections;
  20 +import java.util.Set;
  21 +
  22 +import org.jboss.netty.util.internal.ConcurrentHashMap;
  23 +
19 24 /**
20 25 * A hash map that uses byte[] for the key rather than longs.
21 26 *
@@ -138,7 +143,7 @@ public CommitHashMap(int initialCapacity, float loadFactor) {
138 143
139 144 // set of half aborted transactions
140 145 // TODO: set the initial capacity in a smarter way
141   - java.util.HashSet<Long> halfAborted = new java.util.HashSet<Long>(10000);
  146 + Set<Long> halfAborted = Collections.newSetFromMap(new ConcurrentHashMap<Long, Boolean>(10000));
142 147
143 148 // add a new half aborted transaction
144 149 void setHalfAborted(long startTimestamp) {
67 src/main/java/com/yahoo/omid/tso/TSOHandler.java
@@ -84,7 +84,6 @@
84 84 * Channel Group
85 85 */
86 86 private ChannelGroup channelGroup = null;
87   - private static ChannelGroup clientChannels = new DefaultChannelGroup("clients");
88 87
89 88 private Map<Channel, ReadingBuffer> messageBuffersMap = new HashMap<Channel, ReadingBuffer>();
90 89
@@ -198,28 +197,33 @@ public void handle(TimestampRequest msg, ChannelHandlerContext ctx) {
198 197 }
199 198
200 199 ReadingBuffer buffer;
  200 + Channel channel = null;
  201 + boolean bootstrap = false;
201 202 synchronized (messageBuffersMap) {
202 203 buffer = messageBuffersMap.get(ctx.getChannel());
203 204 if (buffer == null) {
204   - synchronized (sharedState) {
  205 +// synchronized (sharedState) {
205 206 synchronized (sharedMsgBufLock) {
206   - Channel channel = ctx.getChannel();
207   - channel.write(new CommittedTransactionReport(sharedState.latestStartTimestamp, sharedState.latestCommitTimestamp));
208   - for (Long halfAborted : sharedState.hashmap.halfAborted) {
209   - channel.write(new AbortedTransactionReport(halfAborted));
210   - }
211   - channel.write(new AbortedTransactionReport(sharedState.latestHalfAbortTimestamp));
212   - channel.write(new FullAbortReport(sharedState.latestFullAbortTimestamp));
213   - channel.write(new LargestDeletedTimestampReport(sharedState.largestDeletedTimestamp));
  207 + bootstrap = true;
  208 + channel = ctx.getChannel();
  209 + channel.write(new CommittedTransactionReport(sharedState.latestStartTimestamp.get(), sharedState.latestCommitTimestamp.get()));
  210 + channel.write(new AbortedTransactionReport(sharedState.latestHalfAbortTimestamp.get()));
  211 + channel.write(new FullAbortReport(sharedState.latestFullAbortTimestamp.get()));
  212 + channel.write(new LargestDeletedTimestampReport(sharedState.largestDeletedTimestamp.get()));
214 213 buffer = sharedState.sharedMessageBuffer.new ReadingBuffer(channel);
215 214 messageBuffersMap.put(channel, buffer);
216 215 channelGroup.add(channel);
217   - clientChannels.add(channel);
  216 +// clientChannels.add(channel);
218 217 LOG.warn("Channel connected: " + messageBuffersMap.size());
219 218 }
220   - }
  219 +// }
221 220 }
222 221 }
  222 + if (bootstrap) {
  223 + for (Long halfAborted : sharedState.hashmap.halfAborted) {
  224 + channel.write(new AbortedTransactionReport(halfAborted));
  225 + }
  226 + }
223 227 synchronized (sharedMsgBufLock) {
224 228 sharedState.sharedMessageBuffer.writeTimestamp(timestamp);
225 229 buffer.flush();
@@ -255,10 +259,10 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
255 259 if (msg.startTimestamp < timestampOracle.first()) {
256 260 reply.committed = false;
257 261 LOG.warn("Aborting transaction after restarting TSO");
258   - } else if (msg.startTimestamp < sharedState.largestDeletedTimestamp) {
  262 + } else if (msg.startTimestamp < sharedState.largestDeletedTimestamp.get()) {
259 263 // Too old
260 264 reply.committed = false;//set as abort
261   -// LOG.warn("Too old starttimestamp: ST "+ msg.startTimestamp +" MAX " + sharedState.largestDeletedTimestamp);
  265 + LOG.warn("Too old starttimestamp: ST "+ msg.startTimestamp +" MAX " + sharedState.largestDeletedTimestamp);
262 266 } else {
263 267 //1. check the write-write conflicts
264 268 for (RowKey r: msg.rows) {
@@ -268,9 +272,9 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
268 272 //System.out.println("Abort...............");
269 273 reply.committed = false;//set as abort
270 274 break;
271   - } else if (value == 0 && sharedState.largestDeletedTimestamp > msg.startTimestamp) {
  275 + } else if (value == 0 && sharedState.largestDeletedTimestamp.get() > msg.startTimestamp) {
272 276 //then it could have been committed after start timestamp but deleted by recycling
273   - System.out.println("Old............... " + sharedState.largestDeletedTimestamp + " " + msg.startTimestamp);
  277 + LOG.warn("Old............... " + sharedState.largestDeletedTimestamp + " " + msg.startTimestamp);
274 278 reply.committed = false;//set as abort
275 279 break;
276 280 }
@@ -285,12 +289,13 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
285 289 //2. commit
286 290 try {
287 291 long commitTimestamp = timestampOracle.next(toWAL);
  292 +// System.out.println(" Committing " + msg.startTimestamp + " with ts " + commitTimestamp);
288 293 sharedState.uncommited.commit(commitTimestamp);
289 294 sharedState.uncommited.commit(msg.startTimestamp);
290 295 reply.commitTimestamp = commitTimestamp;
291 296 if (msg.rows.length > 0) {
292   - if(LOG.isDebugEnabled()){
293   - LOG.debug("Adding commit to WAL");
  297 + if(LOG.isTraceEnabled()){
  298 + LOG.trace("Adding commit to WAL");
294 299 }
295 300 toWAL.writeByte(LoggerProtocol.COMMIT);
296 301 toWAL.writeLong(msg.startTimestamp);
@@ -299,18 +304,18 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
299 304
300 305 for (RowKey r: msg.rows) {
301 306 // toWAL.write(r.getRow(), 0, r.getRow().length);
302   - sharedState.largestDeletedTimestamp = sharedState.hashmap.put(r.getRow(),
  307 + sharedState.largestDeletedTimestamp.set(sharedState.hashmap.put(r.getRow(),
303 308 r.getTable(),
304 309 commitTimestamp,
305 310 r.hashCode(),
306   - sharedState.largestDeletedTimestamp);
  311 + sharedState.largestDeletedTimestamp.get()));
307 312 }
308 313
309 314 sharedState.processCommit(msg.startTimestamp, commitTimestamp);
310   - if (sharedState.largestDeletedTimestamp > sharedState.previousLargestDeletedTimestamp) {
  315 + if (sharedState.largestDeletedTimestamp.get() > sharedState.previousLargestDeletedTimestamp.get()) {
311 316 toWAL.writeByte(LoggerProtocol.LARGESTDELETEDTIMESTAMP);
312   - toWAL.writeLong(sharedState.largestDeletedTimestamp);
313   - Set<Long> toAbort = sharedState.uncommited.raiseLargestDeletedTransaction(sharedState.largestDeletedTimestamp);
  317 + toWAL.writeLong(sharedState.largestDeletedTimestamp.get());
  318 + Set<Long> toAbort = sharedState.uncommited.raiseLargestDeletedTransaction(sharedState.largestDeletedTimestamp.get());
314 319 // if (!toAbort.isEmpty()) {
315 320 // LOG.warn("Slow transactions after raising max: " + toAbort);
316 321 // System.out.println("largest deleted ts: " + sharedState.largestDeletedTimestamp);
@@ -320,9 +325,9 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
320 325 sharedState.hashmap.setHalfAborted(id);
321 326 queueHalfAbort(id);
322 327 }
323   - queueLargestIncrease(sharedState.largestDeletedTimestamp);
  328 + queueLargestIncrease(sharedState.largestDeletedTimestamp.get());
324 329 }
325   - sharedState.previousLargestDeletedTimestamp = sharedState.largestDeletedTimestamp;
  330 + sharedState.previousLargestDeletedTimestamp.set(sharedState.largestDeletedTimestamp.get());
326 331 }
327 332 synchronized (sharedMsgBufLock) {
328 333 queueCommit(msg.startTimestamp, commitTimestamp);
@@ -356,8 +361,8 @@ public void handle(CommitRequest msg, ChannelHandlerContext ctx) {
356 361
357 362 sharedState.nextBatch.add(cam);
358 363 if (sharedState.baos.size() >= TSOState.BATCH_SIZE) {
359   - if(LOG.isDebugEnabled()){
360   - LOG.debug("Going to add record of size " + sharedState.baos.size());
  364 + if(LOG.isTraceEnabled()){
  365 + LOG.trace("Going to add record of size " + sharedState.baos.size());
361 366 }
362 367 //sharedState.lh.asyncAddEntry(baos.toByteArray(), this, sharedState.nextBatch);
363 368 sharedState.addRecord(baos.toByteArray(),
@@ -421,8 +426,8 @@ else if (sharedState.uncommited.isUncommited(msg.queryTimestamp))
421 426
422 427 public void flush() {
423 428 synchronized (sharedState) {
424   - if(LOG.isDebugEnabled()){
425   - LOG.debug("Adding record, size: " + sharedState.baos.size());
  429 + if(LOG.isTraceEnabled()){
  430 + LOG.trace("Adding record, size: " + sharedState.baos.size());
426 431 }
427 432 sharedState.addRecord(sharedState.baos.toByteArray(), new AddRecordCallback() {
428 433 @Override
@@ -459,8 +464,8 @@ public void run() {
459 464 if (sharedState.nextBatch.size() > 0) {
460 465 synchronized (sharedState) {
461 466 if (sharedState.nextBatch.size() > 0) {
462   - if(LOG.isDebugEnabled()){
463   - LOG.debug("Flushing log batch.");
  467 + if(LOG.isTraceEnabled()){
  468 + LOG.trace("Flushing log batch.");
464 469 }
465 470 flush();
466 471 }
16 src/main/java/com/yahoo/omid/tso/TSOSharedMessageBuffer.java
@@ -175,8 +175,8 @@ public void writeCommit(long startTimestamp, long commitTimestamp) {
175 175 ++_Coms;
176 176 ++_Writes;
177 177 int readBefore = writeBuffer.readableBytes();
178   - long startDiff = startTimestamp - state.latestStartTimestamp;
179   - long commitDiff = commitTimestamp - state.latestCommitTimestamp;
  178 + long startDiff = startTimestamp - state.latestStartTimestamp.get();
  179 + long commitDiff = commitTimestamp - state.latestCommitTimestamp.get();
180 180 if (commitDiff == 1 && startDiff >= -32 && startDiff <= 31) {
181 181 ++_1B;
182 182 startDiff &= 0x3f;
@@ -230,8 +230,8 @@ public void writeCommit(long startTimestamp, long commitTimestamp) {
230 230
231 231 _Avg2 += (written - _Avg2) / _Writes;
232 232 _Avg += (written - _Avg) / _Coms;
233   - state.latestStartTimestamp = startTimestamp;
234   - state.latestCommitTimestamp = commitTimestamp;
  233 + state.latestStartTimestamp.set(startTimestamp);
  234 + state.latestCommitTimestamp.set(commitTimestamp);
235 235 }
236 236
237 237 public void writeHalfAbort(long startTimestamp) {
@@ -240,7 +240,7 @@ public void writeHalfAbort(long startTimestamp) {
240 240 }
241 241 ++_Writes;
242 242 int readBefore = writeBuffer.readableBytes();
243   - long diff = startTimestamp - state.latestHalfAbortTimestamp;
  243 + long diff = startTimestamp - state.latestHalfAbortTimestamp.get();
244 244 if (diff >= -16 && diff <= 15) {
245 245 writeBuffer.writeByte((byte)((diff & 0x1f) | (0x40)));
246 246 } else if (diff >= Byte.MIN_VALUE && diff <= Byte.MAX_VALUE) {
@@ -252,7 +252,7 @@ public void writeHalfAbort(long startTimestamp) {
252 252 }
253 253 ++_ha;
254 254
255   - state.latestHalfAbortTimestamp = startTimestamp;
  255 + state.latestHalfAbortTimestamp.set(startTimestamp);
256 256 int written = writeBuffer.readableBytes() - readBefore;
257 257 _Avg2 += (written - _Avg2) / _Writes;
258 258 }
@@ -263,7 +263,7 @@ public void writeFullAbort(long startTimestamp) {
263 263 }
264 264 ++_Writes;
265 265 int readBefore = writeBuffer.readableBytes();
266   - long diff = startTimestamp - state.latestFullAbortTimestamp;
  266 + long diff = startTimestamp - state.latestFullAbortTimestamp.get();
267 267 if (diff >= -16 && diff <= 15) {
268 268 writeBuffer.writeByte((byte)((diff & 0x1f) | (0x60)));
269 269 } else if (diff >= Byte.MIN_VALUE && diff <= Byte.MAX_VALUE) {
@@ -275,7 +275,7 @@ public void writeFullAbort(long startTimestamp) {
275 275 }
276 276 ++_fa;
277 277
278   - state.latestFullAbortTimestamp = startTimestamp;
  278 + state.latestFullAbortTimestamp.set(startTimestamp);
279 279 int written = writeBuffer.readableBytes() - readBefore;
280 280 _Avg2 += (written - _Avg2) / _Writes;
281 281 }
42 src/main/java/com/yahoo/omid/tso/TSOState.java
@@ -20,15 +20,15 @@
20 20 import java.io.DataOutputStream;
21 21 import java.util.ArrayList;
22 22 import java.util.List;
23   -
24   -import com.yahoo.omid.tso.persistence.LoggerException.Code;
25   -import com.yahoo.omid.tso.persistence.BookKeeperStateBuilder;
26   -import com.yahoo.omid.tso.persistence.StateLogger;
27   -import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
  23 +import java.util.concurrent.atomic.AtomicLong;
28 24
29 25 import org.apache.commons.logging.Log;
30 26 import org.apache.commons.logging.LogFactory;
31 27
  28 +import com.yahoo.omid.tso.persistence.LoggerAsyncCallback.AddRecordCallback;
  29 +import com.yahoo.omid.tso.persistence.LoggerException.Code;
  30 +import com.yahoo.omid.tso.persistence.StateLogger;
  31 +
32 32
33 33 /**
34 34 * The wrapper for different states of TSO
@@ -103,12 +103,12 @@ protected TimestampOracle getSO(){
103 103 /**
104 104 * Largest Deleted Timestamp
105 105 */
106   - public long largestDeletedTimestamp = 0;
107   - public long previousLargestDeletedTimestamp = 0;
108   - public long latestCommitTimestamp = 0;
109   - public long latestStartTimestamp = 0;
110   - public long latestHalfAbortTimestamp = 0;
111   - public long latestFullAbortTimestamp = 0;
  106 + public AtomicLong largestDeletedTimestamp = new AtomicLong();
  107 + public AtomicLong previousLargestDeletedTimestamp = new AtomicLong();
  108 + public AtomicLong latestCommitTimestamp = new AtomicLong();
  109 + public AtomicLong latestStartTimestamp = new AtomicLong();
  110 + public AtomicLong latestHalfAbortTimestamp = new AtomicLong();
  111 + public AtomicLong latestFullAbortTimestamp = new AtomicLong();
112 112
113 113 public TSOSharedMessageBuffer sharedMessageBuffer = new TSOSharedMessageBuffer(this);
114 114
@@ -125,8 +125,8 @@ protected TimestampOracle getSO(){
125 125 *
126 126 * @param startTimestamp
127 127 */
128   - protected void processCommit(long startTimestamp, long commitTimestamp){
129   - largestDeletedTimestamp = hashmap.setCommitted(startTimestamp, commitTimestamp, largestDeletedTimestamp);
  128 + protected synchronized void processCommit(long startTimestamp, long commitTimestamp){
  129 + largestDeletedTimestamp.set(hashmap.setCommitted(startTimestamp, commitTimestamp, largestDeletedTimestamp.get()));
130 130 }
131 131
132 132 /**
@@ -135,7 +135,7 @@ protected void processCommit(long startTimestamp, long commitTimestamp){
135 135 * @param largestDeletedTimestamp
136 136 */
137 137 protected synchronized void processLargestDeletedTimestamp(long largestDeletedTimestamp){
138   - this.largestDeletedTimestamp = Math.max(largestDeletedTimestamp, this.largestDeletedTimestamp);
  138 + this.largestDeletedTimestamp.set(Math.max(largestDeletedTimestamp, this.largestDeletedTimestamp.get()));
139 139 }
140 140
141 141 /**
@@ -143,7 +143,7 @@ protected synchronized void processLargestDeletedTimestamp(long largestDeletedTi
143 143 *
144 144 * @param startTimestamp
145 145 */
146   - protected void processAbort(long startTimestamp){
  146 + protected synchronized void processAbort(long startTimestamp){
147 147 hashmap.setHalfAborted(startTimestamp);
148 148 uncommited.abort(startTimestamp);
149 149 }
@@ -153,7 +153,7 @@ protected void processAbort(long startTimestamp){
153 153 *
154 154 * @param startTimestamp
155 155 */
156   - protected void processFullAbort(long startTimestamp){
  156 + protected synchronized void processFullAbort(long startTimestamp){
157 157 hashmap.setFullAborted(startTimestamp);
158 158 }
159 159
@@ -191,16 +191,14 @@ void stop(){
191 191
192 192 public TSOState(StateLogger logger, TimestampOracle timestampOracle) {
193 193 this.timestampOracle = timestampOracle;
194   - this.largestDeletedTimestamp = this.previousLargestDeletedTimestamp = this.timestampOracle.get();
195   - this.uncommited = new Uncommited(largestDeletedTimestamp);
  194 + this.previousLargestDeletedTimestamp.set(this.timestampOracle.get());
  195 + this.largestDeletedTimestamp.set(this.previousLargestDeletedTimestamp.get());
  196 + this.uncommited = new Uncommited(largestDeletedTimestamp.get());
196 197 this.logger = logger;
197 198 }
198 199
199 200 public TSOState(TimestampOracle timestampOracle) {
200   - this.timestampOracle = timestampOracle;
201   - this.largestDeletedTimestamp = this.previousLargestDeletedTimestamp = timestampOracle.get();
202   - this.uncommited = new Uncommited(largestDeletedTimestamp);
203   - this.logger = null;
  201 + this(null, timestampOracle);
204 202 }
205 203 }
206 204
48 src/main/java/com/yahoo/omid/tso/ThroughputMonitor.java
@@ -79,7 +79,7 @@ public void run() {
79 79
80 80 long oldOverflow = TSOSharedMessageBuffer._overflows;
81 81 for (;;) {
82   - Thread.sleep(3000);
  82 + Thread.sleep(1000);
83 83
84 84 long endTime = System.currentTimeMillis();
85 85 long newCounter = TSOHandler.getTransferredBytes();
@@ -156,32 +156,32 @@ public void run() {
156 156 // TSOSharedMessageBuffer._avgLT,
157 157 TSOPipelineFactory.bwhandler != null ? TSOPipelineFactory.bwhandler.getBytesReceivedPerSecond() / (double) (1024 * 1024) : 0,
158 158 TSOPipelineFactory.bwhandler != null ? TSOPipelineFactory.bwhandler.getBytesSentPerSecond() / (double) (1024 * 1024) : 0,
159   - state.largestDeletedTimestamp
  159 + state.largestDeletedTimestamp.get()
160 160 // Arrays.toString(TSOSharedMessageBuffer.freq)
161 161 )
162 162 );
163   - LOG.trace(String.format("SERVER: %4.3f TPS, %4.6f Abort/s "
164   - + "Co: %2.2f Ha: %2.2f Fa: %2.2f Li: %2.2f Avg commit: %2.4f Avg flush: %5.2f "
165   - + "Avg write: %5.2f Tot overflows: %d Tot flushes: %d Tot empty flu: %d "
166   - + "Queries: %d CurrentBuffers: %d ExtraGets: %d AskedTSO: %d",
167   - (newCounter - oldCounter) / (float)(endTime - startTime) * 1000,
168   - (newAbortCount - oldAbortCount) / (float)(endTime - startTime) * 1000,
169   - (newComs - oldComs) / (float)(newWrites - oldWrites) * 100,
170   - (newHa - oldHa) / (float)(newWrites - oldWrites) * 100,
171   - (newFa - oldFa) / (float)(newWrites - oldWrites) * 100,
172   - (newLi - oldLi) / (float)(newWrites - oldWrites) * 100,
173   - avg,
174   - (newflusheSize - oldflusheSize) / (float)(newflushes - oldflushes),
175   - avg2,
176   - newOverflow - oldOverflow,
177   - (newflushes - oldflushes),
178   - newEmptyFlushes - oldEmptyFlushes,
179   -
180   - newQueries - oldQueries,
181   - TSOBuffer.nBuffers,
182   - newExtraGetsPerformed - oldExtraGetsPerformed,
183   - newAskedTSO - oldAskedTSO)
184   - );
  163 +// LOG.trace(String.format("SERVER: %4.3f TPS, %4.6f Abort/s "
  164 +// + "Co: %2.2f Ha: %2.2f Fa: %2.2f Li: %2.2f Avg commit: %2.4f Avg flush: %5.2f "
  165 +// + "Avg write: %5.2f Tot overflows: %d Tot flushes: %d Tot empty flu: %d "
  166 +// + "Queries: %d CurrentBuffers: %d ExtraGets: %d AskedTSO: %d",
  167 +// (newCounter - oldCounter) / (float)(endTime - startTime) * 1000,
  168 +// (newAbortCount - oldAbortCount) / (float)(endTime - startTime) * 1000,
  169 +// (newComs - oldComs) / (float)(newWrites - oldWrites) * 100,
  170 +// (newHa - oldHa) / (float)(newWrites - oldWrites) * 100,
  171 +// (newFa - oldFa) / (float)(newWrites - oldWrites) * 100,
  172 +// (newLi - oldLi) / (float)(newWrites - oldWrites) * 100,
  173 +// avg,
  174 +// (newflusheSize - oldflusheSize) / (float)(newflushes - oldflushes),
  175 +// avg2,
  176 +// newOverflow - oldOverflow,
  177 +// (newflushes - oldflushes),
  178 +// newEmptyFlushes - oldEmptyFlushes,
  179 +//
  180 +// newQueries - oldQueries,
  181 +// TSOBuffer.nBuffers,
  182 +// newExtraGetsPerformed - oldExtraGetsPerformed,
  183 +// newAskedTSO - oldAskedTSO)
  184 +// );
185 185 // if (TSOPipelineFactory.bwhandler != null) {
186 186 // TSOPipelineFactory.bwhandler.reset();
187 187 // }
9 src/main/java/com/yahoo/omid/tso/TimestampOracle.java
@@ -61,9 +61,10 @@
61 61 public long next(DataOutputStream toWal) throws IOException {
62 62 last++;
63 63 if (last >= maxTimestamp) {
64   - toWal.writeByte(LoggerProtocol.TIMESTAMPORACLE);
65   - toWal.writeLong(last);
66 64 maxTimestamp += TIMESTAMP_BATCH;
  65 + toWal.writeByte(LoggerProtocol.TIMESTAMPORACLE);
  66 + toWal.writeLong(maxTimestamp);
  67 + LOG.warn("Loggin timestamporacle " + maxTimestamp);
67 68 }
68 69 if(LOG.isTraceEnabled()){
69 70 LOG.trace("Next timestamp: " + last);
@@ -106,8 +107,8 @@ public void initialize(){
106 107 */
107 108 public void initialize(long timestamp){
108 109 LOG.info("Initializing timestamp oracle");
109   - this.last = this.first = Math.max(this.last, TIMESTAMP_BATCH);
110   - maxTimestamp = this.first;
  110 + this.last = this.first = Math.max(this.last, timestamp + TIMESTAMP_BATCH);
  111 + maxTimestamp = this.first; // max timestamp will be persisted
111 112 LOG.info("First: " + this.first + ", Last: " + this.last);
112 113 initialize();
113 114 }
1  src/main/java/com/yahoo/omid/tso/Uncommited.java
@@ -44,6 +44,7 @@ public Uncommited(long startTimestamp) {
44 44 }
45 45
46 46 public synchronized void commit(long id) {
  47 +// System.out.println("Committing uncommitted " + id);
47 48 if (log && id < first_start) {
48 49 System.out.println("id: " + id);
49 50 Thread.dumpStack();
5 src/main/java/com/yahoo/omid/tso/messages/MinimumTimestamp.java
@@ -16,4 +16,9 @@ public long getTimestamp() {
16 16 return timestamp;
17 17 }
18 18
  19 + @Override
  20 + public String toString() {
  21 + return "{MinimumTimestamp " + timestamp + "}";
  22 + }
  23 +
19 24 }
11 src/main/java/com/yahoo/omid/tso/persistence/BookKeeperStateBuilder.java
@@ -23,6 +23,7 @@
23 23 import java.nio.ByteBuffer;
24 24 import java.util.Enumeration;
25 25 import java.util.concurrent.CountDownLatch;
  26 +import java.util.concurrent.Semaphore;
26 27
27 28 import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
28 29 import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
@@ -73,6 +74,7 @@
73 74 * It provides a good degree of parallelism.
74 75 */
75 76 private static final long BKREADBATCHSIZE = 50;
  77 + private static final int PARALLEL_READS = 4;
76 78
77 79 public static TSOState getState(TSOServerConfig config){
78 80 TSOState returnValue;
@@ -99,11 +101,13 @@ public static TSOState getState(TSOServerConfig config){
99 101 ZooKeeper zk;
100 102 LoggerProtocol lp;
101 103 boolean enabled;
  104 + Semaphore throttleReads;
102 105 TSOServerConfig config;
103 106
104 107 BookKeeperStateBuilder(TSOServerConfig config) {
105 108 this.timestampOracle = new TimestampOracle();
106 109 this.config = config;
  110 + this.throttleReads = new Semaphore(PARALLEL_READS);
107 111 }
108 112
109 113 /**
@@ -205,6 +209,7 @@ public void processResult(int rc, String path, Object ctx, byte[] data, Stat sta
205 209 */
206 210 class LoggerExecutor implements ReadCallback {
207 211 public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx){
  212 + throttleReads.release();
208 213 if(rc != BKException.Code.OK){
209 214 LOG.error("Error while reading ledger entries." + BKException.getMessage(rc));
210 215 ((BookKeeperStateBuilder.Context) ctx).setState(null);
@@ -343,6 +348,12 @@ public void openComplete(int rc, LedgerHandle lh, Object ctx){
343 348 } else {
344 349 long counter = lh.getLastAddConfirmed();
345 350 while(counter >= 0){
  351 + try {
  352 + throttleReads.acquire();
  353 + } catch (InterruptedException e) {
  354 + // ignore
  355 + }
  356 + if (((Context) ctx).isReady()) break;
346 357 long nextBatch = Math.max(counter - BKREADBATCHSIZE + 1, 0);
347 358 lh.asyncReadEntries(nextBatch, counter, new LoggerExecutor(), ctx);
348 359 counter -= BKREADBATCHSIZE;
13 src/main/java/com/yahoo/omid/tso/persistence/LoggerProtocol.java
@@ -48,14 +48,16 @@
48 48 super(timestampOracle);
49 49 }
50 50
  51 + private boolean commits;
  52 + private boolean oracle;
  53 +
51 54 /**
52 55 * Execute a logged entry (several logged ops)
53 56 * @param bb Serialized operations
54 57 * @return true if the recovery is finished
55 58 */
56   - boolean execute(ByteBuffer bb){
  59 + synchronized boolean execute(ByteBuffer bb){
57 60 boolean done = !bb.hasRemaining();
58   - boolean recovered = false;
59 61 while(!done){
60 62 byte op = bb.get();
61 63 long timestamp, startTimestamp, commitTimestamp;
@@ -66,13 +68,14 @@ boolean execute(ByteBuffer bb){
66 68 case TIMESTAMPORACLE:
67 69 timestamp = bb.getLong();
68 70 this.getSO().initialize(timestamp);
  71 + oracle = true;
69 72 break;
70 73 case COMMIT:
71 74 startTimestamp = bb.getLong();
72 75 commitTimestamp = bb.getLong();
73 76 processCommit(startTimestamp, commitTimestamp);
74   - if (commitTimestamp < largestDeletedTimestamp) {
75   - recovered = true;
  77 + if (commitTimestamp < largestDeletedTimestamp.get()) {
  78 + commits = true;
76 79 }
77 80 break;
78 81 case LARGESTDELETEDTIMESTAMP:
@@ -93,7 +96,7 @@ boolean execute(ByteBuffer bb){
93 96 }
94 97 if(bb.remaining() == 0) done = true;
95 98 }
96   - return recovered;
  99 + return oracle && commits;
97 100 }
98 101
99 102 /**
1  src/test/java/com/yahoo/omid/TestCompaction.java
@@ -35,6 +35,7 @@
35 35 import com.yahoo.omid.client.TransactionManager;
36 36 import com.yahoo.omid.client.TransactionState;
37 37 import com.yahoo.omid.client.TransactionalTable;
  38 +import com.yahoo.omid.tso.TSOState;
38 39
39 40 public class TestCompaction extends OmidTestBase {
40 41 private static final Log LOG = LogFactory.getLog(TestCompaction.class);
2  src/test/java/com/yahoo/omid/tso/TestCommitQuery.java
@@ -82,7 +82,7 @@ public void testCommitQuery() throws Exception {
82 82 TimestampResponse tr3 = clientHandler.receiveMessage(TimestampResponse.class);
83 83 assertTrue(tr3.timestamp > tr2.timestamp);
84 84
85   - state.largestDeletedTimestamp = 1000000;
  85 + state.largestDeletedTimestamp.set(1000000);
86 86
87 87 //
88 88 // Test Commit query of half aborted transaction

0 comments on commit 756568b

Flavio Junqueira

Was it commented out accidentally? Otherwise, why not delete it?

Flavio Junqueira

Why aren't you using the logging facility here? Also, output the stack trace through the call to the log object.

Flavio Junqueira

Do you really need to make all these attributes AtomicLong?

Flavio Junqueira

I'm not sure why this method needs to be synchronized.

Flavio Junqueira

Did you really mean to comment out all these lines?

Flavio Junqueira

Is this really a warning? Sounds like an error to me. Last shouldn't be greater than max, no?

Flavio Junqueira

Can we use the logging facility here?

Flavio Junqueira

If you catch an interrupted exception here, then you probably don't want to continue the execution.

Daniel Gómez Ferro

I want to get rid of them later on, but right now I'm accessing them from an unsynchronized section (to bootstrap the clients) and wanted to make sure they are consistent.

Daniel Gómez Ferro

I was quite sure these synchronized were needed, but you are probably right. I'll have a second look.

Daniel Gómez Ferro

Actually it's a trace. I changed the comparison to == to make it clear. This is where we reserve the next batch of timestamps.

Daniel Gómez Ferro

I think this is needed, the largest Deleted timestamp can be modified both from processCommit() and from here.
We probably should be rebuilding the state serially, otherwise we could process a FullAbort before a HalfAbort, which would be leaked.

Please sign in to comment.
Something went wrong with that request. Please try again.