
[IOTDB-560] add TSRecordOutputFormat to write TsFile via Flink DataSet/DataStream API. #1084

Merged
qiaojialin merged 4 commits into apache:master from WeiZhong94:IOTDB-560-2 on May 6, 2020

Conversation

@WeiZhong94 (Contributor)

TsFile is a columnar storage file format in Apache IoTDB. It is designed for time series data, supports efficient compression and querying, and is easy to integrate into big data processing frameworks.

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams, and it is becoming increasingly popular in IoT scenarios. So it would be great to integrate IoTDB and Flink.

This pull request adds a TSRecordOutputFormat to support writing TsFiles from Flink via the DataStream/DataSet API.

More detail can be found in the discussion thread [1] or in the IoTDB wiki [2]:

[1]https://lists.apache.org/thread.html/r6dd6afe4e8e4ca42e3ddfbc80609597788f90b214e7a81788c3b51b3%40%3Cdev.iotdb.apache.org%3E

[2]https://cwiki.apache.org/confluence/display/IOTDB/%5BImprovement+Proposal%5D+Add+Flink+Connector+Support+for+TsFile

@sunjincheng121 (Member)

Thanks for the PR @WeiZhong94 !
Sorry, I'm busy with the Flink Forward SF meeting. I'll review this PR as soon as possible next week!
BTW: could you please have a look at this PR @qiaojialin @jixuan1989 :)

@sunjincheng121 (Member)

@WeiZhong94 Could you please rebase the PR? I would like to have a look at it then.

@sunjincheng121 (Member) left a review:

Thanks for the PR @WeiZhong94 !
Overall it is pretty good! I only left a few comments in the PR, and one as follows:
Though the OutputFormat can be used in the DataStream API, it would be better if we supported writing TsFile via StreamingFileSink, which is integrated with the checkpointing mechanism to provide exactly-once semantics. Of course, we could do this in a follow-up PR.

What do you think?

```java
import java.util.stream.Collectors;

/**
 * The example of writing TsFile via Flink DataSet API.
```

Member: "writing TsFile" -> "writing to TsFile"?

```java
import java.util.stream.Collectors;

/**
 * The example of writing TsFile via Flink DataStream API.
```

Member: "writing TsFile" -> "writing to TsFile"?

Context:

    }
    ```

    ### TSRecordOutputFormat Example

Member: "Example of TSRecordOutputFormat"?

```java
import java.io.IOException;
import java.io.Serializable;

public interface TSRecordConverter<T> extends Serializable {
```

Member: Would be better to add JavaDoc?


```java
void open(Schema schema) throws IOException;

void covertAndCollect(T input, Collector<TSRecord> collector) throws IOException;
```

Member: Add JavaDoc? Add a semantic description of this method.


```java
void open(Schema schema) throws IOException;

void covertAndCollect(T input, Collector<TSRecord> collector) throws IOException;
```

Member: Regarding the method name covertAndCollect, thinking about it again, its semantics are not very clear. In TSRecordConverter the main goal of covertAndCollect is to convert the T to a TSRecord, so I would like to change the name from covertAndCollect to convert, which makes the semantics clearer. What do you think?

BTW: typo covert -> convert
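The naming discussion hinges on the collector-style signature: a converter may emit zero, one, or many records per input, so it pushes results into a collector rather than returning a single record. A minimal stand-alone sketch of that pattern in plain Java (all names here are simplified stand-ins for illustration, not the actual IoTDB/Flink interfaces):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for TSRecord and Collector, for illustration only.
public class ConverterSketch {

    // Simplified record: one timestamped value.
    public static class Record {
        public final long timestamp;
        public final double value;

        public Record(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // The converter collects into a Consumer instead of returning a single
    // record, because one input may expand into several records.
    public interface RecordConverter<T> {
        void convertAndCollect(T input, Consumer<Record> collector);
    }

    // Example: a CSV row "timestamp,v1,v2,..." becomes one record per value.
    public static List<Record> convertCsvRow(String row) {
        List<Record> out = new ArrayList<>();
        RecordConverter<String> converter = (input, collector) -> {
            String[] parts = input.split(",");
            long ts = Long.parseLong(parts[0]);
            for (int i = 1; i < parts.length; i++) {
                collector.accept(new Record(ts, Double.parseDouble(parts[i])));
            }
        };
        converter.convertAndCollect(row, out::add);
        return out;
    }

    public static void main(String[] args) {
        // One input row expands to two records.
        System.out.println(convertCsvRow("1000,1.5,2.5").size()); // 2
    }
}
```

Whether the method is called convert or convertAndCollect, the one-to-many expansion is the reason a plain `T -> TSRecord` return type would not fit.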

```java
public TSRecordConverter<T> getConverter() {
    return converter;
}
```
(No newline at end of file)

Member: Please add an empty line at the end of the file.

```java
import java.net.URISyntaxException;
import java.util.Optional;

public abstract class TsFileOutputFormat<T> extends FileOutputFormat<T> {
```

Member: Add JavaDoc.

Comment on lines +110 to +115:

```java
Types.FLOAT,
Types.INT,
Types.INT,
Types.FLOAT,
Types.INT,
Types.INT
```

Member: Hi, should these be LONG?

```java
DataPoint templateDataPoint = templateRecord.dataPointList.get(dataPointIndexMapping[i]);
Object o = input.getField(i);
if (o != null) {
    Class typeClass = o.getClass();
```

Member: templateDataPoint.type could be used to switch on the data type.

@WeiZhong94 (Contributor, Author)

@sunjincheng121 @qiaojialin Thanks for your review! Sorry for the late reply; I have been really busy with work recently :(. I have addressed your comments, please take a look.

> Though the OutputFormat can be used in the DataStream API, it would be better if we supported writing TsFile via StreamingFileSink, which is integrated with the checkpointing mechanism to provide exactly-once semantics. Of course, we could do this in a follow-up PR.

Yes, StreamingFileSink is much better than OutputFormat for streaming jobs. To support writing TsFile via StreamingFileSink, we need to implement the Flink interface org.apache.flink.api.common.serialization.BulkWriter. The current blocker is that BulkWriter wraps the output target as an FSDataOutputStream object. It is possible to write TsFile data via FSDataOutputStream, but this needs further discussion, as FSDataOutputStream does not support the truncate method, which is required by the TsFileOutput interface.
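For background on why truncate matters here: it lets a writer roll a file back to a known-good length, discarding a partially written tail, which an append-only output stream cannot do. A minimal stdlib-only illustration of the operation (class and method names are hypothetical sketches, not IoTDB code):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TruncateSketch {

    // Roll a file back to the given length, discarding a partially written
    // tail -- the capability TsFileOutput requires and an append-only
    // stream such as FSDataOutputStream cannot offer.
    public static long truncateTo(Path file, long length) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.WRITE)) {
            ch.truncate(length);
        }
        return Files.size(file);
    }

    // Writes 16 bytes, rolls back to 8, and returns the final file size.
    public static long demo() throws IOException {
        Path tmp = Files.createTempFile("tsfile-sketch", ".bin");
        try {
            Files.write(tmp, new byte[16]); // pretend a chunk was partially written
            return truncateTo(tmp, 8);      // roll back to the last good offset
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo()); // 8
    }
}
```

An exactly-once file sink needs exactly this kind of rollback on recovery, which is why the append-only FSDataOutputStream abstraction is the sticking point.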

@sunjincheng121 (Member)

Hi @WeiZhong94, I am fine with the current OutputFormat solution for now. And please have a look at the CI issue. Thanks.

@qiaojialin (Member) left a review:

Hi, thanks. There are only two data type errors in the README. As for the truncate method in TsFileOutput, I think it is OK not to support it; it's only used when restarting the IoTDB server.

@WeiZhong94 (Contributor, Author)

@sunjincheng121 @qiaojialin Thanks for your reply! I have corrected the README.md. It seems the Travis test failure is unrelated to this PR, so I just rebased the PR to trigger the test again; hope this works.

@qiaojialin merged commit fd62d4f into apache:master on May 6, 2020.

3 participants