From 770abc52e2f3d34e8638b2126587daa4af3490c2 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Mon, 6 Jun 2016 21:03:15 -0700 Subject: [PATCH 1/4] update sql programming guide --- docs/sql-programming-guide.md | 291 +++++++++++++++++----------------- 1 file changed, 148 insertions(+), 143 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 940c1d77045ad..8e81b9cc5efbb 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -55,21 +55,21 @@ in a future release. # Getting Started -## Starting Point: SQLContext +## Starting Point: SparkSession
The entry point into all functionality in Spark SQL is the -[`SQLContext`](api/scala/index.html#org.apache.spark.sql.SQLContext) class, or one of its -descendants. To create a basic `SQLContext`, all you need is a SparkContext. +[`SparkSession`](api/scala/index.html#org.apache.spark.sql.SparkSession) class, or one of its +descendants. To create a basic `SparkSession`, all you need is the following code. {% highlight scala %} -val sc: SparkContext // An existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +val spark = SparkSession.builder.appName("theAppName").getOrCreate() +val sc = spark.sparkContext // get the underlying SparkContext if needed // this is used to implicitly convert an RDD to a DataFrame. -import sqlContext.implicits._ +import spark.implicits._ {% endhighlight %}
@@ -77,12 +77,11 @@ import sqlContext.implicits._
The entry point into all functionality in Spark SQL is the -[`SQLContext`](api/java/index.html#org.apache.spark.sql.SQLContext) class, or one of its -descendants. To create a basic `SQLContext`, all you need is a SparkContext. +[`SparkSession`](api/java/index.html#org.apache.spark.sql.SparkSession) class, or one of its +descendants. To create a basic `SparkSession`, all you need is the following code. {% highlight java %} -JavaSparkContext sc = ...; // An existing JavaSparkContext. -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); +SparkSession spark = SparkSession.builder().appName("theAppName").getOrCreate(); {% endhighlight %}
@@ -90,12 +89,12 @@ SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
The entry point into all relational functionality in Spark is the -[`SQLContext`](api/python/pyspark.sql.html#pyspark.sql.SQLContext) class, or one -of its decedents. To create a basic `SQLContext`, all you need is a SparkContext. +[`SparkSession`](api/python/pyspark.sql.html#pyspark.sql.SparkSession) class, or one +of its descendants. To create a basic `SparkSession`, all you need is the following code. {% highlight python %} -from pyspark.sql import SQLContext -sqlContext = SQLContext(sc) +from pyspark.sql import SparkSession +spark = SparkSession.builder.appName("theAppName").getOrCreate() {% endhighlight %}
@@ -112,30 +111,26 @@ sqlContext <- sparkRSQL.init(sc)
-In addition to the basic `SQLContext`, you can also create a `HiveContext`, which provides a -superset of the functionality provided by the basic `SQLContext`. Additional features include +In addition to the basic `SparkSession`, you can also create a `SparkSession` with Hive support, +e.g. via `val spark = SparkSession.builder.appName("HiveFromSpark").enableHiveSupport().getOrCreate()`, +which provides a superset of the functionality provided by the basic `SparkSession`. Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the -ability to read data from Hive tables. To use a `HiveContext`, you do not need to have an -existing Hive setup, and all of the data sources available to a `SQLContext` are still available. -`HiveContext` is only packaged separately to avoid including all of Hive's dependencies in the default -Spark build. If these dependencies are not a problem for your application then using `HiveContext` -is recommended for the 1.3 release of Spark. Future releases will focus on bringing `SQLContext` up -to feature parity with a `HiveContext`. +ability to read data from Hive tables. To use a `SparkSession` with Hive support, you do not need to have an +existing Hive setup, and all of the data sources available to a basic `SparkSession` are still available. ## Creating DataFrames -With a `SQLContext`, applications can create `DataFrame`s from an existing `RDD`, from a Hive table, or from data sources. +With a `SparkSession`, applications can create `DataFrame`s from an existing `RDD`, from a Hive table, or from data sources. As an example, the following creates a `DataFrame` based on the content of a JSON file:
{% highlight scala %} -val sc: SparkContext // An existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +val spark: SparkSession // An existing SparkSession. -val df = sqlContext.read.json("examples/src/main/resources/people.json") +val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() @@ -145,10 +140,9 @@ df.show()
{% highlight java %} -JavaSparkContext sc = ...; // An existing JavaSparkContext. -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); +SparkSession spark; // An existing SparkSession. -DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); +DataFrame df = spark.read().json("examples/src/main/resources/people.json"); // Displays the content of the DataFrame to stdout df.show(); @@ -158,10 +152,10 @@ df.show();
{% highlight python %} -from pyspark.sql import SQLContext -sqlContext = SQLContext(sc) +from pyspark.sql import SparkSession +spark = SparkSession.builder.appName("theAppName").getOrCreate() -df = sqlContext.read.json("examples/src/main/resources/people.json") +df = spark.read.json("examples/src/main/resources/people.json") # Displays the content of the DataFrame to stdout df.show() @@ -193,11 +187,10 @@ Here we include some basic examples of structured data processing using DataFram
{% highlight scala %} -val sc: SparkContext // An existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +val spark: SparkSession // An existing SparkSession. // Create the DataFrame -val df = sqlContext.read.json("examples/src/main/resources/people.json") +val df = spark.read.json("examples/src/main/resources/people.json") // Show the content of the DataFrame df.show() @@ -248,11 +241,10 @@ In addition to simple column references and expressions, DataFrames also have a
{% highlight java %} -JavaSparkContext sc // An existing SparkContext. -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc) +SparkSession spark; // An existing SparkSession. // Create the DataFrame -DataFrame df = sqlContext.read().json("examples/src/main/resources/people.json"); +DataFrame df = spark.read().json("examples/src/main/resources/people.json"); // Show the content of the DataFrame df.show(); @@ -308,11 +300,11 @@ latter form, which is future proof and won't break with column names that are also attributes on the DataFrame class. {% highlight python %} -from pyspark.sql import SQLContext -sqlContext = SQLContext(sc) +from pyspark.sql import SparkSession +spark = SparkSession.builder.appName("theAppName").getOrCreate() # Create the DataFrame -df = sqlContext.read.json("examples/src/main/resources/people.json") +df = spark.read.json("examples/src/main/resources/people.json") # Show the content of the DataFrame df.show() @@ -419,28 +411,28 @@ In addition to simple column references and expressions, DataFrames also have a ## Running SQL Queries Programmatically -The `sql` function on a `SQLContext` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`. +The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.
{% highlight scala %} -val sqlContext = ... // An existing SQLContext -val df = sqlContext.sql("SELECT * FROM table") +val spark: SparkSession = ... // An existing SparkSession. +val df = spark.sql("SELECT * FROM table") {% endhighlight %}
{% highlight java %} -SQLContext sqlContext = ... // An existing SQLContext -DataFrame df = sqlContext.sql("SELECT * FROM table") +SparkSession spark = ...; // An existing SparkSession. +DataFrame df = spark.sql("SELECT * FROM table"); {% endhighlight %}
{% highlight python %} -from pyspark.sql import SQLContext -sqlContext = SQLContext(sc) -df = sqlContext.sql("SELECT * FROM table") +from pyspark.sql import SparkSession +spark = SparkSession.builder.appName("theAppName").getOrCreate() +df = spark.sql("SELECT * FROM table") {% endhighlight %}
@@ -467,7 +459,8 @@ the bytes back into an object.
{% highlight scala %} -// Encoders for most common types are automatically provided by importing sqlContext.implicits._ +val spark: SparkSession = ... // An existing SparkSession. +// Encoders for most common types are automatically provided by importing spark.implicits._ val ds = Seq(1, 2, 3).toDS() ds.map(_ + 1).collect() // Returns: Array(2, 3, 4) @@ -477,7 +470,7 @@ val ds = Seq(Person("Andy", 32)).toDS() // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name. val path = "examples/src/main/resources/people.json" -val people = sqlContext.read.json(path).as[Person] +val people = spark.read.json(path).as[Person] {% endhighlight %}
{% highlight java %} -JavaSparkContext sc = ...; // An existing JavaSparkContext. -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); +SparkSession spark = ...; // An existing SparkSession. {% endhighlight %}
@@ -517,10 +509,10 @@ types such as Sequences or Arrays. This RDD can be implicitly converted to a Dat registered as a table. Tables can be used in subsequent SQL statements. {% highlight scala %} -// sc is an existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +val spark: SparkSession = ...; // An existing SparkSession. +val sc = spark.sparkContext // this is used to implicitly convert an RDD to a DataFrame. -import sqlContext.implicits._ +import spark.implicits._ // Define the schema using a case class. // Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, @@ -531,8 +523,8 @@ case class Person(name: String, age: Int) val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF() people.createOrReplaceTempView("people") -// SQL statements can be run by using the sql methods provided by sqlContext. -val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") +// SQL statements can be run by using the sql methods provided by SparkSession. +val teenagers = spark.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index: @@ -586,8 +578,8 @@ A schema can be applied to an existing RDD by calling `createDataFrame` and prov for the JavaBean. {% highlight java %} -// sc is an existing JavaSparkContext. -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); +SparkSession spark = ...; // An existing SparkSession. +SparkContext sc = spark.sparkContext(); // Load a text file and convert each line to a JavaBean. JavaRDD people = sc.textFile("examples/src/main/resources/people.txt").map( @@ -604,11 +596,11 @@ JavaRDD people = sc.textFile("examples/src/main/resources/people.txt").m }); // Apply a schema to an RDD of JavaBeans and register it as a table. -DataFrame schemaPeople = sqlContext.createDataFrame(people, Person.class); +DataFrame schemaPeople = spark.createDataFrame(people, Person.class); schemaPeople.createOrReplaceTempView("people"); // SQL can be run over RDDs that have been registered as tables. -DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +DataFrame teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. @@ -633,8 +625,9 @@ performed on JSON files. {% highlight python %} # sc is an existing SparkContext. -from pyspark.sql import SQLContext, Row -sqlContext = SQLContext(sc) +from pyspark.sql import SparkSession, Row +spark = SparkSession.builder.appName("theAppName").getOrCreate() +sc = spark.sparkContext # Load a text file and convert each line to a Row. lines = sc.textFile("examples/src/main/resources/people.txt") @@ -642,11 +635,11 @@ parts = lines.map(lambda l: l.split(",")) people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) # Infer the schema, and register the DataFrame as a table. -schemaPeople = sqlContext.createDataFrame(people) +schemaPeople = spark.createDataFrame(people) schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. 
-teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # The results of SQL queries are RDDs and support all the normal RDD operations. teenNames = teenagers.map(lambda p: "Name: " + p.name) @@ -673,12 +666,12 @@ a `DataFrame` can be created programmatically with three steps. 2. Create the schema represented by a `StructType` matching the structure of `Row`s in the RDD created in Step 1. 3. Apply the schema to the RDD of `Row`s via `createDataFrame` method provided -by `SQLContext`. +by `SparkSession`. For example: {% highlight scala %} -// sc is an existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +val spark: SparkSession = ...; // An existing SparkSession. +val sc = spark.sparkContext // Create an RDD val people = sc.textFile("examples/src/main/resources/people.txt") @@ -701,13 +694,13 @@ val schema = val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // Apply the schema to the RDD. -val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema) +val peopleDataFrame = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame. peopleDataFrame.createOrReplaceTempView("people") -// SQL statements can be run by using the sql methods provided by sqlContext. -val results = sqlContext.sql("SELECT name FROM people") +// SQL statements can be run by using the sql methods provided by SparkSession. +val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by field index or by field name. @@ -728,7 +721,7 @@ a `DataFrame` can be created programmatically with three steps. 2. Create the schema represented by a `StructType` matching the structure of `Row`s in the RDD created in Step 1. 3. Apply the schema to the RDD of `Row`s via `createDataFrame` method provided -by `SQLContext`. +by `SparkSesstion`. For example: {% highlight java %} @@ -743,8 +736,8 @@ import org.apache.spark.sql.Row; // Import RowFactory. import org.apache.spark.sql.RowFactory; -// sc is an existing JavaSparkContext. -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); +SparkSession spark = ...; // An existing SparkSession. +SparkContext sc = spark.sparkContext(); // Load a text file and convert each line to a JavaBean. JavaRDD people = sc.textFile("examples/src/main/resources/people.txt"); @@ -769,13 +762,13 @@ JavaRDD rowRDD = people.map( }); // Apply the schema to the RDD. -DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema); +DataFrame peopleDataFrame = spark.createDataFrame(rowRDD, schema); // Creates a temporary view using the DataFrame. peopleDataFrame.createOrReplaceTempView("people"); // SQL can be run over a temporary view created using DataFrames. -DataFrame results = sqlContext.sql("SELECT name FROM people"); +DataFrame results = spark.sql("SELECT name FROM people"); // The results of SQL queries are DataFrames and support all the normal RDD operations. // The columns of a row in the result can be accessed by ordinal. @@ -799,16 +792,16 @@ a `DataFrame` can be created programmatically with three steps. 1. Create an RDD of tuples or lists from the original RDD; 2. Create the schema represented by a `StructType` matching the structure of tuples or lists in the RDD created in the step 1. -3. Apply the schema to the RDD via `createDataFrame` method provided by `SQLContext`. 
+3. Apply the schema to the RDD via `createDataFrame` method provided by `SparkSession`. For example: {% highlight python %} -# Import SQLContext and data types -from pyspark.sql import SQLContext +# Import SparkSession and data types +from pyspark.sql import SparkSession from pyspark.sql.types import * -# sc is an existing SparkContext. -sqlContext = SQLContext(sc) +spark = SparkSession.builder.appName("theAppName").getOrCreate() +sc = spark.sparkContext # Load a text file and convert each line to a tuple. lines = sc.textFile("examples/src/main/resources/people.txt") @@ -822,13 +815,13 @@ fields = [StructField(field_name, StringType(), True) for field_name in schemaSt schema = StructType(fields) # Apply the schema to the RDD. -schemaPeople = sqlContext.createDataFrame(people, schema) +schemaPeople = spark.createDataFrame(people, schema) # Creates a temporary view using the DataFrame schemaPeople.createOrReplaceTempView("people") # SQL can be run over DataFrames that have been registered as a table. -results = sqlContext.sql("SELECT name FROM people") +results = spark.sql("SELECT name FROM people") # The results of SQL queries are RDDs and support all the normal RDD operations. names = results.map(lambda p: "Name: " + p.name) @@ -858,7 +851,8 @@ In the simplest form, the default data source (`parquet` unless otherwise config
{% highlight scala %} -val df = sqlContext.read.load("examples/src/main/resources/users.parquet") +val spark: SparkSession = ...; // An existing SparkSession. +val df = spark.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") {% endhighlight %} @@ -868,7 +862,8 @@ df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") {% highlight java %} -DataFrame df = sqlContext.read().load("examples/src/main/resources/users.parquet"); +SparkSession spark = ...; // An existing SparkSession. +DataFrame df = spark.read().load("examples/src/main/resources/users.parquet"); df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); {% endhighlight %} @@ -879,7 +874,8 @@ df.select("name", "favorite_color").write().save("namesAndFavColors.parquet"); {% highlight python %} -df = sqlContext.read.load("examples/src/main/resources/users.parquet") +spark = SparkSession.builder.appName("theAppName").getOrCreate() +df = spark.read.load("examples/src/main/resources/users.parquet") df.select("name", "favorite_color").write.save("namesAndFavColors.parquet") {% endhighlight %} @@ -908,7 +904,8 @@ using this syntax.
{% highlight scala %} -val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json") +val spark: SparkSession = ...; // An existing SparkSession. +val df = spark.read.format("json").load("examples/src/main/resources/people.json") df.select("name", "age").write.format("parquet").save("namesAndAges.parquet") {% endhighlight %} @@ -918,7 +915,8 @@ df.select("name", "age").write.format("parquet").save("namesAndAges.parquet") {% highlight java %} -DataFrame df = sqlContext.read().format("json").load("examples/src/main/resources/people.json"); +SparkSession spark = ...; // An existing SparkSession. +DataFrame df = spark.read().format("json").load("examples/src/main/resources/people.json"); df.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); {% endhighlight %} @@ -929,7 +927,8 @@ df.select("name", "age").write().format("parquet").save("namesAndAges.parquet"); {% highlight python %} -df = sqlContext.read.load("examples/src/main/resources/people.json", format="json") +spark = SparkSession.builder.appName("theAppName").getOrCreate() +df = spark.read.load("examples/src/main/resources/people.json", format="json") df.select("name", "age").write.save("namesAndAges.parquet", format="parquet") {% endhighlight %} @@ -956,7 +955,8 @@ file directly with SQL.
{% highlight scala %} -val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") +val spark: SparkSession = ...; // An existing SparkSession. +val df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") {% endhighlight %}
@@ -964,14 +964,16 @@ val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/user
{% highlight java %} -DataFrame df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`"); +SparkSession spark = ...; // An existing SparkSession. +DataFrame df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`"); {% endhighlight %}
{% highlight python %} -df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") +spark = SparkSession.builder.appName("theAppName").getOrCreate() +df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`") {% endhighlight %}
@@ -1032,12 +1034,12 @@ new data. ### Saving to Persistent Tables -When working with a `HiveContext`, `DataFrames` can also be saved as persistent tables using the -`saveAsTable` command. Unlike the `registerTempTable` command, `saveAsTable` will materialize the +When working with a `SparkSession` with Hive support, `DataFrames` can also be saved as persistent tables using the +`write.saveAsTable` command. Unlike the `createOrReplaceTempView` command, `saveAsTable` will materialize the contents of the dataframe and create a pointer to the data in the HiveMetastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the `table` -method on a `SQLContext` with the name of the table. +method on a `SparkSession` with the name of the table. By default `saveAsTable` will create a "managed table", meaning that the location of the data will be controlled by the metastore. Managed tables will also have their data deleted automatically @@ -1059,9 +1061,9 @@ Using the data from the above example:
{% highlight scala %} -// sqlContext from the previous example is used in this example. +// spark:SparkSession from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. -import sqlContext.implicits._ +import spark.implicits._ val people: RDD[Person] = ... // An RDD of case class objects, from the previous example. @@ -1070,11 +1072,11 @@ people.write.parquet("people.parquet") // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a Parquet file is also a DataFrame. -val parquetFile = sqlContext.read.parquet("people.parquet") +val parquetFile = spark.read.parquet("people.parquet") // Parquet files can also be used to create a temporary view and then used in SQL statements. parquetFile.createOrReplaceTempView("parquetFile") -val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +val teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenagers.map(t => "Name: " + t(0)).collect().foreach(println) {% endhighlight %} @@ -1083,7 +1085,7 @@ teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
{% highlight java %} -// sqlContext from the previous example is used in this example. +// spark:SparkSession from the previous example is used in this example. DataFrame schemaPeople = ... // The DataFrame from the previous example. @@ -1092,11 +1094,11 @@ schemaPeople.write().parquet("people.parquet"); // Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. // The result of loading a parquet file is also a DataFrame. -DataFrame parquetFile = sqlContext.read().parquet("people.parquet"); +DataFrame parquetFile = spark.read().parquet("people.parquet"); // Parquet files can also be used to create a temporary view and then used in SQL statements. parquetFile.createOrReplaceTempView("parquetFile"); -DataFrame teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); +DataFrame teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); List teenagerNames = teenagers.javaRDD().map(new Function() { public String call(Row row) { return "Name: " + row.getString(0); @@ -1109,7 +1111,7 @@ List teenagerNames = teenagers.javaRDD().map(new Function()
{% highlight python %} -# sqlContext from the previous example is used in this example. +# spark:SparkSession from the previous example is used in this example. schemaPeople # The DataFrame from the previous example. @@ -1118,11 +1120,11 @@ schemaPeople.write.parquet("people.parquet") # Read in the Parquet file created above. Parquet files are self-describing so the schema is preserved. # The result of loading a parquet file is also a DataFrame. -parquetFile = sqlContext.read.parquet("people.parquet") +parquetFile = spark.read.parquet("people.parquet") # Parquet files can also be used to create a temporary view and then used in SQL statements. parquetFile.createOrReplaceTempView("parquetFile"); -teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") +teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") teenNames = teenagers.map(lambda p: "Name: " + p.name) for teenName in teenNames.collect(): print(teenName) @@ -1207,7 +1209,7 @@ path {% endhighlight %} -By passing `path/to/table` to either `SQLContext.read.parquet` or `SQLContext.read.load`, Spark SQL +By passing `path/to/table` to either `SparkSession.read.parquet` or `SparkSession.read.load`, Spark SQL will automatically extract the partitioning information from the paths. Now the schema of the returned DataFrame becomes: @@ -1229,7 +1231,7 @@ can be configured by `spark.sql.sources.partitionColumnTypeInference.enabled`, w Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths by default. For the above example, if users pass `path/to/table/gender=male` to either -`SQLContext.read.parquet` or `SQLContext.read.load`, `gender` will not be considered as a +`SparkSession.read.parquet` or `SparkSession.read.load`, `gender` will not be considered as a partitioning column. If users need to specify the base path that partition discovery should start with, they can set `basePath` in the data source options. For example, when `path/to/table/gender=male` is the path of the data and @@ -1254,9 +1256,9 @@ turned it off by default starting from 1.5.0. You may enable it by
{% highlight scala %} -// sqlContext from the previous example is used in this example. +// spark:SparkSession from the previous example is used in this example. // This is used to implicitly convert an RDD to a DataFrame. -import sqlContext.implicits._ +import spark.implicits._ // Create a simple DataFrame, stored into a partition directory val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double") @@ -1268,7 +1270,7 @@ val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple") df2.write.parquet("data/test_table/key=2") // Read the partitioned table -val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") +val df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() // The final schema consists of all 3 columns in the Parquet files together @@ -1285,21 +1287,21 @@ df3.printSchema()
{% highlight python %} -# sqlContext from the previous example is used in this example. +# spark:SparkSession from the previous example is used in this example. # Create a simple DataFrame, stored into a partition directory -df1 = sqlContext.createDataFrame(sc.parallelize(range(1, 6))\ +df1 = spark.createDataFrame(sc.parallelize(range(1, 6))\ .map(lambda i: Row(single=i, double=i * 2))) df1.write.parquet("data/test_table/key=1") # Create another DataFrame in a new partition directory, # adding a new column and dropping an existing column -df2 = sqlContext.createDataFrame(sc.parallelize(range(6, 11)) +df2 = spark.createDataFrame(sc.parallelize(range(6, 11)) .map(lambda i: Row(single=i, triple=i * 3))) df2.write.parquet("data/test_table/key=2") # Read the partitioned table -df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table") +df3 = spark.read.option("mergeSchema", "true").parquet("data/test_table") df3.printSchema() # The final schema consists of all 3 columns in the Parquet files together @@ -1417,7 +1419,7 @@ REFRESH TABLE my_table; ### Configuration -Configuration of Parquet can be done using the `setConf` method on `SQLContext` or by running +Configuration of Parquet can be done using the `conf.set` method on `SparkSession` or by running `SET key=value` commands using SQL. @@ -1484,7 +1486,7 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. -This conversion can be done using `SQLContext.read.json()` on either an RDD of String, +This conversion can be done using `SparkSession.read.json()` on either an RDD of String, or a JSON file. Note that the file that is offered as _a json file_ is not a typical JSON file. Each @@ -1492,13 +1494,14 @@ line must contain a separate, self-contained valid JSON object. As a consequence a regular multi-line JSON file will most often fail. {% highlight scala %} -// sc is an existing SparkContext. -val sqlContext = new org.apache.spark.sql.SQLContext(sc) +// spark is an existing SparkSession. +val spark = ... +val sc = spark.sparkContext // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. val path = "examples/src/main/resources/people.json" -val people = sqlContext.read.json(path) +val people = spark.read.json(path) // The inferred schema can be visualized using the printSchema() method. people.printSchema() @@ -1509,21 +1512,21 @@ people.printSchema() // Creates a temporary view using the DataFrame people.createOrReplaceTempView("people") -// SQL statements can be run by using the sql methods provided by sqlContext. -val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +// SQL statements can be run by using the sql methods provided by SparkSession. +val teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. val anotherPeopleRDD = sc.parallelize( """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil) -val anotherPeople = sqlContext.read.json(anotherPeopleRDD) +val anotherPeople = spark.read.json(anotherPeopleRDD) {% endhighlight %}
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. -This conversion can be done using `SQLContext.read().json()` on either an RDD of String, +This conversion can be done using `SparkSession.read().json()` on either an RDD of String, or a JSON file. Note that the file that is offered as _a json file_ is not a typical JSON file. Each @@ -1531,12 +1534,13 @@ line must contain a separate, self-contained valid JSON object. As a consequence a regular multi-line JSON file will most often fail. {% highlight java %} -// sc is an existing JavaSparkContext. -SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc); +// spark is an existing SparkSession. +SparkSession spark = ...; +JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); // A JSON dataset is pointed to by path. // The path can be either a single text file or a directory storing text files. -DataFrame people = sqlContext.read().json("examples/src/main/resources/people.json"); +DataFrame people = spark.read().json("examples/src/main/resources/people.json"); // The inferred schema can be visualized using the printSchema() method. people.printSchema(); @@ -1547,21 +1551,21 @@ people.printSchema(); // Creates a temporary view using the DataFrame people.createOrReplaceTempView("people"); -// SQL statements can be run by using the sql methods provided by sqlContext. -DataFrame teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); +// SQL statements can be run by using the sql methods provided by SparkSession. +DataFrame teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19"); // Alternatively, a DataFrame can be created for a JSON dataset represented by // an RDD[String] storing one JSON object per string. List jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD anotherPeopleRDD = sc.parallelize(jsonData); -DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD); +DataFrame anotherPeople = spark.read().json(anotherPeopleRDD); {% endhighlight %}
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. -This conversion can be done using `SQLContext.read.json` on a JSON file. +This conversion can be done using `SparkSession.read.json` on a JSON file. Note that the file that is offered as _a json file_ is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail. {% highlight python %} # sc is an existing SparkContext. -from pyspark.sql import SQLContext -sqlContext = SQLContext(sc) +from pyspark.sql import SparkSession +spark = ... # spark is an existing SparkSession. +sc = spark.sparkContext() # A JSON dataset is pointed to by path. # The path can be either a single text file or a directory storing text files. -people = sqlContext.read.json("examples/src/main/resources/people.json") +people = spark.read.json("examples/src/main/resources/people.json") # The inferred schema can be visualized using the printSchema() method. people.printSchema() @@ -1585,14 +1590,14 @@ people.printSchema() # Creates a temporary view using the DataFrame. people.createOrReplaceTempView("people") -# SQL statements can be run by using the sql methods provided by `sqlContext`. -teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") +# SQL statements can be run by using the sql methods provided by `SparkSession`. +teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") # Alternatively, a DataFrame can be created for a JSON dataset represented by # an RDD[String] storing one JSON object per string. anotherPeopleRDD = sc.parallelize([ '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']) -anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD) +anotherPeople = spark.read.json(anotherPeopleRDD) {% endhighlight %}
@@ -1896,7 +1901,7 @@ the Data Sources API. The following options are supported:
{% highlight scala %} -val jdbcDF = sqlContext.read.format("jdbc").options( +val jdbcDF = spark.read.format("jdbc").options( Map("url" -> "jdbc:postgresql:dbserver", "dbtable" -> "schema.tablename")).load() {% endhighlight %} @@ -1911,7 +1916,7 @@ Map options = new HashMap<>(); options.put("url", "jdbc:postgresql:dbserver"); options.put("dbtable", "schema.tablename"); -DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load(); +DataFrame jdbcDF = spark.read().format("jdbc").options(options).load(); {% endhighlight %} @@ -1921,7 +1926,7 @@ DataFrame jdbcDF = sqlContext.read().format("jdbc"). options(options).load(); {% highlight python %} -df = sqlContext.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='schema.tablename').load() +df = spark.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='schema.tablename').load() {% endhighlight %} @@ -1966,11 +1971,11 @@ turning on some experimental options. ## Caching Data In Memory -Spark SQL can cache tables using an in-memory columnar format by calling `sqlContext.cacheTable("tableName")` or `dataFrame.cache()`. +Spark SQL can cache tables using an in-memory columnar format by calling `sparkSession.catalog.cacheTable("tableName")` or `dataFrame.cache()`. Then Spark SQL will scan only required columns and will automatically tune compression to minimize -memory usage and GC pressure. You can call `sqlContext.uncacheTable("tableName")` to remove the table from memory. +memory usage and GC pressure. You can call `sparkSession.catalog.uncacheTable("tableName")` to remove the table from memory. -Configuration of in-memory caching can be done using the `setConf` method on `SQLContext` or by running +Configuration of in-memory caching can be done using the `conf.set` method on `SparkSession` or by running `SET key=value` commands using SQL.
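For reference, a minimal sketch (not part of the diff) of the caching calls described above, assuming an existing `SparkSession` named `spark` and a temporary view named `people`:

{% highlight scala %}
// Cache the view "people" using the in-memory columnar format.
spark.catalog.cacheTable("people")

// In-memory caching can be tuned through the runtime configuration, for example:
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")

// Remove the cached data from memory once it is no longer needed.
spark.catalog.uncacheTable("people")
{% endhighlight %}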
From 20bff4b6ae551d4a367dcc57e56bb1df2fc63bad Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 7 Jun 2016 06:17:20 -0700 Subject: [PATCH 2/4] update JavaSparkContext where forget to modify --- docs/sql-programming-guide.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 8e81b9cc5efbb..ae216bed8a6b3 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -579,7 +579,7 @@ for the JavaBean. {% highlight java %} SparkSession spark = ...; // An existing SparkSession. -SparkContext sc = spark.sparkContext(); +JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); // Load a text file and convert each line to a JavaBean. JavaRDD people = sc.textFile("examples/src/main/resources/people.txt").map( @@ -737,7 +737,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; SparkSession spark = ...; // An existing SparkSession. -SparkContext sc = spark.sparkContext(); +JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); // Load a text file and convert each line to a JavaBean. JavaRDD people = sc.textFile("examples/src/main/resources/people.txt"); From e86119e5ea59d1cafc4f3fc1884b1ae5044adf8f Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 7 Jun 2016 06:41:14 -0700 Subject: [PATCH 3/4] sparkSession.catalog.refreshTable --- docs/sql-programming-guide.md | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index ae216bed8a6b3..34ffb68447c9d 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1383,8 +1383,8 @@ metadata.
{% highlight scala %} -// sqlContext is an existing HiveContext -sqlContext.refreshTable("my_table") +// spark is an existing SparkSession +spark.catalog.refreshTable("my_table") {% endhighlight %}
@@ -1392,8 +1392,8 @@ sqlContext.refreshTable("my_table")
{% highlight java %} -// sqlContext is an existing HiveContext -sqlContext.refreshTable("my_table") +// spark is an existing SparkSession +spark.catalog().refreshTable("my_table"); {% endhighlight %}
@@ -1401,8 +1401,8 @@ sqlContext.refreshTable("my_table")
{% highlight python %} -# sqlContext is an existing HiveContext -sqlContext.refreshTable("my_table") +# spark is an existing SparkSession +spark.catalog.refreshTable("my_table") {% endhighlight %}
@@ -2157,7 +2157,7 @@ options. - JSON data source will not automatically load new files that are created by other applications (i.e. files that are not inserted to the dataset through Spark SQL). For a JSON persistent table (i.e. the metadata of the table is stored in Hive Metastore), - users can use `REFRESH TABLE` SQL command or `HiveContext`'s `refreshTable` method + users can use `REFRESH TABLE` SQL command or `SparkSession.catalog`'s `refreshTable` method to include those new files to the table. For a DataFrame representing a JSON dataset, users need to recreate the DataFrame and the new DataFrame will include new files. - DataFrame.withColumn method in pySpark supports adding a new column or replacing existing columns of the same name. From 254ea7fcf66b08ff4893357e81b832f11a7be889 Mon Sep 17 00:00:00 2001 From: WeichenXu Date: Tue, 7 Jun 2016 13:12:33 -0700 Subject: [PATCH 4/4] correct python sparkContext --- docs/sql-programming-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 34ffb68447c9d..ed58e206ee1da 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1575,7 +1575,7 @@ a regular multi-line JSON file will most often fail. # sc is an existing SparkContext. from pyspark.sql import SparkSession spark = ... # spark is an existing SparkSession. -sc = spark.sparkContext() +sc = spark.sparkContext # A JSON dataset is pointed to by path. # The path can be either a single text file or a directory storing text files.
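Taken together, these patches move the guide's examples onto the `SparkSession` entry point. A minimal end-to-end sketch of that flow, using the app name and JSON path from the examples above (illustrative only, not part of the diff):

{% highlight scala %}
import org.apache.spark.sql.SparkSession

// Create (or reuse) a session; add .enableHiveSupport() on the builder for the Hive
// features described in PATCH 1/4 (requires the Hive classes on the classpath).
val spark = SparkSession.builder
  .appName("theAppName")
  .getOrCreate()

// Implicit conversions such as turning RDDs and Seqs into DataFrames.
import spark.implicits._

// Read a JSON data source and query it through a temporary view.
val people = spark.read.json("examples/src/main/resources/people.json")
people.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19").show()

// For metastore-backed tables, cached metadata can be refreshed as in PATCH 3/4:
// spark.catalog.refreshTable("my_table")

spark.stop()
{% endhighlight %}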