In [1]:
!pip install pyspark
!pip install findspark

import findspark
findspark.init()


from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('CSV_Analysis') \
    .getOrCreate()

# Проверка версии Spark
print(f"Spark версия: {spark.version}")

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Spark версия: 4.0.1


In [7]:
!ls -la /content/Hotel.csv

ls: cannot access '/content/Hotel.csv': No such file or directory


In [9]:
# Загрузка данных

from google.colab import files
uploaded = files.upload()  # тут надо выбрать Hotel.csv, где он лежит на диске


logs_hotel = spark.read.csv('Hotel.csv',
                             header=True,
                             inferSchema=True)

print(f"Таблица logs_hotel создана. Строк: {logs_hotel.count()}")
logs_hotel.show(5,0)

Saving Hotel.csv to Hotel (1).csv
Таблица logs_hotel создана. Строк: 36275
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0

In [10]:
# Шаг 2. Создание таблицы calendar с датами от 2017-01-01 до 2018-12-31

from pyspark.sql import functions as F
from datetime import datetime, timedelta

# Создаем список всех дат
start_date = datetime(2017, 1, 1)
end_date = datetime(2018, 12, 31)
date_list = []

current_date = start_date
while current_date <= end_date:
    date_list.append((current_date.strftime('%Y-%m-%d'),))
    current_date += timedelta(days=1)

# Создаем Spark DataFrame
calendar = spark.createDataFrame(date_list, ['calendar_dt'])

# Проверка
print(f"Таблица calendar создана. Количество строк: {calendar.count()}")
calendar.show(5)
print("Последние 5 дат:")
calendar.tail(5)


Таблица calendar создана. Количество строк: 730
+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
| 2017-01-04|
| 2017-01-05|
+-----------+
only showing top 5 rows
Последние 5 дат:


[Row(calendar_dt='2018-12-27'),
 Row(calendar_dt='2018-12-28'),
 Row(calendar_dt='2018-12-29'),
 Row(calendar_dt='2018-12-30'),
 Row(calendar_dt='2018-12-31')]

In [12]:
# Шаг 3. Задание 1: Вычислить среднее количество ночей для подтвержденных бронирований
# с детализацией по месяцам и годам

# Фильтруем только подтвержденные бронирования (Not_Canceled)
# Суммируем weekend_nights и week_nights для общего количества ночей
avg_nights = logs_hotel.filter(F.col('status') == 'Not_Canceled') \
    .withColumn('total_nights', F.col('weekend_nights') + F.col('week_nights')) \
    .groupBy('year', 'month') \
    .agg(F.avg('total_nights').alias('avg_nights')) \
    .orderBy('year', 'month')

# Показываем результат
print("Среднее количество ночей для подтвержденных бронирований по месяцам и годам:")

avg_nights_rounded = avg_nights.withColumn('avg_nights', F.round('avg_nights', 2))
avg_nights_rounded.show(24, truncate=False)


Среднее количество ночей для подтвержденных бронирований по месяцам и годам:
+----+-----+----------+
|year|month|avg_nights|
+----+-----+----------+
|2017|7    |3.02      |
|2017|8    |2.72      |
|2017|9    |2.66      |
|2017|10   |2.7       |
|2017|11   |2.72      |
|2017|12   |3.04      |
|2018|1    |2.74      |
|2018|2    |2.69      |
|2018|3    |3.04      |
|2018|4    |2.92      |
|2018|5    |2.81      |
|2018|6    |2.6       |
|2018|7    |3.19      |
|2018|8    |3.15      |
|2018|9    |2.79      |
|2018|10   |2.89      |
|2018|11   |2.98      |
|2018|12   |3.25      |
+----+-----+----------+



In [13]:
# Шаг 3. Задание 2: Определить ТОП-3 месяца по проценту отмененных броней за 2018 год

# Фильтруем только 2018 год
# Считаем количество отмененных и общее количество бронирований по месяцам
cancellation_rate_2018 = logs_hotel.filter(F.col('year') == 2018) \
    .groupBy('month') \
    .agg(
        F.count('*').alias('total_bookings'),
        F.sum(F.when(F.col('status') == 'Canceled', 1).otherwise(0)).alias('canceled_bookings')
    ) \
    .withColumn('cancellation_rate',
                F.round((F.col('canceled_bookings') / F.col('total_bookings')) * 100, 2)) \
    .orderBy(F.col('cancellation_rate').desc()) \
    .limit(3)

print("ТОП-3 месяца по проценту отмененных броней за 2018 год:")
cancellation_rate_2018.show(truncate=False)


