Skip to content

Commit

Permalink
[HUDI-5205] support flink 1.16.0
Browse files Browse the repository at this point in the history
  • Loading branch information
wuzhiping committed Jan 3, 2023
1 parent fb28ad8 commit efd000d
Show file tree
Hide file tree
Showing 46 changed files with 4,380 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/bot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ jobs:
- flinkProfile: "flink1.13"
- flinkProfile: "flink1.14"
- flinkProfile: "flink1.15"
- flinkProfile: "flink1.16"
steps:
- uses: actions/checkout@v2
- name: Set up JDK 8
Expand All @@ -95,6 +96,8 @@ jobs:
strategy:
matrix:
include:
- flinkProfile: 'flink1.16'
sparkProfile: 'spark3.3'
- flinkProfile: 'flink1.15'
sparkProfile: 'spark3.3'
- flinkProfile: 'flink1.14'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hudi.sink;

import org.apache.hudi.adapter.OperatorCoordinatorAdapter;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.SerializableConfiguration;
Expand Down Expand Up @@ -48,10 +49,11 @@
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
Expand Down Expand Up @@ -79,7 +81,7 @@
* @see StreamWriteFunction for the work flow and semantics
*/
public class StreamWriteOperatorCoordinator
implements OperatorCoordinator {
implements OperatorCoordinatorAdapter {
private static final Logger LOG = LoggerFactory.getLogger(StreamWriteOperatorCoordinator.class);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.adapter.SortCodeGeneratorAdapter;

import java.util.Arrays;

Expand Down Expand Up @@ -52,6 +53,6 @@ public SortCodeGenerator createSortCodeGenerator() {
for (int sortIndex : sortIndices) {
builder.addField(sortIndex, true, true);
}
return new SortCodeGenerator(tableConfig, rowType, builder.build());
return new SortCodeGeneratorAdapter(tableConfig, rowType, builder.build());
}
}
12 changes: 10 additions & 2 deletions hudi-flink-datasource/hudi-flink1.13.x/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>

<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -89,15 +91,21 @@
<version>${flink1.13.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink1.13.version}</version>
<scope>provided</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime_${scala.binary.version}</artifactId>
<version>${flink1.13.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-tests-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

/**
* Adapter clazz for {@code OperatorCoordinator}.
*/
public interface OperatorCoordinatorAdapter extends OperatorCoordinator {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.types.logical.RowType;

/**
* Adapter clazz for {@code SortCodeGenerator}.
*/
public class SortCodeGeneratorAdapter extends SortCodeGenerator {
public SortCodeGeneratorAdapter(TableConfig conf, RowType input, SortSpec sortSpec) {
super(conf, input, sortSpec);
}
}
10 changes: 10 additions & 0 deletions hudi-flink-datasource/hudi-flink1.14.x/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>

<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
Expand Down Expand Up @@ -107,6 +109,8 @@
<version>${flink1.14.version}</version>
<scope>provided</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
Expand All @@ -116,6 +120,12 @@
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink1.14.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-tests-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

/**
* Adapter clazz for {@code OperatorCoordinator}.
*/
public interface OperatorCoordinatorAdapter extends OperatorCoordinator {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.types.logical.RowType;

/**
* Adapter clazz for {@code SortCodeGenerator}.
*/
public class SortCodeGeneratorAdapter extends SortCodeGenerator {
public SortCodeGeneratorAdapter(TableConfig tableConfig, RowType input, SortSpec sortSpec) {
super(tableConfig, input, sortSpec);
}
}
12 changes: 10 additions & 2 deletions hudi-flink-datasource/hudi-flink1.15.x/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>

<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java</artifactId>
Expand Down Expand Up @@ -107,15 +109,21 @@
<version>${flink1.15.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink1.15.version}</version>
<scope>provided</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink1.15.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-tests-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;

/**
* Adapter clazz for {@code OperatorCoordinator}.
*/
public interface OperatorCoordinatorAdapter extends OperatorCoordinator {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.adapter;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator;
import org.apache.flink.table.planner.plan.nodes.exec.spec.SortSpec;
import org.apache.flink.table.types.logical.RowType;

/**
* Adapter clazz for {@code SortCodeGenerator}.
*/
public class SortCodeGeneratorAdapter extends SortCodeGenerator {
public SortCodeGeneratorAdapter(TableConfig tableConfig, RowType input, SortSpec sortSpec) {
super(tableConfig, input, sortSpec);
}
}

0 comments on commit efd000d

Please sign in to comment.