# Домашнее задание 5. Анализ данных на Spark SQL

### Подготовка окружения.

In [1]:
%pip install pyspark findspark pandas

Note: you may need to restart the kernel to use updated packages.


In [2]:
from datetime import date, timedelta

import findspark
from pyspark.sql.functions import (
    col, avg, round, count, when, sum, first, concat_ws,
    lpad, to_date, explode, sequence, date_add,
)
from pyspark.sql.session import SparkSession
from pyspark.sql.types import (
    StructType, StructField, IntegerType,
    StringType, DecimalType,
)

### Инициализация сессии PySpark.

In [3]:
findspark.init()

spark = (
    SparkSession
    .builder
    .getOrCreate()
)

spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/12 11:47:12 WARN Utils: Your hostname, OMEN, resolves to a loopback address: 127.0.1.1; using 192.168.0.143 instead (on interface wlo1)
25/12/12 11:47:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/12 11:47:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Создание таблицы Calendar, чтение данных.

In [4]:
filename = "Calendar.csv"
columns = ["calendar_dt"]
start_date = date(2017, 1, 1)
end_date = date(2018, 12, 31)
calendar = [columns]

with open(filename, "w", newline="") as csv_file:
    csv_file.write("calendar_dt\n")

    current_date = start_date
    while current_date <= end_date:
        csv_file.write(f"{current_date.strftime('%Y-%d-%m')}\n")
        current_date += timedelta(days=1)


