PHOENIX-7316 Need to close more Statements
chaijunjie0101 committed May 19, 2024
1 parent 0abdcdb commit 1c258d1
Showing 8 changed files with 354 additions and 323 deletions.
@@ -1342,9 +1342,10 @@ public ExecutableShowTablesStatement(String schema, String pattern) {
        @Override
        public QueryPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction)
                throws SQLException {
-            PreparedStatement delegateStmt = QueryUtil.getTablesStmt(stmt.getConnection(), null,
-                getTargetSchema(), getDbPattern(), null);
-            return ((PhoenixPreparedStatement) delegateStmt).compileQuery();
+            try (PreparedStatement delegateStmt = QueryUtil.getTablesStmt(stmt.getConnection(), null,
+                getTargetSchema(), getDbPattern(), null)) {
+                return ((PhoenixPreparedStatement) delegateStmt).compileQuery();
+            }
        }
    }

@@ -1355,9 +1356,10 @@ private static class ExecutableShowSchemasStatement extends ShowSchemasStatement

        @Override
        public QueryPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
-            PreparedStatement delegateStmt =
-                QueryUtil.getSchemasStmt(stmt.getConnection(), null, getSchemaPattern());
-            return ((PhoenixPreparedStatement) delegateStmt).compileQuery();
+            try (PreparedStatement delegateStmt =
+                    QueryUtil.getSchemasStmt(stmt.getConnection(), null, getSchemaPattern())) {
+                return ((PhoenixPreparedStatement) delegateStmt).compileQuery();
+            }
        }
    }

@@ -1371,9 +1373,10 @@ public ExecutableShowCreateTable(TableName tableName) {
        @Override
        public QueryPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction)
                throws SQLException {
-            PreparedStatement delegateStmt = QueryUtil.getShowCreateTableStmt(stmt.getConnection(), null,
-                getTableName());
-            return ((PhoenixPreparedStatement) delegateStmt).compileQuery();
+            try (PreparedStatement delegateStmt = QueryUtil.getShowCreateTableStmt(stmt.getConnection(), null,
+                getTableName())) {
+                return ((PhoenixPreparedStatement) delegateStmt).compileQuery();
+            }
        }
    }
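
All three SHOW TABLES / SHOW SCHEMAS / SHOW CREATE TABLE paths above apply the same fix: the short-lived delegate PreparedStatement becomes a try-with-resources resource, so it is closed even when compileQuery() throws, instead of leaking until garbage collection. A minimal sketch of the pattern, using hypothetical stand-ins (QueryPlanLike, compile) for the Phoenix types; it assumes, as the commit does, that the compiled plan does not need the delegate statement to stay open:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class CloseDelegateExample {
    interface QueryPlanLike { }

    static QueryPlanLike compilePlan(Connection conn, String sql) throws SQLException {
        try (PreparedStatement delegate = conn.prepareStatement(sql)) {
            // the result is built from the statement but does not retain it
            return compile(delegate);
        } // delegate.close() runs here, on both the return and the exception path
    }

    private static QueryPlanLike compile(PreparedStatement stmt) {
        return new QueryPlanLike() { };
    }
}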


Large diffs are not rendered by default.

@@ -161,10 +161,11 @@ public static List<Mutation> getMutationsForAddTask(
        try (PhoenixConnection newConnection =
                QueryUtil.getConnectionOnServer(curConn.getClientInfo(), conf)
                        .unwrap(PhoenixConnection.class)) {
-            PreparedStatement statement = addTaskAndGetStatement(
-                systemTaskParams, newConnection);
-            return executeStatementAndGetTaskMutations(newConnection,
-                statement);
+            try (PreparedStatement statement = addTaskAndGetStatement(
+                    systemTaskParams, newConnection)) {
+                return executeStatementAndGetTaskMutations(newConnection,
+                    statement);
+            }
        }
    }
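
Here the new statement try block is nested inside the existing connection try block. A single try can also declare both resources, since a later resource may refer to an earlier one; either form closes them in reverse order. A sketch of the merged variant, with a hypothetical helper; this is not the shape the commit chose:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.sql.DataSource;

final class MergedResourcesExample {
    static void run(DataSource ds, String sql) throws SQLException {
        // stmt may use conn because resources are initialized top to bottom;
        // on exit they are closed bottom to top: stmt first, then conn
        try (Connection conn = ds.getConnection();
                PreparedStatement stmt = conn.prepareStatement(sql)) {
            stmt.execute();
        }
    }
}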

@@ -202,14 +203,14 @@ public static MetaDataMutationResult taskMetaDataCoprocessorExec(

    private static List<TaskRecord> populateTasks(Connection connection, String taskQuery)
            throws SQLException {
-        PreparedStatement taskStatement = connection.prepareStatement(taskQuery);
-        ResultSet rs = taskStatement.executeQuery();
-
        List<TaskRecord> result = new ArrayList<>();
-        while (rs.next()) {
-            // delete child views only if the parent table is deleted from the system catalog
-            TaskRecord taskRecord = parseResult(rs);
-            result.add(taskRecord);
+        try (PreparedStatement taskStatement = connection.prepareStatement(taskQuery);
+                ResultSet rs = taskStatement.executeQuery()) {
+            while (rs.next()) {
+                // delete child views only if the parent table is deleted from the system catalog
+                TaskRecord taskRecord = parseResult(rs);
+                result.add(taskRecord);
+            }
        }
        return result;
    }
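
populateTasks() now declares the statement and its ResultSet in one try-with-resources header. Resources are closed in reverse declaration order, so the ResultSet is closed before the PreparedStatement that produced it. A minimal sketch of the same two-resource shape, with a made-up query and table name:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

final class TwoResourceExample {
    static List<String> readNames(Connection connection) throws SQLException {
        List<String> result = new ArrayList<>();
        try (PreparedStatement stmt =
                connection.prepareStatement("SELECT NAME FROM EXAMPLE_TABLE"); // hypothetical
                ResultSet rs = stmt.executeQuery()) {
            while (rs.next()) {
                result.add(rs.getString(1));
            }
        } // rs closes first, then stmt, even if getString() throws mid-loop
        return result;
    }
}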
Expand Up @@ -199,8 +199,9 @@ private void createTable(Connection conn, String table) throws SQLException {
// tables created as transactional tables, make these table non
// transactional
PhoenixDatabaseMetaData.TRANSACTIONAL + "=" + Boolean.FALSE;
PreparedStatement stmt = conn.prepareStatement(ddl);
stmt.execute();
try (PreparedStatement stmt = conn.prepareStatement(ddl)) {
stmt.execute();
}
}

@Override
@@ -288,8 +289,9 @@ public void putMetrics(MetricsRecord record) {
            LOGGER.trace("Logging metrics to phoenix table via: " + stmt);
            LOGGER.trace("With tags: " + variableValues);
        }
+        PreparedStatement ps = null;
        try {
-            PreparedStatement ps = conn.prepareStatement(stmt);
+            ps = conn.prepareStatement(stmt);
            // add everything that wouldn't/may not parse
            int index = 1;
            for (String tag : variableValues) {
@@ -304,6 +306,14 @@
        } catch (SQLException e) {
            LOGGER.error("Could not write metric: \n" + record + " to prepared statement:\n" + stmt,
                e);
+        } finally {
+            if (ps != null) {
+                try {
+                    ps.close();
+                } catch (SQLException e) {
+                    LOGGER.error("Close statement failed.", e);
+                }
+            }
        }
    }
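
This fix uses an explicit try/finally rather than try-with-resources: ps is declared before the try so the null check in finally can tell whether prepareStatement() itself failed, and the existing catch clause that logs write failures stays untouched. A sketch of the same idiom with hypothetical names (writeRow, sql):

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class FinallyCloseExample {
    static void writeRow(Connection conn, String sql, String value) {
        PreparedStatement ps = null;
        try {
            ps = conn.prepareStatement(sql);
            ps.setString(1, value);
            ps.execute();
        } catch (SQLException e) {
            System.err.println("Could not write row: " + e);
        } finally {
            if (ps != null) { // null means prepareStatement() itself threw
                try {
                    ps.close();
                } catch (SQLException e) {
                    // a failure to close should not mask the original error
                    System.err.println("Close statement failed: " + e);
                }
            }
        }
    }
}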

@@ -18,6 +18,7 @@
package org.apache.phoenix.trace;

import java.sql.Connection;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -88,90 +89,89 @@ public Collection<TraceHolder> readAll(int limit) throws SQLException {
                + " ORDER BY " + MetricInfo.TRACE.columnName + " DESC, "
                + MetricInfo.START.columnName + " ASC" + " LIMIT " + pageSize;
        int resultCount = 0;
-        ResultSet results = conn.prepareStatement(query).executeQuery();
-        TraceHolder trace = null;
-        // the spans that are not the root span, but haven't seen their parent yet
-        List<SpanInfo> orphans = null;
-        while (results.next()) {
-            int index = 1;
-            long traceid = results.getLong(index++);
-            long parent = results.getLong(index++);
-            long span = results.getLong(index++);
-            String desc = results.getString(index++);
-            long start = results.getLong(index++);
-            long end = results.getLong(index++);
-            String host = results.getString(index++);
-            int tagCount = results.getInt(index++);
-            int annotationCount = results.getInt(index++);
-            // we have a new trace
-            if (trace == null || traceid != trace.traceid) {
-                // only increment if we are on a new trace, to ensure we get at least one
-                if (trace != null) {
-                    resultCount++;
-                }
-                // we beyond the limit, so we stop
-                if (resultCount >= limit) {
-                    break;
-                }
-                trace = new TraceHolder();
-                // add the orphans, so we can track them later
-                orphans = new ArrayList<SpanInfo>();
-                trace.orphans = orphans;
-                trace.traceid = traceid;
-                traces.add(trace);
-            }
-
-            // search the spans to determine the if we have a known parent
-            SpanInfo parentSpan = null;
-            if (parent != Span.ROOT_SPAN_ID) {
-                // find the parent
-                for (SpanInfo p : trace.spans) {
-                    if (p.id == parent) {
-                        parentSpan = p;
-                        break;
-                    }
-                }
-            }
-            SpanInfo spanInfo =
-                    new SpanInfo(parentSpan, parent, span, desc, start, end, host, tagCount,
-                        annotationCount);
-            // search the orphans to see if this is the parent id
-
-            for (int i = 0; i < orphans.size(); i++) {
-                SpanInfo orphan = orphans.get(i);
-                // we found the parent for the orphan
-                if (orphan.parentId == span) {
-                    // update the bi-directional relationship
-                    orphan.parent = spanInfo;
-                    spanInfo.children.add(orphan);
-                    // / its no longer an orphan
-                    LOGGER.trace(addCustomAnnotations("Found parent for span: " + span));
-                    orphans.remove(i--);
-                }
-            }
-
-            if (parentSpan != null) {
-                // add this as a child to the parent span
-                parentSpan.children.add(spanInfo);
-            } else if (parent != Span.ROOT_SPAN_ID) {
-                // add the span to the orphan pile to check for the remaining spans we see
-                LOGGER.info(addCustomAnnotations("No parent span found for span: " + span + " (root span id: "
-                    + Span.ROOT_SPAN_ID + ")"));
-                orphans.add(spanInfo);
-            }
-
-            // add the span to the full known list
-            trace.spans.add(spanInfo);
-
-            // go back and find the tags for the row
-            spanInfo.tags.addAll(getTags(traceid, parent, span, tagCount));
-
-            spanInfo.annotations.addAll(getAnnotations(traceid, parent, span, annotationCount));
-        }
-
-        // make sure we clean up after ourselves
-        results.close();
+        try (PreparedStatement stmt = conn.prepareStatement(query);
+                ResultSet results = stmt.executeQuery()) {
+            TraceHolder trace = null;
+            // the spans that are not the root span, but haven't seen their parent yet
+            List<SpanInfo> orphans = null;
+            while (results.next()) {
+                int index = 1;
+                long traceid = results.getLong(index++);
+                long parent = results.getLong(index++);
+                long span = results.getLong(index++);
+                String desc = results.getString(index++);
+                long start = results.getLong(index++);
+                long end = results.getLong(index++);
+                String host = results.getString(index++);
+                int tagCount = results.getInt(index++);
+                int annotationCount = results.getInt(index++);
+                // we have a new trace
+                if (trace == null || traceid != trace.traceid) {
+                    // only increment if we are on a new trace, to ensure we get at least one
+                    if (trace != null) {
+                        resultCount++;
+                    }
+                    // we beyond the limit, so we stop
+                    if (resultCount >= limit) {
+                        break;
+                    }
+                    trace = new TraceHolder();
+                    // add the orphans, so we can track them later
+                    orphans = new ArrayList<SpanInfo>();
+                    trace.orphans = orphans;
+                    trace.traceid = traceid;
+                    traces.add(trace);
+                }
+
+                // search the spans to determine the if we have a known parent
+                SpanInfo parentSpan = null;
+                if (parent != Span.ROOT_SPAN_ID) {
+                    // find the parent
+                    for (SpanInfo p : trace.spans) {
+                        if (p.id == parent) {
+                            parentSpan = p;
+                            break;
+                        }
+                    }
+                }
+                SpanInfo spanInfo =
+                        new SpanInfo(parentSpan, parent, span, desc, start, end, host, tagCount,
+                            annotationCount);
+                // search the orphans to see if this is the parent id
+
+                for (int i = 0; i < orphans.size(); i++) {
+                    SpanInfo orphan = orphans.get(i);
+                    // we found the parent for the orphan
+                    if (orphan.parentId == span) {
+                        // update the bi-directional relationship
+                        orphan.parent = spanInfo;
+                        spanInfo.children.add(orphan);
+                        // / its no longer an orphan
+                        LOGGER.trace(addCustomAnnotations("Found parent for span: " + span));
+                        orphans.remove(i--);
+                    }
+                }
+
+                if (parentSpan != null) {
+                    // add this as a child to the parent span
+                    parentSpan.children.add(spanInfo);
+                } else if (parent != Span.ROOT_SPAN_ID) {
+                    // add the span to the orphan pile to check for the remaining spans we see
+                    LOGGER.info(addCustomAnnotations("No parent span found for span: " + span + " (root span id: "
+                        + Span.ROOT_SPAN_ID + ")"));
+                    orphans.add(spanInfo);
+                }
+
+                // add the span to the full known list
+                trace.spans.add(spanInfo);
+
+                // go back and find the tags for the row
+                spanInfo.tags.addAll(getTags(traceid, parent, span, tagCount));
+
+                spanInfo.annotations.addAll(getAnnotations(traceid, parent, span, annotationCount));
+            }
+        }

        return traces;
    }
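
The core leak in this file was the chained call on the old first line: conn.prepareStatement(query).executeQuery() leaves the PreparedStatement anonymous, so the results.close() at the end of the method (reached only on the happy path) could never close it. Per JDBC, closing a Statement closes its ResultSet, but closing a ResultSet does not close the Statement. A sketch of the anti-pattern next to the fix:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

final class ChainedLeakExample {
    static void leaky(Connection conn, String query) throws SQLException {
        // anti-pattern: the statement created by the chained call has no
        // name, so there is nothing left to call close() on
        ResultSet results = conn.prepareStatement(query).executeQuery();
        while (results.next()) { /* ... */ }
        results.close(); // closes the ResultSet only, and only if no exception
    }

    static void fixed(Connection conn, String query) throws SQLException {
        // binding the statement to a resource variable lets try-with-resources
        // close both, even when an exception escapes the loop
        try (PreparedStatement stmt = conn.prepareStatement(query);
                ResultSet results = stmt.executeQuery()) {
            while (results.next()) { /* ... */ }
        }
    }
}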

@@ -220,8 +220,9 @@ private void addToBatch(Span span) {
            LOGGER.trace("Logging metrics to phoenix table via: " + stmt);
            LOGGER.trace("With tags: " + variableValues);
        }
+        PreparedStatement ps = null;
        try {
-            PreparedStatement ps = conn.prepareStatement(stmt);
+            ps = conn.prepareStatement(stmt);
            // add everything that wouldn't/may not parse
            int index = 1;
            for (String tag : variableValues) {
@@ -238,6 +239,14 @@
        } catch (SQLException e) {
            LOGGER.error("Could not write metric: \n" + span + " to prepared statement:\n" + stmt,
                e);
+        } finally {
+            if (ps != null) {
+                try {
+                    ps.close();
+                } catch (SQLException e) {
+                    LOGGER.error("Close statement failed.", e);
+                }
+            }
        }
    }
}
@@ -210,20 +210,21 @@ public Map<String, PhoenixAsyncIndex> getCandidateJobs() throws SQLException {
    public Map<String, PhoenixAsyncIndex> getCandidateJobs(Connection con) throws SQLException {
        Properties props = new Properties();
        UpgradeUtil.doNotUpgradeOnFirstConnection(props);
-        Statement s = con.createStatement();
-        ResultSet rs = s.executeQuery(CANDIDATE_INDEX_INFO_QUERY);
-        Map<String, PhoenixAsyncIndex> candidateIndexes = new HashMap<String, PhoenixAsyncIndex>();
-        while (rs.next()) {
-            PhoenixAsyncIndex indexInfo = new PhoenixAsyncIndex();
-            indexInfo.setIndexType(IndexType.fromSerializedValue(rs
-                    .getByte(PhoenixDatabaseMetaData.INDEX_TYPE)));
-            indexInfo.setDataTableName(rs.getString(PhoenixDatabaseMetaData.DATA_TABLE_NAME));
-            indexInfo.setTableSchem(rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM));
-            indexInfo.setTableName(rs.getString(PhoenixDatabaseMetaData.TABLE_NAME));
-            candidateIndexes.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
-                indexInfo.getTableSchem(), indexInfo.getDataTableName(), indexInfo.getTableName()), indexInfo);
+        Map<String, PhoenixAsyncIndex> candidateIndexes = new HashMap<>();
+        try (Statement s = con.createStatement();
+                ResultSet rs = s.executeQuery(CANDIDATE_INDEX_INFO_QUERY)) {
+            while (rs.next()) {
+                PhoenixAsyncIndex indexInfo = new PhoenixAsyncIndex();
+                indexInfo.setIndexType(IndexType.fromSerializedValue(rs
+                        .getByte(PhoenixDatabaseMetaData.INDEX_TYPE)));
+                indexInfo.setDataTableName(rs.getString(PhoenixDatabaseMetaData.DATA_TABLE_NAME));
+                indexInfo.setTableSchem(rs.getString(PhoenixDatabaseMetaData.TABLE_SCHEM));
+                indexInfo.setTableName(rs.getString(PhoenixDatabaseMetaData.TABLE_NAME));
+                candidateIndexes.put(String.format(IndexTool.INDEX_JOB_NAME_TEMPLATE,
+                    indexInfo.getTableSchem(), indexInfo.getDataTableName(),
+                    indexInfo.getTableName()), indexInfo);
+            }
        }

        return candidateIndexes;
    }
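
Two incidental cleanups ride along here: the raw new HashMap<String, PhoenixAsyncIndex>() becomes the diamond new HashMap<>(), and the map is declared before the try block, since resource variables and anything declared inside the block go out of scope when the block ends. A sketch of that scoping rule with made-up names:

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;

final class ScopeExample {
    static Map<String, String> collect(Connection con, String query) throws SQLException {
        // declared outside the try so it survives the block and can be returned
        Map<String, String> out = new HashMap<>();
        try (Statement s = con.createStatement();
                ResultSet rs = s.executeQuery(query)) {
            while (rs.next()) {
                out.put(rs.getString(1), rs.getString(2));
            }
        }
        return out; // s and rs are already closed and out of scope here
    }
}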

@@ -466,16 +466,18 @@ public Boolean call() {
                // In case of disable index on failure policy, INDEX will be in PENDING_DISABLE on first retry
                // but will
                // become active if retry is successfull
-                PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)");
-                stmt.setString(1, "b");
-                stmt.setString(2, "y");
-                stmt.setString(3, "2");
-                stmt.execute();
-                if (!leaveIndexActiveOnFailure && !transactional) {
-                    FailingRegionObserver.FAIL_WRITE = true;
-                    FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY = true;
-                }
-                conn.commit();
+                try (PreparedStatement stmt
+                        = conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)")) {
+                    stmt.setString(1, "b");
+                    stmt.setString(2, "y");
+                    stmt.setString(3, "2");
+                    stmt.execute();
+                    if (!leaveIndexActiveOnFailure && !transactional) {
+                        FailingRegionObserver.FAIL_WRITE = true;
+                        FailingRegionObserver.TOGGLE_FAIL_WRITE_FOR_RETRY = true;
+                    }
+                    conn.commit();
+                }
            } catch (SQLException e) {
                LOGGER.warn("Error while adding row", e);
                return false;
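
Note that conn.commit() moves inside the try-with-resources block, so the commit now happens before stmt.close() rather than after. Per JDBC this is behavior-neutral: closing a statement neither commits nor rolls back work executed through it. A compressed sketch of the resulting order of operations:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

final class UpsertCommitExample {
    static void upsertRow(Connection conn, String tableName) throws SQLException {
        try (PreparedStatement stmt =
                conn.prepareStatement("UPSERT INTO " + tableName + " VALUES(?,?,?)")) {
            stmt.setString(1, "b");
            stmt.setString(2, "y");
            stmt.setString(3, "2");
            stmt.execute();
            conn.commit(); // 1) commit the pending mutations
        }                  // 2) then close the statement
    }
}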