# Data Wrangling with PySpark

#### Reference
[Master Data Wrangling with PySpark: A Comprehensive Cheat Sheet](https://amandeepsinghkhanna.github.io/data-wrangling-with-pyspark/) by Amandeep Singh Khanna

First, I had to install the right Java version and the Spark package in this environment and set the path variables.

Aman suggested following [Medard Story's Tutorial](https://youtu.be/e1wdJuBoRz4?si=q7LbZkeMK3_BwmYU) to get this set up for the first time. The setup code is optional, so it is kept in a text cell.

### Code from Medard's tutorial on setting up the Java requirements and Spark
*Check whether Java 8 is already installed*  
!ls /usr/lib/jvm/  

*Download and install Java 8*  
!apt-get update  
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

*Download Apache Spark binary*  
*Install spark (change the version number if needed)*  
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz  

*Extract the Spark archive into the current folder*  
!tar xf spark-3.0.0-bin-hadoop3.2.tgz  

*Set the Java and Spark locations as environment variables*  
import os  
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"  
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"  

*Install findspark using pip*  
!pip install -q findspark
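
Before moving on, it can help to confirm that the two environment variables actually point at real folders, since a wrong path is an easy mistake when the Spark version number changes. The snippet below is only a sketch: `missing_spark_paths` is a hypothetical helper written for this note, not part of findspark or Spark.

```python
import os


def missing_spark_paths(env_vars=("JAVA_HOME", "SPARK_HOME"), environ=None):
    """Return the names of variables that are unset or do not point at a directory.

    Hypothetical helper for this notebook; `environ` defaults to os.environ.
    """
    environ = os.environ if environ is None else environ
    problems = []
    for name in env_vars:
        path = environ.get(name)
        # A variable is a problem if it is missing, empty, or not a directory
        if not path or not os.path.isdir(path):
            problems.append(name)
    return problems


missing = missing_spark_paths()
if missing:
    print(f"Check these variables before calling findspark.init(): {missing}")
else:
    print("JAVA_HOME and SPARK_HOME both point at existing directories.")
```

If either name is reported, the matching `os.environ[...]` line above most likely needs a corrected path.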

In [16]:
# Importing findspark and initialising it so that Python can locate the Spark installation
import findspark
findspark.init()

In [17]:
# Importing pyspark
import pyspark

In [18]:
# Checking which PySpark version is loaded
pyspark.__version__

'3.0.0'

In [19]:
# Import the package to start a Spark Session to work within
from pyspark.sql import SparkSession

In [20]:
# Creating the Spark session object
spark = (
	SparkSession
	.builder
	.appName("DataWranglingwithPyspark")
	.getOrCreate()
)

In [21]:
# Reading from a CSV file
airports_df = spark.read.csv(
    "/content/sample_data/airports.csv",
    header=True,
    inferSchema=True,
)

Displaying the pyspark dataframe

In [22]:
airports_df.show()

+---+--------------------+------------------+-------------------+----+---+---+-------------------+
|faa|                name|               lat|                lon| alt| tz|dst|              tzone|
+---+--------------------+------------------+-------------------+----+---+---+-------------------+
|04G|   Lansdowne Airport|        41.1304722|        -80.6195833|1044| -5|  A|   America/New_York|
|06A|Moton Field Munic...|        32.4605722|        -85.6800278| 264| -6|  A|    America/Chicago|
|06C| Schaumburg Regional|        41.9893408|        -88.1012428| 801| -6|  A|    America/Chicago|
|06N|     Randall Airport|         41.431912|        -74.3915611| 523| -5|  A|   America/New_York|
|09J|Jekyll Island Air...|        31.0744722|        -81.4277778|  11| -5|  A|   America/New_York|
|0A9|Elizabethton Muni...|        36.3712222|        -82.1734167|1593| -5|  A|   America/New_York|
|0G6|Williams County A...|        41.4673056|        -84.5067778| 730| -5|  A|   America/New_York|
|0G7|Finge

Checking the schema of the dataframe

In [23]:
airports_df.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)
 |-- tzone: string (nullable = true)



Displaying the shape of the dataframe

In [26]:
print(f"The number of rows is: {airports_df.count()}\nThe number of columns is: {len(airports_df.columns)}")

The number of rows is: 1458
The number of columns is: 8


Listing all columns in our dataframe

In [27]:
airports_df.columns

['faa', 'name', 'lat', 'lon', 'alt', 'tz', 'dst', 'tzone']

Now let's get down to wrangling our data! 😆

In [34]:
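# Let's filter the rows based on a condition: airports at an altitude above 1000

airports_over_1000_df = airports_df.filter(airports_df["alt"] > 1000)
# filter() expects a boolean Column expression (where() is an alias); calling the DataFrame itself, e.g. airports_df(...), raises a TypeError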

In [33]:
airports_over_1000_df.show()

+---+--------------------+----------+------------+----+---+---+-------------------+
|faa|                name|       lat|         lon| alt| tz|dst|              tzone|
+---+--------------------+----------+------------+----+---+---+-------------------+
|04G|   Lansdowne Airport|41.1304722| -80.6195833|1044| -5|  A|   America/New_York|
|0A9|Elizabethton Muni...|36.3712222| -82.1734167|1593| -5|  A|   America/New_York|
|17G|Port Bucyrus-Craw...|40.7815556| -82.9748056|1003| -5|  A|   America/New_York|
|1A3|Martin Campbell F...|35.0158056| -84.3468333|1789| -5|  A|   America/New_York|
|1G3|  Kent State Airport|41.1513889| -81.4151111|1134| -5|  A|   America/New_York|
|1G4|Grand Canyon West...| 35.899904| -113.815674|4813| -7|  A|    America/Phoenix|
|29D|  Grove City Airport|41.1460278|   -80.16775|1371| -5|  A|   America/New_York|
|2G2|Jefferson County ...|40.3602179| -80.7008742|1196| -5|  A|   America/New_York|
|2G9|Somerset County A...|40.0388708| -79.0149951|2275| -5|  A|   America/Ne

In [36]:
# Let's select a couple of columns from the main dataframe
# In our example, we want just the names of the airports and their time zone offset from GMT
airports_df.select(['name','tz']).show()

+--------------------+---+
|                name| tz|
+--------------------+---+
|   Lansdowne Airport| -5|
|Moton Field Munic...| -6|
| Schaumburg Regional| -6|
|     Randall Airport| -5|
|Jekyll Island Air...| -5|
|Elizabethton Muni...| -5|
|Williams County A...| -5|
|Finger Lakes Regi...| -5|
|Shoestring Aviati...| -5|
|Jefferson County ...| -8|
|Harford County Ai...| -5|
|  Galt Field Airport| -6|
|Port Bucyrus-Craw...| -5|
|Jackson County Ai...| -5|
|Martin Campbell F...| -5|
| Mansfield Municipal| -5|
|Frazier Lake Airpark| -8|
|Clow Internationa...| -6|
|  Kent State Airport| -5|
|Grand Canyon West...| -7|
+--------------------+---+
only showing top 20 rows



In [38]:
# Renaming columns is key in data handling
# Let's start by renaming lat to its full form, latitude
airports_df = airports_df.withColumnRenamed("lat", "latitude")

In [40]:
# Checking the change
airports_df.columns

['faa', 'name', 'latitude', 'lon', 'alt', 'tz', 'dst', 'tzone']

In [43]:
# The call above renames only one column, so let's rename several by looping over a mapping of old names to new names
colname_replacements = {
    "latitude": "new_latitude",
    "lon": "longitude"
}

for old_colname, new_colname in colname_replacements.items():
    airports_df = airports_df.withColumnRenamed(old_colname, new_colname)

In [44]:
# Let's check again
airports_df.columns

['faa', 'name', 'new_latitude', 'longitude', 'alt', 'tz', 'dst', 'tzone']
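
Another way to rename several columns is to build the complete list of new names first and hand it to `toDF` in one go. The sketch below works on the original column list shown earlier and uses a hypothetical helper, `renamed_columns`, written for this note; the mapping is an example, not the one used in the loop above.

```python
def renamed_columns(columns, replacements):
    """Return the column names with the replacements applied, in the same order.

    Hypothetical helper: names missing from `replacements` are kept unchanged.
    """
    return [replacements.get(name, name) for name in columns]


# The original column list, as printed by airports_df.columns earlier
original_columns = ["faa", "name", "lat", "lon", "alt", "tz", "dst", "tzone"]

new_names = renamed_columns(original_columns, {"lat": "latitude", "lon": "longitude"})
print(new_names)

# On a dataframe that still has the original names, the list would be applied like this:
# airports_df = airports_df.toDF(*new_names)
```

`toDF` expects one name per existing column, in order, which is why the helper keeps the untouched names in place.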

In [50]:
# Handling duplicates is essential! Let's say we want to drop duplicates in the tzone column
airports_df.dropDuplicates(["tzone"]).show()
# This keeps one row per distinct tzone value and discards the rest; which row is kept is not guaranteed

+---+--------------------+------------------+-------------------+----+---+---+-------------------+
|faa|                name|      new_latitude|          longitude| alt| tz|dst|              tzone|
+---+--------------------+------------------+-------------------+----+---+---+-------------------+
|1C9|Frazier Lake Airpark|54.013333333333335|-124.76833333333333| 152| -8|  A|  America/Vancouver|
|06A|Moton Field Munic...|        32.4605722|        -85.6800278| 264| -6|  A|    America/Chicago|
|0S9|Jefferson County ...|        48.0538086|       -122.8106436| 108| -8|  A|America/Los_Angeles|
|EEN|Dillant Hopkins A...|         72.270833|          42.898333| 149| -5|  A|                 NA|
|04G|   Lansdowne Airport|        41.1304722|        -80.6195833|1044| -5|  A|   America/New_York|
|369|  Atmautluak Airport|         60.866667|        -162.273056|  18| -9|  A|  America/Anchorage|
|36U|Heber City Munici...|        40.4818056|       -111.4288056|5637| -7|  A|     America/Denver|
|BKH|  Bar

In [59]:
# Finding the number of distinct time zones will also be helpful
airports_df.select("tzone").distinct().count()
# select() narrows the dataframe to the tzone column so that distinct() compares only that column

10

In [63]:
# Now let's say we want to add a new column filled with a default value. In this example, we want a new column to indicate whether the airports are in operation
# This requires the lit function, which wraps a constant value as a column
from pyspark.sql.functions import lit

constant_value = "OPERATIONAL"
airports_df = airports_df.withColumn("Operating Status", lit(constant_value))

In [65]:
# Ta-da!!
airports_df.show()

+---+--------------------+------------------+-------------------+----+---+---+-------------------+----------------+
|faa|                name|      new_latitude|          longitude| alt| tz|dst|              tzone|Operating Status|
+---+--------------------+------------------+-------------------+----+---+---+-------------------+----------------+
|04G|   Lansdowne Airport|        41.1304722|        -80.6195833|1044| -5|  A|   America/New_York|     OPERATIONAL|
|06A|Moton Field Munic...|        32.4605722|        -85.6800278| 264| -6|  A|    America/Chicago|     OPERATIONAL|
|06C| Schaumburg Regional|        41.9893408|        -88.1012428| 801| -6|  A|    America/Chicago|     OPERATIONAL|
|06N|     Randall Airport|         41.431912|        -74.3915611| 523| -5|  A|   America/New_York|     OPERATIONAL|
|09J|Jekyll Island Air...|        31.0744722|        -81.4277778|  11| -5|  A|   America/New_York|     OPERATIONAL|
|0A9|Elizabethton Muni...|        36.3712222|        -82.1734167|1593| -

In [70]:
# Looking at our latitude and longitude columns, we have some pretty long decimals there
# Let's round the latitude to two decimal places (this may not be appropriate for real geopositional data, but it illustrates the concept)

# Using the functions module under an alias avoids shadowing Python's built-in round()
from pyspark.sql import functions as f

# The result is only displayed, not assigned, so airports_df itself is unchanged
airports_df.withColumn("rounded_latitude", f.round(airports_df["new_latitude"], 2)).show()

+---+--------------------+------------------+-------------------+----+---+---+-------------------+----------------+----------------+
|faa|                name|      new_latitude|          longitude| alt| tz|dst|              tzone|Operating Status|rounded_latitude|
+---+--------------------+------------------+-------------------+----+---+---+-------------------+----------------+----------------+
|04G|   Lansdowne Airport|        41.1304722|        -80.6195833|1044| -5|  A|   America/New_York|     OPERATIONAL|           41.13|
|06A|Moton Field Munic...|        32.4605722|        -85.6800278| 264| -6|  A|    America/Chicago|     OPERATIONAL|           32.46|
|06C| Schaumburg Regional|        41.9893408|        -88.1012428| 801| -6|  A|    America/Chicago|     OPERATIONAL|           41.99|
|06N|     Randall Airport|         41.431912|        -74.3915611| 523| -5|  A|   America/New_York|     OPERATIONAL|           41.43|
|09J|Jekyll Island Air...|        31.0744722|        -81.4277778|  11

In [75]:
# Last but not least, we sometimes need to typecast columns
# This can be done in the following manner

# Lets check the schema once more
airports_df.printSchema()

root
 |-- faa: string (nullable = true)
 |-- name: string (nullable = true)
 |-- new_latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- alt: integer (nullable = true)
 |-- tz: integer (nullable = true)
 |-- dst: string (nullable = true)
 |-- tzone: string (nullable = true)
 |-- Operating Status: string (nullable = false)



In [80]:
# Say we want to cast new_latitude to an integer
from pyspark.sql import functions as f

# Creates a new column with the type-cast values; the cast drops the decimals rather than rounding (41.9893408 becomes 41)
airports_df.withColumn("new_latitude_integer", f.col("new_latitude").cast("integer")).show()

+---+--------------------+------------------+-------------------+----+---+---+-------------------+----------------+--------------------+
|faa|                name|      new_latitude|          longitude| alt| tz|dst|              tzone|Operating Status|new_latitude_integer|
+---+--------------------+------------------+-------------------+----+---+---+-------------------+----------------+--------------------+
|04G|   Lansdowne Airport|        41.1304722|        -80.6195833|1044| -5|  A|   America/New_York|     OPERATIONAL|                  41|
|06A|Moton Field Munic...|        32.4605722|        -85.6800278| 264| -6|  A|    America/Chicago|     OPERATIONAL|                  32|
|06C| Schaumburg Regional|        41.9893408|        -88.1012428| 801| -6|  A|    America/Chicago|     OPERATIONAL|                  41|
|06N|     Randall Airport|         41.431912|        -74.3915611| 523| -5|  A|   America/New_York|     OPERATIONAL|                  41|
|09J|Jekyll Island Air...|        31.0744

In [81]:
# If we'd like a temporary view from a dataframe, we can create it like so

# Creating the temporary view
airports_df.createOrReplaceTempView("airports_table")

# Then we can query the temporary view using spark sql
queried_df = spark.sql("SELECT * FROM airports_table")

And thus we come to the end of the journey.

In [84]:
# This is how we'll end our Spark session
spark.stop()