In [1]:
import getpass
import os
import time

import requests
import urllib3

# Silence urllib3's InsecureRequestWarning so HTTPS calls made with certificate
# verification turned off (see REQUESTS_VERIFY below) do not flood the output.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Configs
# Process groups known to this notebook, keyed by a short label. The helper
# functions below pick the first value of this mapping.
NIFI_PG_IDS = {
    "pg": "134eb35b-0199-1000-9b35-a2b3144a420e",
}
# Root URL of the NiFi instance; REST paths ("/nifi-api/...") are appended to it.
NIFI_BASE_URL = "https://localhost:8443"
# Assigned to Session.verify by get_nifi_session(). False disables TLS
# certificate verification; a path to a CA bundle enables it.
REQUESTS_VERIFY = False  # use "/etc/ssl/certs/nifi-ca.pem" if mounted

# Stop conditions and polling cadence read by wait_for_nifi_flow_completion().
# It returns "completed-by-count" once this many new flowfiles were counted as output.
STOP_AFTER_FLOWFILES = 100
# It returns "completed-by-time" once this many seconds have elapsed.
MAX_RUN_SECONDS = 10
# Pause, in seconds, between two consecutive status polls.
STATUS_POLL_SECONDS = 5


def get_nifi_session():
    """Create a fresh ``requests`` session configured for the NiFi API.

    The session identifies itself with the notebook's User-Agent and takes its
    TLS verification setting from ``REQUESTS_VERIFY``.

    Returns:
        A new, unauthenticated ``requests.Session``.
    """
    session = requests.Session()
    session.headers["User-Agent"] = "Notebook-NiFi-Client/1.0"
    session.verify = REQUESTS_VERIFY
    return session
# Smoke check: build one session; as the cell's last expression its repr is displayed.
get_nifi_session()



<requests.sessions.Session at 0x1087bb0a0>

In [2]:
def wait_until_ready(base_url: str, timeout_sec=60):
    """Block until the NiFi REST API answers, or raise when the deadline passes.

    Repeatedly requests ``{base_url}/nifi-api/system-diagnostics``. A 200, 401
    or 403 response counts as "up": the server is answering, even if it wants
    authentication first. Any other status, or any request error, is reported
    and retried after a short pause.

    Args:
        base_url: Root URL of the NiFi instance, without a trailing slash.
        timeout_sec: Overall time budget in seconds. With a budget of 0 no
            request is attempted.

    Returns:
        True as soon as an accepted status code is seen.

    Raises:
        RuntimeError: If no accepted response arrives within ``timeout_sec``.
    """
    retry_delay_sec = 3
    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + timeout_sec

    # The context manager releases the session's pooled connections on exit.
    with get_nifi_session() as s:
        while time.monotonic() < deadline:
            try:
                r = s.get(f"{base_url}/nifi-api/system-diagnostics", timeout=8)
                if r.status_code in (200, 401, 403):
                    print(f"✅ NiFi is up (status={r.status_code})")
                    return True
                print(f"⏳ NiFi not ready yet (status={r.status_code})")
            except Exception as e:
                # Best-effort probe: connection/TLS errors are expected while
                # NiFi is still starting, so report and retry.
                print(f"❌ NiFi not ready yet: {e}")

            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(retry_delay_sec, remaining))

    raise RuntimeError("NiFi did not become ready in time")

# Run: probe the configured NiFi instance; raises RuntimeError if it never answers in time.
wait_until_ready(NIFI_BASE_URL)


✅ NiFi is up (status=401)


True

