<a href="https://colab.research.google.com/github/jtao/dswebinar/blob/master/pyspark/PySpark_DataFrame.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark DataFrames and SQL

[Jian Tao](https://orcid.org/0000-0003-4228-6089), Texas A&M University

May 1, 2021

### 1. Set up the PySpark environment first

In [1]:
# Run this cell in every new Google Colab session to make sure that PySpark is installed.
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Colab").config('spark.ui.port', '4050').getOrCreate()

# !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
# !unzip -o ngrok-stable-linux-amd64.zip
# get_ipython().system_raw('./ngrok http 4050 &')
# !curl -s http://localhost:4040/api/tunnels | python3 -c "import sys, json; print(\"\nClick me to launch (give it a minute or two)\n\"); print(json.load(sys.stdin)['tunnels'][0]['public_url'])"



### 2. Create a DataFrame by reading from a CSV/JSON file

`spark.read.csv` does not read directly from an HTTP(S) URL, so we have to download the CSV file first. We can use `SparkFiles` to do that, or load the file with `pandas`. For CSV files with a header row, make sure to pass `header=True` to `spark.read.csv`. When the data types of the columns are not known in advance, `inferSchema=True` asks Spark to detect them automatically, but the result is not always right. In our example, `horsepower` is inferred as `string` instead of a numeric type.

In [2]:
from pyspark import SparkFiles

csv_url = "https://raw.githubusercontent.com/jtao/AdvancedML/main/data/Auto.csv"
json_url = "https://raw.githubusercontent.com/jtao/dswebinar/master/pyspark/Auto.json"

spark.sparkContext.addFile(csv_url)
spark.sparkContext.addFile(json_url)

## One can also create a Spark DataFrame from a pandas DataFrame.
# import pandas as pd
# df = spark.createDataFrame(pd.read_csv(csv_url))

#df = spark.read.csv(SparkFiles.get("Auto.csv"), header=True, sep=",", inferSchema=False)
df = spark.read.csv(SparkFiles.get("Auto.csv"), header=True, sep=",", inferSchema=True)

df.printSchema()
df.show(5)

root
 |-- mpg: double (nullable = true)
 |-- cylinders: integer (nullable = true)
 |-- displacement: double (nullable = true)
 |-- horsepower: string (nullable = true)
 |-- weight: integer (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- origin: integer (nullable = true)
 |-- name: string (nullable = true)

+----+---------+------------+----------+------+------------+----+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|year|origin|                name|
+----+---------+------------+----------+------+------------+----+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|  70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|  70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|  70|     1|  plymouth satellite|
|16.0|        8|       304.0|       150|  3433|        12.0|  70|     1|

In [3]:
# either will work
df = spark.read.json(SparkFiles.get("Auto.json"))
#df = spark.read.load(SparkFiles.get("Auto.json"),format="json")

df.printSchema()
df.show(10)

root
 |-- acceleration: double (nullable = true)
 |-- cylinders: long (nullable = true)
 |-- displacement: double (nullable = true)
 |-- horsepower: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- name: string (nullable = true)
 |-- origin: long (nullable = true)
 |-- weight: long (nullable = true)
 |-- year: long (nullable = true)

+------------+---------+------------+----------+----+--------------------+------+------+----+
|acceleration|cylinders|displacement|horsepower| mpg|                name|origin|weight|year|
+------------+---------+------------+----------+----+--------------------+------+------+----+
|        12.0|        8|       307.0|       130|18.0|chevrolet chevell...|     1|  3504|  70|
|        11.5|        8|       350.0|       165|15.0|   buick skylark 320|     1|  3693|  70|
|        11.0|        8|       318.0|       150|18.0|  plymouth satellite|     1|  3436|  70|
|        12.0|        8|       304.0|       150|16.0|       amc rebel sst|     1|  3

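We can define a schema so that `spark.read.csv` casts every column to the intended type. Values that cannot be parsed as the declared type are read as null in Spark's default permissive mode, so each declared type should be wide enough for the data (for example, `DoubleType` for a column with fractional values).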

In [4]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType

user_schema = StructType([
                     StructField("mpg", IntegerType(), True),
                     StructField("cylinders", IntegerType(), True),
                     StructField("displacement", IntegerType(), True),
                     StructField("horsepower", IntegerType(), True),
                     StructField("weight", IntegerType(), True),
                     StructField("acceleration", DoubleType(), True),
                     StructField("year", IntegerType(), True),
                     StructField("origin", IntegerType(), True),
                     StructField("name", StringType(), True)
])

# An explicit schema takes precedence, so inferSchema is not needed here.
df = spark.read.csv(SparkFiles.get("Auto.csv"), header=True, sep=",", schema=user_schema)

df.printSchema()
df.show(5)

root
 |-- mpg: integer (nullable = true)
 |-- cylinders: integer (nullable = true)
 |-- displacement: integer (nullable = true)
 |-- horsepower: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- year: integer (nullable = true)
 |-- origin: integer (nullable = true)
 |-- name: string (nullable = true)

+---+---------+------------+----------+------+------------+----+------+--------------------+
|mpg|cylinders|displacement|horsepower|weight|acceleration|year|origin|                name|
+---+---------+------------+----------+------+------------+----+------+--------------------+
| 18|        8|         307|       130|  3504|        12.0|  70|     1|chevrolet chevell...|
| 15|        8|         350|       165|  3693|        11.5|  70|     1|   buick skylark 320|
| 18|        8|         318|       150|  3436|        11.0|  70|     1|  plymouth satellite|
| 16|        8|         304|       150|  3433|        12.0|  70|     1|    

In [5]:
df = spark.read.json(SparkFiles.get("Auto.json"))
df.printSchema()
df.show()

root
 |-- acceleration: double (nullable = true)
 |-- cylinders: long (nullable = true)
 |-- displacement: double (nullable = true)
 |-- horsepower: string (nullable = true)
 |-- mpg: double (nullable = true)
 |-- name: string (nullable = true)
 |-- origin: long (nullable = true)
 |-- weight: long (nullable = true)
 |-- year: long (nullable = true)

+------------+---------+------------+----------+----+--------------------+------+------+----+
|acceleration|cylinders|displacement|horsepower| mpg|                name|origin|weight|year|
+------------+---------+------------+----------+----+--------------------+------+------+----+
|        12.0|        8|       307.0|       130|18.0|chevrolet chevell...|     1|  3504|  70|
|        11.5|        8|       350.0|       165|15.0|   buick skylark 320|     1|  3693|  70|
|        11.0|        8|       318.0|       150|18.0|  plymouth satellite|     1|  3436|  70|
|        12.0|        8|       304.0|       150|16.0|       amc rebel sst|     1|  3

### 3. Create a Spark DataFrame from a list of tuples

In [6]:
auto_list = [(1, 18, "Chevrolet"), (2, 15, "Buick"), (3, 18, "Plymouth"), (4, 16, "Amc"), (5, 17, "Ford")]

df = spark.createDataFrame(auto_list)
df.printSchema()
df.show(5)

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

users_schema = StructType([
                          StructField("id", IntegerType(), True),
                          StructField("mpg", IntegerType(), True),
                          StructField("name", StringType(), True)])

df = spark.createDataFrame(auto_list, schema=users_schema)
df.printSchema()
df.show(5)

root
 |-- _1: long (nullable = true)
 |-- _2: long (nullable = true)
 |-- _3: string (nullable = true)

+---+---+---------+
| _1| _2|       _3|
+---+---+---------+
|  1| 18|Chevrolet|
|  2| 15|    Buick|
|  3| 18| Plymouth|
|  4| 16|      Amc|
|  5| 17|     Ford|
+---+---+---------+

root
 |-- id: integer (nullable = true)
 |-- mpg: integer (nullable = true)
 |-- name: string (nullable = true)

+---+---+---------+
| id|mpg|     name|
+---+---+---------+
|  1| 18|Chevrolet|
|  2| 15|    Buick|
|  3| 18| Plymouth|
|  4| 16|      Amc|
|  5| 17|     Ford|
+---+---+---------+



### 4. Create a Spark DataFrame from a list of dictionaries

In [7]:
auto_list = [{"id": 1, "mpg": 18, "name": "Chevrolet"}, 
                {"id": 2, "mpg": 15, "name": "Buick"}, 
                {"id": 3, "mpg": 18, "name": "Plymouth"}, 
                {"id": 4, "mpg": 16, "name": "Amc"}, 
                {"id": 5, "mpg": 17, "name": "Ford"}]
df = spark.createDataFrame(auto_list)
df.printSchema()
df.show(5)

root
 |-- id: long (nullable = true)
 |-- mpg: long (nullable = true)
 |-- name: string (nullable = true)

+---+---+---------+
| id|mpg|     name|
+---+---+---------+
|  1| 18|Chevrolet|
|  2| 15|    Buick|
|  3| 18| Plymouth|
|  4| 16|      Amc|
|  5| 17|     Ford|
+---+---+---------+



### 5. Operations on Spark DataFrame

In [8]:
# Load the full data set again.
df = spark.read.csv(SparkFiles.get("Auto.csv"), header=True, sep=",", inferSchema=True)

In [9]:
# Select only the "name" column
df.select("name").show(5)

+--------------------+
|                name|
+--------------------+
|chevrolet chevell...|
|   buick skylark 320|
|  plymouth satellite|
|       amc rebel sst|
|         ford torino|
+--------------------+
only showing top 5 rows



In [10]:
# Select every car's name, with its mpg incremented by 100
df.select(df['name'], df['mpg'] + 100).show(5)

+--------------------+-----------+
|                name|(mpg + 100)|
+--------------------+-----------+
|chevrolet chevell...|      118.0|
|   buick skylark 320|      115.0|
|  plymouth satellite|      118.0|
|       amc rebel sst|      116.0|
|         ford torino|      117.0|
+--------------------+-----------+
only showing top 5 rows



In [11]:
# Keep only the cars with mpg greater than 30
df.filter(df['mpg'] > 30).show(5)

+----+---------+------------+----------+------+------------+----+------+-------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|year|origin|               name|
+----+---------+------------+----------+------+------------+----+------+-------------------+
|31.0|        4|        71.0|        65|  1773|        19.0|  71|     3|toyota corolla 1200|
|35.0|        4|        72.0|        69|  1613|        18.0|  71|     3|        datsun 1200|
|31.0|        4|        79.0|        67|  1950|        19.0|  74|     3|        datsun b210|
|32.0|        4|        71.0|        65|  1836|        21.0|  74|     3|toyota corolla 1200|
|31.0|        4|        76.0|        52|  1649|        16.5|  74|     3|      toyota corona|
+----+---------+------------+----------+------+------------+----+------+-------------------+
only showing top 5 rows



In [12]:
# Count cars by number of cylinders
df.groupBy("cylinders").count().show(10)

+---------+-----+
|cylinders|count|
+---------+-----+
|        6|   84|
|        3|    4|
|        5|    3|
|        4|  203|
|        8|  103|
+---------+-----+



### 6. Running SQL queries programmatically

In [13]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("auto")
sqlDF = spark.sql("SELECT * FROM auto")
sqlDF.show(5)

+----+---------+------------+----------+------+------------+----+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|year|origin|                name|
+----+---------+------------+----------+------+------------+----+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|  70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|  70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|  70|     1|  plymouth satellite|
|16.0|        8|       304.0|       150|  3433|        12.0|  70|     1|       amc rebel sst|
|17.0|        8|       302.0|       140|  3449|        10.5|  70|     1|         ford torino|
+----+---------+------------+----------+------+------------+----+------+--------------------+
only showing top 5 rows



In [14]:
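# Register the DataFrame as a global temporary view.
# createOrReplaceGlobalTempView keeps this cell re-runnable; createGlobalTempView fails if the view already exists.
df.createOrReplaceGlobalTempView("auto")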

In [15]:
# A global temporary view is tied to the system-preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.auto").show(5)

# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.auto").show(5)

+----+---------+------------+----------+------+------------+----+------+--------------------+
| mpg|cylinders|displacement|horsepower|weight|acceleration|year|origin|                name|
+----+---------+------------+----------+------+------------+----+------+--------------------+
|18.0|        8|       307.0|       130|  3504|        12.0|  70|     1|chevrolet chevell...|
|15.0|        8|       350.0|       165|  3693|        11.5|  70|     1|   buick skylark 320|
|18.0|        8|       318.0|       150|  3436|        11.0|  70|     1|  plymouth satellite|
|16.0|        8|       304.0|       150|  3433|        12.0|  70|     1|       amc rebel sst|
|17.0|        8|       302.0|       140|  3449|        10.5|  70|     1|         ford torino|
+----+---------+------------+----------+------+------------+----+------+--------------------+
only showing top 5 rows

+----+---------+------------+----------+------+------------+----+------+--------------------+
| mpg|cylinders|displacement|horsep

### 7. References

[SQL Reference](https://spark.apache.org/docs/latest/sql-ref-ansi-compliance.html)