<img src="uva_seal.png">  

## Spark SQL and DataFrames

### University of Virginia
### DS 7200: Distributed Computing
### Last Updated: September 6, 2023

---  

### Sources 

Learning Spark, Chapter 9: Spark SQL

https://spark.apache.org/docs/latest/sql-programming-guide.html

https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning

Demonstration of several useful DataFrame operations:  
https://docs.databricks.com/spark/latest/dataframes-datasets/introduction-to-dataframes-python.html

### OBJECTIVES
- Introduce Spark SQL, the interface for working with structured and semistructured data
- Introduce DataFrames and show basic functionality
- Explain the benefits of the Parquet format

### CONCEPTS AND FUNCTIONS
- Schema
- SQL
- Dataset and DataFrame
- Partition
- Parquet files

---  

### 1. Overview

Two important ways of working with big data in Spark: 

- through Spark SQL
- using DataFrames

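The two also interoperate: a SQL query returns a DataFrame, and a DataFrame can be registered as a view and queried with SQL.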

### 2. Data Schema in Spark

The schema in Spark defines the data structure. For each field, a 3-tuple is specified: `(column name, data type, nullable)`  

---  

**Example of schema with two Fields *author* and *pages*, which cannot contain null values**
```
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([StructField("author", StringType(), False), StructField("pages", IntegerType(), False)])
```
---  

It is possible to allow Spark to infer the schema of your data, but it's preferable to feed it the schema:

- avoids having Spark launch a separate job to read a large fraction of the data to infer schema
- early detection of errors if the data doesn't match the schema
- Spark inference may be incorrect. For example, it may think all numerical data are strings.

### This schema is different from database schema

A database *schema* is the structure that represents the logical view of the entire database.  
It defines how data is organized and how relations among them are associated.  
This is implemented through the use of tables, views, and integrity constraints.

 <img src="./db_schema.png" width=500> 

### 3. Common Spark Data Types

- integer types, all `int` in Python:
  - ShortType
  - IntegerType
  - LongType
- floating-point types, both `float` in Python:
  - FloatType
  - DoubleType
- StringType
- BooleanType

### 4. SQL in Ten Seconds (tongue in cheek)


SQL (Structured Query Language) is the standard language for communicating with relational databases.  
Commands include CREATE, SELECT, UPDATE, ALTER, INSERT INTO, DROP, DELETE.  
This course will focus on SELECT.

### 5. Spark SQL Capabilities:

- load data from various structured formats including JSON, Hive, Parquet  
- query data using SQL inside Spark or from external tools that connect to Spark (e.g., `Tableau`) 
- Spark SQL integrates SQL with Python/Java/Scala/R code, so you can do things like join RDDs and SQL tables.

### 6. Note on Spark SQL Development

Spark SQL has been a heavy development area in recent releases.  
Because Spark SQL operates on massive amounts of data, optimizing its operations is valuable.  
We will discuss the optimizer behind Spark SQL.
 
---

### 7. Dataset and DataFrame

- A Dataset is a distributed collection of data   
- A Dataset can be constructed from JVM objects and then manipulated using functional transformations (`map()`, `flatMap()`, `filter()`, etc.)  
- A DataFrame is a Dataset organized into named columns   

In practice, you will be thinking in terms of `DataFrames`, and not `Datasets`; the typed Dataset API is available only in Scala and Java.  For users familiar with dataframes from R and Python, Spark DataFrames are similar, but some operations are distinct to Spark.  As an example, adding a new column to a DataFrame is done using `withColumn()`.  This may feel more formal compared to R and Python.  

Additionally - when compared to R and Python - **the Spark DataFrame uses richer optimizations under the hood.  
The structure makes use of distributed computing, in the same manner as RDDs.**  

DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs.  

The DataFrame API is available in Scala, Java, Python, and R. 

### 8. DataFrames vs RDDs  

Now that we have two powerful objects that parallelize data, we have more flexibility, but this can lead to confusion.  When is it better to use DataFrames, and when is it better to use RDDs?  

Here are some recommendations:   

- In general, most work can be done with DataFrames  

- Use DataFrames to use high-level expressions, to perform SQL queries to explore the data, and to gain columnar access.  For example, if you are thinking about the data by field names, you probably want the data in a DataFrame.

- For machine learning and building predictive models, DataFrames are recommended. You will be exploring the data by column, and building features from the columns of data.  
- RDDs can be useful to perform low-level transformations and actions on unstructured data. For example, filtering strings and performing other simple transformations on text is best done with RDDs.  In these cases, the analyst doesn't care about field names, and there is no need to impose schema on the data.  

- Use RDDs when you want to manipulate the data with functional programming constructs rather than domain specific expressions.

### 9. Creating a DataFrame

There are multiple ways to do this:
- use a reader such as `spark.read.csv()` to read data from files into DataFrames (most common)
- pass data to `createDataFrame()` with schema
- conversion from RDD using `toDF()`

**Example 1: Create DataFrame from RDD using `toDF()`**

In [None]:
---  
```
# import modules 
from pyspark.sql import Row

# Map the RDD to a DF

df = rdd.map(lambda line: Row(longitude=line[0], 
                              latitude=line[1], 
                              housingMedianAge=line[2],
                              totalRooms=line[3],
                              totalBedRooms=line[4],
                              population=line[5], 
                              households=line[6],
                              medianIncome=line[7],
                              medianHouseValue=line[8])).toDF()
```
---  

**Example 2: Create DataFrame by passing data and schema to `createDataFrame()`**

In [1]:
# import the entry point: SparkSession
from pyspark.sql import SparkSession

# import data types
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# set up the session
spark = SparkSession.builder.getOrCreate()

# create some data; list of tuples
data = [
    (0, "ChatGPT is all the rage"),
    (1, "Google released BARD to compete"),
    (2, "What does AWS think about this?")
]

# define schema; each field holds (name, data type, nullable)
# for large number of fields, best to automate schema construction
schema = StructType([StructField('id', IntegerType(), False), 
                     StructField('sentence', StringType(), False)])

# create df by passing data, schema
sentenceDataFrame = spark.createDataFrame(data, schema)

# print first few records
sentenceDataFrame.show(3, False)

# print data type
print(type(sentenceDataFrame))


+---+-------------------------------+
|id |sentence                       |
+---+-------------------------------+
|0  |ChatGPT is all the rage        |
|1  |Google released BARD to compete|
|2  |What does AWS think about this?|
+---+-------------------------------+

<class 'pyspark.sql.dataframe.DataFrame'>



**Example 3: Create a DataFrame from some JSON data with `spark.read.json()`**  
(For an example of JSON data see: http://json.org/example.html)


In [2]:
import os
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Read data in json format
df = spark.read.json("people.json")

# Displays the content of the DataFrame to stdout
df.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



**Creating an RDD from a DataFrame**

This is very simple: `df.rdd`

Here we convert our df containing sentences:

In [3]:
sentence_rdd = sentenceDataFrame.rdd
print(sentence_rdd.take(2))
print(type(sentence_rdd))

[Row(id=0, sentence='ChatGPT is all the rage'), Row(id=1, sentence='Google released BARD to compete')]
<class 'pyspark.rdd.RDD'>


### 10. Some Useful DataFrames Operations

Next we explore subsetting, filtering, and aggregation among others.

In [4]:
# Read data in json format
df = spark.read.json("people.json")
df.show()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### Selecting

There are three ways to refer to a column, and we show them all. Find your favorite.

- using the bracket operator
- using the `col()` function
- using the dot operator (my favorite)

Each appears in the filtering examples below.

### Filtering

Keep records where age > 21

In [5]:
# bracket operator
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [6]:
# dot operator
df.filter(df.age > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [7]:
# col() function
from pyspark.sql.functions import col

df.filter(col('age') > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



Keep records subject to filters on name, then sort

In [8]:
from pyspark.sql.functions import asc

# alternatively using df.name instead of col("name")
df.filter((df.name == "Andy") | (df.name == "Michael")).sort(asc("name")).show()

+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|NULL|Michael|
+----+-------+



Fetch records with age *null*

In [9]:
df.filter(col("age").isNull()).show() 

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
+----+-------+



Fetch records with age *not null*

In [10]:
df.filter(col("age").isNotNull()).show() 

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



### where() is equivalent to filter()

In [11]:
df.where((df.name == "Andy") | (df.name == "Michael")).sort(asc("name")).show()

+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|NULL|Michael|
+----+-------+



### Impute missing with 0 (just for illustration; not a great idea for this data)

In [12]:
df.fillna(0).show()

+---+-------+
|age|   name|
+---+-------+
|  0|Michael|
| 30|   Andy|
| 19| Justin|
+---+-------+



### Summarize the age field

In [13]:
df.describe("age").show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|                 2|
|   mean|              24.5|
| stddev|7.7781745930520225|
|    min|                19|
|    max|                30|
+-------+------------------+



### 11. Spark SQL Queries and Temp Views

Spark SQL is at the interface of SQL and DataFrames.    
To write SQL queries against a DataFrame, first register the DataFrame as a SQL temp view, and then write the query.

A temp view is session scoped:  
- visible only to the session that created it 
- dropped when the session ends (not persisted)

**Example of SQL Query against DataFrame**

In [14]:
# register DataFrame as temp view with name "people"
df.createOrReplaceTempView("people")

# query the view
sqlDF = spark.sql("SELECT * FROM people where name == 'Andy'")
sqlDF.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



### 12. Aggregate on columns

SQL functions can be loaded from this library: `pyspark.sql.functions`

We will load some stock data to demonstrate aggregation.

In [15]:
# import data types and functions
from pyspark.sql.types import StructType, StructField, DateType, StringType, LongType, FloatType
from pyspark.sql import functions as F

# set up schema
stock_schema = StructType([StructField('date',DateType(),False),
                           StructField('ticker',StringType(),False),
                           StructField('close',FloatType(),False),
                           StructField('adjusted_close',FloatType(),False),
                           StructField('volume',LongType(),False),
                         ])

In [16]:
# Read in stock data. Source: Yahoo! finance
DATAPATH_STOCKS = './amzn_msft_prices.csv'

df_stx = spark.read.csv(DATAPATH_STOCKS, header=True, schema=stock_schema)
df_stx.show()

+----------+------+------+--------------+--------+
|      date|ticker| close|adjusted_close|  volume|
+----------+------+------+--------------+--------+
|2022-02-10|  MSFT|302.38|     299.57297|45386200|
|2022-02-11|  MSFT|295.04|     292.30112|39175600|
|2022-02-14|  MSFT| 295.0|      292.2615|36359500|
|2022-02-15|  MSFT|300.47|      297.6807|27058300|
|2022-02-16|  MSFT| 299.5|     297.33322|29982100|
|2022-02-17|  MSFT|290.73|     288.62668|32461600|
|2022-02-18|  MSFT|287.93|     285.84692|34264000|
|2022-02-22|  MSFT|287.72|     285.63846|41736100|
|2022-02-23|  MSFT|280.27|     278.24234|37811200|
|2022-02-24|  MSFT|294.59|     292.45874|56989700|
|2022-02-25|  MSFT|297.31|     295.15906|32546700|
|2022-02-28|  MSFT|298.79|     296.62836|34627500|
|2022-03-01|  MSFT|294.95|     292.81613|31217800|
|2022-03-02|  MSFT|300.19|     298.01822|31873000|
|2022-03-03|  MSFT|295.92|     293.77914|27314500|
|2022-03-04|  MSFT|289.86|     287.76294|32356500|
|2022-03-07|  MSFT|278.91|     

In [17]:
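# check the schema; columns show nullable = true even though the schema
# above says False, because Spark treats columns read from files as nullable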

df_stx.printSchema()

root
 |-- date: date (nullable = true)
 |-- ticker: string (nullable = true)
 |-- close: float (nullable = true)
 |-- adjusted_close: float (nullable = true)
 |-- volume: long (nullable = true)



We will calculate some statistics on each stock:
- minimum closing price
- maximum closing price
- minimum volume
- maximum volume

In [18]:
agg_df = df_stx.groupBy("ticker").agg(F.min("close"), F.max("close"), F.min("volume"), F.max("volume"))
agg_df.show()

+------+----------+----------+-----------+-----------+
|ticker|min(close)|max(close)|min(volume)|max(volume)|
+------+----------+----------+-----------+-----------+
|  AMZN|     81.82|   169.315|   35088600|  272662000|
|  MSFT|    214.25|    315.41|    9200800|   86102000|
+------+----------+----------+-----------+-----------+




**IMPORTANT NOTE**  
Do NOT use loops to aggregate data. Loops are run sequentially and do not harness parallelization.  
Using the `groupBy()` method will do the job using parallelization.

### 13. Input/Output

#### 13A. I/O with DataFrames

Here we show various examples of reading and writing dataframes.  
This code is for illustration only.

**Read in Data and Infer the Schema**

```
adult_df = spark.read.\
    format("csv").\
    option("header", "false").\
    option("inferSchema", "true").load("dbfs:/databricks-datasets/adult/adult.data")
```

**Read and Write using Generic Functions**

```
df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
```

**Read and Write using Manually Specified Formats**

```
df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")
```

#### 13B. Parquet Files

- Parquet is a **columnar format** that is supported by many other data processing systems  

- Stores metadata about the columns, which can provide efficiency

- Files hold binary data

- Reading and writing Parquet files in Spark can be much faster than with row-based text formats such as CSV

- Especially useful when querying columns for analytics and ML (don't generally need entire rows of data)

- Good compression and encoding options:  
  Implements a hybrid of bit packing and RLE - encoding switches based on which produces the best compression results

  *Bit Packing* - An integer usually occupies 32 or 64 bits of storage.  
  For small integers, bit packing stores multiple integers in the same space for efficient storage.  
  
  *Run Length Encoding* - for sequence of duplicate values, store single value with number of occurrences.


<img src="./RLE.png">
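
The two encodings can be illustrated with a toy sketch in plain Python. This is not Parquet's actual implementation, only the idea behind it; the function names and sample values are hypothetical, and the bit-width helper assumes non-negative integers.

```python
from itertools import groupby


def rle_encode(values):
    """Collapse each run of equal values into a (value, run_length) pair."""
    return [(value, len(list(run))) for value, run in groupby(values)]


def rle_decode(pairs):
    """Expand (value, run_length) pairs back into the original sequence."""
    return [value for value, count in pairs for _ in range(count)]


def bits_needed(values):
    """Bits per value if every value is packed at the width of the largest."""
    return max(max(values).bit_length(), 1)


column = ["US", "US", "US", "CN", "CN", "US"]
encoded = rle_encode(column)
print(encoded)  # [('US', 3), ('CN', 2), ('US', 1)]

small_ints = [3, 0, 7, 5]
print(bits_needed(small_ints))  # 3 bits each, rather than 32 or 64
```

Run length encoding pays off when a column has long runs of repeated values, while bit packing pays off when values are small; this is why a format might switch between them per column chunk.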

#### 13C. Partition Discovery

Database tables can be partitioned to make querying more efficient.  
For example, the data can be split by gender and country, producing smaller tables.  
If the analyst is only interested in a single country, the query will run faster.


In a partitioned table, data are usually stored in different directories, with partitioning column values encoded in the path of each partition directory.  

All built-in file sources (including Text/CSV/JSON/ORC/Parquet) are able to discover and infer partitioning information automatically. 

In [None]:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...


**Example of writing a DataFrame to Parquet, partitioned by columns**

```
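from pyspark.sql import functions as F

# derive the partitioning columns from an existing end_date column
df = df.withColumn('end_month', F.month('end_date'))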
df = df.withColumn('end_year', F.year('end_date'))
df.write.partitionBy("end_year", "end_month").parquet("/tmp/sample_table")
```

**TRY FOR YOURSELF (UNGRADED EXERCISES)**

1) Given the stock dataframe, use Spark SQL to select all AMZN records. 

In [19]:
# register stock dataFrame as temp view with name "stocks"
df_stx.createOrReplaceTempView("stocks")

# query the view
sqlDF = spark.sql("SELECT * FROM stocks where ticker == 'AMZN'")
sqlDF.show()


2) Given the stock dataframe, do an aggregation to compute minimum, mean, and maximum adjusted close for each stock.

In [None]:
df_stx.groupBy("ticker").agg(F.min("adjusted_close"), F.mean("adjusted_close"), F.max("adjusted_close")).show()

3) Select the date, ticker, and adjusted_close columns, saving this data as a parquet file.

In [None]:
df_stx.select("date","ticker","adjusted_close").write.save("stocks.parquet", format="parquet")

4) Load the parquet file into a new dataframe and verify that things look correct.

In [None]:
test = spark.read.load("stocks.parquet")
test.show()

### 14. Summary

You should now have a basic understanding of Spark SQL, DataFrames, and how to use some of the common transformations on DataFrames.  
With practice, you will gain comfort in selecting and processing data with Spark SQL and DataFrames, which is essential.

Additionally, you should have some sense of when DataFrames are preferred over RDDs, and vice versa.
