Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,9 @@ protected boolean executeOnce() {
throw e;
} else {
LOGGER.info(
"{} in pipe transfer, ignored because the connector subtask is dropped.",
"{} in pipe transfer, ignored because the connector subtask is dropped.{}",
e.getClass().getSimpleName(),
e);
e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
} catch (final Exception e) {
Expand All @@ -161,7 +161,8 @@ protected boolean executeOnce() {
e);
} else {
LOGGER.info(
"Exception in pipe transfer, ignored because the connector subtask is dropped.", e);
"Exception in pipe transfer, ignored because the connector subtask is dropped.{}",
e.getMessage() != null ? " Message: " + e.getMessage() : "");
clearReferenceCountAndReleaseLastEvent(event);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,20 @@ public InsertNode getInsertNode() {
}

public ByteBuffer getByteBuffer() throws WALPipeException {
return insertNode.serializeToByteBuffer();
final InsertNode node = insertNode;
if (Objects.isNull(node)) {
throw new PipeException("InsertNode has been released");
}
return node.serializeToByteBuffer();
}

public String getDeviceId() {
if (Objects.isNull(insertNode)) {
final InsertNode node = insertNode;
if (Objects.isNull(node)) {
return null;
}
return Objects.nonNull(insertNode.getTargetPath())
? insertNode.getTargetPath().getFullPath()
: null;
final PartialPath targetPath = node.getTargetPath();
return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null;
}

public long getExtractTime() {
Expand Down Expand Up @@ -244,10 +248,14 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP
final boolean skipIfNoPrivileges,
final long startTime,
final long endTime) {
final InsertNode node = insertNode;
if (Objects.isNull(node)) {
throw new PipeException("InsertNode has been released");
}
return new PipeInsertNodeTabletInsertionEvent(
getRawIsTableModelEvent(),
getSourceDatabaseNameFromDataRegion(),
insertNode,
node,
pipeName,
creationTime,
pipeTaskMeta,
Expand All @@ -263,25 +271,34 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP

@Override
public boolean isGeneratedByPipe() {
return insertNode.isGeneratedByPipe();
final InsertNode node = insertNode;
if (Objects.isNull(node)) {
throw new PipeException("InsertNode has been released");
}
return node.isGeneratedByPipe();
}

@Override
public void throwIfNoPrivilege() {
if (skipIfNoPrivileges || !isTableModelEvent()) {
return;
}
if (Objects.nonNull(insertNode.getTargetPath())) {
checkTableName(
DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName());
} else if (insertNode instanceof InsertRowsNode) {
final InsertNode node = insertNode;
if (Objects.isNull(node)) {
// Event is released, skip privilege check
return;
}
final PartialPath targetPath = node.getTargetPath();
if (Objects.nonNull(targetPath)) {
checkTableName(DeviceIDFactory.getInstance().getDeviceID(targetPath).getTableName());
} else if (node instanceof InsertRowsNode) {
for (final String tableName :
((InsertRowsNode) insertNode)
((InsertRowsNode) node)
.getInsertRowNodeList().stream()
.map(
node ->
insertRowNode ->
DeviceIDFactory.getInstance()
.getDeviceID(node.getTargetPath())
.getDeviceID(insertRowNode.getTargetPath())
.getTableName())
.collect(Collectors.toSet())) {
checkTableName(tableName);
Expand Down Expand Up @@ -443,6 +460,9 @@ private List<TabletInsertionEventParser> initEventParsers() {

eventParsers = new ArrayList<>();
final InsertNode node = getInsertNode();
if (Objects.isNull(node)) {
throw new PipeException("InsertNode has been released");
}
switch (node.getType()) {
case INSERT_ROW:
case INSERT_TABLET:
Expand Down Expand Up @@ -526,23 +546,25 @@ public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {

@Override
public String toString() {
final InsertNode node = insertNode;
return String.format(
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, eventParsers=%s}",
progressIndex,
Objects.nonNull(insertNode) ? insertNode.isAligned() : null,
Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null,
Objects.nonNull(node) ? node.isAligned() : null,
Objects.nonNull(node) ? node.isGeneratedByPipe() : null,
eventParsers)
+ " - "
+ super.toString();
}

@Override
public String coreReportMessage() {
final InsertNode node = insertNode;
return String.format(
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s}",
progressIndex,
Objects.nonNull(insertNode) ? insertNode.isAligned() : null,
Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null)
Objects.nonNull(node) ? node.isAligned() : null,
Objects.nonNull(node) ? node.isGeneratedByPipe() : null)
+ " - "
+ super.coreReportMessage();
}
Expand All @@ -567,12 +589,15 @@ public PipeEventResource eventResourceBuilder() {
// invoked, the event will soon be released.
@Override
public long ramBytesUsed() {
return bytes > 0
? bytes
: (bytes =
INSTANCE_SIZE
+ (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0)
+ (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0));
if (bytes > 0) {
return bytes;
}
final InsertNode node = insertNode;
bytes =
INSTANCE_SIZE
+ (Objects.nonNull(node) ? InsertNodeMemoryEstimator.sizeOf(node) : 0)
+ (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
return bytes;
}

private static class PipeInsertNodeTabletInsertionEventResource extends PipeEventResource {
Expand Down
Loading