In [7]:
import requests
from requests.auth import HTTPBasicAuth
from datetime import datetime, timedelta
from base64 import b64encode 
import json
import time

url =  "https://se-indonesia-cfm1-management0.se-indon.a465-9q4k.cloudera.site/se-indonesia-cfm1/cdp-proxy-api/nifi-app/nifi-api/controller/cluster"  # Replace with your actual API endpoint
user_id = "your-cdp-login"
password = "your-cdp-password"


In [10]:


def get_http_response(url, user_id, password):
    basic = HTTPBasicAuth(user_id, password)
    response = requests.get(url, auth=basic, verify=False)
    print("returned :" , response.text)
    response.raise_for_status()  # Raise an exception for error responses

    return response.json() 

def make_put_request(url, data, username, password, content_type="application/json"):
    """
    Makes a PUT request to the specified URL with basic authentication.

    Args:
        url (str): The URL of the resource to update.
        data (dict or str): The data to send in the request body.
            If a dictionary, it will be converted to JSON.
        username (str): The username for basic authentication.
        password (str): The password for basic authentication.
        content_type (str, optional): The content type of the request body.
            Defaults to "application/json".

    Returns:
        requests.Response: The response object containing the server's response.
    """

    if isinstance(data, dict):
        data = json.dumps(data)  # Convert dictionary to JSON if necessary

    # Combine username and password for basic authentication
    auth_string = b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    headers = {"Authorization": f"Basic {auth_string}", "Content-Type": content_type}

    try:
        response = requests.put(url, headers=headers, data=data)
        response.raise_for_status()  # Raise an exception for non-2xx status codes
        return response
    except requests.exceptions.RequestException as e:
        print(f"Error making PUT request: {e}")
        return None

def get_nifi_processor_clientid_and_version(url, user_id, password):
    data =get_http_response(url, user_id, password) 

    print(data["revision"]) # {'clientId': 'c74a3773-cee5-15db-6b8c-7c5873a52dd7', 'version': 16}
    return data["revision"]["clientId"], data["revision"]["version"]

def get_primary_nodeId_and_address(url, user_id, password) :
    data =get_http_response(url, user_id, password) 
    primary_node = None

    for node in data["cluster"]["nodes"]:
        print(node)
        if "Primary Node" in node["roles"] : 
            print("Primary:", node)
            primary_node = node
            break
        
    if primary_node:
    # When Deploying to Airflow we need to uncomment this 
    #     kwargs['ti'].xcom_push(key='primary_node_id', value=primary_node['nodeId'])
    #     kwargs['ti'].xcom_push(key='primary_node_address', value=primary_node['address'])
        # COMMENT these 3 lines for Airflow and uncomment the earlier lines for Airflow
        print( "Nodeid : ", primary_node['nodeId'])
        print( "Nodeaddress : ", primary_node['address'])    
        return primary_node['nodeId'], primary_node['address']
    else:
        raise Exception("Primary node not found in API response.")
    
def fetch_state_value(url, state_property, primary_nodeId,user_id, password) :
# let us call this first time for the flow 
    data =get_http_response(url, user_id, password)     
    print(data["componentState"]["localState"]["state"])

    processor_states = data["componentState"]["localState"]["state"]
    for state in processor_states:
        if state["clusterNodeId"] == primary_nodeId and state["key"] == state_property :
            date_str=state["value"]
            print(date_str)
            converted_date=datetime.strptime(date_str, "%a %b %d %H:%M:%S UTC %Y")
            print(converted_date)
    return converted_date

In [11]:
# Get Primary Node Details
cluster_url = "https://se-indonesia-cfm1-management0.se-indon.a465-9q4k.cloudera.site/se-indonesia-cfm1/cdp-proxy-api/nifi-app/nifi-api/controller/cluster"  # Replace with your actual API endpoint
primary_nodeId , _ = get_primary_nodeId_and_address(cluster_url, user_id, password)
print(primary_nodeId)

