NIFI-8095: Created StatelessNiFi Sink Connector and Source Connector.… #4730
Conversation
    return "<Unable to Stateless NiFi Kafka Connector Version>";
}
The method could be refactored to declare this string as the default return value, updated when the Manifest Implementation-Version
is available, and then have a single return.
Fair enough. I actually think pulling it out into a variable makes sense, rather than trying to simplify to a single return. But it's good to avoid the repetition. Thanks.
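A minimal sketch of the single-variable shape discussed above. The class name, constant name, and placeholder string are assumptions for illustration, not the PR's actual code:

```java
public class VersionExample {
    private static final String UNKNOWN_VERSION = "<Unable to Determine Stateless NiFi Kafka Connector Version>";

    public static String getVersion() {
        // Default to the placeholder; overwrite it only when the
        // Implementation-Version manifest entry is available.
        String version = UNKNOWN_VERSION;
        final Package pkg = VersionExample.class.getPackage();
        if (pkg != null && pkg.getImplementationVersion() != null) {
            version = pkg.getImplementationVersion();
        }
        return version; // single exit point, no repeated string literal
    }
}
```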
final Map<String, String> dataflowDefinitionProperties = new HashMap<>();

if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
    dataflowDefinitionProperties.put("nifi.stateless.flow.snapshot.url", configuredFlowSnapshot);
Should this property key be declared as a static variable?
@Override
public String getSensitivePropsKey() {
    return "nifi-stateless";
Should it be possible to override this value using configuration properties?
Yeah, I forgot to come back to that. Thanks.
if (bootstrapJar == null) {
    final URLClassLoader urlClassLoader = getConnectClassLoader();
    logger.error("ClassLoader that loaded Stateless Kafka Connector did not contain nifi-stateless-bootstrap. URLs that were present: {}", Arrays.asList(urlClassLoader.getURLs()));
    throw new IllegalStateException("No configuration value was set for the " + NAR_DIRECTORY + " configuration property, and was unable to determine the NAR directory automatically");
This message appears to be the same as the one used in getConnectClassLoader. The message could be declared once, or this message could be adjusted.
@Override
public void start(final Map<String, String> properties) {
    logger.info("Starting Sink Task with properties {}", properties);
Logging all properties will include store passwords and other potentially sensitive parameters. Perhaps logging a subset of the standard parameters would be a safer approach to avoid writing sensitive information to logs. Otherwise, some type of filtering should be implemented prior to passing properties for logging.
Only thing that should be sensitive is parameters. Will update it to avoid logging any parameters.
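One way to strip parameter entries before logging could look like the sketch below. The `parameter.` prefix is an assumption about how dataflow parameters are named in the connector configuration; the real connector may use a different convention:

```java
import java.util.HashMap;
import java.util.Map;

public class PropertyLogFilter {
    // Hypothetical prefix under which dataflow parameters (the potentially
    // sensitive values) appear in the connector properties.
    private static final String PARAMETER_PREFIX = "parameter.";

    // Returns a copy of the properties with all parameter entries removed,
    // safe to hand to the logger.
    public static Map<String, String> withoutParameters(final Map<String, String> properties) {
        final Map<String, String> safe = new HashMap<>();
        for (final Map.Entry<String, String> entry : properties.entrySet()) {
            if (!entry.getKey().startsWith(PARAMETER_PREFIX)) {
                safe.put(entry.getKey(), entry.getValue());
            }
        }
        return safe;
    }
}
```

The task would then log `withoutParameters(properties)` instead of the raw map.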
final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);

dataflowName = properties.get("name");
The name property is used in several places and could be declared as a static variable.
}

private Map<String, String> createAttributes(final SinkRecord record) {
    final Map<String, String> attributes = new HashMap<>(8);
Is there a particular reason for declaring an initial size for the HashMap?
@Override
public void start(final Map<String, String> properties) {
    logger.info("Starting Source Task with properties {}", properties);
See comment on logging properties for the Sink Task.
}

try {
    final ObjectMapper objectMapper = new ObjectMapper();
ObjectMapper is thread-safe and could be declared as an instance variable.
Thanks @exceptionfactory for reviewing. Pushed a new commit that I believe addresses all feedback above.
}

private SourceRecord createSourceRecord(final FlowFile flowFile, final byte[] contents) {
How exactly do we track the progress of copying data? AFAIK, in the Connect framework the usually-abstracted-away concepts of topic-partition and offset are used for that. If the connector is rebalanced, the abstract topic-partition and offset are used to determine where the copy should continue from. Here, partition and offset seem to be abandoned, or am I missing something?
That's a very good point. I wanted to revisit this but had forgotten about it. Will look into how to integrate this well with the NiFi StateProvider mechanism so that we can store this appropriately.
Found some thread-handling related issues. Also interested in the point raised by @heritamas.
logger.debug("Due to previous failure, will wait {} millis before executing dataflow", backoffMillis);

try {
    Thread.sleep(backoffMillis);
In general, backoff should be done by throwing a RetriableException - this method isn't supposed to block.
org.apache.kafka.connect.sink.SinkTaskContext#timeout can be used to define the delay for the retry.
Thanks. Will address.
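The suggested non-blocking shape could look roughly like this. `SinkTaskContext` and `RetriableException` are stubbed locally so the sketch stands alone; in the real connector they come from `org.apache.kafka.connect.sink` and `org.apache.kafka.connect.errors`, and the backoff value here is an arbitrary placeholder:

```java
// Minimal stand-ins for the Kafka Connect types (the real ones live in
// org.apache.kafka.connect.sink and org.apache.kafka.connect.errors).
interface SinkTaskContext {
    void timeout(long timeoutMs);
}

class RetriableException extends RuntimeException {
    RetriableException(final String message) {
        super(message);
    }
}

public class BackoffSketch {
    private final SinkTaskContext context;
    private boolean previousAttemptFailed;
    private long backoffMillis = 1000L; // placeholder backoff

    public BackoffSketch(final SinkTaskContext context) {
        this.context = context;
    }

    public void markFailure() {
        previousAttemptFailed = true;
    }

    // Instead of Thread.sleep(backoffMillis), ask the framework to redeliver
    // the batch after the delay and bail out with a retriable error.
    public void put() {
        if (previousAttemptFailed) {
            previousAttemptFailed = false;
            context.timeout(backoffMillis);
            throw new RetriableException("Backing off " + backoffMillis + " ms after dataflow failure");
        }
        // ... enqueue records as usual ...
    }
}
```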
"header whose key matches the regular expression will be added to the FlowFile as an attribute. The name of the attribute will match the header key (with an optional prefix, as " +
    "defined by the attribute.prefix configuration) and the header value will be added as the attribute value.");
configDef.define(HEADER_ATTRIBUTE_NAME_PREFIX, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
    "A prefix to add to the key of each header that matches the headers.as.attributes.regex Regular Expression. For example, if a header has the ke MyHeader and a value of " +
Typo: "if a header has the key"
final Map<String, String> attributes = createAttributes(record);
final byte[] contents = getContents(record.value());

queueSize = dataflow.enqueue(contents, attributes, inputPortName);
If the method threw a RetriableException previously, the framework will repeat the previous call with the same records - those records are enqueued here repeatedly. Is this correct from the dataflow's point of view? What happens with the previously enqueued, but not yet triggered records?
Good catch. We should be calling dataflow.purge() if we're going to re-queue the data.
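A sketch of the purge-then-requeue idea. The dataflow surface is stubbed out here; the `enqueue`/`purge` method names follow the snippets in this thread, but the interface itself and the retry-tracking flag are assumptions:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for the stateless dataflow API referenced in the PR.
interface StatelessDataflow {
    void enqueue(byte[] contents, Map<String, String> attributes, String portName);
    void purge();
}

public class PurgeOnRetrySketch {
    private final StatelessDataflow dataflow;
    private boolean retriedBatch;

    public PurgeOnRetrySketch(final StatelessDataflow dataflow) {
        this.dataflow = dataflow;
    }

    public void put(final Collection<byte[]> records, final Map<String, String> attributes, final String inputPortName) {
        // If the framework is redelivering a batch after a RetriableException,
        // drop whatever the failed attempt enqueued first, so records are not
        // duplicated inside the dataflow.
        if (retriedBatch) {
            dataflow.purge();
            retriedBatch = false;
        }
        for (final byte[] record : records) {
            dataflow.enqueue(record, attributes, inputPortName);
        }
    }

    public void markRetry() {
        retriedBatch = true;
    }
}
```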
verifyOutputPortContents(trigger, result);

// Acknowledge the data so that the session can be committed
result.acknowledge();
After this point, org.apache.kafka.connect.sink.SinkTaskContext#requestCommit should be invoked to try to minimize reprocessing.
final DataflowTrigger trigger = dataflow.trigger();

try {
    final Optional<TriggerResult> resultOptional = trigger.getResult(timeoutMillis, TimeUnit.MILLISECONDS);
This will block put, but put is expected to send asynchronously. I think the trigger should be reworked to run on a background thread so it does not block the thread calling put.
Not completely sure about this, but I also think that the retry of the trigger should be separate from the retry of put (e.g. put should be retried if the buffer in the dataflow is full, trigger should be retried if the external system had issues).
Makes sense. Will address.
    topic = attributeValue == null ? topicName : attributeValue;
}

final List<Header> headers;
Instead of a list with a custom Header implementation, org.apache.kafka.connect.header.ConnectHeaders can be used. That is capable of instantiating the built-in ConnectHeader class, and also implements Iterable.
I looked for this but somehow I couldn't find the ConnectHeader class in my IDE. I was able to find it now, since I know what to look for. Will use this. Thanks!
}

@Override
public void commitRecord(final SourceRecord record, final RecordMetadata metadata) throws InterruptedException {
This method is usually called by a producer network thread when the record batch was ACK'd by the broker.
It is possible that poll is called multiple times before the commitRecord is invoked with the records returned by the first call of poll. Because of this, the result cached in triggerResult might be overwritten, and never get acknowledged.
The task might need to track multiple TriggerResults to be able to meaningfully acknowledge them.
The Stateless framework is going to prevent a second invocation until the previous one completes. But I can easily determine if the previous invocation is still working and, if so, return null from poll().
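The guard described in that reply could be sketched as follows: poll() returns null while FlowFiles from the previous trigger are still unacknowledged. The field and method names are assumptions, and the record type is simplified to String to keep the sketch self-contained:

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class PollGuardSketch {
    private final AtomicLong unacknowledgedRecords = new AtomicLong();

    public List<String> poll() {
        // Previous batch still in flight: tell the framework there is
        // nothing to do right now rather than triggering the dataflow again.
        if (unacknowledgedRecords.get() > 0) {
            return null;
        }
        // ... trigger the dataflow and build SourceRecords ...
        final List<String> records = Collections.singletonList("record");
        unacknowledgedRecords.addAndGet(records.size());
        return records;
    }

    // Called by the framework (producer network thread) as each record is ACK'd.
    public void commitRecord() {
        unacknowledgedRecords.decrementAndGet();
    }
}
```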
public List<Map<String, String>> taskConfigs(final int maxTasks) {
    final List<Map<String, String>> configs = new ArrayList<>();
    for (int i = 0; i < maxTasks; i++) {
        configs.add(new HashMap<>(properties));
At this phase, the connector has to have some information about the input data, and partition that between the created tasks. Not sure how hard it is, or if the stateless nifi flows can coordinate under the hood.
In "NiFi proper" we have load-balanced connections that would allow the nodes to distribute the data between themselves to load-balance. We don't have that currently in Stateless NiFi. Not sure it would make sense to provide that in Stateless, given how it all works. I think for the moment, when interacting with a source that does not itself provide queuing/scaling semantics, we would just have to rely on the deployer making sure that the 'unit of work' that they deploy can be accomplished by a single task. There may be improvements that can be made to this in the future though.
public class StatelessNiFiSourceConnector extends SourceConnector {
    static final String OUTPUT_PORT_NAME = "output.port";
    static final String TOPIC_NAME = "topic.name";
To be consistent with SinkConnector.TOPICS_CONFIG, and to allow the SMM UI to pick up the topic names, could this be changed to "topics" instead?
Pushed an update that I think should address all concerns above. Many thanks for the review & guidance @urbandan @heritamas and @in-park!
@urbandan @heritamas let me know if there's anything else that you noticed that I missed. Otherwise, I think we can probably get this merged in shortly.
@markap14 Sorry I haven't managed to finish reviewing yet, but I think there are a few issues we should discuss - will try to complete the review ASAP
… Minor updates to stateless nifi api to accommodate.
…ate management, improved tests, general code cleanup
logger.debug("Triggering dataflow in background thread");

backgroundTriggerExecutor.submit(this::triggerDataflow);
As we discussed offline, it would be preferable to do the trigger in flush, and only enqueue in put.
Will do.
primaryNodeTask = "0".equals(taskIndex);

if (primaryNodeOnly && !primaryNodeTask) {
    logger.warn("Configured Dataflow ({}) requires that the source be run only on the Primary Node, but the Connector is configured with more than one task. The dataflow will only be run by" +
If the task should only run in a single instance, the connector should only create a single task configuration.
Ah, it didn't occur to me that that's an option - to create fewer task configs than maxTasks. Will do. Great catch!
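That suggestion could be sketched like this: when the flow is primary-node-only, the connector creates exactly one task config regardless of maxTasks. The field names and the per-task key are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {
    private final Map<String, String> properties;
    private final boolean primaryNodeOnly;

    public TaskConfigSketch(final Map<String, String> properties, final boolean primaryNodeOnly) {
        this.properties = properties;
        this.primaryNodeOnly = primaryNodeOnly;
    }

    public List<Map<String, String>> taskConfigs(final int maxTasks) {
        // A primary-node-only flow gets exactly one task, no matter how
        // many tasks the framework allows.
        final int taskCount = primaryNodeOnly ? 1 : maxTasks;
        final List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final Map<String, String> config = new HashMap<>(properties);
            config.put("task.index", String.valueOf(i)); // hypothetical per-task key
            configs.add(config);
        }
        return configs;
    }
}
```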
final long unacked = unacknowledgedRecords.decrementAndGet();
logger.debug("SourceRecord {} committed; number of unacknowledged FlowFiles is now {}", record, unacked);

if (unacked < 1) {
Since this is usually invoked from another thread, there can be an edge case where:
- the unacknowledgedRecords is already decremented, making it 0, but the result is still not acknowledged
- poll is invoked at this point on the task thread, and we trigger the dataflow while the previous result is still not acknowledged.
I think this scenario is okay. When that happens, the call to dataflow.trigger() will happen. This will return a DataflowTriggerResult immediately, but the call to DataflowTriggerResult.getResult() will block until the dataflow completes in the background. But the dataflow won't actually run in the background until the previous one is acknowledged. So the second triggering of the dataflow doesn't actually happen until the previous one has been acknowledged.
Unless I'm overlooking something on the connect side that makes this problematic?
Thanks for the explanation, makes sense
for (final FlowFile flowFile : outputFlowFiles) {
    final byte[] contents = triggerResult.readContent(flowFile);
    final SourceRecord sourceRecord = createSourceRecord(flowFile, contents, componentState, partitionMap);
As discussed offline, it's preferable to only set the latest offset (component state) for the last record of the batch.
Yes, will do.
@markap14, I misunderstood the offset commit flow of the source tasks. No need for this change, all of the records can have the same component state in their header. The framework will wait for all messages in the batch to be ack'd by kafka before committing the offsets.
}

@Override
public void reconfigure(final Map<String, String> properties) {
I wouldn't override reconfigure - the default implementation calls stop and start on the connector.
If you do override it, you need to apply the new configurations - so everything done in start needs to be repeated here.
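For context, Connector.reconfigure's default behavior in Kafka Connect is roughly stop-then-start, which is why overriding it without repeating the start logic silently drops the new configuration. A stubbed sketch of that shape (not the actual Connect source):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public abstract class ConnectorSketch {
    // Roughly what the default reconfigure does: tear down, then start
    // again with the new properties, so start() always sees the new config.
    public void reconfigure(final Map<String, String> properties) {
        stop();
        start(properties);
    }

    public abstract void start(Map<String, String> properties);

    public abstract void stop();
}
```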
    outputPortName = outputPorts.iterator().next();
}

final String taskIndex = properties.get(STATE_MAP_KEY);
So the source topic partition is identified by the task index, is that correct?
My question is, what happens if we start a connector with 3 tasks, it starts working and committing offsets with task ids 0, 1, 2. Then, we reconfigure the connector, and reduce the number of tasks to 2. Now we have task ids 0, 1. What happens with the committed offsets of task no. 2? Can the remaining 2 tasks pick up the work which was originally done by task no. 2?
Yes. So in NiFi, processors can store either 'local' state or 'cluster-wide' state. Typically local state would be used if tailing a file on the local file system or something like that. In that case, it would map to the task id. If you went from 3 tasks to 2, that's fine. The state/progress would be lost so there may be some data duplication. But tailing a file in a local file system wouldn't really be a great use case for connect since the task could be potentially started elsewhere, etc. For the most part, local state won't be used, though.
Cluster-wide state would be used if gathering data from S3 or a GCS bucket, for example. In that case, you'd likely only use a single task due to the fact that the protocol itself doesn't offer queuing semantics so it's difficult to scale. But for cases where cluster-wide state is used by the Processor, the state is instead stored here using the clusterStatePartitionMap.
Thanks, I see - then I don't really see the added value of the local state, and it seems like a good way of shooting yourself in the foot when configuring the connector.
…stored state to Kafka Connect's SourceRecord we don't persist state before committing all records in that batch. Ensure that Sink Connector is only triggered from the flush() method. If more than one task is allowed for a source connector but the source Processor is Primary Node Only that we only schedule a single task.
LGTM from Connect's perspective, really cool PR
Thanks for sticking with me as I learn the connect API @urbandan and for all of the fantastic feedback!
Awesome work and discussion and review here. This is going to be awesome. I'm a +1 based on watching this engagement and reviewing the changes. Mark, please self-merge as I cannot get to that part at this time.
… Minor updates to stateless nifi api to accommodate.
Thank you for submitting a contribution to Apache NiFi.
Please provide a short description of the PR here:

Description of PR

Enables X functionality; fixes bug NIFI-YYYY.

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

For all changes:
- Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
- Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
- Has your PR been rebased against the latest commit within the target branch (typically main)?
- Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:
- Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
- If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
- If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
- If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

Note:
Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.