## Домашнее задание 5. Анализ данных на Spark SQL

In [1]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

In [2]:
pip install pyspark



In [3]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [4]:
import findspark
findspark.init()

In [5]:
import os
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [6]:
import pyspark
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.session import SparkSession
spark = (
    SparkSession
    .builder
    .appName('Test_01')
    .config('spark.ui.port', '9311')
    .config('spark.executor.memoryOverhead', '1G')
    .config('spark.shuffle.service.enabled', 'true')
    .config('spark.dynamicAllocation.enabled', 'true')
    .config('spark.driver.extraClassPath', '/opt/spark/jars/sqljdbc42.jar')\
    .config('spark.executor.extraClassPath', '/opt/spark/jars/sqljdbc42.jar')\
    .getOrCreate()
)

## Шаг 1. Создание таблицы из csv-файла

In [7]:
from pyspark.sql import functions as f
from pyspark.sql.window import Window
import pandas as pd

In [180]:
logs_hotel = spark.read.option("header",True).option("sep",",").csv("Hotel.csv")

In [181]:
logs_hotel.show(10,0)

+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|ID      |n_adults|n_children|weekend_nights|week_nights|meal_plan   |car_parking_space|room_type  |lead_time|year|month|date|market_segment|repeated_guest|previous_cancellations|previous_bookings_not_canceled|avg_room_price|special_requests|status      |
+--------+--------+----------+--------------+-----------+------------+-----------------+-----------+---------+----+-----+----+--------------+--------------+----------------------+------------------------------+--------------+----------------+------------+
|INN00001|2       |0         |1             |2          |Meal Plan 1 |0                |Room_Type 1|224      |2017|10   |2   |Offline       |0             |0                     |0                             |65            |0      

##Шаг 2. Генерация таблицы с датами

Создать (сгенерировать) таблицу calendar, который будет состоять из одного поля calendar_dt со всеми днями с 2017-01-01 по 2018-12-31.

In [182]:
first_date = '2017-01-01'
last_date = '2018-12-31'
dates = pd.date_range(start=first_date, end=last_date, freq='D')
dates_df_pd = pd.DataFrame({'calendar_dt': dates})
calendar = spark.createDataFrame(dates_df_pd)
calendar = calendar.withColumn('calendar_dt', f.to_date(f.col('calendar_dt')))

In [183]:
calendar.show(10, 0)

+-----------+
|calendar_dt|
+-----------+
|2017-01-01 |
|2017-01-02 |
|2017-01-03 |
|2017-01-04 |
|2017-01-05 |
|2017-01-06 |
|2017-01-07 |
|2017-01-08 |
|2017-01-09 |
|2017-01-10 |
+-----------+
only showing top 10 rows


##Шаг 3. Запросы

1. Вычислить среднее количество ночей, которые гости проводят в отеле (только для подтвержденных бронирований, с детализацией по месяцам и годам)

In [184]:
result = logs_hotel.filter(
    logs_hotel["status"] == "Not_Canceled"
).groupBy("year", "month").agg(
    f.avg(f.col("weekend_nights").cast('int') +
          f.col("week_nights").cast('int')).alias("nights_mean")
).orderBy(f.col("year").cast('int'), f.col("month").cast('int'))
result.show()

+----+-----+------------------+
|year|month|       nights_mean|
+----+-----+------------------+
|2017|    7|3.0166666666666666|
|2017|    8|2.7189384800965017|
|2017|    9|2.6550783912747105|
|2017|   10|2.7032898820608318|
|2017|   11|2.7241935483870967|
|2017|   12| 3.043046357615894|
|2018|    1|2.7414141414141415|
|2018|    2|2.6891679748822606|
|2018|    3|3.0392038600723765|
|2018|    4| 2.924755887421022|
|2018|    5|2.8054545454545456|
|2018|    6| 2.596757322175732|
|2018|    7|3.1938088829071334|
|2018|    8|3.1544117647058822|
|2018|    9| 2.786425902864259|
|2018|   10|2.8910186199342824|
|2018|   11|2.9784511784511785|
|2018|   12|3.2521891418563924|
+----+-----+------------------+



2. Определить ТОП-3 месяца по проценту отмененных броней за 2018 год.

In [186]:
agg_table = logs_hotel.filter(logs_hotel["year"] == 2018)\
.groupBy("month").agg(
    f.round(f.sum(f.when(f.col("status") == "Canceled", 1).otherwise(0)) /
            f.count("*") * 100, 2).alias("Canceled_%")
).orderBy(f.col("Canceled_%").desc()).limit(3)
agg_table.show()

+-----+----------+
|month|Canceled_%|
+-----+----------+
|    8|     46.55|
|   10|     46.36|
|    9|     45.78|
+-----+----------+



