In [None]:
# Needed only for the aiohttp scratch cells at the end of this notebook. When
# enabling it, prefer `%pip` (installs into the kernel's environment) and pin a version.
#!pip install aiohttp

### 0. Prepare session variables

In [1]:
import dsx_core_utils
# Retrieve the Hadoop Integration (DSXHI) system info. The "Available Hadoop
# systems" list printed below is this cell's output; pick one of those names
# for USER_DSXHI_SYSTEM in the next cell.
DSXHI_SYSTEMS = dsx_core_utils.get_dsxhi_info(showSummary=False)

Available Hadoop systems: 

['CDH-513-Stable', 'Chell Edge [shad1]', 'DataPuddle6', 'SL-Prod-HDP', 'ak-cdh632-edge-1', 'asgardian-edge', 'bosom', 'cdh513-outsider1', 'epizoon1', 'yccdh5']


In [2]:
# Imports for the whole notebook (previously json and dsx_core_utils were
# each imported twice in this cell).
import json
import logging
import os
import time

import requests

import dsx_core_utils

logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.INFO)

# User INPUT: the Hadoop system to target (one of the names listed by the
# previous cell) and the image id to configure the Livy sessions with.
USER_DSXHI_SYSTEM = "CDH-513-Stable"
USER_DSXHI_IMAGEID = "jupyter-py36"

# Configure sparkmagic against the system's "livyspark2" endpoint.
# HI_CONFIG['LIVY'] is the base URL used by the REST helpers in section 1.
HI_CONFIG = dsx_core_utils.setup_livy_sparkmagic(
    system=USER_DSXHI_SYSTEM,
    imageId=USER_DSXHI_IMAGEID,
    livy="livyspark2")

sparkmagic has been configured to use https://ibm-dsxhi-cdh513stable-svc:8443/livy2/v1 with image Jupyter with Python 3.6
success configuring sparkmagic livy.


In [3]:
import copy

import sparkmagic.utils.configuration as sm_conf

# Headers for the direct Livy REST calls. The bearer token is read from the
# environment (os.environ[...] raises KeyError if USER_ACCESS_TOKEN is unset).
headers = {'Content-Type': "application/json",
           'Authorization': "Bearer {}".format(os.environ['USER_ACCESS_TOKEN']),
           'X-Requested-By': 'dsx'}

# Start from sparkmagic's configured session settings, but deep-copy them so the
# overrides below do not leak back into sparkmagic's own configuration.
# NOTE(review): session_configs() may return the live config dict — confirm.
LIVY_PAYLOAD = copy.deepcopy(sm_conf.session_configs())
# setdefault: tolerate a session config that has no 'conf' section yet.
LIVY_PAYLOAD.setdefault('conf', {})['livy.rsc.server.connect.timeout'] = '300s'
LIVY_PAYLOAD['driverMemory'] = '900M'
LIVY_PAYLOAD['executorMemory'] = '600M'
LIVY_PAYLOAD['kind'] = 'pyspark'


## 1. Define helper functions used for testing

In [6]:
def start_livy_session():
    '''
    Request a new Livy session configured by the module-level LIVY_PAYLOAD.

    POSTs LIVY_PAYLOAD (JSON-encoded) to ``{HI_CONFIG['LIVY']}/sessions`` with the
    shared ``headers``. TLS certificate verification is disabled (verify=False),
    as in the other helpers in this notebook.

    returns: the ``requests.Response`` of the POST (not the session id). When the
             request succeeds, the new session id is ``response.json()['id']``.
    '''
    livy_url = "{}/sessions".format(HI_CONFIG['LIVY'])
    return requests.post(livy_url,
                         data=json.dumps(LIVY_PAYLOAD),
                         headers=headers, verify=False)

def check_livy_sessions(session_id):
    '''
    Query a Livy session and log its current state.

    session_id: id of the Livy session (e.g. from start_livy_session()).

    returns: the ``requests.Response`` for GET ``/sessions/<session_id>`` when the
             request succeeds (HTTP status < 400); otherwise logs an error and
             returns False. The state is in ``response.json()['state']``.
    '''
    livy_url = "{}/sessions/{}".format(HI_CONFIG['LIVY'], session_id)
    livy_session_req = requests.get(livy_url,
                                    headers=headers, verify=False)

    # A requests.Response is falsy for 4xx/5xx statuses, e.g. once the
    # session has been deleted.
    if not livy_session_req:
        logging.error("Session Not found: %s", session_id)
        return False

    # Parse the body once instead of once per state comparison.
    state = livy_session_req.json().get('state')
    if state == 'idle':
        logging.info("Session %s started successfully", session_id)
    elif state == 'starting':
        logging.info("Session %s still starting", session_id)
    elif state == 'dead':
        # A failed start is an error, not routine progress.
        logging.error("Session %s failed to start", session_id)
    else:
        # Any other state (e.g. busy, error, killed) used to be silently skipped.
        logging.info("Session %s is in state: %s", session_id, state)
    return livy_session_req

def post_livy_statement(session_id,
                        code="d=2\nprint('Ran successfull on yarn:{}'.format(sc.version))"):
    '''
    Submit a code statement to a Livy session.

    Equivalent REST call:
        curl -X POST ${dsxhi_svc_url}/livy2/v1/sessions/${SESSION_ID}/statements -d '{"code": "..."}'

    session_id: id of the Livy session to run the code in.
    code: source to execute in the session. The default is a small smoke test
          that prints the remote ``sc.version`` (the previously hard-coded code).

    returns: the ``requests.Response`` of the POST.
    '''
    livy_url = "{}/sessions/{}/statements".format(HI_CONFIG['LIVY'], session_id)
    stmt_payload = {"code": code}
    return requests.post(livy_url,
                         data=json.dumps(stmt_payload),
                         headers=headers, verify=False)

