# Подключаемся к Spark

In [2]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .appName("Test_01")\
    .master("local")\
    .getOrCreate()

spark

In [3]:
from pyspark.sql import functions as f

# Шаг 1. Создать таблицу logs_hotel, используя csv-файл.

Читаем csv-файл и создаем датафрейм

In [4]:
logs_hotel = (
    spark.read
    .option("header", True)
    .option("sep", ",")
    .option("inferSchema", True)
    .csv("Hotel.csv")
)

Проверим первые 5 строк

In [5]:
logs_hotel.show(5, False)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65.0          |0      

Проверим структуру

In [6]:
logs_hotel.printSchema()

root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)



# Шаг 2. Создать (сгенерировать) таблицу calendar, который будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31.

Создаем датафрейм-календарь с днями 2017-01-01 - 2018-12-31 в формате ‘YYYY-DD-MM’ по условию

In [7]:
from datetime import date, timedelta

calendar_data = [
    ((date(2017, 1, 1) + timedelta(days=i)).strftime('%Y-%d-%m'),) for i in range((date(2018, 12, 31) - date(2017, 1, 1)).days + 1)
]

column = ["calendar_dt"]

calendar = spark.createDataFrame(calendar_data, column)

Проверим датафрейм

In [8]:
calendar.show(730)

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-02-01|
| 2017-03-01|
| 2017-04-01|
| 2017-05-01|
| 2017-06-01|
| 2017-07-01|
| 2017-08-01|
| 2017-09-01|
| 2017-10-01|
| 2017-11-01|
| 2017-12-01|
| 2017-13-01|
| 2017-14-01|
| 2017-15-01|
| 2017-16-01|
| 2017-17-01|
| 2017-18-01|
| 2017-19-01|
| 2017-20-01|
| 2017-21-01|
| 2017-22-01|
| 2017-23-01|
| 2017-24-01|
| 2017-25-01|
| 2017-26-01|
| 2017-27-01|
| 2017-28-01|
| 2017-29-01|
| 2017-30-01|
| 2017-31-01|
| 2017-01-02|
| 2017-02-02|
| 2017-03-02|
| 2017-04-02|
| 2017-05-02|
| 2017-06-02|
| 2017-07-02|
| 2017-08-02|
| 2017-09-02|
| 2017-10-02|
| 2017-11-02|
| 2017-12-02|
| 2017-13-02|
| 2017-14-02|
| 2017-15-02|
| 2017-16-02|
| 2017-17-02|
| 2017-18-02|
| 2017-19-02|
| 2017-20-02|
| 2017-21-02|
| 2017-22-02|
| 2017-23-02|
| 2017-24-02|
| 2017-25-02|
| 2017-26-02|
| 2017-27-02|
| 2017-28-02|
| 2017-01-03|
| 2017-02-03|
| 2017-03-03|
| 2017-04-03|
| 2017-05-03|
| 2017-06-03|
| 2017-07-03|
| 2017-08-03|
| 2017-09-03|
| 2017

# Шаг 3. Выполнить следующие запросы:

## Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

Применим фильтр по статусу бронирований - не отмененные.  

Используем группировку по месяцам и годам для детализации.   

Выполняем агрегацию - вычисляяем среднее кол-во ночей (в будние и выходные дни) и округлим до сотых для удобства, назовем эту колонку "average_nights".   

Сортируем результат по возрастанию года и месяца для наглядности.   

Выводим результат.  

In [9]:
logs_hotel\
    .filter(f.col("status") == "Not_Canceled")\
    .groupBy("year", "month")\
    .agg(f.round(
        f.avg(f.col("weekend_nights") + f.col("week_nights")), 2).alias("average_nights")
    )\
    .orderBy(f.asc("year"), f.asc(f.col("month")))\
    .select("average_nights", "month", "year")\
    .show()

+--------------+-----+----+
|average_nights|month|year|
+--------------+-----+----+
|          3.02|    7|2017|
|          2.72|    8|2017|
|          2.66|    9|2017|
|           2.7|   10|2017|
|          2.72|   11|2017|
|          3.04|   12|2017|
|          2.74|    1|2018|
|          2.69|    2|2018|
|          3.04|    3|2018|
|          2.92|    4|2018|
|          2.81|    5|2018|
|           2.6|    6|2018|
|          3.19|    7|2018|
|          3.15|    8|2018|
|          2.79|    9|2018|
|          2.89|   10|2018|
|          2.98|   11|2018|
|          3.25|   12|2018|
+--------------+-----+----+



## Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.

Применим фильтр по году - 2018.  

Используем группировку по месяцам и годам для детализации.   

Выполняем агрегацию - вычисляяем общее число бронирований "count_total" и число отмененных бронирований "count_cancelled".   

Вычисляем процент отмененных бронирований "cancel_perc" и округляем до 2 знаков после точки для "красивого" вывода.

Сортируем результат по убыванию "cancel_perc".

Оставляем ТОП-3.

Выводим результат.  

In [10]:
logs_hotel\
    .filter(f.col("year") == 2018)\
    .groupBy("year", "month")\
    .agg(
        f.count("*").alias("count_total"),
        f.sum(f.when(f.col("status") == "Canceled", 1).otherwise(0)).alias("count_canceled")
    )\
    .withColumn("cancel_perc", f.round(f.col("count_canceled") * 100 / f.col("count_total"), 2))\
    .orderBy(f.desc("cancel_perc"))\
    .limit(3)\
    .select("year", "month", "cancel_perc")\
    .show()

+----+-----+-----------+
|year|month|cancel_perc|
+----+-----+-----------+
|2018|    8|      46.55|
|2018|   10|      46.36|
|2018|    9|      45.78|
+----+-----+-----------+



## Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.

Применим фильтр по статусу бронирования - "не отмененные".  

Используем группировку по месяцам и годам для детализации.   

Выполняем агрегацию - для каждой группы вычисляем среднее значение "lead_time" и округляем до 2 знаков после точки.   

Сортируем результат по возрастанию года и месяца.

Выводим результат.  

In [11]:
logs_hotel\
    .filter(f.col("status") == "Not_Canceled")\
    .groupBy("year", "month")\
    .agg(
        f.round(f.avg(f.col("lead_time")), 2).alias("avg_time")
    )\
    .orderBy(f.asc("year"), f.asc(f.col("month")))\
    .select("year", "month", "avg_time")\
    .show()

+----+-----+--------+
|year|month|avg_time|
+----+-----+--------+
|2017|    7|  130.73|
|2017|    8|   35.08|
|2017|    9|   51.72|
|2017|   10|   55.89|
|2017|   11|   33.28|
|2017|   12|   46.75|
|2018|    1|   34.87|
|2018|    2|   30.53|
|2018|    3|   43.19|
|2018|    4|   62.49|
|2018|    5|   60.99|
|2018|    6|   70.64|
|2018|    7|   86.88|
|2018|    8|   83.09|
|2018|    9|   63.32|
|2018|   10|   73.24|
|2018|   11|   44.25|
|2018|   12|   69.75|
+----+-----+--------+



## Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).

Применим фильтр по статусу бронирования - "не отмененные".

Рассчитываем выручку для каждого бронирования.

Группируем по году, месяцу и сегменту рынка.

Выполняем агрегацию - вычисляяем для каждой группы среднюю месячную выручку и округляем до 2 знаков после точки для "красивого" вывода.

Создаем сводную таблицу PIVOT, в которой столбцы - "market_segment", а значения - средняя выручка.

Отстуствующие значения заполним нулями.

Выводим результат.  

In [12]:
logs_hotel\
    .filter(f.col("status") == "Not_Canceled")\
    .withColumn("rev", f.col("avg_room_price") * ((f.col("weekend_nights") + f.col("week_nights"))))\
    .groupBy("year", "month", "market_segment")\
    .agg(
        f.round(f.avg("rev"), 2).alias("avg_monthly_rev")
    )\
    .groupBy("year", "month")\
    .pivot("market_segment")\
    .agg(
        f.first("avg_monthly_rev")
    )\
    .orderBy("year", "month")\
    .fillna(0)\
    .show()

+----+-----+--------+-------------+---------+-------+------+
|year|month|Aviation|Complementary|Corporate|Offline|Online|
+----+-----+--------+-------------+---------+-------+------+
|2017|    7|     0.0|         22.4|   113.75| 228.95|290.56|
|2017|    8|     0.0|         0.32|   156.42| 235.54|284.21|
|2017|    9|     0.0|        16.89|   177.83| 236.65|348.55|
|2017|   10|     0.0|         1.09|   180.26| 223.24|311.47|
|2017|   11|     0.0|        14.81|   102.97| 198.36|240.52|
|2017|   12|     0.0|         0.25|   141.11| 253.86|258.93|
|2018|    1|     0.0|         2.27|   113.03| 210.51|236.09|
|2018|    2|   352.0|         1.39|   115.06| 251.85|238.07|
|2018|    3|  118.33|        38.17|   142.39| 233.39|301.71|
|2018|    4|  321.81|          0.0|   108.42| 236.44|320.08|
|2018|    5|   262.5|          0.0|    229.5| 274.55|352.34|
|2018|    6|   247.0|          0.0|   148.13| 251.98|335.03|
|2018|    7|    79.0|         5.38|   146.99| 310.36|390.05|
|2018|    8|     0.0|   

## Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.

Применим фильтр по статусу бронирования - "не отмененные" и отфильтруем постоянных гостей, где "repeated_guest" == 1.

Рассчитываем выручку для каждого бронирования rep_guests_rev.

Группируем по ID бронирования, предположив, что 1 бронь = 1 гость.

Вычисляем общую выручку от всех постоянных гостей total_rev.

Для ТОП-5 гостей с наибольшей выручкой top_5 рассчитываем доли в общей выручке в %.

Выводим результат.

In [14]:
rep_guests_rev = logs_hotel\
    .filter((f.col("status") == "Not_Canceled") & (f.col("repeated_guest") == 1))\
    .withColumn("rev", f.col("avg_room_price") * (f.col("weekend_nights") + f.col("week_nights")))\
    .groupBy("ID")\
    .agg(f.sum("rev").alias("total_rev"))

total_rev = rep_guests_rev\
    .agg(f.sum("total_rev").alias("total"))\
    .collect()[0]["total"]

top_5 = rep_guests_rev\
    .orderBy(f.col("total_rev").desc())\
    .limit(5)\
    .withColumn("share_percent", f.round(f.col("total_rev") / total_rev * 100, 2))

top_5.select(
    "ID",
    f.round("total_rev", 2).alias("revenue"),
    "share_percent"
).show()

+--------+-------+-------------+
|      ID|revenue|share_percent|
+--------+-------+-------------+
|INN19235| 1754.4|         1.55|
|INN05222|  690.0|         0.61|
|INN14189|  665.0|         0.59|
|INN09923|  660.0|         0.58|
|INN25479|  650.0|         0.57|
+--------+-------+-------------+



## Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

guests_per_day - применим фильтр по статусу бронирования - "не отмененные".

Создаем колонку с датой привыбания, используя колонки year, month, date + обрабатыаем некорректные даты.

Рассчитываем общее кол-во ночей и гостей (взрослые + дети).

Для каждого дня пребывания генерируем запись с помощью "explode(sequence())"

Группируем по дате и суммируем оббщее кол-во гостей.

Объединяем результат с ранее созданной таблицей calendar, чтобы вколючить дни без гостей.

Рассчитываем процент загрузки отеля относительно максимальной вместимости 400.

Сортируем по дате в подярке убывания.

Выводим результат.

In [24]:
guests_per_day = logs_hotel\
    .filter(f.col("status") == "Not_Canceled")\
    .withColumn("arrival_date", f.expr("try_cast(concat(year, '-', month, '-', date) as date)"))\
    .filter(f.col("arrival_date").isNotNull())\
    .withColumn("nights", f.col("weekend_nights") + f.col("week_nights"))\
    .filter(f.col("nights") > 0)\
    .withColumn("guests", f.col("n_adults") + f.col("n_children"))\
    .select(
        f.expr("explode(sequence(arrival_date, date_add(arrival_date, nights-1)))").alias("date"),
        "guests"
    )\
    .groupBy("date")\
    .agg(f.sum("guests").alias("total_guests"))

result = calendar\
    .withColumn("date", f.to_date("calendar_dt", "yyyy-dd-MM"))\
    .join(guests_per_day, "date", "left")\
    .select(
        "calendar_dt",
        f.coalesce("total_guests", f.lit(0)).alias("total_guests"),
        f.round(f.coalesce("total_guests", f.lit(0)) / 400 * 100, 2).alias("occupancy_percent")
    )\
    .orderBy("calendar_dt", ascending=False)

result.show()

+-----------+------------+-----------------+
|calendar_dt|total_guests|occupancy_percent|
+-----------+------------+-----------------+
| 2018-31-12|         562|            140.5|
| 2018-31-10|         389|            97.25|
| 2018-31-08|         345|            86.25|
| 2018-31-07|         202|             50.5|
| 2018-31-05|         302|             75.5|
| 2018-31-03|         357|            89.25|
| 2018-31-01|         145|            36.25|
| 2018-30-12|         572|            143.0|
| 2018-30-11|         138|             34.5|
| 2018-30-10|         373|            93.25|
| 2018-30-09|         211|            52.75|
| 2018-30-08|         346|             86.5|
| 2018-30-07|         201|            50.25|
| 2018-30-06|         379|            94.75|
| 2018-30-05|         251|            62.75|
| 2018-30-04|         353|            88.25|
| 2018-30-03|         350|             87.5|
| 2018-30-01|         178|             44.5|
| 2018-29-12|         542|            135.5|
| 2018-29-