Skip to content
This repository has been archived by the owner on Jun 7, 2021. It is now read-only.

- APEX-256 Added unit tests to verify that checkpointed is or is not … #163

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
187 changes: 182 additions & 5 deletions engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
Expand Up @@ -18,17 +18,28 @@
*/
package com.datatorrent.stram.engine;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;

import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
import com.datatorrent.api.*;
import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
import com.datatorrent.api.Operator.CheckpointListener;
import com.datatorrent.api.Operator.ProcessingMode;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
Expand All @@ -38,10 +49,40 @@
*/
public class GenericNodeTest
{
public static class GenericOperator implements Operator
@Rule
public FSTestWatcher testMeta = new FSTestWatcher();

public static class FSTestWatcher extends TestWatcher
{
public org.junit.runner.Description desc;

public String getDir()
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method gets called twice for a test. Instead have a directory variable defined and initialize that variable with "target/" + desc.getClassName() + "/" + desc.getMethodName() in starting(). You don't need to store description variable

String methodName = desc.getMethodName();
String className = desc.getClassName();
return "target/" + className + "/" + methodName;
}

@Override
protected void starting(org.junit.runner.Description description)
{
this.desc = description;
}

@Override
protected void finished(Description description)
{
super.finished(description);
FileUtils.deleteQuietly(new File(getDir()));
}
}

public static class GenericOperator implements Operator, CheckpointListener
{
long beginWindowId;
long endWindowId;
public volatile boolean checkpointed = false;

public final transient DefaultInputPort<Object> ip1 = new DefaultInputPort<Object>()
{
@Override
Expand Down Expand Up @@ -79,15 +120,23 @@ public void endWindow()
@Override
public void setup(Context.OperatorContext context)
{
throw new UnsupportedOperationException("Not supported yet.");
}

@Override
public void teardown()
{
throw new UnsupportedOperationException("Not supported yet.");
}

@Override
public void checkpointed(long windowId)
{
checkpointed = true;
}

@Override
public void committed(long windowId)
{
}
}

@Test
Expand Down Expand Up @@ -296,4 +345,132 @@ public void run()
Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId);
}

/**
* Tests to make sure that {@link CheckpointListener#checkpointed(long) is called
* immediately after copying to hdfs in the case of exactly once processing mode.
* @throws Exception
*/
@Test
public void testCheckpointedCallTestExactlyOnce() throws Exception
{
asyncCheckpointedCalledHelper(ProcessingMode.EXACTLY_ONCE);
}

/**
* Tests to make sure that {@link CheckpointListener#checkpointed(long) is NOT called immediately after the asynchronous
* task of copying to hdfs is triggered, in the at least once processing mode.
* @throws Exception
*/
@Test
public void testCheckpointedCallTestAtleastOnce() throws Exception
{
asyncCheckpointedCalledHelper(ProcessingMode.AT_LEAST_ONCE);
}

/**
* Tests to make sure that {@link CheckpointListener#checkpointed(long) is NOT called immediately after the asynchronous
* task of copying to hdfs is triggered, in the at most once processing mode.
* @throws Exception
*/
@Test
public void testCheckpointedCallTestAtModeOnce() throws Exception
{
asyncCheckpointedCalledHelper(ProcessingMode.AT_MOST_ONCE);
}

private void asyncCheckpointedCalledHelper(ProcessingMode processingMode) throws Exception
{
long maxSleep = 5000;
long sleeptime = 25L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why two different ways? 25L and 5000

GenericOperator go = new GenericOperator();
AsyncFSStorageAgent storageAgent = new TestAsyncFSStorageAgent(testMeta.getDir(), new Configuration());
DefaultAttributeMap amap = new DefaultAttributeMap();
amap.put(OperatorContext.STORAGE_AGENT, storageAgent);
amap.put(OperatorContext.PROCESSING_MODE, processingMode);
final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, amap, null));
gn.setId(1);
gn.APPLICATION_WINDOW_COUNT = 1;
gn.CHECKPOINT_WINDOW_COUNT = 1;
DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024);
DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024);

gn.connectInputPort("ip1", reservoir1);
gn.connectInputPort("ip2", reservoir2);
gn.connectOutputPort("op", Sink.BLACKHOLE);

final AtomicBoolean ab = new AtomicBoolean(false);
Thread t = new Thread()
{
@Override
public void run()
{
ab.set(true);
gn.activate();
gn.run();
gn.deactivate();
}

};
t.start();

long interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
}
while ((ab.get() == false) && (interval < maxSleep));


int controlTupleCount = gn.controlTupleCount;
Tuple beginWindow1 = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);

reservoir1.add(beginWindow1);
reservoir2.add(beginWindow1);

interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
}
while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
Assert.assertTrue("Begin window called", go.endWindowId != go.beginWindowId);
controlTupleCount = gn.controlTupleCount;

Tuple endWindow1 = new EndWindowTuple(0x1L);

gn.doCheckpoint = true;

reservoir1.add(endWindow1);
reservoir2.add(endWindow1);

interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
}
while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
Assert.assertTrue("End window called", go.endWindowId == go.beginWindowId);

if (processingMode == ProcessingMode.EXACTLY_ONCE) {
Assert.assertTrue(go.checkpointed);
} else {
Assert.assertFalse(go.checkpointed);
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no endStreamTuple passed or shutdown call on GenericNode gn


public static class TestAsyncFSStorageAgent extends AsyncFSStorageAgent
{
private static final long serialVersionUID = 201511111120L;

public TestAsyncFSStorageAgent(String path, Configuration conf)
{
super(path, conf);
}

@Override
public void copyToHDFS(final int operatorId, final long windowId) throws IOException
{
//Do nothing
}
}
}