Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FragmentInstanceStateMachine for FragmentInstance State change #5615

Merged
merged 10 commits into from
Apr 22, 2022
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,11 @@
<artifactId>slice</artifactId>
<version>0.41</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>stats</artifactId>
<version>214</version>
</dependency>
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>stats</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public interface SinkHandleListener {
void onClosed(SinkHandle sinkHandle);

void onAborted(SinkHandle sinkHandle);

void onFailed(Throwable t);
}

/** Handle thrift communications. */
Expand Down Expand Up @@ -166,6 +168,7 @@ public void onEndOfDataBlockEvent(TEndOfDataBlockEvent e) throws TException {

/** Listen to the state changes of a source handle. */
class SourceHandleListenerImpl implements SourceHandleListener {

@Override
public void onFinished(SourceHandle sourceHandle) {
logger.info("Release resources of finished source handle {}", sourceHandle);
Expand Down Expand Up @@ -208,12 +211,12 @@ public void onFinish(SinkHandle sinkHandle) {
logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
}
sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
context.finish();
context.finished();
}

@Override
public void onClosed(SinkHandle sinkHandle) {
context.flushing();
context.transitionToFlushing();
}

@Override
Expand All @@ -224,6 +227,11 @@ public void onAborted(SinkHandle sinkHandle) {
}
sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
}

@Override
public void onFailed(Throwable t) {
context.failed(t);
}
}

private final LocalMemoryManager localMemoryManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.io.IOException;
import java.util.List;

public interface ISinkHandle extends AutoCloseable {
public interface ISinkHandle {

/** Get the total amount of memory used by buffered tsblocks. */
long getBufferRetainedSizeInBytes();
Expand All @@ -41,7 +41,7 @@ public interface ISinkHandle extends AutoCloseable {
* the send tsblock call is ignored. This can happen with limit queries. A {@link
* RuntimeException} will be thrown if any exception happened * during the data transmission.
*/
void send(List<TsBlock> tsBlocks) throws IOException;
void send(List<TsBlock> tsBlocks);

/**
* Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
Expand All @@ -57,21 +57,20 @@ public interface ISinkHandle extends AutoCloseable {
void setNoMoreTsBlocks();

/** If the handle is closed. */
public boolean isClosed();
boolean isClosed();

/**
* If no more tsblocks will be sent and all the tsblocks have been fetched by downstream fragment
* instances.
*/
public boolean isFinished();
boolean isFinished();

/**
* Close the handle. The output buffer will not be cleared until all tsblocks are fetched by
* downstream instances. A {@link RuntimeException} will be thrown if any exception happened
* during the data transmission.
*/
@Override
void close() throws IOException;
void close();

/** Abort the sink handle, discarding all tsblocks which may still be in memory buffer. */
void abort();
Expand Down
29 changes: 10 additions & 19 deletions server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public class SinkHandle implements ISinkHandle {
private long bufferRetainedSizeInBytes;
private boolean closed;
private boolean noMoreTsBlocks;
private Throwable throwable;

public SinkHandle(
String remoteHostname,
Expand Down Expand Up @@ -113,11 +112,8 @@ private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blo
}

@Override
public void send(List<TsBlock> tsBlocks) throws IOException {
public void send(List<TsBlock> tsBlocks) {
Validate.notNull(tsBlocks, "tsBlocks is null");
if (throwable != null) {
throw new IOException(throwable);
}
if (closed) {
throw new IllegalStateException("Sink handle is closed.");
}
Expand Down Expand Up @@ -193,24 +189,21 @@ private void sendEndOfDataBlockEvent() throws TException {
}

@Override
public void close() throws IOException {
public void close() {
logger.info("Sink handle {} is being closed.", this);
if (throwable != null) {
throw new IOException(throwable);
}
if (closed) {
return;
}
try {
sendEndOfDataBlockEvent();
} catch (TException e) {
throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
}
synchronized (this) {
closed = true;
noMoreTsBlocks = true;
}
sinkHandleListener.onClosed(this);
try {
sendEndOfDataBlockEvent();
} catch (TException e) {
throw new IOException(e);
}
logger.info("Sink handle {} is closed.", this);
}

Expand Down Expand Up @@ -243,7 +236,7 @@ public boolean isClosed() {

@Override
public boolean isFinished() {
return throwable == null && noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
return noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
}

@Override
Expand Down Expand Up @@ -358,7 +351,7 @@ public void run() {
try {
client.onNewDataBlockEvent(newDataBlockEvent);
break;
} catch (TException e) {
} catch (Throwable e) {
logger.error(
"Failed to send new data block event to plan node {} of {} due to {}, attempt times: {}",
remotePlanNodeId,
Expand All @@ -367,9 +360,7 @@ public void run() {
attempt,
e);
if (attempt == MAX_ATTEMPT_TIMES) {
synchronized (this) {
throwable = e;
}
sinkHandleListener.onFailed(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public ListenableFuture<Void> isFull() {
}

@Override
public void send(List<TsBlock> tsBlocks) throws IOException {
public void send(List<TsBlock> tsBlocks) {
this.tsBlocks.addAll(tsBlocks);
}

Expand Down Expand Up @@ -87,7 +87,7 @@ public void close() {
return;
}
closed = true;
instanceContext.flushing();
instanceContext.transitionToFlushing();
}

@Override
Expand Down
Loading