<a href="https://colab.research.google.com/github/dotavinash/VCC--Main/blob/main/Assignment2_G24AI2025_AvinashKumar.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install mrjob
!pip install mrjob

# Install Java (required for Hadoop)
!apt-get install openjdk-8-jdk -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Download and setup Hadoop (using a recent stable version, e.g., 3.3.6)
HADOOP_VERSION = "3.3.6"
!wget -q https://archive.apache.org/dist/hadoop/common/hadoop-{HADOOP_VERSION}/hadoop-{HADOOP_VERSION}.tar.gz
!tar -xzf hadoop-{HADOOP_VERSION}.tar.gz
!mv hadoop-{HADOOP_VERSION} /usr/local/hadoop

# Set Hadoop environment variables
os.environ["HADOOP_HOME"] = "/usr/local/hadoop"
os.environ["PATH"] = os.environ["PATH"] + ":" + os.environ["HADOOP_HOME"] + "/bin"

# Handle CLASSPATH: Check if it exists before appending, otherwise initialize it.
if "CLASSPATH" in os.environ:
    os.environ["CLASSPATH"] = os.environ["CLASSPATH"] + ":" + os.environ["HADOOP_HOME"] + "/lib/*"
else:
    os.environ["CLASSPATH"] = os.environ["HADOOP_HOME"] + "/lib/*"

