# Getting familiar with spark - parsing YARN logs

# 1. Imports and spark config

In [1]:
# start a pyspark instance
import pyspark
import pyspark.sql
from pyspark.sql import SparkSession
# Keep the SQL functions namespaced: a star import of pyspark.sql.functions exports
# names such as `round`, `max` and `min` that shadow the Python builtins used later
# (including inside RDD lambdas).
import pyspark.sql.functions as F

conf = pyspark.SparkConf().setMaster("local[*]").setAll([
    ('spark.executor.memory', '12g'),       # find
    ('spark.driver.memory', '4g'),          # your
    ('spark.driver.maxResultSize', '2G'),   # setup
])
# create the session
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# create the context (entry point for the RDD API used in the rest of the notebook)
sc = spark.sparkContext

# FIX for Spark 2.x: force the JVM default locale to en-US
locale = sc._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))



In [2]:
import re
import pandas as pd
import sys

# 2. Loading the data - parsing the YARN resource manager log

The schema we want to extract:

* The application's id
* The user name
* The number of attempts made to run the app
* For each application's attempt,
    * The start time. We consider the start time to be when the appattempt state changes to LAUNCHED.
    * The end time. We consider the end time to be when the state of the appattempt changes from FINAL_SAVING to FINISHING/FAILED/KILLED.
    * The final status of the attempt defined by the state the appattempt transitioned to from FINAL_SAVING.
    * The list of containers requested and where they are hosted, sorted by container id.

In [3]:
# Load the ResourceManager log as an RDD with one raw text line per element.
LOG_PATH = "hadoop-yarn-resourcemanager-iccluster040.log"
log_txt = sc.textFile(LOG_PATH)

We split the parsing into three parts, each extracting different information from different kinds of log lines. This gives the three RDDs below; a fourth step then joins them.

* First RDD
| ApplicationID | AttemptID | Starttime | Endtime | Final state | Number of attempts |
|---------------|-----------|-----------|---------|-------------|--------------------|
|               |           |           |         |             |                    |
* Second RDD
| ApplicationID | User |
|---------------|------|
|               |      |
* Third RDD 
| ApplicationID | AttemptID | ContainerID | Host name |
|---------------|-----------|-------------|-----------|
|               |           |             |           |

We then join these RDDs into a single one:

| ApplicationID | User | Number of attempts | Attempt Number | Starttime | Finishtime | Finish State | Containers |
|---------------|------|--------------------|----------------|-----------|------------|--------------|------------|
|               |      |                    |                |           |            |              |            |

####  a. Get applicationID, start time, endtime, states, number of attempts ...

In [4]:
#helper functions for parsing

def get_state(x):
    """
    Reduce the state keywords found on one attempt log line to the single state we keep.

    The first keyword identifies the kind of transition; the second one is the state
    we record, e.g. (LAUNCHED, LAUNCHED) -> LAUNCHED and (FINAL_SAVING, FINISHING)
    -> FINISHING (likewise FAILED / KILLED after FINAL_SAVING).

    Parameters:
    x (Tuple): (timestamp, [appattempt ids on the line], [state keywords on the line])
    Returns:
    Tuple: (timestamp, [appattempt ids], state) when the first keyword starts with
    LAUNCHED or FINAL_SAVING, otherwise None (NOTE(review): a None would make the
    following parse_attempt step fail).
    """
    states = x[2]
    if not states[0].startswith(('LAUNCHED', 'FINAL_SAVING')):
        return None
    return (x[0], x[1], states[1])
    

    
# Do some processing to get the intended output
def parse_attempt(x):
    """
    Reshape a (timestamp, [attempt ids], state) record into
    (application id, attempt id, timestamp, state).

    The application id is derived from the first attempt id: its 'appattempt' prefix is
    replaced with 'application' and the trailing attempt counter is dropped, e.g.
    appattempt_1580812675067_0001_000001 -> application_1580812675067_0001.

    Parameters:
    x (Tuple): (timestamp, [attempt ids], state), as returned by get_state
    Returns:
    Tuple: (application id, attempt id, timestamp, state)
    """
    attempt_id = x[1][0]
    fields = ['application', *attempt_id.split('_')[1:]]
    application_id = '_'.join(fields[:-1])
    return (application_id, attempt_id, x[0], x[2])



