Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-184] Call the flush method of CloseOnFlushWriterWrapper when… #2040

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -28,6 +28,7 @@
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.records.FlushControlMessageHandler;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.Decorator;
import org.apache.gobblin.util.FinalState;
Expand Down Expand Up @@ -126,7 +127,13 @@ public State getFinalState() {

@Override
public ControlMessageHandler getMessageHandler() {
return this.writer.getMessageHandler();
// if close on flush is configured then create a handler that will invoke the wrapper's flush to perform close
// on flush operations, otherwise return the wrapped writer's handler.
if (this.closeOnFlush) {
return new FlushControlMessageHandler(this);
} else {
return this.writer.getMessageHandler();
}
}

/**
Expand Down
Expand Up @@ -26,6 +26,9 @@

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.records.ControlMessageHandler;
import org.apache.gobblin.stream.ControlMessage;
import org.apache.gobblin.stream.FlushControlMessage;
import org.apache.gobblin.stream.RecordEnvelope;

public class CloseOnFlushWriterWrapperTest {
Expand All @@ -40,12 +43,13 @@ public void testCloseOnFlushDisabled()
byte[] record = new byte[]{'a', 'b', 'c', 'd'};

writer.writeEnvelope(new RecordEnvelope(record));
writer.flush();
writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));

Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
Assert.assertEquals(dummyWriters.get(0).closed, false);
Assert.assertEquals(dummyWriters.get(0).committed, false);
Assert.assertFalse(dummyWriters.get(0).closed);
Assert.assertFalse(dummyWriters.get(0).committed);
Assert.assertTrue(dummyWriters.get(0).handlerCalled);
}

@Test
Expand All @@ -59,12 +63,14 @@ public void testCloseOnFlushEnabled()
byte[] record = new byte[]{'a', 'b', 'c', 'd'};

writer.writeEnvelope(new RecordEnvelope(record));
writer.flush();
writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));

Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
Assert.assertEquals(dummyWriters.get(0).closed, true);
Assert.assertEquals(dummyWriters.get(0).committed, true);
Assert.assertTrue(dummyWriters.get(0).closed);
Assert.assertTrue(dummyWriters.get(0).committed);
// handler from CloseOnFlushWriterWrapper should have been called instead
Assert.assertFalse(dummyWriters.get(0).handlerCalled);
}

@Test
Expand All @@ -78,22 +84,24 @@ public void testWriteAfterFlush()
byte[] record = new byte[]{'a', 'b', 'c', 'd'};

writer.writeEnvelope(new RecordEnvelope(record));
writer.flush();
writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));

Assert.assertEquals(dummyWriters.size(), 1);
Assert.assertEquals(dummyWriters.get(0).recordsWritten(), 1);
Assert.assertEquals(dummyWriters.get(0).flushCount, 1);
Assert.assertEquals(dummyWriters.get(0).closed, true);
Assert.assertEquals(dummyWriters.get(0).committed, true);
Assert.assertTrue(dummyWriters.get(0).closed);
Assert.assertTrue(dummyWriters.get(0).committed);
Assert.assertFalse(dummyWriters.get(0).handlerCalled);

writer.writeEnvelope(new RecordEnvelope(record));
writer.flush();
writer.getMessageHandler().handleMessage(new FlushControlMessage(new FlushControlMessage.FlushReason("flush")));

Assert.assertEquals(dummyWriters.size(), 2);
Assert.assertEquals(dummyWriters.get(1).recordsWritten(), 1);
Assert.assertEquals(dummyWriters.get(1).flushCount, 1);
Assert.assertEquals(dummyWriters.get(1).closed, true);
Assert.assertEquals(dummyWriters.get(1).committed, true);
Assert.assertTrue(dummyWriters.get(1).closed);
Assert.assertTrue(dummyWriters.get(1).committed);
Assert.assertFalse(dummyWriters.get(1).handlerCalled);
}

private CloseOnFlushWriterWrapper getCloseOnFlushWriter(List<DummyWriter> dummyWriters, WorkUnitState state) {
Expand All @@ -113,6 +121,7 @@ private static class DummyWriter implements DataWriter<byte[]> {
private int flushCount = 0;
private boolean committed = false;
private boolean closed = false;
private boolean handlerCalled = false;

DummyWriter() {
}
Expand Down Expand Up @@ -152,6 +161,19 @@ public void close()
this.closed = true;
}

@Override
public ControlMessageHandler getMessageHandler() {
return new ControlMessageHandler() {
@Override
public void handleMessage(ControlMessage message) {
handlerCalled = true;
if (message instanceof FlushControlMessage) {
flush();
}
}
};
}

@Override
public void flush() {
this.flushCount++;
Expand Down