## Learning PySpark
### Chapter 4: DataFrames
This notebook contains sample code from Chapter 4 of [Learning PySpark]() focusing on PySpark and DataFrames.

### Initial code to run

In [1]:
import os

# Set JAVA_HOME to Java 17 (required for PySpark)
# This ensures the notebook kernel uses the correct Java version
java_home = "/opt/homebrew/opt/openjdk@17/libexec/openjdk.jdk/Contents/Home"
if os.path.exists(java_home):
    os.environ["JAVA_HOME"] = java_home
    os.environ["PATH"] = f"/opt/homebrew/opt/openjdk@17/bin:{os.environ.get('PATH', '')}"

from pyspark.sql import SparkSession
from pyspark import SparkConf

# Configure Spark to bind to localhost to avoid network binding issues
conf = SparkConf().setAppName("LearningPySpark") \
                  .set("spark.driver.host", "localhost") \
                  .set("spark.driver.bindAddress", "127.0.0.1")

# Create SparkSession and get SparkContext
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/09 09:50:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/09 09:50:39 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Generate your own DataFrame
Instead of accessing the file system, let's create a DataFrame by generating the data.  In this case, we'll first create the `stringRDD` RDD and then convert it into a DataFrame when we're reading `stringJSONRDD` using `spark.read.json`.

