Skip to content
Permalink
Browse files
Spark return error to users when spark on doris query failed (#2531)
  • Loading branch information
Youngwb authored and imay committed Dec 30, 2019
1 parent 5d3f637 commit 817c819f5af8c12121362521114ff4feaca5b3cc
Showing 3 changed files with 41 additions and 2 deletions.
@@ -19,6 +19,8 @@

import org.apache.doris.spark.cfg.ConfigurationOptions;
import org.apache.doris.spark.exception.ConnectedFailedException;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.exception.DorisInternalException;
import org.apache.doris.spark.util.ErrorMessages;
import org.apache.doris.spark.cfg.Settings;
import org.apache.doris.spark.serialization.Routing;
@@ -151,16 +153,17 @@ public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedF
* @return scan batch result
* @throws ConnectedFailedException throw if cannot connect to Doris BE
*/
public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws ConnectedFailedException {
public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws DorisException {
logger.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams);
if (!isConnected) {
open();
}
TException ex = null;
TScanBatchResult result = null;
for (int attempt = 0; attempt < retries; ++attempt) {
logger.debug("Attempt {} to getNext {}.", attempt, routing);
try {
TScanBatchResult result = client.get_next(nextBatchParams);
result = client.get_next(nextBatchParams);
if (result == null) {
logger.warn("GetNext result from {} is null.", routing);
continue;
@@ -176,6 +179,12 @@ public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws Con
ex = e;
}
}
if (result != null && (TStatusCode.OK != (result.getStatus().getStatus_code()))) {
logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, result.getStatus().getStatus_code(),
result.getStatus().getError_msgs());
throw new DorisInternalException(routing.toString(), result.getStatus().getStatus_code(),
result.getStatus().getError_msgs());
}
logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
throw new ConnectedFailedException(routing.toString(), ex);
}
@@ -0,0 +1,29 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.spark.exception;

import org.apache.doris.thrift.TStatusCode;

import java.util.List;

public class DorisInternalException extends DorisException {
public DorisInternalException(String server, TStatusCode statusCode, List<String> errorMsgs) {
super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + errorMsgs);
}

}
@@ -22,4 +22,5 @@ public abstract class ErrorMessages {
public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {} failed.";
public static final String ILLEGAL_ARGUMENT_MESSAGE = "argument '{}' is illegal, value is '{}'.";
public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come here.";
public static final String DORIS_INTERNAL_FAIL_MESSAGE = "Doris server '{}' internal failed, status is '{}', error message is '{}'";
}

0 comments on commit 817c819

Please sign in to comment.