Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'S4-44' into dev

  • Loading branch information...
commit 1f6ddce9e95fee6973f1c37a53097a112d6fb34b 2 parents dd96e87 + a0574b5
@matthieumorel matthieumorel authored
Showing with 920 additions and 568 deletions.
  1. +49 −28 s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
  2. +43 −0 s4-core/src/main/java/org/apache/s4/ft/FetchTask.java
  3. +0 −37 s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
  4. +126 −129 s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
  5. +5 −5 s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
  6. +8 −1 s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
  7. +44 −86 s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
  8. +59 −101 s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
  9. +16 −14 s4-core/src/main/java/org/apache/s4/util/MetricsName.java
  10. +7 −3 s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
  11. +121 −122 s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
  12. +5 −5 s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
  13. +53 −0 s4-core/src/test/java/org/apache/s4/ft/rectimeout/BrokenStorage.java
  14. +130 −0 s4-core/src/test/java/org/apache/s4/ft/rectimeout/RecoveryTimeoutTest.java
  15. +26 −0 s4-core/src/test/java/org/apache/s4/ft/rectimeout/app_conf.xml
  16. +196 −0 s4-core/src/test/java/org/apache/s4/ft/rectimeout/s4_core_conf_broken_backend.xml
  17. +6 −0 s4-core/src/test/java/org/apache/s4/ft/rectimeout/wall_clock.xml
  18. +13 −20 s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
  19. +8 −9 s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
  20. +5 −8 s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
