# Spark SQL and DataFrames

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.

## Architecture of Spark SQL

![Spark SQL Architecture](sparksql_arch.png)

## Major components of SparkSQL
- Datasets: A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API.
- DataFrames: A DataFrame is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. 
- Catalyst Optimizer:  Catalyst optimizer enables several key features, such as query optimization and schema inference (from JSON data).

## Starting Point: SparkSession
SparkSession was introduced in Spark version 2. It is the entry point into all functionality in Spark. To create a basic SparkSession, just use SparkSession.builder:

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .enableHiveSupport() \
    .getOrCreate()

### How is SparkSession different from SparkContext we learned last time?
Prior to Spark 2.0, SparkContext was the entry point for Spark applications with RDDs. For every other APIs, different contexts were required - For SQL, SQL Context was required; For Streaming, Streaming Context was required; For Hive, Hive Context was required. Starting from Apache Spark 2.0, Spark Session is the new entry point for all these Spark applications.

## Creating DataFrames

With a SparkSession, applications can create DataFrames from an existing RDD, from a Hive table, or from Spark data sources such as json or csv files. As an example, the following creates a DataFrame based on the content of a JSON file:

In [2]:
# spark is an existing SparkSession
df = spark.read.json("people.json")
# Displays the content of the DataFrame in a tabular format to stdout
df.show()
# last time, we learned the method collect(), which also works here, but it will output a list of rows.
print(df.collect())
# you can also sqlContext to create a DataFrame
dfSQL = sqlContext.read.json("people.json")
dfSQL.show()

+---+-------+
|age|   name|
+---+-------+
| 22|   John|
| 36| Andrew|
| 22| Clarke|
| 42|  Kevin|
| 51|Richard|
+---+-------+

[Row(age=22, name='John'), Row(age=36, name='Andrew'), Row(age=22, name='Clarke'), Row(age=42, name='Kevin'), Row(age=51, name='Richard')]
+---+-------+
|age|   name|
+---+-------+
| 22|   John|
| 36| Andrew|
| 22| Clarke|
| 42|  Kevin|
| 51|Richard|
+---+-------+



## DataFrame Operations

In [3]:
# spark, df are from the previous example
# Print the schema in a tree format
df.printSchema()

# Select only the "name" column
df.select("name").show()

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

# Select people older than 21
df.filter(df['age'] > 21).show()

# Count people by age
df.groupBy("age").count().show()

# we are doing dataframe transformations.
print(type(df.groupBy("age").count()))

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

+-------+
|   name|
+-------+
|   John|
| Andrew|
| Clarke|
|  Kevin|
|Richard|
+-------+

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|   John|       23|
| Andrew|       37|
| Clarke|       23|
|  Kevin|       43|
|Richard|       52|
+-------+---------+

+---+-------+
|age|   name|
+---+-------+
| 22|   John|
| 36| Andrew|
| 22| Clarke|
| 42|  Kevin|
| 51|Richard|
+---+-------+

+---+-----+
|age|count|
+---+-----+
| 22|    2|
| 51|    1|
| 36|    1|
| 42|    1|
+---+-----+

<class 'pyspark.sql.dataframe.DataFrame'>


There are many functions defined for DataFrame, you can refer to https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame for the complete list of functions.

DataFrames also have a rich library of functions including string manipulation, date arithmetic, common math operations and more. The complete list is available in https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#module-pyspark.sql.functions. Below is the example for computing the mean of ages. 

In [4]:
from pyspark.sql import functions as F
df.agg(F.avg(df['age'])).collect()

[Row(avg(age)=34.6)]

## Running SQL Queries Programmatically

In [5]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+---+-------+
|age|   name|
+---+-------+
| 22|   John|
| 36| Andrew|
| 22| Clarke|
| 42|  Kevin|
| 51|Richard|
+---+-------+



### Global Temporary View
Temporary views in Spark SQL are session-scoped and will disappear if the session that creates it terminates. If you want to have a temporary view that is shared among all sessions and keep alive until the Spark application terminates, you can create a global temporary view. Global temporary view is tied to a system preserved database global_temp, and we must use the qualified name to refer it, e.g. SELECT * FROM global_temp.view1.

