In [73]:
%%file word_count.py

from mrjob.job import MRJob
from mrjob.step import MRStep
from itertools import tee
import re
import sys

# A "word" is a run of letters, digits, underscores and apostrophes, so
# contractions such as "don't" stay a single token.
WORD_KEY = re.compile(pattern=r"[\w']+")

# mrjob job computing bigram (word-pair) probabilities.
class MapReduce(MRJob):
    """Conditional bigram probabilities P(second | first) over a text CSV.

    Steps:
      1. ``input_file``: skip empty lines and lines starting with ``"``, drop
         the text before the first comma (presumably an ID column -- confirm
         against the CSV), lowercase the rest.
      2. ``set_bigrams`` / ``count_bigrams``: emit and count every adjacent
         word pair within a line, re-keyed by the first word.
      3. ``solution``: per first word, turn counts into probabilities and, for
         ``TARGET_WORD``, also emit its ``TOP_N`` most frequent followers.
    """

    # First word whose most frequent followers get an extra "Most Used" listing.
    TARGET_WORD = "my"
    # Maximum number of "Most Used" records emitted for TARGET_WORD.
    TOP_N = 10

    def steps(self):
        return [
            MRStep(mapper=self.input_file),
            MRStep(mapper=self.set_bigrams,
                   reducer=self.count_bigrams),
            MRStep(reducer=self.solution)
        ]

    # Step 1 - Edits the file's lines to make them more "readable"
    def input_file(self, _, line):
        """Step 1 mapper: yield ``(None, text)`` for each usable input line.

        Empty lines and lines starting with a double quote are skipped.
        Everything before the first comma is dropped (the comma itself is kept;
        it is not a word character, so it never ends up in a bigram). A line
        with no comma is kept whole. The text is lowercased.
        """
        # startswith() rather than line[0]: an empty line would raise IndexError.
        if not line or line.startswith('"'):
            return
        comma = line.find(",")
        # find() returns -1 when there is no comma; slicing from -1 would keep
        # only the last character, so use the whole line in that case.
        text = line[comma:] if comma != -1 else line
        yield None, text.lower()

    # Step 2 - Pull words from the string and make a bigram for every word
    def set_bigrams(self, _, line):
        """Step 2 mapper: emit ``((previous_word, word), 1)`` per adjacent pair."""
        words = WORD_KEY.findall(line)
        for bigram in zip(words, words[1:]):
            yield bigram, 1

    # Step 2 - Combine all like bigrams
    def count_bigrams(self, word, counts):
        """Step 2 reducer: total a bigram's count and re-key by its first word.

        ``word`` is the ``(first_word, second_word)`` bigram key. Yields
        ``(first_word, (count, second_word))`` so step 3 receives every
        follower of a given first word together.
        """
        first_word, second_word = word
        yield first_word, (sum(counts), second_word)

    # Function used to sort by probability
    def mostUsed(self, x):
        """Sort key for a ``(count, second_word)`` pair: its count."""
        num, _ = x
        return num

    # Step 3 - Calculate percentages
    def solution(self, word, pairs):
        """Step 3 reducer: conditional probabilities for one first word.

        ``pairs`` yields ``(count, second_word)`` items from ``count_bigrams``
        (they may arrive as 2-element lists after mrjob serializes them between
        steps; they are only unpacked here, so either form works).

        Yields, in descending count order,
        ``((word, second_word), (probability, count))`` where probability is
        count divided by the total count of bigrams starting with ``word``.
        If ``word == TARGET_WORD`` it then yields, once, up to ``TOP_N``
        ``('Most Used - k', ((word, second_word), probability, count))``
        records for the k-th most frequent follower.
        """
        # The values are needed twice (total, then ordered output) and sorting
        # materializes them anyway, so a single sorted list replaces tee().
        ranked = sorted(pairs, key=self.mostUsed, reverse=True)
        total = sum(count for count, _ in ranked)

        for word_count, word_key in ranked:
            # float() keeps true division under Python 2 as well.
            yield (word, word_key), (float(word_count) / total, word_count)

        # Emit the top-N listing for the target word exactly once.
        if word == self.TARGET_WORD:
            for rank, (word_count, word_key) in enumerate(ranked[:self.TOP_N], start=1):
                yield ('Most Used - ' + str(rank),
                       ((word, word_key), float(word_count) / total, word_count))
        
        

