<a rel="license" href="http://creativecommons.org/licenses/by-nc-nd/4.0/"><img alt="Creative Commons License" style="border-width:0" src="https://i.creativecommons.org/l/by-nc-nd/4.0/88x31.png" /></a><br />This work is licensed under a <a rel="license" href="http://creativecommons.org/licenses/by-nc-nd/4.0/">Creative Commons Attribution-NonCommercial-NoDerivatives 4.0 International License</a>.

# SparkSession - a new entry point

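Spark 2.0 introduced `SparkSession`, a single entry point for working with structured data that subsumes the older `SQLContext` and `HiveContext`.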

### Creating a SparkSession

A SparkSession is created with a builder pattern. The builder reuses an existing SparkContext if one exists and creates a new SparkContext otherwise. Configuration options set on the builder are propagated to both the Spark and Hadoop configurations during I/O.

A SparkSession can be used to create DataFrames, register DataFrames as tables, execute SQL over tables, cache tables, and read Parquet files.

```python
from pyspark.sql import SparkSession

# "spark.some.config.option" is a placeholder key; substitute a real Spark option
sparkSession = SparkSession.builder\
  .master("local")\
  .appName("my-spark-app")\
  .config("spark.some.config.option", "config-value")\
  .getOrCreate()
```

```python
sparkSession
```
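The reuse behaviour described above can be illustrated with a plain-Python model of the builder pattern. This is a hypothetical sketch: `ToySession` and `ToyBuilder` are made up for illustration and are not PySpark classes. `getOrCreate()` returns the already-active instance if there is one and only constructs a new one otherwise. When PySpark returns an existing session it also applies the builder's config options to it, which the `update` call imitates. The real method handles details this sketch ignores.

```python
class ToySession:
    """Hypothetical stand-in for SparkSession (plain Python, not PySpark)."""

    _active = None  # the one session shared by the whole process, once created

    def __init__(self, conf):
        self.conf = dict(conf)


class ToyBuilder:
    """Hypothetical builder that mirrors the shape of SparkSession.builder."""

    def __init__(self):
        self._options = {}

    def config(self, key, value):
        # Record an option and return the builder so calls can be chained.
        self._options[key] = value
        return self

    def getOrCreate(self):
        if ToySession._active is None:
            # Nothing exists yet: build a new session from the collected options.
            ToySession._active = ToySession(self._options)
        else:
            # A session already exists: reuse it and apply the new options to it.
            ToySession._active.conf.update(self._options)
        return ToySession._active


first = ToyBuilder().config("app.name", "my-spark-app").getOrCreate()
second = ToyBuilder().config("some.option", "config-value").getOrCreate()
```

Both calls return the same object, which now carries the options from both builders.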

In Databricks notebooks and in the interactive Spark shells (such as `pyspark`), a SparkSession is created automatically and assigned to the variable `spark`.

```python
spark
```

### Unified entry point for reading data

The SparkSession's `read` property is the entry point for reading data, replacing the older `SQLContext.read`.

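```python
# header=True takes column names from the first line; inferSchema=True detects numeric types
diamonds = spark.read.csv("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                          header=True, inferSchema=True)
```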

```python
# display() is a Databricks notebook helper; outside Databricks use diamonds.show()
display(diamonds)
```

### DataFrames

The DataFrame is Spark's primary abstraction for structured data. Like the RDDs it is built on, a DataFrame:
  - is immutable once constructed
  - tracks lineage information so that lost data can be recomputed efficiently
  - enables operations on a collection of elements in parallel
  
#### Creating DataFrames
DataFrames can be constructed:
  - by parallelizing existing Python collections (lists)
  - by transforming an existing Spark DataFrame, or by converting a pandas DataFrame
  - from files in HDFS or any other storage system
  
Each row of a DataFrame is a `Row` object, and the fields in a `Row` can be accessed like attributes (for example, `row.firstName`).


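#### Operations
DataFrames support two types of operations: transformations and actions.
  - Transformations (for example, `select()` and `filter()`) are lazy: they are not computed immediately.
  - A transformed DataFrame is computed only when an action (for example, `count()`, `collect()`, or `show()`) runs on it.
  - DataFrames can be persisted (cached) in memory or on disk with `cache()` or `persist()`, so repeated actions do not recompute them.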
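The split between lazy transformations and actions can be sketched in plain Python. This is a hypothetical toy model (`LazyFrame` is not a Spark class). Transformations only record a step in a lineage and return a new, unchanged-source object. The action `collect()` is the only point where any work runs.

```python
class LazyFrame:
    """Hypothetical toy model of a lazily evaluated dataset (plain Python, not Spark)."""

    def __init__(self, source, steps=()):
        self._source = source        # the underlying data
        self._steps = tuple(steps)   # recorded transformations, i.e. the lineage

    # Transformations return a new LazyFrame with one more recorded step;
    # nothing is computed and the original frame is left unchanged.
    def map(self, fn):
        return LazyFrame(self._source, self._steps + (("map", fn),))

    def filter(self, predicate):
        return LazyFrame(self._source, self._steps + (("filter", predicate),))

    # An action replays the whole lineage from the source and returns the result.
    def collect(self):
        rows = list(self._source)
        for kind, fn in self._steps:
            if kind == "map":
                rows = [fn(row) for row in rows]
            else:
                rows = [row for row in rows if fn(row)]
        return rows


calls = []

def double(x):
    calls.append(x)  # record every time the function actually runs
    return 2 * x

base = LazyFrame([1, 2, 3, 4])
frame = base.map(double).filter(lambda x: x > 4)
calls_before_action = list(calls)  # still empty: no transformation has executed
result = frame.collect()           # the action triggers the work
```

Because `collect()` replays every step from the source each time it is called, repeated actions repeat the work. In Spark, `cache()` or `persist()` keeps the computed result around to avoid that.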


See the [Programming Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html) for more details.

```python
# Import the Row class from the pyspark.sql module
from pyspark.sql import Row

# Create example data: departments and employees

# Create the departments
department1 = Row(id='123456', name='Computer Science')
department2 = Row(id='789012', name='Mechanical Engineering')
department3 = Row(id='345678', name='Theater and Drama')
department4 = Row(id='901234', name='Indoor Recreation')

# Create the employees; a Row built from field names only acts as a factory for rows
Employee = Row("firstName", "lastName", "email", "salary")
employee1 = Employee('michael', 'armbrust', 'no-reply@berkeley.edu', 100000)
employee2 = Employee('xiangrui', 'meng', 'no-reply@stanford.edu', 120000)
employee3 = Employee('matei', None, 'no-reply@waterloo.edu', 140000)
employee4 = Employee(None, 'wendell', 'no-reply@berkeley.edu', 160000)

# Nest the employees inside their departments
departmentWithEmployees1 = Row(department=department1, employees=[employee1, employee2])
departmentWithEmployees2 = Row(department=department2, employees=[employee3, employee4])
departmentWithEmployees3 = Row(department=department3, employees=[employee1, employee4])
departmentWithEmployees4 = Row(department=department4, employees=[employee2, employee3])

print(department1)
print(employee2)
print(departmentWithEmployees1.employees[0].email)
```

Create a DataFrame from a list of `Row`s.

```python
departmentsWithEmployeesSeq1 = [departmentWithEmployees1, departmentWithEmployees2]
df1 = spark.createDataFrame(departmentsWithEmployeesSeq1)
display(df1)
```

Create a second DataFrame from a list of rows.

```python
departmentsWithEmployeesSeq2 = [departmentWithEmployees3, departmentWithEmployees4]
df2 = spark.createDataFrame(departmentsWithEmployeesSeq2)

display(df2)
```

### Working with DataFrames

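Combine the two DataFrames with `union()`. It appends rows by column position and, like SQL `UNION ALL`, does not remove duplicates. `unionAll()` is an older alias.

```python
unionDF = df1.union(df2)
display(unionDF)
```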

Write the unioned DataFrame to a Parquet file.

```python
# mode("overwrite") replaces the output if it already exists,
# so no separate delete step (such as Databricks' dbutils.fs.rm) is needed
unionDF.write.mode("overwrite").parquet("/tmp/databricks-df-example.parquet")
```

Read the DataFrame back from the Parquet file.

```python
parquetDF = spark.read.parquet("/tmp/databricks-df-example.parquet")
display(parquetDF)
```

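[Explode](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/sql/functions.html#explode) turns each element of an array column into a separate row. The next two cells show it on a small example before it is applied to the `employees` column.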

```python
eDF = spark.createDataFrame([Row(a=1, intlist=[1,2,3], mapfield={"a": "b"})])
display(eDF)
```

```python
from pyspark.sql.functions import explode

# Each element of intlist becomes its own row:
# [Row(anInt=1), Row(anInt=2), Row(anInt=3)]
eDF.select(explode(eDF.intlist).alias("anInt")).collect()
```

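```python
# Explode the employees array into one row per employee, held in a struct column "e"
df = parquetDF.select(explode("employees").alias("e"))
# Flatten the struct's fields into top-level columns
explodeDF = df.selectExpr("e.firstName", "e.lastName", "e.email", "e.salary")

display(explodeDF)
```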
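To make the reshaping concrete, here is a plain-Python model (not Spark code) of what `explode` followed by `selectExpr("e.firstName", ...)` does to nested records. Each element of the `employees` list becomes its own output row, and the nested fields are pulled up to top-level columns. The dictionaries are a simplified stand-in for the `Row` data above. The department with an empty list is hypothetical and shows that such records produce no rows.

```python
def explode(records, field):
    """Plain-Python model of explode(): one output item per element of each record's list.

    Records whose list is empty or None produce no output, as with Spark's explode();
    Spark's explode_outer() would keep them as a row of nulls instead.
    """
    for record in records:
        for element in record[field] or []:
            yield element


departments = [
    {"name": "Computer Science", "employees": [
        {"firstName": "michael", "lastName": "armbrust", "salary": 100000},
        {"firstName": "xiangrui", "lastName": "meng", "salary": 120000}]},
    {"name": "Mechanical Engineering", "employees": [
        {"firstName": "matei", "lastName": None, "salary": 140000},
        {"firstName": None, "lastName": "wendell", "salary": 160000}]},
    {"name": "Hypothetical Empty Department", "employees": []},
]

# Flatten each exploded employee into a tuple of top-level "columns"
flat = [(e["firstName"], e["lastName"], e["salary"]) for e in explode(departments, "employees")]
```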

Use `filter()` to return only the rows that match the given predicate.

```python
filterDF = explodeDF.filter(explodeDF.firstName == "xiangrui").sort(explodeDF.lastName)
display(filterDF)
```

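```python
from pyspark.sql.functions import col, asc

# Combine conditions with | (and &) rather than Python's `or` (and `and`),
# and parenthesize each comparison because | binds more tightly than ==
filterDF = explodeDF.filter((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
display(filterDF)
```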
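The `|` rule rests on two Python facts. First, `or` and `and` call `bool()` on their operands and cannot be overloaded. Second, `|` binds more tightly than `==`. The sketch below uses a hypothetical `Expr` class, loosely modelled on how a Spark `Column` builds expressions, to show the working form and both failure modes. PySpark's `Column` similarly refuses conversion to `bool`. The error message here is made up.

```python
class Expr:
    """Hypothetical stand-in for a Spark Column: operators build SQL-like text."""

    def __init__(self, text):
        self.text = text

    def __eq__(self, other):
        return Expr(f"({self.text} = {_sql(other)})")

    def __or__(self, other):
        return Expr(f"({self.text} OR {_sql(other)})")

    def __ror__(self, other):
        return Expr(f"({_sql(other)} OR {self.text})")

    def __bool__(self):
        # Like a Spark Column, an expression has no truth value of its own.
        raise ValueError("expression has no truth value; use | and & instead of or and and")


def _sql(value):
    """Render an operand: nested expressions as-is, plain values as literals."""
    return value.text if isinstance(value, Expr) else repr(value)


first_name = Expr("firstName")

# Correct: each comparison parenthesized, combined with |
good = (first_name == "xiangrui") | (first_name == "michael")


def uses_or():
    # `or` calls bool() on the left operand, which Expr refuses.
    return (first_name == "xiangrui") or (first_name == "michael")


def missing_parens():
    # | binds tighter than ==, so this parses as the chained comparison
    # first_name == ("xiangrui" | first_name) == "michael",
    # and the chain calls bool() on its first intermediate result.
    return first_name == "xiangrui" | first_name == "michael"


failures = {}
for attempt in (uses_or, missing_parens):
    try:
        attempt()
    except ValueError as err:
        failures[attempt.__name__] = str(err)
```

Here `good.text` is `((firstName = 'xiangrui') OR (firstName = 'michael'))`, while both other forms end up in `failures`.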

`where()` is an alias for `filter()`.

```python
whereDF = explodeDF.where((col("firstName") == "xiangrui") | (col("firstName") == "michael")).sort(asc("lastName"))
display(whereDF)
```

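Replace `null` values with `--` using the DataFrame NA functions (`fillna()` is an alias for `na.fill()`). Because the replacement value is a string, only string columns are filled.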

```python
nonNullDF = explodeDF.fillna("--")
display(nonNullDF)
```
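Which columns get filled depends on the type of the replacement value. The plain-Python model below covers only the string case. `fill_string_nulls` is a hypothetical helper, not a Spark function, and it infers column types from the data where Spark reads them from the schema. The null `salary` in the second row is hypothetical. It shows that a numeric column is left untouched by a string replacement.

```python
def fill_string_nulls(rows, replacement):
    """Plain-Python model of df.fillna(<string>): nulls are filled only in string columns."""
    # Infer each column's type from its first non-null value (Spark uses the schema instead).
    column_types = {}
    for row in rows:
        for name, v in row.items():
            if v is not None:
                column_types.setdefault(name, type(v))
    # Build new rows; the input rows are not modified.
    return [
        {name: replacement if v is None and column_types.get(name) is str else v
         for name, v in row.items()}
        for row in rows
    ]


rows = [
    {"firstName": "matei", "lastName": None, "salary": 140000},
    {"firstName": None, "lastName": "wendell", "salary": None},  # null salary is hypothetical
]
filled = fill_string_nulls(rows, "--")
```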

Retrieve only rows with missing `firstName` or `lastName`.

```python
filterNonNullDF = explodeDF.filter(col("firstName").isNull() | col("lastName").isNull()).sort("email")
display(filterNonNullDF)
```
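`isNull()` is needed here because in SQL any comparison with NULL, including `col("firstName") == None`, evaluates to NULL rather than true, and a filter keeps only rows whose predicate is true. The plain-Python model of SQL three-valued logic below illustrates this, using `None` for NULL. All helper names are hypothetical.

```python
# Plain-Python model of SQL three-valued logic; None stands in for NULL.

def sql_equals(a, b):
    """SQL '=': a comparison involving NULL yields NULL (unknown), not True or False."""
    if a is None or b is None:
        return None
    return a == b


def sql_or(a, b):
    """SQL OR: True wins; otherwise any NULL makes the result NULL."""
    if a is True or b is True:
        return True
    if a is None or b is None:
        return None
    return False


def keep_where(rows, predicate):
    """Like filter()/where(): keep a row only when the predicate is exactly True."""
    return [row for row in rows if predicate(row) is True]


rows = [
    {"firstName": "michael", "lastName": "armbrust"},
    {"firstName": "matei", "lastName": None},
    {"firstName": None, "lastName": "wendell"},
]

# Models (col("firstName") == None) | (col("lastName") == None): every comparison is NULL,
# so no row survives.
by_equality = keep_where(
    rows, lambda r: sql_or(sql_equals(r["firstName"], None), sql_equals(r["lastName"], None))
)

# Models isNull(): tests for NULL directly, so the two incomplete rows survive.
by_is_null = keep_where(rows, lambda r: r["firstName"] is None or r["lastName"] is None)
```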

Example aggregation using `agg()` and `countDistinct()`. Because `countDistinct()` ignores nulls, the group whose `firstName` is null reports a count of 0.

```python
from pyspark.sql.functions import countDistinct

countDistinctDF = explodeDF.select("firstName", "lastName")\
  .groupBy("firstName", "lastName")\
  .agg(countDistinct("firstName"))

display(countDistinctDF)
```
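The grouping semantics can be checked with a plain-Python model. In `explodeDF`, each of the four employees appears twice, once per department. Rows with a null key still form their own group, but null values are not counted. `count_distinct_by_group` is a hypothetical helper, not a Spark API.

```python
from collections import defaultdict


def count_distinct_by_group(rows, keys, target):
    """Plain-Python model of groupBy(*keys).agg(countDistinct(target)).

    Null group keys form their own group, but null target values are not counted.
    """
    groups = defaultdict(set)
    for row in rows:
        distinct_values = groups[tuple(row[k] for k in keys)]  # creates the group if new
        if row[target] is not None:
            distinct_values.add(row[target])
    return {group: len(values) for group, values in groups.items()}


# Each (firstName, lastName) pair from the employee data appears twice after the explode
names = [("michael", "armbrust"), ("xiangrui", "meng"), ("matei", None), (None, "wendell")] * 2
rows = [{"firstName": first, "lastName": last} for first, last in names]
counts = count_distinct_by_group(rows, ["firstName", "lastName"], "firstName")
```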

Examine the DataFrame's physical plan with `explain()`. Passing `True` also prints the logical plans.

```python
countDistinctDF.explain()
```

Sum all the salaries. `agg()` also accepts a dictionary that maps a column name to the name of an aggregate function.

```python
salarySumDF = explodeDF.agg({"salary" : "sum"})
display(salarySumDF)
```

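`explodeDF.salary` is a `Column` expression that refers to the column; it does not hold the data itself.

```python
type(explodeDF.salary)
```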


Print summary statistics (count, mean, standard deviation, minimum, and maximum) for the salaries.

```python
explodeDF.describe("salary").show()
```
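As a cross-check of what `describe()` reports, the same statistics can be computed in plain Python. This assumes the `explodeDF` data built above, in which each of the four salaries appears twice. Spark's `stddev` is the sample standard deviation, which `statistics.stdev` also computes.

```python
import statistics

# Each employee's salary appears twice because each employee belongs to two departments
salaries = [100000, 120000, 140000, 160000] * 2

summary = {
    "count": len(salaries),
    "mean": statistics.mean(salaries),
    "stddev": statistics.stdev(salaries),  # sample standard deviation (n - 1 denominator)
    "min": min(salaries),
    "max": max(salaries),
}
```

The mean is 130000. The squared deviations from it sum to 4,000,000,000, so the sample standard deviation is the square root of 4e9 / 7, roughly 23904.6.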