In [None]:
# Generate our own JSON data 
#   This way we don't have to access the file system yet.
# Three double quotes create multi-line string literals in Python. Here you can use single " quotes without escaping
stringJSONRDD = sc.parallelize(("""
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

In [None]:
stringJSONRDD.take(1)   # inspecting the RDD

                                                                                

[' \n  { "id": "123",\n    "name": "Katie",\n    "age": 19,\n    "eyeColor": "brown"\n  }']

In [4]:
# Create DataFrame
swimmersJSON = spark.read.json(stringJSONRDD)

In [None]:
swimmersJSON.show() # inspecting the DataFrame

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [8]:
# Create temporary table
swimmersJSON.createOrReplaceTempView("swimmersJSON")

# purpose
# Registers the DataFrame as a temporary view in Spark's SQL Catalog
# Enables SQL queries on the DataFrame using spark.sql(...)
# Temporary -- exists only for the current Spark session

In [10]:
spark.sql("SELECT name, age FROM swimmersJSON WHERE age > 20").show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 22|
| Simone| 23|
+-------+---+



In [12]:
# DataFrame API
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [11]:
# SQL Query
spark.sql("select * from swimmersJSON").collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

In [14]:
%sql 
-- Query Data
select * from swimmersJSON

SyntaxError: invalid syntax (222894409.py, line 2)

#### Inferring the Schema Using Reflection
Note that Apache Spark is inferring the schema using reflection; i.e. it automaticlaly determines the schema of the data based on reviewing the JSON data.

In [13]:
# Print the schema
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



Notice that Spark was able to determine infer the schema (when reviewing the schema using `.printSchema`).

But what if we want to programmatically specify the schema?

#### Programmatically Specifying the Schema
In this case, let's specify the schema for a `CSV` text file.

In [15]:
from pyspark.sql.types import *

# Generate our own CSV data 
#   This way we don't have to access the file system yet.
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])

# The schema is encoded in a string, using StructType we define the schema using various pyspark.sql.types
schemaString = "id name age eyeColor"
schema = StructType([
    StructField("id", LongType(), True),    
    StructField("name", StringType(), True),
    StructField("age", LongType(), True),
    StructField("eyeColor", StringType(), True)
])

# Apply the schema to the RDD and Create DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)

# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")

In [16]:
# Print the schema
#   Notice that we have redefined id as Long (instead of String)
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [None]:
%sql 
-- Query the data
select * from swimmers

As you can see from above, we can programmatically apply the `schema` instead of allowing the Spark engine to infer the schema via reflection.

Additional Resources include:
* [PySpark API Reference](https://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html)
* [Spark SQL, DataFrames, and Datasets Guide](https://spark.apache.org/docs/latest/sql-programming-guide.html#programmatically-specifying-the-schema): This is in reference to Programmatically Specifying the Schema using a `CSV` file.

####|| SparkSession

Notice that we're no longer using `sqlContext.read...` but instead `spark.read...`.  This is because as part of Spark 2.0, `HiveContext`, `SQLContext`, `StreamingContext`, `SparkContext` have been merged together into the Spark Session `spark`.
* Entry point for reading data
* Working with metadata
* Configuration
* Cluster resource management

For more information, please refer to [How to use SparkSession in Apache Spark 2.0](http://bit.ly/2br0Fr1) (http://bit.ly/2br0Fr1).

### Querying with SQL
With DataFrames, you can start writing your queries using `Spark SQL` - a SQL dialect that is compatible with the Hive Query Language (or HiveQL).

In [17]:
# Execute SQL Query and return the data
spark.sql("select * from swimmers").show()

+---+-------+---+--------+
| id|   name|age|eyeColor|
+---+-------+---+--------+
|123|  Katie| 19|   brown|
|234|Michael| 22|   green|
|345| Simone| 23|    blue|
+---+-------+---+--------+



Let's get the row count:

In [19]:
# Get count of rows in SQL
spark.sql("select count(*) from swimmers").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



Note, you can make use of `%sql` within the notebook cells of a Databricks notebook.

In [None]:
%sql 
-- Query all data
select * from swimmers

In [None]:
# Query id and age for swimmers with age = 22 via DataFrame API
swimmers.select("id", "age").filter("age = 22").show()

In [None]:
# Query id and age for swimmers with age = 22 via DataFrame API in another way
swimmers.select(swimmers.id, swimmers.age).filter(swimmers.age == 22).show()


In [None]:
# Query id and age for swimmers with age = 22 in SQL
spark.sql("select id, age from swimmers where age = 22").show()

In [None]:
%sql 
-- Query id and age for swimmers with age = 22
select id, age from swimmers where age = 22

In [20]:
# Query name and eye color for swimmers with eye color starting with the letter 'b'
spark.sql("select name, eyeColor from swimmers where eyeColor like 'b%'").show()

+------+--------+
|  name|eyeColor|
+------+--------+
| Katie|   brown|
|Simone|    blue|
+------+--------+



In [None]:
%sql 
-- Query name and eye color for swimmers with eye color starting with the letter 'b'
select name, eyeColor from swimmers where eyeColor like 'b%'

### Querying with the DataFrame API
With DataFrames, you can start writing your queries using the DataFrame API

In [None]:
# Show the values 
swimmers.show()

In [None]:
# Using Databricks `display` command to view the data easier
display(swimmers)

In [None]:
# Get count of rows
swimmers.count()

In [None]:
# Get the id, age where age = 22
swimmers.select("id", "age").filter("age = 22").show()

In [None]:
# Get the name, eyeColor where eyeColor like 'b%'
swimmers.select("name", "eyeColor").filter("eyeColor like 'b%'").show()

## On-Time Flight Performance
Query flight departure delays by State and City by joining the departure delay and join to the airport codes (to identify state and city).

### DataFrame Queries
Let's run a flight performance using DataFrames; let's first build the DataFrames from the source datasets.

In [21]:
# Set File Paths
flightPerfFilePath = "/Users/byeongminko/Documents/repos/Learning-PySpark/Chapter02/flight-data/departuredelays.csv"
airportsFilePath = "/Users/byeongminko/Documents/repos/Learning-PySpark/Chapter02/flight-data/airport-codes-na.txt"

# Obtain Airports dataset
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')    # sets the column delimiter to tab
airports.createOrReplaceTempView("airports")

# Obtain Departure Delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")

# Cache the Departure Delays dataset 
flightPerf.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

In [None]:
# Query Sum of Flight Delays by City and Origin Code (for Washington State)
spark.sql("""
    select
        a.City,
        f.origin,
        sum(f.delay) as Delays
    from
        FlightPerformance f
    join airports a -- inner join 
        on a.IATA = f.origin
    where a.State = 'WA'
    group by a.City, f.origin
    order by sum(f.delay) desc
""").show()

+-------+------+--------+
|   City|origin|  Delays|
+-------+------+--------+
|Seattle|   SEA|159086.0|
|Spokane|   GEG| 12404.0|
|  Pasco|   PSC|   949.0|
+-------+------+--------+



In [40]:
%sql
-- Query Sum of Flight Delays by City and Origin Code (for Washington State)
select a.City, f.origin, sum(f.delay) as Delays
  from FlightPerformance f
    join airports a
      on a.IATA = f.origin
 where a.State = 'WA'
 group by a.City, f.origin
 order by sum(f.delay) desc
 

In [41]:
# Query Sum of Flight Delays by State (for the US)
spark.sql("select a.State, sum(f.delay) as Delays from FlightPerformance f join airports a on a.IATA = f.origin where a.Country = 'USA' group by a.State ").show()

In [42]:
%sql
-- Query Sum of Flight Delays by State (for the US)
select a.State, sum(f.delay) as Delays
  from FlightPerformance f
    join airports a
      on a.IATA = f.origin
 where a.Country = 'USA'
 group by a.State 

In [43]:
%sql
-- Query Sum of Flight Delays by State (for the US)
select a.State, sum(f.delay) as Delays
  from FlightPerformance f
    join airports a
      on a.IATA = f.origin
 where a.Country = 'USA'
 group by a.State 

For more information, please refer to:
* [Spark SQL, DataFrames and Datasets Guide](http://spark.apache.org/docs/latest/sql-programming-guide.html#sql)
* [PySpark SQL Module: DataFrame](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame)
* [PySpark SQL Functions Module](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions)