In [None]:
# default_exp runner

In [None]:
#export
from datetime import datetime
from functools import lru_cache
import uuid
import os
from pathlib import Path
import tempfile
import yaml
import google.cloud.logging
from google.cloud.logging.handlers.handlers import CloudLoggingHandler, EXCLUDED_LOGGER_DEFAULTS

import logging

from blocks.filesystem import GCSFileSystem as gcsfs

In [None]:
#export
# Needed in the generated module: AIP.run and AIP.job_output refer to GCPConfig.
class GCPConfig:
    """Process-wide GCP settings, each resolved on first use and then cached.

    Values come from environment variables first and fall back to an
    interactive prompt, so set the variables before running non-interactively.
    """

    @staticmethod
    @lru_cache(1)
    def bucket():
        """Return the bucket path from $BUCKET, prompting on stdin if it is unset or empty."""
        return os.getenv("BUCKET") or input("Please enter the bucket path: ").strip()

    @staticmethod
    @lru_cache(1)
    def project_id():
        """Return the GCP project id.

        Resolution order: $PROJECT_ID, then the project reported by
        ``google.auth.default()`` (Application Default Credentials), then an
        interactive prompt.
        """
        from_env = os.getenv("PROJECT_ID")
        if from_env:
            return from_env
        # Imported explicitly rather than relying on `google.auth` having been
        # loaded as a side effect of `import google.cloud.logging`.
        import google.auth
        from google.auth.exceptions import DefaultCredentialsError
        try:
            _, detected = google.auth.default()
        except DefaultCredentialsError:
            # No credentials found: fall through to the prompt instead of crashing.
            detected = None
        return detected or input("Please enter the project id: ").strip()


In [None]:
def _setup_logging():
    job = os.environ.get("CLOUD_ML_JOB_ID", None)
    trial = os.environ.get("CLOUD_ML_TRIAL_ID", None)
    project = os.environ.get("GCP_PROJECT", None)
    if job and project:
        client = google.cloud.logging.Client(project = project)
        resource = Resource(type = "ml_job", labels = dict(job_id = job, project_id = project, task_name = "master-replica-o"))
        # grouping by trial in AIP logs
        labels = {"ml.googleapis.com/trial_id": trial} if trial is not None else None
        handler = CloudLoggingHandler(client, resource=resource,labels=labels)
        logger = logging.getLogger()
        logger.handlers = []
        logger.setLevel(logging.DEBUG)
        logger.addHandler(handler)
        for logger_name in EXCLUDED_LOGGER_DEFAULTS:
            logging.getLogger(logger_name).propagate = False
    else:
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)

In [None]:
#export
from googleapiclient import discovery
import warnings

class AIP:
    """Submit custom-container training jobs through the ``ml`` v1 API (AI Platform Training)."""

    @property
    def job_id(self):
        """Job id for this instance, generated on first access and reused afterwards.

        Format: ``ai_run_<YYYYmmdd_HHMMSS>_<hex(uuid.getnode())>``. Note that
        ``uuid.getnode()`` is constant per host, so two instances created on the
        same machine within the same second get the same id.
        """
        # Cached per instance. A class-level lru_cache(1) is shared by every
        # instance (evicting each other) and keeps the last `self` alive.
        cached = getattr(self, "_job_id", None)
        if cached is None:
            date_time = datetime.now().strftime("%Y%m%d_%H%M%S")
            node = hex(uuid.getnode())
            cached = self._job_id = f"ai_run_{date_time}_{node}"
        return cached

    @property
    def job_output(self):
        """Output bucket path; GCPConfig.bucket() already caches its result."""
        return GCPConfig.bucket()

    def run(self, image: str, machine_type: str = "n1-highmem-32", args=None, **overrides):
        """Create a CUSTOM-tier training job that runs ``image`` and return the API response.

        Args:
            image: container image URI used as ``masterConfig.imageUri``.
            machine_type: master machine type (``masterType``).
            args: command-line arguments for the container; ``None`` means no args.
                The sequence is copied, never mutated.
            **overrides: extra ``trainingInput`` fields, merged last, so they replace
                the defaults above (e.g. ``region``).

        Returns:
            The response of ``projects.jobs.create`` (``request.execute()``).

        Raises:
            Whatever ``request.execute()`` raises, re-raised after logging a warning.
        """
        # TODO: dealing with hyperparameters
        training_inputs = {
            "scaleTier": "CUSTOM",
            "masterType": machine_type,
            "args": list(args) if args is not None else [],
            "region": "us-central1",
            "masterConfig": {"imageUri": image},
        }
        training_inputs.update(overrides)

        job_spec = {"jobId": self.job_id, "trainingInput": training_inputs}
        parent = "projects/{}".format(GCPConfig.project_id())

        _setup_logging()
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            cloudml = discovery.build("ml", "v1", cache_discovery=False)
            request = cloudml.projects().jobs().create(body=job_spec, parent=parent)
            try:
                return request.execute()
            except Exception:
                # `except e:` used to raise NameError here and hide the real API error.
                logging.warning("You may want to check whether the image is in registery")
                raise

