# BDCC HW: Task 3

Task: Implement a MapReduce job that identifies the 100 most followed users in the dataset.

Hint: We do not need to build lists of followers here; it is enough to count the followers of each user in the Reduce phase. This count is called the in-degree of a user. In addition, a temporary data structure D with a fixed capacity of 100 entries is required. D is initially filled with the first 100 users processed in the Reduce phase. Each subsequent user (the 101st, 102nd, 103rd, …) replaces the least-followed user in D, but only if its in-degree is greater than that user's in-degree. The ideal data structure for D is a min-heap (https://docs.python.org/3/library/heapq.html). However, a dictionary or a list is also acceptable if used appropriately.
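
The replacement rule described in the hint can be sketched outside MapReduce as follows. This is a minimal illustration, not the assignment solution: the function name `top_k_by_in_degree` and the sample data are hypothetical, and the input is assumed to be an iterable of `(user, in_degree)` pairs that have already been counted.

```python
import heapq


def top_k_by_in_degree(in_degrees, k=100):
    """Return the k (in_degree, user) pairs with the largest in-degree.

    `in_degrees` is any iterable of (user, in_degree) pairs (hypothetical input).
    """
    if k <= 0:
        return []
    heap = []  # min-heap D: heap[0] is the least-followed user kept so far
    for user, degree in in_degrees:
        if len(heap) < k:
            # D is not full yet: keep the user unconditionally.
            heapq.heappush(heap, (degree, user))
        elif degree > heap[0][0]:
            # Strictly more followers than the current minimum: replace it.
            heapq.heapreplace(heap, (degree, user))
    # The heap itself is not sorted; sort descending only for presentation.
    return sorted(heap, reverse=True)


sample = [("a", 3), ("b", 10), ("c", 1), ("d", 7), ("e", 5)]
print(top_k_by_in_degree(sample, k=3))  # [(10, 'b'), (7, 'd'), (5, 'e')]
```

Because the heap never holds more than `k` entries, each user costs at most O(log k) work, which is why the hint prefers a min-heap over rescanning a list or dictionary for its minimum.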

In [1]:
%%file src/task3.py
#!/usr/bin/env python3

from mrjob.job import MRJob
from mrjob.step import MRStep

import heapq

TOP_FOLLOWERS = 100

# Implement a MapReduce job that identifies the TOP_FOLLOWERS most followed users in the dataset.
class MostFollowed(MRJob):

    # Arg 1: self: the job instance
    # Arg 2: Input key to the map function (here: None)
    # Arg 3: Input value to the map function (here: one line from the input file)
    def mapper(self, _, line):
        # Each line holds "follower followee"; yield a (followee, 1) pair
        (follower, followee) = line.split()
        yield (followee, 1)


    def combiner(self, followee, follower_count):
        # yield (followee, sum of followers)
        yield(followee, sum(follower_count))


    def reducer_init(self):
        self.heap = []


    # Arg 1: self: the job instance
    # Arg 2: Input key to the reduce function (here: the followee emitted by the mapper)
    # Arg 3: Input value to the reduce function (here: a generator over ALL counts
    # emitted for that key; these may be partial sums if the combiner ran)
    def reducer(self, followee, follower_count):
        # Push (in-degree, followee) so the heap is ordered by in-degree
        heapq.heappush(self.heap, (sum(follower_count), followee))

        # Keep at most TOP_FOLLOWERS entries by evicting the current minimum
        if len(self.heap) > TOP_FOLLOWERS:
            heapq.heappop(self.heap)


    def reducer_final(self):
        for (follower_count, followee) in self.heap:
            yield (followee, follower_count)


    # Step 2: merge the candidates from all reducers into the global top TOP_FOLLOWERS.
    # The mapper emits every candidate under one shared key so that a single reducer sees them all.
    # The count comes first in the value so heapq.nlargest() can compare the values directly.
    def top_mapper(self, followee, follower_count):
        yield (str(TOP_FOLLOWERS), (follower_count, followee))


    # Yield the TOP_FOLLOWERS largest (follower_count, followee) values as (followee, follower_count).
    def top_reducer(self, _, follower_counts):
        for follower_count in heapq.nlargest(TOP_FOLLOWERS, follower_counts):
            yield (follower_count[1], follower_count[0])


    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   combiner=self.combiner,
                   reducer_init=self.reducer_init,
                   reducer=self.reducer,
                   reducer_final=self.reducer_final
                   ),

            MRStep(mapper=self.top_mapper,
                   reducer=self.top_reducer) 
        ]


if __name__ == '__main__':
    MostFollowed.run()


Overwriting src/task3.py
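
The second step matters when the job runs with more than one reducer: each reducer task keeps its own heap, so step 1 yields one local top-100 list per reducer rather than a single global list. The sketch below imitates that merge in plain Python. The function name `merge_partial_tops` and the split of users across two reducers are hypothetical; the counts are taken from the standalone output in this notebook. The pairs are written as lists because, assuming mrjob's default JSON protocol between steps, the tuples emitted by `top_mapper` reach `top_reducer` as lists.

```python
import heapq


def merge_partial_tops(partial_tops, k=100):
    """Merge per-reducer [count, followee] lists into one global top-k list."""
    # Flatten all local candidate lists into a single stream of pairs.
    candidates = (pair for part in partial_tops for pair in part)
    # nlargest compares the pairs element by element, so the count decides
    # first and the followee ID only breaks ties.
    return heapq.nlargest(k, candidates)


# Hypothetical assignment of users to two reducers.
reducer_a = [[4256, "663931"], [963, "155432"]]
reducer_b = [[3137, "482709"], [2602, "663560"]]
print(merge_partial_tops([reducer_a, reducer_b], k=3))
# [[4256, '663931'], [3137, '482709'], [2602, '663560']]
```

Funnelling every candidate through one key limits step 2 to a single reducer, which is acceptable here because step 1 forwards at most 100 pairs per reducer.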


### Run in Standalone Mode

In [2]:
!python3 src/task3.py data/graph.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/task3.bdccuser.20210608.111609.612780
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/task3.bdccuser.20210608.111609.612780/output
Streaming final output from /tmp/task3.bdccuser.20210608.111609.612780/output...
"663931"	4256
"482709"	3137
"663560"	2602
"357531"	1970
"1034018"	1599
"664789"	1592
"663606"	1491
"155432"	963
"622420"	956
"280436"	907
"115241"	871
"665990"	836
"664320"	828
"115674"	812
"1120568"	798
"650922"	787
"666763"	785
"681308"	716
"402217"	698
"515984"	692
"108624"	664
"682469"	663
"681398"	639
"670310"	625
"682734"	618
"663683"	609
"280408"	607
"670353"	576
"655586"	572
"663571"	548
"401877"	543
"667359"	538
"670923"	521
"1032030"	503
"17405"	501
"663852"	499
"663534"	499
"656427"	499
"670279"	494
"254927"	492
"1095899"	491
"667116"	481
"642336"	465
"678519"	458
"663579"	458
"480826"	446
"105478"	428
"686285"	422
"697499"	417
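
One way to sanity-check these counts is to recompute the in-degrees in memory with `collections.Counter`. This is only a sketch: it assumes the edge list fits in memory and that each line has the form `follower followee`, as the mapper expects. The function name `in_degree_top` and the tiny edge list are hypothetical.

```python
from collections import Counter


def in_degree_top(lines, k=100):
    """Count followers per followee and return the k most followed users."""
    counts = Counter()
    for line in lines:
        parts = line.split()
        if len(parts) != 2:
            # Skip blank or malformed lines instead of failing.
            continue
        follower, followee = parts
        counts[followee] += 1
    return counts.most_common(k)


edges = ["1 2", "3 2", "1 3", "", "4 2"]
print(in_degree_top(edges, k=2))  # [('2', 3), ('3', 1)]
```

To compare against the job, the same function could be called on the opened `data/graph.txt` file. Users with equal counts may appear in a different order than in the job output, because `most_common` and `heapq.nlargest` break ties differently.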


### Run on the Hadoop cluster in fully or pseudo-distributed mode

In [3]:
!python3 src/task3.py -r hadoop data/graph.txt -o task3_output

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /home/hdoop/hadoop-3.2.1/bin...
Found hadoop binary: /home/hdoop/hadoop-3.2.1/bin/hadoop
Using Hadoop version 3.2.1
Looking for Hadoop streaming jar in /home/hdoop/hadoop-3.2.1...
Found Hadoop streaming jar: /home/hdoop/hadoop-3.2.1/share/hadoop/tools/lib/hadoop-streaming-3.2.1.jar
Creating temp directory /tmp/task3.bdccuser.20210608.112139.337160
uploading working dir files to hdfs:///user/bdccuser/tmp/mrjob/task3.bdccuser.20210608.112139.337160/files/wd...
Copying other local files to hdfs:///user/bdccuser/tmp/mrjob/task3.bdccuser.20210608.112139.337160/files/
Running step 1 of 2...
  packageJobJar: [/tmp/hadoop-unjar6352124431624138651/] [] /tmp/streamjob4544271477495990304.jar tmpDir=null
  Connecting to ResourceManager at /127.0.0.1:8032
  Connecting to ResourceManager at /127.0.0.1:8032
  Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/bdccuser/.sta

### Copy the output from HDFS to the local file system

In [4]:
!hdfs dfs -copyToLocal task3_output /home/bdccuser/bdcc-assignment1/output/task3

2021-06-08 14:24:43,242 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
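
Once the directory has been copied, the results can be loaded back into Python. The sketch below assumes the job used mrjob's default output format, where each line holds a JSON-encoded key and a JSON-encoded value separated by a tab (as in the standalone output shown earlier), and that the copied directory contains files named `part-*`. The helper names `parse_output_line` and `load_results` are hypothetical.

```python
import json
from pathlib import Path


def parse_output_line(line):
    """Split one 'key<TAB>value' line and decode both JSON fields."""
    key_json, value_json = line.rstrip("\n").split("\t", 1)
    return json.loads(key_json), json.loads(value_json)


def load_results(directory):
    """Read every part-* file in `directory` into a list of (user, count)."""
    results = []
    for part in sorted(Path(directory).glob("part-*")):
        with part.open() as handle:
            results.extend(parse_output_line(line) for line in handle if line.strip())
    return results


print(parse_output_line('"663931"\t4256\n'))  # ('663931', 4256)
```

For example, `load_results("/home/bdccuser/bdcc-assignment1/output/task3")` would return the list of pairs if the directory layout matches the assumption above.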
