diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java new file mode 100644 index 0000000000000..9987ebe97322d --- /dev/null +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/param/IoTDBEncryptedPasswordPullConsumerIT.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.subscription.it.triple.regression.param; + +import org.apache.iotdb.commons.utils.AuthUtils; +import org.apache.iotdb.it.framework.IoTDBTestRunner; +import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMisc; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT; + +import org.apache.thrift.TException; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +@RunWith(IoTDBTestRunner.class) +@Category({MultiClusterIT2SubscriptionRegressionMisc.class}) +public class IoTDBEncryptedPasswordPullConsumerIT extends AbstractSubscriptionRegressionIT { + + private static final String DATABASE = "root.TestEncryptedPasswordPullConsumer"; + private static final String DEVICE = DATABASE + ".d_0"; + private static final String TOPIC_NAME = "TestEncryptedPasswordPullConsumerTopic"; + private static final String USERNAME = "encrypted_user"; + private static final String PASSWORD = "EncryptedUser@123"; + private static final String ENCRYPTED_PASSWORD = AuthUtils.encryptPassword(PASSWORD); + private static final String WRONG_ENCRYPTED_PASSWORD = + AuthUtils.encryptPassword("WrongEncryptedUser@123"); + + private static final List SCHEMA_LIST = new ArrayList<>(); + + static { + SCHEMA_LIST.add(new MeasurementSchema("s_0", TSDataType.INT64)); + SCHEMA_LIST.add(new MeasurementSchema("s_1", TSDataType.DOUBLE)); + } + + private SubscriptionPullConsumer consumer; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + createDB(DATABASE); + createTopic_s(TOPIC_NAME, "root.**", null, null, false); + session_src.createTimeseries( + DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4); + session_src.createTimeseries( + DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4); + session_dest.createTimeseries( + DEVICE + ".s_0", TSDataType.INT64, TSEncoding.GORILLA, CompressionType.LZ4); + session_dest.createTimeseries( + DEVICE + ".s_1", TSDataType.DOUBLE, TSEncoding.TS_2DIFF, CompressionType.LZ4); + session_src.executeNonQueryStatement("create user " + USERNAME + " '" + PASSWORD + "'"); + session_src.executeNonQueryStatement("grant read,write on root.** to user " + USERNAME); + Assert.assertTrue(subs.getTopic(TOPIC_NAME).isPresent()); + } + + @Override + @After + public void tearDown() throws Exception { + try { + if (consumer != null) { + consumer.close(); + } + } catch (final Exception ignored) { + } + try { + subs.dropTopic(TOPIC_NAME); + } catch (final Exception ignored) { + } + try { + session_src.executeNonQueryStatement("drop user " + USERNAME); + } catch (final Exception ignored) { + } + dropDB(DATABASE); + super.tearDown(); + } + + @Test + public void testSubscribeWithEncryptedPassword() + throws TException, + IoTDBConnectionException, + IOException, + StatementExecutionException, + InterruptedException { + consumer = createConsumer("encrypted-password-group", ENCRYPTED_PASSWORD); + + consumer.open(); + consumer.subscribe(TOPIC_NAME); + Assert.assertEquals(1, subs.getSubscriptions().size()); + + insertData(1706659200000L); + consume_data(consumer, session_dest); + check_count( + 4, + "select count(s_0) from " + DEVICE + " where time >= 1706659200000", + "encrypted password consumption"); + } + + @Test + public void testSubscribeFailsWithWrongEncryptedPassword() + throws IoTDBConnectionException, StatementExecutionException { + consumer = createConsumer("wrong-encrypted-password-group", WRONG_ENCRYPTED_PASSWORD); + + try { + consumer.open(); + consumer.subscribe(TOPIC_NAME); + Assert.fail("subscribe should fail when encrypted password mismatches"); + } catch (final Exception ignored) { + Assert.assertTrue(subs.getSubscriptions().isEmpty()); + } + } + + private SubscriptionPullConsumer createConsumer( + final String consumerGroupId, final String encryptedPassword) { + return new SubscriptionPullConsumer.Builder() + .host(SRC_HOST) + .port(SRC_PORT) + .username(USERNAME) + .encryptedPassword(encryptedPassword) + .consumerId("consumer_" + consumerGroupId) + .consumerGroupId(consumerGroupId) + .buildPullConsumer(); + } + + private void insertData(long timestamp) + throws IoTDBConnectionException, StatementExecutionException { + final Tablet tablet = new Tablet(DEVICE, SCHEMA_LIST, 10); + for (int row = 0; row < 5; row++) { + final int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + tablet.addValue("s_0", rowIndex, row * 20L + row); + tablet.addValue("s_1", rowIndex, row + 2.45); + timestamp += row * 2000; + } + session_src.insertTablet(tablet); + } +} diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java index 5094ae7eea42e..97c35baf8f4a3 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConfig.java @@ -68,6 +68,18 @@ public String getConsumerGroupId() { return getString(ConsumerConstant.CONSUMER_GROUP_ID_KEY); } + public String getUsername() { + return getString(ConsumerConstant.USERNAME_KEY); + } + + public String getPassword() { + return getString(ConsumerConstant.PASSWORD_KEY); + } + + public String getEncryptedPassword() { + return getString(ConsumerConstant.ENCRYPTED_PASSWORD_KEY); + } + public long getHeartbeatIntervalMs() { return getLongOrDefault( ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java index 504893b80ed55..21b13e5d3f128 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java @@ -31,6 +31,7 @@ public class ConsumerConstant { public static final String USERNAME_KEY = "username"; public static final String PASSWORD_KEY = "password"; + public static final String ENCRYPTED_PASSWORD_KEY = "encrypted-password"; public static final String CONSUMER_ID_KEY = "consumer-id"; public static final String CONSUMER_GROUP_ID_KEY = "group-id"; diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java index 92c6b2c9a3e29..56f1c2e9b9b02 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java @@ -100,6 +100,7 @@ abstract class SubscriptionConsumer implements AutoCloseable { private final String username; private final String password; + private final String encryptedPassword; protected String consumerId; protected String consumerGroupId; @@ -177,6 +178,7 @@ protected SubscriptionConsumer(final Builder builder) { this.username = builder.username; this.password = builder.password; + this.encryptedPassword = builder.encryptedPassword; this.consumerId = builder.consumerId; this.consumerGroupId = builder.consumerGroupId; @@ -206,6 +208,7 @@ protected SubscriptionConsumer(final Builder builder, final Properties propertie (String) properties.getOrDefault( ConsumerConstant.PASSWORD_KEY, SessionConfig.DEFAULT_PASSWORD)) + .encryptedPassword((String) properties.get(ConsumerConstant.ENCRYPTED_PASSWORD_KEY)) .consumerId((String) properties.get(ConsumerConstant.CONSUMER_ID_KEY)) .consumerGroupId((String) properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY)) .heartbeatIntervalMs( @@ -386,6 +389,7 @@ SubscriptionProvider constructProviderAndHandshake(final TEndPoint endPoint) endPoint, this.username, this.password, + this.encryptedPassword, this.consumerId, this.consumerGroupId, this.thriftMaxFrameSize, @@ -1401,6 +1405,7 @@ public abstract static class Builder { protected String username = SessionConfig.DEFAULT_USER; protected String password = SessionConfig.DEFAULT_PASSWORD; + protected String encryptedPassword; protected String consumerId; protected String consumerGroupId; @@ -1437,10 +1442,27 @@ public Builder username(final String username) { } public Builder password(final String password) { + if (!Objects.equals(password, SessionConfig.DEFAULT_PASSWORD) + && Objects.nonNull(this.encryptedPassword)) { + throw new IllegalStateException( + "password and encryptedPassword are mutually exclusive; encryptedPassword is already set"); + } this.password = password; return this; } + public Builder encryptedPassword(final String encryptedPassword) { + if (Objects.isNull(encryptedPassword)) { + return this; + } + if (!Objects.equals(this.password, SessionConfig.DEFAULT_PASSWORD)) { + throw new IllegalStateException( + "password and encryptedPassword are mutually exclusive; password is already set"); + } + this.encryptedPassword = encryptedPassword; + return this; + } + public Builder consumerId(@Nullable final String consumerId) { if (Objects.isNull(consumerId)) { return this; diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java index 9f153efeb106c..25d9c5da09e69 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java @@ -86,11 +86,15 @@ final class SubscriptionProvider extends SubscriptionSession { private final long heartbeatIntervalMs; private final int connectionTimeoutInMs; private int dataNodeId; + private final String username; + private final String password; + private final String encryptedPassword; SubscriptionProvider( final TEndPoint endPoint, final String username, final String password, + final String encryptedPassword, final String consumerId, final String consumerGroupId, final int thriftMaxFrameSize, @@ -101,6 +105,9 @@ final class SubscriptionProvider extends SubscriptionSession { this.endPoint = endPoint; this.consumerId = consumerId; this.consumerGroupId = consumerGroupId; + this.username = username; + this.password = password; + this.encryptedPassword = encryptedPassword; this.heartbeatIntervalMs = heartbeatIntervalMs; this.connectionTimeoutInMs = connectionTimeoutInMs; } @@ -149,6 +156,11 @@ synchronized void handshake() throws SubscriptionException, IoTDBConnectionExcep final Map consumerAttributes = new HashMap<>(); consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY, consumerGroupId); consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId); + consumerAttributes.put(ConsumerConstant.USERNAME_KEY, username); + consumerAttributes.put(ConsumerConstant.PASSWORD_KEY, password); + if (encryptedPassword != null) { + consumerAttributes.put(ConsumerConstant.ENCRYPTED_PASSWORD_KEY, encryptedPassword); + } consumerAttributes.put( ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY, String.valueOf(heartbeatIntervalMs)); consumerAttributes.put( diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java index a77716fe02cdf..227843811b15b 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java @@ -329,6 +329,12 @@ public Builder password(final String password) { return this; } + @Override + public Builder encryptedPassword(final String encryptedPassword) { + super.encryptedPassword(encryptedPassword); + return this; + } + @Override public Builder consumerId(final String consumerId) { super.consumerId(consumerId); diff --git a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java index 2a41327809025..b70633bef35a8 100644 --- a/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java +++ b/iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPushConsumer.java @@ -257,6 +257,12 @@ public Builder password(final String password) { return this; } + @Override + public Builder encryptedPassword(final String encryptedPassword) { + super.encryptedPassword(encryptedPassword); + return this; + } + @Override public Builder consumerId(final String consumerId) { super.consumerId(consumerId); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java index 9592692d1bc45..b4f31d84023fa 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/PermissionManager.java @@ -110,6 +110,15 @@ public TPermissionInfoResp login(String username, String password) { return authorInfo.login(username, password); } + public TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword) { + return authorInfo.login(username, password, useEncryptedPassword); + } + + public String login4Pipe(final String userName, final String password) { + return authorInfo.login4Pipe(userName, password); + } + public TPermissionInfoResp checkUserPrivileges( String username, List paths, int permission) { return authorInfo.checkUserPrivileges(username, paths, permission); diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java index e9e1058d79ba6..3bd17b5b5b76f 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/AuthorInfo.java @@ -113,6 +113,58 @@ public TPermissionInfoResp login(String username, String password) { return result; } + public TPermissionInfoResp login( + final String username, final String password, final boolean useEncryptedPassword) { + if (!useEncryptedPassword) { + return login(username, password); + } + + boolean status = false; + String loginMessage = null; + TSStatus tsStatus = new TSStatus(); + TPermissionInfoResp result = new TPermissionInfoResp(); + try { + final User user = authorizer.getUser(username); + status = user != null && password != null && password.equals(user.getPassword()); + if (status) { + result = getUserPermissionInfo(username); + result.setStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Login successfully")); + } else { + result = AuthUtils.generateEmptyPermissionInfoResp(); + } + } catch (AuthException e) { + LOGGER.error("meet error while logging in.", e); + loginMessage = e.getMessage(); + } + if (!status) { + tsStatus.setMessage(loginMessage != null ? loginMessage : "Authentication failed."); + tsStatus.setCode(TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode()); + result.setStatus(tsStatus); + } + return result; + } + + public String login4Pipe(final String username, final String password) { + try { + final User user = authorizer.getUser(username); + if (user == null) { + return null; + } + if (password == null) { + return user.getPassword(); + } + final TPermissionInfoResp loginResp = login(username, password); + if (loginResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode() + && loginResp.isSetUserInfo()) { + return loginResp.getUserInfo().getPassword(); + } + return null; + } catch (final AuthException e) { + LOGGER.error("meet error while logging in for pipe.", e); + return null; + } + } + // if All paths fail, return No permission // if some paths fail, return SUCCESS and failed index list // if all path success, return success and empty index list diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java index 26e9eb9bba145..ad18cc95bb050 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java @@ -25,11 +25,13 @@ import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex; import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeType; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.commons.schema.SchemaConstant; import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import org.apache.iotdb.confignode.consensus.request.write.pipe.task.CreatePipePlanV2; @@ -41,8 +43,10 @@ import org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation; import org.apache.iotdb.confignode.procedure.store.ProcedureType; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; import org.apache.iotdb.consensus.exception.ConsensusException; import org.apache.iotdb.pipe.api.PipePlugin; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.iotdb.rpc.TSStatusCode; @@ -53,6 +57,8 @@ import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -136,9 +142,70 @@ public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws PipeEx createPipeRequest.getProcessorAttributes(), createPipeRequest.getConnectorAttributes()); + checkAndEnrichSourceAuthentication(env, createPipeRequest.getExtractorAttributes()); + return pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest); } + public static void checkAndEnrichSourceAuthentication( + final ConfigNodeProcedureEnv env, final Map sourceAttributes) { + if (Objects.isNull(sourceAttributes)) { + return; + } + final PipeParameters sourceParameters = new PipeParameters(sourceAttributes); + + final String pluginName = + sourceParameters + .getStringOrDefault( + Arrays.asList(PipeSourceConstant.EXTRACTOR_KEY, PipeSourceConstant.SOURCE_KEY), + BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + .toLowerCase(); + + if (!pluginName.equals(BuiltinPipePlugin.IOTDB_EXTRACTOR.getPipePluginName()) + && !pluginName.equals(BuiltinPipePlugin.IOTDB_SOURCE.getPipePluginName())) { + return; + } + + if (sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY) + || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USER_KEY) + || sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY) + || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY) + || sourceParameters.hasAttribute(PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY) + || sourceParameters.hasAttribute(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)) { + final String username = + sourceParameters.getStringByKeys( + PipeSourceConstant.EXTRACTOR_IOTDB_USER_KEY, + PipeSourceConstant.SOURCE_IOTDB_USER_KEY, + PipeSourceConstant.EXTRACTOR_IOTDB_USERNAME_KEY, + PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY); + final String password = + sourceParameters.getStringByKeys( + PipeSourceConstant.EXTRACTOR_IOTDB_PASSWORD_KEY, + PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY); + String hashedPassword = null; + if (Objects.nonNull(password)) { + final TPermissionInfoResp loginResp = + env.getConfigManager().getPermissionManager().login(username, password, true); + if (Objects.nonNull(loginResp) + && Objects.nonNull(loginResp.getStatus()) + && loginResp.getStatus().getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) { + hashedPassword = password; + } + } + if (Objects.isNull(hashedPassword)) { + hashedPassword = + env.getConfigManager().getPermissionManager().login4Pipe(username, password); + } + if (Objects.isNull(hashedPassword)) { + throw new PipeException("Authentication failed."); + } + sourceParameters.addOrReplaceEquivalentAttributes( + new PipeParameters( + Collections.singletonMap( + PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, hashedPassword))); + } + } + @Override public void executeFromCalculateInfoForTask(ConfigNodeProcedureEnv env) { LOGGER.info( diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java index 4a48ebdd35d6a..80e4e5115541a 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java @@ -22,6 +22,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta; +import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta; import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan; @@ -89,7 +90,11 @@ protected boolean executeFromValidate(final ConfigNodeProcedureEnv env) subscriptionInfo.get().validateBeforeSubscribe(subscribeReq); // Construct AlterConsumerGroupProcedure + final String consumerId = subscribeReq.getConsumerId(); final String consumerGroupId = subscribeReq.getConsumerGroupId(); + final ConsumerGroupMeta consumerGroupMeta = + subscriptionInfo.get().getConsumerGroupMeta(consumerGroupId); + final ConsumerMeta consumerMeta = consumerGroupMeta.getConsumerMeta(consumerId); final ConsumerGroupMeta updatedConsumerGroupMeta = subscriptionInfo.get().deepCopyConsumerGroupMeta(consumerGroupId); updatedConsumerGroupMeta.addSubscription( @@ -110,7 +115,9 @@ protected boolean executeFromValidate(final ConfigNodeProcedureEnv env) new CreatePipeProcedureV2( new TCreatePipeReq() .setPipeName(pipeName) - .setExtractorAttributes(topicMeta.generateExtractorAttributes()) + .setExtractorAttributes( + topicMeta.generateExtractorAttributes( + consumerMeta.getUsername(), consumerMeta.getSubscriptionAuthPassword())) .setProcessorAttributes(topicMeta.generateProcessorAttributes()) .setConnectorAttributes(topicMeta.generateConnectorAttributes(consumerGroupId)), pipeTaskInfo)); diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java index 6c095de98529a..ac6ca8c740c36 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2Test.java @@ -19,11 +19,19 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task; +import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; +import org.apache.iotdb.confignode.manager.ConfigManager; +import org.apache.iotdb.confignode.manager.PermissionManager; +import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.store.ProcedureFactory; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp; +import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.utils.PublicBAOS; import org.junit.Test; +import org.mockito.Mockito; import java.io.DataOutputStream; import java.nio.ByteBuffer; @@ -68,4 +76,57 @@ public void serializeDeserializeTest() { fail(); } } + + @Test + public void testCheckAndEnrichSourceAuthenticationWithEncryptedPassword() { + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final PermissionManager permissionManager = Mockito.mock(PermissionManager.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager); + + final TPermissionInfoResp loginResp = new TPermissionInfoResp(); + loginResp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); + Mockito.when(permissionManager.login("user", "encrypted-password", true)).thenReturn(loginResp); + + final Map sourceAttributes = new HashMap<>(); + sourceAttributes.put("extractor", "iotdb-source"); + sourceAttributes.put("username", "user"); + sourceAttributes.put("password", "encrypted-password"); + + CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env, sourceAttributes); + + assertEquals( + "encrypted-password", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + Mockito.verify(permissionManager).login("user", "encrypted-password", true); + Mockito.verify(permissionManager, Mockito.never()) + .login4Pipe(Mockito.anyString(), Mockito.any()); + } + + @Test + public void testCheckAndEnrichSourceAuthenticationFallsBackToRawPassword() { + final ConfigNodeProcedureEnv env = Mockito.mock(ConfigNodeProcedureEnv.class); + final ConfigManager configManager = Mockito.mock(ConfigManager.class); + final PermissionManager permissionManager = Mockito.mock(PermissionManager.class); + Mockito.when(env.getConfigManager()).thenReturn(configManager); + Mockito.when(configManager.getPermissionManager()).thenReturn(permissionManager); + + final TPermissionInfoResp loginResp = new TPermissionInfoResp(); + loginResp.setStatus(new TSStatus(TSStatusCode.WRONG_LOGIN_PASSWORD.getStatusCode())); + Mockito.when(permissionManager.login("user", "raw-password", true)).thenReturn(loginResp); + Mockito.when(permissionManager.login4Pipe("user", "raw-password")) + .thenReturn("hashed-password"); + + final Map sourceAttributes = new HashMap<>(); + sourceAttributes.put(PipeSourceConstant.SOURCE_KEY, "iotdb-source"); + sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_USERNAME_KEY, "user"); + sourceAttributes.put(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY, "raw-password"); + + CreatePipeProcedureV2.checkAndEnrichSourceAuthentication(env, sourceAttributes); + + assertEquals( + "hashed-password", sourceAttributes.get(PipeSourceConstant.SOURCE_IOTDB_PASSWORD_KEY)); + Mockito.verify(permissionManager).login("user", "raw-password", true); + Mockito.verify(permissionManager).login4Pipe("user", "raw-password"); + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java index e9adf64bc03d1..aba780fb0d1af 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/PipeSourceConstant.java @@ -24,6 +24,13 @@ public class PipeSourceConstant { public static final String EXTRACTOR_KEY = "extractor"; public static final String SOURCE_KEY = "source"; + public static final String EXTRACTOR_IOTDB_USER_KEY = "extractor.user"; + public static final String SOURCE_IOTDB_USER_KEY = "source.user"; + public static final String EXTRACTOR_IOTDB_USERNAME_KEY = "extractor.username"; + public static final String SOURCE_IOTDB_USERNAME_KEY = "source.username"; + public static final String EXTRACTOR_IOTDB_PASSWORD_KEY = "extractor.password"; + public static final String SOURCE_IOTDB_PASSWORD_KEY = "source.password"; + public static final String EXTRACTOR_INCLUSION_KEY = "extractor.inclusion"; public static final String SOURCE_INCLUSION_KEY = "source.inclusion"; public static final String EXTRACTOR_INCLUSION_DEFAULT_VALUE = "data.insert"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java index b316c5f155de6..498f3427690f5 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java @@ -136,6 +136,10 @@ public boolean containsConsumer(final String consumerId) { return consumerIdToConsumerMeta.containsKey(consumerId); } + public ConsumerMeta getConsumerMeta(final String consumerId) { + return consumerIdToConsumerMeta.get(consumerId); + } + public boolean isEmpty() { // When there are no consumers in a consumer group, it means that the ConsumerGroupMeta is // empty, and at this time, the topicNameToSubscribedConsumerIdSet is also empty. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java index f1bb9b4608531..75cca9ccbcdd9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerMeta.java @@ -54,6 +54,26 @@ public String getConsumerId() { return consumerId; } + public String getConsumerGroupId() { + return config.getConsumerGroupId(); + } + + public String getUsername() { + return config.getUsername(); + } + + public String getPassword() { + return config.getPassword(); + } + + public String getEncryptedPassword() { + return config.getEncryptedPassword(); + } + + public String getSubscriptionAuthPassword() { + return Objects.nonNull(getEncryptedPassword()) ? getEncryptedPassword() : getPassword(); + } + public ByteBuffer serialize() throws IOException { PublicBAOS byteArrayOutputStream = new PublicBAOS(); DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java index 93cd87e4ae386..ee9177112e710 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java @@ -181,11 +181,22 @@ public static TopicMeta deserialize(final ByteBuffer byteBuffer) { /////////////////////////////// utilities /////////////////////////////// public Map generateExtractorAttributes() { + return generateExtractorAttributes(null, null); + } + + public Map generateExtractorAttributes( + final String username, final String password) { final Map extractorAttributes = new HashMap<>(); // disable meta sync extractorAttributes.put("source", "iotdb-source"); extractorAttributes.put("inclusion", "data.insert"); extractorAttributes.put("inclusion.exclusion", "data.delete"); + if (Objects.nonNull(username)) { + extractorAttributes.put("username", username); + } + if (Objects.nonNull(password)) { + extractorAttributes.put("password", password); + } // path extractorAttributes.putAll(config.getAttributesWithPathOrPattern()); // time diff --git a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java index 4973f06d27cf2..d9c280e14938c 100644 --- a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java +++ b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/topic/TopicDeSerTest.java @@ -54,4 +54,15 @@ public void test() throws IOException { Assert.assertEquals( topicMeta.getSubscribedConsumerGroupIds(), topicMeta2.getSubscribedConsumerGroupIds()); } + + @Test + public void testGenerateExtractorAttributesWithEncryptedPassword() { + final TopicMeta topicMeta = new TopicMeta("test_topic", 1, new HashMap<>()); + + final Map extractorAttributes = + topicMeta.generateExtractorAttributes("test_user", "encrypted-password"); + + Assert.assertEquals("test_user", extractorAttributes.get("username")); + Assert.assertEquals("encrypted-password", extractorAttributes.get("password")); + } }