In [4]:
def get_nifi_access_token(username: str, password: str):
    """Obtain a bearer token from NiFi's username/password login endpoint.

    Waits for NiFi to answer, then posts the credentials as a form to
    ``/nifi-api/access/token``. A non-2xx answer or a request error is reported
    and retried with exponential backoff (2, 4, 8, 16 seconds between the five
    attempts).

    Args:
        username: NiFi login name.
        password: Password for ``username``.

    Returns:
        The token text from the response body, stripped of surrounding
        whitespace.

    Raises:
        RuntimeError: If NiFi never becomes ready (raised by
            ``wait_until_ready``) or if every attempt fails.
    """
    wait_until_ready(NIFI_BASE_URL, timeout_sec=60)

    attempts, backoff = 5, 2
    # The context manager releases the session's pooled connections on exit.
    with get_nifi_session() as session:
        for i in range(1, attempts + 1):
            try:
                print(f"[Attempt {i}] Getting token from {NIFI_BASE_URL}")
                r = session.post(
                    f"{NIFI_BASE_URL}/nifi-api/access/token",
                    data={"username": username, "password": password},
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                    timeout=30,
                )
                if r.status_code in (200, 201):
                    token = r.text.strip()
                    print(f"✅ Got token (len={len(token)})")
                    return token
                print(f"❌ Token retrieval failed: {r.status_code} - {r.text}")
            except Exception as e:
                print(f"⚠️ Token request error: {e}")

            # Back off only when another attempt follows; sleeping after the
            # last failure would just delay the error by backoff ** attempts.
            if i < attempts:
                time.sleep(backoff ** i)

    raise RuntimeError("Failed to obtain NiFi access token after retries")

# Run: credentials are read from the environment so no secret is stored in the
# notebook. Export NIFI_USERNAME / NIFI_PASSWORD before starting the kernel; when
# NIFI_PASSWORD is unset or empty, the password is prompted for without echo.
TOKEN = get_nifi_access_token(
    os.environ.get("NIFI_USERNAME", "nifi"),
    os.environ.get("NIFI_PASSWORD") or getpass.getpass("NiFi password: "),
)


✅ NiFi is up (status=401)
[Attempt 1] Getting token from https://localhost:8443
✅ Got token (len=438)


In [5]:
def start_nifi_flow(token: str, pg_id=None):
    """Schedule a NiFi process group to run.

    Sends ``PUT /nifi-api/flow/process-groups/{pg_id}`` with state ``RUNNING``
    and prints the status code and body of the answer.

    Args:
        token: Bearer token used in the ``Authorization`` header.
        pg_id: Id of the process group to start. Defaults to the first value
            in ``NIFI_PG_IDS``.

    Returns:
        ``{"status": "success", "pg_id": <id>}`` when NiFi answers 200 or 201.

    Raises:
        RuntimeError: If NiFi answers with any other status code.
    """
    if pg_id is None:
        pg_id = list(NIFI_PG_IDS.values())[0]

    # The context manager releases the session's pooled connections on exit,
    # including when the failure branch raises.
    with get_nifi_session() as session:
        r = session.put(
            f"{NIFI_BASE_URL}/nifi-api/flow/process-groups/{pg_id}",
            headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
            json={"id": pg_id, "state": "RUNNING"},
            timeout=30,
        )
        print(f"Start flow status: {r.status_code}, body: {r.text}")
        if r.status_code not in (200, 201):
            raise RuntimeError(f"Start flow failed (status={r.status_code})")
        return {"status": "success", "pg_id": pg_id}

# Run: switch the process group to RUNNING using the token obtained above.
start_nifi_flow(TOKEN)


Start flow status: 200, body: {"id":"134eb35b-0199-1000-9b35-a2b3144a420e","state":"RUNNING"}


{'status': 'success', 'pg_id': '134eb35b-0199-1000-9b35-a2b3144a420e'}

