
NIFI-6009 ScanKudu Processor #3611

Conversation

@SandishKumarHN
Contributor

commented Jul 29, 2019

This PR depends on the NIFI-6009 Kudu Put Operations PR, which must be merged first.

This PR includes Kudu Scan Processor support

Thank you for submitting a contribution to Apache NiFi.

Please provide a short description of the PR here:

Description of PR

Enables X functionality; fixes bug NIFI-YYYY.

In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:

For all changes:

  • Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?

  • Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.

  • Has your PR been rebased against the latest commit within the target branch (typically master)?

  • Is your initial contribution a single, squashed commit? Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not squash or use --force when pushing to allow for clean monitoring of changes.

For code changes:

  • Have you ensured that the full suite of tests is executed via mvn -Pcontrib-check clean install at the root nifi folder?
  • Have you written or updated unit tests to verify your changes?
  • If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • If applicable, have you updated the LICENSE file, including the main LICENSE file under nifi-assembly?
  • If applicable, have you updated the NOTICE file, including the main NOTICE file found under nifi-assembly?
  • If adding new Properties, have you added .displayName in addition to .name (programmatic access) for each of the new properties?

For documentation related changes:

  • Have you ensured that format looks appropriate for the output in which it is rendered?

Note:

Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.

@SandishKumarHN
Contributor Author

@granthanke made the changes based on your suggestions on the previous one

@pvillard31
Contributor

@SandishKumarHN, now that #3610 is merged, you can rebase this one against master

@pvillard31 left a comment

Can you have a look at the unit tests / Travis build? I believe you removed Kudu masters property in the test runner which is required.

@SandishKumarHN
Contributor Author

Can you have a look at the unit tests / Travis build? I believe you removed Kudu masters property in the test runner which is required.

@pvillard31 Thanks for pointing that out.

static final Pattern COLUMNS_PATTERN = Pattern.compile("\\w+((\\w)+)?(?:,\\w+((\\w)+)?)*");

protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
.name("Table Name")
Contributor

Can we use both .name() and .displayName(), with a more "programmatic"-friendly name for .name() (like table-name)? It'd help when using tools like the CLI to automate some CI/CD tasks. (The same applies to all properties.)
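
For illustration, a property declared with both identifiers might look like this (a sketch; the key "kudu-table-name" is a hypothetical example, not the name used in this PR):

    protected static final PropertyDescriptor TABLE_NAME = new PropertyDescriptor.Builder()
            .name("kudu-table-name")        // hypothetical machine-friendly key for CLI/automation
            .displayName("Table Name")      // human-friendly label shown in the NiFi UI
            .description("The name of the Kudu table to scan.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();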

"Depending on your memory size, and data size per row set an appropriate batch size. " +
"Gradually increase this number to find out the best one for best performances.")
.defaultValue("500")
.required(false)
Contributor

I'd change this to true

Suggested change
.required(false)
.required(true)


protected static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("A FlowFile is routed to this relationship if it cannot be sent to Kudu")
Contributor

I believe the description is not accurate. Data is not sent to Kudu, right?

final String tableName = context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
if (StringUtils.isBlank(tableName)) {
getLogger().error("Table Name is blank or null for {}, transferring to failure", new Object[] {flowFile});
session.transfer(session.penalize(flowFile), REL_FAILURE);
Contributor

Not sure it makes sense to penalize the flow file in this case. If the table name is blank or null, it won't change over time, and it wouldn't make sense to have a self-loop failure relationship on the processor, no?
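
For illustration, the non-penalizing variant of the quoted lines would simply be (a sketch):

    getLogger().error("Table Name is blank or null for {}, transferring to failure", new Object[] {flowFile});
    session.transfer(flowFile, REL_FAILURE);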

return;
}

batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
Contributor

Why do you evaluate this property after L201? Right now, it seems like whatever value is set by the user, the first execution of the processor would use 500 (the default) as the batch size, no?

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class TestScanKudu {
Contributor

Could you add some tests to confirm the batch size behavior?

@SandishKumarHN
Contributor Author

@pvillard31 made changes based on your suggestions

@SandishKumarHN
Contributor Author

@pvillard31 did you have a chance to go through the latest changes?

return;
}

final PrivilegedExceptionAction<Void> privelegedAction = () -> {
Contributor

Typo: privelegedAction probably should be privilegedAction


@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final KerberosUser user = getKerberosUser();
Contributor

This whole logic probably could go into AbstractKuduProcessor

return;
}

final List<String> projctedColumnNames = Arrays.asList(context.getProperty(PROJECTED_COLUMNS).evaluateAttributeExpressions(flowFile).getValue().split(","));
Contributor

Typo: projctedColumnNames probably should be projectedColumnNames

}

public void trigger(ProcessContext context, ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
Contributor

Are we sure this processor should require input flowfiles?

try {
this.kuduTable = getKuduClient().openTable(tableName);
} catch (Exception e) {
e.printStackTrace();
Contributor

Shouldn't we handle this case differently?

String predicate = context.getProperty(PREDICATES).evaluateAttributeExpressions(flowFile).getValue();
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();

final AtomicReference<Long> rowsPulledHolder = new AtomicReference<>(0L);
Contributor

Could be an AtomicLong.

scannerBuilder.addPredicate(predicate);
}

protected void scan(ProcessContext context, ProcessSession session, KuduTable kuduTable, String predicates, List<String> projectedColumnNames, ResultHandler handler) throws Exception {
Contributor

Why is this in the abstract processor?

@@ -260,4 +266,110 @@ protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<Str
return update;
}

private void addPredicate(KuduScanner.KuduScannerBuilder scannerBuilder, KuduTable kuduTable, String column, Object value, String comparisonOp) {
Contributor

Why is this in the abstract processor?

for(String column : arrayPredicates){
if (column.contains("=")) {
final String[] parts = column.split("=");
addPredicate(scannerBuilder, kuduTable, parts[0], parts[1], "EQUAL");
Contributor

Why not use KuduPredicate.ComparisonOp.EQUAL instead?
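
That would make the call look like this (a sketch; the same change is spelled out in full later in this review):

    addPredicate(scannerBuilder, kuduTable, parts[0], parts[1], KuduPredicate.ComparisonOp.EQUAL);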

* ]
* }
*/
protected String convertToJson(RowResult row) {
Contributor

Not sure if JSON should be the format of choice here.
Avro seems to be a better option. If it needs to be displayed or processed as JSON, a ConvertRecord processor could help with that.

Also (even if it builds an Avro) it probably should not be in the AbstractKuduProcessor but in the ScanKuduResultHandler itself.

Contributor Author

I'm thinking that for the initial release JSON would be a better choice, as it gives users better transparency and makes debugging easier. If required, users can use ConvertJSONToAvro.

Contributor

One of the issues is that JSON is a very low-performance format. With this implementation, even if one wants to transform the data into Avro for example, all the serialization/deserialization needs to happen for all JSON documents.

Also the JSON generation is basically duplicated here (in a simple way). Most processors that generate data build Record objects and delegate their serialization to a RecordWriter controller service.
I think following that pattern would be preferable in this case as well.

@SandishKumarHN
Contributor Author

@tpalfy Thank you for the review. I made changes based on your suggestions and left a couple of comments.

}

String predicate = context.getProperty(PREDICATES).evaluateAttributeExpressions(flowFile).getValue();
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();
Contributor

I checked thoroughly and haven't seen a single case where a field is set this way.

This could lead to a potential race condition if the processor is set to run multiple tasks at once. I think this is not necessary; the batchSize could be a local variable.
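
In other words, a local variable would do (a sketch based on the quoted line):

    final int batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();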


@@ -209,30 +196,8 @@ public void onScheduled(final ProcessContext context) throws IOException, LoginE
createKuduClient(context);
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
public void trigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final List<FlowFile> flowFiles = session.get(ffbatch);
Contributor

Shouldn't

        if (flowFiles.isEmpty()) {	
            return;	
        }

stay here in PutKudu?

String predicate = context.getProperty(PREDICATES).evaluateAttributeExpressions(flowFile).getValue();
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions(flowFile).asInteger();

final AtomicReference<AtomicLong> rowsPulledHolder = new AtomicReference<AtomicLong>(new AtomicLong(0));
Contributor

I meant AtomicLong instead of AtomicReference, not within it.

this.rowsPulledHolder = rowsPulledHolder;
this.ffCountHolder = ffCountHolder;
this.tableName = tableName;
this.bulkSize = bulkSize == null ? 0 : bulkSize;
Contributor

I think it's a bit misleading to call it bulkSize here while it's batchSize in the processor.
It makes one wonder whether these 2 are different concepts or not.


static final PropertyDescriptor PREDICATES = new PropertyDescriptor.Builder()
.name("Predicates")
.description("A comma-separated list of Predicates, format: \"<colName>:<value>\" ")
Contributor

This is a bit confusing. It might help users if the description were clearer about the supported predicate operations (<, >, <=, >=, =) and that : should be substituted with one of those operations.
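
For example, the description could spell out the grammar (a sketch; the exact wording is an assumption):

    .description("A comma-separated list of predicates in the form \"<colName><op><value>\", " +
            "where <op> is one of =, <, >, <= or >=, e.g. \"age>=18,state=CA\".")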


protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of FlowFiles to process in a single execution, between 1 - 100000. " +
Contributor

I think the description here is misleading. It's not about the number of flow files to process but how many row results should be processed for one outgoing flowfile.

}

@Test
public void testKuduPredicatesValidation() {
Contributor

Could add some tests to cover multiple (comma-separated) predicates.

}

@Test
public void testInvalidPredicatesPattern() {
Contributor

This has the same implementation as testInvalidKuduTableName, also the predicates look valid.
Probably the table name and predicates property values should be changed.

}

public void addResult(Map<String, String> rows) {
rows.entrySet().forEach(kv -> rowResultList.add(createRowResult(kv.getKey(), kv.getValue())));
Contributor

Could be simplified to rows.forEach((key, value) -> rowResultList.add(createRowResult(key, value)));

@SandishKumarHN
Contributor Author

commented Sep 24, 2019

@tpalfy instead of outputting as Avro, I was thinking of outputting as NiFi Records. Is that okay?

@SandishKumarHN
Contributor Author

@tpalfy made all the suggested changes except the output format, and added Kudu test harness support. Let me know if NiFi Record is better or if we should go with Avro.

@tpalfy left a comment

@SandishKumarHN I added a couple of suggestions and also want to highlight one here:
The ScanKudu currently requires flowfiles to work. I don't think that should be the case. These kinds of processors usually bring in flowfiles themselves and generally run scheduled instead of being driven by incoming flowfiles.

As for the JSON/Record serialization: I wasn't aware, but I think I understand now that the idea of JSON output came from ScanHBase. I think this one was a rework of the older GetHBase, which itself is a fairly old processor created before the introduction of the org.apache.nifi.serialization.RecordWriter interface.

If I understand correctly, the newer approach usually is to have a RecordSetWriterFactory defined through a property, similar to this example:

    public static final PropertyDescriptor RECORD_WRITER_FACTORY = new PropertyDescriptor.Builder()
            .name("qdbtr-record-writer")
            .displayName("Record Writer")
            .description("Specifies the Controller Service to use for writing results to a FlowFile. The Record Writer may use Inherit Schema to emulate the inferred schema behavior, i.e. "
                    + "an explicit schema need not be defined in the writer, and will be supplied by the same logic used to infer the schema from the column types.")
            .identifiesControllerService(RecordSetWriterFactory.class)
            .required(true)
            .build();

and use that to write org.apache.nifi.serialization.record.RecordSet objects (which is basically very similar to what - I presume - you meant by NiFi Records).
However, this requires a schema. The schema can be inferred but that needs to be handled.

If that is not too much of a hassle, I'd suggest going that route. If it is, as an initial implementation, using JSON serialization could be fine after all.
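
For reference, a minimal sketch of that pattern (the signatures of createWriter/getSchema vary across NiFi versions, and the records collection built from Kudu RowResults is hypothetical):

    final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER_FACTORY)
            .asControllerService(RecordSetWriterFactory.class);

    FlowFile flowFile = session.create();
    flowFile = session.write(flowFile, out -> {
        try {
            // Resolve the (possibly inferred) schema, then stream records out.
            final RecordSchema schema = writerFactory.getSchema(Collections.emptyMap(), null);
            try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out)) {
                writer.beginRecordSet();
                for (final Record record : records) { // hypothetical: records built from Kudu RowResults
                    writer.write(record);
                }
                writer.finishRecordSet();
            }
        } catch (SchemaNotFoundException e) {
            throw new ProcessException(e);
        }
    });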


public class MockScanKudu extends AbstractMockKuduProcessor {

private LinkedList<Insert> insertQueue;
Contributor

These 3 fields are not used.
The linesBeforeException is set by a test but doesn't have an actual effect.


@Test
public void testKuduScanWhenKuduScanThrowsExceptionAfterLineN() throws KuduException {
kuduScan.setLinesBeforeException(1);
Contributor

This call to setLinesBeforeException has no real effect.
An exception will still be thrown because the predicate "key2=val1" references key2, which doesn't exist.

@SandishKumarHN
Contributor Author

commented Sep 25, 2019

@tpalfy Yes, I did reference ScanHBase. Let's keep the JSON output format for now. I made changes based on your suggestions.

}
}

projectedColumnNames = Arrays.asList(context.getProperty(PROJECTED_COLUMNS).evaluateAttributeExpressions(fileToProcess).getValue().split(","));
Contributor

projectedColumnNames has become a field, which brings the same concerns that we discussed with batchSize.
The value can differ from flowfile to flowfile and is not thread-safe. Why can't it just stay as a local variable?

Integer batchSize = Integer.valueOf(context.getProperty(BATCH_SIZE).getValue());
final AtomicLong rowsPulledHolder = new AtomicLong(0);
final AtomicLong ffCountHolder = new AtomicLong(0);
ScanKuduResultHandler handler = new ScanKuduResultHandler(session, fileToProcess, rowsPulledHolder, ffCountHolder, tableName, batchSize);
Contributor

I think this could be simplified. The AtomicLongs don't need to be visible here as they are not used outside of the ScanKuduResultHandler.
But later we can see that they are not even needed at all (see below)!

So this part could be simply:

        Integer batchSize = Integer.valueOf(context.getProperty(BATCH_SIZE).getValue());
        ScanKuduResultHandler handler = new ScanKuduResultHandler(session, fileToProcess, tableName, batchSize);

/**
* Result Handler for Scan operation
*/
private class ScanKuduResultHandler implements ResultHandler {
Contributor

After carefully looking into this it became apparent that the AtomicLong rowsPulledHolder and ffCountHolder properties are not handled in a thread-safe manner. They are not - and cannot be - used concurrently.

Still, the implementation is not incorrect if we take this constraint into account. However, some simplification can be made, as further checks reveal that the ffCountHolder property is not even used.
Also, a unit test verifying the correctness of creating a new flowfile when the batch size limit has been reached would definitely be welcome.
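
Such a test could look roughly like this (a sketch reusing helper names from the tests in this PR; the exact assertions are an assumption):

    @Test
    public void testBatchSizeCreatesMultipleFlowFiles() throws KuduException {
        final Map<String, String> rows = new HashMap<>();
        rows.put("key1", "val1");
        rows.put("key2", "val2");
        rows.put("key3", "val3");
        kuduScan.insertTestRecordsToKuduTable(DEFAULT_TABLE_NAME, rows);

        runner.setProperty(ScanKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
        runner.setProperty(ScanKudu.BATCH_SIZE, "1");
        runner.enqueue("trigger flow file");
        runner.run(1, false);

        // With a batch size of 1, each scanned row should land in its own flowfile.
        runner.assertTransferCount(ScanKudu.REL_SUCCESS, 3);
    }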

The whole ScanKuduResultHandler could look like this:

    private class ScanKuduResultHandler implements ResultHandler {

        final private ProcessSession session;
        final private FlowFile origFF;
        final private String tableName;
        final private Integer batchSize;
        private FlowFile flowFile = null;
        final private byte[] JSON_ARRAY_DELIM = ",\n".getBytes();

        private AtomicLong rowsPulledHolder = new AtomicLong(0);

        private boolean handledAny = false;

        ScanKuduResultHandler(final ProcessSession session,
                              final FlowFile origFF,
                              final String tableName, final Integer batchSize){
            this.session = session;
            this.tableName = tableName;
            this.batchSize = batchSize == null ? 0 : batchSize;
            this.origFF = origFF;

        }

        @Override
        public void handle(final RowResult resultCells) {

            long rowsPulled = rowsPulledHolder.get();

            try{
                if (flowFile == null){
                    flowFile = initNewFlowFile(session, origFF, tableName);
                }

                flowFile = session.append(flowFile, (out) -> {
                    if (rowsPulledHolder.get() > 0){
                        out.write(JSON_ARRAY_DELIM);
                    }
                    final String json = convertToJson(resultCells);
                    out.write(json.getBytes());
                });
                handledAny = true;

            } catch (Exception e) {
                throw new RuntimeException(e);
            }

            rowsPulled++;

            // bulkSize controls number of records per flow file.
            if (batchSize > 0 && rowsPulled >= batchSize) {

                finalizeFlowFile(session, flowFile, tableName, rowsPulled, null);
                flowFile = null;
                rowsPulledHolder.set(0);
                // we could potentially have a huge number of rows. If we get to batchSize, go ahead and commit the
                // session so that we can avoid buffering tons of FlowFiles without ever sending any out.
                session.commit();
            } else {
                rowsPulledHolder.set(rowsPulled);
            }
        }

        public boolean isHandledAny(){
            return handledAny;
        }

        @Override
        public FlowFile getFlowFile(){
            return flowFile;
        }

        public long getRecordsCount(){
            return rowsPulledHolder.get();
        }

    }


protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of RowResults to process in a single execution, between 1 - 100000. " +
Contributor

I think this is still a bit misleading as it's not the number of results in a single execution but the number of results in an output flowfile. Multiple flowfiles can be created in a single execution - which is in fact a good thing to indicate, at least here.


@Test
public void testInvalidKuduTableName() {
runner.setProperty(ScanKudu.TABLE_NAME, "${table1}");
Contributor

This is still basically the same as testInvalidPredicatesPattern. Both fail because the table name is invalid (resolved to empty), and actually both have invalid predicates as well (as both reference a non-existing column in a non-existing table).
Both tests are missing the table initialization.

@SandishKumarHN
Contributor Author

@tpalfy Thank you so much for the valuable suggestions.
I added a test for batchSize and made the ScanKuduResultHandler and ProjectedColumns changes based on your suggestions.

@tpalfy left a comment

@SandishKumarHN I think it's in good shape, just added some suggestions to clear up some documentation and tests.

}

@Test
public void testInvalidPredicatesPattern() throws KuduException {
Contributor

Just to clear up any potential confusion, I'd rename this method and add a new one (as this doesn't test the pattern of the predicate but its semantics - namely, in this case it will fail because the referenced column1 doesn't exist).

So the 2 tests could look something like this:

@Test
    public void testInvalidPredicateReferencingNonExistingColumn() throws KuduException {
        final Map<String, String> rows = new HashMap<>();
        rows.put("key", "val1");
        rows.put("key1", "val1");
        
        String patternReferencingNonExistingColumn = "column1=val1";

        kuduScan.insertTestRecordsToKuduTable(DEFAULT_TABLE_NAME, rows);

        runner.setProperty(ScanKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
        runner.setProperty(ScanKudu.PREDICATES, patternReferencingNonExistingColumn);
        runner.setProperty(ScanKudu.PROJECTED_COLUMNS, "column1");
        runner.setValidateExpressionUsage(false);
        runner.enqueue("trigger flow file");
        runner.run(1, false);

        runner.assertTransferCount(ScanKudu.REL_FAILURE, 1);
        runner.assertTransferCount(ScanKudu.REL_SUCCESS, 0);
        runner.assertTransferCount(ScanKudu.REL_ORIGINAL, 0);
    }

    @Test
    public void testInvalidPredicatesPattern() throws KuduException {
        final Map<String, String> rows = new HashMap<>();
        rows.put("key", "val1");
        rows.put("key1", "val1");
        
        String invalidPattern = "key1==val1";

        kuduScan.insertTestRecordsToKuduTable(DEFAULT_TABLE_NAME, rows);

        runner.setProperty(ScanKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
        runner.setProperty(ScanKudu.PREDICATES, invalidPattern);
        runner.setProperty(ScanKudu.PROJECTED_COLUMNS, "column1");
        runner.setValidateExpressionUsage(false);
        runner.enqueue("trigger flow file");
        runner.assertNotValid();

        runner.assertTransferCount(ScanKudu.REL_FAILURE, 0);
        runner.assertTransferCount(ScanKudu.REL_SUCCESS, 0);
        runner.assertTransferCount(ScanKudu.REL_ORIGINAL, 0);
    }


protected static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
.name("Batch Size")
.description("The maximum number of FlowFiles to generate in a single execution, between 1 - 100000. " +
Contributor

I think this description still doesn't properly capture the behaviour. I'd change it to something like this:
"The maximum number of rows to generate per output flowfile, between 1 - 100000."

@SandishKumarHN
Contributor Author

@tpalfy Thanks for the quick review. I made changes as per your suggestions.

@tpalfy left a comment

@SandishKumarHN I did some testing and found some bugs. Added some suggestions about other things as well.

runner.assertTransferCount(ScanKudu.REL_ORIGINAL, 0);
}

@Test(expected = AssertionError.class)
Contributor

I think java.lang.AssertionError should not be expected. It basically means "I expect this test to fail."
I know there are other tests that expect it in the NiFi codebase and that the AssertionError comes from the underlying NiFi testing framework. But I don't think this practice is justified, at least not here - and it generally results in an unreliable test, because the underlying logic can have any kind of error or bug that results in an AssertionError, not just the case we are trying to cover.

If java.lang.AssertionError is thrown, it should mean either the test or the production code has errors.

In this case there is an error in the test code in my opinion:
If the pattern is invalid, the processor itself will be in an invalid state and won't be able to run. If you still try to run it (via runner.run) the framework will force the test to fail.

So the test should not try to run a processor that cannot run, but assert that it registers the validation errors.
Hence in my original suggestion: runner.assertNotValid(); instead of runner.run(1, false);

flowFile = session.putAttribute(flowFile, "scankudu.error", (e==null?e:ioe.get()).toString());
rel = REL_FAILURE;
} else {
session.getProvenanceReporter().receive(flowFile, tableName, "{ids}");
Contributor

The third String parameter should be used to pass a flowfile UUID (coming from another NiFi instance for example).
In this case it probably should be omitted entirely, like this:
session.getProvenanceReporter().receive(flowFile, tableName);

}
}

projectedColumnNames = Arrays.asList(context.getProperty(PROJECTED_COLUMNS).getValue().split(","));
Contributor

PROJECTED_COLUMNS is not required but this will throw a NullPointerException if it is not set.
I'd suggest changing this to something like this:

        projectedColumnNames = Optional.ofNullable(context.getProperty(PROJECTED_COLUMNS))
                .map(PropertyValue::getValue)
                .map(value -> value.split(","))
                .map(Arrays::asList)
                .orElseGet(() -> Collections.emptyList());

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class TestScanKudu {
Contributor

The current tests only work with String columns, so I wrote one that covers (mostly) all the currently supported types.
Sure enough, there are some small bugs in handling some of the other types.

Here's the unit test (I will comment on the bugs in the corresponding class):

    @Test
    public void testScanKuduWithMultipleTypes() throws KuduException {
        List<ColumnSchema> columns = Arrays.asList(
                new ColumnSchema.ColumnSchemaBuilder("key", Type.INT8).key(true).build(),
                new ColumnSchema.ColumnSchemaBuilder(Type.INT16.getName(), Type.INT16).key(true).build(),
                new ColumnSchema.ColumnSchemaBuilder(Type.INT32.getName(), Type.INT32).key(true).build(),
                new ColumnSchema.ColumnSchemaBuilder(Type.INT64.getName(), Type.INT64).key(true).build(),
                new ColumnSchema.ColumnSchemaBuilder(Type.BINARY.getName(), Type.BINARY).key(false).build(),
                new ColumnSchema.ColumnSchemaBuilder(Type.STRING.getName(), Type.STRING).key(false).build(),
                new ColumnSchema.ColumnSchemaBuilder(Type.BOOL.getName(), Type.BOOL).key(false).build(),
                new ColumnSchema.ColumnSchemaBuilder(Type.FLOAT.getName(), Type.FLOAT).key(false).build(),
                new ColumnSchema.ColumnSchemaBuilder(Type.DOUBLE.getName(), Type.DOUBLE).key(false).build(),
                new ColumnSchema.ColumnSchemaBuilder(Type.UNIXTIME_MICROS.getName(), Type.UNIXTIME_MICROS).key(false).build(),
                new ColumnSchema.ColumnSchemaBuilder(Type.DECIMAL.getName(), Type.DECIMAL).typeAttributes(
                        new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
                                .precision(20)
                                .scale(4)
                                .build()
                ).key(false).build()
        );

        Instant now = Instant.now();

        KuduTable kuduTable = kuduScan.getKuduTable(DEFAULT_TABLE_NAME, columns);
        Insert insert = kuduTable.newInsert();
        PartialRow rows = insert.getRow();
        rows.addByte("key", (byte) 1);
        rows.addShort(Type.INT16.getName(), (short)20);
        rows.addInt(Type.INT32.getName(), 300);
        rows.addLong(Type.INT64.getName(), 4000L);
        rows.addBinary(Type.BINARY.getName(), new byte[]{55, 89});
        rows.addString(Type.STRING.getName(), "stringValue");
        rows.addBoolean(Type.BOOL.getName(), true);
        rows.addFloat(Type.FLOAT.getName(), 1.5F);
        rows.addDouble(Type.DOUBLE.getName(), 10.28);
        rows.addTimestamp(Type.UNIXTIME_MICROS.getName(), Timestamp.from(now));
        rows.addDecimal(Type.DECIMAL.getName(), new BigDecimal("3.1415"));

        KuduSession kuduSession = kuduScan.kuduClient.newSession();
        kuduSession.apply(insert);
        kuduSession.close();

        runner.setProperty(ScanKudu.TABLE_NAME, DEFAULT_TABLE_NAME);

        runner.setIncomingConnection(false);
        runner.enqueue();
        runner.run(1, false);

        runner.assertTransferCount(ScanKudu.REL_FAILURE, 0);
        runner.assertTransferCount(ScanKudu.REL_SUCCESS, 1);
        runner.assertTransferCount(ScanKudu.REL_ORIGINAL, 1);

        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ScanKudu.REL_SUCCESS).get(0);
        Object timestamp = ChronoUnit.MICROS.between(Instant.EPOCH, now);
        flowFile.assertContentEquals("[{\"rows\":[{\"key\":\"1\",\"int16\":\"20\",\"int32\":\"300\",\"int64\":\"4000\",\"binary\":\"0x3759\",\"string\":\"stringValue\",\"bool\":\"true\",\"float\":\"1.5\",\"double\":\"10.28\",\"unixtime_micros\":\"" + timestamp + "\",\"decimal\":\"3.1415\"}]}]");
    }

* }
*/
protected String convertToJson(RowResult row) {
Contributor

This method has some issues that need to be fixed:

  1. INT8 should be retrieved via getByte
  2. INT16 should be retrieved via getShort
  3. DOUBLE is not handled
  4. BINARY now returns a toString of a ByteBuffer

Here are my suggested fixes, although how BINARY should be handled may be up for debate:

    protected String convertToJson(RowResult row) {
        final StringBuilder jsonBuilder = new StringBuilder();
        jsonBuilder.append("{\"rows\":[{");
        Iterator<ColumnSchema> columns = row.getSchema().getColumns().iterator();
        while (columns.hasNext()) {
            ColumnSchema col = columns.next();
            jsonBuilder.append("\"" + col.getName() + "\":");
            switch (col.getType()) {
                case STRING:
                    jsonBuilder.append("\"" + row.getString(col.getName()) + "\"");
                    break;
                case INT8:
                    jsonBuilder.append("\"" + row.getByte(col.getName()) + "\"");
                    break;
                case INT16:
                    jsonBuilder.append("\"" + row.getShort(col.getName()) + "\"");
                    break;
                case INT32:
                    jsonBuilder.append("\"" + row.getInt(col.getName()) + "\"");
                    break;
                case INT64:
                    jsonBuilder.append("\"" + row.getLong(col.getName()) + "\"");
                    break;
                case BOOL:
                    jsonBuilder.append("\"" + row.getBoolean(col.getName()) + "\"");
                    break;
                case DECIMAL:
                    jsonBuilder.append("\"" + row.getDecimal(col.getName()) + "\"");
                    break;
                case FLOAT:
                    jsonBuilder.append("\"" + row.getFloat(col.getName()) + "\"");
                    break;
                case DOUBLE:
                    jsonBuilder.append("\"" + row.getDouble(col.getName()) + "\"");
                    break;
                case UNIXTIME_MICROS:
                    jsonBuilder.append("\"" + row.getLong(col.getName()) + "\"");
                    break;
                case BINARY:
                    jsonBuilder.append("\"0x" + Hex.encodeHexString(row.getBinaryCopy(col.getName())) + "\"");
                    break;
                default:
                    break;
            }
            if(columns.hasNext())
                jsonBuilder.append(",");
        }
        jsonBuilder.append("}]}");
        return jsonBuilder.toString();
    }

@SandishKumarHN
Contributor Author

@tpalfy thank you so much for the thorough testing and for adding more tests. Really appreciate it. I made changes as per your code snippets.

@tpalfy left a comment

@SandishKumarHN I found some bugs again, this time in the handling of the predicates.
Added some suggestions.
I think it might be a good idea to run a coverage analysis and see if there are other parts that are not covered.


private void addPredicate(KuduScanner.KuduScannerBuilder scannerBuilder, KuduTable kuduTable, String column, Object value, KuduPredicate.ComparisonOp comparisonOp) {
ColumnSchema columnSchema = kuduTable.getSchema().getColumn(column);
KuduPredicate predicate = KuduPredicate.newComparisonPredicate(columnSchema, comparisonOp, value);
Contributor

KuduPredicate.newComparisonPredicate will throw an exception if the column contains non-string values (because the value parameter is actually a String).

The value needs to be parsed based on the column type.

This method should be changed to something like this:

    private void addPredicate(KuduScanner.KuduScannerBuilder scannerBuilder, KuduTable kuduTable, String column, String value, KuduPredicate.ComparisonOp comparisonOp) {
        ColumnSchema columnSchema = kuduTable.getSchema().getColumn(column);
        Object parsedValue = parseValue(value, columnSchema);
        KuduPredicate predicate = KuduPredicate.newComparisonPredicate(columnSchema, comparisonOp, parsedValue);
        scannerBuilder.addPredicate(predicate);
    }

And the parseValue method should be created and probably added to AbstractKuduProcessor.
It would look something like this, I added some implementations but not all, left TODOs:

    protected Object parseValue(String value, ColumnSchema columnSchema) {
        Object  parsedValue;

        Type type = columnSchema.getType();

        switch (type) {
            case STRING:
                parsedValue = value;
                break;
            case INT8:
                // TODO
                parsedValue = value;
                break;
            case INT16:
                // TODO
                parsedValue = value;
                break;
            case INT32:
                parsedValue = Integer.valueOf(value);
                break;
            case INT64:
                parsedValue = Long.valueOf(value);
                break;
            case BOOL:
                parsedValue = Boolean.valueOf(value);
                break;
            case DECIMAL:
                // TODO
                parsedValue = value;
                break;
            case FLOAT:
                parsedValue = Float.valueOf(value);
                break;
            case DOUBLE:
                // TODO
                parsedValue = value;
                break;
            case UNIXTIME_MICROS:
                // TODO
                parsedValue = value;
                break;
            case BINARY:
                // TODO
                parsedValue = value;
                break;
            default:
                // TODO Need to handle this
                throw new IllegalArgumentException("Couldn't parse '" + value + "' as '" + type + "'");
        }

        return parsedValue;
    }


+ "Could be null (not present) if transfered to FAILURE")})
public class ScanKudu extends AbstractKuduProcessor {

static final Pattern PREDICATES_PATTERN = Pattern.compile("\\w+((<=|>=|[=<>])(\\w|-)+)?(?:,\\w+((<=|>=|[=<>])(\\w|-)+)?)*");
Contributor

This pattern doesn't allow float values. Or timestamps.
There should probably be unit tests to make sure this pattern works as intended.

Contributor Author

@tpalfy Kudu does not support timestamps yet. Timestamps currently are epoch times, which are longs.

Contributor

@SandishKumarHN The Kudu API does support timestamps, it's just that it stores the value as a number.

In any case, this is a user experience issue. However Kudu works, it would be perfectly possible for the user to provide a timestamp in any kind of user-friendly predefined format and we could parse it and turn it into a number if that's how the underlying infrastructure handles it.

Getting it done properly would not be simple though. (Need to take care of formatting, timezones and possibly rounding as Kudu stores microseconds.)
So I guess it's fine to leave it out for now and add it later if needed.

final String[] arrayPredicates = (predicates == null || predicates.isEmpty() ? new String[0] : predicates.split(","));

for(String column : arrayPredicates){
if (column.contains("=")) {
Contributor

The ordering of these if-else statements should be changed.
column.contains("=") will match for example "column1<=10" in which case after splitting along = the column name will be considered to be "column1<" which will lead to an exception.

This way I think it would work:

            if(column.contains(">=")) {
                final String[] parts = column.split(">=");
                addPredicate(scannerBuilder, kuduTable, parts[0], parts[1], KuduPredicate.ComparisonOp.GREATER_EQUAL);
            } else if(column.contains("<=")) {
                final String[] parts = column.split("<=");
                addPredicate(scannerBuilder, kuduTable, parts[0], parts[1], KuduPredicate.ComparisonOp.LESS_EQUAL);
            } else if (column.contains("=")) {
                final String[] parts = column.split("=");
                addPredicate(scannerBuilder, kuduTable, parts[0], parts[1], KuduPredicate.ComparisonOp.EQUAL);
            } else if(column.contains(">")) {
                final String[] parts = column.split(">");
                addPredicate(scannerBuilder, kuduTable, parts[0], parts[1], KuduPredicate.ComparisonOp.GREATER);
            } else if(column.contains("<")) {
                final String[] parts = column.split("<");
                addPredicate(scannerBuilder, kuduTable, parts[0], parts[1], KuduPredicate.ComparisonOp.LESS);
            }

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class TestScanKudu {
Contributor

Considering how the predicate handling had bugs, we should add some tests that cover them - not just whether their syntax is valid, but also whether they work as intended.

I created 3, but please cover the other types and predicates as well:

    @Test
    public void testPredicateEqualsFalse() throws Exception {
        List<Boolean> values = Arrays.asList(false, true, false);

        String predicate = "value=false";

        // TODO add proper value
        String expectedContent = "";

        testPredicate(predicate, expectedContent, Type.BOOL, (row, value) -> row.addBoolean("value", value), values);
    }

    @Test
    public void testPredicateGreaterThanInteger() throws Exception {
        List<Integer> values = Arrays.asList(10, 15, 10);

        String predicate = "value>10";

        // TODO This works now, but should be changed, see below
        String expectedContent = "[{\"rows\":[{\"key\":\"2\",\"value\":\"15\"}]}]";

        testPredicate(predicate, expectedContent, Type.INT32, (row, value) -> row.addInt("value", value), values);
    }

    @Test
    public void testPredicateLowerThanOrEqualsFloat() throws Exception {
        List<Float> values = Arrays.asList(3.231F, 3.232F, 3.233F);

        // TODO this test fails because the .232 part of the float doesn't fit the PREDICATES_PATTERN.
        String predicate = "value<=3.232";

        // TODO The "rows" shouldn't be there at the start of each row.
        //  In fact probably shouldn't be there at all as it just makes it harder to chain into other processors.
        String expectedContent = "[{\"rows\":[{\"key\":\"1\",\"value\":\"3.231\"}]},\n" +
                "{\"rows\":[{\"key\":\"2\",\"value\":\"3.232\"}]}]";

        testPredicate(predicate, expectedContent, Type.FLOAT, (row, value) -> row.addFloat("value", value), values);
    }

    private <V> void testPredicate(String predicate, String expectedContent, Type valueType, BiConsumer<PartialRow, V> valueSetter, List<V> values) throws Exception {
        List<ColumnSchema> columns = Arrays.asList(
                new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
                new ColumnSchema.ColumnSchemaBuilder("value", valueType).key(false).build()
        );

        KuduTable kuduTable = kuduScan.getKuduTable(DEFAULT_TABLE_NAME, columns);
        KuduSession kuduSession = kuduScan.kuduClient.newSession();

        values.forEach(value -> {
            Insert insert = kuduTable.newInsert();
            PartialRow row = insert.getRow();

            valueSetter.accept(row, value);

            try {
                kuduSession.apply(insert);
            } catch (KuduException e) {
                throw new RuntimeException(e);
            }
        });

        runner.setProperty(ScanKudu.TABLE_NAME, DEFAULT_TABLE_NAME);
        runner.setProperty(ScanKudu.PREDICATES, predicate);

        runner.setIncomingConnection(false);
        runner.enqueue();
        runner.run(1, false);

        runner.assertTransferCount(ScanKudu.REL_FAILURE, 0);
        runner.assertTransferCount(ScanKudu.REL_SUCCESS, 1);
        runner.assertTransferCount(ScanKudu.REL_ORIGINAL, 1);

        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ScanKudu.REL_SUCCESS).get(0);
        flowFile.assertContentEquals(expectedContent);
    }

break;
case BINARY:
jsonBuilder.append("\"0x" + Hex.encodeHexString(row.getBinaryCopy(col.getName())) + "\"");
Contributor

I thought the binary format through again and I think for consistency's sake (see below) and for simplicity we should omit the "0x" at the beginning after all.

So it should be just jsonBuilder.append("\"" + Hex.encodeHexString(row.getBinaryCopy(col.getName())) + "\"");

(Need to change 0x3759 to 3759 in TestScanKudu.testScanKuduWithMultipleTypes as well.)

switch (type) {
case STRING:
if (value.isEmpty()) {
Contributor

This isEmpty() check should probably be before the switch statement.
Also the message is a bit confusing; it should mention the column name instead of the type (as the type is not really relevant). Could be String.format("No value provided for %s", columnSchema.getName()) or similar.
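
A minimal sketch of that guard, hoisted above the switch (the message wording is an assumption):

    if (value == null || value.isEmpty()) {
        throw new IllegalArgumentException(String.format("No value provided for column %s", columnSchema.getName()));
    }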

break;
case BINARY:
parsedValue = Byte.valueOf(value);
Contributor

This should return a byte array, not a byte.
Not sure in what format the user should be able to provide a value here, but to be consistent with the JSON output, imo we should expect a hexadecimal string.

So the implementation could be:

                try {
                    parsedValue = Hex.decodeHex(value);
                } catch (DecoderException e) {
                    throw new RuntimeException(e);
                }

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

public class TestScanKudu {
Contributor

The binary predicate handling still had a bug.
Can we please really cover all the cases?

Here are some more:

    @Test
    public void testPredicateLowerOrEqualsByte() throws Exception {
        List<Byte> values = Arrays.asList((byte)11, (byte)9, (byte)10, (byte)8);

        String predicate = "value<=10";
        String expectedContent = "[{\"key\":\"2\",\"value\":\"9\"},\n{\"key\":\"3\",\"value\":\"10\"},\n{\"key\":\"4\",\"value\":\"8\"}]";

        testPredicate(predicate, expectedContent, Type.INT8, (row, value) -> row.addByte("value", value), values);
    }

    @Test
    public void testPredicateLowerThanShort() throws Exception {
        List<Short> values = Arrays.asList((short)11, (short)9, (short)10, (short)-8);

        String predicate = "value<10";
        String expectedContent = "[{\"key\":\"2\",\"value\":\"9\"},\n{\"key\":\"4\",\"value\":\"-8\"}]";

        testPredicate(predicate, expectedContent, Type.INT16, (row, value) -> row.addShort("value", value), values);
    }

    @Test
    public void testPredicateEqualsLong() throws Exception {
        List<Long> values = Arrays.asList(11L, 9L, 10L, 8L);

        String predicate = "value=10";
        String expectedContent = "[{\"key\":\"3\",\"value\":\"10\"}]";

        testPredicate(predicate, expectedContent, Type.INT64, (row, value) -> row.addLong("value", value), values);
    }

    @Test
    public void testPredicatesEqualsBinary() throws Exception {
        List<byte[]> values = Arrays.asList(
                new byte[] {127, 10, 28},
                new byte[] {-128, 10, 28},
                new byte[] {0, 10, 28}
        );

        String predicate = "value=7f0a1c";
        String expectedContent = "[{\"key\":\"1\",\"value\":\"7f0a1c\"}]";

        testPredicate(predicate, expectedContent, Type.BINARY, (row, value) -> row.addBinary("value", value), values);
    }

    @Test
    public void testPredicateGreaterOrEqualsBigDecimal() throws Exception {
        List<BigDecimal> values = Arrays.asList(
                new BigDecimal("15643691156541351512356.1264865416"),
                new BigDecimal("69879841523159494156164.3198491561316"),
                new BigDecimal("99165198489191894987456.61561616")
        );

        String predicate = "value>=69879841523159494156164.3198491561316";
        String expectedContent = "[{\"key\":\"2\",\"value\":\"69879841523159494156164.3198491561316\"},\n" +
                "{\"key\":\"3\",\"value\":\"99165198489191894987456.6156161600000\"}]";

        testPredicate(
                predicate,
                expectedContent,
                new ColumnSchema.ColumnSchemaBuilder("value", Type.DECIMAL).key(false).typeAttributes(
                        new ColumnTypeAttributes.ColumnTypeAttributesBuilder()
                                .precision(38)
                                .scale(13)
                                .build()
                ).build(),
                (row, value) -> row.addDecimal("value", value),
                values
        );
    }
...
    private <V> void testPredicate(String predicate, String expectedContent, Type valueType,
                                   BiConsumer<PartialRow, V> valueSetter, List<V> values) throws Exception {
        testPredicate(
                predicate,
                expectedContent,
                new ColumnSchema.ColumnSchemaBuilder("value", valueType).key(false).build(),
                valueSetter,
                values
        );
    }

    private <V> void testPredicate(String predicate, String expectedContent,
                                   ColumnSchema valueColumnSchema, BiConsumer<PartialRow, V> valueSetter, List<V> values) throws Exception {
        List<ColumnSchema> columns = Arrays.asList(
                new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(),
                valueColumnSchema
        );
...

@SandishKumarHN
Contributor Author

@tpalfy I couldn't think of more tests for Predicates and Kudu Types. Please comment if you see anything we missed.
Also, one of the CI jobs failed with a timeout after the build succeeded.

@tpalfy left a comment

LGTM +1
I think it's in good shape.
The TestPutKudu is fairly slow though (half a minute or so); maybe it should be turned into an integration test. But then again, that's the only unit test currently.

@SandishKumarHN
Contributor Author

@tpalfy @pvillard31 can we please merge this? And this https://github.com/apache/nifi/pull/. We are waiting on these two to deploy at a customer site.

@joewitt
Contributor

commented Mar 25, 2021

closing due to inactivity

@joewitt closed this Mar 25, 2021