In [5]:
# Attempt state changes are logged by attempt.RMAppAttemptImpl; keep only the lines of the
# cluster run identified by timestamp 1580812675067.
# Each line becomes (timestamp, [appattempt ids], [tokens starting with a state keyword]).
# Lines with at least two keywords are transitions; transitions *into* FINAL_SAVING are skipped.
apps = (
    log_txt
    .filter(lambda line: 'attempt.RMAppAttemptImpl' in line and '1580812675067' in line)
    .map(lambda line: line.split())
    .map(lambda tokens: (
        tokens[0] + ' ' + tokens[1],
        [tok for tok in tokens if tok.startswith('appattempt')],
        [tok for tok in tokens
         if tok.startswith(('LAUNCHED', 'FINAL_SAVING', 'KILLED', 'FINISHING', 'FAILED'))],
    ))
    .filter(lambda rec: len(rec[2]) > 1 and not rec[2][1].startswith('FINAL_SAVING'))
    .map(get_state)
    .map(parse_attempt)
)
                            

In [6]:
# Group the (timestamp, state) rows of each (application id, attempt id) so the start and
# end of an attempt end up in one record: (app id, attempt id, start, end, final state).
# groupByKey gives no ordering guarantee for the grouped values, so sort them by timestamp
# ('YYYY-MM-DD HH:MM:SS,mmm' strings sort chronologically): the LAUNCHED row comes first and
# the final-state row second, as the indexing below expects.
apps = (
    apps
    .map(lambda x: ((x[0], x[1]), x[2:]))
    .groupByKey()
    .mapValues(sorted)
    .map(lambda x: (x[0][0], x[0][1], x[1][0][0], x[1][1][0], x[1][1][1]))
)

In [7]:
# Every element of `apps` is one attempt, so counting the application ids gives
# a dict {application id: number of attempts}.
number_attempts = apps.map(lambda rec: rec[0]).countByValue()


In [8]:
# Append the owning application's attempt count to every attempt record.
apps = apps.map(lambda rec: rec[:5] + (number_attempts[rec[0]],))

Structure of RDD obtained so far

| ApplicationID | AttemptID | Starttime | Endtime | Final state | Number of attempts |
|---------------|-----------|-----------|---------|-------------|--------------------|
|               |           |           |         |             |                    |

####  b. Get ApplicationID and users

In [9]:
# (application id, user) pairs taken from the capacity.ParentQueue lines of this cluster run:
# token 10 holds the application id and token 12 the user.
users = (
    log_txt
    .filter(lambda line: 'capacity.ParentQueue' in line and '1580812675067' in line)
    .map(lambda line: line.split())
    .map(lambda tokens: (tokens[10], tokens[12]))
    .distinct()
)


Structure of RDD 

| ApplicationID | User |
|---------------|------|
|               |      |

####  c. Get containers linked with attemptIDs/applicationIDs

In [10]:
# helper functions for parsing container info
def parse_container(x):
    """
    Turn a (container id, host token) pair into
    (application id, attempt id, container number, host name).

    Container ids look like container_e05_1580812675067_0001_01_000001, where the eNN
    segment may be absent. Cluster timestamp, application number, attempt number and
    container number are always the last four '_'-separated fields, so they are read
    from the right and both forms are supported.

    Parameters:
    x (Tuple): (container id, host token of the form 'host:port...')
    Returns:
    Tuple: (application id, attempt id, container number without leading zeros, host name)
    """
    cluster_ts, app_num, attempt_num, container_num = x[0].split('_')[-4:]

    application_id = '_'.join(('application', cluster_ts, app_num))
    # appattempt ids carry a 6-digit attempt number while the container id carries a shorter
    # one ('01'); re-pad it so it matches the ids parsed from the RMAppAttemptImpl lines.
    attempt_id = '_'.join(('appattempt', cluster_ts, app_num, attempt_num.zfill(6)))

    host_name = x[1].split(':')[0]
    return (application_id, attempt_id, str(int(container_num)), host_name)

