In [1]:
#inporting required libraries
import csv
import threading
from collections import defaultdict

In [2]:
# Defining Mapper function
def mapper_func(input_records):
    map_data = []
    for record in input_records:
        passenger_id, flight_id, _, _, _, _ = record.split(',')
        map_data.append((passenger_id, flight_id))
    return map_data

In [3]:
# Defining Combiner function
def combiner_func(map_data):
    combiner_data = defaultdict(list)
    for key, value in map_data:
        combiner_data[key].append(value)
    return list(combiner_data.items())

In [4]:
# Defining Reducer Function
def reducer_func(key, values):
    return (key, len(values))

In [5]:
# Defining Sorter Function
def sorter_func(reduced_data):
    reduced_data.sort(key=lambda x: x[1], reverse=True)
    return reduced_data

In [6]:
# Read the input passanger deatils file
input_file = '/content/AComp_Passenger_data_no_error.csv'  # Update with the actual name of your input file
with open(input_file, 'r') as f:
    input_records = f.readlines()


In [7]:
# Implicity diving the entire data into 4 chunks
chunk_size = len(input_records) // 4
chunks = [input_records[i:i+chunk_size] for i in range(0, len(input_records), chunk_size)]
print('chunk_size:',chunk_size)

chunk_size: 125


In [8]:
# Defining threads for mapper
mapper_threads = []
map_data = []
for i in range(4):
    t = threading.Thread(target=lambda: map_data.extend(mapper_func(chunks[i])))
    mapper_threads.append(t)
    t.start()

In [9]:
# For loop to complete all threads of mapper
for t in mapper_threads:
    t.join()

In [10]:
# Defining threads for combiner
combiner_threads = []
combiner_data = []
for i in range(4):
    t = threading.Thread(target=lambda: combiner_data.extend(combiner_func(map_data[i::4])))
    combiner_threads.append(t)
    t.start()

In [11]:
# For loop to complete all threads of combiner
for t in combiner_threads:
    t.join()

In [12]:
# Grouping by key using shuffler
group_data = defaultdict(list)
for key, value in combiner_data:
    group_data[key].extend(value)

In [13]:
# Defining threads for reducer
reducer_threads = []
reduced_data = []
for key, values in group_data.items():
    t = threading.Thread(target=lambda: reduced_data.append(reducer_func(key, values)))
    reducer_threads.append(t)
    t.start()

In [14]:
# For loop to complete threads of reducer
for t in reducer_threads:
    t.join()


In [15]:
print('t:',t)

t: <Thread(Thread-48, stopped 140066546009856)>


In [16]:
# Sorting the data based on the passanger_id
print('reduced_data:',reduced_data)
print('reducer_threads',reducer_threads)
sorted_data = sorter_func(reduced_data)

reduced_data: [('UES9151GS5', 25), ('CYJ0225CH1', 11), ('PAJ3974RK1', 10), ('MXU9187YC7', 14), ('BWI0520BG6', 23), ('PIT2755XC1', 8), ('EZC9678QI6', 21), ('CKZ3132BR4', 19), ('WBE6935NU3', 19), ('WYU2010YH8', 19), ('CDC0302NN5', 12), ('SJD8775RZ4', 16), ('PUD8209OG3', 23), ('HCA3158QA6', 21), ('JJM4724RF7', 21), ('SPR4484HA6', 23), ('LLZ3798PE3', 16), ('WTC9125IE5', 14), ('HGO4350KK1', 18), ('EDV2089LK5', 13), ('XFG5747ZT9', 13), ('IEG9308EA5', 10), ('DAZ3029XA0', 23), ('JBE2302VO4', 16), ('POP2875LH3', 19), ('YMH6360YP0', 16), ('CXN7304ER2', 17), ('VZY2993ME1', 16), ('ONL0812DH1', 12), ('KKP5277HZ7', 11), ('UMH6360YP0', 1)]
reducer_threads [<Thread(Thread-18, stopped 140066661029632)>, <Thread(Thread-19, stopped 140066661029632)>, <Thread(Thread-20, stopped 140066661029632)>, <Thread(Thread-21, stopped 140066661029632)>, <Thread(Thread-22, stopped 140066661029632)>, <Thread(Thread-23, stopped 140066661029632)>, <Thread(Thread-24, stopped 140066546009856)>, <Thread(Thread-25, stopped 1

In [17]:
# Print the passangers_id that took more number of flights
print("Top 20 Passengers_id's with Highest Number of trips:")
for passenger, flights in sorted_data[:20]:  
    print(f"Passenger ID: {passenger}, Flights Count: {flights}")

Top Passengers with Highest Number of Flights:
Passenger ID: UES9151GS5, Flights Count: 25
Passenger ID: BWI0520BG6, Flights Count: 23
Passenger ID: PUD8209OG3, Flights Count: 23
Passenger ID: SPR4484HA6, Flights Count: 23
Passenger ID: DAZ3029XA0, Flights Count: 23
Passenger ID: EZC9678QI6, Flights Count: 21
Passenger ID: HCA3158QA6, Flights Count: 21
Passenger ID: JJM4724RF7, Flights Count: 21
Passenger ID: CKZ3132BR4, Flights Count: 19
Passenger ID: WBE6935NU3, Flights Count: 19
Passenger ID: WYU2010YH8, Flights Count: 19
Passenger ID: POP2875LH3, Flights Count: 19
Passenger ID: HGO4350KK1, Flights Count: 18
Passenger ID: CXN7304ER2, Flights Count: 17
Passenger ID: SJD8775RZ4, Flights Count: 16
Passenger ID: LLZ3798PE3, Flights Count: 16
Passenger ID: JBE2302VO4, Flights Count: 16
Passenger ID: YMH6360YP0, Flights Count: 16
Passenger ID: VZY2993ME1, Flights Count: 16
Passenger ID: MXU9187YC7, Flights Count: 14
