# MLflow Training TEST

In [1]:
import json, requests
from random import shuffle
from tabulate import tabulate
from IPython.core.magic_arguments import (argument, magic_arguments, parse_argstring)
import subprocess
import getpass
import yaml
import os, json
import os.path
import logging, sys
import base64
import contextlib
from dotenv import load_dotenv
import IPython.utils.capture


# Text codec applied to base64-decoded values read from configmeta.json.
decode_format = 'utf-8'

# Proxy variables (lower- and upper-case spellings) that are removed from the
# environment while the training/log magics make their HTTP requests.
VARIABLES_TO_REMOVE_FROM_ENV = [
    "http_proxy",
    "https_proxy",
    "HTTP_PROXY",
    "HTTPS_PROXY",
]


# context manager to unset/update env variables while calling magics functions.
# to unset variables, pass them as parameters.
# to set variables, pass <var-name>=<var-value> pair
# source: https://stackoverflow.com/questions/2059482/python-temporarily-modify-the-current-processs-environment
@contextlib.contextmanager
def modified_environ(*remove, **update):
    """
    Temporarily updates the ``os.environ`` dictionary in-place.
    The ``os.environ`` dictionary is updated in-place so that the modification
    is sure to work in all situations.
    :param remove: Environment variables to remove.
    :param update: Dictionary of environment variables and values to add/update.
    """
    env = os.environ
    update = update or {}
    remove = remove or []

    # List of environment variables being updated or removed.
    stomped = (set(update.keys()) | set(remove)) & set(env.keys())
    # Environment variables and values to restore on exit.
    update_after = {k: env[k] for k in stomped}
    # Environment variables and values to remove on exit.
    remove_after = frozenset(k for k in update if k not in env)

    try:
        env.update(update)
        [env.pop(k, None) for k in remove]
        yield
    finally:
        env.update(update_after)
        [env.pop(k) for k in remove_after]

def replacePattern(sourceFile, pattern, value):
    """Replace every occurrence of *pattern* with *value* in the text file *sourceFile*.

    The file is rewritten in place. This is best effort: any error is printed
    to stdout and swallowed, and callers are not notified.

    :param sourceFile: path of the file to edit.
    :param pattern: literal substring to look for (not a regex).
    :param value: replacement text.
    """
    try:
        # `with` guarantees both handles are closed even if read/write raises.
        with open(sourceFile, "rt") as fin:
            contents = fin.read()
        with open(sourceFile, "wt") as fout:
            fout.write(contents.replace(pattern, value))
    except Exception as inst:
        print('in replacePattern ')
        print(type(inst))
        print(inst.args)
        print(inst)

class TrainPython:
    """Magic that submits code to one of an attached training cluster's endpoints."""

    def __init__(self, clustername, endpoints):
        self.clustername = clustername
        # List of base URLs for the training service; one is picked per submission.
        self.endpoints = endpoints
        # Set from the server's 'request_url' after each successful submission.
        self.history_url = None

    def train_python(self, line=None, code=None):
        """Send *code* to the training engine and print the job's history URL.

        If the user's ``~/.env`` file (the mlflow variables written by
        ``loadMlflow``) can be read, its lines are sent along as ``env``.
        Otherwise ``env`` is the empty string. Proxy variables are removed from
        the environment for the duration of the request.

        :param line: magic argument line (unused).
        :param code: cell body to run remotely.
        """
        with modified_environ(*VARIABLES_TO_REMOVE_FROM_ENV):
            # Shuffles self.endpoints in place and uses the first entry. The list
            # object may be shared with whoever constructed this instance.
            shuffle(self.endpoints)
            url = self.endpoints[0] + "?auth=none"
            username = 'bluedata'
            env_lines = ""
            try:
                env_file = '/home/' + getpass.getuser().strip() + '/.env'
                with open(env_file, 'r') as fh:
                    env_lines = fh.readlines()
            except Exception:
                # Best effort: a missing or unreadable ~/.env just means no env is sent.
                pass
            payload = json.dumps({
                "env": env_lines,
                "training_code": code,
                "calledBy": username,
            })
            headers = {'content-type': 'application/json'}
            json_response = requests.request("POST", url, data=payload, headers=headers).json()
            self.history_url = json_response['request_url']
            print("History URL: {0}".format(self.history_url))


