# Домашнее задание № 5: Анализ логов отеля на pyspark

## Шаг 0: Настройка окружения для работы с PySpark

In [1]:
import os
os.environ['JAVA_HOME'] = r'C:\Program Files\Eclipse Adoptium\jdk-17.0.17.10-hotspot'
os.environ['PATH'] = os.environ['JAVA_HOME'] + r'\bin;' + os.environ['PATH']
os.environ['HADOOP_HOME'] = r'C:\hadoop'
os.environ['PATH'] += r';C:\hadoop\bin'

## Шаг 1: Загрузка данных из CSV и подготовка DataFrame

In [22]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_date, expr, lpad, year, month, sum as sum_, count as count_,
    avg, round, when, desc, asc, date_add, sequence, explode, lit, coalesce
)
from pyspark.sql.types import DateType
import datetime

# Запуск сессии Spark
spark = (SparkSession.builder
    .master("local[*]")
    .appName("Hotel Logs Analysis")
    .config("spark.driver.memory", "6g")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .getOrCreate())

print("Spark запущен, версия:", spark.version)
spark.sparkContext.setLogLevel("ERROR")

# Загрузка данных
df_logs = spark.read.csv("Hotel.csv", header=True, inferSchema=True)

# Расчет количества некорректных дат с 29.02.2018 - високосного года не было
incorrect_dates = df_logs.filter((col("year") == 2018) & (col("month") == 2) & (col("date") == 29))
incorrect_count = incorrect_dates.count()
print(f"Найдено некорректных дат (29 февраля 2018): {incorrect_count} записей")

# Удаление некорректных дат из датафрейма
df_logs_clean = df_logs.filter(
    ~((col("year") == 2018) & (col("month") == 2) & (col("date") == 29))
)

# Преобразование колонок с годом, месяцем и числом в одну дату
df_logs_clean = df_logs_clean.withColumn(
    "arrival_date",
    to_date(expr("concat(year, '-', lpad(month, 2, '0'), '-', lpad(date, 2, '0'))"))
)
# Отображение 5 строк для компактности
df_logs.show(5, truncate=False)

rows_count = df_log_clean.count()
print(f"Количество строк в df_logs_clean: {rows_count}")

Spark запущен, версия: 4.0.1
Найдено некорректных дат (29 февраля 2018): 37 записей
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0      


## Шаг 2: Создание календаря (все даты 2017-01-01 – 2018-12-31)

In [47]:
start_date = datetime.date(2017, 1, 1)
end_date = datetime.date(2018, 12, 31)

df_calendar = spark.range(1).select(
    explode(
        sequence(
            lit(start_date).cast(DateType()), 
            lit(end_date).cast(DateType()), 
            expr("interval 1 day")
        )
    ).alias("calendar_dt")
)

df_calendar.orderBy(desc("calendar_dt")).show(730)

+-----------+
|calendar_dt|
+-----------+
| 2018-12-31|
| 2018-12-30|
| 2018-12-29|
| 2018-12-28|
| 2018-12-27|
| 2018-12-26|
| 2018-12-25|
| 2018-12-24|
| 2018-12-23|
| 2018-12-22|
| 2018-12-21|
| 2018-12-20|
| 2018-12-19|
| 2018-12-18|
| 2018-12-17|
| 2018-12-16|
| 2018-12-15|
| 2018-12-14|
| 2018-12-13|
| 2018-12-12|
| 2018-12-11|
| 2018-12-10|
| 2018-12-09|
| 2018-12-08|
| 2018-12-07|
| 2018-12-06|
| 2018-12-05|
| 2018-12-04|
| 2018-12-03|
| 2018-12-02|
| 2018-12-01|
| 2018-11-30|
| 2018-11-29|
| 2018-11-28|
| 2018-11-27|
| 2018-11-26|
| 2018-11-25|
| 2018-11-24|
| 2018-11-23|
| 2018-11-22|
| 2018-11-21|
| 2018-11-20|
| 2018-11-19|
| 2018-11-18|
| 2018-11-17|
| 2018-11-16|
| 2018-11-15|
| 2018-11-14|
| 2018-11-13|
| 2018-11-12|
| 2018-11-11|
| 2018-11-10|
| 2018-11-09|
| 2018-11-08|
| 2018-11-07|
| 2018-11-06|
| 2018-11-05|
| 2018-11-04|
| 2018-11-03|
| 2018-11-02|
| 2018-11-01|
| 2018-10-31|
| 2018-10-30|
| 2018-10-29|
| 2018-10-28|
| 2018-10-27|
| 2018-10-26|
| 2018-10-25|
| 2018

