From ef7defcaeb9ae8093ef04bd4ddba81a51fd2013f Mon Sep 17 00:00:00 2001 From: tzolov Date: Fri, 31 Jul 2015 02:24:41 +0200 Subject: [PATCH 1/9] ZEPPELIN-189: Add Apache Geode Interpreter for Zeppelin --- conf/zeppelin-site.xml.template | 2 +- geode/pom.xml | 156 +++++++++++ .../zeppelin/geode/GeodeOqlInterpreter.java | 255 ++++++++++++++++++ .../geode/GeodeOqlInterpreterTest.java | 125 +++++++++ pom.xml | 1 + 5 files changed, 538 insertions(+), 1 deletion(-) create mode 100644 geode/pom.xml create mode 100644 geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java create mode 100644 geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 13e4d1dca34..f48a960fab9 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -72,7 +72,7 @@ zeppelin.interpreters - org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter + org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter,org.apache.zeppelin.tajo.TajoInterpreter,org.apache.zeppelin.flink.FlinkInterpreter,org.apache.zeppelin.lens.LensInterpreter,org.apache.zeppelin.ignite.IgniteInterpreter,org.apache.zeppelin.ignite.IgniteSqlInterpreter,org.apache.zeppelin.cassandra.CassandraInterpreter,org.apache.zeppelin.geode.GeodeOqlInterpreter Comma separated interpreter configurations. First interpreter become a default diff --git a/geode/pom.xml b/geode/pom.xml new file mode 100644 index 00000000000..806429e2135 --- /dev/null +++ b/geode/pom.xml @@ -0,0 +1,156 @@ + + + + + 4.0.0 + + + zeppelin + org.apache.zeppelin + 0.6.0-incubating-SNAPSHOT + + + org.apache.zeppelin + zeppelin-geode + jar + 0.6.0-incubating-SNAPSHOT + Zeppelin: Apache Geode Interpreter + http://geode.incubator.apache.org/ + + + 8.1.0 + + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + com.gemstone.gemfire + gemfire + ${gemfire.version} + + + + org.apache.commons + commons-exec + 1.1 + + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + + + + junit + junit + test + + + + org.mockito + mockito-all + 1.9.5 + test + + + + + + + + org.apache.maven.plugins + maven-deploy-plugin + 2.7 + + true + + + + + maven-enforcer-plugin + 1.3.1 + + + enforce + none + + + + + + maven-dependency-plugin + 2.8 + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/../../interpreter/psql + false + false + true + runtime + + + + copy-artifact + package + + copy + + + ${project.build.directory}/../../interpreter/psql + false + false + true + runtime + + + ${project.groupId} + ${project.artifactId} + ${project.version} + ${project.packaging} + + + + + + + + + + + spring-gemstone-releases + http://repo.springsource.org/gemstone-release-cache/ + + + diff --git a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java new file mode 100644 index 00000000000..9fb9599743c --- /dev/null +++ b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.zeppelin.geode; + +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.gemstone.gemfire.cache.client.ClientCache; +import com.gemstone.gemfire.cache.client.ClientCacheFactory; +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.Struct; +import com.gemstone.gemfire.pdx.PdxInstance; + +/** + * Apache Geode OQL Interpreter (http://geode.incubator.apache.org) + * + * + *

+ * Sample usage:
+ * {@code %geode.oql}
+ * {@code SELECT * FROM /regionEmployee e WHERE e.companyId > '95'} + *

+ * + * The OQL spec and sample queries: + * http://geode-docs.cfapps.io/docs/getting_started/querying_quick_reference.html + * + *

+ * Known issue:http://gemfire.docs.pivotal.io/bugnotes/KnownIssuesGemFire810.html #43673 Using query + * "select * from /exampleRegion.entrySet" fails in a client-server topology and/or in a + * PartitionedRegion. + *

