Skip to content

Commit

Permalink
NIFI-4152 Addressing code review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed Aug 7, 2017
1 parent 89905aa commit b5f3a36
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 13 deletions.
Expand Up @@ -360,10 +360,12 @@ protected void relinquishSender(final ChannelSender sender) {
boolean returned = senderPool.offer(sender);
// if the pool is full then close the sender.
if (!returned) {
getLogger().debug("Sender wasn't returned because queue was full, closing sender");
sender.close();
}
} else {
// probably already closed here, but quietly close anyway to be safe.
getLogger().debug("Sender is not connected, closing sender");
sender.close();
}
}
Expand Down
Expand Up @@ -98,7 +98,37 @@ public void close() {
}

public OutputStream getOutputStream() {
return socketChannelOutput;
return new OutputStream() {
@Override
public void write(int b) throws IOException {
socketChannelOutput.write(b);
}

@Override
public void write(byte[] b) throws IOException {
socketChannelOutput.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
socketChannelOutput.write(b, off, len);
}

@Override
public void close() throws IOException {
socketChannelOutput.close();
}

@Override
public void flush() throws IOException {
socketChannelOutput.flush();
updateLastUsed();
}
};
}

private void updateLastUsed() {
this.lastUsed = System.currentTimeMillis();
}

}
Expand Up @@ -58,6 +58,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NetworkInterface;
import java.net.SocketTimeoutException;
import java.nio.channels.ServerSocketChannel;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -108,7 +109,7 @@ public class ListenTCPRecord extends AbstractProcessor {
.displayName("Read Timeout")
.description("The amount of time to wait before timing out when reading from a connection.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 seconds")
.defaultValue("10 seconds")
.required(true)
.build();

Expand Down Expand Up @@ -337,7 +338,33 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
recordReader = socketRecordReader.createRecordReader(flowFile, getLogger());
}

Record record = recordReader.nextRecord();
Record record;
try {
record = recordReader.nextRecord();
} catch (final Exception e) {
boolean timeout = false;

// some of the underlying record libraries wrap the real exception in RuntimeException, so check each
// throwable (starting with the current one) to see if its a SocketTimeoutException
Throwable cause = e;
while (cause != null) {
if (cause instanceof SocketTimeoutException) {
timeout = true;
break;
}
cause = cause.getCause();
}

if (timeout) {
getLogger().debug("Timeout reading records, will try again later", e);
socketReaders.offer(socketRecordReader);
session.remove(flowFile);
return;
} else {
throw e;
}
}

if (record == null) {
getLogger().debug("No records available from {}, closing connection", new Object[]{getRemoteAddress(socketRecordReader)});
IOUtils.closeQuietly(socketRecordReader);
Expand All @@ -362,11 +389,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
// if keeping then null out the record to break out of the loop, which will transfer what we have and close the connection
try {
record = recordReader.nextRecord();
} catch (final SocketTimeoutException ste) {
getLogger().debug("Timeout reading records, will try again later", ste);
break;
} catch (final Exception e) {
if (ERROR_HANDLING_DISCARD.getValue().equals(readerErrorHandling)) {
throw e;
} else {
getLogger().error("Error reading records: " + e.getMessage(), e);
record = null;
}
}
Expand Down Expand Up @@ -402,15 +431,8 @@ record = null;
session.transfer(flowFile, REL_SUCCESS);
}

// if record is not null then we broke out of the loop because we reached the batch size which means there are
// possibly still more records available so we need to re-queue for further processing
if (record != null && !socketRecordReader.isClosed()) {
getLogger().debug("More records may be available, re-queuing for further processing");
socketReaders.offer(socketRecordReader);
} else {
getLogger().debug("No records available, or socket is closed, closing SocketRecordReader");
IOUtils.closeQuietly(socketRecordReader);
}
getLogger().debug("Re-queuing connection for further processing...");
socketReaders.offer(socketRecordReader);

} catch (Exception e) {
getLogger().error("Error processing records: " + e.getMessage(), e);
Expand Down
Expand Up @@ -214,8 +214,10 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
getLogger().error("Exception while handling a process session, transferring {} to failure.", new Object[] { flowFile }, e);
} finally {
if (closeSender) {
getLogger().debug("Closing sender");
sender.close();
} else {
getLogger().debug("Relinquishing sender");
relinquishSender(sender);
}
}
Expand Down

0 comments on commit b5f3a36

Please sign in to comment.