In [1]:
# Стандартные библиотеки
import datetime as dt
import traceback

# Сторонние библиотеки
import pandas as pd
from pymongo import MongoClient

# PySpark библиотеки
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import (
    ArrayType,
    BooleanType,
    DateType,
    IntegerType
)

# Инициализация Spark
spark = SparkSession.builder.appName("Hotel Data").getOrCreate()

try:
    # Чтение данных
    df = spark.read.csv("Hotel.csv", header=True, inferSchema=True)
    df.show(10)

    # Загрузка данных через pymongo, т.к. на моем сервере процессор без AVX
    # версия MongoDB 4.4 и не удалось подобрать Spark Connector
    client = MongoClient('mongodb://brasavar:sD58n36e@localhost:27017/')
    db = client['hotel_db']
    collection = db['bookings']

    # Очистка коллекции
    collection.delete_many({})

    # Конвертация и загрузка
    pandas_df = df.toPandas()
    print(len(pandas_df["ID"].unique()))
    data_dict = pandas_df.to_dict('records')

    if data_dict:
        batch_size = 100
        for i in range(0, len(data_dict), batch_size):
            collection.insert_many(data_dict[i:i+batch_size])

    # Создание индекса
    collection.create_index("ID", unique=True)

    # Проверка загрузки
    print(f"Загружено записей: {collection.count_documents({})}")
    print(f"Строк в датафрейме: {df.count()}")

