Skip to content
Permalink
Browse files
Validating users before processing in orchestrator
  • Loading branch information
DImuthuUpe committed Nov 9, 2021
1 parent c665a66 commit 063c10bb74820a9f91832c29a7093d7a6c586359
Showing 8 changed files with 194 additions and 6 deletions.
@@ -22,4 +22,9 @@ consumer:
tenantConfigs:
tenantId: "{{ datalake_synch_tenant_id }}"
userGroup: "{{ datalake_data_orch_user_group }}"
adminGroup: "{{ datalake_data_orch_admin_group }}"
adminGroup: "{{ datalake_data_orch_admin_group }}"
custosConfigs:
serverHost: "{{ datalake_drms_custos_host }}"
serverPort: {{ datalake_drms_custos_port }}
clientId: "{{ datalake_drms_custos_client_id }}"
clientSec: "{{ datalake_drms_custos_client_secret }}"
@@ -20,4 +20,9 @@ consumer:
tenantConfigs:
tenantId: "custos-ii8g0cfwsz6ruwezykn9-10002640"
userGroup: "emc-users_e8a37f6d-e3b3-4e4a-9081-265bb42b1b99"
adminGroup: "admin-group_c1aac070-1512-4e98-b1d1-e06a94f03665"
adminGroup: "admin-group_c1aac070-1512-4e98-b1d1-e06a94f03665"
custosConfigs:
serverHost: "custos.scigap.org"
serverPort: 31499
clientId: "custos-ii8g0cfwsz6ruwezykn9-10002640"
clientSec: "secret"
@@ -22,6 +22,8 @@ public void setMessageFilter(FilterConfig messageFilter) {

public TenantConfigs tenantConfigs;

private CustosConfigs custosConfigs;

public Configuration() {

}
@@ -70,6 +72,14 @@ public void setTenantConfigs(TenantConfigs tenantConfigs) {
this.tenantConfigs = tenantConfigs;
}

public CustosConfigs getCustosConfigs() {
return custosConfigs;
}

public void setCustosConfigs(CustosConfigs custosConfigs) {
this.custosConfigs = custosConfigs;
}

public static class Consumer {

private String brokerURL;
@@ -275,5 +285,44 @@ public void setAdminGroup(String adminGroup) {
}
}

public static class CustosConfigs {

private String serverHost;
private int serverPort;
private String clientId;
private String clientSec;

public String getServerHost() {
return serverHost;
}

public void setServerHost(String serverHost) {
this.serverHost = serverHost;
}

public int getServerPort() {
return serverPort;
}

public void setServerPort(int serverPort) {
this.serverPort = serverPort;
}

public String getClientId() {
return clientId;
}

public void setClientId(String clientId) {
this.clientId = clientId;
}

public String getClientSec() {
return clientSec;
}

public void setClientSec(String clientSec) {
this.clientSec = clientSec;
}
}

}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.datalake.orchestrator.connectors;

import org.apache.airavata.datalake.orchestrator.Configuration;
import org.apache.airavata.datalake.orchestrator.core.connector.AbstractConnector;
import org.apache.custos.clients.CustosClientProvider;
import org.apache.custos.iam.service.FindUsersResponse;
import org.apache.custos.iam.service.UserRepresentation;
import org.apache.custos.user.management.client.UserManagementClient;

import java.util.Optional;

