# LivySession

<img src='livy.png' width='60%'>

## Livy REST API

Endpoints:
* `/batches`: To submit batch files that are already on HDFS.
* `/sessions`: Interactive execution.
    * `/sessions/<id>/statements`: Used by Sparkmagic. Sends strings of code.
    * `/sessions/<id>/jobs`: Undocumented endpoint to submit pickled functions interactively.
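
As a rough sketch of how the two documented session endpoints fit together, the snippet below builds (but does not send) the HTTP requests for creating a session and posting a statement. It uses only the standard library. The helper names `new_session_request` and `statement_request` are hypothetical, the host is a placeholder, and the request bodies (`kind`, `conf`, `code`) follow the public Livy REST documentation rather than anything in `LivySession`. The `/jobs` endpoint is left out because its payload format is undocumented.

```python
import json
from urllib.request import Request

HEADERS = {"Content-Type": "application/json"}


def new_session_request(host, conf=None):
    """Build a POST /sessions request that asks for a PySpark session."""
    body = {"kind": "pyspark"}
    if conf:
        body["conf"] = conf  # Spark configuration properties
    return Request(f"{host}/sessions", data=json.dumps(body).encode(),
                   headers=HEADERS, method="POST")


def statement_request(host, session_id, code):
    """Build a POST /sessions/<id>/statements request carrying a code string."""
    url = f"{host}/sessions/{session_id}/statements"
    return Request(url, data=json.dumps({"code": code}).encode(),
                   headers=HEADERS, method="POST")


# Placeholder host and session id, for illustration only.
req = statement_request("http://localhost:8998", 0, "1 + 1")
print(req.get_method(), req.full_url)
```

Passing either request to `urllib.request.urlopen` would send it. A Kerberized cluster would also need authentication on each request, which this sketch does not cover.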

## Rebuilt Python API

Lightweight Python module called `LivySession`. Currently a PR against Eric's `AnacondaPlatform/livy-submit`.

Supports both `statements` and `jobs`. Works in both scripts and Notebooks.

## Set up the session

In [1]:
from livy_submit import kinit_username, LivySession

In [2]:
kinit_username('instructor', 'anaconda')

pypath = '/opt/anaconda3/bin/python'
config = {
    "spark.yarn.appMasterEnv.PYSPARK_PYTHON": pypath,
    "spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON": pypath,
    "spark.yarn.executorEnv.PYSPARK_PYTHON": pypath,
    "spark.pyspark.python": pypath,
    "spark.pyspark.driver.python": pypath,
}


host = 'http://livy.training.anaconda.com:8998'
pyspark = LivySession(host, conf={"conf": config})

Ticket cache: FILE:/tmp/krb5cc_501
Default principal: instructor@TRAINING.ANACONDA.COM

Valid starting     Expires            Service principal
07/19/19 10:57:46  07/19/19 20:57:46  krbtgt/TRAINING.ANACONDA.COM@TRAINING.ANACONDA.COM
	renew until 07/20/19 10:57:46

Starting session 153


## Execute code

Preparing code as a string is awkward. Sparkmagic normally does this hard work for you.

In [3]:
code = '''x = [
    {'a': 1, 'b': 2},
    2/3
]
%json x'''

x = pyspark.execute(code)
x

Waiting for http://livy.training.anaconda.com:8998/sessions/153
Waiting for http://livy.training.anaconda.com:8998/sessions/153/statements/0


{'id': 0,
 'code': "x = [\n    {'a': 1, 'b': 2},\n    2/3\n]\n%json x",
 'state': 'available',
 'output': {'status': 'ok',
  'execution_count': 0,
  'data': {'application/json': [{'a': 1, 'b': 2}, 0.6666666666666666]}},
 'progress': 1.0}

In [4]:
x['output']['data']['application/json']

[{'a': 1, 'b': 2}, 0.6666666666666666]
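
The "Waiting for ..." lines suggest the client polls the statement until it finishes, and the cell above then digs the payload out by hand. A minimal sketch of both steps follows. It assumes the statement states listed in the Livy REST documentation (`waiting`, `running`, `available`, ...) and an `evalue` field on failed output. `wait_for_statement`, `unwrap_output` and the `fetch` callable are hypothetical names, not part of `LivySession`, and the server is simulated with canned replies.

```python
import time


def wait_for_statement(fetch, interval=1.0, timeout=60.0):
    """Call fetch() until the statement leaves the 'waiting'/'running' states."""
    deadline = time.monotonic() + timeout
    while True:
        statement = fetch()
        if statement["state"] not in ("waiting", "running"):
            return statement
        if time.monotonic() >= deadline:
            raise TimeoutError("statement did not finish in time")
        time.sleep(interval)


def unwrap_output(statement, mime="application/json"):
    """Return the payload of a finished statement, or raise if it failed."""
    output = statement.get("output") or {}
    if statement["state"] != "available" or output.get("status") != "ok":
        reason = output.get("evalue", statement["state"])  # assumed error field
        raise RuntimeError(f"statement failed: {reason}")
    return output["data"][mime]


# Simulated server: the first reply is still running, the second is finished.
replies = iter([
    {"id": 0, "state": "running", "output": None},
    {"id": 0, "state": "available",
     "output": {"status": "ok", "execution_count": 0,
                "data": {"application/json": [{"a": 1, "b": 2}, 2 / 3]}}},
])
result = unwrap_output(wait_for_statement(lambda: next(replies), interval=0.0))
print(result)
```

In real use, `fetch` would be a function that GETs the statement URL and returns the decoded JSON.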

## Submit jobs

The decorator does the following on run:
1. cloudpickle the function with the supplied arguments (including any functions it references)
1. submit job and wait
     * Spark pickles all returned values
1. unpickle values
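
The three steps above can be simulated locally. This is only a sketch of the round trip, not the `LivySession` implementation: `FakeSession` is a hypothetical stand-in, nothing leaves the process, and the standard-library `pickle` replaces cloudpickle. Because `pickle` cannot serialize a function by value, only the arguments and the return value are serialized here, whereas the real decorator also ships the function itself.

```python
import functools
import pickle


class FakeSession:
    """Local stand-in for a session; nothing leaves this process."""

    def _run_remote(self, func, payload):
        # "Remote" side: rebuild the arguments, run, pickle the return value.
        args, kwargs = pickle.loads(payload)
        return pickle.dumps(func(*args, **kwargs))

    def __call__(self, func):
        # Using the session object as a decorator returns this wrapper.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            payload = pickle.dumps((args, kwargs))   # 1. serialize the call
            reply = self._run_remote(func, payload)  # 2. submit and wait
            return pickle.loads(reply)               # 3. unpickle the result
        return wrapper


session = FakeSession()


@session
def total(values, scale=1):
    return scale * sum(values)


print(total([1, 2, 3], scale=2))  # 12
```

Making the session object callable is what allows the bare `@pyspark` decorator syntax used in the next cell.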

In [5]:
@pyspark
def grouper(hive_table, group_col, value_col, metric):
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    
    df = spark.table(hive_table)
    grp = df.groupby(group_col)
    result = getattr(grp, metric)(value_col)
    
    return result.toPandas()

mpg = grouper('autompg', 'origin', 'mpg', 'mean')
mpg

Waiting for http://livy.training.anaconda.com:8998/sessions/153
Waiting for http://livy.training.anaconda.com:8998/sessions/153/jobs/1


|   | origin  | avg(mpg)  |
|---|---------|-----------|
| 0 | Europe  | 27.602941 |
| 1 | America | 20.033469 |
| 2 | Asia    | 30.450633 |


In [6]:
pyspark.stop()

Waiting for http://livy.training.anaconda.com:8998/sessions/153
stopped http://livy.training.anaconda.com:8998/sessions/153


Anything cloudpickle can pickle can be sent in both directions, as arguments or as return values.

Packages and versions should match between local environment and Spark session.
* Pack the environment with `conda_pack`, upload the archive to S3 or HDFS, then point `spark.pyspark.python` at it.
* Ask Eric for more details.
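
One common way to wire up a packed environment on YARN is to ship the archive with `spark.yarn.dist.archives` and point `spark.pyspark.python` at the unpacked copy. The sketch below assumes that workflow, which these notes do not confirm. The archive location and the `environment` alias are hypothetical placeholders, and `packed_env_conf` is an illustrative helper, not part of `livy_submit`.

```python
def packed_env_conf(archive_uri, alias="environment"):
    """Spark conf that runs Python from a conda-pack archive shipped by YARN."""
    return {
        # YARN unpacks the archive into a directory named after the alias.
        "spark.yarn.dist.archives": f"{archive_uri}#{alias}",
        # Relative path to the interpreter inside the unpacked archive.
        "spark.pyspark.python": f"./{alias}/bin/python",
    }


# Hypothetical archive location, for illustration only.
conf = packed_env_conf("hdfs:///tmp/env.tar.gz")
print(conf)
```

The resulting dictionary could be merged into the `config` used when the session is created. The relative interpreter path only resolves inside YARN containers, so a driver running outside the cluster would need its own interpreter setting.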

TODO: Python files can be sent before the function is called to allow imports of non-packaged code.

In [7]:
import numpy as np

def pi_wallis(n):
    x = np.arange(1, n)
    t = 4*x*x
    y = t / (t - 1)
    z = 2.0 * y.prod()
    return z


with LivySession(host, conf={'conf':config}) as new_session:
    @new_session
    def pi(n):
        import numpy as np
        from pyspark.context import SparkContext
        sc = SparkContext.getOrCreate()
        from random import random
        
        def sample(_):
            '''Find random numbers within the unit circle'''
            x = random()
            y = random()
            return 1 if x*x + y*y < 1 else 0
        
        count = (sc
                 .parallelize(range(0, n))
                 .map(sample)
                 .reduce(lambda a, b: a + b)
                )
        
        pi_spark = count * 4 / n
        pi_numpy = pi_wallis(n)
        
        return np.array([pi_spark, pi_numpy])

    result = pi(int(1e4))

Starting session 154
Waiting for http://livy.training.anaconda.com:8998/sessions/154
Waiting for http://livy.training.anaconda.com:8998/sessions/154/jobs/1
Waiting for http://livy.training.anaconda.com:8998/sessions/154
stopped http://livy.training.anaconda.com:8998/sessions/154


In [8]:
result

array([3.14      , 3.14151411])
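
For reference, the two entries come from different estimators. The first is a Monte Carlo estimate from $n$ uniform samples $(x_i, y_i)$ in the unit square, as in `sample`; the second is the Wallis product truncated after $n - 1$ factors, as in `pi_wallis`:

$$
\pi \approx \frac{4}{n}\sum_{i=1}^{n} \mathbf{1}\!\left[x_i^2 + y_i^2 < 1\right],
\qquad
\pi \approx 2\prod_{k=1}^{n-1} \frac{4k^2}{4k^2 - 1}
$$

The truncation error of the Wallis product shrinks only like $1/n$, which is consistent with the second entry agreeing with $\pi$ to about four decimal places at $n = 10^4$.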

## Wrap up

* Does this seem at all useful compared to Sparkmagic + livy_submit?
* What would you like to see?