[SPARK-5193][SQL] Remove Spark SQL Java-specific API.
After the following patches, the main (Scala) API is now usable for Java users directly.

#4056
#4054
#4049
#4030
#3965
#3958

Author: Reynold Xin <rxin@databricks.com>

Closes #4065 from rxin/sql-java-api and squashes the following commits:

b1fd860 [Reynold Xin] Fix Mima
6d86578 [Reynold Xin] Ok one more attempt in fixing Python...
e8f1455 [Reynold Xin] Fix Python again...
3e53f91 [Reynold Xin] Fixed Python.
83735da [Reynold Xin] Fix BigDecimal test.
e9f1de3 [Reynold Xin] Use scala BigDecimal.
500d2c4 [Reynold Xin] Fix Decimal.
ba3bfa2 [Reynold Xin] Updated javadoc for RowFactory.
c4ae1c5 [Reynold Xin] [SPARK-5193][SQL] Remove Spark SQL Java-specific API.
rxin committed Jan 17, 2015
1 parent ee1c1f3 commit 61b427d
Showing 27 changed files with 125 additions and 1,516 deletions.
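For context, a minimal before/after sketch of what this removal means for Java callers follows; it is illustrative only (the class name MigrationSketch and the people.json path are made up) and assumes the Spark 1.2-era API that appears in the diffs below.

    // Before this commit, Java users went through the Java-specific wrappers:
    //   JavaSQLContext sqlCtx = new JavaSQLContext(jsc);
    //   JavaSchemaRDD people = sqlCtx.jsonFile("people.json");
    // After it, the Scala classes are used from Java directly.
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.SchemaRDD;

    public class MigrationSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local", "MigrationSketch");
        SQLContext sqlCtx = new SQLContext(jsc);            // was: new JavaSQLContext(jsc)

        SchemaRDD people = sqlCtx.jsonFile("people.json");  // was: JavaSchemaRDD
        people.registerTempTable("people");

        SchemaRDD teenagers =
            sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
        for (Row r : teenagers.collect()) {                 // Row is now org.apache.spark.sql.Row
          System.out.println("Name: " + r.getString(0));
        }
        jsc.stop();
      }
    }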
2 changes: 1 addition & 1 deletion bin/spark-class
@@ -148,7 +148,7 @@ fi
if [[ "$1" =~ org.apache.spark.tools.* ]]; then
if test -z "$SPARK_TOOLS_JAR"; then
echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2
echo "You need to build Spark before running $1." 1>&2
echo "You need to run \"build/sbt tools/package\" before running $1." 1>&2
exit 1
fi
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java
@@ -33,9 +33,9 @@
import org.apache.spark.ml.tuning.CrossValidator;
import org.apache.spark.ml.tuning.CrossValidatorModel;
import org.apache.spark.ml.tuning.ParamGridBuilder;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

/**
* A simple example demonstrating model selection using CrossValidator.
@@ -55,7 +55,7 @@ public class JavaCrossValidatorExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaCrossValidatorExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaSQLContext jsql = new JavaSQLContext(jsc);
SQLContext jsql = new SQLContext(jsc);

// Prepare training documents, which are labeled.
List<LabeledDocument> localTraining = Lists.newArrayList(
@@ -71,8 +71,7 @@ public static void main(String[] args) {
new LabeledDocument(9L, "a e c l", 0.0),
new LabeledDocument(10L, "spark compile", 1.0),
new LabeledDocument(11L, "hadoop software", 0.0));
JavaSchemaRDD training =
jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -113,11 +112,11 @@ public static void main(String[] args) {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents. cvModel uses the best model found (lrModel).
cvModel.transform(test).registerAsTable("prediction");
JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
for (Row r: predictions.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+ ", prediction=" + r.get(3));
examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java
@@ -28,9 +28,9 @@
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

/**
* A simple example demonstrating ways to specify parameters for Estimators and Transformers.
@@ -44,7 +44,7 @@ public class JavaSimpleParamsExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaSimpleParamsExample");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaSQLContext jsql = new JavaSQLContext(jsc);
SQLContext jsql = new SQLContext(jsc);

// Prepare training data.
// We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
@@ -54,7 +54,7 @@ public static void main(String[] args) {
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
JavaSchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);

// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
@@ -94,14 +94,14 @@ public static void main(String[] args) {
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);

// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
// column since we renamed the lr.scoreCol parameter previously.
model2.transform(test).registerAsTable("results");
JavaSchemaRDD results =
SchemaRDD results =
jsql.sql("SELECT features, label, probability, prediction FROM results");
for (Row r: results.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java
@@ -21,17 +21,17 @@

import com.google.common.collect.Lists;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.Row;

/**
* A simple text classification pipeline that recognizes "spark" from input text. It uses the Java
@@ -46,16 +46,15 @@ public class JavaSimpleTextClassificationPipeline {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("JavaSimpleTextClassificationPipeline");
JavaSparkContext jsc = new JavaSparkContext(conf);
JavaSQLContext jsql = new JavaSQLContext(jsc);
SQLContext jsql = new SQLContext(jsc);

// Prepare training documents, which are labeled.
List<LabeledDocument> localTraining = Lists.newArrayList(
new LabeledDocument(0L, "a b c d e spark", 1.0),
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0));
JavaSchemaRDD training =
jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
@@ -80,11 +79,11 @@ public static void main(String[] args) {
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(7L, "apache hadoop"));
JavaSchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);

// Make predictions on test documents.
model.transform(test).registerAsTable("prediction");
JavaSchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
for (Row r: predictions.collect()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
+ ", prediction=" + r.get(3));
examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -26,9 +26,9 @@
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.Row;

public class JavaSparkSQL {
public static class Person implements Serializable {
@@ -55,7 +55,7 @@ public void setAge(int age) {
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
SQLContext sqlCtx = new SQLContext(ctx);

System.out.println("=== Data source: RDD ===");
// Load a text file and convert each line to a Java Bean.
@@ -74,15 +74,15 @@ public Person call(String line) {
});

// Apply a schema to an RDD of Java Beans and register it as a table.
JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
SchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
schemaPeople.registerTempTable("people");

// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
SchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0);
@@ -99,13 +99,13 @@ public String call(Row row) {
// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
SchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile");
JavaSchemaRDD teenagers2 =
SchemaRDD teenagers2 =
sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
teenagerNames = teenagers2.map(new Function<Row, String>() {
teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0);
@@ -120,7 +120,7 @@ public String call(Row row) {
// The path can be either a single text file or a directory storing text files.
String path = "examples/src/main/resources/people.json";
// Create a JavaSchemaRDD from the file(s) pointed by path
JavaSchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
SchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);

// Because the schema of a JSON dataset is automatically inferred, to write queries,
// it is better to take a look at what is the schema.
@@ -134,11 +134,11 @@ public String call(Row row) {
peopleFromJsonFile.registerTempTable("people");

// SQL statements can be run by using the sql methods provided by sqlCtx.
JavaSchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
SchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

// The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
teenagerNames = teenagers3.map(new Function<Row, String>() {
teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) { return "Name: " + row.getString(0); }
}).collect();
@@ -151,7 +151,7 @@ public String call(Row row) {
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
JavaSchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD);
SchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());

// Take a look at the schema of this new JavaSchemaRDD.
peopleFromJsonRDD.printSchema();
@@ -164,8 +164,8 @@ public String call(Row row) {

peopleFromJsonRDD.registerTempTable("people2");

JavaSchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.map(new Function<Row, String>() {
SchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
@Override
public String call(Row row) {
return "Name: " + row.getString(0) + ", City: " + row.getString(1);
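The JavaSparkSQL changes above show the one bridge Java callers still need: functional operations on query results go through SchemaRDD.toJavaRDD(). A minimal sketch of that pattern, assuming the same Spark 1.2-era API as the diff (the helper class NameExtractor is made up):

    import java.util.List;

    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SchemaRDD;

    class NameExtractor {
      // SchemaRDD is a Scala RDD[Row]; toJavaRDD() exposes it as a JavaRDD<Row>
      // so the Java Function API (map, filter, ...) can be applied to it.
      static List<String> names(SchemaRDD teenagers) {
        return teenagers.toJavaRDD().map(new Function<Row, String>() {
          @Override
          public String call(Row row) {
            return "Name: " + row.getString(0);
          }
        }).collect();
      }
    }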
38 changes: 0 additions & 38 deletions mllib/src/main/scala/org/apache/spark/ml/Estimator.scala
@@ -18,12 +18,10 @@
package org.apache.spark.ml

import scala.annotation.varargs
import scala.collection.JavaConverters._

import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param.{ParamMap, ParamPair, Params}
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.api.java.JavaSchemaRDD

/**
* :: AlphaComponent ::
@@ -66,40 +64,4 @@ abstract class Estimator[M <: Model[M]] extends PipelineStage with Params {
def fit(dataset: SchemaRDD, paramMaps: Array[ParamMap]): Seq[M] = {
paramMaps.map(fit(dataset, _))
}

// Java-friendly versions of fit.

/**
* Fits a single model to the input data with optional parameters.
*
* @param dataset input dataset
* @param paramPairs optional list of param pairs (overwrite embedded params)
* @return fitted model
*/
@varargs
def fit(dataset: JavaSchemaRDD, paramPairs: ParamPair[_]*): M = {
fit(dataset.schemaRDD, paramPairs: _*)
}

/**
* Fits a single model to the input data with provided parameter map.
*
* @param dataset input dataset
* @param paramMap parameter map
* @return fitted model
*/
def fit(dataset: JavaSchemaRDD, paramMap: ParamMap): M = {
fit(dataset.schemaRDD, paramMap)
}

/**
* Fits multiple models to the input data with multiple sets of parameters.
*
* @param dataset input dataset
* @param paramMaps an array of parameter maps
* @return fitted models, matching the input parameter maps
*/
def fit(dataset: JavaSchemaRDD, paramMaps: Array[ParamMap]): java.util.List[M] = {
fit(dataset.schemaRDD, paramMaps).asJava
}
}
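With the JavaSchemaRDD overloads gone, Java code calls the remaining Scala fit methods directly, exactly as the ML example diffs above do. A hedged sketch of that call pattern (the class FitSketch is made up; it assumes the Spark 1.2-era spark.ml API):

    import org.apache.spark.ml.classification.LogisticRegression;
    import org.apache.spark.ml.classification.LogisticRegressionModel;
    import org.apache.spark.ml.param.ParamMap;
    import org.apache.spark.sql.SchemaRDD;

    class FitSketch {
      static LogisticRegressionModel fit(SchemaRDD training) {
        LogisticRegression lr = new LogisticRegression()
            .setMaxIter(10)
            .setRegParam(0.01);
        // The Scala fit(SchemaRDD, ParamMap) overload is invoked from Java.
        ParamMap paramMap = new ParamMap();
        paramMap.put(lr.maxIter(), 30);  // overrides the value set above
        return lr.fit(training, paramMap);
      }
    }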
24 changes: 0 additions & 24 deletions mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
@@ -23,7 +23,6 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param._
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.catalyst.analysis.Star
import org.apache.spark.sql.catalyst.expressions.ScalaUdf
import org.apache.spark.sql.types._
@@ -55,29 +54,6 @@ abstract class Transformer extends PipelineStage with Params {
* @return transformed dataset
*/
def transform(dataset: SchemaRDD, paramMap: ParamMap): SchemaRDD

// Java-friendly versions of transform.

/**
* Transforms the dataset with optional parameters.
* @param dataset input datset
* @param paramPairs optional list of param pairs, overwrite embedded params
* @return transformed dataset
*/
@varargs
def transform(dataset: JavaSchemaRDD, paramPairs: ParamPair[_]*): JavaSchemaRDD = {
transform(dataset.schemaRDD, paramPairs: _*).toJavaSchemaRDD
}

/**
* Transforms the dataset with provided parameter map as additional parameters.
* @param dataset input dataset
* @param paramMap additional parameters, overwrite embedded params
* @return transformed dataset
*/
def transform(dataset: JavaSchemaRDD, paramMap: ParamMap): JavaSchemaRDD = {
transform(dataset.schemaRDD, paramMap).toJavaSchemaRDD
}
}

/**
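Symmetrically to Estimator, Java callers now invoke the Scala transform methods directly and register the result for SQL queries. A hedged sketch (TransformSketch is made up; it assumes the Spark 1.2-era API shown above):

    import org.apache.spark.ml.PipelineModel;
    import org.apache.spark.sql.SchemaRDD;

    class TransformSketch {
      static SchemaRDD predict(PipelineModel model, SchemaRDD test) {
        // The Scala transform(SchemaRDD) is called from Java; registering the
        // result as a temp table allows it to be queried with plain SQL.
        SchemaRDD predictions = model.transform(test);
        predictions.registerTempTable("prediction");
        return predictions;
      }
    }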
17 changes: 8 additions & 9 deletions mllib/src/test/java/org/apache/spark/ml/JavaPipelineSuite.java
@@ -26,24 +26,23 @@
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.StandardScaler;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import static org.apache.spark.mllib.classification.LogisticRegressionSuite
.generateLogisticInputAsList;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.SQLContext;
import static org.apache.spark.mllib.classification.LogisticRegressionSuite.generateLogisticInputAsList;

/**
* Test Pipeline construction and fitting in Java.
*/
public class JavaPipelineSuite {

private transient JavaSparkContext jsc;
private transient JavaSQLContext jsql;
private transient JavaSchemaRDD dataset;
private transient SQLContext jsql;
private transient SchemaRDD dataset;

@Before
public void setUp() {
jsc = new JavaSparkContext("local", "JavaPipelineSuite");
jsql = new JavaSQLContext(jsc);
jsql = new SQLContext(jsc);
JavaRDD<LabeledPoint> points =
jsc.parallelize(generateLogisticInputAsList(1.0, 1.0, 100, 42), 2);
dataset = jsql.applySchema(points, LabeledPoint.class);
@@ -66,7 +65,7 @@ public void pipeline() {
.setStages(new PipelineStage[] {scaler, lr});
PipelineModel model = pipeline.fit(dataset);
model.transform(dataset).registerTempTable("prediction");
JavaSchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
predictions.collect();
SchemaRDD predictions = jsql.sql("SELECT label, score, prediction FROM prediction");
predictions.collectAsList();
}
}
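The test above also switches from collect() to collectAsList(); both are usable from Java after this change. A small hedged sketch (CollectSketch is made up):

    import java.util.List;

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SchemaRDD;

    class CollectSketch {
      static void show(SchemaRDD predictions) {
        // collect() surfaces the Scala Array[Row] as Row[] in Java ...
        for (Row r : predictions.collect()) {
          System.out.println(r);
        }
        // ... while collectAsList() returns a java.util.List<Row>.
        List<Row> rows = predictions.collectAsList();
        System.out.println("collected " + rows.size() + " rows");
      }
    }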