# start_processor_id = "39793bc3-7869-1e73-8b15-8c2b6c6ebf25" # for testing use "0d082996-0192-1000-ffff-fffff4de0cfe"
# end_processor_id = "c74a337b-cee5-15db-ba2f-0fc29c1aa05a" # for testing use 0d11f872-0192-1000-ffff-ffff8cdff6f2

start_processor_id = "effa3ae8-49b5-1782-8275-e030ee5088dc" 
end_processor_id = "c74a337b-cee5-15db-ba2f-0fc29c1aa05a" 

# Let us get the Client Id and Version Id
url_nifi_start_processor=f"https://se-indonesia-cfm1-management0.se-indon.a465-9q4k.cloudera.site/se-indonesia-cfm1/cdp-proxy-api/nifi-app/nifi-api/processors/{start_processor_id}"
clientId, version = get_nifi_processor_clientid_and_version(url_nifi_start_processor, user_id, password)
print(f"ClientId: {clientId}  \n version: {version}")

# Now Getting the Start Processor to RUNNING MODE
run_url = f"https://se-indonesia-cfm1-management0.se-indon.a465-9q4k.cloudera.site/se-indonesia-cfm1/cdp-proxy-api/nifi-app/nifi-api/processors/{start_processor_id}/run-status"
payload_data = {"revision": {"clientId": clientId, "version": version}, "state": "RUNNING", "disconnectedNodeAcknowledged": True}

response = make_put_request(run_url, payload_data, user_id, password)
if response:
    print(f"Request successful! Response status code: {response.status_code}")
    print(response.text)  # Access the response content
else:
    print("Request failed.")
#let us wait for seconds 
time.sleep(2  )

# Finally Let us Stop the Processor 
clientId, version = get_nifi_processor_clientid_and_version(url_nifi_start_processor, user_id, password)
payload_data = {"revision": {"clientId": clientId, "version": version}, "state": "STOPPED", "disconnectedNodeAcknowledged": True}

response = make_put_request(run_url, payload_data, user_id, password)

if response:
    print(f"Request successful! Response status code: {response.status_code}")
    print(response.text)  # Access the response content
else:
    print("Request failed.")
    
# Let us get the output of the end processor
end_nifiprocessor_url = f"https://se-indonesia-cfm1-management0.se-indon.a465-9q4k.cloudera.site/se-indonesia-cfm1/cdp-proxy-api/nifi-app/nifi-api/processors/{end_processor_id}/state"
state_property="last_tms"
initial_val = fetch_state_value(end_nifiprocessor_url, state_property, primary_nodeId, user_id, password)
print(initial_val)

# now Loop and wait till an update happens
while True:
    current_val = fetch_state_value(end_nifiprocessor_url, state_property, primary_nodeId, user_id, password)
    if initial_val == current_val :
        print("Waiting...")
        time.sleep(10)
    else:
        print(f"Update found: {current_val}")
        break 





