Flink: RecordFactory interface that can create record batch and clone… #3866

Merged
rdblue merged 3 commits into apache:master from stevenzwu:RecordFactory
Feb 6, 2022

@stevenzwu
Contributor

… record, which are needed by DataIteratorBatcher

@github-actions bot added the flink label on Jan 10, 2022
/**
 * Clone record
 */
void clone(T from, T to);
/**
 * Create a batch of records
 */
T[] createBatch(int batchSize);
public RowData[] createBatch(int batchSize) {
  RowData[] arr = new RowData[batchSize];
  for (int i = 0; i < batchSize; ++i) {
    arr[i] = new GenericRowData(rowType.getFieldCount());
Contributor

Can the use of rowType.getFieldCount() be removed from the loop? Seems like it only needs to be called once.

Contributor Author

It is just a getter, which should be optimized by the JVM.
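
For reference, the hoisted form the reviewer asks about could look like the minimal sketch below. SketchRowType, SketchRow, and HoistedBatchSketch are hypothetical stand-ins so the snippet compiles on its own; they are not Flink's RowType or GenericRowData, and this is not the code in the PR.

```java
// Hypothetical stand-in for a row type that knows its field count.
final class SketchRowType {
  private final int fieldCount;

  SketchRowType(int fieldCount) {
    this.fieldCount = fieldCount;
  }

  int getFieldCount() {
    return fieldCount;
  }
}

// Hypothetical stand-in for a row with a fixed number of fields.
final class SketchRow {
  final Object[] fields;

  SketchRow(int arity) {
    this.fields = new Object[arity];
  }
}

final class HoistedBatchSketch {
  // Reads the field count once, then allocates one empty row per batch slot.
  static SketchRow[] createBatch(SketchRowType rowType, int batchSize) {
    int fieldCount = rowType.getFieldCount();
    SketchRow[] arr = new SketchRow[batchSize];
    for (int i = 0; i < batchSize; ++i) {
      arr[i] = new SketchRow(fieldCount);
    }
    return arr;
  }
}
```

Both forms produce the same batch. Whether the hoist makes a measurable difference depends on the JIT inlining the getter, as the author points out.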

Contributor

@kbendick left a comment

LGTM.

Outside of the loop-related concern, I'm good with these interfaces.

private final RowType rowType;
private final TypeSerializer[] fieldSerializers;

RowDataRecordFactory(final RowType rowType) {
Contributor

We usually omit final unless it is an instance field.

Contributor Author

fixed


@Override
public void clone(RowData from, RowData to) {
  RowDataUtil.clone(from, to, rowType, fieldSerializers);
Contributor

I think this needs to return the RowData produced by clone to be correct. Otherwise, there are a few modifications to this class or others that would cause correctness bugs.
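
One way to read this concern, as a minimal sketch: assume the underlying copy helper may allocate a new object when the target cannot be reused. A void wrapper would then silently drop the copy. ReturningCloneSketch and cloneInto are hypothetical names, and Object[] stands in for RowData.

```java
final class ReturningCloneSketch {
  // Copies the elements of `from` into `reuse` when `reuse` is usable;
  // otherwise allocates a new array. This is a shallow copy.
  // The returned array holds the copy and may not be `reuse`.
  static Object[] cloneInto(Object[] from, Object[] reuse) {
    Object[] target =
        (reuse != null && reuse.length == from.length) ? reuse : new Object[from.length];
    System.arraycopy(from, 0, target, 0, from.length);
    return target;
  }
}
```

A caller that keeps the result, for example `slot = ReturningCloneSketch.cloneInto(from, slot);`, stays correct in both the reuse and the allocate case. A caller that ignores the result is only correct while reuse always succeeds.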

}

@Override
public void clone(RowData from, RowData to) {
Contributor

You might consider changing this to clone(RowData from, RowData[] toBatch, int position)

Contributor Author

I adopted this API.

The reason I had the RecordFactory interface is to work with both RowData and Avro GenericRecord iterators. Both Netflix and Apple use Avro GenericRecord. For the DataIterator with Avro GenericRecord, the iterator returns a fresh (non-reused) object for every record. In the Avro case, the clone impl can directly set the from object into the toBatch array.
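
The two cases described above can be sketched as follows. This is an illustration under stated assumptions, not the merged code: RecordFactorySketch mirrors the method shapes discussed in this thread, MutableRowSketch stands in for a reused RowData, and String stands in for a fresh Avro GenericRecord. All class names here are hypothetical.

```java
// Hypothetical interface with the batch-and-position clone shape suggested above.
interface RecordFactorySketch<T> {
  T[] createBatch(int batchSize);

  void clone(T from, T[] batch, int position);
}

// Hypothetical mutable row that an iterator might reuse across records.
final class MutableRowSketch {
  final Object[] fields;

  MutableRowSketch(int arity) {
    this.fields = new Object[arity];
  }
}

// Reused-object case: the batch is pre-filled and clone copies field values.
final class CopyingFactorySketch implements RecordFactorySketch<MutableRowSketch> {
  private final int arity;

  CopyingFactorySketch(int arity) {
    this.arity = arity;
  }

  @Override
  public MutableRowSketch[] createBatch(int batchSize) {
    MutableRowSketch[] batch = new MutableRowSketch[batchSize];
    for (int i = 0; i < batchSize; ++i) {
      batch[i] = new MutableRowSketch(arity);
    }
    return batch;
  }

  @Override
  public void clone(MutableRowSketch from, MutableRowSketch[] batch, int position) {
    // Shallow field copy into the slot, so later changes to `from` do not leak in.
    System.arraycopy(from.fields, 0, batch[position].fields, 0, arity);
  }
}

// Fresh-object case: the batch starts empty and clone stores the reference.
final class PassThroughFactorySketch implements RecordFactorySketch<String> {
  @Override
  public String[] createBatch(int batchSize) {
    return new String[batchSize];
  }

  @Override
  public void clone(String from, String[] batch, int position) {
    batch[position] = from;
  }
}
```

Passing the batch and position lets each factory decide whether to copy into an existing slot or replace the slot, which a two-argument clone(from, to) cannot express.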

@rdblue merged commit 2208b24 into apache:master on Feb 6, 2022