## Title     : Extended MapReduce with Business Insights
## Module    : Big Data and Cloud Computing Coursework - Task A
## Name and Number : Greeshmi Appalapuram, 32822955

In [1]:
# Importing libraries
import csv
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import os

In [2]:
# Function: map_function
# Returns a (passenger_id, 1) key/value pair for one CSV row
def map_function(line):
    """Map phase: emit a ``(passenger_id, 1)`` pair for a single CSV row.

    Args:
        line: One parsed CSV row (a sequence of fields); the passenger ID is
            taken from its first field.

    Returns:
        A 2-tuple of the row's first field and the count ``1``, ready to be
        summed per key by the reduce phase.
    """
    single_flight = 1
    key = line[0]
    return key, single_flight

In [3]:
# Function: reduce_function
# Returns flight counts per PassengerID

def reduce_function(mapped_data):
    """Reduce phase: sum the mapped counts for each passenger ID.

    Args:
        mapped_data: Iterable of ``(passenger_id, count)`` pairs, as emitted
            by the map phase.

    Returns:
        A ``defaultdict(int)`` mapping each passenger ID to the total of its
        counts. Keys appear in order of first occurrence in ``mapped_data``.
    """
    totals = defaultdict(int)
    for pair in mapped_data:
        key, value = pair
        totals[key] = totals[key] + value
    return totals


In [4]:
# Function: classify_loyalty_tier
# Classifies passengers into loyalty tiers based on flight counts

def classify_loyalty_tier(flight_count):
    """Return the loyalty tier name for a passenger's total flight count.

    Tiers (inclusive lower bounds): Platinum >= 20, Gold >= 15,
    Silver >= 10, anything lower is Bronze.

    Args:
        flight_count: Total number of flights taken by one passenger.

    Returns:
        One of ``"Platinum"``, ``"Gold"``, ``"Silver"`` or ``"Bronze"``.
    """
    # Checked from the highest floor down, so the first match is the best tier.
    tier_floors = (("Platinum", 20), ("Gold", 15), ("Silver", 10))
    for tier_name, minimum in tier_floors:
        if flight_count >= minimum:
            return tier_name
    return "Bronze"

In [5]:
# Function: export_to_csv
# Saves passenger insights to a CSV file for BI or ML usage

