Skip to content

Commit

Permalink
Internal: Add some @nullable annotations and fix related compilation …
Browse files Browse the repository at this point in the history
…warnings.

Added @nullable to:
 - IndicesService.indexService
 - IndexService.shard
 - IndexService.shardInjector

This change doesn't try to do anything smart but just makes sure that a
*MissingException is thrown instead of a NullPointerException when the requested
object doesn't exist.

Close #7251
  • Loading branch information
jpountz committed Aug 14, 2014
1 parent 084793f commit a4f974d
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 38 deletions.
Expand Up @@ -108,7 +108,7 @@ protected void resolveRequest(ClusterState state, InternalRequest request) {

@Override
protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId) throws ElasticsearchException {
IndexService indexService = indicesService.indexService(shardId.getIndex());
IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardId.id());
Term uidTerm = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(request.type(), request.id()));
Engine.GetResult result = indexShard.get(new Engine.Get(false, uidTerm));
Expand Down
Expand Up @@ -273,7 +273,10 @@ protected void doRun() {
listener.onResponse(update);
IndexService indexServiceOrNull = indicesService.indexService(request.concreteIndex());
if (indexServiceOrNull != null) {
indexService.shard(request.request().shardId()).indexingService().noopUpdate(request.request().type());
IndexShard shard = indexService.shard(request.request().shardId());
if (shard != null) {
shard.indexingService().noopUpdate(request.request().type());
}
}
break;
default:
Expand Down
Expand Up @@ -25,7 +25,10 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingClusterStateUpdateRequest;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.Tuple;
Expand Down Expand Up @@ -491,33 +494,28 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
Map<String, DocumentMapper> newMappers = newHashMap();
Map<String, DocumentMapper> existingMappers = newHashMap();
for (String index : request.indices()) {
IndexService indexService = indicesService.indexService(index);
if (indexService != null) {
// try and parse it (no need to add it here) so we can bail early in case of parsing exception
DocumentMapper newMapper;
DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type());
if (MapperService.DEFAULT_MAPPING.equals(request.type())) {
// _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default
newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()), false);
} else {
newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()));
if (existingMapper != null) {
// first, simulate
DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true));
// if we have conflicts, and we are not supposed to ignore them, throw an exception
if (!request.ignoreConflicts() && mergeResult.hasConflicts()) {
throw new MergeMappingException(mergeResult.conflicts());
}
}
}

newMappers.put(index, newMapper);
IndexService indexService = indicesService.indexServiceSafe(index);
// try and parse it (no need to add it here) so we can bail early in case of parsing exception
DocumentMapper newMapper;
DocumentMapper existingMapper = indexService.mapperService().documentMapper(request.type());
if (MapperService.DEFAULT_MAPPING.equals(request.type())) {
// _default_ types do not go through merging, but we do test the new settings. Also don't apply the old default
newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()), false);
} else {
newMapper = indexService.mapperService().parse(request.type(), new CompressedString(request.source()));
if (existingMapper != null) {
existingMappers.put(index, existingMapper);
// first, simulate
DocumentMapper.MergeResult mergeResult = existingMapper.merge(newMapper, mergeFlags().simulate(true));
// if we have conflicts, and we are not supposed to ignore them, throw an exception
if (!request.ignoreConflicts() && mergeResult.hasConflicts()) {
throw new MergeMappingException(mergeResult.conflicts());
}
}
}

} else {
throw new IndexMissingException(new Index(index));
newMappers.put(index, newMapper);
if (existingMapper != null) {
existingMappers.put(index, existingMapper);
}
}

Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/elasticsearch/index/service/IndexService.java
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableSet;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.index.IndexComponent;
import org.elasticsearch.index.IndexShardMissingException;
Expand Down Expand Up @@ -79,12 +80,26 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard> {

boolean hasShard(int shardId);

/**
* Return the shard with the provided id, or null if there is no such shard.
*/
@Nullable
IndexShard shard(int shardId);

/**
* Return the shard with the provided id, or throw an exception if it doesn't exist.
*/
IndexShard shardSafe(int shardId) throws IndexShardMissingException;

/**
* Return the shard injector for the provided id, or null if there is no such shard.
*/
@Nullable
Injector shardInjector(int shardId);

/**
* Return the shard injector for the provided id, or throw an exception if there is no such shard.
*/
Injector shardInjectorSafe(int shardId) throws IndexShardMissingException;

String indexUUID();
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/elasticsearch/indices/IndicesService.java
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.service.IndexService;
Expand Down Expand Up @@ -62,6 +63,7 @@ public interface IndicesService extends Iterable<IndexService>, LifecycleCompone
* Even if the index name appeared in {@link #indices()} <code>null</code> can still be returned as an
* index maybe removed in the meantime, so preferable use the associated {@link IndexService} in order to prevent NPE.
*/
@Nullable
IndexService indexService(String index);

/**
Expand Down
Expand Up @@ -542,8 +542,8 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) throws Ela
continue;
}

if (indexService.hasShard(shardId)) {
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId);
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId);
if (indexShard != null) {
ShardRouting currentRoutingEntry = indexShard.routingEntry();
// if the current and global routing are initializing, but are still not the same, its a different "shard" being allocated
// for example: a shard that recovers from one node and now needs to recover to another node,
Expand All @@ -566,13 +566,10 @@ private void applyNewOrUpdatedShards(final ClusterChangedEvent event) throws Ela
}
}
}
}

if (indexService.hasShard(shardId)) {
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId);
if (!shardRouting.equals(indexShard.routingEntry())) {
indexShard.routingEntry(shardRouting);
indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class).routingStateChanged();
indexService.shardInjectorSafe(shardId).getInstance(IndexShardGatewayService.class).routingStateChanged();
}
}

Expand Down Expand Up @@ -740,7 +737,7 @@ private void applyInitializingShard(final RoutingTable routingTable, final Disco
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
boolean indexShouldExists = indexShardRouting.primaryAllocatedPostApi();
IndexShardGatewayService shardGatewayService = indexService.shardInjector(shardId).getInstance(IndexShardGatewayService.class);
IndexShardGatewayService shardGatewayService = indexService.shardInjectorSafe(shardId).getInstance(IndexShardGatewayService.class);
shardGatewayService.recover(indexShouldExists, new IndexShardGatewayService.RecoveryListener() {
@Override
public void onRecoveryDone() {
Expand Down
Expand Up @@ -152,9 +152,7 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
Store store = indexShard.store();
store.incRef();
try {
if (indexShard != null) {
return new StoreFilesMetaData(true, shardId, indexShard.store().getMetadata().asMap());
}
return new StoreFilesMetaData(true, shardId, indexShard.store().getMetadata().asMap());
} finally {
store.decRef();
}
Expand Down
Expand Up @@ -160,7 +160,7 @@ private Directory getStoreDirectory(String index, int shardId) {
Set<String> nodes = internalCluster().nodesInclude("test");
assertThat(nodes.isEmpty(), equalTo(false));
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodes.iterator().next());
InternalIndexShard indexShard = (InternalIndexShard) (indicesService.indexService(index).shard(shardId));
InternalIndexShard indexShard = (InternalIndexShard) (indicesService.indexService(index).shardSafe(shardId));
return indexShard.store().directory();
}
}

0 comments on commit a4f974d

Please sign in to comment.