In [1]:
%reload_ext autoreload
%autoreload 2

In [2]:
import json

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

pd.set_option("display.precision", 3)

from time import time

In [None]:
pandas_data = pd.read_csv("assets/AB_NYC_2019.csv")
print(f"Суммарное кол-во строк: {len(pandas_data)}")

pandas_data_train, pandas_data = train_test_split(
    pandas_data, test_size=0.3, random_state=0
)
print(f"Train: {len(pandas_data_train)}, Test: {len(pandas_data)}")

Суммарное кол-во строк: 48895
Train: 34226, Test: 14669


In [None]:
with open("assets/final_neighbourhoods.txt", "r") as file:
    final_neighbourhoods = file.readlines()
final_neighbourhoods = [s.strip() for s in final_neighbourhoods]

print(final_neighbourhoods)

['Williamsburg', "Hell's Kitchen", 'small discricts in Brooklyn', 'Bushwick', 'Upper West Side', 'small discricts in Queens', 'East Village', 'Midtown', 'small discricts in Manhattan', 'Crown Heights', 'Bedford-Stuyvesant', 'Lower East Side', 'Harlem', 'Greenpoint', 'East Harlem', 'Financial District', 'Astoria', 'small discricts in Staten Island', 'small discricts in Bronx', 'Upper East Side', 'Washington Heights', 'Chelsea', 'West Village']


In [5]:
def categorize_review_recency(days):
    if pd.isna(days):
        return "No reviews"
    elif days <= 30:
        return "Last month"
    elif days <= 90:
        return "Last quarter"
    elif days <= 365:
        return "Last year"
    else:
        return "Over a year ago"

In [None]:
# Информация от энкодера/скейлера
with open("assets/preprocessing_info.json", "r") as f:
    preprocessing_info = json.load(f)

## PySpark

In [7]:
import os
import sys

os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

sys.path.append(os.path.join(os.getenv("SPARK_HOME"), "python", "lib"))

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", "1")

import pyspark.pandas as ps

ps.set_option("compute.ops_on_diff_frames", True)



In [9]:
data = spark.createDataFrame(pandas_data)

