Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ public IoTDBConfigNodeSyncClientManager(
boolean shouldReceiverConvertOnTypeMismatch,
String loadTsFileStrategy,
boolean validateTsFile,
boolean shouldMarkAsPipeRequest) {
boolean shouldMarkAsPipeRequest,
boolean shouldSendToAllClients) {
super(
endPoints,
useSSL,
Expand All @@ -59,7 +60,8 @@ public IoTDBConfigNodeSyncClientManager(
shouldReceiverConvertOnTypeMismatch,
loadTsFileStrategy,
validateTsFile,
shouldMarkAsPipeRequest);
shouldMarkAsPipeRequest,
shouldSendToAllClients);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@

import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;

@TreeModel
Expand Down Expand Up @@ -121,22 +123,17 @@ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exc

@Override
public void transfer(final Event event) throws Exception {
final int socketIndex = nextSocketIndex();
final AirGapSocket socket = sockets.get(socketIndex);

try {
if (event instanceof PipeConfigRegionWritePlanEvent) {
doTransferWrapper(socket, (PipeConfigRegionWritePlanEvent) event);
doTransferWrapper((PipeConfigRegionWritePlanEvent) event);
} else if (event instanceof PipeConfigRegionSnapshotEvent) {
doTransferWrapper(socket, (PipeConfigRegionSnapshotEvent) event);
doTransferWrapper((PipeConfigRegionSnapshotEvent) event);
} else if (!(event instanceof PipeHeartbeatEvent)) {
LOGGER.warn(
"IoTDBConfigRegionAirGapConnector does not support transferring generic event: {}.",
event);
}
} catch (final IOException e) {
isSocketAlive.set(socketIndex, false);

throw new PipeConnectionException(
String.format(
"Network error when transfer event %s, because %s.",
Expand All @@ -146,7 +143,6 @@ public void transfer(final Event event) throws Exception {
}

private void doTransferWrapper(
final AirGapSocket socket,
final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
// We increase the reference count for this event to determine if the event may be released.
Expand All @@ -155,72 +151,78 @@ private void doTransferWrapper(
return;
}
try {
doTransfer(socket, pipeConfigRegionWritePlanEvent);
doTransfer(pipeConfigRegionWritePlanEvent);
} finally {
pipeConfigRegionWritePlanEvent.decreaseReferenceCount(
IoTDBConfigRegionAirGapConnector.class.getName(), false);
}
}

private void doTransfer(
final AirGapSocket socket,
final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
private void doTransfer(final PipeConfigRegionWritePlanEvent pipeConfigRegionWritePlanEvent)
throws PipeException, IOException {
if (!send(
pipeConfigRegionWritePlanEvent.getPipeName(),
pipeConfigRegionWritePlanEvent.getCreationTime(),
socket,
final List<Integer> socketIndexes =
shouldSendToAllClients
? allAliveSocketsIndex()
: Collections.singletonList(nextSocketIndex());
final byte[] bytes =
PipeTransferConfigPlanReq.toTPipeTransferBytes(
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan()))) {
final String errorMessage =
String.format(
"Transfer config region write plan %s error. Socket: %s.",
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), socket);
// Send handshake because we don't know whether the receiver side configNode
// has set up a new one
sendHandshakeReq(socket);
receiverStatusHandler.handle(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeConfigRegionWritePlanEvent.toString());
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan());
for (final int socketIndex : socketIndexes) {
final AirGapSocket socket = sockets.get(socketIndex);
try {
if (!send(
pipeConfigRegionWritePlanEvent.getPipeName(),
pipeConfigRegionWritePlanEvent.getCreationTime(),
socket,
bytes)) {
final String errorMessage =
String.format(
"Transfer config region write plan %s error. Socket: %s.",
pipeConfigRegionWritePlanEvent.getConfigPhysicalPlan().getType(), socket);
// Send handshake because we don't know whether the receiver side configNode
// has set up a new one
sendHandshakeReq(socket);
receiverStatusHandler.handle(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeConfigRegionWritePlanEvent.toString());
}
} catch (final IOException e) {
isSocketAlive.set(socketIndex, false);
throw e;
}
}
}

private void doTransferWrapper(
final AirGapSocket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
private void doTransferWrapper(final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeConfigRegionSnapshotEvent.increaseReferenceCount(
IoTDBConfigRegionAirGapConnector.class.getName())) {
return;
}
try {
doTransfer(socket, pipeConfigRegionSnapshotEvent);
doTransfer(pipeConfigRegionSnapshotEvent);
} finally {
pipeConfigRegionSnapshotEvent.decreaseReferenceCount(
IoTDBConfigRegionAirGapConnector.class.getName(), false);
}
}

private void doTransfer(
final AirGapSocket socket, final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
private void doTransfer(final PipeConfigRegionSnapshotEvent pipeConfigRegionSnapshotEvent)
throws PipeException, IOException {
final String pipeName = pipeConfigRegionSnapshotEvent.getPipeName();
final long creationTime = pipeConfigRegionSnapshotEvent.getCreationTime();
final File snapshot = pipeConfigRegionSnapshotEvent.getSnapshotFile();
final File templateFile = pipeConfigRegionSnapshotEvent.getTemplateFile();

// 1. Transfer snapshotFile, and template file if exists
transferFilePieces(pipeName, creationTime, snapshot, socket, true);
if (Objects.nonNull(templateFile)) {
transferFilePieces(pipeName, creationTime, templateFile, socket, true);
}
// 2. Transfer file seal signal, which means the snapshots are transferred completely
if (!send(
pipeName,
creationTime,
socket,
final List<Integer> socketIndexes =
shouldSendToAllClients
? allAliveSocketsIndex()
: Collections.singletonList(nextSocketIndex());

final byte[] bytes =
PipeTransferConfigSnapshotSealReq.toTPipeTransferBytes(
// The pattern is surely Non-null
pipeConfigRegionSnapshotEvent.getTreePatternString(),
Expand All @@ -233,19 +235,35 @@ private void doTransfer(
Objects.nonNull(templateFile) ? templateFile.getName() : null,
Objects.nonNull(templateFile) ? templateFile.length() : 0,
pipeConfigRegionSnapshotEvent.getFileType(),
pipeConfigRegionSnapshotEvent.toSealTypeString()))) {
final String errorMessage =
String.format("Seal config region snapshot %s error. Socket %s.", snapshot, socket);
// Send handshake because we don't know whether the receiver side configNode
// has set up a new one
sendHandshakeReq(socket);
receiverStatusHandler.handle(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeConfigRegionSnapshotEvent.toString());
} else {
LOGGER.info("Successfully transferred config region snapshot {}.", snapshot);
pipeConfigRegionSnapshotEvent.toSealTypeString());

// 1. Transfer snapshotFile, and template file if exists
transferFilePieces(pipeName, creationTime, snapshot, socketIndexes, true);
if (Objects.nonNull(templateFile)) {
transferFilePieces(pipeName, creationTime, templateFile, socketIndexes, true);
}
for (final int socketIndex : socketIndexes) {
final AirGapSocket socket = sockets.get(socketIndex);
try {
// 2. Transfer file seal signal, which means the snapshots are transferred completely
if (!send(pipeName, creationTime, socket, bytes)) {
final String errorMessage =
String.format("Seal config region snapshot %s error. Socket %s.", snapshot, socket);
// Send handshake because we don't know whether the receiver side configNode
// has set up a new one
sendHandshakeReq(socket);
receiverStatusHandler.handle(
new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
.setMessage(errorMessage),
errorMessage,
pipeConfigRegionSnapshotEvent.toString());
} else {
LOGGER.info("Successfully transferred config region snapshot {}.", snapshot);
}
} catch (final IOException e) {
isSocketAlive.set(socketIndex, false);
throw e;
}
}
}
}
Loading
Loading