# DSE 230: Spark SQL

## Resources

---

Remember: when in doubt, read the documentation first. It's always helpful to search for the class that you're trying to work with, e.g. pyspark.sql.DataFrame.

Spark SQL and DataFrame Guide: https://spark.apache.org/docs/latest/sql-programming-guide.html

PySpark API Documentation: https://spark.apache.org/docs/latest/api/python/index.html

Spark Streaming Guide: https://spark.apache.org/docs/latest/streaming-programming-guide.html

In [None]:
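import pyspark
from pyspark.sql import SparkSession, Row
# Import only the names used below; a wildcard import of pyspark.sql.functions
# would shadow Python builtins such as sum, min, and max.
from pyspark.sql.functions import col, split
from pyspark.sql.types import IntegerType, StringType, StructField, StructType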

## Spark SQL

#### The entry point into all functionality in Spark is the SparkSession class. To create a basic SparkSession, just use SparkSession.builder:

In [None]:
conf = pyspark.SparkConf().setAll([('spark.master', 'local[*]'),
                                   ('spark.app.name', 'Python Spark SQL Demo')])
spark = SparkSession.builder.config(conf=conf).getOrCreate()

#### Create a DataFrame

In [None]:
# Create DataFrame based on contents of a JSON file
df = spark.read.json("file:/home/work/people.json")
df.show()

In [None]:
# Automatic schema inference
df.printSchema()

#### Manually specifying schema by casting

In [None]:
# Read from a text file
# Each line is read into the "value" column
df = spark.read.text("file:/home/work/people.txt")
df.show()

In [None]:
newDf = df.withColumn("name", split(col("value"), ",").getItem(0)) \
          .withColumn("age", split(col("value"), ",").getItem(1).cast("int"))
newDf.select("name", "age").show()

In [None]:
newDf.printSchema()

#### Running SQL queries programmatically

The `sql` function on a `SparkSession` enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.

In [None]:
# Register the parsed DataFrame (with name and age columns) as a SQL temporary view
newDf.createOrReplaceTempView("people")

In [None]:
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

#### Infer Schema using Reflection
Spark SQL can convert an RDD of `Row` objects to a DataFrame, inferring the data types. Rows are constructed by passing key/value pairs as keyword arguments to the `Row` class. The keys define the column names of the table, and the types are inferred from the data (by default from the first few rows, unless a `samplingRatio` is passed to `createDataFrame`), similar to the inference that is performed on JSON files.

In [None]:
# Load a text file and split each line on commas.
# `lines` is an RDD of strings, one per line of the file.
lines = spark.sparkContext.textFile("file:/home/work/people.txt") # Replace with spark.read.text
parts = lines.map(lambda l: l.split(","))

# Build a Row per record; the keyword arguments become the column names.
# name stays a str, and age is converted to int.
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

In [None]:
# Infer the schema
schemaPeople = spark.createDataFrame(people)

# Register the DataFrame as a table.
schemaPeople.createOrReplaceTempView("people")

In [None]:
# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

In [None]:
# DataFrame.rdd returns the content as `pyspark.RDD` of `Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)

#### Programmatically Specifying the Schema


In [None]:
# Load a text file and split each line on commas.
# `lines` is an RDD of strings, one per line of the file.
lines = spark.sparkContext.textFile("file:/home/work/people.txt")
parts = lines.map(lambda l: l.split(","))

# Convert each line to a (name, age) tuple.
# age must be a Python int to match the IntegerType field in the schema below.
people = parts.map(lambda p: (p[0], int(p[1].strip())))

In [None]:
# Manually specify the schema
fields = [StructField("name", StringType(), True),
          StructField("age", IntegerType(), True)]
schema = StructType(fields)

In [None]:
fields

In [None]:
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

In [None]:
schemaPeople.printSchema()

In [None]:
# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()

In [None]:
spark.stop()