# Basics of Spark on HDInsight

<a href="http://spark.apache.org/" target="_blank">Apache Spark</a> is an open-source parallel processing framework that supports in-memory processing to boost the performance of big-data analytic applications. When you provision a Spark cluster in HDInsight, you provision Azure compute resources with Spark installed and configured. The data to be processed is stored in Azure Blob storage (WASB).

![Spark on HDInsight](https://mysstorage.blob.core.windows.net/notebookimages/overview/SparkArchitecture.png "Spark on HDInsight")

Now that you have created a Spark cluster, let's look at some basics of working with Spark on HDInsight. For a detailed discussion of working with Spark, see the [Spark Programming Guide](https://spark.apache.org/docs/1.6.2/programming-guide.html).

----------
## Notebook setup

When you use Spark kernel notebooks on HDInsight, you don't need to create a SparkContext or a HiveContext. Both are created automatically when you run the first code cell, and the progress is printed in the notebook. The contexts are available under the following variable names:
- SparkContext (`sc`)
- HiveContext (`sqlContext`)

To run the cells below, place the cursor in the cell and then press **SHIFT + ENTER**.
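As a quick check, a cell like the following prints a few properties of the automatically created contexts. This is a minimal sketch that assumes the kernel has already created `sc` and `sqlContext` as described above; it only reads existing properties and does not create anything.

```scala
// Assumes `sc` (SparkContext) and `sqlContext` were created by the kernel.
println(s"Spark version: ${sc.version}")
println(s"Application name: ${sc.appName}")
// Number of partitions Spark uses by default, e.g. for sc.parallelize.
println(s"Default parallelism: ${sc.defaultParallelism}")
// Shows which SQL context implementation the kernel provided.
println(s"SQL context class: ${sqlContext.getClass.getName}")
```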

----------

## What is an RDD?

Big data applications often rely on iterative, distributed computing, in which the same data is reused or shared across multiple jobs. In earlier distributed computing systems, sharing data between jobs meant writing it to an intermediate stable distributed store such as HDFS, and that extra disk I/O slows down the overall computation.

**Resilient Distributed Datasets** (RDDs) address this by enabling fault-tolerant, distributed, in-memory computations. An RDD is an immutable collection of records partitioned across the cluster; it can be kept in memory and reused across jobs, and if a partition is lost, Spark recomputes it from the RDD's lineage (the transformations that produced it).
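A minimal sketch of the in-memory reuse described above, using a made-up range of numbers rather than the sample files. `cache()` asks Spark to keep an RDD's partitions in memory once the first action computes them, and `toDebugString` prints the lineage Spark would replay to rebuild a lost partition.

```scala
// Made-up data for illustration: the numbers 1 to 100000.
val numbers = sc.parallelize(1 to 100000)
val squares = numbers.map(n => n.toLong * n)

// Keep the computed partitions in memory after they are first computed.
squares.cache()

// The first action computes `squares` and fills the cache...
val total = squares.reduce(_ + _)
// ...and later actions reuse the cached partitions instead of recomputing the map.
val evenCount = squares.filter(_ % 2 == 0).count()

// The lineage (chain of parent RDDs) Spark uses for fault recovery.
println(squares.toDebugString)
println(s"sum of squares = $total, even squares = $evenCount")
```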

----------

## How do I make an RDD?

RDDs can be created from data in stable storage, from a collection in the driver program (with `sc.parallelize`), or by transforming other RDDs. Run the cell below to create RDDs from sample data files in the storage container associated with your Spark cluster. The cell reads two such files: `wasb:///example/data/fruits.txt` and `wasb:///example/data/yellowthings.txt`.

To run the snippets in the cells, place the cursor in the cell below and press **SHIFT + ENTER**. You can also click the **Run Cell** button from the menu above.

```scala
val fruits = sc.textFile("wasb:///example/data/fruits.txt")
val yellowThings = sc.textFile("wasb:///example/data/yellowthings.txt")
```
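The other two ways of making an RDD can be sketched as follows. The fruit names are made up for illustration and are not the contents of `fruits.txt`. `sc.parallelize` distributes a local collection (here into an explicit 2 partitions), and `sc.textFile` accepts a minimum number of partitions as an optional second argument; Spark treats that number as a hint, so the actual count can differ.

```scala
// Create an RDD from a local Scala collection, split into 2 partitions.
val localFruits = sc.parallelize(Seq("apple", "banana", "cherry", "kiwi"), numSlices = 2)

// Create an RDD by transforming another RDD.
val upperFruits = localFruits.map(_.toUpperCase)

// Read the same sample file, suggesting at least 4 partitions.
val fruits4 = sc.textFile("wasb:///example/data/fruits.txt", 4)

println(s"localFruits: ${localFruits.partitions.length} partitions")
println(s"fruits4: ${fruits4.partitions.length} partitions")
```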

For more examples on how to create RDDs see the following notebooks available with your Spark cluster:

* Read and write data from Azure Storage Blobs (WASB)
* Read and write data from Hive tables

----------

## What are RDD operations?
RDDs support two types of operations: transformations and actions.

* **Transformations** create a new dataset from an existing one. Transformations are lazy, meaning that no transformation is executed until you execute an action.
* **Actions** return a value to the driver program after running a computation on the dataset.
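Laziness can be observed with an accumulator that counts how often a `map` function runs. This is a minimal sketch using the Spark 1.x `sc.accumulator` API (deprecated, but still present, in Spark 2.x) on made-up data. Note that Spark does not guarantee exactly-once accumulator updates inside transformations: a retried task can add to the count again, so treat the printed numbers as illustrative.

```scala
// Counts invocations of the map function (Spark 1.x accumulator API).
val mapCalls = sc.accumulator(0)

// Defining a transformation only records it; nothing runs yet.
val doubled = sc.parallelize(1 to 10).map { n =>
  mapCalls += 1 // executes on the executors, only when the map actually runs
  n * 2
}
println(s"map calls after the transformation: ${mapCalls.value}") // 0

// An action forces Spark to execute the map.
val result = doubled.collect()
println(s"map calls after the action: ${mapCalls.value}") // 10, unless tasks were retried
```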

### RDD transformations
The following are examples of some common transformations. For a complete list, see [RDD Transformations](https://spark.apache.org/docs/1.6.2/programming-guide.html#transformations).

Run some transformations below to understand this better. Place the cursor in the cell and press **SHIFT + ENTER**.

```scala
/* map */
val fruitsReversed = fruits.map((fruit) => fruit.reverse)
```

```scala
/* filter */
val shortFruits = fruits.filter((fruit) => fruit.length <= 5)
```

```scala
/* flatMap */
val characters = fruits.flatMap((fruit) => fruit.toList)
```

```scala
/* union */
val fruitsAndYellowThings = fruits.union(yellowThings)
```

```scala
/* intersection */
val yellowFruits = fruits.intersection(yellowThings)
```

```scala
/* distinct */
val distinctFruitsAndYellowThings = fruitsAndYellowThings.distinct()
```

```scala
/* groupByKey */
// Skip empty lines first: thing(0) throws an exception on an empty string.
val yellowThingsByFirstLetter = yellowThings.filter((thing) => thing.nonEmpty).map((thing) => (thing(0), thing)).groupByKey()
```

```scala
/* reduceByKey */
val numFruitsByLength = fruits.map((fruit) => (fruit.length, 1)).reduceByKey((x, y) => x + y)
```
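The cells above only define new RDDs, so they print nothing. To see what each transformation produces, a sketch like the following runs the same transformations on small, made-up stand-ins for `fruits.txt` and `yellowthings.txt` (the names `sampleFruits` and `sampleYellow` are hypothetical) and brings the results back with `collect()`. Results of pair-RDD operations come back in no particular order, so they are sorted by key first.

```scala
// Hypothetical in-memory stand-ins for the two sample files.
val sampleFruits = sc.parallelize(Seq("apple", "banana", "kiwi", "lemon"))
val sampleYellow = sc.parallelize(Seq("banana", "lemon", "sun", "school bus"))

// collect() is an action: it runs the transformation and returns an Array.
val reversed = sampleFruits.map(_.reverse).collect()                  // elppa, ananab, iwik, nomel
val shortOnes = sampleFruits.filter(_.length <= 5).collect()          // apple, kiwi, lemon
val both = sampleFruits.intersection(sampleYellow).collect().sorted   // banana, lemon
val numDistinct = sampleFruits.union(sampleYellow).distinct().count() // 6

// Parentheses let the chain span several lines in a notebook cell.
val byFirstLetter = (sampleYellow.map(t => (t(0), t))
                                 .groupByKey()
                                 .mapValues(_.toList.sorted)
                                 .sortByKey()
                                 .collect())
// (b,List(banana)), (l,List(lemon)), (s,List(school bus, sun))

val byLength = (sampleFruits.map(f => (f.length, 1))
                            .reduceByKey(_ + _)
                            .sortByKey()
                            .collect())
// (4,1), (5,2), (6,1)

println(reversed.mkString(", "))
println(byLength.mkString(", "))
```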

### RDD actions
The following are examples of some common actions. For a complete list, see [RDD Actions](https://spark.apache.org/docs/1.6.2/programming-guide.html#actions).

Run some actions below to understand this better. Place the cursor in the cell and press **SHIFT + ENTER**.

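```scala
/* collect */
// collect() returns every element to the driver, so use it only on small RDDs.
val fruitsArray = fruits.collect()
val yellowThingsArray = yellowThings.collect()
```

```scala
/* count */
val numFruits = fruits.count()
```

```scala
/* take */
val first3Fruits = fruits.take(3)
```

```scala
/* reduce */
// Combine the set of letters in each fruit name into one set of all letters used.
val letterSet = fruits.map((fruit) => fruit.toSet).reduce((x, y) => x ++ y)
```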
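A few more actions from the RDD API, shown on a small made-up RDD so the results are predictable: `first`, `takeOrdered` with an explicit ordering, `countByValue`, and `fold`. As with `collect`, each of these returns its result to the driver.

```scala
// Hypothetical sample data.
val sample = sc.parallelize(Seq("apple", "banana", "kiwi", "apple"))

// The first element of the RDD.
val firstFruit = sample.first() // apple

// The 2 smallest elements under a custom ordering (by string length).
val shortestTwo = sample.takeOrdered(2)(Ordering.by((s: String) => s.length)) // kiwi, apple

// How many times each distinct value occurs, as a Map on the driver.
val counts = sample.countByValue() // apple -> 2, banana -> 1, kiwi -> 1

// Like reduce, but starting from a zero value (here: total number of characters).
val totalChars = sample.map(_.length).fold(0)(_ + _) // 20

println(s"$firstFruit | ${shortestTwo.mkString(", ")} | $counts | $totalChars")
```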

> **IMPORTANT**: Another important RDD action is saving the output to a file. See the **Read and write data from Azure Storage Blobs (WASB)** notebook for more information.
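For orientation before opening that notebook, a minimal sketch of `saveAsTextFile`. The output folder `wasb:///example/output/fruitsReversed` is a hypothetical path chosen for illustration. Spark writes a folder containing one `part-NNNNN` file per partition, and the call fails if the folder already exists, so change or delete the path before running the cell again.

```scala
// Recompute the reversed fruit names so this cell does not depend on earlier cells.
val reversedFruits = sc.textFile("wasb:///example/data/fruits.txt").map(_.reverse)

// Hypothetical output folder; saveAsTextFile fails if it already exists.
val outputPath = "wasb:///example/output/fruitsReversed"
reversedFruits.saveAsTextFile(outputPath)

// Read the folder back as a new RDD to confirm that every line was written.
val reloaded = sc.textFile(outputPath)
println(s"written: ${reversedFruits.count()}, read back: ${reloaded.count()}")
```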

----------

## What is a dataframe?

The Spark SQL library (`org.apache.spark.sql`) provides an alternative API for manipulating structured datasets, known as dataframes. Dataframes are not a Spark-specific concept, but Spark SQL provides its own implementation: a distributed collection of data organized into named columns. Dataframes are different from RDDs, but you can convert an RDD into a dataframe, or a dataframe into an RDD, if required.

See [Spark SQL and DataFrame Guide](https://spark.apache.org/docs/1.6.2/sql-programming-guide.html#dataframes) for more information.
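Converting in both directions can be sketched as follows. This assumes `sqlContext` is available as a stable value in the notebook (needed for `import sqlContext.implicits._`), and the tuple data and column names are made up for illustration. `toDF` names the columns of an RDD of tuples, and `.rdd` turns a dataframe back into an RDD of `Row` objects.

```scala
// Implicit conversions that add toDF to RDDs of tuples and case classes.
import sqlContext.implicits._

// RDD -> dataframe: each tuple becomes a row with the given column names.
val fruitLengths = sc.parallelize(Seq(("apple", 5), ("kiwi", 4))).toDF("fruit", "length")
fruitLengths.show()

// Dataframe -> RDD: .rdd returns an RDD[Row]; read fields by position.
val fruitNames = fruitLengths.rdd.map(row => row.getString(0)).collect()
println(fruitNames.mkString(", ")) // apple, kiwi
```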

### How do I make a dataframe?

You can load a dataframe directly from an input data source. See the following notebooks included with your Spark cluster for more information.

* Read and write data from Azure Storage Blobs (WASB)
* Read and write data from Hive tables

You can also create a dataframe from an RDD by specifying the schema of the dataframe as shown in the snippet below.

```scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.Row

// Parse building.csv into Rows, skipping the header line.
val buildings = (sc.textFile("wasb:///HdiSamples/HdiSamples/SensorSampleData/building/building.csv")
                   .map((line) => line.split(","))
                   .filter((r) => r(0) != "BuildingID")
                   .map((r) => Row(r(0).toInt, r(1), r(2).toInt, r(3), r(4))))

// Column names, types, and nullability, in the same order as the Row fields.
val schema = StructType(List(StructField("BuildingID", IntegerType, true),
                             StructField("BuildingMgr", StringType, true),
                             StructField("BuildingAge", IntegerType, true),
                             StructField("HVACProduct", StringType, true),
                             StructField("Country", StringType, true)))
val df = sqlContext.createDataFrame(buildings, schema)
```
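To confirm that the schema was applied, inspect the new dataframe. `printSchema` prints the column names, types, and nullability from the `StructType` defined above, and `show(3)` displays the first three rows as a text table.

```scala
// Prints:
// root
//  |-- BuildingID: integer (nullable = true)
//  |-- BuildingMgr: string (nullable = true)
//  |-- BuildingAge: integer (nullable = true)
//  |-- HVACProduct: string (nullable = true)
//  |-- Country: string (nullable = true)
df.printSchema()

// The first three rows of the data.
df.show(3)

// Row count (the header line was filtered out when building the RDD).
println(s"Number of buildings: ${df.count()}")
```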

-------

## Spark SQL and dataframes

You can run SQL queries over dataframes once you register them as temporary tables within the SQL context. Run the snippet below to see an example.

```scala
/* Register the dataframe as a temporary table called HVAC */
df.registerTempTable("HVAC")
```
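Once the table is registered, the same kind of query can also be run from Scala with `sqlContext.sql`, which returns an ordinary dataframe that you can keep transforming. This is a minimal sketch against the `HVAC` table registered above. In Spark 2.x, `registerTempTable` is deprecated in favor of `createOrReplaceTempView`.

```scala
// Run SQL against the registered HVAC table; the result is a dataframe.
val oldBuildings = sqlContext.sql(
  "SELECT BuildingID, Country FROM HVAC WHERE BuildingAge >= 10")

// Display up to 5 rows of the result.
oldBuildings.show(5)
```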

The HDInsight Spark kernel supports inline SQL queries. Type `%%sql` followed by a SQL query to run the query against a registered temporary table.

```
%%sql
SELECT * FROM HVAC WHERE BuildingAge >= 10
```

```
%%sql
SELECT BuildingID, Country FROM HVAC LIMIT 3
```
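The same two queries can also be written with the DataFrame API instead of SQL. This sketch uses `filter`, `select`, and `limit` on the `df` dataframe created earlier; Spark plans both forms in the same way, so the choice is mostly a matter of style.

```scala
// Equivalent of: SELECT * FROM HVAC WHERE BuildingAge >= 10
val atLeast10 = df.filter(df("BuildingAge") >= 10)
atLeast10.show()

// Equivalent of: SELECT BuildingID, Country FROM HVAC LIMIT 3
val firstThree = df.select("BuildingID", "Country").limit(3)
firstThree.show()
```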