Skip to content

Commit

Permalink
[FLINK-2536] [streaming] Cleanups and improvements on SocketClientSink
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Sep 21, 2015
1 parent fd354ba commit b9663c4
Show file tree
Hide file tree
Showing 2 changed files with 391 additions and 281 deletions.
Expand Up @@ -17,178 +17,249 @@

package org.apache.flink.streaming.api.functions.sink;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.streaming.util.serialization.SerializationSchema;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

/**
* Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
* <p>
* The sink can be set to retry message sends after the sending failed.
* <p>
* The sink can be set to 'autoflush', in which case the socket stream is flushed after every message. This
* significantly reduced throughput, but also decreases message latency.
*
* @param <IN> data to be written into the Socket.
*/
public class SocketClientSink<IN> extends RichSinkFunction<IN> {
protected static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);

private static final int CONNECTION_RETRY_DELAY = 500;


private final SerializableObject lock = new SerializableObject();
private final SerializationSchema<IN, byte[]> schema;
private final String hostName;
private final int port;
private final SerializationSchema<IN, byte[]> schema;
private final int maxNumRetries;
private final boolean autoFlush;

private transient Socket client;
private transient DataOutputStream dataOutputStream;
private long maxRetry;
private boolean retryForever;
private boolean isRunning;
protected long retries;
private final SerializableObject lock;
private transient OutputStream outputStream;

private int retries;

private static final int CONNECTION_RETRY_SLEEP = 1000;
private volatile boolean isRunning = true;

/**
* Creates a new SocketClientSink. The sink will not attempt to retry connections upon failure
* and will not auto-flush the stream.
*
* @param hostName Hostname of the server to connect to.
* @param port Port of the server.
* @param schema Schema used to serialize the data into bytes.
*/
public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
this(hostName, port, schema, 0);
}

/**
* Default constructor.
* Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
* A value of -1 for the number of retries will cause the system to retry an infinite number of times.
* The sink will not auto-flush the stream.
*
* @param hostName Host of the Socket server.
* @param port Port of the Socket.
* @param schema Schema of the data.
* @param hostName Hostname of the server to connect to.
* @param port Port of the server.
* @param schema Schema used to serialize the data into bytes.
* @param maxNumRetries The maximum number of retries after a message send failed.
*/
public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, long maxRetry) {
this.hostName = hostName;
public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema, int maxNumRetries) {
this(hostName, port, schema, maxNumRetries, false);
}

/**
* Creates a new SocketClientSink that retries connections upon failure up to a given number of times.
* A value of -1 for the number of retries will cause the system to retry an infinite number of times.
*
* @param hostName Hostname of the server to connect to.
* @param port Port of the server.
* @param schema Schema used to serialize the data into bytes.
* @param maxNumRetries The maximum number of retries after a message send failed.
* @param autoflush Flag to indicate whether the socket stream should be flushed after each message.
*/
public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema,
int maxNumRetries, boolean autoflush)
{
checkArgument(port > 0 && port < 65536, "port is out of range");
checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)");

this.hostName = checkNotNull(hostName, "hostname must not be null");
this.port = port;
this.schema = schema;
this.maxRetry = maxRetry;
this.retryForever = maxRetry < 0;
this.isRunning = false;
this.retries = 0;
this.lock = new SerializableObject();
this.schema = checkNotNull(schema);
this.maxNumRetries = maxNumRetries;
this.autoFlush = autoflush;
}

// ------------------------------------------------------------------------
// Life cycle
// ------------------------------------------------------------------------

