Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.Expose;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.catalog.json.CatalogGsonHelper;
Expand Down
3 changes: 3 additions & 0 deletions tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ public static enum ConfVars implements ConfigKey {
Validators.networkAddr()),
TAJO_MASTER_INFO_ADDRESS("tajo.master.info-http.address", "0.0.0.0:26080", Validators.networkAddr()),

// Tajo Rest Service
REST_SERVICE_PORT("tajo.rest.service.port", 26880),

// High availability configurations
TAJO_MASTER_HA_ENABLE("tajo.master.ha.enable", false, Validators.bool()),
TAJO_MASTER_HA_MONITOR_INTERVAL("tajo.master.ha.monitor.interval", 5 * 1000), // 5 sec
Expand Down
8 changes: 8 additions & 0 deletions tajo-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@
<groupId>org.apache.tajo</groupId>
<artifactId>tajo-thirdparty-asm</artifactId>
</dependency>
<dependency>
<groupId>org.apache.tajo</groupId>
<artifactId>tajo-ws-rs</artifactId>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down Expand Up @@ -367,6 +371,10 @@
<groupId>org.antlr</groupId>
<artifactId>antlr4</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
17 changes: 17 additions & 0 deletions tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.tajo.util.metrics.TajoSystemMetrics;
import org.apache.tajo.webapp.QueryExecutorServlet;
import org.apache.tajo.webapp.StaticHttpServer;
import org.apache.tajo.ws.rs.TajoRestService;