def check_livy_statement(session_id, statement_id=0):
    '''
    Fetch a single statement from a Livy session.

    session_id: id of the Livy session.
    statement_id: id of the statement to fetch. Defaults to 0, the id this helper
                  always used before; the notebook posts one statement per session.

    returns: the ``requests.Response`` for
             GET ``/sessions/<session_id>/statements/<statement_id>``. Its JSON body
             carries the statement ``output``.
    '''
    livy_url = "{}/sessions/{}/statements/{}".format(HI_CONFIG['LIVY'], session_id, statement_id)
    return requests.get(livy_url,
                        headers=headers, verify=False)

def delete_livy_session(session_id):
    '''
    Delete one Livy session.

    session_id: id of the session to remove.

    returns: the ``requests.Response`` of DELETE ``/sessions/<session_id>``.
    '''
    session_url = "{}/sessions/{}".format(HI_CONFIG['LIVY'], session_id)
    return requests.delete(session_url, headers=headers, verify=False)

In [7]:
# Scratch: manual one-off calls against a single session (uncomment to use).
# new_session=start_livy_session()
#new_session.json()
#del_session = delete_livy_session(593)
#del_session.status_code
#del_session.json()

---
# 2. Test Multi-Livy Sessions
- Start sessions
- Wait for sessions to start
- Post a statement to each session
- Verify Heap usage on DSXHI Edge Node

  ```
  0. Grab the Livy PID and Knox PID, for each:

  1. Run /usr/java/jdk1.8.0_172/bin/jmap -heap <pid> 
  2. Add up the "used:" values displayed for each heap space (Eden, From, To, PS Old Generation)
  3. Check CPU usage of Livy PID / Knox PID (top -i)
  ```
  
- Delete Sessions

### 2.1 - Test 5 sessions

In [11]:
# Start sessions
list_of_session_ids=[]
for x in range(0,5):
    new_session=start_livy_session()
    if new_session and new_session.json()['id']:
        list_of_session_ids.append(new_session.json()['id'])
print(list_of_session_ids)

[625, 626, 627, 628, 629]


In [31]:
#Wait for sessions to start
for x in list_of_session_ids:
    check_livy_sessions(x)

03/13/2020 08:35:03 PM Session 625 started successfully
03/13/2020 08:35:04 PM Session 626 started successfully
03/13/2020 08:35:04 PM Session 627 started successfully
03/13/2020 08:35:04 PM Session 628 started successfully
03/13/2020 08:35:04 PM Session 629 started successfully


In [32]:
#Post a statement to each session (once all are started)
for x in list_of_session_ids:
    post_livy_statement(x)

In [33]:
#Verify stmt ran
for x in list_of_session_ids:
    print(check_livy_statement(x).json()['output'])

{'status': 'ok', 'execution_count': 0, 'data': {'text/plain': 'Ran successfull on yarn:2.3.0.cloudera3'}}
{'status': 'ok', 'execution_count': 0, 'data': {'text/plain': 'Ran successfull on yarn:2.3.0.cloudera3'}}
{'status': 'ok', 'execution_count': 0, 'data': {'text/plain': 'Ran successfull on yarn:2.3.0.cloudera3'}}
{'status': 'ok', 'execution_count': 0, 'data': {'text/plain': 'Ran successfull on yarn:2.3.0.cloudera3'}}
{'status': 'ok', 'execution_count': 0, 'data': {'text/plain': 'Ran successfull on yarn:2.3.0.cloudera3'}}


- **PAUSE HERE - CHECK DSXHI EDGE NODE JAVA PROCESSES**

In [34]:
# Delete sessions
for x in list_of_session_ids:
    delete_livy_session(x)

In [35]:
# Ensure they are deleted
for x in list_of_session_ids:
    check_livy_sessions(x)

03/13/2020 08:40:15 PM Session Not found: 625
03/13/2020 08:40:15 PM Session Not found: 626
03/13/2020 08:40:15 PM Session Not found: 627
03/13/2020 08:40:15 PM Session Not found: 628
03/13/2020 08:40:15 PM Session Not found: 629


---

In [None]:
# Ad-hoc inspection of one session. check_livy_sessions() returns False (not a
# Response) when the session does not exist, so guard before calling .json().
# NOTE(review): 594 is a hard-coded id from an earlier run; set it before use.
INSPECT_SESSION_ID = 594
inspect_resp = check_livy_sessions(INSPECT_SESSION_ID)
inspect_resp.json() if inspect_resp else None

In [None]:
import aiohttp


async def post_json(url, payload=None):
    '''
    POST a JSON body to ``url`` with aiohttp and return the response text.

    The original scratch cell was not valid Python: the inner ``async with`` had
    no colon or body, ``url`` was undefined, and aiohttp was not yet imported.

    url: target URL.
    payload: JSON-serializable body; defaults to {'test': 'object'} as in the
             original scratch code.

    Usage (IPython >= 7 supports top-level await): ``await post_json(url)``.
    '''
    if payload is None:
        payload = {'test': 'object'}
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload) as response:
            return await response.text()

In [None]:
import aiohttp
import asyncio


async def fetch(session, url):
    '''Return the body of GET ``url`` as text, using the given aiohttp session.'''
    async with session.get(url) as response:
        return await response.text()


async def main(url='http://python.org'):
    '''Fetch ``url`` (default http://python.org) and print the response body.'''
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, url)
        print(html)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    if loop.is_running():
        # Inside a Jupyter kernel the event loop is already running, and
        # run_until_complete() would raise RuntimeError; schedule the coroutine
        # on the running loop instead (Python 3.6 compatible).
        fetch_task = asyncio.ensure_future(main())
    else:
        loop.run_until_complete(main())
