From 038cdb1b09d89c4b7b9ce206d6b3613d0475c276 Mon Sep 17 00:00:00 2001 From: joelz Date: Thu, 6 Aug 2015 15:15:21 -0700 Subject: [PATCH 1/7] Fix to websocket origin bug --- .../zeppelin/socket/NotebookServerTests.java | 49 +++++++++++++++++++ .../socket/TestHttpServletRequest.java | 7 +++ .../socket/TestNotebookSocketListener.java | 7 +++ 3 files changed, 63 insertions(+) create mode 100644 zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java create mode 100644 zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java create mode 100644 zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java new file mode 100644 index 00000000000..d36719de690 --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java @@ -0,0 +1,49 @@ +/** + * Created by joelz on 8/6/15. + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.socket; + + import org.junit.Assert; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +/** + * BASIC Zeppelin rest api tests + * TODO: Add Post,Put,Delete test and method + * + * @author joelz + * + */ +@FixMethodOrder(MethodSorters.NAME_ASCENDING) + public class NotebookServerTest { + + @Test + public void CheckOrigin(){ + NotebookServer server = new NotebookServer(); + NotebookSocket socket = new NotebookSocket(new TestHttpServletRequest(), "http", new TestNotebookSocketListener()); + server.onOpen(socket); + Assert.assertEquals(1, server.seenOrigins.size()); + server.doWebSocketConnect(new TestHttpServletRequest(), "http"); + Assert.assertTrue(server.checkOrigin(new TestHttpServletRequest(), "http://localhost:8080")); + server.onClose(socket, 0, "Shutdown"); + Assert.assertFalse(server.checkOrigin(new TestHttpServletRequest(), "http://localhost:8080")); + Assert.assertEquals(0, server.seenOrigins.size()); + } +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java new file mode 100644 index 00000000000..ec88b6aaaac --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java @@ -0,0 +1,7 @@ +package org.apache.zeppelin.socket; + +/** + * Created by joelz on 8/6/15. + */ +public class TestHttpServletRequest { +} diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java new file mode 100644 index 00000000000..236f744ba5e --- /dev/null +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java @@ -0,0 +1,7 @@ +package org.apache.zeppelin.socket; + +/** + * Created by joelz on 8/6/15. + */ +public class TestNotebookSocketListener { +} From 86619a485fc41d095521e170104168ba92e05e9f Mon Sep 17 00:00:00 2001 From: joelz Date: Thu, 6 Aug 2015 15:23:51 -0700 Subject: [PATCH 2/7] Fixing websocket issue --- .../zeppelin/socket/NotebookServer.java | 121 ++++-------------- 1 file changed, 28 insertions(+), 93 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index ed35ea1b0ca..7e8ab62b944 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -14,19 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.zeppelin.socket; - import java.io.IOException; import java.net.InetSocketAddress; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - +import java.util.*; import javax.servlet.http.HttpServletRequest; - import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; @@ -46,10 +38,8 @@ import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.base.Strings; import com.google.gson.Gson; - /** * Zeppelin websocket service. * @@ -57,32 +47,34 @@ */ public class NotebookServer extends WebSocketServlet implements NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener { - private static final Logger LOG = LoggerFactory - .getLogger(NotebookServer.class); - + .getLogger(NotebookServer.class); Gson gson = new Gson(); - Map> noteSocketMap = new HashMap>(); - List connectedSockets = new LinkedList(); - + final Map> noteSocketMap = new HashMap<>(); + final List connectedSockets = new LinkedList<>(); + final ArrayList seenOrigins = new ArrayList<>(); private Notebook notebook() { return ZeppelinServer.notebook; } - + @Override + public boolean checkOrigin(HttpServletRequest request, String origin) { + return seenOrigins.contains(origin); + } @Override public WebSocket doWebSocketConnect(HttpServletRequest req, String protocol) { return new NotebookSocket(req, protocol, this); } - @Override public void onOpen(NotebookSocket conn) { LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), conn.getRequest().getRemotePort()); synchronized (connectedSockets) { + synchronized (seenOrigins) { + seenOrigins.add(getOrigin(conn)); + } connectedSockets.add(conn); } } - @Override public void onMessage(NotebookSocket conn, String msg) { Notebook notebook = notebook(); @@ -141,42 +133,38 @@ public void onMessage(NotebookSocket conn, String msg) { LOG.error("Can't handle message", e); } } - @Override public void onClose(NotebookSocket conn, int code, String reason) { LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); removeConnectionFromAllNote(conn); synchronized (connectedSockets) { + synchronized (seenOrigins) { + seenOrigins.remove(getOrigin(conn)); + } connectedSockets.remove(conn); } } - private Message deserializeMessage(String msg) { - Message m = gson.fromJson(msg, Message.class); - return m; + return gson.fromJson(msg, Message.class); } - private String serializeMessage(Message m) { return gson.toJson(m); } - private void addConnectionToNote(String noteId, NotebookSocket socket) { synchronized (noteSocketMap) { removeConnectionFromAllNote(socket); // make sure a socket relates only a // single note. List socketList = noteSocketMap.get(noteId); if (socketList == null) { - socketList = new LinkedList(); + socketList = new LinkedList<>(); noteSocketMap.put(noteId, socketList); } - - if (socketList.contains(socket) == false) { + if (!socketList.contains(socket)) { socketList.add(socket); } } } - private void removeConnectionFromNote(String noteId, NotebookSocket socket) { synchronized (noteSocketMap) { List socketList = noteSocketMap.get(noteId); @@ -185,13 +173,11 @@ private void removeConnectionFromNote(String noteId, NotebookSocket socket) { } } } - private void removeNote(String noteId) { synchronized (noteSocketMap) { List socketList = noteSocketMap.remove(noteId); } } - private void removeConnectionFromAllNote(NotebookSocket socket) { synchronized (noteSocketMap) { Set keys = noteSocketMap.keySet(); @@ -200,7 +186,6 @@ private void removeConnectionFromAllNote(NotebookSocket socket) { } } } - private String getOpenNoteId(NotebookSocket socket) { String id = null; synchronized (noteSocketMap) { @@ -214,7 +199,6 @@ private String getOpenNoteId(NotebookSocket socket) { } return id; } - private void broadcastToNoteBindedInterpreter(String interpreterGroupId, Message m) { Notebook notebook = notebook(); @@ -228,16 +212,13 @@ private void broadcastToNoteBindedInterpreter(String interpreterGroupId, } } } - private void broadcast(String noteId, Message m) { synchronized (noteSocketMap) { List socketLists = noteSocketMap.get(noteId); if (socketLists == null || socketLists.size() == 0) { return; } - LOG.info("SEND >> " + m.op); - for (NotebookSocket conn : socketLists) { try { conn.send(serializeMessage(m)); @@ -247,7 +228,6 @@ private void broadcast(String noteId, Message m) { } } } - private void broadcastAll(Message m) { synchronized (connectedSockets) { for (NotebookSocket conn : connectedSockets) { @@ -259,24 +239,21 @@ private void broadcastAll(Message m) { } } } - private void broadcastNote(Note note) { broadcast(note.id(), new Message(OP.NOTE).put("note", note)); } - private void broadcastNoteList() { Notebook notebook = notebook(); List notes = notebook.getAllNotes(); - List> notesInfo = new LinkedList>(); + List> notesInfo = new LinkedList<>(); for (Note note : notes) { - Map info = new HashMap(); + Map info = new HashMap<>(); info.put("id", note.id()); info.put("name", note.getName()); notesInfo.add(info); } broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); } - private void sendNote(NotebookSocket conn, Notebook notebook, Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); @@ -284,14 +261,12 @@ private void sendNote(NotebookSocket conn, Notebook notebook, return; } Note note = notebook.getNote(noteId); - if (note != null) { addConnectionToNote(note.id(), conn); conn.send(serializeMessage(new Message(OP.NOTE).put("note", note))); sendAllAngularObjects(note, conn); } } - private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); @@ -309,17 +284,14 @@ private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) boolean cronUpdated = isCronUpdated(config, note.getConfig()); note.setName(name); note.setConfig(config); - if (cronUpdated) { notebook.refreshCron(note.id()); } note.persist(); - broadcastNote(note); broadcastNoteList(); } } - private boolean isCronUpdated(Map configA, Map configB) { boolean cronUpdated = false; @@ -333,7 +305,6 @@ private boolean isCronUpdated(Map configA, } return cronUpdated; } - private void createNote(WebSocket conn, Notebook notebook) throws IOException { Note note = notebook.createNote(); note.addParagraph(); // it's an empty note. so add one paragraph @@ -341,7 +312,6 @@ private void createNote(WebSocket conn, Notebook notebook) throws IOException { broadcastNote(note); broadcastNoteList(); } - private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); @@ -353,7 +323,6 @@ private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage) removeNote(noteId); broadcastNoteList(); } - private void updateParagraph(NotebookSocket conn, Notebook notebook, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); @@ -373,7 +342,6 @@ private void updateParagraph(NotebookSocket conn, Notebook notebook, note.persist(); broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p)); } - private void removeParagraph(NotebookSocket conn, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); @@ -388,31 +356,27 @@ private void removeParagraph(NotebookSocket conn, Notebook notebook, broadcastNote(note); } } - private void completion(NotebookSocket conn, Notebook notebook, Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); Message resp = new Message(OP.COMPLETION_LIST).put("id", paragraphId); - if (paragraphId == null) { conn.send(serializeMessage(resp)); return; } - final Note note = notebook.getNote(getOpenNoteId(conn)); List candidates = note.completion(paragraphId, buffer, cursor); resp.put("completions", candidates); conn.send(serializeMessage(resp)); } - /** * When angular object updated from client * - * @param conn - * @param notebook - * @param fromMessage + * @param conn the web socket. + * @param notebook the notebook. + * @param fromMessage the message. */ private void angularObjectUpdated(WebSocket conn, Notebook notebook, Message fromMessage) { @@ -420,10 +384,8 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, String interpreterGroupId = (String) fromMessage.get("interpreterGroupId"); String varName = (String) fromMessage.get("name"); Object varValue = fromMessage.get("value"); - AngularObject ao = null; boolean global = false; - // propagate change to (Remote) AngularObjectRegistry Note note = notebook.getNote(noteId); if (note != null) { @@ -433,11 +395,9 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, if (setting.getInterpreterGroup() == null) { continue; } - if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { AngularObjectRegistry angularObjectRegistry = setting .getInterpreterGroup().getAngularObjectRegistry(); - // first trying to get local registry ao = angularObjectRegistry.get(varName, noteId); if (ao == null) { @@ -455,12 +415,10 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, ao.set(varValue, false); global = false; } - break; } } } - if (global) { // broadcast change to all web session that uses related // interpreter. for (Note n : notebook.getAllNotes()) { @@ -470,7 +428,6 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, if (setting.getInterpreterGroup() == null) { continue; } - if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { AngularObjectRegistry angularObjectRegistry = setting .getInterpreterGroup().getAngularObjectRegistry(); @@ -490,14 +447,12 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, .put("noteId", note.id())); } } - private void moveParagraph(NotebookSocket conn, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - final int newIndex = (int) Double.parseDouble(fromMessage.get("index") .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); @@ -505,30 +460,25 @@ private void moveParagraph(NotebookSocket conn, Notebook notebook, note.persist(); broadcastNote(note); } - private void insertParagraph(NotebookSocket conn, Notebook notebook, Message fromMessage) throws IOException { final int index = (int) Double.parseDouble(fromMessage.get("index") .toString()); - final Note note = notebook.getNote(getOpenNoteId(conn)); note.insertParagraph(index); note.persist(); broadcastNote(note); } - private void cancelParagraph(NotebookSocket conn, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } - final Note note = notebook.getNote(getOpenNoteId(conn)); Paragraph p = note.getParagraph(paragraphId); p.abort(); } - private void runParagraph(NotebookSocket conn, Notebook notebook, Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); @@ -546,7 +496,6 @@ private void runParagraph(NotebookSocket conn, Notebook notebook, Map config = (Map) fromMessage .get("config"); p.setConfig(config); - // if it's the last paragraph, let's add a new one boolean isTheLastParagraph = note.getLastParagraph().getId() .equals(p.getId()); @@ -555,7 +504,6 @@ private void runParagraph(NotebookSocket conn, Notebook notebook, } note.persist(); broadcastNote(note); - try { note.run(paragraphId); } catch (Exception ex) { @@ -568,7 +516,6 @@ private void runParagraph(NotebookSocket conn, Notebook notebook, } } } - /** * Need description here. * @@ -576,12 +523,10 @@ private void runParagraph(NotebookSocket conn, Notebook notebook, public static class ParagraphJobListener implements JobListener { private NotebookServer notebookServer; private Note note; - public ParagraphJobListener(NotebookServer notebookServer, Note note) { this.notebookServer = notebookServer; this.note = note; } - @Override public void onProgressUpdate(Job job, int progress) { notebookServer.broadcast( @@ -589,11 +534,9 @@ public void onProgressUpdate(Job job, int progress) { new Message(OP.PROGRESS).put("id", job.getId()).put("progress", job.progress())); } - @Override public void beforeStatusChange(Job job, Status before, Status after) { } - @Override public void afterStatusChange(Job job, Status before, Status after) { if (after == Status.ERROR) { @@ -612,22 +555,18 @@ public void afterStatusChange(Job job, Status before, Status after) { notebookServer.broadcastNote(note); } } - @Override public JobListener getParagraphJobListener(Note note) { return new ParagraphJobListener(this, note); } - private void pong() { } - private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { List settings = note.getNoteReplLoader() .getInterpreterSettings(); if (settings == null || settings.size() == 0) { return; } - for (InterpreterSetting intpSetting : settings) { AngularObjectRegistry registry = intpSetting.getInterpreterGroup() .getAngularObjectRegistry(); @@ -641,31 +580,25 @@ private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOExce } } } - @Override public void onAdd(String interpreterGroupId, AngularObject object) { onUpdate(interpreterGroupId, object); } - @Override public void onUpdate(String interpreterGroupId, AngularObject object) { Notebook notebook = notebook(); if (notebook == null) { return; } - List notes = notebook.getAllNotes(); for (Note note : notes) { if (object.getNoteId() != null && !note.id().equals(object.getNoteId())) { continue; } - List intpSettings = note.getNoteReplLoader() .getInterpreterSettings(); - if (intpSettings.isEmpty()) continue; - for (InterpreterSetting setting : intpSettings) { if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) { broadcast( @@ -678,7 +611,6 @@ public void onUpdate(String interpreterGroupId, AngularObject object) { } } } - @Override public void onRemove(String interpreterGroupId, String name, String noteId) { Notebook notebook = notebook(); @@ -687,16 +619,19 @@ public void onRemove(String interpreterGroupId, String name, String noteId) { if (noteId != null && !note.id().equals(noteId)) { continue; } - List ids = note.getNoteReplLoader().getInterpreters(); for (String id : ids) { if (id.equals(interpreterGroupId)) { broadcast( note.id(), new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put( - "noteId", noteId)); + "noteId", noteId)); } } } } + private String getOrigin(NotebookSocket conn) { + return conn.getRequest().getHeader("Origin"); + } } + From 2a00dd9bd89b0e332aa26553246abad718dee8cd Mon Sep 17 00:00:00 2001 From: joelz Date: Thu, 6 Aug 2015 16:34:51 -0700 Subject: [PATCH 3/7] Fixing websocket issue --- .../zeppelin/socket/NotebookServer.java | 8 +- .../zeppelin/socket/NotebookServerTests.java | 16 +- .../socket/TestHttpServletRequest.java | 347 +++++++++++++++++- .../socket/TestNotebookSocketListener.java | 16 +- 4 files changed, 381 insertions(+), 6 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 7e8ab62b944..c2e508248bb 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -16,8 +16,12 @@ */ package org.apache.zeppelin.socket; import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.*; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.ArrayList; import javax.servlet.http.HttpServletRequest; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java index d36719de690..7ec877d7456 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java @@ -19,7 +19,7 @@ */ package org.apache.zeppelin.socket; - import org.junit.Assert; +import org.junit.Assert; import org.junit.FixMethodOrder; import org.junit.Test; import org.junit.runners.MethodSorters; @@ -32,7 +32,7 @@ * */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) - public class NotebookServerTest { + public class NotebookServerTests { @Test public void CheckOrigin(){ @@ -46,4 +46,16 @@ public void CheckOrigin(){ Assert.assertFalse(server.checkOrigin(new TestHttpServletRequest(), "http://localhost:8080")); Assert.assertEquals(0, server.seenOrigins.size()); } + + @Test + public void CheckInvalidOrigin(){ + NotebookServer server = new NotebookServer(); + NotebookSocket socket = new NotebookSocket(new TestHttpServletRequest(), "http", new TestNotebookSocketListener()); + server.onOpen(socket); + Assert.assertEquals(1, server.seenOrigins.size()); + server.doWebSocketConnect(new TestHttpServletRequest(), "http"); + Assert.assertFalse(server.checkOrigin(new TestHttpServletRequest(), "http://evillocalhost:8080")); + server.onClose(socket, 0, "Shutdown"); + Assert.assertEquals(0, server.seenOrigins.size()); + } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java index ec88b6aaaac..3172d475f69 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java @@ -1,7 +1,352 @@ package org.apache.zeppelin.socket; +import javax.servlet.*; +import javax.servlet.http.*; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.security.Principal; +import java.util.Collection; +import java.util.Enumeration; +import java.util.Locale; +import java.util.Map; + /** * Created by joelz on 8/6/15. */ -public class TestHttpServletRequest { +public class TestHttpServletRequest implements HttpServletRequest { + @Override + public boolean authenticate(HttpServletResponse httpServletResponse) throws IOException, ServletException { + return false; + } + + @Override + public String getAuthType() { + return null; + } + + @Override + public String getContextPath() { + return null; + } + + @Override + public Cookie[] getCookies() { + return new Cookie[0]; + } + + @Override + public long getDateHeader(String s) { + return 0; + } + + @Override + public String getHeader(String s) { + switch (s) { + case "Origin": + return "http://localhost:8080"; + } + + return null; + } + + @Override + public Enumeration getHeaderNames() { + return null; + } + + @Override + public Enumeration getHeaders(String s) { + return null; + } + + @Override + public int getIntHeader(String s) { + return 0; + } + + @Override + public String getMethod() { + return null; + } + + @Override + public Part getPart(String s) throws IOException, ServletException { + return null; + } + + @Override + public Collection getParts() throws IOException, ServletException { + return null; + } + + @Override + public String getPathInfo() { + return null; + } + + @Override + public String getPathTranslated() { + return null; + } + + @Override + public String getQueryString() { + return null; + } + + @Override + public String getRemoteUser() { + return null; + } + + @Override + public String getRequestedSessionId() { + return null; + } + + @Override + public String getRequestURI() { + return null; + } + + @Override + public StringBuffer getRequestURL() { + return null; + } + + @Override + public String getServletPath() { + return null; + } + + @Override + public HttpSession getSession() { + return null; + } + + @Override + public HttpSession getSession(boolean b) { + return null; + } + + @Override + public Principal getUserPrincipal() { + return null; + } + + @Override + public boolean isRequestedSessionIdFromCookie() { + return false; + } + + @Override + public boolean isRequestedSessionIdFromUrl() { + return false; + } + + @Override + public boolean isRequestedSessionIdFromURL() { + return false; + } + + @Override + public boolean isRequestedSessionIdValid() { + return false; + } + + @Override + public boolean isUserInRole(String s) { + return false; + } + + @Override + public void login(String s, String s1) throws ServletException { + + } + + @Override + public void logout() throws ServletException { + + } + + @Override + public AsyncContext getAsyncContext() { + return null; + } + + @Override + public Object getAttribute(String s) { + return null; + } + + @Override + public Enumeration getAttributeNames() { + return null; + } + + @Override + public String getCharacterEncoding() { + return null; + } + + @Override + public int getContentLength() { + return 0; + } + + @Override + public String getContentType() { + return null; + } + + @Override + public DispatcherType getDispatcherType() { + return null; + } + + @Override + public ServletInputStream getInputStream() throws IOException { + return null; + } + + @Override + public String getLocalAddr() { + return null; + } + + @Override + public Locale getLocale() { + return null; + } + + @Override + public Enumeration getLocales() { + return null; + } + + @Override + public String getLocalName() { + return null; + } + + @Override + public int getLocalPort() { + return 0; + } + + @Override + public String getParameter(String s) { + return null; + } + + @Override + public Map getParameterMap() { + return null; + } + + @Override + public Enumeration getParameterNames() { + return null; + } + + @Override + public String[] getParameterValues(String s) { + return new String[0]; + } + + @Override + public String getProtocol() { + return null; + } + + @Override + public BufferedReader getReader() throws IOException { + return null; + } + + @Override + public String getRealPath(String s) { + return null; + } + + @Override + public String getRemoteAddr() { + return null; + } + + @Override + public String getRemoteHost() { + return null; + } + + @Override + public int getRemotePort() { + return 0; + } + + @Override + public RequestDispatcher getRequestDispatcher(String s) { + return null; + } + + @Override + public String getScheme() { + return null; + } + + @Override + public String getServerName() { + return null; + } + + @Override + public int getServerPort() { + return 0; + } + + @Override + public ServletContext getServletContext() { + return null; + } + + @Override + public boolean isAsyncStarted() { + return false; + } + + @Override + public boolean isAsyncSupported() { + return false; + } + + @Override + public boolean isSecure() { + return false; + } + + @Override + public void removeAttribute(String s) { + + } + + @Override + public void setAttribute(String s, Object o) { + + } + + @Override + public void setCharacterEncoding(String s) throws UnsupportedEncodingException { + + } + + @Override + public AsyncContext startAsync() { + return null; + } + + @Override + public AsyncContext startAsync(ServletRequest servletRequest, ServletResponse servletResponse) { + return null; + } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java index 236f744ba5e..dacbc556dd1 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java @@ -3,5 +3,19 @@ /** * Created by joelz on 8/6/15. */ -public class TestNotebookSocketListener { +public class TestNotebookSocketListener implements NotebookSocketListener { + @Override + public void onClose(NotebookSocket socket, int code, String message) { + + } + + @Override + public void onOpen(NotebookSocket socket) { + + } + + @Override + public void onMessage(NotebookSocket socket, String message) { + + } } From 8cb3316ff57a3888e50311b12bcda850df8792c6 Mon Sep 17 00:00:00 2001 From: joelz Date: Thu, 6 Aug 2015 16:47:27 -0700 Subject: [PATCH 4/7] Fixing websocket issue --- .../zeppelin/socket/NotebookServer.java | 210 +++++++++--------- .../zeppelin/socket/NotebookServerTests.java | 2 +- .../socket/TestHttpServletRequest.java | 1 + .../socket/TestNotebookSocketListener.java | 1 + 4 files changed, 108 insertions(+), 106 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index c2e508248bb..8dd070340a7 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -50,7 +50,7 @@ * @author anthonycorbacho */ public class NotebookServer extends WebSocketServlet implements - NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener { + NotebookSocketListener, JobListenerFactory, AngularObjectRegistryListener { private static final Logger LOG = LoggerFactory .getLogger(NotebookServer.class); Gson gson = new Gson(); @@ -71,7 +71,7 @@ public WebSocket doWebSocketConnect(HttpServletRequest req, String protocol) { @Override public void onOpen(NotebookSocket conn) { LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), - conn.getRequest().getRemotePort()); + conn.getRequest().getRemotePort()); synchronized (connectedSockets) { synchronized (seenOrigins) { seenOrigins.add(getOrigin(conn)); @@ -87,51 +87,51 @@ public void onMessage(NotebookSocket conn, String msg) { LOG.info("RECEIVE << " + messagereceived.op); /** Lets be elegant here */ switch (messagereceived.op) { - case LIST_NOTES: - broadcastNoteList(); - break; - case GET_NOTE: - sendNote(conn, notebook, messagereceived); - break; - case NEW_NOTE: - createNote(conn, notebook); - break; - case DEL_NOTE: - removeNote(conn, notebook, messagereceived); - break; - case COMMIT_PARAGRAPH: - updateParagraph(conn, notebook, messagereceived); - break; - case RUN_PARAGRAPH: - runParagraph(conn, notebook, messagereceived); - break; - case CANCEL_PARAGRAPH: - cancelParagraph(conn, notebook, messagereceived); - break; - case MOVE_PARAGRAPH: - moveParagraph(conn, notebook, messagereceived); - break; - case INSERT_PARAGRAPH: - insertParagraph(conn, notebook, messagereceived); - break; - case PARAGRAPH_REMOVE: - removeParagraph(conn, notebook, messagereceived); - break; - case NOTE_UPDATE: - updateNote(conn, notebook, messagereceived); - break; - case COMPLETION: - completion(conn, notebook, messagereceived); - break; - case PING: - pong(); - break; - case ANGULAR_OBJECT_UPDATED: - angularObjectUpdated(conn, notebook, messagereceived); - break; - default: - broadcastNoteList(); - break; + case LIST_NOTES: + broadcastNoteList(); + break; + case GET_NOTE: + sendNote(conn, notebook, messagereceived); + break; + case NEW_NOTE: + createNote(conn, notebook); + break; + case DEL_NOTE: + removeNote(conn, notebook, messagereceived); + break; + case COMMIT_PARAGRAPH: + updateParagraph(conn, notebook, messagereceived); + break; + case RUN_PARAGRAPH: + runParagraph(conn, notebook, messagereceived); + break; + case CANCEL_PARAGRAPH: + cancelParagraph(conn, notebook, messagereceived); + break; + case MOVE_PARAGRAPH: + moveParagraph(conn, notebook, messagereceived); + break; + case INSERT_PARAGRAPH: + insertParagraph(conn, notebook, messagereceived); + break; + case PARAGRAPH_REMOVE: + removeParagraph(conn, notebook, messagereceived); + break; + case NOTE_UPDATE: + updateNote(conn, notebook, messagereceived); + break; + case COMPLETION: + completion(conn, notebook, messagereceived); + break; + case PING: + pong(); + break; + case ANGULAR_OBJECT_UPDATED: + angularObjectUpdated(conn, notebook, messagereceived); + break; + default: + broadcastNoteList(); + break; } } catch (Exception e) { LOG.error("Can't handle message", e); @@ -140,7 +140,7 @@ public void onMessage(NotebookSocket conn, String msg) { @Override public void onClose(NotebookSocket conn, int code, String reason) { LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() - .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); + .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); removeConnectionFromAllNote(conn); synchronized (connectedSockets) { synchronized (seenOrigins) { @@ -158,7 +158,7 @@ private String serializeMessage(Message m) { private void addConnectionToNote(String noteId, NotebookSocket socket) { synchronized (noteSocketMap) { removeConnectionFromAllNote(socket); // make sure a socket relates only a - // single note. + // single note. List socketList = noteSocketMap.get(noteId); if (socketList == null) { socketList = new LinkedList<>(); @@ -204,7 +204,7 @@ private String getOpenNoteId(NotebookSocket socket) { return id; } private void broadcastToNoteBindedInterpreter(String interpreterGroupId, - Message m) { + Message m) { Notebook notebook = notebook(); List notes = notebook.getAllNotes(); for (Note note : notes) { @@ -259,7 +259,7 @@ private void broadcastNoteList() { broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); } private void sendNote(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; @@ -272,11 +272,11 @@ private void sendNote(NotebookSocket conn, Notebook notebook, } } private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) - throws SchedulerException, IOException { + throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); Map config = (Map) fromMessage - .get("config"); + .get("config"); if (noteId == null) { return; } @@ -297,10 +297,10 @@ private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) } } private boolean isCronUpdated(Map configA, - Map configB) { + Map configB) { boolean cronUpdated = false; if (configA.get("cron") != null && configB.get("cron") != null - && configA.get("cron").equals(configB.get("cron"))) { + && configA.get("cron").equals(configB.get("cron"))) { cronUpdated = true; } else if (configA.get("cron") == null && configB.get("cron") == null) { cronUpdated = false; @@ -317,7 +317,7 @@ private void createNote(WebSocket conn, Notebook notebook) throws IOException { broadcastNoteList(); } private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; @@ -328,15 +328,15 @@ private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage) broadcastNoteList(); } private void updateParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } Map params = (Map) fromMessage - .get("params"); + .get("params"); Map config = (Map) fromMessage - .get("config"); + .get("config"); final Note note = notebook.getNote(getOpenNoteId(conn)); Paragraph p = note.getParagraph(paragraphId); p.settings.setParams(params); @@ -347,7 +347,7 @@ private void updateParagraph(NotebookSocket conn, Notebook notebook, broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p)); } private void removeParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -361,7 +361,7 @@ private void removeParagraph(NotebookSocket conn, Notebook notebook, } } private void completion(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); @@ -377,13 +377,13 @@ private void completion(NotebookSocket conn, Notebook notebook, } /** * When angular object updated from client - * + * * @param conn the web socket. * @param notebook the notebook. * @param fromMessage the message. */ private void angularObjectUpdated(WebSocket conn, Notebook notebook, - Message fromMessage) { + Message fromMessage) { String noteId = (String) fromMessage.get("noteId"); String interpreterGroupId = (String) fromMessage.get("interpreterGroupId"); String varName = (String) fromMessage.get("name"); @@ -394,14 +394,14 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, Note note = notebook.getNote(noteId); if (note != null) { List settings = note.getNoteReplLoader() - .getInterpreterSettings(); + .getInterpreterSettings(); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup() == null) { continue; } if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { AngularObjectRegistry angularObjectRegistry = setting - .getInterpreterGroup().getAngularObjectRegistry(); + .getInterpreterGroup().getAngularObjectRegistry(); // first trying to get local registry ao = angularObjectRegistry.get(varName, noteId); if (ao == null) { @@ -424,57 +424,57 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, } } if (global) { // broadcast change to all web session that uses related - // interpreter. + // interpreter. for (Note n : notebook.getAllNotes()) { List settings = note.getNoteReplLoader() - .getInterpreterSettings(); + .getInterpreterSettings(); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup() == null) { continue; } if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { AngularObjectRegistry angularObjectRegistry = setting - .getInterpreterGroup().getAngularObjectRegistry(); + .getInterpreterGroup().getAngularObjectRegistry(); this.broadcast( - n.id(), - new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", n.id())); + n.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", n.id())); } } } } else { // broadcast to all web session for the note this.broadcast( - note.id(), - new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.id())); + note.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", note.id())); } } private void moveParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } final int newIndex = (int) Double.parseDouble(fromMessage.get("index") - .toString()); + .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); note.moveParagraph(paragraphId, newIndex); note.persist(); broadcastNote(note); } private void insertParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { final int index = (int) Double.parseDouble(fromMessage.get("index") - .toString()); + .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); note.insertParagraph(index); note.persist(); broadcastNote(note); } private void cancelParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -484,7 +484,7 @@ private void cancelParagraph(NotebookSocket conn, Notebook notebook, p.abort(); } private void runParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -495,14 +495,14 @@ private void runParagraph(NotebookSocket conn, Notebook notebook, p.setText(text); p.setTitle((String) fromMessage.get("title")); Map params = (Map) fromMessage - .get("params"); + .get("params"); p.settings.setParams(params); Map config = (Map) fromMessage - .get("config"); + .get("config"); p.setConfig(config); // if it's the last paragraph, let's add a new one boolean isTheLastParagraph = note.getLastParagraph().getId() - .equals(p.getId()); + .equals(p.getId()); if (!Strings.isNullOrEmpty(text) && isTheLastParagraph) { note.addParagraph(); } @@ -514,8 +514,8 @@ private void runParagraph(NotebookSocket conn, Notebook notebook, LOG.error("Exception from run", ex); if (p != null) { p.setReturn( - new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), - ex); + new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), + ex); p.setStatus(Status.ERROR); } } @@ -534,9 +534,9 @@ public ParagraphJobListener(NotebookServer notebookServer, Note note) { @Override public void onProgressUpdate(Job job, int progress) { notebookServer.broadcast( - note.id(), - new Message(OP.PROGRESS).put("id", job.getId()).put("progress", - job.progress())); + note.id(), + new Message(OP.PROGRESS).put("id", job.getId()).put("progress", + job.progress())); } @Override public void beforeStatusChange(Job job, Status before, Status after) { @@ -567,20 +567,20 @@ private void pong() { } private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { List settings = note.getNoteReplLoader() - .getInterpreterSettings(); + .getInterpreterSettings(); if (settings == null || settings.size() == 0) { return; } for (InterpreterSetting intpSetting : settings) { AngularObjectRegistry registry = intpSetting.getInterpreterGroup() - .getAngularObjectRegistry(); + .getAngularObjectRegistry(); List objects = registry.getAllWithGlobal(note.id()); for (AngularObject object : objects) { conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", - intpSetting.getInterpreterGroup().getId()) - .put("noteId", note.id()))); + .put("angularObject", object) + .put("interpreterGroupId", + intpSetting.getInterpreterGroup().getId()) + .put("noteId", note.id()))); } } } @@ -600,17 +600,17 @@ public void onUpdate(String interpreterGroupId, AngularObject object) { continue; } List intpSettings = note.getNoteReplLoader() - .getInterpreterSettings(); + .getInterpreterSettings(); if (intpSettings.isEmpty()) continue; for (InterpreterSetting setting : intpSettings) { if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) { broadcast( - note.id(), - new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.id())); + note.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", object) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", note.id())); } } } @@ -627,9 +627,9 @@ public void onRemove(String interpreterGroupId, String name, String noteId) { for (String id : ids) { if (id.equals(interpreterGroupId)) { broadcast( - note.id(), - new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put( - "noteId", noteId)); + note.id(), + new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put( + "noteId", noteId)); } } } diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java index 7ec877d7456..ed44460d7b7 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java @@ -32,7 +32,7 @@ * */ @FixMethodOrder(MethodSorters.NAME_ASCENDING) - public class NotebookServerTests { + public class NotebookServerTests { @Test public void CheckOrigin(){ diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java index 3172d475f69..40c64fbd2d1 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java @@ -13,6 +13,7 @@ /** * Created by joelz on 8/6/15. + * Helps mocking a http servlet request */ public class TestHttpServletRequest implements HttpServletRequest { @Override diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java index dacbc556dd1..00e2493ab61 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java @@ -2,6 +2,7 @@ /** * Created by joelz on 8/6/15. + * This enables mocking a socket listener. */ public class TestNotebookSocketListener implements NotebookSocketListener { @Override From 089965578aecbf31c5de5a6de7c1af693a16f395 Mon Sep 17 00:00:00 2001 From: joelz Date: Thu, 6 Aug 2015 17:10:04 -0700 Subject: [PATCH 5/7] Fixing websocket issue --- .../zeppelin/socket/NotebookServer.java | 202 +++++++++--------- 1 file changed, 101 insertions(+), 101 deletions(-) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 8dd070340a7..e6ebf443f7f 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -71,7 +71,7 @@ public WebSocket doWebSocketConnect(HttpServletRequest req, String protocol) { @Override public void onOpen(NotebookSocket conn) { LOG.info("New connection from {} : {}", conn.getRequest().getRemoteAddr(), - conn.getRequest().getRemotePort()); + conn.getRequest().getRemotePort()); synchronized (connectedSockets) { synchronized (seenOrigins) { seenOrigins.add(getOrigin(conn)); @@ -87,51 +87,51 @@ public void onMessage(NotebookSocket conn, String msg) { LOG.info("RECEIVE << " + messagereceived.op); /** Lets be elegant here */ switch (messagereceived.op) { - case LIST_NOTES: - broadcastNoteList(); - break; - case GET_NOTE: - sendNote(conn, notebook, messagereceived); - break; - case NEW_NOTE: - createNote(conn, notebook); - break; - case DEL_NOTE: - removeNote(conn, notebook, messagereceived); - break; - case COMMIT_PARAGRAPH: - updateParagraph(conn, notebook, messagereceived); - break; - case RUN_PARAGRAPH: - runParagraph(conn, notebook, messagereceived); - break; - case CANCEL_PARAGRAPH: - cancelParagraph(conn, notebook, messagereceived); - break; - case MOVE_PARAGRAPH: - moveParagraph(conn, notebook, messagereceived); - break; - case INSERT_PARAGRAPH: - insertParagraph(conn, notebook, messagereceived); - break; - case PARAGRAPH_REMOVE: - removeParagraph(conn, notebook, messagereceived); - break; - case NOTE_UPDATE: - updateNote(conn, notebook, messagereceived); - break; - case COMPLETION: - completion(conn, notebook, messagereceived); - break; - case PING: - pong(); - break; - case ANGULAR_OBJECT_UPDATED: - angularObjectUpdated(conn, notebook, messagereceived); - break; - default: - broadcastNoteList(); - break; + case LIST_NOTES: + broadcastNoteList(); + break; + case GET_NOTE: + sendNote(conn, notebook, messagereceived); + break; + case NEW_NOTE: + createNote(conn, notebook); + break; + case DEL_NOTE: + removeNote(conn, notebook, messagereceived); + break; + case COMMIT_PARAGRAPH: + updateParagraph(conn, notebook, messagereceived); + break; + case RUN_PARAGRAPH: + runParagraph(conn, notebook, messagereceived); + break; + case CANCEL_PARAGRAPH: + cancelParagraph(conn, notebook, messagereceived); + break; + case MOVE_PARAGRAPH: + moveParagraph(conn, notebook, messagereceived); + break; + case INSERT_PARAGRAPH: + insertParagraph(conn, notebook, messagereceived); + break; + case PARAGRAPH_REMOVE: + removeParagraph(conn, notebook, messagereceived); + break; + case NOTE_UPDATE: + updateNote(conn, notebook, messagereceived); + break; + case COMPLETION: + completion(conn, notebook, messagereceived); + break; + case PING: + pong(); + break; + case ANGULAR_OBJECT_UPDATED: + angularObjectUpdated(conn, notebook, messagereceived); + break; + default: + broadcastNoteList(); + break; } } catch (Exception e) { LOG.error("Can't handle message", e); @@ -140,7 +140,7 @@ public void onMessage(NotebookSocket conn, String msg) { @Override public void onClose(NotebookSocket conn, int code, String reason) { LOG.info("Closed connection to {} : {}. ({}) {}", conn.getRequest() - .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); + .getRemoteAddr(), conn.getRequest().getRemotePort(), code, reason); removeConnectionFromAllNote(conn); synchronized (connectedSockets) { synchronized (seenOrigins) { @@ -204,7 +204,7 @@ private String getOpenNoteId(NotebookSocket socket) { return id; } private void broadcastToNoteBindedInterpreter(String interpreterGroupId, - Message m) { + Message m) { Notebook notebook = notebook(); List notes = notebook.getAllNotes(); for (Note note : notes) { @@ -259,7 +259,7 @@ private void broadcastNoteList() { broadcastAll(new Message(OP.NOTES_INFO).put("notes", notesInfo)); } private void sendNote(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; @@ -272,11 +272,11 @@ private void sendNote(NotebookSocket conn, Notebook notebook, } } private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) - throws SchedulerException, IOException { + throws SchedulerException, IOException { String noteId = (String) fromMessage.get("id"); String name = (String) fromMessage.get("name"); Map config = (Map) fromMessage - .get("config"); + .get("config"); if (noteId == null) { return; } @@ -297,10 +297,10 @@ private void updateNote(WebSocket conn, Notebook notebook, Message fromMessage) } } private boolean isCronUpdated(Map configA, - Map configB) { + Map configB) { boolean cronUpdated = false; if (configA.get("cron") != null && configB.get("cron") != null - && configA.get("cron").equals(configB.get("cron"))) { + && configA.get("cron").equals(configB.get("cron"))) { cronUpdated = true; } else if (configA.get("cron") == null && configB.get("cron") == null) { cronUpdated = false; @@ -317,7 +317,7 @@ private void createNote(WebSocket conn, Notebook notebook) throws IOException { broadcastNoteList(); } private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage) - throws IOException { + throws IOException { String noteId = (String) fromMessage.get("id"); if (noteId == null) { return; @@ -328,15 +328,15 @@ private void removeNote(WebSocket conn, Notebook notebook, Message fromMessage) broadcastNoteList(); } private void updateParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } Map params = (Map) fromMessage - .get("params"); + .get("params"); Map config = (Map) fromMessage - .get("config"); + .get("config"); final Note note = notebook.getNote(getOpenNoteId(conn)); Paragraph p = note.getParagraph(paragraphId); p.settings.setParams(params); @@ -347,7 +347,7 @@ private void updateParagraph(NotebookSocket conn, Notebook notebook, broadcast(note.id(), new Message(OP.PARAGRAPH).put("paragraph", p)); } private void removeParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -361,7 +361,7 @@ private void removeParagraph(NotebookSocket conn, Notebook notebook, } } private void completion(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { String paragraphId = (String) fromMessage.get("id"); String buffer = (String) fromMessage.get("buf"); int cursor = (int) Double.parseDouble(fromMessage.get("cursor").toString()); @@ -383,7 +383,7 @@ private void completion(NotebookSocket conn, Notebook notebook, * @param fromMessage the message. */ private void angularObjectUpdated(WebSocket conn, Notebook notebook, - Message fromMessage) { + Message fromMessage) { String noteId = (String) fromMessage.get("noteId"); String interpreterGroupId = (String) fromMessage.get("interpreterGroupId"); String varName = (String) fromMessage.get("name"); @@ -394,14 +394,14 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, Note note = notebook.getNote(noteId); if (note != null) { List settings = note.getNoteReplLoader() - .getInterpreterSettings(); + .getInterpreterSettings(); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup() == null) { continue; } if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { AngularObjectRegistry angularObjectRegistry = setting - .getInterpreterGroup().getAngularObjectRegistry(); + .getInterpreterGroup().getAngularObjectRegistry(); // first trying to get local registry ao = angularObjectRegistry.get(varName, noteId); if (ao == null) { @@ -427,54 +427,54 @@ private void angularObjectUpdated(WebSocket conn, Notebook notebook, // interpreter. for (Note n : notebook.getAllNotes()) { List settings = note.getNoteReplLoader() - .getInterpreterSettings(); + .getInterpreterSettings(); for (InterpreterSetting setting : settings) { if (setting.getInterpreterGroup() == null) { continue; } if (interpreterGroupId.equals(setting.getInterpreterGroup().getId())) { AngularObjectRegistry angularObjectRegistry = setting - .getInterpreterGroup().getAngularObjectRegistry(); + .getInterpreterGroup().getAngularObjectRegistry(); this.broadcast( - n.id(), - new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", n.id())); + n.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", n.id())); } } } } else { // broadcast to all web session for the note this.broadcast( - note.id(), - new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.id())); + note.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", note.id())); } } private void moveParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; } final int newIndex = (int) Double.parseDouble(fromMessage.get("index") - .toString()); + .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); note.moveParagraph(paragraphId, newIndex); note.persist(); broadcastNote(note); } private void insertParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { final int index = (int) Double.parseDouble(fromMessage.get("index") - .toString()); + .toString()); final Note note = notebook.getNote(getOpenNoteId(conn)); note.insertParagraph(index); note.persist(); broadcastNote(note); } private void cancelParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -484,7 +484,7 @@ private void cancelParagraph(NotebookSocket conn, Notebook notebook, p.abort(); } private void runParagraph(NotebookSocket conn, Notebook notebook, - Message fromMessage) throws IOException { + Message fromMessage) throws IOException { final String paragraphId = (String) fromMessage.get("id"); if (paragraphId == null) { return; @@ -495,14 +495,14 @@ private void runParagraph(NotebookSocket conn, Notebook notebook, p.setText(text); p.setTitle((String) fromMessage.get("title")); Map params = (Map) fromMessage - .get("params"); + .get("params"); p.settings.setParams(params); Map config = (Map) fromMessage - .get("config"); + .get("config"); p.setConfig(config); // if it's the last paragraph, let's add a new one boolean isTheLastParagraph = note.getLastParagraph().getId() - .equals(p.getId()); + .equals(p.getId()); if (!Strings.isNullOrEmpty(text) && isTheLastParagraph) { note.addParagraph(); } @@ -514,8 +514,8 @@ private void runParagraph(NotebookSocket conn, Notebook notebook, LOG.error("Exception from run", ex); if (p != null) { p.setReturn( - new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), - ex); + new InterpreterResult(InterpreterResult.Code.ERROR, ex.getMessage()), + ex); p.setStatus(Status.ERROR); } } @@ -534,9 +534,9 @@ public ParagraphJobListener(NotebookServer notebookServer, Note note) { @Override public void onProgressUpdate(Job job, int progress) { notebookServer.broadcast( - note.id(), - new Message(OP.PROGRESS).put("id", job.getId()).put("progress", - job.progress())); + note.id(), + new Message(OP.PROGRESS).put("id", job.getId()).put("progress", + job.progress())); } @Override public void beforeStatusChange(Job job, Status before, Status after) { @@ -567,20 +567,20 @@ private void pong() { } private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { List settings = note.getNoteReplLoader() - .getInterpreterSettings(); + .getInterpreterSettings(); if (settings == null || settings.size() == 0) { return; } for (InterpreterSetting intpSetting : settings) { AngularObjectRegistry registry = intpSetting.getInterpreterGroup() - .getAngularObjectRegistry(); + .getAngularObjectRegistry(); List objects = registry.getAllWithGlobal(note.id()); for (AngularObject object : objects) { conn.send(serializeMessage(new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", - intpSetting.getInterpreterGroup().getId()) - .put("noteId", note.id()))); + .put("angularObject", object) + .put("interpreterGroupId", + intpSetting.getInterpreterGroup().getId()) + .put("noteId", note.id()))); } } } @@ -600,17 +600,17 @@ public void onUpdate(String interpreterGroupId, AngularObject object) { continue; } List intpSettings = note.getNoteReplLoader() - .getInterpreterSettings(); + .getInterpreterSettings(); if (intpSettings.isEmpty()) continue; for (InterpreterSetting setting : intpSettings) { if (setting.getInterpreterGroup().getId().equals(interpreterGroupId)) { broadcast( - note.id(), - new Message(OP.ANGULAR_OBJECT_UPDATE) - .put("angularObject", object) - .put("interpreterGroupId", interpreterGroupId) - .put("noteId", note.id())); + note.id(), + new Message(OP.ANGULAR_OBJECT_UPDATE) + .put("angularObject", object) + .put("interpreterGroupId", interpreterGroupId) + .put("noteId", note.id())); } } } @@ -627,9 +627,9 @@ public void onRemove(String interpreterGroupId, String name, String noteId) { for (String id : ids) { if (id.equals(interpreterGroupId)) { broadcast( - note.id(), - new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put( - "noteId", noteId)); + note.id(), + new Message(OP.ANGULAR_OBJECT_REMOVE).put("name", name).put( + "noteId", noteId)); } } } From 42ff8a3872e2f8105ac73ec8c2d4db88fc3d4523 Mon Sep 17 00:00:00 2001 From: joelz Date: Thu, 6 Aug 2015 17:47:47 -0700 Subject: [PATCH 6/7] Fixing websocket issue --- .../socket/TestHttpServletRequest.java | 19 +++++++++++++++++++ .../socket/TestNotebookSocketListener.java | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java index 40c64fbd2d1..9ec54baa95a 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestHttpServletRequest.java @@ -1,3 +1,22 @@ +/** + * Created by joelz on 8/6/15. + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.socket; import javax.servlet.*; diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java index 00e2493ab61..8d0637cdbce 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/TestNotebookSocketListener.java @@ -1,3 +1,22 @@ +/** + * Created by joelz on 8/6/15. + * + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.zeppelin.socket; /** From 6eecbe8859f46228aaca4f3a11c76ecba4f3fa45 Mon Sep 17 00:00:00 2001 From: joelz Date: Mon, 10 Aug 2015 14:48:51 -0700 Subject: [PATCH 7/7] Fixing websocket issue --- .../java/org/apache/zeppelin/socket/NotebookServerTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java index ed44460d7b7..cac1d0c18dc 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/socket/NotebookServerTests.java @@ -31,7 +31,6 @@ * @author joelz * */ -@FixMethodOrder(MethodSorters.NAME_ASCENDING) public class NotebookServerTests { @Test