Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions integration-tests/transforms/datasets/golden-yaml-input.csv
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
organisation,acronym,project,javaFileCount,awesome,used-apache-projects,statistics
The Apache Software Foundation,The ASF,Apache Hop,4447,Y,"[Spark, Flink, Beam, Commons, VFS]
","{number-of-libs: 1159,number-of-messages-bundles: 13025,integration-test-projects: {count: 39,pipelines: 1032,workflows: 498}}"
The Apache Software Foundation,The ASF,Apache Hop,4447,Y,"[Spark, Flink, Beam, Commons, VFS]","{number-of-libs: 1159,number-of-messages-bundles: 13025,integration-test-projects: {count: 39,pipelines: 1032,workflows: 498}}"
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ public YamlInput(
}

private void handleMissingFiles() throws HopException {
List<FileObject> nonExistantFiles = data.files.getNonExistentFiles();
if (!nonExistantFiles.isEmpty()) {
String message = FileInputList.getRequiredFilesDescription(nonExistantFiles);
List<FileObject> nonExistentFiles = data.files.getNonExistentFiles();
if (!nonExistentFiles.isEmpty()) {
String message = FileInputList.getRequiredFilesDescription(nonExistentFiles);
logError(
BaseMessages.getString(PKG, "YamlInput.Log.RequiredFilesTitle"),
BaseMessages.getString(PKG, "YamlInput.Log.RequiredFiles", message));
Expand All @@ -78,9 +78,10 @@ private void handleMissingFiles() throws HopException {

private boolean readNextString() {
try {
data.readrow = getRow(); // Grab another row ...
// Grab another row ...
data.readRow = getRow();

if (data.readrow == null) {
if (data.readRow == null) {
// finished processing!
if (isDetailed()) {
logDetailed(BaseMessages.getString(PKG, "YamlInput.Log.FinishedProcessing"));
Expand Down Expand Up @@ -116,30 +117,28 @@ private boolean readNextString() {
}

// get field value
String fieldvalue = getInputRowMeta().getString(data.readrow, data.indexOfYamlField);

String fieldValue = getInputRowMeta().getString(data.readRow, data.indexOfYamlField);
getLinesInput();

if (isDetailed()) {
logDetailed(
BaseMessages.getString(
PKG, "YamlInput.Log.YAMLStream", meta.getYamlField(), fieldvalue));
PKG, "YamlInput.Log.YAMLStream", meta.getYamlField(), fieldValue));
}

if (meta.isSourceFile()) {

// source is a file.

data.yaml = new YamlReader();
data.yaml.loadFile(HopVfs.getFileObject(fieldvalue, variables));
data.yaml.loadFile(HopVfs.getFileObject(fieldValue, variables));
dataVolumeIn =
(dataVolumeIn != null ? dataVolumeIn : 0L) + data.yaml.getBytesReadFromFile();

addFileToResultFilesname(data.yaml.getFile());

addFileToResultFilesName(data.yaml.getFile());
} else {
data.yaml = new YamlReader();
data.yaml.loadString(fieldvalue);
data.yaml.loadString(fieldValue);
}
} catch (Exception e) {
logError(BaseMessages.getString(PKG, "YamlInput.Log.UnexpectedError", e.toString()));
Expand All @@ -151,7 +150,7 @@ private boolean readNextString() {
return false;
}

private void addFileToResultFilesname(FileObject file) {
private void addFileToResultFilesName(FileObject file) {
if (meta.isAddingResultFile()) {
// Add this to the result file names...
ResultFile resultFile =
Expand All @@ -164,18 +163,18 @@ private void addFileToResultFilesname(FileObject file) {

private boolean openNextFile() {
try {
if (data.filenr >= data.files.nrOfFiles()) {
if (data.fileIndex >= data.files.nrOfFiles()) {
// finished processing!
if (isDetailed()) {
logDetailed(BaseMessages.getString(PKG, "YamlInput.Log.FinishedProcessing"));
}
return false;
}
// Get file to process from list
data.file = data.files.getFile(data.filenr);
data.file = data.files.getFile(data.fileIndex);

// Move file pointer ahead!
data.filenr++;
data.fileIndex++;

if (meta.isIgnoringEmptyFile() && data.file.getContent().getSize() == 0) {
if (isBasic()) {
Expand All @@ -198,7 +197,7 @@ private boolean openNextFile() {
dataVolumeIn =
(dataVolumeIn != null ? dataVolumeIn : 0L) + data.yaml.getBytesReadFromFile();

addFileToResultFilesname(data.file);
addFileToResultFilesName(data.file);

if (isDetailed()) {
logDetailed(
Expand All @@ -210,7 +209,7 @@ private boolean openNextFile() {
BaseMessages.getString(
PKG,
"YamlInput.Log.UnableToOpenFile",
"" + data.filenr,
"" + data.fileIndex,
data.file.toString(),
e.toString()));
stopAll();
Expand Down Expand Up @@ -243,24 +242,25 @@ public boolean processRow() throws HopException {
}
// Grab a row
Object[] r = getOneRow();

if (r == null) {
setOutputDone(); // signal end to receiver(s)
return false; // end of data or error.
if (Utils.isEmpty(r)) {
// signal end to receiver(s)
setOutputDone();
// end of data or error.
return false;
}

if (isRowLevel()) {
logRowlevel(
BaseMessages.getString(PKG, "YamlInput.Log.ReadRow", data.outputRowMeta.getString(r)));
}

incrementLinesOutput();

data.rownr++;
data.rowIndex++;
Object[] rowCopy = data.outputRowMeta.cloneRow(r);
putRow(data.outputRowMeta, rowCopy); // copy row to output rowset(s)
// copy row to output rowset(s)
putRow(data.outputRowMeta, rowCopy);

if (meta.getRowLimit() > 0 && data.rownr > meta.getRowLimit()) {
if (meta.getRowLimit() > 0 && data.rowIndex > meta.getRowLimit()) {
// limit has been reached: stop now.
setOutputDone();
return false;
Expand All @@ -269,71 +269,52 @@ public boolean processRow() throws HopException {
}

private Object[] getOneRow() throws HopException {
Object[] row = null;
boolean rowAvailable = false;
boolean fileOpened = false;
if (!meta.isInFields()) {
while (data.file == null || !fileOpened && !rowAvailable) {
if (data.file != null) {
// We have opened a file
// read one row
row = getRowData();

if (row == null) {
// No row extracted
// let's see for the next file
if (!openNextFile()) {
return null;
}
fileOpened = true;
} else {
// We had extracted one row
rowAvailable = true;
}
} else {
// First time we get there
// we have to open a new file
if (!openNextFile()) {
return null;
}
fileOpened = true;
}
return getOneRowFromFileMode();
}

return getOneRowFromFieldMode();
}

private Object[] getOneRowFromFileMode() throws HopException {
while (true) {
if (data.file == null && !openNextFile()) {
return new Object[0];
}
} else {
while (data.readrow == null || (data.readrow != null && !fileOpened && !rowAvailable)) {
if (data.readrow != null) {
// We have red the incoming Yaml value
// let's get one row
row = getRowData();
if (row == null) {
// No row.. reader next row
if (readNextString()) {
return null;
}
fileOpened = true;
} else {
// We have returned one row
rowAvailable = true;
}
} else {
// First time we get there
// We have to parse incoming Yaml value
if (readNextString()) {
return null;
}
fileOpened = true;
}
if (data.readrow == null) {
return null;
}

if (data.file == null) {
continue;
}

Object[] row = getRowData();
if (row != null && row.length > 0) {
return row;
}
}

if (!rowAvailable) {
row = getRowData();
if (!openNextFile()) {
return new Object[0];
}
}
}

private Object[] getOneRowFromFieldMode() throws HopException {
while (true) {
if (data.readRow == null) {
if (readNextString()) {
return new Object[0];
}
continue;
}

Object[] row = getRowData();
if (row != null && row.length > 0) {
return row;
}

return row;
if (readNextString()) {
return new Object[0];
}
}
}

private Object[] getRowData() throws HopException {
Expand All @@ -343,13 +324,13 @@ private Object[] getRowData() throws HopException {
try {
// Create new row...
outputRowData = data.yaml.getRow(data.rowMeta);
if (outputRowData == null) {
return null;
if (outputRowData == null || outputRowData.length == 0) {
return new Object[0];
}

if (data.readrow != null) {
if (data.readRow != null) {
outputRowData =
RowDataUtil.addRowData(data.readrow, data.totalPreviousFields, outputRowData);
RowDataUtil.addRowData(data.readRow, data.totalPreviousFields, outputRowData);
} else {
outputRowData = RowDataUtil.resizeArray(outputRowData, data.totalOutStreamFields);
}
Expand All @@ -362,9 +343,8 @@ private Object[] getRowData() throws HopException {
}
// See if we need to add the row number to the row...
if (meta.isIncludeRowNumber() && !Utils.isEmpty(meta.getRowNumberField())) {
outputRowData[rowIndex++] = data.rownr;
outputRowData[rowIndex] = data.rowIndex;
}

} catch (Exception e) {
boolean sendToErrorRow = false;
String errorMessage = null;
Expand All @@ -390,9 +370,8 @@ private Object[] getRowData() throws HopException {

@Override
public boolean init() {

if (super.init()) {
data.rownr = 1L;
data.rowIndex = 1L;
data.nrInputFields = meta.getInputFields().size();

data.rowMeta = new RowMeta();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@
import org.apache.hop.core.fileinput.FileInputList;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.pipeline.transform.BaseTransformData;
import org.apache.hop.pipeline.transform.ITransformData;

@SuppressWarnings("java:S1104")
public class YamlInputData extends BaseTransformData implements ITransformData {
public class YamlInputData extends BaseTransformData {
public IRowMeta outputRowMeta;

public int nrInputFields;
public Object[] readrow;
public Object[] readRow;
public int totalPreviousFields;
public int totalOutFields;
public int totalOutStreamFields;
Expand All @@ -37,9 +36,9 @@ public class YamlInputData extends BaseTransformData implements ITransformData {
public FileInputList files;

public FileObject file;
public int filenr;
public int fileIndex;

public long rownr;
public long rowIndex;
public int indexOfYamlField;

public YamlReader yaml;
Expand All @@ -49,10 +48,10 @@ public class YamlInputData extends BaseTransformData implements ITransformData {
public YamlInputData() {
super();

this.filenr = 0;
this.fileIndex = 0;
this.indexOfYamlField = -1;
this.nrInputFields = -1;
this.readrow = null;
this.readRow = null;
this.totalPreviousFields = 0;
this.file = null;
this.totalOutFields = 0;
Expand Down
Loading
Loading