In [1]:
pip install pyspark



In [2]:
import pyspark
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.appName('Test_01').config('spark.ui.port', '9911').config('spark.executor.memoryOverhead', '1G').config('spark.shuffle.service.enabled', 'true').config('spark.dynamicAllocation.enabled', 'true').config('spark.driver.extraClassPath', '/opt/spark/jars/sqljdbc42.jar').config('spark.executor.extraClassPath', '/opt/spark/jars/sqljdbc42.jar').getOrCreate()

In [3]:
spark

In [4]:
from pyspark.sql import functions as f


In [5]:
logs_hotel = spark.read.option("header",True).option("sep",",").csv("Hotel.csv")


In [6]:
logs_hotel

DataFrame[ID: string, n_adults: string, n_children: string, weekend_nights: string, week_nights: string, meal_plan: string, car_parking_space: string, room_type: string, lead_time: string, year: string, month: string, date: string, market_segment: string, repeated_guest: string, previous_cancellations: string, previous_bookings_not_canceled: string, avg_room_price: string, special_requests: string, status: string]

In [7]:
logs_hotel.count()

36275

In [8]:
logs_hotel.printSchema()

root
 |-- ID: string (nullable = true)
 |-- n_adults: string (nullable = true)
 |-- n_children: string (nullable = true)
 |-- weekend_nights: string (nullable = true)
 |-- week_nights: string (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)
 |-- date: string (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: string (nullable = true)
 |-- previous_cancellations: string (nullable = true)
 |-- previous_bookings_not_canceled: string (nullable = true)
 |-- avg_room_price: string (nullable = true)
 |-- special_requests: string (nullable = true)
 |-- status: string (nullable = true)



In [9]:
print(logs_hotel.show())

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|            65|       

In [10]:
from datetime import datetime, timedelta

#Генерируем список всех дат с 2017-01-01 по 2018-12-31
date_list = []
current_date = datetime(2017, 1, 1)
end_date = datetime(2018, 12, 31)

while current_date <= end_date:
    date_list.append((current_date.strftime('%Y-%m-%d'),))
    current_date += timedelta(days=1)

calendar = spark.createDataFrame(date_list, ["calendar_dt"])

In [11]:
print(calendar.head(10))

print(calendar.tail(10))

[Row(calendar_dt='2017-01-01'), Row(calendar_dt='2017-01-02'), Row(calendar_dt='2017-01-03'), Row(calendar_dt='2017-01-04'), Row(calendar_dt='2017-01-05'), Row(calendar_dt='2017-01-06'), Row(calendar_dt='2017-01-07'), Row(calendar_dt='2017-01-08'), Row(calendar_dt='2017-01-09'), Row(calendar_dt='2017-01-10')]
[Row(calendar_dt='2018-12-22'), Row(calendar_dt='2018-12-23'), Row(calendar_dt='2018-12-24'), Row(calendar_dt='2018-12-25'), Row(calendar_dt='2018-12-26'), Row(calendar_dt='2018-12-27'), Row(calendar_dt='2018-12-28'), Row(calendar_dt='2018-12-29'), Row(calendar_dt='2018-12-30'), Row(calendar_dt='2018-12-31')]



Запрос 1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)


In [12]:
from pyspark.sql.functions import col, avg

result = (logs_hotel
    .filter(logs_hotel.status == 'Not_Canceled')
    .withColumn("total_nights",
                col("weekend_nights").cast("int") + col("week_nights").cast("int"))
    .groupBy("year", "month")
    .agg(avg("total_nights").alias("avg_nights"))
    .orderBy("year", "month")
)

result.show()

+----+-----+------------------+
|year|month|        avg_nights|
+----+-----+------------------+
|2017|   10|2.7032898820608318|
|2017|   11|2.7241935483870967|
|2017|   12| 3.043046357615894|
|2017|    7|3.0166666666666666|
|2017|    8|2.7189384800965017|
|2017|    9|2.6550783912747105|
|2018|    1|2.7414141414141415|
|2018|   10|2.8910186199342824|
|2018|   11|2.9784511784511785|
|2018|   12|3.2521891418563924|
|2018|    2|2.6891679748822606|
|2018|    3|3.0392038600723765|
|2018|    4| 2.924755887421022|
|2018|    5|2.8054545454545456|
|2018|    6| 2.596757322175732|
|2018|    7|3.1938088829071334|
|2018|    8|3.1544117647058822|
|2018|    9| 2.786425902864259|
+----+-----+------------------+



Запрос 2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год

In [13]:
from pyspark.sql.functions import when, round, desc

top3_months = (logs_hotel
    .filter(col("year") == 2018)
    .groupBy("month")
    .agg(round(avg(when(col("status") == "Canceled", 1).otherwise(0)) * 100, 2)
         .alias("cancellation_rate"))
    .orderBy(desc("cancellation_rate"))
    .limit(3)
)

top3_months.show()

+-----+-----------------+
|month|cancellation_rate|
+-----+-----------------+
|    8|            46.55|
|   10|            46.36|
|    9|            45.78|
+-----+-----------------+



Запрос 3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней

In [14]:

avg_lead_time_detailed = (logs_hotel
    .filter(logs_hotel.status == 'Not_Canceled')
    .filter(col("lead_time").isNotNull())
    .groupBy("year", "month")
    .agg(round(avg(col("lead_time").cast("int")), 2).alias("avg_lead_time_days"))
    .orderBy("year", "month")
)

avg_lead_time_detailed.show()

