# Flight Data Assignment
**Dieter Esteban de Wit Torres**

### Setting up Spark

In [1]:
// Import SparkSQL version 2.4.8
import $ivy.`org.apache.spark::spark-sql:2.4.8`

[32mimport [39m[36m$ivy.$                                  [39m

In [2]:
// Imports
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.DataFrame

import java.time.LocalDate
import java.sql.Date

[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36morg.apache.spark.sql.expressions.Window
[39m
[32mimport [39m[36morg.apache.spark.sql.types.StructType
[39m
[32mimport [39m[36morg.apache.spark.sql.DataFrame

[39m
[32mimport [39m[36mjava.time.LocalDate
[39m
[32mimport [39m[36mjava.sql.Date[39m

In [3]:
// Disable server warnings to avoid extra text on cells
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m

#### To run in MyBinder
Comment the following cell if running with Spark Standalone or Yarn Cluster

In [4]:
// Local Mode, in the same JVM as the kernel
val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .getOrCreate()
}

Loading spark-stubs


SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.


Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@70935890

#### Running locally with a Spark Standalone Cluster
Comment the following cell if running it on MyBinder

In [4]:
// Connect to the Spark Standalone cluster
// val spark = {
//    NotebookSparkSession.builder()
//    .master("spark://localhost:7077")
//    .config("spark.executor.instances", "4")
//    .config("spark.executor.memory", "2g")
//    .getOrCreate()
//}

In [5]:
// Spark Imports
import spark.implicits._

[32mimport [39m[36mspark.implicits._[39m

In [6]:
// Get a Spark Context
def sc = spark.sparkContext

defined [32mfunction[39m [36msc[39m

### Reading the data

In [7]:
/**
  * Reads a CSV file into a dataset of a specified type & schema.
  *
  * @param spark   SparkSession to use for reading the file.
  * @param path    Path to the CSV file.
  * @param schema  Schema of the CSV file.
  * @param encoder Encoder used to convert CSV rows to objects of type T
  * @tparam T      Type of the objects to be returned
  * @return A dataset of the specified type.
  */
def readCsvToDataset[T](spark: SparkSession, path: String, schema: StructType, encoder: Encoder[T]): Dataset[T] = {
    val dateFormat = "yyyy-MM-dd"
    val dateColumnExists = schema.fieldNames.contains("date")
    
    val reader = spark.read
        .option("header", "true")
        .schema(schema)
        // In case of handling large files, set the csv to be read in
        // parallel, partitions depending on cores on cluster.
        .option("spark.sql.shuffle.partitions", "4")
    
    if (dateColumnExists) {
        reader.option("dateFormat", dateFormat)
    }
    
    reader.csv(path).as(encoder)
}

defined [32mfunction[39m [36mreadCsvToDataset[39m

In [8]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this);

case class Passenger(passengerId: Int, firstName: String, lastName: String)

val passengerSchema = new StructType()
  .add("passengerId", "integer")
  .add("firstName", "string")
  .add("lastName", "string")

val passengerData: Dataset[Passenger] = readCsvToDataset[Passenger](
    spark,
    "passengers.csv",
    passengerSchema,
    Encoders.product[Passenger]
)

defined [32mclass[39m [36mPassenger[39m
[36mpassengerSchema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"passengerId"[39m, IntegerType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"firstName"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"lastName"[39m, StringType, [32mtrue[39m, {})
)
[36mpassengerData[39m: [32mDataset[39m[[32mPassenger[39m] = [passengerId: int, firstName: string ... 1 more field]

In [9]:
passengerData.show()

+-----------+---------+--------+
|passengerId|firstName|lastName|
+-----------+---------+--------+
|      14751| Napoleon| Gaylene|
|       2359| Katherin| Shanell|
|       5872|   Stevie|  Steven|
|       3346|Margarita|   Gerri|
|       3704|    Earle|  Candis|
|       1226|    Trent|    Omer|
|       2677|    Janee|  Lillia|
|        179|     Gita|Chastity|
|       9763|   Hilton|Jaquelyn|
|      11414|      Leo|Margaret|
|       6870|     Tama|     Bok|
|       3290|    Logan|    Anya|
|      13264|   Lowell|Kathryne|
|        455|  Maritza|  Maxima|
|      13006|     Yuri|   Joyce|
|      10323|  Latasha|  Estell|
|       7376|   Kaycee|Kiersten|
|      15015|   Curtis| Abraham|
|       9217|   Verena|Josefine|
|       5183|     Loan| Latonya|
+-----------+---------+--------+
only showing top 20 rows



In [10]:
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this);

case class FlightRecord(passengerId: Int, flightId: Int, from: String, to: String, date: String)

val flightRecordSchema = new StructType()
  .add("passengerId", "integer")
  .add("flightId", "integer")
  .add("from", "string")
  .add("to", "string")
  .add("date", "string")

val flightRecordData: Dataset[FlightRecord] = readCsvToDataset[FlightRecord](
    spark,
    "flightData.csv",
    flightRecordSchema,
    Encoders.product[FlightRecord]
)

defined [32mclass[39m [36mFlightRecord[39m
[36mflightRecordSchema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"passengerId"[39m, IntegerType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"flightId"[39m, IntegerType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"from"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"to"[39m, StringType, [32mtrue[39m, {}),
  [33mStructField[39m([32m"date"[39m, StringType, [32mtrue[39m, {})
)
[36mflightRecordData[39m: [32mDataset[39m[[32mFlightRecord[39m] = [passengerId: int, flightId: int ... 3 more fields]

In [11]:
flightRecordData.show()

+-----------+--------+----+---+----------+
|passengerId|flightId|from| to|      date|
+-----------+--------+----+---+----------+
|         48|       0|  cg| ir|2017-01-01|
|         94|       0|  cg| ir|2017-01-01|
|         82|       0|  cg| ir|2017-01-01|
|         21|       0|  cg| ir|2017-01-01|
|         51|       0|  cg| ir|2017-01-01|
|         33|       0|  cg| ir|2017-01-01|
|         20|       0|  cg| ir|2017-01-01|
|         10|       0|  cg| ir|2017-01-01|
|         49|       0|  cg| ir|2017-01-01|
|         32|       0|  cg| ir|2017-01-01|
|         70|       0|  cg| ir|2017-01-01|
|         28|       0|  cg| ir|2017-01-01|
|         42|       0|  cg| ir|2017-01-01|
|         62|       0|  cg| ir|2017-01-01|
|         80|       0|  cg| ir|2017-01-01|
|         13|       0|  cg| ir|2017-01-01|
|         46|       0|  cg| ir|2017-01-01|
|         43|       0|  cg| ir|2017-01-01|
|         17|       0|  cg| ir|2017-01-01|
|         16|       0|  cg| ir|2017-01-01|
+----------

### Question 1
Find the total number of flights for each month

In [12]:
/**
 * Find the total number of flights for each month.
 *
 * @param flightRecordData Dataset of flight records
 * @return A dataset of pairs containing the month and the number of flights in that month
 */
def countFlightsByMonth(flightRecordData: Dataset[FlightRecord]): Dataset[(Int, Long)] = {
  flightRecordData
    .groupBy(month($"date").as("Month"))
    .agg(count("*").as("Number of Flights"))
    .orderBy("Month")
    .as[(Int, Long)]
}

defined [32mfunction[39m [36mcountFlightsByMonth[39m

In [13]:
val flightsByMonth = countFlightsByMonth(flightRecordData)

[36mflightsByMonth[39m: [32mDataset[39m[([32mInt[39m, [32mLong[39m)] = [Month: int, Number of Flights: bigint]

In [14]:
flightsByMonth.show()

+-----+-----------------+
|Month|Number of Flights|
+-----+-----------------+
|    1|             9700|
|    2|             7300|
|    3|             8200|
|    4|             9200|
|    5|             9200|
|    6|             7100|
|    7|             8700|
|    8|             7600|
|    9|             8500|
|   10|             7600|
|   11|             7500|
|   12|             9400|
+-----+-----------------+



In [15]:
val question1 = flightsByMonth.toDF()

try {
    question1.coalesce(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter", ",")
      .csv("question1.csv")
} catch {
    case e: AnalysisException => println("File already exists.")
}

File already exists.


[36mquestion1[39m: [32mDataFrame[39m = [Month: int, Number of Flights: bigint]

### Question 2
Find the names of the 100 most frequent flyers.

In [16]:
/**
  * Retrieves the 100 most frequent flyers from the given `flightRecordData` and `passengerData` datasets.
  * @param flightRecordData Dataset of flight records.
  * @param passengerData    Dataset of passengers..
  * @return A dataset of tuples containing the top 100 frequent flyers.
*/
def getTop100FrequentFlyers(flightRecordData: Dataset[FlightRecord], passengerData: Dataset[Passenger]): Dataset[(Int, Long, String, String)] = {
  val passengerFlights = flightRecordData
    .groupBy("passengerId")
    .agg(count("*").as("Number of Flights"))

  val passengerNames = passengerData
    .join(passengerFlights, Seq("passengerId"))
    .select($"passengerId", $"Number of Flights", $"firstName", $"lastName")

  passengerNames
    .orderBy(desc("Number of Flights"))
    .limit(100)
    .as[(Int, Long, String, String)]
}

defined [32mfunction[39m [36mgetTop100FrequentFlyers[39m

In [17]:
val top100FrequentFlyers = getTop100FrequentFlyers(flightRecordData, passengerData)

[36mtop100FrequentFlyers[39m: [32mDataset[39m[([32mInt[39m, [32mLong[39m, [32mString[39m, [32mString[39m)] = [passengerId: int, Number of Flights: bigint ... 2 more fields]

In [18]:
top100FrequentFlyers.show()

+-----------+-----------------+---------+--------+
|passengerId|Number of Flights|firstName|lastName|
+-----------+-----------------+---------+--------+
|       2068|               32|  Yolande|    Pete|
|       1677|               27|Katherina|Vasiliki|
|       4827|               27|    Jaime|   Renay|
|       8961|               26|    Ginny|   Clara|
|       3173|               26| Sunshine|   Scott|
|       5867|               25|    Luise| Raymond|
|       2857|               25|      Son| Ginette|
|        760|               25|   Vernia|     Mui|
|       8363|               25|   Branda|  Kimiko|
|       5096|               25|   Blythe|    Hyon|
|       6084|               25|     Cole|  Sharyl|
|        288|               25|   Pamila|   Mavis|
|        917|               25|   Anisha|  Alaine|
|       1240|               24|Catherine|   Missy|
|       5668|               24|   Gladis| Earlene|
|       1343|               24|  Bennett|   Staci|
|       2441|               24|

**I rewrite the function above to account for some performance modifications. First, we are able to send as a parameter the number of frequent flyers we want to retrieve, giving the functional programming a reusable approach. Second, I am using the approach of Broadcast Join, here, if the table to be joint is relatively small (in this case passenger being smaller than flightRecords) you can send small partitions to all nodes so that the operation can be performed locally on all nodes and to avoid shuffling the data across the network**

In [19]:
/**
  * Retrieves the `n` most frequent flyers from the given `flightRecordData` and `passengerData` datasets.
  *
  * @param flightRecordData Dataset of FlightRecord objects containing information about each flight.
  * @param passengerData    Dataset of Passenger objects containing information about each passenger.
  * @param n                Number of frequent flyers to retrieve.
  * @return A dataset of tuples containing the top 'n' frequent flyers.
  */
def getFrequentFlyers(flightRecordData: Dataset[FlightRecord], passengerData: Dataset[Passenger], n: Int): Dataset[(Int, Long, String, String)] = {
  val flightsByPassenger = flightRecordData
    .join(broadcast(passengerData), Seq("passengerId"))
    .groupBy("passengerId")
    .agg(
      first("firstName").as("firstName"), 
      first("lastName").as("lastName"), 
      count("*").as("Number of Flights")
    )
    .orderBy(desc("Number of Flights"))
    .limit(n)
    .select("passengerId", "Number of Flights", "firstName", "lastName")
    .as[(Int, Long, String, String)]

  flightsByPassenger
}

defined [32mfunction[39m [36mgetFrequentFlyers[39m

In [20]:
val topFrequentFlyers = getFrequentFlyers(flightRecordData, passengerData, 100)

[36mtopFrequentFlyers[39m: [32mDataset[39m[([32mInt[39m, [32mLong[39m, [32mString[39m, [32mString[39m)] = [passengerId: int, Number of Flights: bigint ... 2 more fields]

In [21]:
topFrequentFlyers.show()

+-----------+-----------------+---------+--------+
|passengerId|Number of Flights|firstName|lastName|
+-----------+-----------------+---------+--------+
|       2068|               32|  Yolande|    Pete|
|       1677|               27|Katherina|Vasiliki|
|       4827|               27|    Jaime|   Renay|
|       8961|               26|    Ginny|   Clara|
|       3173|               26| Sunshine|   Scott|
|       5867|               25|    Luise| Raymond|
|       2857|               25|      Son| Ginette|
|        760|               25|   Vernia|     Mui|
|       8363|               25|   Branda|  Kimiko|
|       5096|               25|   Blythe|    Hyon|
|       6084|               25|     Cole|  Sharyl|
|        288|               25|   Pamila|   Mavis|
|        917|               25|   Anisha|  Alaine|
|       1240|               24|Catherine|   Missy|
|       5668|               24|   Gladis| Earlene|
|       1343|               24|  Bennett|   Staci|
|       2441|               24|

In [22]:
val question2 = top100FrequentFlyers.toDF()

try {
    question2.coalesce(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter", ",")
      .csv("question2.csv")
} catch {
    case e: AnalysisException => println("File already exists.")
}

File already exists.


[36mquestion2[39m: [32mDataFrame[39m = [passengerId: int, Number of Flights: bigint ... 2 more fields]

### Question 3
Find the greatest number of countries a passenger has been in without being in the UK.

**To encourage reusability of functions, again, we include the ability to send as parameter the country to find the longest streaks. Also, some performance tunning are included in the following function. First, the use of repartition after each transformation which avoids the uneven distribution of data. Also, the use of Window to sort the data and avoid the data to be shuffled unnecessarily**

In [23]:
/**
 * Returns the greatest streak of flights for each passenger from having landed in a country and without landing again.
 *
 * @param passengerFlightsData Dataset containing the flight records of passengers.
 * @param country              Country which passengers cannot land in to be counted in the streak.
 * @return a dataset with the passenger ID and their longest streak without landing on the given country.
 */
def greatestStreakWithoutCountry(passengerFlightsData: Dataset[FlightRecord], country: String): Dataset[(Int, BigInt)] = {

  val countryCounts = passengerFlightsData
    .select("passengerId", "from", "to", "date")
    .filter(col("to") =!= country)  // filters out flights to the given country
    .withColumn("previousCountry", lag("to", 1).over(Window.partitionBy("passengerId").orderBy("date")))
    .filter(col("previousCountry") =!= country)
    .select("passengerId", "to", "previousCountry")  // filters out flights with previous destination as the given country
    .distinct()
    .groupBy("passengerId")
    .agg(countDistinct("to").as("countryCount"))  // counts the number of unique destinations for each passenger
    .filter("countryCount > 0")
    .repartition(col("passengerId"))

  val selectedCountryCounts = passengerFlightsData
    .filter(col("to") === country)  // filter flights that arrive at the specified country
    .select("passengerId", "from", "to", "date")
    .withColumn("previousCountry", lag("to", 1).over(Window.partitionBy("passengerId").orderBy("date")))
    .filter(col("previousCountry") === country)  // filter flights that previously departed from the specified country
    .select("passengerId", "previousCountry")
    .distinct()
    .groupBy("passengerId")
    .agg(countDistinct("previousCountry").as("selectedCountryCounts"))  // count the number of unique previous countries
    .repartition(col("passengerId"))

  countryCounts
    .join(selectedCountryCounts, Seq("passengerId"), "left")
    .withColumn("Longest Run", when(col("selectedCountryCounts").isNull, col("countryCount")).otherwise(col("countryCount") - col("selectedCountryCounts")))
    .select("passengerId", "Longest Run")
    .orderBy(col("Longest Run").desc)
    .as[(Int, BigInt)]
}

defined [32mfunction[39m [36mgreatestStreakWithoutCountry[39m

In [24]:
val longestStreak = greatestStreakWithoutCountry(flightRecordData, "uk")

[36mlongestStreak[39m: [32mDataset[39m[([32mInt[39m, [32mBigInt[39m)] = [passengerId: int, Longest Run: bigint]

In [25]:
longestStreak.show()

+-----------+-----------+
|passengerId|Longest Run|
+-----------+-----------+
|       9441|         18|
|        798|         17|
|        288|         17|
|       2378|         17|
|       3608|         17|
|       2857|         17|
|        721|         16|
|       8353|         16|
|       2867|         16|
|       1677|         16|
|       3173|         16|
|       1337|         16|
|       8411|         16|
|       6084|         16|
|       2068|         16|
|       5668|         16|
|       2437|         16|
|       1651|         16|
|         92|         16|
|       3367|         16|
+-----------+-----------+
only showing top 20 rows



In [26]:
val question3 = longestStreak.toDF()

try {
    question3.coalesce(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter", ",")
      .csv("question3.csv")
} catch {
    case e: AnalysisException => println("File already exists.")
}

File already exists.


[36mquestion3[39m: [32mDataFrame[39m = [passengerId: int, Longest Run: bigint]

### Question 4
Find the passengers who have been on more than 3 flights together.

In [27]:
// Self-join the DataFrame on flightId and group by passengerId pairs to count the number of flights they have taken together
val sharedFlights = flightRecordData.as("passenger1Data")
  .join(flightRecordData.as("passenger2Data"), "flightId")
  .where($"passenger1Data.passengerId" < $"passenger2Data.passengerId")
  .groupBy($"passenger1Data.passengerId", $"passenger2Data.passengerId")
  .agg(count("*").as("Number of Flights Together"))
  .where($"Number of Flights Together" > 3)
  .orderBy($"Number of Flights Together".desc)

// Return the result as a dataset
val result = sharedFlights.select(
    $"passenger1Data.passengerId".as("Passenger 1 ID"), 
    $"passenger2Data.passengerId".as("Passenger 2 ID"), 
    $"Number of Flights Together"
  ).as[(Int, Int, Long)]

result.show()

+--------------+--------------+--------------------------+
|Passenger 1 ID|Passenger 2 ID|Number of Flights Together|
+--------------+--------------+--------------------------+
|           701|           760|                        15|
|          3503|          3590|                        14|
|          2717|          2759|                        14|
|          2939|          5490|                        13|
|          4395|          4399|                        12|
|          1208|          3093|                        12|
|           382|           392|                        12|
|          2759|          4316|                        12|
|          4316|          4373|                        12|
|          1337|          2867|                        12|
|          2926|          3590|                        12|
|          7877|          9252|                        12|
|           366|           374|                        12|
|          2550|          4441|                        1

[36msharedFlights[39m: [32mDataset[39m[[32mRow[39m] = [passengerId: int, passengerId: int ... 1 more field]
[36mresult[39m: [32mDataset[39m[([32mInt[39m, [32mInt[39m, [32mLong[39m)] = [Passenger 1 ID: int, Passenger 2 ID: int ... 1 more field]

In [28]:
val question4 = result.toDF()

try {
    question4.coalesce(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter", ",")
      .csv("question4.csv")
} catch {
    case e: AnalysisException => println("File already exists.")
}

File already exists.


[36mquestion4[39m: [32mDataFrame[39m = [Passenger 1 ID: int, Passenger 2 ID: int ... 1 more field]

### For Extra Marks
Find the passengers who have been on more than N flights together within the range (from, to).

In [29]:
/**
  * Pairs the flightRecordData dataset by passengers and returns the ones that have flown
  * together at least N times within the date range specified.
  *
  * @param atLeastNTimes Minimum number of times the passengers must have flown together
  * @param from          Starting date of the date range
  * @param to            Ending date of the date range
  * @return Dataset with the following columns:
  *         Passenger 1 ID, Passenger 2 ID, Number of flights together, From, To
  */
def searchSharedFlights(atLeastNTimes: Int, from: Date, to: Date) = {
  // Filter the dataset based on the date range
  val filteredFlightsByDate = flightRecordData.filter($"date" >= from && $"date" <= to)

  // Self-join the dataset on flightId and group by passengerId pairs to count the number of flights they have taken together
  val sharedFlights = filteredFlightsByDate.as("passenger1Data")
    .join(filteredFlightsByDate.as("passenger2Data"), "flightId")
    .where($"passenger1Data.passengerId" < $"passenger2Data.passengerId")
    .groupBy($"passenger1Data.passengerId", $"passenger2Data.passengerId")
    .agg(count("*").as("Number of Flights Together"))
    .where($"Number of Flights Together" > atLeastNTimes)
    .orderBy($"Number of Flights Together".desc)

  val result = sharedFlights.select(
      $"passenger1Data.passengerId".as("Passenger 1 ID"), 
      $"passenger2Data.passengerId".as("Passenger 2 ID"),
      $"Number of Flights Together", lit(from).as("From"), lit(to).as("To"))
    .as[(Int, Int, Long, Date, Date)]
  result
}

defined [32mfunction[39m [36msearchSharedFlights[39m

In [30]:
val fromDate = Date.valueOf("2017-01-01")
val toDate = Date.valueOf("2017-12-31")

val flightsTogether = searchSharedFlights(10, fromDate, toDate)

[36mfromDate[39m: [32mDate[39m = 2017-01-01
[36mtoDate[39m: [32mDate[39m = 2017-12-31
[36mflightsTogether[39m: [32mDataset[39m[([32mInt[39m, [32mInt[39m, [32mLong[39m, [32mDate[39m, [32mDate[39m)] = [Passenger 1 ID: int, Passenger 2 ID: int ... 3 more fields]

In [31]:
flightsTogether.show()

+--------------+--------------+--------------------------+----------+----------+
|Passenger 1 ID|Passenger 2 ID|Number of Flights Together|      From|        To|
+--------------+--------------+--------------------------+----------+----------+
|           701|           760|                        15|2017-01-01|2017-12-31|
|          3503|          3590|                        14|2017-01-01|2017-12-31|
|          2717|          2759|                        14|2017-01-01|2017-12-31|
|          2939|          5490|                        13|2017-01-01|2017-12-31|
|          3278|          5423|                        12|2017-01-01|2017-12-31|
|          1208|          3093|                        12|2017-01-01|2017-12-31|
|           382|           392|                        12|2017-01-01|2017-12-31|
|           701|           763|                        12|2017-01-01|2017-12-31|
|          4316|          4373|                        12|2017-01-01|2017-12-31|
|          1337|          28

In [32]:
val questionExtra = flightsTogether.toDF()

try {
    questionExtra.coalesce(1)
      .write.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("delimiter", ",")
      .csv("questionExtra.csv")
} catch {
    case e: AnalysisException => println("File already exists.")
}

File already exists.


[36mquestionExtra[39m: [32mDataFrame[39m = [Passenger 1 ID: int, Passenger 2 ID: int ... 3 more fields]