In [1]:
pip install -q pyspark

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Hotel Analysis - Pure PySpark") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

print("Сессия создана")

Сессия создана


In [3]:
from google.colab import files

uploaded = files.upload()

Saving Hotel.csv to Hotel (2).csv


In [16]:
from pyspark.sql.types import *

hotel_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("Hotel.csv")

# Проверяем типы и первые строки
hotel_df.printSchema()

hotel_df.show(5)



root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------

In [18]:
from datetime import datetime, timedelta
from pyspark.sql import Row

# Диапазон дат: с 2017-01-01 по 2018-12-31
start = datetime.strptime("2017-01-01", "%Y-%m-%d")
end = datetime.strptime("2018-12-31", "%Y-%m-%d")

# Генерируем список объектов Row
date_rows = []
current = start
while current <= end:
    date_rows.append(Row(calendar_dt=current.date()))  # Row с полем calendar_dt
    current += timedelta(days=1)

# Создаём RDD, затем преобразуем в DataFrame
calendar_rdd = spark.sparkContext.parallelize(date_rows)
calendar_df = spark.createDataFrame(calendar_rdd)

# Регистрируем как временный view (чтобы можно было использовать в будущем с SQL, если понадобится)
calendar_df.createOrReplaceTempView("calendar")

print("Таблица calendar создана. Первые 5 строк:")
calendar_df.show(5)

print(f"Общее количество дней: {calendar_df.count()}")

Таблица calendar создана. Первые 5 строк:
+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
| 2017-01-04|
| 2017-01-05|
+-----------+
only showing top 5 rows
Общее количество дней: 730


In [19]:
from pyspark.sql.functions import col, concat, lit, to_date, year, month, when

# Удаляем только строку с 29 февраля 2018
hotel_clean = hotel_df.filter(
    ~((col("year") == 2018) & (col("month") == 2) & (col("date") == 29))
)


In [20]:
from pyspark.sql.functions import col, concat, lit, to_date

# Создаём колонку checkin_date в hotel_clean
hotel_clean = hotel_clean \
    .withColumn(
        "checkin_date",
        to_date(concat(col("year"), lit("-"), col("month"), lit("-"), col("date")), "yyyy-M-d")
    )

# Проверим, что колонка появилась
print("Схема после добавления checkin_date:")
hotel_clean.printSchema()

print("Пример данных:")
hotel_clean.select("year", "month", "date", "checkin_date").show(5)

Схема после добавления checkin_date:
root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- checkin_date: date (nullable = true)

Пример данных:
+----+-----+----+------------+
|year|month|date|checkin_date|

**1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)**

In [21]:
from pyspark.sql.functions import col, when, sum as spark_sum, count, avg, concat, lit, to_date, year, month

# Только подтверждённые брони
hotel_confirmed = hotel_clean.filter(col("status") == "Not_Canceled")

# Считаем ночи
hotel_with_nights = hotel_confirmed \
    .withColumn("total_nights", col("weekend_nights") + col("week_nights"))

# Создаём дату
hotel_with_checkin = hotel_with_nights \
    .withColumn(
        "checkin_date",
        to_date(concat(col("year"), lit("-"), col("month"), lit("-"), col("date")), "yyyy-M-d")
    )

# Год и месяц
hotel_with_month = hotel_with_checkin \
    .withColumn("checkin_year", year("checkin_date")) \
    .withColumn("checkin_month", month("checkin_date"))

# Среднее по месяцам
avg_nights_by_month = hotel_with_month \
    .groupBy("checkin_year", "checkin_month") \
    .agg(avg("total_nights").alias("avg_nights")) \
    .orderBy("checkin_year", "checkin_month")

print("Среднее количество ночей по месяцам (подтверждённые брони):")
avg_nights_by_month.show()

Среднее количество ночей по месяцам (подтверждённые брони):
+------------+-------------+------------------+
|checkin_year|checkin_month|        avg_nights|
+------------+-------------+------------------+
|        2017|            7|3.0166666666666666|
|        2017|            8|2.7189384800965017|
|        2017|            9|2.6550783912747105|
|        2017|           10|2.7032898820608318|
|        2017|           11|2.7241935483870967|
|        2017|           12| 3.043046357615894|
|        2018|            1|2.7414141414141415|
|        2018|            2|2.6768488745980705|
|        2018|            3|3.0392038600723765|
|        2018|            4| 2.924755887421022|
|        2018|            5|2.8054545454545456|
|        2018|            6| 2.596757322175732|
|        2018|            7|3.1938088829071334|
|        2018|            8|3.1544117647058822|
|        2018|            9| 2.786425902864259|
|        2018|           10|2.8910186199342824|
|        2018|           11|

