Skip to content

Commit

Permalink
NIFI-4152 Initial commit of ListenTCPRecord
Browse files Browse the repository at this point in the history
  • Loading branch information
bbende committed Jul 6, 2017
1 parent e6b166a commit b021ca8
Show file tree
Hide file tree
Showing 11 changed files with 1,186 additions and 0 deletions.
Expand Up @@ -68,4 +68,5 @@ public void close() {
IOUtils.closeQuietly(sslChannel);
sslChannel = null;
}

}
Expand Up @@ -95,4 +95,5 @@ public void close() {
socketChannelOutput = null;
channel = null;
}

}
Expand Up @@ -32,11 +32,19 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-security-utils</artifactId>
</dependency>
<!-- Other modules using nifi-standard-record-utils are expected to have these APIs available, typically through a NAR dependency -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
Expand Down
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.listen;

import java.io.Closeable;
import java.io.IOException;

public class IOUtils {

public static void closeQuietly(final Closeable closeable) {
if (closeable == null) {
return;
}
try {

closeable.close();
} catch (final IOException ioe) {

}
}

}
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.listen;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.channels.SocketChannel;

/**
* Encapsulates an SSLSocketChannel and a RecordReader created for the given channel.
*/
public class SSLSocketChannelRecordReader implements SocketChannelRecordReader {

private final SocketChannel socketChannel;
private final SSLSocketChannel sslSocketChannel;
private final RecordReaderFactory readerFactory;
private final SocketChannelRecordReaderDispatcher dispatcher;

private RecordReader recordReader;

public SSLSocketChannelRecordReader(final SocketChannel socketChannel,
final SSLSocketChannel sslSocketChannel,
final RecordReaderFactory readerFactory,
final SocketChannelRecordReaderDispatcher dispatcher) {
this.socketChannel = socketChannel;
this.sslSocketChannel = sslSocketChannel;
this.readerFactory = readerFactory;
this.dispatcher = dispatcher;
}

@Override
public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
if (recordReader != null) {
throw new IllegalStateException("Cannot create RecordReader because already created");
}

final InputStream in = new SSLSocketChannelInputStream(sslSocketChannel);
recordReader = readerFactory.createRecordReader(flowFile, in, logger);
return recordReader;
}

@Override
public RecordReader getRecordReader() {
return recordReader;
}

@Override
public InetAddress getRemoteAddress() {
return socketChannel.socket().getInetAddress();
}

@Override
public boolean isClosed() {
return sslSocketChannel.isClosed();
}

@Override
public void close() {
IOUtils.closeQuietly(recordReader);
IOUtils.closeQuietly(sslSocketChannel);
dispatcher.connectionCompleted();
}

}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.listen;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;

/**
* Encapsulates a SocketChannel and a RecordReader for the channel.
*/
public interface SocketChannelRecordReader extends Closeable {

/**
* Currently a RecordReader can only be created with a FlowFile. Since we won't have a FlowFile at the time
* a connection is accepted, this method will be used to lazily create the RecordReader later. Eventually this
* method should be removed and the reader should be passed in through the constructor.
*
*
* @param flowFile the flow file we are creating the reader for
* @param logger the logger of the component creating the reader
* @return a RecordReader
*
* @throws IllegalStateException if create is called after a reader has already been created
*/
RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException;

/**
* @return the RecordReader created by calling createRecordReader, or null if one has not been created yet
*/
RecordReader getRecordReader();

/**
* @return the remote address of the underlying channel
*/
InetAddress getRemoteAddress();

/**
* @return true if the underlying channel is closed, false otherwise
*/
boolean isClosed();

}
@@ -0,0 +1,147 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.listen;