class Logs:
    """Magic that prints the status and logs of a job submitted by a training magic."""

    @magic_arguments()
    @argument('--url', help='History URL for logs.')
    def logs(self, line = None, code = None):
        """Fetch the job record at ``--url`` and print its status and logs.

        The URL may be pasted as the whole ``History URL: ...`` line printed by
        the training magic; that prefix and any double quotes are stripped.
        Prints "No logs to show at this time" when no URL is given or when the
        job has no logs yet.
        """
        with modified_environ(*VARIABLES_TO_REMOVE_FROM_ENV):
            args = parse_argstring(self.logs, line)
            if args.url is None:
                print("No logs to show at this time")
                return
            history_url = args.url.replace("History URL: ", "").replace('"', "").rstrip()
            # The history response is indexed as a list; the first record describes the job.
            history_response = requests.request("GET", history_url + "?auth=none").json()[0]
            status = history_response['status']
            log_url = history_response['log_url'] + "?auth=none"
            print("Job Status: {0}".format(status))
            logs_json = requests.request("GET", log_url).json()
            if logs_json.get('logs'):
                print(logs_json['logs'])
            else:
                # Previously this case printed nothing at all.
                print("No logs to show at this time")

class SetUser:
    """Magic (%kubeRefresh) that downloads a fresh kubeconfig for the notebook user."""

    @staticmethod
    def _get_session(ip, username, password, prefix):
        """Log in to the gateway API and return the session path from the Location header."""
        data = {"name": username, "password": password}
        headers = {'content-type': 'application/json'}
        # NOTE(review): certificate verification is disabled for https (as before);
        # presumably the gateway uses a self-signed certificate — confirm.
        response = requests.post(prefix + "://" + ip + ":8080/api/v1/login",
                                 json=data, headers=headers, verify=(prefix != 'https'))
        return response.headers['Location']

    @classmethod
    def _get_kubeconfig_user(cls, ip, username, password, prefix):
        """Return the kubeconfig text the gateway serves for *username*."""
        session = cls._get_session(ip, username, password, prefix)
        headers = {'X-BDS-SESSION': session}
        url = prefix + "://" + ip + ":8080/api/v2/k8skubeconfig"
        response = requests.request("GET", url, headers=headers, verify=(prefix != 'https'))
        return response.text

    @staticmethod
    def _get_gateway(username):
        """Return the gateway host from *username*'s kubeconfig secret in configmeta, or None."""
        gateway = None
        with open('/etc/guestconfig/configmeta.json') as f:
            data = json.load(f)
        for entry in data['connections']['secrets'].get('kubeconfig', []):
            kuser = base64.b64decode(entry['labels']['kubedirector.hpe.com/username']).decode(decode_format).replace('\n', '')
            if kuser == username:
                config = yaml.full_load(base64.b64decode(entry['data']['config']).decode(decode_format))
                # Drops the last five characters of exec arg #2 (presumably a ":port"
                # suffix) — confirm against the kubeconfig format. Last match wins.
                gateway = config['users'][0]['user']['exec']['args'][2][:-5]
        return gateway

    @magic_arguments()
    @argument('--pwd', help='Password of the notebook user; prompted for when omitted')
    def set_user(self, line = None, code = None):
        """Fetch the user's kubeconfig from the gateway and write it to ~/.kube/config.

        The password comes from ``--pwd`` or an interactive prompt. http is
        tried first, then https. The existing config is only overwritten once
        a new one has been fetched. Failures are printed, not raised.
        """
        try:
            args = parse_argstring(self.set_user, line)
            user = getpass.getuser().strip()
            gateway = self._get_gateway(user)
            if gateway is None:
                print("kubeconfig refresh failed: no gateway found for user " + user)
                return
            pwd = args.pwd if args.pwd is not None else getpass.getpass()

            kubeconfig = None
            for prefix in ('http', 'https'):
                try:
                    kubeconfig = self._get_kubeconfig_user(gateway, user, pwd, prefix)
                    break
                except Exception:
                    continue  # fall through to the next scheme
            if kubeconfig is None:
                print("kubeconfig refresh failed")
                return

            # Fetch first, write second: a failed refresh no longer truncates the old config.
            kube_dir = '/home/' + user + '/.kube'
            os.makedirs(kube_dir, exist_ok=True)
            with open(kube_dir + '/config', 'w') as fh:
                fh.write(kubeconfig)
            print("kubeconfig set for user " + user)
        except Exception:
            print("failed")

