Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion assemblies/static/src/main/resources/hop-gui.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ ORIGINDIR=$(pwd)
BASEDIR=$(dirname $0)
cd $BASEDIR

# Compliance Engine customization to add MASKING HOME lib dir to classpath
ADDITIONAL_CLASSPATH=":lib"

# set java primary is HOP_JAVA_HOME fallback to JAVA_HOME or default java
if [ -n "$HOP_JAVA_HOME" ]; then
_HOP_JAVA=$HOP_JAVA_HOME/bin/java
Expand Down Expand Up @@ -81,7 +84,7 @@ Darwin)
;;
esac

"$_HOP_JAVA" ${HOP_OPTIONS} -Djava.library.path=$LIBPATH -classpath "${CLASSPATH}" org.apache.hop.ui.hopgui.HopGui $@
"$_HOP_JAVA" ${HOP_OPTIONS} -Djava.library.path=$LIBPATH -classpath "${CLASSPATH}${ADDITIONAL_CLASSPATH}" org.apache.hop.ui.hopgui.HopGui $@
EXITCODE=$?

cd ${ORIGINDIR}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,15 +193,15 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
/** the rowset for the error rows */
private IRowSet errorRowSet;

private AtomicBoolean running;
private final AtomicBoolean running;

private AtomicBoolean stopped;
private final AtomicBoolean stopped;

protected AtomicBoolean safeStopped;

private AtomicBoolean paused;

private boolean init;
private final boolean init;

/** the copy number of this thread */
private int copyNr;
Expand Down Expand Up @@ -282,13 +282,13 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
* The upper buffer size boundary after which we manage the thread priority a little bit to
* prevent excessive locking
*/
private int upperBufferBoundary;
private final int upperBufferBoundary;

/**
* The lower buffer size boundary after which we manage the thread priority a little bit to
* prevent excessive locking
*/
private int lowerBufferBoundary;
private final int lowerBufferBoundary;

/** maximum number of errors to allow */
private Long maxErrors = -1L;
Expand Down Expand Up @@ -316,7 +316,7 @@ public class BaseTransform<Meta extends ITransformMeta, Data extends ITransformD
*/
private IRowHandler rowHandler;

private AtomicBoolean markStopped;
private final AtomicBoolean markStopped;

