# Let's get started

What could be better than starting with a blank slate? There we go...

**Note:** This notebook is aimed at Data Scientists and/or Data Engineers and treats the technical concepts superficially. It focuses on **first steps in data processing** and does not cover graph processing, model training, streaming or deployment.

In [None]:
!java -version

In [None]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, SQLContext
from pyspark import StorageLevel
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
import random 
import os

## Initialize Spark on localhost

Home, sweet home. 🏠 

Before we move on to a real cluster, for example in the cloud, let's simulate one for now. This gives us the chance to familiarize ourselves with the key concepts before moving ahead.

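With `master("local")`, no separate cluster is started: Spark runs as a single process on this machine. The driver and the executor share one JVM, and tasks run in threads instead of on separate worker nodes. `local` uses a single worker thread, while `local[N]` uses N threads and `local[*]` uses one thread per available core.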

The SparkContext is the entry point for low-level APIs of Spark, and it represents the connection to a Spark cluster. A SparkSession, on the other hand, is a higher-level API introduced in Spark 2.0 that provides a single entry point to create and manage Spark functionality, including SparkContext.

In [None]:
spark_session: SparkSession = SparkSession.builder.master("local").appName("Local").getOrCreate()

In [None]:
sc = spark_session.sparkContext

In [None]:
sc

## Start from scratch: RDDs

RDDs (Resilient Distributed Datasets) are the fundamental data structures in Spark. They enable distributed processing of data by allowing it to be stored in memory across multiple machines and processed in parallel. They are immutable and fault-tolerant, making them ideal for distributed computing.

What happens when an RDD is created?

* The driver program (i.e., the program that creates the RDD) creates a logical representation of the RDD and sends it to the Spark cluster.
* The Spark cluster divides the RDD into smaller **partitions**, which are distributed across the nodes in the cluster.

How an RDD is split into partitions is decided when the RDD is created. For `sc.parallelize`, the number of partitions can be passed explicitly as the second argument (`numSlices`); otherwise Spark falls back to `spark.default.parallelism`, which in local mode equals the number of worker threads. With `master("local")` there is a single thread, so a small sample typically ends up in one partition. Calling `getNumPartitions()` on an RDD shows the actual value.

When reading files from a distributed file system such as HDFS, Spark by default creates roughly one partition per block of data, where a block is typically 128 MB. This can be configured.

<img src="img/rdd_distribution.png" width="50%">

Having multiple partitions allows Spark to parallelize the processing of data, which can improve performance by allowing multiple nodes in the cluster to work on different parts of the data simultaneously. This enables Spark to scale to handle large datasets by breaking them down into smaller chunks that can be processed in parallel across multiple nodes in a cluster.
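
The idea of partitioning can be imitated without Spark. The following is only a plain-Python analogy under simplifying assumptions: the helper names `split_into_partitions` and `sum_of_squares` are made up for illustration, threads stand in for worker nodes, and the chunk boundaries Spark actually picks may differ.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Split data into num_partitions contiguous chunks of near-equal size."""
    size, remainder = divmod(len(data), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        # The first `remainder` chunks get one extra element.
        end = start + size + (1 if i < remainder else 0)
        partitions.append(data[start:end])
        start = end
    return partitions


def sum_of_squares(partition):
    """Work that each 'worker' performs on its own partition."""
    return sum(x * x for x in partition)


data = list(range(10))
partitions = split_into_partitions(data, 3)

# Each partition is processed independently; the partial results are combined at the end.
with ThreadPoolExecutor(max_workers=3) as pool:
    partial_results = list(pool.map(sum_of_squares, partitions))

total = sum(partial_results)
print(partitions, partial_results, total)
```

Each chunk can be processed without knowing about the others, which is what makes the work parallelizable; only the final combination step needs all partial results.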

In [None]:
randoms = [random.randint(0,10) for _ in range(1000)]

In [None]:
randoms_rdd = sc.parallelize(randoms)

In [None]:
randoms_rdd

## Transformations, actions and lazy execution

After covering the first concept of *parallelization*, we have three more buzzwords to deal with.

* Transformations
* Actions
* Lazy execution

*Transformations* describe operations on the data like function mapping, filtering etc.

*Actions* are operations that use the result of the data processing and output it, e.g. showing or printing a result, counting the number of observations or writing the result to disk. They process the data in the sense that the chain of **transformations needs to be executed** at this point.

Spark transformations are *lazy*: they are executed only when necessary, which is generally when an action is called. Until then, all transformations are recorded in an *execution plan*, which can be viewed as a DAG (*Directed Acyclic Graph*). This gives rise to **optimization opportunities** and **fault tolerance**.

One example of an action is *take*, which forces Spark to return the first n entries. In that case, the driver needs to collect the transformed data from the partitions on the worker nodes.
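
Laziness can be felt in plain Python as well. The sketch below is only an analogy, not Spark code: Python's built-in `map` plays the role of a transformation, and `itertools.islice` wrapped in `list` plays the role of `take`. The `calls` list is a made-up helper that records when the function really runs.

```python
from itertools import islice

calls = []  # records every value that was really processed


def square(x):
    calls.append(x)
    return x ** 2


numbers = [3, 1, 4, 1, 5, 9, 2, 6]

# "Transformation": map() only builds a lazy iterator, nothing is computed yet.
lazy_squares = map(square, numbers)
calls_before_action = len(calls)

# "Action": asking for the first three results triggers exactly three evaluations.
first_three = list(islice(lazy_squares, 3))

print(calls_before_action, first_three, calls)
```

Only the elements that are needed for the requested result are processed; the rest of the input is never touched.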

In [None]:
randoms_rdd.take(10)

In [None]:
import time

def wait_then_square(x):
    time.sleep(2)
    return x ** 2

In [None]:
randoms_squared = randoms_rdd.map(wait_then_square)

Wait... Shouldn't it take a nap? Why did the execution only take a millisecond?

Because of the laziness... No one asked for a result yet.

So, let's do it!

In [None]:
randoms_squared.take(5)

## DataFrames...

A Spark DataFrame is a distributed collection of data organized into named columns. It is similar to a table in a relational database. Under the hood, Spark DataFrames are built on top of the Spark SQL engine, which is responsible for executing SQL queries, performing optimizations, and managing data serialization and deserialization. It provides a high-level API for working with structured and semi-structured data, making it easy to manipulate large datasets. Spark DataFrames are implemented using RDDs.

### ...from scratch

In [None]:
df_from_scratch = spark_session.createDataFrame([
    {"random": random.randint(0, 10)}  for _ in range(100)
])

In [None]:
df_from_scratch.show(5)

In [None]:
df_from_scratch.printSchema()

### ... via RDD

In [None]:
randoms = [random.randint(0,10) for _ in range(1000)]
randoms_rdd = sc.parallelize(randoms)

In [None]:
randoms_rdd.take(5)

Note that `toDF()` expects an RDD of rows: each element must itself be a list, tuple or `Row`, i.e. the input has shape (n, 1) and is not a flat sequence of n scalars.

In [None]:
randoms_rdd = randoms_rdd.map(lambda x:[x])

In [None]:
randoms_rdd.take(5)

In [None]:
df_from_rdd = randoms_rdd.toDF()

In [None]:
df_from_rdd.printSchema()

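Without a schema, Spark has to infer the column type and falls back to a generic column name such as `_1`. In that case, it is better to define a schema.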

In [None]:
schema = StructType([StructField("random", IntegerType(), False)])

df_from_rdd = randoms_rdd.toDF(schema)

In [None]:
df_from_rdd.show(5)

In [None]:
df_from_rdd.printSchema()

### ... or from a data source

In the real world, you will more likely read the data from a data source.

When reading a CSV file in PySpark, the file is read in chunks and does not need to be loaded fully into memory by the driver. This allows PySpark to process large CSV files efficiently: the partitions can be read by different executors in parallel.

In contrast, a Parquet file stores the data in a columnar format together with its schema, so only the required columns need to be read and the column types are preserved. Parquet files are also compressed, which reduces the data size and makes transfer across the network more efficient.
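
The difference between a row-oriented text format and a typed, column-oriented layout can be sketched in plain Python. This is only an illustration with made-up sample rows (the team names and scores are hypothetical) and it says nothing about how Parquet is implemented internally.

```python
import csv
import io

# Made-up sample rows; the column names mirror a football results file.
raw_csv = "HomeTeam,AwayTeam,FTHG,FTAG\nTeam A,Team B,2,1\nTeam C,Team D,0,0\n"

# Row-oriented text format: every value arrives as a string.
rows = list(csv.DictReader(io.StringIO(raw_csv)))

# Column-oriented layout: one list per column, with the type fixed once.
columns = {
    "HomeTeam": [row["HomeTeam"] for row in rows],
    "FTHG": [int(row["FTHG"]) for row in rows],
    "FTAG": [int(row["FTAG"]) for row in rows],
}

# An aggregate over one column only needs that column's values.
total_home_goals = sum(columns["FTHG"])
print(type(rows[0]["FTHG"]).__name__, total_home_goals)
```

This is also why a CSV read without a schema yields string columns that need casting, while a typed columnar file does not.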

Also, PySpark provides a JDBC data source, which allows one to read data from any database that supports JDBC.

In [None]:
UPDATE: bool = False

if UPDATE:
    current_season = 2223
    matches = f'https://www.football-data.co.uk/mmz4281/{current_season}/D1.csv'
    os.system(f"curl -o data/bundesliga_.csv {matches}")

In [None]:
if UPDATE:
    os.system("cut -d ',' -f 1,2,3,4,5,6,7 data/bundesliga_.csv > data/bundesliga.csv")

In [None]:
df_from_csv = spark_session.read.csv("data/bundesliga.csv", header=True)

In [None]:
df_from_csv.show(9)

In [None]:
df_from_csv.printSchema()

In [None]:
# Build sample data as parquet

# df = df_from_csv.withColumn("FTHG", F.col("FTHG").cast(IntegerType()))
# df = df.withColumn("FTAG", F.col("FTAG").cast(IntegerType()))
# df = df.withColumnRenamed("Div", "Matchday")
# df.coalesce(1).write.parquet("data/bundesliga")

In [None]:
df_from_parquet = spark_session.read.parquet("data/bundesliga")

In [None]:
df_from_parquet.show(5)

In [None]:
df_from_parquet.printSchema()

## Hands On: An example pipeline on a DataFrame

We take a look at the German Bundesliga. ⚽

In [None]:
df = spark_session.read.parquet("data/bundesliga")

In [None]:
df.show(2)

In [None]:
df = df.withColumn("GoalDelta", F.col("FTHG") - F.col("FTAG"))

In [None]:
df = df.withColumn("PointsHome", F.lit(1))
df = df.withColumn("PointsHome", F.when(
    F.col("GoalDelta") > 0, F.lit(3)).otherwise(F.col("PointsHome")))
df = df.withColumn("PointsHome", F.when(
    F.col("GoalDelta") < 0, F.lit(0)).otherwise(F.col("PointsHome")))

In [None]:
df = df.withColumn("PointsAway", F.lit(1))
df = df.withColumn("PointsAway", F.when(
    F.col("GoalDelta") < 0, F.lit(3)).otherwise(F.col("PointsAway")))
df = df.withColumn("PointsAway", F.when(
    F.col("GoalDelta") > 0, F.lit(0)).otherwise(F.col("PointsAway")))

In [None]:
df_home = df.select("HomeTeam", "FTHG", "FTAG", "PointsHome").\
withColumnRenamed("HomeTeam", "Team").\
withColumnRenamed("FTHG", "Goals").\
withColumnRenamed("FTAG", "GoalsAgainst").\
withColumnRenamed("PointsHome", "Points")

# An alternative that avoids all these .withColumnRenamed calls:
# new_columns = ["Team","Goals","GoalsAgainst", "Points"]
# df_home = df.select("HomeTeam", "FTHG", "FTAG", "PointsHome").toDF(*new_columns)

df_away = df.select("AwayTeam", "FTHG", "FTAG", "PointsAway").\
withColumnRenamed("AwayTeam", "Team").\
withColumnRenamed("FTHG", "GoalsAgainst").\
withColumnRenamed("FTAG", "Goals").\
withColumnRenamed("PointsAway", "Points")

In [None]:
# The select ensures that both DataFrames have the same column order, because union() matches columns by position, not by name.
cols = ["Team", "Goals" , "GoalsAgainst", "Points"]
df_home = df_home.select(*cols)
df_away = df_away.select(*cols)

df = df_home.union(df_away)

In [None]:
df = df.groupBy("Team").sum()
df = df.withColumnRenamed("sum(Goals)", "Goals").\
withColumnRenamed("sum(GoalsAgainst)", "GoalsAgainst").\
withColumnRenamed("sum(Points)", "Points")

df = df.withColumn("GoalDifference", F.col("Goals") - F.col("GoalsAgainst"))

Let's sort by points and goal difference to get the table...

Note: Sorting large datasets on a Spark cluster is resource-intensive because it requires shuffling data across the cluster, so avoid it where possible.

In [None]:
df = df.sort("Points", "GoalDifference", ascending=False)

In [None]:
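# Caveat: these IDs are unique and increasing, but only consecutive within a single partition (a row_number() window is the robust choice).
df = df.withColumn("Place", F.monotonically_increasing_id() + 1)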

Before the plan on a DataFrame is executed, it is analyzed internally by the *Catalyst Optimizer* and then run in an optimized form. This leads to further **performance advantages.**

In [None]:
# df.explain()

In [None]:
df.show()

Here we are.

You might argue that this would be much easier in SQL. In this case, you are probably right.  
Note that this serves as a demo. Also, we have the chance to test each step individually here.  
This is a huge advantage when developing a data pipeline.
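
One way to test such steps is to compare them with a tiny reference implementation. The following plain-Python sketch rebuilds the same table logic (three points for a win, one for a draw, ranking by points and goal difference) on three made-up matches; the team names, the results and the helper `points` are assumptions for illustration only.

```python
from collections import defaultdict


def points(goals_for, goals_against):
    """Three points for a win, one for a draw, none for a defeat."""
    if goals_for > goals_against:
        return 3
    if goals_for < goals_against:
        return 0
    return 1


# (HomeTeam, AwayTeam, FTHG, FTAG), hypothetical results
matches = [("A", "B", 2, 1), ("B", "C", 0, 0), ("C", "A", 1, 3)]

table = defaultdict(lambda: {"Goals": 0, "GoalsAgainst": 0, "Points": 0})
for home, away, fthg, ftag in matches:
    # Every match contributes one row for the home team and one for the away team.
    for team, goals_for, goals_against in ((home, fthg, ftag), (away, ftag, fthg)):
        table[team]["Goals"] += goals_for
        table[team]["GoalsAgainst"] += goals_against
        table[team]["Points"] += points(goals_for, goals_against)

# Sort by points first, then by goal difference, both descending.
ranking = sorted(
    table.items(),
    key=lambda item: (item[1]["Points"], item[1]["Goals"] - item[1]["GoalsAgainst"]),
    reverse=True,
)
places = [team for team, _ in ranking]
print(places)
```

Running the DataFrame pipeline on the same handful of rows and comparing the outcome is a cheap sanity check before scaling up.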

### SQL

It is possible to use SQL, even in an intermediate step. 

```python
df.createOrReplaceTempView("my_table_name")  # registerTempTable is deprecated

query = """
SELECT *
FROM my_table_name
WHERE ...
"""

df_queried = spark_session.sql(query)
```

### User defined functions

It is also possible to define a user-defined function (UDF) and apply it to the DataFrame. Note that in ETL processes one should try to avoid UDFs and solve the problem with native Spark functions instead. The reason is that pure Python acts as a **black box** for Spark SQL and will **not be optimized**. In addition, every row has to be serialized and passed between the JVM and a Python worker process. The syntax is as follows:

```python
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType

def my_user_defined_square_function(x):
    # square via pure Python; return a float (or None) so the result matches FloatType
    return None if x is None else float(x) ** 2

my_udf = F.udf(my_user_defined_square_function, FloatType())

df = df.withColumn("ResultColumn", my_udf(F.col("Column")))
```

## Pitfalls with lazy evaluation

### Don't

In an ideal world, the save call at the end of a script is the only action. In real-world applications, however, multiple actions may occur. This might be for debugging purposes. Or take another example: your application should react to the number of observations in a DataFrame. Then you need to apply the action *count*.

The problem is that the pipeline is now triggered twice. The first action executes everything up to that point, but the intermediate result is not cached automatically. If you continue to work on the DataFrame, the DAG is extended and **fully executed** again with the next action.

In [None]:
df = spark_session.read.parquet("data/bundesliga")

# First transformation
df = df.withColumn("GoalDelta", F.col("FTHG") - F.col("FTAG"))

n = df.count()

if n < 9*5:
    print("It is too early to judge the teams' performances.")
else:
    print("Don't hide the truth.")

# Further transformations
df = df.withColumn("Result", F.lit("D"))
df = df.withColumn("Result", F.when(
    F.col("GoalDelta") > 0, F.lit("H")).otherwise(F.col("Result")))
df = df.withColumn("Result", F.when(
    F.col("GoalDelta") < 0, F.lit("A")).otherwise(F.col("Result")))

df.show(5)

### Better: Persist! 

There is a solution: you can persist the DataFrame at a certain point, i.e. keep the intermediate result in memory and/or on disk, so that subsequent actions reuse it instead of recomputing it.
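
The cost of repeated actions, and the effect of persisting, can be imitated in plain Python. This is only an analogy under simplifying assumptions: `pipeline` is a made-up helper that rebuilds a lazy `map` from the source each time, and a plain list stands in for the persisted result.

```python
stats = {"evaluations": 0}


def expensive_transformation(x):
    stats["evaluations"] += 1  # count how often the work is really done
    return x * 2


source = [1, 2, 3, 4]


def pipeline():
    # Rebuilds the lazy pipeline from the source, like a plan without a cached result.
    return map(expensive_transformation, source)


# Without persisting: each "action" runs the whole pipeline again.
count = sum(1 for _ in pipeline())
first_two = list(pipeline())[:2]
evaluations_without_cache = stats["evaluations"]

# With a persisted intermediate result: the pipeline runs once, both "actions" reuse it.
stats["evaluations"] = 0
cached = list(pipeline())
count_cached = len(cached)
first_two_cached = cached[:2]
evaluations_with_cache = stats["evaluations"]

print(evaluations_without_cache, evaluations_with_cache)
```

The results are identical in both variants; only the amount of repeated work differs.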

In [None]:
df = spark_session.read.parquet("data/bundesliga")

# First transformation
df = df.withColumn("GoalDelta", F.col("FTHG") - F.col("FTAG"))

# Force a persist
# Note that this does not happen immediately, since persist() is not an action itself!
df = df.persist(StorageLevel.MEMORY_AND_DISK)

n = df.count()

if n < 9*5:
    print("It is too early to judge the teams' performances.")
else:
    print("Don't hide the truth.")

# Further transformations
df = df.withColumn("Result", F.lit("D"))
df = df.withColumn("Result", F.when(
    F.col("GoalDelta") > 0, F.lit("H")).otherwise(F.col("Result")))
df = df.withColumn("Result", F.when(
    F.col("GoalDelta") < 0, F.lit("A")).otherwise(F.col("Result")))

df.show(5)

## Some other pitfalls

### Case sensitivity

Note that columns whose names differ only in case can coexist in a DataFrame.

In [None]:
df = spark_session.createDataFrame([{"COL": 1, "col": 2}])

In [None]:
df.show()

### Duplicated Column Names

Columns with exactly the same name, including case, can also coexist in a DataFrame as long as they are not referenced in a transformation. This can become an issue when joining two DataFrames that share column names.

In [None]:
df = df.withColumnRenamed("col", "COL")

In [None]:
df.show()

In [None]:
# df.select("COL").show()  # will raise an error because the reference is ambiguous

## Wrap-Up

* We simulated the Spark environment locally.
* RDDs are the basic data structure of Spark. They are immutable and distributed across the cluster.
* DataFrames are a higher-level abstraction built on top of RDDs. They are also immutable and distributed across the cluster, and they are optimized for SQL-like operations.
* A schema consists of a list of columns with their names, types and nullability.
* We distinguish between transformations and actions. Transformations are lazy and only executed when an action is called.
* The so-called Catalyst Optimizer optimizes the execution plan of a DataFrame.
* If several actions are called, the intermediate result should be persisted in between.
* The syntax can be hard to get used to at first because of its Scala and Java heritage.
* The tooling is very valuable, especially when building large software and ETL processes.
* Individual steps can be tested and reused, in contrast to a pure SQL-based script.

## Congrats!

You have made your first steps with PySpark.  
🚀🚀🚀

Finally, stop the local Spark session.

In [None]:
spark_session.stop()