<a href="https://colab.research.google.com/github/MohammadHeydari/BigData/blob/master/bda_mrjob_Mohammad_Heydari.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Big Data Analytics Assignment (mrjob)

This assignment consists of three problems, each with its own dataset. Solve every question of each problem using the MapReduce programming model, implemented with the mrjob Python package.

### Problems:
<b>1) Store Management</b>

<i>
Use customer-orders.csv as the dataset. It has three columns: the first is the customer ID, the second is the item ID, and the third is the price of the basket.
</i>
    

<b>2) Movielens Dataset</b>

<i>
    The MovieLens dataset consists of two files: u.data and u.item. u.data has four columns: user ID, movie ID, rating, and date. u.item has two columns: movie ID and movie name.
</i>
    

<b>3) Google Play Store App Reviews</b>

<i>
    The "Google Play Store App Reviews" dataset consists of two files: "googleplaystore.txt" and "googleplaystore_user_reviews.txt".    
    
"googleplaystore.txt" has 13 columns: App, Category, Rating, Reviews, Size, Installs, Type, Price, Content Rating, Genres, Last Updated, Current Ver, Android Ver.  

"googleplaystore_user_reviews.txt" has 5 columns: App, Translated_Review, Sentiment, Sentiment_Polarity, Sentiment_Subjectivity.
</i>
    
<i>
    In these .txt files, the separator is "∑". You can open the corresponding .csv files to view the datasets as tables.
</i>
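
The "∑"-separated layout described above can be read with a plain string split. The following is a minimal sketch, not part of the assignment: `parse_record` and the sample row are hypothetical, and the column names are the ones listed above.

```python
# Column names as listed in the assignment text for googleplaystore.txt.
COLUMNS = [
    "App", "Category", "Rating", "Reviews", "Size", "Installs", "Type",
    "Price", "Content Rating", "Genres", "Last Updated", "Current Ver",
    "Android Ver",
]

SEPARATOR = "∑"


def parse_record(line):
    """Split one separator-delimited line into a dict, or return None if malformed."""
    fields = line.rstrip("\n").split(SEPARATOR)
    if len(fields) != len(COLUMNS):
        return None  # missing or extra fields: let the caller skip the row
    return dict(zip(COLUMNS, fields))


# Hypothetical row, invented for illustration only.
example = SEPARATOR.join([
    "Demo App", "TOOLS", "4.5", "120", "3.1M", "1,000+", "Free",
    "0", "Everyone", "Tools", "January 1, 2018", "1.0", "4.1 and up",
])
record = parse_record(example)
print(record["Category"], record["Android Ver"])
```

Values such as `1,000+` contain commas, which is one reason a rarely used separator like "∑" is easier to split on than a comma.
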
    
### Notice:

* Install mrjob package first:
    ```!pip install mrjob```

* Write your code and produce your outputs in this notebook, then send only this .ipynb file with your name on it.

* Use ```%%writefile name_of_file.py``` as the first line of a cell to write that cell to a .py file.

* You can run your scripts in this notebook using ```!python name_of_file.py your_dataset.csv [optional arguments]``` .
    
* Send your solution to this email:
    ```m.h.zendehpey@gmail.com```
    
* Feel free to explore the mrjob documentation: <a href="https://mrjob.readthedocs.io/en/latest/">https://mrjob.readthedocs.io/en/latest/</a>.


## Good Luck.

---


In [77]:
# Upload the data files to Colab
from google.colab import files
up = files.upload()

Saving googleplaystore.csv to googleplaystore (1).csv
Saving googleplaystore.txt to googleplaystore (1).txt
Saving googleplaystore_user_reviews.csv to googleplaystore_user_reviews (1).csv
Saving googleplaystore_user_reviews.txt to googleplaystore_user_reviews (1).txt


In [79]:
!pip install mrjob



# Samples:

In [0]:
%%writefile testtext.txt
hello world.
this is a text.
this is a test text, world.
goodbye world.

Writing testtext.txt


In [0]:
%%writefile sample1.py

from mrjob.job import MRJob,MRStep

class WordCount(MRJob):
    
    def mapper(self,_, line):
        for word in line.split(' '):
            yield word.lower(),1
            
    def reducer(self, word, counts):
        yield word, sum(counts)
        
        
if __name__ == '__main__':
    WordCount.run()

Writing sample1.py


In [0]:
!python sample1.py testtext.txt -q

"is"	2
"test"	1
"text,"	1
"a"	2
"goodbye"	1
"hello"	1
"text."	1
"this"	2
"world."	3

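In the output above, `"text,"` and `"text."` are counted as different words because the mapper splits on spaces only. The sketch below shows one possible way to tokenize a line so that punctuation is dropped; `WORD_RE` and `tokenize` are hypothetical names, not part of the sample.

```python
import re

# A word is a run of word characters (letters, digits, underscore) or apostrophes.
WORD_RE = re.compile(r"[\w']+")


def tokenize(line):
    """Return the lowercase words in a line, without punctuation."""
    return [word.lower() for word in WORD_RE.findall(line)]


print(tokenize("this is a test text, world."))
```

A mapper could loop over `tokenize(line)` instead of `line.split(' ')` to merge such variants into one count.
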

In [0]:
%%writefile sample2.py

from mrjob.job import MRJob,MRStep

class MaxCount(MRJob):
    
    def steps(self):
        return [
            MRStep(mapper = self.mapper1,
                  reducer = self.reducer1),
            MRStep(reducer = self.reducer2)
        ]
    
    
    def mapper1(self, _, line):
       
        for word in line.split(' '):
            yield word.lower(), 1
            
    def reducer1(self, word, counts):
        #key= None , Values(count , word)
        yield None, (sum(counts),word)
        
    def reducer2(self, _, pairs):
        yield max(pairs)
        

if __name__ == '__main__':
    MaxCount.run()

Writing sample2.py


In [0]:
!python sample2.py testtext.txt -q

3	"world."
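
To see why the second step works, the two steps of `sample2.py` can be emulated in plain Python. This is only a sketch of the map, group, and reduce flow under simplified assumptions; `run_step` is a hypothetical helper and does not reproduce how mrjob actually runs a job.

```python
from collections import defaultdict


def run_step(records, mapper=None, reducer=None):
    """Emulate one MapReduce step: map, group by key, then reduce."""
    if mapper is not None:
        records = [pair for key, value in records for pair in mapper(key, value)]
    if reducer is None:
        return list(records)
    groups = defaultdict(list)
    for key, value in records:
        groups[key].append(value)
    output = []
    for key in sorted(groups, key=repr):  # group order is fixed for repeatability
        output.extend(reducer(key, groups[key]))
    return output


def mapper1(_, line):
    for word in line.split(' '):
        yield word.lower(), 1


def reducer1(word, counts):
    # Every pair gets the same key (None), so one reducer sees them all.
    yield None, (sum(counts), word)


def reducer2(_, pairs):
    # Pairs compare by count first, so max() picks the most frequent word.
    yield max(pairs)


lines = ["hello world.", "this is a text.",
         "this is a test text, world.", "goodbye world."]
step1 = run_step([(None, line) for line in lines], mapper1, reducer1)
step2 = run_step(step1, reducer=reducer2)
print(step2)
```

Because `reducer1` emits every pair under the single key `None`, the second step receives all `(count, word)` pairs in one group and can take their maximum.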


# 1) Store Management

In [0]:
%%writefile best_selling_and_how_much.py
# type 1

from mrjob.job import MRJob
from mrjob.step import MRStep


class SpendByCustomerSorted(MRJob):
    """Total the amount spent by each customer, then sort by that total."""

    def steps(self):
        # Step 1 sums the order amounts per customer.
        # Step 2 re-keys by total so the shuffle sorts the output.
        return [
            MRStep(mapper=self.mapper_count_amount,
                   reducer=self.reducer_sum_amount),
            MRStep(mapper=self.mapper_make_amounts_key,
                   reducer=self.reducer_output_amounts)
        ]

    def mapper_count_amount(self, _, line):
        # Each line is "customer,item,amount".
        (customer, item, orderAmount) = line.split(',')
        yield customer, float(orderAmount)

    def reducer_sum_amount(self, customer, amounts):
        # Add up all order amounts for one customer.
        yield customer, sum(amounts)

    def mapper_make_amounts_key(self, customer, amounts):
        # Use the formatted total as the key. Keys are compared as strings,
        # so the order is numeric only while all totals have the same
        # number of digits.
        yield "%04.02f"%float(amounts), customer

    def reducer_output_amounts(self, amounts, customers):
        # Several customers can share the same total.
        for customer in customers:
            yield amounts, customer


if __name__ == '__main__':
    SpendByCustomerSorted.run()

Writing best_selling_and_how_much.py


In [0]:
!python best_selling_and_how_much.py customer-orders.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/best_selling_and_how_much.root.20191217.075730.162488
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/best_selling_and_how_much.root.20191217.075730.162488/output
Streaming final output from /tmp/best_selling_and_how_much.root.20191217.075730.162488/output...
"4681.92"	"66"
"4701.02"	"56"
"4707.41"	"21"
"4727.86"	"80"
"4735.03"	"14"
"4735.20"	"37"
"4755.07"	"7"
"4756.89"	"44"
"4765.05"	"31"
"4812.49"	"82"
"4815.05"	"4"
"4819.70"	"10"
"4830.55"	"88"
"4836.86"	"20"
"4851.48"	"89"
"4876.84"	"95"
"4898.46"	"38"
"4904.21"	"76"
"4908.81"	"86"
"4915.89"	"27"
"4921.27"	"18"
"4945.30"	"53"
"4958.60"	"1"
"4975.22"	"51"
"4979.06"	"16"
"5330.80"	"34"
"5337.44"	"72"
"5368.25"	"70"
"5368.83"	"43"
"5379.28"	"92"
"5397.88"	"6"
"5413.51"	"15"
"5415.15"	"63"
"5437.73"	"58"
"5496.05"	"32"
"5497.48"	"61"
"5503.43"	"85"
"5517.24"	"8"
"5524.95"	"0"
"5637.62"	"41"
"5642

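The totals above sort correctly as strings only because they all have four digits before the decimal point. The sketch below shows one way to build keys that sort numerically regardless of magnitude, by zero-padding to a fixed width. `sort_key` and the sample totals are hypothetical, and the width of 10 is an arbitrary assumption.

```python
def sort_key(amount, width=10):
    """Format an amount as a fixed-width, zero-padded string."""
    return f"{amount:0{width}.2f}"


# Hypothetical totals with different numbers of digits.
totals = [4681.92, 999.5, 10234.0]

plain = sorted("%.2f" % total for total in totals)
padded = sorted(sort_key(total) for total in totals)

print(plain)   # string order, not numeric order
print(padded)  # string order now matches numeric order
```
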
In [0]:
%%writefile Customer_Records.py
# type 2

from mrjob.job import MRJob


class Customer_Records(MRJob):
    """Sum the order amounts of each customer."""

    def mapper(self, _, line):
        # Each line is "customer,item,amount".
        (customer, item, orderAmount) = line.split(',')
        # Amounts have a fractional part, so parse them as floats.
        yield customer, float(orderAmount)

    def reducer(self, customer, orders):
        # Total of all order amounts for this customer.
        yield customer, sum(orders)


if __name__ == '__main__':
    Customer_Records.run()

Overwriting Customer_Records.py


In [0]:
!python Customer_Records.py customer-orders.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/customer.root.20191217.074958.282619
Running step 1 of 1...
job output is in /tmp/customer.root.20191217.074958.282619/output
Streaming final output from /tmp/customer.root.20191217.074958.282619/output...
"31"	4765.05
"32"	5496.050000000004
"33"	5254.659999999998
"34"	5330.8
"35"	5155.419999999999
"36"	4278.049999999997
"37"	4735.200000000002
"38"	4898.460000000002
"39"	6193.109999999999
"4"	4815.050000000002
"40"	5186.429999999999
"41"	5637.62
"42"	5696.840000000003
"43"	5368.83
"44"	4756.8899999999985
"45"	3309.38
"46"	5963.109999999999
"47"	4316.299999999999
"48"	4384.33
"49"	4394.599999999999
"5"	4561.069999999999
"50"	4517.27
"51"	4975.22
"52"	5245.059999999999
"53"	4945.299999999999
"54"	6065.389999999999
"78"	4524.509999999999
"79"	3790.570000000001
"8"	5517.240000000001
"80"	4727.860000000001
"81"	5112.709999999999
"82"	4812.489999999998
"83"	4635.7999999999

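Totals such as `5496.050000000004` in the output above come from binary floating-point addition. The sketch below shows two common ways to get clean two-decimal totals; the function names and sample amounts are hypothetical.

```python
from decimal import Decimal


def total_as_float(amounts):
    """Sum floats, then round the result to two decimal places."""
    return round(sum(amounts), 2)


def total_as_decimal(amount_strings):
    """Sum decimal strings exactly, without binary rounding error."""
    return sum((Decimal(text) for text in amount_strings), Decimal("0"))


print(sum([0.1, 0.2, 0.3]))             # raw float sum shows the artifact
print(total_as_float([0.1, 0.2, 0.3]))
print(total_as_decimal(["0.10", "0.20", "0.30"]))
```

In a reducer, rounding is the smaller change. A `Decimal` total would likely need to be converted to a string before being yielded, since the standard JSON encoder does not accept `Decimal` values.
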
In [0]:
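%%writefile Top.py
from mrjob.job import MRJob
from mrjob.step import MRStep


class Top(MRJob):
    """Find the item with the highest total sales amount."""

    def steps(self):
        return [
            MRStep(
                mapper=self.mapper,
                reducer=self.first
            ),
            MRStep(
                reducer=self.second
            )
        ]

    def mapper(self, _, line):
        # sep[1] is the item ID and sep[2] is the order amount.
        sep = line.split(',')
        yield ('The topest one is : '+ ' '+sep[1]), float(sep[2])

    def first(self, key, values):
        # Send every (total, item) pair to one reducer under the key None.
        yield None,(float(format(sum(values),'.2f')),key )

    def second(self,_ , value):
        # max() compares the totals first, so this yields the top item.
        yield max(value)


if __name__ == '__main__':
    Top.run()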

Overwriting Top.py


In [0]:
!python Top.py customer-orders.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/Top.root.20191217.080326.865661
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/Top.root.20191217.080326.865661/output
Streaming final output from /tmp/Top.root.20191217.080326.865661/output...
495.32	"The topest one is :  8395"
Removing temp directory /tmp/Top.root.20191217.080326.865661...


# 2) Movielens Dataset

Q1: What is the most visited movie?

In [0]:
%%writefile Top_Movie.py

from mrjob.job import MRJob
from mrjob.step import MRStep


class Top_Movie(MRJob):
    """Find the movie with the most ratings in u.data."""

    def steps(self):
        return [
            MRStep(mapper=self.movie_rate,
                   reducer=self.red_movie_rate),
            MRStep(reducer=self.red_res)
        ]

    def movie_rate(self, key, value):
        # u.data is tab-separated: user ID, movie ID, rating, timestamp.
        (userID, movieID, rating, timestamp) = value.split('\t')
        yield movieID, 1

    def red_movie_rate(self, movieID, occurrences):
        # Send every (count, movieID) pair to one reducer under the key None.
        yield None, (sum(occurrences), movieID)

    def red_res(self, _, values):
        # The largest pair is the movie with the highest rating count.
        yield max(values)


if __name__ == '__main__':
    Top_Movie.run()

Overwriting Top_Movie.py


In [0]:
!python Top_Movie.py u.data

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/most_visited.root.20191217.081301.176411
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/most_visited.root.20191217.081301.176411/output
Streaming final output from /tmp/most_visited.root.20191217.081301.176411/output...
583	"50"
Removing temp directory /tmp/most_visited.root.20191217.081301.176411...
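
The job reports only the movie ID (`"50"`). Since u.item maps movie IDs to names, the ID can be resolved afterwards. The sketch below assumes u.item is "|"-separated with the ID and name as its first two fields; that separator, the `load_titles` helper, and the sample lines are assumptions for illustration, so check them against the actual file.

```python
def load_titles(lines, separator="|"):
    """Build a movie ID -> movie name dict from u.item-style lines."""
    titles = {}
    for line in lines:
        fields = line.rstrip("\n").split(separator)
        if len(fields) >= 2:  # skip lines without both an ID and a name
            titles[fields[0]] = fields[1]
    return titles


# Hypothetical lines; real names would come from u.item.
sample_item_lines = ["1|Hypothetical Movie A", "50|Hypothetical Movie B"]
titles = load_titles(sample_item_lines)

count, movie_id = 583, "50"  # the job output shown above
print(count, titles.get(movie_id, movie_id))
```

With the real file, `load_titles(open("u.item", encoding="latin-1"))` might be needed instead of the sample list; the encoding is also an assumption.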


Q2: Find the user who submitted the most ratings in this dataset.

In [0]:
%%writefile Most_Active_User.py
from mrjob.job import MRJob,MRStep

class Most_Active_User(MRJob):    
    def steps(self):
        return [
            MRStep(
                mapper=self.mapper,               
                reducer=self.first
            ),
            MRStep(
                reducer=self.second
            )
        ] 
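    def mapper(self, _, line):
        # u.data is tab-separated; sep[0] is the user ID.
        sep = line.split('\t')
        yield ('Most Active Users ID:' +' '+ sep[0], 1)

    def first(self, key, values):
        # Send every (count, user) pair to one reducer under the key None.
        yield None,(sum(values),key )

    def second(self,_ , value):
        # max() compares the counts first, so this yields the most active user.
        yield max(value)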
           
if __name__ == '__main__':
    Most_Active_User.run()

Writing Most_Active_User.py


In [0]:
!python Most_Active_User.py u.data

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/Most_Active_User.root.20191217.081225.266514
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/Most_Active_User.root.20191217.081225.266514/output
Streaming final output from /tmp/Most_Active_User.root.20191217.081225.266514/output...
737	"Most Active Users ID: 405"
Removing temp directory /tmp/Most_Active_User.root.20191217.081225.266514...


Q3: What is the average rating of each movie?

In [74]:
%%writefile Avg_Movies_Rate.py
from mrjob.job import MRJob
from mrjob.step import MRStep


class Avg_Movies_Rate(MRJob):
    """Compute the average rating of each movie in u.data."""

    def steps(self):
        return [
            MRStep(
                mapper=self.mapper,
                reducer=self.reducer
            )
        ]

    def mapper(self, _, line):
        # u.data is tab-separated: user ID, movie ID, rating, timestamp.
        sep = line.split('\t')
        yield sep[1], sep[2]

    def reducer(self, key, values):
        rate_count = 0
        rate_sum = 0

        # Ratings arrive as strings, so convert each one before adding.
        for rate_value in values:
            rate_count += 1
            rate_sum += int(rate_value)

        yield 'Avg Rate Of Movie ' + str(key), float(format(rate_sum/float(rate_count),'.2f'))


if __name__ == '__main__':
    Avg_Movies_Rate.run()

Overwriting Movie_AVG_Rate.py


In [75]:
!python Avg_Movies_Rate.py u.data

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/Movie_AVG_Rate.root.20191217.101516.274767
Running step 1 of 1...
job output is in /tmp/Movie_AVG_Rate.root.20191217.101516.274767/output
Streaming final output from /tmp/Movie_AVG_Rate.root.20191217.101516.274767/output...
"Avg Rate Of Movie 195"	3.93
"Avg Rate Of Movie 196"	3.92
"Avg Rate Of Movie 197"	4.1
"Avg Rate Of Movie 198"	4.01
"Avg Rate Of Movie 199"	4.18
"Avg Rate Of Movie 2"	3.21
"Avg Rate Of Movie 20"	3.42
"Avg Rate Of Movie 200"	3.83
"Avg Rate Of Movie 201"	3.52
"Avg Rate Of Movie 202"	3.76
"Avg Rate Of Movie 203"	3.87
"Avg Rate Of Movie 204"	3.83
"Avg Rate Of Movie 205"	3.99
"Avg Rate Of Movie 206"	3.38
"Avg Rate Of Movie 207"	3.82
"Avg Rate Of Movie 208"	3.94
"Avg Rate Of Movie 209"	3.91
"Avg Rate Of Movie 21"	2.76
"Avg Rate Of Movie 210"	3.93
"Avg Rate Of Movie 211"	3.91
"Avg Rate Of Movie 212"	3.46
"Avg Rate Of Movie 213"	3.99
"Avg Rate Of Movie 214

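An average cannot be combined from partial averages, but it can be combined from partial `(sum, count)` pairs. The sketch below shows that variant as plain generator functions, so it runs without mrjob. The function bodies could be moved into an `MRJob` class as mapper, combiner, and reducer; the sample lines are hypothetical.

```python
from collections import defaultdict


def mapper(_, line):
    user_id, movie_id, rating, timestamp = line.split("\t")
    yield movie_id, (int(rating), 1)


def merge(pairs):
    """Add up (sum, count) pairs into a single (sum, count) pair."""
    total = count = 0
    for rating_sum, rating_count in pairs:
        total += rating_sum
        count += rating_count
    return total, count


def combiner(movie_id, pairs):
    # Partial result: still a (sum, count) pair, so it can be merged again.
    yield movie_id, merge(pairs)


def reducer(movie_id, pairs):
    total, count = merge(pairs)
    yield movie_id, round(total / count, 2)


# Hypothetical u.data-style lines: user ID, movie ID, rating, timestamp.
lines = ["1\t10\t4\t0", "2\t10\t5\t0", "3\t20\t3\t0"]

grouped = defaultdict(list)
for line in lines:
    for movie_id, pair in mapper(None, line):
        grouped[movie_id].append(pair)

averages = {}
for movie_id, pairs in grouped.items():
    for key, partial in combiner(movie_id, pairs):
        for out_key, average in reducer(key, [partial]):
            averages[out_key] = average

print(averages)
```
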
# 3) Google Play Store App Reviews

Q1: Return app count of each version, sorted ascending (Bonus: or descending).

output template:
<version, count>

In [85]:
%%writefile GP_1.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re  # regular expressions, used to split CSV lines

# Matches one CSV field: a quoted or bracketed group, or plain text,
# followed by a comma or the end of the line.
FIELD_RE = re.compile(r'(\".+?\"|\[.+?\]|\(.+?\)|\{.+?\}|[^\"[({]+?)(?:,|$)')


class GP_1(MRJob):
    """Count apps per Android version, sorted by count (descending)."""

    def steps(self):
        return [
            MRStep(
                mapper=self.mapper,
                reducer=self.reducer1
            ),
            MRStep(
                reducer=self.reducer2
            )
        ]

    def mapper(self, _, line):
        # Split the line into its fields.
        each_line = FIELD_RE.findall(line)

        try:
            # Skip the header row; field 12 is "Android Ver".
            if "Android Ver" not in each_line:
                yield str(each_line[12]), 1
        except IndexError:
            # Rows with fewer than 13 fields are skipped.
            pass

    def reducer1(self, key, values):
        # Send every (count, version) pair to one reducer under the key None.
        yield None, (sum(values), key)

    def reducer2(self, _, counts):
        # Sort by count, largest first.
        for count, key in sorted(counts, reverse=True):
            yield (key, int(count))


if __name__ == '__main__':
    GP_1.run()



Overwriting GP_1.py


In [86]:
!python GP_1.py googleplaystore.csv 

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/GP_1.root.20191217.104241.189997
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/GP_1.root.20191217.104241.189997/output
Streaming final output from /tmp/GP_1.root.20191217.104241.189997/output...
"4.1 and up"	2451
"4.0.3 and up"	1501
"4.0 and up"	1375
"Varies with device"	1361
"4.4 and up"	980
"2.3 and up"	652
"5.0 and up"	601
"4.2 and up"	394
"2.3.3 and up"	281
"2.2 and up"	244
"4.3 and up"	243
"3.0 and up"	241
"2.1 and up"	134
"1.6 and up"	116
"6.0 and up"	60
"7.0 and up"	42
"3.2 and up"	36
"2.0 and up"	32
"5.1 and up"	24
"1.5 and up"	19
"4.4W and up"	12
"3.1 and up"	10
"2.0.1 and up"	7
"8.0 and up"	6
"7.1 and up"	3
"NaN"	2
"5.0 - 8.0"	2
"4.0.3 - 7.1.1"	2
"1.0 and up"	2
"7.0 - 7.1.1"	1
"5.0 - 7.1.1"	1
"5.0 - 6.0"	1
"4.1 - 7.1.1"	1
"2.2 - 7.1.1"	1
Removing temp directory /tmp/GP_1.root.20191217.104241.189997...
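
As an alternative to the regular expression, a CSV line with quoted fields can be split with Python's standard `csv` module. This is a sketch of that substitute technique, not the method used in the job above; `parse_csv_line` and the sample row are hypothetical.

```python
import csv


def parse_csv_line(line):
    """Split one CSV line into fields, honoring double-quoted commas."""
    return next(csv.reader([line]))


# Hypothetical row with 13 fields, two of which contain commas.
sample = ('Demo App,TOOLS,4.5,120,3.1M,"1,000+",Free,0,Everyone,Tools,'
          '"January 1, 2018",1.0,4.1 and up')

fields = parse_csv_line(sample)
print(len(fields), fields[5], fields[12])
```

Unlike the regular expression, `csv.reader` removes the surrounding quotes from quoted fields, so downstream keys would not carry them.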


Q2: Return app count of each category divided by android version and sorted by count.

output template:
<category, {count, version}>

In [90]:
%%writefile GP_2.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

# Matches one CSV field: a quoted or bracketed group, or plain text,
# followed by a comma or the end of the line.
FIELD_RE = re.compile(r'(\".+?\"|\[.+?\]|\(.+?\)|\{.+?\}|[^\"[({]+?)(?:,|$)')


class GP_2(MRJob):
    """Count apps per (category, Android version) pair."""

    def steps(self):
        return [
            MRStep(
                mapper=self.mapper,
                reducer=self.first_reducer
            ),
            MRStep(
                reducer=self.second_reducer
            )
        ]

    def mapper(self, _, line):
        # Split the line into its fields.
        each_line = FIELD_RE.findall(line)

        try:
            # Field 1 is "Category" and field 12 is "Android Ver".
            yield [str(each_line[1]),str(each_line[12])],1
        except IndexError:
            # Rows with fewer than 13 fields are skipped.
            pass

    def first_reducer(self, key, values):
        # key is [category, version]; send each total to one reducer.
        yield None,(key,sum(values))

    def second_reducer(self, _, values):
        # Emits the pairs as received; it does not sort them by count.
        for item in values:
            yield '<'+item[0][0]+',{',str((str(item[1]),item[0][1]))+'>}'


if __name__ == '__main__':
    GP_2.run()

Overwriting GP_2.py


In [91]:
!python GP_2.py googleplaystore.csv 

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/GP_2.root.20191217.105704.738509
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/GP_2.root.20191217.105704.738509/output
Streaming final output from /tmp/GP_2.root.20191217.105704.738509/output...
"<FAMILY,{"	"('307', '4.0.3 and up')>}"
"<FAMILY,{"	"('516', '4.1 and up')>}"
"<FAMILY,{"	"('72', '4.2 and up')>}"
"<FAMILY,{"	"('41', '4.3 and up')>}"
"<FAMILY,{"	"('150', '4.4 and up')>}"
"<FAMILY,{"	"('51', '5.0 and up')>}"
"<FAMILY,{"	"('6', '5.1 and up')>}"
"<FAMILY,{"	"('3', '6.0 and up')>}"
"<FAMILY,{"	"('10', '7.0 and up')>}"
"<FAMILY,{"	"('83', 'Varies with device')>}"
"<FINANCE,{"	"('1', '1.5 and up')>}"
"<FINANCE,{"	"('1', '1.6 and up')>}"
"<FINANCE,{"	"('3', '2.1 and up')>}"
"<FINANCE,{"	"('5', '2.2 and up')>}"
"<FINANCE,{"	"('8', '2.3 and up')>}"
"<FINANCE,{"	"('2', '2.3.3 and up')>}"
"<FINANCE,{"	"('7', '3.0 and up')>}"
"<FINANCE,{"	"('2', 

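The question asks for results sorted by count, while the output above follows version order within each category. The sketch below shows one way the final grouping and sorting could look in plain Python. `group_and_sort` is a hypothetical helper, and the sample counts are taken from the FINANCE rows shown above.

```python
from collections import defaultdict


def group_and_sort(records):
    """Group ((category, version), count) records by category, largest count first."""
    by_category = defaultdict(list)
    for (category, version), count in records:
        by_category[category].append((count, version))
    return {
        category: sorted(pairs, reverse=True)
        for category, pairs in by_category.items()
    }


records = [
    (("FINANCE", "1.5 and up"), 1),
    (("FINANCE", "2.3 and up"), 8),
    (("FINANCE", "3.0 and up"), 7),
]

result = group_and_sort(records)
print(result["FINANCE"])
```

In a job, the same idea would mean keying the second step by category and sorting the `(count, version)` pairs inside that reducer.
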
Q3: For each app's reviews, which pairs of words are used together most frequently? Sort by count.

output template:
<appname, {count, word1, word2} >

In [96]:
%%writefile GP_3.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

# Matches one CSV field: a quoted or bracketed group, or plain text,
# followed by a comma or the end of the line.
FIELD_RE = re.compile(r'(\".+?\"|\[.+?\]|\(.+?\)|\{.+?\}|[^\"[({]+?)(?:,|$)')


class GP_3(MRJob):
    """Count adjacent word pairs in the review text."""

    # Kept from the original. This is not a documented mrjob option,
    # so it likely has no effect on the output order.
    ordered = True

    def steps(self):
        return [
            MRStep(
                mapper=self.mapper,
                reducer=self.reducer
            )
        ]

    def mapper(self, _, line):
        # Split the line into its fields.
        each_line = FIELD_RE.findall(line)

        try:
            # Field 1 is "Translated_Review"; split it into words.
            words = each_line[1].split()

            # A review needs at least two words to form a pair.
            if len(words) > 1:
                # Pair each word with the word that follows it.
                for item, word in enumerate(words):
                    if item + 1 < len(words):
                        # Lowercase both words so counts ignore case.
                        ws = (word.lower(), words[item+1].lower())
                        yield ws, 1
        except IndexError:
            # Rows without a review field are skipped.
            pass

    def reducer(self, key, values):
        # Total occurrences of each word pair.
        yield key, sum(values)


if __name__ == '__main__':
    GP_3.run()

Overwriting GP_3.py


In [97]:
!python  GP_3.py googleplaystore_user_reviews.csv 

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/GP_3.root.20191217.110518.235643
Running step 1 of 1...
job output is in /tmp/GP_3.root.20191217.110518.235643/output
Streaming final output from /tmp/GP_3.root.20191217.110518.235643/output...
["done.", "will"]	1
["done.", "with"]	1
["done.", "you"]	1
["done.", "your"]	1
["done..", "choose"]	1
["done..", "had"]	2
["done..", "no"]	1
["done......my", "first"]	1
["done?", "or"]	1
["done\"\"", "idk"]	1
["donkey", "balls!\""]	1
["donkey", "balls"]	3
["donkey", "zebra"]	2
["donned", "f0r"]	1
["donor", "card"]	8
["donor", "card."]	4
["donor", "id"]	2
["donor", "id,"]	1
["donor", "information."]	1
["donot", "ever"]	1
["donot", "start"]	1
["dont", "."]	1
["dont", "...when"]	1
["dont", "allow"]	1
["dont", "anymore,now"]	1
["dont", "anymore.!!!"]	1
["dont", "anything"]	1
["dont", "app,"]	2
["dont", "appear."]	3
["dont", "apply"]	1
["dont", "appreciate"]	2
["dont", "available"]
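
The job above counts word pairs across all reviews, whereas the output template asks for pairs per app. The sketch below shows, in plain Python, one way to count pairs per app and pick the most frequent one. `word_pairs`, `top_pair_per_app`, and the sample rows are hypothetical, and the tokenizer is a simplifying assumption.

```python
import re
from collections import Counter, defaultdict

# Lowercase letters, digits, and apostrophes form a word; punctuation is dropped.
WORD_RE = re.compile(r"[a-z0-9']+")


def word_pairs(review):
    """Return the adjacent word pairs of a review as tuples."""
    words = WORD_RE.findall(review.lower())
    return zip(words, words[1:])


def top_pair_per_app(rows):
    """Map each app to its most frequent (pair, count)."""
    counts = defaultdict(Counter)
    for app, review in rows:
        counts[app].update(word_pairs(review))
    return {
        app: counter.most_common(1)[0]
        for app, counter in counts.items()
        if counter  # skip apps whose reviews have fewer than two words
    }


# Hypothetical (App, Translated_Review) rows.
rows = [
    ("Demo App", "Donor card works. My donor card arrived."),
    ("Other App", "Good app"),
    ("Short App", "Nice"),
]

top = top_pair_per_app(rows)
print(top)
```

In a job, the equivalent would be to key the first step by `(app, word1, word2)` and the second step by app.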


