In [23]:
!pip install pyspark



In [30]:
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [31]:
import findspark
findspark.init()
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
from pyspark.sql import functions as f
from google.colab import files
import pandas as pd

In [25]:
uploaded = files.upload()

Saving Hotel.csv to Hotel (2).csv


In [26]:
df = pd.read_csv(next(iter(uploaded.keys())))

In [27]:
spark = SparkSession \
    .builder \
    .appName("MySparkApp") \
    .master("local[*]") \
    .getOrCreate()

In [37]:
df = spark.read.option("header", True).option("sep", ",").csv("Hotel.csv")

df = df.withColumn("n_adults", df["n_adults"].cast("integer"))
df = df.withColumn("n_children", df["n_children"].cast("integer"))
df = df.withColumn("weekend_nights", df["weekend_nights"].cast("integer"))
df = df.withColumn("week_nights", df["week_nights"].cast("integer"))
df = df.withColumn("car_parking_space", df["car_parking_space"].cast("integer"))
df = df.withColumn("lead_time", df["lead_time"].cast("integer"))
df = df.withColumn("year", df["year"].cast("integer"))
df = df.withColumn("month", df["month"].cast("integer"))
df = df.withColumn("date", df["date"].cast("integer"))
df = df.withColumn("repeated_guest", df["repeated_guest"].cast("integer"))
df = df.withColumn("previous_cancellations", df["previous_cancellations"].cast("integer"))
df = df.withColumn("previous_bookings_not_canceled", df["previous_bookings_not_canceled"].cast("integer"))
df = df.withColumn("avg_room_price", df["avg_room_price"].cast("double"))
df = df.withColumn("special_requests", df["special_requests"].cast("integer"))

df.show(5)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|          65.0|       

In [36]:
df.printSchema()

root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)



In [29]:
# Создать (сгенерировать) таблицу calendar, который будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31.
# Описание таблицы calendar:
# Поле	Описание
# calendar_dt	Дата в формате ‘YYYY-DD-MM’ - видимо это опечатка. Будем стандартную дату применять

calendar = spark.range(1).select(f.explode(f.sequence(f.to_date(f.lit("2017-01-01")),
                                                      f.to_date(f.lit("2018-12-31")),
                                                      f.expr("interval 1 day")))
                                  .alias("calendar_dt"))

calendar.show(5)

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
| 2017-01-04|
| 2017-01-05|
+-----------+
only showing top 5 rows


In [46]:
# Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований с детализацией по месяцам и годам)
df = df.withColumnRenamed("date", "day")
df = df.withColumn(
    'booked_total_nights',
    df["week_nights"] + df["weekend_nights"]
)
mean_nights_booked = df \
    .filter(f.col("status") == "Not_Canceled") \
    .groupBy("year", "month") \
    .agg(
        f.mean("booked_total_nights").alias("mean_booked_nights")
    ) \
    .orderBy(["year", "month"], ascending=[True, True]) \
    .withColumn("mean_booked_nights", f.round(f.col("mean_booked_nights"), 1))

mean_nights_booked.show()


+----+-----+------------------+
|year|month|mean_booked_nights|
+----+-----+------------------+
|2017|    7|               3.0|
|2017|    8|               2.7|
|2017|    9|               2.7|
|2017|   10|               2.7|
|2017|   11|               2.7|
|2017|   12|               3.0|
|2018|    1|               2.7|
|2018|    2|               2.7|
|2018|    3|               3.0|
|2018|    4|               2.9|
|2018|    5|               2.8|
|2018|    6|               2.6|
|2018|    7|               3.2|
|2018|    8|               3.2|
|2018|    9|               2.8|
|2018|   10|               2.9|
|2018|   11|               3.0|
|2018|   12|               3.3|
+----+-----+------------------+



