Skip to content

Commit

Permalink
[ZEPPELIN-1690] - ZeppelinHubNotebookRepo multy user handling
Browse files Browse the repository at this point in the history
### What is this PR for?
 This PR bring multi user handling to ZeppelinHubNotebookRepo.

### What type of PR is it?
[Improvement ]

### What is the Jira issue?
 * [ZEPPELIN-1690](https://issues.apache.org/jira/browse/ZEPPELIN-1690)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Anthony Corbacho <corbacho.anthony@gmail.com>

Closes #1635 from anthonycorbacho/feat/ZeppelinHubRepoMultiUser and squashes the following commits:

d9e0036 [Anthony Corbacho] Move code location
d8989aa [Anthony Corbacho] Handle invalid subject
aef6e00 [Anthony Corbacho] Fix check style
edb9e8c [Anthony Corbacho] Desactivate ws :: we will need to refactor this part
e884203 [Anthony Corbacho] Refactor :: remove 'async' on every http call
dbb8ebd [Anthony Corbacho] Fix test
25f6215 [Anthony Corbacho] pass user token to zeppelinhub rest api handler
674fb93 [Anthony Corbacho] Refactor ZeppelinHub rest API handler  - Now takes a token on every http request
3fbfcfa [Anthony Corbacho] Add new login on how user can get his token at runtime
a8aeb51 [Anthony Corbacho] add comment in zeppelinhubRealm about saving user session in a singleton map
5931ab6 [Anthony Corbacho] Fix check style
67051a0 [Anthony Corbacho] Add ZeppelinHub instance model
e3e5a15 [Anthony Corbacho] Add userTiket in AuthenticationInfo on OnMessage method in notebookServer
7a0c959 [Anthony Corbacho] Add zeppelinhub user session to userSession container after login throght zeppelinhubRealm
0729f51 [Anthony Corbacho] Add zeppelinhub session container
  • Loading branch information
anthonycorbacho committed Nov 24, 2016
1 parent 3389e8c commit 33e2dab
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.shiro.authz.AuthorizationInfo;
import org.apache.shiro.realm.AuthorizingRealm;
import org.apache.shiro.subject.PrincipalCollection;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
import org.apache.zeppelin.server.ZeppelinServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +57,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
private static final String USER_LOGIN_API_ENDPOINT = "api/v1/users/login";
private static final String JSON_CONTENT_TYPE = "application/json";
private static final String UTF_8_ENCODING = "UTF-8";
private static final String USER_SESSION_HEADER = "X-session";
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();

private final HttpClient httpClient;
Expand Down Expand Up @@ -126,6 +128,7 @@ public void setZeppelinhubUrl(String url) {
protected User authenticateUser(String requestBody) {
PutMethod put = new PutMethod(Joiner.on("/").join(zeppelinhubUrl, USER_LOGIN_API_ENDPOINT));
String responseBody = StringUtils.EMPTY;
String userSession = StringUtils.EMPTY;
try {
put.setRequestEntity(new StringRequestEntity(requestBody, JSON_CONTENT_TYPE, UTF_8_ENCODING));
int statusCode = httpClient.executeMethod(put);
Expand All @@ -136,6 +139,7 @@ protected User authenticateUser(String requestBody) {
+ "Login or password incorrect");
}
responseBody = put.getResponseBodyAsString();
userSession = put.getResponseHeader(USER_SESSION_HEADER).getValue();
put.releaseConnection();

} catch (IOException e) {
Expand All @@ -150,13 +154,16 @@ protected User authenticateUser(String requestBody) {
LOG.error("Cannot deserialize ZeppelinHub response to User instance", e);
throw new AuthenticationException("Cannot login to ZeppelinHub");
}


// Add ZeppelinHub user_session token this singleton map, this will help ZeppelinHubRepo
// to get specific information about the current user.
UserSessionContainer.instance.setSession(account.login, userSession);

/* TODO(khalid): add proper roles and add listener */
HashSet<String> userAndRoles = new HashSet<String>();
userAndRoles.add(account.login);
ZeppelinServer.notebookWsServer.broadcastReloadedNoteList(
new org.apache.zeppelin.user.AuthenticationInfo(account.login), userAndRoles);

return account;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ public void onMessage(NotebookSocket conn, String msg) {
if (StringUtils.isEmpty(conn.getUser())) {
addUserConnection(messagereceived.principal, conn);
}
AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal);
AuthenticationInfo subject =
new AuthenticationInfo(messagereceived.principal, messagereceived.ticket);

/** Lets be elegant here */
switch (messagereceived.op) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepoSettingsInfo;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,20 +54,27 @@ public class ZeppelinHubRepo implements NotebookRepo {
public static final String TOKEN_HEADER = "X-Zeppelin-Token";
private static final Gson GSON = new Gson();
private static final Note EMPTY_NOTE = new Note();
private final Client websocketClient;
//private final Client websocketClient;

private String token;
private ZeppelinhubRestApiHandler restApiClient;

private final ZeppelinConfiguration conf;

// In order to avoid too many call to ZeppelinHub backend, we save a map of user -> session.
private ConcurrentMap<String, String> usersToken = new ConcurrentHashMap<String, String>();

public ZeppelinHubRepo(ZeppelinConfiguration conf) {
this.conf = conf;
String zeppelinHubUrl = getZeppelinHubUrl(conf);
LOG.info("Initializing ZeppelinHub integration module");
token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);

websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
getZeppelinhubWebsocketUri(conf), token, conf);
websocketClient.start();
// TODO(xxx): refactor this in the next itaration
//websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
// getZeppelinhubWebsocketUri(conf), token, conf);
//websocketClient.start();
}

private String getZeppelinHubWsUri(URI api) throws URISyntaxException {
Expand Down Expand Up @@ -144,10 +154,56 @@ String getZeppelinHubUrl(ZeppelinConfiguration conf) {
}
return zeppelinhubUrl;
}

/**
* Get Token directly from Zeppelinhub.
* This will avoid and remove the needs of setting up token in zeppelin-env.sh.
*/
private String getUserZeppelinInstanceToken(String ticket) throws IOException {
if (StringUtils.isBlank(ticket)) {
return "";
}

List<Instance> instances = restApiClient.getInstances(ticket);
// TODO(anthony): Implement NotebookRepo Setting to let user switch token at runtime.

token = instances.isEmpty() ? StringUtils.EMPTY : instances.get(0).token;
return token;
}

/**
* For a given user logged in is zeppelin (via zeppelinhub notebook repo), get default token.
* */
private String getUserToken(String principal) {
String token = usersToken.get(principal);
if (StringUtils.isBlank(token)) {
String ticket = UserSessionContainer.instance.getSession(principal);
try {
token = getUserZeppelinInstanceToken(ticket);
usersToken.putIfAbsent(principal, token);
} catch (IOException e) {
LOG.error("Cannot get user token", e);
token = StringUtils.EMPTY;
}
}

return token;
}

private boolean isSubjectValid(AuthenticationInfo subject) {
if (subject == null) {
return false;
}
return (subject.isAnonymous() && !conf.isAnonymousAllowed()) ? false : true;
}

@Override
public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
String response = restApiClient.asyncGet("");
if (!isSubjectValid(subject)) {
return Collections.emptyList();
}
String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, StringUtils.EMPTY);
List<NoteInfo> notes = GSON.fromJson(response, new TypeToken<List<NoteInfo>>() {}.getType());
if (notes == null) {
return Collections.emptyList();
Expand All @@ -158,11 +214,11 @@ public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {

@Override
public Note get(String noteId, AuthenticationInfo subject) throws IOException {
if (StringUtils.isBlank(noteId)) {
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return EMPTY_NOTE;
}
//String response = zeppelinhubHandler.get(noteId);
String response = restApiClient.asyncGet(noteId);
String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, noteId);
Note note = GSON.fromJson(response, Note.class);
if (note == null) {
return EMPTY_NOTE;
Expand All @@ -173,45 +229,55 @@ public Note get(String noteId, AuthenticationInfo subject) throws IOException {

@Override
public void save(Note note, AuthenticationInfo subject) throws IOException {
if (note == null) {
throw new IOException("Zeppelinhub failed to save empty note");
if (note == null || !isSubjectValid(subject)) {
throw new IOException("Zeppelinhub failed to save note");
}
String notebook = GSON.toJson(note);
restApiClient.asyncPut(notebook);
LOG.info("ZeppelinHub REST API saving note {} ", note.getId());
String jsonNote = GSON.toJson(note);
String token = getUserToken(subject.getUser());
LOG.info("ZeppelinHub REST API saving note {} ", note.getId());
restApiClient.put(token, jsonNote);
}

@Override
public void remove(String noteId, AuthenticationInfo subject) throws IOException {
restApiClient.asyncDel(noteId);
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
throw new IOException("Zeppelinhub failed to remove note");
}
String token = getUserToken(subject.getUser());
LOG.info("ZeppelinHub REST API removing note {} ", noteId);
restApiClient.del(token, noteId);
}

@Override
public void close() {
websocketClient.stop();
//websocketClient.stop();
}

@Override
public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
throws IOException {
if (StringUtils.isBlank(noteId)) {
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return null;
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint");
String content = GSON.toJson(ImmutableMap.of("message", checkpointMsg));
String response = restApiClient.asyncPutWithResponseBody(endpoint, content);

String token = getUserToken(subject.getUser());
String response = restApiClient.putWithResponseBody(token, endpoint, content);

return GSON.fromJson(response, Revision.class);
}

@Override
public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId)) {
if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId) || !isSubjectValid(subject)) {
return EMPTY_NOTE;
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint", revId);
String response = restApiClient.asyncGet(endpoint);

String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, endpoint);

Note note = GSON.fromJson(response, Note.class);
if (note == null) {
return EMPTY_NOTE;
Expand All @@ -222,13 +288,14 @@ public Note get(String noteId, String revId, AuthenticationInfo subject) throws

@Override
public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
if (StringUtils.isBlank(noteId)) {
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return Collections.emptyList();
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint");
List<Revision> history = Collections.emptyList();
try {
String response = restApiClient.asyncGet(endpoint);
String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, endpoint);
history = GSON.fromJson(response, new TypeToken<List<Revision>>(){}.getType());
} catch (IOException e) {
LOG.error("Cannot get note history", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.repo.zeppelinhub.model;

/**
* ZeppelinHub Instance structure.
*
*/
public class Instance {
public int id;
public String name;
public String token;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.repo.zeppelinhub.model;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.StringUtils;

/**
* Simple and yet dummy container for zeppelinhub session.
*
*/
public class UserSessionContainer {
private static class Entity {
public final String userSession;

Entity(String userSession) {
this.userSession = userSession;
}
}

private Map<String, Entity> sessions = new ConcurrentHashMap<>();

public static final UserSessionContainer instance = new UserSessionContainer();

public synchronized String getSession(String principal) {
Entity entry = sessions.get(principal);
if (entry == null) {
return StringUtils.EMPTY;
}
return entry.userSession;
}

public synchronized String setSession(String principal, String userSession) {
Entity entry = new Entity(userSession);
sessions.put(principal, entry);
return entry.userSession;
}
}
Loading

0 comments on commit 33e2dab

Please sign in to comment.