# Анализ данных на Spark SQL

### Цель домашней работы

Научиться работать с основными операторами Spark SQL, фильтровать таблицы по разным условиям, писать вложенные запросы, объединять таблицы.

### Условия выполнения задания

Дан [csv-файл с логами отеля](data-samples/Hotel.csv).

**Максимальное количество баллов за выполнение домашней работы:** 10.

## Как выполнять задание

**Шаг 1. Создать таблицу, используя csv-файл.**

Описание таблицы logs_hotel:

|Поле|Описание|
|-|-|
|ID|Уникальный идентификатор брони|
|n_adults|Количество взрослых|
|n_children|Количество детей|
|weekend_nights|Количество забронированных ночей (суббота–воскресенье)|
|week_nights|Количество забронированных ночей (понедельник–пятница)|
|meal_plan|План питания|
|car_parking_space|Требуется ли парковка? (0 — нет, 1 — да)|
|room_type|Тип номера|
|lead_time|Количество дней между датой бронирования и датой прибытия|
|year|Год заселения|
|month|Месяц заселения|
|date|День заселения|
|market_segment|Тип бронирования (онлайн/оффлайн)|
|repeated_guest|Постоянный гость? (0 — нет, 1 — да)|
|previous_cancellations|Количество предыдущих заказов, которые были отменены клиентом до |текущего бронирования
|previous_bookings_not_canceled|Количество предыдущих заказов, не отмененных клиентом до |текущего бронирования
|avg_room_price|Средняя цена в день бронирования|
|special_requests|Общее количество специальных запросов, сделанных клиентом|
|status|Флаг, указывающий, было ли бронирование отменено или нет|
___
**Шаг 2. Создать (сгенерировать) таблицу calendar, который будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31.**

Описание таблицы calendar:
|Поле|Описание|
|-|-|
|calendar_dt|Дата в формате ‘YYYY-DD-MM’|
___
**Шаг 3. Выполнить следующие запросы:**

 * Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)
 * Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.
 * Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.
 * Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).
 * Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.
 * Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.


## Формат сдачи и отправка задания

**Как отправить задание на проверку.** Загрузите файл в GitHub, в форму приложите ссылку на него. Назовите файл своим ФИО.

**Что нужно отправить:** ссылку на репозиторий, в котором будет ноутбук Jupyter с решением.

# Место, где нужно выполнить задание


### Подготовительная часть

In [1]:
# Импорт необходимых библиотек и модулей

import findspark
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import (col,
                                   concat_ws,
                                   lpad,
                                   explode,
                                   to_date,
                                   date_format,
                                   expr,
                                   lit,
                                   sequence,
                                   sum,
                                   avg,
                                   round,
                                   count,
                                   when,
                                   first,
                                   split)


In [2]:
# Подключение зависимостей ОС для Spark

findspark.init()


In [3]:
# Создание рабочего экземпляра класса Spark Session

"""
    В Warning сообщении говорится, что у меня в ОС нарочно не прописаны 
    пути виртуализации Spark и ip адрес для подключения
    (все значения стоят по упмолчанию)
"""

spark = SparkSession.builder.appName('HomeWork05').getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/14 19:27:10 WARN Utils: Your hostname, acer-wsl, resolves to a loopback address: 127.0.1.1; using 172.20.126.93 instead (on interface eth0)
25/12/14 19:27:10 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/14 19:27:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Создание рабочих таблиц из .csv файла

logs_hotel = (
    spark
    .read
    .option("header", True)
    .option("sep", ",")
    .option("inferSchema", True) 
    .csv("./data-samples/Hotel.csv")
)

calendar = spark.range(1).select(explode(sequence(
    to_date(lit("2017-01-01")),
    to_date(lit("2018-12-31")),
    expr("interval 1 day"),
)).alias("calendar_dt")).select(
    date_format("calendar_dt", "yyyy-dd-MM")
    .alias("calendar_dt")
)

In [5]:
# help(logs_hotel)


In [6]:
# Проверка созданных таблиц

