# Running SageMaker Studio Notebooks Remotely via API

This notebook demonstrates how to use the [Jupyter Server REST API](https://github.com/jupyter/jupyter/wiki/Jupyter-Notebook-Server-API#Kernel-API) and the [Jupyter Client (WebSocket) messaging API](https://jupyter-client.readthedocs.io/en/latest/messaging.html) to remotely run commands on a notebook in SageMaker Studio: specifically, to run all of its code cells.

It's presented as a notebook to give more space for commentary, and because I used a SageMaker Notebook Instance in the same AWS region to test it out 😁 ...But you could re-purpose the same code in some other environment (like a Lambda function) to run whatever automations you need.

The main constraint is that your execution environment **needs IAM permission** `sagemaker:CreatePresignedDomainUrl` on the target `DomainId` and `UserProfileName` - which lets this script **log in as the SageMaker Studio user** to run the commands.
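
As a rough illustration, a narrowly scoped policy for this permission might look like the sketch below. The helper name `presigned_url_policy` and the region/account/profile values are made up for the example, and the `user-profile/{DomainId}/{UserProfileName}` ARN layout is an assumption you should check against the IAM documentation before relying on it.

```python
import json


def presigned_url_policy(region, account_id, domain_id, user_profile_name):
    """Build an IAM policy document allowing login as ONE Studio user profile."""
    # Assumed ARN layout for a SageMaker Studio user profile resource:
    resource = (
        f"arn:aws:sagemaker:{region}:{account_id}:"
        f"user-profile/{domain_id}/{user_profile_name}"
    )
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "sagemaker:CreatePresignedDomainUrl",
                "Resource": resource,
            }
        ],
    }


# Placeholder values only:
policy = presigned_url_policy("ap-southeast-1", "123456789012", "d-example", "my-user")
print(json.dumps(policy, indent=2))
```

Scoping `Resource` to a single user profile limits which Studio users the automation can impersonate.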

In [1]:
# Python Built-Ins:
import asyncio
from datetime import datetime
import json
import time
import uuid

# External Dependencies:
import boto3
import requests
import websocket

smclient = boto3.client("sagemaker")

## Log in

For access to the APIs, we'll need to:

- Generate the initial presigned login URL via SageMaker API
- Open a `requests.Session` to persist the headers/cookies/etc. that get set when we first open the URL, and then make requests through it
- Remember to set the required **cross-site request forgery protection token** from cookies, on update request types like `POST`, `DELETE`, etc (if you're not familiar with this CSRF/XSRF protection mechanism, you can read more [here](https://en.wikipedia.org/wiki/Cross-site_request_forgery#Cookie-to-header_token))
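
The cookie-to-header step can be sketched as a tiny helper. The function name `xsrf_headers` is hypothetical, and the `X-XSRFToken` header name is the one Tornado-based Jupyter servers conventionally read; I haven't confirmed here that SageMaker Studio accepts it, so treat it as an assumption (the cells below pass the token as a query parameter instead).

```python
def xsrf_headers(cookies):
    """Return the request header carrying the XSRF token stored in `cookies`.

    `cookies` is any object with a dict-style .get(), e.g. `reqsess.cookies`.
    """
    token = cookies.get("_xsrf")
    if not token:
        raise ValueError("No _xsrf cookie set yet: log in with the session first")
    # Assumed header name (Tornado convention):
    return {"X-XSRFToken": token}


# Hypothetical usage with the session from this notebook:
#   reqsess.post(url, json=payload, headers=xsrf_headers(reqsess.cookies))
print(xsrf_headers({"_xsrf": "2|abc123"}))
```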

In [2]:
# Generate the presigned URL which facilitates login:
presigned_resp = smclient.create_presigned_domain_url(
    DomainId="d-YOUR-DOMAIN-ID-HERE",
    UserProfileName="YOUR-TARGET-USER-PROFILE-NAME-HERE",
)

# Login like https://d-....studio.{AWSRegion}.sagemaker.aws/auth?token=...
login_url = presigned_resp["AuthorizedUrl"]
# API relative to https://d-....studio.{AWSRegion}.sagemaker.aws/jupyter/default
api_base_url = login_url.partition("?")[0].rpartition("/")[0] + "/jupyter/default"
print(api_base_url)

https://d-ngfhxewhrmqe.studio.ap-southeast-1.sagemaker.aws/jupyter/default
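
If you'd rather not rely on string partitioning, the same base URLs can be derived with `urllib.parse`. This is only a sketch: `studio_urls` is a made-up helper name, the example URL is a placeholder in the same shape as a real `AuthorizedUrl`, and the `/jupyter/default` path is carried over from the cell above.

```python
from urllib.parse import urlsplit


def studio_urls(authorized_url, app_path="/jupyter/default"):
    """Derive the REST and WebSocket base URLs from a presigned login URL."""
    host = urlsplit(authorized_url).netloc
    api_base = f"https://{host}{app_path}"
    ws_base = f"wss://{host}{app_path}/api/kernels"
    return api_base, ws_base


# Placeholder URL, not a real token:
api_base, ws_base = studio_urls(
    "https://d-example.studio.ap-southeast-1.sagemaker.aws/auth?token=REDACTED"
)
print(api_base)
print(ws_base)
```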


In [3]:
# Create an HTTP session (for cookie/header memory) and use it to log in:
reqsess = requests.Session()
login_resp = reqsess.get(presigned_resp["AuthorizedUrl"])
print(login_resp)

# (See login_resp.headers and login_resp.text (the loading page HTML) for more details)

<Response [200]>


In [4]:
# TODO: Need to wait here if the JupyterServer 'default' app is not ready?
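
One way to approach that TODO is a generic polling helper like the sketch below. `wait_until` is a hypothetical name, and the commented-out readiness probe against `/api/status` is an untested assumption about how the 'default' app behaves while it starts up.

```python
import time


def wait_until(check, timeout_secs=300, interval_secs=5, sleep=time.sleep):
    """Call `check()` repeatedly until it returns a truthy value or time runs out."""
    deadline = time.monotonic() + timeout_secs
    while True:
        result = check()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Condition not met within {timeout_secs}s")
        sleep(interval_secs)


# Hypothetical readiness probe for this notebook (an assumption, not tested):
#   wait_until(lambda: reqsess.get(f"{api_base_url}/api/status").ok)

# Self-contained demo: a fake check that succeeds on the third attempt.
attempts = []


def fake_check():
    attempts.append(1)
    return len(attempts) >= 3


wait_until(fake_check, timeout_secs=10, interval_secs=0)
print(f"Ready after {len(attempts)} attempts")
```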

## Find & load your target notebook file

This example hard-codes the notebook name to avoid accidentally executing something it shouldn't - but in general you'll want to use the `contents` APIs to locate your target notebook file, check it exists, or even upload it.

In [5]:
# We'll hard-code the notebook URI below, but there are also APIs to traverse the directories:
contents_resp = reqsess.get(f"{api_base_url}/api/contents").json()

nbpath = "HelloWorld.ipynb"
nbname = nbpath

print(json.dumps(contents_resp, indent=2))

# Check the notebook exists at the top level of the folder tree:
try:
    next(
        c for c in contents_resp["content"]
        if c["type"] == "notebook" # not 'file' or 'directory'
        and c["name"] == nbname
        and c["path"] == nbpath
    )
    print(f"\nFound {nbpath}")
except StopIteration:
    raise ValueError(f"Could not find {nbpath} in the user's account!")

{
  "name": "",
  "path": "",
  "last_modified": "2021-04-16T11:24:54.112000Z",
  "created": "2021-04-16T11:24:54.112000Z",
  "content": [
    {
      "name": "HelloWorld.json",
      "path": "HelloWorld.json",
      "last_modified": "2021-04-16T14:48:36.411000Z",
      "created": "2021-04-16T14:48:36.411000Z",
      "content": null,
      "format": null,
      "mimetype": "application/json",
      "size": 7,
      "writable": true,
      "type": "file"
    },
    {
      "name": "HelloWorld.ipynb",
      "path": "HelloWorld.ipynb",
      "last_modified": "2021-04-16T08:59:53.550000Z",
      "created": "2021-04-16T08:59:53.550000Z",
      "content": null,
      "format": null,
      "mimetype": null,
      "size": 1078,
      "writable": true,
      "type": "notebook"
    }
  ],
  "format": "json",
  "mimetype": null,
  "size": null,
  "writable": true,
  "type": "directory"
}

Found HelloWorld.ipynb
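
The check above only looks at the top-level folder. To search sub-folders too, you could walk the `contents` API recursively, since directory listings only include one level of children. The sketch below takes the fetch function as a parameter so it can be demonstrated on a fake tree; `find_notebooks` and `fake_tree` are made-up names for this example.

```python
def find_notebooks(get_contents, path=""):
    """Recursively yield the paths of all notebooks under `path`.

    `get_contents(path)` must return a contents-API style model (a dict).
    """
    model = get_contents(path)
    for item in model.get("content") or []:
        if item["type"] == "notebook":
            yield item["path"]
        elif item["type"] == "directory":
            yield from find_notebooks(get_contents, item["path"])


# Hypothetical wiring to the live API:
#   get_contents = lambda p: reqsess.get(f"{api_base_url}/api/contents/{p}").json()

# Self-contained demo against a fake folder tree:
fake_tree = {
    "": {"type": "directory", "content": [
        {"type": "notebook", "path": "HelloWorld.ipynb"},
        {"type": "file", "path": "HelloWorld.json"},
        {"type": "directory", "path": "projects"},
    ]},
    "projects": {"type": "directory", "content": [
        {"type": "notebook", "path": "projects/Train.ipynb"},
    ]},
}
found = list(find_notebooks(fake_tree.__getitem__))
print(found)
```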


Next, you'll want to load the contents of the target file, because it both contains the code to execute and specifies the kernel we'll need to run it on:

In [6]:
# Load the notebook and get the code of each cell
print(f"Loading {nbpath}")
nb_resp = reqsess.get(f"{api_base_url}/api/contents/{nbpath}")
file = nb_resp.json()

nb_kernel_spec = file["content"]["metadata"]["kernelspec"]
nb_kernel_name = nb_kernel_spec["name"]
print(f"Kernel spec:\n{nb_kernel_spec}\n")

code = [
    c["source"] for c in file["content"]["cells"]
    if c["cell_type"] == "code" and len(c.get("source", "")) > 0
]
print(f"Loaded {len(code)} cells of code")

Loading HelloWorld.ipynb
Kernel spec:
{'display_name': 'Python 3 (Data Science)', 'language': 'python', 'name': 'python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-southeast-1:492261229750:image/datascience-1.0'}

Loaded 3 cells of code
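
A slightly more defensive take on the cell extraction is sketched below. It assumes `source` may arrive either as one string or as a list of lines (the form used in `.ipynb` files on disk), and unlike the cell above it also skips whitespace-only cells. `extract_code_cells` and `demo_nb` are names invented for the example.

```python
def extract_code_cells(notebook):
    """Return the source of each non-empty code cell as a single string."""
    sources = []
    for cell in notebook.get("cells", []):
        if cell.get("cell_type") != "code":
            continue
        source = cell.get("source", "")
        if isinstance(source, list):  # on-disk .ipynb form: list of lines
            source = "".join(source)
        if source.strip():
            sources.append(source)
    return sources


demo_nb = {"cells": [
    {"cell_type": "markdown", "source": "# Title"},
    {"cell_type": "code", "source": ["x = 1\n", "print(x)"]},
    {"cell_type": "code", "source": "   "},
    {"cell_type": "code", "source": "print('Hello, world!')"},
]}
demo_code = extract_code_cells(demo_nb)
print(f"Loaded {len(demo_code)} cells of code")
```

With the live response you would call it as `extract_code_cells(file["content"])`.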


## Initialise kernel and session

To run the code, we'll need a **kernel** running and a **session** on that kernel.

In SageMaker Studio, a kernel is specified by the combination of the kernel spec name (which embeds the SageMaker image ARN) and the instance type to run it on.

Although creating a kernel will automatically create an "app", deleting the kernel will not automatically clear the "app" because these concepts are not exactly equivalent through the APIs (see the clean-up section later).

In [7]:
kernel_specs = reqsess.get(f"{api_base_url}/api/kernelspecs").json()

if nb_kernel_name in kernel_specs["kernelspecs"]:
    print("Found kernel spec!\n")
    print(kernel_specs["kernelspecs"][nb_kernel_name])
else:
    print(json.dumps(kernel_specs, indent=2))
    # TODO: Find closest spec if exact kernel spec is missing
    # (This can happen when e.g. moving to a different region, because the kernelspec name embeds the
    # image ARN, which is region-specific)
    raise ValueError(f"{nb_kernel_name} not present!")

Found kernel spec!

{'name': 'python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-southeast-1:492261229750:image/datascience-1.0', 'spec': {'argv': ['python3', '-m', 'IPython.kernel', '-f', '{connection_file}'], 'display_name': 'Python 3 (Data Science)', 'language': 'python', 'metadata': {'sme_metadata': {'environment_arn': 'arn:aws:sagemaker:ap-southeast-1:492261229750:image/datascience-1.0', 'display_name': 'Data Science', 'description': 'Anaconda Individual Edition https://www.anaconda.com/distribution/', 'gpu_optimized': False, 'is_template': True}, 'instance_type': 'ml.t3.medium'}}, 'resources': {'logo-64x64': '/kernelspecs/python3/logo-64x64.png', 'logo-32x32': '/kernelspecs/python3/logo-32x32.png'}}
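
For the "closest spec" TODO, one possible heuristic is to compare kernel spec names with the region and account stripped out. The sketch below assumes every Studio spec name follows the `KERNEL__SAGEMAKER_INTERNAL__IMAGE-ARN` shape seen in the output above; the helper names and the example ARNs (with fake account IDs and a made-up second image) are illustrative only.

```python
SEPARATOR = "__SAGEMAKER_INTERNAL__"


def portable_key(spec_name):
    """Reduce a Studio kernel spec name to (kernel, image) without region/account."""
    kernel, _, image_arn = spec_name.partition(SEPARATOR)
    # An image ARN looks like arn:aws:sagemaker:REGION:ACCOUNT:image/NAME
    image = image_arn.rpartition(":")[2]
    return kernel, image


def find_equivalent_spec(wanted_name, available_names):
    """Return an available spec name matching `wanted_name`, or None."""
    if wanted_name in available_names:
        return wanted_name
    wanted_key = portable_key(wanted_name)
    for name in available_names:
        if portable_key(name) == wanted_key:
            return name
    return None


wanted = "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-southeast-1:111111111111:image/datascience-1.0"
available = [
    "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:222222222222:image/other-image-1.0",
    "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:222222222222:image/datascience-1.0",
]
match = find_equivalent_spec(wanted, available)
print(match)
```

In the notebook you would pass `nb_kernel_name` and `kernel_specs["kernelspecs"].keys()`.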


In [8]:
# Retrieve currently running kernels:
kernels = reqsess.get(f"{api_base_url}/api/kernels").json()
print(f"Found {len(kernels)} running kernels\n")
print(kernels)

Found 0 running kernels

[]


In [9]:
# Locate compatible kernel for notebook:
try:
    kernel = next(
        k for k in kernels
        if k["name"] == nb_kernel_name
        # TODO: instance type matches notebook spec or kernel default?
    )
    print(f"Found compatible running kernel {kernel['id']}")
    print(kernel)
except StopIteration:
    kernel = None
    print("NO KERNEL ALREADY RUNNING: NEED TO CREATE")

NO KERNEL ALREADY RUNNING: NEED TO CREATE


In [10]:
# Create a kernel if required:
if kernel:
    print(f"Using existing kernel {kernel['id']}")
else:
    print("Creating kernel...")
    kernel_resp = reqsess.post(
        f"{api_base_url}/api/kernels",
        json={
            "name": nb_kernel_name,
            "instance_type": "ml.t3.medium",  # TODO: Take from NB metadata or default kernel spec or whatever
            "path": nbpath,
        },
        params={ "_xsrf": reqsess.cookies["_xsrf"] },  # Seems like this can be put in either header or query
    )
    # (If the SMStudio 'app' is not already created, this will just block until InService before returning)
    print(kernel_resp)
    kernel = kernel_resp.json()
    print(f"Created kernel:\n{kernel}")

<Response [201]>
Created kernel:
{'id': '28b365d2-7bd7-4698-a8df-0764dcb348ee', 'name': 'python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-southeast-1:492261229750:image/datascience-1.0', 'last_activity': '2021-04-16T15:00:07.742703Z', 'execution_state': 'starting', 'connections': 0, 'instance_type': 'ml.t3.medium', 'app_name': 'datascience-1-0-ml-t3-medium-81187bd2ae843298bc309cb256a3'}
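
Rather than hard-coding the instance type, you could read the default from the kernel spec, which (in the `kernelspecs` output shown earlier) carries an `instance_type` under `spec.metadata`. This is a sketch under that assumption; `default_instance_type` and `demo_spec` are hypothetical names.

```python
def default_instance_type(spec_entry, fallback="ml.t3.medium"):
    """Read the default instance type from one `kernelspecs` entry, if present."""
    metadata = spec_entry.get("spec", {}).get("metadata") or {}
    return metadata.get("instance_type") or fallback


# Trimmed-down entry in the same shape as the kernelspecs API response:
demo_spec = {
    "name": "python3",
    "spec": {
        "display_name": "Python 3 (Data Science)",
        "metadata": {"instance_type": "ml.t3.medium"},
    },
}
print(default_instance_type(demo_spec))
print(default_instance_type({"name": "no-metadata"}, fallback="ml.t3.large"))
```

In the notebook this would be called as `default_instance_type(kernel_specs["kernelspecs"][nb_kernel_name])`.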


In [11]:
# Create a kernel session on the target kernel & notebook:
nbsess_gen_uuid = uuid.uuid4()
nbsess_resp = reqsess.post(
    f"{api_base_url}/api/sessions",
    json={
        "kernel": {
            "id": kernel["id"],
        },
        # Attach a GUID to the path as SMStudio does (IDK, but needs it!)
        "path": f"{nbname}-{nbsess_gen_uuid}.ipynb",
        "name": nbname,
        "type": "pending",  # ?
    },
    params={ "_xsrf": reqsess.cookies["_xsrf"] },  # Seems like this can be put in either header or query
)
print(nbsess_resp)
nbsess = nbsess_resp.json()
nbsess

<Response [201]>


{'id': '688bfe73-ae29-4a8f-b78c-0c445524a740',
 'path': 'HelloWorld.ipynb-14a9785b-be56-4525-aaa6-b75ba1701ee3.ipynb',
 'name': 'HelloWorld.ipynb',
 'type': 'pending',
 'kernel': {'id': '28b365d2-7bd7-4698-a8df-0764dcb348ee',
  'name': 'python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-southeast-1:492261229750:image/datascience-1.0',
  'last_activity': '2021-04-16T15:00:08.691402Z',
  'execution_state': 'starting',
  'connections': 0,
  'instance_type': 'ml.t3.medium',
  'app_name': 'datascience-1-0-ml-t3-medium-81187bd2ae843298bc309cb256a3'}}

In [12]:
# TODO: Wait for execution_state?
# ...It seems to say execution_state='starting' forever though...
nbsess = reqsess.get(f"{api_base_url}/api/sessions/{nbsess['id']}").json()
nbsess

{'id': '688bfe73-ae29-4a8f-b78c-0c445524a740',
 'path': 'HelloWorld.ipynb-14a9785b-be56-4525-aaa6-b75ba1701ee3.ipynb',
 'name': 'HelloWorld.ipynb',
 'type': 'pending',
 'kernel': {'id': '28b365d2-7bd7-4698-a8df-0764dcb348ee',
  'name': 'python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:ap-southeast-1:492261229750:image/datascience-1.0',
  'last_activity': '2021-04-16T15:00:08.691402Z',
  'execution_state': 'starting',
  'connections': 0,
  'instance_type': 'ml.t3.medium',
  'app_name': 'datascience-1-0-ml-t3-medium-81187bd2ae843298bc309cb256a3'}}
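
Since the REST `execution_state` doesn't seem to move on from 'starting', an alternative worth trying is to ask the kernel directly: the Jupyter messaging protocol defines a `kernel_info_request` message, and a kernel that answers with `kernel_info_reply` is up. The sketch below only builds the request and recognises the reply; the helper names are made up, the explicit `"channel": "shell"` field is how I understand Jupyter's WebSocket framing, and I haven't verified this flow against SageMaker Studio.

```python
import json
import uuid
from datetime import datetime, timezone


def kernel_info_request(session_id, username="dummyuser"):
    """Compose a `kernel_info_request` message for the shell channel."""
    return {
        "header": {
            "date": datetime.now(timezone.utc).isoformat(),
            "msg_id": uuid.uuid4().hex,
            "msg_type": "kernel_info_request",
            "session": session_id,
            "username": username,
            "version": "5.0",
        },
        "parent_header": {},
        "metadata": {},
        "content": {},
        "channel": "shell",
    }


def is_reply_to(response, request):
    """True if `response` is the kernel_info_reply answering `request`."""
    return (
        response.get("header", {}).get("msg_type") == "kernel_info_reply"
        and response.get("parent_header", {}).get("msg_id") == request["header"]["msg_id"]
    )


req = kernel_info_request("demo-session-id")
wire = json.dumps(req)  # what you would pass to the websocket's send()

# Fake reply for demonstration; a real one would come from the websocket's recv():
fake_reply = {
    "header": {"msg_type": "kernel_info_reply"},
    "parent_header": {"msg_id": req["header"]["msg_id"]},
}
print(is_reply_to(fake_reply, req))
```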

## Run the code

You have the content you want to run, and a running kernel & session to do it on - we're finally ready to run the code!

Actual communication with the session is via WebSocket APIs rather than REST, so we'll first define some utility classes to smooth things along:

In [13]:
class JupyterWSMessageBase:
    """Utility class for composing a Jupyter WebSocket message object"""
    def __init__(self, msg_type: str, session_id=None, user_id="dummyuser", parent_msg=None):
        self.msg_type = msg_type
        self.parent_msg = parent_msg
        self.session_id = session_id or uuid.uuid1().hex
        self.user_id = user_id

        self.msg_id = None
        self.timestamp = None

    def render(self):
        self.msg_id = uuid.uuid1().hex
        self.timestamp = datetime.now()
        header = {
            "date": self.timestamp.isoformat(),
            "msg_id": self.msg_id,
            "msg_type": self.msg_type,
            "session": self.session_id,
            "username": self.user_id,
            "version": "5.0",
        }
        if self.parent_msg:
            parent_header = {
                "date": self.parent_msg.timestamp.isoformat(),
                "msg_id": self.parent_msg.msg_id,
                "msg_type": self.parent_msg.msg_type,
                "session": self.parent_msg.session_id,
                "username": self.parent_msg.user_id,
                "version": "5.0",
            }
        else:
            parent_header = {}
        return {
            "header": header,
            "parent_header": parent_header,
            "metadata": {},
            "content": {},
        }

    def send(self, wsconn):
        return wsconn.send(json.dumps(self.render()))

class ExecuteCodeRequestMessage(JupyterWSMessageBase):
    """Utility class for composing a Jupyter WebSocket message to request execution of code on the kernel"""
    def __init__(self, code, **kwargs):
        super(ExecuteCodeRequestMessage, self).__init__("execute_request", **kwargs)
        self.code = code

    def render(self):
        base_msg = super(ExecuteCodeRequestMessage, self).render()
        base_msg["content"]["code"] = self.code
        base_msg["content"]["silent"] = False
        return base_msg

Then to run through the notebook, simply:

- Create a websocket connection, carrying over the required cookies from our REST session
- Send requests to run each of the cells in turn
- Wait for responses from the cells

In [14]:
# Execution request/reply is done on websockets channels
ws_base_url = "wss://" + api_base_url.partition("://")[2] + "/api/kernels"
cookies = reqsess.cookies.get_dict()

print(f"Connecting to:\n{ws_base_url}/{kernel['id']}/channels?session_id={nbsess['id']}")
ws = websocket.create_connection(
    f"{ws_base_url}/{kernel['id']}/channels?session_id={nbsess['id']}",
    cookie="; ".join(["%s=%s" %(i, j) for i, j in cookies.items()]),
)
print("Connected\n")

try:
    # Send commands to "Run all", and build a store of req/responses:
    cell_traffic = []
    cell_traffic_by_req_id = {}
    for ix, c in enumerate(code):
        msg = ExecuteCodeRequestMessage(
            code=c,
            session_id=nbsess["id"],
        )
        msg.send(ws)
        cell_traffic_by_req_id[msg.msg_id] = {
            "cell_ix": ix,
            "request": msg,
            "responses": [],
        }
        cell_traffic.append(cell_traffic_by_req_id[msg.msg_id])

    # Read responses until we have 'em all:
    cells_pending = [True for c in cell_traffic]
    while any(cells_pending):
        res = json.loads(ws.recv())
        res_type = res["msg_type"]
        res_parent_id = res["parent_header"].get("msg_id")
        if res_parent_id:
            # Optional, maybe over-strict check - could just ignore these:
            if res_parent_id not in cell_traffic_by_req_id:
                raise ValueError(f"Received 'reply' for unknown message ID:\n{res}")

            cell_ix = cell_traffic_by_req_id[res_parent_id]["cell_ix"]
            cell_traffic_by_req_id[res_parent_id]["responses"].append(res)
            print(f"Got cell response message of type {res_type} to cell {cell_ix}")
            if res_type == "execute_reply":
                # Each cell execution request will typically generate one or more 'status' messages,
                # zero or more 'stream' messages depending on whether it has any output, and then a
                # final 'execute_reply' message signalling that it's done
                cells_pending[cell_ix] = False
                if res["content"]["status"] != "ok":
                    raise ValueError(
                        f"Cell {cell_ix} exited with status {res['content']['status']}:\n{res}"
                    )
                else:
                    print(f"Cell {cell_ix} done")
        else:
            print(f"Ignoring msg of type {res_type}")
    print("Done")
finally:
    ws.close()

Connecting to:
wss://d-ngfhxewhrmqe.studio.ap-southeast-1.sagemaker.aws/jupyter/default/api/kernels/28b365d2-7bd7-4698-a8df-0764dcb348ee/channels?session_id=688bfe73-ae29-4a8f-b78c-0c445524a740
Connected

Ignoring msg of type status
Ignoring msg of type status
Ignoring msg of type status
Got cell response message of type status to cell 0
Got cell response message of type execute_input to cell 0
Got cell response message of type execute_reply to cell 0
Cell 0 done
Got cell response message of type status to cell 0
Got cell response message of type status to cell 1
Got cell response message of type execute_input to cell 1
Got cell response message of type stream to cell 1
Got cell response message of type status to cell 1
Got cell response message of type execute_reply to cell 1
Cell 1 done
Got cell response message of type status to cell 2
Got cell response message of type execute_input to cell 2
Got cell response message of type execute_reply to cell 2
Cell 2 done
Done


Optionally, you can then loop through the results of each cell:

In [15]:
for ctraffic in cell_traffic:
    for resp in ctraffic["responses"]:
        if resp["msg_type"] == "stream":
            print(resp["content"])

{'name': 'stdout', 'text': 'Hello, world!\n'}
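
Beyond `stream`, the Jupyter messaging protocol also defines `execute_result`, `display_data` and `error` messages, so a fuller summary of each cell could be collected along these lines. This is a sketch: `summarise_responses` is a hypothetical helper and the demo messages are hand-written, not captured from a real kernel.

```python
def summarise_responses(responses):
    """Collect printed text, results and errors from one cell's reply messages."""
    summary = {"stdout": "", "stderr": "", "results": [], "errors": []}
    for resp in responses:
        msg_type = resp.get("msg_type") or resp.get("header", {}).get("msg_type")
        content = resp.get("content", {})
        if msg_type == "stream" and content.get("name") in ("stdout", "stderr"):
            summary[content["name"]] += content.get("text", "")
        elif msg_type in ("execute_result", "display_data"):
            summary["results"].append(content.get("data", {}).get("text/plain"))
        elif msg_type == "error":
            summary["errors"].append(f"{content.get('ename')}: {content.get('evalue')}")
    return summary


# Hand-written example messages:
demo_responses = [
    {"msg_type": "status", "content": {"execution_state": "busy"}},
    {"msg_type": "stream", "content": {"name": "stdout", "text": "Hello, world!\n"}},
    {"msg_type": "execute_result", "content": {"data": {"text/plain": "42"}}},
    {"msg_type": "error", "content": {"ename": "ValueError", "evalue": "bad input"}},
]
demo_summary = summarise_responses(demo_responses)
print(demo_summary)
```

In the notebook you would call it per cell, e.g. `summarise_responses(ctraffic["responses"])`.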


## Clean-up

When we're done, we should clean up our session and kernel and, to release chargeable infrastructure, the SageMaker Studio "App" too.

In [16]:
# Clean up the session:
del_sess_resp = reqsess.delete(
    f"{api_base_url}/api/sessions/{nbsess['id']}",
    params={ "_xsrf": reqsess.cookies["_xsrf"] },
)
print(del_sess_resp)
nbsess = None
del_sess_resp.text

<Response [204]>


''

In [17]:
# May need to run this?
kernel = reqsess.post(
    f"{api_base_url}/api/kernels",
    json={
        "name": nb_kernel_name,
        "instance_type": "ml.t3.medium",  # TODO: Take from NB metadata or default kernel spec or whatever
        "path": nbpath,
    },
    params={ "_xsrf": reqsess.cookies["_xsrf"] },
).json()

In [18]:
# Clean up the kernel:
kernel_app_name = kernel["app_name"]
print(f"Deleting kernel {kernel['id']}...")
del_kernel_resp = reqsess.delete(
    f"{api_base_url}/api/kernels/{kernel['id']}",
    params={ "_xsrf": reqsess.cookies["_xsrf"] },
)
print(del_kernel_resp)
kernel = None
del_kernel_resp.text

Deleting kernel c4d0e591-d6af-41ab-937b-95b3eb56179c...
<Response [204]>


''

In [19]:
# Deleting the kernel does not automatically delete the 'app' - here's how:
del_app_resp = reqsess.delete(
    f"{api_base_url}/sagemaker/api/apps/{kernel_app_name}",
    params={ "_xsrf": reqsess.cookies["_xsrf"] },
)
print(del_app_resp)
del_app_resp.text

<Response [204]>


''

In [20]:
# Check the apps are gone:
reqsess.get(f"{api_base_url}/sagemaker/api/apps").json()

[]
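
If you move this into an automation, it may be worth making the teardown tolerant of partial failures, so that a failed kernel delete doesn't leave a chargeable app running. The sketch below reuses the three DELETE paths from the cells above in the same order; `cleanup_steps`, `run_cleanup` and the fake `delete` function are names invented for the example.

```python
def cleanup_steps(session_id=None, kernel_id=None, app_name=None):
    """List the DELETE paths (relative to the API base URL) in teardown order."""
    steps = []
    if session_id:
        steps.append(f"/api/sessions/{session_id}")
    if kernel_id:
        steps.append(f"/api/kernels/{kernel_id}")
    if app_name:
        steps.append(f"/sagemaker/api/apps/{app_name}")
    return steps


def run_cleanup(delete, steps):
    """Call `delete(path)` for every step, collecting failures instead of stopping."""
    failures = []
    for path in steps:
        try:
            delete(path)
        except Exception as err:  # keep going so later resources still get released
            failures.append((path, err))
    return failures


# Hypothetical wiring to the live session:
#   delete = lambda p: reqsess.delete(
#       api_base_url + p, params={"_xsrf": reqsess.cookies["_xsrf"]}
#   ).raise_for_status()

# Self-contained demo: a fake delete that fails on the kernel step.
deleted = []


def fake_delete(path):
    if path.startswith("/api/kernels/"):
        raise RuntimeError("kernel already gone")
    deleted.append(path)


steps = cleanup_steps(session_id="sess-1", kernel_id="kern-1", app_name="app-1")
failures = run_cleanup(fake_delete, steps)
print(deleted)
print([path for path, _ in failures])
```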

## All done!

To extend into other use cases, you can refer to the Jupyter REST & client APIs mentioned earlier - and use browser network inspector/devtools to explore SageMaker extension APIs like the `apps` endpoint demonstrated here.