In [None]:
import os
import yaml

# Local credentials file: `google_credentials` is exported as
# GOOGLE_APPLICATION_CREDENTIALS and `image_url` is the image to run.
# safe_load instead of FullLoader: FullLoader had arbitrary-code-execution CVEs
# in older PyYAML releases (assumes the file holds plain YAML with no python/* tags).
with open(r'../credentials.yaml') as file:
    creds = yaml.safe_load(file)

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds["google_credentials"]
image_url = creds["image_url"]

# Submits a real training job; the API response is displayed below.
AIP().run(image_url)

{'jobId': 'ai_run_20200629_184259_0x8c8590a5b94c',
 'trainingInput': {'scaleTier': 'CUSTOM',
  'masterType': 'n1-highmem-32',
  'region': 'us-central1',
  'masterConfig': {'imageUri': 'gcr.io/wmt-customer-tech-case-sci-dev/yoda'}},
 'createTime': '2020-06-30T01:43:02Z',
 'state': 'QUEUED',
 'trainingOutput': {},
 'etag': '67/mbu2qEHk='}

In [None]:
#export
def _must_exist(key, dict_):
    assert key in dict_, "%r should be in the dictionary" % key

def _validate_config(conf_dict):
    """Check that a yoda config carries the keys needed to submit a job.

    Requires top-level "image" and "data", plus "output_path" inside "data".
    A missing key surfaces as the AssertionError raised by _must_exist.
    """
    for required in ("image", "data"):
        _must_exist(required, conf_dict)
    _must_exist("output_path", conf_dict["data"])

def run_yoda_on_gcp(conf_dict):
    """Upload a yoda config to GCS and submit an AI Platform job that runs it.

    The config is written as YAML to ``<data.output_path>/config.yaml``. The job
    runs ``conf_dict["image"]`` with the arguments ``yoda run <that path>``.

    Args:
        conf_dict: parsed config; must contain "image" and "data" -> "output_path".

    Returns:
        The job-creation response returned by ``AIP.run``.

    Raises:
        AssertionError: if a required key is missing (see _validate_config).
    """
    # gs:// URLs always use "/", whatever the host OS; os.path.join would use "\\" on Windows.
    import posixpath

    _validate_config(conf_dict)

    # upload config to gcp
    gcs_config_path = posixpath.join(conf_dict["data"]["output_path"], "config.yaml")
    with gcsfs().open(gcs_config_path, "w") as f:
        yaml.safe_dump(conf_dict, f)

    # Build argv directly instead of str.split(), so a path containing spaces stays one argument.
    args = ["yoda", "run", gcs_config_path]
    return AIP().run(conf_dict["image"], args=args)

In [None]:
class FormatTag(yaml.YAMLObject):
    """YAML ``!format`` tag: run ``str.format`` on the scalar when the document is loaded.

    Placeholders resolve against the process environment (e.g. ``{USER}``,
    ``{BUCKET}``) plus two computed keys: ``NOW`` (local time formatted as
    ``%Y%m%d_%H%M%S``) and ``EPOCH`` (integer seconds since the Unix epoch,
    computed from UTC). The tag is attached to ``yaml.SafeLoader``.
    """
    yaml_tag = u'!format'
    yaml_loader = yaml.SafeLoader

    @classmethod
    def from_yaml(cls, loader, node):
        """Return the node's scalar text formatted with env + NOW/EPOCH; unknown names raise KeyError."""
        import calendar
        import time

        # Computed keys are listed last, so they win over same-named environment variables.
        substitutions = {
            **os.environ,
            "NOW": time.strftime("%Y%m%d_%H%M%S"),
            "EPOCH": calendar.timegm(time.gmtime()),
        }
        template = loader.construct_scalar(node)
        return template.format(**substitutions)

