# In [4]:
import threading
import csv
from collections import Counter
from queue import Queue

# File handling with error checks
def read_csv_file(input_file):
    """
    Read a CSV file and return its contents as a list of rows.

    input_file: Path of the CSV file to read.
    return: A list of rows, each row being a list of string fields.
            An empty file yields an empty list.

    If the file is missing or cannot be read, an error message is printed
    and SystemExit(1) is raised.
    """
    try:
        # newline='' hands raw line endings to the csv module, so line breaks
        # embedded in quoted fields are preserved instead of being translated.
        with open(input_file, mode='r', newline='') as file:
            return list(csv.reader(file))
    except FileNotFoundError:
        print(f"Error: The file {input_file} was not found.")
        # The builtin exit() is injected by the site module and is not
        # guaranteed to exist (e.g. `python -S`); raise SystemExit directly.
        raise SystemExit(1)
    except Exception as e:
        print(f"Unexpected error reading file {input_file}: {e}")
        raise SystemExit(1)

# Map Phase
def map_phase(input_chunk, output_queue):
    """
    Map step: emit the first field of every non-empty row in a chunk.

    input_chunk: An iterable of rows, each row being a list of fields.
    output_queue: A queue-like object; each emitted value is handed to its
                  put() method, in the order the rows appear in the chunk.

    The first column is assumed to hold the passenger ID.
    """
    # Blank rows are falsy and carry no passenger ID, so drop them up front.
    populated_rows = (record for record in input_chunk if record)
    for record in populated_rows:
        output_queue.put(record[0])

# Reduce Phase
def reduce_phase(output_queue):
    """
    Reduce step: count occurrences of each passenger ID and find the maximum.

    output_queue: A queue holding the passenger IDs emitted by the map phase.
                  It is drained by this call.
    return: A tuple (top_passengers, max_flights) where top_passengers is the
            list of passenger IDs sharing the highest count (in first-seen
            order) and max_flights is that count. An empty queue yields
            ([], 0).
    """
    passenger_counts = Counter()
    while not output_queue.empty():
        passenger_id = output_queue.get()
        passenger_counts[passenger_id] += 1

    # default=0 keeps an empty queue from raising ValueError in max().
    max_flights = max(passenger_counts.values(), default=0)
    top_passengers = [
        pid for pid, flights in passenger_counts.items() if flights == max_flights
    ]

    return top_passengers, max_flights

# Main function
def main(input_file='AComp_Passenger_data_no_error.csv', chunk_size=100):
    """
    Run the map/reduce job and print the passenger(s) with the most flights.

    input_file: Path of the CSV file to process; its first row is treated as
                a header and skipped.
    chunk_size: Number of rows handed to each map thread; must be at least 1.

    Raises ValueError if chunk_size is smaller than 1.
    """
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

    data = read_csv_file(input_file)[1:]  # Skip header row
    output_queue = Queue()  # Collects the passenger IDs emitted by map threads

    # Split data into chunks, one map thread per chunk
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    threads = []

    for chunk in chunks:
        thread = threading.Thread(target=map_phase, args=(chunk, output_queue))
        threads.append(thread)
        thread.start()

    # The reduce phase needs the complete map output, so wait for every thread
    for thread in threads:
        thread.join()

    # A header-only file or a file of blank rows emits nothing; report that
    # instead of asking the reduce phase for the maximum of no counts.
    if output_queue.empty():
        print(f"No passenger records found in {input_file}.")
        return

    top_passengers, max_flights = reduce_phase(output_queue)
    print(f"Passenger(s) with the highest number of flights: {top_passengers} (Total flights: {max_flights})")

# Run the job only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()

# Output: Passenger(s) with the highest number of flights: ['UES9151GS5'] (Total flights: 24)
