Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ARCTIC-969] Refactor Logstore Source to FLINK FLIP-27 API #970

Merged
merged 11 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.netease.arctic.flink.read.internals.AbstractFetcher;
import com.netease.arctic.flink.read.internals.KafkaFetcher;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.metrics.MetricGroup;
Expand Down Expand Up @@ -69,7 +68,7 @@
* <p>Please refer to Kafka's documentation for the available configuration properties:
* http://kafka.apache.org/documentation.html#newconsumerconfigs
*/
@PublicEvolving
@Deprecated
public class FlinkKafkaConsumer<T> extends FlinkKafkaConsumerBase<T> {

private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
* @param <T> The type of records produced by this data source
*/
@Internal
@Deprecated
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T>
implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
/**
* This Handler contains the topic offsets of upstream job id, epicNo, topic.
*/
@Deprecated
public class LogEpicStateHandler implements Serializable {
private static final String SEPARATOR = "_";
private static final long serialVersionUID = 203036690144637883L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
/**
* An arctic log consumer that consume arctic log data from kafka.
*/
@Deprecated
public class LogKafkaConsumer extends FlinkKafkaConsumer<RowData> {
private static final long serialVersionUID = 7855676094345921722L;
private KafkaDeserializationSchemaWrapper<RowData> logRecordDeserializationSchemaWrapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* This thread is an extent of {@link KafkaConsumerThread} added an abstract method
* {@link KafkaConsumerThread#reSeekPartitionOffsets()} to reSeek the offset of kafka topic partitions.
*/
@Deprecated
public class LogKafkaConsumerThread<T> extends KafkaConsumerThread<T> {
protected final ClosableBlockingQueue<KafkaTopicPartitionState<T, TopicPartition>>
unReSeekPartitionsQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
* The fetcher runs in {@link LogKafkaConsumer} and fetches messages from kafka, and retracts message as handling a
* Flip message that {@link LogData#getFlip()} is true.
*/
@Deprecated
public class LogKafkaFetcher extends KafkaFetcher<RowData> {
private static final Logger LOG = LoggerFactory.getLogger(LogKafkaFetcher.class);
private final LogDataJsonDeserialization<RowData> logDataJsonDeserialization;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
* According to upstreamId and partition topic dealing with the flip message, when should begin to retract message and
* when to end it.
*/
@Deprecated
public class LogReadHelper implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(LogReadHelper.class);
private static final long serialVersionUID = 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
* @param <K> The type of topic/partition identifier used by Kafka in the specific version.
*/
@Internal
@Deprecated
public abstract class AbstractFetcher<T, K> {

private static final int NO_TIMESTAMPS_WATERMARKS = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
* an indirection to the KafkaConsumer calls that change signature.
*/
@Internal
@Deprecated
public class KafkaConsumerThread<T> extends Thread {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* @param <T> The type of elements produced by the fetcher.
*/
@Internal
@Deprecated
public class KafkaFetcher<T> extends AbstractFetcher<T, TopicPartition> {

private static final Logger LOG = LoggerFactory.getLogger(KafkaFetcher.class);
Expand Down
Loading