Skip to content

Commit

Permalink
KuduScannaer for loop instead of while with iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
SandishKumarHN committed Aug 1, 2019
1 parent a967964 commit d8a9d1a
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 67 deletions.
Expand Up @@ -297,8 +297,8 @@ protected void scan(ProcessContext context, ProcessSession session, KuduTable ku
}

KuduScanner scanner = scannerBuilder.build();
while (scanner.hasMoreRows()) {
handler.handle(scanner.nextRows());
for (RowResult rowResult: scanner) {
handler.handle(rowResult);
}
}

Expand All @@ -322,67 +322,51 @@ protected void scan(ProcessContext context, ProcessSession session, KuduTable ku
* ]
* }
*/
protected String convertToJson(Iterator<RowResult> rows) {
protected String convertToJson(RowResult row) {
final StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");

jsonBuilder.append("\"rows\":[");
while (rows.hasNext()) {
RowResult result = rows.next();
jsonBuilder.append("{");

Iterator<ColumnSchema> columns = result.getSchema().getColumns().iterator();
while (columns.hasNext()) {
ColumnSchema col = columns.next();
jsonBuilder.append("\"" + col.getName() + "\":");
switch (col.getType()) {
case STRING:
jsonBuilder.append("\"" + result.getString(col.getName()) + "\"");
break;
case INT8:
jsonBuilder.append("\"" + result.getInt(col.getName()) + "\"");
break;
case INT16:
jsonBuilder.append("\"" + result.getInt(col.getName()) + "\"");
break;
case INT32:
jsonBuilder.append("\"" + result.getInt(col.getName()) + "\"");
break;
case INT64:
jsonBuilder.append("\"" + result.getLong(col.getName()) + "\"");
break;
case BOOL:
jsonBuilder.append("\"" + result.getBoolean(col.getName()) + "\"");
break;
case DECIMAL:
jsonBuilder.append("\"" + result.getDecimal(col.getName()) + "\"");
break;
case FLOAT:
jsonBuilder.append("\"" + result.getFloat(col.getName()) + "\"");
break;
case UNIXTIME_MICROS:
jsonBuilder.append("\"" + result.getLong(col.getName()) + "\"");
break;
case BINARY:
jsonBuilder.append("\"" + result.getBinary(col.getName()) + "\"");
break;
default:
break;
}
if(columns.hasNext())
jsonBuilder.append(",");
}

jsonBuilder.append("}");
if (rows.hasNext()) {
jsonBuilder.append(", ");
jsonBuilder.append("{\"rows\":[{");
Iterator<ColumnSchema> columns = row.getSchema().getColumns().iterator();
while (columns.hasNext()) {
ColumnSchema col = columns.next();
jsonBuilder.append("\"" + col.getName() + "\":");
switch (col.getType()) {
case STRING:
jsonBuilder.append("\"" + row.getString(col.getName()) + "\"");
break;
case INT8:
jsonBuilder.append("\"" + row.getInt(col.getName()) + "\"");
break;
case INT16:
jsonBuilder.append("\"" + row.getInt(col.getName()) + "\"");
break;
case INT32:
jsonBuilder.append("\"" + row.getInt(col.getName()) + "\"");
break;
case INT64:
jsonBuilder.append("\"" + row.getLong(col.getName()) + "\"");
break;
case BOOL:
jsonBuilder.append("\"" + row.getBoolean(col.getName()) + "\"");
break;
case DECIMAL:
jsonBuilder.append("\"" + row.getDecimal(col.getName()) + "\"");
break;
case FLOAT:
jsonBuilder.append("\"" + row.getFloat(col.getName()) + "\"");
break;
case UNIXTIME_MICROS:
jsonBuilder.append("\"" + row.getLong(col.getName()) + "\"");
break;
case BINARY:
jsonBuilder.append("\"" + row.getBinary(col.getName()) + "\"");
break;
default:
break;
}
if(columns.hasNext())
jsonBuilder.append(",");
}
// end row array
jsonBuilder.append("]");

// end overall document
jsonBuilder.append("}");
jsonBuilder.append("}]}");
return jsonBuilder.toString();
}
}
Expand Up @@ -46,7 +46,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -262,7 +261,7 @@ private class ScanKuduResultHandler implements ResultHandler {
}

@Override
public void handle(final Iterator<RowResult> resultCells) {
public void handle(final RowResult resultCells) {

long rowsPulled = rowsPulledHolder.get();
long ffUncommittedCount = ffCountHolder.get();
Expand Down
Expand Up @@ -20,13 +20,11 @@
import org.apache.kudu.client.RowResult;
import org.apache.nifi.flowfile.FlowFile;

import java.util.Iterator;

/**
* Handles a single row from an Kudu scan.
*/
public interface ResultHandler {

void handle(Iterator<RowResult> resultCells);
void handle(RowResult resultCells);
FlowFile getFlowFile();
}
Expand Up @@ -110,6 +110,7 @@ public void scan(ProcessContext context, ProcessSession session, KuduTable kuduT
if (predicates == null || predicates.isEmpty() || !predicates.contains("=")) {
while (it.hasNext()) {
matchedRows.add(it.next());
handler.handle(it.next());
}
} else {
if (predicates.contains("=")) {
Expand All @@ -122,11 +123,11 @@ public void scan(ProcessContext context, ProcessSession session, KuduTable kuduT
RowResult result = it.next();
if (parts[1].equals(result.getString(String.valueOf(parts[0])))) {
matchedRows.add(result);
handler.handle(result);
}
}
}
}
handler.handle(matchedRows.iterator());
}
numScans++;
}
Expand Down
Expand Up @@ -184,13 +184,14 @@ public void testScanKuduProcessorJsonOutput() throws InitializationException {
public void testKuduScanToContentWithStringValues() throws InitializationException {
final Map<String, String> rows = new HashMap<>();
rows.put("column1", "val1");
rows.put("column1", "val1");
rows.put("column2", "val2");

kuduScan.addResult(rows);

runner.setProperty(ScanKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
runner.setProperty(ScanKudu.PREDICATES, "column1=val1");
runner.setProperty(ScanKudu.PROJECTED_COLUMNS, "column1");
runner.setProperty(ScanKudu.PROJECTED_COLUMNS, "column1,column2");

runner.setValidateExpressionUsage(false);
runner.setIncomingConnection(false);
Expand Down

0 comments on commit d8a9d1a

Please sign in to comment.