Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ public class NiFiProperties extends ApplicationProperties {
public static final String FLOWFILE_SWAP_MANAGER_IMPLEMENTATION = "nifi.swap.manager.implementation";
public static final String QUEUE_SWAP_THRESHOLD = "nifi.queue.swap.threshold";

// bulk replay properties
// Caps the number of bulk replay jobs that may run concurrently (one worker thread per job).
public static final String BULK_REPLAY_MAX_CONCURRENT = "nifi.bulk.replay.max.concurrent";
public static final int DEFAULT_BULK_REPLAY_MAX_CONCURRENT = 5;
// Maximum number of job summaries retained in memory; oldest terminal jobs are evicted beyond this.
public static final String BULK_REPLAY_MAX_JOBS = "nifi.bulk.replay.max.jobs";
public static final int DEFAULT_BULK_REPLAY_MAX_JOBS = 50;
// How long a worker waits for a disconnected cluster node to rejoin before failing its items.
// Value is a NiFi duration string (e.g., "5 mins", "30 secs").
public static final String BULK_REPLAY_NODE_DISCONNECT_TIMEOUT = "nifi.bulk.replay.node.disconnect.timeout";
public static final String DEFAULT_BULK_REPLAY_NODE_DISCONNECT_TIMEOUT = "5 mins";

// provenance properties
public static final String PROVENANCE_REPO_IMPLEMENTATION_CLASS = "nifi.provenance.repository.implementation";
public static final String PROVENANCE_REPO_DIRECTORY_PREFIX = "nifi.provenance.repository.directory.";
Expand Down Expand Up @@ -503,6 +511,19 @@ public int getQueueSwapThreshold() {
}
}

/**
 * Returns the maximum number of bulk replay jobs that may execute concurrently, as configured
 * by {@code nifi.bulk.replay.max.concurrent}, or the default of 5 when the property is unset or blank.
 *
 * @return the maximum number of concurrent bulk replay jobs
 */
public int getBulkReplayMaxConcurrent() {
return getIntegerProperty(BULK_REPLAY_MAX_CONCURRENT, DEFAULT_BULK_REPLAY_MAX_CONCURRENT);
}

/**
 * Returns the maximum number of bulk replay job summaries retained in memory, as configured
 * by {@code nifi.bulk.replay.max.jobs}, or the default of 50 when the property is unset or blank.
 *
 * @return the maximum number of retained bulk replay jobs
 */
public int getBulkReplayMaxJobs() {
return getIntegerProperty(BULK_REPLAY_MAX_JOBS, DEFAULT_BULK_REPLAY_MAX_JOBS);
}

/**
 * Returns the configured node-disconnect timeout for bulk replay, as a NiFi duration string
 * (e.g., "5 mins"). Falls back to the default when the property is unset or blank; a
 * configured value is returned with surrounding whitespace trimmed.
 *
 * @return the node-disconnect timeout duration string
 */
public String getBulkReplayNodeDisconnectTimeout() {
    final String configured = getProperty(BULK_REPLAY_NODE_DISCONNECT_TIMEOUT);
    if (configured == null || configured.isBlank()) {
        return DEFAULT_BULK_REPLAY_NODE_DISCONNECT_TIMEOUT;
    }
    return configured.trim();
}

