<a href="https://colab.research.google.com/github/Nitin5499/Bank-Loan-analysis/blob/main/FDS___Assign_2___Nitin_Kumar_(G24AI2056).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Installation of mrjob and Java for Hadoop
!pip install mrjob

!apt-get install openjdk-8-jdk -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Downloading and settingup Hadoop
HADOOP_VERSION = "3.3.6"
!wget -q https://archive.apache.org/dist/hadoop/common/hadoop-{HADOOP_VERSION}/hadoop-{HADOOP_VERSION}.tar.gz
!tar -xzf hadoop-{HADOOP_VERSION}.tar.gz
!mv hadoop-{HADOOP_VERSION} /usr/local/hadoop

# Setting up the Hadoop environment variables
os.environ["HADOOP_HOME"] = "/usr/local/hadoop"
os.environ["PATH"] = os.environ["PATH"] + ":" + os.environ["HADOOP_HOME"] + "/bin"

# Handle CLASSPATH: Check if it exists before appending, otherwise initialize it.
if "CLASSPATH" in os.environ:
    os.environ["CLASSPATH"] = os.environ["CLASSPATH"] + ":" + os.environ["HADOOP_HOME"] + "/lib/*"
else:
    os.environ["CLASSPATH"] = os.environ["HADOOP_HOME"] + "/lib/*"

print("Dependencies and Hadoop setup complete!")

Collecting mrjob
  Downloading mrjob-0.7.4-py2.py3-none-any.whl.metadata (7.3 kB)
