In [1]:
# ! hdfs dfs -rm -R /user/airbnb
# ! hdfs dfs -mkdir -p /user/airbnb
# ! hdfs dfs -put airbnb-listings.csv /user/airbnb/

In [2]:
# ! pip install findspark

In [56]:
import pandas as pd
import findspark
import pyspark
from pyspark.sql import SparkSession, Row

from pyspark.sql.functions import explode, split, col, collect_set, count, size

pd.set_option('display.max_colwidth', None)

In [3]:
findspark.init()

In [4]:
context = pyspark.SparkContext(appName = "lsml-airbnb")
session = SparkSession(context)

In [28]:
df = session.read.option("mode", "DROPMALFORMED").option('sep', ';') \
.csv("/user/airbnb/airbnb-listings.csv", header=True, inferSchema=True)
df.show()

+--------+--------------------+--------------+------------+--------------------+--------------------+--------------------+--------------------+-------------------+---------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------+--------------------+--------------+----------+--------------------+--------------------+------------------+------------------+--------------------+--------------------+--------------------+------------------+-------------------+-------------------------+--------------------+--------------------+-------------+----------------------+----------------------------+---------+-------------+-------+---------+--------------------+------------+-----------+------------------+------------------+---------------+---------------+------------+---------+--------+----+-------------+--------------------+-----------+-----+------

In [106]:
df.select("Amenities", "City").limit(4).show()

+---------+----+
|Amenities|City|
+---------+----+
|     null|null|
|     null|null|
|     null|null|
|        9|  50|
+---------+----+



In [107]:
df.limit(4).toPandas().columns