In [10]:
data.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: long (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: long (nullable = true)
 |-- minimum_nights: long (nullable = true)
 |-- number_of_reviews: long (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: long (nullable = true)
 |-- availability_365: long (nullable = true)



In [11]:
data.show()

+--------+--------------------+---------+-----------------+-------------------+------------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|      id|                name|  host_id|        host_name|neighbourhood_group|     neighbourhood|latitude|longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|
+--------+--------------------+---------+-----------------+-------------------+------------------+--------+---------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+
|33893655|Studio in doorman...|138798990|            Jonas|          Manhattan|           Tribeca| 40.7243| -74.0111|Entire home/apt|  225|             3|                0|        NaN|              NaN|                             1|              

In [12]:
df = data.pandas_api()
df.head(5)

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,33893655,Studio in doorman building in Tribeca,138798990,Jonas,Manhattan,Tribeca,40.724,-74.011,Entire home/apt,225,3,0,,,1,42
1,25798461,Large 2 bedroom downtown Loft Apartment,195803,Zinnia,Manhattan,NoHo,40.726,-73.993,Entire home/apt,649,1,5,2018-11-03,0.4,1,75
2,20213045,Spacious and Modern 2 Bed/2.5 Bath Dream Townh...,2678122,Tasha,Brooklyn,Williamsburg,40.717,-73.95,Entire home/apt,300,5,5,2019-06-12,0.35,3,31
3,28670432,X 20-203,115993835,Shimin,Brooklyn,Sunset Park,40.64,-74.008,Private room,26,1,13,2019-04-12,1.36,5,141
4,13920697,Brownstone apt in Bklyn w/ gorgeous natural light,29513490,Whitney,Brooklyn,Bedford-Stuyvesant,40.684,-73.933,Entire home/apt,125,2,4,2017-01-02,0.12,1,0


### Preprocessing

In [None]:
t1 = time()
df = df.drop(["id", "name", "host_id", "host_name"], axis=1)


data_in_small_disctricts = ~df["neighbourhood"].isin(final_neighbourhoods)
df.loc[data_in_small_disctricts, "neighbourhood"] = (
    "small discricts in " + df.loc[data_in_small_disctricts, "neighbourhood_group"]
)


df["last_review"] = ps.to_datetime(
    df["last_review"], format="%Y-%m-%d", errors="coerce"
)
reference_date = df["last_review"].max()
df["days_since_review"] = (reference_date - df["last_review"]) / 60 / 60 / 24
df["review_recency"] = df["days_since_review"].apply(categorize_review_recency)

df = df.drop(["reviews_per_month", "days_since_review", "last_review"], axis=1)


df["hosts_multiple_apts"] = (df["calculated_host_listings_count"] > 1).astype(np.int8)
df["availability_365"] = (df["availability_365"] > 0).astype(np.int8)

df = df.drop(["calculated_host_listings_count", "minimum_nights"], axis=1)


for col, column_actions in preprocessing_info.items():
    if "encoder" in column_actions.keys():
        df[col] = df[col].map(column_actions["encoder"]).astype(np.int32)
    if "scaler" in column_actions.keys():
        df[col] = (df[col] - column_actions["scaler"]["min"]) / (
            column_actions["scaler"]["max"] - column_actions["scaler"]["min"]
        )

t2 = time()



In [14]:
t2 - t1

36.78664445877075

In [15]:
df.describe()

Unnamed: 0,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,number_of_reviews,availability_365,review_recency,hosts_multiple_apts
count,14669.0,14669.0,14669.0,14669.0,14669.0,14669.0,14669.0,14669.0,14669.0,14669.0
mean,0.418,0.577,0.555,0.55,0.253,153.184,0.036,0.639,0.457,0.342
std,0.185,0.318,0.133,0.087,0.273,230.501,0.069,0.48,0.386,0.474
min,0.0,0.0,0.016,0.01,0.0,0.0,0.0,0.0,0.0,0.0
25%,0.25,0.273,0.461,0.492,0.0,68.0,0.002,0.0,0.0,0.0
50%,0.5,0.636,0.54,0.544,0.0,105.0,0.008,1.0,0.5,0.0
75%,0.5,0.864,0.638,0.58,0.5,175.0,0.037,1.0,0.75,1.0
max,1.0,1.0,0.997,0.974,1.0,10000.0,0.863,1.0,1.0,1.0


## Pandas

In [None]:
t3 = time()
pandas_data = pandas_data.drop(["id", "name", "host_id", "host_name"], axis=1)

data_in_small_disctricts = ~pandas_data["neighbourhood"].isin(final_neighbourhoods)
pandas_data.loc[data_in_small_disctricts, "neighbourhood"] = (
    "small discricts in "
    + pandas_data.loc[data_in_small_disctricts, "neighbourhood_group"]
)


pandas_data["last_review"] = pd.to_datetime(
    pandas_data["last_review"], format="%Y-%m-%d", errors="coerce"
)
reference_date = pandas_data["last_review"].max()
pandas_data["days_since_review"] = (reference_date - pandas_data["last_review"]).dt.days
pandas_data["review_recency"] = pandas_data["days_since_review"].apply(
    categorize_review_recency
)

pandas_data = pandas_data.drop(
    ["reviews_per_month", "days_since_review", "last_review"], axis=1
)


pandas_data["hosts_multiple_apts"] = (
    pandas_data["calculated_host_listings_count"] > 1
).astype(np.int8)
pandas_data["availability_365"] = (pandas_data["availability_365"] > 0).astype(np.int8)

pandas_data = pandas_data.drop(
    ["calculated_host_listings_count", "minimum_nights"], axis=1
)


for col, column_actions in preprocessing_info.items():
    if "encoder" in column_actions.keys():
        pandas_data[col] = (
            pandas_data[col].map(column_actions["encoder"]).astype(np.int32)
        )
    if "scaler" in column_actions.keys():
        pandas_data[col] = (pandas_data[col] - column_actions["scaler"]["min"]) / (
            column_actions["scaler"]["max"] - column_actions["scaler"]["min"]
        )

t4 = time()

In [17]:
t4 - t3

0.03197002410888672

In [18]:
pandas_data.describe()

Unnamed: 0,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,number_of_reviews,availability_365,review_recency,hosts_multiple_apts
count,14669.0,14669.0,14669.0,14669.0,14669.0,14669.0,14669.0,14669.0,14669.0,14669.0
mean,0.418,0.577,0.555,0.55,0.253,153.184,0.036,0.639,0.457,0.342
std,0.185,0.318,0.133,0.087,0.273,230.501,0.069,0.48,0.386,0.474
min,0.0,0.0,0.016,0.01,0.0,0.0,0.0,0.0,0.0,0.0
25%,0.25,0.273,0.461,0.492,0.0,68.0,0.002,0.0,0.0,0.0
50%,0.5,0.636,0.54,0.544,0.0,105.0,0.008,1.0,0.5,0.0
75%,0.5,0.864,0.638,0.58,0.5,175.0,0.037,1.0,0.75,1.0
max,1.0,1.0,0.997,0.974,1.0,10000.0,0.863,1.0,1.0,1.0


## Сравнение

In [23]:
pyspark_back2pandas = df.to_pandas()
pyspark_back2pandas.index = pandas_data.index



In [25]:
pandas_data.compare(pyspark_back2pandas)

In [28]:
print(f"PySpark time: {t2 - t1:.3f} s")
print(f"Pandas time: {t4 - t3:.3f} s")

PySpark time: 36.787 s
Pandas time: 0.032 s


Результаты обработки полностью совпадают, однако на малом объеме данных преимущества PySpark нивелируются и локальная обработка на Pandas быстрее примерно в 1000 раз.