In [6]:
def wait_for_nifi_flow_completion(token: str, pg_id=None):
    """Poll a process group's status until one of three stop conditions holds.

    Each poll reads ``flowFilesOut`` and ``flowFilesQueued`` from the group's
    aggregate snapshot. The first poll fixes a baseline, and "produced" is the
    growth of ``flowFilesOut`` since then. Conditions are checked in this
    order after every poll:

    * produced >= ``STOP_AFTER_FLOWFILES``  -> ``"completed-by-count"``
    * elapsed  >= ``MAX_RUN_SECONDS``       -> ``"completed-by-time"``
    * nothing queued, produced > 0 and unchanged since the previous poll
      -> ``"completed-by-idle"``

    NOTE(review): confirm that ``flowFilesOut`` actually advances for this
    flow; if it stays at 0, only the time limit can end the wait.

    Args:
        token: Bearer token used in the ``Authorization`` header.
        pg_id: Id of the process group to watch. Defaults to the first value
            in ``NIFI_PG_IDS``.

    Returns:
        ``{"status": <reason>, "produced": <int>}``.

    Raises:
        Whatever ``Response.raise_for_status()`` raises when a status poll is
        answered with an HTTP error (for example an expired token).
    """
    if pg_id is None:
        pg_id = list(NIFI_PG_IDS.values())[0]

    # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
    start_ts = time.monotonic()
    baseline_out = None
    last_out = 0

    # The context manager releases the session's pooled connections on exit.
    with get_nifi_session() as session:
        while True:
            r = session.get(
                f"{NIFI_BASE_URL}/nifi-api/flow/process-groups/{pg_id}/status",
                headers={"Authorization": f"Bearer {token}"},
                timeout=30,
            )
            # Surface HTTP errors directly instead of failing later on an
            # unparseable body or silently treating missing counters as zero.
            r.raise_for_status()
            snap = r.json().get("processGroupStatus", {}).get("aggregateSnapshot", {})

            # "or 0" also covers counters that are present but null.
            flowfiles_out = int(snap.get("flowFilesOut") or 0)
            flowfiles_queued = int(snap.get("flowFilesQueued") or 0)

            if baseline_out is None:
                baseline_out = flowfiles_out

            produced_now = flowfiles_out - baseline_out
            elapsed = int(time.monotonic() - start_ts)

            print(f"[{elapsed}s] queued={flowfiles_queued}, out_total={flowfiles_out}, out_new={produced_now}")

            if produced_now >= STOP_AFTER_FLOWFILES:
                return {"status": "completed-by-count", "produced": produced_now}
            if elapsed >= MAX_RUN_SECONDS:
                return {"status": "completed-by-time", "produced": produced_now}
            if flowfiles_queued == 0 and produced_now == last_out and produced_now > 0:
                return {"status": "completed-by-idle", "produced": produced_now}

            last_out = produced_now
            time.sleep(STATUS_POLL_SECONDS)

# Run: poll the process-group status until one of the stop conditions is met.
wait_for_nifi_flow_completion(TOKEN)


[0s] queued=100, out_total=0, out_new=0
[5s] queued=100, out_total=0, out_new=0
[10s] queued=100, out_total=0, out_new=0


{'status': 'completed-by-time', 'produced': 0}

In [7]:
def stop_nifi_flow(token: str, pg_id=None):
    """Schedule a NiFi process group to stop.

    Sends ``PUT /nifi-api/flow/process-groups/{pg_id}`` with state ``STOPPED``
    and prints the status code and body of the answer. A rejected request is
    reported through the return value rather than an exception.

    Args:
        token: Bearer token used in the ``Authorization`` header.
        pg_id: Id of the process group to stop. Defaults to the first value
            in ``NIFI_PG_IDS``.

    Returns:
        ``{"status": "stopped"}`` when NiFi answers 200 or 201, otherwise
        ``{"status": "failed"}``.
    """
    if pg_id is None:
        pg_id = list(NIFI_PG_IDS.values())[0]

    # The context manager releases the session's pooled connections on exit.
    with get_nifi_session() as session:
        r = session.put(
            f"{NIFI_BASE_URL}/nifi-api/flow/process-groups/{pg_id}",
            headers={"Authorization": f"Bearer {token}", "Content-Type": "application/json"},
            json={"id": pg_id, "state": "STOPPED"},
            timeout=30,
        )
        print(f"Stop flow status: {r.status_code}, body: {r.text}")
        return {"status": "stopped" if r.status_code in (200, 201) else "failed"}

# Run: switch the process group back to STOPPED.
stop_nifi_flow(TOKEN)


