<a href="https://colab.research.google.com/github/RajatPanda/fds_assignment2/blob/main/Assignment2_G24AI2073_RajatBhusanPanda.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install Java, Hadoop & mrjob

In [1]:
!apt-get update -qq
!apt-get install -y openjdk-8-jdk-headless -qq
!wget -q https://downloads.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz
!tar -xf hadoop-3.3.6.tar.gz
import os, pathlib, sys, json, re, textwrap, math, itertools
os.environ["JAVA_HOME"]  = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["HADOOP_HOME"] = "/content/hadoop-3.3.6"
!pip install -q mrjob==0.7.4
!ln -sf /content/hadoop-3.3.6/bin/* /usr/bin/
!hadoop version

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 126284 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u452-ga~us1-0ubuntu1~22.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u452-ga~us1-0ubuntu1~22.04) ...
Selecting previously unselected package openjdk-8-jdk-headless:amd64.
Preparing to unpack .../openjdk-8-jdk-headless_8u452-ga~us1-0ubuntu1~22.04_amd64.deb ...
Unpacking openjdk-8-jdk-headless:amd64 (8u452-ga~us1-0ubuntu1~22.04) ...
Setting up libxtst6:amd64 (2:1.2.3-1build4) ...
Setting up openjdk-8-jre-headless:amd64 (8u452-ga~us1-0ubu

Get CSVs

In [2]:
!wget -q -O cruise.csv  https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/cruise.csv
!wget -q -O churn.csv   https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/customer_churn.csv
!wget -q -O ecommerce.csv https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/e-com_customer.csv

In [3]:
from mrjob.job import MRJob
from mrjob.protocol import RawValueProtocol
from io import StringIO, BytesIO

Function for local runner

In [10]:
def run_local(mr_cls, sample_str):
    runner = mr_cls(args=['-r', 'inline', '--no-conf'])
    stdin  = BytesIO(sample_str.encode('utf-8'))
    with runner.make_runner() as r:
        r._stdin = stdin
        r.run()
        return [line.decode('utf-8').strip() for line in r.cat_output()]

Q1: For each Cruise line compute:
* ship_count
* avg_tonnage
* max_crew

Uses a combiner to lower shuffle volume.

In [9]:
%%file cruiseline_agg.py
from mrjob.job import MRJob
from mrjob.step import MRStep
from mrjob.protocol import RawValueProtocol
import csv, sys
class CruiseLineAgg(MRJob):
    """Q1: For each Cruise line compute
       (a) ship_count
       (b) avg_tonnage
       (c) max_crew
       Uses a combiner to lower shuffle volume."""
    OUTPUT_PROTOCOL = RawValueProtocol
    def mapper(self, _, line):
        if line.startswith('Ship_name'):
            return
        row = next(csv.reader([line]))
        line_name = row[1]
        tonnage   = float(row[3])
        crew      = float(row[8])
        yield line_name, (1, tonnage, crew)
    def combiner(self, line_name, vals):
        c, t_sum, c_max = 0, 0.0, 0.0
        for cnt, ton, cr in vals:
            c += cnt; t_sum += ton; c_max = max(c_max, cr)
        yield line_name, (c, t_sum, c_max)
    def reducer(self, line_name, vals):
        c, t_sum, c_max = 0, 0.0, 0.0
        for cnt, ton, cr in vals:
            c += cnt; t_sum += ton; c_max = max(c_max, cr)
        avg_ton = round(t_sum / c, 2)
        yield None, f"{line_name}|{c}|{avg_ton:.2f}|{c_max}"
    def steps(self):
        return [MRStep(mapper=self.mapper,
                       combiner=self.combiner,
                       reducer=self.reducer)]
if __name__ == '__main__':
    CruiseLineAgg.run()

Overwriting cruiseline_agg.py


In [11]:
from cruiseline_agg import CruiseLineAgg

sample = (
    "Ship_name,Cruise_line,Age,Tonnage,passengers,length,cabins,passenger_density,crew\n"
    "Journey,Azamara,6,30.276999999999997,6.94,5.94,3.55,42.64,3.55\n"
    "Quest,Azamara,6,30.276999999999997,6.94,5.94,3.55,42.64,3.55\n"
    "Celebration,Carnival,26,47.262,14.86,7.22,7.43,31.8,6.7\n"
)

output_lines = run_local(CruiseLineAgg, sample)
for line in output_lines:
    print(line)




Azamara	2	30.28	3.55

Carnival	1	47.26	6.7


Q2: Multi-step job that filters to VIP firms (from distributed cache) then emits company-level churn rate to 4-decimal precision.

In [14]:
%%file churn_rate.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv, os
class CompanyChurnRate(MRJob):
    """Q2: Multi-step job that filters to VIP firms (from distributed
       cache) then emits company-level churn rate to 4-decimal precision."""
    FILES = ['vip_companies.txt']
    def configure_args(self):
        super().configure_args()
        self.add_file_arg('--vip', default='vip_companies.txt',
                          help='VIP company list, one per line')
    def load_options(self, args):
        super().load_options(args)
        with open(self.options.vip) as f:
            self.vip = {l.strip() for l in f if l.strip()}
    def mapper(self, _, line):
        if line.lower().startswith('customerid'):
            return
        row = next(csv.reader([line]))
        company = row[2]             # Company column
        churned = int(row[-1] == 'Yes')
        if company in self.vip:
            yield (company, 'TOTAL'), 1
            if churned: yield (company, 'CHURNED'), 1
    def reducer(self, key, vals):
        yield key, sum(vals)
    def calc_rate(self, company, pairs):
        pairs = dict(pairs)
        rate  = pairs.get('CHURNED',0) / pairs.get('TOTAL',1)
        yield None, f"{company}|{rate:.4f}"
    def steps(self):
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer),
                MRStep(reducer=self.calc_rate)]
if __name__ == '__main__':
    CompanyChurnRate.run()

Writing churn_rate.py


In [16]:
from churn_rate import CompanyChurnRate

sample = (
    "Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Onboard_date,Location,Company,Churn\n"
    "Cameron Williams,42.0,11066.8,0,7.22,8.0,2013-08-30 07:00:40,\"10265 Elizabeth Mission Barkerburgh, AK 89518\",Harvey LLC,1\n"
    "Kevin Mueller,41.0,11916.22,0,6.5,11.0,2013-08-13 00:38:46,\"6157 Frank Gardens Suite 019 Carloshaven, RI 17756\",Wilson PLC,1\n"
    "Eric Lozano,38.0,12884.75,0,6.67,12.0,2016-06-29 06:20:07,\"1331 Keith Court Alyssahaven, DE 90114\",\"Miller, Johnson and Wallace\",1\n"
    "Phillip White,42.0,8010.76,0,6.71,10.0,2014-04-22 12:43:12,\"13120 Daniel Mount Angelabury, WY 30645-4695\",Smith Inc,1\n"
)

output_lines = run_local(CompanyChurnRate, sample)
for line in output_lines:
    print(line)

ERROR:mrjob.inline:
Error while reading from /tmp/churn_rate.root.20250728.173528.387534/step/000/mapper/00000/input:



AttributeError: 'CompanyChurnRate' object has no attribute 'vip'

Q3: Extract 2-letter state code from Address, sum Yearly Amount Spent, output top-5.

In [17]:
%%file state_spend.py
from mrjob.job import MRJob
from mrjob.step import MRStep, MRStep
import csv, re, heapq
STATE_RE = re.compile(r',\s*([A-Z]{2})\s+\d{5}$')
class StateSpend(MRJob):
    """Q3: Extract 2-letter state code from Address,
       sum Yearly Amount Spent, output top-5."""
    def mapper(self, _, line):
        if line.startswith('Email'):
            return
        row = next(csv.reader([line]))
        addr, amt = row[1], float(row[-1])
        m = STATE_RE.search(addr)
        if m:
            yield m.group(1), amt
    def reducer_sum(self, state, amts):
        yield None, (sum(amts), state)
    def reducer_top5(self, _, state_amt_pairs):
        top = heapq.nlargest(5, state_amt_pairs)
        for amt, st in top:
            yield None, f"{st}\t{amt:.2f}"
    def steps(self):
        return [MRStep(mapper=self.mapper,
                       reducer=self.reducer_sum),
                MRStep(reducer=self.reducer_top5)]
if __name__ == '__main__':
    StateSpend.run()


Writing state_spend.py


In [19]:
from state_spend import StateSpend

sample = (
  "Email,Address,Avatar,Avg Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent\n"
  "mstephenson@fernandez.com,\"835 Frank TunnelWrightmouth, MI 82180-9605\",Violet,34.49726772511229,12.65565114916675,39.57766801952616,4.0826206329529615,587.9510539684005\n"
  "hduke@hotmail.com,\"4547 Archer CommonDiazchester, CA 06566-8576\",DarkGreen,31.92627202636016,11.109460728682564,37.268958868297744,2.66403418213262,392.2049334443264\n"
  "pallen@yahoo.com,\"24645 Valerie Unions Suite 582Cobbborough, DC 99414-7564\",Bisque,33.000914755642675,11.330278057777512,37.110597442120856,4.104543202376424,487.54750486747207\n"
  "riverarebecca@gmail.com,\"1414 David ThroughwayPort Jason, OH 22070-1220\",SaddleBrown,34.30555662975554,13.717513665142507,36.72128267790313,3.120178782748092,581.8523440352177\n"
  "mstephens@davidson-herman.com,\"14023 Rodriguez PassagePort Jacobville, PR 37242-1057\",MediumAquaMarine,33.33067252364639,12.795188551078114,37.53665330059473,4.446308318351434,599.4060920457634\n"
  "alvareznancy@lucas.biz,\"645 Martha Park Apt. 611Jeffreychester, MN 67218-7250\",FloralWhite,33.871037879341976,12.026925339755056,34.47687762925054,5.493507201364199,637.102447915074\n"
)

output_lines = run_local(StateSpend, sample)
for line in output_lines:
    print(line)



Q4:
* filter ships with passenger_density>35.0
* compute per-line median length (2-dec).

In [None]:
%%file median_length.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv, statistics
class MedianLength(MRJob):
    """Q4: (1) filter ships with passenger_density>35.0
       (2) compute per-line median length (2-dec)."""
    def mapper(self, _, line):
        if line.startswith('Ship_name'):
            return
        row = next(csv.reader([line]))
        line_name = row[1]
        density   = float(row[7])
        if density > 35.0:
            yield line_name, float(row[5])   # length
    def reducer(self, line_name, lengths):
        ls = sorted(lengths)
        med = statistics.median(ls)
        yield None, f"{line_name}\t{med:.2f}"
    def steps(self):
        return [MRStep(mapper=self.mapper, reducer=self.reducer)]
if __name__ == '__main__':
    MedianLength.run()


Run every job on Hadoop inside Colab

In [None]:
%%bash
python cruiseline_aggs.py   cruise.csv      -r hadoop  --output-dir q1_out  --no-conf --quiet
python churn_rate.py        churn.csv       -r hadoop  --vip vip_companies.txt --output-dir q2_out --no-conf --quiet
python state_spend.py       ecommerce.csv   -r hadoop  --output-dir q3_out  --no-conf --quiet
python median_length.py     cruise.csv      -r hadoop  --output-dir q4_out  --no-conf --quiet

echo "=== Q1 ==="; hdfs dfs -cat q1_out/part* | head
echo "=== Q2 ==="; hdfs dfs -cat q2_out/part* | head
echo "=== Q3 ==="; hdfs dfs -cat q3_out/part* | head
echo "=== Q4 ==="; hdfs dfs -cat q4_out/part* | head
