# Coursework 2: Data Processing

## Task 1
This coursework will assess your understanding of using NoSQL to store and retrieve data.  You will perform operations on data from the Enron email dataset in a MongoDB database, and write a report detailing the suitability of different types of databases for data science applications.  You will be required to run code to answer the given questions in the Jupyter notebook provided, and write a report describing alternative approaches to using MongoDB.

Download the JSON version of the Enron data (using the “Download as zip” to download the data file from http://edshare.soton.ac.uk/19548/, the file is about 380MB) and import into a collection called messages in a database called enron.  You do not need to set up any authentication.  In the Jupyter notebook provided, perform the following tasks, using the Python PyMongo library.

Answers should be efficient in terms of speed.  Answers which are less efficient will not get full marks.

In [1]:
# I used MongoDB v4.0.3 in this coursework

In [1]:
import pymongo
from pymongo import MongoClient
from datetime import datetime
from pprint import pprint
import pandas as pd
import re

In [2]:
# %%bash
# mongoimport --db enron --collection messages --file ./data/messages.json

### 1)
Write a function which returns a MongoDB connection object to the "messages" collection. [4 points] 

In [3]:
def get_collection():
    """
    Connects to the server, and returns a collection object
    of the `messages` collection in the `enron` database
    """
    # YOUR CODE HERE
    client = MongoClient('mongodb://localhost:27017')
    db = client.enron
    collection = db.messages

    return collection

In [4]:
collection = get_collection()

### 2)

Write a function which returns the amount of emails in the messages collection in total. [4 points] 

In [5]:
def get_amount_of_messages(collection):
    """
    :param collection A PyMongo collection object
    :return the amount of documents in the collection
    """    
    # YOUR CODE HERE
    messages_amount = collection.count_documents({})
    
    return messages_amount

In [6]:
# Test
messages_amount = get_amount_of_messages(collection)
print('The amount of messages is: ', messages_amount)

The amount of messages is:  501513


### 3) 

Write a function which returns each person who was BCCed on an email.  Include each person only once, and display only their name according to the X-To header. [4 points] 



In [21]:
def get_bcced_people(collection):
    """
    :param collection A PyMongo collection object
    :return the names of the people who have received an email by BCC
    """    
    # YOUR CODE HERE
    filter = {'headers.Bcc' : {'$exists' : True}}
    cursor = collection.find(filter)
    names = cursor.distinct('headers.X-bcc')
     
    bcced_people = set([])
    for name in names:
        if name != '':
            name = name.split('>,')
            for item in name:
                if item != '':
                    pattern = re.compile(r'[a-zA-Z, .]+')
                    item = pattern.match(item).group(0).strip()
                    bcced_people.add(item)
    
    return  bcced_people

In [22]:
# Test
bcced_people = get_bcced_people(collection)
print('Number of people Bcc\'ed: ',len(bcced_people),'\n')
pprint(bcced_people)

Number of people Bcc'ed:  58 

{'Arora, Harry',
 'Audrey Robertson',
 'Barrow, Cynthia',
 'Batista, Daniel',
 'Beck, Sally',
 'Benson, Robert',
 'Beth Apollo',
 'Carr, James',
 'Castano, Marianne',
 'Cebryk, Doug',
 'Choate, Heather',
 'Daily, Pamela',
 'Davis, Mark Dana',
 'Denny, Jennifer',
 'Dinari, Sabra L.',
 'Eisenstein, Arnold L.',
 'Ellenberg, Mark',
 'Evans, Carolyn',
 'Gadd, Eric',
 'Germany, Chris',
 'Gray, Barbara N.',
 'Greg Piper',
 'Harris, Steven',
 'Heard, Marie',
 'Issler, Paulo',
 'Jason Sokolov',
 'Joannie Williamson',
 'Kaminski, Vince J',
 'Kilmer III, Robert',
 'Lagrasta, Fred',
 'Lay, Kenneth',
 'Lee, Bob',
 'Majed Nachawati',
 'Margaret Daffin',
 'Mark Breese',
 'Mark Palmer',
 'Palmer, Mark A.',
 'Parks, Joe',
 'Patti Thompson',
 'Phil Lowry',
 'Rahaim, Christian',
 'Ratliff, Dale',
 'Robertson, Audrey',
 'Sanchez, Christina',
 'Schaffer, Brian',
 'Schoolcraft, Darrell',
 'Schwieger, Jim',
 'Steve Hotte',
 'Steven J Kean',
 'Storey, Geoff',
 'Sturm, Fletcher J

### 4)

Write a function with parameter subject, which gets all emails in a thread with that parameter, and orders them by date (ascending). “An email thread is an email message that includes a running list of all the succeeding replies starting with the original email.”, check for detail descriptions at https://www.techopedia.com/definition/1503/email-thread [4 points]

In [9]:
def get_emails_in_thread(collection, subject):
    """
    :param collection A PyMongo collection object
    :return All emails in the thread with that subject
    """    
    # YOUR CODE HERE
    match = {
        "$match":{
            "headers.Subject" :
            {"$regex": subject}
        }
    }
    project = {
        "$addFields":
            {
                "new_date":{
                    "$dateFromString":{
                        "dateString": {"$substr":["$headers.Date", 0, 30]}
                        }
                    }
            }
    }
    sort = {
        "$sort": {
            "new_date": pymongo.ASCENDING
        }
    }
    
    cursor = collection.aggregate([match,project,sort])
    
    return cursor

In [10]:
# Test
subject = 'check it out'
emails_by_subject = get_emails_in_thread(collection, subject)
# print(len(emails_by_subject), ' results.\n')
for item in emails_by_subject:
    pprint(item)

{'_id': ObjectId('4f16fc98d1e2d3237100435a'),
 'body': 'http://www.telski.com/tandp/extras/photo_of_day/index.html',
 'filename': '76.',
 'headers': {'Content-Transfer-Encoding': '7bit',
             'Content-Type': 'text/plain; charset=us-ascii',
             'Date': 'Tue, 14 Nov 2000 07:09:00 -0800 (PST)',
             'From': 'eric.bass@enron.com',
             'Message-ID': '<27972472.1075854684357.JavaMail.evans@thyme>',
             'Mime-Version': '1.0',
             'Subject': 'check it out',
             'To': 'shanna.husser@enron.com, '
                   'dfranklin@hanovermeasurement.com, \r\n'
                   '\tjason.bass2@compaq.com, daphneco64@bigplanet.com, \r\n'
                   '\tlwbthemarine@bigplanet.com',
             'X-FileName': 'ebass.nsf',
             'X-Folder': '\\Eric_Bass_Dec2000\\Notes Folders\\Sent',
             'X-From': 'Eric Bass',
             'X-Origin': 'Bass-E',
             'X-To': 'Shanna Husser, dfranklin@hanovermeasurement.com, '
     

### 5)

Write a function which returns the percentage of emails sent on a weekend (i.e., Saturday and Sunday) as a `float` between 0 and 1. [6 points]

In [11]:
def get_percentage_sent_on_weekend(collection):
    """
    :param collection A PyMongo collection object
    :return A float between 0 and 1
    """    
    # YOUR CODE HERE
    filter = {'headers.Date' : {'$regex' : '^(Sat|Sun)'}}
    num_sent_on_weekend = collection.count_documents(filter)
    num_sum = collection.count_documents({})
    percentage = num_sent_on_weekend / num_sum
    
    return percentage

In [12]:
# Test
percentage_sent_on_weekend = get_percentage_sent_on_weekend(collection)
print('Percentage sent on weekend is: ', percentage_sent_on_weekend)

Percentage sent on weekend is:  0.04005479419277267


### 6)

Write a function with parameter limit. The function should return for each email account: the number of emails sent, the number of emails received, and the total number of emails (sent and received). Use the following format: [{"contact": "michael.simmons@enron.com", "from": 42, "to": 92, "total": 134}] and the information contained in the To, From, and Cc headers. Sort the output in descending order by the total number of emails. Use the parameter limit to specify the number of results to be returned. If limit is null, the function should return all results. If limit is higher than null, the function should return the number of results specified as limit. limit cannot take negative values. [10 points]

In [13]:
def get_emails_between_contacts(collection,limit):
    """
    Shows the communications between contacts
    Sort by the descending order of total emails using the To, From, and Cc headers.
    :param `collection` A PyMongo collection object    
    :param `limit` An integer specifying the amount to display, or
    if null will display all outputs
    :return A list of objects of the form:
    [{
        'contact': <<Another email address>>
        'from': 
        'to': 
        'total': 
    },{.....}]
    """    
    # YOUR CODE HERE
    # Project
    project = {
        '$project':{
            "From": "$headers.From",
            "To": {"$split":["$headers.To", ","]},
            "Cc": {"$split":["$headers.Cc", ","]}
            }
     }
    unwind_to = {
         "$unwind":"$To"
         }
    unwind_cc = {
         "$unwind":"$Cc"
         }
    # number of from
    group_from = {
        "$group":{
            "_id": "$From",
            "from": {"$sum":1}
        }
    }
    num_from = collection.aggregate([project,group_from])
    # Number of To
    group_to = {
         "$group": {
             "_id": {"$trim":{"input":"$To"}},
             "to": {"$sum":1}
        }
    }
    num_to = collection.aggregate([project,unwind_to,group_to])
    # Number of Cc
    group_cc = {
        "$group":{
            "_id": {"$trim":{"input":"$Cc"}},
            "cc": {"$sum":1}
        }
    }
    num_cc = collection.aggregate([project,unwind_cc,group_cc])
    
    df_from = pd.DataFrame(list(num_from))
    df_to = pd.DataFrame(list(num_to))
    df_cc = pd.DataFrame(list(num_cc))
    
    
    df_merge = df_from.merge(df_to,how='outer').merge(df_cc,how='outer').head(limit)
    df_merge.fillna(0,inplace=True)
    df_merge['to'] = df_merge['to'] + df_merge['cc']
    col_total = df_merge['to'] + df_merge['from']
    df_merge['total'] = col_total
    df_merge.rename(columns={'_id':'contact'}, inplace=True)
    df_merge.drop(['cc'],axis=1,inplace=True)
    
    result = []
    for index, row in df_merge.iterrows():
        item = {}
        item['contact'] = row['contact']
        item['from'] = row['from']
        item['to'] = row['to']
        item['total'] = row['total']
        result.append(item)
    
    return result

In [14]:
limit = 20
result = get_emails_between_contacts(collection,limit)
for item in result:
    print(item)

{'contact': 'edwardc38@hotmail.com', 'from': 1.0, 'to': 0.0, 'total': 1.0}
{'contact': 'carol.moline@powerpool.ab.ca', 'from': 11.0, 'to': 1.0, 'total': 12.0}
{'contact': 'lebend@tdbank.ca', 'from': 1.0, 'to': 0.0, 'total': 1.0}
{'contact': 'ingjald@shaw.ca', 'from': 1.0, 'to': 0.0, 'total': 1.0}
{'contact': '6.1132.6c-af5ssclxjfagjsrr.1@mail3.travelocity.com', 'from': 1.0, 'to': 0.0, 'total': 1.0}
{'contact': 'livia_zufferli@monitor.com', 'from': 4.0, 'to': 23.0, 'total': 27.0}
{'contact': 'daemon.extra@enron.com', 'from': 7.0, 'to': 0.0, 'total': 7.0}
{'contact': 'ben.sturgeon@gfinet.co.uk', 'from': 1.0, 'to': 1.0, 'total': 2.0}
{'contact': 'robert.anderson@blakes.com', 'from': 1.0, 'to': 4.0, 'total': 5.0}
{'contact': 'davidpsmith@att.net', 'from': 2.0, 'to': 0.0, 'total': 2.0}
{'contact': 'rbhattacharya@velaw.com', 'from': 3.0, 'to': 3.0, 'total': 6.0}
{'contact': 'jrogers@mofo.com', 'from': 3.0, 'to': 1.0, 'total': 4.0}
{'contact': 'alexeaton@msn.com', 'from': 1.0, 'to': 3.0, 'tot

### 7)
Write a function to find out the number of senders who were also direct receivers. Direct receiver means the email is sent to the person directly, not via cc or bcc. [4 points]

In [15]:
def get_from_to_people(collection):
    """
    :param collection A PyMongo collection object
    :return the NUMBER of the people who have sent emails and received emails as direct receivers.
    """    
    # YOUR CODE HERE
    # From
    project_from = {
        '$project':{
            "From": "$headers.From"
            }
     }
    group_from = {
        '$group':{
            '_id':{'$trim':{'input':'$From'}}
        }
    }
    people_from = collection.aggregate([project_from, group_from])
    # To
    project_to = {
        '$project':{
            "To": {"$split":["$headers.To", ","]}
        }
    }
    unwind_to = {
         "$unwind":"$To"
         }
    group_to = {
        '$group':{
            '_id':{'$trim':{'input':'$To'}}
        }
    }
    people_to = collection.aggregate([project_to,unwind_to,group_to])
    # Store them into df
    df_from = pd.DataFrame(list(people_from))
    df_to = pd.DataFrame(list(people_to))
    
    df_merge = pd.merge(df_from,df_to,left_on='_id',right_on='_id',how='inner')
    
    return df_merge

In [16]:
results = get_from_to_people(collection)
print('The number of users who are sender and direct receivers is: ', results.shape[0])

The number of users who are sender and direct receivers is:  10590


### 8)
Write a function with parameters start_date and end_date, which returns the number of email messages that have been sent between those specified dates, including start_date and end_date [4 points] 

In [17]:
def get_emails_between_dates(collection, start_date, end_date):
    """
    :param collection A PyMongo collection object
    :return All emails between the specified start_date and end_date
    """    
    # YOUR CODE HERE
    start_date = start_date + ' 00:00:00'
    end_date = end_date + ' 23:59:59'
    start_date = datetime.strptime(start_date, '%Y-%m-%d %H:%M:%S')
    end_date = datetime.strptime(end_date, '%Y-%m-%d %H:%M:%S')
    filter = [
    {
        "$project":
            {
                "new_date":{
                    "$dateFromString":{
                        "dateString": {"$substr":["$headers.Date", 0, 30]},
                        "onError":""
                        }
                    }
            }
    },
    {
        '$project':{
            'result':{
                '$and':[
                    {'$gte':['$new_date', start_date]},
                    {'$lte':['$new_date', end_date]}
                ]
                }
            }
     },
    {
         '$group':{
             '_id':'$result',
             'num':{
                 '$sum': 1
                 }
             }
         }
    ]
    
    cursor = collection.aggregate(filter)
    num = 0
    for c in cursor:
        if c['_id'] == True:
            num = c['num']
    return num

In [18]:
start_date = '2000-01-01'
end_date = '2001-12-30'
num = get_emails_between_dates(collection,start_date,end_date)
print('The number of emails between given dates is: ', num)

The number of emails between given dates is:  453384


## Task 2
This task will assess your ability to use the Hadoop Streaming API and MapReduce to process data. For each of the questions below, you are expected to write two python scripts, one for the Map phase and one for the Reduce phase. You are also expected to provide the correct parameters to the `hadoop` command to run the MapReduce process. Write down your answers in the specified cells below.

To get started, you need to download and unzip the YouTube dataset (available at http://edshare.soton.ac.uk/19547/) onto the machine where you have Hadoop installed (this should be the virtual machine provided).

To help you, `%%writefile` has been added to the top of the cells, automatically writing them to "mapper.py" and "reducer.py" respectively when the cells are run.

### 1) 
Using Youtube01-Psy.csv, find the hourly interval in which most spam was sent. The output should be in the form of a single key-value pair, where the value is a datetime at the start of the hour with the highest number of spam comments. [9 points]

In [1]:
%%bash

head -n 10 Youtube01-Psy.csv

COMMENT_ID,AUTHOR,DATE,CONTENT,CLASS
LZQPQhLyRh80UYxNuaDWhIGQYNQ96IuCg-AYWqNPjpU,Julius NM,2013-11-07T06:20:48,"Huh, anyway check out this you[tube] channel: kobyoshi02",1
LZQPQhLyRh_C2cTtd9MvFRJedxydaVW-2sNg5Diuo4A,adam riyati,2013-11-07T12:37:15,"Hey guys check out my new channel and our first vid THIS IS US THE  MONKEYS!!! I'm the monkey in the white shirt,please leave a like comment  and please subscribe!!!!",1
LZQPQhLyRh9MSZYnf8djyk0gEF9BHDPYrrK-qCczIY8,Evgeny Murashkin,2013-11-08T17:34:21,just for test I have to say murdev.com,1
z13jhp0bxqncu512g22wvzkasxmvvzjaz04,ElNino Melendez,2013-11-09T08:28:43,me shaking my sexy ass on my channel enjoy ^_^ ﻿,1
z13fwbwp1oujthgqj04chlngpvzmtt3r3dw,GsMega,2013-11-10T16:05:38,watch?v=vtaRGgvGtWQ   Check this out .﻿,1
LZQPQhLyRh9-wNRtlZDM90f1k0BrdVdJyN_YsaSwfxc,Jason Haddad,2013-11-26T02:55:11,"Hey, check out my new website!! This site is about kids stuff. kidsmediausa  . com",1
z13lfzdo5vmdi1cm123te5uz2mqig1brz04,ferleck ferles,2013-11-27T21:39

In [5]:
%%writefile mapper.py
#!/usr/bin/env python
#Answer for mapper.py
import csv
import sys
import re

lines = sys.stdin.readlines()
csvreader = csv.reader(lines)
header = next(csvreader)
        
times = [row[2] for row in csvreader if row[4]=='1']

for time in times:
    token = re.split(":", time)[0]
    print(token + "\t1")

Overwriting mapper.py


In [6]:
%%writefile reducer.py
#!/usr/bin/env python
#Answer for reducer.py
import sys
from collections import defaultdict

input_pairs = sys.stdin.readlines()
accumulator = defaultdict(lambda: 0)

for row in input_pairs:
    key_value_pair = row.split("\t", 1)
    if len(key_value_pair) != 2:
        continue
        
    time = key_value_pair[0]
    count = int(key_value_pair[1].strip())
    accumulator[time] = accumulator[time] + count

for (key, value) in accumulator.items():
    print(key + ":00:00" + "\t" + str(value))

Overwriting reducer.py


In [7]:
%%bash
chmod a+x mapper.py reducer.py
cat Youtube01-Psy.csv | ./mapper.py | ./reducer.py | sort

2013-11-07T06:00:00	1
2013-11-07T12:00:00	1
2013-11-08T17:00:00	1
2013-11-09T08:00:00	1
2013-11-10T16:00:00	1
2013-11-26T02:00:00	1
2013-11-27T21:00:00	1
2013-11-28T16:00:00	2
2013-11-28T17:00:00	3
2013-11-28T18:00:00	1
2013-11-28T19:00:00	1
2013-11-28T21:00:00	2
2013-11-28T23:00:00	1
2013-11-29T00:00:00	1
2013-12-01T01:00:00	1
2013-12-01T03:00:00	1
2013-12-23T12:00:00	1
2013-12-25T19:00:00	1
2013-12-27T23:00:00	1
2014-01-19T00:00:00	2
2014-01-19T04:00:00	1
2014-01-19T08:00:00	1
2014-01-19T10:00:00	1
2014-01-19T16:00:00	1
2014-01-19T17:00:00	2
2014-01-19T19:00:00	1
2014-01-20T02:00:00	1
2014-01-20T04:00:00	1
2014-01-20T06:00:00	2
2014-01-20T09:00:00	1
2014-01-20T10:00:00	1
2014-01-20T15:00:00	1
2014-01-20T16:00:00	1
2014-01-20T17:00:00	2
2014-01-20T18:00:00	1
2014-01-20T20:00:00	3
2014-01-20T21:00:00	1
2014-11-02T00:00:00	1
2014-11-02T01:00:00	1
2014-11-02T05:00:00	1
2014-11-02T12:00:00	1
2014-11-02T14:00:00	2
2014-11-02T15:00:00	1
2014-11-02T16:00:00	1
2014-11-02T17:00:00	1
2014-11-02

In [8]:
%%bash
#Hadoop command to run the map reduce.
rm -rf output

hadoop-standalone-mode.sh

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py   \
-input Youtube01-Psy.csv   \
-mapper ./mapper.py  \
-reducer ./reducer.py \
-output output

Hadoop switched to standalone mode.


It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
18/12/10 09:50:25 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/12/10 09:50:26 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
18/12/10 09:50:26 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
18/12/10 09:50:26 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
18/12/10 09:50:27 INFO mapred.FileInputFormat: Total input files to process : 1
18/12/10 09:50:27 INFO mapreduce.JobSubmitter: number of splits:1
18/12/10 09:50:27 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local209536761_0001
18/12/10 09:50:28 INFO mapred.LocalDistributedCacheManager: Localized file:/home/comp6235/Notebooks/cw2/mapper.py as file:/tmp/hadoop-comp6235/mapr

In [9]:
#Expected key-value output format:
#hour_with_most_spam	"2013-11-10T10:00:00"

#Additional key-value pairs are acceptable, as long as the hour_with_most_spam pair is correct.
import csv
import sys
import re

with open('output/part-00000','r') as f:
    lines = f.readlines()
    
lines_list = [line for line in lines]
num_list = [line.split()[1] for line in lines]
max_index = num_list.index(max(num_list))
most_spam = lines_list[max_index]
print("hour_with_most_spam\t" + most_spam.split()[0])

hour_with_most_spam	2014-11-08T10:00:00


### 2) 
Find all comments associated with a username (the AUTHOR field). Return a JSON array of all comments associated with that username. (This should use the data from all 5 data files: Psy, KatyPerry, LMFAO, Eminem, Shakira) [11 points]

In [10]:

%%writefile mapper2.py
#!/usr/bin/env python
#Answer for mapper2.py
import csv
import sys
import re

lines = sys.stdin.readlines()
csvreader = csv.reader(lines)
header = next(csvreader)
        
items = [[row[1],row[3]] for row in csvreader if row[4]=='1']

for item in items:
    print(item[0]+'\t'+item[1])

Overwriting mapper.py


In [11]:
%%writefile reducer2.py
#!/usr/bin/env python
#Answer for reducer2.py
import sys
from collections import defaultdict

input_pairs = sys.stdin.readlines()
accumulator = defaultdict(lambda: [])

for row in input_pairs:
    key_value_pair = row.split('\t',1)
    if len(key_value_pair) != 2:
        continue
        
    author = key_value_pair[0]
    comment = key_value_pair[1]
    accumulator[author].append(comment)

for (key, value) in accumulator.items():
    print(key + "\t" + str(value))

Overwriting reducer2.py


In [12]:
%%bash
chmod a+x mapper2.py reducer2.py
cat Youtube01-Psy.csv | ./mapper2.py | ./reducer2.py | sort

2666playboy	[' Follow me on Instagram. _chris_cz  \xef\xbb\xbf\n']
8-BitMusic	['Hey guys! Im a 12 yr old music producer. I make chiptunes and 8bit music.  It would be wonderful if you checked out some of my 8bit remixes! I even  have a gangnamstyle 8bit remix if you would like to check that out ;)  Thanks!!\n']
Aaa Aaa	['PLEASE SUBSCRIBE ME!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\xef\xbb\xbf\n']
abdellah chafouai	['Discover a beautiful song of A young Moroccan     http://www.linkbucks.com/AcN2g\xef\xbb\xbf\n']
Abdinasir Omar	['Great music anyway\xef\xbb\xbf\n']
Adam Mudd	['How are there 2 billion views and theres only 2 million people in the  world!?!?!?!! MULTIPLE ACCOUNTS!!!1111\xef\xbb\xbf\n']
adam riyati	["Hey guys check out my new channel and our first vid THIS IS US THE  MONKEYS!!! I'm the monkey in the white shirt,please leave a like comment  and please subscribe!!!!\n"]
Adele Lupei	['Is this the video that started the whole "got my dick stuck in an elevator"  excuse

In [13]:
%%bash
#Hadoop command to run the map reduce.
rm -rf output2

hadoop-standalone-mode.sh

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper2.py,reducer2.py   \
-input Youtube01-Psy.csv,Youtube02-KatyPerry.csv,Youtube03-LMFAO.csv,Youtube04-Eminem.csv,Youtube05-Shakira.csv   \
-mapper ./mapper2.py  \
-reducer ./reducer2.py \
-output output2

Hadoop switched to standalone mode.


It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
18/12/10 09:52:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/12/10 09:52:30 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
18/12/10 09:52:30 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
18/12/10 09:52:30 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
18/12/10 09:52:30 INFO mapred.FileInputFormat: Total input files to process : 5
18/12/10 09:52:30 INFO mapreduce.JobSubmitter: number of splits:5
18/12/10 09:52:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local560112486_0001
18/12/10 09:52:31 INFO mapred.LocalDistributedCacheManager: Localized file:/home/comp6235/Notebooks/cw2/mapper2.py as file:/tmp/hadoop-comp6235/map

In [None]:
#Expected key-value output format:
#John Smith	["Comment 1", "Comment 2", "Comment 3", "etc."]
#Jane Doe	["Comment 1", "Comment 2", "Comment 3", "etc."]

In [14]:
import csv
import sys
import re

username = 'Derek Moya'

def search_comments_by_username(username):
    with open('output2/part-00000','r') as f:
        lines = f.readlines()

    item_dict = {}
    for line in lines:
        line = line.split('\t',1)
        item_dict[line[0]] = line[1]
    
    print(username + '\t' + item_dict[username])
    
search_comments_by_username(username)

Derek Moya	['You guys should check out this EXTRAORDINARY website called MONEYGQ.COM .   You can make money online and start working from home today as I am!   I am making over $3,000+ per month at MONEYGQ.COM !   Visit MONEYGQ.COM and check it out!  Why does the wood photograph the husky breath? When does the act retain the delightful system? The rhythm fabricates the scintillating harbor.\n', 'You guys should check out this EXTRAORDINARY website called MONEYGQ.COM .   You can make money online and start working from home today as I am!   I am making over $3,000+ per month at MONEYGQ.COM !   Visit MONEYGQ.COM and check it out!  Why does the innocent woman prioritize the desire? The flight searchs the sad polish. When does the tax zip the omniscient record?\n', 'You guys should check out this EXTRAORDINARY website called MONEYGQ.COM .   You can make money online and start working from home today as I am!   I am making over $3,000+ per month at MONEYGQ.COM !   Visit MONEYGQ.COM and chec