# Домашнее задание 5. Анализ данных отеля на Spark

## Установка окружения

In [None]:
# Установка библиотек
!pip install pyspark==3.5.1  # для Java 11.0.29, python 3.10.6
!pip install findspark

In [None]:
# Импорт библиотек
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, IntegerType
from pyspark.sql.window import Window

# Инициализация сессии Spark
spark = SparkSession.builder.appName('hotel_analysis').getOrCreate()

## Шаг 1: Загрузка данных из Hotel.csv

In [36]:
# Читаем CSV файл
df_hotel = spark.read \
    .option('header', True) \
    .option('inferSchema', True) \
    .csv('Hotel.csv')

# Проверяем структуру данных
print('Схема таблицы hotel:')
df_hotel.printSchema()

Схема таблицы hotel:
root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)



## Шаг 2: Создание таблицы calendar с датами от 2017-01-01 до 2018-12-31

In [6]:
from pyspark.sql.functions import sequence, to_date, lit, explode, expr

# Создание календарной таблицы
df_calendar = spark.range(1).select(
    explode(
        sequence(
            to_date(lit('2017-01-01')),
            to_date(lit('2018-12-31')),
            expr('interval 1 day')
        )
    ).alias('calendar_dt')
)

df_calendar.show(10, False)
print(f'(Календарь содержит {df_calendar.count()} дней)')


+-----------+
|calendar_dt|
+-----------+
|2017-01-01 |
|2017-01-02 |
|2017-01-03 |
|2017-01-04 |
|2017-01-05 |
|2017-01-06 |
|2017-01-07 |
|2017-01-08 |
|2017-01-09 |
|2017-01-10 |
+-----------+
only showing top 10 rows

(Календарь содержит 730 дней)


## Запрос 1: Среднее количество ночей для подтвержденных бронирований (по месяцам и годам)

In [19]:
# Запрос 1: Среднее количество проведенных в отеле ночей по годам и месяцам, подтвуржденных бронью!
query1 = df_hotel \
    .filter(F.col('status') == 'Not_Canceled') \
    .withColumn('total_nights', F.col('weekend_nights') + F.col('week_nights')) \
    .groupBy('year', 'month') \
    .agg(F.round(F.avg('total_nights'), 2).alias('average_nights')) \
    .orderBy('year', 'month')
print('Запрос 1: Среднее количество проведенных в отеле ночей по годам и месяцам\n(для подтвуржденных бронью!)')
query1.show(30, False)

Запрос 1: Среднее количество проведенных в отеле ночей по годам и месяцам
(для подтвуржденных бронью!)
+----+-----+--------------+
|year|month|average_nights|
+----+-----+--------------+
|2017|7    |3.02          |
|2017|8    |2.72          |
|2017|9    |2.66          |
|2017|10   |2.7           |
|2017|11   |2.72          |
|2017|12   |3.04          |
|2018|1    |2.74          |
|2018|2    |2.69          |
|2018|3    |3.04          |
|2018|4    |2.92          |
|2018|5    |2.81          |
|2018|6    |2.6           |
|2018|7    |3.19          |
|2018|8    |3.15          |
|2018|9    |2.79          |
|2018|10   |2.89          |
|2018|11   |2.98          |
|2018|12   |3.25          |
+----+-----+--------------+



## Запрос 2: ТОП-3 месяца по проценту отмены за 2018 год

In [14]:
# Запрос 2: ТОП-3 месяца по проценту отмененных броней за 2018 год
query2 = df_hotel \
    .filter(F.col('year') == 2018) \
    .withColumn('is_canceled', (F.col('status') == 'Canceled').cast(IntegerType())) \
    .groupBy('month') \
    .agg(
        F.sum('is_canceled').alias('canceled_count'),
        F.count('*').alias('total_count')
    ) \
    .withColumn('cancel_percent', F.round((F.col('canceled_count') / F.col('total_count')) * 100, 2)) \
    .select('month', 'canceled_count', 'total_count', 'cancel_percent') \
    .orderBy(F.desc('cancel_percent')) \
    .limit(3)

print('Запрос 2: ТОП-3 месяца по проценту отмененных броней за 2018 год')
query2.show()

