# Spark basics

As most of the course is `python`-centered, we will use the `python` frontend (a.k.a. `pyspark`) to interact with the cluster.

# Installation

## Advised

Visit [PySpark download page](https://spark.apache.org/downloads.html) and:
- choose latest release
- download package locally

Install the package located under the `python` folder (with your `conda` environment active). The line below installs a local package:

In [None]:
# pip install -e .

> Remember to check [compatibility of pyarrow for latest supported Python version](https://arrow.apache.org/docs/python/install.html#python-compatibility)!

> Spark has to be installed separately on your system

> __Remember to make sure the `pyspark` version installed via `pip` plays well with the Spark engine!__

## Unadvised

Use `pip` or `conda` manager to install `pyspark`.

Please note that:
- __You will have to download `Spark` separately__
- __Might need to link it appropriately__
- __Verify the version installed works with `Spark` we have downloaded__.

> All in all, use the advised methods to save yourself some headaches

In [None]:
# !pip install pyspark

## findspark

As the Spark installation may not be discoverable from the `pyspark` package, one can use the `findspark` utility to connect the components together.

- Install `findspark` package
- Run `findspark.init()` (which will set up necessary environment variables so `pyspark` can connect to JVM)

In [None]:
import findspark

findspark.init()

## Spark config

Given all of the steps above, we can set up `spark` distributed engine via:
- Frontend (`pyspark` in our case) - usable for application specific tasks and varying configuration
- Command line - usable for `spark-submit` and __overriding default values__
- Config file - usable as a base config and __when we submit job to the cluster__
- Global config file

> The list above is also a priority list: the config at a given position overrides values from the ones below it!
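The priority rule can be pictured with a plain-Python sketch (this is not Spark code; the four dictionaries and their values are hypothetical stand-ins for the four configuration layers). A lookup simply takes the first layer that defines the key:

```python
from collections import ChainMap

# Hypothetical settings for each layer, listed from highest to lowest priority
frontend = {"spark.app.name": "TestApp"}
command_line = {"spark.app.name": "CliApp", "spark.executor.memory": "2g"}
config_file = {"spark.executor.memory": "1g", "spark.eventLog.enabled": "false"}
global_config_file = {"spark.eventLog.enabled": "true", "spark.ui.port": "4040"}

# ChainMap returns the value from the first mapping that contains the key
effective = ChainMap(frontend, command_line, config_file, global_config_file)

for key in sorted(effective):
    print(key, "=", effective[key])
```

Each key resolves to the value from the highest layer that sets it, which is the behaviour the priority list describes.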

In [None]:
import multiprocessing

import pyspark

cfg = (
    pyspark.SparkConf()
    # Setting where master node is located [cores for multiprocessing]
    .setMaster(f"local[{multiprocessing.cpu_count()}]")
    # Setting application name
    .setAppName("TestApp")
    # Setting config value via string
    .set("spark.eventLog.enabled", "false")
    # Setting environment variables for executors to use
    .setExecutorEnv(pairs=[("VAR3", "value3"), ("VAR4", "value4")])
    # Setting memory if this setting was not set previously
    .setIfMissing("spark.executor.memory", "1g")
)

# Getting a single variable
print(cfg.get("spark.executor.memory"))
# Listing all of them in string readable format
print(cfg.toDebugString())

# Sessions

> PySpark provides session object __which represents UNIFIED connection to Spark cluster__.

There are a few ways to set it up:
- directly through named/unnamed arguments
- __using `SparkConf` object__ (which we created and will use)
- Providing `SparkContext` with settings (__deprecated, avoid__)

It is used to:
- create `DataFrame`s (__main object containing data within cluster__)
- __broadcast variables to machines within the cluster__
- Run operations across HDFS enabled systems

There are some confusing points though...

## Contexts, Sessions and all of that...

`Spark` and `pyspark` provide a few objects to interact with `Spark` engine:
- `pyspark.SparkContext`
- __Only `scala`__: `org.apache.spark.sql.SQLContext`
- __Only `scala`__: `org.apache.spark.sql.hive.HiveContext`
- `pyspark.sql.SparkSession`

So what's the deal?

### SparkContext

> Object used by driver to communicate with cluster manager, executes and coordinates jobs

This object is always used and provides __generic `spark` capabilities__

### SQLContext

> __Given `SparkContext` interact with `SparkSQL` library__

One __had to__ provide `SparkContext` to this object in order to interact with SQL-like capabilities (e.g. creating `DataFrame`)

### HiveContext

> __Extension of SQLContext providing gateway to Hive__

Hive is similar in structure to SQL but provides capabilities for data warehouse and is better suited for analyzing large scale data

## SparkSession

The differentiation above was pretty unsustainable, so since `v2.0` of Spark "one object to rule them all" was introduced: `SparkSession`.

In `pyspark` one can use it via `pyspark.sql.SparkSession` and it has the following capabilities:
- __wraps functionalities of ALL CONTEXTS above__
- we use a single object to interact with these APIs
- __We should use `builder` attribute to obtain appropriate `SparkSession`__

After `builder` has constructed the session (and the appropriate context underneath), one can use it just like the `context` objects.

Please note:
- We use `config` to specify `SparkSession` configuration (essentially Spark configuration)
- We use `getOrCreate()` which:
    - If no global `Session` exists create a new one with specified config
    - If global `Session` exists:
        - Get an instance of it
        - Apply new configuration to it
    - This approach is safe, as __using multiple contexts is a bad practice!__ (although possible)
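The get-or-create behaviour described above can be sketched in plain Python. `ToySession` is a hypothetical stand-in written for illustration, not a `pyspark` class, and the config keys are only examples:

```python
class ToySession:
    """Hypothetical stand-in for a session object (not part of pyspark)."""

    _active = None  # the single global instance, if one exists

    def __init__(self, config):
        self.config = dict(config)

    @classmethod
    def get_or_create(cls, config):
        if cls._active is None:
            # No global session yet: create one with the given config
            cls._active = cls(config)
        else:
            # Global session exists: reuse it and apply the new settings
            cls._active.config.update(config)
        return cls._active


first = ToySession.get_or_create({"spark.app.name": "TestApp"})
second = ToySession.get_or_create({"spark.sql.shuffle.partitions": "8"})

print(first is second)  # both names point at the same object
print(second.config)
```

The second call does not build a new object; it returns the existing one with the extra setting merged in.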


In [None]:
session = pyspark.sql.SparkSession.builder.config(conf=cfg).getOrCreate()

# Data Structures

Before diving in we need to talk about `3` available data structures in `spark`:
- `RDD` - __R__esilient __D__istributed __D__ataset - fault-tolerant collection of elements that can be operated on in parallel.
- `DataFrame` -  dataset organised into named columns. Conceptually equivalent to a table in a relational database or a data frame in R/Python, __but with richer optimisations under the hood.__
- `Dataset` - distributed collection of data. Provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine

![](./images/rdd_df_dataset_history.png)

# RDD & Core Spark API

> __Core and basic of Spark applications with "low-level" operations__

> __Fault-tolerant collection of elements that can be operated on in parallel.__

This structure provides strong typing (via `JVM` objects) and can be constructed in two ways:
- __parallelizing existing collection__ (e.g. Python's `list`)
- __referencing dataset in external storage__ (anything compatible with Hadoop's InputFormat like HDFS, HBase, Amazon S3, text files etc.)

Let's see these options:

In [None]:
rddDistributedData = session.sparkContext.parallelize([1, 2, 3, 4, 5])
rddDistributedFile = session.sparkContext.textFile("lorem.txt")

__Things to note for files__:
- __When reading from the local filesystem, each file has to be at the same path on each worker node!__ (in our case we are running locally hence this is fine)
- All file-based methods operate on:
    - directories - `textFile("/my/directory")`
    - wildcards - `textFile("/my/directory/*.txt")`
    - compressed files - `textFile("/my/directory/*.gz")`
- We can change the number of partitions created for this file via the `minPartitions` argument of `textFile`
- See API [here](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.textFile.html#pyspark.SparkContext.textFile)

> __Other ways to create RDD from file can be seen in [Spark Context API](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#spark-context-apis), e.g. a way to create it from `pickle`__

## Lazy Evaluation

> Created RDDs __ARE NOT FILES__, they are merely a description of operations that __have to be run at some point__

What we did above means:
- Parallelize `list` operation
- Read from text file `lorem.txt` (__but the read wasn't performed!__)

> All of the operations will be run when we __request an ACTION__

Actions may include:
- return number of lines in file (whole map-reduce went through)
- sum the list and return the result
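A plain-Python analogy of this behaviour, using generators instead of Spark (the function names and the `log` list are hypothetical and only serve the illustration): building the pipeline runs nothing, and the work happens once an "action" consumes it.

```python
log = []  # records which steps were actually executed


def read_lines():
    # Stand-in for reading a file; the body runs only when iterated
    log.append("read")
    yield from ["lorem ipsum", "dolor sit amet"]


def lazy_lengths(lines):
    # Stand-in for a `map` transformation
    for line in lines:
        log.append("map")
        yield len(line)


pipeline = lazy_lengths(read_lines())  # only a description so far
log_before_action = list(log)

total = sum(pipeline)  # the "action": now everything runs
log_after_action = list(log)

print(log_before_action)
print(total, log_after_action)
```

Before the `sum` call the log is empty; afterwards it contains the read and both map steps.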

## Persist

> Persisting is used in order to speed up computations (saving intermediate results in memory)

If we run the line below it means:

> Read data file and cache read contents in the memory (if possible)

> __If we run "action" on the file it will use the cached data (faster) rather than loading data from disk once again!__

Rule of thumb: 

> Use cache when the lineage (operations to run on certain RDD) of your RDD branches out or when an RDD is used multiple times like in a loop.

In [None]:
# `.cache()` returns the RDD itself, transformations return new RDDs
# Either way we can chain operations (we will see it later on)

rddDistributedFile = rddDistributedFile.cache()

> __`.cache()` is the same as `.persist()` with `StorageLevel.MEMORY_ONLY`__
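Why caching pays off when a dataset is used more than once can be shown with a small plain-Python sketch. `ToyDataset` and `load` are hypothetical names invented here; the sketch only counts how often the expensive source is read and does not model Spark's storage levels.

```python
class ToyDataset:
    """Hypothetical stand-in for an RDD backed by an expensive source."""

    def __init__(self, load):
        self._load = load
        self._use_cache = False
        self._cached = None

    def cache(self):
        # Only marks the dataset; data is stored on first use
        self._use_cache = True
        return self

    def _data(self):
        if not self._use_cache:
            return self._load()  # recompute on every action
        if self._cached is None:
            self._cached = self._load()
        return self._cached

    def count(self):
        return len(self._data())

    def total(self):
        return sum(self._data())


reads = {"n": 0}


def load():
    reads["n"] += 1  # count every read of the "file"
    return [1, 2, 3, 4, 5]


uncached = ToyDataset(load)
uncached.count()
uncached.total()
reads_without_cache = reads["n"]

reads["n"] = 0
cached = ToyDataset(load).cache()
cached.count()
cached.total()
reads_with_cache = reads["n"]

print(reads_without_cache, reads_with_cache)
```

Two actions on the uncached dataset read the source twice, while the cached one reads it once and serves the second action from memory.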

There are a few other options to store the data:
- `MEMORY_ONLY` - keep everything we can in memory; partitions that do not fit are not cached and are recomputed when needed
- `MEMORY_AND_DISK` - keep everything we can in memory and spill the rest to disk (__encouraged for long running computations we would like to cache__)
- `DISK_ONLY` - cache everything on disk, nothing in memory (__discouraged__)
- `MEMORY_ONLY_2` - same as `MEMORY_ONLY` but replicates cache on two cluster nodes for improved fault tolerance (`DISK_ONLY_2` is also available)

In [None]:
pyspark.StorageLevel.DISK_ONLY

## MapReduce operations

> Given parallelized data structure we can run map-reduce operations on it

All of them can be seen [in the documentation](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis), a few interesting ones:
- [`rdd.checkpoint()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.checkpoint.html#pyspark.RDD.checkpoint) - marks the RDD for checkpointing: it will be saved in the checkpoint directory and all the operations creating it __are discarded__ (not an action itself, the save happens once an action runs)
- [`rdd.collect()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.collect.html#pyspark.RDD.collect) - __return the structure__ (collect it after operations) (action)
- [`rdd.count()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.count.html#pyspark.RDD.count) - count elements in the structure (action)
- [`rdd.countByKey()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.countByKey.html#pyspark.RDD.countByKey) - count number of elements for each `key` in `(key, value)` pairs (similar to what the graphic before did)
- [`rdd.countByValue()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.countByValue.html#pyspark.RDD.countByValue) - count __how many times each unique value occurs__ in this structure (returned as a dictionary of `(value, count)` pairs)
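The results of the two counting operations can be mimicked locally with `collections.Counter`. This is a plain-Python sketch of the outcome only (the sample `pairs` and `words` are made up), not of how Spark computes it across partitions:

```python
from collections import Counter

pairs = [("a", 1), ("b", 1), ("a", 2)]  # (key, value) records
words = ["x", "y", "x", "x"]            # plain values

# Like countByKey: how many records share each key (values are ignored)
by_key = Counter(key for key, _ in pairs)

# Like countByValue: how many times each distinct element occurs
by_value = Counter(words)

print(dict(by_key))
print(dict(by_value))
```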

__And the essential ones we will use are:__
- [`rdd.map(f)`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.map.html#pyspark.RDD.map) - apply function __to each element in the collection__
- [`rdd.filter(f)`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.filter.html#pyspark.RDD.filter) - __choose values which fulfill `f` function__
- [`rdd.flatMap(f)`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.flatMap.html#pyspark.RDD.flatMap) - __apply function to each element and `flatten` the list if necessary__
- [`rdd.fold(neutralValue, f)`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.fold.html#pyspark.RDD.fold) - __given an associative function (like `add`) and its neutral value (like `0`), combines the elements pairwise until a single result remains__
- [`rdd.sortBy(keyfunction)`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.sortBy.html#pyspark.RDD.sortBy) - sort by specific function which returns some value from the `(key, value)` pair
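To see why `fold` needs a truly neutral value, here is a plain-Python sketch of folding over partitions. `toy_fold` and `toy_flat_map` are hypothetical helpers, and the two-partition split is an assumption made for the example: the starting value is used once per partition and once more when the partial results are merged.

```python
from functools import reduce
import operator


def toy_fold(partitions, zero, op):
    # Fold each partition separately, starting from `zero` every time
    partials = [reduce(op, part, zero) for part in partitions]
    # Merge the partial results, again starting from `zero`
    return reduce(op, partials, zero)


def toy_flat_map(f, items):
    # Apply `f` to each element and flatten the resulting lists
    return [out for item in items for out in f(item)]


partitions = [[1, 2], [3, 4]]

with_neutral = toy_fold(partitions, 0, operator.add)
with_non_neutral = toy_fold(partitions, 1, operator.add)
flattened = toy_flat_map(str.split, ["a b", "c"])

print(with_neutral)      # 10
print(with_non_neutral)  # 13: the extra 1 is added three times
print(flattened)         # ['a', 'b', 'c']
```

With `0` the result is the plain sum; with `1` the result depends on the number of partitions, which is why the value passed to `fold` should be neutral for the function.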

> __PLEASE REFER TO DOCUMENTATION WHEN LOOKING FOR AN OPERATOR! MANY OF THEM ARE ALREADY IMPLEMENTED!__

> __TAKE TIME TO COME UP WITH THE OPERATORS NEEDED! EACH OPERATION SAVED MIGHT IMPROVE RUNTIME TREMENDOUSLY!__

Let's see an example of chaining operations on data:

In [None]:
# sc is standard name for sparkContext
# it will be easier to use from now on

sc = session.sparkContext

In [None]:
import operator

data = list(range(10,-11,-1))
print(data)

result = (
    sc.parallelize(data)
    .filter(lambda val: val % 3 == 0)
    .map(operator.abs)
    .fold(0, operator.add)
)

result

In [None]:
sc.parallelize(["b", "a", "c"]).count()

In [None]:
rddDistributedFile.flatMap(lambda text: text.split()).countByValue()

# Spark SQL

## Dataset and DataFrame

Dataset is a distributed collection of data which provides:
- strong typing and powerful lambda functions from `RDD`
- __allows for Spark SQL optimized execution engine__

It can be created from JVM objects __and manipulated in the same functional manner__.

> __`pyspark` has no Dataset API but many benefits of `Dataset` are available for `DataFrame`s DUE TO PYTHON'S DYNAMIC NATURE__

DataFrame shortcomings included:
- No compile-time safety, hence __you cannot manipulate data of which structure is not specified__

> DataFrame is a Dataset organized into named columns (__same as for `pd.DataFrame`__)

From now on we will use `DataFrame`s (__not `Dataset`, also due to the Python community's familiarity with `pd.DataFrame`__) to keep our records.

See [this discussion](https://stackoverflow.com/questions/31508083/difference-between-dataframe-dataset-and-rdd-in-spark) for an extended description.

## Creating DataFrames

> __For all of the operations we can use `SparkSession` directly to interact with the cluster!__

There are a few options usable for us to read data residing on clusters (__for each node it has to be at the same location if reading from file!__):
- [`session.createDataFrame`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.createDataFrame.html#pyspark.sql.SparkSession.createDataFrame) - create `pyspark.sql.DataFrame` from:
    - `RDD`
    - `list`
    - `pandas.DataFrame`
    - __Optionally: with `schema`__ which specifies datatypes and format for data contained within it. See documentation for more info.
    - By default `schema` is inferred if possible
- [`session.range`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.range.html#pyspark.sql.SparkSession.range) - works like Python's `range` but distributed and returned as a `pyspark.sql.DataFrame`
- [`session.sql(query)`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.sql.html#pyspark.sql.SparkSession.sql) - __return DataFrame which represents result of `sql` query__
- [`session.read.{how_to_read}()`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame) - __`session.read` is a `DataFrameReader` object__ which allows us to read `df` from:
    - `json`
    - `parquet`
    - `csv`
    - and many more
- [`session.readStream`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.SparkSession.readStream.html#pyspark.sql.SparkSession.readStream) - __used for streaming, we will see it a little later__

Let's see some code with `pyspark.sql.DataFrame`:

In [None]:
import numpy as np
import pandas as pd

df = session.createDataFrame(
    pd.DataFrame(
        np.random.randint(0, 100, size=(100, 4)),
        columns=list("ABCD"),
    )
)

df.show()

In [None]:
df.printSchema()

In [None]:
# `show` is an action: without it `select` only describes
# what will happen and no data is returned
df.select("A").show()

In [None]:
df.select(df["A"], df["B"] + 1)

In [None]:
# Increase column value by one
# This operation is shown in the output

df.select(df["A"], df["B"] + 1).show()

In [None]:
counted = df.groupby("B").count().persist()
counted.filter(counted["count"] > 1).show()

## Operations on DataFrame

> __`pyspark.sql.DataFrame` supports most of the `pd.DataFrame` operations + the RDD ones__

You can see the whole list [here](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.html#pyspark.sql.DataFrame)

> __In general one can work with it similarly to how one works with `pd.DataFrame` objects__

There are a few exceptions though...

## Running SQL queries

> In order to run SQL queries against a DataFrame __we have to register it as a temporary view__

Properties of temporary views:
- __Session scoped__ - when the session that created them terminates, the views disappear with it
- One can share a `DataFrame` across all `SparkSession`s of the application by using `df.createGlobalTempView("name_of_view")`; such a view is then queried as `global_temp.name_of_view`

After that, we can run SQL queries against __distributed data across nodes__:

In [None]:
df.createOrReplaceTempView("any_name")

# WE USE SESSION TO RUN QUERIES!
sqlDf = session.sql("SELECT * FROM any_name")
sqlDf.show()

# Challenges

## Assessment

- Check out [`rdd.aggregate`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.aggregate.html#pyspark.RDD.aggregate) method for RDDs.
- What is the difference between `foreach` and `map`? Check [this StackOverflow answer](https://stackoverflow.com/questions/354909/is-there-a-difference-between-foreach-and-map) if in doubt
- What is the difference between `reduce` and `fold`? Check [this StackOverflow answer](https://stackoverflow.com/a/36060141/10886420). Which one is "safer" to use?
- Which operations on RDDs induce `shuffle` and why is it a problem? See [here](https://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations) for more info
- Check how to use [Hive](https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html) with PySpark. What is Hive and how does it differ from SQL?
- Check out how to specify schema programmatically (presented [in this tutorial](https://spark.apache.org/docs/latest/sql-getting-started.html#programmatically-specifying-the-schema)). What are the upsides/downsides of using it?

## Non-assessment

- Read more about multiple `SparkContext`s and `SparkSession`s and why we would need them in some... contexts. Check it [over here](https://www.waitingforcode.com/apache-spark-sql/multiple-sparksession-one-sparkcontext/read)
- What is [`rdd.meanApprox`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.meanApprox.html#pyspark.RDD.meanApprox) and why might we need it?
- Generally discouraged, but what are the options to share data between tasks and nodes in the cluster? Check out [this part of RDD tutorial](https://spark.apache.org/docs/latest/rdd-programming-guide.html#shared-variables)
- Check [performance tuning options for `spark.sql`](https://spark.apache.org/docs/latest/sql-performance-tuning.html). One can use them when creating `pyspark.SparkConf()` object