Installing **mrjob**

In [1]:
!pip install mrjob

Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl.metadata (7.3 kB)
Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/439.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m430.1/439.6 kB[0m [31m16.0 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m439.6/439.6 kB[0m [31m11.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mrjob
Successfully installed mrjob-0.7.4


Writing the latest mrjob script to `WineQualityAnalysis.py`.

In [11]:
%%writefile WineQualityAnalysis.py
from mrjob.job import MRJob

class WineQualityAnalysis(MRJob):
    """MapReduce job that averages the alcohol value per wine-quality label.

    Every input line is split on ``;``. Field 10 is parsed as the alcohol
    value and field 11 (double quotes and surrounding whitespace removed) is
    used as the grouping key. Lines that cannot be parsed are skipped.
    """

    # Zero-based positions of the two fields read from each ';'-separated row.
    ALCOHOL_FIELD = 10
    QUALITY_FIELD = 11

    def mapper(self, _, line):
        """Emit ``(quality, alcohol)`` for each parseable data row.

        The header row is rejected by content rather than by position: its
        alcohol field is not numeric, so ``float()`` raises and the row is
        dropped. A per-mapper "skip the first line" flag is deliberately not
        used, because every mapper task would then discard its first line,
        losing a real data row whenever the input is divided among several
        mappers or the file has no header.

        Args:
            _: Input key supplied by mrjob; unused.
            line: One raw line of the input file.

        Yields:
            ``(quality, alcohol)`` where ``quality`` is a string label and
            ``alcohol`` is a float.
        """
        fields = line.split(';')

        # Blank and truncated lines do not have enough fields for both columns.
        if len(fields) <= self.QUALITY_FIELD:
            return

        quality = fields[self.QUALITY_FIELD].replace('"', '').strip()
        try:
            alcohol = float(fields[self.ALCOHOL_FIELD].replace('"', '').strip())
        except ValueError:
            # Non-numeric alcohol field (header row or corrupt data): skip it.
            return

        # The yield stays outside the try block so that only the float()
        # conversion is guarded, not anything raised while the generator
        # is suspended.
        yield quality, alcohol

    def reducer(self, quality, alcohol_values):
        """Yield the arithmetic mean of the alcohol values for one quality label.

        Args:
            quality: Quality label emitted by the mapper.
            alcohol_values: Iterable of float alcohol values for that label.

        Yields:
            ``("Quality <label> avg alcohol", mean)``; nothing is yielded if
            the group is empty.
        """
        values = list(alcohol_values)
        if not values:  # guard against division by zero
            return
        yield f"Quality {quality} avg alcohol", sum(values) / len(values)

if __name__ == "__main__":
    # Executed as a script: hand control to the job's run() entry point.
    WineQualityAnalysis.run()

Overwriting WineQualityAnalysis.py


Using the MapReduce job to compute the average alcohol content per quality score for the white wine dataset.

In [16]:
# Run the job script against winequality-white.csv; the results are streamed to the cell output.
!python WineQualityAnalysis.py winequality-white.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/WineQualityAnalysis.root.20260214.181903.237579
Running step 1 of 1...
job output is in /tmp/WineQualityAnalysis.root.20260214.181903.237579/output
Streaming final output from /tmp/WineQualityAnalysis.root.20260214.181903.237579/output...
"Quality 7 avg alcohol"	11.367935606060605
"Quality 8 avg alcohol"	11.636
"Quality 9 avg alcohol"	12.18
"Quality 6 avg alcohol"	10.575371549893843
"Quality 3 avg alcohol"	10.345
"Quality 4 avg alcohol"	10.15245398773006
"Quality 5 avg alcohol"	9.808840082361016
Removing temp directory /tmp/WineQualityAnalysis.root.20260214.181903.237579...


Change the species from Iris-setosa to Iris-virginica while keeping the same measurement (sepal width).

In [17]:
%%writefile UpdatedMapReduceIris.py
import math
import re

from mrjob.job import MRJob
from mrjob.step import MRStep

# Token pattern: each match is a maximal run of word characters, dots, or
# hyphens, so any other character (such as a comma) acts as a separator.
DATA_RE = re.compile(r"[\w.-]+")

class MRProb2_3(MRJob):
    """MapReduce job: mean sepal width of the Iris-virginica rows.

    Each input line is tokenised with the module-level ``DATA_RE``. Rows whose
    tokens include ``"Iris-virginica"`` contribute their second token
    (index 1), parsed as a float and labelled "sepal width".
    """

    def steps(self):
        """Declare the single map/reduce step that makes up this job."""
        return [
            MRStep(mapper=self.mapper_get_sepW_virginica,
                   reducer=self.reducer_get_avg)
        ]

    def mapper_get_sepW_virginica(self, _, line):
        """Emit ``("sepal width", value)`` for every Iris-virginica row.

        Args:
            _: Input key supplied by mrjob; unused.
            line: One raw line of the input file.

        Yields:
            ``("sepal width", float)`` for each matching, parseable row.
        """
        data = DATA_RE.findall(line)
        if "Iris-virginica" not in data:
            return

        try:
            sep_W = float(data[1])   # second token is used as the sepal width
        except (IndexError, ValueError):
            # Matching row with a missing or non-numeric second token:
            # skip it instead of aborting the whole job.
            return

        yield ("sepal width", sep_W)

    def reducer_get_avg(self, key, values):
        """Yield the mean of all values received for ``key``.

        The sum is computed with ``math.fsum`` so that accumulated
        floating-point error does not have to be hidden by rounding the
        total to a fixed number of decimals before dividing.

        Args:
            key: Key emitted by the mapper; not used in the output.
            values: Iterable of float sepal-width values.

        Yields:
            ``("virginica sepal width avg", mean)``; nothing is yielded if no
            values were received.
        """
        widths = list(values)
        if not widths:  # guard against division by zero
            return
        yield ("virginica sepal width avg", math.fsum(widths) / len(widths))

if __name__ == "__main__":
    # Executed as a script: hand control to the job's run() entry point.
    MRProb2_3.run()


Overwriting UpdatedMapReduceIris.py


Using the MapReduce logic to analyse a different species of the Iris data set, specifically Iris-virginica, while keeping the same measurement (sepal width).

In [18]:
# Run the job script against iris.data; the result is streamed to the cell output.
!python UpdatedMapReduceIris.py iris.data

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/UpdatedMapReduceIris.root.20260214.185851.956238
Running step 1 of 1...
job output is in /tmp/UpdatedMapReduceIris.root.20260214.185851.956238/output
Streaming final output from /tmp/UpdatedMapReduceIris.root.20260214.185851.956238/output...
"virginica sepal width avg"	2.9739999999999998
Removing temp directory /tmp/UpdatedMapReduceIris.root.20260214.185851.956238...


Writing the latest mrjob script to `WineQualityAnalysis.py`.

In [19]:
%%writefile WineQualityAnalysis.py
from mrjob.job import MRJob

class WineQualityAnalysis(MRJob):
    """MapReduce job that averages the alcohol value per wine-quality label.

    Every input line is split on ``;``. Field 10 is parsed as the alcohol
    value and field 11 (double quotes and surrounding whitespace removed) is
    used as the grouping key. Lines that cannot be parsed are skipped.
    """

    # Zero-based positions of the two fields read from each ';'-separated row.
    ALCOHOL_FIELD = 10
    QUALITY_FIELD = 11

    def mapper(self, _, line):
        """Emit ``(quality, alcohol)`` for each parseable data row.

        The header row is rejected by content rather than by position: its
        alcohol field is not numeric, so ``float()`` raises and the row is
        dropped. A per-mapper "skip the first line" flag is deliberately not
        used, because every mapper task would then discard its first line,
        losing a real data row whenever the input is divided among several
        mappers or the file has no header.

        Args:
            _: Input key supplied by mrjob; unused.
            line: One raw line of the input file.

        Yields:
            ``(quality, alcohol)`` where ``quality`` is a string label and
            ``alcohol`` is a float.
        """
        fields = line.split(';')

        # Blank and truncated lines do not have enough fields for both columns.
        if len(fields) <= self.QUALITY_FIELD:
            return

        quality = fields[self.QUALITY_FIELD].replace('"', '').strip()
        try:
            alcohol = float(fields[self.ALCOHOL_FIELD].replace('"', '').strip())
        except ValueError:
            # Non-numeric alcohol field (header row or corrupt data): skip it.
            return

        # The yield stays outside the try block so that only the float()
        # conversion is guarded, not anything raised while the generator
        # is suspended.
        yield quality, alcohol

    def reducer(self, quality, alcohol_values):
        """Yield the arithmetic mean of the alcohol values for one quality label.

        Args:
            quality: Quality label emitted by the mapper.
            alcohol_values: Iterable of float alcohol values for that label.

        Yields:
            ``("Quality <label> avg alcohol", mean)``; nothing is yielded if
            the group is empty.
        """
        values = list(alcohol_values)
        if not values:  # guard against division by zero
            return
        yield f"Quality {quality} avg alcohol", sum(values) / len(values)

if __name__ == "__main__":
    # Executed as a script: hand control to the job's run() entry point.
    WineQualityAnalysis.run()

Overwriting WineQualityAnalysis.py


Using the MapReduce job to compute the average alcohol content per quality score for the red wine dataset.

In [13]:
# Run the job script against winequality-red.csv; the results are streamed to the cell output.
!python WineQualityAnalysis.py winequality-red.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/WineQualityAnalysis.root.20260214.181319.276537
Running step 1 of 1...
job output is in /tmp/WineQualityAnalysis.root.20260214.181319.276537/output
Streaming final output from /tmp/WineQualityAnalysis.root.20260214.181319.276537/output...
"Quality 7 avg alcohol"	11.459175084175083
"Quality 8 avg alcohol"	12.117647058823529
"Quality 6 avg alcohol"	10.629722658294087
"Quality 3 avg alcohol"	9.955
"Quality 4 avg alcohol"	10.26509433962264
"Quality 5 avg alcohol"	9.899706314243758
Removing temp directory /tmp/WineQualityAnalysis.root.20260214.181319.276537...