## Шаг 3: Выполнение запросов

### Запрос 1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [28]:
df_confirmed = df_logs_clean.filter(col("status") == "Not_Canceled")
(df_confirmed
 .withColumn("total_nights", col("week_nights") + col("weekend_nights"))
 .groupBy("year", "month")
 .agg(round(avg("total_nights"), 2).alias("avg_nights"))
 .orderBy("year", "month")
 .show(24))

+----+-----+----------+
|year|month|avg_nights|
+----+-----+----------+
|2017|    7|      3.02|
|2017|    8|      2.72|
|2017|    9|      2.66|
|2017|   10|       2.7|
|2017|   11|      2.72|
|2017|   12|      3.04|
|2018|    1|      2.74|
|2018|    2|      2.68|
|2018|    3|      3.04|
|2018|    4|      2.92|
|2018|    5|      2.81|
|2018|    6|       2.6|
|2018|    7|      3.19|
|2018|    8|      3.15|
|2018|    9|      2.79|
|2018|   10|      2.89|
|2018|   11|      2.98|
|2018|   12|      3.25|
+----+-----+----------+



### Запрос 2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год

In [31]:
(df_logs_clean.filter(col("year") == 2018)
 .groupBy("month")
 .agg(
     count_("*").alias("total_bookings"),                                   
     sum_(when(col("status") == "Canceled", 1).otherwise(0)).alias("canceled")
 )
 .withColumn("cancel_percent", round(col("canceled") * 100 / col("total_bookings"), 2))
 .orderBy(desc("cancel_percent"))
 .limit(3)
 .show())

+-----+--------------+--------+--------------+
|month|total_bookings|canceled|cancel_percent|
+-----+--------------+--------+--------------+
|    8|          2799|    1303|         46.55|
|   10|          3404|    1578|         46.36|
|    9|          2962|    1356|         45.78|
+-----+--------------+--------+--------------+



### Запрос 3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней

In [33]:
(df_confirmed
 .groupBy("year", "month")
 .agg(round(avg("lead_time"), 2).alias("avg_lead_time_days"))
 .orderBy("year", "month")
 .show(24))

+----+-----+------------------+
|year|month|avg_lead_time_days|
+----+-----+------------------+
|2017|    7|            130.73|
|2017|    8|             35.08|
|2017|    9|             51.72|
|2017|   10|             55.89|
|2017|   11|             33.28|
|2017|   12|             46.75|
|2018|    1|             34.87|
|2018|    2|             30.27|
|2018|    3|             43.19|
|2018|    4|             62.49|
|2018|    5|             60.99|
|2018|    6|             70.64|
|2018|    7|             86.88|
|2018|    8|             83.09|
|2018|    9|             63.32|
|2018|   10|             73.24|
|2018|   11|             44.25|
|2018|   12|             69.75|
+----+-----+------------------+



### Запрос 4. Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT)

In [41]:
# Вычисление выручки для каждого бронирования
revenue_df = (df_confirmed
    .withColumn("total_nights", col("week_nights") + col("weekend_nights"))
    .withColumn("total_revenue", col("avg_room_price") * col("total_nights"))
)

# Вычисление средней выручки по году, месяцу и типу бронирования
grouped_revenue = (revenue_df
    .groupBy("year", "month", "market_segment")
    .agg(
        round(avg("total_revenue"), 2).alias("avg_revenue"),
        count_("*").alias("booking_count")
    )
    .orderBy("year", "month", "market_segment")
)

print("Группированные данные (перед PIVOT):")
grouped_revenue.show(100)

# Создание PIVOT таблицы
pivot_result = (grouped_revenue
    .groupBy("year", "month")
    .pivot("market_segment")
    .agg(
        round(avg("avg_revenue"), 2).alias("avg_revenue")  
    )
    .orderBy("year", "month")
)

pivot_result.show(24)

