Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve]Added direct access to BE through the intranet #187

Merged
merged 7 commits into from Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion flink-doris-connector/pom.xml
Expand Up @@ -224,7 +224,13 @@ under the License.
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.27.0</version>
<version>4.2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>

Expand Down
Expand Up @@ -31,15 +31,23 @@ public class DorisConnectionOptions implements Serializable {
protected final String username;
protected final String password;
protected String jdbcUrl;
protected String benodes;

public DorisConnectionOptions(String fenodes, String username, String password) {
this.fenodes = Preconditions.checkNotNull(fenodes, "fenodes is empty");
this.username = username;
this.password = password;
}

public DorisConnectionOptions(String fenodes, String username, String password, String jdbcUrl){
this(fenodes,username,password);
public DorisConnectionOptions(String fenodes, String username, String password, String jdbcUrl) {
this(fenodes, username, password);
this.jdbcUrl = jdbcUrl;
}

public DorisConnectionOptions(String fenodes, String benodes, String username, String password,
String jdbcUrl) {
this(fenodes, username, password);
this.benodes = benodes;
this.jdbcUrl = jdbcUrl;
}

Expand All @@ -55,6 +63,10 @@ public String getPassword() {
return password;
}

public String getBenodes() {
return benodes;
}

public String getJdbcUrl(){
return jdbcUrl;
}
Expand Down
Expand Up @@ -42,6 +42,12 @@ public DorisOptions(String fenodes, String username, String password, String tab
this.tableIdentifier = tableIdentifier;
}

public DorisOptions(String fenodes, String beNodes, String username, String password,
String tableIdentifier, String jdbcUrl) {
super(fenodes, beNodes, username, password, jdbcUrl);
this.tableIdentifier = tableIdentifier;
}

public String getTableIdentifier() {
return tableIdentifier;
}
Expand All @@ -60,7 +66,7 @@ public static Builder builder() {
*/
public static class Builder {
private String fenodes;

private String benodes;
private String jdbcUrl;
private String username;
private String password;
Expand Down Expand Up @@ -98,6 +104,14 @@ public Builder setFenodes(String fenodes) {
return this;
}

/**
* optional, Backend Http Port
*/
public Builder setBenodes(String benodes) {
this.benodes = benodes;
return this;
}

/**
* not required, fe jdbc url, for lookup query
*/
Expand All @@ -109,9 +123,8 @@ public Builder setJdbcUrl(String jdbcUrl) {
public DorisOptions build() {
checkNotNull(fenodes, "No fenodes supplied.");
JNSimba marked this conversation as resolved.
Show resolved Hide resolved
checkNotNull(tableIdentifier, "No tableIdentifier supplied.");
return new DorisOptions(fenodes, username, password, tableIdentifier, jdbcUrl);
return new DorisOptions(fenodes, benodes, username, password, tableIdentifier, jdbcUrl);
}
}


}
Expand Up @@ -19,11 +19,15 @@

import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.BackendV2.BackendRowV2;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BackendUtil {
Expand All @@ -36,13 +40,34 @@ public BackendUtil(List<BackendV2.BackendRowV2> backends) {
this.pos = 0;
}

public BackendUtil(String beNodes) {
this.backends = initBackends(beNodes);
this.pos = 0;
}

private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
List<BackendV2.BackendRowV2> backends = new ArrayList<>();
List<String> nodes = Arrays.asList(beNodes.split(","));
nodes.forEach(node -> {
if (tryHttpConnection(node)) {
node = node.trim();
String[] ipAndPort = node.split(":");
BackendRowV2 backendRowV2 = new BackendRowV2();
backendRowV2.setIp(ipAndPort[0]);
backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1]));
backendRowV2.setAlive(true);
backends.add(backendRowV2);
}
});
return backends;
}

public String getAvailableBackend() {
long tmp = pos + backends.size();
while (pos < tmp) {
BackendV2.BackendRowV2 backend = backends.get((int) (pos % backends.size()));
BackendV2.BackendRowV2 backend = backends.get((int) (pos++ % backends.size()));
JNSimba marked this conversation as resolved.
Show resolved Hide resolved
String res = backend.toBackendString();
if(tryHttpConnection(res)){
pos++;
if (tryHttpConnection(res)) {
return res;
}
}
Expand All @@ -60,7 +85,6 @@ public boolean tryHttpConnection(String backend) {
return true;
} catch (Exception ex) {
LOG.warn("Failed to connect to backend:{}", backend, ex);
pos++;
return false;
}
}
Expand Down
Expand Up @@ -29,6 +29,8 @@
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
Expand Down Expand Up @@ -93,7 +95,9 @@ public DorisBatchStreamLoad(DorisOptions dorisOptions,
DorisReadOptions dorisReadOptions,
DorisExecutionOptions executionOptions,
LabelGenerator labelGenerator) {
this.backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(
dorisOptions.getBenodes())
: new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
this.hostPort = backendUtil.getAvailableBackend();
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
this.db = tableInfo[0];
Expand Down
Expand Up @@ -17,6 +17,7 @@

package org.apache.doris.flink.sink.committer;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.connector.sink.Committer;

import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -25,6 +26,7 @@
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
Expand Down Expand Up @@ -55,6 +57,7 @@ public class DorisCommitter implements Committer<DorisCommittable> {
private final DorisOptions dorisOptions;
private final DorisReadOptions dorisReadOptions;
private final ObjectMapper jsonMapper = new ObjectMapper();
private final BackendUtil backendUtil;

int maxRetry;

Expand All @@ -67,6 +70,9 @@ public DorisCommitter(DorisOptions dorisOptions, DorisReadOptions dorisReadOptio
this.dorisReadOptions = dorisReadOptions;
this.maxRetry = maxRetry;
this.httpClient = client;
this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(
dorisOptions.getBenodes())
: new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
}

@Override
Expand Down Expand Up @@ -116,13 +122,13 @@ private void commitTransaction(DorisCommittable committable) throws IOException
if (retry == maxRetry) {
throw new DorisRuntimeException("stream load error: " + reasonPhrase);
}
hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
hostPort = backendUtil.getAvailableBackend();
} catch (IOException e) {
LOG.error("commit transaction failed: ", e);
if (retry == maxRetry) {
throw new IOException("commit transaction failed: {}", e);
}
hostPort = RestService.getBackend(dorisOptions, dorisReadOptions, LOG);
hostPort = backendUtil.getAvailableBackend();
}
}
}
Expand Down
Expand Up @@ -27,6 +27,8 @@
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpUtil;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
Expand Down Expand Up @@ -99,8 +101,9 @@ public DorisWriter(Sink.InitContext initContext,
}

