## RDD

In [2]:
import org.apache.spark.storage.StorageLevel._

import org.apache.spark.storage.StorageLevel._


## Read Files

In [3]:
// Directory that holds the sample data files.
val root_path: String = "./"
// Locations of the two bz2-compressed samples, resolved against root_path.
val searches_path: String = s"${root_path}searches_sample.csv.bz2"
val bookings_path: String = s"${root_path}bookings_sample.csv.bz2"

root_path: String = ./
searches_path: String = ./searches_sample.csv.bz2
bookings_path: String = ./bookings_sample.csv.bz2


In [4]:
// Each element of these RDDs is one raw text line of the file, header line included.
val searches: org.apache.spark.rdd.RDD[String] = sc.textFile(searches_path) // RDD 1: searches sample
val bookings: org.apache.spark.rdd.RDD[String] = sc.textFile(bookings_path) // RDD 2: bookings sample

searches: org.apache.spark.rdd.RDD[String] = ./searches_sample.csv.bz2 MapPartitionsRDD[1] at textFile at <console>:35
bookings: org.apache.spark.rdd.RDD[String] = ./bookings_sample.csv.bz2 MapPartitionsRDD[3] at textFile at <console>:36


In [6]:
// Fetch the header line once from the RDD that is already loaded, rather than
// building a second RDD over the same file just to read its first line again.
val head_searches = searches.first()
// Column names of the searches file; fields are separated by '^'.
val headerColumns_searches = head_searches.split("\\^").toList

headerColumns_searches: List[String] = List(Date, Time, TxnCode, OfficeID, Country, Origin, Destination, RoundTrip, NbSegments, Seg1Departure, Seg1Arrival, Seg1Date, Seg1Carrier, Seg1BookingCode, Seg2Departure, Seg2Arrival, Seg2Date, Seg2Carrier, Seg2BookingCode, Seg3Departure, Seg3Arrival, Seg3Date, Seg3Carrier, Seg3BookingCode, Seg4Departure, Seg4Arrival, Seg4Date, Seg4Carrier, Seg4BookingCode, Seg5Departure, Seg5Arrival, Seg5Date, Seg5Carrier, Seg5BookingCode, Seg6Departure, Seg6Arrival, Seg6Date, Seg6Carrier, Seg6BookingCode, From, IsPublishedForNeg, IsFromInternet, IsFromVista, TerminalID, InternetOffice)
head_searches: String = Date^Time^TxnCode^OfficeID^Country^Origin^Destination^RoundTrip^NbSegments^Seg1Departure^Seg1Arrival^Seg1Date^Seg1Carrier^Seg1BookingCode^Seg2Departure^Se...

In [13]:
// Fetch the header line once from the RDD that is already loaded, rather than
// building a second RDD over the same file just to read its first line again.
val head_bookings = bookings.first()
// Column names of the bookings file; fields are separated by '^'.
// NOTE: several names are space-padded in the file (e.g. "act_date           ") and are
// kept verbatim here, so trim them before comparing against plain column names.
val headerColumns_bookings = head_bookings.split("\\^").toList
head_bookings

headerColumns_bookings: List[String] = List("act_date           ", source, pos_ctry, pos_iata, "pos_oid  ", "rloc          ", "cre_date           ", duration, distance, dep_port, dep_city, dep_ctry, arr_port, arr_city, arr_ctry, lst_port, lst_city, lst_ctry, brd_port, brd_city, brd_ctry, off_port, off_city, off_ctry, mkt_port, mkt_city, mkt_ctry, intl, "route          ", carrier, bkg_class, cab_class, "brd_time           ", "off_time           ", pax, year, month, "oid      ")
head_bookings: String = "act_date           ^source^pos_ctry^pos_iata^pos_oid  ^rloc          ^cre_date           ^duration^distance^dep_port^dep_city^dep_ctry^arr_port^arr_city^arr_ctry^lst_port^lst_city^lst_ctry^brd_port^brd_city^brd_ctry^off_port^off_city^off_ctry^mkt_port^mkt_city^mkt_ctry^intl^route         ...

In [17]:
// Number of lines in the searches RDD; the header line is part of the RDD and is counted too.
val searches_count: Long = searches.count()

searches_count: Long = 10000


In [18]:
// Number of lines in the bookings RDD; the header line is part of the RDD and is counted too.
val bookings_count: Long = bookings.count()

bookings_count: Long = 10000


## 2) Top 10 arrival airports in the world in 2013 (using the bookings file)

In [19]:
/** Extracts the arrival airport and the passenger count from one raw bookings line.
  *
  * The line is split on '^'; the field at index 12 is read as `arr_port` and the field at
  * index 34 as `pax`, both trimmed of surrounding whitespace.
  *
  * Malformed lines must not break the RDD, so instead of throwing, the placeholder pair
  * ("KKK", "0") is returned when the line has 34 fields or fewer, or when either
  * extracted field is empty after trimming.
  *
  * @param line one raw text line of the bookings file
  * @return (arr_port, pax) as trimmed strings, or ("KKK", "0") for an unusable line
  */