public Integer getIntegerProperty(final String propertyName, final Integer defaultValue) {
final String value = getProperty(propertyName);
if (value == null || value.isBlank()) {
Expand Down
19 changes: 19 additions & 0 deletions nifi-docs/src/main/asciidoc/administration-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -3149,6 +3149,25 @@ may increase the rate at which the Provenance Repository is able to process thes
|`nifi.provenance.repository.buffer.size`|The Provenance Repository buffer size. The default value is `100000` provenance events.
|====

[[bulk_replay_properties]]
=== Bulk Replay Properties

The following properties control the server-side behavior of the Bulk Replay feature (see link:user-guide.html#bulk_replay[Bulk Replay] in the User Guide).
Bulk replay jobs are tracked in memory and are lost when the entire cluster is restarted. In a cluster, jobs execute on the primary node; if the primary node is lost, interrupted jobs are automatically resumed by the new primary.

|====
|*Property*|*Description*
|`nifi.bulk.replay.max.concurrent`|The number of worker threads available for executing bulk replay jobs.
Each thread handles one job at a time, so this value caps the number of jobs that can run concurrently.
Raising this value allows more jobs to execute in parallel but increases load on the node.
The default value is `5`.
|`nifi.bulk.replay.max.jobs`|The maximum number of job summaries retained in memory.
When this limit is exceeded, the oldest terminal (completed, partial success, failed, or cancelled) job is automatically evicted. Active jobs (queued, running, or interrupted) are never evicted.
The default value is `50`.
|`nifi.bulk.replay.node.disconnect.timeout`|When a replay item's content resides on a cluster node that is currently disconnected, the worker waits up to this duration for the node to reconnect before failing the item.
The value is a NiFi duration string (e.g., `5 mins`, `30 secs`).
The default value is `5 mins`.
|====

=== Status History Repository

Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
63 changes: 63 additions & 0 deletions nifi-docs/src/main/asciidoc/user-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ NOTE: For Processors, Ports, Remote Process Groups, Connections and Labels, it i
- *Enable* or *Disable*: This option allows the user to enable or disable a Processor; the option will be either Enable or Disable, depending on the current state of the Processor.
- *View data provenance*: This option displays the NiFi Data Provenance table, with information about data provenance events for the FlowFiles routed through that Processor (see <<data_provenance>>).
- *Replay last event*: This option will replay the last Provenance event, effectively requeuing the last FlowFile that was processed by the Processor (see <<replay_flowfile>>).
- *Bulk Replay*: This option opens a provenance search dialog scoped to the selected Processor, allowing the user to select multiple events and replay them all at once (see <<bulk_replay>>).
- *View status history*: This option opens a graphical representation of the Processor's statistical information over time.
- *View usage*: This option takes the user to the Processor's usage documentation.
- *View connections->Upstream*: This option allows the user to see and "jump to" upstream connections that are coming into the Processor. This is particularly useful when processors connect into and out of other Process Groups.
Expand Down Expand Up @@ -2963,6 +2964,68 @@ item. From here, the user can choose to either replay the last event from just t

image:event-content.png["Event Content", width=700]

[[bulk_replay]]
=== Bulk Replay

Bulk Replay allows a user to replay multiple provenance events for a Processor in a single operation. This is useful when a configuration error or downstream failure has caused a batch of FlowFiles to be processed incorrectly, and those FlowFiles need to be requeued for reprocessing.

==== Starting a Bulk Replay

To start a Bulk Replay, right-click on a Processor and select *Bulk Replay* from the context menu.

image:bulkReplayProcessorMenu.png["Bulk Replay context menu option"]

This opens a provenance search dialog scoped to the selected Processor. Use the date range, FlowFile attribute, and other search filters to locate the events to be replayed, then select the desired events and confirm. You can also provide a job name to help identify the replay later; if left blank, the name defaults to the processor name and submission timestamp.

image:bulkReplaySelectWindow.png["Bulk Replay event selection dialog", width=900]

Once submitted, the replay job is created on the server and appears in the *Bulk Replay Status* page, accessible from the Global Menu.

==== Bulk Replay Status Page

The Bulk Replay Status page shows a table of all replay jobs known to the NiFi instance.

image:bulkReplayStatusWindow.png["Bulk Replay Status page", width=900]

For each job the table displays:

- *Job Name* — the name of the job (defaults to the processor name and timestamp if not specified).
- *Submitted* — the time the job was submitted.
- *Submitted By* — the user who submitted the job.
- *Processor Name* — the name of the Processor whose events are being replayed.
- *Processor Type* — the type of the Processor (e.g., `GenerateFlowFile`).
- *Status* — the current job status (`QUEUED`, `RUNNING`, `COMPLETED`, `PARTIAL_SUCCESS`, `CANCELLED`, `FAILED`, or `INTERRUPTED`).
- *Progress* — a progress bar shown while the job is running.
- *Total* — total number of events in the job.
- *Replayed* — number of events successfully replayed.
- *Failed* — number of events that could not be replayed.

The table supports filtering by job name, processor name, processor type, status, or submitted by, and all columns are sortable. An auto-refresh toggle (default on, 5-second interval) keeps the counts and progress current while jobs are running.

The *View Details* button opens a dialog showing per-event results, including the event ID, FlowFile UUID, event time, replay status, and any failure reason. This dialog also supports filtering and auto-refresh. The *Go to Processor* button navigates directly to the Processor on the canvas.

image:bulkReplayViewDetails.png["Bulk Replay job detail dialog", width=900]

Running or queued jobs can be cancelled using the cancel button in the actions column. Completed jobs can be removed from the list using the eraser icon, which offers the option to clear successful jobs only, all finished jobs, or all jobs.

==== Cluster Behavior

In a clustered environment, bulk replay jobs execute on the primary node. When a job is submitted, it is replicated to all nodes so every node has a local copy, but only the primary node starts the worker thread. Requests for job listings, details, and item-level status are forwarded to the primary node. Cancel and delete operations are replicated to all nodes.

If the primary node is lost while a job is running, the worker detects the loss of primary role and sets the job status to `INTERRUPTED`. When the new primary node is elected, interrupted jobs are automatically detected and restarted. Because replay creates new FlowFiles, restarting a job from scratch is safe.

If a replay item's content resides on a cluster node that is currently disconnected, the worker waits for the node to reconnect (up to the timeout configured by `nifi.bulk.replay.node.disconnect.timeout`, default 5 minutes). If the node does not reconnect in time, items on that node are marked as failed. See the link:administration-guide.html#bulk_replay_properties[Bulk Replay Properties] section for details.

==== Concurrency

The number of jobs that can execute at the same time is controlled by the `nifi.bulk.replay.max.concurrent` property in `nifi.properties` (default 5). Each worker thread processes one job at a time, replaying items sequentially within the job. See the link:administration-guide.html#bulk_replay_properties[Bulk Replay Properties] section of the System Administrator's Guide for details.

==== Persistence

Bulk replay jobs are held in memory by default and are lost when NiFi is restarted. The maximum number of jobs retained is controlled by the `nifi.bulk.replay.max.jobs` property (default 50). When the limit is reached, the oldest terminal (completed, partial success, failed, or cancelled) job is automatically removed. Active jobs (queued, running, or interrupted) are never evicted.

If the entire cluster is restarted, any in-progress replay will not complete and pending items will not be dispatched. Because job results do not survive a full restart, record any completed job results you need before restarting. However, if only the primary node is lost and a new primary is elected, interrupted jobs are automatically resumed (see Cluster Behavior above).

=== Viewing FlowFile Lineage

It is often useful to see a graphical representation of the lineage or path a FlowFile took within the dataflow. To see a FlowFile's lineage,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.bulkreplay;

/**
 * Lifecycle status of a single item within a bulk replay job.
 */
public enum BulkReplayItemStatus {
QUEUED,    // not yet dispatched for replay
RUNNING,   // replay in progress
SUCCEEDED, // replay completed successfully
FAILED,    // replay attempted but did not succeed
SKIPPED    // item was not replayed
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.bulkreplay;

import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.xml.bind.annotation.XmlType;

import java.util.List;

/**
 * Submission body for a bulk replay job. Extends the summary with the list of items to replay.
 * The {@code items} property is supplied by the client on POST and is absent in GET responses;
 * items are retrieved separately via {@code GET /bulk-replay/jobs/{id}/items}.
 */
@XmlType(name = "bulkReplayJobDetail")
public class BulkReplayJobDetailDTO extends BulkReplayJobSummaryDTO {

    // Backing list for the "items" property; null when absent (e.g., in GET responses).
    private List<BulkReplayJobItemDTO> replayItems;

    @Schema(description = "Items to replay. Populated on submission; absent in responses.")
    public List<BulkReplayJobItemDTO> getItems() {
        return replayItems;
    }

    public void setItems(final List<BulkReplayJobItemDTO> items) {
        this.replayItems = items;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.web.api.dto.bulkreplay;

import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.xml.bind.annotation.XmlType;

/**
 * A single item in a bulk replay job. On submission, only the provenance-related fields are
 * required. Server-assigned fields (itemId, itemIndex, status, times) are populated in responses.
 */
@XmlType(name = "bulkReplayJobItem")
public class BulkReplayJobItemDTO {

    // --- Server-assigned ---
    private String itemId;       // unique id of this item
    private Integer itemIndex;   // zero-based position of the item within its job
    private BulkReplayItemStatus status;
    private String errorMessage; // non-null only when replay of this item failed
    private String startTime;    // NOTE(review): times carried as strings — presumably pre-formatted by the server; confirm format with producer
    private String endTime;
    private String lastUpdated;

    // --- Submitted by client ---
    private Long provenanceEventId; // numeric id of the provenance event to replay
    private String clusterNodeId;   // owning node of the event; null in standalone mode
    private String flowFileUuid;
    private String eventType;
    private String eventTime;
    private String componentName;

    // Content size in bytes; used to decide whether replay can proceed while a node is disconnected.
    private Long fileSizeBytes;

    @Schema(description = "Server-assigned unique id of this item.")
    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    @Schema(description = "Zero-based index of this item within the job.")
    public Integer getItemIndex() {
        return itemIndex;
    }

    public void setItemIndex(Integer itemIndex) {
        this.itemIndex = itemIndex;
    }

    @Schema(description = "Current status of this item.")
    public BulkReplayItemStatus getStatus() {
        return status;
    }

    public void setStatus(BulkReplayItemStatus status) {
        this.status = status;
    }

    @Schema(description = "Error message if replay failed, null otherwise.")
    public String getErrorMessage() {
        return errorMessage;
    }

    public void setErrorMessage(String errorMessage) {
        this.errorMessage = errorMessage;
    }

    @Schema(description = "Time at which replay of this item began.", type = "string")
    public String getStartTime() {
        return startTime;
    }

    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }

    @Schema(description = "Time at which replay of this item completed.", type = "string")
    public String getEndTime() {
        return endTime;
    }

    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }

    @Schema(description = "Time at which this record was last updated.", type = "string")
    public String getLastUpdated() {
        return lastUpdated;
    }

    public void setLastUpdated(String lastUpdated) {
        this.lastUpdated = lastUpdated;
    }

    @Schema(description = "The numeric id of the provenance event to replay.")
    public Long getProvenanceEventId() {
        return provenanceEventId;
    }

    public void setProvenanceEventId(Long provenanceEventId) {
        this.provenanceEventId = provenanceEventId;
    }

    @Schema(description = "The cluster node id that owns the provenance event. Null in standalone mode.")
    public String getClusterNodeId() {
        return clusterNodeId;
    }

    public void setClusterNodeId(String clusterNodeId) {
        this.clusterNodeId = clusterNodeId;
    }

    @Schema(description = "The UUID of the FlowFile associated with this event.")
    public String getFlowFileUuid() {
        return flowFileUuid;
    }

    public void setFlowFileUuid(String flowFileUuid) {
        this.flowFileUuid = flowFileUuid;
    }

    @Schema(description = "The type of the provenance event.")
    public String getEventType() {
        return eventType;
    }

    public void setEventType(String eventType) {
        this.eventType = eventType;
    }

    @Schema(description = "The timestamp of the provenance event.")
    public String getEventTime() {
        return eventTime;
    }

    public void setEventTime(String eventTime) {
        this.eventTime = eventTime;
    }

    @Schema(description = "The name of the component that generated the provenance event.")
    public String getComponentName() {
        return componentName;
    }

    public void setComponentName(String componentName) {
        this.componentName = componentName;
    }

    @Schema(description = "The size of the FlowFile content in bytes. Used to determine if replay can proceed when a cluster node is disconnected.")
    public Long getFileSizeBytes() {
        return fileSizeBytes;
    }

    public void setFileSizeBytes(Long fileSizeBytes) {
        this.fileSizeBytes = fileSizeBytes;
    }
}
Loading
Loading