Skip to content
Permalink
Browse files
[NO ISSUE][HYR][MISC] Add option to provide tracer args via Suppliers
Change-Id: I040523a73e65a0fd86fff3fff38d2278d83ef65f
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14643
Reviewed-by: Michael Blow <mblow@apache.org>
Reviewed-by: Hussain Towaileb <hussainht@gmail.com>
Integration-Tests: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenkins@fulliautomatix.ics.uci.edu>
  • Loading branch information
mblow committed Dec 23, 2021
1 parent a0fd47e commit f65e0adfcf538af769f2393b768fc8770cfebf51
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 32 deletions.
@@ -162,7 +162,7 @@ private void initializeNewFeedRuntime(ActiveRuntimeId runtimeId) throws Exceptio

@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
long tid = tracer.durationB("Ingestion-Store", traceCategory, null);
long tid = tracer.durationB("Ingestion-Store", traceCategory);
try {
if (hasMessage) {
FeedUtils.processFeedMessage(buffer, message, fta);
@@ -172,7 +172,7 @@ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
LOGGER.log(Level.WARN, "Failure Processing a frame at store side", e);
throw HyracksDataException.create(e);
} finally {
tracer.durationE(tid, traceCategory, null);
tracer.durationE(traceCategory, tid);
}
}

@@ -459,7 +459,7 @@ private void traceLastRecordIn() {
if (tracer.isEnabled(traceCategory) && lastRecordInTimeStamp > 0 && indexHelper != null
&& indexHelper.getIndexInstance() != null) {
tracer.instant("UpsertClose", traceCategory, Scope.t,
"{\"last-record-in\":\"" + DATE_FORMAT.get().format(new Date(lastRecordInTimeStamp))
() -> "{\"last-record-in\":\"" + DATE_FORMAT.get().format(new Date(lastRecordInTimeStamp))
+ "\", \"index\":" + indexHelper.getIndexInstance().toString() + "}");
}
} catch (Throwable traceFailure) {
@@ -710,7 +710,7 @@ public TraceCurrentTimeTask(ITracer tracer) {
@Override
public void run() {
try {
tracer.instant("CurrentTime", traceCategory, Tracer.Scope.p, Tracer.dateTimeStamp());
tracer.instant("CurrentTime", traceCategory, ITracer.Scope.p, Tracer::dateTimeStamp);
} catch (Exception e) {
LOGGER.log(Level.WARN, "Exception tracing current time", e);
}
@@ -116,7 +116,7 @@ public void flush(IFrameWriter writer, ITracer tracer, String name, long traceCa
try {
flush(writer);
} finally {
tracer.durationE(tid, traceCategory, args);
tracer.durationE(traceCategory, tid, args);
}
}

@@ -157,7 +157,7 @@ public static int appendToWriter(IFrameWriter writer, IFrameTupleAppender frameT
flushedBytes = frameTupleAppender.getBuffer().capacity();
long tid = tracer.durationB(name, cat, args);
frameTupleAppender.write(writer, true);
tracer.durationE(tid, cat, args);
tracer.durationE(cat, tid, args);
if (!frameTupleAppender.append(tupleAccessor, tIndex)) {
throw HyracksDataException.create(ErrorCode.TUPLE_CANNOT_FIT_INTO_EMPTY_FRAME,
tupleAccessor.getTupleLength(tIndex));
@@ -171,7 +171,7 @@ public final boolean isTracingEnabled() {
public void logPerformanceCounters(int tupleCount) {
if (isTracingEnabled()) {
tracer.instant("store-counters", traceCategory, Scope.t,
"{\"count\":" + tupleCount + ",\"enter-exit-duration-ns\":" + enterExitTime + "}");
() -> "{\"count\":" + tupleCount + ",\"enter-exit-duration-ns\":" + enterExitTime + "}");
resetCounters();
}
}
@@ -262,7 +262,7 @@ private void doExitComponents(ILSMIndexOperationContext ctx, LSMOperationType op
}
if (inactiveMemoryComponentsToBeCleanedUp != null) {
for (ILSMMemoryComponent c : inactiveMemoryComponentsToBeCleanedUp) {
tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex.toString());
tracer.instant(c.toString(), traceCategory, Scope.p, lsmIndex::toString);
c.cleanup();
synchronized (opTracker) {
c.reset();
@@ -58,7 +58,7 @@ public static ILSMIOOperation wrap(final ILSMIOOperation ioOp, final ITracer tra
final long traceCategory = tracer.getRegistry().get(TraceUtils.INDEX_IO_OPERATIONS);
if (tracer.isEnabled(traceCategory)) {
tracer.instant("schedule-" + ioOpName, traceCategory, Scope.p,
"{\"path\": \"" + ioOp.getTarget().getRelativePath() + "\"}");
() -> "{\"path\": \"" + ioOp.getTarget().getRelativePath() + "\"}");
return new TracedIOOperation(ioOp, tracer, traceCategory);
}
return ioOp;
@@ -91,11 +91,11 @@ public LSMIOOperationType getIOOpertionType() {
@Override
public LSMIOOperationStatus call() throws HyracksDataException {
final String name = getTarget().getRelativePath();
final long tid = tracer.durationB(name, traceCategory, null);
final long tid = tracer.durationB(name, traceCategory);
try {
return ioOp.call();
} finally {
tracer.durationE(ioOp.getIOOpertionType().name().toLowerCase(), traceCategory, tid, "{\"size\":"
tracer.durationE(ioOp.getIOOpertionType().name().toLowerCase(), traceCategory, tid, () -> "{\"size\":"
+ getTarget().getFile().length() + ", \"path\": \"" + ioOp.getTarget().getRelativePath() + "\"}");
}
}
@@ -45,6 +45,10 @@ protected void beforeExecute(Thread t, Runnable r) {
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Thread.currentThread().setName(savedName.get());
try {
Thread.currentThread().setName(savedName.get());
} finally {
savedName.remove();
}
}
}
@@ -19,6 +19,8 @@

package org.apache.hyracks.util.trace;

import java.util.function.Supplier;

public interface ITracer {

enum Phase {
@@ -83,14 +85,49 @@ public void durationE(String name, long cat, long tid, String args) {
}

@Override
public void durationE(long tid, long cat, String args) {
public void durationE(long cat, long tid, String args) {
// nothing to do here
}

@Override
public void instant(String name, long cat, Scope scope) {
// nothing to do here
}

@Override
public void instant(String name, long cat, Scope scope, String args) {
// nothing to do here
}

@Override
public long durationB(String name, long cat) {
return -1;
}

@Override
public long durationB(String name, long cat, Supplier<String> args) {
return -1;
}

@Override
public void durationE(long cat, long tid) {
// nothing to do here
}

@Override
public void durationE(long cat, long tid, Supplier<String> args) {
// nothing to do here
}

@Override
public void durationE(String name, long cat, long tid, Supplier<String> args) {
// nothing to do here
}

@Override
public void instant(String name, long cat, Scope scope, Supplier<String> args) {
// nothing to do here
}
};

String getName();
@@ -101,14 +138,28 @@ public void instant(String name, long cat, Scope scope, String args) {

boolean isEnabled(long cat);

long durationB(String name, long cat);

long durationB(String name, long cat, String args);

void durationE(long tid, long cat, String args);
long durationB(String name, long cat, Supplier<String> args);

void durationE(long cat, long tid);

void durationE(long cat, long tid, String args);

void durationE(long cat, long tid, Supplier<String> args);

void durationE(String name, long cat, long tid, String args);

void durationE(String name, long cat, long tid, Supplier<String> args);

void instant(String name, long cat, Scope scope);

void instant(String name, long cat, Scope scope, String args);

void instant(String name, long cat, Scope scope, Supplier<String> args);

@Override
String toString();
}
@@ -23,6 +23,7 @@
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.function.Supplier;

import org.apache.hyracks.util.PidHelper;
import org.apache.logging.log4j.Level;
@@ -44,7 +45,7 @@ public class Tracer implements ITracer {
private long categories;
private final TraceCategoryRegistry registry;

private static final long pid = PidHelper.getPid();
private static final long PID = PidHelper.getPid();

public Tracer(String name, long categories, TraceCategoryRegistry registry) {
final String traceLoggerName = Tracer.class.getName() + ".Traces." + name;
@@ -53,7 +54,7 @@ public Tracer(String name, long categories, TraceCategoryRegistry registry) {
this.categories = categories;
this.registry = registry;
final long traceCategory = getRegistry().get(TraceUtils.TRACER);
instant("Trace-Start", traceCategory, Scope.p, dateTimeStamp());
instant("Trace-Start", traceCategory, Scope.p, Tracer::dateTimeStamp);
}

public Tracer(String name, String[] categories, TraceCategoryRegistry registry) {
@@ -99,37 +100,94 @@ public boolean isEnabled(long cat) {
return (categories & cat) != 0L;
}

@Override
public long durationB(String name, long cat) {
if (isEnabled(cat)) {
return emitDurationB(name, cat, null);
}
return -1;
}

@Override
public long durationB(String name, long cat, String args) {
if (isEnabled(cat)) {
Event e = Event.create(name, cat, Phase.B, pid, Thread.currentThread().getId(), null, args, getRegistry());
traceLog.log(TRACE_LOG_LEVEL, e.toJson());
return e.tid;
return emitDurationB(name, cat, args);
}
return -1;
}

@Override
public long durationB(String name, long cat, Supplier<String> args) {
if (isEnabled(cat)) {
return emitDurationB(name, cat, args.get());
}
return -1;
}

@Override
public void durationE(String name, long cat, long tid, String args) {
if (isEnabled(cat)) {
Event e = Event.create(name, cat, Phase.E, pid, tid, null, args, getRegistry());
traceLog.log(TRACE_LOG_LEVEL, e.toJson());
emit(name, cat, null, Phase.E, tid, args);
}
}

@Override
public void durationE(String name, long cat, long tid, Supplier<String> args) {
if (isEnabled(cat)) {
emit(name, cat, null, Phase.E, tid, args.get());
}
}

@Override
public void durationE(long tid, long cat, String args) {
public void durationE(long cat, long tid) {
if (isEnabled(cat)) {
Event e = Event.create(null, 0L, Phase.E, pid, tid, null, args, getRegistry());
traceLog.log(TRACE_LOG_LEVEL, e.toJson());
emit(null, 0L, null, Phase.E, tid, null);
}
}

@Override
public void durationE(long cat, long tid, String args) {
if (isEnabled(cat)) {
emit(null, 0L, null, Phase.E, tid, args);
}
}

@Override
public void durationE(long cat, long tid, Supplier<String> args) {
if (isEnabled(cat)) {
emit(null, 0L, null, Phase.E, tid, args.get());
}
}

@Override
public void instant(String name, long cat, Scope scope) {
if (isEnabled(cat)) {
emit(name, cat, scope, Phase.i, Thread.currentThread().getId(), null);
}
}

@Override
public void instant(String name, long cat, Scope scope, String args) {
if (isEnabled(cat)) {
Event e = Event.create(name, cat, Phase.i, pid, Thread.currentThread().getId(), scope, args, getRegistry());
traceLog.log(TRACE_LOG_LEVEL, e.toJson());
emit(name, cat, scope, Phase.i, Thread.currentThread().getId(), args);
}
}

@Override
public void instant(String name, long cat, Scope scope, Supplier<String> args) {
if (isEnabled(cat)) {
emit(name, cat, scope, Phase.i, Thread.currentThread().getId(), args.get());
}
}

private long emitDurationB(String name, long cat, String args) {
Event e = Event.create(name, cat, Phase.B, PID, Thread.currentThread().getId(), null, args, getRegistry());
traceLog.log(TRACE_LOG_LEVEL, e.toJson());
return e.tid;
}

private void emit(String name, long cat, Scope scope, Phase i, long tid, String args) {
Event e = Event.create(name, cat, i, PID, tid, scope, args, getRegistry());
traceLog.log(TRACE_LOG_LEVEL, e.toJson());
}
}
@@ -61,9 +61,9 @@ public void testInstant() throws IOException {

ITracer tracer = new Tracer(name, new String[] { "CAT1", "CAT2" }, registry);
Log4j2Monitor.start();
tracer.instant("test1", cat1, ITracer.Scope.p, null);
tracer.instant("test2", cat2, ITracer.Scope.p, null);
tracer.instant("test3", cat3, ITracer.Scope.p, null);
tracer.instant("test1", cat1, ITracer.Scope.p);
tracer.instant("test2", cat2, ITracer.Scope.p);
tracer.instant("test3", cat3, ITracer.Scope.p);

List<String> lines = Log4j2Monitor.getLogs();
for (String line : lines) {
@@ -76,9 +76,9 @@ public void testInstant() throws IOException {
tracer.setCategories("CAT1", "CAT3");
Log4j2Monitor.reset();

tracer.instant("test1", cat1, ITracer.Scope.p, null);
tracer.instant("test2", cat2, ITracer.Scope.p, null);
tracer.instant("test3", cat3, ITracer.Scope.p, null);
tracer.instant("test1", cat1, ITracer.Scope.p);
tracer.instant("test2", cat2, ITracer.Scope.p);
tracer.instant("test3", cat3, ITracer.Scope.p);

lines = Log4j2Monitor.getLogs();
for (String line : lines) {

0 comments on commit f65e0ad

Please sign in to comment.