In [None]:
import json
from unskript import nbparams
from unskript.fwk.workflow import Task, Workflow
from unskript.secrets import ENV_MODE, ENV_MODE_LOCAL

# Settings handed to the unSkript Workflow below. Both values are plain string
# literals; the ENV_MODE / ENV_MODE_LOCAL names imported above are not used here.
env = {"ENV_MODE": "ENV_MODE_LOCAL"}
secret_store_cfg = {"SECRET_STORE_TYPE": "SECRET_STORE_TYPE_LOCAL"}

# Runbook input parameters. Every value is None in this cell.
# NOTE(review): presumably the real values are injected by the unSkript
# platform before execution -- confirm.
paramDict = {"Channel": None, "ClusterName": None, "Region": None, "ServiceName": None}
unSkriptOutputParamDict = {}
# Fold the environment and secret-store settings into the same dict so that a
# single JSON document is passed to NBParams.
paramDict.update(env)
paramDict.update(secret_store_cfg)
paramsJson = json.dumps(paramDict)
nbParamsObj = nbparams.NBParams(paramsJson)
# Expose each input parameter as a module-level variable; the task cells below
# refer to these names in their inputParamsJson mappings.
Channel = nbParamsObj.get('Channel')
ClusterName = nbParamsObj.get('ClusterName')
Region = nbParamsObj.get('Region')
ServiceName = nbParamsObj.get('ServiceName')
# Workflow object shared by the notebook; it receives this module's globals()
# and later collects task outputs in w.tasks.
w = Workflow(env, secret_store_cfg, None, global_vars=globals())

<p><img src="https://unskript.com/assets/favicon.png" alt="unSkript.com" width="100" height="100"></p>
<h1 id="-unSkript-Runbooks-">unSkript Runbooks <a class="jp-InternalAnchorLink" href="#-unSkript-Runbooks-" target="_self">&para;</a></h1>
<div class="alert alert-block alert-success"><strong>&nbsp;This runbook demonstrates how to detect failed ECS deployments.</strong></div>
<p>&nbsp;</p>
<center>
<h2 id="Detect-Failed-ECS-Deployment">Detect failed ECS deployments<a class="jp-InternalAnchorLink" href="#Detect-Failed-ECS-Deployment" target="_self">&para;</a></h2>
</center>
<h2 id="Steps-Overview">Steps Overview<a class="jp-InternalAnchorLink" href="#Steps-Overview" target="_self">&para;</a></h2>
<ol>
<li>Find out the deployments in progress.</li>
<li>Identify the tasks that are in the STOPPED state because of these deployments.</li>
<li>Post a Slack message listing the failed tasks and the reason each one stopped.</li>
</ol>

In [None]:
##
##  Copyright (c) 2021 unSkript, Inc
##  All rights reserved.
##
from pydantic import BaseModel, Field
from typing import List

