# SparkSession

## SparkSession with Python dependencies

> + Resource Manager is the built-in "Standalone" cluster manager  
> + Deploy Mode is "client" (the only mode possible with an interactive session)

Starting a SparkSession with Python dependencies sent via the `--archives` flag; see [Python Package Management](https://spark.apache.org/docs/latest/api/python/user_guide/python_packaging.html).

Equivalent submit command: 

```bash
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python

spark-submit --deploy-mode client --master spark://spark-master:7077 \
    --archives /app/jobs/pyspark_venv.tar.gz#environment \
    <pyspark-job.py>
```
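The `#environment` suffix follows Spark's `file#directory` convention for archives: the archive is unpacked into a directory with that name in each executor's working directory, which is why `PYSPARK_PYTHON` can be the relative path `./environment/bin/python`. A minimal sketch of how such a spec splits; `split_archive_spec` and `worker_python` are hypothetical helpers for illustration, not part of PySpark, and the sketch handles a single archive rather than a comma-separated list:

```python
from typing import Optional, Tuple


def split_archive_spec(spec: str) -> Tuple[str, Optional[str]]:
    """Split 'path/to/archive.tar.gz#alias' into (path, alias).

    Hypothetical helper: the alias (after '#') names the directory the archive
    is unpacked into; None means no alias was given.
    """
    path, sep, alias = spec.partition("#")
    return path, (alias if sep else None)


def worker_python(alias: str) -> str:
    """Relative interpreter path inside the unpacked venv, as used for PYSPARK_PYTHON."""
    return f"./{alias}/bin/python"


path, alias = split_archive_spec("/app/jobs/pyspark_venv.tar.gz#environment")
print(path)                  # /app/jobs/pyspark_venv.tar.gz
print(worker_python(alias))  # ./environment/bin/python
```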

+ N.B. the following builder configs did not work as a replacement for setting `os.environ`:

```python
    .config("spark.archives", "/app/jobs/pyspark_venv.tar.gz#environment")\
    .config("spark.pyspark.python", "./environment/bin/python")\
```

```python
import os
from pyspark.sql import SparkSession

# Set the Python path for the workers, pointing to the extracted archive under the SparkSession's working directory,
# e.g. the full path on a worker looks like: /opt/spark/work/app-20211009135519-0000/1/./environment
os.environ['PYSPARK_PYTHON'] = "./environment/bin/python"

spark = SparkSession.builder\
    .appName("pyspark-notebook-dep")\
    .master("spark://spark-master:7077")\
    .config("spark.archives", "/app/jobs/pyspark_venv.tar.gz#environment")\
    .getOrCreate()
# spark.archives adds the file from this machine (the Jupyter server) to the workers;
# Spark extracts it into the subdirectory "environment" of the SparkSession's working directory.
# Note: this runs in client mode, so the driver must have the same dependencies installed
# in its running Python environment.
spark
```

```
21/10/10 08:21:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
```
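Because `PYSPARK_PYTHON` is a relative path, each executor resolves it against its own working directory. A small sketch with `posixpath`, using the example working directory from the comment in the cell above (that layout is specific to this standalone setup, and `resolve_on_worker` is a hypothetical helper):

```python
import posixpath


def resolve_on_worker(work_dir: str, relative_python: str) -> str:
    """Resolve a relative PYSPARK_PYTHON path against an executor's working directory.

    normpath collapses the './' segment that appears in the raw joined path.
    """
    return posixpath.normpath(posixpath.join(work_dir, relative_python))


work_dir = "/opt/spark/work/app-20211009135519-0000/1"  # example path from the notebook comment
print(resolve_on_worker(work_dir, "./environment/bin/python"))
# /opt/spark/work/app-20211009135519-0000/1/environment/bin/python
```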


```python
# requires the numpy package
from pyspark.mllib.random import RandomRDDs

x = RandomRDDs.normalVectorRDD(
    spark.sparkContext,
    numRows=10000,
    numCols=5,
    numPartitions=20,
    seed=42
)
x.collect()[:3]
```

```
[array([-0.75661355, -0.83595055, -0.54290339,  0.83210849, -0.78727577]),
 array([ 0.95666722, -1.34126376, -0.68323051, -1.15742816, -0.03667599]),
 array([-0.66918965, -0.54477455, -0.34275965, -0.46614391, -1.07408784])]
```
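To check that the executors really use the shipped environment (and that numpy is importable there), one option is to run a small probe inside tasks and collect the results. `env_report` is a hypothetical helper, a sketch that relies only on the standard library on the worker side and expects top-level module names:

```python
import importlib.util
import sys
from typing import Dict, Iterable


def env_report(modules: Iterable[str]) -> Dict[str, object]:
    """Report the running interpreter and whether each top-level module is importable.

    Meant to run inside a Spark task so that it describes the executor's environment.
    """
    return {
        "python": sys.executable,
        "modules": {name: importlib.util.find_spec(name) is not None for name in modules},
    }


# Usage from the notebook (assumes the `spark` session created above is still running):
# spark.sparkContext.parallelize(range(4), 4).map(lambda _: env_report(["numpy"])).collect()
print(env_report(["json"]))
```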

```python
spark.stop()
```

## SparkSession with Java packages

> + Add Java packages from a Maven repository, e.g. a JDBC driver

+ [Advanced Dependency Management](http://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management)
+ [JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)
+ [PostgreSQL JDBC Driver](https://mvnrepository.com/artifact/org.postgresql/postgresql)
+ [Connecting to the Database](https://jdbc.postgresql.org/documentation/head/connect.html)
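With the PostgreSQL driver on the classpath, a JDBC read needs a URL of the form `jdbc:postgresql://host:port/database` plus a few options of Spark's JDBC data source. A sketch that assembles them; `postgres_jdbc_options` is a hypothetical helper, and the host, database, table and credentials below are placeholders, not values from this setup:

```python
from typing import Dict


def postgres_jdbc_options(host: str, database: str, table: str,
                          user: str, password: str, port: int = 5432) -> Dict[str, str]:
    """Build the option dict for Spark's JDBC data source against PostgreSQL."""
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,
        "password": password,
        "driver": "org.postgresql.Driver",  # driver class from org.postgresql:postgresql
    }


opts = postgres_jdbc_options("postgres", "mydb", "public.my_table", "user", "secret")  # placeholders
# With a session created as in the cell below:
# df = spark.read.format("jdbc").options(**opts).load()
```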


Equivalent submit command: 

```bash
spark-submit --deploy-mode client --master spark://spark-master:7077 \
    --packages "org.postgresql:postgresql:42.2.24" \
    <pyspark-job.py>
```

```python
from pyspark.sql import SparkSession

# for multiple packages, pass a comma-separated list to spark.jars.packages
spark = SparkSession.builder\
    .appName("pyspark-notebook-java-dep")\
    .master("spark://spark-master:7077")\
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.24")\
    .getOrCreate()
spark
```
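`spark.jars.packages` (like `--packages`) takes Maven coordinates in `groupId:artifactId:version` form, comma-separated. A sketch of a hypothetical helper that validates and joins them; the strict three-part pattern is a simplifying assumption of this sketch, and the second coordinate in the test is taken from the resolution log only as an illustration:

```python
import re
from typing import Iterable

# groupId:artifactId:version, each part free of ':', ',' and whitespace (simplifying assumption)
_COORD = re.compile(r"^[^:,\s]+:[^:,\s]+:[^:,\s]+$")


def jars_packages(coords: Iterable[str]) -> str:
    """Join Maven coordinates into the comma-separated value for spark.jars.packages."""
    coord_list = list(coords)
    bad = [c for c in coord_list if not _COORD.match(c)]
    if bad:
        raise ValueError(f"not groupId:artifactId:version: {bad}")
    return ",".join(coord_list)


value = jars_packages(["org.postgresql:postgresql:42.2.24"])
# then: SparkSession.builder ... .config("spark.jars.packages", value)
```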

```
Ivy Default Cache set to: /home/yuan/.ivy2/cache
The jars for the packages stored in: /home/yuan/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dc59db93-2336-4583-b29e-841d0885fe12;1.0
	confs: [default]
:: loading settings :: url = jar:file:/home/yuan/.local/lib/python3.8/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
	found org.postgresql#postgresql;42.2.24 in central
	found org.checkerframework#checker-qual;3.5.0 in central
downloading https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.24/postgresql-42.2.24.jar ...
	[SUCCESSFUL ] org.postgresql#postgresql;42.2.24!postgresql.jar (267ms)
downloading https://repo1.maven.org/maven2/org/checkerframework/checker-qual/3.5.0/checker-qual-3.5.0.jar ...
	[SUCCESSFUL ] org.checkerframework#checker-qual;3.5.0!checker-qual.jar (52ms)
:: resolution report :: resolve 620ms :: artifacts dl 321ms
	:: modules in use:
	org.checkerframework#checker-qual;3.5.0 from central in [default]
	org.postgresql#postgresql;42.2.24 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   2
```

```python
print(spark.sparkContext._jsc.sc().listJars())
```

```
Vector(spark://1556bf03aab8:41119/jars/org.postgresql_postgresql-42.2.24.jar, spark://1556bf03aab8:41119/jars/org.checkerframework_checker-qual-3.5.0.jar)
```
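The listing above shows each resolved package as `<groupId>_<artifactId>-<version>.jar`. Based only on that observed naming, a sketch that checks whether a coordinate shows up in the text printed by `listJars()`; `expected_jar_name` and `is_loaded` are hypothetical helpers, and the naming rule may not hold for every package or Spark version:

```python
def expected_jar_name(coord: str) -> str:
    """Map 'groupId:artifactId:version' to the jar name seen in the listJars() output."""
    group, artifact, version = coord.split(":")
    return f"{group}_{artifact}-{version}.jar"


def is_loaded(coord: str, list_jars_output: str) -> bool:
    """True if the jar for `coord` appears in the text of listJars()."""
    return ("/" + expected_jar_name(coord)) in list_jars_output


# Usage (assumes a running `spark` session):
# listing = str(spark.sparkContext._jsc.sc().listJars())
# is_loaded("org.postgresql:postgresql:42.2.24", listing)
```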


## SparkSession with files (not working)

> + Send files to workers via SparkFiles

+ [pyspark.SparkContext.addFile](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.addFile.html)
+ [Class SparkContext](https://spark.apache.org/docs/latest/api/java/org/apache/spark/SparkContext.html)
+ [Help with Simple pyspark example on Dataproc](https://groups.google.com/g/cloud-dataproc-discuss/c/cubkWrjkk2g)

TODO: 
+ try with Scala or `--files` flag

Notes: 

1. `spark.sparkContext.addFile` only copies the given file to a temporary path on the driver (here, this Jupyter instance). That path can be found with `SparkFiles.get(<filename>)`, and its directory with `SparkFiles.getRootDirectory()`. When an action is performed, the file is also copied into each worker's working directory, as shown in the _stderr_ of the worker logs.
2. The problem is how the path is interpreted. On the driver it is fine, but on the workers a driver-side path taken literally does not exist, so the file is not found, because the actual file sits in the worker's working directory. The example from the documentation does work, but there the call to `SparkFiles.get(<filename>)` is executed on the workers, so I think it returns the correct worker-side path to the added file. More details are given in the third link above.
3. For now, data files should be added through the mounted data directory.
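Following note 2, the pattern that works in the documentation is to call `SparkFiles.get` inside the function that runs on the workers, not on the driver. A sketch, not verified against this cluster: `read_added_file` and its `resolve` parameter are hypothetical (the parameter only exists so the function can be exercised without Spark), and the file path in the usage comment is a placeholder.

```python
from typing import Callable, List, Optional


def read_added_file(filename: str,
                    resolve: Optional[Callable[[str], str]] = None) -> List[str]:
    """Read a file distributed with SparkContext.addFile, resolving its path where this runs.

    Call it inside a task (e.g. in rdd.map) so that SparkFiles.get resolves against the
    executor's working directory rather than the driver's temporary directory.
    `resolve` is a hypothetical hook for testing; it defaults to pyspark's SparkFiles.get.
    """
    if resolve is None:
        from pyspark import SparkFiles  # imported lazily, in the process that runs the task
        resolve = SparkFiles.get
    with open(resolve(filename)) as f:
        return f.read().splitlines()


# Usage sketch (placeholder path; assumes a running `spark` session):
# spark.sparkContext.addFile("/app/data/example.txt")
# spark.sparkContext.parallelize(range(2), 2).map(lambda _: read_added_file("example.txt")).collect()
```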




## SparkSession

> + Resource Manager is the built-in "Standalone" cluster manager  
> + Deploy Mode is "client" (the only mode possible with an interactive session)

The Python dependencies must already be installed on all workers (and on the Jupyter server).

## SparkSession local mode

> + The scheduler and executors all run in the same JVM, i.e. this Jupyter server instance

If the dependencies are already installed on the Jupyter server, then it will work.