<h6><center>Big Data Algorithms Techniques & Platforms</center></h6>
<h1>
<hr style=" border:none; height:3px;">
<center>Assignment 1: Introduction to MapReduce</center>
<hr style=" border:none; height:3px;">
</h1>

## Flights and Airports Data

In this assignment, we analyze a dataset of flight data. The dataset comes from <a href="https://www.kaggle.com/flashgordon/usa-airport-dataset">Kaggle</a> and is stored in a <code>.csv</code> file. Each line of the file is a record of flights between an origin and a destination airport in a given month. Each record contains the following fields:


<code>Origin_airport</code>: Three-letter airport code of the origin airport<br>
<code>Destination_airport</code>: Three-letter airport code of the destination airport<br>
<code>Origin_city</code>: Origin city name<br>
<code>Destination_city</code>: Destination city name<br>
<code>Passengers</code>: Number of passengers transported from origin to destination<br>
<code>Seats</code>: Number of seats available on flights from origin to destination<br>
<code>Flights</code>: Number of flights between origin and destination (multiple records for one month, many with flights > 1)<br>
<code>Distance</code>: Distance (to the nearest mile) flown between origin and destination<br>
<code>Fly_date</code>: Date (yyyymm) of the flight<br>
<code>Origin_population</code>: Origin city's population as reported by the US Census<br>
<code>Destination_population</code>: Destination city's population as reported by the US Census<br>
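
As a minimal sketch of how one record with these fields could be parsed, the snippet below runs Python's standard `csv.DictReader` on an in-memory sample. The sample is an assumption: the airport codes, city names, and numbers are invented, only some of the columns are included, and the fields are assumed to be quoted. The point is that a quoted city name containing a comma stays in one field.

```python
import csv
import io

# Hypothetical sample (header + one record); all values are invented.
sample = io.StringIO(
    '"Origin_airport","Destination_airport","Origin_city",'
    '"Destination_city","Passengers","Seats","Flights"\n'
    '"AAA","BBB","City A, XX","City B, YY",21,30,1\n'
)

reader = csv.DictReader(sample)
record = next(reader)

# The comma inside the quoted city name does not split the field.
origin_city = record["Origin_city"]

# All values arrive as strings, so numeric fields need an explicit cast.
occupancy = int(record["Passengers"]) / int(record["Seats"])
print(origin_city, occupancy)
```

The helper functions in this notebook parse lines by hand instead; the sketch only illustrates the record layout.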

## Assumption

In this assignment, I will assume that the data is partitioned into 100,000-line chunks. That is, all `map` functions defined below will be applied to 100,000 lines of data at a time.
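
To make the partitioning assumption concrete, here is a small sketch that splits any sequence of lines into fixed-size chunks in memory. The helper name `iter_chunks` is hypothetical and not part of the assignment code, and the toy input uses a chunk size of 3 rather than 100,000.

```python
from itertools import islice


def iter_chunks(lines, chunk_size):
    """Yield successive lists of at most `chunk_size` items from `lines`."""
    iterator = iter(lines)
    while True:
        chunk = list(islice(iterator, chunk_size))
        if not chunk:
            return
        yield chunk


# Toy input: 7 "lines" split into chunks of 3.
toy_lines = [f"row{i}" for i in range(7)]
chunks = list(iter_chunks(toy_lines, 3))
print([len(chunk) for chunk in chunks])
```

Each chunk stands for the input of one `map` task; the last chunk may be shorter than the others.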

In [15]:
from collections import defaultdict
from math import ceil
from re import compile
from os import listdir

FILENAME = "Airports2.csv"
ROWS_PER_PARTITION = 100000

In [3]:
with open(FILENAME, "r") as read_file:
    header = read_file.readline()
    line = read_file.readline()
    row_count = 0
    while line:
        row_count += 1
        line = read_file.readline()

    print(f"The total rows are {row_count}.")
    print(f"The number of partitions is {ceil(row_count / ROWS_PER_PARTITION)}.")

The total rows are 3606803.
The number of partitions is 37.


In [4]:
with open(FILENAME, "r") as read_file:  
    header = read_file.readline()
    file_num = 0
    file_name = f"data_part{file_num}.csv"
    partition = open(file_name, "w")
    partition.write(header)
    line = read_file.readline()
    rows_written = 0
    while line:
        partition.write(line)
        rows_written += 1
        line = read_file.readline()
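        # Start a new partition only if there are still lines left to write,
        # so that no header-only file is created at the end of the input.
        if line and rows_written % ROWS_PER_PARTITION == 0: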
            partition.close()
            file_num = rows_written // ROWS_PER_PARTITION
            file_name = f"data_part{file_num}.csv"
            partition = open(file_name, "w")
            partition.write(header)

    partition.close()

In [28]:
filenames_pattern = compile(r"^data_part.*\.csv$") 
filenames = list(filter(filenames_pattern.match, listdir('./')))
print(f"The number of created files is {len(filenames)}.")
print("Sample names:", filenames[:5])

The number of created files is 37.
Sample names: ['data_part11.csv', 'data_part10.csv', 'data_part12.csv', 'data_part13.csv', 'data_part17.csv']


### Helper functions

In [None]:
def read_csv(filename):
    """
    Parse a CSV file with the given `filename`.

    Parameters
    ----------
    filename : str
        Full name of the CSV file containing records.

    Yields
    ------
    list of str
        Each line of the CSV file split into a list.

    """
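    with open(filename, 'r') as file:
        for line in file:
            # Strip the newline, drop the quotes, and turn the ", " inside
            # city names into ": " so that splitting on "," keeps them whole.
            line_clean = line.rstrip("\n").replace("\"", "").replace(", ", ": ")
            yield line_clean.split(",")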


contents = read_csv(FILENAME)
next(contents)

In [None]:
i = 0
for row in contents:
    if i < 5:
        print(row)
        i += 1
    else:
        break

In [None]:
def find_indices(rows_gen, column_names):
    """
    Find the indices of given columns.

    Modifies the input generator by moving it by 1 row.

    Parameters
    ----------
    rows_gen : generator
        Generator yielding the rows of the text file.
    column_names : tuple of str
        Names of the columns whose indices we need.

    Returns
    -------
    list of int
        List containing the index of each column.

    """
    header = next(rows_gen)
    indices = []
    for column in column_names:
        indices.append(header.index(column))
    return indices


def apply_map(filename, map_func, column_names):
    """
    Iterate over `filename` and apply `map_func` on each line.

    Parameters
    ----------
    filename : str
        Full name of a text file containing records.
    map_func : func
        Map function returning a tuple for each line.
    column_names : tuple of str
        Names of the columns that `map_func` needs.

    Returns
    -------
    list of tuples
        Contains tuples returned by each call of `map_func`. If the
        map result contains None, it is not included in the list.
    """
    lines = read_csv(filename)
    column_indices = find_indices(lines, column_names)
    map_result_list = []
    for line in lines:
        map_result = map_func(line, column_indices)
        if None not in map_result:
            map_result_list.append(map_result)

    return map_result_list


def apply_reduce(tuples_dict, reduce_func):
    """
    Iterate over `tuples_dict` and apply `reduce_func` on each key.

    Parameters
    ----------
    tuples_dict : dict of tuples
        Dictionary of key-value pairs produced by the corresponding
        shuffle function.
    reduce_func : func
        Reduce function returning a tuple for each key in `tuples_dict`.

    Returns
    -------
    list of tuples
        Contains tuples returned by each call of `reduce_func`.

    """
    reduce_list = []
    for key in tuples_dict:
        reduce_list.append(reduce_func(tuples_dict, key))

    return reduce_list

### <strong> Exercise 1 - Almost-empty flights</strong> 
#### <strong> 4 points </strong>
Describe and define a MapReduce procedure that gives the number of flights that departed with at most 10% capacity (i.e. empty, 1%, 6%, etc.). 

The output can be of any form you like. You can use any data structure you want to support your implementation.
#### <strong> Answer </strong>

In [None]:
def map_empty(line, column_indices):
    """
    Create a tuple for flights having at most 10% occupancy rate.

    Parameters
    ----------
    line : list of str
        Each line of a text file split into a list.
    column_indices : list of int
        List containing the indices of Passengers, Seats, and
        Flights columns.

    Returns
    -------
    tuple of (float, int) or (None, None)
        Each tuple is (occupancy rate, flight count) if occupancy
        rate is at most 10%. Otherwise, it is (None, None).

    """
    passenger_idx, seats_idx, flights_idx = column_indices
    passengers = int(line[passenger_idx])
    seats = int(line[seats_idx])
    try:
        occupancy_rate = passengers / seats
        if occupancy_rate <= 0.1:
            flights = int(line[flights_idx])
            return round(occupancy_rate, 2), flights
        else:
            return None, None
    except ZeroDivisionError:
        return None, None


columns1 = ("Passengers", "Seats", "Flights")
map_empty_result = apply_map(FILENAME, map_empty, columns1)
map_empty_result[:7]

In [None]:
def shuffle_flights(tuples_list):
    """
    Take the key-value pairs created by a map function and
    create a list of all values corresponding to the same key.

    Parameters
    ----------
    tuples_list : list of (hashable type, any type)
        List of tuples, where each tuple is a (key, value) pair.

    Returns
    -------
    defaultdict of {hashable type : list of any type}
        Keys are the individual values of the grouping variable,
        and values are the lists corresponding to same key.

    """
    pairs_dict = defaultdict(list)
    for key, value in tuples_list:
        pairs_dict[key].append(value)

    return pairs_dict


shuffle_empty_result = shuffle_flights(map_empty_result)
shuffle_empty_result[0.09][:10]

In [None]:
def reduce_flights(pairs_dict, key):
    """
    Sum all values corresponding to the given key.

    Parameters
    ----------
    pairs_dict : dict of {hashable type : list of int}
        Keys are the individual values of the grouping variable,
        and values are the lists corresponding to same key.
    key : hashable type
        Individual value of the grouping variable for which we
        need a sum.

    Returns
    -------
    tuple of (hashable type, int)
        tuple of the form (key, sum of all values).

    """
    values_list = pairs_dict[key]
    values_sum = sum(values_list)
    return key, values_sum


reduce_empty_partial = apply_reduce(shuffle_empty_result, reduce_flights)
reduce_empty_partial

In [None]:
reduce_empty_full = sum(x[1] for x in reduce_empty_partial)
print(f"The number of almost empty flights is {reduce_empty_full}.")

#### <strong> Combine function </strong>
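A `combine` step is applicable here, although the implementation above does not use one. Each input line yields at most one (occupancy rate, flight count) pair, but under the partitioning assumption a single `map` task processes 100,000 lines and therefore emits many pairs that share a key. Because the reduction is a sum, which is associative and commutative, each machine could add up the flight counts per key locally before the shuffle. This would reduce the amount of data sent between machines without changing the result.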

### <strong> Exercise 2 - Top five destination airports </strong>
### <strong> 4 points </strong>

Provide now a function that lists the top five destination <strong>airports</strong>: the ones that have the highest number of incoming flights. Implement an algorithm that uses the MapReduce procedure.

#### <strong> Answer </strong>

In [None]:
def map_flights(line, column_indices):
    """
    Create a tuple of (group variable, flight count).

    Parameters
    ----------
    line : list of str
        Each line of a text file split into a list.
    column_indices : list of int
        List containing the indices of the grouping variable
        and the Flights columns.

    Returns
    -------
    tuple of (hashable type, int)
        Hashable type is the type of the grouping variable.

    """
    group_var_idx, flights_idx = column_indices
    group_var = line[group_var_idx]
    flight_count = int(line[flights_idx])
    return group_var, flight_count


columns2 = ("Destination_airport", "Flights")
map_top_airports = apply_map(FILENAME, map_flights, columns2)
map_top_airports[:10]

For the `shuffle` and `reduce` operations, we can take advantage of functions written for Exercise 1.
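
The reuse described above can be sketched on a handful of made-up pairs. The airport codes and counts below are invented, and the grouping and summing are written inline rather than through the notebook's helper functions. As one possible variation, the sketch selects the largest totals with `heapq.nlargest` instead of sorting the whole list.

```python
from collections import defaultdict
from heapq import nlargest

# Invented (airport code, flight count) pairs, as a map step might emit them.
pairs = [("AAA", 3), ("BBB", 1), ("AAA", 2), ("CCC", 7), ("BBB", 3)]

# Shuffle: group the values by key.
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

# Reduce: sum the flight counts of each key.
totals = [(key, sum(values)) for key, values in grouped.items()]

# Select the two largest totals without sorting the whole list.
top_two = nlargest(2, totals, key=lambda pair: pair[1])
print(top_two)
```

For a top-5 query over a few hundred airports, either a full sort or `nlargest` is fine; the difference only matters for very long lists.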

In [None]:
shuffle_top_airports = shuffle_flights(map_top_airports)
shuffle_top_airports["END"]

In [None]:
reduce_top_airports = apply_reduce(shuffle_top_airports, reduce_flights)
reduce_top_airports.sort(key=lambda x: x[1], reverse=True)
print("The top-5 airports are", reduce_top_airports[:5])

### <strong> Exercise 3 - Top 5 destination cities </strong>
#### <strong>  2 points </strong>

Try to reuse the code you wrote before and define a function that lists the top five destination <strong>cities</strong>: the ones that have the highest number of incoming flights. Implement an algorithm that uses the MapReduce procedure.


#### <strong> Answer </strong>

Since this question is almost identical to the previous one, the only change needed is to use the `Destination_city` column instead of the `Destination_airport` column. We do not need to write new functions.

In [None]:
columns3 = ("Destination_city", "Flights")
map_top_cities = apply_map(FILENAME, map_flights, columns3)
map_top_cities[:10]

In [None]:
shuffle_top_cities = shuffle_flights(map_top_cities)
shuffle_top_cities["Ames: IA"]

In [None]:
reduce_top_cities = apply_reduce(shuffle_top_cities, reduce_flights)
reduce_top_cities.sort(key=lambda x: x[1], reverse=True)
print("The top-5 cities are", reduce_top_cities[:5])

## Statistics on flights

<p align="justify">
<font size="3">
Now we want to run some more complex analysis on the flights. 
</font>
</p>





### <strong> Exercise 4 - Top five connections by month</strong>
#### <strong> 4 points </strong>

Try to reuse the code you wrote before and define a function that lists the top five connections for each month: the five pairs of cities that have the highest number of flights. The function should count the flights from A to B and from B to A together, by month/year. Implement an algorithm that uses the MapReduce procedure.

#### <strong> Answer </strong>

For this question, we will use a two-step MapReduce procedure. After the first step, we will have the total number of flights between two cities in each month/year. After the second step, we will have the five busiest connections (pairs of cities) in each month/year.

Here, we re-use the `shuffle` function twice and the `reduce` function once. (Both were written for Exercise 1.)
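
The two steps can be sketched on a few invented records. This is a simplified illustration under stated assumptions: the city names, counts, and months are made up, the shuffle and reduce of the first step are fused into one running sum, and `k = 2` is used instead of 5 to keep the toy output short.

```python
from collections import defaultdict

# Invented records: (origin city, destination city, flights, month).
records = [
    ("A", "B", 2, "2000-01"),
    ("B", "A", 3, "2000-01"),
    ("A", "C", 4, "2000-01"),
    ("C", "B", 1, "2000-01"),
    ("A", "B", 6, "2000-02"),
]

# Step 1: total flights per (month, unordered city pair).
step1 = defaultdict(int)
for origin, destination, flights, month in records:
    city1, city2 = sorted((origin, destination))  # A->B and B->A share a key
    step1[(month, city1, city2)] += flights

# Step 2: regroup by month and keep the k busiest connections.
k = 2
by_month = defaultdict(list)
for (month, city1, city2), total in step1.items():
    by_month[month].append((city1, city2, total))

top_k = {
    month: sorted(pairs, key=lambda item: item[2], reverse=True)[:k]
    for month, pairs in by_month.items()
}
print(top_k)
```

Sorting the two city names is what makes the key direction-independent, so flights in both directions end up in the same total.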

In [None]:
def map_connections1(line, column_indices):
    """
    Create a tuple for flights between two cities by month/year.

    Parameters
    ----------
    line : list of str
        Each line of a text file split into a list.
    column_indices : list of int
        List containing the indices of Origin_city, Destination_city,
        Flights, and Fly_date columns.

    Returns
    -------
    tuple of ((str, str, str), int)
        Each tuple is ((date, city1, city2), flight count).

    """
    source_idx, destination_idx, flights_idx, date_idx = column_indices
    source = line[source_idx]
    destination = line[destination_idx]
    flights = int(line[flights_idx])
    # Keep only the yyyy-mm part by dropping the last three characters.
    date = line[date_idx][:-3]
    if source > destination:
        destination, source = source, destination  # ordering pairs alphabetically

    joint_key = (date, source, destination)
    return joint_key, flights


columns4 = ("Origin_city", "Destination_city", "Flights", "Fly_date")
map_connections_result1 = apply_map(FILENAME, map_connections1, columns4)
map_connections_result1[:10]

In [None]:
shuffle_connections_result1 = shuffle_flights(map_connections_result1)
shuffle_connections_result1[("1990-02", "Bend: OR", "Seattle: WA")]

In [None]:
reduce_connections_result1 = apply_reduce(shuffle_connections_result1, reduce_flights)
reduce_connections_result1[:10]

In [None]:
def map_connections2(key, flights):
    """
    Regroup flights between two cities by month/year.

    Parameters
    ----------
    key : tuple of (str, str, str)
        A tuple of the form (date, city1, city2)
    flights : int
        Count of flights between the two cities by month/year.

    Returns
    -------
    tuple of (str, (str, str, int))
        Each tuple is of the form (date, (city1, city2, count)).

    """
    date, source, destination = key
    new_value = (source, destination, flights)
    return date, new_value


map_connections_result2 = []
for key, flights in reduce_connections_result1:
    map_result = map_connections2(key, flights)
    map_connections_result2.append(map_result)

map_connections_result2[:10]

In [None]:
shuffle_connections_result2 = shuffle_flights(map_connections_result2)
shuffle_connections_result2['1999-01'][:10]

In [None]:
def reduce_connections2(flights_dict, date):
    """
    Find the top-5 city pairs with the most flights for the given date.

    Parameters
    ----------
    flights_dict : dict of {str : list of (str, str, int)}
        Keys are dates, and values are lists of tuples of the form
        (city1, city2, count).
    date : str
        Date of the form yyyy-mm for which we need the top-5 city pairs.

    Returns
    -------
    tuple of (str, list of (str, str, int))
        A tuple of the form (date, list of (city1, city2, count)).

    """
    # Sort a copy so that the lists in `flights_dict` are left unchanged.
    flight_info = sorted(flights_dict[date], key=lambda x: x[2], reverse=True)
    return date, flight_info[:5]


reduce_connections_result2 = apply_reduce(
    shuffle_connections_result2, reduce_connections2)
reduce_connections_result2.sort()
reduce_connections_result2[:5]

#### <strong> Combine function </strong>

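A `combine` step is applicable to both MapReduce rounds, although the implementation above does not use one. Each call of a `map` function produces exactly one key-value pair, but a machine that processes a whole partition emits many pairs that share a key. In the first round, the flight counts of each (date, city1, city2) key can be summed locally, because addition is associative and commutative. In the second round, each machine can keep only its local top five per date, because the global top five is always contained in the union of the local top fives.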

### <strong> Exercise 5 - Number of full flights</strong>
#### <strong> 2 points </strong>
<p align="justify">
Describe and implement an algorithm that, following the MapReduce procedure, shows how many full flights have departed. This exercise gives you an idea of how often you can re-use code in MapReduce, with minimal effort, for repetitive analysis.
</p>



#### <strong> Answer </strong>

To write a function that we can use for both Exercise 5 and Exercise 6, I will modify the `map` function of Exercise 1 slightly. For the `shuffle` and `reduce` operations, we can take advantage of functions written for Exercise 1.
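
The idea of the dummy key can be sketched on a few invented rows. The numbers below are made up, the grouping and summing are fused into one running total, and fullness is tested with `passengers == seats`, which for integer counts is equivalent to an occupancy rate of exactly 1.

```python
from collections import defaultdict

# Invented rows: (passengers, seats, flights).
rows = [(100, 100, 2), (50, 100, 1), (0, 0, 3), (30, 30, 1), (10, 40, 4)]

counts = defaultdict(int)
for passengers, seats, flights in rows:
    if seats == 0:
        continue  # occupancy is undefined without seats, so skip the row
    is_full = 1 if passengers == seats else 0  # dummy key
    counts[is_full] += flights

full = counts[1]
total = counts[0] + counts[1]
print(full, total, full / total)
```

Keeping the non-full flights under key 0, rather than discarding them, is what makes the same output usable for a share or percentage later.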

In [None]:
def map_full(line, column_indices):
    """
    Create a tuple for flights depending on their occupancy rate.

    Parameters
    ----------
    line : list of str
        Each line of a text file split into a list.
    column_indices : list of int
        List containing the indices of Passengers, Seats, and
        Flights columns.

    Returns
    -------
    tuple of (int, int) or (None, None)
        Each tuple is (dummy key, flight count). The dummy key
        is 1 if the occupancy rate is 100% and 0 otherwise. The
        return value is (None, None) if the seats count is zero.

    """
    passenger_idx, seats_idx, flights_idx = column_indices
    passengers = int(line[passenger_idx])
    seats = int(line[seats_idx])
    try:
        occupancy_rate = passengers / seats
        flight_count = int(line[flights_idx])
        if occupancy_rate == 1:
            return 1, flight_count
        else:
            return 0, flight_count
    except ZeroDivisionError:
        return None, None


map_full_result = apply_map(FILENAME, map_full, columns1)
map_full_result[:7]

In [None]:
shuffle_full_result = shuffle_flights(map_full_result)
shuffle_full_result[1][:10]

In [None]:
reduce_full = apply_reduce(shuffle_full_result, reduce_flights)
reduce_full.sort()
reduce_full

In [None]:
number_full = reduce_full[1][1]
print(f"The number of full flights is {number_full}.")

In [None]:
share_full = reduce_full[1][1] / (reduce_full[0][1]+reduce_full[1][1])
print(f"The share of full flights is {share_full*100:.2f}%.")


### <strong> Exercise 6 -  Percentage of full flights </strong>
#### <strong> 4 points </strong>

<p align="justify">
Describe and implement a MapReduce procedure that gives, for each city, the percentage of full flights that have departed.

Notice that this exercise shares some similarities with one of the previous exercises. Think about whether and how you can modify (generalize) one of the functions already implemented.
</p>



#### <strong> Answer </strong>
Inside the `map` function of this exercise, I will delegate most of the computation to the `map` function of Exercise 5. Once again, the `shuffle` function of Exercise 1 will be re-used.
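
Before the actual implementation, here is a toy sketch of why a flight-weighted average of the 0/1 flag gives the share of full flights per city. The city names and counts are invented, and the per-city sums are accumulated directly instead of going through separate shuffle and reduce functions.

```python
from collections import defaultdict

# Invented (city, (is_full flag, flight count)) pairs.
pairs = [
    ("City A", (1, 2)),
    ("City A", (0, 6)),
    ("City B", (1, 5)),
    ("City A", (1, 2)),
]

# Accumulate [full flights, all flights] per city.
sums = defaultdict(lambda: [0, 0])
for city, (is_full, flights) in pairs:
    sums[city][0] += is_full * flights
    sums[city][1] += flights

# Weighted mean of the flag, expressed as a percentage.
percent_full = {
    city: 100 * full / total for city, (full, total) in sums.items() if total
}
print(percent_full)
```

The `if total` filter skips cities whose flight counts sum to zero, a case the sketch handles by omission rather than by raising an error.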

In [None]:
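def map_proportion(line, column_indices):
    """
    Create a tuple of (city, (full-flight flag, flight count)).

    Parameters
    ----------
    line : list of str
        Each line of a text file split into a list.
    column_indices : list of int
        List containing the indices of Origin_city, Passengers,
        Seats, and Flights columns.

    Returns
    -------
    tuple of (str, (int, int)) or (None, (None, None))
        Each tuple is (city, (dummy key, flight count)), where the
        inner tuple is the result of `map_full`. The return value
        is (None, (None, None)) if the seats count is zero.

    """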
    city_idx = column_indices[0]
    city = line[city_idx]
    new_column_indices = column_indices[1:]
    is_full_tuple = map_full(line, new_column_indices)
    if is_full_tuple != (None, None):
        return city, is_full_tuple
    else:
        return None, is_full_tuple


columns6 = ("Origin_city", "Passengers", "Seats", "Flights")
map_proportion_result = apply_map(FILENAME, map_proportion, columns6)
map_proportion_result[:7]

In [None]:
shuffle_proportion_result = shuffle_flights(map_proportion_result)
shuffle_proportion_result["Los Angeles: CA"][:10]

In [None]:
def reduce_proportion(counts_dict, key):
    """
    Compute the weighted average of all values for the given key.

    Parameters
    ----------
    counts_dict : dict of {hashable type : list of tuple}
        Keys are the individual values of the grouping variable,
        and values are the lists of tuples corresponding to same key.
        The first element of each tuple is the value to be averaged,
        and the second element is the corresponding weight.
    key : hashable type
        Individual value of the grouping variable for which we
        need a weighted average.

    Returns
    -------
    tuple of (hashable type, float)
        tuple of the form (key, weighted average of all values).

    """
    value_weight_list = counts_dict[key]
    sum_of_weights = 0
    weighted_sum = 0
    for value, weight in value_weight_list:
        weighted_sum += value * weight
        sum_of_weights += weight

    weighted_mean = weighted_sum / sum_of_weights
    return key, weighted_mean


reduce_proportion_result = apply_reduce(shuffle_proportion_result, reduce_proportion)
reduce_percent_result = [(key, share*100) for key, share in reduce_proportion_result]
reduce_percent_result.sort(key=lambda x: x[1], reverse=True)
reduce_percent_result[:7]

#### <strong> Combine function </strong>
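A `combine` step is applicable here, although the implementation above does not use one. Each input line produces at most one key-value pair, but a machine that processes a whole partition emits many pairs for the same city. Before the shuffle, each machine could merge them into at most two pairs per city, (1, local count of full flights) and (0, local count of other flights). The `reduce_proportion` function would then return the same weighted average from fewer values.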