From 0b2730b931dd100d6f4267d2e57c1915db5c44a5 Mon Sep 17 00:00:00 2001 From: mattcasters Date: Tue, 7 Apr 2026 19:34:03 +0200 Subject: [PATCH 1/2] issue #6968 (include META-INF/MANIFEST.MF in fat jars) --- .../beam/pipeline/fatjar/FatJarBuilder.java | 95 ++++--------------- 1 file changed, 18 insertions(+), 77 deletions(-) diff --git a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/fatjar/FatJarBuilder.java b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/fatjar/FatJarBuilder.java index 50a558e42e8..5cfb045d6ed 100644 --- a/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/fatjar/FatJarBuilder.java +++ b/plugins/engines/beam/src/main/java/org/apache/hop/beam/pipeline/fatjar/FatJarBuilder.java @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; @@ -29,10 +30,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.jar.Attributes; import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; import java.util.zip.ZipEntry; import java.util.zip.ZipException; import java.util.zip.ZipInputStream; +import lombok.Getter; +import lombok.Setter; import org.apache.commons.io.IOUtils; import org.apache.hop.core.Const; import org.apache.hop.core.exception.HopException; @@ -42,6 +47,8 @@ import org.jboss.jandex.IndexWriter; import org.jboss.jandex.Indexer; +@Getter +@Setter public class FatJarBuilder { public static final String CONST_META_INF = "META-INF"; @@ -80,7 +87,6 @@ public FatJarBuilder( } public void buildTargetJar() throws HopException { - fileContentMap = new HashMap<>(); classCollisionMap = new HashMap<>(); collisionFileSet = new HashSet<>(); @@ -242,6 +248,9 @@ else if (entryName.endsWith("META-INF/jandex.idx")) { } } + // Add the META-INF/MANIFEST.MF file: + addInfoManifestMf(zipOutputStream); + // Add the META-INF/services files... // for (String entryName : fileContentMap.keySet()) { @@ -266,83 +275,15 @@ else if (entryName.endsWith("META-INF/jandex.idx")) { } } - /** - * Gets targetJarFile - * - * @return value of targetJarFile - */ - public String getTargetJarFile() { - return targetJarFile; - } - - /** - * @param targetJarFile The targetJarFile to set - */ - public void setTargetJarFile(String targetJarFile) { - this.targetJarFile = targetJarFile; - } - - /** - * Gets jarFiles - * - * @return value of jarFiles - */ - public List getJarFiles() { - return jarFiles; - } - - /** - * @param jarFiles The jarFiles to set - */ - public void setJarFiles(List jarFiles) { - this.jarFiles = jarFiles; - } - - /** - * Gets extraTransformPluginClasses - * - * @return value of extraTransformPluginClasses - */ - public String getExtraTransformPluginClasses() { - return extraTransformPluginClasses; - } - - /** - * @param extraTransformPluginClasses The extraTransformPluginClasses to set - */ - public void setExtraTransformPluginClasses(String extraTransformPluginClasses) { - this.extraTransformPluginClasses = extraTransformPluginClasses; - } - - /** - * Gets extraXpPluginClasses - * - * @return value of extraXpPluginClasses - */ - public String getExtraXpPluginClasses() { - return extraXpPluginClasses; - } + private static void addInfoManifestMf(JarOutputStream zipOutputStream) throws IOException { + ZipEntry manifestEntry = new ZipEntry("META-INF/MANIFEST.MF"); - /** - * @param extraXpPluginClasses The extraXpPluginClasses to set - */ - public void setExtraXpPluginClasses(String extraXpPluginClasses) { - this.extraXpPluginClasses = extraXpPluginClasses; - } - - /** - * Gets fileContentMap - * - * @return value of fileContentMap - */ - public Map getFileContentMap() { - return fileContentMap; - } + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + manifest.getMainAttributes().put(new Attributes.Name("Multi-Release"), "true"); - /** - * @param fileContentMap The fileContentMap to set - */ - public void setFileContentMap(Map fileContentMap) { - this.fileContentMap = fileContentMap; + zipOutputStream.putNextEntry(manifestEntry); + manifest.write(zipOutputStream); + zipOutputStream.closeEntry(); } } From 88becec3c52f53c3c2b1623b9f24e4dd3a7d63c5 Mon Sep 17 00:00:00 2001 From: mattcasters Date: Tue, 7 Apr 2026 21:46:39 +0200 Subject: [PATCH 2/2] issue #6963 (Data Stream docs update and code hardening) --- docker/resources/load-and-execute.sh | 3 +- .../data-stream/arrow-flight.adoc | 7 ++- .../flight/ArrowFlightDataStream.java | 15 +++-- .../hop/arrow/flight/FlightStreamBuffer.java | 7 ++- .../hop/arrow/flight/HopFlightProducer.java | 58 ++++++++++++++----- .../flight/messages/messages_en_US.properties | 4 +- 6 files changed, 71 insertions(+), 23 deletions(-) diff --git a/docker/resources/load-and-execute.sh b/docker/resources/load-and-execute.sh index 99a6a80cf8f..14b75171f8a 100755 --- a/docker/resources/load-and-execute.sh +++ b/docker/resources/load-and-execute.sh @@ -247,6 +247,7 @@ else "${HOP_EXEC_OPTIONS}" \ "${HOP_COMMAND_PARAMETERS}" \ 2>&1 | tee "${HOP_LOG_PATH}" + exitWithCode "${PIPESTATUS[0]}" else log "Running a single hop workflow / pipeline (${HOP_FILE_PATH})" "${DEPLOYMENT_PATH}"/hop-run.sh \ @@ -255,6 +256,6 @@ else --parameters="${HOP_RUN_PARAMETERS}" \ ${HOP_EXEC_OPTIONS} \ 2>&1 | tee "${HOP_LOG_PATH}" + exitWithCode "${PIPESTATUS[0]}" fi - exitWithCode "${PIPESTATUS[0]}" fi diff --git a/docs/hop-user-manual/modules/ROOT/pages/metadata-types/data-stream/arrow-flight.adoc b/docs/hop-user-manual/modules/ROOT/pages/metadata-types/data-stream/arrow-flight.adoc index ba8aeabe296..daf25ad0938 100644 --- a/docs/hop-user-manual/modules/ROOT/pages/metadata-types/data-stream/arrow-flight.adoc +++ b/docs/hop-user-manual/modules/ROOT/pages/metadata-types/data-stream/arrow-flight.adoc @@ -45,10 +45,15 @@ When data is sent to the Hop Flight server, the incoming schema is checked again If they do not match exactly, an error is thrown. | *Batch Size* | -Hint for the number of rows per batch (default: 10,000). +The number of rows that Apache Arrow will use per batch. (default: 500) + +| *Maximum Buffer Size* | +The maximum number of rows that will be kept in the buffer on the Hop Flight server. Set it high enough to avoid losing rows. As a ballpark figure, take the throughput in rows/s and multiply that by 10 for the buffer size to avoid issues. (default: 10M) |=== +IMPORTANT: This is an IPC system, not a safe data queue. The purpose is to hand data over to the receiving party as soon as possible. That is why there is no blocking happening when we write data to the Flight server. Rows are kept in memory to avoid stalling the gRPC back-end system as this would cause data to get lost. The only time we wait is when we read data from the Flight server. That is why it's recommended to start reading with one process before you write data with another. There is a time-out configured of 1 minute giving you plenty of time. + == Important Behavior * The **stream name** is simply the name of the Data Stream metadata element. diff --git a/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/datastream/flight/ArrowFlightDataStream.java b/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/datastream/flight/ArrowFlightDataStream.java index dac8c3e0962..5d0747d473f 100644 --- a/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/datastream/flight/ArrowFlightDataStream.java +++ b/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/datastream/flight/ArrowFlightDataStream.java @@ -18,7 +18,6 @@ package org.apache.hop.arrow.datastream.flight; -import java.io.IOException; import java.util.ArrayList; import java.util.List; import lombok.Getter; @@ -56,6 +55,8 @@ @Setter @GuiPlugin public class ArrowFlightDataStream extends ArrowBaseDataStream { + public static final int DEFAULT_MAX_BUFFER_SIZE = 10000000; + @GuiWidgetElement( order = "20000-arrow-flight-data-stream-buffer-size", parentId = DataStreamMeta.GUI_WIDGETS_PARENT_ID, @@ -124,7 +125,7 @@ public ArrowFlightDataStream() { this.pluginId = annotation.id(); this.pluginName = annotation.name(); rowBuffer = new ArrayList<>(); - bufferSize = "500"; + bufferSize = Integer.toString(DEFAULT_MAX_BUFFER_SIZE); batchSize = "10000"; hostname = "localhost"; port = "33333"; @@ -151,11 +152,17 @@ public void initialize( DataStreamMeta dataStreamMeta) throws HopException { super.initialize(variables, metadataProvider, writing, dataStreamMeta); - realBufferSize = Const.toInt(variables.resolve(bufferSize), 10000); + realBufferSize = Const.toInt(variables.resolve(bufferSize), DEFAULT_MAX_BUFFER_SIZE); realBatchSize = Const.toInt(variables.resolve(batchSize), 500); String realSchemaDefinition = variables.resolve(schemaDefinitionName); schemaDefinition = metadataProvider.getSerializer(SchemaDefinition.class).load(realSchemaDefinition); + if (schemaDefinition == null) { + throw new HopException( + "The specified schema definition '" + + realSchemaDefinition + + "' could not be found in the Hop Flight server metadata."); + } realHostname = variables.resolve(hostname); realPort = Const.toInt(variables.resolve(port), 33333); firstRead = true; @@ -332,7 +339,7 @@ public Object[] readRow() throws HopException { } } - protected boolean readNextBatch() throws IOException { + protected boolean readNextBatch() { boolean readNext = readFlightStream.next(); readVectorSchemaRoot = readFlightStream.getRoot(); diff --git a/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/flight/FlightStreamBuffer.java b/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/flight/FlightStreamBuffer.java index 2d3c8bd0c49..30f6965ddd6 100644 --- a/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/flight/FlightStreamBuffer.java +++ b/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/flight/FlightStreamBuffer.java @@ -24,4 +24,9 @@ import org.apache.hop.core.row.IRowMeta; public record FlightStreamBuffer( - Schema schema, IRowMeta rowMeta, IRowSet rowSet, int batchSize, Location location) {} + Schema schema, + IRowMeta rowMeta, + IRowSet rowSet, + int bufferSize, + int batchSize, + Location location) {} diff --git a/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/flight/HopFlightProducer.java b/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/flight/HopFlightProducer.java index 33b43730fba..e72f7df2ada 100644 --- a/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/flight/HopFlightProducer.java +++ b/plugins/tech/arrow/src/main/java/org/apache/hop/arrow/flight/HopFlightProducer.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import org.apache.arrow.flight.CallStatus; import org.apache.arrow.flight.FlightDescriptor; import org.apache.arrow.flight.FlightEndpoint; @@ -39,9 +40,9 @@ import org.apache.arrow.vector.types.pojo.Schema; import org.apache.hop.arrow.datastream.flight.ArrowFlightDataStream; import org.apache.hop.arrow.datastream.shared.ArrowBaseDataStream; +import org.apache.hop.core.BlockingRowSet; import org.apache.hop.core.Const; import org.apache.hop.core.IRowSet; -import org.apache.hop.core.QueueRowSet; import org.apache.hop.core.exception.HopException; import org.apache.hop.core.row.IRowMeta; import org.apache.hop.core.variables.IVariables; @@ -50,12 +51,14 @@ import org.apache.hop.metadata.api.IHopMetadataProvider; public class HopFlightProducer extends NoOpFlightProducer { + public static final int BUFFER_SIZE_OVERSHOOT = 5000; + public static final int BUFFER_SIZE_ERROR_LEVEL = 4000; + public static final int MAX_READ_BLOCK_TIME_MS = 60000; + private final IVariables variables; private final IHopMetadataProvider metadataProvider; private final RootAllocator rootAllocator; private final Map streamMap; - private int rowsRead; - private int rowsWritten; public HopFlightProducer( IVariables variables, IHopMetadataProvider metadataProvider, RootAllocator rootAllocator) { @@ -93,7 +96,7 @@ public Runnable acceptPut( // IRowMeta rowMeta = streamBuffer.rowMeta(); IRowSet rowSet = streamBuffer.rowSet(); - rowsWritten = 0; + int bufferSize = streamBuffer.bufferSize(); while (flightStream.next()) { VectorSchemaRoot vectorSchemaRoot = flightStream.getRoot(); List fieldVectors = vectorSchemaRoot.getFieldVectors(); @@ -108,7 +111,16 @@ public Runnable acceptPut( Object[] rowData = ArrowBaseDataStream.convertFieldVectorsToHopRow(fieldVectors, rowMeta, rowIndex); rowSet.putRow(rowMeta, rowData); - rowsWritten++; + + // If too many rows are kept in memory, throw an error! + // The client needs to read faster. This is IPC, not queueing! + // + if (rowSet.size() > 0 && rowSet.size() > bufferSize + BUFFER_SIZE_ERROR_LEVEL) { + throw new HopException( + "The maximum amount of rows kept in memory is exceeded by reaching " + + rowSet.size() + + " rows."); + } } } @@ -145,18 +157,22 @@ private FlightStreamBuffer lookupArrowStreamBuffer(String streamName) throws Hop IRowMeta rowMeta = flightDataStream.buildExpectedRowMeta(); Schema expectedSchema = flightDataStream.buildExpectedSchema(); - // int bufferSize = Const.toInt(variables.resolve(flightDataStream.getBufferSize()), 10000); + int bufferSize = + Const.toInt( + variables.resolve(flightDataStream.getBufferSize()), + ArrowFlightDataStream.DEFAULT_MAX_BUFFER_SIZE); int batchSize = Const.toInt(variables.resolve(flightDataStream.getBatchSize()), 500); - // Figure out why blocking rows isn't a good idea. - // Are we receiving rows in parallel if we block? + // We use a very large queue because we don't ever want to block while writing. + // We over-size it by 5000 rows and then throw an error if we reach that. // - IRowSet rowSet = new QueueRowSet(); + IRowSet rowSet = new BlockingRowSet(bufferSize + BUFFER_SIZE_OVERSHOOT); String hostname = Const.NVL(variables.resolve(flightDataStream.getHostname()), "0.0.0.0"); int port = Const.toInt(variables.resolve(flightDataStream.getPort()), 33333); Location location = Location.forGrpcInsecure(hostname, port); - buffer = new FlightStreamBuffer(expectedSchema, rowMeta, rowSet, batchSize, location); + buffer = + new FlightStreamBuffer(expectedSchema, rowMeta, rowSet, bufferSize, batchSize, location); streamMap.put(streamName, buffer); } return buffer; @@ -184,11 +200,9 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l // listener.start(vectorSchemaRoot); - rowsRead = 0; // Example: Pull rows from a readable RowSet and send them back as Arrow batches - Object[] hopRow = rowSet.getRow(); + Object[] hopRow = waitForRow(rowSet); while (hopRow != null) { - rowsRead++; // Add the row to a batch row buffer: // @@ -200,7 +214,8 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l // Send the batch listener.putNext(); } - hopRow = rowSet.getRow(); + // Get another row + hopRow = waitForRow(rowSet); } // Do we have any rows in the buffer left? // @@ -210,12 +225,27 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l } listener.completed(); + + // Clean up the stream map as well to avoid leaking memory + // + streamMap.remove(streamName); } } catch (Exception e) { listener.error(CallStatus.INTERNAL.withCause(e).toRuntimeException()); } } + private Object[] waitForRow(IRowSet rowSet) throws HopException { + Object[] row = rowSet.getRowWait(20, TimeUnit.MILLISECONDS); + long startTime = System.currentTimeMillis(); + while (row == null + && !rowSet.isDone() + && System.currentTimeMillis() - startTime < MAX_READ_BLOCK_TIME_MS) { + row = rowSet.getRowWait(20, TimeUnit.MILLISECONDS); + } + return row; + } + private void fillBatch( List rowBuffer, IRowMeta rowMeta, VectorSchemaRoot vectorSchemaRoot) throws HopException { diff --git a/plugins/tech/arrow/src/main/resources/org/apache/hop/arrow/datastream/flight/messages/messages_en_US.properties b/plugins/tech/arrow/src/main/resources/org/apache/hop/arrow/datastream/flight/messages/messages_en_US.properties index e7e0479b8ea..cf733ffa00b 100644 --- a/plugins/tech/arrow/src/main/resources/org/apache/hop/arrow/datastream/flight/messages/messages_en_US.properties +++ b/plugins/tech/arrow/src/main/resources/org/apache/hop/arrow/datastream/flight/messages/messages_en_US.properties @@ -15,8 +15,8 @@ # limitations under the License. # # -ArrowFlightDataStream.BufferSize.Label=Stream buffer size -ArrowFlightDataStream.BufferSize.Tooltip=The number of rows to buffer, kept on the server, before blocking. +ArrowFlightDataStream.BufferSize.Label=Stream maximum buffer size +ArrowFlightDataStream.BufferSize.Tooltip=The maximum number of rows to buffer, kept on the server, before throwing an error to prevent the server running out of memory. ArrowFlightDataStream.BatchSize.Label=Batch size ArrowFlightDataStream.BatchSize.Tooltip=The Apache Arrow batch size for the stream reading from the server. ArrowFlightDataStream.SchemaDefinition.Label=The schema definition