From 54a2423f7910a9f6230f813c5bdbcb4acd66660c Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Thu, 22 Dec 2016 23:53:04 +0900 Subject: [PATCH 1/5] Moved result(s) from Job into Paragraph --- .../remote/RemoteInterpreterServer.java | 11 ++++ .../org/apache/zeppelin/scheduler/Job.java | 29 +++------ .../remote/RemoteInterpreterTest.java | 60 ++++++++++++++++++- .../scheduler/RemoteSchedulerTest.java | 32 ++++++++++ .../zeppelin/scheduler/SleepingJob.java | 11 ++++ .../apache/zeppelin/notebook/Paragraph.java | 21 +++++++ 6 files changed, 142 insertions(+), 22 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index b167256e6bc..6baed824cea 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -402,6 +402,7 @@ class InterpretJob extends Job { private String script; private InterpreterContext context; private Map infos; + private Object results; public InterpretJob( String jobId, @@ -417,6 +418,11 @@ public InterpretJob( this.context = context; } + @Override + public Object getReturn() { + return results; + } + @Override public int progress() { return 0; @@ -514,6 +520,11 @@ protected Object jobRun() throws Throwable { protected boolean jobAbort() { return false; } + + @Override + public void setResult(Object results) { + this.results = results; + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index 4be6da5b161..dbea8df1d89 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -73,13 +73,6 @@ public boolean isPending() { private String jobName; String id; - // since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph - // see ZEPPELIN-212 - Object results; - - // For backward compatibility of note.json format after ZEPPELIN-212 - Object result; - Date dateCreated; Date dateStarted; Date dateFinished; @@ -184,7 +177,7 @@ public void run() { progressUpdator = new JobProgressPoller(this, progressUpdateIntervalMs); progressUpdator.start(); dateStarted = new Date(); - results = jobRun(); + jobRun(); // jobRun will store results by itself and job can get it by calling getReturn(); this.exception = null; errorMessage = null; dateFinished = new Date(); @@ -193,14 +186,14 @@ public void run() { LOGGER.error("Job failed", e); progressUpdator.terminate(); this.exception = e; - results = e.getMessage(); + setResult(e.getMessage()); errorMessage = getStack(e); dateFinished = new Date(); } catch (Throwable e) { LOGGER.error("Job failed", e); progressUpdator.terminate(); this.exception = e; - results = e.getMessage(); + setResult(e.getMessage()); errorMessage = getStack(e); dateFinished = new Date(); } finally { @@ -226,13 +219,7 @@ protected void setException(Throwable t) { errorMessage = getStack(t); } - public Object getPreviousResultFormat() { - return result; - } - - public Object getReturn() { - return results; - } + public abstract Object getReturn(); public String getJobName() { return jobName; @@ -246,6 +233,10 @@ public void setJobName(String jobName) { public abstract Map info(); + /** + * jobRun should store the data into results so that job interface gets the result by calling + * getReturn(); + */ protected abstract Object jobRun() throws Throwable; protected abstract boolean jobAbort(); @@ -270,7 +261,5 @@ public Date getDateFinished() { return dateFinished; } - public void setResult(Object results) { - this.results = results; - } + public abstract void setResult(Object results); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 2a834b126e2..b25cc67119c 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -298,6 +298,17 @@ public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOExc long start = System.currentTimeMillis(); Job jobA = new Job("jobA", null) { + private Object results; + + @Override + public Object getReturn() { + return results; + } + + @Override + public void setResult(Object results) { + this.results = results; + } @Override public int progress() { @@ -336,6 +347,18 @@ protected boolean jobAbort() { Job jobB = new Job("jobB", null) { + private Object results; + + @Override + public Object getReturn() { + return results; + } + + @Override + public void setResult(Object results) { + this.results = results; + } + @Override public int progress() { return 0; @@ -403,6 +426,17 @@ public void testRunOrderPreserved() throws InterruptedException { for (int i = 0; i < concurrency; i++) { final String jobId = Integer.toString(i); scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) { + private Object results; + + @Override + public Object getReturn() { + return results; + } + + @Override + public void setResult(Object results) { + this.results = results; + } @Override public int progress() { @@ -430,7 +464,7 @@ protected Object jobRun() throws Throwable { new LinkedList(), null)); synchronized (results) { - results.addAll(ret.message()); + ((List) results).addAll(ret.message()); results.notify(); } return null; @@ -483,6 +517,17 @@ public void testRunParallel() throws InterruptedException { for (int i = 0; i < concurrency; i++) { final String jobId = Integer.toString(i); scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) { + private Object results; + + @Override + public Object getReturn() { + return results; + } + + @Override + public void setResult(Object results) { + this.results = results; + } @Override public int progress() { @@ -511,7 +556,7 @@ protected Object jobRun() throws Throwable { new LinkedList(), null)); synchronized (results) { - results.addAll(ret.message()); + ((List) results).addAll(ret.message()); results.notify(); } return stmt; @@ -586,6 +631,17 @@ public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedEx intpA.open(); Job jobA = new Job("jobA", null) { + private Object results; + + @Override + public Object getReturn() { + return results; + } + + @Override + public void setResult(Object results) { + this.results = results; + } @Override public int progress() { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index c450891ecc7..d7b2007e73c 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -93,6 +93,11 @@ public void test() throws Exception { 10); Job job = new Job("jobId", "jobName", null, 200) { + Object results; + @Override + public Object getReturn() { + return results; + } @Override public int progress() { @@ -125,6 +130,11 @@ protected Object jobRun() throws Throwable { protected boolean jobAbort() { return false; } + + @Override + public void setResult(Object results) { + this.results = results; + } }; scheduler.submit(job); @@ -185,6 +195,7 @@ public void testAbortOnPending() throws Exception { 10); Job job1 = new Job("jobId1", "jobName1", null, 200) { + Object results; InterpreterContext context = new InterpreterContext( "note", "jobId1", @@ -198,6 +209,11 @@ public void testAbortOnPending() throws Exception { new LocalResourcePool("pool1"), new LinkedList(), null); + @Override + public Object getReturn() { + return results; + } + @Override public int progress() { return 0; @@ -221,9 +237,15 @@ protected boolean jobAbort() { } return true; } + + @Override + public void setResult(Object results) { + this.results = results; + } }; Job job2 = new Job("jobId2", "jobName2", null, 200) { + public Object results; InterpreterContext context = new InterpreterContext( "note", "jobId2", @@ -237,6 +259,11 @@ protected boolean jobAbort() { new LocalResourcePool("pool1"), new LinkedList(), null); + @Override + public Object getReturn() { + return results; + } + @Override public int progress() { return 0; @@ -260,6 +287,11 @@ protected boolean jobAbort() { } return true; } + + @Override + public void setResult(Object results) { + this.results = results; + } }; job2.setResult("result2"); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/SleepingJob.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/SleepingJob.java index 68b21aeb141..359305871b9 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/SleepingJob.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/SleepingJob.java @@ -33,6 +33,7 @@ public class SleepingJob extends Job{ private int count; static Logger LOGGER = LoggerFactory.getLogger(SleepingJob.class); + private Object results; public SleepingJob(String jobName, JobListener listener, int time){ @@ -61,6 +62,16 @@ public boolean jobAbort() { return true; } + @Override + public void setResult(Object results) { + this.results = results; + } + + @Override + public Object getReturn() { + return results; + } + @Override public int progress() { long p = (System.currentTimeMillis() - start)*100 / time; diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 7e72564331d..48d3d42d11a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -65,6 +65,13 @@ public class Paragraph extends Job implements Serializable, Cloneable { private Map config; // paragraph configs like isOpen, colWidth, etc public GUI settings; // form and parameter settings + // since zeppelin-0.7.0, zeppelin stores multiple results of the paragraph + // see ZEPPELIN-212 + Object results; + + // For backward compatibility of note.json format after ZEPPELIN-212 + Object result; + /** * Applicaiton states in this paragraph */ @@ -116,6 +123,11 @@ public Paragraph getUserParagraph(String user) { return userParagraphMap.get(user); } + @Override + public void setResult(Object results) { + this.results = results; + } + public Paragraph cloneParagraphForUser(String user) { Paragraph p = new Paragraph(); p.settings.setParams(Maps.newHashMap(settings.getParams())); @@ -285,6 +297,15 @@ public InterpreterResult getResult() { return (InterpreterResult) getReturn(); } + @Override + public Object getReturn() { + return results; + } + + public Object getPreviousResultFormat() { + return result; + } + @Override public int progress() { String replName = getRequiredReplName(); From 69230023cb82ab72b33afebebd7894a09bee7731 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Fri, 23 Dec 2016 03:11:11 +0900 Subject: [PATCH 2/5] Changed to store result from jobRun() in Job --- .../src/main/java/org/apache/zeppelin/scheduler/Job.java | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index dbea8df1d89..c86845174cc 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -177,7 +177,7 @@ public void run() { progressUpdator = new JobProgressPoller(this, progressUpdateIntervalMs); progressUpdator.start(); dateStarted = new Date(); - jobRun(); // jobRun will store results by itself and job can get it by calling getReturn(); + setResult(jobRun()); this.exception = null; errorMessage = null; dateFinished = new Date(); @@ -233,10 +233,6 @@ public void setJobName(String jobName) { public abstract Map info(); - /** - * jobRun should store the data into results so that job interface gets the result by calling - * getReturn(); - */ protected abstract Object jobRun() throws Throwable; protected abstract boolean jobAbort(); From 8de238b9ba72cb1b0d9c3e6f152a8d072a368e00 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Fri, 23 Dec 2016 03:31:33 +0900 Subject: [PATCH 3/5] Changed wrong variable name --- .../remote/RemoteInterpreterTest.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index b25cc67119c..71e5f5643e3 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -298,16 +298,16 @@ public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOExc long start = System.currentTimeMillis(); Job jobA = new Job("jobA", null) { - private Object results; + private Object r; @Override public Object getReturn() { - return results; + return r; } @Override public void setResult(Object results) { - this.results = results; + this.r = results; } @Override @@ -347,16 +347,16 @@ protected boolean jobAbort() { Job jobB = new Job("jobB", null) { - private Object results; + private Object r; @Override public Object getReturn() { - return results; + return r; } @Override public void setResult(Object results) { - this.results = results; + this.r = results; } @Override @@ -426,16 +426,16 @@ public void testRunOrderPreserved() throws InterruptedException { for (int i = 0; i < concurrency; i++) { final String jobId = Integer.toString(i); scheduler.submit(new Job(jobId, Integer.toString(i), null, 200) { - private Object results; + private Object r; @Override public Object getReturn() { - return results; + return r; } @Override public void setResult(Object results) { - this.results = results; + this.r = results; } @Override @@ -464,7 +464,7 @@ protected Object jobRun() throws Throwable { new LinkedList(), null)); synchronized (results) { - ((List) results).addAll(ret.message()); + results.addAll(ret.message()); results.notify(); } return null; @@ -517,16 +517,16 @@ public void testRunParallel() throws InterruptedException { for (int i = 0; i < concurrency; i++) { final String jobId = Integer.toString(i); scheduler.submit(new Job(jobId, Integer.toString(i), null, 300) { - private Object results; + private Object r; @Override public Object getReturn() { - return results; + return r; } @Override public void setResult(Object results) { - this.results = results; + this.r = results; } @Override @@ -556,7 +556,7 @@ protected Object jobRun() throws Throwable { new LinkedList(), null)); synchronized (results) { - ((List) results).addAll(ret.message()); + results.addAll(ret.message()); results.notify(); } return stmt; @@ -631,16 +631,16 @@ public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedEx intpA.open(); Job jobA = new Job("jobA", null) { - private Object results; + private Object r; @Override public Object getReturn() { - return results; + return r; } @Override public void setResult(Object results) { - this.results = results; + this.r = results; } @Override From 2fe9b63c42d340daaa280ac18166c3852279f161 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Sun, 25 Dec 2016 02:52:38 +0900 Subject: [PATCH 4/5] Reformat code --- .../org/apache/zeppelin/scheduler/Job.java | 25 +- .../zeppelin/socket/NotebookServer.java | 818 ++++++++---------- .../org/apache/zeppelin/notebook/Note.java | 15 +- .../apache/zeppelin/notebook/Paragraph.java | 74 +- 4 files changed, 412 insertions(+), 520 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java index c86845174cc..9bb26f3acdd 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/Job.java @@ -28,15 +28,14 @@ /** * Skeletal implementation of the Job concept. - * - designed for inheritance - * - should be run on a separate thread - * - maintains internal state: it's status - * - supports listeners who are updated on status change - * - * Job class is serialized/deserialized and used server<->client communication - * and saving/loading jobs from disk. - * Changing/adding/deleting non transitive field name need consideration of that. + * - designed for inheritance + * - should be run on a separate thread + * - maintains internal state: it's status + * - supports listeners who are updated on status change * + * Job class is serialized/deserialized and used server<->client communication + * and saving/loading jobs from disk. + * Changing/adding/deleting non transitive field name need consideration of that. */ public abstract class Job { /** @@ -48,15 +47,10 @@ public abstract class Job { * FINISHED - Job finished run. with success * ERROR - Job finished run. with error * ABORT - Job finished by abort - * */ public static enum Status { - READY, - PENDING, - RUNNING, - FINISHED, - ERROR, - ABORT; + READY, PENDING, RUNNING, FINISHED, ERROR, ABORT; + public boolean isReady() { return this == READY; } @@ -70,6 +64,7 @@ public boolean isPending() { } } + private String jobName; String id; diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 81f466a33a7..e390f052d0d 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -86,23 +86,26 @@ /** * Zeppelin websocket service. */ -public class NotebookServer extends WebSocketServlet implements - NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener, - RemoteInterpreterProcessListener, ApplicationEventListener { +public class NotebookServer extends WebSocketServlet + implements NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener, + RemoteInterpreterProcessListener, ApplicationEventListener { /** * Job manager service type */ protected enum JOB_MANAGER_SERVICE { JOB_MANAGER_PAGE("JOB_MANAGER_PAGE"); private String serviceTypeKey; + JOB_MANAGER_SERVICE(String serviceType) { this.serviceTypeKey = serviceType; } + String getKey() { return this.serviceTypeKey; } } + private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create(); final Map> noteSocketMap = new HashMap<>(); @@ -163,7 +166,7 @@ public void onMessage(NotebookSocket conn, String msg) { } String ticket = TicketContainer.instance.getTicket(messagereceived.principal); - if (ticket != null && !ticket.equals(messagereceived.ticket)){ + if (ticket != null && !ticket.equals(messagereceived.ticket)) { /* not to pollute logs, log instead of exception */ if (StringUtils.isEmpty(messagereceived.ticket)) { LOG.debug("{} message: invalid ticket {} != {}", messagereceived.op, @@ -172,7 +175,7 @@ public void onMessage(NotebookSocket conn, String msg) { if (!messagereceived.op.equals(OP.PING)) { conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", "Your ticket is invalid possibly due to server restart. " - + "Please refresh the page and login again."))); + + "Please refresh the page and login again."))); } } return; @@ -187,8 +190,9 @@ public void onMessage(NotebookSocket conn, String msg) { HashSet userAndRoles = new HashSet<>(); userAndRoles.add(messagereceived.principal); if (!messagereceived.roles.equals("")) { - HashSet roles = gson.fromJson(messagereceived.roles, - new TypeToken>(){}.getType()); + HashSet roles = + gson.fromJson(messagereceived.roles, new TypeToken>() { + }.getType()); if (roles != null) { userAndRoles.addAll(roles); } @@ -201,121 +205,121 @@ public void onMessage(NotebookSocket conn, String msg) { /** Lets be elegant here */ switch (messagereceived.op) { - case LIST_NOTES: - unicastNoteList(conn, subject, userAndRoles); - break; - case RELOAD_NOTES_FROM_REPO: - broadcastReloadedNoteList(subject, userAndRoles); - break; - case GET_HOME_NOTE: - sendHomeNote(conn, userAndRoles, notebook, messagereceived); - break; - case GET_NOTE: - sendNote(conn, userAndRoles, notebook, messagereceived); - break; - case NEW_NOTE: - createNote(conn, userAndRoles, notebook, messagereceived); - break; - case DEL_NOTE: - removeNote(conn, userAndRoles, notebook, messagereceived); - break; - case CLONE_NOTE: - cloneNote(conn, userAndRoles, notebook, messagereceived); - break; - case IMPORT_NOTE: - importNote(conn, userAndRoles, notebook, messagereceived); - break; - case COMMIT_PARAGRAPH: - updateParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case RUN_PARAGRAPH: - runParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case CANCEL_PARAGRAPH: - cancelParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case MOVE_PARAGRAPH: - moveParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case INSERT_PARAGRAPH: - insertParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case COPY_PARAGRAPH: - copyParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case PARAGRAPH_REMOVE: - removeParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case PARAGRAPH_CLEAR_OUTPUT: - clearParagraphOutput(conn, userAndRoles, notebook, messagereceived); - break; - case PARAGRAPH_CLEAR_ALL_OUTPUT: - clearAllParagraphOutput(conn, userAndRoles, notebook, messagereceived); - break; - case NOTE_UPDATE: - updateNote(conn, userAndRoles, notebook, messagereceived); - break; - case NOTE_RENAME: - renameNote(conn, userAndRoles, notebook, messagereceived); - break; - case FOLDER_RENAME: - renameFolder(conn, userAndRoles, notebook, messagereceived); - break; - case UPDATE_PERSONALIZED_MODE: - updatePersonalizedMode(conn, userAndRoles, notebook, messagereceived); - break; - case COMPLETION: - completion(conn, userAndRoles, notebook, messagereceived); - break; - case PING: - break; //do nothing - case ANGULAR_OBJECT_UPDATED: - angularObjectUpdated(conn, userAndRoles, notebook, messagereceived); - break; - case ANGULAR_OBJECT_CLIENT_BIND: - angularObjectClientBind(conn, userAndRoles, notebook, messagereceived); - break; - case ANGULAR_OBJECT_CLIENT_UNBIND: - angularObjectClientUnbind(conn, userAndRoles, notebook, messagereceived); - break; - case LIST_CONFIGURATIONS: - sendAllConfigurations(conn, userAndRoles, notebook); - break; - case CHECKPOINT_NOTE: - checkpointNote(conn, notebook, messagereceived); - break; - case LIST_REVISION_HISTORY: - listRevisionHistory(conn, notebook, messagereceived); - break; - case SET_NOTE_REVISION: - setNoteRevision(conn, userAndRoles, notebook, messagereceived); - break; - case NOTE_REVISION: - getNoteByRevision(conn, notebook, messagereceived); - break; - case LIST_NOTE_JOBS: - unicastNoteJobInfo(conn, messagereceived); - break; - case UNSUBSCRIBE_UPDATE_NOTE_JOBS: - unsubscribeNoteJobInfo(conn); - break; - case GET_INTERPRETER_BINDINGS: - getInterpreterBindings(conn, messagereceived); - break; - case SAVE_INTERPRETER_BINDINGS: - saveInterpreterBindings(conn, messagereceived); - break; - case EDITOR_SETTING: - getEditorSetting(conn, messagereceived); - break; - case GET_INTERPRETER_SETTINGS: - getInterpreterSettings(conn, subject); - break; - case WATCHER: - switchConnectionToWatcher(conn, messagereceived); - break; - default: - break; + case LIST_NOTES: + unicastNoteList(conn, subject, userAndRoles); + break; + case RELOAD_NOTES_FROM_REPO: + broadcastReloadedNoteList(subject, userAndRoles); + break; + case GET_HOME_NOTE: + sendHomeNote(conn, userAndRoles, notebook, messagereceived); + break; + case GET_NOTE: + sendNote(conn, userAndRoles, notebook, messagereceived); + break; + case NEW_NOTE: + createNote(conn, userAndRoles, notebook, messagereceived); + break; + case DEL_NOTE: + removeNote(conn, userAndRoles, notebook, messagereceived); + break; + case CLONE_NOTE: + cloneNote(conn, userAndRoles, notebook, messagereceived); + break; + case IMPORT_NOTE: + importNote(conn, userAndRoles, notebook, messagereceived); + break; + case COMMIT_PARAGRAPH: + updateParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case RUN_PARAGRAPH: + runParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case CANCEL_PARAGRAPH: + cancelParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case MOVE_PARAGRAPH: + moveParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case INSERT_PARAGRAPH: + insertParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case COPY_PARAGRAPH: + copyParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case PARAGRAPH_REMOVE: + removeParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case PARAGRAPH_CLEAR_OUTPUT: + clearParagraphOutput(conn, userAndRoles, notebook, messagereceived); + break; + case PARAGRAPH_CLEAR_ALL_OUTPUT: + clearAllParagraphOutput(conn, userAndRoles, notebook, messagereceived); + break; + case NOTE_UPDATE: + updateNote(conn, userAndRoles, notebook, messagereceived); + break; + case NOTE_RENAME: + renameNote(conn, userAndRoles, notebook, messagereceived); + break; + case FOLDER_RENAME: + renameFolder(conn, userAndRoles, notebook, messagereceived); + break; + case UPDATE_PERSONALIZED_MODE: + updatePersonalizedMode(conn, userAndRoles, notebook, messagereceived); + break; + case COMPLETION: + completion(conn, userAndRoles, notebook, messagereceived); + break; + case PING: + break; //do nothing + case ANGULAR_OBJECT_UPDATED: + angularObjectUpdated(conn, userAndRoles, notebook, messagereceived); + break; + case ANGULAR_OBJECT_CLIENT_BIND: + angularObjectClientBind(conn, userAndRoles, notebook, messagereceived); + break; + case ANGULAR_OBJECT_CLIENT_UNBIND: + angularObjectClientUnbind(conn, userAndRoles, notebook, messagereceived); + break; + case LIST_CONFIGURATIONS: + sendAllConfigurations(conn, userAndRoles, notebook); + break; + case CHECKPOINT_NOTE: + checkpointNote(conn, notebook, messagereceived); + break; + case LIST_REVISION_HISTORY: + listRevisionHistory(conn, notebook, messagereceived); + break; + case SET_NOTE_REVISION: + setNoteRevision(conn, userAndRoles, notebook, messagereceived); + break; + case NOTE_REVISION: + getNoteByRevision(conn, notebook, messagereceived); + break; + case LIST_NOTE_JOBS: + unicastNoteJobInfo(conn, messagereceived); + break; + case UNSUBSCRIBE_UPDATE_NOTE_JOBS: + unsubscribeNoteJobInfo(conn); + break; + case GET_INTERPRETER_BINDINGS: + getInterpreterBindings(conn, messagereceived); + break; + case SAVE_INTERPRETER_BINDINGS: + saveInterpreterBindings(conn, messagereceived); + break; + case EDITOR_SETTING: + getEditorSetting(conn, messagereceived); + break; + case GET_INTERPRETER_SETTINGS: + getInterpreterSettings(conn, subject); + break; + case WATCHER: + switchConnectionToWatcher(conn, messagereceived); + break; + default: + break; } } catch (Exception e) { LOG.error("Can't handle message", e); @@ -324,8 +328,8 @@ public void onMessage(NotebookSocket conn, String msg) { @Override public void onClose(NotebookSocket conn, int code, String reason) { - LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() - .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); + LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest().getRemoteAddr(), + conn.getRequest().getRemotePort(), code, reason); removeConnectionFromAllNote(conn); connectedSockets.remove(conn); removeUserConnection(conn.getUser(), conn); @@ -412,8 +416,7 @@ private String getOpenNoteId(NotebookSocket socket) { return id; } - private void broadcastToNoteBindedInterpreter(String interpreterGroupId, - Message m) { + private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message m) { Notebook notebook = notebook(); List notes = notebook.getAllNotes(); for (Note note : notes) { @@ -471,7 +474,7 @@ private void multicastToUser(String user, Message m) { return; } - for (NotebookSocket conn: userConnectedSockets.get(user)) { + for (NotebookSocket conn : userConnectedSockets.get(user)) { unicast(m, conn); } } @@ -488,15 +491,13 @@ private void unicast(Message m, NotebookSocket conn) { public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException { addConnectionToNote(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), conn); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - List> noteJobs = notebook() - .getJobListByUnixTime(false, 0, subject); + List> noteJobs = notebook().getJobListByUnixTime(false, 0, subject); Map response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); response.put("jobs", noteJobs); - conn.send(serializeMessage(new Message(OP.LIST_NOTE_JOBS) - .put("noteJobs", response))); + conn.send(serializeMessage(new Message(OP.LIST_NOTE_JOBS).put("noteJobs", response))); } public void broadcastUpdateNoteJobInfo(long lastUpdateUnixTime) throws IOException { @@ -513,7 +514,7 @@ public void broadcastUpdateNoteJobInfo(long lastUpdateUnixTime) throws IOExcepti response.put("jobs", noteJobs != null ? noteJobs : new LinkedList<>()); broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), - new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); + new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); } public void unsubscribeNoteJobInfo(NotebookSocket conn) { @@ -523,9 +524,10 @@ public void unsubscribeNoteJobInfo(NotebookSocket conn) { public void saveInterpreterBindings(NotebookSocket conn, Message fromMessage) { String noteId = (String) fromMessage.data.get("noteId"); try { - List settingIdList = gson.fromJson(String.valueOf( - fromMessage.data.get("selectedSettingIds")), new TypeToken>() { - }.getType()); + List settingIdList = + gson.fromJson(String.valueOf(fromMessage.data.get("selectedSettingIds")), + new TypeToken>() { + }.getType()); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); notebook().bindInterpretersToNote(subject.getUser(), noteId, settingIdList); broadcastInterpreterBindings(noteId, @@ -535,13 +537,12 @@ public void saveInterpreterBindings(NotebookSocket conn, Message fromMessage) { } } - public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) - throws IOException { + public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) throws IOException { String noteId = (String) fromMessage.data.get("noteId"); List settingList = InterpreterBindingUtils.getInterpreterBindings(notebook(), noteId); - conn.send(serializeMessage(new Message(OP.INTERPRETER_BINDINGS) - .put("interpreterBindings", settingList))); + conn.send(serializeMessage( + new Message(OP.INTERPRETER_BINDINGS).put("interpreterBindings", settingList))); } public List> generateNotesInfo(boolean needsReload, @@ -551,8 +552,8 @@ public List> generateNotesInfo(boolean needsReload, ZeppelinConfiguration conf = notebook.getConf(); String homescreenNoteId = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN); - boolean hideHomeScreenNotebookFromList = conf - .getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE); + boolean hideHomeScreenNotebookFromList = + conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE); if (needsReload) { try { @@ -608,11 +609,8 @@ public void broadcastParagraphs(Map userParagraphMap, private void broadcastNewParagraph(Note note, Paragraph para) { LOG.info("Broadcasting paragraph on run call instead of note."); int paraIndex = note.getParagraphs().indexOf(para); - broadcast( - note.getId(), - new Message(OP.PARAGRAPH_ADDED) - .put("paragraph", para) - .put("index", paraIndex)); + broadcast(note.getId(), + new Message(OP.PARAGRAPH_ADDED).put("paragraph", para).put("index", paraIndex)); } public void broadcastNoteList(AuthenticationInfo subject, HashSet userAndRoles) { @@ -648,7 +646,7 @@ private void broadcastNoteListExcept(List> notesInfo, AuthenticationInfo subject) { Set userAndRoles; NotebookAuthorization authInfo = NotebookAuthorization.getInstance(); - for (String user: userConnectedSockets.keySet()) { + for (String user : userConnectedSockets.keySet()) { if (subject.getUser().equals(user)) { continue; } @@ -660,26 +658,22 @@ private void broadcastNoteListExcept(List> notesInfo, } } - void permissionError(NotebookSocket conn, String op, - String userName, - Set userAndRoles, - Set allowed) throws IOException { - LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", - op, userAndRoles, allowed); + void permissionError(NotebookSocket conn, String op, String userName, Set userAndRoles, + Set allowed) throws IOException { + LOG.info("Cannot {}. Connection readers {}. Allowed readers {}", op, userAndRoles, allowed); conn.send(serializeMessage(new Message(OP.AUTH_INFO).put("info", - "Insufficient privileges to " + op + "note.\n\n" + - "Allowed users or roles: " + allowed.toString() + "\n\n" + - "But the user " + userName + " belongs to: " + userAndRoles.toString()))); + "Insufficient privileges to " + op + "note.\n\n" + "Allowed users or roles: " + allowed + .toString() + "\n\n" + "But the user " + userName + " belongs to: " + userAndRoles + .toString()))); } private void sendNote(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws IOException { LOG.info("New operation from {} : {} : {} : {} : {}", conn.getRequest().getRemoteAddr(), - conn.getRequest().getRemotePort(), - fromMessage.principal, fromMessage.op, fromMessage.get("id") - ); + conn.getRequest().getRemotePort(), fromMessage.principal, fromMessage.op, + fromMessage.get("id")); String noteId = (String) fromMessage.get("id"); if (noteId == null) { @@ -708,8 +702,8 @@ private void sendNote(NotebookSocket conn, HashSet userAndRoles, Noteboo } } - private void sendHomeNote(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + private void sendHomeNote(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { String noteId = notebook.getConf().getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN); String user = fromMessage.principal; @@ -721,8 +715,8 @@ private void sendHomeNote(NotebookSocket conn, HashSet userAndRoles, if (note != null) { NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isReader(noteId, userAndRoles)) { - permissionError(conn, "read", fromMessage.principal, - userAndRoles, notebookAuthorization.getReaders(noteId)); + permissionError(conn, "read", fromMessage.principal, userAndRoles, + notebookAuthorization.getReaders(noteId)); return; } addConnectionToNote(note.getId(), conn); @@ -734,13 +728,11 @@ private void sendHomeNote(NotebookSocket conn, HashSet userAndRoles, } } - private void updateNote(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) - throws SchedulerException, IOException { + private void updateNote(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); - Map config = (Map) fromMessage - .get("config"); + Map config = (Map) fromMessage.get("config"); if (noteId == null) { return; } @@ -750,8 +742,8 @@ private void updateNote(NotebookSocket conn, HashSet userAndRoles, NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, "update", fromMessage.principal, - userAndRoles, notebookAuthorization.getWriters(noteId)); + permissionError(conn, "update", fromMessage.principal, userAndRoles, + notebookAuthorization.getWriters(noteId)); return; } @@ -766,9 +758,7 @@ private void updateNote(NotebookSocket conn, HashSet userAndRoles, AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); note.persist(subject); - broadcast(note.getId(), new Message(OP.NOTE_UPDATED) - .put("name", name) - .put("config", config) + broadcast(note.getId(), new Message(OP.NOTE_UPDATED).put("name", name).put("config", config) .put("info", note.getInfo())); broadcastNoteList(subject, userAndRoles); } @@ -786,8 +776,8 @@ private void updatePersonalizedMode(NotebookSocket conn, HashSet userAnd NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isOwner(noteId, userAndRoles)) { - permissionError(conn, "persoanlized ", fromMessage.principal, - userAndRoles, notebookAuthorization.getOwners(noteId)); + permissionError(conn, "persoanlized ", fromMessage.principal, userAndRoles, + notebookAuthorization.getOwners(noteId)); return; } @@ -800,9 +790,8 @@ private void updatePersonalizedMode(NotebookSocket conn, HashSet userAnd } } - private void renameNote(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) - throws SchedulerException, IOException { + private void renameNote(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); @@ -812,8 +801,8 @@ private void renameNote(NotebookSocket conn, HashSet userAndRoles, NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isOwner(noteId, userAndRoles)) { - permissionError(conn, "rename", fromMessage.principal, - userAndRoles, notebookAuthorization.getOwners(noteId)); + permissionError(conn, "rename", fromMessage.principal, userAndRoles, + notebookAuthorization.getOwners(noteId)); return; } @@ -828,9 +817,8 @@ private void renameNote(NotebookSocket conn, HashSet userAndRoles, } } - private void renameFolder(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) - throws SchedulerException, IOException { + private void renameFolder(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws SchedulerException, IOException { String oldFolderId = (String) fromMessage.get("id"); String newFolderId = (String) fromMessage.get("name"); @@ -843,7 +831,7 @@ private void renameFolder(NotebookSocket conn, HashSet userAndRoles, String noteId = note.getId(); if (!notebookAuthorization.isOwner(noteId, userAndRoles)) { permissionError(conn, "rename folder of '" + note.getName() + "'", fromMessage.principal, - userAndRoles, notebookAuthorization.getOwners(noteId)); + userAndRoles, notebookAuthorization.getOwners(noteId)); return; } } @@ -863,11 +851,10 @@ private void renameFolder(NotebookSocket conn, HashSet userAndRoles, } } - private boolean isCronUpdated(Map configA, - Map configB) { + private boolean isCronUpdated(Map configA, Map configB) { boolean cronUpdated = false; - if (configA.get("cron") != null && configB.get("cron") != null - && configA.get("cron").equals(configB.get("cron"))) { + if (configA.get("cron") != null && configB.get("cron") != null && configA.get("cron") + .equals(configB.get("cron"))) { cronUpdated = true; } else if (configA.get("cron") == null && configB.get("cron") == null) { cronUpdated = false; @@ -878,9 +865,8 @@ private boolean isCronUpdated(Map configA, return cronUpdated; } - private void createNote(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message message) - throws IOException { + private void createNote(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message message) throws IOException { AuthenticationInfo subject = new AuthenticationInfo(message.principal); try { @@ -891,7 +877,7 @@ private void createNote(NotebookSocket conn, HashSet userAndRoles, List interpreterSettingIds = new LinkedList<>(); interpreterSettingIds.add(defaultInterpreterId); for (String interpreterSettingId : notebook.getInterpreterFactory(). - getDefaultInterpreterSettingList()) { + getDefaultInterpreterSettingList()) { if (!interpreterSettingId.equals(defaultInterpreterId)) { interpreterSettingIds.add(interpreterSettingId); } @@ -916,16 +902,15 @@ private void createNote(NotebookSocket conn, HashSet userAndRoles, } catch (FileSystemException e) { LOG.error("Exception from createNote", e); conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", - "Oops! There is something wrong with the notebook file system. " - + "Please check the logs for more details."))); + "Oops! There is something wrong with the notebook file system. " + + "Please check the logs for more details."))); return; } broadcastNoteList(subject, userAndRoles); } - private void removeNote(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) - throws IOException { + private void removeNote(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; @@ -934,8 +919,8 @@ private void removeNote(NotebookSocket conn, HashSet userAndRoles, Note note = notebook.getNote(noteId); NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isOwner(noteId, userAndRoles)) { - permissionError(conn, "remove", fromMessage.principal, - userAndRoles, notebookAuthorization.getOwners(noteId)); + permissionError(conn, "remove", fromMessage.principal, userAndRoles, + notebookAuthorization.getOwners(noteId)); return; } @@ -985,9 +970,8 @@ private void updateParagraph(NotebookSocket conn, HashSet userAndRoles, } } - private void cloneNote(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) - throws IOException, CloneNotSupportedException { + private void cloneNote(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws IOException, CloneNotSupportedException { String noteId = getOpenNoteId(conn); String name = (String) fromMessage.get("name"); Note newNote = notebook.cloneNote(noteId, name, new AuthenticationInfo(fromMessage.principal)); @@ -998,8 +982,7 @@ private void cloneNote(NotebookSocket conn, HashSet userAndRoles, } private void clearAllParagraphOutput(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) - throws IOException { + Notebook notebook, Message fromMessage) throws IOException { final String noteId = (String) fromMessage.get("id"); if (StringUtils.isBlank(noteId)) { return; @@ -1007,8 +990,8 @@ private void clearAllParagraphOutput(NotebookSocket conn, HashSet userAn Note note = notebook.getNote(noteId); NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, "clear output", fromMessage.principal, - userAndRoles, notebookAuthorization.getOwners(noteId)); + permissionError(conn, "clear output", fromMessage.principal, userAndRoles, + notebookAuthorization.getOwners(noteId)); return; } @@ -1016,9 +999,8 @@ private void clearAllParagraphOutput(NotebookSocket conn, HashSet userAn broadcastNote(note); } - protected Note importNote(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) - throws IOException { + protected Note importNote(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { Note note = null; if (fromMessage != null) { String noteName = (String) ((Map) fromMessage.get("note")).get("name"); @@ -1037,8 +1019,8 @@ protected Note importNote(NotebookSocket conn, HashSet userAndRoles, return note; } - private void removeParagraph(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + private void removeParagraph(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -1048,8 +1030,8 @@ private void removeParagraph(NotebookSocket conn, HashSet userAndRoles, NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, "write", fromMessage.principal, - userAndRoles, notebookAuthorization.getWriters(noteId)); + permissionError(conn, "write", fromMessage.principal, userAndRoles, + notebookAuthorization.getWriters(noteId)); return; } @@ -1059,13 +1041,13 @@ private void removeParagraph(NotebookSocket conn, HashSet userAndRoles, note.persist(subject); if (para != null) { broadcast(note.getId(), new Message(OP.PARAGRAPH_REMOVED). - put("id", para.getId())); + put("id", para.getId())); } } } private void clearParagraphOutput(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -1074,8 +1056,8 @@ private void clearParagraphOutput(NotebookSocket conn, HashSet userAndRo final Note note = notebook.getNote(noteId); NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, "write", fromMessage.principal, - userAndRoles, notebookAuthorization.getWriters(noteId)); + permissionError(conn, "write", fromMessage.principal, userAndRoles, + notebookAuthorization.getWriters(noteId)); return; } note.clearParagraphOutput(paragraphId); @@ -1103,12 +1085,12 @@ private void completion(NotebookSocket conn, HashSet userAndRoles, Noteb /** * When angular object updated from client * - * @param conn the web socket. - * @param notebook the notebook. + * @param conn the web socket. + * @param notebook the notebook. * @param fromMessage the message. */ private void angularObjectUpdated(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) { + Notebook notebook, Message fromMessage) { String noteId = (String) fromMessage.get("noteId"); String paragraphId = (String) fromMessage.get("paragraphId"); String interpreterGroupId = (String) fromMessage.get("interpreterGroupId"); @@ -1120,15 +1102,15 @@ private void angularObjectUpdated(NotebookSocket conn, HashSet userAndRo // propagate change to (Remote) AngularObjectRegistry Note note = notebook.getNote(noteId); if (note != null) { - List settings = notebook.getInterpreterFactory() - .getInterpreterSettings(note.getId()); + List settings = + notebook.getInterpreterFactory().getInterpreterSettings(note.getId()); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup(user, note.getId()) == null) { continue; } if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId()).getId())) { - AngularObjectRegistry angularObjectRegistry = setting - .getInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); + AngularObjectRegistry angularObjectRegistry = + setting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); // first trying to get local registry ao = angularObjectRegistry.get(varName, noteId, paragraphId); @@ -1162,33 +1144,27 @@ private void angularObjectUpdated(NotebookSocket conn, HashSet userAndRo if (global) { // broadcast change to all web session that uses related // interpreter. for (Note n : notebook.getAllNotes()) { - List settings = notebook.getInterpreterFactory() - .getInterpreterSettings(note.getId()); + List settings = + notebook.getInterpreterFactory().getInterpreterSettings(note.getId()); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup(user, n.getId()) == null) { continue; } if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId()).getId())) { - AngularObjectRegistry angularObjectRegistry = setting - .getInterpreterGroup(user, n.getId()).getAngularObjectRegistry(); - this.broadcastExcept( - n.getId(), + AngularObjectRegistry angularObjectRegistry = + setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry(); + this.broadcastExcept(n.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", n.getId()) - .put("paragraphId", ao.getParagraphId()), - conn); + .put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId()) + .put("paragraphId", ao.getParagraphId()), conn); } } } } else { // broadcast to all web session for the note - this.broadcastExcept( - note.getId(), + this.broadcastExcept(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.getId()) - .put("paragraphId", ao.getParagraphId()), - conn); + .put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId()) + .put("paragraphId", ao.getParagraphId()), conn); } } @@ -1196,14 +1172,14 @@ private void angularObjectUpdated(NotebookSocket conn, HashSet userAndRo * Push the given Angular variable to the target * interpreter angular registry given a noteId * and a paragraph id + * * @param conn * @param notebook * @param fromMessage * @throws Exception */ protected void angularObjectClientBind(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) - throws Exception { + Notebook notebook, Message fromMessage) throws Exception { String noteId = fromMessage.getType("noteId"); String varName = fromMessage.getType("name"); Object varValue = fromMessage.get("value"); @@ -1211,24 +1187,23 @@ protected void angularObjectClientBind(NotebookSocket conn, HashSet user Note note = notebook.getNote(noteId); if (paragraphId == null) { - throw new IllegalArgumentException("target paragraph not specified for " + - "angular value bind"); + throw new IllegalArgumentException( + "target paragraph not specified for " + "angular value bind"); } if (note != null) { - final InterpreterGroup interpreterGroup = findInterpreterGroupForParagraph(note, - paragraphId); + final InterpreterGroup interpreterGroup = findInterpreterGroupForParagraph(note, paragraphId); final AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { RemoteAngularObjectRegistry remoteRegistry = (RemoteAngularObjectRegistry) registry; pushAngularObjectToRemoteRegistry(noteId, paragraphId, varName, varValue, remoteRegistry, - interpreterGroup.getId(), conn); + interpreterGroup.getId(), conn); } else { pushAngularObjectToLocalRepo(noteId, paragraphId, varName, varValue, registry, - interpreterGroup.getId(), conn); + interpreterGroup.getId(), conn); } } } @@ -1237,37 +1212,36 @@ protected void angularObjectClientBind(NotebookSocket conn, HashSet user * Remove the given Angular variable to the target * interpreter(s) angular registry given a noteId * and an optional list of paragraph id(s) + * * @param conn * @param notebook * @param fromMessage * @throws Exception */ protected void angularObjectClientUnbind(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) - throws Exception{ + Notebook notebook, Message fromMessage) throws Exception { String noteId = fromMessage.getType("noteId"); String varName = fromMessage.getType("name"); String paragraphId = fromMessage.getType("paragraphId"); Note note = notebook.getNote(noteId); if (paragraphId == null) { - throw new IllegalArgumentException("target paragraph not specified for " + - "angular value unBind"); + throw new IllegalArgumentException( + "target paragraph not specified for " + "angular value unBind"); } if (note != null) { - final InterpreterGroup interpreterGroup = findInterpreterGroupForParagraph(note, - paragraphId); + final InterpreterGroup interpreterGroup = findInterpreterGroupForParagraph(note, paragraphId); final AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { RemoteAngularObjectRegistry remoteRegistry = (RemoteAngularObjectRegistry) registry; removeAngularFromRemoteRegistry(noteId, paragraphId, varName, remoteRegistry, - interpreterGroup.getId(), conn); + interpreterGroup.getId(), conn); } else { removeAngularObjectFromLocalRepo(noteId, paragraphId, varName, registry, - interpreterGroup.getId(), conn); + interpreterGroup.getId(), conn); } } } @@ -1281,39 +1255,30 @@ private InterpreterGroup findInterpreterGroupForParagraph(Note note, String para return paragraph.getCurrentRepl().getInterpreterGroup(); } - private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId, - String varName, Object varValue, RemoteAngularObjectRegistry remoteRegistry, - String interpreterGroupId, NotebookSocket conn) { + private void pushAngularObjectToRemoteRegistry(String noteId, String paragraphId, String varName, + Object varValue, RemoteAngularObjectRegistry remoteRegistry, String interpreterGroupId, + NotebookSocket conn) { - final AngularObject ao = remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, - noteId, paragraphId); + final AngularObject ao = + remoteRegistry.addAndNotifyRemoteProcess(varName, varValue, noteId, paragraphId); - this.broadcastExcept( - noteId, - new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", noteId) - .put("paragraphId", paragraphId), - conn); + this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) + .put("paragraphId", paragraphId), conn); } - private void removeAngularFromRemoteRegistry(String noteId, String paragraphId, - String varName, RemoteAngularObjectRegistry remoteRegistry, - String interpreterGroupId, NotebookSocket conn) { - final AngularObject ao = remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, - paragraphId); - this.broadcastExcept( - noteId, - new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", noteId) - .put("paragraphId", paragraphId), - conn); + private void removeAngularFromRemoteRegistry(String noteId, String paragraphId, String varName, + RemoteAngularObjectRegistry remoteRegistry, String interpreterGroupId, NotebookSocket conn) { + final AngularObject ao = + remoteRegistry.removeAndNotifyRemoteProcess(varName, noteId, paragraphId); + this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) + .put("paragraphId", paragraphId), conn); } private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, String varName, - Object varValue, AngularObjectRegistry registry, - String interpreterGroupId, NotebookSocket conn) { + Object varValue, AngularObjectRegistry registry, String interpreterGroupId, + NotebookSocket conn) { AngularObject angularObject = registry.get(varName, noteId, paragraphId); if (angularObject == null) { angularObject = registry.add(varName, varValue, noteId, paragraphId); @@ -1321,26 +1286,20 @@ private void pushAngularObjectToLocalRepo(String noteId, String paragraphId, Str angularObject.set(varValue, true); } - this.broadcastExcept( - noteId, - new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", angularObject) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", noteId) - .put("paragraphId", paragraphId), - conn); + this.broadcastExcept(noteId, + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", angularObject) + .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) + .put("paragraphId", paragraphId), conn); } private void removeAngularObjectFromLocalRepo(String noteId, String paragraphId, String varName, - AngularObjectRegistry registry, String interpreterGroupId, NotebookSocket conn) { + AngularObjectRegistry registry, String interpreterGroupId, NotebookSocket conn) { final AngularObject removed = registry.remove(varName, noteId, paragraphId); if (removed != null) { - this.broadcastExcept( - noteId, + this.broadcastExcept(noteId, new Message(OP.ANGULAR_OBJECT_REMOVE).put("angularObject", removed) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", noteId) - .put("paragraphId", paragraphId), - conn); + .put("interpreterGroupId", interpreterGroupId).put("noteId", noteId) + .put("paragraphId", paragraphId), conn); } } @@ -1351,36 +1310,33 @@ private void moveParagraph(NotebookSocket conn, HashSet userAndRoles, No return; } - final int newIndex = (int) Double.parseDouble(fromMessage.get("index") - .toString()); + final int newIndex = (int) Double.parseDouble(fromMessage.get("index").toString()); String noteId = getOpenNoteId(conn); final Note note = notebook.getNote(noteId); NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, "write", fromMessage.principal, - userAndRoles, notebookAuthorization.getWriters(noteId)); + permissionError(conn, "write", fromMessage.principal, userAndRoles, + notebookAuthorization.getWriters(noteId)); return; } note.moveParagraph(paragraphId, newIndex); note.persist(subject); - broadcast(note.getId(), new Message(OP.PARAGRAPH_MOVED) - .put("id", paragraphId) - .put("index", newIndex)); + broadcast(note.getId(), + new Message(OP.PARAGRAPH_MOVED).put("id", paragraphId).put("index", newIndex)); } private String insertParagraph(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { - final int index = (int) Double.parseDouble(fromMessage.get("index") - .toString()); + Notebook notebook, Message fromMessage) throws IOException { + final int index = (int) Double.parseDouble(fromMessage.get("index").toString()); String noteId = getOpenNoteId(conn); final Note note = notebook.getNote(noteId); NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, "write", fromMessage.principal, - userAndRoles, notebookAuthorization.getWriters(noteId)); + permissionError(conn, "write", fromMessage.principal, userAndRoles, + notebookAuthorization.getWriters(noteId)); return null; } @@ -1391,8 +1347,8 @@ private String insertParagraph(NotebookSocket conn, HashSet userAndRoles return newPara.getId(); } - private void copyParagraph(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + private void copyParagraph(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { String newParaId = insertParagraph(conn, userAndRoles, notebook, fromMessage); if (newParaId == null) { @@ -1414,8 +1370,8 @@ private void cancelParagraph(NotebookSocket conn, HashSet userAndRoles, final Note note = notebook.getNote(noteId); NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, "write", fromMessage.principal, - userAndRoles, notebookAuthorization.getWriters(noteId)); + permissionError(conn, "write", fromMessage.principal, userAndRoles, + notebookAuthorization.getWriters(noteId)); return; } @@ -1434,8 +1390,8 @@ private void runParagraph(NotebookSocket conn, HashSet userAndRoles, Not final Note note = notebook.getNote(noteId); NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, "write", fromMessage.principal, - userAndRoles, notebookAuthorization.getWriters(noteId)); + permissionError(conn, "write", fromMessage.principal, userAndRoles, + notebookAuthorization.getWriters(noteId)); return; } @@ -1447,11 +1403,9 @@ private void runParagraph(NotebookSocket conn, HashSet userAndRoles, Not new AuthenticationInfo(fromMessage.principal, fromMessage.ticket); p.setAuthenticationInfo(authenticationInfo); - Map params = (Map) fromMessage - .get("params"); + Map params = (Map) fromMessage.get("params"); p.settings.setParams(params); - Map config = (Map) fromMessage - .get("config"); + Map config = (Map) fromMessage.get("config"); p.setConfig(config); // if it's the last paragraph, let's add a new one @@ -1468,8 +1422,8 @@ private void runParagraph(NotebookSocket conn, HashSet userAndRoles, Not } catch (FileSystemException ex) { LOG.error("Exception from run", ex); conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", - "Oops! There is something wrong with the notebook file system. " - + "Please check the logs for more details."))); + "Oops! There is something wrong with the notebook file system. " + + "Please check the logs for more details."))); // don't run the paragraph when there is error on persisting the note information return; } @@ -1479,9 +1433,7 @@ private void runParagraph(NotebookSocket conn, HashSet userAndRoles, Not } catch (Exception ex) { LOG.error("Exception from run", ex); if (p != null) { - p.setReturn( - new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), - ex); + p.setReturn(new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), ex); p.setStatus(Status.ERROR); broadcast(note.getId(), new Message(OP.PARAGRAPH).put("paragraph", p)); } @@ -1489,63 +1441,61 @@ private void runParagraph(NotebookSocket conn, HashSet userAndRoles, Not } private void sendAllConfigurations(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook) throws IOException { + Notebook notebook) throws IOException { ZeppelinConfiguration conf = notebook.getConf(); - Map configurations = conf.dumpConfigurations(conf, - new ZeppelinConfiguration.ConfigurationKeyPredicate() { + Map configurations = + conf.dumpConfigurations(conf, new ZeppelinConfiguration.ConfigurationKeyPredicate() { @Override public boolean apply(String key) { - return !key.contains("password") && - !key.equals(ZeppelinConfiguration - .ConfVars - .ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING + return !key.contains("password") && !key.equals( + ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_AZURE_CONNECTION_STRING .getVarName()); } }); - conn.send(serializeMessage(new Message(OP.CONFIGURATIONS_INFO) - .put("configurations", configurations))); + conn.send(serializeMessage( + new Message(OP.CONFIGURATIONS_INFO).put("configurations", configurations))); } - private void checkpointNote(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + private void checkpointNote(NotebookSocket conn, Notebook notebook, Message fromMessage) + throws IOException { String noteId = (String) fromMessage.get("noteId"); String commitMessage = (String) fromMessage.get("commitMessage"); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); Revision revision = notebook.checkpointNote(noteId, commitMessage, subject); if (!Revision.isEmpty(revision)) { List revisions = notebook.listRevisionHistory(noteId, subject); - conn.send(serializeMessage(new Message(OP.LIST_REVISION_HISTORY) - .put("revisionList", revisions))); + conn.send( + serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions))); } else { conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", "Couldn't checkpoint note revision: possibly storage doesn't support versioning. " - + "Please check the logs for more details."))); + + "Please check the logs for more details."))); } } - private void listRevisionHistory(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + private void listRevisionHistory(NotebookSocket conn, Notebook notebook, Message fromMessage) + throws IOException { String noteId = (String) fromMessage.get("noteId"); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); List revisions = notebook.listRevisionHistory(noteId, subject); - conn.send(serializeMessage(new Message(OP.LIST_REVISION_HISTORY) - .put("revisionList", revisions))); + conn.send( + serializeMessage(new Message(OP.LIST_REVISION_HISTORY).put("revisionList", revisions))); } - - private void setNoteRevision(NotebookSocket conn, HashSet userAndRoles, - Notebook notebook, Message fromMessage) throws IOException { + + private void setNoteRevision(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("noteId"); String revisionId = (String) fromMessage.get("revisionId"); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - + NotebookAuthorization notebookAuthorization = notebook.getNotebookAuthorization(); if (!notebookAuthorization.isWriter(noteId, userAndRoles)) { - permissionError(conn, "update", fromMessage.principal, - userAndRoles, notebookAuthorization.getWriters(noteId)); + permissionError(conn, "update", fromMessage.principal, userAndRoles, + notebookAuthorization.getWriters(noteId)); return; } @@ -1562,8 +1512,7 @@ private void setNoteRevision(NotebookSocket conn, HashSet userAndRoles, notebook.loadNoteFromRepo(noteId, subject); } - conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION) - .put("status", setRevisionStatus))); + conn.send(serializeMessage(new Message(OP.SET_NOTE_REVISION).put("status", setRevisionStatus))); if (setRevisionStatus) { Note reloadedNote = notebook.getNote(headNote.getId()); @@ -1571,7 +1520,7 @@ private void setNoteRevision(NotebookSocket conn, HashSet userAndRoles, } else { conn.send(serializeMessage(new Message(OP.ERROR_INFO).put("info", "Couldn't set note to the given revision. " - + "Please check the logs for more details."))); + + "Please check the logs for more details."))); } } @@ -1581,55 +1530,49 @@ private void getNoteByRevision(NotebookSocket conn, Notebook notebook, Message f String revisionId = (String) fromMessage.get("revisionId"); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); Note revisionNote = notebook.getNoteByRevision(noteId, revisionId, subject); - conn.send(serializeMessage(new Message(OP.NOTE_REVISION) - .put("noteId", noteId) - .put("revisionId", revisionId) - .put("note", revisionNote))); + conn.send(serializeMessage( + new Message(OP.NOTE_REVISION).put("noteId", noteId).put("revisionId", revisionId) + .put("note", revisionNote))); } /** * This callback is for the paragraph that runs on ZeppelinServer + * * @param noteId * @param paragraphId - * @param output output to append + * @param output output to append */ @Override public void onOutputAppend(String noteId, String paragraphId, int index, String output) { - Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT) - .put("noteId", noteId) - .put("paragraphId", paragraphId) - .put("index", index) - .put("data", output); + Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", noteId) + .put("paragraphId", paragraphId).put("index", index).put("data", output); broadcast(noteId, msg); } /** * This callback is for the paragraph that runs on ZeppelinServer + * * @param noteId * @param paragraphId - * @param output output to update (replace) + * @param output output to update (replace) */ @Override - public void onOutputUpdated( - String noteId, String paragraphId, int index, InterpreterResult.Type type, String output) { - Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT) - .put("noteId", noteId) - .put("paragraphId", paragraphId) - .put("index", index) - .put("type", type) - .put("data", output); + public void onOutputUpdated(String noteId, String paragraphId, int index, + InterpreterResult.Type type, String output) { + Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", noteId) + .put("paragraphId", paragraphId).put("index", index).put("type", type).put("data", output); broadcast(noteId, msg); } /** * This callback is for the paragraph that runs on ZeppelinServer + * * @param noteId * @param paragraphId */ @Override - public void onOutputClear( - String noteId, String paragraphId) { + public void onOutputClear(String noteId, String paragraphId) { Notebook notebook = notebook(); final Note note = notebook.getNote(noteId); note.clearParagraphOutput(paragraphId); @@ -1639,67 +1582,56 @@ public void onOutputClear( /** * When application append output + * * @param noteId * @param paragraphId * @param appId * @param output */ @Override - public void onOutputAppend( - String noteId, String paragraphId, int index, String appId, String output) { - Message msg = new Message(OP.APP_APPEND_OUTPUT) - .put("noteId", noteId) - .put("paragraphId", paragraphId) - .put("index", index) - .put("appId", appId) - .put("data", output); + public void onOutputAppend(String noteId, String paragraphId, int index, String appId, + String output) { + Message msg = + new Message(OP.APP_APPEND_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId) + .put("index", index).put("appId", appId).put("data", output); broadcast(noteId, msg); } /** * When application update output + * * @param noteId * @param paragraphId * @param appId * @param output */ @Override - public void onOutputUpdated( - String noteId, String paragraphId, int index, String appId, InterpreterResult.Type type, - String output) { - Message msg = new Message(OP.APP_UPDATE_OUTPUT) - .put("noteId", noteId) - .put("paragraphId", paragraphId) - .put("index", index) - .put("type", type) - .put("appId", appId) - .put("data", output); + public void onOutputUpdated(String noteId, String paragraphId, int index, String appId, + InterpreterResult.Type type, String output) { + Message msg = + new Message(OP.APP_UPDATE_OUTPUT).put("noteId", noteId).put("paragraphId", paragraphId) + .put("index", index).put("type", type).put("appId", appId).put("data", output); broadcast(noteId, msg); } @Override public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) { - Message msg = new Message(OP.APP_LOAD) - .put("noteId", noteId) - .put("paragraphId", paragraphId) - .put("appId", appId) - .put("pkg", pkg); + Message msg = new Message(OP.APP_LOAD).put("noteId", noteId).put("paragraphId", paragraphId) + .put("appId", appId).put("pkg", pkg); broadcast(noteId, msg); } @Override public void onStatusChange(String noteId, String paragraphId, String appId, String status) { - Message msg = new Message(OP.APP_STATUS_CHANGE) - .put("noteId", noteId) - .put("paragraphId", paragraphId) - .put("appId", appId) - .put("status", status); + Message msg = + new Message(OP.APP_STATUS_CHANGE).put("noteId", noteId).put("paragraphId", paragraphId) + .put("appId", appId).put("status", status); broadcast(noteId, msg); } @Override - public void onGetParagraphRunners( - String noteId, String paragraphId, RemoteWorksEventListener callback) { + public void onGetParagraphRunners(String noteId, String paragraphId, + RemoteWorksEventListener callback) { Notebook notebookIns = notebook(); List runner = new LinkedList<>(); @@ -1815,9 +1747,7 @@ public void onNoteRemove(Note note) { @Override public void onParagraphCreate(Paragraph p) { Notebook notebook = notebookServer.notebook(); - List> notebookJobs = notebook.getJobListByParagraphId( - p.getId() - ); + List> notebookJobs = notebook.getJobListByParagraphId(p.getId()); Map response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); response.put("jobs", notebookJobs); @@ -1829,9 +1759,7 @@ public void onParagraphCreate(Paragraph p) { @Override public void onNoteCreate(Note note) { Notebook notebook = notebookServer.notebook(); - List> notebookJobs = notebook.getJobListByNoteId( - note.getId() - ); + List> notebookJobs = notebook.getJobListByNoteId(note.getId()); Map response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); response.put("jobs", notebookJobs); @@ -1843,9 +1771,7 @@ public void onNoteCreate(Note note) { @Override public void onParagraphStatusChange(Paragraph p, Status status) { Notebook notebook = notebookServer.notebook(); - List> notebookJobs = notebook.getJobListByParagraphId( - p.getId() - ); + List> notebookJobs = notebook.getJobListByParagraphId(p.getId()); Map response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); @@ -1858,9 +1784,7 @@ public void onParagraphStatusChange(Paragraph p, Status status) { @Override public void onUnbindInterpreter(Note note, InterpreterSetting setting) { Notebook notebook = notebookServer.notebook(); - List> notebookJobs = notebook.getJobListByNoteId( - note.getId() - ); + List> notebookJobs = notebook.getJobListByNoteId(note.getId()); Map response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); response.put("jobs", notebookJobs); @@ -1870,9 +1794,9 @@ public void onUnbindInterpreter(Note note, InterpreterSetting setting) { } } + /** * Need description here. - * */ public static class ParagraphListenerImpl implements ParagraphJobListener { private NotebookServer notebookServer; @@ -1885,10 +1809,8 @@ public ParagraphListenerImpl(NotebookServer notebookServer, Note note) { @Override public void onProgressUpdate(Job job, int progress) { - notebookServer.broadcast( - note.getId(), - new Message(OP.PROGRESS).put("id", job.getId()).put("progress", - job.progress())); + notebookServer.broadcast(note.getId(), + new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress())); } @Override @@ -1924,22 +1846,23 @@ public void afterStatusChange(Job job, Status before, Status after) { /** * This callback is for paragraph that runs on RemoteInterpreterProcess + * * @param paragraph * @param idx * @param output */ @Override public void onOutputAppend(Paragraph paragraph, int idx, String output) { - Message msg = new Message(OP.PARAGRAPH_APPEND_OUTPUT) - .put("noteId", paragraph.getNote().getId()) - .put("paragraphId", paragraph.getId()) - .put("data", output); + Message msg = + new Message(OP.PARAGRAPH_APPEND_OUTPUT).put("noteId", paragraph.getNote().getId()) + .put("paragraphId", paragraph.getId()).put("data", output); notebookServer.broadcast(paragraph.getNote().getId(), msg); } /** * This callback is for paragraph that runs on RemoteInterpreterProcess + * * @param paragraph * @param idx * @param result @@ -1947,10 +1870,9 @@ public void onOutputAppend(Paragraph paragraph, int idx, String output) { @Override public void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage result) { String output = result.getData(); - Message msg = new Message(OP.PARAGRAPH_UPDATE_OUTPUT) - .put("noteId", paragraph.getNote().getId()) - .put("paragraphId", paragraph.getId()) - .put("data", output); + Message msg = + new Message(OP.PARAGRAPH_UPDATE_OUTPUT).put("noteId", paragraph.getNote().getId()) + .put("paragraphId", paragraph.getId()).put("data", output); notebookServer.broadcast(paragraph.getNote().getId(), msg); } @@ -2010,19 +1932,15 @@ public void onUpdate(String interpreterGroupId, AngularObject object) { continue; } - List intpSettings = notebook.getInterpreterFactory() - .getInterpreterSettings(note.getId()); + List intpSettings = + notebook.getInterpreterFactory().getInterpreterSettings(note.getId()); if (intpSettings.isEmpty()) { continue; } - broadcast( - note.getId(), - new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.getId()) - .put("paragraphId", object.getParagraphId())); + broadcast(note.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object) + .put("interpreterGroupId", interpreterGroupId).put("noteId", note.getId()) + .put("paragraphId", object.getParagraphId())); } } @@ -2038,11 +1956,8 @@ public void onRemove(String interpreterGroupId, String name, String noteId, Stri List settingIds = notebook.getInterpreterFactory().getInterpreters(note.getId()); for (String id : settingIds) { if (interpreterGroupId.contains(id)) { - broadcast( - note.getId(), - new Message(OP.ANGULAR_OBJECT_REMOVE) - .put("name", name) - .put("noteId", noteId) + broadcast(note.getId(), + new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put("noteId", noteId) .put("paragraphId", paragraphId)); break; } @@ -2050,8 +1965,7 @@ public void onRemove(String interpreterGroupId, String name, String noteId, Stri } } - private void getEditorSetting(NotebookSocket conn, Message fromMessage) - throws IOException { + private void getEditorSetting(NotebookSocket conn, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("paragraphId"); String replName = (String) fromMessage.get("magic"); String noteId = getOpenNoteId(conn); @@ -2066,14 +1980,13 @@ private void getEditorSetting(NotebookSocket conn, Message fromMessage) private void getInterpreterSettings(NotebookSocket conn, AuthenticationInfo subject) throws IOException { List availableSettings = notebook().getInterpreterFactory().get(); - conn.send(serializeMessage(new Message(OP.INTERPRETER_SETTINGS) - .put("interpreterSettings", availableSettings))); + conn.send(serializeMessage( + new Message(OP.INTERPRETER_SETTINGS).put("interpreterSettings", availableSettings))); } @Override public void onMetaInfosReceived(String settingId, Map metaInfos) { - InterpreterSetting interpreterSetting = notebook().getInterpreterFactory() - .get(settingId); + InterpreterSetting interpreterSetting = notebook().getInterpreterFactory().get(settingId); interpreterSetting.setInfos(metaInfos); } @@ -2099,8 +2012,8 @@ private void switchConnectionToWatcher(NotebookSocket conn, Message messagerecei private boolean isSessionAllowedToSwitchToWatcher(NotebookSocket session) { String watcherSecurityKey = session.getRequest().getHeader(WatcherSecurityKey.HTTP_HEADER); - return !(StringUtils.isBlank(watcherSecurityKey) - || !watcherSecurityKey.equals(WatcherSecurityKey.getKey())); + return !(StringUtils.isBlank(watcherSecurityKey) || !watcherSecurityKey + .equals(WatcherSecurityKey.getKey())); } private void broadcastToWatchers(String noteId, String subject, Message message) { @@ -2110,12 +2023,9 @@ private void broadcastToWatchers(String noteId, String subject, Message message) } for (NotebookSocket watcher : watcherSockets) { try { - watcher.send(WatcherMessage - .builder(noteId) - .subject(subject) - .message(serializeMessage(message)) - .build() - .serialize()); + watcher.send( + WatcherMessage.builder(noteId).subject(subject).message(serializeMessage(message)) + .build().serialize()); } catch (IOException e) { LOG.error("Cannot broadcast message to watcher", e); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 3daeda0ceab..0995169d0af 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -223,9 +223,9 @@ void setInterpreterFactory(InterpreterFactory factory) { public void initializeJobListenerForParagraph(Paragraph paragraph) { final Note paragraphNote = paragraph.getNote(); if (paragraphNote.getId().equals(this.getId())) { - throw new IllegalArgumentException(format("The paragraph %s from note %s " + - "does not belong to note %s", paragraph.getId(), paragraphNote.getId(), - this.getId())); + throw new IllegalArgumentException( + format("The paragraph %s from note %s " + "does not belong to note %s", paragraph.getId(), + paragraphNote.getId(), this.getId())); } boolean foundParagraph = false; @@ -237,8 +237,9 @@ public void initializeJobListenerForParagraph(Paragraph paragraph) { } if (!foundParagraph) { - throw new IllegalArgumentException(format("Cannot find paragraph %s " + - "from note %s", paragraph.getId(), paragraphNote.getId())); + throw new IllegalArgumentException( + format("Cannot find paragraph %s " + "from note %s", paragraph.getId(), + paragraphNote.getId())); } } @@ -419,8 +420,8 @@ public void moveParagraph(String paragraphId, int index, boolean throwWhenIndexI if (index < 0 || index >= paragraphs.size()) { if (throwWhenIndexIsOutOfBound) { - throw new IndexOutOfBoundsException("paragraph size is " + paragraphs.size() + - " , index is " + index); + throw new IndexOutOfBoundsException( + "paragraph size is " + paragraphs.size() + " , index is " + index); } else { return; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 48d3d42d11a..a69c0e7f1e9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -47,7 +47,6 @@ /** * Paragraph is a representation of an execution unit. - * */ public class Paragraph extends Job implements Serializable, Cloneable { private static final long serialVersionUID = -6328572073497992016L; @@ -75,7 +74,7 @@ public class Paragraph extends Job implements Serializable, Cloneable { /** * Applicaiton states in this paragraph */ - private final List apps = new LinkedList<>(); + private final List apps = new LinkedList<>(); @VisibleForTesting Paragraph() { @@ -85,7 +84,7 @@ public class Paragraph extends Job implements Serializable, Cloneable { } public Paragraph(String paragraphId, Note note, JobListener listener, - InterpreterFactory factory) { + InterpreterFactory factory) { super(paragraphId, generateId(), listener); this.note = note; this.factory = factory; @@ -111,8 +110,8 @@ public Paragraph(Note note, JobListener listener, InterpreterFactory factory) { } private static String generateId() { - return "paragraph_" + System.currentTimeMillis() + "_" - + new Random(System.currentTimeMillis()).nextInt(); + return "paragraph_" + System.currentTimeMillis() + "_" + new Random(System.currentTimeMillis()) + .nextInt(); } public Map getUserParagraphMap() { @@ -248,10 +247,10 @@ public Interpreter getCurrentRepl() { public List getInterpreterCompletion() { List completion = new LinkedList(); - for (InterpreterSetting intp: factory.getInterpreterSettings(note.getId())){ + for (InterpreterSetting intp : factory.getInterpreterSettings(note.getId())) { List intInfo = intp.getInterpreterInfos(); if (intInfo.size() > 1) { - for (InterpreterInfo info : intInfo){ + for (InterpreterInfo info : intInfo) { String name = intp.getName() + "." + info.getName(); completion.add(new InterpreterCompletion(name, name)); } @@ -264,9 +263,7 @@ public List getInterpreterCompletion() { public List completion(String buffer, int cursor) { String lines[] = buffer.split(System.getProperty("line.separator")); - if (lines.length > 0 - && lines[0].startsWith("%") - && cursor <= lines[0].trim().length()) { + if (lines.length > 0 && lines[0].startsWith("%") && cursor <= lines[0].trim().length()) { int idx = lines[0].indexOf(' '); if (idx < 0 || (idx > 0 && cursor <= idx)) { @@ -327,7 +324,7 @@ private boolean hasPermission(String user, List intpUsers) { return true; } - for (String u: intpUsers) { + for (String u : intpUsers) { if (user.trim().equals(u.trim())) { return true; } @@ -351,16 +348,15 @@ protected Object jobRun() throws Throwable { } InterpreterSetting intp = getInterpreterSettingById(repl.getInterpreterGroup().getId()); while (intp.getStatus().equals( - org.apache.zeppelin.interpreter.InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES)) { + org.apache.zeppelin.interpreter.InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES)) { Thread.sleep(200); } if (this.noteHasUser() && this.noteHasInterpreters()) { - if (intp != null && - interpreterHasUser(intp) && - isUserAuthorizedToAccessInterpreter(intp.getOption()) == false) { + if (intp != null && interpreterHasUser(intp) + && isUserAuthorizedToAccessInterpreter(intp.getOption()) == false) { logger.error("{} has no permission for {} ", authenticationInfo.getUser(), repl); - return new InterpreterResult(Code.ERROR, authenticationInfo.getUser() + - " has no permission for " + getRequiredReplName()); + return new InterpreterResult(Code.ERROR, + authenticationInfo.getUser() + " has no permission for " + getRequiredReplName()); } } @@ -371,10 +367,10 @@ protected Object jobRun() throws Throwable { } else if (repl.getFormType() == FormType.SIMPLE) { String scriptBody = getScriptBody(); Map inputs = Input.extractSimpleQueryParam(scriptBody); // inputs will be built - // from script body + // from script body - final AngularObjectRegistry angularRegistry = repl.getInterpreterGroup() - .getAngularObjectRegistry(); + final AngularObjectRegistry angularRegistry = + repl.getInterpreterGroup().getAngularObjectRegistry(); scriptBody = extractVariablesFromAngularRegistry(scriptBody, inputs, angularRegistry); @@ -425,14 +421,14 @@ private boolean interpreterHasUser(InterpreterSetting intp) { return intp.getOption().permissionIsSet() && intp.getOption().getUsers() != null; } - private boolean isUserAuthorizedToAccessInterpreter(InterpreterOption intpOpt){ - return intpOpt.permissionIsSet() && - hasPermission(authenticationInfo.getUser(), intpOpt.getUsers()); + private boolean isUserAuthorizedToAccessInterpreter(InterpreterOption intpOpt) { + return intpOpt.permissionIsSet() && hasPermission(authenticationInfo.getUser(), + intpOpt.getUsers()); } private InterpreterSetting getInterpreterSettingById(String id) { InterpreterSetting setting = null; - for (InterpreterSetting i: factory.getInterpreterSettings(note.getId())) { + for (InterpreterSetting i : factory.getInterpreterSettings(note.getId())) { if (id.startsWith(i.getId())) { setting = i; break; @@ -475,8 +471,8 @@ public void onAppend(int index, InterpreterResultMessageOutput out, byte[] line) @Override public void onUpdate(int index, InterpreterResultMessageOutput out) { try { - ((ParagraphJobListener) getListener()).onOutputUpdate( - self, index, out.toInterpreterResultMessage()); + ((ParagraphJobListener) getListener()) + .onOutputUpdate(self, index, out.toInterpreterResultMessage()); } catch (IOException e) { logger.error(e.getMessage(), e); } @@ -521,24 +517,15 @@ private InterpreterContext getInterpreterContext(InterpreterOutput output) { Credentials credentials = note.getCredentials(); if (authenticationInfo != null) { - UserCredentials userCredentials = credentials.getUserCredentials( - authenticationInfo.getUser()); + UserCredentials userCredentials = + credentials.getUserCredentials(authenticationInfo.getUser()); authenticationInfo.setUserCredentials(userCredentials); } - InterpreterContext interpreterContext = new InterpreterContext( - note.getId(), - getId(), - getRequiredReplName(), - this.getTitle(), - this.getText(), - this.getAuthenticationInfo(), - this.getConfig(), - this.settings, - registry, - resourcePool, - runners, - output); + InterpreterContext interpreterContext = + new InterpreterContext(note.getId(), getId(), getRequiredReplName(), this.getTitle(), + this.getText(), this.getAuthenticationInfo(), this.getConfig(), this.settings, registry, + resourcePool, runners, output); return interpreterContext; } @@ -619,7 +606,7 @@ public List getAllApplicationStates() { } String extractVariablesFromAngularRegistry(String scriptBody, Map inputs, - AngularObjectRegistry angularRegistry) { + AngularObjectRegistry angularRegistry) { final String noteId = this.getNote().getId(); final String paragraphId = this.getId(); @@ -655,8 +642,7 @@ public String getMagic() { private boolean isValidInterpreter(String replName) { try { - return factory.getInterpreter(user, - note.getId(), replName) != null; + return factory.getInterpreter(user, note.getId(), replName) != null; } catch (InterpreterException e) { // ignore this exception, it would be recaught when running paragraph. return false; From ca0be8671a6c5014cc48a983b27a5bab8ed6f329 Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Sun, 25 Dec 2016 03:59:04 +0900 Subject: [PATCH 5/5] Fixed some weird indentation --- .../zeppelin/socket/NotebookServer.java | 276 ++++++++---------- 1 file changed, 122 insertions(+), 154 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index e390f052d0d..a903af2d89c 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -89,6 +89,7 @@ public class NotebookServer extends WebSocketServlet implements NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener, RemoteInterpreterProcessListener, ApplicationEventListener { + /** * Job manager service type */ @@ -205,121 +206,121 @@ public void onMessage(NotebookSocket conn, String msg) { /** Lets be elegant here */ switch (messagereceived.op) { - case LIST_NOTES: - unicastNoteList(conn, subject, userAndRoles); - break; - case RELOAD_NOTES_FROM_REPO: - broadcastReloadedNoteList(subject, userAndRoles); - break; - case GET_HOME_NOTE: - sendHomeNote(conn, userAndRoles, notebook, messagereceived); - break; - case GET_NOTE: - sendNote(conn, userAndRoles, notebook, messagereceived); - break; - case NEW_NOTE: - createNote(conn, userAndRoles, notebook, messagereceived); - break; - case DEL_NOTE: - removeNote(conn, userAndRoles, notebook, messagereceived); - break; - case CLONE_NOTE: - cloneNote(conn, userAndRoles, notebook, messagereceived); - break; - case IMPORT_NOTE: - importNote(conn, userAndRoles, notebook, messagereceived); - break; - case COMMIT_PARAGRAPH: - updateParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case RUN_PARAGRAPH: - runParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case CANCEL_PARAGRAPH: - cancelParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case MOVE_PARAGRAPH: - moveParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case INSERT_PARAGRAPH: - insertParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case COPY_PARAGRAPH: - copyParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case PARAGRAPH_REMOVE: - removeParagraph(conn, userAndRoles, notebook, messagereceived); - break; - case PARAGRAPH_CLEAR_OUTPUT: - clearParagraphOutput(conn, userAndRoles, notebook, messagereceived); - break; - case PARAGRAPH_CLEAR_ALL_OUTPUT: - clearAllParagraphOutput(conn, userAndRoles, notebook, messagereceived); - break; - case NOTE_UPDATE: - updateNote(conn, userAndRoles, notebook, messagereceived); - break; - case NOTE_RENAME: - renameNote(conn, userAndRoles, notebook, messagereceived); - break; - case FOLDER_RENAME: - renameFolder(conn, userAndRoles, notebook, messagereceived); - break; - case UPDATE_PERSONALIZED_MODE: - updatePersonalizedMode(conn, userAndRoles, notebook, messagereceived); - break; - case COMPLETION: - completion(conn, userAndRoles, notebook, messagereceived); - break; - case PING: - break; //do nothing - case ANGULAR_OBJECT_UPDATED: - angularObjectUpdated(conn, userAndRoles, notebook, messagereceived); - break; - case ANGULAR_OBJECT_CLIENT_BIND: - angularObjectClientBind(conn, userAndRoles, notebook, messagereceived); - break; - case ANGULAR_OBJECT_CLIENT_UNBIND: - angularObjectClientUnbind(conn, userAndRoles, notebook, messagereceived); - break; - case LIST_CONFIGURATIONS: - sendAllConfigurations(conn, userAndRoles, notebook); - break; - case CHECKPOINT_NOTE: - checkpointNote(conn, notebook, messagereceived); - break; - case LIST_REVISION_HISTORY: - listRevisionHistory(conn, notebook, messagereceived); - break; - case SET_NOTE_REVISION: - setNoteRevision(conn, userAndRoles, notebook, messagereceived); - break; - case NOTE_REVISION: - getNoteByRevision(conn, notebook, messagereceived); - break; - case LIST_NOTE_JOBS: - unicastNoteJobInfo(conn, messagereceived); - break; - case UNSUBSCRIBE_UPDATE_NOTE_JOBS: - unsubscribeNoteJobInfo(conn); - break; - case GET_INTERPRETER_BINDINGS: - getInterpreterBindings(conn, messagereceived); - break; - case SAVE_INTERPRETER_BINDINGS: - saveInterpreterBindings(conn, messagereceived); - break; - case EDITOR_SETTING: - getEditorSetting(conn, messagereceived); - break; - case GET_INTERPRETER_SETTINGS: - getInterpreterSettings(conn, subject); - break; - case WATCHER: - switchConnectionToWatcher(conn, messagereceived); - break; - default: - break; + case LIST_NOTES: + unicastNoteList(conn, subject, userAndRoles); + break; + case RELOAD_NOTES_FROM_REPO: + broadcastReloadedNoteList(subject, userAndRoles); + break; + case GET_HOME_NOTE: + sendHomeNote(conn, userAndRoles, notebook, messagereceived); + break; + case GET_NOTE: + sendNote(conn, userAndRoles, notebook, messagereceived); + break; + case NEW_NOTE: + createNote(conn, userAndRoles, notebook, messagereceived); + break; + case DEL_NOTE: + removeNote(conn, userAndRoles, notebook, messagereceived); + break; + case CLONE_NOTE: + cloneNote(conn, userAndRoles, notebook, messagereceived); + break; + case IMPORT_NOTE: + importNote(conn, userAndRoles, notebook, messagereceived); + break; + case COMMIT_PARAGRAPH: + updateParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case RUN_PARAGRAPH: + runParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case CANCEL_PARAGRAPH: + cancelParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case MOVE_PARAGRAPH: + moveParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case INSERT_PARAGRAPH: + insertParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case COPY_PARAGRAPH: + copyParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case PARAGRAPH_REMOVE: + removeParagraph(conn, userAndRoles, notebook, messagereceived); + break; + case PARAGRAPH_CLEAR_OUTPUT: + clearParagraphOutput(conn, userAndRoles, notebook, messagereceived); + break; + case PARAGRAPH_CLEAR_ALL_OUTPUT: + clearAllParagraphOutput(conn, userAndRoles, notebook, messagereceived); + break; + case NOTE_UPDATE: + updateNote(conn, userAndRoles, notebook, messagereceived); + break; + case NOTE_RENAME: + renameNote(conn, userAndRoles, notebook, messagereceived); + break; + case FOLDER_RENAME: + renameFolder(conn, userAndRoles, notebook, messagereceived); + break; + case UPDATE_PERSONALIZED_MODE: + updatePersonalizedMode(conn, userAndRoles, notebook, messagereceived); + break; + case COMPLETION: + completion(conn, userAndRoles, notebook, messagereceived); + break; + case PING: + break; //do nothing + case ANGULAR_OBJECT_UPDATED: + angularObjectUpdated(conn, userAndRoles, notebook, messagereceived); + break; + case ANGULAR_OBJECT_CLIENT_BIND: + angularObjectClientBind(conn, userAndRoles, notebook, messagereceived); + break; + case ANGULAR_OBJECT_CLIENT_UNBIND: + angularObjectClientUnbind(conn, userAndRoles, notebook, messagereceived); + break; + case LIST_CONFIGURATIONS: + sendAllConfigurations(conn, userAndRoles, notebook); + break; + case CHECKPOINT_NOTE: + checkpointNote(conn, notebook, messagereceived); + break; + case LIST_REVISION_HISTORY: + listRevisionHistory(conn, notebook, messagereceived); + break; + case SET_NOTE_REVISION: + setNoteRevision(conn, userAndRoles, notebook, messagereceived); + break; + case NOTE_REVISION: + getNoteByRevision(conn, notebook, messagereceived); + break; + case LIST_NOTE_JOBS: + unicastNoteJobInfo(conn, messagereceived); + break; + case UNSUBSCRIBE_UPDATE_NOTE_JOBS: + unsubscribeNoteJobInfo(conn); + break; + case GET_INTERPRETER_BINDINGS: + getInterpreterBindings(conn, messagereceived); + break; + case SAVE_INTERPRETER_BINDINGS: + saveInterpreterBindings(conn, messagereceived); + break; + case EDITOR_SETTING: + getEditorSetting(conn, messagereceived); + break; + case GET_INTERPRETER_SETTINGS: + getInterpreterSettings(conn, subject); + break; + case WATCHER: + switchConnectionToWatcher(conn, messagereceived); + break; + default: + break; } } catch (Exception e) { LOG.error("Can't handle message", e); @@ -1085,8 +1086,8 @@ private void completion(NotebookSocket conn, HashSet userAndRoles, Noteb /** * When angular object updated from client * - * @param conn the web socket. - * @param notebook the notebook. + * @param conn the web socket. + * @param notebook the notebook. * @param fromMessage the message. */ private void angularObjectUpdated(NotebookSocket conn, HashSet userAndRoles, @@ -1172,11 +1173,6 @@ private void angularObjectUpdated(NotebookSocket conn, HashSet userAndRo * Push the given Angular variable to the target * interpreter angular registry given a noteId * and a paragraph id - * - * @param conn - * @param notebook - * @param fromMessage - * @throws Exception */ protected void angularObjectClientBind(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws Exception { @@ -1212,11 +1208,6 @@ protected void angularObjectClientBind(NotebookSocket conn, HashSet user * Remove the given Angular variable to the target * interpreter(s) angular registry given a noteId * and an optional list of paragraph id(s) - * - * @param conn - * @param notebook - * @param fromMessage - * @throws Exception */ protected void angularObjectClientUnbind(NotebookSocket conn, HashSet userAndRoles, Notebook notebook, Message fromMessage) throws Exception { @@ -1538,9 +1529,7 @@ private void getNoteByRevision(NotebookSocket conn, Notebook notebook, Message f /** * This callback is for the paragraph that runs on ZeppelinServer * - * @param noteId - * @param paragraphId - * @param output output to append + * @param output output to append */ @Override public void onOutputAppend(String noteId, String paragraphId, int index, String output) { @@ -1552,9 +1541,7 @@ public void onOutputAppend(String noteId, String paragraphId, int index, String /** * This callback is for the paragraph that runs on ZeppelinServer * - * @param noteId - * @param paragraphId - * @param output output to update (replace) + * @param output output to update (replace) */ @Override public void onOutputUpdated(String noteId, String paragraphId, int index, @@ -1567,9 +1554,6 @@ public void onOutputUpdated(String noteId, String paragraphId, int index, /** * This callback is for the paragraph that runs on ZeppelinServer - * - * @param noteId - * @param paragraphId */ @Override public void onOutputClear(String noteId, String paragraphId) { @@ -1582,11 +1566,6 @@ public void onOutputClear(String noteId, String paragraphId) { /** * When application append output - * - * @param noteId - * @param paragraphId - * @param appId - * @param output */ @Override public void onOutputAppend(String noteId, String paragraphId, int index, String appId, @@ -1599,11 +1578,6 @@ public void onOutputAppend(String noteId, String paragraphId, int index, String /** * When application update output - * - * @param noteId - * @param paragraphId - * @param appId - * @param output */ @Override public void onOutputUpdated(String noteId, String paragraphId, int index, String appId, @@ -1699,6 +1673,7 @@ public void onRemoteRunParagraph(String noteId, String paragraphId) throws Excep * Notebook Information Change event */ public static class NotebookInformationListener implements NotebookEventListener { + private NotebookServer notebookServer; public NotebookInformationListener(NotebookServer notebookServer) { @@ -1799,6 +1774,7 @@ public void onUnbindInterpreter(Note note, InterpreterSetting setting) { * Need description here. */ public static class ParagraphListenerImpl implements ParagraphJobListener { + private NotebookServer notebookServer; private Note note; @@ -1846,10 +1822,6 @@ public void afterStatusChange(Job job, Status before, Status after) { /** * This callback is for paragraph that runs on RemoteInterpreterProcess - * - * @param paragraph - * @param idx - * @param output */ @Override public void onOutputAppend(Paragraph paragraph, int idx, String output) { @@ -1862,10 +1834,6 @@ public void onOutputAppend(Paragraph paragraph, int idx, String output) { /** * This callback is for paragraph that runs on RemoteInterpreterProcess - * - * @param paragraph - * @param idx - * @param result */ @Override public void onOutputUpdate(Paragraph paragraph, int idx, InterpreterResultMessage result) {