/**
* This is the base transform that forms that basis for all transforms. You can derive from this
Expand Down Expand Up @@ -381,8 +381,8 @@ public BaseTransform(
linesOutput = 0L;
}

inputRowSets = null;
outputRowSets = null;
inputRowSets = new ArrayList<>();
outputRowSets = new ArrayList<>();
nextTransforms = null;

terminator = transformMeta.hasTerminator();
Expand Down Expand Up @@ -418,8 +418,15 @@ public BaseTransform(

dispatch();

upperBufferBoundary = (int) (pipeline.getRowSetSize() * 0.99);
lowerBufferBoundary = (int) (pipeline.getRowSetSize() * 0.01);
if (pipeline != null) {
upperBufferBoundary = (int) (pipeline.getRowSetSize() * 0.99);
lowerBufferBoundary = (int) (pipeline.getRowSetSize() * 0.01);
} else {
upperBufferBoundary = 100;
lowerBufferBoundary = 10;
}

setInternalVariables();
}

@Override
Expand All @@ -441,7 +448,7 @@ public boolean init() {
.getPartitionSchema()
.calculatePartitionIds(this);

if (partitionIdList.size() > 0) {
if (!partitionIdList.isEmpty()) {
String partitionId = partitionIdList.get(partitionNr);
setVariable(Const.INTERNAL_VARIABLE_TRANSFORM_PARTITION_ID, partitionId);
} else {
Expand Down Expand Up @@ -1023,13 +1030,19 @@ private void handlePutRow(IRowMeta rowMeta, Object[] row) throws HopTransformExc
// Are we running yet? If not, wait a bit until all threads have been
// started.
//
if (this.checkPipelineRunning == false) {
if (!this.checkPipelineRunning) {
int counter = 0;
while (!pipeline.isRunning() && !stopped.get()) {
try {
Thread.sleep(1);
Thread.sleep(1000);
counter++;
} catch (InterruptedException e) {
// Ignore
}
// wait 3s max
if (counter >= 3) {
break;
}
}
this.checkPipelineRunning = true;
}
Expand Down Expand Up @@ -1100,7 +1113,7 @@ private void specialPartitioning(IRowMeta rowMeta, Object[] row) throws HopTrans
// This is the case for non-clustered partitioning...
//
List<TransformMeta> nextTransforms = pipelineMeta.findNextTransforms(transformMeta);
if (nextTransforms.size() > 0) {
if (!nextTransforms.isEmpty()) {
nextTransformPartitioningMeta = nextTransforms.get(0).getTransformPartitioningMeta();
}

Expand Down Expand Up @@ -1287,11 +1300,9 @@ public void handlePutRowTo(IRowMeta rowMeta, Object[] row, IRowSet rowSet)
}
}

// call all row listeners...
//
for (IRowListener listener : rowListeners) {
listener.rowWrittenEvent(rowMeta, row);
}
// Do not call the row listeners for targeted rows.
// It can cause rows with varying layouts to arrive at the same listener without a way to keep
// them apart.

// Keep adding to terminator_rows buffer...
if (terminator && terminatorRows != null) {
Expand Down Expand Up @@ -1479,7 +1490,7 @@ protected void waitUntilPipelineIsStarted() {
// Are we running yet? If not, wait a bit until all threads have been
// started.
//
if (this.checkPipelineRunning == false) {
if (!this.checkPipelineRunning) {
while (!pipeline.isRunning() && !stopped.get()) {
try {
Thread.sleep(1);
Expand Down Expand Up @@ -1556,6 +1567,7 @@ private Object[] handleGetRow() throws HopException {
row = inputRowSet.getRowImmediate();
}
if (row != null) {
obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
}
} else {
Expand All @@ -1567,11 +1579,23 @@ private Object[] handleGetRow() throws HopException {
// The buffer to grow beyond "a few" entries.
// We'll only do that if the previous transform has not ended...

if (!inputRowSet.isDone() && inputRowSet.size() <= lowerBufferBoundary && !isStopped()) {
try {
Thread.sleep(0, 1);
} catch (InterruptedException e) {
// Ignore sleep interruption exception
if (!isStopped() && row == null) {
boolean streamReady = false;
// Check each other input stream to see that stream meets the threshold
for (int r = 0; r < inputRowSets.size(); r++) {
if (inputRowSet.isDone() || inputRowSet.size() > lowerBufferBoundary) {
streamReady = true;
break;
}
nextInputStream();
inputRowSet = currentInputStream();
}
if (!streamReady) {
try {
Thread.sleep(0, 1); // Minimum sleeps vary by OS scheduler, this could be 1ms or more
} catch (InterruptedException e) {
// Ignore sleep interruption exception
}
}
}

Expand All @@ -1588,7 +1612,7 @@ private Object[] handleGetRow() throws HopException {
// We can use timeouts to switch from one to another...
//
if (waitingTime == null) {
waitingTime = DynamicWaitTimes.build(inputRowSets, this::getCurrentInputRowSetNr);
waitingTime = DynamicWaitTimes.build(inputRowSets, this::getCurrentInputRowSetNr, 20);
}
while (row == null && !isStopped()) {
// Get a row from the input in row set ...
Expand All @@ -1598,6 +1622,7 @@ private Object[] handleGetRow() throws HopException {
row = inputRowSet.getRowWait(waitingTime.get(), TimeUnit.MILLISECONDS);
boolean timeout = false;
if (row != null) {
obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
blockPointer++;
waitingTime.reset();
Expand Down Expand Up @@ -1632,6 +1657,7 @@ private Object[] handleGetRow() throws HopException {
inputRowSetsLock.writeLock().unlock();
}
} else {
obtainInputRowMeta(row, inputRowSet);
incrementLinesRead();
}
} else {
Expand Down Expand Up @@ -1660,17 +1686,12 @@ private Object[] handleGetRow() throws HopException {
nextInputStream();
inputRowSet = currentInputStream();
row = getRowFrom(inputRowSet);
obtainInputRowMeta(row, inputRowSet);
}
} finally {
inputRowSetsLock.readLock().unlock();
}

// Also set the meta data on the first occurrence.
// or if prevTransforms.length > 1 inputRowMeta can be changed
if (inputRowMeta == null || prevTransforms.length > 1) {
inputRowMeta = inputRowSet.getRowMeta();
}

if (row != null) {
// OK, before we return the row, let's see if we need to check on mixing
// row compositions...
Expand All @@ -1690,6 +1711,54 @@ private Object[] handleGetRow() throws HopException {
return row;
}

/**
* The first non-null row we get we'll lock in the row metadata. For scenarios with multiple
* inputs, we move the metadata around (e.g. Merge Rows).
*
* @param row The input row (not null!)
* @param inputRowSet The row set we're reading from right now
*/
private void obtainInputRowMeta(Object[] row, IRowSet inputRowSet) {
if (row == null) {
return;
}

// Set the row metadata on the first occurrence.
// If prevTransforms.length > 1, inputRowMeta can be changed as well.
//
if (inputRowMeta == null || prevTransforms.length > 1) {
inputRowMeta = inputRowSet.getRowMeta();
}

// Extra sanity check
//
if (inputRowMeta == null) {
int nr = 0;
for (IRowSet rowSet : inputRowSets) {
log.logMinimal(
"===> Input row set #"
+ nr
+ ", done? "
+ rowSet.isDone()
+ ", size="
+ rowSet.size()
+ ", metadata? "
+ (rowSet.getRowMeta() != null));
nr++;
}
log.logMinimal("===> Current input row set nr=" + currentInputRowSetNr);

throw new RuntimeException(
"No row metadata obtained for row "
+ Arrays.toString(row)
+ Const.CR
+ "inputRowSet.getRowMeta()="
+ inputRowSet.getRowMeta()
+ ", inputRowSets.size()="
+ inputRowSets.size());
}
}

