94 changes: 74 additions & 20 deletions dataikuapi/dss/project.py
@@ -26,7 +26,7 @@ class DSSProject(object):
"""
A handle to interact with a project on the DSS instance.

Do not create this class directly, instead use :meth:`dataikuapi.DSSClient.get_project``
Do not create this class directly, instead use :meth:`dataikuapi.DSSClient.get_project`
"""
def __init__(self, client, project_key):
self.client = client
@@ -69,7 +69,7 @@ def rec(pf):
def move_to_folder(self, folder):
"""
Moves this project to a project folder
:param folder :class:`dataikuapi.dss.projectfolder.DSSProjectFolder
:param folder: the destination :class:`dataikuapi.dss.projectfolder.DSSProjectFolder`
"""
current_folder = self.get_project_folder()
current_folder.move_project_to(self.project_key, folder)
@@ -823,14 +823,13 @@ def start_job(self, definition):
"""
Create a new job, and return a handle to interact with it

Args:
definition: the definition for the job to create. The definition must contain the type of job (RECURSIVE_BUILD,
NON_RECURSIVE_FORCED_BUILD, RECURSIVE_FORCED_BUILD, RECURSIVE_MISSING_ONLY_BUILD) and a list of outputs to build.
Optionally, a refreshHiveMetastore field can specify whether to re-synchronize the Hive metastore for recomputed
HDFS datasets.
:param dict definition: The definition should contain:

* the type of job (RECURSIVE_BUILD, NON_RECURSIVE_FORCED_BUILD, RECURSIVE_FORCED_BUILD, RECURSIVE_MISSING_ONLY_BUILD)
* a list of outputs to build, drawn from the available types: DATASET, MANAGED_FOLDER, SAVED_MODEL, STREAMING_ENDPOINT
* (Optional) a refreshHiveMetastore field (True or False) to specify whether to re-synchronize the Hive metastore for recomputed HDFS datasets.

Returns:
A :class:`dataikuapi.dss.job.DSSJob` job handle
:returns: A :class:`dataikuapi.dss.job.DSSJob` job handle
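
A minimal usage sketch (``mydataset`` is a hypothetical dataset name used for illustration):

.. code-block:: python

    job = project.start_job({
        "type": "NON_RECURSIVE_FORCED_BUILD",
        "outputs": [{"type": "DATASET", "id": "mydataset",
                     "projectKey": project.project_key}]
    })
    # 'job' is a dataikuapi.dss.job.DSSJob handle that can be used to track the run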
"""
job_def = self.client._perform_json("POST", "/projects/%s/jobs/" % self.project_key, body = definition)
return DSSJob(self.client, self.project_key, job_def['id'])
@@ -839,11 +838,11 @@ def start_job_and_wait(self, definition, no_fail=False):
"""
Starts a new job and waits for it to complete.

Args:
definition: the definition for the job to create. The definition must contain the type of job (RECURSIVE_BUILD,
NON_RECURSIVE_FORCED_BUILD, RECURSIVE_FORCED_BUILD, RECURSIVE_MISSING_ONLY_BUILD) and a list of outputs to build.
Optionally, a refreshHiveMetastore field can specify whether to re-synchronize the Hive metastore for recomputed
HDFS datasets.
:param dict definition: The definition should contain:

* the type of job (RECURSIVE_BUILD, NON_RECURSIVE_FORCED_BUILD, RECURSIVE_FORCED_BUILD, RECURSIVE_MISSING_ONLY_BUILD)
* a list of outputs to build, drawn from the available types: DATASET, MANAGED_FOLDER, SAVED_MODEL, STREAMING_ENDPOINT
* (Optional) a refreshHiveMetastore field (True or False) to specify whether to re-synchronize the Hive metastore for recomputed HDFS datasets.
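
The definition has the same shape as for :meth:`start_job`; a short sketch, again with a hypothetical dataset name:

.. code-block:: python

    project.start_job_and_wait({
        "type": "RECURSIVE_BUILD",
        "outputs": [{"type": "DATASET", "id": "mydataset",
                     "projectKey": project.project_key}]
    })
    # returns only once the job has finished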
"""
job_def = self.client._perform_json("POST", "/projects/%s/jobs/" % self.project_key, body = definition)
job = DSSJob(self.client, self.project_key, job_def['id'])
@@ -883,8 +882,8 @@ def list_jupyter_notebooks(self, active=False, as_type="object"):
:param str as_type: How to return the list. Supported values are "listitems" and "objects".
:param bool active: if True, only return currently running jupyter notebooks.

:returns: The list of the notebooks. If "as_type" is "listitems", each one as a :class:`dataikuapi.dss.notebook.DSSJupyterNotebookListItem`,
if "as_type" is "objects", each one as a :class:`dataikuapi.dss.notebook.DSSJupyterNotebook`
:returns: The list of the notebooks. If "as_type" is "listitems", each one as a :class:`dataikuapi.dss.notebook.DSSJupyterNotebookListItem`; if "as_type" is "objects", each one as a :class:`dataikuapi.dss.notebook.DSSJupyterNotebook`

:rtype: list of :class:`dataikuapi.dss.notebook.DSSJupyterNotebook` or list of :class:`dataikuapi.dss.notebook.DSSJupyterNotebookListItem`
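
A small sketch of both return modes (``project`` is a :class:`DSSProject` handle):

.. code-block:: python

    items = project.list_jupyter_notebooks(as_type="listitems")
    # each element is a DSSJupyterNotebookListItem

    running = project.list_jupyter_notebooks(active=True, as_type="objects")
    # each element is a DSSJupyterNotebook handle, restricted to running notebooks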
"""
notebook_items = self.client._perform_json("GET", "/projects/%s/jupyter-notebooks/" % self.project_key, params={"active": active})
@@ -968,7 +967,7 @@ def set_variables(self, obj):
WARNING: if executed from a Python recipe, the changes made by `set_variables` will not be "seen" in that recipe.
Use the internal API dataiku.get_custom_variables() instead if this behavior is needed.

@param dict obj: must be a modified version of the object returned by get_variables
:param dict obj: must be a modified version of the object returned by get_variables
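
A minimal sketch of the read-modify-write cycle (``my_variable`` is an arbitrary variable name used for illustration):

.. code-block:: python

    variables = project.get_variables()
    variables["standard"]["my_variable"] = "some value"
    project.set_variables(variables)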
"""
if not "standard" in obj:
raise ValueError("Missing 'standard' key in argument")
@@ -1035,17 +1034,39 @@ def get_api_service(self, service_id):
########################################################

def list_exported_bundles(self):
"""
:returns: A dictionary of all bundles for a project on the Design node.
"""
return self.client._perform_json("GET",
"/projects/%s/bundles/exported" % self.project_key)

def export_bundle(self, bundle_id):
"""
Creates a new project bundle on the Design node

:param str bundle_id: the identifier (tag) of the bundle to create
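
A short sketch, where ``v1`` is an arbitrary bundle id:

.. code-block:: python

    project.export_bundle("v1")
    bundles = project.list_exported_bundles()  # the new bundle should now appear here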
"""
return self.client._perform_json("PUT",
"/projects/%s/bundles/exported/%s" % (self.project_key, bundle_id))

def get_exported_bundle_archive_stream(self, bundle_id):
"""
Download a bundle archive that can be deployed in a DSS Automation node, as a binary stream.
Warning: this stream will monopolize the DSSClient until closed.

.. warning::

The stream must be closed after use. Use a ``with`` statement so that the stream is closed automatically at the end of the block. For example:

.. code-block:: python

    with project.get_exported_bundle_archive_stream('v1') as fp:
        # use fp

    # or explicitly close the stream after use
    fp = project.get_exported_bundle_archive_stream('v1')
    # use fp, then close
    fp.close()

"""
return self.client._perform_raw("GET",
"/projects/%s/bundles/exported/%s/archive" % (self.project_key, bundle_id))
@@ -1054,7 +1075,7 @@ def download_exported_bundle_archive_to_file(self, bundle_id, path):
"""
Download a bundle archive that can be deployed in a DSS Automation node into the given output file.

:param path if "-", will write to /dev/stdout
:param str path: the destination file path; if "-", will write to /dev/stdout
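
A minimal sketch (the bundle id and destination path are illustrative):

.. code-block:: python

    project.download_exported_bundle_archive_to_file("v1", "/tmp/my-bundle-v1.zip")
    # or stream the archive to stdout
    project.download_exported_bundle_archive_to_file("v1", "-")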
"""
if path == "-":
path= "/dev/stdout"
@@ -1090,15 +1111,34 @@ def publish_bundle(self, bundle_id, published_project_key=None):
########################################################

def list_imported_bundles(self):
"""
:returns: a dict containing bundle imports for a project, on the Automation node.
"""
return self.client._perform_json("GET",
"/projects/%s/bundles/imported" % self.project_key)

def import_bundle_from_archive(self, archive_path):
"""
Imports a bundle from a zip archive path on the Automation node.

:param str archive_path: A full path to a zip archive, for example `/home/dataiku/my-bundle-v1.zip`
"""
return self.client._perform_json("POST",
"/projects/%s/bundles/imported/actions/importFromArchive" % (self.project_key),
params = { "archivePath" : osp.abspath(archive_path) })

def import_bundle_from_stream(self, fp):
"""
Imports a bundle from a file stream, on the Automation node.

:param file-like fp: a file-like object open for reading, typically in binary mode. Usage example:

.. code-block:: python

    project = client.get_project('MY_PROJECT')
    with open('/home/dataiku/my-bundle-v1.zip', 'rb') as f:
        project.import_bundle_from_stream(f)
"""
files = {'file': fp}
return self.client._perform_empty("POST",
"/projects/%s/bundles/imported/actions/importFromStream" % (self.project_key),
@@ -1120,6 +1160,11 @@ def activate_bundle(self, bundle_id, scenarios_to_enable=None):
"/projects/%s/bundles/imported/%s/actions/activate" % (self.project_key, bundle_id), body=options)

def preload_bundle(self, bundle_id):
"""
Preloads a bundle that has been imported on the Automation node.

:param str bundle_id: the bundle_id for an existing imported bundle
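
A sketch of a typical import flow on the Automation node (the bundle id and archive path are illustrative, and preloading may not be needed in every setup):

.. code-block:: python

    project.import_bundle_from_archive("/home/dataiku/my-bundle-v1.zip")
    project.preload_bundle("v1")    # prepare the imported bundle
    project.activate_bundle("v1")   # make it the active version of the project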
"""
return self.client._perform_json("POST",
"/projects/%s/bundles/imported/%s/actions/preload" % (self.project_key, bundle_id))

@@ -1200,8 +1245,9 @@ def list_recipes(self, as_type="listitems"):
def get_recipe(self, recipe_name):
"""
Gets a :class:`dataikuapi.dss.recipe.DSSRecipe` handle to interact with a recipe

:param str recipe_name: The name of the recipe
:rtype :class:`dataikuapi.dss.recipe.DSSRecipe`
:rtype: :class:`dataikuapi.dss.recipe.DSSRecipe`
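
A one-line sketch (the recipe name is hypothetical):

.. code-block:: python

    recipe = project.get_recipe("compute_mydataset")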
"""
return DSSRecipe(self.client, self.project_key, recipe_name)

@@ -1295,6 +1341,9 @@ def new_recipe(self, type, name=None):
########################################################

def get_flow(self):
"""
:rtype: :class:`dataikuapi.dss.flow.DSSProjectFlow`
"""
return DSSProjectFlow(self.client, self)

########################################################
@@ -1681,6 +1730,11 @@ def with_refresh_metastore(self, refresh_metastore):
def with_output(self, name, object_type=None, object_project_key=None, partition=None):
"""
Adds an item to build in this job

:param name: name of the output object
:param object_type: type of object to build, one of: DATASET, MANAGED_FOLDER, SAVED_MODEL, STREAMING_ENDPOINT
:param object_project_key: PROJECT_KEY for the project that contains the object to build
:param partition: specification of the partition to build, for partitioned objects
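
A sketch of the builder usage. It assumes the builder is obtained from the project handle (``new_job`` in recent API versions) and that ``mydataset`` exists:

.. code-block:: python

    builder = project.new_job("NON_RECURSIVE_FORCED_BUILD")   # assumed helper returning this builder
    builder.with_output("mydataset", object_type="DATASET")
    job = builder.start()                                      # assumed to submit the built job definition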
"""
self.definition['outputs'].append({'type':object_type, 'id':name, 'projectKey':object_project_key, 'partition':partition})
return self