-
Notifications
You must be signed in to change notification settings - Fork 5
/
SqlConnector.java
199 lines (183 loc) · 6.8 KB
/
SqlConnector.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
/*
* © 2021. TU Dortmund University,
* Institute of Energy Systems, Energy Efficiency and Energy Economics,
* Research group Distribution grid planning and operation
*/
package edu.ie3.datamodel.io.connectors;
import edu.ie3.datamodel.exceptions.InvalidColumnNameException;
import edu.ie3.util.StringUtils;
import edu.ie3.util.TimeUtil;
import java.sql.*;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements a DataConnector for a native SQL connection to a relational database. It was
* implemented with a PostgreSQL database in mind, so there might be dialect issues if used with
* other databases.
*/
public class SqlConnector implements DataConnector {
public static final Logger log = LoggerFactory.getLogger(SqlConnector.class);
private final String jdbcUrl;
private final Properties connectionProps;
private Connection connection;
/**
* Initializes a SqlConnector with the given JDBC url, username, password and time util
*
* @param jdbcUrl the JDBC url, should start with "jdbc:postgresql://" and contain the database
* name
* @param userName Name of the role used for authentication
* @param password Password for the role
*/
public SqlConnector(String jdbcUrl, String userName, String password) {
this.jdbcUrl = jdbcUrl;
// setup properties
this.connectionProps = new Properties();
connectionProps.put("user", userName);
connectionProps.put("password", password);
}
/**
* Executes the given query. For update queries please use {@link
* SqlConnector#executeUpdate(String)}.
*
* @param stmt the created statement
* @param query the query to execute
* @return the query result
* @throws SQLException if the execution fails
*/
public ResultSet executeQuery(Statement stmt, String query) throws SQLException {
try {
return stmt.executeQuery(query);
} catch (SQLException e) {
throw new SQLException(String.format("Error at execution of query \"%1.127s\": ", query), e);
}
}
/**
* Executes an update query
*
* @param updateQuery the query to execute
* @return The number of updates or a negative number if the execution failed
*/
public int executeUpdate(String updateQuery) {
try (Statement stmt = getConnection().createStatement()) {
return stmt.executeUpdate(updateQuery);
} catch (SQLException e) {
log.error(String.format("Error at execution of query \"%1.127s\": ", updateQuery), e);
return -1;
}
}
/**
* Establishes and returns a database connection. If a connection has not been established yet, a
* new one is created.
*
* @return the connection object
* @throws SQLException if the connection could not be established
*/
public Connection getConnection() throws SQLException {
return getConnection(true);
}
/**
* Establishes and returns a database connection
*
* @param reuseConnection should the connection be used again, if it is still valid? If not, a new
* connection will be established
* @return the connection object
* @throws SQLException if the connection could not be established
*/
public Connection getConnection(boolean reuseConnection) throws SQLException {
if (!reuseConnection || connection == null || connection.isClosed()) {
try {
if (connection != null) connection.close();
connection = DriverManager.getConnection(jdbcUrl, connectionProps);
} catch (SQLException e) {
throw new SQLException("Could not establish connection: ", e);
}
}
return connection;
}
@Override
public void shutdown() {
try {
if (Objects.nonNull(connection)) connection.close();
} catch (SQLException throwables) {
log.error("Unable to close connection '{}' during shutdown.", connection, throwables);
}
}
/**
* Extracts all field to value maps from the ResultSet, one for each row
*
* @param rs the ResultSet to use
* @return a list of field maps
*/
public List<Map<String, String>> extractFieldMaps(ResultSet rs) {
List<Map<String, String>> fieldMaps = new ArrayList<>();
try {
while (rs.next()) {
fieldMaps.add(extractFieldMap(rs));
}
} catch (SQLException e) {
log.error("Exception at extracting ResultSet: ", e);
}
return fieldMaps;
}
/**
* Extracts only the current row of the ResultSet into a field to value map
*
* @param rs the ResultSet to use
* @return the field map for the current row
*/
public Map<String, String> extractFieldMap(ResultSet rs) {
TreeMap<String, String> insensitiveFieldsToAttributes =
new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
try {
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
String columnName = StringUtils.snakeCaseToCamelCase(metaData.getColumnName(i));
String value;
Object result = rs.getObject(i);
if (result instanceof Timestamp) {
value = TimeUtil.withDefaults.toString(rs.getTimestamp(i).toInstant());
} else {
value = String.valueOf(rs.getObject(i));
}
insensitiveFieldsToAttributes.put(columnName, value);
}
} catch (SQLException e) {
log.error("Exception at extracting ResultSet: ", e);
}
return insensitiveFieldsToAttributes;
}
/**
* Determine the corresponding database column name based on the provided factory field parameter
* name. Needed to support camel as well as snake case database column names.
*
* @param factoryColumnName the name of the field parameter set in the entity factory
* @param tableName the table name where the data is stored
* @return the column name that corresponds to the provided field parameter or an empty optional
* if no matching column can be found
*/
public String getDbColumnName(String factoryColumnName, String tableName) {
try {
ResultSet rs = getConnection().getMetaData().getColumns(null, null, tableName, null);
while (rs.next()) {
String databaseColumnName = rs.getString("COLUMN_NAME");
if (StringUtils.snakeCaseToCamelCase(databaseColumnName)
.equalsIgnoreCase(factoryColumnName)) {
return databaseColumnName;
}
}
} catch (SQLException ex) {
log.error(
"Cannot connect to database to retrieve db column name for factory column name '{}' in table '{}'",
factoryColumnName,
tableName,
ex);
}
throw new InvalidColumnNameException(
"Cannot find column for '"
+ factoryColumnName
+ "' in provided times series data configuration."
+ "Please ensure that the database connection is working and the column names are correct!");
}
}