+ */ +public class GeodeOqlInterpreter extends Interpreter { + + Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class); + int commandTimeOut = 600000; + + private static final String TABLE_MAGIC = "%table "; + + public static final String LOCATOR_HOST = "geode.locator.host"; + public static final String LOCATOR_PORT = "geode.locator.port"; + + static { + Interpreter.register("oql", "geode", GeodeOqlInterpreter.class.getName(), + new InterpreterPropertyBuilder().add(LOCATOR_HOST, "localhost", "The Geode Locator Host.") + .add(LOCATOR_PORT, "10334", "The Geode Locator Port").build()); + } + + private ClientCache clientCache = null; + private QueryService queryService = null; + private Exception exceptionOnConnect; + + public GeodeOqlInterpreter(Properties property) { + super(property); + } + + protected ClientCache getClientCache() { + + String locatorHost = getProperty(LOCATOR_HOST); + int locatorPort = Integer.valueOf(getProperty(LOCATOR_PORT)); + + ClientCache clientCache = + new ClientCacheFactory().addPoolLocator(locatorHost, locatorPort).create(); + + return clientCache; + } + + @Override + public void open() { + logger.info("Geode open connection called!"); + try { + clientCache = getClientCache(); + queryService = clientCache.getQueryService(); + exceptionOnConnect = null; + logger.info("Successfully created Geode connection"); + } catch (Exception e) { + logger.error("Cannot open connection", e); + exceptionOnConnect = e; + } + } + + @Override + public void close() { + try { + if (clientCache != null) { + clientCache.close(); + } + + if (queryService != null) { + queryService.closeCqs(); + } + + } catch (Exception e) { + logger.error("Cannot close connection", e); + } finally { + clientCache = null; + queryService = null; + exceptionOnConnect = null; + } + } + + + private InterpreterResult executeOql(String oql) { + try { + + if (getExceptionOnConnect() != null) { + return new InterpreterResult(Code.ERROR, getExceptionOnConnect().getMessage()); + } + + @SuppressWarnings("unchecked") + SelectResults results = + (SelectResults) getQueryService().newQuery(oql).execute(); + + StringBuilder msg = new StringBuilder(TABLE_MAGIC); + boolean isTableHeaderSet = false; + + Iterator iterator = results.iterator(); + while (iterator.hasNext()) { + + Object entry = iterator.next(); + + if (entry instanceof Number) { + handleNumberEntry(isTableHeaderSet, entry, msg); + } else if (entry instanceof Struct) { + handleStructEntry(isTableHeaderSet, entry, msg); + } else if (entry instanceof PdxInstance) { + handlePdxInstanceEntry(isTableHeaderSet, entry, msg); + } else { + handleUnsupportedTypeEntry(isTableHeaderSet, entry, msg); + } + + isTableHeaderSet = true; + msg.append('\n'); + } + + return new InterpreterResult(Code.SUCCESS, msg.toString()); + + } catch (Exception ex) { + logger.error("Cannot run " + oql, ex); + return new InterpreterResult(Code.ERROR, ex.getMessage()); + } + } + + private void handleStructEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) { + Struct struct = (Struct) entry; + if (!isTableHeaderSet) { + for (String titleName : struct.getStructType().getFieldNames()) { + msg.append(titleName).append('\t'); + } + msg.append('\n'); + } + + for (String titleName : struct.getStructType().getFieldNames()) { + msg.append(struct.get(titleName)).append('\t'); + } + } + + private void handlePdxInstanceEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) { + PdxInstance pdxEntry = (PdxInstance) entry; + if (!isTableHeaderSet) { + for (String titleName : pdxEntry.getFieldNames()) { + msg.append(titleName).append('\t'); + } + msg.append('\n'); + } + + for (String titleName : pdxEntry.getFieldNames()) { + msg.append(pdxEntry.getField(titleName)).append('\t'); + } + } + + private void handleNumberEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) { + if (!isTableHeaderSet) { + msg.append("Result").append('\n'); + } + msg.append((Number) entry); + } + + private void handleUnsupportedTypeEntry(boolean isTableHeaderSet, + Object entry, StringBuilder msg) { + if (!isTableHeaderSet) { + msg.append("Unsuppoted Type").append('\n'); + } + msg.append("" + entry); + } + + + @Override + public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) { + logger.info("Run OQL command '{}'", cmd); + return executeOql(cmd); + } + + @Override + public void cancel(InterpreterContext context) { + // Do nothing + } + + @Override + public FormType getFormType() { + return FormType.SIMPLE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + GeodeOqlInterpreter.class.getName() + this.hashCode()); + } + + @Override + public List completion(String buf, int cursor) { + return null; + } + + // Test only + QueryService getQueryService() { + return this.queryService; + } + + Exception getExceptionOnConnect() { + return this.exceptionOnConnect; + } +} diff --git a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java new file mode 100644 index 00000000000..70a5cb3513c --- /dev/null +++ b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java @@ -0,0 +1,125 @@ +package org.apache.zeppelin.geode; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.RETURNS_DEEP_STUBS; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter.FormType; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.junit.Test; + +import com.gemstone.gemfire.cache.query.QueryService; +import com.gemstone.gemfire.cache.query.SelectResults; +import com.gemstone.gemfire.cache.query.Struct; +import com.gemstone.gemfire.cache.query.internal.StructImpl; +import com.gemstone.gemfire.cache.query.internal.types.StructTypeImpl; +import com.gemstone.gemfire.pdx.PdxInstance; +import com.gemstone.gemfire.pdx.internal.PdxInstanceImpl; +import com.gemstone.gemfire.pdx.internal.PdxType; + +public class GeodeOqlInterpreterTest { + + private static final String OQL_QUERY = "select * from /region"; + + @Test + public void oqlNumberResponse() throws Exception { + oqlTest(new ArrayList(Arrays.asList(66, 67)).iterator(), "Result\n66\n67\n"); + } + + @Test + public void oqlStructResponse() throws Exception { + String[] fields = new String[] {"field1", "field2"}; + Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"val1", "val2"}); + + oqlTest(new ArrayList(Arrays.asList(struct)).iterator(), + "field1\tfield2\t\nval1\tval2\t\n"); + } + + @Test + public void oqlPdxInstanceResponse() throws Exception { + ByteArrayInputStream bais = new ByteArrayInputStream("koza\tboza\n".getBytes()); + PdxInstance pdxInstance = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4); + oqlTest(new ArrayList(Arrays.asList(pdxInstance)).iterator(), "\n\n"); + } + + private static class DummyUnspportedType { + @Override + public String toString() { + return "Unsupported Indeed"; + } + } + + @Test + public void oqlUnsupportedTypeResponse() throws Exception { + DummyUnspportedType unspported1 = new DummyUnspportedType(); + DummyUnspportedType unspported2 = new DummyUnspportedType(); + oqlTest(new ArrayList(Arrays.asList(unspported1, unspported2)).iterator(), + "Unsuppoted Type\n" + unspported1.toString() + "\n" + unspported1.toString() + "\n"); + } + + private void oqlTest(Iterator queryResponseIterator, String expectedOutput) + throws Exception { + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); + + QueryService mockQueryService = mock(QueryService.class, RETURNS_DEEP_STUBS); + + when(spyGeodeOqlInterpreter.getQueryService()).thenReturn(mockQueryService); + + @SuppressWarnings("unchecked") + SelectResults mockResults = mock(SelectResults.class); + + when(mockQueryService.newQuery(eq(OQL_QUERY)).execute()).thenReturn(mockResults); + + when(mockResults.iterator()).thenReturn(queryResponseIterator); + + InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); + + assertEquals(Code.SUCCESS, interpreterResult.code()); + assertEquals(expectedOutput, interpreterResult.message()); + } + + @Test + public void oqlWithQueryException() throws Exception { + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); + + when(spyGeodeOqlInterpreter.getExceptionOnConnect()).thenReturn( + new RuntimeException("Test Exception On Connect")); + + InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); + + assertEquals(Code.ERROR, interpreterResult.code()); + assertEquals("Test Exception On Connect", interpreterResult.message()); + } + + @Test + public void oqlWithExceptionOnConnect() throws Exception { + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); + + when(spyGeodeOqlInterpreter.getQueryService()) + .thenThrow(new RuntimeException("Test exception")); + + InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); + + assertEquals(Code.ERROR, interpreterResult.code()); + assertEquals("Test exception", interpreterResult.message()); + } + + @Test + public void testFormType() { + assertEquals(FormType.SIMPLE, new GeodeOqlInterpreter(new Properties()).getFormType()); + } +} diff --git a/pom.xml b/pom.xml index 9e5f54e7002..ecdebddd75f 100644 --- a/pom.xml +++ b/pom.xml @@ -91,6 +91,7 @@ angular shell hive + geode tajo flink ignite From 14b8a37d05a15b4209f39585f97c749e15857a11 Mon Sep 17 00:00:00 2001 From: tzolov Date: Fri, 31 Jul 2015 13:45:37 +0200 Subject: [PATCH 2/9] ZEPPELIN-189: Add missing license tag --- .../zeppelin/geode/GeodeOqlInterpreterTest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java index 70a5cb3513c..2e88470cbd6 100644 --- a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java +++ b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java @@ -1,3 +1,17 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ package org.apache.zeppelin.geode; import static org.junit.Assert.assertEquals; From cd6294be733bb4b392d63fea692b06d646994874 Mon Sep 17 00:00:00 2001 From: tzolov Date: Sat, 1 Aug 2015 15:56:10 +0200 Subject: [PATCH 3/9] ZEPPELIN-189: Handle responses containing reserved characters --- geode/pom.xml | 15 ++++- .../zeppelin/geode/GeodeOqlInterpreter.java | 67 ++++++++++++------- .../geode/GeodeOqlInterpreterTest.java | 23 +++++-- 3 files changed, 71 insertions(+), 34 deletions(-) diff --git a/geode/pom.xml b/geode/pom.xml index 806429e2135..6812c1d20fb 100644 --- a/geode/pom.xml +++ b/geode/pom.xml @@ -29,11 +29,12 @@ zeppelin-geode jar 0.6.0-incubating-SNAPSHOT - Zeppelin: Apache Geode Interpreter + Zeppelin: Apache Geode interpreter http://geode.incubator.apache.org/ - 8.1.0 + 1.0.0-incubating-SNAPSHOT + @@ -44,10 +45,18 @@ provided + + + + org.apache.geode + gemfire-core + ${geode.version} @@ -147,10 +156,12 @@ + diff --git a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java index 9fb9599743c..a916e14bb2b 100644 --- a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java +++ b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java @@ -18,6 +18,7 @@ import java.util.List; import java.util.Properties; +import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; @@ -45,7 +46,7 @@ *

* Sample usage:
* {@code %geode.oql}
- * {@code SELECT * FROM /regionEmployee e WHERE e.companyId > '95'} + * {@code SELECT * FROM /regionEmployee e WHERE e.companyId > 95} *

* * The OQL spec and sample queries: @@ -59,18 +60,24 @@ */ public class GeodeOqlInterpreter extends Interpreter { + private static final String DEFAULT_PORT = "10334"; + private static final String DEFAULT_HOST = "localhost"; + + private static final char NEWLINE = '\n'; + private static final char TAB = '\t'; + private static final char WHITESPACE = ' '; + Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class); - int commandTimeOut = 600000; - private static final String TABLE_MAGIC = "%table "; + private static final String TABLE_MAGIC_TAG = "%table "; public static final String LOCATOR_HOST = "geode.locator.host"; public static final String LOCATOR_PORT = "geode.locator.port"; static { Interpreter.register("oql", "geode", GeodeOqlInterpreter.class.getName(), - new InterpreterPropertyBuilder().add(LOCATOR_HOST, "localhost", "The Geode Locator Host.") - .add(LOCATOR_PORT, "10334", "The Geode Locator Port").build()); + new InterpreterPropertyBuilder().add(LOCATOR_HOST, DEFAULT_HOST, "The Geode Locator Host.") + .add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port").build()); } private ClientCache clientCache = null; @@ -126,7 +133,6 @@ public void close() { } } - private InterpreterResult executeOql(String oql) { try { @@ -138,7 +144,7 @@ private InterpreterResult executeOql(String oql) { SelectResults results = (SelectResults) getQueryService().newQuery(oql).execute(); - StringBuilder msg = new StringBuilder(TABLE_MAGIC); + StringBuilder msg = new StringBuilder(TABLE_MAGIC_TAG); boolean isTableHeaderSet = false; Iterator iterator = results.iterator(); @@ -157,7 +163,7 @@ private InterpreterResult executeOql(String oql) { } isTableHeaderSet = true; - msg.append('\n'); + msg.append(NEWLINE); } return new InterpreterResult(Code.SUCCESS, msg.toString()); @@ -168,45 +174,56 @@ private InterpreterResult executeOql(String oql) { } } - private void handleStructEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) { + /** + * For %table response replace Tab and Newline characters from the content. + */ + private String replaceReservedChars(String str) { + + if (StringUtils.isBlank(str)) { + return str; + } + + return str.replace(TAB, WHITESPACE).replace(NEWLINE, WHITESPACE); + } + + private void handleStructEntry(boolean isHeaderSet, Object entry, StringBuilder msg) { Struct struct = (Struct) entry; - if (!isTableHeaderSet) { + if (!isHeaderSet) { for (String titleName : struct.getStructType().getFieldNames()) { - msg.append(titleName).append('\t'); + msg.append(replaceReservedChars(titleName)).append(TAB); } - msg.append('\n'); + msg.append(NEWLINE); } for (String titleName : struct.getStructType().getFieldNames()) { - msg.append(struct.get(titleName)).append('\t'); + msg.append(replaceReservedChars("" + struct.get(titleName))).append(TAB); } } - private void handlePdxInstanceEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) { + private void handlePdxInstanceEntry(boolean isHeaderSet, Object entry, StringBuilder msg) { PdxInstance pdxEntry = (PdxInstance) entry; - if (!isTableHeaderSet) { + if (!isHeaderSet) { for (String titleName : pdxEntry.getFieldNames()) { - msg.append(titleName).append('\t'); + msg.append(replaceReservedChars(titleName)).append(TAB); } - msg.append('\n'); + msg.append(NEWLINE); } for (String titleName : pdxEntry.getFieldNames()) { - msg.append(pdxEntry.getField(titleName)).append('\t'); + msg.append(replaceReservedChars("" + pdxEntry.getField(titleName))).append(TAB); } } - private void handleNumberEntry(boolean isTableHeaderSet, Object entry, StringBuilder msg) { - if (!isTableHeaderSet) { - msg.append("Result").append('\n'); + private void handleNumberEntry(boolean isHeaderSet, Object entry, StringBuilder msg) { + if (!isHeaderSet) { + msg.append("Result").append(NEWLINE); } msg.append((Number) entry); } - private void handleUnsupportedTypeEntry(boolean isTableHeaderSet, - Object entry, StringBuilder msg) { - if (!isTableHeaderSet) { - msg.append("Unsuppoted Type").append('\n'); + private void handleUnsupportedTypeEntry(boolean isHeaderSet, Object entry, StringBuilder msg) { + if (!isHeaderSet) { + msg.append("Unsuppoted Type").append(NEWLINE); } msg.append("" + entry); } diff --git a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java index 2e88470cbd6..757c19f3687 100644 --- a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java +++ b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java @@ -48,7 +48,7 @@ public class GeodeOqlInterpreterTest { @Test public void oqlNumberResponse() throws Exception { - oqlTest(new ArrayList(Arrays.asList(66, 67)).iterator(), "Result\n66\n67\n"); + testOql(new ArrayList(Arrays.asList(66, 67)).iterator(), "Result\n66\n67\n"); } @Test @@ -56,15 +56,24 @@ public void oqlStructResponse() throws Exception { String[] fields = new String[] {"field1", "field2"}; Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"val1", "val2"}); - oqlTest(new ArrayList(Arrays.asList(struct)).iterator(), + testOql(new ArrayList(Arrays.asList(struct)).iterator(), "field1\tfield2\t\nval1\tval2\t\n"); } + + @Test + public void oqlStructResponseWithReservedCharacters() throws Exception { + String[] fields = new String[] {"fi\teld1", "f\nield2"}; + Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"v\nal\t1", "val2"}); + + testOql(new ArrayList(Arrays.asList(struct)).iterator(), + "fi eld1\tf ield2\t\nv al 1\tval2\t\n"); + } @Test public void oqlPdxInstanceResponse() throws Exception { ByteArrayInputStream bais = new ByteArrayInputStream("koza\tboza\n".getBytes()); PdxInstance pdxInstance = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4); - oqlTest(new ArrayList(Arrays.asList(pdxInstance)).iterator(), "\n\n"); + testOql(new ArrayList(Arrays.asList(pdxInstance)).iterator(), "\n\n"); } private static class DummyUnspportedType { @@ -78,11 +87,11 @@ public String toString() { public void oqlUnsupportedTypeResponse() throws Exception { DummyUnspportedType unspported1 = new DummyUnspportedType(); DummyUnspportedType unspported2 = new DummyUnspportedType(); - oqlTest(new ArrayList(Arrays.asList(unspported1, unspported2)).iterator(), + testOql(new ArrayList(Arrays.asList(unspported1, unspported2)).iterator(), "Unsuppoted Type\n" + unspported1.toString() + "\n" + unspported1.toString() + "\n"); } - private void oqlTest(Iterator queryResponseIterator, String expectedOutput) + private void testOql(Iterator queryResponseIterator, String expectedOutput) throws Exception { GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); @@ -124,12 +133,12 @@ public void oqlWithExceptionOnConnect() throws Exception { GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); when(spyGeodeOqlInterpreter.getQueryService()) - .thenThrow(new RuntimeException("Test exception")); + .thenThrow(new RuntimeException("Expected Test Exception!")); InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); assertEquals(Code.ERROR, interpreterResult.code()); - assertEquals("Test exception", interpreterResult.message()); + assertEquals("Expected Test Exception!", interpreterResult.message()); } @Test From c410e2e23704e0fd7b0bf0d1646fc4bec92918b1 Mon Sep 17 00:00:00 2001 From: tzolov Date: Sat, 1 Aug 2015 16:18:24 +0200 Subject: [PATCH 4/9] ZEPPELIN-189: Fix wrong interpreter name in pom --- geode/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/geode/pom.xml b/geode/pom.xml index 6812c1d20fb..f3041cee757 100644 --- a/geode/pom.xml +++ b/geode/pom.xml @@ -123,7 +123,7 @@ copy-dependencies - ${project.build.directory}/../../interpreter/psql + ${project.build.directory}/../../interpreter/geode false false true @@ -137,7 +137,7 @@ copy - ${project.build.directory}/../../interpreter/psql + ${project.build.directory}/../../interpreter/geode false false true From 9b701f972fc0ce6ada8323d9c7de2a2bc0d22d6d Mon Sep 17 00:00:00 2001 From: tzolov Date: Mon, 3 Aug 2015 07:31:18 +0200 Subject: [PATCH 5/9] ZEPPELIN-189: Clean pom formatting and remove obsolete dependencies --- geode/pom.xml | 46 +++++++++++++++------------------------------- 1 file changed, 15 insertions(+), 31 deletions(-) diff --git a/geode/pom.xml b/geode/pom.xml index f3041cee757..3fa7cb3ddd6 100644 --- a/geode/pom.xml +++ b/geode/pom.xml @@ -16,7 +16,8 @@ ~ limitations under the License. --> - + 4.0.0 @@ -33,8 +34,7 @@ http://geode.incubator.apache.org/ - 1.0.0-incubating-SNAPSHOT - + 1.0.0-incubating-SNAPSHOT @@ -45,19 +45,11 @@ provided - - - - org.apache.geode - gemfire-core - ${geode.version} - + + org.apache.geode + gemfire-core + ${geode.version} + org.apache.commons @@ -80,14 +72,14 @@ junit test - + - org.mockito - mockito-all - 1.9.5 - test - - + org.mockito + mockito-all + 1.9.5 + test + + @@ -156,12 +148,4 @@ - From 437c091b429f928e3cf51d0845f60d25ceded82f Mon Sep 17 00:00:00 2001 From: tzolov Date: Mon, 3 Aug 2015 09:57:01 +0200 Subject: [PATCH 6/9] ZEPPELIN-189: Improve javadoc documentation --- .../zeppelin/geode/GeodeOqlInterpreter.java | 41 +++++++++++++++---- 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java index a916e14bb2b..956c899af5a 100644 --- a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java +++ b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java @@ -44,14 +44,37 @@ *
  • {@code geode.locator.port} - The Geode Locator {@code } to connect to.
  • * *

    - * Sample usage:
    + * Sample usages:
    * {@code %geode.oql}
    - * {@code SELECT * FROM /regionEmployee e WHERE e.companyId > 95} + * {@code SELECT * FROM /regionEmployee e WHERE e.companyId > 95}
    + * {@code SELECT * FROM /regionEmployee ORDER BY employeeId}
    + * {@code + * SELECT * FROM /regionEmployee + * WHERE companyId IN SET(1, 3, 7) OR lastName IN SET('NameA', 'NameB') + * }
    + * {@code + * SELECT e.employeeId, c.id as companyId FROM /regionEmployee e, /regionCompany c + * WHERE e.companyId = c.id + * } *

    - * - * The OQL spec and sample queries: + *

    + * OQL specification and sample queries: * http://geode-docs.cfapps.io/docs/getting_started/querying_quick_reference.html - * + *

    + *

    + * When the Zeppelin server is collocated with Geode Shell (gfsh) one can use the %sh interpreter to + * run Geode shell commands:
    + * {@code + * %sh + * source /etc/geode/conf/geode-env.sh + * gfsh << EOF + * connect --locator=ambari.localdomain[10334] + * destroy region --name=/regionEmployee + * create region --name=regionEmployee --type=REPLICATE + * exit; + * EOF + *} + *

    *

    * Known issue:http://gemfire.docs.pivotal.io/bugnotes/KnownIssuesGemFire810.html #43673 Using query * "select * from /exampleRegion.entrySet" fails in a client-server topology and/or in a @@ -60,6 +83,8 @@ */ public class GeodeOqlInterpreter extends Interpreter { + private Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class); + private static final String DEFAULT_PORT = "10334"; private static final String DEFAULT_HOST = "localhost"; @@ -67,8 +92,6 @@ public class GeodeOqlInterpreter extends Interpreter { private static final char TAB = '\t'; private static final char WHITESPACE = ' '; - Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class); - private static final String TABLE_MAGIC_TAG = "%table "; public static final String LOCATOR_HOST = "geode.locator.host"; @@ -175,7 +198,9 @@ private InterpreterResult executeOql(String oql) { } /** - * For %table response replace Tab and Newline characters from the content. + * Zeppelin's %TABLE convention uses tab (\t) to delimit fields and new-line (\n) to delimit rows + * To complain with this convention we need to replace any occurrences of tab and/or newline + * characters in the content. */ private String replaceReservedChars(String str) { From 0ff81c95b5d2ad3153acf7f2de3799cb5c904039 Mon Sep 17 00:00:00 2001 From: tzolov Date: Wed, 5 Aug 2015 10:32:44 +0200 Subject: [PATCH 7/9] ZEPPELIN-189: Add Max number of OQL result to display --- .../zeppelin/geode/GeodeOqlInterpreter.java | 25 ++++++++++-- .../geode/GeodeOqlInterpreterTest.java | 40 ++++++++++++------- 2 files changed, 47 insertions(+), 18 deletions(-) diff --git a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java index 956c899af5a..f06ae84a4d6 100644 --- a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java +++ b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java @@ -42,6 +42,7 @@ *

      *
    • {@code geode.locator.host} - The Geode Locator {@code } to connect to.
    • *
    • {@code geode.locator.port} - The Geode Locator {@code } to connect to.
    • + *
    • {@code geode.max.result} - Max number of OQL result to display.
    • *
    *

    * Sample usages:
    @@ -87,6 +88,7 @@ public class GeodeOqlInterpreter extends Interpreter { private static final String DEFAULT_PORT = "10334"; private static final String DEFAULT_HOST = "localhost"; + private static final String DEFAULT_MAX_RESULT = "1000"; private static final char NEWLINE = '\n'; private static final char TAB = '\t'; @@ -96,16 +98,22 @@ public class GeodeOqlInterpreter extends Interpreter { public static final String LOCATOR_HOST = "geode.locator.host"; public static final String LOCATOR_PORT = "geode.locator.port"; + public static final String MAX_RESULT = "geode.max.result"; static { - Interpreter.register("oql", "geode", GeodeOqlInterpreter.class.getName(), + Interpreter.register( + "oql", + "geode", + GeodeOqlInterpreter.class.getName(), new InterpreterPropertyBuilder().add(LOCATOR_HOST, DEFAULT_HOST, "The Geode Locator Host.") - .add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port").build()); + .add(LOCATOR_PORT, DEFAULT_PORT, "The Geode Locator Port") + .add(MAX_RESULT, DEFAULT_MAX_RESULT, "Max number of OQL result to display.").build()); } private ClientCache clientCache = null; private QueryService queryService = null; private Exception exceptionOnConnect; + private int maxResult; public GeodeOqlInterpreter(Properties property) { super(property); @@ -125,9 +133,13 @@ protected ClientCache getClientCache() { @Override public void open() { logger.info("Geode open connection called!"); + try { + maxResult = Integer.valueOf(getProperty(MAX_RESULT)); + clientCache = getClientCache(); queryService = clientCache.getQueryService(); + exceptionOnConnect = null; logger.info("Successfully created Geode connection"); } catch (Exception e) { @@ -171,9 +183,12 @@ private InterpreterResult executeOql(String oql) { boolean isTableHeaderSet = false; Iterator iterator = results.iterator(); - while (iterator.hasNext()) { + int rowDisplayCount = 0; + + while (iterator.hasNext() && (rowDisplayCount < getMaxResult())) { Object entry = iterator.next(); + rowDisplayCount++; if (entry instanceof Number) { handleNumberEntry(isTableHeaderSet, entry, msg); @@ -286,6 +301,10 @@ public List completion(String buf, int cursor) { return null; } + public int getMaxResult() { + return maxResult; + } + // Test only QueryService getQueryService() { return this.queryService; diff --git a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java index 757c19f3687..b375ea1a149 100644 --- a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java +++ b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java @@ -46,34 +46,42 @@ public class GeodeOqlInterpreterTest { private static final String OQL_QUERY = "select * from /region"; + private static Iterator asIterator(Object... items) { + return new ArrayList(Arrays.asList(items)).iterator(); + } + @Test public void oqlNumberResponse() throws Exception { - testOql(new ArrayList(Arrays.asList(66, 67)).iterator(), "Result\n66\n67\n"); + testOql(asIterator(66, 67), "Result\n66\n67\n", 10); + testOql(asIterator(66, 67), "Result\n66\n", 1); } @Test public void oqlStructResponse() throws Exception { String[] fields = new String[] {"field1", "field2"}; - Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"val1", "val2"}); + Struct s1 = new StructImpl(new StructTypeImpl(fields), new String[] {"val11", "val12"}); + Struct s2 = new StructImpl(new StructTypeImpl(fields), new String[] {"val21", "val22"}); - testOql(new ArrayList(Arrays.asList(struct)).iterator(), - "field1\tfield2\t\nval1\tval2\t\n"); + testOql(asIterator(s1, s2), "field1\tfield2\t\nval11\tval12\t\nval21\tval22\t\n", 10); + testOql(asIterator(s1, s2), "field1\tfield2\t\nval11\tval12\t\n", 1); } - + @Test public void oqlStructResponseWithReservedCharacters() throws Exception { String[] fields = new String[] {"fi\teld1", "f\nield2"}; - Struct struct = new StructImpl(new StructTypeImpl(fields), new String[] {"v\nal\t1", "val2"}); + Struct s1 = new StructImpl(new StructTypeImpl(fields), new String[] {"v\nal\t1", "val2"}); - testOql(new ArrayList(Arrays.asList(struct)).iterator(), - "fi eld1\tf ield2\t\nv al 1\tval2\t\n"); + testOql(asIterator(s1), "fi eld1\tf ield2\t\nv al 1\tval2\t\n", 10); } @Test public void oqlPdxInstanceResponse() throws Exception { ByteArrayInputStream bais = new ByteArrayInputStream("koza\tboza\n".getBytes()); - PdxInstance pdxInstance = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4); - testOql(new ArrayList(Arrays.asList(pdxInstance)).iterator(), "\n\n"); + PdxInstance pdx1 = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4); + PdxInstance pdx2 = new PdxInstanceImpl(new PdxType(), new DataInputStream(bais), 4); + + testOql(asIterator(pdx1, pdx2), "\n\n\n", 10); + testOql(asIterator(pdx1, pdx2), "\n\n", 1); } private static class DummyUnspportedType { @@ -87,11 +95,12 @@ public String toString() { public void oqlUnsupportedTypeResponse() throws Exception { DummyUnspportedType unspported1 = new DummyUnspportedType(); DummyUnspportedType unspported2 = new DummyUnspportedType(); - testOql(new ArrayList(Arrays.asList(unspported1, unspported2)).iterator(), - "Unsuppoted Type\n" + unspported1.toString() + "\n" + unspported1.toString() + "\n"); + + testOql(asIterator(unspported1, unspported2), "Unsuppoted Type\n" + unspported1.toString() + + "\n" + unspported1.toString() + "\n", 10); } - private void testOql(Iterator queryResponseIterator, String expectedOutput) + private void testOql(Iterator queryResponseIterator, String expectedOutput, int maxResult) throws Exception { GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); @@ -99,6 +108,7 @@ private void testOql(Iterator queryResponseIterator, String expectedOutp QueryService mockQueryService = mock(QueryService.class, RETURNS_DEEP_STUBS); when(spyGeodeOqlInterpreter.getQueryService()).thenReturn(mockQueryService); + when(spyGeodeOqlInterpreter.getMaxResult()).thenReturn(maxResult); @SuppressWarnings("unchecked") SelectResults mockResults = mock(SelectResults.class); @@ -132,8 +142,8 @@ public void oqlWithExceptionOnConnect() throws Exception { GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(new Properties())); - when(spyGeodeOqlInterpreter.getQueryService()) - .thenThrow(new RuntimeException("Expected Test Exception!")); + when(spyGeodeOqlInterpreter.getQueryService()).thenThrow( + new RuntimeException("Expected Test Exception!")); InterpreterResult interpreterResult = spyGeodeOqlInterpreter.interpret(OQL_QUERY, null); From ab404e371368bef5ee19c56e548f96938f10b75d Mon Sep 17 00:00:00 2001 From: tzolov Date: Thu, 6 Aug 2015 00:31:26 +0200 Subject: [PATCH 8/9] ZEPPELIN-189: Add GeodeOqlInterpreter to ZeppelinConfiguration's default interpreters list --- .../java/org/apache/zeppelin/conf/ZeppelinConfiguration.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 223dc70caad..e25d3c0e63a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -394,7 +394,8 @@ public static enum ConfVars { + "org.apache.zeppelin.ignite.IgniteInterpreter," + "org.apache.zeppelin.ignite.IgniteSqlInterpreter," + "org.apache.zeppelin.lens.LensInterpreter," - + "org.apache.zeppelin.cassandra.CassandraInterpreter"), + + "org.apache.zeppelin.cassandra.CassandraInterpreter," + + "org.apache.zeppelin.geode.GeodeOqlInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), From c5eef90474ed3ece530bd7092de6f8403334cc89 Mon Sep 17 00:00:00 2001 From: tzolov Date: Thu, 6 Aug 2015 08:45:17 +0200 Subject: [PATCH 9/9] ZEPPELIN-189: Close previous connection before opening new one --- .../zeppelin/geode/GeodeOqlInterpreter.java | 9 +++++--- .../geode/GeodeOqlInterpreterTest.java | 23 +++++++++++++++++++ 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java index f06ae84a4d6..6f6b440830c 100644 --- a/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java +++ b/geode/src/main/java/org/apache/zeppelin/geode/GeodeOqlInterpreter.java @@ -86,9 +86,9 @@ public class GeodeOqlInterpreter extends Interpreter { private Logger logger = LoggerFactory.getLogger(GeodeOqlInterpreter.class); - private static final String DEFAULT_PORT = "10334"; - private static final String DEFAULT_HOST = "localhost"; - private static final String DEFAULT_MAX_RESULT = "1000"; + public static final String DEFAULT_PORT = "10334"; + public static final String DEFAULT_HOST = "localhost"; + public static final String DEFAULT_MAX_RESULT = "1000"; private static final char NEWLINE = '\n'; private static final char TAB = '\t'; @@ -134,6 +134,9 @@ protected ClientCache getClientCache() { public void open() { logger.info("Geode open connection called!"); + // Close the previous open connections. + close(); + try { maxResult = Integer.valueOf(getProperty(MAX_RESULT)); diff --git a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java index b375ea1a149..78755eba5a4 100644 --- a/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java +++ b/geode/src/test/java/org/apache/zeppelin/geode/GeodeOqlInterpreterTest.java @@ -14,12 +14,16 @@ */ package org.apache.zeppelin.geode; +import static org.apache.zeppelin.geode.GeodeOqlInterpreter.*; + import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.io.ByteArrayInputStream; import java.io.DataInputStream; @@ -50,6 +54,25 @@ private static Iterator asIterator(Object... items) { return new ArrayList(Arrays.asList(items)).iterator(); } + @Test + public void testOpenCommandIndempotency() { + + Properties properties = new Properties(); + properties.put(LOCATOR_HOST, DEFAULT_HOST); + properties.put(LOCATOR_PORT, DEFAULT_PORT); + properties.put(MAX_RESULT, DEFAULT_MAX_RESULT); + + GeodeOqlInterpreter spyGeodeOqlInterpreter = spy(new GeodeOqlInterpreter(properties)); + + // Ensure that an attempt to open new connection will clean any remaining connections + spyGeodeOqlInterpreter.open(); + spyGeodeOqlInterpreter.open(); + spyGeodeOqlInterpreter.open(); + + verify(spyGeodeOqlInterpreter, times(3)).open(); + verify(spyGeodeOqlInterpreter, times(3)).close(); + } + @Test public void oqlNumberResponse() throws Exception { testOql(asIterator(66, 67), "Result\n66\n67\n", 10);