README
asg6-bpyle

Assumptions:
    For part 2, I assumed that any job event with a timestamp of 0 (or the maximum 64-bit timestamp) can be ignored.
    Any event with an eventType of 1 (SCHEDULE) can also be ignored.
    Any job that has a start event with no matching end event, or vice versa, is also ignored.
    
    For part 4, I calculated the "total CPU usage" for a job by adding together (exec_time * avg_usage) for all
    tasks of a given job. I then divided this sum by the total time of the job (sum / total_time). I did this
    because you said the units were CPU-core-seconds PER SECOND.
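As a worked illustration of this calculation, here is a minimal sketch. The function name `cpu_usage_per_second` and the sample records are hypothetical, and it assumes each record is a (start, end, mean CPU rate) triple from task_usage columns 1, 2, and 6, with the job's total time supplied separately.

```python
def cpu_usage_per_second(records, total_time):
    """Hypothetical helper: sum(exec_time * avg_usage) over a job's records / total_time.

    records: iterable of (start, end, mean_cpu_rate) tuples (assumed layout).
    total_time: the job's total time, in the same units as start/end.
    """
    core_time = sum((end - start) * rate for start, end, rate in records)
    return core_time / total_time

# Two hypothetical 300-second records: 0.5 cores, then 1.5 cores.
records = [(0, 300, 0.5), (300, 600, 1.5)]
print(cpu_usage_per_second(records, total_time=600))  # 1.0
```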
    

Contributorship:
    I worked on this assignment alone, using only course materials. I posted a discussion
    and consulted with other students about the results of the MapReduce jobs (none of the students I
    talked to got the same answers as I did, but I believe their answers are wrong).

In [None]:
!cypress-kinit

In [None]:
!klist

### Part 1
Create the two directories named job_events and task_usage under 
your home directory on HDFS and then store the decompressed 
job_events and task_usage files into these respective directories (5 points). 

In [None]:
!cp -r /scratch3/lngo/gtrace/job_events/ /scratch3/bpyle/gtrace/; 
!cp -r /scratch3/lngo/gtrace/task_usage/ /scratch3/bpyle/gtrace/;

In [None]:
!gunzip /scratch3/bpyle/gtrace/job_events/*.csv.gz;
!gunzip /scratch3/bpyle/gtrace/task_usage/*.csv.gz;

In [None]:
!hdfs dfs -mkdir job_events/; hdfs dfs -put /scratch3/bpyle/gtrace/job_events/*.csv ./job_events/;
!hdfs dfs -mkdir task_usage/; hdfs dfs -put /scratch3/bpyle/gtrace/task_usage/*.csv ./task_usage/
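If `gunzip` is not available, the decompression step can also be done from Python. This is a minimal sketch using only the standard library; the helper name `decompress_all` is hypothetical, and unlike `gunzip` it keeps the original `.gz` files next to the decompressed copies.

```python
import gzip
import shutil
from pathlib import Path

def decompress_all(directory):
    """Hypothetical helper: decompress every *.csv.gz in `directory`.

    Each file is written next to the original with the .gz suffix removed.
    Returns the list of decompressed paths; the .gz files are kept.
    """
    outputs = []
    for gz_path in sorted(Path(directory).glob("*.csv.gz")):
        out_path = gz_path.with_suffix("")  # part-00000-of-00500.csv.gz -> .csv
        with gzip.open(gz_path, "rb") as src, open(out_path, "wb") as dst:
            shutil.copyfileobj(src, dst)  # stream, so large files are not loaded into memory
        outputs.append(out_path)
    return outputs

# Example: decompress_all("/scratch3/bpyle/gtrace/job_events")
```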

### Part 2
Calculate the total run time from SUBMIT to either EVICT, FAIL, 
FINISH, KILL, or LOST for each job (job_events) and return only 
the top 20 longest run time jobs. The printout should use a tab-separated 
format that includes jobID, the total run time, and the job's final status 
(EVICT, FAIL, FINISH, KILL, or LOST). Run this job inside your notebook 
and print out the result of this job (5 points).  

Job Events Table
1. timestamp
2. missing info
3. jobID
4. event type
5. user name
6. scheduling class
7. job name
8. logical job name

SUBMIT(0)

EVICT(2)
FAIL(3)
FINISH(4)
KILL(5)
LOST(6)
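The event codes above can be captured in a lookup table. In this small sketch the names `EVENT_NAMES`, `END_EVENTS`, and `is_end_event` are hypothetical; SCHEDULE(1) is included for completeness even though the mapper below drops it.

```python
# Job event type codes (SCHEDULE(1) included for completeness).
EVENT_NAMES = {0: "SUBMIT", 1: "SCHEDULE", 2: "EVICT", 3: "FAIL",
               4: "FINISH", 5: "KILL", 6: "LOST"}

# Hypothetical set of event types that end a run for Part 2.
END_EVENTS = {2, 3, 4, 5, 6}

def is_end_event(event_type):
    """Hypothetical helper: True if the code is EVICT, FAIL, FINISH, KILL, or LOST."""
    return event_type in END_EVENTS

print(EVENT_NAMES[4], is_end_event(4))  # FINISH True
print(EVENT_NAMES[1], is_end_event(1))  # SCHEDULE False
```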

In [54]:
%%writefile jobEventsMapper.py
#!/usr/bin/env python
import sys

MAXTIMESTAMP = pow(2, 63) - 1

for line in sys.stdin:
    line = line.strip()
    line = line.split(",")
    try:
        ts = int(line[0])
        jobID = int(line[2])
        eventType = int(line[3])
    except ValueError:
        continue
        
    if ts != 0 and ts != MAXTIMESTAMP and eventType != 1:
        print('%d\t%d\t%d' % (jobID, ts, eventType))
    


Overwriting jobEventsMapper.py
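To test the mapper's filtering without Hadoop, its per-line logic can be factored into a pure function. This is a minimal sketch; `parse_job_event` is a hypothetical name, and it applies the same rules as `jobEventsMapper.py` (skip timestamp 0, the maximum 64-bit timestamp, and SCHEDULE events). It additionally catches `IndexError`, so short or blank lines are skipped instead of crashing the mapper.

```python
MAXTIMESTAMP = 2**63 - 1

def parse_job_event(line):
    """Hypothetical helper: return (jobID, ts, eventType) for a usable line, else None."""
    fields = line.strip().split(",")
    try:
        ts, job_id, event_type = int(fields[0]), int(fields[2]), int(fields[3])
    except (ValueError, IndexError):
        return None  # header, blank, or malformed line
    if ts in (0, MAXTIMESTAMP) or event_type == 1:
        return None  # same filters as jobEventsMapper.py
    return job_id, ts, event_type

# Hypothetical sample line in job_events column order.
print(parse_job_event("100,,42,0,user,1,job,logical"))  # (42, 100, 0)
```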


In [55]:
%%writefile jobEventsReducer.py
#!/usr/bin/env python
import sys

lastID = None
lastType = None
startTime = None
elapsedTime = -1

#these are all counters for debugging
total = 0
newStartEvents = 0
startEvents = 0
multiStartEvents = 0
newEndEvents = 0
endEvents = 0
endEventsFirst = 0
successes = 0
noStart = 0

for line in sys.stdin:
    line = line.strip()
    line = line.split("\t")
    try:
        jobID = int(line[0])
        ts = int(line[1])
        eventType = int(line[2])
    except ValueError:
        continue
        
    total += 1
    
    if eventType == 0:
        startEvents += 1
    if eventType != 0:
        endEvents += 1
    
    if jobID != lastID:
        startTime = None
        lastID = jobID
        if eventType == 0: # if start event
            newStartEvents += 1
            startTime = ts
        else: #first event is end DO NOT USE
            endEventsFirst += 1
            
    else: #if on same ID
        if eventType != 0: # end event
            newEndEvents += 1
            if startTime is not None:
                elapsedTime = ts - startTime
                print("%d\t%d\t%d\n" % (elapsedTime, jobID, eventType))
                successes += 1
                startTime = None
            else:
                noStart += 1
        else: #start event in same ID
            multiStartEvents += 1
            startTime = ts
    
# Uncomment for debugging
# print("New Start Events: %d\n" % (newStartEvents))
# print("Start Events: %d\n" %(startEvents))
# print("Multi Start Events: %d\n" % (multiStartEvents))
# print("No Start: %d\n" % (noStart))
# print("New End Events: %d\n" % (newEndEvents))
# print("End Events First: %d\n" % (endEventsFirst))
# print("End Events: %d\n" % (endEvents))
# print("Total: %d\n" %(total))
# print("Successes: %d\n" %(successes))

Overwriting jobEventsReducer.py
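The reducer above assumes each job's events arrive in timestamp order. The local `sort` only approximates that (it compares whole lines as text), and by default Hadoop streaming sorts only on the key (here, the jobID), so the order of values within one job is not guaranteed. One way to remove that assumption is to buffer each job's events and sort them by timestamp before pairing SUBMIT with the next end event. This is a sketch, not the code that produced the results below; `run_times` is a hypothetical name, and it follows the same pairing rules as `jobEventsReducer.py`.

```python
from itertools import groupby

def run_times(records):
    """Hypothetical helper: yield (elapsed, jobID, endType) from (jobID, ts, eventType).

    Records must be grouped by jobID, as a reducer receives them; within a job
    they may be in any order, because each job's events are sorted here.
    """
    for job_id, group in groupby(records, key=lambda r: r[0]):
        start = None
        # Sort by timestamp; on ties, SUBMIT (0) comes before end events.
        for _, ts, event_type in sorted(group, key=lambda r: (r[1], r[2])):
            if event_type == 0:        # SUBMIT: (re)start the clock
                start = ts
            elif start is not None:    # first end event after a SUBMIT
                yield ts - start, job_id, event_type
                start = None           # end events with no SUBMIT are ignored

events = [(7, 500, 4), (7, 100, 0), (9, 50, 0), (9, 80, 5)]
print(list(run_times(events)))  # [(400, 7, 4), (30, 9, 5)]
```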


In [60]:
%%writefile jobEventsSorter.py
#!/usr/bin/env python

import sys
import bisect

#SUBMIT(0) SCHEDULE(1) EVICT(2) FAIL(3) FINISH(4) KILL(5) LOST(6)
EVENT_NAMES = {0: "SUBMIT", 1: "SCHEDULE", 2: "EVICT", 3: "FAIL",
               4: "FINISH", 5: "KILL", 6: "LOST"}

def getEventString(eventInt):
    # Unknown codes return None, as with an if/elif chain that has no else
    return EVENT_NAMES.get(eventInt)

items = []

for line in sys.stdin:
        
    line = line.strip().split("\t")
    
    try: 
        time = int(line[0])
        jobID = int(line[1])
        eventType = int(line[2])
    except ValueError:
        continue
    
    bisect.insort(items, (time, jobID, eventType))
    if len(items) > 20:
        items.pop(0)  # drop the shortest run time so only the top 20 remain

print("Top 20 Jobs\n[ID]\t[Time]\t[Type]\n")
for item in reversed(items):
    print("%s\t%s\t%s\n" % (item[1], item[0], getEventString(item[2])))

Overwriting jobEventsSorter.py
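Keeping a sorted list with `bisect.insort` and dropping its smallest element works, but the standard library's `heapq.nlargest` expresses the same top-N selection directly. This is a sketch; `parse_reducer_lines` and `top_n` are hypothetical helpers, and they assume the reducer's `elapsed\tjobID\teventType` line format.

```python
import heapq

def parse_reducer_lines(lines):
    """Hypothetical helper: yield (elapsed, jobID, eventType), skipping blank/malformed lines."""
    for line in lines:
        fields = line.strip().split("\t")
        try:
            record = (int(fields[0]), int(fields[1]), int(fields[2]))
        except (ValueError, IndexError):
            continue
        yield record

def top_n(lines, n=20):
    """Hypothetical helper: the n longest run times, longest first."""
    return heapq.nlargest(n, parse_reducer_lines(lines))

sample = ["30\t9\t5\n", "\n", "400\t7\t4\n", "120\t3\t2\n"]
print(top_n(sample, n=2))  # [(400, 7, 4), (120, 3, 2)]
```

`heapq.nlargest` keeps at most `n` items in a heap while streaming through the input, so memory stays bounded by `n` rather than by the size of the reducer output.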


In [None]:
!hdfs dfs -cat job_events/part-*-of-00500.csv \
    2>/dev/null \
    | python jobEventsMapper.py \
    | sort \
    | python jobEventsReducer.py \
    | sort
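The cell above approximates Hadoop's shuffle with `sort`, which orders whole lines as text. For quick tests inside the notebook, a small in-process harness is another option. In this sketch `run_local`, `wc_map`, and `wc_reduce` are hypothetical names: `mapper` takes one input line and returns output lines, and `reducer` receives all mapped lines sorted by key (the text before the first tab). Sorting on the key alone is closer to Hadoop streaming's default, so it can expose reducers that depend on value order.

```python
from itertools import groupby

def run_local(mapper, reducer, lines):
    """Hypothetical harness: map -> sort by key -> reduce for line-based scripts."""
    mapped = [out for line in lines for out in mapper(line)]
    # Sort by the key field only, compared as text; Python's sort is stable,
    # so values keep their map-output order within a key.
    mapped.sort(key=lambda s: s.split("\t", 1)[0])
    return reducer(mapped)

# Toy word count (hypothetical) to show the wiring.
def wc_map(line):
    return ["%s\t1" % word for word in line.split()]

def wc_reduce(sorted_lines):
    pairs = (s.split("\t") for s in sorted_lines)
    return ["%s\t%d" % (key, sum(int(v) for _, v in group))
            for key, group in groupby(pairs, key=lambda kv: kv[0])]

print(run_local(wc_map, wc_reduce, ["a b a", "b c"]))  # ['a\t2', 'b\t2', 'c\t1']
```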

In [58]:
!hdfs dfs -rm -R job_events/output-top-20-times
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input job_events/part-*-of-00500.csv \
    -output job_events/output-top-20-times \
    -file ./jobEventsMapper.py \
    -mapper jobEventsMapper.py \
    -file ./jobEventsReducer.py \
    -reducer jobEventsReducer.py

rm: `job_events/output-top-20-times': No such file or directory
17/10/17 20:15:24 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./jobEventsMapper.py, ./jobEventsReducer.py] [/usr/hdp/2.6.0.3-8/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.0.3-8.jar] /hadoop_java_io_tmpdir/streamjob5712733909097439707.jar tmpDir=null
17/10/17 20:15:26 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/17 20:15:26 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/17 20:15:27 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 14792 for bpyle on ha-hdfs:dsci
17/10/17 20:15:27 INFO security.TokenCache: Got dt for hdfs://dsci; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:dsci, Ident: (HDFS_DELEGATION_TOKEN token 14792 for bpyle)
17/10/17 20:15:27 INFO lzo.GPLNativeCodeLoader: Loaded native gpl librar

In [61]:
!hdfs dfs -cat job_events/output-top-20-times/part-00000 | python jobEventsSorter.py

Top 20 Jobs
[ID]	[Time]	[Type]

6272373604	2219071470839	FAIL

6262216777	2179583847857	KILL

6254863670	2075738599153	KILL

6254863987	2075736980619	KILL

6256219395	2057085980979	KILL

6273334224	1997920913533	KILL

6273473735	1915434479654	KILL

6283813709	1884643427720	KILL

6275743443	1881727884505	KILL

6276433166	1875768873954	KILL

6276523719	1874887733830	KILL

6276685724	1873363338733	KILL

6283245463	1843085463781	FAIL

6271227810	1842057638079	KILL

6288829356	1810226800987	KILL

6288842611	1809984463546	KILL

6295425701	1796937871697	KILL

6256516525	1787831764245	KILL

6276799325	1782978547093	KILL

6288553115	1775916679684	KILL



### Part 3
Count the number of computing tasks for 
each unique job (task_usage) and sort 
the results using total number of computing 
tasks. Run this job inside your notebook
and print out the top 20 lines of the result (10 points). 

1. start time of the measurement period
2. end time of the measurement period
3. job ID
4. task index - within the job
5. machine ID
6. mean CPU usage rate
...

In [64]:
%%writefile numTasksMapper.py
#!/usr/bin/env python

import sys

# Track the highest task index seen for each job in this mapper's input
tasks = {}

for line in sys.stdin:
    line = line.strip().split(",")
    try:
        jobID = int(line[2])
        index = int(line[3])
    except (ValueError, IndexError):
        continue
    
    if jobID not in tasks:
        tasks[jobID] = index
    else:
        if index > tasks[jobID]:
            tasks[jobID] = index

for key, value in tasks.items():
    print("%d\t%d\n" % (key, value))


Overwriting numTasksMapper.py
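Counting tasks as `max index + 1` assumes task indices run contiguously from 0. As an alternative, this sketch counts distinct task indices instead; `count_tasks` and `max_index_plus_one` are hypothetical names, and the sample rows are made up. In a real MapReduce job, per-mapper sets would not be enough on their own: the reducer would have to merge them, for example by having mappers emit one jobID/index pair per distinct task rather than one number per job.

```python
from collections import defaultdict

def count_tasks(rows):
    """Hypothetical helper: number of distinct task indices per job."""
    indices = defaultdict(set)
    for job_id, index in rows:
        indices[job_id].add(index)  # task_usage has many rows per task
    return {job_id: len(seen) for job_id, seen in indices.items()}

def max_index_plus_one(rows):
    """Hypothetical helper mirroring the mapper/reducer above: highest index + 1."""
    best = {}
    for job_id, index in rows:
        best[job_id] = max(best.get(job_id, index), index)
    return {job_id: top + 1 for job_id, top in best.items()}

rows = [(1, 0), (1, 1), (1, 1), (2, 5)]
print(count_tasks(rows))         # {1: 2, 2: 1}
print(max_index_plus_one(rows))  # {1: 2, 2: 6}
```

The two agree whenever a job's indices are exactly 0..N-1; they diverge when some indices never appear in the data, as for the hypothetical job 2.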


In [77]:
%%writefile numTasksReducer.py
#!/usr/bin/env python

import sys

lastID = None
maxIndex = -1

for line in sys.stdin:
    line = line.strip().split("\t")
    try:
        jobID = int(line[0])
        index = int(line[1])
    except ValueError:
        continue
        
    if jobID == lastID:
        if index > maxIndex:
            maxIndex = index
    else:
        if lastID is not None:
            print("%d\t%d\n" %(maxIndex, lastID))
        lastID = jobID
        maxIndex = index

# Emit the final job (skipped if no valid input was read)
if lastID is not None:
    print("%d\t%d\n" %(maxIndex, lastID))

Overwriting numTasksReducer.py


In [1]:
%%writefile sortTasks.py
#!/usr/bin/env python

import sys
import bisect


items = []

for line in sys.stdin:
    line = line.strip().split("\t")
    
    try:
        index = int(line[0])
        jobID = int(line[1])
    except ValueError:
        continue
        
    bisect.insort(items, (index, jobID))
    if len(items) > 20:
        items.remove(items[0])
        
print("Top 20 Task Count\n[ID]\t[Count]\n")
for item in reversed(items):
    # Indices start at 0, so the task count is the maximum index + 1
    print("%s\t%s\n" % (item[1], item[0] + 1))


Overwriting sortTasks.py


In [None]:
!hdfs dfs -cat ./task_usage/part-00000-of-00500.csv \
    2>/dev/null \
    | head -n 20 \
    | python numTasksMapper.py \
    | sort \
    | python numTasksReducer.py \
    | sort

In [79]:
!hdfs dfs -rm -R task_usage/output-top-20-tasks
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input task_usage/part-*-of-00500.csv \
    -output task_usage/output-top-20-tasks \
    -file ./numTasksMapper.py \
    -mapper numTasksMapper.py \
    -file ./numTasksReducer.py \
    -reducer numTasksReducer.py

17/10/17 23:03:41 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/bpyle/task_usage/output-top-20-tasks' to trash at: hdfs://dsci/user/bpyle/.Trash/Current/user/bpyle/task_usage/output-top-20-tasks1508295821525
17/10/17 23:03:43 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./numTasksMapper.py, ./numTasksReducer.py] [/usr/hdp/2.6.0.3-8/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.0.3-8.jar] /hadoop_java_io_tmpdir/streamjob6713185405074756128.jar tmpDir=null
17/10/17 23:03:45 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/17 23:03:45 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/17 23:03:45 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 14841 for bpyle on ha-hdfs:dsci
17/10/17 23:03:46 INFO security.TokenCache: Got dt for hdfs://dsci; Kind: HDFS_DELEGATION_TOKEN,

17/10/17 23:04:54 INFO mapreduce.Job:  map 88% reduce 29%
17/10/17 23:05:19 INFO mapreduce.Job:  map 89% reduce 29%
17/10/17 23:05:22 INFO mapreduce.Job:  map 89% reduce 30%
17/10/17 23:05:45 INFO mapreduce.Job:  map 90% reduce 30%
17/10/17 23:06:10 INFO mapreduce.Job:  map 91% reduce 30%
17/10/17 23:06:36 INFO mapreduce.Job:  map 92% reduce 30%
17/10/17 23:06:37 INFO mapreduce.Job:  map 92% reduce 31%
17/10/17 23:07:01 INFO mapreduce.Job:  map 93% reduce 31%
17/10/17 23:07:20 INFO mapreduce.Job:  map 94% reduce 31%
17/10/17 23:07:32 INFO mapreduce.Job:  map 95% reduce 31%
17/10/17 23:07:34 INFO mapreduce.Job:  map 95% reduce 32%
17/10/17 23:07:45 INFO mapreduce.Job:  map 96% reduce 32%
17/10/17 23:07:59 INFO mapreduce.Job:  map 97% reduce 32%
17/10/17 23:08:10 INFO mapreduce.Job:  map 98% reduce 32%
17/10/17 23:08:11 INFO mapreduce.Job:  map 98% reduce 33%
17/10/17 23:08:12 INFO mapreduce.Job:  map 99% reduce 33%
17/10/17 23:08:15 INFO mapreduce.Job:  map 100% reduce 33%
17/10/17 23:0

In [2]:
!hdfs dfs -cat task_usage/output-top-20-tasks/part-00000 | python sortTasks.py

Top 20 Task Count
[ID]	[Count]

6324858588	758720

6221861800	618576

6330981257	369529

6484993276	288265

6484359944	271407

6336594489	239168

6337197528	229551

6331417954	211529

6483991792	210658

6405910961	172023

6484189122	169764

6337545460	160469

6483748490	157323

6337022710	147364

6258695096	144285

6419322512	134101

6467588751	131944

6280685099	113012

6405537326	108241

6468875195	100813



### Part 4
Calculate total CPU usage time (CPU-core seconds per second) 
for each job (task_usage) and sort the results using total CPU 
usage time from highest to lowest. The printout should use a 
tab-separated format that includes jobID and the total CPU usage time. 
Run this job inside your notebook and print out the top 20 lines of the result (10 points).  

1. start time of the measurement period
2. end time of the measurement period
3. job ID
4. task index
5. machine ID
6. mean CPU usage rate
...

In [72]:
%%writefile cpuUsageMapper.py
#!/usr/bin/env python

import sys

tasks = {}

for line in sys.stdin:
    line = line.strip().split(",")
    try:
        jobID = int(line[2])
        time = int(line[1]) - int(line[0])
        rate = float(line[5])
        usage = float(time) * rate
    except ValueError:
        continue
    
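    if jobID not in tasks:
        tasks[jobID] = (usage, time)
    else:
        # Accumulate usage; the stored time stays the duration of the job's
        # first measurement period seen by this mapper
        newtuple = (tasks[jobID][0] + usage, tasks[jobID][1])
        tasks[jobID] = newtuple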
        
for key, value in tasks.items():
    print("%d\t%f\t%d\n" % (key, value[0], value[1]))
    

Overwriting cpuUsageMapper.py


In [73]:
%%writefile cpuUsageReducer.py
#!/usr/bin/env python

import sys

lastID = None
totalTime = 0.0
totalUsage = 0.0

for line in sys.stdin:
    line = line.strip().split("\t")
    try:
        jobID = int(line[0])
        usage = float(line[1])
        time = float(line[2])
    except ValueError:
        continue
    
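    if lastID == jobID:
        totalTime += time
        totalUsage += usage
    else:
        if lastID is not None and totalTime > 0:
            print("%d\t%f\n" % (lastID, totalUsage/totalTime))
        totalTime = time
        totalUsage = usage
        lastID = jobID

# Emit the last job; otherwise the final key in each reducer is dropped
if lastID is not None and totalTime > 0:
    print("%d\t%f\n" % (lastID, totalUsage/totalTime))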

Overwriting cpuUsageReducer.py
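The Assumptions section describes dividing by the total time of the job. One way to measure that time is the span from the job's earliest measurement start to its latest measurement end. The sketch below is a hypothetical alternative, not what `cpuUsageMapper.py` currently computes: mappers would emit per-job partial results of (core_time, min_start, max_end), and a reducer would merge them with the hypothetical `merge_partials`.

```python
def merge_partials(partials):
    """Hypothetical helper: merge (core_time, min_start, max_end) partials for one job.

    Returns core_time / (max_end - min_start), i.e. CPU-core-seconds per second
    over the job's observed span, or None if there is no positive span.
    """
    core_time, start, end = 0.0, None, None
    for usage, lo, hi in partials:
        core_time += usage
        start = lo if start is None else min(start, lo)
        end = hi if end is None else max(end, hi)
    if start is None or end <= start:
        return None
    return core_time / (end - start)

# Two mappers saw the same job: spans [0, 300] and [300, 900].
print(merge_partials([(150.0, 0, 300), (450.0, 300, 900)]))  # 0.6666666666666666
```

Because min, max, and sum are all associative, the same merge works whether partials come from one mapper or many.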


In [74]:
%%writefile cpuUsageSorter.py
#!/usr/bin/env python

import sys
import bisect

jobs = []

for line in sys.stdin:
    line = line.strip().split("\t")
    try:
        jobID = int(line[0])
        cpuUsage = float(line[1])
    except ValueError:
        continue
        
    bisect.insort(jobs, (cpuUsage, jobID))
    if len(jobs) > 20:
        jobs.remove(jobs[0])
    
print("Top 20 CPU Usage\n[ID]\t[CPU_Usage]")
for job in reversed(jobs):
    print("%d\t%f\n" % (job[1], job[0]))

Overwriting cpuUsageSorter.py


In [None]:
!hdfs dfs -cat ./task_usage/part-00000-of-00500.csv \
    2>/dev/null \
    | head -n 20 \
    | python cpuUsageMapper.py \
    | sort \
    | python cpuUsageReducer.py \
    | sort

In [75]:
!hdfs dfs -rm -R task_usage/output-top-20-cpu
!yarn jar /usr/hdp/current/hadoop-mapreduce-client/hadoop-streaming.jar \
    -input task_usage/part-*-of-00500.csv \
    -output task_usage/output-top-20-cpu \
    -file ./cpuUsageMapper.py \
    -mapper cpuUsageMapper.py \
    -file ./cpuUsageReducer.py \
    -reducer cpuUsageReducer.py

17/10/17 20:31:35 INFO fs.TrashPolicyDefault: Moved: 'hdfs://dsci/user/bpyle/task_usage/output-top-20-cpu' to trash at: hdfs://dsci/user/bpyle/.Trash/Current/user/bpyle/task_usage/output-top-20-cpu
17/10/17 20:31:37 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./cpuUsageMapper.py, ./cpuUsageReducer.py] [/usr/hdp/2.6.0.3-8/hadoop-mapreduce/hadoop-streaming-2.7.3.2.6.0.3-8.jar] /hadoop_java_io_tmpdir/streamjob1341711588475691070.jar tmpDir=null
17/10/17 20:31:39 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/17 20:31:39 INFO client.AHSProxy: Connecting to Application History server at dscim003.palmetto.clemson.edu/10.125.8.215:10200
17/10/17 20:31:40 INFO hdfs.DFSClient: Created HDFS_DELEGATION_TOKEN token 14794 for bpyle on ha-hdfs:dsci
17/10/17 20:31:40 INFO security.TokenCache: Got dt for hdfs://dsci; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs

17/10/17 20:32:39 INFO mapreduce.Job:  map 72% reduce 23%
17/10/17 20:32:40 INFO mapreduce.Job:  map 74% reduce 23%
17/10/17 20:32:41 INFO mapreduce.Job:  map 76% reduce 25%
17/10/17 20:32:42 INFO mapreduce.Job:  map 79% reduce 25%
17/10/17 20:32:43 INFO mapreduce.Job:  map 80% reduce 25%
17/10/17 20:32:44 INFO mapreduce.Job:  map 81% reduce 26%
17/10/17 20:32:45 INFO mapreduce.Job:  map 82% reduce 26%
17/10/17 20:32:46 INFO mapreduce.Job:  map 83% reduce 26%
17/10/17 20:32:47 INFO mapreduce.Job:  map 83% reduce 28%
17/10/17 20:32:49 INFO mapreduce.Job:  map 84% reduce 28%
17/10/17 20:32:51 INFO mapreduce.Job:  map 85% reduce 28%
17/10/17 20:32:55 INFO mapreduce.Job:  map 86% reduce 28%
17/10/17 20:32:57 INFO mapreduce.Job:  map 86% reduce 29%
17/10/17 20:33:00 INFO mapreduce.Job:  map 87% reduce 29%
17/10/17 20:33:15 INFO mapreduce.Job:  map 88% reduce 29%
17/10/17 20:33:40 INFO mapreduce.Job:  map 89% reduce 29%
17/10/17 20:33:43 INFO mapreduce.Job:  map 89% reduce 30%
17/10/17 20:34

In [76]:
!hdfs dfs -cat ./task_usage/output-top-20-cpu/part-00000 | python cpuUsageSorter.py

Top 20 CPU Usage
[ID]	[CPU_Usage]
6283917433	46093.074042

6279559429	29227.922307

6436963420	28976.109380

6477911130	26732.013299

6440940956	22211.296463

6309295531	20152.227544

6415219880	18656.967729

6409272482	16615.456043

6301702571	15890.345033

6330480082	15227.012024

6304228795	14203.806631

6427605842	12768.415754

6455601542	12728.731480

6316799672	11650.149211

6419322512	11107.614565

6464342149	10489.754953

6282808418	10255.289219

6290003708	10219.845489

6288046928	8661.764749

6305645816	8023.026780