Запрос 2: ТОП-3 месяца по проценту отмененных броней за 2018 год
+-----+--------------+-----------+--------------+
|month|canceled_count|total_count|cancel_percent|
+-----+--------------+-----------+--------------+
|    8|          1303|       2799|         46.55|
|   10|          1578|       3404|         46.36|
|    9|          1356|       2962|         45.78|
+-----+--------------+-----------+--------------+



## Запрос 3: Среднее время за каждый месяц между бронированием и заездом (для подтвержденных броней)

In [18]:
# Запрос 3: Среднее время за каждый месяц между бронированием и заездом (для подтвержденных броней)
query3 = df_hotel \
    .filter(F.col('status') == 'Not_Canceled') \
    .groupBy('month') \
    .agg(F.round(F.avg('lead_time'), 2).alias('avg_lead_time')) \
    .orderBy('month')

print('Запрос 3: Среднее время за каждый месяц между бронированием и заездом\n(для подтвержденных броней)')
query3.show()

Запрос 3: Среднее время за каждый месяц между бронированием и заездом
(для подтвержденных броней)
+-----+-------------+
|month|avg_lead_time|
+-----+-------------+
|    1|        34.87|
|    2|        30.53|
|    3|        43.19|
|    4|        62.49|
|    5|        60.99|
|    6|        70.64|
|    7|        90.16|
|    8|        65.97|
|    9|        57.78|
|   10|        65.11|
|   11|        41.02|
|   12|        61.79|
+-----+-------------+



## Запрос 4: Средняя выручка по месяцам и годам по типам бронирования (для подтвержденных броней)

In [23]:
# Запрос 4: Средняя выручка по месяцам и годам по типам бронирования (для подтвержденных броней)
query4 = df_hotel \
    .filter(F.col('status') == 'Not_Canceled') \
    .groupBy('year', 'month') \
    .pivot('market_segment') \
    .agg(F.round(F.avg('avg_room_price'), 2)) \
    .orderBy('year', 'month')

print('Запрос 4: Средняя выручка по месяцам и годам по типам бронирования\n(для подтвержденных броней)')
query4.show(50, False)

Запрос 4: Средняя выручка по месяцам и годам по типам бронирования
(для подтвержденных броней)
+----+-----+--------+-------------+---------+-------+------+
|year|month|Aviation|Complementary|Corporate|Offline|Online|
+----+-----+--------+-------------+---------+-------+------+
|2017|7    |NULL    |7.47         |65.0     |88.19  |65.55 |
|2017|8    |NULL    |0.32         |66.92    |97.16  |97.86 |
|2017|9    |NULL    |7.37         |86.03    |96.8   |113.33|
|2017|10   |NULL    |0.55         |84.48    |89.08  |101.97|
|2017|11   |NULL    |4.94         |68.08    |66.18  |84.75 |
|2017|12   |NULL    |0.05         |69.03    |69.95  |87.31 |
|2018|1    |NULL    |1.73         |68.88    |73.97  |80.19 |
|2018|2    |88.0    |0.75         |74.98    |75.07  |87.5  |
|2018|3    |89.0    |16.2         |74.05    |72.64  |96.81 |
|2018|4    |97.57   |0.0          |80.0     |86.83  |105.49|
|2018|5    |101.0   |0.0          |100.9    |94.97  |123.91|
|2018|6    |95.0    |0.0          |84.83    |103.01

## Запрос 5: ТОП-5 постоянных гостей с наибольшей выручкой и их доля в общей выручке

In [25]:
# Запрос 5: ТОП-5 постоянных гостей с наибольшей выручкой и их доля в общей выручке

# выручка = avg_room_price * (weekend_nights + week_nights)
query5_base = df_hotel \
    .filter(F.col('repeated_guest') == 1) \
    .withColumn('revenue', F.col('avg_room_price') * (F.col('weekend_nights') + F.col('week_nights'))) \
    .groupBy('ID') \
    .agg(F.round(F.sum('revenue'), 2).alias('total_revenue'))

# Общая выручка постоянных гостей
total_revenue = query5_base.agg(F.sum('total_revenue')).collect()[0][0]

