-
Notifications
You must be signed in to change notification settings - Fork 5
/
CouchbaseWeatherSource.java
259 lines (245 loc) · 11 KB
/
CouchbaseWeatherSource.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
/*
* © 2021. TU Dortmund University,
* Institute of Energy Systems, Energy Efficiency and Energy Economics,
* Research group Distribution grid planning and operation
*/
package edu.ie3.datamodel.io.source.couchbase;
import com.couchbase.client.core.error.DecodingFailureException;
import com.couchbase.client.core.error.DocumentNotFoundException;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.GetResult;
import com.couchbase.client.java.query.QueryResult;
import edu.ie3.datamodel.exceptions.SourceException;
import edu.ie3.datamodel.io.connectors.CouchbaseConnector;
import edu.ie3.datamodel.io.factory.timeseries.TimeBasedWeatherValueData;
import edu.ie3.datamodel.io.factory.timeseries.TimeBasedWeatherValueFactory;
import edu.ie3.datamodel.io.source.IdCoordinateSource;
import edu.ie3.datamodel.io.source.WeatherSource;
import edu.ie3.datamodel.models.timeseries.individual.IndividualTimeSeries;
import edu.ie3.datamodel.models.timeseries.individual.TimeBasedValue;
import edu.ie3.datamodel.models.value.WeatherValue;
import edu.ie3.datamodel.utils.Try;
import edu.ie3.util.interval.ClosedInterval;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.locationtech.jts.geom.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Couchbase source for weather data.
 *
 * <p>Weather documents are addressed by keys of the form {@code
 * <keyPrefix>::<coordinateId>::<timestamp>} (see {@link #generateWeatherKey(ZonedDateTime,
 * Integer)}). Single values are fetched by key, time intervals are fetched with a query over a
 * range of such keys.
 */
public class CouchbaseWeatherSource extends WeatherSource {
  private static final Logger logger = LoggerFactory.getLogger(CouchbaseWeatherSource.class);

  /** Time stamp pattern that is used if no pattern is handed in */
  private static final String DEFAULT_TIMESTAMP_PATTERN = "yyyy-MM-dd'T'HH:mm:ssxxx";

  /** The start of the document key, comparable to a table name in relational databases */
  private static final String DEFAULT_KEY_PREFIX = "weather";

  private final String keyPrefix;
  private final CouchbaseConnector connector;
  private final String coordinateIdColumnName;

  /** Formatter for the time stamp part of document keys, built once from the given pattern */
  private final DateTimeFormatter timeStampFormatter;

  /**
   * Instantiate a weather source utilising a connection to a couchbase instance obtained via the
   * connector. This convenient constructor uses {@link CouchbaseWeatherSource#DEFAULT_KEY_PREFIX}
   * as key prefix.
   *
   * @param connector Connector, that establishes the connection to the couchbase instance
   * @param coordinateSource Source to obtain actual coordinates from
   * @param coordinateIdColumnName Name of the column containing the information about the
   *     coordinate identifier
   * @param weatherFactory Factory to transfer field to value mapping into actual java object
   *     instances
   * @param timeStampPattern Pattern of time stamps to parse; {@code null} selects {@link
   *     CouchbaseWeatherSource#DEFAULT_TIMESTAMP_PATTERN}
   * @throws SourceException If the validation of the available source fields fails
   */
  public CouchbaseWeatherSource(
      CouchbaseConnector connector,
      IdCoordinateSource coordinateSource,
      String coordinateIdColumnName,
      TimeBasedWeatherValueFactory weatherFactory,
      String timeStampPattern)
      throws SourceException {
    this(
        connector,
        coordinateSource,
        coordinateIdColumnName,
        DEFAULT_KEY_PREFIX,
        weatherFactory,
        timeStampPattern);
  }

  /**
   * Instantiate a weather source utilising a connection to a couchbase instance obtained via the
   * connector
   *
   * @param connector Connector, that establishes the connection to the couchbase instance
   * @param idCoordinateSource Source to obtain actual coordinates from
   * @param coordinateIdColumnName Name of the column containing the information about the
   *     coordinate identifier
   * @param keyPrefix Prefix of entries, that belong to weather; {@code null} selects {@link
   *     CouchbaseWeatherSource#DEFAULT_KEY_PREFIX}
   * @param weatherFactory Factory to transfer field to value mapping into actual java object
   *     instances
   * @param timeStampPattern Pattern of time stamps to parse; {@code null} selects {@link
   *     CouchbaseWeatherSource#DEFAULT_TIMESTAMP_PATTERN}
   * @throws SourceException If the validation of the available source fields fails
   * @throws IllegalArgumentException If the time stamp pattern is not a valid {@link
   *     DateTimeFormatter} pattern
   */
  public CouchbaseWeatherSource(
      CouchbaseConnector connector,
      IdCoordinateSource idCoordinateSource,
      String coordinateIdColumnName,
      String keyPrefix,
      TimeBasedWeatherValueFactory weatherFactory,
      String timeStampPattern)
      throws SourceException {
    super(idCoordinateSource, weatherFactory);
    this.connector = connector;
    this.coordinateIdColumnName = coordinateIdColumnName;
    this.keyPrefix = Objects.requireNonNullElse(keyPrefix, DEFAULT_KEY_PREFIX);
    // The formatter is immutable, hence it is built once here instead of on every key generation.
    // As a consequence, an invalid pattern is reported at construction time.
    this.timeStampFormatter =
        DateTimeFormatter.ofPattern(
            Objects.requireNonNullElse(timeStampPattern, DEFAULT_TIMESTAMP_PATTERN));

    // validating: only if the connector reports source fields, they are checked by the factory
    Try.of(() -> getSourceFields(WeatherValue.class), SourceException.class)
        .flatMap(
            fieldsOpt ->
                fieldsOpt
                    .map(
                        fields ->
                            weatherFactory
                                .validate(fields, WeatherValue.class)
                                .transformF(SourceException::new))
                    .orElse(Try.Success.empty()))
        .getOrThrow();
  }

