# ESCAPE Summer School 2021: Big data for big science #2

<img src="../pictures/spark_escape_logo.png" alt="alt text" width="1000"/>

### Context

In this second notebook, we will learn, through concrete examples, how to interface Apache Spark with popular Python scientific libraries (NumPy, pandas, ...).

### Learning objectives

- Interfacing popular Python scientific libraries with Apache Spark
- Developing your own modules for Spark
- Inspecting, testing, and debugging Spark programs

In [1]:
# Uncomment these lines if you are using Google Colab
# !pip install pyspark==3.1.1

# from pyspark.sql import SparkSession

# # Initialise our Spark session
# spark = SparkSession.builder.getOrCreate()

Throughout this series of exercises, we will use the same dataset as in the first session:

In [1]:
# Load data into a Spark DataFrame
df = spark.read.format("parquet").load("../data/clusters.parquet")

## User defined functions

Spark has many built-in functions, but they are often too limited for scientific purposes. Ideally, you would also like to be able to apply arbitrarily complex logic to your data. This is done through `User-Defined Functions` (UDFs).

### Simple UDF

UDFs do not modify columns in place (DataFrames are _immutable_). Instead, you create a new column in the DataFrame and populate it with the result of your logic:

In [2]:
from pyspark.sql.functions import udf

def add_one(element):
    return element + 1

# Declare the return type explicitly: udf() returns strings by default
add_one_udf = udf(add_one, 'long')

df.withColumn('idPlusOne', add_one_udf(df['id'])).show(2)

+-------------------+------------------+------------------+---+---------+
|                  x|                 y|                 z| id|idPlusOne|
+-------------------+------------------+------------------+---+---------+
|0.40036865101002594| 6.377802717872659|  9.12320139596368|  2|        3|
|0.35619804381308917|4.0063127514493715|2.5682278136488326|  0|        1|
+-------------------+------------------+------------------+---+---------+
only showing top 2 rows



### Limitations

1. UDFs let you define any processing to be done on the data, but they come at a price: they are black boxes for Spark! This means you will not benefit from the built-in optimisations that Spark offers.
2. The UDF is called once per row, and each value has to be serialised between the JVM and the Python worker, so it can be very slow!

Always prefer a built-in Spark function rather than a UDF if the alternative exists.

In [3]:
%timeit a = df.withColumn('idPlusOne', add_one_udf(df['id'])).collect()
%timeit b = df.withColumn('idPlusOne', df['id'] + 1).collect()

620 ms ± 181 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
359 ms ± 109 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Speeding up UDFs: Pandas UDFs

To overcome the inefficiency of UDFs, you can use `Pandas UDFs` instead. They can be seen as vectorised UDFs: they receive the data in batches as pandas Series, and rely on pyarrow under the hood to speed up data movement between Spark and Python.

<img src="../pictures/spark_udf.png" alt="alt text" width="1000"/>
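
To get a feel for what "vectorised" means here, the difference can be mimicked without Spark. This is only a sketch in plain pandas, not what Spark executes internally, and the toy `ids` Series and the name `add_one_vectorised` are made up for illustration. A plain UDF behaves roughly like calling the function once per value, whereas a Pandas UDF receives a whole batch at once:

```python
import pandas as pd

def add_one(element):
    # Row-at-a-time logic: one Python call per value
    return element + 1

def add_one_vectorised(col: pd.Series) -> pd.Series:
    # Batch logic: a single call handles the whole Series
    return col + 1

# Toy data standing in for the `id` column (illustrative values)
ids = pd.Series([0, 1, 2, 3, 4])

row_by_row = ids.map(add_one)           # conceptually, what a plain UDF does
whole_batch = add_one_vectorised(ids)   # conceptually, what a Pandas UDF does

print(row_by_row.tolist())
print(whole_batch.tolist())
```

Both calls give the same values. The second one avoids the per-element Python overhead, which is where most of the gain comes from on large columns.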

### Simple Pandas UDF

Pandas UDFs are decorated functions. They take one or several columns of the DataFrame as input, each passed as a pandas Series, and return a pandas Series. Our basic UDF can be rewritten as:

In [4]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf('int')
def add_one_pandas(colAsSeries: pd.Series) -> pd.Series:
    return colAsSeries + 1

df.withColumn('idPlusOne', add_one_pandas(df['id'])).show(2)

+-------------------+------------------+------------------+---+---------+
|                  x|                 y|                 z| id|idPlusOne|
+-------------------+------------------+------------------+---+---------+
|0.40036865101002594| 6.377802717872659|  9.12320139596368|  2|        3|
|0.35619804381308917|4.0063127514493715|2.5682278136488326|  0|        1|
+-------------------+------------------+------------------+---+---------+
only showing top 2 rows



In [5]:
%timeit a = df.withColumn('idPlusOne', add_one_udf(df['id'])).collect()
%timeit b = df.withColumn('idPlusOne', df['id'] + 1).collect()
%timeit c = df.withColumn('idPlusOnePandas', add_one_pandas(df['id'])).collect()

396 ms ± 84.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
232 ms ± 42 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
358 ms ± 60.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Pandas UDF types

As of Spark 3.1, there are several types of Pandas UDFs and related Pandas function APIs:
1. Series to Series (and Iterator of Series to Iterator of Series)
2. Series to Scalar
3. Map
4. Grouped Map (and Co-grouped Map)

See this [post](https://databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html) for more information.

### Series to Series

This corresponds to the previous example. The function takes one or more `pandas.Series` and returns one `pandas.Series` of the same length.

<img src="../pictures/pudf_1.png" alt="alt text" width="300"/>

In [6]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

@pandas_udf('int')
def add_one_pandas(colAsSeries: pd.Series) -> pd.Series:
    return colAsSeries + 1

df.withColumn('idPlusOne', add_one_pandas(df['id'])).show(2)

+-------------------+------------------+------------------+---+---------+
|                  x|                 y|                 z| id|idPlusOne|
+-------------------+------------------+------------------+---+---------+
|0.40036865101002594| 6.377802717872659|  9.12320139596368|  2|        3|
|0.35619804381308917|4.0063127514493715|2.5682278136488326|  0|        1|
+-------------------+------------------+------------------+---+---------+
only showing top 2 rows



### Series to Scalar

The function takes one or more `pandas.Series` and returns a single scalar value, so it acts as an aggregation.

<img src="../pictures/pudf_2.png" alt="alt text" width="300"/>

In [7]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

@pandas_udf("double")
def extract_mean(col: pd.Series) -> float:
    return col.mean()

# Compute the mean over all data
df.select(extract_mean(df['x']).alias('meanx_total')).show()

# Compute the mean per ID group
df.groupby("id").agg(extract_mean(df['x']).alias('meanx_per_group')).show()

# Compute the mean per ID group, and reassign it back to all elements
df.withColumn('meanx_per_group', extract_mean(df['x']).over(Window.partitionBy('id'))).show()

+------------------+
|       meanx_total|
+------------------+
|0.2246114316118941|
+------------------+

+---+------------------+
| id|   meanx_per_group|
+---+------------------+
|  1|0.9084311322436587|
|  2|-1.236493822799703|
|  0|1.0013143125628075|
+---+------------------+

+--------------------+--------------------+------------------+---+------------------+
|                   x|                   y|                 z| id|   meanx_per_group|
+--------------------+--------------------+------------------+---+------------------+
|-0.16938263429070788| -3.2704779332785194| 3.461377027352177|  1|0.9084311322436587|
|   2.015218902970069| -1.9768000326242543|3.6711990601129005|  1|0.9084311322436587|
|  1.6564925029468829|  -2.373951729711308|3.0974884248661003|  1|0.9084311322436587|
|  0.3102615619720164| -2.0743969915840643|3.2329328785995464|  1|0.9084311322436587|
| 0.45261425901695157| -1.4735817501793693| 2.802700438479896|  1|0.9084311322436587|
...

### Map

`mapInPandas` applies a function to every batch of rows in each partition. The function takes an iterator of `pandas.DataFrame` and returns an iterator of `pandas.DataFrame`. The output can have a different number of rows than the input.

<img src="../pictures/pudf_3.png" alt="alt text" width="300"/>

In [8]:
from typing import Iterator
import pandas as pd

def pandas_filter(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        yield pdf[pdf.id == 1]

# Extract only rows with ID=1
df.mapInPandas(pandas_filter, schema=df.schema).count()

1333
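
The iterator contract can be checked without a cluster. The sketch below reuses `pandas_filter` from the cell above and feeds it two hand-made batches (the values are invented for illustration) in place of the batches Spark would provide:

```python
from typing import Iterator
import pandas as pd

def pandas_filter(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Same logic as in the cell above: keep only rows with id == 1 in each batch
    for pdf in iterator:
        yield pdf[pdf.id == 1]

# Two toy batches standing in for what Spark would feed to the function
batches = [
    pd.DataFrame({"x": [0.1, 0.2, 0.3], "id": [1, 0, 1]}),
    pd.DataFrame({"x": [0.4, 0.5], "id": [2, 1]}),
]

# Consume the generator batch by batch, then stitch the outputs together
result = pd.concat(list(pandas_filter(iter(batches))), ignore_index=True)
print(result)
```

Each batch is filtered independently, so the function never needs to see the whole partition at once.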

### Grouped Map (experimental API)

In the Pandas Function API, grouped map is exposed as `applyInPandas` on a grouped DataFrame, e.g. `df.groupby(...)`. Each group is passed to the function as a single `pandas.DataFrame`, so all the data of a group must fit in memory. Note that the output does not need to have the same length as the input.

<img src="../pictures/pudf_4.png" alt="alt text" width="300"/>

In [9]:
import pandas as pd

def subtract_x_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    x = pdf.x
    return pdf.assign(x=x - x.mean())

df.select(['x', 'id']).groupby("id").applyInPandas(subtract_x_mean, schema='x double, id long').show(2)

+-------------------+---+
|                  x| id|
+-------------------+---+
|-1.0778137665343666|  1|
| 1.1067877707264102|  1|
+-------------------+---+
only showing top 2 rows
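
To illustrate the remark on output length, here is a minimal sketch of a grouped-map function that returns a single row per group. It runs in plain pandas on toy data. The function name `keep_largest_x` and the values are assumptions made for this example, and the loop over groups only stands in for what Spark would do:

```python
import pandas as pd

def keep_largest_x(pdf: pd.DataFrame) -> pd.DataFrame:
    # Return one row per group: the row with the largest x
    return pdf.nlargest(1, "x")

# Toy data with two groups (illustrative values)
toy = pd.DataFrame({"x": [0.5, 2.0, -1.0, 3.0, 1.5], "id": [0, 0, 1, 1, 1]})

# Stand-in for Spark: call the function once per group and concatenate
result = pd.concat(
    [keep_largest_x(group) for _, group in toy.groupby("id")],
    ignore_index=True,
)
print(result)

# With the Spark DataFrame of this notebook, the equivalent call would be:
# df.select(['x', 'id']).groupby("id").applyInPandas(keep_largest_x, schema='x double, id long')
```

Five input rows produce two output rows, one per group.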



## Exercises

**Exercise:** Use a Pandas UDF to compute the distance of each row to the center (x, y, z) = (0, 0, 0), and store the result in a new DataFrame column:

In [10]:
from pyspark.sql.functions import pandas_udf

import numpy as np
import pandas as pd

@pandas_udf('double')
def compute_distance_to_center(x: pd.Series, y: pd.Series, z: pd.Series) -> pd.Series:
    """ Compute the distance to the center (0, 0, 0)
    
    Parameters
    ----------
    x, y, z: pandas Series
        Coordinates of the rows, one Series per column
        
    Returns
    -------
    series: pandas Series
        Series containing distance to the center for each row
    """
    r_square = x*x + y*y + z*z
    return pd.Series(np.sqrt(r_square))

df.withColumn(
    "distance", 
    compute_distance_to_center(
        df["x"],
        df["y"],
        df["z"]
    )
).show(5)

+--------------------+-------------------+------------------+---+------------------+
|                   x|                  y|                 z| id|          distance|
+--------------------+-------------------+------------------+---+------------------+
| 0.40036865101002594|  6.377802717872659|  9.12320139596368|  2|11.138647416815433|
| 0.35619804381308917| 4.0063127514493715|2.5682278136488326|  0| 4.772128771485695|
|  1.8851627680444136|   6.11585014171703|1.7987871043042176|  0| 6.647788855294085|
| -1.7480450713588191|  7.582580700598671| 9.635550121929803|  2|12.385274232119963|
|-0.16938263429070788|-3.2704779332785194| 3.461377027352177|  1| 4.765065300113147|
+--------------------+-------------------+------------------+---+------------------+
only showing top 5 rows



**Exercise:** As in session 1, find the barycentre of each cluster in the dataset, but this time using aggregation and a user-defined function (hint: look for `GROUPED MAP`).

In [11]:
from pyspark.sql.functions import pandas_udf
import pandas as pd

def compute_barycentre(pdf: pd.DataFrame) -> pd.DataFrame:
    """ Compute the barycentre of a group
    
    Parameters
    ----------
    pdf : pandas DataFrame
        pandas DataFrame containing the data of one group (one cluster id)
        
    Returns
    -------
    pandas DataFrame
        One-row DataFrame with the barycentre coordinates.
    """
    # Column-wise mean, as a Series indexed by column name
    mean = pdf.mean()

    # Turn the Series into a one-row DataFrame (one column per coordinate)
    out = {colname: [value] for colname, value in mean.items()}
    
    return pd.DataFrame(out)

df.groupBy("id").applyInPandas(compute_barycentre, schema=df.schema).show()


+------------------+------------------+------------------+---+
|                 x|                 y|                 z| id|
+------------------+------------------+------------------+---+
|0.9084311322436587|-1.533560888313291|2.9262012553633943|  1|
|-1.236493822799703| 7.783716322745622| 9.292937669035524|  2|
|1.0013143125628075|4.2508799077973025| 2.021690072130541|  0|
+------------------+------------------+------------------+---+



## Debugging Spark applications: Spark UI

Finding the root of a problem in a distributed environment is not easy: the logs are usually on the executors (or redirected to an external storage system). The Spark UI is a tool that helps you visualise the processing, monitor resource utilisation, and access the logs.

In [1]:
# let's start the history server
!. $SPARK_HOME/sbin/start-history-server.sh

starting org.apache.spark.deploy.history.HistoryServer, logging to /tmp/spark-events/spark--org.apache.spark.deploy.history.HistoryServer-1-29456d5438bb.out


### Spark UI

- It shows the details of the running application. It starts automatically with the application and shuts down when the application ends. The default port is 4040: http://127.0.0.1:4040

### History Server

- The history server gives access to the same information for previous applications, provided event logging was enabled when they ran. You need to start it manually. The default port is 18080: http://127.0.0.1:18080
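
For an application to show up in the history server, it has to write event logs. The sketch below shows one possible way to switch this on from Python. It assumes the log directory is `/tmp/spark-events`, as suggested by the start-up message above, and the helper `with_event_logging` is a hypothetical name introduced here, not part of PySpark:

```python
# Spark settings that make an application write event logs
# (the directory is an assumption based on the history server message above)
EVENT_LOG_CONF = {
    "spark.eventLog.enabled": "true",
    "spark.eventLog.dir": "file:///tmp/spark-events",
}

def with_event_logging(builder, conf=EVENT_LOG_CONF):
    # `builder` is expected to be SparkSession.builder;
    # its config(key, value) method returns the builder itself
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder

# Usage (requires pyspark; to be run before any Spark session exists):
# from pyspark.sql import SparkSession
# spark = with_event_logging(SparkSession.builder).getOrCreate()
```

These settings are read when the application starts, so they will likely have no effect on a session that is already running.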

## Testing Spark

As with any software, you must test your code: unit tests, integration tests, etc. all apply to Spark programs. Exit the notebook (save your running notebook, shut down the kernels, and press CTRL+C in the terminal tab where Jupyter has been launched), and enter the container (see `launch_container.sh`) to practice a bit:

```bash
# in school2021/spark
docker run -it --rm  \
    -v "$PWD":/home/jovyan/work:rw \
    -p 8888:8888 -p 4040:4040 -p 18080:18080 \
    spark_escape2021 bash

# You should see a similar prompt
(base) jovyan@77081e01d859:~/work$
```
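
One lightweight way to start, sketched here under the assumption that the UDF logic is kept in a plain function, is to unit test the pandas code without any Spark session. The function `distance_to_center` and the test name are illustrative. They mirror the distance exercise above but are not part of the course material:

```python
import numpy as np
import pandas as pd

def distance_to_center(x: pd.Series, y: pd.Series, z: pd.Series) -> pd.Series:
    # Plain pandas logic, kept separate from any Spark decorator
    return np.sqrt(x * x + y * y + z * z)

def test_distance_to_center():
    # Hand-picked points whose distance to (0, 0, 0) is known
    x = pd.Series([3.0, 0.0])
    y = pd.Series([4.0, 0.0])
    z = pd.Series([0.0, 2.0])
    result = distance_to_center(x, y, z)
    assert isinstance(result, pd.Series)
    assert np.allclose(result, [5.0, 2.0])

# A test runner such as pytest would collect this function; here we call it directly
test_distance_to_center()

# In the Spark program, the same function can then be wrapped rather than rewritten:
# compute_distance_to_center = pandas_udf(distance_to_center, 'double')
```

Keeping the numerical logic in a plain function means most bugs can be caught quickly on small pandas inputs, leaving integration tests to cover the Spark wiring itself.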

## Acknowledgements

<img src="../pictures/logo-Escape_0.png" alt="alt text" width="400" align="right"/>

This event is organized in the framework and with the support of the European Science Cluster of Astronomy & Particle physics ESFRI research infrastructures (ESCAPE), funded by the European Union's Horizon 2020 - Grant N. 824064.