From 14fef45bae55d7b81e9d82aa1a89a80453357a32 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Fri, 23 Feb 2018 13:20:44 +0800 Subject: [PATCH 1/2] ZEPPELIN-3255. Can not run spark1 and spark2 in one zeppelin instance --- spark/interpreter/figure/null-1.png | Bin 13599 -> 0 bytes spark/interpreter/pom.xml | 12 + .../zeppelin/spark/NewSparkInterpreter.java | 67 +---- .../zeppelin/spark/OldSparkInterpreter.java | 235 +----------------- .../spark/OldSparkInterpreterTest.java | 10 +- spark/pom.xml | 41 +-- spark/spark-shims/pom.xml | 70 ++++++ .../org/apache/zeppelin/spark/SparkShims.java | 110 ++++++++ spark/spark1-shims/pom.xml | 89 +++++++ .../apache/zeppelin/spark/Spark1Shims.java | 57 +++++ spark/spark2-shims/pom.xml | 88 +++++++ .../apache/zeppelin/spark/Spark2Shims.java | 36 +++ 12 files changed, 488 insertions(+), 327 deletions(-) delete mode 100644 spark/interpreter/figure/null-1.png create mode 100644 spark/spark-shims/pom.xml create mode 100644 spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java create mode 100644 spark/spark1-shims/pom.xml create mode 100644 spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java create mode 100644 spark/spark2-shims/pom.xml create mode 100644 spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java diff --git a/spark/interpreter/figure/null-1.png b/spark/interpreter/figure/null-1.png deleted file mode 100644 index 8b1ce07ea9e7d0f24bae214f3bda98a7787ee662..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 13599 zcmeHuc{r5s+kUnvE&A9}7^RZLR1#T+7F4z(DcclD$i6dUNTsYvS;7noNl3PAGnC;| zLX09igE1Im7|fXMea!TEx8wce{p0=p@%d~4XmFL?=($hf z&|g1>*{G5lBRNS0UyxJ9^c&H~Nqmci*6l%jVe82t{aIW2Ri~Mn;|49@u#e-;eV_dy z5eaU0CXY_w9t&Apxj#-5OZtYT8Es=6fBzGjW9B~hc4jKiDKl_5r`(5pi;ePZzWZ?5 z^K)TW1`6_ke9NApqH&Ngex=3a?K6}SQ)I%!6J9MlHkg{3yi0%c)1i@8KRsvv{Wmn4{;{+2$n6-&eW?$7n2GPCa4wYMhxX3m`VmJp`SlOUDE##+ zy36sz#noir6MXwaV|C3|Q)EAg-zKm3U%F@LsH^#t)ex_4S&(Tse7dqe=O#z}h^f6* z-@Btbsvjw~D>H-~8kw!oa(m)Y%`4RtGTC)DopdODj(K8Lh3Wo!3^8d#sqt&G{Z{|M z^z;0^>;A#kdptMer53w0uurgZ%f6IJo1I1xsb4Y(WmE^<8{TR=jNYXQt-I5&$(Sdc ze@Bg`##A0{RDvI7wKjIZzrs7!x=PP$goGNUE_PgDCeh| zv8}DI5`CU4%U$y}(?XvSk3VqkcKq{GSJyt;Ea&cjVKHUtCT68$C0(22R^+ZhMDhut`sf)1I z6CLYan_tDOp9qOy^%93#{NE8&BdauZvN5CNfS(WpPVpw!1O#6W3TA!zCfj?{rehr! z&!4#(!bck3{$9=nuD?h?j~-ZiB$gsptYIkQxl84++?D71kK8XAkd7i&MuS!H{o2~m z+pFXXSaJP5&np$*;JQ1tvWuKzuG*S ze2B58C?`IpLx(ICQlk(jd>pOvG_v{GJ>zQX(2uNhs*7N)R|bjE~MUzAk`CPHcdK)Q00nN-aG3E3c_?+&wzQOe13kk zrFjB(rJ{mFL})?r(T==zx+EkB%g@@Q$X!E{nAjp|w{RnGMPC5xcImW+5qG%;_9=`F z9^VmmhC4bUDYp2+&pm!)K}9Oh88p>6%Tn)iM^}$1jHyiZ?%uc}b~h$6(z{J-Mu5-p z=H})~Cv2=lwI}k?qerm|+jrK4|Jc#h|9AbdRfikKuTZAN!`khnh+DUE2Og<-l?*L7 z%iUd;KvK$PN$m2V-2<#6v;)SoO;G|^^+SgaZEaa%r`#*}8Dn8?UZ6(xhrx8s%*-|b zYzs%;EAMY|7I=w7Mtra+S)r5*{8iwvn*l_I?ymIoYc)sO%;ERe{QdoZXuQnLUutr` zbLY^xxgQ!bWfsoP&M(SdC5JX1A~&yD9#7&TCcxgHDHygX5XBu){}Nkly!wp?NI7Y- zMLCecuo4%ZjR2^@lFK|yw*kOy6TZB*Wdp|sU;s=eSX~~ne1QRluNTf6nHV(vnd;91 zDt!MxITjx_GX-|)K9z?pHY)~V0G9r`exa_Irm3M}{Y^h)@n#)9QO9y96}K>4k4u!( z>0vTqr&U&722Z9%@Un`a)Rx<#G{!X*Ti^Ag4x+CzFVyG%xkNW+b zAr0z_l_W0Ilg%?Vz0E1Azo#najBo=xM$qpW^&VFUby6N zMn=ZPJ}3enHZBScd5tAUtko0eE4s|3p)g-@t)!Yf~89fS1E|MMp=w-Mp!g5FanGHpm?P{{8aSvTBR&E^%|M zeR2@k#~%&}zDaIQPI)V=_!X*5SBOzDLneE!M|%&8^mVoOMMUY1iURdPxU?h6Dg13% zJt{jp!J6|?*`x_%Gu164?HF>aZtT8jYy!D;ueYh@D9k@r`lFU#lJbZ*XzyN?2sKV- zW~QB=qwExB&VhVH9_@>7bt3Frh)+tA_Dmx$r;(~CLD*PHVzRNkn{%JkigM7KJk;?+ zo=7MBla<->YqV;r-nz`3_xxfteD*8wSn)-ogH|@C&V>+1<0Q83)xCXvujCif{ISjAk|IlFLF<`*kmI}C<>!1BY61tGUU3^JewXkbE zTo5*Bh8OyxZAzeTVcLOS>q`< zy&Y;^e68fjNL*6mr}W@K&l<^F@O5ILbm6r!;Ium>9si`G9YM@bD{v-Mg^H~taF57I z)nkh@NQ)Bhx2^KmY6~6In10hQJX<_N1wn-d6Nl9xIen|j=D3AX42CppI`^|g1p*ou zLBGt6RissU^hu$P7ZqS(B>^sS2@$Qm4@pHT1-=D=B_$<$hS1-CeV0*1 zMTO_IX}Q0&Ge}1fAdoRJR_V6J#e4JnQ=Ob_v)$$&$m@n2>*(kJW_$$9=r3Jn(cSm9 zuiJ54{+oHX>-AqJFLKfLQc7&mEGcrGTev7hM@DuYliPgN z?5_<}Mil@!#3|-LxHZz>3JPQDIquJXhZKdeOnJT)9x68jsN}Ow;T1)00LP<>yy1cT z1W?j#d-IY9Ycl}j1;z$E_V@IRL6fV&j|04F%+{buQ;&aEoIC9{etw9++e5s2;{f;* zcU|UD(Az*6D05lk&nx}^xJ&<;zzswGgRxnah1Lg?&2@t2FDx%FPkpv53pI`)3jf2e z45}kBWHh5^Dh-MRk}fjurXWkw#y)ug%~+@mU7kLvr|0m1Q@-qlVXamIdPgOocT~Ex zr%u#1POI8(4d!N)uTrx_~do?+B)n1GjWrPiUCDvc@9>SK86X&&;oB|_4>t^!eVo@=V9&JQ1 zF3Dtgvb_OO2aBn;xU~oGM_U}f7Oc^#$1bt2{jkE4m6febyry+&D(LFSwf=x9vp|>n zq`;Go9)sv%V}0k?=(bZ9l!52Y_4@0i`B!O(FSqHvg^(8`jFGj1zJ_THq5z0yu>KHA zdpPko!vo5&)h0JACwdWl5s43vP+qA6H754HybgKh(e(6mY@6QC1`35j9q2<>(^ddx z68NfxPgPIWrG(*YP=xCl-y9?sLDOpEQ7>pu*q z=>&R?F5p*`*KJp{ZPhsCikRC?ZMz27&})r`UGyXlEs2^%&~@!@Q1Vmar;UV@Rn3>l zL$&JJfj@5;UYFGYCl26eAc;%Cq=6TwyoRpJegWx#-CM8N&IdfV%G&iW8OOZU!yhIJ0o~+A-3n{$Lk5Rp=y8ZBJX6no8X2 zT^&2xb-3-!h0MYbxNW(U;nW~J-m1*)OC$#iWn!r*`t%f{3Y30ny^F)_3K)MVE#;Zz z_f@%Zz}9bM4$4^%AXW7h40_dC{c3~$5R$Z9REGt~V2yQkc~+>) zUSB);<~H-QvRJGzbpV3B@r=b-t7*(sh9GvK9{LDEJ1%3pr-51`t}Fd2&Ix^um)mAP zt@cXdXa*}Ijo~SZH23N+T8r2NTY?XWqPA)X;hdX zgUlKs=Yu0CaU_R5dOQh4aBCFOcoD>2jENKd)}l0$gdVyF6Sp_Ta2Obrxelpv!jXR3ylVM@Y(Rv>?%4|Xf^UVZGbL02RgBcz5lR8ymeQn1pX(EI;{ ziB-{~ba@yotzQ3$2S;IcA2(xaUCSZtg3)uNF*Gc8t%`%wXIX`p8MLqc&V{a!<&3sYKVC)^yPMqfj1Oc}61O&{IEnvIMX6cD zta@Nlua!9t=NW#N{QZE}qYjQnbEY*h86(sJP03MV*|g%u7OyK}wE~OvjG(6>YTqm+1~(`=1pRU$rrLQat3^~a^dQZCR{?)rUlM_o-nNNI zvHumNWsSM@`O#9T9dFv4hZ`XhmcH-`#JU?cvW!eq1A^K_3KhUbU)-y8O$@z*C<>*W z6_vi2_Iu|KT66=D=Y+)Ks8bS!ge2s|+V2CF0|)NL1R{+kQ~meG3!Ar*erA zH90bp{HnR~FpmsLh%M$n;JwUuV}QvdZN}yCr1M9p#4s z|6)U|==k=nGNRk)zY+HD?l?eQ|&p|1s0Txhb@jZ1Ad8(!z&rv*{_`r4G=?U zv7>SL!|-D|hNxRExIbk055MQemw%}$?2X3Qf0bL0GMb}>adN>!$rIICLFAF_p!xpf zX#%1BMp0jbB8Do!jhYJYFXU85qL4^)8;3ya^;sK?Ei-&3H>M|gCnUY}<3w!0+Hi1T zN2X?B$Tw2WD6-mX_+_;>@Okf-Wk7XbgQ3azj(=`+Go-sQJ{ZjyCKpt6L3A8o6fCht z+~VkwVV}?2iO)>bE)uf*&26t=KYW%|km+^F#xd08WmeYFvz+l5kojP>r;gXi?2lrr z`2+Uhq$)!@Fq~+D+aO;Pgz@q5lgExdHP;${5N~qT47U+seBM{H-W3iJb8~Y~9zT91 z9%tqC`jJvA|tfW_eoBJ9BN-ZA-0g9<=fF9#Y zB0W}8C5(+$Reyy7te$&e?@n(Dj32vK??%7hyBg8D8{n#dq}Nh;^UXY_e;U8(U5C}_RfiCGY+nzc zAuO#-_8)(%ItFf7{1nhEB>lR(yE_1%pjoiNiuzNOZL)T!12(fEOY}khG!{AKlI9RJ zdAj_wm{mhl8$XDJW+w|*&Bv5d)mP&|p_9ocb#w+#G-HRAlj0{x$Ws>O{(B03cF!6W z!IGVAvx#3{sKk-bvj%TVWAyJS`yDI*G;>hrT_T6FE|GC9N*XJ+myt=9sMI3Ubq13p)3c_kgCt)+E6o&{KPY^U}& zVk}t@$im9Xs{L+0vM`7nqi)xic{N_I^>fh%Fh5tr079=*E~;~o)zZ@9E1(Y>$^tBP z3S&p%S`zn(fBaA*u@ z()x#hWuCI)RsEHYIpTZ>2C1g7#IB}*G+0xpYFAw;Z!P?f=Kk3%c#--Dw1i;@rp zSpICS`}%e5yC$a`ZDp~XpWdvW${03j zPix$h(dRMSvI{0#rN7uDV95EcANG6P_}wET$hOE4H(pG040Eg7AQTqgQ9$Fc^6Gul z->KVKG+_Uj#p$W%)ZCr$!&qY4($#Oy;Af*;g@6xu9Yg4j6d&-xA)XzRFQhHGXabHM zg);Y^hs$Akc zqX<|v#?fw9xXOuvGT;(RSIerA16Zq!QBFj;2*uN%(nwK4hB(bhE3>OHq&|xsEgw4( zA+1$KgSyzj8N7*-6BGC?o&2jpNpJ2(m%G1a96HcN4h|6cSa8mU`E_YHcogo&T3;!_ zH1%`cuMN3Y=S0L}H0U*mo(wATdMfYf1~~ZG8n)cvU8oSO)(6DsZ?k;BBC{9BsTD(N zR%5Oa)2=mBqs=QB6S0hvhpIbiF6-$Xya8(|BIB{~)y;Tu~vqoE2_7?FtsoB0{trl-1y`i7kvS)!GL80CJcgDzWJ zl&91?wG{P!_9d%T&l%k-$C&W41MJwx@(T9KML6wJ7xfl5mPw_mu|&}N_n27y zxg{h8@29sQ0IV%r4bEOhSx{~yV`y=h3uSPU2WaiI{QJtLR8_kkzy$5lXXEvy5JBq= z1U+w}5{)GO*GKmH98MEWNua(3M%4Eu%@fcD z=f)(fcf6ZUN=G#wQJ+(blVB^b04($0Ma=s~s7^BY|mvGIRO<$h{ z1j{>s1MJhHdUoV|P6EI0Yq~39)s;9~T&7mq&94_pd=g|#+rbxj%OAi7z^2tv7ky<`(kJTn1KlLHj?orlzu>`-_mX{E z4!Yw5SURKhP9=k}eS))+w2gWEy`SB{h3?bMs+qT{`KB#ck9&X6Y>EPet zJE*I@&TIWe=0!##VttUm8-b4kjlloIwWbv5$0lD!V9OmA>?g=GkxeOp7jE(Lo4AM= z06hdCP-g($TuUP3@Gq+RgM_CIJhzW6NJvtpY z=K-@tg$1tqj9=LNTb~c8HPT1c(3&Ms41_B`L0kmqB7{fx5lPW$NI>Uw;Kzvuxc|`lKv|sVW zjZ{oG`s)zD?)!}sHjh~yRqTCD-$qG4D|TT zyZi9(wnmw6iBC*atGl+^+k4gbOOHos?V(1Jr?(0^oMi)eHdeAFZAFaquik#~D8+gF z`r>hL;&gjk3mHg8I}l`ufQA@(6yeQ{-b6TrQXcTQq! zbd46<7<6)IS9mO;m9Y*O2}@f(O2Hv^s^T-EQ6WjJnX>9E6bd{NoK)uO=pIYJYBnY$ zB&dd6hvDsDzaD|3ZCfUGM12KTed_fdUK>P*DDA1kvU83w?@0RrzQXCE%{=C{3?y#) j@$Uk>TwUA5;cq;ZUy9+JzytsIwCSRu`T5ebH}C%&jmAWl diff --git a/spark/interpreter/pom.xml b/spark/interpreter/pom.xml index e8d57a23f1e..c89cfa6ecbe 100644 --- a/spark/interpreter/pom.xml +++ b/spark/interpreter/pom.xml @@ -81,6 +81,18 @@ ${project.version} + + org.apache.zeppelin + spark1-shims + ${project.version} + + + + org.apache.zeppelin + spark2-shims + ${project.version} + + org.apache.zeppelin zeppelin-python diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java index 1d3ccd65fde..c8efa7a7d9f 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/NewSparkInterpreter.java @@ -69,6 +69,7 @@ public class NewSparkInterpreter extends AbstractSparkInterpreter { private SparkVersion sparkVersion; private boolean enableSupportedVersionCheck; private String sparkUrl; + private SparkShims sparkShims; private static InterpreterHookRegistry hooks; @@ -117,7 +118,8 @@ public void open() throws InterpreterException { sqlContext = this.innerInterpreter.sqlContext(); sparkSession = this.innerInterpreter.sparkSession(); sparkUrl = this.innerInterpreter.sparkUrl(); - setupListeners(); + sparkShims = SparkShims.getInstance(sc.version()); + sparkShims.setupSparkListener(sparkUrl); hooks = getInterpreterGroup().getInterpreterHookRegistry(); z = new SparkZeppelinContext(sc, hooks, @@ -125,7 +127,7 @@ public void open() throws InterpreterException { this.innerInterpreter.bind("z", z.getClass().getCanonicalName(), z, Lists.newArrayList("@transient")); } catch (Exception e) { - LOGGER.error(ExceptionUtils.getStackTrace(e)); + LOGGER.error("Fail to open SparkInterpreter", ExceptionUtils.getStackTrace(e)); throw new InterpreterException("Fail to open SparkInterpreter", e); } } @@ -213,67 +215,6 @@ public int getProgress(InterpreterContext context) { return innerInterpreter.getProgress(Utils.buildJobGroupId(context), context); } - private void setupListeners() { - JobProgressListener pl = new JobProgressListener(sc.getConf()) { - @Override - public synchronized void onJobStart(SparkListenerJobStart jobStart) { - super.onJobStart(jobStart); - int jobId = jobStart.jobId(); - String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id"); - String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled"); - String jobUrl = getJobUrl(jobId); - String noteId = Utils.getNoteId(jobGroupId); - String paragraphId = Utils.getParagraphId(jobGroupId); - // Button visible if Spark UI property not set, set as invalid boolean or true - java.lang.Boolean showSparkUI = - uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false"); - if (showSparkUI && jobUrl != null) { - RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient(); - Map infos = new java.util.HashMap<>(); - infos.put("jobUrl", jobUrl); - infos.put("label", "SPARK JOB"); - infos.put("tooltip", "View in Spark web UI"); - if (eventClient != null) { - eventClient.onParaInfosReceived(noteId, paragraphId, infos); - } - } - } - - private String getJobUrl(int jobId) { - String jobUrl = null; - if (sparkUrl != null) { - jobUrl = sparkUrl + "/jobs/job?id=" + jobId; - } - return jobUrl; - } - }; - try { - Object listenerBus = sc.getClass().getMethod("listenerBus").invoke(sc); - Method[] methods = listenerBus.getClass().getMethods(); - Method addListenerMethod = null; - for (Method m : methods) { - if (!m.getName().equals("addListener")) { - continue; - } - Class[] parameterTypes = m.getParameterTypes(); - if (parameterTypes.length != 1) { - continue; - } - if (!parameterTypes[0].isAssignableFrom(JobProgressListener.class)) { - continue; - } - addListenerMethod = m; - break; - } - if (addListenerMethod != null) { - addListenerMethod.invoke(listenerBus, pl); - } - } catch (NoSuchMethodException | SecurityException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException e) { - LOGGER.error(e.toString(), e); - } - } - public SparkZeppelinContext getZeppelinContext() { return this.z; } diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java index ff3a2caa565..1f59d18d339 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/OldSparkInterpreter.java @@ -151,6 +151,8 @@ public class OldSparkInterpreter extends AbstractSparkInterpreter { private JavaSparkContext jsc; private boolean enableSupportedVersionCheck; + private SparkShims sparkShims; + public OldSparkInterpreter(Properties property) { super(property); out = new InterpreterOutputStream(logger); @@ -158,10 +160,10 @@ public OldSparkInterpreter(Properties property) { public OldSparkInterpreter(Properties property, SparkContext sc) { this(property); - this.sc = sc; env = SparkEnv.get(); - sparkListener = setupListeners(this.sc); + sparkShims = SparkShims.getInstance(sc.version()); + sparkShims.setupSparkListener(sparkUrl); } public SparkContext getSparkContext() { @@ -169,7 +171,6 @@ public SparkContext getSparkContext() { if (sc == null) { sc = createSparkContext(); env = SparkEnv.get(); - sparkListener = setupListeners(sc); } return sc; } @@ -190,157 +191,6 @@ public boolean isSparkContextInitialized() { } } - static SparkListener setupListeners(SparkContext context) { - SparkListener pl = new SparkListener() { - @Override - public synchronized void onJobStart(SparkListenerJobStart jobStart) { - int jobId = jobStart.jobId(); - String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id"); - String uiEnabled = jobStart.properties().getProperty("spark.ui.enabled"); - String jobUrl = getJobUrl(jobId); - String noteId = Utils.getNoteId(jobGroupId); - String paragraphId = Utils.getParagraphId(jobGroupId); - // Button visible if Spark UI property not set, set as invalid boolean or true - java.lang.Boolean showSparkUI = - uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false"); - if (showSparkUI && jobUrl != null) { - RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient(); - Map infos = new java.util.HashMap<>(); - infos.put("jobUrl", jobUrl); - infos.put("label", "SPARK JOB"); - infos.put("tooltip", "View in Spark web UI"); - if (eventClient != null) { - eventClient.onParaInfosReceived(noteId, paragraphId, infos); - } - } - } - - private String getJobUrl(int jobId) { - String jobUrl = null; - if (sparkUrl != null) { - jobUrl = sparkUrl + "/jobs/job/?id=" + jobId; - } - return jobUrl; - } - - @Override - public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { - - } - - @Override - public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { - - } - - @Override - public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { - - } - - @Override - public void onExecutorMetricsUpdate( - SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { - - } - - @Override - public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { - - } - - @Override - public void onApplicationStart(SparkListenerApplicationStart applicationStart) { - - } - - @Override - public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { - - } - - @Override - public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { - - } - - @Override - public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { - - } - - @Override - public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { - - } - - @Override - public void onJobEnd(SparkListenerJobEnd jobEnd) { - - } - - @Override - public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { - - } - - @Override - public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { - - } - - @Override - public void onTaskEnd(SparkListenerTaskEnd taskEnd) { - - } - - @Override - public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { - - } - - @Override - public void onTaskStart(SparkListenerTaskStart taskStart) { - - } - }; - try { - Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context); - - Method[] methods = listenerBus.getClass().getMethods(); - Method addListenerMethod = null; - for (Method m : methods) { - if (!m.getName().equals("addListener")) { - continue; - } - - Class[] parameterTypes = m.getParameterTypes(); - - if (parameterTypes.length != 1) { - continue; - } - - if (!parameterTypes[0].isAssignableFrom(SparkListener.class)) { - continue; - } - - addListenerMethod = m; - break; - } - - if (addListenerMethod != null) { - addListenerMethod.invoke(listenerBus, pl); - } else { - return null; - } - } catch (NoSuchMethodException | SecurityException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException e) { - logger.error(e.toString(), e); - return null; - } - return pl; - } - private boolean useHiveContext() { return java.lang.Boolean.parseBoolean(getProperty("zeppelin.spark.useHiveContext")); } @@ -1020,6 +870,10 @@ public void open() throws InterpreterException { } } + sparkUrl = getSparkUIUrl(); + sparkShims = SparkShims.getInstance(sc.version()); + sparkShims.setupSparkListener(sparkUrl); + numReferenceOfSparkContext.incrementAndGet(); } @@ -1373,75 +1227,6 @@ public int getProgress(InterpreterContext context) { return JobProgressUtil.progress(sc, jobGroup); } - private int[] getProgressFromStage_1_0x(SparkListener sparkListener, Object stage) - throws IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException, SecurityException { - int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage); - int completedTasks = 0; - - int id = (int) stage.getClass().getMethod("id").invoke(stage); - - Object completedTaskInfo = null; - - completedTaskInfo = JavaConversions.mapAsJavaMap( - (HashMap) sparkListener.getClass() - .getMethod("stageIdToTasksComplete").invoke(sparkListener)).get(id); - - if (completedTaskInfo != null) { - completedTasks += (int) completedTaskInfo; - } - List parents = JavaConversions.seqAsJavaList((Seq) stage.getClass() - .getMethod("parents").invoke(stage)); - if (parents != null) { - for (Object s : parents) { - int[] p = getProgressFromStage_1_0x(sparkListener, s); - numTasks += p[0]; - completedTasks += p[1]; - } - } - - return new int[] {numTasks, completedTasks}; - } - - private int[] getProgressFromStage_1_1x(SparkListener sparkListener, Object stage) - throws IllegalAccessException, IllegalArgumentException, - InvocationTargetException, NoSuchMethodException, SecurityException { - int numTasks = (int) stage.getClass().getMethod("numTasks").invoke(stage); - int completedTasks = 0; - int id = (int) stage.getClass().getMethod("id").invoke(stage); - - try { - Method stageIdToData = sparkListener.getClass().getMethod("stageIdToData"); - HashMap, Object> stageIdData = - (HashMap, Object>) stageIdToData.invoke(sparkListener); - Class stageUIDataClass = - this.getClass().forName("org.apache.spark.ui.jobs.UIData$StageUIData"); - - Method numCompletedTasks = stageUIDataClass.getMethod("numCompleteTasks"); - Set> keys = - JavaConverters.setAsJavaSetConverter(stageIdData.keySet()).asJava(); - for (Tuple2 k : keys) { - if (id == (int) k._1()) { - Object uiData = stageIdData.get(k).get(); - completedTasks += (int) numCompletedTasks.invoke(uiData); - } - } - } catch (Exception e) { - logger.error("Error on getting progress information", e); - } - - List parents = JavaConversions.seqAsJavaList((Seq) stage.getClass() - .getMethod("parents").invoke(stage)); - if (parents != null) { - for (Object s : parents) { - int[] p = getProgressFromStage_1_1x(sparkListener, s); - numTasks += p[0]; - completedTasks += p[1]; - } - } - return new int[] {numTasks, completedTasks}; - } - private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r) { if (r instanceof scala.tools.nsc.interpreter.Results.Success$) { return Code.SUCCESS; @@ -1479,10 +1264,6 @@ public FormType getFormType() { return FormType.NATIVE; } - public SparkListener getJobProgressListener() { - return sparkListener; - } - @Override public Scheduler getScheduler() { return SchedulerFactory.singleton().createOrGetFIFOScheduler( diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java index 14214a284f2..068ff50c3d8 100644 --- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java +++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/OldSparkInterpreterTest.java @@ -192,13 +192,7 @@ public void testNextLineCompanionObject() throws InterpreterException { public void testEndWithComment() throws InterpreterException { assertEquals(InterpreterResult.Code.SUCCESS, repl.interpret("val c=1\n//comment", context).code()); } - - @Test - public void testListener() { - SparkContext sc = repl.getSparkContext(); - assertNotNull(OldSparkInterpreter.setupListeners(sc)); - } - + @Test public void testCreateDataFrame() throws InterpreterException { if (getSparkVersionNumber(repl) >= 13) { @@ -362,7 +356,7 @@ public void testParagraphUrls() throws InterpreterException { } String sparkUIUrl = repl.getSparkUIUrl(); assertNotNull(jobUrl); - assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job/?id=")); + assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job?id=")); } } diff --git a/spark/pom.xml b/spark/pom.xml index 7a0c7c2ac66..def865e5c55 100644 --- a/spark/pom.xml +++ b/spark/pom.xml @@ -63,21 +63,26 @@ scala-2.10 scala-2.11 spark-dependencies + spark-shims + spark1-shims + spark2-shims - org.apache.zeppelin - zeppelin-interpreter - ${project.version} + org.slf4j + slf4j-api - org.apache.zeppelin - zeppelin-display - ${project.version} - test + org.slf4j + slf4j-log4j12 + + + + log4j + log4j @@ -92,28 +97,6 @@ junit test - - - org.datanucleus - datanucleus-core - ${datanucleus.core.version} - test - - - - org.datanucleus - datanucleus-api-jdo - ${datanucleus.apijdo.version} - test - - - - org.datanucleus - datanucleus-rdbms - ${datanucleus.rdbms.version} - test - - diff --git a/spark/spark-shims/pom.xml b/spark/spark-shims/pom.xml new file mode 100644 index 00000000000..619c7a42a86 --- /dev/null +++ b/spark/spark-shims/pom.xml @@ -0,0 +1,70 @@ + + + + + + + spark-parent + org.apache.zeppelin + 0.9.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + org.apache.zeppelin + spark-shims + 0.9.0-SNAPSHOT + jar + Zeppelin: Spark Shims + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + + + + maven-dependency-plugin + + true + + + + + maven-resources-plugin + + + copy-interpreter-setting + none + + true + + + + + + + + \ No newline at end of file diff --git a/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java new file mode 100644 index 00000000000..acf717c5ae3 --- /dev/null +++ b/spark/spark-shims/src/main/scala/org/apache/zeppelin/spark/SparkShims.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.spark; + + +import org.apache.zeppelin.interpreter.BaseZeppelinContext; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.util.Map; +import java.util.Properties; + +/** + * This is abstract class for anything that is api incompatible between spark1 and spark2. + * It will load the correct version of SparkShims based on the version of Spark. + */ +public abstract class SparkShims { + + private static final Logger LOGGER = LoggerFactory.getLogger(SparkShims.class); + + private static SparkShims sparkShims; + + private static SparkShims loadShims(String sparkVersion) throws ReflectiveOperationException { + Class sparkShimsClass; + if ("2".equals(sparkVersion)) { + LOGGER.info("Initializing shims for Spark 2.x"); + sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark2Shims"); + } else { + LOGGER.info("Initializing shims for Spark 1.x"); + sparkShimsClass = Class.forName("org.apache.zeppelin.spark.Spark1Shims"); + } + + Constructor c = sparkShimsClass.getConstructor(); + return (SparkShims) c.newInstance(); + } + + public static SparkShims getInstance(String sparkVersion) { + if (sparkShims == null) { + String sparkMajorVersion = getSparkMajorVersion(sparkVersion); + try { + sparkShims = loadShims(sparkMajorVersion); + } catch (ReflectiveOperationException e) { + throw new RuntimeException(e); + } + } + return sparkShims; + } + + private static String getSparkMajorVersion(String sparkVersion) { + return sparkVersion.startsWith("2") ? "2" : "1"; + } + + /** + * This is due to SparkListener api change between spark1 and spark2. + * SparkListener is trait in spark1 while it is abstract class in spark2. + */ + public abstract void setupSparkListener(String sparkWebUrl); + + + protected String getNoteId(String jobgroupId) { + int indexOf = jobgroupId.indexOf("-"); + int secondIndex = jobgroupId.indexOf("-", indexOf + 1); + return jobgroupId.substring(indexOf + 1, secondIndex); + } + + protected String getParagraphId(String jobgroupId) { + int indexOf = jobgroupId.indexOf("-"); + int secondIndex = jobgroupId.indexOf("-", indexOf + 1); + return jobgroupId.substring(secondIndex + 1, jobgroupId.length()); + } + + protected void buildSparkJobUrl(String sparkWebUrl, int jobId, Properties jobProperties) { + String jobGroupId = jobProperties.getProperty("spark.jobGroup.id"); + String uiEnabled = jobProperties.getProperty("spark.ui.enabled"); + String jobUrl = sparkWebUrl + "/jobs/job?id=" + jobId; + String noteId = getNoteId(jobGroupId); + String paragraphId = getParagraphId(jobGroupId); + // Button visible if Spark UI property not set, set as invalid boolean or true + boolean showSparkUI = + uiEnabled == null || !uiEnabled.trim().toLowerCase().equals("false"); + if (showSparkUI && jobUrl != null) { + RemoteEventClientWrapper eventClient = BaseZeppelinContext.getEventClient(); + Map infos = new java.util.HashMap(); + infos.put("jobUrl", jobUrl); + infos.put("label", "SPARK JOB"); + infos.put("tooltip", "View in Spark web UI"); + if (eventClient != null) { + eventClient.onParaInfosReceived(noteId, paragraphId, infos); + } + } + } +} diff --git a/spark/spark1-shims/pom.xml b/spark/spark1-shims/pom.xml new file mode 100644 index 00000000000..93640c6ffe0 --- /dev/null +++ b/spark/spark1-shims/pom.xml @@ -0,0 +1,89 @@ + + + + + + + spark-parent + org.apache.zeppelin + 0.9.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + org.apache.zeppelin + spark1-shims + 0.9.0-SNAPSHOT + jar + Zeppelin: Spark1 Shims + + + 2.10 + 1.6.3 + + + + + + org.apache.zeppelin + spark-shims + ${project.version} + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + + + + maven-dependency-plugin + + true + + + + + maven-resources-plugin + + + copy-interpreter-setting + none + + true + + + + + + + + \ No newline at end of file diff --git a/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java new file mode 100644 index 00000000000..9f233136799 --- /dev/null +++ b/spark/spark1-shims/src/main/scala/org/apache/zeppelin/spark/Spark1Shims.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.spark; + +import org.apache.spark.SparkContext; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerApplicationStart; +import org.apache.spark.scheduler.SparkListenerBlockManagerAdded; +import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved; +import org.apache.spark.scheduler.SparkListenerBlockUpdated; +import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorAdded; +import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate; +import org.apache.spark.scheduler.SparkListenerExecutorRemoved; +import org.apache.spark.scheduler.SparkListenerJobEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; +import org.apache.spark.scheduler.SparkListenerStageCompleted; +import org.apache.spark.scheduler.SparkListenerStageSubmitted; +import org.apache.spark.scheduler.SparkListenerTaskEnd; +import org.apache.spark.scheduler.SparkListenerTaskGettingResult; +import org.apache.spark.scheduler.SparkListenerTaskStart; +import org.apache.spark.scheduler.SparkListenerUnpersistRDD; +import org.apache.spark.ui.jobs.JobProgressListener; +import org.apache.zeppelin.interpreter.BaseZeppelinContext; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; + +import java.util.Map; + +public class Spark1Shims extends SparkShims { + + public void setupSparkListener(final String sparkWebUrl) { + SparkContext sc = SparkContext.getOrCreate(); + sc.addSparkListener(new JobProgressListener(sc.getConf()) { + @Override + public void onJobStart(SparkListenerJobStart jobStart) { + buildSparkJobUrl(sparkWebUrl, jobStart.jobId(), jobStart.properties()); + } + }); + } +} diff --git a/spark/spark2-shims/pom.xml b/spark/spark2-shims/pom.xml new file mode 100644 index 00000000000..000e3abd864 --- /dev/null +++ b/spark/spark2-shims/pom.xml @@ -0,0 +1,88 @@ + + + + + + spark-parent + org.apache.zeppelin + 0.9.0-SNAPSHOT + ../pom.xml + + + 4.0.0 + org.apache.zeppelin + spark2-shims + 0.9.0-SNAPSHOT + jar + Zeppelin: Spark2 Shims + + + 2.11 + 2.1.2 + + + + + + org.apache.zeppelin + spark-shims + ${project.version} + + + + org.apache.spark + spark-core_${scala.binary.version} + ${spark.version} + provided + + + + org.apache.zeppelin + zeppelin-interpreter + ${project.version} + provided + + + + + + + maven-dependency-plugin + + true + + + + + maven-resources-plugin + + + copy-interpreter-setting + none + + true + + + + + + + + \ No newline at end of file diff --git a/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java new file mode 100644 index 00000000000..4b3961064c5 --- /dev/null +++ b/spark/spark2-shims/src/main/scala/org/apache/zeppelin/spark/Spark2Shims.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.spark; + +import org.apache.spark.SparkContext; +import org.apache.spark.scheduler.SparkListener; +import org.apache.spark.scheduler.SparkListenerJobStart; + +public class Spark2Shims extends SparkShims { + + public void setupSparkListener(final String sparkWebUrl) { + SparkContext sc = SparkContext.getOrCreate(); + sc.addSparkListener(new SparkListener() { + @Override + public void onJobStart(SparkListenerJobStart jobStart) { + buildSparkJobUrl(sparkWebUrl, jobStart.jobId(), jobStart.properties()); + } + }); + } +} From 68fa4373f6c313294b801752aa5fe022a6f255af Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Sun, 25 Feb 2018 21:24:08 +0800 Subject: [PATCH 2/2] Remove akka from spark-dependencies --- spark/spark-dependencies/pom.xml | 41 +------------------------------- 1 file changed, 1 insertion(+), 40 deletions(-) diff --git a/spark/spark-dependencies/pom.xml b/spark/spark-dependencies/pom.xml index 58977b4db77..4e90a9304b2 100644 --- a/spark/spark-dependencies/pom.xml +++ b/spark/spark-dependencies/pom.xml @@ -294,46 +294,7 @@ hadoop-client ${hadoop.version} - - - - com.google.protobuf - protobuf-java - ${protobuf.version} - - - - ${akka.group} - akka-actor_${scala.binary.version} - ${akka.version} - - - ${akka.group} - akka-remote_${scala.binary.version} - ${akka.version} - - - ${akka.group} - akka-slf4j_${scala.binary.version} - ${akka.version} - - - ${akka.group} - akka-testkit_${scala.binary.version} - ${akka.version} - - - ${akka.group} - akka-zeromq_${scala.binary.version} - ${akka.version} - - - ${akka.group} - akka-actor_${scala.binary.version} - - - - + org.apache.spark