print("Added dependencies !")

Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl.metadata (7.3 kB)
Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/439.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m174.1/439.6 kB[0m [31m4.9 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m439.6/439.6 kB[0m [31m6.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mrjob
Successfully installed mrjob-0.7.4
Added dependencies !


In [None]:
# --- Source URLs for datasets ---
URL_CRUISE_DATA = "https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/cruise.csv"
URL_CHURN_DATA = "https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/customer_churn.csv"
URL_ECOM_DATA = "https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/e-com_customer.csv"

# --- Filenames for saving the datasets locally ---
FILENAME_CRUISE = "cruise_data.csv"
FILENAME_CHURN = "churn_data.csv"
FILENAME_ECOM = "ecommerce_data.csv"

print("Initiating download of datasets from GitHub URLs...")

# Fetch cruise dataset
!wget -q -O {FILENAME_CRUISE} {URL_CRUISE_DATA}

# Fetch churn dataset
!wget -q -O {FILENAME_CHURN} {URL_CHURN_DATA}

# Fetch e-commerce dataset
!wget -q -O {FILENAME_ECOM} {URL_ECOM_DATA}

print("\nDownload completed. Listing CSV files:")
!ls -lh *.csv


Initiating download of datasets from GitHub URLs...

Download completed. Listing CSV files:
-rw-r--r-- 1 root root 113K Jul 29 15:57 churn_data.csv
-rw-r--r-- 1 root root 8.6K Jul 29 15:57 cruise_data.csv
-rw-r--r-- 1 root root  85K Jul 29 15:57 ecommerce_data.csv


Question 1: Cruise Line Aggregations

This task aggregates the cruise dataset by cruise line using MapReduce, reporting the number of ships, the average tonnage, and the maximum crew size for each line.

In [None]:
%%writefile cruise_summary_job.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class CruiseStatsAggregator(MRJob):
    """
    Per-cruise-line summary produced by a single MapReduce step.

    For each cruise line the job reports:
    - how many ships were seen
    - the mean tonnage (rounded to two decimals)
    - the largest crew value
    A combiner pre-aggregates mapper output before it is shuffled.
    """

    def steps(self):
        single_step = MRStep(
            mapper=self.map_cruise_data,
            combiner=self.combine_stats,
            reducer=self.reduce_stats,
        )
        return [single_step]

    def map_cruise_data(self, _, line):
        """
        Mapper: turns one CSV line into
        (cruise line, (1, tonnage, crew)).

        The header line is skipped. Lines whose fields are missing or not
        numeric bump the 'Invalid Rows' counter and emit nothing.
        """
        if line.startswith("Cruise_line"):
            return

        try:
            fields = next(csv.reader([line]))
            line_name, tonnage, crew = fields[0], float(fields[2]), int(fields[4])
        except (ValueError, IndexError):
            self.increment_counter('CruiseStatsAggregator', 'Invalid Rows', 1)
            return

        yield line_name, (1, tonnage, crew)

    @staticmethod
    def _fold_partials(entries):
        """Folds (ships, tonnage, crew) triples into (ship count, tonnage sum, max crew)."""
        ship_count, tonnage_sum, largest_crew = 0, 0.0, 0
        for ships, tons, crew in entries:
            ship_count += ships
            tonnage_sum += tons
            if crew > largest_crew:
                largest_crew = crew
        return ship_count, tonnage_sum, largest_crew

    def combine_stats(self, cruise_line, entries):
        """
        Combiner: collapses the triples emitted for one cruise line into a
        single partial triple (ship count, tonnage sum, max crew).
        """
        yield cruise_line, self._fold_partials(entries)

    def reduce_stats(self, cruise_line, entries):
        """
        Reducer: merges the partial triples for one cruise line and emits
        (ship count, average tonnage rounded to 2 decimals, max crew).
        """
        ship_count, tonnage_sum, largest_crew = self._fold_partials(entries)

        if ship_count > 0:
            mean_tonnage = tonnage_sum / ship_count
        else:
            mean_tonnage = 0.0
        yield cruise_line, (ship_count, round(mean_tonnage, 2), largest_crew)

# Start the job only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    CruiseStatsAggregator.run()


Writing cruise_summary_job.py


In [None]:
# Generate a mock dataset file for testing the MapReduce job
sample_data = """Cruise_line,Cruise_ship_name,Tonnage,passengers,crew,built,Inaugural_Date,Years_in_service,Passenger_density,length,cabins
AIDA Cruises,AIDAbella,69203,2050,600,2008,2008,12,33.75,252.0,1025
AIDA Cruises,AIDAluna,69203,2050,600,2009,2009,11,33.75,252.0,1025
Carnival Cruise Line,Carnival Freedom,110000,2974,1150,2007,2007,13,37.00,290.0,1487
Carnival Cruise Line,Carnival Horizon,133500,3960,1450,2018,2018,2,33.71,323.0,1980
Royal Caribbean,Allure of the Seas,225282,5400,2200,2010,2010,10,41.67,362.0,2700
"""

# Save the mock CSV data into a file
with open("demo_cruise_data.csv", "w") as file:
    file.write(sample_data)

print("Executing cruise summary job on demo_cruise_data.csv:")

# Run the MapReduce script using MRJob
!python cruise_summary_job.py demo_cruise_data.csv


Executing cruise summary job on demo_cruise_data.csv:
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/cruise_summary_job.root.20250729.160025.021909
Running step 1 of 1...
job output is in /tmp/cruise_summary_job.root.20250729.160025.021909/output
Streaming final output from /tmp/cruise_summary_job.root.20250729.160025.021909/output...
"Royal Caribbean"	[1, 225282.0, 2200]
"AIDA Cruises"	[2, 69203.0, 600]
"Carnival Cruise Line"	[2, 121750.0, 1450]
Removing temp directory /tmp/cruise_summary_job.root.20250729.160025.021909...


In [None]:
%%writefile churn_rate_analysis.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class CompanyChurnRateJob(MRJob):
    """
    Computes, per company, the fraction of customers that churned.

    Input is a CSV whose header line starts with "Customer ID". The mapper
    reads the company name from column index 1 and the churn flag (0/1)
    from column index 3.
    NOTE(review): these positions match a "Customer ID,Company,Region,Churn"
    layout -- confirm them against the real dataset before a full run.

    An optional ``--vip-companies`` file (one company name per line)
    restricts the output to the listed companies.
    """

    def configure_args(self):
        """Registers the optional ``--vip-companies`` file argument."""
        super(CompanyChurnRateJob, self).configure_args()
        self.add_file_arg('--vip-companies', help='File containing names of VIP companies')

    def load_target_companies(self):
        """
        Mapper initialiser (wired in as ``mapper_init``): loads the company
        names listed in the ``--vip-companies`` file into
        ``self.target_companies``.

        Blank lines in the file are ignored. When no file was supplied the
        set stays empty, which the mapper treats as "no filtering".
        """
        import logging  # local import keeps this block self-contained

        self.target_companies = set()
        vip_path = self.options.vip_companies
        if not vip_path:
            # MRJob is not documented to expose a `logger` attribute, so the
            # previous self.logger.warning() call would fail with
            # AttributeError on exactly this path; use stdlib logging instead.
            logging.getLogger(__name__).warning(
                "VIP company file missing. All records will be evaluated.")
            return

        with open(vip_path, 'r') as vip_file:
            for line in vip_file:
                name = line.strip()
                if name:  # a blank line must not add "" to the filter set
                    self.target_companies.add(name)

    def steps(self):
        # Map and reduce run in one step; a separate reducer-only step would
        # only add an extra pass over the intermediate data.
        return [
            MRStep(mapper_init=self.load_target_companies,
                   mapper=self.map_customer_records,
                   reducer=self.compute_churn_percentage)
        ]

    def map_customer_records(self, _, record):
        """
        Mapper:
        Emits (Company, ('total', 1)) for every accepted record and
        additionally (Company, ('churned', 1)) when the churn flag is 1.

        A record is accepted when no VIP list was loaded or its company is
        on the list. The header line is skipped; lines that cannot be parsed
        bump the 'CorruptedInputLines' counter.
        """
        if record.startswith("Customer ID"):
            return  # Skip header

        try:
            row = next(csv.reader([record]))
            company_name = row[1]
            churn_status = int(row[3])
        except (ValueError, IndexError):
            self.increment_counter('CompanyChurnRateJob', 'CorruptedInputLines', 1)
            return

        if not self.target_companies or company_name in self.target_companies:
            yield company_name, ('total', 1)
            if churn_status == 1:
                yield company_name, ('churned', 1)

    def compute_churn_percentage(self, company, stats):
        """
        Reducer:
        Sums the 'total' and 'churned' counts for one company and emits the
        ratio churned / total as a string with four decimal places
        ("0.0000" when no records were counted).
        """
        total_users = 0
        churned_users = 0

        for entry_type, count in stats:
            if entry_type == 'total':
                total_users += count
            elif entry_type == 'churned':
                churned_users += count

        churn_ratio = churned_users / total_users if total_users > 0 else 0.0
        yield company, f"{churn_ratio:.4f}"

# Start the job only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    CompanyChurnRateJob.run()


Writing churn_rate_analysis.py




Inline Test Output for Cruise Aggregation

Below is the result of running the cruise aggregation job on sample input data.

In [None]:
%%writefile vip_companies.txt
AlphaCorp
BetaCorp
DeltaCorp

Writing vip_companies.txt


Question 2: Customer Churn by Company

This task uses MapReduce to compute the churn rate (churned customers divided by total customers) for each company, optionally restricted to a list of selected companies.

In [None]:
# Generate a compact sample churn dataset for testing
demo_churn_content = """Customer ID,Company,Region,Churn
C001,AlphaCorp,North,0
C002,BetaCorp,South,1
C003,AlphaCorp,East,1
C004,GammaCorp,West,0
C005,BetaCorp,North,0
C006,AlphaCorp,Central,0
C007,BetaCorp,South,1
C008,AlphaCorp,South,1
C009,DeltaCorp,East,0
C010,BetaCorp,West,0
"""

# Write the test data to a file
with open("demo_churn_data.csv", "w") as file:
    file.write(demo_churn_content)

print("Executing churn analysis on demo_churn_data.csv with a list of selected companies (VIP):")

# Use --files to include vip_companies.txt in the distributed cache
# Pass the file using --vip-companies so the MRJob script can access it
!python churn_rate_analysis.py demo_churn_data.csv --files vip_companies.txt --vip-companies vip_companies.txt


Executing churn analysis on demo_churn_data.csv with a list of selected companies (VIP):
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/churn_rate_analysis.root.20250729.160510.273787
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/churn_rate_analysis.root.20250729.160510.273787/output
Streaming final output from /tmp/churn_rate_analysis.root.20250729.160510.273787/output...
"DeltaCorp"	"0.0000"
"AlphaCorp"	"0.5000"
"BetaCorp"	"0.5000"
Removing temp directory /tmp/churn_rate_analysis.root.20250729.160510.273787...


In [None]:
%%writefile top_spending_states_job.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv
import re

class TopStateSpenders(MRJob):
    """
    Totals the "Yearly Amount Spent" per U.S. state, taking the state code
    from each customer's address, and outputs the highest-spending states.

    Step 1 sums the spending per state. Step 2 sends every per-state total to
    a single reducer, which ranks them and emits the top ``--top-n`` states
    (default 5).
    """

    # ", <2-letter state code> <5-digit ZIP>" as it appears at the end of an address
    _STATE_ZIP_PATTERN = re.compile(r',\s*([A-Z]{2})\s*\d{5}')

    def configure_args(self):
        """Registers ``--top-n``, the number of states to output (default 5)."""
        super(TopStateSpenders, self).configure_args()
        self.add_passthru_arg(
            '--top-n', type=int, default=5,
            help='Number of highest-spending states to output (default: 5)')

    def steps(self):
        return [
            MRStep(mapper=self.extract_state_and_spending,
                   reducer=self.aggregate_spending_by_state),
            MRStep(reducer=self.select_top_spending_states)
        ]

    def extract_state_and_spending(self, _, line):
        """
        Mapper:
        Reads the address (column index 1) and the yearly amount spent
        (column index 7) from one CSV line and emits (State, YearlyAmountSpent).

        The header line is skipped. Lines with missing or non-numeric fields
        bump the 'InvalidLines' counter; addresses without a recognisable
        state code and ZIP bump 'MissingStateCode'.
        """
        if line.startswith("Email"):
            return  # Skip header row

        try:
            fields = next(csv.reader([line]))
            address_field = fields[1]  # Address column
            spending = float(fields[7])  # Yearly Amount Spent column
        except (ValueError, IndexError, TypeError):
            self.increment_counter('TopStateSpenders', 'InvalidLines', 1)
            return

        match = self._STATE_ZIP_PATTERN.search(address_field)
        if match:
            yield match.group(1), spending
        else:
            self.increment_counter('TopStateSpenders', 'MissingStateCode', 1)

    def aggregate_spending_by_state(self, state, values):
        """
        Reducer (Step 1):
        Sums all spending values of one state and emits (None, (total, state)).
        The shared None key routes every state total to the same reducer call
        in step 2.
        """
        total = sum(values)
        yield None, (total, state)

    def select_top_spending_states(self, _, state_spend_list):
        """
        Reducer (Step 2):
        Ranks the (total, state) pairs by total, highest first, and emits
        (state, total rounded to 2 decimals) for the first ``--top-n`` of them.
        """
        # A negative value would otherwise slice from the end of the list.
        limit = max(self.options.top_n, 0)
        ranked = sorted(state_spend_list, key=lambda pair: pair[0], reverse=True)

        for total, state in ranked[:limit]:
            yield state, round(total, 2)

# Start the job only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    TopStateSpenders.run()


Writing top_spending_states_job.py


Inline Test Output for Customer Churn Analysis

In [None]:
# Prepare mock E-commerce dataset with quoted address fields for testing.
# Every row must supply all eight header columns, including Avatar: the job
# reads "Yearly Amount Spent" from column index 7, so a row without the
# Avatar value is one field short and gets counted as an invalid line.
test_ecom_data = """Email,Address,Avatar,Avg. Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent
cust1@example.com,"123 Main St, Springfield, IL 62701",Lavender,20.0,3.5,0.0,12.0,500.25
cust2@example.com,"456 Oak Ave, Pleasantville, CA 90210",Teal,30.5,4.0,2.1,24.5,1200.50
cust3@example.com,"789 Pine Ln, Metropolis, NY 10001",Blue,25.1,2.8,1.5,18.0,800.75
cust4@example.com,"101 Elm Blvd, Springfield, IL 62701",Green,15.0,2.0,0.5,8.0,300.00
cust5@example.com,"202 Maple Dr, Sunnydale, CA 90210",Red,40.0,5.0,3.0,36.0,1500.00
cust6@example.com,"303 River Rd, Gotham, NY 10001",Orange,22.5,3.1,1.0,15.0,950.00
cust7@example.com,"404 Hilltop, Smallville, KS 66044",Yellow,18.0,2.5,0.2,10.0,400.00
cust8@example.com,"505 Valley Dr, Central City, CA 90210",Purple,35.0,4.5,2.5,30.0,1100.00
cust9@example.com,"606 Ocean Ave, Star City, WA 98001",Gray,28.0,3.8,1.8,20.0,1300.00
cust10@example.com,"707 Mountain Rd, Riverdale, NY 10001",Black,10.0,1.5,0.0,5.0,200.00
"""

# Save data to file
with open("test_ecommerce_data.csv", "w") as file:
    file.write(test_ecom_data)

print("Test file 'test_ecommerce_data.csv' has been successfully created with properly quoted address fields.")


Test file 'test_ecommerce_data.csv' has been successfully created with properly quoted address fields.


Question 3: E-Commerce Customer Spending by State

This job uses MapReduce to total the yearly amount spent per U.S. state in an e-commerce dataset and reports the five highest-spending states.

In [None]:
# Run the top-spending-states job script on the sample e-commerce CSV.
print("Launching Top Spending States analysis using test_ecommerce_data.csv (sample run):")
!python top_spending_states_job.py test_ecommerce_data.csv


Launching Top Spending States analysis using test_ecommerce_data.csv (sample run):
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/top_spending_states_job.root.20250729.161312.852051
Running step 1 of 2...

Counters: 1
	TopStateSpenders
		InvalidLines=10

Running step 2 of 2...
job output is in /tmp/top_spending_states_job.root.20250729.161312.852051/output
Streaming final output from /tmp/top_spending_states_job.root.20250729.161312.852051/output...
Removing temp directory /tmp/top_spending_states_job.root.20250729.161312.852051...


In [None]:
# Create a small dummy input file for testing (WITH QUOTED ADDRESSES AND COMPLETE COLUMNS)
test_ecommerce_data = """Email,Address,Avatar,Avg. Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent
cust1@example.com,"123 Main St, Springfield, IL 62701",Lavender,20.0,3.5,0.0,12.0,500.25
cust2@example.com,"456 Oak Ave, Pleasantville, CA 90210",Teal,30.5,4.0,2.1,24.5,1200.50
cust3@example.com,"789 Pine Ln, Metropolis, NY 10001",Blue,25.1,2.8,1.5,18.0,800.75
cust4@example.com,"101 Elm Blvd, Springfield, IL 62701",Green,15.0,2.0,0.5,8.0,300.00
cust5@example.com,"202 Maple Dr, Sunnydale, CA 90210",Red,40.0,5.0,3.0,36.0,1500.00
cust6@example.com,"303 River Rd, Gotham, NY 10001",Orange,22.5,3.1,1.0,15.0,950.00
cust7@example.com,"404 Hilltop, Smallville, KS 66044",Yellow,18.0,2.5,0.2,10.0,400.00
cust8@example.com,"505 Valley Dr, Central City, CA 90210",Purple,35.0,4.5,2.5,30.0,1100.00
cust9@example.com,"606 Ocean Ave, Star City, WA 98001",Gray,28.0,3.8,1.8,20.0,1300.00
cust10@example.com,"707 Mountain Rd, Riverdale, NY 10001",Black,10.0,1.5,0.0,5.0,200.00
"""

with open("test_ecommerce_data.csv", "w") as f:
    f.write(test_ecommerce_data)

print("✅ test_ecommerce_data.csv has been created with quoted addresses and full column data.")


✅ test_ecommerce_data.csv has been created with quoted addresses and full column data.


Inline Test Output for E-Commerce Spending Analysis

In [None]:
%%writefile ship_filter_median_length_job.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class MRShipFilterMedianLength(MRJob):
    """
    Median ship length per cruise line, restricted to ships whose
    passenger density is above 35.0.
    """

    def steps(self):
        only_step = MRStep(mapper=self.mapper_filter_ships,
                           reducer=self.reducer_compute_median_length)
        return [only_step]

    def mapper_filter_ships(self, _, line):
        """
        Mapper: emits (cruise line, length) for every ship whose passenger
        density exceeds 35.0.

        The header line is skipped; lines with missing or non-numeric fields
        are tallied under the 'Bad CSV lines' counter and otherwise ignored.
        """
        if line.startswith("Cruise_line"):
            return

        try:
            fields = next(csv.reader([line]))
            line_name = fields[0]
            density = float(fields[8])       # Passenger_density, 9th column
            ship_length = float(fields[9])   # length, 10th column
        except (ValueError, IndexError):
            self.increment_counter('MRShipFilterMedianLength', 'Bad CSV lines', 1)
            return

        if density > 35.0:
            yield line_name, ship_length

    def reducer_compute_median_length(self, cruise_line, lengths):
        """
        Reducer: emits the median of the lengths received for one cruise
        line, rounded to two decimals (0.0 when no lengths were received).
        """
        ordered = list(lengths)
        ordered.sort()
        count = len(ordered)

        if not count:
            middle = 0.0
        else:
            half, is_odd = divmod(count, 2)
            if is_odd:
                middle = ordered[half]
            else:
                # Even count: average the two central values
                middle = (ordered[half - 1] + ordered[half]) / 2.0

        yield cruise_line, round(middle, 2)

# Start the job only when this file is executed as a script, not when imported.
if __name__ == '__main__':
    MRShipFilterMedianLength.run()


Writing ship_filter_median_length_job.py


In [None]:
# Create a small dummy cruise.csv file for testing
small_cruise_data = """Cruise_line,Cruise_ship_name,Tonnage,passengers,crew,built,Inaugural_Date,Years_in_service,Passenger_density,length,cabins
AIDA Cruises,AIDAbella,69203,2050,600,2008,2008,12,33.75,252.0,1025
AIDA Cruises,AIDAluna,69203,2050,600,2009,2009,11,33.75,252.0,1025
Carnival Cruise Line,Carnival Freedom,110000,2974,1150,2007,2007,13,37.00,290.0,1487
Carnival Cruise Line,Carnival Horizon,133500,3960,1450,2018,2018,2,33.71,323.0,1980
Royal Caribbean,Allure of the Seas,225282,5400,2200,2010,2010,10,41.67,362.0,2700
"""

with open("small_cruise.csv", "w") as f:
    f.write(small_cruise_data)

print("Running Ship Filter & Median Length Job on small_cruise.csv (inline test output):")
!python ship_filter_median_length_job.py small_cruise.csv


Running Ship Filter & Median Length Job on small_cruise.csv (inline test output):
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/ship_filter_median_length_job.root.20250729.161840.608725
Running step 1 of 1...
job output is in /tmp/ship_filter_median_length_job.root.20250729.161840.608725/output
Streaming final output from /tmp/ship_filter_median_length_job.root.20250729.161840.608725/output...
"Carnival Cruise Line"	290.0
"Royal Caribbean"	362.0
Removing temp directory /tmp/ship_filter_median_length_job.root.20250729.161840.608725...
