In [None]:
pip install pyspark




In [None]:
pip install findspark



In [None]:
import findspark
findspark.init()

In [None]:
import pyspark
from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, DateType, BooleanType
from pyspark.sql import functions as f

In [None]:
spark = (
    SparkSession
    .builder
    .appName('hw_05')
    .config('spark.ui.port', '9311')
    .config('spark.executor.memoryOverhead', '1G')
    .config('spark.shuffle.service.enabled', 'true')
    .config('spark.dynmicAllocation.enabled', 'true')
    .getOrCreate()
    )

In [None]:
spark

In [None]:
!ls

sample_data


In [None]:
schema_logs_hotel = StructType(
    [
        StructField("ID", StringType(), True),
        StructField("n_adults", IntegerType(), True),
        StructField("n_children", IntegerType(), True),
        StructField("weekend_nights", IntegerType(), True),
        StructField("week_nights", IntegerType(), True),
        StructField("meal_plan", StringType(), True),
        StructField("car_parking_space", IntegerType(), True),
        StructField("room_type", StringType(), True),
        StructField("lead_time", IntegerType(), True),
        StructField("year", IntegerType(), True),
        StructField("month", IntegerType(), True),
        StructField("date", IntegerType(), True),
        StructField("market_segment", StringType(), True),
        StructField("repeated_guest", IntegerType(), True),
        StructField("previous_cancellations", IntegerType(), True),
        StructField("previous_bookings_not_canceled", IntegerType(), True),
        StructField("avg_room_price", DoubleType(), True),
        StructField("special_requests", IntegerType(), True),
        StructField("status", StringType(), True)
    ]
)

In [None]:
df = (
    spark.read
    .option("header",True)
    .option("sep",",")
    .schema(schema_logs_hotel)
    .csv("./sample_data/Hotel.csv")
    )

In [None]:
df_filtered = df.filter(~((f.col('month') == 2) & (f.col('date') == 29)))

In [None]:
df_filtered.show(10, truncate=False)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65.0          |0      

In [None]:
df.count()
df_filtered.count()
diff_df_count = df.count() - df_filtered.count()
diff_df_count

37

In [None]:
df_filtered.printSchema()

root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)



In [None]:
# Создать (сгенерировать) таблицу calendar, который будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31
df_calendar = spark.range(1).select(
    f.lit(None).cast('date').alias('calendar_dt'),
    f.explode(
        f.sequence(
            f.to_date(f.lit('2017-01-01')),
            f.to_date(f.lit('2018-12-31')),
            f.expr('interval 1 day')
        )
    ).alias('date_dt')
)

df_calendar.show(10, truncate=False)

+-----------+----------+
|calendar_dt|date_dt   |
+-----------+----------+
|NULL       |2017-01-01|
|NULL       |2017-01-02|
|NULL       |2017-01-03|
|NULL       |2017-01-04|
|NULL       |2017-01-05|
|NULL       |2017-01-06|
|NULL       |2017-01-07|
|NULL       |2017-01-08|
|NULL       |2017-01-09|
|NULL       |2017-01-10|
+-----------+----------+
only showing top 10 rows


In [None]:
df_filtered.select(f.col('status')).where(f.col('status')=='Not_Canceled').count()

24360

In [None]:
# Вычислить среднее количество ночей, которые гости проводят в отеле
# (только для подтвержденных бронирований, с детализацией по месяцам и годам)

df_nights_avg = (
    df_filtered.where(f.col('status') == 'Not_Canceled')
    .withColumn('all_nights', f.col('weekend_nights') + f.col('week_nights'))
    .groupBy('year', 'month')
    .agg(
        f.count('*').alias('booking_count'),
        f.avg('all_nights').alias('nights_avg'),
        f.sum('all_nights').alias('sum_nights')
    )
    .orderBy('year', 'month')
)

df_nights_avg.show(5, truncate=False)