/**
* IRowHandler controls how getRow/putRow are handled. The default IRowHandler will simply call
* {@link #handleGetRow()} and {@link #handlePutRow(IRowMeta, Object[])}
Expand Down Expand Up @@ -1853,7 +1922,7 @@ public Object[] handleGetRowFrom(IRowSet rowSet) throws HopTransformException {
// Have all threads started?
// Are we running yet? If not, wait a bit until all threads have been
// started.
if (this.checkPipelineRunning == false) {
if (!this.checkPipelineRunning) {
while (!pipeline.isRunning() && !stopped.get()) {
try {
Thread.sleep(1);
Expand All @@ -1869,11 +1938,23 @@ public Object[] handleGetRowFrom(IRowSet rowSet) throws HopTransformException {
// The buffer to grow beyond "a few" entries.
// We'll only do that if the previous transform has not ended...

if (!rowSet.isDone() && rowSet.size() <= lowerBufferBoundary && !isStopped()) {
try {
Thread.sleep(0, 1);
} catch (InterruptedException e) {
// Ignore sleep interruption exception
if (!isStopped()) {
boolean streamReady = false;
// Check each other input stream to see that stream meets the threshold
for (int r = 0; r < inputRowSets.size(); r++) {
if (rowSet.isDone() || rowSet.size() > lowerBufferBoundary) {
streamReady = true;
break;
}
nextInputStream();
rowSet = currentInputStream();
}
if (!streamReady) {
try {
Thread.sleep(0, 1); // Minimum sleeps vary by OS scheduler, this could be 1ms or more
} catch (InterruptedException e) {
// Ignore sleep interruption exception
}
}
}

Expand Down Expand Up @@ -1907,7 +1988,7 @@ public Object[] handleGetRowFrom(IRowSet rowSet) throws HopTransformException {
rowData = rowSet.getRow();
if (rowData == null) {
if (waitingTime == null) {
waitingTime = DynamicWaitTimes.build(inputRowSets, this::getCurrentInputRowSetNr);
waitingTime = DynamicWaitTimes.build(inputRowSets, this::getCurrentInputRowSetNr, 20);
}
// Must release the read lock before acquisition of the write lock to prevent deadlocks.
//
Expand Down Expand Up @@ -2170,9 +2251,6 @@ public void dispatch() {
inputRowSetsLock.writeLock().lock();
outputRowSetsLock.writeLock().lock();
try {
inputRowSets = new ArrayList<>();
outputRowSets = new ArrayList<>();

errorRowSet = null;
prevTransforms = new TransformMeta[nrInput];
nextTransforms = new TransformMeta[nrOutput];
Expand Down Expand Up @@ -2723,15 +2801,15 @@ public synchronized void markStop() {
Calendar cal = Calendar.getInstance();
stopTime = cal.getTime();

// We're finally completely done with this transform.
//
setRunning(false);

// Here we are completely done with the pipeline.
// Call all the attached listeners and notify the outside world that the transform has
// finished.
//
fireTransformFinishedListeners();

// We're finally completely done with this transform.
//
setRunning(false);
}
}

Expand Down
Loading