
# Exercise 3 (MapReduce in Practice)   &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;     [4 points]
---

For this exercise, you will write your own Hadoop MapReduce program in Python and
run it on the cluster on two provided datasets. Both datasets consist of freely accessible
data from New York City, NY, USA. One is a [dataset containing recorded parking and camera violations](https://www.kaggle.com/new-york-city/ny-open-parking-and-camera-violations), and the other is a [dataset of taxicab permits (called “medallions”) with their respective license plates](https://www.kaggle.com/new-york-city/ny-medallion-vehicles-and-drivers).
We provide a JupyterLab notebook file so that you can write and execute MapReduce jobs directly in JupyterLab.
You can find the datasets on the cluster at the following locations:


* **local file system & Hadoop file system (HDFS)**  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;/home/adbs21/shared/medallion-vehicles.csv  
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;/home/adbs21/shared/parking-violations.csv  



- ### **a) Write a MapReduce job with “medallion-vehicles.csv” as input and the following output:**  
For each company, show the number of license plates for which it received a medallion. Make sure your final output has the following format:
            name,count_license_plate      
The license plates are stored in the column “DMV License Plate Number”.
Make sure that your program correctly handles the header row and possibly missing (sparse) values.


In [1]:
%%file mymrjob1.py
# This will create a local file to run your MapReduce program  

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.util import log_to_stream, log_to_null
from mr3px.csvprotocol import CsvProtocol
import csv 
import logging

log = logging.getLogger(__name__)
# 
#  Below is the skeleton for a MapReduce program in mrjob.
#  Write your own solution here. Be sure that it actually runs successfully.
class MyMRJob1(MRJob):
    
    
    OUTPUT_PROTOCOL = CsvProtocol  # write output as CSV
    
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):  
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)
        
#   Feel free to rename the functions
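    def mapper_mrjob1(self, _, line):
        # Parse one CSV line; csv.reader copes with quoted commas inside company names.
        result = next(csv.reader([line]))
        # Skip rows that are too short, so the index lookups below cannot fail.
        if len(result) < 5:
            return
        name = result[1]  # company name (header value "Name")
        p_no = result[4]  # license plate, taken to be the "DMV License Plate Number" column
        
        # Skip the header row and rows with a missing name or license plate.
        if name == "Name" or p_no == "" or name == "": 
            return
        
        yield name, 1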
               
# use of a combiner is optional. It may speed up your job. Be sure that using the combiner preserves the correctness. 
#     def combiner_mrjob1(self,key,valuelist):
        #TODO
        
    def reducer_mrjob1(self,name,value):

        yield None, (name, sum(value))

    def steps(self):
        first_step = MRStep(
            mapper=self.mapper_mrjob1, 
#             combiner=self.combiner_mrjob1, 
            reducer=self.reducer_mrjob1
        )
        # just generate more steps to run a multi-step MR job
        
        return [ first_step ]

if __name__ == '__main__':
    MyMRJob1.run()


Overwriting mymrjob1.py
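
The skeleton notes that a combiner is optional but must preserve correctness. The following plain-Python sketch (no mrjob or Hadoop involved) illustrates why a summing combiner is safe for this counting job: addition is associative and commutative, so pre-aggregating per mapper does not change the final totals. The company names and the two input splits are made up for illustration.

```python
from collections import defaultdict


def map_phase(rows):
    """Emit (company, 1) per row, mirroring the mapper above."""
    for name in rows:
        yield name, 1


def combine(pairs):
    """Pre-aggregate the pairs emitted by ONE mapper, as a sum combiner would."""
    partial = defaultdict(int)
    for name, count in pairs:
        partial[name] += count
    return list(partial.items())


def reduce_phase(pairs):
    """Sum all (possibly partial) counts per company."""
    totals = defaultdict(int)
    for name, count in pairs:
        totals[name] += count
    return dict(totals)


# Two hypothetical input splits, each processed by its own mapper.
split_a = ["A CAB CORP", "B TAXI LLC", "A CAB CORP"]
split_b = ["A CAB CORP", "B TAXI LLC"]

plain = list(map_phase(split_a)) + list(map_phase(split_b))
combined = combine(map_phase(split_a)) + combine(map_phase(split_b))

without_combiner = reduce_phase(plain)
with_combiner = reduce_phase(combined)

print(without_combiner == with_combiner)  # same totals either way
print(len(plain), len(combined))          # records sent to the reducers
```

In the mrjob class above, the matching `combiner_mrjob1` would have to yield `name, sum(...)` rather than the `None, (name, count)` pair the reducer emits, because the combiner output is fed into the reducer and must keep the same key and value shape as the mapper output.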


Running a local MRjob 

In [2]:
! python mymrjob1.py  /home/adbs21/shared/medallion-vehicles-short.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory /tmp/mymrjob1.e11924480.20210425.075336.621293
job output is in /tmp/mymrjob1.e11924480.20210425.075336.621293/output
Streaming final output from /tmp/mymrjob1.e11924480.20210425.075336.621293/output...
" CHAUDRY,TARIQ,J & CHOUDHRY,ABBAS,G",73
"1212 TAXI LLC",192
"16 WHITESTONE CAB LLC",209
"168 CAB CORP",202
"178 CAB CORP",204
"18TH STREET HACKING CORP",231
"1C39 LLC",23
"1C46 LLC",37
"1D89 ACQUISITION LLC",2
"1F75 CORP.",14
"1KHALSA LLC",56
"1KHALSHA LLC",1
"1T15 CORP",83
"1T82 CORP.",16
"2 BIRDS LLC",94
"2 BROS TAXI GROUP LLC",46
"2 BROTHERS CAB CORP",216
"211 TAXI CORP.",99
"2138 TRANS LLC",200
"221 CAB LLC",103
"222 EAST CORP",228
"2510 PARSONS LLC",7
"2C32 LLC",65
"2D23 ACQUISITION LLC",5
"2L73 & 2L74 ACQUISITION LLC",23
"2T58 LLC",19
"2T65 CORP.",14
"2T79 LLC",34
"3 BROTHERS TAXI LLC",127
"333 CAB CORP.",165
"3511 SYSTEMS INC",184
"3511 SYSTE

Running a Hadoop job

In [3]:
! HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/ python mymrjob1.py -r hadoop hdfs:///home/adbs21/shared/medallion-vehicles.csv > output.csv

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/bin...
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/bin/hadoop
Using Hadoop version 3.0.0
Looking for Hadoop streaming jar in /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/...
Found Hadoop streaming jar: /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
Creating temp directory /tmp/mymrjob1.e11924480.20210425.075632.005884
Copying local files to hdfs:///user/e11924480/tmp/mrjob/mymrjob1.e11924480.20210425.075632.005884/files/...
Running step 1 of 1...
  Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/hadoop-streaming-3.0.0-cdh6.3.2.jar] /tmp/streamjob2116069636177104468.jar tmpDir=null
  Connecting to ResourceManager at c100.local/10.7.0.100:8032
  Con

---

- ### **b) Write a MapReduce job with both datasets as input and the following output:**  
For each company with a medallion vehicle, show the number of parking violations it received
for these vehicles. Make sure your final output has the following format:
            name,count_violations
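
One common way to combine two inputs in MapReduce is a reduce-side join followed by an aggregation step. The sketch below simulates that idea in plain Python; it is not the expected solution, only an illustration under stated assumptions. The record layouts (`(company, plate)` pairs and bare plate strings), the tags `"M"` and `"V"`, and all sample values are hypothetical. In a real job the mapper would first have to decide which file a line came from, for example by its number of columns.

```python
from collections import defaultdict


def tag_records(medallion_rows, violation_rows):
    """Step 1 map: key every record by license plate and tag its source."""
    for company, plate in medallion_rows:
        yield plate, ("M", company)
    for plate in violation_rows:
        yield plate, ("V", 1)


def join_by_plate(tagged):
    """Step 1 reduce: per plate, pair each owning company with the violation count."""
    groups = defaultdict(list)
    for plate, value in tagged:  # stands in for the shuffle phase
        groups[plate].append(value)
    for values in groups.values():
        # A set avoids double counting when a plate is listed several times.
        companies = {v for tag, v in values if tag == "M"}
        violations = sum(v for tag, v in values if tag == "V")
        for company in companies:
            yield company, violations


def sum_by_company(pairs):
    """Step 2 reduce: add up the per-plate counts for each company."""
    totals = defaultdict(int)
    for company, count in pairs:
        totals[company] += count
    return dict(totals)


# Hypothetical sample data; plate "1A11" is listed twice on purpose.
medallions = [("A CAB CORP", "1A11"), ("A CAB CORP", "1A12"),
              ("B TAXI LLC", "2B22"), ("A CAB CORP", "1A11")]
violations = ["1A11", "1A11", "2B22", "9Z99", "1A12"]

result = sum_by_company(join_by_plate(tag_records(medallions, violations)))
print(result)
```

Violations for plates without a medallion (here `"9Z99"`) drop out in the join, and a company whose plates have no violations would appear with a count of 0. Whether such zero rows belong in the final output is a design decision this sketch does not settle.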


In [15]:
%%file mymrjob2.py
# This will create a local file to run your MapReduce program  

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.util import log_to_stream, log_to_null
from mr3px.csvprotocol import CsvProtocol
import csv 
import logging

log = logging.getLogger(__name__)
# 
#  Below is the skeleton for a MapReduce program in mrjob.
#  Write your own solution here. Be sure that it actually runs successfully.
class MyMRJob2(MRJob):
    
    
    OUTPUT_PROTOCOL = CsvProtocol  # write output as CSV
    
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):  
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)

#   Feel free to rename the functions
    def mapper_mrjob2(self, _, line):
        #TODO
        
# use of a combiner is optional. It may speed up your job. Be sure that using the combiner preserves the correctness. 
#     def combiner_mrjob2(self,key,valuelist):
        #TODO
        
    def reducer_mrjob2(self,key,valuelist):
         #TODO

    def steps(self):
        first_step = MRStep(
            mapper=self.mapper_mrjob2, 
#             combiner=self.combiner_mrjob2, 
            reducer=self.reducer_mrjob2
        )
        # just generate more steps to run a multi-step MR job
        
        return [ first_step ]

if __name__ == '__main__':
    MyMRJob2.run()

Overwriting mymrjob2.py


Running a local MRjob 

In [None]:
! python mymrjob2.py  /home/adbs21/shared/medallion-vehicles-short.csv /home/adbs21/shared/parking-violations-short.csv

Running a Hadoop job

In [None]:
! HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/ python mymrjob2.py -r hadoop hdfs:///home/adbs21/shared/medallion-vehicles.csv hdfs:///home/adbs21/shared/parking-violations.csv > output2.csv

---

- ### **c) Once your jobs have run successfully on the cluster, use the provided commands in the notebook to look up the counters of your MapReduce job(s).**  
Alternatively, you can read the counters from the output cells above after a job has successfully run on the cluster.
Use the counters to determine, for each job, the replication rate, the communication cost, and the input and output sizes. For this you will need the job IDs, which are shown in the output when running a job.  

**Note:** _Be sure to replace the dummy job ID below with the real one you get after running it on the cluster!_

**Info on the relevant counters**:

MAP_INPUT_RECORDS  .... _indicates the total number of input records received by all mappers_  
MAP_OUTPUT_RECORDS  .... _indicates the total number of output records emitted by all mappers_  
MAP_OUTPUT_BYTES  .... _indicates the total size in bytes of the records output by all the mappers_  
BYTES_READ  .... _indicates the number of bytes read by the MapReduce Job from the filesystem_  
BYTES_WRITTEN  .... _indicates the number of bytes written by the MapReduce Job to the filesystem_

If you want to use more counters, you can find a list in the documentation:  
https://hadoop.apache.org/docs/r2.10.1/api/org/apache/hadoop/mapreduce/TaskCounter.html  
https://hadoop.apache.org/docs/r2.10.1/api/org/apache/hadoop/mapreduce/lib/input/FileInputFormatCounter.html  
https://hadoop.apache.org/docs/r2.10.1/api/org/apache/hadoop/mapreduce/lib/output/FileOutputFormatCounter.html
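
The five lookups in the cells below all follow the pattern `mapred job -counter <job id> <counter group> <counter name>`. As a convenience, the following sketch builds these commands in one place and extracts the numeric value from the command output. The helper names `counter_commands` and `parse_counter_output` are hypothetical, and the sketch deliberately does not execute anything, so it can be tried without access to the cluster.

```python
# Counter groups and names, as listed in the info box above.
COUNTERS = [
    ("org.apache.hadoop.mapreduce.TaskCounter", "MAP_INPUT_RECORDS"),
    ("org.apache.hadoop.mapreduce.TaskCounter", "MAP_OUTPUT_RECORDS"),
    ("org.apache.hadoop.mapreduce.TaskCounter", "MAP_OUTPUT_BYTES"),
    ("org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter", "BYTES_READ"),
    ("org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter", "BYTES_WRITTEN"),
]


def counter_commands(job_id):
    """Return one argument list per counter, in the order of COUNTERS."""
    return [["mapred", "job", "-counter", job_id, group, name]
            for group, name in COUNTERS]


def parse_counter_output(output):
    """Return the counter value: the last line that consists only of digits."""
    for line in reversed(output.strip().splitlines()):
        if line.strip().isdigit():
            return int(line.strip())
    raise ValueError("no counter value found in output")


# The dummy job ID from this notebook; replace it with your own.
commands = counter_commands("job_1596895008206_20293")
for cmd in commands:
    print(" ".join(cmd))

# Log lines followed by the value, shaped like the cell outputs below.
sample = "INFO client.RMProxy: Connecting to ResourceManager\n13518773\n"
print(parse_counter_output(sample))
```

On the cluster, each argument list could be passed to `subprocess.run(cmd, capture_output=True, text=True)` and the resulting `stdout` handed to `parse_counter_output`.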


In [4]:
job_id = "job_1596895008206_20293" # replace this value with the one from your job. Using the unaltered value will not work
!mapred job -counter $job_id org.apache.hadoop.mapreduce.TaskCounter MAP_INPUT_RECORDS

21/04/25 10:00:53 INFO client.RMProxy: Connecting to ResourceManager at c100.local/10.7.0.100:8032
21/04/25 10:00:54 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
13518773


In [5]:
!mapred job -counter $job_id org.apache.hadoop.mapreduce.TaskCounter MAP_OUTPUT_RECORDS

21/04/25 10:01:00 INFO client.RMProxy: Connecting to ResourceManager at c100.local/10.7.0.100:8032
21/04/25 10:01:01 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
13518772


In [6]:
!mapred job -counter $job_id org.apache.hadoop.mapreduce.TaskCounter MAP_OUTPUT_BYTES

21/04/25 10:01:07 INFO client.RMProxy: Connecting to ResourceManager at c100.local/10.7.0.100:8032
21/04/25 10:01:08 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
289954224


In [7]:
!mapred job -counter $job_id org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter BYTES_READ

21/04/25 10:01:13 INFO client.RMProxy: Connecting to ResourceManager at c100.local/10.7.0.100:8032
21/04/25 10:01:14 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2278886238


In [8]:
!mapred job -counter $job_id org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter BYTES_WRITTEN

21/04/25 10:01:18 INFO client.RMProxy: Connecting to ResourceManager at c100.local/10.7.0.100:8032
21/04/25 10:01:18 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
258967


### Replication Rates and Input and Output Sizes:

* Replication Rate for Task 1. :   **13518772/13518773 ≈ 1** (map output records divided by map input records)
* Communication Cost:  **13518773 + 2*13518772 +258967**
*  Input Size for Task 1. :   **2278886238 bytes**
* Output Size for Task 1. :  **258967 bytes**

Note: if your job had multiple steps, just state the replication rate for each step. 
Make sure to compute the cumulative costs for the other measures, though, especially the communication cost. For a multi-step MR job, **also count the communication between steps** towards the total communication cost of the job. 
* Replication Rate for Task 2. :  **YOUR ANSWER HERE**
* Communication Cost:  **YOUR ANSWER HERE**
* Input Size for Task 2. :  **YOUR ANSWER HERE**
* Output Size for Task 2. :  **YOUR ANSWER HERE**
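
As a worked example, the counter values printed in the cells above can be turned into the requested measures with a few lines of Python. This sketch assumes the common textbook definition of the replication rate (map output records divided by map input records); check it against the definition used in the lecture. The dictionary `counters` and the function `replication_rate` are names introduced here for illustration.

```python
# Counter values copied from the output cells above.
counters = {
    "MAP_INPUT_RECORDS": 13518773,
    "MAP_OUTPUT_RECORDS": 13518772,
    "MAP_OUTPUT_BYTES": 289954224,
    "BYTES_READ": 2278886238,
    "BYTES_WRITTEN": 258967,
}


def replication_rate(c):
    """Key-value pairs emitted by all mappers per input record (assumed definition)."""
    return c["MAP_OUTPUT_RECORDS"] / c["MAP_INPUT_RECORDS"]


rate = replication_rate(counters)
avg_record_bytes = counters["MAP_OUTPUT_BYTES"] / counters["MAP_OUTPUT_RECORDS"]

print(f"replication rate:       {rate:.8f}")
print(f"input size:             {counters['BYTES_READ']} bytes")
print(f"output size:            {counters['BYTES_WRITTEN']} bytes")
print(f"avg. map output record: {avg_record_bytes:.1f} bytes")
```

The rate is just below 1 because the mapper emits one pair for every input record except those it skips, such as the header row.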

For more counters, you can check the documentation at https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/TaskCounter.html and https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/output/FileOutputFormatCounter.html

---
## **Your solution for Exercise 3 will consist of:**  
*  This notebook, filled with your solution, including the information on the replication rate, and the input and output sizes. 
_Submissions which do not run on the cluster will not be counted as valid._