Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Create/Update/Delete classes less mutable #5939

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -410,7 +410,7 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
Engine.IndexingOperation op;
try {
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY);
if (index.parsedDoc().mappingsModified()) {
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
}
Expand All @@ -419,7 +419,7 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
op = index;
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY);
if (create.parsedDoc().mappingsModified()) {
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
}
Expand All @@ -443,7 +443,7 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
}

private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
deleteRequest.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
Expand Down Expand Up @@ -561,14 +561,12 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());

if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(indexRequest.version()).versionType(indexRequest.versionType())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(indexRequest.version()).versionType(indexRequest.versionType())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Create create = indexShard.prepareCreate(sourceToParse,
indexRequest.version(), indexRequest.versionType(),
Engine.Operation.Origin.REPLICA);
indexShard.create(create);
}
} catch (Throwable e) {
Expand All @@ -577,8 +575,7 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version())
.versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.REPLICA);
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);
} catch (Throwable e) {
// ignore, we are on backup
Expand Down
Expand Up @@ -185,9 +185,7 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, DeleteRequ
protected PrimaryResponse<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with teh version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
Expand All @@ -211,8 +209,7 @@ protected PrimaryResponse<DeleteResponse, DeleteRequest> shardOperationOnPrimary
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()).versionType(request.versionType())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);

indexShard.delete(delete);

Expand Down
Expand Up @@ -93,8 +93,7 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, ShardDelet
protected PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.PRIMARY);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the version to happen on the replicas
request.version(delete.version());
Expand All @@ -116,11 +115,10 @@ protected PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest> shardOperatio
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.REPLICA);

// IndexDeleteAction doesn't support version type at the moment. Hard coded for the INTERNAL version
delete.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery());
delete = new Engine.Delete(delete, VersionType.INTERNAL.versionTypeForReplicationAndRecovery());

assert delete.versionType().validateVersion(delete.version());

Expand Down
Expand Up @@ -119,8 +119,7 @@ protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>
indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler,
pageCacheRecycler, bigArrays));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types())
.origin(Engine.Operation.Origin.PRIMARY);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.PRIMARY, request.types());
SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of()));
indexShard.deleteByQuery(deleteByQuery);
} finally {
Expand All @@ -142,8 +141,7 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
cacheRecycler, pageCacheRecycler, bigArrays));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types())
.origin(Engine.Operation.Origin.REPLICA);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.REPLICA, request.types());
SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of()));
indexShard.deleteByQuery(deleteByQuery);
} finally {
Expand Down
Expand Up @@ -191,10 +191,7 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
boolean created;
Engine.IndexingOperation op;
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(request.version())
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
if (index.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
}
Expand All @@ -203,10 +200,8 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
op = index;
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(request.version())
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
if (create.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
}
Expand Down Expand Up @@ -240,14 +235,11 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(request.version()).versionType(request.versionType())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(request.version()).versionType(request.versionType())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.create(create);
}
if (request.refresh()) {
Expand Down