public class CustosConnector implements AbstractConnector<Configuration> {

private UserManagementClient umClient = null;

public CustosConnector(Configuration configuration) throws Exception {
this.init(configuration);
}

@Override
public void init(Configuration configuration) throws Exception {
CustosClientProvider clientProvider = new CustosClientProvider.Builder()
.setServerHost(configuration.getCustosConfigs().getServerHost())
.setServerPort(configuration.getCustosConfigs().getServerPort())
.setClientId(configuration.getCustosConfigs().getClientId())
.setClientSec(configuration.getCustosConfigs().getClientSec()).build();

this.umClient = clientProvider.getUserManagementClient();
}

@Override
public void close() throws Exception {
if (isOpen()) {
umClient.close();
}
}

public Optional<UserRepresentation> findUserByUserName(String userName) {
FindUsersResponse userResp = umClient.findUser(userName, "", "", "", 0, 1);
if (userResp.getUsersCount() == 0) {
return Optional.empty();
} else {
return Optional.of(userResp.getUsers(0));
}
}

public Optional<UserRepresentation> findUserByEmail(String email) {
FindUsersResponse userResp = umClient.findUser("", "", "", email, 0, 1);
if (userResp.getUsersCount() == 0) {
return Optional.empty();
} else {
return Optional.of(userResp.getUsers(0));
}
}

@Override
public boolean isOpen() {
return umClient != null;
}
}
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.airavata.datalake.orchestrator.connectors;

import com.google.protobuf.Struct;
@@ -1,5 +1,22 @@
package org.apache.airavata.datalake.orchestrator.connectors;

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.airavata.datalake.orchestrator.Configuration;
@@ -25,6 +25,7 @@
import org.apache.airavata.datalake.drms.storage.TransferMapping;
import org.apache.airavata.datalake.orchestrator.Configuration;
import org.apache.airavata.datalake.orchestrator.Utils;
import org.apache.airavata.datalake.orchestrator.connectors.CustosConnector;
import org.apache.airavata.datalake.orchestrator.connectors.DRMSConnector;
import org.apache.airavata.datalake.orchestrator.connectors.WorkflowServiceConnector;
import org.apache.airavata.dataorchestrator.clients.core.NotificationClient;
@@ -35,6 +36,7 @@
import org.apache.airavata.mft.api.service.MFTApiServiceGrpc;
import org.apache.airavata.mft.common.AuthToken;
import org.apache.airavata.mft.common.DelegateAuth;
import org.apache.custos.iam.service.UserRepresentation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -51,6 +53,7 @@ public class OrchestratorEventProcessor implements Runnable {
private final WorkflowServiceConnector workflowServiceConnector;
private final Set<String> eventCache;
private final NotificationClient notificationClient;
private final CustosConnector custosConnector;

public OrchestratorEventProcessor(Configuration configuration, Notification notificationEvent,
Set<String> eventCache, NotificationClient notificationClient) throws Exception {
@@ -60,6 +63,7 @@ public OrchestratorEventProcessor(Configuration configuration, Notification noti
this.workflowServiceConnector = new WorkflowServiceConnector(configuration);
this.configuration = configuration;
this.notificationClient = notificationClient;
this.custosConnector = new CustosConnector(configuration);
}

private List<GenericResource> createResourceWithParentDirectories(String hostName, String storageId, String basePath,
@@ -156,6 +160,20 @@ private void shareResourcesWithGroups(List<GenericResource> resourceList, String
}
}

private String verifyUser(String userName) throws Exception {
if (custosConnector.findUserByUserName(userName).isEmpty()) {
Optional<UserRepresentation> userByEmail = custosConnector.findUserByEmail(userName);
if (userByEmail.isPresent()) {
return userByEmail.get().getUsername();
} else {
logger.error("No user {} by email or user name", userName);
throw new Exception("Could not find the user " + userName);
}
} else {
return userName;
}
}

@Override
public void run() {
logger.info("Processing resource path {} on storage {}", notification.getResourcePath(),
@@ -188,8 +206,8 @@ public void run() {
throw new Exception("Invalid path. Need at least two folder levels from base");
}

String adminUser = splitted[0];
String owner = splitted[1].split("_")[0];
String adminUser = verifyUser(splitted[0]);
String owner = verifyUser(splitted[1].split("_")[0]);

Map<String, String> ownerRules = new HashMap<>();
ownerRules.put(adminUser, "VIEWER");
@@ -59,8 +59,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<source>11</source>
<target>11</target>
<fork>true</fork>
</configuration>
</plugin>

0 comments on commit 063c10b

Please sign in to comment.