# Testing Spark SQL

Note that this Cloudera version comes with Spark 1.6. Spark 2.0 is the new and powerful version.

Source: some materials from Databricks courses at https://databricks.com/training

### Spark Context

In Spark, communication occurs between a driver and executors.  The driver has Spark jobs that it needs to run and these jobs are split into tasks that are submitted to the executors for completion.  The results from these tasks are delivered back to the driver.

In part 1, we saw that normal Python code can be executed via cells. When using Databricks this code gets executed in the Spark driver's Java Virtual Machine (JVM) and not in an executor's JVM, and when using an Jupyter notebook it is executed within the kernel associated with the notebook. Since no Spark functionality is actually being used, no tasks are launched on the executors.

In order to use Spark and its DataFrame API we will need to use a `SQLContext`.  When running Spark, you start a new Spark application by creating a [SparkContext](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext). You can then create a [SQLContext](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext) from the `SparkContext`. When the `SparkContext` is created, it asks the master for some cores to use to do work.  The master sets these cores aside just for you; they won't be used for other applications. When using Databricks, both a `SparkContext` and a `SQLContext` are created for you automatically. `sc` is your `SparkContext`, and `sqlContext` is your `SQLContext`.

### Example Cluster
The diagram shows an example cluster, where the slots allocated for an application are outlined in purple. (Note: We're using the term _slots_ here to indicate threads available to perform parallel work for Spark.
Spark documentation often refers to these threads as _cores_, which is a confusing term, as the number of slots available on a particular machine does not necessarily have any relationship to the number of physical CPU
cores on that machine.)

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-2a.png" style="height: 800px;float: right"/>

You can view the details of your Spark application in the Spark web UI.  The web UI is accessible in Databricks by going to "Clusters" and then clicking on the "Spark UI" link for your cluster.  In the web UI, under the "Jobs" tab, you can see a list of jobs that have been scheduled or run.  It's likely there isn't any thing interesting here yet because we haven't run any jobs, but we'll return to this page later.

At a high level, every Spark application consists of a driver program that launches various parallel operations on executor Java Virtual Machines (JVMs) running either in a cluster or locally on the same machine. In Databricks, "Databricks Shell" is the driver program.  When running locally, `pyspark` is the driver program. In all cases, this driver program contains the main loop for the program and creates distributed datasets on the cluster, then applies operations (transformations & actions) to those datasets.
Driver programs access Spark through a SparkContext object, which represents a connection to a computing cluster. A Spark SQL context object (`sqlContext`) is the main entry point for Spark DataFrame and SQL functionality. A `SQLContext` can be used to create DataFrames, which allows you to direct the operations on your data.

Try printing out `sqlContext` to see its type.

#### ** Part 1: Test Spark functionality **
Start creating a new SQLContext

In [26]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# Display the type of the Spark sqlContext
type(sqlContext)

pyspark.sql.context.SQLContext

In [27]:
# List sqlContext's attributes
dir(sqlContext)

['__class__',
 '__delattr__',
 '__dict__',
 '__doc__',
 '__format__',
 '__getattribute__',
 '__hash__',
 '__init__',
 '__module__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_createFromLocal',
 '_createFromRDD',
 '_inferSchema',
 '_inferSchemaFromList',
 '_instantiatedContext',
 '_jsc',
 '_jvm',
 '_sc',
 '_scala_SQLContext',
 '_ssql_ctx',
 'applySchema',
 'cacheTable',
 'clearCache',
 'createDataFrame',
 'createExternalTable',
 'dropTempTable',
 'getConf',
 'getOrCreate',
 'inferSchema',
 'jsonFile',
 'jsonRDD',
 'load',
 'newSession',
 'parquetFile',
 'range',
 'read',
 'registerDataFrameAsTable',
 'registerFunction',
 'setConf',
 'sql',
 'table',
 'tableNames',
 'tables',
 'udf',
 'uncacheTable']

In [28]:
# Use help to obtain more detailed information
help(sqlContext)

Help on SQLContext in module pyspark.sql.context object:

class SQLContext(__builtin__.object)
 |  Main entry point for Spark SQL functionality.
 |  
 |  A SQLContext can be used create :class:`DataFrame`, register :class:`DataFrame` as
 |  tables, execute SQL over tables, cache tables, and read parquet files.
 |  
 |  :param sparkContext: The :class:`SparkContext` backing this SQLContext.
 |  :param sqlContext: An optional JVM Scala SQLContext. If set, we do not instantiate a new
 |      SQLContext in the JVM, instead we make all calls to this object.
 |  
 |  Methods defined here:
 |  
 |  __init__(self, sparkContext, sqlContext=None)
 |      Creates a new SQLContext.
 |      
 |      >>> from datetime import datetime
 |      >>> sqlContext = SQLContext(sc)
 |      >>> allTypes = sc.parallelize([Row(i=1, s="string", d=1.0, l=1,
 |      ...     b=True, list=[1, 2, 3], dict={"s": 0}, row=Row(a=1),
 |      ...     time=datetime(2014, 8, 1, 14, 1, 5))])
 |      >>> df = allTypes.toDF()
 

In [30]:
# After reading the help we've decided we want to use sc.version to see what version of Spark we are running
sc.version

u'1.6.0'

### Distributed data and using a collection to create a DataFrame

In Spark, datasets are represented as a list of entries, where the list is broken up into many different partitions that are each stored on a different machine.  Each partition holds a unique subset of the entries in the list.  Spark calls datasets that it stores "Resilient Distributed Datasets" (RDDs). Even DataFrames are ultimately represented as RDDs, with additional meta-data.

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3b.png" style="width: 900px; float: right; margin: 5px"/>

