# Coursework 2: Data Processing

## Task 1
This coursework will assess your understanding of using NoSQL to store and retrieve data.  You will perform operations on data from the Enron email dataset in a MongoDB database, and write a report detailing the suitability of different types of databases for data science applications.  You will be required to run code to answer the given questions in the Jupyter notebook provided, and write a report describing alternative approaches to using MongoDB.

Download the JSON version of the Enron data (using the “Download as zip” to download the data file from http://edshare.soton.ac.uk/19548/, the file is about 380MB) and import into a collection called messages in a database called enron.  You do not need to set up any authentication.  In the Jupyter notebook provided, perform the following tasks, using the Python PyMongo library.

Answers should be efficient in terms of speed.  Answers which are less efficient will not get full marks.

### Importing the dataset

The JSON version of the dataset has been downloaded from [this link](http://edshare.soton.ac.uk/19548/)

The dataset has been imported into the database **enron**

The name of the collection is **messages**

**100000** documents have been imported

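**Note:** the cell below imports a shortened sample of the data (`messages_short.json`) into the database `enron_short`. To import the full dataset, change the database name to `enron` and point `--file` at the full JSON file.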

In [1]:
%%bash

# mongoimport is the Mongo command to import data.
# It specifies the database, the collection and the file to import
# --drop means it's going to drop any collection with the same name which already exists
mongoimport --db enron_short --collection messages --drop --file ./messages_short.json
# Delete the JSON file after the import
rm ./messages_short.json

2018-12-15T19:34:02.061+0000	Failed: open ./messages_short.json: no such file or directory
2018-12-15T19:34:02.061+0000	imported 0 documents
rm: cannot remove './messages_short.json': No such file or directory


In [6]:
# Importing libraries

import pymongo
from pymongo import MongoClient
from datetime import datetime
from pprint import pprint

import re

In [8]:
client = MongoClient('mongodb://localhost:27017')

client.list_database_names()

[u'local']

### 1)
Write a function which returns a MongoDB connection object to the "messages" collection. [4 points] 

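**Note:** `db_name` below is set to `enron_short` to match the sample import; change it to `enron` when the full dataset is used.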

In [4]:
db_name = 'enron_short'
coll_name = 'messages'

def get_collection():
    """
    Connects to the server, and returns a collection object
    of the `messages` collection in the `enron` database
    """
    # YOUR CODE HERE
    
    client = MongoClient('mongodb://localhost:27017')
    
    # check if the database is present
    if db_name in client.list_database_names():
        db = client[db_name]
        # check if collection is present
        if coll_name in db.list_collection_names():
            collection = db[coll_name]
        else:
            print("Collection:", coll_name, "not found")
            return False
    else:
        print ("Database:", db_name, "not found")
        return False
    
    
    return collection
            
        
messages_collection = get_collection()

('Database:', 'enron_short', 'not found')


Verifying that the collection object can read all documents:

In [5]:
messages_collection.count_documents({})

AttributeError: 'bool' object has no attribute 'count_documents'

### 2)

Write a function which returns the amount of emails in the messages collection in total. [4 points] 

In [None]:
def get_amount_of_messages(collection):
    """
    :param collection A PyMongo collection object
    :return the amount of documents in the collection
    """    
    # YOUR CODE HERE
    return collection.count_documents({})
number_of_emails = get_amount_of_messages(messages_collection)    

print (number_of_emails)

### 3) 

Write a function which returns each person who was BCCed on an email.  Include each person only once, and display only their name according to the X-To header. [4 points] 



In [None]:
def get_bcced_people(collection):
    """
    :param collection A PyMongo collection object
    :return the names of the people who have received an email by BCC
    """    
    # YOUR CODE HERE

    # names already seen (a set gives O(1) membership checks) and the ordered output list
    seen = set()
    final_list = []

    # fetch only the X-bcc header of docs where the field exists and is not empty
    query = {'headers.X-bcc': {'$exists': True, '$ne': ''}}
    for doc in collection.find(query, {'headers.X-bcc': 1, '_id': 0}):
        bcc_value = doc['headers']['X-bcc']
        # a header may hold several recipients separated by '>,'
        for value in bcc_value.split('>,'):
            # drop the '</O=...' part and then the ' <email>' part, keeping the name
            name = value.split('</O')[0].split(' <')[0].strip()
            # ignore empty values and bare email ids; keep each name only once
            if name and '@' not in name and name not in seen:
                seen.add(name)
                final_list.append(name)

    return final_list
get_bcced_people(messages_collection)

For most of the following questions I have used MongoDB's **aggregation pipeline**, which processes the data in a sequence of stages. Splitting the work into stages makes each step easy to debug, and the filtering, grouping and counting run on the database server rather than in Python.
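As an illustration of the stage-by-stage idea, the toy sketch below (plain Python, no MongoDB needed) passes a list of documents through each stage in turn. It is a simplification, not how MongoDB is implemented: `run_pipeline` is a hypothetical helper that only understands an equality `$match` and a `$group` that counts documents. As with the real `$group`, an empty input produces no output document at all.

```python
def run_pipeline(docs, pipeline):
    """Toy emulation (hypothetical helper): each stage's output is the next stage's input."""
    for stage in pipeline:
        if '$match' in stage:
            # keep documents whose fields equal every value in the filter
            conditions = stage['$match']
            docs = [d for d in docs
                    if all(d.get(field) == value for field, value in conditions.items())]
        elif '$group' in stage:
            # only a {'_id': None, 'count': {'$sum': 1}} group is supported:
            # one document holding the count, or nothing for an empty input
            docs = [{'_id': None, 'count': len(docs)}] if docs else []
        else:
            raise ValueError('stage not supported by this sketch: %r' % (stage,))
    return docs

emails = [{'day': 'Sat'}, {'day': 'Tue'}, {'day': 'Sat'}]   # hypothetical documents
pipeline = [{'$match': {'day': 'Sat'}},
            {'$group': {'_id': None, 'count': {'$sum': 1}}}]
print(run_pipeline(emails, pipeline))
```

This empty-input behaviour is why a function that reads the single `$group` result should also decide what to return when the cursor yields nothing.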

### 4)

Write a function with parameter subject, which gets all emails in a thread with that parameter, and orders them by date (ascending). “An email thread is an email message that includes a running list of all the succeeding replies starting with the original email.”, check for detail descriptions at https://www.techopedia.com/definition/1503/email-thread [4 points]

In [None]:
def get_emails_in_thread(collection, subject):
    """
    :param collection A PyMongo collection object
    :param subject The subject of the original email in the thread
    :return A cursor over all emails in the thread with that subject, ordered by date (ascending)
    """ 
    
    # The output should include all emails in the thread including the original email
    # YOUR CODE HERE    

    # the original email has the subject itself;
    # the other mails in the thread have the subject: 'Re: [subject]'
    subject = subject.strip()

    reply_subject = "Re: " + subject

    # limit stage to limit results (may be added to the pipeline for debugging)

    limit_stage = {
        '$limit': 100
    }

    # This stage is to match the emails according to the subjects

    match_stage = {
        '$match': { 'headers.Subject': { '$in': [subject, reply_subject] } }
    }

    # This stage does the following operations:
    # 1. $substr: Extracts the first 25 chars from the Date Field
    # 2. $rtrim: removes trailing whitespace that appears if the day is a single digit number
    # 3. $dateFromString: converts the string to a Date object, which will be used in the sorting stage
    # I have also used this stage to display certain fields so that the output looks clean

    project_stage = {
         '$project': { 'DateOfMessage': 
                          {'$dateFromString': { 'dateString' : 
                                               { '$rtrim': {'input': {'$substr': ['$headers.Date', 0, 25] }}}

                                              }
                          },

                        'filename': 1,
                        'body': 1,
                        'Date': '$headers.Date',
                        'Subject': '$headers.Subject'
                     } 
    }

    # This stage sorts the docs in ascending order of Date

    sort_stage = {
        '$sort': {'DateOfMessage': 1}
    }

    # This stage removes the helper DateOfMessage field and the _id from the output

    project_stage2 = {
        '$project': {
            'DateOfMessage': 0,
            '_id': 0
        }
    }

    pipeline = [match_stage, project_stage, sort_stage, project_stage2]

    # return the cursor
    return collection.aggregate(pipeline)

for doc in get_emails_in_thread(messages_collection, "Plays and other information"):
    pprint(doc)

### 5)

Write a function which returns the percentage of emails sent on a weekend (i.e., Saturday and Sunday) as a `float` between 0 and 1. [6 points]

In [None]:
def get_percentage_sent_on_weekend(collection):
    """
    :param collection A PyMongo collection object
    :return A float between 0 and 1
    """    
    # YOUR CODE HERE
    
    # compute total number of emails in our dataset
    
    total_documents = float(collection.count_documents({}))
    
    # limit stage to limit results (may be added to the pipeline for debugging)

    limit_stage = {
        '$limit': 100
    }

    # This stage does the following operations:
    # 1. $substr: Extracts the first 25 chars from the Date Field
    # 2. $rtrim: removes trailing whitespace that appears if the day is a single digit number
    # 3. $dateFromString: converts the string to a Date object
    project_stage = {
         '$project': { 'DateOfMessage': 
                          {'$dateFromString': { 'dateString' : 
                                               { '$rtrim': {'input': {'$substr': ['$headers.Date', 0, 25] }}}

                                              }
                          }
                     } 
    }
    
    
    # This stage computes the day of the week for a date as a number between 1 (Sunday) and 7 (Saturday)
    project_stage2 = {
         '$project': {
                        'DayOfWeek': {
                            '$dayOfWeek': '$DateOfMessage'
                        }
                     } 
    }
    

    # This stage is used to filter docs sent on Sunday(1) or Saturday(7)
    match_stage1 = {
            '$match': { 'DayOfWeek': { '$in': [1, 7] } }
    }

    # Group stage to count the number of documents
    group_stage1 = {
        '$group': {
            '_id': None, 'count': {'$sum': 1}
        }
    }


    # final project stage to compute percentage of emails sent on weekends    
    project_stage4 = {
        '$project': {
            'percentage_weekend': { '$divide': ['$count', total_documents] }
        }
    }




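    pipeline = [project_stage, project_stage2, match_stage1, group_stage1, project_stage4]

    # the group stage emits a single document when at least one email matched
    for doc in collection.aggregate(pipeline):
        return float(doc['percentage_weekend'])

    # no email was sent on a weekend (or the collection is empty)
    return 0.0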

get_percentage_sent_on_weekend(messages_collection)    
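`$dayOfWeek` returns 1 for Sunday through 7 for Saturday, while Python's `datetime.isoweekday()` returns 1 for Monday through 7 for Sunday. To cross-check the weekend filter in Python, the two numberings have to be aligned. A minimal sketch, using a hypothetical helper `mongo_day_of_week` (not part of PyMongo) and made-up sample dates:

```python
from datetime import datetime

def mongo_day_of_week(dt):
    """Map isoweekday() (Mon=1 .. Sun=7) to the $dayOfWeek numbering (Sun=1 .. Sat=7)."""
    return dt.isoweekday() % 7 + 1

def is_weekend(dt):
    # same test as match_stage1: Sunday (1) or Saturday (7)
    return mongo_day_of_week(dt) in (1, 7)

sample = [datetime(2000, 11, 11, 16, 37),   # Saturday
          datetime(2000, 11, 12, 9, 0),     # Sunday
          datetime(2000, 11, 14, 8, 22)]    # Tuesday

# fraction of sample dates on a weekend, as a float between 0 and 1
print(sum(is_weekend(d) for d in sample) / float(len(sample)))
```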

### 6)

Write a function with parameter limit. The function should return for each email account: the number of emails sent, the number of emails received, and the total number of emails (sent and received). Use the following format: [{"contact": "michael.simmons@enron.com", "from": 42, "to": 92, "total": 134}] and the information contained in the To, From, and Cc headers. Sort the output in descending order by the total number of emails. Use the parameter limit to specify the number of results to be returned. If limit is null, the function should return all results. If limit is higher than null, the function should return the number of results specified as limit. limit cannot take negative values. [10 points]

In [None]:
def get_emails_between_contacts(collection, limit):
    """
    Shows the communications between contacts
    Sort by the descending order of total emails using the To, From, and Cc headers.
    :param `collection` A PyMongo collection object    
    :param `limit` An integer specifying the amount to display, or
    if null will display all outputs
    :return A list of objects of the form:
    [{
        'contact': <<Another email address>>
        'from': 
        'to': 
        'total': 
    },{.....}]
    """    
    
    from_data = []
    to_data = []

    from_emails = []
    to_emails = []

    common_emails = []

    # limit stage to limit results (may be added to the pipeline for debugging)

    limit_stage = {
        '$limit': 100
    }

    final_list = []



    # In the first stage we project the fields "From" and "To" for each email 
    # From: sender of the email (headers.From)
    # To: receivers (To + Cc) of the email (headers.To + headers.Cc)

    project_stage1 = {
         '$project': {
                         '_id': 0,
                         'To': ['$headers.To', '$headers.Cc'],
                         'From': '$headers.From',
                     } 
    }

    # Unwind stage: Deconstructs the field: 'To' to output a document for each element

    unwind_stage1 = {
        '$unwind': {'path': '$To'}
    }

    # After the unwind stage, there are docs of the form: {'To': None, 'From': 'michael.simmons@enron.com'}
    # We filter out these docs with 'To': None

    match_stage1 = {
        '$match': {
            'To': {'$ne': None}
        }
    }

    # There are docs which have multiple ids in 'To field', separated by ', '. So we split them 

    project_stage2 = {
        '$project': {
                         'From': 1,
                         'To': {'$split': ['$To', ', ']}

                     } 
    }

    # Another Unwind stage: Deconstructs the field: 'To' to output a document for each element

    unwind_stage2 = {
        '$unwind': {'path': '$To'}
    }


    # Now we have single ids in 'From' and 'To' fields 

    # But some ids in 'To' field are of the form: To': '\r\n\tbryan.hull@enron.com'
    # So we use $trim to get rid of these unwanted characters


    project_stage3 = {
        '$project': {
            'To': {'$trim': {'input': '$To'}},
            'From': 1

        }
    }

    # Now we have documents of the form {'From': email_id1, 'To': email_id2} 
    # where each document represents an email between the 2 email ids

    # Stage to group the accounts according to 'To' field and compute the count (the number of emails received)


    group_stage1 = {
        '$group': { 
            '_id': '$To',
            'count_to': {'$sum': 1}          
                  }
    }

    # Stage to group the accounts according to 'From' field and compute the count (the number of emails sent)


    group_stage2 = {
        '$group': { 
            '_id': '$From',
            'count_from': {'$sum': 1}          
                  }
    }


    # We create two pipelines with the above mentioned stages
    # pipeline1: Compute docs for each email id and number of emails received by the id (count_to)
    # pipeline2: Compute docs for each email id and number of emails sent by the id (count_from).
    #   It groups the original documents by sender: grouping after the unwind stages
    #   would count an email once per recipient instead of once per email sent.


    pipeline1 = [ project_stage1, unwind_stage1, match_stage1, project_stage2, 
                     unwind_stage2, project_stage3, group_stage1]

    pipeline2 = [ {'$project': {'_id': 0, 'From': '$headers.From'}}, group_stage2 ]


    # limit cannot take negative values, so reject it before running any query
    if limit is not None and int(limit) < 0:
        raise ValueError('limit cannot be negative')

    # Merge the two results into one entry per email id. A dictionary keyed by
    # the email id replaces the list searches, so the merge is a single pass.
    contacts = {}

    # 'to' counts: one entry per receiver; the total starts as the 'to' value
    for doc in collection.aggregate(pipeline1):
        contacts[doc['_id']] = {'contact': doc['_id'], 'to': doc['count_to'],
                                'from': 0, 'total': doc['count_to']}

    # 'from' counts: update the existing entry, or create one for senders
    # who never appear as receivers
    for doc in collection.aggregate(pipeline2):
        entry = contacts.setdefault(doc['_id'], {'contact': doc['_id'], 'to': 0,
                                                 'from': 0, 'total': 0})
        entry['from'] = doc['count_from']
        entry['total'] += doc['count_from']

    sorted_list = sorted(contacts.values(), key=lambda k: k['total'], reverse=True)

    if limit is None:
        return sorted_list
    return sorted_list[:int(limit)]

get_emails_between_contacts(messages_collection, 5)  
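The projection, unwind, split and trim stages of `pipeline1` can be mirrored in plain Python to see which (From, To) pairs a single email produces. This is a sketch for checking the logic on one document, not a replacement for the pipeline; `sender_receiver_pairs` and the email addresses other than `michael.simmons@enron.com` are hypothetical.

```python
def sender_receiver_pairs(headers):
    """Yield one (From, To) pair per receiver, mirroring project_stage1 .. project_stage3."""
    # project_stage1 puts To and Cc into one array; unwind_stage1 + match_stage1
    # then drop the entries that are missing (None)
    for field in (headers.get('To'), headers.get('Cc')):
        if field is None:
            continue
        # project_stage2 + unwind_stage2: one pair per ', '-separated email id
        for receiver in field.split(', '):
            # project_stage3: $trim strips surrounding whitespace such as '\r\n\t'
            yield headers.get('From'), receiver.strip()

# hypothetical headers of a single email
headers = {'From': 'michael.simmons@enron.com',
           'To': 'alice@enron.com, \r\n\tbob@enron.com',
           'Cc': 'carol@enron.com'}
for pair in sender_receiver_pairs(headers):
    print(pair)
```

Every pair produced from one email shares the same sender, so counting senders over these pairs counts an email once per recipient.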

### 7)
Write a function to find out the number of senders who were also direct receivers. Direct receiver means the email is sent to the person directly, not via cc or bcc. [4 points]

In [None]:
def get_from_to_people(collection):
    """
    :param collection A PyMongo collection object
    :return the NUMBER of the people who have sent emails and received emails as direct receivers.
    """    
    # YOUR CODE HERE

    #------------- FIND DIRECT RECEIVERS ---------------------------------

    # set of unique direct receiver email ids (a set gives O(1) membership checks)
    direct_receivers = set()

    # fetch only the To header of docs where the field exists and is not empty;
    # a header may list several email ids separated by ', '
    to_query = {'headers.To': {'$exists': True, '$ne': ''}}
    for doc in collection.find(to_query, {'headers.To': 1, '_id': 0}):
        for receiver in doc['headers']['To'].split(', '):
            direct_receivers.add(receiver.strip(' \t\n\r'))

    #------------- FIND UNIQUE SENDERS ---------------------------------

    # distinct() returns each sender only once, so the de-duplication happens on the server
    from_query = {'headers.From': {'$exists': True, '$ne': ''}}
    senders = set(sender.strip() for sender in collection.distinct('headers.From', from_query))

    #------------- FIND THE NUMBER OF COMMON ELEMENTS ---------------------------------

    return len(direct_receivers & senders)

get_from_to_people(messages_collection)

### 8)
Write a function with parameters start_date and end_date, which returns the number of email messages that have been sent between those specified dates, including start_date and end_date [4 points] 

The function below accepts dates in either of the following formats (with or without the timezone):

- Tue, 14 Nov 2000 08:22:00 -0800 (PST)
- Tue, 14 Nov 2000 08:22:00

Whichever form is used, the function converts it into a `datetime` object before querying.
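The conversion can be checked outside MongoDB. A minimal sketch, assuming every date string starts with the 25-character `Day, DD Mon YYYY HH:MM:SS` prefix that the pipeline extracts with `$substr`; `parse_enron_date` is a hypothetical helper. Like the pipeline, it drops whatever timezone follows, so times are compared as written in the header.

```python
from datetime import datetime

DATE_FORMAT = '%a, %d %b %Y %H:%M:%S'

def parse_enron_date(text):
    """Parse 'Tue, 14 Nov 2000 08:22:00 -0800 (PST)' or the same string without a timezone."""
    # like $substr [0, 25] + $rtrim: the first 25 characters hold the date and time;
    # with a one-digit day the 25th character is a space, which strip() removes
    return datetime.strptime(text.strip()[:25].strip(), DATE_FORMAT)

print(parse_enron_date('Tue, 14 Nov 2000 08:22:00 -0800 (PST)'))  # 2000-11-14 08:22:00
print(parse_enron_date('Tue, 14 Nov 2000 08:22:00'))              # 2000-11-14 08:22:00
print(parse_enron_date('Sat, 4 Nov 2000 16:37:00 -0800 (PST)'))   # 2000-11-04 16:37:00
```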

In [None]:
def get_emails_between_dates(collection, start_date, end_date):
    """
    :param collection A PyMongo collection object
    :param start_date, end_date Date strings, e.g. 'Tue, 14 Nov 2000 08:22:00'
    :return The number of emails sent between start_date and end_date (inclusive)
    """    
    # YOUR CODE HERE 
    
    # start date and end date to be in form: Tue, 14 Nov 2000 08:22:00 -0800 (PST)
    # or in form: Tue, 14 Nov 2000 08:22:00
    
    # keep only the first 25 characters, which drops any timezone suffix
    # (not only '-0800 (PST)'), exactly as the pipeline does with the Date header;
    # strip() removes the trailing space left when the day is a single digit
    date_format = '%a, %d %b %Y %H:%M:%S'
    start_date_as_date = datetime.strptime(start_date.strip()[:25].strip(), date_format)
    end_date_as_date = datetime.strptime(end_date.strip()[:25].strip(), date_format)
    
    # limit stage to limit results (may be added to the pipeline for debugging)
    
    limit_stage = {
    '$limit': 100
    }


    # This stage does the following operations:
    # 1. $substr: Extracts the first 25 chars from the Date Field
    # 2. $rtrim: removes trailing whitespace that appears if the day is a single digit number
    # 3. $dateFromString: converts the string to a Date object
    
    project_stage = {
         '$project': { 'DateOfMessage': 
                          {'$dateFromString': { 'dateString' : 
                                               { '$rtrim': {'input': {'$substr': ['$headers.Date', 0, 25] }}}

                                              }
                          }
                     } 
    }
    
    # match stage to filter documents with Date >= start_date and Date <= end_date
    
    match_stage = {
        '$match': {
            '$and': [ {'DateOfMessage' : { '$gte': start_date_as_date } },  
                      {'DateOfMessage' : { '$lte': end_date_as_date } } ]
        }
    }
    
    # group stage to count number of emails
    
    group_stage1 = {
        '$group': {
            '_id': None, 'count': {'$sum': 1}
        }
    }
    
    

    pipeline = [project_stage, match_stage, group_stage1]
    
    # the group stage emits no document when no email falls in the range
    for doc in collection.aggregate(pipeline):
        return int(doc['count'])
    return 0
    
    
get_emails_between_dates(messages_collection, "Sat, 11 Nov 2000 16:37:00", "Tue, 14 Nov 2000 08:22:00 -0800 (PST)")

## Task 2
This task will assess your ability to use the Hadoop Streaming API and MapReduce to process data. For each of the questions below, you are expected to write two python scripts, one for the Map phase and one for the Reduce phase. You are also expected to provide the correct parameters to the `hadoop` command to run the MapReduce process. Write down your answers in the specified cells below.

To get started, you need to download and unzip the YouTube dataset (available at http://edshare.soton.ac.uk/19547/) onto the machine where you have Hadoop installed (this should be the virtual machine provided).

To help you, `%%writefile` has been added to the top of the cells, automatically writing them to "mapper.py" and "reducer.py" respectively when the cells are run.

### 1) 
Using Youtube01-Psy.csv, find the hourly interval in which most spam was sent. The output should be in the form of a single key-value pair, where the value is a datetime at the start of the hour with the highest number of spam comments. [9 points]

In [19]:
%%writefile mapper1.py
#!/usr/bin/env python
#Answer for mapper.py


# import the libraries
import csv
import sys
from datetime import datetime


# read in through standard input
lines = sys.stdin.readlines()

# csvreader is a reader object which will iterate over lines
csvreader = csv.reader(lines)

# Lists to hold the dates of the comments and the class (spam or not)
dates = []

spam_class = []

# counter variable is used to skip first row which contains the headers
counter = 0
for row in csvreader:
    # skip first row
    if counter > 0:
        dates.append(row[2])
        spam_class.append(row[4])
    counter += 1
    
# check for number of dates and spam classes
if (len(dates) != len(spam_class)):
    print ('Unequal number of entries in Date and Class columns... Aborting...')
    sys.exit()

# The mapper generates key	value pairs for the reducer step to aggregate
# We generate key	value pairs of the form: day|month|year|hour	1
# Thus there will be a key	value pair for each spam comment, 
# with the key specifying the day,month, year and hour of the comment

for x in range(len(dates)):
    if spam_class[x] == '1':
        # remove leading or trailing whitespaces
        date = dates[x].strip()
        # convert to datetime format
        date_as_date = datetime.strptime(date, '%Y-%m-%dT%H:%M:%S')
        # find day, month, year and hour from the date
        day = date_as_date.date().day
        month = date_as_date.date().month
        year = date_as_date.date().year
        hour = date_as_date.hour
        
        # output the key	value pair
        print (str(day) + '|' + str(month) + '|' + str(year) + '|' + str(hour) + '\t' + '1')

        

Overwriting mapper1.py
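The map and reduce logic can be tried in-process before submitting the job. This is a sketch rather than the streaming scripts themselves: it keys on an ISO-formatted start-of-hour timestamp (an alternative to the `day|month|year|hour` key used above, which also zero-pads every field), and the CSV rows are made-up samples in the column order the mapper assumes (DATE in column 2, CLASS in column 4). Hadoop's sort between the phases is not needed here because the reducer keeps a dictionary of totals.

```python
import csv
from collections import defaultdict
from datetime import datetime
from io import StringIO

def map_spam_hours(csv_text):
    """Map: emit (start of hour, 1) for each spam comment (CLASS == '1')."""
    reader = csv.reader(StringIO(csv_text))
    next(reader)  # skip the header row
    for row in reader:
        if row[4] == '1':
            posted = datetime.strptime(row[2].strip(), '%Y-%m-%dT%H:%M:%S')
            # truncating to the hour gives the key; isoformat() zero-pads every field
            yield posted.replace(minute=0, second=0).isoformat(), 1

def reduce_counts(pairs):
    """Reduce: sum the values for each key and return the key with the highest total."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return max(totals.items(), key=lambda item: item[1])[0]

# hypothetical rows in the column order the mapper assumes
sample_csv = (
    'COMMENT_ID,AUTHOR,DATE,CONTENT,CLASS\n'
    'c1,A,2013-11-10T10:05:00,spam one,1\n'
    'c2,B,2013-11-10T10:45:12,spam two,1\n'
    'c3,C,2013-11-10T11:00:00,not spam,0\n'
    'c4,D,2013-11-10T12:30:00,spam three,1\n'
)
print('hour_with_most_spam\t' + reduce_counts(map_spam_hours(sample_csv)))
```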


In [20]:
%%writefile reducer1.py
#!/usr/bin/env python
#Answer for reducer.py


import sys
from datetime import datetime


# we get the input pairs in key<TAB>value format from the mapper step
input_pairs = sys.stdin.readlines()

# dictionary with the date as key (format: day|month|year|hour) and the number of spam comments as value
date_count_dict = dict()

final_dict = {
    'hour_with_most_spam': None
}

for input_pair in input_pairs:
    # split the input by '\t' to separate out the key and the value
    # the second param to split specifies that the maximum number of splits is 1
    input_list = input_pair.strip().split('\t', 1)
    # skip lines that do not contain both a key and a value
    if (len(input_list) != 2):
        continue
    date, count = input_list
    # add the mapper's count to the running total for that hour
    # (a dict lookup is O(1), whereas `in dict.keys()` scans a list in Python 2)
    date_count_dict[date] = date_count_dict.get(date, 0) + int(count)

# pick the hour with the highest number of spam comments; max() avoids sorting every entry
final_dict['hour_with_most_spam'] = max(date_count_dict.items(), key=lambda date_count: date_count[1])[0]

# some processing to get o/p in form: 2013-11-10T10:00:00
# we split day|month|year|hour into 4 parts to separate out day, month, year and hour

output_date = final_dict['hour_with_most_spam'].split('|',4)
day = output_date[0]
month = output_date[1]
year = output_date[2]
hour = output_date[3]
final_output = str(year) + '-' + str(month) + '-' + str(day) + 'T' + str(hour) + ':00:00'

# we set the value to the desired o/p format

final_dict['hour_with_most_spam'] = final_output


# print out the output in format: hour_with_most_spam	"2013-11-10T10:00:00"
for key, value in final_dict.items():
    print (key + "\t" + str(value))

Overwriting reducer1.py


Next we run the MapReduce job with Hadoop Streaming.

The directory structure is:
```
-Notebooks
    -output1/
    -mapper1.py
    -reducer1.py
    -Youtube01-Psy.csv
    -Youtube02-KatyPerry.csv
    -Youtube03-LMFAO.csv
    -Youtube04-Eminem.csv
    -Youtube05-Shakira.csv
    
```

In [21]:
%%bash

# Clear output directory
rm -rf output1

# Main pipeline command
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper1.py,reducer1.py \
-input ./Youtube01-Psy.csv \
-mapper ./mapper1.py \
-reducer ./reducer1.py \
-output output1

It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
18/12/15 20:24:34 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/12/15 20:24:35 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
18/12/15 20:24:35 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
18/12/15 20:24:35 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
18/12/15 20:24:35 INFO mapred.FileInputFormat: Total input files to process : 1
18/12/15 20:24:35 INFO mapreduce.JobSubmitter: number of splits:1
18/12/15 20:24:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local2121305499_0001
18/12/15 20:24:36 INFO mapred.LocalDistributedCacheManager: Localized file:/home/comp6235/Notebooks/CW2/mapper1.py as file:/tmp/hadoop-comp6235/ma

Once the job has finished, the output is in the file **part-00000** inside the directory **output1**.

The output is:

```
hour_with_most_spam	2014-11-8T10:00:00
```

In [None]:
#Expected key-value output format:
#hour_with_most_spam	"2013-11-10T10:00:00"

#Additional key-value pairs are acceptable, as long as the hour_with_most_spam pair is correct.

### 2) 
Find all comments associated with a username (the AUTHOR field). Return a JSON array of all comments associated with that username. (This should use the data from all 5 data files: Psy, KatyPerry, LMFAO, Eminem, Shakira) [11 points]

In [13]:
%%writefile mapper2.py
#!/usr/bin/env python
#Answer for mapper.py

# importing the libraries
import csv
import sys


def mapper_function(required_username):

    # function that accepts an username as input 

    # counter variable is used to skip first row which contains the headers
    counter = 0
    for row in csvreader:
        if counter > 0:
            usernames.append(row[1])
            comments.append(row[3])
        counter += 1
        
    # check for number of usernames and comments
    if (len(usernames) != len(comments)):
        print ('Unequal number of entries in Author and Content... Aborting...')
        sys.exit()

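    # pass the required username and the comments for that username to the reducer stage;
    # tabs and line breaks inside a comment would break the key<TAB>value line format,
    # so they are replaced with spaces
    for x in range(len(usernames)):
        if required_username == usernames[x]:
            comment = comments[x].replace('\t', ' ').replace('\r', ' ').replace('\n', ' ')
            print (str(usernames[x]) + '\t' + comment)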
            


# read in through standard input
lines = sys.stdin.readlines()

# csvreader is a reader object which will iterate over lines
csvreader = csv.reader(lines)

# lists to store usernames and comments
usernames = []
comments = []


# get username from command line argument

required_username = str(sys.argv[1])

# Run the mapper function for that username

mapper_function(required_username)


Overwriting mapper2.py


In [1]:
%%writefile reducer2.py
#!/usr/bin/env python
#Answer for reducer.py

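import json
import sys

final_dict = {
    'username': None,
    'comments': []
}

# get input from mapper job

input_pairs = sys.stdin.readlines()

for input_pair in input_pairs:
    # split the input by '\t' to separate out the key and the value
    # the second param to split specifies that the maximum number of splits is 1
    # rstrip removes the trailing newline so it does not end up inside the comment
    input_list = input_pair.rstrip('\r\n').split('\t', 1)
    if (len(input_list) != 2):
        continue
        
    # append each comment
    final_dict['comments'].append(input_list[1])
    # set the username if it is not set
    if final_dict['username'] is None:
        final_dict['username'] = input_list[0]

# print out the output in the desired form: username<TAB>["comment 1", "comment 2", ...]
# values are looked up by key because dict.values() has no guaranteed order in Python 2
# (and cannot be indexed in Python 3); json.dumps produces a valid JSON array
if final_dict['username'] is not None:
    print (final_dict['username'] + '\t' + json.dumps(final_dict['comments']))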

Writing reducer2.py


Next we run the MapReduce job with Hadoop Streaming.

The directory structure is:
```
-Notebooks
    -output2/
    -mapper2.py
    -reducer2.py
    -Youtube01-Psy.csv
    -Youtube02-KatyPerry.csv
    -Youtube03-LMFAO.csv
    -Youtube04-Eminem.csv
    -Youtube05-Shakira.csv
    
```

We pass the username to the mapper as a command-line argument. It is wrapped in double quotes so that a name containing a space is treated as a single argument by our mapper file.

In the example below we pass the user "Kiddy Kidso" as the argument.
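Python's `shlex.split` illustrates the effect of the quotes. This is an analogy under an assumption: it presumes Hadoop Streaming tokenizes the `-mapper` command in a shell-like way, and `shlex` is not what Hadoop itself uses. The second element of each list is what `sys.argv[1]` would hold in `mapper2.py`.

```python
import shlex

# quoted: the whole name reaches the mapper as one argument
print(shlex.split('mapper2.py "Kiddy Kidso"'))
# unquoted: the name is split, and sys.argv[1] would only be 'Kiddy'
print(shlex.split('mapper2.py Kiddy Kidso'))
```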

In [17]:
%%bash

# Clear output
rm -rf output2

# Make sure hadoop is in standalone mode
hadoop-standalone-mode.sh

# Main pipeline command
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper2.py,reducer2.py \
-input ./Youtube01-Psy.csv ./Youtube02-KatyPerry.csv ./Youtube03-LMFAO.csv ./Youtube04-Eminem.csv ./Youtube05-Shakira.csv \
-mapper 'mapper2.py "Kiddy Kidso"' -file ./mapper2.py  \
-reducer ./reducer2.py \
-output output2

Hadoop switched to standalone mode.
packageJobJar: [./mapper2.py] [] /tmp/streamjob3696086078029075728.jar tmpDir=null


It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
18/12/16 02:57:54 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/12/16 02:57:55 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
18/12/16 02:57:55 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
18/12/16 02:57:55 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
18/12/16 02:57:55 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
18/12/16 02:57:55 INFO mapred.FileInputFormat: Total input files to process : 5
18/12/16 02:57:56 INFO mapreduce.JobSubmitter: number of splits:5
18/12/16 02:57:56 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1804969977_0001
18/12/16 02:57:56 INFO mapred.Lo

Once the job has finished, the output is in the file **part-00000** inside the directory **output2**.

The output is of the format:

```
Username	["Comment 1", "Comment 2", "Comment 3", "etc."]
```
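The task asks for a JSON array of comments. Converting a Python list with `str()` produces Python's list syntax (single-quoted strings, and `u''` prefixes for unicode in Python 2) rather than JSON. A short comparison with the standard `json` module, using made-up comments:

```python
import json

comments = ['Check out my channel', "it's great", 'Say "hi"']   # hypothetical comments

# Python list syntax: not valid JSON
print(str(comments))
# a JSON array: double-quoted strings, with quotes inside comments escaped
print(json.dumps(comments))

# the JSON form can be read back by any JSON parser
assert json.loads(json.dumps(comments)) == comments
```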

In [None]:
#Expected key-value output format:
#John Smith	["Comment 1", "Comment 2", "Comment 3", "etc."]
#Jane Doe	["Comment 1", "Comment 2", "Comment 3", "etc."]