+----+-----+------------------+
|year|month|avg_lead_time_days|
+----+-----+------------------+
|2017|   10|             55.89|
|2017|   11|             33.28|
|2017|   12|             46.75|
|2017|    7|            130.73|
|2017|    8|             35.08|
|2017|    9|             51.72|
|2018|    1|             34.87|
|2018|   10|             73.24|
|2018|   11|             44.25|
|2018|   12|             69.75|
|2018|    2|             30.53|
|2018|    3|             43.19|
|2018|    4|             62.49|
|2018|    5|             60.99|
|2018|    6|             70.64|
|2018|    7|             86.88|
|2018|    8|             83.09|
|2018|    9|             63.32|
+----+-----+------------------+



Запрос 4. Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT)

In [15]:

pivot_revenue = (logs_hotel
    .filter(logs_hotel.status == 'Not_Canceled')
    .withColumn("total_nights",
                col("weekend_nights").cast("int") + col("week_nights").cast("int"))
    .withColumn("total_revenue",
                col("avg_room_price").cast("double") * col("total_nights"))
    .groupBy("year", "month")
    .pivot("market_segment")
    .agg(round(avg("total_revenue"), 2))
    .orderBy("year", "month")
)

pivot_revenue.show()

+----+-----+--------+-------------+---------+-------+------+
|year|month|Aviation|Complementary|Corporate|Offline|Online|
+----+-----+--------+-------------+---------+-------+------+
|2017|   10|    NULL|         1.09|   180.26| 223.24|311.47|
|2017|   11|    NULL|        14.81|   102.97| 198.36|240.52|
|2017|   12|    NULL|         0.25|   141.11| 253.86|258.93|
|2017|    7|    NULL|         22.4|   113.75| 228.95|290.56|
|2017|    8|    NULL|         0.32|   156.42| 235.54|284.21|
|2017|    9|    NULL|        16.89|   177.83| 236.65|348.55|
|2018|    1|    NULL|         2.27|   113.03| 210.51|236.09|
|2018|   10|  313.04|         0.26|   145.33| 276.64|358.81|
|2018|   11|  159.75|         8.39|    153.9| 226.33|332.04|
|2018|   12|    NULL|          4.0|   138.07| 254.97|349.46|
|2018|    2|   352.0|         1.39|   115.06| 251.85|238.07|
|2018|    3|  118.33|        38.17|   142.39| 233.39|301.71|
|2018|    4|  321.81|          0.0|   108.42| 236.44|320.08|
|2018|    5|   262.5|   

Запрос 5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость

In [16]:
from pyspark.sql.functions import sum, row_number
from pyspark.sql.window import Window

window_spec = Window.orderBy(desc("guest_total_revenue"))

top5_guests_window = (logs_hotel
    .filter(logs_hotel.status == 'Not_Canceled')
    .filter(col("repeated_guest") == 1)
    .withColumn("total_nights",
                col("weekend_nights").cast("int") + col("week_nights").cast("int"))
    .withColumn("total_revenue",
                col("avg_room_price").cast("double") * col("total_nights"))
    .groupBy("ID")
    .agg(sum("total_revenue").alias("guest_total_revenue"))
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") <= 5)
    .withColumn("revenue_share_percent",
                round((col("guest_total_revenue") /
                           sum("guest_total_revenue").over(Window.partitionBy())) * 100, 2))
    .select("ID", "guest_total_revenue", "revenue_share_percent", "rank")
    .orderBy("rank")
)

top5_guests_window.show()

+--------+-------------------+---------------------+----+
|      ID|guest_total_revenue|revenue_share_percent|rank|
+--------+-------------------+---------------------+----+
|INN19235| 1754.3999999999999|                 39.7|   1|
|INN05222|              690.0|                15.61|   2|
|INN14189|              665.0|                15.05|   3|
|INN09923|              660.0|                14.93|   4|
|INN25479|              650.0|                14.71|   5|
+--------+-------------------+---------------------+----+



Запрос 6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек


In [17]:
from pyspark.sql.functions import  lpad, concat, lit, coalesce

daily_guests = (logs_hotel
    .filter(col("status") == "Not_Canceled")
    .withColumn("check_in_date",
                concat(
                    col("year").cast("string"),
                    lit("-"),
                    lpad(col("month").cast("string"), 2, "0"),
                    lit("-"),
                    lpad(col("date").cast("string"), 2, "0")
                ))
    .withColumn("total_guests",
                col("n_adults").cast("int") + col("n_children").cast("int"))
    .groupBy("check_in_date")
    .agg(sum("total_guests").alias("guests_count"))
)

daily_occupancy = (calendar
    .join(daily_guests, calendar.calendar_dt == daily_guests.check_in_date, "left")
    .withColumn("guests_count", coalesce(col("guests_count"), lit(0)))
    .withColumn("occupancy_rate",
                round((col("guests_count") / lit(400)) * 100, 2))
    .select("calendar_dt", "guests_count", "occupancy_rate")
    .orderBy(desc("calendar_dt"))
)

daily_occupancy.show()

+-----------+------------+--------------+
|calendar_dt|guests_count|occupancy_rate|
+-----------+------------+--------------+
| 2018-12-31|          67|         16.75|
| 2018-12-30|         166|          41.5|
| 2018-12-29|         162|          40.5|
| 2018-12-28|         134|          33.5|
| 2018-12-27|         263|         65.75|
| 2018-12-26|         117|         29.25|
| 2018-12-25|          84|          21.0|
| 2018-12-24|          98|          24.5|
| 2018-12-23|         113|         28.25|
| 2018-12-22|          89|         22.25|
| 2018-12-21|          91|         22.75|
| 2018-12-20|          64|          16.0|
| 2018-12-19|          68|          17.0|
| 2018-12-18|          83|         20.75|
| 2018-12-17|          82|          20.5|
| 2018-12-16|         124|          31.0|
| 2018-12-15|          53|         13.25|
| 2018-12-14|          44|          11.0|
| 2018-12-13|          54|          13.5|
| 2018-12-12|          51|         12.75|
+-----------+------------+--------