This repository has been archived by the owner on Feb 3, 2021. It is now read-only.

Commit

Feature: spark ui proxy plugin (#467)

* initial commit

* add args

* add docs

* change default plugins

* update ssh cli ui, remove plugin name

* change conditional

* update docs to include jupyterlab

* remove spark_ui_proxy as default plugin
jafreck committed Apr 23, 2018
1 parent 4ba3c9d commit 2e995b4
Showing 11 changed files with 214 additions and 19 deletions.
1 change: 1 addition & 0 deletions aztk/models/plugins/internal/plugin_manager.py
@@ -20,6 +20,7 @@ class PluginManager:
        jupyterlab=plugins.JupyterLabPlugin,
        rstudio_server=plugins.RStudioServerPlugin,
        hdfs=plugins.HDFSPlugin,
        spark_ui_proxy=plugins.SparkUIProxyPlugin,
    )

    def __init__(self):
1 change: 1 addition & 0 deletions aztk/spark/models/plugins/__init__.py
@@ -2,3 +2,4 @@
from .jupyter import *
from .jupyter_lab import *
from .rstudio_server import *
from .spark_ui_proxy import *
1 change: 1 addition & 0 deletions aztk/spark/models/plugins/spark_ui_proxy/__init__.py
@@ -0,0 +1 @@
from .configuration import *
26 changes: 26 additions & 0 deletions aztk/spark/models/plugins/spark_ui_proxy/configuration.py
@@ -0,0 +1,26 @@
import os
from aztk.models.plugins.plugin_configuration import PluginConfiguration, PluginPort, PluginRunTarget
from aztk.models.plugins.plugin_file import PluginFile
from aztk.utils import constants

dir_path = os.path.dirname(os.path.realpath(__file__))


class SparkUIProxyPlugin(PluginConfiguration):
    def __init__(self):
        super().__init__(
            name="spark_ui_proxy",
            ports=[
                PluginPort(
                    internal=9999,
                    public=True
                )
            ],
            run_on=PluginRunTarget.Master,
            execute="spark_ui_proxy.sh",
            args=["localhost:8080", "9999"],
            files=[
                PluginFile("spark_ui_proxy.sh", os.path.join(dir_path, "spark_ui_proxy.sh")),
                PluginFile("spark_ui_proxy.py", os.path.join(dir_path, "spark_ui_proxy.py")),
            ],
        )
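
For orientation, here is a minimal sketch (not part of this commit) of how the new plugin can be attached to a cluster from the SDK, mirroring the pattern documented in docs/15-plugins.md later in this change. The `ClusterConfiguration` import path and the elided cluster settings are assumptions:

```python
# Illustrative sketch only: enable the Spark UI proxy plugin via the SDK.
# The ClusterConfiguration import path is assumed; fill in your own cluster settings.
from aztk.spark.models import ClusterConfiguration
from aztk.spark.models.plugins import SparkUIProxyPlugin

cluster_config = ClusterConfiguration(
    # ... other cluster configuration ...
    plugins=[
        SparkUIProxyPlugin(),  # serves the Spark master UI through the proxy on public port 9999
    ],
)
```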
116 changes: 116 additions & 0 deletions aztk/spark/models/plugins/spark_ui_proxy/spark_ui_proxy.py
@@ -0,0 +1,116 @@
#!/usr/bin/env python
# MIT License
#
# Copyright (c) 2016 Alexis Seigneurin
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import os
import sys
from urllib.request import urlopen
import socketserver
from http.server import BaseHTTPRequestHandler, HTTPServer

BIND_ADDR = os.environ.get("BIND_ADDR", "0.0.0.0")
SERVER_PORT = int(os.environ.get("SERVER_PORT", "80"))
URL_PREFIX = os.environ.get("URL_PREFIX", "").rstrip('/') + '/'
SPARK_MASTER_HOST = ""


class ProxyHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # redirect if we are hitting the home page
        if self.path in ("", URL_PREFIX):
            self.send_response(302)
            self.send_header("Location", URL_PREFIX + "proxy:" + SPARK_MASTER_HOST)
            self.end_headers()
            return
        self.proxyRequest(None)

    def do_POST(self):
        # Python 3: headers are an email.message.Message, which has get(), not getheader()
        length = int(self.headers.get('content-length'))
        postData = self.rfile.read(length)
        self.proxyRequest(postData)

    def proxyRequest(self, data):
        targetHost, path = self.extractUrlDetails(self.path)
        targetUrl = "http://" + targetHost + path

        print("get: %s host: %s path: %s target: %s" % (self.path, targetHost, path, targetUrl))

        proxiedRequest = urlopen(targetUrl, data)
        resCode = proxiedRequest.getcode()

        if resCode == 200:
            page = proxiedRequest.read()
            page = self.rewriteLinks(page, targetHost)
            self.send_response(200)
            self.end_headers()
            self.wfile.write(page)
        elif resCode == 302:
            self.send_response(302)
            self.send_header("Location", URL_PREFIX + "proxy:" + SPARK_MASTER_HOST)
            self.end_headers()
        else:
            raise Exception("Unsupported response: " + str(resCode))

    def extractUrlDetails(self, path):
        if path.startswith(URL_PREFIX + "proxy:"):
            start_idx = len(URL_PREFIX) + 6  # len('proxy:') == 6
            idx = path.find("/", start_idx)
            targetHost = path[start_idx:] if idx == -1 else path[start_idx:idx]
            path = "" if idx == -1 else path[idx:]
        else:
            targetHost = SPARK_MASTER_HOST
            path = path
        return (targetHost, path)

    def rewriteLinks(self, page, targetHost):
        target = "{0}proxy:{1}/".format(URL_PREFIX, targetHost).encode()
        page = page.replace(b'href="/', b'href="' + target)
        page = page.replace(b"'<div><a href=' + logUrl + '>'",
                            b"'<div><a href=' + location.origin + logUrl.replace('http://', '/proxy:') + '>'")
        page = page.replace(b'href="log', b'href="' + target + b'log')
        page = page.replace(b'href="http://', b'href="' + URL_PREFIX.encode() + b'proxy:')
        page = page.replace(b'src="/', b'src="' + target)
        page = page.replace(b'action="', b'action="' + target)
        page = page.replace(b'"/api/v1/', b'"' + target + b'api/v1/')
        return page


if __name__ == '__main__':
    if len(sys.argv) < 2:
        print("Usage: <proxied host:port> [<proxy port>]")
        sys.exit(1)

    SPARK_MASTER_HOST = sys.argv[1]

    if len(sys.argv) >= 3:
        SERVER_PORT = int(sys.argv[2])

    print("Starting server on http://{0}:{1}".format(BIND_ADDR, SERVER_PORT))

    class ForkingHTTPServer(socketserver.ForkingMixIn, HTTPServer):
        def finish_request(self, request, client_address):
            request.settimeout(30)
            HTTPServer.finish_request(self, request, client_address)

    server_address = (BIND_ADDR, SERVER_PORT)
    httpd = ForkingHTTPServer(server_address, ProxyHandler)
    httpd.serve_forever()
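
As a quick aid to reading the proxy above, here is a small self-contained sketch (not part of the commit) of the `extractUrlDetails` logic, using the defaults the plugin runs with: a `URL_PREFIX` of `/` and the `localhost:8080` master address passed in by `spark_ui_proxy.sh`:

```python
# Illustrative only: mirrors ProxyHandler.extractUrlDetails with the plugin's defaults
# (URL_PREFIX "/" and SPARK_MASTER_HOST "localhost:8080").
URL_PREFIX = "/"
SPARK_MASTER_HOST = "localhost:8080"

def extract_url_details(path):
    if path.startswith(URL_PREFIX + "proxy:"):
        start_idx = len(URL_PREFIX) + 6  # len('proxy:') == 6
        idx = path.find("/", start_idx)
        target_host = path[start_idx:] if idx == -1 else path[start_idx:idx]
        path = "" if idx == -1 else path[idx:]
    else:
        target_host = SPARK_MASTER_HOST
    return target_host, path

# Requests addressed to a proxied host keep the remainder of the path:
assert extract_url_details("/proxy:localhost:8080/jobs/") == ("localhost:8080", "/jobs/")
# Bare proxy paths resolve to the host with an empty path:
assert extract_url_details("/proxy:10.0.0.5:8081") == ("10.0.0.5:8081", "")
# Anything else falls through to the Spark master:
assert extract_url_details("/static/spark-logo.png") == ("localhost:8080", "/static/spark-logo.png")
```

Requests that hit the root are 302-redirected to `/proxy:<master>` by `do_GET`, after which links in the returned pages are rewritten by `rewriteLinks` so that navigation stays behind the proxy.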
2 changes: 2 additions & 0 deletions aztk/spark/models/plugins/spark_ui_proxy/spark_ui_proxy.sh
@@ -0,0 +1,2 @@
#!/bin/bash
python $DOCKER_WORKING_DIR/plugins/spark_ui_proxy/spark_ui_proxy.py $1 $2 &
12 changes: 7 additions & 5 deletions aztk_cli/config/cluster.yaml
@@ -27,11 +27,13 @@ docker_repo:
# To add your cluster to a virtual network provide the full ARM resource id below
# subnet_id: /subscriptions/********-****-****-****-************/resourceGroups/********/providers/Microsoft.Network/virtualNetworks/*******/subnets/******

-# To define plugins
-# plugins:
-#   - name: rstudio_server
-#     args:
-#       version: 1.2.3
+# Enable plugins
+plugins:
+  # - name: spark_ui_proxy
+  # - name: jupyterlab
+  # - name: jupyter
+  # - name: hdfs
+  # - name: rstudio_server

# wait: <true/false>
wait: false
27 changes: 14 additions & 13 deletions aztk_cli/spark/endpoints/cluster/cluster_ssh.py
@@ -78,25 +78,26 @@ def execute(args: typing.NamedTuple):


def print_plugin_ports(cluster_config: ClusterConfiguration):

    if cluster_config and cluster_config.plugins:
        plugins = cluster_config.plugins
        has_ports = False
+       plugin_ports = {}
        for plugin in plugins:
+           plugin_ports[plugin.name] = []
            for port in plugin.ports:
                if port.expose_publicly:
                    has_ports = True
-                   break
-
-       if has_ports > 0:
+                   plugin_ports[plugin.name].append(port)
+
+       if has_ports:
            log.info("plugins:")
-           for plugin in plugins:
-               for port in plugin.ports:
-                   if port.expose_publicly:
-                       label = "  - open {}".format(plugin.name)
-
-                       if port.name:
-                           label += " {}".format(port.name)
-
-                       url = "{0}{1}".format(http_prefix, port.public_port)
-                       utils.log_property(label, url)
+           for plugin in plugin_ports:
+               if plugin_ports[plugin]:
+                   log.info("  " + plugin)
+                   for port in plugin_ports[plugin]:
+                       label = "    - open"
+                       if port.name:
+                           label += " {}".format(port.name)
+                       url = "{0}{1}".format(http_prefix, port.public_port)
+                       utils.log_property(label, url)
5 changes: 5 additions & 0 deletions docs/13-configuration.md
@@ -126,3 +126,8 @@ spark.jars $spark_home/jars/my_jar_file_1.jar,$spark_home/jars/my_jar_file_2.jar
````
Note: _This tool automatically registers several JARS for default cloud storage in the spark-default.conf file. If you want to modify this file, simply append any additional JARS to the end of this list_.
## Next Steps
- [Add plugins](./15-plugins.md)
- [Set up your Cloud Storage](./30-cloud-storage.md)
40 changes: 40 additions & 0 deletions docs/15-plugins.md
@@ -0,0 +1,40 @@
# Plugins

## Supported Plugins
AZTK ships with a library of default plugins that enable auxiliary services for use with your Spark cluster.

Currently the following plugins are supported:

- JupyterLab
- Jupyter
- HDFS
- RStudioServer
- Spark UI Proxy

### Enable a plugin using the CLI
If you are using the `aztk` CLI and wish to enable a supported plugin, you need to update your `.aztk/cluster.yaml` configuration file.

Add or uncomment the `plugins` section and list the plugins you want to enable, for example:
```yaml
plugins:
  - name: jupyterlab
  - name: jupyter
  - name: hdfs
  - name: spark_ui_proxy
  - name: rstudio_server
    version: "1.1.383"
```
### Enable a plugin using the SDK
If you are using the `aztk` SDK and wish to enable a supported plugin, you need to import the necessary plugins from the `aztk.spark.models.plugins` module and add them to your ClusterConfiguration object's plugins list:
```python
from aztk.spark.models.plugins import JupyterPlugin, RStudioServerPlugin, HDFSPlugin
cluster_config = ClusterConfiguration(
    ...,  # Other config
    plugins=[
        JupyterPlugin(),
        RStudioServerPlugin(version="1.1.383"),
        HDFSPlugin(),
    ]
)
```
2 changes: 1 addition & 1 deletion docs/20-spark-submit.md
@@ -1,4 +1,4 @@
-# Submitting a Job
+# Submitting an Application
Submitting a job to your Spark cluster in this package mimics the experience of a typical standalone cluster. A spark job will be submitted to the system and run to completion.

## Spark-Submit