  /**
   * Delivers the source fields as reported by the connector. The entity class is not evaluated.
   *
   * @param entityClass Class of the entity to get the fields for
   * @return An option onto the set of available field names
   */
  @Override
  public <C extends WeatherValue> Optional<Set<String>> getSourceFields(Class<C> entityClass) {
    return connector.getSourceFields();
  }

  /**
   * Get the weather for all coordinates known to the coordinate source within the given time
   * interval. As every coordinate is queried on its own, a warning is logged.
   *
   * @param timeInterval Interval to cover
   * @return Mapping from coordinate to the time series found for it
   */
  @Override
  public Map<Point, IndividualTimeSeries<WeatherValue>> getWeather(
      ClosedInterval<ZonedDateTime> timeInterval) {
    logger.warn(
        "By not providing coordinates you are forcing couchbase to check all possible coordinates one by one."
            + " This is not very performant. Please consider providing specific coordinates instead.");
    return getWeather(timeInterval, idCoordinateSource.getAllCoordinates());
  }

  /**
   * Get the weather for the given coordinates within the given time interval. Coordinates, that
   * cannot be matched to a coordinate id, are skipped with a warning. Coordinates, for which the
   * query delivers no rows, do not appear in the result.
   *
   * @param timeInterval Interval to cover
   * @param coordinates Coordinates to get the weather for
   * @return Mapping from coordinate to the time series found for it
   */
  @Override
  public Map<Point, IndividualTimeSeries<WeatherValue>> getWeather(
      ClosedInterval<ZonedDateTime> timeInterval, Collection<Point> coordinates) {
    Map<Point, IndividualTimeSeries<WeatherValue>> coordinateToTimeSeries = new HashMap<>();
    for (Point coordinate : coordinates) {
      Optional<Integer> coordinateId = idCoordinateSource.getId(coordinate);
      if (coordinateId.isEmpty()) {
        logger.warn("Unable to match coordinate {} to a coordinate ID", coordinate);
        continue;
      }
      queryTimeSeries(timeInterval, coordinateId.get())
          .ifPresent(timeSeries -> coordinateToTimeSeries.put(coordinate, timeSeries));
    }
    return coordinateToTimeSeries;
  }

  /**
   * Queries all weather documents of one coordinate within the given interval and converts them
   * into a time series.
   *
   * @param timeInterval Interval to cover
   * @param coordinateId Identifier of the coordinate to query
   * @return The time series, or an empty option, if decoding the rows failed or no rows were found
   */
  private Optional<IndividualTimeSeries<WeatherValue>> queryTimeSeries(
      ClosedInterval<ZonedDateTime> timeInterval, int coordinateId) {
    String query = createQueryStringForIntervalAndCoordinate(timeInterval, coordinateId);
    CompletableFuture<QueryResult> futureResult = connector.query(query);
    // Blocks until the query has finished. Failures of the query itself are not handled here.
    QueryResult queryResult = futureResult.join();

    List<JsonObject> jsonWeatherInputs;
    try {
      jsonWeatherInputs = queryResult.rowsAsObject();
    } catch (DecodingFailureException ex) {
      logger.error("Querying weather inputs failed!", ex);
      return Optional.empty();
    }
    if (jsonWeatherInputs == null || jsonWeatherInputs.isEmpty()) {
      return Optional.empty();
    }

    // Rows, that cannot be converted, are dropped by toTimeBasedWeatherValue
    Set<TimeBasedValue<WeatherValue>> weatherInputs =
        jsonWeatherInputs.stream()
            .map(this::toTimeBasedWeatherValue)
            .flatMap(Optional::stream)
            .collect(Collectors.toSet());
    IndividualTimeSeries<WeatherValue> weatherTimeSeries =
        new IndividualTimeSeries<>(weatherInputs);
    return Optional.of(weatherTimeSeries);
  }