**2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.**

In [22]:
# Брони за 2018
hotel_2018 = hotel_clean.filter(col("year") == 2018)

# Флаг отмены
hotel_with_cancel = hotel_2018 \
    .withColumn("is_canceled", when(col("status") == "Canceled", 1).otherwise(0))

# По месяцам
cancellation_by_month = hotel_with_cancel \
    .groupBy("month") \
    .agg(
        count("*").alias("total"),
        spark_sum("is_canceled").alias("canceled")
    ) \
    .withColumn("cancellation_rate", col("canceled") / col("total") * 100)

# ТОП-3
top3_cancellation_months = cancellation_by_month \
    .orderBy(col("cancellation_rate").desc()) \
    .limit(3)

print("ТОП-3 месяца 2018 по проценту отмен:")
top3_cancellation_months.show()

ТОП-3 месяца 2018 по проценту отмен:
+-----+-----+--------+-----------------+
|month|total|canceled|cancellation_rate|
+-----+-----+--------+-----------------+
|    8| 2799|    1303|46.55234012147195|
|   10| 3404|    1578| 46.3572267920094|
|    9| 2962|    1356|45.77987846049966|
+-----+-----+--------+-----------------+



**3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.**

In [23]:
# Подтверждённые брони
hotel_confirmed = hotel_clean.filter(col("status") == "Not_Canceled")

# Дата заселения
hotel_with_checkin = hotel_confirmed \
    .withColumn(
        "checkin_date",
        to_date(concat(col("year"), lit("-"), col("month"), lit("-"), col("date")), "yyyy-M-d")
    )

# Год и месяц заселения
hotel_with_month = hotel_with_checkin \
    .withColumn("checkin_year", year("checkin_date")) \
    .withColumn("checkin_month", month("checkin_date"))

# Среднее lead_time
avg_lead_time_by_month = hotel_with_month \
    .groupBy("checkin_year", "checkin_month") \
    .agg(
        avg("lead_time").alias("avg_lead_time_days")
    ) \
    .orderBy("checkin_year", "checkin_month")

print("Среднее время между бронированием и заездом по месяцам (подтверждённые):")
avg_lead_time_by_month.show()


Среднее время между бронированием и заездом по месяцам (подтверждённые):
+------------+-------------+------------------+
|checkin_year|checkin_month|avg_lead_time_days|
+------------+-------------+------------------+
|        2017|            7|130.73333333333332|
|        2017|            8| 35.08082026537998|
|        2017|            9| 51.72188139059305|
|        2017|           10|55.885164494103044|
|        2017|           11| 33.28225806451613|
|        2017|           12|46.746136865342166|
|        2018|            1| 34.87171717171717|
|        2018|            2|30.272508038585208|
|        2018|            3| 43.19420989143546|
|        2018|            4|62.492820218265365|
|        2018|            5| 60.98909090909091|
|        2018|            6| 70.63546025104603|
|        2018|            7| 86.88021534320323|
|        2018|            8| 83.08622994652407|
|        2018|            9| 63.32316313823163|
|        2018|           10| 73.24370208105148|
|        2018| 

**4. Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).**

In [24]:
# Подтверждённые
hotel_confirmed = hotel_clean.filter(col("status") == "Not_Canceled")

# Выручка
hotel_with_revenue = hotel_confirmed \
    .withColumn("total_nights", col("weekend_nights") + col("week_nights")) \
    .withColumn("total_revenue", col("avg_room_price") * col("total_nights"))

# Дата заселения
hotel_with_checkin = hotel_with_revenue \
    .withColumn(
        "checkin_date",
        to_date(concat(col("year"), lit("-"), col("month"), lit("-"), col("date")), "yyyy-M-d")
    )

# Год и месяц
hotel_with_month = hotel_with_checkin \
    .withColumn("year", year("checkin_date")) \
    .withColumn("month", month("checkin_date"))

# PIVOT
pivot_revenue = hotel_with_month \
    .groupBy("month") \
    .pivot("market_segment", ["Offline", "Online", "Corporate", "Complementary", "Aviation", "Direct"]) \
    .agg(avg("total_revenue")) \
    .orderBy("month")

