# Part III: From Scala to Python

Author: **Julien Peloton** [@JulienPeloton](https://github.com/astrolabsoftware/spark-tutorials/issues/new?body=@JulienPeloton)  
Last Verified to Run: **2018-10-30**  

The core of Apache Spark is written in [Scala](https://scala-lang.org/), a general-purpose programming language created by Martin Odersky (EPFL) and first released in 2004. The language is inter-operable with Java and Java-like languages, and Scala executables run on the Java Virtual Machine (JVM). Like Java it has a strong static type system, but thanks to type inference it does not require most type declarations. Finally, Scala being younger than Java (1995), its design tries to solve some of the problems of Java.

Scala is a portmanteau of *scalable* and *language*, to reflect the fact that the language is designed to grow and easily integrate new features over time. There are pros and cons to this... For example it is easy to extend the language or add specific features from other languages, but then there is no guarantee of backward compatibility between Scala versions. In addition, there is often more than one way to write the same thing (many syntax rules are optional, such as type annotations).

Scala is not a pure functional programming language. It is multi-paradigm, including functional programming, imperative programming, object-oriented programming and concurrent computing. For those interested in Scala, you can follow this quick (~40min) introductory tutorial: [scala-tutorials](https://github.com/astrolabsoftware/scala-tutorials).

Why am I talking about Scala in a PySpark context? Because you cannot ignore it! While PySpark is nowadays more than just an interface around Spark, most of it still comes straight from Spark (in Scala). To get the best of PySpark you need to know a few functional programming concepts present in Scala.

__Learning objectives__

- Strict vs non-strict (lazy) evaluation
- Transformations vs actions
- User Defined Functions
- Cache or not cache? That's the question.


In [1]:
from pyspark.sql import SparkSession

# Initialise our Spark session
spark = SparkSession.builder.getOrCreate()

# SparkContext, used below to create RDDs
sc = spark.sparkContext

## Strict vs non-strict (lazy) evaluation

Let's take a simple example (adapted from M. Odersky):

```python
# Initialise a RDD
rdd = sc.parallelize(range(10))

# xs is f(data)
xs = rdd.map(lambda x: ...)

# Action 1: filter some elements of xs and sum them
xs.filter(...).sum()

# Action 2: Take n elements of xs
xs.take(n)
```

In a strict language, `map` is evaluated once, as soon as it is defined, and produces an intermediate list `xs` that both actions reuse. In a lazy language, `map` is evaluated twice, once per action, and no intermediate list is stored.

In other words, strict evaluation computes expressions as soon as they are defined, rather than when they are needed. Lazy evaluation waits until a result is actually requested, and only then decides what needs to be computed.

Scala collections are **strict** by default. Spark collections, however, are **lazy** by default.
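
To make the difference concrete without Spark, here is a minimal pure-Python sketch. The helper `f` and the counter `calls` are hypothetical names introduced for illustration only: a list comprehension plays the role of strict evaluation, and a function that rebuilds a generator plays the role of a lazy collection.

```python
import itertools

# Count how many times the mapped function is invoked in each mode
calls = {"strict": 0, "lazy": 0}

def f(x, mode):
    """Stand-in for the function passed to map."""
    calls[mode] += 1
    return 2 * x

data = range(10)

# Strict: the map is evaluated immediately, once, and the
# intermediate list xs is materialised in memory.
xs_strict = [f(x, "strict") for x in data]
sum_strict = sum(x for x in xs_strict if x > 5)    # action 1
take_strict = xs_strict[:3]                         # action 2

# Lazy: xs is only a recipe. Each action re-runs the recipe,
# and no intermediate list is stored.
def xs_lazy():
    return (f(x, "lazy") for x in data)

sum_lazy = sum(x for x in xs_lazy() if x > 5)       # action 1
take_lazy = list(itertools.islice(xs_lazy(), 3))    # action 2

print(calls)  # {'strict': 10, 'lazy': 13}
```

Both modes return the same results, but the lazy version calls `f` 13 times: 10 for the sum, and only 3 for the `take`, since it stops as soon as enough elements have been produced.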

### What does it imply in the case of Apache Spark (i.e. lazy)?

- You can define thousands of intermediate objects using _transformations_ (`map` is one, see below for more information on action vs transformation); the computation will not be triggered as long as you do not call an action at the end.
- Once you have defined your chain of transformations plus the action at the end, Spark will analyse it, create a DAG (directed acyclic graph), and often optimize it much better than we could do (as humans).
- A long chain of operations on an RDD will be evaluated each time you call an action at the end. No panic: to avoid re-processing everything from the initial RDD, you also have the possibility of _caching_ intermediate results. This is discussed in the last section of this notebook.
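
The points above can be mimicked with a toy class. This is only a sketch of the idea, not how Spark is implemented: `LazySeq` and its `log` attribute are hypothetical names, and there is no DAG optimisation here, just a recorded chain of steps that is replayed at each action.

```python
class LazySeq:
    """Toy stand-in for an RDD (hypothetical, not a Spark class)."""

    def __init__(self, source, steps=(), log=None):
        self.source = source                       # initial data
        self.steps = tuple(steps)                  # recorded transformations
        self.log = log if log is not None else []  # one entry per evaluation

    # Transformations: record the step and return a new object.
    # Nothing is computed here.
    def map(self, func):
        return LazySeq(self.source, self.steps + (("map", func),), self.log)

    def filter(self, pred):
        return LazySeq(self.source, self.steps + (("filter", pred),), self.log)

    def _run(self):
        # Replay the whole chain from the initial data
        self.log.append("run")
        for item in self.source:
            keep = True
            for kind, func in self.steps:
                if kind == "map":
                    item = func(item)
                elif not func(item):
                    keep = False
                    break
            if keep:
                yield item

    # Actions: trigger the computation and return a value.
    def collect(self):
        return list(self._run())

    def sum(self):
        return sum(self._run())


chain = LazySeq(range(10)).map(lambda x: 2 * x).filter(lambda x: x > 5)
runs_before = len(chain.log)   # 0: transformations alone compute nothing
total = chain.sum()            # first action, the chain is evaluated
values = chain.collect()       # second action, the chain is evaluated again
runs_after = len(chain.log)    # 2: one full evaluation per action
```

Each action replays the chain from the initial data, which is exactly the cost that caching is meant to avoid.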

## Transformations vs actions

There is a clear distinction between [transformation](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) and [action](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions): a transformation creates a new dataset from an existing one, and an action returns a value to the driver program after running a computation on the dataset.

In [2]:
rdd = sc.parallelize(range(10))

# Transformation: create a new rdd with twice
# the initial value. Nothing really happens in 
# terms of IO or computation.
rddp = rdd.map(lambda x: 2 * x)

# Action: compute the sum and return it.
# Computation is triggered.
# note: we could have used the sum() method
# rddp.sum()
rddp.reduce(lambda x, y: x + y)

90

In practice, we manipulate the dataset by applying transformations onto it (`map`, `filter`, `union`, ...) and only when the data exploration has reduced most of the data set size, we perform an action (`count`, `reduce`, `collect`, `sum`, `take`...) which triggers the computation.

### Loading data: lazy!

In the previous part (2), we discussed the IO part of Spark. You probably did not notice, but this is a lazy operation!

In [3]:
# No data has been transferred and no work has been performed
# on the cluster, since no action has been called yet.
df = spark.read.format('parquet').load('data/points.parquet')

The computation will be triggered only if you call an action:

In [4]:
# Just another transformation
df_filt = df.filter(df["x"] > 0.999)

# Trigger a proper action now
print(df_filt.select("x").collect())

# show() is special in the sense that it collects
# only a subset of the full dataset and prints it.
# That's why it is in general much faster than
# other actions.
df_filt.show()

[Row(x=0.9992780089378357), Row(x=0.9998085498809814), Row(x=0.9999639987945557), Row(x=0.9999489784240723), Row(x=0.9999311566352844), Row(x=0.9999566078186035), Row(x=0.9994782209396362), Row(x=0.9991017580032349), Row(x=0.999708354473114), Row(x=0.9999779462814331), Row(x=0.9993855953216553), Row(x=0.9997941851615906), Row(x=0.9991426467895508), Row(x=0.9990198612213135), Row(x=0.9999620318412781), Row(x=0.9998014569282532), Row(x=0.9995845556259155), Row(x=0.9998140335083008), Row(x=0.9997224807739258), Row(x=0.9993610978126526), Row(x=0.9998552799224854), Row(x=0.9995030164718628), Row(x=0.9993981122970581), Row(x=0.9993990063667297), Row(x=0.9996508955955505), Row(x=0.9996503591537476), Row(x=0.9992586374282837), Row(x=0.9995623826980591), Row(x=0.9995316863059998), Row(x=0.999855637550354), Row(x=0.999435544013977), Row(x=0.9994217753410339), Row(x=0.9996558427810669), Row(x=0.9994787573814392), Row(x=0.9997873902320862), Row(x=0.9998015761375427), Row(x=0.9995604753494263), Row

### RDD vs DataFrame: the map case

A DataFrame being based on an RDD, the kinds of transformation you can apply to it are similar. Most transformations are available for both RDD and DataFrame, and the syntax is quasi-identical (`filter`, `union`, etc.). The most notable difference is `map`, which is not a DataFrame method. Let's investigate a bit:

In [5]:
# Initial DataFrame
df = spark.read.format('csv')\
    .option("inferSchema", True)\
    .option("header", True)\
    .load('data/points.csv')

# Corresponding RDD
rdd = df.rdd

# Data
df.show(2)

+----------+----------+---------+
|         x|         y|        z|
+----------+----------+---------+
| 0.5488135|0.30764535|0.5352571|
|0.71518934|0.06200521|0.9040443|
+----------+----------+---------+
only showing top 2 rows



Note there are plenty of ways to create a DataFrame from a RDD or a collection. For more information, see for example `spark.createDataFrame?`. Let's now multiply all `x` entries by 2:

In [6]:
# RDDs are simple
rdd.map(lambda arow: [2 * arow[0], arow[1], arow[2]]).take(1)

[[1.097627, 0.30764535, 0.5352571]]

In [7]:
# DataFrames are slightly more convoluted...

#############################
# Method 1: Go back to RDD...
df.rdd\
    .map(lambda arow: [2 * arow[0], arow[1], arow[2]])\
    .toDF(df.columns)\
    .show(2)

#############################
# Method 2: manipulate entire column at once
from pyspark.sql.functions import col
df.withColumn("x", 2 * col("x")).show(2)

#############################
# Method 3: à la SQL
# SQL - register first the DataFrame
df.createOrReplaceTempView("myDF")

# Define your command
sql_command = """
    SELECT 2 * x AS x, y, z
    FROM myDF 
"""

# Execute the expression - return a DataFrame
df_mult = spark.sql(sql_command)
df_mult.show(2)

#############################
# Method 4: User Defined function
# We will explore this in the next section
from pyspark.sql import functions as F
# Declare the return type: without it, F.udf defaults to string
mult = F.udf(lambda x: 2 * x, "double")
df.select(mult(df["x"]).alias("x"), "y", "z").show(2)

+----------+----------+---------+
|         x|         y|        z|
+----------+----------+---------+
|  1.097627|0.30764535|0.5352571|
|1.43037868|0.06200521|0.9040443|
+----------+----------+---------+
only showing top 2 rows

+----------+----------+---------+
|         x|         y|        z|
+----------+----------+---------+
|  1.097627|0.30764535|0.5352571|
|1.43037868|0.06200521|0.9040443|
+----------+----------+---------+
only showing top 2 rows

+----------+----------+---------+
|         x|         y|        z|
+----------+----------+---------+
|  1.097627|0.30764535|0.5352571|
|1.43037868|0.06200521|0.9040443|
+----------+----------+---------+
only showing top 2 rows

+----------+----------+---------+
|         x|         y|        z|
+----------+----------+---------+
|  1.097627|0.30764535|0.5352571|
|1.43037868|0.06200521|0.9040443|
+----------+----------+---------+
only showing top 2 rows



In general (not just for `map`), DataFrames have several ways of doing the same thing. Example with `filter`:

In [8]:
# Filter data
a = df.filter(df["x"] > 0.1).count()
b = df.where(df["x"] > 0.1).count()
c = spark.sql("SELECT x FROM myDF WHERE x > 0.1").count()
print("{} = {} = {}".format(a, b, c))
# and more...

44874 = 44874 = 44874


This is very much in the spirit of Scala... ;-)

## User Defined Functions

The previous difference in behaviour brings us to... User Defined Functions! Although there are many simple functions accessible from the Spark SQL module, you ultimately want to execute arbitrary code on your DataFrame data. One way to do so is via User Defined Functions (UDF).

Let's face reality: UDFs in Python have a performance problem compared to their counterparts in Scala. It is discussed at length in [1807.03078](https://arxiv.org/abs/1807.03078) for example. If you write a simple UDF and apply it to your dataset, you might be 100 times slower than with the same code in Scala. Oops...

The reasons are rather technical, and mainly come from the way Python interacts with the JVM and the way Python processes work. Keep in mind that PySpark is a Python API for Spark, which is more or less an interface to Spark Scala from Python via py4j.

To solve this problem, recent Spark versions introduced vectorised UDFs (also called pandas UDFs). With them, the difference with the Scala counterpart is smaller (but still non-negligible). See for example this simple explanation [here](https://databricks.com/session/keynote-from-reynold-xin) (from 7:50).
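
To see what "vectorised" means in practice, here is a small sketch that does not use Spark at all. It only contrasts the two calling conventions: a standard UDF is called once per row with a Python scalar, while a scalar pandas UDF is called once per batch with a `pandas.Series`. The function names `row_at_a_time` and `vectorised` are hypothetical.

```python
import numpy as np
import pandas as pd

def row_at_a_time(value):
    """Calling convention of a standard UDF: one Python scalar per call."""
    return float(np.cos(value**2) * np.sin(value**3))

def vectorised(batch):
    """Calling convention of a scalar pandas UDF: one pandas.Series per call."""
    return np.cos(batch**2) * np.sin(batch**3)

# A small batch of values standing in for a DataFrame column
x = pd.Series(np.linspace(0.0, 1.0, 5))

# One Python function call per element...
slow = pd.Series([row_at_a_time(v) for v in x])

# ...versus a single call for the whole batch
fast = vectorised(x)

same = np.allclose(slow, fast)
print(same)  # True
```

The results are identical; the gain comes from replacing many Python-level calls by a single call whose inner loop runs in compiled NumPy code.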

In [9]:
import pandas as pd
import numpy as np
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType

# Standard version
mult = F.udf(lambda x: float(np.cos(x**2) * np.sin(x**3)))

# Vectorized version
@pandas_udf("double", PandasUDFType.SCALAR)
def multpd(x):
    """ Compute something...
    
    Parameters
    ----------
    x : pandas.Series
        One batch of values from a DataFrame
        column (df["name"]).
    
    Returns
    ----------
    pandas.Series
        Transformed values, same length as x.
    
    """
    ret = np.cos(x**2) * np.sin(x**3)
    return pd.Series(ret)

# Initial DataFrame
df = spark.read.format('fits')\
    .option("hdu", 1)\
    .load('data/points.fits')\
    .cache()

# Note that given the small dataset, and 
# the rather simple computation, you might not
# notice the time difference depending on your machine...
%timeit df.select(mult(df["x"]).alias("x")).count()
%timeit df.select(multpd(df["x"]).alias("x")).count()

61 ms ± 2.01 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
50.3 ms ± 6 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


Note that you can pass several columns to a UDF:

In [10]:
# Vectorized version
@pandas_udf("double", PandasUDFType.SCALAR)
def add_cols(x, y):
    """ Add two columns
    
    Parameters
    ----------
    x : pandas.Series
        One batch of values from a DataFrame column (df["name"]).
    y : pandas.Series
        One batch of values from a DataFrame column (df["othername"]).
    
    Returns
    ----------
    pandas.Series
        Element-wise sum of x and y.
    
    """
    ret = x + y
    return pd.Series(ret)

df.select(add_cols(df["x"], df["y"]).alias("x + y")).show(3)

+------------------+
|             x + y|
+------------------+
|0.8564589023590088|
|0.7771945595741272|
|1.5915484428405762|
+------------------+
only showing top 3 rows



## Cache or not cache? That's the question.

You might notice that we used the method `cache()` earlier. What is it?

Spark is based on the so-called MapReduce cluster computing paradigm, popularized by the Hadoop framework using implicit data parallelism and fault tolerance. But prior to Spark, in Hadoop for example, MapReduce was really Map-Reduce-Map-Reduce-Map-... That is, data was loaded, manipulated once, dumped to disk, reloaded, manipulated once more, etc... This is not very efficient if you want to perform iterative steps or re-use the same input many times in different contexts.

Although Spark is not an in-memory technology per se (Spark has pluggable connectors for different persistent storage systems but it does not have native persistence code), it allows users to make efficient use of an in-memory Least Recently Used (LRU) cache, relying on disk only when the allocated memory is not sufficient. Ideally, the data is loaded from disk into partitions only the first time, and all or part of it is kept in memory (distributed among processors), so that subsequent data explorations are limited by the computation time only. If the dataset is bigger than the total cache memory available in the cluster, the user needs to decide whether the partitions that do not fit are spilled to disk at the first iteration or recomputed from scratch later if needed.
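
The LRU behaviour can be illustrated with a toy cache of partitions in plain Python. This is a sketch under simplifying assumptions, not Spark's actual implementation: `PartitionCache` and `load_partition` are hypothetical names, the capacity is counted in partitions rather than bytes, and evicted partitions are recomputed rather than spilled to disk.

```python
from collections import OrderedDict

class PartitionCache:
    """Toy LRU cache of partitions (hypothetical, not Spark's implementation)."""

    def __init__(self, capacity, compute):
        self.capacity = capacity    # max number of partitions kept in memory
        self.compute = compute      # how to (re)build a partition
        self.store = OrderedDict()  # ordered from least to most recently used
        self.computed = 0           # number of partition computations

    def get(self, index):
        if index in self.store:
            # Cache hit: mark the partition as most recently used
            self.store.move_to_end(index)
            return self.store[index]
        # Cache miss: compute the partition from scratch
        self.computed += 1
        value = self.compute(index)
        self.store[index] = value
        if len(self.store) > self.capacity:
            # Evict the least recently used partition
            self.store.popitem(last=False)
        return value

def load_partition(index):
    """Stand-in for loading and processing one partition from disk."""
    return [index * 10 + i for i in range(3)]

# Room for 2 partitions only, but the job touches 3 of them
cache = PartitionCache(capacity=2, compute=load_partition)
for index in (0, 1, 0, 2, 1):
    cache.get(index)

print(cache.computed)     # 4
print(list(cache.store))  # [2, 1]
```

Out of five accesses only one is served from the cache: partition 1 is evicted when partition 2 arrives, so it has to be recomputed when it is requested again.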

For more information, including which storage level to choose, see http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence.

In [11]:
from pyspark import StorageLevel

# StorageLevel gives you control of the kind
# of partition storage you want to use.
print(StorageLevel.__doc__)


    Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
    whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
    in a JAVA-specific serialized format, and whether to replicate the RDD partitions on multiple
    nodes. Also contains static constants for some commonly used storage levels, MEMORY_ONLY.
    Since the data is always serialized on the Python side, all the constants use the serialized
    formats.
    


In [12]:
# Initial DataFrame - nothing has been
# assumed about the cache, i.e. no cache.
df = spark.read.format('fits')\
    .option("hdu", 1)\
    .load('data/points.fits')
print(df.is_cached)

# Decide to cache entirely the DF
# Make sure you have enough memory available.
# Note: rdd.cache() is a shorthand for rdd.persist(StorageLevel.MEMORY_ONLY),
# while df.cache() on a DataFrame defaults to a memory-and-disk level.
df.persist(StorageLevel.MEMORY_ONLY)
print(df.is_cached)

# Manually unpersist data. Not mandatory as Spark
# automatically monitors cache usage on each node and 
# drops out old data partitions in a Least Recently Used (LRU) fashion.
df.unpersist()
print(df.is_cached)

False
True
False


You can check, even on this very small dataset, the benefit of caching the data:

In [13]:
# Just in case...
df.unpersist()
%timeit df.count()

# Cache it
df_cached = df.cache()
df_cached.count()
%timeit df_cached.count()

61.2 ms ± 3.76 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
31.6 ms ± 2.38 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


Finally, a common mistake is to think that all the RAM can be used for the cache. This is wrong. The default in Spark sets the memory fraction available for caching to about 60% of the total RAM (in recent versions this region is shared with execution memory). Why? Because you need the remainder for other things (JVM, computation, shuffle, ...) ;-)
So if you have a cluster with 1000 CPUs @ 2 GB RAM each, you will be able to cache up to about 1.2 TB at most (and not 2 TB!). As usual in Spark, this parameter is tunable either via a configuration file or directly on the command line when launching your job.
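
The arithmetic behind this estimate is short enough to write down. The variable names below are hypothetical, and the 0.6 value is the default quoted above (to my knowledge it corresponds to the `spark.memory.fraction` property); treat the result as an upper bound rather than a guarantee.

```python
# Cluster from the example above
n_cpus = 1000
ram_per_cpu_gb = 2.0

# Default memory fraction quoted in the text (assumed: spark.memory.fraction)
memory_fraction = 0.6

total_ram_gb = n_cpus * ram_per_cpu_gb
cache_budget_gb = total_ram_gb * memory_fraction

print("{:.1f} TB cacheable out of {:.1f} TB".format(
    cache_budget_gb / 1000, total_ram_gb / 1000))
# 1.2 TB cacheable out of 2.0 TB
```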

## Going further

Here is a series of useful links on similar topics:

- Doc on Apache Spark [transformation](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) and [action](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions).
- Apache Spark and data persistence: http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence
- Talks on (vectorised) PySpark UDF: [here](https://databricks.com/session/vectorized-udf-scalable-analysis-with-python-and-pyspark) and [there](https://databricks.com/session/making-pyspark-amazing-from-faster-udfs-to-dependency-management-graphing) for example.
- Tutorials for Scala: https://github.com/astrolabsoftware/scala-tutorials