Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-6009 ScanKudu Processor #3611

Closed
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 81 additions & 1 deletion nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/pom.xml
Expand Up @@ -23,6 +23,10 @@

<artifactId>nifi-kudu-processors</artifactId>
<packaging>jar</packaging>
<properties>
<exclude.tests>None</exclude.tests>
<kudu.version>1.10.0</kudu.version>
</properties>

<dependencies>
<dependency>
Expand All @@ -46,7 +50,13 @@
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-client</artifactId>
<version>1.10.0</version>
<version>${kudu.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-test-utils</artifactId>
<version>${kudu.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
Expand Down Expand Up @@ -87,4 +97,74 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<extensions>
<!-- Used to find the right kudu-binary artifact with the Maven
property ${os.detected.classifier} -->
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.6.2</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>${exclude.tests}</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>kudu-windows</id>
<activation>
<os>
<family>Windows</family>
</os>
</activation>
<properties>
<!-- Kudu tests do not support Windows. -->
<exclude.tests>**/*.java</exclude.tests>
</properties>
</profile>
<profile>
<id>kudu-linux</id>
<activation>
<os>
<family>Unix</family>
</os>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-binary</artifactId>
<version>${kudu.version}</version>
<classifier>${os.detected.classifier}</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>kudu-mac</id>
<activation>
<os>
<family>mac</family>
</os>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.kudu</groupId>
<artifactId>kudu-binary</artifactId>
<version>${kudu.version}</version>
<classifier>${os.detected.classifier}</classifier>
<scope>test</scope>
</dependency>
</dependencies>
</profile>
</profiles>
</project>
Expand Up @@ -17,6 +17,7 @@

package org.apache.nifi.processors.kudu;

import java.security.PrivilegedExceptionAction;
import org.apache.kudu.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
Expand All @@ -29,6 +30,7 @@
import org.apache.kudu.client.OperationResponse;
import org.apache.kudu.client.PartialRow;
import org.apache.kudu.client.RowError;
import org.apache.kudu.client.RowResult;
import org.apache.kudu.client.SessionConfiguration;
import org.apache.kudu.client.Delete;
import org.apache.kudu.client.Insert;
Expand All @@ -41,6 +43,8 @@
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.security.krb.KerberosKeytabUser;
Expand All @@ -50,6 +54,7 @@
import javax.security.auth.login.LoginException;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.List;

Expand Down Expand Up @@ -119,6 +124,25 @@ public void createKuduClient(ProcessContext context) throws LoginException {
this.kuduClient = kerberosAction.execute();
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final KerberosUser user = getKerberosUser();

if (user == null) {
trigger(context, session);
return;
}

final PrivilegedExceptionAction<Void> privelegedAction = () -> {
trigger(context, session);
return null;
};

final KerberosAction<Void> action = new KerberosAction<>(user, privelegedAction, getLogger());
action.execute();
}

public abstract void trigger(ProcessContext context, ProcessSession session);

protected KuduClient buildClient(final String masters, final ProcessContext context) {
final Integer operationTimeout = context.getProperty(KUDU_OPERATION_TIMEOUT_MS).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
Expand Down Expand Up @@ -260,4 +284,72 @@ protected Update updateRecordToKudu(KuduTable kuduTable, Record record, List<Str
return update;
}

}

/**
* Serializes a row from Kudu to a JSON document of the form:
*
* {
* "rows": [
* {
* "columnname-1" : "value1",
* "columnname-2" : "value2",
* "columnname-3" : "value3",
* "columnname-4" : "value4",
* },
* {
* "columnname-1" : "value1",
* "columnname-2" : "value2",
* "columnname-3" : "value3",
* "columnname-4" : "value4",
* }
* ]
* }
*/
protected String convertToJson(RowResult row) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if JSON should be the format of choice here.
Avro seems to be a better option. If it needs to be displayed or processed as a JSON a ConvertRecord processor could help with that.

Also (even if it builds an Avro) it probably should not be in the AbstractKuduProcessor but int the ScanKuduResultHandler itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking for initial release JSON would be a better choice, as it gives users better transparency and easy for debugging. if required user can use ConvertJSONToAvro.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One of the issue is that JSON is a very low-performance format. With this implementation even if one wants to transform the data into Avro for example, all the serialization-desrialization need to happen for all JSON documents.

Also the JSON generation is basically duplicated here (in a simple way). Most processors that are generating data build Record objects and delegate its serialization to a RecordWriter controller service.
I think following that pattern would be most preferable in this case as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method has some issues that needs to be fixed:

  1. INT8 should be retrieved via getByte
  2. INT16 should be retrieved via getShort
  3. DOUBLE is not handled
  4. BINARY now returns a toString of a ByteBuffer

Here are my suggested fixes, although how BINARY should be handled may be up for debate:

    protected String convertToJson(RowResult row) {
        final StringBuilder jsonBuilder = new StringBuilder();
        jsonBuilder.append("{\"rows\":[{");
        Iterator<ColumnSchema> columns = row.getSchema().getColumns().iterator();
        while (columns.hasNext()) {
            ColumnSchema col = columns.next();
            jsonBuilder.append("\"" + col.getName() + "\":");
            switch (col.getType()) {
                case STRING:
                    jsonBuilder.append("\"" + row.getString(col.getName()) + "\"");
                    break;
                case INT8:
                    jsonBuilder.append("\"" + row.getByte(col.getName()) + "\"");
                    break;
                case INT16:
                    jsonBuilder.append("\"" + row.getShort(col.getName()) + "\"");
                    break;
                case INT32:
                    jsonBuilder.append("\"" + row.getInt(col.getName()) + "\"");
                    break;
                case INT64:
                    jsonBuilder.append("\"" + row.getLong(col.getName()) + "\"");
                    break;
                case BOOL:
                    jsonBuilder.append("\"" + row.getBoolean(col.getName()) + "\"");
                    break;
                case DECIMAL:
                    jsonBuilder.append("\"" + row.getDecimal(col.getName()) + "\"");
                    break;
                case FLOAT:
                    jsonBuilder.append("\"" + row.getFloat(col.getName()) + "\"");
                    break;
                case DOUBLE:
                    jsonBuilder.append("\"" + row.getDouble(col.getName()) + "\"");
                    break;
                case UNIXTIME_MICROS:
                    jsonBuilder.append("\"" + row.getLong(col.getName()) + "\"");
                    break;
                case BINARY:
                    jsonBuilder.append("\"0x" + Hex.encodeHexString(row.getBinaryCopy(col.getName())) + "\"");
                    break;
                default:
                    break;
            }
            if(columns.hasNext())
                jsonBuilder.append(",");
        }
        jsonBuilder.append("}]}");
        return jsonBuilder.toString();
    }

final StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{\"rows\":[{");
Iterator<ColumnSchema> columns = row.getSchema().getColumns().iterator();
while (columns.hasNext()) {
ColumnSchema col = columns.next();
jsonBuilder.append("\"" + col.getName() + "\":");
switch (col.getType()) {
case STRING:
jsonBuilder.append("\"" + row.getString(col.getName()) + "\"");
break;
case INT8:
jsonBuilder.append("\"" + row.getInt(col.getName()) + "\"");
break;
case INT16:
jsonBuilder.append("\"" + row.getInt(col.getName()) + "\"");
break;
case INT32:
jsonBuilder.append("\"" + row.getInt(col.getName()) + "\"");
break;
case INT64:
jsonBuilder.append("\"" + row.getLong(col.getName()) + "\"");
break;
case BOOL:
jsonBuilder.append("\"" + row.getBoolean(col.getName()) + "\"");
break;
case DECIMAL:
jsonBuilder.append("\"" + row.getDecimal(col.getName()) + "\"");
break;
case FLOAT:
jsonBuilder.append("\"" + row.getFloat(col.getName()) + "\"");
break;
case UNIXTIME_MICROS:
jsonBuilder.append("\"" + row.getLong(col.getName()) + "\"");
break;
case BINARY:
jsonBuilder.append("\"" + row.getBinary(col.getName()) + "\"");
break;
default:
break;
}
if(columns.hasNext())
jsonBuilder.append(",");
}
jsonBuilder.append("}]}");
return jsonBuilder.toString();
}
}
Expand Up @@ -41,7 +41,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.krb.KerberosAction;
import org.apache.nifi.processors.kudu.io.OperationType;
import org.apache.nifi.security.krb.KerberosUser;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
Expand All @@ -51,7 +51,6 @@
import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -209,30 +208,11 @@ public void onScheduled(final ProcessContext context) throws IOException, LoginE
createKuduClient(context);
}

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
public void trigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final List<FlowFile> flowFiles = session.get(ffbatch);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't

        if (flowFiles.isEmpty()) {	
            return;	
        }

stay here in PutKudu?

if (flowFiles.isEmpty()) {
return;
}
kerberosUser = getKerberosUser();

final KerberosUser user = kerberosUser;
if (user == null) {
trigger(context, session, flowFiles);
return;
}

final PrivilegedExceptionAction<Void> privelegedAction = () -> {
trigger(context, session, flowFiles);
return null;
};

final KerberosAction<Void> action = new KerberosAction<>(user, privelegedAction, getLogger());
action.execute();
}

private void trigger(final ProcessContext context, final ProcessSession session, final List<FlowFile> flowFiles) throws ProcessException {
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);

final KuduClient kuduClient = getKuduClient();
Expand Down