# ТОП-5 и их доли
query5 = query5_base \
    .withColumn('total_rev_const', F.lit(total_revenue)) \
    .withColumn('share_percent', F.round((F.col('total_revenue') / F.col('total_rev_const')) * 100, 2)) \
    .select('ID', 'total_revenue', 'share_percent') \
    .orderBy(F.desc('total_revenue')) \
    .limit(5)

print('Запрос 5: ТОП-5 постоянных гостей с наибольшей выручкой и их доля в общей выручке')
query5.show()

Запрос 5: ТОП-5 постоянных гостей с наибольшей выручкой и их доля в общей выручке
+--------+-------------+-------------+
|      ID|total_revenue|share_percent|
+--------+-------------+-------------+
|INN19235|       1754.4|         1.51|
|INN05222|        690.0|         0.59|
|INN14189|        665.0|         0.57|
|INN09923|        660.0|         0.57|
|INN25479|        650.0|         0.56|
+--------+-------------+-------------+



## Запрос 6: Количество гостей и процент загрузки на каждый день (включая пустые дни и при вместимости отеля в 400 мест)

In [None]:
# Запрос 6: Количество гостей и процент загрузки на каждый день, включая пустые дни
# (при вместимости отеля в 400 мест)
df_stay = df_hotel \
    .filter(F.col('status') == 'Not_Canceled') \
    .select(
        'ID',
        F.col('n_adults').cast(IntegerType()).alias('n_adults'),
        F.col('n_children').cast(IntegerType()).alias('n_children'),
        F.col('weekend_nights').cast(IntegerType()),
        F.col('week_nights').cast(IntegerType()),
        'year', 'month', 'date'
    ) \
    .withColumn('check_in_date', 
        F.to_date(
            F.concat_ws('-', F.col('year'), 
                F.lpad(F.col('month'), 2, '0'),
                F.lpad(F.col('date'), 2, '0')
            ),
            'yyyy-MM-dd'
        )
    ) \
    .withColumn('total_nights', F.col('weekend_nights') + F.col('week_nights')) \
    .withColumn('guests_count', F.col('n_adults') + F.col('n_children'))

# Последовательность дат для каждого бронирования
df_exploded = df_stay \
    .select(
        'ID',
        'guests_count',
        'check_in_date',
        'total_nights',
        F.explode(
            F.sequence(
                F.col('check_in_date'),
                F.date_add(F.col('check_in_date'), F.col('total_nights') - 1)
            )
        ).alias('stay_date')
    ) \
    .select('stay_date', 'guests_count')

# Группировка по датам, подсчет гостей
df_daily_guests = df_exploded \
    .groupBy('stay_date') \
    .agg(F.sum('guests_count').alias('daily_guests')) \
    .select(F.col('stay_date').alias('calendar_dt'), 'daily_guests')

# Объединяем с календарем, чтобы включить пустые дни
query6 = df_calendar \
    .join(df_daily_guests, 'calendar_dt', how='left_outer') \
    .fillna(0, subset=['daily_guests']) \
    .withColumn('occupancy_percent', 
        F.round((F.col('daily_guests') / 400) * 100, 2)
    ) \
    .select('calendar_dt', 'daily_guests', 'occupancy_percent') \
    .orderBy(F.desc('calendar_dt'))

print('Запрос 6: Количество гостей и процент загрузки на каждый день, включая пустые дни\n(при вместимости отеля в 400 мест)')
query6.show(15, False)


Запрос 6: Количество гостей и процент загрузки на каждый день, включая пустые дни
(при вместимости отеля в 400 мест)
+-----------+------------+-----------------+
|calendar_dt|daily_guests|occupancy_percent|
+-----------+------------+-----------------+
|2018-12-31 |562         |140.5            |
|2018-12-30 |572         |143.0            |
|2018-12-29 |542         |135.5            |
|2018-12-28 |507         |126.75           |
|2018-12-27 |552         |138.0            |
|2018-12-26 |422         |105.5            |
|2018-12-25 |397         |99.25            |
|2018-12-24 |373         |93.25            |
|2018-12-23 |341         |85.25            |
|2018-12-22 |282         |70.5             |
|2018-12-21 |247         |61.75            |
|2018-12-20 |240         |60.0             |
|2018-12-19 |228         |57.0             |
|2018-12-18 |258         |64.5             |
|2018-12-17 |274         |68.5             |
+-----------+------------+-----------------+
only showing top 15 rows

