## DS 5460 Big Data Scaling, SPRING 2024

##      Homework Assignment \#1: Introduction to MapReduce

Objectives:
- This assignment is designed to help students better understand the MapReduce Paradigm through hands-on practice.
- Students will become familiar (if not already) with Linux/Unix piping and sorting and the Jupyter magic commands `%%writefile` and `%%timeit`.

Instructions:
This assignment consists of 6 questions. Please be sure to read all text cell descriptions and comments closely to fill in your solution when expected. We will only grade code written in the designated spaces. If a question's instructions are unclear, please reach out for clarification on Piazza. We expect each student to write their own code independently. If GenAI tools are used, **where and how they were used must be disclosed properly in a separate text cell at the end of this notebook**.

__TIPS:__ 
1. If you're not familiar with Linux/Unix **piping** and **redirecting**, check out [this tutorial](https://ryanstutorials.net/linuxtutorial/piping.php) first. You will need to understand the differences to answer some of the later questions.
2. Make use of your peers and TAs by asking questions on Piazza. Everyone has different experiences and background so don't be shy; all questions are welcome!



# Notebook Set-Up
Before starting your homework, run the following cells to confirm your setup.

In [14]:
!mkdir -p /notebooks/hw1 

In [15]:
# TODO: update the working directory with the location of your homework notebook
%cd /notebooks/hw1 

/notebooks/hw1


In [16]:
!pwd

/notebooks/hw1


In [17]:
# confirm you are running Python 3
import sys
sys.version_info

sys.version_info(major=3, minor=7, micro=4, releaselevel='final', serial=0)

In [18]:
# imports
import re
import sys

# Question 1: Download the Data

In this assignment we'll continue to work with the free plain text version of _Alice's Adventures in Wonderland_ available from Project Gutenberg. __Use the first two cells below to download this text from http://www.gutenberg.org/files/11/11-0.txt and preview the first few lines.__ 

In [34]:
# TODO: write a Unix command to download the data to this local file /data/alice.txt (like we did in week2-demo)
# from http://www.gutenberg.org/files/11/11-0.txt 
!curl -L http://www.gutenberg.org/files/11/11-0.txt --output /data/alice.txt

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100  170k  100  170k    0     0   579k      0 --:--:-- --:--:-- --:--:--  579k


In [36]:
# TODO: write a Unix command to print the first 5 lines of your downloaded file
!head -n 5 /data/alice.txt

﻿The Project Gutenberg eBook of Alice’s Adventures in Wonderland, by Lewis Carroll

This eBook is for the use of anyone anywhere in the United States and
most other parts of the world at no cost and with almost no restrictions
whatsoever. You may copy it, give it away or re-use it under the terms


It's good to develop a habit of creating small files with simulated data for developing, debugging, and testing your code, particularly in this assignment. The Jupyter magic command `%%writefile` is a convenient way to do this. __Run the following cells to create a test data folder and file for use in our word counting task.__

In [22]:
%%writefile /data/hw1_test.txt
Good luck on the first homework assignment.
Hope you have fun while practicing your Linux/Unix skills

Writing /data/hw1_test.txt


In [24]:
# confirm the file was created in the data directory using a grep command:
!ls /data | grep test

alice_test.txt
hw1_test.txt


# Question 2: Word Count in Python

In this part of the homework, you will practice writing Python scripts that read from standard input, and using Unix piping and redirection to run these programs and save their output to a file. These skills are particularly useful when developing programs in the cloud. __In this question you will write a short Python script to perform the Word Count task and then run your script on the _Alice in Wonderland_ text__. You can think of this as a 'baseline' implementation that we'll later compare to the parallelized version of the same task.

### Q2 Tasks:

* __a) code:__ Complete the Python script in the file __`wordCount.py`__. Read all comments carefully to be sure you understand the expected behavior of this function. Please do not code outside of the marked location.


* __b) testing:__ Run the cell marked `part b` to call your script on the test file we created above (`/data/hw1_test.txt`). Confirm that your script returns the correct counts for each word by visually comparing the output to the test file.


* __c) results:__ When you are confident in your implementation, run the cell marked `part c` to count the number of occurrences of each word in _Alice's Adventures in Wonderland_. In the same cell we'll redirect the output to a file. Then use the provided `grep` commands to check your answers.


* __d) short response:__ Suppose you decide that you'd really like a word and its plural (e.g. 'hatter' and 'hatters' or 'person' and 'people') to be counted as the same word. After we have run the word count, would it be more efficient to post-process your output file, or to discard your output file and start the analysis over with a new tokenizer? Briefly explain your reasoning.

### Q2 Student Answers:
> __a-c)__ _Complete the coding portions of this question before answering 'd'._

> __d)__ TODO: Type your answer here!

In [None]:
# part a - DO YOUR WORK IN wordCount.py

In [28]:
!chmod a+x /wordCount.py

In [29]:
# part a - DO NOT MODIFY THIS CELL - run this cell after you 
# complete wordCount.py so the TA can view your work
!cat /wordCount.py

#!/usr/bin/env python
"""
This script reads lines from STDIN and returns a list of
all words and the count of how many times they occurred.

INPUT:
    a text file
OUTPUT FORMAT:
    word \t count
USAGE:
    python wordCount.py < yourTextFile.txt

Instructions:
    Fill in the missing code below so that the script
    prints tab (\t) separated word counts to Standard Output.
    NOTE: the tokenizing is already done for you, please do
    NOT modify the provided code or you risk breaking things.
"""

# imports
import sys
import re
from collections import defaultdict

counts = defaultdict(int)

# stream over lines from Standard Input
for line in sys.stdin:

    # tokenize
    line = line.strip()
    words = re.findall(r'[a-z]+', line.lower())

############ TODO: YOUR CODE HERE #########
    for word in words:
        counts[word] += 1

for word, count in counts.items():
    print(f'{word}\t{count}')


############ (END) YOUR CODE ##############


In [31]:
# part b - DO NOT MODIFY THIS CELL, just run it as is to test your script
!python /wordCount.py < /data/hw1_test.txt

good	1
luck	1
on	1
the	1
first	1
homework	1
assignment	1
hope	1
you	1
have	1
fun	1
while	1
practicing	1
your	1
linux	1
unix	1
skills	1


In [37]:
# part c - DO NOT MODIFY THIS CELL, just run it as is to perform the word count.
!python /wordCount.py < /data/alice.txt > /data/alice_counts.txt

Take a look at the first 10 words & their counts.

In [39]:
!head /data/alice_counts.txt

the	1839
project	88
gutenberg	98
ebook	13
of	638
alice	403
s	222
adventures	11
in	435
wonderland	7
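
The `s` row in this output comes from the tokenizer rather than from a counting bug: the pattern `[a-z]+` keeps only runs of lowercase letters, so an apostrophe splits a possessive such as "Alice's" into `alice` and `s`. A minimal sketch that reuses the regular expression from `wordCount.py` (the `tokenize` helper name and the sample strings are illustrative assumptions, not part of the provided script):

```python
import re

def tokenize(line):
    """Lowercase a line and return its runs of the letters a-z, as wordCount.py does."""
    return re.findall(r'[a-z]+', line.strip().lower())

# The apostrophe is not a letter, so it acts as a separator.
print(tokenize("Alice's Adventures in Wonderland"))
# ['alice', 's', 'adventures', 'in', 'wonderland']

# The slash splits tokens in the same way.
print(tokenize("Linux/Unix skills"))
# ['linux', 'unix', 'skills']
```

This is worth keeping in mind when reading the counts: any punctuation or digit inside a token breaks it into separate words.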


__Check your results:__ How many times does the word "alice" appear in the book? 

In [41]:
# EXPECTED OUTPUT: 403
!grep alice /data/alice_counts.txt

alice	403


__Check your results:__ How many times does the word "hatter" appear in the book? 

In [42]:
# EXPECTED OUTPUT: 56
!grep hatter /data/alice_counts.txt

hatter	56
hatters	1


__Check your results:__ How many times does the word "queen" appear in the book? 

In [43]:
# EXPECTED OUTPUT: 76
!grep queen /data/alice_counts.txt

queen	76
queens	1


# Question 3: Unix Sorting Practice
Sorting is a useful tool and an important process in MapReduce. Let's practice using Linux/Unix sort.

### Q3 Tasks:
* __0) code:__ Write a Unix command to check how many records are in your word count file.
* __a) code:__ Write a Unix command to sort your word count file alphabetically. Save (i.e. [redirect](https://superuser.com/questions/277324/pipes-vs-redirects)) the results to `data/alice_counts_A-Z.txt`. [*HINT: if Unix sort commands are new to you, start with [this biowize blogpost](https://biowize.wordpress.com/2015/03/13/unix-sort-sorting-with-both-numeric-and-non-numeric-keys/) or [this unixschool tutorial](http://www.theunixschool.com/2012/08/linux-sort-command-examples.html)*]

* __b) code:__ Write a Unix command to sort your word count file from highest to lowest count. Save (i.e. [redirect](https://superuser.com/questions/277324/pipes-vs-redirects)) your results to `data/alice_counts_sorted.txt`; then run the provided cell to print the top ten words. Compare your output to the expected output provided.
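
For readers who find Python sort keys easier to reason about than `sort` flags, the two orderings requested above can be sketched as follows. This only illustrates the sort keys and is not a substitute for the Unix commands the tasks ask for; the `lines` sample is a handful of rows in the same tab-separated shape as the word count file.

```python
# A few rows in the same "word<TAB>count" shape as alice_counts.txt.
lines = ["the\t1839", "project\t88", "of\t638", "alice\t403", "s\t222", "in\t435"]

records = []
for line in lines:
    word, count = line.split("\t")
    records.append((word, int(count)))  # convert so counts compare numerically

# Alphabetical by word (a sort on the first field).
by_word = sorted(records, key=lambda r: r[0])

# Highest count first; ties broken alphabetically by word.
by_count = sorted(records, key=lambda r: (-r[1], r[0]))

print(by_word[0])    # ('alice', 403)
print(by_count[:3])  # [('the', 1839), ('of', 638), ('in', 435)]
```

Converting the count to `int` matters: compared as strings, `'88'` would sort after `'1839'`, which is the same pitfall a numeric sort flag avoids on the command line.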

In [45]:
# part 0: write a command to check how many records are in your word count file (/data/alice_counts.txt)
!wc -l /data/alice_counts.txt

3006 /data/alice_counts.txt


In [51]:
# part a - unix command to sort your word counts alphabetically 
!cat /data/alice_counts.txt | sort -k1,1 > /data/alice_counts_A-Z.txt

In [52]:
# part a - DO NOT MODIFY THIS CELL, run it as is to confirm your sort worked
!head /data/alice_counts_A-Z.txt

a	695
abide	2
able	1
about	102
above	3
absence	1
absurd	2
accept	1
acceptance	1
accepted	2


In [53]:
# part b - unix command to sort your word counts from highest to lowest count
!cat /data/alice_counts.txt | sort -k2,2nr -k1,1 > /data/alice_counts_sorted.txt

In [55]:
# part b - DO NOT MODIFY THIS CELL, run it as is to confirm your sort worked
!head /data/alice_counts_sorted.txt  

the	1839
and	942
to	811
a	695
of	638
it	610
she	553
i	546
you	486
said	462


<table>
<th>expected output for (a):</th>
<th>expected output for (b):</th>
<tr><td><pre>
a	695
abide	2
able	1
about	102
above	3
absence	1
absurd	2
accept	1
acceptance	1
accepted	2
</pre></td>
<td><pre>
the	1839
and	942
to	811
a	695
of	638
it	610
she	553
i	546
you	486
said	462
</pre></td></tr>
</table>

# Question 4: Simplified Parallel Word Count 
Instead of running the script on the whole dataset at once, we could split our text up into smaller 'chunks' and process them independently of each other. __In this question you'll use a bash script to "parallelize" your Word Count.__


### Q4 Tasks:
* __a) run provided script:__ The bash script `parallel_wc_v1.sh` takes an input file, splits it into a specified number of 'chunks', and then applies a script of your choice to each chunk. Read through the comments (you are also welcome to read the contents) of this bash file to understand how to run this script - similar to reading any other documentation, pay close attention to the arguments expected. In part a), complete the command to use this script to apply `wordCount.py` to the _Alice_ text in **FOUR (4)** parallel processes. Redirect the results into a file called `alice_pCounts.txt`.


* __b) short response:__ Examine the output from part a) and explain if the output matched what you expected and why or why not.


* __c) run provided script:__ Another script `aggregateCounts.py` reads word counts from standard input and combines any duplicates it encounters. Read through this script to be sure you understand what it does. Then follow the instructions in `parallel_wc_v2.sh` to complete the command in part c) that accepts `aggregateCounts.py` as a 4th argument. Similarly, run with **FOUR (4)** parallel processes. Redirect the results into a file called `alice_pCounts_v2.txt`. Run the cell below to confirm that you now get the correct results for your 'alice' count.

* __d) short response:__ Does the order of your scripts passed into the command matter? Explain why or why not.

### Q4 Student Answers:
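> __b)__ Yes, the output matches my expectation. Each of the four chunks is counted independently and nothing combines the partial results, so a word such as 'alice' appears once per chunk (113 + 126 + 122 + 42 = 403) instead of once overall.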

> __d)__ Yes. The scripts are positional arguments: the shell script assigns `reducer=$4`, so the reducer must be passed as the fourth argument.
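
The split-then-aggregate pattern in this question can be sketched in plain Python. The snippet below is an illustration under stated assumptions, not the contents of `parallel_wc_v1.sh` or `aggregateCounts.py`: `count_words` and `merge_counts` are hypothetical helpers, the sample text is made up, and the chunks are processed one after another rather than in parallel.

```python
import re
from collections import Counter

def count_words(lines):
    """Count words in one chunk, using the same tokenizer pattern as wordCount.py."""
    counts = Counter()
    for line in lines:
        counts.update(re.findall(r'[a-z]+', line.lower()))
    return counts

def merge_counts(partials):
    """Combine per-chunk counts into one total per word."""
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts rather than replacing them
    return total

text = ["Alice was beginning to get very tired", "said Alice",
        "Alice looked", "the Hatter said"]
chunks = [text[0:2], text[2:4]]  # two "chunks" of two lines each

partials = [count_words(chunk) for chunk in chunks]
print([p["alice"] for p in partials])   # [2, 1]
print(merge_counts(partials)["alice"])  # 3
```

Without the merge step, each chunk reports its own partial count for the same word; the aggregation step is what turns those partial counts into a single total.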

In [57]:
# part a - make sure your scripts are executable (RUN THIS CELL AS IS)
!chmod a+x /parallel_wc_v1.sh
!chmod a+x /wordCount.py

In [63]:
# part a - TODO: complete the command to run 4 parallel processes based on the usage of the parallel_wc_v1.sh script
!/parallel_wc_v1.sh 4 /data/alice.txt /wordCount.py  > /data/alice_pCounts.txt

In [64]:
# part b - check alice count (RUN THIS CELL AS IS)
!grep alice /data/alice_pCounts.txt

alice	113
alice	126
alice	122
alice	42


In [66]:
# part c - make sure the aggregateCounts script is executable and parallel_wc_v2 script is executable (RUN THIS CELL AS IS)
!chmod a+x /parallel_wc_v2.sh 
!chmod a+x /aggregateCounts.py

In [67]:
# part c - TODO: complete the command to run 4 parallel processes and apply the aggregateCounts.py script
!/parallel_wc_v2.sh 4 /data/alice.txt /wordCount.py /aggregateCounts.py > /data/alice_pCounts_v2.txt

In [68]:
# part c - check alice count (RUN THIS CELL AS IS)
!grep alice /data/alice_pCounts_v2.txt

alice	403


# Question 5: Hadoop Streaming for Average Temperature Calculation

In this question, you will apply Hadoop Streaming to calculate the average temperature per city from the provided dataset named `temperature_data.csv`. _Note: you will need to upload the data to your cluster's local storage_. This will involve writing and using mapper and reducer scripts in Python, and then executing a Hadoop Streaming job.


### Q5 Tasks:
* __a) mapper and reducer scripts development:__ Write two Python scripts, `mapper_temp.py` and `reducer_temp.py`. The mapper script should process each line of the input dataset, which contains city names and corresponding temperatures, and output the city name and temperature. The reducer script should read the mapper output, calculate the average temperature for each city, and output the city name with its average temperature.

* __b) unit testing:__ Make sure you unit test the running of your mapper and reducer scripts using a small dataset like we did before with `alice_test.txt` or `hw1_test.txt`. This time, you will want to create a small data file. _Challenge: see if you can use Unix commands to output the first 5 lines of the provided dataset to a new test file!_

* __c) run a Hadoop Streaming job:__ Use the Hadoop Streaming command to execute your MapReduce job. The command should include the paths to your mapper and reducer scripts, the input file path to `temperature_data.csv`, and the output directory path. Ensure that you properly configure all necessary arguments and environment variables (feel free to add additional cells before the Hadoop Streaming command to do this). After running the job, inspect the output to verify correctness.

* __d) explore Hadoop Streaming processes:__ Modify your Hadoop Streaming command to experiment with different numbers of reducers and observe how this affects the execution time and the output. In your response, describe how changing the number of reducers impacts the MapReduce job's performance and result organization. _Hint: You will find the `%%timeit` magic command very helpful._
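
One common shape for a streaming reducer that averages values per key is sketched below. It is not the `reducer_temp.py` submitted with this notebook: the function name `average_by_key` is hypothetical, and the sketch assumes input lines of the form `key<TAB>value` that arrive already grouped by key, which is what the shuffle and sort step provides to each reducer.

```python
from itertools import groupby

def average_by_key(lines):
    """Yield (key, mean) for tab-separated key/value lines that are grouped by key."""
    pairs = (line.rstrip("\n").split("\t") for line in lines if line.strip())
    # groupby only merges *adjacent* equal keys, so the input must be sorted/grouped.
    for key, group in groupby(pairs, key=lambda kv: kv[0]):
        values = [float(value) for _, value in group]
        yield key, sum(values) / len(values)

sample = [
    "Dallas\t16.3\n", "Dallas\t5.2\n", "Dallas\t6.4\n",
    "Phoenix\t26.7\n", "Phoenix\t24.4\n", "Phoenix\t32.9\n",
    "San Jose\t7.8\n",
]
for city, mean in average_by_key(sample):
    print(f"{city}\t{mean:.2f}")
```

In an actual streaming script the same function could be fed `sys.stdin` instead of the `sample` list. For a local unit test, piping the mapper output through `sort` before the reducer reproduces the grouping that this approach relies on.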


### Q5 Student Answers:

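> __d)__ The mean run time increased as reducers were added (timings below), although the run-to-run variation for one and two reducers is large enough that the differences are not conclusive. Each reducer also writes its own `part-*` output file, so with more than one reducer the results are split across several files.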

number of reducers = 1: 33 s ± 15.2 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

number of reducers = 2: 43 s ± 28.9 s per loop (mean ± std. dev. of 7 runs, 1 loop each)

number of reducers = 3: 45.8 s ± 500 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
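
Part of what changes with the reducer count is how the output is organized: each reducer handles a subset of the keys and writes its own part file, so with more than one reducer the results are sorted within each file but not across files. The sketch below illustrates the idea with a hash partitioner. It is a simplification: Hadoop's default partitioner hashes keys in Java, whereas this example substitutes `zlib.crc32` purely so that the assignment is reproducible in Python, and `partition` is a hypothetical helper.

```python
import zlib
from collections import defaultdict

def partition(keys, num_reducers):
    """Assign each key to a reducer index in [0, num_reducers) by hashing the key."""
    assignment = defaultdict(list)
    for key in keys:
        reducer = zlib.crc32(key.encode("utf-8")) % num_reducers  # stand-in hash
        assignment[reducer].append(key)
    # Each reducer sees its own keys in sorted order.
    return {r: sorted(ks) for r, ks in sorted(assignment.items())}

cities = ["Chicago", "Dallas", "Houston", "Los Angeles", "New York",
          "Philadelphia", "Phoenix", "San Antonio", "San Diego", "San Jose"]

for n in (1, 2, 3):
    print(n, partition(cities, n))
```

With only ten distinct keys and a few hundred bytes of output, the work per reducer is tiny, so the fixed cost of launching additional reduce tasks can easily outweigh any gain from parallelism.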

In [None]:
# part a - Do your work in mapper_temp.py and reducer_temp.py

In [None]:
# part b - TODO: show your unit test here

In [104]:
%%writefile /hw1/tem_test.csv
San Antonio,34.8
San Antonio,21.2
San Antonio,28.0
San Jose,7.8
Phoenix,26.7
Phoenix,24.4
Phoenix,32.9
Dallas,16.3
Dallas,5.2
Dallas,6.4

Overwriting /hw1/tem_test.csv


In [105]:
!chmod a+x /mapper_temp.py /reducer_temp.py

In [111]:
!cat /hw1/tem_test.csv | /mapper_temp.py

San Antonio	34.8
San Antonio	21.2
San Antonio	28.0
San Jose	7.8
Phoenix	26.7
Phoenix	24.4
Phoenix	32.9
Dallas	16.3
Dallas	5.2
Dallas	6.4


In [117]:
!echo 'San Antonio	34.8\nSan Antonio	21.2\nSan Antonio	28.0\nSan Jose	7.8\nPhoenix	26.7\nPhoenix	24.4\nPhoenix	32.9\nDallas	16.3\nDallas	5.2\nDallas	6.4' | /reducer_temp.py

San Antonio	28.0
San Jose	7.8
Phoenix	28.0
Dallas	9.299999999999999


In [123]:
# part c - TODO: write the Hadoop Streaming command to test out your mapper and reducer scripts on the temperature dataset

JAR_FILE = "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar"
HDFS_DIR = "/user/root/hw1"
!hdfs dfs -mkdir -p {HDFS_DIR}
!hdfs dfs -rm -r {HDFS_DIR}/temperature_data.csv
!hdfs dfs -put /hw1/temperature_data.csv {HDFS_DIR}
# store notebook environment path
from os import environ
PATH = environ['PATH']
PATH

rm: `/user/root/hw1/temperature_data.csv': No such file or directory


'/opt/conda/anaconda/bin:/opt/conda/anaconda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin'

In [124]:
!hdfs dfs -rm -r {HDFS_DIR}/test-output

!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=1 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1" \
  -files /mapper_temp.py,/reducer_temp.py \
  -mapper mapper_temp.py \
  -reducer reducer_temp.py \
  -input {HDFS_DIR}/temperature_data.csv \
  -output {HDFS_DIR}/test-output \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

rm: `/user/root/hw1/test-output': No such file or directory
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.10.2.jar] /tmp/streamjob5002272313849714812.jar tmpDir=null
24/01/30 02:17:30 INFO client.RMProxy: Connecting to ResourceManager at cluster-0283-m/10.128.0.2:8032
24/01/30 02:17:30 INFO client.AHSProxy: Connecting to Application History server at cluster-0283-m/10.128.0.2:10200
24/01/30 02:17:30 INFO client.RMProxy: Connecting to ResourceManager at cluster-0283-m/10.128.0.2:8032
24/01/30 02:17:30 INFO client.AHSProxy: Connecting to Application History server at cluster-0283-m/10.128.0.2:10200
24/01/30 02:17:30 INFO mapred.FileInputFormat: Total input files to process : 1
24/01/30 02:17:31 INFO mapreduce.JobSubmitter: number of splits:9
24/01/30 02:17:31 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1706529591615_0009
24/01/30 02:17:31 INFO conf.Configuration: resource-types.xml not found
24/01/30 02:17:31 INFO resource.ResourceUtils: Unable to find '

In [125]:
!hdfs dfs -ls {HDFS_DIR}/test-output

Found 2 items
-rw-r--r--   1 root hadoop          0 2024-01-30 02:18 /user/root/hw1/test-output/_SUCCESS
-rw-r--r--   1 root hadoop        284 2024-01-30 02:18 /user/root/hw1/test-output/part-00000


In [126]:
!hdfs dfs -cat {HDFS_DIR}/test-output/part-0000* > /results_tem.txt
!head -n 20 /results_tem.txt

Chicago	7.4626437921459665
Dallas	17.57430145217826
Houston	19.935502724120877
Los Angeles	22.517839195979853
New York	12.611734492295556
Philadelphia	14.906933574152164
Phoenix	27.571388159224064
San Antonio	22.48523624013575
San Diego	20.008504635227766
San Jose	17.493455786409804


In [None]:
# part d - TODO: your explorations here with number of reducers

In [127]:
%%timeit
!hdfs dfs -rm -r {HDFS_DIR}/test-output

!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=1 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1" \
  -files /mapper_temp.py,/reducer_temp.py \
  -mapper mapper_temp.py \
  -reducer reducer_temp.py \
  -input {HDFS_DIR}/temperature_data.csv \
  -output {HDFS_DIR}/test-output \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

Deleted /user/root/hw1/test-output
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.10.2.jar] /tmp/streamjob2952226784069472598.jar tmpDir=null
24/01/30 03:11:17 INFO client.RMProxy: Connecting to ResourceManager at cluster-0283-m/10.128.0.2:8032
24/01/30 03:11:17 INFO client.AHSProxy: Connecting to Application History server at cluster-0283-m/10.128.0.2:10200
24/01/30 03:11:18 INFO client.RMProxy: Connecting to ResourceManager at cluster-0283-m/10.128.0.2:8032
24/01/30 03:11:18 INFO client.AHSProxy: Connecting to Application History server at cluster-0283-m/10.128.0.2:10200
24/01/30 03:11:18 INFO mapred.FileInputFormat: Total input files to process : 1
24/01/30 03:11:18 INFO mapreduce.JobSubmitter: number of splits:9
24/01/30 03:11:18 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1706529591615_0010
24/01/30 03:11:18 INFO conf.Configuration: resource-types.xml not found
24/01/30 03:11:18 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
24/0

In [128]:
%%timeit
!hdfs dfs -rm -r {HDFS_DIR}/test-output

!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=1 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1" \
  -files /mapper_temp.py,/reducer_temp.py \
  -mapper mapper_temp.py \
  -reducer reducer_temp.py \
  -input {HDFS_DIR}/temperature_data.csv \
  -output {HDFS_DIR}/test-output \
  -numReduceTasks 2 \
  -cmdenv PATH={PATH}

rm: `/user/root/hw1/test-output': No such file or directory
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.10.2.jar] /tmp/streamjob6367409978192137150.jar tmpDir=null
24/01/30 03:15:51 INFO client.RMProxy: Connecting to ResourceManager at cluster-0283-m/10.128.0.2:8032
24/01/30 03:15:52 INFO client.AHSProxy: Connecting to Application History server at cluster-0283-m/10.128.0.2:10200
24/01/30 03:15:52 INFO client.RMProxy: Connecting to ResourceManager at cluster-0283-m/10.128.0.2:8032
24/01/30 03:15:52 INFO client.AHSProxy: Connecting to Application History server at cluster-0283-m/10.128.0.2:10200
24/01/30 03:15:53 INFO mapred.FileInputFormat: Total input files to process : 1
24/01/30 03:15:54 INFO mapreduce.JobSubmitter: number of splits:9
24/01/30 03:15:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1706529591615_0017
24/01/30 03:15:55 INFO conf.Configuration: resource-types.xml not found
24/01/30 03:15:55 INFO resource.ResourceUtils: Unable to find '

In [129]:
%%timeit
!hdfs dfs -rm -r {HDFS_DIR}/test-output

!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=1 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1" \
  -files /mapper_temp.py,/reducer_temp.py \
  -mapper mapper_temp.py \
  -reducer reducer_temp.py \
  -input {HDFS_DIR}/temperature_data.csv \
  -output {HDFS_DIR}/test-output \
  -numReduceTasks 3 \
  -cmdenv PATH={PATH}

Deleted /user/root/hw1/test-output
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.10.2.jar] /tmp/streamjob7766429566709621026.jar tmpDir=null
24/01/30 03:21:36 INFO client.RMProxy: Connecting to ResourceManager at cluster-0283-m/10.128.0.2:8032
24/01/30 03:21:36 INFO client.AHSProxy: Connecting to Application History server at cluster-0283-m/10.128.0.2:10200
24/01/30 03:21:37 INFO client.RMProxy: Connecting to ResourceManager at cluster-0283-m/10.128.0.2:8032
24/01/30 03:21:37 INFO client.AHSProxy: Connecting to Application History server at cluster-0283-m/10.128.0.2:10200
24/01/30 03:21:37 INFO mapred.FileInputFormat: Total input files to process : 1
24/01/30 03:21:37 INFO mapreduce.JobSubmitter: number of splits:9
24/01/30 03:21:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1706529591615_0025
24/01/30 03:21:38 INFO conf.Configuration: resource-types.xml not found
24/01/30 03:21:38 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
24/0

# Question 6: Hadoop Streaming for Analyzing Employee Salary by Department

In this exercise, you'll work with the `employee_data.csv` dataset that contains employee information. Each record in the dataset has three fields: Employee Name, Department, and Salary. The goal is to analyze this data to find the average salary per department, sorted first by department name and then by average salary in descending order.

### Q6 Tasks:
* __a) mapper and reducer scripts development:__ Write two Python scripts, `mapper_employee.py` and `reducer_employee.py`. The mapper script should process each line of the input dataset and output the department and salary. The reducer script should calculate the average salary for each department.

* __b) unit testing:__ Make sure you unit test the running of your mapper and reducer scripts using a small dataset like we did before with `alice_test.txt` or `hw1_test.txt`. This time, you will want to create a small data file. _Challenge: see if you can use Unix commands to output the first 5 lines of the provided dataset to a new test file!_

* __c) run a Hadoop Streaming job:__ Write the correct command to execute the MapReduce job, ensuring the output is sorted first by department and then by average salary in descending order. After running the job, inspect the output to verify correctness.
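
Before submitting a job, the mapper's field extraction and the per-department average can be rehearsed locally in plain Python. The sketch below is a hypothetical stand-in, not the `mapper_employee.py` and `reducer_employee.py` written for this question: `map_line` and `average_salary_by_department` are illustrative names, the sample rows are made up in the dataset's format, and records are assumed to be `name,department,salary` with no commas inside a field.

```python
from collections import defaultdict

def map_line(line):
    """Turn 'name,department,salary' into a (department, salary) pair."""
    _name, department, salary = line.strip().split(",")
    return department, float(salary)

def average_salary_by_department(lines):
    """Return (department, mean salary) pairs, by department, then mean descending."""
    totals = defaultdict(lambda: [0.0, 0])  # department -> [sum, count]
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        department, salary = map_line(line)
        totals[department][0] += salary
        totals[department][1] += 1
    averages = [(dept, s / n) for dept, (s, n) in totals.items()]
    return sorted(averages, key=lambda r: (r[0], -r[1]))

sample = [
    "Eve,Marketing,119144",
    "Julia,Customer Service,37393",
    "David,Human Resources,59780",
    "Frank,Customer Service,99634",
    "Ivan,Human Resources,85833",
]
for dept, mean in average_salary_by_department(sample):
    print(f"{dept}\t{mean}")
```

Unlike a streaming reducer, this version holds one running sum and count per department in memory, which is fine for a small test file but is not how the work is divided on the cluster.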


In [None]:
# part a - Do your work in mapper_employee.py and reducer_employee.py


In [132]:
# part b - TODO: show your unit test here

In [133]:
%%writefile /hw1/employee_test.csv 
Eve,Marketing,119144
Julia,Customer Service,37393
David,Human Resources,59780
Hannah,Engineering,149301
Frank,Customer Service,99634
Bob,IT,140526
Ivan,Human Resources,85833
Bob,Customer Service,74054
Charlie,Customer Service,116957
Grace,Engineering,126260

Writing /hw1/employee_test.csv


In [134]:
!chmod a+x /mapper_employee.py /reducer_employee.py

In [135]:
!cat /hw1/employee_test.csv | /mapper_employee.py

Marketing	119144
Customer Service	37393
Human Resources	59780
Engineering	149301
Customer Service	99634
IT	140526
Human Resources	85833
Customer Service	74054
Customer Service	116957
Engineering	126260


In [142]:
!echo 'Marketing	119144\nCustomer Service	37393\nCustomer Service	74054\nCustomer Service	116957\nCustomer Service	99634\nHuman Resources	59780\nHuman Resources	85833\nEngineering	149301\nEngineering	126260\nIT	140526' | /reducer_employee.py

Marketing	119144.0
Customer Service	82009.5
Human Resources	72806.5
Engineering	137780.5
IT	140526.0


In [None]:
# part c - TODO: write the Hadoop Streaming command to test out your mapper and reducer scripts on the employee dataset


In [143]:
JAR_FILE = "/usr/lib/hadoop-mapreduce/hadoop-streaming.jar"
HDFS_DIR = "/user/root/hw1"
!hdfs dfs -mkdir -p {HDFS_DIR}
!hdfs dfs -rm -r {HDFS_DIR}/employee_data
!hdfs dfs -put /hw1/employee_data.csv {HDFS_DIR}
# store notebook environment path
from os import environ
PATH = environ['PATH']
PATH

rm: `/user/root/hw1/employee_data': No such file or directory
put: `/user/root/hw1/employee_data.csv': File exists


'/opt/conda/anaconda/bin:/opt/conda/anaconda/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/snap/bin'

In [145]:
!hdfs dfs -rm -r {HDFS_DIR}/employee-output

!hadoop jar {JAR_FILE} \
  -D stream.num.map.output.key.fields=1 \
  -D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapreduce.partition.keycomparator.options="-k1,1" \
  -files /mapper_employee.py,/reducer_employee.py \
  -mapper mapper_employee.py \
  -reducer reducer_employee.py \
  -input {HDFS_DIR}/employee_data.csv \
  -output {HDFS_DIR}/employee-output \
  -numReduceTasks 1 \
  -cmdenv PATH={PATH}

Deleted /user/root/hw1/employee-output
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.10.2.jar] /tmp/streamjob7501998307273624396.jar tmpDir=null
24/01/30 03:59:52 INFO client.RMProxy: Connecting to ResourceManager at cluster-0283-m/10.128.0.2:8032
24/01/30 03:59:52 INFO client.AHSProxy: Connecting to Application History server at cluster-0283-m/10.128.0.2:10200
24/01/30 03:59:53 INFO client.RMProxy: Connecting to ResourceManager at cluster-0283-m/10.128.0.2:8032
24/01/30 03:59:53 INFO client.AHSProxy: Connecting to Application History server at cluster-0283-m/10.128.0.2:10200
24/01/30 03:59:53 INFO mapred.FileInputFormat: Total input files to process : 1
24/01/30 03:59:53 INFO mapreduce.JobSubmitter: number of splits:9
24/01/30 03:59:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1706529591615_0036
24/01/30 03:59:54 INFO conf.Configuration: resource-types.xml not found
24/01/30 03:59:54 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.


In [146]:
!hdfs dfs -ls {HDFS_DIR}/employee-output
!hdfs dfs -cat {HDFS_DIR}/employee-output/part-0000* > /results_employee.txt
!head -n 20 /results_employee.txt

Found 2 items
-rw-r--r--   1 root hadoop          0 2024-01-30 04:00 /user/root/hw1/employee-output/_SUCCESS
-rw-r--r--   1 root hadoop        279 2024-01-30 04:00 /user/root/hw1/employee-output/part-00000
Customer Service	90295.63099630996
Engineering	89913.67894343384
Finance	90040.97204876598
Human Resources	89908.50350982751
IT	89895.23108948636
Marketing	89665.25036021265
Operations	89939.54684923292
Product	89907.30719683522
Research	89811.8197848177
Sales	90229.74844158455


# Submission Instructions
You will need to submit a zip file to **Gradescope** containing the following files:
- This notebook HW1.ipynb
- From Question 2, final output of alice_counts.txt AND wordCount.py 
- From Question 5, mapper_temp.py and reducer_temp.py
- From Question 6, mapper_employee.py and reducer_employee.py