In [48]:
# Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.
top_3_months = df \
    .filter(f.col("year") == 2018) \
    .groupBy("month") \
    .agg(
        f.count("*").alias("total_bookings"),
        f.sum(f.when(f.col("status") == "Canceled", 1).otherwise(0)).alias("canceled_bookings")
    ) \
    .withColumn(
        "cancellation_rate_percent",
        f.round((f.col("canceled_bookings")*100.0)/f.col("total_bookings"), 2)
    ) \
    .orderBy(f.col("cancellation_rate_percent").desc()) \
    .limit(3) \
    .select("month", "total_bookings", "canceled_bookings", "cancellation_rate_percent")

top_3_months.show()

+-----+--------------+-----------------+-------------------------+
|month|total_bookings|canceled_bookings|cancellation_rate_percent|
+-----+--------------+-----------------+-------------------------+
|    8|          2799|             1303|                    46.55|
|   10|          3404|             1578|                    46.36|
|    9|          2962|             1356|                    45.78|
+-----+--------------+-----------------+-------------------------+



In [49]:
# Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.
mean_leadtime_days_by_months = df \
    .filter(f.col("status") == "Not_Canceled") \
    .groupBy("year", "month") \
    .agg(
        f.round(f.avg("lead_time"), 2).alias("mean_lead_time_days")
    ) \
    .orderBy("year", "month")

mean_leadtime_days_by_months.show()

+----+-----+-------------------+
|year|month|mean_lead_time_days|
+----+-----+-------------------+
|2017|    7|             130.73|
|2017|    8|              35.08|
|2017|    9|              51.72|
|2017|   10|              55.89|
|2017|   11|              33.28|
|2017|   12|              46.75|
|2018|    1|              34.87|
|2018|    2|              30.53|
|2018|    3|              43.19|
|2018|    4|              62.49|
|2018|    5|              60.99|
|2018|    6|              70.64|
|2018|    7|              86.88|
|2018|    8|              83.09|
|2018|    9|              63.32|
|2018|   10|              73.24|
|2018|   11|              44.25|
|2018|   12|              69.75|
+----+-----+-------------------+



In [50]:
# Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней,
# и вывести это в виде сводной таблицы (PIVOT)
pivot_table = df \
    .filter(f.col("status") == "Not_Canceled") \
    .withColumn("total_nights", f.col("week_nights") + f.col("weekend_nights")) \
    .withColumn("revenue", f.col("avg_room_price") * f.col("total_nights")) \
    .groupBy("year", "month", "market_segment") \
    .agg(f.round(f.avg("revenue"), 2).alias("avg_revenue")) \
    .groupBy("year", "month") \
    .pivot("market_segment") \
    .agg(f.first("avg_revenue")) \
    .fillna(0) \
    .orderBy("year", "month")

print("Итоговая PIVOT таблица:")
pivot_table.show(12)


Итоговая PIVOT таблица:
+----+-----+--------+-------------+---------+-------+------+
|year|month|Aviation|Complementary|Corporate|Offline|Online|
+----+-----+--------+-------------+---------+-------+------+
|2017|    7|     0.0|         22.4|   113.75| 228.95|290.56|
|2017|    8|     0.0|         0.32|   156.42| 235.54|284.21|
|2017|    9|     0.0|        16.89|   177.83| 236.65|348.55|
|2017|   10|     0.0|         1.09|   180.26| 223.24|311.47|
|2017|   11|     0.0|        14.81|   102.97| 198.36|240.52|
|2017|   12|     0.0|         0.25|   141.11| 253.86|258.93|
|2018|    1|     0.0|         2.27|   113.03| 210.51|236.09|
|2018|    2|   352.0|         1.39|   115.06| 251.85|238.07|
|2018|    3|  118.33|        38.17|   142.39| 233.39|301.71|
|2018|    4|  321.81|          0.0|   108.42| 236.44|320.08|
|2018|    5|   262.5|          0.0|    229.5| 274.55|352.34|
|2018|    6|   247.0|          0.0|   148.13| 251.98|335.03|
+----+-----+--------+-------------+---------+-------+------+


