# Third exercise: plot the monthly number of searches for flights arriving at Málaga, Madrid or Barcelona

For the arriving airport, you can use the Destination column in the searches file. Plot one curve for Málaga, one for Madrid, and one for Barcelona, all in the same figure. Bonus point: solve this problem using pandas (instead of any other approach).

In [126]:
val searches = sc.textFile("searches.csv.bz2")

// split each line on '^' and trim every field; cleandata: RDD[Array[String]]
val cleandata = searches.map(line => line.split("\\^").map(_.trim))

// remove the first line (header) of RDD
val firstRow = cleandata.first
val data = cleandata.filter( line => !line.contains(firstRow(0)) ) 
data.count

20390198
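
One detail of `split` is worth keeping in mind when parsing `^`-delimited lines. With the one-argument form, Java's `String.split` drops trailing empty strings, so a line whose last columns are empty yields fewer fields than the header. The sketch below uses made-up sample lines (not taken from `searches.csv.bz2`) to show the difference when a negative limit is passed. The names `sampleHeader` and `sampleLine` are illustrative only.

```scala
// Hypothetical sample data, for illustration only
val sampleHeader = "Date^Origin^Destination^Office"
val sampleLine   = "2013-01-01^TXL^AUH^"   // the last column is empty

val headerFields = sampleHeader.split("\\^")

// Default split: trailing empty strings are dropped
val shortFields = sampleLine.split("\\^").map(_.trim)

// A negative limit keeps trailing empty strings
val fullFields = sampleLine.split("\\^", -1).map(_.trim)

println(s"header=${headerFields.length} default=${shortFields.length} limit(-1)=${fullFields.length}")
```

Whether this matters for the searches file depends on how many rows actually end in empty columns, which I have not checked.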

In [130]:
val numberFields = firstRow.length   // number of columns expected for each line
val filteredData = data.filter( line => line.length >= numberFields)  // drop empty or incomplete lines
filteredData.first

Array(2013-01-01, 20:25:57, MPT, 624d8c3ac0b3a7ca03e3c167e0f48327, DE, TXL, AUH, 1, 2, TXL, AUH, 2013-01-26, D2, "", AUH, TXL, 2013-02-02, D2, "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", "", 1ASIWS, 0, 0, 0, d41d8cd98f00b204e9800998ecf8427e, FRA)

In [146]:
val destination = firstRow.indexOf("Destination") // index of destination airport column in csv data file
val date = firstRow.indexOf("Date")           // index of date column in csv data file

// create a Key-Value RDD with Key=destination airport and Value=date
val keyValRDD = filteredData.map( x => (x(destination), x(date) ) ) 
keyValRDD.first

// quick test to run an action on the Key-Value RDD filtered by Key==Madrid
val testSearches = keyValRDD.filter( kv => kv._1 == "MAD" )
testSearches.first

(MAD,2013-01-01)
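
`indexOf` returns -1 when the column name is not in the header, and the failure then only shows up later, when a row is indexed inside a Spark task. A minimal sketch of a lookup that fails early instead, using plain Scala collections and made-up rows; `columnIndex`, `sampleHeader` and `sampleRows` are hypothetical names, not part of the notebook.

```scala
// Hypothetical helper: resolve a column name to its index,
// failing immediately with a readable message when the name is absent.
def columnIndex(header: Array[String], name: String): Int = {
  val idx = header.indexOf(name)
  require(idx >= 0, s"column '$name' not found in header")
  idx
}

// Made-up header and rows, for illustration only
val sampleHeader = Array("Date", "Time", "Origin", "Destination")
val sampleRows = Seq(
  Array("2013-01-01", "20:25:57", "TXL", "AUH"),
  Array("2013-01-01", "10:15:37", "ATH", "MAD"))

val destIdx = columnIndex(sampleHeader, "Destination")
val dateIdx = columnIndex(sampleHeader, "Date")

// Same shape as keyValRDD: (destination airport, date)
val samplePairs = sampleRows.map(r => (r(destIdx), r(dateIdx)))
val madridPairs = samplePairs.filter { case (airport, _) => airport == "MAD" }
println(madridPairs)
```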

In [183]:
import org.apache.spark.rdd.RDD

// count the entries of the key-value RDD whose date (value) falls in the given month
// (month is 0-based, as returned by java.util.Date.getMonth: 0 = January)
def getSearchesPerMonth( searches: RDD[(String, java.util.Date)], month: Int) : Long =
{
    val filteredSearches = searches.filter( kv => kv._2.getMonth() == month )
    filteredSearches.count
}


// function to select from the key-value RDD only the pairs whose key is the given destination airport;
// it also converts the value from String to java.util.Date
def getSearchesForAirport( searches: RDD[(String, String)], airport: String) : RDD[(String, java.util.Date)] =
{
    // select only the pairs with key == airport
    val searchesAirport = searches.filter( kv => kv._1 == airport )
    
    // parse the "yyyy-MM-dd" value string into a java.util.Date
    val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
    searchesAirport.mapValues( v => format.parse( v ) )
}


// month array
val monthNumberToName = Array( "Jan","Feb", "Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec")

// airport codes
val Madrid = "MAD"
val Barcellona = "BCN"
val Malaga = "AGP"

// select the searches for each airport
val madridSearches = getSearchesForAirport(keyValRDD, Madrid)
val barcellonaSearches = getSearchesForAirport(keyValRDD, Barcellona )
val malagaSearches = getSearchesForAirport( keyValRDD, Malaga )


// allocate the arrays of size 12 that store the monthly searches [(month, search count)]
val malagaMonthlySearches = new Array[(String,Long)]( monthNumberToName.length )
val madridMonthlySearches = new Array[(String,Long)]( monthNumberToName.length )
val barcellonaMonthlySearches = new Array[(String,Long)]( monthNumberToName.length )


// fill the arrays with the searches count for each month

for( month <- 0 until monthNumberToName.length )
{
    malagaMonthlySearches(month) = ( monthNumberToName(month), getSearchesPerMonth(malagaSearches, month) )
    madridMonthlySearches(month) = ( monthNumberToName(month),  getSearchesPerMonth(madridSearches, month) )
    barcellonaMonthlySearches(month) = ( monthNumberToName(month), getSearchesPerMonth(barcellonaSearches, month) ) 
}
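
The loop above triggers one `count` per airport per month, 36 jobs in total. A possible alternative is to key each record by (airport, month) and count all keys in a single pass. The sketch below uses plain Scala collections and made-up records so that it runs without a cluster. On a pair RDD, `countByKey()` would play the role of `groupBy` plus `size`, but I have not run that variant against this dataset. It also uses `java.time.LocalDate` instead of the deprecated `Date.getMonth`, so months are 1 to 12 here rather than 0 to 11. The names `sampleSearches`, `countsByAirportMonth` and `searchesFor` are illustrative.

```scala
import java.time.LocalDate

// Made-up (destination, date) pairs standing in for keyValRDD
val sampleSearches = Seq(
  ("MAD", "2013-01-01"), ("MAD", "2013-01-15"), ("BCN", "2013-01-03"),
  ("AGP", "2013-02-10"), ("MAD", "2013-02-11"), ("TXL", "2013-02-12"))

val airports = Set("AGP", "MAD", "BCN")

// Key every relevant record by (airport, month 1..12), then count per key
val countsByAirportMonth: Map[(String, Int), Int] =
  sampleSearches
    .filter { case (airport, _) => airports.contains(airport) }
    .map { case (airport, day) => (airport, LocalDate.parse(day).getMonthValue) }
    .groupBy(identity)
    .map { case (key, hits) => key -> hits.size }

// Look up one cell of the month x airport table; a missing key means zero searches
def searchesFor(airport: String, month: Int): Int =
  countsByAirportMonth.getOrElse((airport, month), 0)

println(searchesFor("MAD", 1))
```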



Neither Scala nor Spark ships with built-in plotting functions, so an external library such as ScalaPlot or Breeze would be needed.
I did not have time to set up one of these libraries, so I simply print the results as a table.


In [186]:

println( "Results: \nSearches for arriving flights\n")
println( "Month\tMalaga\tMadrid\tBarcelona")


for( month <- 0 until monthNumberToName.length )
{
    println( monthNumberToName(month) 
    + "\t" + malagaMonthlySearches(month)._2 
    + "\t" + madridMonthlySearches(month)._2
    + "\t" + barcellonaMonthlySearches(month)._2
    )
}

Results: 
Searches for arriving flights

Month	Malaga	Madrid	Barcelona
Jan	9633	24258	29469
Feb	8379	22800	28329
Mar	10659	24681	30552
Apr	8265	25251	31236
May	10830	26334	28728
Jun	7923	22800	26505
Jul	8892	22971	29241
Aug	7866	21831	27075
Sep	8151	21147	23427
Oct	6499	22294	20276
Nov	6384	20272	19824
Dec	3696	14504	15400
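
Without a plotting library, a rough text chart can at least make the trend visible. The sketch below is a crude stand-in, not a real plot: it takes the Málaga column copied from the table above and scales each count to a row of `#` characters. The helper `bar` and the width of 40 characters are arbitrary choices for illustration.

```scala
val months = Array("Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")

// Málaga column copied from the results table
val malaga = Array(9633L, 8379L, 10659L, 8265L, 10830L, 7923L, 8892L, 7866L, 8151L, 6499L, 6384L, 3696L)

// Scale a count so that the largest value gets `width` characters
def bar(count: Long, max: Long, width: Int = 40): String =
  "#" * math.round(count.toDouble / max * width).toInt

// One line per month: name, bar, raw count
val chart = months.zip(malaga).map { case (m, c) =>
  f"$m%-4s${bar(c, malaga.max)}%-40s $c%d"
}
chart.foreach(println)
```

The same approach could be repeated for the Madrid and Barcelona columns, ideally with a shared maximum so the bars are comparable.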


The Scala kernel also logs a warning about suppressing empty output:

[WARN] o.a.t.k.p.v.s.KernelOutputStream - Suppressing empty output: '\r\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\s\r'


It is only a warning. I could not work out what triggers it and was not able to get rid of it.

It is also unclear why Jupyter syntax highlighting does not work in these last cells, which contain function definitions and code.
