HDDS-14913. Implement Scalable CSV Export for Unhealthy Containers in Recon UI. #9994
ArafatKhan2198 wants to merge 6 commits into apache:master
Conversation
Thanks for adding this @ArafatKhan2198. Do we need the new configuration …

Makes sense!

Hey @errose28, could you please review the patch again!
devmadhuu left a comment:

@ArafatKhan2198, thanks for the patch. Kindly see the comments.
```java
@Produces("text/csv")
public Response exportUnhealthyContainers(
    @QueryParam("state") String state,
    @DefaultValue("0") @QueryParam(RECON_QUERY_LIMIT) int limit) {
```
There should not be any limit for export. The export option is meant to export all containers for a given state. On real clusters we have found that there can be up to 4M containers per state, so in such an unbalanced cluster we need to export everything for that state.
Thanks Devesh! The current implementation already supports this. limit=0 (the default) means unlimited — no cap is applied. The UI sends limit=0 when the user clicks export, so all records for that state are exported. The limit parameter exists only as an optional safety valve if someone wants to test with fewer records via direct API call, but the default behavior is full export.
We need an async approach; the current solution is not scalable for a full export.
```java
try (Cursor<UnhealthyContainersRecord> cursor =
    containerHealthSchemaManager.getUnhealthyContainersCursor(filterState, limit)) {

  PrintWriter writer = new PrintWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
```
A single CSV may not be sufficient in a large, unbalanced cluster. All our solutions should be designed with scalability in mind. Break the export into multiple shards (parts), producing multiple CSV files for a given state. Let the export logic below run asynchronously; once the export of all shards is complete, it should set a flag or indicator to mark completion, so that an async polling request from the UI to the API knows the export is done and allows downloading the exported files in a compressed format.
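The sharded, async pattern described here can be sketched as below. All names (`ShardedCsvExporter`, `SHARD_SIZE`, the `_SUCCESS` marker, the two-column row format) are illustrative assumptions, not the actual Recon implementation: rows are written to fixed-size part files on a background thread, and a marker file is created last so a polling endpoint can detect completion.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public final class ShardedCsvExporter {
  // Tiny shard size for demonstration; a real value might be e.g. 1M rows.
  static final int SHARD_SIZE = 2;

  public static CompletableFuture<Path> exportAsync(Iterator<String> rows, Path dir) {
    return CompletableFuture.supplyAsync(() -> {
      try {
        int shard = 0;
        while (rows.hasNext()) {
          // Each shard is a standalone CSV file with its own header.
          Path part = dir.resolve(String.format("part-%05d.csv", shard++));
          try (var w = Files.newBufferedWriter(part)) {
            w.write("container_id,container_state\n");
            for (int i = 0; i < SHARD_SIZE && rows.hasNext(); i++) {
              w.write(rows.next());
              w.write('\n');
            }
          }
        }
        // Completion flag: pollers treat the export as done once this exists.
        Path marker = dir.resolve("_SUCCESS");
        Files.createFile(marker);
        return marker;
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    });
  }

  public static void main(String[] args) throws Exception {
    Path dir = Files.createTempDirectory("export");
    List<String> rows = List.of("1,MISSING", "2,MISSING", "3,MISSING");
    exportAsync(rows.iterator(), dir).join();
    System.out.println(Files.exists(dir.resolve("_SUCCESS")));       // true
    System.out.println(Files.exists(dir.resolve("part-00001.csv"))); // true
  }
}
```

A UI could then poll a status endpoint until `_SUCCESS` appears, after which the parts are zipped and downloaded.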
```java
 * @return Cursor returning UnhealthyContainersRecord
 */
public Cursor<UnhealthyContainersRecord> getUnhealthyContainersCursor(
    UnHealthyContainerStates state, int limit) {
```
During testing it was found that fetching 1M rows from the table may take ~300 to 400 ms, but 4M records for the same state may take somewhat longer. The time is not exactly proportional because indexes are defined on the UNHEALTHY_CONTAINERS table, but we still cannot predict it deterministically since external factors like disk and network are also involved. So I would suggest always implementing the API asynchronously: a user action triggers the request, the UI keeps polling until results are available, and once everything is exported it is downloaded in a compressed format containing all shards (parts). Please check my other comment about marking the export done/completed.
```java
    "ozone.recon.scm.container.id.batch.size";
public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000;
```
This change is unnecessary. Revert it.
Pull request overview
Implements a scalable CSV export for unhealthy container records in Recon by adding a UI export control and a backend streaming CSV endpoint intended to avoid large in-memory result sets.
Changes:
- Added “Export CSV” UI controls (limit selector + button) on the Containers page.
- Added a new backend export API that streams unhealthy container rows from the DB using a jOOQ cursor and JAX-RS StreamingOutput.
- Added a cursor-based query method for unhealthy containers in the schema manager.
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| .../containers.tsx | Adds export limit selector + button and constructs export URL per selected unhealthy state tab. |
| .../containers.less | Styles the new actions section alongside search/filters. |
| .../ContainerHealthSchemaManager.java | Introduces a fetchLazy() cursor query for streaming unhealthy containers. |
| .../ContainerEndpoint.java | Adds /containers/unhealthy/export endpoint streaming CSV output from a DB cursor. |
| .../ReconServerConfigKeys.java | PR claims a new export cap config, but this file currently only changes whitespace. |
```diff
@@ -61,6 +67,7 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
 import org.apache.hadoop.ozone.recon.ReconUtils;
```
OzoneConfiguration and ReconServerConfigKeys are imported but never used in this class, which will fail compilation (unused import). Either remove these imports or use them to apply the CSV export max-records configuration described in the PR.
```java
@GET
@Path("/unhealthy/export")
@Produces("text/csv")
public Response exportUnhealthyContainers(
    @QueryParam("state") String state,
    @DefaultValue("0") @QueryParam(RECON_QUERY_LIMIT) int limit) {

  ContainerSchemaDefinition.UnHealthyContainerStates internalState = null;
  if (StringUtils.isNotEmpty(state)) {
    try {
      internalState = ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(state);
    } catch (IllegalArgumentException e) {
      throw new WebApplicationException(e, Response.Status.BAD_REQUEST);
    }
  }

  final ContainerSchemaDefinition.UnHealthyContainerStates filterState = internalState;

  StreamingOutput stream = outputStream -> {
    try (BufferedOutputStream bos = new BufferedOutputStream(outputStream, 256 * 1024);
        Cursor<UnhealthyContainersRecord> cursor =
            containerHealthSchemaManager.getUnhealthyContainersCursor(filterState, limit)) {

      PrintWriter writer = new PrintWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8));

      // Write CSV header
      writer.println("container_id,container_state,in_state_since," +
          "expected_replica_count,actual_replica_count,replica_delta");

      StringBuilder sb = new StringBuilder(128);
      while (cursor.hasNext()) {
        UnhealthyContainersRecord rec = cursor.fetchNext();
        sb.setLength(0);
        sb.append(rec.getContainerId()).append(',')
            .append(rec.getContainerState()).append(',')
            .append(rec.getInStateSince()).append(',')
            .append(rec.getExpectedReplicaCount()).append(',')
            .append(rec.getActualReplicaCount()).append(',')
            .append(rec.getReplicaDelta());
        writer.println(sb);
      }
      writer.flush();
    } catch (Exception e) {
      LOG.error("Error streaming unhealthy containers CSV", e);
      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
    }
  };

  String filename = String.format("unhealthy_containers_%s_%d.csv",
      state != null ? state.toLowerCase() : "all",
      System.currentTimeMillis());

  return Response.ok(stream)
      .header("Content-Disposition", "attachment; filename=\"" + filename + "\"")
      .build();
}
```
This endpoint currently allows truly unbounded exports when limit=0 (and also for negative limits), but the PR description calls out a hard cap via ozone.recon.csv.export.max.records (default 5,000,000). Please enforce a configured maximum here (e.g., clamp limit to min(requestedLimitOrMax, configuredMax) and treat 0 as “use configured max”), otherwise a single request can tie up a DB connection / request thread for an arbitrarily long time.
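The clamping behavior this comment asks for could look like the sketch below. The class and method names are illustrative, and the cap value mirrors the PR description's proposed `ozone.recon.csv.export.max.records` default (5,000,000); in the real endpoint the cap would come from configuration rather than a constant.

```java
// Hypothetical limit clamping: 0 means "use the configured max", never unlimited,
// and any explicit request is capped at the configured max.
public final class ExportLimit {
  static final int CONFIGURED_MAX = 5_000_000; // stand-in for the config value

  static int effectiveLimit(int requested) {
    if (requested < 0) {
      // Reject negatives outright so they cannot mean "unlimited".
      throw new IllegalArgumentException("limit must be >= 0");
    }
    return requested == 0 ? CONFIGURED_MAX : Math.min(requested, CONFIGURED_MAX);
  }

  public static void main(String[] args) {
    System.out.println(effectiveLimit(0));          // 5000000
    System.out.println(effectiveLimit(10_000));     // 10000
    System.out.println(effectiveLimit(9_000_000));  // 5000000
  }
}
```

With this in place, a single request can never hold a DB connection for longer than the configured cap allows, regardless of what the client sends.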
```java
// (Quoted from the same StreamingOutput block shown in full above.)
PrintWriter writer = new PrintWriter(new OutputStreamWriter(bos, StandardCharsets.UTF_8));
// ... CSV header and per-row writer.println(...) loop ...
writer.flush();
```
PrintWriter swallows IOExceptions (it sets an internal error flag instead of throwing). For a long-running stream, this can hide client disconnects / broken pipes and make failures harder to detect. Consider using a BufferedWriter/OutputStream that propagates IOExceptions (or at least check writer.checkError() periodically) so streaming failures surface reliably.
```tsx
const handleExportCsv = useCallback(() => {
  const state = tabToExportState[selectedTab];
  const exportUrl = `/api/v1/containers/unhealthy/export?state=${state}&limit=${exportLimit}`;
  window.open(exportUrl, '_blank');
  message.success(`Exporting ${tabToLabel[selectedTab]} containers as CSV (Limit: ${exportLimit})`);
}, [selectedTab, exportLimit]);
```
handleExportCsv always includes limit=${exportLimit} in the URL. With the current UI default/choices, this can emit limit=0 (Complete) and trigger an unbounded server-side export. Once the backend enforces a max, it would be safer for the UI to omit the limit param when “Complete” is selected (letting the server apply the configured max) and to avoid showing a success toast until the download actually starts (e.g., after a HEAD/check request).
Suggested change (replacing the handler quoted above with an async version):

```tsx
const handleExportCsv = useCallback(async () => {
  const state = tabToExportState[selectedTab];
  const queryParams = new URLSearchParams({ state });
  if (exportLimit > 0) {
    queryParams.set('limit', exportLimit.toString());
  }
  const exportUrl = `/api/v1/containers/unhealthy/export?${queryParams.toString()}`;
  try {
    const response = await fetch(exportUrl, { method: 'HEAD' });
    if (!response.ok) {
      throw new Error(`Failed to export containers. HTTP status: ${response.status}`);
    }
    window.open(exportUrl, '_blank');
    message.success(
      exportLimit > 0
        ? `Exporting ${tabToLabel[selectedTab]} containers as CSV (Limit: ${exportLimit})`
        : `Exporting ${tabToLabel[selectedTab]} containers as CSV`
    );
  } catch {
    message.error(`Failed to start CSV export for ${tabToLabel[selectedTab]} containers`);
  }
}, [selectedTab, exportLimit, tabToExportState, tabToLabel]);
```
```java
@GET
@Path("/unhealthy/export")
@Produces("text/csv")
public Response exportUnhealthyContainers(
    @QueryParam("state") String state,
    @DefaultValue("0") @QueryParam(RECON_QUERY_LIMIT) int limit) {
  // ... same endpoint body as quoted in full above ...
}
```
This new export behavior isn’t covered by tests. TestContainerEndpoint already has coverage for unhealthy container APIs; please add tests for /containers/unhealthy/export to verify: (1) valid state streams CSV with a header, (2) invalid state returns 400, and (3) the configured max-records cap is enforced/clamps the requested limit.
```java
public Response exportUnhealthyContainers(
    @QueryParam("state") String state,
    @DefaultValue("0") @QueryParam(RECON_QUERY_LIMIT) int limit) {
```
The limit query param is not validated here: negative values currently behave like “unlimited” (because the cursor query only applies a limit when limit > 0). Please reject limit < 0 with 406/400 (consistent with other endpoints) so clients can’t accidentally trigger unbounded exports.
Suggested change:

```java
if (limit < 0) {
  throw new WebApplicationException("The limit query parameter must be "
      + "greater than or equal to 0.", Response.Status.BAD_REQUEST);
}
```
```java
DSLContext dslContext = containerSchemaDefinition.getDSLContext();
org.jooq.SelectQuery<UnhealthyContainersRecord> query = dslContext.selectFrom(UNHEALTHY_CONTAINERS).getQuery();

if (state != null) {
  query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()));
  // Single-state filter: ordering by container_state is redundant (every
  // row has the same value). ORDER BY container_id alone lets Derby walk
  // the composite index idx_state_container_id in order and return rows
  // immediately — no sort step, eliminating "time to first byte" delay.
  query.addOrderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc());
} else {
  // All-states query: order by state first, then container_id. This
  // matches the composite index order so Derby can still avoid a sort.
  query.addOrderBy(
      UNHEALTHY_CONTAINERS.CONTAINER_STATE.asc(),
      UNHEALTHY_CONTAINERS.CONTAINER_ID.asc()
  );
}

if (limit > 0) {
  query.addLimit(limit);
}

return query.fetchLazy();
```
getUnhealthyContainersCursor streams via fetchLazy(), but unlike getUnhealthyContainers(...) it doesn’t set a JDBC fetch size. Without it, Derby/JDBC may default to very small batches, significantly increasing round-trips for million-row exports. Consider setting a reasonable query.fetchSize(...) (possibly configurable) for better streaming throughput.
```java
public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT = 1_000_000;
```
The PR description mentions a new config key ozone.recon.csv.export.max.records, but no constant (or any config key) is added here; this change is currently just a blank line. Please add the config key and default (e.g., 5,000,000) to ReconServerConfigKeys and use it in the export endpoint to enforce a hard cap.
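For reference, the proposed cap could be surfaced in ozone-site.xml along these lines. The key name and default come from the PR description; the description text is illustrative, not merged code:

```xml
<!-- Proposed cap on CSV export size (per the PR description, not merged code). -->
<property>
  <name>ozone.recon.csv.export.max.records</name>
  <value>5000000</value>
  <description>Maximum number of unhealthy container records a single CSV
    export request may stream. Requests with limit=0 are clamped to this
    value rather than treated as unlimited.</description>
</property>
```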
```tsx
<Select
  value={exportLimit}
  onChange={(value: number) => setExportLimit(value)}
  style={{ width: 130 }}
  options={[
    { value: 10000, label: '10K Records' },
    { value: 100000, label: '100K Records' },
    { value: 1000000, label: '1M Records' },
    { value: 0, label: 'Complete' }
  ]}
```
The export limit dropdown doesn’t match the PR description (“10K up to 5 Million”) and includes a Complete option that sends limit=0, which the backend currently interprets as unlimited. Consider adding a 5M option and making “Complete” either omitted (so the server applies its configured max) or explicitly set to the configured cap to avoid accidental unbounded exports.
@ArafatKhan2198, kindly also check the Copilot comments for their validity.
What changes were proposed in this pull request?
Problem: Currently, exporting a large number of unhealthy container records (e.g., Missing, Under-Replicated, Mis-Replicated, etc.) from Recon is difficult. The UI is paginated and cannot efficiently handle downloading millions of records. Administrators often have to rely on manual, non-scalable database extraction methods (like connecting with the Derby `ij` client) to analyze cluster health data.

Solution: This ticket implements a native, scalable CSV export feature directly in the Recon UI and backend.

- A new streaming endpoint (`/api/v1/containers/unhealthy/export`) uses jOOQ cursors and JAX-RS `StreamingOutput` to fetch and stream records lazily. By piping data directly from the Derby database to the HTTP response, the JVM heap is protected from OutOfMemory (OOM) errors, regardless of scale.
- The endpoint carries `@AdminOnly` authorization. Additionally, a new configuration property `ozone.recon.csv.export.max.records` (default: 5,000,000) was introduced to forcefully cap exports, protecting the server connection pool and worker threads from accidental misuse or abuse.

The file is not stored anywhere on the server, and it is not loaded fully into memory. When you click “Export CSV”, the backend starts streaming the response: it uses a database cursor (which is like an iterator that fetches rows in small batches instead of all at once) to read records one by one, and immediately writes each row to the HTTP response. The browser receives this data continuously and saves it as a file on your machine as it arrives. So there is no temporary file created on the server and no large memory usage: the data simply flows from the database to the backend to the browser in a stream until the download completes.
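The streaming flow described above can be illustrated with a minimal sketch. This is not the Recon endpoint itself: a plain `Iterator` stands in for the jOOQ cursor, and a `ByteArrayOutputStream` stands in for the HTTP response, but the shape is the same — each row is written out as soon as it is read, so heap usage stays constant regardless of row count.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public final class CsvStreamSketch {
  static void streamCsv(Iterator<String[]> rows, OutputStream out) throws IOException {
    Writer w = new OutputStreamWriter(out, StandardCharsets.UTF_8);
    w.write("container_id,container_state\n");
    while (rows.hasNext()) {
      // Each row is written immediately; nothing accumulates on the heap.
      w.write(String.join(",", rows.next()));
      w.write('\n');
    }
    w.flush();
  }

  public static void main(String[] args) throws IOException {
    var rows = List.of(new String[]{"1", "MISSING"}, new String[]{"2", "UNDER_REPLICATED"});
    var out = new ByteArrayOutputStream();
    streamCsv(rows.iterator(), out);
    System.out.print(out.toString(StandardCharsets.UTF_8));
    // container_id,container_state
    // 1,MISSING
    // 2,UNDER_REPLICATED
  }
}
```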
What is the link to the Apache JIRA?
https://issues.apache.org/jira/browse/HDDS-14902
How was this patch tested?
Tested with 4 million containers: 1 million for each unhealthy state.