Skip to content

Commit

Permalink
NIFI-8095: Addressed review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
markap14 committed Jan 4, 2021
1 parent d428f25 commit de26bd1
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 16 deletions.
Expand Up @@ -50,6 +50,7 @@
import java.util.regex.Pattern;

public class StatelessKafkaConnectorUtil {
private static final String UNKNOWN_VERSION = "<Unable to determine Stateless NiFi Kafka Connector Version>";
private static final Logger logger = LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
private static final Lock unpackNarLock = new ReentrantLock();

Expand All @@ -59,6 +60,7 @@ public class StatelessKafkaConnectorUtil {
static final String KRB5_FILE = "krb5.file";
static final String NEXUS_BASE_URL = "nexus.url";
static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
static final String DATAFLOW_NAME = "name";

static final String TRUSTSTORE_FILE = "security.truststore";
static final String TRUSTSTORE_TYPE = "security.truststoreType";
Expand All @@ -67,10 +69,16 @@ public class StatelessKafkaConnectorUtil {
static final String KEYSTORE_TYPE = "security.keystoreType";
static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
static final String KEY_PASSWORD = "security.keyPasswd";
static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";

static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";

static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";

private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
Expand All @@ -83,6 +91,7 @@ public static void addCommonConfigElements(final ConfigDef configDef) {
"Specifies the temporary working directory for expanding NiFi Archives (NARs)");
configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new ConnectFileExistsOrUrlValidator(), ConfigDef.Importance.HIGH,
"Specifies the file containing the dataflow to run");
configDef.define(DATAFLOW_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "The name of the dataflow.");

configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, ConfigDef.Importance.MEDIUM,
"Specifies the krb5.conf file to use if connecting to Kerberos-enabled services");
Expand All @@ -107,12 +116,14 @@ public static void addCommonConfigElements(final ConfigDef configDef) {
"The type of the Truststore file. Either JKS or PKCS12.");
configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
"The password for the truststore.");
configDef.define(SENSITIVE_PROPS_KEY, ConfigDef.Type.PASSWORD, DEFAULT_SENSITIVE_PROPS_KEY, ConfigDef.Importance.MEDIUM, "A key that components can use for encrypting and decrypting " +
"sensitive values.");
}

public static String getVersion() {
final File bootstrapJar = detectBootstrapJar();
if (bootstrapJar == null) {
return "<Unable to Stateless NiFi Kafka Connector Version>";
return UNKNOWN_VERSION;
}

try (final JarFile jarFile = new JarFile(bootstrapJar)) {
Expand All @@ -122,32 +133,32 @@ public static String getVersion() {
}
} catch (IOException e) {
logger.warn("Could not determine Version of NiFi Stateless Kafka Connector", e);
return "<Unable to Stateless NiFi Kafka Connector Version>";
return UNKNOWN_VERSION;
}

return "<Unable to Stateless NiFi Kafka Connector Version>";
return UNKNOWN_VERSION;
}