from beartype import beartype
@beartype
def aws_ecs_detect_failed_deployment(handle, cluster_name: str, service_name: str, region: str) -> List:
    """Report the tasks stopped by an in-progress deployment of an ECS service.

    :param handle: Object exposing ``client('ecs', region_name=...)``
        (presumably a boto3 session -- confirm against the caller).
    :param cluster_name: Name of the ECS cluster that hosts the service.
    :param service_name: Name of the ECS service to inspect.
    :param region: AWS region used to create the ECS client.
    :return: One ``{"TaskARN": ..., "StoppedReason": ...}`` dict per stopped
        task started by the PRIMARY deployment. An empty list is returned when
        the service cannot be described, no deployment is in progress, or no
        task has stopped, so the declared ``List`` return type always holds.
    """
    ecsClient = handle.client('ecs', region_name=region)
    try:
        serviceStatus = ecsClient.describe_services(cluster=cluster_name, services=[service_name])
    except Exception as e:
        print(f'Failed to get service status for {service_name}, cluster {cluster_name}, {e}')
        return []

    # An unknown service yields an empty 'services' list; indexing it blindly
    # would raise IndexError.
    services = serviceStatus.get('services') or []
    if not services:
        print(f'Service {service_name} not found in cluster {cluster_name}')
        return []

    # When the deployment is in progress, there will be 2 deployment entries, one PRIMARY and one ACTIVE. PRIMARY will eventually replace
    # ACTIVE, if its successful.
    deployments = services[0].get('deployments')
    if deployments is None:
        print("Empty deployment")
        return []

    primaryDeploymentID = None
    deploymentInProgress = False
    for deployment in deployments:
        if deployment['status'] == "PRIMARY":
            primaryDeploymentID = deployment['id']
        else:
            deploymentInProgress = True

    if not deploymentInProgress:
        print("No deployment in progress")
        return []

    # Without a PRIMARY entry there is no deployment ID to filter tasks by.
    if primaryDeploymentID is None:
        print(f'No PRIMARY deployment found for service {service_name}, cluster {cluster_name}')
        return []

    # Check if there are any stopped tasks because of this deployment.
    # list_tasks is paginated, so follow nextToken until it is exhausted.
    stoppedTasks = []
    listArgs = {"cluster": cluster_name, "startedBy": primaryDeploymentID, "desiredStatus": "STOPPED"}
    while True:
        page = ecsClient.list_tasks(**listArgs)
        stoppedTasks.extend(page.get('taskArns') or [])
        nextToken = page.get('nextToken')
        if not nextToken:
            break
        listArgs['nextToken'] = nextToken

    if len(stoppedTasks) == 0:
        print(f'No stopped tasks associated with the deploymentID {primaryDeploymentID}, service {service_name}, cluster {cluster_name}')
        return []

    # Get the reason for the stopped tasks. describe_tasks accepts at most
    # 100 tasks per call, so query in batches.
    describeBatchSize = 100
    output = []
    for start in range(0, len(stoppedTasks), describeBatchSize):
        batch = stoppedTasks[start:start + describeBatchSize]
        taskDetails = ecsClient.describe_tasks(cluster=cluster_name, tasks=batch)
        for taskDetail in taskDetails.get('tasks') or []:
            output.append({"TaskARN": taskDetail['taskArn'], "StoppedReason": taskDetail.get('stoppedReason')})
    return output




# Wrap the detection function defined above in an unSkript Task.
task = Task(Workflow())
task.configure(printOutput=True)
# Map each function argument (JSON key) to a notebook variable name (JSON value).
# NOTE(review): presumably task.validate resolves these names against the
# vars() passed below -- confirm against the Task implementation.
task.configure(inputParamsJson='''{
    "cluster_name": "ClusterName",
    "region": "Region",
    "service_name": "ServiceName"
    }''')

# validate() yields an error (None on success), a handle and the call
# arguments; the function is executed only when validation succeeded.
(err, hdl, args) = task.validate(vars=vars())
if err is None:
    task.output = task.execute(aws_ecs_detect_failed_deployment, hdl=hdl, args=args)
    # When an output name is configured, publish the first element of the
    # output as a module-level variable of that name.
    if task.output_name != None:
        globals().update({task.output_name: task.output[0]})

# task.output is assigned above only when validation succeeded, hence the
# hasattr guard before echoing it and recording it on the workflow.
if hasattr(task, 'output'):
    if isinstance(task.output, (list, tuple)):
        for item in task.output:
            print(f'item: {item}')
    elif isinstance(task.output, dict):
        for item in task.output.items():
            print(f'item: {item}')
    else:
        print(f'Output for {task.name}')
        print(task.output)
    # Keep the output on the shared Workflow, keyed by task name.
    w.tasks[task.name]= task.output

## 2 Construct List of failed deployment

Here we gather the output from the previous cell and iterate over it to find out which tasks
failed to run and the reason for each failure.

