Skip to content

Commit

Permalink
Merge c33ce16 into 8b8d4c3
Browse files Browse the repository at this point in the history
  • Loading branch information
Jialin Qiao committed Mar 24, 2020
2 parents 8b8d4c3 + c33ce16 commit 43fb327
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 48 deletions.
Expand Up @@ -16,24 +16,20 @@

import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.iotdb.db.metrics.ui.MetricsPage;
import org.apache.iotdb.db.service.TSServiceImpl;

public class QueryServlet extends HttpServlet {

private static final long serialVersionUID = 1L;

private List<SqlArgument> list = TSServiceImpl.sqlArgumentsList;
private MetricsPage page;

public QueryServlet(MetricsPage page) {
this.page = page;
this.page.setList(list);
}

@Override
Expand Down
Expand Up @@ -20,10 +20,12 @@
import java.io.InputStreamReader;
import java.net.URL;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.metrics.server.SqlArgument;
import org.apache.iotdb.db.service.TSServiceImpl;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -33,15 +35,6 @@ public class MetricsPage {

private static final Logger logger = LoggerFactory.getLogger(MetricsPage.class);
private MetricRegistry mr;
private List<SqlArgument> list;

public List<SqlArgument> getList() {
return list;
}

public void setList(List<SqlArgument> list) {
this.list = list;
}

public MetricsPage(MetricRegistry metricRegistry) {
this.mr = metricRegistry;
Expand Down Expand Up @@ -97,8 +90,10 @@ public StringBuilder sqlRow() {
TSExecuteStatementResp resp;
String errMsg;
int statusCode;
for (int i = (list.size() - 1); i >= 0; i--) {
sqlArgument = list.get(i);

List<SqlArgument> readCopy = TSServiceImpl.getSqlArgumentList();
for (int i = (readCopy.size() - 1); i >= 0; i--) {
sqlArgument = readCopy.get(i);
resp = sqlArgument.getTSExecuteStatementResp();
errMsg = resp.getStatus().message;
statusCode = resp.getStatus().code;
Expand Down
62 changes: 29 additions & 33 deletions server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
Expand Up @@ -31,7 +31,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.antlr.v4.runtime.misc.ParseCancellationException;
Expand Down Expand Up @@ -133,7 +132,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
private static final int DELETE_SIZE = 50;
private static final String ERROR_PARSING_SQL =
"meet error while parsing SQL to physical plan: {}";
public static Vector<SqlArgument> sqlArgumentsList = new Vector<>();

private boolean enableMetric = IoTDBDescriptor.getInstance().getConfig().isEnableMetricService();
private static final List<SqlArgument> sqlArgumentList = new ArrayList<>(MAX_SIZE);

protected Planner processor;
protected IPlanExecutor executor;
Expand Down Expand Up @@ -483,9 +484,6 @@ private boolean executeStatementInBatch(String statement, List<TSStatus> result,

@Override
public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
long startTime = System.currentTimeMillis();
TSExecuteStatementResp resp;
SqlArgument sqlArgument;
try {
if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
Expand All @@ -500,19 +498,8 @@ public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
PhysicalPlan physicalPlan =
processor.parseSQLToPhysicalPlan(statement, sessionIdZoneIdMap.get(req.getSessionId()));
if (physicalPlan.isQuery()) {
resp =
internalExecuteQueryStatement(
req.statementId,
physicalPlan,
req.fetchSize,
return internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize,
sessionIdUsernameMap.get(req.getSessionId()));
long endTime = System.currentTimeMillis();
sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
sqlArgumentsList.add(sqlArgument);
if (sqlArgumentsList.size() > MAX_SIZE) {
sqlArgumentsList.subList(0, DELETE_SIZE).clear();
}
return resp;
} else {
return executeUpdateStatement(physicalPlan, req.getSessionId());
}
Expand All @@ -538,10 +525,6 @@ public TSExecuteStatementResp executeStatement(TSExecuteStatementReq req) {
@Override
public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
try {
long startTime = System.currentTimeMillis();
TSExecuteStatementResp resp;
SqlArgument sqlArgument;

if (!checkLogin(req.getSessionId())) {
logger.info(INFO_NOT_LOGIN, IoTDBConstant.GLOBAL_DB_NAME);
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.NOT_LOGIN_ERROR);
Expand All @@ -562,16 +545,9 @@ public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
TSStatusCode.EXECUTE_STATEMENT_ERROR, "Statement is not a query statement.");
}

resp = internalExecuteQueryStatement(
req.statementId, physicalPlan, req.fetchSize,
return internalExecuteQueryStatement(statement, req.statementId, physicalPlan, req.fetchSize,
sessionIdUsernameMap.get(req.getSessionId()));
long endTime = System.currentTimeMillis();
sqlArgument = new SqlArgument(resp, physicalPlan, statement, startTime, endTime);
sqlArgumentsList.add(sqlArgument);
if (sqlArgumentsList.size() > MAX_SIZE) {
sqlArgumentsList.subList(0, DELETE_SIZE).clear();
}
return resp;

} catch (ParseCancellationException e) {
logger.debug(e.getMessage());
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.SQL_PARSE_ERROR,
Expand All @@ -587,9 +563,9 @@ public TSExecuteStatementResp executeQueryStatement(TSExecuteStatementReq req) {
* @param plan must be a plan for Query: FillQueryPlan, AggregationPlan, GroupByPlan, some
* AuthorPlan
*/
private TSExecuteStatementResp internalExecuteQueryStatement(
private TSExecuteStatementResp internalExecuteQueryStatement(String statement,
long statementId, PhysicalPlan plan, int fetchSize, String username) {
long t1 = System.currentTimeMillis();
long startTime = System.currentTimeMillis();
long queryId = -1;
try {
TSExecuteStatementResp resp = getQueryResp(plan, username); // column headers
Expand Down Expand Up @@ -625,6 +601,18 @@ private TSExecuteStatementResp internalExecuteQueryStatement(
resp.setQueryDataSet(result);
}
resp.setQueryId(queryId);

if (enableMetric) {
long endTime = System.currentTimeMillis();
SqlArgument sqlArgument = new SqlArgument(resp, plan, statement, startTime, endTime);
synchronized (sqlArgumentList) {
sqlArgumentList.add(sqlArgument);
if (sqlArgumentList.size() >= MAX_SIZE) {
sqlArgumentList.subList(0, DELETE_SIZE).clear();
}
}
}

return resp;
} catch (Exception e) {
logger.error("{}: Internal server error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
Expand All @@ -637,7 +625,7 @@ private TSExecuteStatementResp internalExecuteQueryStatement(
}
return RpcUtils.getTSExecuteStatementResp(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
} finally {
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, t1);
Measurement.INSTANCE.addOperationLatency(Operation.EXECUTE_QUERY, startTime);
}
}

Expand Down Expand Up @@ -1323,6 +1311,14 @@ protected TSStatus executePlan(PhysicalPlan plan) {
: RpcUtils.getStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR);
}

public static List<SqlArgument> getSqlArgumentList() {
List<SqlArgument> readCopy;
synchronized (sqlArgumentList) {
readCopy = new ArrayList<>(sqlArgumentList);
}
return readCopy;
}

private long generateQueryId(boolean isDataQuery) {
return QueryResourceManager.getInstance().assignQueryId(isDataQuery);
}
Expand Down

0 comments on commit 43fb327

Please sign in to comment.