Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
TAJO-1430: Improve SQLAnalyzer by session-based parsing-result caching.
Browse files Browse the repository at this point in the history
Closes #442

Signed-off-by: Jihoon Son <jihoonson@apache.org>
  • Loading branch information
dongjoon-hyun authored and jihoonson committed Apr 13, 2015
1 parent d57b16f commit 7d72088
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 31 deletions.
3 changes: 3 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Release 0.11.0 - unreleased
TAJO-1135: Implement queryable virtual table for cluster information.
(jihun)

TAJO-1430: Improve SQLAnalyzer by session-based parsing-result caching.
(Contributed by Dongjoon Hyun, Committed by jihoon)

IMPROVEMENT

TAJO-1509: Use dedicated thread to release resource allocated to container.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,18 @@ public boolean equalsTo(Expr expr) {
public Object clone() throws CloneNotSupportedException {
Aggregation aggregation = (Aggregation) super.clone();

aggregation.namedExprs = new NamedExpr[namedExprs.length];
for (int i = 0; i < namedExprs.length; i++) {
aggregation.namedExprs[i] = (NamedExpr) namedExprs[i].clone();
if (namedExprs != null) {
aggregation.namedExprs = new NamedExpr[namedExprs.length];
for (int i = 0; i < namedExprs.length; i++) {
aggregation.namedExprs[i] = (NamedExpr) namedExprs[i].clone();
}
}

aggregation.groups = new GroupElement[groups.length];
for (int i = 0; i < groups.length; i++) {
aggregation.groups[i] = (GroupElement) groups[i].clone();
if (groups != null) {
aggregation.groups = new GroupElement[groups.length];
for (int i = 0; i < groups.length; i++) {
aggregation.groups[i] = (GroupElement) groups[i].clone();
}
}
return aggregation;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,16 @@ public Object clone() throws CloneNotSupportedException {
alter.newTableName = newTableName;
alter.columnName = columnName;
alter.newColumnName = newColumnName;
alter.addNewColumn = (ColumnDefinition) addNewColumn.clone();
if (addNewColumn != null) {
alter.addNewColumn = (ColumnDefinition) addNewColumn.clone();
}
alter.alterTableOpType = alterTableOpType;
alter.columns = columns;
alter.values = values;
alter.location = location;
alter.params = new HashMap<String, String>(params);
if (params != null) {
alter.params = new HashMap<String, String>(params);
}
return alter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Object clone() throws CloneNotSupportedException {
between.not = not;
between.symmetric = symmetric;
between.predicand = (Expr) predicand.clone();
between.begin = (Expr) between.clone();
between.begin = (Expr) begin.clone();
between.end = (Expr) end.clone();
return between;
}
Expand Down
46 changes: 30 additions & 16 deletions tajo-algebra/src/main/java/org/apache/tajo/algebra/CreateTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,12 @@ public Object clone() throws CloneNotSupportedException {
createTable.storageType = storageType;
createTable.location = location;
createTable.subquery = subquery;
createTable.params = new HashMap<String, String>(params);
createTable.partition = (PartitionMethodDescExpr) partition.clone();
if (params != null) {
createTable.params = new HashMap<String, String>(params);
}
if (partition != null) {
createTable.partition = (PartitionMethodDescExpr) partition.clone();
}
createTable.ifNotExists = ifNotExists;
return createTable;
}
Expand Down Expand Up @@ -305,11 +309,13 @@ public boolean equals(Object object) {
@Override
public Object clone() throws CloneNotSupportedException {
RangePartition range = (RangePartition) super.clone();
range.columns = new ColumnReferenceExpr[columns.length];
for (int i = 0; i < columns.length; i++) {
range.columns[i] = (ColumnReferenceExpr) columns[i].clone();
if (columns != null) {
range.columns = new ColumnReferenceExpr[columns.length];
for (int i = 0; i < columns.length; i++) {
range.columns[i] = (ColumnReferenceExpr) columns[i].clone();
}
}
if (range.specifiers != null) {
if (specifiers != null) {
range.specifiers = new ArrayList<RangePartitionSpecifier>();
for (int i = 0; i < specifiers.size(); i++) {
range.specifiers.add(specifiers.get(i));
Expand Down Expand Up @@ -376,9 +382,11 @@ public boolean equals(Object object) {
@Override
public Object clone() throws CloneNotSupportedException {
HashPartition hash = (HashPartition) super.clone();
hash.columns = new ColumnReferenceExpr[columns.length];
for (int i = 0; i < columns.length; i++) {
hash.columns[i] = (ColumnReferenceExpr) columns[i].clone();
if (columns != null) {
hash.columns = new ColumnReferenceExpr[columns.length];
for (int i = 0; i < columns.length; i++) {
hash.columns[i] = (ColumnReferenceExpr) columns[i].clone();
}
}
hash.quantity = quantity;
if (specifiers != null) {
Expand Down Expand Up @@ -428,9 +436,11 @@ public boolean equals(Object object) {
@Override
public Object clone() throws CloneNotSupportedException {
ListPartition listPartition = (ListPartition) super.clone();
listPartition.columns = new ColumnReferenceExpr[columns.length];
for (int i = 0; i < columns.length; i++) {
listPartition.columns[i] = (ColumnReferenceExpr) columns[i].clone();
if (columns != null) {
listPartition.columns = new ColumnReferenceExpr[columns.length];
for (int i = 0; i < columns.length; i++) {
listPartition.columns[i] = (ColumnReferenceExpr) columns[i].clone();
}
}
if (specifiers != null) {
listPartition.specifiers = new ArrayList<ListPartitionSpecifier>();
Expand Down Expand Up @@ -472,9 +482,11 @@ public boolean equals(Object object) {
@Override
public Object clone() throws CloneNotSupportedException {
ColumnPartition columnPartition = (ColumnPartition) super.clone();
columnPartition.columns = new ColumnDefinition[columns.length];
for (int i = 0; i < columns.length; i++) {
columnPartition.columns[i] = (ColumnDefinition) columns[i].clone();
if (columns != null) {
columnPartition.columns = new ColumnDefinition[columns.length];
for (int i = 0; i < columns.length; i++) {
columnPartition.columns[i] = (ColumnDefinition) columns[i].clone();
}
}
return columnPartition;
}
Expand Down Expand Up @@ -524,7 +536,9 @@ public boolean equals(Object o) {
@Override
public Object clone() throws CloneNotSupportedException {
RangePartitionSpecifier specifier = (RangePartitionSpecifier) super.clone();
specifier.end = (Expr) end.clone();
if (end != null) {
specifier.end = (Expr) end.clone();
}
specifier.maxValue = maxValue;
return specifier;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ public Object clone() throws CloneNotSupportedException {
insert.storageType = storageType;
insert.location = location;
insert.subquery = (Expr) subquery.clone();
insert.params = new HashMap<String, String>(params);
if (params != null) {
insert.params = new HashMap<String, String>(params);
}
return insert;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ public int hashCode() {
@Override
public Object clone() throws CloneNotSupportedException {
UnaryOperator unaryOperator = (UnaryOperator) super.clone();
unaryOperator.child = (Expr) child.clone();
if (child != null) {
unaryOperator.child = (Expr) child.clone();
}
return unaryOperator;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public Object clone() throws CloneNotSupportedException {
ValueListExpr valueListExpr = (ValueListExpr) super.clone();
valueListExpr.values = new Expr[values.length];
for (int i = 0; i < values.length; i++) {
valueListExpr.values = (Expr[]) values[i].clone();
valueListExpr.values[i] = (Expr) values[i].clone();
}
return valueListExpr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ public static enum ConfVars implements ConfigKey {

// Query Configuration
QUERY_SESSION_TIMEOUT("tajo.query.session.timeout-sec", 60, Validators.min("0")),
QUERY_SESSION_QUERY_CACHE_SIZE("tajo.query.session.query-cache-size-kb", 1024, Validators.min("0")),

// Shuffle Configuration --------------------------------------------------
PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")),
Expand Down
44 changes: 41 additions & 3 deletions tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.tajo.master;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.Weigher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.service.AbstractService;
Expand All @@ -32,7 +36,9 @@
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableDesc;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.engine.parser.SQLAnalyzer;
import org.apache.tajo.engine.parser.SQLSyntaxError;
import org.apache.tajo.engine.query.QueryContext;
import org.apache.tajo.ipc.ClientProtos;
import org.apache.tajo.master.TajoMaster.MasterContext;
Expand All @@ -54,6 +60,7 @@

import java.io.IOException;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;

import static org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse;

Expand Down Expand Up @@ -143,6 +150,25 @@ private QueryContext createQueryContext(Session session) {
newQueryContext.putAll(CommonTestingUtil.getSessionVarsForTest());
}

// Set queryCache in session
int queryCacheSize = context.getConf().getIntVar(TajoConf.ConfVars.QUERY_SESSION_QUERY_CACHE_SIZE);
if (queryCacheSize > 0 && session.getQueryCache() == null) {
Weigher<String, Expr> weighByLength = new Weigher<String, Expr>() {
public int weigh(String key, Expr expr) {
return key.length();
}
};
LoadingCache<String, Expr> cache = CacheBuilder.newBuilder()
.maximumWeight(queryCacheSize * 1024)
.weigher(weighByLength)
.expireAfterAccess(1, TimeUnit.HOURS)
.build(new CacheLoader<String, Expr>() {
public Expr load(String sql) throws SQLSyntaxError {
return analyzer.parse(sql);
}
});
session.setQueryCache(cache);
}
return newQueryContext;
}

Expand All @@ -155,7 +181,7 @@ public SubmitQueryResponse executeQuery(Session session, String query, boolean i
if (isJson) {
planningContext = buildExpressionFromJson(query);
} else {
planningContext = buildExpressionFromSql(query);
planningContext = buildExpressionFromSql(query, session);
}

String jsonExpr = planningContext.toJson();
Expand Down Expand Up @@ -184,10 +210,22 @@ public Expr buildExpressionFromJson(String json) {
return JsonHelper.fromJson(json, Expr.class);
}

public Expr buildExpressionFromSql(String sql) throws InterruptedException, IOException,
public Expr buildExpressionFromSql(String sql, Session session) throws InterruptedException, IOException,
IllegalQueryStatusException {
context.getSystemMetrics().counter("Query", "totalQuery").inc();
return analyzer.parse(sql);
try {
if (session.getQueryCache() == null) {
return analyzer.parse(sql);
} else {
return (Expr) session.getQueryCache().get(sql.trim()).clone();
}
} catch (Exception e) {
if (e.getCause() instanceof SQLSyntaxError) {
throw (SQLSyntaxError) e.getCause();
} else {
throw new SQLSyntaxError(e.getCause().getMessage());
}
}
}

public QueryId updateQuery(QueryContext queryContext, String sql, boolean isJson) throws IOException,
Expand Down
11 changes: 11 additions & 0 deletions tajo-core/src/main/java/org/apache/tajo/session/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@

package org.apache.tajo.session;

import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
import org.apache.tajo.SessionVars;
import org.apache.tajo.algebra.Expr;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
import org.apache.tajo.util.KeyValueSet;
import org.apache.tajo.common.ProtoObject;
Expand All @@ -40,6 +42,7 @@ public class Session implements SessionConstants, ProtoObject<SessionProto>, Clo
private String currentDatabase;
private final Map<String, String> sessionVariables;
private final Map<QueryId, NonForwardQueryResultScanner> nonForwardQueryMap = new HashMap<QueryId, NonForwardQueryResultScanner>();
private LoadingCache<String, Expr> cache;

// transient status
private volatile long lastAccessTime;
Expand Down Expand Up @@ -121,6 +124,14 @@ public synchronized String getCurrentDatabase() {
return currentDatabase;
}

public synchronized void setQueryCache(LoadingCache<String, Expr> cache) {
this.cache = cache;
}

public synchronized LoadingCache<String, Expr> getQueryCache() {
return cache;
}

@Override
public SessionProto getProto() {
SessionProto.Builder builder = SessionProto.newBuilder();
Expand Down

0 comments on commit 7d72088

Please sign in to comment.