In [11]:
# (application id, attempt id, container number, host) for every container of this cluster
# run, taken from FiCaSchedulerNode lines: token 8 is the container id, token 15 is host:port.
# NOTE(review): the same container may appear on more than one FiCaSchedulerNode line
# (presumably e.g. when it is assigned and again when it is released). Container ids are
# unique, so distinct() only drops such repeats and each container is listed once per attempt.
containers = (
    log_txt
    .filter(lambda x: 'FiCaSchedulerNode' in x and '1580812675067' in x)
    .map(lambda x: x.split())
    .map(lambda x: parse_container((x[8], x[15])))
    .distinct()
)


Structure of RDD

| ApplicationID | AttemptID | ContainerID | Host name |
|---------------|-----------|-------------|-----------|
|               |           |             |           |

####  d. Combine the previous RDDs together

In [12]:
# Key the attempts by application id and attach the submitting user. leftOuterJoin keeps
# attempts whose application has no user line (their user is None).
apps_zip = apps.map(lambda rec: (rec[0], rec[1:6]))
rdd_join = apps_zip.leftOuterJoin(users)
# Elements: (application id, ((attempt id, start, end, state, #attempts), user))

In [13]:
# Key both the containers and the joined attempts by (application id, attempt id)
# so they can be joined attempt by attempt.
containers_zip = containers.map(lambda c: ((c[0], c[1]), (c[2], c[3])))
rdd_join_zip = rdd_join.map(
    lambda rec: ((rec[0], rec[1][0][0]), rec[1][0][1:5] + (rec[1][1],))
)

In [14]:
# Join every attempt with its containers and group them, giving
# ((app id, attempt id, start, end, state, #attempts, user), [(container number, host), ...]).
# leftOuterJoin yields None as the container side of an attempt without container lines;
# those placeholders are dropped, so the attempt keeps an empty container list instead of
# failing on None[0].
joined = (
    rdd_join_zip
    .leftOuterJoin(containers_zip)
    .map(lambda x: (x[0] + x[1][0], x[1][1]))
    .groupByKey()
    .mapValues(lambda cs: [c for c in cs if c is not None])
)

In [15]:
# Sort each attempt's containers by container number. The numbers are strings ('2', '10'),
# so compare them as integers; a plain string sort would put '10' before '2'.
# The host is a tie-breaker only, which keeps the order deterministic.
joined = joined.map(lambda x: (x[0], sorted(x[1], key=lambda c: (int(c[0]), c[1]))))

In [16]:
# Regroup the attempts per application:
#   key   = (application id, user, number of attempts)
#   value = [(attempt number, start, end, final state, containers), ...]
# The attempt number is the int value of the attempt id's last 6 digits.
joined = (
    joined
    .map(lambda x: (
        (x[0][0], x[0][6], x[0][5]),
        (int(x[0][1][-6:]),) + x[0][2:5] + (x[1],),
    ))
    .groupByKey()
    .mapValues(list)
)

# 3. Printing Output

In [17]:
# Keep applications numbered 121..130 (last four digits of the id), ordered by key.
output = joined.filter(lambda x: 121 <= int(x[0][0][-4:]) <= 130).sortByKey()


In [18]:
def print_output(x):
    """
    Append the report of one application to answers.txt.

    Parameters:
    x (Tuple): one element of the `output` RDD, shaped
        ((application id, user, number of attempts),
         [(attempt number, start time, end time, final state, [(container number, host), ...]), ...])
    """
    (app_id, user, nb_attempts), attempts = x
    with open("answers.txt", "a") as f:
        f.write('ApplicationId : ' + app_id + '\n\n')
        # str(): a user missing from the left outer join (None) is reported, not a TypeError.
        f.write('User : ' + str(user) + '\n\n')
        f.write('NumAttempts  : ' + str(nb_attempts) + '\n\n')
        # The attempt list comes from groupByKey in arbitrary order: report by attempt number.
        for attempt_no, start, end, state, containers in sorted(attempts, key=lambda a: a[0]):
            f.write('AttemptNumber : ' + str(attempt_no) + '\n\n')
            f.write('StartTime   : ' + start + '\n\n')
            f.write('EndTime   : ' + end + '\n\n')
            f.write('FinalStatus  : ' + state + '\n\n')
            # "[('1', 'host')]" -> "(1, host)"
            f.write('Containers  : ' + (str(containers)[1:-1]).replace('\'', '') + '\n\n')

