## Apache Spark

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format
from pyspark.sql.functions import col
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .enableHiveSupport() \
    .getOrCreate()

# Путь к CSV
csv_path = "Hotel.csv"

# Чтение CSV
df_logs = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(csv_path)


#Сохранение как Hive-таблицы logs_hotel
df_logs.write.mode("overwrite").saveAsTable("logs_hotel")


In [5]:
#Выведем первые 10 строк таблицы logs_hotel
spark.table("logs_hotel").show(10)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|          65.0|       

In [19]:
calendar = spark.sql("""
    SELECT explode(
        sequence(
            to_date('2017-01-01'),
            to_date('2018-12-31'),
            interval 1 day
        )
    ) AS calendar_dt
""")

In [20]:
#Выведем первые 5 строк таблицы calendar
spark.table("calendar").show(5)

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
| 2017-01-04|
| 2017-01-05|
+-----------+
only showing top 5 rows


In [198]:
#1. Среднее количество ночей, которые гости проводят в отеле
result_df = spark.sql("""
    SELECT
        year,
        month,
        ROUND(AVG(week_nights + weekend_nights), 2) AS avg_nights
    FROM logs_hotel
    WHERE status = "Canceled"
    GROUP BY year, month
    ORDER BY year, month
""")

result_df.show(10, truncate=False)

+----+-----+----------+
|year|month|avg_nights|
+----+-----+----------+
|2017|7    |3.43      |
|2017|8    |3.71      |
|2017|9    |2.98      |
|2017|10   |2.64      |
|2017|11   |2.33      |
|2017|12   |4.23      |
|2018|1    |2.38      |
|2018|2    |3.43      |
|2018|3    |3.47      |
|2018|4    |3.02      |
+----+-----+----------+
only showing top 10 rows


In [199]:
#2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.

#Фильтр по 2018 году
df_2018 = df_logs.filter(F.col("year") == 2018)

#Агрегация по месяцам
monthly_stats = (
    df_2018
    .groupBy("month")
    .agg(
        F.count("*").alias("total_bookings"),
        F.sum(
            F.when(F.col("status") == "Canceled", 1).otherwise(0)
        ).alias("canceled_bookings")
    )
)

#Расчёт процента отмен
monthly_stats = monthly_stats.withColumn(
    "cancel_percent",
    F.round(
        F.col("canceled_bookings") / F.col("total_bookings") * 100, 2
    )
)

#ТОП-3 месяца по проценту отмен
top3_months = (
    monthly_stats
    .orderBy(F.col("cancel_percent").desc())
    .limit(3)
)

top3_months.show()

+-----+--------------+-----------------+--------------+
|month|total_bookings|canceled_bookings|cancel_percent|
+-----+--------------+-----------------+--------------+
|    8|          2799|             1303|         46.55|
|   10|          3404|             1578|         46.36|
|    9|          2962|             1356|         45.78|
+-----+--------------+-----------------+--------------+



In [200]:
#3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.

#Фильтрация подтверждённых броней
df_confirmed = df_logs.filter(F.col("status") == "Not_Canceled")

#Расчёт среднего lead_time по месяцам
result_df = (
    df_confirmed
    .groupBy("year", "month")
    .agg(
        F.round(F.avg("lead_time"), 2).alias("avg_lead_time_days")
    )
    .orderBy("year", "month")
)

result_df.show(10)

+----+-----+------------------+
|year|month|avg_lead_time_days|
+----+-----+------------------+
|2017|    7|            130.73|
|2017|    8|             35.08|
|2017|    9|             51.72|
|2017|   10|             55.89|
|2017|   11|             33.28|
|2017|   12|             46.75|
|2018|    1|             34.87|
|2018|    2|             30.53|
|2018|    3|             43.19|
|2018|    4|             62.49|
+----+-----+------------------+
only showing top 10 rows


In [177]:
#4.Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).

#Фильтрация подтверждённых броней
df_confirmed = (
    df_logs
    .filter(F.col("status") == "Not_Canceled")
    .withColumn(
        "revenue",
        (F.col("week_nights") + F.col("weekend_nights")) * F.col("avg_room_price")
    )
)

#Построение PIVOT-таблицы
pivot_df = (
    df_confirmed
    .groupBy("year", "month")
    .pivot("room_type")
    .agg(
        F.round(F.avg("revenue"), 2)
    )
    .orderBy("year", "month")
)

pivot_df.show(truncate=False)

