In [1]:
!pip install pyspark findspark -q

In [2]:
import findspark
findspark.init()

import pyspark
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

spark = (
    SparkSession
    .builder
    .appName("Hotel_logs")
    .config("spark.ui.port", "9311")
    .getOrCreate()
)

spark

In [3]:
from pyspark.sql import functions as F

In [4]:
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("sep", ",")
    .csv("Hotel.csv")
)

df.show(5)
df.printSchema()
df.count()

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|          65.0|       

36275

In [5]:
calendar_df = (
    spark.range(1)
    .select(
        F.explode(
            F.sequence(
                F.to_date(F.lit("2017-01-01"), "yyyy-MM-dd"),
                F.to_date(F.lit("2018-12-31"), "yyyy-MM-dd"),
                F.expr("interval 1 day")
            )
        ).alias("dt")
    )
    .select(
        F.date_format("dt", "yyyy-dd-MM").alias("calendar_dt")
    )
)

calendar_df.createOrReplaceTempView("calendar")
calendar_df.show(5)
calendar_df.printSchema()
calendar_df.count()

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-02-01|
| 2017-03-01|
| 2017-04-01|
| 2017-05-01|
+-----------+
only showing top 5 rows
root
 |-- calendar_dt: string (nullable = false)



730

In [6]:
calendar_dates = calendar_df.select(F.col("calendar_dt").alias("cal_dt"))


# 1.	Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам). Среднее количество ночей рассчитано только для подтвержденных броней, агрегировано по месяцам и годам, корректный вывод.

In [7]:
from pyspark.sql import functions as F

avg_nights_by_year_month = (
    df
    .filter(F.col("status") == "Not_Canceled")
    .withColumn("total_nights", F.col("weekend_nights") + F.col("week_nights"))
    .groupBy("year", "month")
    .agg(F.avg("total_nights").alias("avg_nights"))
    .orderBy("year", "month")
)

avg_nights_by_year_month.show()


+----+-----+------------------+
|year|month|        avg_nights|
+----+-----+------------------+
|2017|    7|3.0166666666666666|
|2017|    8|2.7189384800965017|
|2017|    9|2.6550783912747105|
|2017|   10|2.7032898820608318|
|2017|   11|2.7241935483870967|
|2017|   12| 3.043046357615894|
|2018|    1|2.7414141414141415|
|2018|    2|2.6891679748822606|
|2018|    3|3.0392038600723765|
|2018|    4| 2.924755887421022|
|2018|    5|2.8054545454545456|
|2018|    6| 2.596757322175732|
|2018|    7|3.1938088829071334|
|2018|    8|3.1544117647058822|
|2018|    9| 2.786425902864259|
|2018|   10|2.8910186199342824|
|2018|   11|2.9784511784511785|
|2018|   12|3.2521891418563924|
+----+-----+------------------+



# 2.	Определить ТОП-3 месяца по проценту отмененных броней за 2018 год. Правильный фильтр по 2018 году, рассчитано отношение отмененных к общим бронированиям по месяцам, отсортировано по убыванию, выбраны ТОП-3.

In [8]:
top3_cancel_months_2018 = (
    df
    .filter(F.col("year") == 2018)
    .groupBy("month")
    .agg(
        F.count("*").alias("total_bookings"),
        F.sum(F.when(F.col("status") == "Canceled", 1).otherwise(0)).alias("canceled_bookings")
    )
    .withColumn(
        "cancel_rate",
        F.col("canceled_bookings") / F.col("total_bookings")
    )
    .orderBy(F.col("cancel_rate").desc())
    .limit(3)
)

top3_cancel_months_2018.show()

+-----+--------------+-----------------+------------------+
|month|total_bookings|canceled_bookings|       cancel_rate|
+-----+--------------+-----------------+------------------+
|    8|          2799|             1303|0.4655234012147195|
|   10|          3404|             1578| 0.463572267920094|
|    9|          2962|             1356|0.4577987846049966|
+-----+--------------+-----------------+------------------+



# 3.	Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней. Корректно рассчитано среднее время, создан фильтр, группировка по месяцу, правильное округление и формат.

3.1. Вариант 1. Исходим из того, что требуется посчитать среднее за каждый календарный месяц.

In [9]:
avg_lead_time_by_month = (
    df
    .filter(F.col("status") == "Not_Canceled")
    .groupBy("year", "month")
    .agg(
        F.round(F.avg("lead_time"), 2).alias("avg_lead_time_days")
    )
    .orderBy("year", "month")
)

avg_lead_time_by_month.show()