View
77 s4-core/src/main/java/org/apache/s4/ft/CheckpointingCoordinator.java
@@ -17,6 +17,7 @@
*/
package org.apache.s4.ft;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -27,8 +28,8 @@
/**
* Prevents event processing thread and serialization thread to overlap on the same PE instance, which would cause consistency issues during recovery.
- *
- * How it works:
+ *
+ * How it works (for each prototype):
* - we keep track of the PE being serialized and the PE being processed
* - access to the PE is guarded by the instance of this class
* - if the event processing thread receives an event that is handled by a PE currently being serialized, it waits until serialization is complete
@@ -41,14 +42,18 @@
private static final Logger logger = Logger
.getLogger(CheckpointingCoordinator.class);
- AbstractPE processing = null;
- AbstractPE serializing = null;
+ private static class PrototypeSynchro {
+ Lock lock = new ReentrantLock();
+ Condition processingFinished = lock.newCondition();
+ Condition serializingFinished = lock.newCondition();
+ AbstractPE processing = null;
+ AbstractPE serializing = null;
+ }
+
+ ConcurrentHashMap<String, PrototypeSynchro> synchros = new ConcurrentHashMap<String, CheckpointingCoordinator.PrototypeSynchro>();
long maxSerializationLockDuration;
- Lock lock = new ReentrantLock();
- Condition processingFinished = lock.newCondition();
- Condition serializingFinished = lock.newCondition();
public CheckpointingCoordinator(long maxSerializationLockDuration) {
super();
@@ -56,15 +61,16 @@ public CheckpointingCoordinator(long maxSerializationLockDuration) {
}
public void acquireForProcessing(AbstractPE pe) {
- lock.lock();
+ PrototypeSynchro sync = getPrototypeSynchro(pe);
+ sync.lock.lock();
try {
- if (serializing == pe) {
+ if (sync.serializing == pe) {
try {
if (logger.isTraceEnabled()) {
logger.trace("processing must wait for serialization to finish for PE "
+ pe.getId() + "/" + pe.getKeyValueString());
}
- serializingFinished.await(maxSerializationLockDuration,
+ sync.serializingFinished.await(maxSerializationLockDuration,
TimeUnit.MILLISECONDS);
acquireForProcessing(pe);
} catch (InterruptedException e) {
@@ -75,61 +81,76 @@ public void acquireForProcessing(AbstractPE pe) {
+ "/"
+ pe.getKeyValueString()
+ "]\nProceeding anyway, but checkpoint may contain inconsistent value");
- serializing = null;
+ sync.serializing = null;
}
}
- processing = pe;
+ sync.processing = pe;
} finally {
- lock.unlock();
+ sync.lock.unlock();
}
}
public void releaseFromProcessing(AbstractPE pe) {
- lock.lock();
+ PrototypeSynchro sync = getPrototypeSynchro(pe);
+ sync.lock.lock();
try {
- if (processing == pe) {
- processing = null;
- processingFinished.signal();
+ if (sync.processing == pe) {
+ sync.processing = null;
+ sync.processingFinished.signal();
} else {
logger.warn("Cannot release from processing thread a PE that is not already in processing state");
}
} finally {
- lock.unlock();
+ sync.lock.unlock();
}
}
public void acquireForSerialization(AbstractPE pe) {
- lock.lock();
+ PrototypeSynchro sync = getPrototypeSynchro(pe);
+ sync.lock.lock();
try {
- if (processing == pe) {
+ if (sync.processing == pe) {
try {
if (logger.isTraceEnabled()) {
logger.trace("serialization must wait for processing to finish for PE "
+ pe.getId() + "/" + pe.getKeyValueString());
}
- processingFinished.await(maxSerializationLockDuration, TimeUnit.MILLISECONDS);
+ sync.processingFinished.await(maxSerializationLockDuration, TimeUnit.MILLISECONDS);
acquireForSerialization(pe);
} catch (InterruptedException e) {
// we still need to make sure it is now safe to serialize
acquireForSerialization(pe);
}
}
- serializing = pe;
+ sync.serializing = pe;
} finally {
- lock.unlock();
+ sync.lock.unlock();
+ }
+ }
+
+ private PrototypeSynchro getPrototypeSynchro(AbstractPE pe) {
+ PrototypeSynchro sync = synchros.get(pe.getId());
+ if (sync==null) {
+ sync = new PrototypeSynchro();
+ PrototypeSynchro existing = synchros.putIfAbsent(pe.getId(), sync);
+ if (existing !=null) {
+ sync = existing;
+ }
}
+ return sync;
}
public void releaseFromSerialization(AbstractPE pe)
throws InterruptedException {
- lock.lock();
+ PrototypeSynchro sync = synchros.get(pe.getId());
+ sync.lock.lock();
try {
- if (serializing == pe) {
- serializing = null;
- serializingFinished.signal();
+ if (sync.serializing == pe) {
+ sync.serializing = null;
+ sync.serializingFinished.signal();
}
} finally {
- lock.unlock();
+ sync.lock.unlock();
}
}
}
View
43 s4-core/src/main/java/org/apache/s4/ft/FetchTask.java
@@ -0,0 +1,43 @@
+package org.apache.s4.ft;
+
+import static org.apache.s4.util.MetricsName.S4_CORE_METRICS;
+
+import java.util.concurrent.Callable;
+
+import org.apache.s4.util.MetricsName;
+
+/**
+ * Encapsulates a fetching operation.
+ *
+ */
+public class FetchTask implements Callable<byte[]>{
+
+ StateStorage stateStorage;
+ SafeKeeper safeKeeper;
+ SafeKeeperId safeKeeperId;
+
+ public FetchTask(StateStorage stateStorage, SafeKeeper safeKeeper,
+ SafeKeeperId safeKeeperId) {
+ super();
+ this.stateStorage = stateStorage;
+ this.safeKeeper = safeKeeper;
+ this.safeKeeperId = safeKeeperId;
+ }
+
+ @Override
+ public byte[] call() throws Exception {
+ try {
+ byte[] result = stateStorage.fetchState(safeKeeperId);
+ if (safeKeeper.monitor!=null) {
+ safeKeeper.monitor.increment(MetricsName.checkpointing_fetching_success.toString(), 1, S4_CORE_METRICS.toString());
+ }
+ return result;
+ } catch (Exception e) {
+ if (safeKeeper.monitor!=null) {
+ safeKeeper.monitor.increment(MetricsName.checkpointing_fetching_failed.toString(), 1, S4_CORE_METRICS.toString());
+ }
+ throw e;
+ }
+ }
+
+}
View
37 s4-core/src/main/java/org/apache/s4/ft/InitiateCheckpointingEvent.java
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.s4.ft;
-
-/**
- *
- * Event that triggers a checkpoint.
- *
- */
-public class InitiateCheckpointingEvent extends CheckpointingEvent {
-
- public InitiateCheckpointingEvent() {
- // as required by default kryo serializer
- }
-
- public InitiateCheckpointingEvent(SafeKeeperId safeKeeperId) {
- super(safeKeeperId);
- }
-
-
-
-}
View
255 s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
@@ -23,10 +23,12 @@
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.log4j.Logger;
import org.apache.s4.dispatcher.Dispatcher;
@@ -38,19 +40,19 @@
import org.apache.s4.util.MetricsName;
/**
- *
+ *
* <p>
* This class is responsible for coordinating interactions between the S4 event
* processor and the checkpoint storage backend. In particular, it schedules
* asynchronous save tasks to be executed on the backend.
* </p>
- *
- *
- *
+ *
+ *
+ *
*/
public class SafeKeeper {
- public enum StorageResultCode {
+ public enum StorageResultCode {
SUCCESS, FAILURE
}
@@ -66,19 +68,26 @@
private ThreadPoolExecutor storageThreadPool;
private ThreadPoolExecutor serializationThreadPool;
-
+ private ThreadPoolExecutor fetchingThreadPool;
+
private CheckpointingCoordinator processingSerializationSynchro;
-
- private Monitor monitor;
-
+
+ Monitor monitor;
+
int storageMaxThreads = 1;
int storageThreadKeepAliveSeconds = 120;
int storageMaxOutstandingRequests = 1000;
-
- int serializationMaxThreads=1;
+
int serializationThreadKeepAliveSeconds = 120;
int serializationMaxOutstandingRequests = 1000;
-
+
+ long fetchingMaxWaitMs = 1000;
+ int fetchingMaxConsecutiveFailuresBeforeDisabling = 10;
+ int fetchingCurrentConsecutiveFailures = 0;
+ long fetchingDisabledDurationMs = 600000;
+ long fetchingDisabledInitTime=-1;
+
+
long maxSerializationLockTime = 1000;
public SafeKeeper() {
@@ -103,9 +112,11 @@ public void init() {
}
storageThreadPool = new ThreadPoolExecutor(1, storageMaxThreads, storageThreadKeepAliveSeconds, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(storageMaxOutstandingRequests));
- serializationThreadPool = new ThreadPoolExecutor(1, serializationMaxThreads, serializationThreadKeepAliveSeconds, TimeUnit.SECONDS,
+ serializationThreadPool = new ThreadPoolExecutor(1, 1, serializationThreadKeepAliveSeconds, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(serializationMaxOutstandingRequests));
-
+ fetchingThreadPool = new ThreadPoolExecutor(1, 1, serializationThreadKeepAliveSeconds, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<Runnable>(serializationMaxOutstandingRequests));
+
processingSerializationSynchro = new CheckpointingCoordinator(maxSerializationLockTime);
logger.debug("Started thread pool with maxWriteThreads=[" + storageMaxThreads
@@ -121,68 +132,68 @@ public void init() {
}
nodeCount = getLoopbackDispatcher().getEventEmitter().getNodeCount();
}
-
+
signalNodesAvailable.countDown();
}
-
+
/**
* Synchronization to prevent race conditions with serialization threads
*/
public void acquirePermitForProcessing(AbstractPE pe) {
- processingSerializationSynchro.acquireForProcessing(pe);
+ processingSerializationSynchro.acquireForProcessing(pe);
}
-
+
/**
* Notification part of the mechanism for preventing race condition with serialization threads
*/
- public void releasePermitForProcessing(AbstractPE pe) {
- processingSerializationSynchro.releaseFromProcessing(pe);
- }
-
-
- /**
- * Serializes and stores state to the storage backend. Serialization and storage operations are asynchronous.
- *
- * @return a callback for getting notified of the result of the storage operation
- */
- public StorageCallback saveState(AbstractPE pe) {
- StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
- Future<byte[]> futureSerializedState = null;
- try {
- futureSerializedState = serializeState(pe, processingSerializationSynchro);
- } catch (RejectedExecutionException e) {
- if (monitor!=null) {
- monitor.increment(MetricsName.checkpointing_dropped_from_serialization_queue.toString(), 1, S4_CORE_METRICS.toString());
- }
- storageCallback.storageOperationResult(StorageResultCode.FAILURE,
- "Serialization task queue is full. An older serialization task was dumped in order to serialize PE ["+ pe.getId()+"]" +
- " Remaining capacity for the serialization task queue is ["
- + serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
- + serializationThreadPool.getQueue().size() + "] ; maximum capacity is [" + serializationThreadPool
- + "]");
- return storageCallback;
- }
- submitSaveStateTask(new SaveStateTask(pe.getSafeKeeperId(), futureSerializedState, storageCallback, stateStorage), storageCallback);
- return storageCallback;
- }
-
+ public void releasePermitForProcessing(AbstractPE pe) {
+ processingSerializationSynchro.releaseFromProcessing(pe);
+ }
+
+
+ /**
+ * Serializes and stores state to the storage backend. Serialization and storage operations are asynchronous.
+ *
+ * @return a callback for getting notified of the result of the storage operation
+ */
+ public StorageCallback saveState(AbstractPE pe) {
+ StorageCallback storageCallback = storageCallbackFactory.createStorageCallback();
+ Future<byte[]> futureSerializedState = null;
+ try {
+ futureSerializedState = serializeState(pe, processingSerializationSynchro);
+ } catch (RejectedExecutionException e) {
+ if (monitor!=null) {
+ monitor.increment(MetricsName.checkpointing_dropped_from_serialization_queue.toString(), 1, S4_CORE_METRICS.toString());
+ }
+ storageCallback.storageOperationResult(StorageResultCode.FAILURE,
+ "Serialization task queue is full. An older serialization task was dumped in order to serialize PE ["+ pe.getId()+"]" +
+ " Remaining capacity for the serialization task queue is ["
+ + serializationThreadPool.getQueue().remainingCapacity() + "] ; number of elements is ["
+ + serializationThreadPool.getQueue().size() + "] ; maximum capacity is [" + serializationThreadPool
+ + "]");
+ return storageCallback;
+ }
+ submitSaveStateTask(new SaveStateTask(pe.getSafeKeeperId(), futureSerializedState, storageCallback, stateStorage), storageCallback);
+ return storageCallback;
+ }
+
private Future<byte[]> serializeState(AbstractPE pe, CheckpointingCoordinator coordinator) {
Future<byte[]> future = serializationThreadPool.submit(new SerializeTask(pe, coordinator));
- if(monitor!=null) {
+ if(monitor!=null) {
monitor.increment(MetricsName.checkpointing_added_to_serialization_queue.toString(), 1, S4_CORE_METRICS.toString());
}
- return future;
+ return future;
}
-
+
private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCallback) {
- try {
+ try {
storageThreadPool.execute(task);
if (monitor!=null) {
- monitor.increment(MetricsName.checkpointing_added_to_storage_queue.toString(), 1);
+ monitor.increment(MetricsName.checkpointing_added_to_storage_queue.toString(), 1, S4_CORE_METRICS.toString());
}
- } catch (RejectedExecutionException e) {
+ } catch (RejectedExecutionException e) {
if (monitor!=null) {
- monitor.increment(MetricsName.checkpointing_dropped_from_storage_queue.toString(), 1);
+ monitor.increment(MetricsName.checkpointing_dropped_from_storage_queue.toString(), 1, S4_CORE_METRICS.toString());
}
storageCallback.storageOperationResult(StorageResultCode.FAILURE,
"Storage checkpoint queue is full. Removed an old task to handle latest task. Remaining capacity for task queue is ["
@@ -194,7 +205,7 @@ private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCall
/**
* Fetches checkpoint data from storage for a given PE
- *
+ *
* @param key
* safeKeeperId
* @return checkpoint data
@@ -206,38 +217,32 @@ private void submitSaveStateTask(SaveStateTask task, StorageCallback storageCall
} catch (InterruptedException ignored) {
}
byte[] result = null;
- result = stateStorage.fetchState(key);
- return result;
- }
-
- /**
- * Generates a checkpoint event for a given PE, and enqueues it in the local
- * event queue.
- *
- * @param pe
- * reference to a PE
- */
- public void generateCheckpoint(AbstractPE pe) {
- InitiateCheckpointingEvent initiateCheckpointingEvent = new InitiateCheckpointingEvent(pe.getSafeKeeperId());
-
- List<List<String>> compoundKeyNames;
- if (pe.getKeyValueString() == null) {
- logger.warn("Only keyed PEs can be checkpointed. Current PE [" + pe.getSafeKeeperId()
- + "] will not be checkpointed.");
- } else {
- List<String> list = new ArrayList<String>(1);
- list.add("");
- compoundKeyNames = new ArrayList<List<String>>(1);
- compoundKeyNames.add(list);
- loopbackDispatcher.dispatchEvent(pe.getId() + "_checkpointing", compoundKeyNames,
- initiateCheckpointingEvent);
+ if ((fetchingCurrentConsecutiveFailures>0 && (fetchingCurrentConsecutiveFailures== fetchingMaxConsecutiveFailuresBeforeDisabling))) {
+ if((fetchingDisabledInitTime+fetchingDisabledDurationMs)<System.currentTimeMillis()) {
+ return null;
+ } else {
+ // reached time, reinit
+ fetchingCurrentConsecutiveFailures=0;
+ }
}
+ Future<byte[]> fetched = fetchingThreadPool.submit(new FetchTask(stateStorage, this, key));
+ try {
+ result = fetched.get(fetchingMaxWaitMs, TimeUnit.MILLISECONDS);
+ fetchingCurrentConsecutiveFailures=0;
+ } catch (Exception e) {
+ logger.error("Cannot fetch checkpoint from backend for key ["+ key.getStringRepresentation()+"]", e);
+ fetchingCurrentConsecutiveFailures++;
+ if (fetchingCurrentConsecutiveFailures==fetchingMaxConsecutiveFailuresBeforeDisabling) {
+ fetchingDisabledInitTime = System.currentTimeMillis();
+ }
+ }
+ return result;
}
/**
* Generates a recovery event, and enqueues it in the local event queue.<br/>
* This can be used for an eager recovery mechanism.
- *
+ *
* @param safeKeeperId
* safeKeeperId to recover
*/
@@ -296,62 +301,54 @@ public void setStorageCallbackFactory(StorageCallbackFactory storageCallbackFact
this.storageCallbackFactory = storageCallbackFactory;
}
- public int getStorageMaxThreads() {
- return storageMaxThreads;
- }
-
- public void setStorageMaxThreads(int storageMaxThreads) {
- this.storageMaxThreads = storageMaxThreads;
- }
-
- public int getStorageThreadKeepAliveSeconds() {
- return storageThreadKeepAliveSeconds;
- }
+ public int getStorageMaxThreads() {
+ return storageMaxThreads;
+ }
- public void setStorageThreadKeepAliveSeconds(int storageThreadKeepAliveSeconds) {
- this.storageThreadKeepAliveSeconds = storageThreadKeepAliveSeconds;
- }
+ public void setStorageMaxThreads(int storageMaxThreads) {
+ this.storageMaxThreads = storageMaxThreads;
+ }
- public int getStorageMaxOutstandingRequests() {
- return storageMaxOutstandingRequests;
- }
+ public int getStorageThreadKeepAliveSeconds() {
+ return storageThreadKeepAliveSeconds;
+ }
- public void setStorageMaxOutstandingRequests(int storageMaxOutstandingRequests) {
- this.storageMaxOutstandingRequests = storageMaxOutstandingRequests;
- }
+ public void setStorageThreadKeepAliveSeconds(int storageThreadKeepAliveSeconds) {
+ this.storageThreadKeepAliveSeconds = storageThreadKeepAliveSeconds;
+ }
- public int getSerializationMaxThreads() {
- return serializationMaxThreads;
- }
+ public int getStorageMaxOutstandingRequests() {
+ return storageMaxOutstandingRequests;
+ }
- public void setSerializationMaxThreads(int serializationMaxThreads) {
- this.serializationMaxThreads = serializationMaxThreads;
- }
+ public void setStorageMaxOutstandingRequests(int storageMaxOutstandingRequests) {
+ this.storageMaxOutstandingRequests = storageMaxOutstandingRequests;
+ }
- public int getSerializationThreadKeepAliveSeconds() {
- return serializationThreadKeepAliveSeconds;
- }
+ public int getSerializationThreadKeepAliveSeconds() {
+ return serializationThreadKeepAliveSeconds;
+ }
- public void setSerializationThreadKeepAliveSeconds(
- int serializationThreadKeepAliveSeconds) {
- this.serializationThreadKeepAliveSeconds = serializationThreadKeepAliveSeconds;
- }
+ public void setSerializationThreadKeepAliveSeconds(
+ int serializationThreadKeepAliveSeconds) {
+ this.serializationThreadKeepAliveSeconds = serializationThreadKeepAliveSeconds;
+ }
- public int getSerializationMaxOutstandingRequests() {
- return serializationMaxOutstandingRequests;
- }
+ public int getSerializationMaxOutstandingRequests() {
+ return serializationMaxOutstandingRequests;
+ }
- public void setSerializationMaxOutstandingRequests(
- int serializationMaxOutstandingRequests) {
- this.serializationMaxOutstandingRequests = serializationMaxOutstandingRequests;
- }
+ public void setSerializationMaxOutstandingRequests(
+ int serializationMaxOutstandingRequests) {
+ this.serializationMaxOutstandingRequests = serializationMaxOutstandingRequests;
+ }
- public long getMaxSerializationLockTime() {
- return maxSerializationLockTime;
- }
+ public long getMaxSerializationLockTime() {
+ return maxSerializationLockTime;
+ }
- public void setMaxSerializationLockTime(long maxSerializationLockTime) {
- this.maxSerializationLockTime = maxSerializationLockTime;
- }
+ public void setMaxSerializationLockTime(long maxSerializationLockTime) {
+ this.maxSerializationLockTime = maxSerializationLockTime;
+ }
}
View
10 s4-core/src/main/java/org/apache/s4/ft/SaveStateTask.java
@@ -24,18 +24,18 @@
/**
- *
+ *
* Encapsulates a checkpoint request. It is scheduled by the checkpointing framework.
*
*/
public class SaveStateTask implements Runnable {
-
+
SafeKeeperId safeKeeperId;
byte[] serializedState;
Future<byte[]> futureSerializedState = null;
StorageCallback storageCallback;
StateStorage stateStorage;
-
+
public SaveStateTask(SafeKeeperId safeKeeperId, byte[] state, StorageCallback storageCallback, StateStorage stateStorage) {
super();
this.safeKeeperId = safeKeeperId;
@@ -43,14 +43,14 @@ public SaveStateTask(SafeKeeperId safeKeeperId, byte[] state, StorageCallback st
this.storageCallback = storageCallback;
this.stateStorage = stateStorage;
}
-
+
public SaveStateTask(SafeKeeperId safeKeeperId, Future<byte[]> futureSerializedState, StorageCallback storageCallback, StateStorage stateStorage) {
this.safeKeeperId = safeKeeperId;
this.futureSerializedState = futureSerializedState;
this.storageCallback = storageCallback;
this.stateStorage = stateStorage;
}
-
+
@Override
public void run() {
if (futureSerializedState!=null) {
View
9 s4-core/src/main/java/org/apache/s4/ft/SerializeTask.java
@@ -21,11 +21,15 @@
import org.apache.s4.processor.AbstractPE;
+/**
+ * Encapsulates serialiation operation. Ensures semaphore taken on PE when serializing.
+ *
+ */
public class SerializeTask implements Callable<byte[]> {
AbstractPE pe;
private CheckpointingCoordinator coordinator;
-
+
public SerializeTask(AbstractPE pe, CheckpointingCoordinator coordinator) {
super();
this.pe = pe;
@@ -38,6 +42,9 @@ public SerializeTask(AbstractPE pe, CheckpointingCoordinator coordinator) {
coordinator.acquireForSerialization(pe);
return pe.serializeState();
} finally {
+ // remove dirty flag
+ pe.setCheckpointable(false);
+
coordinator.releaseFromSerialization(pe);
}
}
View
130 s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
@@ -17,14 +17,21 @@
*/
package org.apache.s4.processor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.StringTokenizer;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.log4j.Logger;
import org.apache.s4.dispatcher.partitioner.CompoundKeyInfo;
import org.apache.s4.dispatcher.partitioner.KeyInfo;
import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElement;
import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementIndex;
import org.apache.s4.dispatcher.partitioner.KeyInfo.KeyPathElementName;
-import org.apache.s4.ft.CheckpointingCoordinator;
-import org.apache.s4.ft.InitiateCheckpointingEvent;
-import org.apache.s4.ft.RecoveryEvent;
import org.apache.s4.ft.SafeKeeper;
import org.apache.s4.ft.SafeKeeperId;
import org.apache.s4.persist.Persister;
@@ -33,23 +40,6 @@
import org.apache.s4.schema.SchemaContainer;
import org.apache.s4.util.clock.Clock;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.StringTokenizer;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Future;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.log4j.Logger;
-
/**
* This is the base class for processor classes.
* <p>
@@ -86,7 +76,7 @@ public String getName() {
}
transient private Clock clock;
- // FIXME replaces monitor wait on AbstractPE, for triggering possible extra
+ // replaces monitor wait on AbstractPE, for triggering possible extra
// thread when checkpointing activated
transient private CountDownLatch s4ClockSetSignal = new CountDownLatch(1);
transient private int outputFrequency = 1;
@@ -110,9 +100,7 @@ public String getName() {
transient private boolean recoveryAttempted = false;
// true if state may have changed
- transient private boolean checkpointable = false;
- // use a flag for identifying checkpointing events
- transient private boolean isCheckpointingEvent = false;
+ transient private volatile boolean checkpointable = false;
transient private SafeKeeper safeKeeper; // handles fault tolerance
transient private CountDownLatch safeKeeperSetSignal = new CountDownLatch(1);
@@ -122,7 +110,7 @@ public String getName() {
transient private int checkpointableEventCount = 0;
transient private int checkpointsBeforePause = -1;
transient private long checkpointingPauseTimeInMillis;
-
+
transient private OverloadDispatcher overloadDispatcher;
@@ -209,9 +197,7 @@ public void execute(String streamName, CompoundKeyInfo compoundKeyInfo, Object e
this.streamName = streamName;
if (safeKeeper != null) {
- safeKeeper.acquirePermitForProcessing(this);
- // initialize checkpointing event flag
- this.isCheckpointingEvent = false;
+ safeKeeper.acquirePermitForProcessing(this);
if (!recoveryAttempted) {
recover();
recoveryAttempted = true;
@@ -224,7 +210,7 @@ public void execute(String streamName, CompoundKeyInfo compoundKeyInfo, Object e
keyRecord.clear(); // the PE doesn't need it anymore
}
- if (outputFrequencyType == FrequencyType.EVENTCOUNT && outputFrequency > 0 && !isCheckpointingEvent) {
+ if (outputFrequencyType == FrequencyType.EVENTCOUNT && outputFrequency > 0) {
eventCount++;
if (eventCount % outputFrequency == 0) {
try {
@@ -235,18 +221,17 @@ public void execute(String streamName, CompoundKeyInfo compoundKeyInfo, Object e
}
}
- // do not take into account checkpointing/recovery trigger messages
- if (!isCheckpointingEvent) {
- checkpointable = true; // dirty flag
- if (checkpointingFrequencyType == FrequencyType.EVENTCOUNT && checkpointingFrequency > 0) {
- checkpointableEventCount++;
- if (checkpointableEventCount % checkpointingFrequency == 0) {
- // for count-based frequency, we directly checkpoint here
- checkpoint();
- }
+ setCheckpointable(true); // dirty flag
+ if (checkpointingFrequencyType == FrequencyType.EVENTCOUNT && checkpointingFrequency > 0) {
+ checkpointableEventCount++;
+ if (checkpointableEventCount % checkpointingFrequency == 0) {
+ // for count-based frequency, we directly checkpoint here
+ if (isCheckpointable()) {
+ checkpoint();
+ }
}
}
-
+
if (safeKeeper!=null) {
safeKeeper.releasePermitForProcessing(this);
}
@@ -261,7 +246,7 @@ public long getCurrentTime() {
* <p>
* The key value is a list because the key may be a compound (composite)
* key, in which case the key will have one value for each simple key.
- *
+ *
* @return the key value as a List of Objects (each element contains the
* value of a simple key).
**/
@@ -363,7 +348,7 @@ private void setKeyValue(Object event, CompoundKeyInfo compoundKeyInfo) {
* "by event count" with an output frequency of 1. (That is,
* <code>output</code> is called after after each return from
* <code>processEvent</code>).
- *
+ *
* @param outputFrequency
* the number of application events passed to
* <code>processEvent</code> before output is called.
@@ -377,7 +362,7 @@ public void setOutputFrequencyByEventCount(int outputFrequency) {
/**
* Sets the frequency strategy to "by event count". Uses the same mechanism
* than {@link #setOutputFrequencyByEventCount(int)}
- *
+ *
* @param checkpointingFrequency
* the number of application events passed to
* <code>processEvent</code> before output is called (ignoring
@@ -386,7 +371,6 @@ public void setOutputFrequencyByEventCount(int outputFrequency) {
public void setCheckpointingFrequencyByEventCount(int checkpointingFrequency) {
this.checkpointingFrequency = checkpointingFrequency;
this.checkpointingFrequencyType = FrequencyType.EVENTCOUNT;
- supplementAdviceForCheckpointingAndRecovery();
}
/**
@@ -414,7 +398,7 @@ public void setCheckpointingFrequencyByEventCount(int checkpointingFrequency) {
* "by event count" with an output frequency of 1. (That is,
* <code>output</code> is called after after each return from
* <code>processEvent</code>).
- *
+ *
* @param outputFrequency
* the time boundary in seconds
**/
@@ -427,14 +411,13 @@ public void setOutputFrequencyByTimeBoundary(int outputFrequency) {
/**
* Sets the frequency of checkpointing. It uses the same mechanism than
* {@link #setOutputFrequencyByTimeBoundary(int)}
- *
+ *
* @param checkpointingFrequency
* the time boundary in seconds
*/
public void setCheckpointingFrequencyByTimeBoundary(int checkpointingFrequency) {
this.checkpointingFrequency = checkpointingFrequency;
this.checkpointingFrequencyType = FrequencyType.TIMEBOUNDARY;
- supplementAdviceForCheckpointingAndRecovery();
initFrequency(PeriodicInvokerType.CHECKPOINTING);
}
@@ -458,13 +441,12 @@ public void setOutputFrequencyOffset(int outputFrequencyOffset) {
* Set the offset from the time boundary at which calls to checkpoint should
* be performed. It uses the same mechanism than
* {@link AbstractPE#setOutputFrequencyOffset(int)}
- *
+ *
* @param checkpointingFrequencyOffset
* checkpointing frequency offset in seconds
*/
public void setCheckpointingFrequencyOffset(int checkpointingFrequencyOffset) {
this.checkpointingFrequencyOffset = checkpointingFrequencyOffset;
- supplementAdviceForCheckpointingAndRecovery();
}
public void setKeys(String[] keys) {
@@ -472,7 +454,6 @@ public void setKeys(String[] keys) {
StringTokenizer st = new StringTokenizer(key);
eventAdviceList.add(new EventAdvice(st.nextToken(), st.nextToken()));
}
- supplementAdviceForCheckpointingAndRecovery();
}
private void initFrequency(PeriodicInvokerType type) {
@@ -519,7 +500,7 @@ public void setTtl(int ttl) {
}
/**
- *
+ *
*/
public int getTtl() {
return ttl;
@@ -530,7 +511,7 @@ public int getTtl() {
}
/**
- *
+ *
*/
public void setLookupTable(Persister lookupTable) {
this.lookupTable = lookupTable;
@@ -548,8 +529,6 @@ protected void checkpoint() {
// NOTE: assumes pe id is keyvalue from the PE...
safeKeeper.saveState(this);
- // remove dirty flag
- checkpointable = false;
}
protected void recover() {
@@ -581,28 +560,24 @@ public void setSafeKeeper(SafeKeeper safeKeeper) {
}
}
- public final void processEvent(InitiateCheckpointingEvent checkpointingEvent) {
- isCheckpointingEvent = true;
- if (isCheckpointable()) {
- checkpoint();
- }
- }
-
+ /**
+ * Indicates whether this PE is dirty and therefore checkpointable.
+ * Developers can override this method in order to precisely define the conditions of checkpointability.
+ * @return true if this PE can be checkpointed, false otherwise
+ */
protected boolean isCheckpointable() {
return checkpointable;
}
- protected void setCheckpointable(boolean checkpointable) {
+ /**
+ * Marks the PE as clean or dirty. Only dirty PEs are checkpointed.
+ * Developers can override this method in order to have more control
+ * @param checkpointable true|false for setting the dirty state of the PE
+ */
+ public void setCheckpointable(boolean checkpointable) {
this.checkpointable = checkpointable;
}
- public final void initiateCheckpoint() {
- // enqueue checkpointing event
- if (safeKeeper != null) {
- safeKeeper.generateCheckpoint(this);
- }
- }
-
public byte[] serializeState() {
return safeKeeper.getSerializer().serialize(this);
}
@@ -643,23 +618,6 @@ private void restoreFieldsForClass(Class currentInOldStateClassHierarchy, Abstra
}
/**
- * Subscribes this PE to the checkpointing stream
- */
- private void supplementAdviceForCheckpointingAndRecovery() {
- // don't do anything until both conditions are true
- Logger.getLogger("s4").info(
- "Maybe adding for " + this.getId() + ": " + checkpointingFrequency + " and " + eventAdviceList.size());
- if (checkpointingFrequency > 0 && eventAdviceList.size() > 0) {
- eventAdviceList.add(new EventAdvice(this.getId() + "_checkpointing", "key"));
- }
- }
-
- public void processEvent(RecoveryEvent recoveryEvent) {
- isCheckpointingEvent = true;
- recover();
- }
-
- /**
* This method expires the current PE.
**/
protected void expire() {
@@ -751,7 +709,7 @@ public void run() {
} else if (PeriodicInvokerType.CHECKPOINTING.equals(type)) {
try {
if (pe.isCheckpointable()) {
- pe.initiateCheckpoint();
+ pe.checkpoint();
checkpointCount++;
}
} catch (Exception e) {
View
160 s4-core/src/main/java/org/apache/s4/processor/PEContainer.java
@@ -55,7 +55,6 @@
private int maxQueueSize = 1000;
private boolean trackByKey;
private Map<String, Integer> countByEventType = Collections.synchronizedMap(new HashMap<String, Integer>());
- private SafeKeeper safeKeeper;
private ControlEventProcessor controlEventProcessor = null;
@@ -80,7 +79,7 @@ public void setTrackByKey(boolean trackByKey) {
}
public void setSafeKeeper(SafeKeeper sk) {
- this.safeKeeper = sk;
+ // kept for backward compatibility with existing configuration files
}
public void addProcessor(AbstractPE processor) {
@@ -119,7 +118,7 @@ public void init() {
/*
* (non-Javadoc)
- *
+ *
* @see
* org.apache.s4.processor.AsynchronousEventProcessor#queueWork(org.apache.s4.collector.
* EventWrapper)
@@ -153,7 +152,7 @@ public void queueWork(EventWrapper eventWrapper) {
// run()
/*
* (non-Javadoc)
- *
+ *
* @see org.apache.s4.processor.AsynchronousEventProcessor#getQueueSize()
*/
@Override
@@ -164,9 +163,9 @@ public int getQueueSize() {
/**
* An event is a control event if its stream name begins with the character
* '#'.
- *
+ *
* Control events are handled specially.
- *
+ *
* @param e
* the event wrapper to test
* @return true if and only if e is a control message.
@@ -218,68 +217,61 @@ public void run() {
}
// printPlainPartitionInfoList(event.getCompoundKeyList());
- if (eventWrapper.getStreamName().endsWith("_checkpointing")
- || eventWrapper.getStreamName().endsWith("_recovery")) {
- // in that case, we don't need to iterate over all prototypes and advises:
- // the target PE is specified in the event
- handleCheckpointingOrRecovery(eventWrapper);
- } else {
-
- boolean ctrlEvent = testControlEvent(eventWrapper);
-
- // execute the PEs interested in this event
- for (int i = 0; i < prototypeWrappers.size(); i++) {
- if (logger.isDebugEnabled()) {
- logger.debug("STEP 6 (PEContainer): prototypeWrappers("
- + i
- + ") - "
- + prototypeWrappers.get(i).toString()
- + " - " + eventWrapper.getStreamName());
- }
-
- // first check if this is a control message and handle
- // it if
- // so.
- if (ctrlEvent) {
- if (controlEventProcessor != null) {
- controlEventProcessor.process(eventWrapper,
- prototypeWrappers.get(i));
- }
- continue;
- }
-
- // otherwise, continue processing event.
- List<EventAdvice> adviceList = adviceLists.get(i);
- for (EventAdvice eventAdvice : adviceList) {
- if (eventAdvice.getEventName().equals("*")
- || eventAdvice.getEventName().equals(
- eventWrapper.getStreamName())) {
- // event name matches
- } else {
- continue;
- }
-
- if (eventAdvice.getKey().equals("*")) {
- invokePE(prototypeWrappers.get(i).getPE("*"),
- eventWrapper, null);
- continue;
- }
-
- for (CompoundKeyInfo compoundKeyInfo : eventWrapper
- .getCompoundKeys()) {
- if (eventAdvice.getKey().equals(
- compoundKeyInfo.getCompoundKey())) {
- invokePE(
- prototypeWrappers
- .get(i)
- .getPE(compoundKeyInfo
- .getCompoundValue()),
- eventWrapper, compoundKeyInfo);
- }
- }
- }
- }
+ boolean ctrlEvent = testControlEvent(eventWrapper);
+
+ // execute the PEs interested in this event
+ for (int i = 0; i < prototypeWrappers.size(); i++) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("STEP 6 (PEContainer): prototypeWrappers("
+ + i
+ + ") - "
+ + prototypeWrappers.get(i).toString()
+ + " - " + eventWrapper.getStreamName());
+ }
+
+ // first check if this is a control message and handle
+ // it if
+ // so.
+ if (ctrlEvent) {
+ if (controlEventProcessor != null) {
+ controlEventProcessor.process(eventWrapper,
+ prototypeWrappers.get(i));
+ }
+
+ continue;
+ }
+
+ // otherwise, continue processing event.
+ List<EventAdvice> adviceList = adviceLists.get(i);
+ for (EventAdvice eventAdvice : adviceList) {
+ if (eventAdvice.getEventName().equals("*")
+ || eventAdvice.getEventName().equals(
+ eventWrapper.getStreamName())) {
+ // event name matches
+ } else {
+ continue;
+ }
+
+ if (eventAdvice.getKey().equals("*")) {
+ invokePE(prototypeWrappers.get(i).getPE("*"),
+ eventWrapper, null);
+ continue;
+ }
+
+ for (CompoundKeyInfo compoundKeyInfo : eventWrapper
+ .getCompoundKeys()) {
+ if (eventAdvice.getKey().equals(
+ compoundKeyInfo.getCompoundKey())) {
+ invokePE(
+ prototypeWrappers
+ .get(i)
+ .getPE(compoundKeyInfo
+ .getCompoundValue()),
+ eventWrapper, compoundKeyInfo);
+ }
+ }
+ }
}
endTime = System.currentTimeMillis();
@@ -299,40 +291,6 @@ public void run() {
}
}
- private void handleCheckpointingOrRecovery(EventWrapper eventWrapper) {
- CheckpointingEvent checkpointingEvent = null;
- try {
- checkpointingEvent = (CheckpointingEvent) eventWrapper.getEvent();
- } catch (ClassCastException e) {
- logger.error("Checkpointing stream ["
- + eventWrapper.getStreamName()
- + "] can only handle checkpointing events. Received event is not a checkpointing event; it will be ignored.");
- return;
- }
- // 1. event is targeted towards PE prototype whose name is given by the
- // name of the stream
- // 2. PE id is given by the event
- for (int i = 0; i < prototypeWrappers.size(); i++) {
- if (checkpointingEvent.getSafeKeeperId().getPrototypeId()
- .equals(prototypeWrappers.get(i).getId())) {
-
- // check that PE is subscribed to checkpointing stream
- List<EventAdvice> advices = adviceLists.get(i);
- for (EventAdvice eventAdvice : advices) {
- if (eventAdvice.getEventName().equals(eventWrapper.getStreamName())){
- invokePE(
- prototypeWrappers.get(i).getPE(
- checkpointingEvent.getSafeKeeperId().getKey()),
- eventWrapper, null);
- break;
- }
- }
- }
- }
-
-
- }
-
private void invokePE(AbstractPE pe, EventWrapper eventWrapper,
CompoundKeyInfo compoundKeyInfo) {
try {
View
30 s4-core/src/main/java/org/apache/s4/util/MetricsName.java
@@ -34,23 +34,25 @@
pecontainer_exec_elapse_time("pec_exec_t"), low_level_emitter_msg_out_ct(
"lle_out"), low_level_emitter_out_err_ct("lle_err"), low_level_emitter_qsz(
"lle_qsz"), s4_core_exit_ct("s4_ex_ct"), s4_core_free_mem("s4_fmem"), pe_join_ev_ct(
- "pe_j_ct"), pe_error_count("pe_err"), checkpointing_dropped_from_serialization_queue("cp_ser_dr"),
- checkpointing_dropped_from_storage_queue("cp_sto_dr"),
- checkpointing_added_to_serialization_queue("cp_ser_in"),
- checkpointing_added_to_storage_queue("cp_sto_in");
+ "pe_j_ct"), pe_error_count("pe_err"), checkpointing_dropped_from_serialization_queue("cp_ser_dr"),
+ checkpointing_dropped_from_storage_queue("cp_sto_dr"),
+ checkpointing_added_to_serialization_queue("cp_ser_in"),
+ checkpointing_added_to_storage_queue("cp_sto_in"),
+ checkpointing_fetching_failed("cp_fet_err"),
+ checkpointing_fetching_success("cp_fet_ok");
- private final String eventShortName;
+ private final String eventShortName;
- private MetricsName(String eventShortName) {
- this.eventShortName = eventShortName;
- }
+ private MetricsName(String eventShortName) {
+ this.eventShortName = eventShortName;
+ }
- public String toString() {
- return eventShortName;
- }
+ public String toString() {
+ return eventShortName;
+ }
- public static void main(String[] args) {
- System.out.println(generic_listener_msg_in_ct.toString());
+ public static void main(String[] args) {
+ System.out.println(generic_listener_msg_in_ct.toString());
- }
+ }
}
View
10 s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
@@ -5,6 +5,7 @@
import junit.framework.Assert;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.NIOServerCnxn.Factory;
import org.junit.After;
@@ -83,14 +84,17 @@ public void testCheckpointRestorationThroughApplicationEvent()
gen.injectValueEvent(new KeyValue("value1", "message1b"), "Stream1", 0);
signalValue1Set.await(10, TimeUnit.SECONDS);
Assert.assertEquals("value1=message1b ; value2=",
- TestUtils.readFile(StatefulTestPE.DATA_FILE));
+ new String(zk.getData(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, null, null)));
Thread.sleep(2000);
// kill app
forkedS4App.destroy();
// S4App.killS4App(getClass().getName());
- StatefulTestPE.DATA_FILE.delete();
+ try {
+ zk.delete(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, -1);
+ } catch (Exception ignored) {
+ }
forkedS4App = TestUtils.forkS4App(getClass().getName(),
"s4_core_conf_fs_backend.xml");
@@ -106,7 +110,7 @@ public void testCheckpointRestorationThroughApplicationEvent()
// we should get "message1" (checkpointed) instead of "message1b"
// (latest)
Assert.assertEquals("value1=message1 ; value2=message2",
- TestUtils.readFile(StatefulTestPE.DATA_FILE));
+ new String(zk.getData(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, null, null)));
}
View
243 s4-core/src/test/java/org/apache/s4/ft/StatefulTestPE.java
@@ -1,14 +1,10 @@
package org.apache.s4.ft;
-import org.apache.s4.processor.AbstractPE;
-
-import java.io.File;
-import java.io.FileDescriptor;
-import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.PrintStream;
+import org.apache.s4.processor.AbstractPE;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -16,121 +12,124 @@
public class StatefulTestPE extends AbstractPE implements Watcher {
- String id;
- String value1 = "";
- String value2 = "";
- transient ZooKeeper zk = null;
- transient public static File DATA_FILE = new File(
- System.getProperty("user.dir")
- + File.separator + "tmp" + File.separator + "StatefulTestPE.data");;
-
- @Override
- public String getId() {
- return id;
- }
-
- @Override
- public void output() {
- // TODO Auto-generated method stub
-
- }
-
- public void processEvent(KeyValue event) {
- if (zk == null) {
- try {
- zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- if (!S4TestCase.registeredPEs.containsKey(getSafeKeeperId())) {
- S4TestCase.registeredPEs.put(getSafeKeeperId(), this);
- }
- try {
-
- if ("value1".equals(event.getKey())) {
- setValue1(event.getValue());
- zk.create("/value1Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } else if ("value2".equals(event.getKey())) {
- setValue2(event.getValue());
- zk.create("/value2Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } else if ("initiateCheckpoint".equals(event.getKey())) {
- initiateCheckpoint();
- } else {
- throw new RuntimeException("unidentified event: " + event);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-
- public String getValue1() {
- return value1;
- }
-
- public void setValue1(String value1) {
- this.value1 = value1;
- persistValues();
- }
-
- public String getValue2() {
- return value2;
- }
-
- public void setValue2(String value2) {
- this.value2 = value2;
- persistValues();
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- protected void checkpoint() {
- super.checkpoint();
- try {
- zk.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- }
-
- // NOTE: we use a file as a simple way to keep track of changes
- private void persistValues() {
-
- if (DATA_FILE.exists()) {
- if (!DATA_FILE.delete()) {
- throw new RuntimeException("Cannot delete datafile "
- + DATA_FILE.getAbsolutePath());
- }
- }
- try {
- if (!DATA_FILE.createNewFile()) {
- throw new RuntimeException("Cannot create datafile "
- + DATA_FILE.getAbsolutePath());
- }
- } catch (IOException e) {
- throw new RuntimeException("Cannot create datafile "
- + DATA_FILE.getAbsolutePath());
- }
- try {
- TestUtils.writeStringToFile("value1=" + value1 + " ; value2=" + value2,
- DATA_FILE);
- } catch (IOException e) {
- throw new RuntimeException("Cannot write to datafile "
- + DATA_FILE.getAbsolutePath());
- }
- }
-
- @Override
- public void process(WatchedEvent event) {
- // TODO Auto-generated method stub
-
- }
+ public static final String STATEFUL_TEST_PE_DATA_ZNODE = "/statefulTestPE.data";
+ String id;
+ String value1 = "";
+ String value2 = "";
+ transient ZooKeeper zk = null;
+
+ // transient public static File DATA_FILE = new File(
+ // System.getProperty("user.dir")
+ // + File.separator + "tmp" + File.separator + "StatefulTestPE.data");;
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ @Override
+ public void output() {
+ // TODO Auto-generated method stub
+
+ }
+
+ public void processEvent(KeyValue event) {
+ if (zk == null) {
+ try {
+ zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if (!S4TestCase.registeredPEs.containsKey(getSafeKeeperId())) {
+ S4TestCase.registeredPEs.put(getSafeKeeperId(), this);
+ }
+ try {
+
+ if ("value1".equals(event.getKey())) {
+ setValue1(event.getValue());
+ zk.create("/value1Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } else if ("value2".equals(event.getKey())) {
+ setValue2(event.getValue());
+ zk.create("/value2Set", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } else if ("initiateCheckpoint".equals(event.getKey())) {
+ checkpoint();
+ } else {
+ throw new RuntimeException("unidentified event: " + event);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public String getValue1() {
+ return value1;
+ }
+
+ public void setValue1(String value1) {
+ this.value1 = value1;
+ persistValues();
+ }
+
+ public String getValue2() {
+ return value2;
+ }
+
+ public void setValue2(String value2) {
+ this.value2 = value2;
+ persistValues();
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ protected void checkpoint() {
+ super.checkpoint();
+ try {
+ zk.create("/checkpointed", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ // NOTE: we use a file as a simple way to keep track of changes
+ private void persistValues() {
+ if (zk == null) {
+ try {
+ zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ try {
+ try {
+ zk.delete(STATEFUL_TEST_PE_DATA_ZNODE, -1);
+ } catch (KeeperException e) {
+ if (e instanceof KeeperException.NoNodeException) {
+ // ignore
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+ zk.create(STATEFUL_TEST_PE_DATA_ZNODE, new String("value1="
+ + value1 + " ; value2=" + value2).getBytes(),
+ Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ // TODO Auto-generated method stub
+
+ }
}
View
10 s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
@@ -180,9 +180,9 @@ public static void stopZookeeperServer(NIOServerCnxn.Factory f)
Assert.assertTrue("waiting for server down",
waitForServerDown("localhost", ZK_PORT, 3000));
}
-
-
-
+
+
+
// List<String> cmdList = new ArrayList<String>();
// cmdList.add(System.getProperty("user.dir")
// + "/src/test/scripts/killJavaProcessForPort.sh");
@@ -297,7 +297,7 @@ public void process(WatchedEvent event) {
}
});
}
-
+
public static void watchAndSignalChangedChildren(String path,
final CountDownLatch latch, final ZooKeeper zk)
throws KeeperException, InterruptedException {
@@ -393,7 +393,7 @@ public static void cleanupTmpDirs() {
deleteDirectoryContents(S4TestCase.DEFAULT_TEST_OUTPUT_DIR);
}
S4TestCase.DEFAULT_STORAGE_DIR.mkdirs();
-
+
}
}
View
53 s4-core/src/test/java/org/apache/s4/ft/rectimeout/BrokenStorage.java
@@ -0,0 +1,53 @@
+package org.apache.s4.ft.rectimeout;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.s4.ft.DefaultFileSystemStateStorage;
+import org.apache.s4.ft.SafeKeeperId;
+import org.apache.s4.ft.StateStorage;
+import org.apache.s4.ft.StorageCallback;
+import org.apache.s4.ft.TestUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooKeeper;
+
+// triggered by ZK
+public class BrokenStorage implements StateStorage {
+
+ DefaultFileSystemStateStorage storage = new DefaultFileSystemStateStorage();
+
+ CountDownLatch signalFetchable = new CountDownLatch(1);
+
+ public BrokenStorage() {
+ }
+
+ @Override
+ public void saveState(SafeKeeperId key, byte[] state,
+ StorageCallback callback) {
+ storage.saveState(key, state, callback);
+ }
+
+ @Override
+ public byte[] fetchState(SafeKeeperId key) {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ throw new RuntimeException("fetching failed");
+ }
+
+ @Override
+ public Set<SafeKeeperId> fetchStoredKeys() {
+ return storage.fetchStoredKeys();
+ }
+
+
+}
View
130 s4-core/src/test/java/org/apache/s4/ft/rectimeout/RecoveryTimeoutTest.java
@@ -0,0 +1,130 @@
+package org.apache.s4.ft.rectimeout;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.ft.EventGenerator;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.StatefulTestPE;
+import org.apache.s4.ft.TestUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.json.JSONException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RecoveryTimeoutTest extends S4TestCase {
+
+ public static long ZOOKEEPER_PORT = 21810;
+ private Process forkedS4App = null;
+ private static Factory zookeeperServerConnectionFactory = null;
+ private CountDownLatch signalValue2Set;
+
+ @Before
+ public void prepare() throws Exception {
+ TestUtils.cleanupTmpDirs();
+ zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+ final ZooKeeper zk = TestUtils.createZkClient();
+ try {
+ zk.delete("/value1Set", -1);
+ } catch (Exception ignored) {
+ }
+ try {
+ // FIXME can't figure out where this is retained
+ zk.delete("/value2Set", -1);
+ } catch (Exception ignored) {
+ }
+ try {
+ // FIXME can't figure out where this is retained
+ zk.delete("/checkpointed", -1);
+ } catch (Exception ignored) {
+ }
+ zk.close();
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+ TestUtils.killS4App(forkedS4App);
+ }
+
+ @Test
+ public void testRecoveryTimeout()
+ throws Exception {
+ checkpointAndRecover();
+ signalValue2Set.await(10, TimeUnit.SECONDS);
+
+ // we should NOT get value1 since the checkpoint is not recovered"
+ ZooKeeper zk = TestUtils.createZkClient();
+ Assert.assertEquals("value1= ; value2=message2",
+ new String(zk.getData(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, null, null)));
+
+ }
+
+ private void checkpointAndRecover() throws IOException,
+ InterruptedException, KeeperException, JSONException {
+ ZooKeeper zk = TestUtils.createZkClient();
+ // 1. instantiate remote S4 app
+ forkedS4App = TestUtils.forkS4App(getClass().getName(),
+ "s4_core_conf_broken_backend.xml");
+ // TODO synchro
+ Thread.sleep(5000);
+
+ CountDownLatch signalValue1Set = new CountDownLatch(1);
+ TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+
+ // 2. generate a simple event that changes the state of the PE
+ // --> this event triggers recovery
+ // we inject a value for value2 field (was for value1 in
+ // checkpointing
+ // test). This should trigger recovery and provide a pe with value1
+ // and
+ // value2 set:
+ // value1 from recovery, and value2 from injected event.
+ EventGenerator gen = new EventGenerator();
+ gen.injectValueEvent(new KeyValue("value1", "message1"), "Stream1", 0);
+ signalValue1Set.await();
+ final CountDownLatch signalCheckpointed = new CountDownLatch(1);
+ TestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed,
+ zk);
+ // trigger checkpoint
+ gen.injectValueEvent(new KeyValue("initiateCheckpoint", "blah"),
+ "Stream1", 0);
+ signalCheckpointed.await(10, TimeUnit.SECONDS);
+ // signalCheckpointAddedByBK.await();
+
+ signalValue1Set = new CountDownLatch(1);
+ TestUtils.watchAndSignalCreation("/value1Set", signalValue1Set, zk);
+ gen.injectValueEvent(new KeyValue("value1", "message1b"), "Stream1", 0);
+ signalValue1Set.await(10, TimeUnit.SECONDS);
+ Assert.assertEquals("value1=message1b ; value2=",
+ new String(zk.getData(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, null, null)));
+
+ Thread.sleep(2000);
+ // kill app
+ forkedS4App.destroy();
+ // S4App.killS4App(getClass().getName());
+
+
+ try {
+ zk.delete(StatefulTestPE.STATEFUL_TEST_PE_DATA_ZNODE, -1);
+ } catch (Exception ignored) {
+ }
+
+ forkedS4App = TestUtils.forkS4App(getClass().getName(),
+ "s4_core_conf_broken_backend.xml");
+ // TODO synchro
+ Thread.sleep(2000);
+ signalValue2Set = new CountDownLatch(1);
+ TestUtils.watchAndSignalCreation("/value2Set", signalValue2Set, zk);
+
+ gen.injectValueEvent(new KeyValue("value2", "message2"), "Stream1", 0);
+ }
+
+}
View
26 s4-core/src/test/java/org/apache/s4/ft/rectimeout/app_conf.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <!-- <bean id="printEventPE" class="org.apache.s4.processor.PrintEventPE">
+ <property name="id" value="printEventPE"/>
+ <property name="keys">
+ <list>
+ <value>TopicSeen topic</value>
+ </list>
+ </property>
+ </bean> -->
+
+ <bean id="statefulPE" class="org.apache.s4.ft.StatefulTestPE">
+ <property name="id" value="statefulPE"/>
+ <property name="keys">
+ <list>
+ <value>Stream1 key</value>
+ </list>
+ </property>
+ <!-- we set the frequency to 1000 so that it checkpointing does NOT get triggered automatically! -->
+ <property name="checkpointingFrequencyByEventCount" value="1000" />
+ </bean>
+
+
+
+</beans>
View
196 s4-core/src/test/java/org/apache/s4/ft/rectimeout/s4_core_conf_broken_backend.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ <bean id="propertyConfigurer"
+ class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ <property name="location">
+ <value>classpath:s4_core.properties</value>
+ </property>
+ <property name="properties">
+ <props>
+ <prop key="kryoSerDeser.initialBufferSize">2048</prop>
+ <prop key="kryoSerDeser.maxBufferSize">262144</prop>
+ </props>
+ </property>
+ <property name="ignoreUnresolvablePlaceholders" value="true" />
+ </bean>
+
+ <bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher" />
+
+ <bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="listener" ref="rawListener" />
+ <property name="listenerAppName" value="${adapter_app_name}" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="listener" ref="rawListener" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
+ <property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+ <property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+ </bean>
+
+ <!--START: Dispatchers for control event processor. If stream name in Response
+ is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter).
+ Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+ <bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
+ <property name="dispatchers">
+ <list>
+ <ref bean="ctrlDispatcherFilteredS4" />
+ <ref bean="ctrlDispatcherFilteredAdapter" />
+ </list>
+ </property>
+ </bean>
+
+ <bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
+ <property name="dispatcher" ref="ctrlDispatcherAdapter" />
+ <property name="streams">
+ <list>
+ <value>@${adapter_app_name}</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
+ <property name="dispatcher" ref="ctrlDispatcherS4" />
+ <property name="streams">
+ <list>
+ <value>@${adapter_app_name}</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+ <property name="hasher" ref="hasher" />
+ <property name="debug" value="false" />
+ </bean>
+
+ <bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="genericPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="genericPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+ <!-- END: Dispatchers for control events -->
+
+ <!-- Control Events handler -->
+ <bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
+ <property name="dispatcher" ref="ctrlDispatcher" />
+ </bean>
+
+ <bean id="peContainer" class="org.apache.s4.processor.PEContainer"
+ init-method="init" lazy-init="true">
+ <property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+ <property name="monitor" ref="monitor" />
+ <property name="trackByKey" value="true" />
+ <property name="clock" ref="clock" />
+ <property name="controlEventProcessor" ref="ctrlHandler" />
+ <property name="safeKeeper" ref="safeKeeper" />
+ </bean>
+
+ <bean id="rawListener" class="org.apache.s4.listener.CommLayerListener"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="clusterManagerAddress" value="${zk_address}" />
+ <property name="appName" value="${s4_app_name}" />
+ <property name="maxQueueSize" value="${listener_max_queue_size}" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="eventListener" class="org.apache.s4.collector.EventListener"
+ init-method="init">
+ <property name="rawListener" ref="rawListener" />
+ <property name="peContainer" ref="peContainer" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true"
+ init-method="init">
+ <property name="flushInterval" value="30" />
+ <property name="loggerName" value="monitor" />
+ </bean>
+
+ <bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init"
+ lazy-init="true">
+ <property name="monitor" ref="monitor" />
+ <property name="peContainer" ref="peContainer" />
+ <property name="minimumMemory" value="52428800" />
+ </bean>
+
+
+
+
+ <!-- Some useful beans related to client-adapter for apps -->
+
+ <!-- Dispatcher to send to all adapter nodes. -->
+ <bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="broadcastPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <!-- Partitioner to achieve broadcast -->
+ <bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+ <bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="loopbackPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
+ <property name="eventEmitter" ref="commLayerEmitter"/>
+ </bean>
+
+ <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
+ <property name="stateStorage" ref="mockStateStorage" />
+ <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+ <property name="serializer" ref="serDeser"/>
+ <property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+ </bean>
+
+ <bean id="loggingStorageCallbackFactory" class="org.apache.s4.ft.LoggingStorageCallbackFactory"/>
+
+ <bean id="mockStateStorage" class="org.apache.s4.ft.rectimeout.BrokenStorage">
+ <!-- if not specified, default is <current_dir>/tmp/storage
+ <property name="storageRootPath" value="${storage_root_path}" /> -->
+ </bean>
+
+
+</beans>
View
6 s4-core/src/test/java/org/apache/s4/ft/rectimeout/wall_clock.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <bean id="clock" class="org.apache.s4.util.clock.WallClock"/>
+
+</beans>
View
33 s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
@@ -5,6 +5,7 @@
import org.apache.s4.ft.S4TestCase;
import org.apache.s4.ft.TestRedisStateStorage;
import org.apache.s4.ft.TestUtils;
+import org.apache.s4.wordcount.WordClassifier;
import org.apache.s4.wordcount.WordCountTest;
import java.io.File;
@@ -22,23 +23,23 @@
import org.junit.Test;
/**
- *
+ *
* We use 2 lists of words that we inject in a word counting s4 system.
- *
+ *
* After processing the first sentence, we just kill the platform and restart
* it.
- *
+ *
* Then we inject the second sentence.
- *
- *
+ *
+ *
* We verify that no state was lost, i.e. that the words count includes words
* from both the first and the second sentence.
- *
+ *
* NOTE 1: we synchronize through zookeeper to control when to kill the
* application, and when to verify assertions. NOTE 2: we use some additional
* explicit waits for bookkeeper backend so that it gets correctly initialized.
- *
- *
+ *
+ *
*/
public class FTWordCountTest extends S4TestCase {
@@ -104,8 +105,8 @@ public void doTestCheckpointingAndRecovery(String backendConf)
"Sentences", 0);
signalSentence1Processed.await(10, TimeUnit.SECONDS);
Thread.sleep(1000);
-
-
+
+
// crash the app
forkedS4App.destroy();
@@ -149,16 +150,8 @@ public void doTestCheckpointingAndRecovery(String backendConf)
new KeyValue("sentence", WordCountTest.SENTENCE_3),
"Sentences", 0);
signalTextProcessed.await(10, TimeUnit.SECONDS);
- File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
- + File.separator + "wordcount");
- if (!results.exists()) {
- // in case the results file isn't ready yet
- Thread.sleep(1000);
- results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
- + File.separator + "wordcount");
- }
- String s = TestUtils.readFile(results);
- Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+
+ Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", new String(zk.getData(WordClassifier.WORD_COUNT_ZNODE, null, null)));
}
View
17 s4-core/src/test/java/org/apache/s4/wordcount/WordClassifier.java
@@ -2,6 +2,7 @@
import org.apache.s4.ft.KeyValue;
import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.StatefulTestPE;
import org.apache.s4.ft.TestUtils;
import org.apache.s4.processor.AbstractPE;
@@ -28,6 +29,7 @@
transient private ZooKeeper zk;
private String id;
public final static String ROUTING_KEY = "classifier";
+ public static final String WORD_COUNT_ZNODE = "/wordcount";
public void setId(String id) {
this.id = id;
@@ -59,20 +61,17 @@ public void processEvent(WordCount wordCount) throws IOException,
}
++counter;
if (counter == WordCountTest.TOTAL_WORDS) {
- File results = new File(S4TestCase.DEFAULT_TEST_OUTPUT_DIR
- + File.separator + "wordcount");
- if (results.exists()) {
- if (!results.delete()) {
- throw new RuntimeException("cannot delete results file");
- }
- }
+ try {
+ zk.delete(WordClassifier.WORD_COUNT_ZNODE, -1);
+ } catch (Exception ignored) {
+ }
Set<Entry<String, Integer>> entrySet = counts.entrySet();
StringBuilder sb = new StringBuilder();
for (Entry<String, Integer> entry : entrySet) {
sb.append(entry.getKey() + "=" + entry.getValue() + ";");
}
- TestUtils.writeStringToFile(sb.toString(), results);
-
+ zk.create(WordClassifier.WORD_COUNT_ZNODE, sb.toString().getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
zk.create("/textProcessed", new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
} else {
View
13 s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -34,7 +34,7 @@
+ SENTENCE_2_TOTAL_WORDS + SENTENCE_3_TOTAL_WORDS;
private static Factory zookeeperServerConnectionFactory;
-
+
@Before
public vo