Skip to content

Commit

Permalink
Better logging for MSQ worker task (#13790)
Browse files Browse the repository at this point in the history
* Adding more logs to MSQ worker implementation which makes it easier to debug.
  • Loading branch information
cryptoe committed Feb 25, 2023
1 parent e46379b commit 6bb5eff
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,13 @@ public Optional<MSQErrorReport> runTask(final Closer closer) throws Exception
final StageDefinition stageDefinition = kernel.getStageDefinition();

if (kernel.getPhase() == WorkerStagePhase.NEW) {
log.debug("New work order: %s", context.jsonMapper().writeValueAsString(kernel.getWorkOrder()));

log.info("Processing work order for stage [%d]" +
(log.isDebugEnabled()
? StringUtils.format(
" with payload [%s]",
context.jsonMapper().writeValueAsString(kernel.getWorkOrder())
) : ""), stageDefinition.getId().getStageNumber());

// Create separate inputChannelFactory per stage, because the list of tasks can grow between stages, and
// so we need to avoid the memoization in baseInputChannelFactory.
Expand Down Expand Up @@ -444,6 +450,7 @@ public Optional<MSQErrorReport> runTask(final Closer closer) throws Exception
@Override
public void stopGracefully()
{
log.info("Stopping gracefully for taskId [%s]", task.getId());
kernelManipulationQueue.add(
kernel -> {
// stopGracefully() is called when the containing process is terminated, or when the task is canceled.
Expand Down Expand Up @@ -520,6 +527,7 @@ public InputStream readChannel(
@Override
public void postWorkOrder(final WorkOrder workOrder)
{
log.info("Got work order for stage [%d]", workOrder.getStageNumber());
if (task.getWorkerNumber() != workOrder.getWorkerNumber()) {
throw new ISE("Worker number mismatch: expected [%d]", task.getWorkerNumber());
}
Expand Down Expand Up @@ -569,7 +577,7 @@ public boolean postResultPartitionBoundaries(
@Override
public void postCleanupStage(final StageId stageId)
{
log.info("Cleanup order for stage: [%s] received", stageId);
log.info("Cleanup order for stage [%s] received", stageId);
kernelManipulationQueue.add(
holder -> {
cleanStageOutput(stageId, true);
Expand All @@ -587,17 +595,19 @@ public void postCleanupStage(final StageId stageId)
@Override
public void postFinish()
{
log.info("Finish received for task [%s]", task.getId());
kernelManipulationQueue.add(KernelHolder::setDone);
}

@Override
public ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId)
{
log.info("Fetching statistics for stage [%d]", stageId.getStageNumber());
if (stageKernelMap.get(stageId) == null) {
throw new ISE("Requested statistics snapshot for non-existent stageId %s.", stageId);
} else if (stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot() == null) {
throw new ISE(
"Requested statistics snapshot is not generated yet for stageId[%s]",
"Requested statistics snapshot is not generated yet for stageId [%s]",
stageId
);
} else {
Expand All @@ -608,11 +618,16 @@ public ClusterByStatisticsSnapshot fetchStatisticsSnapshot(StageId stageId)
@Override
public ClusterByStatisticsSnapshot fetchStatisticsSnapshotForTimeChunk(StageId stageId, long timeChunk)
{
log.debug(
"Fetching statistics for stage [%d] with time chunk [%d] ",
stageId.getStageNumber(),
timeChunk
);
if (stageKernelMap.get(stageId) == null) {
throw new ISE("Requested statistics snapshot for non-existent stageId[%s].", stageId);
throw new ISE("Requested statistics snapshot for non-existent stageId [%s].", stageId);
} else if (stageKernelMap.get(stageId).getResultKeyStatisticsSnapshot() == null) {
throw new ISE(
"Requested statistics snapshot is not generated yet for stageId[%s]",
"Requested statistics snapshot is not generated yet for stageId [%s]",
stageId
);
} else {
Expand Down Expand Up @@ -1349,7 +1364,7 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
if (taskId.equals(id())) {
final ConcurrentMap<Integer, ReadableFrameChannel> partitionOutputsForStage = stageOutputs.get(stageId);
if (partitionOutputsForStage == null) {
throw new ISE("Unable to find outputs for stage: [%s]", stageId);
throw new ISE("Unable to find outputs for stage [%s]", stageId);
}

final ReadableFrameChannel myChannel = partitionOutputsForStage.get(partitionNumber);
Expand All @@ -1361,7 +1376,7 @@ public ReadableFrameChannel openChannel(StageId stageId, int workerNumber, int p
} else if (myChannel instanceof ReadableNilFrameChannel) {
return myChannel;
} else {
throw new ISE("Output for stage: [%s] are stored in an instance of %s which is not "
throw new ISE("Output for stage [%s] are stored in an instance of %s which is not "
+ "supported", stageId, myChannel.getClass());
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageId;
import org.apache.druid.msq.kernel.WorkOrder;
Expand All @@ -44,6 +45,7 @@
*/
public class WorkerStageKernel
{
private static final Logger log = new Logger(WorkerStageKernel.class);
private final WorkOrder workOrder;

private WorkerStagePhase phase = WorkerStagePhase.NEW;
Expand Down Expand Up @@ -210,6 +212,12 @@ private void assertPreshuffleStatisticsNeeded()
private void transitionTo(final WorkerStagePhase newPhase)
{
if (newPhase.canTransitionFrom(phase)) {
log.info(
"Stage [%d] transitioning from old phase [%s] to new phase [%s]",
workOrder.getStageNumber(),
phase,
newPhase
);
phase = newPhase;
} else {
throw new IAE("Cannot transition from [%s] to [%s]", phase, newPhase);
Expand Down

0 comments on commit 6bb5eff

Please sign in to comment.