print("Средняя выручка по месяцам и каналам бронирования (PIVOT):")
pivot_revenue.show()

Средняя выручка по месяцам и каналам бронирования (PIVOT):
+-----+------------------+------------------+------------------+------------------+------------------+------+
|month|           Offline|            Online|         Corporate|     Complementary|          Aviation|Direct|
+-----+------------------+------------------+------------------+------------------+------------------+------+
|    1| 210.5148407643312| 236.0947252747252|          113.0301|2.2666666666666666|              NULL|  NULL|
|    2|253.50948328267472|234.30563380281694| 112.3742780748663|1.1176470588235294|             352.0|  NULL|
|    3|233.39374707259955|301.70977121067654|142.39499999999998| 38.17391304347826|118.33333333333333|  NULL|
|    4|236.43707070707077| 320.0847086991219|108.42033898305085|               0.0| 321.8095238095238|  NULL|
|    5|274.54517401392116|352.33520813165563| 229.4993464052288|               0.0|             262.5|  NULL|
|    6|251.97596774193568|335.02831155778875|            148.

**5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.**

In [25]:
# Постоянные и подтверждённые
loyal_guests = hotel_clean \
    .filter((col("repeated_guest") == 1) & (col("status") == "Not_Canceled"))

# Выручка
loyal_with_revenue = loyal_guests \
    .withColumn("total_nights", col("weekend_nights") + col("week_nights")) \
    .withColumn("revenue", col("avg_room_price") * col("total_nights"))

# По ID
guest_revenue = loyal_with_revenue \
    .groupBy("ID") \
    .agg(spark_sum("revenue").alias("total_revenue"))

# Общая выручка
total_loyal_revenue = guest_revenue.agg(spark_sum("total_revenue")).collect()[0][0]

# Доля и ТОП-5
guest_with_share = guest_revenue \
    .withColumn("revenue_share", col("total_revenue") / lit(total_loyal_revenue) * 100) \
    .orderBy(col("total_revenue").desc()) \
    .limit(5)

print("ТОП-5 постоянных гостей по выручке:")
guest_with_share.show()
print(f"Общая выручка от постоянных гостей: {total_loyal_revenue:,.2f}")

ТОП-5 постоянных гостей по выручке:
+--------+------------------+------------------+
|      ID|     total_revenue|     revenue_share|
+--------+------------------+------------------+
|INN19235|1754.3999999999999|1.5582592581374948|
|INN05222|             690.0|  0.61285846335777|
|INN14189|             665.0|0.5906534465694449|
|INN09923|             660.0|0.5862124432117799|
|INN25479|             650.0|0.5773304364964499|
+--------+------------------+------------------+

Общая выручка от постоянных гостей: 112,587.17


**6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.**

In [26]:
from pyspark.sql.functions import col, sum as spark_sum, coalesce, lit

# Подтверждённые брони
hotel_occupied = hotel_clean.filter(col("status") == "Not_Canceled")

# Считаем число гостей
hotel_with_guests = hotel_occupied \
    .withColumn("guests", col("n_adults") + col("n_children"))

# Группируем по checkin_date (теперь она есть!)
guests_per_day = hotel_with_guests \
    .groupBy("checkin_date") \
    .agg(
        spark_sum("guests").alias("total_guests")
    )

# Джойним с календарём
occupancy_daily = calendar_df \
    .join(
        guests_per_day,
        calendar_df.calendar_dt == col("checkin_date"),
        "left"
    ) \
    .select(
        col("calendar_dt").alias("date"),
        coalesce(col("total_guests"), lit(0)).alias("total_guests")
    ) \
    .withColumn(
        "occupancy_rate",
        col("total_guests") / 400 * 100  # вместимость = 400
    ) \
    .orderBy(col("date").desc())

print("Загрузка отеля по дням:")
occupancy_daily.show(10)


Загрузка отеля по дням:
+----------+------------+------------------+
|      date|total_guests|    occupancy_rate|
+----------+------------+------------------+
|2018-12-31|          67|             16.75|
|2018-12-30|         166|              41.5|
|2018-12-29|         162|              40.5|
|2018-12-28|         134|              33.5|
|2018-12-27|         263|             65.75|
|2018-12-26|         117|             29.25|
|2018-12-25|          84|              21.0|
|2018-12-24|          98|              24.5|
|2018-12-23|         113|28.249999999999996|
|2018-12-22|          89|             22.25|
+----------+------------+------------------+
only showing top 10 rows