/**
* Initializes the connection to Socket.
* Initialize the connection with the Socket in the server.
* @param parameters Configuration.
*/
public void intializeConnection() {
OutputStream outputStream;
@Override
public void open(Configuration parameters) throws Exception {
try {
client = new Socket(hostName, port);
outputStream = client.getOutputStream();
isRunning = true;
} catch (IOException e) {
throw new RuntimeException("Cannot initialize connection to socket server at " + hostName + ":" + port, e);
synchronized (lock) {
createConnection();
}
}
catch (IOException e) {
throw new IOException("Cannot connect to socket server at " + hostName + ":" + port, e);
}
dataOutputStream = new DataOutputStream(outputStream);
}



/**
* Called when new data arrives to the sink, and forwards it to Socket.
*
* @param value
* The incoming data
* @param value The value to write to the socket.
*/
@Override
public void invoke(IN value) throws Exception {
byte[] msg = schema.serialize(value);
try {
dataOutputStream.write(msg);
} catch (IOException e) {
LOG.error("Cannot send message " + value +
" to socket server at " + hostName + ":" + port + ". Caused by " + e.getMessage() +
". Trying to reconnect.", e);
retries = 0;
boolean success = false;
while ((retries < maxRetry || retryForever) && !success && isRunning){
try {

if (dataOutputStream != null) {
dataOutputStream.close();
outputStream.write(msg);
if (autoFlush) {
outputStream.flush();
}
}
catch (IOException e) {
// if no re-tries are enable, fail immediately
if (maxNumRetries == 0) {
throw new IOException("Failed to send message '" + value + "' to socket server at "
+ hostName + ":" + port + ". Connection re-tries are not enabled.", e);
}

LOG.error("Failed to send message '" + value + "' to socket server at " + hostName + ":" + port +
". Trying to reconnect..." , e);

// do the retries in locked scope, to guard against concurrent close() calls
// note that the first re-try comes immediately, without a wait!

synchronized (lock) {
IOException lastException = null;
retries = 0;

while (isRunning && (maxNumRetries < 0 || retries < maxNumRetries)) {

// first, clean up the old resources
try {
if (outputStream != null) {
outputStream.close();
}
}

if (client != null && !client.isClosed()) {
client.close();
catch (IOException ee) {
LOG.error("Could not close output stream from failed write attempt", ee);
}

retries++;

client = new Socket(hostName, port);
dataOutputStream = new DataOutputStream(client.getOutputStream());
dataOutputStream.write(msg);
success = true;

} catch(IOException ee) {
LOG.error("Reconnect to socket server and send message failed. Caused by " +
ee.getMessage() + ". Retry time(s):" + retries);

try {
synchronized (lock) {
lock.wait(CONNECTION_RETRY_SLEEP);
if (client != null) {
client.close();
}
} catch(InterruptedException eee) {
break;
}
catch (IOException ee) {
LOG.error("Could not close socket from failed write attempt", ee);
}

// try again
retries++;

try {
// initialize a new connection
createConnection();

// re-try the write
outputStream.write(msg);

// success!
return;
}
catch (IOException ee) {
lastException = ee;
LOG.error("Re-connect to socket server and send message failed. Retry time(s): " + retries, ee);
}

// wait before re-attempting to connect
lock.wait(CONNECTION_RETRY_DELAY);
}

// throw an exception if the task is still running, otherwise simply leave the method
if (isRunning) {
throw new IOException("Failed to send message '" + value + "' to socket server at "
+ hostName + ":" + port + ". Failed after " + retries + " retries.", lastException);
}
}
if (!success) {
throw new RuntimeException("Cannot send message " + value +
" to socket server at " + hostName + ":" + port, e);
}
}
}

/**
* Closes the connection of the Socket client.
* Closes the connection with the Socket server.
*/
private void closeConnection(){
try {
isRunning = false;

if (dataOutputStream != null) {
dataOutputStream.close();
}

if (client != null && !client.isClosed()) {
client.close();
}

if (lock != null) {
synchronized (lock) {
lock.notifyAll();
@Override
public void close() throws Exception {
// flag this as not running any more
isRunning = false;

// clean up in locked scope, so there is no concurrent change to the stream and client
synchronized (lock) {
// we notify first (this statement cannot fail). The notified thread will not continue
// anyways before it can re-acquire the lock
lock.notifyAll();

try {
if (outputStream != null) {
outputStream.close();
}
}
} catch (IOException e) {
throw new RuntimeException("Error while closing connection with socket server at "
+ hostName + ":" + port, e);
} finally {
if (client != null) {
try {
finally {
if (client != null) {
client.close();
} catch (IOException e) {
throw new RuntimeException("Cannot close connection with socket server at "
+ hostName + ":" + port, e);
}
}
}
}

/**
* Initialize the connection with the Socket in the server.
* @param parameters Configuration.
*/
@Override
public void open(Configuration parameters) {
intializeConnection();
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------

private void createConnection() throws IOException {
client = new Socket(hostName, port);
client.setKeepAlive(true);
client.setTcpNoDelay(true);

outputStream = client.getOutputStream();
}

/**
* Closes the connection with the Socket server.
*/
@Override
public void close() {
closeConnection();
// ------------------------------------------------------------------------
// For testing
// ------------------------------------------------------------------------

int getCurrentNumberOfRetries() {
return retries;
}

}

0 comments on commit b9663c4

Please sign in to comment.