Skip to content

Commit

Permalink
feat: add workflows PC-398 (#341)
Browse files Browse the repository at this point in the history
* chore: add workflow models and serializers

* feat(workflows): add list and runs command

* feat(workflows): workflows and workflow runs get and list command

* feat(workflows): create, run commands

* feat(workflows): create run and logs command

* fix: update workflowId to id

* using --show-runs flag

* fix logs command
  • Loading branch information
reynld committed Jan 9, 2021
1 parent 91cb0f7 commit 35d615b
Show file tree
Hide file tree
Showing 12 changed files with 703 additions and 0 deletions.
1 change: 1 addition & 0 deletions gradient/api_sdk/clients/__init__.py
Expand Up @@ -15,3 +15,4 @@
from .storage_provider_client import StorageProvidersClient
from .sdk_client import SdkClient
from .tensorboards_client import TensorboardClient
from .workflow_client import WorkflowsClient
2 changes: 2 additions & 0 deletions gradient/api_sdk/clients/sdk_client.py
@@ -1,6 +1,7 @@
from . import DeploymentsClient, ExperimentsClient, HyperparameterJobsClient, ModelsClient, ProjectsClient, \
MachinesClient, NotebooksClient, SecretsClient
from .job_client import JobsClient
from .workflow_client import WorkflowsClient
from .. import logger as sdk_logger


Expand All @@ -19,3 +20,4 @@ def __init__(self, api_key, logger=sdk_logger.MuteLogger()):
self.machines = MachinesClient(api_key=api_key, logger=logger)
self.notebooks = NotebooksClient(api_key=api_key, logger=logger)
self.secrets = SecretsClient(api_key=api_key, logger=logger)
self.workflows = WorkflowsClient(api_key=api_key, logger=logger)
136 changes: 136 additions & 0 deletions gradient/api_sdk/clients/workflow_client.py
@@ -0,0 +1,136 @@
from .base_client import BaseClient
from .. import models, repositories
from ...exceptions import ReceivingDataFailedError


class WorkflowsClient(BaseClient):

def create(self, name, project_id):
"""Create workflow with spec
:param str name: workflow name
:param str project_id: project id
:returns: workflow create response
:rtype: list[models.Workflow]
"""

repository = self.build_repository(repositories.CreateWorkflow)
workflow = repository.create(name=name, project_id=project_id)
return workflow

def run_workflow(self, spec, inputs, workflow_id, cluster_id):
"""Create workflow with spec
:param obj spec: workflow spec
:param obj inputs: workflow inputs
:param str workflow_id: workflow id
:param str cluster_id: cluster id
:returns: workflow create response
:rtype: list[models.Workflow]
"""

repository = self.build_repository(repositories.CreateWorkflowRun)
workflow = repository.create(spec=spec, inputs=inputs, id=workflow_id, cluster_id=cluster_id)
return workflow

def list(self, project_id):
"""List workflows by project
:param str project_id: project ID
:returns: list of workflows
:rtype: list[models.Workflow]
"""

repository = self.build_repository(repositories.ListWorkflows)
workflows = repository.list(project_id=project_id)
return workflows

def get(self, workflow_id):
"""Get a Workflow
:param str workflow_id: Workflow ID [required]
:returns: workflow
:rtype: models.Workflow
"""
repository = self.build_repository(repositories.GetWorkflow)
return repository.get(id=workflow_id)


def list_runs(self, workflow_id):
"""List workflows runs by workflow id
:param str workflow_id: workflow ID
:returns: list of workflow runs
"""

repository = self.build_repository(repositories.ListWorkflowRuns)
workflows_runs = repository.get(id=workflow_id)
return workflows_runs

def get_run(self, workflow_id, run):
"""List workflows runs by workflow id
:param str workflow_id: workflow ID
:param str run: run count
:returns: list of workflow runs
"""

repository = self.build_repository(repositories.GetWorkflowRun)
workflows_runs = repository.get(id=workflow_id, run=run)
return workflows_runs

def yield_logs(self, job_id, line=1, limit=10000):
"""Get log generator. Polls the API for new logs
.. code-block:: python
:linenos:
:emphasize-lines: 2
job_logs_generator = job_client.yield_logs(
job_id='Your_job_id_here',
line=100,
limit=100
)
:param str job_id:
:param int line: line number at which logs starts to display on screen
:param int limit: maximum lines displayed on screen, default set to 10 000
:returns: generator yielding LogRow instances
:rtype: Iterator[models.LogRow]
"""

repository = self.build_repository(repositories.ListWorkflowLogs)
logs = repository.yield_logs(id=job_id, line=line, limit=limit)
return logs

def logs(self, job_id, line=1, limit=10000):
"""Get log generator. Polls the API for new logs
.. code-block:: python
:linenos:
:emphasize-lines: 2
job_logs_generator = job_client.yield_logs(
job_id='Your_job_id_here',
line=100,
limit=100
)
:param str job_id:
:param int line: line number at which logs starts to display on screen
:param int limit: maximum lines displayed on screen, default set to 10 000
:returns: generator yielding LogRow instances
:rtype: Iterator[models.LogRow]
"""

repository = self.build_repository(repositories.ListWorkflowLogs)
logs = repository.list(id=job_id, line=line, limit=limit)
return logs
1 change: 1 addition & 0 deletions gradient/api_sdk/models/__init__.py
Expand Up @@ -20,3 +20,4 @@
from .tag import Tag
from .tensorboard import Instance, Tensorboard
from .vm_type import VmType, VmTypeGpuModel
from .workflows import Workflow, WorkflowRun, WorkflowSpec
41 changes: 41 additions & 0 deletions gradient/api_sdk/models/workflows.py
@@ -0,0 +1,41 @@
import datetime

import attr


@attr.s
class Workflow(object):
id = attr.ib(type=str, default=None)
team_id = attr.ib(type=int, default=None)
project_id = attr.ib(type=int, default=None)
name = attr.ib(type=str, default=None)
workflow_spec_id = attr.ib(type=str, default=None)
dt_deleted = attr.ib(type=datetime.datetime, default=None)
dt_created = attr.ib(type=datetime.datetime, default=None)
dt_modified = attr.ib(type=datetime.datetime, default=None)

@attr.s
class WorkflowSpec(object):
id = attr.ib(type=str, default=None)
data = attr.ib(type=str, default=None)
hash_sha256 = attr.ib(type=str, default=None)
dt_created = attr.ib(type=datetime.datetime, default=None)

@attr.s
class WorkflowRun(object):
id = attr.ib(type=str, default=None)
team_id = attr.ib(type=int, default=None)
workflow_id = attr.ib(type=str, default=None)
cluster_id = attr.ib(type=int, default=None)
user_id = attr.ib(type=int, default=None)
workflow_spec_id = attr.ib(type=str, default=None)
seq_num = attr.ib(type=int, default=None)
timeout = attr.ib(type=int, default=None)
workflow_phase_id = attr.ib(type=int, default=None)
name = attr.ib(type=str, default=None)
message = attr.ib(type=str, default=None)
dt_status = attr.ib(type=datetime.datetime, default=None)
dt_started = attr.ib(type=datetime.datetime, default=None)
dt_finished = attr.ib(type=datetime.datetime, default=None)
dt_created = attr.ib(type=datetime.datetime, default=None)
dt_modified = attr.ib(type=datetime.datetime, default=None)
1 change: 1 addition & 0 deletions gradient/api_sdk/repositories/__init__.py
Expand Up @@ -23,3 +23,4 @@
from .storage_providers import ListStorageProviders, CreateStorageProvider, DeleteStorageProvider, \
GetStorageProvider, UpdateStorageProvider
from .tensorboards import CreateTensorboard, GetTensorboard, ListTensorboards, UpdateTensorboard, DeleteTensorboard
from .workflows import ListWorkflows, GetWorkflow, ListWorkflowRuns, GetWorkflowRun, CreateWorkflow, CreateWorkflowRun, ListWorkflowLogs
148 changes: 148 additions & 0 deletions gradient/api_sdk/repositories/workflows.py
@@ -0,0 +1,148 @@
from .common import BaseRepository, ListResources, GetResource, ListLogs
from .. import config, serializers
from ..clients import http_client
import json

class WorkflowsMixin(object):
SERIALIZER_CLS = serializers.WorkflowSchema

@staticmethod
def _get_api_url(**kwargs):
return config.config.CONFIG_HOST


class ListWorkflows(WorkflowsMixin, ListResources):
def get_request_url(self, **kwargs):
project_id = kwargs.get("project_id")
if project_id is not None:
return "/workflows?filter[where][projectId]={}".format(project_id)
return "/workflows"

def _get_instances(self, response, **kwargs):
if not response.data:
return []

objects = self._parse_objects(response.data, **kwargs)
return objects

class GetWorkflow(WorkflowsMixin, BaseRepository):
def get_request_url(self, **kwargs):
return "/workflows/{}".format(kwargs.get("id"))

def get(self, **kwargs):
json_ = self._get_request_json(kwargs)
params = self._get_request_params(kwargs)
url = self.get_request_url(**kwargs)
client = self._get_client(**kwargs)
response = self._send_request(client, url, json=json_, params=params)
gradient_response = http_client.GradientResponse.interpret_response(response)

if not gradient_response.data:
return {}

return gradient_response.data


class WorkflowRunsMixin(object):
@staticmethod
def _get_api_url(**kwargs):
return config.config.CONFIG_HOST

class ListWorkflowRuns(WorkflowRunsMixin, BaseRepository):
@staticmethod
def _get_api_url(**kwargs):
return config.config.CONFIG_HOST


def get_request_url(self, **kwargs):
return "/workflows/{}/runs".format(kwargs.get("id"))

def get(self, **kwargs):
json_ = self._get_request_json(kwargs)
params = self._get_request_params(kwargs)
url = self.get_request_url(**kwargs)
client = self._get_client(**kwargs)
response = self._send_request(client, url, json=json_, params=params)
gradient_response = http_client.GradientResponse.interpret_response(response)

if not gradient_response.data:
return []

return gradient_response.data

class GetWorkflowRun(WorkflowRunsMixin, BaseRepository):
def get_request_url(self, **kwargs):
return "/workflows/{}/runs/{}".format(kwargs.get("id"), kwargs.get("run"))

def get(self, **kwargs):
json_ = self._get_request_json(kwargs)
params = self._get_request_params(kwargs)
url = self.get_request_url(**kwargs)
client = self._get_client(**kwargs)
response = self._send_request(client, url, json=json_, params=params)
gradient_response = http_client.GradientResponse.interpret_response(response)

if not gradient_response.data:
return {}

return gradient_response.data


class CreateWorkflow(WorkflowsMixin, BaseRepository):
def get_request_url(self, **kwargs):
return "/workflows"

def _get_request_json(self, kwargs):
return {"name": kwargs.get("name"), "projectId": kwargs.get("project_id")}

def _send_request(self, client, url, json=None, params=None):
response = client.post(url, json=json, params=params)
return response

def create(self, **kwargs):
response = self._get(**kwargs)
self._validate_response(response)

if not response.data:
return {}

return response.data

class CreateWorkflowRun(WorkflowsMixin, BaseRepository):
def get_request_url(self, **kwargs):
return "/workflows/{}/runs".format(kwargs.get("id"))

def _get_request_json(self, **kwargs):
if kwargs.get("inputs") is not None:
return {"spec": kwargs.get("spec"), "clusterId": kwargs.get("cluster_id"), "run": True, "markDefault": False, "inputs": kwargs.get("inputs") }

return {"spec": kwargs.get("spec"), "clusterId": kwargs.get("cluster_id"), "run": True, "markDefault": False }

def _send_create_request(self, **kwargs):
url = self.get_request_url(**kwargs)
client = self._get_client(**kwargs)
json_ = self._get_request_json(**kwargs)

response = client.post(url, json=json_)
gradient_response = http_client.GradientResponse.interpret_response(response)

json_formatted_str = json.dumps(gradient_response.data, indent=4)
return gradient_response

def create(self, **kwargs):
response = self._send_create_request(**kwargs)
self._validate_response(response)

if not response.data:
return {}

return response.data

class ListWorkflowLogs(ListLogs):
def _get_request_params(self, kwargs):
params = {
"jobId": kwargs["id"],
"line": kwargs["line"],
"limit": kwargs["limit"],
}
return params
1 change: 1 addition & 0 deletions gradient/api_sdk/serializers/__init__.py
Expand Up @@ -17,3 +17,4 @@
from .tag import TagSchema
from .tensorboard import InstanceSchema, TensorboardSchema, TensorboardDetailSchema
from .vm_type import VmTypeSchema, VmTypeGpuModelSchema
from .workflows import WorkflowSchema, WorkflowRunSchema, WorkflowSpecSchema

0 comments on commit 35d615b

Please sign in to comment.