final fixes

commit 6e7e1910358586d52e6338290f193c78ef93072e 1 parent 9e2c39a
@marcua marcua authored
Showing with 36 additions and 38 deletions.
  1. +36 −38 day5/mapreduce.py
@@ -1,9 +1,9 @@
"""
-In day 4, we saw how to process text data using the Enron email dataset. In reality, we only processed a small fraction of the entire dataset: about 15 megabytes of Kenneth Lay's emails. The entire dataset containing many Enron employees' mailboxes is 1.3 gigabytes, about 87 times than what we worked with. And what if we worked on GMail, Yahoo! Mail, or Hotmail? We'd have several petabytes worth of emails, at least 71 million times the size of the data we dealt with.
+On day 4, we saw how to process text data using the Enron email dataset. In reality, we only processed a small fraction of the entire dataset: about 15 megabytes of Kenneth Lay's emails. The entire dataset containing many Enron employees' mailboxes is 1.3 gigabytes, about 87 times more than what we worked with. And what if we worked on GMail, Yahoo! Mail, or Hotmail? We'd have several petabytes' worth of emails, at least 71 million times the size of the data we dealt with.
-All that data would take a while to process, and it certainly couldn't fit on or be crunched by a single laptop. We'd have to store the data on many machines, and we'd have to process it (tokenize it, calculate tf-idf) using multiple machines. There are many ways to do this, but one of the more popular recent methods of _parallelizing data computation_ is on a programming framework called MapReduce, an idea that [Google presented to the world in 2004](http://en.wikipedia.org/wiki/MapReduce). Luckily, you do not have to work at Google to benefit from MapReduce: an open-source implementation called [Hadoop](https://hadoop.apache.org/) is available for your use!
+All that data would take a while to process, and it certainly couldn't fit on or be crunched by a single laptop. We'd have to store the data on many machines, and we'd have to process it (tokenize it, calculate tf-idf) using multiple machines. There are many ways to do this, but one of the more popular recent methods of _parallelizing data computation_ is based on a programming framework called MapReduce, an idea that [Google presented to the world in 2004](http://en.wikipedia.org/wiki/MapReduce). Luckily, you do not have to work at Google to benefit from MapReduce: an open-source implementation called [Hadoop](https://hadoop.apache.org/) is available for your use!
-But we don't have hundreds of machines sitting around for us to use them, you might say. Actually, we do! [Amazon Web Services](http://aws.amazon.com/) offers a service called Elastic MapReduce (EMR) that gives us access to as many machines as we would like for about [10 cents per hour](http://aws.amazon.com/elasticmapreduce/pricing/) of machine we use. Use 100 machines for 2 hours? Pay Amazon aroud $2.00. If you've ever heard the buzzword *cloud computing*, this elastic service is part of the hype.
+You might worry that we don't have hundreds of machines sitting around for us to use. Actually, we do! [Amazon Web Services](http://aws.amazon.com/) offers a service called Elastic MapReduce (EMR) that gives us access to as many machines as we would like for about [10 cents per hour](http://aws.amazon.com/elasticmapreduce/pricing/) per machine we use. Use 100 machines for 2 hours? Pay Amazon around $20.00. If you've ever heard the buzzword *cloud computing*, this elastic service is part of the hype.
Let's start with a simple word count example, then rewrite it in MapReduce, then run MapReduce on 20 machines using Amazon's EMR, and finally write a big-person MapReduce workflow to calculate TF-IDF!
@@ -19,13 +19,13 @@
"""
-This will result in a new file called `lay-j.json`, which is JSON-encoded. What is JSON? You can think of it like a text representation of python dictionaries and lists. If you open up the file, you will see on each line something that looks like this:
+This will result in a new file called `lay-k.json`, which is JSON-encoded. What is JSON? You can think of it as a text representation of Python dictionaries and lists. If you open up the file, you will see on each line something that looks like this:
`{"sender": "rosalee.fleming@enron.com", "recipients": ["lizard_ar@yahoo.com"], "cc": [], "text": "Liz, I don't know how the address shows up when sent, but they tell us it's \nkenneth.lay@enron.com.\n\nTalk to you soon, I hope.\n\nRosie", "mid": "32285792.1075840285818.JavaMail.evans@thyme", "fpath": "enron_mail_20110402/maildir/lay-k/_sent/108.", "bcc": [], "to": ["lizard_ar@yahoo.com"], "replyto": null, "ctype": "text/plain; charset=us-ascii", "fname": "108.", "date": "2000-08-10 03:27:00-07:00", "folder": "_sent", "subject": "KLL's e-mail address"}`
-It's a dictionary representing an email found in Kenneth Lay's mailbox. It contains the same content that we dealt with yesterday, but encoded into JSON, and rather than one file per email, we have a single file with one email per line.
+It's a dictionary representing an email found in Kenneth Lay's mailbox. It contains the same content that we dealt with on day 4, but encoded into JSON, and rather than one file per email, we have a single file with one email per line.
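If you want to poke at one of those lines yourself, Python's built-in `json` module will turn it into a dictionary. A minimal sketch (assuming you run it from `dataiap/day5`, like the other examples):

import json

# peek at the first decoded email in lay-k.json
for line in open('../datasets/emails/lay-k.json'):
    email = json.loads(line)                      # one dictionary per line
    print(email['sender'] + ': ' + email['subject'])
    break                                         # just look at the first one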
-Why did we do this? Big data crunching systems like Hadoop don't deal well with lots of small files: they want to be able to send a large chunk of data to a machine and have to crunch on it for a while. So we've processed the data to be in this format: one big file, a bunch of emails one per line. If you're curious how we did this, check out `dataiap/day5/emails_to_json.py`.
+Why did we do this? Big data crunching systems like Hadoop don't deal well with lots of small files: they want to be able to send a large chunk of data to a machine and have it crunch on that data for a while. So we've processed the data to be in this format: one big file, with one email per line. If you're curious how we did this, check out `dataiap/day5/emails_to_json.py`.
Aside from that, processing the emails is pretty similar to what we did on day 4. Let's look at a script that counts the words in the text of each email (Remember: it would help if you wrote and ran your code in `dataiap/day5/...` today, since several modules like `term_tools.py` are available in that directory).
@@ -48,7 +48,7 @@
"""
-You can save this script to `exercise1.py` and then run `python exercise2.py dataiap/datasets/emails/lay-k.json`. It will print the word count in due time. `get_terms` is similar to the word tokenizer we saw on day 4. `words` keeps track of the number of times we've seen each word. `email = JSONValueProtocol.read(line)[1]` uses a JSON decoder to convert each line into a dictionary called email, that we can then tokenize into individual terms.
+You can save this script to `exercise1.py` and then run `python exercise1.py dataiap/datasets/emails/lay-k.json`. It will print the word count in due time. `get_terms` is similar to the word tokenizer we saw on day 4. `words` keeps track of the number of times we've seen each word. `email = JSONValueProtocol.read(line)[1]` uses a JSON decoder to convert each line into a dictionary called `email`, which we can then tokenize into individual terms.
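The script itself isn't reproduced in this diff, but from that description it looks roughly like the following sketch (assuming `get_terms` comes from `term_tools.py` and `JSONValueProtocol` from the `mrjob` package; details may differ from the course file):

import sys
from collections import defaultdict

from mrjob.protocol import JSONValueProtocol
from term_tools import get_terms

words = defaultdict(int)
for line in open(sys.argv[1]):
    # same decoding trick as described above: read() returns a (key, value) pair
    email = JSONValueProtocol.read(line)[1]
    for term in get_terms(email['text']):
        words[term] += 1

for word, count in words.items():
    print('%s: %d' % (word, count))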
As we said before, running this process on several petabytes of data is infeasible because a single machine might not have petabytes of storage, and we would want to enlist multiple computers in the counting process to save time.
@@ -58,8 +58,7 @@
<h3>Motivating MapReduce</h3>
-You may have noticed that the code we have been writing for the past
-week look awfully similar.
+You may have noticed that the various programs we have written in previous exercises look somewhat repetitive.
Remember when we processed the campaign donations dataset to create a
histogram? We basically:
@@ -77,10 +76,9 @@
3. **summarized** each folder's group by counting the number of times
each term was seen.
-This three step pattern of 1) extracting and processing **from each**
+This three-step pattern of 1) extracting and processing **from each**
line or file, 2) **grouping by** some attribute(s), and 3)
-**summarizing"" the contents of each group is extremely common in the
-vast majority of data processing tasks. We implemented step 2 by
+**summarizing** the contents of each group is common in data processing tasks. We implemented step 2 by
adding elements to a global dictionary (e.g.,
`folder_tf[e['folder']].update` in <a
href="http://dataiap.github.com/dataiap/day4/index.html#tf">day 4's
@@ -92,9 +90,9 @@
may not fit into memory, and it'll take forever to run. Then what do
we do?
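For concreteness, the in-memory version of that three-step pattern is basically a big dictionary keyed on the group-by attribute. A rough sketch, assuming `emails` is a list of decoded email dictionaries and `get_terms` is day 4's tokenizer:

from collections import Counter, defaultdict

from term_tools import get_terms

# assume `emails` is a list of decoded email dictionaries, as on day 4
folder_tf = defaultdict(Counter)
for email in emails:                                               # 1) extract from each email
    folder_tf[email['folder']].update(get_terms(email['text']))    # 2) group by folder
for folder, counts in folder_tf.items():                           # 3) summarize each group
    print('%s: %s' % (folder, counts.most_common(5)))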
-The researchers at google also noticed this problem, and developed a
-framework to run these kinds
-of patterns on huge amounts of data (MapReduce). When you use this framework, you just need to write
+The researchers at Google also noticed this problem, and developed a
+framework called MapReduce to run these kinds
+of patterns on huge amounts of data. When you use this framework, you just need to write
functions to perform each of the three steps, and the framework takes
care of running the whole pattern on one machine or a thousand machines! They use a
different terminology for the three steps:
@@ -115,7 +113,7 @@
In the *map* phase (figure below), we are going to send each computer
1/3 of the lines. Each computer will process its 1,000,000 lines by
-reading each of the lines and extracting the words that make them up.
+reading each of the lines and tokenizing their words.
For example, the first machine may extract "enron, call,...", while
the second machine extracts "conference, call,...".
@@ -139,8 +137,8 @@
Finally, once each machine has received the values of the keys it's
responsible for, the *reduce* phase will process each key's value. It does this by
going through each key that is assigned to the machine and executing
-a `reducer` function on the 1's associated with the key. For example,
-"enron' was associated with a list of three 1's, and the reducer step
+a `reducer` function on the values associated with the key. For example,
+"enron" was associated with a list of three 1's, and the reducer step
simply adds them up.
<img src="./lab_reduce.png" width="500" />
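The full `mr_wordcount.py` listing isn't reproduced here, but based on the walkthrough below it looks roughly like this sketch (details may differ from the course file):

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

from term_tools import get_terms

class MRWordCount(MRJob):
    INPUT_PROTOCOL = JSONValueProtocol    # each input line is a JSON-encoded email
    OUTPUT_PROTOCOL = JSONValueProtocol   # each output line is a JSON dictionary

    def mapper(self, key, email):
        # emit a 1 for every term in the email's text
        for term in get_terms(email['text']):
            yield term, 1

    def reducer(self, term, occurrences):
        # sum up all of the 1's emitted for this term
        yield None, {'term': term, 'count': sum(occurrences)}

if __name__ == '__main__':
    MRWordCount.run()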
@@ -176,23 +174,23 @@ def reducer(self, term, occurrences):
"""
-Let's break this thing down. You'll notice the terms MRJob in a bunch
+Let's break this thing down. You'll notice the term MRJob in a bunch
of places. [MRJob](https://github.com/Yelp/mrjob) is a python package
-that makes writing MapReduce programs easy. The developers at Yelp (they wrote the mrjob module)
+that makes writing MapReduce programs easy. The developers at Yelp (they wrote the `mrjob` module)
wrote a convenience class called `MRJob` that you will extend. When it's run, it automatically hooks into the MapReduce
framework, reads and parses the input files, and does a bunch of other
things for you.
What we do is create a class `MRWordCount` that extends `MRJob`, and
implement the `mapper` and `reducer` functions. If the program is run from the
-command line, (the `if __name__ == '__main__':` part) it will execute
+command line (the `if __name__ == '__main__':` part), it will execute
the `MRWordCount` MapReduce program.
Looking inside `MRWordCount`, we see `INPUT_PROTOCOL` being set to
`JSONValueProtocol`. By default, map functions expect a line of text
as input, but we've encoded our emails as JSON, so we let MRJob know
that. Similarly, we explain that our reduce tasks will emit
-dictionaries, and set `OUTPUT_PROTOCOL` appropriately.
+dictionaries by setting `OUTPUT_PROTOCOL` appropriately.
The `mapper` function handles the functionality described in the first
image of the last section. It takes each email, tokenizes it into
@@ -209,7 +207,7 @@ def reducer(self, term, occurrences):
The `reducer` function implements the third image of the last section.
We are given a word (the key emitted from mappers), and a list
`occurrences` of all of the values emitted for each instance of
-`term`. Since we are counting occurrences of words, we `field` a
+`term`. Since we are counting occurrences of words, we `yield` a
dictionary containing the term and a sum of the occurrences we've seen.
Note that we `sum` the `occurrences` rather than taking their `len`. This allows us to change the mapper implementation to emit the number of times each word occurs in a document, rather than `1` for each word.
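For instance, a drop-in replacement for `MRWordCount`'s `mapper` along the following lines (a sketch) would keep working with the same reducer, since `sum` adds the per-document counts correctly while `len` would not:

    def mapper(self, key, email):
        # emit each term once per email, with its count within that email,
        # instead of a separate 1 per occurrence
        counts = {}
        for term in get_terms(email['text']):
            counts[term] = counts.get(term, 0) + 1
        for term, count in counts.items():
            yield term, count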
@@ -225,13 +223,13 @@ def reducer(self, term, occurrences):
"""
-The `-o` flag tells MrJob to output all reducer output to the `wordcount_test` directory. The `--no-output` flag says not to print the output of the reducers to the screen. The last argument (`'../datasets/emails/lay-k.json'`) specifies which file (or files) to read into the mappers as input.
+The `-o` flag tells MRJob to output all reducer output to the `wordcount_test` directory. The `--no-output` flag says not to print the output of the reducers to the screen. The last argument (`'../datasets/emails/lay-k.json'`) specifies which file (or files) to read into the mappers as input.
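The command being described is probably along these lines (run from `dataiap/day5`; the exact paths depend on where you saved things):

python mr_wordcount.py -o 'wordcount_test' --no-output '../datasets/emails/lay-k.json'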
Take a look at the newly created `wordcount_test` directory. There should be at least one file (`part-00000`), and perhaps more. There is one file per reducer that counted words. Reducers don't talk to one another as they do their work, and so we end up with multiple output files. While the count of a specific word will only appear in one file, we have no idea which reducer file will contain a given word.
The output files (open one up in a text editor) list each word as a dictionary on a single line (`OUTPUT_PROTOCOL = JSONValueProtocol` in `mr_wordcount.py` is what caused this).
-You will notice we have not yet run tasks on large datasets (we're still using `lay-k.json`) and we are still running them locally on our computers. We will soon learn to movet his work to Amazon's cloud infrastructure, but running MrJob tasks locally to test them on a small file is forever important. MapReduce tasks will take a long time to run and hold up several tens to several hundreds of machines. They also cost money to run, whether they contain a bug or not. Test them locally like we just did to make sure you don't have bugs before going to the full dataset.
+You will notice we have not yet run tasks on large datasets (we're still using `lay-k.json`) and we are still running them locally on our computers. We will soon learn to move this work to Amazon's cloud infrastructure, but running MRJob tasks locally to test them on a small file will always be important. MapReduce tasks take a long time to run and tie up several tens to several hundreds of machines. They also cost money to run, whether they contain a bug or not. Test them locally like we just did to make sure you don't have bugs before going to the full dataset.
<a name="firstexercise"><h3>Show off What you Learned</h3></a>
"""
@@ -241,7 +239,7 @@ def reducer(self, term, occurrences):
"""
"""
-** Optional Exercise ** Grep. The [`grep` command](http://en.wikipedia.org/wiki/Grep) on UNIX-like systems allows you to search text files for some term or terms. Typing `grep hotdogs file1` will return all instances of the word `hotdogs` in the file `file1`. Implement a `grep` for emails. When a user uses your mapreduce program to find a word in the email collection, they will be given a list of the subjects and senders of all emails that contain the word. You might find you do not need a particularly smart reducer in this case: that's fine. If you're pressed for time, you can skip this exercise.
+** (Optional) Exercise ** The [`grep` command](http://en.wikipedia.org/wiki/Grep) on UNIX-like systems allows you to search text files for some term or terms. Typing `grep hotdogs file1` will return all instances of the word `hotdogs` in the file `file1`. Implement a `grep` for emails. When a user uses your mapreduce program to find a word in the email collection, they will be given a list of the subjects and senders of all emails that contain the word. You might find you do not need a particularly smart reducer in this case: that's fine. If you're pressed for time, you can skip this exercise.
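If you want to sanity-check your approach afterwards, one possible shape for such a job is the sketch below (here the search word is hard-coded as a hypothetical `SEARCH_TERM`; passing it in as a command-line option would be nicer):

from mrjob.job import MRJob
from mrjob.protocol import JSONValueProtocol

from term_tools import get_terms

SEARCH_TERM = 'enron'   # hypothetical: hard-code the word to search for

class MRGrep(MRJob):
    INPUT_PROTOCOL = JSONValueProtocol
    OUTPUT_PROTOCOL = JSONValueProtocol

    def mapper(self, key, email):
        # emit the subject and sender of every email containing the search term
        if SEARCH_TERM in get_terms(email['text']):
            yield SEARCH_TERM, {'subject': email['subject'], 'sender': email['sender']}

    def reducer(self, term, matches):
        # nothing smart to do here: just pass each match through
        for match in matches:
            yield None, match

if __name__ == '__main__':
    MRGrep.run()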
"""
"""
@@ -282,7 +280,7 @@ def reducer(self, term, occurrences):
S3 data is stored in ** buckets **. Within a bucket you create, you can store as many files or folders as you'd like. The name of your bucket has to be unique across all of the people that store their stuff in S3. Want to make your own bucket? Let's do this!
- * Go to the AWS console (the website), and click on the ** S3 ** tab. This will show you a file explorer-like interface, with buckets listed on the left and files per bucket listed on the right.
+ * Log in to the AWS console (the website), and click on the ** S3 ** tab. This will show you a file explorer-like interface, with buckets listed on the left and files per bucket listed on the right.
* Click "Create Bucket" near the top left.
* Enter a bucket name. This has to be unique across all users of S3. Pick something like `dataiap-YOURUSERNAME-testbucket`. ** Do not use underscores in the name of the bucket **.
* Click "Create"
@@ -297,7 +295,7 @@ def reducer(self, term, occurrences):
* Right click on the uploaded file, and click "Make Public."
* Verify the file is public by going to `http://dataiap-YOURUSERNAME-testbucket.s3.amazonaws.com/lay-k.json`.
-Awesome! We just uploaded our first file to S3. Amazon is now hosting the file. We can access it over the web, which means we can share it with other researchers or process it in elastic mapreduce. To save time, we've uploaded the entire enron dataset to [https://dataiap-enron-json.s3.amazonaws.com/](https://dataiap-enron-json.s3.amazonaws.com/). Head over there to see all of the different Enron employee's files listed (the first three should be `allen-p.json`, `arnold-j.json`, and `arora-h.json`).
+Awesome! We just uploaded our first file to S3. Amazon is now hosting the file. We can access it over the web, which means we can share it with other researchers or process it in Elastic MapReduce. To save time, we've uploaded the entire Enron dataset to [https://dataiap-enron-json.s3.amazonaws.com/](https://dataiap-enron-json.s3.amazonaws.com/). Head over there to see all of the different Enron employees' files listed (the first three should be `allen-p.json`, `arnold-j.json`, and `arora-h.json`).
Two notes from here. First, uploading the file to S3 was just an exercise---we'll use the `dataiap-enron-json` bucket for our future exercises. That's because the total file upload is around 1.3 gigs, and we didn't want to put everyone through the pain of uploading it themselves. Second, most programmers don't use the web interface to upload their files. They instead opt to upload the files from the command line. If you have some free time, feel free to check out `dataiap/resources/s3_util.py` for a script that copies directories to and downloads buckets from S3.
@@ -325,11 +323,11 @@ def reducer(self, term, occurrences):
* `--no-output`: don't print the reducer output to the screen.
* `'s3://dataiap-enron-json/*.json'`: perform the mapreduce with input from the `dataiap-enron-json` bucket that the instructors created, and use as input any file that ends in `.json`. You could have named a specific file, like `lay-k.json` here, but the point is that we can run on much larger datasets.
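Putting those flags together, the full invocation is probably something like this (the output bucket is whichever one you created, and we're assuming mrjob's `-r emr` and `--num-ec2-instances` options):

python mr_wordcount.py -r emr --num-ec2-instances 20 \
    -o 's3://dataiap-YOURUSERNAME-testbucket/output' --no-output \
    's3://dataiap-enron-json/*.json'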
-Check back on the script. Is it still running? It should be. You may as well keep reading, since you'll be here a while. In total, our run took three minutes for Amazon to requisition the machines, 4 minutes to install the necessary software on them, and between 15 adn 25 minutes to run the actual MapReduce tasks on Hadoop. That might strike some of you as weird, and we'll talk about it now.
+Check back on the script. Is it still running? It should be. You may as well keep reading, since you'll be here a while. In total, our run took three minutes for Amazon to requisition the machines, four minutes to install the necessary software on them, and between 15 and 25 minutes to run the actual MapReduce tasks on Hadoop. That might strike some of you as weird, and we'll talk about it now.
-Understanding MapReduce is about understanding ** scale **. We're used to thinking of our programs as being about ** performance **, but that's not the role of MapReduce. Running a script on a single file on a single machine will be faster than running a script on multiple files split amongst multiple machines that shuffle data around to one-another and emit the data to a service (like EMR and S3) over the internet is not going to be fast. We write MapReduce programs because they let us easily ask for 10 more machines when the data we're processing grows by a factor of 10, not so that we can achieve sub-second processing times on large datasets. It's a mental model switch that will take a while to appreciate, so let it brew in your mind for a bit.
+Understanding MapReduce is about understanding ** scale **. We're used to thinking of our programs as being about ** performance **, but that's not the role of MapReduce. Running a script on a single file on a single machine will always be faster than running it on multiple files split amongst multiple machines that shuffle data around to one another and emit results to a service (like EMR and S3) over the internet. We write MapReduce programs because they let us easily ask for 10 times more machines when the data we're processing grows by a factor of 10, not so that we can achieve sub-second processing times on large datasets. It's a mental model switch that will take a while to appreciate, so let it brew in your mind for a bit.
-What it does mean is that MapReduce as a programming model is not a magic bullet. The Enron dataset is not actually so large that it shouldn't be processed on your laptop. We used the dataset because it was large enough to give you an appreciation for order-of-magnitue file size differences, but not large enough that a modern laptop can't process the data. In reality, don't look into MapReduce until you have several tens or hundreds of gigabytes of data to analyze. In the world that exists inside most companies, this size dataset is easy to stumble upon. So don't be disheartened if you don't need the MapReduce skills just yet: you will likely need them one day.
+What it does mean is that MapReduce as a programming model is not a magic bullet. The Enron dataset is not actually so large that it shouldn't be processed on your laptop. We used the dataset because it was large enough to give you an appreciation for order-of-magnitude file size differences, but not large enough that a modern laptop can't process the data. In practice, don't look into MapReduce until you have several tens or hundreds of gigabytes of data to analyze. In the world that exists inside most companies, this size dataset is easy to stumble upon. So don't be disheartened if you don't need the MapReduce skills just yet: you will likely need them one day.
<h3>Analyzing the output</h3>
Hopefully your first mapreduce is done by now. There are two bits of output we should check out. First, when the MapReduce job finishes, you will see something like the following message in your terminal window:
@@ -359,8 +357,8 @@ def reducer(self, term, occurrences):
Spilled Records: 134445547
"""
-That's a summary of, on your 10 machines, how many Mappers and Reducers ran. You can run more than one of each on a physical machine, which explains why more than 10 of each ran in our tasks. Notice how many reducers ran your task. Each reducer is going to receive a set of words and their number of occurrences, and emit word counts. Reducers don't talk to one-another, so they end up writing their own files.
-`
+That's a summary of how many Mappers and Reducers ran across your 20 machines. You can run more than one of each on a physical machine, which explains why more than 20 of each ran in our tasks. Notice how many reducers ran your task. Each reducer is going to receive a set of words and their number of occurrences, and emit word counts. Reducers don't talk to one another, so they end up writing their own files.
+
With this in mind, go to the S3 console, and look at the `output` directory of the S3 bucket to which you output your words. Notice that there are several files in the `output` directory named `part-00000`, `part-00001`. There should be as many files as there were reducers, since each wrote the file out. Download some of these files and open them up. You will see the various word counts for words across the entire Enron email corpus. Life is good!
"""
@@ -372,7 +370,7 @@ def reducer(self, term, occurrences):
"""
<h3>TF-IDF</h3>
-This section is going to further exercise our MapReduce-fu. If you're tired, feel free to [jump to the end](#wherefromhere) and comeback to it later!
+This section is going to further exercise our MapReduce-fu.
On [day 4](../day4/index.html), we learned that counting words is not enough to summarize text: common words like `the` and `and` are too popular. In order to discount those words, we multiplied the term frequency of `wordX` by `log(total # documents/# documents with wordX)`. Let's do that with MapReduce!
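As a quick sanity check of that formula with made-up numbers:

import math

# made-up numbers: 1,000 emails in the corpus, 100 of them containing the word
idf = math.log(1000.0 / 100)   # ~2.30; a rarer word would get a larger idf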
@@ -384,11 +382,11 @@ def reducer(self, term, occurrences):
  * The third calculates a per-sender TF-IDF for each term, taking both the second MapReduce's term IDFs and the email corpus as input.
-** HINT ** Do not run these MapReduce tasks on Amazon. You saw how slow it was tor un, so make sure the entire TF-IDF workflow works on your local machine with `lay-k.json` before moving to Amazon.
+** HINT ** Do not run these MapReduce tasks on Amazon. You saw how slow it was to run, so make sure the entire TF-IDF workflow works on your local machine with `lay-k.json` before moving to Amazon.
<h3>MapReduce 1: Total Number of Documents</h3>
-Eugene and I are the laziest of instructors. We don't like doing work where we don't have to. If you'd like a mental exercise as to how to write this MapReduce, you can do so yourself, but it's simpler than the wordcount example. Our dataset is not so large that we can't just use the `wc` UNIX command to count the number of lines in our corpus:
+Eugene and I are the laziest of instructors. We don't like doing work where we don't have to. If you'd like a mental exercise as to how to write this MapReduce, you can do so yourself, but it's simpler than the wordcount example. Our dataset is small enough that we can just use the `wc` UNIX command to count the number of lines in our corpus:
"""
wc -l lay-k.json
@@ -406,7 +404,7 @@ def reducer(self, term, occurrences):
<h3>MapReduce 3: Per-Sender TF-IDFs</h3>
-The third MapReduce multiplies per-sender term frequencies by per-term IDFs. This means it needs to take as input the IDFs calculated in the last step ** as well as ** calculate the per-sender TFs. That requires something we haven't seen yet: initialization logic. Let's show you the code, then tell you how it's done.
+The third MapReduce multiplies per-sender term frequencies by per-term IDFs. This means it needs to take as input the IDFs calculated in the last step ** and ** calculate the per-sender TFs. That requires something we haven't seen yet: initialization logic. Let's show you the code, then tell you how it's done.
"""
@@ -441,7 +439,7 @@ def reducer(self, term_sender, howmany):
If you did the [first exercise](#firstexercise), the `mapper` and `reducer` functions should look a lot like the per-sender word count `mapper` and `reducer` functions you wrote for that. One difference is that `reducer` takes the term frequencies and multiplies them by `self.idfs[term]`, to normalize by each word's IDF. The other difference is the addition of `reducer_init`, which we will describe next.
-`self.idfs` is a dictionary containing term-IDF mappings from the [first MapReduce](#tfidfstep2). Say you ran the IDF-calculating MapReduce like so:
+`self.idfs` is a dictionary containing term-IDF mappings from the [second MapReduce](#tfidfstep2). Say you ran the IDF-calculating MapReduce like so:
"""