From 59032d7ded183b361c7745a36427c40ae5b608a3 Mon Sep 17 00:00:00 2001
From: David Justo
Date: Thu, 18 Jun 2020 11:34:28 -0700
Subject: [PATCH] Promote to master (#130)

* task_utility docstrings
* ActivityType docstrings: contains refactoring to conform to Python naming conventions
* Docstrings for Actions: CallActivity and CallActivityWithRetry
* Docstrings for History: includes a refactor for enum naming conventions in Python
* Final docstring bits: docstrings for tasks and json utilities
* add waitforexternalevent basic files
* fix bugs to make waitForExternalEvent work
* add waitforexternalevent samples
* remove explicit binding_info import: defined in conftest as a fixture, so the explicit import is not required
* demo sample for function chaining with docstring and json changes
* demo sample for function chaining with docstring and json changes
* added dev pipeline status in README
* minor fixes (variable name, delete comment)
* flake8 fixes
* add docstrings
* implement task_any function
* change pip install library and add docstrings to samples
* change pip install library and add docstrings to samples
* unittest for waitforexternalevents
* fix bugs after merging dev
* fix flake8
* Base implementation of tests: test initial call, test get activity, and build of task set
* parrot values success
* test full complete flow
* test failed scenario: add missing bits in task_all to account for a failed task
* docstrings to numpy format
* minor changes (rename, remove logging)
* unittest for task_any, added tasks_test_utils
* add class __eq__ function for WaitForExternalEvent actions
* add samples readme doc
* fix flake8
* Refactoring HistoryEvent: get rid of all of the hardcoded property value references
* add docstrings for HistoryEvent class
* Refactor json conversion: refactor the classes that parse json strings to remove all of the hardcoded property names; also allows the classes to dangle additional attributes that may be present but are not explicitly used for construction
* simple fan-out/fan-in sample
* Fix flake8 errors
* Remove local debugging bits
* remove state in task_any
* handle faulted task_any case + unittest
* Undo De Morgan's Law because it's really hard to read
* replace filters with list comprehensions: more concise/readable
* Add documentation for tracking API implementation
* move datetime format string to azure package
* replace filter with list comprehension: more concise
* remove extra timezone from format that was causing parsing warnings
* Push context initialization out of the handle method: have this dependency injected instead of built within
* able to pass in tasksets to task_any and task_all
* update unittest for adding timestamp to taskset; add unittest for passing taskset to task_any as args
* fix bugs in task_all (when all tasks fail), and fix unittest for that scenario
* fix flake8
* test from orchestrator level (draft)
* Remove IFunctionContext abstraction: an unneeded layer of abstraction in a duck-typed language like Python
* Starting schema validation bits
* createCheckStatusResponse()
* wire up schema validation into the orchestrator tests
* Test commit
* fix flake8 issues
* fix pytest, remove task_any_tests from orchestrator level
* fix flake8
* implement raise_event api, fix docstring
* add unittest, create separate methods, fix naming style, handle placeholder in url validation
* Fan-out/fan-in sample: uses TensorFlow to predict whether images from Bing are dogs or cats
* fix flake8
* add aiohttp to requirements.txt and setup.py
* add async/await to start_new and raise_event api, flake8
* update api_implementation_status
* fix variable naming style in docstring
* update sample for external events; update readme in samples/external_events
* Refactoring and docstrings: removed the get-generation-data function. With the configurability, we can increase the volume of images to simulate the high-CPU scenario; no need to fake it now. Refactored some of the larger, multi-task functions for better readability and added docstrings to aid in understanding what is going on without the visual aid of PowerPoint
* Continue-as-new implementation
* new_guid implementation
* Fix flake8 issues
* update sample to simple version
* add func.HttpResponse in DurableOrchestrationClient.py
* update docstring for create_check_status_response api
* fix flake8
* update sample for create_check_status_response fixes
* update pytest for the changes in the create_check_status_response api; add azure-functions to requirements.txt for pytest
* Implementation of call_http
* Remove traceback print statement
* remove url validator
* Updates from demo feedback: rename guid to uuid; attach additional context attributes to an attribute of the durable context, not directly to the durable context
* Fix flake8
* fix copy/paste error by giving a more descriptive orchestrator name
* remove abcd from sample
* Update API_IMPLEMENTATION_STATUS.md: Sprint 7 update
* Unit tests for call_http: found a couple of bugs with these
* merged shervyna changes
* Fix the squiggles: mostly formatting/inspection suppression
* Move IAction: moving and renaming IAction to Action
* Update actions to implement the Action base class
* remove the squiggles: some minor formatting updates
* Fix format of docstring
* update to use rpc_base_url for start-new and raise-event urls
* minor updates: docstring on new function; add Action to __init__ for the actions module
* flake8 bits
* Add action_type property to the base class
* update docstrings per convention
* docstring convention updates: made sure docstrings conformed to the same format and indentation
* Base class implementations for get_status
* refactor massive string into a Dict[str, Any] object: makes for a more readable bit of code
* Move duplicate test rpc url constants to a centralized location
* adding updated contributing and getting-started guides
* get_status implementation: includes refactoring of aiohttp calls so the rest of the bits can be tested without the server requirement; tests for GetStatusOptions included, plus a base test for get_status success
* test for DurableOrchestrationStatus parsing
* Add non-OK message tests
* add raises-exception test
* get_status_by and get_status_all implementation
* flake8 fixes
* fix docstring for get_status_by parameters (copy/paste error)
* adding nox to the project; updated requirements file
* Updated noxfile to include flake8; move getting-started to the investigations folder
* fix typo in the commit: JScript to Python
* PR recommendations: remove PyCharm # noinspection tags; move get and post async functions to the util module; replace the abc123 stand-in test value with a uuid stand-in
* add docstrings for new public functions
* Continued Client API implementation: purge_instance_history, purge_instance_history_by, terminate, wait_for_completion_or_create_check_status_response
* bug fix for the retry function
* refactor tests: remove the redundant function declarations
* unit tests for purge history and terminate
* unit tests for wait-or-create-check-response: includes bug fixes found from tests
* update implementation status with features included in this branch
* update name to match the name of the class under test
* change to use asyncio sleep: don't want a blocking sleep call here
* Use azure-functions>=1.2.0
* Make DurableOrchestrationContext available for import
* Update start_new type hints to return str
* Add get_input to DurableOrchestrationContext
* Allow passing OrchestrationContext to create; maintain backwards compatibility when a JSON body is passed
* Change str to Awaitable[str]
* Add get_input tests; get_input always returns str
* Fix flake8 errors
* add furl to setup.py
* Fix merge error
* # This is a combination of 3 commits. (#58)
  Fix samples to use the latest func.OrchestrationContext annotation.
  Use new syntax to replace the main endpoint.
  Fix old python_durable_bindings samples.
* Replace badge with the new Azure Functions Python pipeline
* edited contributor guide
* furl is required for this project: added it to the requirements doc
* fixed pytest-asyncio in requirements.txt
* fixed pytest-asyncio in requirements.txt
* fixed pytest-asyncio in requirements.txt
* Remove static method tag (#72): residual bits left over from when this was part of the client class, causing failures in the current state. Also removes an unnecessary wrapper around the http helper calls
* Remove furl from RpcManagementOptions: the only durable class requiring this package, and it only uses it to construct a url query string. Lightens the requirements list by just using string joining
* release artifacts
* Added DateUtil and Furl to install_requires
* changelog and setup.py changes
* Update azure-pipelines.yml for Azure Pipelines (8 iterations)
* tested version of pipeline yaml
* Update azure-pipelines.yml for Azure Pipelines (10 more iterations)
* setup.py CD testing
* setup.py CD testing
* PyPI testing
* added info logging to the external event sample
* Correctness and documentation of the samples (#85)
  * validated all samples except tensorflow
  * deleted non-samples, tested tensorflow sample, added types and comments
  * new readmes for fan-out-fan-in and function-chaining
  * renamed some functions so they are easier to reason about
  * disable aiohttp's input validation... potentially unsafe. ExternalEvents now shows no warnings!
* Fixed Contributor Guide Issues
* Fixed Contributor Guide Issues
* Input value not required for start_new
* input values not required for start_new
* Add limitations, link to quickstart, update samples
* Update example
* Add link
* Update words
* Add versions
* Update links
* versioning-via-tags is enabled (#101)
* updated readme for external events (#103)
* Readme file for FanOutFanIn (#104)
  * Readme file for FanOutFanIn (3 iterations)
* Add activity trigger return type sample (#105)
  * Add sample for activity trigger type checks
  * Remove extensions.csproj
  Co-authored-by: Priya Ananthasankar
* misc contributions (#108)
* Set Custom Status (#110)
  * custom status (8 iterations)
  * set custom status, rework comments
* Enabled nox to recognize docstring formatting errors (#122)
  * enables trivial orchestrators
  * added flake8-docstrings, fixed arg to flake8, fixed formatting on noxfile
  * removed accidental commit from other PR
* enables trivial orchestrators (#121)
* removed grpc folder (#120)
* [WIP] Semi-automatic type-serialization (#109)
* Promote to master (#99) (#126)
* Promote to master (#99) (#127)
* Monitoring Feature (#119)
* updated CHANGELOG (#128)
* Changelog includes links (#129)
  * updated CHANGELOG
  * improved changelog descriptions w/ issue links

Co-authored-by: Shawn Gaul
Co-authored-by: Shervyna Ruan
Co-authored-by: Priya Ananthasankar
Co-authored-by: asedighi
Co-authored-by: Anthony
Co-authored-by: Anthony Chu
Co-authored-by: Hanzhang Zeng (Roger) <48038149+Hazhzeng@users.noreply.github.com>
Co-authored-by: Hanzhang Zeng (Roger)
Co-authored-by: David Justo
---
 CHANGELOG.md                                       |  9 ++
 .../models/DurableOrchestrationContext.py          |  3 +-
 .../models/actions/CreateTimerAction.py            | 42 +++++++++
 .../models/actions/__init__.py                     |  4 +-
 azure/durable_functions/tasks/__init__.py          |  4 +-
 azure/durable_functions/tasks/create_timer.py      | 41 +++++++++
 .../durable_functions/tasks/task_utilities.py      | 24 ++++++
 azure/durable_functions/tasks/timer_task.py        | 47 ++++++++++
 samples/aml_monitoring/.funcignore                 |  5 ++
 samples/aml_monitoring/.gitignore                  | 44 ++++++++++
 .../aml_monitoring/.vscode/extensions.json         |  5 ++
 .../aml_durable_orchestrator/__init__.py           | 45 ++++++++++
 .../aml_durable_orchestrator/function.json         | 11 +++
 .../aml_monitoring/aml_pipeline/__init__.py        | 33 +++++++
 .../aml_monitoring/aml_pipeline/function.json      | 12 +++
 .../aml_poll_status/__init__.py                    | 50 +++++++++++
 .../aml_poll_status/function.json                  | 12 +++
 samples/aml_monitoring/extensions.csproj           | 11 +++
 samples/aml_monitoring/host.json                   |  3 +
 samples/aml_monitoring/proxies.json                |  4 +
 samples/aml_monitoring/requirements.txt            |  2 +
 samples/aml_monitoring/shared/__init__.py          |  0
 samples/aml_monitoring/shared/aml_helper.py        | 85 +++++++++++++++++++
 samples/aml_monitoring/shared/auth_helper.py       | 34 ++++++++
 24 files changed, 527 insertions(+), 3 deletions(-)
 create mode 100644 azure/durable_functions/models/actions/CreateTimerAction.py
 create mode 100644 azure/durable_functions/tasks/create_timer.py
 create mode 100644 azure/durable_functions/tasks/timer_task.py
 create mode 100644 samples/aml_monitoring/.funcignore
 create mode 100644 samples/aml_monitoring/.gitignore
 create mode 100644 samples/aml_monitoring/.vscode/extensions.json
 create mode 100644 samples/aml_monitoring/aml_durable_orchestrator/__init__.py
 create mode 100644 samples/aml_monitoring/aml_durable_orchestrator/function.json
 create mode 100644 samples/aml_monitoring/aml_pipeline/__init__.py
 create mode 100644 samples/aml_monitoring/aml_pipeline/function.json
 create mode 100644 samples/aml_monitoring/aml_poll_status/__init__.py
 create mode 100644 samples/aml_monitoring/aml_poll_status/function.json
 create mode 100644 samples/aml_monitoring/extensions.csproj
 create mode 100644 samples/aml_monitoring/host.json
 create mode 100644 samples/aml_monitoring/proxies.json
 create mode 100644 samples/aml_monitoring/requirements.txt
 create mode 100644 samples/aml_monitoring/shared/__init__.py
 create mode 100644 samples/aml_monitoring/shared/aml_helper.py
 create mode 100644 samples/aml_monitoring/shared/auth_helper.py

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2462d409..2144672a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,15 @@
 
 All notable changes to this project will be documented in this file.
 
+## 1.0.0b6
+
+- [Create timer](https://github.com/Azure/azure-functions-durable-python/issues/35) functionality available
+
+## 1.0.0b5
+
+- [Object serialization](https://github.com/Azure/azure-functions-durable-python/issues/90) made available
+- [Can set custom status](https://github.com/Azure/azure-functions-durable-python/issues/117) of orchestration
+
 ## 1.0.0b3-b4
 
 - Release to test CD pipeline with push to PyPI
diff --git a/azure/durable_functions/models/DurableOrchestrationContext.py b/azure/durable_functions/models/DurableOrchestrationContext.py
index 9d748316..c975ba88 100644
--- a/azure/durable_functions/models/DurableOrchestrationContext.py
+++ b/azure/durable_functions/models/DurableOrchestrationContext.py
@@ -9,7 +9,7 @@
 from ..models.Task import Task
 from ..models.TokenSource import TokenSource
 from ..tasks import call_activity_task, task_all, task_any, call_activity_with_retry_task, \
-    wait_for_external_event_task, continue_as_new, new_uuid, call_http
+    wait_for_external_event_task, continue_as_new, new_uuid, call_http, create_timer_task
 from azure.functions._durable_functions import _deserialize_custom_object
@@ -51,6 +51,7 @@ def __init__(self,
         self.continue_as_new = lambda i: continue_as_new(input_=i)
         self.task_any = lambda t: task_any(tasks=t)
         self.task_all = lambda t: task_all(tasks=t)
+        self.create_timer = lambda d: create_timer_task(state=self.histories, fire_at=d)
         self.decision_started_event: HistoryEvent = \
             [e_ for e_ in self.histories
              if e_.event_type == HistoryEventType.ORCHESTRATOR_STARTED][0]
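For orientation (illustration, not part of the diff): with the wiring above, an orchestrator can now yield a durable timer from the context. A minimal sketch follows; the activity name `do_work` is hypothetical, while `call_activity`, `current_utc_datetime`, and the new `create_timer` are the members this context exposes.

```python
# Sketch only: a delay between activity calls via the new create_timer.
# "do_work" is a hypothetical activity name, not part of this patch.
from datetime import timedelta

import azure.durable_functions as df


def orchestrator_fn(context: df.DurableOrchestrationContext):
    first = yield context.call_activity("do_work", "first-payload")

    # Durable-safe sleep: the orchestrator is unloaded and replayed
    # when the timer fires, instead of blocking a worker thread.
    fire_at = context.current_utc_datetime + timedelta(minutes=5)
    yield context.create_timer(fire_at)

    second = yield context.call_activity("do_work", first)
    return second


main = df.Orchestrator.create(orchestrator_fn)
```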
diff --git a/azure/durable_functions/models/actions/CreateTimerAction.py b/azure/durable_functions/models/actions/CreateTimerAction.py
new file mode 100644
index 00000000..f2c918eb
--- /dev/null
+++ b/azure/durable_functions/models/actions/CreateTimerAction.py
@@ -0,0 +1,42 @@
+from typing import Any, Dict
+
+from .ActionType import ActionType
+from ..utils.json_utils import add_attrib, add_datetime_attrib
+import datetime
+
+
+class CreateTimerAction:
+    """Defines the structure of the Create Timer object.
+
+    Returns
+    -------
+    Information needed by the durable extension to schedule the timer
+
+    Raises
+    ------
+    ValueError
+        if fire_at is not a valid datetime object
+    """
+
+    def __init__(self, fire_at: datetime.datetime, is_cancelled: bool = False):
+        self.action_type: ActionType = ActionType.CREATE_TIMER
+        self.fire_at: datetime.datetime = fire_at
+        self.is_cancelled: bool = is_cancelled
+
+        if not isinstance(self.fire_at, datetime.date):
+            raise ValueError("fireAt: Expected valid datetime object but got ", self.fire_at)
+
+    def to_json(self) -> Dict[str, Any]:
+        """Convert object into a json dictionary.
+
+        Returns
+        -------
+        Dict[str, Any]
+            The instance of the class converted into a json dictionary
+        """
+        json_dict = {}
+        add_attrib(json_dict, self, 'action_type', 'actionType')
+        add_datetime_attrib(json_dict, self, 'fire_at', 'fireAt')
+        add_attrib(json_dict, self, 'is_cancelled', 'isCanceled')
+        return json_dict
diff --git a/azure/durable_functions/models/actions/__init__.py b/azure/durable_functions/models/actions/__init__.py
index e7d002d2..9ee471f0 100644
--- a/azure/durable_functions/models/actions/__init__.py
+++ b/azure/durable_functions/models/actions/__init__.py
@@ -5,6 +5,7 @@
 from .CallActivityWithRetryAction import CallActivityWithRetryAction
 from .WaitForExternalEventAction import WaitForExternalEventAction
 from .CallHttpAction import CallHttpAction
+from .CreateTimerAction import CreateTimerAction
 
 __all__ = [
     'Action',
@@ -12,5 +13,6 @@
     'CallActivityAction',
     'CallActivityWithRetryAction',
     'CallHttpAction',
-    'WaitForExternalEventAction'
+    'WaitForExternalEventAction',
+    'CreateTimerAction'
 ]
diff --git a/azure/durable_functions/tasks/__init__.py b/azure/durable_functions/tasks/__init__.py
index 334c6cb8..65fdd612 100644
--- a/azure/durable_functions/tasks/__init__.py
+++ b/azure/durable_functions/tasks/__init__.py
@@ -8,6 +8,7 @@
 from .continue_as_new import continue_as_new
 from .new_uuid import new_uuid
 from .call_http import call_http
+from .create_timer import create_timer_task
 
 __all__ = [
     'call_activity_task',
@@ -18,5 +19,6 @@
     'task_all',
     'task_any',
     'should_suspend',
-    'wait_for_external_event_task'
+    'wait_for_external_event_task',
+    'create_timer_task'
 ]
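A rough illustration of the wire shape `to_json` builds for the durable extension, assuming `add_attrib` copies the attribute under the given JSON key and `add_datetime_attrib` additionally formats the datetime, per their use elsewhere in this repo. The literal values below are assumptions, not asserted behavior of the extension.

```python
# Illustrative check of the payload CreateTimerAction.to_json() produces.
# Enum/datetime serialization details come from ActionType and
# add_datetime_attrib; only the key names are asserted here.
from datetime import datetime, timezone

from azure.durable_functions.models.actions import CreateTimerAction

action = CreateTimerAction(fire_at=datetime(2020, 6, 18, 12, 0, tzinfo=timezone.utc))
payload = action.to_json()

# Keys mirror the JS library's camelCase contract (note "isCanceled").
assert set(payload) == {"actionType", "fireAt", "isCanceled"}
assert payload["isCanceled"] is False
```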
diff --git a/azure/durable_functions/tasks/create_timer.py b/azure/durable_functions/tasks/create_timer.py
new file mode 100644
index 00000000..e775790b
--- /dev/null
+++ b/azure/durable_functions/tasks/create_timer.py
@@ -0,0 +1,41 @@
+from typing import List
+from ..models.actions.CreateTimerAction import CreateTimerAction
+from ..models.history import HistoryEvent
+from .task_utilities import find_task_timer_created, find_task_retry_timer_fired, set_processed
+import datetime
+from .timer_task import TimerTask
+
+
+def create_timer_task(state: List[HistoryEvent],
+                      fire_at: datetime.datetime) -> TimerTask:
+    """Durable timers are used in orchestrator functions to implement delays.
+
+    Parameters
+    ----------
+    state : List[HistoryEvent]
+        The list of history events to search to determine the current state of the timer
+    fire_at : datetime.datetime
+        The time at which the timer should fire
+
+    Returns
+    -------
+    TimerTask
+        A Durable Timer Task that schedules the timer to wake up the orchestrator
+    """
+    new_action = CreateTimerAction(fire_at)
+
+    timer_created = find_task_timer_created(state, fire_at)
+    timer_fired = find_task_retry_timer_fired(state, timer_created)
+
+    set_processed([timer_created, timer_fired])
+
+    if timer_fired:
+        return TimerTask(
+            is_completed=True, action=new_action,
+            timestamp=timer_fired.timestamp,
+            id_=timer_fired.event_id)
+    else:
+        return TimerTask(
+            is_completed=False, action=new_action,
+            timestamp=None,
+            id_=None)
diff --git a/azure/durable_functions/tasks/task_utilities.py b/azure/durable_functions/tasks/task_utilities.py
index e68717df..7323c400 100644
--- a/azure/durable_functions/tasks/task_utilities.py
+++ b/azure/durable_functions/tasks/task_utilities.py
@@ -1,5 +1,6 @@
 import json
 from ..models.history import HistoryEventType
+from ..constants import DATETIME_STRING_FORMAT
 from azure.functions._durable_functions import _deserialize_custom_object
 
 
@@ -118,6 +119,29 @@ def find_task_failed(state, scheduled_task):
     return tasks[0]
 
 
+def find_task_timer_created(state, fire_at):
+    """Locate the Timer Created task.
+
+    Within the state passed, search for an event that is of the
+    timer-created task type and whose FireAt time matches the
+    fire_at provided.
+    """
+    if fire_at is None:
+        return None
+
+    tasks = []
+    for e in state:
+        if e.event_type == HistoryEventType.TIMER_CREATED and hasattr(e, "FireAt"):
+            if e.FireAt == fire_at.strftime(DATETIME_STRING_FORMAT):
+                tasks.append(e)
+
+    if len(tasks) == 0:
+        return None
+
+    return tasks[0]
+
+
 def find_task_retry_timer_created(state, failed_task):
     """Locate the Timer Created Task.
diff --git a/azure/durable_functions/tasks/timer_task.py b/azure/durable_functions/tasks/timer_task.py
new file mode 100644
index 00000000..0eaca181
--- /dev/null
+++ b/azure/durable_functions/tasks/timer_task.py
@@ -0,0 +1,47 @@
+from ..models.Task import Task
+
+
+class TimerTask(Task):
+    """Represents a pending timer.
+
+    All pending timers must be completed or canceled for an orchestration to complete.
+
+    Example: cancel a timer
+    ```
+    timeout_task = context.create_timer(expiration_date)
+    if not timeout_task.is_completed:
+        timeout_task.cancel()
+    ```
+    """
+
+    def __init__(self, action, is_completed, timestamp, id_):
+        self._action = action
+        self._is_completed = is_completed
+        self._timestamp = timestamp
+        self._id = id_
+
+        super().__init__(self._is_completed, False,
+                         self._action, None, self._timestamp, self._id, None)
+
+    def is_cancelled(self) -> bool:
+        """Check whether the timer has been cancelled.
+
+        Returns
+        -------
+        bool
+            Whether the timer has been cancelled
+        """
+        return self._action.is_cancelled
+
+    def cancel(self):
+        """Cancel a timer.
+
+        Raises
+        ------
+        ValueError
+            Raised if the task is already completed and an attempt is made to cancel it
+        """
+        if not self._is_completed:
+            self._action.is_cancelled = True
+        else:
+            raise ValueError("Cannot cancel a completed task.")
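A sketch of the pattern TimerTask enables: race an activity against a timer with `task_any`, then cancel the pending timer so the orchestration can complete. On first execution `create_timer_task` returns an incomplete task and the orchestrator suspends; on replay, once a matching TimerFired event is in the history, the task comes back completed. The activity name `process_item` is hypothetical, and this assumes `task_any` yields the first task to complete, as in the JS library.

```python
# Sketch: timeout pattern built on TimerTask ("process_item" is a
# hypothetical activity name).
from datetime import timedelta


def orchestrator_fn(context):
    deadline = context.current_utc_datetime + timedelta(minutes=30)
    timeout_task = context.create_timer(deadline)
    work_task = context.call_activity("process_item", "work-item")

    winner = yield context.task_any([work_task, timeout_task])
    if winner == timeout_task:
        raise Exception("process_item timed out")

    # Pending timers block orchestration completion, so cancel the loser.
    timeout_task.cancel()
    return work_task.result
```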
diff --git a/samples/aml_monitoring/.funcignore b/samples/aml_monitoring/.funcignore
new file mode 100644
index 00000000..0bba1b1f
--- /dev/null
+++ b/samples/aml_monitoring/.funcignore
@@ -0,0 +1,5 @@
+.git*
+.vscode
+local.settings.json
+test
+py36
\ No newline at end of file
diff --git a/samples/aml_monitoring/.gitignore b/samples/aml_monitoring/.gitignore
new file mode 100644
index 00000000..9e3e052e
--- /dev/null
+++ b/samples/aml_monitoring/.gitignore
@@ -0,0 +1,44 @@
+bin
+obj
+csx
+.vs
+edge
+Publish
+
+*.user
+*.suo
+*.cscfg
+*.Cache
+project.lock.json
+
+/packages
+/TestResults
+
+/tools/NuGet.exe
+/App_Data
+/secrets
+/data
+.secrets
+appsettings.json
+local.settings.json
+
+node_modules
+dist
+
+# Local python packages
+.python_packages/
+
+# Python Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+py36
\ No newline at end of file
diff --git a/samples/aml_monitoring/.vscode/extensions.json b/samples/aml_monitoring/.vscode/extensions.json
new file mode 100644
index 00000000..dde673dc
--- /dev/null
+++ b/samples/aml_monitoring/.vscode/extensions.json
@@ -0,0 +1,5 @@
+{
+    "recommendations": [
+        "ms-azuretools.vscode-azurefunctions"
+    ]
+}
\ No newline at end of file
diff --git a/samples/aml_monitoring/aml_durable_orchestrator/__init__.py b/samples/aml_monitoring/aml_durable_orchestrator/__init__.py
new file mode 100644
index 00000000..115814ca
--- /dev/null
+++ b/samples/aml_monitoring/aml_durable_orchestrator/__init__.py
@@ -0,0 +1,45 @@
+import json
+
+from datetime import timedelta
+
+import azure.durable_functions as df
+
+
+def orchestrator_fn(context: df.DurableOrchestrationContext):
+    pipeline_endpoint = ""
+    experiment_name = ""
+
+    # Step 1: Kick off the AML pipeline
+    input_args = {}
+    input_args["pipeline_endpoint"] = pipeline_endpoint
+    input_args["experiment_name"] = experiment_name
+    input_args["params"] = None
+    run_id = yield context.call_activity("aml_pipeline", input_args)
+    expiry_time = context.current_utc_datetime + timedelta(minutes=30)
+
+    # Consider continue_as_new here (see the sketch after this file):
+    # a while loop like this explodes the history table at high scale.
+    while context.current_utc_datetime < expiry_time:
+
+        # Step 2: Poll the status of the pipeline
+        poll_args = {}
+        poll_args["run_id"] = run_id
+        poll_args["experiment_name"] = experiment_name
+        job_status = yield context.call_activity("aml_poll_status", poll_args)
+
+        # Parse the JSON string into a native dictionary; the worker's
+        # generic binding conversion currently requires string payloads.
+        # Could the activity return a Dict directly?
+        activity_status = json.loads(job_status)
+        if activity_status["status_code"] == 202:
+            next_check = context.current_utc_datetime + timedelta(minutes=1)
+
+            # Set an intermediate status for anyone polling this durable function
+            context.set_custom_status(activity_status)
+
+            yield context.create_timer(next_check)
+
+        elif activity_status["status_code"] == 500:
+            raise Exception("AML Job Failed/Cancelled...")
+        else:
+            return activity_status
+
+
+main = df.Orchestrator.create(orchestrator_fn)
diff --git a/samples/aml_monitoring/aml_durable_orchestrator/function.json b/samples/aml_monitoring/aml_durable_orchestrator/function.json
new file mode 100644
index 00000000..46a44c50
--- /dev/null
+++ b/samples/aml_monitoring/aml_durable_orchestrator/function.json
@@ -0,0 +1,11 @@
+{
+  "scriptFile": "__init__.py",
+  "bindings": [
+    {
+      "name": "context",
+      "type": "orchestrationTrigger",
+      "direction": "in"
+    }
+  ],
+  "disabled": false
+}
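The comment in the orchestrator above suggests `continue_as_new`; a sketch of that variant follows. Instead of looping (which grows the history table on every iteration), each poll ends by restarting the orchestration with fresh history. Input shapes mirror the sample's; this is an illustration, not part of the diff.

```python
# Sketch: the continue_as_new variant alluded to in the comment above.
import json
from datetime import timedelta

import azure.durable_functions as df


def orchestrator_fn(context: df.DurableOrchestrationContext):
    poll_args = context.get_input()

    job_status = yield context.call_activity("aml_poll_status", poll_args)
    activity_status = json.loads(job_status)

    if activity_status["status_code"] == 202:
        context.set_custom_status(activity_status)
        next_check = context.current_utc_datetime + timedelta(minutes=1)
        yield context.create_timer(next_check)
        context.continue_as_new(poll_args)  # fresh history, same inputs
        return

    if activity_status["status_code"] == 500:
        raise Exception("AML Job Failed/Cancelled...")

    return activity_status


main = df.Orchestrator.create(orchestrator_fn)
```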
diff --git a/samples/aml_monitoring/aml_pipeline/__init__.py b/samples/aml_monitoring/aml_pipeline/__init__.py
new file mode 100644
index 00000000..fe7c3400
--- /dev/null
+++ b/samples/aml_monitoring/aml_pipeline/__init__.py
@@ -0,0 +1,33 @@
+import json
+import logging
+
+import requests
+
+from ..shared.auth_helper import get_access_token
+
+
+def trigger_aml_endpoint(pipeline_endpoint, experiment_name, parameter_body, retries=3):
+    """Kick off a published AML pipeline endpoint. retries is currently unused."""
+    aad_token = get_access_token()
+    response = requests.post(
+        pipeline_endpoint,
+        headers=aad_token,
+        json={"ExperimentName": experiment_name,
+              "ParameterAssignments": parameter_body})
+
+    response.raise_for_status()
+    return json.loads(response.content)
+
+
+# Explicitly typing input_args (e.g., as Dict) causes an exception in the worker.
+def main(name):
+    input_args = json.loads(name)
+    try:
+        response = trigger_aml_endpoint(input_args["pipeline_endpoint"],
+                                        input_args["experiment_name"],
+                                        input_args["params"])
+    except Exception as exception:
+        logging.error("Got exception: ", exc_info=True)
+        return exception
+    return response["Id"]
diff --git a/samples/aml_monitoring/aml_pipeline/function.json b/samples/aml_monitoring/aml_pipeline/function.json
new file mode 100644
index 00000000..186f3e7e
--- /dev/null
+++ b/samples/aml_monitoring/aml_pipeline/function.json
@@ -0,0 +1,12 @@
+{
+  "scriptFile": "__init__.py",
+  "bindings": [
+    {
+      "name": "name",
+      "type": "activityTrigger",
+      "direction": "in",
+      "datatype": "string"
+    }
+  ],
+  "disabled": false
+}
\ No newline at end of file
diff --git a/samples/aml_monitoring/aml_poll_status/__init__.py b/samples/aml_monitoring/aml_poll_status/__init__.py
new file mode 100644
index 00000000..34ab4b3b
--- /dev/null
+++ b/samples/aml_monitoring/aml_poll_status/__init__.py
@@ -0,0 +1,50 @@
+import json
+import os
+
+from azureml.core import Experiment, Workspace
+from azureml.pipeline.core import PipelineRun
+
+from ..shared.aml_helper import get_run_url_from_env, get_run_logs
+from ..shared.auth_helper import get_service_principal_auth
+
+_SUBSCRIPTION_ID_ENV_NAME = "SubscriptionId"
+_RESOURCE_GROUP_NAME_ENV_NAME = "ResourceGroupName"
+_AML_WORKSPACE_NAME_ENV_NAME = "AMLWorkspaceName"
+
+
+def get_aml_pipeline_run_status(run_id, experiment_name, retries=3):
+    """Fetch the status of an AML pipeline run. retries is currently unused."""
+    svc_pr = get_service_principal_auth()
+    workspace = Workspace(
+        subscription_id=os.environ[_SUBSCRIPTION_ID_ENV_NAME],
+        resource_group=os.environ[_RESOURCE_GROUP_NAME_ENV_NAME],
+        workspace_name=os.environ[_AML_WORKSPACE_NAME_ENV_NAME],
+        auth=svc_pr)
+
+    experiment = Experiment(workspace, experiment_name)
+    pipeline_run = PipelineRun(experiment, run_id)
+
+    return pipeline_run.get_status()
+
+
+def main(name):
+    input_args = json.loads(name)
+    run_id = input_args["run_id"]
+    experiment_name = input_args["experiment_name"]
+    status = get_aml_pipeline_run_status(run_id, experiment_name)
+    run_url = get_run_url_from_env(run_id, experiment_name)
+    run_logs = get_run_logs(run_id, experiment_name)
+    status_code_map = {"Finished": 200, "Failed": 500, "Cancelled": 500}
+
+    response_obj = {
+        "status": status,
+        "url": run_url,
+        "logs": run_logs,
+        "status_code": status_code_map.get(status, 202)
+    }
+    return json.dumps(response_obj)
diff --git a/samples/aml_monitoring/aml_poll_status/function.json b/samples/aml_monitoring/aml_poll_status/function.json
new file mode 100644
index 00000000..e5e17caf
--- /dev/null
+++ b/samples/aml_monitoring/aml_poll_status/function.json
@@ -0,0 +1,12 @@
+{
+  "scriptFile": "__init__.py",
+  "bindings": [
+    {
+      "name": "name",
+      "type": "activityTrigger",
+      "direction": "in",
+      "datatype": "string"
+    }
+  ],
+  "disabled": false
+}
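For reference, the JSON string `aml_poll_status` hands back to the orchestrator has roughly the following shape; field values here are examples, not real run data.

```python
# Illustrative payload produced by aml_poll_status's main().
import json

example_payload = json.dumps({
    "status": "Running",   # raw AML pipeline run status
    "url": "https://mlworkspace.azure.ai/portal/...",  # portal link to the run
    "logs": {},            # per-step logs (currently stubbed out in aml_helper)
    "status_code": 202,    # 200 Finished, 500 Failed/Cancelled, 202 otherwise
})
```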
diff --git a/samples/aml_monitoring/extensions.csproj b/samples/aml_monitoring/extensions.csproj
new file mode 100644
index 00000000..0d947353
--- /dev/null
+++ b/samples/aml_monitoring/extensions.csproj
@@ -0,0 +1,11 @@
+<Project Sdk="Microsoft.NET.Sdk">
+  <PropertyGroup>
+    <TargetFramework>netstandard2.0</TargetFramework>
+    <WarningsAsErrors></WarningsAsErrors>
+    <DefaultItemExcludes>**</DefaultItemExcludes>
+  </PropertyGroup>
+  <ItemGroup>
+  </ItemGroup>
+</Project>
diff --git a/samples/aml_monitoring/host.json b/samples/aml_monitoring/host.json
new file mode 100644
index 00000000..83a92167
--- /dev/null
+++ b/samples/aml_monitoring/host.json
@@ -0,0 +1,3 @@
+{
+    "version": "2.0"
+}
\ No newline at end of file
diff --git a/samples/aml_monitoring/proxies.json b/samples/aml_monitoring/proxies.json
new file mode 100644
index 00000000..b385252f
--- /dev/null
+++ b/samples/aml_monitoring/proxies.json
@@ -0,0 +1,4 @@
+{
+    "$schema": "http://json.schemastore.org/proxies",
+    "proxies": {}
+}
diff --git a/samples/aml_monitoring/requirements.txt b/samples/aml_monitoring/requirements.txt
new file mode 100644
index 00000000..dc725cb2
--- /dev/null
+++ b/samples/aml_monitoring/requirements.txt
@@ -0,0 +1,2 @@
+azure-functions
+azureml-sdk>=1.0.45
diff --git a/samples/aml_monitoring/shared/__init__.py b/samples/aml_monitoring/shared/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/samples/aml_monitoring/shared/aml_helper.py b/samples/aml_monitoring/shared/aml_helper.py
new file mode 100644
index 00000000..3202cf4a
--- /dev/null
+++ b/samples/aml_monitoring/shared/aml_helper.py
@@ -0,0 +1,85 @@
+import os
+
+# The imports below are only needed when the logic in get_run_logs
+# is reinstated.
+# from azureml.core import Experiment, Workspace
+# from azureml.pipeline.core import PipelineRun
+# from ..shared.auth_helper import get_service_principal_auth
+
+_SUBSCRIPTION_ID_ENV_NAME = "SubscriptionId"
+_RESOURCE_GROUP_NAME_ENV_NAME = "ResourceGroupName"
+_AML_WORKSPACE_NAME_ENV_NAME = "AMLWorkspaceName"
+
+_RUN_URL_TEMPLATE = (
+    "https://mlworkspace.azure.ai/portal/subscriptions/{0}"
+    "/resourceGroups/{1}/providers/Microsoft.MachineLearningServices"
+    "/workspaces/{2}/experiments/{3}/runs/{4}"
+)
+
+
+def get_run_url_from_env(run_id: str, experiment_name: str):
+    """Build the portal url for a run from environment variables.
+
+    Retrieves the appropriate environment variables and formats the run
+    url template with them and the function params.
+
+    Arguments:
+        run_id string -- The string representation of the experiment's run id
+        experiment_name string -- The string representation of the experiment's name
+
+    Returns:
+        string -- The url string that points to the current run.
+    """
+    if not run_id or not experiment_name:
+        raise ValueError("Missing required param")
+    subscription_id = os.environ.get(_SUBSCRIPTION_ID_ENV_NAME)
+    resource_group_name = os.environ.get(_RESOURCE_GROUP_NAME_ENV_NAME)
+    aml_workspace_name = os.environ.get(_AML_WORKSPACE_NAME_ENV_NAME)
+
+    return _RUN_URL_TEMPLATE.format(subscription_id, resource_group_name,
+                                    aml_workspace_name, experiment_name, run_id)
+
+
+def get_run_logs(run_id: str, experiment_name: str):
+    """Build a dictionary of logs for each step of the pipeline run.
+
+    Retrieves the steps for the experiment's pipeline run and collects
+    each step's job log, keyed by the step's run id.
+
+    Arguments:
+        run_id string -- The string representation of the experiment's run id
+        experiment_name string -- The string representation of the experiment's name
+
+    Returns:
+        dictionary -- A dictionary containing logs for the pipeline run's
+        steps, keyed by each step's run id.
+    """
+    if not run_id or not experiment_name:
+        raise ValueError("Missing required param")
+
+    # The code below is commented out due to a bug in the ADF pipeline that
+    # doesn't allow us to properly set the AML SDK version. The bug results
+    # in PipelineRun returning Run objects instead of StepRun objects when
+    # get_steps is called. Since Run objects don't have the get_job_log
+    # method, the code errors out when running via ADF, but not when testing
+    # locally. When this bug is resolved, this code can be uncommented and
+    # redeployed.
+
+    # svc_pr = get_service_principal_auth()
+    # workspace = Workspace(
+    #     subscription_id=os.environ[_SUBSCRIPTION_ID_ENV_NAME],
+    #     resource_group=os.environ[_RESOURCE_GROUP_NAME_ENV_NAME],
+    #     workspace_name=os.environ[_AML_WORKSPACE_NAME_ENV_NAME],
+    #     auth=svc_pr)
+
+    # experiment = Experiment(workspace, experiment_name)
+    # pipeline_run = PipelineRun(experiment, run_id)
+    # run_steps = list(pipeline_run.get_steps())
+
+    # Iterate over the steps to collect their logs.
+    run_log_dict = dict()
+    # for step in run_steps:
+    #     j_log = step.get_job_log()
+    #     run_log_dict[str(step.id)] = j_log
+
+    return run_log_dict
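A quick usage sketch for `get_run_url_from_env` (all values are placeholders, and it is assumed the helper is importable from the sample's shared module): the function only reads the three environment variables above and formats the template.

```python
# Sketch: building a portal link for a run (placeholder values).
import os

os.environ["SubscriptionId"] = "00000000-0000-0000-0000-000000000000"
os.environ["ResourceGroupName"] = "my-resource-group"
os.environ["AMLWorkspaceName"] = "my-workspace"

url = get_run_url_from_env("run-id-123", "my-experiment")
# url -> https://mlworkspace.azure.ai/portal/subscriptions/00000000-0000-0000-0000-000000000000/
#        resourceGroups/my-resource-group/providers/Microsoft.MachineLearningServices/
#        workspaces/my-workspace/experiments/my-experiment/runs/run-id-123
```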
diff --git a/samples/aml_monitoring/shared/auth_helper.py b/samples/aml_monitoring/shared/auth_helper.py
new file mode 100644
index 00000000..c8ff8ecf
--- /dev/null
+++ b/samples/aml_monitoring/shared/auth_helper.py
@@ -0,0 +1,34 @@
+import os
+import logging
+import time
+
+from azureml.core.authentication import ServicePrincipalAuthentication
+
+_TENANT_ID_ENV_NAME = "TenantId"
+_SERVICE_PRINCIPAL_ID_ENV_NAME = "ServicePrincipalId"
+_SERVICE_PRINCIPAL_SECRET_ENV_NAME = "ServicePrincipalSecret"
+
+
+def get_service_principal_auth():
+    tenant_id = os.environ[_TENANT_ID_ENV_NAME]
+    service_principal_id = os.environ[_SERVICE_PRINCIPAL_ID_ENV_NAME]
+    service_principal_password = os.environ[_SERVICE_PRINCIPAL_SECRET_ENV_NAME]
+
+    svc_pr = ServicePrincipalAuthentication(
+        tenant_id=tenant_id,
+        service_principal_id=service_principal_id,
+        service_principal_password=service_principal_password)
+
+    return svc_pr
+
+
+def get_access_token():
+    start_time = time.time()
+
+    svc_pr = get_service_principal_auth()
+    aad_token = svc_pr.get_authentication_header()
+
+    end_time = time.time()
+
+    logging.info('Get Access Token Time: %s seconds', end_time - start_time)
+    return aad_token
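Putting the auth helpers together (a sketch mirroring `trigger_aml_endpoint` in `aml_pipeline`; the endpoint URL and experiment name below are placeholders, not values from this patch):

```python
# Sketch: using get_access_token to call a published AML pipeline
# endpoint. Real endpoint URLs come from the published pipeline in AML.
import requests

headers = get_access_token()  # AAD auth header from the service principal

response = requests.post(
    "https://<pipeline-endpoint-url>",  # placeholder
    headers=headers,
    json={"ExperimentName": "my-experiment", "ParameterAssignments": None},
)
response.raise_for_status()
run_id = response.json()["Id"]  # the value the orchestrator tracks
```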