diff --git a/integration/zeppelin/README.md b/integration/zeppelin/README.md
new file mode 100644
index 00000000000..35b27bcebc3
--- /dev/null
+++ b/integration/zeppelin/README.md
@@ -0,0 +1,44 @@
+
+
+### Please follow below steps to integrate with zeppelin
+1. run ```mvn package -Pzeppelin```
+ This will generate _carbondata-zeppelin-*.tar.gz_ under target folder
+2. Extract the tar content to _ZEPPELIN_INSTALL_HOME/interpreter/_
+3. Add _org.apache.carbonndata.zeppelin.CarbonInterpreter_ to list of interpreters mentioned by _zeppelin.interpreters_ @ _ZEPPELIN_INSTALL_HOME/conf/zeppelin-site.xml_ (create if not exists)
+ Example:
+```xml
+
+ zeppelin.interpreters
+org.apache.zeppelin.spark.SparkInterpreter,.....,org.apache.carbonndata.zeppelin.CarbonInterpreter
+ Comma separated interpreter configurations. First interpreter become a default
+
+```
+4. Add carbon to list of interpreters mentioned by zeppelin.interpreter.order @ ZEPPELIN_INSTALL_HOME/conf/zeppelin-site.xml
+ Example:
+```xml
+
+ zeppelin.interpreter.group.order
+ spark,..,carbon
+
+
+```
+
+5. Restart Zeppelin server and add new interpreter with name _carbon_ from zeppelin interpreter page
+ Refer : https://zeppelin.apache.org/docs/0.8.0/usage/interpreter/overview.html#what-is-zeppelin-interpreter
+6. Configure ```carbon.query.api.url``` in interpreter setting from zeppelin interpreter page and click save
+7. Now can use notebook with interpreter ```%carbon```
diff --git a/integration/zeppelin/assembly/assembly.xml b/integration/zeppelin/assembly/assembly.xml
new file mode 100644
index 00000000000..6d09ad35615
--- /dev/null
+++ b/integration/zeppelin/assembly/assembly.xml
@@ -0,0 +1,37 @@
+
+compress
+
+ tar.gz
+
+
+ false
+
+
+ /carbon
+ false
+
+ com.fasterxml.jackson.core:*:jar
+
+
+
+
+
+
+
+ misc/
+ carbon
+
+ *.json
+
+
+
+
+ target
+ carbon
+
+ carbondata-zeppelin-*.jar
+
+
+
+
+
\ No newline at end of file
diff --git a/integration/zeppelin/misc/interpreter-setting.json b/integration/zeppelin/misc/interpreter-setting.json
new file mode 100644
index 00000000000..ce3980441ea
--- /dev/null
+++ b/integration/zeppelin/misc/interpreter-setting.json
@@ -0,0 +1,22 @@
+[
+ {
+ "group": "carbon",
+ "name": "carbon",
+ "className": "org.apache.carbondata.zeppelin.CarbonInterpreter",
+ "properties": {
+ "carbon.query.api.url": {
+ "envName": null,
+ "propertyName": "carbon.query.api.url",
+ "defaultValue": "",
+ "description": "API URL for request",
+ "type": "string"
+ }
+ },
+ "editor": {
+ "language": "sql",
+ "editOnDblClick": false,
+ "completionKey": "TAB",
+ "completionSupport": true
+ }
+ }
+]
\ No newline at end of file
diff --git a/integration/zeppelin/pom.xml b/integration/zeppelin/pom.xml
new file mode 100644
index 00000000000..c381c485f54
--- /dev/null
+++ b/integration/zeppelin/pom.xml
@@ -0,0 +1,99 @@
+
+
+
+
+ 4.0.0
+
+
+ org.apache.carbondata
+ carbondata-parent
+ 1.5.0-SNAPSHOT
+ ../../pom.xml
+
+
+ carbondata-zeppelin
+ Apache CarbonData :: Zeppelin
+
+
+ ${basedir}/../../dev
+
+
+
+
+ org.apache.zeppelin
+ zeppelin-interpreter
+ 0.8.0
+
+
+ org.apache.httpcomponents
+ httpclient
+ 4.5.1
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ 2.8.0
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.8.11.1
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ 2.8.10
+
+
+ junit
+ junit
+ 4.11
+ test
+
+
+
+
+
+
+ maven-compiler-plugin
+
+
+ 1.8
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+
+ assembly/assembly.xml
+ carbondata-zeppelin-${version}
+
+
+
+ package
+
+ single
+
+
+
+
+
+
+
+
diff --git a/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java
new file mode 100644
index 00000000000..f5dfa0e4f3f
--- /dev/null
+++ b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/CarbonInterpreter.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.zeppelin;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.carbondata.zeppelin.response.CarbonResponse;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Carbon based interpreter for zeppelin
+ */
+public class CarbonInterpreter extends Interpreter {
+
+ public static final Logger logger = LoggerFactory.getLogger(CarbonInterpreter.class);
+
+ static final char TAB = '\t';
+
+ static final String LF = "\n";
+
+ /**
+ * Property which can be set in zeppelin to carbon REST API server
+ */
+ public static final String CARBON_QUERY_API_URL = "carbon.query.api.url";
+
+ /**
+ * These are the queries which need Table like output format
+ */
+ private static final String[] SEARCH_QUERIES = {"select", "list", "show", "desc"};
+
+ public CarbonInterpreter(Properties properties) {
+ super(properties);
+ }
+
+ @Override
+ public void open() throws InterpreterException {
+ }
+
+ @Override
+ public void close() throws InterpreterException {
+ }
+
+ @Override
+ public void cancel(InterpreterContext interpreterContext) throws InterpreterException {
+ }
+
+ @Override
+ public int getProgress(InterpreterContext interpreterContext) throws InterpreterException {
+ return 0;
+ }
+
+ @Override
+ public FormType getFormType() throws InterpreterException {
+ return FormType.SIMPLE;
+ }
+
+ @Override
+ public InterpreterResult interpret(String sql, InterpreterContext interpreterContext)
+ throws InterpreterException {
+ try {
+ return executeQuery.apply(sql);
+ } catch (RuntimeException e) {
+ logger.error("failed to query data in carbon ", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+ }
+ }
+
+ /**
+ * This will execute the given sql Query by sending a post request on CARBON_QUERY_API_URL
+ */
+ private Function doPost = sql -> {
+ // prepare the post body
+ String postContent = new StringBuilder("{\"sqlStatement\":")
+ .append("\"").append(sql).append("\"" + "}").toString();
+ logger.debug("post:" + postContent);
+
+ // prepare entity and set content type
+ StringEntity entity = new StringEntity(postContent, "UTF-8");
+ entity.setContentType("application/json; charset=UTF-8");
+
+ // get the POST url from interpreter property
+ String postURL = getProperty(CARBON_QUERY_API_URL);
+ logger.debug("post url:" + postURL);
+
+ // do POST and get response
+ HttpPost postRequest = new HttpPost(postURL);
+ postRequest.setEntity(entity);
+ HttpClient httpClient = HttpClientBuilder.create().build();
+ try {
+ return httpClient.execute(postRequest);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ /**
+ * Check if output has to be sent as a able to zeppelin
+ */
+ private Function isTableFormatOutput = sql ->
+ (StringUtils.startsWithAny(sql, SEARCH_QUERIES));
+
+ /**
+ * returns InterpreterResult from CarbonResponse
+ */
+ private BiFunction getResult = (sql, response) -> {
+ if (isTableFormatOutput.apply(sql.toLowerCase().trim())) {
+ //format only select queries and return as table
+ String formattedResult = Arrays
+ .stream(response.getRows())
+ .filter(Objects::nonNull)
+ .map(row -> StringUtils.join(row, TAB))
+ .collect(Collectors.joining(LF));
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, InterpreterResult.Type.TABLE,
+ formattedResult);
+ } else {
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS, InterpreterResult.Type.TEXT,
+ response.getMessage());
+ }
+ };
+
+ /**
+ * Executes the given sql and return formatted result
+ */
+ private Function executeQuery = sql -> {
+ try {
+ HttpResponse response = doPost.apply(sql);
+ // always close the content after reading fully to release connection
+ // IOUtils.toString will completely read the content
+ try (InputStream content = response.getEntity().getContent()) {
+ Optional carbonResponse = CarbonResponse.parse(content);
+ int code = response.getStatusLine().getStatusCode();
+ if (code != 200) {
+ StringBuilder errorMessage = new StringBuilder("Failed : HTTP error code " + code + " .");
+ carbonResponse.ifPresent(rsp -> {
+ logger.error("Failed to execute query: " + rsp.getFullResponse());
+ errorMessage.append(rsp.getMessage());
+ });
+ return new InterpreterResult(InterpreterResult.Code.ERROR, errorMessage.toString());
+ } else {
+ return carbonResponse.map(rsp -> getResult.apply(sql, rsp))
+ .orElseGet(() -> new InterpreterResult(InterpreterResult.Code.SUCCESS,
+ InterpreterResult.Type.TEXT,
+ "Query Success, but unable to parse response"));
+ }
+ }
+ } catch (IOException e) {
+ logger.error("Error executing query ", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e.getMessage());
+ }
+ };
+}
diff --git a/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java
new file mode 100644
index 00000000000..2428d75e622
--- /dev/null
+++ b/integration/zeppelin/src/main/java/org/apache/carbondata/zeppelin/response/CarbonResponse.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.zeppelin.response;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Optional;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.commons.io.IOUtils;
+
+
+
+/**
+ * acts as a response object from carbon horizon server
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CarbonResponse {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private String responseId;
+
+ private String message;
+
+ private Object[][] rows;
+
+ long timestamp;
+
+ int status;
+
+ String error;
+
+ String exception;
+
+ String path;
+
+ String fullResponse;
+
+
+ public String getResponseId() {
+ return responseId;
+ }
+
+ public void setResponseId(String responseId) {
+ this.responseId = responseId;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public Object[][] getRows() {
+ return rows;
+ }
+
+ public void setRows(Object[][] rows) {
+ this.rows = rows;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public void setStatus(int status) {
+ this.status = status;
+ }
+
+ public String getError() {
+ return error;
+ }
+
+ public void setError(String error) {
+ this.error = error;
+ }
+
+ public String getException() {
+ return exception;
+ }
+
+ public void setException(String exception) {
+ this.exception = exception;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
+ public String getFullResponse() {
+ return fullResponse;
+ }
+
+ public void setFullResponse(String fullResponse) {
+ this.fullResponse = fullResponse;
+ }
+
+ /**
+ * Reads the input stream for JSON and return a CarbonResponse instance
+ * PS: Caller responsible for closing the stream
+ *
+ * @param inputStream
+ * @return
+ * @throws IOException
+ */
+ public static Optional parse(InputStream inputStream) throws IOException {
+ String plainTextResponse = IOUtils.toString(inputStream, "UTF-8");
+ CarbonResponse response = mapper.readValue(plainTextResponse, CarbonResponse.class);
+ response.setFullResponse(plainTextResponse);
+ return Optional.of(response);
+ }
+}
\ No newline at end of file
diff --git a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java
new file mode 100644
index 00000000000..6cb47b4f96b
--- /dev/null
+++ b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonInterpreter.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.zeppelin;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Optional;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestCarbonInterpreter {
+
+ static HttpServer server = null;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ server = HttpServer.create(new InetSocketAddress("localhost", 8123), 0);
+ server.createContext("/table/sql", new FakePostHandler());
+ server.start();
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ Optional.of(server).ifPresent(serverSocket -> serverSocket.stop(0));
+ }
+
+ @Test(timeout = 5000)
+ public void testInterpreterSelectSuccessResponse() throws InterpreterException {
+ Properties properties = new Properties();
+ properties.put(CarbonInterpreter.CARBON_QUERY_API_URL, "http://localhost:8123/table/sql");
+ CarbonInterpreter interpreter = new CarbonInterpreter(properties);
+ InterpreterResult result = interpreter.interpret("show tables", null);
+ String expectedFormattedResult = "{\"code\":\"SUCCESS\",\"msg\":[{\"type\":\"TABLE\",\"data\"" +
+ ":\"database\\ttableName\\tisTemporary\\ndefault\\tsinka6\\tfalse\\ndefault\\tsinka7\\tfalse\"}]}";
+ assertEquals(expectedFormattedResult, result.toJson());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
+ }
+}
+
+class FakePostHandler implements HttpHandler {
+
+ @Override
+ public void handle(HttpExchange he) throws IOException {
+ InputStreamReader isr = new InputStreamReader(he.getRequestBody(), "utf-8");
+ BufferedReader br = new BufferedReader(isr);
+ String query = br.readLine();
+ String response = "";
+ if (query.equals("{\"sqlStatement\":\"show tables\"}")) {
+ response = "{\n" +
+ " \"responseId\": 19435528129427470,\n" +
+ " \"message\": \"SUCCESS\",\n" +
+ " \"rows\": [\n" +
+ " [\n" +
+ " \"database\",\n" +
+ " \"tableName\",\n" +
+ " \"isTemporary\"\n" +
+ " ],\n" +
+ " [\n" +
+ " \"default\",\n" +
+ " \"sinka6\",\n" +
+ " false\n" +
+ " ],\n" +
+ " [\n" +
+ " \"default\",\n" +
+ " \"sinka7\",\n" +
+ " false\n" +
+ " ]\n" +
+ " ]\n" +
+ "}";
+ }
+ he.sendResponseHeaders(200, response.length());
+ OutputStream os = he.getResponseBody();
+ os.write(response.toString().getBytes());
+ os.close();
+ }
+}
diff --git a/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
new file mode 100644
index 00000000000..4b169f48e13
--- /dev/null
+++ b/integration/zeppelin/src/test/java/org/apache/carbondata/zeppelin/TestCarbonResponse.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.zeppelin;
+
+import com.fasterxml.jackson.databind.JsonMappingException;
+import org.apache.carbondata.zeppelin.response.CarbonResponse;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestCarbonResponse {
+
+ @Test(expected = JsonMappingException.class)
+ public void testBodyIsEmpty() throws IOException {
+ String input = "";
+ CarbonResponse.parse(new ByteArrayInputStream(input.getBytes()));
+ }
+
+ @Test
+ public void testSuccessResponse() throws IOException {
+ String input = "{\n" +
+ " \"responseId\": 19435528129427470,\n" +
+ " \"message\": \"SUCCESS\",\n" +
+ " \"rows\": [\n" +
+ " [\n" +
+ " \"database\",\n" +
+ " \"tableName\",\n" +
+ " \"isTemporary\"\n" +
+ " ],\n" +
+ " [\n" +
+ " \"default\",\n" +
+ " \"sinka6\",\n" +
+ " false\n" +
+ " ],\n" +
+ " [\n" +
+ " \"default\",\n" +
+ " \"sinka7\",\n" +
+ " false\n" +
+ " ]\n" +
+ " ]\n" +
+ "}";
+ Object[][] expectedResponse = new Object[3][];
+ expectedResponse[0] = new Object[]{"database", "tableName", "isTemporary"};
+ expectedResponse[1] = new Object[]{"default", "sinka6", false};
+ expectedResponse[2] = new Object[]{"default", "sinka7", false};
+ CarbonResponse successResponse = CarbonResponse.parse(new ByteArrayInputStream(input.getBytes())).get();
+ assertEquals("SUCCESS", successResponse.getMessage());
+ assertEquals("19435528129427470", successResponse.getResponseId());
+ assertTrue(Arrays.deepEquals(expectedResponse, successResponse.getRows()));
+ assertEquals(input, successResponse.getFullResponse());
+ }
+
+ @Test
+ public void testErrorResponse() throws IOException {
+ String input = "{\n" +
+ " \"timestamp\": 1531884083849,\n" +
+ " \"status\": 500,\n" +
+ " \"error\": \"Internal Server Error\",\n" +
+ " \"exception\": \"org.apache.carbondata.store.api.exception.StoreException\",\n" +
+ " \"message\": \"org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " +
+ "Table or view 'sinka6' already exists in database 'default';\",\n" +
+ " \"path\": \"/table/sql\"\n" +
+ "}";
+ CarbonResponse errorResponse = CarbonResponse.parse(new ByteArrayInputStream(input.getBytes())).get();
+ assertEquals("org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException: " +
+ "Table or view 'sinka6' already exists in database 'default';", errorResponse.getMessage());
+ assertEquals("org.apache.carbondata.store.api.exception.StoreException", errorResponse.getException());
+ assertEquals(1531884083849L, errorResponse.getTimestamp());
+ assertEquals("Internal Server Error", errorResponse.getError());
+ assertEquals(500, errorResponse.getStatus());
+ assertEquals("/table/sql", errorResponse.getPath());
+ assertEquals(input, errorResponse.getFullResponse());
+ }
+}
diff --git a/pom.xml b/pom.xml
index f1f51f016a7..2135b9d12bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -646,6 +646,12 @@
datamap/mv/core
+
+ zeppelin
+
+ integration/zeppelin
+
+
diff --git a/store/sql/pom.xml b/store/sql/pom.xml
index 411590b5d3d..d90ebb345ea 100644
--- a/store/sql/pom.xml
+++ b/store/sql/pom.xml
@@ -48,8 +48,8 @@
org.apache.maven.plugins
maven-compiler-plugin
-
- 1.7
+
+ 1.8
diff --git a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
index 7583a14a799..da9df52b4dd 100644
--- a/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
+++ b/store/sql/src/main/java/org/apache/carbondata/horizon/rest/controller/SqlHorizonController.java
@@ -18,6 +18,7 @@
package org.apache.carbondata.horizon.rest.controller;
import java.util.List;
+import java.util.stream.IntStream;
import org.apache.carbondata.horizon.rest.model.validate.RequestValidator;
import org.apache.carbondata.horizon.rest.model.view.SqlRequest;
@@ -26,6 +27,7 @@
import org.apache.carbondata.store.api.exception.StoreException;
import org.apache.spark.sql.AnalysisException;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
@@ -42,25 +44,29 @@ public class SqlHorizonController {
public ResponseEntity sql(@RequestBody SqlRequest request) throws StoreException {
RequestValidator.validateSql(request);
List rows;
+ Dataset sqlDataFrame = null;
try {
- rows = SparkSqlWrapper.sql(SqlHorizon.getSession(), request.getSqlStatement())
- .collectAsList();
+ sqlDataFrame = SparkSqlWrapper.sql(SqlHorizon.getSession(),
+ request.getSqlStatement());
+ rows = sqlDataFrame.collectAsList();
} catch (AnalysisException e) {
throw new StoreException(e.getSimpleMessage());
} catch (Exception e) {
throw new StoreException(e.getMessage());
}
- Object[][] result = new Object[rows.size()][];
- for (int i = 0; i < rows.size(); i++) {
- Row row = rows.get(i);
- result[i] = new Object[row.size()];
- for (int j = 0; j < row.size(); j++) {
- result[i][j] = row.get(j);
- }
+ final String[] fieldNames = sqlDataFrame.schema().fieldNames();
+ Object[][] responseData = new Object[0][];
+ if (rows.size() > 0) {
+ final Object[][] result = new Object[rows.size() + 1][fieldNames.length];
+ System.arraycopy(fieldNames, 0, result[0], 0, fieldNames.length);
+ IntStream.range(0, rows.size()).forEach(index ->
+ IntStream.range(0, fieldNames.length).forEach(col ->
+ result[index + 1][col] = rows.get(index).get(col)));
+ responseData = result;
}
return new ResponseEntity<>(
- new SqlResponse(request, "SUCCESS", result), HttpStatus.OK);
+ new SqlResponse(request, "SUCCESS", responseData), HttpStatus.OK);
}
@RequestMapping(value = "echosql")