
# Домашнее задание 5. Анализ данных на Spark SQL

###  Цель домашней работы

Научиться работать с основными операторами Spark SQL, фильтровать таблицы по разным условиям, писать вложенные запросы, объединять таблицы.

**Первичный анализ данных для последующего создания schema_logs_hotel**

In [109]:
from google.colab import files

In [110]:
uploaded = files.upload()

Saving Hotel.csv to Hotel (1).csv


In [111]:
import pandas as pd

In [112]:
hotel_pd = pd.read_csv('Hotel.csv')
hotel_pd.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 36275 entries, 0 to 36274
Data columns (total 19 columns):
 #   Column                          Non-Null Count  Dtype  
---  ------                          --------------  -----  
 0   ID                              36275 non-null  object 
 1   n_adults                        36275 non-null  int64  
 2   n_children                      36275 non-null  int64  
 3   weekend_nights                  36275 non-null  int64  
 4   week_nights                     36275 non-null  int64  
 5   meal_plan                       36275 non-null  object 
 6   car_parking_space               36275 non-null  int64  
 7   room_type                       36275 non-null  object 
 8   lead_time                       36275 non-null  int64  
 9   year                            36275 non-null  int64  
 10  month                           36275 non-null  int64  
 11  date                            36275 non-null  int64  
 12  market_segment                  

In [113]:
hotel_pd.shape

(36275, 19)

In [114]:
hotel_pd.nunique()

Unnamed: 0,0
ID,36275
n_adults,5
n_children,6
weekend_nights,8
week_nights,18
meal_plan,4
car_parking_space,2
room_type,7
lead_time,352
year,2


**ОСНОВНОЕ ЗАДАНИЕ**

**1. Создание таблицы с использованием csv-файла.**

In [115]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [116]:
# Не забыть про установку!

!pip install pyspark
!pip install findspark



In [117]:
import findspark
findspark.init()

In [118]:
import pyspark
# from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import date, timedelta

In [119]:
spark = (
    SparkSession
    .builder
    .appName('hotel_calendar')
    .config('spark.ui.port', '9311')
    .config('spark.executor.memoryOverhead', '1G')
    .config('spark.shuffle.service.enabled', 'true')
    .config('spark.dynamicAllocation.enabled', 'true')
    .config('spark.driver.extraClassPath', '/opt/spark/jars/sqljdbc42.jar')
    .config('spark.executor.extraClassPath', '/opt/spark/jars/sqljdbc42.jar')
    .getOrCreate()
)

