In [50]:
import os
import findspark

In [5]:
# Установка OpenJDK 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Скачивание Apache Spark
! wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

# Распаковка архива
! tar xf spark-3.3.1-bin-hadoop3.tgz

# Установка findspark
! pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [6]:
# Настройка переменных среды

# Путь к установленной Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Путь к распакованному Spark
os.environ["SPARK_HOME"] = "/content/spark-3.3.1-bin-hadoop3"

# Инициализация findspark
findspark.init()

In [51]:
# Типы данных
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
# Функции
from pyspark.sql.functions import (
    col, explode, sequence, to_date,
    expr, when, sum as _sum,
    make_date,
    round, avg, format_number
)

# Инициализация Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").appName("ColabSpark").getOrCreate()

# Проверяем, что сессия запущена
spark

In [52]:
# Шаг 1. Создать таблицу, используя csv-файл.
# Задаю схему
hotel_schema = StructType([
    StructField("ID", StringType(), True), # ID часто лучше держать строкой, если нет арифметики
    StructField("n_adults", IntegerType(), True),
    StructField("n_children", IntegerType(), True),
    StructField("weekend_nights", IntegerType(), True),
    StructField("week_nights", IntegerType(), True),
    StructField("meal_plan", StringType(), True),
    StructField("car_parking_space", IntegerType(), True), # 0 или 1
    StructField("room_type", StringType(), True),
    StructField("lead_time", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("date", IntegerType(), True),
    StructField("market_segment", StringType(), True),
    StructField("repeated_guest", IntegerType(), True), # 0 или 1
    StructField("previous_cancellations", IntegerType(), True),
    StructField("previous_bookings_not_canceled", IntegerType(), True),
    StructField("avg_room_price", DoubleType(), True), # Цена требует точности (Double)
    StructField("special_requests", IntegerType(), True),
    StructField("status", StringType(), True)
])

#  Чтение CSV с применением схемы
file_path = "Hotel.csv"

try:
    df_hotel = spark.read.csv(
        file_path,
        header=True,
        sep=",",
        schema=hotel_schema
    )

    # View
    df_hotel.createOrReplaceTempView("logs_hotel")

    print("Таблица logs_hotel успешно создана и загружена")

except Exception as e:
    print(f"Ошибка при чтении файла: {e}")

Таблица logs_hotel успешно создана и загружена


In [53]:
# Шаг 2. Создать (сгенерировать) таблицу calendar, который будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31.
# Генерация календаря
df_calendar = spark.sql("""
    SELECT
      explode(
            sequence(
                to_date('2017-01-01'),
                to_date('2018-12-31'),
                interval 1 day
            )
        ) as calendar_dt
""")

# View
df_calendar.createOrReplaceTempView("calendar")

In [54]:
# Расширенное представление с вычисляемыми полями
# Собираю дату из года, месяца и дня
spark.sql("""
CREATE OR REPLACE TEMP VIEW hotel_enriched AS
SELECT
    *,
    -- Собираем полноценную дату прибытия
    make_date(year, month, date) as arrival_date_obj,

    -- Общее количество ночей
    (week_nights + weekend_nights) as total_nights,

    -- Выручка за бронирование (цена за ночь * кол-во ночей)
    (avg_room_price * (week_nights + weekend_nights)) as booking_revenue,

    -- Общее количество гостей
    (n_adults + n_children) as total_guests
FROM logs_hotel
""").show(1)
print("Представление hotel_enriched создано")

++
||
++
++

Представление hotel_enriched создано


In [55]:
# 1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)
q1 = spark.sql("""
    SELECT
        year,
        month,
        ROUND(AVG(total_nights), 2) as avg_nights
    FROM hotel_enriched
    WHERE status != 'Canceled'
    GROUP BY year, month
    ORDER BY year, month
""")
q1.show(10)

+----+-----+----------+
|year|month|avg_nights|
+----+-----+----------+
|2017|    7|      3.02|
|2017|    8|      2.72|
|2017|    9|      2.66|
|2017|   10|       2.7|
|2017|   11|      2.72|
|2017|   12|      3.04|
|2018|    1|      2.74|
|2018|    2|      2.69|
|2018|    3|      3.04|
|2018|    4|      2.92|
+----+-----+----------+
only showing top 10 rows



In [56]:
# 2.Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.
q2 = spark.sql("""
    SELECT
        month,
        COUNT(*) as total_bookings,
        SUM(CASE WHEN status = 'Canceled' THEN 1 ELSE 0 END) as canceled_bookings,
        ROUND(
            (SUM(CASE WHEN status = 'Canceled' THEN 1 ELSE 0 END) / COUNT(*)) * 100,
        2) as cancellation_rate_percent
    FROM hotel_enriched
    WHERE year = 2018
    GROUP BY month
    ORDER BY cancellation_rate_percent DESC
    LIMIT 3
""")
q2.show()

+-----+--------------+-----------------+-------------------------+
|month|total_bookings|canceled_bookings|cancellation_rate_percent|
+-----+--------------+-----------------+-------------------------+
|    8|          2799|             1303|                    46.55|
|   10|          3404|             1578|                    46.36|
|    9|          2962|             1356|                    45.78|
+-----+--------------+-----------------+-------------------------+



In [45]:
# 3.Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.
q3 = spark.sql("""
    SELECT
        year,
        month,
        ROUND(AVG(lead_time), 1) as avg_lead_time_days
    FROM hotel_enriched
    WHERE status != 'Canceled'
    GROUP BY year, month
    ORDER BY year, month
""")
q3.show(10)

+----+-----+------------------+
|year|month|avg_lead_time_days|
+----+-----+------------------+
|2017|    7|             130.7|
|2017|    8|              35.1|
|2017|    9|              51.7|
|2017|   10|              55.9|
|2017|   11|              33.3|
|2017|   12|              46.7|
|2018|    1|              34.9|
|2018|    2|              30.5|
|2018|    3|              43.2|
|2018|    4|              62.5|
+----+-----+------------------+
only showing top 10 rows



In [57]:
# 4.Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT)

# Фильтруем подтвержденные бронирования
df_pivot = spark.table("hotel_enriched").filter("status != 'Canceled'")

# Группируем, пивотим по сегменту рынка и считаем среднюю выручку
pivot_table_rounded = df_pivot.groupBy("year", "month") \
    .pivot("market_segment") \
    .agg(round(avg("booking_revenue"), 2).alias("avg_rounded_revenue")) \
    .orderBy("year", "month")
# Вывод
pivot_table_rounded.show()

+----+-----+--------+-------------+---------+-------+------+
|year|month|Aviation|Complementary|Corporate|Offline|Online|
+----+-----+--------+-------------+---------+-------+------+
|2017|    7|    null|         22.4|   113.75| 228.95|290.56|
|2017|    8|    null|         0.32|   156.42| 235.54|284.21|
|2017|    9|    null|        16.89|   177.83| 236.65|348.55|
|2017|   10|    null|         1.09|   180.26| 223.24|311.47|
|2017|   11|    null|        14.81|   102.97| 198.36|240.52|
|2017|   12|    null|         0.25|   141.11| 253.86|258.93|
|2018|    1|    null|         2.27|   113.03| 210.51|236.09|
|2018|    2|   352.0|         1.39|   115.06| 251.85|238.07|
|2018|    3|  118.33|        38.17|   142.39| 233.39|301.71|
|2018|    4|  321.81|          0.0|   108.42| 236.44|320.08|
|2018|    5|   262.5|          0.0|    229.5| 274.55|352.34|
|2018|    6|   247.0|          0.0|   148.13| 251.98|335.03|
|2018|    7|    79.0|         5.38|   146.99| 310.36|390.05|
|2018|    8|    null|   

In [58]:
# 5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.
q5_modified = spark.sql("""
    WITH repeated_guests_stats AS (
        SELECT
            ID,
            ROUND(booking_revenue, 2) as booking_revenue_rounded
        FROM hotel_enriched
        WHERE repeated_guest = 1 AND status != 'Canceled'
    ),

    aggregated_guests AS (
        SELECT
            ID,
            SUM(booking_revenue_rounded) as total_guest_revenue
        FROM repeated_guests_stats
        GROUP BY ID
        ORDER BY total_guest_revenue DESC
        LIMIT 5
    ),

    total_repeated_revenue AS (
        SELECT SUM(ROUND(booking_revenue, 2)) as total_rev
        FROM hotel_enriched
        WHERE repeated_guest = 1 AND status != 'Canceled'
    )

    SELECT
        t.ID,
        t.total_guest_revenue,
        ROUND((t.total_guest_revenue / tr.total_rev) * 100, 2) as revenue_share_percent
    FROM aggregated_guests t
    CROSS JOIN total_repeated_revenue tr
    ORDER BY t.total_guest_revenue DESC
""")
q5_modified.show()

+--------+-------------------+---------------------+
|      ID|total_guest_revenue|revenue_share_percent|
+--------+-------------------+---------------------+
|INN19235|             1754.4|                 1.55|
|INN05222|              690.0|                 0.61|
|INN14189|              665.0|                 0.59|
|INN09923|              660.0|                 0.58|
|INN25479|              650.0|                 0.57|
+--------+-------------------+---------------------+



In [59]:
# 6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.
q6 = spark.sql("""
    WITH booking_dates AS (
        SELECT
            arrival_date_obj as start_date,
            -- Дата выезда = Дата заезда + ночи
            date_add(arrival_date_obj, total_nights) as end_date,
            total_guests
        FROM hotel_enriched
        WHERE status != 'Canceled'
    ),
    daily_stats AS (
        SELECT
            c.calendar_dt,
            -- Если бронь попадает на дату календаря (не включая день выезда) считаем гостей
            SUM(COALESCE(b.total_guests, 0)) as total_guests_on_day
        FROM calendar c
        LEFT JOIN booking_dates b
            ON c.calendar_dt >= b.start_date
            AND c.calendar_dt < b.end_date -- Гость не занимает место в ночь выезда
        GROUP BY c.calendar_dt
    )

    SELECT
        calendar_dt,
        total_guests_on_day,
        ROUND((total_guests_on_day / 400) * 100, 2) as occupancy_percent
    FROM daily_stats
    ORDER BY calendar_dt DESC
""")
q6.show(10)

+-----------+-------------------+-----------------+
|calendar_dt|total_guests_on_day|occupancy_percent|
+-----------+-------------------+-----------------+
| 2018-12-31|                562|            140.5|
| 2018-12-30|                572|            143.0|
| 2018-12-29|                542|            135.5|
| 2018-12-28|                507|           126.75|
| 2018-12-27|                552|            138.0|
| 2018-12-26|                422|            105.5|
| 2018-12-25|                397|            99.25|
| 2018-12-24|                373|            93.25|
| 2018-12-23|                341|            85.25|
| 2018-12-22|                282|             70.5|
+-----------+-------------------+-----------------+
only showing top 10 rows

