Skip to content
Permalink
Browse files
[HTTP][API] Add backends info API for spark/flink connector (#6984)
Doris should provide a http api to return backends list for connectors to submit stream load,
and without privilege checking, which can let common user to use it
  • Loading branch information
morningman committed Nov 5, 2021
1 parent bbfb03c commit 6f1474e7f7bc57f49320dd4468f6d08aba612919
Showing 4 changed files with 136 additions and 4 deletions.
@@ -32,6 +32,7 @@
import org.apache.doris.flink.exception.ShouldNeverHappenException;
import org.apache.doris.flink.rest.models.Backend;
import org.apache.doris.flink.rest.models.BackendRow;
import org.apache.doris.flink.rest.models.BackendV2;
import org.apache.doris.flink.rest.models.QueryPlan;
import org.apache.doris.flink.rest.models.Schema;
import org.apache.doris.flink.rest.models.Tablet;
@@ -83,7 +84,9 @@ public class RestService implements Serializable {
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
@Deprecated
private static final String BACKENDS = "/rest/v1/system?path=//backends";
private static final String BACKENDS_V2 = "/api/backends?is_aliva=true";
private static final String FE_LOGIN = "/rest/v1/login";

/**
@@ -250,25 +253,29 @@ static String randomEndpoint(String feNodes, Logger logger) throws IllegalArgume
*/
@VisibleForTesting
public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
List<BackendRow> backends = getBackends(options, readOptions, logger);
List<BackendV2.BackendRowV2> backends = getBackendsV2(options, readOptions, logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
throw new IllegalArgumentException("beNodes", String.valueOf(backends));
}
Collections.shuffle(backends);
BackendRow backend = backends.get(0);
return backend.getIP() + ":" + backend.getHttpPort();
BackendV2.BackendRowV2 backend = backends.get(0);
return backend.getIp() + ":" + backend.getHttpPort();
}

/**
* get Doris BE nodes to request.
* get Doris BE nodes to request.
*
* @param options configuration of request
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*
* This method is deprecated. Because it needs ADMIN_PRIV to get backends, which is not suitable for common users.
* Use getBackendsV2 instead
*/
@Deprecated
@VisibleForTesting
static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
String feNodes = options.getFenodes();
@@ -281,6 +288,7 @@ static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readO
return backends;
}

@Deprecated
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
ObjectMapper mapper = new ObjectMapper();
Backend backend;
@@ -309,6 +317,54 @@ static List<BackendRow> parseBackend(String response, Logger logger) throws Dori
return backendRows;
}

/**
* get Doris BE nodes to request.
*
* @param options configuration of request
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
*/
@VisibleForTesting
static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
String feNodes = options.getFenodes();
String feNode = randomEndpoint(feNodes, logger);
String beUrl = "http://" + feNode + BACKENDS_V2;
HttpGet httpGet = new HttpGet(beUrl);
String response = send(options, readOptions, httpGet, logger);
logger.info("Backend Info:{}", response);
List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
return backends;
}

static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException, IOException {
ObjectMapper mapper = new ObjectMapper();
BackendV2 backend;
try {
backend = mapper.readValue(response, BackendV2.class);
} catch (JsonParseException e) {
String errMsg = "Doris BE's response is not a json. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
} catch (JsonMappingException e) {
String errMsg = "Doris BE's response cannot map to schema. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
} catch (IOException e) {
String errMsg = "Parse Doris BE's response to json failed. res: " + response;
logger.error(errMsg, e);
throw new DorisException(errMsg, e);
}

if (backend == null) {
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
throw new ShouldNeverHappenException();
}
List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
logger.debug("Parsing schema result is '{}'.", backendRows);
return backendRows;
}

/**
* get a valid URI to connect Doris FE.
*
@@ -25,6 +25,7 @@
/**
* Be response model
**/
@Deprecated
@JsonIgnoreProperties(ignoreUnknown = true)
public class Backend {

@@ -20,6 +20,7 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@Deprecated
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendRow {

@@ -0,0 +1,74 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.flink.rest.models;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

/**
* Be response model
**/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {

@JsonProperty(value = "backends")
private List<BackendRowV2> backends;

public List<BackendRowV2> getBackends() {
return backends;
}

public void setBackends(List<BackendRowV2> backends) {
this.backends = backends;
}

public static class BackendRowV2 {
@JsonProperty("ip")
public String ip;
@JsonProperty("http_port")
public int httpPort;
@JsonProperty("is_alive")
public boolean isAlive;

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}

public int getHttpPort() {
return httpPort;
}

public void setHttpPort(int httpPort) {
this.httpPort = httpPort;
}

public boolean isAlive() {
return isAlive;
}

public void setAlive(boolean alive) {
isAlive = alive;
}
}
}

0 comments on commit 6f1474e

Please sign in to comment.