# YARN / MapReduce - Usage Analysis

In [None]:
from __future__ import print_function
import json
import pprint
import pandas as pd
import numpy as np
from pandas.io.json import json_normalize
import matplotlib.pyplot as plt
import requests
from requests_kerberos import HTTPKerberosAuth
from datetime import datetime
plt.style.use('seaborn-deep')
%matplotlib inline

## YARN

### Fetching YARN statistics

Connect to an edge node to fetch the statistics from the YARN timeline server

In [None]:
# Edge node reached over ssh, and the host serving the YARN timeline REST API.
host = 'SOMESERVER'
yarn_timeline = 'SOMESERVER'
# Obtain a Kerberos ticket on the edge node from the keytab.
!ssh -q {host} "kinit -kt /path/to/keytab <principalname>"
# Query the application-history endpoint with SPNEGO (--negotiate) and store
# the JSON response in the file 'yarn-app-history' on the edge node.
!ssh -q {host} "curl -q --negotiate -u : -L -H 'Accept: application/json' -X GET  http://{yarn_timeline}:8188/ws/v1/applicationhistory/apps > yarn-app-history"
# Copy the dump from the edge node into the local data/ directory.
!scp -q {host}:yarn-app-history data/yarn-app-history.json

In [None]:
# Load the application-history dump and flatten the records stored under the
# 'app' key into a DataFrame. The context manager closes the file handle as
# soon as the JSON has been parsed.
with open('data/yarn-app-history.json') as history_file:
    yarn_data = json_normalize(json.load(history_file)['app'])
# submittedTime is divided by 1000 before conversion (i.e. it is treated as
# epoch milliseconds); keep only the UTC calendar day for per-day grouping.
yarn_data['submittedTimeAsDay'] = yarn_data['submittedTime'].apply(
    lambda x: datetime.utcfromtimestamp(x/1000).date())
yarn_data.head(1)

In [None]:
# We should be able to do the same directly with requests/kerberos if direct connection is possible. 
#yarn_stats_uri = 'http://someserver:8188/ws/v1/applicationhistory/apps'
#response = requests.get(yarn_stats_uri, auth=HTTPKerberosAuth())
#print(response.text)
#if response.status_code != 200:
#    raise RuntimeError('Error fetching yarn statistics')
#yarn_source_apps = response.json()['app']
#print('%d YARN applications retrieved' % len(yarn_source_apps))
#yarn_data = json_normalize(yarn_source_apps)
#yarn_data.head()

### YARN Analysis

In [None]:
# Horizontal bars: number of recorded applications for each YARN type.
ax = (
    yarn_data.groupby('type').size()
    .plot.barh(title='Repartition of jobs by type', figsize=(15, 5))
)
ax.set_ylabel('')
ax.set_xlabel('Number of jobs')

In [None]:
# Stacked bars: one bar per submission day, one segment per application type.
ax = (
    yarn_data.groupby(['submittedTimeAsDay', 'type']).size()
    .unstack('type')
    .plot.bar(stacked=True, title='Repartition of jobs by type', figsize=(15, 5))
)
ax.set_xlabel('')
ax.set_ylabel('Number of jobs')

In [None]:
# Stacked horizontal bars: one bar per user, one segment per application type.
ax = (
    yarn_data.groupby(['user', 'type']).size()
    .unstack()
    .plot.barh(stacked=True, title='Repartition of jobs by type and user', figsize=(15, 5))
)
ax.set_ylabel('')
ax.set_xlabel('Number of jobs')

In [None]:
# Horizontal bars of the job count per user, sorted in ascending order
# (sort_values() sorts ascending by default).
ax = (
    yarn_data.groupby('user').size()
    .sort_values()
    .plot.barh(figsize=(15, 5), title='Total number of jobs by user')
)
ax.set_xlabel('Number of jobs')
ax.set_ylabel('')

In [None]:
# Stacked bars: one bar per submission day, one segment per user.
ax = (
    yarn_data.groupby(['submittedTimeAsDay', 'user']).size()
    .unstack()
    .plot.bar(stacked=True, title='Repartition of jobs by user', figsize=(15, 5))
)
ax.set_ylabel('Number of jobs')
ax.set_xlabel('')