In [None]:
config2 = '../data/configs/config2.yaml'
# Read the file once, inside `with`, so the handle is closed. The same text is
# shown and then parsed (the old `yaml.safe_load(open(...))` leaked a handle).
with open(config2) as f:
    config2_text = f.read()
print(config2_text)

# BUCKET must be set before parsing: the !format tags (FormatTag cell, registered
# on SafeLoader) expand {BUCKET} at load time.
os.environ["BUCKET"] = "testjobsubmit"
conf_dict2 = yaml.safe_load(config2_text)

image: "gcr.io/wmt-customer-tech-case-sci-dev/yoda:v2"
data: 
  input_path: !format "gs://{BUCKET}/{USER}/test/iris_data.csv"
  eval_path: !format "gs://{BUCKET}/{USER}/test/iris_data.csv"
  output_path: !format "gs://{BUCKET}/{USER}/test/output/"
  features: "sepal_length,sepal_width,petal_length"
  label: species
train:
  estimator: xgboost.XGBClassifier
  params:
    max_depth: 4
    num_estimator: 50
eval:
  metrics: "accuracy,f1_macro"


In [None]:
# Parsed config: the !format values have already been expanded from the environment (e.g. {BUCKET}, {USER}).
conf_dict2

{'image': 'gcr.io/wmt-customer-tech-case-sci-dev/yoda:v2',
 'data': {'input_path': 'gs://testjobsubmit/j0l04cl/test/iris_data.csv',
  'eval_path': 'gs://testjobsubmit/j0l04cl/test/iris_data.csv',
  'output_path': 'gs://testjobsubmit/j0l04cl/test/output/',
  'features': 'sepal_length,sepal_width,petal_length',
  'label': 'species'},
 'train': {'estimator': 'xgboost.XGBClassifier',
  'params': {'max_depth': 4, 'num_estimator': 50}},
 'eval': {'metrics': 'accuracy,f1_macro'}}

In [None]:
# Point the run at the :test image tag instead of :v2 (updates conf_dict2 in place).
conf_dict2.update(image='gcr.io/wmt-customer-tech-case-sci-dev/yoda:test')

In [None]:
# Confirm the image override; only the "image" key was changed in the previous cell.
conf_dict2

{'image': 'gcr.io/wmt-customer-tech-case-sci-dev/yoda:test',
 'data': {'input_path': 'gs://testjobsubmit/j0l04cl/test/iris_data.csv',
  'eval_path': 'gs://testjobsubmit/j0l04cl/test/iris_data.csv',
  'output_path': 'gs://testjobsubmit/j0l04cl/test/output/',
  'features': 'sepal_length,sepal_width,petal_length',
  'label': 'species'},
 'train': {'estimator': 'xgboost.XGBClassifier',
  'params': {'max_depth': 4, 'num_estimator': 50}},
 'eval': {'metrics': 'accuracy,f1_macro'}}

In [None]:
# Writes the config to <data.output_path>/config.yaml on GCS, then submits the training job.
run_yoda_on_gcp(conf_dict2)

INFO:root:Copying /var/folders/2k/b58ly_192yjgtv76zjxqj6f8_9cn2g/T/tmpnlk15dqr to gs://testjobsubmit/j0l04cl/test/output/config.yaml...
DEBUG:googleapiclient.discovery:URL being requested: GET https://www.googleapis.com/discovery/v1/apis/ml/v1/rest
DEBUG:googleapiclient.discovery:URL being requested: POST https://ml.googleapis.com/v1/projects/wmt-customer-tech-case-sci-dev/jobs?alt=json
DEBUG:google_auth_httplib2:Making request: POST https://oauth2.googleapis.com/token