+----+-----+------------------+
|year|month|avg_lead_time_days|
+----+-----+------------------+
|2017|    7|            130.73|
|2017|    8|             35.08|
|2017|    9|             51.72|
|2017|   10|             55.89|
|2017|   11|             33.28|
|2017|   12|             46.75|
|2018|    1|             34.87|
|2018|    2|             30.53|
|2018|    3|             43.19|
|2018|    4|             62.49|
|2018|    5|             60.99|
|2018|    6|             70.64|
|2018|    7|             86.88|
|2018|    8|             83.09|
|2018|    9|             63.32|
|2018|   10|             73.24|
|2018|   11|             44.25|
|2018|   12|             69.75|
+----+-----+------------------+



3.2. Вариант 2. Полагаем, что требуется посчитать среднее за месяц из всей выборки (например, среднее за январь без учета года)

In [10]:
avg_lead_time_by_month_only = (
    df
    .filter(F.col("status") == "Not_Canceled")
    .groupBy("month")
    .agg(
        F.round(F.avg("lead_time"), 2).alias("avg_lead_time_days")
    )
    .orderBy("month")
)

avg_lead_time_by_month_only.show()

+-----+------------------+
|month|avg_lead_time_days|
+-----+------------------+
|    1|             34.87|
|    2|             30.53|
|    3|             43.19|
|    4|             62.49|
|    5|             60.99|
|    6|             70.64|
|    7|             90.16|
|    8|             65.97|
|    9|             57.78|
|   10|             65.11|
|   11|             41.02|
|   12|             61.79|
+-----+------------------+



# 4.	Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT). Корректная фильтрация по подтвержденным броням, группировка по году, месяцу, типу бронирования, подсчет средней выручки и вывод в виде сводной таблицы (PIVOT)

In [11]:
base_agg = (
    df
    .filter(F.col("status") == "Not_Canceled")
    .groupBy("year", "month", "market_segment")
    .agg(
        F.round(F.avg("avg_room_price"), 2)
         .alias("avg_revenue")
    )
)

pivot_revenue = (
    base_agg
    .groupBy("year", "month")
    .pivot("market_segment")
    .agg(F.first("avg_revenue"))
    .orderBy("year", "month")
)

pivot_revenue.show()

+----+-----+--------+-------------+---------+-------+------+
|year|month|Aviation|Complementary|Corporate|Offline|Online|
+----+-----+--------+-------------+---------+-------+------+
|2017|    7|    NULL|         7.47|     65.0|  88.19| 65.55|
|2017|    8|    NULL|         0.32|    66.92|  97.16| 97.86|
|2017|    9|    NULL|         7.37|    86.03|   96.8|113.33|
|2017|   10|    NULL|         0.55|    84.48|  89.08|101.97|
|2017|   11|    NULL|         4.94|    68.08|  66.18| 84.75|
|2017|   12|    NULL|         0.05|    69.03|  69.95| 87.31|
|2018|    1|    NULL|         1.73|    68.88|  73.97| 80.19|
|2018|    2|    88.0|         0.75|    74.98|  75.07|  87.5|
|2018|    3|    89.0|         16.2|    74.05|  72.64| 96.81|
|2018|    4|   97.57|          0.0|     80.0|  86.83|105.49|
|2018|    5|   101.0|          0.0|    100.9|  94.97|123.91|
|2018|    6|    95.0|          0.0|    84.83| 103.01| 119.3|
|2018|    7|    79.0|         2.69|    84.86|  87.78|123.71|
|2018|    8|    NULL|   

# 5.	Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость. Отфильтрованы постоянные гости, рассчитана выручка по каждому, выбраны ТОП-5, посчитана доля каждого в общей выручке, отображены имя/ID, сумма, доля.

In [12]:
from pyspark.sql.window import Window

regular_guests = (
    df
    .filter((F.col("repeated_guest") == 1) & (F.col("status") == "Not_Canceled"))
    .groupBy("ID")
    .agg(F.sum("avg_room_price").alias("guest_revenue"))
)

w_all = Window.partitionBy()
regular_with_share = (
    regular_guests
    .withColumn(
        "revenue_share",
        F.col("guest_revenue") / F.sum("guest_revenue").over(w_all)
    )
)

top5_regular_guests = (
    regular_with_share
    .orderBy(F.col("guest_revenue").desc())
    .limit(5)
)

top5_regular_guests.show(truncate=False)


