# Coursework 2: Data Processing

## Task 1
This coursework will assess your understanding of using NoSQL to store and retrieve data.  You will perform operations on data from the Enron email dataset in a MongoDB database, and write a report detailing the suitability of different types of databases for data science applications.  You will be required to run code to answer the given questions in the Jupyter notebook provided, and write a report describing alternative approaches to using MongoDB.

Download the JSON version of the Enron data (using the “Download as zip” to download the data file from http://edshare.soton.ac.uk/19548/, the file is about 380MB) and import into a collection called messages in a database called enron.  You do not need to set up any authentication.  In the Jupyter notebook provided, perform the following tasks, using the Python PyMongo library.

Answers should be efficient in terms of speed.  Answers which are less efficient will not get full marks.

In [1]:
import pymongo
from pymongo import MongoClient
from datetime import datetime
from pprint import pprint

### 1)
Write a function which returns a MongoDB connection object to the "messages" collection. [4 points] 

In [3]:
def get_collection():
    """
    Connects to the server, and returns a collection object
    of the `messages` collection in the `enron` database
    """
    # YOUR CODE HERE
    client = MongoClient('mongodb://localhost:27017')
    db = client.enron

    return db.messages

### 2)

Write a function which returns the amount of emails in the messages collection in total. [4 points] 

In [4]:
def get_amount_of_messages(collection):
    """
    :param collection A PyMongo collection object
    :return the amount of documents in the collection
    """    
    # YOUR CODE HERE
    #When the data is too big, collection.estimated_document_count() gives rough estmation of the count 
    # and returns the results faster as it uses metdata
    return collection.count_documents({})
    
    pass
    

In [5]:
#TEST
messages = get_collection()
x =get_amount_of_messages(messages)
print(x)

100000


### 3) 

Write a function which returns each person who was BCCed on an email.  Include each person only once, and display only their name according to the X-To header. [4 points] 



In [6]:
def get_bcced_people(collection):
    """
    :param collection A PyMongo collection object
    :return the names of the people who have received an email by BCC
    """    
    # YOUR CODE HERE
    bcc= collection.distinct("headers.X-bcc", {'headers.X-bcc': {'$nin': [None, '']}})
    #clean the data
    lis = []
    for i in bcc:
        ind = i.find("<")
        y=i[0:ind].replace("'", "").strip() 
        lis.append(y)
    #if any more duplicates after cleaning the data? => Ans: Yes, get the Set from the list(30 to 29)
    bccSet = set(lis)
    return bccSet
    pass
    

In [7]:
#TEST
pprint(get_bcced_people(messages))

set([u'Apollo, Beth',
     u'Barry, Patrick',
     u'Beck, Sally',
     u'Beth Apollo',
     u'Carr, James',
     u'Causey, Richard',
     u'Choate, Heather',
     u'Davis, Dana',
     u'De La Paz, Janet',
     u'Denny, Jennifer',
     u'Gilbert-smith, Doug',
     u'Greg Piper',
     u'Lokey, Teb',
     u'Ogenyi, Gloria',
     u'Patti Thompson',
     u'Piper, Greg',
     u'Ratliff, Dale',
     u'Richard Shapiro',
     u'Robert Superty',
     u'Robertson, Audrey',
     u'Shapiro, Richard',
     u'Storey, Geoff',
     u'Twiggs, Thane',
     u'Villarreal, Alex',
     u'Wehring, Linda',
     u'Westbrook, Cherylene R.',
     u'cameron@perfect.com',
     u'david.beck@houston.rr.com',
     u'ddavissprint20@earthlink.net'])


### 4)

Write a function with parameter subject, which gets all emails in a thread with that parameter, and orders them by date (ascending). “An email thread is an email message that includes a running list of all the succeeding replies starting with the original email.”, check for detail descriptions at https://www.techopedia.com/definition/1503/email-thread [4 points]

In [8]:
def get_emails_in_thread(collection, subject):
    """
    :param collection A PyMongo collection object
    :return All emails in the thread with that subject
    """    
    # YOUR CODE HERE  
    sort = [('headers.Date', pymongo.ASCENDING)]
    filter = {"headers.Subject": {"$regex": subject}}
    return list(messages.find(filter, sort=sort))
    
    pass

In [9]:
#TEST
pprint(get_emails_in_thread(messages, "check it out"))

[{u'_id': ObjectId('4f16fc98d1e2d3237100435a'),
  u'body': u'http://www.telski.com/tandp/extras/photo_of_day/index.html',
  u'filename': u'76.',
  u'headers': {u'Content-Transfer-Encoding': u'7bit',
               u'Content-Type': u'text/plain; charset=us-ascii',
               u'Date': u'Tue, 14 Nov 2000 07:09:00 -0800 (PST)',
               u'From': u'eric.bass@enron.com',
               u'Message-ID': u'<27972472.1075854684357.JavaMail.evans@thyme>',
               u'Mime-Version': u'1.0',
               u'Subject': u'check it out',
               u'To': u'shanna.husser@enron.com, dfranklin@hanovermeasurement.com, \r\n\tjason.bass2@compaq.com, daphneco64@bigplanet.com, \r\n\tlwbthemarine@bigplanet.com',
               u'X-FileName': u'ebass.nsf',
               u'X-Folder': u'\\Eric_Bass_Dec2000\\Notes Folders\\Sent',
               u'X-From': u'Eric Bass',
               u'X-Origin': u'Bass-E',
               u'X-To': u'Shanna Husser, dfranklin@hanovermeasurement.com, Jason.Bass2@C

### 5)

Write a function which returns the percentage of emails sent on a weekend (i.e., Saturday and Sunday) as a `float` between 0 and 1. [6 points]

In [10]:
def get_percentage_sent_on_weekend(collection):
    """
    :param collection A PyMongo collection object
    :return A float between 0 and 1
    """    
    # YOUR CODE HERE
      
    filter = {"$or": [{"headers.Date": {"$regex": '^Sat', "$options": "i"}},{"headers.Date": {"$regex": '^Sun', "$options": "i"}}]}
    weekend=float(collection.count_documents(filter))
    total=float(collection.count_documents({}))
    perc  = float(weekend/total)
    return perc
    pass

In [11]:
#TEST
pprint(get_percentage_sent_on_weekend(messages))
#weekend = 0.0393, weekday = 0.9607

0.0393


### 6)

Write a function with parameter limit. The function should return for each email account: the number of emails sent, the number of emails received, and the total number of emails (sent and received). Use the following format: [{"contact": "michael.simmons@enron.com", "from": 42, "to": 92, "total": 134}] and the information contained in the To, From, and Cc headers. Sort the output in descending order by the total number of emails. Use the parameter limit to specify the number of results to be returned. If limit is null, the function should return all results. If limit is higher than null, the function should return the number of results specified as limit. limit cannot take negative values. [10 points]

In [12]:
import json 
#Function to get the dictionary with from email along with it's count.
def getFromEmails():
    groupFrom = { '$group': { '_id': '$headers.From', 'from': {'$sum': 1}}}

    fromE = messages.aggregate([groupFrom])
    fromEmails ={}
    for i in fromE:
        email= i["_id"] 
        value=i["from"] 
        if email in fromEmails:
            fromEmails[email] +=value
        else:
            fromEmails[email] =value
    return fromEmails

# Tried to use $split from mongoDB along with butilt-in aggregations like above case "fromEmails".
#This version is not supporting $split, so writing for loops instead of upgrading the VM
def getToEmails():
    toE= messages.find({'headers.To': {'$nin': [None, '']}},projection={"headers.To":1, "_id":0} ).limit(100)
  
    #clean the data
    toEmails = {}
    for i in toE:
        emails= i["headers"]["To"].split(",")
        for i in emails:
            email = i.strip()
            if email in toEmails:
                toEmails[email] +=1
            else:
                toEmails[email] =1
        
    return toEmails
    pass

#Same as above, but get emails in "CC" and respective counts
def getCcEmails():
    ccE= messages.find({'headers.Cc': {'$nin': [None, '']}},projection={"headers.Cc":1, "_id":0} ).limit(100)
  
    #clean the data
    ccEmails = {}
    for i in ccE:
        emails= i["headers"]["Cc"].split(",")
        for i in emails:
            email = i.strip()
            if email in ccEmails:
                ccEmails[email] +=1
            else:
                ccEmails[email] =1     
    return ccEmails
    pass

#This function merges to 2 dictionary, sum over the values. Merge "To" and "Cc" email counts and retrun the totals
def mergeTwoDict(x,y):
    total =y
    for key in x:
        if key in total:
            total[key] +=x[key]
        else:
            total[key] =x[key]
    return total 

#This function merges the input dictionaries
#Merges "fromEmail" and "toEmail" dictionaries and gets the output list with json objects
def mergeFromToDict(x,y):
    output =[]
    z=set(x.keys()+y.keys())
    for key in z:
        fromE= x[key] if key in x  else 0
        toE=y[key] if key in y  else 0
        total = int(fromE+toE)
        
        output.append({"contact": key, "from":fromE, "to":toE, "total":total})
    return output 

def get_emails_between_contacts(collection, limit):
    """
    Shows the communications between contacts
    Sort by the descending order of total emails using the To, From, and Cc headers.
    :param `collection` A PyMongo collection object    
    :param `limit` An integer specifying the amount to display, or
    if null will display all outputs
    :return A list of objects of the form:
    [{
        'contact': <<Another email address>>
        'from': 
        'to': 
        'total': 
    },{.....}]
    """    
    # YOUR CODE HERE
    fromEmail =getFromEmails()
    #This would gets emails in "To" and also in "Cc" with respective counts
    toEmail=mergeTwoDict(getToEmails(),getCcEmails())
    
    output= mergeFromToDict(fromEmail,toEmail)
    
    #Sort in descending order
    output.sort(key=lambda s: s['total'],reverse=True)
    
    #Limit the results as given by the input parameter
    results =output[:limit]
    
    #Return results in nice Json format as requested
    return json.dumps(results, sort_keys=True, indent=4)
    
    
    pass

In [13]:
#TEST
print(get_emails_between_contacts(messages, 3))

[
    {
        "contact": "jeff.dasovich@enron.com", 
        "from": 9424, 
        "to": 0, 
        "total": 9424
    }, 
    {
        "contact": "sally.beck@enron.com", 
        "from": 4244, 
        "to": 0, 
        "total": 4244
    }, 
    {
        "contact": "david.delainey@enron.com", 
        "from": 2959, 
        "to": 0, 
        "total": 2959
    }
]


### 7)
Write a function to find out the number of senders who were also direct receivers. Direct receiver means the email is sent to the person directly, not via cc or bcc. [4 points]

In [14]:
def get_from_to_people(collection):
    """
    :param collection A PyMongo collection object
    :return the NUMBER of the people who have sent emails and received emails as direct receivers.
    """    
    # YOUR CODE HERE
    fromEmails= messages.distinct("headers.From", {'headers.From': {'$nin': [None, '']}})
    toEmails= getToEmails(messages)
    results =intersection(fromEmails,toEmails)
    return len(results)


    pass

#Function to get all unique emails in "To" list
def getToEmails(collection):
    toEmail= collection.distinct("headers.To", {'headers.To': {'$nin': [None, '']}})
    #clean the data
    lis = []
    for i in toEmail:
        ind = i.find("<")
        y=i[0:ind].strip().split(",")
        for j in y:
            lis.append(j.strip())
    myset = set(lis)
    return lis
    pass

# Gets the intersection
def intersection(lst1, lst2): 
    return list(set(lst1) & set(lst2))

In [15]:
# TESTS
pprint(get_from_to_people(messages))
# result length =2822


2822


### 8)
Write a function with parameters start_date and end_date, which returns the number of email messages that have been sent between those specified dates, including start_date and end_date [4 points] 

In [16]:
import datetime
from datetime import timedelta, datetime, tzinfo
def get_emails_between_dates(collection, start_date, end_date):
    """
    :param collection A PyMongo collection object
    :return All emails between the specified start_date and end_date
    """    
    # YOUR CODE HERE  
    #Convert the date into ISO format which is default for mongoDB
    #simple mongoDB function, which can't accept the datetime python object format or ISO as the it is retrived as string
        #'Fri, 1 Dec 2000 00:03:00 -0800 (PST)'}
        
    #format="%a, %-d %b %Y %H:%M:%S %z %Z"
    #start = start_date.strftime(format)
    #end= end_date.strftime(format)
    
    
    filter = {"$and": [{"headers.Date": { '$gte': start }},{"headers.Date": {'$lte':end}}]}
    return collection.count_documents(filter)
    
 
    pass

In [17]:
#Alternate solution so that this function can work with python datetime obects,
#but limit on number of records for performance reasons with sort on all items
def alternateSol(collection, start_date, end_date):
    sort1 = [("headers.Date", pymongo.ASCENDING)]
    x =collection.find({},projection={"headers.Date":1, "_id":0},sort=sort1 ).limit(100)
    results =0
    for i in x:

        y= i['headers']['Date'].split(",")[1].strip()
        ind= y.find("-")
        z= y[:ind].strip()
        z= datetime.strptime(z, '%d %b %Y %H:%M:%S')

        if z>= start_date and z<=end_date:
            results +=1
    return results

#TESTS
datefrom='2000-12-1 00:19:00'
dateto='2014-12-1 00:58:00'
startdate = datetime.strptime(datefrom, '%Y-%m-%d %H:%M:%S')
enddate =  datetime.strptime(dateto, '%Y-%m-%d %H:%M:%S')

emails=alternateSol(messages, startdate, enddate)
pprint(emails)

95


## Task 2
This task will assess your ability to use the Hadoop Streaming API and MapReduce to process data. For each of the questions below, you are expected to write two python scripts, one for the Map phase and one for the Reduce phase. You are also expected to provide the correct parameters to the `hadoop` command to run the MapReduce process. Write down your answers in the specified cells below.

To get started, you need to download and unzip the YouTube dataset (available at http://edshare.soton.ac.uk/19547/) onto the machine where you have Hadoop installed (this should be the virtual machine provided).

To help you, `%%writefile` has been added to the top of the cells, automatically writing them to "mapper.py" and "reducer.py" respectively when the cells are run.

### 1) 
Using Youtube01-Psy.csv, find the hourly interval in which most spam was sent. The output should be in the form of a single key-value pair, where the value is a datetime at the start of the hour with the highest number of spam comments. [9 points]

In [27]:
%%writefile mapper.py
#!/usr/bin/env python2.7
#Answer for mapper.py
#aggregation by hour and get the max of it
import csv
import sys
import re
from datetime import datetime, timedelta

def main():
    lines = sys.stdin.readlines()

    csvreader = csv.reader(lines)
    for row in csvreader:

        date,spam = row[2], row[4]

        if date.startswith("20") and int(spam)==1:
            #print(date,spam)
            datetime_object = datetime.strptime(date.split(".", 1)[0], '%Y-%m-%dT%H:%M:%S')
            result=datetime_object.replace(microsecond=0,second=0,minute=0)
            out=result.strftime('%Y-%m-%dT%H:%M:%S'),int(spam)
            print('%s\t%s' % (result.strftime('%Y-%m-%dT%H:%M:%S'),int(spam)))
            #Convert date to String => date.strftime('%Y-%m-%dT%H:%M:%S')
        
if __name__ == "__main__":
    main()
#STEPS:
#select DATE, CLASS and pass to reducer, filter for DATE where it is not real DATE format and send 
#send only date rounded to start top of the HOUR



Overwriting mapper.py


In [28]:

%%writefile reducer.py
#!/usr/bin/env python2.7
# REDUCER

import sys
from collections import defaultdict
# Keep simple example in for now, switch to stdin later
def main():
    input_pairs = sys.stdin.readlines()
    output ={}

    for word in input_pairs:
        word1=word.strip("()").strip(" ").replace("'", "").replace(")", "").replace("(", "").split("\t")
        (date,count) =(word1[0],int(word1[1].strip(" ")))

        if date in output:
            output[date] +=count
        else:
            output[date] =count
    max_=1
    result1=""
    result2=""
    for key, value in output.items():

        if value> max_:
            max_=value
            result2=result1
            result1=key

    print('%s\t%s' % ("hour_with_most_spam" ,result1))
    print('%s\t%s' % ("hour_with_next_most_spam", result2))

if __name__ == "__main__":
    main()
#STEPS:
# Sum over the count
# Return only tab sepearated string with the resulted DATE

Overwriting reducer.py


In [30]:
%%bash
#Hadoop command to run the map reduce.
hadoop fs -rm -r /home/comp6235/Notebooks/CW2/output

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-input Youtube01-Psy.csv,Youtube02-KatyPerry.csv,Youtube03-LMFAO.csv,Youtube04-Eminem.csv,Youtube05-Shakira.csv \
-mapper ./mapper.py \
-reducer ./reducer.py \
-output output

It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
19/12/14 16:54:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `/home/comp6235/Notebooks/CW2/output': No such file or directory
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
19/12/14 16:54:04 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/12/14 16:54:04 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
19/12/14 16:54:04 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
19/12/14 16:54:04 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
19/12/14 16:54:04 INFO mapreduce.JobSubmitter: Cleaning up t

In [21]:
#Expected key-value output format:
#hour_with_most_spam	"2013-11-10T10:00:00"

#Additional key-value pairs are acceptable, as long as the hour_with_most_spam pair is correct.

### 2) 
Find all comments associated with a username (the AUTHOR field). Return a JSON array of all comments associated with that username. (This should use the data from all 5 data files: Psy, KatyPerry, LMFAO, Eminem, Shakira) [11 points]

In [22]:
%%writefile mapper.py
#!/usr/bin/env python2.7
#Answer for mapper.py
#aggregation by hour and get the max of it
import csv
import sys
import re
import os
from datetime import datetime, timedelta

def main():
   #Since it is not clear author field can be provided as input parameter, implementing the respective functionality
#Remove the filter to get the all list, if empty parameter also it should give all list
    #username=os.environ["user"]
    lines = sys.stdin.readlines()
   
    csvreader = csv.reader(lines)
    for row in csvreader:
        author,comment = row[1], row[3]
        if author and comment:
            out=author, comment
            print '%s\t%s'% (author ,comment)

                
            
        
if __name__ == "__main__":
    main()
#STEPS:
#select AUTHOR,COMMENT_ID filter for AUTHOR receiving from input parameter
#send only COMMENT_ID, CONTENT



Overwriting mapper.py


In [23]:

%%writefile reducer.py
#!/usr/bin/env python2.7
# REDUCER

import sys
from collections import defaultdict
# Keep simple example in for now, switch to stdin later
def main():
    input_pairs = sys.stdin.readlines()

    output ={}

    for word in input_pairs:
        word1=word.split("\t")
        (author,comment) =(word1[0].strip(),word1[1].strip())
        

        if author in output:
            output[author].append(comment)
        else:
            lis=[]
            output[author] =[comment]
    

        print('%s\t%s' % (author ,output[author]))

if __name__ == "__main__":
    main()
#STEPS:
# Sum over the count
# Return only tab sepearated string with the resulted DATE

Overwriting reducer.py


In [26]:
%%bash
#Hadoop command to run the map reduce.
hadoop fs -rm -r /home/comp6235/Notebooks/CW2/output

hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.py,reducer.py \
-input Youtube01-Psy.csv,Youtube02-KatyPerry.csv,Youtube03-LMFAO.csv,Youtube04-Eminem.csv,Youtube05-Shakira.csv \
-mapper mapper.py \
-reducer ./reducer.py \
-output output

It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
19/12/14 16:52:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
rm: `/home/comp6235/Notebooks/CW2/output': No such file or directory
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
19/12/14 16:52:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/12/14 16:52:30 INFO Configuration.deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
19/12/14 16:52:30 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
19/12/14 16:52:30 INFO jvm.JvmMetrics: Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
19/12/14 16:52:30 INFO mapreduce.JobSubmitter: Cleaning up t

In [25]:
#Expected key-value output format:
#John Smith	["Comment 1", "Comment 2", "Comment 3", "etc."]
#Jane Doe	["Comment 1", "Comment 2", "Comment 3", "etc."]