+----+-----+-------------+------------------+----------+
|year|month|booking_count|nights_avg        |sum_nights|
+----+-----+-------------+------------------+----------+
|2017|7    |120          |3.0166666666666666|362       |
|2017|8    |829          |2.7189384800965017|2254      |
|2017|9    |1467         |2.6550783912747105|3895      |
|2017|10   |1611         |2.7032898820608318|4355      |
|2017|11   |620          |2.7241935483870967|1689      |
+----+-----+-------------+------------------+----------+
only showing top 5 rows


In [None]:
# Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.
df_top_3 = (
    df_filtered.where(f.col('year') == 2018)
    .groupBy('month')
    .agg(
        f.count('*').alias('all_bookings'),
        f.sum(
            f.when(f.col('status') == 'Canceled', 1).otherwise(0)
        ).alias('cancel_bookings')
    )
    .withColumn('cancel_rate_percent',
                (f.col('cancel_bookings') * 100.0 / f.col('all_bookings')))
    .orderBy(f.col('cancel_rate_percent').desc())
    .limit(3)
    .select('month', 'all_bookings', 'cancel_bookings', 'cancel_rate_percent')
)

df_top_3.show(5, truncate=False)

+-----+------------+---------------+-------------------+
|month|all_bookings|cancel_bookings|cancel_rate_percent|
+-----+------------+---------------+-------------------+
|8    |2799        |1303           |46.55234012147196  |
|10   |3404        |1578           |46.3572267920094   |
|9    |2962        |1356           |45.77987846049966  |
+-----+------------+---------------+-------------------+



In [None]:
# Вычислить среднее время на каждый месяц между бронированием и заездом в отель
# для подтвержденных броней

df_time_avg_per_month = (
    df_filtered.where(f.col('status') == 'Not_Canceled')
    .groupBy('year', 'month')
    .agg(
        f.count('*').alias('not_canceled_count'),
        f.avg('lead_time').alias('avg_lead_time')
    )
    .orderBy('year', 'month')
)

df_time_avg_per_month.show(truncate=False)

+----+-----+------------------+------------------+
|year|month|not_canceled_count|avg_lead_time     |
+----+-----+------------------+------------------+
|2017|7    |120               |130.73333333333332|
|2017|8    |829               |35.08082026537998 |
|2017|9    |1467              |51.72188139059305 |
|2017|10   |1611              |55.885164494103044|
|2017|11   |620               |33.28225806451613 |
|2017|12   |906               |46.746136865342166|
|2018|1    |990               |34.87171717171717 |
|2018|2    |1244              |30.272508038585208|
|2018|3    |1658              |43.19420989143546 |
|2018|4    |1741              |62.492820218265365|
|2018|5    |1650              |60.98909090909091 |
|2018|6    |1912              |70.63546025104603 |
|2018|7    |1486              |86.88021534320323 |
|2018|8    |1496              |83.08622994652407 |
|2018|9    |1606              |63.32316313823163 |
|2018|10   |1826              |73.24370208105148 |
|2018|11   |1485              |

In [None]:
# Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав
# по всем типам бронирования для подтвержденных броней, и вывести это в виде
# сводной таблицы (PIVOT).

df_revenue = (
    df_filtered.where(f.col('status') == 'Not_Canceled')
    .withColumn('all_nights', f.col('weekend_nights') + f.col('week_nights'))
    .withColumn('all_revenue', f.col('avg_room_price') * f.col('all_nights'))
)

pivot = (
    df_revenue
    .groupBy('year')
    .pivot('month')
    .agg(
        f.avg('all_revenue').alias('avg_revenue')
    )
    .orderBy('year')
)

pivot_transpose = (
    df_revenue
    .groupBy('month')
    .pivot('year')
    .agg(
        f.round(f.avg('all_revenue'), 2).alias('avg_revenue')
    )
    .orderBy('month')
).show(truncate=False)


+-----+------+------+
|month|2017  |2018  |
+-----+------+------+
|1    |NULL  |208.47|
|2    |NULL  |217.96|
|3    |NULL  |265.14|
|4    |NULL  |291.7 |
|5    |NULL  |315.17|
|6    |NULL  |287.8 |
|7    |233.96|355.16|
|8    |246.36|369.65|
|9    |275.66|339.48|
|10   |254.18|315.03|
|11   |197.65|290.17|
|12   |235.67|310.88|
+-----+------+------+