In [120]:
schema_logs_hotel = StructType([
    StructField("ID", StringType(), True),
    StructField("n_adults", IntegerType(), True),
    StructField("n_children", IntegerType(), True),
    StructField("weekend_nights", IntegerType(), True),
    StructField("week_nights", IntegerType(), True),
    StructField("meal_plan", StringType(), True),
    StructField("car_parking_space", IntegerType(), True),
    StructField("room_type", StringType(), True),
    StructField("lead_time", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("date", IntegerType(), True),
    StructField("market_segment", StringType(), True),
    StructField("repeated_guest", IntegerType(), True),
    StructField("previous_cancellations", IntegerType(), True),
    StructField("previous_bookings_not_canceled", IntegerType(), True),
    StructField("avg_room_price", FloatType(), True),
    StructField("special_requests", IntegerType(), True),
    StructField("status", StringType(), True)
])

In [121]:
logs_hotel = spark.read.option("header", True).option("sep", ",").schema(schema_logs_hotel).csv("Hotel.csv")

In [122]:
logs_hotel.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65.0          |0      

In [123]:
logs_hotel.count()

36275

Проверим есть ли некорректные даты в таблице logs_hotel

In [124]:
# Проверяем какие есть годы в таблице logs_hotel
years = logs_hotel.select("year").distinct().orderBy("year").collect()
year_list = [row.year for row in years]

In [125]:
year_list

[2017, 2018]

In [126]:
# Проверяем дату 29 февраля
bad_feb_29 = logs_hotel.filter(
    (col("month") == 2) &
    (col("date") == 29) &
    (col("year").isin(year_list))
)

In [127]:
bad_feb_29.count()

37

In [128]:
# Проверяем дату 30 февраля
bad_feb_30 = logs_hotel.filter(
    (col("month") == 2) &
    (col("date") == 30) &
    (col("year").isin(year_list))
)

In [129]:
bad_feb_30.count()

0

In [130]:
# Проверяем даты, которые не могут быть 31 числом
months = [2, 4, 6, 9, 11]
bad_other = logs_hotel.filter(
    (col("month").isin(months)) &
    (col("date") == 31) &
    (col("year").isin(year_list))
)

In [131]:
bad_other.count()

0

Некорректной является дата 29 февраля в таблице logs_hotel

In [132]:
# Фильтруем некорректные даты
logs_hotel_clean = logs_hotel.filter(
    ~((col("month") == 2) & (col("date") == 29) & (col("year").isin(year_list)))
)

In [133]:
logs_hotel_clean.count()

36238

In [134]:
logs_hotel = logs_hotel_clean

In [135]:
logs_hotel.printSchema()

root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: float (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)



**2. Создать (сгенерировать) таблицу calendar, которая будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31.**

In [136]:
first_date = date(2017, 1, 1)
last_date = date(2018, 12, 31)
total_days = (last_date - first_date).days + 1
dates = []
for i in range(total_days):
    current_date = first_date + timedelta(days=i)
    dates.append(current_date)

In [137]:
print(f"Начальная дата: {dates[0]}")
print(f"Конечная дата: {dates[-1]}")

Начальная дата: 2017-01-01
Конечная дата: 2018-12-31


In [138]:
calendar = spark.createDataFrame([(date,) for date in dates], ['calendar_dt'])

In [139]:
calendar.show(10, 0)

+-----------+
|calendar_dt|
+-----------+
|2017-01-01 |
|2017-01-02 |
|2017-01-03 |
|2017-01-04 |
|2017-01-05 |
|2017-01-06 |
|2017-01-07 |
|2017-01-08 |
|2017-01-09 |
|2017-01-10 |
+-----------+
only showing top 10 rows


In [140]:
calendar.count()

730

In [141]:
calendar.printSchema()

root
 |-- calendar_dt: date (nullable = true)



In [142]:
calendar.write.format("csv").option("header", "true").mode("overwrite").save("/content/calendar.csv")

**3. Выполнить следующие запросы:**

1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам).

In [143]:
# Фильтруем неотмененные бронирования и создаем поле total_nights
bookings_not_canceled = logs_hotel.filter(col("status") == "Not_Canceled") \
    .withColumn("total_nights", col("weekend_nights") + col("week_nights"))

In [144]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |total_nights|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0             

In [145]:
# Группируем по году и месяцу и вычисляем среднее количество ночей
avg_nights_by_year_month = bookings_not_canceled \
    .groupBy("year", "month") \
    .agg(
        round(avg(col("total_nights")), 2).alias("avg_nights")
    ) \
    .orderBy("year", "month")

In [146]:
avg_nights_by_year_month.show()

+----+-----+----------+
|year|month|avg_nights|
+----+-----+----------+
|2017|    7|      3.02|
|2017|    8|      2.72|
|2017|    9|      2.66|
|2017|   10|       2.7|
|2017|   11|      2.72|
|2017|   12|      3.04|
|2018|    1|      2.74|
|2018|    2|      2.68|
|2018|    3|      3.04|
|2018|    4|      2.92|
|2018|    5|      2.81|
|2018|    6|       2.6|
|2018|    7|      3.19|
|2018|    8|      3.15|
|2018|    9|      2.79|
|2018|   10|      2.89|
|2018|   11|      2.98|
|2018|   12|      3.25|
+----+-----+----------+



2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.

In [147]:
# Фильтруем данные за 2018 год
bookings_2018 = logs_hotel.filter(col("year") == 2018)

In [148]:
bookings_2018.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00002|2       |0         |2             |3          |Not Selected|0                |Room_Type 1|5        |2018|11   |6   |Online        |0             |0                     |0                             |106.68        |1      

