In [1]:
pip install pyspark



In [2]:
pip install findspark



In [3]:
from IPython.core.display import display, HTML
display(HTML("<style>.container {width: 90% !important; }</style>"))
import findspark
findspark.init()

In [4]:
import pyspark
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import expr
spark = (
    SparkSession
    .builder
    .appName("Test_01")
    .config('spark.ui.port', '4050')
    .config('spark.executor.memoryOverhead', '1G')
    .config('spark.shuffle.service.enabled', 'true')
    .config('spark.dynamicAllocation.enabled', 'true')
    .getOrCreate()
)

from pyspark.sql.functions import *
from pyspark.sql.types import *

In [5]:
spark

## Шаг 1. Создание таблицы, используя csv-файл.

In [6]:
df = spark.read.option("header",True).option("sep",",").csv("Hotel.csv")

In [7]:
display(df)
display(df.printSchema())
display(df.show(5,False))

DataFrame[ID: string, n_adults: string, n_children: string, weekend_nights: string, week_nights: string, meal_plan: string, car_parking_space: string, room_type: string, lead_time: string, year: string, month: string, date: string, market_segment: string, repeated_guest: string, previous_cancellations: string, previous_bookings_not_canceled: string, avg_room_price: string, special_requests: string, status: string]

root
 |-- ID: string (nullable = true)
 |-- n_adults: string (nullable = true)
 |-- n_children: string (nullable = true)
 |-- weekend_nights: string (nullable = true)
 |-- week_nights: string (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- date: string (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: string (nullable = true)
 |-- previous_cancellations: string (nullable = true)
 |-- previous_bookings_not_canceled: string (nullable = true)
 |-- avg_room_price: string (nullable = true)
 |-- special_requests: string (nullable = true)
 |-- status: string (nullable = true)



None

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65            |0      

None

## Шаг 2. Генерация таблицы calendar, которая будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31.

In [8]:
# диапазон дат
date_range_df = spark.createDataFrame([("2017-01-01", "2018-12-31")], ["start", "end"])

# календарь
calendar_df = date_range_df.select(
    expr("explode(sequence(to_date(start), to_date(end), interval 1 day)) as calendar_dt")
)

calendar_df.show(5,False)

+-----------+
|calendar_dt|
+-----------+
|2017-01-01 |
|2017-01-02 |
|2017-01-03 |
|2017-01-04 |
|2017-01-05 |
+-----------+
only showing top 5 rows


## Шаг 3. Выполнение запросов:

 - Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [9]:
print("=== Запрос 1: Среднее количество ночей ===")

confirmed_bookings = df.filter(col("status") != "Cancelled")
avg_nights_by_month_year = confirmed_bookings.withColumn(
    "total_nights",
    col("weekend_nights").cast("int") + col("week_nights").cast("int")
).groupBy("year", "month").agg(
    avg("total_nights").alias("avg_total_nights")
).orderBy("year", "month")

avg_nights_by_month_year.show()

=== Запрос 1: Среднее количество ночей ===
+----+-----+------------------+
|year|month|  avg_total_nights|
+----+-----+------------------+
|2017|   10|2.6926293779404076|
|2017|   11|2.7078825347758886|
|2017|   12|3.0711206896551726|
|2017|    7|3.2947658402203857|
|2017|    8|2.9003944773175543|
|2017|    9|2.6907216494845363|
|2018|    1| 2.732741617357002|
|2018|   10|2.9717978848413633|
|2018|   11|3.0921560222888984|
|2018|   12| 3.429049211657907|
|2018|    2|2.8773474178403755|
|2018|    3|3.1666666666666665|
|2018|    4|2.9590643274853803|
|2018|    5|2.9749807544264817|
|2018|    6| 2.724008741804558|
|2018|    7|3.3578412201798984|
|2018|    8| 3.360128617363344|
|2018|    9|2.9891964888588793|
+----+-----+------------------+



- Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.

In [10]:
print("\n=== Запрос 2: ТОП-3 месяца по отменам в 2018 ===")

bookings_2018 = df.filter(col("year") == "2018")
monthly_stats = bookings_2018.groupBy("month").agg(
    count("*").alias("total_bookings"),
    sum(when(col("status") == "Cancelled", 1).otherwise(0)).alias("cancelled_bookings")
).withColumn(
    "cancellation_percentage",
    round((col("cancelled_bookings") * 100.0) / col("total_bookings"), 2)
).select("month", "cancellation_percentage").orderBy(col("cancellation_percentage").desc()).limit(3)

monthly_stats.show()


=== Запрос 2: ТОП-3 месяца по отменам в 2018 ===
+-----+-----------------------+
|month|cancellation_percentage|
+-----+-----------------------+
|    7|                    0.0|
|   11|                    0.0|
|    3|                    0.0|
+-----+-----------------------+



- Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.


In [11]:
print("\n=== Запрос 3: Среднее время от брони до заезда ===")

avg_lead_time = confirmed_bookings.withColumn(
    "lead_time_int", col("lead_time").cast("int")
).groupBy("year", "month").agg(
    avg("lead_time_int").alias("avg_lead_time_days")
).orderBy("year", "month")

avg_lead_time.show()


=== Запрос 3: Среднее время от брони до заезда ===
+----+-----+------------------+
|year|month|avg_lead_time_days|
+----+-----+------------------+
|2017|   10| 66.25196027182436|
|2017|   11|34.425038639876355|
|2017|   12| 48.37823275862069|
|2017|    7|146.97796143250687|
|2017|    8| 42.25049309664694|
|2017|    9| 56.69254093389933|
|2018|    1| 34.89842209072978|
|2018|   10|124.78936545240893|
|2018|   11| 82.41191598799828|
|2018|   12| 87.50979455327281|
|2018|    2| 31.31338028169014|
|2018|    3| 47.13952502120441|
|2018|    4| 74.25255847953217|
|2018|    5| 84.63933795227098|
|2018|    6|100.41554792382142|
|2018|    7|111.91317950723504|
|2018|    8|115.65094676670239|
|2018|    9|119.36731937879811|
+----+-----+------------------+



- Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).


In [12]:
print("\n=== Запрос 4: Средняя выручка по типам бронирования ===")

revenue_by_segment = confirmed_bookings.withColumn(
    "avg_room_price_float", col("avg_room_price").cast("float")
).groupBy("year", "month", "market_segment").agg(
    avg("avg_room_price_float").alias("avg_revenue")
).orderBy("year", "month", "market_segment")

pivot_table = revenue_by_segment.groupBy("year", "month").pivot("market_segment").agg(first("avg_revenue"))
pivot_table.show()


=== Запрос 4: Средняя выручка по типам бронирования ===
+----+-----+------------------+-------------------+------------------+------------------+------------------+
|year|month|          Aviation|      Complementary|         Corporate|           Offline|            Online|
+----+-----+------------------+-------------------+------------------+------------------+------------------+
|2017|    7|              NULL|  7.466000366210937|              65.0|  94.6094610705347| 78.70657748462044|
|2018|    1|              NULL| 1.7333333333333334| 68.48745281291458|  73.9698407482949| 80.24478737513225|
|2018|    3|              89.0| 16.195652173913043|  74.4816765242708| 78.45033471912257| 99.18717891831447|
|2018|    6|              95.0|                0.0| 86.01666660501499|106.14490146205688|119.58873966123127|
|2018|   10|105.37931034482759|0.13043478260869565| 88.28815794827645| 98.25478179902687|120.27328314536658|
|2017|   12|              NULL|0.05000000074505806| 69.02763436430244| 

- Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.

In [13]:
print("\n=== Запрос 5: ТОП-5 постоянных гостей по выручке ===")

repeat_guests_confirmed = df.filter((col("repeated_guest") == "1") & (col("status") != "Cancelled"))

# вычисляем выручку по каждой брони (ID) как avg_room_price * общее количество ночей
guest_revenue = repeat_guests_confirmed.withColumn(
    "total_nights",
    col("week_nights").cast("int") + col("weekend_nights").cast("int")
).withColumn(
    "booking_revenue",
    col("avg_room_price").cast("float") * col("total_nights")
).groupBy("ID").agg(
    sum("booking_revenue").alias("total_revenue")
)

# вычисляем общую выручку от постоянных гостей
total_repeat_guest_revenue = guest_revenue.agg(sum("total_revenue")).collect()[0][0]

# вычисляем процент и получаем ТОП-5
top_5_guests = guest_revenue.withColumn(
    "revenue_percentage",
    round((col("total_revenue") * 100.0) / total_repeat_guest_revenue, 2)
).orderBy(col("total_revenue").desc()).limit(5)

top_5_guests.show()


=== Запрос 5: ТОП-5 постоянных гостей по выручке ===
+--------+------------------+------------------+
|      ID|     total_revenue|revenue_percentage|
+--------+------------------+------------------+
|INN19235|1754.3999633789062|              1.51|
|INN05222|             690.0|              0.59|
|INN14189|             665.0|              0.57|
|INN09923|             660.0|              0.57|
|INN25479|             650.0|              0.56|
+--------+------------------+------------------+



- Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

In [14]:
print("\n=== Запрос 6: Загрузка отеля по дням ===")

guest_arrivals = confirmed_bookings.filter(
    (col("year").isNotNull()) &
    (col("month").isNotNull()) &
    (col("date").isNotNull())
).withColumn(
    "date_string",
    concat(
        col("year"),
        lit("-"),
        lpad(col("month"), 2, "0"),
        lit("-"),
        lpad(col("date"), 2, "0")
    )
).withColumn(
    "valid_date",
    expr("try_cast(date_string as date)")
).filter(
    col("valid_date").isNotNull()
).withColumn(
    "total_guests",
    col("n_adults").cast("int") + col("n_children").cast("int")
).select(
    date_format(col("valid_date"), "yyyy-MM-dd").alias("arrival_date_str"),
    "total_guests"
)

daily_guests = guest_arrivals.groupBy("arrival_date_str").agg(
    sum("total_guests").alias("daily_guests")
)

hotel_load = calendar_df.join(
    daily_guests,
    calendar_df.calendar_dt == daily_guests.arrival_date_str,
    "left"
).fillna(0, subset=["daily_guests"]).withColumnRenamed("daily_guests", "guest_count")

hotel_load_with_percentage = hotel_load.withColumn(
    "load_percentage",
    round((col("guest_count") * 100.0) / 400, 2)
).select(
    col("calendar_dt").alias("date"),
    col("guest_count"),
    col("load_percentage")
).orderBy(col("date").desc())

hotel_load_with_percentage.show(20)


=== Запрос 6: Загрузка отеля по дням ===
+----------+-----------+---------------+
|      date|guest_count|load_percentage|
+----------+-----------+---------------+
|2018-12-31|         89|          22.25|
|2018-12-30|        212|           53.0|
|2018-12-29|        216|           54.0|
|2018-12-28|        183|          45.75|
|2018-12-27|        325|          81.25|
|2018-12-26|        236|           59.0|
|2018-12-25|        114|           28.5|
|2018-12-24|        119|          29.75|
|2018-12-23|        152|           38.0|
|2018-12-22|        109|          27.25|
|2018-12-21|        106|           26.5|
|2018-12-20|         73|          18.25|
|2018-12-19|         93|          23.25|
|2018-12-18|         87|          21.75|
|2018-12-17|         91|          22.75|
|2018-12-16|        166|           41.5|
|2018-12-15|         66|           16.5|
|2018-12-14|         50|           12.5|
|2018-12-13|         61|          15.25|
|2018-12-12|         56|           14.0|
+----------+---