# Advanced Spark

Сегодня пройдемся по каким-то аспектам фреймворка Spark, которые не затрагивали в предыдущий раз, но которые могут оказаться очень полезными.

Датасет на сегодня - данные с сайта Airbnb

In [None]:
! hdfs dfs -mkdir -p /user/airbnb

In [None]:
! curl https://storage.yandexcloud.net/lsml2022alexius/accommodation.tsv | hdfs dfs -put - /user/airbnb/accommodation.tsv
! curl https://storage.yandexcloud.net/lsml2022alexius/apartment.tsv | hdfs dfs -put - /user/airbnb/apartment.tsv
! curl https://storage.yandexcloud.net/lsml2022alexius/hosts.tsv | hdfs dfs -put - /user/airbnb/hosts.tsv
! curl https://storage.yandexcloud.net/lsml2022alexius/reviews.tsv | hdfs dfs -put - /user/airbnb/reviews.tsv

In [None]:
! hdfs dfs -ls -h /user/airbnb

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
sc = pyspark.SparkContext(appName="lsml-app-1")

In [3]:
from pyspark.sql import SparkSession, Row

In [4]:
se = SparkSession(sc)

In [None]:
accommodation = se.read.option("mode", "DROPMALFORMED").option('sep', "\t").csv("/user/airbnb/accommodation.tsv", header=True, inferSchema=True)
apartment = se.read.option("mode", "DROPMALFORMED").option('sep', "\t").csv("/user/airbnb/apartment.tsv", header=True, inferSchema=True)
hosts = se.read.option("mode", "DROPMALFORMED").option('sep', "\t").csv("/user/airbnb/hosts.tsv", header=True, inferSchema=True)
reviews = se.read.option("mode", "DROPMALFORMED").option('sep', "\t").csv("/user/airbnb/reviews.tsv", header=True, inferSchema=True)

In [None]:
accommodation.printSchema()
apartment.printSchema()
hosts.printSchema()
reviews.printSchema()

#### Улучшаем формат записи

Сейчас мы храним записи в сыром формате в виде просто строк. Такой формат удобен для чтения человеков, однако машине приходится прикладывать усилия, чтобы разобрать такой формат.

Существует ряд форматов, поддерживаемых в Spark, которые гораздо лучше подходят для обработки. Один из таких форматов - Parquet. Подробнее можно почитать про него на официальном сайте - https://parquet.apache.org/

In [None]:
%%time

apartment.count()

Посчитаем например среднее квадрата значения в колонке price

In [None]:
%%time

apartment.rdd.map(lambda x: float(x.price if x.price not in ("none", None) else "0")).map(lambda x: x**2).mean()

~10 секунд. Пробуем сконвертировать в паркет.

In [None]:
! hdfs dfs -rm -r /user/airbnb/parquet
! hdfs dfs -mkdir -p /user/airbnb/parquet

In [None]:
apartment.write.parquet("/user/airbnb/parquet/apartment.parquet")

In [None]:
apartment_parquet = se.read.parquet("/user/airbnb/parquet/apartment.parquet")

In [None]:
apartment_parquet.printSchema()

Попробуем вопрорить запрос выше.

In [None]:
%%time

apartment_parquet.rdd.map(lambda x: float(x.price if x.price not in ("none", None) else "0")).map(lambda x: x**2).mean()

~3 секунды. Не фантастика, часть оверхеда уже сняли. Переконвертируем сразу все в этот формат и дальше будем работать уже с ним.

In [None]:
! hdfs dfs -rm -r /user/airbnb/parquet/

In [None]:
accommodation.write.parquet("/user/airbnb/parquet/accommodation.parquet")
apartment.write.parquet("/user/airbnb/parquet/apartment.parquet")
hosts.write.parquet("/user/airbnb/parquet/hosts.parquet")
reviews.write.parquet("/user/airbnb/parquet/reviews.parquet")

In [5]:
accommodation = se.read.parquet('/user/airbnb/parquet/accommodation.parquet')
apartment = se.read.parquet('/user/airbnb/parquet/apartment.parquet')
hosts = se.read.parquet('/user/airbnb/parquet/hosts.parquet')
reviews = se.read.parquet('/user/airbnb/parquet/reviews.parquet')

In [6]:
accommodation.registerTempTable("accommodation")
apartment.registerTempTable("apartment")
hosts.registerTempTable("hosts")
reviews.registerTempTable("reviews")

#### Поисследуем таблички

In [None]:
se.sql("""
    SELECT * 
    FROM accommodation
    LIMIT 10
""").toPandas()

In [None]:
se.sql("""
    SELECT *
    FROM apartment
    LIMIT 10
""").toPandas()

In [None]:
se.sql("""
    SELECT *
    FROM hosts
    LIMIT 10
""").toPandas()

In [None]:
se.sql("""
    SELECT *
    FROM reviews
    LIMIT 10
""").toPandas()

