Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

扩展impala引擎,采用thrift原生接口,支持进度条以及服务终端重连等 #251

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 82 additions & 0 deletions ujes/definedEngines/impala/engine/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- ~ Copyright 2019 WeBank ~ ~ Licensed under the Apache License, Version
2.0 (the "License"); ~ you may not use this file except in compliance with
the License. ~ You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
~ ~ Unless required by applicable law or agreed to in writing, software ~
distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
License for the specific language governing permissions and ~ limitations
under the License. -->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>linkis</artifactId>
<groupId>com.webank.wedatasphere.linkis</groupId>
<version>0.9.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>linkis-impala-engine</artifactId>
<dependencies>
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-ujes-engine</artifactId>
<version>${linkis.version}</version>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-ujes-enginemanager</artifactId>
<version>${linkis.version}</version>
</dependency>
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-module</artifactId>
<version>${linkis.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.10</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package com.webank.wedatasphere.linkis.engine.impala.client;

import com.webank.wedatasphere.linkis.engine.impala.client.exception.SubmitException;
import com.webank.wedatasphere.linkis.engine.impala.client.exception.TransportException;
import com.webank.wedatasphere.linkis.engine.impala.client.protocol.ExecProgress;
import com.webank.wedatasphere.linkis.engine.impala.client.protocol.ExecStatus;
import com.webank.wedatasphere.linkis.engine.impala.client.protocol.ExecSummary;

/**
* Impala客户端
*
* @author dingqihuang
* @version Sep 20, 2019
*/
public interface ImpalaClient extends AutoCloseable {

/**
* 异步执行查询
*
* @param sql
* @param resultListener 查询结果的回调函数
* @return queryId 查询的唯一标识符
* @throws TransportException 传输错误
* @throws SubmitException 执行错误
*/
String executeAsync(String sql, ResultListener resultListener) throws TransportException, SubmitException;

/**
* 同步执行查询
*
* @param sql
* @param resultListener 查询结果的回调函数
* @throws TransportException 传输错误
* @throws SubmitException 执行错误
*/
void execute(String sql, ResultListener resultListener) throws TransportException, SubmitException;

/**
* 取消查询
*
* @param queryId 查询ID
* @throws TransportException 传输错误
*/
void cancel(String queryId) throws TransportException;

/**
* 查询执行状况
*
* @param queryId 查询ID
* @return
* @throws TransportException 传输错误
*/
ExecSummary getExecSummary(String queryId) throws TransportException;

/**
* 查询执行进度
*
* @param queryId 查询ID
* @return
* @throws TransportException 传输错误
*/
ExecProgress getExecProgress(String queryId) throws TransportException;

/**
* 查询执行状态
*
* @param queryId 查询ID
* @return
* @throws TransportException 传输错误
*/
ExecStatus getExecStatus(String queryId) throws TransportException;

/**
* 设置查询队列
*
* @param poolName 队列名称
* @throws TransportException 传输错误
* @see ImpalaClient.setQueryOption
*/
void setRequestPool(String poolName) throws TransportException;

/**
* 设置查询参数,详情见impala官网
*
* @param key
* @param value
* @throws TransportException 传输错误
*/
void setQueryOption(String key, String value) throws TransportException;

/**
* 重置查询参数
*
* @param key
* @throws TransportException
*/
void unsetQueryOption(String key) throws TransportException;

/**
* 获取正在运行的查询数目
*
* @return 数目
*/
int getExecutionCount();

/**
* 设置结果集缓冲区大小
*
* @param batchSize
*/
void setBatchSize(int batchSize);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.webank.wedatasphere.linkis.engine.impala.client;

import java.util.List;

/**
* Impala查询结果集
*
* @author dingqihuang
* @version Sep 20, 2019
*/
public interface ImpalaResultSet extends AutoCloseable {
/**
* 获取下一行数据
*
* @return true if exist
*/
boolean next();

public Object[] getValues();

public Object getObject(int columnIndex);

public <T> T getObject(int columnIndex, Class<T> clasz);

public String getString(int columnIndex);

public Short getShort(int columnIndex);

public Integer getInteger(int columnIndex);

public Long getLong(int columnIndex);

public int getColumnSize();

/*
* 获取字段类型
*/
public Class<?> getType(int columnIndex);

/**
* 获取字段信息
*
* @return
*/
public List<String> getColumns();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* @Title ImpalaTransport.java
* @description TODO
* @time Nov 5, 2019 3:42:15 PM
* @author dingqihuang
* @version 1.0
**/
package com.webank.wedatasphere.linkis.engine.impala.client;

import java.io.Closeable;

import javax.net.ssl.TrustManager;

/**
* @author dingqihuang
* @version Nov 5, 2019
*/
public abstract class ImpalaTransport<T> implements Closeable {

/**
* 按构造属性创建并打开连接
*
* @return 传输对象
* @throws Exception 建立连接失败
*/
public abstract T getTransport() throws Exception;

protected String host;
protected int port;
protected String username;
protected String password;
protected boolean useTicket;
protected String ticketBin;
protected boolean useSsl;
protected String sslType;
protected TrustManager[] trustManagers;
protected int connectionTimeout;

public void setHost(String host) {
this.host = host;
}

public void setPort(int port) {
this.port = port;
}

public void setUsername(String username) {
this.username = username;
}

public void setPassword(String password) {
this.password = password;
}

public void setUseTicket(boolean useTicket) {
this.useTicket = useTicket;
}

public void setTicketBin(String ticketBin) {
this.ticketBin = ticketBin;
}

public void setUseSsl(boolean useSsl) {
this.useSsl = useSsl;
}

public void setSslType(String sslType) {
this.sslType = sslType;
}

public void setTrustManagers(TrustManager[] trustManagers) {
this.trustManagers = trustManagers;
}

public void setConnectionTimeout(int connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.webank.wedatasphere.linkis.engine.impala.client;

import java.util.List;

import com.webank.wedatasphere.linkis.engine.impala.client.protocol.ExecProgress;
import com.webank.wedatasphere.linkis.engine.impala.client.protocol.ExecStatus;

/**
* 查询监听器
*
* @author dingqihuang
* @version Sep 20, 2019
*/
public interface ResultListener {

/**
* 查询成功
*
* @param resultSet
*/
void success(ImpalaResultSet resultSet);

/**
* 查询失败
*
* @param status 任务状态
*/
void error(ExecStatus status);

/**
* 进度提示,每隔固定时间返回执行进度
*
* @param progress 进度信息,进度数值小于零表示队列已满,任务正在等待执行
*/
void progress(ExecProgress progress);

/**
* 提示信息
*
* @param message 提示信息
*/
void message(List<String> message);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.webank.wedatasphere.linkis.engine.impala.client.exception;

import lombok.Getter;

/**
* 错误代码
*
* @author dingqihuang
* @version Sep 20, 2019
*/
@Getter
public enum ExceptionCode {
ClosedError("Session is closed."), ExecutionError("Server report an error."),
CommunicateError("Could not communicate with target host."), StillRunningError("Target is still running."),
InvalidHandleError("Current handle is invalid."), ParallelLimitError("Reach the parallel limit."),
LoginError("Failed to login to target server.");

private String message;

ExceptionCode(String message) {
this.message = message;
}
}