# Script entry point: MRJob.run() reads the command line (runner such as
# -r local / -r emr, input paths, --output-dir, ...) and executes the steps.
if __name__ == "__main__":
    MapReduce.run()

Overwriting word_count.py


In [4]:
# Pin the mrjob version the jobs below ran with (EMR tag __mrjob_version=0.6.8),
# and install it into the same interpreter that `!python word_count.py` uses.
!python -m pip install mrjob==0.6.8

Collecting mrjob
[?25l  Downloading https://files.pythonhosted.org/packages/40/e3/53ee0f4a5791e856065878751fa1959b0a5ea0b20d458c8b6bf28c59020d/mrjob-0.6.8-py2.py3-none-any.whl (428kB)
[K     |████████████████████████████████| 430kB 1.5MB/s eta 0:00:01
[?25hCollecting google-cloud-dataproc>=0.3.0 (from mrjob)
[?25l  Downloading https://files.pythonhosted.org/packages/86/9b/30f1e5f55515334b2d897afd19234da53113910ac9fb2d9b2ec128dd60d5/google_cloud_dataproc-0.3.1-py2.py3-none-any.whl (211kB)
[K     |████████████████████████████████| 215kB 6.4MB/s eta 0:00:01
[?25hCollecting google-cloud-storage>=1.13.1 (from mrjob)
[?25l  Downloading https://files.pythonhosted.org/packages/34/7c/e2f563c1af1d9042b1a4e03dfcdd73fc31a5236fd108440b4981b4eaea8d/google_cloud_storage-1.16.0-py2.py3-none-any.whl (65kB)
[K     |████████████████████████████████| 71kB 2.9MB/s eta 0:00:011
Collecting google-cloud-logging>=1.9.0 (from mrjob)
[?25l  Downloading https://files.pythonhosted.org/packages/6e/f6/bdfa6

In [None]:
# Run the job with mrjob's local runner on the local CSV; results are written
# to word_count_out/ and, because of --no-output, not echoed to the notebook.
!python word_count.py \
-r local \
shortjokes.csv \
--output-dir=word_count_out \
--no-output

Using configs in /home/nbuser/.mrjob.conf
No configs specified for local runner
Creating temp directory /tmp/word_count.nbuser.20190520.051012.491557
Running step 1 of 3...
Running step 2 of 3...
Running step 3 of 3...


In [77]:
# Run the same job on EMR, reading the input from S3 and writing results back
# to S3. Cluster settings are read from ~/.mrjob.conf (runners: emr).
!python word_count.py \
-r emr \
s3://cs351-mapreduce/input/shortjokes.csv \
--output-dir=s3://cs351-mapreduce/word_count_out \
--no-output

Using configs in /home/nbuser/.mrjob.conf
Using s3://mrjob-269de06342093388/tmp/ as our temp dir on S3
Creating temp directory /tmp/word_count.nbuser.20190520.051620.228053
writing master bootstrap script to /tmp/word_count.nbuser.20190520.051620.228053/b.sh
uploading working dir files to s3://mrjob-269de06342093388/tmp/word_count.nbuser.20190520.051620.228053/files/wd...
Copying other local files to s3://mrjob-269de06342093388/tmp/word_count.nbuser.20190520.051620.228053/files/
Created new cluster j-1890DE4UYNBNF
Added EMR tags to cluster j-1890DE4UYNBNF: __mrjob_label=word_count, __mrjob_owner=nbuser, __mrjob_version=0.6.8
Waiting for Step 1 of 3 (s-5ZC9GLTZ9MN9) to complete...
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  Connect to resource man

In [36]:
%%file ~/.mrjob.conf

# http://mrjob.readthedocs.io/en/stable/guides/emr-opts.html
#
# AWS credentials are deliberately NOT stored in this file. When
# aws_access_key_id / aws_secret_access_key are omitted, mrjob lets boto3 use
# its default credential chain (e.g. the AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY environment variables, or ~/.aws/credentials).
# NOTE(review): the access key previously hardcoded here was published with
# this notebook -- deactivate and rotate it in IAM.

runners:
  emr:
    ec2_key_pair: cs351
    ec2_key_pair_file: cs351.pem
    region: us-east-2 # http://docs.aws.amazon.com/general/latest/gr/rande.html
    master_instance_type: m5.xlarge # https://aws.amazon.com/emr/pricing/
    instance_type: m5.xlarge
    num_core_instances: 1
    ssh_tunnel: true

Overwriting /home/nbuser/.mrjob.conf
