# Spark Tutorial

## What is PySpark?

[https://medium.com/analytics-vidhya/how-does-pyspark-work-step-by-step-with-pictures-c011402ccd57](https://medium.com/analytics-vidhya/how-does-pyspark-work-step-by-step-with-pictures-c011402ccd57)

In [1]:
%env PYSPARK_PYTHON=./env/bin/python
%env SPARK_LOCAL_DIRS=/mnt/storage/grid/home/adam/spark_tmp

import pyspark

from pyspark.sql.functions import *
from pyspark.sql.types import *

env: PYSPARK_PYTHON=./env/bin/python
env: SPARK_LOCAL_DIRS=/mnt/storage/grid/home/adam/spark_tmp


## Start a Spark Session

The non-VCS files are located at `/home/adam/spark-tutorial` on `magarveylab-computational`.

Start a Spark session to connect to the Spark cluster from the Python client.
Additional configuration options are described in the Apache Spark documentation
at [https://spark.apache.org/docs/latest/configuration.html](https://spark.apache.org/docs/latest/configuration.html).
Two important options are `spark.driver.memory` and `spark.executor.memory`,
which set the memory allocated to the driver process and to each executor process, respectively.
The driver plans the job, schedules tasks, and receives any results returned to the client (for example from `.collect()`),
while the executors run the tasks on partitions of the data, including both narrow operations and shuffles.
To make Python dependencies available on the executors, package the active conda environment
with `conda pack -f -o env.tar.gz` and list the archive in `spark.archives`.
The `spark.archives` convention is `<path>#<alias>`,
where `<path>` is the path to the archive on the client machine
and `<alias>` maps to `./<alias>` on the executor machine.
For example, `env.tar.gz#env` is accessed on the executor via `./env`,
as applied in `%env PYSPARK_PYTHON=./env/bin/python`.
When the disk space on the executor is not sufficient for `spark.archives`,
point the executor's scratch directory at a location with enough space,
such as a network drive: `%env SPARK_LOCAL_DIRS=/mnt/storage/grid/home/adam/spark_tmp`.
Check out the Spark UI (port 4040 by default, or the next free port above it when 4040 is taken) to observe
task progress, the computational graph, and resource utilization.

In [2]:
spark = (
    pyspark.sql.SparkSession.builder
    .appName("spark-tutorial")
    .master("spark://MagarveyLab-mn1:7077")
    .config("spark.archives", "env.tar.gz#env,rxnmapper-model-albert.tar.gz#rxnmapper-model-albert")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
    .config("spark.driver.memory", "64g")
    .config("spark.executor.memory", "64g")
    .getOrCreate()
)
spark
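The `<path>#<alias>` convention described above can be made concrete with a small sketch. The helper `parse_archives` below is hypothetical and not part of Spark; it only shows how each entry of the `spark.archives` string splits into a client-side path and the directory name it is expected to appear under on the executor. The fallback for entries without an alias is an assumption of this sketch.

```python
from typing import Dict


def parse_archives(value: str) -> Dict[str, str]:
    """Map each executor-side alias to its client-side archive path.

    Hypothetical helper for illustration; Spark does this parsing internally.
    """
    mapping = {}
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Split on the last '#': left is the client path, right is the alias.
        path, sep, alias = entry.rpartition("#")
        if not sep:
            # Assumption of this sketch: with no alias, use the archive's file name.
            path, alias = entry, entry.rsplit("/", 1)[-1]
        mapping[alias] = path
    return mapping


archives = "env.tar.gz#env,rxnmapper-model-albert.tar.gz#rxnmapper-model-albert"
for alias, path in parse_archives(archives).items():
    print(f"{path} -> ./{alias}")
```

With the configuration used in this tutorial, the two entries map to `./env` and `./rxnmapper-model-albert` on the executor.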

## Load Data

Load a directory of JSON files in a single line.
The data is read into a Spark DataFrame, which is built on the Resilient Distributed Dataset (RDD),
the fundamental data abstraction of Apache Spark.
The DataFrame provides a convenient and performant interface for manipulating the underlying RDD,
and is similar to the pandas DataFrame.
Call `.cache()` on a DataFrame when several downstream computations depend on it.
Caching is lazy: the data is stored the first time an action runs on the DataFrame and is reused afterwards.
Here, the operations in `Glance Dataset` all depend on `df`, so we cache it to avoid reading from disk repeatedly.

In [None]:
df = spark.read.format("json").load("/mnt/storage/grid/home/gunam/data_sets/ms2/theoretical_fragmentations").cache()

## Glance Dataset

1. Examine dataset structure using `df.printSchema()`
2. Sample some rows using `df.show()`
3. Observe summary statistics using `df.summary().show()`

Each top-level item in the schema corresponds to a column in the dataset.
Items in nested levels represent the object structure of each cell of the column.
For more methods, see [https://spark.apache.org/docs/latest/api/python/reference/index.html](https://spark.apache.org/docs/latest/api/python/reference/index.html).

In [None]:
df.printSchema()
df.show()
df.summary().show()

## Count non-null values in the `smiles` column

In [None]:
df.select("smiles").dropna().count()

## User Defined Function (UDF)

Arbitrary Python functions can be mapped over datasets as UDFs, provided that any objects they capture are serializable and process-safe.
In PySpark, a UDF can be defined with the `@udf` decorator, which takes the function's return type as its argument (the default is `StringType`).

In [None]:
@udf(StructType([
    StructField("input_ids", ArrayType(ShortType(), False), False),
    StructField("token_type_ids", ArrayType(ShortType(), False), False),
    StructField("attention_mask", ArrayType(ShortType(), False), False),
]))
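def encode(smiles):
    # These imports run on the executor, inside the packed conda environment.
    import rdkit.Chem
    import re
    import transformers
    # NOTE: the tokenizer is loaded on every call, i.e. once per row.
    # "rxnmapper-model-albert" is the alias of the archive shipped via `spark.archives`.
    tokenizer = transformers.BertTokenizer.from_pretrained(
        "rxnmapper-model-albert",
        do_lower_case=False,
        do_basic_tokenize=False,
    )
    # Split a SMILES string into atom, bond, ring-closure, and branch tokens.
    __smiles_regex = re.compile(r"(\[[^\]]+]|Br?|Cl?|N|O|S|P|F|I|b|c|n|o|s|p|\(|\)|\.|=|#|-|\+|\\|\/|:|~|@|\?|>>?|\*|\$|\%[0-9]{2}|[0-9])")
    make_words = __smiles_regex.findall
    # Canonicalize the SMILES before tokenizing.
    normalize = rdkit.Chem.CanonSmiles
    return dict(**tokenizer(make_words(normalize(smiles))))

# Add the struct returned by `encode` as a new column.
df = df.withColumn("encoding", encode("smiles"))
df.printSchema()
df.show()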
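The word-splitting step inside `encode` can be tried on its own, without Spark, RDKit, or the tokenizer. The sketch below reuses the regular expression from `encode`; the function name `tokenize_smiles` and the sample molecules are chosen here for illustration, and the canonicalization step is skipped.

```python
import re

# Same pattern as in `encode`: bracket atoms, two-letter halogens, organic-subset
# atoms, aromatic atoms, branches, bonds, and ring-closure digits.
SMILES_REGEX = re.compile(
    r"(\[[^\]]+]|Br?|Cl?|N|O|S|P|F|I|b|c|n|o|s|p|\(|\)|\.|=|#|-|\+|\\|\/|:|~|@|\?|>>?|\*|\$|\%[0-9]{2}|[0-9])"
)


def tokenize_smiles(smiles: str) -> list:
    """Split a SMILES string into the tokens matched by SMILES_REGEX."""
    return SMILES_REGEX.findall(smiles)


for example in ["CC(=O)O", "c1ccccc1Br", "[NH4+].Cl"]:
    tokens = tokenize_smiles(example)
    # If every character is covered by the pattern, joining restores the input.
    assert "".join(tokens) == example
    print(example, tokens)
```

Multi-character tokens such as `Br`, `Cl`, and bracket atoms like `[NH4+]` stay intact, which is why the pattern is used instead of splitting the string character by character.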

## Resources

* PySpark concepts explained by examples [https://sparkbyexamples.com/category/pyspark/](https://sparkbyexamples.com/category/pyspark/)
* PySpark official documentation [https://spark.apache.org/docs/latest/api/python/](https://spark.apache.org/docs/latest/api/python/)
* Spark SQL documentation [https://spark.apache.org/docs/latest/api/sql/index.html](https://spark.apache.org/docs/latest/api/sql/index.html)