display(
    logs_hotel.count(),
    logs_hotel.show(3, False)
)
display(
    calendar.count(),
    calendar.show(3, False)
)


+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65.0          |0      

36275

None

+-----------+
|calendar_dt|
+-----------+
|2017-01-01 |
|2017-02-01 |
|2017-03-01 |
+-----------+
only showing top 3 rows


730

None

### **Запросы**

___
### 1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [7]:
# Запрос 1

query1 = (
    logs_hotel
    .where(col("status") == "Not_Canceled")
    .withColumn("nights", col("weekend_nights") + col("week_nights"))
    .groupBy("year", "month", )
    .agg(round(avg("nights"), 3).alias("avg_nights"))
    .orderBy("year", "month")
)
query1.show()


+----+-----+----------+
|year|month|avg_nights|
+----+-----+----------+
|2017|    7|     3.017|
|2017|    8|     2.719|
|2017|    9|     2.655|
|2017|   10|     2.703|
|2017|   11|     2.724|
|2017|   12|     3.043|
|2018|    1|     2.741|
|2018|    2|     2.689|
|2018|    3|     3.039|
|2018|    4|     2.925|
|2018|    5|     2.805|
|2018|    6|     2.597|
|2018|    7|     3.194|
|2018|    8|     3.154|
|2018|    9|     2.786|
|2018|   10|     2.891|
|2018|   11|     2.978|
|2018|   12|     3.252|
+----+-----+----------+



___
### 2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.
 

In [8]:
# Запрос 2

query2 = (
    logs_hotel
    .where(col("year") == 2018)
    .groupBy("month")
    .agg(
        round(
            (count(when(col("status") == "Canceled", 1)) / count("*") * 100),
            3
        ).alias("Cancel_proportion")
    )
    .orderBy(col("Cancel_proportion").desc())
    .limit(3)
)
query2.show()


+-----+-----------------+
|month|Cancel_proportion|
+-----+-----------------+
|    8|           46.552|
|   10|           46.357|
|    9|            45.78|
+-----+-----------------+



___
### 3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.
 

In [9]:
# Запрос 3

query3 = (
    logs_hotel
    .where(col("status") == "Not_Canceled")
    .groupBy("month")
    .agg(round(avg("lead_time"), 3).alias("avg_lead_time"))
    .orderBy("month")
)
query3.show()


+-----+-------------+
|month|avg_lead_time|
+-----+-------------+
|    1|       34.872|
|    2|       30.534|
|    3|       43.194|
|    4|       62.493|
|    5|       60.989|
|    6|       70.635|
|    7|       90.157|
|    8|       65.969|
|    9|       57.785|
|   10|       65.107|
|   11|       41.022|
|   12|       61.794|
+-----+-------------+



___
### 4. Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).
 

In [10]:
# Запрос 4

query4 = (
    logs_hotel
    .where(col("status") == "Not_Canceled")
    .withColumn("nights", col("weekend_nights") + col("week_nights"))
    .withColumn("revenue", col("nights") * col("avg_room_price"))
    .groupBy("year", "month", "market_segment")
    .agg(round(avg("revenue"), 3).alias("avg_revenue"))
    .groupBy("year", "month")
    .pivot("market_segment")
    .agg(first("avg_revenue"))
    .orderBy("year", "month")
    .fillna(0)
)
query4.show()


+----+-----+--------+-------------+---------+-------+-------+
|year|month|Aviation|Complementary|Corporate|Offline| Online|
+----+-----+--------+-------------+---------+-------+-------+
|2017|    7|     0.0|       22.398|   113.75|228.947|290.558|
|2017|    8|     0.0|        0.324|  156.417|235.537| 284.21|
|2017|    9|     0.0|       16.889|  177.832|236.653|348.552|
|2017|   10|     0.0|        1.091|  180.265|223.238|311.468|
|2017|   11|     0.0|       14.813|  102.972|198.357|240.521|
|2017|   12|     0.0|         0.25|  141.106|253.864|258.925|
|2018|    1|     0.0|        2.267|   113.03|210.515|236.095|
|2018|    2|   352.0|        1.389|  115.057|251.851|238.066|
|2018|    3| 118.333|       38.174|  142.395|233.394| 301.71|
|2018|    4|  321.81|          0.0|   108.42|236.437|320.085|
|2018|    5|   262.5|          0.0|  229.499|274.545|352.335|
|2018|    6|   247.0|          0.0|   148.13|251.976|335.028|
|2018|    7|    79.0|        5.385|  146.992|310.364|390.053|
|2018|  

