In the previous notebook, we saw Spark RDD. However, there are three way of data storage - RDD, DataFrame, and DataSet. Here, we deal with Spark DataFrame.

# PySpark DataFrame and Operations
A PySpark dataframe is not the same as a pandas dataframe. Instead, it is **Spark SQL dataframe**.

## A. Import PySpark libraries

In [0]:
import pyspark
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.master("local[*]").getOrCreate()
# optional
spark_session.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark_session

## B. Create Spark DataFrame.

### B.1 From imported csv.

#### Import data from data source.
Here, we will use a csv file to import data but data can be imported into Databricks workspace from different sources. This dataframe is called a SQL dataframe and many native SQL operations can be performed on this dataframe.

In [0]:
path_to_filestore_tables = "/FileStore/tables/"
training_csv_filename = "housing.csv"
titanic_df = spark_session.read.csv(path_to_filestore_tables+training_csv_filename,
                            header=True,
                            sep=",",
                           inferSchema = True)
titanic_df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

Note that while printing the PySpark SQL dataframe above, we are not retrieving all the rows of the dataframe from the distributed storage of Spark.<br>Let's print the schema of the imported csv file.

In [0]:
titanic_df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



### B.2 From RDDs.
Spark SQL DataFrames can also be created from RDDs.

In [0]:
from datetime import date
list_of_tuples = [("Kathmandu", 1, 0.91, date(2022,10,11)), ("Pokhara", 2, 0.73, date(2022,10,11)), ("Biratnagar", 3, 0.66, date(2022,10,11))]

# use user-declared Spark Context
# rdd = spark_session.sparkContext.parallelize(list_of_tuples)
# or use the default Spark Context
rdd = spark_session.sparkContext.parallelize(list_of_tuples)

rdd

Out[18]: ParallelCollectionRDD[70] at readRDDFromInputStream at PythonRDD.scala:435

In [0]:
df = spark.createDataFrame(rdd, schema=["City","Rank", "Air Quality", "Date"])
df.show(5)

+----------+----+-----------+----------+
|      City|Rank|Air Quality|      Date|
+----------+----+-----------+----------+
| Kathmandu|   1|       0.91|2022-10-11|
|   Pokhara|   2|       0.73|2022-10-11|
|Biratnagar|   3|       0.66|2022-10-11|
+----------+----+-----------+----------+



In [0]:
df.printSchema()

root
 |-- City: string (nullable = true)
 |-- Rank: long (nullable = true)
 |-- Air Quality: double (nullable = true)
 |-- Date: date (nullable = true)



## C. SQL DataFrame Operations

#### SQL Statements

A lot of the syntax keywords from SQL are useful to interact with Spark dataframe because Spark dataframe class was built with consideration for SQL. For example, the `LIMIT` keyword useed in SQL can be used as below.

In [0]:
titanic_df.limit(5)

longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


#### SQL SELECT

Similarly the `SELECT` keyword from SQL can be used as below.

In [0]:
titanic_df.select("*")

longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY
-122.25,37.85,52.0,919.0,213.0,413.0,193.0,4.0368,269700.0,NEAR BAY
-122.25,37.84,52.0,2535.0,489.0,1094.0,514.0,3.6591,299200.0,NEAR BAY
-122.25,37.84,52.0,3104.0,687.0,1157.0,647.0,3.12,241400.0,NEAR BAY
-122.26,37.84,42.0,2555.0,665.0,1206.0,595.0,2.0804,226700.0,NEAR BAY
-122.25,37.84,52.0,3549.0,707.0,1551.0,714.0,3.6912,261100.0,NEAR BAY


We can also combine these commands as below.

In [0]:
titanic_df.select("latitude", "longitude").limit(5)

latitude,longitude
37.88,-122.23
37.86,-122.22
37.85,-122.24
37.85,-122.25
37.85,-122.25


#### SQL WHERE
We can also make complex queries using SQL keywords as below.

In [0]:
titanic_df.where((titanic_df.total_rooms > 1000) & (titanic_df.ocean_proximity == "NEAR BAY")).limit(5)

longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY
-122.25,37.84,52.0,2535.0,489.0,1094.0,514.0,3.6591,299200.0,NEAR BAY


#### SQL Aggregate Functions

In [0]:
titanic_df.agg({"total_rooms":"avg"})

avg(total_rooms)
2635.7630813953488


In [0]:
titanic_df.groupBy("ocean_proximity").agg({"total_bedrooms":"avg"}).orderBy("ocean_proximity", ascending = False)

ocean_proximity,avg(total_bedrooms)
NEAR OCEAN,538.6156773211568
NEAR BAY,514.1828193832599
ISLAND,420.4
INLAND,533.8816194581281
<1H OCEAN,546.5391852999778


A temporay view is needed to run SQL queries on the dataframe data since SQL queries cannot be done directly on a dataframe. This temporary view's lifetime is tied to this SparkSession and gets killed off once the session ends. The temporary view is useful if you want to access the same data multiple times within the notebook. Also, it will not copy the actual data at any place.

In [0]:
df.createOrReplaceTempView("temp_table_view")

This temporay view can be used to run SQL queries on. For example, as below:

In [0]:
%sql
SELECT * FROM temp_table_view
LIMIT 5


City,Rank,Air Quality,Date
Kathmandu,1,0.91,2022-10-11
Pokhara,2,0.73,2022-10-11
Biratnagar,3,0.66,2022-10-11