In [5]:
from tabulate import tabulate
# Build the Slack message from the detection results.
# task.output may be missing (the detection cell assigns it only when
# validation succeeds) or None (nothing to report); both are treated as
# "no failed tasks" instead of raising on len().
message = ""
failed_task_details = getattr(task, "output", None) or []
if len(failed_task_details) > 0:
    # One table row per stopped task: its ARN and the reason it stopped.
    tasks = [[i.get('TaskARN'), i.get('StoppedReason')] for i in failed_task_details]
    message = f'Stopped tasks in cluster {ClusterName}, service {ServiceName} \n {tabulate(tasks, headers=["TaskARN", "Stopped Reason"], tablefmt="grid")}'


## 3 Post Slack Message

We post the list of failed deployments to the given Slack channel.

In [None]:
##
# Copyright (c) 2021 unSkript, Inc
# All rights reserved.
##

import pprint

from pydantic import BaseModel, Field
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError

pp = pprint.PrettyPrinter(indent=2)


from beartype import beartype
def legoPrinter(func):
    """Decorator that prints a confirmation line when ``func`` returns a truthy value.

    The confirmation names the ``channel`` argument of the wrapped call,
    whether it was passed by keyword or positionally. The wrapped function's
    return value is passed through unchanged.

    :param func: Callable to wrap; expected to take a ``channel`` argument.
    :return: The wrapping callable, carrying ``func``'s name and docstring.
    """
    import functools
    import inspect

    @functools.wraps(func)
    def Printer(*args, **kwargs):
        output = func(*args, **kwargs)
        if output:
            channel = kwargs.get("channel")
            if channel is None:
                # Channel was not passed by keyword: recover it from the
                # positional arguments via the wrapped function's signature.
                try:
                    bound = inspect.signature(func).bind(*args, **kwargs)
                    channel = bound.arguments.get("channel")
                except (TypeError, ValueError):
                    channel = None
            # Print the line directly; pretty-printing the result of print()
            # would emit an extra "None" line.
            print(f"Message sent to Slack channel {channel}")
        return output
    return Printer


@legoPrinter
@beartype
def slack_post_message(handle: WebClient, channel: str, message: str) -> bool:
    """Post ``message`` to a Slack channel.

    :param handle: Slack ``WebClient`` used to send the message.
    :param channel: Channel to post to.
    :param message: Text of the message.
    :return: True when the message was posted; False when the Slack API
        reported an error or any other exception was raised (the failure is
        printed, not re-raised).
    """
    try:
        handle.chat_postMessage(text=message, channel=channel)
    except SlackApiError as e:
        # Slack rejected the request: report the API error code it returned.
        print("\n\n")
        pp.pprint(
            f"Failed sending message to slack channel {channel}, Error: {e.response['error']}")
        sent = False
    except Exception as e:
        # Any other failure: report the exception text.
        print("\n\n")
        pp.pprint(
            f"Failed sending message to slack channel {channel}, Error: {e.__str__()}")
        sent = False
    else:
        sent = True
    return sent


# Wrap slack_post_message in an unSkript Task. This rebinds the module-level
# name `task`, replacing the Task object created by the detection cell.
task = Task(Workflow())
task.configure(printOutput=True)
# Map each function argument (JSON key) to a notebook variable name (JSON value).
# NOTE(review): presumably "Channel" and "message" are resolved against the
# vars() passed to task.validate below -- confirm against the Task implementation.
task.configure(inputParamsJson='''{
    "channel": "Channel",
    "message": "message"
    }''')

# validate() yields an error (None on success), a handle and the call
# arguments; the message is posted only when validation succeeded.
(err, hdl, args) = task.validate(vars=vars())
if err is None:
    task.output = task.execute(slack_post_message, hdl=hdl, args=args)
    # When an output name is configured, publish the first element of the
    # output as a module-level variable of that name.
    if task.output_name != None:
        globals().update({task.output_name: task.output[0]})

## Conclusion

In this runbook we saw how easy it is to piece together a runbook from pre-built and custom Legos that identifies failed ECS deployments and posts them to Slack. To learn more about the full capabilities of the unSkript platform, please visit https://us.app.unskript.io