In [None]:
# Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все
# время, и показать их долю в общей выручке от постоянных гостей.
# Использовать уникальный идентификатор брони как уникальный идентификатор гостя,
# предположив, что 1 бронь = 1 гость.

guest_revenue = (
    df_filtered.where(f.col('repeated_guest') == 1)
    .where(f.col('status') == 'Not_Canceled')
    .withColumn('all_nights', f.col('weekend_nights') + f.col('week_nights'))
    .withColumn('booking_revenue', f.col('avg_room_price') * f.col('all_nights'))
    .groupBy('ID')
    .agg(
        f.count('*').alias('all_bookings'),
        f.round(f.sum('booking_revenue'), 2).alias('total'),
        f.round(f.avg('booking_revenue'), 2).alias('avg_revenue'),
        f.round(f.avg('all_nights'), 2).alias('avg_nights')
    )
    .orderBy(f.col('total').desc())
).limit(5).show(truncate=False)



+--------+------------+------+-----------+----------+
|ID      |all_bookings|total |avg_revenue|avg_nights|
+--------+------------+------+-----------+----------+
|INN19235|1           |1754.4|1754.4     |12.0      |
|INN05222|1           |690.0 |690.0      |10.0      |
|INN14189|1           |665.0 |665.0      |5.0       |
|INN09923|1           |660.0 |660.0      |3.0       |
|INN25479|1           |650.0 |650.0      |10.0      |
+--------+------------+------+-----------+----------+



In [None]:
# Вывести общее количество гостей на каждый день в отеле, отсортировав
# по убыванию дат, включая дни, когда отель пустует.
# Также рассчитать процент загрузки для каждого дня, если известно,
# что общая вместимость отеля 400 человек

df_filtered = (
    df_filtered
    .withColumn("all_guests", f.col("n_adults") + f.col("n_children"))
    )

df_filtered = (
    df_filtered
    .withColumn(
        "full_date",
        f.to_date(
            f.concat_ws("-", f.col("year"), f.col("month"), f.col("date"))
            )
        )
    )

daily_guests = (
    df_filtered
    .groupBy("full_date")
    .agg(f.sum("all_guests").alias("all_guests_in_day"))
    )
#df_filtered.show(2, 0)

min_date_row = df_filtered.agg({"full_date": "min"}).collect()[0]
max_date_row = df_filtered.agg({"full_date": "max"}).collect()[0]
min_date = min_date_row["min(full_date)"]
max_date = max_date_row["max(full_date)"]
num_days = (max_date - min_date).days + 1
date_range = (
    spark.range(num_days)
    .withColumn(
        "full_date", f.expr(f"date_add('{min_date}', cast(id as int))")
        ).drop("id")
        )

all_dates_with_guests = date_range.join(daily_guests, "full_date", "left_outer")

all_dates_with_guests = (
    all_dates_with_guests.fillna(0, subset=["all_guests_in_day"])
    .withColumn(
        "percent_occupancy %",
         (f.col("all_guests_in_day") / 400 * 100).cast("decimal(5,2)")
         ).orderBy(
             f.col("full_date").desc()
             )
         )

all_dates_with_guests.show(truncate=False)



+----------+-----------------+-------------------+
|full_date |all_guests_in_day|percent_occupancy %|
+----------+-----------------+-------------------+
|2018-12-31|89               |22.25              |
|2018-12-30|212              |53.00              |
|2018-12-29|216              |54.00              |
|2018-12-28|183              |45.75              |
|2018-12-27|325              |81.25              |
|2018-12-26|236              |59.00              |
|2018-12-25|114              |28.50              |
|2018-12-24|119              |29.75              |
|2018-12-23|152              |38.00              |
|2018-12-22|109              |27.25              |
|2018-12-21|106              |26.50              |
|2018-12-20|73               |18.25              |
|2018-12-19|93               |23.25              |
|2018-12-18|87               |21.75              |
|2018-12-17|91               |22.75              |
|2018-12-16|166              |41.50              |
|2018-12-15|66               |1