public static StatelessDataflow createDataflow(final Map<String, String> properties) {
final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(properties);
final String configuredFlowSnapshot = properties.get(FLOW_SNAPSHOT);

final List<ParameterOverride> parameterOverrides = parseParameterOverrides(properties);
final String dataflowName = properties.get("name");
final String dataflowName = properties.get(DATAFLOW_NAME);

final DataflowDefinition<?> dataflowDefinition;
final StatelessBootstrap bootstrap;
try {
final Map<String, String> dataflowDefinitionProperties = new HashMap<>();

if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.url", configuredFlowSnapshot);
dataflowDefinitionProperties.put(BOOTSTRAP_SNAPSHOT_URL, configuredFlowSnapshot);
} else {
final File flowSnapshotFile = new File(configuredFlowSnapshot);
dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.file", flowSnapshotFile.getAbsolutePath());
dataflowDefinitionProperties.put(BOOTSTRAP_SNAPSHOT_FILE, flowSnapshotFile.getAbsolutePath());
}

dataflowDefinitionProperties.put("nifi.stateless.flow.name", dataflowName);
dataflowDefinitionProperties.put(BOOTSTRAP_FLOW_NAME, dataflowName);

MDC.setContextMap(Collections.singletonMap("dataflow", dataflowName));

Expand Down Expand Up @@ -197,6 +208,12 @@ private static List<ParameterOverride> parseParameterOverrides(final Map<String,
return parameterOverrides;
}

/**
 * Returns a copy of the given connector configuration that is safe to write to the logs.
 * Any entry whose key begins with the {@code parameter.} prefix is a dataflow parameter
 * override and may carry a sensitive value, so such entries are omitted from the copy.
 * The supplied map is never modified.
 *
 * @param properties the full connector configuration
 * @return a new map containing every entry except the {@code parameter.}-prefixed ones
 */
public static Map<String, String> getLoggableProperties(final Map<String, String> properties) {
    final Map<String, String> safeToLog = new HashMap<>();
    for (final Map.Entry<String, String> entry : properties.entrySet()) {
        // Copy everything except parameter overrides, which may hold secrets.
        if (!entry.getKey().startsWith("parameter.")) {
            safeToLog.put(entry.getKey(), entry.getValue());
        }
    }
    return safeToLog;
}

private static StatelessEngineConfiguration createEngineConfiguration(final Map<String, String> properties) {
final File narDirectory;
final String narDirectoryFilename = properties.get(NAR_DIRECTORY);
Expand Down Expand Up @@ -239,7 +256,7 @@ public SslContextDefinition getSslContext() {

@Override
public String getSensitivePropsKey() {
return "nifi-stateless";
return properties.getOrDefault(SENSITIVE_PROPS_KEY, DEFAULT_SENSITIVE_PROPS_KEY);
}

@Override
Expand Down
Expand Up @@ -68,12 +68,12 @@ public String version() {

@Override
public void start(final Map<String, String> properties) {
logger.info("Starting Sink Task with properties {}", properties);
logger.info("Starting Sink Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties));

final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);

dataflowName = properties.get("name");
dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);

final String regex = properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX);
headerNameRegex = regex == null ? null : Pattern.compile(regex);
Expand All @@ -85,7 +85,7 @@ public void start(final Map<String, String> properties) {
dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);

// Determine input port name. If input port is explicitly set, use the value given. Otherwise, if only one port exists, use that. Otherwise, throw ConfigException.
final String dataflowName = properties.get("name");
final String dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
inputPortName = properties.get(StatelessNiFiSinkConnector.INPUT_PORT_NAME);
if (inputPortName == null) {
final Set<String> inputPorts = dataflow.getInputPortNames();
Expand Down Expand Up @@ -255,7 +255,7 @@ private byte[] getContents(final Object value) {
}

private Map<String, String> createAttributes(final SinkRecord record) {
final Map<String, String> attributes = new HashMap<>(8);
final Map<String, String> attributes = new HashMap<>();
attributes.put("kafka.topic", record.topic());
attributes.put("kafka.offset", String.valueOf(record.kafkaOffset()));
attributes.put("kafka.partition", String.valueOf(record.kafkaPartition()));
Expand Down
Expand Up @@ -64,7 +64,7 @@ public String version() {

@Override
public void start(final Map<String, String> properties) {
logger.info("Starting Source Task with properties {}", properties);
logger.info("Starting Source Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties));

final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);
Expand All @@ -83,7 +83,7 @@ public void start(final Map<String, String> properties) {
dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);

// Determine the name of the Output Port to retrieve data from
dataflowName = properties.get("name");
dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
outputPortName = properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
if (outputPortName == null) {
final Set<String> outputPorts = dataflow.getOutputPortNames();
Expand Down
Expand Up @@ -56,6 +56,8 @@

public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionParser {
private static final Logger logger = LoggerFactory.getLogger(PropertiesFileFlowDefinitionParser.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final Pattern PROPERTY_LINE_PATTERN = Pattern.compile("(.*?)(?<!\\\\)=(.*)");
// parameter context pattern starts with "nifi.stateless.parameters." followed by the name of a parameter context.
// After the name of the parameter context, it may or may not have a ".<parameter name>" component, then an equals (=) and a value.
Expand Down Expand Up @@ -363,8 +365,7 @@ private VersionedFlowSnapshot fetchFlowFromUrl(final String url, final SslContex
}

try {
final ObjectMapper objectMapper = new ObjectMapper();
final VersionedFlowSnapshot snapshot = objectMapper.readValue(responseBody.bytes(), VersionedFlowSnapshot.class);
final VersionedFlowSnapshot snapshot = OBJECT_MAPPER.readValue(responseBody.bytes(), VersionedFlowSnapshot.class);
return snapshot;
} catch (final Exception e) {
throw new IOException("Downloaded flow from " + url + " but failed to parse the contents as a Versioned Flow. Please verify that the correct URL was provided.", e);
Expand Down

0 comments on commit de26bd1

Please sign in to comment.