import pandas as pd
from pandas import Series
from pandas import DataFrame
pd.options.display.mpl_style = 'default'

import matplotlib.pyplot as plt
%matplotlib inline  
import seaborn as sns
sns.set_context("talk")

## Spark SQL
Spark SQL is a Spark module for structured data processing.
![IPython](Image/sparksql-log1.jpg "IPython")

### Agenda
- Spark SQL features
- Spark SQL flow diagram
- Spark SQL architecture
- Spark SQL libraries
- Spark SQL components
- SchemaRDD
- Spark SQL example: JSON file
- Spark SQL example: Oracle database
- Running SQL queries programmatically
- Spark SQL vs. Hive


References:
- https://spark.apache.org/sql/
- https://spark.apache.org/docs/1.6.0/sql-programming-guide.html

### Analytics the Old Way
Put the data into an RDBMS

- Pros: High level query language, optimized execution engine

- Cons: Have to ETL the data in, some analysis is hard to do in SQL, doesn’t scale

### Analytics the New Way
Map/Reduce, etc.

- Pros: Full-featured programming language, easy parallelism

- Cons: Difficult to do ad-hoc analysis; optimizations are left up to the developer

### SQL on HDFS
Put the data into ***HDFS*** (not an RDBMS)

- Pros: High level query language, optimized execution engine

- Cons: Some analysis is hard to do in SQL. Compared with the RDBMS approach, the need to ETL the data in and the scaling limit largely go away on HDFS.

## Spark SQL features
![IPython](Image/sparksql-feature1.jpg "IPython")

![IPython](Image/sparksql-feature2.jpg "IPython")

![IPython](Image/sparksql-feature3.jpg "IPython")

## Spark SQL architecture
![IPython](Image/sparksql-architecture1.jpg "IPython")

![IPython](Image/sparksql-flow-diagram2.jpg "IPython")

## Spark SQL libraries
![IPython](Image/sparksql-lib1.png "IPython")

### Spark SQL data source API
![IPython](Image/sparksql-datasource-api1.png "IPython")

### Spark SQL DataFrame API
![IPython](Image/sparksql-dataframe-api1.jpg "IPython")

### Spark SQL Interpreter and Optimizer
![IPython](Image/sparksql-interpreter1.jpg "IPython")

### Spark SQL Service
![IPython](Image/sparksql-service1.png "IPython")

## Spark SQL components
![IPython](Image/sparksql-components1.png "IPython")

## SchemaRDD
Adding a schema to RDDs. (SchemaRDD was renamed to DataFrame in Spark 1.3.)
![IPython](Image/sparksql-schema-rdd1.png "IPython")

## Spark SQL example: JSON file
1. Launch a local / standalone cluster and the pyspark shell
2. Initializing Spark - SparkContext object
3. Create a basic SQLContext
4. Creating DataFrames
5. DataFrame Operations


#### 1. Launch local mode and the pyspark shell

In [None]:
# Usually the Spark installation directory (/usr/local/spark) is used as the working directory
$ cd /usr/local/spark

# Run the pyspark shell in local mode
$ ./bin/pyspark
# No arguments are needed

#### Launch a standalone cluster and the pyspark shell

In [None]:
# Usually the Spark installation directory (/usr/local/spark) is used as the working directory
$ cd /usr/local/spark

# Start the cluster with the provided script
$ ./sbin/start-all.sh


# Launch an interactive Python shell; you must specify the master node
# (do not forget the --master argument!)
$ ./bin/pyspark  --master  spark://10.9.1.56:7077


# Stop both the master and the slaves started above
$ ./sbin/stop-all.sh

#### 2. Initializing Spark - SparkContext object

In [None]:
# The first thing a Spark program must do is create a SparkContext object,
# which tells Spark how to access a cluster.
from pyspark import SparkConf, SparkContext

# To create a SparkContext, first build a SparkConf object that contains
# information about your application.
# appName and master are placeholders that you must define yourself.
conf = SparkConf().setAppName(appName).setMaster(master)

sc = SparkContext(conf=conf)

# master is a Spark, Mesos or YARN cluster URL, or a special "local" string to run in
# local mode.
# In practice, when running on a cluster, you will not want to hardcode master in the
# program, but rather launch the application with spark-submit and receive it there.

# In the pyspark shell, a SparkContext is already created for you as sc.

#### 3. Create a basic SQLContext

In [None]:
# The entry point into all relational functionality in Spark is the SQLContext class, or one of its descendants
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)


#### 4. Creating DataFrames

In [None]:
#The following creates a DataFrame based on the content of a JSON file:
df = sqlContext.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")

# Displays the content of the DataFrame to stdout
df.show()

## age  name
## null Michael
## 30   Andy
## 19   Justin

#### 5. DataFrame Operations


In [None]:
# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)

# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin

# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
## name    (age + 1)
## Michael null
## Andy    31
## Justin  20

# Select people older than 21
df.filter(df['age'] > 21).show()
## age name
## 30  Andy

# Count people by age
df.groupBy("age").count().show()
## age  count
## null 1
## 19   1
## 30   1
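
The outputs above show how a null age behaves: `age + 1` stays null, the `age > 21` filter drops the null row, and `groupBy` puts null in its own group. As a rough illustration only, the following plain-Python model (not Spark code) reproduces those three results on the same three records, with `None` standing in for null. The variable names are made up for this sketch.

```python
from collections import Counter

# The three records from people.json, with None standing in for a null age
rows = [
    {"name": "Michael", "age": None},
    {"name": "Andy", "age": 30},
    {"name": "Justin", "age": 19},
]

# select(name, age + 1): arithmetic on a null yields null
plus_one = [(r["name"], None if r["age"] is None else r["age"] + 1) for r in rows]

# filter(age > 21): a comparison against null is not true, so that row is dropped
older = [r for r in rows if r["age"] is not None and r["age"] > 21]

# groupBy(age).count(): null forms its own group
counts = Counter(r["age"] for r in rows)

print(plus_one)
print(older)
print(dict(counts))
```

Note that plain Python needs the explicit `is None` checks, whereas Spark SQL applies these null rules automatically.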

## Connecting to SQL Databases using JDBC
Spark SQL example: Oracle database

1. Launch a local / standalone cluster and the pyspark shell
2. Initializing Spark - SparkContext object
3. Create a basic SQLContext
4. Set Spark environment
5. Using Spark SQL and the Spark shell (PySpark / Scala)
6. Creating DataFrames
7. DataFrame Operations


In [None]:
# 1. Launch standalone cluster mode and the pyspark shell
# Usually the Spark installation directory (/usr/local/spark) is used as the working directory
$ cd /usr/local/spark
# Start the cluster with the provided script
$ ./sbin/start-all.sh
# Launch an interactive Python shell; you must specify the master node
# (do not forget the --master argument!)
$ ./bin/pyspark  --master  spark://10.9.1.56:7077
# Stop both the master and the slaves started above
$ ./sbin/stop-all.sh

# 2. Initializing Spark - SparkContext object
# (appName and master are placeholders that you must define yourself)
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)


# 3. Create a basic SQLContext
# The entry point into all relational functionality in Spark is the SQLContext class, or one of its descendants
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)


#### 4. Set Spark environment
Include the JDBC driver for your particular database on the Spark classpath.

The next step is to give the Spark environment the classpath for the JDBC driver we are going to use.
I used the Oracle 10g installation I had on that server, but any driver and any database system can be used here:

In [None]:
export SPARK_CLASSPATH=C:\oracle\product\10.2.0\client_1\jdbc\lib\ojdbc4.jar
    
#### 5. Using Spark SQL and the Spark shell (PySpark / Scala)
$ ./bin/pyspark


# or combine the two steps
SPARK_CLASSPATH=C:\oracle\product\10.2.0\client_1\jdbc\lib\ojdbc4.jar ./bin/spark-shell

In [None]:
#### 6. Create a DataFrame
# For pyspark
# (placeholder PostgreSQL URL; substitute the JDBC URL of your own database)
df = sqlContext.read.format('jdbc').options(
    url='jdbc:postgresql:dbserver',
    dbtable='schema.tablename').load()


# For Scala
scala> val employees = sqlContext.load("jdbc", 
    Map("url" -> "jdbc:oracle:thin:zohar/zohar@//localhost:1521/single", 
    "dbtable" -> "hr.employees"))

val jdbcDF = sqlContext.read.format("jdbc").options(
  Map("url" -> "jdbc:postgresql:dbserver",
  "dbtable" -> "schema.tablename")).load()
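
The JDBC reads above pass a URL and a table name as options. As a small, hedged sketch, the plain-Python helpers below assemble an Oracle thin-driver URL in the same shape as the one used in the Scala example and collect the options into a dict. `oracle_thin_url` and `jdbc_options` are hypothetical helper names introduced here for illustration; they are not part of Spark. The host, port, service, and credentials are copied from the example above.

```python
def oracle_thin_url(host, port, service, user=None, password=None):
    """Build an Oracle thin-driver JDBC URL (hypothetical helper, not a Spark API)."""
    credentials = "{}/{}".format(user, password) if user and password else ""
    return "jdbc:oracle:thin:{}@//{}:{}/{}".format(credentials, host, port, service)


def jdbc_options(url, dbtable, driver=None):
    """Collect the JDBC data source options into a dict (hypothetical helper)."""
    opts = {"url": url, "dbtable": dbtable}
    if driver:
        # Optional: class name of the JDBC driver, if it must be given explicitly
        opts["driver"] = driver
    return opts


url = oracle_thin_url("localhost", 1521, "single", user="zohar", password="zohar")
opts = jdbc_options(url, "hr.employees")
print(opts)

# In a pyspark shell, the dict could then be passed on like this (not run here):
#   df = sqlContext.read.format("jdbc").options(**opts).load()
```

Keeping the options in one dict makes it easier to swap databases without touching the read call. Embedding credentials in the URL follows the example above and is only suitable for a demo.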

In [None]:
#### 7. DataFrame operations

scala> employees.count()
scala> employees.printSchema
scala> employees.show
scala> employees.filter("EMPLOYEE_ID = 101").show

## Running SQL Queries Programmatically

In [None]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# "table" stands for the name of a table that has already been registered
df = sqlContext.sql("SELECT * FROM table")

### RDDs into Relations (Python)

In [None]:
from pyspark.sql import Row

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/…/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
# (In the original SchemaRDD API these calls were inferSchema and registerAsTable.)
peopleTable = sqlContext.createDataFrame(people)
peopleTable.registerTempTable("people")
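
The lambdas above assume every line is well formed; a single malformed line would make `int(...)` fail when the job runs. A minimal plain-Python sketch of a more defensive parser follows. `Person` and `parse_person` are hypothetical names introduced for illustration, and the sample lines are made-up stand-ins for the contents of the text file.

```python
from collections import namedtuple

# Hypothetical record type for this sketch; not part of Spark
Person = namedtuple("Person", ["name", "age"])


def parse_person(line):
    """Parse one 'name, age' line into a Person, or return None if it is malformed."""
    fields = [f.strip() for f in line.split(",")]
    if len(fields) != 2:
        return None
    name, age_text = fields
    try:
        return Person(name=name, age=int(age_text))
    except ValueError:
        return None


# Made-up sample lines, including one malformed line
sample_lines = ["Michael, 29", "Andy, 30", "Justin, 19", "broken line"]
parsed = [parse_person(line) for line in sample_lines]
people = [p for p in parsed if p is not None]
print(people)
```

On an RDD, the same function could be applied with `lines.map(parse_person)` followed by a filter that drops the `None` results.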

## Spark SQL vs. Hive
![IPython](Image/sparksql-vs-hive1.jpg "IPython")

![IPython](Image/sparksql-vs-hive2.jpg "IPython")