Skip to content

Commit

Permalink
Merge branch 'master' into xingtanzjr/fix_sourceHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
xingtanzjr committed Apr 21, 2022
2 parents 3619f29 + b2520b3 commit 9d2de75
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* Internal cleaner that removes the completed procedure results after a TTL.
*
* <p>NOTE: This is a special case handled in timeoutLoop().
*
* <p>Since the client code looks more or less like:
*
* <pre>
* procId = master.doOperation()
* while (master.getProcResult(procId) == ProcInProgress);
* </pre>
*
* The master should not throw away the proc result as soon as the procedure is done but should wait
* a result request from the client (see executor.removeResult(procId)) The client will call
* something like master.isProcDone() or master.getProcResult() which will return the result/state
* to the client, and it will mark the completed proc as ready to delete. note that the client may
* not receive the response from the master (e.g. master failover) so, if we delay a bit the real
* deletion of the proc result the client will be able to get the result the next try.
*/
/** Internal cleaner that removes the completed procedure results after a TTL. */
public class CompletedProcedureCleaner<Env> extends InternalProcedure<Env> {
private static final Logger LOG = LoggerFactory.getLogger(CompletedProcedureCleaner.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@

package org.apache.iotdb.procedure;

import org.apache.iotdb.procedure.exception.*;
import org.apache.iotdb.procedure.exception.ProcedureAbortedException;
import org.apache.iotdb.procedure.exception.ProcedureException;
import org.apache.iotdb.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.procedure.exception.ProcedureTimeoutException;
import org.apache.iotdb.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.procedure.store.IProcedureStore;
import org.apache.iotdb.service.rpc.thrift.ProcedureState;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,16 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ public DataBlockServiceImpl getOrCreateDataBlockServiceImpl() {
@Override
public ISinkHandle createSinkHandle(
TFragmentInstanceId localFragmentInstanceId,
TEndPoint endpoint,
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
String remotePlanNodeId,
FragmentInstanceContext instanceContext) {
Expand All @@ -274,13 +274,13 @@ public ISinkHandle createSinkHandle(

SinkHandle sinkHandle =
new SinkHandle(
endpoint.toString(),
remoteEndpoint,
remoteFragmentInstanceId,
remotePlanNodeId,
localFragmentInstanceId,
localMemoryManager,
executorService,
clientFactory.getDataBlockServiceClient(endpoint),
clientFactory.getDataBlockServiceClient(remoteEndpoint),
tsBlockSerdeFactory.get(),
new SinkHandleListenerImpl(instanceContext));
sinkHandles.put(localFragmentInstanceId, sinkHandle);
Expand All @@ -291,7 +291,7 @@ public ISinkHandle createSinkHandle(
public ISourceHandle createSourceHandle(
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
TEndPoint endpoint,
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId) {
if (sourceHandles.containsKey(localFragmentInstanceId)
&& sourceHandles.get(localFragmentInstanceId).containsKey(localPlanNodeId)) {
Expand All @@ -311,13 +311,13 @@ public ISourceHandle createSourceHandle(

SourceHandle sourceHandle =
new SourceHandle(
endpoint.getIp(),
remoteEndpoint,
remoteFragmentInstanceId,
localFragmentInstanceId,
localPlanNodeId,
localMemoryManager,
executorService,
clientFactory.getDataBlockServiceClient(endpoint),
clientFactory.getDataBlockServiceClient(remoteEndpoint),
tsBlockSerdeFactory.get(),
new SourceHandleListenerImpl());
sourceHandles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public interface ISinkHandle extends AutoCloseable {

/**
* Send a list of tsblocks to an unpartitioned output buffer. If no-more-tsblocks has been set,
* the send tsblock call is ignored. This can happen with limit queries. A {@link
* RuntimeException} will be thrown if any exception happened * during the data transmission.
* the invocation will be ignored. This can happen with limit queries. A {@link RuntimeException}
* will be thrown if any exception happened during the data transmission.
*/
void send(List<TsBlock> tsBlocks) throws IOException;

Expand All @@ -57,13 +57,13 @@ public interface ISinkHandle extends AutoCloseable {
void setNoMoreTsBlocks();

/** If the handle is closed. */
public boolean isClosed();
boolean isClosed();

/**
* If no more tsblocks will be sent and all the tsblocks have been fetched by downstream fragment
* instances.
*/
public boolean isFinished();
boolean isFinished();

/**
* Close the handle. The output buffer will not be cleared until all tsblocks are fetched by
Expand All @@ -73,6 +73,9 @@ public interface ISinkHandle extends AutoCloseable {
@Override
void close() throws IOException;

/** Abort the sink handle, discarding all tsblocks which may still be in memory buffer. */
/**
* Abort the sink handle. Discard all tsblocks which may still be in the memory buffer and cancel
* the future returned by {@link #isFull()}.
*/
void abort();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ public interface ISourceHandle extends Closeable {
/** If there are more tsblocks. */
boolean isFinished();

/**
* Get a future that will be completed when the input buffer is not empty. The future will not
* complete even when the handle is finished or closed.
*/
/** Get a future that will be completed when the input buffer is not empty. */
ListenableFuture<Void> isBlocked();

/** If this handle is closed. */
boolean isClosed();

/** Close the handle. Discarding all tsblocks which may still be in memory buffer. */
/**
* Close the handle. Discard all tsblocks which may still be in the memory buffer and complete the
* future returned by {@link #isBlocked()}.
*/
@Override
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.mpp.buffer;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SinkHandleListener;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class SinkHandle implements ISinkHandle {

public static final int MAX_ATTEMPT_TIMES = 3;

private final String remoteHostname;
private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
private final String remotePlanNodeId;
private final TFragmentInstanceId localFragmentInstanceId;
Expand All @@ -76,7 +77,7 @@ public class SinkHandle implements ISinkHandle {
private Throwable throwable;

public SinkHandle(
String remoteHostname,
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
String remotePlanNodeId,
TFragmentInstanceId localFragmentInstanceId,
Expand All @@ -85,7 +86,7 @@ public SinkHandle(
DataBlockService.Iface client,
TsBlockSerde serde,
SinkHandleListener sinkHandleListener) {
this.remoteHostname = Validate.notNull(remoteHostname);
this.remoteEndpoint = Validate.notNull(remoteEndpoint);
this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
this.remotePlanNodeId = Validate.notNull(remotePlanNodeId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
Expand Down Expand Up @@ -220,6 +221,9 @@ public void abort() {
synchronized (this) {
sequenceIdToTsBlock.clear();
closed = true;
if (blocked != null && !blocked.isDone()) {
blocked.cancel(true);
}
if (bufferRetainedSizeInBytes > 0) {
localMemoryManager
.getQueryPool()
Expand Down Expand Up @@ -292,8 +296,8 @@ void acknowledgeTsBlock(int startSequenceId, int endSequenceId) {
localMemoryManager.getQueryPool().free(localFragmentInstanceId.getQueryId(), freedBytes);
}

String getRemoteHostname() {
return remoteHostname;
TEndPoint getRemoteEndpoint() {
return remoteEndpoint;
}

TFragmentInstanceId getRemoteFragmentInstanceId() {
Expand All @@ -311,7 +315,7 @@ TFragmentInstanceId getLocalFragmentInstanceId() {
@Override
public String toString() {
return new StringJoiner(", ", SinkHandle.class.getSimpleName() + "[", "]")
.add("remoteHostname='" + remoteHostname + "'")
.add("remoteEndpoint='" + remoteEndpoint + "'")
.add("remoteFragmentInstanceId=" + remoteFragmentInstanceId)
.add("remotePlanNodeId='" + remotePlanNodeId + "'")
.add("localFragmentInstanceId=" + localFragmentInstanceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.mpp.buffer;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.db.mpp.buffer.DataBlockManager.SourceHandleListener;
import org.apache.iotdb.db.mpp.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.DataBlockService;
Expand Down Expand Up @@ -53,7 +54,7 @@ public class SourceHandle implements ISourceHandle {

public static final int MAX_ATTEMPT_TIMES = 3;

private final String remoteHostname;
private final TEndPoint remoteEndpoint;
private final TFragmentInstanceId remoteFragmentInstanceId;
private final TFragmentInstanceId localFragmentInstanceId;
private final String localPlanNodeId;
Expand All @@ -75,7 +76,7 @@ public class SourceHandle implements ISourceHandle {
private Throwable throwable;

public SourceHandle(
String remoteHostname,
TEndPoint remoteEndpoint,
TFragmentInstanceId remoteFragmentInstanceId,
TFragmentInstanceId localFragmentInstanceId,
String localPlanNodeId,
Expand All @@ -84,7 +85,7 @@ public SourceHandle(
DataBlockService.Iface client,
TsBlockSerde serde,
SourceHandleListener sourceHandleListener) {
this.remoteHostname = Validate.notNull(remoteHostname);
this.remoteEndpoint = Validate.notNull(remoteEndpoint);
this.remoteFragmentInstanceId = Validate.notNull(remoteFragmentInstanceId);
this.localFragmentInstanceId = Validate.notNull(localFragmentInstanceId);
this.localPlanNodeId = Validate.notNull(localPlanNodeId);
Expand Down Expand Up @@ -249,8 +250,8 @@ private boolean remoteTsBlockedConsumedUp() {
return currSequenceId - 1 == lastSequenceId;
}

String getRemoteHostname() {
return remoteHostname;
TEndPoint getRemoteEndpoint() {
return remoteEndpoint;
}

TFragmentInstanceId getRemoteFragmentInstanceId() {
Expand Down Expand Up @@ -278,7 +279,7 @@ public boolean isClosed() {
@Override
public String toString() {
return new StringJoiner(", ", SourceHandle.class.getSimpleName() + "[", "]")
.add("remoteHostname='" + remoteHostname + "'")
.add("remoteEndpoint='" + remoteEndpoint + "'")
.add("remoteFragmentInstanceId=" + remoteFragmentInstanceId)
.add("localFragmentInstanceId=" + localFragmentInstanceId)
.add("localPlanNodeId='" + localPlanNodeId + "'")
Expand Down
Loading

0 comments on commit 9d2de75

Please sign in to comment.