Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into incremental_results
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed Apr 27, 2015
2 parents 032ab6f + 9a0cb9a commit 1cdac1d
Show file tree
Hide file tree
Showing 47 changed files with 1,996 additions and 254 deletions.
9 changes: 9 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ Changes for Crate
Unreleased
==========

- Fix: Prevent COPY FROM or INSERT into a partitioned table to run into timeouts
when creating many partitions at once

- update crate-admin to 0.12.0 which contains the following changes:

- display ``rest_url`` from sys.nodes table on node detail view

- Expose REST URL of node in ``sys.nodes`` table

- Fix: In some cases too few threads where started which could slow
down queries

Expand Down
2 changes: 1 addition & 1 deletion app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ distZip {

ext {
downloadDir = new File(buildDir, 'downloads')
plugin_crateadmin_version = '0.11.5'
plugin_crateadmin_version = '0.12.0'
crash_version = '0.11.5'
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.core.collections;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

/**
* an ArrayBlockingQueue that only contains unique elements
* @param <T> type of the element
*/
public class UniqueBlockingQueue<T> {

private static enum PlaceHolder { DUMMY }
private final ConcurrentHashMap<T, PlaceHolder> queuedIndices;
private final BlockingQueue<T> requestQueue;

public UniqueBlockingQueue(int capacity) {
this.queuedIndices = new ConcurrentHashMap<>(capacity);
this.requestQueue = new ArrayBlockingQueue<>(capacity);
}

/**
* puts element to the queue if it is not already in the queue
* @param element the thing to add
* @return true if element was putted
*/
public boolean put(T element) throws InterruptedException {
PlaceHolder previous = queuedIndices.putIfAbsent(element, PlaceHolder.DUMMY);
if (previous == null) {
this.requestQueue.put(element);
}
return previous == null; // added
}

public T take() throws InterruptedException {
T element = requestQueue.take();
queuedIndices.remove(element);
return element;
}
}
4 changes: 2 additions & 2 deletions docs/clients/client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ Retrieving the returned columns using the ``cols()`` method::

will print::

["id", "name", "hostname", "port", "load", "mem", "heap", "fs", "version",
"thread_pools"]
["id", "name", "hostname", "rest_url", "port", "load", "mem", "heap",
"fs", "version", "thread_pools"]

Retrieving the returned rows using the ``rows()`` method::

Expand Down
11 changes: 11 additions & 0 deletions docs/sql/system.txt
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,17 @@ hostname
| | is running on. | |
+--------------+-------------------------------------------------+-------------+

rest_url
--------

+--------------+-----------------------------------------------------+-------------+
| Column Name | Description | Return Type |
+==============+=====================================================+=============+
| ``rest_url`` | Full http(s) address where the REST API of the node | ``String`` |
| | is exposed, including schema, hostname (or IP) | |
| | and port. | |
+--------------+-----------------------------------------------------+-------------+

port
----

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
package io.crate.executor.transport;

import io.crate.action.job.TransportJobAction;
import io.crate.executor.transport.merge.TransportDistributedResultAction;
import org.elasticsearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.create.TransportBulkCreateIndicesAction;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
Expand All @@ -46,7 +46,6 @@

public class TransportActionProvider {

private final Provider<TransportDistributedResultAction> transportDistributedResultActionProvider;
private final Provider<TransportFetchNodeAction> transportFetchNodeActionProvider;
private final Provider<TransportCloseContextNodeAction> transportCloseContextNodeActionProvider;

Expand All @@ -67,12 +66,12 @@ public class TransportActionProvider {
private final Provider<TransportPutMappingAction> transportPutMappingActionProvider;
private final Provider<TransportRefreshAction> transportRefreshActionProvider;
private final Provider<TransportUpdateSettingsAction> transportUpdateSettingsActionProvider;
private final Provider<TransportBulkCreateIndicesAction> transportBulkCreateIndicesActionProvider;

private final Provider<TransportJobAction> transportJobInitActionProvider;

@Inject
public TransportActionProvider(Provider<TransportDistributedResultAction> transportDistributedResultActionProvider,
Provider<TransportFetchNodeAction> transportFetchNodeActionProvider,
public TransportActionProvider(Provider<TransportFetchNodeAction> transportFetchNodeActionProvider,
Provider<TransportCloseContextNodeAction> transportCloseContextNodeActionProvider,
Provider<TransportCreateIndexAction> transportCreateIndexActionProvider,
Provider<TransportDeleteIndexAction> transportDeleteIndexActionProvider,
Expand All @@ -89,7 +88,9 @@ public TransportActionProvider(Provider<TransportDistributedResultAction> transp
Provider<TransportShardUpsertAction> transportShardUpsertActionProvider,
Provider<TransportPutMappingAction> transportPutMappingActionProvider,
Provider<TransportRefreshAction> transportRefreshActionProvider,
Provider<TransportUpdateSettingsAction> transportUpdateSettingsActionProvider, Provider<TransportJobAction> transportJobInitActionProvider) {
Provider<TransportUpdateSettingsAction> transportUpdateSettingsActionProvider,
Provider<TransportJobAction> transportJobInitActionProvider,
Provider<TransportBulkCreateIndicesAction> transportBulkCreateIndicesActionProvider) {
this.transportCreateIndexActionProvider = transportCreateIndexActionProvider;
this.transportDeleteIndexActionProvider = transportDeleteIndexActionProvider;
this.transportPutIndexTemplateActionProvider = transportPutIndexTemplateActionProvider;
Expand All @@ -103,20 +104,23 @@ public TransportActionProvider(Provider<TransportDistributedResultAction> transp
this.transportMultiGetActionProvider = transportMultiGetActionProvider;
this.symbolBasedTransportShardUpsertActionProvider = symbolBasedTransportShardUpsertActionProvider;
this.transportShardUpsertActionProvider = transportShardUpsertActionProvider;
this.transportDistributedResultActionProvider = transportDistributedResultActionProvider;
this.transportFetchNodeActionProvider = transportFetchNodeActionProvider;
this.transportCloseContextNodeActionProvider = transportCloseContextNodeActionProvider;
this.transportPutMappingActionProvider = transportPutMappingActionProvider;
this.transportRefreshActionProvider = transportRefreshActionProvider;
this.transportUpdateSettingsActionProvider = transportUpdateSettingsActionProvider;
this.transportJobInitActionProvider = transportJobInitActionProvider;
this.transportBulkCreateIndicesActionProvider = transportBulkCreateIndicesActionProvider;
}


public TransportCreateIndexAction transportCreateIndexAction() {
return transportCreateIndexActionProvider.get();
}

public TransportBulkCreateIndicesAction transportBulkCreateIndicesAction() {
return transportBulkCreateIndicesActionProvider.get();
}

public TransportDeleteIndexAction transportDeleteIndexAction() {
return transportDeleteIndexActionProvider.get();
}
Expand Down Expand Up @@ -169,10 +173,6 @@ public TransportJobAction transportJobInitAction() {
return transportJobInitActionProvider.get();
}

public TransportDistributedResultAction transportDistributedResultAction() {
return transportDistributedResultActionProvider.get();
}

public TransportFetchNodeAction transportFetchNodeAction() {
return transportFetchNodeActionProvider.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ public ImmutableList<Task> visitSymbolBasedUpsertByIdNode(SymbolBasedUpsertByIdN
settings,
transportActionProvider.symbolBasedTransportShardUpsertActionDelegate(),
transportActionProvider.transportCreateIndexAction(),
transportActionProvider.transportBulkCreateIndicesAction(),
bulkRetryCoordinatorPool,
node));
}
Expand All @@ -410,6 +411,7 @@ public ImmutableList<Task> visitUpsertByIdNode(UpsertByIdNode node, UUID jobId)
settings,
transportActionProvider.transportShardUpsertActionDelegate(),
transportActionProvider.transportCreateIndexAction(),
transportActionProvider.transportBulkCreateIndicesAction(),
bulkRetryCoordinatorPool,
node));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportBulkCreateIndicesAction;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.bulk.BulkRetryCoordinatorPool;
import org.elasticsearch.action.bulk.SymbolBasedBulkShardProcessor;
Expand All @@ -61,6 +62,7 @@ public class SymbolBasedUpsertByIdTask extends JobTask {

private final SymbolBasedTransportShardUpsertActionDelegate transportShardUpsertActionDelegate;
private final TransportCreateIndexAction transportCreateIndexAction;
private final TransportBulkCreateIndicesAction transportBulkCreateIndicesAction;
private final ClusterService clusterService;
private final SymbolBasedUpsertByIdNode node;
private final List<ListenableFuture<TaskResult>> resultList;
Expand All @@ -76,11 +78,13 @@ public SymbolBasedUpsertByIdTask(UUID jobId,
Settings settings,
SymbolBasedTransportShardUpsertActionDelegate transportShardUpsertActionDelegate,
TransportCreateIndexAction transportCreateIndexAction,
TransportBulkCreateIndicesAction transportBulkCreateIndicesAction,
BulkRetryCoordinatorPool bulkRetryCoordinatorPool,
SymbolBasedUpsertByIdNode node) {
super(jobId);
this.transportShardUpsertActionDelegate = transportShardUpsertActionDelegate;
this.transportCreateIndexAction = transportCreateIndexAction;
this.transportBulkCreateIndicesAction = transportBulkCreateIndicesAction;
this.clusterService = clusterService;
this.node = node;
this.bulkRetryCoordinatorPool = bulkRetryCoordinatorPool;
Expand Down Expand Up @@ -175,7 +179,7 @@ public void onFailure(Throwable e) {
private List<ListenableFuture<TaskResult>> initializeBulkShardProcessor(Settings settings) {
bulkShardProcessor = new SymbolBasedBulkShardProcessor(
clusterService,
transportCreateIndexAction,
transportBulkCreateIndicesAction,
settings,
bulkRetryCoordinatorPool,
node.isPartitionedTable(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportBulkCreateIndicesAction;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.bulk.BulkRetryCoordinatorPool;
import org.elasticsearch.action.bulk.BulkShardProcessor;
Expand All @@ -62,6 +63,7 @@ public class UpsertByIdTask extends JobTask {

private final TransportShardUpsertActionDelegate transportShardUpsertActionDelegate;
private final TransportCreateIndexAction transportCreateIndexAction;
private final TransportBulkCreateIndicesAction transportBulkCreateIndicesAction;
private final ClusterService clusterService;
private final BulkRetryCoordinatorPool bulkRetryCoordinatorPool;
private final UpsertByIdNode node;
Expand All @@ -78,12 +80,14 @@ public UpsertByIdTask(UUID jobId,
Settings settings,
TransportShardUpsertActionDelegate transportShardUpsertActionDelegate,
TransportCreateIndexAction transportCreateIndexAction,
TransportBulkCreateIndicesAction transportBulkCreateIndicesAction,
BulkRetryCoordinatorPool bulkRetryCoordinatorPool,
UpsertByIdNode node) {
super(jobId);
this.transportShardUpsertActionDelegate = transportShardUpsertActionDelegate;
this.transportCreateIndexAction = transportCreateIndexAction;
this.clusterService = clusterService;
this.transportBulkCreateIndicesAction = transportBulkCreateIndicesAction;
this.bulkRetryCoordinatorPool = bulkRetryCoordinatorPool;
this.node = node;
autoCreateIndex = new AutoCreateIndex(settings);
Expand Down Expand Up @@ -191,7 +195,7 @@ private List<ListenableFuture<TaskResult>> initializeBulkShardProcessor(Settings
bulkShardProcessor = new BulkShardProcessor(
clusterService,
settings,
transportCreateIndexAction,
transportBulkCreateIndicesAction,
shardingProjector,
node.isPartitionedTable(),
false, // overwrite Duplicates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,13 +635,17 @@ public boolean convertArrayReference(Reference arrayReference, Literal literal,

@Override
public boolean convertArrayLiteral(Reference reference, Literal arrayLiteral, Context context) throws IOException {
String refName = reference.info().ident().columnIdent().fqn();
context.builder.startObject("terms").field(refName);
String columnName = reference.info().ident().columnIdent().fqn();
context.builder.startObject(Fields.FILTERED);
context.builder.startObject(Fields.QUERY)
.startObject(Fields.MATCH_ALL).endObject()
.endObject();
context.builder.startObject(Fields.FILTER).startObject("terms").field(columnName);
context.builder.startArray();
for (Object value: toIterable(arrayLiteral.value())){
context.builder.value(value);
}
context.builder.endArray().endObject();
context.builder.endArray().endObject().endObject().endObject();
return true;
}
}
Expand Down Expand Up @@ -681,16 +685,23 @@ public boolean convertArrayReference(Reference arrayReference, Literal literal,
public boolean convertArrayLiteral(Reference reference, Literal arrayLiteral, Context context) throws IOException {
// col != ANY ([1,2,3]) --> not(col=1 and col=2 and col=3)
String columnName = reference.info().ident().columnIdent().fqn();
context.builder.startObject("bool").startObject("must_not")
.startObject("bool").startArray("must");
context.builder.startObject(Fields.FILTERED)
.startObject(Fields.QUERY)
.startObject(Fields.MATCH_ALL).endObject()
.endObject()
.startObject(Fields.FILTER)
.startObject("bool").startObject("must_not")
.startObject("bool")
.startArray("must");
for (Object value: toIterable(arrayLiteral.value())) {
context.builder.startObject()
.startObject("term").field(columnName, value).endObject()
.endObject();
}
context.builder.endArray().endObject();

context.builder.endObject().endObject();
context.builder.endArray()
.endObject()
.endObject().endObject()
.endObject().endObject();
return true;
}
}
Expand Down
Loading

0 comments on commit 1cdac1d

Please sign in to comment.