Skip to content

Commit

Permalink
remove more unused code
Browse files Browse the repository at this point in the history
  • Loading branch information
mfussenegger committed May 7, 2015
1 parent 64940d4 commit 57048c7
Show file tree
Hide file tree
Showing 23 changed files with 33 additions and 184 deletions.
2 changes: 1 addition & 1 deletion sql/src/main/java/io/crate/action/job/ContextPreparer.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public Void visitMergeNode(final MergeNode node, final PreparerContext context)
context.ramAccountingContext,
Optional.of(threadPool.executor(ThreadPool.Names.SEARCH))
);
StreamerVisitor.Context streamerContext = streamerVisitor.processPlanNode(node, context.ramAccountingContext);
StreamerVisitor.Context streamerContext = streamerVisitor.processPlanNode(node);
PageDownstreamContext pageDownstreamContext = new PageDownstreamContext(
pageDownstream, streamerContext.inputStreamers(), node.numUpstreams());

Expand Down
2 changes: 0 additions & 2 deletions sql/src/main/java/io/crate/analyze/EvaluatingNormalizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ public class EvaluatingNormalizer {
private final ReferenceResolver referenceResolver;
private final FieldResolver fieldResolver;
private final BaseVisitor visitor;
private final boolean inPlace;


/**
Expand All @@ -78,7 +77,6 @@ public EvaluatingNormalizer(
this.granularity = granularity;
this.referenceResolver = referenceResolver;
this.fieldResolver = fieldResolver;
this.inPlace = inPlace;
if (inPlace) {
this.visitor = new InPlaceVisitor();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public abstract class WhereClauseValidator {

public static void validate(WhereClause whereClause) {
if (whereClause.hasQuery()){
visitor.process(whereClause.query(), new Visitor.Context(whereClause));
visitor.process(whereClause.query(), new Visitor.Context());
}
}

Expand All @@ -29,10 +29,8 @@ private static class Visitor extends SymbolVisitor<Visitor.Context, Symbol> {
public static class Context {

public final Stack<Function> functions = new Stack<>();
private final WhereClause whereClause;

public Context(WhereClause whereClause) {
this.whereClause = whereClause;
public Context() {
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void start() {
assert mergeNode != null : "mergeNode must not be null";
RamAccountingContext ramAccountingContext = trackOperation(mergeNode, "localMerge");

Streamer<?>[] streamers = streamerVisitor.processExecutionNode(mergeNode, ramAccountingContext).inputStreamers();
Streamer<?>[] streamers = streamerVisitor.processExecutionNode(mergeNode).inputStreamers();
final PageDownstreamContext pageDownstreamContext = createPageDownstreamContext(ramAccountingContext, streamers);
Map<String, Collection<ExecutionNode>> nodesByServer = groupExecutionNodesByServer(executionNodes);
if (nodesByServer.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,37 +87,9 @@ public void writeTo(StreamOutput out) throws IOException {

public static class Response implements Streamable {

private String id;
private long version;
private boolean created;

Response() {
}

public Response(String id, long version, boolean created) {
this.id = id;
this.version = version;
this.created = created;
}

public String id() {
return this.id;
}

/**
* Returns the current version of the doc indexed.
*/
public long version() {
return this.version;
}

/**
* Returns true if document was created due to an UPSERT operation
*/
public boolean created() {
return this.created;
}


public static Response readResponse(StreamInput in) throws IOException {
Response response = new Response();
response.readFrom(in);
Expand All @@ -126,37 +98,22 @@ public static Response readResponse(StreamInput in) throws IOException {

@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readString();
version = in.readLong();
created = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
out.writeLong(version);
out.writeBoolean(created);
}
}



private String index;
private IntArrayList locations = new IntArrayList();
private List<Response> responses = new ArrayList<>();
private List<Failure> failures = new ArrayList<>();

public ShardUpsertResponse() {
}

public ShardUpsertResponse(String index) {
this.index = index;
}

public String index() {
return this.index;
}

public void add(int location, Response response) {
locations.add(location);
responses.add(response);
Expand Down Expand Up @@ -187,7 +144,6 @@ public List<Failure> failures() {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
index = in.readSharedString();
int size = in.readVInt();
locations = new IntArrayList(size);
responses = new ArrayList<>(size);
Expand All @@ -210,7 +166,6 @@ public void readFrom(StreamInput in) throws IOException {
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeSharedString(index);
out.writeVInt(locations.size());
for (int i = 0; i < locations.size(); i++) {
out.writeVInt(locations.get(i));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,23 +144,20 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {

@Override
protected PrimaryResponse<ShardUpsertResponse, SymbolBasedShardUpsertRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardUpsertResponse shardUpsertResponse = new ShardUpsertResponse(shardRequest.shardId.getIndex());
ShardUpsertResponse shardUpsertResponse = new ShardUpsertResponse();
SymbolBasedShardUpsertRequest request = shardRequest.request;
for (int i = 0; i < request.itemIndices().size(); i++) {
int location = request.itemIndices().get(i);
SymbolBasedShardUpsertRequest.Item item = request.items().get(i);
try {
IndexResponse indexResponse = indexItem(
indexItem(
request,
item,
shardRequest.shardId,
item.insertValues() != null, // try insert first
0);
shardUpsertResponse.add(location,
new ShardUpsertResponse.Response(
item.id(),
indexResponse.getVersion(),
indexResponse.isCreated()));
new ShardUpsertResponse.Response());
} catch (Throwable t) {
if (TransportActions.isShardNotAvailableException(t) || !request.continueOnError()) {
throw t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ protected ShardIterator shards(ClusterState state, InternalRequest request) {

@Override
protected PrimaryResponse<ShardUpsertResponse, ShardUpsertRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardUpsertResponse shardUpsertResponse = new ShardUpsertResponse(shardRequest.shardId.getIndex());
ShardUpsertResponse shardUpsertResponse = new ShardUpsertResponse();
ShardUpsertRequest request = shardRequest.request;
SymbolToFieldExtractorContext extractorContextUpdate = null;
SymbolToInputContext implContextInsert = null;
Expand All @@ -178,18 +178,14 @@ protected PrimaryResponse<ShardUpsertResponse, ShardUpsertRequest> shardOperatio
while (it.hasNext()) {
ShardUpsertRequest.Item item = it.next();
try {
IndexResponse indexResponse = indexItem(
indexItem(
request,
item, shardRequest.shardId,
extractorContextUpdate,
implContextInsert,
request.insertAssignments() != null, // try insert first
0);
shardUpsertResponse.add(item.location(),
new ShardUpsertResponse.Response(
item.id(),
indexResponse.getVersion(),
indexResponse.isCreated()));
shardUpsertResponse.add(item.location(), new ShardUpsertResponse.Response());
} catch (Throwable t) {
if (TransportActions.isShardNotAvailableException(t) || !request.continueOnError()) {
throw t;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@

package io.crate.executor.transport.distributed;

import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.crate.Streamer;
import io.crate.core.collections.Bucket;
import io.crate.core.collections.Row;
Expand All @@ -37,24 +35,9 @@
import java.util.List;


@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
public class MultiBucketBuilder {

public static final Function<StreamBucket.Builder, Bucket> BUILD_FUNCTION_THREAD_SAFE =
new Function<StreamBucket.Builder, Bucket>() {
@Nullable
@Override
public Bucket apply(StreamBucket.Builder input) {
synchronized (input) {
try {
return input.build();
} catch (IOException e) {
Throwables.propagate(e);
}
}
return null;
}
};

private final List<StreamBucket.Builder> bucketBuilders;

public MultiBucketBuilder(Streamer<?>[] streamers, int numBuckets) {
Expand All @@ -64,10 +47,6 @@ public MultiBucketBuilder(Streamer<?>[] streamers, int numBuckets) {
}
}

public List<Bucket> build() {
return Lists.transform(bucketBuilders, BUILD_FUNCTION_THREAD_SAFE);
}

public Bucket build(int bucketIdx) {
Bucket bucket = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import io.crate.executor.TaskResult;
import io.crate.executor.transport.ShardUpsertResponse;
import io.crate.executor.transport.SymbolBasedShardUpsertRequest;
import io.crate.metadata.settings.CrateSettings;
import io.crate.jobs.JobContextService;
import io.crate.jobs.JobExecutionContext;
import io.crate.jobs.UpsertByIdContext;
import io.crate.metadata.settings.CrateSettings;
import io.crate.planner.node.dml.SymbolBasedUpsertByIdNode;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
Expand All @@ -47,8 +47,6 @@
import org.elasticsearch.action.bulk.SymbolBasedTransportShardUpsertActionDelegate;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndexAlreadyExistsException;
Expand All @@ -75,8 +73,6 @@ public class SymbolBasedUpsertByIdTask extends JobTask {
@Nullable
private SymbolBasedBulkShardProcessor<SymbolBasedShardUpsertRequest, ShardUpsertResponse> bulkShardProcessor;

private static final ESLogger logger = Loggers.getLogger(SymbolBasedUpsertByIdTask.class);

public SymbolBasedUpsertByIdTask(UUID jobId,
ClusterService clusterService,
Settings settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ public void addReference(Reference reference) {
references.add(reference);
}

public List<Reference> references() {
return references;
}

public String[] referenceNames() {
if (referenceNames == null) {
referenceNames = Lists.transform(references, new com.google.common.base.Function<Reference, String>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,6 @@ public Input<?> collectExpressionFor(InputColumn inputColumn) {
}
}

public Context process(Symbol... symbols) {
Context context = new Context();
for (Symbol symbol : symbols) {
process(symbol, context);
}
return context;
}

public Context process(Collection<Symbol> symbols) {
Context context = new Context();
for (Symbol symbol : symbols) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ public JobCollectContext(UUID jobId, RamAccountingContext ramAccountingContext,
this.downstream = downstream;
}

public UUID id() {
return id;
}

public RamAccountingContext ramAccountingContext() {
return ramAccountingContext;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public static class Context extends AbstractImplementationSymbolVisitor.Context
private Row row;
private List<Reference> references = new ArrayList<>();

private int inputIndexPartitionedBy = 0;
private Row partitionByRow;
private List<ReferenceInfo> partitionedBy;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,6 @@ public ResultProvider createDownstream(ExecutionNode node, UUID jobId) {
}

protected Streamer<?>[] getStreamers(ExecutionNode node) {
return streamerVisitor.processExecutionNode(node, null).outputStreamers();
return streamerVisitor.processExecutionNode(node).outputStreamers();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,6 @@ public Row poll() {
}
}

public boolean isEmpty() {
synchronized (lock) {
return rows.isEmpty();
}
}

public boolean emitUntil(Row until) {
synchronized (lock) {
while (firstRow != null && ordering.compare(firstRow, until) >= 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,6 @@ public Projector process(Projection projection, RamAccountingContext ramAccounti
return super.process(projection, new Context(ramAccountingContext));
}

public Projector process(Projection projection,
RamAccountingContext ramAccountingContext,
Optional<UUID> jobId) {
return super.process(projection, new Context(ramAccountingContext, jobId));
}

public Projector process(Projection projection,
RamAccountingContext ramAccountingContext,
Optional<UUID> jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.elasticsearch.action.bulk.BulkRetryCoordinatorPool;
import org.elasticsearch.action.bulk.SymbolBasedBulkShardProcessor;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.shard.ShardId;
Expand All @@ -51,7 +49,6 @@

public class UpdateProjector implements Projector, RowDownstreamHandle {

private static final ESLogger LOGGER = Loggers.getLogger(UpdateProjector.class);
public static final int DEFAULT_BULK_SIZE = 1024;

private RowDownstreamHandle downstream;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ public boolean consume(AnalyzedRelation rootRelation, ConsumerContext context) {
private static class Context {
ConsumerContext consumerContext;
boolean result = false;
boolean insertVisited = false;
ColumnIndexWriterProjection indexWriterProjection;

public Context(ConsumerContext context){
this.consumerContext = context;
Expand Down Expand Up @@ -88,9 +86,6 @@ public AnalyzedRelation visitInsertFromQuery(InsertFromSubQueryAnalyzedStatement
insertFromSubQueryAnalyzedStatement.tableInfo().isPartitioned()
);

context.insertVisited = true;
context.indexWriterProjection = indexWriterProjection;

AnalyzedRelation innerRelation = insertFromSubQueryAnalyzedStatement.subQueryRelation();
if (innerRelation instanceof PlannedAnalyzedRelation) {
PlannedAnalyzedRelation analyzedRelation = (PlannedAnalyzedRelation)innerRelation;
Expand Down
Loading

0 comments on commit 57048c7

Please sign in to comment.