In [149]:
# Вычисляем общее количество броней броней и количество отмененных броней по месяцам
quantity_cancel_month = bookings_2018.groupBy("month") \
    .agg(
        count("*").alias("total_bookings"),
        count(when(col("status") == "Canceled", 1)).alias("canceled_bookings")
    ) \
    .orderBy("month")

In [150]:
quantity_cancel_month.show()

+-----+--------------+-----------------+
|month|total_bookings|canceled_bookings|
+-----+--------------+-----------------+
|    1|          1014|               24|
|    2|          1667|              423|
|    3|          2358|              700|
|    4|          2736|              995|
|    5|          2598|              948|
|    6|          3203|             1291|
|    7|          2557|             1071|
|    8|          2799|             1303|
|    9|          2962|             1356|
|   10|          3404|             1578|
|   11|          2333|              848|
|   12|          2093|              380|
+-----+--------------+-----------------+



In [151]:
# Вычисляем процент отмен
cancel_percent = quantity_cancel_month.withColumn(
    "cancellation_rate_percent",
    round((col("canceled_bookings") / col("total_bookings")) * 100, 2)
)

In [152]:
cancel_percent.show()

+-----+--------------+-----------------+-------------------------+
|month|total_bookings|canceled_bookings|cancellation_rate_percent|
+-----+--------------+-----------------+-------------------------+
|    1|          1014|               24|                     2.37|
|    2|          1667|              423|                    25.37|
|    3|          2358|              700|                    29.69|
|    4|          2736|              995|                    36.37|
|    5|          2598|              948|                    36.49|
|    6|          3203|             1291|                    40.31|
|    7|          2557|             1071|                    41.89|
|    8|          2799|             1303|                    46.55|
|    9|          2962|             1356|                    45.78|
|   10|          3404|             1578|                    46.36|
|   11|          2333|              848|                    36.35|
|   12|          2093|              380|                    18

In [153]:
# Сортируем по убыванию процента и берем топ-3
top_3_months = cancel_percent.orderBy(col("cancellation_rate_percent").desc()).limit(3)

In [154]:
top_3_months.show()

+-----+--------------+-----------------+-------------------------+
|month|total_bookings|canceled_bookings|cancellation_rate_percent|
+-----+--------------+-----------------+-------------------------+
|    8|          2799|             1303|                    46.55|
|   10|          3404|             1578|                    46.36|
|    9|          2962|             1356|                    45.78|
+-----+--------------+-----------------+-------------------------+



3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.

*Вариант 1 (через lead_time)*

In [155]:
# Фильтруем неотмененные бронирования
bookings_not_canceled = logs_hotel.filter(col("status") == "Not_Canceled")

In [156]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65.0          |0      

In [157]:
# Группируем по году и месяцу заезда, вычисляем среднее значение lead_time
avg_lead_time_by_month = bookings_not_canceled.groupBy("year", "month") \
    .agg(
        round(avg(col("lead_time")), 1).alias("avg_lead_time")
    ) \
    .orderBy("year", "month")

In [158]:
avg_lead_time_by_month.show()

+----+-----+-------------+
|year|month|avg_lead_time|
+----+-----+-------------+
|2017|    7|        130.7|
|2017|    8|         35.1|
|2017|    9|         51.7|
|2017|   10|         55.9|
|2017|   11|         33.3|
|2017|   12|         46.7|
|2018|    1|         34.9|
|2018|    2|         30.3|
|2018|    3|         43.2|
|2018|    4|         62.5|
|2018|    5|         61.0|
|2018|    6|         70.6|
|2018|    7|         86.9|
|2018|    8|         83.1|
|2018|    9|         63.3|
|2018|   10|         73.2|
|2018|   11|         44.3|
|2018|   12|         69.8|
+----+-----+-------------+



   *Вариант 2 (с созданием дат заезда и бронирования и вычислением среднего количества дней между ними)*


In [159]:
# Фильтруем неотмененные бронирования
bookings_not_canceled = logs_hotel.filter(col("status") == "Not_Canceled")

In [160]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65.0          |0      

