# Exploring PySpark

This notebook covers some details of PySpark, and shows example code for performing different functions / actions.

#### 0. What is Apache Spark?

Spark is an open-source big data processing framework. It improves on earlier Hadoop MapReduce-based solutions (such as Hive running on MapReduce) by processing data **in-memory** and distributing tasks amongst multiple workers (nodes), which are controlled and co-ordinated by a driver. This means smaller chunks of data can be read and processed in parallel, and then collected together when needed, to perform data transformation or analytics.

The nodes in a cluster are abstracted, which means the individual nodes are not addressed directly.

#### 1. So what is `PySpark`?

PySpark is the Python API for Spark, which is written primarily in Scala. It lets you use Spark from Python, which is easier to pick up than a language such as Scala. This means you can use Spark within your Python scripts to work with big data and undertake your transformations, analytics, ML, etc.

#### 2. The Difference between `Local`, `Client` and `Cluster` mode

Local mode simply runs Spark locally, for example on your laptop. Nothing is submitted to a cluster of machines, so it is typically only used for small-scale local development with limited data sizes.

In Client mode, the driver runs on the machine from which the Spark application is submitted, which could be your local laptop, for example. The driver still coordinates the application: it sends tasks to the executors on the cluster and receives the results back. It's a good mode for development and debugging, and it also provides easier access to the application's logs.

Cluster mode is where the driver itself runs on a node inside the cluster, alongside the workers.<br>
The driver still performs the same role.<br>
The client machine submits the Spark application to the cluster manager, and the cluster manager takes care of running the application on the cluster.<br>
This mode is suitable for production environments, where the cluster manager handles resource allocation and task scheduling.

#### 3. What is Fault Tolerance?

Spark ensures fault tolerance through its lineage information, which allows it to recompute lost data rather than replicating that data across multiple nodes. At the core of fault tolerance are RDDs (Resilient Distributed Datasets): low-level, immutable, fault-tolerant collections of data that can be processed in parallel across a cluster. Because Spark keeps the lineage of transformations applied to the data, it can recompute just the lost partition when needed, rather than reprocessing the entire dataset.

RDD lineage is represented as a DAG (Directed Acyclic Graph) of all the transformations applied to the base RDD. 
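To make the lineage idea concrete, here is a toy model in plain Python, not Spark's actual implementation. Each partition remembers the source records it came from and the ordered list of transformations applied to them; `ToyPartition`, `map_step` and `filter_step` are hypothetical names. If a partition's computed data is lost, only that partition is rebuilt by replaying its lineage, and the other partitions are untouched.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional

Step = Callable[[List[int]], List[int]]


def map_step(fn: Callable[[int], int]) -> Step:
    """Narrow transformation: apply `fn` to every record in a partition."""
    return lambda rows: [fn(r) for r in rows]


def filter_step(pred: Callable[[int], bool]) -> Step:
    """Narrow transformation: keep only records matching `pred`."""
    return lambda rows: [r for r in rows if pred(r)]


@dataclass
class ToyPartition:
    """One partition of a toy dataset: the source records it was read from,
    plus the lineage (ordered steps) needed to rebuild its contents."""
    source: List[int]
    lineage: List[Step]
    cached: Optional[List[int]] = None
    compute_count: int = 0

    def get(self) -> List[int]:
        # Only recompute (by replaying the lineage from the source) if the data is missing
        if self.cached is None:
            data = list(self.source)
            for step in self.lineage:
                data = step(data)
            self.cached = data
            self.compute_count += 1
        return self.cached


lineage = [map_step(lambda x: x * 10), filter_step(lambda x: x > 20)]
partitions = [ToyPartition([1, 2, 3], lineage), ToyPartition([4, 5, 6], lineage)]

print([p.get() for p in partitions])  # [[30], [40, 50, 60]]

# Simulate losing the executor that held partition 1: only that partition's data is gone
partitions[1].cached = None
print(partitions[1].get())  # [40, 50, 60], rebuilt by replaying the lineage
print([p.compute_count for p in partitions])  # [1, 2]: partition 0 was never recomputed
```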

#### 4. RDD vs DataFrame?

An RDD and a DataFrame are both data abstractions used by Apache Spark. An RDD is a collection of objects (data objects) distributed across multiple nodes in a Spark cluster. A DataFrame is more similar to a standard SQL database table, where a schema lays the data out into named columns and rows. The DataFrame API is useful because, with data laid out in columns, Spark's Catalyst optimizer can optimise queries for performance.

An RDD distributes data in partitions across multiple nodes (servers etc.), with no schema attached to the records. As RDDs are immutable, they are never updated in place; instead, each transformation produces a new RDD.

A DataFrame can take an RDD and add a schema structure to it.<br> 
Typically, a DataFrame is best used for structured data (though it can also be used for unstructured data when needed), whereas RDDs are more often used for unstructured data (where you don't know the schema your data should conform to). 

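`Datasets`: these offer a balance between the two, combining the typed objects of RDDs with the optimisations of DataFrames. Note that the typed Dataset API is only available in Scala and Java; in PySpark you work with DataFrames.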

Basically, DataFrames and Datasets are built on top of RDDs, which are Spark's core abstraction.

#### 5. Performance : Spark 2.x vs Spark 3.x 

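In Spark 2.x it was commonly advised to use Scala over the Python or SQL APIs for performance. For DataFrame code in Spark 3.x that advice no longer holds: Python and SQL DataFrame operations are compiled into the same optimised JVM execution plan as Scala, so performance differences are usually negligible. The main exceptions are RDD-based code and plain Python UDFs, where data has to be serialised between the JVM and Python worker processes.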

#### 6. What is Lazy Evaluation in Spark?

With lazy evaluation, Spark does not execute transformations as your code runs. Instead, it builds up an execution plan so that it can process the transformations in the most optimised way it can. Since Spark 3.0, Adaptive Query Execution (AQE) can also re-optimise the plan at runtime, including measures to handle skew in the distributed data. Only when an action is called, such as `count()` or `show()`, or writing data somewhere, does Spark execute the transformations and steps built into its plan; hence the name lazy evaluation.
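A minimal sketch of lazy evaluation, assuming a hypothetical `LazyFrame` class written in plain Python rather than Spark's real query planner. Transformations such as `map` and `filter` only record steps in a plan, and the work happens when an action like `collect()` or `count()` is called. It also pushes each row through all steps in one pass, loosely mirroring how Spark pipelines narrow transformations within a stage.

```python
from typing import Callable, Iterable, List, Tuple


class LazyFrame:
    """Toy model of lazy evaluation (not Spark's implementation): transformations
    only append steps to a plan, and nothing runs until an action is called."""

    def __init__(self, rows: Iterable[int], plan: Tuple = ()):
        self._rows = list(rows)
        self._plan = tuple(plan)

    # --- transformations: return a new LazyFrame with a longer plan, no work done ---
    def map(self, fn: Callable[[int], int]) -> "LazyFrame":
        return LazyFrame(self._rows, self._plan + (("map", fn),))

    def filter(self, pred: Callable[[int], bool]) -> "LazyFrame":
        return LazyFrame(self._rows, self._plan + (("filter", pred),))

    def explain(self) -> List[str]:
        return [kind for kind, _ in self._plan]

    # --- actions: execute the recorded plan ---
    def collect(self) -> List[int]:
        out = []
        for row in self._rows:
            # Each row flows through every step in one pass (pipelining),
            # instead of materialising an intermediate list per step.
            keep = True
            for kind, fn in self._plan:
                if kind == "map":
                    row = fn(row)
                elif not fn(row):
                    keep = False
                    break
            if keep:
                out.append(row)
        return out

    def count(self) -> int:
        return len(self.collect())


calls = []


def times_two(x: int) -> int:
    calls.append(x)  # record when the function actually runs
    return x * 2


lf = LazyFrame(range(5)).map(times_two).filter(lambda x: x > 4)
print(calls)         # [] : defining transformations did no work
print(lf.explain())  # ['map', 'filter']
print(lf.collect())  # [6, 8] : the action triggers execution
print(calls)         # [0, 1, 2, 3, 4]
```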

#### 7. What is Shuffling?

As we perform certain transformations, for example joins or group bys, data spread across different nodes needs to be moved to the same node as its corresponding data in order to perform the task. This is known as shuffling, because the data is moved (shuffled) between nodes across the network. Shuffling isn't always avoidable, but steps can be taken to reduce it. Techniques such as bucketing and sorting (shuffling the data once upfront on join keys you will use often, for example a Customer ID) can keep related data together on the same nodes and avoid further shuffling later in your execution plan.
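The sketch below is a simplified, plain-Python model of a hash-partitioned shuffle, assuming a hypothetical `shuffle` helper and that partition *i* lives on node *i*. Each row is routed to a partition chosen by hashing its join key, so all rows for the same customer end up together. CRC32 is used only as a deterministic stand-in hash; Spark itself uses Murmur3 hashing for DataFrame hash partitioning.

```python
import zlib
from collections import defaultdict
from typing import Callable, Dict, List, Set, Tuple

Row = Tuple[int, str]


def partition_for(key, num_partitions: int) -> int:
    # Deterministic hash so a given key always lands in the same partition
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def shuffle(partitions: List[List[Row]], key_fn: Callable[[Row], object], num_partitions: int):
    """Hash-partition rows by key. Assumes partition i lives on node i, so any
    row whose destination differs from its source would cross the network."""
    out: List[List[Row]] = [[] for _ in range(num_partitions)]
    moved = 0
    for src, rows in enumerate(partitions):
        for row in rows:
            dest = partition_for(key_fn(row), num_partitions)
            if dest != src:
                moved += 1
            out[dest].append(row)
    return out, moved


# Transactions for customers 1-3 are spread across two nodes before the shuffle
before = [
    [(1, "Mouse"), (2, "Hat"), (3, "Belt")],
    [(1, "Belt"), (3, "Mouse"), (2, "Projector")],
]
after, moved = shuffle(before, key_fn=lambda r: r[0], num_partitions=2)

# After the shuffle every customer's rows sit in exactly one partition,
# so a join or group by on customerID can run locally within each partition.
locations: Dict[int, Set[int]] = defaultdict(set)
for idx, rows in enumerate(after):
    for cust_id, _ in rows:
        locations[cust_id].add(idx)
print(dict(locations))
print(f"rows moved across the network: {moved}")
```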

#### 8. YARN and why it's used

YARN (Yet Another Resource Negotiator) is Hadoop's resource management layer. It serves as the cluster manager, so Spark can request resources and have its applications scheduled.<br>

As a scheduler, YARN identifies applications and the jobs/tasks within them, works out what resources they need from the cluster, and checks whether those resources are available, since competing jobs may be using them. For example, as tasks in one job finish and release resources back to the cluster, those resources can be allocated to another job waiting in a queue.

YARN's ApplicationsManager (part of the ResourceManager) handles the acceptance, restart and completion of applications in the cluster. In YARN cluster mode, the Spark driver runs inside the YARN ApplicationMaster; in client mode, a lightweight ApplicationMaster only requests executors on the driver's behalf. The ResourceManager allocates containers across various nodes in the cluster, and the NodeManagers on those nodes launch the executors within those containers. The Spark driver coordinates with the executors to execute tasks. Executors process data and perform computations, storing intermediate results in memory or on disk as needed.
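As an illustration of the queueing behaviour described above, here is a toy first-in-first-out scheduler in plain Python. It is a deliberate simplification: real YARN schedulers (FIFO, Capacity, Fair) support multiple queues, priorities and preemption, and allocate containers on specific nodes rather than from a single pool. The hypothetical `fifo_schedule` function starts each job once enough containers are free, and returns a job's containers to the pool when it finishes.

```python
from collections import deque
from typing import Dict, List, Tuple

Job = Tuple[str, int, int]  # (name, containers needed, duration in ticks)


def fifo_schedule(total_containers: int, jobs: List[Job]) -> Dict[str, int]:
    """Toy FIFO scheduler: returns the tick at which each job starts.
    A job starts only once enough containers are free; finished jobs release theirs."""
    for name, need, duration in jobs:
        if need > total_containers or duration < 1:
            raise ValueError(f"{name} can never be scheduled on this toy cluster")
    free = total_containers
    queue = deque(jobs)
    running: List[Tuple[int, str, int]] = []  # (end tick, name, containers held)
    starts: Dict[str, int] = {}
    tick = 0
    while queue or running:
        # Release containers from jobs that finish at this tick
        for job in [r for r in running if r[0] == tick]:
            running.remove(job)
            free += job[2]
        # Start queued jobs in order while the job at the head of the queue fits
        while queue and queue[0][1] <= free:
            name, need, duration = queue.popleft()
            free -= need
            running.append((tick + duration, name, need))
            starts[name] = tick
        tick += 1
    return starts


print(fifo_schedule(8, [("A", 6, 3), ("B", 4, 2), ("C", 2, 1)]))
# {'A': 0, 'B': 3, 'C': 3}: B waits for A to release its containers, and C queues behind B
```

Note that job C would fit at tick 0, but strict FIFO keeps it behind B; smarter schedulers exist precisely to avoid this kind of head-of-line blocking.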

#### 9. Out of Memory Errors in Spark 

Out of memory errors can be viewed through two lenses: the driver and the executors.<br>

The driver can run out of memory during actions like `collect()`, which pull all of the data back to the driver node, when the driver does not have enough memory configured to hold that data. Your options are then to increase driver memory (`spark.driver.memory`), or change the code to work with smaller subsets of the data before pulling it back to the driver.

For executor memory, there can be a few different causes, but often it comes down to memory overhead. This is the portion of each executor's YARN container that is reserved for off-heap memory (`spark.executor.memoryOverhead`). It holds things like interned strings, other JVM and native overheads, and the processes for non-JVM languages, such as Python or R workers. So, if you see an error saying the container was killed by YARN for exceeding memory limits, you may need to increase the (default) amount of overhead memory reserved for each executor.
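The container size YARN is asked for can be estimated with some simple arithmetic. This sketch assumes Spark's documented defaults for executor overhead (`spark.executor.memoryOverheadFactor` of 0.10, with a minimum of 384 MiB), plus `spark.executor.pyspark.memory` if you set it. The helper `yarn_container_mib` is hypothetical, and it ignores off-heap memory and YARN rounding requests up to its allocation increment, so treat the numbers as approximate. For example, the `4G` executor memory used in the `getSpark` example later in this notebook would request roughly 4.4 GiB per container.

```python
MIN_OVERHEAD_MIB = 384          # Spark's documented minimum executor memory overhead
DEFAULT_OVERHEAD_FACTOR = 0.10  # default spark.executor.memoryOverheadFactor


def yarn_container_mib(executor_memory_mib: int,
                       overhead_factor: float = DEFAULT_OVERHEAD_FACTOR,
                       pyspark_memory_mib: int = 0) -> int:
    """Approximate memory requested from YARN per executor container.
    Ignores off-heap memory (spark.memory.offHeap.size) and YARN's rounding
    up to its minimum allocation increment."""
    overhead = max(int(executor_memory_mib * overhead_factor), MIN_OVERHEAD_MIB)
    return executor_memory_mib + overhead + pyspark_memory_mib


print(yarn_container_mib(4096))                        # 4505: 4096 heap + 409 overhead
print(yarn_container_mib(2048))                        # 2432: 2048 heap + the 384 MiB minimum
print(yarn_container_mib(4096, overhead_factor=0.20))  # 4915: more room for Python workers
```

If containers keep getting killed, raising `spark.executor.memoryOverhead` (or the overhead factor) grows the overhead term without changing the JVM heap.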

High concurrency errors can occur when too many cores are assigned to each executor. Tasks run concurrently on an executor's cores and share its memory, so the more cores, the less memory each task gets, which can lead to memory exhaustion. A commonly cited guideline for Apache Spark is four or five cores per executor, so that an executor's memory isn't spread too thinly. 
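Some rough arithmetic shows why too many cores per executor can exhaust memory. Tasks running concurrently on an executor (roughly one per core) share its memory, so doubling the cores halves each task's share. The second helper, `size_executors`, is hypothetical and applies the commonly quoted rule of thumb for sizing executors on a worker node: reserve a core and some memory for the OS and Hadoop daemons, use about five cores per executor, and leave headroom for memory overhead. These defaults are assumptions, not Spark settings. Also note that Spark's unified memory model only gives execution and storage `(heap - 300 MiB) * spark.memory.fraction` (0.6 by default), so the real per-task share is smaller still.

```python
def memory_per_task_mib(executor_memory_mib: float, executor_cores: int) -> float:
    """Concurrent tasks on one executor share its memory, roughly one task per core."""
    return executor_memory_mib / executor_cores


def size_executors(node_cores: int, node_mem_gib: float, cores_per_executor: int = 5,
                   reserved_cores: int = 1, reserved_mem_gib: float = 1.0,
                   overhead_factor: float = 0.10):
    """Rule-of-thumb sizing for one worker node (an assumption, not a Spark API):
    leave a core and some memory for the OS/daemons, then split the rest evenly."""
    executors = (node_cores - reserved_cores) // cores_per_executor
    if executors < 1:
        raise ValueError("cores_per_executor is larger than the usable cores on the node")
    container_gib = (node_mem_gib - reserved_mem_gib) / executors
    # Leave room for overhead (the 384 MiB minimum is irrelevant at these sizes)
    heap_gib = container_gib / (1 + overhead_factor)
    return executors, heap_gib


print(memory_per_task_mib(4096, 5))   # 819.2 MiB per task
print(memory_per_task_mib(4096, 10))  # 409.6 MiB per task: same executor, half the memory each

executors, heap = size_executors(node_cores=16, node_mem_gib=64)
print(executors, round(heap, 2))  # 3 executors with about 19.09 GiB heap each
```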

Large partitions can also cause out of memory errors: when one partition of your data is significantly larger than the others, the executor processing it may run out of memory. You may need to split the larger partitions into smaller ones, or increase the memory of the executors. 
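One common way to break up an oversized partition caused by a single "hot" key is salting. The plain-Python sketch below uses hypothetical helper names and hypothetical data: it appends a salt suffix to the key on the large, skewed side and replicates the smaller side once per salt value, so the join still matches while the hot key's rows are spread over several keys. Here the salt is assigned round-robin for reproducibility; in Spark it is often generated randomly. In Spark 3.x, Adaptive Query Execution can also split skewed join partitions automatically when `spark.sql.adaptive.skewJoin.enabled` is on.

```python
from collections import Counter


def salt_large_side(rows, key_fn, num_salts):
    """Give each row a salted key like 'hot_2' so one hot key is spread across
    `num_salts` distinct keys (and therefore potentially several partitions)."""
    return [(f"{key_fn(row)}_{i % num_salts}", row) for i, row in enumerate(rows)]


def explode_small_side(rows, key_fn, num_salts):
    """Copy each row of the smaller table once per salt value, so every salted
    key on the large side still finds its match."""
    return [(f"{key_fn(row)}_{s}", row) for row in rows for s in range(num_salts)]


NUM_SALTS = 4
# Skewed data (hypothetical): 1,000 transactions for one customer, 10 for another
transactions = [("hot", n) for n in range(1000)] + [("cold", n) for n in range(10)]
customers = [("hot", "Hot customer"), ("cold", "Cold customer")]

salted_txns = salt_large_side(transactions, key_fn=lambda r: r[0], num_salts=NUM_SALTS)
salted_custs = dict(explode_small_side(customers, key_fn=lambda r: r[0], num_salts=NUM_SALTS))

# Without salting, all 1,000 'hot' rows share one key (one partition); now they are split 4 ways
print(Counter(key for key, _ in salted_txns))

# Join on the salted key: every transaction still matches exactly one customer row
joined = [(txn, salted_custs[key]) for key, txn in salted_txns if key in salted_custs]
print(len(joined))  # 1010
```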

#### 10. Debugging slow applications 

When you have a Spark application that is performing slowly, there's a good chance a bottleneck exists somewhere.<br>
You can use the Spark UI and the logs to identify which parts of the jobs' execution plan take the longest. The statistics the UI captures for those stages and tasks can then help you debug.

Typically, this can help identify where one particular task takes a long time (for example, because its partition is much larger than the rest of the data), or where a lot of shuffling is occurring. You can then go back to your code and, for example, call an action after a certain transformation to help pinpoint issues, and then work on optimising the code in the right place. 

Equally, you may want to check the cluster manager's UI too, because your application may not be getting the resources it requires and could simply be stuck in an ACCEPTED state rather than running. The logs can help identify this.

#### 11. Starting a PySpark session interactively from the shell

So, in your shell, you can execute<br>
```
pyspark
```
which would start an interactive PySpark shell (assuming the relevant installs and configs are in place)<br>

<img src="./images/pyspark_shell.png">

You can then exit this shell using:

```
exit()
```

#### 12. Creating a local PySpark session in your Python Code

Note: starting a Spark session can sometimes take up to a few minutes, depending on the setup being used.

In [1]:
# imports 
from pyspark.sql import SparkSession 

# "local" runs Spark with a single worker thread; "local[*]" would use one thread per CPU core
spark = SparkSession.builder\
            .appName("my_local_spark_session")\
            .master("local")\
            .getOrCreate() 

# let's print the details of our local spark session 
print(spark.version) 

24/05/15 14:58:11 WARN Utils: Your hostname, DCollins-Laptop1 resolves to a loopback address: 127.0.1.1; using 172.26.39.146 instead (on interface eth0)
24/05/15 14:58:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/15 14:58:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


3.4.1


In [None]:
# close the spark session 
spark.stop() 

#### 13. Spark Context

`SparkContext` establishes a connection to the Spark cluster. It can connect to various cluster managers, such as YARN, Kubernetes, Mesos (deprecated since Spark 3.2) or a standalone Spark cluster. <br>

It's responsible for submitting jobs to the cluster, and handles the scheduling and distribution of tasks from your application to the cluster. It also holds the configuration of your Spark application running on the cluster. 

It can be used to create RDDs, as well as broadcast variables and accumulators. 

Basically, think of it as the main entry point for Spark functionality, and the traditional approach. SparkSession (introduced in Spark 2.0) is a unified interface that combines Spark's various functionalities into a single entry point. SparkSession wraps a SparkContext (available as `spark.sparkContext`) and provides a higher-level API for working with Spark.

In [6]:
# imports 
from pyspark.sql import SparkSession 

spark = SparkSession.builder\
            .appName("my_local_spark_session")\
            .master("local")\
            .getOrCreate() 

sc = spark.sparkContext 

print(f"Spark UI: {sc.uiWebUrl}") # Url of the Spark Web UI
print(f"Spark Application ID: {sc.applicationId}") # ID of the spark application 
print(f"Spark App Start Time: {sc.startTime}") # Start time of the application 
print(f"Spark default Parallelism: {sc.defaultParallelism}") # Default parallelism level 
print(f"Spark default min partitions: {sc.defaultMinPartitions}") # Default minimum partitions 

print("=" * 100)
# Status info: `statusTracker` is a method, so call it to get the StatusTracker object
status = sc.statusTracker()
print(f"Active job IDs: {status.getActiveJobsIds()}")

# close
spark.stop() 

24/05/15 12:33:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


Spark UI: http://172.26.39.146:4041
Spark Application ID: local-1715772804723
Spark App Start Time: 1715772804647
Spark default Parallelism: 1
Spark default min partitions: 1


#### 14. Creating a more complex Spark application where you need to specify configs and provide JAR files for additional functionality 

* You can specify configs explicitly in the session builder of your application
* You can provide things like JAR files or Python code to be sent to each node of the cluster during the application, enabling additional functionality such as JDBC connections to databases, or using external libraries like AWS Deequ

In [None]:
# below covers some PySpark examples, when working with a cluster etc
# imports
from typing import Optional

from pyspark.sql import SparkSession

useJarFiles = [
    's3://my_bucket_location/folder1/jars/dummy1.jar',
    's3://my_bucket_location/folder1/jars/dummy2.jar'
]
jarList = ",".join(useJarFiles)  # `spark.jars` expects a single comma-separated string

# Python version settings: ship a packed Python environment so the executors' Python matches the driver's
pyspark_deps = "s3://user/spark/shared/lib/pyspark4.8-deps/environment.tar.gz#environment"

# build spark session 
def getSpark(
        appName: str,
        driverMemory: str = "2G",
        executorMemory: str = "4G",
        executorCores: str = "5",
        queue: str = 'default',
        addJarFiles: Optional[list] = None
) -> SparkSession:
    """
    Simple function to return a Spark session through the object `spark`
    """
    # avoid a mutable default argument; default to the PostgreSQL JDBC driver JAR
    if addJarFiles is None:
        addJarFiles = [r"s3://user/spark/jdbc/jarsFiles/postgresql-42.6.0.jar"]

    spark = SparkSession\
            .builder\
            .appName(appName)\
            .enableHiveSupport()\
            .master("yarn")\
            .config("spark.driver.memory", driverMemory)\
            .config("spark.executor.memory", executorMemory)\
            .config("spark.yarn.queue", queue)\
            .config("spark.dynamicAllocation.enabled", "true")\
            .config("spark.dynamicAllocation.initialExecutors", "0")\
            .config("spark.dynamicAllocation.maxExecutors", "16")\
            .config("spark.dynamicAllocation.minExecutors", "1")\
            .config("spark.executor.cores", executorCores)\
            .config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_ONLY")\
            .config("spark.sql.caseSensitive", "false")\
            .config("spark.sql.parquet.writeLegacyFormat", "true")\
            .config("spark.sql.sources.partitionOverwriteMode", "dynamic")\
            .config("hive.exec.dynamic.partition.mode", "nonstrict")\
            .config("spark.shuffle.service.enabled", "true")\
            .config("spark.yarn.dist.archives", pyspark_deps)\
            .config("spark.jars", ",".join(addJarFiles))\
            .getOrCreate()
    return spark


try:
    spark = getSpark(appName='Pyspark_EMR_Test') 
    print("PySpark session available through `spark` object")
except Exception as e:
    print(e)
    raise  # re-raise: the cells below need a working `spark` session

# show databases 
databases = spark.sql("SHOW DATABASES")
databases.toPandas() 

spark.stop()

#### 15. Reading in CSV files to a Spark DataFrame 

In [None]:
# import 
from pyspark.sql import SparkSession

# create session 
spark = SparkSession.builder\
            .appName("my_local_spark_session")\
            .master("local")\
            .getOrCreate() 
sc = spark.sparkContext 

#print(spark.version)

Now, from the `/data` subfolder, read the customer CSV file into a DataFrame.

In [2]:
filePath = "./data/customerMasterExtract.csv"
customer_df = (spark.read
    .option("delimiter", ",") # sets the delimiter to `,`
    .option("header", "true") # informs pyspark that row 1 should be treated as the column headings of the data 
    .option("inferSchema", "true") # lets spark infer the schema of the data itself, rather than us explicitly creating a schema
    .csv(filePath) 
)

customer_df.show(5, truncate=False) 

+----------+---------+--------+-------------+------------------------+--------+-------------------+----------+-------------------+
|customerID|firstName|lastName|rewardsMember|emailAddress            |postcode|profession         |dob       |customerJoined     |
+----------+---------+--------+-------------+------------------------+--------+-------------------+----------+-------------------+
|10000     |Helen    |Hope    |true         |gordon49@example.com    |N16 8GZ |Quantity surveyor  |1962-09-18|2002-01-28 13:27:36|
|10001     |George   |Hill    |false        |kgrant@example.org      |E35 0TP |Graphic Designer   |1984-10-02|2023-04-22 02:00:33|
|10002     |Hollie   |Morris  |true         |singhben@example.net    |M16 9GR |Roofer             |1985-08-19|2010-04-12 19:25:23|
|10003     |Carolyn  |Johnston|true         |barnesdawn@example.org  |TR8X 4YS|Pharmacy Technician|1958-11-02|1991-06-01 05:33:14|
|10004     |Roger    |Atkins  |true         |bernardstone@example.com|S50 0TD |Arch

#### 16. Creating a new column with a literal value

This is where you create a new column in the data, where you want all rows to have the same value.<br>
For example, let's take the above DataFrame and add a simple column called "cardHolder", giving every customer a default value of 'Y'.

To do this, we need to import some extra PySpark methods/functions

*NOTE - In an actual script, you would do this all at the top, but for this demo, don't worry*

In [4]:
from pyspark.sql.functions import lit 

card_holder_customers = (customer_df   # specifies the base dataframe to transform from 
    .withColumn(                       # uses withColumn to create a new column 
        "cardHolder",                  # passes "cardHolder" as the new column name
        lit("Y")                       # gives each row the literal value 'Y' with the lit() method
    )
)

card_holder_customers.show(3, truncate=False) # using .show() calls an `action` which actually executes the transformation above

+----------+---------+--------+-------------+--------------------+--------+-----------------+----------+-------------------+----------+
|customerID|firstName|lastName|rewardsMember|emailAddress        |postcode|profession       |dob       |customerJoined     |cardHolder|
+----------+---------+--------+-------------+--------------------+--------+-----------------+----------+-------------------+----------+
|10000     |Helen    |Hope    |true         |gordon49@example.com|N16 8GZ |Quantity surveyor|1962-09-18|2002-01-28 13:27:36|Y         |
|10001     |George   |Hill    |false        |kgrant@example.org  |E35 0TP |Graphic Designer |1984-10-02|2023-04-22 02:00:33|Y         |
|10002     |Hollie   |Morris  |true         |singhben@example.net|M16 9GR |Roofer           |1985-08-19|2010-04-12 19:25:23|Y         |
+----------+---------+--------+-------------+--------------------+--------+-----------------+----------+-------------------+----------+
only showing top 3 rows



Note: since version 3.3, there is a `withColumns` method that creates multiple columns in one call, rather than chaining individual `withColumn` calls.

In [8]:
# create multiple columns via a dictionary map of column name & value 
from pyspark.sql.functions import lit 

card_holder_customers_again = (
    customer_df                     # source dataframe 
    .withColumns(                   # use the `withColumns` method
        {                           # specify a dict of column names & values as key-pairs
            "cardHolder": lit("Y"),
            "staticNum": lit(10),
        }
    )
)
card_holder_customers_again.show(5, truncate=False) # show results 

+----------+---------+--------+-------------+------------------------+--------+-------------------+----------+-------------------+----------+---------+
|customerID|firstName|lastName|rewardsMember|emailAddress            |postcode|profession         |dob       |customerJoined     |cardHolder|staticNum|
+----------+---------+--------+-------------+------------------------+--------+-------------------+----------+-------------------+----------+---------+
|10000     |Helen    |Hope    |true         |gordon49@example.com    |N16 8GZ |Quantity surveyor  |1962-09-18|2002-01-28 13:27:36|Y         |10       |
|10001     |George   |Hill    |false        |kgrant@example.org      |E35 0TP |Graphic Designer   |1984-10-02|2023-04-22 02:00:33|Y         |10       |
|10002     |Hollie   |Morris  |true         |singhben@example.net    |M16 9GR |Roofer             |1985-08-19|2010-04-12 19:25:23|Y         |10       |
|10003     |Carolyn  |Johnston|true         |barnesdawn@example.org  |TR8X 4YS|Pharmacy 

#### 17. Re-partition Data

We can explore how many partitions our data has, and we can re-partition it where required

In [None]:
# current number of partitions
print(card_holder_customers.rdd.getNumPartitions()) 

Let's say we want to partition on `profession`, which has relatively low cardinality:

In [11]:
custs_partitioned_by_profession = (
    card_holder_customers.repartition("profession")
)

We could even repartition by multiple columns if needed:

In [None]:
cust_partition_multiple = (
    card_holder_customers.repartition("profession", "postcode")
) # assuming those were columns you wished to partition the data by, and they had an even enough distribution of data 

You can also repartition to a fixed number of partitions, if you know how many you want, like so:

In [None]:
cust_partition_fixed_num = (
    card_holder_customers.repartition(5) 
)

#### 18. Joining Data

Often, we will want to join two (or more) dataframes together. Here, we can look at some options that align to the traditional SQL methods many will be familiar with:

* use the `customer_df` read in earlier
* read in the dummy transactions dataset

In [6]:
# check this is read in 
customer_df.show(3, truncate=False) 

+----------+---------+--------+-------------+--------------------+--------+-----------------+----------+-------------------+
|customerID|firstName|lastName|rewardsMember|emailAddress        |postcode|profession       |dob       |customerJoined     |
+----------+---------+--------+-------------+--------------------+--------+-----------------+----------+-------------------+
|10000     |Helen    |Hope    |true         |gordon49@example.com|N16 8GZ |Quantity surveyor|1962-09-18|2002-01-28 13:27:36|
|10001     |George   |Hill    |false        |kgrant@example.org  |E35 0TP |Graphic Designer |1984-10-02|2023-04-22 02:00:33|
|10002     |Hollie   |Morris  |true         |singhben@example.net|M16 9GR |Roofer           |1985-08-19|2010-04-12 19:25:23|
+----------+---------+--------+-------------+--------------------+--------+-----------------+----------+-------------------+
only showing top 3 rows



In [7]:
# transactions data 
txns_Path = "./data/dummy_txns.csv"
txns_df = (spark.read
    .option("delimiter", ",") # sets the delimiter to `,`
    .option("header", "true") # informs pyspark that row 1 should be treated as the column headings of the data 
    .option("inferSchema", "true") # lets spark infer the schema of the data itself, rather than us explicitly creating a schema
    .csv(txns_Path) 
)
txns_df.show(3, truncate=False) 

+----------+-------------------+---------+------+-----+----------+
|customerID|transaction_TS     |Product  |volume|Price|txn_amount|
+----------+-------------------+---------+------+-----+----------+
|142387    |2023-10-15 15:42:50|Projector|6.0   |300.0|1800.0    |
|126774    |2023-10-15 23:40:19|Mouse    |1.0   |8.0  |8.0       |
|26995     |2023-10-15 20:43:32|Projector|3.0   |300.0|900.0     |
+----------+-------------------+---------+------+-----+----------+
only showing top 3 rows



Join type 1 - INNER (note: this is the default join type when none is specified)

In [8]:
# some additional needed imports
from pyspark.sql.functions import upper, col 
 
# let's say we want to create a dataframe that has customer names of people who purchased a "mouse" from the two dataframes 
mouse_txns = (
    txns_df # source data
    .filter(upper(txns_df["Product"]) == "MOUSE") # apply where clause to restrict rows 
    .select("customerID", "product") # select only the columns of interest 
).distinct() # should ensure we only get back distinct records of customer id & mouse purchases 

# now, let's perform an inner join against the `customer_df` to pull back names of customers who purchased a mouse 

cust_bought_mouse = (
    customer_df # specify dataframe 1 
    .join(
        mouse_txns, # specify dataframe 2 
        customer_df["customerID"] == mouse_txns["customerID"], # specify the `ON` condition for the join 
        "inner" # specify join type 
    )
    .select( # select only certain columns after the join for output 
        customer_df["customerID"].alias("cust_id"), # selects customer_id from df1, renames it to `cust_id` via alias 
        customer_df["firstName"], customer_df["lastName"],
        mouse_txns["Product"].alias("product_bought")
    ) 
).distinct() # ensure there are no duplicate records on the output 

print(f"No. of customers who bought a mouse: {cust_bought_mouse.count()}") 
print("=" * 150)
print("Example join output:")
cust_bought_mouse.show(3, truncate=False) 

No. of customers who bought a mouse: 10511
Example join output:
+-------+---------+--------+--------------+
|cust_id|firstName|lastName|product_bought|
+-------+---------+--------+--------------+
|12741  |Bradley  |Wilson  |Mouse         |
|18436  |Marc     |Osborne |Mouse         |
|18986  |Lewis    |Williams|Mouse         |
+-------+---------+--------+--------------+
only showing top 3 rows



Left Join

Suppose we wish to keep all customers, but create a flag as to whether they bought a mouse or not, through a Y or N. We can do this via a left join, like so:

In [9]:
# import
from pyspark.sql.functions import when

# let's say we want to create a dataframe that has customer names of people who purchased a "mouse" from the two dataframes 
mouse_txns = (
    txns_df # source data
    .filter(upper(txns_df["Product"]) == "MOUSE") # apply where clause to restrict rows 
    .select("customerID", "product") # select only the columns of interest 
).distinct() # should ensure we only get back distinct records of customer id & mouse purchases 


flag_mouse_purchase = (
    customer_df.join(
        mouse_txns,
        customer_df["customerID"] == mouse_txns["customerID"],
        "left"
    ).select(
        customer_df["customerID"].alias("cust_id"), # selects customer_id from df1, renames it to `cust_id` via alias 
        customer_df["firstName"], customer_df["lastName"],
        mouse_txns["Product"].alias("product_bought")
    )
).distinct() # ensure no duplicate customers

# create a Y / N marker for if mouse was purchased 
mouse_marker = flag_mouse_purchase.withColumn(
        "bought_mouse", # name of new column 
        when(flag_mouse_purchase["product_bought"].isNotNull(), 'Y') # note, you can chain multiple "when" here via dot notation 
        .otherwise("N") # case when style condition to determine column value
)

# show example:
mouse_marker.show(3, truncate=False)

print("=" * 150) 
# filter to just NO mouse purchased 
no_mouse = mouse_marker.filter(
    mouse_marker["bought_mouse"] == "N"
).count() 

print(f"No. of Customers who DID NOT buy a mouse: {no_mouse}")


+-------+---------+--------+--------------+------------+
|cust_id|firstName|lastName|product_bought|bought_mouse|
+-------+---------+--------+--------------+------------+
|10340  |Denis    |Jones   |null          |N           |
|10683  |Emily    |Evans   |null          |N           |
|10702  |Gareth   |Pearson |null          |N           |
+-------+---------+--------+--------------+------------+
only showing top 3 rows



No. of Customers who DID NOT buy a mouse: 129489


Cross Join

Let's take a look at a cross join, also known as a Cartesian product join. This returns every combination of rows from the two DataFrames. For example, it could be used to build a map of dates against a list of products, onto which results from another source can then be joined. Here's an example:

In [13]:
# import 
from pyspark.sql.functions import to_date

# Create a sample DataFrame with string dates
data1 = [("2024-01-01",), ("2024-02-01",), ("2024-03-01",), ("2024-04-01",)]
columns1 = ["Date"]
date_df = (spark.createDataFrame(data1, columns1)).withColumn("Date", to_date(col("Date"), "yyyy-MM-dd"))
date_df.show() 

print("=" * 150) 

# create simple products dataframe 
data2 = [("Hat",), ("Belt",)]
columns2 = ["Product"] 
product_df = spark.createDataFrame(data2, columns2) 
product_df.show()

+----------+
|      Date|
+----------+
|2024-01-01|
|2024-02-01|
|2024-03-01|
|2024-04-01|
+----------+

+-------+
|Product|
+-------+
|    Hat|
|   Belt|
+-------+



In [14]:
cross_join_df = (
    date_df.crossJoin(product_df)
).orderBy(col("Date"))

cross_join_df.show() 

+----------+-------+
|      Date|Product|
+----------+-------+
|2024-01-01|    Hat|
|2024-01-01|   Belt|
|2024-02-01|    Hat|
|2024-02-01|   Belt|
|2024-03-01|    Hat|
|2024-03-01|   Belt|
|2024-04-01|    Hat|
|2024-04-01|   Belt|
+----------+-------+



Full Outer Join

This is where we join tables but keep all records from both sides, filling in `null` where a row has no match on the other side.

In [33]:
from pyspark.sql.functions import coalesce, col

# example dataframe 1 
test1 = [("Dan", 31), ("Simon", 30), ("Tim", 40)]
cols1 = ["Name", "Age"]
df1 = spark.createDataFrame(test1, cols1) 
#df1.show() 

# example dataframe 2 
test2 = [("Dan", 98), ("Sally", 100), ("Tim", 92)]
cols2 = ["Name", "Score"]
df2 = spark.createDataFrame(test2, cols2) 
#df2.show() 

# Full Outer Join - should see Simon have no score, & Sally have no Age 
full_join_df = (
    df1.join(
        df2, 
        df1["Name"] == df2["Name"],
        "full_outer" # specify the join type (full outer)
    ).select(
        df1["Name"].alias("Name1"),
        df2["Name"].alias("Name2"),
        df1["Age"], df2["Score"]
    ).withColumn(
        "Name",
        coalesce(col('Name1'), col('Name2'))
    ).select(
        "Name", "Age", "Score"
    )
)

# execute & show output 
full_join_df.show() 

+-----+----+-----+
| Name| Age|Score|
+-----+----+-----+
|  Dan|  31|   98|
|Sally|null|  100|
|Simon|  30| null|
|  Tim|  40|   92|
+-----+----+-----+



Left Semi Join

This is where we use Table B to filter Table A, retaining only the records from Table A that have a match in Table B (only Table A's columns are returned).

In [34]:
# Create DataFrames for Users and Purchases
data_users = [(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "David")]
data_purchases = [(1, "Book"), (2, "Pen"), (5, "Notebook")]

columns_users = ["id", "name"]
columns_purchases = ["user_id", "item"]

df_users = spark.createDataFrame(data_users, columns_users)
df_purchases = spark.createDataFrame(data_purchases, columns_purchases)

# Perform Left Semi Join
df_purchasers = df_users.join(df_purchases, df_users.id == df_purchases.user_id, "left_semi")

# Show the result - IDs 1 & 2 should return, as they are in the `df_purchases` dataframe, but 3 & 4 are not 
df_purchasers.show()

+---+-----+
| id| name|
+---+-----+
|  1|Alice|
|  2|  Bob|
+---+-----+



Left Anti Join 

This is where we use Table B to filter Table A, returning only the rows from Table A that have NO match in Table B.

In [35]:
# Create DataFrames for Users and Purchases
data_users = [(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "David")]
data_purchases = [(1, "Book"), (2, "Pen"), (5, "Notebook")]

columns_users = ["id", "name"]
columns_purchases = ["user_id", "item"]

df_users = spark.createDataFrame(data_users, columns_users)
df_purchases = spark.createDataFrame(data_purchases, columns_purchases)

# Perform Left Anti Join
df_non_purchasers = (df_users.join(
    df_purchases, 
    df_users.id == df_purchases.user_id, 
    "left_anti")
)

# Show the result - this time, IDs 3 & 4 should appear, as they are not in `df_purchases` 
df_non_purchasers.show()

+---+-------+
| id|   name|
+---+-------+
|  3|Charlie|
|  4|  David|
+---+-------+



### Best Practices on Joins

1. `Understand Your Data`<br>
**Pre-Inspect Data:** Familiarize yourself with the data, its size, distribution, and the columns you plan to join on. Understanding the nature of your data can help in choosing the most efficient join type and strategy.<br>
**Check for Duplicates:** Ensure that the keys you're joining on don't have unexpected duplicates, which can cause inflated results and performance issues.<br>

2. `Optimize Data Size`<br>
**Filter Early:** Apply filters to reduce the size of DataFrames before joining. Smaller DataFrames require less time and memory to join.<br>
**Select Necessary Columns:** Only select the columns needed for analysis before joining to reduce data shuffle.<br>

3. `Manage Skewness`<br>
**Detect Skew:** Identify if your data is skewed, meaning some keys have significantly more data than others. Skewness can lead to unequal distribution of data and can severely impact join performance.<br>
**Handle Skew:** If skew is detected, consider using techniques like salting or broadcasting smaller tables to minimize its impact.<br>

4. `Use Broadcast Joins for Small Tables`<br>
**Broadcast Small DataFrames:** If one of your DataFrames is small enough, use a broadcast join to send it to all nodes. This avoids shuffling large DataFrames across the network.

#### 18.1 - Broadcasting

In [None]:
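# Broadcasting example: ship a copy of the small DataFrame (here df_purchases from the cells above)
# to every executor, so the larger DataFrame does not need to be shuffled for the join
from pyspark.sql.functions import broadcast

df_joined = df_users.join(broadcast(df_purchases), df_users.id == df_purchases.user_id, "inner")
df_joined.show()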

5. `Choose the Right Join Type`<br>
**Appropriate Join Type:** Use the join type that suits your data and requirements. For instance, if you only need matching rows, consider using an inner join; if you need to retain all rows from one DataFrame, consider using an outer join.<br>

6. `Partitioning and Clustering`<br>
**Effective Partitioning:** Ensure your DataFrames are partitioned effectively before joining. Partitioning both sides on the join key can reduce shuffling.<br>
**Clustering:** If possible, cluster your data on the join key (in Spark, typically by bucketing tables on that key). Clustering can significantly speed up joins, because related data is physically stored together.<br>

7. `Monitor and Tune`<br>
**Examine Execution Plans:** Use `.explain(True)` to see the logical and physical plans Spark is using to execute your join (`.explain()` on its own shows only the physical plan). Look for opportunities to reduce shuffles and stages.<br>
**Tune Spark Configuration:** Adjust Spark configurations like `spark.sql.shuffle.partitions` to better suit your job's requirements and available cluster resources.<br>

8. `Avoid Cartesian Joins`<br>
**Be Cautious with Cross Joins:** Only use cross joins when absolutely necessary. Every row of one DataFrame is paired with every row of the other, so the output has (rows in A × rows in B) rows and can severely degrade performance.

In [9]:
spark.stop() # closes spark session 

-------

#### 19. Aggregations & Summaries

Here, we take a look at how we can perform Group Bys, creating aggregated metrics such as counts, sums, min, max & mean<br>

It also covers casting, rounding and aliases for the output data

In [1]:
# import 
from pyspark.sql import SparkSession
from pyspark.sql.functions import * 
from pyspark.sql.types import DecimalType

# create session 
spark = SparkSession.builder\
            .appName("my_local_spark_session")\
            .master("local")\
            .getOrCreate() 
sc = spark.sparkContext 

# read in some data to a dataframe 
# transactions data 
txns_Path = "./data/dummy_txns.csv"
txns_df = (spark.read
    .option("delimiter", ",") # sets the delimiter to `,`
    .option("header", "true") # informs pyspark that row 1 should be treated as the column headings of the data 
    .option("inferSchema", "true") # lets spark infer the schema of the data itself, rather than us explicitly creating a schema
    .csv(txns_Path) 
)
txns_df.show(3, truncate=False) 


+----------+-------------------+---------+------+-----+----------+
|customerID|transaction_TS     |Product  |volume|Price|txn_amount|
+----------+-------------------+---------+------+-----+----------+
|142387    |2023-10-15 15:42:50|Projector|6.0   |300.0|1800.0    |
|126774    |2023-10-15 23:40:19|Mouse    |1.0   |8.0  |8.0       |
|26995     |2023-10-15 20:43:32|Projector|3.0   |300.0|900.0     |
+----------+-------------------+---------+------+-----+----------+
only showing top 3 rows



Let's take a look at doing a Group By on `Product`, alongside the count, sum, min, max & avg transaction amount for each product!

In [7]:
product_stats_df = (
    txns_df # specify source data frame 
    .select("Product", "volume", "Price", "txn_amount") # select initial columns
    .groupBy("Product") # set a group by on `product` 
    .agg(
        count(col("txn_amount")).alias("no_of_transactions"), # number of transactions per product
        round(sum(col("txn_amount")), 2).cast(DecimalType(10, 2)).alias("sum_of_transactions"), # total sum, rounded to 2 d.p. & cast as DECIMAL(10, 2)
        round(min(col("txn_amount")), 2).cast(DecimalType(10, 2)).alias("min_transaction_amt"), # min transaction amount
        round(max(col("txn_amount")), 2).cast(DecimalType(10, 2)).alias("max_transaction_amt"), # max amount 
        round(avg(col("txn_amount")), 2).cast(DecimalType(10, 2)).alias("avg_transaction_amt")  # avg amount 
    )
    .select(
        "Product", "no_of_transactions", "sum_of_transactions", 
        "min_transaction_amt", "max_transaction_amt", "avg_transaction_amt"
    )
    .orderBy(col("Product").desc()) # order by `product` descending 
)

product_stats_df.show(truncate=False) 



+---------------------+------------------+-------------------+-------------------+-------------------+-------------------+
|Product              |no_of_transactions|sum_of_transactions|min_transaction_amt|max_transaction_amt|avg_transaction_amt|
+---------------------+------------------+-------------------+-------------------+-------------------+-------------------+
|WiFi Range Extender  |11097             |1169790.00         |30.00              |180.00             |105.41             |
|USB Flash Drive 16gb |11139             |156807.00          |3.99               |23.94              |14.08              |
|Tablet               |11075             |4409560.00         |115.00             |690.00             |398.15             |
|Projector            |11213             |11743500.00        |300.00             |1800.00            |1047.31            |
|Printer              |10924             |2679390.00         |70.00              |420.00             |245.28             |
|Office Chair St


#### 20. Window Functions

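Here, we take a look at window functions. A window function calculates a value for each row from a group of related rows (the "window"), without collapsing those rows the way a group by does. Common examples are `lead` & `lag`, `row_number` and `rank`.<br>
Many of these results can also be achieved through self-joins, but window functions require much less code.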


Using the transaction dataframe, let's look at a window function, to find the *most* recent transaction per customer, for the date in question

First, let's confirm that we do indeed have some customers with more than 1 transaction in a day ...

In [8]:
multi_txns = (
    txns_df
        .select("customerID", "transaction_TS")
        .groupBy("customerID")
        .agg(
            countDistinct("transaction_TS").alias("no_of_txns")
        )
        .select("customerID", "no_of_txns")
        .orderBy(col("no_of_txns").desc())
)

# show top 5 rows 
multi_txns.show(5, truncate=False) 


+----------+----------+
|customerID|no_of_txns|
+----------+----------+
|114542    |9         |
|137076    |9         |
|108567    |9         |
|94604     |8         |
|33420     |8         |
+----------+----------+
only showing top 5 rows




Great!<br>
We can see from above that some customers purchased more than once during the day. Let's use a window function to rank each customer's purchases by timestamp, and keep only their most recent transaction.

In [9]:
# import the window method 
from pyspark.sql.window import Window 

In [13]:
# create a "window" definition. This is the PARTITION BY & ORDER BY within SQL 
windowDef = (
    Window.partitionBy("customerID").orderBy(col("transaction_TS").desc())
)

# apply the window function within row_number() 
cust_txns_ranked = (
    txns_df
        .withColumn(     # new column
            "txn_rank",  # define its name 
            row_number().over(windowDef) # its value: row_number() over descending txn timestamp, so the most recent txn has value = 1
        )
)
# find latest txn per customer with filter to `txn_rank` = 1 
cust_last_txn = (
    cust_txns_ranked
        .filter(col("txn_rank") == 1)
        # `txn_rank` is kept so the output shows every row has rank 1;
        # chain .drop("txn_rank") here to remove the helper column
)

# now show the results 
cust_last_txn.show(5, truncate=False) 

+----------+-------------------+--------------------+------+-----+----------+--------+
|customerID|transaction_TS     |Product             |volume|Price|txn_amount|txn_rank|
+----------+-------------------+--------------------+------+-----+----------+--------+
|10000     |2023-10-15 16:40:44|Office Chair Premium|2.0   |250.0|500.0     |1       |
|10004     |2023-10-15 14:22:21|USB Flash Drive 16gb|3.0   |3.99 |11.97     |1       |
|10005     |2023-10-15 02:34:08|Keyboard            |5.0   |35.0 |175.0     |1       |
|10006     |2023-10-15 18:24:22|Extension Cable     |6.0   |4.99 |29.94     |1       |
|10007     |2023-10-15 09:29:21|USB Flash Drive 16gb|1.0   |3.99 |3.99      |1       |
+----------+-------------------+--------------------+------+-----+----------+--------+
only showing top 5 rows



To verify this, take customerID `10000` from above, and pull all of their records from the original dataframe

In [12]:
checkout = (
    txns_df
        .filter(col("customerID") == 10000)
)
checkout.show(truncate=False) 

+----------+-------------------+--------------------+------+-----+----------+
|customerID|transaction_TS     |Product             |volume|Price|txn_amount|
+----------+-------------------+--------------------+------+-----+----------+
|10000     |2023-10-15 01:13:36|Printer             |3.0   |70.0 |210.0     |
|10000     |2023-10-15 16:40:44|Office Chair Premium|2.0   |250.0|500.0     |
+----------+-------------------+--------------------+------+-----+----------+



So, we can see from above that the `row_number()` & filter to the latest transaction has worked as expected: 16:40:44 is the later of customer `10000`'s two transactions

In [14]:
spark.stop() # close spark session 

------------

#### 21. Using Spark SQL within PySpark framework

We can make use of Spark SQL from within the PySpark framework by using the `spark.sql()` method, which runs a SQL query and returns the result as a DataFrame

Let's take a look at a simple example:

In [15]:
# import 
from pyspark.sql import SparkSession

# create session 
spark = SparkSession.builder\
            .appName("my_local_spark_session")\
            .master("local")\
            .getOrCreate() 
sc = spark.sparkContext 

# read in some data to a dataframe 
# transactions data 
txns_Path = "./data/dummy_txns.csv"
txns_df = (spark.read
    .option("delimiter", ",") # sets the delimiter to `,`
    .option("header", "true") # informs pyspark that row 1 should be treated as the column headings of the data 
    .option("inferSchema", "true") # lets spark infer the schema of the data itself, rather than us explicitly creating a schema
    .csv(txns_Path) 
)
txns_df.show(3, truncate=False) 

+----------+-------------------+---------+------+-----+----------+
|customerID|transaction_TS     |Product  |volume|Price|txn_amount|
+----------+-------------------+---------+------+-----+----------+
|142387    |2023-10-15 15:42:50|Projector|6.0   |300.0|1800.0    |
|126774    |2023-10-15 23:40:19|Mouse    |1.0   |8.0  |8.0       |
|26995     |2023-10-15 20:43:32|Projector|3.0   |300.0|900.0     |
+----------+-------------------+---------+------+-----+----------+
only showing top 3 rows



Now, in order to use SQL on a dataframe in PySpark, you first need to register it as a temporary view. The SQL query can then reference the dataframe by the view's name. 

***(Note: newer versions of PySpark can also run SQL directly against a dataframe without a view, as shown further down)***

In [18]:
# create temp view 
txns_df.createOrReplaceTempView("txns_data") # the name given here is the table name your SQL query uses to reference the dataframe 

simple_query = """
SELECT
    PRODUCT, 
    SUM(VOLUME) AS NUM_SOLD
FROM txns_data
GROUP BY PRODUCT
ORDER BY NUM_SOLD DESC 
"""

test_sql_df = spark.sql(simple_query) # this executes the Spark SQL query above

test_sql_df.show(5, truncate=False) # show results 

+--------------------+--------+
|PRODUCT             |NUM_SOLD|
+--------------------+--------+
|Laptop Stand        |39797.0 |
|Extension Cable     |39433.0 |
|USB Flash Drive 16gb|39300.0 |
|Docking Station     |39236.0 |
|Laptop              |39162.0 |
+--------------------+--------+
only showing top 5 rows



Now, there is an alternative. In newer versions of PySpark (this notebook uses `Spark 3.4`), you can query the dataframe directly, without needing to create a temporary view first. <br>

We pass the dataframe to `spark.sql()` as a keyword argument, and reference it in the query with a matching `{placeholder}`!

Let's take a look ...

In [24]:
another_sql_df = (
    spark.sql(
        "SELECT PRODUCT, SUM(VOLUME) AS NUM_SOLD FROM {source_df} GROUP BY PRODUCT ORDER BY NUM_SOLD DESC", # the query
        source_df=txns_df    # specify the source dataframe (without creating a view in the session) 
    )
)
another_sql_df.show(5, truncate=False) # show some results 

+--------------------+--------+
|PRODUCT             |NUM_SOLD|
+--------------------+--------+
|Laptop Stand        |39797.0 |
|Extension Cable     |39433.0 |
|USB Flash Drive 16gb|39300.0 |
|Docking Station     |39236.0 |
|Laptop              |39162.0 |
+--------------------+--------+
only showing top 5 rows



In [25]:
spark.stop() # close the spark session 

----------------------

#### 22. Writing Data Out

You can write data out to file storage in a variety of formats. For example, let's read a CSV, and write it back out as Parquet

In [26]:
# import 
from pyspark.sql import SparkSession

# create session 
spark = SparkSession.builder\
            .appName("my_local_spark_session")\
            .master("local")\
            .getOrCreate() 
sc = spark.sparkContext 

# read in some data to a dataframe 
# transactions data 
txns_Path = "./data/dummy_txns.csv"
txns_df = (spark.read
    .option("delimiter", ",") # sets the delimiter to `,`
    .option("header", "true") # informs pyspark that row 1 should be treated as the column headings of the data 
    .option("inferSchema", "true") # lets spark infer the schema of the data itself, rather than us explicitly creating a schema
    .csv(txns_Path) 
)
txns_df.show(3, truncate=False) 

+----------+-------------------+---------+------+-----+----------+
|customerID|transaction_TS     |Product  |volume|Price|txn_amount|
+----------+-------------------+---------+------+-----+----------+
|142387    |2023-10-15 15:42:50|Projector|6.0   |300.0|1800.0    |
|126774    |2023-10-15 23:40:19|Mouse    |1.0   |8.0  |8.0       |
|26995     |2023-10-15 20:43:32|Projector|3.0   |300.0|900.0     |
+----------+-------------------+---------+------+-----+----------+
only showing top 3 rows




In [27]:
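# write `txns_df` out as Parquet, into the same `./data` folder as the CSV file
# (mode "overwrite" replaces the output if it already exists; the default mode raises an error instead)
output_path = "./data/parquet_dummy_txns"
txns_df.write.mode("overwrite").parquet(output_path) 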


In [29]:
# read back in as test
df_parquet = spark.read.parquet(output_path)

df_parquet.show(5) 

+----------+-------------------+---------+------+-----+----------+
|customerID|     transaction_TS|  Product|volume|Price|txn_amount|
+----------+-------------------+---------+------+-----+----------+
|    142387|2023-10-15 15:42:50|Projector|   6.0|300.0|    1800.0|
|    126774|2023-10-15 23:40:19|    Mouse|   1.0|  8.0|       8.0|
|     26995|2023-10-15 20:43:32|Projector|   3.0|300.0|     900.0|
|    118641|2023-10-15 02:45:11|Projector|   3.0|300.0|     900.0|
|    112835|2023-10-15 17:39:18|Projector|   1.0|300.0|     300.0|
+----------+-------------------+---------+------+-----+----------+
only showing top 5 rows



In [30]:
spark.stop() # close spark session

--------

#### 23. Delta Tables & Partitioning

A key part of working with PySpark is how you save the outputs of your transformations. `delta` (Delta Lake) is a powerful open-source table format, and a great option for your datalakes & lakehouses. Some key reasons:

* Delta is an efficient, compressed format. The data itself is stored as Parquet files, alongside a transaction log of metadata about those files, so that you can treat a set of files as a "table". This links back to Hive & HDFS from the Hadoop ecosystem many years back (but still in use today).

* Delta supports ACID transactions, even though it is not a database in the traditional sense. This means we can perform operations like UPSERT (MERGE), UPDATE and DELETE on the data. Plain file-based tables have traditionally only supported overwriting whole tables or adding new partitions.

* The transaction log also gives delta data versioning. It records every change, so you can read historic versions of a delta table. It also lets your datalake support schema changes to tables, without needing to overwrite all of the existing files that hold the data.

Let's take a look at some examples of using Delta with PySpark (Spark 3.4):

In [1]:
# If you don't already have Delta Lake installed, you'll need the `delta-spark` package in the environment where PySpark is installed 
# Delta Lake 2.4.x is the version compatible with Apache Spark 3.4.x

#!pip install delta-spark==2.4.0

In [None]:
import pyspark
from delta import *
from pyspark.sql.types import *
from delta.tables import *
from pyspark.sql.functions import *

#  Create a spark session with Delta
builder = pyspark.sql.SparkSession.builder.appName("DeltaTutorial") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

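# Create the Spark session; configure_spark_with_delta_pip adds the Delta Lake jars matching the installed delta-spark version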
spark = configure_spark_with_delta_pip(builder).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Ok, we now have a local Spark Session which has the relevant Delta configuration as well. Let's take a look at some examples of how to use Delta with Spark

In [None]:
#spark.sparkContext.getConf().getAll()  # uncomment to list the session's configuration, e.g. to check the Delta settings

In [3]:
# create some example data 
data = [("Robert", "Baratheon", "Baratheon", "Storms End", 48),
        ("Eddard", "Stark", "Stark", "Winterfell", 46),
        ("Jamie", "Lannister", "Lannister", "Casterly Rock", 29)
        ]
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("house", StringType(), True),
    StructField("location", StringType(), True),
    StructField("age", IntegerType(), True)
])

sample_dataframe = spark.createDataFrame(data=data, schema=schema)

sample_dataframe.show() 


+---------+---------+---------+-------------+---+
|firstname| lastname|    house|     location|age|
+---------+---------+---------+-------------+---+
|   Robert|Baratheon|Baratheon|   Storms End| 48|
|   Eddard|    Stark|    Stark|   Winterfell| 46|
|    Jamie|Lannister|Lannister|Casterly Rock| 29|
+---------+---------+---------+-------------+---+



Let's write out our first delta table! This will be to a local location in the `./data` folder:

In [6]:
# write out data to delta format
sample_dataframe.write\
    .mode("overwrite")\
    .format("delta")\
    .save("./data/got_delta")


Let's look at reading that delta table back in 

In [7]:
got_df = spark.read.format("delta").load("./data/got_delta")
got_df.show()


+---------+---------+---------+-------------+---+
|firstname| lastname|    house|     location|age|
+---------+---------+---------+-------------+---+
|    Jamie|Lannister|Lannister|Casterly Rock| 29|
|   Robert|Baratheon|Baratheon|   Storms End| 48|
|   Eddard|    Stark|    Stark|   Winterfell| 46|
+---------+---------+---------+-------------+---+



Overwrite the entire table ...

In [8]:
# notice we are changing the ages 
data = [("Robert", "Baratheon", "Baratheon", "Storms End", 49),
        ("Eddard", "Stark", "Stark", "Winterfell", 47),
        ("Jamie", "Lannister", "Lannister", "Casterly Rock", 30)
        ]
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("house", StringType(), True),
    StructField("location", StringType(), True),
    StructField("age", IntegerType(), True)
])
sample_dataframe = spark.createDataFrame(data=data, schema=schema)
(sample_dataframe.write
    .mode(saveMode="overwrite") # specify to overwrite 
    .format("delta")            # delta format
    .save("./data/got_delta")   # same location for the files as before, so they are overwritten 
)


Read that back in again, and notice the new ages!

In [9]:
got_df = spark.read.format("delta").load("./data/got_delta")
got_df.show()

+---------+---------+---------+-------------+---+
|firstname| lastname|    house|     location|age|
+---------+---------+---------+-------------+---+
|    Jamie|Lannister|Lannister|Casterly Rock| 30|
|   Robert|Baratheon|Baratheon|   Storms End| 49|
|   Eddard|    Stark|    Stark|   Winterfell| 47|
+---------+---------+---------+-------------+---+



Now, let's look at upserting a Delta Table

In [10]:
# Import 
from delta.tables import DeltaTable

# Upsert Data
# delta table path
deltaTable = DeltaTable.forPath(spark, "./data/got_delta")
deltaTable.toDF().show()

# define new data
data = [("Gendry", "Baratheon", "Baratheon", "Kings Landing", 19),
        ("Jon", "Snow", "Stark", "Winterfell", 21),
        ("Jamie", "Lannister", "Lannister", "Casterly Rock", 36)
        ]
schema = StructType([
    StructField("firstname", StringType(), True),
    StructField("lastname", StringType(), True),
    StructField("house", StringType(), True),
    StructField("location", StringType(), True),
    StructField("age", IntegerType(), True)
])

newData = spark.createDataFrame(data=data, schema=schema)

(
    deltaTable
        .alias("oldData") # alias the existing delta table as `oldData`
        .merge(
            newData.alias("newData"), "oldData.firstname = newData.firstname" # the dataframe to merge in, and the match condition
        )
        .whenMatchedUpdate( # when oldData.firstname = newData.firstname matches, UPDATE the existing row
            set={
                "firstname": col("newData.firstname"), 
                "lastname": col("newData.lastname"), 
                "house": col("newData.house"),
                "location": col("newData.location"), 
                "age": col("newData.age")
            }
        )
        .whenNotMatchedInsert( # when there is NO match on firstname, INSERT the new row
            values={
                "firstname": col("newData.firstname"), 
                "lastname": col("newData.lastname"), 
                "house": col("newData.house"),
                "location": col("newData.location"), 
                "age": col("newData.age")
            }
        )
        .execute()
)

# look at the new dataframe ...
deltaTable.toDF().show()

+---------+---------+---------+-------------+---+
|firstname| lastname|    house|     location|age|
+---------+---------+---------+-------------+---+
|    Jamie|Lannister|Lannister|Casterly Rock| 30|
|   Robert|Baratheon|Baratheon|   Storms End| 49|
|   Eddard|    Stark|    Stark|   Winterfell| 47|
+---------+---------+---------+-------------+---+




+---------+---------+---------+-------------+---+
|firstname| lastname|    house|     location|age|
+---------+---------+---------+-------------+---+
|   Gendry|Baratheon|Baratheon|Kings Landing| 19|
|    Jamie|Lannister|Lannister|Casterly Rock| 36|
|      Jon|     Snow|    Stark|   Winterfell| 21|
|   Robert|Baratheon|Baratheon|   Storms End| 49|
|   Eddard|    Stark|    Stark|   Winterfell| 47|
+---------+---------+---------+-------------+---+




As you can see from above, Jamie Lannister was updated because he already existed, and Jon & Gendry were inserted, as they did not exist before.

So, if we read the delta table into a new spark dataframe below, notice that the updates above are there too!

In [11]:
updated_got_df = spark.read.format("delta").load("./data/got_delta")
updated_got_df.show()

+---------+---------+---------+-------------+---+
|firstname| lastname|    house|     location|age|
+---------+---------+---------+-------------+---+
|   Gendry|Baratheon|Baratheon|Kings Landing| 19|
|    Jamie|Lannister|Lannister|Casterly Rock| 36|
|      Jon|     Snow|    Stark|   Winterfell| 21|
|   Robert|Baratheon|Baratheon|   Storms End| 49|
|   Eddard|    Stark|    Stark|   Winterfell| 47|
+---------+---------+---------+-------------+---+



So, let's say we want to delete a record?

In [12]:
# let's remove the Gendry record 
# delta table path
deltaTable = DeltaTable.forPath(spark, "./data/got_delta")
#deltaTable.toDF().show()

deltaTable.delete(condition=expr("firstname == 'Gendry'"))

deltaTable.toDF().show()


+---------+---------+---------+-------------+---+
|firstname| lastname|    house|     location|age|
+---------+---------+---------+-------------+---+
|   Robert|Baratheon|Baratheon|   Storms End| 49|
|   Eddard|    Stark|    Stark|   Winterfell| 47|
|    Jamie|Lannister|Lannister|Casterly Rock| 36|
|      Jon|     Snow|    Stark|   Winterfell| 21|
+---------+---------+---------+-------------+---+



So, if we repeat the steps above, reading the delta table into a new spark dataframe, we'll see Gendry has been deleted!

In [13]:
updated_got_df_again = spark.read.format("delta").load("./data/got_delta")
updated_got_df_again.show()

+---------+---------+---------+-------------+---+
|firstname| lastname|    house|     location|age|
+---------+---------+---------+-------------+---+
|   Robert|Baratheon|Baratheon|   Storms End| 49|
|   Eddard|    Stark|    Stark|   Winterfell| 47|
|    Jamie|Lannister|Lannister|Casterly Rock| 36|
|      Jon|     Snow|    Stark|   Winterfell| 21|
+---------+---------+---------+-------------+---+



So, above, we've looked at how we can perform transactions on delta tables with PySpark

Finally, let's take a quick look at the version history of delta, and how we can read that into dataframes ... 

In [14]:
df_versionzero = spark.read.format("delta").option("versionAsOf", 0).load("./data/got_delta")
df_versionzero.show() # uses version zero of the delta table (aka the first version of the table)

print("=" * 100) 

df_versionone = spark.read.format("delta").option("versionAsOf", 1).load("./data/got_delta")
df_versionone.show() # uses version one of the delta table (aka the second version of the table) 

# Version 0 is the original write; version 1 is the overwrite where we changed the ages of Jamie, Robert & Eddard


+---------+---------+---------+-------------+---+
|firstname| lastname|    house|     location|age|
+---------+---------+---------+-------------+---+
|    Jamie|Lannister|Lannister|Casterly Rock| 29|
|   Robert|Baratheon|Baratheon|   Storms End| 48|
|   Eddard|    Stark|    Stark|   Winterfell| 46|
+---------+---------+---------+-------------+---+




+---------+---------+---------+-------------+---+
|firstname| lastname|    house|     location|age|
+---------+---------+---------+-------------+---+
|    Jamie|Lannister|Lannister|Casterly Rock| 30|
|   Robert|Baratheon|Baratheon|   Storms End| 49|
|   Eddard|    Stark|    Stark|   Winterfell| 47|
+---------+---------+---------+-------------+---+



And that's the end of the Delta table examples

In [15]:
spark.stop() # close spark session 