Stop flow status: 200, body: {"id":"134eb35b-0199-1000-9b35-a2b3144a420e","state":"STOPPED"}


{'status': 'stopped'}

In [8]:
def purge_nifi_queues(token: str, pg_id=None, poll_timeout_sec=300):
    """Empty the queue of every connection listed for a process group.

    Lists the group's connections, then for each one creates a drop request,
    polls it until NiFi reports ``finished``, and finally removes the drop
    request again. Progress is printed per poll.

    Args:
        token: Bearer token used in the ``Authorization`` header.
        pg_id: Id of the process group whose connections are purged. Defaults
            to the first value in ``NIFI_PG_IDS``.
        poll_timeout_sec: Upper bound, per connection, on how long to wait for
            a drop request to finish.

    Returns:
        ``{"status": "purged", "connections": <number of connections>}``.

    Raises:
        RuntimeError: If a drop request does not finish within
            ``poll_timeout_sec``.
        Whatever ``Response.raise_for_status()`` raises when NiFi answers any
        of the list/create/poll calls with an HTTP error.
    """
    if pg_id is None:
        pg_id = list(NIFI_PG_IDS.values())[0]
    auth_headers = {"Authorization": f"Bearer {token}"}

    # The context manager releases the session's pooled connections on exit.
    with get_nifi_session() as session:
        # Get all connections
        r = session.get(
            f"{NIFI_BASE_URL}/nifi-api/process-groups/{pg_id}/connections",
            headers=auth_headers,
            timeout=30,
        )
        r.raise_for_status()
        connections = [c["id"] for c in r.json().get("connections", [])]

        for cid in connections:
            requests_url = f"{NIFI_BASE_URL}/nifi-api/flowfile-queues/{cid}/drop-requests"
            # Every call carries a timeout so a stalled server cannot hang the cell.
            created = session.post(requests_url, headers=auth_headers, timeout=30)
            created.raise_for_status()
            dr_id = created.json()["dropRequest"]["id"]
            dr_url = f"{requests_url}/{dr_id}"
            deadline = time.monotonic() + poll_timeout_sec

            try:
                # Poll until finished
                while True:
                    polled = session.get(dr_url, headers=auth_headers, timeout=30)
                    polled.raise_for_status()
                    resp = polled.json()["dropRequest"]

                    print(f"[purge {cid}] dropped={resp.get('droppedCount')}, finished={resp.get('finished')}")
                    if resp.get("finished"):
                        break
                    if time.monotonic() >= deadline:
                        raise RuntimeError(
                            f"Drop request {dr_id} for connection {cid} did not finish "
                            f"within {poll_timeout_sec}s"
                        )
                    time.sleep(2)
            finally:
                # Remove the drop request on the server once we are done with
                # it. Best effort: a cleanup failure must not mask the outcome
                # of the purge itself.
                try:
                    session.delete(dr_url, headers=auth_headers, timeout=30)
                except Exception as e:
                    print(f"⚠️ Could not remove drop request {dr_id}: {e}")

    print("✅ All queues purged.")
    return {"status": "purged", "connections": len(connections)}

# Run: drop whatever is still queued in the process group's connections.
purge_nifi_queues(TOKEN)


[purge bfa70522-6681-3792-0b14-7243661bef82] dropped=0, finished=True
[purge a49dbb03-11a8-3faa-e763-8fa1ee062e68] dropped=0, finished=True
[purge ae6c9a2e-4e23-3cc0-701d-a3337ff3bea4] dropped=0, finished=True
[purge 03f95a6c-e694-357b-0f96-ce91f57240f6] dropped=1, finished=True
[purge e81cfcc9-355c-309a-1dde-41824f03d591] dropped=0, finished=True
[purge 32599c19-14fe-31f4-a7dd-75fe26c7b063] dropped=0, finished=True
[purge 88e00738-a0d6-316a-d9d9-354cd28bad17] dropped=1, finished=True
[purge 557c2f38-7bdc-39e2-c0b0-3880327f113c] dropped=99, finished=True
✅ All queues purged.


{'status': 'purged', 'connections': 8}