# Introduction to PySpark Basics

## About This Notebook

This notebook was designed to provide a hands-on introduction to **PySpark**, the Python API for **Apache Spark**, a powerful distributed data processing framework. The primary goal is to demonstrate how PySpark works, including key concepts like lazy transformations, parallel processing, and SparkSQL for querying data.

---

### What You Will Learn

1. **Spark Basics**: How to create a Spark session and load data into Spark DataFrames.
2. **Lazy Transformations**: Understand how Spark optimizes execution by delaying computations until an action is triggered.
3. **Actions vs. Transformations**: Learn the difference between these two core operations.
4. **Parallelism in Spark**: See how Spark distributes data and processing across multiple nodes for better performance.
5. **SparkSQL**: Use SQL queries to analyze data in Spark DataFrames and leverage the full power of distributed processing.

---

### Cluster Setup

For demonstration purposes, this notebook connects to a Spark standalone cluster running locally (master URL `spark://spark-master:7077`). The cluster is configured with multiple worker nodes to showcase distributed data processing.


---
### **Start by creating a PySpark application that will be used across the entire Notebook**
After running this command, the Spark Application will be up and running. You can check it by opening the Spark Application UI at [http://localhost:4040](http://localhost:4040).

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Create Spark session with more detailed configuration
spark = SparkSession.builder \
    .appName("SparkTest") \
    .master("spark://spark-master:7077") \
    .config("spark.driver.host", "jupyter") \
    .config("spark.driver.memory", "1g") \
    .config("spark.executor.memory", "1g") \
    .config("spark.driver.bindAddress", "0.0.0.0") \
    .config("spark.driver.port", "29417") \
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/11 20:04:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


---
### **Now let's load the dataset files into a PySpark Dataframe and check its schema as well as the first 10 rows**

In [2]:
# Read the dataset into a PySpark DataFrame
# Because the path ends in *.parquet, PySpark reads every file with the .parquet extension in that folder
df = spark.read.parquet('file:///opt/spark/data/*.parquet')

# Print the DataFrame schema (Parquet files store their schema, so it is read from the file metadata)
df.printSchema()

# Show the first 10 rows
df.show(10)

                                                                                

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_ride_f



+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

                                                                                

---
### **Transformations and Actions**
In Spark there are two fundamental types of operations:
1. **Transformations**: Operations on RDDs, DataFrames or Datasets that produce a new distributed dataset from an existing one. They are lazy: calling one only adds a step to the execution plan and does not run anything. The transformations applied to a DataFrame are executed only once an action is performed on it. Examples are `map`, `filter`, `groupBy`, `join`, `repartition`.
2. **Actions**: Operations that trigger the execution of the pending transformations and either return a value (rather than a new dataset) or write the result to storage. Examples are `collect`, `count`, `show`, `first`, `write`.

In the next block, you will see how transformations and actions interact with each other.

In [3]:
# This is a transformation example. It selects just 2 columns from the DataFrame (PULocationID and trip_time)
# and keeps only the rows where trip_time > 30.
# Running this cell triggers no computation: you will not see any new job
# in the Spark Application UI after executing it.

transformed_df = df.select("PULocationID", "trip_time").filter(df.trip_time > 30)

The transformations are applied to the data only after an action runs on `transformed_df`. Run the next command and you will see the computation being executed.

In [4]:
transformed_df.show()

+------------+---------+
|PULocationID|trip_time|
+------------+---------+
|          48|     1709|
|         246|     2069|
|           9|     1047|
|         129|      431|
|         129|      724|
|         130|     1249|
|          38|     1238|
|          90|      473|
|         125|      664|
|          68|     1481|
|          79|     2078|
|         143|     2336|
|          49|      771|
|         181|      391|
|          25|     2263|
|         216|     1818|
|         223|      580|
|           7|      798|
|         223|     1131|
|          79|     1792|
+------------+---------+
only showing top 20 rows



---
### **Aggregations**
The next command performs an aggregation on the DataFrame:
1. **Group**: Group the data by `PULocationID`
2. **Average**: Calculate the average of `trip_time` and give it the alias `avg_trip_duration`
3. **Order**: The last transformation orders the result set by `avg_trip_duration` descending (`ascending=False`)

In [5]:
from pyspark.sql.functions import avg

# Lazy transformation
aggregated_df = df.groupBy("PULocationID") \
    .agg(avg("trip_time").alias("avg_trip_duration")) \
    .orderBy("avg_trip_duration", ascending=False)

# Action that triggers the transformations
aggregated_df.show(30)



+------------+------------------+
|PULocationID| avg_trip_duration|
+------------+------------------+
|         132| 2353.454890823891|
|         138|1747.5942813485483|
|         199| 1612.388888888889|
|         202|1460.9190927555856|
|          46|1324.7144465290808|
|           2|1321.4761904761904|
|         261|1314.8798892350294|
|         194|1310.3412698412699|
|          88|1308.5389453422265|
|         100|1304.4686264182608|
|         230| 1278.785676513117|
|          12|1271.1152312599681|
|          87|         1265.6182|
|         195|1260.3514171617805|
|         117|1239.9431008248578|
|         186|1237.4374664978286|
|          27|     1232.54296875|
|         161|1221.2275508871828|
|         163|1218.4810828420916|
|          48|1210.5463232368127|
|          68|1207.6508681022256|
|         162|1202.4361121152654|
|         154| 1194.007332722273|
|         140| 1188.507840248026|
|         209|1183.5469328755985|
|          33| 1183.322699088146|
|         201|

                                                                                

---
### **Parallelism in Spark**
In Spark, we can parallelize execution so that each cluster node takes care of a portion of the computation. The unit of parallelism is the partition: each partition is processed by one task.
We can change the number of partitions with the `repartition` method of the DataFrame. The commands below print the number of partitions before and after a `repartition`. Because `repartition` is a transformation, it is lazy and will not be executed before an action is called.

In [6]:
print("Default partitions:", df.rdd.getNumPartitions())

df_repartitioned = df.repartition(8)
print("Partitions after repartitioning:", df_repartitioned.rdd.getNumPartitions())


Default partitions: 6




Partitions after repartitioning: 8


---
## **Using SparkSQL**
SparkSQL is the Spark component that lets us query data with SQL. To use it, we register our DataFrame as a temporary view (with `createOrReplaceTempView`) and then run SQL queries against that view.

In [10]:
df_repartitioned.createOrReplaceTempView("trips")

In [11]:
df_filtered = spark.sql("""
    SELECT PULocationID, AVG(trip_time) AS avg_trip_time
    FROM trips
    WHERE trip_time > 60
    GROUP BY PULocationID
""")

df_filtered.show()

                                                                                

+------------+------------------+
|PULocationID|     avg_trip_time|
+------------+------------------+
|          29| 945.4347455133862|
|          26| 968.7638147239832|
|          65|1178.2978641720947|
|         191| 962.0914268218004|
|         222| 994.9696597525473|
|         243|1039.3799716796468|
|          19| 969.8039161988224|
|          54|1119.7924583171773|
|         113|1100.4041809808002|
|         167| 993.6639540449976|
|         112|1050.7473704364797|
|         155|1070.4930071339325|
|         241| 955.1747939662611|
|         237|1097.6257803802412|
|          22| 990.1286824214957|
|         198|1073.9567761319909|
|         196|1001.4254911461545|
|         130| 964.8507134546948|
|           7| 987.2249549891809|
|          77|1008.1851713859911|
+------------+------------------+
only showing top 20 rows



---
### **Write Data back to disk**
Spark also allows us to write data back to disk. When we do this, each executor writes its portion of the data to disk, so the output folder contains several files. The cell below can first write the data with the default partitioning (that line is commented out; uncomment it to compare) and check how many files are written. It then repartitions the data, writes it again, and lets us check how many files we get (we should have one per partition we defined).

In [13]:
# Read again the data into the Dataframe in order to avoid any previously defined transformations
df = spark.read.parquet('file:///opt/spark/data/*.parquet')

# This command writes the data using the default partitions (commented out; uncomment it to compare the number of files)
#df.write.mode("overwrite").parquet("file:///opt/spark/data/test/default_partitions")

# This command will repartition the data and write it to disk
# After this, we should see under the folder test/custom_partitions 8 parquet files (ignore the files with extension .parquet.crc)
df_repartitioned = df.repartition(8)
df_repartitioned.write.mode("overwrite").parquet("file:///opt/spark/data/test/custom_partitions")

                                                                                

After executing the previous commands, you should see the data laid out as in the image below:
![text](../static/data_repartition.png)

---
### **Stop Spark Application**
After reaching the end of your application, you should stop it in order to free the resources that were allocated to it. Please run the command below.

In [None]:
spark.stop()