ТОП-3 месяца по проценту отмененных броней за 2018 год:
+-----+--------------+-----------------+-----------------+
|month|total_bookings|canceled_bookings|cancellation_rate|
+-----+--------------+-----------------+-----------------+
|8    |2799          |1303             |46.55            |
|10   |3404          |1578             |46.36            |
|9    |2962          |1356             |45.78            |
+-----+--------------+-----------------+-----------------+



In [14]:
# Шаг 3. Задание 3: Вычислить среднее время между бронированием и заездом
# для подтвержденных броней

# Вычисляем среднее время (lead_time) для подтвержденных бронирований
# с группировкой по месяцам и годам

avg_lead_time = logs_hotel \
    .filter(F.col('status') == 'Not_Canceled') \
    .groupBy('year', 'month') \
    .agg(F.avg('lead_time').alias('avg_lead_time')) \
    .withColumn('avg_lead_time', F.round('avg_lead_time', 2)) \
    .orderBy('year', 'month')

print("Среднее время между бронированием и заездом (в днях) для подтвержденных броней:")
avg_lead_time.show(24, truncate=False)


Среднее время между бронированием и заездом (в днях) для подтвержденных броней:
+----+-----+-------------+
|year|month|avg_lead_time|
+----+-----+-------------+
|2017|7    |130.73       |
|2017|8    |35.08        |
|2017|9    |51.72        |
|2017|10   |55.89        |
|2017|11   |33.28        |
|2017|12   |46.75        |
|2018|1    |34.87        |
|2018|2    |30.53        |
|2018|3    |43.19        |
|2018|4    |62.49        |
|2018|5    |60.99        |
|2018|6    |70.64        |
|2018|7    |86.88        |
|2018|8    |83.09        |
|2018|9    |63.32        |
|2018|10   |73.24        |
|2018|11   |44.25        |
|2018|12   |69.75        |
+----+-----+-------------+



In [15]:
# Шаг 3. Задание 4: Вычислить общую среднюю выручку на каждый месяц в каждом году,
# сгруппировав по всем типам бронирования для подтвержденных броней,
# и вывести это в виде сводной таблицы (PIVOT)

# Вычисляю среднюю выручку по месяцам, годам и типам бронирования
avg_revenue_pivot = logs_hotel \
    .filter(F.col('status') == 'Not_Canceled') \
    .groupBy('year', 'month', 'market_segment') \
    .agg(F.avg('avg_room_price').alias('avg_revenue')) \
    .groupBy('year', 'month') \
    .pivot('market_segment') \
    .agg(F.first('avg_revenue')) \
    .orderBy('year', 'month')

# Округляем значения для читаемости
for col_name in avg_revenue_pivot.columns:
    if col_name not in ['year', 'month']:
        avg_revenue_pivot = avg_revenue_pivot.withColumn(col_name, F.round(col_name, 2))

print("Средняя выручка по месяцам, годам и типам бронирования (PIVOT):")
avg_revenue_pivot.show(24, truncate=False)


Средняя выручка по месяцам, годам и типам бронирования (PIVOT):
+----+-----+--------+-------------+---------+-------+------+
|year|month|Aviation|Complementary|Corporate|Offline|Online|
+----+-----+--------+-------------+---------+-------+------+
|2017|7    |NULL    |7.47         |65.0     |88.19  |65.55 |
|2017|8    |NULL    |0.32         |66.92    |97.16  |97.86 |
|2017|9    |NULL    |7.37         |86.03    |96.8   |113.33|
|2017|10   |NULL    |0.55         |84.48    |89.08  |101.97|
|2017|11   |NULL    |4.94         |68.08    |66.18  |84.75 |
|2017|12   |NULL    |0.05         |69.03    |69.95  |87.31 |
|2018|1    |NULL    |1.73         |68.88    |73.97  |80.19 |
|2018|2    |88.0    |0.75         |74.98    |75.07  |87.5  |
|2018|3    |89.0    |16.2         |74.05    |72.64  |96.81 |
|2018|4    |97.57   |0.0          |80.0     |86.83  |105.49|
|2018|5    |101.0   |0.0          |100.9    |94.97  |123.91|
|2018|6    |95.0    |0.0          |84.83    |103.01 |119.3 |
|2018|7    |79.0    |

In [16]:
# Шаг 3. Задание 5: Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку,
# и показать их долю в общей выручке от постоянных гостей.
# Использовать уникальный идентификатор брони как уникальный идентификатор гостя (1 бронь = 1 гость)

from pyspark.sql.window import Window

