# Exercise 3 (MapReduce in Practice) [4 points]
---

For this exercise, you are tasked with writing your own Hadoop MapReduce program in Python and running
it on the cluster on the provided datasets.   
See the exercise sheet for all information on the datasets and on this task.


**Note:** *When accessing files in the HDFS, you need to prepend “hdfs://” to the address string. For
quick testing of solutions, there are smaller versions of large datasets (> 1 GB) on the local file-system,
ending with “_small.csv”. Make sure that your MapReduce job also works on the complete dataset
on the cluster.*
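Prepending `hdfs://` to an absolute path such as `/home/adbs22/...` produces a URI with three slashes (`hdfs:///home/...`); the empty authority means the cluster's default file system is used. The two helpers below are hypothetical conveniences (not part of the exercise template), assuming the `_small.csv` naming convention described in the note.

```python
def to_hdfs(path: str) -> str:
    """Prepend the HDFS scheme to an absolute path (hypothetical helper)."""
    if path.startswith("hdfs://"):
        return path  # already an HDFS URI, leave unchanged
    return "hdfs://" + path


def small_variant(path: str) -> str:
    """Return the '_small.csv' sibling of a large CSV file (hypothetical helper)."""
    if not path.endswith(".csv"):
        raise ValueError("expected a .csv path: " + path)
    return path[: -len(".csv")] + "_small.csv"


transactions = "/home/adbs22/shared/hm/transactions.csv"
print(to_hdfs(transactions))        # hdfs:///home/adbs22/shared/hm/transactions.csv
print(small_variant(transactions))  # /home/adbs22/shared/hm/transactions_small.csv
```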

In [1]:
# Saving variables to access the file locations
articles='/home/adbs22/shared/hm/articles.csv'
customers='/home/adbs22/shared/hm/customers.csv'
transactions='/home/adbs22/shared/hm/transactions.csv'
transactions_small='/home/adbs22/shared/hm/transactions_small.csv'


- ### **a) Write a MapReduce job with “articles.csv” as input and following output:**  

For each garment group, show the most frequent product, the second most frequent section, and the most frequent department among the articles of that group in the articles.csv file; make sure the output has the following schema:

            garment_group_name, prod_name, section_name,  department_name

The product names are stored in "prod_name", the department name in "department_name", the garment group in "garment_group_name", and the section in "section_name". If several values (e.g. departments or sections) have the same number of occurrences, you may resolve these ties randomly, i.e. pick one of them arbitrarily. If a garment group has only one section, or all of its sections appear with the same frequency, just pick the most frequent one, again resolving ties randomly.

Make sure that your program correctly handles the header and possibly sparse (empty) values.
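To cross-check the MapReduce output on small inputs, the expected result can also be computed in memory. The sketch below is a plain-Python reference, not a MapReduce solution; it assumes the column positions used by the job (2, 15, 21, 23 for product, department, section, and garment group), skips empty values, and breaks ties by first occurrence. The rows it runs on are hypothetical toy data.

```python
import csv
import io
from collections import Counter, defaultdict

# Column positions in articles.csv, as used by the MapReduce job
PROD, DEPT, SECTION, GARMENT = 2, 15, 21, 23
N_COLS = 25


def reference_task_a(csv_text):
    """In-memory version of Task a), meant only for cross-checking small inputs."""
    counts = defaultdict(lambda: {"prod_name": Counter(),
                                  "section_name": Counter(),
                                  "department_name": Counter()})
    reader = csv.reader(io.StringIO(csv_text))
    next(reader, None)  # skip the header row
    for row in reader:
        if len(row) <= GARMENT or not row[GARMENT]:
            continue  # truncated row or missing garment group
        for col, idx in (("prod_name", PROD), ("section_name", SECTION),
                         ("department_name", DEPT)):
            if row[idx]:  # ignore sparse (empty) values
                counts[row[GARMENT]][col][row[idx]] += 1

    def top(counter, rank):
        # rank-th most frequent value (1-based, ties broken by first occurrence);
        # with fewer distinct values, the least frequent available one is returned
        common = counter.most_common(rank)
        return common[-1][0] if common else None

    return [(garment, top(c["prod_name"], 1), top(c["section_name"], 2),
             top(c["department_name"], 1))
            for garment, c in sorted(counts.items())]


def make_row(prod, dept, section, garment):
    """Build a hypothetical 25-column articles row with only four fields set."""
    row = [""] * N_COLS
    row[0] = "0000000001"
    row[PROD], row[DEPT], row[SECTION], row[GARMENT] = prod, dept, section, garment
    return ",".join(row)


toy = "\n".join(["article_id" + "," * (N_COLS - 1),
                 make_row("A", "D1", "S1", "G"),
                 make_row("A", "D1", "S1", "G"),
                 make_row("B", "D2", "S2", "G"),
                 make_row("C", "D3", "S3", "H")])
print(reference_task_a(toy))
# [('G', 'A', 'S2', 'D1'), ('H', 'C', 'S3', 'D3')]
```

Group `H` has a single section, so its "second most frequent" section falls back to `S3`, as the task description requires.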

In [40]:
%%file mymrjob1.py
# This will create a local file to run your MapReduce program
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.util import log_to_stream, log_to_null
from mr3px.csvprotocol import CsvProtocol
import csv
import logging

log = logging.getLogger(__name__)


#
#  Below is the skeleton for a MapReduce program in mrjob.
#  Write your own solution here. Be sure that it actually runs successfully.
class MyMRJob1(MRJob):
    OUTPUT_PROTOCOL = CsvProtocol  # write output as CSV

    @classmethod
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)

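    def mapper_count_cols(self, _, line):
        # parse a single CSV line (quoted fields may contain commas)
        values = next(csv.reader([line]), [])

        # skip the header and empty or truncated rows
        if not values or values[0] == "article_id" or len(values) < 24:
            return

        prod_name = values[2]
        department_name = values[15]
        section_name = values[21]
        garment_group_name = values[23]

        # a row without a garment group cannot be assigned to any group
        if not garment_group_name:
            return

        # emit one count per (group, column, value); skip sparse (empty) values
        for col, val in (("prod_name", prod_name),
                         ("section_name", section_name),
                         ("department_name", department_name)):
            if val:
                yield (garment_group_name, col, val), 1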

    def reducer_count_cols(self, key, value_list):
        yield (key[0], key[1]), (key[2], sum(value_list))

    # Job 2

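    def reducer_filter(self, key, value_list):
        # keep the two (value, count) pairs with the highest counts;
        # ties are resolved arbitrarily by input order
        max_elements = [(None, 0), (None, 0)]
        for value in value_list:
            if value[1] > max_elements[0][1]:
                max_elements[1] = max_elements[0]
                max_elements[0] = value

            elif value[1] > max_elements[1][1]:
                max_elements[1] = value

        # sections: second most frequent, falling back to the most frequent one
        # if the group has only a single section; other columns: most frequent
        if key[1] == "section_name" and max_elements[1][0] is not None:
            chosen = max_elements[1]
        else:
            chosen = max_elements[0]
        yield key[0], (key[1], chosen)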

    def reducer_result_builder(self, key, value_list):
        prod_name, section_name, department_name = None, None, None
        for value in value_list:
            if value[0] == "prod_name":
                prod_name = value[1][0]
            elif value[0] == "section_name":
                section_name = value[1][0]
            else:
                department_name = value[1][0]

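        # emit the four fields as separate CSV columns (garment_group_name,
        # prod_name, section_name, department_name)
        yield None, [key, prod_name, section_name, department_name]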

    def steps(self):
        count_cols_step = MRStep(
            mapper=self.mapper_count_cols,
            # combiner=self.combiner_count_cols,
            reducer=self.reducer_count_cols
        )

        filter_step = MRStep(reducer=self.reducer_filter)

        result_builder_step = MRStep(reducer=self.reducer_result_builder)

        return [count_cols_step, filter_step, result_builder_step]

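    def combiner_count_cols(self, key, value_list):
        # optional map-side pre-aggregation; summing partial counts keeps the
        # result correct. Enable it via the commented line in steps().
        yield key, sum(value_list)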


if __name__ == '__main__':
    MyMRJob1.run()


Overwriting mymrjob1.py
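To see how the three steps hand data to one another, the pipeline can be traced without Hadoop. The sketch below is a simplified in-memory simulation, not mrjob's actual runner: `run_step` is a hypothetical helper that maps, sorts and groups by JSON-encoded key (approximating mrjob's default JSON-based internal protocol), and reduces. The functions follow the selection rule from the task description (second most frequent section, falling back to the only one) but take pre-extracted tuples instead of CSV lines.

```python
import json
from itertools import groupby


def run_step(pairs, reducer, mapper=None):
    """Simulate one MapReduce step in memory: map, shuffle, reduce.

    Pairs are JSON-encoded before grouping, so tuples reach the reducer as lists.
    """
    if mapper is not None:
        pairs = [out for k, v in pairs for out in mapper(k, v)]
    encoded = sorted((json.dumps(k), json.dumps(v)) for k, v in pairs)
    results = []
    for k, group in groupby(encoded, key=lambda kv: kv[0]):
        values = [json.loads(v) for _, v in group]
        results.extend(reducer(json.loads(k), values))
    return results


def mapper_count_cols(_, row):
    # row = (prod_name, section_name, department_name, garment_group_name)
    prod, section, dept, garment = row
    yield (garment, "prod_name", prod), 1
    yield (garment, "section_name", section), 1
    yield (garment, "department_name", dept), 1


def reducer_count_cols(key, values):
    yield (key[0], key[1]), (key[2], sum(values))


def reducer_filter(key, values):
    top = [(None, 0), (None, 0)]  # two highest (value, count) pairs
    for v in values:
        if v[1] > top[0][1]:
            top[1], top[0] = top[0], v
        elif v[1] > top[1][1]:
            top[1] = v
    # sections: second most frequent, or the only one if there is just one
    use_second = key[1] == "section_name" and top[1][0] is not None
    yield key[0], (key[1], top[1] if use_second else top[0])


def reducer_result_builder(key, values):
    picked = {col: pair[0] for col, pair in values}
    yield None, [key, picked.get("prod_name"), picked.get("section_name"),
                 picked.get("department_name")]


rows = [(None, ("A", "S1", "D1", "G")),
        (None, ("A", "S1", "D1", "G")),
        (None, ("B", "S2", "D2", "G"))]
step1 = run_step(rows, reducer_count_cols, mapper=mapper_count_cols)
step2 = run_step(step1, reducer_filter)
step3 = run_step(step2, reducer_result_builder)
print([row for _, row in step3])  # [['G', 'A', 'S2', 'D1']]
```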


Running the job locally

In [26]:
!python3.6 mymrjob1.py  $articles

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/mymrjob1.e12132344.20220407.131009.391851
Running step 1 of 3...
Running step 2 of 3...
Running step 3 of 3...
job output is in /tmp/mymrjob1.e12132344.20220407.131009.391851/output
Streaming final output from /tmp/mymrjob1.e12132344.20220407.131009.391851/output...
"Accessories,SBC OWN,Womens Big accessories,Jewellery"
"Blouses,Despacito,Divided Collection,Blouse"
"Dressed,Mariette,Men Suits & Tailoring,Suit"
"Dresses Ladies,Bowie,Womens Everyday Collection,Dress"
"Dresses/Skirts girls,Elva Dress,Young Girl,Kids Girl Dresses"
"Jersey Basic,MY,Womens Everyday Basics,Jersey Basic"
"Jersey Fancy,Dragonfly dress,Young Girl,Jersey"
"Knitwear,TP Paddington Sweater,Womens Everyday Collection,Knitwear"
"Outdoor,SB Cliff vest TP,Womens Jackets,Outwear"
"Shirts,TP Princeton shirt,Kids Boy,Shirt"
"Shoes,Hannah basic ballerina SG,Womens Shoes,Kids Girl Shoes"
"Shorts,Pelle shor

Running a Hadoop job

In [41]:
! HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/  python3.6 mymrjob1.py -r hadoop hdfs://$articles > output.csv

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/bin...
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/bin/hadoop
Using Hadoop version 3.0.0
Looking for Hadoop streaming jar in /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/...
Found Hadoop streaming jar: /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
Creating temp directory /tmp/mymrjob1.e12132344.20220407.132617.229063
uploading working dir files to hdfs:///user/e12132344/tmp/mrjob/mymrjob1.e12132344.20220407.132617.229063/files/wd...
Copying other local files to hdfs:///user/e12132344/tmp/mrjob/mymrjob1.e12132344.20220407.132617.229063/files/
Running step 1 of 3...
  Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/hadoop-streaming-3.0.0-cdh6.3.2.j

---

- ### **b) Write a MapReduce job with all three datasets as input and following output:**  
For all customers older than 30 years, show the number of transaction items they were involved in that concern articles from a department named 'Jersey Basic' or 'Shirt'. 


Make sure to have the following format in your final output:

            customer_id,count_transactions
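
Task b) is a reduce-side join over three inputs, keyed first on `article_id` and then on `customer_id`. The sketch below simulates the two reduce phases in plain Python on hypothetical toy rows (headers omitted). The tag strings `IN_DEPT` and `OLDER_30` and all helper names are illustrative assumptions, not a required design.

```python
from collections import defaultdict

# Hypothetical toy rows in the column layouts of the three files
# (headers omitted; only the columns used by the join are filled in).
ARTICLES = [["a1"] + [""] * 14 + ["Shirt"] + [""] * 9,      # 25 columns
            ["a2"] + [""] * 14 + ["Trousers"] + [""] * 9]
CUSTOMERS = [["c1", "", "", "", "", "45", ""],               # 7 columns, age at 5
             ["c2", "", "", "", "", "25", ""],
             ["c3", "", "", "", "", "", ""]]                 # sparse age
TRANSACTIONS = [["2020-09-01", "c1", "a1", "0.05", "2"],     # 5 columns
                ["2020-09-01", "c1", "a1", "0.05", "2"],
                ["2020-09-02", "c1", "a2", "0.03", "1"],
                ["2020-09-02", "c2", "a1", "0.05", "2"],
                ["2020-09-03", "c3", "a1", "0.05", "1"]]


def shuffle(pairs):
    """Group (key, value) pairs by key, as the MapReduce shuffle would."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups.items()


def mapper(row):
    # identify the source file by its number of columns
    if len(row) == 25:
        if row[15] in ("Jersey Basic", "Shirt"):
            yield ("a", row[0]), "IN_DEPT"       # article tag
    elif len(row) == 5:
        yield ("a", row[2]), ("c", row[1])       # transaction: article -> customer
    elif len(row) == 7:
        if row[5] and int(row[5]) > 30:
            yield ("c", row[0]), "OLDER_30"      # customer tag


def reducer1(key, values):
    if key[0] == "c":
        for v in values:                         # pass customer tags through
            yield key, v
    elif "IN_DEPT" in values:                    # article in a wanted department
        for v in values:
            if v != "IN_DEPT":
                yield v, 1                       # v is ("c", customer_id)


def reducer2(key, values):
    if "OLDER_30" in values:
        count = sum(1 for v in values if v == 1)
        if count > 0:
            yield key[1], count


def run(rows):
    step1 = [kv for row in rows for kv in mapper(row)]
    step2 = [kv for k, vs in shuffle(step1) for kv in reducer1(k, vs)]
    return sorted(kv for k, vs in shuffle(step2) for kv in reducer2(k, vs))


print(run(ARTICLES + CUSTOMERS + TRANSACTIONS))  # [('c1', 2)]
```

Customer `c2` is too young, `c3` has no age, and the `a2` purchase is outside the two departments, so only the two `a1` items of `c1` are counted. Using distinct strings for the tags keeps the flags separate from the numeric counts in `reducer2`.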


In [55]:
%%file mymrjob2.py
# This will create a local file to run your MapReduce program

from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.util import log_to_stream, log_to_null
from mr3px.csvprotocol import CsvProtocol
import csv
import logging
import os


log = logging.getLogger(__name__)


#
#  Below is the skeleton for a MapReduce program in mrjob.
#  Write your own solution here. Be sure that it actually runs successfully.
class MyMRJob2(MRJob):
    OUTPUT_PROTOCOL = CsvProtocol  # write output as CSV

    @classmethod
    def set_up_logging(cls, quiet=False, verbose=False, stream=None):
        log_to_stream(name='mrjob', debug=verbose, stream=stream)
        log_to_stream(name='__main__', debug=verbose, stream=stream)

    #   Feel free to rename the functions
    def mapper_mrjob2(self, _, line):
        # parse CSV
        values = list(csv.reader([line]))[0]

        # articles
        if len(values) == 25:
            if values[0] == "article_id":
                return
            if values[15] == "Jersey Basic" or values[15] == "Shirt":
                yield ("a", values[0]), True

        # transactions
        elif len(values) == 5:
            if values[0] == "t_dat":
                return

            yield ("a", values[2]), ("c", values[1])

        elif len(values) == 7:
            if values[0] == "customer_id":
                return

            if values[5] != "" and int(values[5]) > 30:
                yield ("c", values[0]), True

    # use of a combiner is optional. It may speed up your job. Be sure that using the combiner preserves the correctness.
    #     def combiner_mrjob2(self,key,valuelist):
    # TODO

    def reducer_mrjob2(self, key, value_list):

        if key[0] == "c":
            for v in value_list:
                yield key, v
        else:
            in_depart = False
            new_keys = []
            for value in value_list:
                if isinstance(value, bool):
                    in_depart = True
                else:
                    new_keys.append(value)
            if in_depart:
                for new_key in new_keys:
                    yield new_key, 1

    # Job 2

    def second_reducer_mrjob2(self, key, value_list):
        transaction_sum = 0
        older_30 = False
        for value in value_list:
            if isinstance(value, bool):
                older_30 = True
            else:
                transaction_sum += 1
        if older_30 and transaction_sum > 0:
            yield None, [key[1], transaction_sum]



    def steps(self):
        first_step = MRStep(
            mapper=self.mapper_mrjob2,
            #             combiner=self.combiner_mrjob2,
            reducer=self.reducer_mrjob2
        )

        second_step = MRStep(
            reducer=self.second_reducer_mrjob2
        )
        # just generate more steps to run a multi-step MR job

        return [first_step, second_step]


if __name__ == '__main__':
    MyMRJob2.run()


Overwriting mymrjob2.py
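Between steps, mrjob serializes keys and values (by default with a JSON-based internal protocol), so a tuple yielded in one step arrives as a list in the next, while `True` stays a boolean. The second reducer depends on the latter: it separates the age flag (`True`) from the per-item count (`1`) with `isinstance(value, bool)`, because `True == 1` in Python. A quick check with the standard `json` module, used here only as an approximation of that protocol:

```python
import json


def roundtrip(obj):
    """Encode and decode with JSON, approximating mrjob's inter-step serialization."""
    return json.loads(json.dumps(obj))


key = roundtrip(("c", "customer_123"))  # hypothetical customer id
flag = roundtrip(True)                  # "customer is older than 30" marker
count = roundtrip(1)                    # one matching transaction item

print(key)                                              # ['c', 'customer_123']
print(flag == count)                                    # True
print(isinstance(flag, bool), isinstance(count, bool))  # True False
```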


In [13]:
! python3.6 mymrjob2.py  $articles $transactions_small $customers

No configs specified for inline runner
"000da7cae0959d00f079d2d36f8cd7065fc91c685cd9e9b44c5f8052b03fe285",1
"001c1f8d70782f450524d3b3f404474dbd4a7d0d2ad78a0ec7db76248ce08346",1
"0021ce2be745fe55f9b2bc784dbfd0e374c315ccd15aab3343edd335203edf3d",1
"003cbebaf7fa9783dfaec0c0f7b0ccca210d4ebb42be547d93e0e8e91246c370",1
"004998d68605c42feef0fe1914569bceeeae6de0e300bb68f98ab6e3823994d9",1
"0077b3341cb4d72cfc3854fa9c2c607e42aa8279181e0a52f8c3bfc74b1d8588",1
"007adf119774c45757e7452d3bd71ff2a789af95b0b5f9f3c70b59e45030c062",1
"009514e242e70ab02326b52600b4b117a412eb21093b94fef93effe5a6935981",1
"00ac5119d21281802ecd309e8c7fa0b598e0be861f21caadefd9bc8cf3f87eed",1
"00b6ec8613e51d8eadc5157f5a12ae1366ca29da7d44cc8d0ae1e667dea6f268",1
"00c9667f054f0584c1068505bed6737a6e4bfce73656f3e56449e59c3f545178",1
"00edd27a63db7b6a0237cd258976dd0e7f1b79a2aaba3fcdcd80abdccf01af9d",1
"0103f9ab1ef1592e19226a7f37a61ad6078d055d2e5516a15a78dd227f54b4ce",1
"01065704cb2f5284fa9fbabdd4e7b8a84df555660ba533c45253b4383815dfe

In [56]:
! HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/  python3.6  mymrjob2.py -r hadoop hdfs://$articles hdfs://$transactions hdfs://$customers > output2.csv

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/bin...
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/bin/hadoop
Using Hadoop version 3.0.0
Looking for Hadoop streaming jar in /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/...
Found Hadoop streaming jar: /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar
Creating temp directory /tmp/mymrjob2.e12132344.20220407.133049.646963
uploading working dir files to hdfs:///user/e12132344/tmp/mrjob/mymrjob2.e12132344.20220407.133049.646963/files/wd...
Copying other local files to hdfs:///user/e12132344/tmp/mrjob/mymrjob2.e12132344.20220407.133049.646963/files/
Running step 1 of 2...
  Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
  packageJobJar: [] [/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/hadoop-streaming-3.0.0-cdh6.3.2.j

---

- ### **c) Once your jobs have run successfully on the cluster, use the provided commands in the notebook to look up the counters of your MapReduce job(s).**  
Alternatively, you can read the counters from the output cells above after a job has run successfully on the cluster.
Use the counters to determine, for each job, the replication rate as well as the input and output sizes. Note: for this you need the job IDs, which are shown in the output when a job runs.  

**Note:** _Be sure to replace the dummy job ID below with the real one you get after running it on the cluster!_


**Info on the relevant counters**:

MAP_INPUT_RECORDS  .... _the total number of input records received by all mappers_  
MAP_OUTPUT_RECORDS  .... _the total number of output records emitted by all mappers_  
MAP_OUTPUT_BYTES  .... _the total size in bytes of the records output by all mappers_  
BYTES_READ  .... _the number of bytes the MapReduce job read from the file system_  
BYTES_WRITTEN  .... _the number of bytes the MapReduce job wrote to the file system_
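
The replication rate of a step is the average number of key-value pairs the mappers emit per input record, i.e. MAP_OUTPUT_RECORDS divided by MAP_INPUT_RECORDS. A minimal helper (hypothetical, not part of the template), applied to the counter values reported in this notebook:

```python
def replication_rate(map_input_records: int, map_output_records: int) -> float:
    """Average number of map outputs per map input record for one MR step."""
    if map_input_records == 0:
        raise ValueError("step read no input records")
    return map_output_records / map_input_records


# MAP_INPUT_RECORDS and MAP_OUTPUT_RECORDS as queried in the cells of this section
print(replication_rate(2314690, 2314690))  # 1.0
```

A mapper that emits three pairs per data row, like the first step of Task a), gives a rate of 3 (ignoring the skipped header line).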

If you want to use more counters, you can find a list in the documentation:  
https://hadoop.apache.org/docs/r2.10.1/api/org/apache/hadoop/mapreduce/TaskCounter.html  
https://hadoop.apache.org/docs/r2.10.1/api/org/apache/hadoop/mapreduce/lib/input/FileInputFormatCounter.html  
https://hadoop.apache.org/docs/r2.10.1/api/org/apache/hadoop/mapreduce/lib/output/FileOutputFormatCounter.html

In [61]:
job_id = "job_1647527369838_1310" # this is only a dummy value, replace with the job id you want to know about
!mapred job -counter $job_id org.apache.hadoop.mapreduce.TaskCounter MAP_INPUT_RECORDS

22/04/07 15:42:03 INFO client.RMProxy: Connecting to ResourceManager at c100.local/10.7.0.100:8032
22/04/07 15:42:04 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2314690


In [62]:
!mapred job -counter $job_id org.apache.hadoop.mapreduce.TaskCounter MAP_OUTPUT_RECORDS

22/04/07 15:42:06 INFO client.RMProxy: Connecting to ResourceManager at c100.local/10.7.0.100:8032
22/04/07 15:42:07 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
2314690


In [63]:

!mapred job -counter $job_id org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter BYTES_READ

22/04/07 15:42:08 INFO client.RMProxy: Connecting to ResourceManager at c100.local/10.7.0.100:8032
22/04/07 15:42:09 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
178097059


In [64]:
!mapred job -counter $job_id org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter BYTES_WRITTEN

22/04/07 15:42:11 INFO client.RMProxy: Connecting to ResourceManager at c100.local/10.7.0.100:8032
22/04/07 15:42:12 INFO mapred.ClientServiceDelegate: Application state is completed. FinalApplicationStatus=SUCCEEDED. Redirecting to job history server
17092019
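To collect several counters per job without copying numbers by hand, the same `mapred job -counter` call can be wrapped in Python. This is a sketch under assumptions: the counter value is printed on its own line after any log lines (as in the outputs above), and `read_counter`/`parse_counter_output` are hypothetical helper names. `universal_newlines` is used instead of `text=` to stay compatible with Python 3.6.

```python
import subprocess


def parse_counter_output(text: str) -> int:
    """Return the counter value: the last output line that is an integer."""
    for line in reversed(text.strip().splitlines()):
        line = line.strip()
        if line.isdigit():
            return int(line)
    raise ValueError("no counter value found in output")


def read_counter(job_id: str, group: str, counter: str) -> int:
    """Run `mapred job -counter` (as in the cells above) and parse its result."""
    proc = subprocess.run(
        ["mapred", "job", "-counter", job_id, group, counter],
        stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
        universal_newlines=True, check=True,
    )
    return parse_counter_output(proc.stdout)


# On the cluster: read_counter(job_id, "org.apache.hadoop.mapreduce.TaskCounter",
#                              "MAP_INPUT_RECORDS")
SAMPLE = ("22/04/07 15:42:03 INFO client.RMProxy: Connecting to ResourceManager "
          "at c100.local/10.7.0.100:8032\n2314690\n")
print(parse_counter_output(SAMPLE))  # 2314690
```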


### Replication Rates and Input and Output Sizes:

* Replication Rate for Task 1: 3, 1, 1
* Communication Cost for Task 1: ((36151869 + 2488054) + (2489211 + 3511) + (5267 + 1240)) B = 41139152 B $\approx$ 41 MB
* Input Size for Task 1: 36151869 B $\approx$ 36 MB
* Output Size for Task 1: 1240 B = 1.24 kB

Note: if your job has multiple steps, state the replication rate for each step. Make sure to compute the cumulative costs for the other measures, though. For the communication cost of a multi-step MR job, do not count intermediate output sizes. For more information, see the "Clarification" section of the Preview slides for Block 2.


* Replication Rate for Task 2: $\approx$ 1, 1
* Communication Cost for Task 2: ((3732969913 + 178067269) + (178097059 + 17092019)) B $\approx$ 4.1 GB
* Input Size for Task 2: 3732969913 B $\approx$ 3.7 GB
* Output Size for Task 2: 17092019 B $\approx$ 17 MB
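
The sums above can be re-checked with a few lines of Python. The per-step byte pairs are copied from the communication-cost formulas above; the sketch only verifies the arithmetic and the decimal (SI) unit conversion, not which counters belong in the formula.

```python
# (bytes, bytes) pairs per step, copied from the communication-cost formulas above
task1_steps = [(36151869, 2488054), (2489211, 3511), (5267, 1240)]
task2_steps = [(3732969913, 178067269), (178097059, 17092019)]


def total_bytes(steps):
    """Sum all byte counts over all steps of a multi-step job."""
    return sum(a + b for a, b in steps)


def human(n_bytes):
    """Format a byte count with decimal units, e.g. 1240 -> '1.24 kB'."""
    value, unit = float(n_bytes), "B"
    for next_unit in ("kB", "MB", "GB", "TB"):
        if value < 1000:
            break
        value, unit = value / 1000, next_unit
    return f"{value:.3g} {unit}"


print(total_bytes(task1_steps), human(total_bytes(task1_steps)))  # 41139152 41.1 MB
print(total_bytes(task2_steps), human(total_bytes(task2_steps)))  # 4106226260 4.11 GB
```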

For more counters, you can check the documentation, under https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/TaskCounter and https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/lib/output/FileOutputFormatCounter.html

---
## **Your solution for Exercise 3 will consist of:**  
*  This notebook, filled with your solution, including the information on the replication rate, and the input and output sizes. 