In [161]:
# Создаeм дату заезда (arrival_date) из year, month, date
bookings_not_canceled = bookings_not_canceled.withColumn(
    "arrival_date",
    to_date(
        concat_ws("-",
            col("year"),
            lpad(col("month"), 2, "0"),  # добавляем ведущий ноль к месяцу
            lpad(col("date"), 2, "0")    # добавляем ведущий ноль к дню
        ),
        "yyyy-MM-dd"
    )
)

In [162]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |arrival_date|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0             

In [163]:
# Создаем датy бронирования (booking_date)
# booking_date = arrival_date - lead_time дней
bookings_not_canceled = bookings_not_canceled.withColumn(
    "booking_date",
    date_sub(col("arrival_date"), col("lead_time")) # вычитаем из даты заезда lead_time дней
)

In [164]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |arrival_date|booking_date|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0            

In [165]:
# Вычисляем разницу между датами в днях
bookings_not_canceled = bookings_not_canceled.withColumn(
    "days_between_arrival_booking",
    datediff(col("arrival_date"), col("booking_date"))
)

In [166]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+------------+----------------------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |arrival_date|booking_date|days_between_arrival_booking|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+------------+----------------------------+
|INN00001|2       |0         |1             |2          |Meal Plan 

In [167]:
# Группируем по году и месяцу заезда и вычисляем среднее время в днях
avg_days_by_month = bookings_not_canceled.groupBy("year", "month") \
    .agg(
        round(avg(col("days_between_arrival_booking")), 1).alias("avg_days_between_arrival_booking")
    ) \
    .orderBy("year", "month")

In [168]:
avg_days_by_month.show()

+----+-----+--------------------------------+
|year|month|avg_days_between_arrival_booking|
+----+-----+--------------------------------+
|2017|    7|                           130.7|
|2017|    8|                            35.1|
|2017|    9|                            51.7|
|2017|   10|                            55.9|
|2017|   11|                            33.3|
|2017|   12|                            46.7|
|2018|    1|                            34.9|
|2018|    2|                            30.3|
|2018|    3|                            43.2|
|2018|    4|                            62.5|
|2018|    5|                            61.0|
|2018|    6|                            70.6|
|2018|    7|                            86.9|
|2018|    8|                            83.1|
|2018|    9|                            63.3|
|2018|   10|                            73.2|
|2018|   11|                            44.3|
|2018|   12|                            69.8|
+----+-----+----------------------

4. Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).

In [169]:
# Фильтруем неотмененные бронирования
bookings_not_canceled = logs_hotel.filter(col("status") == "Not_Canceled")

In [170]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65.0          |0      

In [171]:
# Создаем поле total_nights
bookings_not_canceled = bookings_not_canceled.withColumn(
    "total_nights",
    col("weekend_nights") + col("week_nights")
)

In [172]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |total_nights|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0             

In [173]:
# Вычисляем общую выручку за бронирование
bookings_not_canceled = bookings_not_canceled.withColumn(
    "total_revenue",
    round(col("avg_room_price") * col("total_nights"), 2)
)

In [174]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+-------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |total_nights|total_revenue|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+-------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0         

In [175]:
# Группируем по году, месяцу и типу бронирования, вычисляем среднюю выручку
avg_revenue_by_market_segment = bookings_not_canceled.groupBy("year", "month", "market_segment") \
    .agg(
        round(avg(col("total_revenue")), 2).alias("avg_total_revenue")
    ) \
    .orderBy("year", "month", "market_segment")

In [176]:
avg_revenue_by_market_segment.show(10, 0)

+----+-----+--------------+-----------------+
|year|month|market_segment|avg_total_revenue|
+----+-----+--------------+-----------------+
|2017|7    |Complementary |22.4             |
|2017|7    |Corporate     |113.75           |
|2017|7    |Offline       |228.95           |
|2017|7    |Online        |290.56           |
|2017|8    |Complementary |0.32             |
|2017|8    |Corporate     |156.42           |
|2017|8    |Offline       |235.54           |
|2017|8    |Online        |284.21           |
|2017|9    |Complementary |16.89            |
|2017|9    |Corporate     |177.83           |
+----+-----+--------------+-----------------+
only showing top 10 rows