Группированные данные (перед PIVOT):
+----+-----+--------------+-----------+-------------+
|year|month|market_segment|avg_revenue|booking_count|
+----+-----+--------------+-----------+-------------+
|2017|    7| Complementary|       22.4|            5|
|2017|    7|     Corporate|     113.75|            4|
|2017|    7|       Offline|     228.95|           77|
|2017|    7|        Online|     290.56|           34|
|2017|    8| Complementary|       0.32|           37|
|2017|    8|     Corporate|     156.42|           54|
|2017|    8|       Offline|     235.54|          287|
|2017|    8|        Online|     284.21|          451|
|2017|    9| Complementary|      16.89|           27|
|2017|    9|     Corporate|     177.83|          119|
|2017|    9|       Offline|     236.65|          694|
|2017|    9|        Online|     348.55|          627|
|2017|   10| Complementary|       1.09|           22|
|2017|   10|     Corporate|     180.26|          102|
|2017|   10|       Offline|     223.24|      

### Запрос 5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость

In [42]:
repeated = df_confirmed.filter(col("repeated_guest") == 1)

guest_revenue = (repeated
 .withColumn("total_nights", col("week_nights") + col("weekend_nights"))
 .withColumn("revenue", col("avg_room_price") * col("total_nights"))
 .groupBy("ID")
 .agg(sum_("revenue").alias("total_revenue")))

total_repeated_rev = guest_revenue.agg(sum_("total_revenue")).collect()[0][0]

(guest_revenue
 .orderBy(desc("total_revenue"))
 .limit(5)
 .withColumn("share_percent", round(col("total_revenue") * 100 / total_repeated_rev, 2))
 .select("ID", "total_revenue", "share_percent")
 .show(truncate=False))

+--------+------------------+-------------+
|ID      |total_revenue     |share_percent|
+--------+------------------+-------------+
|INN19235|1754.3999999999999|1.56         |
|INN05222|690.0             |0.61         |
|INN14189|665.0             |0.59         |
|INN09923|660.0             |0.59         |
|INN25479|650.0             |0.58         |
+--------+------------------+-------------+



### Запрос 6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек

In [46]:
from pyspark.sql.functions import col, sum as sum_, lit, coalesce, explode, sequence, date_add, expr, desc, when, round

df_valid = df_logs_clean.filter(col("status") == "Not_Canceled")

stays = (df_valid
    .withColumn("total_nights", col("week_nights") + col("weekend_nights"))
    .withColumn("num_guests", col("n_adults") + col("n_children"))
    .withColumn("departure_date", 
                when(col("total_nights") == 0, col("arrival_date"))
                .otherwise(date_add(col("arrival_date"), col("total_nights"))))
    .filter(col("arrival_date").isNotNull())
)
# Развертка по дням пребывания
stay_days = stays.select(
    explode(
        expr("sequence(arrival_date, departure_date, interval 1 day)")
    ).alias("stay_date"),
    col("num_guests")
)

# Агрегация гостей по дням
daily_guests = stay_days.groupBy("stay_date").agg(
    sum_("num_guests").alias("total_guests")
)

# Объединение с календарем
result = (df_calendar
    .join(daily_guests, df_calendar.calendar_dt == daily_guests.stay_date, "left")
    .select(
        col("calendar_dt").alias("date"),
        coalesce(col("total_guests"), lit(0)).alias("total_guests")
    )
    .withColumn("occupancy_percent", 
                round(col("total_guests") * 100.0 / lit(400), 2))
    .orderBy(desc("date"))
)

result.show(730)

+----------+------------+-----------------+
|      date|total_guests|occupancy_percent|
+----------+------------+-----------------+
|2018-12-31|         639|           159.75|
|2018-12-30|         708|            177.0|
|2018-12-29|         669|           167.25|
|2018-12-28|         686|            171.5|
|2018-12-27|         685|           171.25|
|2018-12-26|         514|            128.5|
|2018-12-25|         457|           114.25|
|2018-12-24|         439|           109.75|
|2018-12-23|         395|            98.75|
|2018-12-22|         336|             84.0|
|2018-12-21|         331|            82.75|
|2018-12-20|         292|             73.0|
|2018-12-19|         326|             81.5|
|2018-12-18|         357|            89.25|
|2018-12-17|         336|             84.0|
|2018-12-16|         294|             73.5|
|2018-12-15|         208|             52.0|
|2018-12-14|         197|            49.25|
|2018-12-13|         221|            55.25|
|2018-12-12|         291|       

In [None]:
spark.stop()