Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,9 @@ public enum Property {
"The time between adjustments of the server thread pool."),
TSERV_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G", PropertyType.MEMORY,
"The maximum size of a message that can be sent to a tablet server."),
TSERV_LOG_TOP_TABLETS_COUNT("tserver.log.top.tablets.count", "0", PropertyType.COUNT,
"Number of top tablets to log when saving tablet stats. If <= 0, logging "
+ "of top tablets is disabled"),
TSERV_HOLD_TIME_SUICIDE("tserver.hold.time.max", "5m", PropertyType.TIMEDURATION,
"The maximum time for a tablet server to be in the \"memory full\" state."
+ " If the tablet server cannot write out memory in this much time, it will"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
Expand Down Expand Up @@ -358,17 +359,60 @@ public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) th
this.logSorter = new LogSorter(instance, fs, aconf);
this.replWorker = new ReplicationWorker(this, fs);
this.statsKeeper = new TabletStatsKeeper();
final int numTopTabletsToLog = aconf.getCount(Property.TSERV_LOG_TOP_TABLETS_COUNT);
final boolean logTopTablets = numTopTabletsToLog > 0;
SimpleTimer.getInstance(aconf).schedule(new Runnable() {
@Override
public void run() {

Comparator<Pair<String,Long>> topTabletComparator = new Comparator<Pair<String,Long>>() {
@Override
public int compare(Pair<String,Long> first, Pair<String,Long> second) {
return second.getSecond().compareTo(first.getSecond());
}
};
PriorityQueue<Pair<String,Long>> topTabletsByIngestCount =
new PriorityQueue<>(numTopTabletsToLog, topTabletComparator);
PriorityQueue<Pair<String,Long>> topTabletsByQueryCount =
new PriorityQueue<>(topTabletsByIngestCount);
synchronized (onlineTablets) {
long now = System.currentTimeMillis();
topTabletsByIngestCount.clear();
topTabletsByQueryCount.clear();
for (Tablet tablet : onlineTablets.values())
try {
tablet.updateRates(now);
if (logTopTablets) {
addToTopTablets(tablet.totalIngest(), topTabletsByIngestCount, numTopTabletsToLog);
addToTopTablets(tablet.totalQueries(), topTabletsByQueryCount, numTopTabletsToLog);
}
} catch (Exception ex) {
log.error("Error updating rates for {}", tablet.getExtent(), ex);
}

if (logTopTablets) {
logTopTablets(topTabletsByIngestCount, "QUERY", numTopTabletsToLog);
logTopTablets(topTabletsByQueryCount, "INGEST", numTopTabletsToLog);
}
}
}

private void addToTopTablets(long count,
PriorityQueue<Pair<String,Long>> topTabletsByIngestCount, int numTopTabletsToLog) {
if (topTabletsByIngestCount.size() < numTopTabletsToLog
|| topTabletsByIngestCount.peek().getSecond() < count) {
if (topTabletsByIngestCount.size() == numTopTabletsToLog) {
topTabletsByIngestCount.remove();
}
}
}

private void logTopTablets(PriorityQueue<Pair<String,Long>> topTabletsByIngestCount,
String label, int numTopTabletsToLog) {
for (int i = 0; i < numTopTabletsToLog; i++) {
Pair<String,Long> pair = topTabletsByIngestCount.poll();
log.debug("Top {} tablet by {} count -- extent: {} count: {}", i, label, pair.getFirst(),
pair.getSecond());
}
}
}, 5000, 5000);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2395,6 +2395,10 @@ public long totalQueries() {
return this.queryCount;
}

public long totalIngest() {
return this.ingestCount;
}

// synchronized?
public void updateRates(long now) {
queryRate.update(now, queryCount);
Expand Down