### Node.js Crash Detection

We operationalized the template and have been using it to run advanced analytics on the telemetry data of one of our own services.

Our service runs four Node.js processes on each cloud instance. Root cause analysis revealed cases of Node.js crashes. However, because of limitations in the SDK, we cannot log a crash at the moment it occurs. Instead, we created a Jupyter notebook that analyzes existing telemetry data to detect Node.js crashes.

A custom event, NodeProcessStarted, is logged whenever a new Node.js process starts on a cloud instance. Normally, all four processes start nearly simultaneously when they are recycled every 8-11 hours. So, when fewer than four NodeProcessStarted events occur together (within the same one-minute window), we can infer that new process(es) started to replace recently crashed process(es).
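The inference can be sketched offline in plain Python. This is a minimal sketch, assuming start events are available as (timestamp, pid) pairs for a single cloud instance; `find_suspected_crashes` and the sample data are hypothetical and not part of the notebook.

```python
from collections import defaultdict
from datetime import datetime

NUM_PROCESSES = 4  # Node.js processes per cloud instance, as in the notebook


def find_suspected_crashes(events, num_processes=NUM_PROCESSES):
    """Return the one-minute bins in which fewer than num_processes distinct
    PIDs started. events is an iterable of (datetime, pid) pairs for a single
    cloud instance. Hypothetical helper for illustration only.
    """
    bins = defaultdict(set)
    for ts, pid in events:
        # Truncate to the minute, mirroring bin(timestamp, 1m) in the query.
        minute = ts.replace(second=0, microsecond=0)
        bins[minute].add(pid)
    return sorted(minute for minute, pids in bins.items()
                  if len(pids) < num_processes)


# Made-up data: a full recycle at 08:00, then a single restart at 09:17.
sample = [
    (datetime(2017, 6, 1, 8, 0, 5), "101"),
    (datetime(2017, 6, 1, 8, 0, 6), "102"),
    (datetime(2017, 6, 1, 8, 0, 7), "103"),
    (datetime(2017, 6, 1, 8, 0, 9), "104"),
    (datetime(2017, 6, 1, 9, 17, 30), "205"),
]
print(find_suspected_crashes(sample))
```

Like the notebook's query, this sketch uses fixed one-minute bins, so a full restart that happens to straddle a minute boundary would be split across two bins.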

In this implementation of the template, you will see how we query for telemetry data, analyze it, query for additional telemetry data to enrich the analysis, and then send the derived data back to Application Insights.

#### Install packages
*Exclude the cell below when uploading to Azure App Service.*

```python
%%capture

!pip install --upgrade pip
!pip install --upgrade applicationinsights-jupyter
!pip install applicationinsights
```

#### Imports

```python
import requests
import ast
from operator import itemgetter
from datetime import datetime
from datetime import timedelta
from applicationinsights_jupyter import Jupyter
from applicationinsights import TelemetryClient
```

#### Constants

```python
API_URL = "https://api.aimon.applicationinsights.io/"
APP_ID = "REDACTED"
API_KEY = "REDACTED"
IKEY = "REDACTED"
RECYCLE_FREQUENCY_LOWER_BOUND = timedelta(hours=8)
RECYCLE_FREQUENCY_UPPER_BOUND = timedelta(hours=11)
NUM_PROCESSES = 4
```

#### Utility functions

```python
def makeQuery(queryString):
    """ Make a query to Application Insights with the specified queryString.

    Returns:
        A list of query results.
    """
    jupyterObj = Jupyter(APP_ID, API_KEY, API_URL)
    jupyterObjData = jupyterObj.getAIData(queryString)
    return jupyterObjData["Rows"]

def parseTimestamp(timestamp):
    """
    Args:
        timestamp (unicode) in '%Y-%m-%dT%H:%M:%SZ' format

    Returns:
        A datetime representation of the specified timestamp.
    """
    return datetime.strptime(timestamp, '%Y-%m-%dT%H:%M:%SZ')

def formatDatetime(time):
    """
    Args:
        time (datetime)

    Returns:
        A string representation of the specified time in '%Y-%m-%dT%H:%M:%SZ' format.
    """
    return time.strftime('%Y-%m-%dT%H:%M:%SZ')
```
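`parseTimestamp` accepts only whole-second timestamps, which is what `bin(timestamp, 1m)` values look like. If a query ever returned timestamps with fractional seconds, `strptime` would raise `ValueError`. A possible, more lenient variant is sketched below; `parseTimestampLenient` is hypothetical and not part of the notebook.

```python
from datetime import datetime

# Formats tried in order: whole seconds first, then fractional seconds.
_FORMATS = ('%Y-%m-%dT%H:%M:%SZ', '%Y-%m-%dT%H:%M:%S.%fZ')


def parseTimestampLenient(timestamp):
    """Hypothetical variant of parseTimestamp that also accepts fractional seconds."""
    for fmt in _FORMATS:
        try:
            return datetime.strptime(timestamp, fmt)
        except ValueError:
            continue  # try the next format
    raise ValueError('unrecognized timestamp: %r' % (timestamp,))


print(parseTimestampLenient('2017-06-01T09:17:00Z'))      # 2017-06-01 09:17:00
print(parseTimestampLenient('2017-06-01T09:17:30.123Z'))  # 2017-06-01 09:17:30.123000
```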

#### Check for PID changes
Check how many NodeProcessStarted custom events occurred in each one-minute window for each cloud instance in each geo.

```python
def getEvents():
    """ Query for NodeProcessStarted customEvents that occurred between 10 minutes (inclusive) and 5 minutes (exclusive) ago,
            grouped in 1-minute bins by geo (cloud_RoleName) and cloud instance (cloud_RoleInstance).

    Returns:
        If successful, a list of events. Otherwise, None.
    """
    queryString = "customEvents\
    | where timestamp >= ago(10m) and timestamp < ago(5m)\
    | where name == 'NodeProcessStarted'\
    | summarize pids=makeset(tostring(customDimensions.PID)) by cloud_RoleName, cloud_RoleInstance, bin(timestamp, 1m)\
    | extend count=arraylength(pids)"
    try:
        queryResult = makeQuery(queryString)
        return queryResult
    except requests.RequestException:
        return None

events = getEvents()
```
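The later cells index each row as `event[0]` through `event[4]` (cloud_RoleName, cloud_RoleInstance, the 1-minute bin timestamp, the PID set, and count) and parse the PID set with `ast.literal_eval`, which implies it comes back as the text of an array. The rows below are made up to show that assumed shape offline, along with the same sort and crash count the notebook applies.

```python
import ast
from operator import itemgetter

NUM_PROCESSES = 4

# Made-up rows in the assumed column order:
# [cloud_RoleName, cloud_RoleInstance, timestamp, pids, count]
rows = [
    ["geo-b", "instance_1", "2017-06-01T09:17:00Z", '["205"]', 1],
    ["geo-a", "instance_0", "2017-06-01T09:16:00Z", '["301","302","303","304"]', 4],
]

for name, instance, timestamp, pids, count in sorted(rows, key=itemgetter(0, 1, 2)):
    # pids arrives as text; literal_eval turns it into a Python list.
    pid_list = [str(p) for p in ast.literal_eval(pids)]
    label = "full restart" if count == NUM_PROCESSES else "possible crash"
    print(name, instance, timestamp, pid_list, label)

# Same rule as getNumCrashes: every row whose count is not NUM_PROCESSES.
crash_count = sum(1 for row in rows if row[4] != NUM_PROCESSES)
print("crashCount =", crash_count)
```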

#### Investigate the cause for each PID change & send derived data back to Application Insights
Causes include:
* scheduled recycle: every 8-11 hours, with some randomization, all four processes are recycled nearly simultaneously.
* new instance: new cloud instances are spun up as necessary to handle request traffic. Then, four processes are started nearly simultaneously.
* machine reboot: occasionally, the cloud instance is rebooted before its scheduled recycle. Then, four processes are started nearly simultaneously.
* crash: one or more processes may crash and be replaced by new ones.
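These rules, as applied later in `getPIDChangeCause`, can be sketched without any queries. `classify_pid_change` is a hypothetical helper: instead of querying Application Insights, it takes the number of PIDs in the bin and the time of the most recent earlier full restart as plain inputs.

```python
from datetime import datetime, timedelta

NUM_PROCESSES = 4
RECYCLE_FREQUENCY_LOWER_BOUND = timedelta(hours=8)
RECYCLE_FREQUENCY_UPPER_BOUND = timedelta(hours=11)


def classify_pid_change(num_pids, now, last_full_restart):
    """Offline mirror of the branching in getPIDChangeCause (hypothetical helper).

    num_pids: distinct PIDs that started in this one-minute bin.
    now, last_full_restart: datetimes of this bin and of the previous
    bin in which the most processes started.
    """
    if num_pids != NUM_PROCESSES:
        return "CRASH"
    elapsed = now - last_full_restart
    if RECYCLE_FREQUENCY_LOWER_BOUND <= elapsed <= RECYCLE_FREQUENCY_UPPER_BOUND:
        return "SCHEDULED RECYCLE"
    if elapsed > RECYCLE_FREQUENCY_UPPER_BOUND:
        return "NEW INSTANCE"
    return "MACHINE REBOOT"


t0 = datetime(2017, 6, 1, 0, 0)
print(classify_pid_change(4, t0 + timedelta(hours=9), t0))   # SCHEDULED RECYCLE
print(classify_pid_change(4, t0 + timedelta(hours=2), t0))   # MACHINE REBOOT
print(classify_pid_change(4, t0 + timedelta(hours=30), t0))  # NEW INSTANCE
print(classify_pid_change(1, t0 + timedelta(hours=3), t0))   # CRASH
```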

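```python
def getNumCrashes(events):
    """ Count the events (one per cloud instance per 1-minute bin) in which the number
        of distinct PIDs started differs from NUM_PROCESSES; each such event is treated as one crash.

    Returns:
        An int of the number of crashes.
    """
    count = 0
    for event in events:
        if event[4] != NUM_PROCESSES:
            count += 1
    return count
```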

```python
def getPIDs(cloud_RoleName, cloud_RoleInstance, startTimestamp, endTimestamp):
    """
    Args:
        cloud_RoleName (str)
        cloud_RoleInstance (str)
        startTimestamp (str or unicode)
        endTimestamp (str or unicode)

    Returns:
        A list of PIDs used to process requests from the specified startTimestamp to endTimestamp in cloud_RoleInstance in cloud_RoleName.
    """
    queryString = "requests\
    | where cloud_RoleName == '{0}' and cloud_RoleInstance == '{1}'".format(unicode(cloud_RoleName), unicode(cloud_RoleInstance)) + \
    "| where timestamp >= datetime('{0}') and timestamp < datetime('{1}')".format(unicode(startTimestamp), unicode(endTimestamp)) + \
    "| summarize pids=makeset(tostring(customDimensions.PID))"
    queryResult = makeQuery(queryString)
    pids = queryResult[0][0]
    return [str(i) for i in ast.literal_eval(pids)]
```

```python
def getPreCrashPIDs(cloud_RoleName, cloud_RoleInstance, timestamp):
    """
    Args:
        cloud_RoleName (str)
        cloud_RoleInstance (str)
        timestamp (unicode)

    Returns:
        A list of PIDs used to process requests before the crash at the specified timestamp in cloud_RoleInstance in cloud_RoleName.
    """
    previousPIDChangeTimestamp = getPreviousPIDChangeTimestamp(cloud_RoleName, cloud_RoleInstance, timestamp)
    # Window: from 1 minute after the previous PID change up to the crash bin
    startTimestamp = formatDatetime(parseTimestamp(previousPIDChangeTimestamp) + timedelta(minutes=1))
    return getPIDs(cloud_RoleName, cloud_RoleInstance, startTimestamp, timestamp)

def getPostCrashPIDs(cloud_RoleName, cloud_RoleInstance, timestamp):
    """
    Args:
        cloud_RoleName (str)
        cloud_RoleInstance (str)
        timestamp (unicode)

    Returns:
        A list of PIDs used to process requests after the crash at the specified timestamp in cloud_RoleInstance in cloud_RoleName.
    """
    # Window: from 1 to 3 minutes after the crash bin
    startTimestamp = formatDatetime(parseTimestamp(timestamp) + timedelta(minutes=1))
    endTimestamp = formatDatetime(parseTimestamp(timestamp) + timedelta(minutes=3))
    return getPIDs(cloud_RoleName, cloud_RoleInstance, startTimestamp, endTimestamp)
```
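`getPreCrashPIDs` and `getPostCrashPIDs` compare two request windows around the crash bin. The window arithmetic alone is sketched below with made-up timestamps; `crash_windows` is a hypothetical helper that returns the same (start, end) strings those functions pass to `getPIDs`.

```python
from datetime import datetime, timedelta

FORMAT = '%Y-%m-%dT%H:%M:%SZ'


def crash_windows(previous_change, crash):
    """Return ((pre_start, pre_end), (post_start, post_end)) as strings.

    Pre-crash: 1 minute after the previous PID change, up to the crash bin.
    Post-crash: 1 to 3 minutes after the crash bin.
    """
    prev = datetime.strptime(previous_change, FORMAT)
    t = datetime.strptime(crash, FORMAT)
    pre = ((prev + timedelta(minutes=1)).strftime(FORMAT), crash)
    post = ((t + timedelta(minutes=1)).strftime(FORMAT),
            (t + timedelta(minutes=3)).strftime(FORMAT))
    return pre, post


pre, post = crash_windows('2017-06-01T08:00:00Z', '2017-06-01T09:17:00Z')
print(pre)   # ('2017-06-01T08:01:00Z', '2017-06-01T09:17:00Z')
print(post)  # ('2017-06-01T09:18:00Z', '2017-06-01T09:20:00Z')
```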

```python
def getPreviousScheduledRecycleTimestamp(cloud_RoleName, cloud_RoleInstance, timestamp):
    """
    Args:
        cloud_RoleName (str)
        cloud_RoleInstance (str)
        timestamp (unicode)

    Returns:
        A unicode timestamp of the last scheduled recycle before the specified timestamp in cloud_RoleInstance in cloud_RoleName.
    """
    # Most recent 1-minute bin before timestamp in which the largest number of processes started
    queryString = "customEvents\
    | where name=='NodeProcessStarted'\
    | where cloud_RoleName == '{0}' and cloud_RoleInstance == '{1}'".format(unicode(cloud_RoleName), unicode(cloud_RoleInstance)) + \
    "| where timestamp < datetime('{0}')".format(unicode(timestamp)) + \
    "| summarize pids=makeset(tostring(customDimensions.PID)) by bin(timestamp, 1m)\
    | extend numPids=arraylength(pids)\
    | sort by numPids desc, timestamp desc\
    | take 1"
    return makeQuery(queryString)[0][0]

def getPreviousPIDChangeTimestamp(cloud_RoleName, cloud_RoleInstance, timestamp):
    """
    Args:
        cloud_RoleName (str)
        cloud_RoleInstance (str)
        timestamp (unicode)

    Returns:
        A unicode timestamp of the last PID change before the specified timestamp in cloud_RoleInstance in cloud_RoleName.
    """
    queryString = "customEvents\
    | where timestamp < datetime('{0}')".format(unicode(timestamp)) + \
    "| where name == 'NodeProcessStarted'\
    | where cloud_RoleName == '{0}' and cloud_RoleInstance == '{1}'".format(unicode(cloud_RoleName), unicode(cloud_RoleInstance)) + \
    "| top 1 by timestamp desc\
    | project bin(timestamp, 1m)"
    return makeQuery(queryString)[0][0]
```

```python
def getCrashDetails(cloud_RoleName, cloud_RoleInstance, timestamp, PIDs):
    """
    Args:
        cloud_RoleName (str)
        cloud_RoleInstance (str)
        timestamp (unicode)
        PIDs (list): PIDs of the processes that started at the specified timestamp

    Returns:
        A dictionary of the PIDs that served requests before the crash at the specified timestamp in cloud_RoleInstance in cloud_RoleName, plus the stable, crashed, and new PIDs.
    """
    preCrashPIDs = getPreCrashPIDs(cloud_RoleName, cloud_RoleInstance, timestamp)
    postCrashPIDs = getPostCrashPIDs(cloud_RoleName, cloud_RoleInstance, timestamp)
    # Served requests before the crash but not after it
    crashedPIDs = list(set(preCrashPIDs) - set(postCrashPIDs))
    # Served requests both before and after the crash
    stablePIDs = list(set(preCrashPIDs) & set(postCrashPIDs))
    return {
        "previous PIDs" : str(preCrashPIDs),
        "stable PIDs" : str(stablePIDs),
        "crashed PIDs" : str(crashedPIDs),
        "new PIDs" : str(PIDs)
    }

def getPIDChangeCause(cloud_RoleName, cloud_RoleInstance, timestamp, PIDs):
    """
    Args:
        cloud_RoleName (str)
        cloud_RoleInstance (str)
        timestamp (unicode)
        PIDs (list)

    Returns:
        A dictionary consisting of details of the cause for the PID change.
    """
    causeDetails = {
        "cloud_RoleName": cloud_RoleName,
        "cloud_RoleInstance": cloud_RoleInstance,
        "current PIDs": str(PIDs),
        "timestamp": str(timestamp),
    }
    if len(PIDs) == NUM_PROCESSES:
        # All processes restarted together: classify by time since the last full restart
        previousScheduledRecycleTimestamp = getPreviousScheduledRecycleTimestamp(cloud_RoleName, cloud_RoleInstance, timestamp)
        causeDetails["time of last scheduled recycle"] = str(previousScheduledRecycleTimestamp)
        timeDiff = parseTimestamp(timestamp) - parseTimestamp(previousScheduledRecycleTimestamp)
        if RECYCLE_FREQUENCY_LOWER_BOUND <= timeDiff <= RECYCLE_FREQUENCY_UPPER_BOUND:
            causeDetails["cause"] = "SCHEDULED RECYCLE"
        elif timeDiff > RECYCLE_FREQUENCY_UPPER_BOUND:
            causeDetails["cause"] = "NEW INSTANCE"
        else:
            causeDetails["cause"] = "MACHINE REBOOT"
        return causeDetails
    else:
        # Only some processes restarted: treat as a crash
        causeDetails.update(getCrashDetails(cloud_RoleName, cloud_RoleInstance, timestamp, PIDs))
        causeDetails["cause"] = "CRASH"
        return causeDetails
```
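`getCrashDetails` reduces the two PID lists with set operations. A worked example with made-up PIDs follows; the `replacements` line is a hypothetical cross-check that is not in the notebook, which instead reports the PIDs from the NodeProcessStarted event as the new PIDs.

```python
pre_crash = ["101", "102", "103", "104"]   # PIDs serving requests before the crash
post_crash = ["101", "102", "104", "205"]  # PIDs serving requests just after it

# Sorted here only to make the output deterministic.
crashed = sorted(set(pre_crash) - set(post_crash))
stable = sorted(set(pre_crash) & set(post_crash))
replacements = sorted(set(post_crash) - set(pre_crash))
print(crashed, stable, replacements)  # ['103'] ['101', '102', '104'] ['205']
```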
    
```python
tc = TelemetryClient(IKEY)

if events is None:
    # The query failed; send nothing for this run.
    pass
elif len(events) == 0:
    tc.track_metric("crashCount", 0)
    tc.flush()
else:
    tc.track_metric("crashCount", getNumCrashes(events))
    tc.flush()
    # Process by geo, cloud instance, then timestamp
    events = sorted(events, key=itemgetter(0, 1, 2))
    for event in events:
        cloud_RoleName = event[0]
        cloud_RoleInstance = event[1]
        timestamp = event[2]
        PIDs = [str(i) for i in ast.literal_eval(event[3])]
        try:
            cause = getPIDChangeCause(cloud_RoleName, cloud_RoleInstance, timestamp, PIDs)
        except requests.RequestException:
            # Stop at the first failed follow-up query
            break
        tc.track_metric(cause["cause"], 1, properties=cause)
        tc.flush()
```
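To see which metrics this cell would send without using a real instrumentation key, the client can be swapped for a stand-in that records calls. `RecordingClient` is hypothetical: it mimics only the two `TelemetryClient` methods the notebook uses, `track_metric` and `flush`, and is not part of the applicationinsights package.

```python
class RecordingClient(object):
    """Hypothetical stand-in for TelemetryClient that records calls instead of sending them."""

    def __init__(self):
        self.metrics = []
        self.flushes = 0

    def track_metric(self, name, value, properties=None, **kwargs):
        # Store a copy of the properties so later mutation does not affect the record.
        self.metrics.append((name, value, dict(properties or {})))

    def flush(self):
        self.flushes += 1


tc = RecordingClient()
tc.track_metric("crashCount", 1)
tc.track_metric("CRASH", 1, properties={"cloud_RoleInstance": "instance_1", "cause": "CRASH"})
tc.flush()
for name, value, props in tc.metrics:
    print(name, value, props)
```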