import java.io.*;
import java.lang.management.ManagementFactory;
Expand Down Expand Up @@ -119,6 +120,7 @@ public class TajoMaster extends CompositeService {
private WorkerResourceManager resourceManager;
//Web Server
private StaticHttpServer webServer;
private TajoRestService restServer;

private QueryManager queryManager;

Expand Down Expand Up @@ -193,6 +195,9 @@ public void serviceInit(Configuration _conf) throws Exception {

tajoMasterService = new QueryCoordinatorService(context);
addIfService(tajoMasterService);

restServer = new TajoRestService(context);
addIfService(restServer);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
throw e;
Expand Down Expand Up @@ -375,6 +380,14 @@ public void stop() {
LOG.error(e, e);
}
}

if (restServer != null) {
try {
restServer.stop();
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}

if (webServer != null) {
try {
Expand Down Expand Up @@ -474,6 +487,10 @@ public HistoryWriter getHistoryWriter() {
public HistoryReader getHistoryReader() {
return historyReader;
}

public TajoRestService getRestServer() {
return restServer;
}
}

String getThreadTaskName(long id, String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public TableDesc getTableDesc() {
return tableDesc;
}

@Override
public int getCurrentRowNumber() {
return currentNumRows;
}

public void close() throws Exception {
if (scanExec != null) {
scanExec.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,6 @@ public interface NonForwardQueryResultScanner {

public void init() throws IOException;

public int getCurrentRowNumber();

}
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,11 @@ public TableDesc getTableDesc() {
public Schema getLogicalSchema() {
return outSchema;
}

@Override
public int getCurrentRowNumber() {
return currentRow;
}

class SimplePhysicalPlannerImpl extends PhysicalPlannerImpl {

Expand Down
179 changes: 179 additions & 0 deletions tajo-core/src/main/java/org/apache/tajo/ws/rs/ClientApplication.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tajo.ws.rs;

import org.apache.tajo.QueryId;
import org.apache.tajo.master.TajoMaster.MasterContext;
import org.apache.tajo.master.exec.NonForwardQueryResultScanner;
import org.apache.tajo.ws.rs.resources.ClusterResource;
import org.apache.tajo.ws.rs.resources.DatabasesResource;
import org.apache.tajo.ws.rs.resources.FunctionsResource;
import org.apache.tajo.ws.rs.resources.QueryResource;
import org.apache.tajo.ws.rs.resources.SessionsResource;
import org.apache.tajo.ws.rs.resources.TablesResource;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import javax.ws.rs.core.Application;

import java.security.SecureRandom;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/**
* It loads client classes for Tajo REST protocol.
*/
public class ClientApplication extends Application {

private final MasterContext masterContext;
private final ConcurrentMap<QueryId, Long> queryIdToResultSetCacheIdMap;
private final Cache<Long, NonForwardQueryResultScanner> queryResultScannerCache;

private final SecureRandom secureRandom;

public ClientApplication(MasterContext masterContext) {
this.masterContext = masterContext;

this.secureRandom = new SecureRandom();

this.queryIdToResultSetCacheIdMap = new ConcurrentHashMap<QueryId, Long>();
this.queryResultScannerCache = CacheBuilder.newBuilder()
.concurrencyLevel(4)
.maximumSize(1000)
.expireAfterAccess(30, TimeUnit.MINUTES)
.build();
}

@Override
public Set<Class<?>> getClasses() {
Set<Class<?>> classes = new HashSet<Class<?>>();

classes.add(SessionsResource.class);
classes.add(DatabasesResource.class);
classes.add(TablesResource.class);
classes.add(FunctionsResource.class);
classes.add(ClusterResource.class);
classes.add(QueryResource.class);

return classes;
}

public MasterContext getMasterContext() {
return masterContext;
}

/**
* It returns generated 8-byte size integer.
*
* @return
*/
private long generateCacheId() {
byte[] generatedBytes = new byte[8];
long generatedId = 0;

secureRandom.nextBytes(generatedBytes);
for (byte generatedByte: generatedBytes) {
generatedId = (generatedId << 8) + (generatedByte & 0xff);
}

return generatedId;
}

/**
* If cannot find any cache id for supplied query id, it will generate a new cache id.
*
* @param queryId
* @return
*/
public long generateCacheIdIfAbsent(QueryId queryId) {
Long cacheId = this.queryIdToResultSetCacheIdMap.get(queryId);
long newCacheId = 0;

if (cacheId == null) {
boolean generated = false;
do {
newCacheId = generateCacheId();
if (queryResultScannerCache.getIfPresent(newCacheId) == null) {
generated = true;
}
} while (!generated);
cacheId = this.queryIdToResultSetCacheIdMap.putIfAbsent(queryId, newCacheId);
if (cacheId != null) {
newCacheId = cacheId.longValue();
}
} else {
newCacheId = cacheId.longValue();
}

return newCacheId;
}

/**
* get cached NonForwardResultScanner instance by query id and cache id.
*
* @param queryId
* @param cacheId
* @return
*/
public NonForwardQueryResultScanner getCachedNonForwardResultScanner(QueryId queryId, long cacheId) {
Long cachedCacheId = queryIdToResultSetCacheIdMap.get(queryId);

if (cachedCacheId == null) {
throw new RuntimeException("Supplied cache id " + cacheId + " was expired or invalid.");
}

if (cacheId != cachedCacheId.longValue()) {
throw new RuntimeException("Supplied cache id " + cacheId + " was expired or invalid. " +
"Please use the valid cache id.");
}

return queryResultScannerCache.getIfPresent(cachedCacheId);
}

/**
* Store NonForwardResultScanner instance to cached memory if not present.
*
* @param queryId
* @param cacheId
* @param resultScanner
*/
public NonForwardQueryResultScanner setCachedNonForwardResultScanner(QueryId queryId, long cacheId,
NonForwardQueryResultScanner resultScanner) {
NonForwardQueryResultScanner cachedScanner = null;

if (cacheId == 0) {
cacheId = generateCacheIdIfAbsent(queryId);
}

cachedScanner = getCachedNonForwardResultScanner(queryId, cacheId);
if (cachedScanner == null) {
cachedScanner = this.queryResultScannerCache.asMap().putIfAbsent(cacheId, resultScanner);
if (cachedScanner == null) {
cachedScanner = resultScanner;
}
}

return cachedScanner;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tajo.ws.rs;

import javax.ws.rs.core.Response;

/**
* Implements business flows for jersey resource
*
*/
public interface JerseyResourceDelegate {

public Response run(JerseyResourceDelegateContext context);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tajo.ws.rs;

import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* It holds variables for running deletegates
*/
public class JerseyResourceDelegateContext {

private final ConcurrentMap<JerseyResourceDelegateContextKey<?>, Object> contextMap =
new ConcurrentHashMap<JerseyResourceDelegateContextKey<?>, Object>();

/**
* Add value to Context. If value exists, it will overwrite.
*
* @param key
* @param value
* @return
*/
public <T> JerseyResourceDelegateContext put(JerseyResourceDelegateContextKey<T> key, T value) {
contextMap.put(key, value);
return this;
}

public <T> T get(JerseyResourceDelegateContextKey<T> key) {
Class<T> keyTypeClass = key.getType();
Object object = contextMap.get(key);
if (object != null) {
return keyTypeClass.cast(contextMap.get(key));
} else {
if (Boolean.class.isAssignableFrom(keyTypeClass)) {
return keyTypeClass.cast(false);
} else if (Number.class.isAssignableFrom(keyTypeClass)) {
try {
return keyTypeClass.getConstructor(String.class).newInstance("0");
} catch (Throwable e) {
return null;
}
} else {
return null;
}
}
}
}
Loading