In [91]:
import json
import pprint
import requests
from IPython import display
import attr
import urllib.parse

## Setup Livy session

In [80]:
host = 'http://localhost:8998'
data = {'kind': 'spark'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)

r.json()

{'id': 5,
 'appId': None,
 'owner': None,
 'proxyUser': None,
 'state': 'starting',
 'kind': 'spark',
 'appInfo': {'driverLogUrl': None, 'sparkUiUrl': None},
 'log': ['stdout: ', '\nstderr: ']}

In [20]:
display.HTML(f'''<iframe src="{host}" width=610px height=320px> </iframe>''')

## Configure GeoSpark Environment

GeoSpark Jars were copied into <b> $SPARK_HOME/jars directory </b>

After that we can get Livy <b> session id </b> from response headers. In our case id is 0.

## sending example code 

In [29]:
session_url = host + r.headers["Location"]
statements_url = session_url + '/statements'

In [30]:
data = {'code': '1 + 1'}
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
r.json()

{'id': 0, 'code': '1 + 1', 'state': 'waiting', 'output': None, 'progress': 0.0}

We can find result in statement 0 by sending proper get request.

In [33]:
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
pprint.pprint(r.json())

{'code': '1 + 1',
 'id': 0,
 'output': {'data': {'text/plain': 'res0: Int = 2\n'},
            'execution_count': 0,
            'status': 'ok'},
 'progress': 1.0,
 'state': 'available'}


## Let's prepare spark context for GeoSpark development

In [72]:
import logging
from importlib import reload
reload(logging)
log_format = "%(levelname)s %(asctime)-15s %(message)s"
logging.basicConfig(format=log_format, level="INFO")

In [73]:
logger = logging.getLogger()

In [116]:
class SessionNotExistError(Exception):
    pass

@attr.s
class LivySession:
    
    livy_session_ids = []
    host = attr.ib()
    is_running = attr.ib(default=False)
    session_id = attr.ib(default=None, validator=[attr.validators.instance_of(int)])
    
    def kill(self):
        pass
    
    def __attrs_post_init__(self):
        self.session_url = urllib.parse.urljoin(self.host, str(self.session_id))
    
    @classmethod
    def create_new(cls, host, data, headers):
        response = requests.post(host + '/sessions',
                                 data=json.dumps(data),
                                  headers=headers)
        if response.status_code == 201:
            sesion_id = response.json()["id"]
            cls.livy_session_ids.append(sesion_id)
            return cls(host, True, sesion_id)
        else:
            logger.info(f"Current code is {response.status_code}")
            raise ConnectionError("Response does not return code 200")
            
            
    @classmethod
    def from_existing(cls, host, session_id):
        if session_id not in cls.livy_session_ids:
            raise SessionNotExistError("This session id does not exists")
        
        return cls(host, True, session_id)

In [140]:
@attr.s
class LivyStatement:
    
    
    livy_session: LivySession = attr.ib()
    is_send = attr.ib(default=False, init=False)
    statement_id = attr.ib(default=None, init=False)
    
    def __attrs_post_init__(self):
        self.statements_url = self.livy_session.session_url + '/statements'
        
    def get_status(self):
        
        r = requests.get(statement_url, headers=headers)
        
        if r.status_code != 200:
            logger.error("Response code is not equal to 200")
            raise ConnectionError("Request returned code: {r.status_code}")
        pprint.pprint(r.json())  

    def send_from_text(self, code):
        data = {'code': code}
        if not self.is_send:
            self.is_send = True
            self.statement_id = self.send_code(data, headers)
            self.statement_url = urllib.parse.urljoin(self.statements_url,
                                                      str(self.statement_id ))
        else: 
            logger.info("This statement was sent")
            
    def send_from_file(self, code):
        """TODO add from file"""
    
    @staticmethod
    def send_code(data, headers = {'Content-Type': 'application/json'}):
        response = requests.post(statements_url,
                                 data=json.dumps(data),
                                 headers=headers)
        
        if response.status_code != 201:
            logger.error(f"Returned code {response.status_code}")
            raise ConnectionError("Connection is failed")
        
        return r.json()["id"]


In [110]:
livy_session = LivySession.create_new(host,
                                      data = {'kind': 'spark'},
                                      headers={'Content-Type': 'application/json'})

In [111]:
livy_session

LivySession(host='http://localhost:8998', is_running=True, session_id=0)

In [None]:
livy_statement = LivyStatement(livy_session)

In [146]:
livy_statement

LivyStatement(livy_session=LivySession(host='http://localhost:8998', is_running=True, session_id=0), is_send=True, statement_id=5)

In [144]:
livy_statement.send_from_text("println(1)")

INFO 2019-03-24 19:37:55,370 This statement was sent


In [145]:
livy_statement.get_status()

{'code': 'println(1)',
 'id': 0,
 'output': {'data': {'text/plain': '1\n'},
            'execution_count': 0,
            'status': 'ok'},
 'progress': 1.0,
 'state': 'available'}


{'Date': 'Sun, 24 Mar 2019 17:00:06 GMT', 'Content-Type': 'application/json', 'Content-Encoding': 'gzip', 'Location': '/sessions/0', 'Transfer-Encoding': 'chunked', 'Server': 'Jetty(9.2.16.v20160414)'}