Skip to content

Commit

Permalink
fix: jobManager connect kudu seesion not close
Browse files Browse the repository at this point in the history
  • Loading branch information
collabH committed May 30, 2023
1 parent 30d957a commit a10b726
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
3 changes: 3 additions & 0 deletions .idea/misc.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.types.Row;
import org.apache.kudu.client.KuduException;
import org.colloh.flink.kudu.connector.internal.KuduFilterInfo;
import org.colloh.flink.kudu.connector.internal.KuduTableInfo;
import org.colloh.flink.kudu.connector.internal.convertor.RowResultConvertor;
Expand All @@ -30,9 +33,6 @@
import org.colloh.flink.kudu.connector.internal.reader.KuduReaderConfig;
import org.colloh.flink.kudu.connector.internal.reader.KuduReaderIterator;
import org.colloh.flink.kudu.connector.table.catalog.KuduCatalog;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.types.Row;
import org.apache.kudu.client.KuduException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -59,22 +59,24 @@ public abstract class BaseKuduInputFormat<T> extends RichInputFormat<T, KuduInpu
private final KuduTableInfo tableInfo;
private final List<KuduFilterInfo> tableFilters;
private final List<String> tableProjections;

private final RowResultConvertor<T> rowResultConvertor;
private boolean endReached;

private transient KuduReader<T> kuduReader;
private transient KuduReaderIterator<T> resultIterator;
private final RowResultConvertor<T> rowResultConvertor;

public BaseKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor, KuduTableInfo tableInfo) {
public BaseKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor,
KuduTableInfo tableInfo) {
this(readerConfig, rowResultConvertor, tableInfo, new ArrayList<>(), null);
}

public BaseKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor, KuduTableInfo tableInfo, List<String> tableProjections) {
public BaseKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor,
KuduTableInfo tableInfo, List<String> tableProjections) {
this(readerConfig, rowResultConvertor, tableInfo, new ArrayList<>(), tableProjections);
}

public BaseKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor, KuduTableInfo tableInfo, List<KuduFilterInfo> tableFilters, List<String> tableProjections) {
public BaseKuduInputFormat(KuduReaderConfig readerConfig, RowResultConvertor<T> rowResultConvertor,
KuduTableInfo tableInfo, List<KuduFilterInfo> tableFilters,
List<String> tableProjections) {

this.readerConfig = checkNotNull(readerConfig, "readerConfig could not be null");
this.rowResultConvertor = checkNotNull(rowResultConvertor, "readerConfig could not be null");
Expand Down Expand Up @@ -112,6 +114,10 @@ public void close() throws IOException {
e.printStackTrace();
}
}
closeKuduReader();
}

private void closeKuduReader() throws IOException {
if (kuduReader != null) {
kuduReader.close();
kuduReader = null;
Expand All @@ -130,8 +136,12 @@ public InputSplitAssigner getInputSplitAssigner(KuduInputSplit[] inputSplits) {

@Override
public KuduInputSplit[] createInputSplits(int minNumSplits) throws IOException {
startKuduReader();
return kuduReader.createInputSplits(minNumSplits);
try {
startKuduReader();
return kuduReader.createInputSplits(minNumSplits);
} finally {
closeKuduReader();
}
}

@Override
Expand Down

0 comments on commit a10b726

Please sign in to comment.