  /**
   * Get the weather value for one coordinate at one point in time by fetching the document with the
   * matching key.
   *
   * @param date Time stamp of the questioned weather value
   * @param coordinate Coordinate of the questioned weather value
   * @return The weather value, or an empty option, if the coordinate is unknown, the document does
   *     not exist or it cannot be decoded
   * @throws CompletionException If fetching the document fails for another reason than a missing
   *     document
   */
  @Override
  public Optional<TimeBasedValue<WeatherValue>> getWeather(ZonedDateTime date, Point coordinate) {
    Optional<Integer> coordinateId = idCoordinateSource.getId(coordinate);
    if (coordinateId.isEmpty()) {
      logger.warn("Unable to match coordinate {} to a coordinate ID", coordinate);
      return Optional.empty();
    }
    try {
      CompletableFuture<GetResult> futureResult =
          connector.get(generateWeatherKey(date, coordinateId.get()));
      GetResult getResult = futureResult.join();
      JsonObject jsonWeatherInput = getResult.contentAsObject();
      return toTimeBasedWeatherValue(jsonWeatherInput);
    } catch (DecodingFailureException ex) {
      logger.error("Decoding to TimeBasedWeatherValue failed!", ex);
      return Optional.empty();
    } catch (DocumentNotFoundException ex) {
      // A missing document is not an error, there just is no value for this key
      return Optional.empty();
    } catch (CompletionException ex) {
      // join() wraps failures of the future, therefore the missing document may arrive as a cause
      if (ex.getCause() instanceof DocumentNotFoundException) {
        return Optional.empty();
      }
      throw ex;
    }
  }

  /**
   * Generates a key for weather documents with the pattern: {@code
   * <keyPrefix>::<coordinate_id>::<time>}, where the time is formatted with the time stamp pattern
   * of this source
   *
   * @param time the timestamp for the weather data
   * @param coordinateId the coordinate Id of the weather data
   * @return a weather document key
   */
  public String generateWeatherKey(ZonedDateTime time, Integer coordinateId) {
    return keyPrefix + "::" + coordinateId + "::" + time.format(timeStampFormatter);
  }

  /**
   * Create a query string to search for documents for the given coordinate in the given time
   * interval by querying a range of document keys. Both interval bounds are included.
   *
   * @param timeInterval the time interval for which the documents are queried
   * @param coordinateId the coordinate ID for which the documents are queried
   * @return the query string
   */
  public String createQueryStringForIntervalAndCoordinate(
      ClosedInterval<ZonedDateTime> timeInterval, int coordinateId) {
    String bucketName = connector.getBucketName();
    String lowerKey = generateWeatherKey(timeInterval.getLower(), coordinateId);
    String upperKey = generateWeatherKey(timeInterval.getUpper(), coordinateId);
    return "SELECT "
        + bucketName
        + ".* FROM "
        + bucketName
        + " WHERE META().id >= '"
        + lowerKey
        + "' AND META().id <= '"
        + upperKey
        + "'";
  }

  /**
   * Converts a JsonObject into TimeBasedWeatherValueData by extracting all fields except the
   * coordinate id into a field to value map and supplying the coordinate, that belongs to the id,
   * as a parameter. The given JsonObject is left untouched. If the document carries no "uuid"
   * field, a random one is added.
   *
   * @param jsonObj the JsonObject to convert
   * @return the Data object, or an empty option, if the coordinate id is missing or unknown
   */
  private Optional<TimeBasedWeatherValueData> toTimeBasedWeatherValueData(JsonObject jsonObj) {
    Integer coordinateId = jsonObj.getInt(coordinateIdColumnName);
    if (coordinateId == null) {
      // Without this guard the missing id would be handed on to the coordinate lookup
      logger.warn("Unable to find a coordinate ID in field '{}'", coordinateIdColumnName);
      return Optional.empty();
    }

    Optional<Point> coordinate = idCoordinateSource.getCoordinate(coordinateId);
    if (coordinate.isEmpty()) {
      logger.warn("Unable to match coordinate ID {} to a coordinate", coordinateId);
      return Optional.empty();
    }

    // Leave the coordinate id out of the copy instead of removing it from the caller's object
    Map<String, String> fieldToValueMap =
        jsonObj.toMap().entrySet().stream()
            .filter(entry -> !entry.getKey().equals(coordinateIdColumnName))
            .collect(
                Collectors.toMap(Map.Entry::getKey, entry -> String.valueOf(entry.getValue())));
    fieldToValueMap.putIfAbsent("uuid", UUID.randomUUID().toString());
    return Optional.of(new TimeBasedWeatherValueData(fieldToValueMap, coordinate.get()));
  }

  /**
   * Converts a JsonObject into a time based weather value by converting it to a
   * TimeBasedWeatherValueData first, then using the TimeBasedWeatherValueFactory to create an
   * entity
   *
   * @param jsonObj the JsonObject to convert
   * @return an optional weather value
   */
  public Optional<TimeBasedValue<WeatherValue>> toTimeBasedWeatherValue(JsonObject jsonObj) {
    Optional<TimeBasedWeatherValueData> data = toTimeBasedWeatherValueData(jsonObj);
    if (data.isEmpty()) {
      logger.warn("Unable to parse json object");
      logger.debug("The following json could not be parsed:\n{}", jsonObj);
      return Optional.empty();
    }
    return weatherFactory.get(data.get()).getData();
  }
}