Downloading mrjob-0.7.4-py2.py3-none-any.whl (439 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/439.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m [32m430.1/439.6 kB[0m [31m16.7 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m439.6/439.6 kB[0m [31m11.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mrjob
Successfully installed mrjob-0.7.4
^C

gzip: stdin: unexpected end of file
tar: Unexpected EOF in archive
tar: Unexpected EOF in archive
tar: Error is not recoverable: exiting now
Dependencies and Hadoop setup complete!


In [3]:
# Downloading the CSVs--
CRUISE_URL = "https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/cruise.csv"
CHURN_URL = "https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/customer_churn.csv"
ECOMMERCE_URL = "https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/e-com_customer.csv"

CRUISE_FILE = "cruise.csv"
CHURN_FILE = "customer_churn.csv"
ECOMMERCE_FILE = "e_commerce_customer.csv"

print("Downloading files from provided URLs...")

!wget -O {CRUISE_FILE} {CRUISE_URL}

!wget -O {CHURN_FILE} {CHURN_URL}

!wget -O {ECOMMERCE_FILE} {ECOMMERCE_URL}

print("\nCSV files loaded/prepared!")
!ls -lh *.csv

Downloading files from provided URLs...
--2025-07-28 16:32:12--  https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/cruise.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 8734 (8.5K) [text/plain]
Saving to: ‘cruise.csv’


2025-07-28 16:32:12 (77.2 MB/s) - ‘cruise.csv’ saved [8734/8734]

--2025-07-28 16:32:12--  https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/customer_churn.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 115479 (113K) [text/p

Question 1: Cruise Line Aggregations

This task involves performing aggregations (e.g., total passengers and tonnage) by cruise line using MapReduce.

In [5]:
%%writefile cruise_aggregations_job.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class MRCruiseAggregations(MRJob):
    """
    Computes total ships, average tonnage, and maximum crew size for each cruise line.
    Uses a combiner for partial aggregation to improve efficiency.
    """

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_cruise_data,
                   combiner=self.combiner_partial_aggregations,
                   reducer=self.reducer_final_aggregations)
        ]

    def mapper_get_cruise_data(self, _, line):
        """
        Mapper: Parses each line of the cruise CSV.
        Emits (Cruise_line, (1, Tonnage, Crew)).
        '1' for counting ships, Tonnage for sum, Crew for max.
        """
        # Skip header
        if line.startswith("Cruise_line"):
            return

        try:
            # Using csv.reader for robust parsing, especially with commas in data
            # csv.reader expects an iterable of lines, so [line] wraps the current line.
            row = next(csv.reader([line]))
            cruise_line = row[0]
            # Tonnage is at index 2, crew at index 4 (0-indexed)
            tonnage = float(row[2])
            crew = int(row[4])
            # Emit: (count of ships, sum of tonnage, max crew size)
            yield cruise_line, (1, tonnage, crew)
        except (ValueError, IndexError) as e:
            # Increment a counter for bad lines to monitor data quality
            self.increment_counter('MRCruiseAggregations', 'Bad CSV lines', 1)
            # print(f"Skipping malformed line: {line} - Error: {e}") # Uncomment for debugging
            pass

    def combiner_partial_aggregations(self, cruise_line, values):
        """
        Combiner: Performs partial aggregation (sum for count/tonnage, max for crew)
        before sending to the reducer. This reduces data shuffled across the network.
        """
        total_ships = 0
        total_tonnage = 0.0
        max_crew = 0

        for count, tonnage, crew in values:
            total_ships += count
            total_tonnage += tonnage
            max_crew = max(max_crew, crew)
        yield cruise_line, (total_ships, total_tonnage, max_crew)

    def reducer_final_aggregations(self, cruise_line, values):
        """
        Reducer: Aggregates the partial results from combiners/mappers.
        Computes final total ships, average tonnage, and maximum crew size.
        """
        total_ships = 0
        total_tonnage = 0.0
        max_crew = 0

        for count, tonnage, crew in values:
            total_ships += count
            total_tonnage += tonnage
            max_crew = max(max_crew, crew)

        avg_tonnage = total_tonnage / total_ships if total_ships > 0 else 0.0
        yield cruise_line, (total_ships, round(avg_tonnage, 2), max_crew)

if __name__ == '__main__':
    MRCruiseAggregations.run()

Overwriting cruise_aggregations_job.py


In [7]:
# Dummy input file for demo
small_cruise_data = """Cruise_line,Cruise_ship_name,Tonnage,passengers,crew,built,Inaugural_Date,Years_in_service,Passenger_density,length,cabins
AIDA Cruises,AIDAbella,69203,2050,600,2008,2008,12,33.75,252.0,1025
AIDA Cruises,AIDAluna,69203,2050,600,2009,2009,11,33.75,252.0,1025
Carnival Cruise Line,Carnival Freedom,110000,2974,1150,2007,2007,13,37.00,290.0,1487
Carnival Cruise Line,Carnival Horizon,133500,3960,1450,2018,2018,2,33.71,323.0,1980
Royal Caribbean,Allure of the Seas,225282,5400,2200,2010,2010,10,41.67,362.0,2700
"""
with open("small_cruise.csv", "w") as f:
    f.write(small_cruise_data)

print("Running Cruise Aggregations Job on small_cruise.csv (inline test output):")
!python cruise_aggregations_job.py small_cruise.csv

Running Cruise Aggregations Job on small_cruise.csv (inline test output):
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/cruise_aggregations_job.root.20250728.163849.395535
Running step 1 of 1...
job output is in /tmp/cruise_aggregations_job.root.20250728.163849.395535/output
Streaming final output from /tmp/cruise_aggregations_job.root.20250728.163849.395535/output...
"Carnival Cruise Line"	[2, 121750.0, 1450]
"Royal Caribbean"	[1, 225282.0, 2200]
"AIDA Cruises"	[2, 69203.0, 600]
Removing temp directory /tmp/cruise_aggregations_job.root.20250728.163849.395535...


In [8]:
%%writefile company_churn_job.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class MRCompanyChurn(MRJob):
    """
    Computes churn rate for specified companies from customer_churn.csv.
    Uses a two-step pipeline:
    Step 1: Mapper emits (Company, 'total') and (Company, 'churned')
            if the company is in the VIP list from distributed cache.
    Step 2: Reducer calculates churn rate (CHURNED / TOTAL) for each company.
    """

    # Define files to be placed in the distributed cache
    def configure_args(self):
        super(MRCompanyChurn, self).configure_args()
        self.add_file_arg('--vip-companies', help='Path to VIP companies list')

    def load_vip_companies(self):
        """
        Mapper initialization: Loads VIP company names from the distributed cache.
        This runs once per mapper process.
        """
        self.vip_companies = set()
        # self.options.vip_companies will contain the path to the file in the distributed cache
        if self.options.vip_companies:
            with open(self.options.vip_companies, 'r') as f:
                for line in f:
                    self.vip_companies.add(line.strip())
        else:
            # This warning appears if --vip-companies is not provided
            self.logger.warning("No VIP companies file provided. Processing all companies.")


    def steps(self):
        return [
            MRStep(mapper_init=self.load_vip_companies, # This runs once before the mapper starts processing data
                   mapper=self.mapper_churn_counts),
            MRStep(reducer=self.reducer_churn_rate)
        ]

    def mapper_churn_counts(self, _, line):
        """
        Mapper: Parses customer churn data.
        Emits (Company, 'total_count') for every record.
        Emits (Company, 'churned_count') if Churn == 1.
        Only processes companies present in the VIP list (if provided via distributed cache).
        """
        if line.startswith("Customer ID"): # Skip header
            return

        try:
            row = next(csv.reader([line]))
            # Assuming format: Customer ID,Company,Region,Churn
            company = row[1]
            churn = int(row[3])

            # Filter by VIP companies if the list is loaded, otherwise process all (if vip_companies is empty)
            if not self.vip_companies or company in self.vip_companies:
                yield company, ('total', 1)
                if churn == 1:
                    yield company, ('churned', 1)
        except (ValueError, IndexError) as e:
            self.increment_counter('MRCompanyChurn', 'Bad CSV lines', 1)
            # print(f"Skipping malformed line: {line} - Error: {e}") # Uncomment for debugging
            pass

    def reducer_churn_rate(self, company, counts):
        """
        Reducer: Aggregates counts and calculates the churn rate for each company.
        """
        total_customers = 0
        churned_customers = 0

        for count_type, value in counts:
            if count_type == 'total':
                total_customers += value
            elif count_type == 'churned':
                churned_customers += value

        churn_rate = 0.0
        if total_customers > 0:
            churn_rate = float(churned_customers) / total_customers

        yield company, f"{churn_rate:.4f}" # Output as four-decimal float

if __name__ == '__main__':
    MRCompanyChurn.run()

Writing company_churn_job.py


Inline Test Output for Cruise Aggregation

Below is the result of running the cruise aggregation job on sample input data.

In [9]:
%%writefile vip_companies.txt
AlphaCorp
BetaCorp
DeltaCorp

Writing vip_companies.txt


Question 2: Customer Churn by Company

This task analyzes churned customers grouped by their associated companies using MapReduce.

In [10]:
# Dummy input file for demo
small_churn_data = """Customer ID,Company,Region,Churn
C001,AlphaCorp,North,0
C002,BetaCorp,South,1
C003,AlphaCorp,East,1
C004,GammaCorp,West,0
C005,BetaCorp,North,0
C006,AlphaCorp,Central,0
C007,BetaCorp,South,1
C008,AlphaCorp,South,1
C009,DeltaCorp,East,0
C010,BetaCorp,West,0
"""
with open("small_customer_churn.csv", "w") as f:
    f.write(small_churn_data)

print("Running Company Churn Rate Job on small_customer_churn.csv with VIP companies (inline test output):")
# Use --files to put vip_companies.txt into the distributed cache (required by Hadoop)
# Use --vip-companies to tell your mrjob script the name of the file it needs to open
!python company_churn_job.py small_customer_churn.csv --files vip_companies.txt --vip-companies vip_companies.txt

Running Company Churn Rate Job on small_customer_churn.csv with VIP companies (inline test output):
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/company_churn_job.root.20250728.164040.005449
Running step 1 of 2...
Running step 2 of 2...
job output is in /tmp/company_churn_job.root.20250728.164040.005449/output
Streaming final output from /tmp/company_churn_job.root.20250728.164040.005449/output...
"BetaCorp"	"0.5000"
"DeltaCorp"	"0.0000"
"AlphaCorp"	"0.5000"
Removing temp directory /tmp/company_churn_job.root.20250728.164040.005449...


In [11]:
%%writefile state_spending_job.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv
import re

class MRStateSpending(MRJob):
    """
    Computes total yearly amount spent per state from e-commerce customer data
    and then outputs the top 5 states by spending.
    """

    def steps(self):
        return [
            # Step 1: Map (extract state & spending) -> Reduce (sum spending per state)
            MRStep(mapper=self.mapper_get_state_spending,
                   reducer=self.reducer_sum_spending_per_state),
            # Step 2: Reduce (collect all sums, sort, and find top 5)
            MRStep(reducer=self.reducer_find_top_states)
        ]

    def mapper_get_state_spending(self, _, line):
        """
        Mapper: Parses the E-commerce Customer CSV, extracts the state code
        from the Address, and emits (State, Yearly Amount Spent).
        """
        if line.startswith("Email"): # Skip header line
            return

        try:
            row = next(csv.reader([line]))
            # Assuming columns are: Email,Address,Avatar,Avg. Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent
            address = row[1] # Address is the second column (index 1)
            yearly_amount_spent = float(row[7]) # Yearly Amount Spent is the eighth column (index 7)

            # Regex to find a two-letter uppercase state code followed by a 5-digit zip code
            # Example: "123 Main St, Anytown, CA 90210" -> extracts "CA"
            match = re.search(r',\s*([A-Z]{2})\s*\d{5}', address)
            if match:
                state_code = match.group(1) # Extract the two-letter state code
                yield state_code, yearly_amount_spent
            else:
                self.increment_counter('MRStateSpending', 'No state code found', 1)
        except (ValueError, IndexError, TypeError) as e:
            # Catch errors for malformed lines or incorrect data types
            self.increment_counter('MRStateSpending', 'Bad CSV lines', 1)
            # print(f"Skipping malformed line: {line} - Error: {e}") # Uncomment for debugging
            pass

    def reducer_sum_spending_per_state(self, state, amounts):
        """
        Reducer 1: Sums the yearly amount spent for each state.
        Emits (None, (total_spending, state)) to prepare for a global sort
        in the next reducer step.
        """
        total_spending = sum(amounts)
        # Emit with a None key so all pairs go to a single reducer in the next step
        yield None, (total_spending, state)

    def reducer_find_top_states(self, _, spending_state_pairs):
        """
        Reducer 2: Collects all (total_spending, state) pairs, sorts them globally,
        and yields only the top 5 states by spending.
        """
        # Collect all (spending, state) tuples and sort in descending order of spending
        sorted_states = sorted(spending_state_pairs, key=lambda x: x[0], reverse=True)

        # Emit only the top 5 states
        for i, (total_spending, state) in enumerate(sorted_states):
            if i < 5:
                yield state, round(total_spending, 2)
            else:
                break # Stop after emitting the top 5

if __name__ == '__main__':
    MRStateSpending.run()

Writing state_spending_job.py


Inline Test Output for Customer Churn Analysis

In [12]:
# Dummy input file for demo (with quotes address)
small_ecommerce_data = """Email,Address,Avatar,Avg. Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent
cust1@example.com,"123 Main St, Springfield, IL 62701",20.0,3.5,0.0,12.0,500.25
cust2@example.com,"456 Oak Ave, Pleasantville, CA 90210",30.5,4.0,2.1,24.5,1200.50
cust3@example.com,"789 Pine Ln, Metropolis, NY 10001",25.1,2.8,1.5,18.0,800.75
cust4@example.com,"101 Elm Blvd, Springfield, IL 62701",15.0,2.0,0.5,8.0,300.00
cust5@example.com,"202 Maple Dr, Sunnydale, CA 90210",40.0,5.0,3.0,36.0,1500.00
cust6@example.com,"303 River Rd, Gotham, NY 10001",22.5,3.1,1.0,15.0,950.00
cust7@example.com,"404 Hilltop, Smallville, KS 66044",18.0,2.5,0.2,10.0,400.00
cust8@example.com,"505 Valley Dr, Central City, CA 90210",35.0,4.5,2.5,30.0,1100.00
cust9@example.com,"606 Ocean Ave, Star City, WA 98001",28.0,3.8,1.8,20.0,1300.00
cust10@example.com,"707 Mountain Rd, Riverdale, NY 10001",10.0,1.5,0.0,5.0,200.00
"""
with open("small_e_commerce_customer.csv", "w") as f:
    f.write(small_ecommerce_data)

print("small_e_commerce_customer.csv has been updated with quoted addresses.")

# Optional: Verify the file content by reading it back
# with open("small_e_commerce_customer.csv", "r") as f:
#     print("\nContent of updated small_e_commerce_customer.csv:")
#     print(f.read())

small_e_commerce_customer.csv has been updated with quoted addresses.


Question 3: E-Commerce Customer Spending by Country

This job computes spending insights from an e-commerce dataset using MapReduce.

In [13]:
print("Running State-wise Spending Job on small_e_commerce_customer.csv (inline test output):")
!python state_spending_job.py small_e_commerce_customer.csv

Running State-wise Spending Job on small_e_commerce_customer.csv (inline test output):
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/state_spending_job.root.20250728.164303.702416
Running step 1 of 2...

Counters: 1
	MRStateSpending
		Bad CSV lines=10

Running step 2 of 2...
job output is in /tmp/state_spending_job.root.20250728.164303.702416/output
Streaming final output from /tmp/state_spending_job.root.20250728.164303.702416/output...
Removing temp directory /tmp/state_spending_job.root.20250728.164303.702416...


In [14]:
# Dummy input file for demo (with quoted addess and complete columns)
small_ecommerce_data = """Email,Address,Avatar,Avg. Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent
cust1@example.com,"123 Main St, Springfield, IL 62701",Lavender,20.0,3.5,0.0,12.0,500.25
cust2@example.com,"456 Oak Ave, Pleasantville, CA 90210",Teal,30.5,4.0,2.1,24.5,1200.50
cust3@example.com,"789 Pine Ln, Metropolis, NY 10001",Blue,25.1,2.8,1.5,18.0,800.75
cust4@example.com,"101 Elm Blvd, Springfield, IL 62701",Green,15.0,2.0,0.5,8.0,300.00
cust5@example.com,"202 Maple Dr, Sunnydale, CA 90210",Red,40.0,5.0,3.0,36.0,1500.00
cust6@example.com,"303 River Rd, Gotham, NY 10001",Orange,22.5,3.1,1.0,15.0,950.00
cust7@example.com,"404 Hilltop, Smallville, KS 66044",Yellow,18.0,2.5,0.2,10.0,400.00
cust8@example.com,"505 Valley Dr, Central City, CA 90210",Purple,35.0,4.5,2.5,30.0,1100.00
cust9@example.com,"606 Ocean Ave, Star City, WA 98001",Gray,28.0,3.8,1.8,20.0,1300.00
cust10@example.com,"707 Mountain Rd, Riverdale, NY 10001",Black,10.0,1.5,0.0,5.0,200.00
"""
with open("small_e_commerce_customer.csv", "w") as f:
    f.write(small_ecommerce_data)

print("small_e_commerce_customer.csv has been updated with complete, quoted data.")

small_e_commerce_customer.csv has been updated with complete, quoted data.


Inline Test Output for E-Commerce Spending Analysis

In [15]:
%%writefile ship_filter_median_length_job.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv
import math

class MRShipFilterMedianLength(MRJob):
    """
    A two-step MapReduce pipeline on cruise.csv:
    Step 1: Filters ships with passenger density > 35.0 and emits (Cruise line, length).
    Step 2: Computes the median of the lengths for each Cruise line.
    Handles both even and odd counts for median calculation.
    """

    def steps(self):
        return [
            MRStep(mapper=self.mapper_filter_ships,
                   # No combiner here as we need all lengths per key for median calculation
                   reducer=self.reducer_compute_median_length)
        ]

    def mapper_filter_ships(self, _, line):
        """
        Mapper: Filters ships based on passenger density.
        Emits (Cruise_line, length) for ships with passenger density > 35.0.
        """
        if line.startswith("Cruise_line"): # Skip header
            return

        try:
            # Using csv.reader for robust parsing
            row = next(csv.reader([line]))
            cruise_line = row[0]
            passenger_density = float(row[8]) # Passenger_density is 9th column (index 8)
            length = float(row[9]) # length is 10th column (index 9)

            if passenger_density > 35.0:
                yield cruise_line, length
        except (ValueError, IndexError) as e:
            self.increment_counter('MRShipFilterMedianLength', 'Bad CSV lines', 1)
            # print(f"Skipping malformed line: {line} - Error: {e}") # Uncomment for debugging
            pass

    def reducer_compute_median_length(self, cruise_line, lengths):
        """
        Reducer: Computes the median length for each cruise line.
        Handles even and odd counts for median calculation.
        """
        # Collect all lengths and sort them to find the median
        all_lengths = sorted(list(lengths))
        n = len(all_lengths)

        median = 0.0
        if n == 0:
            median = 0.0 # No lengths found for this cruise line after filtering
        elif n % 2 == 1: # Odd number of elements
            median = all_lengths[n // 2]
        else: # Even number of elements
            mid1 = all_lengths[n // 2 - 1]
            mid2 = all_lengths[n // 2]
            median = (mid1 + mid2) / 2.0

        yield cruise_line, round(median, 2)

if __name__ == '__main__':
    MRShipFilterMedianLength.run()

Writing ship_filter_median_length_job.py


In [16]:
small_cruise_data = """Cruise_line,Cruise_ship_name,Tonnage,passengers,crew,built,Inaugural_Date,Years_in_service,Passenger_density,length,cabins
AIDA Cruises,AIDAbella,69203,2050,600,2008,2008,12,33.75,252.0,1025     # Density 33.75 <= 35.0 -> FILTERED OUT
AIDA Cruises,AIDAluna,69203,2050,600,2009,2009,11,33.75,252.0,1025      # Density 33.75 <= 35.0 -> FILTERED OUT
Carnival Cruise Line,Carnival Freedom,110000,2974,1150,2007,2007,13,37.00,290.0,1487 # Density 37.00 > 35.0 -> KEPT (length 290.0)
Carnival Cruise Line,Carnival Horizon,133500,3960,1450,2018,2018,2,33.71,323.0,1980 # Density 33.71 <= 35.0 -> FILTERED OUT
Royal Caribbean,Allure of the Seas,225282,5400,2200,2010,2010,10,41.67,362.0,2700 # Density 41.67 > 35.0 -> KEPT (length 362.0)
"""
# The small_cruise.csv file should already exist from previous questions.
# If not, run the cell that defines and writes it in section 1.
# Or explicitly create it again if you're only testing this part:
with open("small_cruise.csv", "w") as f:
    f.write(small_cruise_data)

print("Running Ship Filter & Median Length Job on small_cruise.csv (inline test output):")
!python ship_filter_median_length_job.py small_cruise.csv

Running Ship Filter & Median Length Job on small_cruise.csv (inline test output):
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/ship_filter_median_length_job.root.20250728.164503.676263
Running step 1 of 1...
job output is in /tmp/ship_filter_median_length_job.root.20250728.164503.676263/output
Streaming final output from /tmp/ship_filter_median_length_job.root.20250728.164503.676263/output...
"Royal Caribbean"	362.0
"Carnival Cruise Line"	290.0
Removing temp directory /tmp/ship_filter_median_length_job.root.20250728.164503.676263...
