# Homework 2 - MapReduce

There is only one task in this homework. You are asked to implement the Social Triangle example discussed in class. In particular, given the email dataset, please list all "reciprocal" relationships in the company. Recall that:

If A emails B and B emails A, then A and B is *reciprocal*.

If A emails B but B doesn’t email A, then A and B is *directed*.

**Dataset:** We will use a subset of the open [Enron Email Dataset](https://www.cs.cmu.edu/~./enron/ "Enron Email Dataset"), which contains approximately 10,000 simplified email headers from the Enron Corporation. A subset of the data is available as **enron_mails_small.csv** as part of this notebook. The file contains 3 columns *Date*, *From*, and *To*. Their description is as follows:

|Column name|Description|
|--|--|
|Date |The date and time of the email, in the format YYYY-MM-DD hh-mm-ss, <br />e.g. "1998-10-30 07:43:00" |
|From |The sender email address, <br />e.g. "mark.taylor@enron.com" |
|To | A list of recipients' email addresses separated by semicolons ';', <br />e.g. "jennifer.fraser@enron.com;jeffrey.hodge@enron.com" |

Note that, we only care about users employed by Enron, i.e. only relationships where email addresses end with *'@enron.com'*.

The expected output is also provided below. For each reciprocal relationship, please output a tuple consisting of two strings. The first one is always **'reciprocal'**. And the second one is a string showing the name of the two person in the following format: **'Jane Doe : John Doe'**. The names should be presented in the lexical order, i.e. there will not be a 'John Doe : Jane Doe' since 'Jane' is ordered before 'John.

Though the dataset only contains email addresses, not actual names, we're assuming that the email aliases were created based on their name. For example:

|Email Address|Converted Name|
|--|--|
|mark.taylor@enron.com|Mark Taylor|
|alan.aronowitz@enron.com|Alan Aronowitz|
|marc.r.cutler@enron.com|Marc R Cutler|
|hugh@enron.com|Hugh|

You must complete the **MRFindReciprocal** class below (which is inherited from MRJob), and your code must run with the **mapreduce.py** package **mr.runJob()** as provided.

In [1]:
!pip install mrjob

from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

Collecting mrjob
[?25l  Downloading https://files.pythonhosted.org/packages/8e/58/fc28ab743aba16e90736ad4e29694bd2adaf7b879376ff149306d50c4e90/mrjob-0.7.4-py2.py3-none-any.whl (439kB)
[K     |▊                               | 10kB 2.3MB/s eta 0:00:01[K     |█▌                              | 20kB 3.5MB/s eta 0:00:01[K     |██▎                             | 30kB 3.6MB/s eta 0:00:01[K     |███                             | 40kB 2.5MB/s eta 0:00:01[K     |███▊                            | 51kB 3.0MB/s eta 0:00:01[K     |████▌                           | 61kB 3.5MB/s eta 0:00:01[K     |█████▏                          | 71kB 3.7MB/s eta 0:00:01[K     |██████                          | 81kB 3.9MB/s eta 0:00:01[K     |██████▊                         | 92kB 4.2MB/s eta 0:00:01[K     |███████▌                        | 102kB 4.3MB/s eta 0:00:01[K     |████████▏                       | 112kB 4.3MB/s eta 0:00:01[K     |█████████                       | 122kB 4.3MB/s eta 0:0

In [2]:
drive.CreateFile({'id': '1sq4-zXn2Z82mdLSBBegEgsUsfqtgza-C'}).GetContentFile('mapreduce.py')
drive.CreateFile({'id': '1It6GP8O2JqkmUtZKbYp1kpwpuwOXlLps'}).GetContentFile('enron_mails_small.csv')

In [122]:
import csv
import mapreduce as mr
from mrjob.job import MRJob
from mrjob.step import MRStep

################################
### YOUR WORK SHOULD BE HERE ###
################################
class MRFindReciprocal(MRJob):
  '''
  PLEASE COMPLETE THIS CLASS. THIS SHOULD BE THE ONLY PLACE THAT YOU CAN EDIT.
  THE INPUT OF YOUR MAPREDUCE JOB WOULD BE LINE OF TEXT WITHOUT '\n'.
  '''
  
  def mapper1(self, _, line):

    values = line.strip().split(',')
    from_name = values[1]
    To_name = values[2]
    from_name_list = from_name.split(',')

    for sender in from_name_list :
      if "@enron.com" in sender:
        from_name = sender.split('@')[0] 
        from_fname = from_name.split('.')[0] 
        from_lname = from_name.split('.')[-1]
      #from_lname= from_name.split('.')] 
        from_name = from_fname.capitalize()  +" "+ from_lname.capitalize() 
        To_list = To_name.split(';')
        for item in To_list: 
          if "@enron.com" in item :
            To_name = item.split('@')[0] 
            To_name_fname  = To_name.split('.')[0]
            To_name_lname = To_name.split('.')[-1]
        #To_name_lname= To_name.split('.')[1]
            To_name =To_name_fname.capitalize()  +" "+ To_name_lname.capitalize() 
            yield ([from_name,To_name])

  def reducer1(self, from_name,To_name):
    yield (from_name, set(To_name))

  def __init__(self,args=[]):
    self.storage = {}

  def mapper2(self , sender , recivers):
    self.storage[sender] = recivers
    #dictionary.values() = recivers
    for reciver in recivers : 
      yield (reciver,sender)

  def reducer2(self,reciver,sender):
    if reciver in self.storage :
      setone_settwo = (set(sender),self.storage[reciver])
      yield (reciver, setone_settwo)
   # else:
    #  yield (reciver, set(sender) , [])

  def mapper3(self,reciver,setone_settwo):
    setone , settwo = setone_settwo

    for y in setone:
      if y in settwo:
       # print('reciprocal', reciver+''+ y)
        tuple =(reciver, y)
        first , second = sorted(tuple)
        yield 'reciprocal', first + ':' + second
       #yield (reciver+':'+ y)
  
   
  def reducer3(slf ,Recep, str_y1_y2 ):
   # str_y1_y2 = list(str_y1_y2)
   
    str_y1_y2 = sorted(set(str_y1_y2))
    
    for any in str_y1_y2:
      first, second = any.split(':')
      if first != second :
        yield 'reciprocal',(any)

  def steps(self):
      return [MRStep(mapper=self.mapper1, reducer=self.reducer1),
              MRStep(mapper_init= self.__init__ , mapper=self.mapper2, reducer=self.reducer2),
              MRStep(mapper=self.mapper3 ,reducer=self.reducer3)
             ]


###################################
### DO NOT EDIT BELOW THIS LINE ###
###################################
job = MRFindReciprocal(args=[])
with open('enron_mails_small.csv', 'r') as fi:
  next(fi)
  output = list(mr.runJob(enumerate(map(lambda x: x.strip(), fi)), job))

print(len(output))
output

35


[('reciprocal', 'Brenda Whitehead:Elizabeth Sager'),
 ('reciprocal', 'Carol Clair:Debra Perlingiere'),
 ('reciprocal', 'Carol Clair:Mark Taylor'),
 ('reciprocal', 'Carol Clair:Richard Sanders'),
 ('reciprocal', 'Carol Clair:Sara Shackleton'),
 ('reciprocal', 'Carol Clair:Tana Jones'),
 ('reciprocal', 'Debra Perlingiere:Kevin Ruscitti'),
 ('reciprocal', 'Drew Fossum:Susan Scott'),
 ('reciprocal', 'Elizabeth Sager:Janette Elbertson'),
 ('reciprocal', 'Elizabeth Sager:Mark Haedicke'),
 ('reciprocal', 'Elizabeth Sager:Mark Taylor'),
 ('reciprocal', 'Elizabeth Sager:Richard Sanders'),
 ('reciprocal', 'Eric Bass:Susan Scott'),
 ('reciprocal', 'Fletcher Sturm:Greg Whalley'),
 ('reciprocal', 'Fletcher Sturm:Sally Beck'),
 ('reciprocal', 'Gerald Nemec:Susan Scott'),
 ('reciprocal', 'Grant Masson:Vince Kaminski'),
 ('reciprocal', 'Greg Whalley:Richard Sanders'),
 ('reciprocal', 'Janette Elbertson:Mark Taylor'),
 ('reciprocal', 'Janette Elbertson:Richard Sanders'),
 ('reciprocal', 'Liz Taylor:Mar