Branch: 0.8

Changes related to zeppelin 0.7 (#12)

* Some optimizations suggested by suyog along with some logs for testing

* Changes required to support zeppelin 0.7

* Adding default property for local repo

* Update version for release
SachinJanani committed Mar 14, 2017
1 parent 2272a77 commit 29562d6
Showing 4 changed files with 45 additions and 25 deletions.
10 changes: 5 additions & 5 deletions build.gradle
@@ -1,5 +1,5 @@
 group 'io.snappydata'
-version '0.6.1'
+version '0.7.0'
 
 apply plugin: 'java'
 apply plugin: 'wrapper'
@@ -26,10 +26,10 @@ ext {
   scalaBinaryVersion = '2.11'
   scalaVersion = scalaBinaryVersion + '.8'
   buildDir = 'build-artifacts'
-  snappySparkVersion = '2.0.1-3'
-  snappyDataVersion = '0.6.1'
-  gemfireVersion = "1.5.2"
-  zeppelinVersion = '0.6.1'
+  snappySparkVersion = '2.0.3-2'
+  snappyDataVersion = '0.8'
+  gemfireVersion = "1.5.4"
+  zeppelinVersion = '0.7.0'
 }
 
 configurations {
44 changes: 28 additions & 16 deletions SnappyDataSqlZeppelinInterpreter.java
@@ -74,8 +74,10 @@ public class SnappyDataSqlZeppelinInterpreter extends Interpreter {
   private static final String EMPTY_STRING = "";
   private static final String SEMI_COLON = ";";
   SparkContext sc = null;
-  LoadingCache<String, SnappyContext> paragraphContextCache = null;
+  // LoadingCache<String, SnappyContext> paragraphContextCache = null;
 
+  BlockingQueue<SnappyContext> connectionQueue = new ArrayBlockingQueue<SnappyContext>(50);
+  Map<String,SnappyContext> paragraphConnectionMap = new HashMap<String,SnappyContext>();
 
   public SnappyDataSqlZeppelinInterpreter(Properties property) {
     super(property);
@@ -102,17 +104,14 @@ public void open() {
       }
     }
 
-    paragraphContextCache = CacheBuilder.newBuilder()
-        .maximumSize(50)
-        .expireAfterAccess(10, TimeUnit.MINUTES)
-        .build(
-            new CacheLoader<String, SnappyContext>() {
-              @Override
-              public SnappyContext load(String paragraphId) throws Exception {
-                return new SnappyContext(sc);
-              }
-            }
-        );
+    // long end,start;
+    for(int i=0;i<50;i++) {
+      //start = System.currentTimeMillis();
+      SnappyContext snc = new SnappyContext(sc);
+      //end = System.currentTimeMillis();
+      snc.tables().collect();
+      connectionQueue.add(snc);
+    }
   }
 
   private String getJobGroup(InterpreterContext context) {
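The loop above replaces the Guava LoadingCache with eager pre-warming: fifty SnappyContext instances are built at open() time and parked in connectionQueue so the query path never pays construction cost. A minimal standalone sketch of that pattern (ContextPoolSketch, POOL_SIZE, and warmPool are illustrative names, not part of the commit):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.SnappyContext;

class ContextPoolSketch {
  static final int POOL_SIZE = 50; // matches the queue capacity used in the diff

  static BlockingQueue<SnappyContext> warmPool(SparkContext sc) {
    BlockingQueue<SnappyContext> pool = new ArrayBlockingQueue<SnappyContext>(POOL_SIZE);
    for (int i = 0; i < POOL_SIZE; i++) {
      SnappyContext snc = new SnappyContext(sc);
      // Touch the catalog once so lazy initialization is paid here, not on the first query.
      snc.tables().collect();
      pool.add(snc);
    }
    return pool;
  }
}

The trade-off is POOL_SIZE catalog round trips at startup in exchange for predictable first-query latency in each paragraph.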
@@ -167,14 +166,24 @@ public InterpreterResult interpret(String cmd, InterpreterContext contextInterpreter) {
     String id = contextInterpreter.getParagraphId();
     SnappyContext snc = null;
     try {
-      snc = paragraphContextCache.get(id);
+      //snc = paragraphContextCache.get(id);
+      if (paragraphConnectionMap.containsKey(id)) {
+        snc = paragraphConnectionMap.get(id);
+      } else {
+        if (connectionQueue.isEmpty()) {
+          snc = new SnappyContext(sc);
+        } else {
+          snc = connectionQueue.take();
+        }
+        paragraphConnectionMap.put(id, snc);
+      }
       if (null != getProperty(Constants.SPARK_SQL_SHUFFLE_PARTITIONS)) {
         snc.setConf(Constants.SPARK_SQL_SHUFFLE_PARTITIONS,
             getProperty(Constants.SPARK_SQL_SHUFFLE_PARTITIONS));
       }
-    } catch (ExecutionException e) {
+    } catch (InterruptedException interruptedException) {
       logger.error("Error initializing SnappyContext");
-      e.printStackTrace();
+      interruptedException.printStackTrace();
     }
 
     cmd = cmd.trim();
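The lookup above pins each paragraph to one SnappyContext: the paragraph map is consulted first, then a pre-warmed context is taken from the pool, and a fresh context is built inline only when the pool is drained. A condensed sketch of that get-or-create flow (ParagraphContextLookupSketch and lookupContext are illustrative names; the fields mirror the diff):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.spark.SparkContext;
import org.apache.spark.sql.SnappyContext;

class ParagraphContextLookupSketch {
  private final SparkContext sc;
  private final BlockingQueue<SnappyContext> connectionQueue = new ArrayBlockingQueue<SnappyContext>(50);
  private final Map<String, SnappyContext> paragraphConnectionMap = new HashMap<String, SnappyContext>();

  ParagraphContextLookupSketch(SparkContext sc) {
    this.sc = sc;
  }

  SnappyContext lookupContext(String paragraphId) throws InterruptedException {
    SnappyContext snc = paragraphConnectionMap.get(paragraphId);
    if (snc == null) {
      // Prefer a pre-warmed context; fall back to building one inline if the pool is drained.
      snc = connectionQueue.isEmpty() ? new SnappyContext(sc) : connectionQueue.take();
      paragraphConnectionMap.put(paragraphId, snc);
    }
    return snc;
  }
}

A single connectionQueue.poll() would fold the isEmpty()/take() pair into one non-blocking call, and would avoid blocking if another thread drained the queue between the check and the take.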
@@ -286,7 +295,7 @@ private InterpreterResult executeSql(SnappyContext snc, String sql,
       if (null != data && data != EMPTY_STRING && data.split("\n").length>1) {
         msg.append(data);
         msg.append(NEWLINE);
-
+        msg.append("<!--TABLE_COMMENT-->");
 
         if (isApproxQuery) {
           paragraphStateMap.get(paragraphId).setTimeRequiredForApproxQuery(endTime - startTime);
@@ -311,6 +320,7 @@ private InterpreterResult executeSql(SnappyContext snc, String sql,
       }
 
 
+      //connectionQueue.offer(snc);
       return new InterpreterResult(InterpreterResult.Code.SUCCESS, msg.toString());
 
     } catch (Exception e) {
@@ -320,10 +330,12 @@ private InterpreterResult executeSql(SnappyContext snc, String sql,
         stringBuilder.append(e.getMessage()).append("\n");
         stringBuilder.append(e.getClass().toString()).append("\n");
         stringBuilder.append(StringUtils.join(e.getStackTrace(), "\n"));
+        //connectionQueue.offer(snc);
         return new InterpreterResult(InterpreterResult.Code.ERROR, stringBuilder.toString());
       } else {
         paragraphStateMap.remove(paragraphId);
         // Don't show error in case of cancel
+        //connectionQueue.offer(snc);
         return new InterpreterResult(InterpreterResult.Code.KEEP_PREVIOUS_RESULT, EMPTY_STRING);
       }
     }
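One consequence of the three commented-out connectionQueue.offer(snc) lines above: once taken, a context is never recycled into the pool; it stays bound to its paragraph in paragraphConnectionMap. If recycling were re-enabled, the conventional shape is to release in a finally block so the error and cancel paths also return the context (illustrative sketch, not part of this commit):

import java.util.concurrent.BlockingQueue;

import org.apache.spark.sql.SnappyContext;

class PoolReturnSketch {
  static void runAndRelease(BlockingQueue<SnappyContext> pool, SnappyContext snc, Runnable work) {
    try {
      work.run();
    } finally {
      pool.offer(snc); // non-blocking; silently drops the context if the pool is already full
    }
  }
}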
10 changes: 6 additions & 4 deletions SnappyDataZeppelinInterpreter.java
@@ -72,9 +72,9 @@
 import org.apache.zeppelin.interpreter.InterpreterUtils;
 import org.apache.zeppelin.interpreter.WrappedInterpreter;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
+import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
-import org.apache.zeppelin.spark.SparkOutputStream;
 import org.apache.zeppelin.spark.SparkVersion;
 import org.apache.zeppelin.spark.ZeppelinContext;
 import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
@@ -126,8 +126,9 @@ public class SnappyDataZeppelinInterpreter extends Interpreter {
   private static JobProgressListener sparkListener;
   private static Integer sharedInterpreterLock = new Integer(0);
 
-  private SparkOutputStream out;
+  private InterpreterOutputStream out;
   private SparkDependencyResolver dep;
+  private static InterpreterHookRegistry hooks;
 
   /**
    * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10)
@@ -141,7 +142,7 @@ public class SnappyDataZeppelinInterpreter extends Interpreter {
 
   public SnappyDataZeppelinInterpreter(Properties property) {
     super(property);
-    out = new SparkOutputStream(logger);
+    out = new InterpreterOutputStream(logger);
   }
 
   public SnappyDataZeppelinInterpreter(Properties property, SparkContext sc) {
@@ -494,7 +495,8 @@ public void open() {
 
     dep = getDependencyResolver();
 
-    z = new ZeppelinContext(sc, snc, null, dep,
+    hooks = getInterpreterGroup().getInterpreterHookRegistry();
+    z = new ZeppelinContext(sc, snc, null, dep, hooks,
         Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
 
     interpret("@transient val _binder = new java.util.HashMap[String, Object]()");
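All three edits in this file track Zeppelin 0.7 API changes: SparkOutputStream is replaced by InterpreterOutputStream, and the ZeppelinContext constructor gains an InterpreterHookRegistry obtained from the interpreter group. A condensed sketch of the new wiring (ZeppelinWiring07Sketch and wire are illustrative names; the constructor arguments mirror the diff, and InterpreterHookRegistry is assumed to come from org.apache.zeppelin.interpreter with an slf4j logger, since those imports fall outside the shown hunks):

import org.apache.spark.SparkContext;
import org.apache.spark.sql.SnappyContext;
import org.apache.zeppelin.interpreter.InterpreterHookRegistry;
import org.apache.zeppelin.interpreter.util.InterpreterOutputStream;
import org.apache.zeppelin.spark.ZeppelinContext;
import org.apache.zeppelin.spark.dep.SparkDependencyResolver;
import org.slf4j.Logger;

class ZeppelinWiring07Sketch {
  static ZeppelinContext wire(Logger logger, SparkContext sc, SnappyContext snc,
                              SparkDependencyResolver dep, InterpreterHookRegistry hooks,
                              int maxResult) {
    // Zeppelin 0.7 replacement for the removed SparkOutputStream; the real class keeps this in a field.
    InterpreterOutputStream out = new InterpreterOutputStream(logger);
    // ZeppelinContext now additionally takes the interpreter group's hook registry.
    return new ZeppelinContext(sc, snc, null, dep, hooks, maxResult);
  }
}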
6 changes: 6 additions & 0 deletions src/main/resources/interpreter-setting.json
@@ -27,6 +27,12 @@
       "propertyName": "spark.app.name",
       "defaultValue": "Zeppelin",
       "description": "The name of spark application."
     },
+    "zeppelin.dep.localrepo": {
+      "envName": "ZEPPELIN_DEP_LOCALREPO",
+      "propertyName": null,
+      "defaultValue": "local-repo",
+      "description": "local repository for dependency loader"
+    }
   }
 },