In [54]:
# Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей.
# Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.
repeated_guests_bookings = df.filter(
    (f.col("status") == "Not_Canceled") &
    (f.col("repeated_guest") == 1)
)

repeated_guests_with_revenue = repeated_guests_bookings.withColumn(
    "total_nights", f.col("week_nights") + f.col("weekend_nights")
).withColumn(
    "revenue", f.col("avg_room_price") * f.col("total_nights")
)

revenue_per_guest = repeated_guests_with_revenue.groupBy("ID").agg(
    f.round(f.sum("revenue"), 2).alias("total_revenue")
)

total_revenue_repeated_guests = revenue_per_guest.agg(
    f.sum("total_revenue").alias("overall_revenue")
).collect()[0]["overall_revenue"]

top_5_repeated_guests = revenue_per_guest.withColumn(
    "revenue_share_percent",
    f.round((f.col("total_revenue")*100.0) / total_revenue_repeated_guests, 4)
).orderBy(
    f.col("total_revenue").desc()
).limit(5)

top_5_repeated_guests.show()
sum_of_shares = top_5_repeated_guests.agg(
    f.sum("revenue_share_percent").alias("total_share")
).collect()[0]["total_share"]

+--------+-------------+---------------------+
|      ID|total_revenue|revenue_share_percent|
+--------+-------------+---------------------+
|INN19235|       1754.4|                 1.55|
|INN05222|        690.0|               0.6096|
|INN14189|        665.0|               0.5875|
|INN09923|        660.0|               0.5831|
|INN25479|        650.0|               0.5743|
+--------+-------------+---------------------+



In [61]:
# Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует.
# Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

df_clean = df.filter(
    ~((f.col("year") == 2018) & (f.col("month") == 2) & (f.col("day") == 29))
)

# Основной расчет
result = df_clean \
    .filter(f.col("status") == "Not_Canceled") \
    .withColumn(
        "checkin_date",
        f.to_date(f.concat_ws("-", f.col("year"), f.lpad(f.col("month"), 2, "0"), f.lpad(f.col("day"), 2, "0")))
    ) \
    .withColumn("total_nights", f.col("week_nights") + f.col("weekend_nights")) \
    .withColumn("guests_count", f.col("n_adults") + f.col("n_children")) \
    .filter(f.col("total_nights") > 0) \
    .select(
        f.explode(
            f.expr("sequence(checkin_date, date_add(checkin_date, total_nights - 1), interval 1 day)")
        ).alias("stay_date"),
        "guests_count"
    ) \
    .groupBy("stay_date") \
    .agg(f.sum("guests_count").alias("total_guests")) \
    .join(calendar.withColumnRenamed("calendar_dt", "date"),
          f.col("stay_date") == f.col("date"), "right") \
    .select(
        f.col("date"),
        f.coalesce(f.col("total_guests"), f.lit(0)).alias("total_guests"),
        f.round((f.coalesce(f.col("total_guests"), f.lit(0)) * 100.0) / 400, 2).alias("occupancy_percent")
    ) \
    .orderBy(f.col("date").desc())

result.show(20)

+----------+------------+-----------------+
|      date|total_guests|occupancy_percent|
+----------+------------+-----------------+
|2018-12-31|         562|            140.5|
|2018-12-30|         572|            143.0|
|2018-12-29|         542|            135.5|
|2018-12-28|         507|           126.75|
|2018-12-27|         552|            138.0|
|2018-12-26|         422|            105.5|
|2018-12-25|         397|            99.25|
|2018-12-24|         373|            93.25|
|2018-12-23|         341|            85.25|
|2018-12-22|         282|             70.5|
|2018-12-21|         247|            61.75|
|2018-12-20|         240|             60.0|
|2018-12-19|         228|             57.0|
|2018-12-18|         258|             64.5|
|2018-12-17|         274|             68.5|
|2018-12-16|         254|             63.5|
|2018-12-15|         170|             42.5|
|2018-12-14|         155|            38.75|
|2018-12-13|         153|            38.25|
|2018-12-12|         167|       