Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docker/resources/load-and-execute.sh
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ else
"${HOP_EXEC_OPTIONS}" \
"${HOP_COMMAND_PARAMETERS}" \
2>&1 | tee "${HOP_LOG_PATH}"
exitWithCode "${PIPESTATUS[0]}"
else
log "Running a single hop workflow / pipeline (${HOP_FILE_PATH})"
"${DEPLOYMENT_PATH}"/hop-run.sh \
Expand All @@ -255,6 +256,6 @@ else
--parameters="${HOP_RUN_PARAMETERS}" \
${HOP_EXEC_OPTIONS} \
2>&1 | tee "${HOP_LOG_PATH}"
exitWithCode "${PIPESTATUS[0]}"
fi
exitWithCode "${PIPESTATUS[0]}"
fi
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,15 @@ When data is sent to the Hop Flight server, the incoming schema is checked again
If they do not match exactly, an error is thrown.

| *Batch Size* |
Hint for the number of rows per batch (default: 10,000).
The number of rows that Apache Arrow will use per batch (default: 500).

| *Maximum Buffer Size* |
The maximum number of rows that will be kept in the buffer on the Hop Flight server. Set it high enough to avoid losing rows. As a ballpark figure, take the expected throughput in rows/s and multiply it by 10 to get a buffer size that avoids issues (default: 10,000,000).

|===

IMPORTANT: This is an IPC system, not a safe data queue. Its purpose is to hand data over to the receiving party as soon as possible, which is why writing data to the Flight server never blocks. Rows are kept in memory to avoid stalling the gRPC back-end system, as stalling would cause data to be lost. The only time we wait is when we read data from the Flight server. It is therefore recommended to start reading with one process before you write data with another. Reading times out after 1 minute, which gives you plenty of time.

== Important Behavior

* The **stream name** is simply the name of the Data Stream metadata element.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,22 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipInputStream;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.io.IOUtils;
import org.apache.hop.core.Const;
import org.apache.hop.core.exception.HopException;
Expand All @@ -42,6 +47,8 @@
import org.jboss.jandex.IndexWriter;
import org.jboss.jandex.Indexer;

@Getter
@Setter
public class FatJarBuilder {

public static final String CONST_META_INF = "META-INF";
Expand Down Expand Up @@ -80,7 +87,6 @@ public FatJarBuilder(
}

public void buildTargetJar() throws HopException {

fileContentMap = new HashMap<>();
classCollisionMap = new HashMap<>();
collisionFileSet = new HashSet<>();
Expand Down Expand Up @@ -242,6 +248,9 @@ else if (entryName.endsWith("META-INF/jandex.idx")) {
}
}

// Add the META-INF/MANIFEST.MF file:
addInfoManifestMf(zipOutputStream);

// Add the META-INF/services files...
//
for (String entryName : fileContentMap.keySet()) {
Expand All @@ -266,83 +275,15 @@ else if (entryName.endsWith("META-INF/jandex.idx")) {
}
}

/**
* Gets targetJarFile
*
* @return value of targetJarFile
*/
public String getTargetJarFile() {
return targetJarFile;
}

/**
* @param targetJarFile The targetJarFile to set
*/
public void setTargetJarFile(String targetJarFile) {
this.targetJarFile = targetJarFile;
}

/**
* Gets jarFiles
*
* @return value of jarFiles
*/
public List<String> getJarFiles() {
return jarFiles;
}

/**
* @param jarFiles The jarFiles to set
*/
public void setJarFiles(List<String> jarFiles) {
this.jarFiles = jarFiles;
}

/**
* Gets extraTransformPluginClasses
*
* @return value of extraTransformPluginClasses
*/
public String getExtraTransformPluginClasses() {
return extraTransformPluginClasses;
}

/**
* @param extraTransformPluginClasses The extraTransformPluginClasses to set
*/
public void setExtraTransformPluginClasses(String extraTransformPluginClasses) {
this.extraTransformPluginClasses = extraTransformPluginClasses;
}

/**
* Gets extraXpPluginClasses
*
* @return value of extraXpPluginClasses
*/
public String getExtraXpPluginClasses() {
return extraXpPluginClasses;
}
private static void addInfoManifestMf(JarOutputStream zipOutputStream) throws IOException {
ZipEntry manifestEntry = new ZipEntry("META-INF/MANIFEST.MF");

/**
* @param extraXpPluginClasses The extraXpPluginClasses to set
*/
public void setExtraXpPluginClasses(String extraXpPluginClasses) {
this.extraXpPluginClasses = extraXpPluginClasses;
}

/**
* Gets fileContentMap
*
* @return value of fileContentMap
*/
public Map<String, String> getFileContentMap() {
return fileContentMap;
}
Manifest manifest = new Manifest();
manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
manifest.getMainAttributes().put(new Attributes.Name("Multi-Release"), "true");

/**
* @param fileContentMap The fileContentMap to set
*/
public void setFileContentMap(Map<String, String> fileContentMap) {
this.fileContentMap = fileContentMap;
zipOutputStream.putNextEntry(manifestEntry);
manifest.write(zipOutputStream);
zipOutputStream.closeEntry();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hop.arrow.datastream.flight;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
Expand Down Expand Up @@ -56,6 +55,8 @@
@Setter
@GuiPlugin
public class ArrowFlightDataStream extends ArrowBaseDataStream {
public static final int DEFAULT_MAX_BUFFER_SIZE = 10000000;

@GuiWidgetElement(
order = "20000-arrow-flight-data-stream-buffer-size",
parentId = DataStreamMeta.GUI_WIDGETS_PARENT_ID,
Expand Down Expand Up @@ -124,7 +125,7 @@ public ArrowFlightDataStream() {
this.pluginId = annotation.id();
this.pluginName = annotation.name();
rowBuffer = new ArrayList<>();
bufferSize = "500";
bufferSize = Integer.toString(DEFAULT_MAX_BUFFER_SIZE);
batchSize = "10000";
hostname = "localhost";
port = "33333";
Expand All @@ -151,11 +152,17 @@ public void initialize(
DataStreamMeta dataStreamMeta)
throws HopException {
super.initialize(variables, metadataProvider, writing, dataStreamMeta);
realBufferSize = Const.toInt(variables.resolve(bufferSize), 10000);
realBufferSize = Const.toInt(variables.resolve(bufferSize), DEFAULT_MAX_BUFFER_SIZE);
realBatchSize = Const.toInt(variables.resolve(batchSize), 500);
String realSchemaDefinition = variables.resolve(schemaDefinitionName);
schemaDefinition =
metadataProvider.getSerializer(SchemaDefinition.class).load(realSchemaDefinition);
if (schemaDefinition == null) {
throw new HopException(
"The specified schema definition '"
+ realSchemaDefinition
+ "' could not be found in the Hop Flight server metadata.");
}
realHostname = variables.resolve(hostname);
realPort = Const.toInt(variables.resolve(port), 33333);
firstRead = true;
Expand Down Expand Up @@ -332,7 +339,7 @@ public Object[] readRow() throws HopException {
}
}

protected boolean readNextBatch() throws IOException {
protected boolean readNextBatch() {
boolean readNext = readFlightStream.next();
readVectorSchemaRoot = readFlightStream.getRoot();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,9 @@
import org.apache.hop.core.row.IRowMeta;

public record FlightStreamBuffer(
Schema schema, IRowMeta rowMeta, IRowSet rowSet, int batchSize, Location location) {}
Schema schema,
IRowMeta rowMeta,
IRowSet rowSet,
int bufferSize,
int batchSize,
Location location) {}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.arrow.flight.CallStatus;
import org.apache.arrow.flight.FlightDescriptor;
import org.apache.arrow.flight.FlightEndpoint;
Expand All @@ -39,9 +40,9 @@
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.hop.arrow.datastream.flight.ArrowFlightDataStream;
import org.apache.hop.arrow.datastream.shared.ArrowBaseDataStream;
import org.apache.hop.core.BlockingRowSet;
import org.apache.hop.core.Const;
import org.apache.hop.core.IRowSet;
import org.apache.hop.core.QueueRowSet;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.variables.IVariables;
Expand All @@ -50,12 +51,14 @@
import org.apache.hop.metadata.api.IHopMetadataProvider;

public class HopFlightProducer extends NoOpFlightProducer {
public static final int BUFFER_SIZE_OVERSHOOT = 5000;
public static final int BUFFER_SIZE_ERROR_LEVEL = 4000;
public static final int MAX_READ_BLOCK_TIME_MS = 60000;

private final IVariables variables;
private final IHopMetadataProvider metadataProvider;
private final RootAllocator rootAllocator;
private final Map<String, FlightStreamBuffer> streamMap;
private int rowsRead;
private int rowsWritten;

public HopFlightProducer(
IVariables variables, IHopMetadataProvider metadataProvider, RootAllocator rootAllocator) {
Expand Down Expand Up @@ -93,7 +96,7 @@ public Runnable acceptPut(
//
IRowMeta rowMeta = streamBuffer.rowMeta();
IRowSet rowSet = streamBuffer.rowSet();
rowsWritten = 0;
int bufferSize = streamBuffer.bufferSize();
while (flightStream.next()) {
VectorSchemaRoot vectorSchemaRoot = flightStream.getRoot();
List<FieldVector> fieldVectors = vectorSchemaRoot.getFieldVectors();
Expand All @@ -108,7 +111,16 @@ public Runnable acceptPut(
Object[] rowData =
ArrowBaseDataStream.convertFieldVectorsToHopRow(fieldVectors, rowMeta, rowIndex);
rowSet.putRow(rowMeta, rowData);
rowsWritten++;

// If too many rows are kept in memory, throw an error!
// The client needs to read faster. This is IPC, not queueing!
//
if (rowSet.size() > 0 && rowSet.size() > bufferSize + BUFFER_SIZE_ERROR_LEVEL) {
throw new HopException(
"The maximum amount of rows kept in memory is exceeded by reaching "
+ rowSet.size()
+ " rows.");
}
}
}

Expand Down Expand Up @@ -145,18 +157,22 @@ private FlightStreamBuffer lookupArrowStreamBuffer(String streamName) throws Hop
IRowMeta rowMeta = flightDataStream.buildExpectedRowMeta();
Schema expectedSchema = flightDataStream.buildExpectedSchema();

// int bufferSize = Const.toInt(variables.resolve(flightDataStream.getBufferSize()), 10000);
int bufferSize =
Const.toInt(
variables.resolve(flightDataStream.getBufferSize()),
ArrowFlightDataStream.DEFAULT_MAX_BUFFER_SIZE);
int batchSize = Const.toInt(variables.resolve(flightDataStream.getBatchSize()), 500);

// Figure out why blocking rows isn't a good idea.
// Are we receiving rows in parallel if we block?
// We use a very large queue because we don't ever want to block while writing.
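// We over-size it by 5000 rows (BUFFER_SIZE_OVERSHOOT). An error is thrown while writing as
// soon as it holds more than bufferSize + 4000 rows (BUFFER_SIZE_ERROR_LEVEL), which is
// before the queue is completely full.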
//
IRowSet rowSet = new QueueRowSet();
IRowSet rowSet = new BlockingRowSet(bufferSize + BUFFER_SIZE_OVERSHOOT);

String hostname = Const.NVL(variables.resolve(flightDataStream.getHostname()), "0.0.0.0");
int port = Const.toInt(variables.resolve(flightDataStream.getPort()), 33333);
Location location = Location.forGrpcInsecure(hostname, port);
buffer = new FlightStreamBuffer(expectedSchema, rowMeta, rowSet, batchSize, location);
buffer =
new FlightStreamBuffer(expectedSchema, rowMeta, rowSet, bufferSize, batchSize, location);
streamMap.put(streamName, buffer);
}
return buffer;
Expand Down Expand Up @@ -184,11 +200,9 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l
//
listener.start(vectorSchemaRoot);

rowsRead = 0;
// Example: Pull rows from a readable RowSet and send them back as Arrow batches
Object[] hopRow = rowSet.getRow();
Object[] hopRow = waitForRow(rowSet);
while (hopRow != null) {
rowsRead++;

// Add the row to a batch row buffer:
//
Expand All @@ -200,7 +214,8 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l
// Send the batch
listener.putNext();
}
hopRow = rowSet.getRow();
// Get another row
hopRow = waitForRow(rowSet);
}
// Do we have any rows in the buffer left?
//
Expand All @@ -210,12 +225,27 @@ public void getStream(CallContext context, Ticket ticket, ServerStreamListener l
}

listener.completed();

// Clean up the stream map as well to avoid leaking memory
//
streamMap.remove(streamName);
}
} catch (Exception e) {
listener.error(CallStatus.INTERNAL.withCause(e).toRuntimeException());
}
}

/**
 * Waits for the next row to become available in the given row set.
 *
 * <p>The row set is polled in slices of 20 milliseconds. Polling stops as soon as a row is
 * returned, the row set reports that it is done, or {@link #MAX_READ_BLOCK_TIME_MS} milliseconds
 * have passed since the first poll returned.
 *
 * @param rowSet the row set to read the next row from
 * @return the next row, or {@code null} when no row arrived before the row set was done or the
 *     maximum wait time expired
 * @throws HopException part of the signature for callers; this method has no explicit throw
 */
private Object[] waitForRow(IRowSet rowSet) throws HopException {
  Object[] nextRow = rowSet.getRowWait(20, TimeUnit.MILLISECONDS);

  // The clock only starts after the first poll, so the deadline is measured from that moment.
  final long deadline = System.currentTimeMillis() + MAX_READ_BLOCK_TIME_MS;

  while (nextRow == null) {
    // Give up when the writer signalled completion or when we have waited long enough.
    if (rowSet.isDone() || System.currentTimeMillis() >= deadline) {
      break;
    }
    nextRow = rowSet.getRowWait(20, TimeUnit.MILLISECONDS);
  }
  return nextRow;
}

private void fillBatch(
List<Object[]> rowBuffer, IRowMeta rowMeta, VectorSchemaRoot vectorSchemaRoot)
throws HopException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
# limitations under the License.
#
#
ArrowFlightDataStream.BufferSize.Label=Stream buffer size
ArrowFlightDataStream.BufferSize.Tooltip=The number of rows to buffer, kept on the server, before blocking.
ArrowFlightDataStream.BufferSize.Label=Stream maximum buffer size
ArrowFlightDataStream.BufferSize.Tooltip=The maximum number of rows to buffer, kept on the server, before throwing an error to prevent the server running out of memory.
ArrowFlightDataStream.BatchSize.Label=Batch size
ArrowFlightDataStream.BatchSize.Tooltip=The Apache Arrow batch size for the stream reading from the server.
ArrowFlightDataStream.SchemaDefinition.Label=The schema definition
Expand Down
Loading