In [6]:
# Register the DataFrame as a global temporary view
df.createOrReplaceGlobalTempView("people")

# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

+---+-------+
|age|   name|
+---+-------+
| 22|   John|
| 36| Andrew|
| 22| Clarke|
| 42|  Kevin|
| 51|Richard|
+---+-------+

+---+-------+
|age|   name|
+---+-------+
| 22|   John|
| 36| Andrew|
| 22| Clarke|
| 42|  Kevin|
| 51|Richard|
+---+-------+



## Interoperating with RDDs
Spark SQL supports two different methods for converting existing RDDs into DataFrames/Datasets. The first method uses reflection to infer the schema of an RDD that contains specific types of objects. This reflection-based approach leads to more concise code and works well when you already know the schema while writing your Spark application.

The second method for creating DataFrames/Datasets is through a programmatic interface that allows you to construct a schema and then apply it to an existing RDD. While this method is more verbose, it allows you to construct Datasets when the columns and their types are not known until runtime.

### Inferring the Schema Using Reflection
Spark SQL can convert an RDD of Row objects to a DataFrame, inferring the datatypes. Rows are constructed by passing a list of key/value pairs as kwargs to the Row class. The keys of this list define the column names of the table, and the types are inferred by sampling the whole dataset, similar to the inference that is performed on JSON files.

In [7]:
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
print(parts.collect())
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
print(people.collect())

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.printSchema()
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
youths = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 30")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
print(youths.rdd.collect())
someNames = youths.rdd.map(lambda p: "Name: " + p['name']).collect()
for name in someNames:
    print(name)

[['John', '22'], ['Andrew', '36'], ['Clarke', '22'], ['Kevin', '42'], ['Richard', '51']]
[Row(age=22, name='John'), Row(age=36, name='Andrew'), Row(age=22, name='Clarke'), Row(age=42, name='Kevin'), Row(age=51, name='Richard')]
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

[Row(age=22, name='John'), Row(age=22, name='Clarke')]
Name: John
Name: Clarke


### Programmatically Specifying the Schema

When a dictionary of kwargs cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

1. Create an RDD of tuples or lists from the original RDD;
2. Create the schema represented by a StructType matching the structure of tuples or lists in the RDD created in the step 1.
3. Apply the schema to the RDD via createDataFrame method provided by SparkSession.
For example:

In [8]:
# Import data types
from pyspark.sql.types import *

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
print(fields)

schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
schemaPeople.printSchema()

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")
results.show()

# how to change "age" to integer? simple:
df = spark.sql("select name, int(age) as age from people")
df.printSchema()
df.show()

[StructField(name,StringType,true), StructField(age,StringType,true)]
root
 |-- name: string (nullable = true)
 |-- age: string (nullable = true)

+-------+
|   name|
+-------+
|   John|
| Andrew|
| Clarke|
|  Kevin|
|Richard|
+-------+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|   John| 22|
| Andrew| 36|
| Clarke| 22|
|  Kevin| 42|
|Richard| 51|
+-------+---+



## How to read CSV file?

### Generic Load Functions 
You can also manually specify the data source that will be used along with any extra options that you would like to pass to the data source. 

In [9]:
# if the CSV file has a header
df = spark.read.load("people.csv",format="csv", sep=",", inferSchema="true", header="true")
df.show()
df.printSchema()
# if the CSV file does not have a header
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)])
df = spark.read.load("people2.csv",format="csv", sep=",", header="false", schema=schema)
df.show()
df.printSchema()

+-------+---+
|   name|age|
+-------+---+
|   John| 22|
| Andrew| 36|
| Clarke| 22|
|  Kevin| 42|
|Richard| 51|
+-------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

+-------+---+
|   name|age|
+-------+---+
|   John| 22|
| Andrew| 36|
| Clarke| 22|
|  Kevin| 42|
|Richard| 51|
+-------+---+

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



In [10]:
# you can also use the generic load function to read other types of files. For example:
df = spark.read.load("people.json", format="json")
df.show()
df.printSchema()

+---+-------+
|age|   name|
+---+-------+
| 22|   John|
| 36| Andrew|
| 22| Clarke|
| 42|  Kevin|
| 51|Richard|
+---+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

