Skip to content

Commit

Permalink
Adding Calcite SQL Parser and make another entry point to query Pinot (
Browse files Browse the repository at this point in the history
…#4387)

* Adding Calcite SQL Parser and make another entry point to query Pinot use SQL

* Make CalciteSqlCompiler to implement AbstractCompiler to handle pql/sql parsing logic seamlessly

* Make validation logics into utils

* Address comments

* update calcite-core exclusions
  • Loading branch information
xiangfu0 committed Jul 10, 2019
1 parent 52f69d1 commit a7c419a
Show file tree
Hide file tree
Showing 45 changed files with 2,628 additions and 184 deletions.
44 changes: 40 additions & 4 deletions pinot-api/src/main/java/org/apache/pinot/client/Connection.java
Expand Up @@ -66,7 +66,18 @@ public PreparedStatement prepareStatement(String statement) {
*/
public ResultSetGroup execute(String statement)
throws PinotClientException {
return execute(null, statement);
return execute(null, new Request("pql", statement));
}

/**
* Executes a Pinot Request.
* @param request The statement to execute
* @return The result of the query
* @throws PinotClientException If an exception occurs while processing the query
*/
public ResultSetGroup execute(Request request)
throws PinotClientException {
return execute(null, request);
}

/**
Expand All @@ -78,12 +89,24 @@ public ResultSetGroup execute(String statement)
*/
public ResultSetGroup execute(String tableName, String statement)
throws PinotClientException {
return execute(tableName, new Request("pql", statement));
}

/**
* Executes a Pinot Request.
*
* @param request The statement to execute
* @return The result of the query
* @throws PinotClientException If an exception occurs while processing the query
*/
public ResultSetGroup execute(String tableName, Request request)
throws PinotClientException {
String brokerHostPort = _brokerSelector.selectBroker(tableName);
if (brokerHostPort == null) {
throw new PinotClientException(
"Could not find broker to query for table: " + (tableName == null ? "null" : tableName));
}
BrokerResponse response = _transport.executeQuery(brokerHostPort, statement);
BrokerResponse response = _transport.executeQuery(brokerHostPort, request);
if (response.hasExceptions()) {
throw new PinotClientException("Query had processing exceptions: \n" + response.getExceptions());
}
Expand All @@ -99,12 +122,25 @@ public ResultSetGroup execute(String tableName, String statement)
*/
public Future<ResultSetGroup> executeAsync(String statement)
throws PinotClientException {
return executeAsync(new Request("pql", statement));
}

/**
* Executes a Pinot Request asynchronously.
*
* @param request The statement to execute
* @return A future containing the result of the query
* @throws PinotClientException If an exception occurs while processing the query
*/
public Future<ResultSetGroup> executeAsync(Request request)
throws PinotClientException {
String brokerHostPort = _brokerSelector.selectBroker(null);
if (brokerHostPort == null) {
throw new PinotClientException(
"Could not find broker to query for statement: " + (statement == null ? "null" : statement));
"Could not find broker to query for statement: " + (request.getQuery() == null ? "null"
: request.getQuery()));
}
final Future<BrokerResponse> responseFuture = _transport.executeQueryAsync(brokerHostPort, statement);
final Future<BrokerResponse> responseFuture = _transport.executeQueryAsync(brokerHostPort, request.getQuery());
return new ResultSetGroupFuture(responseFuture);
}

Expand Down
Expand Up @@ -61,28 +61,48 @@ public BrokerResponse executeQuery(String brokerAddress, String query)

@Override
public Future<BrokerResponse> executeQueryAsync(String brokerAddress, final String query) {
return executeQueryAsync(brokerAddress, new Request("pql", query));
}

public Future<BrokerResponse> executePinotQueryAsync(String brokerAddress, final Request request) {
try {
ObjectNode json = JsonNodeFactory.instance.objectNode();
json.put("pql", query);
json.put(request.getQueryFormat(), request.getQuery());

final String url = "http://" + brokerAddress + "/query";

AsyncHttpClient.BoundRequestBuilder request = _httpClient.preparePost(url);
AsyncHttpClient.BoundRequestBuilder requestBuilder = _httpClient.preparePost(url);

if(_headers != null) {
_headers.forEach((k, v) -> request.addHeader(k, v));
_headers.forEach((k, v) -> requestBuilder.addHeader(k, v));
}

final Future<Response> response = request
final Future<Response> response = requestBuilder
.addHeader("Content-Type", "application/json; charset=utf-8")
.setBody(json.toString()).execute();

return new BrokerResponseFuture(response, query, url);
return new BrokerResponseFuture(response, request.getQuery(), url);
} catch (Exception e) {
throw new PinotClientException(e);
}
}

@Override
public BrokerResponse executeQuery(String brokerAddress, Request request)
throws PinotClientException {
try {
return executeQueryAsync(brokerAddress, request).get();
} catch (Exception e) {
throw new PinotClientException(e);
}
}

@Override
public Future<BrokerResponse> executeQueryAsync(String brokerAddress, Request request)
throws PinotClientException {
return executePinotQueryAsync(brokerAddress, request);
}

private static class BrokerResponseFuture implements Future<BrokerResponse> {
private final Future<Response> _response;
private final String _query;
Expand Down
Expand Up @@ -30,4 +30,10 @@ BrokerResponse executeQuery(String brokerAddress, String query)

Future<BrokerResponse> executeQueryAsync(String brokerAddress, String query)
throws PinotClientException;

BrokerResponse executeQuery(String brokerAddress, Request request)
throws PinotClientException;

Future<BrokerResponse> executeQueryAsync(String brokerAddress, Request request)
throws PinotClientException;
}
49 changes: 49 additions & 0 deletions pinot-api/src/main/java/org/apache/pinot/client/Request.java
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.client;

/**
* Request is used in server request to host multiple pinot query types, like PQL, SQL.
*/
public class Request {

private String _queryFormat;
private String _query;

public Request(String queryFormat, String query) {
_queryFormat = queryFormat;
_query = query;
}

public String getQueryFormat() {
return _queryFormat;
}

public void setQueryFormat(String queryType) {
_queryFormat = queryType;
}

public String getQuery() {
return _query;
}

public void setQuery(String query) {
_query = query;
}
}
Expand Up @@ -30,6 +30,9 @@
*
*/
public class PreparedStatementTest {
private DummyPinotClientTransport _dummyPinotClientTransport = new DummyPinotClientTransport();
private PinotClientTransportFactory _previousTransportFactory = null;

@Test
public void testPreparedStatementEscaping() {
// Create a prepared statement that has to quote a string appropriately
Expand All @@ -42,8 +45,16 @@ public void testPreparedStatementEscaping() {
Assert.assertEquals("SELECT foo FROM bar WHERE baz = '''hello'''", _dummyPinotClientTransport.getLastQuery());
}

private DummyPinotClientTransport _dummyPinotClientTransport = new DummyPinotClientTransport();
private PinotClientTransportFactory _previousTransportFactory = null;
@BeforeClass
public void overridePinotClientTransport() {
_previousTransportFactory = ConnectionFactory._transportFactory;
ConnectionFactory._transportFactory = new DummyPinotClientTransportFactory();
}

@AfterClass
public void resetPinotClientTransport() {
ConnectionFactory._transportFactory = _previousTransportFactory;
}

class DummyPinotClientTransport implements PinotClientTransport {
private String _lastQuery;
Expand All @@ -62,6 +73,20 @@ public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String que
return null;
}

@Override
public BrokerResponse executeQuery(String brokerAddress, Request request)
throws PinotClientException {
_lastQuery = request.getQuery();
return BrokerResponse.empty();
}

@Override
public Future<BrokerResponse> executeQueryAsync(String brokerAddress, Request request)
throws PinotClientException {
_lastQuery = request.getQuery();
return null;
}

public String getLastQuery() {
return _lastQuery;
}
Expand All @@ -73,15 +98,4 @@ public PinotClientTransport buildTransport() {
return _dummyPinotClientTransport;
}
}

@BeforeClass
public void overridePinotClientTransport() {
_previousTransportFactory = ConnectionFactory._transportFactory;
ConnectionFactory._transportFactory = new DummyPinotClientTransportFactory();
}

@AfterClass
public void resetPinotClientTransport() {
ConnectionFactory._transportFactory = _previousTransportFactory;
}
}
Expand Up @@ -32,6 +32,9 @@
*
*/
public class ResultSetGroupTest {
private DummyJsonTransport _dummyJsonTransport = new DummyJsonTransport();
private PinotClientTransportFactory _previousTransportFactory = null;

@Test
public void testDeserializeSelectionResultSet() {
// Deserialize selection result
Expand Down Expand Up @@ -117,8 +120,16 @@ private ResultSetGroup getResultSet(String resourceName) {
return connection.execute("dummy");
}

private DummyJsonTransport _dummyJsonTransport = new DummyJsonTransport();
private PinotClientTransportFactory _previousTransportFactory = null;
@BeforeClass
public void overridePinotClientTransport() {
_previousTransportFactory = ConnectionFactory._transportFactory;
ConnectionFactory._transportFactory = new DummyJsonTransportFactory();
}

@AfterClass
public void resetPinotClientTransport() {
ConnectionFactory._transportFactory = _previousTransportFactory;
}

class DummyJsonTransport implements PinotClientTransport {
public String _resource;
Expand Down Expand Up @@ -147,6 +158,18 @@ public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String que
throws PinotClientException {
return null;
}

@Override
public BrokerResponse executeQuery(String brokerAddress, Request request)
throws PinotClientException {
return executeQuery(brokerAddress, request.getQuery());
}

@Override
public Future<BrokerResponse> executeQueryAsync(String brokerAddress, Request request)
throws PinotClientException {
return null;
}
}

class DummyJsonTransportFactory implements PinotClientTransportFactory {
Expand All @@ -155,15 +178,4 @@ public PinotClientTransport buildTransport() {
return _dummyJsonTransport;
}
}

@BeforeClass
public void overridePinotClientTransport() {
_previousTransportFactory = ConnectionFactory._transportFactory;
ConnectionFactory._transportFactory = new DummyJsonTransportFactory();
}

@AfterClass
public void resetPinotClientTransport() {
ConnectionFactory._transportFactory = _previousTransportFactory;
}
}

0 comments on commit a7c419a

Please sign in to comment.