Skip to content

Commit

Permalink
Make Create/Update/Delete classes less mutable
Browse files Browse the repository at this point in the history
Today we use a builder pattern / setters to set relevant information
to Engine#Delete|Create|Index. Yet almost all the values are required
but they are not passed via ctor arguments but via an error prone builder
pattern. If we add a required argument we should see compile errors on that
level to make sure we don't miss any place to set them.

Prerequisite for elastic#5917
  • Loading branch information
s1monw committed Apr 25, 2014
1 parent d0f8742 commit 019fb4c
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 310 deletions.
Expand Up @@ -410,7 +410,7 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
Engine.IndexingOperation op;
try {
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY);
if (index.parsedDoc().mappingsModified()) {
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
}
Expand All @@ -419,7 +419,7 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
op = index;
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse).version(indexRequest.version()).versionType(indexRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
Engine.Create create = indexShard.prepareCreate(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.PRIMARY);
if (create.parsedDoc().mappingsModified()) {
mappingsToUpdate = Tuple.tuple(indexRequest.index(), indexRequest.type());
}
Expand All @@ -443,7 +443,7 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
}

private WriteResult shardDeleteOperation(DeleteRequest deleteRequest, IndexShard indexShard) {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version()).versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.PRIMARY);
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with the version so it will go to the replicas
deleteRequest.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
Expand Down Expand Up @@ -561,14 +561,12 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()).ttl(indexRequest.ttl());

if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(indexRequest.version()).versionType(indexRequest.versionType())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Index index = indexShard.prepareIndex(sourceToParse, indexRequest.version(), indexRequest.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(indexRequest.version()).versionType(indexRequest.versionType())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Create create = indexShard.prepareCreate(sourceToParse,
indexRequest.version(), indexRequest.versionType(),
Engine.Operation.Origin.REPLICA);
indexShard.create(create);
}
} catch (Throwable e) {
Expand All @@ -577,8 +575,7 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version())
.versionType(deleteRequest.versionType()).origin(Engine.Operation.Origin.REPLICA);
Engine.Delete delete = indexShard.prepareDelete(deleteRequest.type(), deleteRequest.id(), deleteRequest.version(), deleteRequest.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);
} catch (Throwable e) {
// ignore, we are on backup
Expand Down
Expand Up @@ -185,9 +185,7 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, DeleteRequ
protected PrimaryResponse<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the request with teh version so it will go to the replicas
request.versionType(delete.versionType().versionTypeForReplicationAndRecovery());
Expand All @@ -211,8 +209,7 @@ protected PrimaryResponse<DeleteResponse, DeleteRequest> shardOperationOnPrimary
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version()).versionType(request.versionType())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);

indexShard.delete(delete);

Expand Down
Expand Up @@ -93,8 +93,7 @@ protected ClusterBlockException checkRequestBlock(ClusterState state, ShardDelet
protected PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.PRIMARY);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
// update the version to happen on the replicas
request.version(delete.version());
Expand All @@ -116,11 +115,10 @@ protected PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest> shardOperatio
protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version(), VersionType.INTERNAL, Engine.Operation.Origin.REPLICA);

// IndexDeleteAction doesn't support version type at the moment. Hard coded for the INTERNAL version
delete.versionType(VersionType.INTERNAL.versionTypeForReplicationAndRecovery());
delete = new Engine.Delete(delete, VersionType.INTERNAL.versionTypeForReplicationAndRecovery());

assert delete.versionType().validateVersion(delete.version());

Expand Down
Expand Up @@ -119,8 +119,7 @@ protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>
indexShard.acquireSearcher("delete_by_query"), indexService, indexShard, scriptService, cacheRecycler,
pageCacheRecycler, bigArrays));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types())
.origin(Engine.Operation.Origin.PRIMARY);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.PRIMARY, request.types());
SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of()));
indexShard.deleteByQuery(deleteByQuery);
} finally {
Expand All @@ -142,8 +141,7 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
indexShard.acquireSearcher("delete_by_query", IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
cacheRecycler, pageCacheRecycler, bigArrays));
try {
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), request.types())
.origin(Engine.Operation.Origin.REPLICA);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.source(), request.filteringAliases(), Engine.Operation.Origin.REPLICA, request.types());
SearchContext.current().parsedQuery(new ParsedQuery(deleteByQuery.query(), ImmutableMap.<String, Filter>of()));
indexShard.deleteByQuery(deleteByQuery);
} finally {
Expand Down
Expand Up @@ -191,10 +191,7 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
boolean created;
Engine.IndexingOperation op;
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(request.version())
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
if (index.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
}
Expand All @@ -203,10 +200,8 @@ protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(C
op = index;
created = index.created();
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(request.version())
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.PRIMARY);
if (create.parsedDoc().mappingsModified()) {
mappingUpdatedAction.updateMappingOnMaster(request.index(), request.type(), indexMetaData.getUUID(), false);
}
Expand Down Expand Up @@ -240,14 +235,11 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, request.source()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse)
.version(request.version()).versionType(request.versionType())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Index index = indexShard.prepareIndex(sourceToParse, request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.index(index);
} else {
Engine.Create create = indexShard.prepareCreate(sourceToParse)
.version(request.version()).versionType(request.versionType())
.origin(Engine.Operation.Origin.REPLICA);
Engine.Create create = indexShard.prepareCreate(sourceToParse,
request.version(), request.versionType(), Engine.Operation.Origin.REPLICA);
indexShard.create(create);
}
if (request.refresh()) {
Expand Down

0 comments on commit 019fb4c

Please sign in to comment.