Index(['ID', 'Listing Url', 'Scrape ID', 'Last Scraped', 'Name', 'Summary',
       'Space', 'Description', 'Experiences Offered', 'Neighborhood Overview',
       'Notes', 'Transit', 'Access', 'Interaction', 'House Rules',
       'Thumbnail Url', 'Medium Url', 'Picture Url', 'XL Picture Url',
       'Host ID', 'Host URL', 'Host Name', 'Host Since', 'Host Location',
       'Host About', 'Host Response Time', 'Host Response Rate',
       'Host Acceptance Rate', 'Host Thumbnail Url', 'Host Picture Url',
       'Host Neighbourhood', 'Host Listings Count',
       'Host Total Listings Count', 'Host Verifications', 'Street',
       'Neighbourhood', 'Neighbourhood Cleansed',
       'Neighbourhood Group Cleansed', 'City', 'State', 'Zipcode', 'Market',
       'Smart Location', 'Country Code', 'Country', 'Latitude', 'Longitude',
       'Property Type', 'Room Type', 'Accommodates', 'Bathrooms', 'Bedrooms',
       'Beds', 'Bed Type', 'Amenities', 'Square Feet', 'Price', 'Weekly Price',
       'Month

Задание 1. [1 балл] В разрезе каждого города (колонка city) выделите весь список удобств (amenities), предлагаемых в жилплощади из этих городов (учитывайте возможные дубли в этих данных). В ответе выведите фрейм данных со списком всех удобств для следующих городов: Hoxton, Rainham, Rio Del Mar. По ответам на эти города мы увидим, правильные ли преобразования для данных вы собрали.

In [109]:
amenities_df = df.select("City", explode(split(col("Amenities"), ',')).alias("Amenity"))
amenities_df = amenities_df.groupBy("City").agg(collect_set("Amenity").alias("distinct_amenities"))

# Filter for specific cities
filtered_amenities_df = amenities_df.filter(col("City").isin("Hoxton", "Rainham", "Rio Del Mar"))
filtered_amenities_df.show(truncate=False)

+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|City       |distinct_amenities                                                                                                                                                  |
+-----------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Rainham    |[Wireless Internet, Essentials, Iron, Washer, TV, Heating, Kitchen, translation missing: en.hosting_amenity_49, translation missing: en.hosting_amenity_50, Hangers]|
|Hoxton     |[Wireless Internet, Essentials, Iron, TV, Heating, Smoke detector, Internet, Laptop friendly workspace, Kitchen, Hangers]                                           |
|Rio Del Mar|[Indoor Fireplace, Suitable for Events, Essentials, Shampoo, Washer, Heating, Kitchen, Free 

Задание 2. [1 балл] Также в разрезе каждого города выделите удобства, которые предлагаются в жилплощадях этих городов и выделите из них топ-5 городов, в жилье которых предлагается больше всего удобств. Учитывайте дубли в этих значениях относительно разного жилья, тут речь идет про поиск всех уникальных значений по городам и их подсчет для поиска самых удобных городов. В качестве ответа выведите фрейм данных с этими городами и числом уникальных удобств из них.

In [110]:
amenities_df = amenities_df.withColumn("number_of_amenities", size(amenities_df.distinct_amenities))

top_cities_df = amenities_df.orderBy(col("number_of_amenities").desc()).limit(5)
top_cities_df.show(truncate=False)

+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Задание 3. [1 балл] Давайте выясним, по каким объявлениям можно пройти верификацию на оффер по жилью через linkedin (колонка host_verifications), что в реалиях airbnb достаточно забавно. Нам также подойдут варианты объявлений, где верификация через линкед будет одним из возможных вариантов. В ответе укажите кол-во объявлений из стран (колонка country) Switzerland и Austria, по которым можно пройти верификацию через линкед.

In [111]:
df.select("Host Verifications", "Country").limit(10).show(truncate=False)

+----------------------------------+-----------+
|Host Verifications                |Country    |
+----------------------------------+-----------+
|null                              |null       |
|null                              |null       |
|null                              |null       |
|null                              |44         |
|null                              |null       |
|null                              |null       |
|null                              |null       |
|null                              |0          |
|email,phone,facebook,reviews,jumio|Netherlands|
|email,phone,reviews,jumio         |Netherlands|
+----------------------------------+-----------+



In [112]:
linkedin_df = df.select("Country", explode(split(col("Host Verifications"), ',')).alias("host_verification"))
linkedin_df = linkedin_df.where(col("host_verification") == "linkedin")

linkedin_df_countrywise = linkedin_df.groupBy("Country").count().withColumnRenamed("count", "verifications_with_linkedin")
linkedin_df_countrywise_selected = linkedin_df_countrywise.filter((col("Country") == "Switzerland") | (col("Country") == "Austria"))
linkedin_df_countrywise_selected.show(truncate=False)

+-----------+---------------------------+
|Country    |verifications_with_linkedin|
+-----------+---------------------------+
|Switzerland|9                          |
|Austria    |70                         |
+-----------+---------------------------+



Задание 4. [1 балл] Посмотрим объявления с большим кол-вом спален (колонка bedrooms). Найдите в наших данных объявления с 10 спальнями за самую высокую и самую низкую стоимости (колонка price). В ответ выведите фрейм данных с колонкой id по этим объявлениям. Учитывайте правильный тип данных для этого задания, чтобы получить верный ответ.

In [113]:
df.select("Bedrooms", "ID", "Price").limit(10).show(truncate=False)

+--------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|Bedrooms|ID                                                                                                                                                                                                                                                                               |Price|
+--------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|null    |4008728                                                                                                              

In [125]:
from pyspark.sql.types import IntegerType, FloatType, BooleanType
from pyspark.sql.functions import udf

def is_numerical(value):
    try:
        float(value)
        return True
    except (ValueError, TypeError):
        return False

# Register the UDF
is_numerical_udf = udf(is_numerical, BooleanType())

ten_bedrooms_df = df.select("ID", "Bedrooms", "Price").where(col("Bedrooms") == 10)
ten_bedrooms_df = ten_bedrooms_df.withColumn("Price", ten_bedrooms_df["Price"].cast(IntegerType()))
ten_bedrooms_df = ten_bedrooms_df.where(is_numerical_udf(col("ID")))

min_price = ten_bedrooms_df.agg({'Price': 'min'}).collect()[0].asDict()["min(Price)"]
max_price = ten_bedrooms_df.agg({'Price': 'max'}).collect()[0].asDict()["max(Price)"]

ten_bedrooms_extra_df = ten_bedrooms_df.filter((col("Price") == min_price) | (col("Price") == max_price))
ten_bedrooms_extra_df.show(truncate=False)

+--------+--------+-----+
|ID      |Bedrooms|Price|
+--------+--------+-----+
|11655788|10      |39   |
|10369462|10      |999  |
|14953643|10      |999  |
+--------+--------+-----+



Задание 5. [1 балл] Посмотрим на типы предлагаемого жилья (property_type) и их цену. Найдите в наших данных минимальную и максимальную стоимость для таких типов жилья как: Car, Castle, Lighthouse. В качестве ответа выведите фрей данных с этой инфой.

In [126]:
df.select("Property Type", "Price").limit(10).show(truncate=False)

+-------------+-----+
|Property Type|Price|
+-------------+-----+
|null         |null |
|null         |null |
|null         |null |
|2017-04-02   |10   |
|null         |null |
|null         |null |
|null         |null |
|2017-04-02   |10   |
|Apartment    |125  |
|Apartment    |130  |
+-------------+-----+



In [127]:
property_types_df = df.select("Property Type", "Price").filter(col("Property Type").isin("Car", "Castle", "Lighthouse"))
property_types_df = property_types_df.withColumn("Price", property_types_df["Price"].cast(IntegerType()))
property_types_df = property_types_df.withColumn("Price2", property_types_df.Price)
min_max_prices_df = property_types_df.groupBy("Property Type").agg({"Price": "min", "Price2": "max"}).withColumnRenamed("max(Price2)", "max(Price)")

min_max_prices_df.show(truncate=False)

+-------------+----------+----------+
|Property Type|max(Price)|min(Price)|
+-------------+----------+----------+
|Castle       |900       |35        |
|Lighthouse   |500       |36        |
|Car          |30        |30        |
+-------------+----------+----------+

