<img src="http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png" align=left>
<img src="http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png" align=left>

# **Spark Tutorial: Spark SQL and DataFrames**

## 0. Overview

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform additional optimizations.

There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.

### 0.1 SQL

One use of Spark SQL is to execute SQL queries. Spark SQL can also be used to read data from an existing Hive installation. When running SQL from within another programming language the results will be returned as a DataFrame.

### 0.2 DataFrames

A DataFrame is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood. 

DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API is available in Scala, Java, Python, and R. 

In [None]:
# Run this cell to set up the data path
import os

datapath = os.getcwd()
if datapath.find('databricks') != -1:
    ACCESS_KEY = "AKIAI2P5MSEO2JYXJVQQ"
    SECRET_KEY = "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"
    AWS_BUCKET_NAME = "nycdsabootcamp"
    datapath = "s3a://%s:%s@%s/" %(ACCESS_KEY, SECRET_KEY, AWS_BUCKET_NAME)

## 1. Starting Point: SparkSession

`SparkSession` is the entry point to programming Spark with the DataFrame API.

SparkSession in Spark 2.0 provides built-in support for Hive features, including the ability to write queries using HiveQL, access Hive UDFs, and read data from Hive tables. To use these features, you do not need to have an existing Hive setup.

In pyspark, `spark` is an existing `SparkSession`.

In [None]:
print(type(spark))

If launching an application with spark-submit, we need to create a basic `SparkSession` ourselves using `SparkSession.builder`:

In [None]:
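# Don't run! Shown for reference only: `spark` already exists in this notebook.
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.some.config.option", "some-value")\
    .getOrCreate()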

## 2. Working with DataFrames

`DataFrames` can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.

Here we will be using the file `./ml-100k/u.user` as an example to illustrate how we can create a DataFrame from an existing RDD. 

The column names are: `userId | age | gender | occupation | zipCode`.
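
Each line of `u.user` is a pipe-delimited record, so turning a line into typed fields is ordinary string handling. The plain-Python sketch below mirrors that step without Spark. The sample line is illustrative rather than read from the file, and the `User` namedtuple and `parse_user` helper are hypothetical stand-ins for a Spark `Row` and the parsing lambda.

```python
from collections import namedtuple

# Hypothetical stand-in for a Spark Row with the five u.user columns.
User = namedtuple("User", ["userId", "age", "gender", "occupation", "zipCode"])


def parse_user(line):
    """Split one pipe-delimited line and convert the numeric fields to int."""
    p = line.split("|")
    return User(userId=int(p[0]), age=int(p[1]), gender=p[2],
                occupation=p[3], zipCode=p[4])


# Illustrative line in the u.user layout (assumed, not read from the file).
sample = "1|24|M|technician|85711"
user = parse_user(sample)
print(user)
```

The zip code is deliberately left as a string, since not every value in that column is guaranteed to be numeric.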

In [None]:
filepath = os.path.join(datapath, "./ml-100k/u.user")

userRDD = sc.textFile(filepath)
print(userRDD.take(2))

In [None]:
parts = userRDD.map(lambda l: l.split("|"))
print(parts.take(2))

### 2.1. Interoperating with RDDs

Spark SQL supports two different methods for converting existing RDDs into DataFrames:

* The first uses reflection to infer the schema of an RDD that contains specific types of objects.

* The second uses a programmatic interface that allows you to construct a schema and then apply it to an existing RDD.

**Inferring the Schema Using Reflection**

In [None]:
# spark is an existing SparkSession.
from pyspark.sql import Row

# Convert each line to a Row.
user = parts.map(lambda p: Row(userId=int(p[0]), 
                               age=int(p[1]),
                               gender=p[2],
                               occupation=p[3],
                               zipCode=p[4]))

# Infer the schema.
userDF = spark.createDataFrame(user)

In [None]:
# Once a DataFrame has been created, we can check its schema.
userDF.printSchema()

# Instead of using take(), a better way is to use show() to display the first few rows.
userDF.show(5)

# DataFrame.dtypes returns a list of (column name, data type) tuples.
print(userDF.dtypes)

**Programmatically Specifying the Schema**

In [None]:
from pyspark.sql.types import *

userSchema = StructType([StructField("userId", IntegerType(), True),
                         StructField("age", IntegerType(), True),
                         StructField("gender", StringType(), True),
                         StructField("occupation", StringType(), True),
                         StructField("zipCode", StringType(), True)]) # Note: not all zip codes are numeric, so keep them as strings

In [None]:
typedPart = parts.map(lambda p: (int(p[0]), int(p[1]), p[2], p[3], p[4]))
userDFb = spark.createDataFrame(typedPart, userSchema)

In [None]:
userDFb.printSchema()
userDFb.show(5)

### Exercise 1

The `u.data` file contains 100,000 ratings by 943 users on 1,682 items (movies):

* path="./ml-100k/u.data"
* column names: userId | itemId | rating | timestamp (the solution below names the item column `movieId`)
* sample row: u'196\t242\t3\t881250949'
  
1.1 Create a DataFrame called `ratingDF` with the `u.data` file from the `path` above.

1.2 Show schema and the first 5 rows of `ratingDF`.

In [None]:
# Your code goes here

ratingPath = os.path.join(datapath, "./ml-100k/u.data")

# 1.1
ratingSchema = StructType([StructField("userId", IntegerType(), True),
                           StructField("movieId", IntegerType(), True),
                           StructField("rating", IntegerType(), True),
                           StructField("timestamp", IntegerType(), True)])

ratingDF = spark.read.csv(path=ratingPath, sep=u"\t", schema=ratingSchema)

# 1.2 
ratingDF.printSchema()
ratingDF.show(5)

### 2.2 DataFrame Operations

Here we will perform some basic DataFrame operations in pyspark. For a complete list of the operations that can be performed on a DataFrame, refer to the [API Documentation](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).

`columns` returns all column names as a list.

In [None]:
userDF.columns

`describe(*cols)`
computes statistics for numeric columns, including count, mean, stddev, min, and max.

In [None]:
userDF.describe().show()

**Selection**

`select(*cols)`
projects a set of expressions and returns a new DataFrame.

`drop(col)`
returns a new DataFrame that drops the specified column.

In [None]:
# cols can be column names or column objects.
# In the following example, "userId" is a column name;
# userDF.gender and userDF["age"] are both column objects.
userDF.select("userId", userDF.gender, userDF["age"]).show(5)

In [None]:
userDF.drop("zipCode").show(5)

In [None]:
# When using a column object, we can also apply operations to it very easily.

# cast(dataType) converts the column into type dataType.
# between(lowerBound, upperBound) returns true if the value of this
# expression lies between the given bounds (inclusive).

userDF.select(userDF["userId"].cast("string"), userDF["age"].between(20, 40)).show(5)

A DataFrame's columns can be accessed either by attribute (`df.age`) or by indexing (`df['age']`). The two forms are equivalent, but users are highly encouraged to use the latter, which is future proof and won't break when a column name is also an attribute of the DataFrame class.
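
To see why attribute access can break, consider a column whose name collides with a method. The toy class below is a hypothetical stand-in, not PySpark's implementation; it only mimics the lookup order in which real attributes and methods win over column names.

```python
class ToyFrame(object):
    """Hypothetical stand-in for a DataFrame; not PySpark's implementation."""

    def __init__(self, columns):
        self._columns = dict(columns)

    def count(self):
        # A regular method, playing the role of DataFrame.count().
        return len(next(iter(self._columns.values())))

    def __getattr__(self, name):
        # Python calls this only when normal attribute lookup fails.
        try:
            return self._columns[name]
        except KeyError:
            raise AttributeError(name)

    def __getitem__(self, name):
        # Indexing always looks the name up among the columns.
        return self._columns[name]


df = ToyFrame({"age": [24, 53], "count": [7, 9]})

print(df.age)        # attribute access reaches the "age" column
print(df["count"])   # indexing reaches the column named "count"
print(df.count)      # attribute access finds the method, not the column
```

Indexing never consults the class's own attributes, so it keeps working whatever the column is called.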

**Filter**

`filter(condition)`
filters rows using the given condition.

In [None]:
# using a Column of types.BooleanType as condition
userDF.filter(userDF["occupation"] == "programmer").show(5)

In [None]:
# using a SQL expression string as the condition
userDF.filter("age between 20 and 40").show(5)

**Grouping and Aggregation**

`groupBy(*cols)`
groups the DataFrame using the specified columns, so we can run aggregation on them.

The available aggregation functions include:
* `avg(*cols)`
* `count()`
* `max(*cols)`
* `min(*cols)`
* `sum(*cols)`

In [None]:
userDF.groupBy(userDF["occupation"]).avg("age").show(10)

We can also use `agg(*exprs)` to compute multiple aggregates at once; it returns the result as a DataFrame. `exprs` can be a dict mapping column names (strings) to aggregate functions (strings).

The available aggregate functions are `avg`, `max`, `min`, `sum`, and `count`.
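
Conceptually, grouping followed by a dict-style aggregation produces one output row per group, with one value per requested aggregate. The following plain-Python sketch illustrates that idea on a handful of hypothetical `(gender, age)` records; it says nothing about how Spark executes the query.

```python
from collections import defaultdict

# Hypothetical (gender, age) records standing in for rows of userDF.
rows = [("M", 24), ("F", 53), ("M", 23), ("F", 33), ("M", 42)]

# Collect the ages per key, as groupBy("gender") does conceptually.
groups = defaultdict(list)
for gender, age in rows:
    groups[gender].append(age)

# One entry per group holding both aggregates, like {"age": "avg", "*": "count"}.
result = {
    gender: {"avg(age)": sum(ages) / float(len(ages)), "count(1)": len(ages)}
    for gender, ages in groups.items()
}

for gender in sorted(result):
    print(gender, result[gender])
```

The keys `avg(age)` and `count(1)` follow the column names that the exercises in this tutorial filter and rename on.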

In [None]:
userDF.groupBy("gender").agg({"age":"avg", "*":"count"}).show()

### Exercise 2

2.1 Find all the movies that have average ratings greater than 4.0.

2.2 Find all the movies that have average ratings greater than 4.0 and have received more than 400 reviews.

In [None]:
# Your code goes here

# 2.1
ratingDF.groupBy("movieId")\
.avg("rating").filter("avg(rating) > 4.0")\
.show()

# 2.2
ratingDF.groupBy("movieId")\
.agg({"rating":"avg", "*":"count"})\
.filter("avg(rating) > 4.0 AND count(1) > 400")\
.show()

### 2.3 Converting Column Format

Each column in a DataFrame has its own data type, and sometimes you need to convert a column from one type to another. In the `ratingDF` DataFrame, the timestamp column uses Unix time, which is not human readable.

`from_unixtime` converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the given format.
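
The same conversion can be sketched in plain Python 3 with the standard library. This version is pinned to UTC so the result does not depend on the machine, whereas `from_unixtime` uses the system time zone; the helper name `unixtime_to_string` and the `strftime` pattern are assumptions made for this illustration.

```python
from datetime import datetime, timezone


def unixtime_to_string(seconds, fmt="%Y-%m-%d %H:%M:%S"):
    """Format seconds since the Unix epoch as a string, using UTC."""
    return datetime.fromtimestamp(seconds, tz=timezone.utc).strftime(fmt)


# Timestamp taken from the sample u.data row shown in Exercise 1.
readable = unixtime_to_string(881250949)
print(readable)
```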

In [None]:
from pyspark.sql.functions import from_unixtime
ratingDF = ratingDF.withColumn('timestamp', from_unixtime(ratingDF.timestamp))

In [None]:
ratingDF.printSchema()
ratingDF.show(3)

We can further convert the string into a `TimestampType` by applying `cast()` to the column:

`cast(dataType)`: Convert the column into type `dataType`.

In [None]:
ratingDF = ratingDF.withColumn('timestamp', ratingDF.timestamp.cast(TimestampType()))

In [None]:
ratingDF.printSchema()
ratingDF.show(3)

### 2.4 User Defined Functions

We can define our own function and apply it to a column in a DataFrame. The following code creates a new column called "ageGroup" by applying the function `ageToGroup` to the `age` column.
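
The function that a UDF wraps is ordinary Python, so its logic can be checked on plain values before it is handed to Spark. The sketch below repeats the `ageToGroup` thresholds used in this section and adds a hypothetical `ageToGroupSafe` wrapper. The wrapper is an assumption of this example: Spark passes SQL nulls to a Python UDF as `None`, and on Python 3 the comparison `None < 40` raises a `TypeError`.

```python
def ageToGroup(num):
    # Same thresholds as the UDF in this section.
    if num < 40:
        return "young"
    elif num > 64:
        return "old"
    else:
        return "middle"


def ageToGroupSafe(num):
    """Hypothetical null-safe wrapper: pass missing ages through as None."""
    if num is None:
        return None
    return ageToGroup(num)


# Check the boundary values on plain Python inputs before wrapping with udf().
for age in [39, 40, 64, 65, None]:
    print(age, ageToGroupSafe(age))
```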

In [None]:
# define UDF
def ageToGroup(num):
    if num < 40:
        return "young"
    elif num > 64:
        return "old"
    else:
        return "middle"
    
from pyspark.sql.functions import udf
# Create a Column expression representing a UDF.
udfAgeGroup=udf(ageToGroup, StringType())

# apply UDF to age column
userDF.withColumn("ageGroup", udfAgeGroup("age")).show(10)

### Exercise 3

Assume we want to evaluate the movies based on both the average rating and the review counts using the expression below:

$\exp(\mathrm{avg}(\mathrm{rating})) \times \log(\mathrm{count}(1))$

Create a DataFrame that includes average rating, review count and score calculated based on the expression above for each movie. You may want to define a UDF and use your UDF to calculate the scores.
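
To get a feel for the expression, it helps to evaluate it on a few values first. The sketch below assumes the natural logarithm and uses made-up (average rating, review count) pairs; the function name `score` is hypothetical. Because $\log(1) = 0$, a movie with a single review scores zero regardless of its rating.

```python
import math


def score(avg_rating, review_count):
    """exp(average rating) times the natural log of the review count."""
    return math.exp(avg_rating) * math.log(review_count)


# Hypothetical (average rating, review count) pairs, not taken from the data.
examples = [(4.5, 1), (4.5, 10), (4.0, 400)]
for avg_rating, review_count in examples:
    print(avg_rating, review_count, round(score(avg_rating, review_count), 2))
```

A lower-rated but widely reviewed movie can therefore outrank a higher-rated movie with few reviews.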

What are the top 10 movies based on this score? You can use `DataFrame.sort(*cols, **kwargs)` with the `ascending` keyword argument to sort the DataFrame.

In [None]:
# Your code goes here

import math

def scoring(rating, cnt):
    score = math.exp(rating) * math.log(cnt)
    return score
    
# Create a Column expression representing a UDF.
udfScoring=udf(scoring, FloatType())

ratingAvg = ratingDF.groupBy("movieId")\
    .agg({"rating":"avg", "*":"count"})\
    .withColumnRenamed("avg(rating)", "avgRating")\
    .withColumnRenamed("count(1)", "reviewCnt")

# apply UDF to avgRating and reviewCnt
ratingAvg\
.withColumn("score", udfScoring("avgRating", "reviewCnt"))\
.sort('score', ascending=False)\
.show(10)

### 2.5 DataFrame NA Functions

Spark SQL also provides simple functions for working with missing data in a DataFrame.

Let's first create a small DataFrame that contains some null (`na`) fields.

In [None]:
# take(10) returns a plain Python list of Row objects, so we can append to it.
rows = userDF.take(10)
rows.append(Row(age=None, gender=u'F', occupation=None, userId=11, zipCode=u'30329'))
userDFna = spark.createDataFrame(rows)
userDFna.show()

`drop(how='any')` returns a new DataFrame omitting rows with null values. The parameter `how` can be 'any' (drop a row if it contains any nulls) or 'all' (drop a row only if all of its values are null).

In [None]:
userDFna.na.drop().show()

`fill(value)` replaces null values. `value` can be a single value or a Python dict mapping column names to replacement values.
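
The semantics of dropping and filling can be illustrated without Spark. In this plain-Python sketch, `None` plays the role of a SQL null; the rows and the `drop_na` and `fill_na` helpers are hypothetical analogues written for this example, not part of any Spark API.

```python
# Hypothetical rows as dicts; None plays the role of a SQL null.
rows = [
    {"userId": 1, "age": 24, "occupation": "technician"},
    {"userId": 11, "age": None, "occupation": None},
    {"userId": 12, "age": 30, "occupation": None},
]


def drop_na(rows, how="any"):
    """Drop rows containing any null (how='any') or only nulls (how='all')."""
    test = any if how == "any" else all
    return [r for r in rows if not test(v is None for v in r.values())]


def fill_na(rows, replacements):
    """Replace nulls column by column using a {column: value} dict."""
    return [
        {k: (replacements.get(k, v) if v is None else v) for k, v in r.items()}
        for r in rows
    ]


print(drop_na(rows))
print(fill_na(rows, {"age": 30, "occupation": "other"}))
```

With a dict, only the listed columns are filled, and each column gets its own replacement value.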

In [None]:
# A single value will fill the null values in the columns of the same type
userDFna.na.fill(40).show()

In [None]:
userDFna.na.fill({'age': 30, 'occupation': u'other'}).show()

## 3. Using HiveQL in Spark SQL

`SparkSession` in Spark 2.0 provides built-in support for Hive features, including the ability to:
* write queries using HiveQL,
* access Hive UDFs,
* read data from Hive tables.

`SparkSession.sql(sqlQuery)` returns a DataFrame representing the result of the given query.

In [None]:
# This query will return all the available tables, 
# including views as well as tables in Hive metastore.
spark.sql("SHOW tables").show()

### 3.1 Creating Temporary Tables (Views)

To use a DataFrame as a temporary Hive table (view), we can use `createOrReplaceTempView(name)`:

In [None]:
# register DataFrame userDF as the temporary view userView
userDF.createOrReplaceTempView("userView")

spark.sql("SHOW tables").show()
spark.sql("SELECT * FROM userView WHERE occupation == 'programmer'").show(10)

### 3.2 Saving to Persistent Tables

DataFrames can also be saved as persistent tables in the Hive metastore using the `saveAsTable` command. If no Hive deployment exists, Spark will create a default local Hive metastore for you.

`saveAsTable(name, mode=None)` saves the content of the DataFrame as the specified table. `mode` can be one of `append`, `overwrite`, `error`, or `ignore` (default: `error`, which raises an exception if the table already exists).

In [None]:
# When executing saveAsTable:
# a folder named `spark-warehouse` will be created in the current working directory, and
# under `spark-warehouse` there will be a subfolder named `user`.
userDF.write.saveAsTable("user", mode="overwrite")
spark.sql("SHOW tables").show()

### Exercise 4

4.1 Register the `ratingDF` DataFrame you created in Exercise 1 as a view.

4.2 Using a SQL query, find the info of the top 20 users ranked by the total number of ratings they gave.

In [None]:
# Your code goes here

# 4.1
ratingDF.createOrReplaceTempView("ratingView")
spark.sql("SHOW tables").show()

# 4.2
spark.sql("""
SELECT userView.*, userCount.cnt
FROM userView JOIN 
    (SELECT userId, COUNT(*) AS cnt
     FROM ratingView 
     GROUP BY userId) AS userCount
ON userView.userId == userCount.userId
ORDER BY cnt DESC
LIMIT 20
""").show()

### 3.3 Using User Defined Functions

To use a UDF in SQL statements, we need to register it with `registerFunction(name, f, returnType=StringType())`. Here `sqlContext` is the `SQLContext` predefined in the pyspark shell. For example:

In [None]:
sqlContext.registerFunction("udfAgeGroup", ageToGroup, StringType())
spark.sql("SELECT *, udfAgeGroup(age) AS ageGroup FROM user").show(10)

### Exercise 5

Redo Exercise 3 with a UDF in SQL.

In [None]:
# Your code goes here

import math

def scoring(rating, cnt):
    score = math.exp(rating) * math.log(cnt)
    return score
    
udfScoring=udf(scoring, FloatType())

sqlContext.registerFunction("udfScoring", scoring, FloatType())
spark.sql("""
SELECT *, udfScoring(avgRating, reviewCnt) AS score FROM 
    (SELECT movieId, AVG(rating) AS avgRating, COUNT(1) AS reviewCnt
     FROM ratingView
     GROUP BY movieId)
ORDER BY score DESC
LIMIT 10
""").show()