except Exception as e:
    print(f"Ошибка: {e}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/14 16:36:13 WARN Utils: Your hostname, brasavar-garage, resolves to a loopback address: 127.0.1.1; using 192.168.1.57 instead (on interface enp6s0)
25/12/14 16:36:13 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/14 16:36:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|          65.0|       

                                                                                

36275
Загружено записей: 36275
Строк в датафрейме: 36275


In [2]:
# Параметры календаря
start_date = "2017-01-01"
end_date = "2018-12-31"

# Получаем количество дней
num_days = spark.sql(f"""
    SELECT datediff('{end_date}', '{start_date}') + 1 as num_days
""").collect()[0]["num_days"]

# Создание календаря
calendar_df = spark.range(num_days) \
    .withColumn(
        "id_int",
        col("id").cast(IntegerType())  # Преобразуем в Integer
) \
    .withColumn(
        "calendar_dt",
        to_date(expr(f"date_add('{start_date}', id_int)"))
) \
    .select("calendar_dt")
calendar_df.show(3)

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
+-----------+
only showing top 3 rows


In [3]:
# Создание нового столбца с одновременной фильтрацией
df_filtred = (df
              .withColumn('total_nights', col('weekend_nights') + col('week_nights'))
              # Только подтвержденные бронирования
              .filter(col('status') == 'Not_Canceled'))

# Среднее количесвто ночей по всем записям
overall_avg = df_filtred.select(avg('total_nights')).collect()[0][0]
print(f"\nСреднее количество ночей: {overall_avg:.2f}")

# Агрегация по месяцам и годам
result = (df_filtred
          .groupBy('year', 'month')
          .agg(count('*').alias('bookings'),
               round(avg('total_nights'), 2).alias('avg_nights'))
          .orderBy('year', 'month'))

result.show(50, False)


Среднее количество ночей: 2.89
+----+-----+--------+----------+
|year|month|bookings|avg_nights|
+----+-----+--------+----------+
|2017|7    |120     |3.02      |
|2017|8    |829     |2.72      |
|2017|9    |1467    |2.66      |
|2017|10   |1611    |2.7       |
|2017|11   |620     |2.72      |
|2017|12   |906     |3.04      |
|2018|1    |990     |2.74      |
|2018|2    |1274    |2.69      |
|2018|3    |1658    |3.04      |
|2018|4    |1741    |2.92      |
|2018|5    |1650    |2.81      |
|2018|6    |1912    |2.6       |
|2018|7    |1486    |3.19      |
|2018|8    |1496    |3.15      |
|2018|9    |1606    |2.79      |
|2018|10   |1826    |2.89      |
|2018|11   |1485    |2.98      |
|2018|12   |1713    |3.25      |
+----+-----+--------+----------+



In [4]:
# Расчет процента отмен по месяцам
# Оставляем только 2018 год
df_filtred = df.filter(col("year") == 2018)

# Группируем по месяцам
# Считаем количество отмен в месяц
# Добавляем новый столбец с расчетом процента отмен
# Соритруем по проценту отмен от ольшего к меньшему
result = df_filtred.groupBy("month") \
    .agg(
        count(when(col("status") == "Canceled", True)).alias(
            "cancelled_count"),
        count("*").alias("total_count")
) \
    .withColumn("cancel_procent_rate", col("cancelled_count") / col("total_count")) \
    .orderBy(col("cancel_procent_rate").desc())

# Форматируем результаты
result_formatted = result.select(
    # Меняем заголовок столбца
    col("month").alias("месяц"),
    # Округляем процеты до двух знаков после запятой и менямем заголовок столбца
    format_number("cancel_procent_rate", 2).alias("процент_отмен"),
    # Выводим столбец с количеством отмен
    col("cancelled_count").alias("количество отмен"),
    # Выводим столбец с общим количесвтом отмен (не меняя заголовок)
    "total_count"
)

print("ТОП-3 месяца по проценту отмененных броней за 2018 год:")
result_formatted.show(3, False)

ТОП-3 месяца по проценту отмененных броней за 2018 год:
+-----+-------------+----------------+-----------+
|месяц|процент_отмен|количество отмен|total_count|
+-----+-------------+----------------+-----------+
|8    |0.47         |1303            |2799       |
|10   |0.46         |1578            |3404       |
|9    |0.46         |1356            |2962       |
+-----+-------------+----------------+-----------+
only showing top 3 rows


In [5]:
# Только подтвержденные бронирования
df_filtred = df.filter(col('status') == 'Not_Canceled')

# Группируем по году и месяцу, выситваем среднее время между
# бронированием и заездом в месяце
# Соритируем сначала по году, потом по месяцу
result = df_filtred.groupBy("year", "month") \
    .agg(
        avg("lead_time").alias("avg_lead_time")
) \
    .orderBy("year", "month")

# Форматирование результатов (Название столбцов и округление)
formatted_result = result.select(
    col("year").alias("Год"),
    col("month").alias("месяц"),
    round(col("avg_lead_time"), 2).alias("среднее_время_между")
)

print("Среднее время между бронированием и прибытием:")
formatted_result.show(truncate=False)

Среднее время между бронированием и прибытием:
+----+-----+-------------------+
|Год |месяц|среднее_время_между|
+----+-----+-------------------+
|2017|7    |130.73             |
|2017|8    |35.08              |
|2017|9    |51.72              |
|2017|10   |55.89              |
|2017|11   |33.28              |
|2017|12   |46.75              |
|2018|1    |34.87              |
|2018|2    |30.53              |
|2018|3    |43.19              |
|2018|4    |62.49              |
|2018|5    |60.99              |
|2018|6    |70.64              |
|2018|7    |86.88              |
|2018|8    |83.09              |
|2018|9    |63.32              |
|2018|10   |73.24              |
|2018|11   |44.25              |
|2018|12   |69.75              |
+----+-----+-------------------+



In [6]:
# Только подтвержденные бронирования
df_filtred = df.filter(col('status') == 'Not_Canceled')

# Расчет общей выручки, с добавлением новоого столбца
df_filtred = df_filtred.withColumn(
    "total_revenue",
    col("avg_room_price") * (col("weekend_nights") + col("week_nights"))
)

# Группировка по годам и месяцам
revenue_df = df_filtred.groupBy("year", "month") \
    .agg(avg("total_revenue").alias("средняя_выручка"))

# Создание сводной таблицы
pivot_df = revenue_df.groupBy("year") \
    .pivot("month") \
    .agg(round(avg("средняя_выручка"), 2))

# Форматирование результатов
formatted_pivot = pivot_df.select(
    col("year").alias("Год"),
    *[round(col(str(m)), 2).alias(f"месяц_{m}") for m in range(1, 13)]
)

print("Сводная таблица выручки по месяцам:")
formatted_pivot.show(truncate=False)

Сводная таблица выручки по месяцам:
+----+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+
|Год |месяц_1|месяц_2|месяц_3|месяц_4|месяц_5|месяц_6|месяц_7|месяц_8|месяц_9|месяц_10|месяц_11|месяц_12|
+----+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+
|2018|208.47 |219.83 |265.14 |291.7  |315.17 |287.8  |355.16 |369.65 |339.48 |315.03  |290.17  |310.88  |
|2017|NULL   |NULL   |NULL   |NULL   |NULL   |NULL   |233.96 |246.36 |275.66 |254.18  |197.65  |235.67  |
+----+-------+-------+-------+-------+-------+-------+-------+-------+-------+--------+--------+--------+



In [7]:
# Только подтвержденные бронирования
df_filtred = df.filter((col('status') == 'Not_Canceled')
                       & (col("repeated_guest") == 1))

df_filtred = df_filtred.withColumn(
    "revenue",
    col("avg_room_price") * (col("weekend_nights") + col("week_nights"))
)

# Агрегация по ID гостя
guest_revenue = df_filtred.groupBy("ID") \
    .agg(
        sum("revenue").alias("total_revenue"),
        count("*").alias("booking_count")
)

# Расчет общей выручки от постоянных гостей
total_revenue = guest_revenue.select(sum("total_revenue")).collect()[0][0]

# Получение ТОП-5 гостей
top_guests = guest_revenue.orderBy(desc("total_revenue")) \
    .limit(5) \
    .withColumn(
        "revenue_share",
        (col("total_revenue") / total_revenue) * 100
)

# Форматирование результатов
result = top_guests.select(
    col("ID"),
    round(col("total_revenue"), 2).alias("общая_выручка"),
    round(col("revenue_share"), 2).alias("доля_в_общей_выручке")
)

print("ТОП-5 постоянных гостей по выручке:")
result.show(truncate=False)

ТОП-5 постоянных гостей по выручке:
+--------+-------------+--------------------+
|ID      |общая_выручка|доля_в_общей_выручке|
+--------+-------------+--------------------+
|INN19235|1754.4       |1.55                |
|INN05222|690.0        |0.61                |
|INN14189|665.0        |0.59                |
|INN09923|660.0        |0.58                |
|INN25479|650.0        |0.57                |
+--------+-------------+--------------------+



In [8]:
# В моей версии spark нет обработки високосного года и try_to_date
def safe_parse_date(year, month, day):
    try:
        year_int = int(year) if year else None
        month_int = int(month) if month else None
        day_int = int(day) if day else None

        if None in [year_int, month_int, day_int]:
            return None

        if month_int < 1 or month_int > 12 or day_int < 1 or day_int > 31:
            return None

        # Проверка февраля
        if month_int == 2:
            is_leap = (year_int % 4 == 0 and year_int %
                       100 != 0) or (year_int % 400 == 0)
            max_day = 29 if is_leap else 28
            if day_int > max_day:
                return None if day_int > 29 or not is_leap else dt.datetime(year_int, month_int, 28)

        # Проверка месяцев с 30 днями
        if month_int in [4, 6, 9, 11] and day_int > 30:
            return None

        return dt.datetime(year_int, month_int, day_int)
    except:
        return None


# Добавляем столбец с датой и убираем невалидные данные (там где не удалось преобразовать дату)
safe_date_udf = udf(safe_parse_date, DateType())
df = df.withColumn("date", safe_date_udf(
    col("year"), col("month"), col("date")))
df = df.filter(col("date").isNotNull())

# Добавляем колонки с полным количесвтом гостей и полным количеством ночей
df = df.withColumn("total_guests", col("n_adults") + col("n_children")) \
       .withColumn("total_nights", col("weekend_nights") + col("week_nights"))

# Диапазон дат в датасете (от, до)
min_date = df.agg(min("date")).collect()[0][0]
max_date = df.agg(max("date")).collect()[0][0]

# Создаем еще один календарь, для джойна
date_range = spark.range((max_date - min_date).days + 1) \
    .select(col("id").cast("int").alias("days_offset")) \
    .select(date_add(lit(min_date), col("days_offset")).alias("day"))

# Расчет загрузки по дням
load = date_range.join(
    df,
    (date_range.day >= df.date) &
    (date_range.day < date_add(df.date, df.total_nights)),
    "left"
).groupBy("day").agg(
    coalesce(sum("total_guests"), lit(0)).alias("гостей_в_день")
).withColumn(
    "процент_загрузки",
    (col("гостей_в_день") / 400 * 100).cast("decimal(5,2)")
).orderBy(col("day").desc())

# Вывод результатов
load.show(1000, False)

# Проверим все ли дни из диапазона попали в результат
date_range_count = date_range.count()
load_count = load.count()

print(f"Количество дней в диапазоне: {date_range_count}")
print(f"Количество дней в результатах: {load_count}")
print(f"Все ли дни включены: {date_range_count == load_count}")

# Вывести минимальное количество гостей
min_guests = load.agg(min("гостей_в_день")).collect()[0][0]
print(f"Минимальное количество гостей за день: {min_guests}")

                                                                                

+----------+-------------+----------------+
|day       |гостей_в_день|процент_загрузки|
+----------+-------------+----------------+
|2018-12-31|794          |198.50          |
|2018-12-30|822          |205.50          |
|2018-12-29|836          |209.00          |
|2018-12-28|774          |193.50          |
|2018-12-27|781          |195.25          |
|2018-12-26|611          |152.75          |
|2018-12-25|498          |124.50          |
|2018-12-24|457          |114.25          |
|2018-12-23|415          |103.75          |
|2018-12-22|336          |84.00           |
|2018-12-21|297          |74.25           |
|2018-12-20|292          |73.00           |
|2018-12-19|287          |71.75           |
|2018-12-18|321          |80.25           |
|2018-12-17|351          |87.75           |
|2018-12-16|328          |82.00           |
|2018-12-15|206          |51.50           |
|2018-12-14|184          |46.00           |
|2018-12-13|187          |46.75           |
|2018-12-12|205          |51.25 

                                                                                

Минимальное количество гостей за день: 24