In [177]:
# Создаем pivot-таблицу (вариант 1)
pivot_table_1 = avg_revenue_by_market_segment.groupBy("year", "month") \
    .pivot("market_segment") \
    .agg(first(col("avg_total_revenue"))) \
    .orderBy("year", "month")

pivot_table_1 = pivot_table_1.fillna(0)

In [178]:
pivot_table_1.show()

+----+-----+--------+-------------+---------+-------+------+
|year|month|Aviation|Complementary|Corporate|Offline|Online|
+----+-----+--------+-------------+---------+-------+------+
|2017|    7|     0.0|         22.4|   113.75| 228.95|290.56|
|2017|    8|     0.0|         0.32|   156.42| 235.54|284.21|
|2017|    9|     0.0|        16.89|   177.83| 236.65|348.55|
|2017|   10|     0.0|         1.09|   180.26| 223.24|311.47|
|2017|   11|     0.0|        14.81|   102.97| 198.36|240.52|
|2017|   12|     0.0|         0.25|   141.11| 253.86|258.93|
|2018|    1|     0.0|         2.27|   113.03| 210.51|236.09|
|2018|    2|   352.0|         1.12|   112.37| 253.51|234.31|
|2018|    3|  118.33|        38.17|   142.39| 233.39|301.71|
|2018|    4|  321.81|          0.0|   108.42| 236.44|320.08|
|2018|    5|   262.5|          0.0|    229.5| 274.55|352.34|
|2018|    6|   247.0|          0.0|   148.13| 251.98|335.03|
|2018|    7|    79.0|         5.38|   146.99| 310.36|390.05|
|2018|    8|     0.0|   

In [179]:
# Создаем pivot-таблицу (вариант 2)
avg_revenue_by_market_segment_new = avg_revenue_by_market_segment.withColumn(
    "year-month",
    concat(col("year"), lit("-"), lpad(col("month"), 2, "0"))
)

pivot_table_2 = avg_revenue_by_market_segment_new.groupBy("market_segment") \
    .pivot("year-month") \
    .agg(first(col("avg_total_revenue"))) \
    .orderBy("market_segment")

pivot_table_2 = pivot_table_2.fillna(0)

In [180]:
pivot_table_2.show()

+--------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|market_segment|2017-07|2017-08|2017-09|2017-10|2017-11|2017-12|2018-01|2018-02|2018-03|2018-04|2018-05|2018-06|2018-07|2018-08|2018-09|2018-10|2018-11|2018-12|
+--------------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|      Aviation|    0.0|    0.0|    0.0|    0.0|    0.0|    0.0|    0.0|  352.0| 118.33| 321.81|  262.5|  247.0|   79.0|    0.0| 361.67| 313.04| 159.75|    0.0|
| Complementary|   22.4|   0.32|  16.89|   1.09|  14.81|   0.25|   2.27|   1.12|  38.17|    0.0|    0.0|    0.0|   5.38|    0.0|  21.92|   0.26|   8.39|    4.0|
|     Corporate| 113.75| 156.42| 177.83| 180.26| 102.97| 141.11| 113.03| 112.37| 142.39| 108.42|  229.5| 148.13| 146.99| 206.75| 201.12| 145.33|  153.9| 138.07|
|       Offline| 228.95| 235.54| 2

5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.

In [181]:
# Проверим уникальность ID еще раз
logs_hotel.select("ID").distinct().count()

36238

In [182]:
logs_hotel.count()

36238

Все ID уникальны

In [183]:
# Фильтруем только постоянных гостей (repeated_guest = 1)
repeated_guests = logs_hotel.filter(col("repeated_guest") == 1)

In [184]:
repeated_guests.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00089|2       |0         |0             |1          |Not Selected|0                |Room_Type 1|17       |2018|12   |10  |Online        |1             |0                     |5                             |95.0          |0      

