Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ZEPPELIN-1690] - ZeppelinHubNotebookRepo multy user handling #1635

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.shiro.authz.AuthorizationInfo;
import org.apache.shiro.realm.AuthorizingRealm;
import org.apache.shiro.subject.PrincipalCollection;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
import org.apache.zeppelin.server.ZeppelinServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +57,7 @@ public class ZeppelinHubRealm extends AuthorizingRealm {
private static final String USER_LOGIN_API_ENDPOINT = "api/v1/users/login";
private static final String JSON_CONTENT_TYPE = "application/json";
private static final String UTF_8_ENCODING = "UTF-8";
private static final String USER_SESSION_HEADER = "X-session";
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger();

private final HttpClient httpClient;
Expand Down Expand Up @@ -126,6 +128,7 @@ public void setZeppelinhubUrl(String url) {
protected User authenticateUser(String requestBody) {
PutMethod put = new PutMethod(Joiner.on("/").join(zeppelinhubUrl, USER_LOGIN_API_ENDPOINT));
String responseBody = StringUtils.EMPTY;
String userSession = StringUtils.EMPTY;
try {
put.setRequestEntity(new StringRequestEntity(requestBody, JSON_CONTENT_TYPE, UTF_8_ENCODING));
int statusCode = httpClient.executeMethod(put);
Expand All @@ -136,6 +139,7 @@ protected User authenticateUser(String requestBody) {
+ "Login or password incorrect");
}
responseBody = put.getResponseBodyAsString();
userSession = put.getResponseHeader(USER_SESSION_HEADER).getValue();
put.releaseConnection();

} catch (IOException e) {
Expand All @@ -150,13 +154,16 @@ protected User authenticateUser(String requestBody) {
LOG.error("Cannot deserialize ZeppelinHub response to User instance", e);
throw new AuthenticationException("Cannot login to ZeppelinHub");
}


// Add ZeppelinHub user_session token this singleton map, this will help ZeppelinHubRepo
// to get specific information about the current user.
UserSessionContainer.instance.setSession(account.login, userSession);

/* TODO(khalid): add proper roles and add listener */
HashSet<String> userAndRoles = new HashSet<String>();
userAndRoles.add(account.login);
ZeppelinServer.notebookWsServer.broadcastReloadedNoteList(
new org.apache.zeppelin.user.AuthenticationInfo(account.login), userAndRoles);

return account;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,13 @@
import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.notebook.*;
import org.apache.zeppelin.notebook.JobListenerFactory;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.Notebook;
import org.apache.zeppelin.notebook.NotebookAuthorization;
import org.apache.zeppelin.notebook.NotebookEventListener;
import org.apache.zeppelin.notebook.Paragraph;
import org.apache.zeppelin.notebook.ParagraphJobListener;
import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision;
import org.apache.zeppelin.notebook.socket.Message;
import org.apache.zeppelin.notebook.socket.Message.OP;
Expand Down Expand Up @@ -173,7 +179,8 @@ public void onMessage(NotebookSocket conn, String msg) {
if (StringUtils.isEmpty(conn.getUser())) {
addUserConnection(messagereceived.principal, conn);
}
AuthenticationInfo subject = new AuthenticationInfo(messagereceived.principal);
AuthenticationInfo subject =
new AuthenticationInfo(messagereceived.principal, messagereceived.ticket);

/** Lets be elegant here */
switch (messagereceived.op) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.notebook.Note;
import org.apache.zeppelin.notebook.NoteInfo;
import org.apache.zeppelin.notebook.repo.NotebookRepo;
import org.apache.zeppelin.notebook.repo.NotebookRepoSettingsInfo;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance;
import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer;
import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler;
import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client;
import org.apache.zeppelin.user.AuthenticationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -51,20 +54,27 @@ public class ZeppelinHubRepo implements NotebookRepo {
public static final String TOKEN_HEADER = "X-Zeppelin-Token";
private static final Gson GSON = new Gson();
private static final Note EMPTY_NOTE = new Note();
private final Client websocketClient;
//private final Client websocketClient;

private String token;
private ZeppelinhubRestApiHandler restApiClient;

private final ZeppelinConfiguration conf;

// In order to avoid too many call to ZeppelinHub backend, we save a map of user -> session.
private ConcurrentMap<String, String> usersToken = new ConcurrentHashMap<String, String>();

public ZeppelinHubRepo(ZeppelinConfiguration conf) {
this.conf = conf;
String zeppelinHubUrl = getZeppelinHubUrl(conf);
LOG.info("Initializing ZeppelinHub integration module");
token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, "");
restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl, token);

websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
getZeppelinhubWebsocketUri(conf), token, conf);
websocketClient.start();
// TODO(xxx): refactor this in the next itaration
//websocketClient = Client.initialize(getZeppelinWebsocketUri(conf),
// getZeppelinhubWebsocketUri(conf), token, conf);
//websocketClient.start();
}

private String getZeppelinHubWsUri(URI api) throws URISyntaxException {
Expand Down Expand Up @@ -144,10 +154,56 @@ String getZeppelinHubUrl(ZeppelinConfiguration conf) {
}
return zeppelinhubUrl;
}

/**
* Get Token directly from Zeppelinhub.
* This will avoid and remove the needs of setting up token in zeppelin-env.sh.
*/
private String getUserZeppelinInstanceToken(String ticket) throws IOException {
if (StringUtils.isBlank(ticket)) {
return "";
}

List<Instance> instances = restApiClient.getInstances(ticket);
// TODO(anthony): Implement NotebookRepo Setting to let user switch token at runtime.

token = instances.isEmpty() ? StringUtils.EMPTY : instances.get(0).token;
return token;
}

/**
* For a given user logged in is zeppelin (via zeppelinhub notebook repo), get default token.
* */
private String getUserToken(String principal) {
String token = usersToken.get(principal);
if (StringUtils.isBlank(token)) {
String ticket = UserSessionContainer.instance.getSession(principal);
try {
token = getUserZeppelinInstanceToken(ticket);
usersToken.putIfAbsent(principal, token);
} catch (IOException e) {
LOG.error("Cannot get user token", e);
token = StringUtils.EMPTY;
}
}

return token;
}

private boolean isSubjectValid(AuthenticationInfo subject) {
if (subject == null) {
return false;
}
return (subject.isAnonymous() && !conf.isAnonymousAllowed()) ? false : true;
}

@Override
public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {
String response = restApiClient.asyncGet("");
if (!isSubjectValid(subject)) {
return Collections.emptyList();
}
String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, StringUtils.EMPTY);
List<NoteInfo> notes = GSON.fromJson(response, new TypeToken<List<NoteInfo>>() {}.getType());
if (notes == null) {
return Collections.emptyList();
Expand All @@ -158,11 +214,11 @@ public List<NoteInfo> list(AuthenticationInfo subject) throws IOException {

@Override
public Note get(String noteId, AuthenticationInfo subject) throws IOException {
if (StringUtils.isBlank(noteId)) {
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return EMPTY_NOTE;
}
//String response = zeppelinhubHandler.get(noteId);
String response = restApiClient.asyncGet(noteId);
String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, noteId);
Note note = GSON.fromJson(response, Note.class);
if (note == null) {
return EMPTY_NOTE;
Expand All @@ -173,45 +229,55 @@ public Note get(String noteId, AuthenticationInfo subject) throws IOException {

@Override
public void save(Note note, AuthenticationInfo subject) throws IOException {
if (note == null) {
throw new IOException("Zeppelinhub failed to save empty note");
if (note == null || !isSubjectValid(subject)) {
throw new IOException("Zeppelinhub failed to save note");
}
String notebook = GSON.toJson(note);
restApiClient.asyncPut(notebook);
LOG.info("ZeppelinHub REST API saving note {} ", note.getId());
String jsonNote = GSON.toJson(note);
String token = getUserToken(subject.getUser());
LOG.info("ZeppelinHub REST API saving note {} ", note.getId());
restApiClient.put(token, jsonNote);
}

@Override
public void remove(String noteId, AuthenticationInfo subject) throws IOException {
restApiClient.asyncDel(noteId);
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
throw new IOException("Zeppelinhub failed to remove note");
}
String token = getUserToken(subject.getUser());
LOG.info("ZeppelinHub REST API removing note {} ", noteId);
restApiClient.del(token, noteId);
}

@Override
public void close() {
websocketClient.stop();
//websocketClient.stop();
}

@Override
public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject)
throws IOException {
if (StringUtils.isBlank(noteId)) {
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return null;
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint");
String content = GSON.toJson(ImmutableMap.of("message", checkpointMsg));
String response = restApiClient.asyncPutWithResponseBody(endpoint, content);

String token = getUserToken(subject.getUser());
String response = restApiClient.putWithResponseBody(token, endpoint, content);

return GSON.fromJson(response, Revision.class);
}

@Override
public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException {
if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId)) {
if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId) || !isSubjectValid(subject)) {
return EMPTY_NOTE;
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint", revId);
String response = restApiClient.asyncGet(endpoint);

String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, endpoint);

Note note = GSON.fromJson(response, Note.class);
if (note == null) {
return EMPTY_NOTE;
Expand All @@ -222,13 +288,14 @@ public Note get(String noteId, String revId, AuthenticationInfo subject) throws

@Override
public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) {
if (StringUtils.isBlank(noteId)) {
if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) {
return Collections.emptyList();
}
String endpoint = Joiner.on("/").join(noteId, "checkpoint");
List<Revision> history = Collections.emptyList();
try {
String response = restApiClient.asyncGet(endpoint);
String token = getUserToken(subject.getUser());
String response = restApiClient.get(token, endpoint);
history = GSON.fromJson(response, new TypeToken<List<Revision>>(){}.getType());
} catch (IOException e) {
LOG.error("Cannot get note history", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.repo.zeppelinhub.model;

/**
* ZeppelinHub Instance structure.
*
*/
public class Instance {
public int id;
public String name;
public String token;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.zeppelin.notebook.repo.zeppelinhub.model;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.lang.StringUtils;

/**
* Simple and yet dummy container for zeppelinhub session.
*
*/
public class UserSessionContainer {
private static class Entity {
public final String userSession;

Entity(String userSession) {
this.userSession = userSession;
}
}

private Map<String, Entity> sessions = new ConcurrentHashMap<>();

public static final UserSessionContainer instance = new UserSessionContainer();

public synchronized String getSession(String principal) {
Entity entry = sessions.get(principal);
if (entry == null) {
return StringUtils.EMPTY;
}
return entry.userSession;
}

public synchronized String setSession(String principal, String userSession) {
Entity entry = new Entity(userSession);
sessions.put(principal, entry);
return entry.userSession;
}
}