@@ -33,8 +33,13 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.DELETE_BATCH_FAILED_SIZE;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.UPSERT_BATCH_FAILED_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.UPSERT_BATCH_FAILED_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@@ -103,28 +108,118 @@ void assertReadMetricsForMutatingSql(String tableName, long tableSaltBuckets,
assertEquals("There should have been read metrics only for one table: " + tableName, 1, numTables);
}

void assertMutationMetrics(String tableName, int numRows, Map<String, Map<MetricType, Long>> mutationMetrics) {
assertTrue("No mutation metrics present when there should have been", mutationMetrics.size() > 0);
static void assertMutationMetrics(String tableName, int numRows, boolean isUpsert,
Map<String, Map<MetricType, Long>> mutationMetrics) {
for (Map.Entry<String, Map<MetricType, Long>> entry : mutationMetrics.entrySet()) {
String t = entry.getKey();
assertEquals("Table name didn't match for mutation metrics", tableName, t);
assertEquals("Table names didn't match!", tableName, t);
Map<MetricType, Long> p = entry.getValue();
assertEquals("There should have been five metrics", 5, p.size());
assertEquals("There should have been nine metrics", 15, p.size());
boolean mutationBatchSizePresent = false;
boolean mutationCommitTimePresent = false;
boolean mutationBytesPresent = false;
boolean mutationBatchFailedPresent = false;
boolean upsertBatchFailedSizePresent = false;
boolean deleteBatchFailedSizePresent = false;
boolean upsertBatchFailedCounterPresent = false;
boolean deleteBatchFailedCounterPresent = false;
boolean deleteMutationBytesPresent = false;
boolean upsertMutationBytesPresent = false;
boolean indexCommitFailureSizePresent = false;
boolean deleteMutationSqlCounterPresent = false;
boolean upsertMutationSqlCounterPresent = false;
boolean upsertCommitTimeCounterPresent = false;
boolean deleteCommitTimeCounterPresent = false;
for (Map.Entry<MetricType, Long> metric : p.entrySet()) {
MetricType metricType = metric.getKey();
long metricValue = metric.getValue();
if (metricType.equals(MetricType.MUTATION_BATCH_SIZE)) {
assertEquals("Mutation batch sizes didn't match!", numRows, metricValue);
} else if (metricType.equals(MetricType.MUTATION_COMMIT_TIME)) {
mutationBatchSizePresent = true;
} else if (metricType.equals(MUTATION_COMMIT_TIME)) {
assertTrue("Mutation commit time should be greater than zero", metricValue > 0);
mutationCommitTimePresent = true;
} else if (metricType.equals(MetricType.MUTATION_BYTES)) {
assertTrue("Mutation bytes size should be greater than zero", metricValue > 0);
mutationBytesPresent = true;
} else if (metricType.equals(MetricType.MUTATION_BATCH_FAILED_SIZE)) {
assertEquals("Zero failed mutations expected", 0, metricValue);
mutationBatchFailedPresent = true;
} else if (metricType.equals(UPSERT_BATCH_FAILED_SIZE)) {
assertEquals("Zero failed upsert mutations size expected", 0, metricValue);
upsertBatchFailedSizePresent = true;
} else if (metricType.equals(DELETE_BATCH_FAILED_SIZE)) {
assertEquals("Zero failed delete mutations size expected", 0, metricValue);
deleteBatchFailedSizePresent = true;
} else if (metricType.equals(UPSERT_BATCH_FAILED_COUNTER)) {
assertEquals("Zero failed upsert mutations counter expected", 0, metricValue);
upsertBatchFailedCounterPresent = true;
} else if (metricType.equals(DELETE_BATCH_FAILED_COUNTER)) {
assertEquals("Zero failed delete mutations counter expected", 0, metricValue);
deleteBatchFailedCounterPresent = true;
} else if (metricType.equals(MetricType.DELETE_MUTATION_BYTES)) {
if (isUpsert) {
assertEquals("Zero delete mutation bytes size expected", 0, metricValue);
} else {
assertTrue("Delete mutation bytes size should be greater than zero", metricValue > 0);
}
deleteMutationBytesPresent = true;
} else if (metricType.equals(MetricType.UPSERT_MUTATION_BYTES)) {
if (isUpsert) {
assertTrue("Upsert mutation bytes size should be greater than zero", metricValue > 0);
} else {
assertEquals("Zero Upsert mutation bytes size expected", 0, metricValue);
}
upsertMutationBytesPresent = true;
} else if (metricType.equals(MetricType.INDEX_COMMIT_FAILURE_SIZE)) {
assertEquals("Zero failed phase 3 mutations expected", 0, metricValue);
assertEquals("Zero index commit failures expected", 0, metricValue);
indexCommitFailureSizePresent = true;
} else if (metricType.equals(MetricType.DELETE_MUTATION_SQL_COUNTER)) {
if (isUpsert) {
assertEquals("Zero delete mutations sql counter expected", 0, metricValue);
} else {
assertTrue("Delete mutations sql counter should be greater than zero", metricValue > 0);
}
deleteMutationSqlCounterPresent = true;
} else if (metricType.equals(MetricType.UPSERT_MUTATION_SQL_COUNTER)) {
if (isUpsert) {
assertTrue("Upsert mutation sql counter should be greater than zero", metricValue > 0);
} else {
assertEquals("Zero upsert mutations sql counter expected", 0, metricValue);
}
upsertMutationSqlCounterPresent = true;
} else if (metricType.equals(MetricType.UPSERT_COMMIT_TIME)) {
if (isUpsert) {
assertTrue("Upsert commit time counter should be greater than zero", metricValue > 0);
} else {
assertEquals("Zero upsert commit time counter expected", 0, metricValue);
}
upsertCommitTimeCounterPresent = true;
} else if (metricType.equals(MetricType.DELETE_COMMIT_TIME)) {
if (!isUpsert) {
assertTrue("delete commit time counter should be greater than zero",
metricValue > 0);
} else {
assertEquals("Zero delete commit time counter expected", 0, metricValue);
}
deleteCommitTimeCounterPresent = true;
}
}
assertTrue(mutationBatchSizePresent);
assertTrue(mutationCommitTimePresent);
assertTrue(mutationBytesPresent);
assertTrue(mutationBatchFailedPresent);
assertTrue(deleteMutationBytesPresent);
assertTrue(upsertMutationBytesPresent);
assertTrue(indexCommitFailureSizePresent);
assertTrue(deleteMutationSqlCounterPresent);
assertTrue(upsertMutationSqlCounterPresent);
assertTrue(upsertBatchFailedSizePresent);
assertTrue(deleteBatchFailedSizePresent);
assertTrue(upsertBatchFailedCounterPresent);
assertTrue(deleteBatchFailedCounterPresent);
assertTrue(upsertCommitTimeCounterPresent);
assertTrue(deleteCommitTimeCounterPresent);
}
}
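
For reference, a minimal sketch of how the reworked helper can be exercised now that it is static. Everything below is illustrative only: the test method name, table name, and metric values are assumptions, and the snippet is assumed to live in (or extend) the test class that declares assertMutationMetrics; none of it comes from this change.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.phoenix.monitoring.MetricType;
import org.junit.Test;

// Hypothetical test: builds a synthetic per-table metric map with all fifteen
// entries the helper now checks and validates it as the result of an upsert.
@Test
public void testAssertMutationMetricsUpsertPath() {
    Map<MetricType, Long> metrics = new HashMap<>();
    // Generic mutation metrics, checked regardless of statement type.
    metrics.put(MetricType.MUTATION_BATCH_SIZE, 10L);      // must equal numRows
    metrics.put(MetricType.MUTATION_COMMIT_TIME, 5L);      // must be > 0
    metrics.put(MetricType.MUTATION_BYTES, 1024L);         // must be > 0
    // Failure metrics, always expected to be zero.
    metrics.put(MetricType.MUTATION_BATCH_FAILED_SIZE, 0L);
    metrics.put(MetricType.UPSERT_BATCH_FAILED_SIZE, 0L);
    metrics.put(MetricType.DELETE_BATCH_FAILED_SIZE, 0L);
    metrics.put(MetricType.UPSERT_BATCH_FAILED_COUNTER, 0L);
    metrics.put(MetricType.DELETE_BATCH_FAILED_COUNTER, 0L);
    metrics.put(MetricType.INDEX_COMMIT_FAILURE_SIZE, 0L);
    // With isUpsert = true the upsert-side metrics must be non-zero and the
    // delete-side metrics must be zero.
    metrics.put(MetricType.UPSERT_MUTATION_BYTES, 1024L);
    metrics.put(MetricType.UPSERT_MUTATION_SQL_COUNTER, 1L);
    metrics.put(MetricType.UPSERT_COMMIT_TIME, 5L);
    metrics.put(MetricType.DELETE_MUTATION_BYTES, 0L);
    metrics.put(MetricType.DELETE_MUTATION_SQL_COUNTER, 0L);
    metrics.put(MetricType.DELETE_COMMIT_TIME, 0L);

    Map<String, Map<MetricType, Long>> mutationMetrics =
            Collections.singletonMap("T1", metrics);
    // Passes: the single table matches, all fifteen metrics are present, and
    // each value is consistent with an upsert of ten rows.
    assertMutationMetrics("T1", 10, true, mutationMetrics);
}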

@@ -114,7 +114,7 @@ public void testPhoenixMetricsLoggedOnCommit() throws Exception {
loggedConn.commit();
assertTrue("Mutation write metrics for not found for " + tableName2,
mutationWriteMetricsMap.get(tableName2).size() > 0);
assertMutationMetrics(tableName2, NUM_ROWS, mutationWriteMetricsMap);
assertMutationMetrics(tableName2, NUM_ROWS, true, mutationWriteMetricsMap);
assertTrue("Mutation read metrics for not found for " + tableName1,
mutationReadMetricsMap.get(tableName1).size() > 0);
assertReadMetricsForMutatingSql(tableName1, 1, mutationReadMetricsMap);
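
A delete-based test would reuse the same helper with the flag flipped. A hypothetical counterpart (not part of this change; it assumes a DELETE against tableName2 was committed on the same connection) would be:

// With isUpsert = false the helper expects the delete-side metrics
// (DELETE_MUTATION_BYTES, DELETE_MUTATION_SQL_COUNTER, DELETE_COMMIT_TIME)
// to be non-zero and their upsert counterparts to be zero for this table.
assertMutationMetrics(tableName2, NUM_ROWS, false, mutationWriteMetricsMap);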