In [34]:
pip install pyspark



In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import pandas as pd
from pyspark.sql.functions import explode, sequence, to_date, date_format
from datetime import datetime, timedelta

In [36]:
spark = SparkSession.builder.appName("CSV Example").getOrCreate()
df = spark.read.csv("Hotel.csv", header=True, inferSchema=True)

In [37]:
spark = SparkSession.builder \
    .appName("Hotel") \
    .getOrCreate()

In [38]:
struct = StructType([
    StructField("ID", StringType(), True),
    StructField("n_adults", IntegerType(), True),
    StructField("n_children", IntegerType(), True),
    StructField("weekend_nights", IntegerType(), True),
    StructField("week_nights", IntegerType(), True),
    StructField("meal_plan", StringType(), True),
    StructField("car_parking_space", IntegerType(), True),
    StructField("room_type", StringType(), True),
    StructField("lead_time", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("date", IntegerType(), True),
    StructField("market_segment", StringType(), True),
    StructField("repeated_guest", IntegerType(), True),
    StructField("previous_cancellations", IntegerType(), True),
    StructField("previous_bookings_not_canceled", IntegerType(), True),
    StructField("avg_room_price", FloatType(), True),
    StructField("special_requests", IntegerType(), True),
    StructField("status", StringType(), True)
])

In [45]:
hotel = spark.read \
    .option("header", "true") \
    .option("inferSchema", "false") \
    .schema(struct) \
    .csv("Hotel.csv")

hotel.createOrReplaceTempView('hotel')

In [46]:
hotel.show()

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|      ID|n_adults|n_children|weekend_nights|week_nights|   meal_plan|car_parking_space|  room_type|lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|      status|
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|       2|         0|             1|          2| Meal Plan 1|                0|Room_Type 1|      224|2017|   10|   2|       Offline|             0|                     0|                             0|          65.0|       

In [67]:
begin_date = '2017-01-01'
end_date = '2018-12-31'

calendar = spark.sql(f"""
    select explode(sequence(to_date('{begin_date}'), to_date('{end_date}'), interval 1 day)) as calendar_date
""")

calendar.createOrReplaceTempView('calendar')

calendar.show()

+-------------+
|calendar_date|
+-------------+
|   2017-01-01|
|   2017-01-02|
|   2017-01-03|
|   2017-01-04|
|   2017-01-05|
|   2017-01-06|
|   2017-01-07|
|   2017-01-08|
|   2017-01-09|
|   2017-01-10|
|   2017-01-11|
|   2017-01-12|
|   2017-01-13|
|   2017-01-14|
|   2017-01-15|
|   2017-01-16|
|   2017-01-17|
|   2017-01-18|
|   2017-01-19|
|   2017-01-20|
+-------------+
only showing top 20 rows


In [55]:
#Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

spark.sql("""
    SELECT year, month, AVG(weekend_nights+week_nights) average_nights
    FROM hotel
    WHERE status = 'Not_Canceled'
    GROUP BY year, month
    ORDER BY year, month
""").show()

+----+-----+------------------+
|year|month|    average_nights|
+----+-----+------------------+
|2017|    7|3.0166666666666666|
|2017|    8|2.7189384800965017|
|2017|    9|2.6550783912747105|
|2017|   10|2.7032898820608318|
|2017|   11|2.7241935483870967|
|2017|   12| 3.043046357615894|
|2018|    1|2.7414141414141415|
|2018|    2|2.6891679748822606|
|2018|    3|3.0392038600723765|
|2018|    4| 2.924755887421022|
|2018|    5|2.8054545454545456|
|2018|    6| 2.596757322175732|
|2018|    7|3.1938088829071334|
|2018|    8|3.1544117647058822|
|2018|    9| 2.786425902864259|
|2018|   10|2.8910186199342824|
|2018|   11|2.9784511784511785|
|2018|   12|3.2521891418563924|
+----+-----+------------------+



In [56]:
#Определить ТОП-3 месяца по проценту отмененных броней за 2018 год
spark.sql("""
    SELECT month,
           ROUND(SUM(CASE WHEN status = 'Canceled' THEN 1 ELSE 0 END)*100.0 / COUNT(*), 2) top_canceled
    FROM hotel
    WHERE year = 2018
    GROUP BY month
    ORDER BY top_canceled DESC
    LIMIT 3
""").show()

+-----+------------+
|month|top_canceled|
+-----+------------+
|    8|       46.55|
|   10|       46.36|
|    9|       45.78|
+-----+------------+



In [59]:
#Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней
spark.sql("SELECT year, month, ROUND(AVG(lead_time), 2) average_days FROM hotel WHERE status='Not_Canceled' GROUP BY year, month ORDER BY year, month").show()

+----+-----+------------+
|year|month|average_days|
+----+-----+------------+
|2017|    7|      130.73|
|2017|    8|       35.08|
|2017|    9|       51.72|
|2017|   10|       55.89|
|2017|   11|       33.28|
|2017|   12|       46.75|
|2018|    1|       34.87|
|2018|    2|       30.53|
|2018|    3|       43.19|
|2018|    4|       62.49|
|2018|    5|       60.99|
|2018|    6|       70.64|
|2018|    7|       86.88|
|2018|    8|       83.09|
|2018|    9|       63.32|
|2018|   10|       73.24|
|2018|   11|       44.25|
|2018|   12|       69.75|
+----+-----+------------+



In [71]:
#Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT)

avr_revenue = hotel.filter("status='Not_Canceled'").withColumn("revenue", expr("(weekend_nights+week_nights)*avg_room_price"))
avr_revenue.groupBy("year","month").pivot("market_segment").agg(expr("AVG(revenue)")).orderBy("year","month").show()

+----+-----+------------------+-------------------+------------------+------------------+------------------+
|year|month|          Aviation|      Complementary|         Corporate|           Offline|            Online|
+----+-----+------------------+-------------------+------------------+------------------+------------------+
|2017|    7|              NULL|  22.39800109863281|            113.75|228.94688336260907| 290.5582356172449|
|2017|    8|              NULL|0.32432432432432434|156.41666666666666| 235.5371777737182|284.21033210500644|
|2017|    9|              NULL|  16.88888888888889|177.83193277310923| 236.6525355512196|  348.552185216779|
|2017|   10|              NULL| 1.0909090909090908|180.26470588235293|223.23766202704851|311.46756782531736|
|2017|   11|              NULL|            14.8125|102.97216852027249| 198.3566784823898| 240.5208890660089|
|2017|   12|              NULL| 0.2500000037252903|141.10634362825783|253.86361857672424|258.92541742438635|
|2018|    1|       

In [61]:
#Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость

spark.sql("""
    SELECT
        ID as id_guest,
        ROUND(SUM((weekend_nights+week_nights)*avg_room_price), 2) as profit
    FROM hotel
    WHERE status='Not_Canceled' AND repeated_guest=1
    GROUP BY ID
    ORDER BY profit DESC
    LIMIT 5
""").show()

+--------+------+
|id_guest|profit|
+--------+------+
|INN19235|1754.4|
|INN05222| 690.0|
|INN14189| 665.0|
|INN09923| 660.0|
|INN25479| 650.0|
+--------+------+



In [70]:
#Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

spark.sql("""
    SELECT
        c.calendar_date as date,
        COALESCE(SUM(h.n_adults + h.n_children), 0) as guests,
        ROUND(COALESCE(SUM(h.n_adults + h.n_children), 0) * 100.0 / 400, 2) as load_percent
    FROM calendar c
    LEFT JOIN hotel h ON CAST(c.calendar_date AS STRING) =
        CONCAT(CAST(h.year AS STRING), '-',
               LPAD(CAST(h.month AS STRING), 2, '0'), '-',
               LPAD(CAST(h.date AS STRING), 2, '0'))
        AND h.status = 'Not_Canceled'
    GROUP BY c.calendar_date
    ORDER BY c.calendar_date DESC
""").show(20)

+----------+------+------------+
|      date|guests|load_percent|
+----------+------+------------+
|2018-12-31|    67|       16.75|
|2018-12-30|   166|       41.50|
|2018-12-29|   162|       40.50|
|2018-12-28|   134|       33.50|
|2018-12-27|   263|       65.75|
|2018-12-26|   117|       29.25|
|2018-12-25|    84|       21.00|
|2018-12-24|    98|       24.50|
|2018-12-23|   113|       28.25|
|2018-12-22|    89|       22.25|
|2018-12-21|    91|       22.75|
|2018-12-20|    64|       16.00|
|2018-12-19|    68|       17.00|
|2018-12-18|    83|       20.75|
|2018-12-17|    82|       20.50|
|2018-12-16|   124|       31.00|
|2018-12-15|    53|       13.25|
|2018-12-14|    44|       11.00|
|2018-12-13|    54|       13.50|
|2018-12-12|    51|       12.75|
+----------+------+------------+
only showing top 20 rows
