Skip to content

Commit

Permalink
Add FragmentInstanceStateMachine for FragmentInstance State change (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 committed Apr 22, 2022
1 parent b2520b3 commit f9226c3
Show file tree
Hide file tree
Showing 37 changed files with 1,048 additions and 539 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@
<artifactId>slice</artifactId>
<version>0.41</version>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>stats</artifactId>
<version>200</version>
</dependency>
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@
<groupId>io.airlift</groupId>
<artifactId>units</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>stats</artifactId>
</dependency>
<dependency>
<groupId>io.airlift</groupId>
<artifactId>airline</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public interface SinkHandleListener {
void onClosed(SinkHandle sinkHandle);

void onAborted(SinkHandle sinkHandle);

void onFailure(Throwable t);
}

/** Handle thrift communications. */
Expand Down Expand Up @@ -166,6 +168,7 @@ public void onEndOfDataBlockEvent(TEndOfDataBlockEvent e) throws TException {

/** Listen to the state changes of a source handle. */
class SourceHandleListenerImpl implements SourceHandleListener {

@Override
public void onFinished(SourceHandle sourceHandle) {
logger.info("Release resources of finished source handle {}", sourceHandle);
Expand Down Expand Up @@ -208,12 +211,12 @@ public void onFinish(SinkHandle sinkHandle) {
logger.info("Resources of finished sink handle {} has already been released", sinkHandle);
}
sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
context.finish();
context.finished();
}

@Override
public void onClosed(SinkHandle sinkHandle) {
context.flushing();
context.transitionToFlushing();
}

@Override
Expand All @@ -224,6 +227,11 @@ public void onAborted(SinkHandle sinkHandle) {
}
sinkHandles.remove(sinkHandle.getLocalFragmentInstanceId());
}

@Override
public void onFailure(Throwable t) {
context.failed(t);
}
}

private final LocalMemoryManager localMemoryManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.io.IOException;
import java.util.List;

public interface ISinkHandle extends AutoCloseable {
public interface ISinkHandle {

/** Get the total amount of memory used by buffered tsblocks. */
long getBufferRetainedSizeInBytes();
Expand All @@ -41,7 +41,7 @@ public interface ISinkHandle extends AutoCloseable {
* the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
* will be thrown if any exception happened during the data transmission.
*/
void send(List<TsBlock> tsBlocks) throws IOException;
void send(List<TsBlock> tsBlocks);

/**
* Send a {@link TsBlock} to a specific partition. If no-more-tsblocks has been set, the send
Expand Down Expand Up @@ -70,8 +70,7 @@ public interface ISinkHandle extends AutoCloseable {
* downstream instances. A {@link RuntimeException} will be thrown if any exception happened
* during the data transmission.
*/
@Override
void close() throws IOException;
void close();

/**
* Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
Expand Down
29 changes: 10 additions & 19 deletions server/src/main/java/org/apache/iotdb/db/mpp/buffer/SinkHandle.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public class SinkHandle implements ISinkHandle {
private long bufferRetainedSizeInBytes;
private boolean closed;
private boolean noMoreTsBlocks;
private Throwable throwable;

public SinkHandle(
TEndPoint remoteEndpoint,
Expand Down Expand Up @@ -114,11 +113,8 @@ private void submitSendNewDataBlockEventTask(int startSequenceId, List<Long> blo
}

@Override
public void send(List<TsBlock> tsBlocks) throws IOException {
public void send(List<TsBlock> tsBlocks) {
Validate.notNull(tsBlocks, "tsBlocks is null");
if (throwable != null) {
throw new IOException(throwable);
}
if (closed) {
throw new IllegalStateException("Sink handle is closed.");
}
Expand Down Expand Up @@ -194,24 +190,21 @@ private void sendEndOfDataBlockEvent() throws TException {
}

@Override
public void close() throws IOException {
public void close() {
logger.info("Sink handle {} is being closed.", this);
if (throwable != null) {
throw new IOException(throwable);
}
if (closed) {
return;
}
try {
sendEndOfDataBlockEvent();
} catch (TException e) {
throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
}
synchronized (this) {
closed = true;
noMoreTsBlocks = true;
}
sinkHandleListener.onClosed(this);
try {
sendEndOfDataBlockEvent();
} catch (TException e) {
throw new IOException(e);
}
logger.info("Sink handle {} is closed.", this);
}

Expand Down Expand Up @@ -247,7 +240,7 @@ public boolean isClosed() {

@Override
public boolean isFinished() {
return throwable == null && noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
return noMoreTsBlocks && sequenceIdToTsBlock.isEmpty();
}

@Override
Expand Down Expand Up @@ -362,7 +355,7 @@ public void run() {
try {
client.onNewDataBlockEvent(newDataBlockEvent);
break;
} catch (TException e) {
} catch (Throwable e) {
logger.error(
"Failed to send new data block event to plan node {} of {} due to {}, attempt times: {}",
remotePlanNodeId,
Expand All @@ -371,9 +364,7 @@ public void run() {
attempt,
e);
if (attempt == MAX_ATTEMPT_TIMES) {
synchronized (this) {
throwable = e;
}
sinkHandleListener.onFailure(e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public ListenableFuture<Void> isFull() {
}

@Override
public void send(List<TsBlock> tsBlocks) throws IOException {
public void send(List<TsBlock> tsBlocks) {
this.tsBlocks.addAll(tsBlocks);
}

Expand Down Expand Up @@ -87,7 +87,7 @@ public void close() {
return;
}
closed = true;
instanceContext.flushing();
instanceContext.transitionToFlushing();
}

@Override
Expand Down
Loading

0 comments on commit f9226c3

Please sign in to comment.