<a href="https://colab.research.google.com/github/Davz33/tutorials/blob/data/apache_spark_pyspark_with_Docker_from_jupyter.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Run a stand-alone Apache Spark instance in a Docker container with PySpark, from a Jupyter notebook running elsewhere

> Note: I haven't tested this on Google Colab notebooks, only on my local JupyterLab instance.

Copyright 2023 Davide Vitiello
> davide_vitiello@outlook.com 

> github.com/davz33

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

In [None]:
!python --version
!pip --version

Python 3.10.9
pip 22.3.1 from /opt/conda/envs/tf/lib/python3.10/site-packages/pip (python 3.10)


## Preliminary checks

Here I check that I'm in the right conda env (or virtualenv, by extension):  
I look for the `tensorflow` packages, which I know are installed there, and print the name of the active conda env.

In [None]:
!pip freeze | grep tensorflow
!echo $CONDA_DEFAULT_ENV

tensorflow-estimator==2.10.0
tensorflow-gpu @ file:///mnt/d/Davide/Dati/Download/tensorflow_gpu-2.10.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl
tensorflow-io-gcs-filesystem==0.30.0
tf
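The same checks can be done from Python without shelling out. This is a minimal sketch using only the standard library: `importlib.util.find_spec` reports whether a module is importable in the running kernel, and `CONDA_DEFAULT_ENV` is the variable echoed above (it is unset outside conda). The helper name `check_env` is hypothetical.

```python
import importlib.util
import os
import sys


def check_env(required_modules, expected_env=None):
    """Report which modules are importable and which conda env (if any) is active.

    check_env is a hypothetical helper, not part of any library.
    """
    # find_spec returns None when a top-level module cannot be found on sys.path
    missing = [m for m in required_modules if importlib.util.find_spec(m) is None]
    env_name = os.environ.get("CONDA_DEFAULT_ENV")  # None outside a conda env
    env_ok = expected_env is None or env_name == expected_env
    return {
        "python": sys.executable,
        "conda_env": env_name,
        "missing": missing,
        "ok": not missing and env_ok,
    }


# In this notebook one would call: check_env(["tensorflow"], expected_env="tf")
```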


## Pulling and running the Apache Spark (PySpark) Docker container

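I create a new `docker-compose.yml` with the content below. It bind-mounts the host folder `./pycode` read-only at `/var/code` inside the container; since later cells write scripts to `pyspark_submits/` and submit them as `/var/code/pyspark_submits/<file>.py`, the notebook must run from that `pycode` folder.  
> Note: I use a `.env` file with autoenv, so I tried to replace Compose's default `.env` loading with a dummy env file. You don't need to do this if your `.env` is meant to be read by Docker, or if you don't have one.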
```yaml
# Run via: docker compose --env-file dummy.env up -d
version: '3'
services:
  pyspark:
    env_file:
      # Avoid loading .env (used by autoenv). Note: with Compose v2 this alone does not work;
      # Compose must also be run with the --env-file flag (e.g. --env-file=/dev/null).
      - dummy.env
    image: apache/spark-py:v3.2.3
    volumes:
      - ./pycode:/var/code:ro
    entrypoint: /bin/sh
    stdin_open: true 
    tty: true
```
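The Compose CLI reference lists `--env-file` (like `-f`) as an option of `docker compose` itself, so this sketch places it before the `up` subcommand. A minimal sketch, assuming Docker Compose v2 and the file names used above: it creates the empty `dummy.env` if missing and builds the command as an argument list for `subprocess.run`. `compose_up_command` is a hypothetical helper.

```python
from pathlib import Path


def compose_up_command(project_dir=".", env_file="dummy.env", detach=True):
    """Build a `docker compose up` argument list that reads env_file instead of .env.

    Hypothetical helper: it creates an empty env_file if needed and returns the
    command without running anything.
    """
    project = Path(project_dir)
    env_path = project / env_file
    env_path.touch(exist_ok=True)  # an empty env file defines no variables
    cmd = [
        "docker", "compose",
        "-f", str(project / "docker-compose.yml"),
        "--env-file", str(env_path),  # global option, placed before "up"
        "up",
    ]
    if detach:
        cmd.append("-d")
    return cmd


# Usage:
#   import subprocess
#   subprocess.run(compose_up_command(), check=True)
```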

In [None]:
!docker ps | grep spark

7e45fa215276   apache/spark-py:v3.2.3   "/bin/sh"                11 hours ago   Up 11 hours                              apache_spark-pyspark-1
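`docker ps | grep spark` matches any line mentioning "spark", including the image name. A slightly stricter check, sketched under the assumption that the container is named `apache_spark-pyspark-1` as in the output above, asks Docker for container names only (`docker ps --format '{{.Names}}'`) and compares them exactly. `parse_names` and `container_is_running` are hypothetical helpers.

```python
import subprocess


def parse_names(ps_output):
    """Split `docker ps --format '{{.Names}}'` output into a set of names."""
    return {line.strip() for line in ps_output.splitlines() if line.strip()}


def container_is_running(name="apache_spark-pyspark-1"):
    """Return True if a running container has exactly this name."""
    # docker ps lists running containers only (no -a flag)
    out = subprocess.run(
        ["docker", "ps", "--format", "{{.Names}}"],
        capture_output=True, text=True, check=True,
    ).stdout
    return name in parse_names(out)
```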


In [None]:
!docker exec --help

## Invoking PySpark inside the Docker container from a Jupyter notebook running elsewhere

In [None]:
!mkdir -p pyspark_submits  # ./pycode/pyspark_submits on the host = /var/code/pyspark_submits in the container
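Because of the `./pycode:/var/code:ro` bind mount, a file the notebook writes under `pycode/` on the host is visible, read-only, at the same relative path under `/var/code` inside the container. A minimal sketch of that translation, assuming the notebook's working directory is the host's `pycode` folder; `to_container_path` is a hypothetical helper.

```python
import posixpath
from pathlib import PurePath


def to_container_path(host_rel_path, mount_point="/var/code"):
    """Map a path relative to the bind-mounted host folder to its path in the container."""
    rel = PurePath(host_rel_path)
    if rel.is_absolute() or ".." in rel.parts:
        raise ValueError("expected a path inside the mounted folder, got %r" % host_rel_path)
    # The container uses POSIX paths regardless of the host OS
    return posixpath.join(mount_point, rel.as_posix())
```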

In [None]:
!echo "print(1+1)" > pyspark_submits/test_pyspark_submit.py

In [None]:
!docker exec apache_spark-pyspark-1 /opt/spark/bin/spark-submit \
    --conf spark.jars.ivy=/tmp/.ivy \
    /var/code/pyspark_submits/test_pyspark_submit.py \
    > spark_res.txt 2>&1  # redirect stdout and stderr to a file (POSIX form; `&>` is bash-only)
!grep -v 'WARN' spark_res.txt  # skip warnings ('WARN' also matches 'WARNING')

2
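The cell above can also be written without IPython's `!` syntax. A minimal sketch with `subprocess`, assuming the same container name, paths and `spark-submit` options: it captures stdout and stderr together (like `2>&1`) and drops lines containing `WARN` (like the `grep` filter). `spark_submit_cmd`, `drop_warnings` and `run_submit` are hypothetical names.

```python
import subprocess


def spark_submit_cmd(script, container="apache_spark-pyspark-1", ivy_dir="/tmp/.ivy"):
    """Argument list for running spark-submit inside the container via docker exec."""
    return [
        "docker", "exec", container,
        "/opt/spark/bin/spark-submit",
        "--conf", f"spark.jars.ivy={ivy_dir}",  # same Ivy setting as the cell above
        script,
    ]


def drop_warnings(text):
    """Keep only lines that do not contain 'WARN' (this also covers 'WARNING')."""
    return [line for line in text.splitlines() if "WARN" not in line]


def run_submit(script, **kwargs):
    """Run spark-submit in the container and return its non-warning output lines."""
    proc = subprocess.run(
        spark_submit_cmd(script, **kwargs),
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
    )
    return drop_warnings(proc.stdout)


# Equivalent of the cell above:
#   run_submit("/var/code/pyspark_submits/test_pyspark_submit.py")
```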


## Setting up a helper to submit a PySpark statement with ease

In [None]:
SPARK_submits = !ls | grep submits
print(SPARK_submits)

['pyspark_submits']
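`!ls | grep submits` returns an IPython `SList` of matching names. A pure-Python equivalent, sketched with `pathlib` under the assumption that the submissions folder sits in the current working directory, lists only directories whose name contains `submits`, sorted for a stable order. `find_submit_dirs` is a hypothetical name.

```python
from pathlib import Path


def find_submit_dirs(root="."):
    """Names of sub-directories of root whose name contains 'submits'."""
    return sorted(p.name for p in Path(root).iterdir() if p.is_dir() and "submits" in p.name)


# SPARK_submits = find_submit_dirs()  # with the folder created above: ['pyspark_submits']
```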


In [None]:
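import functools
import os
import time

def time_decorator(function):
    # With benchmark=True, time the call with %timeit instead of running it once
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        if kwargs.get('benchmark'):
            # %timeit prints "mean ± std. dev."; without the -o flag it returns None,
            # so a benchmarked call returns None (use `%timeit -o` to get a TimeitResult)
            res = %timeit function(*args, **kwargs)
        else:
            res = function(*args, **kwargs)
        return res
    return wrapper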
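`%timeit` only works where IPython magics are available. Outside a notebook, a similar decorator can be sketched with the standard library's `timeit.repeat`; this is a swapped-in technique, not the notebook's own. It runs the call `repeat` times with one call per measurement (a spark-submit takes seconds), then returns the last result together with the mean and population standard deviation of the timings. The name `benchmark_decorator` and its `repeat` parameter are hypothetical.

```python
import functools
import statistics
import timeit


def benchmark_decorator(repeat=7):
    """If called with benchmark=True, time the call `repeat` times and return (result, mean_s, stdev_s)."""
    def decorate(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            if not kwargs.get("benchmark"):
                return function(*args, **kwargs)
            results = []
            # number=1: each measurement is a single call ("1 loop each")
            timings = timeit.repeat(
                lambda: results.append(function(*args, **kwargs)),
                repeat=repeat, number=1,
            )
            return results[-1], statistics.mean(timings), statistics.pstdev(timings)
        return wrapper
    return decorate
```

Unlike the `%timeit` version, the caller gets both the output and the timings back, at the cost of running the job `repeat` times.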

In [None]:
@time_decorator
def spksub(statement, dump=False, nohup_installed=True, print_warns=False, benchmark=False) -> list:
    # Write the statement to a uniquely named script in the shared submits folder
    now_ = str(time.time()).replace('.', '_')
    path_prefix = os.path.join(SPARK_submits[0], now_)
    submit_job = path_prefix + '.py'
    with open(submit_job, 'w') as f:
        f.write(statement)

    res_f = path_prefix + '.out'  # output file on the JupyterLab host
    mapped_sjob = os.path.join('/var/code/', submit_job)  # the same script, as seen inside the container

    # Run spark-submit in the container; stdout and stderr go to res_f (POSIX redirection)
    !docker exec apache_spark-pyspark-1 \
    /opt/spark/bin/spark-submit \
    --conf spark.jars.ivy=/tmp/.ivy \
    {mapped_sjob} \
    > {res_f} 2>&1

    # Read the output *before* deleting the file
    if not print_warns:
        res = !grep -v 'WARN' {res_f}
    else:
        res = !cat {res_f}

    # Delete the output file unless dump=True (`&;` in the old version was a shell syntax error)
    if not dump:
        if nohup_installed:
            !nohup rm -f {res_f} > /dev/null 2>&1 &
        else:
            !rm -f {res_f}
    return res
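For use outside IPython, for example from a plain Python script, the same idea can be sketched with `subprocess` instead of `!` commands. This is a hypothetical variant, not the notebook's function: `spksub_plain`, its parameters, and the injectable `run` argument (which lets the sketch be tried without Docker) are assumptions; the container name, mount point and `spark-submit` options are the ones used in `spksub` above. The output is captured in memory, so no `.out` file is needed.

```python
import os
import posixpath
import subprocess
import time


def spksub_plain(statement, submits_dir="pyspark_submits", container="apache_spark-pyspark-1",
                 mount_point="/var/code", keep_script=False, print_warns=False,
                 run=subprocess.run):
    """Write `statement` to a script, spark-submit it inside the container, return output lines."""
    stamp = str(time.time()).replace(".", "_")
    host_script = os.path.join(submits_dir, stamp + ".py")
    # Same file seen through the ./pycode:/var/code bind mount
    # (assumes submits_dir is a relative POSIX-style path)
    container_script = posixpath.join(mount_point, submits_dir, stamp + ".py")
    with open(host_script, "w") as f:
        f.write(statement)
    try:
        proc = run(
            ["docker", "exec", container, "/opt/spark/bin/spark-submit",
             "--conf", "spark.jars.ivy=/tmp/.ivy", container_script],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
        )
    finally:
        if not keep_script:
            os.remove(host_script)  # safe: output is already in proc.stdout
    lines = proc.stdout.splitlines()
    if not print_warns:
        lines = [line for line in lines if "WARN" not in line]
    return lines
```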


In [None]:
spksub('print(1+1)')

['2']
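`spksub` accepts any Python source, so it can also submit an actual PySpark job. A minimal sketch: the job string below is a hypothetical example, not one from the original notebook. It creates a `SparkSession` and counts a five-row range, so `5` should appear among the returned lines; Spark may also log extra non-`WARN` lines, which the filter keeps. Since the script runs inside the container, `pyspark` only needs to exist there; the `compile` call just checks the syntax locally before paying the `spark-submit` start-up cost.

```python
import textwrap

# A small PySpark job, kept as a string so it can be passed to spksub(...)
job = textwrap.dedent("""
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("spksub-demo").getOrCreate()
    print(spark.range(5).count())  # range(5) holds the rows 0..4
    spark.stop()
""")

# Raises SyntaxError locally instead of failing inside the container
compile(job, "<spark-job>", "exec")

# In the notebook: spksub(job)
```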

In [None]:
spksub('print(1+1)', benchmark = True)

2.17 s ± 96.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