In [185]:
# Создаем поле total_nights и total_revenue для каждого бронирования
repeated_guests = repeated_guests.withColumn(
    "total_nights",
    col("weekend_nights") + col("week_nights")
).withColumn(
    "total_revenue",
    round(col("avg_room_price") * col("total_nights"), 2)
)

In [186]:
repeated_guests.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+-------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |total_nights|total_revenue|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+-------------+
|INN00089|2       |0         |0             |1          |Not Selected|0                |Room_Type 1|17       |2018|12   |10  |Online        |1         

In [187]:
# Рассчитываем выручку для каждой брони постоянного гостя и сортируем по убыванию выручки
repeated_guests_revenue = repeated_guests.select(
    "ID",
    "total_revenue"
).orderBy(col("total_revenue").desc())

In [188]:
repeated_guests_revenue.show(10, 0)

+--------+-------------+
|ID      |total_revenue|
+--------+-------------+
|INN19235|1754.4       |
|INN05222|690.0        |
|INN14189|665.0        |
|INN09923|660.0        |
|INN25479|650.0        |
|INN25495|640.08       |
|INN28305|590.4        |
|INN17871|585.0        |
|INN24266|558.0        |
|INN07175|547.19       |
+--------+-------------+
only showing top 10 rows


In [189]:
# Выбираем ТОП-5 постоянных гостей по выручке
top_5_repeated_guests = repeated_guests_revenue.limit(5)

In [190]:
top_5_repeated_guests.show()

+--------+-------------+
|      ID|total_revenue|
+--------+-------------+
|INN19235|       1754.4|
|INN05222|        690.0|
|INN14189|        665.0|
|INN09923|        660.0|
|INN25479|        650.0|
+--------+-------------+



In [191]:
# Вычисляем общую выручку от всех постоянных гостей
total_revenue_all_repeated_guests = repeated_guests.agg(
    round(sum(col("total_revenue")), 2).alias("total_revenue_all")
).collect()[0]["total_revenue_all"]

In [192]:
total_revenue_all_repeated_guests

115032.9

In [193]:
# Добавляем колонку к топ-5 с долей в общей выручки от постоянных гостей
top_5_repeated_guests = top_5_repeated_guests.withColumn(
    "revenue_percent_from_repeated_guests",
    round((col("total_revenue") / total_revenue_all_repeated_guests) * 100, 2)
)

In [194]:
top_5_repeated_guests.show()

+--------+-------------+------------------------------------+
|      ID|total_revenue|revenue_percent_from_repeated_guests|
+--------+-------------+------------------------------------+
|INN19235|       1754.4|                                1.53|
|INN05222|        690.0|                                 0.6|
|INN14189|        665.0|                                0.58|
|INN09923|        660.0|                                0.57|
|INN25479|        650.0|                                0.57|
+--------+-------------+------------------------------------+



6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

In [195]:
# Фильтруем неотмененные бронирования
bookings_not_canceled = logs_hotel.filter(col("status") == "Not_Canceled")

In [196]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65.0          |0      

In [197]:
# Создаeм дату заезда (arrival_date) из year, month, date
bookings_not_canceled = bookings_not_canceled.withColumn(
    "arrival_date",
    to_date(
        concat_ws("-",
            col("year"),
            lpad(col("month"), 2, "0"),  # добавляем ведущий ноль к месяцу
            lpad(col("date"), 2, "0")    # добавляем ведущий ноль к дню
        ),
        "yyyy-MM-dd"
    )
)

In [198]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |arrival_date|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0             

In [199]:
# Вычисляем общее количество ночей для каждого бронирования
bookings_not_canceled = bookings_not_canceled.withColumn(
    "total_nights",
    col("weekend_nights") + col("week_nights")
)

In [200]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |arrival_date|total_nights|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0            

In [201]:
# Вычисляем дату выезда (arrival_date + total_nights)
bookings_not_canceled = bookings_not_canceled.withColumn(
    "departure_date",
    date_add(col("arrival_date"), col("total_nights"))
)

In [202]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+------------+--------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |arrival_date|total_nights|departure_date|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+------------+--------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      

In [203]:
# Вычисляем общее количество гостей в бронировании
bookings_not_canceled = bookings_not_canceled.withColumn(
    "total_guests",
    col("n_adults") + col("n_children")
)