# Считаем выручку по каждому ID (гостю) среди постоянных гостей
guest_revenue = logs_hotel \
    .filter(F.col('repeated_guest') == 1) \
    .groupBy('ID') \
    .agg(F.sum('avg_room_price').alias('total_revenue')) \
    .orderBy(F.col('total_revenue').desc())

# Считаем общую выручку от всех постоянных гостей
total_repeated_revenue = guest_revenue.agg(F.sum('total_revenue')).collect()[0][0]

# Берем ТОП-5 гостей
top5_guests = guest_revenue.limit(5)

# Добавляем колонку с долей в общей выручке
top5_guests_with_share = top5_guests \
    .withColumn('revenue_share_percent',
                F.round((F.col('total_revenue') / total_repeated_revenue) * 100, 2))

print(f"Общая выручка от постоянных гостей: {total_repeated_revenue}")
print("\nТОП-5 постоянных гостей по выручке:")
top5_guests_with_share.show(truncate=False)


Общая выручка от постоянных гостей: 60998.24999999999

ТОП-5 постоянных гостей по выручке:
+--------+-------------+---------------------+
|ID      |total_revenue|revenue_share_percent|
+--------+-------------+---------------------+
|INN09923|220.0        |0.36                 |
|INN35697|174.0        |0.29                 |
|INN25587|164.0        |0.27                 |
|INN35599|159.0        |0.26                 |
|INN06463|159.0        |0.26                 |
+--------+-------------+---------------------+



In [23]:
# Шаг 3. Задание 6: Вывести общее количество гостей на каждый день в отеле,
# отсортировав по убыванию дат, включая дни, когда отель пустует.
# Также рассчитать процент загрузки для каждого дня (вместимость отеля 400 человек).


# Фильтруем невалидные даты ДО преобразования
# 2018-02-29 не существует, отфильтруем такие строки, не могу по другому исправить это
checkin_data = logs_hotel \
    .filter(F.col('status') == 'Not_Canceled') \
    .filter(~((F.col('year') == 2018) & (F.col('month') == 2) & (F.col('date') == 29))) \
    .withColumn('checkin_date_str',
                F.concat_ws('-', F.col('year'),
                           F.lpad(F.col('month'), 2, '0'),
                           F.lpad(F.col('date'), 2, '0'))) \
    .withColumn('total_nights', F.col('weekend_nights') + F.col('week_nights')) \
    .withColumn('total_guests', F.col('n_adults') + F.col('n_children')) \
    .withColumn('checkin_date', F.to_date('checkin_date_str'))

# Создаем список всех дней для каждого бронирования
daily_occupancy = checkin_data \
    .withColumn('stay_dates',
                F.expr('sequence(checkin_date, date_add(checkin_date, total_nights - 1))')) \
    .withColumn('stay_date', F.explode('stay_dates')) \
    .groupBy('stay_date') \
    .agg(F.sum('total_guests').alias('total_guests_per_day')) \
    .withColumn('occupancy_percent',
                F.round((F.col('total_guests_per_day') / 400) * 100, 2))

# Преобразуем calendar и делаем join
calendar_dates = calendar.withColumn('date', F.to_date('calendar_dt'))

calendar_with_guests = calendar_dates \
    .join(daily_occupancy, calendar_dates.date == daily_occupancy.stay_date, 'left') \
    .select(
        F.col('date'),
        F.coalesce(F.col('total_guests_per_day'), F.lit(0)).alias('total_guests'),
        F.coalesce(F.col('occupancy_percent'), F.lit(0.0)).alias('occupancy_percent')
    ) \
    .orderBy(F.col('date').desc())

print("Общее количество гостей и процент загрузки на каждый день:")
calendar_with_guests.show(30, truncate=False)


Общее количество гостей и процент загрузки на каждый день:
+----------+------------+-----------------+
|date      |total_guests|occupancy_percent|
+----------+------------+-----------------+
|2018-12-31|562         |140.5            |
|2018-12-30|572         |143.0            |
|2018-12-29|542         |135.5            |
|2018-12-28|507         |126.75           |
|2018-12-27|552         |138.0            |
|2018-12-26|422         |105.5            |
|2018-12-25|397         |99.25            |
|2018-12-24|373         |93.25            |
|2018-12-23|341         |85.25            |
|2018-12-22|282         |70.5             |
|2018-12-21|247         |61.75            |
|2018-12-20|240         |60.0             |
|2018-12-19|228         |57.0             |
|2018-12-18|258         |64.5             |
|2018-12-17|274         |68.5             |
|2018-12-16|254         |63.5             |
|2018-12-15|170         |42.5             |
|2018-12-14|155         |38.75            |
|2018-12-13|153  