diff --git a/assemblies/static/src/main/resources/hop-gui.sh b/assemblies/static/src/main/resources/hop-gui.sh
index 9d6591a9ef0..f384182f33d 100755
--- a/assemblies/static/src/main/resources/hop-gui.sh
+++ b/assemblies/static/src/main/resources/hop-gui.sh
@@ -22,6 +22,9 @@ ORIGINDIR=$(pwd)
BASEDIR=$(dirname $0)
cd $BASEDIR
+# Compliance Engine customization to add MASKING HOME lib dir to classpath
+ADDITIONAL_CLASSPATH=":lib"
+
# set java primary is HOP_JAVA_HOME fallback to JAVA_HOME or default java
if [ -n "$HOP_JAVA_HOME" ]; then
_HOP_JAVA=$HOP_JAVA_HOME/bin/java
@@ -81,7 +84,7 @@ Darwin)
;;
esac
-"$_HOP_JAVA" ${HOP_OPTIONS} -Djava.library.path=$LIBPATH -classpath "${CLASSPATH}" org.apache.hop.ui.hopgui.HopGui $@
+"$_HOP_JAVA" ${HOP_OPTIONS} -Djava.library.path=$LIBPATH -classpath "${CLASSPATH}${ADDITIONAL_CLASSPATH}" org.apache.hop.ui.hopgui.HopGui $@
EXITCODE=$?
cd ${ORIGINDIR}
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
index eeb046f169a..4d751edfc56 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/BaseTransform.java
@@ -193,15 +193,15 @@ public class BaseTransform();
+ outputRowSets = new ArrayList<>();
nextTransforms = null;
terminator = transformMeta.hasTerminator();
@@ -418,8 +418,15 @@ public BaseTransform(
dispatch();
- upperBufferBoundary = (int) (pipeline.getRowSetSize() * 0.99);
- lowerBufferBoundary = (int) (pipeline.getRowSetSize() * 0.01);
+ if (pipeline != null) {
+ upperBufferBoundary = (int) (pipeline.getRowSetSize() * 0.99);
+ lowerBufferBoundary = (int) (pipeline.getRowSetSize() * 0.01);
+ } else {
+ upperBufferBoundary = 100;
+ lowerBufferBoundary = 10;
+ }
+
+ setInternalVariables();
}
@Override
@@ -441,7 +448,7 @@ public boolean init() {
.getPartitionSchema()
.calculatePartitionIds(this);
- if (partitionIdList.size() > 0) {
+ if (!partitionIdList.isEmpty()) {
String partitionId = partitionIdList.get(partitionNr);
setVariable(Const.INTERNAL_VARIABLE_TRANSFORM_PARTITION_ID, partitionId);
} else {
@@ -1023,13 +1030,19 @@ private void handlePutRow(IRowMeta rowMeta, Object[] row) throws HopTransformExc
// Are we running yet? If not, wait a bit until all threads have been
// started.
//
- if (this.checkPipelineRunning == false) {
+ if (!this.checkPipelineRunning) {
+ int counter = 0;
while (!pipeline.isRunning() && !stopped.get()) {
try {
- Thread.sleep(1);
+ Thread.sleep(1000);
+ counter++;
} catch (InterruptedException e) {
// Ignore
}
+ // wait 3s max
+ if (counter >= 3) {
+ break;
+ }
}
this.checkPipelineRunning = true;
}
@@ -1100,7 +1113,7 @@ private void specialPartitioning(IRowMeta rowMeta, Object[] row) throws HopTrans
// This is the case for non-clustered partitioning...
//
List nextTransforms = pipelineMeta.findNextTransforms(transformMeta);
- if (nextTransforms.size() > 0) {
+ if (!nextTransforms.isEmpty()) {
nextTransformPartitioningMeta = nextTransforms.get(0).getTransformPartitioningMeta();
}
@@ -1287,11 +1300,9 @@ public void handlePutRowTo(IRowMeta rowMeta, Object[] row, IRowSet rowSet)
}
}
- // call all row listeners...
- //
- for (IRowListener listener : rowListeners) {
- listener.rowWrittenEvent(rowMeta, row);
- }
+ // Do not call the row listeners for targeted rows.
+ // It can cause rows with varying layouts to arrive at the same listener without a way to keep
+ // them apart.
// Keep adding to terminator_rows buffer...
if (terminator && terminatorRows != null) {
@@ -1479,7 +1490,7 @@ protected void waitUntilPipelineIsStarted() {
// Are we running yet? If not, wait a bit until all threads have been
// started.
//
- if (this.checkPipelineRunning == false) {
+ if (!this.checkPipelineRunning) {
while (!pipeline.isRunning() && !stopped.get()) {
try {
Thread.sleep(1);
@@ -1556,6 +1567,7 @@ private Object[] handleGetRow() throws HopException {
row = inputRowSet.getRowImmediate();
}
if (row != null) {
+ obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
}
} else {
@@ -1567,11 +1579,23 @@ private Object[] handleGetRow() throws HopException {
// The buffer to grow beyond "a few" entries.
// We'll only do that if the previous transform has not ended...
- if (!inputRowSet.isDone() && inputRowSet.size() <= lowerBufferBoundary && !isStopped()) {
- try {
- Thread.sleep(0, 1);
- } catch (InterruptedException e) {
- // Ignore sleep interruption exception
+ if (!isStopped() && row == null) {
+ boolean streamReady = false;
+ // Check each other input stream to see that stream meets the threshold
+ for (int r = 0; r < inputRowSets.size(); r++) {
+ if (inputRowSet.isDone() || inputRowSet.size() > lowerBufferBoundary) {
+ streamReady = true;
+ break;
+ }
+ nextInputStream();
+ inputRowSet = currentInputStream();
+ }
+ if (!streamReady) {
+ try {
+ Thread.sleep(0, 1); // Minimum sleeps vary by OS scheduler, this could be 1ms or more
+ } catch (InterruptedException e) {
+ // Ignore sleep interruption exception
+ }
}
}
@@ -1588,7 +1612,7 @@ private Object[] handleGetRow() throws HopException {
// We can use timeouts to switch from one to another...
//
if (waitingTime == null) {
- waitingTime = DynamicWaitTimes.build(inputRowSets, this::getCurrentInputRowSetNr);
+ waitingTime = DynamicWaitTimes.build(inputRowSets, this::getCurrentInputRowSetNr, 20);
}
while (row == null && !isStopped()) {
// Get a row from the input in row set ...
@@ -1598,6 +1622,7 @@ private Object[] handleGetRow() throws HopException {
row = inputRowSet.getRowWait(waitingTime.get(), TimeUnit.MILLISECONDS);
boolean timeout = false;
if (row != null) {
+ obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
blockPointer++;
waitingTime.reset();
@@ -1632,6 +1657,7 @@ private Object[] handleGetRow() throws HopException {
inputRowSetsLock.writeLock().unlock();
}
} else {
+ obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
}
} else {
@@ -1660,17 +1686,12 @@ private Object[] handleGetRow() throws HopException {
nextInputStream();
inputRowSet = currentInputStream();
row = getRowFrom(inputRowSet);
+ obtainInputRowMeta(row, inputRowSet);
}
} finally {
inputRowSetsLock.readLock().unlock();
}
- // Also set the meta data on the first occurrence.
- // or if prevTransforms.length > 1 inputRowMeta can be changed
- if (inputRowMeta == null || prevTransforms.length > 1) {
- inputRowMeta = inputRowSet.getRowMeta();
- }
-
if (row != null) {
// OK, before we return the row, let's see if we need to check on mixing
// row compositions...
@@ -1690,6 +1711,54 @@ private Object[] handleGetRow() throws HopException {
return row;
}
+ /**
+ * The first non-null row we get we'll lock in the row metadata. For scenarios with multiple
+ * inputs, we move the metadata around (e.g. Merge Rows).
+ *
+ * @param row The input row (not null!)
+ * @param inputRowSet The row set we're reading from right now
+ */
+ private void obtainInputRowMeta(Object[] row, IRowSet inputRowSet) {
+ if (row == null) {
+ return;
+ }
+
+ // Set the row metadata on the first occurrence.
+ // If prevTransforms.length > 1, inputRowMeta can be changed as well.
+ //
+ if (inputRowMeta == null || prevTransforms.length > 1) {
+ inputRowMeta = inputRowSet.getRowMeta();
+ }
+
+ // Extra sanity check
+ //
+ if (inputRowMeta == null) {
+ int nr = 0;
+ for (IRowSet rowSet : inputRowSets) {
+ log.logMinimal(
+ "===> Input row set #"
+ + nr
+ + ", done? "
+ + rowSet.isDone()
+ + ", size="
+ + rowSet.size()
+ + ", metadata? "
+ + (rowSet.getRowMeta() != null));
+ nr++;
+ }
+ log.logMinimal("===> Current input row set nr=" + currentInputRowSetNr);
+
+ throw new RuntimeException(
+ "No row metadata obtained for row "
+ + Arrays.toString(row)
+ + Const.CR
+ + "inputRowSet.getRowMeta()="
+ + inputRowSet.getRowMeta()
+ + ", inputRowSets.size()="
+ + inputRowSets.size());
+ }
+ }
+
/**
* IRowHandler controls how getRow/putRow are handled. The default IRowHandler will simply call
* {@link #handleGetRow()} and {@link #handlePutRow(IRowMeta, Object[])}
@@ -1853,7 +1922,7 @@ public Object[] handleGetRowFrom(IRowSet rowSet) throws HopTransformException {
// Have all threads started?
// Are we running yet? If not, wait a bit until all threads have been
// started.
- if (this.checkPipelineRunning == false) {
+ if (!this.checkPipelineRunning) {
while (!pipeline.isRunning() && !stopped.get()) {
try {
Thread.sleep(1);
@@ -1869,11 +1938,23 @@ public Object[] handleGetRowFrom(IRowSet rowSet) throws HopTransformException {
// The buffer to grow beyond "a few" entries.
// We'll only do that if the previous transform has not ended...
- if (!rowSet.isDone() && rowSet.size() <= lowerBufferBoundary && !isStopped()) {
- try {
- Thread.sleep(0, 1);
- } catch (InterruptedException e) {
- // Ignore sleep interruption exception
+ if (!isStopped()) {
+ boolean streamReady = false;
+ // Check each other input stream to see that stream meets the threshold
+ for (int r = 0; r < inputRowSets.size(); r++) {
+ if (rowSet.isDone() || rowSet.size() > lowerBufferBoundary) {
+ streamReady = true;
+ break;
+ }
+ nextInputStream();
+ rowSet = currentInputStream();
+ }
+ if (!streamReady) {
+ try {
+ Thread.sleep(0, 1); // Minimum sleeps vary by OS scheduler, this could be 1ms or more
+ } catch (InterruptedException e) {
+ // Ignore sleep interruption exception
+ }
}
}
@@ -1907,7 +1988,7 @@ public Object[] handleGetRowFrom(IRowSet rowSet) throws HopTransformException {
rowData = rowSet.getRow();
if (rowData == null) {
if (waitingTime == null) {
- waitingTime = DynamicWaitTimes.build(inputRowSets, this::getCurrentInputRowSetNr);
+ waitingTime = DynamicWaitTimes.build(inputRowSets, this::getCurrentInputRowSetNr, 20);
}
// Must release the read lock before acquisition of the write lock to prevent deadlocks.
//
@@ -2170,9 +2251,6 @@ public void dispatch() {
inputRowSetsLock.writeLock().lock();
outputRowSetsLock.writeLock().lock();
try {
- inputRowSets = new ArrayList<>();
- outputRowSets = new ArrayList<>();
-
errorRowSet = null;
prevTransforms = new TransformMeta[nrInput];
nextTransforms = new TransformMeta[nrOutput];
@@ -2723,15 +2801,15 @@ public synchronized void markStop() {
Calendar cal = Calendar.getInstance();
stopTime = cal.getTime();
+ // We're finally completely done with this transform.
+ //
+ setRunning(false);
+
// Here we are completely done with the pipeline.
// Call all the attached listeners and notify the outside world that the transform has
// finished.
//
fireTransformFinishedListeners();
-
- // We're finally completely done with this transform.
- //
- setRunning(false);
}
}
diff --git a/engine/src/main/java/org/apache/hop/pipeline/transform/DynamicWaitTimes.java b/engine/src/main/java/org/apache/hop/pipeline/transform/DynamicWaitTimes.java
index 5be9df6669e..09c09ee6a37 100644
--- a/engine/src/main/java/org/apache/hop/pipeline/transform/DynamicWaitTimes.java
+++ b/engine/src/main/java/org/apache/hop/pipeline/transform/DynamicWaitTimes.java
@@ -26,18 +26,22 @@
final class DynamicWaitTimes {
- static final long MAX_TIMEOUT = 1000;
-
- static SingleStreamStatus build(List rowSets, Supplier supplier) {
+ static SingleStreamStatus build(
+ List rowSets, Supplier supplier, Integer waitTime) {
if (rowSets.size() == 1) {
- return new SingleStreamStatus();
+ return new SingleStreamStatus(waitTime);
}
- return new MultiStreamStatus(new ArrayList<>(rowSets), supplier);
+ return new MultiStreamStatus(new ArrayList<>(rowSets), supplier, waitTime);
}
static class SingleStreamStatus {
protected boolean active = true;
private long interval = 1;
+ private final long waitTime;
+
+ SingleStreamStatus(Integer waitTime) {
+ this.waitTime = waitTime;
+ }
public long get() {
return interval;
@@ -50,12 +54,12 @@ public void reset() {
public void adjust(boolean timeout, IRowSet nextIfExist) {
if (allowAdjust() && timeout) {
- if (interval == MAX_TIMEOUT) {
+ if (interval == waitTime) {
active = false;
}
interval = interval * 2;
- if (interval > MAX_TIMEOUT) {
- interval = MAX_TIMEOUT;
+ if (interval > waitTime) {
+ interval = waitTime;
}
}
}
@@ -75,12 +79,13 @@ private static class MultiStreamStatus extends SingleStreamStatus {
private final List statusList;
private final Supplier supplier;
- MultiStreamStatus(List rowSets, Supplier supplier) {
+ MultiStreamStatus(List rowSets, Supplier supplier, Integer waitTime) {
+ super(waitTime);
this.streamList = rowSets;
this.supplier = supplier;
this.statusList = new ArrayList<>(rowSets.size());
for (int i = 0; i < rowSets.size(); i++) {
- statusList.add(new SingleStreamStatus());
+ statusList.add(new SingleStreamStatus(waitTime));
}
}
@@ -147,6 +152,7 @@ private boolean activeIfNeed() {
return statusList.stream().noneMatch(SingleStreamStatus::allowAdjust);
}
+ @Override
protected void doReset(int index) {
statusList.get(index).reset();
}