3. Вычислить среднее время на каждый месяц между бронированием и заездом в отель для подтвержденных броней.

In [187]:
result = logs_hotel.filter(
    logs_hotel["status"] == "Not_Canceled"
).groupBy("year", "month").agg(
    f.round(f.avg("lead_time"), 2).alias("avg_lead_time"))\
    .orderBy(f.col("year").cast('int'), f.col("month").cast('int'))
result.show()

+----+-----+-------------+
|year|month|avg_lead_time|
+----+-----+-------------+
|2017|    7|       130.73|
|2017|    8|        35.08|
|2017|    9|        51.72|
|2017|   10|        55.89|
|2017|   11|        33.28|
|2017|   12|        46.75|
|2018|    1|        34.87|
|2018|    2|        30.53|
|2018|    3|        43.19|
|2018|    4|        62.49|
|2018|    5|        60.99|
|2018|    6|        70.64|
|2018|    7|        86.88|
|2018|    8|        83.09|
|2018|    9|        63.32|
|2018|   10|        73.24|
|2018|   11|        44.25|
|2018|   12|        69.75|
+----+-----+-------------+



4. Вычислить общую среднюю выручку на каждый месяц в каждом году, сгруппировав по всем типам бронирования для подтвержденных броней, и вывести это в виде сводной таблицы (PIVOT).

In [194]:
result = logs_hotel.filter(
    logs_hotel["status"] == "Not_Canceled"
).groupBy("year", "month", "market_segment").agg(
    f.sum(
          f.round((f.col("weekend_nights").cast('int') +
                   f.col("week_nights").cast('int')) *
                  f.col("avg_room_price").cast('float'))
    ).alias("avg_income")
).orderBy(
    f.col("year").cast('int'),
    f.col("month").cast('int'),
    "market_segment"
)
result.show(50, 0)


+----+-----+--------------+----------+
|year|month|market_segment|avg_income|
+----+-----+--------------+----------+
|2017|7    |Complementary |112.0     |
|2017|7    |Corporate     |455.0     |
|2017|7    |Offline       |17644.0   |
|2017|7    |Online        |9886.0    |
|2017|8    |Complementary |12.0      |
|2017|8    |Corporate     |8447.0    |
|2017|8    |Offline       |67639.0   |
|2017|8    |Online        |128203.0  |
|2017|9    |Complementary |456.0     |
|2017|9    |Corporate     |21162.0   |
|2017|9    |Offline       |164241.0  |
|2017|9    |Online        |218551.0  |
|2017|10   |Complementary |24.0      |
|2017|10   |Corporate     |18388.0   |
|2017|10   |Offline       |182378.0  |
|2017|10   |Online        |208725.0  |
|2017|11   |Complementary |237.0     |
|2017|11   |Corporate     |8547.0    |
|2017|11   |Offline       |54345.0   |
|2017|11   |Online        |59421.0   |
|2017|12   |Complementary |8.0       |
|2017|12   |Corporate     |13123.0   |
|2017|12   |Offline      

5. Выявить ТОП-5 постоянных гостей, которые принесли наибольшую выручку за все время, и показать их долю в общей выручке от постоянных гостей. Использовать уникальный идентификатор брони как уникальный идентификатор гостя, предположив, что 1 бронь = 1 гость.

In [198]:
# общая выручка от постоянных гостей
total_sum = logs_hotel.filter(
    (logs_hotel["status"] == "Not_Canceled") &
    (logs_hotel["repeated_guest"] == 1)
).agg(
    f.sum(
          (f.col("weekend_nights").cast('int') +
           f.col("week_nights").cast('int')) *
          f.col("avg_room_price").cast('float')
    ).alias("total_value")
).select('total_value').first()[0]

# ответ
result = logs_hotel.filter(
    (logs_hotel["status"] == "Not_Canceled") &
    (logs_hotel["repeated_guest"] == 1)
).withColumn(
    "income",
    f.round((f.col("weekend_nights").cast('int') +
             f.col("week_nights").cast('int')) *
            f.col("avg_room_price").cast('float'))
).withColumn(
    "income_share_%",
    f.round(f.col("income") / total_sum * 100, 2)
    ).orderBy(f.col("income").desc()).limit(5)\
.select("ID", "income", "income_share_%")
result.show()

+--------+------+--------------+
|      ID|income|income_share_%|
+--------+------+--------------+
|INN19235|1754.0|          1.55|
|INN05222| 690.0|          0.61|
|INN14189| 665.0|          0.59|
|INN09923| 660.0|          0.58|
|INN25479| 650.0|          0.57|
+--------+------+--------------+



