Skip to content

Commit

Permalink
[yugabyte#7811][YW] Fix Slow Queries not fetching from every node or …
Browse files Browse the repository at this point in the history
…pod.

Summary:
Slow queries is currently not displaying all queries on k8s universe pods. The reason is
due to the database only storing pg_stat_statements on the node which received the queries.
Currently we fetch queries from the first node/pod in the list, so let's fetch from all nodes and
aggregate the results. This issue should also be apparently on multi-node clusters or when queries
are run against read replicas.
**Note: I haven't tested it locally yet due to being unable to create universes, so it would be helpful to hear opinions on whether this is the best way to go about it.**

Test Plan:
Create multi-node (or k8s) universe and run queries on different nodes and check that YW displays the
information correctly. Confirm that queries on read replicas also show up.

Reviewers: arnav, wesley, sb-yb, spotachev

Reviewed By: spotachev

Subscribers: jenkins-bot, yugaware

Differential Revision: https://phabricator.dev.yugabyte.com/D11114
  • Loading branch information
Andrew Cai authored and YintongMa committed May 26, 2021
1 parent fe25c21 commit e1a257d
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 69 deletions.
92 changes: 66 additions & 26 deletions managed/src/main/java/com/yugabyte/yw/queries/QueryHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,23 +57,7 @@ public JsonNode liveQueries(Universe universe) {
}

public JsonNode slowQueries(Universe universe) {
RunQueryFormData ysqlQuery = new RunQueryFormData();
ysqlQuery.query = SLOW_QUERY_STATS_SQL;
ysqlQuery.db_name = "postgres";
JsonNode ysqlResponse = ysqlQueryExecutor.executeQuery(universe, ysqlQuery).get("result");
ArrayNode queries = Json.newArray();
if (ysqlResponse.isArray()) {
for (JsonNode queryObject : ysqlResponse) {
if (!EXCLUDED_QUERY_STATEMENTS.contains(queryObject.get("query").asText())) {
queries.add(queryObject);
}
}
}
ObjectNode ysqlJson = Json.newObject();
ysqlJson.set("queries", queries);
ObjectNode responseJson = Json.newObject();
responseJson.set("ysql", ysqlJson);
return responseJson;
return query(universe, true);
}

public JsonNode resetQueries(Universe universe) {
Expand Down Expand Up @@ -102,9 +86,9 @@ public JsonNode query(Universe universe, boolean fetchSlowQueries) {

if (fetchSlowQueries) {
callable = new SlowQueryExecutor(
node.nodeName,
ip,
node.ysqlServerHttpPort
node.ysqlServerRpcPort,
SLOW_QUERY_STATS_SQL
);
Future<JsonNode> future = threadPool.submit(callable);
futures.add(future);
Expand Down Expand Up @@ -132,6 +116,7 @@ public JsonNode query(Universe universe, boolean fetchSlowQueries) {
}

try {
Map<String, JsonNode> queryMap = new HashMap<>();
for (Future<JsonNode> future : futures) {
JsonNode response = future.get();
if (response.has("error")) {
Expand All @@ -142,13 +127,68 @@ public JsonNode query(Universe universe, boolean fetchSlowQueries) {
ycqlJson.put("errorCount", ycqlJson.get("errorCount").asInt() + 1);
}
} else {
if (response.has("ysql")) {
ArrayNode arr = (ArrayNode) ysqlJson.get("queries");
concatArrayNodes(arr, response.get("ysql"));

} else if (response.has("ycql")) {
ArrayNode arr = (ArrayNode) ycqlJson.get("queries");
concatArrayNodes(arr, response.get("ycql"));
if (fetchSlowQueries) {
JsonNode ysqlResponse = response.get("result");
for (JsonNode queryObject : ysqlResponse) {
String queryStatement = queryObject.get("query").asText();
if (!EXCLUDED_QUERY_STATEMENTS.contains(queryStatement)) {
if (queryMap.containsKey(queryStatement)) {
// Calculate new query stats
ObjectNode previousQueryObj = (ObjectNode) queryMap.get(queryStatement);
// Defining values to reuse
double X_a = previousQueryObj.get("mean_time").asDouble();
double X_b = queryObject.get("mean_time").asDouble();
int n_a = previousQueryObj.get("calls").asInt();
int n_b = queryObject.get("calls").asInt();
double S_a = previousQueryObj.get("stddev_time").asDouble();
double S_b = queryObject.get("stddev_time").asDouble();

double totalTime = previousQueryObj.get("total_time").asDouble() + queryObject.get("total_time").asDouble();
int totalCalls = n_a + n_b;
int rows = previousQueryObj.get("rows").asInt() + queryObject.get("rows").asInt();
double minTime = Math.min(previousQueryObj.get("min_time").asDouble(), queryObject.get("min_time").asDouble());
double maxTime = Math.max(previousQueryObj.get("max_time").asDouble(), queryObject.get("max_time").asDouble());
int tmpTables = previousQueryObj.get("local_blks_written").asInt() + queryObject.get("local_blks_written").asInt();
/**
* Formula to calculate std dev of two samples:
* Let mean, std dev, and size of sample A be X_a, S_a, n_a respectively; and
* mean, std dev, and size of sample B be X_b, S_b, n_b respectively.
* Then mean of combined sample X is given by
* n_a X_a + n_b X_b
* X = -----------------
* n_a + n_b
*
* The std dev of combined sample S is
* n_a ( S_a^2 + (X_a - X)^2) + n_b(S_b^2 + (X_b - X)^2)
* S = -----------------------------------------------------
* n_a + n_b
*/
double averageTime = (n_a * X_a + n_b * X_b) / totalCalls;
double stdDevTime = (n_a * (Math.pow(S_a, 2) + Math.pow(X_a - averageTime, 2)) +
n_b * (Math.pow(S_b, 2) + Math.pow(X_b - averageTime, 2))) / totalCalls;
previousQueryObj.put("total_time", totalTime);
previousQueryObj.put("calls", totalCalls);
previousQueryObj.put("rows", rows);
previousQueryObj.put("min_time", minTime);
previousQueryObj.put("max_time", maxTime);
previousQueryObj.put("mean_time", averageTime);
previousQueryObj.put("local_blks_written", tmpTables);
previousQueryObj.put("stddev_time", stdDevTime);
} else {
queryMap.put(queryStatement, queryObject);
}
}
}
ArrayNode queryArr = Json.newArray();
ysqlJson.set("queries", queryArr.addAll(queryMap.values()));
} else {
if (response.has("ysql")) {
ArrayNode arr = (ArrayNode) ysqlJson.get("queries");
concatArrayNodes(arr, response.get("ysql"));
} else if (response.has("ycql")) {
ArrayNode arr = (ArrayNode) ycqlJson.get("queries");
concatArrayNodes(arr, response.get("ycql"));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,77 +8,89 @@
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.yugabyte.yw.common.ApiHelper;
import com.yugabyte.yw.forms.RunQueryFormData;
import com.yugabyte.yw.forms.SlowQueriesParams;
import com.yugabyte.yw.models.MetricConfig;
import com.yugabyte.yw.models.Universe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import play.Configuration;
import play.api.Play;
import play.libs.Json;

import javax.inject.Singleton;
import java.sql.*;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static play.libs.Json.newObject;
import static play.libs.Json.toJson;

public class SlowQueryExecutor implements Callable<JsonNode> {
public static final Logger LOG = LoggerFactory.getLogger(LiveQueryExecutor.class);

private final ApiHelper apiHelper;
// hostname can be either IP address or DNS
private String hostName;
private String nodeName;
private int port;
private String query;

private final String DEFAULT_DB_USER = "yugabyte";
private final String DEFAULT_DB_PASSWORD = "yugabyte";

public SlowQueryExecutor(String nodeName, String hostName,
int port) {
this.nodeName = nodeName;
public SlowQueryExecutor(String hostName, int port, String query) {
this.hostName = hostName;
this.port = port;
this.query = query;
this.apiHelper = Play.current().injector().instanceOf(ApiHelper.class);
}

@Override
public JsonNode call() {
String url = String.format("http://%s:%d/statements", hostName, port);
JsonNode response = apiHelper.getRequest(url);
return processStatementData(response);
private List<Map<String, Object>> resultSetToMap(ResultSet result) throws SQLException {
List<Map<String, Object>> rows = new ArrayList<>();
ResultSetMetaData rsmd = result.getMetaData();
int columnCount = rsmd.getColumnCount();

while (result.next()) {
// Represent a row in DB. Key: Column name, Value: Column value
Map<String, Object> row = new HashMap<>();
for (int i = 1; i <= columnCount; i++) {
// Note that the index is 1-based
String colName = rsmd.getColumnName(i);
Object colVal = result.getObject(i);
row.put(colName, colVal);
}
rows.add(row);
}
return rows;
}

private JsonNode processStatementData(JsonNode response) {
ObjectNode responseJson = Json.newObject();
ObjectMapper mapper = new ObjectMapper();
if (response.has("statements")) {
for (JsonNode objNode : response.get("statements")) {
try {
// Rather than access the JSON through .get()'s we convert to POJO
SlowQueriesParams params = mapper.treeToValue(
objNode,
SlowQueriesParams.class
);
ObjectNode rowData = Json.newObject();
// Random UUID intended for table row key
rowData.put("id", UUID.randomUUID().toString());
rowData.put("nodeName", nodeName);
rowData.put("privateIp", hostName);
rowData.put("query", params.query);
rowData.put("calls", params.calls);
rowData.put("averageTime", params.mean_time);
rowData.put("totalTime", params.total_time);
rowData.put("minTime", params.min_time);
rowData.put("maxTime", params.max_time);
rowData.put("stdDevTime", params.stddev_time);
if (!responseJson.has("ysql")) {
ArrayNode ysqlArray = responseJson.putArray("ysql");
ysqlArray.add(rowData);
} else {
ArrayNode ysqlArray = (ArrayNode) responseJson.get("ysql");
ysqlArray.add(rowData);
}
} catch (JsonProcessingException exception) {
// Try to process all connections even if there is an exception
LOG.error("Unable to process JSON from YSQL query. {}", objNode.asText());
@Override
public JsonNode call() {
ObjectNode response = Json.newObject();
String connectString = String.format("jdbc:postgresql://%s:%d/%s",
hostName, port, "postgres");
try (Connection conn = DriverManager.getConnection(
connectString, DEFAULT_DB_USER, DEFAULT_DB_PASSWORD)) {
if (conn == null) {
response.put("error", "Unable to connect to DB");
} else {
PreparedStatement p = conn.prepareStatement(query);
boolean hasResult = p.execute();
if (hasResult) {
ResultSet result = p.getResultSet();
List<Map<String, Object>> rows = resultSetToMap(result);
response.put("result", toJson(rows));
}
}
} catch (SQLException e) {
response.put("error", e.getMessage());
} catch (Exception e) {
response.put("error", e.getMessage());
}
return responseJson;

return response;
}
}

0 comments on commit e1a257d

Please sign in to comment.