Skip to content

Commit

Permalink
implemented _nodes sys column on sys.operations table
Browse files Browse the repository at this point in the history
  • Loading branch information
dobe committed Jan 16, 2015
1 parent f7b533f commit 50097a4
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 50 deletions.
21 changes: 9 additions & 12 deletions docs/sql/system.txt
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,14 @@ A ``operation`` is part of a ``job`` but not all jobs list their operations. A
operation is listed on each node it is being executed on.

In order to see on which nodes the operations are being executed the
``sys.nodes.name`` expression can be used in the query::

cr> select sys.nodes.name, job_id, name, used_bytes from sys.operations order by name limit 1;
+----------------+--------...-+---------+------------+
| sys.nodes.name | job_id | name | used_bytes |
+----------------+--------...-+---------+------------+
| crate | ... | collect | ... |
+----------------+--------...-+---------+------------+
``_node`` system column can be used in the query::

cr> select _node['name'], job_id, name, used_bytes from sys.operations order by name limit 1;
+---------------+--------...-+---------+------------+
| _node['name'] | job_id | name | used_bytes |
+---------------+--------...-+---------+------------+
| crate | ... | collect | ... |
+---------------+--------...-+---------+------------+
SELECT 1 row in set (... sec)

Logs
Expand All @@ -573,7 +573,7 @@ limit old entries will be discarded as new entries are added::
+-...+------------------------------------...-+-...-----+-...---+-------+
| id | stmt | started | ended | error |
+-...+------------------------------------...-+-...-----+-...---+-------+
| ...| select sys.nodes.name, ... | ... | ... | NULL |
| ...| select _node['name'], ... | ... | ... | NULL |
| ...| select stmt, started from sys.jobs ... | ... | ... | NULL |
+-...+------------------------------------...-+-...-----+-...---+-------+
SELECT 2 rows in set (... sec)
Expand Down Expand Up @@ -649,9 +649,6 @@ Note, that querying a time setting will always return a ``string`` value::

::

cr> select settings['cluster']['graceful_stop']['not_a_setting'] from sys.cluster;
SQLActionException[Unknown Reference sys.cluster.settings['cluster']['graceful_stop']['not_a_setting']]

The default configuration in ``crate.yml`` looks like:

.. code-block:: yaml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

