From 5b59dccc6b79f16f341dba29897049221daa9bfa Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Tue, 25 Sep 2018 16:59:10 +0800 Subject: [PATCH] [ZEPPELIN-3758]. Convert old note file note.json to new style --- .../zeppelin/conf/ZeppelinConfiguration.java | 2 + .../notebook/repo/OldAzureNotebookRepo.java | 217 ++++++++++ .../repo/OldFileSystemNotebookRepo.java | 92 +++++ .../notebook/repo/OldGCSNotebookRepo.java | 216 ++++++++++ .../notebook/repo/OldGitNotebookRepo.java | 206 ++++++++++ .../notebook/repo/OldGitHubNotebookRepo.java | 130 ++++++ .../notebook/repo/OldS3NotebookRepo.java | 292 +++++++++++++ .../notebook/repo/OldVFSNotebookRepo.java | 283 +++++++++++++ .../repo/zeppelinhub/OldZeppelinHubRepo.java | 386 ++++++++++++++++++ .../apache/zeppelin/notebook/OldNoteInfo.java | 68 +++ .../notebook/repo/NotebookRepoSync.java | 22 + .../notebook/repo/OldNotebookRepo.java | 103 +++++ .../OldNotebookRepoWithVersionControl.java | 98 +++++ .../apache/zeppelin/plugin/PluginManager.java | 50 ++- 14 files changed, 2163 insertions(+), 2 deletions(-) create mode 100644 zeppelin-plugins/notebookrepo/azure/src/main/java/org/apache/zeppelin/notebook/repo/OldAzureNotebookRepo.java create mode 100644 zeppelin-plugins/notebookrepo/filesystem/src/main/java/org/apache/zeppelin/notebook/repo/OldFileSystemNotebookRepo.java create mode 100644 zeppelin-plugins/notebookrepo/gcs/src/main/java/org/apache/zeppelin/notebook/repo/OldGCSNotebookRepo.java create mode 100644 zeppelin-plugins/notebookrepo/git/src/main/java/org/apache/zeppelin/notebook/repo/OldGitNotebookRepo.java create mode 100644 zeppelin-plugins/notebookrepo/github/src/main/java/org/apache/zeppelin/notebook/repo/OldGitHubNotebookRepo.java create mode 100644 zeppelin-plugins/notebookrepo/s3/src/main/java/org/apache/zeppelin/notebook/repo/OldS3NotebookRepo.java create mode 100644 zeppelin-plugins/notebookrepo/vfs/src/main/java/org/apache/zeppelin/notebook/repo/OldVFSNotebookRepo.java create mode 100644 zeppelin-plugins/notebookrepo/zeppelin-hub/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/OldZeppelinHubRepo.java create mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/OldNoteInfo.java create mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/OldNotebookRepo.java create mode 100644 zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/OldNotebookRepoWithVersionControl.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 2b2f3b6fac6..bbaf7fa5319 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -732,6 +732,8 @@ public enum ConfVars { ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", "org.apache.zeppelin.notebook.repo.GitNotebookRepo"), ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false), + ZEPPELIN_NOTEBOOK_NEW_FORMAT_CONVERT("zeppelin.notebook.new_format.convert", false), + ZEPPELIN_NOTEBOOK_NEW_FORMAT_DELETE_OLD("zeppelin.notebook.new_format.delete_old", false), // whether by default note is public or private ZEPPELIN_NOTEBOOK_PUBLIC("zeppelin.notebook.public", true), ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", diff --git a/zeppelin-plugins/notebookrepo/azure/src/main/java/org/apache/zeppelin/notebook/repo/OldAzureNotebookRepo.java b/zeppelin-plugins/notebookrepo/azure/src/main/java/org/apache/zeppelin/notebook/repo/OldAzureNotebookRepo.java new file mode 100644 index 00000000000..2b1056d45d6 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/azure/src/main/java/org/apache/zeppelin/notebook/repo/OldAzureNotebookRepo.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import com.microsoft.azure.storage.CloudStorageAccount; +import com.microsoft.azure.storage.StorageException; +import com.microsoft.azure.storage.file.CloudFile; +import com.microsoft.azure.storage.file.CloudFileClient; +import com.microsoft.azure.storage.file.CloudFileDirectory; +import com.microsoft.azure.storage.file.CloudFileShare; +import com.microsoft.azure.storage.file.ListFileItem; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.OldNoteInfo; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Azure storage backend for notebooks + */ +public class OldAzureNotebookRepo implements OldNotebookRepo { + private static final Logger LOG = LoggerFactory.getLogger(OldAzureNotebookRepo.class); + + private ZeppelinConfiguration conf; + private String user; + private String shareName; + private CloudFileDirectory rootDir; + + public OldAzureNotebookRepo() { + + } + + public void init(ZeppelinConfiguration conf) throws IOException { + this.conf = conf; + user = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_AZURE_USER); + shareName = conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_AZURE_SHARE); + + try { + CloudStorageAccount account = CloudStorageAccount.parse( + conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING)); + CloudFileClient client = account.createCloudFileClient(); + CloudFileShare share = client.getShareReference(shareName); + share.createIfNotExists(); + + CloudFileDirectory userDir = StringUtils.isBlank(user) ? + share.getRootDirectoryReference() : + share.getRootDirectoryReference().getDirectoryReference(user); + userDir.createIfNotExists(); + + rootDir = userDir.getDirectoryReference("notebook"); + rootDir.createIfNotExists(); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + public List list(AuthenticationInfo subject) throws IOException { + List infos = new LinkedList<>(); + OldNoteInfo info = null; + + for (ListFileItem item : rootDir.listFilesAndDirectories()) { + if (item.getClass() == CloudFileDirectory.class) { + CloudFileDirectory dir = (CloudFileDirectory) item; + + try { + if (dir.getFileReference("note.json").exists()) { + info = new OldNoteInfo(getNote(dir.getName())); + + if (info != null) { + infos.add(info); + } + } + } catch (StorageException | URISyntaxException e) { + String msg = "Error enumerating notebooks from Azure storage"; + LOG.error(msg, e); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } + + return infos; + } + + private Note getNote(String noteId) throws IOException { + InputStream ins = null; + + try { + CloudFileDirectory dir = rootDir.getDirectoryReference(noteId); + CloudFile file = dir.getFileReference("note.json"); + + ins = file.openRead(); + } catch (URISyntaxException | StorageException e) { + String msg = String.format("Error reading notebook %s from Azure storage", noteId); + + LOG.error(msg, e); + + throw new IOException(msg, e); + } + + String json = IOUtils.toString(ins, + conf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_ENCODING)); + ins.close(); + return Note.fromJson(json); + } + + @Override + public Note get(String noteId, AuthenticationInfo subject) throws IOException { + return getNote(noteId); + } + + @Override + public void save(Note note, AuthenticationInfo subject) throws IOException { + String json = note.toJson(); + + ByteArrayOutputStream output = new ByteArrayOutputStream(); + Writer writer = new OutputStreamWriter(output); + writer.write(json); + writer.close(); + output.close(); + + byte[] buffer = output.toByteArray(); + + try { + CloudFileDirectory dir = rootDir.getDirectoryReference(note.getId()); + dir.createIfNotExists(); + + CloudFile cloudFile = dir.getFileReference("note.json"); + cloudFile.uploadFromByteArray(buffer, 0, buffer.length); + } catch (URISyntaxException | StorageException e) { + String msg = String.format("Error saving notebook %s to Azure storage", note.getId()); + + LOG.error(msg, e); + + throw new IOException(msg, e); + } + } + + // unfortunately, we need to use a recursive delete here + private void delete(ListFileItem item) throws StorageException { + if (item.getClass() == CloudFileDirectory.class) { + CloudFileDirectory dir = (CloudFileDirectory) item; + + for (ListFileItem subItem : dir.listFilesAndDirectories()) { + delete(subItem); + } + + dir.deleteIfExists(); + } else if (item.getClass() == CloudFile.class) { + CloudFile file = (CloudFile) item; + + file.deleteIfExists(); + } + } + + @Override + public void remove(String noteId, AuthenticationInfo subject) throws IOException { + try { + CloudFileDirectory dir = rootDir.getDirectoryReference(noteId); + + delete(dir); + } catch (URISyntaxException | StorageException e) { + String msg = String.format("Error deleting notebook %s from Azure storage", noteId); + + LOG.error(msg, e); + + throw new IOException(msg, e); + } + } + + @Override + public void close() { + } + + @Override + public List getSettings(AuthenticationInfo subject) { + LOG.warn("Method not implemented"); + return Collections.emptyList(); + } + + @Override + public void updateSettings(Map settings, AuthenticationInfo subject) { + LOG.warn("Method not implemented"); + } + +} diff --git a/zeppelin-plugins/notebookrepo/filesystem/src/main/java/org/apache/zeppelin/notebook/repo/OldFileSystemNotebookRepo.java b/zeppelin-plugins/notebookrepo/filesystem/src/main/java/org/apache/zeppelin/notebook/repo/OldFileSystemNotebookRepo.java new file mode 100644 index 00000000000..8b9605e1939 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/filesystem/src/main/java/org/apache/zeppelin/notebook/repo/OldFileSystemNotebookRepo.java @@ -0,0 +1,92 @@ +package org.apache.zeppelin.notebook.repo; + +import org.apache.hadoop.fs.Path; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.FileSystemStorage; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.OldNoteInfo; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * NotebookRepos for hdfs. + * + * Assume the notebook directory structure is as following + * - notebookdir + * - noteId/note.json + * - noteId/note.json + * - noteId/note.json + */ +public class OldFileSystemNotebookRepo implements OldNotebookRepo { + private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemNotebookRepo.class); + + private FileSystemStorage fs; + private Path notebookDir; + + public OldFileSystemNotebookRepo() { + + } + + public void init(ZeppelinConfiguration zConf) throws IOException { + this.fs = new FileSystemStorage(zConf, zConf.getNotebookDir()); + LOGGER.info("Creating FileSystem: " + this.fs.getFs().getClass().getName() + + " for Zeppelin Notebook."); + this.notebookDir = this.fs.makeQualified(new Path(zConf.getNotebookDir())); + LOGGER.info("Using folder {} to store notebook", notebookDir); + this.fs.tryMkDir(notebookDir); + } + + @Override + public List list(AuthenticationInfo subject) throws IOException { + List notePaths = fs.list(new Path(notebookDir, "*/note.json")); + List noteInfos = new ArrayList<>(); + for (Path path : notePaths) { + OldNoteInfo noteInfo = new OldNoteInfo(path.getParent().getName(), "", null); + noteInfos.add(noteInfo); + } + return noteInfos; + } + + @Override + public Note get(final String noteId, AuthenticationInfo subject) throws IOException { + String content = this.fs.readFile( + new Path(notebookDir.toString() + "/" + noteId + "/note.json")); + return Note.fromJson(content); + } + + @Override + public void save(final Note note, AuthenticationInfo subject) throws IOException { + this.fs.writeFile(note.toJson(), + new Path(notebookDir.toString() + "/" + note.getId() + "/note.json"), + true); + } + + @Override + public void remove(final String noteId, AuthenticationInfo subject) throws IOException { + this.fs.delete(new Path(notebookDir.toString() + "/" + noteId)); + } + + @Override + public void close() { + LOGGER.warn("close is not implemented for HdfsNotebookRepo"); + } + + @Override + public List getSettings(AuthenticationInfo subject) { + LOGGER.warn("getSettings is not implemented for HdfsNotebookRepo"); + return null; + } + + @Override + public void updateSettings(Map settings, AuthenticationInfo subject) { + LOGGER.warn("updateSettings is not implemented for HdfsNotebookRepo"); + } + +} diff --git a/zeppelin-plugins/notebookrepo/gcs/src/main/java/org/apache/zeppelin/notebook/repo/OldGCSNotebookRepo.java b/zeppelin-plugins/notebookrepo/gcs/src/main/java/org/apache/zeppelin/notebook/repo/OldGCSNotebookRepo.java new file mode 100644 index 00000000000..a0851e6e5e3 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/gcs/src/main/java/org/apache/zeppelin/notebook/repo/OldGCSNotebookRepo.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobListOption; +import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOptions; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.gson.JsonParseException; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.OldNoteInfo; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * A NotebookRepo implementation for storing notebooks in Google Cloud Storage. + * + * Notes are stored in the GCS "directory" specified by zeppelin.notebook.gcs.dir. This path + * must be in the form gs://bucketName/path/to/Dir. The bucket must already exist. N.B: GCS is an + * object store, so this "directory" should not itself be an object. Instead, it represents the base + * path for the note.json files. + * + * Authentication is provided by google-auth-library-java. + * @see + * google-auth-library-java. + */ +public class OldGCSNotebookRepo implements OldNotebookRepo { + + private static final Logger LOG = LoggerFactory.getLogger(GCSNotebookRepo.class); + private String encoding; + private String bucketName; + private Optional basePath; + private Pattern noteNamePattern; + private Storage storage; + + public OldGCSNotebookRepo() { + } + + @VisibleForTesting + public OldGCSNotebookRepo(ZeppelinConfiguration zConf, Storage storage) throws IOException { + init(zConf); + this.storage = storage; + } + + @Override + public void init(ZeppelinConfiguration zConf) throws IOException { + this.encoding = zConf.getString(ConfVars.ZEPPELIN_ENCODING); + + String gcsStorageDir = zConf.getGCSStorageDir(); + if (gcsStorageDir.isEmpty()) { + throw new IOException("GCS storage directory must be set using 'zeppelin.notebook.gcs.dir'"); + } + if (!gcsStorageDir.startsWith("gs://")) { + throw new IOException(String.format( + "GCS storage directory '%s' must start with 'gs://'.", gcsStorageDir)); + } + String storageDirWithoutScheme = gcsStorageDir.substring("gs://".length()); + + // pathComponents excludes empty string if trailing slash is present + List pathComponents = Arrays.asList(storageDirWithoutScheme.split("/")); + if (pathComponents.size() < 1) { + throw new IOException(String.format( + "GCS storage directory '%s' must be in the form gs://bucketname/path/to/dir", + gcsStorageDir)); + } + this.bucketName = pathComponents.get(0); + if (pathComponents.size() > 1) { + this.basePath = Optional.of(StringUtils.join( + pathComponents.subList(1, pathComponents.size()), "/")); + } else { + this.basePath = Optional.absent(); + } + + // Notes are stored at gs://bucketName/basePath//note.json + if (basePath.isPresent()) { + this.noteNamePattern = Pattern.compile( + "^" + Pattern.quote(basePath.get() + "/") + "([^/]+)/note\\.json$"); + } else { + this.noteNamePattern = Pattern.compile("^([^/]+)/note\\.json$"); + } + + this.storage = StorageOptions.getDefaultInstance().getService(); + } + + private BlobId makeBlobId(String noteId) { + if (basePath.isPresent()) { + return BlobId.of(bucketName, basePath.get() + "/" + noteId + "/note.json"); + } else { + return BlobId.of(bucketName, noteId + "/note.json"); + } + } + + @Override + public List list(AuthenticationInfo subject) throws IOException { + try { + List infos = new ArrayList<>(); + Iterable blobsUnderDir; + if (basePath.isPresent()) { + blobsUnderDir = storage + .list(bucketName, BlobListOption.prefix(this.basePath.get() + "/")) + .iterateAll(); + } else { + blobsUnderDir = storage + .list(bucketName) + .iterateAll(); + } + for (Blob b : blobsUnderDir) { + Matcher matcher = noteNamePattern.matcher(b.getName()); + if (matcher.matches()) { + // Callers only use the id field, so do not fetch each note + // This matches the implementation in FileSystemNoteRepo#list + infos.add(new OldNoteInfo(matcher.group(1), "", null)); + } + } + return infos; + } catch (StorageException se) { + throw new IOException("Could not list GCS directory: " + se.getMessage(), se); + } + } + + @Override + public Note get(String noteId, AuthenticationInfo subject) throws IOException { + BlobId blobId = makeBlobId(noteId); + byte[] contents; + try { + contents = storage.readAllBytes(blobId); + } catch (StorageException se) { + throw new IOException("Could not read " + blobId.toString() + ": " + se.getMessage(), se); + } + + try { + return Note.fromJson(new String(contents, encoding)); + } catch (JsonParseException jpe) { + throw new IOException( + "Could note parse as json " + blobId.toString() + jpe.getMessage(), jpe); + } + } + + @Override + public void save(Note note, AuthenticationInfo subject) throws IOException { + BlobInfo info = BlobInfo.newBuilder(makeBlobId(note.getId())) + .setContentType("application/json") + .build(); + try { + storage.create(info, note.toJson().getBytes("UTF-8")); + } catch (StorageException se) { + throw new IOException("Could not write " + info.toString() + ": " + se.getMessage(), se); + } + } + + @Override + public void remove(String noteId, AuthenticationInfo subject) throws IOException { + Preconditions.checkArgument(!Strings.isNullOrEmpty(noteId)); + BlobId blobId = makeBlobId(noteId); + try { + boolean deleted = storage.delete(blobId); + if (!deleted) { + throw new IOException("Tried to remove nonexistent blob " + blobId.toString()); + } + } catch (StorageException se) { + throw new IOException("Could not remove " + blobId.toString() + ": " + se.getMessage(), se); + } + } + + @Override + public void close() { + //no-op + } + + @Override + public List getSettings(AuthenticationInfo subject) { + LOG.warn("getSettings is not implemented for GCSNotebookRepo"); + return Collections.emptyList(); + } + + @Override + public void updateSettings(Map settings, AuthenticationInfo subject) { + LOG.warn("updateSettings is not implemented for GCSNotebookRepo"); + } +} diff --git a/zeppelin-plugins/notebookrepo/git/src/main/java/org/apache/zeppelin/notebook/repo/OldGitNotebookRepo.java b/zeppelin-plugins/notebookrepo/git/src/main/java/org/apache/zeppelin/notebook/repo/OldGitNotebookRepo.java new file mode 100644 index 00000000000..91c93c08d64 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/git/src/main/java/org/apache/zeppelin/notebook/repo/OldGitNotebookRepo.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.api.errors.NoHeadException; +import org.eclipse.jgit.diff.DiffEntry; +import org.eclipse.jgit.dircache.DirCache; +import org.eclipse.jgit.internal.storage.file.FileRepository; +import org.eclipse.jgit.lib.Constants; +import org.eclipse.jgit.lib.ObjectId; +import org.eclipse.jgit.lib.Repository; +import org.eclipse.jgit.revwalk.RevCommit; +import org.eclipse.jgit.treewalk.filter.PathFilter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +/** + * NotebookRepo that hosts all the notebook FS in a single Git repo + * + * This impl intended to be simple and straightforward: + * - does not handle branches + * - only basic local git file repo, no remote Github push\pull. GitHub integration is + * implemented in @see {@link org.apache.zeppelin.notebook.repo.GitNotebookRepo} + * + * TODO(bzz): add default .gitignore + */ +public class OldGitNotebookRepo extends OldVFSNotebookRepo + implements OldNotebookRepoWithVersionControl { + private static final Logger LOG = LoggerFactory.getLogger(GitNotebookRepo.class); + + private String localPath; + private Git git; + + public OldGitNotebookRepo() { + super(); + } + + @VisibleForTesting + public OldGitNotebookRepo(ZeppelinConfiguration conf) throws IOException { + this(); + init(conf); + } + + @Override + public void init(ZeppelinConfiguration conf) throws IOException { + //TODO(zjffdu), it is weird that I can not call super.init directly here, as it would cause + //AbstractMethodError + this.conf = conf; + setNotebookDirectory(conf.getNotebookDir()); + + localPath = getRootDir().getName().getPath(); + LOG.info("Opening a git repo at '{}'", localPath); + Repository localRepo = new FileRepository(Joiner.on(File.separator).join(localPath, ".git")); + if (!localRepo.getDirectory().exists()) { + LOG.info("Git repo {} does not exist, creating a new one", localRepo.getDirectory()); + localRepo.create(); + } + git = new Git(localRepo); + } + + @Override + public synchronized void save(Note note, AuthenticationInfo subject) throws IOException { + super.save(note, subject); + } + + /* implemented as git add+commit + * @param pattern is the noteId + * @param commitMessage is a commit message (checkpoint message) + * (non-Javadoc) + * @see org.apache.zeppelin.notebook.repo.VFSNotebookRepo#checkpoint(String, String) + */ + @Override + public Revision checkpoint(String pattern, String commitMessage, AuthenticationInfo subject) { + Revision revision = Revision.EMPTY; + try { + List gitDiff = git.diff().call(); + if (!gitDiff.isEmpty()) { + LOG.debug("Changes found for pattern '{}': {}", pattern, gitDiff); + DirCache added = git.add().addFilepattern(pattern).call(); + LOG.debug("{} changes are about to be commited", added.getEntryCount()); + RevCommit commit = git.commit().setMessage(commitMessage).call(); + revision = new Revision(commit.getName(), commit.getShortMessage(), commit.getCommitTime()); + } else { + LOG.debug("No changes found {}", pattern); + } + } catch (GitAPIException e) { + LOG.error("Failed to add+commit {} to Git", pattern, e); + } + return revision; + } + + /** + * the idea is to: + * 1. stash current changes + * 2. remember head commit and checkout to the desired revision + * 3. get note and checkout back to the head + * 4. apply stash on top and remove it + */ + @Override + public synchronized Note get(String noteId, String revId, AuthenticationInfo subject) + throws IOException { + Note note = null; + RevCommit stash = null; + try { + List gitDiff = git.diff().setPathFilter(PathFilter.create(noteId)).call(); + boolean modified = !gitDiff.isEmpty(); + if (modified) { + // stash changes + stash = git.stashCreate().call(); + Collection stashes = git.stashList().call(); + LOG.debug("Created stash : {}, stash size : {}", stash, stashes.size()); + } + ObjectId head = git.getRepository().resolve(Constants.HEAD); + // checkout to target revision + git.checkout().setStartPoint(revId).addPath(noteId).call(); + // get the note + note = super.get(noteId, subject); + // checkout back to head + git.checkout().setStartPoint(head.getName()).addPath(noteId).call(); + if (modified && stash != null) { + // unstash changes + ObjectId applied = git.stashApply().setStashRef(stash.getName()).call(); + ObjectId dropped = git.stashDrop().setStashRef(0).call(); + Collection stashes = git.stashList().call(); + LOG.debug("Stash applied as : {}, and dropped : {}, stash size: {}", applied, dropped, + stashes.size()); + } + } catch (GitAPIException e) { + LOG.error("Failed to return note from revision \"{}\"", revId, e); + } + return note; + } + + @Override + public List revisionHistory(String noteId, AuthenticationInfo subject) { + List history = Lists.newArrayList(); + LOG.debug("Listing history for {}:", noteId); + try { + Iterable logs = git.log().addPath(noteId).call(); + for (RevCommit log: logs) { + history.add(new Revision(log.getName(), log.getShortMessage(), log.getCommitTime())); + LOG.debug(" - ({},{},{})", log.getName(), log.getCommitTime(), log.getFullMessage()); + } + } catch (NoHeadException e) { + //when no initial commit exists + LOG.warn("No Head found for {}, {}", noteId, e.getMessage()); + } catch (GitAPIException e) { + LOG.error("Failed to get logs for {}", noteId, e); + } + return history; + } + + @Override + public Note setNoteRevision(String noteId, String revId, AuthenticationInfo subject) + throws IOException { + Note revisionNote = get(noteId, revId, subject); + if (revisionNote != null) { + save(revisionNote, subject); + } + return revisionNote; + } + + @Override + public void close() { + git.getRepository().close(); + } + + //DI replacements for Tests + protected Git getGit() { + return git; + } + + void setGit(Git git) { + this.git = git; + } + +} + diff --git a/zeppelin-plugins/notebookrepo/github/src/main/java/org/apache/zeppelin/notebook/repo/OldGitHubNotebookRepo.java b/zeppelin-plugins/notebookrepo/github/src/main/java/org/apache/zeppelin/notebook/repo/OldGitHubNotebookRepo.java new file mode 100644 index 00000000000..a6f86ff3185 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/github/src/main/java/org/apache/zeppelin/notebook/repo/OldGitHubNotebookRepo.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.eclipse.jgit.api.Git; +import org.eclipse.jgit.api.PullCommand; +import org.eclipse.jgit.api.PushCommand; +import org.eclipse.jgit.api.RemoteAddCommand; +import org.eclipse.jgit.api.errors.GitAPIException; +import org.eclipse.jgit.transport.URIish; +import org.eclipse.jgit.transport.UsernamePasswordCredentialsProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; + +/** + * GitHub integration to store notebooks in a GitHub repository. + * It uses the same simple logic implemented in @see + * {@link org.apache.zeppelin.notebook.repo.GitNotebookRepo} + * + * The logic for updating the local repository from the remote repository is the following: + * - When the GitHubNotebookRepo is initialized + * - When pushing the changes to the remote repository + * + * The logic for updating the remote repository on GitHub from local repository is the following: + * - When commit the changes (saving the notebook) + * + * You should be able to use this integration with all remote git repositories that accept + * username + password authentication, not just GitHub. + */ +public class OldGitHubNotebookRepo extends OldGitNotebookRepo { + private static final Logger LOG = LoggerFactory.getLogger(GitHubNotebookRepo.class); + private ZeppelinConfiguration zeppelinConfiguration; + private Git git; + + @Override + public void init(ZeppelinConfiguration conf) throws IOException { + super.init(conf); + LOG.debug("initializing GitHubNotebookRepo"); + this.git = super.getGit(); + this.zeppelinConfiguration = conf; + + configureRemoteStream(); + pullFromRemoteStream(); + } + + @Override + public Revision checkpoint(String pattern, String commitMessage, AuthenticationInfo subject) { + Revision revision = super.checkpoint(pattern, commitMessage, subject); + + updateRemoteStream(); + + return revision; + } + + private void configureRemoteStream() { + try { + LOG.debug("Setting up remote stream"); + RemoteAddCommand remoteAddCommand = git.remoteAdd(); + remoteAddCommand.setName(zeppelinConfiguration.getZeppelinNotebookGitRemoteOrigin()); + remoteAddCommand.setUri(new URIish(zeppelinConfiguration.getZeppelinNotebookGitURL())); + remoteAddCommand.call(); + } catch (GitAPIException e) { + LOG.error("Error configuring GitHub", e); + } catch (URISyntaxException e) { + LOG.error("Error in GitHub URL provided", e); + } + } + + private void updateRemoteStream() { + LOG.debug("Updating remote stream"); + + pullFromRemoteStream(); + pushToRemoteSteam(); + } + + private void pullFromRemoteStream() { + try { + LOG.debug("Pulling latest changes from remote stream"); + PullCommand pullCommand = git.pull(); + pullCommand.setCredentialsProvider( + new UsernamePasswordCredentialsProvider( + zeppelinConfiguration.getZeppelinNotebookGitUsername(), + zeppelinConfiguration.getZeppelinNotebookGitAccessToken() + ) + ); + + pullCommand.call(); + + } catch (GitAPIException e) { + LOG.error("Error when pulling latest changes from remote repository", e); + } + } + + private void pushToRemoteSteam() { + try { + LOG.debug("Pushing latest changes to remote stream"); + PushCommand pushCommand = git.push(); + pushCommand.setCredentialsProvider( + new UsernamePasswordCredentialsProvider( + zeppelinConfiguration.getZeppelinNotebookGitUsername(), + zeppelinConfiguration.getZeppelinNotebookGitAccessToken() + ) + ); + + pushCommand.call(); + } catch (GitAPIException e) { + LOG.error("Error when pushing latest changes to remote repository", e); + } + } +} diff --git a/zeppelin-plugins/notebookrepo/s3/src/main/java/org/apache/zeppelin/notebook/repo/OldS3NotebookRepo.java b/zeppelin-plugins/notebookrepo/s3/src/main/java/org/apache/zeppelin/notebook/repo/OldS3NotebookRepo.java new file mode 100644 index 00000000000..8cbd79a5cdb --- /dev/null +++ b/zeppelin-plugins/notebookrepo/s3/src/main/java/org/apache/zeppelin/notebook/repo/OldS3NotebookRepo.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.ClientConfiguration; +import com.amazonaws.ClientConfigurationFactory; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.AmazonS3EncryptionClient; +import com.amazonaws.services.s3.model.CryptoConfiguration; +import com.amazonaws.services.s3.model.EncryptionMaterialsProvider; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.KMSEncryptionMaterialsProvider; +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectSummary; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.OldNoteInfo; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Backend for storing Notebooks on S3 + */ +public class OldS3NotebookRepo implements OldNotebookRepo { + private static final Logger LOG = LoggerFactory.getLogger(S3NotebookRepo.class); + + // Use a credential provider chain so that instance profiles can be utilized + // on an EC2 instance. The order of locations where credentials are searched + // is documented here + // + // http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ + // auth/DefaultAWSCredentialsProviderChain.html + // + // In summary, the order is: + // + // 1. Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY + // 2. Java System Properties - aws.accessKeyId and aws.secretKey + // 3. Credential profiles file at the default location (~/.aws/credentials) + // shared by all AWS SDKs and the AWS CLI + // 4. Instance profile credentials delivered through the Amazon EC2 metadata service + private AmazonS3 s3client; + private String bucketName; + private String user; + private boolean useServerSideEncryption; + private ZeppelinConfiguration conf; + + public OldS3NotebookRepo() { + + } + + public void init(ZeppelinConfiguration conf) throws IOException { + this.conf = conf; + bucketName = conf.getS3BucketName(); + user = conf.getS3User(); + useServerSideEncryption = conf.isS3ServerSideEncryption(); + + // always use the default provider chain + AWSCredentialsProvider credentialsProvider = new DefaultAWSCredentialsProviderChain(); + CryptoConfiguration cryptoConf = new CryptoConfiguration(); + String keyRegion = conf.getS3KMSKeyRegion(); + + if (StringUtils.isNotBlank(keyRegion)) { + cryptoConf.setAwsKmsRegion(Region.getRegion(Regions.fromName(keyRegion))); + } + + ClientConfiguration cliConf = createClientConfiguration(); + + // see if we should be encrypting data in S3 + String kmsKeyID = conf.getS3KMSKeyID(); + if (kmsKeyID != null) { + // use the AWS KMS to encrypt data + KMSEncryptionMaterialsProvider emp = new KMSEncryptionMaterialsProvider(kmsKeyID); + this.s3client = new AmazonS3EncryptionClient(credentialsProvider, emp, cliConf, cryptoConf); + } + else if (conf.getS3EncryptionMaterialsProviderClass() != null) { + // use a custom encryption materials provider class + EncryptionMaterialsProvider emp = createCustomProvider(conf); + this.s3client = new AmazonS3EncryptionClient(credentialsProvider, emp, cliConf, cryptoConf); + } + else { + // regular S3 + this.s3client = new AmazonS3Client(credentialsProvider, cliConf); + } + + // set S3 endpoint to use + s3client.setEndpoint(conf.getS3Endpoint()); + } + + /** + * Create an instance of a custom encryption materials provider class + * which supplies encryption keys to use when reading/writing data in S3. + */ + private EncryptionMaterialsProvider createCustomProvider(ZeppelinConfiguration conf) + throws IOException { + // use a custom encryption materials provider class + String empClassname = conf.getS3EncryptionMaterialsProviderClass(); + EncryptionMaterialsProvider emp; + try { + Object empInstance = Class.forName(empClassname).newInstance(); + if (empInstance instanceof EncryptionMaterialsProvider) { + emp = (EncryptionMaterialsProvider) empInstance; + } + else { + throw new IOException("Class " + empClassname + " does not implement " + + EncryptionMaterialsProvider.class.getName()); + } + } + catch (Exception e) { + throw new IOException("Unable to instantiate encryption materials provider class " + + empClassname + ": " + e, e); + } + + return emp; + } + + /** + * Create AWS client configuration and return it. + * @return AWS client configuration + */ + private ClientConfiguration createClientConfiguration() { + ClientConfigurationFactory configFactory = new ClientConfigurationFactory(); + ClientConfiguration config = configFactory.getConfig(); + + String s3SignerOverride = conf.getS3SignerOverride(); + if (StringUtils.isNotBlank(s3SignerOverride)) { + config.setSignerOverride(s3SignerOverride); + } + + return config; + } + + @Override + public List list(AuthenticationInfo subject) throws IOException { + List infos = new LinkedList<>(); + OldNoteInfo info; + try { + ListObjectsRequest listObjectsRequest = new ListObjectsRequest() + .withBucketName(bucketName) + .withPrefix(user + "/" + "notebook"); + ObjectListing objectListing; + do { + objectListing = s3client.listObjects(listObjectsRequest); + for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) { + if (objectSummary.getKey().endsWith("note.json")) { + info = getNoteInfo(objectSummary.getKey()); + if (info != null) { + infos.add(info); + } + } + } + listObjectsRequest.setMarker(objectListing.getNextMarker()); + } while (objectListing.isTruncated()); + } catch (AmazonClientException ace) { + throw new IOException("Unable to list objects in S3: " + ace, ace); + } + return infos; + } + + private Note getNote(String key) throws IOException { + S3Object s3object; + try { + s3object = s3client.getObject(new GetObjectRequest(bucketName, key)); + } + catch (AmazonClientException ace) { + throw new IOException("Unable to retrieve object from S3: " + ace, ace); + } + + try (InputStream ins = s3object.getObjectContent()) { + String json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING)); + return Note.fromJson(json); + } + } + + private OldNoteInfo getNoteInfo(String key) throws IOException { + Note note = getNote(key); + return new OldNoteInfo(note); + } + + @Override + public Note get(String noteId, AuthenticationInfo subject) throws IOException { + return getNote(user + "/" + "notebook" + "/" + noteId + "/" + "note.json"); + } + + @Override + public void save(Note note, AuthenticationInfo subject) throws IOException { + String json = note.toJson(); + String key = user + "/" + "notebook" + "/" + note.getId() + "/" + "note.json"; + + File file = File.createTempFile("note", "json"); + try { + Writer writer = new OutputStreamWriter(new FileOutputStream(file)); + writer.write(json); + writer.close(); + + PutObjectRequest putRequest = new PutObjectRequest(bucketName, key, file); + + if (useServerSideEncryption) { + // Request server-side encryption. + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION); + putRequest.setMetadata(objectMetadata); + } + + s3client.putObject(putRequest); + } + catch (AmazonClientException ace) { + throw new IOException("Unable to store note in S3: " + ace, ace); + } + finally { + FileUtils.deleteQuietly(file); + } + } + + @Override + public void remove(String noteId, AuthenticationInfo subject) throws IOException { + String key = user + "/" + "notebook" + "/" + noteId; + final ListObjectsRequest listObjectsRequest = new ListObjectsRequest() + .withBucketName(bucketName).withPrefix(key); + + try { + ObjectListing objects = s3client.listObjects(listObjectsRequest); + do { + for (S3ObjectSummary objectSummary : objects.getObjectSummaries()) { + s3client.deleteObject(bucketName, objectSummary.getKey()); + } + objects = s3client.listNextBatchOfObjects(objects); + } while (objects.isTruncated()); + } + catch (AmazonClientException ace) { + throw new IOException("Unable to remove note in S3: " + ace, ace); + } + } + + @Override + public void close() { + //no-op + } + + @Override + public List getSettings(AuthenticationInfo subject) { + LOG.warn("Method not implemented"); + return Collections.emptyList(); + } + + @Override + public void updateSettings(Map settings, AuthenticationInfo subject) { + LOG.warn("Method not implemented"); + } + +} diff --git a/zeppelin-plugins/notebookrepo/vfs/src/main/java/org/apache/zeppelin/notebook/repo/OldVFSNotebookRepo.java b/zeppelin-plugins/notebookrepo/vfs/src/main/java/org/apache/zeppelin/notebook/repo/OldVFSNotebookRepo.java new file mode 100644 index 00000000000..aa1f449c790 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/vfs/src/main/java/org/apache/zeppelin/notebook/repo/OldVFSNotebookRepo.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.vfs2.FileContent; +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.commons.vfs2.FileType; +import org.apache.commons.vfs2.NameScope; +import org.apache.commons.vfs2.Selectors; +import org.apache.commons.vfs2.VFS; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.OldNoteInfo; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** +* +*/ +public class OldVFSNotebookRepo implements OldNotebookRepo { + private static final Logger LOG = LoggerFactory.getLogger(OldVFSNotebookRepo.class); + + private FileSystemManager fsManager; + private URI filesystemRoot; + protected ZeppelinConfiguration conf; + + @Override + public void init(ZeppelinConfiguration conf) throws IOException { + this.conf = conf; + setNotebookDirectory(conf.getNotebookDir()); + } + + protected void setNotebookDirectory(String notebookDirPath) throws IOException { + try { + LOG.info("Using notebookDir: " + notebookDirPath); + if (conf.isWindowsPath(notebookDirPath)) { + filesystemRoot = new File(notebookDirPath).toURI(); + } else { + filesystemRoot = new URI(notebookDirPath); + } + } catch (URISyntaxException e1) { + throw new IOException(e1); + } + + if (filesystemRoot.getScheme() == null) { // it is local path + File f = new File(conf.getRelativeDir(filesystemRoot.getPath())); + this.filesystemRoot = f.toURI(); + } + + fsManager = VFS.getManager(); + FileObject file = fsManager.resolveFile(filesystemRoot.getPath()); + if (!file.exists()) { + LOG.info("Notebook dir doesn't exist, create on is {}.", file.getName()); + file.createFolder(); + } + } + + private String getNotebookDirPath() { + return filesystemRoot.getPath().toString(); + } + + private String getPath(String path) { + if (path == null || path.trim().length() == 0) { + return filesystemRoot.toString(); + } + if (path.startsWith("/")) { + return filesystemRoot.toString() + path; + } else { + return filesystemRoot.toString() + "/" + path; + } + } + + private boolean isDirectory(FileObject fo) throws IOException { + if (fo == null) return false; + if (fo.getType() == FileType.FOLDER) { + return true; + } else { + return false; + } + } + + @Override + public List list(AuthenticationInfo subject) throws IOException { + FileObject rootDir = getRootDir(); + + FileObject[] children = rootDir.getChildren(); + + List infos = new LinkedList<>(); + for (FileObject f : children) { + String fileName = f.getName().getBaseName(); + if (f.isHidden() + || fileName.startsWith(".") + || fileName.startsWith("#") + || fileName.startsWith("~")) { + // skip hidden, temporary files + continue; + } + + if (!isDirectory(f)) { + // currently single note is saved like, [NOTE_ID]/note.json. + // so it must be a directory + continue; + } + + OldNoteInfo info = null; + + try { + info = getNoteInfo(f); + if (info != null) { + infos.add(info); + } + } catch (Exception e) { + LOG.error("Can't read note " + f.getName().toString()); + } + } + + return infos; + } + + private Note getNote(FileObject noteDir) throws IOException { + if (!isDirectory(noteDir)) { + throw new IOException(noteDir.getName().toString() + " is not a directory"); + } + + FileObject noteJson = noteDir.resolveFile("note.json", NameScope.CHILD); + if (!noteJson.exists()) { + throw new IOException(noteJson.getName().toString() + " not found"); + } + + FileContent content = noteJson.getContent(); + InputStream ins = content.getInputStream(); + String json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING)); + ins.close(); + + return Note.fromJson(json); + } + + private OldNoteInfo getNoteInfo(FileObject noteDir) throws IOException { + Note note = getNote(noteDir); + return new OldNoteInfo(note); + } + + @Override + public Note get(String noteId, AuthenticationInfo subject) throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + FileObject noteDir = rootDir.resolveFile(noteId, NameScope.CHILD); + + return getNote(noteDir); + } + + protected FileObject getRootDir() throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + + if (!rootDir.exists()) { + throw new IOException("Root path does not exists"); + } + + if (!isDirectory(rootDir)) { + throw new IOException("Root path is not a directory"); + } + + return rootDir; + } + + @Override + public synchronized void save(Note note, AuthenticationInfo subject) throws IOException { + LOG.info("Saving note:" + note.getId()); + String json = note.toJson(); + + FileObject rootDir = getRootDir(); + + FileObject noteDir = rootDir.resolveFile(note.getId(), NameScope.CHILD); + + if (!noteDir.exists()) { + noteDir.createFolder(); + } + if (!isDirectory(noteDir)) { + throw new IOException(noteDir.getName().toString() + " is not a directory"); + } + + FileObject noteJson = noteDir.resolveFile(".note.json", NameScope.CHILD); + // false means not appending. creates file if not exists + OutputStream out = noteJson.getContent().getOutputStream(false); + out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING))); + out.close(); + noteJson.moveTo(noteDir.resolveFile("note.json", NameScope.CHILD)); + } + + @Override + public void remove(String noteId, AuthenticationInfo subject) throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + FileObject noteDir = rootDir.resolveFile(noteId, NameScope.CHILD); + + if (!noteDir.exists()) { + // nothing to do + return; + } + + if (!isDirectory(noteDir)) { + // it is not look like zeppelin note savings + throw new IOException("Can not remove " + noteDir.getName().toString()); + } + + noteDir.delete(Selectors.SELECT_SELF_AND_CHILDREN); + } + + @Override + public void close() { + //no-op + } + + @Override + public List getSettings(AuthenticationInfo subject) { + NotebookRepoSettingsInfo repoSetting = NotebookRepoSettingsInfo.newInstance(); + List settings = new ArrayList<>(); + repoSetting.name = "Notebook Path"; + repoSetting.type = NotebookRepoSettingsInfo.Type.INPUT; + repoSetting.value = Collections.emptyList(); + repoSetting.selected = getNotebookDirPath(); + + settings.add(repoSetting); + return settings; + } + + @Override + public void updateSettings(Map settings, AuthenticationInfo subject) { + if (settings == null || settings.isEmpty()) { + LOG.error("Cannot update {} with empty settings", this.getClass().getName()); + return; + } + String newNotebookDirectotyPath = StringUtils.EMPTY; + if (settings.containsKey("Notebook Path")) { + newNotebookDirectotyPath = settings.get("Notebook Path"); + } + + if (StringUtils.isBlank(newNotebookDirectotyPath)) { + LOG.error("Notebook path is invalid"); + return; + } + LOG.warn("{} will change notebook dir from {} to {}", + subject.getUser(), getNotebookDirPath(), newNotebookDirectotyPath); + try { + setNotebookDirectory(newNotebookDirectotyPath); + } catch (IOException e) { + LOG.error("Cannot update notebook directory", e); + } + } + +} + diff --git a/zeppelin-plugins/notebookrepo/zeppelin-hub/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/OldZeppelinHubRepo.java b/zeppelin-plugins/notebookrepo/zeppelin-hub/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/OldZeppelinHubRepo.java new file mode 100644 index 00000000000..a56c4735ee2 --- /dev/null +++ b/zeppelin-plugins/notebookrepo/zeppelin-hub/src/main/java/org/apache/zeppelin/notebook/repo/zeppelinhub/OldZeppelinHubRepo.java @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.notebook.repo.zeppelinhub; + +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.OldNoteInfo; +import org.apache.zeppelin.notebook.repo.NotebookRepoSettingsInfo; +import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl; +import org.apache.zeppelin.notebook.repo.OldNotebookRepoWithVersionControl; +import org.apache.zeppelin.notebook.repo.zeppelinhub.model.Instance; +import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserSessionContainer; +import org.apache.zeppelin.notebook.repo.zeppelinhub.model.UserTokenContainer; +import org.apache.zeppelin.notebook.repo.zeppelinhub.rest.ZeppelinhubRestApiHandler; +import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.Client; +import org.apache.zeppelin.notebook.repo.zeppelinhub.websocket.utils.ZeppelinhubUtils; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * ZeppelinHub repo class. + */ +public class OldZeppelinHubRepo implements OldNotebookRepoWithVersionControl { + private static final Logger LOG = LoggerFactory.getLogger(ZeppelinHubRepo.class); + private static final String DEFAULT_SERVER = "https://www.zeppelinhub.com"; + static final String ZEPPELIN_CONF_PROP_NAME_SERVER = "zeppelinhub.api.address"; + static final String ZEPPELIN_CONF_PROP_NAME_TOKEN = "zeppelinhub.api.token"; + + private static final Gson GSON = new Gson(); + private static final Note EMPTY_NOTE = new Note(); + private Client websocketClient; + private UserTokenContainer tokenManager; + + private String token; + private ZeppelinhubRestApiHandler restApiClient; + + private ZeppelinConfiguration conf; + + public OldZeppelinHubRepo() { + + } + + public OldZeppelinHubRepo(ZeppelinConfiguration conf) { + this(); + init(conf); + } + + public void init(ZeppelinConfiguration conf) { + this.conf = conf; + String zeppelinHubUrl = getZeppelinHubUrl(conf); + LOG.info("Initializing ZeppelinHub integration module"); + + token = conf.getString("ZEPPELINHUB_API_TOKEN", ZEPPELIN_CONF_PROP_NAME_TOKEN, ""); + restApiClient = ZeppelinhubRestApiHandler.newInstance(zeppelinHubUrl); + //TODO(khalid): check which realm for authentication, pass to token manager + tokenManager = UserTokenContainer.init(restApiClient, token); + + websocketClient = Client.initialize(getZeppelinWebsocketUri(conf), + getZeppelinhubWebsocketUri(conf), token, conf); + websocketClient.start(); + } + + private String getZeppelinHubWsUri(URI api) throws URISyntaxException { + URI apiRoot = api; + String scheme = apiRoot.getScheme(); + int port = apiRoot.getPort(); + if (port <= 0) { + port = (scheme != null && scheme.equals("https")) ? 443 : 80; + } + + if (scheme == null) { + LOG.info("{} is not a valid zeppelinhub server address. proceed with default address {}", + apiRoot, DEFAULT_SERVER); + apiRoot = new URI(DEFAULT_SERVER); + scheme = apiRoot.getScheme(); + port = apiRoot.getPort(); + if (port <= 0) { + port = (scheme != null && scheme.equals("https")) ? 443 : 80; + } + } + String ws = scheme.equals("https") ? "wss://" : "ws://"; + return ws + apiRoot.getHost() + ":" + port + "/async"; + } + + String getZeppelinhubWebsocketUri(ZeppelinConfiguration conf) { + String zeppelinHubUri = StringUtils.EMPTY; + try { + zeppelinHubUri = getZeppelinHubWsUri(new URI(conf.getString("ZEPPELINHUB_API_ADDRESS", + ZEPPELIN_CONF_PROP_NAME_SERVER, DEFAULT_SERVER))); + } catch (URISyntaxException e) { + LOG.error("Cannot get ZeppelinHub URI", e); + } + return zeppelinHubUri; + } + + private String getZeppelinWebsocketUri(ZeppelinConfiguration conf) { + int port = conf.getServerPort(); + if (port <= 0) { + port = 80; + } + String ws = conf.useSsl() ? "wss" : "ws"; + return ws + "://localhost:" + port + "/ws"; + } + + // Used in tests + void setZeppelinhubRestApiHandler(ZeppelinhubRestApiHandler zeppelinhub) { + restApiClient = zeppelinhub; + } + + String getZeppelinHubUrl(ZeppelinConfiguration conf) { + if (conf == null) { + LOG.error("Invalid configuration, cannot be null. Using default address {}", DEFAULT_SERVER); + return DEFAULT_SERVER; + } + URI apiRoot; + String zeppelinhubUrl; + try { + String url = conf.getString("ZEPPELINHUB_API_ADDRESS", + ZEPPELIN_CONF_PROP_NAME_SERVER, + DEFAULT_SERVER); + apiRoot = new URI(url); + } catch (URISyntaxException e) { + LOG.error("Invalid zeppelinhub url, using default address {}", DEFAULT_SERVER, e); + return DEFAULT_SERVER; + } + + String scheme = apiRoot.getScheme(); + if (scheme == null) { + LOG.info("{} is not a valid zeppelinhub server address. proceed with default address {}", + apiRoot, DEFAULT_SERVER); + zeppelinhubUrl = DEFAULT_SERVER; + } else { + zeppelinhubUrl = scheme + "://" + apiRoot.getHost(); + if (apiRoot.getPort() > 0) { + zeppelinhubUrl += ":" + apiRoot.getPort(); + } + } + return zeppelinhubUrl; + } + + private boolean isSubjectValid(AuthenticationInfo subject) { + if (subject == null) { + return false; + } + return (subject.isAnonymous() && !conf.isAnonymousAllowed()) ? false : true; + } + + @Override + public List list(AuthenticationInfo subject) throws IOException { + if (!isSubjectValid(subject)) { + return Collections.emptyList(); + } + String token = getUserToken(subject.getUser()); + String response = restApiClient.get(token, StringUtils.EMPTY); + List notes = GSON.fromJson(response, new TypeToken>() {}.getType()); + if (notes == null) { + return Collections.emptyList(); + } + LOG.info("ZeppelinHub REST API listing notes "); + return notes; + } + + @Override + public Note get(String noteId, AuthenticationInfo subject) throws IOException { + if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) { + return EMPTY_NOTE; + } + String token = getUserToken(subject.getUser()); + String response = restApiClient.get(token, noteId); + Note note = Note.fromJson(response); + if (note == null) { + return EMPTY_NOTE; + } + LOG.info("ZeppelinHub REST API get note {} ", noteId); + return note; + } + + @Override + public void save(Note note, AuthenticationInfo subject) throws IOException { + if (note == null || !isSubjectValid(subject)) { + throw new IOException("Zeppelinhub failed to save note"); + } + String jsonNote = note.toJson(); + String token = getUserToken(subject.getUser()); + LOG.info("ZeppelinHub REST API saving note {} ", note.getId()); + restApiClient.put(token, jsonNote); + } + + @Override + public void remove(String noteId, AuthenticationInfo subject) throws IOException { + if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) { + throw new IOException("Zeppelinhub failed to remove note"); + } + String token = getUserToken(subject.getUser()); + LOG.info("ZeppelinHub REST API removing note {} ", noteId); + restApiClient.del(token, noteId); + } + + @Override + public void close() { + websocketClient.stop(); + restApiClient.close(); + } + + @Override + public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject) + throws IOException { + if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) { + return Revision.EMPTY; + } + String endpoint = Joiner.on("/").join(noteId, "checkpoint"); + String content = GSON.toJson(ImmutableMap.of("message", checkpointMsg)); + + String token = getUserToken(subject.getUser()); + String response = restApiClient.putWithResponseBody(token, endpoint, content); + + return GSON.fromJson(response, Revision.class); + } + + @Override + public Note get(String noteId, String revId, AuthenticationInfo subject) throws IOException { + if (StringUtils.isBlank(noteId) || StringUtils.isBlank(revId) || !isSubjectValid(subject)) { + return EMPTY_NOTE; + } + String endpoint = Joiner.on("/").join(noteId, "checkpoint", revId); + String token = getUserToken(subject.getUser()); + String response = restApiClient.get(token, endpoint); + + Note note = Note.fromJson(response); + if (note == null) { + return EMPTY_NOTE; + } + LOG.info("ZeppelinHub REST API get note {} revision {}", noteId, revId); + return note; + } + + @Override + public List revisionHistory(String noteId, AuthenticationInfo subject) { + if (StringUtils.isBlank(noteId) || !isSubjectValid(subject)) { + return Collections.emptyList(); + } + String endpoint = Joiner.on("/").join(noteId, "checkpoint"); + List history = Collections.emptyList(); + try { + String token = getUserToken(subject.getUser()); + String response = restApiClient.get(token, endpoint); + history = GSON.fromJson(response, new TypeToken>(){}.getType()); + } catch (IOException e) { + LOG.error("Cannot get note history", e); + } + return history; + } + + private String getUserToken(String user) { + return tokenManager.getUserToken(user); + } + + @Override + public List getSettings(AuthenticationInfo subject) { + if (!isSubjectValid(subject)) { + return Collections.emptyList(); + } + + List settings = Lists.newArrayList(); + String user = subject.getUser(); + String zeppelinHubUserSession = UserSessionContainer.instance.getSession(user); + String userToken = getUserToken(user); + List instances; + List> values = Lists.newLinkedList(); + + try { + instances = tokenManager.getUserInstances(zeppelinHubUserSession); + } catch (IOException e) { + LOG.warn("Couldnt find instances for the session {}, returning empty collection", + zeppelinHubUserSession); + // user not logged + //TODO(xxx): handle this case. + instances = Collections.emptyList(); + } + + NotebookRepoSettingsInfo repoSetting = NotebookRepoSettingsInfo.newInstance(); + repoSetting.type = NotebookRepoSettingsInfo.Type.DROPDOWN; + for (Instance instance : instances) { + if (instance.token.equals(userToken)) { + repoSetting.selected = Integer.toString(instance.id); + } + values.add(ImmutableMap.of("name", instance.name, "value", Integer.toString(instance.id))); + } + + repoSetting.value = values; + repoSetting.name = "Instance"; + settings.add(repoSetting); + return settings; + } + + private void changeToken(int instanceId, String user) { + if (instanceId <= 0) { + LOG.error("User {} tried to switch to a non valid instance {}", user, instanceId); + return; + } + + LOG.info("User {} will switch instance", user); + String ticket = UserSessionContainer.instance.getSession(user); + List instances; + String currentToken = StringUtils.EMPTY, targetToken = StringUtils.EMPTY; + try { + instances = tokenManager.getUserInstances(ticket); + if (instances.isEmpty()) { + return; + } + currentToken = tokenManager.getExistingUserToken(user); + for (Instance instance : instances) { + if (instance.id == instanceId) { + LOG.info("User {} switched to instance {}", user, instance.name); + tokenManager.setUserToken(user, instance.token); + targetToken = instance.token; + break; + } + } + if (!StringUtils.isBlank(currentToken) && !StringUtils.isBlank(targetToken)) { + ZeppelinhubUtils.userSwitchTokenRoutine(user, currentToken, targetToken); + } + } catch (IOException e) { + LOG.error("Cannot switch instance for user {}", user, e); + } + } + + @Override + public void updateSettings(Map settings, AuthenticationInfo subject) { + if (!isSubjectValid(subject)) { + LOG.error("Invalid subject, cannot update Zeppelinhub settings"); + return; + } + if (settings == null || settings.isEmpty()) { + LOG.error("Cannot update ZeppelinHub repo settings because of invalid settings"); + return; + } + + int instanceId = 0; + if (settings.containsKey("Instance")) { + try { + instanceId = Integer.parseInt(settings.get("Instance")); + } catch (NumberFormatException e) { + LOG.error("ZeppelinHub Instance Id in not a valid integer", e); + } + } + changeToken(instanceId, subject.getUser()); + } + + @Override + public Note setNoteRevision(String noteId, String revId, AuthenticationInfo subject) + throws IOException { + // Auto-generated method stub + return null; + } + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/OldNoteInfo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/OldNoteInfo.java new file mode 100644 index 00000000000..7c00aab19fd --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/OldNoteInfo.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook; + +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +public class OldNoteInfo { + String id; + String name; + private Map config = new HashMap<>(); + + public OldNoteInfo(String id, String name, Map config) { + super(); + this.id = id; + this.name = name; + this.config = config; + } + + public OldNoteInfo(Note note) { + id = note.getId(); + name = note.getName(); + config = note.getConfig(); + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Map getConfig() { + return config; + } + + public void setConfig(Map config) { + this.config = config; + } + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java index ad486b9ecbd..eda6e10df1b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java @@ -23,6 +23,7 @@ import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInfo; import org.apache.zeppelin.notebook.NotebookAuthorization; +import org.apache.zeppelin.notebook.OldNoteInfo; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.plugin.PluginManager; import org.apache.zeppelin.user.AuthenticationInfo; @@ -83,6 +84,27 @@ public void init(ZeppelinConfiguration conf) throws IOException { defaultNotebookRepo.init(conf); repos.add(defaultNotebookRepo); } + + // convert old note file (noteId/note.json) to new note file (note_name_note_id.zpln) + boolean convertToNew = conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_NEW_FORMAT_CONVERT); + boolean deleteOld = conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_NEW_FORMAT_DELETE_OLD); + if (convertToNew) { + NotebookRepo newNotebookRepo = repos.get(0); + OldNotebookRepo oldNotebookRepo = + PluginManager.get().loadOldNotebookRepo(newNotebookRepo.getClass().getCanonicalName()); + oldNotebookRepo.init(conf); + List oldNotesInfo = oldNotebookRepo.list(AuthenticationInfo.ANONYMOUS); + LOGGER.info("Convert old note file to new style, note count: " + oldNotesInfo.size()); + for (OldNoteInfo oldNoteInfo : oldNotesInfo) { + Note note = oldNotebookRepo.get(oldNoteInfo.getId(), AuthenticationInfo.ANONYMOUS); + note.setPath(note.getName()); + newNotebookRepo.save(note, AuthenticationInfo.ANONYMOUS); + if (deleteOld) { + oldNotebookRepo.remove(note.getId(), AuthenticationInfo.ANONYMOUS); + } + } + } + // sync for anonymous mode on start if (getRepoCount() > 1 && conf.getBoolean(ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED)) { try { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/OldNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/OldNotebookRepo.java new file mode 100644 index 00000000000..63f0026420b --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/OldNotebookRepo.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import org.apache.zeppelin.annotation.ZeppelinApi; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.OldNoteInfo; +import org.apache.zeppelin.user.AuthenticationInfo; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Notebook repository (persistence layer) abstraction + */ +public interface OldNotebookRepo { + + void init(ZeppelinConfiguration zConf) throws IOException; + + /** + * Lists notebook information about all notebooks in storage. + * @param subject contains user information. + * @return + * @throws IOException + */ + @ZeppelinApi + public List list(AuthenticationInfo subject) throws IOException; + + /** + * Get the notebook with the given id. + * @param noteId is note id. + * @param subject contains user information. + * @return + * @throws IOException + */ + @ZeppelinApi + public Note get(String noteId, AuthenticationInfo subject) throws IOException; + + /** + * Save given note in storage + * @param note is the note itself. + * @param subject contains user information. + * @throws IOException + */ + @ZeppelinApi + public void save(Note note, AuthenticationInfo subject) throws IOException; + + /** + * Remove note with given id. + * @param noteId is the note id. + * @param subject contains user information. + * @throws IOException + */ + @ZeppelinApi + public void remove(String noteId, AuthenticationInfo subject) throws IOException; + + /** + * Release any underlying resources + */ + @ZeppelinApi + public void close(); + + /** + * Versioning API (optional, preferred to have). + */ + + /** + * Get NotebookRepo settings got the given user. + * + * @param subject + * @return + */ + @ZeppelinApi + public List getSettings(AuthenticationInfo subject); + + /** + * update notebook repo settings. + * + * @param settings + * @param subject + */ + @ZeppelinApi + public void updateSettings(Map settings, AuthenticationInfo subject); + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/OldNotebookRepoWithVersionControl.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/OldNotebookRepoWithVersionControl.java new file mode 100644 index 00000000000..b85a0f9ca42 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/OldNotebookRepoWithVersionControl.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.annotation.ZeppelinApi; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.user.AuthenticationInfo; + +import java.io.IOException; +import java.util.List; + +/** + * Notebook repository (persistence layer) abstraction + */ +public interface OldNotebookRepoWithVersionControl extends OldNotebookRepo { + + /** + * chekpoint (set revision) for notebook. + * @param noteId Id of the Notebook + * @param checkpointMsg message description of the checkpoint + * @return Rev + * @throws IOException + */ + @ZeppelinApi + public Revision checkpoint(String noteId, String checkpointMsg, + AuthenticationInfo subject) throws IOException; + + /** + * Get particular revision of the Notebook. + * + * @param noteId Id of the Notebook + * @param revId revision of the Notebook + * @return a Notebook + * @throws IOException + */ + @ZeppelinApi + public Note get(String noteId, String revId, AuthenticationInfo subject) + throws IOException; + + /** + * List of revisions of the given Notebook. + * + * @param noteId id of the Notebook + * @return list of revisions + */ + @ZeppelinApi + public List revisionHistory(String noteId, AuthenticationInfo subject); + + /** + * Set note to particular revision. + * + * @param noteId Id of the Notebook + * @param revId revision of the Notebook + * @return a Notebook + * @throws IOException + */ + @ZeppelinApi + public Note setNoteRevision(String noteId, String revId, AuthenticationInfo subject) + throws IOException; + + /** + * Represents the 'Revision' a point in life of the notebook + */ + static class Revision { + public static final Revision EMPTY = new Revision(StringUtils.EMPTY, StringUtils.EMPTY, 0); + + public String id; + public String message; + public int time; + + public Revision(String revId, String message, int time) { + this.id = revId; + this.message = message; + this.time = time; + } + + public static boolean isEmpty(Revision revision) { + return revision == null || EMPTY.equals(revision); + } + } + +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java index 5f7dc1d5983..6f9b0e9445f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java @@ -22,6 +22,7 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.notebook.repo.NotebookRepo; +import org.apache.zeppelin.notebook.repo.OldNotebookRepo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +35,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * Class for loading Plugins @@ -89,8 +89,54 @@ public NotebookRepo loadNotebookRepo(String notebookRepoClassName) throws IOExce return notebookRepo; } + private String getOldNotebookRepoClassName(String notebookRepoClassName) { + int pos = notebookRepoClassName.lastIndexOf("."); + return notebookRepoClassName.substring(0, pos) + ".Old" + notebookRepoClassName.substring(pos + 1); + } + + /** + * This is a temporary class which is used for loading old implemention of NotebookRepo. + * + * @param notebookRepoClassName + * @return + * @throws IOException + */ + public OldNotebookRepo loadOldNotebookRepo(String notebookRepoClassName) throws IOException { + LOGGER.info("Loading OldNotebookRepo Plugin: " + notebookRepoClassName); + // load plugin from classpath directly when it is test. + // otherwise load it from plugin folder + String isTest = System.getenv("IS_ZEPPELIN_TEST"); + if (isTest != null && isTest.equals("true")) { + try { + OldNotebookRepo notebookRepo = (OldNotebookRepo) + (Class.forName(notebookRepoClassName).newInstance()); + return notebookRepo; + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + LOGGER.warn("Fail to instantiate notebookrepo from classpath directly:" + notebookRepoClassName, e); + } + } + + String simpleClassName = notebookRepoClassName.substring(notebookRepoClassName.lastIndexOf(".") + 1); + URLClassLoader pluginClassLoader = getPluginClassLoader(pluginsDir, "NotebookRepo", simpleClassName); + if (pluginClassLoader == null) { + return null; + } + OldNotebookRepo notebookRepo = null; + try { + notebookRepoClassName = getOldNotebookRepoClassName(notebookRepoClassName); + notebookRepo = (OldNotebookRepo) (Class.forName(notebookRepoClassName, true, pluginClassLoader)).newInstance(); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + LOGGER.warn("Fail to instantiate notebookrepo from plugin classpath:" + notebookRepoClassName, e); + } + + if (notebookRepo == null) { + LOGGER.warn("Unable to load NotebookRepo Plugin: " + notebookRepoClassName); + } + return notebookRepo; + } + public synchronized InterpreterLauncher loadInterpreterLauncher(String launcherPlugin, - RecoveryStorage recoveryStorage) + RecoveryStorage recoveryStorage) throws IOException { if (cachedLaunchers.containsKey(launcherPlugin)) {