def export_to_csv(flight_counts, output_filename):
    """Write one row per passenger (ID, total flights, loyalty tier) to a CSV file.

    Rows are ordered by flight count, highest first. Passengers with equal
    counts keep the iteration order of ``flight_counts`` (``sorted`` is stable).
    Any existing file at ``output_filename`` is overwritten.

    Args:
        flight_counts: Mapping of passenger ID -> total number of flights.
        output_filename: Destination path of the CSV file.
    """
    # newline='' is required by the csv module so its own line terminators are
    # not translated again. The explicit encoding keeps the file's bytes the
    # same on every machine instead of depending on the platform locale.
    with open(output_filename, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(['PassengerID', 'TotalFlights', 'LoyaltyTier'])
        ranked = sorted(flight_counts.items(), key=lambda item: item[1], reverse=True)
        writer.writerows(
            [passenger_id, count, classify_loyalty_tier(count)]
            for passenger_id, count in ranked
        )
    print(f"\n Exported insights to {output_filename}")


In [6]:
# Function: generate_insights
# Prints business insights and exports results

def generate_insights(flight_counts, dataset_name, top_n=5):
    """Print frequent-flyer and loyalty-tier insights, then export them to CSV.

    Prints, in order:
    - the ``top_n`` passengers with the most flights;
    - per-tier passenger counts, their share of all passengers and average
      flights;
    - the share of all flights taken by those top passengers;
    - overall totals.

    The per-passenger results are then written to
    ``<dataset_name>_PassengerInsights.csv``.

    Args:
        flight_counts: Mapping of passenger ID -> total number of flights.
        dataset_name: Label used in the printed heading and as the prefix of
            the exported CSV file name.
        top_n: Number of most frequent flyers to list and to use for the
            cumulative-contribution figure. Defaults to 5.

    Raises:
        ValueError: If ``top_n`` is negative (a negative slice bound would
            silently select the wrong passengers).
    """
    if top_n < 0:
        raise ValueError(f"top_n must be non-negative, got {top_n}")

    print(f"\n--- Enhanced Insights for {dataset_name} ---")
    # Highest flight counts first; ties keep the mapping's iteration order.
    sorted_counts = sorted(flight_counts.items(), key=lambda x: x[1], reverse=True)

    total_passengers = len(sorted_counts)
    total_flights = sum(count for _, count in sorted_counts)

    # Top-N frequent flyers
    print(f"\nTop {top_n} Frequent Flyers:")
    top_flyers = sorted_counts[:top_n]
    for rank, (pid, count) in enumerate(top_flyers, start=1):
        loyalty_tier = classify_loyalty_tier(count)
        print(f"{rank}. Passenger ID: {pid}, Total Flights: {count}, Loyalty Tier: {loyalty_tier}")
    top_flight_sum = sum(count for _, count in top_flyers)

    # Loyalty tier summary with counts and percentages. Tiers are listed in
    # order of first appearance in the descending list above.
    tier_summary = defaultdict(list)
    for _, count in sorted_counts:
        tier_summary[classify_loyalty_tier(count)].append(count)

    print("\nLoyalty Tier Distribution with Percentages:")
    for tier, counts in tier_summary.items():
        # Every tier present has at least one passenger, so neither divisor is 0.
        num_passengers = len(counts)
        avg_flights = sum(counts) / num_passengers
        percentage = (num_passengers / total_passengers) * 100
        print(f"{tier}: {num_passengers} passenger(s), {percentage:.2f}% of all passengers, Avg Flights: {avg_flights:.2f}")

    # Cumulative contribution (Pareto analysis). Guard the division: an empty
    # dataset has no flights, which previously raised ZeroDivisionError and
    # skipped the CSV export below.
    if total_flights:
        top_contribution_percent = (top_flight_sum / total_flights) * 100
        print(f"\nCumulative Contribution of Top {top_n} Flyers: {top_contribution_percent:.2f}% of all flights")
    else:
        print(f"\nCumulative Contribution of Top {top_n} Flyers: n/a (no flights processed)")

    print(f"\nTotal Unique Passengers: {total_passengers}")
    print(f"Total Flights Processed: {total_flights}")

    # Export to CSV for downstream ML or BI usage
    export_to_csv(flight_counts, f"{dataset_name}_PassengerInsights.csv")


In [7]:
# Function: execute_mapreduce_with_insights
# Runs MapReduce and generates insights
def execute_mapreduce_with_insights(input_file):
    """Run the map and reduce phases over a passenger CSV and report insights.

    The first row of ``input_file`` is treated as a header and skipped.
    Every remaining non-empty row is mapped with ``map_function`` through a
    thread pool. The resulting pairs are aggregated with ``reduce_function``,
    and the counts are passed to ``generate_insights`` under the file's base
    name without its extension.

    Errors are printed rather than raised, so a notebook run keeps going.

    Args:
        input_file: Path to the passenger CSV file.

    Returns:
        True if the file was processed and insights were generated, False if
        an error was reported instead.
    """
    try:
        dataset_name = os.path.splitext(os.path.basename(input_file))[0]

        # newline='' is the file mode the csv module documents for its readers.
        with open(input_file, 'r', newline='') as file:
            reader = csv.reader(file)
            # Skip the header. The default avoids a bare StopIteration (with an
            # empty error message) when the file is completely empty.
            next(reader, None)
            # csv.reader yields [] for blank lines. Such rows have no passenger
            # ID and would make map_function raise IndexError.
            rows = (row for row in reader if row)

            # NOTE: Executor.map submits every row up front, so the file is
            # fully read while it is still open.
            with ThreadPoolExecutor() as executor:
                mapped_data = list(executor.map(map_function, rows))

        flight_counts = reduce_function(mapped_data)
        generate_insights(flight_counts, dataset_name)

    except FileNotFoundError:
        print(f"Error: The file '{input_file}' was not found.")
        return False
    except Exception as e:
        # Top-level boundary for the notebook run: report and carry on.
        print(f"An error occurred while processing '{input_file}': {e}")
        return False
    return True


In [8]:
# Main Execution
if __name__ == "__main__":
    # Datasets available for this task. The first entry is the mandatory
    # dataset and is the one processed. The other entries are optional
    # validation files; change the index below to process one of them instead.
    dataset_paths = [
        r"C:\Users\agree\AComp_Passenger_data_no_error.csv",  # Mandatory dataset
        r"C:\Users\agree\AComp_Passenger_data.csv",
        r"C:\Users\agree\AComp_Passenger_data_no_error_DateTime.csv",
    ]
    input_file = dataset_paths[0]

    banner = "\n------ Passenger Loyalty Profiling and Insights -------\n"
    print(banner)
    print(f" Processing file: {input_file}")

    execute_mapreduce_with_insights(input_file)

    print("----Executed Successfully----\n")


------ Passenger Loyalty Profiling and Insights -------

 Processing file: C:\Users\agree\AComp_Passenger_data_no_error.csv

--- Enhanced Insights for AComp_Passenger_data_no_error ---

Top 5 Frequent Flyers:
1. Passenger ID: UES9151GS5, Total Flights: 24, Loyalty Tier: Platinum
2. Passenger ID: PUD8209OG3, Total Flights: 23, Loyalty Tier: Platinum
3. Passenger ID: BWI0520BG6, Total Flights: 23, Loyalty Tier: Platinum
4. Passenger ID: DAZ3029XA0, Total Flights: 23, Loyalty Tier: Platinum
5. Passenger ID: SPR4484HA6, Total Flights: 23, Loyalty Tier: Platinum

Loyalty Tier Distribution with Percentages:
Platinum: 8 passenger(s), 25.81% of all passengers, Avg Flights: 22.38
Gold: 11 passenger(s), 35.48% of all passengers, Avg Flights: 17.36
Silver: 10 passenger(s), 32.26% of all passengers, Avg Flights: 12.00
Bronze: 2 passenger(s), 6.45% of all passengers, Avg Flights: 4.50

Cumulative Contribution of Top 5 Flyers: 23.25% of all flights

Total Unique Passengers: 31
Total Flights Processed: 499

## References