+--------+-------------+---------------------+
|ID      |guest_revenue|revenue_share        |
+--------+-------------+---------------------+
|INN09923|220.0        |0.0036840535574309626|
|INN35697|174.0        |0.002913751449968125 |
|INN25587|164.0        |0.002746294470084899 |
|INN35599|159.0        |0.0026625659801432864|
|INN06463|159.0        |0.0026625659801432864|
+--------+-------------+---------------------+



# 6.	Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек. Реализация полная: для всех дат в диапазоне бронирований отображено количество гостей и процент загрузки, включая нули; оптимизировано по производительности, корректная сортировка, учтен формат даты.

In [13]:
bookings_with_guests = df.withColumn(
    "guests", F.col("n_adults") + F.col("n_children")
).filter(F.col("status") == "Not_Canceled")

valid_bookings = (
    bookings_with_guests
    .filter((F.col("month") >= 1) & (F.col("month") <= 12))
    .filter((F.col("date") >= 1) & (F.col("date") <= 31))
    .filter(~((F.col("month") == 2) & (F.col("date") > 29)))
    .filter(
        ~(
            F.col("month").isin([4, 6, 9, 11]) &
            (F.col("date") > 30)
        )
    )
    .filter(
        ~(
            (F.col("month") == 2) &
            (F.col("date") == 29) &
            (
                (F.col("year") % 400 != 0) &
                ~((F.col("year") % 4 == 0) & (F.col("year") % 100 != 0))
            )
        )
    )
)

valid_bookings = valid_bookings.withColumn(
    "checkin_dt",
    F.to_date(
        F.concat_ws(
            "-",
            F.col("year").cast("string"),
            F.lpad(F.col("month").cast("string"), 2, "0"),
            F.lpad(F.col("date").cast("string"), 2, "0")
        ),
        "yyyy-MM-dd"
    )
)

week_days = (
    valid_bookings
    .filter(F.col("week_nights") > 0)
    .select(
        "guests",
        F.explode(
            F.sequence(
                F.col("checkin_dt"),
                F.expr("date_add(checkin_dt, week_nights - 1)"),
                F.expr("interval 1 day")
            )
        ).alias("stay_dt")
    )
)

weekend_days = (
    valid_bookings
    .filter(F.col("weekend_nights") > 0)
    .select(
        "guests",
        F.explode(
            F.sequence(
                F.expr("date_add(checkin_dt, week_nights)"),
                F.expr("date_add(checkin_dt, week_nights + weekend_nights - 1)"),
                F.expr("interval 1 day")
            )
        ).alias("stay_dt")
    )
)

stays_by_day = week_days.unionByName(weekend_days)

daily_guests = (
    stays_by_day
    .groupBy("stay_dt")
    .agg(F.sum("guests").alias("total_guests"))
)

calendar_dates = (
    calendar_df
    .withColumn("cal_dt", F.to_date("calendar_dt", "yyyy-dd-MM"))
    .select("cal_dt")
)

daily_occupancy = (
    calendar_dates
    .join(daily_guests, calendar_dates.cal_dt == daily_guests.stay_dt, "left")
    .select(
        F.date_format("cal_dt", "yyyy-MM-dd").alias("date"),
        F.coalesce(F.col("total_guests"), F.lit(0)).alias("total_guests")
    )
    .withColumn(
        "load_pct",
        F.round(F.col("total_guests") / F.lit(400) * 100, 2)
    )
    .orderBy(F.col("date").desc())
)

daily_occupancy.show(500, truncate=False)


+----------+------------+--------+
|date      |total_guests|load_pct|
+----------+------------+--------+
|2018-12-31|562         |140.5   |
|2018-12-30|572         |143.0   |
|2018-12-29|542         |135.5   |
|2018-12-28|507         |126.75  |
|2018-12-27|552         |138.0   |
|2018-12-26|422         |105.5   |
|2018-12-25|397         |99.25   |
|2018-12-24|373         |93.25   |
|2018-12-23|341         |85.25   |
|2018-12-22|282         |70.5    |
|2018-12-21|247         |61.75   |
|2018-12-20|240         |60.0    |
|2018-12-19|228         |57.0    |
|2018-12-18|258         |64.5    |
|2018-12-17|274         |68.5    |
|2018-12-16|254         |63.5    |
|2018-12-15|170         |42.5    |
|2018-12-14|155         |38.75   |
|2018-12-13|153         |38.25   |
|2018-12-12|167         |41.75   |
|2018-12-11|240         |60.0    |
|2018-12-10|573         |143.25  |
|2018-12-09|588         |147.0   |
|2018-12-08|530         |132.5   |
|2018-12-07|403         |100.75  |
|2018-12-06|347     