# Apache Spark 3.2 (PySpark) Tutorial
- Author: Akira Takihara Wang (https://github.com/akiratwang)

Tutorial Operating System(s):
- Windows 10 and WSL2
- Linux

# Starting a Spark Session
To begin working with Spark, we need to create a `SparkSession`, the entry point to Spark's DataFrame and SQL APIs.
- `appName`: Name of the Spark app
- `config`: Configurations to initialise with. We will initialise this example with `'spark.sql.repl.eagerEval.enabled'` which enables a nicer HTML display (similar to `pandas`) for the DataFrame outputs.
    - This is the equivalent of running `spark.conf.set('spark.sql.repl.eagerEval.enabled', True)`
    
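A general note is to understand that Spark DataFrames are **immutable**: operations such as `select` or `filter` never modify a DataFrame in place; they return a new one. We'll discuss this further down the track, but for now, just remember it!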
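In practice, this means a method such as `sdf.filter(...)` or `sdf.withColumn(...)` hands back a new DataFrame and leaves the original untouched, so you keep the result by assigning it (e.g. `sdf = sdf.filter(...)`). The plain-Python toy below mimics that behaviour with a frozen dataclass. `MiniFrame` is a hypothetical stand-in for illustration only, not a PySpark class.

```python
from dataclasses import FrozenInstanceError, dataclass, replace
from typing import Callable, Tuple


@dataclass(frozen=True)
class MiniFrame:
    """A toy immutable table: column names plus a tuple of row tuples."""

    columns: Tuple[str, ...]
    rows: Tuple[tuple, ...]

    def filter(self, predicate: Callable[[tuple], bool]) -> "MiniFrame":
        # Like a Spark transformation: build and return a new object, never modify self.
        return replace(self, rows=tuple(row for row in self.rows if predicate(row)))


trips = MiniFrame(("passenger_count",), ((5,), (2,), (1,)))
five_passengers = trips.filter(lambda row: row[0] == 5)

print(len(trips.rows), len(five_passengers.rows))  # 3 1

try:
    trips.rows = ()  # direct mutation is rejected by the frozen dataclass
except FrozenInstanceError:
    print("trips cannot be modified in place")
```

The original `trips` still holds all three rows; only the returned object is filtered.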

In [1]:
```python
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PySpark Fundamentals") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .getOrCreate()
```

```text
21/11/24 13:52:13 WARN Utils: Your hostname, NeonEx resolves to a loopback address: 127.0.1.1; using 10.1.1.247 instead (on interface eth0)
21/11/24 13:52:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/11/24 13:52:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
```


# Loading Data
Now that we have a `SparkSession`, we can create `DataFrames` from an existing data source. Here's an example with reading in a sample dataset in CSV format:
- `header=True` tells Spark to use the first row as column names, since this CSV comes with a header row.
- `sdf.limit(5)` shows the first 5 rows using the HTML display. The alternative, `sdf.show(5)`, prints the rows as a plain-text table instead.

In [2]:
```python
sdf = spark.read.csv('../data/sample.csv', header=True)
sdf.limit(5)
```

| VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RatecodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | 1/12/15 0:00 | 1/12/15 0:05 | 5 | 0.96 | -73.97994232 | 40.76538086 | 1 | N | -73.96630859 | 40.76308823 | 1 | 5.5 | 0.5 | 0.5 | 1.0 | 0.0 | 0.3 | 7.8 |
| 2 | 1/12/15 0:00 | 1/12/15 0:00 | 2 | 2.69 | -73.97233582 | 40.76237869 | 1 | N | -73.99362946 | 40.74599838 | 1 | 21.5 | 0.0 | 0.5 | 3.34 | 0.0 | 0.3 | 25.64 |
| 2 | 1/12/15 0:00 | 1/12/15 0:00 | 1 | 2.62 | -73.96884918 | 40.76453018 | 1 | N | -73.97454834 | 40.79164124 | 1 | 17.0 | 0.0 | 0.5 | 3.56 | 0.0 | 0.3 | 21.36 |
| 1 | 1/12/15 0:00 | 1/12/15 0:05 | 1 | 1.2 | -73.99393463 | 40.74168396 | 1 | N | -73.99766541 | 40.74746704 | 1 | 6.5 | 0.5 | 0.5 | 0.2 | 0.0 | 0.3 | 8.0 |
| 1 | 1/12/15 0:00 | 1/12/15 0:09 | 2 | 3.0 | -73.98892212 | 40.72698975 | 1 | N | -73.97559357 | 40.6968689 | 2 | 11.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 12.3 |


# Untyped Dataset Operations
These are also known as **DataFrame Operations** and are the rough equivalent of `pandas.DataFrame` methods.

Documentation:
- PySpark DataFrame APIs: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#dataframe-apis
- SparkSQL Functions: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions
- Available dtypes for schemas: https://spark.apache.org/docs/latest/sql-ref-datatypes.html

## Schemas
To view the _schema_ in Spark, we can use `.printSchema()` to display it in a tree-like output. It's a more comprehensive equivalent to `df.dtypes` in `pandas`.

In [3]:
```python
sdf.printSchema()
```

```text
root
 |-- VendorID: string (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
```



- As you can see above, every column has been read as a `string`, because the CSV reader does not infer types unless asked to. As such, it is in your best interest to define a standard **schema** for your datasets.
- Defining a schema here is very similar to defining the columns of a table in SQL.

We'll be using a DDL string (Data Definition Language) to create a schema.

Syntax:
```python
schema = """
`COLUMN_NAME_1` DTYPE_1,
`COLUMN_NAME_2` DTYPE_2,
...
`COLUMN_NAME_N` DTYPE_N
"""
```
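If you keep your column types in a Python mapping, a small helper can assemble a DDL string in the format above. This is a plain-Python sketch; `build_ddl_schema` is a hypothetical helper, not part of PySpark, and the column subset is only for illustration.

```python
def build_ddl_schema(columns):
    """Build a Spark DDL schema string from an ordered {column: type} mapping."""
    if not columns:
        raise ValueError("at least one column is required")
    # Backticks keep names with spaces or special characters valid;
    # one `name` TYPE pair per line, separated by commas.
    lines = [f"`{name}` {dtype.upper()}" for name, dtype in columns.items()]
    return "\n" + ",\n".join(lines) + "\n"


example_schema = build_ddl_schema({
    "VendorID": "int",
    "tpep_pickup_datetime": "string",
    "passenger_count": "int",
    "trip_distance": "double",
})
print(example_schema)
```

The resulting string can be passed to `spark.read.csv(..., schema=example_schema)` in the same way as a hand-written one.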

Note, you need to be careful with `datetimes` as the formats can be inconsistent. You'll need some background context to determine if you can/should use `TIMESTAMP` or if it requires a custom parsing function to fix it.
- `DateType` assumes `yyyy-MM-dd`
- `TimestampType` assumes `yyyy-MM-dd HH:mm:ss.SSSS`
- If it cannot parse a field, it will return `null` (in the reader's default `PERMISSIVE` mode)

If we look at our DataFrame, the datetime fields have the form `1/12/15 0:00`, which follows neither format. We'll cover data type conversions in the next notebooks; for now, we read these columns in as `STRING`.
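The value appears to be day-first: `1/12/15 0:00` reads as 1 December 2015, which matches the `2015-12-01` timestamps displayed later in this notebook. The plain-Python sketch below uses `datetime.strptime` to show that an explicit pattern is needed, since the ISO-style parser rejects this value. In Spark, the analogous step would be something like `to_timestamp('tpep_pickup_datetime', 'd/M/yy H:mm')` from `pyspark.sql.functions`; treat that pattern as an assumption to verify against your own data.

```python
from datetime import datetime

raw = "1/12/15 0:00"

# The ISO-style parser expects something like "2015-12-01 00:00",
# so it rejects this value (Spark's CSV reader would give null instead).
try:
    datetime.fromisoformat(raw)
    iso_ok = True
except ValueError:
    iso_ok = False

# Explicit day-first pattern: day/month/two-digit-year hour:minute.
# strptime's %d, %m and %H also accept the single-digit values in this file.
parsed = datetime.strptime(raw, "%d/%m/%y %H:%M")
print(iso_ok, parsed)  # False 2015-12-01 00:00:00
```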

In [4]:
```python
schema = """
`VendorID` INT,
`tpep_pickup_datetime` STRING,
`tpep_dropoff_datetime` STRING,
`passenger_count` INT,
`trip_distance` DOUBLE,
`pickup_longitude` DOUBLE,
`pickup_latitude` DOUBLE,
`RateCodeID` INT,
`store_and_fwd_flag` STRING,
`dropoff_longitude` DOUBLE,
`dropoff_latitude` DOUBLE,
`payment_type` INT,
`fare_amount` DOUBLE,
`extra` DOUBLE,
`mta_tax` DOUBLE,
`tip_amount` DOUBLE,
`tolls_amount` DOUBLE,
`improvement_surcharge` DOUBLE,
`total_amount` DOUBLE
"""
```

Now that we've created the schema, let's re-read the CSV with the pre-defined schema.

If you want Spark to infer the schema automatically instead, load your dataset with the `inferSchema` argument (this costs an extra pass over the data):
- `spark.read.csv(filepath, inferSchema=True, header=True)`
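Conceptually, schema inference has to look at the values before it can choose a type, which is why `inferSchema=True` needs that extra pass. The plain-Python toy below mimics the idea on a few raw string columns from the sample rows. It is a simplified illustration built on a hypothetical `infer_type` helper, not Spark's actual inference rules (Spark, for example, also considers other types such as timestamps).

```python
def _parses(value, cast):
    """True if `cast(value)` succeeds."""
    try:
        cast(value)
        return True
    except ValueError:
        return False


def infer_type(values):
    """Return the narrowest of INT, DOUBLE or STRING that fits every value."""
    non_empty = [v for v in values if v != ""]  # empty strings act like nulls
    if all(_parses(v, int) for v in non_empty):
        return "INT"
    if all(_parses(v, float) for v in non_empty):
        return "DOUBLE"
    return "STRING"


# Raw (string) values taken from the five sample rows shown earlier.
raw_columns = {
    "passenger_count": ["5", "2", "1", "1", "2"],
    "trip_distance": ["0.96", "2.69", "2.62", "1.2", "3.0"],
    "store_and_fwd_flag": ["N", "N", "N", "N", "N"],
    "tpep_pickup_datetime": ["1/12/15 0:00"] * 5,
}

# Every value of every column is inspected: this is the "extra pass".
inferred = {name: infer_type(vals) for name, vals in raw_columns.items()}
print(inferred)
```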

In [5]:
```python
# Re-read the CSV, this time applying the DDL schema defined above
sdf = spark.read.csv('../data/sample.csv', schema=schema, header=True)
```

In [6]:
```python
sdf.limit(5)
```

| VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | 1/12/15 0:00 | 1/12/15 0:05 | 5 | 0.96 | -73.97994232 | 40.76538086 | 1 | N | -73.96630859 | 40.76308823 | 1 | 5.5 | 0.5 | 0.5 | 1.0 | 0.0 | 0.3 | 7.8 |
| 2 | 1/12/15 0:00 | 1/12/15 0:00 | 2 | 2.69 | -73.97233582 | 40.76237869 | 1 | N | -73.99362946 | 40.74599838 | 1 | 21.5 | 0.0 | 0.5 | 3.34 | 0.0 | 0.3 | 25.64 |
| 2 | 1/12/15 0:00 | 1/12/15 0:00 | 1 | 2.62 | -73.96884918 | 40.76453018 | 1 | N | -73.97454834 | 40.79164124 | 1 | 17.0 | 0.0 | 0.5 | 3.56 | 0.0 | 0.3 | 21.36 |
| 1 | 1/12/15 0:00 | 1/12/15 0:05 | 1 | 1.2 | -73.99393463 | 40.74168396 | 1 | N | -73.99766541 | 40.74746704 | 1 | 6.5 | 0.5 | 0.5 | 0.2 | 0.0 | 0.3 | 8.0 |
| 1 | 1/12/15 0:00 | 1/12/15 0:09 | 2 | 3.0 | -73.98892212 | 40.72698975 | 1 | N | -73.97559357 | 40.6968689 | 2 | 11.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 12.3 |


## Basic Operations
### Selection
To show a specific column, we will use `sdf.select(col).limit(5)`. 
- The equivalent in `pandas` is `df[col].head()`.

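To _access_ a specific column (e.g. to build a condition), use the `sdf[col]` syntax (equivalent to `df[col]`). Avoid `sdf.col` or `df.col`: attribute access is **not** robust, as it fails for column names that contain spaces and breaks when a column name clashes with an existing (or future) DataFrame attribute or method.

For multiple columns, pass a list of column names (or pass the names as separate arguments).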
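To see why attribute access is fragile: `sdf.name` can only mean the column when `name` is a valid Python identifier that doesn't collide with something already on the DataFrame. The plain-Python check below makes that rule explicit. `attribute_access_is_safe` is a hypothetical helper, and the reserved set lists just a few real `DataFrame` attributes (`count`, `columns`, `schema`, `filter`, `select`), not the full list.

```python
import keyword

# A handful of names that already exist on a PySpark DataFrame
# (illustrative only; the real DataFrame has many more attributes).
DATAFRAME_ATTRIBUTES = {"count", "columns", "schema", "filter", "select"}


def attribute_access_is_safe(column_name):
    """Return True if `sdf.<column_name>` would unambiguously mean the column."""
    return (
        column_name.isidentifier()              # no spaces, dashes, leading digits
        and not keyword.iskeyword(column_name)  # e.g. `sdf.class` is a syntax error
        and column_name not in DATAFRAME_ATTRIBUTES
    )


for name in ["passenger_count", "trip distance", "count", "class"]:
    print(f"{name!r}: {attribute_access_is_safe(name)}")
```

Bracket access (`sdf['trip distance']`, `sdf['count']`) has none of these restrictions.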

In [10]:
```python
sdf.select('passenger_count').limit(5)
```

| passenger_count |
|---|
| 5 |
| 2 |
| 1 |
| 1 |
| 2 |


In [11]:
```python
sdf.select(['passenger_count', 'trip_distance']).limit(5)
```

| passenger_count | trip_distance |
|---|---|
| 5 | 0.96 |
| 2 | 2.69 |
| 1 | 2.62 |
| 1 | 1.2 |
| 2 | 3.0 |


### Filtering
For filtering data, we use `sdf.filter(condition).limit(5)`. 
- The equivalent in `pandas` is `df.loc[condition].head()`
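When combining conditions, PySpark uses `&` (and), `|` (or) and `~` (not) rather than Python's `and`/`or`/`not`, and each comparison needs its own parentheses, e.g. `sdf.filter((sdf['passenger_count'] == 5) & (sdf['trip_distance'] > 0))`. The parentheses matter because in Python `&` binds more tightly than `==` and `>`; without them you get a chained comparison, which with Columns typically fails with an error about converting a Column to a bool. The plain-Python sketch below uses the standard `ast` module to show how each version is grouped (the names `count` and `distance` are placeholders).

```python
import ast

# Without parentheses, Python groups this as a chained comparison:
#     count == (5 & distance) > 0
loose = ast.parse("count == 5 & distance > 0", mode="eval").body

# With parentheses, both comparisons are built first and then combined with `&`.
strict = ast.parse("(count == 5) & (distance > 0)", mode="eval").body

print(type(loose).__name__, type(loose.comparators[0]).__name__)  # Compare BinOp
print(type(strict).__name__, type(strict.op).__name__)            # BinOp BitAnd
```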

In [12]:
```python
sdf.filter(sdf['passenger_count'] == 5).limit(5)
```

| VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | 2015-12-01 00:00:00 | 2015-12-01 00:00:00 | 5 | 0.96 | -73.97994232 | 40.76538086 | 1 | N | -73.96630859 | 40.76308823 | 1 | 5.5 | 0.5 | 0.5 | 1.0 | 0.0 | 0.3 | 7.8 |
| 2 | 2015-12-01 00:00:00 | 2015-12-01 00:00:00 | 5 | 0.83 | -74.00122833 | 40.75160599 | 1 | N | -73.99243164 | 40.75825882 | 2 | 4.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 5.8 |
| 2 | 2015-12-01 00:00:00 | 2015-12-01 00:00:00 | 5 | 17.7 | -73.7960434 | 40.64468002 | 2 | N | -74.00257874 | 40.73490906 | 1 | 52.0 | 0.0 | 0.5 | 11.67 | 5.54 | 0.3 | 70.01 |
| 2 | 2015-12-01 00:00:00 | 2015-12-01 00:00:00 | 5 | 2.57 | -74.01177216 | 40.70434952 | 1 | N | -73.98803711 | 40.73279953 | 1 | 11.0 | 0.5 | 0.5 | 1.23 | 0.0 | 0.3 | 13.53 |
| 2 | 2015-12-01 00:00:00 | 2015-12-01 00:00:00 | 5 | 4.1 | -73.97471619 | 40.75342178 | 1 | N | -74.00688171 | 40.70512009 | 1 | 14.5 | 0.5 | 0.5 | 3.16 | 0.0 | 0.3 | 18.96 |


### GroupBy (Aggregation)
To group the data and aggregate it (e.g. take the mean), we can use `sdf.groupby(col).mean(aggregated columns).limit(5)`
- The equivalent in `pandas` is `df.groupby(col)[aggregated columns].mean().head()`

In [None]:
```python
sdf.groupby('passenger_count').mean('trip_distance').limit(5)
```
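To make the semantics concrete, the plain-Python sketch below computes a per-group mean over the `(passenger_count, trip_distance)` pairs from the five sample rows shown earlier. It only covers those five rows, so it is not the full-dataset result. `group_mean` is a hypothetical helper; in Spark, the corresponding output column would be named `avg(trip_distance)`.

```python
from collections import defaultdict

# (passenger_count, trip_distance) from the five sample rows shown earlier.
sample = [(5, 0.96), (2, 2.69), (1, 2.62), (1, 1.2), (2, 3.0)]


def group_mean(pairs):
    """Average the second element of each pair per distinct first element."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
        counts[key] += 1
    return {key: totals[key] / counts[key] for key in totals}


avg_trip_distance = group_mean(sample)
print(avg_trip_distance)
```

For example, the two single-passenger trips average (2.62 + 1.2) / 2 = 1.91 miles.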

We can also apply several different aggregations at once with `.agg()` and rename their outputs with `.alias()`!

In [None]:
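```python
from pyspark.sql.functions import mean, percentile_approx

# `median` only exists in pyspark.sql.functions from Spark 3.4 onwards, so on
# Spark 3.2 we use percentile_approx(col, 0.5) as an (approximate) median.
results = sdf.groupBy("passenger_count") \
    .agg(mean("total_amount").alias("Average Trip Amount USD$"),
         percentile_approx("trip_distance", 0.5).alias("Median Trip Distance in Miles")
    )

results
```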
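Note that `percentile_approx(col, 0.5)` is not quite the textbook median. As we understand Spark's documented behaviour, it returns an actual value from the column (roughly, the smallest value at or below which at least half of the values fall) and may be approximate on large data, whereas a textbook median averages the two middle values when the count is even. The plain-Python sketch below contrasts an exact nearest-rank percentile (the hypothetical helper `nearest_rank`, standing in for that definition) with `statistics.median`, using the two `passenger_count == 1` trips from the sample rows.

```python
import math
import statistics


def nearest_rank(values, percentage):
    """Smallest value v such that at least `percentage` of the values are <= v."""
    ordered = sorted(values)
    rank = max(1, math.ceil(percentage * len(ordered)))  # 1-based rank
    return ordered[rank - 1]


# trip_distance for passenger_count == 1 in the five sample rows shown earlier.
distances = [2.62, 1.2]

print(nearest_rank(distances, 0.5))  # 1.2 (an actual value from the data)
print(statistics.median(distances))  # average of the two middle values
```

If precision matters, `percentile_approx` also takes an `accuracy` argument (default 10000); larger values give a more accurate result at the cost of memory.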

# Spark SQL
The `SparkSession` also allows for `SQL` queries to be run with the query results returned as a DataFrame. To do so, we will need to create a temporary SQL view.

Temporary SQL Views:
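- The namespace is session-scoped (not global), meaning the view disappears when the `SparkSession` that created it terminates.
- If you want a view that is shared among all sessions and stays alive until the Spark application terminates, create a global temporary view with `createGlobalTempView`. It lives in the system database `global_temp`, so you query it as `global_temp.<view_name>`.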
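Spark isn't required to see what "session-scoped" means. As an analogy only (this is not Spark), Python's built-in `sqlite3` module offers `CREATE TEMP VIEW`, which is visible only to the connection that created it, much like a Spark temporary view is visible only to its `SparkSession`. The table name `trips`, the view name `taxi` and the two "sessions" below are illustrative.

```python
import os
import sqlite3
import tempfile

with tempfile.TemporaryDirectory() as tmp_dir:
    db_path = os.path.join(tmp_dir, "taxi.db")

    # "Session" A: create a shared table, then a temporary view on top of it.
    session_a = sqlite3.connect(db_path)
    session_a.execute(
        "CREATE TABLE trips (passenger_count INTEGER, trip_distance REAL)"
    )
    session_a.executemany(
        "INSERT INTO trips VALUES (?, ?)",
        [(5, 0.96), (2, 2.69), (1, 2.62), (1, 1.2), (2, 3.0)],
    )
    session_a.commit()
    session_a.execute(
        "CREATE TEMP VIEW taxi AS SELECT * FROM trips WHERE passenger_count = 5"
    )
    rows_a = session_a.execute("SELECT * FROM taxi").fetchall()

    # "Session" B: same database file, but A's temporary view is not visible.
    session_b = sqlite3.connect(db_path)
    table_rows_b = session_b.execute("SELECT COUNT(*) FROM trips").fetchone()[0]
    try:
        session_b.execute("SELECT * FROM taxi")
        view_visible_in_b = True
    except sqlite3.OperationalError:  # "no such table: taxi"
        view_visible_in_b = False

    session_a.close()
    session_b.close()

print(rows_a, table_rows_b, view_visible_in_b)  # [(5, 0.96)] 5 False
```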

In [14]:
# create a temporary SQL view for the DataFrame
sdf.createOrReplaceTempView('taxi')

sql_query = spark.sql("""
SELECT *
FROM taxi
WHERE passenger_count == 5
    AND trip_distance > 0
""")

sql_query.limit(5)

| VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | 2015-12-01 00:00:00 | 2015-12-01 00:00:00 | 5 | 0.96 | -73.97994232 | 40.76538086 | 1 | N | -73.96630859 | 40.76308823 | 1 | 5.5 | 0.5 | 0.5 | 1.0 | 0.0 | 0.3 | 7.8 |
| 2 | 2015-12-01 00:00:00 | 2015-12-01 00:00:00 | 5 | 0.83 | -74.00122833 | 40.75160599 | 1 | N | -73.99243164 | 40.75825882 | 2 | 4.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 5.8 |
| 2 | 2015-12-01 00:00:00 | 2015-12-01 00:00:00 | 5 | 17.7 | -73.7960434 | 40.64468002 | 2 | N | -74.00257874 | 40.73490906 | 1 | 52.0 | 0.0 | 0.5 | 11.67 | 5.54 | 0.3 | 70.01 |
| 2 | 2015-12-01 00:00:00 | 2015-12-01 00:00:00 | 5 | 2.57 | -74.01177216 | 40.70434952 | 1 | N | -73.98803711 | 40.73279953 | 1 | 11.0 | 0.5 | 0.5 | 1.23 | 0.0 | 0.3 | 13.53 |
| 2 | 2015-12-01 00:00:00 | 2015-12-01 00:00:00 | 5 | 4.1 | -73.97471619 | 40.75342178 | 1 | N | -74.00688171 | 40.70512009 | 1 | 14.5 | 0.5 | 0.5 | 3.16 | 0.0 | 0.3 | 18.96 |


# Saving Data
By default, Spark saves data in the `parquet` format, an open-source columnar file format. It's designed to be efficient and, unlike row-based formats such as `CSV`, it stores data column by column.

From https://databricks.com/glossary/what-is-parquet:
- Parquet is optimized to work with complex data in bulk and features different ways for efficient data compression and encoding types.
- This approach is best especially for those queries that need to read certain columns from a large table. 
- Parquet can only read the needed columns therefore greatly minimizing the IO.

In other words, compared with CSV it is typically much smaller on disk, much faster to query, and much _cheaper_ (for cloud-based solutions).
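The row-versus-column difference can be sketched in plain Python: a row-based layout (like CSV) keeps whole records together, while a columnar layout (like Parquet) keeps each column's values together, so a query that needs one column only has to touch that column. This is a toy model of the layout idea using three of the sample rows, not of Parquet's actual on-disk format (which adds row groups, encodings and compression); `to_columnar` is a hypothetical helper.

```python
# Three sample records, stored row by row as a CSV would store them.
rows = [
    {"passenger_count": 5, "trip_distance": 0.96, "total_amount": 7.8},
    {"passenger_count": 2, "trip_distance": 2.69, "total_amount": 25.64},
    {"passenger_count": 1, "trip_distance": 2.62, "total_amount": 21.36},
]


def to_columnar(records):
    """Pivot a list of row dicts into one list of values per column."""
    columns = {name: [] for name in records[0]}
    for record in records:
        for name, value in record.items():
            columns[name].append(value)
    return columns


columns = to_columnar(rows)

# Row layout: reading one field still means visiting every full record.
fares_from_rows = [record["total_amount"] for record in rows]

# Columnar layout: the values for one column are already stored together.
fares_from_columns = columns["total_amount"]

# Stored values scanned to answer "give me total_amount" in each layout.
scanned_row_layout = sum(len(record) for record in rows)  # 9
scanned_column_layout = len(columns["total_amount"])      # 3

print(fares_from_rows == fares_from_columns, scanned_row_layout, scanned_column_layout)
```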

As an example, let's save the output of the SQL query we did above.

In [15]:
```python
# Saves as parquet by default; fails if the path already exists (default save mode)
sql_query.write.save('../data/subset.parquet')
```


## Reading in Parquet Files
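Just as we read in a `CSV` above, we can read in the `parquet` output with `spark.read.parquet`. Parquet files store their schema alongside the data, so the column types come back without our having to specify a schema.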

In [17]:
```python
sdf2 = spark.read.parquet('../data/subset.parquet')
sdf2.limit(5)
```

| VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | pickup_longitude | pickup_latitude | RateCodeID | store_and_fwd_flag | dropoff_longitude | dropoff_latitude | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2 | 2015-12-04 20:24:00 | 2015-12-04 20:24:00 | 5 | 2.23 | -73.97263336 | 40.75886536 | 1 | N | -73.99765015 | 40.7432785 | 1 | 14.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 15.8 |
| 2 | 2015-12-04 20:24:00 | 2015-12-04 20:24:00 | 5 | 13.75 | -73.79033661 | 40.64397049 | 1 | N | -73.9159317 | 40.62261963 | 2 | 39.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 40.3 |
| 2 | 2015-12-04 20:24:00 | 2015-12-04 20:24:00 | 5 | 2.24 | -73.98329926 | 40.74370956 | 1 | N | -74.00302887 | 40.73128128 | 1 | 14.5 | 0.5 | 0.5 | 3.16 | 0.0 | 0.3 | 18.96 |
| 2 | 2015-12-04 20:24:00 | 2015-12-04 20:24:00 | 5 | 2.36 | -73.96770477 | 40.76294708 | 1 | N | -73.94860077 | 40.75246429 | 2 | 10.0 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 11.3 |
| 2 | 2015-12-04 20:25:00 | 2015-12-04 20:25:00 | 5 | 1.75 | -73.98252869 | 40.73942947 | 1 | N | -73.97209167 | 40.76068878 | 1 | 11.5 | 0.5 | 0.5 | 2.56 | 0.0 | 0.3 | 15.36 |


In [18]:
```python
sdf2.printSchema()
```

```text
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- RateCodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
```

