-
Notifications
You must be signed in to change notification settings - Fork 5
/
InfluxDbConnector.java
258 lines (233 loc) · 10 KB
/
InfluxDbConnector.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
/*
* © 2021. TU Dortmund University,
* Institute of Energy Systems, Energy Efficiency and Energy Economics,
* Research group Distribution grid planning and operation
*/
package edu.ie3.datamodel.io.connectors;
import java.util.*;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.influxdb.BatchOptions;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
/**
* Implements a DataConnector for InfluxDB. InfluxDB is a time series database and as such, it can
* only handle time based data. <br>
* Entities will be persisted as <i>measurement points</i>, which consist of a time, one or more
* tags, one or more values and a measurement name. In contrast to values, tags should only contain
* metadata. A measurement name is the equivalent of the name of a table in relational data models.
*/
public class InfluxDbConnector implements DataConnector {
/** Default scenario parameter that is used when no scenario is provided. */
public static final String NO_SCENARIO = "no_scenario";
/** Default scenario parameter for weather data. */
public static final String WEATHER_SCENARIO = "weather_scenario";
/**
 * Merges two sets of (fieldName to fieldValue) maps. The content of the second set is added to
 * the first set, which is then handed back.
 */
private static final BinaryOperator<Set<Map<String, String>>> mergeSets =
    (target, additional) -> {
      target.addAll(additional);
      return target;
    };
/** Name of the database the session of this connector is set to. */
private final String databaseName;
/** Name of the scenario this connector has been created with. */
private final String scenarioName;
/** InfluxDB session that is managed by this connector. */
private final InfluxDB session;
/**
 * Creates a connector that opens its own session to the given url, sets the log level of that
 * session and enables batch writing on it.
 *
 * @param url the connection url for the influxDB database
 * @param databaseName the name of the database the session should be set to
 * @param scenarioName the name of the simulation scenario which will be used in influxDB
 *     measurement names
 * @param createDb true if the connector should create the database if it doesn't exist yet, false
 *     otherwise
 * @param logLevel log level of the {@link org.influxdb.InfluxDB.LogLevel} logger
 * @param batchOptions write options to write batch operations
 */
public InfluxDbConnector(
    String url,
    String databaseName,
    String scenarioName,
    boolean createDb,
    InfluxDB.LogLevel logLevel,
    BatchOptions batchOptions) {
  // the session based constructor expects the scenario name BEFORE the database name
  this(InfluxDBFactory.connect(url), scenarioName, databaseName, createDb);
  session.setLogLevel(logLevel);
  session.enableBatch(batchOptions);
}
/**
 * Creates a connector around an already existing influxDb session. The session is set to the
 * given database and, if requested, the database is created.
 *
 * @param session the influxdb session that should be managed by this connector
 * @param scenarioName the name of the scenario
 * @param databaseName the name of the database the session should be set to
 * @param createDb true if the connector should create the database if it doesn't exist yet, false
 *     otherwise
 */
public InfluxDbConnector(
    InfluxDB session, String scenarioName, String databaseName, boolean createDb) {
  this.session = session;
  this.scenarioName = scenarioName;
  this.databaseName = databaseName;

  this.session.setDatabase(databaseName);
  if (createDb) {
    createDb(databaseName);
  }
}
/**
 * Creates a connector for the given url, databaseName and scenario name. The database is
 * created, logging is switched off ({@link org.influxdb.InfluxDB.LogLevel#NONE}) and the default
 * batch options are used.
 *
 * @param url the connection url for the influxDB database
 * @param databaseName the name of the database to which the connection should be established
 * @param scenarioName the name of the simulation scenario which will be used in influxDB
 *     measurement names
 */
public InfluxDbConnector(String url, String databaseName, String scenarioName) {
  this(
      url,
      databaseName,
      scenarioName,
      true, // createDb
      InfluxDB.LogLevel.NONE,
      BatchOptions.DEFAULTS);
}
/**
 * Creates a connector for the given url and databaseName that uses {@link #NO_SCENARIO} as
 * scenario parameter. All remaining settings are the defaults of the three-argument constructor.
 *
 * @param url the connection url for the influxDB database
 * @param databaseName the name of the database to which the connection should be established
 */
public InfluxDbConnector(String url, String databaseName) {
  this(url, databaseName, NO_SCENARIO);
}
/**
 * Collects the names of all tag keys and all field keys that are reported for the database of
 * this connector.
 *
 * @param entityClass class of the entity; it is not evaluated by this implementation
 * @param <C> type of the entity class
 * @return an option that always contains the (possibly empty) set of key names
 */
public <C> Optional<Set<String>> getSourceFields(Class<C> entityClass) {
  Query tagKeyQuery = new Query("SHOW TAG KEYS ON " + databaseName);
  Map<String, Set<Map<String, String>>> tagsByMeasurement =
      parseQueryResult(session.query(tagKeyQuery));

  Query fieldKeyQuery = new Query("SHOW FIELD KEYS ON " + databaseName);
  Map<String, Set<Map<String, String>>> fieldsByMeasurement =
      parseQueryResult(session.query(fieldKeyQuery));

  Set<String> keyNames = new HashSet<>();
  for (Set<Map<String, String>> tagRows : tagsByMeasurement.values()) {
    for (Map<String, String> tagRow : tagRows) {
      keyNames.add(tagRow.get("tagKey"));
    }
  }
  for (Set<Map<String, String>> fieldRows : fieldsByMeasurement.values()) {
    for (Map<String, String> fieldRow : fieldRows) {
      keyNames.add(fieldRow.get("fieldKey"));
    }
  }
  return Optional.of(keyNames);
}
/**
 * Sends a {@code CREATE DATABASE} query for the given database name via the session of this
 * connector.
 *
 * @param databaseName the name of the database that should be created
 * @return the result of the create database query
 */
public final QueryResult createDb(String databaseName) {
  Query createQuery = new Query("CREATE DATABASE " + databaseName, databaseName);
  return session.query(createQuery);
}
/**
 * Checks if the given connection parameters are valid, so that a connection can be established.
 * For this purpose the database is pinged and the version it reports is inspected.
 *
 * <p>The session is deliberately left open, so that the connector stays fully usable after the
 * check. Call {@link #shutdown()} to release the session once the connector is no longer needed.
 *
 * @return true, if the database returned the ping with a version other than "unknown"
 */
public Boolean isConnectionValid() {
  Pong response = session.ping();
  String version = response.getVersion();
  // a missing version is treated like an unknown one instead of raising a NullPointerException
  return version != null && !version.equalsIgnoreCase("unknown");
}
/**
 * Closes the session of this connector. This releases async writing resources and flushes the
 * batch if batch is enabled (blocking!).
 */
@Override
public void shutdown() {
  this.session.close();
}
/**
 * Provides access to the session that is managed by this connector.
 *
 * @return influx db session
 */
public InfluxDB getSession() {
  return this.session;
}
/**
 * Provides the scenario name this connector has been created with.
 *
 * @return name of the scenario
 */
public String getScenarioName() {
  return this.scenarioName;
}
/**
 * Parses the result of an influxQL query without restricting it to specific measurements, i.e.
 * all measurements (e.g. weather) are parsed.
 *
 * @param queryResult Result of an influxDB query
 * @return Map of (measurement name to Set of maps of (field name : field value)) for each result
 *     entity)
 */
public static Map<String, Set<Map<String, String>>> parseQueryResult(QueryResult queryResult) {
  // the empty array has to be passed explicitly: a call with only one argument would resolve to
  // this very method again
  final String[] noMeasurementFilter = new String[0];
  return parseQueryResult(queryResult, noMeasurementFilter);
}
/**
 * Parses the result of one or multiple influxQL queries for the given measurements (e.g.
 * weather). Every single result is parsed on its own; entities that belong to the same
 * measurement are collected in one common set. If no measurement names are given, all results
 * are parsed and returned.
 *
 * @param queryResult Result of an influxDB query
 * @param measurementNames Names of measurements that should be parsed. If none are given, all
 *     measurements will be parsed
 * @return Map of (measurement name to Set of maps of (field name : field value) for each result
 *     entity)
 */
public static Map<String, Set<Map<String, String>>> parseQueryResult(
    QueryResult queryResult, String... measurementNames) {
  HashMap<String, Set<Map<String, String>>> fieldsByMeasurement = new HashMap<>();
  for (QueryResult.Result singleResult : queryResult.getResults()) {
    Map<String, Set<Map<String, String>>> parsedResult =
        parseResult(singleResult, measurementNames);
    for (Map.Entry<String, Set<Map<String, String>>> parsedEntry : parsedResult.entrySet()) {
      // the first set seen for a measurement is stored as is, later ones are added to it
      Set<Map<String, String>> alreadyKnown =
          fieldsByMeasurement.putIfAbsent(parsedEntry.getKey(), parsedEntry.getValue());
      if (alreadyKnown != null) {
        alreadyKnown.addAll(parsedEntry.getValue());
      }
    }
  }
  return fieldsByMeasurement;
}
/**
 * Parses the result of one influxQL query for the given measurements (e.g. weather). If no
 * measurement names are given, all results are parsed and returned. A result that carries no
 * series at all (the series list may be {@code null}, e.g. if the query did not match any data)
 * leads to an empty map.
 *
 * @param result Specific result of an influxDB query
 * @param measurementNames Names of measurements that should be parsed. If none are given, all
 *     measurements will be parsed
 * @return Map of (measurement name to Set of maps of (field name : field value) for each result
 *     entity)
 */
public static Map<String, Set<Map<String, String>>> parseResult(
    QueryResult.Result result, String... measurementNames) {
  List<QueryResult.Series> seriesOfResult = result.getSeries();
  if (seriesOfResult == null) {
    // nothing to parse; hand back a mutable map like the collector below does
    return new HashMap<>();
  }

  Stream<QueryResult.Series> seriesStream = seriesOfResult.stream();
  if (measurementNames.length > 0) {
    // wrap the requested names once instead of once per inspected series
    List<String> requestedNames = Arrays.asList(measurementNames);
    seriesStream = seriesStream.filter(series -> requestedNames.contains(series.getName()));
  }
  return seriesStream.collect(
      Collectors.toMap(QueryResult.Series::getName, InfluxDbConnector::parseSeries, mergeSets));
}
/**
 * Parses the results for a single measurement series. A series without any values (the value
 * list may be {@code null}) leads to an empty set.
 *
 * <p>The returned set is always a mutable {@link HashSet}, because the sets of several series
 * are merged into each other later on.
 *
 * @param series A measurement series of an influxDB query result
 * @return Set of maps of (field name to field value) for each result entity
 */
public static Set<Map<String, String>> parseSeries(QueryResult.Series series) {
  List<List<Object>> rows = series.getValues();
  if (rows == null) {
    return new HashSet<>();
  }
  String[] columns = series.getColumns().toArray(new String[0]);
  return rows.stream()
      .map(row -> parseValueList(row, columns))
      .collect(Collectors.toCollection(HashSet::new));
}
/**
 * Maps the values of one result entity to their field names. The value at position i is
 * assigned to the column name at position i and is converted with {@link
 * String#valueOf(Object)}, which means a {@code null} value ends up as the text "null".
 *
 * @param valueList List of values, sorted by column in columns
 * @param columns Array of column names
 * @return Map of (field name to field value) for one result entity
 */
public static Map<String, String> parseValueList(List<?> valueList, String[] columns) {
  final Object[] values = valueList.toArray();
  final Map<String, String> fieldToValue = new HashMap<>();
  int position = 0;
  for (String column : columns) {
    fieldToValue.put(column, String.valueOf(values[position]));
    position++;
  }
  return fieldToValue;
}
}