Skip to content
Permalink
Browse files
[Flink][Bug] Fix potential NPE when cancel DorisSourceFunction (#6838)
Fix potential NPE of `scalaValueReader` when cancelling DorisSourceFunction.
  • Loading branch information
Myasuka committed Oct 23, 2021
1 parent f6a2e25 commit c6bb1b44a952c1effbb6dcb6dcd92884b7565b26
Showing 2 changed files with 14 additions and 8 deletions.
@@ -43,8 +43,8 @@ public class DorisSourceFunction extends RichSourceFunction<List<?>> implements
private final DorisDeserializationSchema<List<?>> deserializer;
private final DorisOptions options;
private final DorisReadOptions readOptions;
private transient volatile boolean isRunning;
private List<PartitionDefinition> dorisPartitions;
private ScalaValueReader scalaValueReader;

public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) {
this.deserializer = deserializer;
@@ -55,25 +55,32 @@ public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializatio
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.isRunning = true;
this.dorisPartitions = RestService.findPartitions(options, readOptions, logger);
}

@Override
public void run(SourceContext<List<?>> sourceContext) {
for (PartitionDefinition partitions : dorisPartitions) {
scalaValueReader = new ScalaValueReader(partitions, options, readOptions);
while (scalaValueReader.hasNext()) {
List<?> next = scalaValueReader.next();
sourceContext.collect(next);
try (ScalaValueReader scalaValueReader = new ScalaValueReader(partitions, options, readOptions)) {
while (isRunning && scalaValueReader.hasNext()) {
List<?> next = scalaValueReader.next();
sourceContext.collect(next);
}
}
}
}

@Override
public void cancel() {
scalaValueReader.close();
isRunning = false;
}

@Override
public void close() throws Exception {
super.close();
isRunning = false;
}

@Override
public TypeInformation<List<?>> getProducedType() {
@@ -19,7 +19,6 @@ package org.apache.doris.flink.datastream

import java.util.concurrent._
import java.util.concurrent.atomic.AtomicBoolean

import org.apache.doris.flink.backend.BackendClient
import org.apache.doris.flink.cfg.ConfigurationOptions._
import org.apache.doris.flink.cfg.{DorisOptions, DorisReadOptions}
@@ -41,7 +40,7 @@ import scala.util.control.Breaks
* @param partition Doris RDD partition
* @param options request configuration
*/
class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, readOptions: DorisReadOptions) {
class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, readOptions: DorisReadOptions) extends AutoCloseable {
protected val logger = Logger.getLogger(classOf[ScalaValueReader])

protected val client = new BackendClient(new Routing(partition.getBeAddress), readOptions)

0 comments on commit c6bb1b4

Please sign in to comment.