returned : {"cluster":{"nodes":[{"nodeId":"8313ee9d-51af-4abe-bcb6-406d62a3f3e7","address":"se-indonesia-cfm1-nifi0.se-indon.a465-9q4k.cloudera.site","apiPort":8443,"status":"CONNECTED","heartbeat":"10/03/2024 01:20:39 UTC","roles":["Cluster Coordinator"],"activeThreadCount":0,"queued":"1 / 0 bytes","events":[{"timestamp":"10/02/2024 10:24:12 UTC","category":"INFO","message":"Received first heartbeat from connecting node. Node connected."},{"timestamp":"10/02/2024 10:24:09 UTC","category":"INFO","message":"Connection requested from existing node. Setting status to connecting."}],"nodeStartTime":"10/02/2024 10:23:32 UTC"},{"nodeId":"dd1d8c30-5469-4880-b83b-2e3ae6bfd8d3","address":"se-indonesia-cfm1-nifi2.se-indon.a465-9q4k.cloudera.site","apiPort":8443,"status":"CONNECTED","heartbeat":"10/03/2024 01:20:39 UTC","roles":[],"activeThreadCount":0,"queued":"1 / 0 bytes","events":[{"timestamp":"10/02/2024 10:24:12 UTC","category":"INFO","message":"Received first heartbeat from connecting node



returned : {"revision":{"clientId":"4fe17ba6-0192-1000-db91-d3c5a443da11","version":1},"id":"effa3ae8-49b5-1782-8275-e030ee5088dc","uri":"https://se-indonesia-cfm1-management0.se-indon.a465-9q4k.cloudera.site:443/se-indonesia-cfm1/cdp-proxy-api/nifi-app/nifi-api/processors/effa3ae8-49b5-1782-8275-e030ee5088dc","position":{"x":1120.0,"y":0.0},"permissions":{"canRead":true,"canWrite":true},"bulletins":[],"component":{"id":"effa3ae8-49b5-1782-8275-e030ee5088dc","versionedComponentId":"e3460adc-c183-3a8e-b33e-496782f0fa0e","parentGroupId":"e541fcda-ec63-33a8-a7b9-795848add787","position":{"x":1120.0,"y":0.0},"name":"WorldBank Trigger","type":"org.apache.nifi.processors.standard.GenerateFlowFile","bundle":{"group":"org.apache.nifi","artifact":"nifi-standard-nar","version":"1.25.0.2.2.8.300-34"},"state":"STOPPED","style":{},"relationships":[{"name":"success","description":"","autoTerminate":false,"retry":false}],"supportsParallelProcessing":true,"supportsEventDriven":false,"supportsBatching"



returned : {"revision":{"clientId":"4fe17ba6-0192-1000-db91-d3c5a443da11","version":2},"id":"effa3ae8-49b5-1782-8275-e030ee5088dc","uri":"https://se-indonesia-cfm1-management0.se-indon.a465-9q4k.cloudera.site:443/se-indonesia-cfm1/cdp-proxy-api/nifi-app/nifi-api/processors/effa3ae8-49b5-1782-8275-e030ee5088dc","position":{"x":1120.0,"y":0.0},"permissions":{"canRead":true,"canWrite":true},"bulletins":[],"component":{"id":"effa3ae8-49b5-1782-8275-e030ee5088dc","versionedComponentId":"e3460adc-c183-3a8e-b33e-496782f0fa0e","parentGroupId":"e541fcda-ec63-33a8-a7b9-795848add787","position":{"x":1120.0,"y":0.0},"name":"WorldBank Trigger","type":"org.apache.nifi.processors.standard.GenerateFlowFile","bundle":{"group":"org.apache.nifi","artifact":"nifi-standard-nar","version":"1.25.0.2.2.8.300-34"},"state":"RUNNING","style":{},"relationships":[{"name":"success","description":"","autoTerminate":false,"retry":false}],"supportsParallelProcessing":true,"supportsEventDriven":false,"supportsBatching"



returned : {"componentState":{"componentId":"c74a337b-cee5-15db-ba2f-0fc29c1aa05a","stateDescription":"Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner.","clusterState":{"scope":"CLUSTER","totalEntryCount":0,"state":[]},"localState":{"scope":"LOCAL","totalEntryCount":3,"state":[{"key":"last_tms","value":"Thu Sep 26 09:03:17 UTC 2024","clusterNodeId":"8313ee9d-51af-4abe-bcb6-406d62a3f3e7","clusterNodeAddress":"se-indonesia-cfm1-nifi0.se-indon.a465-9q4k.cloudera.site:8443"},{"key":"last_tms","value":"Thu Sep 26 09:05:03 UTC 2024","clusterNodeId":"dd1d8c30-5469-4880-b83b-2e3ae6bfd8d3","clusterNodeAddress":"se-indonesia-cfm1-nifi2.se-indon.a465-9q4k.cloudera.site:8443"},{"key":"last_tms","value":"Thu Oct 03 01:18:58 UTC 2024","clusterNodeId":"4734738b-773a-4419-8473-22b2f69ab243","clusterNodeAddress":"se-indonesia-cfm1-nifi1.se-indon.a465-9q4k.cloudera.site:8443"}]}}}
[{'key': 'last_tms', 'value': 'Thu Sep 26 09:03:1



returned : {"componentState":{"componentId":"c74a337b-cee5-15db-ba2f-0fc29c1aa05a","stateDescription":"Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner.","clusterState":{"scope":"CLUSTER","totalEntryCount":0,"state":[]},"localState":{"scope":"LOCAL","totalEntryCount":3,"state":[{"key":"last_tms","value":"Thu Sep 26 09:03:17 UTC 2024","clusterNodeId":"8313ee9d-51af-4abe-bcb6-406d62a3f3e7","clusterNodeAddress":"se-indonesia-cfm1-nifi0.se-indon.a465-9q4k.cloudera.site:8443"},{"key":"last_tms","value":"Thu Sep 26 09:05:03 UTC 2024","clusterNodeId":"dd1d8c30-5469-4880-b83b-2e3ae6bfd8d3","clusterNodeAddress":"se-indonesia-cfm1-nifi2.se-indon.a465-9q4k.cloudera.site:8443"},{"key":"last_tms","value":"Thu Oct 03 01:18:58 UTC 2024","clusterNodeId":"4734738b-773a-4419-8473-22b2f69ab243","clusterNodeAddress":"se-indonesia-cfm1-nifi1.se-indon.a465-9q4k.cloudera.site:8443"}]}}}
[{'key': 'last_tms', 'value': 'Thu Sep 26 09:03:1



returned : {"componentState":{"componentId":"c74a337b-cee5-15db-ba2f-0fc29c1aa05a","stateDescription":"Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner.","clusterState":{"scope":"CLUSTER","totalEntryCount":0,"state":[]},"localState":{"scope":"LOCAL","totalEntryCount":3,"state":[{"key":"last_tms","value":"Thu Sep 26 09:03:17 UTC 2024","clusterNodeId":"8313ee9d-51af-4abe-bcb6-406d62a3f3e7","clusterNodeAddress":"se-indonesia-cfm1-nifi0.se-indon.a465-9q4k.cloudera.site:8443"},{"key":"last_tms","value":"Thu Sep 26 09:05:03 UTC 2024","clusterNodeId":"dd1d8c30-5469-4880-b83b-2e3ae6bfd8d3","clusterNodeAddress":"se-indonesia-cfm1-nifi2.se-indon.a465-9q4k.cloudera.site:8443"},{"key":"last_tms","value":"Thu Oct 03 01:18:58 UTC 2024","clusterNodeId":"4734738b-773a-4419-8473-22b2f69ab243","clusterNodeAddress":"se-indonesia-cfm1-nifi1.se-indon.a465-9q4k.cloudera.site:8443"}]}}}
[{'key': 'last_tms', 'value': 'Thu Sep 26 09:03:1



returned : {"componentState":{"componentId":"c74a337b-cee5-15db-ba2f-0fc29c1aa05a","stateDescription":"Gives the option to store values not only on the FlowFile but as stateful variables to be referenced in a recursive manner.","clusterState":{"scope":"CLUSTER","totalEntryCount":0,"state":[]},"localState":{"scope":"LOCAL","totalEntryCount":3,"state":[{"key":"last_tms","value":"Thu Sep 26 09:03:17 UTC 2024","clusterNodeId":"8313ee9d-51af-4abe-bcb6-406d62a3f3e7","clusterNodeAddress":"se-indonesia-cfm1-nifi0.se-indon.a465-9q4k.cloudera.site:8443"},{"key":"last_tms","value":"Thu Sep 26 09:05:03 UTC 2024","clusterNodeId":"dd1d8c30-5469-4880-b83b-2e3ae6bfd8d3","clusterNodeAddress":"se-indonesia-cfm1-nifi2.se-indon.a465-9q4k.cloudera.site:8443"},{"key":"last_tms","value":"Thu Oct 03 01:21:18 UTC 2024","clusterNodeId":"4734738b-773a-4419-8473-22b2f69ab243","clusterNodeAddress":"se-indonesia-cfm1-nifi1.se-indon.a465-9q4k.cloudera.site:8443"}]}}}
[{'key': 'last_tms', 'value': 'Thu Sep 26 09:03:1