[HUDI-5205] Support flink 1.16.0 (apache#7584)
Co-authored-by: wuzhiping <wuzhiping.007@bytedance.com>
2 people authored and fengjian committed Apr 5, 2023
1 parent ba66bdf commit bc6ca1c
Showing 49 changed files with 4,416 additions and 20 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/bot.yml
@@ -69,6 +69,7 @@ jobs:
- flinkProfile: "flink1.13"
- flinkProfile: "flink1.14"
- flinkProfile: "flink1.15"
+ - flinkProfile: "flink1.16"
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
@@ -95,6 +96,8 @@ jobs:
strategy:
matrix:
include:
+ - flinkProfile: 'flink1.16'
+   sparkProfile: 'spark3.3'
- flinkProfile: 'flink1.15'
sparkProfile: 'spark3.3'
- flinkProfile: 'flink1.14'

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -18,6 +18,7 @@

package org.apache.hudi.sink;

+ import org.apache.hudi.adapter.OperatorCoordinatorAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableConfiguration;
@@ -48,10 +49,11 @@
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
- import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+ import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
@@ -79,7 +81,7 @@
* @see StreamWriteFunction for the work flow and semantics
*/
public class StreamWriteOperatorCoordinator
-     implements OperatorCoordinator {
+     implements OperatorCoordinatorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteOperatorCoordinator.class);

/**

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/sort/SortOperatorGen.java
@@ -18,6 +18,8 @@

package org.apache.hudi.sink.bulk.sort;

+ import org.apache.hudi.adapter.SortCodeGeneratorAdapter;

import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.data.RowData;
@@ -52,6 +54,6 @@ public SortCodeGenerator createSortCodeGenerator() {
for (int sortIndex : sortIndices) {
builder.addField(sortIndex, true, true);
}
-     return new SortCodeGenerator(tableConfig, rowType, builder.build());
+     return new SortCodeGeneratorAdapter(tableConfig, rowType, builder.build());
}
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -746,22 +746,22 @@ public void alterTable(
@Override
public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
-     throw new HoodieCatalogException("Not supported.");
+     return Collections.emptyList();
}

@Override
public List<CatalogPartitionSpec> listPartitions(
ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
throws TableNotExistException, TableNotPartitionedException,
PartitionSpecInvalidException, CatalogException {
-     throw new HoodieCatalogException("Not supported.");
+     return Collections.emptyList();
}

@Override
public List<CatalogPartitionSpec> listPartitionsByFilter(
ObjectPath tablePath, List<Expression> expressions)
throws TableNotExistException, TableNotPartitionedException, CatalogException {
-     throw new HoodieCatalogException("Not supported.");
+     return Collections.emptyList();
}

@Override
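
The behavioral note behind this change: Flink 1.16's planner probes catalog partition metadata during optimization, so a catalog that throws from listPartitions can fail queries that previously planned fine; returning an empty list instead signals "no partition metadata" and lets planning continue without pruning (listPartitionsByFilter behaves likewise). A caller-side sketch of the difference, with hypothetical names — not part of the patch:

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectPath;

import java.util.List;

class PartitionProbeSketch {
  static void planScan(Catalog catalog, ObjectPath table) throws Exception {
    // Old behavior: this call threw HoodieCatalogException and aborted planning.
    // New behavior: an empty list means "no partition metadata", so the
    // planner simply skips partition pruning and reads the whole table.
    List<CatalogPartitionSpec> specs = catalog.listPartitions(table);
    if (specs.isEmpty()) {
      // fall back to a non-pruned scan
    }
  }
}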

@@ -1535,18 +1535,18 @@ void testBuiltinFunctionWithHMSCatalog() {
.end();
tableEnv.executeSql(hoodieTableDDL);

-     String insertSql = "insert into t1 values (1, TO_DATE('2022-02-02'), '1'), (2, DATE '2022-02-02', '1')";
+     String insertSql = "insert into t1 values (1, TO_DATE('2022-02-02'), '1'), (2, DATE '2022-02-02', '2')";
execInsertSql(tableEnv, insertSql);

List<Row> result = CollectionUtil.iterableToList(
() -> tableEnv.sqlQuery("select * from t1").execute().collect());
final String expected = "["
+ "+I[1, 2022-02-02, 1], "
-         + "+I[2, 2022-02-02, 1]]";
+         + "+I[2, 2022-02-02, 2]]";
assertRowsEquals(result, expected);

List<Row> partitionResult = CollectionUtil.iterableToList(
-         () -> tableEnv.sqlQuery("select * from t1 where f_int = 1").execute().collect());
+         () -> tableEnv.sqlQuery("select * from t1 where f_par = '1'").execute().collect());
assertRowsEquals(partitionResult, "[+I[1, 2022-02-02, 1]]");
}


hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/source/ContinuousFileSource.java
@@ -20,6 +20,8 @@

import org.apache.hudi.adapter.DataStreamScanProviderAdapter;

+ import org.apache.flink.api.common.functions.AbstractRichFunction;
+ import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -49,7 +51,7 @@
* A continuous file source that can trigger checkpoints continuously.
*
* <p>It loads the data in the specified file and split the data into number of checkpoints batches.
-  * Say, if you want 4 checkpoints and there are 8 records in the file, the emit strategy is:
+  * Say, if you want 4 checkpoints and there are 8 records in the file, the emission strategy is:
*
* <pre>
* | 2 records | 2 records | 2 records | 2 records |
@@ -85,6 +87,7 @@ public boolean isBounded() {
@Override
public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
final RowType rowType = (RowType) tableSchema.toSourceRowDataType().getLogicalType();

JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
InternalTypeInfo.of(rowType),
@@ -95,7 +98,7 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv)
return execEnv.addSource(new BoundedSourceFunction(path, conf.getInteger(CHECKPOINTS)))
.name("continuous_file_source")
.setParallelism(1)
-         .map(record -> deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8)),
+         .map(new JsonDeserializationFunction(deserializationSchema),
InternalTypeInfo.of(rowType));
}
};
@@ -183,4 +186,28 @@ public void notifyCheckpointComplete(long l) {
this.currentCP.incrementAndGet();
}
}

+  /**
+   * Wrapper function that manages the lifecycle of the JSON deserialization schema.
+   */
+  public static class JsonDeserializationFunction
+      extends AbstractRichFunction
+      implements MapFunction<String, RowData> {
+    private final JsonRowDataDeserializationSchema deserializationSchema;
+
+    public JsonDeserializationFunction(JsonRowDataDeserializationSchema deserializationSchema) {
+      this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+      super.open(parameters);
+      this.deserializationSchema.open(null);
+    }
+
+    @Override
+    public RowData map(String record) throws Exception {
+      return deserializationSchema.deserialize(record.getBytes(StandardCharsets.UTF_8));
+    }
+  }
}
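
A note on the JsonDeserializationFunction added above: a lambda passed to map() is wrapped in a non-rich function, so there is no hook from which to call deserializationSchema.open(); newer Flink releases initialize the JSON format's converters in open(), so the old lambda could hit an uninitialized schema at the first deserialize(). A minimal usage sketch under those assumptions (names hypothetical, and assumed to sit next to ContinuousFileSource in the same package):

import org.apache.flink.formats.json.JsonRowDataDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;

class JsonMapWiringSketch {
  static DataStream<RowData> deserialize(DataStream<String> jsonLines,
                                         JsonRowDataDeserializationSchema schema,
                                         RowType rowType) {
    // The rich wrapper's open() runs once per task before the first record,
    // so schema.open() has initialized the JSON converters by the time
    // map() first calls deserialize().
    return jsonLines.map(
        new ContinuousFileSource.JsonDeserializationFunction(schema),
        InternalTypeInfo.of(rowType));
  }
}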
12 changes: 10 additions & 2 deletions hudi-flink-datasource/hudi-flink1.13.x/pom.xml
@@ -59,6 +59,8 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>

+    <!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
@@ -89,15 +91,21 @@
<version>${flink1.13.version}</version>
<scope>provided</scope>
</dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+      <version>${flink1.13.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink1.13.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

-    <!-- Test dependencies -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-tests-common</artifactId>

hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

/**
* Adapter clazz for {@code OperatorCoordinator}.
*/
public interface OperatorCoordinatorAdapter extends OperatorCoordinator {
}
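
For Flink 1.13 and 1.14 this adapter is deliberately empty: StreamWriteOperatorCoordinator implements the adapter rather than OperatorCoordinator directly, so only this per-version interface has to absorb changes to Flink's coordinator contract. The hudi-flink1.16.x counterpart is not shown in this commit excerpt; assuming it bridges the speculative-execution rework in Flink 1.16 (which added an attemptNumber parameter to handleEventFromOperator, among other callback changes), it could look like this sketch:

package org.apache.hudi.adapter;

import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;

/**
 * Hypothetical hudi-flink1.16.x variant: a default method translates the
 * attempt-aware 1.16 callback back to the single-attempt signature that
 * StreamWriteOperatorCoordinator already implements.
 */
public interface OperatorCoordinatorAdapter extends OperatorCoordinator {

  // The pre-1.16 signature that Hudi's coordinator implements.
  void handleEventFromOperator(int subtask, OperatorEvent event) throws Exception;

  @Override
  default void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
      throws Exception {
    handleEventFromOperator(subtask, event);
  }
}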

hudi-flink-datasource/hudi-flink1.13.x/src/main/java/org/apache/hudi/adapter/SortCodeGeneratorAdapter.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.types.logical.RowType;

/**
* Adapter clazz for {@code SortCodeGenerator}.
*/
public class SortCodeGeneratorAdapter extends SortCodeGenerator {
public SortCodeGeneratorAdapter(TableConfig conf, RowType input, SortSpec sortSpec) {
super(conf, input, sortSpec);
}
}
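
The same per-version facade idea, this time for the planner's code generator: SortOperatorGen always constructs a SortCodeGeneratorAdapter with (TableConfig, RowType, SortSpec), and each hudi-flinkX.Y.x module maps that shape onto whatever constructor its planner exposes. The 1.16 module is not part of this excerpt; a sketch of what it might look like, under the stated assumptions:

package org.apache.hudi.adapter;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.types.logical.RowType;

/**
 * Hypothetical hudi-flink1.16.x variant. Call sites keep passing a
 * TableConfig; only this class needs to know what the 1.16 planner wants.
 */
public class SortCodeGeneratorAdapter extends SortCodeGenerator {
  public SortCodeGeneratorAdapter(TableConfig conf, RowType input, SortSpec sortSpec) {
    // Assumption: the 1.16 constructor accepts a ReadableConfig, which
    // TableConfig implements in recent Flink, so the call still type-checks.
    super(conf, input, sortSpec);
  }
}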
12 changes: 10 additions & 2 deletions hudi-flink-datasource/hudi-flink1.14.x/pom.xml
@@ -59,6 +59,8 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>

+    <!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
@@ -107,15 +109,21 @@
<version>${flink1.14.version}</version>
<scope>provided</scope>
</dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+      <version>${flink1.14.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink1.14.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

-    <!-- Test dependencies -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-tests-common</artifactId>

hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/OperatorCoordinatorAdapter.java
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

/**
* Adapter clazz for {@code OperatorCoordinator}.
*/
public interface OperatorCoordinatorAdapter extends OperatorCoordinator {
}

hudi-flink-datasource/hudi-flink1.14.x/src/main/java/org/apache/hudi/adapter/SortCodeGeneratorAdapter.java
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.types.logical.RowType;

/**
* Adapter clazz for {@code SortCodeGenerator}.
*/
public class SortCodeGeneratorAdapter extends SortCodeGenerator {
public SortCodeGeneratorAdapter(TableConfig tableConfig, RowType input, SortSpec sortSpec) {
super(tableConfig, input, sortSpec);
}
}
12 changes: 10 additions & 2 deletions hudi-flink-datasource/hudi-flink1.15.x/pom.xml
@@ -59,6 +59,8 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>

+    <!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
@@ -107,15 +109,21 @@
<version>${flink1.15.version}</version>
<scope>provided</scope>
</dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_2.12</artifactId>
+      <version>${flink1.15.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink1.15.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

-    <!-- Test dependencies -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-tests-common</artifactId>