In [5]:
hotel_schema = StructType([
    StructField("ID", StringType(), True),
    StructField("n_adults", IntegerType(), True),
    StructField("n_children", IntegerType(), True),
    StructField("weekend_nights", IntegerType(), True),
    StructField("week_nights", IntegerType(), True),
    StructField("meal_plan", StringType(), True),
    StructField("car_parking_space", IntegerType(), True),
    StructField("room_type", StringType(), True),
    StructField("lead_time", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("date", IntegerType(), True),
    StructField("market_segment", StringType(), True),
    StructField("repeated_guest", IntegerType(), True),
    StructField("previous_cancellations", IntegerType(), True),
    StructField("previous_bookings_not_canceled", IntegerType(), True),
    StructField("avg_room_price", DecimalType(10, 2), True),
    StructField("special_requests", IntegerType(), True),
    StructField("status", StringType(), True),
])

hotel_df = spark.read.csv("Hotel.csv", header=True, schema=hotel_schema)
hotel_df.show(5, 0)
hotel_df.printSchema()

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65.00         |0      

In [6]:
calendar_df = spark.read.csv("Calendar.csv", header=True)

calendar_df = (
    calendar_df
    .withColumn(
        "calendar_dt",
        to_date(
            col("calendar_dt"),
            "yyyy-dd-MM",
        )
    )
)

calendar_df.show(5, 0)
calendar_df.printSchema()

+-----------+
|calendar_dt|
+-----------+
|2017-01-01 |
|2017-01-02 |
|2017-01-03 |
|2017-01-04 |
|2017-01-05 |
+-----------+
only showing top 5 rows
root
 |-- calendar_dt: date (nullable = true)



### Запросы.

#### 1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [7]:
(
    hotel_df
    .where(
        (col("status") != "Not_Canceled")
        & (col("status").isNotNull())
    )
    .groupBy(
        col("year"),
        col("month"),
    )
    .agg(
        round(avg(col("week_nights") + col("weekend_nights")), 2)
        .alias("avg_nights"),
    )
    .orderBy(
        col("year"),
        col("month"),
    )
    .show()
)

+----+-----+----------+
|year|month|avg_nights|
+----+-----+----------+
|2017|    7|      3.43|
|2017|    8|      3.71|
|2017|    9|      2.98|
|2017|   10|      2.64|
|2017|   11|      2.33|
|2017|   12|      4.23|
|2018|    1|      2.38|
|2018|    2|      3.43|
|2018|    3|      3.47|
|2018|    4|      3.02|
|2018|    5|      3.27|
|2018|    6|      2.91|
|2018|    7|      3.59|
|2018|    8|       3.6|
|2018|    9|      3.23|
|2018|   10|      3.07|
|2018|   11|      3.29|
|2018|   12|      4.23|
+----+-----+----------+



#### 2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.

In [8]:
(
    hotel_df
    .where(
        (col("year") == 2018)
        & (col("year").isNotNull())
    )
    .groupBy(col("month"))
    .agg(
        count("*").alias("all_bookings"),
        sum(when(col("status") == "Canceled", 1).otherwise(0))
        .alias("canceled_bookings"),
    )
    .withColumn(
        "percent_of_canceling",
        round((col("canceled_bookings") / col("all_bookings")) * 100, 2)
    )
    .sort(col("percent_of_canceling").desc())
    .limit(3)
    .show()
)

+-----+------------+-----------------+--------------------+
|month|all_bookings|canceled_bookings|percent_of_canceling|
+-----+------------+-----------------+--------------------+
|    8|        2799|             1303|               46.55|
|   10|        3404|             1578|               46.36|
|    9|        2962|             1356|               45.78|
+-----+------------+-----------------+--------------------+



#### 3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.

In [9]:
(
    hotel_df
    .where(
        (col("status") == "Not_Canceled")
        & (col("status").isNotNull())
    )
    .groupBy(col("month"))
    .agg(
        round(avg(col("lead_time")), 2)
        .alias("avg_lead_time"),
    )
    .orderBy(col("month"))
    .show()
)

+-----+-------------+
|month|avg_lead_time|
+-----+-------------+
|    1|        34.87|
|    2|        30.53|
|    3|        43.19|
|    4|        62.49|
|    5|        60.99|
|    6|        70.64|
|    7|        90.16|
|    8|        65.97|
|    9|        57.78|
|   10|        65.11|
|   11|        41.02|
|   12|        61.79|
+-----+-------------+



#### 4. Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).

In [10]:
segment_avg = (
    hotel_df
    .where(
        (col("status") == "Not_Canceled")
        & (col("status").isNotNull())
    )
    .groupBy(
        col("year"),
        col("month"),
        col("market_segment"),
    )
    .agg(
        round(avg(col("avg_room_price") * (col("week_nights") + col("weekend_nights"))), 2)
        .alias("avg_month_total"),
    )
)

pivot_table = (
    segment_avg
    .groupBy(
        col("year"),
        col("month"),
    )
    .pivot("market_segment")
    .agg(first(col("avg_month_total")))
    .orderBy(
        col("year"),
        col("month"),
    )
)

pivot_table.fillna(0).show()

+----+-----+--------+-------------+---------+-------+------+
|year|month|Aviation|Complementary|Corporate|Offline|Online|
+----+-----+--------+-------------+---------+-------+------+
|2017|    7|    0.00|        22.40|   113.75| 228.95|290.56|
|2017|    8|    0.00|         0.32|   156.42| 235.54|284.21|
|2017|    9|    0.00|        16.89|   177.83| 236.65|348.55|
|2017|   10|    0.00|         1.09|   180.26| 223.24|311.47|
|2017|   11|    0.00|        14.81|   102.97| 198.36|240.52|
|2017|   12|    0.00|         0.25|   141.11| 253.86|258.93|
|2018|    1|    0.00|         2.27|   113.03| 210.51|236.09|
|2018|    2|  352.00|         1.39|   115.06| 251.85|238.07|
|2018|    3|  118.33|        38.17|   142.40| 233.39|301.71|
|2018|    4|  321.81|         0.00|   108.42| 236.44|320.08|
|2018|    5|  262.50|         0.00|   229.50| 274.55|352.34|
|2018|    6|  247.00|         0.00|   148.13| 251.98|335.03|
|2018|    7|   79.00|         5.38|   146.99| 310.36|390.05|
|2018|    8|    0.00|   

#### 5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.

In [11]:
avg_sum_mask = col("avg_room_price") * (col("week_nights") + col("weekend_nights"))

total_by_repeated_guest = (
    hotel_df
    .agg(
        sum(
            when(col("repeated_guest") == 1, avg_sum_mask)
            .otherwise(0)
        )
        .alias("total_by_repeated"),
    )
    .first()
)

total_by_repeated_guest: float = total_by_repeated_guest["total_by_repeated"]

(
    hotel_df
    .where(col("repeated_guest") == 1)
    .groupBy(col("ID"))
    .agg(
        sum(avg_sum_mask)
        .alias("total_by_guest"),
    )
    .withColumn(
        "percent_by_total_repeated_guest",
        round((col("total_by_guest") / total_by_repeated_guest) * 100, 2)
    )
    .orderBy(col("total_by_guest").desc())
    .limit(5)
    .show()
)


+--------+--------------+-------------------------------+
|      ID|total_by_guest|percent_by_total_repeated_guest|
+--------+--------------+-------------------------------+
|INN19235|       1754.40|                           1.51|
|INN05222|        690.00|                           0.59|
|INN14189|        665.00|                           0.57|
|INN09923|        660.00|                           0.57|
|INN25479|        650.00|                           0.56|
+--------+--------------+-------------------------------+



#### 6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

In [12]:
# 29.02.2018 - такой даты в календаре не существует и при попытке привести эти значения в to_date
# возникает ошибка "Text '2018-29-02' could not be parsed: Invalid date 'February 29' as '2018' is not a leap year.",
# поэтому для данного задания эти данные не будут учтены

without_unexpected_date = (
    hotel_df
    .where(
        ~((col("year") == 2018)
        & (col("month") == 2)
        & (col("date") == 29))
    )
)

booking_date_concat = (
    without_unexpected_date
    .where(
        (col("status") == "Not_Canceled")
        & (col("status").isNotNull())
    )
    .withColumn(
        "booking_date",
        to_date(
            concat_ws(
                "-",
                col("year").cast("string"),
                lpad(col("date").cast("string"), 2, "0"),
                lpad(col("month").cast("string"), 2, "0"),
            ),
            "yyyy-dd-MM",
        ),
    )
    .withColumn(
        "total_guests",
        col("n_adults") + col("n_children"),
    )
    .withColumn(
        "total_nights",
        col("weekend_nights") + col("week_nights"),
    )
)

add_bookings = (
    booking_date_concat
    .withColumn(
        "booking_day",
        explode(
            sequence(
                col("booking_date"),
                date_add(col("booking_date"), col("total_nights") - 1)
            )
        )
    )
)

booking_day_total_guests = (
    add_bookings
    .groupBy("booking_day")
    .agg(
        sum("total_guests").alias("total_guests_by_day")
    )
    .orderBy(col("total_guests_by_day").desc())
)

calendar_by_guests = (
    calendar_df
    .join(
        booking_day_total_guests,
        col("calendar_dt") == col("booking_day"),
        "left",
    )
    .withColumn(
        "fill_percent",
        round((col("total_guests_by_day") / 400) * 100, 2)
    )
    .orderBy(col("calendar_dt").desc())
    .select(
        col("calendar_dt"),
        col("total_guests_by_day"),
        col("fill_percent"),
    )
    .fillna(0)
    .show()
)


+-----------+-------------------+------------+
|calendar_dt|total_guests_by_day|fill_percent|
+-----------+-------------------+------------+
| 2018-12-31|                562|       140.5|
| 2018-12-30|                572|       143.0|
| 2018-12-29|                542|       135.5|
| 2018-12-28|                507|      126.75|
| 2018-12-27|                552|       138.0|
| 2018-12-26|                422|       105.5|
| 2018-12-25|                397|       99.25|
| 2018-12-24|                373|       93.25|
| 2018-12-23|                341|       85.25|
| 2018-12-22|                282|        70.5|
| 2018-12-21|                247|       61.75|
| 2018-12-20|                240|        60.0|
| 2018-12-19|                228|        57.0|
| 2018-12-18|                258|        64.5|
| 2018-12-17|                274|        68.5|
| 2018-12-16|                254|        63.5|
| 2018-12-15|                170|        42.5|
| 2018-12-14|                155|       38.75|
| 2018-12-13|