In [1]:
!pip install pyspark py4j



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import functions as f

In [3]:
spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_HW')\
        .getOrCreate()

In [4]:
spark

# Задание 1
Создать таблицу logs_hotel, используя csv-файл.




In [5]:
data = spark.read.csv(
    'Hotel.csv',
    sep=',',
    header=True,
    inferSchema=True
)

data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- n_adults: integer (nullable = true)
 |-- n_children: integer (nullable = true)
 |-- weekend_nights: integer (nullable = true)
 |-- week_nights: integer (nullable = true)
 |-- meal_plan: string (nullable = true)
 |-- car_parking_space: integer (nullable = true)
 |-- room_type: string (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- date: integer (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- avg_room_price: double (nullable = true)
 |-- special_requests: integer (nullable = true)
 |-- status: string (nullable = true)



In [6]:
data.createOrReplaceTempView("logs_hotel")

# Задание 2
Создать (сгенерировать) таблицу calendar, который будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31.

In [7]:
spark.sql("""
          CREATE OR REPLACE TEMPORARY VIEW temp_dates AS
            (SELECT
              CAST('2017-01-01' AS DATE) AS start_date,
              CAST('2018-12-31' AS DATE) AS end_date
          );""").show()

++
||
++
++



In [8]:
spark.sql("SELECT * FROM temp_dates").show()

+----------+----------+
|start_date|  end_date|
+----------+----------+
|2017-01-01|2018-12-31|
+----------+----------+



In [9]:
spark.sql(
    """
      CREATE OR REPLACE TEMPORARY VIEW calendar AS
      (
        SELECT date_add(start_date, seq) AS calendar_dt
        FROM temp_dates
        LATERAL VIEW explode(sequence(0, datediff(end_date, start_date))) exploded_seq AS seq
      );
    """
).show()

++
||
++
++



In [10]:
spark.sql("SELECT * FROM calendar").show()

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
| 2017-01-04|
| 2017-01-05|
| 2017-01-06|
| 2017-01-07|
| 2017-01-08|
| 2017-01-09|
| 2017-01-10|
| 2017-01-11|
| 2017-01-12|
| 2017-01-13|
| 2017-01-14|
| 2017-01-15|
| 2017-01-16|
| 2017-01-17|
| 2017-01-18|
| 2017-01-19|
| 2017-01-20|
+-----------+
only showing top 20 rows


In [11]:
calendar_df = spark.sql("SELECT * FROM calendar")
calendar_df.show()

+-----------+
|calendar_dt|
+-----------+
| 2017-01-01|
| 2017-01-02|
| 2017-01-03|
| 2017-01-04|
| 2017-01-05|
| 2017-01-06|
| 2017-01-07|
| 2017-01-08|
| 2017-01-09|
| 2017-01-10|
| 2017-01-11|
| 2017-01-12|
| 2017-01-13|
| 2017-01-14|
| 2017-01-15|
| 2017-01-16|
| 2017-01-17|
| 2017-01-18|
| 2017-01-19|
| 2017-01-20|
+-----------+
only showing top 20 rows


# Запрос 1
Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [34]:
spark.sql(
    """
      select round(avg(weekend_nights + week_nights)) as total_avg_nights, month, year
      from logs_hotel
      where status='Not_Canceled'
      group by month, year
      order by year, month
    """
).show()

+----------------+-----+----+
|total_avg_nights|month|year|
+----------------+-----+----+
|             3.0|    7|2017|
|             3.0|    8|2017|
|             3.0|    9|2017|
|             3.0|   10|2017|
|             3.0|   11|2017|
|             3.0|   12|2017|
|             3.0|    1|2018|
|             3.0|    2|2018|
|             3.0|    3|2018|
|             3.0|    4|2018|
|             3.0|    5|2018|
|             3.0|    6|2018|
|             3.0|    7|2018|
|             3.0|    8|2018|
|             3.0|    9|2018|
|             3.0|   10|2018|
|             3.0|   11|2018|
|             3.0|   12|2018|
+----------------+-----+----+



In [35]:
not_cancelled_bookings = data.filter(f.col('status') == 'Not_Canceled')
not_cancelled_bookings.groupBy(['month', 'year'])\
  .agg(
      f.round(f.avg(
          f.col('weekend_nights') + f.col('week_nights')
      )).alias('total_avg_nights')
  )\
  .orderBy('year', 'month')\
  .show()


+-----+----+----------------+
|month|year|total_avg_nights|
+-----+----+----------------+
|    7|2017|             3.0|
|    8|2017|             3.0|
|    9|2017|             3.0|
|   10|2017|             3.0|
|   11|2017|             3.0|
|   12|2017|             3.0|
|    1|2018|             3.0|
|    2|2018|             3.0|
|    3|2018|             3.0|
|    4|2018|             3.0|
|    5|2018|             3.0|
|    6|2018|             3.0|
|    7|2018|             3.0|
|    8|2018|             3.0|
|    9|2018|             3.0|
|   10|2018|             3.0|
|   11|2018|             3.0|
|   12|2018|             3.0|
+-----+----+----------------+



# Запрос 2
Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.

In [27]:
spark.sql(
    """
    with cancels as (
        select count(*) as sum_bookings, month
        from logs_hotel
        where status='Canceled'
        and year = 2018
        group by month
        order by month
      ),
      totals as (
        select count(*) as sum_bookings, month
        from logs_hotel
        where year = 2018
        group by month
        order by month
      )
      select round((c.sum_bookings / t.sum_bookings) * 100, 2) as cancel_percent, c.month
      from cancels c
      join totals t using (month)
      order by cancel_percent desc
      limit 3
    """
).show()

+--------------+-----+
|cancel_percent|month|
+--------------+-----+
|         46.55|    8|
|         46.36|   10|
|         45.78|    9|
+--------------+-----+



In [28]:
data.filter(f.col('year') == 2018)\
  .groupBy('month')\
  .agg(
      f.count('*').alias("total"),
      f.sum(f.when(f.col("status") == 'Canceled', 1).otherwise(0)).alias("canceled")
  )\
  .select([
      f.round((f.col("canceled") / f.col("total")) * 100, 2).alias("percent"),
      'month'
      ])\
  .orderBy("percent", ascending=False)\
  .limit(3)\
  .show()

+-------+-----+
|percent|month|
+-------+-----+
|  46.55|    8|
|  46.36|   10|
|  45.78|    9|
+-------+-----+



# Запрос 3
Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.

In [31]:
spark.sql(
    """
      select round(avg(lead_time)) as avg_lead_time, month, year
      from logs_hotel
      where status = 'Not_Canceled'
      group by year, month
      order by year, month
    """
).show()

+-------------+-----+----+
|avg_lead_time|month|year|
+-------------+-----+----+
|        131.0|    7|2017|
|         35.0|    8|2017|
|         52.0|    9|2017|
|         56.0|   10|2017|
|         33.0|   11|2017|
|         47.0|   12|2017|
|         35.0|    1|2018|
|         31.0|    2|2018|
|         43.0|    3|2018|
|         62.0|    4|2018|
|         61.0|    5|2018|
|         71.0|    6|2018|
|         87.0|    7|2018|
|         83.0|    8|2018|
|         63.0|    9|2018|
|         73.0|   10|2018|
|         44.0|   11|2018|
|         70.0|   12|2018|
+-------------+-----+----+



In [33]:
data.filter(f.col('status') == 'Not_Canceled')\
  .groupBy(['month', 'year'])\
  .agg(
      f.round(f.avg('lead_time'))
  )\
  .orderBy('year', 'month')\
  .show()

+-----+----+------------------------+
|month|year|round(avg(lead_time), 0)|
+-----+----+------------------------+
|    7|2017|                   131.0|
|    8|2017|                    35.0|
|    9|2017|                    52.0|
|   10|2017|                    56.0|
|   11|2017|                    33.0|
|   12|2017|                    47.0|
|    1|2018|                    35.0|
|    2|2018|                    31.0|
|    3|2018|                    43.0|
|    4|2018|                    62.0|
|    5|2018|                    61.0|
|    6|2018|                    71.0|
|    7|2018|                    87.0|
|    8|2018|                    83.0|
|    9|2018|                    63.0|
|   10|2018|                    73.0|
|   11|2018|                    44.0|
|   12|2018|                    70.0|
+-----+----+------------------------+



# Запрос 4
Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).

In [41]:
data.filter(f.col('status') == 'Not_Canceled') \
    .groupBy('year', 'month') \
    .pivot('meal_plan') \
    .agg(f.round(
            f.avg((f.col('weekend_nights') + f.col('week_nights')) * f.col('avg_room_price'))
            )) \
    .orderBy('year', 'month') \
    .show()

+----+-----+-----------+-----------+-----------+------------+
|year|month|Meal Plan 1|Meal Plan 2|Meal Plan 3|Not Selected|
+----+-----+-----------+-----------+-----------+------------+
|2017|    7|      266.0|      216.0|       NULL|       189.0|
|2017|    8|      250.0|      234.0|       NULL|       262.0|
|2017|    9|      295.0|      208.0|       NULL|       310.0|
|2017|   10|      252.0|      263.0|       NULL|       287.0|
|2017|   11|      198.0|      198.0|       NULL|       187.0|
|2017|   12|      245.0|      141.0|       NULL|       196.0|
|2018|    1|      207.0|      301.0|        0.0|       177.0|
|2018|    2|      223.0|      425.0|       NULL|       170.0|
|2018|    3|      269.0|      398.0|        0.0|       212.0|
|2018|    4|      313.0|      240.0|        0.0|       202.0|
|2018|    5|      328.0|      398.0|       NULL|       218.0|
|2018|    6|      292.0|      344.0|        0.0|       237.0|
|2018|    7|      366.0|      416.0|       NULL|       276.0|
|2018|  

# Запрос 5
Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.

In [42]:
top_5_guests = data.filter(f.col('status') == 'Not_Canceled')\
  .select([
      ((f.col('weekend_nights') + f.col('week_nights')) * f.col('avg_room_price')).alias('total_revenue'),

      'ID'
  ])\
  .orderBy('total_revenue', ascending=False)\
  .limit(5)
total_all_revenue = data.filter(f.col('status') == 'Not_Canceled') \
    .agg(f.sum((f.col('weekend_nights') + f.col('week_nights')) * f.col('avg_room_price'))) \
    .collect()[0][0]

top_5_guests.withColumn('percent', f.round(f.col('total_revenue') / total_all_revenue * 100, 2)) \
    .select('percent', 'ID', 'total_revenue') \
    .show()

+-------+--------+------------------+
|percent|      ID|     total_revenue|
+-------+--------+------------------+
|   0.03|INN12929|           2388.96|
|   0.03|INN14384|            2177.5|
|   0.03|INN09472|           1894.53|
|   0.03|INN35408|            1857.3|
|   0.03|INN17467|1774.8000000000002|
+-------+--------+------------------+



# Запрос 6
Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

In [37]:
spark.sql(
    """
    with hotels_with_date as (
      select *
        ,try_cast((h.year || '-' || h.month || '-' || h.date) as DATE) as arrival_date
      from logs_hotel h
      where status = 'Not_Canceled'
        and try_cast((h.year || '-' || h.month || '-' || h.date) as DATE) is not null
    )
    SELECT
      c.calendar_dt,
      COALESCE(SUM(h.n_adults + h.n_children), 0) AS total_guests,
      ROUND(
          COALESCE(SUM(h.n_adults + h.n_children), 0) / 400 * 100,
          2
      ) AS occupancy_percent
    FROM calendar c
    LEFT JOIN hotels_with_date h
        ON c.calendar_dt >= h.arrival_date
      AND c.calendar_dt < date_add(
          h.arrival_date,
          h.weekend_nights + h.week_nights
        )
    GROUP BY c.calendar_dt
    ORDER BY c.calendar_dt DESC
    """
).show()

+-----------+------------+-----------------+
|calendar_dt|total_guests|occupancy_percent|
+-----------+------------+-----------------+
| 2018-12-31|         562|            140.5|
| 2018-12-30|         572|            143.0|
| 2018-12-29|         542|            135.5|
| 2018-12-28|         507|           126.75|
| 2018-12-27|         552|            138.0|
| 2018-12-26|         422|            105.5|
| 2018-12-25|         397|            99.25|
| 2018-12-24|         373|            93.25|
| 2018-12-23|         341|            85.25|
| 2018-12-22|         282|             70.5|
| 2018-12-21|         247|            61.75|
| 2018-12-20|         240|             60.0|
| 2018-12-19|         228|             57.0|
| 2018-12-18|         258|             64.5|
| 2018-12-17|         274|             68.5|
| 2018-12-16|         254|             63.5|
| 2018-12-15|         170|             42.5|
| 2018-12-14|         155|            38.75|
| 2018-12-13|         153|            38.25|
| 2018-12-

In [40]:
hotel_with_arrival_date = data.filter(f.col('status') == 'Not_Canceled')\
  .withColumn("arrival_date",
              f.try_to_timestamp(f.concat_ws("-", f.col("year").cast(StringType())
                                , f.lpad(f.col("month").cast("string"), 2, "0")
                                , f.lpad(f.col("date").cast("string"), 2, "0")
                                )
                      )
              )
hotel_with_arrival_date = hotel_with_arrival_date.dropna(subset=["arrival_date"])
calendar_df.join(
    f.broadcast(hotel_with_arrival_date),
    (calendar_df.calendar_dt >= hotel_with_arrival_date.arrival_date) &
    (calendar_df.calendar_dt < f.date_add(
        hotel_with_arrival_date.arrival_date,
        hotel_with_arrival_date.week_nights + hotel_with_arrival_date.weekend_nights
    )),
    how='left'
)\
 .groupBy(calendar_df.calendar_dt)\
 .agg(
      f.sum(f.col('n_adults') + f.col('n_children')).alias('total_guests')
      , f.round(f.sum(f.col('n_adults') + f.col('n_children')) / 400 * 100, 2).alias('occupacy_percent')

 )\
 .orderBy(calendar_df.calendar_dt, ascending=False)\
 .show()

+-----------+------------+----------------+
|calendar_dt|total_guests|occupacy_percent|
+-----------+------------+----------------+
| 2018-12-31|         562|           140.5|
| 2018-12-30|         572|           143.0|
| 2018-12-29|         542|           135.5|
| 2018-12-28|         507|          126.75|
| 2018-12-27|         552|           138.0|
| 2018-12-26|         422|           105.5|
| 2018-12-25|         397|           99.25|
| 2018-12-24|         373|           93.25|
| 2018-12-23|         341|           85.25|
| 2018-12-22|         282|            70.5|
| 2018-12-21|         247|           61.75|
| 2018-12-20|         240|            60.0|
| 2018-12-19|         228|            57.0|
| 2018-12-18|         258|            64.5|
| 2018-12-17|         274|            68.5|
| 2018-12-16|         254|            63.5|
| 2018-12-15|         170|            42.5|
| 2018-12-14|         155|           38.75|
| 2018-12-13|         153|           38.25|
| 2018-12-12|         167|      