Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,31 @@

package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;

import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class PipeTransferHandshakeReq extends TPipeTransferReq {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferHandshakeReq.class);

private transient String timestampPrecision;
private transient String clusterId;

private PipeTransferHandshakeReq() {
// Empty constructor
Expand All @@ -42,19 +53,28 @@ public String getTimestampPrecision() {
return timestampPrecision;
}

public String getClusterId() {
return clusterId;
}

/////////////////////////////// Thrift ///////////////////////////////

public static PipeTransferHandshakeReq toTPipeTransferReq(String timestampPrecision)
throws IOException {
throws IOException, ClientManagerException, TException {
final PipeTransferHandshakeReq handshakeReq = new PipeTransferHandshakeReq();

handshakeReq.timestampPrecision = timestampPrecision;

handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();

try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID);
final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
handshakeReq.clusterId = configNodeClient.getClusterId().clusterId;
ReadWriteIOUtils.write(timestampPrecision, outputStream);
ReadWriteIOUtils.write(handshakeReq.clusterId, outputStream);
handshakeReq.body =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}
Expand All @@ -67,6 +87,15 @@ public static PipeTransferHandshakeReq fromTPipeTransferReq(TPipeTransferReq tra

handshakeReq.timestampPrecision = ReadWriteIOUtils.readString(transferReq.body);

// It is possible to catch BufferUnderflowException if the older version send handshake request
// to newer version.
try {
handshakeReq.clusterId = ReadWriteIOUtils.readString(transferReq.body);
} catch (BufferUnderflowException e) {
handshakeReq.clusterId = null;
LOGGER.warn("Unable to get clusterId from handshake request.");
}

handshakeReq.version = transferReq.version;
handshakeReq.type = transferReq.type;
handshakeReq.body = transferReq.body;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.apache.thrift.TException;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;


public class PipeTransferHandshakeReqV2 extends TPipeTransferReq {
private transient Map<String, String> params;

private PipeTransferHandshakeReqV2() {
// Empty constructor
}

public Map<String, String> getParams() {
return params;
}

/////////////////////////////// Thrift ///////////////////////////////

public static PipeTransferHandshakeReqV2 toTPipeTransferReq(HashMap<String, String> params) throws TException {
final PipeTransferHandshakeReqV2 handshakeReq = new PipeTransferHandshakeReqV2();

handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
handshakeReq.body = ByteBuffer.wrap(SerializationUtils.serialize(params));

return handshakeReq;
}

public static PipeTransferHandshakeReqV2 fromTPipeTransferReq(TPipeTransferReq transferReq) {
final PipeTransferHandshakeReqV2 handshakeReq = new PipeTransferHandshakeReqV2();

handshakeReq.params = SerializationUtils.deserialize(transferReq.body.array());

handshakeReq.version = transferReq.version;
handshakeReq.type = transferReq.version;
handshakeReq.body = transferReq.body;

return handshakeReq;
}

/////////////////////////////// Air Gap ///////////////////////////////

public static byte[] toTransferHandshakeBytes(HashMap<String, String> params) throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE.getType(), outputStream);
ReadWriteIOUtils.write(params, outputStream);
return byteArrayOutputStream.toByteArray();
}
}

/////////////////////////////// Object ///////////////////////////////

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
PipeTransferHandshakeReqV2 that = (PipeTransferHandshakeReqV2) obj;
return params.equals(that.params)
&& version == that.version
&& type == that.type
&& body.equals(that.body);
}

@Override
public int hashCode() {
return Objects.hash(params, version, type, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;

import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
Expand Down Expand Up @@ -163,6 +164,8 @@ private void reconstructClient(TEndPoint endPoint) throws IOException {
endPoint.getPort(),
e.getMessage(),
e);
} catch (ClientManagerException e) {
LOGGER.warn("Unable to get configNode client, because: {}.", e.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.receiver.thrift;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
Expand All @@ -41,6 +42,9 @@
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
Expand All @@ -56,12 +60,14 @@
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.iotdb.tsfile.utils.Pair;

import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -171,6 +177,25 @@ public synchronized TPipeTransferResp receive(
}

private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq req) {
try (ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final String clusterId = configNodeClient.getClusterId().clusterId;
if (clusterId.equals(req.getClusterId())) {
final TSStatus status =
RpcUtils.getStatus(
TSStatusCode.PIPE_HANDSHAKE_ERROR,
String.format("Unable to transfer data to IoTDB cluster %s itself", clusterId));
LOGGER.warn("Handshake failed, response status = {}.", status);
return new TPipeTransferResp(status);
}
} catch (ClientManagerException e) {
LOGGER.error("Unable to get ConfigNode client in handleTransferHandshake!");
throw new PipeException(e.getMessage());
} catch (TException e) {
LOGGER.error("Unable to get clusterId in handleTransferHandshake!");
throw new PipeException(e.getMessage());
}

if (!CommonDescriptor.getInstance()
.getConfig()
.getTimestampPrecision()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.connector;

import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotPieceReq;
import org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotSealReq;
Expand All @@ -44,6 +45,7 @@
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -59,7 +61,8 @@ public class PipeDataNodeThriftRequestTest {
private static final String TIME_PRECISION = "ms";

@Test
public void testPipeValidateHandshakeReq() throws IOException {
public void testPipeValidateHandshakeReq()
throws IOException, TException, ClientManagerException {
PipeTransferHandshakeReq req = PipeTransferHandshakeReq.toTPipeTransferReq(TIME_PRECISION);
PipeTransferHandshakeReq deserializeReq = PipeTransferHandshakeReq.fromTPipeTransferReq(req);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.pipe.connector;

import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
Expand All @@ -29,6 +30,7 @@
import org.apache.iotdb.tsfile.write.record.Tablet;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -55,7 +57,7 @@ public void testIoTDBThriftReceiverV1() {
true),
mock(IPartitionFetcher.class),
mock(ISchemaFetcher.class));
} catch (IOException e) {
} catch (IOException | TException | ClientManagerException e) {
Assert.fail();
}
}
Expand Down