___
### 5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.
 

In [11]:
# Запрос 5

guests = (
    logs_hotel
    .where(col("repeated_guest") == 1)
    .withColumn("nights", col("weekend_nights") + col("week_nights"))
    .withColumn("revenue", col("nights") * col("avg_room_price"))
    .groupBy("ID")
    .agg(sum("revenue").alias("guest_revenue"))
    .orderBy(col("guest_revenue").desc())
)
total_revenue = (
    guests
    .agg(sum("guest_revenue").alias("total_revenue"))
    .collect()[0]["total_revenue"]
)
query5 = (
    guests
    .withColumn("proportion",
                round(col("guest_revenue") / total_revenue * 100, 3))
    .select("ID",
            round("guest_revenue", 3).alias("guest_revenue"),"proportion")
    .limit(5)
)

query5.show()


+--------+-------------+----------+
|      ID|guest_revenue|proportion|
+--------+-------------+----------+
|INN19235|       1754.4|     1.512|
|INN05222|        690.0|     0.595|
|INN14189|        665.0|     0.573|
|INN09923|        660.0|     0.569|
|INN25479|        650.0|      0.56|
+--------+-------------+----------+



___
### 6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.
 

In [12]:
# Запрос 6

capacity = 400

guests_per_order = (
    logs_hotel
    .withColumn("num_guests", col("n_adults") + col("n_children"))
    .withColumn("proportion", (col("num_guests") / capacity) * 100)
)

filling_dates = (
    guests_per_order
    .groupBy("year", "date", "month")
    .agg(
        sum("num_guests").alias("num_guests"),
        sum("proportion").alias("proportion")
    )
    .withColumn(
        "calendar_dt",
        concat_ws("-",
                  "year",
                  lpad(col("date").cast("string"), 2, "0"),
                  lpad(col("month").cast("string"), 2, "0"))
    )
    # .where(col("num_guests") > 400)
)

query6 = (
    calendar.join(filling_dates, on="calendar_dt", how="full")
    .withColumn("date_elems", split("calendar_dt", "-"))
    .withColumn("year", col("date_elems")[0].cast("int"))
    .withColumn("month", col("date_elems")[2].cast("int"))
    .withColumn("day", col("date_elems")[1].cast("int"))
    .orderBy(col("year").desc(), col("month").desc(), col("day").desc())
    .select("calendar_dt", "num_guests", "proportion")
)

query6.show()


+-----------+----------+----------+
|calendar_dt|num_guests|proportion|
+-----------+----------+----------+
| 2018-31-12|        89|     22.25|
| 2018-30-12|       212|      53.0|
| 2018-29-12|       216|      54.0|
| 2018-28-12|       183|     45.75|
| 2018-27-12|       325|     81.25|
| 2018-26-12|       236|      59.0|
| 2018-25-12|       114|      28.5|
| 2018-24-12|       119|     29.75|
| 2018-23-12|       152|      38.0|
| 2018-22-12|       109|     27.25|
| 2018-21-12|       106|      26.5|
| 2018-20-12|        73|     18.25|
| 2018-19-12|        93|     23.25|
| 2018-18-12|        87|     21.75|
| 2018-17-12|        91|     22.75|
| 2018-16-12|       166|      41.5|
| 2018-15-12|        66|      16.5|
| 2018-14-12|        50|      12.5|
| 2018-13-12|        61|     15.25|
| 2018-12-12|        56|      14.0|
+-----------+----------+----------+
only showing top 20 rows