6. Вывести общее количество гостей на каждый день в отеле, отсортировав по убыванию дат, включая дни, когда отель пустует. Также рассчитать процент загрузки для каждого дня, если известно, что общая вместимость отеля 400 человек.

In [202]:
# добавляется столбец с датой
# дата "2018-02-29" исправляется на "2018-03-01"
logs_hotel_dated = logs_hotel.filter(
    logs_hotel["status"] == "Not_Canceled"
).withColumn(
    "complete_date",
    f.concat(f.col("year"), f.lit("-"), f.col("month"),
             f.lit("-"), f.col("date")).alias("date_str")
).withColumn(
    "complete_date",
    f.to_date(
        f.when(f.col("complete_date") != "2018-2-29",
        f.col("complete_date")).otherwise("2018-3-1")
    )
)

# в датафрейм добавляются пропущенные даты - в которые не было новых гостей
# в соответствии с логами отеля из календаря берутся даты начиная с "2017-07-01"
logs_hotel_dated = calendar.filter(
    calendar["calendar_dt"] >= "2017-07-01"
).join(
    f.broadcast(logs_hotel_dated),
    calendar.calendar_dt == logs_hotel_dated.complete_date,
    'left'
).withColumn(
    # добавляется столбец с количеством дней, остающимся на следующую дату
    "left_days",
    f.when(
        f.col("ID").isNotNull(),
        (f.col("weekend_nights").cast('int') +
         f.col("week_nights").cast('int') - 1)
    ).otherwise(0)
)

# датафрейм с количеством гостей, остающихся на следующую дату
old_guests_cnt = logs_hotel_dated.groupBy("calendar_dt", "left_days")\
.agg(f.sum(f.when(
    f.col("ID").isNotNull(),
    (f.col("n_adults").cast('int') + f.col("n_children").cast('int'))
).otherwise(0)).alias("guests"))\
.withColumn(
    "last_date",
    f.date_add(f.col("calendar_dt"), f.col("left_days"))
).withColumn(
    "guests_accum",
    f.sum(f.col("guests").cast('int'))\
    .over(Window.partitionBy("calendar_dt").orderBy(f.col("left_days").desc()))
).groupBy("last_date").agg(
    f.sum(f.col("guests_accum").cast('int')).alias("old_guests")
)

# датафрейм с количеством новых гостей на текущую дату
new_guests_cnt = logs_hotel_dated.groupBy("calendar_dt")\
.agg(f.sum(f.when(
    f.col("ID").isNotNull(),
    (f.col("n_adults").cast('int') + f.col("n_children").cast('int'))
).otherwise(0)).alias("new_guests"))

# ответ
result = new_guests_cnt.join(
    f.broadcast(old_guests_cnt),
    new_guests_cnt.calendar_dt == old_guests_cnt.last_date,
    'left'
).withColumn(
    "guests_cnt",
    f.when(
        f.col("old_guests").isNotNull(),
        f.col("new_guests").cast('int') +
        f.col("old_guests").cast('int')
    ).otherwise(f.col("new_guests"))
).withColumn(
    "load_%",
    f.round(f.col("guests_cnt").cast('int') / 400 * 100, 2)
).orderBy(f.col("calendar_dt").desc())\
.select("calendar_dt", "guests_cnt", "load_%")
result.show(50, 0)


+-----------+----------+------+
|calendar_dt|guests_cnt|load_%|
+-----------+----------+------+
|2018-12-31 |619       |154.75|
|2018-12-30 |728       |182.0 |
|2018-12-29 |699       |174.75|
|2018-12-28 |641       |160.25|
|2018-12-27 |802       |200.5 |
|2018-12-26 |539       |134.75|
|2018-12-25 |471       |117.75|
|2018-12-24 |465       |116.25|
|2018-12-23 |452       |113.0 |
|2018-12-22 |369       |92.25 |
|2018-12-21 |262       |65.5  |
|2018-12-20 |295       |73.75 |
|2018-12-19 |267       |66.75 |
|2018-12-18 |333       |83.25 |
|2018-12-17 |354       |88.5  |
|2018-12-16 |353       |88.25 |
|2018-12-15 |204       |51.0  |
|2018-12-14 |180       |45.0  |
|2018-12-13 |186       |46.5  |
|2018-12-12 |214       |53.5  |
|2018-12-11 |269       |67.25 |
|2018-12-10 |657       |164.25|
|2018-12-09 |739       |184.75|
|2018-12-08 |789       |197.25|
|2018-12-07 |806       |201.5 |
|2018-12-06 |428       |107.0 |
|2018-12-05 |562       |140.5 |
|2018-12-04 |507       |126.75|
|2018-12

spark.stop()