In [204]:
bookings_not_canceled.show(10, 0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+------------+--------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |arrival_date|total_nights|departure_date|total_guests|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+------------+------------+--------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |

In [205]:
# Создаем по одной записи на каждый день пребывания с учетом количества гостей
daily_guests = bookings_not_canceled.filter(col("total_nights") > 0).select(
    "ID",
    "total_guests",
    expr("explode(sequence(arrival_date, date_add(arrival_date, total_nights - 1), interval 1 day))").alias("hotel_date")
)

In [206]:
daily_guests.show(10, 0)

+--------+------------+----------+
|ID      |total_guests|hotel_date|
+--------+------------+----------+
|INN00001|2           |2017-10-02|
|INN00001|2           |2017-10-03|
|INN00001|2           |2017-10-04|
|INN00002|2           |2018-11-06|
|INN00002|2           |2018-11-07|
|INN00002|2           |2018-11-08|
|INN00002|2           |2018-11-09|
|INN00002|2           |2018-11-10|
|INN00007|2           |2017-10-15|
|INN00007|2           |2017-10-16|
+--------+------------+----------+
only showing top 10 rows


In [207]:
# Считаем общее количество гостей на каждый день
# Умножаем количество записей на количество гостей в бронировании
daily_guests_count = daily_guests.groupBy("hotel_date") \
    .agg(sum("total_guests").alias("total_guests"))

In [208]:
daily_guests_count.show(10, 0)

+----------+------------+
|hotel_date|total_guests|
+----------+------------+
|2017-08-11|170         |
|2018-08-10|323         |
|2017-09-11|197         |
|2018-05-28|145         |
|2018-03-17|285         |
|2018-06-06|151         |
|2018-11-02|394         |
|2018-10-05|315         |
|2018-08-11|328         |
|2018-08-08|366         |
+----------+------------+
only showing top 10 rows


In [209]:
# Объединяем с календарем, чтобы включить дни без гостей
hotel_daily_stats = calendar.join(
    daily_guests_count,
    calendar.calendar_dt == daily_guests_count.hotel_date,
    "left"
).select(
    calendar.calendar_dt.alias("date"),
    coalesce(daily_guests_count.total_guests, lit(0)).alias("total_guests")
)

In [210]:
hotel_daily_stats.show(10, 0)

+----------+------------+
|date      |total_guests|
+----------+------------+
|2017-01-01|0           |
|2017-01-02|0           |
|2017-01-03|0           |
|2017-01-04|0           |
|2017-01-05|0           |
|2017-01-06|0           |
|2017-01-07|0           |
|2017-01-08|0           |
|2017-01-09|0           |
|2017-01-10|0           |
+----------+------------+
only showing top 10 rows


In [211]:
# Рассчитываем процент загрузки и сортируем по убыванию даты
hotel_daily_stats = hotel_daily_stats.withColumn(
    "occupancy_percent",
    round((col("total_guests") / 400) * 100, 2)
).orderBy(col("date").desc())

In [212]:
hotel_daily_stats.show(20, 0)

+----------+------------+-----------------+
|date      |total_guests|occupancy_percent|
+----------+------------+-----------------+
|2018-12-31|562         |140.5            |
|2018-12-30|572         |143.0            |
|2018-12-29|542         |135.5            |
|2018-12-28|507         |126.75           |
|2018-12-27|552         |138.0            |
|2018-12-26|422         |105.5            |
|2018-12-25|397         |99.25            |
|2018-12-24|373         |93.25            |
|2018-12-23|341         |85.25            |
|2018-12-22|282         |70.5             |
|2018-12-21|247         |61.75            |
|2018-12-20|240         |60.0             |
|2018-12-19|228         |57.0             |
|2018-12-18|258         |64.5             |
|2018-12-17|274         |68.5             |
|2018-12-16|254         |63.5             |
|2018-12-15|170         |42.5             |
|2018-12-14|155         |38.75            |
|2018-12-13|153         |38.25            |
|2018-12-12|167         |41.75  