In [19]:
# Bring the selected applications to the driver, then append each report to answers.txt.
text = output.collect()
for record in text:
    print_output(record)

# 4. Answering questions 

#### Which user has submitted the highest number of applications? How many?

In [20]:
# {user: number of distinct applications submitted}
users_app_frequency = (
    joined
    .map(lambda rec: (rec[0][1], rec[0][0]))
    .distinct()
    .countByKey()
)

In [21]:
# User with the most applications. The sort is stable, so on a tie the user seen first wins.
ranked_users = sorted(users_app_frequency, key=users_app_frequency.get, reverse=True)
top_user = ranked_users[0]
top_user_count = users_app_frequency[top_user]

In [22]:
# Write answer 1 to file. `with` closes the file even if a write fails; str() guards
# against a None user coming from the left outer join.
with open("answers.txt", "a") as f:
    f.write('1. ' + str(top_user) + ', ' + str(top_user_count) + '\n\n')

#### Which user has the highest number of unsuccessful attempts of applications? How many? We define an unsuccessful attempt to be one with a final status which is not FINISHING (i.e., KILLED or FAILED)


In [23]:
# (user, final state) for every attempt; rdd_join elements are
# (application id, ((attempt id, start, end, state, #attempts), user)).
temp = rdd_join.map(lambda rec: (rec[1][1], rec[1][0][3]))

In [24]:
# Unsuccessful attempts: final state other than FINISHING (i.e. KILLED or FAILED).
temp = temp.filter(lambda rec: not rec[1].startswith('FINISHING'))

In [25]:
# {user: number of unsuccessful attempts}
failed_users = temp.keys().countByValue()

In [26]:
# User with the most unsuccessful attempts (stable sort: the first seen wins ties).
ranked_failed_users = sorted(failed_users, key=failed_users.get, reverse=True)
top_user = ranked_failed_users[0]
top_user_count = failed_users[top_user]

In [27]:
# Write answer 2 to file. `with` closes the file even if a write fails; str() guards
# against a None user coming from the left outer join.
with open("answers.txt", "a") as f:
    f.write('2. ' + str(top_user) + ', ' + str(top_user_count) + '\n\n')

#### List the number of applications that started on the same date for each date on which at least one application started. We define application start time as the start time of its first attempt.

In [28]:
# Start date (YYYY-MM-DD) of every application, defined as the start of its first attempt.
# The attempt list comes from groupByKey in no particular order, so pick the attempt with
# the lowest attempt number rather than x[1][0].
# countByKey then gives {date: number of applications started that day}.
dates_freq = (
    joined
    .map(lambda x: (sorted(x[1], key=lambda a: a[0])[0][1].split()[0], x[0][0]))
    .countByKey()
)

In [29]:
# (date, number of applications) pairs, busiest date first (ties keep their original order).
dates_freq_sorted = sorted(dates_freq.items(), key=lambda item: -item[1])



In [30]:
# Display the (date, number of applications) pairs, busiest date first
dates_freq_sorted

[('2020-02-11', 61),
 ('2020-02-13', 29),
 ('2020-02-12', 22),
 ('2020-02-10', 14),
 ('2020-02-16', 4),
 ('2020-02-04', 1)]

In [31]:
# Write answer 3 to file as "3. date: count, date: count, ...".
# `with` closes the file even if a write fails.
with open("answers.txt", "a") as f:
    f.write('3. ')
    answer = ', '.join(day + ': ' + str(count) for day, count in dates_freq_sorted)
    f.write(answer + '\n\n')

#### What is the mean application duration (from starting the first attempt till the end of the last attempt) in ms (rounded to an integer value)?