public void initializeLoad(List<DorisWriterState> state) throws IOException {
//cache backend
backendUtil = new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
this.backendUtil = StringUtils.isNotEmpty(dorisOptions.getBenodes()) ? new BackendUtil(
dorisOptions.getBenodes())
: new BackendUtil(RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
try {
this.dorisStreamLoad = new DorisStreamLoad(
backendUtil.getAvailableBackend(),
Expand Down
Expand Up @@ -40,6 +40,7 @@ public class DorisConfigOptions {
public static final String IDENTIFIER = "doris";
// common option
public static final ConfigOption<String> FENODES = ConfigOptions.key("fenodes").stringType().noDefaultValue().withDescription("doris fe http address.");
public static final ConfigOption<String> BENODES = ConfigOptions.key("benodes").stringType().noDefaultValue().withDescription("doris be http address.");
public static final ConfigOption<String> TABLE_IDENTIFIER = ConfigOptions.key("table.identifier").stringType().noDefaultValue().withDescription("the doris table name.");
public static final ConfigOption<String> USERNAME = ConfigOptions.key("username").stringType().noDefaultValue().withDescription("the doris user name.");
public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password").stringType().noDefaultValue().withDescription("the doris password.");
Expand Down
Expand Up @@ -20,6 +20,8 @@
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;

import static org.apache.doris.flink.table.DorisConfigOptions.BENODES;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableSchema;
Expand Down Expand Up @@ -103,6 +105,7 @@ public Set<ConfigOption<?>> requiredOptions() {
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(FENODES);
options.add(BENODES);
options.add(TABLE_IDENTIFIER);
options.add(USERNAME);
options.add(PASSWORD);
Expand Down Expand Up @@ -169,8 +172,10 @@ public DynamicTableSource createDynamicTableSource(Context context) {

private DorisOptions getDorisOptions(ReadableConfig readableConfig) {
final String fenodes = readableConfig.get(FENODES);
final String benodes = readableConfig.get(BENODES);
final DorisOptions.Builder builder = DorisOptions.builder()
.setFenodes(fenodes)
.setBenodes(benodes)
.setJdbcUrl(readableConfig.get(JDBC_URL))
.setTableIdentifier(readableConfig.get(TABLE_IDENTIFIER));

Expand Down
Expand Up @@ -105,6 +105,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
}
DorisRowDataInputFormat.Builder builder = DorisRowDataInputFormat.builder()
.setFenodes(options.getFenodes())
.setBenodes(options.getBenodes())
.setUsername(options.getUsername())
.setPassword(options.getPassword())
.setTableIdentifier(options.getTableIdentifier())
Expand Down
Expand Up @@ -198,6 +198,11 @@ public Builder setFenodes(String fenodes) {
return this;
}

public Builder setBenodes(String benodes) {
this.optionsBuilder.setBenodes(benodes);
return this;
}

public Builder setUsername(String username) {
this.optionsBuilder.setUsername(username);
return this;
Expand Down
Expand Up @@ -153,13 +153,15 @@ private DorisConnectionOptions getDorisConnectionOptions() {
*/
public DorisSink<String> buildDorisSink(String table) {
String fenodes = sinkConfig.getString(DorisConfigOptions.FENODES);
String benodes = sinkConfig.getString(DorisConfigOptions.BENODES);
String user = sinkConfig.getString(DorisConfigOptions.USERNAME);
String passwd = sinkConfig.getString(DorisConfigOptions.PASSWORD, "");
String labelPrefix = sinkConfig.getString(DorisConfigOptions.SINK_LABEL_PREFIX);

DorisSink.Builder<String> builder = DorisSink.builder();
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes(fenodes)
.setBenodes(benodes)
.setTableIdentifier(database + "." + table)
.setUsername(user)
.setPassword(passwd);
Expand Down