import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.security.util.SslContextFactory;
import org.apache.nifi.serialization.RecordReaderFactory;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;
import java.io.Closeable;
import java.net.SocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Accepts connections on the given ServerSocketChannel and dispatches a SocketChannelRecordReader for processing.
*/
public class SocketChannelRecordReaderDispatcher implements Runnable, Closeable {

private final ServerSocketChannel serverSocketChannel;
private final SSLContext sslContext;
private final SslContextFactory.ClientAuth clientAuth;
private final int socketReadTimeout;
private final int receiveBufferSize;
private final int maxConnections;
private final RecordReaderFactory readerFactory;
private final BlockingQueue<SocketChannelRecordReader> recordReaders;
private final ComponentLog logger;

private final AtomicInteger currentConnections = new AtomicInteger(0);

private volatile boolean stopped = false;

public SocketChannelRecordReaderDispatcher(final ServerSocketChannel serverSocketChannel,
final SSLContext sslContext,
final SslContextFactory.ClientAuth clientAuth,
final int socketReadTimeout,
final int receiveBufferSize,
final int maxConnections,
final RecordReaderFactory readerFactory,
final BlockingQueue<SocketChannelRecordReader> recordReaders,
final ComponentLog logger) {
this.serverSocketChannel = serverSocketChannel;
this.sslContext = sslContext;
this.clientAuth = clientAuth;
this.socketReadTimeout = socketReadTimeout;
this.receiveBufferSize = receiveBufferSize;
this.maxConnections = maxConnections;
this.readerFactory = readerFactory;
this.recordReaders = recordReaders;
this.logger = logger;
}

@Override
public void run() {
while(!stopped) {
try {
final SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel == null) {
Thread.sleep(20);
continue;
}

final SocketAddress remoteSocketAddress = socketChannel.getRemoteAddress();
socketChannel.socket().setSoTimeout(socketReadTimeout);
socketChannel.socket().setReceiveBufferSize(receiveBufferSize);

if (currentConnections.incrementAndGet() > maxConnections){
currentConnections.decrementAndGet();
final String remoteAddress = remoteSocketAddress == null ? "null" : remoteSocketAddress.toString();
logger.warn("Rejecting connection from {} because max connections has been met", new Object[]{remoteAddress});
IOUtils.closeQuietly(socketChannel);
continue;
}

if (logger.isDebugEnabled()) {
final String remoteAddress = remoteSocketAddress == null ? "null" : remoteSocketAddress.toString();
logger.debug("Accepted connection from {}", new Object[]{remoteAddress});
}

// create a StandardSocketChannelRecordReader or an SSLSocketChannelRecordReader based on presence of SSLContext
final SocketChannelRecordReader socketChannelRecordReader;
if (sslContext == null) {
socketChannelRecordReader = new StandardSocketChannelRecordReader(socketChannel, readerFactory, this);
} else {
final SSLEngine sslEngine = sslContext.createSSLEngine();
sslEngine.setUseClientMode(false);

switch (clientAuth) {
case REQUIRED:
sslEngine.setNeedClientAuth(true);
break;
case WANT:
sslEngine.setWantClientAuth(true);
break;
case NONE:
sslEngine.setNeedClientAuth(false);
sslEngine.setWantClientAuth(false);
break;
}

final SSLSocketChannel sslSocketChannel = new SSLSocketChannel(sslEngine, socketChannel);
socketChannelRecordReader = new SSLSocketChannelRecordReader(socketChannel, sslSocketChannel, readerFactory, this);
}

// queue the SocketChannelRecordReader for processing by the processor
recordReaders.offer(socketChannelRecordReader);

} catch (Exception e) {
logger.error("Error dispatching connection: " + e.getMessage(), e);
}
}
}

public int getPort() {
return serverSocketChannel == null ? 0 : serverSocketChannel.socket().getLocalPort();
}

@Override
public void close() {
this.stopped = true;
IOUtils.closeQuietly(this.serverSocketChannel);
}

public void connectionCompleted() {
currentConnections.decrementAndGet();
}

}

0 comments on commit b021ca8

Please sign in to comment.