# mrjob

[mrjob documentation](https://mrjob.readthedocs.io/)

`pip install mrjob`

In [7]:
!pip install mrjob



In [8]:
# download the text that we are going to analyze
!wget https://raw.githubusercontent.com/sanityseeker/lspy-2023/main/data/crime-punishment.txt

--2024-03-01 10:05:37--  https://raw.githubusercontent.com/sanityseeker/lspy-2023/main/data/crime-punishment.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1201520 (1.1M) [text/plain]
Saving to: ‘crime-punishment.txt.3’


2024-03-01 10:05:37 (61.0 MB/s) - ‘crime-punishment.txt.3’ saved [1201520/1201520]



- A **mapper** takes a single key and value as input, and returns zero or more (key, value) pairs. The pairs from all map outputs of a single step are grouped by key.

- A **combiner** takes a key and a subset of the values for that key as input and returns zero or more (key, value) pairs. Combiners are optimizations that run immediately after each mapper and can be used to decrease total data transfer. Combiners should be idempotent (produce the same output if run multiple times in the job pipeline).

- A **reducer** takes a key and the complete set of values for that key in the current step, and returns zero or more arbitrary (key, value) pairs as output.
After the reducer has run, if there are more steps, the individual results are arbitrarily assigned to mappers for further processing. If there are no more steps, the results are sorted and made available for reading.
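
The flow described above can be imitated in plain Python without mrjob. This is only a rough sketch of the idea, assuming a single process and an in-memory "shuffle"; the helper `run_step` and the sample sentences are made up for illustration and are not part of mrjob.

```python
from collections import defaultdict


def mapper(_, line):
    # same shape as an mrjob mapper: yields zero or more (key, value) pairs
    yield "words", len(line.split())
    yield "lines", 1


def reducer(key, values):
    # receives one key together with every value emitted for that key
    yield key, sum(values)


def run_step(lines):
    # "shuffle": group all mapper output by key before reducing
    grouped = defaultdict(list)
    for line in lines:
        for key, value in mapper(None, line):
            grouped[key].append(value)
    # reduce each key independently; keys are sorted for a stable result
    results = {}
    for key in sorted(grouped):
        for out_key, out_value in reducer(key, grouped[key]):
            results[out_key] = out_value
    return results


totals = run_step(["to be or not to be", "that is the question"])
print(totals)
```

A real runner does the same three things (map, group by key, reduce), but may spread the mappers and reducers over many processes or machines.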

# Words Lines Chars

In [10]:
%%writefile job.py
from mrjob.job import MRJob


class MRWordFrequencyCount(MRJob):

    def mapper(self, _, line):
        yield "words", len(line.split())
        yield "line", 1
        yield "chars", len(line)

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRWordFrequencyCount.run()

Overwriting job.py


In [14]:
!python job.py crime-punishment.txt

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/job.root.20240301.101325.235928
Running step 1 of 1...
job output is in /tmp/job.root.20240301.101325.235928/output
Streaming final output from /tmp/job.root.20240301.101325.235928/output...
"words"	206551
"line"	22443
"chars"	1131926
Removing temp directory /tmp/job.root.20240301.101325.235928...


# Names

Let's make the task a little harder and estimate how many times "first name + patronymic" pairs are mentioned in the text.

In [18]:
import re

# raw string, so that \s is not treated as an (invalid) string escape
regexp = re.compile(r'([A-Z][a-z]{2,})\s([A-Z][a-z]{3,}(vna|ich|itch))')

In [19]:
with open("crime-punishment.txt") as f:
    data = f.read()


for result in regexp.finditer(data):
    # a match may span a line break, so collapse any whitespace to one space
    name = re.sub(r'\s+', ' ', result.group())
    print(name)

Alyona Ivanovna
Alyona Ivanovna
Alyona Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Amalia Fyodorovna
Katerina Ivanovna
Ivan Ivanitch
Katerina Ivanovna
Katerina Ivanovna
Darya Frantsovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Darya Frantsovna
Sofya Semyonovna
Amalia Fyodorovna
Darya Frantsovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Ivan Afanasyvitch
Ivan Afanasyvitch
Katerina Ivanovna
Semyon Zaharovitch
Katerina Ivanovna
Katerina Ivanovna
Amalia Fyodorovna
Semyon Zaharovitch
Semyon Zaharovitch
Semyon Zaharovitch
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Katerina Ivanovna
Praskovya Pavlovna
Vassily Ivanovitch
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Marfa Petrovna
Pyotr Petrovitch
Marfa Petrovna
Pyotr Petrovitch
Pyotr Petrovitch
Pyotr Petrovitch
Pyotr Petrovitch
Pyotr Petrovitch
Pyotr 

In [20]:
%%writefile job2.py
from mrjob.job import MRJob
import re

regexp = re.compile(r'([A-Z][a-z]{2,})\s([A-Z][a-z]{3,}(vna|ich|itch))')


class MRNameFrequencyCount(MRJob):

    def mapper(self, _, line):
        for name in regexp.finditer(line):
            yield re.sub(r"\s+", " ", name.group()), 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRNameFrequencyCount.run()

Writing job2.py


In [22]:
!python job2.py -q crime-punishment.txt

"Nastasya Nikiforovna"	1
"Natalya Yegorovna"	1
"Nil Pavlitch"	1
"Porfiry Petrovitch"	75
"Praskovya Pavlovna"	9
"Pulcheria Alexandrovna"	101
"Pyotr Petrovitch"	148
"Ivan Ivanitch"	1
"Ivan Mihailovitch"	1
"Katerina Ivanovna"	186
"Lizaveta Ivanovna"	5
"Luise Ivanovna"	8
"Madame Resslich"	8
"Marfa Petrovna"	69
"Afanasy Ivanitch"	1
"Afanasy Ivanovitch"	6
"Alexandr Grigorievitch"	1
"Alexey Semyonovitch"	1
"Alyona Ivanovna"	11
"Amalia Fyodorovna"	3
"Amalia Ivanovna"	50
"Amalia Ludwigovna"	8
"Andrey Semyonovitch"	18
"Arkady Ivanovitch"	8
"Avdotya Romanovna"	102
"Darya Frantsovna"	4
"Dmitri Prokofitch"	22
"Ilya Petrovitch"	29
"Ivan Afanasyvitch"	2
"Rodion Romanovitch"	78
"Semyon Semyonovitch"	2
"Semyon Zaharovitch"	7
"Sofya Ivanovna"	3
"Sofya Semyonovna"	62
"Tit Vassilitch"	1
"Vassily Ivanovitch"	1

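One caveat about the job above: a mapper sees one line at a time, so a name that is split across a line break cannot match there, while the whole-file scan from the earlier cell does match it (because `\s` also matches a newline). The snippet below is a small sketch of that difference; the sample `text` is invented for illustration and is not taken from the book.

```python
import re

regexp = re.compile(r'([A-Z][a-z]{2,})\s([A-Z][a-z]{3,}(vna|ich|itch))')

# hypothetical excerpt; the second name is split by a line break
text = "Marfa Petrovna came in.\nThen Katerina\nIvanovna spoke."

# whole-text scan: \s also matches the newline, so both names are found
whole = [re.sub(r'\s+', ' ', m.group()) for m in regexp.finditer(text)]

# per-line scan, the way a mapper receives its input
per_line = [
    m.group()
    for line in text.splitlines()
    for m in regexp.finditer(line)
]

print(whole)
print(per_line)
```

So the counts produced by the job should be read as a lower bound on what a whole-file scan would find.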

# Most common middle name

In [24]:
%%writefile job3.py
from mrjob.job import MRJob
from mrjob.step import MRStep

import re

regexp = re.compile(r'([A-Z][a-z]{2,})\s([A-Z][a-z]{3,}(vna|ich|itch))')


class MRMostCommonName(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper, reducer=self.reducer),
            MRStep(reducer=self.reducer2),
        ]

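    def mapper(self, _, line):
        for name in regexp.finditer(line):
            yield re.sub(r"\s+", " ", name.group()), 1

    def reducer(self, key, values):
        # emit every total under one key (None) so that a single reducer sees them all
        yield None, (key, sum(values))

    def reducer2(self, _, values):
        # each value is a (name, count) pair; keep the one with the largest count
        yield max(values, key=lambda x: x[1])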

if __name__ == '__main__':
    MRMostCommonName.run()

Writing job3.py


In [25]:
!python job3.py -q crime-punishment.txt

"Katerina Ivanovna"	186


In [26]:
%%writefile job4.py
from mrjob.job import MRJob
from mrjob.step import MRStep

import re

regexp = re.compile(r'([A-Z][a-z]{3,}(vna|ich|itch))')


class MRMostCommonMiddleName(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper, reducer=self.reducer),
            MRStep(reducer=self.reducer2),
        ]

    def mapper(self, _, line):
        for name in re.finditer(regexp, line):
            yield name.group(), 1

    def reducer(self, key, values):
        yield None, (key, sum(values))

    def reducer2(self, _, values):
        yield max(values, key=lambda x: x[1])

if __name__ == '__main__':
    MRMostCommonMiddleName.run()

Writing job4.py


In [27]:
!python job4.py -q crime-punishment.txt

"Ivanovna"	304


In [37]:
%%writefile job4_2.py
from mrjob.job import MRJob
from mrjob.step import MRStep

import re

regexp = re.compile(r'([A-Z][a-z]{3,}(vna|ich|itch))')


class MRMostCommonMiddleNameWithCombiner(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper, reducer=self.reducer, combiner=self.combiner),
            MRStep(reducer=self.reducer2),
        ]

    def mapper(self, _, line):
        for name in re.finditer(regexp, line):
            yield name.group(), 1

    def combiner(self, key, values):
        yield key, sum(values)

    def reducer(self, key, values):
        yield None, (key, sum(values))

    def reducer2(self, _, values):
        yield max(values, key=lambda x: x[1])

if __name__ == '__main__':
    MRMostCommonMiddleNameWithCombiner.run()

Writing job4_2.py


In [41]:
!python job4_2.py -q crime-punishment.txt

"Ivanovna"	304

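The result is the same with and without the combiner; what changes is how many (key, value) pairs have to travel from the mappers to the reducer. Below is a rough plain-Python sketch of that effect. The two input `splits` and the helper names are hypothetical, and a real runner decides on its own how the input is split and whether a combiner runs at all.

```python
from collections import Counter


def mapper(line):
    for word in line.split():
        yield word, 1


def combiner(pairs):
    # local pre-aggregation of a single mapper's output
    partial = Counter()
    for key, value in pairs:
        partial[key] += value
    return list(partial.items())


def reduce_all(pairs):
    totals = Counter()
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


# hypothetical input splits, one per mapper task
splits = [
    ["Ivanovna Petrovna Ivanovna", "Ivanovna"],
    ["Petrovna Ivanovna"],
]

without_combiner = []
with_combiner = []
for split in splits:
    pairs = [pair for line in split for pair in mapper(line)]
    without_combiner.extend(pairs)
    with_combiner.extend(combiner(pairs))

print(len(without_combiner), len(with_combiner))
print(reduce_all(without_combiner) == reduce_all(with_combiner))
```

Summation is associative and commutative, which is why summing partial sums in the combiner leaves the final answer unchanged.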

# Average of numbers

In [31]:
# generate numbers
# write to file
# count average
import numpy as np

numbers = np.random.randint(-100, 100, size=(1000, 50))
with open("numbers.txt", "w") as f:
    for line in numbers:
        f.write(", ".join(map(str, line)) + "\n")

In [35]:
%%writefile job5.py
from mrjob.job import MRJob


class MRAverage(MRJob):
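    def mapper(self, _, line):
        # one shared key, so that a single reducer receives every number
        for number in line.strip().split(","):
            yield 1, int(number)

    def reducer(self, key, values):
        # values is a one-shot generator; materialize it to use both sum() and len()
        values = list(values)
        yield "avg", sum(values) / len(values)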

if __name__ == '__main__':
    MRAverage.run()

Overwriting job5.py


In [36]:
!python job5.py -q numbers.txt

"avg"	-0.33206

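The reducer above holds every number in memory, and a plain "average" combiner would be wrong, because an average of averages is not the overall average. One possible alternative, sketched here in plain Python rather than as a finished mrjob job, is to pass around partial (sum, count) pairs, which can be merged in any order. The function names mirror the mrjob method signatures but are otherwise hypothetical.

```python
def mapper(_, line):
    # emit one partial (sum, count) pair per line instead of one record per number
    numbers = [int(n) for n in line.split(",")]
    yield "avg", (sum(numbers), len(numbers))


def combine(key, values):
    # merging (sum, count) pairs is associative, so it is safe in a combiner
    total, count = 0, 0
    for s, c in values:
        total += s
        count += c
    yield key, (total, count)


def reducer(key, values):
    # merge the remaining partials and divide only once, at the very end
    for _, (total, count) in combine(key, values):
        yield key, total / count


lines = ["1, 2, 3", "10"]
partials = [value for line in lines for _, value in mapper(None, line)]
result = dict(reducer("avg", partials))
print(result)
```

Here the per-line averages are 2 and 10, whose mean is 6, while the true average of the four numbers is 4. Inside an actual MRJob the tuples would be serialized as JSON lists between steps, which still unpack the same way.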