In [32]:
# (application id, attempts in attempt-number order); attempt tuples sort on their first field.
temp = joined.map(lambda rec: (rec[0][0], rec[1])).mapValues(sorted)

In [33]:
# (application id, start of its first attempt, end of its last attempt).
# x[0] already is the application id string here; x[0][0] would be only its first character.
temp = temp.map(lambda x: (x[0], x[1][0][1], x[1][-1][2]))


In [34]:
from datetime import datetime
from datetime import timedelta

# Application duration in ms: end of the last attempt minus start of the first attempt.
TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"
temp2 = temp.map(
    lambda rec: (datetime.strptime(rec[2], TS_FORMAT)
                 - datetime.strptime(rec[1], TS_FORMAT)).total_seconds() * 1000
)

In [35]:
# Write answer 4 to file: the mean application duration in ms, rounded to the nearest integer.
# int() alone truncates (1234.9 -> 1234). builtins.round is named explicitly because a
# `from pyspark.sql.functions import *` brings in a Spark `round` that shadows the builtin.
import builtins
mean_app_duration_ms = int(builtins.round(temp2.mean()))
with open("answers.txt", "a") as f:
    f.write('4. ')
    f.write(str(mean_app_duration_ms) + '\n\n')


#### What is the mean duration of application attempts that completed successfully in ms (rounded to an integer value)? We define an appattempt to be successful if its final state (defined as earlier) is FINISHING.

In [36]:
# Mean duration (ms) of successful attempts, i.e. attempts whose final state is FINISHING.
# Every attempt is considered on its own (flatMap over each application's attempt list),
# not just the last attempt of each application.
import builtins


def _attempt_duration_ms(attempt):
    """Duration in ms of an (attempt number, start, end, state, containers) record."""
    fmt = "%Y-%m-%d %H:%M:%S,%f"
    start = datetime.strptime(attempt[1], fmt)
    end = datetime.strptime(attempt[2], fmt)
    return (end - start).total_seconds() * 1000


# all attempts of all applications, keeping only the successful ones
successful_attempts = (
    joined
    .flatMap(lambda x: x[1])
    .filter(lambda a: a[3].startswith('FINISHING'))
)

# duration of every successful attempt, in ms
temp2 = successful_attempts.map(_attempt_duration_ms)

# Write answer to file: the mean rounded to the nearest integer (int() alone would truncate;
# builtins.round avoids a Spark `round` brought in by `from pyspark.sql.functions import *`).
with open("answers.txt", "a") as f:
    f.write('5. ')
    f.write(str(int(builtins.round(temp2.mean()))) + '\n\n')

#### How many different machines have hosted containers? What are their hostnames, sorted in lexicographic order?

In [37]:
# Distinct host names that ran at least one container (host is the last field).
machines = containers.map(lambda rec: rec[-1]).distinct()

In [38]:
# Host names collected on the driver and sorted lexicographically
machines = machines.collect()
machines.sort()

In [39]:
# total number of distinct machines that hosted containers
nr = len(machines)

In [40]:
# Write answer 6 to file as "6. <count>, host1, host2, ...".
# `with` closes the file even if a write fails.
with open("answers.txt", "a") as f:
    f.write('6. ' + str(nr) + ', ')
    f.write(', '.join(machines) + '\n\n')


#### Which machine hosted the maximum number of applications? How many? We consider a machine to have hosted an application if it launched at least one container in any of the attempts of that application.

In [41]:
# {host: number of distinct applications with at least one container on that host}
machines_with_apps_count = (
    containers
    .map(lambda rec: (rec[3], rec[0]))
    .distinct()
    .countByKey()
)

In [42]:
# Machine that hosted the most applications (stable sort: the first seen wins ties).
ranked_machines = sorted(machines_with_apps_count, key=machines_with_apps_count.get, reverse=True)
top_machine = ranked_machines[0]
top_machine_count = machines_with_apps_count[top_machine]

In [43]:
# Write answer 7 to file. `with` closes the file even if a write fails.
with open("answers.txt", "a") as f:
    f.write('7. ' + str(top_machine) + ', ' + str(top_machine_count) + '\n\n')