Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.queryengine.execution.schedule.task.DriverTaskId;
import org.apache.iotdb.db.queryengine.execution.timer.RuleBasedTimeSliceAllocator;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;

import java.util.ArrayList;
Expand All @@ -39,7 +38,6 @@ public class DriverContext {
private final FragmentInstanceContext fragmentInstanceContext;
private final List<OperatorContext> operatorContexts = new ArrayList<>();
private ISink sink;
private final RuleBasedTimeSliceAllocator timeSliceAllocator;

private int dependencyDriverIndex = -1;
private ExchangeOperator downstreamOperator;
Expand All @@ -50,13 +48,11 @@ public class DriverContext {
@TestOnly
public DriverContext() {
this.fragmentInstanceContext = null;
this.timeSliceAllocator = null;
}

public DriverContext(FragmentInstanceContext fragmentInstanceContext, int pipelineId) {
this.fragmentInstanceContext = fragmentInstanceContext;
this.driverTaskID = new DriverTaskId(fragmentInstanceContext.getId(), pipelineId);
this.timeSliceAllocator = new RuleBasedTimeSliceAllocator();
}

public OperatorContext addOperatorContext(
Expand Down Expand Up @@ -108,10 +104,6 @@ public List<OperatorContext> getOperatorContexts() {
return operatorContexts;
}

public RuleBasedTimeSliceAllocator getTimeSliceAllocator() {
return timeSliceAllocator;
}

public int getPipelineId() {
return driverTaskID.getPipelineId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.execution.operator;

import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
Expand All @@ -28,6 +29,7 @@
import io.airlift.units.Duration;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* Contains information about {@link Operator} execution.
Expand All @@ -36,12 +38,16 @@
*/
public class OperatorContext {

private static Duration maxRunTime =
new Duration(
IoTDBDescriptor.getInstance().getConfig().getDriverTaskExecutionTimeSliceInMs(),
TimeUnit.MILLISECONDS);

private final int operatorId;
// It seems it's never used.
private final PlanNodeId planNodeId;
private final String operatorType;
private DriverContext driverContext;
private Duration maxRunTime;

private long totalExecutionTimeInNanos = 0L;
private long nextCalledCount = 0L;
Expand Down Expand Up @@ -90,8 +96,8 @@ public Duration getMaxRunTime() {
return maxRunTime;
}

public void setMaxRunTime(Duration maxRunTime) {
this.maxRunTime = maxRunTime;
public static void setMaxRunTime(Duration maxRunTime) {
OperatorContext.maxRunTime = maxRunTime;
Comment on lines +99 to +100
Copy link
Copy Markdown
Contributor

@lancelly lancelly Nov 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not remove this method since it is now only used in tests and ClientRpcServiceImpl? I think there's no need to call this method both in tests and ClientRpcServiceImpl, too.
截屏2023-11-18 10 37 28

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I wanna keep this method for future usage like we want to change the time slice for running DN without restarting it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm not mistaken, changing the time slice length without restarting should be done through calling a Session interface or executing a SQL . However, setting OperatorMaxRuntime doesn't seem to work, and it seems that what should be changed is the value in the Config, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but in current situation, each operator's max run time is got from OperatorContext. And calling a Session interface or executing a SQL is to change the value in OperatorContext.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see.

}

public SessionInfo getSessionInfo() {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.source.ExchangeOperator;
import org.apache.iotdb.db.queryengine.execution.timer.RuleBasedTimeSliceAllocator;
import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
import org.apache.iotdb.db.schemaengine.schemaregion.ISchemaRegion;
import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
Expand Down Expand Up @@ -134,12 +133,6 @@ public LocalExecutionPlanContext(

public void addPipelineDriverFactory(
Operator operation, DriverContext driverContext, long estimatedMemorySize) {
driverContext
.getOperatorContexts()
.forEach(
operatorContext ->
operatorContext.setMaxRunTime(
driverContext.getTimeSliceAllocator().getMaxRunTime(operatorContext)));
pipelineDriverFactories.add(
new PipelineDriverFactory(operation, driverContext, estimatedMemorySize));
}
Expand Down Expand Up @@ -264,10 +257,6 @@ public TypeProvider getTypeProvider() {
return typeProvider;
}

public RuleBasedTimeSliceAllocator getTimeSliceAllocator() {
return driverContext.getTimeSliceAllocator();
}

public FragmentInstanceContext getInstanceContext() {
return driverContext.getFragmentInstanceContext();
}
Expand Down
Loading