Skip to content
Browse files

provide optional recovery after expiry from persister cache

- also allow injection of persister through persister factory
  • Loading branch information...
1 parent 45efb82 commit b43de357d49711fc319c46865fdb08f67414d4f8 @matthieumorel matthieumorel committed Jan 31, 2012
View
28 s4-core/src/main/java/org/apache/s4/ft/SafeKeeper.java
@@ -22,9 +22,12 @@
import org.apache.s4.emitter.CommLayerEmitter;
import org.apache.s4.processor.AbstractPE;
import org.apache.s4.serialize.SerializerDeserializer;
+import org.apache.s4.util.clock.Clock;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
@@ -256,4 +259,29 @@ public void setMaxOutstandingWriteRequests(int maxOutstandingWriteRequests) {
this.maxOutstandingWriteRequests = maxOutstandingWriteRequests;
}
+ /**
+ * Recovery depends on expiration settings:
+ * a PE may or may not recover its previous state if it was expired, depending on the "recoveryAfterExpiration" setting.
+ * @param abstractPE TODO
+ * @param pe TODO
+ *
+ */
+ public boolean mustRestoreState(AbstractPE pe, int ttl, Clock clock) {
+ if (pe.isRecoveryAfterExpiration()) {
+ return true;
+ } else {
+ // NOTE : ttl is not checkpointed. Get the value from current prototype
+ if (pe.getCacheAddDate()!=-1 && ttl!=-1 &&
+ (pe.getCacheAddDate() + (1000 * ttl)) <= clock.getCurrentTime()
+ ) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Not recovering PE ["+ pe.getSafeKeeperId() + "] because it was expired");
+ }
+ return false;
+ } else {
+ return true;
+ }
+ }
+ }
+
}
View
4 s4-core/src/main/java/org/apache/s4/persist/ConMapPersister.java
@@ -17,6 +17,7 @@
*/
package org.apache.s4.persist;
+import org.apache.s4.processor.AbstractPE;
import org.apache.s4.util.clock.Clock;
import java.util.Enumeration;
@@ -121,6 +122,9 @@ public void set(String key, Object value, int period) {
ce.value = value;
ce.period = period;
ce.addTime = s4Clock.getCurrentTime();
+ if (value instanceof AbstractPE) {
+ ((AbstractPE)value).setCacheAddDate(ce.addTime);
+ }
cache.put(key, ce);
}
View
4 s4-core/src/main/java/org/apache/s4/persist/HashMapPersister.java
@@ -17,6 +17,7 @@
*/
package org.apache.s4.persist;
+import org.apache.s4.processor.AbstractPE;
import org.apache.s4.util.clock.Clock;
import java.util.ArrayList;
@@ -123,6 +124,9 @@ public void set(String key, Object value, int period) {
ce.value = value;
ce.period = period;
ce.addTime = s4Clock.getCurrentTime();
+ if (value instanceof AbstractPE) {
+ ((AbstractPE)value).setCacheAddDate(ce.addTime);
+ }
cache.put(key, ce);
}
View
31 s4-core/src/main/java/org/apache/s4/processor/AbstractPE.java
@@ -83,7 +83,7 @@ public String getName() {
}
}
- transient private Clock clock;
+ private transient Clock clock;
// FIXME replaces monitor wait on AbstractPE, for triggering possible extra
// thread when checkpointing activated
transient private CountDownLatch s4ClockSetSignal = new CountDownLatch(1);
@@ -120,6 +120,8 @@ public String getName() {
transient private int checkpointableEventCount = 0;
transient private int checkpointsBeforePause = -1;
transient private long checkpointingPauseTimeInMillis;
+ private boolean isRecoveryAfterExpiration;
+ private long cacheAddDate = -1;
transient private OverloadDispatcher overloadDispatcher;
@@ -209,7 +211,7 @@ public void execute(String streamName, CompoundKeyInfo compoundKeyInfo, Object e
// initialize checkpointing event flag
this.isCheckpointingEvent = false;
if (!recoveryAttempted) {
- recover();
+ recover();
recoveryAttempted = true;
}
}
@@ -562,7 +564,9 @@ protected void recover() {
}
try {
AbstractPE peInOldState = deserializeState(serializedState);
- restoreState(peInOldState);
+ if (safeKeeper.mustRestoreState(peInOldState, ttl, clock)) {
+ restoreState(peInOldState);
+ }
} catch (RuntimeException e) {
Logger.getLogger("s4-ft").error("Cannot restore state for key [" + getSafeKeeperId().toString()+"]: "+e.getMessage(), e);
}
@@ -578,6 +582,10 @@ public void setSafeKeeper(SafeKeeper safeKeeper) {
this.safeKeeperSetSignal.countDown();
}
}
+
+ public SafeKeeper getSafeKeeper() {
+ return this.safeKeeper;
+ }
public final void processEvent(InitiateCheckpointingEvent checkpointingEvent) {
isCheckpointingEvent = true;
@@ -777,4 +785,21 @@ public void run() {
}
}
+
+ public void setCacheAddDate(long cacheAddDate) {
+ this.cacheAddDate = cacheAddDate;
+ }
+
+ public long getCacheAddDate() {
+ return cacheAddDate;
+ }
+
+ public boolean isRecoveryAfterExpiration() {
+ return isRecoveryAfterExpiration;
+ }
+
+ public void setRecoveryAfterExpiration(boolean isRecoveryAfterExpiration) {
+ this.isRecoveryAfterExpiration = isRecoveryAfterExpiration;
+ }
+
}
View
4 s4-core/src/main/java/org/apache/s4/processor/PrototypeWrapper.java
@@ -31,7 +31,6 @@
private static Logger logger = Logger.getLogger(PrototypeWrapper.class);
private AbstractPE prototype;
Persister lookupTable;
- SafeKeeper safeKeeper;
public String getId() {
return prototype.getId();
@@ -125,7 +124,4 @@ public int getPECount() {
return prototype.advise();
}
- public void setSafeKeeper(SafeKeeper safeKeeper) {
- this.safeKeeper = safeKeeper;
- }
}
View
3 s4-core/src/test/java/org/apache/s4/ft/CheckpointingTest.java
@@ -31,7 +31,7 @@
public void prepare() throws Exception {
zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
app = new S4App(getClass(), "s4_core_conf_fs_backend.xml");
- app.initializeS4App();
+ app.initializeS4App("app_conf.xml");
}
@After
@@ -101,6 +101,7 @@ public void testCheckpointStorage() throws Exception {
keyValueStringField.setAccessible(true);
keyValueStringField.set(refPE, "value");
refPE.setId("statefulPE");
+ refPE.setCacheAddDate(pe.getCacheAddDate());
refPE.setKeys(new String[] {});
KryoSerDeser kryoSerDeser = new KryoSerDeser();
byte[] refBytes = kryoSerDeser.serialize(refPE);
View
4 s4-core/src/test/java/org/apache/s4/ft/RecoveryTest.java
@@ -51,7 +51,7 @@ public void testCheckpointRestorationThroughApplicationEvent()
final ZooKeeper zk = TestUtils.createZkClient();
// 1. instantiate remote S4 app
forkedS4App = TestUtils.forkS4App(getClass().getName(),
- "s4_core_conf_fs_backend.xml");
+ "s4_core_conf_fs_backend.xml", "app_conf.xml");
// TODO synchro
Thread.sleep(4000);
@@ -93,7 +93,7 @@ public void testCheckpointRestorationThroughApplicationEvent()
StatefulTestPE.DATA_FILE.delete();
forkedS4App = TestUtils.forkS4App(getClass().getName(),
- "s4_core_conf_fs_backend.xml");
+ "s4_core_conf_fs_backend.xml", "app_conf.xml");
// TODO synchro
Thread.sleep(2000);
// trigger recovery by sending application event to set value 2
View
25 s4-core/src/test/java/org/apache/s4/ft/S4App.java
@@ -59,19 +59,23 @@ public S4App(Class testClass, String s4CoreConfFileName) throws Exception {
* @param args
* @throws Exception
*/
- public static void main(String[] args) throws Exception {
- Class testClass = Class.forName(args[0]);
- String s4CoreConfFile = args[1];
- S4App app = new S4App(testClass, s4CoreConfFile);
- S4TestCase.initS4Parameters();
- app.initializeS4App();
-
+ public static void main(String[] args) {
+ try {
+ Class testClass = Class.forName(args[0]);
+ String s4CoreConfFile = args[1];
+ String s4AppConfFile = args[2];
+ S4App app = new S4App(testClass, s4CoreConfFile);
+ S4TestCase.initS4Parameters();
+ app.initializeS4App(s4AppConfFile);
+ } catch (Exception e) {
+ Logger.getLogger(S4App.class).error("Cannot start S4 app", e);
+ }
}
/**
* Performs dependency injection and starts the S4 plaftform.
*/
- public void initializeS4App()
+ public void initializeS4App(String s4AppConfFile)
throws Exception {
initConfigPaths(testClass, s4CoreConfFileName);
ApplicationContext coreContext = null;
@@ -93,6 +97,7 @@ public void initializeS4App()
Watcher w = (Watcher) context.getBean("watcher");
w.setConfigFilename(configBase + s4CoreConfFileName);
+ Logger.getLogger(getClass()).info("initializing app");
// load extension modules
// String[] configFileNames = getModuleConfigFiles(extsHome, prop);
// if (configFileNames.length > 0) {
@@ -105,7 +110,8 @@ public void initializeS4App()
// }
// load application modules
- String applicationConfigFileName = configBase + "app_conf.xml";
+// String applicationConfigFileName = configBase + "app_conf.xml";
+ String applicationConfigFileName = configBase + s4AppConfFile;
String[] configFileUrls = new String[] { "file:"
+ applicationConfigFileName };
context = new FileSystemXmlApplicationContext(configFileUrls, context);
@@ -128,6 +134,7 @@ public void initializeS4App()
}
}
((AbstractPE)bean).setSafeKeeper((SafeKeeper) context.getBean("safeKeeper"));
+
} catch (NoSuchMethodException mnfe) {
// acceptable
}
View
4 s4-core/src/test/java/org/apache/s4/ft/TestUtils.java
@@ -35,7 +35,8 @@
public class TestUtils {
public static final int ZK_PORT = 21810;
- public static Process forkS4App(String testClassName, String s4CoreConfFileName) throws IOException,
+
+ public static Process forkS4App(String testClassName, String s4CoreConfFileName, String s4AppConfFileName) throws IOException,
InterruptedException {
List<String> cmdList = new ArrayList<String>();
cmdList.add("java");
@@ -53,6 +54,7 @@ public static Process forkS4App(String testClassName, String s4CoreConfFileName)
cmdList.add(S4App.class.getName());
cmdList.add(testClassName);
cmdList.add(s4CoreConfFileName);
+ cmdList.add(s4AppConfFileName);
ProcessBuilder pb = new ProcessBuilder(cmdList);
pb.directory(new File(System.getProperty("user.dir")));
View
60 s4-core/src/test/java/org/apache/s4/ft/pecache/CacheTestPE.java
@@ -0,0 +1,60 @@
+package org.apache.s4.ft.pecache;
+
+import java.io.IOException;
+
+import org.apache.log4j.Logger;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.TestUtils;
+import org.apache.s4.processor.AbstractPE;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+
+public class CacheTestPE extends AbstractPE implements Watcher {
+
+ String value = "";
+ transient ZooKeeper zk = null;
+
+ public void processEvent(KeyValue event) {
+ if (zk == null) {
+ Logger.getLogger(getClass()).info("Creating ZK connection");
+ try {
+ zk = new ZooKeeper("localhost:" + TestUtils.ZK_PORT, 4000, this);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ if ("key".equals(event.getKey())) {
+ setValue(this.value + event.getValue());
+ try {
+ Logger.getLogger(getClass()).info("setting ZK /value");
+ zk.create("/value", value.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ throw new RuntimeException("unknown event " + event);
+
+ }
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public void output() {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Override
+ public void process(WatchedEvent arg0) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
View
101 s4-core/src/test/java/org/apache/s4/ft/pecache/TestRecoveryInteractionWithPECache.java
@@ -0,0 +1,101 @@
+package org.apache.s4.ft.pecache;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.Assert;
+
+import org.apache.s4.ft.EventGenerator;
+import org.apache.s4.ft.KeyValue;
+import org.apache.s4.ft.S4App;
+import org.apache.s4.ft.S4TestCase;
+import org.apache.s4.ft.TestUtils;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.server.NIOServerCnxn.Factory;
+import org.json.JSONException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestRecoveryInteractionWithPECache extends S4TestCase {
+
+ private static Factory zookeeperServerConnectionFactory;
+ private Process forkedS4App;
+
+ @Before
+ public void prepare() throws IOException, InterruptedException, KeeperException {
+ TestUtils.cleanupTmpDirs();
+ zookeeperServerConnectionFactory = TestUtils.startZookeeperServer();
+
+ }
+
+ @After
+ public void cleanup() throws Exception {
+ TestUtils.stopZookeeperServer(zookeeperServerConnectionFactory);
+ forkedS4App.destroy();
+ }
+
+ @Test
+ public void testNoRecoveryAfterExpiration() throws Exception {
+ byte[] data = testAndReturnValueAfterExpiration("app_conf_noRecoveryAfterExpiration.xml");
+ Assert.assertTrue("got: " + new String(data) , new String(data).equals("value2-"));
+ }
+
+ @Test
+ public void testRecoveryAfterExpiration() throws Exception {
+ byte[] data = testAndReturnValueAfterExpiration("app_conf_recoveryAfterExpiration.xml");
+ Assert.assertTrue(new String(data), new String(data).equals("value1-value2-"));
+ }
+
+
+ private byte[] testAndReturnValueAfterExpiration(String appConfig) throws Exception,
+ IOException, KeeperException, InterruptedException, JSONException {
+
+ forkedS4App = TestUtils.forkS4App(getClass().getName(), "s4_core_conf_fs_backend.xml", appConfig);
+// app.initializeS4App();
+ final ZooKeeper zk = TestUtils.createZkClient();
+
+ CountDownLatch latch1=new CountDownLatch(1);
+ TestUtils.watchAndSignalCreation("/value", latch1,
+ zk);
+
+ EventGenerator generator = new EventGenerator();
+ generator.injectValueEvent(new KeyValue("key", "value1-"),
+ "Values", 0);
+
+ latch1.await(10, TimeUnit.SECONDS);
+
+ byte[] data = zk.getData("/value", false, null);
+ Assert.assertTrue(new String(data).equals("value1-"));
+
+ zk.delete("/value", -1);
+
+ boolean nodeDeleted = false;
+ try {
+ zk.getData("/value", false, null);
+ } catch (NoNodeException e) {
+ nodeDeleted = true;
+ }
+ Assert.assertTrue(nodeDeleted);
+ // NOTE: we need sleeps to wait for checkpoints (in the absence of ZK notifications for that)
+ Thread.sleep(1000);
+ forkedS4App.destroy();
+ forkedS4App = TestUtils.forkS4App(getClass().getName(), "s4_core_conf_fs_backend.xml", appConfig);
+ CountDownLatch latch2=new CountDownLatch(1);
+ TestUtils.watchAndSignalCreation("/value", latch2,
+ zk);
+ generator.injectValueEvent(new KeyValue("key", "value2-"),
+ "Values", 0);
+ latch2.await(10, TimeUnit.SECONDS);
+ data = zk.getData("/value", false, null);
+ return data;
+ }
+
+
+
+
+
+}
View
18 s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_noRecoveryAfterExpiration.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <bean id="cacheTestPE" class="org.apache.s4.ft.pecache.CacheTestPE">
+ <property name="id" value="cacheTestPE"/>
+ <property name="keys">
+ <list>
+ <value>Values key</value>
+ </list>
+ </property>
+ <property name="checkpointingFrequencyByEventCount" value="1" />
+ <property name="ttl" value="1"/>
+ <property name="recoveryAfterExpiration" value="false"/>
+ </bean>
+
+</beans>
View
18 s4-core/src/test/java/org/apache/s4/ft/pecache/app_conf_recoveryAfterExpiration.xml
@@ -0,0 +1,18 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <bean id="cacheTestPE" class="org.apache.s4.ft.pecache.CacheTestPE">
+ <property name="id" value="cacheTestPE"/>
+ <property name="keys">
+ <list>
+ <value>Values key</value>
+ </list>
+ </property>
+ <property name="checkpointingFrequencyByEventCount" value="1" />
+ <property name="ttl" value="1"/>
+ <property name="recoveryAfterExpiration" value="true"/>
+ </bean>
+
+</beans>
View
196 s4-core/src/test/java/org/apache/s4/ft/pecache/s4_core_conf_fs_backend.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+ <bean id="propertyConfigurer"
+ class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
+ <property name="location">
+ <value>classpath:s4_core.properties</value>
+ </property>
+ <property name="properties">
+ <props>
+ <prop key="kryoSerDeser.initialBufferSize">2048</prop>
+ <prop key="kryoSerDeser.maxBufferSize">262144</prop>
+ </props>
+ </property>
+ <property name="ignoreUnresolvablePlaceholders" value="true" />
+ </bean>
+
+ <bean id="hasher" class="org.apache.s4.dispatcher.partitioner.DefaultHasher" />
+
+ <bean id="commLayerEmitterToAdapter" class="org.apache.s4.emitter.CommLayerEmitter"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="listener" ref="rawListener" />
+ <property name="listenerAppName" value="${adapter_app_name}" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="commLayerEmitter" class="org.apache.s4.emitter.CommLayerEmitter"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="listener" ref="rawListener" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="serDeser" class="org.apache.s4.serialize.KryoSerDeser">
+ <property name="initialBufferSize" value="${kryoSerDeser.initialBufferSize}" />
+ <property name="maxBufferSize" value="${kryoSerDeser.maxBufferSize}" />
+ </bean>
+
+ <!--START: Dispatchers for control event processor. If stream name in Response
+ is @adapter or @client, then the event is sent to the adapter (via ctrlDispatcherAdapter).
+ Else it is sent to the S4 cluster itself (via ctrlDispatcherS4) -->
+ <bean id="ctrlDispatcher" class="org.apache.s4.dispatcher.MultiDispatcher">
+ <property name="dispatchers">
+ <list>
+ <ref bean="ctrlDispatcherFilteredS4" />
+ <ref bean="ctrlDispatcherFilteredAdapter" />
+ </list>
+ </property>
+ </bean>
+
+ <bean id="ctrlDispatcherFilteredAdapter" class="org.apache.s4.dispatcher.StreamSelectingDispatcher">
+ <property name="dispatcher" ref="ctrlDispatcherAdapter" />
+ <property name="streams">
+ <list>
+ <value>@${adapter_app_name}</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="ctrlDispatcherFilteredS4" class="org.apache.s4.dispatcher.StreamExcludingDispatcher">
+ <property name="dispatcher" ref="ctrlDispatcherS4" />
+ <property name="streams">
+ <list>
+ <value>@${adapter_app_name}</value>
+ </list>
+ </property>
+ </bean>
+
+ <bean id="genericPartitioner" class="org.apache.s4.dispatcher.partitioner.DefaultPartitioner">
+ <property name="hasher" ref="hasher" />
+ <property name="debug" value="false" />
+ </bean>
+
+ <bean id="ctrlDispatcherS4" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="genericPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <bean id="ctrlDispatcherAdapter" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="genericPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+ <!-- END: Dispatchers for control events -->
+
+ <!-- Control Events handler -->
+ <bean id="ctrlHandler" class="org.apache.s4.processor.ControlEventProcessor">
+ <property name="dispatcher" ref="ctrlDispatcher" />
+ </bean>
+
+ <bean id="peContainer" class="org.apache.s4.processor.PEContainer"
+ init-method="init" lazy-init="true">
+ <property name="maxQueueSize" value="${pe_container_max_queue_size}" />
+ <property name="monitor" ref="monitor" />
+ <property name="trackByKey" value="true" />
+ <property name="clock" ref="clock" />
+ <property name="controlEventProcessor" ref="ctrlHandler" />
+ <property name="safeKeeper" ref="safeKeeper" />
+ </bean>
+
+ <bean id="rawListener" class="org.apache.s4.listener.CommLayerListener"
+ init-method="init">
+ <property name="serDeser" ref="serDeser" />
+ <property name="clusterManagerAddress" value="${zk_address}" />
+ <property name="appName" value="${s4_app_name}" />
+ <property name="maxQueueSize" value="${listener_max_queue_size}" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="eventListener" class="org.apache.s4.collector.EventListener"
+ init-method="init">
+ <property name="rawListener" ref="rawListener" />
+ <property name="peContainer" ref="peContainer" />
+ <property name="monitor" ref="monitor" />
+ </bean>
+
+ <bean id="monitor" class="org.apache.s4.logger.Log4jMonitor" lazy-init="true"
+ init-method="init">
+ <property name="flushInterval" value="30" />
+ <property name="loggerName" value="monitor" />
+ </bean>
+
+ <bean id="watcher" class="org.apache.s4.util.Watcher" init-method="init"
+ lazy-init="true">
+ <property name="monitor" ref="monitor" />
+ <property name="peContainer" ref="peContainer" />
+ <property name="minimumMemory" value="52428800" />
+ </bean>
+
+
+
+
+ <!-- Some useful beans related to client-adapter for apps -->
+
+ <!-- Dispatcher to send to all adapter nodes. -->
+ <bean id="dispatcherToClientAdapters" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="broadcastPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitterToAdapter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <!-- Partitioner to achieve broadcast -->
+ <bean id="broadcastPartitioner" class="org.apache.s4.dispatcher.partitioner.BroadcastPartitioner" />
+
+
+
+ <bean id="loopbackDispatcher" class="org.apache.s4.dispatcher.Dispatcher"
+ init-method="init">
+ <property name="partitioners">
+ <list>
+ <ref bean="loopbackPartitioner" />
+ </list>
+ </property>
+ <property name="eventEmitter" ref="commLayerEmitter" />
+ <property name="loggerName" value="s4" />
+ </bean>
+
+ <bean id="loopbackPartitioner" class="org.apache.s4.dispatcher.partitioner.LoopbackPartitioner">
+ <property name="eventEmitter" ref="commLayerEmitter"/>
+ </bean>
+
+ <bean id="safeKeeper" class="org.apache.s4.ft.SafeKeeper" init-method="init">
+ <property name="stateStorage" ref="fsStateStorage" />
+ <property name="loopbackDispatcher" ref="loopbackDispatcher" />
+ <property name="serializer" ref="serDeser"/>
+ <property name="hasher" ref="hasher"/>
+ <property name="storageCallbackFactory" ref="loggingStorageCallbackFactory"/>
+ </bean>
+
+ <bean id="loggingStorageCallbackFactory" class="org.apache.s4.ft.LoggingStorageCallbackFactory"/>
+
+ <bean id="fsStateStorage" class="org.apache.s4.ft.DefaultFileSystemStateStorage" init-method="init">
+ <!-- if not specified, default is <current_dir>/tmp/storage
+ <property name="storageRootPath" value="${storage_root_path}" /> -->
+ </bean>
+
+
+</beans>
View
6 s4-core/src/test/java/org/apache/s4/ft/pecache/wall_clock.xml
@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
+
+ <bean id="clock" class="org.apache.s4.util.clock.WallClock"/>
+
+</beans>
View
6 s4-core/src/test/java/org/apache/s4/ft/wordcount/FTWordCountTest.java
@@ -83,7 +83,7 @@ public void doTestCheckpointingAndRecovery(String backendConf)
throws Exception {
final ZooKeeper zk = TestUtils.createZkClient();
- forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+ forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf, "app_conf.xml");
CountDownLatch signalTextProcessed = new CountDownLatch(1);
TestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed,
@@ -110,7 +110,7 @@ public void doTestCheckpointingAndRecovery(String backendConf)
forkedS4App.destroy();
// recovering and making sure checkpointing still works
- forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+ forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf, "app_conf.xml");
// add authorizations for continuing processing. Without these, the
// WordClassifier processed keeps waiting
@@ -136,7 +136,7 @@ public void doTestCheckpointingAndRecovery(String backendConf)
// crash the app
forkedS4App.destroy();
- forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf);
+ forkedS4App = TestUtils.forkS4App(getClass().getName(), backendConf, "app_conf.xml");
// add authorizations for continuing processing. Without these, the
// WordClassifier processed keeps waiting
View
2 s4-core/src/test/java/org/apache/s4/processor/TestPrototypeWrapper.java
@@ -2,8 +2,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import org.apache.s4.util.clock.WallClock;
+import org.apache.s4.util.clock.WallClock;
import org.junit.Test;
public class TestPrototypeWrapper
View
2 s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -46,7 +46,7 @@ public void prepare() throws IOException, InterruptedException, KeeperException
public void testSimple() throws Exception {
S4App app = new S4App(getClass(), "s4_core_conf.xml");
- app.initializeS4App();
+ app.initializeS4App("app_conf.xml");
final ZooKeeper zk = TestUtils.createZkClient();
CountDownLatch signalTextProcessed = new CountDownLatch(1);

0 comments on commit b43de35

Please sign in to comment.
Something went wrong with that request. Please try again.