[ZEPPELIN-1965] Livy SQL Interpreter: Should use df.show(1000, false)… #2201

Status: Closed (wants to merge 4 commits)
7 changes: 6 additions & 1 deletion docs/interpreter/livy.md
@@ -56,10 +56,15 @@ Example: `spark.driver.memory` to `livy.spark.driver.memory`
<td>URL where livy server is running</td>
</tr>
<tr>
<td>zeppelin.livy.spark.maxResult</td>
<td>zeppelin.livy.spark.sql.maxResult</td>
Review comment (Member): I love this, but it might break existing users. Perhaps we don't change this for now?

<td>1000</td>
<td>Max number of Spark SQL results to display.</td>
</tr>
<tr>
<td>zeppelin.livy.spark.sql.field.truncate</td>
<td>true</td>
<td>Whether to truncate field values longer than 20 characters or not</td>
</tr>
<tr>
<td>zeppelin.livy.session.create_timeout</td>
<td>120</td>
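As a rough sketch of what the two settings above control, assuming a SparkSession `spark` with `spark.implicits._` imported (the illustrative data mirrors the test DataFrame added in this PR):

    val df = Seq(("12characters12characters", 20)).toDF("col_1", "col_2")
    df.show(1000)         // default (truncate = true): long values print as "12characters12cha..."
    df.show(1000, false)  // zeppelin.livy.spark.sql.field.truncate = false: full values print

Here `zeppelin.livy.spark.sql.maxResult` supplies the row limit passed to show(), and `zeppelin.livy.spark.sql.field.truncate` supplies the truncate flag.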
@@ -272,7 +272,9 @@ public InterpreterResult interpret(String code,
throw new LivyException(e);
}
stmtInfo = getStatementInfo(stmtInfo.id);
paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
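// paragraphId can be null (e.g. for statements not tied to a user paragraph), so only record progress when it is present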
if (paragraphId != null) {
paragraphId2StmtProgressMap.put(paragraphId, (int) (stmtInfo.progress * 100));
}
}
if (appendSessionExpired) {
return appendSessionExpire(getResultFromStatementInfo(stmtInfo, displayAppInfo),
@@ -32,14 +32,25 @@
*/
public class LivySparkSQLInterpreter extends BaseLivyInterprereter {

public static final String ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE =
"zeppelin.livy.spark.sql.field.truncate";

public static final String ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT =
"zeppelin.livy.spark.sql.maxResult";

private LivySparkInterpreter sparkInterpreter;

private boolean isSpark2 = false;
private int maxResult = 1000;
private boolean truncate = true;

public LivySparkSQLInterpreter(Properties property) {
super(property);
this.maxResult = Integer.parseInt(property.getProperty("zeppelin.livy.spark.sql.maxResult"));
this.maxResult = Integer.parseInt(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_MAX_RESULT));
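// fall back to the default (truncate = true) when zeppelin.livy.spark.sql.field.truncate is not set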
if (property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE) != null) {
this.truncate =
Boolean.parseBoolean(property.getProperty(ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE));
}
}

@Override
@@ -111,9 +122,11 @@ public InterpreterResult interpret(String line, InterpreterContext context) {
// use triple quotes so that we don't need to escape the query string.
String sqlQuery = null;
if (isSpark2) {
sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
sqlQuery = "spark.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
truncate + ")";
} else {
sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ")";
sqlQuery = "sqlContext.sql(\"\"\"" + line + "\"\"\").show(" + maxResult + ", " +
truncate + ")";
}
InterpreterResult result = sparkInterpreter.interpret(sqlQuery, context.getParagraphId(),
this.displayAppInfo, true);
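For illustration, with a hypothetical query `select * from df`, `maxResult` = 1000 and `truncate` = false, the statement generated above is equivalent to:

    spark.sql("""select * from df""").show(1000, false)        // Spark 2
    sqlContext.sql("""select * from df""").show(1000, false)   // Spark 1.x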
5 changes: 5 additions & 0 deletions livy/src/main/resources/interpreter-setting.json
Expand Up @@ -118,6 +118,11 @@
"defaultValue": "1000",
"description": "Max number of Spark SQL result to display."
},
"zeppelin.livy.spark.sql.field.truncate": {
"propertyName": "zeppelin.livy.spark.sql.field.truncate",
"defaultValue": "true",
"description": "If true, truncate field values longer than 20 characters."
},
"zeppelin.livy.concurrentSQL": {
"propertyName": "zeppelin.livy.concurrentSQL",
"defaultValue": "false",
127 changes: 125 additions & 2 deletions livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
@@ -20,7 +20,6 @@

import com.cloudera.livy.test.framework.Cluster;
import com.cloudera.livy.test.framework.Cluster$;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.user.AuthenticationInfo;
@@ -33,7 +32,6 @@
import java.util.Properties;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

public class LivyInterpreterIT {
@@ -308,6 +306,131 @@ public void testSparkSQLInterpreter() {
}
}

@Test
public void testStringWithTruncation() {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(properties);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
"title", "text", authInfo, null, null, null, null, null, output);
sparkInterpreter.open();

LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(properties);
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();

try {
// detect spark version
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());

boolean isSpark2 = isSpark2(sparkInterpreter, context);

// test DataFrame api
if (!isSpark2) {
result = sparkInterpreter.interpret(
"val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
} else {
result = sparkInterpreter.interpret(
"val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
}
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
// test LivySparkSQLInterpreter, which shares the same SparkContext with LivySparkInterpreter
result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertEquals("col_1\tcol_2\n12characters12cha...\t20", result.message().get(0).getData());
} finally {
sparkInterpreter.close();
sqlInterpreter.close();
}
}

@Test
public void testStringWithoutTruncation() {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
Properties newProps = new Properties();
for (Object name: properties.keySet()) {
newProps.put(name, properties.get(name));
}
newProps.put(LivySparkSQLInterpreter.ZEPPELIN_LIVY_SPARK_SQL_FIELD_TRUNCATE, "false");
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(newProps);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "livy.spark",
"title", "text", authInfo, null, null, null, null, null, output);
sparkInterpreter.open();

LivySparkSQLInterpreter sqlInterpreter = new LivySparkSQLInterpreter(newProps);
interpreterGroup.get("session_1").add(sqlInterpreter);
sqlInterpreter.setInterpreterGroup(interpreterGroup);
sqlInterpreter.open();

try {
// detect spark version
InterpreterResult result = sparkInterpreter.interpret("sc.version", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());

boolean isSpark2 = isSpark2(sparkInterpreter, context);

// test DataFrame api
if (!isSpark2) {
result = sparkInterpreter.interpret(
"val df=sqlContext.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
} else {
result = sparkInterpreter.interpret(
"val df=spark.createDataFrame(Seq((\"12characters12characters\",20))).toDF(\"col_1\", \"col_2\")\n"
+ "df.collect()", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, result.message().size());
assertTrue(result.message().get(0).getData()
.contains("Array[org.apache.spark.sql.Row] = Array([12characters12characters,20])"));
}
sparkInterpreter.interpret("df.registerTempTable(\"df\")", context);
// test LivySparkSQLInterpreter, which shares the same SparkContext with LivySparkInterpreter
result = sqlInterpreter.interpret("select * from df where col_1='12characters12characters'", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(InterpreterResult.Type.TABLE, result.message().get(0).getType());
assertEquals("col_1\tcol_2\n12characters12characters\t20", result.message().get(0).getData());
} finally {
sparkInterpreter.close();
sqlInterpreter.close();
}
}

@Test
public void testPySparkInterpreter() {
if (!checkPreCondition()) {