From 566b0ed44541383079efeac0c8e754e549fa19cb Mon Sep 17 00:00:00 2001 From: Hao Xia Date: Thu, 4 Aug 2016 14:08:50 -0700 Subject: [PATCH 1/2] Implement one-way sync for notebook repos --- conf/zeppelin-env.cmd.template | 1 + conf/zeppelin-env.sh.template | 1 + conf/zeppelin-site.xml.template | 6 +++ docs/install/install.md | 6 +++ .../zeppelin/conf/ZeppelinConfiguration.java | 1 + .../notebook/repo/NotebookRepoSync.java | 52 +++++++++++++++---- .../notebook/repo/NotebookRepoSyncTest.java | 49 +++++++++++++++++ 7 files changed, 106 insertions(+), 10 deletions(-) diff --git a/conf/zeppelin-env.cmd.template b/conf/zeppelin-env.cmd.template index d85e59f2709..4550a555d2e 100644 --- a/conf/zeppelin-env.cmd.template +++ b/conf/zeppelin-env.cmd.template @@ -35,6 +35,7 @@ REM set ZEPPELIN_IDENT_STRING REM A string representing this instance of zep REM set ZEPPELIN_NICENESS REM The scheduling priority for daemons. Defaults to 0. REM set ZEPPELIN_INTERPRETER_LOCALREPO REM Local repository for interpreter's additional dependency loading REM set ZEPPELIN_NOTEBOOK_STORAGE REM Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote). +REM set ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC REM If there are multiple notebook storages, should we treat the first one as the only source of truth? REM Spark interpreter configuration diff --git a/conf/zeppelin-env.sh.template b/conf/zeppelin-env.sh.template index 52e36f7b5f6..4730cc3077b 100644 --- a/conf/zeppelin-env.sh.template +++ b/conf/zeppelin-env.sh.template @@ -36,6 +36,7 @@ # export ZEPPELIN_NICENESS # The scheduling priority for daemons. Defaults to 0. # export ZEPPELIN_INTERPRETER_LOCALREPO # Local repository for interpreter's additional dependency loading # export ZEPPELIN_NOTEBOOK_STORAGE # Refers to pluggable notebook storage class, can have two classes simultaneously with a sync between them (e.g. local and remote). +# export ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC # If there are multiple notebook storages, should we treat the first one as the only source of truth? #### Spark interpreter configuration #### diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 3fc1b4aec78..4b958fa39be 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -164,6 +164,12 @@ notebook persistence layer implementation + + zeppelin.notebook.one.way.sync + false + If there are multiple notebook storages, should we treat the first one as the only source of truth? + + zeppelin.interpreter.dir interpreter diff --git a/docs/install/install.md b/docs/install/install.md index 98adc745fe3..86fd6e95759 100644 --- a/docs/install/install.md +++ b/docs/install/install.md @@ -373,6 +373,12 @@ You can configure Apache Zeppelin with both **environment variables** in `conf/z org.apache.zeppelin.notebook.repo.VFSNotebookRepo Comma separated list of notebook storage + + ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC + zeppelin.notebook.one.way.sync + false + If there are multiple notebook storages, should we treat the first one as the only source of truth? + ZEPPELIN_INTERPRETERS zeppelin.interpreters diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 1845e6c6a9e..e5d542f97eb 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -542,6 +542,7 @@ public static enum ConfVars { ZEPPELIN_NOTEBOOK_AZURE_SHARE("zeppelin.notebook.azure.share", "zeppelin"), ZEPPELIN_NOTEBOOK_AZURE_USER("zeppelin.notebook.azure.user", "user"), ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()), + ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC("zeppelin.notebook.one.way.sync", false), ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", System.getProperty("os.name") .startsWith("Windows") ? "bin/interpreter.cmd" : "bin/interpreter.sh"), diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java index 6c499c6e312..77d05cd1de3 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java @@ -44,11 +44,13 @@ public class NotebookRepoSync implements NotebookRepo { private static final int maxRepoNum = 2; private static final String pushKey = "pushNoteIDs"; private static final String pullKey = "pullNoteIDs"; + private static final String delDstKey = "delDstNoteIDs"; private static ZeppelinConfiguration config; private static final String defaultStorage = "org.apache.zeppelin.notebook.repo.VFSNotebookRepo"; private List repos = new ArrayList(); + private final boolean oneWaySync; /** * @param noteIndex @@ -58,6 +60,7 @@ public class NotebookRepoSync implements NotebookRepo { @SuppressWarnings("static-access") public NotebookRepoSync(ZeppelinConfiguration conf) { config = conf; + oneWaySync = conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC); String allStorageClassNames = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE).trim(); if (allStorageClassNames.isEmpty()) { allStorageClassNames = defaultStorage; @@ -182,6 +185,8 @@ void sync(int sourceRepoIndex, int destRepoIndex) throws IOException { Map> noteIDs = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo); List pushNoteIDs = noteIDs.get(pushKey); List pullNoteIDs = noteIDs.get(pullKey); + List delDstNoteIDs = noteIDs.get(delDstKey); + if (!pushNoteIDs.isEmpty()) { LOG.info("Notes with the following IDs will be pushed"); for (String id : pushNoteIDs) { @@ -202,6 +207,16 @@ void sync(int sourceRepoIndex, int destRepoIndex) throws IOException { LOG.info("Nothing to pull"); } + if (!delDstNoteIDs.isEmpty()) { + LOG.info("Notes with the following IDs will be deleted from dest"); + for (String id : delDstNoteIDs) { + LOG.info("ID : " + id); + } + deleteNotes(delDstNoteIDs, dstRepo); + } else { + LOG.info("Nothing to delete from dest"); + } + LOG.info("Sync ended"); } @@ -216,6 +231,12 @@ private void pushNotes(List ids, NotebookRepo localRepo, } } + private void deleteNotes(List ids, NotebookRepo repo) throws IOException { + for (String id : ids) { + repo.remove(id, null); + } + } + public int getRepoCount() { return repos.size(); } @@ -237,6 +258,7 @@ private Map> notesCheckDiff(List sourceNotes, throws IOException { List pushIDs = new ArrayList(); List pullIDs = new ArrayList(); + List delDstIDs = new ArrayList(); NoteInfo dnote; Date sdate, ddate; @@ -246,14 +268,17 @@ private Map> notesCheckDiff(List sourceNotes, /* note exists in source and destination storage systems */ sdate = lastModificationDate(sourceRepo.get(snote.getId(), null)); ddate = lastModificationDate(destRepo.get(dnote.getId(), null)); - if (sdate.after(ddate)) { - /* source contains more up to date note - push */ - pushIDs.add(snote.getId()); - LOG.info("Modified note is added to push list : " + sdate); - } else if (sdate.compareTo(ddate) != 0) { - /* destination contains more up to date note - pull */ - LOG.info("Modified note is added to pull list : " + ddate); - pullIDs.add(snote.getId()); + + if (sdate.compareTo(ddate) != 0) { + if (sdate.after(ddate) || oneWaySync) { + /* source contains more up to date note - push */ + pushIDs.add(snote.getId()); + LOG.info("Modified note is added to push list : " + sdate); + } else { + /* destination contains more up to date note - pull */ + LOG.info("Modified note is added to pull list : " + ddate); + pullIDs.add(snote.getId()); + } } } else { /* note exists in source storage, and absent in destination @@ -266,14 +291,21 @@ private Map> notesCheckDiff(List sourceNotes, for (NoteInfo note : destNotes) { dnote = containsID(sourceNotes, note.getId()); if (dnote == null) { - /* note exists in destination storage, and absent in source - pull*/ - pullIDs.add(note.getId()); + /* note exists in destination storage, and absent in source */ + if (oneWaySync) { + LOG.info("Extraneous note is added to delete dest list : " + note.getId()); + delDstIDs.add(note.getId()); + } else { + LOG.info("Missing note is added to pull list : " + note.getId()); + pullIDs.add(note.getId()); + } } } Map> map = new HashMap>(); map.put(pushKey, pushIDs); map.put(pullKey, pullIDs); + map.put(delDstKey, delDstIDs); return map; } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java index 0c67d79789b..bd13120a4a3 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/repo/NotebookRepoSyncTest.java @@ -84,6 +84,7 @@ public void setUp() throws Exception { System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath()); System.setProperty(ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2"); System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.VFSNotebookRepo,org.apache.zeppelin.notebook.repo.mock.VFSNotebookRepoMock"); + System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC.getVarName(), "false"); LOG.info("main Note dir : " + mainNotePath); LOG.info("secondary note dir : " + secNotePath); conf = ZeppelinConfiguration.create(); @@ -220,6 +221,54 @@ public void testSyncOnReloadedList() throws IOException { assertEquals(1, notebookRepoSync.list(1, null).size()); } + @Test + public void testOneWaySyncOnReloadedList() throws IOException, SchedulerException { + System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), mainNotebookDir.getAbsolutePath()); + System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC.getVarName(), "true"); + conf = ZeppelinConfiguration.create(); + notebookRepoSync = new NotebookRepoSync(conf); + notebookSync = new Notebook(conf, notebookRepoSync, schedulerFactory, factory, this, search, + notebookAuthorization, credentials); + + // check that both storage repos are empty + assertTrue(notebookRepoSync.getRepoCount() > 1); + assertEquals(0, notebookRepoSync.list(0, null).size()); + assertEquals(0, notebookRepoSync.list(1, null).size()); + + File srcDir = new File("src/test/resources/2A94M5J1Z"); + File destDir = new File(secNotebookDir + "/2A94M5J1Z"); + + // copy manually new notebook into secondary storage repo and check repos + try { + FileUtils.copyDirectory(srcDir, destDir); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + assertEquals(0, notebookRepoSync.list(0, null).size()); + assertEquals(1, notebookRepoSync.list(1, null).size()); + + // after reloading the notebook should be wiped from secondary storage + notebookSync.reloadAllNotes(null); + assertEquals(0, notebookRepoSync.list(0, null).size()); + assertEquals(0, notebookRepoSync.list(1, null).size()); + + destDir = new File(mainNotebookDir + "/2A94M5J1Z"); + + // copy manually new notebook into primary storage repo and check repos + try { + FileUtils.copyDirectory(srcDir, destDir); + } catch (IOException e) { + LOG.error(e.toString(), e); + } + assertEquals(1, notebookRepoSync.list(0, null).size()); + assertEquals(0, notebookRepoSync.list(1, null).size()); + + // after reloading notebooks repos should be synchronized + notebookSync.reloadAllNotes(null); + assertEquals(1, notebookRepoSync.list(0, null).size()); + assertEquals(1, notebookRepoSync.list(1, null).size()); + } + @Test public void testCheckpointOneStorage() throws IOException, SchedulerException { System.setProperty(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE.getVarName(), "org.apache.zeppelin.notebook.repo.GitNotebookRepo"); From 75a067078a0bfe543fc11a65ebafe1c21a345a3c Mon Sep 17 00:00:00 2001 From: Hao Xia Date: Fri, 5 Aug 2016 09:57:36 -0700 Subject: [PATCH 2/2] Address PR comments --- .../org/apache/zeppelin/notebook/repo/NotebookRepoSync.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java index 77d05cd1de3..3f41f33e5b5 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java @@ -271,7 +271,8 @@ private Map> notesCheckDiff(List sourceNotes, if (sdate.compareTo(ddate) != 0) { if (sdate.after(ddate) || oneWaySync) { - /* source contains more up to date note - push */ + /* if source contains more up to date note - push + * if oneWaySync is enabled, always push no matter who's newer */ pushIDs.add(snote.getId()); LOG.info("Modified note is added to push list : " + sdate); } else { @@ -293,9 +294,11 @@ private Map> notesCheckDiff(List sourceNotes, if (dnote == null) { /* note exists in destination storage, and absent in source */ if (oneWaySync) { + /* if oneWaySync is enabled, delete the note from destination */ LOG.info("Extraneous note is added to delete dest list : " + note.getId()); delDstIDs.add(note.getId()); } else { + /* if oneWaySync is disabled, pull the note from destination */ LOG.info("Missing note is added to pull list : " + note.getId()); pullIDs.add(note.getId()); }