В SQL есть готовые функции для манипуляции с данными. Весь список можно найти здесь - https://spark.apache.org/docs/2.3.0/api/sql/index.html . Конкретно сейчас воспользуемся split, которая превращает строку в массив строк, разбивая ее по указанному символу.

In [None]:
se.sql("""
    SELECT apartment_id, split(amenities, ',') as amenities
    FROM accommodation
    LIMIT 10
""").toPandas()

`explode` распупочивает список в отдельные записи в таблице

In [None]:
se.sql("""
    SELECT apartment_id, explode(split(amenities, ',')) as amenities
    FROM accommodation
    LIMIT 10
""").toPandas()

In [None]:
se.sql("""
    SELECT *
    FROM apartment
    LIMIT 10
""").toPandas()

Чтобы не плодить лишних таблиц, можно делать подзапросы. Распупочим удобства и сразу джойним их с информацией про квартиры.

In [None]:
se.sql("""
    WITH accommodation_full AS (
        SELECT apartment_id, explode(split(amenities, ',')) as amenities
        FROM accommodation
    )
    SELECT *
    FROM accommodation_full af
        join apartment a on a.id = af.apartment_id
    LIMIT 10
""").toPandas()

In [None]:
se.sql("""
    WITH accommodation_full AS (
        SELECT apartment_id, explode(split(amenities, ',')) as amenities
        FROM accommodation
    )
    SELECT *
    FROM accommodation_full af
        join apartment a on a.id = af.apartment_id
""").registerTempTable("app_n_accom")

In [None]:
se.sql("""
    SELECT apartment_id, name
    FROM app_n_accom
    WHERE amenities = "Hair dryer"
    LIMIT 10
""").toPandas()

Чтож, давайте немного позанимается машинным обучением. Задача будет простая и понятная - предсказываем цену для квартиры.

Давайте придумывать признаки. Попробуей вычленить что-то интересное из каждой таблички

In [None]:
se.sql("""
    SELECT *
    FROM hosts
    LIMIT 5
""").toPandas()

In [None]:
se.sql("""
SELECT (cast(now() as long) - cast(cast(host_since as timestamp) as long)) / (60 * 60 * 24) as f_host_for
FROM hosts
LIMIT 10
""").toPandas()

In [None]:
se.sql("""
SELECT cast(host_response_rate as int) as f_host_response_rate, cast(host_acceptance_rate as int) as f_host_acceptance_rate, cast(host_total_listings_count as int) as f_host_total_listings_count
FROM hosts
LIMIT 10
""").toPandas()

In [None]:
se.sql("""
SELECT size(split(host_verifications, ',')) as f_num_of_ver
FROM hosts
LIMIT 10
""").toPandas()

#### Used defined functions

Все числа было бы неплохо привести к нормальному виду. 
Делать это чисто из SQL достаточно непросто, поэтому можно попробовать заиспользовать питоновский код прямо в SQL

In [7]:
def to_number(raw_value):
    try:
        return float(raw_value)
    except:
        return 0.0

In [8]:
se.udf.register("to_number", to_number, "float")

<function __main__.to_number(raw_value)>

Соберем в одну табличку с признаками

In [9]:
se.sql("""
SELECT host_id, 
       (cast(now() as long) - cast(cast(host_since as timestamp) as long)) / (60 * 60 * 24) as f_host_for,
       to_number(cast(host_response_rate as int)) as f_host_response_rate, 
       to_number(cast(host_acceptance_rate as int)) as f_host_acceptance_rate, 
       to_number(cast(host_total_listings_count as int)) as f_host_total_listings_count,
       to_number(size(split(host_verifications, ','))) as f_num_of_ver
FROM hosts
""").registerTempTable("hosts_features")

In [10]:
se.sql("""
SELECT *
FROM hosts_features
LIMIT 5
""").toPandas()

Unnamed: 0,host_id,f_host_for,f_host_response_rate,f_host_acceptance_rate,f_host_total_listings_count,f_num_of_ver
0,39150424,2399.459826,100.0,0.0,2.0,7.0
1,75748444,2080.459826,0.0,0.0,1.0,6.0
2,11008595,2962.459826,100.0,0.0,2.0,6.0
3,9273916,3051.459826,0.0,0.0,1.0,4.0
4,45996316,2321.459826,0.0,0.0,1.0,4.0


In [11]:
se.sql("""
SELECT *
FROM reviews
LIMIT 10
""").toPandas()

Unnamed: 0,apartment_id,number_of_reviews,review_scores_rating,review_scores_accuracy,review_scores_cleanliness,review_scores_checkin,review_scores_communication,review_scores_location,review_scores_value,reviews_per_month
0,7677581,9,96.0,10.0,10.0,10.0,10.0,10.0,9.0,0.42
1,5831024,28,95.0,10.0,10.0,10.0,10.0,10.0,9.0,1.14
2,7292084,1,80.0,10.0,8.0,10.0,10.0,8.0,8.0,0.13
3,9674633,0,,,,,,,,
4,8495779,31,92.0,9.0,8.0,10.0,10.0,10.0,9.0,1.6
5,17923575,5,92.0,9.0,9.0,9.0,10.0,10.0,9.0,4.17
6,18507229,0,,,,,,,,
7,3429176,99,96.0,10.0,10.0,10.0,10.0,9.0,9.0,2.88
8,5389886,2,100.0,9.0,9.0,10.0,10.0,10.0,8.0,0.1
9,13994116,1,100.0,10.0,10.0,10.0,10.0,10.0,10.0,0.11


