#### Pattern 1: Filtering patterns
Filtering: slicing the data based on a set of conditions.
We are looking to find the top 10 longest sentences in Rumi poems.

In [2]:
from time import time
import codecs

In [49]:
%%file codes/rumi_top10.py 
# this magic command allows saving output of python to folder
from mrjob.job import MRJob

class MRJobtoppoems(MRJob):
    
    
    def mapper(self,_,line):
        word_count = line.strip().split(' ')
        yield None, (len(word_count), line)
        
    def reducer(self, _, len_line_pairs):
        self.plist = []
        self.llist = []
        for v in len_line_pairs:
            self.plist.append(v)
        top10 = sorted(self.plist, reverse = True)[:10]
        for m in top10:
            yield m
        
    
       
if __name__ == '__main__':  
    MRJobtoppoems.run()  # where MRJobCategoryCost is your job class

Overwriting codes/rumi_top10.py


In [51]:
tic = time()
string =! python codes/rumi_top10.py < data/rumi.txt 
toc = time()

print('Running this query took {} seconds.'.format(toc - tic))
for s in string:
    try:
        print(codecs.decode(s, 'unicode_escape'))
    except:
        pass

Running this query took 7.614239931106567 seconds.
No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
reading from STDIN
15	"خموش این  بی  و این  تی  را به جادویی مده شکلی"
14	"نه از خاکم و نه از بادم نه از آتش و نه از آبم"
14	"از دل چه خوش دل می بری وز سر چه خوش سر می کشی"
13	"که آن سایه ست و این خورشید و آن پستست و این سامی"
13	"ره زن که خوش ره می زنی می کش که زیبا می کشی"
13	"بی دست و بی دل می شوم چون دست بر من می زنی"
13	"این کف به سر بر می رود چون سر به کیوان می کشی"
12	"یکی شاهی به معنی صد که جان و دل ز من بستد"
12	"گویم که  خمش کن که نه کی دانم و نی بی "
12	"که گوید شیر را هرگز   چه شیری تو که خونخواری "


#### Pattern 2:  Summarization patterns:
Let's find the mean number of characters in Rumi poem sentences.

In [58]:
%%file codes/sent_len.py 
# this magic command allows saving output of python to folder
from mrjob.job import MRJob

class MRJobSentLen(MRJob):
    
    def mapper(self,_,line):
        yield None, len(line)
        
    def reducer(self, _, lens):
        n = total = 0
        for l in lens:
            n += 1
            total += l
            
        yield "Sentence length average:", total / n
        
    
       
if __name__ == '__main__':  
    MRJobSentLen.run()  # where MRJobCategoryCost is your job class

Overwriting codes/sent_len.py


In [59]:
tic = time()
string =! python codes/sent_len.py < data/r.txt 
toc = time()

print('Running this query took {} seconds.'.format(toc - tic))
for s in string:
    try:
        print(codecs.decode(s, 'unicode_escape'))
    except:
        pass

Running this query took 2.440459728240967 seconds.
No configs found; falling back on auto-configuration
No configs specified for inline runner
Running step 1 of 1...
reading from STDIN
"Sentence length average:"	25.641


#### Pattern 3:  Structural patterns (join two tables):
For this pattern, we need relational database structured kind of data (primary key, foreign key needs to be avaialble). Therefore, I am going to demonstrate this using other datasets.
To do join using MRJob, we first need to concatanate the text files into one text file. This allows reading one stream of data.

In [11]:
%%file codes/join.py 
#!/usr/bin/python

# this magic command allows saving output of python to folder
from mrjob.job import MRJob
#from mr3px.csvprotocol import CsvProtocol # to output in csv format

class MRJobJoin(MRJob):
    #OUTPUT_PROTOCOL = CsvProtocol
    def mapper(self,_,line):
        data = line.split("\t")
        if len(data) == 5 and data[0] != "user_ptr_id": # making sure we are skipping the header
            user, reputation, gold, silver, bronze = data # unpacking the data
            yield user, ["U", reputation, gold, silver, bronze]  # U is used to flag user database
        elif len(data) == 19 and data[0] != "id": # making sure we are skipping the header
            id_tag = data[0]
            title = data[1]
            tagnames = data[2]
            user = data[3]
            node_type = data[5]
            parent_id = data[6]
            abs_parent_id = data[7]
            added_at = data[8]
            score = data[9]
            yield user, ["N", id_tag, title, tagnames, node_type, parent_id, abs_parent_id, added_at, score]
            
    def reducer(self, user, packed_values):
        self.outlist = [None for _ in range(13)]  # empty list to keep the joined data
        self.outlist[3] = user  # user id is the key
        for line in packed_values:
            if line[0] == 'U': # coming from the user file
                self.outlist[9] = line[1] # reputation
                self.outlist[10] = line[2] # gold
                self.outlist[11] = line[3] # silver
                self.outlist[12] = line[4] # bronze
            elif line[0] == 'N':  # coming from the node file
                self.outlist[0] = line[1]
                self.outlist[1] = line[2]
                self.outlist[2] = line[3]      
                self.outlist[4] = line[4]
                self.outlist[5] = line[5]
                self.outlist[6] = line[6]
                self.outlist[7] = line[7]      
                self.outlist[8] = line[8]
                yield None, self.outlist
  
       
if __name__ == '__main__':  
    MRJobJoin.run()  # where MRJobCategoryCost is your job class

Overwriting codes/join.py


In [9]:
tic = time()
string =! python codes/join.py data/forum_users.tsv data/sample.tsv > outputs/users_node_joined.csv
toc = time()

print('Running this query took {} seconds.'.format(toc - tic))
for s in string:
    print(s)

Running this query took 7.279247522354126 seconds.
Using configs in C:\Users\Amin\.mrjob.conf
No configs specified for inline runner
Running step 1 of 1...
Creating temp directory C:\Users\Amin\AppData\Local\Temp\join.Amin.20190304.002055.195473
job output is in C:\Users\Amin\AppData\Local\Temp\join.Amin.20190304.002055.195473\output
Streaming final output from C:\Users\Amin\AppData\Local\Temp\join.Amin.20190304.002055.195473\output...
Removing temp directory C:\Users\Amin\AppData\Local\Temp\join.Amin.20190304.002055.195473...


In [None]:
!python codes/join.py -r emr s3://mapreduce0000/data/forum_users.tsv \
    s3://mapreduce0000/data/forum_node.tsv \
        --output-dir=s3://mapreduce0000/joined_tables/