In [10]:
%%file word_count.py
# From http://mrjob.readthedocs.org/en/latest/guides/quickstart.html#writing-your-first-job

from mrjob.job import MRJob
from mrjob.step import MRStep
from itertools import tee
import re
import sys

WORD_RE = re.compile(r"[\w']+")

# Legacy class to remember how jobs work
class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

# New class for MRJob Word Probability
class MRWordProbability(MRJob):
    
    # All the steps taken to produce probabilities and most common occurences
    def steps(self):
        return [
            # Pull strings out of the csv
            MRStep(mapper=self.mapper_pull_csv),
            # Produce bigrams from the string
            MRStep(mapper=self.mapper_get_bigrams,
                   combiner=self.combiner_count_bigrams,
                   reducer=self.reducer_count_bigrams),
            # Calculate percents and most common occurences
            MRStep(reducer=self.reducer_calculate_percents)
        ]
    
    # Just get the string from the csv
    def mapper_pull_csv(self, _, line):
        if(line[0] != '"'):
            yield (None, line[line.find(","):].lower())
    
    # Pull words from the string and make a bigram for every instance of each word following each other
    def mapper_get_bigrams(self, _, line):
        prevWord = ""
        # Use regex to find words
        for word in WORD_RE.findall(line):
            if(prevWord != ""):
                yield ((prevWord, word), 1)
            prevWord = word
    
    # Combine all like bigrams
    def combiner_count_bigrams(self, word, counts):
        yield (word, sum(counts))
    
    # Combine all like bigrams
    def reducer_count_bigrams(self, word, counts):
        first_word, second_word = word
        yield first_word, (sum(counts), second_word)
        
    # Function used to sort based on probability
    def mostUsed(self, x):
        num, word = x
        return num
    
    # Calculate percentage of each word showing up
    def reducer_calculate_percents(self, word, pairs):
        
        total = 0
        
        # Tee off the iterator so we can have 3 total runs through the data
        pairs, secondPairs = tee(pairs)
        pairs, sortedPairs = tee(pairs)
        
        # First calculate the total number of occurences of each bigram
        for pair in pairs:
            tmpCnt, _ = pair
            
            total = total + tmpCnt
        
        # Then print out all the rest of the words based on most common occurence
        probabilityList = sorted(secondPairs, key=self.mostUsed, reverse = True)
        for anotherPair in probabilityList:
            word_count, word_key = anotherPair
            
            yield (word, word_key), ((float(word_count) / total), word_count)
        
        # If the first word is "my", print the most used pairs
        if (word == "my"):
            for i in range(10):
                if i == len(probabilityList): 
                    break
                word_count, word_key = probabilityList[i]
                yield 'Most used number ' + str(i+1), ((word, word_key), word_count / total, word_count)
        
        

# Run the program
if __name__ == '__main__':
    MRWordProbability.run()


Overwriting word_count.py


In [2]:
!pip install mrjob

Collecting mrjob
[?25l  Downloading https://files.pythonhosted.org/packages/40/e3/53ee0f4a5791e856065878751fa1959b0a5ea0b20d458c8b6bf28c59020d/mrjob-0.6.8-py2.py3-none-any.whl (428kB)
[K     |████████████████████████████████| 430kB 3.7MB/s eta 0:00:01
Collecting google-cloud-storage>=1.13.1 (from mrjob)
[?25l  Downloading https://files.pythonhosted.org/packages/d2/eb/782c13b27192914a3b76ca85023e061f6e96cbe8e29ed0b1591600165d01/google_cloud_storage-1.15.1-py2.py3-none-any.whl (64kB)
[K     |████████████████████████████████| 71kB 14.0MB/s eta 0:00:01
[?25hCollecting google-cloud-dataproc>=0.3.0 (from mrjob)
[?25l  Downloading https://files.pythonhosted.org/packages/86/9b/30f1e5f55515334b2d897afd19234da53113910ac9fb2d9b2ec128dd60d5/google_cloud_dataproc-0.3.1-py2.py3-none-any.whl (211kB)
[K     |████████████████████████████████| 215kB 23.1MB/s eta 0:00:01
[?25hCollecting google-cloud-logging>=1.9.0 (from mrjob)
[?25l  Downloading https://files.pythonhosted.org/packages/6e/f6/bdfa

In [11]:
!python word_count.py -r local jokes.csv --output-dir=word_count_out--no-output

No configs found; falling back on auto-configuration
No configs specified for local runner
Creating temp directory /tmp/word_count.nbuser.20190517.011809.333602
Running step 1 of 3...
Running step 2 of 3...
Running step 3 of 3...
job output is in word_count_out--no-output
Removing temp directory /tmp/word_count.nbuser.20190517.011809.333602...


In [12]:
%%file ~/.mrjob.conf

# http://mrjob.readthedocs.io/en/stable/guides/emr-opts.html

runners:
  emr:
    aws_access_key_id:
    aws_secret_access_key:
    ec2_key_pair: secure
    ec2_key_pair_file: /secure.pem
    region: us-east-1 # http://docs.aws.amazon.com/general/latest/gr/rande.html
    master_instance_type: m5.xlarge # https://aws.amazon.com/emr/pricing/
    instance_type: m5.xlarge
    num_core_instances: 1
    ssh_tunnel: true

Writing /home/nbuser/.mrjob.conf


In [14]:
!python word_count.py -r emr s3://mrjob-cs351-cory/Input/shortjokes.csv \
--output-dir=s3://mrjob-cs351-cory/bigrams_out \
--no-output

Using configs in /home/nbuser/.mrjob.conf
Auto-created temp S3 bucket mrjob-2f3b5b12e1141640
Using s3://mrjob-2f3b5b12e1141640/tmp/ as our temp dir on S3
Creating temp directory /tmp/word_count.nbuser.20190517.012349.219767
writing master bootstrap script to /tmp/word_count.nbuser.20190517.012349.219767/b.sh
uploading working dir files to s3://mrjob-2f3b5b12e1141640/tmp/word_count.nbuser.20190517.012349.219767/files/wd...
Copying other local files to s3://mrjob-2f3b5b12e1141640/tmp/word_count.nbuser.20190517.012349.219767/files/
Created new cluster j-3AD92YMNRRS13
Added EMR tags to cluster j-3AD92YMNRRS13: __mrjob_label=word_count, __mrjob_owner=nbuser, __mrjob_version=0.6.8
Waiting for Step 1 of 3 (s-3N3M0MUIIX7RS) to complete...
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING: Con

Removing temp directory /tmp/word_count.nbuser.20190517.012349.219767...
Removing log files in s3://mrjob-2f3b5b12e1141640/tmp/logs/j-3AD92YMNRRS13/...
Terminating cluster: j-3AD92YMNRRS13