class ModelUpdate:
    """Magic (%modelupdate) that repoints a deployed model's configmap at new files."""

    @staticmethod
    def _patch_configmap(context, namespace, modelname, key, value):
        """Set ``data.<key>`` of configmap *modelname* to *value* via ``kubectl patch``; echo its output."""
        # json.dumps escapes quotes/backslashes in the value, and passing an argv
        # list (no shell) keeps user-supplied arguments away from bash.
        patch = json.dumps({"data": {key: value}})
        cmd = ["kubectl", "patch", "configmap", "--context", context,
               "-n", namespace, modelname, "-p=" + patch]
        try:
            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        except FileNotFoundError:
            print("kubectl not found; could not update " + key)
            return
        print(result.stdout.decode('utf-8'))
        print(result.stderr.decode('utf-8'))

    @magic_arguments()
    @argument('--context', help='Name of current context, can be found in your kubeconfig')
    @argument('--namespace', help='Name of current namespace')
    @argument('--modelname', help='Name of model to update')
    @argument('--modelpath', help='(Optional) Path to updated model (such as repo://project_repo/models/tf.h5)')
    @argument('--scoringpath', help='(Optional) Path to updated scoring script (such as repo://project_repo/code/scoring.py)')
    def model_update(self, line = None, code = None):
        """Patch the model configmap's ``scoring-path`` and/or ``path`` entries.

        ``--context``, ``--namespace`` and ``--modelname`` are required; a
        message is printed and nothing is changed if any is missing.
        """
        args = parse_argstring(self.model_update, line)
        if args.context is None:
            print("Please specify a valid context (--context), can be found in your kubeconfig")
            return
        if args.namespace is None:
            print("Please specify a valid namespace (--namespace)")
            return
        if args.modelname is None:
            print("Please specify a valid model name (--modelname)")
            return
        # Same order as before: scoring script first, then model path.
        for key, value in (("scoring-path", args.scoringpath), ("path", args.modelpath)):
            if value is not None:
                self._patch_configmap(args.context, args.namespace, args.modelname, key, value)

class AttachmentsMagic:
    """Line magic (%attachments) that tabulates attached training clusters."""

    def __init__(self, attachments):
        # Mapping of cluster name -> details dict; each details dict needs an "engine" key.
        self.attachments_map = attachments

    def attachments(self, line):
        """Print one row per cluster: its name and its ML engine. *line* is ignored."""
        rows = [
            [cluster, details["engine"]]
            for cluster, details in self.attachments_map.items()
        ]
        print(tabulate(rows, headers=['Training Cluster', 'ML Engine']))

# Read the environment variables required by mlflow from configmeta (mlflow secret + cluster endpoints).
# Write them to the user's '.env' file and load them into the current notebook process.
# Create the 'mlflow-dp' secret for the deployment cluster if it is missing, and apply it in the current namespace.
# The secret manifest is written to the '.secret' directory of the user's home directory.
def _b64_text(value):
    """Base64-decode *value*, decode the bytes with ``decode_format``, and drop newlines."""
    return base64.b64decode(value).decode(decode_format).replace('\n', '')


def _mlflow_env_values(data):
    """Build the mlflow environment variables from parsed configmeta *data*.

    Returns a dict, in .env line order, with AWS_ACCESS_KEY_ID,
    AWS_SECRET_ACCESS_KEY, MLFLOW_S3_ENDPOINT_URL and MLFLOW_TRACKING_URI.
    Raises LookupError when there is no mlflow secret or no mlflow cluster
    with a tracking endpoint, and KeyError when an expected configmeta key is
    missing.
    """
    secrets = data['connections']['secrets']['mlflow']
    if not secrets:
        raise LookupError("configmeta has no 'mlflow' secret")
    # When several mlflow secrets are listed, the last one is used.
    secret = secrets[-1]['data']
    values = {
        'AWS_ACCESS_KEY_ID': _b64_text(secret['AWS_ACCESS_KEY_ID']),
        'AWS_SECRET_ACCESS_KEY': _b64_text(secret['AWS_SECRET_ACCESS_KEY']),
    }
    try:
        s3_endpoint = _b64_text(secret['MLFLOW_S3_ENDPOINT_URL'])
    except (KeyError, TypeError, ValueError):
        s3_endpoint = None  # fall back to each cluster's minio-server endpoint

    tracking_uri = artifact_url = None
    for cluster in data['connections']['clusters'].values():
        nodegroup = cluster['nodegroups']['1']
        if nodegroup['distro_id'] != 'hpecp/mlflow':
            continue
        services = nodegroup['roles']['controller']['services']
        mlflow_endpoint = next(iter(services['mlflow-server']['endpoints']), None)
        if not mlflow_endpoint:
            continue
        # Later eligible clusters override earlier ones.
        tracking_uri = mlflow_endpoint
        if s3_endpoint is not None:
            artifact_url = s3_endpoint
        else:
            artifact_url = list(services['minio-server']['endpoints'])[0]
    if tracking_uri is None:
        raise LookupError("no hpecp/mlflow cluster with an mlflow-server endpoint found")
    values['MLFLOW_S3_ENDPOINT_URL'] = artifact_url
    values['MLFLOW_TRACKING_URI'] = tracking_uri
    return values


