In [None]:
# Spark Jupyter Homework

This notebook uses Apache Spark to extract the required data from the hotels dataset.
The `Hotel` class represents a single record of that dataset.
You must provide a CSV file with hotel information (the notebook reads `train.csv`) to proceed.

## Getting Started

Run this notebook to calculate the results.

## Methods

```
def hotelRDD(context: SparkContext, csvPath: String): RDD[Hotel]
```
* Method to create dataset of Hotels from the csv file
* @param context: Spark Context
* @param csvPath: Path to the csv file
* @throws InvalidInputException if path to the csv file is not correct
* @return: Resilient Distributed Dataset (RDD) of Hotels

```
def get3MostPopularCoupleHotels(hotelRDD: RDD[Hotel]): Array[((Int, Int, Int), Int)]
```
* Method to find the top 3 most popular hotels among couples
* @param hotelRDD: Resilient Distributed Dataset (RDD) of Hotels to proceed
* @return Array[((continent, country, market), number)]: Array of hotels and their number in the dataset.
* Hotels are treated as composite keys of continent, country and market.

In [4]:
object Hotel {

  /**
    * Reads one cell of an already split CSV row as an integer without ever failing
    * @param array Cells of a single CSV row
    * @param index Zero-based position of the cell to read
    * @return the cell parsed as Int
    *         0 is returned when any Exception is raised
    *         (index out of range, empty or non-numeric text, ...)
    */
  private def getAsInt(array: Array[String], index: Int): Int =
    try array(index).toInt
    catch { case _: Exception => 0 }

  /**
    * Builds a Hotel from one comma-separated line
    * The line is split on every comma; quoted commas are not treated specially
    * @param row String contains comma-separated values
    * @return Hotel whose fields are taken from fixed zero-based columns of the row;
    *         a field is 0 when its column is missing or not an integer
    */
  def createHotel(row: String): Hotel = {
    val cells: Array[String] = row.split(",")

    // Shorthand for reading one column of this particular row
    def intAt(column: Int): Int = getAsInt(cells, column)

    Hotel(
      user_location_country = intAt(3),
      srch_adults_cnt = intAt(13),
      srch_children_cnt = intAt(14),
      srch_destination_id = intAt(16),
      is_booking = intAt(18),
      hotel_continent = intAt(20),
      hotel_country = intAt(21),
      hotel_market = intAt(22),
      hotel_cluster = intAt(23)
    )
  }
}

/**
  * A single record of the hotels dataset
  * Holds only the integer columns that Hotel.createHotel extracts from a CSV row
  */
case class Hotel(user_location_country: Int,
                 srch_adults_cnt: Int,
                 srch_children_cnt: Int,
                 srch_destination_id: Int,
                 is_booking: Int,
                 hotel_continent: Int,
                 hotel_country: Int,
                 hotel_market: Int,
                 hotel_cluster: Int)

defined object Hotel
defined class Hotel


In [21]:
//Create dataset of Hotels from the csv file
//The header line is recognised by its leading column names and dropped before parsing
val hotelRDD = sc
                .textFile("train.csv")
                .filter(row => !row.startsWith("date_time,site_name,posa_continent,user_location_country"))
                .map(Hotel.createHotel)

//Finding top 3 most popular hotels among couples (records with exactly two adults)
//Variable holds an array of (hotel, number of matching records) pairs, largest number first
//Hotels are treated as composite keys of continent, country and market.
//`top` picks the 3 largest entries by count without sorting the whole RDD,
//which is what `sortBy(...).take(3)` would do
val get3MostPopularCoupleHotels = hotelRDD
      .filter(hotel => hotel.srch_adults_cnt == 2)
      .map(hotel => ((hotel.hotel_continent, hotel.hotel_country, hotel.hotel_market), 1))
      .reduceByKey(_ + _)
      .top(3)(Ordering.by[((Int, Int, Int), Int), Int](_._2))

hotelRDD = MapPartitionsRDD[31] at map at <console>:39
get3MostPopularCoupleHotels = Array(((2,50,628),1190143), ((2,50,675),1007502), ((4,8,110),588213))


Array(((2,50,628),1190143), ((2,50,675),1007502), ((4,8,110),588213))

In [22]:
//Print a title line, then one hotel key per line as (continent, country, market)
println("Top 3 most popular hotels between couples:")
get3MostPopularCoupleHotels.foreach { case (hotelKey, _) => println(hotelKey) }

Top 3 most popular hotels between couples:
(2,50,628)
(2,50,675)
(4,8,110)
