# **HOLIDAY PLANNER**
By:
* Jorge Luis Perez Tortosa
* Özgür Sahin
* Bernhard Wieland

## **GENERAL INFORMATION**

### Application for -> Trip planning

Finds the best:
- Flight <br>
- 3 x hotels <br>
- Monuments <br>

for the selected **country*** and the next **4 days***, departing from Vienna.

*The number of countries and days is easily adaptable. It is currently restricted because the number of free API requests is limited.

### ARCHITECTURE OVERVIEW

![title](img/Architecture.png)

### DATA SOURCES:

- **Flight data:** received from the Skyscanner44 API on rapidapi.com. The API already returns the best flight according to its own criteria.
https://rapidapi.com/3b-data-3b-data-default/api/skyscanner44

- **Hotel data:** received from the Booking API on rapidapi.com.
https://rapidapi.com/tipsters/api/booking-com/

- **Monument data:** comes from a manually created CSV based on the book "1000 Places to See Before You Die".

- **Country data:** dataset from Kaggle with country information (capital, location...), extended with dest_id_booking (for Booking API requests) and airport_code (for Skyscanner API requests).

### DATA STRUCTURE (after processing):
- **Flight (x1 per date/city):**<br>
each entry is the best flight to a destination on a given date, as returned in a given import run.<br>
<ins>f_id:</ins> unique identifier assigned by DB.<br>
<ins>date:</ins> departure date.<br>
<ins>time:</ins> departure time.<br>
<ins>arrival_time:</ins> arrival time.<br>
<ins>origin:</ins> start city for the trip. In this project, always Vienna.<br>
<ins>destination:</ins> final destination of the trip.<br>
<ins>company:</ins> carrier operating the flight.<br>
<ins>n_connections:</ins> number of connections needed to reach the destination.<br>
<ins>import_date:</ins> when the data was imported (used in Streamlit to filter for the most recent import).

- **Hotel (x3 per date/city)**: <br>
each entry is one of the 3 best hotels in a city on a given date, as returned in a given import run.<br>
<ins>h_id:</ins> unique identifier assigned by DB.<br>
<ins>checkin_date:</ins> arrival date at the hotel.<br>
<ins>city:</ins> city where the hotel is located.<br>
<ins>name:</ins> name of the hotel.<br>
<ins>price:</ins> daily price.<br>
<ins>longitude:</ins> longitude coordinate of the hotel.<br>
<ins>latitude:</ins> latitude coordinate of the hotel.<br>
<ins>import_date:</ins> when the data was imported (used in Streamlit to filter for the most recent import).<br>

- **Monument**: <br>
each entry represents one of the monuments in the book.<br>
<ins> m_id:</ins> unique identifier assigned by DB.<br>
<ins> name:</ins> name of the monument. <br>
<ins> country:</ins> country where it is located. <br>
<ins> longitude:</ins> longitude coordinate of the monument. <br>
<ins> latitude:</ins> latitude coordinate of the monument. <br>

- **Country**: <br>
each entry represents a country and its basic information. This data serves as a lookup table for the API requests and for Streamlit.<br>
<ins> c_id:</ins> unique identifier assigned by DB. <br>
<ins> capital:</ins> capital of the country. <br>
<ins> country:</ins> name of the country. <br>
<ins> dest_id_booking:</ins> ID of the capital in the booking portal. <br>
<ins> airport_code:</ins> code of the airport in the capital.
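
The four record types above can be summarised as Python dataclasses. This is only a sketch for orientation: the field names come from the lists above, but the Python types are assumptions (the actual column types are defined in the database scripts), and the sample values are made up.

```python
import datetime as dt
from dataclasses import dataclass


@dataclass
class Flight:
    f_id: int
    date: dt.date              # departure date
    time: dt.time              # departure time
    arrival_time: dt.time
    origin: str                # always Vienna in this project
    destination: str
    company: str
    n_connections: int
    import_date: dt.datetime   # assumed to be a timestamp


@dataclass
class Hotel:
    h_id: int
    checkin_date: dt.date
    city: str
    name: str
    price: float               # daily price
    longitude: float
    latitude: float
    import_date: dt.datetime   # assumed to be a timestamp


@dataclass
class Monument:
    m_id: int
    name: str
    country: str
    longitude: float
    latitude: float


@dataclass
class Country:
    c_id: int
    capital: str
    country: str
    dest_id_booking: int       # assumed numeric (the notebook casts it with int())
    airport_code: str


# Illustrative values only, not taken from the real dataset.
example = Monument(m_id=1, name="Example Monument", country="Italy",
                   longitude=12.49, latitude=41.89)
```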

### Current version GitHub:
https://github.com/ds22m013/DSI_Project

## **PROJECT CODE**

This part describes the project code and explains, step by step, how to run it.

### **MODULES**:

To group the code thematically and keep this documentation short, we have distributed the key parts of the program over separate modules.
Their contents are:

#### **Key modules:**
* **API_Requests:** contains the functions that send the API requests (one per API) and parse the responses. Each returns a pandas DataFrame with the relevant response data.
* **Kafka_Communication:** contains two functions: one writes to a topic, the other reads from a topic.
* **transformationBooking:** contains the functions needed for transforming the booking data. Transformation description: <br>
The transformation is done with SparkSQL. <br>
The data of many hotels for different dates is read as input into a Spark DataFrame. <br>
For each date and city, every hotel gets a score weighted 70% price and 30% rating. The three best-ranked hotels are then selected for each day and location.<br>
The result is converted to a pandas DataFrame and returned.<br>
* **transformationSkyscanner:** contains the functions used for transforming the Skyscanner data. Transformation description: <br>
The transformation is done using MapReduce.<br>
The data of one flight is read into a Spark DataFrame and the columns duration and time are loaded into an RDD.<br>
The arrival_date is calculated using a function which takes the duration and time as arguments.<br>
The resulting data is appended to a pandas DataFrame and returned.<br>
* **Places1000:** the Streamlit script.
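
To make the hotel ranking described for transformationBooking more concrete, here is a minimal sketch of a weighted top-3 query. It runs on SQLite from the Python standard library instead of SparkSQL so that it works without a cluster. The exact scoring formula is not given in this document, so the one below is an assumption: price is min-max scaled per date and city (cheaper is better) and the rating is assumed to lie on a 0-10 scale. The table layout and sample rows are hypothetical.

```python
import sqlite3

# Hypothetical sample rows: (checkin_date, city, name, price, rating).
HOTELS = [
    ("2024-05-01", "Rome", "Hotel A", 100.0, 8.0),
    ("2024-05-01", "Rome", "Hotel B", 80.0, 7.0),
    ("2024-05-01", "Rome", "Hotel C", 150.0, 9.5),
    ("2024-05-01", "Rome", "Hotel D", 60.0, 5.0),
]

RANKING_SQL = """
WITH bounds AS (
    -- Price range per date and city, used to scale prices to 0..1.
    SELECT checkin_date, city,
           MIN(price) AS min_price, MAX(price) AS max_price
    FROM hotels
    GROUP BY checkin_date, city
),
scored AS (
    -- Assumed score: 70% scaled price (cheaper is better) + 30% rating.
    SELECT h.checkin_date, h.city, h.name, h.price, h.rating,
           0.7 * CASE WHEN b.max_price = b.min_price THEN 1.0
                      ELSE (b.max_price - h.price) / (b.max_price - b.min_price)
                 END
           + 0.3 * h.rating / 10.0 AS score
    FROM hotels AS h
    JOIN bounds AS b
      ON h.checkin_date = b.checkin_date AND h.city = b.city
),
ranked AS (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY checkin_date, city ORDER BY score DESC
           ) AS rn
    FROM scored
)
SELECT checkin_date, city, name, score
FROM ranked
WHERE rn <= 3
ORDER BY checkin_date, city, rn
"""


def top_three_hotels(rows):
    """Return the three best-scored hotels per (checkin_date, city)."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE hotels "
            "(checkin_date TEXT, city TEXT, name TEXT, price REAL, rating REAL)"
        )
        conn.executemany("INSERT INTO hotels VALUES (?, ?, ?, ?, ?)", rows)
        return conn.execute(RANKING_SQL).fetchall()
    finally:
        conn.close()


for row in top_three_hotels(HOTELS):
    print(row)
```

The pattern is a windowed `ROW_NUMBER()` partitioned by date and city, followed by a filter on the row number. The weights and the scaling are the parts most likely to differ from the project's actual implementation.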

#### **Support modules:**
* **config:** variables that are needed throughout the program and are subject to change are stored here.
* **CSVLoader:** script used to create the database tables, read the monuments and capitals CSVs, and load them into the database.
* **DB_Scripts:** database helper functions: retrieve data, insert data, and create tables.

### **STEP-BY-STEP**

#### **1) SETUP BASIC INFRASTRUCTURE**

* **Run Docker**: `docker compose -f docker-compose.yaml up -d`
* **Create DB Basics**: run `CSVLoader.create_start_DB()`
* **API Register**: register for the two APIs on rapidapi.com and get an API key.
* **Config**: adapt the values in the config.py module to your personal setup.
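
As an orientation for the last step, a config.py could look roughly like the sketch below. The first five attribute names are the ones referenced by the notebook code in this document; every value is a placeholder assumption, and `rapidapi_key` is a hypothetical name for wherever the API key is stored.

```python
# config.py (sketch). Values are placeholders, adapt them to your setup.

# Upper bound of the day range used by Server 1.
# The loop iterates over range(1, days_to_add), so 5 yields 4 travel dates.
days_to_add = 5

# Kafka topics, one per API (topic names are assumptions).
kafka_topic_booking = "booking"
kafka_topic_skyscanner = "skyscanner"

# Spark session settings (assumed: local mode using all cores).
spark_master = "local[*]"
spark_app_name = "HolidayPlanner"

# Hypothetical attribute name; the real module may store the key differently.
rapidapi_key = "<your-rapidapi-key>"
```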

#### **2) SERVER 1: CREATE REQUESTS + PRODUCE IN KAFKA**

This part is a loop which, every hour:
- **Skyscanner**:
    * Sends an API request for every country in the DB and for each day specified in config.
    * Writes the response to a Kafka topic.
- **Booking**:
    * Sends an API request for every country in the DB and for each day specified in config.
    * Writes the response to a Kafka topic.

**Key modules involved:** API_Requests, Kafka_Communication.

```python
## IMPORTS
import API_Requests
import DB_Scripts
import Kafka_Communication
import transformationBooking
import transformationSkyscanner
from datetime import date, timedelta, datetime
import pandas as pd
import config
from time import sleep
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings("ignore")
```

```python
## SERVER 1) REQUESTS + PRODUCE IN KAFKA

countryData = DB_Scripts.get_country_data()

try:
    while True:
        # Recompute the travel dates on every pass so they stay current
        # while the loop keeps running for several days.
        refDate = date.today() + timedelta(days=1)
        dayData = [refDate + timedelta(days=i) for i in range(1, config.days_to_add)]
        for index, row in countryData.iterrows():
            for day in dayData:
                ts1 = day.strftime("%Y-%m-%d")
                try:
                    bookingData = API_Requests.get_BookingAPI_response(str(int(row["dest_id_booking"])), ts1)
                    Kafka_Communication.kafka_send(bookingData, config.kafka_topic_booking)
                except KeyError:
                    # Skip this country/date if a KeyError is raised.
                    pass
                try:
                    # Explicit positional access (columns 2 and 1 of the country table).
                    flightData = API_Requests.get_SkyScannerAPI_response(row.iloc[2], row.iloc[1], ts1)
                    Kafka_Communication.kafka_send(flightData, config.kafka_topic_skyscanner)
                except KeyError:
                    # Skip this country/date if a KeyError is raised.
                    pass
        print("Loading complete")
        sleep(3600)
except KeyboardInterrupt:
    print("Loading stopped")
```
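
The date window requested by the Server 1 loop can be checked in isolation. The sketch below repeats the same date arithmetic as the loop in a small helper; `travel_dates` is a hypothetical name, not a function of the project. It shows that `range(1, days_to_add)` yields `days_to_add - 1` dates, the first of which is two days after today.

```python
from datetime import date, timedelta


def travel_dates(today, days_to_add):
    """Return the dates requested by the loop for a given 'today'."""
    ref_date = today + timedelta(days=1)
    return [ref_date + timedelta(days=i) for i in range(1, days_to_add)]


dates = travel_dates(date(2024, 1, 1), 5)
print([d.strftime("%Y-%m-%d") for d in dates])
# → ['2024-01-03', '2024-01-04', '2024-01-05', '2024-01-06']
```

With `days_to_add = 5` this gives the four days mentioned in the general information. If the window should start tomorrow instead, the range would have to start at 0.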

#### **3) SERVER 2: CONSUME KAFKA + WRITE IN DB**
This part is a loop which, every hour:

- **Skyscanner**:
    * Reads all the entries in the Kafka topic.
    * Transforms the data using an RDD.
    * Writes the result to PostgreSQL.
- **Booking**:
    * Reads all the entries in the Kafka topic.
    * Transforms the data using SparkSQL.
    * Writes the result to PostgreSQL.
    
**Key modules involved:** Kafka_Communication, transformationBooking, transformationSkyscanner.

```python
## SERVER 2) CONSUME KAFKA -> SPARK (MANIPULATE) -> SAVE IN DB

spark = SparkSession \
    .builder \
    .master(config.spark_master) \
    .appName(config.spark_app_name) \
    .getOrCreate()

sc = spark.sparkContext

try:
    while True:
        # Booking: read the topic, rank the hotels with SparkSQL, store the result.
        bookingDF = Kafka_Communication.kafka_receive(config.kafka_topic_booking)
        if not bookingDF.empty:
            bookingDF = transformationBooking.transform_booking_inSpark(spark, bookingDF)
            DB_Scripts.send_booking_to_DB(bookingDF)
        # Skyscanner: read the topic, transform via RDD, store the result.
        flightsDF = Kafka_Communication.kafka_receive(config.kafka_topic_skyscanner)
        if not flightsDF.empty:
            flightsDF = transformationSkyscanner.transform_Skyscanner_RDD(spark, flightsDF)
            DB_Scripts.send_skyscanner_to_DB(flightsDF)
        print("Loading complete")
        sleep(3600)
except KeyboardInterrupt:
    print("Loading stopped")
```
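
The map step of the Skyscanner transformation (computing the arrival from the departure time and the duration) can be sketched in plain Python. The input format is an assumption: the departure time is taken as an "HH:MM" string and the duration as minutes, and the helper additionally receives the departure date so that flights crossing midnight are handled. `arrival` is a hypothetical name; on a Spark RDD a function like it would be passed to `rdd.map`.

```python
from datetime import datetime, timedelta


def arrival(departure_date, departure_time, duration_minutes):
    """Return the arrival datetime for one flight record."""
    departure = datetime.strptime(
        f"{departure_date} {departure_time}", "%Y-%m-%d %H:%M"
    )
    return departure + timedelta(minutes=duration_minutes)


# Illustrative records only: (date, time, duration in minutes).
flights = [
    ("2024-05-01", "22:30", 135),
    ("2024-05-02", "07:05", 95),
]

# Plain-Python equivalent of mapping the function over an RDD.
arrivals = list(map(lambda f: arrival(*f), flights))
for a in arrivals:
    print(a.strftime("%Y-%m-%d %H:%M"))
```

The first record departs late in the evening, so its arrival falls on the following day.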

#### **4) RUN STREAMLIT**
Run in a terminal in this folder: `streamlit run Places1000.py`<br>

The application welcomes the user.<br>
The user selects the destination country in the selectbox on the left side.<br>
-> The user gets flight information for the next 4 days.

![title](img/Streamlit1.png)
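
Because a new import is written every hour, the flight table accumulates several rows per date and destination, and the import_date field is used to show only the most recent one. A minimal sketch of that filter in plain Python follows. `latest_flights` is a hypothetical helper, the rows are made up, and the real script may do this in SQL or pandas instead.

```python
from datetime import datetime


def latest_flights(rows):
    """Keep, per (date, destination), the row with the newest import_date."""
    latest = {}
    for row in rows:
        key = (row["date"], row["destination"])
        if key not in latest or row["import_date"] > latest[key]["import_date"]:
            latest[key] = row
    return list(latest.values())


# Illustrative rows only; the values are made up.
rows = [
    {"date": "2024-05-01", "destination": "Rome", "company": "Carrier A",
     "import_date": datetime(2024, 4, 30, 9, 0)},
    {"date": "2024-05-01", "destination": "Rome", "company": "Carrier B",
     "import_date": datetime(2024, 4, 30, 10, 0)},
    {"date": "2024-05-02", "destination": "Rome", "company": "Carrier A",
     "import_date": datetime(2024, 4, 30, 10, 0)},
]

current = latest_flights(rows)
for row in current:
    print(row["date"], row["destination"], row["company"])
```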

The user selects one of the four days in the selectbox on the left side.<br>
-> The hotel and monument locations are displayed on a map (with labels).<br>

![title](img/Streamlit2.png)

-> The user gets an overview of the selected trip (flight info, best three hotels and monument names).<br>

![title](img/Streamlit3.png)

## REQUIREMENTS OVERVIEW

* **Connection to multiple data sources**: YES. We use 2 APIs (Skyscanner, Booking) and 2 CSVs (monuments, country).
* **Data storage**: YES. We use a PostgreSQL database to save our data.
* **Kafka**: YES. We produce and consume data from 2 topics (one per API).
* **MapReduce with Spark**: YES. We use it for transforming the Skyscanner data.
* **SparkSQL and SparkDF**: YES. We use them for transforming the Booking data.
* **Visualisation**: YES. Covered in Streamlit with a chart and a map.
* **Documentation**: YES. You are reading it.
* **GIT**: YES. Link above.
* **NoSQL DB**: YES. Elasticsearch (documented in a separate file).
* **Presentation, Punctuality**: YES. All the requirements were delivered before the due date. The presentation was held.