### DataFrames

A DataFrame is an immutable, distributed collection of data organized into
named columns, analogous to a table in a relational database. A Spark DataFrame is similar
in concept to a Python pandas DataFrame or an R data frame, in that it lets users work easily with structured data.

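Since Spark 2.0, SparkSession is the unified entry point to Spark: it takes over the roles of
HiveContext and SQLContext and exposes the underlying SparkContext as spark.sparkContext.
The older DStream-based streaming API still uses its own StreamingContext.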


In [1]:
# Create (or reuse) the SparkContext that is used later to build RDDs
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [2]:
# The entry point into all functionality in Spark is the SparkSession class.
# To create a basic SparkSession, use SparkSession.builder:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

### Creating DataFrames

Instead of reading from the file system, let's create a DataFrame from generated data. We first create an RDD, stringJSONRDD, and then convert it into a DataFrame by reading it with spark.read.json.

In [3]:
# Generate our own JSON data
#   This way we don't have to access the file system yet.
# The code creates an RDD of swimmers
# (their ID, name, age, and eye color) in JSON format.

stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{sc = SparkContext()
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

It is important to note that parallelize, map, and mapPartitions are all RDD
transformations. Wrapped within the DataFrame operation spark.read.json (in
this case) are not only the RDD transformations but also the action that converts
the RDD into a DataFrame.

In [4]:
# Create DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)

Note that creating the temporary view (table) is lazy: it only registers a name for the
DataFrame, and nothing is executed until a DataFrame action runs.

In [5]:
# Create temporary table
swimmersJSON.createOrReplaceTempView("swimmersJSON")

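Running the .show() method without arguments displays the first 20 rows. In the output below, the third record ends up in the _corrupt_record column with nulls everywhere else, because the stray sc = SparkContext() text after its opening brace makes that string invalid JSON.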

In [6]:
# DataFrame API
swimmersJSON.show()

+--------------------+----+--------+----+-------+
|     _corrupt_record| age|eyeColor|  id|   name|
+--------------------+----+--------+----+-------+
|                null|  19|   brown| 123|  Katie|
|                null|  22|   green| 234|Michael|
|{sc = SparkContex...|null|    null|null|   null|
+--------------------+----+--------+----+-------+
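
A record lands in `_corrupt_record` when Spark cannot parse it as JSON. As a rough pre-check before handing strings to spark.read.json, the sketch below validates each record with Python's standard `json` module. The helper name `is_valid_json` is hypothetical, and it assumes Python's parser and Spark's agree on these inputs, which holds for the simple records used here.

```python
import json

# The three records from the notebook, compacted onto single lines.
records = [
    '{"id": "123", "name": "Katie", "age": 19, "eyeColor": "brown"}',
    '{"id": "234", "name": "Michael", "age": 22, "eyeColor": "green"}',
    '{sc = SparkContext()\n "id": "345", "name": "Simone", "age": 23, "eyeColor": "blue"}',
]

def is_valid_json(text):
    """Return True if text parses as JSON, False otherwise."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False

validity = [is_valid_json(r) for r in records]
print(validity)  # [True, True, False]
```

Removing the stray text from the third string makes it parse, at which point Spark would have no reason to add the `_corrupt_record` column.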



### DataFrame queries

Now that we have created the swimmersJSON DataFrame, we can run both DataFrame API
and SQL queries against it. Let's start with a simple query
showing all the rows in the DataFrame.

In [7]:
# SQL query
# The .collect() method returns all the records as a list of Row objects.
# You can use either collect() or show() with both DataFrames and SQL queries.
# Use .collect() only on a small DataFrame, since it returns every row
# and moves them from the executors back to the driver.
# Otherwise use take(<n>) or show(<n>), which limit the number of rows
# returned to <n>.
spark.sql("select * from swimmersJSON").collect()

[Row(_corrupt_record=None, age=19, eyeColor='brown', id='123', name='Katie'),
 Row(_corrupt_record=None, age=22, eyeColor='green', id='234', name='Michael'),
 Row(_corrupt_record='{sc = SparkContext()\n    "id": "345",\n    "name": "Simone",\n    "age": 23,\n    "eyeColor": "blue"\n  }', age=None, eyeColor=None, id=None, name=None)]

### Inferring the Schema Using Reflection

Note that Apache Spark infers the schema using reflection; that is, it automatically determines the schema by examining the JSON data.

In [8]:
# Print the schema
swimmersJSON.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



Notice that Spark was able to infer the schema (as shown by .printSchema()).

But what if we want to programmatically specify the schema?

### Programmatically Specifying the Schema

In this case, let's specify the schema for CSV-style data that we generate in memory.

In [9]:
from pyspark.sql.types import StructType, StructField, LongType, StringType

# Generate our own CSV-style data
#   This way we don't have to access the file system yet.
stringCSVRDD = sc.parallelize([
    (123, 'Katie', 19, 'brown'),
    (234, 'Michael', 22, 'green'),
    (345, 'Simone', 23, 'blue')
])

# Define the schema with StructType, using types from pyspark.sql.types.
# Each StructField takes a column name, a data type, and a nullable flag.
schema = StructType([
    StructField("id", LongType(), True),
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and create the DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")

In [10]:
# Print the schema
#   Notice that we have redefined id as Long (instead of String)
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



As you can see above, we can programmatically apply the schema instead of letting the Spark engine infer it via reflection.

### Querying with SQL

With DataFrames, you can write your queries using Spark SQL, a SQL dialect that is compatible with the Hive Query Language (HiveQL).

In [11]:
# Execute SQL Query and return the data

spark.sql("select * from swimmers").show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



Let's get the row count:

In [12]:
# Get count of rows in SQL
spark.sql("select count(1) from swimmers").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [13]:
# Query id and age for swimmers with age = 22 via DataFrame API
swimmers.select("id", "age").filter("age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [14]:
# Query id and age for swimmers with age = 22 via DataFrame API in another way
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [15]:
# Query id and age for swimmers with age = 22 in SQL
spark.sql("select id, age from swimmers where age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [16]:
# Query name and eye color for swimmers with eye color starting with the letter 'b'
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



### Querying with the DataFrame API

You can also write your queries using the DataFrame API.

In [17]:
# Show the values 
swimmers.show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



In [18]:
# Use the Databricks `display` command to view the data more easily
# (outside Databricks, display only prints the DataFrame's column summary, as below)
display(swimmers)

DataFrame[id: bigint, name: string, age: bigint, eyeColor: string]

In [19]:
# Get count of rows
swimmers.count()

3

In [20]:
# Get the id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()

+---+---+
| id|age|
+---+---+
|234| 22|
+---+---+



In [21]:
# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+

