From 3e6db43ce2b8257bffc0834cead3dd06bd33ad5a Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Mon, 18 May 2015 03:25:23 -0700 Subject: [PATCH 1/2] TAJO-1537: Implement a virtual table for sessions. --- .../InfoSchemaMetadataDictionary.java | 2 + .../dictionary/SessionTableDescriptor.java | 47 +++++++++++++++++++ .../NonForwardQueryResultSystemScanner.java | 32 +++++++++++++ .../apache/tajo/session/SessionManager.java | 10 ++++ ...estNonForwardQueryResultSystemScanner.java | 36 ++++++++++++++ 5 files changed, 127 insertions(+) create mode 100644 tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/SessionTableDescriptor.java diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java index 1bb8bc503f..8312e0e962 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/InfoSchemaMetadataDictionary.java @@ -42,6 +42,7 @@ private static enum DEFINED_TABLES { PARTITIONS, PARTITION_KEYS, CLUSTER, + SESSION, MAX_TABLE; } @@ -63,6 +64,7 @@ private void createSystemTableDescriptors() { schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITIONS.ordinal(), new PartitionsTableDescriptor(this)); schemaInfoTableDescriptors.set(DEFINED_TABLES.PARTITION_KEYS.ordinal(), new PartitionKeysTableDescriptor(this)); schemaInfoTableDescriptors.set(DEFINED_TABLES.CLUSTER.ordinal(), new ClusterTableDescriptor(this)); + schemaInfoTableDescriptors.set(DEFINED_TABLES.SESSION.ordinal(), new SessionTableDescriptor(this)); } public boolean isSystemDatabase(String databaseName) { diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/SessionTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/SessionTableDescriptor.java new file mode 100644 index 0000000000..98468607a2 --- /dev/null +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/SessionTableDescriptor.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.catalog.dictionary; + +import org.apache.tajo.common.TajoDataTypes.Type; + +class SessionTableDescriptor extends AbstractTableDescriptor { + + private static final String TABLENAME = "session"; + private final ColumnDescriptor[] columns = new ColumnDescriptor[] { + new ColumnDescriptor("session_id", Type.TEXT, 0), + new ColumnDescriptor("username", Type.TEXT, 0), + new ColumnDescriptor("current_db", Type.TEXT, 0), + new ColumnDescriptor("last_access_time", Type.TIMESTAMP, 0) + }; + + public SessionTableDescriptor(InfoSchemaMetadataDictionary metadataDictionary) { + super(metadataDictionary); + } + + @Override + public String getTableNameString() { + return TABLENAME; + } + + @Override + protected ColumnDescriptor[] getColumnDescriptors() { + return columns; + } + +} \ No newline at end of file diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 6ae96776ec..45258aecd0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -49,6 +49,7 @@ import org.apache.tajo.plan.logical.IndexScanNode; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.session.Session; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.Tuple; @@ -541,6 +542,35 @@ private List getClusterInfo(Schema outSchema) { return tuples; } + + private List getSessionInfo(Schema outSchema) { + List sessions = masterContext.getSessionManager().getAllSessions(); + List tuples = new ArrayList(sessions.size()); + List columns = outSchema.getAllColumns(); + Tuple aTuple; + + for (Session sess : sessions) { + aTuple = new VTuple(outSchema.size()); + + for (int fieldId = 0; fieldId < columns.size(); fieldId++) { + Column column = columns.get(fieldId); + + if ("session_id".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(sess.getSessionId())); + } else if ("username".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(sess.getUserName())); + } else if ("current_db".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createText(sess.getCurrentDatabase())); + } else if ("last_access_time".equalsIgnoreCase(column.getSimpleName())) { + aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(sess.getLastAccessTime())); + } + } + + tuples.add(aTuple); + } + + return tuples; + } private List fetchSystemTable(TableDesc tableDesc, Schema inSchema) { List tuples = null; @@ -564,6 +594,8 @@ private List fetchSystemTable(TableDesc tableDesc, Schema inSchema) { tuples = getAllPartitions(inSchema); } else if ("cluster".equalsIgnoreCase(tableName)) { tuples = getClusterInfo(inSchema); + } else if ("session".equalsIgnoreCase(tableName)) { + tuples = getSessionInfo(inSchema); } return tuples; diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java index 5d66b2b612..36479c9842 100644 --- a/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java @@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -92,6 +94,14 @@ public Session getSession(String sessionId) throws InvalidSessionException { return sessions.get(sessionId); } + public List getAllSessions() { + List current_sessions = new ArrayList(sessions.size()); + for (Map.Entry e : sessions.entrySet()) { + current_sessions.add(e.getValue()); + } + return current_sessions; + } + public void setVariable(String sessionId, String name, String value) throws InvalidSessionException { assertSessionExistence(sessionId); touch(sessionId); diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java index 01d4ec486e..90d972bbc6 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java @@ -34,6 +34,7 @@ import org.apache.tajo.algebra.Expr; import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.client.TajoClient; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; @@ -313,4 +314,39 @@ public void testGetClusterDetails() throws Exception { assertThat(tuples.size(), is(2)); assertThat(tuples, hasItem(getTupleMatcher(0, is("QueryMaster")))); } + + @Test + public void testGetSessionDetails() throws Exception { + final String dbname = "INFORMATION_SCHEMA"; + + TajoClient client = testingCluster.newTajoClient(); + assertTrue(client.selectDatabase(dbname)); // now, cluster has one session. + + NonForwardQueryResultScanner queryResultScanner = + getScanner("SELECT CURRENT_DB FROM INFORMATION_SCHEMA.SESSION"); + + queryResultScanner.init(); + + List rowBytes = queryResultScanner.getNextRows(100); + + assertThat(rowBytes.size(), is(1)); + + RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema()); + List tuples = getTupleList(decoder, rowBytes); + + assertThat(tuples.size(), is(1)); + assertThat(tuples, hasItem(getTupleMatcher(0, is(dbname)))); + + client.close(); // no more active session + + queryResultScanner.close(); + queryResultScanner.init(); + queryResultScanner = getScanner("SELECT CURRENT_DB FROM INFORMATION_SCHEMA.SESSION"); + + rowBytes = queryResultScanner.getNextRows(100); + + assertThat(rowBytes.size(), is(0)); + + queryResultScanner.close(); + } } From 2554deccfe7d6a1628408f54e0341618aba3a913 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Mon, 18 May 2015 04:01:29 -0700 Subject: [PATCH 2/2] change unit tests and session table schema. --- .../dictionary/SessionTableDescriptor.java | 10 ++--- .../NonForwardQueryResultSystemScanner.java | 42 +++++++++---------- .../apache/tajo/session/SessionManager.java | 8 ---- .../tajo/engine/query/TestSelectQuery.java | 7 ++++ ...estNonForwardQueryResultSystemScanner.java | 35 ---------------- .../testSelectOnSessionTable.sql | 1 + .../testSelectOnSessionTable.result | 3 ++ 7 files changed, 35 insertions(+), 71 deletions(-) create mode 100644 tajo-core/src/test/resources/queries/TestSelectQuery/testSelectOnSessionTable.sql create mode 100644 tajo-core/src/test/resources/results/TestSelectQuery/testSelectOnSessionTable.result diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/SessionTableDescriptor.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/SessionTableDescriptor.java index 98468607a2..935f72da2d 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/SessionTableDescriptor.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/dictionary/SessionTableDescriptor.java @@ -23,12 +23,10 @@ class SessionTableDescriptor extends AbstractTableDescriptor { private static final String TABLENAME = "session"; - private final ColumnDescriptor[] columns = new ColumnDescriptor[] { - new ColumnDescriptor("session_id", Type.TEXT, 0), - new ColumnDescriptor("username", Type.TEXT, 0), - new ColumnDescriptor("current_db", Type.TEXT, 0), - new ColumnDescriptor("last_access_time", Type.TIMESTAMP, 0) - }; + private final ColumnDescriptor[] columns = + new ColumnDescriptor[] { + new ColumnDescriptor("name", Type.TEXT, 0), new ColumnDescriptor("value", Type.TEXT, 0) + }; public SessionTableDescriptor(InfoSchemaMetadataDictionary metadataDictionary) { super(metadataDictionary); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java index 45258aecd0..6c1399ed22 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultSystemScanner.java @@ -18,10 +18,12 @@ package org.apache.tajo.master.exec; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; +import org.apache.tajo.SessionVars; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.catalog.*; @@ -49,6 +51,7 @@ import org.apache.tajo.plan.logical.IndexScanNode; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.session.InvalidSessionException; import org.apache.tajo.session.Session; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; @@ -544,32 +547,22 @@ private List getClusterInfo(Schema outSchema) { } private List getSessionInfo(Schema outSchema) { - List sessions = masterContext.getSessionManager().getAllSessions(); - List tuples = new ArrayList(sessions.size()); - List columns = outSchema.getAllColumns(); - Tuple aTuple; - - for (Session sess : sessions) { - aTuple = new VTuple(outSchema.size()); + List outputs = Lists.newArrayList(); + Tuple eachVariable; - for (int fieldId = 0; fieldId < columns.size(); fieldId++) { - Column column = columns.get(fieldId); + try { + for (Map.Entry var: masterContext.getSessionManager().getAllVariables(sessionId).entrySet()) { + eachVariable = new VTuple(outSchema.size()); + eachVariable.put(0, DatumFactory.createText(var.getKey())); + eachVariable.put(1, DatumFactory.createText(var.getValue())); - if ("session_id".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(sess.getSessionId())); - } else if ("username".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(sess.getUserName())); - } else if ("current_db".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createText(sess.getCurrentDatabase())); - } else if ("last_access_time".equalsIgnoreCase(column.getSimpleName())) { - aTuple.put(fieldId, DatumFactory.createTimestmpDatumWithJavaMillis(sess.getLastAccessTime())); - } + outputs.add(eachVariable); } - - tuples.add(aTuple); + } catch (InvalidSessionException e) { + LOG.error(e); } - return tuples; + return outputs; } private List fetchSystemTable(TableDesc tableDesc, Schema inSchema) { @@ -689,7 +682,12 @@ class SystemPhysicalExec extends PhysicalExec { public SystemPhysicalExec(TaskAttemptContext context, ScanNode scanNode) { super(context, scanNode.getInSchema(), scanNode.getOutSchema()); this.scanNode = scanNode; - this.qual = this.scanNode.getQual(); + + if (this.scanNode.hasQual()) { + this.qual = this.scanNode.getQual(); + this.qual.bind(null, inSchema); + } + cachedData = TUtil.newList(); currentRow = 0; isClosed = false; diff --git a/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java b/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java index 36479c9842..9c49fe1746 100644 --- a/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/session/SessionManager.java @@ -94,14 +94,6 @@ public Session getSession(String sessionId) throws InvalidSessionException { return sessions.get(sessionId); } - public List getAllSessions() { - List current_sessions = new ArrayList(sessions.size()); - for (Map.Entry e : sessions.entrySet()) { - current_sessions.add(e.getValue()); - } - return current_sessions; - } - public void setVariable(String sessionId, String name, String value) throws InvalidSessionException { assertSessionExistence(sessionId); touch(sessionId); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java index 002f05de52..6bb4433898 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java @@ -743,4 +743,11 @@ public void testSelectWithParentheses2() throws Exception { assertResultSet(res); cleanupQuery(res); } + + @Test + public void testSelectOnSessionTable() throws Exception { + ResultSet res = executeQuery(); + assertResultSet(res); + cleanupQuery(res); + } } \ No newline at end of file diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java index 90d972bbc6..fb149b2b10 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestNonForwardQueryResultSystemScanner.java @@ -314,39 +314,4 @@ public void testGetClusterDetails() throws Exception { assertThat(tuples.size(), is(2)); assertThat(tuples, hasItem(getTupleMatcher(0, is("QueryMaster")))); } - - @Test - public void testGetSessionDetails() throws Exception { - final String dbname = "INFORMATION_SCHEMA"; - - TajoClient client = testingCluster.newTajoClient(); - assertTrue(client.selectDatabase(dbname)); // now, cluster has one session. - - NonForwardQueryResultScanner queryResultScanner = - getScanner("SELECT CURRENT_DB FROM INFORMATION_SCHEMA.SESSION"); - - queryResultScanner.init(); - - List rowBytes = queryResultScanner.getNextRows(100); - - assertThat(rowBytes.size(), is(1)); - - RowStoreDecoder decoder = RowStoreUtil.createDecoder(queryResultScanner.getLogicalSchema()); - List tuples = getTupleList(decoder, rowBytes); - - assertThat(tuples.size(), is(1)); - assertThat(tuples, hasItem(getTupleMatcher(0, is(dbname)))); - - client.close(); // no more active session - - queryResultScanner.close(); - queryResultScanner.init(); - queryResultScanner = getScanner("SELECT CURRENT_DB FROM INFORMATION_SCHEMA.SESSION"); - - rowBytes = queryResultScanner.getNextRows(100); - - assertThat(rowBytes.size(), is(0)); - - queryResultScanner.close(); - } } diff --git a/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectOnSessionTable.sql b/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectOnSessionTable.sql new file mode 100644 index 0000000000..a953751577 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestSelectQuery/testSelectOnSessionTable.sql @@ -0,0 +1 @@ +select * from information_schema.session where name = 'CURRENT_DATABASE'; \ No newline at end of file diff --git a/tajo-core/src/test/resources/results/TestSelectQuery/testSelectOnSessionTable.result b/tajo-core/src/test/resources/results/TestSelectQuery/testSelectOnSessionTable.result new file mode 100644 index 0000000000..b8797bafc4 --- /dev/null +++ b/tajo-core/src/test/resources/results/TestSelectQuery/testSelectOnSessionTable.result @@ -0,0 +1,3 @@ +name,value +------------------------------- +CURRENT_DATABASE,default \ No newline at end of file