# Below we have constants set up to help with readability.

In [1]:
val hotelCountryId = 20

hotelCountryId = 20


20

In [2]:
val hotelMarketId = 21

hotelMarketId = 21


21

In [3]:
val hotelContinentId = 19

hotelContinentId = 19


19

In [4]:
val srchAdultsCntId = 14

srchAdultsCntId = 14


14

# This is a helper lambda that extracts only the necessary fields from our csv file.

#### We will be grouping by:
* hotel country
* hotel market
* hotel continent

#### And sorting by:
* search adults count.

In [5]:
val extractNecessaryFields : String => (String, String, String, Int) = (line: String) => {
    val splitLine = line.split(",")
    val hotelCountry = splitLine(hotelCountryId)
    val hotelMarket = splitLine(hotelMarketId)
    val hotelContinent = splitLine(hotelContinentId)
    val srchAdultsCnt = splitLine(srchAdultsCntId)
    val srchAdultsCntInt = Integer.parseInt(srchAdultsCnt)
    (hotelCountry, hotelMarket, hotelContinent, srchAdultsCntInt)
}

extractNecessaryFields = > (String, String, String, Int) = <function1>


> (String, String, String, Int) = <function1>

# Below we set up the config and the context of our application.

In [6]:
import org.apache.spark.SparkConf
val sparkConf = new SparkConf().setAppName("task1").setMaster("local")

sparkConf = org.apache.spark.SparkConf@325c09e0


org.apache.spark.SparkConf@325c09e0

In [7]:
import org.apache.spark.SparkContext
val sc = new SparkContext(sparkConf)

sc = org.apache.spark.SparkContext@245f2528


org.apache.spark.SparkContext@245f2528

# Here I create an RDD from the train.csv file.

In [8]:
val data = sc.textFile("train.csv")

data = train.csv MapPartitionsRDD[1] at textFile at <console>:31


train.csv MapPartitionsRDD[1] at textFile at <console>:31

## The main function performs the following steps:
* Skips the header
* Extracts the fields that we need to use in our query
* Filters out non-couples
* Groups everything by hotel country, hotel market, hotel continent
* Sorts everything by the number of group repetitions in descending order
* Leaves only top 3 results
* Prints everything to the screen

In [9]:
val header = data.first() // header
val result = data.filter(row => row != header) // skip header
    .map(extractNecessaryFields) // extract necessary fields
    .filter(_._4 == 2) // only choose couples
    .groupBy(row => (row._1, row._2, row._3)) // group by hotel country, hotel market and hotel continent
    .mapValues(_.size) // transform Iterable[(String,String,String,Int)] into Iterable[Int]
    .sortBy(kv => kv._2, false) // sort by the number of people in descending order
    .take(3) // leave only top 3 results

header = date_time,site_name,posa_continent,user_location_country,user_location_region,user_location_city,orig_destination_distance,user_id,is_mobile,is_package,channel,srch_ci,srch_co,srch_adults_cnt,srch_children_cnt,srch_rm_cnt,srch_destination_id,srch_destination_type_id,is_booking,cnt,hotel_continent,hotel_country,hotel_market,hotel_cluster
result = Array(((2,50,1),1277716), ((2,50,2),275737), ((4,8,1),141535))


Array(((2,50,1),1277716), ((2,50,2),275737), ((4,8,1),141535))

In [10]:
sc.stop()