In [12]:
reviews.printSchema()

root
 |-- apartment_id: string (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- review_scores_rating: integer (nullable = true)
 |-- review_scores_accuracy: integer (nullable = true)
 |-- review_scores_cleanliness: integer (nullable = true)
 |-- review_scores_checkin: integer (nullable = true)
 |-- review_scores_communication: integer (nullable = true)
 |-- review_scores_location: integer (nullable = true)
 |-- review_scores_value: integer (nullable = true)
 |-- reviews_per_month: double (nullable = true)



Пошаманим с запросом через вставку в SQL

In [13]:
query = ", ".join([
    "to_number({c}) as f_{c}".format(c=c)
    for c in reviews.columns
    if c != "apartment_id"
])

In [14]:
query

'to_number(number_of_reviews) as f_number_of_reviews, to_number(review_scores_rating) as f_review_scores_rating, to_number(review_scores_accuracy) as f_review_scores_accuracy, to_number(review_scores_cleanliness) as f_review_scores_cleanliness, to_number(review_scores_checkin) as f_review_scores_checkin, to_number(review_scores_communication) as f_review_scores_communication, to_number(review_scores_location) as f_review_scores_location, to_number(review_scores_value) as f_review_scores_value, to_number(reviews_per_month) as f_reviews_per_month'

In [15]:
se.sql("""
SELECT apartment_id, {}
FROM reviews
""".format(query)).registerTempTable("reviews_features")

In [16]:
se.sql("""
SELECT *
FROM reviews_features
LIMIT 5
""").toPandas()

Unnamed: 0,apartment_id,f_number_of_reviews,f_review_scores_rating,f_review_scores_accuracy,f_review_scores_cleanliness,f_review_scores_checkin,f_review_scores_communication,f_review_scores_location,f_review_scores_value,f_reviews_per_month
0,7677581,9.0,96.0,10.0,10.0,10.0,10.0,10.0,9.0,0.42
1,5831024,28.0,95.0,10.0,10.0,10.0,10.0,10.0,9.0,1.14
2,7292084,1.0,80.0,10.0,8.0,10.0,10.0,8.0,8.0,0.13
3,9674633,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
4,8495779,31.0,92.0,9.0,8.0,10.0,10.0,10.0,9.0,1.6


In [17]:
se.sql("""
    SELECT * 
    FROM accommodation
    LIMIT 10
""").toPandas()

Unnamed: 0,apartment_id,property_type,room_type,accommodates,bathrooms,bedrooms,beds,bed_type,amenities,guests_included,cleaning_fee,square_feet,extra_people
0,17560520,Apartment,Entire home/apt,3,1.0,0,2,Real Bed,"TV,Internet,Wireless Internet,Air conditioning...",1,65.0,,0
1,13168651,Townhouse,Entire home/apt,6,2.5,2,3,Real Bed,"TV,Wireless Internet,Air conditioning,Kitchen,...",1,,,0
2,5495668,House,Entire home/apt,10,3.0,4,6,Real Bed,"TV,Cable TV,Internet,Wireless Internet,Air con...",1,200.0,,0
3,5593531,Apartment,Entire home/apt,6,2.0,3,3,Real Bed,"TV,Cable TV,Internet,Wireless Internet,Air con...",1,85.0,,0
4,347342,House,Entire home/apt,8,2.0,3,3,Real Bed,"TV,Cable TV,Internet,Wireless Internet,Air con...",6,100.0,,25
5,1854638,Apartment,Entire home/apt,2,1.0,1,1,Real Bed,"TV,Cable TV,Internet,Wireless Internet,Air con...",1,,,0
6,7915978,Condominium,Entire home/apt,6,2.0,3,5,Real Bed,"TV,Cable TV,Internet,Wireless Internet,Air con...",1,150.0,,0
7,729648,Apartment,Entire home/apt,4,2.0,2,2,Real Bed,"TV,Internet,Wireless Internet,Air conditioning...",4,75.0,,50
8,4996627,House,Entire home/apt,4,1.0,2,3,Real Bed,"TV,Cable TV,Internet,Wireless Internet,Air con...",1,100.0,,0
9,10993131,Apartment,Entire home/apt,2,1.0,1,1,Real Bed,"TV,Internet,Wireless Internet,Air conditioning...",2,55.0,,100


В accommodation есть много хороших категориальных признаков
Закодируем их чуть позже, сейчас просто оценим масштаб

In [18]:
se.sql("""
SELECT distinct(property_type)
FROM accommodation
""").toPandas()

Unnamed: 0,property_type
0,Heritage hotel (India)
1,Apartment
2,Townhouse
3,Bed & Breakfast
4,Earth House
5,Pension (Korea)
6,Guest suite
7,Timeshare
8,Hut
9,


In [19]:
se.sql("""
SELECT distinct(room_type)
FROM accommodation
""").toPandas()

Unnamed: 0,room_type
0,Shared room
1,
2,Entire home/apt
3,9
4,Private room


In [20]:
se.sql("""
SELECT distinct(bed_type)
FROM accommodation
""").toPandas()

Unnamed: 0,bed_type
0,
1,Airbed
2,Futon
3,Pull-out Sofa
4,Couch
5,9
6,Real Bed


In [21]:
se.sql("""
    SELECT distinct(explode(split(amenities, ',')))
    FROM accommodation
""").toPandas()

Unnamed: 0,col
0,Lock on Bedroom Door
1,Indoor fireplace
2,Wheelchair accessible
3,Private bathroom
4,Private living room
...,...
131,Baby bath
132,Elevator in building
133,Free parking on premises
134,24-hour check-in


#### Программно создаваемые запросы через DataFrame API

Чтобы более гибко контролировать запросы, можно использовать не тестовый способ запуска SQL, а программный через DataFrame API.

Подробнее про то, как составлять такие запросы и какие готовые фукнции уже существуют - на официальном сайте https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html

In [22]:
from pyspark.sql import functions as F

In [23]:
apartment.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- description: string (nullable = true)
 |-- experiences_offered: string (nullable = true)
 |-- notes: string (nullable = true)
 |-- price: string (nullable = true)
 |-- host_id: string (nullable = true)



In [24]:
apartment.select(['id', 'name', 'price']).limit(10).toPandas()

Unnamed: 0,id,name,price
0,8195459,Queensize Bedroom Ensuite Bathroom,60
1,13101666,A charming little end Unit,119
2,16834619,Live by the beach 2!,69
3,10179588,"Great location, min from the beach",50
4,17404601,Cosy Bright Room,45
5,15469718,"Beach lifestyle/ Private Suite, Easy City access.",60
6,9944064,"Light, Bright and Fresh Home",149
7,13776164,Light and Charming QU sized room.,45
8,10440278,Bonbeach Water Fun - *RIVERFRONT + BEACH*,60
9,16802564,休闲度假的好去处！！,55


In [25]:
apartment.select(['id', 'name', 'price']).where(F.col('id') == '14916824').limit(10).toPandas()

Unnamed: 0,id,name,price
0,14916824,Chambre privée centre-ville/Private room downtown,25


In [26]:
amenities_c = se.sql("""
    SELECT distinct(explode(split(lower(amenities), ',')))
    FROM accommodation
""").rdd.map(lambda x: x.col).collect()

In [27]:
amenities_c

['refrigerator',
 'step-free access',
 'stove',
 'wide hallway clearance',
 'path to entrance lit at night',
 'ev charger',
 'wide doorway',
 'grab-rails for shower and toilet',
 'pets allowed',
 'cooking basics',
 'heating',
 'lake access',
 'patio or balcony',
 'wide clearance to shower and toilet',
 'washer / dryer',
 'doorman',
 'private living room',
 'game console',
 'long term stays allowed',
 'buzzer/wireless intercom',
 'coffee maker',
 'pocket wifi',
 'oven',
 'tub with shower bench',
 'host greets you',
 'tv',
 'pets live on this property',
 'garden or backyard',
 'crib',
 'carbon monoxide detector',
 'laptop friendly workspace',
 'hair dryer',
 'dishes and silverware',
 'wireless internet',
 'hangers',
 'pool',
 'kitchen',
 'safety card',
 'extra pillows and blankets',
 'fire extinguisher',
 'table corner guards',
 'family/kid friendly',
 'paid parking off premises',
 'wide clearance to bed',
 'translation missing: en.hosting_amenity_50',
 'indoor fireplace',
 'washer',
 'l

`F.when` проверяет условие и если оно верно то выставляет указанное значение, если не верно, то значение указанное в `otherwise`. 

In [28]:
import string

allowed = set(string.ascii_letters + string.digits + " ,")

def slugify(text):
    if not text:
        return ""
    text = "".join([ch for ch in text.lower() if ch in allowed])
    return text.replace(' ', '_')

In [29]:
f_slugify = se.udf.register("slugify", slugify, "string")

In [30]:
exprs = [
    F.when(
        F.array_contains(F.split(f_slugify('amenities'), ','), amenity),
        1
    )
    .otherwise(0).alias("f_amenity_" + slugify(amenity)) 
    for amenity in amenities_c
]

In [31]:
accommodation.select('apartment_id', *exprs).limit(5).toPandas()

Unnamed: 0,apartment_id,f_amenity_refrigerator,f_amenity_stepfree_access,f_amenity_stove,f_amenity_wide_hallway_clearance,f_amenity_path_to_entrance_lit_at_night,f_amenity_ev_charger,f_amenity_wide_doorway,f_amenity_grabrails_for_shower_and_toilet,f_amenity_pets_allowed,...,f_amenity_bbq_grill,f_amenity_dishwasher,f_amenity_smart_lock,f_amenity_pack_n_playtravel_crib,f_amenity_babysitter_recommendations,f_amenity_essentials,f_amenity_beach_essentials,f_amenity_beachfront,f_amenity_accessibleheight_bed,f_amenity_24hour_checkin
0,17560520,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
1,13168651,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
2,5495668,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
3,5593531,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0
4,347342,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,1,0,0,0,0


In [32]:
accommodation.select('apartment_id', *exprs).limit(1).collect()

[Row(apartment_id='17560520', f_amenity_refrigerator=0, f_amenity_stepfree_access=0, f_amenity_stove=0, f_amenity_wide_hallway_clearance=0, f_amenity_path_to_entrance_lit_at_night=0, f_amenity_ev_charger=0, f_amenity_wide_doorway=0, f_amenity_grabrails_for_shower_and_toilet=0, f_amenity_pets_allowed=0, f_amenity_cooking_basics=0, f_amenity_heating=1, f_amenity_lake_access=0, f_amenity_patio_or_balcony=0, f_amenity_wide_clearance_to_shower_and_toilet=0, f_amenity_washer__dryer=0, f_amenity_doorman=0, f_amenity_private_living_room=0, f_amenity_game_console=0, f_amenity_long_term_stays_allowed=0, f_amenity_buzzerwireless_intercom=0, f_amenity_coffee_maker=0, f_amenity_pocket_wifi=0, f_amenity_oven=0, f_amenity_tub_with_shower_bench=0, f_amenity_host_greets_you=0, f_amenity_tv=1, f_amenity_pets_live_on_this_property=0, f_amenity_garden_or_backyard=0, f_amenity_crib=0, f_amenity_carbon_monoxide_detector=0, f_amenity_laptop_friendly_workspace=0, f_amenity_hair_dryer=0, f_amenity_dishes_and_s

In [33]:
accommodation.select('apartment_id', *exprs).registerTempTable("amenity_features")

In [34]:
bed_types = accommodation.select('bed_type').distinct().rdd.map(lambda x: x.bed_type).collect()

In [35]:
bed_types = [x for x in bed_types if x is not None]

In [36]:
exprs = [
    F.when(
        F.col('bed_type') == btype,
        1
    )
    .otherwise(0).alias("f_bad_type_" + slugify(btype))
    for btype in bed_types
]

In [37]:
accommodation.select('apartment_id', *exprs).limit(5).toPandas()

Unnamed: 0,apartment_id,f_bad_type_airbed,f_bad_type_futon,f_bad_type_pullout_sofa,f_bad_type_couch,f_bad_type_9,f_bad_type_real_bed
0,17560520,0,0,0,0,0,1
1,13168651,0,0,0,0,0,1
2,5495668,0,0,0,0,0,1
3,5593531,0,0,0,0,0,1
4,347342,0,0,0,0,0,1


In [38]:
accommodation.select('apartment_id', *exprs).registerTempTable("bed_type_features")

In [39]:
room_types = accommodation.select('room_type').distinct().rdd.map(lambda x: x.room_type).collect()
room_types = [x for x in room_types if x is not None]
exprs = [
    F.when(
        F.col('bed_type') == btype,
        1
    )
    .otherwise(0).alias("f_room_type_" + slugify(btype))
    for btype in room_types
]
accommodation.select('apartment_id', *exprs).registerTempTable("room_types_features")

In [40]:
property_types = accommodation.select('property_type').distinct().rdd.map(lambda x: x.property_type).collect()
property_types = [x for x in property_types if x is not None]
exprs = [
    F.when(
        F.col('property_type') == btype,
        1
    )
    .otherwise(0).alias("f_property_type_" + slugify(btype))
    for btype in property_types
]
accommodation.select('apartment_id', *exprs).registerTempTable("property_types_features")

In [41]:
se.sql("""
SELECT apartment_id,
       to_number(accommodates) as f_accommodates,
       to_number(bathrooms) as f_bathrooms,
       to_number(bedrooms) as f_bedrooms,
       to_number(beds) as f_beds,
       to_number(guests_included) as f_guests_included,
       to_number(cleaning_fee) as f_cleaning_fee,
       to_number(square_feet) as f_square_feet,
       to_number(extra_people) as f_extra_people
FROM accommodation
""").registerTempTable("accommodation_features")

In [42]:
se.sql("""
SELECT *
FROM accommodation_features
LIMIT 5
""").toPandas()

Unnamed: 0,apartment_id,f_accommodates,f_bathrooms,f_bedrooms,f_beds,f_guests_included,f_cleaning_fee,f_square_feet,f_extra_people
0,17560520,3.0,1.0,0.0,2.0,1.0,65.0,0.0,0.0
1,13168651,6.0,2.5,2.0,3.0,1.0,0.0,0.0,0.0
2,5495668,10.0,3.0,4.0,6.0,1.0,200.0,0.0,0.0
3,5593531,6.0,2.0,3.0,3.0,1.0,85.0,0.0,0.0
4,347342,8.0,2.0,3.0,3.0,6.0,100.0,0.0,25.0


In [43]:
bias = apartment.select('id', F.when(F.col('id') == F.col('id'), 1).alias('f_bias'))

In [44]:
bias.limit(10).toPandas()

Unnamed: 0,id,f_bias
0,17619081,1
1,13356620,1
2,18993602,1
3,4313438,1
4,9029859,1
5,13028880,1
6,17876318,1
7,17151628,1
8,17154878,1
9,15462554,1


In [45]:
bias.registerTempTable("bias")

Чтож, для нашего примера должно быть достаточно. Соберем итоговый датасет.

In [46]:
datadet_df = se.sql("""
SELECT to_number(price) as target, *
FROM
    apartment a
    join hosts_features hf on hf.host_id = a.host_id
    join reviews_features rf on rf.apartment_id = a.id
    join amenity_features af on af.apartment_id = a.id
    join bed_type_features btf on btf.apartment_id = a.id
    join room_types_features rtf on rtf.apartment_id = a.id
    join property_types_features ptf on ptf.apartment_id = a.id
    join accommodation_features accf on accf.apartment_id = a.id
    join bias b on b.id = a.id
WHERE 
    to_number(price) > 0
""")

In [47]:
datadet_df.limit(5).toPandas()

Unnamed: 0,target,id,name,summary,description,experiences_offered,notes,price,host_id,host_id.1,...,f_accommodates,f_bathrooms,f_bedrooms,f_beds,f_guests_included,f_cleaning_fee,f_square_feet,f_extra_people,id.1,f_bias
0,120.0,10013419,Joli appartement parisien,Ce joli appartement au style très parisien se ...,Ce joli appartement au style très parisien se ...,none,,120,45810016,45810016,...,2.0,1.0,1.0,1.0,1.0,0.0,0.0,0.0,10013419,1
1,105.0,10014332,Furnished Studio Apartment,This fully furnished studio apartment in downt...,This fully furnished studio apartment in downt...,none,,105,49875622,49875622,...,3.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,10014332,1
2,105.0,10014332,Furnished Studio Apartment,This fully furnished studio apartment in downt...,This fully furnished studio apartment in downt...,none,,105,49875622,49875622,...,3.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,10014332,1
3,105.0,10014332,Furnished Studio Apartment,This fully furnished studio apartment in downt...,This fully furnished studio apartment in downt...,none,,105,49875622,49875622,...,3.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,10014332,1
4,105.0,10014332,Furnished Studio Apartment,This fully furnished studio apartment in downt...,This fully furnished studio apartment in downt...,none,,105,49875622,49875622,...,3.0,1.0,0.0,1.0,1.0,0.0,0.0,0.0,10014332,1


In [48]:
cols = datadet_df.columns
non_features_c = [
    c for c in cols
    if not (c == 'target' or c.startswith('f_'))
]

In [49]:
non_features_c

['id',
 'name',
 'summary',
 'description',
 'experiences_offered',
 'notes',
 'price',
 'host_id',
 'host_id',
 'apartment_id',
 'apartment_id',
 'apartment_id',
 'apartment_id',
 'apartment_id',
 'apartment_id',
 'id']

In [50]:
datadet_df.drop(*non_features_c).write.parquet("/user/airbnb/parquet/dataset.parquet")

In [53]:
! hdfs dfs -ls /user/airbnb/parquet

Found 5 items
drwxr-xr-x   - ubuntu hadoop          0 2022-02-13 10:05 /user/airbnb/parquet/accommodation.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-02-13 10:05 /user/airbnb/parquet/apartment.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-02-13 11:13 /user/airbnb/parquet/dataset.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-02-13 10:05 /user/airbnb/parquet/hosts.parquet
drwxr-xr-x   - ubuntu hadoop          0 2022-02-13 10:05 /user/airbnb/parquet/reviews.parquet


In [55]:
dataset_fd = se.read.parquet('/user/airbnb/parquet/dataset.parquet')

In [81]:
dataset_fd.select(F.mean('f_host_for').alias('f_host_for_mean'), F.stddev('f_host_for').alias('f_host_for_dev')).limit(5).toPandas()

Unnamed: 0,f_host_for_mean,f_host_for_dev
0,2751.014271,589.692084


In [82]:
f_cols = [f for f in dataset_fd.columns if f.startswith('f')]

In [83]:
exps_mean = [
    F.mean(c).alias('{}_mean'.format(c))
    for c in f_cols
]

exps_dev = [
    F.stddev(c).alias('{}_dev'.format(c))
    for c in f_cols
]

norm_f = dataset_fd.select(*exps_mean, *exps_dev).rdd.take(1)

In [86]:
norm_dict = norm_f[0].asDict()

In [90]:
exps = [
    (
        (F.col(c) - norm_dict["{}_mean".format(c)]) / (1 + norm_dict["{}_dev".format(c)])
    ).alias(c)
    for c in f_cols
]

In [91]:
dataset_fd.select("target", *exps).limit(5).toPandas()

Unnamed: 0,target,f_host_for,f_host_response_rate,f_host_acceptance_rate,f_host_total_listings_count,f_num_of_ver,f_number_of_reviews,f_review_scores_rating,f_review_scores_accuracy,f_review_scores_cleanliness,...,f_property_type_cabin,f_accommodates,f_bathrooms,f_bedrooms,f_beds,f_guests_included,f_cleaning_fee,f_square_feet,f_extra_people,f_bias
0,450.0,0.745652,0.443778,0.0,-0.417513,0.84823,0.016432,0.760037,0.675447,0.509344,...,-0.000815,-0.549206,-0.23215,-0.263258,-0.46241,-0.230703,-0.723646,-0.044524,-0.358428,0.0
1,450.0,0.745652,0.443778,0.0,-0.417513,0.84823,0.016432,0.760037,0.675447,0.509344,...,-0.000815,-0.549206,-0.23215,-0.263258,-0.46241,-0.230703,-0.723646,-0.044524,-0.358428,0.0
2,65.0,-0.381839,-2.746187,0.0,-0.422898,-1.447601,-0.459598,-1.415443,-1.175346,-1.170018,...,-0.000815,-0.549206,-0.23215,-0.263258,-0.46241,-0.230703,-0.723646,-0.044524,-0.358428,0.0
3,300.0,0.225922,-2.746187,0.0,-0.422898,-0.529268,-0.459598,-1.415443,-1.175346,-1.170018,...,-0.000815,0.040298,0.333231,0.702031,0.616622,-0.230703,-0.723646,-0.044524,-0.358428,0.0
4,199.0,-0.403847,0.379979,0.0,4.17602,0.389064,-0.459598,-1.415443,-1.175346,-1.170018,...,-0.000815,0.335051,0.333231,0.219387,-0.102732,-0.230703,0.654134,-0.044524,-0.358428,0.0


In [92]:
train_df, test_df = dataset_fd.select("target", *exps).randomSplit([0.8, 0.2], 432)

In [93]:
train = train_df.rdd.cache()
test = test_df.rdd.cache()

In [94]:
example = train.first()

In [95]:
example

Row(target=1.0, f_host_for=-1.5381100549600453, f_host_response_rate=0.44377826248804075, f_host_acceptance_rate=0.0, f_host_total_listings_count=-0.4228981418743758, f_num_of_ver=1.766562862017864, f_number_of_reviews=-0.4595978544535533, f_review_scores_rating=-1.4154427083019658, f_review_scores_accuracy=-1.1753455217702893, f_review_scores_cleanliness=-1.1700183840705847, f_review_scores_checkin=-1.181573715066312, f_review_scores_communication=-1.1844742381115585, f_review_scores_location=-1.1823447248750683, f_review_scores_value=-1.1650967767715705, f_reviews_per_month=-0.3676457661940611, f_amenity_refrigerator=-0.004012392049239458, f_amenity_stepfree_access=0.0, f_amenity_stove=-0.0038147745972070754, f_amenity_wide_hallway_clearance=0.0, f_amenity_path_to_entrance_lit_at_night=0.0, f_amenity_ev_charger=0.0, f_amenity_wide_doorway=0.0, f_amenity_grabrails_for_shower_and_toilet=0.0, f_amenity_pets_allowed=0.0, f_amenity_cooking_basics=0.0, f_amenity_heating=0.11048469539420204

In [96]:
features_num = len(example.asDict()) - 1

In [97]:
features_num

192

In [98]:
import numpy as np
from functools import partial

#### Broadcasts & Accumulators

Обучать будем обычную линейную регрессию

In [99]:
def compute_gradient(weights_broadcast, loss, example):
    # достаем целевую переменную и признаки из наблюдения
    gradient = np.zeros(len(weights_broadcast.value))
    data = example.asDict()
    
    y = data['target']
    data.pop('target')

    # признаки сортируем по названию для того, чтобы позиции точно не разъезались
    x = np.array([v or 0 for k, v in sorted(data.items(), key=lambda x: x[0])])

    # делаем предсказание с текущими весами
    prediction = x.dot(weights_broadcast.value)

    # считаем градиент на объекте
    gradient = x * (prediction - y) * 2
    
    # считаем потери
    loss.add((prediction - y) ** 2)
    
    return gradient

In [128]:

# Параметры
# learning_rate = 0.1
epochs = 20
l2_lambda = 0.01
l1_lambda = 0.01

np.random.seed(42201)

# Изначальные веса инициализируем случайно
weights = np.random.random(features_num)

N = train.count()

# Цикл по эпохам
for i in range(epochs):
    weights_broadcast = sc.broadcast(weights)  # Эту переменную будет бродкастить на всех воркеров
    loss = sc.accumulator(0.0) # В эту переменную будет частичные лоссы на объектах
    
    # Считаем средний градиент
    gradient = (
        train
        .map(partial(compute_gradient, weights_broadcast, loss))
        .mean()
    )
    
    gradient += 2 * l2_lambda * weights  # L2 регуляризация
    gradient += l1_lambda * np.sign(weights) # L1 регуляризация
    
    learning_rate = 1 / (10 + i)
    
    weights -= learning_rate * gradient
    weights_broadcast.destroy()
    
    print("epoch:", i, "loss:", loss.value / N)

epoch: 0 loss: 45851.77650118528
epoch: 1 loss: 41817.16639498316
epoch: 2 loss: 40123.73287651765
epoch: 3 loss: 39273.24952667046
epoch: 4 loss: 38803.49591898277
epoch: 5 loss: 38521.966452178225
epoch: 6 loss: 38340.91310263563
epoch: 7 loss: 38217.14452153731
epoch: 8 loss: 38127.9960557812
epoch: 9 loss: 38060.85707167186
epoch: 10 loss: 38008.35555717565
epoch: 11 loss: 37965.98468401477
epoch: 12 loss: 37930.876256555704
epoch: 13 loss: 37901.13955003687
epoch: 14 loss: 37875.48482804894
epoch: 15 loss: 37853.01489090475
epoch: 16 loss: 37833.07274083691
epoch: 17 loss: 37815.179198545615
epoch: 18 loss: 37798.97366126823
epoch: 19 loss: 37784.1795127419


In [129]:
weights

array([ 3.26259699e+01,  8.69178753e-01,  5.00485024e-01,  1.55384138e-01,
        9.49625098e-01,  3.35830936e-01,  9.56483471e-01,  5.52754474e-01,
        2.15522977e-01, -5.83638084e-01,  4.31184228e-01,  2.82142498e-02,
        2.17031368e-01,  7.92694344e-01, -2.57221019e+00,  9.58887210e-05,
        3.90479400e-01,  3.37967895e-04,  6.83137627e-02,  9.66127438e-01,
        8.77287107e-01,  9.13806583e-01,  3.37733782e-01,  8.38307405e-01,
        2.60520312e-01,  4.77569254e-01,  9.02005582e-01,  6.94907100e-01,
        5.81655106e-01,  1.28448554e-01,  3.72805761e+00,  4.61026522e-01,
        1.00287166e+01,  1.44016559e-01,  2.06034115e+00,  5.67573588e-03,
        1.73892231e-01,  6.74507631e-01,  6.18557371e-01,  7.96435853e-01,
        2.53048317e-01,  7.43124966e-01,  6.60477549e-01,  3.87806443e-02,
        2.71745536e-01,  7.05997979e-01,  1.07969078e-01,  7.38323900e-01,
        8.49797789e-01,  1.77267877e-01,  3.27465765e+00,  5.70744496e-01,
        2.30344198e-01, -

In [130]:
def calc_ss_res(weights_broadcast, example):
    # достаем целевую переменную и признаки из наблюдения
    gradient = np.zeros(len(weights_broadcast.value))
    data = example.asDict()
    
    y = data['target']
    data.pop('target')

    # признаки сортируем по названию для того, чтобы позиции точно не разъезались
    x = np.array([v or 0 for k, v in sorted(data.items(), key=lambda x: x[0])])

    # делаем предсказание с текущими весами
    prediction = x.dot(weights_broadcast.value)
    return (y - prediction) ** 2

In [131]:
weights_broadcast = sc.broadcast(weights)

y_avg = train.map(lambda x: x.target).mean()
ss_tot = train.map(lambda x: (x.target - y_avg) ** 2).sum()
ss_res = train.map(partial(calc_ss_res, weights_broadcast)).sum()

r2_score = 1 - ss_res / ss_tot
print(r2_score)

-0.7401309353981409


In [132]:
weights_broadcast = sc.broadcast(weights)

y_avg = test.map(lambda x: x.target).mean()
ss_tot = test.map(lambda x: (x.target - y_avg) ** 2).sum()
ss_res = test.map(partial(calc_ss_res, weights_broadcast)).sum()

r2_score = 1 - ss_res / ss_tot
print(r2_score)

-0.7330101852080344


Ну что можно сказать 

<img src="https://i.pinimg.com/originals/bb/0e/c3/bb0ec3d5987cff5c6bf8699b0d4e53b2.jpg" width="300">

Похоже, что задача не решается простой линейкой и для реального качества требуется поквахтать

Но что нужно отметить:

* Лосс падает!
* Решение довольно неплохо масштабируется - фактически мы можем обработать датасет произвольного размера
* Мы попробовали кучу всякий крутых примочек, которые есть в Spark

### Вы жжете бабло

<img src="https://grizzle.com/wp-content/uploads/2020/01/money-cash-fire-1200x900.png" width="300">

Напоминаю, выключайте ресурсы. Они жрут ваши деньги.