Skip to content
Permalink
Browse files
[HTTP][API] Add backends info API for spark/flink connector (#6984)
Doris should provide an HTTP API that returns the list of backends so that connectors can submit stream loads.
The API performs no privilege checking, so ordinary users can call it.
  • Loading branch information
morningman committed Nov 5, 2021
1 parent 00589ff commit 27b72a734db7f7aa78a6fea091ca3c72b1f18198
Showing 9 changed files with 180 additions and 10 deletions.
@@ -39,7 +39,6 @@ fi
# check maven
MVN_CMD=mvn


if [[ ! -z ${CUSTOM_MVN} ]]; then
MVN_CMD=${CUSTOM_MVN}
fi
@@ -48,11 +47,26 @@ if ! ${MVN_CMD} --version; then
exit 1
fi
export MVN_CMD
if [ $1 == 3 ]

# Print an example of the accepted build arguments (2 or 3, selecting the
# Spark major version to build against) and terminate the script with exit status 1.
usage() {
echo "
Eg.
$0 2 build with spark 2.x
$0 3 build with spark 3.x
"
exit 1
}

if [ $# == 0 ]; then
usage
fi


if [ "$1"x == "3x" ]
then
${MVN_CMD} clean package -f pom_3.0.xml
fi
if [ $1 == 2 ]
if [ "$1"x == "2x" ]
then
${MVN_CMD} clean package
fi
@@ -161,6 +161,12 @@
<version>4.1.27.Final</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>

</dependencies>

@@ -75,7 +75,7 @@ public DorisStreamLoad(String hostPort, String db, String tbl, String user, Stri
}

public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
String hostPort = RestService.randomBackend(settings, LOG);
String hostPort = RestService.randomBackendV2(settings, LOG);
this.hostPort = hostPort;
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
this.db = dbTable[0];
@@ -62,6 +62,7 @@
import org.apache.doris.spark.exception.ShouldNeverHappenException;
import org.apache.doris.spark.rest.models.Backend;
import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
import org.apache.doris.spark.rest.models.Tablet;
@@ -86,8 +87,9 @@ public class RestService implements Serializable {
private static final String API_PREFIX = "/api";
private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
@Deprecated
private static final String BACKENDS = "/rest/v1/system?path=//backends";

private static final String BACKENDS_V2 = "/api/backends?is_alive=true";

/**
* send request to Doris FE and get response json string.
@@ -478,14 +480,17 @@ static int tabletCountLimitForOnePartition(Settings cfg, Logger logger) {
* @param logger slf4j logger
* @return the chosen one Doris BE node
* @throws IllegalArgumentException BE nodes is illegal
* @deprecated use {@link #randomBackendV2(SparkSettings, Logger)} instead
*/
@Deprecated
@VisibleForTesting
public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
String feNodes = sparkSettings.getProperty(DORIS_FENODES);
String feNode = randomEndpoint(feNodes, logger);
String beUrl = String.format("http://%s" + BACKENDS,feNode);
String beUrl = String.format("http://%s" + BACKENDS, feNode);
HttpGet httpGet = new HttpGet(beUrl);
String response = send(sparkSettings,httpGet, logger);
logger.info("Backend Info:{}",response);
String response = send(sparkSettings, httpGet, logger);
logger.info("Backend Info:{}", response);
List<BackendRow> backends = parseBackend(response, logger);
logger.trace("Parse beNodes '{}'.", backends);
if (backends == null || backends.isEmpty()) {
@@ -497,14 +502,14 @@ public static String randomBackend(SparkSettings sparkSettings , Logger logger)
return backend.getIP() + ":" + backend.getHttpPort();
}


/**
* translate Doris FE response to inner {@link BackendRow} struct.
* @param response Doris FE response
* @param logger {@link Logger}
* @return inner {@link List<BackendRow>} struct
* @throws DorisException,IOException throw when translate failed
* */
@Deprecated
@VisibleForTesting
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
@@ -534,6 +539,59 @@ static List<BackendRow> parseBackend(String response, Logger logger) throws Dori
return backendRows;
}

/**
 * Choose one Doris BE node to send requests to.
 *
 * <p>An FE endpoint is obtained from the configured FE nodes via {@code randomEndpoint},
 * the backends API ({@code /api/backends?is_alive=true}) is queried on it, and one of the
 * returned backends is selected by shuffling the list and taking its first element.
 *
 * @param sparkSettings connector settings; the FE node list is read from {@code DORIS_FENODES}
 * @param logger slf4j logger
 * @return the chosen BE node, formatted as {@code ip:httpPort}
 * @throws IllegalArgumentException if no backend is returned by the FE
 * @throws DorisException if the FE response cannot be parsed
 */
@VisibleForTesting
public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
    String chosenFe = randomEndpoint(sparkSettings.getProperty(DORIS_FENODES), logger);
    HttpGet request = new HttpGet(String.format("http://%s" + BACKENDS_V2, chosenFe));
    String body = send(sparkSettings, request, logger);
    logger.info("Backend Info:{}", body);

    List<BackendV2.BackendRowV2> candidates = parseBackendV2(body, logger);
    logger.trace("Parse beNodes '{}'.", candidates);
    boolean noCandidates = candidates == null || candidates.isEmpty();
    if (noCandidates) {
        logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", candidates);
        throw new IllegalArgumentException("beNodes", String.valueOf(candidates));
    }

    // Shuffle in place and take the head: every backend is equally likely to be picked.
    Collections.shuffle(candidates);
    BackendV2.BackendRowV2 picked = candidates.get(0);
    return picked.getIp() + ":" + picked.getHttpPort();
}

/**
 * Translate the response of the backends API into a list of {@link BackendV2.BackendRowV2}.
 *
 * @param response JSON body of the backends API response
 * @param logger {@link Logger}
 * @return the value of the {@code backends} property of the parsed response
 *         (NOTE(review): presumably {@code null} when the property is absent; the caller checks for it)
 * @throws DorisException if the response is not valid JSON or cannot be mapped to {@link BackendV2}
 * @throws ShouldNeverHappenException if the mapper yields a {@code null} model
 */
static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException {
    com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
    BackendV2 backend;
    try {
        backend = mapper.readValue(response, BackendV2.class);
    } catch (com.fasterxml.jackson.core.JsonParseException e) {
        String errMsg = "Doris BE's response is not a json. res: " + response;
        logger.error(errMsg, e);
        throw new DorisException(errMsg, e);
    } catch (com.fasterxml.jackson.databind.JsonMappingException e) {
        String errMsg = "Doris BE's response cannot map to schema. res: " + response;
        logger.error(errMsg, e);
        throw new DorisException(errMsg, e);
    } catch (IOException e) {
        String errMsg = "Parse Doris BE's response to json failed. res: " + response;
        logger.error(errMsg, e);
        throw new DorisException(errMsg, e);
    }

    if (backend == null) {
        logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
        throw new ShouldNeverHappenException();
    }
    List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
    // This method parses backends, not a schema; the message previously said "schema".
    logger.debug("Parsing backend result is '{}'.", backendRows);
    return backendRows;
}

/**
* translate BE tablets map to Doris RDD partition.
* @param cfg configuration of request
@@ -23,6 +23,7 @@
/**
* Be response model
**/
@Deprecated
@JsonIgnoreProperties(ignoreUnknown = true)
public class Backend {

@@ -15,8 +15,11 @@
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.spark.rest.models;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

@Deprecated
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendRow {

@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.spark.rest.models;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.List;

/**
 * Response model of the backends API ({@code /api/backends}).
 *
 * <p>The response carries a {@code backends} array; each element is mapped to a
 * {@link BackendRowV2}. Unknown JSON properties are ignored on both levels so that
 * fields added to the response later do not break deserialization.
 **/
@JsonIgnoreProperties(ignoreUnknown = true)
public class BackendV2 {

    @JsonProperty(value = "backends")
    private List<BackendRowV2> backends;

    /**
     * @return the backend rows of the response
     */
    public List<BackendRowV2> getBackends() {
        return backends;
    }

    /**
     * @param backends the backend rows of the response
     */
    public void setBackends(List<BackendRowV2> backends) {
        this.backends = backends;
    }

    /**
     * Same as {@link #setBackends(List)}; kept so existing callers keep compiling.
     *
     * @param rows the backend rows of the response
     */
    public void setRows(List<BackendRowV2> rows) {
        this.backends = rows;
    }

    /**
     * One backend entry of the response.
     */
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class BackendRowV2 {
        @JsonProperty("ip")
        public String ip;
        @JsonProperty("http_port")
        public int httpPort;
        @JsonProperty("is_alive")
        public boolean isAlive;

        public String getIp() {
            return ip;
        }

        public void setIp(String ip) {
            this.ip = ip;
        }

        public int getHttpPort() {
            return httpPort;
        }

        public void setHttpPort(int httpPort) {
            this.httpPort = httpPort;
        }

        public boolean isAlive() {
            return isAlive;
        }

        public void setAlive(boolean alive) {
            isAlive = alive;
        }

        /**
         * Readable form for log output, where rows are formatted through {@code '{}'} placeholders.
         */
        @Override
        public String toString() {
            return "BackendRowV2{ip='" + ip + "', httpPort=" + httpPort + ", isAlive=" + isAlive + "}";
        }
    }
}
@@ -38,6 +38,7 @@
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.IllegalArgumentException;
import org.apache.doris.spark.rest.models.BackendRow;
import org.apache.doris.spark.rest.models.BackendV2;
import org.apache.doris.spark.rest.models.Field;
import org.apache.doris.spark.rest.models.QueryPlan;
import org.apache.doris.spark.rest.models.Schema;
@@ -49,6 +50,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jdk.nashorn.internal.ir.annotations.Ignore;

public class TestRestService {
private static Logger logger = LoggerFactory.getLogger(TestRestService.class);

@@ -295,7 +298,8 @@ public void testTabletsMapToPartition() throws Exception {
Assert.assertEquals(expected, actual);
}

@Test
@Deprecated
@Ignore
public void testParseBackend() throws Exception {
String response = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," +
"\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," +
@@ -313,4 +317,11 @@ public void testParseBackend() throws Exception {
List<BackendRow> backendRows = RestService.parseBackend(response, logger);
Assert.assertTrue(backendRows != null && !backendRows.isEmpty());
}

/**
 * Parses a two-backend response and checks both the row count and that every
 * JSON property (ip, http_port, is_alive) is mapped onto the row model.
 */
@Test
public void testParseBackendV2() throws Exception {
    String response = "{\"backends\":[{\"ip\":\"192.168.1.1\",\"http_port\":8042,\"is_alive\":true}, {\"ip\":\"192.168.1.2\",\"http_port\":8042,\"is_alive\":true}]}";
    List<BackendV2.BackendRowV2> backendRows = RestService.parseBackendV2(response, logger);
    Assert.assertEquals(2, backendRows.size());

    // A size-only check would still pass if the field mapping were broken.
    BackendV2.BackendRowV2 first = backendRows.get(0);
    Assert.assertEquals("192.168.1.1", first.getIp());
    Assert.assertEquals(8042, first.getHttpPort());
    Assert.assertTrue(first.isAlive());

    BackendV2.BackendRowV2 second = backendRows.get(1);
    Assert.assertEquals("192.168.1.2", second.getIp());
    Assert.assertEquals(8042, second.getHttpPort());
    Assert.assertTrue(second.isAlive());
}
}
@@ -19,8 +19,12 @@ package org.apache.doris.spark.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Ignore;
import org.junit.Test

// This test needs real connection info to run.
// Set the connection info before commenting out this @Ignore
@Ignore
class TestSparkConnector {
val dorisFeNodes = "your_fe_host:8030"
val dorisUser = "root"
@@ -107,3 +111,4 @@ class TestSparkConnector {
.start().awaitTermination()
}
}

0 comments on commit 27b72a7

Please sign in to comment.