In [1]:
# Caution : on the first time you have to restart the Python Kernel!
%pip install kubernetes tqdm pandas nbformat kubernetes

from kubernetes import client, config
from tqdm.notebook import tqdm as tqdm

Note: you may need to restart the kernel to use updated packages.


## Deploy dependencies

In [2]:
def deploy_common_modules(script_path='./common_modules.sh'):
    """Run the shell script that deploys the shared/common modules.

    Parameters
    ----------
    script_path : str
        Path of the executable script to run. Defaults to
        ``./common_modules.sh``, resolved against the kernel's current
        working directory.

    Returns
    -------
    int
        The script's exit status (0 on success). It is returned so callers can
        detect a failed deployment instead of silently continuing.
    """
    import subprocess
    return subprocess.call([script_path])


In [3]:
import subprocess
import shlex
import datetime
from dateutil.tz import tzlocal
import pandas as pd
import glob
import time
from time import sleep 
def run_command(command, shell=True, log=True):
    """Run one or more commands concurrently, streaming their combined output.

    Each command is started with ``subprocess.Popen``, with stderr merged into
    stdout. Output lines are read round-robin from the processes that are
    still running. When ``log`` is true, each line is printed prefixed with
    the index of the command that produced it and a local timestamp.

    Parameters
    ----------
    command : str or list/tuple of str
        A single command line, or several command lines to run in parallel.
    shell : bool
        If true, each command string is handed to the shell verbatim, so pipes,
        redirections and globbing work. If false, it is tokenised with
        ``shlex.split`` and executed directly, without a shell.
    log : bool
        If true, echo each command before starting it and print its output.
        If false, nothing is printed.

    Returns
    -------
    int
        0 if every command exited successfully (vacuously 0 for an empty list).
        Otherwise, the exit status of the first failing command in the order
        given. A single command therefore returns its own exit status.
    """
    commands = list(command) if isinstance(command, (list, tuple)) else [command]

    running = []  # (original command index, Popen) pairs still producing output
    for index, cmd in enumerate(commands):
        if log:
            print(cmd)
        # With shell=True the command must be a single string. If a list is
        # passed, POSIX uses only its first element as the shell script and
        # turns the remaining tokens into the shell's $0, $1, ...
        args = cmd if shell else shlex.split(cmd)
        proc = subprocess.Popen(args, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, shell=shell)
        running.append((index, proc))

    return_codes = [None] * len(commands)
    while running:
        for slot, (index, proc) in enumerate(running):
            line = proc.stdout.readline()
            if not line:
                # EOF on the merged stream: wait for the exit status (rather
                # than busy-polling) and release the pipe.
                return_codes[index] = proc.wait()
                proc.stdout.close()
                del running[slot]
                break  # the list changed; restart the round-robin pass
            if log:
                stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                text = line.decode(errors="replace").strip()
                print("Log {0} - {1} : ".format(index, stamp) + text)

    return next((rc for rc in return_codes if rc != 0), 0)

In [None]:
def init_label_nodes(jobmanagers_qty=1, kafka_qty=1, minio_qty=1):
    """Assign a ``tier`` label to every worker node of the cluster.

    Worker nodes are those with ``node-role.kubernetes.io/worker == "true"``.
    They are labelled in the order returned by ``list_node``:

    * the first worker becomes ``manager``;
    * the next ``jobmanagers_qty`` workers become ``jobmanager``;
    * the next ``kafka_qty`` workers become ``kafka``;
    * the next ``minio_qty`` workers become ``minio``;
    * every remaining worker becomes ``taskmanager``.

    Non-worker nodes are not patched. Each node's name and labels are printed
    as it is visited.

    Returns
    -------
    tuple
        ``(manager_node, jobmanager_node, taskmanager_nodes, kafka_node,
        minio_node)``, as node names. This is the same order as
        ``get_label_nodes``. For the single-valued tiers only the last node
        assigned is reported; ``taskmanager_nodes`` lists all of them. A tier
        that received no node is ``None`` (or an empty list).
    """
    # Configs can be set in Configuration class directly or using helper utility
    config.load_kube_config()
    api_instance = client.CoreV1Api()

    label_worker = "node-role.kubernetes.io/worker"
    # Inclusive upper bound on the worker index for each tier, checked in
    # order. Any worker beyond the last bound becomes a taskmanager.
    tier_bounds = [
        ("manager", 0),
        ("jobmanager", jobmanagers_qty),
        ("kafka", jobmanagers_qty + kafka_qty),
        ("minio", jobmanagers_qty + kafka_qty + minio_qty),
    ]

    assigned = {tier: None for tier, _ in tier_bounds}
    taskmanager_nodes = []
    worker_index = 0
    for node in api_instance.list_node().items:
        name = node.metadata.name
        print("%s\t%s" % (name, node.metadata.labels))
        labels = node.metadata.labels or {}  # the client may report None
        if labels.get(label_worker) != "true":
            continue
        tier = next((t for t, bound in tier_bounds if worker_index <= bound),
                    "taskmanager")
        api_instance.patch_node(name, {"metadata": {"labels": {"tier": tier}}})
        if tier == "taskmanager":
            taskmanager_nodes.append(name)
        else:
            assigned[tier] = name
        worker_index += 1

    return (assigned["manager"], assigned["jobmanager"], taskmanager_nodes,
            assigned["kafka"], assigned["minio"])

In [None]:
def get_label_nodes(ip_address=False):
    """Find which nodes carry each ``tier`` label (manager, jobmanager, ...).

    Every node's name and labels are printed as it is visited.

    Parameters
    ----------
    ip_address : bool
        If true, report each node by the first of its ``InternalIP``
        addresses instead of its node name. A tiered node with no
        ``InternalIP`` is reported as ``None``.

    Returns
    -------
    tuple
        ``(manager_node, jobmanager_node, taskmanager_nodes, kafka_node,
        minio_node)``. ``taskmanager_nodes`` is always a list with one entry
        per taskmanager node, for both names and IPs. Each other entry is the
        last node found with that tier, or ``None`` if no node has it.
    """
    # Configs can be set in Configuration class directly or using helper utility
    config.load_kube_config()
    api_instance = client.CoreV1Api()

    single_tiers = {"manager": None, "jobmanager": None, "kafka": None, "minio": None}
    taskmanager_nodes = []
    for node in api_instance.list_node().items:
        print("%s\t%s" % (node.metadata.name, node.metadata.labels))
        labels = node.metadata.labels or {}  # the client may report None
        tier = labels.get("tier")
        if tier != "taskmanager" and tier not in single_tiers:
            continue

        if ip_address:
            # Only look up the address when asked for it, so nodes without an
            # InternalIP do not break name-based lookups.
            internal_ips = [a.address for a in (node.status.addresses or [])
                            if a.type == "InternalIP"]
            ident = internal_ips[0] if internal_ips else None
        else:
            ident = node.metadata.name

        if tier == "taskmanager":
            taskmanager_nodes.append(ident)
        else:
            single_tiers[tier] = ident

    return (single_tiers["manager"], single_tiers["jobmanager"], taskmanager_nodes,
            single_tiers["kafka"], single_tiers["minio"])