public class SysOperationsTableInfo extends SysTableInfo {

private final TableColumn nodesTableColumn;

public static class ColumnNames {
public final static String ID = "id";
public final static String JOB_ID = "job_id";
Expand All @@ -46,13 +48,13 @@ public static class ColumnNames {
public static final TableIdent IDENT = new TableIdent(SCHEMA, "operations");
private static final String[] INDICES = new String[] { IDENT.name() };

private static final Map<ColumnIdent, ReferenceInfo> COLUMNS_INFO = new LinkedHashMap<>();
private static final Map<ColumnIdent, ReferenceInfo> INFOS = new LinkedHashMap<>();
private static final LinkedHashSet<ReferenceInfo> columns = new LinkedHashSet<>();

private static ReferenceInfo register(String column, DataType type) {
ReferenceInfo info = new ReferenceInfo(new ReferenceIdent(IDENT, column), RowGranularity.DOC, type);
columns.add(info);
COLUMNS_INFO.put(info.ident().columnIdent(), info);
INFOS.put(info.ident().columnIdent(), info);
return info;
}

Expand All @@ -62,22 +64,31 @@ private static ReferenceInfo register(String column, DataType type) {
register(ColumnNames.NAME, DataTypes.STRING);
register(ColumnNames.STARTED, DataTypes.TIMESTAMP);
register(ColumnNames.USED_BYTES, DataTypes.LONG);

INFOS.put(SysNodesTableInfo.SYS_COL_IDENT, SysNodesTableInfo.tableColumnInfo(IDENT));
}

@Inject
public SysOperationsTableInfo(ClusterService clusterService, SysSchemaInfo sysSchemaInfo) {
public SysOperationsTableInfo(ClusterService clusterService,
SysSchemaInfo sysSchemaInfo,
SysNodesTableInfo sysNodesTableInfo) {
super(clusterService, sysSchemaInfo);
nodesTableColumn = sysNodesTableInfo.tableColumn();
}

@Nullable
@Override
public ReferenceInfo getReferenceInfo(ColumnIdent columnIdent) {
return columnInfo(columnIdent);
ReferenceInfo info = columnInfo(columnIdent);
if (info == null) {
return nodesTableColumn.getReferenceInfo(this.ident(), columnIdent);
}
return info;
}

@Nullable
public static ReferenceInfo columnInfo(ColumnIdent ident) {
return COLUMNS_INFO.get(ident);
return INFOS.get(ident);
}

@Override
Expand Down Expand Up @@ -112,6 +123,6 @@ public String[] concreteIndices() {

@Override
public Iterator<ReferenceInfo> iterator() {
return COLUMNS_INFO.values().iterator();
return INFOS.values().iterator();
}
}
2 changes: 1 addition & 1 deletion sql/src/main/java/io/crate/metadata/sys/SysSchemaInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public SysSchemaInfo(ClusterService clusterService) {
.put(SysShardsTableInfo.IDENT.name(), new SysShardsTableInfo(clusterService, this, sysNodesTableInfo))
.put(SysJobsTableInfo.IDENT.name(), new SysJobsTableInfo(clusterService, this))
.put(SysJobsLogTableInfo.IDENT.name(), new SysJobsLogTableInfo(clusterService, this))
.put(SysOperationsTableInfo.IDENT.name(), new SysOperationsTableInfo(clusterService, this))
.put(SysOperationsTableInfo.IDENT.name(), new SysOperationsTableInfo(clusterService, this, sysNodesTableInfo))
.put(SysOperationsLogTableInfo.IDENT.name(), new SysOperationsLogTableInfo(clusterService, this))
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,6 @@ private static ReferenceInfo register(String column, DataType type, List<String>
columns.add(info);
}
INFOS.put(info.ident().columnIdent(), info);



return info;
}

Expand Down
87 changes: 87 additions & 0 deletions sql/src/test/java/io/crate/integrationtests/SysOperationsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.integrationtests;

import io.crate.action.sql.SQLResponse;
import io.crate.test.integration.ClassLifecycleIntegrationTest;
import io.crate.testing.SQLTransportExecutor;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.core.Is.is;

public class SysOperationsTest extends ClassLifecycleIntegrationTest {

private SQLTransportExecutor executor;

@Before
public void before() throws Exception {
executor = SQLTransportExecutor.create(ClassLifecycleIntegrationTest.GLOBAL_CLUSTER);
executor.exec("set global stats.enabled = true");
}

@After
public void after() {
executor.exec("set global stats.enabled = false");
}

@Test
public void testDistinctSysOperations() throws Exception {
// this tests a distributing collect without shards but DOC level granularity
SQLResponse response = executor.exec("select distinct name from sys.operations limit 1");
assertThat(response.rowCount(), is(1L));
}

@Test
public void testQueryNameFromSysOperations() throws Exception {
SQLResponse resp = executor.exec("select name, job_id from sys.operations order by name asc");

// usually this should return collect on 2 nodes, localMerge on 1 node
// but it could be that the collect is finished before the localMerge task is started in which
// case it is missing.

assertThat(resp.rowCount(), Matchers.greaterThanOrEqualTo(2L));
List<String> names = new ArrayList<>();
for (Object[] objects : resp.rows()) {
names.add((String) objects[0]);
}
Collections.sort(names);
assertTrue(names.contains("collect"));
}

@Test
public void testNodeExpressionOnSysOperations() throws Exception {
executor.exec("select * from sys.nodes");
SQLResponse response = executor.exec("select _node['name'], id from sys.operations limit 1");
assertThat(response.rowCount(), is(1L));
assertThat(response.rows()[0][0].toString(), startsWith("node"));
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -553,25 +552,6 @@ public void testJobLog() throws Exception {
assertThat(response.rowCount(), is(0L));
}

@Test
public void testQueryNameFromSysOperations() throws Exception {
executor.exec("set global stats.enabled = true");
SQLResponse resp = executor.exec("select name, job_id from sys.operations order by name asc");

// usually this should return collect on 2 nodes, localMerge on 1 node
// but it could be that the collect is finished before the localMerge task is started in which
// case it is missing.

assertThat(resp.rowCount(), Matchers.greaterThanOrEqualTo(2L));
List<String> names = new ArrayList<>();
for (Object[] objects : resp.rows()) {
names.add((String) objects[0]);
}
Collections.sort(names);
assertTrue(names.contains("collect"));
executor.exec("set global stats.enabled = false");
}

@Test
public void testSetSingleStatement() throws Exception {
SQLResponse response = executor.exec("select settings['stats']['jobs_log_size'] from sys.cluster");
Expand Down Expand Up @@ -688,14 +668,6 @@ public void testSelectFromJobsLogWithLimit() throws Exception {
executor.exec("reset global stats.enabled");
}

@Test
public void testDistinctSysOperations() throws Exception {
// this tests a distributing collect without shards but DOC level granularity
SQLResponse response = executor.exec("select distinct name from sys.operations");
// no data since stats.enabled is disabled
assertThat(response.rowCount(), is(0L));
}

@Test
public void testAddPrimaryKeyColumnToNonEmptyTable() throws Exception {
expectedException.expect(SQLActionException.class);
Expand Down

0 comments on commit 50097a4

Please sign in to comment.