-
Notifications
You must be signed in to change notification settings - Fork 215
/
AbstractWriteModel.java
135 lines (117 loc) · 3.98 KB
/
AbstractWriteModel.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/*
* Copyright (c) 2019 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.persistence.write.model;
import java.time.Duration;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.conversions.Bson;
import org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants;
import org.eclipse.ditto.thingsearch.service.updater.actors.MongoWriteModel;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.WriteModel;
import akka.pattern.Patterns;
/**
* Interface for write models of Thing changes for MongoDB.
*/
public abstract class AbstractWriteModel {
/**
* MongoDB operator for setting a field.
*/
public static final String SET = "$set";
private final Metadata metadata;
/**
* Initialize metadata of the write model.
*
* @param metadata the metadata.
*/
protected AbstractWriteModel(final Metadata metadata) {
this.metadata = metadata;
}
/**
* Convert this description of a Thing search index change into a MongoDB write model.
*
* @return MongoDB write model.
*/
public abstract WriteModel<BsonDocument> toMongo();
/**
* Convert this into a MongoDB write model taking previous updates cached at the origin into consideration.
*
* @return either the MongoDB write model of this object or an incremental update converting
*/
public final CompletionStage<Optional<MongoWriteModel>> toIncrementalMongo() {
final var origin = metadata.getOrigin();
if (origin.isPresent()) {
return Patterns.ask(origin.orElseThrow(), this, Duration.ofSeconds(10L))
.thenApply(answer -> {
if (answer instanceof MongoWriteModel mongoWriteModel) {
return Optional.of(mongoWriteModel);
} else {
return Optional.empty();
}
});
} else {
return CompletableFuture.completedStage(Optional.of(MongoWriteModel.of(this, toMongo(), false)));
}
}
// TODO
public Optional<MongoWriteModel> toIncrementalMongo(@Nullable final AbstractWriteModel previousWriteModel) {
return Optional.of(MongoWriteModel.of(this, toMongo(), false));
}
/**
* @return Metadata of this write model.
*/
public Metadata getMetadata() {
return metadata;
}
/**
* Get the filter of this write model.
*
* @return filter on search index documents.
*/
public Bson getFilter() {
return Filters.eq(PersistenceConstants.FIELD_ID, new BsonString(metadata.getThingId().toString()));
}
/**
* Check whether this update is a patch update based on a specific sequence number.
*
* @return Whether this is a patch update.
*/
public boolean isPatchUpdate() {
return false;
}
@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final AbstractWriteModel that = (AbstractWriteModel) o;
return Objects.equals(metadata, that.metadata);
}
@Override
public int hashCode() {
return Objects.hash(metadata);
}
@Override
public String toString() {
return getClass().getSimpleName() + "[" +
"metadata=" + metadata;
}
}