# PySpark Primer Notebook
### By Kevin Chamberlin
__Updated July 2024__

### Installing codespace requirements 

In [1]:
%pip install fsspec --quiet
%pip install s3fs --quiet
%pip install pyspark --quiet

Note: you may need to restart the kernel to use updated packages.


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

24/08/02 15:36:56 WARN Utils: Your hostname, codespaces-00e44c resolves to a loopback address: 127.0.0.1; using 10.0.0.55 instead (on interface eth0)
24/08/02 15:36:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/02 15:36:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Initializing dataframes
DataFrames are also commonly created by reading data from a data source.

In [3]:
# Example of manually creating a Spark DataFrame using PySpark code 
l_avengers_data = [[1, "Steve", "Rogers", "Brooklyn, NY", "Blue"]
              ,[2, "Tony", "Stark", "Manahattan, NY", "Gold"]
              ,[3, "Peter", "Parker", "Queens, NY", "Blue"]
              ,[4, "Scott", "Lang", "Coral Gables, FL", "Blue"]
              ,[5, "Natasha", "Romanoff", "Stalingrad, USSR", "Black"]
              ,[6, "Clint", "Barton", "Waverly, IA", "Purple"]]

l_avengers_col_names = ["ID", "FirstName", "LastName", "Hometown", "Favorite Color"]

sdf_avengers = spark.createDataFrame(l_avengers_data, l_avengers_col_names)

# display() can be used in a platform like Databricks to display data in an interactive format
display(sdf_avengers) 

# show() is used here because GitHub codespaces do not support display()
sdf_avengers.show()

DataFrame[ID: bigint, FirstName: string, LastName: string, Hometown: string, Favorite Color: string]

                                                                                

+---+---------+--------+----------------+--------------+
| ID|FirstName|LastName|        Hometown|Favorite Color|
+---+---------+--------+----------------+--------------+
|  1|    Steve|  Rogers|    Brooklyn, NY|          Blue|
|  2|     Tony|   Stark|  Manahattan, NY|          Gold|
|  3|    Peter|  Parker|      Queens, NY|          Blue|
|  4|    Scott|    Lang|Coral Gables, FL|          Blue|
|  5|  Natasha|Romanoff|Stalingrad, USSR|         Black|
|  6|    Clint|  Barton|     Waverly, IA|        Purple|
+---+---------+--------+----------------+--------------+



### PySpark Libraries

In [4]:
import pyspark.sql.functions as F
from datetime import datetime
from pyspark.sql.types import DateType, IntegerType
#from pyspark.sql.functions import col, udf, when

Above are examples of commonly used libraries and how to import them. If you are familiar with Python, this formatting will look very familiar. When we talk about PySpark, we are frequently referring to a set of SQL functions made available in Python to be used on a distributed computing platform like Databricks!
I suggest starting all Databricks notebooks with the command `import pyspark.sql.functions as F`. I prefer this notation over the potentially simpler `from pyspark.sql.functions import *` because the former gives better traceability when troubleshooting and prevents conflicts with function names. Such conflicts have appeared a handful of times in our main product code, so it's best to just use "F." notation to save integration time later on.

In [5]:
# Basic PySpark SQL Functions: .withColumn() and .select()

sdf_avengers_names = (sdf_avengers
                      .withColumn("FullName", F.concat(F.col("FirstName"), F.lit(" "), F.col("LastName")))
                      .select(F.col("ID"), F.col("FullName"))
                     )

# display(sdf_avengers_names)
sdf_avengers_names.show()

+---+----------------+
| ID|        FullName|
+---+----------------+
|  1|    Steve Rogers|
|  2|      Tony Stark|
|  3|    Peter Parker|
|  4|      Scott Lang|
|  5|Natasha Romanoff|
|  6|    Clint Barton|
+---+----------------+



In [6]:
# Create a new dataframe completely from scratch

l_hero_data = [[1, "Captain America"]
              ,[2, "Iron Man"]
              ,[3, "Spiderman"]
              ,[4, "Ant-Man"]
              ,[5, "Black Widow"]
              ,[6, "Hawkeye"]]

l_hero_col_names = ["ID", "Hero"]

sdf_avengers_heroes = spark.createDataFrame(l_hero_data, l_hero_col_names)


# Create a new dataframe that matches the columns of an existing dataframe

l_new_avenger_data = [[7, "Wanda", "Maximoff", "Sokovia", "Scarlet"]]

l_new_avenger_col_names = sdf_avengers.columns # data and metadata from DFs can be called upon

sdf_avengers_new = spark.createDataFrame(l_new_avenger_data, l_new_avenger_col_names)

# Display both new dataframes
# display(sdf_avengers_heroes)
sdf_avengers_heroes.show()

# display(sdf_avengers_new)
sdf_avengers_new.show()


+---+---------------+
| ID|           Hero|
+---+---------------+
|  1|Captain America|
|  2|       Iron Man|
|  3|      Spiderman|
|  4|        Ant-Man|
|  5|    Black Widow|
|  6|        Hawkeye|
+---+---------------+

+---+---------+--------+--------+--------------+
| ID|FirstName|LastName|Hometown|Favorite Color|
+---+---------+--------+--------+--------------+
|  7|    Wanda|Maximoff| Sokovia|       Scarlet|
+---+---------+--------+--------+--------------+



In [7]:
# Two ways of combining data
sdf_avengers_expanded = (sdf_avengers
                         .union(sdf_avengers_new)
                         .join(sdf_avengers_heroes, on = "ID", how = 'left')
                        )
# NOTE: If a table you're joining with is relatively small, a "broadcast join" may improve processing time on a distributed cluster

# display(sdf_avengers_expanded)
sdf_avengers_expanded.show()

                                                                                

+---+---------+--------+----------------+--------------+---------------+
| ID|FirstName|LastName|        Hometown|Favorite Color|           Hero|
+---+---------+--------+----------------+--------------+---------------+
|  1|    Steve|  Rogers|    Brooklyn, NY|          Blue|Captain America|
|  3|    Peter|  Parker|      Queens, NY|          Blue|      Spiderman|
|  2|     Tony|   Stark|  Manahattan, NY|          Gold|       Iron Man|
|  6|    Clint|  Barton|     Waverly, IA|        Purple|        Hawkeye|
|  5|  Natasha|Romanoff|Stalingrad, USSR|         Black|    Black Widow|
|  4|    Scott|    Lang|Coral Gables, FL|          Blue|        Ant-Man|
|  7|    Wanda|Maximoff|         Sokovia|       Scarlet|           NULL|
+---+---------+--------+----------------+--------------+---------------+



In [8]:
# Filter rows with .filter(), then reshape the columns with .withColumn(), .drop(), and .select()

sdf_avengers_filtered = (sdf_avengers_expanded
                         .filter(F.col("Hero").isNotNull())
                         .filter(F.col("Favorite Color") != "Gold")
                         .withColumn("FirstInitial", F.substring("FirstName", 1,1))
                         .drop("FirstName")
                         .select("FirstInitial", "LastName", "Hometown", "Hero", F.col("Favorite Color").alias("FavoriteColor"))
                        )

# display(sdf_avengers_filtered)
sdf_avengers_filtered.show()


+------------+--------+----------------+---------------+-------------+
|FirstInitial|LastName|        Hometown|           Hero|FavoriteColor|
+------------+--------+----------------+---------------+-------------+
|           S|  Rogers|    Brooklyn, NY|Captain America|         Blue|
|           P|  Parker|      Queens, NY|      Spiderman|         Blue|
|           S|    Lang|Coral Gables, FL|        Ant-Man|         Blue|
|           N|Romanoff|Stalingrad, USSR|    Black Widow|        Black|
|           C|  Barton|     Waverly, IA|        Hawkeye|       Purple|
+------------+--------+----------------+---------------+-------------+



                                                                                

### Distributed Cloud Computing
At its core, Databricks is a platform for doing cloud computing. That's why you run notebooks on a "Cluster": it's a grouping of worker nodes and a driver node that process your code.
Code distributed across the worker nodes runs very fast until the results are collected by the driver node. This notably happens whenever you display or graph data, or
when you convert out of the Spark environment. This is why you use the PySpark SQL functions and Spark DataFrames instead of something like Pandas DataFrames.

In [9]:
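# Take a look at what your environment classifies sdf_avengers_filtered as
# NOTE: describe() returns a new Spark DataFrame of summary statistics, so print() shows only its column names and types, not the data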

print(sdf_avengers_filtered.describe())

DataFrame[summary: string, FirstInitial: string, LastName: string, Hometown: string, Hero: string, FavoriteColor: string]


### Common Tasks
Below are some examples of common tasks that might need to be done on data in Databricks. I'll add to this section of this primer notebook as necessary.

In [10]:
# Verify data structure using the "Data Profile" option on display() output (select the "+" button next to the word "Table" on the output of this cell)
# This is a unique feature of Databricks and may not be useful in all contexts.

# display(sdf_avengers)

In [11]:
# Aggregate sums of columns with a groupBy() on other columns

sdf_avengers_weights = (sdf_avengers_filtered
                        .withColumn('WeightLbs', F.lit(200)) # in this example, each avenger weighs 200 lbs for simplicity
                       )

sdf_avengers_sum = (sdf_avengers_weights
                    .groupBy('FavoriteColor')
                    .agg(F.sum(F.col('WeightLbs')).alias('TotalWeightLbs')) # the alias method is attached to the F.sum() function to rename the output of F.sum()
                   )

# display(sdf_avengers_sum)
sdf_avengers_sum.show()



+-------------+--------------+
|FavoriteColor|TotalWeightLbs|
+-------------+--------------+
|       Purple|           200|
|         Blue|           600|
|        Black|           200|
+-------------+--------------+



                                                                                

A complete list of functions within PySpark can be found here: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

### Window Functions
As in SQL, window functions can dramatically improve your code's performance. I've found them to be the solution to a lot of
headaches with compute limitations when using PySpark. Below are some simple syntax examples of what might be useful to know.

In [12]:
# Importing necessary class
from pyspark.sql.window import Window as W

# Partitioning within the data
sdf_avengers_partition = (sdf_avengers_weights
                          .withColumn("ColorSumWeightLbs", 
                                      F.sum("WeightLbs").over(W.partitionBy(["FavoriteColor"]))
                                      )
                          )
# display(sdf_avengers_partition)
sdf_avengers_partition.show()



# Ordering data
sdf_avengers_ordered = (sdf_avengers_filtered
                        .withColumn("AlphaRank",
                                    F.rank().over(W.orderBy("LastName")) # NOTE: The parameter for these Window functions can be a string or list of strings
                                    )
                        .withColumn("AlphaRankWithinColor",
                                    F.rank().over(W.partitionBy("FavoriteColor").orderBy("LastName"))
                                    )
                        )
# display(sdf_avengers_ordered)
sdf_avengers_ordered.show()

24/08/02 15:37:14 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/08/02 15:37:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------+--------+----------------+---------------+-------------+---------+-----------------+
|FirstInitial|LastName|        Hometown|           Hero|FavoriteColor|WeightLbs|ColorSumWeightLbs|
+------------+--------+----------------+---------------+-------------+---------+-----------------+
|           N|Romanoff|Stalingrad, USSR|    Black Widow|        Black|      200|              200|
|           S|  Rogers|    Brooklyn, NY|Captain America|         Blue|      200|              600|
|           P|  Parker|      Queens, NY|      Spiderman|         Blue|      200|              600|
|           S|    Lang|Coral Gables, FL|        Ant-Man|         Blue|      200|              600|
|           C|  Barton|     Waverly, IA|        Hawkeye|       Purple|      200|              200|
+------------+--------+----------------+---------------+-------------+---------+-----------------+



24/08/02 15:37:15 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+------------+--------+----------------+---------------+-------------+---------+--------------------+
|FirstInitial|LastName|        Hometown|           Hero|FavoriteColor|AlphaRank|AlphaRankWithinColor|
+------------+--------+----------------+---------------+-------------+---------+--------------------+
|           N|Romanoff|Stalingrad, USSR|    Black Widow|        Black|        5|                   1|
|           S|    Lang|Coral Gables, FL|        Ant-Man|         Blue|        2|                   1|
|           P|  Parker|      Queens, NY|      Spiderman|         Blue|        3|                   2|
|           S|  Rogers|    Brooklyn, NY|Captain America|         Blue|        4|                   3|
|           C|  Barton|     Waverly, IA|        Hawkeye|       Purple|        1|                   1|
+------------+--------+----------------+---------------+-------------+---------+--------------------+



                                                                                

### Databricks Details

__Workspace__ is the term used to describe the file structure around notebooks in Databricks. Workspaces are generally organized by the original author of the notebooks, but companies can handle this differently if they wish.


__Repos__ or __Repositories__ mostly refer to GitHub or Azure DevOps code repositories. They are a way to maintain version control and keep development and implementation of code organized. In Databricks, they are stored as a type of "Workspace".

__Catalog__ is the term used in Databricks for the database storage accessible natively within Databricks. In this notebook, I instead use a generic example Spark DataFrame created manually.

__Uploading / downloading notebooks__ is fairly straightforward. If you've ever used Jupyter Notebooks with Python before, you'll recognize .IPYNB files, which are one of the primary formats for downloading a notebook. This option is within the "File" menu visible when viewing a notebook.
To upload, right-click within the intended workspace and select "Import". From there you can drag and drop a file, or select from a number of file types to be used within Databricks.

__Cloning__ is a key feature of Databricks file management. You can essentially think of it as a Copy/Paste for the entire notebook. This can be useful when you want to make edits to someone else's notebook that you don't have edit permissions for, or when duplicating large sections of code from one of your own previously developed notebooks. Try cloning this notebook to your own workspace now.

__Workflows/Jobs__ are something I used extensively in previous roles to automate when notebooks are run for regular data pipelining operations. They are viewable on the "Workflows" tab on the left side of the screen, and new Workflows for a specific notebook can easily be made with the "Schedule" button at the top right corner of the screen while viewing a notebook. There are some nuances to workflows and the clusters they use, but we can go deeper into that at a later date if necessary.

__Compute__ is where clusters are set up and monitored within Databricks. For most users, monitoring is only necessary when bottlenecking is occurring. Data engineers may use this information for optimizing pipeline functionality.

_Additional resources from Databricks:_ https://docs.databricks.com/notebooks/notebooks-use.html