One of the defining features of Spark, compared to other data analytics frameworks (e.g., Hadoop), is that it stores data in memory rather than on disk.  This allows Spark applications to run much more quickly, because they are not slowed down by needing to read data from disk.
The figure to the right illustrates how Spark breaks a list of data entries into partitions that are each stored in memory on a worker.


To create the DataFrame, we'll use `sqlContext.createDataFrame()`, and we'll pass our array of data in as an argument to that function. Spark will create a new set of input data based on data that is passed in.  A DataFrame requires a _schema_, which is a list of columns, where each column has a name and a type. Our list of data has elements with types (mostly strings, but one integer). We'll supply the rest of the schema and the column names as the second argument to `createDataFrame()`.

### Working with your first DataFrames

In Spark, we first create a base [DataFrame](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame). We can then apply one or more transformations to that base DataFrame. *A DataFrame is immutable, so once it is created, it cannot be changed.* As a result, each transformation creates a new DataFrame. Finally, we can apply one or more actions to the DataFrames.

> Note that Spark uses lazy evaluation, so transformations are not actually executed until an action occurs.

A DataFrame consists of a series of `Row` objects; each `Row` object has a set of named columns. You can think of a DataFrame as modeling a table, though the data source being processed does not have to be a table.

More formally, a DataFrame must have a _schema_, which means it must consist of columns, each of which has a _name_ and a _type_. Some data sources have schemas built into them. Examples include RDBMS databases, Parquet files, and NoSQL databases like Cassandra. Other data sources don't have computer-readable schemas, but you can often apply a schema programmatically.

In [2]:
# Check that Spark is working
from pyspark.sql import Row
data = [('Alice', 1), ('Bob', 2), ('Bill', 4)]
df = sqlContext.createDataFrame(data, ['name', 'age'])
fil = df.filter(df.age > 3).collect()
print fil

# If the Spark job doesn't work properly this will raise an AssertionError
assert fil == [Row(u'Bill', 4)]

[Row(name=u'Bill', age=4)]


** (2b) Loading a text file **

Let's load a text file.

In [5]:
dataDF = sqlContext.read.text("file:/home/cloudera/Code/resources/Shakespeare.txt")
shakespeareCount = dataDF.count()

print shakespeareCount

# If the text file didn't load properly an AssertionError will be raised
assert shakespeareCount == 124787

124787


### Use _collect_ to view results

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3d.png" style="height:700px;float:right"/>

To see a list of elements decremented by one, we need to create a new list on the driver from the the data distributed in the executor nodes.  To do this we can call the `collect()` method on our DataFrame.  `collect()` is often used after transformations to ensure that we are only returning a *small* amount of data to the driver.  This is done because the data returned to the driver must fit into the driver's available memory.  If not, the driver will crash.

The `collect()` method is the first action operation that we have encountered.  Action operations cause Spark to perform the (lazy) transformation operations that are required to compute the values returned by the action.  In our example, this means that tasks will now be launched to perform the `createDataFrame`, `select`, and `collect` operations.

In the diagram, the dataset is broken into four partitions, so four `collect()` tasks are launched. Each task collects the entries in its partition and sends the result to the driver, which creates a list of the values, as shown in the figure below.

A better way to visualize the data is to use the `show()` method. If you don't tell `show()` how many rows to display, it displays 20 rows.

In [7]:
dataDF.show()

+--------------------+
|               value|
+--------------------+
|The Project Guten...|
| William Shakespeare|
|                    |
|This eBook is for...|
|almost no restric...|
|re-use it under t...|
|with this eBook o...|
|                    |
|** This is a COPY...|
|**     Please fol...|
|                    |
|Title: The Comple...|
|                    |
|Author: William S...|
|                    |
|Posting Date: Sep...|
|Release Date: Jan...|
|                    |
|   Language: English|
|                    |
+--------------------+
only showing top 20 rows



Print the schema in tree format

In [13]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)



Select one column

In [14]:
df.select("name").show()

+-----+
| name|
+-----+
|Alice|
|  Bob|
| Bill|
+-----+



Be careful with dataFrames

Select everybody, but increment the age by 1

In [15]:
df.select(df['name'], df['age'] + 1).show()

+-----+---------+
| name|(age + 1)|
+-----+---------+
|Alice|        2|
|  Bob|        3|
| Bill|        5|
+-----+---------+



Select people older than 21

In [16]:
df.filter(df['age'] > 21).show()

+----+---+
|name|age|
+----+---+
+----+---+



### Use _count_ to get total

One of the most basic jobs that we can run is the `count()` job which will count the number of elements in a DataFrame, using the `count()` action. Since `select()` creates a new DataFrame with the same number of elements as the starting DataFrame, we expect that applying `count()` to each DataFrame will return the same result.

<img src="http://spark-mooc.github.io/web-assets/images/cs105x/diagram-3e.png" style="height:700px;float:right"/>

Note that because `count()` is an action operation, if we had not already performed an action with `collect()`, then Spark would now perform the transformation operations when we executed `count()`.

Each task counts the entries in its partition and sends the result to your SparkContext, which adds up all of the counts. The figure on the right shows what would happen if we ran `count()` on a small example dataset with just four partitions.

Count people by age

In [17]:
df.groupBy("age").count().show()

+---+-----+
|age|count|
+---+-----+
|  1|    1|
|  2|    1|
|  4|    1|
+---+-----+