In [None]:
# Stacked bars: one bar per submission day, one segment per finalAppStatus value.
ax = (
    yarn_data.groupby(['submittedTimeAsDay', 'finalAppStatus']).size()
    .unstack()
    .plot.bar(stacked=True, title='Repartition of jobs by status', figsize=(15, 5))
)
ax.set_ylabel('Number of jobs')
ax.set_xlabel('')

In [None]:
# Keep MAPREDUCE applications only, dropping rows whose elapsedTime is the -1 placeholder.
mapreduce_stats = yarn_data[
    yarn_data['type'].eq('MAPREDUCE') & yarn_data['elapsedTime'].ne(-1)
]

In [None]:
# Per-user min / mean / max of elapsedTime for MapReduce jobs.
# A single groupby with a list of aggregations yields the same
# 'min', 'mean', 'max' columns (indexed by user) as concatenating three
# separate groupby results, without grouping the data three times.
mr_elapsedTime = mapreduce_stats.groupby('user')['elapsedTime'].agg(['min', 'mean', 'max'])

In [None]:
# Average MapReduce job duration per user, in minutes.
# The YARN REST API reports elapsedTime in milliseconds, so the conversion
# to minutes is a division by 60 * 1000 (dividing by 60 alone would plot
# values 1000x too large for the 'Time in minutes' axis).
ax = (mr_elapsedTime['mean'] / 60000.)\
    .plot(kind='barh', figsize=(15,5), title='Average elapsed time for MR jobs')
ax.set_xlabel('Time in minutes')
ax.set_ylabel('')

In [None]:
# Keep TEZ applications only, dropping rows whose elapsedTime is the -1 placeholder.
tez_stats = yarn_data[(yarn_data['type'] == 'TEZ') & (yarn_data['elapsedTime'] != -1)]
# Per-user min / mean / max of elapsedTime for Tez jobs. One groupby with a
# list of aggregations produces the same 'min', 'mean', 'max' columns as
# concatenating three separate groupby results.
tez_elapsedTime = tez_stats.groupby('user')['elapsedTime'].agg(['min', 'mean', 'max'])

In [None]:
# Average Tez job duration per user, in minutes.
# The YARN REST API reports elapsedTime in milliseconds, so the conversion
# to minutes is a division by 60 * 1000 (dividing by 60 alone would plot
# values 1000x too large for the 'Time in minutes' axis).
ax = (tez_elapsedTime['mean'] / 60000.)\
    .plot(kind='barh', figsize=(15,5), title='Average elapsed time for Tez jobs')
ax.set_xlabel('Time in minutes')
ax.set_ylabel('')

## MapReduce statistics

### Fetching MapReduce statistics

In [None]:
# Fetch the job list from the MapReduce history REST endpoint.
mapreduce_uri = 'http://SOMESERVER:19888/ws/v1/history/mapreduce/jobs'
# requests never times out by default; bound the wait (in seconds) so an
# unresponsive server raises an error instead of hanging the notebook cell.
response = requests.get(mapreduce_uri, timeout=60)
if response.status_code != 200:
    raise RuntimeError('Error fetching mapreduce statistics')
# The job records are nested under 'jobs' -> 'job' in the JSON payload;
# flatten them into one DataFrame row per job.
mapreduce_source_jobs = response.json()['jobs']['job']
mapreduce_data = json_normalize(mapreduce_source_jobs)

In [None]:
# Preview the first two rows of the flattened job records.
mapreduce_data.head(2)

In [None]:
# Display pandas' descriptive statistics for the job table.
mapreduce_data.describe()

In [None]:
# Per-user min / mean / max of the mapsTotal column.
# A single groupby with a list of aggregations yields the same
# 'min', 'mean', 'max' columns (indexed by user) as concatenating three
# separate groupby results, without grouping the data three times.
mapsTotal = mapreduce_data.groupby('user')['mapsTotal'].agg(['min', 'mean', 'max'])

In [None]:
# Grouped horizontal bars: min / mean / max number of map tasks for each user.
ax = mapsTotal.plot.barh(
    figsize=(15, 5),
    title='Overview of num maps per user',
)
ax.set_ylabel('')
ax.set_xlabel('Number of map tasks')