NIFI-10234 Implement PutIoTDB #6265

Closed
xuanronaldo wants to merge 20 commits into apache:main from lizhizhou:implement_put_iotdb

@xuanronaldo commented Aug 2, 2022

PutIoTDB

Properties of PutIoTDB

| property | description | default value | necessary |
|---|---|---|---|
| Host | The host of IoTDB. | null | true |
| Port | The port of IoTDB. | 6667 | true |
| Username | Username to access the IoTDB. | null | true |
| Password | Password to access the IoTDB. | null | true |
| Record Reader | Specifies the type of Record Reader controller service to use for parsing the incoming data and determining the schema. | null | true |
| Schema | The schema that IoTDB needs doesn't support good by NiFi. Therefore, you can define the schema here. Besides, you can set encoding type and compression type by this method. If you don't set this property, the inferred schema will be used. It can be updated by expression language. | null | false |
| Aligned | Whether using aligned interface? It can be updated by expression language. | false | false |
| MaxRowNumber | Specifies the max row number of each tablet. It can be updated by expression language. | 1024 | false |

Inferred Schema of Flowfile

The flowfile must follow a few rules:

  1. The flowfile can be read by the Record Reader.
  2. The schema of the flowfile must contain a field Time, and it must be the first field.
  3. The data type of Time must be STRING or LONG.
  4. Fields other than Time must start with root. (including the trailing dot).
  5. The supported data types are INT, LONG, FLOAT, DOUBLE, BOOLEAN, TEXT.
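
The rules above can be sketched as a small standalone check. This is only an illustration and not the processor's code: the class name InferredSchemaRules, the method validate, the case-sensitive match on Time, and the representation of the schema as an ordered map of field name to type name are all assumptions made for this example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the PR: checks rules 2 to 5 from the list above
// against a schema given as an ordered map of field name -> type name.
public class InferredSchemaRules {

    private static final Set<String> TIME_TYPES =
            new HashSet<>(Arrays.asList("STRING", "LONG"));
    private static final Set<String> VALUE_TYPES =
            new HashSet<>(Arrays.asList("INT", "LONG", "FLOAT", "DOUBLE", "BOOLEAN", "TEXT"));

    // Returns null when the schema is acceptable, otherwise a readable explanation.
    public static String validate(Map<String, String> fields) {
        Iterator<Map.Entry<String, String>> iterator = fields.entrySet().iterator();
        if (!iterator.hasNext()) {
            return "Schema has no fields";
        }
        Map.Entry<String, String> first = iterator.next();
        if (!"Time".equals(first.getKey())) {
            return "First field must be Time but was " + first.getKey();
        }
        if (!TIME_TYPES.contains(first.getValue())) {
            return "Time must be STRING or LONG but was " + first.getValue();
        }
        while (iterator.hasNext()) {
            Map.Entry<String, String> field = iterator.next();
            if (!field.getKey().startsWith("root.")) {
                return "Field " + field.getKey() + " must start with root.";
            }
            if (!VALUE_TYPES.contains(field.getValue())) {
                return "Field " + field.getKey() + " has unsupported type " + field.getValue();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> fields = new LinkedHashMap<>();
        fields.put("Time", "LONG");
        fields.put("root.sg.d1.s1", "INT");
        // A null result means the schema passed every check.
        System.out.println(validate(fields));
    }
}
```

A LinkedHashMap is used so that the insertion order stands in for the column order, which matters for rule 2.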

Convert Schema by property

As mentioned above, defining the schema through the Schema property is more flexible and more powerful than relying on the inferred schema.

The structure of property Schema:

{
	"timeType": "long",
	"field": [{
		"tsName": "root.sg.d1.s1",
		"dataType": "INT32",
		"encoding": "RLE",
		"compressionType": "GZIP"
	}, {
		"tsName": "root.sg.d1.s2",
		"dataType": "INT64",
		"encoding": "RLE",
		"compressionType": "GZIP"
	}]
}

Note

  1. The first column must be Time. The rest must be arranged in the same order as in field of the JSON.
  2. The JSON of the schema must contain timeType and fields.
  3. timeType has only two options: long and string.
  4. The columns tsName and dataType must be set.
  5. The tsName must start with root. (including the trailing dot).
  6. The supported dataType values are INT32, INT64, FLOAT, DOUBLE, BOOLEAN, TEXT.
  7. The supported encoding values are PLAIN, DICTIONARY, RLE, DIFF, TS_2DIFF, BITMAP, GORILLA_V1, REGULAR, GORILLA.
  8. The supported compressionType values are UNCOMPRESSED, SNAPPY, GZIP, LZO, SDT, PAA, PLA, LZ4.
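
As a rough illustration of notes 3 to 8, the sketch below checks a single entry of the JSON array once it has been parsed into a plain key/value map. It is not the processor's implementation: the class SchemaFieldCheck and its methods are hypothetical, JSON parsing is left out, and encoding and compressionType are treated as optional only because note 4 lists tsName and dataType as the required keys.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the PR's code: validates one schema entry that has
// already been parsed into a simple key/value map.
public class SchemaFieldCheck {

    enum DataType { INT32, INT64, FLOAT, DOUBLE, BOOLEAN, TEXT }
    enum Encoding { PLAIN, DICTIONARY, RLE, DIFF, TS_2DIFF, BITMAP, GORILLA_V1, REGULAR, GORILLA }
    enum Compression { UNCOMPRESSED, SNAPPY, GZIP, LZO, SDT, PAA, PLA, LZ4 }

    // True when name is absent and optional, or when it names a constant of enumType.
    static <E extends Enum<E>> boolean isAllowed(Class<E> enumType, String name, boolean optional) {
        if (name == null) {
            return optional;
        }
        try {
            Enum.valueOf(enumType, name);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static boolean isValidField(Map<String, String> field) {
        String tsName = field.get("tsName");
        return tsName != null
                && tsName.startsWith("root.")
                && isAllowed(DataType.class, field.get("dataType"), false)
                && isAllowed(Encoding.class, field.get("encoding"), true)
                && isAllowed(Compression.class, field.get("compressionType"), true);
    }

    public static boolean isValidTimeType(String timeType) {
        return "long".equals(timeType) || "string".equals(timeType);
    }

    public static void main(String[] args) {
        Map<String, String> field = new HashMap<>();
        field.put("tsName", "root.sg.d1.s1");
        field.put("dataType", "INT32");
        field.put("encoding", "RLE");
        field.put("compressionType", "GZIP");
        System.out.println(isValidField(field));
        System.out.println(isValidTimeType("long"));
    }
}
```

Using enums keeps each list of supported values in one place, so an unsupported name fails the lookup instead of needing a separate comparison per value.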

Relationships

| relationship | description |
|---|---|
| success | Data can be written correctly or flow file is empty. |
| failure | The schema or flow file is abnormal. |

mtien-apache and others added 3 commits July 7, 2020 15:36
…der response when the OIDC Identifying User claim is not found. Revised log message to print available claims.

Added new StandardOidcIdentityProviderGroovyTest file.
Updated deprecated methods in StandardOidcIdentityProvider. Changed log output to print all available claim names from JWTClaimsSet. Added unit test.
Added comments in getAvailableClaims() method.
Fixed typos in NiFi Docs Admin Guide.
Added license to Groovy test.
Fixed a checkstyle error.
Refactor exchangeAuthorizationCode method.
Added unit tests.
Verified all unit tests added so far are passing.
Refactored code. Added unit tests.
Refactored OIDC provider to decouple constructor & network-dependent initialization.
Added unit tests.
Added unit tests.
Refactored OIDC provider to separately authorize the client. Added unit tests.
Added unit tests.

NIFI-7332 Refactored exchangeAuthorizationCode method to separately retrieve the NiFi JWT.

Signed-off-by: Nathan Gough <thenatog@gmail.com>

This closes apache#4344.
@exceptionfactory changed the title from "implement PutIoTDB" to "NIFI-10234 Implement PutIoTDB" on Aug 2, 2022
Contributor

@exceptionfactory left a comment

Thanks for the contribution @xuanronaldo! Supporting integration with Apache IoTDB through the standard Java API looks like a very useful feature.

Unfortunately there are several general issues that need to be resolved before a more thorough implementation review can go forward.

  1. The Static Analysis check indicates several style problems. At a quick glance, some of the problems appear to be related to the use of asterisk imports. Please run a Maven build with the contrib-check profile to evaluate the issues
  2. All maven.compiler properties must be removed. Although NiFi will be deprecating support for Java 8 soon, it is still a supported version, and that property cannot be overridden in specific modules
  3. All packages must begin with org.apache.nifi, instead of org.apache.iotdb
  4. All Maven groupId elements must contain org.apache.nifi, instead of org.apache.iotdb

There are a couple other general recommendations worth noting:

  1. The Jackson library should be used for JSON processing unless there is a compelling reason to use a different library
  2. Use of inline anonymous class declarations for collections such as HashMap, HashSet, and ArrayList should be avoided. Although this has some convenience value, it is not recommended in either runtime code or test code.
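
The second recommendation refers to the pattern often called double-brace initialization. The sketch below contrasts it with a plain alternative that also compiles on Java 8. The class and constant names are invented for this example and do not come from the pull request.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Illustrative only: names are made up for this example.
public class SupportedTypes {

    // Discouraged: the outer braces declare an anonymous subclass of HashSet and the
    // inner braces are an instance initializer, so an extra class is generated.
    public static final Set<String> ANONYMOUS = new HashSet<String>() {{
        add("INT32");
        add("INT64");
    }};

    // Alternative: a regular HashSet built from a list and wrapped as unmodifiable.
    public static final Set<String> PLAIN =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList("INT32", "INT64")));

    public static void main(String[] args) {
        // The anonymous version is not a plain HashSet instance.
        System.out.println(ANONYMOUS.getClass().equals(HashSet.class));
        // Both sets still hold the same elements.
        System.out.println(PLAIN.equals(ANONYMOUS));
    }
}
```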

Thanks again for working on this new feature. After addressing these issues, the changes should be in a good position for a more detailed review.

@exceptionfactory
Contributor

One additional note, Apache NiFi just released version 1.17.0, so the current main branch is now on 1.18.0-SNAPSHOT. Please rebase and update to the current version.

Thanks!

@xuanronaldo
Contributor Author

Ok, I got it. Thanks for your suggestions. I'm working on it.

@xuanronaldo
Contributor Author

I have pushed the code, and all checks have passed. What should I do next to merge the branch?

@exceptionfactory
Contributor

Thanks for making the updates @xuanronaldo. Merging requires approval from at least one committer. I will try to take a look at the latest version soon.

Contributor

@exceptionfactory left a comment

Thanks for making the initial changes @xuanronaldo.

I took a closer look at the implementation and noted a number of areas for improvement. The division of the abstract class and concrete class is helpful to indicate the core functionality. The core logic in onTrigger was difficult to follow at points because of the layout and nesting, so some general refactoring would be helpful.

<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>${project.version}</version>
Contributor

This dependency should be marked with a test scope.

Comment on lines +80 to +85
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<version>${project.version}</version>
<type>nar</type>
</dependency>
Contributor

This dependency must be moved to nifi-iotdb-nar, instead of this jar module.

Comment on lines +98 to +108
<version>${jackson.bom.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.bom.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.bom.version}</version>
Contributor

The version property can be removed since the Bill-of-Materials dependency in the root configuration manages the version.

Comment on lines +110 to +113
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
Contributor

JUnit 5 is included as a default dependency, so this dependency on JUnit 4 should be removed.

import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
Contributor

Should be changed to JUnit 5 Assertions and it would be better to use static imports for assert methods.

Contributor

@xuanronaldo Please review this comment and update the assertions to use JUnit 5 instead of JUnit 4. Thanks!

Comment on lines +169 to +179
if (!result.getKey()) {
getLogger().error(result.getValue());
inputStream.close();
recordReader.close();
processSession.transfer(flowFile, REL_FAILURE);
return;
} else {
if (result.getValue() != null) {
getLogger().warn(result.getValue());
}
}
Contributor

All log messages should have a readable message, in addition to passing the relevant value.

Comment on lines +171 to +172
inputStream.close();
recordReader.close();
Contributor

Closing these objects should be unnecessary with the use of try-with-resources.

Contributor Author

I have tried to delete these, but it will throw an exception java.lang.IllegalStateException: FlowFile[0,16972544312800.mockFlowFile,0B] already in use for an active callback or InputStream created by ProcessSession.read(FlowFile) has not been closed.

Contributor

Thanks for the reply @xuanronaldo, does this occur when calling ProcessSession.transfer()? If you move it outside of the try-catch block, or change the read() signature to use a callback, that should resolve this error.
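
The first option, moving the transfer outside of the try block, can be illustrated with a standalone simulation. FakeSession below is a made-up stand-in that only mimics the rule behind the quoted IllegalStateException (no transfer while a stream from read() is still open); it is not the NiFi ProcessSession API, and the relationship names are plain strings for simplicity.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

// Standalone simulation; FakeSession is hypothetical and only imitates the
// "stream must be closed before transfer" rule described in the error message.
public class TransferAfterClose {

    public static class FakeSession {
        private boolean streamOpen;

        public InputStream read() {
            streamOpen = true;
            return new ByteArrayInputStream(new byte[0]) {
                @Override
                public void close() throws IOException {
                    streamOpen = false;
                    super.close();
                }
            };
        }

        public void transfer(String relationship) {
            if (streamOpen) {
                throw new IllegalStateException("InputStream has not been closed");
            }
        }
    }

    // Decide the outcome inside the try block; transfer only after the
    // try-with-resources statement has closed the stream.
    public static String process(FakeSession session, boolean valid) {
        String relationship;
        try (InputStream inputStream = session.read()) {
            relationship = valid ? "success" : "failure";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        session.transfer(relationship);
        return relationship;
    }

    public static void main(String[] args) {
        System.out.println(process(new FakeSession(), false));
    }
}
```

With this layout the explicit close() calls become unnecessary, because the stream is already closed by the time the transfer happens.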

timestamp = (Long) values[0];
}

boolean flag = false;
Contributor

The purpose of this variable is somewhat unclear based on the name.

return;
}

HashMap<String, Tablet> tablets;
Contributor

This variable, and perhaps others, should be declared and assigned in the same location for better readability.

@xuanronaldo
Contributor Author

> Thanks for making the initial changes @xuanronaldo.
>
> I took a closer look at the implementation and noted a number of areas for improvement. The division of the abstract class and concrete class is helpful to indicate the core functionality. The core logic in onTrigger was difficult to follow at points because of the layout and nesting, so some general refactoring would be helpful.

Thanks for your suggestions. I will update the code asap.

@xuanronaldo
Contributor Author

I have merged the main branch to resolve the problem that occurred in the workflow. Please approve running the workflows again. @exceptionfactory

@xuanronaldo
Contributor Author

All checks have passed, could you please take a look at my latest commit? @exceptionfactory

import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.junit.Assert;
Contributor

See the other note about changing to use JUnit 5 Assertions.

Contributor

@exceptionfactory left a comment

Thanks for making the latest updates @xuanronaldo, this is looking closer to completion. I noted a few additional minor details, and I had a particular question about the expected format of the IoTDB Schema Template.

I plan to run through some additional tests soon.

Comment on lines +76 to +80
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-dbcp-service-api</artifactId>
<version>${project.version}</version>
</dependency>
Contributor

This dependency is not necessary and should be removed

if (format == null && needInitFormatter) {
format = initFormatter((String) values[0]);
if (format == null) {
getLogger().error("{} Record [{}] time format not supported\", flowFile, recordNumber");
Contributor

The trailing backslash character should be removed:

Suggested change
- getLogger().error("{} Record [{}] time format not supported\", flowFile, recordNumber");
+ getLogger().error("{} Record [{}] time format not supported", flowFile, recordNumber);


long timestamp;
if (needInitFormatter) {
timestamp = Timestamp.valueOf(LocalDateTime.parse((String) values[0], format)).getTime();
Contributor

Recommend breaking this into multiple lines for better readability.
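
One possible way to split the expression is shown below as a standalone sketch. The class name, method name, and date pattern are assumptions for illustration; the PR derives its formatter elsewhere.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative only: names and the date pattern are made up for this example.
public class TimestampParsing {

    // Same conversion as the single-line expression above, split into named steps.
    public static long toEpochMillis(String text, DateTimeFormatter format) {
        LocalDateTime dateTime = LocalDateTime.parse(text, format);
        Timestamp sqlTimestamp = Timestamp.valueOf(dateTime);
        return sqlTimestamp.getTime();
    }

    public static void main(String[] args) {
        DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        System.out.println(toEpochMillis("2022-08-16 12:00:00", format));
    }
}
```

Note that Timestamp.valueOf(LocalDateTime) interprets the value in the JVM default time zone, so the printed number depends on where the code runs.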

FlowFile flowFile = processSession.get();

if (flowFile == null) {
processSession.transfer(flowFile, REL_SUCCESS);
Contributor

This transfer should be removed because flowFile is null.


static final PropertyDescriptor SCHEMA =
new PropertyDescriptor.Builder()
.name("Schema")
Contributor

Recommend naming this property Schema Template to align with the IoTDB terminology.

Comment on lines +71 to +74
"The schema that IoTDB needs doesn't support good by NiFi.\n"
+ "Therefore, you can define the schema here.\n"
+ "Besides, you can set encoding type and compression type by this method.\n"
+ "If you don't set this property, the inferred schema will be used.\n")
Contributor

@exceptionfactory Aug 16, 2022

The wording of this description is not quite clear, and does not need to state what NiFi does not support. Recommend adjusting the wording to include more details about the expected format, along the following lines:

The Apache IoTDB Schema Template defined using JSON. The Processor will infer the IoTDB Schema when this property is not configured.

The IoTDB Schema Template documentation provides some details, but is there an official format specification? It would be helpful to link to that documentation if available, otherwise it will be unclear how to define this property.

@xuanronaldo
Contributor Author

Okay, I got it. I will update the code asap. Thanks for your suggestions.

@xuanronaldo
Contributor Author

Hi @exceptionfactory. Sorry for committing the code so late; I have had a lot of work recently. I have also added a document about the IoTDB Schema Template to our website, as you requested. The link to it has been added to the description of Schema Template.

Contributor

@exceptionfactory left a comment

Thanks for the update @xuanronaldo. The link listed in the Apache IoTDB documentation appears to describe this processor, as opposed to a generalized IoTDB Schema Template. It seems that the Schema Template is specific to this processor property, as opposed to being a generalized Apache IoTDB standard representation. Is that correct? Perhaps this is the best way to represent that IoTDB Schema, but if there is a schema representation that is specific to IoTDB, not NiFi, that would be ideal. I will take a closer look at the implementation and documentation.

It looks like the most recent commits resulted in introducing several unrelated changes, found in the nifi-hubspot-bundle and nifi-standard-bundle. Please review and revert these changes so that the latest version of the pull request is limited to IoTDB Processor changes. Thanks!

@xuanronaldo
Contributor Author

Yep, you are right @exceptionfactory. The template in IoTDB is a structure for reducing memory usage. The Schema Template here, however, is just an adapter for defining the schema in IoTDB. Creating the schema in the native way requires DDL or code, so I designed the Schema Template to cover that.

@xuanronaldo
Contributor Author

I have reverted these changes. However, several files were still marked for modification. I have no idea what to do. @exceptionfactory

@exceptionfactory
Contributor

> I have reverted these changes. However, several files were still marked for modification. I have no idea what to do. @exceptionfactory

I see the files are still marked as changed. If it is necessary to squash all current changes in order to remove the merge commits and avoid including these files marked as changed, that would be fine too. Either way, the latest version should just reflect the new components.

@xuanronaldo
Contributor Author

> > I have reverted these changes. However, several files were still marked for modification. I have no idea what to do. @exceptionfactory
>
> I see the files are still marked as changed. If it is necessary to squash all current changes in order to remove the merge commits and avoid including these files marked as changed, that would be fine too. Either way, the latest version should just reflect the new components.

I am not sure how to squash all the current changes. Could I create a new branch and submit another PR instead?

@exceptionfactory
Contributor

Thanks for following up @xuanronaldo. Sure, if it is easier for you to create a new branch with the current version, that's fine as well. It is easy to link to this PR for background, and we can work from the new PR.

@xuanronaldo xuanronaldo deleted the implement_put_iotdb branch September 14, 2022 02:28
3 participants