[GLUTEN-10361][FLINK] Fix UT failure between the conversion of BinaryRowData and StatefulRecord#10362
[GLUTEN-10361][FLINK] Fix UT failure between the conversion of BinaryRowData and StatefulRecord#10362shuai-xu merged 12 commits intoapache:mainfrom
BinaryRowData and StatefulRecord#10362Conversation
BinaryRowData and StatefulRecordBinaryRowData and StatefulRecord
|
depend on pr: bigo-sg/velox#7, bigo-sg/velox4j#8 |
| return provider | ||
| .consumeDataStream(createProviderContext(config), dataStream) | ||
| .getTransformation(); | ||
| Transformation<?> transformation = |
There was a problem hiding this comment.
Please change it back if not necessary to keep this class changed as less as possible.
| ((SinkFunctionProvider) runtimeProvider).createSinkFunction(); | ||
| return createSinkFunctionTransformation( | ||
| sinkFunction, env, inputTransform, rowtimeFieldIndex, sinkMeta, sinkParallelism); | ||
| Transformation sinkTransformation = |
There was a problem hiding this comment.
add // --- Begin Gluten-specific code changes --- before and after your changes
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.flink.table.planner.plan.nodes.exec.common.sink; |
There was a problem hiding this comment.
This class should be put into gluten package
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.flink.table.planner.plan.nodes.exec.common.source; |
| new TableScanNode(PlanNodeIdGenerator.newId(), rowType, tableHandle, List.of()); | ||
| GlutenStreamSource op = | ||
| new GlutenStreamSource( | ||
| new GlutenValuesSourceFunction( |
There was a problem hiding this comment.
It need not to add a GlutenValuesSourceFunction, you can pass the values to TableScanNode
| .getScanTableSource( | ||
| planner.getFlinkContext(), ShortcutUtils.unwrapTypeFactory(planner)); | ||
| Transformation<RowData> sourceTransformation = super.translateToPlanInternal(planner, config); | ||
| return VeloxSourceBuilder.build(sourceTransformation, tableSource); |
There was a problem hiding this comment.
Add comments for gluten specified changes.
| import java.util.Map; | ||
| import java.util.stream.IntStream; | ||
|
|
||
| public class FlinkRowToRowDataConverter { |
There was a problem hiding this comment.
It seems flink has utility class to convert Row to RowData, maybe we can use it. See RowRowConvertor
There was a problem hiding this comment.
RowRowConverter has bugs and not supported features when transform map and arrays, that is why we introduce FlinkRowToRowDataConverter here, not to repalce RowRowConverter, but to fix the problems of RowRowConverter.
| try { | ||
| Class<?> tableSourceClazz = | ||
| Class.forName( | ||
| "org.apache.flink.table.planner.factories.TestValuesTableFactory$TestValuesScanTableSourceWithoutProjectionPushDown"); |
There was a problem hiding this comment.
This name is under test since in test environment it loads this class? Which class it use in production environment?
There was a problem hiding this comment.
This is specially designed for test,and does not have a class in production.
There was a problem hiding this comment.
please add some comment for it.
|
velox 4j changes have been merged, you can update the version |
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
(Fixes: #10361)
How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)