[HUDI-4357] Support flink 1.15.x
danny0405 committed Jul 5, 2022
1 parent fbda4ad commit 64e3f11
Showing 91 changed files with 7,799 additions and 82 deletions.
12 changes: 6 additions & 6 deletions hudi-client/hudi-flink-client/pom.xml
@@ -44,15 +44,15 @@
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>${flink.streaming.java.artifactId}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<artifactId>${flink.clients.artifactId}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<artifactId>${flink.hadoop.compatibility.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -70,7 +70,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.binary.version}</artifactId>
<artifactId>${flink.parquet.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
<exclusions>
@@ -164,7 +164,7 @@
<!-- Flink - Tests -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>${flink.test.utils.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -177,7 +177,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>${flink.streaming.java.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<classifier>tests</classifier>
FlinkMergeHelper.java
@@ -42,8 +42,7 @@

import java.io.IOException;
import java.util.Iterator;

import scala.collection.immutable.List;
import java.util.List;

public class FlinkMergeHelper<T extends HoodieRecordPayload> extends BaseMergeHelper<T, List<HoodieRecord<T>>,
List<HoodieKey>, List<WriteStatus>> {
22 changes: 8 additions & 14 deletions hudi-examples/hudi-examples-flink/pom.xml
@@ -118,12 +118,12 @@
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>${flink.streaming.java.artifactId}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<artifactId>${flink.clients.artifactId}</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -138,7 +138,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<artifactId>${flink.connector.kafka.artifactId}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
@@ -148,12 +148,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<artifactId>${flink.hadoop.compatibility.artifactId}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.binary.version}</artifactId>
<artifactId>${flink.parquet.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -183,7 +183,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<artifactId>${flink.statebackend.rocksdb.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -304,17 +304,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.apache.hudi</groupId>-->
<!-- <artifactId>hudi-flink_${scala.binary.version}</artifactId>-->
<!-- <version>${project.version}</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->

<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>${flink.test.utils.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -327,7 +321,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>${flink.streaming.java.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
ContinuousFileSource.java
@@ -18,6 +18,8 @@

package org.apache.hudi.examples.quickstart.source;

import org.apache.hudi.adapter.DataStreamScanProviderAdapter;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
@@ -28,7 +30,6 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
@@ -74,7 +75,7 @@ public ContinuousFileSource(

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
return new DataStreamScanProvider() {
return new DataStreamScanProviderAdapter() {

@Override
public boolean isBounded() {
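The DataStreamScanProviderAdapter imported above (and the matching DataStreamSinkProviderAdapter used by HoodieTableSink further down) is how the connector keeps one code base across Flink versions: each version-specific support module ships its own adapter, so API differences are bridged in a single place rather than in every table source and sink. A minimal sketch of what a Flink 1.15 flavor of the scan adapter could look like; the actual adapter lives in a module that is not part of this excerpt and may differ, and the ProviderContext-based method is the variant Flink 1.15 introduced:

// Hypothetical sketch only; the real adapter is defined in a version-specific
// Hudi module that is not shown in this diff and may differ in detail.
package org.apache.hudi.adapter;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.data.RowData;

public interface DataStreamScanProviderAdapter extends DataStreamScanProvider {

  // Table sources such as HoodieTableSource keep implementing this single-argument form.
  DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv);

  // Flink 1.15 added a ProviderContext-aware variant; delegating to the legacy signature
  // lets the same connector code compile against older Flink releases through their own adapters.
  @Override
  default DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
    return produceDataStream(execEnv);
  }
}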
22 changes: 11 additions & 11 deletions hudi-flink-datasource/hudi-flink/pom.xml
@@ -124,12 +124,13 @@
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>${flink.streaming.java.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<artifactId>${flink.clients.artifactId}</artifactId>
<scope>compile</scope>
<exclusions>
<exclusion>
@@ -144,7 +145,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<artifactId>${flink.connector.kafka.artifactId}</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
@@ -154,12 +155,12 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
<artifactId>${flink.hadoop.compatibility.artifactId}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.binary.version}</artifactId>
<artifactId>${flink.parquet.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -189,7 +190,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<artifactId>${flink.statebackend.rocksdb.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
@@ -348,7 +349,7 @@
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
<artifactId>${flink.test.utils.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -361,7 +362,7 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<artifactId>${flink.streaming.java.artifactId}</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
@@ -375,14 +376,13 @@
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
@@ -148,6 +148,7 @@ public void close() {
* End input action for batch source.
*/
public void endInput() {
super.endInput();
flushRemaining(true);
this.writeClient.cleanHandles();
this.writeStatuses.clear();
@@ -90,6 +90,7 @@ public void processElement(I value, Context ctx, Collector<Object> out) throws E
* End input action for batch source.
*/
public void endInput() {
super.endInput();
flushData(true);
this.writeStatuses.clear();
}
AbstractStreamWriteFunction.java
@@ -120,6 +120,12 @@ public abstract class AbstractStreamWriteFunction<I>
*/
private transient CkpMetadata ckpMetadata;

/**
* Since Flink 1.15, a streaming job with a bounded source triggers one checkpoint
* after #endInput is called; this flag avoids an unnecessary data flush.
*/
private transient boolean inputEnded;

/**
* Constructs a StreamWriteFunctionBase.
*
@@ -154,13 +160,21 @@ public void initializeState(FunctionInitializationContext context) throws Except

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (inputEnded) {
return;
}
snapshotState();
// Reload the snapshot state as the current state.
reloadWriteMetaState();
}

public abstract void snapshotState();

@Override
public void endInput() {
this.inputEnded = true;
}

// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------
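Flink 1.15 enables execution.checkpointing.checkpoints-after-tasks-finish.enabled by default, so a job with a bounded source can receive one final checkpoint after endInput() has already flushed and committed the remaining records; the new inputEnded flag turns that trailing snapshotState call into a no-op. A condensed, hypothetical illustration of the guard (simplified names, not the actual Hudi classes):

// Hypothetical, condensed sketch of the pattern shown above; names are simplified.
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

abstract class BoundedAwareWriteFunction implements CheckpointedFunction {

  // Set once the bounded input is fully consumed; the final checkpoint that
  // Flink 1.15 triggers after tasks finish must not flush or commit again.
  private transient boolean inputEnded;

  /** Called by the write operator when the bounded source has emitted its last record. */
  public void endInput() {
    this.inputEnded = true;
    // flush the remaining buffered records and commit them here (omitted)
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (inputEnded) {
      // endInput() already flushed everything; skip the trailing checkpoint work
      return;
    }
    snapshotState();
  }

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    // restore buffered write metadata from managed state here (omitted)
  }

  /** Hook implemented by the concrete write functions. */
  public abstract void snapshotState();
}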
CkpMetadata.java
@@ -104,7 +104,7 @@ public void startInstant(String instant) {
try {
fs.createNewFile(path);
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant);
throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e);
}
// cleaning
clean(instant);
@@ -142,7 +142,7 @@ public void commitInstant(String instant) {
try {
fs.createNewFile(path);
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint commit metadata for instant: " + instant);
throw new HoodieException("Exception while adding checkpoint commit metadata for instant: " + instant, e);
}
}

@@ -166,7 +166,7 @@ private void load() {
try {
this.messages = scanCkpMetadata(this.path);
} catch (IOException e) {
throw new HoodieException("Exception while scanning the checkpoint meta files under path: " + this.path);
throw new HoodieException("Exception while scanning the checkpoint meta files under path: " + this.path, e);
}
}

HoodieTableSink.java
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

import org.apache.hudi.adapter.DataStreamSinkProviderAdapter;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.configuration.FlinkOptions;
@@ -30,7 +31,6 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
@@ -60,7 +60,7 @@ public HoodieTableSink(Configuration conf, ResolvedSchema schema, boolean overwr

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return (DataStreamSinkProvider) dataStream -> {
return (DataStreamSinkProviderAdapter) dataStream -> {

// setup configuration
long ckpTimeout = dataStream.getExecutionEnvironment()
HoodieTableSource.java
@@ -18,6 +18,7 @@

package org.apache.hudi.table;

import org.apache.hudi.adapter.DataStreamScanProviderAdapter;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.HoodieBaseFile;
@@ -64,7 +65,6 @@
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
@@ -167,7 +167,7 @@ public HoodieTableSource(

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
return new DataStreamScanProvider() {
return new DataStreamScanProviderAdapter() {

@Override
public boolean isBounded() {
