Skip to content
This repository was archived by the owner on May 12, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,6 @@

package org.apache.tajo.ws.rs.resources;

import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Application;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.QueryId;
Expand All @@ -54,13 +32,18 @@
import org.apache.tajo.session.InvalidSessionException;
import org.apache.tajo.session.Session;
import org.apache.tajo.util.TajoIdUtils;
import org.apache.tajo.ws.rs.JerseyResourceDelegate;
import org.apache.tajo.ws.rs.JerseyResourceDelegateContext;
import org.apache.tajo.ws.rs.JerseyResourceDelegateContextKey;
import org.apache.tajo.ws.rs.JerseyResourceDelegateUtil;
import org.apache.tajo.ws.rs.ResourcesUtil;
import org.apache.tajo.ws.rs.*;
import org.apache.tajo.ws.rs.requests.SubmitQueryRequest;

import javax.ws.rs.*;
import javax.ws.rs.core.*;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Path("/databases/{databaseName}/queries")
public class QueryResource {

Expand All @@ -87,7 +70,9 @@ public class QueryResource {
private static final String submitQueryRequestKeyName = "submitQueryRequest";
private static final String printTypeKeyName = "printType";
private static final String queryIdKeyName = "queryId";


private static final String defaultQueryInfoPrintType = "COMPLICATED";

private void initializeContext() {
context = new JerseyResourceDelegateContext();
JerseyResourceDelegateContextKey<UriInfo> uriInfoKey =
Expand Down Expand Up @@ -311,7 +296,8 @@ public Response run(JerseyResourceDelegateContext context) {
@GET
@Path("{queryId}")
@Produces(MediaType.APPLICATION_JSON)
public Response getQuery(@PathParam("queryId") String queryId, @QueryParam("print") String printType) {
public Response getQuery(@PathParam("queryId") String queryId,
@DefaultValue(defaultQueryInfoPrintType) @QueryParam("print") String printType) {
if (LOG.isDebugEnabled()) {
LOG.debug("Client sent a get query request.");
}
Expand All @@ -325,6 +311,7 @@ public Response getQuery(@PathParam("queryId") String queryId, @QueryParam("prin
context.put(queryIdKey, queryId);
JerseyResourceDelegateContextKey<String> printTypeKey =
JerseyResourceDelegateContextKey.valueOf(printTypeKeyName, String.class);

context.put(printTypeKey, printType);

response = JerseyResourceDelegateUtil.runJerseyResourceDelegate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public Response run(JerseyResourceDelegateContext context) {
public Response getQueryResultSet(@HeaderParam(QueryResource.tajoSessionIdHeaderName) String sessionId,
@PathParam("cacheId") String cacheId,
@DefaultValue("-1") @QueryParam("offset") int offset,
@DefaultValue("-1") @QueryParam("count") int count) {
@DefaultValue("100") @QueryParam("count") int count) {
if (LOG.isDebugEnabled()) {
LOG.debug("Client sent a get query result set request.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,7 @@

package org.apache.tajo.ws.rs.resources;

import java.net.URI;
import java.util.List;
import java.util.Map;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import com.google.gson.internal.StringMap;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.conf.TajoConf.ConfVars;
Expand All @@ -45,7 +34,16 @@
import org.junit.Before;
import org.junit.Test;

import com.google.gson.internal.StringMap;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.net.URI;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -106,7 +104,7 @@ private String generateNewSessionAndGetId() throws Exception {
public void testGetAllQueries() throws Exception {
String sessionId = generateNewSessionAndGetId();
SubmitQueryRequest queryRequest = createNewQueryRequest("select * from lineitem");

Response response = restClient.target(queriesURI)
.request().header(tajoSessionIdHeaderName, sessionId)
.post(Entity.entity(queryRequest, MediaType.APPLICATION_JSON));
Expand Down Expand Up @@ -143,7 +141,7 @@ public void testGetAllQueries() throws Exception {
public void testSubmitQuery() throws Exception {
String sessionId = generateNewSessionAndGetId();
SubmitQueryRequest queryRequest = createNewQueryRequest("select * from lineitem");

Response response = restClient.target(queriesURI)
.request().header(tajoSessionIdHeaderName, sessionId)
.post(Entity.entity(queryRequest, MediaType.APPLICATION_JSON));
Expand All @@ -167,4 +165,32 @@ public void testSubmitQuery() throws Exception {
assertNotNull(queryInfo);
assertEquals(queryId, queryInfo.getQueryIdStr());
}

@Test
public void testGetQueryInfoWithDefault() throws Exception {
String sessionId = generateNewSessionAndGetId();
SubmitQueryRequest queryRequest = createNewQueryRequest("select * from lineitem");

Response response = restClient.target(queriesURI)
.request().header(tajoSessionIdHeaderName, sessionId)
.post(Entity.entity(queryRequest, MediaType.APPLICATION_JSON));

assertNotNull(response);
assertEquals(Status.CREATED.getStatusCode(), response.getStatus());
String locationHeader = response.getHeaderString("Location");
assertTrue(locationHeader != null && !locationHeader.isEmpty());

String queryId = locationHeader.lastIndexOf('/') >= 0?
locationHeader.substring(locationHeader.lastIndexOf('/')+1):null;

assertTrue(queryId != null && !queryId.isEmpty());

QueryInfo queryInfo = restClient.target(queriesURI)
.path("/{queryId}")
.resolveTemplate("queryId", queryId)
.request().get(new GenericType<QueryInfo>(QueryInfo.class));

assertNotNull(queryInfo);
assertEquals(queryId, queryInfo.getQueryIdStr());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,7 @@

package org.apache.tajo.ws.rs.resources;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.InputStream;
import java.net.URI;
import java.security.MessageDigest;
import java.util.List;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.compress.compressors.CompressorInputStream;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.tajo.QueryTestCaseBase;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.conf.TajoConf.ConfVars;
Expand All @@ -55,6 +37,21 @@
import org.junit.Before;
import org.junit.Test;

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.InputStream;
import java.net.URI;
import java.security.MessageDigest;
import java.util.List;

import static org.junit.Assert.*;

public class TestQueryResultResource extends QueryTestCaseBase {
Expand Down Expand Up @@ -284,4 +281,65 @@ public void testGetQueryResultSetWithOffset() throws Exception {
assertTrue(aTuple.getInt4(response.getSchema().getColumnId("l_orderkey")) > 0);
}
}

@Test
public void testGetQueryResultSetWithDefaultCount() throws Exception {
String sessionId = generateNewSessionAndGetId();
URI queryIdURI = sendNewQueryResquest(sessionId, "select * from lineitem");
URI queryResultURI = new URI(queryIdURI + "/result");

GetQueryResultDataResponse response = restClient.target(queryResultURI)
.request().header(tajoSessionIdHeaderName, sessionId)
.get(new GenericType<GetQueryResultDataResponse>(GetQueryResultDataResponse.class));

assertNotNull(response);
assertNotNull(response.getResultCode());
assertEquals(ResultCode.OK, response.getResultCode());
assertNotNull(response.getSchema());
assertEquals(16, response.getSchema().getRootColumns().size());
assertNotNull(response.getResultset());
assertTrue(response.getResultset().getId() != 0);
assertNotNull(response.getResultset().getLink());

URI queryResultSetURI = response.getResultset().getLink();

Response queryResultSetResponse = restClient.target(queryResultSetURI)
.request().header(tajoSessionIdHeaderName, sessionId)
.get();

assertNotNull(queryResultSetResponse);
String tajoDigest = queryResultSetResponse.getHeaderString(tajoDigestHeaderName);
assertTrue(tajoDigest != null && !tajoDigest.isEmpty());

DataInputStream queryResultSetInputStream =
new DataInputStream(new BufferedInputStream(queryResultSetResponse.readEntity(InputStream.class)));

assertNotNull(queryResultSetInputStream);

boolean isFinished = false;
List<Tuple> tupleList = TUtil.newList();
RowStoreUtil.RowStoreDecoder decoder = RowStoreUtil.createDecoder(response.getSchema());
MessageDigest messageDigest = MessageDigest.getInstance("SHA-1");
while (!isFinished) {
try {
int length = queryResultSetInputStream.readInt();
byte[] dataByteArray = new byte[length];
int readBytes = queryResultSetInputStream.read(dataByteArray);

assertEquals(length, readBytes);

tupleList.add(decoder.toTuple(dataByteArray));
messageDigest.update(dataByteArray);
} catch (EOFException eof) {
isFinished = true;
}
}

assertEquals(5, tupleList.size());
assertEquals(tajoDigest, Base64.encodeBase64String(messageDigest.digest()));

for (Tuple aTuple: tupleList) {
assertTrue(aTuple.getInt4(response.getSchema().getColumnId("l_orderkey")) > 0);
}
}
}