In [1]:
import json
import subprocess

import requests


# Create one requests Session shared by the cells below; the Authorization
# header attached to it later is then sent with every API call made through it.
s = requests.Session()

In [2]:
# Get the running server(s) from `jupyter lab list --json`.
# Each non-empty line of stdout is parsed as one JSON document describing a server.
p = subprocess.run(
    ["jupyter", "lab", "list", "--json"],
    capture_output=True,
    text=True,
    check=True,  # raise CalledProcessError instead of silently producing an empty list
)
# Skip blank lines so stray whitespace in the output cannot crash json.loads.
servers = [json.loads(line) for line in p.stdout.splitlines() if line.strip()]
# Show the servers with the auth token masked so the secret is not stored
# in the saved notebook output; `servers` itself keeps the real token.
[{**server_info, "token": "<redacted>"} for server_info in servers]

[{'base_url': '/',
  'hostname': '0.0.0.0',
  'password': False,
  'pid': 13,
  'port': 8888,
  'root_dir': '/root/host_home',
  'secure': False,
  'sock': '',
  'token': '<redacted>',
  'url': 'http://575952227d11:8888/',
  'version': '2.15.0'}]

In [3]:
# assume one server for now
# can select by comparing base_url to $JUPYTERHUB_SERVICE_PREFIX
if not servers:
    # Fail with a clear message rather than a bare IndexError on servers[0].
    raise RuntimeError("No running Jupyter server found by `jupyter lab list`")
server = servers[0]
jupyter_api = f"{server['url']}api"
token = server['token']
# Update the existing headers instead of replacing them, so the Session keeps
# its default headers (User-Agent, Accept-Encoding, ...) and re-running this
# cell stays idempotent.
s.headers.update({"Authorization": f"token {token}"})

In [4]:
# List sessions using jupyter REST API
r = s.get(f"{jupyter_api}/sessions")
# Surface HTTP errors (e.g. 403 from a bad token) here, instead of failing
# later when the error payload is treated as a list of sessions.
r.raise_for_status()

# sort by activity, most recently active first
# last_activity is ISO8601 strings, sortable without parsing
# (the lambda argument is named `session` so it does not shadow the requests Session `s`)
sessions = sorted(
    r.json(),
    key=lambda session: session["kernel"]["last_activity"],
    reverse=True,
)
sessions

[{'id': 'a8d4d5c3-385a-491d-92c6-61ae6d378658',
  'path': 'Tumor_subcluster_wu-jvsc-2cc9d224-5ef7-46d0-8329-5a75dc72e807327bccea-0efc-4827-9804-fc3247d53958.ipynb',
  'name': 'Tumor_subcluster_wu-a8782db1-076f-41a7-8176-c7ce742b9d66.ipynb',
  'type': 'notebook',
  'kernel': {'id': '29b24413-3fc1-4302-97bd-30505b667c90',
   'name': 'python3',
   'last_activity': '2025-02-08T15:23:06.857609Z',
   'execution_state': 'idle',
   'connections': 1},
  'notebook': {'path': 'Tumor_subcluster_wu-jvsc-2cc9d224-5ef7-46d0-8329-5a75dc72e807327bccea-0efc-4827-9804-fc3247d53958.ipynb',
   'name': 'Tumor_subcluster_wu-a8782db1-076f-41a7-8176-c7ce742b9d66.ipynb'}}]

In [385]:
# Keep the first entry of `sessions` (the most recently active one) alive and
# walk the remaining entries, deleting each session whose kernel is neither
# busy nor has any open connections.
if len(sessions) > 0:
    print(f"Not shutting down most recently active session: {sessions[0]['name']}")

for candidate in sessions[1:]:
    kernel_info = candidate['kernel']
    label = candidate['name']
    if kernel_info['execution_state'] == 'busy':
        # Kernel is busy: leave it alone.
        print(f"Not shutting down busy kernel for {label}")
    elif kernel_info['connections']:
        # A non-zero connection count is treated as "still in use".
        print(f"Not shutting down kernel with {kernel_info['connections']} connections: {label}")
    else:
        print(f"Shutting down session {label} idle since {kernel_info['last_activity']}")
        response = s.delete(f"{jupyter_api}/sessions/{candidate['id']}")
        # Abort on the first failed deletion.
        response.raise_for_status()

Not shutting down most recently active session: Tumor_subcluster-0d581faa-5d9a-4de3-bec2-f19662dab7ab.ipynb
Not shutting down kernel with 1 connections: Zuani2-c72492af-6857-4f84-9f05-0033d94c1c73.ipynb
