Skip to content

Commit

Permalink
[Improve][Engine] Adjust the sleep mode of flink and spark engine to …
Browse files Browse the repository at this point in the history
…be consistent with zeta (#5698)
  • Loading branch information
happyboy1024 committed Oct 31, 2023
1 parent 67edcff commit e26006d
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,10 @@ default void markSchemaChangeAfterCheckpoint() {}
* @return The object to use as the lock
*/
Object getCheckpointLock();

/**
 * Reports whether no record has been collected since the flag was last reset.
 *
 * <p>Callers check this right after {@code reader.pollNext(collector)} to decide how long to
 * sleep before polling again. This default implementation always returns {@code false};
 * collectors that track emptiness override it.
 *
 * @return {@code true} if nothing was collected since the last reset; always {@code false}
 *     here
 */
default boolean isEmptyThisPollNext() {
return false;
}

/**
 * Re-arms the "empty" flag before the next {@code pollNext} round.
 *
 * <p>This default implementation does nothing; collectors that track emptiness override it.
 */
default void resetEmptyThisPollNext() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,12 @@ public Object getCheckpointLock() {
return checkpointLock;
}

/**
 * Returns the current value of the {@code emptyThisPollNext} flag.
 *
 * @return the flag as last written; {@link #resetEmptyThisPollNext()} sets it to {@code true}
 */
@Override
public boolean isEmptyThisPollNext() {
    return this.emptyThisPollNext;
}

/**
 * Sets the {@code emptyThisPollNext} flag back to {@code true}, i.e. "nothing collected yet",
 * ahead of the next poll round.
 */
@Override
public void resetEmptyThisPollNext() {
    emptyThisPollNext = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,20 @@ public void run(Collector<T> collector) throws Exception {
while (flag.get()) {
try {
reader.pollNext(collector);
Thread.sleep(SLEEP_TIME_INTERVAL);
if (collector.isEmptyThisPollNext()) {
Thread.sleep(100);
} else {
collector.resetEmptyThisPollNext();
/**
* sleep(0) is used to prevent the current
* thread from occupying CPU resources for a
* long time, which would block the checkpoint
* thread for a long time. See
* https://github.com/apache/seatunnel/issues/5694
*/
Thread.sleep(0L);
}
} catch (Exception e) {
running = false;
flag.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;

import static org.apache.seatunnel.translation.source.CoordinatedSource.SLEEP_TIME_INTERVAL;

public class ParallelSource<T, SplitT extends SourceSplit, StateT extends Serializable>
implements BaseSourceFunction<T> {
private static final Logger LOG = LoggerFactory.getLogger(ParallelSource.class);
Expand Down Expand Up @@ -134,7 +132,17 @@ public void run(Collector<T> collector) throws Exception {
future.get();
}
reader.pollNext(collector);
Thread.sleep(SLEEP_TIME_INTERVAL);
if (collector.isEmptyThisPollNext()) {
Thread.sleep(100);
} else {
collector.resetEmptyThisPollNext();
/**
* sleep(0) is used to prevent the current thread from occupying CPU resources for a
* long time, which would block the checkpoint thread for a long time. See
* https://github.com/apache/seatunnel/issues/5694
*/
Thread.sleep(0L);
}
}
LOG.debug("Parallel source runs complete.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class RowCollector implements Collector<SeaTunnelRow> {
protected final Object checkpointLock;

private FlowControlGate flowControlGate;
private volatile boolean emptyThisPollNext;

public RowCollector(
SourceFunction.SourceContext<Row> internalCollector,
Expand All @@ -60,6 +61,7 @@ public void collect(SeaTunnelRow sourceRecord) {
}
}
internalCollector.collect(rowSerialization.convert(sourceRecord));
emptyThisPollNext = false;
} catch (IOException e) {
throw new RuntimeException(e);
}
Expand All @@ -69,4 +71,14 @@ public void collect(SeaTunnelRow sourceRecord) {
// Exposes the checkpointLock object held by this collector.
public Object getCheckpointLock() {
    return checkpointLock;
}

/**
 * Tells whether no row has been collected since the flag was last reset.
 *
 * @return {@code true} after {@link #resetEmptyThisPollNext()} until {@code collect} forwards
 *     a row and clears the flag
 */
@Override
public boolean isEmptyThisPollNext() {
    return this.emptyThisPollNext;
}

/**
 * Sets the flag back to {@code true} ("nothing collected yet"); {@code collect} flips it to
 * {@code false} once a row has been forwarded.
 */
@Override
public void resetEmptyThisPollNext() {
    emptyThisPollNext = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,20 @@ public void run(Collector<SeaTunnelRow> collector) throws Exception {
while (flag.get()) {
try {
reader.pollNext(rowCollector);
Thread.sleep(SLEEP_TIME_INTERVAL);
if (rowCollector.isEmptyThisPollNext()) {
Thread.sleep(100);
} else {
rowCollector.resetEmptyThisPollNext();
/**
* sleep(0) is used to prevent the current
* thread from occupying CPU resources for a
* long time, which would block the checkpoint
* thread for a long time. See
* https://github.com/apache/seatunnel/issues/5694
*/
Thread.sleep(0L);
}
} catch (Exception e) {
this.running = false;
flag.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,20 @@ public void run(Collector<SeaTunnelRow> collector) throws Exception {
while (flag.get()) {
try {
reader.pollNext(rowCollector);
Thread.sleep(SLEEP_TIME_INTERVAL);
if (rowCollector.isEmptyThisPollNext()) {
Thread.sleep(100);
} else {
rowCollector.resetEmptyThisPollNext();
/**
* sleep(0) is used to prevent the current
* thread from occupying CPU resources for a
* long time, which would block the checkpoint
* thread for a long time. See
* https://github.com/apache/seatunnel/issues/5694
*/
Thread.sleep(0L);
}
} catch (Exception e) {
this.running = false;
flag.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,20 @@ public void run(Collector<SeaTunnelRow> collector) throws Exception {
while (flag.get()) {
try {
reader.pollNext(rowCollector);
Thread.sleep(SLEEP_TIME_INTERVAL);
if (rowCollector.isEmptyThisPollNext()) {
Thread.sleep(100);
} else {
rowCollector.resetEmptyThisPollNext();
/**
* sleep(0) is used to prevent the current
* thread from occupying CPU resources for a
* long time, which would block the checkpoint
* thread for a long time. See
* https://github.com/apache/seatunnel/issues/5694
*/
Thread.sleep(0L);
}
} catch (Exception e) {
this.running = false;
flag.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,20 @@ public void run(Collector<SeaTunnelRow> collector) throws Exception {
while (flag.get()) {
try {
reader.pollNext(rowCollector);
Thread.sleep(SLEEP_TIME_INTERVAL);
if (rowCollector.isEmptyThisPollNext()) {
Thread.sleep(100);
} else {
rowCollector.resetEmptyThisPollNext();
/**
* sleep(0) is used to prevent the current
* thread from occupying CPU resources for a
* long time, which would block the checkpoint
* thread for a long time. See
* https://github.com/apache/seatunnel/issues/5694
*/
Thread.sleep(0L);
}
} catch (Exception e) {
this.running = false;
flag.set(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class InternalRowCollector implements Collector<SeaTunnelRow> {
private final AtomicLong collectTotalCount;
private Map<String, Object> envOptions;
private FlowControlGate flowControlGate;
private volatile boolean emptyThisPollNext;

public InternalRowCollector(
Handover<InternalRow> handover,
Expand Down Expand Up @@ -65,6 +66,7 @@ public void collect(SeaTunnelRow record) {
handover.produce(rowSerialization.convert(record));
}
collectTotalCount.incrementAndGet();
emptyThisPollNext = false;
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -78,4 +80,14 @@ public long collectTotalCount() {
// Exposes the checkpointLock object held by this collector.
public Object getCheckpointLock() {
    return checkpointLock;
}

/**
 * Tells whether no record has been collected since the flag was last reset.
 *
 * @return {@code true} after {@link #resetEmptyThisPollNext()} until {@code collect} handles
 *     a record and clears the flag
 */
@Override
public boolean isEmptyThisPollNext() {
    return this.emptyThisPollNext;
}

/**
 * Sets the flag back to {@code true} ("nothing collected yet"); {@code collect} flips it to
 * {@code false} after it has handled a record.
 */
@Override
public void resetEmptyThisPollNext() {
    emptyThisPollNext = true;
}
}

0 comments on commit e26006d

Please sign in to comment.