In [1]:
# Imports needed for the cells which run locally on Watson Studio.

import dsx_core_utils

<a id='Create_Livy_Session'></a>
## Create Remote Livy Session

First, let's get a list of registered Hadoop Integration systems. Look for a system that has the `imageId` of your custom image.

In [2]:
DSXHI_SYSTEMS = dsx_core_utils.get_dsxhi_info(showSummary=True)

Available Hadoop systems: 

        systemName LIVYSPARK  LIVYSPARK2                  imageId
0   durotar-hdp301            livyspark2                         
1  asgardia-hdp264            livyspark2  dsx-scripted-ml-python2
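
To pick a system programmatically rather than by eye, the summary can be filtered on `imageId`. The following is a minimal sketch that assumes the rows are available as a list of dictionaries. The return type of `get_dsxhi_info()` is not shown in this notebook, so the `systems` list and the `systems_with_image` helper are hypothetical stand-ins that mirror the printed table.

```python
# Hypothetical rows mirroring the summary table printed above; the actual
# return type of get_dsxhi_info() is not shown here, so adapt as needed.
systems = [
    {"systemName": "durotar-hdp301", "LIVYSPARK": "",
     "LIVYSPARK2": "livyspark2", "imageId": ""},
    {"systemName": "asgardia-hdp264", "LIVYSPARK": "",
     "LIVYSPARK2": "livyspark2", "imageId": "dsx-scripted-ml-python2"},
]

def systems_with_image(rows, image_id):
    """Return the names of systems whose imageId equals image_id."""
    return [row["systemName"] for row in rows if row["imageId"] == image_id]

matches = systems_with_image(systems, "dsx-scripted-ml-python2")
print(matches)
```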


In [3]:
myConfig={
 "queue": "default",
 "driverMemory": "2G",
 "numExecutors": 1
}

In [6]:
# Set up sparkmagic to connect to the selected registered HI
# system with the specified configs.
dsx_core_utils.setup_livy_sparkmagic(
  system="asgardia-hdp264", 
  livy="livyspark2",
  imageId="dsx-scripted-ml-python2",
  addlConfig=myConfig)

# (Re-)load spark magic to apply the new configs.
%reload_ext sparkmagic.magics

sparkmagic has been configured to use https://asgardian-edge.fyre.ibm.com:8443/gateway/jalv-dsx121g-master-1/livy2/v1 with image Jupyter with Python 2.7, Scala 2.11, R 3.4.3
success configuring sparkmagic livy.


In [8]:
session_name = 'workshop-part1'
livy_endpoint = 'https://asgardian-edge.fyre.ibm.com:8443/gateway/jalv-dsx121g-master-1/livy2/v1'
%spark add -s $session_name -l python -k -u $livy_endpoint

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
450,application_1538175267044_0064,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## Create and use custom functions remotely

**run_command** - Simple wrapper around subprocess that runs a Linux command within the driver's YARN container.

**spark_dfs_topandas** - Sample function that takes two Spark DataFrames and returns two pandas DataFrames. This is needed for ML tools such as scikit-learn, which work with pandas DataFrames rather than Spark DataFrames.

In [20]:
%%spark -s $session_name
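import time
from subprocess import Popen, PIPE, STDOUT

def run_command(command, sleepAfter=None):
    # Run a shell command in the driver container and print its combined stdout/stderr.
    p = Popen(command, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    output = p.stdout.read()
    print(output)
    # Optionally pause after the command finishes.
    if sleepAfter is not None:
        time.sleep(sleepAfter)

def spark_dfs_topandas(DF1, DF2):
    # Collect both Spark DataFrames to the driver as pandas DataFrames.
    return DF1.toPandas(), DF2.toPandas()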
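
When the output of a command has to be inspected rather than only printed, a variant that returns the exit code and the text can be handy. This is a sketch under assumptions, not part of the original helpers: the name `run_command_capture` is hypothetical, and it uses only the standard `subprocess` module.

```python
import subprocess

def run_command_capture(command):
    """Run a shell command and return (exit_code, combined_output) as text."""
    proc = subprocess.Popen(
        command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # merge stderr into stdout, as run_command does
        universal_newlines=True,   # return str instead of bytes on Python 3
    )
    output, _ = proc.communicate()  # waits for the process to exit
    return proc.returncode, output

code, out = run_command_capture("echo hello")
print(code, out.strip())
```

Returning the exit code lets the caller distinguish a failed command from one that simply produced no output.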

For reference and debugging, print the name of the Hadoop node to which the remote session has been assigned.
When "local" files are created within the remote session, they are written to this node. All of the YARN container artifacts (workspace and temp files) also exist on this node.

In [19]:
%%spark -s $session_name
run_command("hostname -f")
run_command("pwd")

shad2.fyre.ibm.com

/hadoop/yarn/local/usercache/user1/appcache/application_1538175267044_0064/container_e25_1538175267044_0064_01_000001
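
The container working directory embeds the YARN application ID, which is the same ID listed in the session table when the session was created. A small sketch for pulling it out of the path follows; the helper name `yarn_application_id` is hypothetical.

```python
import re

def yarn_application_id(container_dir):
    """Return the application_<timestamp>_<sequence> part of a path, or None."""
    match = re.search(r"application_\d+_\d+", container_dir)
    return match.group(0) if match else None

path = ("/hadoop/yarn/local/usercache/user1/appcache/"
        "application_1538175267044_0064/"
        "container_e25_1538175267044_0064_01_000001")
app_id = yarn_application_id(path)
print(app_id)
```

The extracted ID can then be passed to `yarn logs -applicationId <id>` on the cluster when the driver log is needed.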

## Part 2. Create quickens_demo_utils package and add via sc.addPyFile

Once a set of functions is "stable" and ready to package, you can use a Python setup.py file to create a new "quickens_demo_utils" Python package.
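
The notebook does not show the setup.py itself. As a rough sketch, assuming a single package directory with a `qutils` module, the layout could be scaffolded as below. The `SETUP_PY` contents and the `scaffold_package` helper are hypothetical; the name and version are chosen so that running `python setup.py sdist` in the project root should produce `quickens_demo_utils-0.1.tar.gz`.

```python
import os
import tempfile

# Hypothetical minimal setup.py for the package described above.
SETUP_PY = '''\
from setuptools import setup

setup(
    name="quickens_demo_utils",
    version="0.1",
    packages=["quickens_demo_utils"],
)
'''

def scaffold_package(root):
    """Write a setup.py and a package skeleton under root."""
    pkg_dir = os.path.join(root, "quickens_demo_utils")
    os.makedirs(pkg_dir)
    # An empty __init__.py marks the directory as a package.
    open(os.path.join(pkg_dir, "__init__.py"), "w").close()
    # Placeholder module; the shared functions would be pasted in here.
    with open(os.path.join(pkg_dir, "qutils.py"), "w") as f:
        f.write("# run_command, spark_dfs_topandas, ...\n")
    with open(os.path.join(root, "setup.py"), "w") as f:
        f.write(SETUP_PY)
    return root

project_root = scaffold_package(tempfile.mkdtemp())
print(sorted(os.listdir(project_root)))
```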

In [31]:
!ls ../packages/python

quickens_demo_utils-0.1.tar.gz


The "quickens_demo_utils" package can now be imported locally (DSX) to verify that it was added to the image properly.

In [28]:
from quickens_demo_utils import qutils
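
A successful import does not say which copy of the package was picked up. One way to check, sketched here with a hypothetical `module_origin` helper and demonstrated on the standard-library `json` module, is to look at the file the module was loaded from.

```python
import importlib

def module_origin(name):
    """Return the file a module was loaded from, or None if it cannot be imported."""
    try:
        module = importlib.import_module(name)
    except ImportError:
        return None
    # Built-in modules have no __file__, so fall back to None for them too.
    return getattr(module, "__file__", None)

print(module_origin("json"))
print(module_origin("no_such_module_xyz"))
```

Calling `module_origin("quickens_demo_utils")` in the local kernel would show whether the package comes from the image's site-packages or from somewhere else on the path.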

### Upload the demo_utils tar to HDFS

Note: This does not require admin privileges. 

As a user, upload the tar.gz to an accessible HDFS directory (such as your /user/ directory). This lets end users replace or test the file before it becomes part of a base DSX Runtime Image.

In [33]:
# Show the registered secure WebHDFS URLs that the logged-in user can access:
import dsx_core_utils
dsx_core_utils.list_dsxhi_webhdfs_endpoints();

['https://asgardian-edge.fyre.ibm.com:8443/gateway/jalv-dsx121g-master-1/webhdfs/v1', 'https://durotar-edge.fyre.ibm.com:8443/gateway/jalv-dsx121g-master-1/webhdfs/v1']


In [32]:
dsxlocal_file_location="../packages/python/quickens_demo_utils-0.1.tar.gz"
dsxhi_upload_hdfs_location="/user/user1/quickens_demo_utils-0.1.tar.gz"
webhdfs_endpoint="<your cluster webhdfs output from above>"

dsx_core_utils.hdfs_util.upload_file(webhdfs_endpoint, dsxlocal_file_location, dsxhi_upload_hdfs_location)
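
Instead of pasting the endpoint by hand, it can be selected from the list returned above. The sketch below assumes the endpoint for a system can be recognized by a fragment of its host name (here `asgardian`, inferred from the URLs shown earlier); the `pick_endpoint` helper is hypothetical.

```python
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse      # Python 2

# Endpoint list copied from the output above.
endpoints = [
    "https://asgardian-edge.fyre.ibm.com:8443/gateway/jalv-dsx121g-master-1/webhdfs/v1",
    "https://durotar-edge.fyre.ibm.com:8443/gateway/jalv-dsx121g-master-1/webhdfs/v1",
]

def pick_endpoint(urls, host_fragment):
    """Return the single URL whose host name contains host_fragment."""
    hits = [u for u in urls if host_fragment in (urlparse(u).hostname or "")]
    if len(hits) != 1:
        raise ValueError("expected exactly one match, found %d" % len(hits))
    return hits[0]

selected = pick_endpoint(endpoints, "asgardian")
print(selected)
```

Raising on zero or multiple matches avoids silently uploading the package to the wrong cluster.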


### Add the archive from HDFS to a new Livy Session

In [None]:
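# HI_CONFIG must already be defined: a dict holding the 'LIVY' and
# 'WEBHDFS' endpoint URLs for the selected Hadoop Integration system.
session_name = 'workshop-part1'
livy_endpoint = HI_CONFIG['LIVY']
webhdfs_endpoint = HI_CONFIG['WEBHDFS']
%spark add -s $session_name -l python -k -u $livy_endpoint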

In [None]:
%%spark

sc.addPyFile("/user/user1/quickens_demo_utils-0.1.tar.gz")

In [None]:
%%spark

from quickens_demo_utils import qutils
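
PySpark documents `SparkContext.addPyFile` as accepting `.py` and `.zip` dependencies, and this notebook does not show the result of the import above with a tar.gz. If that import fails, one option (an assumption, not something the notebook confirms) is to ship the package as a zip with the package folder at the archive root. The sketch below builds such a zip for a throwaway package and imports it through `sys.path`, which is the same zip-import mechanism Python uses for zipped dependencies. The `zip_package` helper and the `demo_zip_pkg` name are hypothetical.

```python
import os
import sys
import tempfile
import zipfile

def zip_package(package_dir, zip_path):
    """Zip package_dir so that the package folder sits at the archive root."""
    parent = os.path.dirname(os.path.abspath(package_dir))
    with zipfile.ZipFile(zip_path, "w") as zf:
        for folder, _, files in os.walk(package_dir):
            for name in files:
                full = os.path.join(folder, name)
                # Store paths relative to the parent, e.g. demo_zip_pkg/__init__.py
                zf.write(full, os.path.relpath(full, parent))
    return zip_path

# Build a throwaway package to demonstrate the round trip.
work = tempfile.mkdtemp()
pkg = os.path.join(work, "demo_zip_pkg")
os.makedirs(pkg)
with open(os.path.join(pkg, "__init__.py"), "w") as f:
    f.write("ANSWER = 42\n")

archive = zip_package(pkg, os.path.join(work, "demo_zip_pkg.zip"))
sys.path.insert(0, archive)  # a zip file on sys.path is importable
import demo_zip_pkg
print(demo_zip_pkg.ANSWER)
```

The resulting zip could be uploaded to HDFS with the same `upload_file` call used earlier and then passed to `sc.addPyFile`.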