def _ensure_mlflow_secret(user, values):
    """Create and apply the 'mlflow-dp' secret from its template unless kubectl already finds it."""
    try:
        probe = subprocess.run(['kubectl', 'get', '-o', 'yaml', 'secret', 'mlflow-dp'],
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    except FileNotFoundError:
        print("kubectl not found; mlflow-dp secret not created")
        return
    if probe.returncode == 0:
        return  # the secret already exists

    secret_dir = '/home/' + user + '/.secret/'
    os.makedirs(secret_dir, exist_ok=True)
    with open('/opt/guestconfig/appconfig/templates/mlflow-dp') as template:
        manifest = template.read()
    placeholders = {
        '@@@AWS_ACCESS_KEY_ID@@@': values['AWS_ACCESS_KEY_ID'],
        '@@@AWS_SECRET_ACCESS_KEY@@@': values['AWS_SECRET_ACCESS_KEY'],
        '@@@AWS_ENDPOINT_URL@@@': values['MLFLOW_S3_ENDPOINT_URL'],
    }
    for placeholder, raw in placeholders.items():
        # Values are inserted base64-encoded (presumably into the Secret's data: fields).
        manifest = manifest.replace(placeholder, base64.b64encode(raw.encode('utf-8')).decode('ascii'))
    user_secret_file = secret_dir + 'mlflow-dp'
    with open(user_secret_file, 'w') as out:
        out.write(manifest)
    subprocess.run(['kubectl', 'apply', '-f', user_secret_file])


def loadMlflow(name, cell=None):
    """Configure this notebook for MLflow.

    Steps:
    1. Read credentials and endpoints from /etc/guestconfig/configmeta.json.
    2. Write them to ~/.env and load that file into os.environ.
    3. Ensure the 'mlflow-dp' deployment secret exists.

    Errors are printed to stdout, not raised.

    :param name: magic argument line (unused).
    :param cell: cell body when invoked as ``%%loadMlflow`` (unused). Defaults
        to None; the function is registered as a line_cell magic and is called
        with two arguments in cell mode.
    """
    try:
        with open('/etc/guestconfig/configmeta.json') as f:
            data = json.load(f)
        values = _mlflow_env_values(data)

        user = getpass.getuser().strip()
        env_file = '/home/' + user + '/.env'
        with open(env_file, "w") as out:
            out.write('\n'.join(key + '=' + value for key, value in values.items()))
        # Load the file just written. Without a path, python-dotenv searches for a
        # .env on its own, which need not be this one.
        # NOTE(review): override is left at its default (False) — confirm that
        # already-set variables should not be refreshed on re-run.
        load_dotenv(env_file)
        print("Backend configured")

        _ensure_mlflow_secret(user, values)
    except Exception as err:
        print("failed to set environmental variables for mlflow")
        print(err)

# %Setexp --name <experiment>: selects the MLflow experiment and tags it with the current user.
class SetExp:
    """Magic that switches the active MLflow experiment."""

    @magic_arguments()
    @argument('--name', help='Name of the MLflow experiment')
    def exp(self, line=None, code=None):
        """Set the MLflow experiment to ``--name`` and set the ``mlflow.user`` tag.

        Prints a failure message and does nothing else when ``--name`` is omitted.
        """
        parsed = parse_argstring(self.exp, line)
        if parsed.name is None:
            print("Failed to set mlflow experiment")
            return
        import mlflow
        current_user = getpass.getuser()
        mlflow.set_experiment(parsed.name)
        # NOTE(review): MLflow's fluent set_tag applies to the active run (starting
        # one if none is active) — confirm that is the intended target of the tag.
        mlflow.set_tag('mlflow.user', current_user)

# Set the livy endpoint that sparkmagic uses for livy sessions.
# An optional --url argument sets the livy endpoint explicitly.
# The password is asked for every time the magic is invoked; if one is given, the authentication parameters are set.
class SetLivy:
    """Magic (%setLivy) that regenerates ~/.sparkmagic/config.json from the base template."""

    @staticmethod
    def _json_escape(value):
        """Escape *value* so it can be placed inside a double-quoted JSON string."""
        # NOTE(review): assumes the @@@...@@@ placeholders sit inside quoted JSON
        # strings in base_spark_config.json — confirm against the template.
        return json.dumps(value)[1:-1]

    @magic_arguments()
    @argument('--url', help='Livy endpoint URL (defaults to the tenant endpoint)')
    def set_url(self, line=None, code=None):
        """Copy the sparkmagic template and fill in endpoint and credentials.

        Prompts for a password. If it is empty, username and password are
        blanked; otherwise the notebook user's name is used. Prints "set
        parameters" on success and "unable to set parameters" on failure.
        """
        try:
            import shutil
            user = getpass.getuser()
            args = parse_argstring(self.set_url, line)
            config_dir = '/home/' + user + '/.sparkmagic'
            config_file = config_dir + '/config.json'
            os.makedirs(config_dir, exist_ok=True)
            # shutil.copy raises on failure (unlike os.system('cp ...')), so a missing
            # template is reported instead of printing "set parameters".
            shutil.copy('/opt/guestconfig/appconfig/templates/base_spark_config.json', config_file)

            # '@@@TENANT_ENDPOINT@@@' is itself a placeholder, presumably substituted
            # when this notebook is deployed — confirm.
            endpoint = args.url if args.url is not None else '@@@TENANT_ENDPOINT@@@'
            replacePattern(config_file, '@@@LIVY_ENDPOINT@@@', self._json_escape(endpoint))

            password = getpass.getpass()
            username = user if password else ''
            replacePattern(config_file, '@@@PASSWORD@@@', self._json_escape(password))
            replacePattern(config_file, '@@@USERNAME@@@', self._json_escape(username))
            print("set parameters")
        except Exception:
            print("unable to set parameters")

# Always register loadMlflow, Setexp, kubeRefresh and setLivy.
# Register the training-related magics only if a training engine is attached.
def load_ipython_extension(ipython):
    """Register this module's magics on *ipython*, the active ``InteractiveShell``.

    Always registered: %loadMlflow, %Setexp, %kubeRefresh, %setLivy.

    When /etc/notebook_cluster/magic-config.json can be read, these are also
    registered: %attachments, %modelupdate, %logs, and one training magic
    named after each attached cluster. Otherwise "configuring training magics
    failed" is printed.
    """
    ipython.register_magic_function(loadMlflow, "line_cell")
    ipython.register_magic_function(SetExp().exp, "line_cell", "Setexp")
    ipython.register_magic_function(SetUser().set_user, "line_cell", "kubeRefresh")
    ipython.register_magic_function(SetLivy().set_url, "line_cell", "setLivy")

    magic_config_file = "/etc/notebook_cluster/magic-config.json"
    try:
        with open(magic_config_file, "r") as f:
            names_to_endpoints = json.load(f)

        ipython.register_magic_function(AttachmentsMagic(names_to_endpoints).attachments, "line")
        ipython.register_magic_function(ModelUpdate().model_update, "line_cell", "modelupdate")
        if names_to_endpoints:
            # %logs holds no per-cluster state, so register it once rather than once per cluster.
            ipython.register_magic_function(Logs().logs, "line_cell", "logs")
        for clustername, cluster_details in names_to_endpoints.items():
            trainer = TrainPython(clustername, cluster_details["endpoints"])
            ipython.register_magic_function(trainer.train_python, "line_cell", clustername)
    except Exception:
        print("configuring training magics failed")
        
# Register the magics defined above with the shell this notebook runs in.
# get_ipython() is provided by IPython/Jupyter, so this line only works inside IPython.
load_ipython_extension(get_ipython())

In [2]:
%kubeRefresh
# enter the notebook user password in the prompt



kubeconfig set for user ad_user1




In [3]:
%attachments

Training Cluster        ML Engine
----------------------  -----------
trainingengineinstance  python


In [4]:
%%trainingengineinstance
print('test')

ConnectionError: HTTPConnectionPool(host='trainingengineinstance-loadbalancer-vpggh-0.trainingengineinstance7sgnx.hpecp-tenant-6-q84gp.svc.cluster.local', port=32700): Max retries exceeded with url: /train?auth=none (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7ff425be7a20>: Failed to establish a new connection: [Errno 111] Connection refused'))