def parseLineBookings(line: String): (String, String) = {
  // Placeholder pair that marks a line as unusable.
  val placeholder = ("KKK", "0")
  val columns = line.split("\\^")
  if (columns.length <= 34) {
    // Too short: index 34 does not exist.
    placeholder
  } else {
    val port = columns(12).trim
    val pax = columns(34).trim
    if (port.isEmpty || pax.isEmpty) placeholder else (port, pax)
  }
}

parseLineBookings: (line: String)(String, String)


In [20]:
// Pair RDD of (arr_port, pax) strings, one pair per bookings line.
// Persisted at MEMORY_AND_DISK because the following cells read it several times.
val pair_rdd: org.apache.spark.rdd.RDD[(String, String)] = bookings.map(line => parseLineBookings(line)).persist(MEMORY_AND_DISK)

pair_rdd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[16] at map at <console>:39


In [21]:
// Peek at the first 10 (arr_port, pax) pairs; the header row shows up as the first pair.
pair_rdd.take(10)//test

res6: Array[(String, String)] = Array((arr_port,pax), (LHR,-1), (CLT,1), (CLT,1), (SVO,1), (SVO,1), (LGA,1), (LGA,1), (SIN,2), (SIN,2))


In [22]:
// Sanity check: collect every pair that parseLineBookings replaced with the ("KKK", "0")
// placeholder. An empty array means no bookings line was rejected as malformed.
// The header row has key "arr_port", so it can never match and needs no separate filter;
// nothing is persisted here because this throwaway RDD is never read again.
val kk = pair_rdd.filter(_._1 == "KKK").collect()

kk: Array[(String, String)] = Array()


In [23]:
// Usable pairs only: drop the header row ("arr_port") and the placeholder rows ("KKK").
// Persisted at MEMORY_AND_DISK for reuse by the cells below.
val pair_rdd_good = pair_rdd.filter { case (port, _) =>
  port != "arr_port" && port != "KKK"
}.persist(MEMORY_AND_DISK)

pair_rdd_good: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[20] at filter at <console>:40


In [24]:
/** Safely parses a string as an Int.
  *
  * `String.toInt` throws a NumberFormatException on anything that is not a valid integer
  * literal (empty string, letters, decimals, out-of-range values, surrounding whitespace).
  * Only that exception is translated to None; any other failure is allowed to propagate
  * instead of being silently swallowed.
  *
  * @param s text expected to hold an integer; it is not trimmed here
  * @return Some(value) when `s` parses as an Int, None otherwise
  */
def toInt(s: String): Option[Int] = {
  try Some(s.toInt)
  catch { case _: NumberFormatException => None }
}

toInt: (s: String)Option[Int]


In [25]:
// Turn the pax strings into integers so they can be summed.
// A value that does not parse as an Int is counted as 0 passengers.
val pair_bookings = pair_rdd_good.map { case (port, paxText) =>
  (port, toInt(paxText).getOrElse(0))
}.persist(MEMORY_AND_DISK)

pair_bookings: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at map at <console>:46


In [26]:
// Peek at the first 10 (arr_port, pax) pairs now that pax is an Int.
pair_bookings.take(10)//test

res7: Array[(String, Int)] = Array((LHR,-1), (CLT,1), (CLT,1), (SVO,1), (SVO,1), (LGA,1), (LGA,1), (SIN,2), (SIN,2), (SIN,2))


In [27]:
// Total pax per arrival airport. reduceByKey merges the values of each key with the given
// function and already combines them inside every partition before the shuffle, so unlike
// groupByKey it never materialises the full list of values for a key.
val temp = pair_bookings.reduceByKey((left, right) => left + right)

temp: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[22] at reduceByKey at <console>:47


In [28]:
// Peek at five of the (arr_port, total pax) results.
temp.take(5)//test

res8: Array[(String, Int)] = Array((BEG,1), (SKP,4), (AYT,-6), (THR,4), (LOS,2))


In [29]:
// Top 10 arrival airports as (total pax, arr_port) pairs, largest total first.
// swap puts the pax total first so the default tuple ordering compares it first.
// top(10) returns the 10 largest pairs in descending order, which avoids the full
// distributed sort that sortByKey would run just to keep 10 rows. Ties on the total are
// broken by airport code, so the result is deterministic.
val top10 = temp.map(_.swap).top(10)

top10: Array[(Int, String)] = Array((112,HKG), (95,LGA), (94,ORD), (92,JFK), (91,SFO), (91,LAX), (90,MCO), (82,DCA), (79,DEN), (76,LHR))


In [30]:
// Print one (total pax, arr_port) pair per line.
top10.foreach(println)

(112,HKG)
(95,LGA)
(94,ORD)
(92,JFK)
(91,SFO)
(91,LAX)
(90,MCO)
(82,DCA)
(79,DEN)
(76,LHR)