+----+-----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|year|month|Room_Type 1|Room_Type 2|Room_Type 3|Room_Type 4|Room_Type 5|Room_Type 6|Room_Type 7|
+----+-----+-----------+-----------+-----------+-----------+-----------+-----------+-----------+
|2017|7    |238.28     |98.2       |NULL       |NULL       |0.0        |NULL       |NULL       |
|2017|8    |248.89     |84.82      |NULL       |140.61     |0.0        |315.13     |NULL       |
|2017|9    |265.36     |305.8      |NULL       |358.48     |167.1      |540.65     |639.0      |
|2017|10   |236.92     |369.47     |195.0      |311.76     |0.0        |561.25     |0.0        |
|2017|11   |181.89     |283.18     |NULL       |238.71     |45.0       |425.9      |0.0        |
|2017|12   |224.53     |182.9      |0.0        |300.89     |422.03     |600.12     |146.0      |
|2018|1    |200.97     |230.07     |NULL       |250.02     |322.38     |403.24     |0.67       |
|2018|2    |199.21     |221.33

In [178]:
#5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.

#Фильтрация и расчёт выручки
df_revenue = (
    df_logs
    .filter(
        (F.col("status") == "Not_Canceled") &
        (F.col("repeated_guest") == 1)
    )
    .withColumn(
        "revenue",
        (F.col("week_nights") + F.col("weekend_nights")) * F.col("avg_room_price")
    )
)

#Суммарная выручка по каждому гостю
guest_revenue = (
    df_revenue
    .groupBy("ID")
    .agg(
        F.round(F.sum("revenue"), 2).alias("guest_revenue")
    )
)

#Общая выручка от постоянных гостей
total_revenue = guest_revenue.agg(
    F.sum("guest_revenue").alias("total_revenue")
).collect()[0]["total_revenue"]

#ТОП-5 гостей и их доля в общей выручке
top5_guests = (
    guest_revenue
    .orderBy(F.col("guest_revenue").desc())
    .limit(5)
    .withColumn(
        "revenue_share_percent",
        F.round(F.col("guest_revenue") / F.lit(total_revenue) * 100, 2)
    )
)

top5_guests.show()

+--------+-------------+---------------------+
|      ID|guest_revenue|revenue_share_percent|
+--------+-------------+---------------------+
|INN19235|       1754.4|                 1.55|
|INN05222|        690.0|                 0.61|
|INN14189|        665.0|                 0.59|
|INN09923|        660.0|                 0.58|
|INN25479|        650.0|                 0.57|
+--------+-------------+---------------------+



In [24]:
#6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

#Подготовка бронирований (с удалением нулевой даты)
df_bookings = (
    df_logs
    .filter(F.col("status") == "Not_Canceled")
    .filter(
        ~(
            (F.col("month") == 2) &
            (F.col("date") == 29) &
            (F.col("year") % 4 != 0)
        )
    )
    .withColumn(
        "check_in_date",
        F.make_date(
            F.col("year"),
            F.col("month"),
            F.col("date")
        )
    )
    .withColumn(
        "total_nights",
        F.col("week_nights") + F.col("weekend_nights")
    )
    .withColumn(
        "guests",
        F.col("n_adults") + F.col("n_children")
    )
)

#Разворачивание бронирований в дни проживания
df_stay_days = (
    df_bookings
    .withColumn(
        "stay_date",
        F.explode(
            F.sequence(
                F.col("check_in_date"),
                F.expr("date_add(check_in_date, total_nights - 1)")
            )
        )
    )
    .select("stay_date", "guests")
)

#Гости по дням
daily_guests = (
    df_stay_days
    .groupBy("stay_date")
    .agg(
        F.sum("guests").alias("total_guests")
    )
)

#Все дни, включая пустые дни
result_df = (
    calendar
    .join(
        daily_guests,
        calendar.calendar_dt == daily_guests.stay_date,
        how="left"
    )
    .select(
        calendar.calendar_dt.alias("date"),
        F.coalesce(F.col("total_guests"), F.lit(0)).alias("total_guests")
    )
)

#Процент загрузки (вместимость 400)
final_df = (
    result_df
    .withColumn(
        "occupancy_percent",
        F.round(F.col("total_guests") / F.lit(400) * 100, 2)
    )
    .orderBy(F.col("date").desc())
)

#Выведем 20 результатов
final_df.show(20, truncate=False)

+----------+------------+-----------------+
|date      |total_guests|occupancy_percent|
+----------+------------+-----------------+
|2018-12-31|562         |140.5            |
|2018-12-30|572         |143.0            |
|2018-12-29|542         |135.5            |
|2018-12-28|507         |126.75           |
|2018-12-27|552         |138.0            |
|2018-12-26|422         |105.5            |
|2018-12-25|397         |99.25            |
|2018-12-24|373         |93.25            |
|2018-12-23|341         |85.25            |
|2018-12-22|282         |70.5             |
|2018-12-21|247         |61.75            |
|2018-12-20|240         |60.0             |
|2018-12-19|228         |57.0             |
|2018-12-18|258         |64.5             |
|2018-12-17|274         |68.5             |
|2018-12-16|254         |63.5             |
|2018-12-15|170         |42.5             |
|2018-12-14|155         |38.75            |
|2018-12-13|153         |38.25            |
|2018-12-12|167         |41.75  